From 111a1e84af181c42cc1f2b39f89bf5ad81618e3c Mon Sep 17 00:00:00 2001 From: Space-Banane Date: Wed, 27 May 2026 21:57:37 +0200 Subject: [PATCH] feat: implement replay functionality with UI controls and backend support --- src/server.py | 187 +++++++++++++++++++++++++++++++++++ src/ui.py | 207 ++++++++++++++++++++++++++++++++++++++- tests/test_server_api.py | 100 ++++++++++++++++++- todo.md | 2 + 4 files changed, 491 insertions(+), 5 deletions(-) diff --git a/src/server.py b/src/server.py index 599267a..f26a8c7 100644 --- a/src/server.py +++ b/src/server.py @@ -30,6 +30,181 @@ class CreateJobRequest(BaseModel): no_failsafe: bool = False +def _safe_int(value: Any) -> int | None: + try: + return int(value) + except Exception: # noqa: BLE001 + return None + + +def _safe_text(value: Any, limit: int = 180) -> str: + text = str(value or "").strip() + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + +def _resolve_artifact_path(artifacts_dir: Path | None, path_raw: Any) -> Path | None: + if artifacts_dir is None: + return None + text = str(path_raw or "").strip() + if not text: + return None + candidate = Path(text).resolve() + try: + candidate.relative_to(artifacts_dir) + except ValueError: + return None + return candidate + + +def _extract_replay_action( + event: dict[str, Any], + pending_tool_args: dict[tuple[int, str], list[dict[str, Any]]], +) -> dict[str, Any] | None: + event_type = str(event.get("event_type") or "") + payload = event.get("payload") if isinstance(event.get("payload"), dict) else {} + step = int(event.get("step") or 0) + ts = str(event.get("ts") or "") + event_id = int(event.get("id") or 0) + + if event_type == "tool_called": + tool = str(payload.get("tool") or "").strip() + args = payload.get("args") if isinstance(payload.get("args"), dict) else {} + if tool: + pending_tool_args.setdefault((step, tool), []).append(args) + action: dict[str, Any] = { + "ts": ts, + "step": step, + "event_id": event_id, + "kind": "tool_called", + "tool": tool, + "label": f"Call: {tool}" if tool else "Tool call", + } + if tool == "click": + coord = args.get("coordinate") if isinstance(args, dict) else None + if isinstance(coord, dict): + x = _safe_int(coord.get("x")) + y = _safe_int(coord.get("y")) + if x is not None and y is not None: + action["requested_click"] = {"x": x, "y": y} + action["label"] = f"Call: click ({x}, {y})" + elif tool == "type": + text = _safe_text((args or {}).get("text"), 120) + if text: + action["text_preview"] = text + action["label"] = f"Call: type \"{text}\"" + return action + + if event_type == "tool_result": + tool = str(payload.get("tool") or "").strip() + result = payload.get("result") if isinstance(payload.get("result"), dict) else {} + matching_args: dict[str, Any] = {} + key = (step, tool) + queued = pending_tool_args.get(key) or [] + if queued: + matching_args = queued.pop(0) + if not queued: + pending_tool_args.pop(key, None) + + action = { + "ts": ts, + "step": step, + "event_id": event_id, + "kind": "tool_result", + "tool": tool, + "ok": bool(result.get("ok")), + "label": f"Result: {tool}", + } + if tool == "click": + clicked = result.get("clicked") if isinstance(result.get("clicked"), dict) else {} + x = _safe_int(clicked.get("x")) + y = _safe_int(clicked.get("y")) + if x is not None and y is not None: + action["click"] = {"x": x, "y": y} + action["label"] = f"Clicked ({x}, {y})" if bool(result.get("ok")) else f"Click failed ({x}, {y})" + elif tool == "type": + text = _safe_text((matching_args or {}).get("text"), 120) + typed_length = _safe_int(result.get("typed_length")) + if typed_length is not None: + action["typed_length"] = typed_length + if text: + action["text_preview"] = text + action["label"] = f"Typed \"{text}\"" + elif tool == "press_key": + key_name = _safe_text(result.get("key"), 80) + if key_name: + action["label"] = f"Pressed {key_name}" + elif tool == "execute_command": + command = _safe_text((matching_args or {}).get("command"), 140) + if command: + action["command_preview"] = command + action["label"] = f"Command: {command}" + return action + + return None + + +def _build_replay_payload(job_id: str, job: dict[str, Any], events: list[dict[str, Any]]) -> dict[str, Any]: + artifacts_dir_raw = str(job.get("artifacts_dir") or "").strip() + artifacts_dir = Path(artifacts_dir_raw).resolve() if artifacts_dir_raw else None + pending_tool_args: dict[tuple[int, str], list[dict[str, Any]]] = {} + buffered_actions: list[dict[str, Any]] = [] + frames: list[dict[str, Any]] = [] + + for event in events: + action = _extract_replay_action(event, pending_tool_args) + if action is not None: + buffered_actions.append(action) + + if str(event.get("event_type") or "") != "visual_update": + continue + payload = event.get("payload") if isinstance(event.get("payload"), dict) else {} + image_meta = payload.get("image_meta") if isinstance(payload.get("image_meta"), dict) else {} + resolved = _resolve_artifact_path(artifacts_dir, image_meta.get("path")) + if resolved is None or not resolved.exists() or not resolved.is_file(): + continue + + width = _safe_int(image_meta.get("width")) + height = _safe_int(image_meta.get("height")) + if width is None or height is None: + size = image_meta.get("screen_size") if isinstance(image_meta.get("screen_size"), dict) else {} + width = _safe_int(size.get("width")) + height = _safe_int(size.get("height")) + is_fullscreen = ( + str(payload.get("kind") or "") == "see_screen" + and bool(image_meta.get("grid")) + and isinstance(width, int) + and isinstance(height, int) + and width > 0 + and height > 0 + ) + + frames.append( + { + "frame_index": len(frames), + "event_id": int(event.get("id") or 0), + "ts": str(event.get("ts") or ""), + "step": int(event.get("step") or 0), + "kind": str(payload.get("kind") or "visual_update"), + "image_path": str(resolved), + "image_meta": image_meta, + "screen_size": {"width": width, "height": height} if width and height else None, + "is_fullscreen": is_fullscreen, + "overlays": buffered_actions, + } + ) + buffered_actions = [] + + return { + "job_id": job_id, + "total_events": len(events), + "total_frames": len(frames), + "frames": frames, + "trailing_events": buffered_actions, + } + + class _WebSocketHub: def __init__(self) -> None: self._connections: set[WebSocket] = set() @@ -161,6 +336,18 @@ def create_app(config: AppConfig | None = None) -> FastAPI: raise HTTPException(status_code=404, detail="Job not found") return {"events": manager.get_events(job_id, limit=limit)} + @app.get("/api/jobs/{job_id}/replay") + def get_job_replay( + job_id: str, + limit: int = Query(default=5000, ge=1, le=5000), + _: None = Depends(require_token), + ) -> dict[str, Any]: + job = manager.get_job(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Job not found") + events = manager.get_events(job_id, limit=limit) + return _build_replay_payload(job_id, job, events) + @app.post("/api/jobs/{job_id}/cancel") def cancel_job(job_id: str, _: None = Depends(require_token)) -> dict[str, Any]: job = manager.get_job(job_id) diff --git a/src/ui.py b/src/ui.py index a4bc53f..f4c9939 100644 --- a/src/ui.py +++ b/src/ui.py @@ -43,6 +43,33 @@ def monitoring_page_html(device_hostname: str = "") -> str:
Latest visual update
+
+

Replay

+
No replay loaded.
+
+
+ + + + +
+ +
+
+ Replay frame + +
+
+
+

Live Events