This commit is contained in:
Space-Banane
2026-04-05 19:48:00 +02:00
parent 48ac9f5d7d
commit 6f9eedcc7a
27 changed files with 1 additions and 1396 deletions

View File

@@ -1 +0,0 @@
from .main import app # noqa: F401

View File

@@ -1,34 +0,0 @@
from __future__ import annotations
from typing import Tuple
from .models import ActionPayload, ActionResult
class ActionEngine:
    """Plans actions against a single grid without executing anything.

    Planning means resolving the targeted cell to its centre point and
    producing a short human-readable description of the action.
    """

    def __init__(self, grid) -> None:
        # ``grid`` only needs to provide ``resolve_cell_center(cell_id)``.
        self.grid = grid

    def plan(self, payload: ActionPayload) -> ActionResult:
        """Return the planned result for ``payload``.

        Any exception raised by ``grid.resolve_cell_center`` for the
        payload's target cell propagates to the caller.
        """
        coordinates = self._resolve_coords(payload.target_cell)
        summary = self._describe(payload, coordinates)
        # Echo comment/text back, normalising missing values to "".
        echoed = {"comment": payload.comment or "", "text": payload.text or ""}
        return ActionResult(
            success=True,
            detail=summary,
            coordinates=coordinates,
            payload=echoed,
        )

    def _resolve_coords(self, target_cell: str | None) -> Tuple[int, int] | None:
        """Look up the centre of ``target_cell``; ``None`` when no cell is given."""
        if target_cell:
            return self.grid.resolve_cell_center(target_cell)
        return None

    def _describe(
        self, payload: ActionPayload, coords: Tuple[int, int] | None
    ) -> str:
        """Format ``Plan <action> <location>[ text='...']`` for ``payload``."""
        if coords:
            target_name = payload.target_cell or "free space"
            location = f"@{target_name}"
        else:
            location = "(no target)"
        description = f"Plan {payload.action.value} {location}"
        if payload.text:
            description += f" text='{payload.text}'"
        return description

View File

@@ -1,14 +0,0 @@
from pathlib import Path
from pydantic import ConfigDict
from pydantic_settings import BaseSettings
class ServerSettings(BaseSettings):
    """Server configuration with built-in defaults.

    ``model_config`` declares the ``CLICKTHROUGH_`` environment prefix and
    ``.env`` as the env file.
    """

    # Grid dimensions used when an init request does not specify its own.
    grid_rows: int = 4
    grid_cols: int = 4
    # NOTE(review): presumably the per-cell inset in pixels; the grid code
    # visible alongside this class uses its own default of 4 - confirm wiring.
    cell_margin_px: int = 4
    # NOTE(review): not referenced by the code visible here - confirm usage.
    storage_dir: Path = Path("data/screenshots")
    # NOTE(review): unit is not stated anywhere visible - confirm.
    default_timeout: int = 10

    model_config = ConfigDict(env_prefix="CLICKTHROUGH_", env_file=".env")

View File

@@ -1,136 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple
import uuid
from .actions import ActionEngine
from .config import ServerSettings
from .models import (
ActionPayload,
ActionResult,
GridCellModel,
GridDescriptor,
GridInitRequest,
)
@dataclass
class _StoredCell:
model: GridCellModel
center: Tuple[int, int]
class VisionGrid:
    """A screenshot divided into ``rows`` x ``columns`` addressable cells.

    Keeps the latest screenshot and memo, the table of cells keyed by cell
    id, and the history of applied actions.
    """

    def __init__(
        self,
        request: GridInitRequest,
        grid_id: str,
        rows: int,
        columns: int,
        margin: int = 4,
    ):
        """Create the grid and build its cell table.

        Args:
            request: Supplies the screenshot, memo, width and height.
            grid_id: Identifier of the grid; also the prefix of every cell id.
            rows: Number of cell rows.
            columns: Number of cell columns.
            margin: Inset in pixels applied on each side of every cell.
                Defaults to 4, the value that used to be hard-coded.
        """
        self.grid_id = grid_id
        self.screenshot = request.screenshot_base64
        self.memo = request.memo
        self.rows = rows
        self.columns = columns
        self.width = request.width
        self.height = request.height
        self.cells: Dict[str, _StoredCell] = {}
        self._action_history: List[dict[str, Any]] = []
        self._engine = ActionEngine(self)
        self._build_cells(margin)

    @staticmethod
    def _inset_span(index: int, size: int, margin: int, limit: int) -> Tuple[int, int]:
        """Return the inset ``(start, end)`` of cell ``index`` along one axis.

        Args:
            index: Zero-based row or column index.
            size: Cell size along the axis (at least 1).
            margin: Requested inset; negative values are treated as 0.
            limit: Total extent of the axis (image width or height).

        The result always satisfies ``start <= end``.
        """
        # Never inset by more than half the cell; a larger inset would push
        # ``start`` past ``end`` and yield inverted bounds.
        inset = min(max(0, margin), size // 2)
        end = min(limit - inset, (index + 1) * size - inset)
        # With more cells than pixels, trailing cells collapse onto the edge
        # instead of starting beyond it.
        start = min(index * size + inset, end)
        return start, end

    def _build_cells(self, margin: int = 4) -> None:
        """Populate ``self.cells`` with one entry per (row, column)."""
        cell_width = max(1, self.width // self.columns)
        cell_height = max(1, self.height // self.rows)
        for row in range(self.rows):
            top, bottom = self._inset_span(row, cell_height, margin, self.height)
            for col in range(self.columns):
                left, right = self._inset_span(col, cell_width, margin, self.width)
                cell_id = f"{self.grid_id}-{row}-{col}"
                center = ((left + right) // 2, (top + bottom) // 2)
                cell = GridCellModel(
                    cell_id=cell_id,
                    row=row,
                    column=col,
                    bounds=(left, top, right, bottom),
                )
                self.cells[cell_id] = _StoredCell(model=cell, center=center)

    def describe(self) -> GridDescriptor:
        """Return a descriptor listing every cell plus the grid metadata."""
        return GridDescriptor(
            grid_id=self.grid_id,
            rows=self.rows,
            columns=self.columns,
            cells=[cell.model for cell in self.cells.values()],
            metadata=self.metadata,
        )

    @property
    def metadata(self) -> Dict[str, Any]:
        """Memo (``""`` when unset) and pixel dimensions of the grid."""
        return {
            "memo": self.memo or "",
            "width": self.width,
            "height": self.height,
        }

    def resolve_cell_center(self, cell_id: str) -> Tuple[int, int]:
        """Return the centre of ``cell_id``.

        Raises:
            KeyError: If the grid has no cell with that id.
        """
        cell = self.cells.get(cell_id)
        if not cell:
            raise KeyError(f"Unknown cell {cell_id}")
        return cell.center

    def preview_action(self, payload: ActionPayload) -> ActionResult:
        """Plan ``payload`` without recording it in the history."""
        return self._engine.plan(payload)

    def apply_action(self, payload: ActionPayload) -> ActionResult:
        """Plan ``payload`` and append the dumped result to the history."""
        result = self._engine.plan(payload)
        self._action_history.append(result.model_dump())
        return result

    def update_screenshot(self, screenshot_base64: str, memo: str | None = None) -> None:
        """Replace the screenshot; the memo changes only when a non-empty one is given."""
        self.screenshot = screenshot_base64
        if memo:
            self.memo = memo

    @property
    def action_history(self) -> List[dict[str, Any]]:
        """A copy of the history list, so callers cannot mutate the original list."""
        return list(self._action_history)

    def summary(self) -> str:
        """One-line description of the grid and its most recent action."""
        last_action = self._action_history[-1] if self._action_history else None
        last_summary = (
            f"Last action: {last_action.get('detail')}" if last_action else "No actions recorded yet"
        )
        return (
            f"Grid {self.grid_id} ({self.rows}x{self.columns}) with {len(self.cells)} cells. {last_summary}."
        )
class GridManager:
    """In-memory registry of grids keyed by grid id."""

    def __init__(self, settings: ServerSettings):
        self.settings = settings
        self._grids: Dict[str, VisionGrid] = {}

    @property
    def grid_count(self) -> int:
        """Number of grids currently registered."""
        return len(self._grids)

    def create_grid(self, request: GridInitRequest) -> VisionGrid:
        """Build a grid for ``request`` under a fresh random id and register it.

        Rows/columns missing (or falsy) in the request fall back to the
        configured defaults.
        """
        rows = request.rows if request.rows else self.settings.grid_rows
        columns = request.columns if request.columns else self.settings.grid_cols
        new_id = uuid.uuid4().hex
        created = VisionGrid(request, new_id, rows, columns)
        self._grids[new_id] = created
        return created

    def get_grid(self, grid_id: str) -> VisionGrid:
        """Return the grid registered as ``grid_id``.

        Raises:
            KeyError: With a ``Grid <id> not found`` message when unknown.
        """
        try:
            found = self._grids[grid_id]
        except KeyError as missing:
            raise KeyError(f"Grid {grid_id} not found") from missing
        return found

    def get_history(self, grid_id: str) -> List[dict[str, Any]]:
        """Return the action history of ``grid_id`` (``KeyError`` when unknown)."""
        grid = self.get_grid(grid_id)
        return grid.action_history

    def clear(self) -> None:
        """Forget every registered grid."""
        self._grids.clear()

View File

@@ -1,133 +0,0 @@
import time
from pathlib import Path
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from .config import ServerSettings
from .grid import GridManager
from .models import (
ActionPayload,
GridDescriptor,
GridInitRequest,
GridPlanRequest,
GridRefreshRequest,
)
from .planner import GridPlanner
from .streamer import ScreenshotStreamer
# Process-wide singletons shared by every route handler in this module.
settings = ServerSettings()
manager = GridManager(settings)
planner = GridPlanner()
streamer = ScreenshotStreamer()

app = FastAPI(
    title="Clickthrough",
    description=(
        "Grid-aware surface that lets an agent plan clicks, drags, and typing on a fake screenshot"
    ),
    version="0.3.0",
)

# The client directory sits next to this module's package directory; it is
# mounted under /ui only when it actually exists.
client_dir = Path(__file__).resolve().parent.parent / "client"
if client_dir.exists():
    app.mount(
        "/ui",
        StaticFiles(directory=str(client_dir), html=True),
        name="ui",
    )
@app.get("/")
async def root():
    """Redirect to the bundled UI when it exists, else return a status payload."""
    if not client_dir.exists():
        return {"status": "ok", "grid_count": manager.grid_count}
    return RedirectResponse("/ui/")
@app.get("/health")
def health_check() -> dict[str, str]:
    """Report service status; the grid count is stringified so all values are strings."""
    registered = str(manager.grid_count)
    return {"status": "ok", "grid_count": registered}
@app.post("/grid/init", response_model=GridDescriptor)
def init_grid(request: GridInitRequest) -> GridDescriptor:
    """Create a grid from ``request`` and return its descriptor."""
    return manager.create_grid(request).describe()
@app.post("/grid/action")
def apply_action(payload: ActionPayload):
    """Apply ``payload`` to its grid and return the recorded result.

    Raises:
        HTTPException: 404 when the grid id is unknown, and 404 when the
            payload targets a cell the grid does not contain.
    """
    try:
        grid = manager.get_grid(payload.grid_id)
    except KeyError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    try:
        return grid.apply_action(payload)
    except KeyError as exc:
        # An unknown ``target_cell`` surfaces as KeyError from the cell
        # lookup; report it as a client error instead of a 500.
        raise HTTPException(status_code=404, detail=str(exc)) from exc
@app.get("/grid/{grid_id}/summary")
def grid_summary(grid_id: str):
    """Return the planner summary, grid details and descriptor (404 if unknown)."""
    try:
        grid = manager.get_grid(grid_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    descriptor = grid.describe()
    planner_text = planner.describe(descriptor)
    grid_text = grid.summary()
    return {
        "grid_id": grid_id,
        "summary": planner_text,
        "details": grid_text,
        "descriptor": descriptor,
    }
@app.get("/grid/{grid_id}/history")
def grid_history(grid_id: str):
    """Return the recorded action history of ``grid_id`` (404 if unknown)."""
    try:
        recorded = manager.get_history(grid_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing
    return {"grid_id": grid_id, "history": recorded}
@app.post("/grid/{grid_id}/plan")
def plan_grid(grid_id: str, request: GridPlanRequest):
    """Let the planner pick a cell and preview the action without recording it.

    Responds with 404 when ``grid_id`` is unknown.
    """
    try:
        grid = manager.get_grid(grid_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    descriptor = grid.describe()
    planned = planner.build_payload(
        descriptor,
        action=request.action,
        preferred_label=request.preferred_label,
        text=request.text,
        comment=request.comment,
    )
    # preview_action plans only; nothing is appended to the grid history.
    preview = grid.preview_action(planned)
    return {
        "plan": planned.model_dump(),
        "result": preview,
        "descriptor": descriptor,
    }
@app.post("/grid/{grid_id}/refresh")
async def refresh_grid(grid_id: str, payload: GridRefreshRequest):
    """Store a new screenshot for the grid and broadcast it to stream listeners.

    Raises:
        HTTPException: 404 when ``grid_id`` is unknown.
    """
    try:
        grid = manager.get_grid(grid_id)
    except KeyError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    grid.update_screenshot(payload.screenshot_base64, payload.memo)
    descriptor = grid.describe()
    await streamer.broadcast(
        grid_id,
        {
            "grid_id": grid_id,
            "timestamp": time.time(),
            # The websocket layer serialises with plain JSON, which cannot
            # encode a pydantic model; send its JSON-compatible dump instead.
            "descriptor": descriptor.model_dump(mode="json"),
            "screenshot_base64": payload.screenshot_base64,
        },
    )
    return {"status": "updated", "grid_id": grid_id}
@app.websocket("/stream/screenshots")
async def stream_screenshots(websocket: WebSocket, grid_id: str | None = None):
    """Register the socket as a screenshot listener until the connection ends.

    ``grid_id`` selects a single grid; when omitted the streamer decides the
    subscription key and returns it from ``connect``.
    """
    key = await streamer.connect(websocket, grid_id)
    try:
        # Inbound messages are discarded; receiving is only how a closed
        # connection gets noticed.
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path, not just a clean disconnect, so a
        # failing receive cannot leave a dead listener behind.
        streamer.disconnect(websocket, key)

View File

@@ -1,67 +0,0 @@
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field
class ActionType(str, Enum):
    """Closed set of action kinds; each member is also its string value."""

    CLICK = "click"
    DOUBLE_CLICK = "double_click"
    DRAG = "drag"
    TYPE = "type"
    SCROLL = "scroll"
class GridInitRequest(BaseModel):
    """Request body for creating a grid over a screenshot."""

    # Extent of the screenshot; cell bounds are derived from these.
    width: int
    height: int
    screenshot_base64: str
    # Optional overrides; when omitted the server-side defaults apply.
    rows: Optional[int] = None
    columns: Optional[int] = None
    # Optional free-form note stored with the grid.
    memo: Optional[str] = None
class GridCellModel(BaseModel):
    """Public description of a single grid cell."""

    cell_id: str
    row: int
    column: int
    # (left, top, right, bottom)
    bounds: Tuple[int, int, int, int]
    # Optional label a cell can be matched by.
    label: Optional[str] = None
class GridDescriptor(BaseModel):
    """Snapshot of a grid: its dimensions, its cells and free-form metadata."""

    grid_id: str
    rows: int
    columns: int
    cells: List[GridCellModel]
    # Defaults to a fresh empty dict per instance.
    metadata: Dict[str, Any] = Field(default_factory=dict)
class ActionPayload(BaseModel):
    """An action addressed to a grid and, optionally, to one of its cells."""

    grid_id: str
    action: ActionType
    # Cell id to act on; None means the action has no target cell.
    target_cell: Optional[str] = None
    text: Optional[str] = None
    comment: Optional[str] = None
    # NOTE(review): not read by the code visible here - confirm intended use.
    data: Dict[str, Any] = Field(default_factory=dict)
class ActionResult(BaseModel):
    """Outcome of planning or applying an action."""

    success: bool
    # Human-readable description of the action.
    detail: str
    # (x, y) point for the action, or None when there is no target.
    coordinates: Optional[Tuple[int, int]] = None
    payload: Dict[str, Any] = Field(default_factory=dict)
class GridPlanRequest(BaseModel):
    """Request body asking the planner to choose a cell and build an action."""

    # Label to look for when choosing a cell; optional.
    preferred_label: Optional[str] = None
    # Action to plan; clicks are the default.
    action: ActionType = ActionType.CLICK
    text: Optional[str] = None
    comment: Optional[str] = None
class GridRefreshRequest(BaseModel):
    """Request body carrying a replacement screenshot for an existing grid."""

    screenshot_base64: str
    # Optional replacement memo.
    memo: Optional[str] = None

View File

@@ -1,70 +0,0 @@
from __future__ import annotations
from math import hypot
from typing import Sequence
from .models import ActionPayload, ActionType, GridCellModel, GridDescriptor
class GridPlanner:
    """Helper that picks a grid cell using simple heuristics."""

    def select_cell(
        self, descriptor: GridDescriptor, preferred_label: str | None = None
    ) -> GridCellModel | None:
        """Choose a cell from ``descriptor``.

        A cell whose label contains ``preferred_label`` (case-insensitive)
        wins; otherwise the cell whose centre is nearest the grid centre is
        returned. ``None`` is returned when the descriptor has no cells.
        """
        cells = descriptor.cells
        if not cells:
            return None
        if preferred_label:
            labelled = self._match_label(cells, preferred_label)
            if labelled:
                return labelled
        middle = self._grid_center(descriptor)

        def offset_from_middle(cell: GridCellModel) -> float:
            return self._distance(self._cell_center(cell), middle)

        return min(cells, key=offset_from_middle)

    def build_payload(
        self,
        descriptor: GridDescriptor,
        action: ActionType = ActionType.CLICK,
        preferred_label: str | None = None,
        text: str | None = None,
        comment: str | None = None,
    ) -> ActionPayload:
        """Build an action payload aimed at the cell ``select_cell`` chooses."""
        chosen = self.select_cell(descriptor, preferred_label)
        target_id = chosen.cell_id if chosen else None
        return ActionPayload(
            grid_id=descriptor.grid_id,
            action=action,
            target_cell=target_id,
            text=text,
            comment=comment,
        )

    def describe(self, descriptor: GridDescriptor) -> str:
        """Return a one-sentence summary of the grid's size and cell count."""
        return (
            f"Grid {descriptor.grid_id} is {descriptor.rows}x{descriptor.columns} "
            f"with {len(descriptor.cells)} cells."
        )

    def _grid_center(self, descriptor: GridDescriptor) -> tuple[float, float]:
        """Centre of the grid from its metadata; missing sizes count as 0."""
        meta = descriptor.metadata
        return (meta.get("width", 0) / 2, meta.get("height", 0) / 2)

    def _cell_center(self, cell: GridCellModel) -> tuple[float, float]:
        """Midpoint of the cell's ``(left, top, right, bottom)`` bounds."""
        x0, y0, x1, y1 = cell.bounds
        return ((x0 + x1) / 2, (y0 + y1) / 2)

    def _distance(
        self, first: tuple[float, float], second: tuple[float, float]
    ) -> float:
        """Euclidean distance between two points."""
        dx = first[0] - second[0]
        dy = first[1] - second[1]
        return hypot(dx, dy)

    def _match_label(
        self, cells: Sequence[GridCellModel], label: str
    ) -> GridCellModel | None:
        """First cell whose label contains ``label`` ignoring case, else ``None``."""
        needle = label.lower()
        return next(
            (cell for cell in cells if cell.label and needle in cell.label.lower()),
            None,
        )

View File

@@ -1,38 +0,0 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List
from fastapi import WebSocket
from websockets.exceptions import ConnectionClosedError
class ScreenshotStreamer:
    """Keeps websocket listeners and pushes screenshot updates.

    Listeners are grouped by subscription key: a grid id, or ``"*"`` for
    sockets that want updates for every grid.
    """

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[WebSocket]] = defaultdict(list)

    async def connect(self, websocket: WebSocket, grid_id: str | None = None) -> str:
        """Accept ``websocket`` and register it.

        Returns:
            The subscription key used: ``grid_id``, or ``"*"`` when it is
            missing or empty. Pass it back to ``disconnect``.
        """
        await websocket.accept()
        key = grid_id or "*"
        self._listeners[key].append(websocket)
        return key

    def disconnect(self, websocket: WebSocket, grid_key: str | None = None) -> None:
        """Remove ``websocket`` from the ``grid_key`` bucket (``"*"`` by default).

        Unknown keys and sockets are ignored; a bucket left empty is dropped.
        """
        key = grid_key or "*"
        sockets = self._listeners.get(key)
        if not sockets:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            self._listeners.pop(key, None)

    async def broadcast(self, grid_id: str, payload: Dict[str, Any]) -> None:
        """Send ``payload`` to the listeners of ``grid_id`` and to wildcard listeners.

        A socket whose send fails with a closed-connection error is removed
        from the bucket it is registered in.
        """
        # Snapshot (key, socket) pairs first: disconnect() mutates the buckets,
        # and each failed socket has to be removed under its own key - a
        # wildcard listener is not stored under ``grid_id``.
        # dict.fromkeys de-duplicates the keys so a broadcast addressed to
        # "*" itself does not deliver twice.
        targets = [
            (key, websocket)
            for key in dict.fromkeys((grid_id, "*"))
            for websocket in self._listeners.get(key, [])
        ]
        for key, websocket in targets:
            try:
                await websocket.send_json(payload)
            except (ConnectionClosedError, RuntimeError):
                self.disconnect(websocket, key)