"""Clickthrough: an HTTP service for screen capture and input automation.

The FastAPI application defined here exposes endpoints to capture the
screen (optionally overlaid with a labelled grid), zoom into a region,
perform pointer/keyboard actions through ``pyautogui``, run shell commands,
and execute batches of actions.  Configuration is read from environment
variables (optionally loaded from a local ``.env`` file) at import time.
"""

import base64
import io
import locale
import os
import secrets
import subprocess
import time
import uuid
from typing import Literal, Optional

from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Response
from pydantic import BaseModel, Field, ValidationError, model_validator

load_dotenv(dotenv_path=".env", override=False)

app = FastAPI(title="clickthrough", version="0.1.0")


def _env_bool(name: str, default: bool) -> bool:
    """Read environment variable *name* as a boolean.

    Returns *default* when the variable is unset; otherwise returns True
    only for the (case-insensitive) values ``1``, ``true``, ``yes``, ``on``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]:
    """Parse ``CLICKTHROUGH_ALLOWED_REGION`` as ``x,y,width,height``.

    Returns:
        ``None`` when the variable is unset or empty, otherwise the tuple
        ``(x, y, width, height)``.

    Raises:
        ValueError: if the value does not have exactly four integer parts
            or if width/height is not positive.
    """
    raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION")
    if not raw:
        return None
    parts = [p.strip() for p in raw.split(",")]
    if len(parts) != 4:
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height")
    try:
        x, y, w, h = (int(p) for p in parts)
    except ValueError as exc:
        # Re-raise with the variable name so a misconfiguration is obvious.
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION must contain integers: x,y,width,height") from exc
    if w <= 0 or h <= 0:
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0")
    return x, y, w, h


# NOTE(review): exec is enabled by default and an empty token disables
# authentication entirely (see ``_auth``), so with default settings the
# /exec endpoint runs arbitrary commands for any caller that can reach the
# host/port.  Confirm this default is intended for the deployment.
SETTINGS = {
    "host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"),
    "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")),
    "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(),
    "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False),
    "default_grid_rows": int(os.getenv("CLICKTHROUGH_GRID_ROWS", "12")),
    "default_grid_cols": int(os.getenv("CLICKTHROUGH_GRID_COLS", "12")),
    "allowed_region": _parse_allowed_region(),
    "exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True),
    "exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(),
    "exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")),
    "exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")),
    "exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")),
}

# Actions that cannot run without a resolved screen coordinate.
_POINTER_ACTIONS = frozenset({"move", "click", "right_click", "double_click", "middle_click"})


class ScreenRequest(BaseModel):
    """Validated options for a full-screen capture."""

    with_grid: bool = True
    grid_rows: int = Field(default=SETTINGS["default_grid_rows"], ge=1, le=200)
    grid_cols: int = Field(default=SETTINGS["default_grid_cols"], ge=1, le=200)
    include_labels: bool = True
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=85, ge=1, le=100)


class ZoomRequest(BaseModel):
    """Options for capturing a rectangle around a centre point."""

    center_x: int = Field(ge=0)
    center_y: int = Field(ge=0)
    width: int = Field(default=500, ge=10)
    height: int = Field(default=350, ge=10)
    with_grid: bool = True
    grid_rows: int = Field(default=20, ge=1, le=300)
    grid_cols: int = Field(default=20, ge=1, le=300)
    include_labels: bool = True
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=90, ge=1, le=100)


class PixelTarget(BaseModel):
    """A target given as a pixel coordinate plus an integer offset."""

    mode: Literal["pixel"]
    x: int
    y: int
    dx: int = 0
    dy: int = 0


class GridTarget(BaseModel):
    """A target given as a cell of a grid laid over a screen region.

    ``dx``/``dy`` shift the point inside the cell: 0 is the cell centre and
    -1/+1 are the cell edges (see ``_resolve_target``).
    """

    mode: Literal["grid"]
    region_x: int
    region_y: int
    region_width: int = Field(gt=0)
    region_height: int = Field(gt=0)
    rows: int = Field(gt=0)
    cols: int = Field(gt=0)
    row: int = Field(ge=0)
    col: int = Field(ge=0)
    dx: float = 0.0
    dy: float = 0.0

    @model_validator(mode="after")
    def _validate_indices(self):
        """Reject cells outside the grid and offsets outside [-1, 1]."""
        if self.row >= self.rows or self.col >= self.cols:
            raise ValueError("row/col must be inside rows/cols")
        if not -1.0 <= self.dx <= 1.0:
            raise ValueError("dx must be in [-1, 1]")
        if not -1.0 <= self.dy <= 1.0:
            raise ValueError("dy must be in [-1, 1]")
        return self


Target = PixelTarget | GridTarget


class ActionRequest(BaseModel):
    """A single pointer or keyboard action."""

    action: Literal[
        "move",
        "click",
        "right_click",
        "double_click",
        "middle_click",
        "scroll",
        "type",
        "hotkey",
    ]
    target: Optional[Target] = None
    duration_ms: int = Field(default=0, ge=0, le=20000)
    button: Literal["left", "right", "middle"] = "left"
    clicks: int = Field(default=1, ge=1, le=10)
    scroll_amount: int = 0
    text: str = ""
    keys: list[str] = Field(default_factory=list)
    interval_ms: int = Field(default=20, ge=0, le=5000)
    dry_run: bool = False


class BatchRequest(BaseModel):
    """An ordered list of actions to run in one request."""

    actions: list[ActionRequest] = Field(min_length=1, max_length=100)
    stop_on_error: bool = True


class ExecRequest(BaseModel):
    """A shell command to run on the host."""

    command: str = Field(min_length=1, max_length=10000)
    shell: Literal["powershell", "bash", "cmd"] | None = None
    timeout_s: int | None = Field(default=None, ge=1, le=600)
    cwd: str | None = None
    dry_run: bool = False


def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
    """FastAPI dependency that checks the ``X-Clickthrough-Token`` header.

    When no token is configured every request is accepted.

    Raises:
        HTTPException: 401 when a token is configured and the header is
            missing or does not match.
    """
    token = SETTINGS["token"]
    if not token:
        return
    supplied = x_clickthrough_token or ""
    # Constant-time comparison so response timing does not leak how much of
    # the token matched; compare bytes because compare_digest rejects
    # non-ASCII str arguments.
    if not secrets.compare_digest(supplied.encode("utf-8"), token.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")


def _now_ms() -> int:
    """Return the current wall-clock time in milliseconds since the epoch."""
    return int(time.time() * 1000)


def _request_id() -> str:
    """Return a fresh random UUID4 string."""
    return str(uuid.uuid4())


def _import_capture_libs():
    """Lazily import Pillow and mss.

    Returns:
        The tuple ``(Image, ImageDraw, mss)``.

    Raises:
        HTTPException: 500 if either library cannot be imported.
    """
    try:
        from PIL import Image, ImageDraw
        import mss

        return Image, ImageDraw, mss
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc


def _capture_screen():
    """Grab monitor 1 and return ``(image, geometry)``.

    ``geometry`` is a dict with the monitor's ``x``, ``y``, ``width`` and
    ``height`` as reported by mss.
    """
    Image, _, mss = _import_capture_libs()
    with mss.mss() as sct:
        mon = sct.monitors[1]
        shot = sct.grab(mon)
        image = Image.frombytes("RGB", shot.size, shot.rgb)
    return image, {"x": mon["left"], "y": mon["top"], "width": mon["width"], "height": mon["height"]}


def _serialize_image(image, image_format: str, jpeg_quality: int) -> bytes:
    """Encode *image* as JPEG (when *image_format* is ``"jpeg"``) or PNG."""
    buf = io.BytesIO()
    if image_format == "jpeg":
        image.save(buf, format="JPEG", quality=jpeg_quality)
    else:
        image.save(buf, format="PNG")
    return buf.getvalue()


def _encode_image(image, image_format: str, jpeg_quality: int) -> str:
    """Return the serialized image as an ASCII base64 string."""
    return base64.b64encode(_serialize_image(image, image_format, jpeg_quality)).decode("ascii")


def _draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool):
    """Draw a red grid (and optional ``row,col`` labels) on a copy of *image*.

    Args:
        image: the source image; it is not modified.
        region_x, region_y: screen coordinates of the image's top-left
            corner, reported back in the metadata.
        rows, cols: grid dimensions.
        include_labels: draw a yellow ``row,col`` label near each cell centre.

    Returns:
        ``(annotated_image, meta)`` where ``meta`` describes the region, the
        cell size and the formula for converting a cell to a pixel.
    """
    _, ImageDraw, _ = _import_capture_libs()
    out = image.copy()
    draw = ImageDraw.Draw(out)
    w, h = out.size
    cell_w = w / cols
    cell_h = h / rows

    # Interior lines only; the outer border is drawn as a rectangle below.
    for c in range(1, cols):
        x = int(round(c * cell_w))
        draw.line([(x, 0), (x, h)], fill=(255, 0, 0), width=1)
    for r in range(1, rows):
        y = int(round(r * cell_h))
        draw.line([(0, y), (w, y)], fill=(255, 0, 0), width=1)
    draw.rectangle([(0, 0), (w - 1, h - 1)], outline=(255, 0, 0), width=2)

    if include_labels:
        for r in range(rows):
            for c in range(cols):
                cx = int((c + 0.5) * cell_w)
                cy = int((r + 0.5) * cell_h)
                label = f"{r},{c}"
                draw.text((cx - 12, cy - 6), label, fill=(255, 255, 0))

    meta = {
        "region": {"x": region_x, "y": region_y, "width": w, "height": h},
        "grid": {
            "rows": rows,
            "cols": cols,
            "cell_width": cell_w,
            "cell_height": cell_h,
            "indexing": "zero-based",
            "point_formula": {
                "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)",
                "pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)",
                "dx_range": "[-1,1]",
                "dy_range": "[-1,1]",
            },
        },
    }
    return out, meta


def _resolve_target(target: Target) -> tuple[int, int, dict]:
    """Convert a pixel or grid target into ``(x, y, info)``.

    For grid targets the point is the cell centre shifted by half a cell
    times ``dx``/``dy``, rounded to the nearest integer pixel.
    """
    if isinstance(target, PixelTarget):
        x = target.x + target.dx
        y = target.y + target.dy
        return x, y, {"mode": "pixel", "source": target.model_dump()}

    cell_w = target.region_width / target.cols
    cell_h = target.region_height / target.rows
    x = target.region_x + int(round((target.col + 0.5 + (target.dx * 0.5)) * cell_w))
    y = target.region_y + int(round((target.row + 0.5 + (target.dy * 0.5)) * cell_h))
    return x, y, {
        "mode": "grid",
        "source": target.model_dump(),
        "derived": {"cell_width": cell_w, "cell_height": cell_h},
    }


def _enforce_allowed_region(x: int, y: int):
    """Raise HTTP 403 if ``(x, y)`` lies outside the configured region.

    Does nothing when no allowed region is configured.
    """
    region = SETTINGS["allowed_region"]
    if region is None:
        return
    rx, ry, rw, rh = region
    if not (rx <= x < rx + rw and ry <= y < ry + rh):
        raise HTTPException(status_code=403, detail="point outside allowed region")


def _import_input_lib():
    """Lazily import pyautogui with its fail-safe enabled.

    Raises:
        HTTPException: 500 if pyautogui cannot be imported.
    """
    try:
        import pyautogui

        pyautogui.FAILSAFE = True
        return pyautogui
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc


def _pick_shell(explicit_shell: str | None) -> str:
    """Return the shell to use: the explicit one, else the configured default.

    Raises:
        HTTPException: 400 if the resulting name is not a supported shell.
    """
    shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
    if shell_name not in {"powershell", "bash", "cmd"}:
        raise HTTPException(status_code=400, detail="unsupported shell")
    return shell_name


def _truncate_text(text: str, limit: int) -> tuple[str, bool]:
    """Cut *text* to at most *limit* characters; also report whether it was cut."""
    if len(text) <= limit:
        return text, False
    return text[:limit], True


def _coerce_output(value) -> str:
    """Normalise captured process output to ``str``.

    ``subprocess.TimeoutExpired`` may carry ``bytes`` (or ``None``) even when
    the process was started with ``text=True``, so bytes are decoded with the
    locale's preferred encoding instead of being passed through ``str()``,
    which would yield a ``"b'...'"`` representation.
    """
    if value is None:
        return ""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode(locale.getpreferredencoding(False), errors="replace")
    return str(value)


def _resolve_exec_program(shell_name: str, command: str) -> list[str]:
    """Build the argv list that runs *command* in the given shell.

    Raises:
        HTTPException: 400 for an unsupported shell name.
    """
    if shell_name == "powershell":
        return [
            "powershell",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-Command",
            command,
        ]
    if shell_name == "bash":
        return ["bash", "-lc", command]
    if shell_name == "cmd":
        return ["cmd", "/c", command]
    raise HTTPException(status_code=400, detail="unsupported shell")


def _exec_command(req: ExecRequest) -> dict:
    """Run (or dry-run) a shell command and return a result dict.

    The timeout is the request's value, or the configured default, capped at
    the configured maximum.  Captured stdout/stderr are truncated to the
    configured maximum length.

    Raises:
        HTTPException: 403 when exec is disabled; 400 for an unsupported
            shell, an invalid ``cwd`` or a missing shell executable.
    """
    if not SETTINGS["exec_enabled"]:
        raise HTTPException(status_code=403, detail="exec endpoint disabled")

    run_dry = SETTINGS["dry_run"] or req.dry_run
    shell_name = _pick_shell(req.shell)
    timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"]
    timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"])

    cwd = None
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")

    argv = _resolve_exec_program(shell_name, req.command)

    if run_dry:
        return {
            "executed": False,
            "dry_run": True,
            "shell": shell_name,
            "command": req.command,
            "argv": argv,
            "timeout_s": timeout_s,
            "cwd": cwd,
        }

    # Monotonic clock: the measured duration must not be affected by
    # wall-clock adjustments while the command runs.
    start = time.monotonic()
    try:
        completed = subprocess.run(
            argv,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        timed_out = True
        exit_code = None
        raw_stdout, raw_stderr = exc.stdout, exc.stderr
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc
    else:
        timed_out = False
        exit_code = completed.returncode
        raw_stdout, raw_stderr = completed.stdout, completed.stderr

    limit = SETTINGS["exec_max_output_chars"]
    stdout, stdout_truncated = _truncate_text(_coerce_output(raw_stdout), limit)
    stderr, stderr_truncated = _truncate_text(_coerce_output(raw_stderr), limit)
    return {
        "executed": True,
        "timed_out": timed_out,
        "shell": shell_name,
        "command": req.command,
        "argv": argv,
        "timeout_s": timeout_s,
        "cwd": cwd,
        "duration_ms": int((time.monotonic() - start) * 1000),
        "exit_code": exit_code,
        "stdout": stdout,
        "stderr": stderr,
        "stdout_truncated": stdout_truncated,
        "stderr_truncated": stderr_truncated,
    }


def _exec_action(req: ActionRequest) -> dict:
    """Validate and perform (or dry-run) a single input action.

    All request validation happens before the dry-run check, so a dry run
    reports the same 400/403 errors the real run would.

    Raises:
        HTTPException: 400 when a required target or hotkey ``keys`` is
            missing; 403 when the target lies outside the allowed region;
            500 when the input backend cannot be imported.
    """
    run_dry = SETTINGS["dry_run"] or req.dry_run

    resolved_target = None
    if req.target is not None:
        x, y, info = _resolve_target(req.target)
        _enforce_allowed_region(x, y)
        resolved_target = {"x": x, "y": y, "target_info": info}

    if req.action in _POINTER_ACTIONS and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for pointer actions")
    if req.action == "scroll" and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for scroll")
    if req.action == "hotkey" and not req.keys:
        raise HTTPException(status_code=400, detail="keys is required for hotkey")

    if not run_dry:
        pyautogui = _import_input_lib()
        duration_sec = req.duration_ms / 1000.0
        interval_sec = req.interval_ms / 1000.0

        if req.action == "move":
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
        elif req.action == "click":
            pyautogui.click(
                x=resolved_target["x"],
                y=resolved_target["y"],
                clicks=req.clicks,
                interval=interval_sec,
                button=req.button,
                duration=duration_sec,
            )
        elif req.action == "right_click":
            pyautogui.click(
                x=resolved_target["x"],
                y=resolved_target["y"],
                button="right",
                duration=duration_sec,
            )
        elif req.action == "double_click":
            pyautogui.doubleClick(
                x=resolved_target["x"],
                y=resolved_target["y"],
                interval=interval_sec,
            )
        elif req.action == "middle_click":
            pyautogui.click(
                x=resolved_target["x"],
                y=resolved_target["y"],
                button="middle",
                duration=duration_sec,
            )
        elif req.action == "scroll":
            # Move first so the scroll is applied at the target position.
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
            pyautogui.scroll(req.scroll_amount)
        elif req.action == "type":
            pyautogui.write(req.text, interval=interval_sec)
        elif req.action == "hotkey":
            pyautogui.hotkey(*req.keys)

    return {
        "action": req.action,
        "executed": not run_dry,
        "dry_run": run_dry,
        "resolved_target": resolved_target,
    }


@app.get("/health")
def health(_: None = Depends(_auth)):
    """Report service status and the effective non-secret settings."""
    return {
        "ok": True,
        "service": "clickthrough",
        "version": app.version,
        "time_ms": _now_ms(),
        "request_id": _request_id(),
        "dry_run": SETTINGS["dry_run"],
        "allowed_region": SETTINGS["allowed_region"],
        "exec": {
            "enabled": SETTINGS["exec_enabled"],
            "default_shell": SETTINGS["exec_default_shell"],
            "default_timeout_s": SETTINGS["exec_default_timeout_s"],
            "max_timeout_s": SETTINGS["exec_max_timeout_s"],
        },
    }


@app.get("/screen")
def screen(
    with_grid: bool = True,
    grid_rows: int = SETTINGS["default_grid_rows"],
    grid_cols: int = SETTINGS["default_grid_cols"],
    include_labels: bool = True,
    image_format: Literal["png", "jpeg"] = "png",
    jpeg_quality: int = 85,
    asImage: bool = False,
    _: None = Depends(_auth),
):
    """Capture the screen, optionally with a grid overlay.

    Returns the raw image when ``asImage`` is true, otherwise a JSON payload
    with the base64-encoded image and its metadata.

    Raises:
        HTTPException: 422 when a query value violates the
            ``ScreenRequest`` constraints.
    """
    try:
        req = ScreenRequest(
            with_grid=with_grid,
            grid_rows=grid_rows,
            grid_cols=grid_cols,
            include_labels=include_labels,
            image_format=image_format,
            jpeg_quality=jpeg_quality,
        )
    except ValidationError as exc:
        # A pydantic error raised inside the handler would otherwise surface
        # as an HTTP 500; out-of-range query values are a client error.
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    base_img, mon = _capture_screen()
    meta = {"region": mon}
    out_img = base_img
    if req.with_grid:
        out_img, grid_meta = _draw_grid(
            base_img, mon["x"], mon["y"], req.grid_rows, req.grid_cols, req.include_labels
        )
        meta.update(grid_meta)

    if asImage:
        image_bytes = _serialize_image(out_img, req.image_format, req.jpeg_quality)
        media_type = "image/jpeg" if req.image_format == "jpeg" else "image/png"
        return Response(content=image_bytes, media_type=media_type)

    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }


@app.post("/zoom")
def zoom(req: ZoomRequest, asImage: bool = False, _: None = Depends(_auth)):
    """Capture a rectangle around a centre point, optionally with a grid.

    The rectangle is clamped to the captured image, so it may be smaller
    than requested near an edge; the returned metadata reports the actual
    region.

    Raises:
        HTTPException: 400 when the requested rectangle does not intersect
            the captured monitor.
    """
    base_img, mon = _capture_screen()
    cx = req.center_x - mon["x"]
    cy = req.center_y - mon["y"]
    half_w = req.width // 2
    half_h = req.height // 2
    left = max(0, cx - half_w)
    top = max(0, cy - half_h)
    right = min(base_img.size[0], left + req.width)
    bottom = min(base_img.size[1], top + req.height)
    if right <= left or bottom <= top:
        # A centre far beyond the right/bottom edge yields an empty or
        # inverted box, which cannot be cropped or encoded.
        raise HTTPException(status_code=400, detail="zoom region is outside the captured monitor")

    crop = base_img.crop((left, top, right, bottom))
    region_x = mon["x"] + left
    region_y = mon["y"] + top
    meta = {
        "source_monitor": mon,
        "region": {
            "x": region_x,
            "y": region_y,
            "width": crop.size[0],
            "height": crop.size[1],
        },
    }

    out_img = crop
    if req.with_grid:
        out_img, grid_meta = _draw_grid(
            crop, region_x, region_y, req.grid_rows, req.grid_cols, req.include_labels
        )
        meta.update(grid_meta)

    if asImage:
        image_bytes = _serialize_image(out_img, req.image_format, req.jpeg_quality)
        media_type = "image/jpeg" if req.image_format == "jpeg" else "image/png"
        return Response(content=image_bytes, media_type=media_type)

    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }


@app.post("/action")
def action(req: ActionRequest, _: None = Depends(_auth)):
    """Perform a single input action and return its result."""
    result = _exec_action(req)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": result,
    }


@app.post("/exec")
def exec_command(req: ExecRequest, _: None = Depends(_auth)):
    """Run a shell command and return its result."""
    result = _exec_command(req)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": result,
    }


@app.post("/batch")
def batch(req: BatchRequest, _: None = Depends(_auth)):
    """Run actions in order, recording a per-item success or error.

    A failing item never aborts the request with an error response; it is
    recorded in ``results`` and, when ``stop_on_error`` is set, the
    remaining items are skipped.
    """
    results = []
    for index, item in enumerate(req.actions):
        try:
            item_result = _exec_action(item)
            results.append({"index": index, "ok": True, "result": item_result})
        except Exception as exc:  # deliberate boundary: report per-item failure
            results.append({"index": index, "ok": False, "error": str(exc)})
            if req.stop_on_error:
                break
    return {
        "ok": all(r["ok"] for r in results),
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "results": results,
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("server.app:app", host=SETTINGS["host"], port=SETTINGS["port"], reload=False)