Files
clickthrough/server/app.py
Luna a8f2e01bb9
All checks were successful
python-syntax / syntax-check (pull_request) Successful in 9s
python-syntax / syntax-check (push) Successful in 8s
fix(ocr): allow configuring tesseract path
2026-04-06 19:02:50 +02:00

797 lines
25 KiB
Python

import base64
import hmac
import io
import os
import subprocess
import time
import uuid
from typing import Literal, Optional
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Response
from pydantic import BaseModel, Field, model_validator
# Read a local ".env" file before any CLICKTHROUGH_* setting is looked up.
# override=False is passed so values already present in the environment are
# not replaced by the file.
load_dotenv(dotenv_path=".env", override=False)
# Application object that every route decorator further down registers on.
app = FastAPI(title="clickthrough", version="0.1.0")
def _env_bool(name: str, default: bool) -> bool:
    """Interpret the environment variable *name* as a boolean flag.

    Returns *default* when the variable is not set. Otherwise the value is
    trimmed and lower-cased, and the result is True only for "1", "true",
    "yes" or "on"; any other text yields False.
    """
    value = os.environ.get(name)
    if value is not None:
        normalized = value.strip().lower()
        return normalized in {"1", "true", "yes", "on"}
    return default
def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]:
    """Parse CLICKTHROUGH_ALLOWED_REGION into an ``(x, y, width, height)`` tuple.

    Returns:
        None when the variable is unset or empty, otherwise the four integers
        in the order they were written.

    Raises:
        ValueError: when the value does not have exactly four comma-separated
            fields, a field is not an integer, or width/height is not positive.
    """
    spec = os.environ.get("CLICKTHROUGH_ALLOWED_REGION")
    if spec is None or spec == "":
        return None

    fields = spec.split(",")
    if len(fields) != 4:
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height")

    # Whitespace around each number is tolerated, e.g. "0, 0, 800, 600".
    left, top, width, height = map(int, map(str.strip, fields))
    if min(width, height) <= 0:
        raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0")
    return left, top, width, height
# Process-wide configuration, read once from CLICKTHROUGH_* environment
# variables when this module is imported. The int() conversions and
# _parse_allowed_region() raise ValueError on malformed values, which makes
# the import itself fail.
SETTINGS = {
    # Bind address and port passed to uvicorn in the __main__ entry point.
    "host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"),
    "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")),
    # Shared token checked by _auth; an empty value disables that check.
    "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(),
    # When true, input actions and exec requests are described but not run.
    "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False),
    # Default grid density used by ScreenRequest and GET /screen.
    "default_grid_rows": int(os.getenv("CLICKTHROUGH_GRID_ROWS", "12")),
    "default_grid_cols": int(os.getenv("CLICKTHROUGH_GRID_COLS", "12")),
    # Optional (x, y, width, height) rectangle; None means no restriction.
    "allowed_region": _parse_allowed_region(),
    # /exec switches: master enable flag, default shell, timeout defaults and
    # ceiling (seconds), per-stream output cap (characters), and the secret
    # that must accompany every /exec request.
    "exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True),
    "exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(),
    "exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")),
    "exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")),
    "exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")),
    "exec_secret": os.getenv("CLICKTHROUGH_EXEC_SECRET", "").strip(),
    # Optional tesseract executable path applied in _import_ocr_libs.
    "tesseract_cmd": os.getenv("CLICKTHROUGH_TESSERACT_CMD", "").strip(),
}
class ScreenRequest(BaseModel):
    # Validated options for a full-screen capture; GET /screen builds one of
    # these from its query parameters.
    # Draw the grid overlay on the returned image.
    with_grid: bool = True
    # Grid density; defaults come from SETTINGS, each limited to 1..200.
    grid_rows: int = Field(default=SETTINGS["default_grid_rows"], ge=1, le=200)
    grid_cols: int = Field(default=SETTINGS["default_grid_cols"], ge=1, le=200)
    # Write a "row,col" label in every cell when the grid is drawn.
    include_labels: bool = True
    # Encoding of the returned image; jpeg_quality applies to "jpeg" only.
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=85, ge=1, le=100)
class ZoomRequest(BaseModel):
    # Body of POST /zoom: a window of the captured monitor around a point.
    # Window center; the handler subtracts the monitor origin from these, so
    # they are in the same coordinate space as the monitor's x/y.
    center_x: int = Field(ge=0)
    center_y: int = Field(ge=0)
    # Requested window size in pixels (at least 10 on each side).
    width: int = Field(default=500, ge=10)
    height: int = Field(default=350, ge=10)
    # Grid overlay options for the cropped image, each limited to 1..300.
    with_grid: bool = True
    grid_rows: int = Field(default=20, ge=1, le=300)
    grid_cols: int = Field(default=20, ge=1, le=300)
    include_labels: bool = True
    # Encoding of the returned image; jpeg_quality applies to "jpeg" only.
    image_format: Literal["png", "jpeg"] = "png"
    jpeg_quality: int = Field(default=90, ge=1, le=100)
class PixelTarget(BaseModel):
    # Pointer target given directly as a pixel position.
    # Discriminator value that selects this variant of Target.
    mode: Literal["pixel"]
    # Base coordinates.
    x: int
    y: int
    # Integer offsets added to x / y when the target is resolved.
    dx: int = 0
    dy: int = 0
class GridTarget(BaseModel):
    # Pointer target given as a cell of a rows x cols grid laid over a
    # caller-supplied rectangle.
    # Discriminator value that selects this variant of Target.
    mode: Literal["grid"]
    # Rectangle the grid covers.
    region_x: int
    region_y: int
    region_width: int = Field(gt=0)
    region_height: int = Field(gt=0)
    # Grid dimensions and the zero-based cell that is addressed.
    rows: int = Field(gt=0)
    cols: int = Field(gt=0)
    row: int = Field(ge=0)
    col: int = Field(ge=0)
    # Offset from the cell center in half-cell units: -1 and 1 reach the
    # cell edges, 0 is the center.
    dx: float = 0.0
    dy: float = 0.0

    @model_validator(mode="after")
    def _validate_indices(self):
        """Check that the cell lies inside the grid and the offsets are in [-1, 1]."""
        row_in_grid = self.row < self.rows
        col_in_grid = self.col < self.cols
        if not (row_in_grid and col_in_grid):
            raise ValueError("row/col must be inside rows/cols")
        offset_checks = (
            (self.dx, "dx must be in [-1, 1]"),
            (self.dy, "dy must be in [-1, 1]"),
        )
        for offset, message in offset_checks:
            if not (-1.0 <= offset <= 1.0):
                raise ValueError(message)
        return self
# A pointer target is either an absolute pixel or a cell of a described grid;
# _resolve_target turns both forms into one (x, y) point.
Target = PixelTarget | GridTarget
class ActionRequest(BaseModel):
    # One input action, used by POST /action and as an item of POST /batch.
    # Which operation to perform.
    action: Literal[
        "move",
        "click",
        "right_click",
        "double_click",
        "middle_click",
        "scroll",
        "type",
        "hotkey",
    ]
    # Where to act; _exec_action requires it for pointer actions and scroll.
    target: Optional[Target] = None
    # Pointer travel time in milliseconds (converted to seconds before use).
    duration_ms: int = Field(default=0, ge=0, le=20000)
    # Mouse button and click count; read only by the "click" action.
    button: Literal["left", "right", "middle"] = "left"
    clicks: int = Field(default=1, ge=1, le=10)
    # Amount passed to the scroll call for "scroll".
    scroll_amount: int = 0
    # Text written by the "type" action.
    text: str = ""
    # Key names pressed together by the "hotkey" action.
    keys: list[str] = Field(default_factory=list)
    # Pause in milliseconds between clicks or typed characters.
    interval_ms: int = Field(default=20, ge=0, le=5000)
    # Per-request dry run; the global SETTINGS["dry_run"] also forces it.
    dry_run: bool = False
class BatchRequest(BaseModel):
    # Body of POST /batch: between 1 and 100 actions executed in list order.
    actions: list[ActionRequest] = Field(min_length=1, max_length=100)
    # Stop at the first failing action instead of attempting the rest.
    stop_on_error: bool = True
class ExecRequest(BaseModel):
    # Body of POST /exec: one shell command to run on the host.
    # Command text handed to the chosen shell as a single argument.
    command: str = Field(min_length=1, max_length=10000)
    # Shell to use; None falls back to SETTINGS["exec_default_shell"].
    shell: Literal["powershell", "bash", "cmd"] | None = None
    # Requested timeout in seconds; _exec_command additionally caps it at
    # SETTINGS["exec_max_timeout_s"].
    timeout_s: int | None = Field(default=None, ge=1, le=600)
    # Working directory for the command; it must be an existing directory.
    cwd: str | None = None
    # Per-request dry run; the global SETTINGS["dry_run"] also forces it.
    dry_run: bool = False
class OCRRequest(BaseModel):
    # Body of POST /ocr.
    # Image source: the whole captured monitor, a rectangle of it, or an
    # image uploaded by the caller.
    mode: Literal["screen", "region", "image"] = "screen"
    # Rectangle to read; all four values are mandatory when mode == "region".
    region_x: int | None = Field(default=None, ge=0)
    region_y: int | None = Field(default=None, ge=0)
    region_width: int | None = Field(default=None, gt=0)
    region_height: int | None = Field(default=None, gt=0)
    # Base64 image (optionally a data: URL); mandatory when mode == "image".
    image_base64: str | None = None
    # Forwarded to tesseract as its "lang" argument when set.
    language_hint: str | None = Field(default=None, min_length=1, max_length=64)
    # Words with a confidence (0..1) below this value are dropped.
    min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)

    @model_validator(mode="after")
    def _validate_mode_inputs(self):
        """Require the fields that the selected mode depends on."""
        if self.mode == "region":
            rectangle = (self.region_x, self.region_y, self.region_width, self.region_height)
            if None in rectangle:
                raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
        elif self.mode == "image":
            if not self.image_base64:
                raise ValueError("image_base64 is required for mode=image")
        return self
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
    """Reject the request unless it carries the configured access token.

    Used as a FastAPI dependency by the route handlers. When
    ``SETTINGS["token"]`` is empty the check is skipped and every request
    passes.

    Raises:
        HTTPException: 401 when a token is configured and the
            ``x_clickthrough_token`` header is missing or does not match.
    """
    expected = SETTINGS["token"]
    if not expected:
        return
    supplied = x_clickthrough_token or ""
    # Constant-time comparison, matching how the /exec secret is checked.
    # Both sides are encoded to bytes because compare_digest raises TypeError
    # for str arguments that contain non-ASCII characters.
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")
def _now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    seconds = time.time()
    return int(seconds * 1000)
def _request_id() -> str:
    """Return a fresh random UUID4 in its canonical string form."""
    identifier = uuid.uuid4()
    return str(identifier)
def _import_capture_libs():
    """Import the screenshot/imaging backends on first use.

    Returns:
        The tuple ``(PIL.Image, PIL.ImageDraw, mss)``.

    Raises:
        HTTPException: 500 when either package cannot be imported.
    """
    try:
        from PIL import Image, ImageDraw
        import mss
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc
    return Image, ImageDraw, mss
def _capture_screen():
    """Grab monitor index 1 and return it with its geometry.

    Returns:
        ``(image, geometry)``: an RGB PIL image and a dict with the keys
        "x", "y", "width" and "height" taken from the monitor description.
    """
    Image, _, mss = _import_capture_libs()
    with mss.mss() as grabber:
        monitor = grabber.monitors[1]
        frame = grabber.grab(monitor)
        picture = Image.frombytes("RGB", frame.size, frame.rgb)
        # Rename mss's "left"/"top" to the "x"/"y" used by the API responses.
        key_map = (("x", "left"), ("y", "top"), ("width", "width"), ("height", "height"))
        geometry = {public: monitor[native] for public, native in key_map}
    return picture, geometry
def _serialize_image(image, image_format: str, jpeg_quality: int) -> bytes:
    """Encode *image* to bytes.

    "jpeg" produces a JPEG at *jpeg_quality*; any other *image_format* value
    produces a PNG and ignores *jpeg_quality*.
    """
    if image_format == "jpeg":
        save_options = {"format": "JPEG", "quality": jpeg_quality}
    else:
        save_options = {"format": "PNG"}
    sink = io.BytesIO()
    image.save(sink, **save_options)
    return sink.getvalue()
def _encode_image(image, image_format: str, jpeg_quality: int) -> str:
    """Return *image* serialized by ``_serialize_image`` as a base64 ASCII string."""
    raw = _serialize_image(image, image_format, jpeg_quality)
    return str(base64.b64encode(raw), "ascii")
def _draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool):
    """Return a copy of *image* with a red grid drawn on it, plus grid metadata.

    Args:
        image: PIL image to annotate; the input itself is not modified.
        region_x, region_y: Origin reported in the metadata for this image.
        rows, cols: Number of grid cells along each axis.
        include_labels: When true, write "row,col" near each cell center.

    Returns:
        ``(annotated_image, meta)`` where *meta* holds the region, the cell
        size and the formula for converting a cell to a pixel position.
    """
    _, ImageDraw, _ = _import_capture_libs()
    canvas = image.copy()
    pen = ImageDraw.Draw(canvas)
    width, height = canvas.size
    cell_width = width / cols
    cell_height = height / rows
    red = (255, 0, 0)

    # Interior separators first, then a thicker frame around the whole image.
    for col_index in range(1, cols):
        x_pos = int(round(col_index * cell_width))
        pen.line([(x_pos, 0), (x_pos, height)], fill=red, width=1)
    for row_index in range(1, rows):
        y_pos = int(round(row_index * cell_height))
        pen.line([(0, y_pos), (width, y_pos)], fill=red, width=1)
    pen.rectangle([(0, 0), (width - 1, height - 1)], outline=red, width=2)

    if include_labels:
        cells = ((r, c) for r in range(rows) for c in range(cols))
        for row_index, col_index in cells:
            center_x = int((col_index + 0.5) * cell_width)
            center_y = int((row_index + 0.5) * cell_height)
            # Shift up-left so the text sits roughly on the cell center.
            pen.text((center_x - 12, center_y - 6), f"{row_index},{col_index}", fill=(255, 255, 0))

    point_formula = {
        "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)",
        "pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)",
        "dx_range": "[-1,1]",
        "dy_range": "[-1,1]",
    }
    grid_info = {
        "rows": rows,
        "cols": cols,
        "cell_width": cell_width,
        "cell_height": cell_height,
        "indexing": "zero-based",
        "point_formula": point_formula,
    }
    region_info = {"x": region_x, "y": region_y, "width": width, "height": height}
    return canvas, {"region": region_info, "grid": grid_info}
def _resolve_target(target: Target) -> tuple[int, int, dict]:
    """Convert a pixel or grid target into a concrete ``(x, y)`` point.

    Returns:
        ``(x, y, info)`` where *info* records the mode, the source target as
        a dict and, for grid targets, the derived cell size.
    """
    if isinstance(target, PixelTarget):
        pixel_info = {"mode": "pixel", "source": target.model_dump()}
        return target.x + target.dx, target.y + target.dy, pixel_info

    cell_width = target.region_width / target.cols
    cell_height = target.region_height / target.rows
    # Cell center plus an offset of up to half a cell in either direction.
    col_position = target.col + 0.5 + (target.dx * 0.5)
    row_position = target.row + 0.5 + (target.dy * 0.5)
    point_x = target.region_x + int(round(col_position * cell_width))
    point_y = target.region_y + int(round(row_position * cell_height))
    grid_info = {
        "mode": "grid",
        "source": target.model_dump(),
        "derived": {"cell_width": cell_width, "cell_height": cell_height},
    }
    return point_x, point_y, grid_info
def _enforce_allowed_region(x: int, y: int):
    """Raise HTTP 403 when ``(x, y)`` lies outside the configured region.

    Does nothing when ``SETTINGS["allowed_region"]`` is None. The rectangle
    includes its left/top edge and excludes its right/bottom edge.
    """
    bounds = SETTINGS["allowed_region"]
    if bounds is None:
        return
    origin_x, origin_y, span_w, span_h = bounds
    inside_x = origin_x <= x < origin_x + span_w
    inside_y = origin_y <= y < origin_y + span_h
    if inside_x and inside_y:
        return
    raise HTTPException(status_code=403, detail="point outside allowed region")
def _import_input_lib():
    """Import pyautogui on first use and return the module.

    ``FAILSAFE`` is set to True on the module before it is returned.

    Raises:
        HTTPException: 500 when the import or setup fails.
    """
    try:
        import pyautogui as input_backend

        input_backend.FAILSAFE = True
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc
    return input_backend
def _import_ocr_libs():
    """Import pytesseract on first use and apply the configured binary path.

    Returns:
        ``(pytesseract, Output)``.

    Raises:
        HTTPException: 500 when the import or setup fails.
    """
    try:
        import pytesseract
        from pytesseract import Output

        configured_path = SETTINGS["tesseract_cmd"]
        if configured_path:
            # Point pytesseract at an explicitly configured executable.
            pytesseract.pytesseract.tesseract_cmd = configured_path
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"ocr backend unavailable: {exc}") from exc
    return pytesseract, Output
def _decode_image_base64(value: str):
    """Decode a base64 string, or a ``data:`` URL, into an RGB PIL image.

    Raises:
        HTTPException: 400 when a data URL has no comma, the payload is not
            valid base64, or the decoded bytes cannot be opened as an image.
    """
    Image, _, _ = _import_capture_libs()
    encoded = value.strip()
    if encoded.startswith("data:"):
        # Keep only what follows the first comma of the data URL.
        _header, separator, encoded = encoded.partition(",")
        if not separator:
            raise HTTPException(status_code=400, detail="invalid data URL image payload")

    try:
        raw_bytes = base64.b64decode(encoded, validate=True)
    except Exception as exc:
        raise HTTPException(status_code=400, detail="invalid image_base64 payload") from exc

    try:
        return Image.open(io.BytesIO(raw_bytes)).convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=400, detail="unsupported or unreadable image bytes") from exc
def _run_ocr(image, language_hint: str | None, min_confidence: float, offset_x: int = 0, offset_y: int = 0) -> list[dict]:
    """Run tesseract on *image* and return the recognised text entries.

    Args:
        image: Image passed to ``pytesseract.image_to_data``.
        language_hint: Passed as ``lang`` when truthy.
        min_confidence: Entries with a confidence (0..1) below this are dropped.
        offset_x, offset_y: Added to every bounding-box origin.

    Returns:
        Dicts with "text", "confidence" and "bbox", ordered by y, then x,
        then tesseract's own output order.

    Raises:
        HTTPException: 500 when the tesseract executable is missing, 400 when
            tesseract reports an error.
    """
    pytesseract, Output = _import_ocr_libs()
    call_args = {
        "image": image,
        "output_type": Output.DICT,
        "config": "--oem 3 --psm 6",
    }
    if language_hint:
        call_args["lang"] = language_hint

    try:
        data = pytesseract.image_to_data(**call_args)
    except pytesseract.TesseractNotFoundError as exc:
        raise HTTPException(status_code=500, detail="tesseract executable not found") from exc
    except pytesseract.TesseractError as exc:
        raise HTTPException(status_code=400, detail=f"ocr failed: {exc}") from exc

    ranked = []
    for idx, raw_text in enumerate(data.get("text", [])):
        text = (raw_text or "").strip()
        if not text:
            continue
        try:
            score = float(str(data["conf"][idx]).strip())
        except ValueError:
            # An unparsable confidence is treated like a negative one.
            continue
        if score < 0:
            continue
        confidence = round(score / 100.0, 4)
        if confidence < min_confidence:
            continue

        left = int(data["left"][idx])
        top = int(data["top"][idx])
        width = int(data["width"][idx])
        height = int(data["height"][idx])
        abs_x = left + offset_x
        abs_y = top + offset_y
        entry = {
            "text": text,
            "confidence": confidence,
            "bbox": {"x": abs_x, "y": abs_y, "width": width, "height": height},
        }
        # The sort key is kept beside the entry so it never reaches the output.
        ranked.append(((abs_y, abs_x, idx), entry))

    ranked.sort(key=lambda pair: pair[0])
    return [entry for _, entry in ranked]
def _pick_shell(explicit_shell: str | None) -> str:
shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
if shell_name not in {"powershell", "bash", "cmd"}:
raise HTTPException(status_code=400, detail="unsupported shell")
return shell_name
def _truncate_text(text: str, limit: int) -> tuple[str, bool]:
    """Cut *text* to at most *limit* characters.

    Returns:
        ``(text, truncated)`` where *truncated* is True only when characters
        were removed.
    """
    too_long = len(text) > limit
    return (text[:limit], True) if too_long else (text, False)
def _resolve_exec_program(shell_name: str, command: str) -> list[str]:
    """Build the argv list that runs *command* in the named shell.

    The command is always passed as one final argument.

    Raises:
        HTTPException: 400 when *shell_name* is not a supported shell.
    """
    launchers = {
        "powershell": ["powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command"],
        "bash": ["bash", "-lc"],
        "cmd": ["cmd", "/c"],
    }
    launcher = launchers.get(shell_name)
    if launcher is None:
        raise HTTPException(status_code=400, detail="unsupported shell")
    return [*launcher, command]
def _captured_text(stream) -> str:
    """Return captured subprocess output as ``str``.

    ``subprocess.TimeoutExpired`` can carry ``bytes`` even when the process
    was started with ``text=True``; calling ``str()`` on that would yield the
    ``b'...'`` repr. Bytes are decoded with the locale encoding that text
    mode uses, replacing undecodable bytes.
    """
    if stream is None:
        return ""
    if isinstance(stream, (bytes, bytearray)):
        import locale

        return bytes(stream).decode(locale.getpreferredencoding(False), errors="replace")
    return str(stream)


def _exec_command(req: ExecRequest) -> dict:
    """Run (or, in dry-run mode, only describe) the shell command in *req*.

    The effective timeout is the requested value, or the configured default,
    capped at ``SETTINGS["exec_max_timeout_s"]``. Captured stdout/stderr are
    each cut to ``SETTINGS["exec_max_output_chars"]`` characters.

    Returns:
        A dict describing the invocation. For a dry run it has
        ``executed=False`` and ``dry_run=True``. Otherwise it has
        ``executed=True``, ``timed_out``, ``duration_ms``, ``exit_code``
        (None after a timeout), the captured output and truncation flags.

    Raises:
        HTTPException: 403 when exec is disabled or no exec secret is
            configured; 400 when the shell is unsupported, ``cwd`` is not a
            directory, or the shell executable cannot be found.
    """
    if not SETTINGS["exec_enabled"]:
        raise HTTPException(status_code=403, detail="exec endpoint disabled")
    if not SETTINGS["exec_secret"]:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    run_dry = SETTINGS["dry_run"] or req.dry_run
    shell_name = _pick_shell(req.shell)
    timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"]
    timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"])

    cwd = None
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")

    argv = _resolve_exec_program(shell_name, req.command)
    # Fields shared by the dry-run, timeout and normal results.
    invocation = {
        "shell": shell_name,
        "command": req.command,
        "argv": argv,
        "timeout_s": timeout_s,
        "cwd": cwd,
    }
    if run_dry:
        return {"executed": False, "dry_run": True, **invocation}

    # A monotonic clock keeps the duration correct across wall-clock jumps.
    start = time.monotonic()
    try:
        completed = subprocess.run(
            argv,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        timed_out = True
        exit_code = None
        raw_stdout, raw_stderr = exc.stdout, exc.stderr
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc
    else:
        timed_out = False
        exit_code = completed.returncode
        raw_stdout, raw_stderr = completed.stdout, completed.stderr
    duration_ms = int((time.monotonic() - start) * 1000)

    limit = SETTINGS["exec_max_output_chars"]
    stdout, stdout_truncated = _truncate_text(_captured_text(raw_stdout), limit)
    stderr, stderr_truncated = _truncate_text(_captured_text(raw_stderr), limit)
    return {
        "executed": True,
        "timed_out": timed_out,
        **invocation,
        "duration_ms": duration_ms,
        "exit_code": exit_code,
        "stdout": stdout,
        "stderr": stderr,
        "stdout_truncated": stdout_truncated,
        "stderr_truncated": stderr_truncated,
    }
def _exec_action(req: ActionRequest) -> dict:
    """Validate one input action and perform it unless running dry.

    All request validation happens before anything is executed and applies
    to dry runs too, so a dry run reports the same 400/403 errors the real
    run would.

    Returns:
        A dict with the action name, whether it was executed, the dry-run
        flag and the resolved target (None when no target was given).

    Raises:
        HTTPException: 500 when the input backend cannot be loaded; 403 when
            the resolved point is outside the allowed region; 400 when a
            pointer/scroll action has no target or a hotkey has no keys.
    """
    run_dry = SETTINGS["dry_run"] or req.dry_run
    pyautogui = None if run_dry else _import_input_lib()

    resolved_target = None
    if req.target is not None:
        x, y, info = _resolve_target(req.target)
        _enforce_allowed_region(x, y)
        resolved_target = {"x": x, "y": y, "target_info": info}

    if req.action in {"move", "click", "right_click", "double_click", "middle_click"} and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for pointer actions")
    if req.action == "scroll" and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for scroll")
    # Checked here rather than on the live path only, so dry runs see it too.
    if req.action == "hotkey" and len(req.keys) < 1:
        raise HTTPException(status_code=400, detail="keys is required for hotkey")

    if not run_dry:
        duration_sec = req.duration_ms / 1000.0
        interval_sec = req.interval_ms / 1000.0
        if req.action == "move":
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
        elif req.action == "click":
            pyautogui.click(
                x=resolved_target["x"],
                y=resolved_target["y"],
                clicks=req.clicks,
                interval=interval_sec,
                button=req.button,
                duration=duration_sec,
            )
        elif req.action == "right_click":
            pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec)
        elif req.action == "double_click":
            pyautogui.doubleClick(x=resolved_target["x"], y=resolved_target["y"], interval=interval_sec)
        elif req.action == "middle_click":
            pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec)
        elif req.action == "scroll":
            # Move the pointer over the target first, then scroll.
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
            pyautogui.scroll(req.scroll_amount)
        elif req.action == "type":
            pyautogui.write(req.text, interval=interval_sec)
        elif req.action == "hotkey":
            pyautogui.hotkey(*req.keys)

    return {
        "action": req.action,
        "executed": not run_dry,
        "dry_run": run_dry,
        "resolved_target": resolved_target,
    }
@app.get("/health")
def health(_: None = Depends(_auth)):
    """Report service identity, the current time and the active safety settings."""
    exec_status = {
        "enabled": SETTINGS["exec_enabled"],
        # Only whether a secret exists is exposed, never the secret itself.
        "secret_configured": bool(SETTINGS["exec_secret"]),
        "default_shell": SETTINGS["exec_default_shell"],
        "default_timeout_s": SETTINGS["exec_default_timeout_s"],
        "max_timeout_s": SETTINGS["exec_max_timeout_s"],
    }
    payload = {
        "ok": True,
        "service": "clickthrough",
        "version": app.version,
        "time_ms": _now_ms(),
        "request_id": _request_id(),
        "dry_run": SETTINGS["dry_run"],
        "allowed_region": SETTINGS["allowed_region"],
        "exec": exec_status,
    }
    return payload
@app.get("/screen")
def screen(
    with_grid: bool = True,
    grid_rows: int = SETTINGS["default_grid_rows"],
    grid_cols: int = SETTINGS["default_grid_cols"],
    include_labels: bool = True,
    image_format: Literal["png", "jpeg"] = "png",
    jpeg_quality: int = 85,
    asImage: bool = False,
    _: None = Depends(_auth),
):
    """Capture the monitor, optionally overlay a grid, and return the image.

    With ``asImage`` true the raw image bytes are returned with an image
    content type. Otherwise the response is JSON carrying the base64 image
    and the region/grid metadata.

    Raises:
        HTTPException: 422 when a query value is outside the limits declared
            on ``ScreenRequest`` (for example ``grid_rows=0``).
    """
    from pydantic import ValidationError

    try:
        req = ScreenRequest(
            with_grid=with_grid,
            grid_rows=grid_rows,
            grid_cols=grid_cols,
            include_labels=include_labels,
            image_format=image_format,
            jpeg_quality=jpeg_quality,
        )
    except ValidationError as exc:
        # A ValidationError raised inside a handler would otherwise surface
        # as a 500; bad query values are a client error.
        raise HTTPException(
            status_code=422,
            detail=exc.errors(include_url=False, include_context=False),
        ) from exc

    base_img, mon = _capture_screen()
    meta = {"region": mon}
    out_img = base_img
    if req.with_grid:
        out_img, grid_meta = _draw_grid(base_img, mon["x"], mon["y"], req.grid_rows, req.grid_cols, req.include_labels)
        meta.update(grid_meta)

    if asImage:
        image_bytes = _serialize_image(out_img, req.image_format, req.jpeg_quality)
        media_type = "image/jpeg" if req.image_format == "jpeg" else "image/png"
        return Response(content=image_bytes, media_type=media_type)

    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }
@app.post("/zoom")
def zoom(req: ZoomRequest, asImage: bool = False, _: None = Depends(_auth)):
    """Return a crop of the captured monitor around a center point.

    The window is ``req.width`` x ``req.height`` around the center and is
    clipped to the monitor, so the result can be smaller than requested near
    an edge. With ``asImage`` true the raw image bytes are returned,
    otherwise JSON with the base64 image and region/grid metadata.

    Raises:
        HTTPException: 400 when the window lies entirely outside the
            captured monitor, leaving nothing to crop.
    """
    base_img, mon = _capture_screen()
    # Translate the requested center into image-local coordinates.
    cx = req.center_x - mon["x"]
    cy = req.center_y - mon["y"]
    half_w = req.width // 2
    half_h = req.height // 2
    left = max(0, cx - half_w)
    top = max(0, cy - half_h)
    right = min(base_img.size[0], left + req.width)
    bottom = min(base_img.size[1], top + req.height)
    # A center past the right/bottom edge produces an inverted or empty box;
    # report that as a client error instead of failing inside the crop.
    if right <= left or bottom <= top:
        raise HTTPException(status_code=400, detail="requested zoom region is outside the captured monitor")

    crop = base_img.crop((left, top, right, bottom))
    region_x = mon["x"] + left
    region_y = mon["y"] + top
    meta = {
        "source_monitor": mon,
        "region": {
            "x": region_x,
            "y": region_y,
            "width": crop.size[0],
            "height": crop.size[1],
        },
    }
    out_img = crop
    if req.with_grid:
        out_img, grid_meta = _draw_grid(crop, region_x, region_y, req.grid_rows, req.grid_cols, req.include_labels)
        meta.update(grid_meta)

    if asImage:
        image_bytes = _serialize_image(out_img, req.image_format, req.jpeg_quality)
        media_type = "image/jpeg" if req.image_format == "jpeg" else "image/png"
        return Response(content=image_bytes, media_type=media_type)

    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }
@app.post("/action")
def action(req: ActionRequest, _: None = Depends(_auth)):
    """Execute a single input action and wrap its result in the standard envelope."""
    outcome = _exec_action(req)
    envelope = {"ok": True, "request_id": _request_id(), "time_ms": _now_ms()}
    envelope["result"] = outcome
    return envelope
@app.post("/exec")
def exec_command(
    req: ExecRequest,
    x_clickthrough_exec_secret: Optional[str] = Header(default=None),
    _: None = Depends(_auth),
):
    """Run a shell command after checking the dedicated exec secret.

    This check is in addition to the general token enforced by ``_auth``.

    Raises:
        HTTPException: 403 when no exec secret is configured; 401 when the
            ``x_clickthrough_exec_secret`` header is missing or wrong.
    """
    expected = SETTINGS["exec_secret"]
    if not expected:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    supplied = x_clickthrough_exec_secret or ""
    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a bad header into a
    # 500 instead of a 401.
    if not supplied or not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid exec secret")

    result = _exec_command(req)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": result,
    }
@app.post("/ocr")
def ocr(req: OCRRequest, _: None = Depends(_auth)):
    """Recognise text in an uploaded image, the whole monitor, or a region of it.

    Bounding boxes are reported relative to the origin of the reported
    region: (0, 0) for uploaded images, the monitor origin for "screen" and
    the requested corner for "region".

    Raises:
        HTTPException: 400 when a requested region does not fit inside the
            captured monitor.
    """
    source = req.mode
    if source == "image":
        image = _decode_image_base64(req.image_base64 or "")
        img_w, img_h = image.size[0], image.size[1]
        region = {"x": 0, "y": 0, "width": img_w, "height": img_h}
        origin = (0, 0)
    else:
        base_img, mon = _capture_screen()
        if source == "screen":
            image = base_img
            region = {key: mon[key] for key in ("x", "y", "width", "height")}
            origin = (mon["x"], mon["y"])
        else:
            # Convert the requested rectangle into image-local coordinates.
            left = req.region_x - mon["x"]
            top = req.region_y - mon["y"]
            right = left + req.region_width
            bottom = top + req.region_height
            fits = left >= 0 and top >= 0 and right <= base_img.size[0] and bottom <= base_img.size[1]
            if not fits:
                raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")
            image = base_img.crop((left, top, right, bottom))
            region = {
                "x": req.region_x,
                "y": req.region_y,
                "width": req.region_width,
                "height": req.region_height,
            }
            origin = (req.region_x, req.region_y)

    blocks = _run_ocr(image, req.language_hint, req.min_confidence, *origin)
    result = {
        "mode": source,
        "language_hint": req.language_hint,
        "min_confidence": req.min_confidence,
        "region": region,
        "blocks": blocks,
    }
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "result": result,
    }
@app.post("/batch")
def batch(req: BatchRequest, _: None = Depends(_auth)):
    """Run the listed actions in order and report one result per attempt.

    A failing action is recorded with its error text. When
    ``req.stop_on_error`` is true, processing ends after the first failure
    and later actions do not appear in the results. The top-level "ok" is
    true only when every attempted action succeeded.
    """
    results = []
    for position, step in enumerate(req.actions):
        try:
            outcome = _exec_action(step)
        except Exception as exc:
            results.append({"index": position, "ok": False, "error": str(exc)})
            if req.stop_on_error:
                break
        else:
            results.append({"index": position, "ok": True, "result": outcome})

    every_step_ok = all(entry["ok"] for entry in results)
    return {
        "ok": every_step_ok,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "results": results,
    }
if __name__ == "__main__":
    import uvicorn

    # Pass the application object instead of the "server.app:app" import
    # string. The string makes uvicorn import this module a second time and
    # fails when the file is started as a plain script, where the "server"
    # package is not importable. An import string is only needed for
    # reload/workers, and reload is off here.
    uvicorn.run(app, host=SETTINGS["host"], port=SETTINGS["port"], reload=False)