This repository has been archived on 2026-05-20. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
clickthrough/server/app.py
Paul Wähner aced5be25e
All checks were successful
python-syntax / syntax-check (push) Successful in 7s
feat: migrate to v2-only API and unified response envelope
2026-05-03 19:11:11 +02:00

1842 lines
64 KiB
Python

import base64
import ctypes
import hmac
import io
import os
import re
import subprocess
import sys
import time
import uuid
from typing import Any, Literal, Optional
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from PIL import ImageChops, ImageStat
from pydantic import BaseModel, Field, model_validator
# Load variables from a ".env" file (path is relative to the working
# directory); override=False keeps values already set in the environment.
load_dotenv(dotenv_path=".env", override=False)
# ASGI application object; the exception handlers below are registered on it.
app = FastAPI(title="clickthrough", version="0.1.0")
def _ok(data: Any, status_code: int = 200):
    """Wrap a successful payload in the unified response envelope.

    Args:
        data: Payload placed under the ``data`` key.
        status_code: HTTP status of the response (defaults to 200).

    Returns:
        A ``JSONResponse`` whose body carries ``ok=True``, a freshly generated
        request id, the current time in milliseconds, the payload, and
        ``error=None``.
    """
    envelope = {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "data": data,
        "error": None,
    }
    return JSONResponse(status_code=status_code, content=envelope)
def _err(code: str, message: str, status_code: int, details: Any = None):
    """Wrap a failure in the unified response envelope.

    Args:
        code: Machine-readable error code.
        message: Human-readable description.
        status_code: HTTP status of the response.
        details: Optional extra information attached to the error object.

    Returns:
        A ``JSONResponse`` whose body carries ``ok=False``, ``data=None`` and
        an ``error`` object with ``code``, ``message`` and ``details``.
    """
    failure = {"code": code, "message": message, "details": details}
    envelope = {
        "ok": False,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "data": None,
        "error": failure,
    }
    return JSONResponse(status_code=status_code, content=envelope)
@app.exception_handler(HTTPException)
async def _http_exception_handler(_: Request, exc: HTTPException):
    """Render an ``HTTPException`` through the error envelope.

    A dict ``detail`` is passed along as the error details and its
    ``"message"`` entry (or ``"request failed"``) becomes the message; any
    other ``detail`` is stringified and used as the message on its own.
    """
    payload = exc.detail
    if not isinstance(payload, dict):
        return _err("http_error", str(payload), exc.status_code)
    summary = str(payload.get("message", "request failed"))
    return _err("http_error", summary, exc.status_code, payload)
@app.exception_handler(Exception)
async def _unhandled_exception_handler(_: Request, exc: Exception):
    """Turn any otherwise unhandled exception into a generic 500 envelope.

    Only the exception's class name is exposed to the client; the message
    itself is not included in the response.
    """
    exposed = {"type": type(exc).__name__}
    return _err("internal_error", "internal server error", 500, exposed)
@app.exception_handler(RequestValidationError)
async def _validation_exception_handler(_: Request, exc: RequestValidationError):
    """Render request-validation failures as a 422 error envelope.

    The raw ``exc.errors()`` list may contain objects that ``JSONResponse``
    cannot serialise (for example the original ``ValueError`` raised by a
    model validator, stored under ``ctx``). Passing it through
    ``jsonable_encoder`` first keeps this handler from failing itself and
    degrading a 422 into a 500.
    """
    # Local import, matching the file's existing pattern of function-scope imports.
    from fastapi.encoders import jsonable_encoder

    details = jsonable_encoder(exc.errors())
    return _err("validation_error", "request validation failed", 422, details)
def _env_bool(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]:
raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION")
if not raw:
return None
parts = [p.strip() for p in raw.split(",")]
if len(parts) != 4:
raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height")
x, y, w, h = (int(p) for p in parts)
if w <= 0 or h <= 0:
raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0")
return x, y, w, h
# Process-wide configuration, read once from environment variables when the
# module is imported. Integer settings go through int(), so a non-numeric
# value raises ValueError during import.
SETTINGS = {
    # Bind address and port (defaults: loopback, 8123).
    "host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"),
    "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")),
    # Shared secret checked by _auth(); when empty, _auth() accepts every request.
    "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(),
    # Global dry-run switch; _launch_app() reports instead of spawning when set.
    "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False),
    # Defaults used for ScreenRequest.grid_rows / grid_cols.
    "default_grid_rows": int(os.getenv("CLICKTHROUGH_GRID_ROWS", "12")),
    "default_grid_cols": int(os.getenv("CLICKTHROUGH_GRID_COLS", "12")),
    # Optional (x, y, width, height) rectangle enforced by
    # _enforce_allowed_region(); None means no restriction.
    "allowed_region": _parse_allowed_region(),
    # Command-execution settings. NOTE(review): execution is enabled by
    # default and the code consuming these keys is not visible in this part
    # of the file -- confirm default-on is intended, especially when no token
    # is configured.
    "exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True),
    "exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(),
    "exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")),
    "exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")),
    "exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")),
    "exec_secret": os.getenv("CLICKTHROUGH_EXEC_SECRET", "").strip(),
    # Optional explicit tesseract executable, applied by _import_ocr_libs().
    "tesseract_cmd": os.getenv("CLICKTHROUGH_TESSERACT_CMD", "").strip(),
}
class ScreenRequest(BaseModel):
    # Request options for a screenshot with an optional grid overlay.
    # NOTE(review): the endpoint consuming this model is not visible here.
    with_grid: bool = True
    # Grid dimensions default to the configured values and are limited to 1..200.
    grid_rows: int = Field(default=SETTINGS["default_grid_rows"], ge=1, le=200)
    grid_cols: int = Field(default=SETTINGS["default_grid_cols"], ge=1, le=200)
    include_labels: bool = True
    # Output encoding; jpeg_quality (1..100) presumably applies to "jpeg" only,
    # as in _serialize_image().
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=85, ge=1, le=100)
class ZoomRequest(BaseModel):
    # Request options for a capture centred on a point.
    # NOTE(review): the endpoint consuming this model is not visible here.
    # Centre coordinates must be non-negative.
    center_x: int = Field(ge=0)
    center_y: int = Field(ge=0)
    # Size of the captured area; at least 10 in each dimension, no upper bound.
    width: int = Field(default=500, ge=10)
    height: int = Field(default=350, ge=10)
    with_grid: bool = True
    # Finer grid than ScreenRequest by default (20x20), limited to 1..300.
    grid_rows: int = Field(default=20, ge=1, le=300)
    grid_cols: int = Field(default=20, ge=1, le=300)
    include_labels: bool = True
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=90, ge=1, le=100)
class PixelTarget(BaseModel):
    # Target given as absolute coordinates plus an integer offset;
    # _resolve_target() returns (x + dx, y + dy) for it.
    mode: Literal["pixel"]
    x: int
    y: int
    dx: int = 0
    dy: int = 0
class GridTarget(BaseModel):
    # Target given as a cell of a rows x cols grid laid over a rectangle.
    # row/col are zero-based; dx/dy are fractional offsets in [-1, 1] that
    # _resolve_target() scales by half a cell from the cell centre.
    mode: Literal["grid"]
    region_x: int
    region_y: int
    region_width: int = Field(gt=0)
    region_height: int = Field(gt=0)
    rows: int = Field(gt=0)
    cols: int = Field(gt=0)
    row: int = Field(ge=0)
    col: int = Field(ge=0)
    dx: float = 0.0
    dy: float = 0.0

    @model_validator(mode="after")
    def _validate_indices(self):
        # The cell index must address an existing cell of the grid.
        inside_grid = self.row < self.rows and self.col < self.cols
        if not inside_grid:
            raise ValueError("row/col must be inside rows/cols")
        # "not abs(v) <= 1.0" (rather than "abs(v) > 1.0") also rejects NaN.
        if not abs(self.dx) <= 1.0:
            raise ValueError("dx must be in [-1, 1]")
        if not abs(self.dy) <= 1.0:
            raise ValueError("dy must be in [-1, 1]")
        return self
# Union of the two target shapes; each variant pins `mode` to a distinct literal.
Target = PixelTarget | GridTarget
class ActionRequest(BaseModel):
    # A single input action. NOTE(review): which fields each action reads is
    # decided by the executor, which is not visible in this part of the file.
    action: Literal[
        "move",
        "click",
        "right_click",
        "double_click",
        "middle_click",
        "scroll",
        "type",
        "hotkey",
    ]
    # Optional pointer target (pixel or grid form).
    target: Optional[Target] = None
    # Bounded to 0..20000 ms.
    duration_ms: int = Field(default=0, ge=0, le=20000)
    button: Literal["left", "right", "middle"] = "left"
    clicks: int = Field(default=1, ge=1, le=10)
    scroll_amount: int = 0
    text: str = ""
    keys: list[str] = Field(default_factory=list)
    # Bounded to 0..5000 ms.
    interval_ms: int = Field(default=20, ge=0, le=5000)
    # Per-request dry-run flag (SETTINGS["dry_run"] is the global one).
    dry_run: bool = False
class BatchRequest(BaseModel):
    # Between 1 and 100 actions per batch.
    actions: list[ActionRequest] = Field(min_length=1, max_length=100)
    # Presumably aborts the remaining actions after the first failure --
    # confirm against the batch handler (not visible here).
    stop_on_error: bool = True
class ExecRequest(BaseModel):
    # Shell command request (1..10000 characters).
    command: str = Field(min_length=1, max_length=10000)
    # None presumably falls back to SETTINGS["exec_default_shell"] -- confirm in the handler.
    shell: Literal["powershell", "bash", "cmd"] | None = None
    # NOTE(review): accepts up to 600 s here while SETTINGS["exec_max_timeout_s"]
    # defaults to 120 -- presumably clamped by the handler; confirm.
    timeout_s: int | None = Field(default=None, ge=1, le=600)
    cwd: str | None = None
    dry_run: bool = False
class OCRRequest(BaseModel):
    # OCR input selection: the whole screen, a screen region, or a supplied
    # base64 image. The region_* fields are mandatory for mode="region" and
    # image_base64 is mandatory for mode="image" (enforced by the validator).
    mode: Literal["screen", "region", "image"] = "screen"
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    image_base64: str | None = None
    language_hint: str | None = Field(default=None, min_length=1, max_length=64)
    # Confidence threshold on a 0..1 scale.
    min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_mode_inputs(self):
        # Check that the inputs required by the selected mode are present.
        if self.mode == "region":
            rectangle = (self.region_x, self.region_y, self.region_width, self.region_height)
            if None in rectangle:
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        image_missing = not self.image_base64
        if self.mode == "image" and image_missing:
            raise ValueError("image_base64 is required for mode=image")
        return self
class WindowQuery(BaseModel):
    # Filter describing which top-level windows to match. All supplied
    # criteria must hold (see _list_windows()).
    # Case-insensitive substring of the window title.
    title_contains: str | None = Field(default=None, max_length=512)
    # Regular expression searched (case-insensitively) in the window title.
    title_regex: str | None = Field(default=None, max_length=512)
    # Executable name compared case-insensitively with the owning process name.
    process_name: str | None = Field(default=None, max_length=260)
    # Exact window handle.
    hwnd: int | None = Field(default=None, ge=1)
    # When true, windows that are not visible are skipped.
    visible_only: bool = True

    @model_validator(mode="after")
    def _validate_title_regex(self):
        # Reject patterns that cannot compile so the problem is reported as a
        # validation error rather than an uncaught re.error when the pattern
        # is compiled during window enumeration.
        if self.title_regex:
            try:
                re.compile(self.title_regex)
            except re.error as exc:
                raise ValueError(f"title_regex is not a valid regular expression: {exc}") from exc
        return self
class WindowActionRequest(WindowQuery):
    # Window filter plus the operation to apply to the single matching window
    # (see _apply_window_action()).
    action: Literal["focus", "restore", "minimize", "maximize", "close"]
    # How long to poll for the window to reach the requested state (0..60000 ms).
    timeout_ms: int = Field(default=3000, ge=0, le=60000)
class LaunchRequest(BaseModel):
    # Process launch request handled by _launch_app().
    executable: str = Field(min_length=1, max_length=2048)
    # Up to 100 arguments, appended after the executable in argv.
    args: list[str] = Field(default_factory=list, max_length=100)
    # Working directory; must be an existing directory when given.
    cwd: str | None = None
    # When true, poll for a window after launching.
    wait_for_window: bool = False
    # Window filter for that poll; when omitted, windows are matched by the
    # executable's base name as process name.
    match: WindowQuery | None = None
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    dry_run: bool = False
class WaitTextCondition(BaseModel):
    # Wait condition based on text found on screen.
    # NOTE(review): the code evaluating this condition is not visible here.
    kind: Literal["text"]
    mode: Literal["screen", "region"] = "screen"
    text: str = Field(min_length=1, max_length=512)
    match: Literal["contains", "exact", "regex"] = "contains"
    # Presumably: wait for the text to appear (True) or disappear (False) -- confirm.
    present: bool = True
    # The region_* fields are mandatory for mode="region".
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    language_hint: str | None = Field(default=None, min_length=1, max_length=64)
    min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_region(self):
        if self.mode == "region":
            required = [self.region_x, self.region_y, self.region_width, self.region_height]
            if any(v is None for v in required):
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        # A pattern that cannot compile can never match; report it as a
        # validation error up front instead of letting re.error escape later
        # (regex matching in this file goes through re.search in _matches_text()).
        if self.match == "regex":
            try:
                re.compile(self.text)
            except re.error as exc:
                raise ValueError(f"text is not a valid regular expression: {exc}") from exc
        return self
class WaitWindowCondition(WindowQuery):
    # Wait condition on a window matched by the inherited WindowQuery filters.
    kind: Literal["window"]
    # Window state to wait for; evaluated by code not visible in this part of the file.
    state: Literal["exists", "focused", "closed"] = "exists"
class WaitVisualCondition(BaseModel):
    # Wait condition based on the captured image changing or staying stable.
    kind: Literal["visual"]
    state: Literal["change", "stable"] = "change"
    # Optional region. NOTE(review): no all-or-nothing check here; if this is
    # fed to _capture_region_image(), a partially specified region falls back
    # to the full monitor -- confirm that is intended.
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    # Threshold in 0..1, presumably compared against _image_diff_ratio() -- confirm.
    diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0)
    stable_for_ms: int = Field(default=800, ge=0, le=60000)
class WaitRequest(BaseModel):
    # One of the three condition shapes; each pins `kind` to a distinct literal.
    condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition
    # Overall wait budget (0..120000 ms) and polling period (50..10000 ms).
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    poll_interval_ms: int = Field(default=250, ge=50, le=10000)
class OCRFindRequest(OCRRequest):
    # OCR request plus a text query; the extra fields mirror the parameters
    # of _find_text_matches().
    query: str = Field(min_length=1, max_length=512)
    match: Literal["contains", "exact", "regex"] = "contains"
    # Match against whole OCR lines (True) or individual blocks (False).
    group_lines: bool = True
    max_results: int = Field(default=20, ge=1, le=200)

    @model_validator(mode="after")
    def _validate_query_regex(self):
        # _find_text_matches() whitespace-normalises the query before handing
        # it to re.search, so validate that same normalised form. Rejecting an
        # uncompilable pattern here avoids an uncaught re.error at match time.
        if self.match == "regex":
            try:
                re.compile(_normalize_text(self.query))
            except re.error as exc:
                raise ValueError(f"query is not a valid regular expression: {exc}") from exc
        return self
class VisionDiffRequest(BaseModel):
    # Image-difference request: compare two captures of the screen or a
    # region, or two supplied base64 images. The region_* fields are mandatory
    # for mode="region"; both images are mandatory for mode="image".
    mode: Literal["screen", "region", "image"] = "screen"
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    before_image_base64: str | None = None
    after_image_base64: str | None = None
    # Presumably the pause between the two captures -- confirm in the handler.
    delay_ms: int = Field(default=300, ge=0, le=60000)
    diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_inputs(self):
        # Check that the inputs required by the selected mode are present.
        if self.mode == "region":
            rectangle = (self.region_x, self.region_y, self.region_width, self.region_height)
            if None in rectangle:
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        both_images = bool(self.before_image_base64) and bool(self.after_image_base64)
        if self.mode == "image" and not both_images:
            raise ValueError("before_image_base64 and after_image_base64 are required for mode=image")
        return self
class VisionStabilityRequest(BaseModel):
    # Request to sample a screen area over time.
    # NOTE(review): the handler is not visible here; the region fields have no
    # all-or-nothing validation on this model.
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    # Sampling period (50..10000 ms) and total observation time (0..120000 ms).
    sample_interval_ms: int = Field(default=250, ge=50, le=10000)
    duration_ms: int = Field(default=1200, ge=0, le=120000)
    diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0)
class VerifyActionRequest(BaseModel):
    # An action paired with a condition, plus retry/timeout settings.
    # NOTE(review): the handler is not visible here; presumably it performs the
    # action and then waits for the condition.
    action: ActionRequest
    condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition
    # Bounded to 0..10.
    retries: int = Field(default=0, ge=0, le=10)
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    poll_interval_ms: int = Field(default=250, ge=50, le=10000)
    retry_delay_ms: int = Field(default=200, ge=0, le=60000)
    stop_on_action_error: bool = True
class ObserveRequestV2(BaseModel):
    # v2 observation request: capture the screen or a region, optionally with
    # the image and/or OCR results. The region_* fields are mandatory for
    # mode="region" (enforced by the validator).
    mode: Literal["screen", "region"] = "screen"
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    include_image: bool = True
    image_format: Literal["png", "jpeg"] = "jpeg"
    jpeg_quality: int = Field(default=75, ge=1, le=100)
    ocr_mode: Literal["none", "region", "screen"] = "none"
    language_hint: str | None = Field(default=None, min_length=1, max_length=64)
    min_confidence: float = Field(default=0.4, ge=0.0, le=1.0)
    # Presumably an upper bound on the pixel area passed to OCR -- confirm in the handler.
    max_ocr_area_px: int | None = Field(default=1_500_000, ge=1000)
    group_lines: bool = True

    @model_validator(mode="after")
    def _validate_region(self):
        # Only region mode needs the rectangle; any other mode passes as-is.
        if self.mode != "region":
            return self
        rectangle = (self.region_x, self.region_y, self.region_width, self.region_height)
        if None in rectangle:
            raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        return self
class ImageToolPoint(BaseModel):
    # A non-negative (x, y) point. NOTE(review): the coordinate space (image
    # vs. screen) is decided by the consumer, which is not visible here.
    x: int = Field(ge=0)
    y: int = Field(ge=0)
class LocalizeRequestV2(BaseModel):
    # v2 localisation request against a stored observation. Exactly one
    # selector must be given: a non-blank text_query or an image_tool_point.
    observation_id: str = Field(min_length=1, max_length=128)
    text_query: str | None = Field(default=None, max_length=512)
    text_match: Literal["contains", "exact", "regex"] = "contains"
    image_tool_point: ImageToolPoint | None = None
    # Presumably selects among multiple candidates -- confirm in the handler.
    candidate_index: int = Field(default=0, ge=0)

    @model_validator(mode="after")
    def _validate_selector(self):
        # A whitespace-only text_query counts as "not provided".
        by_text = bool((self.text_query or "").strip())
        by_point = self.image_tool_point is not None
        if by_text ^ by_point:
            return self
        raise ValueError("provide exactly one of text_query or image_tool_point")
class ActionTargetV2(BaseModel):
    # v2 action target: either a previously resolved target id or an explicit
    # pixel pair, never both and never neither.
    resolved_target_id: str | None = Field(default=None, max_length=128)
    pixel_x: int | None = None
    pixel_y: int | None = None

    @model_validator(mode="after")
    def _validate_shape(self):
        by_id = bool(self.resolved_target_id)
        # Supplying either coordinate counts as choosing the pixel form.
        by_pixel = not (self.pixel_x is None and self.pixel_y is None)
        if not (by_id ^ by_pixel):
            raise ValueError("provide either resolved_target_id or pixel_x/pixel_y")
        if by_pixel and None in (self.pixel_x, self.pixel_y):
            raise ValueError("pixel_x and pixel_y are both required")
        return self
class ActionRequestV2(BaseModel):
    # v2 counterpart of ActionRequest: same fields and bounds, but the target
    # is an ActionTargetV2. _resolve_v2_action() converts it to an ActionRequest.
    action: Literal[
        "move",
        "click",
        "right_click",
        "double_click",
        "middle_click",
        "scroll",
        "type",
        "hotkey",
    ]
    target: ActionTargetV2 | None = None
    duration_ms: int = Field(default=0, ge=0, le=20000)
    button: Literal["left", "right", "middle"] = "left"
    clicks: int = Field(default=1, ge=1, le=10)
    scroll_amount: int = 0
    text: str = ""
    keys: list[str] = Field(default_factory=list)
    interval_ms: int = Field(default=20, ge=0, le=5000)
    dry_run: bool = False
class ActRequestV2(BaseModel):
    # Envelope holding a single v2 action.
    action: ActionRequestV2
class ActVerifyRequestV2(BaseModel):
    # v2 action paired with a condition to verify.
    action: ActionRequestV2
    condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition
    risk_level: Literal["low", "high"] = "low"
    # The four tuning fields are optional; when None they are presumably
    # filled from _risk_defaults(risk_level) -- confirm in the handler.
    retries: int | None = Field(default=None, ge=0, le=10)
    timeout_ms: int | None = Field(default=None, ge=0, le=120000)
    poll_interval_ms: int | None = Field(default=None, ge=50, le=10000)
    retry_delay_ms: int | None = Field(default=None, ge=0, le=60000)
    stop_on_action_error: bool = True
# In-process stores keyed by id: observations looked up by _get_observation()
# and resolved targets looked up by _resolve_v2_action().
# NOTE(review): no eviction is visible in this part of the file -- confirm the
# stores are bounded elsewhere, otherwise they grow for the process lifetime.
OBSERVATIONS: dict[str, dict[str, Any]] = {}
RESOLVED_TARGETS: dict[str, dict[str, Any]] = {}
def _get_observation(observation_id: str) -> dict[str, Any]:
    """Return the stored observation for ``observation_id``.

    Raises:
        HTTPException: 404 when no observation is stored under that id.
    """
    found = OBSERVATIONS.get(observation_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="observation_id not found")
def _resolve_v2_action(req: ActionRequestV2) -> ActionRequest:
    """Convert a v2 action into an ``ActionRequest`` with a pixel target.

    A ``resolved_target_id`` is looked up in ``RESOLVED_TARGETS`` and its
    stored ``x``/``y`` are used; otherwise the explicit ``pixel_x``/``pixel_y``
    are used. A request without a target stays target-less. All other fields
    are copied unchanged.

    Raises:
        HTTPException: 404 when the referenced resolved target is unknown.
    """
    spec = req.target
    target: Target | None
    if spec is None:
        target = None
    elif spec.resolved_target_id:
        stored = RESOLVED_TARGETS.get(spec.resolved_target_id)
        if stored is None:
            raise HTTPException(status_code=404, detail="resolved_target_id not found")
        target = PixelTarget(mode="pixel", x=stored["x"], y=stored["y"], dx=0, dy=0)
    else:
        # ActionTargetV2 validation guarantees both coordinates are set here.
        target = PixelTarget(mode="pixel", x=spec.pixel_x or 0, y=spec.pixel_y or 0, dx=0, dy=0)
    return ActionRequest(
        action=req.action,
        target=target,
        duration_ms=req.duration_ms,
        button=req.button,
        clicks=req.clicks,
        scroll_amount=req.scroll_amount,
        text=req.text,
        keys=req.keys,
        interval_ms=req.interval_ms,
        dry_run=req.dry_run,
    )
def _risk_defaults(risk_level: str) -> dict[str, int]:
if risk_level == "high":
return {"retries": 1, "timeout_ms": 6000, "poll_interval_ms": 250, "retry_delay_ms": 300}
return {"retries": 0, "timeout_ms": 2500, "poll_interval_ms": 200, "retry_delay_ms": 150}
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
    """Check the request's token header against the configured token.

    When no token is configured (empty ``SETTINGS["token"]``) every request is
    accepted. Otherwise the header value must equal the configured token.

    Raises:
        HTTPException: 401 when a token is configured and the header is
            missing or different.
    """
    expected = SETTINGS["token"]
    if not expected:
        return
    supplied = x_clickthrough_token or ""
    # Constant-time comparison so response timing does not reveal how much of
    # the token matched; bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")
def _now_ms() -> int:
return int(time.time() * 1000)
def _request_id() -> str:
return str(uuid.uuid4())
def _import_capture_libs():
    """Import the screen-capture dependencies on demand.

    Returns:
        The tuple ``(Image, ImageDraw, mss)``.

    Raises:
        HTTPException: 500 when any of the imports fails.
    """
    try:
        from PIL import Image, ImageDraw
        import mss
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc
    return Image, ImageDraw, mss
def _display_region(mon: dict, screen: int, mss_index: int, primary: bool) -> dict:
return {
"screen": screen,
"mss_index": mss_index,
"primary": primary,
"x": mon["left"],
"y": mon["top"],
"width": mon["width"],
"height": mon["height"],
}
def _ordered_displays(sct) -> list[dict]:
    """List the physical displays with the primary one first.

    The first entry of ``sct.monitors`` is skipped. The monitor whose top-left
    corner is at (0, 0) is treated as primary; if none is, the first monitor
    is used. The remaining monitors keep their relative order.

    Raises:
        HTTPException: 500 when no monitor is reported.
    """
    physical = list(enumerate(sct.monitors[1:], start=1))
    if not physical:
        raise HTTPException(status_code=500, detail="no displays detected")
    primary_at = 0
    for position, (_, geometry) in enumerate(physical):
        if geometry["left"] == 0 and geometry["top"] == 0:
            primary_at = position
            break
    arranged = [physical[primary_at], *physical[:primary_at], *physical[primary_at + 1:]]
    described = []
    for screen_no, (mss_index, geometry) in enumerate(arranged):
        described.append(
            _display_region(geometry, screen=screen_no, mss_index=mss_index, primary=(screen_no == 0))
        )
    return described
def _get_displays() -> list[dict]:
    """Open a capture session and return the ordered display list."""
    mss = _import_capture_libs()[2]
    with mss.mss() as session:
        return _ordered_displays(session)
def _select_display(screen: int) -> tuple[dict, list[dict], dict]:
    """Pick a display by index, falling back to the first one.

    Returns:
        ``(selected, displays, selection)`` where ``selection`` records the
        requested index, the index actually used, and whether a fallback
        happened.
    """
    displays = _get_displays()
    in_range = 0 <= screen < len(displays)
    chosen = displays[screen] if in_range else displays[0]
    selection = {
        "requested": screen,
        "selected": chosen["screen"],
        "fallback": chosen["screen"] != screen,
    }
    return chosen, displays, selection
def _capture_screen(screen: int = 0):
    """Grab one display as an RGB image.

    An out-of-range ``screen`` falls back to the first display.

    Returns:
        ``(image, mon, displays, selection)``: the captured image, the chosen
        display description, all display descriptions, and a dict recording
        the requested/selected index and whether a fallback happened.
    """
    Image, _, mss = _import_capture_libs()
    with mss.mss() as session:
        displays = _ordered_displays(session)
        in_range = 0 <= screen < len(displays)
        mon = displays[screen] if in_range else displays[0]
        bounds = {
            "left": mon["x"],
            "top": mon["y"],
            "width": mon["width"],
            "height": mon["height"],
        }
        shot = session.grab(bounds)
        image = Image.frombytes("RGB", shot.size, shot.rgb)
    selection = {
        "requested": screen,
        "selected": mon["screen"],
        "fallback": mon["screen"] != screen,
    }
    return image, mon, displays, selection
def _serialize_image(image, image_format: str, jpeg_quality: int) -> bytes:
buf = io.BytesIO()
if image_format == "jpeg":
image.save(buf, format="JPEG", quality=jpeg_quality)
else:
image.save(buf, format="PNG")
return buf.getvalue()
def _encode_image(image, image_format: str, jpeg_quality: int) -> str:
    """Serialize ``image`` and return the bytes as an ASCII base64 string."""
    raw = _serialize_image(image, image_format, jpeg_quality)
    return base64.b64encode(raw).decode("ascii")
def _draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool):
    """Draw a red grid (and optional ``row,col`` labels) on a copy of ``image``.

    Args:
        image: Source image; it is copied, not modified.
        region_x: X origin reported in the returned metadata.
        region_y: Y origin reported in the returned metadata.
        rows: Number of grid rows.
        cols: Number of grid columns.
        include_labels: Whether to write a yellow ``"row,col"`` label near the
            centre of every cell.

    Returns:
        ``(annotated_image, meta)`` where ``meta`` describes the region, the
        cell size and the formula mapping a cell plus offset to a pixel.
    """
    _, ImageDraw, _ = _import_capture_libs()
    canvas = image.copy()
    pen = ImageDraw.Draw(canvas)
    width, height = canvas.size
    cell_w = width / cols
    cell_h = height / rows
    red = (255, 0, 0)

    # Interior separators only; the outer border is drawn as one rectangle.
    for col_idx in range(1, cols):
        line_x = int(round(col_idx * cell_w))
        pen.line([(line_x, 0), (line_x, height)], fill=red, width=1)
    for row_idx in range(1, rows):
        line_y = int(round(row_idx * cell_h))
        pen.line([(0, line_y), (width, line_y)], fill=red, width=1)
    pen.rectangle([(0, 0), (width - 1, height - 1)], outline=red, width=2)

    if include_labels:
        for row_idx in range(rows):
            for col_idx in range(cols):
                centre_x = int((col_idx + 0.5) * cell_w)
                centre_y = int((row_idx + 0.5) * cell_h)
                # Fixed offset so the text sits roughly on the cell centre.
                pen.text((centre_x - 12, centre_y - 6), f"{row_idx},{col_idx}", fill=(255, 255, 0))

    point_formula = {
        "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)",
        "pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)",
        "dx_range": "[-1,1]",
        "dy_range": "[-1,1]",
    }
    meta = {
        "region": {"x": region_x, "y": region_y, "width": width, "height": height},
        "grid": {
            "rows": rows,
            "cols": cols,
            "cell_width": cell_w,
            "cell_height": cell_h,
            "indexing": "zero-based",
            "point_formula": point_formula,
        },
    }
    return canvas, meta
def _resolve_target(target: Target) -> tuple[int, int, dict]:
    """Turn a pixel or grid target into absolute coordinates.

    Returns:
        ``(x, y, info)`` where ``info`` records the mode, the original target
        fields and, for grid targets, the derived cell size.
    """
    if not isinstance(target, PixelTarget):
        cell_w = target.region_width / target.cols
        cell_h = target.region_height / target.rows
        # dx/dy of +/-1 move the point half a cell away from the cell centre.
        col_pos = target.col + 0.5 + (target.dx * 0.5)
        row_pos = target.row + 0.5 + (target.dy * 0.5)
        x = target.region_x + int(round(col_pos * cell_w))
        y = target.region_y + int(round(row_pos * cell_h))
        info = {
            "mode": "grid",
            "source": target.model_dump(),
            "derived": {"cell_width": cell_w, "cell_height": cell_h},
        }
        return x, y, info
    return (
        target.x + target.dx,
        target.y + target.dy,
        {"mode": "pixel", "source": target.model_dump()},
    )
def _enforce_allowed_region(x: int, y: int):
    """Reject points outside the configured allowed region.

    Does nothing when no region is configured. The region is half-open: its
    left/top edges are inside, its right/bottom edges are outside.

    Raises:
        HTTPException: 403 when the point lies outside the region.
    """
    region = SETTINGS["allowed_region"]
    if region is None:
        return
    left, top, width, height = region
    inside_x = left <= x < left + width
    inside_y = top <= y < top + height
    if inside_x and inside_y:
        return
    raise HTTPException(status_code=403, detail="point outside allowed region")
def _import_input_lib():
    """Import ``pyautogui`` on demand with its fail-safe switched on.

    Raises:
        HTTPException: 500 when the import or the setup fails.
    """
    try:
        import pyautogui as backend

        backend.FAILSAFE = True
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc
    return backend
def _import_ocr_libs():
    """Import ``pytesseract`` on demand and apply the configured binary path.

    Returns:
        The tuple ``(pytesseract, Output)``.

    Raises:
        HTTPException: 500 when the import or the configuration fails.
    """
    try:
        import pytesseract
        from pytesseract import Output

        configured_cmd = SETTINGS["tesseract_cmd"]
        if configured_cmd:
            pytesseract.pytesseract.tesseract_cmd = configured_cmd
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"ocr backend unavailable: {exc}") from exc
    return pytesseract, Output
def _decode_image_base64(value: str):
    """Decode a base64 string (optionally a ``data:`` URL) into an RGB image.

    Raises:
        HTTPException: 500 when the imaging library cannot be imported; 400
            when the data URL has no comma, the base64 is invalid, or the
            bytes are not a readable image.
    """
    try:
        from PIL import Image
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"image decode backend unavailable: {exc}") from exc

    encoded = value.strip()
    if encoded.startswith("data:"):
        # Keep only what follows the first comma of "data:<meta>,<base64>".
        _, comma, body = encoded.partition(",")
        if not comma:
            raise HTTPException(status_code=400, detail="invalid data URL image payload")
        encoded = body

    try:
        raw = base64.b64decode(encoded, validate=True)
    except Exception as exc:
        raise HTTPException(status_code=400, detail="invalid image_base64 payload") from exc

    try:
        return Image.open(io.BytesIO(raw)).convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=400, detail="unsupported or unreadable image bytes") from exc
def _run_ocr(image, language_hint: str | None, min_confidence: float, offset_x: int = 0, offset_y: int = 0) -> list[dict]:
    """Run tesseract on ``image`` and return the recognised text blocks.

    Args:
        image: Image handed to ``pytesseract.image_to_data``.
        language_hint: Optional tesseract language code(s).
        min_confidence: Blocks below this 0..1 confidence are dropped.
        offset_x: Added to every block's x coordinate.
        offset_y: Added to every block's y coordinate.

    Returns:
        Blocks as ``{"text", "confidence", "bbox"}`` dicts, ordered top to
        bottom, then left to right, then by tesseract's own order.

    Raises:
        HTTPException: 500 when the tesseract executable is missing; 400 when
            tesseract reports an error.
    """
    pytesseract, Output = _import_ocr_libs()
    ocr_args = {
        "image": image,
        "output_type": Output.DICT,
        "config": "--oem 3 --psm 6",
    }
    if language_hint:
        ocr_args["lang"] = language_hint
    try:
        data = pytesseract.image_to_data(**ocr_args)
    except pytesseract.TesseractNotFoundError as exc:
        raise HTTPException(status_code=500, detail="tesseract executable not found") from exc
    except pytesseract.TesseractError as exc:
        raise HTTPException(status_code=400, detail=f"ocr failed: {exc}") from exc

    keyed: list[tuple[tuple[int, int, int], dict]] = []
    for idx, raw_text in enumerate(data.get("text", [])):
        text = (raw_text or "").strip()
        if not text:
            continue
        try:
            score = float(str(data["conf"][idx]).strip())
        except ValueError:
            # An unparsable confidence is treated like a negative one.
            continue
        if score < 0:
            continue
        confidence = round(score / 100.0, 4)
        if confidence < min_confidence:
            continue
        x = int(data["left"][idx]) + offset_x
        y = int(data["top"][idx]) + offset_y
        block = {
            "text": text,
            "confidence": confidence,
            "bbox": {
                "x": x,
                "y": y,
                "width": int(data["width"][idx]),
                "height": int(data["height"][idx]),
            },
        }
        keyed.append(((y, x, idx), block))

    keyed.sort(key=lambda pair: pair[0])
    return [block for _, block in keyed]
def _normalize_text(value: str) -> str:
return re.sub(r"\s+", " ", value).strip()
def _matches_text(haystack: str, needle: str, match_mode: str) -> bool:
if match_mode == "exact":
return haystack == needle
if match_mode == "regex":
return re.search(needle, haystack) is not None
return needle.lower() in haystack.lower()
def _windows_only(feature: str):
    """Raise a 501 ``HTTPException`` unless running on Windows.

    Args:
        feature: Name of the feature, used in the error message.
    """
    if sys.platform == "win32":
        return
    raise HTTPException(status_code=501, detail=f"{feature} is currently supported on Windows hosts only")
def _tasklist_process_name(pid: int) -> str | None:
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
except Exception:
return None
line = (completed.stdout or "").strip().splitlines()
if not line:
return None
row = line[0].strip()
if not row or row.startswith("INFO:"):
return None
if row.startswith('"') and '","' in row:
return row.split('","', 1)[0].strip('"')
return None
def _list_windows(query: WindowQuery | None = None) -> list[dict]:
    """Enumerate top-level windows, optionally filtered by ``query``.

    Args:
        query: Optional filter. All supplied criteria must hold: exact
            ``hwnd``, visibility (``visible_only``), case-insensitive
            ``title_contains``, case-insensitive ``title_regex`` search, and
            case-insensitive ``process_name`` equality.

    Returns:
        One dict per matching window (handle, title, class name, pid, process
        name, visibility/enabled/minimized/maximized/foreground flags and
        rectangle), with the foreground window first, then ordered by
        lower-cased title and handle.

    Raises:
        HTTPException: 501 on non-Windows hosts; 400 when ``title_regex`` is
            not a valid regular expression.
    """
    _windows_only("window endpoints")
    # ``import ctypes`` alone does not load the ``wintypes`` submodule, so
    # import it explicitly instead of relying on another package having done so.
    from ctypes import wintypes

    try:
        title_regex = re.compile(query.title_regex, re.IGNORECASE) if query and query.title_regex else None
    except re.error as exc:
        raise HTTPException(status_code=400, detail=f"invalid title_regex: {exc}") from exc

    user32 = ctypes.windll.user32
    enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p)
    user32.EnumWindows.restype = ctypes.c_bool
    user32.EnumWindows.argtypes = [enum_proc, ctypes.c_void_p]
    user32.IsWindowVisible.argtypes = [ctypes.c_void_p]
    user32.IsWindowVisible.restype = ctypes.c_bool
    user32.IsWindowEnabled.argtypes = [ctypes.c_void_p]
    user32.IsWindowEnabled.restype = ctypes.c_bool
    user32.IsIconic.argtypes = [ctypes.c_void_p]
    user32.IsIconic.restype = ctypes.c_bool
    user32.IsZoomed.argtypes = [ctypes.c_void_p]
    user32.IsZoomed.restype = ctypes.c_bool
    user32.GetWindowTextLengthW.argtypes = [ctypes.c_void_p]
    user32.GetWindowTextLengthW.restype = ctypes.c_int
    user32.GetWindowTextW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
    user32.GetClassNameW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
    user32.GetClassNameW.restype = ctypes.c_int
    user32.GetForegroundWindow.restype = ctypes.c_void_p
    user32.GetWindowRect.argtypes = [ctypes.c_void_p, ctypes.POINTER(wintypes.RECT)]
    user32.GetWindowThreadProcessId.argtypes = [ctypes.c_void_p, ctypes.POINTER(wintypes.DWORD)]
    user32.GetWindowThreadProcessId.restype = wintypes.DWORD

    foreground = int(user32.GetForegroundWindow() or 0)
    windows: list[dict] = []
    # Each name lookup spawns a subprocess; windows of one process share the result.
    process_names: dict[int, str | None] = {}

    def _callback(hwnd, _lparam):
        # Returning True tells EnumWindows to continue with the next window.
        hwnd_int = int(hwnd)
        if query and query.hwnd is not None and hwnd_int != query.hwnd:
            return True
        visible = bool(user32.IsWindowVisible(hwnd))
        if query and query.visible_only and not visible:
            return True
        title_len = user32.GetWindowTextLengthW(hwnd)
        title_buf = ctypes.create_unicode_buffer(max(title_len + 1, 1))
        user32.GetWindowTextW(hwnd, title_buf, len(title_buf))
        title = title_buf.value
        # Apply the cheap title filters before the expensive process lookup.
        if query:
            if query.title_contains and query.title_contains.lower() not in title.lower():
                return True
            if title_regex and not title_regex.search(title):
                return True
        pid = wintypes.DWORD()
        user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
        pid_int = int(pid.value)
        if pid_int not in process_names:
            process_names[pid_int] = _tasklist_process_name(pid_int)
        process_name = process_names[pid_int]
        if query and query.process_name and (process_name or "").lower() != query.process_name.lower():
            return True
        class_buf = ctypes.create_unicode_buffer(256)
        user32.GetClassNameW(hwnd, class_buf, len(class_buf))
        rect = wintypes.RECT()
        user32.GetWindowRect(hwnd, ctypes.byref(rect))
        windows.append(
            {
                "hwnd": hwnd_int,
                "title": title,
                "class_name": class_buf.value,
                "pid": pid_int,
                "process_name": process_name,
                "visible": visible,
                "enabled": bool(user32.IsWindowEnabled(hwnd)),
                "minimized": bool(user32.IsIconic(hwnd)),
                "maximized": bool(user32.IsZoomed(hwnd)),
                "foreground": hwnd_int == foreground,
                "rect": {
                    "x": int(rect.left),
                    "y": int(rect.top),
                    "width": int(rect.right - rect.left),
                    "height": int(rect.bottom - rect.top),
                },
            }
        )
        return True

    user32.EnumWindows(enum_proc(_callback), 0)
    windows.sort(key=lambda item: (not item["foreground"], item["title"].lower(), item["hwnd"]))
    return windows
def _require_window_match(query: WindowQuery) -> dict:
    """Return the single window matching ``query``.

    Raises:
        HTTPException: 404 when nothing matches; 409 (with up to ten of the
            matches) when several windows match and no explicit ``hwnd`` was
            given.
    """
    found = _list_windows(query)
    if not found:
        raise HTTPException(status_code=404, detail="no matching window found")
    ambiguous = len(found) > 1 and query.hwnd is None
    if ambiguous:
        conflict = {"message": "multiple windows matched", "matches": found[:10]}
        raise HTTPException(status_code=409, detail=conflict)
    return found[0]
def _apply_window_action(req: WindowActionRequest) -> dict:
    """Apply a window operation and poll until it takes effect or times out.

    The window state is always read at least once after the operation, even
    with ``timeout_ms=0``, so ``window``/``closed`` in the result reflect an
    actual observation rather than an un-run loop.

    Returns:
        A dict with ``ok`` (truthiness of the Win32 call's return value),
        ``matched`` (the window as found before acting), ``window`` (its last
        observed state, ``None`` if it no longer exists) and ``closed``.

    Raises:
        HTTPException: 501 on non-Windows hosts; 404/409 from the window
            lookup; 400 for an unsupported action.
    """
    _windows_only("window endpoints")
    match = _require_window_match(req)
    hwnd = match["hwnd"]
    user32 = ctypes.windll.user32
    WM_CLOSE = 0x0010
    SW_RESTORE = 9
    SW_MINIMIZE = 6
    SW_MAXIMIZE = 3
    if req.action in {"focus", "restore"}:
        user32.ShowWindow(hwnd, SW_RESTORE)
        ok = bool(user32.SetForegroundWindow(hwnd))
    elif req.action == "minimize":
        ok = bool(user32.ShowWindow(hwnd, SW_MINIMIZE))
    elif req.action == "maximize":
        ok = bool(user32.ShowWindow(hwnd, SW_MAXIMIZE))
    elif req.action == "close":
        ok = bool(user32.PostMessageW(hwnd, WM_CLOSE, 0, 0))
    else:
        raise HTTPException(status_code=400, detail="unsupported window action")

    def _settled(window: dict | None) -> bool:
        # True once the window shows the state the requested action aims for.
        if req.action == "close":
            return window is None
        if window is None:
            return False
        if req.action in {"focus", "restore"}:
            return bool(window["foreground"] and not window["minimized"])
        if req.action == "minimize":
            return bool(window["minimized"])
        return bool(window["maximized"])

    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + (req.timeout_ms / 1000.0)
    while True:
        current = _list_windows(WindowQuery(hwnd=hwnd, visible_only=False))
        final_match = current[0] if current else None
        if _settled(final_match) or time.monotonic() >= deadline:
            break
        time.sleep(0.1)
    return {
        "ok": ok,
        "matched": match,
        "window": final_match,
        "closed": final_match is None,
    }
def _launch_app(req: LaunchRequest) -> dict:
    """Start a process and optionally wait for one of its windows.

    In dry-run mode (global or per request) nothing is started and the
    intended argv/cwd are returned. When ``wait_for_window`` is set, windows
    are looked up at least once (even with ``timeout_ms=0``) and then every
    200 ms until a match appears or the timeout elapses.

    Returns:
        A dict with ``executed``, ``dry_run``, ``argv``, ``cwd`` and, after a
        real launch, ``pid``; plus ``window`` and ``window_found`` when
        waiting for a window was requested.

    Raises:
        HTTPException: 400 when ``cwd`` is not a directory or the process
            cannot be started; window lookup errors propagate unchanged.
    """
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")
    else:
        cwd = None
    argv = [req.executable, *req.args]
    if SETTINGS["dry_run"] or req.dry_run:
        return {"executed": False, "dry_run": True, "argv": argv, "cwd": cwd}
    try:
        proc = subprocess.Popen(argv, cwd=cwd)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"executable not found: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=400, detail=f"failed to launch process: {exc}") from exc
    result = {
        "executed": True,
        "dry_run": False,
        "argv": argv,
        "cwd": cwd,
        "pid": proc.pid,
    }
    if req.wait_for_window:
        # Without an explicit filter, match windows by the executable's base name.
        query = req.match or WindowQuery(process_name=os.path.basename(req.executable), visible_only=True)
        # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
        deadline = time.monotonic() + (req.timeout_ms / 1000.0)
        match = None
        while True:
            matches = _list_windows(query)
            if matches:
                match = matches[0]
                break
            if time.monotonic() >= deadline:
                break
            time.sleep(0.2)
        result["window"] = match
        result["window_found"] = match is not None
    return result
def _capture_region_image(screen: int, region_x: int | None, region_y: int | None, region_width: int | None, region_height: int | None):
    """Capture a display and optionally crop it to a region.

    When any of the four region values is ``None`` the full display image is
    returned together with the display's own rectangle.

    Returns:
        ``(image, region, mon, displays, screen_selection)``.

    Raises:
        HTTPException: 400 when the region does not lie fully inside the
            captured display.
    """
    base_img, mon, displays, screen_selection = _capture_screen(screen)
    requested = (region_x, region_y, region_width, region_height)
    if any(part is None for part in requested):
        whole = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}
        return base_img, whole, mon, displays, screen_selection

    # Translate absolute coordinates into the captured image's own space.
    left = region_x - mon["x"]
    top = region_y - mon["y"]
    right = left + region_width
    bottom = top + region_height
    img_w, img_h = base_img.size[0], base_img.size[1]
    fits = left >= 0 and top >= 0 and right <= img_w and bottom <= img_h
    if not fits:
        raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")
    cropped = base_img.crop((left, top, right, bottom))
    region = {"x": region_x, "y": region_y, "width": region_width, "height": region_height}
    return cropped, region, mon, displays, screen_selection
def _capture_ocr_source(req: OCRRequest, screen: int = 0):
    """Produce the image an OCR request should run on.

    ``mode="image"`` decodes the supplied base64 image (no display info is
    returned); ``mode="screen"`` uses the full display capture; any other
    mode crops the capture to the requested region.

    Returns:
        ``(image, region, mon, displays, screen_selection, source)``.

    Raises:
        HTTPException: 400 when the region does not lie fully inside the
            captured display (decode errors propagate from the decoder).
    """
    source = req.mode
    if source == "image":
        decoded = _decode_image_base64(req.image_base64 or "")
        extent = {"x": 0, "y": 0, "width": decoded.size[0], "height": decoded.size[1]}
        return decoded, extent, None, None, None, source

    base_img, mon, displays, screen_selection = _capture_screen(screen)
    if source == "screen":
        extent = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}
        return base_img, extent, mon, displays, screen_selection, source

    # Region mode: translate absolute coordinates into the capture's own space.
    left = req.region_x - mon["x"]
    top = req.region_y - mon["y"]
    right = left + req.region_width
    bottom = top + req.region_height
    img_w, img_h = base_img.size[0], base_img.size[1]
    fits = left >= 0 and top >= 0 and right <= img_w and bottom <= img_h
    if not fits:
        raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")
    cropped = base_img.crop((left, top, right, bottom))
    extent = {
        "x": req.region_x,
        "y": req.region_y,
        "width": req.region_width,
        "height": req.region_height,
    }
    return cropped, extent, mon, displays, screen_selection, source
def _image_diff_ratio(before, after) -> float:
    """Return the mean per-channel absolute difference scaled by 255.

    Identical images give 0.0.
    """
    delta = ImageChops.difference(before, after)
    means = ImageStat.Stat(delta).mean
    if not isinstance(means, list):
        means = [means]
    return float(sum(means) / (len(means) * 255.0))
def _merge_bbox(blocks: list[dict]) -> dict:
xs = [b["bbox"]["x"] for b in blocks]
ys = [b["bbox"]["y"] for b in blocks]
rights = [b["bbox"]["x"] + b["bbox"]["width"] for b in blocks]
bottoms = [b["bbox"]["y"] + b["bbox"]["height"] for b in blocks]
return {
"x": min(xs),
"y": min(ys),
"width": max(rights) - min(xs),
"height": max(bottoms) - min(ys),
}
def _group_ocr_lines(blocks: list[dict]) -> list[dict]:
if not blocks:
return []
sorted_blocks = sorted(blocks, key=lambda b: (b["bbox"]["y"], b["bbox"]["x"]))
lines: list[list[dict]] = []
current: list[dict] = []
current_center = None
for block in sorted_blocks:
bbox = block["bbox"]
center_y = bbox["y"] + (bbox["height"] / 2)
tolerance = max(10.0, bbox["height"] * 0.8)
if current and current_center is not None and abs(center_y - current_center) > tolerance:
lines.append(sorted(current, key=lambda item: item["bbox"]["x"]))
current = []
current_center = None
current.append(block)
current_center = sum(item["bbox"]["y"] + (item["bbox"]["height"] / 2) for item in current) / len(current)
if current:
lines.append(sorted(current, key=lambda item: item["bbox"]["x"]))
grouped = []
for idx, line_blocks in enumerate(lines):
text = " ".join(item["text"] for item in line_blocks).strip()
if not text:
continue
grouped.append(
{
"text": text,
"confidence": round(sum(item["confidence"] for item in line_blocks) / len(line_blocks), 4),
"bbox": _merge_bbox(line_blocks),
"blocks": line_blocks,
"line_index": idx,
}
)
return grouped
def _find_text_matches(blocks: list[dict], query: str, match_mode: str, group_lines: bool, max_results: int) -> list[dict]:
    """Search OCR output for entries whose text matches ``query``.

    When ``group_lines`` is true the blocks are first merged into lines and
    each hit also carries that line's ``blocks`` and ``line_index``.  Entries
    whose normalized text is empty are ignored.  Hits are ordered by
    descending confidence, then by ``y`` and ``x`` of their bounding box, and
    at most ``max_results`` of them are returned.
    """
    wanted = _normalize_text(query)
    pool = _group_ocr_lines(blocks) if group_lines else blocks

    hits = []
    for entry in pool:
        normalized = _normalize_text(entry["text"])
        if not normalized:
            continue
        if not _matches_text(normalized, wanted, match_mode):
            continue
        hit = {
            "text": entry["text"],
            "normalized_text": normalized,
            "confidence": entry["confidence"],
            "bbox": entry["bbox"],
            "grouped": group_lines,
        }
        if group_lines:
            hit["blocks"] = entry["blocks"]
            hit["line_index"] = entry["line_index"]
        hits.append(hit)

    ranked = sorted(hits, key=lambda hit: (-hit["confidence"], hit["bbox"]["y"], hit["bbox"]["x"]))
    return ranked[:max_results]
def _compute_visual_diff(req: VisionDiffRequest, screen: int = 0) -> dict:
    """Compare a "before" and an "after" picture and report the difference.

    In ``"image"`` mode both pictures are decoded from the request and must
    have the same size.  In every other mode the region is captured from the
    screen, the function sleeps for ``req.delay_ms`` (when positive) and then
    captures the same region again.

    The result always contains ``mode``, ``region``, ``diff_ratio``,
    ``changed`` and ``diff_threshold``; screen captures additionally report
    ``screen``, ``display`` and ``delay_ms``.

    Raises:
        HTTPException: 400 when the two supplied images differ in size.
    """

    def _summary(area: dict, ratio: float) -> dict:
        # Fields shared by both modes, in the order the response exposes them.
        return {
            "mode": req.mode,
            "region": area,
            "diff_ratio": ratio,
            "changed": ratio >= req.diff_threshold,
            "diff_threshold": req.diff_threshold,
        }

    if req.mode == "image":
        first = _decode_image_base64(req.before_image_base64 or "")
        second = _decode_image_base64(req.after_image_base64 or "")
        if first.size != second.size:
            raise HTTPException(status_code=400, detail="before and after images must have matching dimensions")
        ratio = _image_diff_ratio(first, second)
        return _summary({"x": 0, "y": 0, "width": first.size[0], "height": first.size[1]}, ratio)

    first, region, mon, _displays, screen_selection = _capture_region_image(
        screen,
        req.region_x,
        req.region_y,
        req.region_width,
        req.region_height,
    )
    if req.delay_ms > 0:
        time.sleep(req.delay_ms / 1000.0)
    # Re-capture using the region resolved by the first capture.
    second = _capture_region_image(
        screen,
        region["x"],
        region["y"],
        region["width"],
        region["height"],
    )[0]

    outcome = _summary(region, _image_diff_ratio(first, second))
    outcome["screen"] = screen_selection
    outcome["display"] = mon
    outcome["delay_ms"] = req.delay_ms
    return outcome
def _measure_stability(req: VisionStabilityRequest, screen: int = 0) -> dict:
    """Sample a screen region for ``req.duration_ms`` and report how much it moves.

    An initial capture is taken, then the same region is captured again every
    ``req.sample_interval_ms`` until the duration has elapsed.  Every sample
    is compared with the capture taken immediately before it.

    Returns:
        A dict with ``stable`` (true when the largest observed diff does not
        exceed ``req.diff_threshold``), the resolved ``region``,
        ``sample_count``, ``max_diff_ratio``, ``avg_diff_ratio`` (rounded to
        6 places, ``0.0`` when no sample was taken), the echoed request
        settings, and the ``screen`` / ``display`` that were captured.
    """
    baseline, region, mon, _displays, screen_selection = _capture_region_image(
        screen,
        req.region_x,
        req.region_y,
        req.region_width,
        req.region_height,
    )
    diffs: list[float] = []
    max_diff_ratio = 0.0
    interval_s = req.sample_interval_ms / 1000.0

    # A monotonic clock keeps the sampling window correct even if the system
    # wall clock is adjusted while we are measuring.
    deadline = time.monotonic() + (req.duration_ms / 1000.0)
    while time.monotonic() < deadline:
        # The sleep comes first, so the last sample may land past the deadline.
        time.sleep(interval_s)
        current, _, _, _, _ = _capture_region_image(
            screen,
            region["x"],
            region["y"],
            region["width"],
            region["height"],
        )
        diff_ratio = _image_diff_ratio(baseline, current)
        diffs.append(diff_ratio)
        max_diff_ratio = max(max_diff_ratio, diff_ratio)
        # Rolling baseline: the next sample is compared with this one.
        baseline = current

    return {
        "stable": max_diff_ratio <= req.diff_threshold,
        "region": region,
        "sample_count": len(diffs),
        "max_diff_ratio": max_diff_ratio,
        "avg_diff_ratio": round(sum(diffs) / len(diffs), 6) if diffs else 0.0,
        "diff_threshold": req.diff_threshold,
        "duration_ms": req.duration_ms,
        "sample_interval_ms": req.sample_interval_ms,
        "screen": screen_selection,
        "display": mon,
    }
def _run_verified_action(req: VerifyActionRequest, screen: int = 0) -> dict:
    """Run an action and wait for a condition, retrying up to ``req.retries`` times.

    Each attempt executes ``req.action`` and then waits for ``req.condition``.
    An exception raised by the action is recorded as ``action_error``; when
    ``req.stop_on_action_error`` is set the run ends right there, otherwise
    the verification still takes place.  Between failed attempts the function
    sleeps for ``req.retry_delay_ms`` (when positive).

    Returns:
        ``{"success", "attempts", "final_attempt"}`` where ``attempts`` is the
        per-attempt log and ``final_attempt`` is the index of the last attempt.
    """
    history = []
    for attempt in range(req.retries + 1):
        action_ok, action_result, action_error = True, None, None
        try:
            action_result = _exec_action(req.action, screen)
        except Exception as exc:
            action_ok, action_error = False, str(exc)

        if not action_ok and req.stop_on_action_error:
            # Abort without verifying; this record carries no action_result.
            history.append(
                {
                    "attempt": attempt,
                    "action_ok": action_ok,
                    "action_error": action_error,
                    "verification": None,
                }
            )
            return {"success": False, "attempts": history, "final_attempt": attempt}

        wait_request = WaitRequest(
            condition=req.condition,
            timeout_ms=req.timeout_ms,
            poll_interval_ms=req.poll_interval_ms,
        )
        verification = _wait_for_condition(wait_request, screen)
        history.append(
            {
                "attempt": attempt,
                "action_ok": action_ok,
                "action_error": action_error,
                "action_result": action_result,
                "verification": verification,
            }
        )
        if verification.get("satisfied"):
            return {"success": True, "attempts": history, "final_attempt": attempt}

        retries_left = attempt < req.retries
        if retries_left and req.retry_delay_ms > 0:
            time.sleep(req.retry_delay_ms / 1000.0)

    return {"success": False, "attempts": history, "final_attempt": req.retries}
def _wait_for_condition(req: WaitRequest, screen: int = 0) -> dict:
# Poll until req.condition is satisfied or req.timeout_ms has elapsed.
#
# Three condition types are handled: WaitVisualCondition (compare captures of
# a region), WaitWindowCondition (query the window list) and WaitTextCondition
# (OCR a region and look for text).  Any other type raises HTTPException 400.
#
# Returns a dict whose "satisfied" flag tells whether the condition was met;
# the remaining keys depend on the condition type (see the return sites).
condition = req.condition
# NOTE(review): the deadline is derived from the wall clock (time.time());
# time.monotonic() would be unaffected by system clock adjustments.
deadline = time.time() + (req.timeout_ms / 1000.0)
polls = 0
# --- Visual conditions: handled by a dedicated loop that only exits via return.
if isinstance(condition, WaitVisualCondition):
baseline, region, mon, displays, screen_selection = _capture_region_image(
screen,
condition.region_x,
condition.region_y,
condition.region_width,
condition.region_height,
)
# Time at which the region was first seen stable; None while it is changing.
stable_since = None
last_diff = 0.0
while True:
# The timeout is checked before sleeping and capturing.
if time.time() > deadline:
return {
"satisfied": False,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"region": region,
"diff_ratio": last_diff,
"screen": screen_selection,
"display": mon,
}
time.sleep(req.poll_interval_ms / 1000.0)
# Re-capture the region resolved by the initial capture.
current, _, _, _, _ = _capture_region_image(
screen,
region["x"],
region["y"],
region["width"],
region["height"],
)
polls += 1
last_diff = _image_diff_ratio(baseline, current)
# "change": succeed as soon as the diff reaches the threshold.
if condition.state == "change":
if last_diff >= condition.diff_threshold:
return {
"satisfied": True,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"region": region,
"diff_ratio": last_diff,
"screen": screen_selection,
"display": mon,
}
# Any other state: succeed once the diff has stayed at or below the
# threshold for at least condition.stable_for_ms.
else:
if last_diff <= condition.diff_threshold:
stable_since = stable_since or time.time()
if (time.time() - stable_since) * 1000 >= condition.stable_for_ms:
return {
"satisfied": True,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"region": region,
"diff_ratio": last_diff,
"stable_for_ms": int((time.time() - stable_since) * 1000),
"screen": screen_selection,
"display": mon,
}
else:
# The region changed again, so the stability timer restarts.
stable_since = None
# NOTE(review): confirm which branch this baseline refresh belongs to; it
# decides whether diffs are measured against the previous capture or
# against the very first one.
baseline = current
# --- Window and text conditions share this loop: evaluate, then check the
# timeout, then sleep.  The condition is therefore evaluated at least once.
while True:
if isinstance(condition, WaitWindowCondition):
# The condition object itself is passed as the window query.
matches = _list_windows(condition)
polls += 1
satisfied = False
if condition.state == "exists":
satisfied = bool(matches)
elif condition.state == "focused":
satisfied = any(item["foreground"] for item in matches)
elif condition.state == "closed":
satisfied = not matches
if satisfied:
# At most the first 10 matching windows are reported.
return {
"satisfied": True,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"matches": matches[:10],
}
elif isinstance(condition, WaitTextCondition):
image, region, mon, displays, screen_selection = _capture_region_image(
screen,
condition.region_x,
condition.region_y,
condition.region_width,
condition.region_height,
)
blocks = _run_ocr(
image,
condition.language_hint,
condition.min_confidence,
region["x"],
region["y"],
)
polls += 1
matched = []
for block in blocks:
normalized = _normalize_text(block["text"])
# NOTE(review): the target does not depend on the block, so it could be
# normalized once outside this loop.
target = _normalize_text(condition.text)
if _matches_text(normalized, target, condition.match):
matched.append(block)
# condition.present selects between "text is there" and "text is gone".
satisfied = bool(matched) if condition.present else not bool(matched)
if satisfied:
return {
"satisfied": True,
"kind": condition.kind,
"mode": condition.mode,
"polls": polls,
"region": region,
"matches": matched,
"screen": screen_selection,
"display": mon,
}
else:
raise HTTPException(status_code=400, detail="unsupported wait condition")
# On timeout only the kind and the poll count are reported.
if time.time() > deadline:
return {
"satisfied": False,
"kind": condition.kind,
"polls": polls,
}
time.sleep(req.poll_interval_ms / 1000.0)
def _pick_shell(explicit_shell: str | None) -> str:
shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
if shell_name not in {"powershell", "bash", "cmd"}:
raise HTTPException(status_code=400, detail="unsupported shell")
return shell_name
def _truncate_text(text: str, limit: int) -> tuple[str, bool]:
if len(text) <= limit:
return text, False
return text[:limit], True
def _resolve_exec_program(shell_name: str, command: str) -> list[str]:
if shell_name == "powershell":
return ["powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command", command]
if shell_name == "bash":
return ["bash", "-lc", command]
if shell_name == "cmd":
return ["cmd", "/c", command]
raise HTTPException(status_code=400, detail="unsupported shell")
def _exec_command(req: ExecRequest) -> dict:
    """Run a shell command on behalf of the caller and report its outcome.

    The call is refused unless exec is enabled and an exec secret is
    configured.  The timeout is the requested one (or the configured default)
    capped at ``exec_max_timeout_s``.  With dry-run active (globally or on the
    request) nothing is executed and only the resolved invocation is returned.

    Returns:
        For a dry run: ``executed=False`` plus shell, command, argv, timeout
        and cwd.  Otherwise: those fields plus ``timed_out``, ``duration_ms``,
        ``exit_code`` (``None`` on timeout) and stdout/stderr, each truncated
        to ``exec_max_output_chars`` with a matching ``*_truncated`` flag.

    Raises:
        HTTPException: 403 when exec is disabled or no secret is configured;
            400 for an unsupported shell, a missing ``cwd`` directory or a
            shell executable that cannot be found.
    """
    import locale

    if not SETTINGS["exec_enabled"]:
        raise HTTPException(status_code=403, detail="exec endpoint disabled")
    if not SETTINGS["exec_secret"]:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    run_dry = SETTINGS["dry_run"] or req.dry_run
    shell_name = _pick_shell(req.shell)
    timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"]
    timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"])

    cwd = None
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")

    argv = _resolve_exec_program(shell_name, req.command)
    if run_dry:
        return {
            "executed": False,
            "dry_run": True,
            "shell": shell_name,
            "command": req.command,
            "argv": argv,
            "timeout_s": timeout_s,
            "cwd": cwd,
        }

    def _as_text(stream) -> str:
        # TimeoutExpired can carry the captured output as bytes even when the
        # process was started with text=True; str() on bytes would produce a
        # "b'...'" repr, so decode it instead.
        if stream is None:
            return ""
        if isinstance(stream, (bytes, bytearray)):
            return bytes(stream).decode(locale.getpreferredencoding(False), errors="replace")
        return str(stream)

    timed_out = False
    exit_code = None
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # wall-clock adjustments.
    started = time.perf_counter()
    try:
        completed = subprocess.run(
            argv,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        timed_out = True
        raw_stdout, raw_stderr = exc.stdout, exc.stderr
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc
    else:
        exit_code = completed.returncode
        raw_stdout, raw_stderr = completed.stdout, completed.stderr
    duration_ms = int((time.perf_counter() - started) * 1000)

    max_chars = SETTINGS["exec_max_output_chars"]
    stdout, stdout_truncated = _truncate_text(_as_text(raw_stdout), max_chars)
    stderr, stderr_truncated = _truncate_text(_as_text(raw_stderr), max_chars)
    return {
        "executed": True,
        "timed_out": timed_out,
        "shell": shell_name,
        "command": req.command,
        "argv": argv,
        "timeout_s": timeout_s,
        "cwd": cwd,
        "duration_ms": duration_ms,
        "exit_code": exit_code,
        "stdout": stdout,
        "stderr": stderr,
        "stdout_truncated": stdout_truncated,
        "stderr_truncated": stderr_truncated,
    }
def _exec_action(req: ActionRequest, screen: int = 0) -> dict:
run_dry = SETTINGS["dry_run"] or req.dry_run
selected_display, displays, screen_selection = _select_display(screen)
pyautogui = None if run_dry else _import_input_lib()
resolved_target = None
if req.target is not None:
x, y, info = _resolve_target(req.target)
_enforce_allowed_region(x, y)
resolved_target = {"x": x, "y": y, "target_info": info}
duration_sec = req.duration_ms / 1000.0
if req.action in {"move", "click", "right_click", "double_click", "middle_click"} and resolved_target is None:
raise HTTPException(status_code=400, detail="target is required for pointer actions")
if req.action == "scroll" and resolved_target is None:
raise HTTPException(status_code=400, detail="target is required for scroll")
if not run_dry:
if req.action == "move":
pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
elif req.action == "click":
pyautogui.click(
x=resolved_target["x"],
y=resolved_target["y"],
clicks=req.clicks,
interval=req.interval_ms / 1000.0,
button=req.button,
duration=duration_sec,
)
elif req.action == "right_click":
pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec)
elif req.action == "double_click":
pyautogui.doubleClick(x=resolved_target["x"], y=resolved_target["y"], interval=req.interval_ms / 1000.0)
elif req.action == "middle_click":
pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec)
elif req.action == "scroll":
pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
pyautogui.scroll(req.scroll_amount)
elif req.action == "type":
pyautogui.write(req.text, interval=req.interval_ms / 1000.0)
elif req.action == "hotkey":
if len(req.keys) < 1:
raise HTTPException(status_code=400, detail="keys is required for hotkey")
pyautogui.hotkey(*req.keys)
return {
"action": req.action,
"executed": not run_dry,
"dry_run": run_dry,
"screen": screen_selection,
"display": selected_display,
"resolved_target": resolved_target,
}
def _localization_confidence(source: str, confidence: float | None = None) -> str:
if source == "image_tool_point":
return "high"
if source == "ocr" and confidence is not None:
if confidence >= 0.8:
return "high"
if confidence >= 0.55:
return "medium"
return "low"
@app.post("/v2/observe")
def observe_v2(req: ObserveRequestV2, screen: int = 0, _: None = Depends(_auth)):
    """Capture the screen (or a region), optionally OCR it, and store the result.

    The capture is cropped to the request's region only when ``req.mode`` is
    ``"region"``.  When ``req.ocr_mode`` is ``"screen"`` a fresh full capture
    is OCR'd; for any other mode except ``"none"`` the captured image itself
    is used.  The observation (region, image size, OCR blocks and lines) is
    stored in ``OBSERVATIONS`` under a new id, which is returned together
    with the optional encoded image, the OCR output and timings.

    Raises:
        HTTPException: 400 when the OCR area exceeds ``req.max_ocr_area_px``.
    """
    capture_started = time.perf_counter()
    if req.mode == "region":
        region_args = (req.region_x, req.region_y, req.region_width, req.region_height)
    else:
        region_args = (None, None, None, None)
    image, region, mon, _displays, screen_selection = _capture_region_image(screen, *region_args)
    capture_ms = int((time.perf_counter() - capture_started) * 1000)

    encoded = _encode_image(image, req.image_format, req.jpeg_quality) if req.include_image else None

    ocr_enabled = req.ocr_mode != "none"
    ocr_started = time.perf_counter()
    blocks: list[dict] = []
    grouped_lines: list[dict] = []
    ocr_applied_mode = "none"
    if ocr_enabled:
        if req.ocr_mode == "screen":
            # OCR over the whole screen needs its own capture.
            ocr_image, ocr_region = _capture_region_image(screen, None, None, None, None)[:2]
        else:
            ocr_image, ocr_region = image, region

        area = ocr_region["width"] * ocr_region["height"]
        if req.max_ocr_area_px is not None and area > req.max_ocr_area_px:
            raise HTTPException(
                status_code=400,
                detail=f"ocr area {area} exceeds max_ocr_area_px {req.max_ocr_area_px}",
            )

        blocks = _run_ocr(
            ocr_image,
            req.language_hint,
            req.min_confidence,
            ocr_region["x"],
            ocr_region["y"],
        )
        if req.group_lines:
            grouped_lines = _group_ocr_lines(blocks)
        ocr_applied_mode = req.ocr_mode
    elapsed_ocr_ms = int((time.perf_counter() - ocr_started) * 1000)
    # OCR time is only reported when OCR was actually requested.
    reported_ocr_ms = elapsed_ocr_ms if ocr_enabled else 0

    observation_id = _request_id()
    image_width, image_height = image.size[0], image.size[1]
    OBSERVATIONS[observation_id] = {
        "id": observation_id,
        "region": region,
        "screen": screen_selection,
        "display": mon,
        "image_width": image_width,
        "image_height": image_height,
        "ocr_blocks": blocks,
        "ocr_lines": grouped_lines,
        "created_at_ms": _now_ms(),
    }

    image_payload = {
        "included": req.include_image,
        "format": req.image_format if req.include_image else None,
        "base64": encoded,
        "width": image_width,
        "height": image_height,
    }
    ocr_payload = {
        "mode": ocr_applied_mode,
        "min_confidence": req.min_confidence,
        "language_hint": req.language_hint,
        "block_count": len(blocks),
        "line_count": len(grouped_lines),
        "blocks": blocks,
        "lines": grouped_lines,
    }
    timing_payload = {
        "capture_ms": capture_ms,
        "ocr_ms": reported_ocr_ms,
        "total_ms": capture_ms + reported_ocr_ms,
    }
    return _ok(
        {
            "observation_id": observation_id,
            "region": region,
            "screen": screen_selection,
            "display": mon,
            "image": image_payload,
            "ocr": ocr_payload,
            "timing_ms": timing_payload,
        }
    )
@app.post("/v2/localize")
def localize_v2(req: LocalizeRequestV2, _: None = Depends(_auth)):
    """Turn a point or a text query on a stored observation into a screen pixel.

    With ``req.image_tool_point`` the point is taken relative to the
    observation image and offset by the observation region's origin.
    Otherwise the observation's OCR lines (grouped on demand from its blocks)
    are searched for ``req.text_query`` and the centre of the candidate at
    ``req.candidate_index`` is used.  The resulting pixel is checked against
    the allowed region and stored in ``RESOLVED_TARGETS`` under a new id.

    Returns:
        The success envelope describing the resolved target, or a 404
        ``not_found`` envelope when the text query matches nothing.

    Raises:
        HTTPException: 400 when the point lies outside the observation image
            or ``candidate_index`` is beyond the match list.
    """
    observation = _get_observation(req.observation_id)
    region = observation["region"]
    image_width = observation["image_width"]
    image_height = observation["image_height"]

    if req.image_tool_point is not None:
        point = req.image_tool_point
        # Both edges are checked: a negative coordinate is just as far
        # outside the image as one beyond its width or height.
        inside_image = 0 <= point.x < image_width and 0 <= point.y < image_height
        if not inside_image:
            raise HTTPException(status_code=400, detail="image_tool_point outside observation image bounds")
        x = region["x"] + point.x
        y = region["y"] + point.y
        _enforce_allowed_region(x, y)
        resolved_target_id = _request_id()
        RESOLVED_TARGETS[resolved_target_id] = {
            "id": resolved_target_id,
            "observation_id": req.observation_id,
            "x": x,
            "y": y,
            "source": "image_tool_point",
        }
        return _ok(
            {
                "resolved_target_id": resolved_target_id,
                "source": "image_tool_point",
                "localization_confidence": _localization_confidence("image_tool_point"),
                "pixel": {"x": x, "y": y},
                "observation_region": region,
                "image_bounds": {"width": image_width, "height": image_height},
            }
        )

    # Prefer the lines stored with the observation; fall back to grouping its
    # raw blocks when no lines were stored.
    lines = observation.get("ocr_lines") or _group_ocr_lines(observation.get("ocr_blocks", []))
    # The lines are already grouped, so matching runs with grouping disabled.
    matches = _find_text_matches(lines, req.text_query or "", req.text_match, False, 200)
    if not matches:
        return _err("not_found", "no localization candidates found", 404, {"found": False, "matches": []})
    if req.candidate_index >= len(matches):
        raise HTTPException(status_code=400, detail="candidate_index is outside match results")

    chosen = matches[req.candidate_index]
    bbox = chosen["bbox"]
    # Aim at the middle of the box, at least one pixel in from its top-left.
    x = bbox["x"] + max(1, bbox["width"] // 2)
    y = bbox["y"] + max(1, bbox["height"] // 2)
    _enforce_allowed_region(x, y)
    resolved_target_id = _request_id()
    RESOLVED_TARGETS[resolved_target_id] = {
        "id": resolved_target_id,
        "observation_id": req.observation_id,
        "x": x,
        "y": y,
        "source": "ocr",
        "match": chosen,
    }
    return _ok(
        {
            "resolved_target_id": resolved_target_id,
            "source": "ocr",
            "localization_confidence": _localization_confidence("ocr", chosen.get("confidence")),
            "pixel": {"x": x, "y": y},
            "selected_match": chosen,
            "match_count": len(matches),
        }
    )
@app.post("/v2/act")
def act_v2(req: ActRequestV2, screen: int = 0, _: None = Depends(_auth)):
    """Resolve a v2 action into an executable action, run it, and wrap the result."""
    return _ok(_exec_action(_resolve_v2_action(req.action), screen))
@app.post("/v2/act-verify")
def act_verify_v2(req: ActVerifyRequestV2, screen: int = 0, _: None = Depends(_auth)):
    """Run an action and verify its effect, using risk-level based defaults.

    Retry and timing settings left as ``None`` on the request are filled in
    from the defaults for ``req.risk_level``.  A successful verification
    yields the success envelope; otherwise a 409 ``verification_failed``
    envelope carrying the same payload is returned.
    """
    defaults = _risk_defaults(req.risk_level)

    def _effective(name: str):
        # An explicit request value wins over the risk-level default.
        override = getattr(req, name)
        return defaults[name] if override is None else override

    verify_req = VerifyActionRequest(
        action=_resolve_v2_action(req.action),
        condition=req.condition,
        retries=_effective("retries"),
        timeout_ms=_effective("timeout_ms"),
        poll_interval_ms=_effective("poll_interval_ms"),
        retry_delay_ms=_effective("retry_delay_ms"),
        stop_on_action_error=req.stop_on_action_error,
    )
    result = _run_verified_action(verify_req, screen)
    payload = {"risk_level": req.risk_level, "defaults_applied": defaults, **result}

    if not result.get("success", False):
        return _err("verification_failed", "action verification did not satisfy condition", 409, payload)
    return _ok(payload)
@app.get("/health")
def health(_: None = Depends(_auth)):
    """Report service identity and the current safety and exec configuration.

    The exec secret itself is never returned, only whether one is configured.
    """
    exec_info = {
        "enabled": SETTINGS["exec_enabled"],
        "secret_configured": bool(SETTINGS["exec_secret"]),
        "default_shell": SETTINGS["exec_default_shell"],
        "default_timeout_s": SETTINGS["exec_default_timeout_s"],
        "max_timeout_s": SETTINGS["exec_max_timeout_s"],
    }
    status = {
        "service": "clickthrough",
        "version": app.version,
        "dry_run": SETTINGS["dry_run"],
        "allowed_region": SETTINGS["allowed_region"],
        "exec": exec_info,
    }
    return _ok(status)
@app.get("/displays")
def displays(_: None = Depends(_auth)):
    """List the detected displays; the default screen index is always 0."""
    return _ok({"displays": _get_displays(), "default_screen": 0})
@app.post("/exec")
def exec_command(
    req: ExecRequest,
    x_clickthrough_exec_secret: Optional[str] = Header(default=None),
    _: None = Depends(_auth),
):
    """Run a shell command after checking the dedicated exec secret header.

    Raises:
        HTTPException: 403 when no exec secret is configured; 401 when the
            header is missing or does not match the configured secret.
    """
    expected = SETTINGS["exec_secret"]
    if not expected:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    provided = x_clickthrough_exec_secret or ""
    # compare_digest accepts str only when it is pure ASCII and raises
    # TypeError otherwise; comparing bytes keeps the check constant-time and
    # makes a non-ASCII header a plain 401 rather than an internal error.
    secrets_match = hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
    if not provided or not secrets_match:
        raise HTTPException(status_code=401, detail="invalid exec secret")

    return _ok(_exec_command(req))
@app.get("/windows")
def windows(
    title_contains: str | None = None,
    title_regex: str | None = None,
    process_name: str | None = None,
    hwnd: int | None = None,
    visible_only: bool = True,
    _: None = Depends(_auth),
):
    """List windows matching the given filters, together with their count."""
    filters = {
        "title_contains": title_contains,
        "title_regex": title_regex,
        "process_name": process_name,
        "hwnd": hwnd,
        "visible_only": visible_only,
    }
    found = _list_windows(WindowQuery(**filters))
    return _ok({"windows": found, "count": len(found)})
@app.post("/windows/action")
def window_action(req: WindowActionRequest, _: None = Depends(_auth)):
    """Apply the requested window action and wrap its outcome in the success envelope."""
    return _ok(_apply_window_action(req))
@app.post("/launch")
def launch(req: LaunchRequest, _: None = Depends(_auth)):
    """Launch the requested application and wrap the outcome in the success envelope."""
    return _ok(_launch_app(req))
if __name__ == "__main__":
    # uvicorn is imported here so it is only needed when the module is run
    # directly as a script.
    import uvicorn

    uvicorn.run(
        "server.app:app",
        host=SETTINGS["host"],
        port=SETTINGS["port"],
        reload=False,
    )