From 1c03cab457f1790fd75ebd2fa9960fe50f780a12 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Paul=20W=C3=A4hner?=
Date: Sun, 3 May 2026 20:07:12 +0200
Subject: [PATCH] refactor: simplify to see/interact/exec and split server modules

---
 README.md              |   62 +-
 docs/API.md            |  234 +++---
 examples/quickstart.py |   27 +-
 server/app.py          | 1773 ++--------------------------------------
 server/config.py       |   42 +
 server/models.py       |  124 +++
 server/services.py     |  462 +++++++++++
 skill/SKILL.md         |  115 +--
 8 files changed, 911 insertions(+), 1928 deletions(-)
 create mode 100644 server/config.py
 create mode 100644 server/models.py
 create mode 100644 server/services.py

diff --git a/README.md b/README.md
index ae7ef4f..60d2482 100644
--- a/README.md
+++ b/README.md
@@ -1,49 +1,37 @@
 # Clickthrough
 
-Let an agent interact with a computer over HTTP.
+Clickthrough is a lightweight HTTP control layer that lets an AI safely operate a real computer by repeatedly capturing structured screenshots with coordinate-aware grids (`see`), executing precise mouse/keyboard actions from those coordinates (`interact`), and optionally running authenticated shell commands for system-level tasks (`exec`) under a consistent response contract.
 
-## Primary mode (v2)
+## Core Methods
 
-Use the v2 contract for faster, less OCR-heavy control loops:
-- `POST /v2/observe`
-- `POST /v2/localize`
-- `POST /v2/act`
-- `POST /v2/act-verify`
+- `POST /see`: Capture a full screen or region, optionally with a click-ready grid overlay.
+- `POST /see/zoom`: Capture a tighter crop around a point and draw a denser grid for precise targeting.
+- `POST /interact`: Perform one mouse or keyboard action (`click`, `scroll`, `type`, `hotkey`, etc.).
+- `POST /exec`: Run PowerShell/Bash/CMD commands when shell-level control is needed.
 
-This is optimized for agents that cannot directly see the screen and must use screenshot/image tools.
+## Why this works for AI agents
 
-## What this provides
+- Agents do not need live vision; they iterate on snapshots.
+- Grid metadata bridges image understanding to deterministic click coordinates.
+- Interaction stays explicit and auditable (one action per request).
+- A unified response envelope (`ok`, `data`, `error`) reduces agent-side branching.
 
-- Screen/region capture with optional OCR and timing stats
-- Observation IDs for deterministic follow-up localization
-- Text localization and image-tool coordinate localization
-- Action execution with resolved target IDs
-- Risk-aware action+verification defaults
-- Unified response envelope across all endpoints
+## Minimal Agent Loop
 
-## Quick start
+1. Call `see` with a coarse grid.
+2. If uncertain, call `see/zoom` with a denser grid.
+3. Call `interact` once.
+4. Call `see` again to verify state change.
+5. Use `exec` only for explicit shell/system tasks.
 
-```bash
-cd /root/external-projects/clickthrough
-python3 -m venv .venv
-. .venv/bin/activate
-pip install -r requirements.txt
-CLICKTHROUGH_TOKEN=change-me python -m server.app
-```
+## Safety and Auth
 
-Server defaults to `127.0.0.1:8123`.
+- `x-clickthrough-token` protects API access when enabled.
+- `x-clickthrough-exec-secret` is required for `/exec`.
+- Optional dry-run and allowed-region constraints reduce accidental risk.
 
-## Fast control loop
+## Docs
 
-1. `POST /v2/observe` on a tight region
-2. If OCR is enough, `POST /v2/localize` with `text_query`
-3. If ambiguous, ask image tool for one x,y in observation bounds
-4. `POST /v2/localize` with `image_tool_point`
-5. `POST /v2/act` or `POST /v2/act-verify`
-6. Re-observe only changed region
-
-## See docs
-
-- `docs/API.md`
-- `skill/SKILL.md`
-- `docs/coordinate-system.md`
+- API: `docs/API.md`
+- Agent procedure: `skill/SKILL.md`
+- Coordinate system details: `docs/coordinate-system.md`
diff --git a/docs/API.md b/docs/API.md
index 54c7466..41b46d3 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -1,116 +1,21 @@
-# API Reference (v2)
+# API Reference
 
 Base URL: `http://127.0.0.1:8123`
 
-If `CLICKTHROUGH_TOKEN` is set, include:
+Auth header when enabled:
 
 ```http
 x-clickthrough-token:
 ```
 
-## Endpoints
+This API is intended for AI computer control through 3 methods only:
+- `see`
+- `interact`
+- `exec`
 
-- `POST /v2/observe`
-- `POST /v2/localize`
-- `POST /v2/act`
-- `POST /v2/act-verify`
-- `GET /health`
-- `GET /displays`
-- `GET /windows`
-- `POST /windows/action`
-- `POST /launch`
-- `POST /exec`
+All responses use one envelope.
 
-No v1 endpoints are supported.
-
-## `POST /v2/observe`
-
-```json
-{
-  "mode": "region",
-  "region_x": 800,
-  "region_y": 420,
-  "region_width": 700,
-  "region_height": 420,
-  "include_image": true,
-  "image_format": "jpeg",
-  "jpeg_quality": 75,
-  "ocr_mode": "region",
-  "language_hint": "eng",
-  "min_confidence": 0.45,
-  "max_ocr_area_px": 1500000,
-  "group_lines": true
-}
-```
-
-Returns observation metadata, optional image, OCR blocks/lines, and timing fields.
-
-## `POST /v2/localize`
-
-Text localization:
-
-```json
-{
-  "observation_id": "...",
-  "text_query": "Save",
-  "text_match": "exact",
-  "candidate_index": 0
-}
-```
-
-Image-tool point localization:
-
-```json
-{
-  "observation_id": "...",
-  "image_tool_point": {"x": 312, "y": 188}
-}
-```
-
-Returns `resolved_target_id`, global pixel, and `localization_confidence`.
-
-## `POST /v2/act`
-
-```json
-{
-  "action": {
-    "action": "click",
-    "target": {"resolved_target_id": "..."},
-    "button": "left",
-    "clicks": 1
-  }
-}
-```
-
-## `POST /v2/act-verify`
-
-```json
-{
-  "action": {
-    "action": "click",
-    "target": {"resolved_target_id": "..."}
-  },
-  "condition": {
-    "kind": "text",
-    "mode": "region",
-    "text": "Saved",
-    "match": "contains",
-    "present": true,
-    "region_x": 820,
-    "region_y": 420,
-    "region_width": 500,
-    "region_height": 140,
-    "min_confidence": 0.4
-  },
-  "risk_level": "low"
-}
-```
-
-Risk defaults:
-- `low`: retries `0`, timeout `2500ms`
-- `high`: retries `1`, timeout `6000ms`
-
-## Response envelope
+## Response Envelope
 
 Success:
 
@@ -119,7 +24,7 @@ Success:
   "ok": true,
   "request_id": "...",
   "time_ms": 1710000000000,
-  "data": { },
+  "data": {},
   "error": null
 }
 ```
@@ -133,9 +38,124 @@ Error:
   "time_ms": 1710000000000,
   "data": null,
   "error": {
-    "code": "http_error",
-    "message": "...",
-    "details": {}
+    "code": "validation_error",
+    "message": "request validation failed",
+    "details": []
   }
 }
 ```
+
+## 1) See
+
+### `POST /see`
+Capture a full screen or a region. Optional grid overlay returns coordinate metadata for click mapping.
+
+```json
+{
+  "screen": 0,
+  "region_x": null,
+  "region_y": null,
+  "region_width": null,
+  "region_height": null,
+  "with_grid": true,
+  "grid_rows": 12,
+  "grid_cols": 12,
+  "include_labels": true,
+  "image_format": "png",
+  "jpeg_quality": 85
+}
+```
+
+Returns:
+- `data.image.base64`
+- `data.meta.region` (global desktop coords)
+- `data.meta.grid` (rows/cols/cell size + formula)
+
+### `POST /see/zoom`
+Capture a tighter crop around a global point and draw another grid over that crop.
+
+```json
+{
+  "screen": 0,
+  "center_x": 1200,
+  "center_y": 720,
+  "width": 500,
+  "height": 350,
+  "with_grid": true,
+  "grid_rows": 20,
+  "grid_cols": 20,
+  "include_labels": true,
+  "image_format": "png",
+  "jpeg_quality": 90
+}
+```
+
+Use this for precision before clicking tiny controls.
+
+## 2) Interact
+
+### `POST /interact`
+Mouse/keyboard action execution.
+
+```json
+{
+  "screen": 0,
+  "action": {
+    "action": "click",
+    "target": {
+      "mode": "grid",
+      "region_x": 0,
+      "region_y": 0,
+      "region_width": 1920,
+      "region_height": 1080,
+      "rows": 12,
+      "cols": 12,
+      "row": 7,
+      "col": 3,
+      "dx": 0.0,
+      "dy": 0.0
+    },
+    "button": "left",
+    "clicks": 1
+  }
+}
+```
+
+Supported actions:
+- `move`, `click`, `right_click`, `double_click`, `middle_click`
+- `scroll` (`scroll_amount`)
+- `type` (`text`, `interval_ms`)
+- `hotkey` (`keys`)
+
+Target modes:
+- `pixel`: absolute global `x,y`
+- `grid`: grid cell from a `see`/`see/zoom` response
+
+## 3) Exec
+
+### `POST /exec`
+Run host shell commands (PowerShell/Bash/CMD).
+
+```json
+{
+  "command": "Get-Process | Select-Object -First 5",
+  "shell": "powershell",
+  "timeout_s": 20,
+  "cwd": "C:/Users/Paul",
+  "dry_run": false
+}
+```
+
+Required header:
+
+```http
+x-clickthrough-exec-secret:
+```
+
+## Minimal Procedure for Agents
+
+1. `see` full screen with coarse grid.
+2. If uncertain, `see/zoom` target area with denser grid.
+3. `interact` one action.
+4. `see` again to confirm state change.
+5. Use `exec` only when GUI interaction is not the right tool.
diff --git a/examples/quickstart.py b/examples/quickstart.py index 3ad8ce2..a465fa2 100644 --- a/examples/quickstart.py +++ b/examples/quickstart.py @@ -15,24 +15,25 @@ if TOKEN: def main(): health = requests.get(f"{BASE_URL}/health", headers=headers, timeout=10) health.raise_for_status() - print("health ok:", health.json().get("ok")) + print("health:", health.json()["data"]) - observe = requests.post( - f"{BASE_URL}/v2/observe", + see = requests.post( + f"{BASE_URL}/see", headers=headers, - params={"screen": SCREEN}, json={ - "mode": "screen", - "include_image": False, - "ocr_mode": "none", + "screen": SCREEN, + "with_grid": True, + "grid_rows": 12, + "grid_cols": 12, + "image_format": "jpeg", + "jpeg_quality": 70, }, - timeout=20, + timeout=30, ) - observe.raise_for_status() - payload = observe.json()["data"] - print("observation_id:", payload["observation_id"]) - print("region:", payload["region"]) - print("timing_ms:", payload["timing_ms"]) + see.raise_for_status() + payload = see.json()["data"] + print("region:", payload["meta"]["region"]) + print("grid:", payload["meta"].get("grid", {})) if __name__ == "__main__": diff --git a/server/app.py b/server/app.py index bd43ed6..95d9f08 100644 --- a/server/app.py +++ b/server/app.py @@ -1,28 +1,39 @@ -import base64 -import ctypes import hmac -import io -import os -import re -import subprocess -import sys import time import uuid -from typing import Any, Literal, Optional +from typing import Any, Optional -from dotenv import load_dotenv from fastapi import Depends, FastAPI, Header, HTTPException, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse -from PIL import ImageChops, ImageStat -from pydantic import BaseModel, Field, model_validator +from .config import SETTINGS +from .models import ExecRequest, InteractRequest, LaunchRequest, SeeRequest, SeeZoomRequest, WindowActionRequest, WindowQuery +from .services import ( + apply_window_action, + capture_region_image, + 
capture_screen, + draw_grid, + encode_image, + exec_action, + exec_command as run_exec_command, + get_displays, + launch_app, + list_windows, +) -load_dotenv(dotenv_path=".env", override=False) app = FastAPI(title="clickthrough", version="0.1.0") +def _now_ms() -> int: + return int(time.time() * 1000) + + +def _request_id() -> str: + return str(uuid.uuid4()) + + def _ok(data: Any, status_code: int = 200): return JSONResponse( status_code=status_code, @@ -44,11 +55,7 @@ def _err(code: str, message: str, status_code: int, details: Any = None): "request_id": _request_id(), "time_ms": _now_ms(), "data": None, - "error": { - "code": code, - "message": message, - "details": details, - }, + "error": {"code": code, "message": message, "details": details}, }, ) @@ -62,420 +69,14 @@ async def _http_exception_handler(_: Request, exc: HTTPException): return _err("http_error", str(detail), exc.status_code) -@app.exception_handler(Exception) -async def _unhandled_exception_handler(_: Request, exc: Exception): - return _err("internal_error", "internal server error", 500, {"type": type(exc).__name__}) - - @app.exception_handler(RequestValidationError) async def _validation_exception_handler(_: Request, exc: RequestValidationError): return _err("validation_error", "request validation failed", 422, exc.errors()) -def _env_bool(name: str, default: bool) -> bool: - raw = os.getenv(name) - if raw is None: - return default - return raw.strip().lower() in {"1", "true", "yes", "on"} - - -def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]: - raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION") - if not raw: - return None - parts = [p.strip() for p in raw.split(",")] - if len(parts) != 4: - raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height") - x, y, w, h = (int(p) for p in parts) - if w <= 0 or h <= 0: - raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0") - return x, y, w, h - - -SETTINGS = { - "host": os.getenv("CLICKTHROUGH_HOST", 
"127.0.0.1"), - "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")), - "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(), - "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False), - "default_grid_rows": int(os.getenv("CLICKTHROUGH_GRID_ROWS", "12")), - "default_grid_cols": int(os.getenv("CLICKTHROUGH_GRID_COLS", "12")), - "allowed_region": _parse_allowed_region(), - "exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True), - "exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(), - "exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")), - "exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")), - "exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")), - "exec_secret": os.getenv("CLICKTHROUGH_EXEC_SECRET", "").strip(), - "tesseract_cmd": os.getenv("CLICKTHROUGH_TESSERACT_CMD", "").strip(), -} - - -class ScreenRequest(BaseModel): - with_grid: bool = True - grid_rows: int = Field(default=SETTINGS["default_grid_rows"], ge=1, le=200) - grid_cols: int = Field(default=SETTINGS["default_grid_cols"], ge=1, le=200) - include_labels: bool = True - image_format: Literal["png", "jpeg"] = "png" - jpeg_quality: int = Field(default=85, ge=1, le=100) - - -class ZoomRequest(BaseModel): - center_x: int = Field(ge=0) - center_y: int = Field(ge=0) - width: int = Field(default=500, ge=10) - height: int = Field(default=350, ge=10) - with_grid: bool = True - grid_rows: int = Field(default=20, ge=1, le=300) - grid_cols: int = Field(default=20, ge=1, le=300) - include_labels: bool = True - image_format: Literal["png", "jpeg"] = "png" - jpeg_quality: int = Field(default=90, ge=1, le=100) - - -class PixelTarget(BaseModel): - mode: Literal["pixel"] - x: int - y: int - dx: int = 0 - dy: int = 0 - - -class GridTarget(BaseModel): - mode: Literal["grid"] - region_x: int - region_y: int - region_width: int = Field(gt=0) - region_height: int = Field(gt=0) - rows: int = 
Field(gt=0) - cols: int = Field(gt=0) - row: int = Field(ge=0) - col: int = Field(ge=0) - dx: float = 0.0 - dy: float = 0.0 - - @model_validator(mode="after") - def _validate_indices(self): - if self.row >= self.rows or self.col >= self.cols: - raise ValueError("row/col must be inside rows/cols") - if not -1.0 <= self.dx <= 1.0: - raise ValueError("dx must be in [-1, 1]") - if not -1.0 <= self.dy <= 1.0: - raise ValueError("dy must be in [-1, 1]") - return self - - -Target = PixelTarget | GridTarget - - -class ActionRequest(BaseModel): - action: Literal[ - "move", - "click", - "right_click", - "double_click", - "middle_click", - "scroll", - "type", - "hotkey", - ] - target: Optional[Target] = None - duration_ms: int = Field(default=0, ge=0, le=20000) - button: Literal["left", "right", "middle"] = "left" - clicks: int = Field(default=1, ge=1, le=10) - scroll_amount: int = 0 - text: str = "" - keys: list[str] = Field(default_factory=list) - interval_ms: int = Field(default=20, ge=0, le=5000) - dry_run: bool = False - - -class BatchRequest(BaseModel): - actions: list[ActionRequest] = Field(min_length=1, max_length=100) - stop_on_error: bool = True - - -class ExecRequest(BaseModel): - command: str = Field(min_length=1, max_length=10000) - shell: Literal["powershell", "bash", "cmd"] | None = None - timeout_s: int | None = Field(default=None, ge=1, le=600) - cwd: str | None = None - dry_run: bool = False - - -class OCRRequest(BaseModel): - mode: Literal["screen", "region", "image"] = "screen" - region_x: int | None = Field(default=None, ge=0) - region_y: int | None = Field(default=None, ge=0) - region_width: int | None = Field(default=None, gt=0) - region_height: int | None = Field(default=None, gt=0) - image_base64: str | None = None - language_hint: str | None = Field(default=None, min_length=1, max_length=64) - min_confidence: float = Field(default=0.0, ge=0.0, le=1.0) - - @model_validator(mode="after") - def _validate_mode_inputs(self): - if self.mode == "region": - 
required = [self.region_x, self.region_y, self.region_width, self.region_height] - if any(v is None for v in required): - raise ValueError("region_x, region_y, region_width, region_height are required for mode=region") - if self.mode == "image" and not self.image_base64: - raise ValueError("image_base64 is required for mode=image") - return self - - -class WindowQuery(BaseModel): - title_contains: str | None = Field(default=None, max_length=512) - title_regex: str | None = Field(default=None, max_length=512) - process_name: str | None = Field(default=None, max_length=260) - hwnd: int | None = Field(default=None, ge=1) - visible_only: bool = True - - -class WindowActionRequest(WindowQuery): - action: Literal["focus", "restore", "minimize", "maximize", "close"] - timeout_ms: int = Field(default=3000, ge=0, le=60000) - - -class LaunchRequest(BaseModel): - executable: str = Field(min_length=1, max_length=2048) - args: list[str] = Field(default_factory=list, max_length=100) - cwd: str | None = None - wait_for_window: bool = False - match: WindowQuery | None = None - timeout_ms: int = Field(default=5000, ge=0, le=120000) - dry_run: bool = False - - -class WaitTextCondition(BaseModel): - kind: Literal["text"] - mode: Literal["screen", "region"] = "screen" - text: str = Field(min_length=1, max_length=512) - match: Literal["contains", "exact", "regex"] = "contains" - present: bool = True - region_x: int | None = Field(default=None, ge=0) - region_y: int | None = Field(default=None, ge=0) - region_width: int | None = Field(default=None, gt=0) - region_height: int | None = Field(default=None, gt=0) - language_hint: str | None = Field(default=None, min_length=1, max_length=64) - min_confidence: float = Field(default=0.0, ge=0.0, le=1.0) - - @model_validator(mode="after") - def _validate_region(self): - if self.mode == "region": - required = [self.region_x, self.region_y, self.region_width, self.region_height] - if any(v is None for v in required): - raise ValueError("region_x, 
region_y, region_width, region_height are required for mode=region") - return self - - -class WaitWindowCondition(WindowQuery): - kind: Literal["window"] - state: Literal["exists", "focused", "closed"] = "exists" - - -class WaitVisualCondition(BaseModel): - kind: Literal["visual"] - state: Literal["change", "stable"] = "change" - region_x: int | None = Field(default=None, ge=0) - region_y: int | None = Field(default=None, ge=0) - region_width: int | None = Field(default=None, gt=0) - region_height: int | None = Field(default=None, gt=0) - diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0) - stable_for_ms: int = Field(default=800, ge=0, le=60000) - - -class WaitRequest(BaseModel): - condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition - timeout_ms: int = Field(default=5000, ge=0, le=120000) - poll_interval_ms: int = Field(default=250, ge=50, le=10000) - - -class OCRFindRequest(OCRRequest): - query: str = Field(min_length=1, max_length=512) - match: Literal["contains", "exact", "regex"] = "contains" - group_lines: bool = True - max_results: int = Field(default=20, ge=1, le=200) - - -class VisionDiffRequest(BaseModel): - mode: Literal["screen", "region", "image"] = "screen" - region_x: int | None = Field(default=None, ge=0) - region_y: int | None = Field(default=None, ge=0) - region_width: int | None = Field(default=None, gt=0) - region_height: int | None = Field(default=None, gt=0) - before_image_base64: str | None = None - after_image_base64: str | None = None - delay_ms: int = Field(default=300, ge=0, le=60000) - diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0) - - @model_validator(mode="after") - def _validate_inputs(self): - if self.mode == "region": - required = [self.region_x, self.region_y, self.region_width, self.region_height] - if any(v is None for v in required): - raise ValueError("region_x, region_y, region_width, region_height are required for mode=region") - if self.mode == "image" and (not 
self.before_image_base64 or not self.after_image_base64): - raise ValueError("before_image_base64 and after_image_base64 are required for mode=image") - return self - - -class VisionStabilityRequest(BaseModel): - region_x: int | None = Field(default=None, ge=0) - region_y: int | None = Field(default=None, ge=0) - region_width: int | None = Field(default=None, gt=0) - region_height: int | None = Field(default=None, gt=0) - sample_interval_ms: int = Field(default=250, ge=50, le=10000) - duration_ms: int = Field(default=1200, ge=0, le=120000) - diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0) - - -class VerifyActionRequest(BaseModel): - action: ActionRequest - condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition - retries: int = Field(default=0, ge=0, le=10) - timeout_ms: int = Field(default=5000, ge=0, le=120000) - poll_interval_ms: int = Field(default=250, ge=50, le=10000) - retry_delay_ms: int = Field(default=200, ge=0, le=60000) - stop_on_action_error: bool = True - - -class ObserveRequestV2(BaseModel): - mode: Literal["screen", "region"] = "screen" - region_x: int | None = Field(default=None, ge=0) - region_y: int | None = Field(default=None, ge=0) - region_width: int | None = Field(default=None, gt=0) - region_height: int | None = Field(default=None, gt=0) - include_image: bool = True - image_format: Literal["png", "jpeg"] = "jpeg" - jpeg_quality: int = Field(default=75, ge=1, le=100) - ocr_mode: Literal["none", "region", "screen"] = "none" - language_hint: str | None = Field(default=None, min_length=1, max_length=64) - min_confidence: float = Field(default=0.4, ge=0.0, le=1.0) - max_ocr_area_px: int | None = Field(default=1_500_000, ge=1000) - group_lines: bool = True - - @model_validator(mode="after") - def _validate_region(self): - if self.mode == "region": - required = [self.region_x, self.region_y, self.region_width, self.region_height] - if any(v is None for v in required): - raise ValueError("region_x, region_y, 
region_width, region_height are required for mode=region") - return self - - -class ImageToolPoint(BaseModel): - x: int = Field(ge=0) - y: int = Field(ge=0) - - -class LocalizeRequestV2(BaseModel): - observation_id: str = Field(min_length=1, max_length=128) - text_query: str | None = Field(default=None, max_length=512) - text_match: Literal["contains", "exact", "regex"] = "contains" - image_tool_point: ImageToolPoint | None = None - candidate_index: int = Field(default=0, ge=0) - - @model_validator(mode="after") - def _validate_selector(self): - has_text = bool((self.text_query or "").strip()) - has_point = self.image_tool_point is not None - if has_text == has_point: - raise ValueError("provide exactly one of text_query or image_tool_point") - return self - - -class ActionTargetV2(BaseModel): - resolved_target_id: str | None = Field(default=None, max_length=128) - pixel_x: int | None = None - pixel_y: int | None = None - - @model_validator(mode="after") - def _validate_shape(self): - has_resolved = bool(self.resolved_target_id) - has_pixel = self.pixel_x is not None or self.pixel_y is not None - if has_resolved == has_pixel: - raise ValueError("provide either resolved_target_id or pixel_x/pixel_y") - if has_pixel and (self.pixel_x is None or self.pixel_y is None): - raise ValueError("pixel_x and pixel_y are both required") - return self - - -class ActionRequestV2(BaseModel): - action: Literal[ - "move", - "click", - "right_click", - "double_click", - "middle_click", - "scroll", - "type", - "hotkey", - ] - target: ActionTargetV2 | None = None - duration_ms: int = Field(default=0, ge=0, le=20000) - button: Literal["left", "right", "middle"] = "left" - clicks: int = Field(default=1, ge=1, le=10) - scroll_amount: int = 0 - text: str = "" - keys: list[str] = Field(default_factory=list) - interval_ms: int = Field(default=20, ge=0, le=5000) - dry_run: bool = False - - -class ActRequestV2(BaseModel): - action: ActionRequestV2 - - -class ActVerifyRequestV2(BaseModel): - 
action: ActionRequestV2 - condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition - risk_level: Literal["low", "high"] = "low" - retries: int | None = Field(default=None, ge=0, le=10) - timeout_ms: int | None = Field(default=None, ge=0, le=120000) - poll_interval_ms: int | None = Field(default=None, ge=50, le=10000) - retry_delay_ms: int | None = Field(default=None, ge=0, le=60000) - stop_on_action_error: bool = True - - -OBSERVATIONS: dict[str, dict[str, Any]] = {} -RESOLVED_TARGETS: dict[str, dict[str, Any]] = {} - - -def _get_observation(observation_id: str) -> dict[str, Any]: - observation = OBSERVATIONS.get(observation_id) - if observation is None: - raise HTTPException(status_code=404, detail="observation_id not found") - return observation - - -def _resolve_v2_action(req: ActionRequestV2) -> ActionRequest: - target: Target | None = None - if req.target is not None: - if req.target.resolved_target_id: - item = RESOLVED_TARGETS.get(req.target.resolved_target_id) - if item is None: - raise HTTPException(status_code=404, detail="resolved_target_id not found") - target = PixelTarget(mode="pixel", x=item["x"], y=item["y"], dx=0, dy=0) - else: - target = PixelTarget(mode="pixel", x=req.target.pixel_x or 0, y=req.target.pixel_y or 0, dx=0, dy=0) - return ActionRequest( - action=req.action, - target=target, - duration_ms=req.duration_ms, - button=req.button, - clicks=req.clicks, - scroll_amount=req.scroll_amount, - text=req.text, - keys=req.keys, - interval_ms=req.interval_ms, - dry_run=req.dry_run, - ) - - -def _risk_defaults(risk_level: str) -> dict[str, int]: - if risk_level == "high": - return {"retries": 1, "timeout_ms": 6000, "poll_interval_ms": 250, "retry_delay_ms": 300} - return {"retries": 0, "timeout_ms": 2500, "poll_interval_ms": 200, "retry_delay_ms": 150} +@app.exception_handler(Exception) +async def _unhandled_exception_handler(_: Request, exc: Exception): + return _err("internal_error", "internal server error", 500, {"type": 
type(exc).__name__}) def _auth(x_clickthrough_token: Optional[str] = Header(default=None)): @@ -484,1282 +85,73 @@ def _auth(x_clickthrough_token: Optional[str] = Header(default=None)): raise HTTPException(status_code=401, detail="invalid token") -def _now_ms() -> int: - return int(time.time() * 1000) - - -def _request_id() -> str: - return str(uuid.uuid4()) - - -def _import_capture_libs(): - try: - from PIL import Image, ImageDraw - import mss - - return Image, ImageDraw, mss - except Exception as exc: - raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc - - -def _display_region(mon: dict, screen: int, mss_index: int, primary: bool) -> dict: - return { - "screen": screen, - "mss_index": mss_index, - "primary": primary, - "x": mon["left"], - "y": mon["top"], - "width": mon["width"], - "height": mon["height"], - } - - -def _ordered_displays(sct) -> list[dict]: - raw_monitors = list(enumerate(sct.monitors[1:], start=1)) - if not raw_monitors: - raise HTTPException(status_code=500, detail="no displays detected") - - primary_pos = next( - (idx for idx, (_, mon) in enumerate(raw_monitors) if mon["left"] == 0 and mon["top"] == 0), - 0, - ) - ordered = [raw_monitors[primary_pos]] + [ - item for idx, item in enumerate(raw_monitors) if idx != primary_pos - ] - return [ - _display_region(mon, screen=index, mss_index=mss_index, primary=(index == 0)) - for index, (mss_index, mon) in enumerate(ordered) - ] - - -def _get_displays() -> list[dict]: - _, _, mss = _import_capture_libs() - with mss.mss() as sct: - return _ordered_displays(sct) - - -def _select_display(screen: int) -> tuple[dict, list[dict], dict]: - displays = _get_displays() - selected = displays[screen] if 0 <= screen < len(displays) else displays[0] - selection = { - "requested": screen, - "selected": selected["screen"], - "fallback": selected["screen"] != screen, - } - return selected, displays, selection - - -def _capture_screen(screen: int = 0): - Image, _, mss = 
_import_capture_libs() - with mss.mss() as sct: - displays = _ordered_displays(sct) - mon = displays[screen] if 0 <= screen < len(displays) else displays[0] - shot = sct.grab( - { - "left": mon["x"], - "top": mon["y"], - "width": mon["width"], - "height": mon["height"], - } - ) - image = Image.frombytes("RGB", shot.size, shot.rgb) - selection = { - "requested": screen, - "selected": mon["screen"], - "fallback": mon["screen"] != screen, - } - return image, mon, displays, selection - - -def _serialize_image(image, image_format: str, jpeg_quality: int) -> bytes: - buf = io.BytesIO() - if image_format == "jpeg": - image.save(buf, format="JPEG", quality=jpeg_quality) - else: - image.save(buf, format="PNG") - return buf.getvalue() - - -def _encode_image(image, image_format: str, jpeg_quality: int) -> str: - return base64.b64encode(_serialize_image(image, image_format, jpeg_quality)).decode("ascii") - - -def _draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool): - _, ImageDraw, _ = _import_capture_libs() - out = image.copy() - draw = ImageDraw.Draw(out) - w, h = out.size - - cell_w = w / cols - cell_h = h / rows - - for c in range(1, cols): - x = int(round(c * cell_w)) - draw.line([(x, 0), (x, h)], fill=(255, 0, 0), width=1) - for r in range(1, rows): - y = int(round(r * cell_h)) - draw.line([(0, y), (w, y)], fill=(255, 0, 0), width=1) - - draw.rectangle([(0, 0), (w - 1, h - 1)], outline=(255, 0, 0), width=2) - - if include_labels: - for r in range(rows): - for c in range(cols): - cx = int((c + 0.5) * cell_w) - cy = int((r + 0.5) * cell_h) - label = f"{r},{c}" - draw.text((cx - 12, cy - 6), label, fill=(255, 255, 0)) - - meta = { - "region": {"x": region_x, "y": region_y, "width": w, "height": h}, - "grid": { - "rows": rows, - "cols": cols, - "cell_width": cell_w, - "cell_height": cell_h, - "indexing": "zero-based", - "point_formula": { - "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)", - "pixel_y": "region.y + ((row + 0.5 
+ dy*0.5) * cell_height)", - "dx_range": "[-1,1]", - "dy_range": "[-1,1]", - }, - }, - } - return out, meta - - -def _resolve_target(target: Target) -> tuple[int, int, dict]: - if isinstance(target, PixelTarget): - x = target.x + target.dx - y = target.y + target.dy - return x, y, {"mode": "pixel", "source": target.model_dump()} - - cell_w = target.region_width / target.cols - cell_h = target.region_height / target.rows - - x = target.region_x + int(round((target.col + 0.5 + (target.dx * 0.5)) * cell_w)) - y = target.region_y + int(round((target.row + 0.5 + (target.dy * 0.5)) * cell_h)) - - return x, y, { - "mode": "grid", - "source": target.model_dump(), - "derived": {"cell_width": cell_w, "cell_height": cell_h}, - } - - -def _enforce_allowed_region(x: int, y: int): - region = SETTINGS["allowed_region"] - if region is None: - return - rx, ry, rw, rh = region - if not (rx <= x < rx + rw and ry <= y < ry + rh): - raise HTTPException(status_code=403, detail="point outside allowed region") - - -def _import_input_lib(): - try: - import pyautogui - - pyautogui.FAILSAFE = True - return pyautogui - except Exception as exc: - raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc - - -def _import_ocr_libs(): - try: - import pytesseract - from pytesseract import Output - - tesseract_cmd = SETTINGS["tesseract_cmd"] - if tesseract_cmd: - pytesseract.pytesseract.tesseract_cmd = tesseract_cmd - - return pytesseract, Output - except Exception as exc: - raise HTTPException(status_code=500, detail=f"ocr backend unavailable: {exc}") from exc - - -def _decode_image_base64(value: str): - try: - from PIL import Image - except Exception as exc: - raise HTTPException(status_code=500, detail=f"image decode backend unavailable: {exc}") from exc - - payload = value.strip() - if payload.startswith("data:"): - parts = payload.split(",", 1) - if len(parts) != 2: - raise HTTPException(status_code=400, detail="invalid data URL image payload") - payload = 
parts[1] - - try: - image_bytes = base64.b64decode(payload, validate=True) - except Exception as exc: - raise HTTPException(status_code=400, detail="invalid image_base64 payload") from exc - - try: - image = Image.open(io.BytesIO(image_bytes)).convert("RGB") - except Exception as exc: - raise HTTPException(status_code=400, detail="unsupported or unreadable image bytes") from exc - - return image - - -def _run_ocr(image, language_hint: str | None, min_confidence: float, offset_x: int = 0, offset_y: int = 0) -> list[dict]: - pytesseract, Output = _import_ocr_libs() - - config = "--oem 3 --psm 6" - kwargs = { - "image": image, - "output_type": Output.DICT, - "config": config, - } - if language_hint: - kwargs["lang"] = language_hint - - try: - data = pytesseract.image_to_data(**kwargs) - except pytesseract.TesseractNotFoundError as exc: - raise HTTPException(status_code=500, detail="tesseract executable not found") from exc - except pytesseract.TesseractError as exc: - raise HTTPException(status_code=400, detail=f"ocr failed: {exc}") from exc - - blocks = [] - count = len(data.get("text", [])) - for idx in range(count): - text = (data["text"][idx] or "").strip() - if not text: - continue - - raw_conf = str(data["conf"][idx]).strip() - try: - conf_0_100 = float(raw_conf) - except ValueError: - conf_0_100 = -1.0 - if conf_0_100 < 0: - continue - - confidence = round(conf_0_100 / 100.0, 4) - if confidence < min_confidence: - continue - - left = int(data["left"][idx]) - top = int(data["top"][idx]) - width = int(data["width"][idx]) - height = int(data["height"][idx]) - - blocks.append( - { - "text": text, - "confidence": confidence, - "bbox": { - "x": left + offset_x, - "y": top + offset_y, - "width": width, - "height": height, - }, - "_sort": [top + offset_y, left + offset_x, idx], - } - ) - - blocks.sort(key=lambda b: (b["_sort"][0], b["_sort"][1], b["_sort"][2])) - for block in blocks: - block.pop("_sort", None) - return blocks - - -def _normalize_text(value: str) -> 
str: - return re.sub(r"\s+", " ", value).strip() - - -def _matches_text(haystack: str, needle: str, match_mode: str) -> bool: - if match_mode == "exact": - return haystack == needle - if match_mode == "regex": - return re.search(needle, haystack) is not None - return needle.lower() in haystack.lower() - - -def _windows_only(feature: str): - if sys.platform != "win32": - raise HTTPException(status_code=501, detail=f"{feature} is currently supported on Windows hosts only") - - -def _tasklist_process_name(pid: int) -> str | None: - try: - completed = subprocess.run( - ["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"], - capture_output=True, - text=True, - timeout=5, - check=False, - ) - except Exception: - return None - - line = (completed.stdout or "").strip().splitlines() - if not line: - return None - row = line[0].strip() - if not row or row.startswith("INFO:"): - return None - if row.startswith('"') and '","' in row: - return row.split('","', 1)[0].strip('"') - return None - - -def _list_windows(query: WindowQuery | None = None) -> list[dict]: - _windows_only("window endpoints") - - user32 = ctypes.windll.user32 - user32.EnumWindows.restype = ctypes.c_bool - user32.EnumWindows.argtypes = [ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p), ctypes.c_void_p] - user32.IsWindowVisible.argtypes = [ctypes.c_void_p] - user32.IsWindowVisible.restype = ctypes.c_bool - user32.IsWindowEnabled.argtypes = [ctypes.c_void_p] - user32.IsWindowEnabled.restype = ctypes.c_bool - user32.IsIconic.argtypes = [ctypes.c_void_p] - user32.IsIconic.restype = ctypes.c_bool - user32.IsZoomed.argtypes = [ctypes.c_void_p] - user32.IsZoomed.restype = ctypes.c_bool - user32.GetWindowTextLengthW.argtypes = [ctypes.c_void_p] - user32.GetWindowTextLengthW.restype = ctypes.c_int - user32.GetWindowTextW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int] - user32.GetClassNameW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int] - user32.GetClassNameW.restype 
= ctypes.c_int - user32.GetForegroundWindow.restype = ctypes.c_void_p - user32.GetWindowRect.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.wintypes.RECT)] - - foreground = int(user32.GetForegroundWindow() or 0) - title_regex = re.compile(query.title_regex, re.IGNORECASE) if query and query.title_regex else None - windows: list[dict] = [] - - enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p) - - def _callback(hwnd, _lparam): - hwnd_int = int(hwnd) - if query and query.hwnd is not None and hwnd_int != query.hwnd: - return True - - title_len = user32.GetWindowTextLengthW(hwnd) - title_buf = ctypes.create_unicode_buffer(max(title_len + 1, 1)) - user32.GetWindowTextW(hwnd, title_buf, len(title_buf)) - title = title_buf.value - - visible = bool(user32.IsWindowVisible(hwnd)) - if query and query.visible_only and not visible: - return True - - class_buf = ctypes.create_unicode_buffer(256) - user32.GetClassNameW(hwnd, class_buf, len(class_buf)) - - pid = ctypes.wintypes.DWORD() - user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) - process_name = _tasklist_process_name(int(pid.value)) - - rect = ctypes.wintypes.RECT() - user32.GetWindowRect(hwnd, ctypes.byref(rect)) - - window = { - "hwnd": hwnd_int, - "title": title, - "class_name": class_buf.value, - "pid": int(pid.value), - "process_name": process_name, - "visible": visible, - "enabled": bool(user32.IsWindowEnabled(hwnd)), - "minimized": bool(user32.IsIconic(hwnd)), - "maximized": bool(user32.IsZoomed(hwnd)), - "foreground": hwnd_int == foreground, - "rect": { - "x": int(rect.left), - "y": int(rect.top), - "width": int(rect.right - rect.left), - "height": int(rect.bottom - rect.top), - }, - } - - if query: - if query.title_contains and query.title_contains.lower() not in title.lower(): - return True - if title_regex and not title_regex.search(title): - return True - if query.process_name and (process_name or "").lower() != query.process_name.lower(): - return True - - 
windows.append(window) - return True - - user32.EnumWindows(enum_proc(_callback), 0) - windows.sort(key=lambda item: (not item["foreground"], item["title"].lower(), item["hwnd"])) - return windows - - -def _require_window_match(query: WindowQuery) -> dict: - matches = _list_windows(query) - if not matches: - raise HTTPException(status_code=404, detail="no matching window found") - if len(matches) > 1 and query.hwnd is None: - raise HTTPException( - status_code=409, - detail={"message": "multiple windows matched", "matches": matches[:10]}, - ) - return matches[0] - - -def _apply_window_action(req: WindowActionRequest) -> dict: - _windows_only("window endpoints") - match = _require_window_match(req) - hwnd = match["hwnd"] - user32 = ctypes.windll.user32 - WM_CLOSE = 0x0010 - SW_RESTORE = 9 - SW_MINIMIZE = 6 - SW_MAXIMIZE = 3 - - if req.action in {"focus", "restore"}: - user32.ShowWindow(hwnd, SW_RESTORE) - ok = bool(user32.SetForegroundWindow(hwnd)) - elif req.action == "minimize": - ok = bool(user32.ShowWindow(hwnd, SW_MINIMIZE)) - elif req.action == "maximize": - ok = bool(user32.ShowWindow(hwnd, SW_MAXIMIZE)) - elif req.action == "close": - ok = bool(user32.PostMessageW(hwnd, WM_CLOSE, 0, 0)) - else: - raise HTTPException(status_code=400, detail="unsupported window action") - - deadline = time.time() + (req.timeout_ms / 1000.0) - final_match = None - while time.time() <= deadline: - current = _list_windows(WindowQuery(hwnd=hwnd, visible_only=False)) - final_match = current[0] if current else None - if req.action == "close" and final_match is None: - break - if req.action in {"focus", "restore"} and final_match and final_match["foreground"] and not final_match["minimized"]: - break - if req.action == "minimize" and final_match and final_match["minimized"]: - break - if req.action == "maximize" and final_match and final_match["maximized"]: - break - time.sleep(0.1) - - return { - "ok": ok, - "matched": match, - "window": final_match, - "closed": final_match is None, 
- } - - -def _launch_app(req: LaunchRequest) -> dict: - if req.cwd: - cwd = os.path.abspath(req.cwd) - if not os.path.isdir(cwd): - raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory") - else: - cwd = None - - argv = [req.executable, *req.args] - if SETTINGS["dry_run"] or req.dry_run: - return {"executed": False, "dry_run": True, "argv": argv, "cwd": cwd} - - try: - proc = subprocess.Popen(argv, cwd=cwd) - except FileNotFoundError as exc: - raise HTTPException(status_code=400, detail=f"executable not found: {exc}") from exc - except OSError as exc: - raise HTTPException(status_code=400, detail=f"failed to launch process: {exc}") from exc - - result = { - "executed": True, - "dry_run": False, - "argv": argv, - "cwd": cwd, - "pid": proc.pid, - } - - if req.wait_for_window: - query = req.match or WindowQuery(process_name=os.path.basename(req.executable), visible_only=True) - deadline = time.time() + (req.timeout_ms / 1000.0) - match = None - while time.time() <= deadline: - matches = _list_windows(query) - if matches: - match = matches[0] - break - time.sleep(0.2) - result["window"] = match - result["window_found"] = match is not None - - return result - - -def _capture_region_image(screen: int, region_x: int | None, region_y: int | None, region_width: int | None, region_height: int | None): - base_img, mon, displays, screen_selection = _capture_screen(screen) - if None in {region_x, region_y, region_width, region_height}: - return base_img, {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}, mon, displays, screen_selection - - left = region_x - mon["x"] - top = region_y - mon["y"] - right = left + region_width - bottom = top + region_height - if left < 0 or top < 0 or right > base_img.size[0] or bottom > base_img.size[1]: - raise HTTPException(status_code=400, detail="requested region is outside the captured monitor") - - crop = base_img.crop((left, top, right, bottom)) - region = {"x": region_x, "y": 
region_y, "width": region_width, "height": region_height} - return crop, region, mon, displays, screen_selection - - -def _capture_ocr_source(req: OCRRequest, screen: int = 0): - source = req.mode - if source == "image": - image = _decode_image_base64(req.image_base64 or "") - region = {"x": 0, "y": 0, "width": image.size[0], "height": image.size[1]} - return image, region, None, None, None, source - - base_img, mon, displays, screen_selection = _capture_screen(screen) - if source == "screen": - image = base_img - region = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]} - return image, region, mon, displays, screen_selection, source - - left = req.region_x - mon["x"] - top = req.region_y - mon["y"] - right = left + req.region_width - bottom = top + req.region_height - if left < 0 or top < 0 or right > base_img.size[0] or bottom > base_img.size[1]: - raise HTTPException(status_code=400, detail="requested region is outside the captured monitor") - - image = base_img.crop((left, top, right, bottom)) - region = { - "x": req.region_x, - "y": req.region_y, - "width": req.region_width, - "height": req.region_height, - } - return image, region, mon, displays, screen_selection, source - - -def _image_diff_ratio(before, after) -> float: - diff = ImageChops.difference(before, after) - stat = ImageStat.Stat(diff) - channel_means = stat.mean if isinstance(stat.mean, list) else [stat.mean] - return float(sum(channel_means) / (len(channel_means) * 255.0)) - - -def _merge_bbox(blocks: list[dict]) -> dict: - xs = [b["bbox"]["x"] for b in blocks] - ys = [b["bbox"]["y"] for b in blocks] - rights = [b["bbox"]["x"] + b["bbox"]["width"] for b in blocks] - bottoms = [b["bbox"]["y"] + b["bbox"]["height"] for b in blocks] - return { - "x": min(xs), - "y": min(ys), - "width": max(rights) - min(xs), - "height": max(bottoms) - min(ys), - } - - -def _group_ocr_lines(blocks: list[dict]) -> list[dict]: - if not blocks: - return [] - - sorted_blocks = sorted(blocks, 
key=lambda b: (b["bbox"]["y"], b["bbox"]["x"])) - lines: list[list[dict]] = [] - current: list[dict] = [] - current_center = None - - for block in sorted_blocks: - bbox = block["bbox"] - center_y = bbox["y"] + (bbox["height"] / 2) - tolerance = max(10.0, bbox["height"] * 0.8) - if current and current_center is not None and abs(center_y - current_center) > tolerance: - lines.append(sorted(current, key=lambda item: item["bbox"]["x"])) - current = [] - current_center = None - current.append(block) - current_center = sum(item["bbox"]["y"] + (item["bbox"]["height"] / 2) for item in current) / len(current) - - if current: - lines.append(sorted(current, key=lambda item: item["bbox"]["x"])) - - grouped = [] - for idx, line_blocks in enumerate(lines): - text = " ".join(item["text"] for item in line_blocks).strip() - if not text: - continue - grouped.append( - { - "text": text, - "confidence": round(sum(item["confidence"] for item in line_blocks) / len(line_blocks), 4), - "bbox": _merge_bbox(line_blocks), - "blocks": line_blocks, - "line_index": idx, - } - ) - return grouped - - -def _find_text_matches(blocks: list[dict], query: str, match_mode: str, group_lines: bool, max_results: int) -> list[dict]: - target = _normalize_text(query) - candidates = _group_ocr_lines(blocks) if group_lines else blocks - matches = [] - for item in candidates: - normalized = _normalize_text(item["text"]) - if not normalized: - continue - if _matches_text(normalized, target, match_mode): - match = { - "text": item["text"], - "normalized_text": normalized, - "confidence": item["confidence"], - "bbox": item["bbox"], - "grouped": group_lines, - } - if group_lines: - match["blocks"] = item["blocks"] - match["line_index"] = item["line_index"] - matches.append(match) - - matches.sort(key=lambda item: (-item["confidence"], item["bbox"]["y"], item["bbox"]["x"])) - return matches[:max_results] - - -def _compute_visual_diff(req: VisionDiffRequest, screen: int = 0) -> dict: - if req.mode == "image": - 
before = _decode_image_base64(req.before_image_base64 or "") - after = _decode_image_base64(req.after_image_base64 or "") - if before.size != after.size: - raise HTTPException(status_code=400, detail="before and after images must have matching dimensions") - diff_ratio = _image_diff_ratio(before, after) - return { - "mode": req.mode, - "region": {"x": 0, "y": 0, "width": before.size[0], "height": before.size[1]}, - "diff_ratio": diff_ratio, - "changed": diff_ratio >= req.diff_threshold, - "diff_threshold": req.diff_threshold, - } - - before, region, mon, displays, screen_selection = _capture_region_image( - screen, +@app.post("/see") +def see(req: SeeRequest, _: None = Depends(_auth)): + image, region, mon, displays, screen_selection = capture_region_image( + req.screen, req.region_x, req.region_y, req.region_width, req.region_height, ) - if req.delay_ms > 0: - time.sleep(req.delay_ms / 1000.0) - after, _, _, _, _ = _capture_region_image( - screen, - region["x"], - region["y"], - region["width"], - region["height"], - ) - diff_ratio = _image_diff_ratio(before, after) - return { - "mode": req.mode, - "region": region, - "diff_ratio": diff_ratio, - "changed": diff_ratio >= req.diff_threshold, - "diff_threshold": req.diff_threshold, - "screen": screen_selection, - "display": mon, - "delay_ms": req.delay_ms, - } - - -def _measure_stability(req: VisionStabilityRequest, screen: int = 0) -> dict: - baseline, region, mon, displays, screen_selection = _capture_region_image( - screen, - req.region_x, - req.region_y, - req.region_width, - req.region_height, - ) - sample_count = 0 - max_diff_ratio = 0.0 - diffs = [] - deadline = time.time() + (req.duration_ms / 1000.0) - while time.time() < deadline: - time.sleep(req.sample_interval_ms / 1000.0) - current, _, _, _, _ = _capture_region_image( - screen, - region["x"], - region["y"], - region["width"], - region["height"], - ) - diff_ratio = _image_diff_ratio(baseline, current) - diffs.append(diff_ratio) - max_diff_ratio = 
max(max_diff_ratio, diff_ratio) - sample_count += 1 - baseline = current - - return { - "stable": max_diff_ratio <= req.diff_threshold, - "region": region, - "sample_count": sample_count, - "max_diff_ratio": max_diff_ratio, - "avg_diff_ratio": round(sum(diffs) / len(diffs), 6) if diffs else 0.0, - "diff_threshold": req.diff_threshold, - "duration_ms": req.duration_ms, - "sample_interval_ms": req.sample_interval_ms, - "screen": screen_selection, - "display": mon, - } - - -def _run_verified_action(req: VerifyActionRequest, screen: int = 0) -> dict: - attempts = [] - for attempt in range(req.retries + 1): - action_ok = True - action_result = None - action_error = None - try: - action_result = _exec_action(req.action, screen) - except Exception as exc: - action_ok = False - action_error = str(exc) - if req.stop_on_action_error: - attempts.append( - { - "attempt": attempt, - "action_ok": action_ok, - "action_error": action_error, - "verification": None, - } - ) - return {"success": False, "attempts": attempts, "final_attempt": attempt} - - verification = _wait_for_condition( - WaitRequest( - condition=req.condition, - timeout_ms=req.timeout_ms, - poll_interval_ms=req.poll_interval_ms, - ), - screen, - ) - attempts.append( - { - "attempt": attempt, - "action_ok": action_ok, - "action_error": action_error, - "action_result": action_result, - "verification": verification, - } - ) - if verification.get("satisfied"): - return {"success": True, "attempts": attempts, "final_attempt": attempt} - if attempt < req.retries and req.retry_delay_ms > 0: - time.sleep(req.retry_delay_ms / 1000.0) - - return {"success": False, "attempts": attempts, "final_attempt": req.retries} - - -def _wait_for_condition(req: WaitRequest, screen: int = 0) -> dict: - condition = req.condition - deadline = time.time() + (req.timeout_ms / 1000.0) - polls = 0 - - if isinstance(condition, WaitVisualCondition): - baseline, region, mon, displays, screen_selection = _capture_region_image( - screen, - 
condition.region_x, - condition.region_y, - condition.region_width, - condition.region_height, - ) - stable_since = None - last_diff = 0.0 - while True: - if time.time() > deadline: - return { - "satisfied": False, - "kind": condition.kind, - "state": condition.state, - "polls": polls, - "region": region, - "diff_ratio": last_diff, - "screen": screen_selection, - "display": mon, - } - time.sleep(req.poll_interval_ms / 1000.0) - current, _, _, _, _ = _capture_region_image( - screen, - region["x"], - region["y"], - region["width"], - region["height"], - ) - polls += 1 - last_diff = _image_diff_ratio(baseline, current) - if condition.state == "change": - if last_diff >= condition.diff_threshold: - return { - "satisfied": True, - "kind": condition.kind, - "state": condition.state, - "polls": polls, - "region": region, - "diff_ratio": last_diff, - "screen": screen_selection, - "display": mon, - } - else: - if last_diff <= condition.diff_threshold: - stable_since = stable_since or time.time() - if (time.time() - stable_since) * 1000 >= condition.stable_for_ms: - return { - "satisfied": True, - "kind": condition.kind, - "state": condition.state, - "polls": polls, - "region": region, - "diff_ratio": last_diff, - "stable_for_ms": int((time.time() - stable_since) * 1000), - "screen": screen_selection, - "display": mon, - } - else: - stable_since = None - baseline = current - - while True: - if isinstance(condition, WaitWindowCondition): - matches = _list_windows(condition) - polls += 1 - satisfied = False - if condition.state == "exists": - satisfied = bool(matches) - elif condition.state == "focused": - satisfied = any(item["foreground"] for item in matches) - elif condition.state == "closed": - satisfied = not matches - if satisfied: - return { - "satisfied": True, - "kind": condition.kind, - "state": condition.state, - "polls": polls, - "matches": matches[:10], - } - elif isinstance(condition, WaitTextCondition): - image, region, mon, displays, screen_selection = 
_capture_region_image( - screen, - condition.region_x, - condition.region_y, - condition.region_width, - condition.region_height, - ) - blocks = _run_ocr( - image, - condition.language_hint, - condition.min_confidence, - region["x"], - region["y"], - ) - polls += 1 - matched = [] - for block in blocks: - normalized = _normalize_text(block["text"]) - target = _normalize_text(condition.text) - if _matches_text(normalized, target, condition.match): - matched.append(block) - satisfied = bool(matched) if condition.present else not bool(matched) - if satisfied: - return { - "satisfied": True, - "kind": condition.kind, - "mode": condition.mode, - "polls": polls, - "region": region, - "matches": matched, - "screen": screen_selection, - "display": mon, - } - else: - raise HTTPException(status_code=400, detail="unsupported wait condition") - - if time.time() > deadline: - return { - "satisfied": False, - "kind": condition.kind, - "polls": polls, - } - time.sleep(req.poll_interval_ms / 1000.0) - - -def _pick_shell(explicit_shell: str | None) -> str: - shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip() - if shell_name not in {"powershell", "bash", "cmd"}: - raise HTTPException(status_code=400, detail="unsupported shell") - return shell_name - - -def _truncate_text(text: str, limit: int) -> tuple[str, bool]: - if len(text) <= limit: - return text, False - return text[:limit], True - - -def _resolve_exec_program(shell_name: str, command: str) -> list[str]: - if shell_name == "powershell": - return ["powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command", command] - if shell_name == "bash": - return ["bash", "-lc", command] - if shell_name == "cmd": - return ["cmd", "/c", command] - raise HTTPException(status_code=400, detail="unsupported shell") - - -def _exec_command(req: ExecRequest) -> dict: - if not SETTINGS["exec_enabled"]: - raise HTTPException(status_code=403, detail="exec endpoint disabled") - 
if not SETTINGS["exec_secret"]: - raise HTTPException(status_code=403, detail="exec secret not configured") - - run_dry = SETTINGS["dry_run"] or req.dry_run - shell_name = _pick_shell(req.shell) - - timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"] - timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"]) - - cwd = None - if req.cwd: - cwd = os.path.abspath(req.cwd) - if not os.path.isdir(cwd): - raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory") - - argv = _resolve_exec_program(shell_name, req.command) - - if run_dry: - return { - "executed": False, - "dry_run": True, - "shell": shell_name, - "command": req.command, - "argv": argv, - "timeout_s": timeout_s, - "cwd": cwd, - } - - start = time.time() - try: - completed = subprocess.run( - argv, - cwd=cwd, - capture_output=True, - text=True, - timeout=timeout_s, - check=False, - ) - except subprocess.TimeoutExpired as exc: - stdout = exc.stdout or "" - stderr = exc.stderr or "" - stdout, stdout_truncated = _truncate_text(str(stdout), SETTINGS["exec_max_output_chars"]) - stderr, stderr_truncated = _truncate_text(str(stderr), SETTINGS["exec_max_output_chars"]) - return { - "executed": True, - "timed_out": True, - "shell": shell_name, - "command": req.command, - "argv": argv, - "timeout_s": timeout_s, - "cwd": cwd, - "duration_ms": int((time.time() - start) * 1000), - "exit_code": None, - "stdout": stdout, - "stderr": stderr, - "stdout_truncated": stdout_truncated, - "stderr_truncated": stderr_truncated, - } - except FileNotFoundError as exc: - raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc - - stdout, stdout_truncated = _truncate_text(completed.stdout or "", SETTINGS["exec_max_output_chars"]) - stderr, stderr_truncated = _truncate_text(completed.stderr or "", SETTINGS["exec_max_output_chars"]) - - return { - "executed": True, - "timed_out": False, - "shell": shell_name, - "command": 
req.command, - "argv": argv, - "timeout_s": timeout_s, - "cwd": cwd, - "duration_ms": int((time.time() - start) * 1000), - "exit_code": completed.returncode, - "stdout": stdout, - "stderr": stderr, - "stdout_truncated": stdout_truncated, - "stderr_truncated": stderr_truncated, - } - - -def _exec_action(req: ActionRequest, screen: int = 0) -> dict: - run_dry = SETTINGS["dry_run"] or req.dry_run - selected_display, displays, screen_selection = _select_display(screen) - - pyautogui = None if run_dry else _import_input_lib() - resolved_target = None - - if req.target is not None: - x, y, info = _resolve_target(req.target) - _enforce_allowed_region(x, y) - resolved_target = {"x": x, "y": y, "target_info": info} - - duration_sec = req.duration_ms / 1000.0 - - if req.action in {"move", "click", "right_click", "double_click", "middle_click"} and resolved_target is None: - raise HTTPException(status_code=400, detail="target is required for pointer actions") - - if req.action == "scroll" and resolved_target is None: - raise HTTPException(status_code=400, detail="target is required for scroll") - - if not run_dry: - if req.action == "move": - pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec) - - elif req.action == "click": - pyautogui.click( - x=resolved_target["x"], - y=resolved_target["y"], - clicks=req.clicks, - interval=req.interval_ms / 1000.0, - button=req.button, - duration=duration_sec, - ) - - elif req.action == "right_click": - pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec) - - elif req.action == "double_click": - pyautogui.doubleClick(x=resolved_target["x"], y=resolved_target["y"], interval=req.interval_ms / 1000.0) - - elif req.action == "middle_click": - pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec) - - elif req.action == "scroll": - pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec) - 
pyautogui.scroll(req.scroll_amount) - - elif req.action == "type": - pyautogui.write(req.text, interval=req.interval_ms / 1000.0) - - elif req.action == "hotkey": - if len(req.keys) < 1: - raise HTTPException(status_code=400, detail="keys is required for hotkey") - pyautogui.hotkey(*req.keys) - - return { - "action": req.action, - "executed": not run_dry, - "dry_run": run_dry, - "screen": screen_selection, - "display": selected_display, - "resolved_target": resolved_target, - } - - -def _localization_confidence(source: str, confidence: float | None = None) -> str: - if source == "image_tool_point": - return "high" - if source == "ocr" and confidence is not None: - if confidence >= 0.8: - return "high" - if confidence >= 0.55: - return "medium" - return "low" - - -@app.post("/v2/observe") -def observe_v2(req: ObserveRequestV2, screen: int = 0, _: None = Depends(_auth)): - capture_started = time.perf_counter() - image, region, mon, displays, screen_selection = _capture_region_image( - screen, - req.region_x if req.mode == "region" else None, - req.region_y if req.mode == "region" else None, - req.region_width if req.mode == "region" else None, - req.region_height if req.mode == "region" else None, - ) - capture_ms = int((time.perf_counter() - capture_started) * 1000) - - encoded = None - if req.include_image: - encoded = _encode_image(image, req.image_format, req.jpeg_quality) - - ocr_started = time.perf_counter() - blocks: list[dict] = [] - grouped_lines: list[dict] = [] - ocr_applied_mode = "none" - if req.ocr_mode != "none": - if req.ocr_mode == "screen": - ocr_image, ocr_region, _, _, _ = _capture_region_image(screen, None, None, None, None) - else: - ocr_image, ocr_region = image, region - - area = ocr_region["width"] * ocr_region["height"] - if req.max_ocr_area_px is not None and area > req.max_ocr_area_px: - raise HTTPException( - status_code=400, - detail=f"ocr area {area} exceeds max_ocr_area_px {req.max_ocr_area_px}", - ) - - blocks = _run_ocr( - ocr_image, 
- req.language_hint, - req.min_confidence, - ocr_region["x"], - ocr_region["y"], - ) - if req.group_lines: - grouped_lines = _group_ocr_lines(blocks) - ocr_applied_mode = req.ocr_mode - ocr_ms = int((time.perf_counter() - ocr_started) * 1000) - - observation_id = _request_id() - OBSERVATIONS[observation_id] = { - "id": observation_id, - "region": region, - "screen": screen_selection, - "display": mon, - "image_width": image.size[0], - "image_height": image.size[1], - "ocr_blocks": blocks, - "ocr_lines": grouped_lines, - "created_at_ms": _now_ms(), - } - + out_img = image + meta = {"region": region, "screen": screen_selection, "display": mon, "displays": displays} + if req.with_grid: + out_img, grid_meta = draw_grid(image, region["x"], region["y"], req.grid_rows, req.grid_cols, req.include_labels) + meta.update(grid_meta) return _ok( { - "observation_id": observation_id, - "region": region, - "screen": screen_selection, - "display": mon, "image": { - "included": req.include_image, - "format": req.image_format if req.include_image else None, - "base64": encoded, - "width": image.size[0], - "height": image.size[1], - }, - "ocr": { - "mode": ocr_applied_mode, - "min_confidence": req.min_confidence, - "language_hint": req.language_hint, - "block_count": len(blocks), - "line_count": len(grouped_lines), - "blocks": blocks, - "lines": grouped_lines, - }, - "timing_ms": { - "capture_ms": capture_ms, - "ocr_ms": ocr_ms if req.ocr_mode != "none" else 0, - "total_ms": capture_ms + (ocr_ms if req.ocr_mode != "none" else 0), + "format": req.image_format, + "base64": encode_image(out_img, req.image_format, req.jpeg_quality), + "width": out_img.size[0], + "height": out_img.size[1], }, + "meta": meta, } ) -@app.post("/v2/localize") -def localize_v2(req: LocalizeRequestV2, _: None = Depends(_auth)): - observation = _get_observation(req.observation_id) - region = observation["region"] - image_width = observation["image_width"] - image_height = observation["image_height"] 
+@app.post("/see/zoom") +def see_zoom(req: SeeZoomRequest, _: None = Depends(_auth)): + base_img, mon, displays, screen_selection = capture_screen(req.screen) + cx = req.center_x - mon["x"] + cy = req.center_y - mon["y"] + left = max(0, cx - (req.width // 2)) + top = max(0, cy - (req.height // 2)) + right = min(base_img.size[0], left + req.width) + bottom = min(base_img.size[1], top + req.height) + crop = base_img.crop((left, top, right, bottom)) - if req.image_tool_point is not None: - if req.image_tool_point.x >= image_width or req.image_tool_point.y >= image_height: - raise HTTPException(status_code=400, detail="image_tool_point outside observation image bounds") - x = region["x"] + req.image_tool_point.x - y = region["y"] + req.image_tool_point.y - _enforce_allowed_region(x, y) - resolved_target_id = _request_id() - RESOLVED_TARGETS[resolved_target_id] = { - "id": resolved_target_id, - "observation_id": req.observation_id, - "x": x, - "y": y, - "source": "image_tool_point", - } - return _ok( - { - "resolved_target_id": resolved_target_id, - "source": "image_tool_point", - "localization_confidence": _localization_confidence("image_tool_point"), - "pixel": {"x": x, "y": y}, - "observation_region": region, - "image_bounds": {"width": image_width, "height": image_height}, - } - ) - - lines = observation.get("ocr_lines") or _group_ocr_lines(observation.get("ocr_blocks", [])) - matches = _find_text_matches(lines, req.text_query or "", req.text_match, False, 200) - if not matches: - return _err("not_found", "no localization candidates found", 404, {"found": False, "matches": []}) - if req.candidate_index >= len(matches): - raise HTTPException(status_code=400, detail="candidate_index is outside match results") - - chosen = matches[req.candidate_index] - bbox = chosen["bbox"] - x = bbox["x"] + max(1, bbox["width"] // 2) - y = bbox["y"] + max(1, bbox["height"] // 2) - _enforce_allowed_region(x, y) - resolved_target_id = _request_id() - 
RESOLVED_TARGETS[resolved_target_id] = { - "id": resolved_target_id, - "observation_id": req.observation_id, - "x": x, - "y": y, - "source": "ocr", - "match": chosen, + region_x = mon["x"] + left + region_y = mon["y"] + top + meta = { + "region": {"x": region_x, "y": region_y, "width": crop.size[0], "height": crop.size[1]}, + "screen": screen_selection, + "display": mon, + "displays": displays, } + out_img = crop + if req.with_grid: + out_img, grid_meta = draw_grid(crop, region_x, region_y, req.grid_rows, req.grid_cols, req.include_labels) + meta.update(grid_meta) return _ok( { - "resolved_target_id": resolved_target_id, - "source": "ocr", - "localization_confidence": _localization_confidence("ocr", chosen.get("confidence")), - "pixel": {"x": x, "y": y}, - "selected_match": chosen, - "match_count": len(matches), + "image": { + "format": req.image_format, + "base64": encode_image(out_img, req.image_format, req.jpeg_quality), + "width": out_img.size[0], + "height": out_img.size[1], + }, + "meta": meta, } ) -@app.post("/v2/act") -def act_v2(req: ActRequestV2, screen: int = 0, _: None = Depends(_auth)): - legacy_action = _resolve_v2_action(req.action) - result = _exec_action(legacy_action, screen) - return _ok(result) - - -@app.post("/v2/act-verify") -def act_verify_v2(req: ActVerifyRequestV2, screen: int = 0, _: None = Depends(_auth)): - defaults = _risk_defaults(req.risk_level) - verify_req = VerifyActionRequest( - action=_resolve_v2_action(req.action), - condition=req.condition, - retries=defaults["retries"] if req.retries is None else req.retries, - timeout_ms=defaults["timeout_ms"] if req.timeout_ms is None else req.timeout_ms, - poll_interval_ms=defaults["poll_interval_ms"] if req.poll_interval_ms is None else req.poll_interval_ms, - retry_delay_ms=defaults["retry_delay_ms"] if req.retry_delay_ms is None else req.retry_delay_ms, - stop_on_action_error=req.stop_on_action_error, - ) - result = _run_verified_action(verify_req, screen) - payload = { - "risk_level": 
req.risk_level, - "defaults_applied": defaults, - **result, - } - if result.get("success", False): - return _ok(payload) - return _err("verification_failed", "action verification did not satisfy condition", 409, payload) +@app.post("/interact") +def interact(req: InteractRequest, _: None = Depends(_auth)): + return _ok(exec_action(req.action, req.screen)) @app.get("/health") @@ -1783,24 +175,17 @@ def health(_: None = Depends(_auth)): @app.get("/displays") def displays(_: None = Depends(_auth)): - detected = _get_displays() - return _ok({"displays": detected, "default_screen": 0}) + return _ok({"displays": get_displays(), "default_screen": 0}) @app.post("/exec") -def exec_command( - req: ExecRequest, - x_clickthrough_exec_secret: Optional[str] = Header(default=None), - _: None = Depends(_auth), -): +def exec_command(req: ExecRequest, x_clickthrough_exec_secret: Optional[str] = Header(default=None), _: None = Depends(_auth)): expected = SETTINGS["exec_secret"] if not expected: raise HTTPException(status_code=403, detail="exec secret not configured") if not x_clickthrough_exec_secret or not hmac.compare_digest(x_clickthrough_exec_secret, expected): raise HTTPException(status_code=401, detail="invalid exec secret") - - result = _exec_command(req) - return _ok(result) + return _ok(run_exec_command(req)) @app.get("/windows") @@ -1819,20 +204,18 @@ def windows( hwnd=hwnd, visible_only=visible_only, ) - matches = _list_windows(query) + matches = list_windows(query) return _ok({"windows": matches, "count": len(matches)}) @app.post("/windows/action") def window_action(req: WindowActionRequest, _: None = Depends(_auth)): - result = _apply_window_action(req) - return _ok(result) + return _ok(apply_window_action(req)) @app.post("/launch") def launch(req: LaunchRequest, _: None = Depends(_auth)): - result = _launch_app(req) - return _ok(result) + return _ok(launch_app(req)) if __name__ == "__main__": diff --git a/server/config.py b/server/config.py new file mode 100644 index 
0000000..89ce150 --- /dev/null +++ b/server/config.py @@ -0,0 +1,42 @@ +import os +from typing import Optional + +from dotenv import load_dotenv + + +load_dotenv(dotenv_path=".env", override=False) + + +def _env_bool(name: str, default: bool) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]: + raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION") + if not raw: + return None + parts = [p.strip() for p in raw.split(",")] + if len(parts) != 4: + raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height") + x, y, w, h = (int(p) for p in parts) + if w <= 0 or h <= 0: + raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0") + return x, y, w, h + + +SETTINGS = { + "host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"), + "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")), + "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(), + "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False), + "allowed_region": _parse_allowed_region(), + "exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True), + "exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(), + "exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")), + "exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")), + "exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")), + "exec_secret": os.getenv("CLICKTHROUGH_EXEC_SECRET", "").strip(), +} diff --git a/server/models.py b/server/models.py new file mode 100644 index 0000000..0871c53 --- /dev/null +++ b/server/models.py @@ -0,0 +1,124 @@ +from typing import Literal, Optional + +from pydantic import BaseModel, Field, model_validator + + +class PixelTarget(BaseModel): + mode: Literal["pixel"] + x: int + y: int + dx: int = 0 + dy: int = 0 + + +class GridTarget(BaseModel): + mode: Literal["grid"] 
+ region_x: int + region_y: int + region_width: int = Field(gt=0) + region_height: int = Field(gt=0) + rows: int = Field(gt=0) + cols: int = Field(gt=0) + row: int = Field(ge=0) + col: int = Field(ge=0) + dx: float = 0.0 + dy: float = 0.0 + + @model_validator(mode="after") + def _validate_indices(self): + if self.row >= self.rows or self.col >= self.cols: + raise ValueError("row/col must be inside rows/cols") + if not -1.0 <= self.dx <= 1.0: + raise ValueError("dx must be in [-1, 1]") + if not -1.0 <= self.dy <= 1.0: + raise ValueError("dy must be in [-1, 1]") + return self + + +Target = PixelTarget | GridTarget + + +class ActionRequest(BaseModel): + action: Literal[ + "move", + "click", + "right_click", + "double_click", + "middle_click", + "scroll", + "type", + "hotkey", + ] + target: Optional[Target] = None + duration_ms: int = Field(default=0, ge=0, le=20000) + button: Literal["left", "right", "middle"] = "left" + clicks: int = Field(default=1, ge=1, le=10) + scroll_amount: int = 0 + text: str = "" + keys: list[str] = Field(default_factory=list) + interval_ms: int = Field(default=20, ge=0, le=5000) + dry_run: bool = False + + +class ExecRequest(BaseModel): + command: str = Field(min_length=1, max_length=10000) + shell: Literal["powershell", "bash", "cmd"] | None = None + timeout_s: int | None = Field(default=None, ge=1, le=600) + cwd: str | None = None + dry_run: bool = False + + +class WindowQuery(BaseModel): + title_contains: str | None = Field(default=None, max_length=512) + title_regex: str | None = Field(default=None, max_length=512) + process_name: str | None = Field(default=None, max_length=260) + hwnd: int | None = Field(default=None, ge=1) + visible_only: bool = True + + +class WindowActionRequest(WindowQuery): + action: Literal["focus", "restore", "minimize", "maximize", "close"] + timeout_ms: int = Field(default=3000, ge=0, le=60000) + + +class LaunchRequest(BaseModel): + executable: str = Field(min_length=1, max_length=2048) + args: list[str] = 
Field(default_factory=list, max_length=100) + cwd: str | None = None + wait_for_window: bool = False + match: WindowQuery | None = None + timeout_ms: int = Field(default=5000, ge=0, le=120000) + dry_run: bool = False + + +class SeeRequest(BaseModel): + screen: int = 0 + region_x: int | None = Field(default=None, ge=0) + region_y: int | None = Field(default=None, ge=0) + region_width: int | None = Field(default=None, gt=0) + region_height: int | None = Field(default=None, gt=0) + with_grid: bool = True + grid_rows: int = Field(default=12, ge=1, le=300) + grid_cols: int = Field(default=12, ge=1, le=300) + include_labels: bool = True + image_format: Literal["png", "jpeg"] = "png" + jpeg_quality: int = Field(default=85, ge=1, le=100) + + +class SeeZoomRequest(BaseModel): + screen: int = 0 + center_x: int = Field(ge=0) + center_y: int = Field(ge=0) + width: int = Field(default=500, ge=10) + height: int = Field(default=350, ge=10) + with_grid: bool = True + grid_rows: int = Field(default=20, ge=1, le=300) + grid_cols: int = Field(default=20, ge=1, le=300) + include_labels: bool = True + image_format: Literal["png", "jpeg"] = "png" + jpeg_quality: int = Field(default=90, ge=1, le=100) + + +class InteractRequest(BaseModel): + screen: int = 0 + action: ActionRequest diff --git a/server/services.py b/server/services.py new file mode 100644 index 0000000..b0828cf --- /dev/null +++ b/server/services.py @@ -0,0 +1,462 @@ +import ctypes +import io +import os +import re +import subprocess +import sys +import time +from typing import Literal + +from fastapi import HTTPException +from PIL import ImageChops, ImageStat + +from .config import SETTINGS +from .models import ActionRequest, GridTarget, LaunchRequest, PixelTarget, Target, WindowActionRequest, WindowQuery + + +def import_capture_libs(): + try: + from PIL import Image, ImageDraw + import mss + + return Image, ImageDraw, mss + except Exception as exc: + raise HTTPException(status_code=500, detail=f"capture backend 
unavailable: {exc}") from exc + + +def display_region(mon: dict, screen: int, mss_index: int, primary: bool) -> dict: + return { + "screen": screen, + "mss_index": mss_index, + "primary": primary, + "x": mon["left"], + "y": mon["top"], + "width": mon["width"], + "height": mon["height"], + } + + +def ordered_displays(sct) -> list[dict]: + raw_monitors = list(enumerate(sct.monitors[1:], start=1)) + if not raw_monitors: + raise HTTPException(status_code=500, detail="no displays detected") + + primary_pos = next((idx for idx, (_, mon) in enumerate(raw_monitors) if mon["left"] == 0 and mon["top"] == 0), 0) + ordered = [raw_monitors[primary_pos]] + [item for idx, item in enumerate(raw_monitors) if idx != primary_pos] + return [display_region(mon, screen=index, mss_index=mss_index, primary=(index == 0)) for index, (mss_index, mon) in enumerate(ordered)] + + +def get_displays() -> list[dict]: + _, _, mss = import_capture_libs() + with mss.mss() as sct: + return ordered_displays(sct) + + +def select_display(screen: int) -> tuple[dict, list[dict], dict]: + displays = get_displays() + selected = displays[screen] if 0 <= screen < len(displays) else displays[0] + return selected, displays, {"requested": screen, "selected": selected["screen"], "fallback": selected["screen"] != screen} + + +def capture_screen(screen: int = 0): + Image, _, mss = import_capture_libs() + with mss.mss() as sct: + displays = ordered_displays(sct) + mon = displays[screen] if 0 <= screen < len(displays) else displays[0] + shot = sct.grab({"left": mon["x"], "top": mon["y"], "width": mon["width"], "height": mon["height"]}) + image = Image.frombytes("RGB", shot.size, shot.rgb) + selection = {"requested": screen, "selected": mon["screen"], "fallback": mon["screen"] != screen} + return image, mon, displays, selection + + +def capture_region_image(screen: int, region_x: int | None, region_y: int | None, region_width: int | None, region_height: int | None): + base_img, mon, displays, screen_selection = 
capture_screen(screen) + if None in {region_x, region_y, region_width, region_height}: + return base_img, {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}, mon, displays, screen_selection + + left = region_x - mon["x"] + top = region_y - mon["y"] + right = left + region_width + bottom = top + region_height + if left < 0 or top < 0 or right > base_img.size[0] or bottom > base_img.size[1]: + raise HTTPException(status_code=400, detail="requested region is outside the captured monitor") + + crop = base_img.crop((left, top, right, bottom)) + return crop, {"x": region_x, "y": region_y, "width": region_width, "height": region_height}, mon, displays, screen_selection + + +def serialize_image(image, image_format: str, jpeg_quality: int) -> bytes: + buf = io.BytesIO() + if image_format == "jpeg": + image.save(buf, format="JPEG", quality=jpeg_quality) + else: + image.save(buf, format="PNG") + return buf.getvalue() + + +def encode_image(image, image_format: str, jpeg_quality: int) -> str: + import base64 + + return base64.b64encode(serialize_image(image, image_format, jpeg_quality)).decode("ascii") + + +def draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool): + _, ImageDraw, _ = import_capture_libs() + out = image.copy() + draw = ImageDraw.Draw(out) + w, h = out.size + cell_w = w / cols + cell_h = h / rows + + for c in range(1, cols): + x = int(round(c * cell_w)) + draw.line([(x, 0), (x, h)], fill=(255, 0, 0), width=1) + for r in range(1, rows): + y = int(round(r * cell_h)) + draw.line([(0, y), (w, y)], fill=(255, 0, 0), width=1) + + draw.rectangle([(0, 0), (w - 1, h - 1)], outline=(255, 0, 0), width=2) + if include_labels: + for r in range(rows): + for c in range(cols): + cx = int((c + 0.5) * cell_w) + cy = int((r + 0.5) * cell_h) + draw.text((cx - 12, cy - 6), f"{r},{c}", fill=(255, 255, 0)) + + meta = { + "region": {"x": region_x, "y": region_y, "width": w, "height": h}, + "grid": { + "rows": rows, + 
"cols": cols, + "cell_width": cell_w, + "cell_height": cell_h, + "indexing": "zero-based", + "point_formula": { + "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)", + "pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)", + "dx_range": "[-1,1]", + "dy_range": "[-1,1]", + }, + }, + } + return out, meta + + +def resolve_target(target: Target) -> tuple[int, int, dict]: + if isinstance(target, PixelTarget): + x = target.x + target.dx + y = target.y + target.dy + return x, y, {"mode": "pixel", "source": target.model_dump()} + + cell_w = target.region_width / target.cols + cell_h = target.region_height / target.rows + x = target.region_x + int(round((target.col + 0.5 + (target.dx * 0.5)) * cell_w)) + y = target.region_y + int(round((target.row + 0.5 + (target.dy * 0.5)) * cell_h)) + return x, y, {"mode": "grid", "source": target.model_dump(), "derived": {"cell_width": cell_w, "cell_height": cell_h}} + + +def enforce_allowed_region(x: int, y: int): + region = SETTINGS["allowed_region"] + if region is None: + return + rx, ry, rw, rh = region + if not (rx <= x < rx + rw and ry <= y < ry + rh): + raise HTTPException(status_code=403, detail="point outside allowed region") + + +def import_input_lib(): + try: + import pyautogui + + pyautogui.FAILSAFE = True + return pyautogui + except Exception as exc: + raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc + + +def exec_action(req: ActionRequest, screen: int = 0) -> dict: + run_dry = SETTINGS["dry_run"] or req.dry_run + selected_display, _, screen_selection = select_display(screen) + pyautogui = None if run_dry else import_input_lib() + resolved_target = None + + if req.target is not None: + x, y, info = resolve_target(req.target) + enforce_allowed_region(x, y) + resolved_target = {"x": x, "y": y, "target_info": info} + + duration_sec = req.duration_ms / 1000.0 + if req.action in {"move", "click", "right_click", "double_click", "middle_click"} and resolved_target is None: + 
raise HTTPException(status_code=400, detail="target is required for pointer actions") + if req.action == "scroll" and resolved_target is None: + raise HTTPException(status_code=400, detail="target is required for scroll") + + if not run_dry: + if req.action == "move": + pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec) + elif req.action == "click": + pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], clicks=req.clicks, interval=req.interval_ms / 1000.0, button=req.button, duration=duration_sec) + elif req.action == "right_click": + pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec) + elif req.action == "double_click": + pyautogui.doubleClick(x=resolved_target["x"], y=resolved_target["y"], interval=req.interval_ms / 1000.0) + elif req.action == "middle_click": + pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec) + elif req.action == "scroll": + pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec) + pyautogui.scroll(req.scroll_amount) + elif req.action == "type": + pyautogui.write(req.text, interval=req.interval_ms / 1000.0) + elif req.action == "hotkey": + if len(req.keys) < 1: + raise HTTPException(status_code=400, detail="keys is required for hotkey") + pyautogui.hotkey(*req.keys) + + return {"action": req.action, "executed": not run_dry, "dry_run": run_dry, "screen": screen_selection, "display": selected_display, "resolved_target": resolved_target} + + +def windows_only(feature: str): + if sys.platform != "win32": + raise HTTPException(status_code=501, detail=f"{feature} is currently supported on Windows hosts only") + + +def tasklist_process_name(pid: int) -> str | None: + try: + completed = subprocess.run(["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"], capture_output=True, text=True, timeout=5, check=False) + except Exception: + return None + line = (completed.stdout or 
"").strip().splitlines() + if not line: + return None + row = line[0].strip() + if not row or row.startswith("INFO:"): + return None + if row.startswith('"') and '","' in row: + return row.split('","', 1)[0].strip('"') + return None + + +def list_windows(query: WindowQuery | None = None) -> list[dict]: + windows_only("window endpoints") + query = query or WindowQuery() + + user32 = ctypes.windll.user32 + kernel32 = ctypes.windll.kernel32 + psapi = ctypes.windll.psapi + + user32.GetWindowTextLengthW.argtypes = [ctypes.c_void_p] + user32.GetWindowTextLengthW.restype = ctypes.c_int + user32.GetWindowTextW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int] + user32.GetWindowTextW.restype = ctypes.c_int + user32.IsWindowVisible.argtypes = [ctypes.c_void_p] + user32.IsWindowVisible.restype = ctypes.c_bool + user32.IsWindowEnabled.argtypes = [ctypes.c_void_p] + user32.IsWindowEnabled.restype = ctypes.c_bool + user32.IsIconic.argtypes = [ctypes.c_void_p] + user32.IsIconic.restype = ctypes.c_bool + user32.IsZoomed.argtypes = [ctypes.c_void_p] + user32.IsZoomed.restype = ctypes.c_bool + user32.GetForegroundWindow.restype = ctypes.c_void_p + user32.GetWindowRect.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.wintypes.RECT)] + user32.GetWindowRect.restype = ctypes.c_bool + user32.GetClassNameW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int] + user32.GetClassNameW.restype = ctypes.c_int + + kernel32.OpenProcess.argtypes = [ctypes.wintypes.DWORD, ctypes.wintypes.BOOL, ctypes.wintypes.DWORD] + kernel32.OpenProcess.restype = ctypes.wintypes.HANDLE + kernel32.CloseHandle.argtypes = [ctypes.wintypes.HANDLE] + kernel32.CloseHandle.restype = ctypes.wintypes.BOOL + psapi.GetModuleBaseNameW.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.HMODULE, ctypes.c_wchar_p, ctypes.wintypes.DWORD] + psapi.GetModuleBaseNameW.restype = ctypes.wintypes.DWORD + + foreground = int(user32.GetForegroundWindow() or 0) + results: list[dict] = [] + + def callback(hwnd, 
_lparam): + hwnd_int = int(hwnd) + if query.hwnd and hwnd_int != query.hwnd: + return True + visible = bool(user32.IsWindowVisible(hwnd)) + if query.visible_only and not visible: + return True + + length = user32.GetWindowTextLengthW(hwnd) + title_buf = ctypes.create_unicode_buffer(max(1, length + 1)) + user32.GetWindowTextW(hwnd, title_buf, len(title_buf)) + title = title_buf.value or "" + + if query.title_contains and query.title_contains.lower() not in title.lower(): + return True + if query.title_regex and re.search(query.title_regex, title, flags=re.IGNORECASE) is None: + return True + + pid = ctypes.wintypes.DWORD(0) + user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + process_name = tasklist_process_name(pid.value) + if query.process_name and (process_name or "").lower() != query.process_name.lower(): + return True + + class_buf = ctypes.create_unicode_buffer(256) + user32.GetClassNameW(hwnd, class_buf, len(class_buf)) + rect = ctypes.wintypes.RECT() + user32.GetWindowRect(hwnd, ctypes.byref(rect)) + + results.append( + { + "hwnd": hwnd_int, + "title": title, + "class_name": class_buf.value, + "pid": int(pid.value), + "process_name": process_name, + "visible": visible, + "enabled": bool(user32.IsWindowEnabled(hwnd)), + "minimized": bool(user32.IsIconic(hwnd)), + "maximized": bool(user32.IsZoomed(hwnd)), + "foreground": hwnd_int == foreground, + "rect": {"x": int(rect.left), "y": int(rect.top), "width": int(rect.right - rect.left), "height": int(rect.bottom - rect.top)}, + } + ) + return True + + enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p)(callback) + user32.EnumWindows(enum_proc, 0) + results.sort(key=lambda item: (not item["foreground"], item["title"].lower(), item["hwnd"])) + return results + + +def _pick_single_window(query: WindowQuery) -> dict: + matches = list_windows(query) + if not matches: + raise HTTPException(status_code=404, detail="no window matched") + if len(matches) > 1: + raise 
HTTPException(status_code=409, detail={"message": "multiple windows matched", "matches": matches[:10]}) + return matches[0] + + +def apply_window_action(req: WindowActionRequest) -> dict: + windows_only("window endpoints") + match = _pick_single_window(req) + hwnd = match["hwnd"] + user32 = ctypes.windll.user32 + + SW_RESTORE, SW_MINIMIZE, SW_MAXIMIZE = 9, 6, 3 + WM_CLOSE = 0x0010 + + if req.action == "focus": + user32.ShowWindow(hwnd, SW_RESTORE) + ok = bool(user32.SetForegroundWindow(hwnd)) + if not ok: + raise HTTPException(status_code=500, detail="failed to focus window") + elif req.action == "restore": + user32.ShowWindow(hwnd, SW_RESTORE) + elif req.action == "minimize": + user32.ShowWindow(hwnd, SW_MINIMIZE) + elif req.action == "maximize": + user32.ShowWindow(hwnd, SW_MAXIMIZE) + elif req.action == "close": + user32.PostMessageW(hwnd, WM_CLOSE, 0, 0) + + deadline = time.time() + (req.timeout_ms / 1000.0) + final = None + while time.time() <= deadline: + current = list_windows(WindowQuery(hwnd=hwnd, visible_only=False)) + if not current: + if req.action == "close": + return {"matched": match, "closed": True, "final": None} + time.sleep(0.05) + continue + final = current[0] + if req.action == "focus" and final.get("foreground"): + break + if req.action in {"restore", "minimize", "maximize"}: + break + time.sleep(0.05) + + return {"matched": match, "closed": False, "final": final} + + +def launch_app(req: LaunchRequest) -> dict: + if req.cwd and not os.path.isdir(req.cwd): + raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory") + argv = [req.executable, *req.args] + cwd = req.cwd or None + + if req.dry_run or SETTINGS["dry_run"]: + return {"executed": False, "dry_run": True, "argv": argv, "cwd": cwd} + + try: + proc = subprocess.Popen(argv, cwd=cwd) + except FileNotFoundError as exc: + raise HTTPException(status_code=400, detail=f"executable not found: {exc}") from exc + except OSError as exc: + raise 
HTTPException(status_code=400, detail=f"failed to launch process: {exc}") from exc + + result = {"executed": True, "dry_run": False, "argv": argv, "cwd": cwd, "pid": proc.pid} + if req.wait_for_window: + query = req.match or WindowQuery(process_name=os.path.basename(req.executable), visible_only=True) + deadline = time.time() + (req.timeout_ms / 1000.0) + match = None + while time.time() <= deadline: + matches = list_windows(query) + if matches: + match = matches[0] + break + time.sleep(0.2) + result["window"] = match + result["window_found"] = match is not None + return result + + +def _truncate_text(text: str, limit: int) -> tuple[str, bool]: + if len(text) <= limit: + return text, False + return text[:limit], True + + +def _resolve_exec_program(shell_name: str, command: str) -> list[str]: + if shell_name == "powershell": + return ["powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command", command] + if shell_name == "bash": + return ["bash", "-lc", command] + if shell_name == "cmd": + return ["cmd", "/c", command] + raise HTTPException(status_code=400, detail="unsupported shell") + + +def exec_command(req): + if not SETTINGS["exec_enabled"]: + raise HTTPException(status_code=403, detail="exec endpoint disabled") + if not SETTINGS["exec_secret"]: + raise HTTPException(status_code=403, detail="exec secret not configured") + + shell_name = (req.shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip() + if shell_name not in {"powershell", "bash", "cmd"}: + raise HTTPException(status_code=400, detail="unsupported shell") + + run_dry = SETTINGS["dry_run"] or req.dry_run + timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"] + timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"]) + + cwd = None + if req.cwd: + cwd = os.path.abspath(req.cwd) + if not os.path.isdir(cwd): + raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory") + + argv = 
_resolve_exec_program(shell_name, req.command) + if run_dry: + return {"executed": False, "dry_run": True, "shell": shell_name, "command": req.command, "argv": argv, "timeout_s": timeout_s, "cwd": cwd} + + start = time.time() + try: + completed = subprocess.run(argv, cwd=cwd, capture_output=True, text=True, timeout=timeout_s, check=False) + except subprocess.TimeoutExpired as exc: + stdout, stdout_truncated = _truncate_text(str(exc.stdout or ""), SETTINGS["exec_max_output_chars"]) + stderr, stderr_truncated = _truncate_text(str(exc.stderr or ""), SETTINGS["exec_max_output_chars"]) + return {"executed": True, "timed_out": True, "shell": shell_name, "command": req.command, "argv": argv, "timeout_s": timeout_s, "cwd": cwd, "duration_ms": int((time.time() - start) * 1000), "exit_code": None, "stdout": stdout, "stderr": stderr, "stdout_truncated": stdout_truncated, "stderr_truncated": stderr_truncated} + except FileNotFoundError as exc: + raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc + + stdout, stdout_truncated = _truncate_text(completed.stdout or "", SETTINGS["exec_max_output_chars"]) + stderr, stderr_truncated = _truncate_text(completed.stderr or "", SETTINGS["exec_max_output_chars"]) + return {"executed": True, "timed_out": False, "shell": shell_name, "command": req.command, "argv": argv, "timeout_s": timeout_s, "cwd": cwd, "duration_ms": int((time.time() - start) * 1000), "exit_code": completed.returncode, "stdout": stdout, "stderr": stderr, "stdout_truncated": stdout_truncated, "stderr_truncated": stderr_truncated} diff --git a/skill/SKILL.md b/skill/SKILL.md index 334befa..9b93b05 100644 --- a/skill/SKILL.md +++ b/skill/SKILL.md @@ -1,97 +1,60 @@ --- name: clickthrough-http-control -description: Drive GUI apps with Clickthrough v2 observe/localize/act APIs. Use image-tool point localization for ambiguous targets and avoid full-screen OCR loops. 
+description: Use 3 methods to control a computer: see (screenshot+grid), interact (mouse/keyboard), and exec (shell). --- -# Clickthrough HTTP Control (v2) +# Clickthrough Computer Control -Agents do not see live desktop video. They operate on snapshots. -Use this loop: **observe -> localize -> act -> verify**. +Use exactly 3 methods: +- `see` +- `interact` +- `exec` -## Fast defaults +## Method 1: See -- Start with `POST /v2/observe` on a tight region, not full screen. -- Set `ocr_mode` to `none` unless text is required immediately. -- Use `image` tool localization for icon-heavy or dense controls. -- Use `POST /v2/act-verify` instead of manual sleep/poll loops. - -## Mandatory image-tool click localization - -When OCR is weak or ambiguous, ask image tool for one coordinate in bounds. - -Prompt template: -- "Return one click point as JSON `{\"x\":,\"y\":}` inside this image (`width=W`, `height=H`) for the **** control." +Use `POST /see` to capture full screen or a region with a grid overlay. +Use `POST /see/zoom` to capture a tighter crop with a denser grid. Rules: -- Ask for one point only. -- Include bounds in the prompt. -- If answer is not parseable `x,y`, re-ask once with stricter format. -- Send returned point to `POST /v2/localize` via `image_tool_point`. +- Start with coarse grid (`12x12`). +- For precision, zoom and use denser grid (`20x20` or higher). +- Always use returned `meta.region` and `meta.grid` when computing click targets. +- Coordinates are global desktop coordinates. -## API playbook +## Method 2: Interact -1. **Observe** +Use `POST /interact` for one action at a time. -```json -POST /v2/observe?screen=0 -{ - "mode": "region", - "region_x": 820, - "region_y": 420, - "region_width": 700, - "region_height": 420, - "include_image": true, - "ocr_mode": "none" -} -``` +Mouse actions: +- `move`, `click`, `right_click`, `double_click`, `middle_click`, `scroll` -2. 
**Localize** (choose one) +Keyboard actions: +- `type`, `hotkey` -Text: -```json -POST /v2/localize -{"observation_id":"...","text_query":"Save","text_match":"exact"} -``` +Rules: +- Prefer `grid` targets derived from fresh `see`/`see/zoom` captures. +- Use `pixel` only when you already have reliable coordinates. +- After each important action, call `see` again before continuing. -Image-tool point: -```json -POST /v2/localize -{"observation_id":"...","image_tool_point":{"x":312,"y":188}} -``` +## Method 3: Exec -3. **Act** +Use `POST /exec` only for shell/system tasks. -```json -POST /v2/act?screen=0 -{"action":{"action":"click","target":{"resolved_target_id":"..."}}} -``` +Rules: +- Requires `x-clickthrough-exec-secret`. +- Do not use exec for normal clicking/typing flows. +- Prefer GUI interaction first; exec is fallback or explicit shell task. -4. **Verify** +## Lightweight Procedure -```json -POST /v2/act-verify?screen=0 -{ - "action":{"action":"click","target":{"resolved_target_id":"..."}}, - "condition":{"kind":"visual","state":"change","region_x":820,"region_y":420,"region_width":700,"region_height":420}, - "risk_level":"low" -} -``` +1. `see` capture. +2. If needed, `see/zoom` refine. +3. `interact` one step. +4. `see` verify. +5. Repeat. -## Risk policy +## Quick Safety Rules -- Low risk (navigation, focus, benign clicks): single verification signal. -- High risk (delete/send/purchase/close-lossy): use `risk_level=high` and require two checks before act. -- Never do speculative repeat clicks; switch strategy after one failed verify. - -## Anti-latency rules - -- Never repeat full-screen OCR by default. -- Re-observe only the active pane/region. -- Prefer keyboard + window APIs for app switching. -- Use OCR on region only and cap area with `max_ocr_area_px`. - -## Setup and auth - -- Include `x-clickthrough-token` when token auth is enabled. -- `/exec` additionally requires `x-clickthrough-exec-secret`. -- Validate server first: `GET /health`. 
+- Never click with stale screenshots. +- Never send multiple uncertain clicks in a row. +- If localization is ambiguous, re-capture with a tighter zoom.
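The rule "always use returned `meta.region` and `meta.grid` when computing click targets" can be sketched in Python as follows. This is a minimal illustration, assuming the `meta` object returned by `POST /see` or `POST /see/zoom` has the `region` and `grid` shape that `draw_grid` emits in `server/services.py`. The helper names `grid_cell_to_pixel` and `build_grid_click` are hypothetical and not part of the project.

```python
def grid_cell_to_pixel(meta: dict, row: int, col: int, dx: float = 0.0, dy: float = 0.0) -> tuple[int, int]:
    """Map a zero-based grid cell to a global desktop pixel.

    Mirrors the point_formula reported in the grid metadata:
    pixel_x = region.x + (col + 0.5 + dx * 0.5) * cell_width
    pixel_y = region.y + (row + 0.5 + dy * 0.5) * cell_height
    """
    region, grid = meta["region"], meta["grid"]
    if not (0 <= row < grid["rows"] and 0 <= col < grid["cols"]):
        raise ValueError("row/col must be inside rows/cols")
    if not (-1.0 <= dx <= 1.0 and -1.0 <= dy <= 1.0):
        raise ValueError("dx/dy must be in [-1, 1]")
    x = region["x"] + int(round((col + 0.5 + dx * 0.5) * grid["cell_width"]))
    y = region["y"] + int(round((row + 0.5 + dy * 0.5) * grid["cell_height"]))
    return x, y


def build_grid_click(meta: dict, row: int, col: int, dx: float = 0.0, dy: float = 0.0, screen: int = 0) -> dict:
    """Build a JSON body for POST /interact with a grid target (hypothetical helper)."""
    region, grid = meta["region"], meta["grid"]
    return {
        "screen": screen,
        "action": {
            "action": "click",
            "target": {
                "mode": "grid",
                "region_x": region["x"],
                "region_y": region["y"],
                "region_width": region["width"],
                "region_height": region["height"],
                "rows": grid["rows"],
                "cols": grid["cols"],
                "row": row,
                "col": col,
                "dx": dx,
                "dy": dy,
            },
        },
    }


# Example metadata in the shape draw_grid returns (values are illustrative only).
example_meta = {
    "region": {"x": 100, "y": 200, "width": 1200, "height": 600},
    "grid": {"rows": 12, "cols": 12, "cell_width": 100.0, "cell_height": 50.0},
}
center = grid_cell_to_pixel(example_meta, row=2, col=3)
payload = build_grid_click(example_meta, row=2, col=3)
```

Sending the `grid` target lets the server resolve the pixel itself, while `grid_cell_to_pixel` is useful for choosing the `center_x`/`center_y` of a follow-up `see/zoom` call. In both cases the metadata should come from a fresh capture.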