from __future__ import annotations

import contextlib
import json
import sqlite3
import threading
from collections.abc import Iterator
from pathlib import Path
from typing import Any


class HistoryDB:
    """SQLite-backed store for jobs and their per-step events.

    A fresh connection is opened for every operation and closed when the
    operation finishes. Write operations are additionally serialized through
    an in-process lock.
    """

    def __init__(self, db_path: Path) -> None:
        """Remember *db_path* and make sure the schema exists."""
        self.db_path = db_path
        self._lock = threading.Lock()
        self._init_schema()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name."""
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        return conn

    @contextlib.contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection; commit or roll back on exit, then close it.

        ``with connection:`` alone only manages the transaction and leaves
        the connection open, so the close has to be done explicitly here.
        """
        conn = self._connect()
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create the tables and index if missing and apply column migrations."""
        with self._session() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS jobs (
                    job_id TEXT PRIMARY KEY,
                    objective TEXT NOT NULL,
                    model TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    started_at TEXT,
                    ended_at TEXT,
                    result TEXT,
                    response_json TEXT,
                    error TEXT,
                    steps INTEGER DEFAULT 0,
                    cancelled INTEGER DEFAULT 0,
                    safety_checked INTEGER DEFAULT 0,
                    safety_passed INTEGER DEFAULT 0,
                    safety_reason TEXT,
                    safety_override INTEGER DEFAULT 0,
                    disabled_tools_json TEXT NOT NULL,
                    artifacts_dir TEXT,
                    input_tokens INTEGER DEFAULT 0,
                    cached_input_tokens INTEGER DEFAULT 0,
                    output_tokens INTEGER DEFAULT 0,
                    reasoning_tokens INTEGER DEFAULT 0,
                    total_tokens INTEGER DEFAULT 0,
                    estimated_cost_usd REAL
                )
                """
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS job_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    job_id TEXT NOT NULL,
                    ts TEXT NOT NULL,
                    step INTEGER DEFAULT 0,
                    event_type TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    FOREIGN KEY(job_id) REFERENCES jobs(job_id)
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_job_events_job_id_id ON job_events(job_id, id)"
            )
            # Databases created before response_json existed need it added.
            columns = {row[1] for row in conn.execute("PRAGMA table_info(jobs)").fetchall()}
            if "response_json" not in columns:
                conn.execute("ALTER TABLE jobs ADD COLUMN response_json TEXT")

    def create_job(
        self,
        *,
        job_id: str,
        objective: str,
        model: str,
        created_at: str,
        safety_override: bool,
        disabled_tools: list[str],
    ) -> None:
        """Insert a new job row with status ``"queued"``.

        ``disabled_tools`` is stored as a JSON array and ``safety_override``
        as an integer flag.
        """
        with self._lock, self._session() as conn:
            conn.execute(
                """
                INSERT INTO jobs (
                    job_id, objective, model, status, created_at, safety_override,
                    disabled_tools_json
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    job_id,
                    objective,
                    model,
                    "queued",
                    created_at,
                    int(safety_override),
                    json.dumps(disabled_tools, ensure_ascii=False),
                ),
            )

    def update_job(self, job_id: str, **fields: Any) -> None:
        """Set the given columns on the job identified by *job_id*.

        Does nothing when no fields are supplied.

        Raises:
            ValueError: if a field name is not a plain identifier. Column
                names cannot be bound as SQL parameters, so they are
                validated before being placed into the statement.
        """
        if not fields:
            return
        keys = sorted(fields)
        invalid = [k for k in keys if not k.isidentifier()]
        if invalid:
            raise ValueError(f"invalid job column name(s): {invalid!r}")
        assignments = ", ".join(f"{k} = ?" for k in keys)
        values = [fields[k] for k in keys]
        with self._lock, self._session() as conn:
            conn.execute(f"UPDATE jobs SET {assignments} WHERE job_id = ?", (*values, job_id))

    def add_event(
        self, *, job_id: str, ts: str, step: int, event_type: str, payload: dict[str, Any]
    ) -> None:
        """Append an event row for *job_id*; *payload* is stored as JSON."""
        with self._lock, self._session() as conn:
            conn.execute(
                """
                INSERT INTO job_events (job_id, ts, step, event_type, payload_json)
                VALUES (?, ?, ?, ?, ?)
                """,
                (job_id, ts, int(step), event_type, json.dumps(payload, ensure_ascii=False)),
            )

    def get_job(self, job_id: str) -> dict[str, Any] | None:
        """Return the job as a dict, or ``None`` when no such job exists."""
        with self._session() as conn:
            row = conn.execute("SELECT * FROM jobs WHERE job_id = ?", (job_id,)).fetchone()
        return self._row_to_job(row) if row else None

    def list_jobs(self, limit: int = 100) -> list[dict[str, Any]]:
        """Return jobs ordered by ``created_at`` descending.

        *limit* is clamped to the range 1..1000.
        """
        bounded = min(max(int(limit), 1), 1000)
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?",
                (bounded,),
            ).fetchall()
        return [self._row_to_job(row) for row in rows]

    def get_job_events(self, job_id: str, limit: int = 500) -> list[dict[str, Any]]:
        """Return the events of *job_id* in insertion (id) order.

        *limit* is clamped to the range 1..5000. A payload that cannot be
        decoded as JSON is returned as ``{"_raw": <stored text>}``.
        """
        bounded = min(max(int(limit), 1), 5000)
        with self._session() as conn:
            rows = conn.execute(
                """
                SELECT id, job_id, ts, step, event_type, payload_json
                FROM job_events
                WHERE job_id = ?
                ORDER BY id ASC
                LIMIT ?
                """,
                (job_id, bounded),
            ).fetchall()

        events: list[dict[str, Any]] = []
        for row in rows:
            raw = row["payload_json"]
            try:
                payload = json.loads(raw) if raw else {}
            except (TypeError, ValueError, RecursionError):
                # Keep the undecodable text available to the caller.
                payload = {"_raw": raw}
            events.append(
                {
                    "id": row["id"],
                    "job_id": row["job_id"],
                    "ts": row["ts"],
                    "step": row["step"],
                    "event_type": row["event_type"],
                    "payload": payload,
                }
            )
        return events

    def stats(self) -> dict[str, Any]:
        """Return aggregate job counts and the summed estimated cost.

        Every value is 0 when the jobs table is empty: SUM over zero rows
        yields NULL, so each aggregate is wrapped in COALESCE.
        """
        with self._session() as conn:
            totals = conn.execute(
                """
                SELECT
                    COUNT(*) AS total_jobs,
                    COALESCE(SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END), 0) AS running_jobs,
                    COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_jobs,
                    COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_jobs,
                    COALESCE(SUM(CASE WHEN cancelled = 1 THEN 1 ELSE 0 END), 0) AS cancelled_jobs,
                    COALESCE(SUM(estimated_cost_usd), 0) AS total_estimated_cost
                FROM jobs
                """
            ).fetchone()
        return dict(totals) if totals else {}

    def _row_to_job(self, row: sqlite3.Row) -> dict[str, Any]:
        """Convert a ``jobs`` row into the dict shape returned to callers.

        Integer flags become booleans, token counters default to 0, and an
        undecodable ``disabled_tools_json`` becomes an empty list.
        """
        raw_tools = row["disabled_tools_json"]
        try:
            disabled_tools = json.loads(raw_tools) if raw_tools else []
        except (TypeError, ValueError, RecursionError):
            disabled_tools = []
        return {
            "job_id": row["job_id"],
            "objective": row["objective"],
            "model": row["model"],
            "status": row["status"],
            "created_at": row["created_at"],
            "started_at": row["started_at"],
            "ended_at": row["ended_at"],
            "result": row["result"],
            "response": self._parse_response_payload(row["response_json"], row["result"]),
            "error": row["error"],
            "steps": row["steps"],
            "cancelled": bool(row["cancelled"]),
            "safety_checked": bool(row["safety_checked"]),
            "safety_passed": bool(row["safety_passed"]),
            "safety_reason": row["safety_reason"],
            "safety_override": bool(row["safety_override"]),
            "disabled_tools": disabled_tools,
            "artifacts_dir": row["artifacts_dir"],
            "usage": {
                "input_tokens": row["input_tokens"] or 0,
                "cached_input_tokens": row["cached_input_tokens"] or 0,
                "output_tokens": row["output_tokens"] or 0,
                "reasoning_tokens": row["reasoning_tokens"] or 0,
                "total_tokens": row["total_tokens"] or 0,
                "estimated_cost_usd": row["estimated_cost_usd"],
            },
        }

    def _parse_response_payload(
        self, response_json: str | None, result: str | None
    ) -> dict[str, Any]:
        """Build the ``{"return": ..., "data": ...}`` response dict.

        The stored JSON is used when it decodes to an object; otherwise the
        stripped *result* text is returned with ``data`` set to ``None``.
        """
        fallback_return = str(result or "").strip()
        if response_json:
            try:
                payload = json.loads(response_json)
            except (TypeError, ValueError, RecursionError):
                payload = None
            if isinstance(payload, dict):
                return {
                    "return": str(payload.get("return") or fallback_return),
                    "data": payload.get("data"),
                }
        return {"return": fallback_return, "data": None}