diff --git a/src/server.py b/src/server.py
index 599267a..f26a8c7 100644
--- a/src/server.py
+++ b/src/server.py
@@ -30,6 +30,181 @@ class CreateJobRequest(BaseModel):
no_failsafe: bool = False
+def _safe_int(value: Any) -> int | None:
+ try:
+ return int(value)
+ except Exception: # noqa: BLE001
+ return None
+
+
+def _safe_text(value: Any, limit: int = 180) -> str:
+ text = str(value or "").strip()
+ if len(text) <= limit:
+ return text
+ return f"{text[:limit]}..."
+
+
+def _resolve_artifact_path(artifacts_dir: Path | None, path_raw: Any) -> Path | None:
+ if artifacts_dir is None:
+ return None
+ text = str(path_raw or "").strip()
+ if not text:
+ return None
+ candidate = Path(text).resolve()
+ try:
+ candidate.relative_to(artifacts_dir)
+ except ValueError:
+ return None
+ return candidate
+
+
+def _extract_replay_action(
+ event: dict[str, Any],
+ pending_tool_args: dict[tuple[int, str], list[dict[str, Any]]],
+) -> dict[str, Any] | None:
+ event_type = str(event.get("event_type") or "")
+ payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
+ step = int(event.get("step") or 0)
+ ts = str(event.get("ts") or "")
+ event_id = int(event.get("id") or 0)
+
+ if event_type == "tool_called":
+ tool = str(payload.get("tool") or "").strip()
+ args = payload.get("args") if isinstance(payload.get("args"), dict) else {}
+ if tool:
+ pending_tool_args.setdefault((step, tool), []).append(args)
+ action: dict[str, Any] = {
+ "ts": ts,
+ "step": step,
+ "event_id": event_id,
+ "kind": "tool_called",
+ "tool": tool,
+ "label": f"Call: {tool}" if tool else "Tool call",
+ }
+ if tool == "click":
+ coord = args.get("coordinate") if isinstance(args, dict) else None
+ if isinstance(coord, dict):
+ x = _safe_int(coord.get("x"))
+ y = _safe_int(coord.get("y"))
+ if x is not None and y is not None:
+ action["requested_click"] = {"x": x, "y": y}
+ action["label"] = f"Call: click ({x}, {y})"
+ elif tool == "type":
+ text = _safe_text((args or {}).get("text"), 120)
+ if text:
+ action["text_preview"] = text
+ action["label"] = f"Call: type \"{text}\""
+ return action
+
+ if event_type == "tool_result":
+ tool = str(payload.get("tool") or "").strip()
+ result = payload.get("result") if isinstance(payload.get("result"), dict) else {}
+ matching_args: dict[str, Any] = {}
+ key = (step, tool)
+ queued = pending_tool_args.get(key) or []
+ if queued:
+ matching_args = queued.pop(0)
+ if not queued:
+ pending_tool_args.pop(key, None)
+
+ action = {
+ "ts": ts,
+ "step": step,
+ "event_id": event_id,
+ "kind": "tool_result",
+ "tool": tool,
+ "ok": bool(result.get("ok")),
+ "label": f"Result: {tool}",
+ }
+ if tool == "click":
+ clicked = result.get("clicked") if isinstance(result.get("clicked"), dict) else {}
+ x = _safe_int(clicked.get("x"))
+ y = _safe_int(clicked.get("y"))
+ if x is not None and y is not None:
+ action["click"] = {"x": x, "y": y}
+ action["label"] = f"Clicked ({x}, {y})" if bool(result.get("ok")) else f"Click failed ({x}, {y})"
+ elif tool == "type":
+ text = _safe_text((matching_args or {}).get("text"), 120)
+ typed_length = _safe_int(result.get("typed_length"))
+ if typed_length is not None:
+ action["typed_length"] = typed_length
+ if text:
+ action["text_preview"] = text
+ action["label"] = f"Typed \"{text}\""
+ elif tool == "press_key":
+ key_name = _safe_text(result.get("key"), 80)
+ if key_name:
+ action["label"] = f"Pressed {key_name}"
+ elif tool == "execute_command":
+ command = _safe_text((matching_args or {}).get("command"), 140)
+ if command:
+ action["command_preview"] = command
+ action["label"] = f"Command: {command}"
+ return action
+
+ return None
+
+
+def _build_replay_payload(job_id: str, job: dict[str, Any], events: list[dict[str, Any]]) -> dict[str, Any]:
+ artifacts_dir_raw = str(job.get("artifacts_dir") or "").strip()
+ artifacts_dir = Path(artifacts_dir_raw).resolve() if artifacts_dir_raw else None
+ pending_tool_args: dict[tuple[int, str], list[dict[str, Any]]] = {}
+ buffered_actions: list[dict[str, Any]] = []
+ frames: list[dict[str, Any]] = []
+
+ for event in events:
+ action = _extract_replay_action(event, pending_tool_args)
+ if action is not None:
+ buffered_actions.append(action)
+
+ if str(event.get("event_type") or "") != "visual_update":
+ continue
+ payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
+ image_meta = payload.get("image_meta") if isinstance(payload.get("image_meta"), dict) else {}
+ resolved = _resolve_artifact_path(artifacts_dir, image_meta.get("path"))
+ if resolved is None or not resolved.exists() or not resolved.is_file():
+ continue
+
+ width = _safe_int(image_meta.get("width"))
+ height = _safe_int(image_meta.get("height"))
+ if width is None or height is None:
+ size = image_meta.get("screen_size") if isinstance(image_meta.get("screen_size"), dict) else {}
+ width = _safe_int(size.get("width"))
+ height = _safe_int(size.get("height"))
+ is_fullscreen = (
+ str(payload.get("kind") or "") == "see_screen"
+ and bool(image_meta.get("grid"))
+ and isinstance(width, int)
+ and isinstance(height, int)
+ and width > 0
+ and height > 0
+ )
+
+ frames.append(
+ {
+ "frame_index": len(frames),
+ "event_id": int(event.get("id") or 0),
+ "ts": str(event.get("ts") or ""),
+ "step": int(event.get("step") or 0),
+ "kind": str(payload.get("kind") or "visual_update"),
+ "image_path": str(resolved),
+ "image_meta": image_meta,
+ "screen_size": {"width": width, "height": height} if width and height else None,
+ "is_fullscreen": is_fullscreen,
+ "overlays": buffered_actions,
+ }
+ )
+ buffered_actions = []
+
+ return {
+ "job_id": job_id,
+ "total_events": len(events),
+ "total_frames": len(frames),
+ "frames": frames,
+ "trailing_events": buffered_actions,
+ }
+
+
class _WebSocketHub:
def __init__(self) -> None:
self._connections: set[WebSocket] = set()
@@ -161,6 +336,18 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
raise HTTPException(status_code=404, detail="Job not found")
return {"events": manager.get_events(job_id, limit=limit)}
+ @app.get("/api/jobs/{job_id}/replay")
+ def get_job_replay(
+ job_id: str,
+ limit: int = Query(default=5000, ge=1, le=5000),
+ _: None = Depends(require_token),
+ ) -> dict[str, Any]:
+ job = manager.get_job(job_id)
+ if job is None:
+ raise HTTPException(status_code=404, detail="Job not found")
+ events = manager.get_events(job_id, limit=limit)
+ return _build_replay_payload(job_id, job, events)
+
@app.post("/api/jobs/{job_id}/cancel")
def cancel_job(job_id: str, _: None = Depends(require_token)) -> dict[str, Any]:
job = manager.get_job(job_id)
diff --git a/src/ui.py b/src/ui.py
index a4bc53f..f4c9939 100644
--- a/src/ui.py
+++ b/src/ui.py
@@ -43,6 +43,33 @@ def monitoring_page_html(device_hostname: str = "") -> str:
+
+
Replay
+
No replay loaded.
+
+
+
+
+
+
+
+
+
+
+
![Replay frame]()
+
+
+
+
+
Live Events