feat: implement replay functionality with UI controls and backend support

This commit is contained in:
Space-Banane
2026-05-27 21:57:37 +02:00
parent 620fcc4aa6
commit 111a1e84af
4 changed files with 491 additions and 5 deletions

View File

@@ -30,6 +30,181 @@ class CreateJobRequest(BaseModel):
no_failsafe: bool = False
def _safe_int(value: Any) -> int | None:
try:
return int(value)
except Exception: # noqa: BLE001
return None
def _safe_text(value: Any, limit: int = 180) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[:limit]}..."
def _resolve_artifact_path(artifacts_dir: Path | None, path_raw: Any) -> Path | None:
if artifacts_dir is None:
return None
text = str(path_raw or "").strip()
if not text:
return None
candidate = Path(text).resolve()
try:
candidate.relative_to(artifacts_dir)
except ValueError:
return None
return candidate
def _extract_replay_action(
event: dict[str, Any],
pending_tool_args: dict[tuple[int, str], list[dict[str, Any]]],
) -> dict[str, Any] | None:
event_type = str(event.get("event_type") or "")
payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
step = int(event.get("step") or 0)
ts = str(event.get("ts") or "")
event_id = int(event.get("id") or 0)
if event_type == "tool_called":
tool = str(payload.get("tool") or "").strip()
args = payload.get("args") if isinstance(payload.get("args"), dict) else {}
if tool:
pending_tool_args.setdefault((step, tool), []).append(args)
action: dict[str, Any] = {
"ts": ts,
"step": step,
"event_id": event_id,
"kind": "tool_called",
"tool": tool,
"label": f"Call: {tool}" if tool else "Tool call",
}
if tool == "click":
coord = args.get("coordinate") if isinstance(args, dict) else None
if isinstance(coord, dict):
x = _safe_int(coord.get("x"))
y = _safe_int(coord.get("y"))
if x is not None and y is not None:
action["requested_click"] = {"x": x, "y": y}
action["label"] = f"Call: click ({x}, {y})"
elif tool == "type":
text = _safe_text((args or {}).get("text"), 120)
if text:
action["text_preview"] = text
action["label"] = f"Call: type \"{text}\""
return action
if event_type == "tool_result":
tool = str(payload.get("tool") or "").strip()
result = payload.get("result") if isinstance(payload.get("result"), dict) else {}
matching_args: dict[str, Any] = {}
key = (step, tool)
queued = pending_tool_args.get(key) or []
if queued:
matching_args = queued.pop(0)
if not queued:
pending_tool_args.pop(key, None)
action = {
"ts": ts,
"step": step,
"event_id": event_id,
"kind": "tool_result",
"tool": tool,
"ok": bool(result.get("ok")),
"label": f"Result: {tool}",
}
if tool == "click":
clicked = result.get("clicked") if isinstance(result.get("clicked"), dict) else {}
x = _safe_int(clicked.get("x"))
y = _safe_int(clicked.get("y"))
if x is not None and y is not None:
action["click"] = {"x": x, "y": y}
action["label"] = f"Clicked ({x}, {y})" if bool(result.get("ok")) else f"Click failed ({x}, {y})"
elif tool == "type":
text = _safe_text((matching_args or {}).get("text"), 120)
typed_length = _safe_int(result.get("typed_length"))
if typed_length is not None:
action["typed_length"] = typed_length
if text:
action["text_preview"] = text
action["label"] = f"Typed \"{text}\""
elif tool == "press_key":
key_name = _safe_text(result.get("key"), 80)
if key_name:
action["label"] = f"Pressed {key_name}"
elif tool == "execute_command":
command = _safe_text((matching_args or {}).get("command"), 140)
if command:
action["command_preview"] = command
action["label"] = f"Command: {command}"
return action
return None
def _build_replay_payload(job_id: str, job: dict[str, Any], events: list[dict[str, Any]]) -> dict[str, Any]:
artifacts_dir_raw = str(job.get("artifacts_dir") or "").strip()
artifacts_dir = Path(artifacts_dir_raw).resolve() if artifacts_dir_raw else None
pending_tool_args: dict[tuple[int, str], list[dict[str, Any]]] = {}
buffered_actions: list[dict[str, Any]] = []
frames: list[dict[str, Any]] = []
for event in events:
action = _extract_replay_action(event, pending_tool_args)
if action is not None:
buffered_actions.append(action)
if str(event.get("event_type") or "") != "visual_update":
continue
payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
image_meta = payload.get("image_meta") if isinstance(payload.get("image_meta"), dict) else {}
resolved = _resolve_artifact_path(artifacts_dir, image_meta.get("path"))
if resolved is None or not resolved.exists() or not resolved.is_file():
continue
width = _safe_int(image_meta.get("width"))
height = _safe_int(image_meta.get("height"))
if width is None or height is None:
size = image_meta.get("screen_size") if isinstance(image_meta.get("screen_size"), dict) else {}
width = _safe_int(size.get("width"))
height = _safe_int(size.get("height"))
is_fullscreen = (
str(payload.get("kind") or "") == "see_screen"
and bool(image_meta.get("grid"))
and isinstance(width, int)
and isinstance(height, int)
and width > 0
and height > 0
)
frames.append(
{
"frame_index": len(frames),
"event_id": int(event.get("id") or 0),
"ts": str(event.get("ts") or ""),
"step": int(event.get("step") or 0),
"kind": str(payload.get("kind") or "visual_update"),
"image_path": str(resolved),
"image_meta": image_meta,
"screen_size": {"width": width, "height": height} if width and height else None,
"is_fullscreen": is_fullscreen,
"overlays": buffered_actions,
}
)
buffered_actions = []
return {
"job_id": job_id,
"total_events": len(events),
"total_frames": len(frames),
"frames": frames,
"trailing_events": buffered_actions,
}
class _WebSocketHub:
def __init__(self) -> None:
self._connections: set[WebSocket] = set()
@@ -161,6 +336,18 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
raise HTTPException(status_code=404, detail="Job not found")
return {"events": manager.get_events(job_id, limit=limit)}
@app.get("/api/jobs/{job_id}/replay")
def get_job_replay(
job_id: str,
limit: int = Query(default=5000, ge=1, le=5000),
_: None = Depends(require_token),
) -> dict[str, Any]:
job = manager.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
events = manager.get_events(job_id, limit=limit)
return _build_replay_payload(job_id, job, events)
@app.post("/api/jobs/{job_id}/cancel")
def cancel_job(job_id: str, _: None = Depends(require_token)) -> dict[str, Any]:
job = manager.get_job(job_id)