This repository has been archived on 2026-05-20. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
clickthrough/server/app.py
Luna c66779d929
All checks were successful
python-syntax / syntax-check (push) Successful in 9s
feat(verify): add compound action+verify flows
2026-05-01 16:26:57 +02:00

1719 lines
58 KiB
Python

import base64
import ctypes
import hmac
import io
import os
import re
import subprocess
import sys
import time
import uuid
from typing import Literal, Optional
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Response
from PIL import ImageChops, ImageStat
from pydantic import BaseModel, Field, model_validator
# Load variables from a local ".env" file; override=False keeps values that are
# already present in the process environment.
load_dotenv(dotenv_path=".env", override=False)
# FastAPI application object for this module.
app = FastAPI(title="clickthrough", version="0.1.0")
def _env_bool(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Returns ``default`` when the variable is unset. Otherwise the value is
    trimmed and lower-cased, and is true only for "1", "true", "yes" or "on".
    """
    value = os.environ.get(name)
    if value is not None:
        return value.strip().lower() in ("1", "true", "yes", "on")
    return default
def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]:
    """Parse CLICKTHROUGH_ALLOWED_REGION into an ``(x, y, width, height)`` tuple.

    Returns:
        ``None`` when the variable is unset or empty, otherwise the four
        integers in the order they were given.

    Raises:
        ValueError: if the value does not have exactly four comma-separated
            parts, if any part is not an integer, or if width/height is not
            strictly positive.
    """
    raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION")
    if not raw:
        return None
    parts = [p.strip() for p in raw.split(",")]
    if len(parts) != 4:
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height")
    try:
        x, y, w, h = (int(p) for p in parts)
    except ValueError as exc:
        # This runs while the module is imported; name the setting so the
        # startup failure points at the misconfigured variable.
        raise ValueError(
            f"CLICKTHROUGH_ALLOWED_REGION must contain four integers, got {raw!r}"
        ) from exc
    if w <= 0 or h <= 0:
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0")
    return x, y, w, h
# Process-wide configuration, read once from environment variables when the
# module is imported. Integer settings go through int(), so a non-numeric value
# raises ValueError at import time.
SETTINGS = {
    "host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"),
    "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")),
    # Shared secret checked by _auth; an empty string disables that check.
    "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(),
    # Global dry-run switch; _launch_app honours it in addition to the per-request flag.
    "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False),
    # Defaults for ScreenRequest.grid_rows / grid_cols.
    "default_grid_rows": int(os.getenv("CLICKTHROUGH_GRID_ROWS", "12")),
    "default_grid_cols": int(os.getenv("CLICKTHROUGH_GRID_COLS", "12")),
    # Optional (x, y, width, height) rectangle enforced by _enforce_allowed_region; None = unrestricted.
    "allowed_region": _parse_allowed_region(),
    # NOTE(review): defaults to enabled; the code that consumes the exec_* settings
    # is not visible in this part of the file — confirm the default is intended.
    "exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True),
    "exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(),
    "exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")),
    "exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")),
    "exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")),
    "exec_secret": os.getenv("CLICKTHROUGH_EXEC_SECRET", "").strip(),
    # Optional explicit path to the tesseract binary, applied by _import_ocr_libs when non-empty.
    "tesseract_cmd": os.getenv("CLICKTHROUGH_TESSERACT_CMD", "").strip(),
}
# Options for a screen capture: grid overlay settings and output encoding.
# (A "#" comment is used instead of a docstring so the generated schema is unchanged.)
class ScreenRequest(BaseModel):
    with_grid: bool = True
    # Grid size defaults come from SETTINGS (CLICKTHROUGH_GRID_ROWS / CLICKTHROUGH_GRID_COLS); 1..200.
    grid_rows: int = Field(default=SETTINGS["default_grid_rows"], ge=1, le=200)
    grid_cols: int = Field(default=SETTINGS["default_grid_cols"], ge=1, le=200)
    include_labels: bool = True
    image_format: Literal["png", "jpeg"] = "png"
    # Passed as the JPEG "quality" option by _serialize_image when image_format is "jpeg".
    jpeg_quality: int = Field(default=85, ge=1, le=100)
# Options for a zoomed capture described by a centre point and a size.
# NOTE(review): the handler that consumes this model is not visible here — confirm
# how center/width/height are turned into a crop rectangle.
class ZoomRequest(BaseModel):
    center_x: int = Field(ge=0)
    center_y: int = Field(ge=0)
    # Minimum size is 10 in each dimension; no upper bound is declared.
    width: int = Field(default=500, ge=10)
    height: int = Field(default=350, ge=10)
    with_grid: bool = True
    grid_rows: int = Field(default=20, ge=1, le=300)
    grid_cols: int = Field(default=20, ge=1, le=300)
    include_labels: bool = True
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=90, ge=1, le=100)
# Target given directly in pixels; _resolve_target returns (x + dx, y + dy).
class PixelTarget(BaseModel):
    mode: Literal["pixel"]
    x: int
    y: int
    # Integer pixel offsets added to x / y.
    dx: int = 0
    dy: int = 0
# Target given as a cell of a rows x cols grid laid over a rectangular region.
# _resolve_target maps it to the pixel
#   region_x + round((col + 0.5 + dx * 0.5) * cell_width)   (and likewise for y).
class GridTarget(BaseModel):
    mode: Literal["grid"]
    region_x: int
    region_y: int
    region_width: int = Field(gt=0)
    region_height: int = Field(gt=0)
    rows: int = Field(gt=0)
    cols: int = Field(gt=0)
    # Zero-based cell indices; validated below to be < rows / < cols.
    row: int = Field(ge=0)
    col: int = Field(ge=0)
    # Offset within the cell: -1 is one cell edge, 0 the centre, +1 the opposite edge.
    dx: float = 0.0
    dy: float = 0.0

    @model_validator(mode="after")
    def _validate_indices(self):
        """Reject cell indices outside the grid and offsets outside [-1, 1]."""
        if self.row >= self.rows or self.col >= self.cols:
            raise ValueError("row/col must be inside rows/cols")
        if not -1.0 <= self.dx <= 1.0:
            raise ValueError("dx must be in [-1, 1]")
        if not -1.0 <= self.dy <= 1.0:
            raise ValueError("dy must be in [-1, 1]")
        return self
# Union of the two supported target shapes; each carries a distinct literal "mode" value.
Target = PixelTarget | GridTarget
# A single input action (pointer, scroll, typing or hotkey).
# NOTE(review): the executor (_exec_action) is outside this view, so which fields
# each action actually reads is not confirmed here.
class ActionRequest(BaseModel):
    action: Literal[
        "move",
        "click",
        "right_click",
        "double_click",
        "middle_click",
        "scroll",
        "type",
        "hotkey",
    ]
    # Optional at the schema level; no per-action requirement is enforced in this model.
    target: Optional[Target] = None
    duration_ms: int = Field(default=0, ge=0, le=20000)
    button: Literal["left", "right", "middle"] = "left"
    clicks: int = Field(default=1, ge=1, le=10)
    scroll_amount: int = 0
    text: str = ""
    keys: list[str] = Field(default_factory=list)
    interval_ms: int = Field(default=20, ge=0, le=5000)
    dry_run: bool = False
# A list of 1..100 actions submitted together.
class BatchRequest(BaseModel):
    actions: list[ActionRequest] = Field(min_length=1, max_length=100)
    # Presumably aborts the remaining actions after the first failure — handler not visible here.
    stop_on_error: bool = True
# A shell command execution request.
# NOTE(review): the handler is not visible here; how SETTINGS["exec_*"] values
# (default shell, timeout caps, secret) are applied should be confirmed there.
class ExecRequest(BaseModel):
    command: str = Field(min_length=1, max_length=10000)
    # None leaves the shell choice to the handler.
    shell: Literal["powershell", "bash", "cmd"] | None = None
    # Schema allows 1..600 seconds; None leaves the timeout choice to the handler.
    timeout_s: int | None = Field(default=None, ge=1, le=600)
    cwd: str | None = None
    dry_run: bool = False
# OCR input selection: the whole screen, a screen region, or a supplied image.
class OCRRequest(BaseModel):
    mode: Literal["screen", "region", "image"] = "screen"
    # Region rectangle; all four values are required when mode == "region".
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    # Base64 image (optionally a "data:" URL, see _decode_image_base64); required when mode == "image".
    image_base64: str | None = None
    # Forwarded to pytesseract as "lang" by _run_ocr when set.
    language_hint: str | None = Field(default=None, min_length=1, max_length=64)
    # Confidence floor on a 0..1 scale; _run_ocr drops words below it.
    min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_mode_inputs(self):
        """Require the region rectangle for mode=region and the payload for mode=image."""
        if self.mode == "region":
            required = [self.region_x, self.region_y, self.region_width, self.region_height]
            if any(v is None for v in required):
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        if self.mode == "image" and not self.image_base64:
            raise ValueError("image_base64 is required for mode=image")
        return self
# Filter used by _list_windows; every criterion that is set must match.
class WindowQuery(BaseModel):
    # Case-insensitive substring of the window title.
    title_contains: str | None = Field(default=None, max_length=512)
    # Regular expression searched in the title with re.IGNORECASE.
    title_regex: str | None = Field(default=None, max_length=512)
    # Compared case-insensitively for equality with the owning process image name.
    process_name: str | None = Field(default=None, max_length=260)
    # Exact window handle.
    hwnd: int | None = Field(default=None, ge=1)
    # When true, windows that are not visible are skipped.
    visible_only: bool = True
# A window query plus the action to apply to the single matching window
# (see _apply_window_action).
class WindowActionRequest(WindowQuery):
    action: Literal["focus", "restore", "minimize", "maximize", "close"]
    # How long _apply_window_action polls for the resulting window state.
    timeout_ms: int = Field(default=3000, ge=0, le=60000)
# Request to start a process (see _launch_app), optionally waiting for a window.
class LaunchRequest(BaseModel):
    executable: str = Field(min_length=1, max_length=2048)
    # Passed as separate argv entries, not through a shell.
    args: list[str] = Field(default_factory=list, max_length=100)
    cwd: str | None = None
    wait_for_window: bool = False
    # Window filter used while waiting; when omitted _launch_app matches on the
    # executable's base name as process_name.
    match: WindowQuery | None = None
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    dry_run: bool = False
# Wait condition on OCR text, over the whole screen or a region.
# NOTE(review): the branch of _wait_for_condition that evaluates this model is
# not visible here; the meaning of "present" is inferred from its name.
class WaitTextCondition(BaseModel):
    kind: Literal["text"]
    mode: Literal["screen", "region"] = "screen"
    text: str = Field(min_length=1, max_length=512)
    match: Literal["contains", "exact", "regex"] = "contains"
    # Presumably: True waits for the text to appear, False for it to be absent — confirm.
    present: bool = True
    # Region rectangle; all four values are required when mode == "region".
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    language_hint: str | None = Field(default=None, min_length=1, max_length=64)
    min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_region(self):
        """Require the full region rectangle when mode=region."""
        if self.mode == "region":
            required = [self.region_x, self.region_y, self.region_width, self.region_height]
            if any(v is None for v in required):
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        return self
# Wait condition on windows matching the inherited WindowQuery filters.
# In _wait_for_condition: "exists" = at least one match, "focused" = a match is
# the foreground window, "closed" = no matches.
class WaitWindowCondition(WindowQuery):
    kind: Literal["window"]
    state: Literal["exists", "focused", "closed"] = "exists"
# Wait condition on pixel differences between successive captures.
# In _wait_for_condition: "change" is satisfied when the diff ratio reaches
# diff_threshold; "stable" when it stays at or below diff_threshold for stable_for_ms.
class WaitVisualCondition(BaseModel):
    kind: Literal["visual"]
    state: Literal["change", "stable"] = "change"
    # Optional region; if any value is missing the whole monitor is used
    # (see _capture_region_image).
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    # Ratio in 0..1 as produced by _image_diff_ratio.
    diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0)
    stable_for_ms: int = Field(default=800, ge=0, le=60000)
# A wait condition plus the polling budget used by _wait_for_condition.
class WaitRequest(BaseModel):
    condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    # Sleep between polls; at least 50 ms.
    poll_interval_ms: int = Field(default=250, ge=50, le=10000)
# OCR request extended with a text query; the extra fields correspond to the
# parameters of _find_text_matches.
class OCRFindRequest(OCRRequest):
    query: str = Field(min_length=1, max_length=512)
    match: Literal["contains", "exact", "regex"] = "contains"
    # When true, words are merged into lines before matching (see _group_ocr_lines).
    group_lines: bool = True
    max_results: int = Field(default=20, ge=1, le=200)
# Request for a before/after pixel comparison (see _compute_visual_diff).
# mode "image" compares two supplied images; the other modes capture the screen
# twice, delay_ms apart.
class VisionDiffRequest(BaseModel):
    mode: Literal["screen", "region", "image"] = "screen"
    # Region rectangle; all four values are required when mode == "region".
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    # Both payloads are required when mode == "image".
    before_image_base64: str | None = None
    after_image_base64: str | None = None
    delay_ms: int = Field(default=300, ge=0, le=60000)
    # "changed" is reported when the diff ratio is >= this value.
    diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_inputs(self):
        """Require the region rectangle for mode=region and both images for mode=image."""
        if self.mode == "region":
            required = [self.region_x, self.region_y, self.region_width, self.region_height]
            if any(v is None for v in required):
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        if self.mode == "image" and (not self.before_image_base64 or not self.after_image_base64):
            raise ValueError("before_image_base64 and after_image_base64 are required for mode=image")
        return self
# Request to sample a region repeatedly and report how much it changes
# (see _measure_stability).
class VisionStabilityRequest(BaseModel):
    # Optional region; if any value is missing the whole monitor is sampled.
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    sample_interval_ms: int = Field(default=250, ge=50, le=10000)
    # Total sampling window; 0 yields no samples.
    duration_ms: int = Field(default=1200, ge=0, le=120000)
    # "stable" is reported when the largest observed diff ratio is <= this value.
    diff_threshold: float = Field(default=0.01, ge=0.0, le=1.0)
# An action followed by a wait condition that verifies its effect
# (see _run_verified_action).
class VerifyActionRequest(BaseModel):
    action: ActionRequest
    condition: WaitTextCondition | WaitWindowCondition | WaitVisualCondition
    # Extra attempts after the first; the action runs at most retries + 1 times.
    retries: int = Field(default=0, ge=0, le=10)
    # timeout_ms / poll_interval_ms are forwarded to the WaitRequest of each attempt.
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    poll_interval_ms: int = Field(default=250, ge=50, le=10000)
    # Pause between a failed verification and the next attempt.
    retry_delay_ms: int = Field(default=200, ge=0, le=60000)
    # When true, an exception from the action ends the run immediately.
    stop_on_action_error: bool = True
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
    """Dependency that enforces the shared-token check.

    When ``SETTINGS["token"]` is empty the check is disabled. Otherwise the
    supplied header value must equal the configured token.

    Raises:
        HTTPException: 401 when a token is configured and the header is
            missing or different.
    """
    token = SETTINGS["token"]
    if not token:
        return
    supplied = x_clickthrough_token or ""
    # Constant-time comparison so response timing does not leak how much of the
    # token matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), token.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")
def _now_ms() -> int:
    """Return the current wall-clock time as integer milliseconds since the epoch."""
    seconds = time.time()
    return int(seconds * 1000)
def _request_id() -> str:
    """Return a fresh random (version 4) UUID in its canonical string form."""
    identifier = uuid.uuid4()
    return str(identifier)
def _import_capture_libs():
    """Import the capture backends lazily and return ``(Image, ImageDraw, mss)``.

    Raises:
        HTTPException: 500 when either Pillow or mss cannot be imported.
    """
    try:
        from PIL import Image, ImageDraw
        import mss
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc
    return Image, ImageDraw, mss
def _display_region(mon: dict, screen: int, mss_index: int, primary: bool) -> dict:
    """Describe one monitor as a flat dict.

    ``mon`` supplies "left", "top", "width" and "height"; they are exposed as
    "x", "y", "width" and "height" next to the logical ``screen`` number, the
    ``mss_index`` and the ``primary`` flag.
    """
    region = {"screen": screen, "mss_index": mss_index, "primary": primary}
    region.update(x=mon["left"], y=mon["top"], width=mon["width"], height=mon["height"])
    return region
def _ordered_displays(sct) -> list[dict]:
    """List the physical monitors of an mss session with the primary one first.

    Entry 0 of ``sct.monitors`` is skipped. The monitor whose top-left corner
    is (0, 0) is treated as primary and becomes screen 0; if none qualifies,
    the first monitor is used. The remaining monitors keep their order.

    Raises:
        HTTPException: 500 when no monitor is reported.
    """
    physical = list(enumerate(sct.monitors[1:], start=1))
    if not physical:
        raise HTTPException(status_code=500, detail="no displays detected")

    primary_pos = 0
    for position, (_, mon) in enumerate(physical):
        if mon["left"] == 0 and mon["top"] == 0:
            primary_pos = position
            break

    # Move the primary entry to the front without disturbing the others.
    ordered = list(physical)
    ordered.insert(0, ordered.pop(primary_pos))

    displays = []
    for screen, (mss_index, mon) in enumerate(ordered):
        displays.append(
            _display_region(mon, screen=screen, mss_index=mss_index, primary=(screen == 0))
        )
    return displays
def _get_displays() -> list[dict]:
    """Open a short-lived mss session and return the ordered display list."""
    mss_module = _import_capture_libs()[2]
    with mss_module.mss() as session:
        displays = _ordered_displays(session)
    return displays
def _select_display(screen: int) -> tuple[dict, list[dict], dict]:
    """Pick a display by logical index, falling back to display 0.

    Returns:
        ``(selected, displays, selection)`` where ``selection`` records the
        requested index, the index actually used and whether a fallback
        happened.
    """
    displays = _get_displays()
    if 0 <= screen < len(displays):
        chosen = displays[screen]
    else:
        chosen = displays[0]
    used = chosen["screen"]
    selection = {"requested": screen, "selected": used, "fallback": used != screen}
    return chosen, displays, selection
def _capture_screen(screen: int = 0):
    """Grab one monitor as an RGB image.

    An out-of-range ``screen`` falls back to display 0; the returned
    ``selection`` dict reports that via its "fallback" flag.

    Returns:
        ``(image, mon, displays, selection)``.
    """
    Image, _, mss = _import_capture_libs()
    with mss.mss() as session:
        displays = _ordered_displays(session)
        if 0 <= screen < len(displays):
            mon = displays[screen]
        else:
            mon = displays[0]
        bounds = {
            "left": mon["x"],
            "top": mon["y"],
            "width": mon["width"],
            "height": mon["height"],
        }
        shot = session.grab(bounds)
        image = Image.frombytes("RGB", shot.size, shot.rgb)
    used = mon["screen"]
    selection = {"requested": screen, "selected": used, "fallback": used != screen}
    return image, mon, displays, selection
def _serialize_image(image, image_format: str, jpeg_quality: int) -> bytes:
    """Encode ``image`` to bytes.

    "jpeg" produces a JPEG at ``jpeg_quality``; any other value produces PNG.
    """
    if image_format == "jpeg":
        save_options = {"format": "JPEG", "quality": jpeg_quality}
    else:
        save_options = {"format": "PNG"}
    buffer = io.BytesIO()
    image.save(buffer, **save_options)
    return buffer.getvalue()
def _encode_image(image, image_format: str, jpeg_quality: int) -> str:
    """Serialize ``image`` and return it as an ASCII base64 string."""
    raw = _serialize_image(image, image_format, jpeg_quality)
    encoded = base64.b64encode(raw)
    return encoded.decode("ascii")
def _draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool):
    """Draw a labelled grid on a copy of ``image`` and describe it.

    Grid lines and the border are red; each cell optionally gets a yellow
    "row,col" label near its centre. ``region_x`` / ``region_y`` are only
    echoed into the metadata so clients can map cells back to screen pixels.

    Returns:
        ``(annotated_image, meta)``; the input image is not modified.
    """
    ImageDraw = _import_capture_libs()[1]
    canvas = image.copy()
    pen = ImageDraw.Draw(canvas)
    width, height = canvas.size
    cell_w = width / cols
    cell_h = height / rows
    red = (255, 0, 0)

    # Interior separators: vertical first, then horizontal, then the border.
    for col in range(1, cols):
        line_x = int(round(col * cell_w))
        pen.line([(line_x, 0), (line_x, height)], fill=red, width=1)
    for row in range(1, rows):
        line_y = int(round(row * cell_h))
        pen.line([(0, line_y), (width, line_y)], fill=red, width=1)
    pen.rectangle([(0, 0), (width - 1, height - 1)], outline=red, width=2)

    if include_labels:
        for row in range(rows):
            center_y = int((row + 0.5) * cell_h)
            for col in range(cols):
                center_x = int((col + 0.5) * cell_w)
                # Fixed offset roughly centres the short label on the cell centre.
                pen.text((center_x - 12, center_y - 6), f"{row},{col}", fill=(255, 255, 0))

    point_formula = {
        "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)",
        "pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)",
        "dx_range": "[-1,1]",
        "dy_range": "[-1,1]",
    }
    grid_meta = {
        "rows": rows,
        "cols": cols,
        "cell_width": cell_w,
        "cell_height": cell_h,
        "indexing": "zero-based",
        "point_formula": point_formula,
    }
    meta = {
        "region": {"x": region_x, "y": region_y, "width": width, "height": height},
        "grid": grid_meta,
    }
    return canvas, meta
def _resolve_target(target: Target) -> tuple[int, int, dict]:
    """Convert a pixel or grid target into absolute ``(x, y)`` plus a trace dict.

    Pixel targets add their offsets directly. Grid targets resolve to the
    cell centre shifted by ``dx`` / ``dy`` half-cells, rounded to whole pixels.
    """
    if not isinstance(target, PixelTarget):
        cell_w = target.region_width / target.cols
        cell_h = target.region_height / target.rows
        col_units = target.col + 0.5 + (target.dx * 0.5)
        row_units = target.row + 0.5 + (target.dy * 0.5)
        x = target.region_x + int(round(col_units * cell_w))
        y = target.region_y + int(round(row_units * cell_h))
        info = {
            "mode": "grid",
            "source": target.model_dump(),
            "derived": {"cell_width": cell_w, "cell_height": cell_h},
        }
        return x, y, info

    info = {"mode": "pixel", "source": target.model_dump()}
    return target.x + target.dx, target.y + target.dy, info
def _enforce_allowed_region(x: int, y: int):
    """Reject points outside the configured allowed region.

    Does nothing when no region is configured. The rectangle is half-open:
    its left/top edges are inside, its right/bottom edges are outside.

    Raises:
        HTTPException: 403 when the point lies outside the region.
    """
    region = SETTINGS["allowed_region"]
    if region is None:
        return
    left, top, width, height = region
    inside_x = left <= x < left + width
    inside_y = top <= y < top + height
    if inside_x and inside_y:
        return
    raise HTTPException(status_code=403, detail="point outside allowed region")
def _import_input_lib():
    """Import pyautogui lazily with its fail-safe switched on.

    Raises:
        HTTPException: 500 when pyautogui cannot be imported or configured.
    """
    try:
        import pyautogui

        pyautogui.FAILSAFE = True
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc
    return pyautogui
def _import_ocr_libs():
    """Import pytesseract lazily and return ``(pytesseract, Output)``.

    If ``SETTINGS["tesseract_cmd"]`` is non-empty it is installed as the
    tesseract executable path before returning.

    Raises:
        HTTPException: 500 when the OCR backend cannot be imported or configured.
    """
    try:
        import pytesseract
        from pytesseract import Output

        configured_cmd = SETTINGS["tesseract_cmd"]
        if configured_cmd:
            pytesseract.pytesseract.tesseract_cmd = configured_cmd
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"ocr backend unavailable: {exc}") from exc
    return pytesseract, Output
def _decode_image_base64(value: str):
    """Decode a base64 (or ``data:`` URL) payload into an RGB Pillow image.

    Raises:
        HTTPException: 500 when Pillow is unavailable; 400 when the data URL
            has no comma, the base64 is invalid, or the bytes are not a
            readable image.
    """
    try:
        from PIL import Image
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"image decode backend unavailable: {exc}") from exc

    payload = value.strip()
    if payload.startswith("data:"):
        # Keep only what follows the first comma of "data:<mime>;base64,<data>".
        _, separator, encoded = payload.partition(",")
        if not separator:
            raise HTTPException(status_code=400, detail="invalid data URL image payload")
        payload = encoded

    try:
        image_bytes = base64.b64decode(payload, validate=True)
    except Exception as exc:
        raise HTTPException(status_code=400, detail="invalid image_base64 payload") from exc

    try:
        decoded = Image.open(io.BytesIO(image_bytes))
        image = decoded.convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=400, detail="unsupported or unreadable image bytes") from exc
    return image
def _run_ocr(image, language_hint: str | None, min_confidence: float, offset_x: int = 0, offset_y: int = 0) -> list[dict]:
    """Run tesseract on ``image`` and return word blocks in reading order.

    Each block has "text", "confidence" (0..1, rounded to 4 places) and a
    "bbox" shifted by ``offset_x`` / ``offset_y``. Entries with empty text, an
    unparsable or negative confidence, or a confidence below
    ``min_confidence`` are dropped. Blocks are ordered by top, then left,
    then original index.

    Raises:
        HTTPException: 500 when the tesseract executable is missing; 400 when
            tesseract reports an error.
    """
    pytesseract, Output = _import_ocr_libs()
    lang_option = {"lang": language_hint} if language_hint else {}
    try:
        data = pytesseract.image_to_data(
            image=image,
            output_type=Output.DICT,
            config="--oem 3 --psm 6",
            **lang_option,
        )
    except pytesseract.TesseractNotFoundError as exc:
        raise HTTPException(status_code=500, detail="tesseract executable not found") from exc
    except pytesseract.TesseractError as exc:
        raise HTTPException(status_code=400, detail=f"ocr failed: {exc}") from exc

    ranked = []
    total = len(data.get("text", []))
    for idx in range(total):
        text = (data["text"][idx] or "").strip()
        if not text:
            continue
        try:
            conf_0_100 = float(str(data["conf"][idx]).strip())
        except ValueError:
            continue
        if conf_0_100 < 0:
            continue
        confidence = round(conf_0_100 / 100.0, 4)
        if confidence < min_confidence:
            continue

        x = int(data["left"][idx]) + offset_x
        y = int(data["top"][idx]) + offset_y
        block = {
            "text": text,
            "confidence": confidence,
            "bbox": {
                "x": x,
                "y": y,
                "width": int(data["width"][idx]),
                "height": int(data["height"][idx]),
            },
        }
        # The sort key lives beside the block, so it never leaks into the output.
        ranked.append(((y, x, idx), block))

    ranked.sort(key=lambda pair: pair[0])
    return [block for _, block in ranked]
def _normalize_text(value: str) -> str:
    """Collapse every whitespace run to one space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", value)
    return collapsed.strip()
def _matches_text(haystack: str, needle: str, match_mode: str) -> bool:
    """Test ``haystack`` against ``needle``.

    "exact" is a case-sensitive equality test, "regex" searches ``needle`` as
    a regular expression, and any other mode is a case-insensitive substring
    test.
    """
    if match_mode == "regex":
        found = re.search(needle, haystack)
        return found is not None
    if match_mode == "exact":
        return needle == haystack
    return haystack.lower().find(needle.lower()) != -1
def _windows_only(feature: str):
    """Guard for Win32-only code paths.

    Raises:
        HTTPException: 501 naming ``feature`` when not running on Windows.
    """
    if sys.platform == "win32":
        return
    raise HTTPException(status_code=501, detail=f"{feature} is currently supported on Windows hosts only")
def _tasklist_process_name(pid: int) -> str | None:
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
except Exception:
return None
line = (completed.stdout or "").strip().splitlines()
if not line:
return None
row = line[0].strip()
if not row or row.startswith("INFO:"):
return None
if row.startswith('"') and '","' in row:
return row.split('","', 1)[0].strip('"')
return None
def _list_windows(query: WindowQuery | None = None) -> list[dict]:
    """Enumerate top-level windows, optionally filtered by ``query``.

    Filters: exact ``hwnd``, ``visible_only``, case-insensitive
    ``title_contains``, case-insensitive ``title_regex`` search, and
    case-insensitive ``process_name`` equality. With no query every window is
    returned.

    Returns:
        Window dicts sorted with the foreground window first, then by
        lower-cased title, then by handle.

    Raises:
        HTTPException: 501 on non-Windows hosts; 400 when ``title_regex`` is
            not a valid regular expression.
    """
    _windows_only("window endpoints")
    # ctypes does not load its "wintypes" submodule on its own; import it here
    # so this function does not depend on another library having done so.
    from ctypes import wintypes

    try:
        title_regex = re.compile(query.title_regex, re.IGNORECASE) if query and query.title_regex else None
    except re.error as exc:
        raise HTTPException(status_code=400, detail=f"invalid title_regex: {exc}") from exc

    user32 = ctypes.windll.user32
    enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p)
    user32.EnumWindows.restype = ctypes.c_bool
    user32.EnumWindows.argtypes = [enum_proc, ctypes.c_void_p]
    user32.IsWindowVisible.argtypes = [ctypes.c_void_p]
    user32.IsWindowVisible.restype = ctypes.c_bool
    user32.IsWindowEnabled.argtypes = [ctypes.c_void_p]
    user32.IsWindowEnabled.restype = ctypes.c_bool
    user32.IsIconic.argtypes = [ctypes.c_void_p]
    user32.IsIconic.restype = ctypes.c_bool
    user32.IsZoomed.argtypes = [ctypes.c_void_p]
    user32.IsZoomed.restype = ctypes.c_bool
    user32.GetWindowTextLengthW.argtypes = [ctypes.c_void_p]
    user32.GetWindowTextLengthW.restype = ctypes.c_int
    user32.GetWindowTextW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
    user32.GetClassNameW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
    user32.GetClassNameW.restype = ctypes.c_int
    user32.GetForegroundWindow.restype = ctypes.c_void_p
    user32.GetWindowRect.argtypes = [ctypes.c_void_p, ctypes.POINTER(wintypes.RECT)]
    # Declared explicitly so the handle is passed pointer-sized like the others.
    user32.GetWindowThreadProcessId.argtypes = [ctypes.c_void_p, ctypes.POINTER(wintypes.DWORD)]
    user32.GetWindowThreadProcessId.restype = wintypes.DWORD

    foreground = int(user32.GetForegroundWindow() or 0)
    windows: list[dict] = []
    # Each lookup spawns a "tasklist" subprocess, so resolve every PID at most
    # once per enumeration.
    process_names: dict[int, str | None] = {}

    def _callback(hwnd, _lparam):
        # Returning True always continues the enumeration.
        hwnd_int = int(hwnd)
        if query and query.hwnd is not None and hwnd_int != query.hwnd:
            return True
        visible = bool(user32.IsWindowVisible(hwnd))
        if query and query.visible_only and not visible:
            return True

        title_len = user32.GetWindowTextLengthW(hwnd)
        title_buf = ctypes.create_unicode_buffer(max(title_len + 1, 1))
        user32.GetWindowTextW(hwnd, title_buf, len(title_buf))
        title = title_buf.value

        # Cheap title filters run before the expensive process-name lookup.
        if query:
            if query.title_contains and query.title_contains.lower() not in title.lower():
                return True
            if title_regex and not title_regex.search(title):
                return True

        pid = wintypes.DWORD()
        user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
        pid_value = int(pid.value)
        if pid_value not in process_names:
            process_names[pid_value] = _tasklist_process_name(pid_value)
        process_name = process_names[pid_value]
        if query and query.process_name and (process_name or "").lower() != query.process_name.lower():
            return True

        class_buf = ctypes.create_unicode_buffer(256)
        user32.GetClassNameW(hwnd, class_buf, len(class_buf))
        rect = wintypes.RECT()
        user32.GetWindowRect(hwnd, ctypes.byref(rect))
        windows.append(
            {
                "hwnd": hwnd_int,
                "title": title,
                "class_name": class_buf.value,
                "pid": pid_value,
                "process_name": process_name,
                "visible": visible,
                "enabled": bool(user32.IsWindowEnabled(hwnd)),
                "minimized": bool(user32.IsIconic(hwnd)),
                "maximized": bool(user32.IsZoomed(hwnd)),
                "foreground": hwnd_int == foreground,
                "rect": {
                    "x": int(rect.left),
                    "y": int(rect.top),
                    "width": int(rect.right - rect.left),
                    "height": int(rect.bottom - rect.top),
                },
            }
        )
        return True

    user32.EnumWindows(enum_proc(_callback), 0)
    windows.sort(key=lambda item: (not item["foreground"], item["title"].lower(), item["hwnd"]))
    return windows
def _require_window_match(query: WindowQuery) -> dict:
    """Return the single window matching ``query``.

    When ``query.hwnd`` is given the first match is returned even if several
    windows matched.

    Raises:
        HTTPException: 404 when nothing matches; 409 (with up to ten
            candidates) when several windows match and no ``hwnd`` was given.
    """
    matches = _list_windows(query)
    if not matches:
        raise HTTPException(status_code=404, detail="no matching window found")
    if len(matches) == 1 or query.hwnd is not None:
        return matches[0]
    raise HTTPException(
        status_code=409,
        detail={"message": "multiple windows matched", "matches": matches[:10]},
    )
def _apply_window_action(req: WindowActionRequest) -> dict:
    """Apply a window action and poll until its effect is observed or time runs out.

    Returns:
        A dict with:
        - "ok": for focus/restore the result of ``SetForegroundWindow``; for
          close the result of ``PostMessageW``; for minimize/maximize whether
          the requested state was actually observed.
        - "matched": the window as it was before the action.
        - "window": the window as last observed, or ``None`` if it is gone.
        - "closed": whether the window no longer exists.

    Raises:
        HTTPException: 501 on non-Windows hosts; 404/409 from the window
            lookup; 400 for an unsupported action.
    """
    _windows_only("window endpoints")
    match = _require_window_match(req)
    hwnd = match["hwnd"]
    user32 = ctypes.windll.user32
    # Pass the handle pointer-sized; a bare Python int is converted as a C int.
    handle = ctypes.c_void_p(hwnd)
    WM_CLOSE = 0x0010
    SW_RESTORE = 9
    SW_MINIMIZE = 6
    SW_MAXIMIZE = 3

    # ShowWindow returns the window's PREVIOUS visibility, not success, so for
    # minimize/maximize "ok" is derived from the observed state below.
    ok = None
    if req.action in {"focus", "restore"}:
        user32.ShowWindow(handle, SW_RESTORE)
        ok = bool(user32.SetForegroundWindow(handle))
    elif req.action == "minimize":
        user32.ShowWindow(handle, SW_MINIMIZE)
    elif req.action == "maximize":
        user32.ShowWindow(handle, SW_MAXIMIZE)
    elif req.action == "close":
        ok = bool(user32.PostMessageW(handle, WM_CLOSE, 0, 0))
    else:
        raise HTTPException(status_code=400, detail="unsupported window action")

    def _reached(window: dict | None) -> bool:
        # True once the state requested by req.action is visible in `window`.
        if req.action == "close":
            return window is None
        if window is None:
            return False
        if req.action in {"focus", "restore"}:
            return bool(window["foreground"] and not window["minimized"])
        if req.action == "minimize":
            return bool(window["minimized"])
        return bool(window["maximized"])

    deadline = time.time() + (req.timeout_ms / 1000.0)
    # Always observe the window at least once, even with timeout_ms=0;
    # otherwise "window" stays None and "closed" is reported as True.
    while True:
        current = _list_windows(WindowQuery(hwnd=hwnd, visible_only=False))
        final_match = current[0] if current else None
        reached = _reached(final_match)
        if reached or time.time() >= deadline:
            break
        time.sleep(0.1)

    if ok is None:
        ok = reached
    return {
        "ok": ok,
        "matched": match,
        "window": final_match,
        "closed": final_match is None,
    }
def _launch_app(req: LaunchRequest) -> dict:
    """Start a process and optionally wait for one of its windows.

    The process is started without a shell, from ``[executable, *args]``. In
    dry-run mode (global or per-request) nothing is started and the planned
    argv is returned.

    Returns:
        A dict with "executed", "dry_run", "argv", "cwd" and, when started,
        "pid". With ``wait_for_window`` it also has "window" (first match or
        ``None``) and "window_found".

    Raises:
        HTTPException: 400 for a bad ``cwd`` or a launch failure; 501 when
            ``wait_for_window`` is requested on a non-Windows host.
    """
    cwd = None
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")

    argv = [req.executable, *req.args]
    if SETTINGS["dry_run"] or req.dry_run:
        return {"executed": False, "dry_run": True, "argv": argv, "cwd": cwd}

    if req.wait_for_window:
        # Fail before spawning anything: window polling is Windows-only, and
        # raising after Popen would leave a started process behind a 501.
        _windows_only("window endpoints")

    try:
        proc = subprocess.Popen(argv, cwd=cwd)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"executable not found: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=400, detail=f"failed to launch process: {exc}") from exc

    result = {
        "executed": True,
        "dry_run": False,
        "argv": argv,
        "cwd": cwd,
        "pid": proc.pid,
    }
    if req.wait_for_window:
        query = req.match or WindowQuery(process_name=os.path.basename(req.executable), visible_only=True)
        deadline = time.time() + (req.timeout_ms / 1000.0)
        match = None
        # Poll at least once so timeout_ms=0 still performs a single lookup.
        while True:
            matches = _list_windows(query)
            if matches:
                match = matches[0]
                break
            if time.time() >= deadline:
                break
            time.sleep(0.2)
        result["window"] = match
        result["window_found"] = match is not None
    return result
def _capture_region_image(screen: int, region_x: int | None, region_y: int | None, region_width: int | None, region_height: int | None):
    """Capture a monitor and optionally crop it to a region.

    Region coordinates are absolute; they are translated into the captured
    monitor's frame before cropping. If any of the four region values is
    ``None`` the whole monitor is returned.

    Returns:
        ``(image, region, mon, displays, screen_selection)``.

    Raises:
        HTTPException: 400 when the region extends beyond the captured monitor.
    """
    base_img, mon, displays, screen_selection = _capture_screen(screen)
    requested = (region_x, region_y, region_width, region_height)
    if any(value is None for value in requested):
        full_region = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}
        return base_img, full_region, mon, displays, screen_selection

    left = region_x - mon["x"]
    top = region_y - mon["y"]
    right = left + region_width
    bottom = top + region_height
    img_width, img_height = base_img.size[0], base_img.size[1]
    fits = left >= 0 and top >= 0 and right <= img_width and bottom <= img_height
    if not fits:
        raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")

    region = {"x": region_x, "y": region_y, "width": region_width, "height": region_height}
    return base_img.crop((left, top, right, bottom)), region, mon, displays, screen_selection
def _capture_ocr_source(req: OCRRequest, screen: int = 0):
    """Produce the image an OCR request should run on.

    - mode "image": decodes ``req.image_base64``; the region is the image's
      own bounds and the monitor-related items are ``None``.
    - mode "screen": captures the whole selected monitor.
    - mode "region": captures the monitor and crops it to the requested
      rectangle.

    Returns:
        ``(image, region, mon, displays, screen_selection, source)``.

    Raises:
        HTTPException: 400 when the image payload is invalid or the region is
            outside the captured monitor.
    """
    source = req.mode
    if source == "image":
        image = _decode_image_base64(req.image_base64 or "")
        region = {"x": 0, "y": 0, "width": image.size[0], "height": image.size[1]}
        return image, region, None, None, None, source

    # Region fields are honoured only in "region" mode; "screen" mode always
    # captures the full monitor, even if region fields happen to be set.
    if source == "region":
        bounds = (req.region_x, req.region_y, req.region_width, req.region_height)
    else:
        bounds = (None, None, None, None)

    # Capture and cropping are shared with the vision helpers rather than
    # repeating the bounds arithmetic here.
    image, region, mon, displays, screen_selection = _capture_region_image(screen, *bounds)
    return image, region, mon, displays, screen_selection, source
def _image_diff_ratio(before, after) -> float:
    """Return the mean absolute per-channel pixel difference, scaled to 0..1.

    0.0 means the images are identical; 1.0 means every channel of every
    pixel differs by the full 255.
    """
    stat = ImageStat.Stat(ImageChops.difference(before, after))
    means = stat.mean
    if not isinstance(means, list):
        means = [means]
    return float(sum(means) / (len(means) * 255.0))
def _merge_bbox(blocks: list[dict]) -> dict:
    """Return the smallest box enclosing the "bbox" of every block.

    ``blocks`` must be non-empty; an empty list raises ``ValueError``.
    """
    boxes = [block["bbox"] for block in blocks]
    left = min(box["x"] for box in boxes)
    top = min(box["y"] for box in boxes)
    right = max(box["x"] + box["width"] for box in boxes)
    bottom = max(box["y"] + box["height"] for box in boxes)
    return {"x": left, "y": top, "width": right - left, "height": bottom - top}
def _group_ocr_lines(blocks: list[dict]) -> list[dict]:
    """Merge OCR word blocks into text lines.

    Blocks are visited top-to-bottom, then left-to-right. A block starts a new
    line when its vertical centre is further from the running mean centre of
    the current line than ``max(10, 0.8 * block height)``. Within a line,
    blocks are ordered by x.

    Returns:
        One dict per line with the joined "text", mean "confidence" (4
        places), merged "bbox", the member "blocks" and the "line_index".
    """
    if not blocks:
        return []

    def _center_y(block: dict) -> float:
        box = block["bbox"]
        return box["y"] + (box["height"] / 2)

    def _left(block: dict):
        return block["bbox"]["x"]

    lines: list[list[dict]] = []
    pending: list[dict] = []
    pending_center = None
    for block in sorted(blocks, key=lambda b: (b["bbox"]["y"], b["bbox"]["x"])):
        tolerance = max(10.0, block["bbox"]["height"] * 0.8)
        if pending and abs(_center_y(block) - pending_center) > tolerance:
            # Too far from the current line: close it and start a new one.
            lines.append(sorted(pending, key=_left))
            pending = []
        pending.append(block)
        pending_center = sum(_center_y(item) for item in pending) / len(pending)
    if pending:
        lines.append(sorted(pending, key=_left))

    grouped = []
    for line_index, members in enumerate(lines):
        text = " ".join(item["text"] for item in members).strip()
        if not text:
            continue
        mean_confidence = sum(item["confidence"] for item in members) / len(members)
        grouped.append(
            {
                "text": text,
                "confidence": round(mean_confidence, 4),
                "bbox": _merge_bbox(members),
                "blocks": members,
                "line_index": line_index,
            }
        )
    return grouped
def _find_text_matches(blocks: list[dict], query: str, match_mode: str, group_lines: bool, max_results: int) -> list[dict]:
    """Find OCR words or lines matching ``query``.

    The query and every candidate text are whitespace-normalized before
    matching. Candidates are whole lines when ``group_lines`` is true,
    otherwise individual word blocks. Results are ordered by descending
    confidence, then top-to-bottom, then left-to-right, and capped at
    ``max_results``.
    """
    target = _normalize_text(query)
    candidates = _group_ocr_lines(blocks) if group_lines else blocks

    found = []
    for candidate in candidates:
        normalized = _normalize_text(candidate["text"])
        if not normalized or not _matches_text(normalized, target, match_mode):
            continue
        entry = {
            "text": candidate["text"],
            "normalized_text": normalized,
            "confidence": candidate["confidence"],
            "bbox": candidate["bbox"],
            "grouped": group_lines,
        }
        if group_lines:
            entry.update(blocks=candidate["blocks"], line_index=candidate["line_index"])
        found.append(entry)

    ranked = sorted(
        found,
        key=lambda entry: (-entry["confidence"], entry["bbox"]["y"], entry["bbox"]["x"]),
    )
    return ranked[:max_results]
def _compute_visual_diff(req: VisionDiffRequest, screen: int = 0) -> dict:
    """Compare two images and report how much they differ.

    In "image" mode the two supplied images are compared and must have the
    same dimensions. Otherwise the requested area is captured twice,
    ``req.delay_ms`` apart. "changed" is true when the diff ratio is at least
    ``req.diff_threshold``.

    Raises:
        HTTPException: 400 for undecodable images, mismatched dimensions or a
            region outside the monitor.
    """
    if req.mode != "image":
        before, region, mon, _displays, screen_selection = _capture_region_image(
            screen,
            req.region_x,
            req.region_y,
            req.region_width,
            req.region_height,
        )
        if req.delay_ms > 0:
            time.sleep(req.delay_ms / 1000.0)
        # Re-capture exactly the area that the first capture resolved to.
        after = _capture_region_image(
            screen,
            region["x"],
            region["y"],
            region["width"],
            region["height"],
        )[0]
        ratio = _image_diff_ratio(before, after)
        return {
            "mode": req.mode,
            "region": region,
            "diff_ratio": ratio,
            "changed": ratio >= req.diff_threshold,
            "diff_threshold": req.diff_threshold,
            "screen": screen_selection,
            "display": mon,
            "delay_ms": req.delay_ms,
        }

    before = _decode_image_base64(req.before_image_base64 or "")
    after = _decode_image_base64(req.after_image_base64 or "")
    if before.size != after.size:
        raise HTTPException(status_code=400, detail="before and after images must have matching dimensions")
    ratio = _image_diff_ratio(before, after)
    width, height = before.size[0], before.size[1]
    return {
        "mode": req.mode,
        "region": {"x": 0, "y": 0, "width": width, "height": height},
        "diff_ratio": ratio,
        "changed": ratio >= req.diff_threshold,
        "diff_threshold": req.diff_threshold,
    }
def _measure_stability(req: VisionStabilityRequest, screen: int = 0) -> dict:
    """Sample a screen area repeatedly and report frame-to-frame change.

    Each sample is compared with the previous one, not with the first frame.
    The area counts as stable when the largest diff ratio is at most
    ``req.diff_threshold``. With ``duration_ms`` of 0 no samples are taken and
    the result is reported as stable with ``sample_count`` 0.
    """
    previous, region, mon, _displays, screen_selection = _capture_region_image(
        screen,
        req.region_x,
        req.region_y,
        req.region_width,
        req.region_height,
    )
    interval_s = req.sample_interval_ms / 1000.0
    deadline = time.time() + (req.duration_ms / 1000.0)

    diffs = []
    while time.time() < deadline:
        time.sleep(interval_s)
        frame = _capture_region_image(
            screen,
            region["x"],
            region["y"],
            region["width"],
            region["height"],
        )[0]
        diffs.append(_image_diff_ratio(previous, frame))
        previous = frame

    # Diff ratios are never negative, so a 0.0 floor matches a running maximum.
    max_diff_ratio = max(diffs, default=0.0)
    avg_diff_ratio = round(sum(diffs) / len(diffs), 6) if diffs else 0.0
    return {
        "stable": max_diff_ratio <= req.diff_threshold,
        "region": region,
        "sample_count": len(diffs),
        "max_diff_ratio": max_diff_ratio,
        "avg_diff_ratio": avg_diff_ratio,
        "diff_threshold": req.diff_threshold,
        "duration_ms": req.duration_ms,
        "sample_interval_ms": req.sample_interval_ms,
        "screen": screen_selection,
        "display": mon,
    }
def _run_verified_action(req: VerifyActionRequest, screen: int = 0) -> dict:
    """Run an action, verify its effect, and retry until verified or out of attempts.

    Each attempt executes ``req.action`` and then waits for ``req.condition``.
    If the action raises and ``req.stop_on_action_error`` is true, the run
    ends at once; otherwise the error is recorded and verification still runs.

    Returns:
        A dict with "success", "attempts" and "final_attempt". Every attempt
        record has the keys "attempt", "action_ok", "action_error",
        "action_result" and "verification".
    """
    # NOTE(review): for visual "change" conditions the baseline frame appears to
    # be captured inside _wait_for_condition, i.e. after the action has already
    # run — confirm that an immediate change cannot be missed.
    attempts = []
    for attempt in range(req.retries + 1):
        action_ok = True
        action_result = None
        action_error = None
        try:
            action_result = _exec_action(req.action, screen)
        except Exception as exc:
            # Recorded in the attempt log rather than propagated, so the caller
            # receives a structured failure.
            action_ok = False
            action_error = str(exc)
            if req.stop_on_action_error:
                attempts.append(
                    {
                        "attempt": attempt,
                        "action_ok": action_ok,
                        "action_error": action_error,
                        # Present so every attempt record has the same keys.
                        "action_result": None,
                        "verification": None,
                    }
                )
                return {"success": False, "attempts": attempts, "final_attempt": attempt}
        verification = _wait_for_condition(
            WaitRequest(
                condition=req.condition,
                timeout_ms=req.timeout_ms,
                poll_interval_ms=req.poll_interval_ms,
            ),
            screen,
        )
        attempts.append(
            {
                "attempt": attempt,
                "action_ok": action_ok,
                "action_error": action_error,
                "action_result": action_result,
                "verification": verification,
            }
        )
        if verification.get("satisfied"):
            return {"success": True, "attempts": attempts, "final_attempt": attempt}
        # No delay after the last attempt.
        if attempt < req.retries and req.retry_delay_ms > 0:
            time.sleep(req.retry_delay_ms / 1000.0)
    return {"success": False, "attempts": attempts, "final_attempt": req.retries}
def _wait_for_condition(req: WaitRequest, screen: int = 0) -> dict:
# Poll until req.condition is satisfied or req.timeout_ms has elapsed.
#
# Three condition types are handled: WaitVisualCondition (pixel diff of a
# screen region), WaitWindowCondition (window lookup via _list_windows) and
# WaitTextCondition (OCR of a screen region).  Any other type raises a 400
# HTTPException.
#
# Returns a dict whose "satisfied" key is True on success and False on
# timeout; the remaining keys depend on the condition type.
condition = req.condition
# Absolute wall-clock deadline shared by both polling loops below.
deadline = time.time() + (req.timeout_ms / 1000.0)
polls = 0
if isinstance(condition, WaitVisualCondition):
# Visual conditions compare successive captures of one region; the first
# capture also resolves the concrete region used for all later captures.
baseline, region, mon, displays, screen_selection = _capture_region_image(
screen,
condition.region_x,
condition.region_y,
condition.region_width,
condition.region_height,
)
stable_since = None
last_diff = 0.0
while True:
# The deadline is checked before sleeping/capturing, so a timeout of 0 can
# still take one sample (time.time() must be strictly past the deadline).
if time.time() > deadline:
return {
"satisfied": False,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"region": region,
"diff_ratio": last_diff,
"screen": screen_selection,
"display": mon,
}
time.sleep(req.poll_interval_ms / 1000.0)
current, _, _, _, _ = _capture_region_image(
screen,
region["x"],
region["y"],
region["width"],
region["height"],
)
polls += 1
last_diff = _image_diff_ratio(baseline, current)
if condition.state == "change":
# "change": satisfied as soon as the diff reaches the threshold.
if last_diff >= condition.diff_threshold:
return {
"satisfied": True,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"region": region,
"diff_ratio": last_diff,
"screen": screen_selection,
"display": mon,
}
else:
# Any other state: the diff must stay at or below the threshold for at
# least condition.stable_for_ms; a larger diff resets the stability timer.
if last_diff <= condition.diff_threshold:
stable_since = stable_since or time.time()
if (time.time() - stable_since) * 1000 >= condition.stable_for_ms:
return {
"satisfied": True,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
"region": region,
"diff_ratio": last_diff,
"stable_for_ms": int((time.time() - stable_since) * 1000),
"screen": screen_selection,
"display": mon,
}
else:
stable_since = None
# NOTE(review): indentation was lost in this copy of the file, so it is not
# visible whether the baseline is advanced for every state or only for the
# non-"change" branch -- confirm against the original source.
baseline = current
# Window and text conditions share this loop: evaluate first, then check the
# deadline, so the condition is always evaluated at least once.
while True:
if isinstance(condition, WaitWindowCondition):
matches = _list_windows(condition)
polls += 1
satisfied = False
if condition.state == "exists":
satisfied = bool(matches)
elif condition.state == "focused":
satisfied = any(item["foreground"] for item in matches)
elif condition.state == "closed":
satisfied = not matches
if satisfied:
return {
"satisfied": True,
"kind": condition.kind,
"state": condition.state,
"polls": polls,
# Only the first ten matching windows are reported.
"matches": matches[:10],
}
elif isinstance(condition, WaitTextCondition):
# The region is re-captured and re-OCRed on every poll.
image, region, mon, displays, screen_selection = _capture_region_image(
screen,
condition.region_x,
condition.region_y,
condition.region_width,
condition.region_height,
)
blocks = _run_ocr(
image,
condition.language_hint,
condition.min_confidence,
region["x"],
region["y"],
)
polls += 1
matched = []
for block in blocks:
normalized = _normalize_text(block["text"])
target = _normalize_text(condition.text)
if _matches_text(normalized, target, condition.match):
matched.append(block)
# condition.present selects between waiting for the text to appear and
# waiting for it to disappear.
satisfied = bool(matched) if condition.present else not bool(matched)
if satisfied:
return {
"satisfied": True,
"kind": condition.kind,
# NOTE(review): matching uses condition.match but the response reports
# condition.mode -- verify WaitTextCondition defines both attributes.
"mode": condition.mode,
"polls": polls,
"region": region,
"matches": matched,
"screen": screen_selection,
"display": mon,
}
else:
raise HTTPException(status_code=400, detail="unsupported wait condition")
if time.time() > deadline:
# The timeout payload of this loop carries only kind and poll count.
return {
"satisfied": False,
"kind": condition.kind,
"polls": polls,
}
time.sleep(req.poll_interval_ms / 1000.0)
def _pick_shell(explicit_shell: str | None) -> str:
shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
if shell_name not in {"powershell", "bash", "cmd"}:
raise HTTPException(status_code=400, detail="unsupported shell")
return shell_name
def _truncate_text(text: str, limit: int) -> tuple[str, bool]:
    """Cut ``text`` down to ``limit`` characters.

    Returns the (possibly shortened) text together with a flag that is True
    only when characters were actually dropped.
    """
    too_long = len(text) > limit
    clipped = text[:limit] if too_long else text
    return clipped, too_long
def _resolve_exec_program(shell_name: str, command: str) -> list[str]:
    """Build the argv list that runs ``command`` under ``shell_name``.

    Raises:
        HTTPException: 400 when ``shell_name`` is not powershell, bash or cmd.
    """
    launchers = {
        "powershell": ["powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command"],
        "bash": ["bash", "-lc"],
        "cmd": ["cmd", "/c"],
    }
    launcher = launchers.get(shell_name)
    if launcher is None:
        raise HTTPException(status_code=400, detail="unsupported shell")
    # The command is always passed as one trailing argument, never split.
    return [*launcher, command]
def _decode_captured_output(stream) -> str:
    """Return output captured from a subprocess as ``str``.

    ``subprocess.TimeoutExpired`` carries raw ``bytes`` (or ``None``) for the
    captured streams even when the process was started with ``text=True``.
    Passing such a value to ``str()`` would produce a ``"b'...'"`` repr, so
    bytes are decoded explicitly here.
    """
    if stream is None:
        return ""
    if isinstance(stream, (bytes, bytearray)):
        import locale

        # Use the same encoding text mode applies on the success path.
        return bytes(stream).decode(locale.getpreferredencoding(False), errors="replace")
    return str(stream)


def _exec_command(req: ExecRequest) -> dict:
    """Run ``req.command`` in the selected shell and report the outcome.

    The effective timeout is ``req.timeout_s`` (or the configured default)
    capped at the configured maximum.  When dry-run is active, either globally
    or on the request, nothing is executed and only the resolved invocation is
    returned.

    Returns:
        A dict describing the invocation and, for real runs, ``timed_out``,
        ``duration_ms``, ``exit_code`` (``None`` on timeout) and the captured
        ``stdout``/``stderr`` together with their truncation flags.

    Raises:
        HTTPException: 403 when exec is disabled or no exec secret is
            configured; 400 when ``req.cwd`` is not an existing directory,
            the shell is unsupported or the shell executable is missing.
    """
    if not SETTINGS["exec_enabled"]:
        raise HTTPException(status_code=403, detail="exec endpoint disabled")
    if not SETTINGS["exec_secret"]:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    run_dry = SETTINGS["dry_run"] or req.dry_run
    shell_name = _pick_shell(req.shell)
    timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"]
    timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"])

    cwd = None
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")

    argv = _resolve_exec_program(shell_name, req.command)
    if run_dry:
        return {
            "executed": False,
            "dry_run": True,
            "shell": shell_name,
            "command": req.command,
            "argv": argv,
            "timeout_s": timeout_s,
            "cwd": cwd,
        }

    # Monotonic clock: the reported duration must not be affected by
    # wall-clock adjustments while the command runs.
    start = time.monotonic()
    try:
        completed = subprocess.run(
            argv,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        timed_out = True
        exit_code = None
        # Partial output on timeout may be bytes despite text=True.
        raw_stdout = _decode_captured_output(exc.stdout)
        raw_stderr = _decode_captured_output(exc.stderr)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc
    else:
        timed_out = False
        exit_code = completed.returncode
        raw_stdout = completed.stdout or ""
        raw_stderr = completed.stderr or ""

    duration_ms = int((time.monotonic() - start) * 1000)
    max_chars = SETTINGS["exec_max_output_chars"]
    stdout, stdout_truncated = _truncate_text(raw_stdout, max_chars)
    stderr, stderr_truncated = _truncate_text(raw_stderr, max_chars)
    return {
        "executed": True,
        "timed_out": timed_out,
        "shell": shell_name,
        "command": req.command,
        "argv": argv,
        "timeout_s": timeout_s,
        "cwd": cwd,
        "duration_ms": duration_ms,
        "exit_code": exit_code,
        "stdout": stdout,
        "stderr": stderr,
        "stdout_truncated": stdout_truncated,
        "stderr_truncated": stderr_truncated,
    }
def _exec_action(req: ActionRequest, screen: int = 0) -> dict:
    """Validate and perform one input action (pointer, scroll, type, hotkey).

    The request is validated the same way for dry runs and real runs, so a
    dry run fails for exactly the requests a real run would reject.  The
    input library is only imported, and input is only sent, when dry-run is
    off both globally and on the request.

    Returns:
        A dict with the ``action`` name, ``executed``/``dry_run`` flags, the
        screen selection, the selected display and the resolved target (or
        ``None`` when the request had no target).

    Raises:
        HTTPException: 400 when a pointer or scroll action has no target, or
            a hotkey action has no keys.
    """
    run_dry = SETTINGS["dry_run"] or req.dry_run
    selected_display, _displays, screen_selection = _select_display(screen)
    pyautogui = None if run_dry else _import_input_lib()

    resolved_target = None
    if req.target is not None:
        x, y, info = _resolve_target(req.target)
        _enforce_allowed_region(x, y)
        resolved_target = {"x": x, "y": y, "target_info": info}

    duration_sec = req.duration_ms / 1000.0

    if req.action in {"move", "click", "right_click", "double_click", "middle_click"} and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for pointer actions")
    if req.action == "scroll" and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for scroll")
    # Checked up front (not only on real execution) so dry runs reject it too;
    # ``not req.keys`` also covers a missing (None) key list.
    if req.action == "hotkey" and not req.keys:
        raise HTTPException(status_code=400, detail="keys is required for hotkey")

    if not run_dry:
        if req.action == "move":
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
        elif req.action == "click":
            pyautogui.click(
                x=resolved_target["x"],
                y=resolved_target["y"],
                clicks=req.clicks,
                interval=req.interval_ms / 1000.0,
                button=req.button,
                duration=duration_sec,
            )
        elif req.action == "right_click":
            pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec)
        elif req.action == "double_click":
            pyautogui.doubleClick(x=resolved_target["x"], y=resolved_target["y"], interval=req.interval_ms / 1000.0)
        elif req.action == "middle_click":
            pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec)
        elif req.action == "scroll":
            # Move the pointer to the target first, then scroll there.
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
            pyautogui.scroll(req.scroll_amount)
        elif req.action == "type":
            pyautogui.write(req.text, interval=req.interval_ms / 1000.0)
        elif req.action == "hotkey":
            pyautogui.hotkey(*req.keys)

    return {
        "action": req.action,
        "executed": not run_dry,
        "dry_run": run_dry,
        "screen": screen_selection,
        "display": selected_display,
        "resolved_target": resolved_target,
    }
@app.get("/health")
def health(_: None = Depends(_auth)):
    """Report that the service is up, along with its active configuration."""
    exec_info = {
        "enabled": SETTINGS["exec_enabled"],
        # Only whether a secret exists is exposed, never the secret itself.
        "secret_configured": bool(SETTINGS["exec_secret"]),
        "default_shell": SETTINGS["exec_default_shell"],
        "default_timeout_s": SETTINGS["exec_default_timeout_s"],
        "max_timeout_s": SETTINGS["exec_max_timeout_s"],
    }
    payload = {
        "ok": True,
        "service": "clickthrough",
        "version": app.version,
        "time_ms": _now_ms(),
        "request_id": _request_id(),
        "dry_run": SETTINGS["dry_run"],
        "allowed_region": SETTINGS["allowed_region"],
        "exec": exec_info,
    }
    return payload
@app.get("/displays")
def displays(_: None = Depends(_auth)):
    """Return the displays reported by ``_get_displays``; screen 0 is the default."""
    found = _get_displays()
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["displays"] = found
    envelope["default_screen"] = 0
    return envelope
@app.get("/screen")
def screen(
    with_grid: bool = True,
    grid_rows: int = SETTINGS["default_grid_rows"],
    grid_cols: int = SETTINGS["default_grid_cols"],
    include_labels: bool = True,
    image_format: Literal["png", "jpeg"] = "png",
    jpeg_quality: int = 85,
    asImage: bool = False,
    screen: int = 0,
    _: None = Depends(_auth),
):
    """Capture the selected display, optionally with a grid drawn over it.

    The query parameters are funnelled through ``ScreenRequest`` before use.
    With ``asImage`` the raw encoded image is returned as the response body;
    otherwise the image is returned base64-encoded inside a JSON envelope
    together with capture (and grid) metadata.
    """
    options = ScreenRequest(
        with_grid=with_grid,
        grid_rows=grid_rows,
        grid_cols=grid_cols,
        include_labels=include_labels,
        image_format=image_format,
        jpeg_quality=jpeg_quality,
    )
    captured, monitor, all_displays, selection = _capture_screen(screen)
    meta = {"region": monitor, "screen": selection, "displays": all_displays}

    rendered = captured
    if options.with_grid:
        rendered, grid_meta = _draw_grid(
            captured,
            monitor["x"],
            monitor["y"],
            options.grid_rows,
            options.grid_cols,
            options.include_labels,
        )
        meta.update(grid_meta)

    if asImage:
        # Raw image response: metadata is not included in this mode.
        raw = _serialize_image(rendered, options.image_format, options.jpeg_quality)
        mime = "image/png"
        if options.image_format == "jpeg":
            mime = "image/jpeg"
        return Response(content=raw, media_type=mime)

    b64 = _encode_image(rendered, options.image_format, options.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": options.image_format,
            "base64": b64,
            "width": rendered.size[0],
            "height": rendered.size[1],
        },
        "meta": meta,
    }
@app.post("/zoom")
def zoom(req: ZoomRequest, asImage: bool = False, screen: int = 0, _: None = Depends(_auth)):
    """Return a crop of the selected display around a requested centre point.

    ``req.center_x``/``req.center_y`` are converted to coordinates relative to
    the captured monitor by subtracting the monitor origin.  The crop window
    is clamped to the captured image, so near an edge the result can be
    smaller than ``req.width`` x ``req.height``.  With ``asImage`` the raw
    encoded image is returned; otherwise a JSON envelope with the base64 image
    and metadata (including the final crop region) is returned.

    Raises:
        HTTPException: 400 when the clamped crop window is empty, e.g. because
            the centre lies beyond the right or bottom edge of the screen.
    """
    base_img, mon, displays, screen_selection = _capture_screen(screen)
    img_w, img_h = base_img.size

    cx = req.center_x - mon["x"]
    cy = req.center_y - mon["y"]
    half_w = req.width // 2
    half_h = req.height // 2
    left = max(0, cx - half_w)
    top = max(0, cy - half_h)
    right = min(img_w, left + req.width)
    bottom = min(img_h, top + req.height)

    # Without this guard an out-of-range centre yields left >= right (or
    # top >= bottom) and the crop/encode step fails with an unhandled error.
    if right <= left or bottom <= top:
        raise HTTPException(status_code=400, detail="zoom region is empty or outside the selected screen")

    crop = base_img.crop((left, top, right, bottom))
    region_x = mon["x"] + left
    region_y = mon["y"] + top
    meta = {
        "source_monitor": mon,
        "screen": screen_selection,
        "displays": displays,
        "region": {
            "x": region_x,
            "y": region_y,
            "width": crop.size[0],
            "height": crop.size[1],
        },
    }

    out_img = crop
    if req.with_grid:
        out_img, grid_meta = _draw_grid(crop, region_x, region_y, req.grid_rows, req.grid_cols, req.include_labels)
        meta.update(grid_meta)

    if asImage:
        image_bytes = _serialize_image(out_img, req.image_format, req.jpeg_quality)
        media_type = "image/jpeg" if req.image_format == "jpeg" else "image/png"
        return Response(content=image_bytes, media_type=media_type)

    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }
@app.post("/action")
def action(req: ActionRequest, screen: int = 0, _: None = Depends(_auth)):
    """Perform one input action and return its result in the usual envelope."""
    # Run the action first so the reported timestamp is taken afterwards.
    outcome = _exec_action(req, screen)
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/exec")
def exec_command(
    req: ExecRequest,
    x_clickthrough_exec_secret: Optional[str] = Header(default=None),
    _: None = Depends(_auth),
):
    """Run a shell command after checking the dedicated exec secret header.

    On top of the regular auth dependency, the caller must present the
    configured exec secret in the ``x_clickthrough_exec_secret`` header
    parameter.

    Raises:
        HTTPException: 403 when no exec secret is configured; 401 when the
            header is missing or does not match.
    """
    expected = SETTINGS["exec_secret"]
    if not expected:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    supplied = x_clickthrough_exec_secret or ""
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters, which would turn a bad header into a 500.  Comparing the
    # UTF-8 bytes keeps the constant-time check and always yields a 401.
    if not supplied or not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid exec secret")

    result = _exec_command(req)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": result,
    }
@app.get("/windows")
def windows(
    title_contains: str | None = None,
    title_regex: str | None = None,
    process_name: str | None = None,
    hwnd: int | None = None,
    visible_only: bool = True,
    _: None = Depends(_auth),
):
    """List the windows that match the given query-string filters."""
    filters = {
        "title_contains": title_contains,
        "title_regex": title_regex,
        "process_name": process_name,
        "hwnd": hwnd,
        "visible_only": visible_only,
    }
    found = _list_windows(WindowQuery(**filters))
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["windows"] = found
    envelope["count"] = len(found)
    return envelope
@app.post("/windows/action")
def window_action(req: WindowActionRequest, _: None = Depends(_auth)):
    """Apply the requested window action and return its result."""
    outcome = _apply_window_action(req)
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/launch")
def launch(req: LaunchRequest, _: None = Depends(_auth)):
    """Hand the launch request to ``_launch_app`` and return its result."""
    outcome = _launch_app(req)
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/wait")
def wait(req: WaitRequest, screen: int = 0, _: None = Depends(_auth)):
    """Block until the requested condition holds or its timeout expires.

    ``ok`` mirrors the ``satisfied`` flag of the wait result, so a timeout is
    reported as ``ok: False`` rather than as an HTTP error.
    """
    outcome = _wait_for_condition(req, screen)
    envelope = {"ok": outcome.get("satisfied", False)}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/vision/diff")
def vision_diff(req: VisionDiffRequest, screen: int = 0, _: None = Depends(_auth)):
    """Return the result of ``_compute_visual_diff`` for the request."""
    outcome = _compute_visual_diff(req, screen)
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/vision/stability")
def vision_stability(req: VisionStabilityRequest, screen: int = 0, _: None = Depends(_auth)):
    """Return the result of ``_measure_stability`` for the request."""
    outcome = _measure_stability(req, screen)
    envelope = {"ok": True}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/action/verify")
def action_verify(req: VerifyActionRequest, screen: int = 0, _: None = Depends(_auth)):
    """Run an action with verification; ``ok`` mirrors the run's ``success`` flag."""
    outcome = _run_verified_action(req, screen)
    envelope = {"ok": outcome.get("success", False)}
    envelope["request_id"] = _request_id()
    envelope["time_ms"] = _now_ms()
    envelope["result"] = outcome
    return envelope
@app.post("/ocr")
def ocr(req: OCRRequest, screen: int = 0, _: None = Depends(_auth)):
    """Run OCR on the source selected by the request and return the text blocks.

    When the source is ``"image"`` no coordinate offset is applied and the
    ``screen``/``display`` fields of the result are ``None``; for any other
    source the region origin is passed to ``_run_ocr`` as the offset.
    """
    image, region, mon, _all_displays, screen_selection, source = _capture_ocr_source(req, screen)
    from_screen = source != "image"
    offset_x, offset_y = (region["x"], region["y"]) if from_screen else (0, 0)
    blocks = _run_ocr(image, req.language_hint, req.min_confidence, offset_x, offset_y)

    details = {
        "mode": source,
        "screen": screen_selection if from_screen else None,
        "display": mon if from_screen else None,
        "language_hint": req.language_hint,
        "min_confidence": req.min_confidence,
        "region": region,
        "blocks": blocks,
    }
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": details,
    }
@app.post("/ocr/find")
def ocr_find(req: OCRFindRequest, screen: int = 0, _: None = Depends(_auth)):
    """Run OCR on the selected source and return the blocks matching ``req.query``.

    When the source is ``"image"`` no coordinate offset is applied and the
    ``screen``/``display`` fields of the result are ``None``; for any other
    source the region origin is passed to ``_run_ocr`` as the offset.
    """
    image, region, mon, _all_displays, screen_selection, source = _capture_ocr_source(req, screen)
    from_screen = source != "image"
    offset_x, offset_y = (region["x"], region["y"]) if from_screen else (0, 0)
    blocks = _run_ocr(image, req.language_hint, req.min_confidence, offset_x, offset_y)
    hits = _find_text_matches(blocks, req.query, req.match, req.group_lines, req.max_results)

    details = {
        "mode": source,
        "screen": screen_selection if from_screen else None,
        "display": mon if from_screen else None,
        "language_hint": req.language_hint,
        "min_confidence": req.min_confidence,
        "query": req.query,
        "match": req.match,
        "group_lines": req.group_lines,
        "region": region,
        "matches": hits,
        "match_count": len(hits),
        # Number of OCR blocks searched, before matching and result limits.
        "blocks_considered": len(blocks),
    }
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": details,
    }
@app.post("/batch")
def batch(req: BatchRequest, screen: int = 0, _: None = Depends(_auth)):
    """Execute ``req.actions`` in order and report a per-item outcome.

    A failing action is recorded with its error text instead of aborting the
    request; with ``req.stop_on_error`` the remaining actions are skipped.
    ``ok`` is True only if every recorded item succeeded.
    """
    outcomes = []
    for position, step in enumerate(req.actions):
        try:
            step_result = _exec_action(step, screen)
        except Exception as exc:
            outcomes.append({"index": position, "ok": False, "error": str(exc)})
            if req.stop_on_error:
                break
        else:
            outcomes.append({"index": position, "ok": True, "result": step_result})

    every_step_ok = all(entry["ok"] for entry in outcomes)
    return {
        "ok": every_step_ok,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "results": outcomes,
    }
if __name__ == "__main__":
    # Script entry point: serve the app on the configured host and port.
    # uvicorn is imported here, so it is only loaded when run as a script.
    import uvicorn

    uvicorn.run(
        "server.app:app",
        host=SETTINGS["host"],
        port=SETTINGS["port"],
        reload=False,
    )