This repository has been archived on 2026-05-20. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
clickthrough/server/app.py
space 9e816e0417
All checks were successful
python-syntax / syntax-check (push) Successful in 6s
Add pytesseract OCR, click_text interact action, and interact verify endpoint
2026-05-03 20:57:34 +02:00

235 lines
7.2 KiB
Python

import hmac
import time
import uuid
from typing import Any, Optional
from fastapi import Depends, FastAPI, Header, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from .config import SETTINGS
from .models import ExecRequest, InteractRequest, InteractVerifyRequest, LaunchRequest, SeeRequest, SeeZoomRequest, WindowActionRequest, WindowQuery
from .services import (
apply_window_action,
capture_region_image,
capture_screen,
draw_grid,
encode_image,
execute_and_verify,
extract_ocr_items,
exec_action,
exec_command as run_exec_command,
get_displays,
launch_app,
list_windows,
)
# ASGI application object; the route and exception-handler decorators in this
# module register themselves on it.
app = FastAPI(
    title="clickthrough",
    version="0.1.0",
)
def _now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    seconds = time.time()
    return int(seconds * 1000)
def _request_id() -> str:
    """Return a newly generated random (version 4) UUID in string form."""
    fresh = uuid.uuid4()
    return str(fresh)
def _ok(data: Any, status_code: int = 200):
    """Return *data* wrapped in the success envelope as a ``JSONResponse``.

    The envelope has ``ok`` set to ``True``, a newly generated
    ``request_id``, the current ``time_ms``, the payload under ``data`` and
    ``error`` set to ``None``.
    """
    envelope = {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "data": data,
        "error": None,
    }
    return JSONResponse(content=envelope, status_code=status_code)
def _err(code: str, message: str, status_code: int, details: Any = None):
    """Return the failure envelope as a ``JSONResponse``.

    The envelope has ``ok`` set to ``False``, a newly generated
    ``request_id``, the current ``time_ms``, ``data`` set to ``None`` and an
    ``error`` object holding *code*, *message* and *details*.
    """
    failure = {"code": code, "message": message, "details": details}
    envelope = {
        "ok": False,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "data": None,
        "error": failure,
    }
    return JSONResponse(content=envelope, status_code=status_code)
@app.exception_handler(HTTPException)
async def _http_exception_handler(_: Request, exc: HTTPException):
    """Convert an ``HTTPException`` into the error envelope.

    A dict ``detail`` may supply ``code``, ``message`` and ``details``;
    missing ``code``/``message`` fall back to ``"http_error"`` and
    ``"request failed"``. Any other ``detail`` is stringified and reported
    under the ``"http_error"`` code.
    """
    payload = exc.detail
    if not isinstance(payload, dict):
        return _err("http_error", str(payload), exc.status_code)
    return _err(
        str(payload.get("code", "http_error")),
        str(payload.get("message", "request failed")),
        exc.status_code,
        payload.get("details"),
    )
@app.exception_handler(RequestValidationError)
async def _validation_exception_handler(_: Request, exc: RequestValidationError):
    """Report a request validation failure as a 422 error envelope.

    The list returned by ``exc.errors()`` is placed in the envelope's
    ``details`` field after being passed through ``jsonable_encoder``.
    """
    # Function-scope import so the module's import block stays untouched.
    from fastapi.encoders import jsonable_encoder

    # The raw error list may hold values the JSON encoder rejects (for
    # example exception objects or bytes); encoding them first keeps this
    # handler from failing while building the 422 response.
    details = jsonable_encoder(exc.errors())
    return _err("validation_error", "request validation failed", 422, details)
@app.exception_handler(Exception)
async def _unhandled_exception_handler(_: Request, exc: Exception):
    """Answer any otherwise-unhandled exception with a generic 500 envelope.

    Only the exception's class name is included in the response details.
    """
    exc_type_name = type(exc).__name__
    return _err(
        "internal_error",
        "internal server error",
        500,
        {"type": exc_type_name},
    )
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
    """Dependency that checks the request token header against ``SETTINGS["token"]``.

    When no token is configured (falsy setting) every request is accepted.
    Otherwise the header value must equal the configured token.

    Raises:
        HTTPException: 401 when a token is configured and the header is
            missing or does not match.
    """
    token = SETTINGS["token"]
    if not token:
        return
    supplied = x_clickthrough_token or ""
    # Constant-time comparison, matching how the exec secret is checked.
    # Comparing UTF-8 bytes avoids the TypeError that compare_digest raises
    # for non-ASCII str input.
    # NOTE(review): assumes SETTINGS["token"] is a str - confirm in config.
    if not hmac.compare_digest(supplied.encode("utf-8"), token.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")
@app.post("/see")
def see(req: SeeRequest, _: None = Depends(_auth)):
    """Handle ``POST /see``: return an encoded capture plus metadata.

    The image comes from ``capture_region_image``. When ``req.with_grid`` is
    set the returned image is the output of ``draw_grid`` and its metadata is
    merged into ``meta``. When ``req.ocr`` is set, ``extract_ocr_items`` is
    called with the captured image (not the ``draw_grid`` output) and its
    result is stored under ``meta["ocr"]``.
    """
    captured = capture_region_image(
        req.screen,
        req.region_x,
        req.region_y,
        req.region_width,
        req.region_height,
    )
    image, region, mon, displays, screen_selection = captured

    meta = {
        "region": region,
        "screen": screen_selection,
        "display": mon,
        "displays": displays,
    }

    out_img = image
    if req.with_grid:
        out_img, grid_meta = draw_grid(
            image,
            region["x"],
            region["y"],
            req.grid_rows,
            req.grid_cols,
            req.include_labels,
        )
        meta.update(grid_meta)

    if req.ocr:
        meta["ocr"] = extract_ocr_items(
            image,
            region["x"],
            region["y"],
            req.ocr_min_confidence,
            req.ocr_lang,
            req.ocr_psm,
        )

    image_payload = {
        "format": req.image_format,
        "base64": encode_image(out_img, req.image_format, req.jpeg_quality),
        "width": out_img.size[0],
        "height": out_img.size[1],
    }
    return _ok({"image": image_payload, "meta": meta})
@app.post("/see/zoom")
def see_zoom(req: SeeZoomRequest, _: None = Depends(_auth)):
    """Handle ``POST /see/zoom``: return a crop of a screen capture.

    ``req.center_x``/``req.center_y`` are translated by the display origin
    (``mon["x"]``, ``mon["y"]``) into image coordinates. The requested
    ``req.width`` x ``req.height`` box is clamped to the image bounds, so the
    crop can be smaller than requested near an edge. The reported ``region``
    is translated back by the display origin. When ``req.with_grid`` is set
    the crop is passed through ``draw_grid`` and its metadata is merged in.

    Raises:
        HTTPException: 400 with code ``invalid_region`` when the clamped box
            is empty, e.g. the centre lies beyond the right or bottom edge
            of the capture or a requested dimension is not positive.
    """
    base_img, mon, displays, screen_selection = capture_screen(req.screen)
    img_w = base_img.size[0]
    img_h = base_img.size[1]

    # Centre point relative to the captured image's top-left corner.
    cx = req.center_x - mon["x"]
    cy = req.center_y - mon["y"]

    left = max(0, cx - (req.width // 2))
    top = max(0, cy - (req.height // 2))
    right = min(img_w, left + req.width)
    bottom = min(img_h, top + req.height)

    # An inverted or zero-area box cannot yield a usable image; report it as
    # a client error instead of letting crop/encode fail downstream.
    # NOTE(review): presumes base_img.crop/encode_image reject such boxes
    # (PIL-style image) - confirm against services.
    if right <= left or bottom <= top:
        raise HTTPException(
            status_code=400,
            detail={
                "code": "invalid_region",
                "message": "zoom region is empty or outside the selected display",
                "details": {
                    "center_x": req.center_x,
                    "center_y": req.center_y,
                    "width": req.width,
                    "height": req.height,
                    "display": mon,
                },
            },
        )

    crop = base_img.crop((left, top, right, bottom))
    region_x = mon["x"] + left
    region_y = mon["y"] + top
    meta = {
        "region": {"x": region_x, "y": region_y, "width": crop.size[0], "height": crop.size[1]},
        "screen": screen_selection,
        "display": mon,
        "displays": displays,
    }

    out_img = crop
    if req.with_grid:
        out_img, grid_meta = draw_grid(crop, region_x, region_y, req.grid_rows, req.grid_cols, req.include_labels)
        meta.update(grid_meta)

    return _ok(
        {
            "image": {
                "format": req.image_format,
                "base64": encode_image(out_img, req.image_format, req.jpeg_quality),
                "width": out_img.size[0],
                "height": out_img.size[1],
            },
            "meta": meta,
        }
    )
@app.post("/interact")
def interact(req: InteractRequest, _: None = Depends(_auth)):
    """Handle ``POST /interact``: pass the action and screen to ``exec_action``.

    Whatever ``exec_action`` returns is wrapped in the success envelope.
    """
    outcome = exec_action(req.action, req.screen)
    return _ok(outcome)
@app.post("/interact/verify")
def interact_verify(req: InteractVerifyRequest, _: None = Depends(_auth)):
    """Handle ``POST /interact/verify``: delegate to ``execute_and_verify``.

    The helper's return value is wrapped in the success envelope.
    """
    outcome = execute_and_verify(req)
    return _ok(outcome)
@app.get("/health")
def health(_: None = Depends(_auth)):
    """Handle ``GET /health``: report service identity and selected settings.

    The exec secret itself is never returned; only a boolean saying whether
    one is configured.
    """
    report = {
        "service": "clickthrough",
        "version": app.version,
        "dry_run": SETTINGS["dry_run"],
        "allowed_region": SETTINGS["allowed_region"],
    }
    report["exec"] = {
        "enabled": SETTINGS["exec_enabled"],
        "secret_configured": bool(SETTINGS["exec_secret"]),
        "default_shell": SETTINGS["exec_default_shell"],
        "default_timeout_s": SETTINGS["exec_default_timeout_s"],
        "max_timeout_s": SETTINGS["exec_max_timeout_s"],
    }
    return _ok(report)
@app.get("/displays")
def displays(_: None = Depends(_auth)):
    """Handle ``GET /displays``: return ``get_displays()`` output.

    ``default_screen`` is always reported as ``0``.
    """
    available = get_displays()
    return _ok({"displays": available, "default_screen": 0})
@app.post("/exec")
def exec_command(req: ExecRequest, x_clickthrough_exec_secret: Optional[str] = Header(default=None), _: None = Depends(_auth)):
    """Handle ``POST /exec``: run a command after checking the exec secret.

    The secret header is required in addition to the ``_auth`` dependency.
    On success the request is handed to ``run_exec_command`` and its result
    is wrapped in the success envelope.

    Raises:
        HTTPException: 403 when no exec secret is configured; 401 when the
            header is missing, empty, or does not match.
    """
    expected = SETTINGS["exec_secret"]
    if not expected:
        raise HTTPException(status_code=403, detail="exec secret not configured")

    supplied = x_clickthrough_exec_secret or ""
    # compare_digest only accepts ASCII when given str and raises TypeError
    # otherwise; comparing UTF-8 bytes keeps a non-ASCII header a plain 401.
    matches = bool(supplied) and hmac.compare_digest(
        supplied.encode("utf-8"),
        expected.encode("utf-8"),
    )
    if not matches:
        raise HTTPException(status_code=401, detail="invalid exec secret")

    return _ok(run_exec_command(req))
@app.get("/windows")
def windows(
    title_contains: str | None = None,
    title_regex: str | None = None,
    process_name: str | None = None,
    hwnd: int | None = None,
    visible_only: bool = True,
    _: None = Depends(_auth),
):
    """Handle ``GET /windows``: list windows matching the query parameters.

    The parameters are packed into a ``WindowQuery`` and passed to
    ``list_windows``; the response carries the matches and their count.
    """
    filters = {
        "title_contains": title_contains,
        "title_regex": title_regex,
        "process_name": process_name,
        "hwnd": hwnd,
        "visible_only": visible_only,
    }
    found = list_windows(WindowQuery(**filters))
    return _ok({"windows": found, "count": len(found)})
@app.post("/windows/action")
def window_action(req: WindowActionRequest, _: None = Depends(_auth)):
    """Handle ``POST /windows/action``: delegate to ``apply_window_action``.

    The helper's return value is wrapped in the success envelope.
    """
    outcome = apply_window_action(req)
    return _ok(outcome)
@app.post("/launch")
def launch(req: LaunchRequest, _: None = Depends(_auth)):
    """Handle ``POST /launch``: delegate to ``launch_app``.

    The helper's return value is wrapped in the success envelope.
    """
    outcome = launch_app(req)
    return _ok(outcome)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on the configured
    # host/port, with auto-reload switched off.
    import uvicorn

    uvicorn.run(
        "server.app:app",
        host=SETTINGS["host"],
        port=SETTINGS["port"],
        reload=False,
    )