Files
clickthrough/server/app.py
Luna 4aa51e2d69
All checks were successful
python-syntax / syntax-check (push) Successful in 29s
feat: bootstrap clickthrough server, skill docs, and syntax CI
2026-04-05 19:59:39 +02:00

458 lines
14 KiB
Python

import base64
import io
import os
import time
import uuid
from typing import Literal, Optional
from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel, Field, model_validator
# ASGI application object; the route decorators further down register their
# handlers on it, and /health reports its version string.
app = FastAPI(title="clickthrough", version="0.1.0")
def _env_bool(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]:
raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION")
if not raw:
return None
parts = [p.strip() for p in raw.split(",")]
if len(parts) != 4:
raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height")
x, y, w, h = (int(p) for p in parts)
if w <= 0 or h <= 0:
raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0")
return x, y, w, h
# Process-wide configuration, read from CLICKTHROUGH_* environment variables
# once, when this module is imported. Malformed numeric values or a malformed
# allowed region raise here.
SETTINGS = {
    # Bind address and port passed to uvicorn by the __main__ launcher.
    "host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"),
    "port": int(os.getenv("CLICKTHROUGH_PORT", "8123")),
    # Shared secret checked by _auth; an empty string disables the check.
    "token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(),
    # When true, _exec_action resolves targets but never drives the input backend.
    "dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False),
    # Default grid dimensions for ScreenRequest and GET /screen.
    "default_grid_rows": int(os.getenv("CLICKTHROUGH_GRID_ROWS", "12")),
    "default_grid_cols": int(os.getenv("CLICKTHROUGH_GRID_COLS", "12")),
    # Optional (x, y, width, height) rectangle enforced by
    # _enforce_allowed_region; None means no restriction.
    "allowed_region": _parse_allowed_region(),
}
# Options for a full-screen capture. The GET /screen handler builds one of
# these from its query parameters so the range limits below get applied.
class ScreenRequest(BaseModel):
    # Overlay a grid on the screenshot.
    with_grid: bool = True
    # Grid dimensions; defaults come from SETTINGS, each limited to 1..200.
    grid_rows: int = Field(default=SETTINGS["default_grid_rows"], ge=1, le=200)
    grid_cols: int = Field(default=SETTINGS["default_grid_cols"], ge=1, le=200)
    # Draw a "row,col" text label in every cell (see _draw_grid).
    include_labels: bool = True
    # Encoding of the returned image.
    image_format: Literal["png", "jpeg"] = "png"
    # Passed to the encoder only when image_format is "jpeg" (see _encode_image).
    jpeg_quality: int = Field(default=85, ge=1, le=100)
# Body of POST /zoom: a crop of the captured monitor around a centre point.
class ZoomRequest(BaseModel):
    # Centre of the crop; the /zoom handler subtracts the monitor origin from
    # these before cropping. Must be non-negative.
    center_x: int = Field(ge=0)
    center_y: int = Field(ge=0)
    # Requested crop size (at least 10 each); the handler clips the crop to
    # the captured image, so the returned region can be smaller.
    width: int = Field(default=500, ge=10)
    height: int = Field(default=350, ge=10)
    # Overlay a grid on the crop.
    with_grid: bool = True
    # Grid dimensions, each limited to 1..300.
    grid_rows: int = Field(default=20, ge=1, le=300)
    grid_cols: int = Field(default=20, ge=1, le=300)
    # Draw a "row,col" text label in every cell (see _draw_grid).
    include_labels: bool = True
    # Encoding of the returned image.
    image_format: Literal["png", "jpeg"] = "png"
    # Passed to the encoder only when image_format is "jpeg" (see _encode_image).
    jpeg_quality: int = Field(default=90, ge=1, le=100)
# A target given directly as a coordinate pair plus an optional integer
# offset; _resolve_target returns (x + dx, y + dy).
class PixelTarget(BaseModel):
    mode: Literal["pixel"]
    x: int
    y: int
    # Offsets added to x / y when the target is resolved.
    dx: int = 0
    dy: int = 0
# A target addressed as one cell of a rows x cols grid laid over a region.
# _resolve_target turns it into a point: the cell centre shifted by dx/dy,
# where -1 and +1 reach the cell's edges.
class GridTarget(BaseModel):
    mode: Literal["grid"]
    # Origin and size of the region the grid covers.
    region_x: int
    region_y: int
    region_width: int = Field(gt=0)
    region_height: int = Field(gt=0)
    # Grid dimensions.
    rows: int = Field(gt=0)
    cols: int = Field(gt=0)
    # Zero-based index of the addressed cell.
    row: int = Field(ge=0)
    col: int = Field(ge=0)
    # Fractional offset from the cell centre, each within [-1, 1].
    dx: float = 0.0
    dy: float = 0.0

    @model_validator(mode="after")
    def _validate_indices(self):
        # Cross-field checks that single-field constraints cannot express.
        cell_exists = self.row < self.rows and self.col < self.cols
        if not cell_exists:
            raise ValueError("row/col must be inside rows/cols")
        offset_checks = (
            (self.dx, "dx must be in [-1, 1]"),
            (self.dy, "dy must be in [-1, 1]"),
        )
        for offset, message in offset_checks:
            # Written as a negated chained comparison so NaN is rejected too.
            if not (-1.0 <= offset <= 1.0):
                raise ValueError(message)
        return self
# Either kind of target; the two models carry different literal `mode` values.
Target = PixelTarget | GridTarget
# Body of POST /action and the element type of BatchRequest.actions.
# Which of the optional fields matter depends on `action` (see _exec_action).
class ActionRequest(BaseModel):
    action: Literal[
        "move",
        "click",
        "right_click",
        "double_click",
        "middle_click",
        "scroll",
        "type",
        "hotkey",
    ]
    # Where to act. _exec_action rejects pointer actions and "scroll" that
    # arrive without a target.
    target: Optional[Target] = None
    # Pointer travel time; converted to seconds before it reaches the backend.
    duration_ms: int = Field(default=0, ge=0, le=20000)
    # Mouse button and click count; read only by the "click" action.
    button: Literal["left", "right", "middle"] = "left"
    clicks: int = Field(default=1, ge=1, le=10)
    # Amount handed to the backend's scroll call for "scroll".
    scroll_amount: int = 0
    # Text written by the "type" action.
    text: str = ""
    # Key names pressed together by the "hotkey" action.
    keys: list[str] = Field(default_factory=list)
    # Pause between repeated clicks / typed characters, in milliseconds.
    interval_ms: int = Field(default=20, ge=0, le=5000)
    # Per-request dry run; combined with SETTINGS["dry_run"] using `or`.
    dry_run: bool = False
# Body of POST /batch: between 1 and 100 actions run in list order.
class BatchRequest(BaseModel):
    actions: list[ActionRequest] = Field(min_length=1, max_length=100)
    # When true, the batch handler stops after the first action that fails.
    stop_on_error: bool = True
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
    """Dependency that enforces the shared-secret token header.

    Does nothing when SETTINGS["token"] is empty. Otherwise the header value
    must equal the configured token.

    Args:
        x_clickthrough_token: value of the request header bound by FastAPI
            (presumably sent as ``X-Clickthrough-Token``, given FastAPI's
            default underscore-to-hyphen conversion); None when absent.

    Raises:
        HTTPException: 401 when a token is configured and the header is
            missing or different.
    """
    import hmac

    token = SETTINGS["token"]
    if not token:
        return
    # A missing header can never match, because the configured token is
    # non-empty at this point.
    supplied = x_clickthrough_token or ""
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed token is correct. Compared as bytes because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), token.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")
def _now_ms() -> int:
return int(time.time() * 1000)
def _request_id() -> str:
return str(uuid.uuid4())
def _import_capture_libs():
    """Import the screenshot dependencies on demand.

    Returns:
        The tuple (PIL.Image, PIL.ImageDraw, mss).

    Raises:
        HTTPException: 500 carrying the underlying error text when either
            import fails.
    """
    try:
        # Pillow first, then mss, so a failure reports whichever is hit first.
        from PIL import Image, ImageDraw
        import mss
    except Exception as err:
        raise HTTPException(status_code=500, detail=f"capture backend unavailable: {err}") from err
    return Image, ImageDraw, mss
def _capture_screen():
    """Grab one monitor and return it with its geometry.

    Returns:
        (image, geometry): an RGB Pillow image and a dict with the monitor's
        "x", "y", "width" and "height".
    """
    image_cls, _unused, mss_module = _import_capture_libs()
    with mss_module.mss() as grabber:
        # Entry 1 of the monitor list; presumably the first individual
        # monitor, with entry 0 being the combined desktop. Confirm against
        # the mss documentation.
        monitor = grabber.monitors[1]
        raw = grabber.grab(monitor)
        frame = image_cls.frombytes("RGB", raw.size, raw.rgb)
    geometry = {
        "x": monitor["left"],
        "y": monitor["top"],
        "width": monitor["width"],
        "height": monitor["height"],
    }
    return frame, geometry
def _encode_image(image, image_format: str, jpeg_quality: int) -> str:
buf = io.BytesIO()
if image_format == "jpeg":
image.save(buf, format="JPEG", quality=jpeg_quality)
else:
image.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode("ascii")
def _draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool):
    """Return a copy of *image* with a labelled grid, plus grid metadata.

    Args:
        image: Pillow image to annotate; it is copied, not modified.
        region_x, region_y: origin reported back in the metadata.
        rows, cols: grid dimensions.
        include_labels: draw a "row,col" label in every cell.

    Returns:
        (annotated_image, meta), where meta describes the region, the cell
        size and the formula for turning a cell plus offset into a point.
    """
    draw_module = _import_capture_libs()[1]
    canvas = image.copy()
    pen = draw_module.Draw(canvas)
    width, height = canvas.size
    cell_width = width / cols
    cell_height = height / rows
    line_color = (255, 0, 0)
    label_color = (255, 255, 0)

    # Interior separators only; the outer edge is drawn by the border below.
    for col_index in range(1, cols):
        line_x = int(round(col_index * cell_width))
        pen.line([(line_x, 0), (line_x, height)], fill=line_color, width=1)
    for row_index in range(1, rows):
        line_y = int(round(row_index * cell_height))
        pen.line([(0, line_y), (width, line_y)], fill=line_color, width=1)
    pen.rectangle([(0, 0), (width - 1, height - 1)], outline=line_color, width=2)

    if include_labels:
        for row_index in range(rows):
            # Labels are anchored slightly up and left of the cell centre.
            label_y = int((row_index + 0.5) * cell_height) - 6
            for col_index in range(cols):
                label_x = int((col_index + 0.5) * cell_width) - 12
                pen.text((label_x, label_y), f"{row_index},{col_index}", fill=label_color)

    point_formula = {
        "pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)",
        "pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)",
        "dx_range": "[-1,1]",
        "dy_range": "[-1,1]",
    }
    grid_info = {
        "rows": rows,
        "cols": cols,
        "cell_width": cell_width,
        "cell_height": cell_height,
        "indexing": "zero-based",
        "point_formula": point_formula,
    }
    region_info = {"x": region_x, "y": region_y, "width": width, "height": height}
    return canvas, {"region": region_info, "grid": grid_info}
def _resolve_target(target: Target) -> tuple[int, int, dict]:
    """Turn a target description into a point.

    Returns:
        (x, y, info), where info echoes the mode and the original target and,
        for grid targets, the derived cell size.
    """
    if isinstance(target, PixelTarget):
        pixel_info = {"mode": "pixel", "source": target.model_dump()}
        return target.x + target.dx, target.y + target.dy, pixel_info

    cell_width = target.region_width / target.cols
    cell_height = target.region_height / target.rows
    # Position in cell units: the cell centre (+0.5) shifted by half a cell
    # per unit of dx/dy, so -1 and +1 land on the cell's edges.
    col_position = target.col + 0.5 + (target.dx * 0.5)
    row_position = target.row + 0.5 + (target.dy * 0.5)
    point_x = target.region_x + int(round(col_position * cell_width))
    point_y = target.region_y + int(round(row_position * cell_height))
    grid_info = {
        "mode": "grid",
        "source": target.model_dump(),
        "derived": {"cell_width": cell_width, "cell_height": cell_height},
    }
    return point_x, point_y, grid_info
def _enforce_allowed_region(x: int, y: int):
    """Reject points outside the configured allowed region.

    Does nothing when SETTINGS["allowed_region"] is None. The region includes
    its left/top edge and excludes its right/bottom edge.

    Raises:
        HTTPException: 403 when (x, y) lies outside the region.
    """
    bounds = SETTINGS["allowed_region"]
    if bounds is None:
        return
    left, top, width, height = bounds
    inside_x = left <= x < left + width
    inside_y = top <= y < top + height
    if inside_x and inside_y:
        return
    raise HTTPException(status_code=403, detail="point outside allowed region")
def _import_input_lib():
    """Import pyautogui on demand and return the module.

    Raises:
        HTTPException: 500 carrying the underlying error text when the import
            (or the attribute assignment) fails.
    """
    try:
        import pyautogui as backend
        # Set explicitly on every import; presumably keeps pyautogui's
        # fail-safe abort enabled. Confirm against the pyautogui docs.
        backend.FAILSAFE = True
    except Exception as err:
        raise HTTPException(status_code=500, detail=f"input backend unavailable: {err}") from err
    return backend
def _exec_action(req: ActionRequest) -> dict:
    """Validate one action and, unless this is a dry run, perform it.

    A run is dry when either SETTINGS["dry_run"] or ``req.dry_run`` is true.
    All request validation happens before the input backend is imported and
    is the same for dry and real runs, so a dry run reports the same 400/403
    errors a real run would.

    Returns:
        A dict with "action", "executed", "dry_run" and "resolved_target"
        (None when the request carried no target).

    Raises:
        HTTPException: 400 for a missing target or missing hotkey keys, 403
            for a target outside the allowed region, 500 when the input
            backend cannot be imported.
    """
    run_dry = SETTINGS["dry_run"] or req.dry_run

    resolved_target = None
    if req.target is not None:
        x, y, info = _resolve_target(req.target)
        _enforce_allowed_region(x, y)
        resolved_target = {"x": x, "y": y, "target_info": info}

    pointer_actions = {"move", "click", "right_click", "double_click", "middle_click"}
    if req.action in pointer_actions and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for pointer actions")
    if req.action == "scroll" and resolved_target is None:
        raise HTTPException(status_code=400, detail="target is required for scroll")
    # Checked outside the dry-run gate so a dry run cannot approve a request
    # that would be rejected when run for real.
    if req.action == "hotkey" and not req.keys:
        raise HTTPException(status_code=400, detail="keys is required for hotkey")

    if not run_dry:
        # Imported only after validation, so a bad request is reported as a
        # client error even where the backend is not installed.
        pyautogui = _import_input_lib()
        duration_sec = req.duration_ms / 1000.0
        interval_sec = req.interval_ms / 1000.0

        if req.action == "move":
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
        elif req.action == "click":
            pyautogui.click(
                x=resolved_target["x"],
                y=resolved_target["y"],
                clicks=req.clicks,
                interval=interval_sec,
                button=req.button,
                duration=duration_sec,
            )
        elif req.action == "right_click":
            pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec)
        elif req.action == "double_click":
            # duration is forwarded like every other click variant; the
            # default of 0 leaves the previous behaviour unchanged.
            pyautogui.doubleClick(
                x=resolved_target["x"],
                y=resolved_target["y"],
                interval=interval_sec,
                duration=duration_sec,
            )
        elif req.action == "middle_click":
            pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec)
        elif req.action == "scroll":
            # Move the pointer to the target first, then scroll there.
            pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
            pyautogui.scroll(req.scroll_amount)
        elif req.action == "type":
            pyautogui.write(req.text, interval=interval_sec)
        elif req.action == "hotkey":
            pyautogui.hotkey(*req.keys)

    return {
        "action": req.action,
        "executed": not run_dry,
        "dry_run": run_dry,
        "resolved_target": resolved_target,
    }
@app.get("/health")
def health(_: None = Depends(_auth)):
    # Status probe: reports the service identity together with the current
    # dry-run flag and allowed region from SETTINGS. Guarded by _auth.
    payload = {"ok": True, "service": "clickthrough", "version": app.version}
    payload["time_ms"] = _now_ms()
    payload["request_id"] = _request_id()
    payload["dry_run"] = SETTINGS["dry_run"]
    payload["allowed_region"] = SETTINGS["allowed_region"]
    return payload
@app.get("/screen")
def screen(
    with_grid: bool = True,
    grid_rows: int = SETTINGS["default_grid_rows"],
    grid_cols: int = SETTINGS["default_grid_cols"],
    include_labels: bool = True,
    image_format: Literal["png", "jpeg"] = "png",
    jpeg_quality: int = 85,
    _: None = Depends(_auth),
):
    # Capture the monitor and return it base64-encoded, optionally with a
    # labelled grid, together with the metadata needed to address points on it.
    #
    # The query parameters are re-validated through ScreenRequest so that its
    # range limits apply. Responds 422 when a value is out of range.
    from pydantic import ValidationError

    try:
        req = ScreenRequest(
            with_grid=with_grid,
            grid_rows=grid_rows,
            grid_cols=grid_cols,
            include_labels=include_labels,
            image_format=image_format,
            jpeg_quality=jpeg_quality,
        )
    except ValidationError as exc:
        # A ValidationError raised inside a handler is an unhandled exception
        # and would be answered with 500; this is bad client input, so report
        # it as 422 instead.
        raise HTTPException(status_code=422, detail=exc.errors(include_url=False)) from exc

    base_img, mon = _capture_screen()
    meta = {"region": mon}
    out_img = base_img
    if req.with_grid:
        out_img, grid_meta = _draw_grid(base_img, mon["x"], mon["y"], req.grid_rows, req.grid_cols, req.include_labels)
        # grid_meta carries its own "region" entry, which replaces the one above.
        meta.update(grid_meta)
    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }
@app.post("/zoom")
def zoom(req: ZoomRequest, _: None = Depends(_auth)):
    # Capture the monitor, crop a window around the requested centre and
    # return it base64-encoded, optionally with a labelled grid.
    #
    # The crop is clipped to the captured image, so near an edge the returned
    # region can be smaller than requested; meta["region"] reports the
    # actual origin and size. Responds 400 when the window does not
    # intersect the captured image at all.
    base_img, mon = _capture_screen()
    img_w, img_h = base_img.size

    # Translate the requested centre into image-local coordinates.
    cx = req.center_x - mon["x"]
    cy = req.center_y - mon["y"]
    left = max(0, cx - req.width // 2)
    top = max(0, cy - req.height // 2)
    right = min(img_w, left + req.width)
    bottom = min(img_h, top + req.height)
    # A centre far enough beyond the right or bottom edge leaves an empty or
    # inverted box, which would otherwise fail inside the imaging code and
    # come back as an unhandled 500.
    if right <= left or bottom <= top:
        raise HTTPException(status_code=400, detail="zoom region lies outside the captured monitor")

    crop = base_img.crop((left, top, right, bottom))
    region_x = mon["x"] + left
    region_y = mon["y"] + top
    meta = {
        "source_monitor": mon,
        "region": {
            "x": region_x,
            "y": region_y,
            "width": crop.size[0],
            "height": crop.size[1],
        },
    }
    out_img = crop
    if req.with_grid:
        out_img, grid_meta = _draw_grid(crop, region_x, region_y, req.grid_rows, req.grid_cols, req.include_labels)
        meta.update(grid_meta)
    encoded = _encode_image(out_img, req.image_format, req.jpeg_quality)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "image": {
            "format": req.image_format,
            "base64": encoded,
            "width": out_img.size[0],
            "height": out_img.size[1],
        },
        "meta": meta,
    }
@app.post("/action")
def action(req: ActionRequest, _: None = Depends(_auth)):
    # Run a single action through _exec_action and wrap its result in the
    # standard response envelope. Errors raised by _exec_action propagate.
    outcome = _exec_action(req)
    envelope = {"ok": True, "request_id": _request_id(), "time_ms": _now_ms()}
    envelope["result"] = outcome
    return envelope
@app.post("/batch")
def batch(req: BatchRequest, _: None = Depends(_auth)):
    # Run the actions in order and report one entry per attempted action.
    # A failing action is recorded rather than raised; with stop_on_error the
    # remaining actions are skipped. The top-level "ok" is true only when
    # every attempted action succeeded.
    outcomes = []
    every_ok = True
    for position, step in enumerate(req.actions):
        try:
            step_result = _exec_action(step)
        except Exception as err:
            # Deliberately broad: any failure becomes a per-item error entry
            # so earlier results are still returned to the caller.
            every_ok = False
            outcomes.append({"index": position, "ok": False, "error": str(err)})
            if req.stop_on_error:
                break
        else:
            outcomes.append({"index": position, "ok": True, "result": step_result})
    return {
        "ok": every_ok,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "results": outcomes,
    }
if __name__ == "__main__":
    import uvicorn

    # Serve the already-constructed app object. An import string such as
    # "server.app:app" is only needed for reload/multi-worker mode; here it
    # would import this module a second time and fail whenever the parent of
    # the `server` package is not on sys.path.
    uvicorn.run(app, host=SETTINGS["host"], port=SETTINGS["port"], reload=False)