diff --git a/main.py b/main.py index 69c5bfc..4937eb9 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,4 @@ -from src.cli import main +from src.app_main import main if __name__ == "__main__": diff --git a/screenjob.py b/screenjob.py index 748ade4..27117bb 100644 --- a/screenjob.py +++ b/screenjob.py @@ -2,10 +2,10 @@ Compatibility wrapper. Preferred entry point: - python main.py "" + python main.py run "" """ -from src.cli import main +from src.app_main import main if __name__ == "__main__": diff --git a/src/agent.py b/src/agent.py index 28694da..feb3c85 100644 --- a/src/agent.py +++ b/src/agent.py @@ -3,14 +3,16 @@ from __future__ import annotations import json import logging import subprocess +import threading import time import traceback -from typing import Any +from typing import Any, Callable from openai import OpenAI from PIL import Image, ImageEnhance, ImageFilter, ImageOps -from .models import AgentResult, RunArtifacts +from .models import AgentResult, RunArtifacts, RuntimeOptions, UsageSummary +from .pricing import estimate_cost_usd from .utils import clamp, draw_global_grid, image_to_data_url, utc_now_iso try: @@ -46,32 +48,78 @@ class ScreenJobAgent: client: OpenAI, logger: logging.Logger, artifacts: RunArtifacts, - model: str, - max_steps: int, - command_timeout: int, - type_interval: float, - click_pause: float, + options: RuntimeOptions, + cancel_event: threading.Event | None = None, + event_callback: Callable[[dict[str, Any]], None] | None = None, ) -> None: self.client = client self.logger = logger self.artifacts = artifacts - self.model = model - self.max_steps = max_steps - self.command_timeout = command_timeout - self.type_interval = type_interval - self.click_pause = click_pause + self.options = options + self.cancel_event = cancel_event or threading.Event() + self.event_callback = event_callback self.step = 0 self.completed = False self.final_result = "" self.previous_response_id: str | None = None + self.usage = UsageSummary() self.last_screen_data_url: str | None = None self.last_screen_meta: dict[str, Any] | None = None self.click_history: list[tuple[int, int, float]] = [] + self.disabled_tools = {tool.strip() for tool in (options.disable_tools or set()) if tool.strip()} + + def _emit(self, event_type: str, payload: dict[str, Any]) -> None: + if self.event_callback is None: + return + event = { + "ts": utc_now_iso(), + "job_run_id": self.artifacts.run_id, + "step": self.step, + "event_type": event_type, + "payload": payload, + } + try: + self.event_callback(event) + except Exception: # noqa: BLE001 + self.logger.debug("Event callback failed.", exc_info=True) + + def _is_cancelled(self) -> bool: + return bool(self.cancel_event.is_set()) + + def _register_usage(self, response: Any) -> None: + usage_obj = getattr(response, "usage", None) + if usage_obj is None: + return + input_tokens = int(getattr(usage_obj, "input_tokens", 0) or 0) + output_tokens = int(getattr(usage_obj, "output_tokens", 0) or 0) + total_tokens = int(getattr(usage_obj, "total_tokens", input_tokens + output_tokens) or 0) + + input_details = getattr(usage_obj, "input_tokens_details", None) + cached_tokens = int(getattr(input_details, "cached_tokens", 0) or 0) if input_details else 0 + output_details = getattr(usage_obj, "output_tokens_details", None) + reasoning_tokens = int(getattr(output_details, "reasoning_tokens", 0) or 0) if output_details else 0 + + self.usage.input_tokens += input_tokens + self.usage.cached_input_tokens += cached_tokens + self.usage.output_tokens += output_tokens + self.usage.reasoning_tokens += reasoning_tokens + self.usage.total_tokens += total_tokens + estimated_cost, model_for_pricing = estimate_cost_usd(self.options.model, self.usage) + self.usage.estimated_cost_usd = estimated_cost + self.usage.model_for_pricing = model_for_pricing + + self._emit( + "usage_update", + { + "usage": self.usage.to_dict(), + "response_id": getattr(response, "id", None), + }, + ) def _tool_schemas(self) -> list[dict[str, Any]]: - return [ + all_tools: list[dict[str, Any]] = [ { "type": "function", "name": "task_complete", @@ -213,6 +261,7 @@ class ScreenJobAgent: }, }, ] + return [tool for tool in all_tools if tool["name"] not in self.disabled_tools] def _capture_screen(self, with_grid: bool = True) -> tuple[Image.Image, dict[str, Any]]: screenshot = pyautogui.screenshot().convert("RGB") @@ -378,13 +427,16 @@ class ScreenJobAgent: "recent_similar_clicks": len(near_same), } - pyautogui.moveTo(x, y, duration=self.click_pause) + pyautogui.moveTo(x, y, duration=self.options.click_pause) pyautogui.click(x=x, y=y) sleep_after = self._parse_seconds(args.get("sleep_after_seconds", 0), default=0.0, max_seconds=30.0) - if sleep_after > 0: - time.sleep(sleep_after) - else: - time.sleep(0.15) + wait_remaining = sleep_after if sleep_after > 0 else 0.15 + while wait_remaining > 0: + if self._is_cancelled(): + break + interval = min(0.05, wait_remaining) + time.sleep(interval) + wait_remaining -= interval return { "ok": True, @@ -401,7 +453,11 @@ class ScreenJobAgent: def _tool_type(self, args: dict[str, Any]) -> dict[str, Any]: text = str(args.get("text", "")) - pyautogui.write(text, interval=self.type_interval) + for char in text: + if self._is_cancelled(): + return {"ok": False, "cancelled": True, "typed_length": 0} + pyautogui.write(char, interval=0) + time.sleep(self.options.type_interval) return {"ok": True, "typed_length": len(text), "message": "Text typed."} def _tool_press_key(self, args: dict[str, Any]) -> dict[str, Any]: @@ -410,15 +466,25 @@ class ScreenJobAgent: if not key: return {"ok": False, "error": "Missing key."} repeats = min(repeats, 50) + pressed = 0 for _ in range(repeats): + if self._is_cancelled(): + break pyautogui.press(key) + pressed += 1 time.sleep(0.03) - return {"ok": True, "key": key, "repeats": repeats, "message": "Key press executed."} + return {"ok": True, "key": key, "repeats": pressed, "message": "Key press executed."} def _tool_sleep(self, args: dict[str, Any]) -> dict[str, Any]: seconds = self._parse_seconds(args.get("seconds"), default=0.0, max_seconds=60.0) - time.sleep(seconds) - return {"ok": True, "slept_seconds": seconds, "message": "Sleep completed."} + elapsed = 0.0 + while elapsed < seconds: + if self._is_cancelled(): + return {"ok": False, "cancelled": True, "slept_seconds": round(elapsed, 3)} + interval = min(0.1, seconds - elapsed) + time.sleep(interval) + elapsed += interval + return {"ok": True, "slept_seconds": round(seconds, 3), "message": "Sleep completed."} def _tool_execute_command(self, args: dict[str, Any]) -> dict[str, Any]: command = str(args.get("command", "")).strip() @@ -426,36 +492,55 @@ class ScreenJobAgent: return {"ok": False, "error": "Empty command."} started = time.time() + process: subprocess.Popen[str] | None = None try: - completed = subprocess.run( + process = subprocess.Popen( command, shell=True, - capture_output=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True, - timeout=self.command_timeout, - check=False, ) - elapsed_ms = int((time.time() - started) * 1000) + while True: + if self._is_cancelled(): + process.terminate() + return { + "ok": False, + "cancelled": True, + "command": command, + "elapsed_ms": int((time.time() - started) * 1000), + } + if process.poll() is not None: + break + if (time.time() - started) > self.options.command_timeout: + process.kill() + stdout, stderr = process.communicate(timeout=2) + return { + "ok": False, + "command": command, + "error": "Command timed out.", + "elapsed_ms": int((time.time() - started) * 1000), + "timeout_seconds": self.options.command_timeout, + "stdout": (stdout or "")[-12000:], + "stderr": (stderr or "")[-12000:], + } + time.sleep(0.05) + + stdout, stderr = process.communicate(timeout=2) return { "ok": True, "command": command, - "exit_code": completed.returncode, - "stdout": completed.stdout[-12000:], - "stderr": completed.stderr[-12000:], - "elapsed_ms": elapsed_ms, - } - except subprocess.TimeoutExpired as exc: - elapsed_ms = int((time.time() - started) * 1000) - return { - "ok": False, - "command": command, - "error": "Command timed out.", - "elapsed_ms": elapsed_ms, - "timeout_seconds": self.command_timeout, - "stdout": (exc.stdout or "")[-12000:], - "stderr": (exc.stderr or "")[-12000:], + "exit_code": process.returncode, + "stdout": (stdout or "")[-12000:], + "stderr": (stderr or "")[-12000:], + "elapsed_ms": int((time.time() - started) * 1000), } except Exception as exc: # noqa: BLE001 + if process is not None and process.poll() is None: + try: + process.kill() + except Exception: # noqa: BLE001 + pass return {"ok": False, "command": command, "error": f"{type(exc).__name__}: {exc}"} def _tool_task_complete(self, args: dict[str, Any]) -> dict[str, Any]: @@ -465,6 +550,8 @@ class ScreenJobAgent: return {"ok": True, "result": result} def _dispatch_tool(self, name: str, args: dict[str, Any]) -> dict[str, Any]: + if name in self.disabled_tools: + return {"ok": False, "error": f"Tool '{name}' is disabled for this job."} handlers = { "see_screen": self._tool_see_screen, "enhance": self._tool_enhance, @@ -491,7 +578,7 @@ class ScreenJobAgent: def _call_model(self, input_items: list[dict[str, Any]]) -> Any: return self.client.responses.create( - model=self.model, + model=self.options.model, instructions=SYSTEM_PROMPT, tools=self._tool_schemas(), input=input_items, @@ -502,8 +589,18 @@ class ScreenJobAgent: def run(self, job: str) -> AgentResult: started_at = time.time() - self.logger.info("Starting run_id=%s model=%s", self.artifacts.run_id, self.model) + self.logger.info("Starting run_id=%s model=%s", self.artifacts.run_id, self.options.model) self.logger.info("Job: %s", job) + self.logger.info("Disabled tools: %s", sorted(self.disabled_tools)) + self._emit( + "job_started", + { + "run_id": self.artifacts.run_id, + "model": self.options.model, + "objective": job, + "disabled_tools": sorted(self.disabled_tools), + }, + ) self._tool_see_screen({}) init_input: list[dict[str, Any]] = [ @@ -528,25 +625,37 @@ class ScreenJobAgent: ) pending_input = init_input + error_text: str | None = None + cancelled = False + + while self.step < self.options.max_steps and not self.completed: + if self._is_cancelled(): + cancelled = True + error_text = "Cancelled by user request." + break - while self.step < self.max_steps and not self.completed: self.step += 1 - self.logger.info("---- Agent step %d/%d ----", self.step, self.max_steps) + self.logger.info("---- Agent step %d/%d ----", self.step, self.options.max_steps) + self._emit("step_started", {"step": self.step, "max_steps": self.options.max_steps}) try: response = self._call_model(pending_input) + self._register_usage(response) except Exception as exc: # noqa: BLE001 self.logger.exception("OpenAI API call failed on step %d", self.step) - raise RuntimeError(f"OpenAI API call failed: {type(exc).__name__}: {exc}") from exc + error_text = f"OpenAI API call failed: {type(exc).__name__}: {exc}" + break self.previous_response_id = response.id output_items = list(response.output or []) text_preview = getattr(response, "output_text", "") or "" if text_preview.strip(): self.logger.info("Model text: %s", text_preview.strip()[:500]) + self._emit("model_text", {"step": self.step, "text": text_preview.strip()[:2000]}) tool_calls = [item for item in output_items if getattr(item, "type", None) == "function_call"] if not tool_calls: self.logger.warning("No tool calls returned; nudging model to continue with tools.") + self._emit("step_warning", {"step": self.step, "message": "No tool calls; nudged model."}) pending_input = [ { "role": "user", @@ -566,12 +675,21 @@ class ScreenJobAgent: next_input: list[dict[str, Any]] = [] for tool_call in tool_calls: + if self._is_cancelled(): + cancelled = True + error_text = "Cancelled by user request." + break + name = str(getattr(tool_call, "name", "")) call_id = str(getattr(tool_call, "call_id", "")) args_raw = getattr(tool_call, "arguments", "{}") args = self._safe_parse_args(args_raw) self.logger.info("Tool call: %s args=%s", name, json.dumps(args, ensure_ascii=False)) + self._emit( + "tool_called", + {"step": self.step, "tool": name, "args": args}, + ) try: result = self._dispatch_tool(name, args) except Exception as exc: # noqa: BLE001 @@ -587,6 +705,7 @@ class ScreenJobAgent: name, json.dumps(result, ensure_ascii=False)[:2500], ) + self._emit("tool_result", {"step": self.step, "tool": name, "result": result}) next_input.append( { "type": "function_call_output", @@ -600,26 +719,69 @@ class ScreenJobAgent: next_input.append( self._build_visual_message(title, self.last_screen_data_url, self.last_screen_meta) ) + self._emit( + "visual_update", + { + "step": self.step, + "kind": name, + "image_meta": self.last_screen_meta, + }, + ) + if cancelled: + break pending_input = next_input ended_at = time.time() if self.completed: self.logger.info("Task completed in %d step(s).", self.step) + self._emit("job_completed", {"result": self.final_result, "steps": self.step, "usage": self.usage.to_dict()}) return AgentResult( completed=True, result=self.final_result, steps=self.step, started_at=started_at, ended_at=ended_at, + usage=self.usage, ) - self.logger.warning("Stopped due to step limit (%d).", self.max_steps) + if cancelled: + self.logger.warning("Run cancelled by user after %d step(s).", self.step) + self._emit("job_cancelled", {"steps": self.step, "usage": self.usage.to_dict()}) + return AgentResult( + completed=False, + result="Cancelled by user request.", + steps=self.step, + started_at=started_at, + ended_at=ended_at, + usage=self.usage, + error=error_text, + cancelled=True, + ) + + if error_text: + self.logger.error("Run failed: %s", error_text) + self._emit("job_failed", {"steps": self.step, "error": error_text, "usage": self.usage.to_dict()}) + return AgentResult( + completed=False, + result=error_text, + steps=self.step, + started_at=started_at, + ended_at=ended_at, + usage=self.usage, + error=error_text, + ) + + self.logger.warning("Stopped due to step limit (%d).", self.options.max_steps) + result_text = f"Stopped after max steps ({self.options.max_steps}) without task_complete." + self._emit("job_stopped", {"steps": self.step, "reason": "max_steps", "usage": self.usage.to_dict()}) return AgentResult( completed=False, - result=f"Stopped after max steps ({self.max_steps}) without task_complete.", + result=result_text, steps=self.step, started_at=started_at, ended_at=ended_at, + usage=self.usage, + error=result_text, ) diff --git a/src/app_main.py b/src/app_main.py new file mode 100644 index 0000000..8ef6fa5 --- /dev/null +++ b/src/app_main.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import sys + +from . import cli, server + + +def main(argv: list[str] | None = None) -> int: + args = list(sys.argv[1:] if argv is None else argv) + if not args: + print("Usage: python main.py run \"\" | python main.py server") + return 2 + + command = args[0].strip().lower() + if command in {"-h", "--help", "help"}: + print("Usage:") + print(" python main.py run \"\" [options]") + print(" python main.py server") + print("") + print("Examples:") + print(" python main.py run \"Open amazon.de\"") + print(" python main.py server") + return 0 + if command in {"server", "serve"}: + if any(flag in {"-h", "--help"} for flag in args[1:]): + print("Server mode:") + print(" python main.py server") + print("") + print("Env:") + print(" OPENAI_API_KEY=...") + print(" SCREENJOB_TOKEN=...") + print(" DISABLE_UI=true|false (optional)") + return 0 + server.main() + return 0 + if command in {"run"}: + return cli.main(args[1:]) + return cli.main(args) diff --git a/src/cli.py b/src/cli.py index 4d4cec5..c0cbe85 100644 --- a/src/cli.py +++ b/src/cli.py @@ -2,146 +2,96 @@ from __future__ import annotations import argparse import json -import os import sys from pathlib import Path -from dotenv import load_dotenv -from openai import OpenAI - -from .agent import ScreenJobAgent -from .utils import setup_artifacts, setup_logger - -try: - import pyautogui -except Exception as import_exc: - raise RuntimeError( - "pyautogui is required. Install dependencies with: pip install pyautogui pillow" - ) from import_exc +from .config import load_app_config +from .models import RuntimeOptions +from .runtime import create_openai_client, run_job +from .safety import assess_task_safety def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( - description="Run an autonomous desktop task agent using OpenAI + UI tools.", + description="Run an autonomous desktop task agent using OpenAI + local tools.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "Examples:\n" - ' python main.py "Open amazon.de"\n' - ' python main.py "Open amazon.de and search for mechanical keyboard" --max-steps 80\n\n' - "Artifacts:\n" - " Each run stores logs/screens in ./screenjob_runs/run_YYYYMMDD_HHMMSS/" + ' python main.py run "Open amazon.de"\n' + ' python main.py run "Open amazon.de and search for keyboard" --max-steps 80\n' + ' python main.py run "Open amazon.de" --disable-tool click --disable-tool type\n' ), ) parser.add_argument("job", type=str, help="Task objective for the agent.") - parser.add_argument("--model", type=str, default="gpt-5.2", help="OpenAI model name.") + parser.add_argument("--model", type=str, default=None, help="OpenAI model name.") parser.add_argument("--max-steps", type=int, default=60, help="Max tool-iteration steps.") - parser.add_argument( - "--command-timeout", - type=int, - default=45, - help="Timeout (seconds) for execute_command tool.", - ) - parser.add_argument( - "--type-interval", - type=float, - default=0.02, - help="Seconds between typed characters.", - ) - parser.add_argument( - "--click-pause", - type=float, - default=0.10, - help="Mouse move duration before click (seconds).", - ) - parser.add_argument( - "--no-failsafe", - action="store_true", - help="Disable PyAutoGUI fail-safe. Not recommended.", - ) + parser.add_argument("--command-timeout", type=int, default=45, help="Timeout in seconds for execute_command.") + parser.add_argument("--type-interval", type=float, default=0.02, help="Seconds between typed characters.") + parser.add_argument("--click-pause", type=float, default=0.10, help="Mouse move duration before click.") + parser.add_argument("--disable-tool", action="append", default=[], help="Disable a tool by name.") + parser.add_argument("--skip-safety-check", action="store_true", help="Bypass pre-flight safety check.") + parser.add_argument("--no-failsafe", action="store_true", help="Disable PyAutoGUI fail-safe.") return parser -def main() -> int: - load_dotenv() +def main(argv: list[str] | None = None) -> int: parser = build_parser() - args = parser.parse_args() + args = parser.parse_args(argv) + cwd = Path.cwd() + config = load_app_config(cwd) - api_key = os.getenv("OPENAI_API_KEY", "").strip() - if not api_key: - print("ERROR: Missing OPENAI_API_KEY (expected in environment or .env).", file=sys.stderr) + if not config.openai_api_key: + print("ERROR: Missing OPENAI_API_KEY in environment/.env", file=sys.stderr) return 2 - pyautogui.FAILSAFE = not args.no_failsafe - pyautogui.PAUSE = 0.05 + model = args.model or config.default_model + disabled_tools = sorted({str(x).strip() for x in args.disable_tool if str(x).strip()}) - runs_base = Path.cwd() / "screenjob_runs" - artifacts = setup_artifacts(runs_base) - logger = setup_logger(artifacts.log_file, verbose=True) + if not args.skip_safety_check: + safety_client = create_openai_client(config.openai_api_key) + safe, reason, parsed = assess_task_safety( + safety_client, + model=config.safety_model, + objective=args.job, + disabled_tools=disabled_tools, + ) + if not safe: + print( + json.dumps( + { + "completed": False, + "result": f"Blocked by safety check: {reason}", + "safety": parsed, + }, + ensure_ascii=False, + indent=2, + ) + ) + return 1 - logger.info("ScreenJob booting. Artifacts: %s", str(artifacts.root_dir.resolve())) - logger.info("PyAutoGUI FAILSAFE=%s", pyautogui.FAILSAFE) - - try: - client = OpenAI(api_key=api_key) - except Exception as exc: # noqa: BLE001 - logger.exception("Failed to create OpenAI client.") - print(f"ERROR: Could not initialize OpenAI client: {exc}", file=sys.stderr) - return 2 - - agent = ScreenJobAgent( - client=client, - logger=logger, - artifacts=artifacts, - model=args.model, + options = RuntimeOptions( + model=model, max_steps=args.max_steps, command_timeout=args.command_timeout, type_interval=args.type_interval, click_pause=args.click_pause, + disable_tools=set(disabled_tools), ) - try: - result = agent.run(args.job) - elapsed = result.ended_at - result.started_at - logger.info("Run finished. completed=%s elapsed=%.2fs", result.completed, elapsed) - print( - json.dumps( - { - "completed": result.completed, - "result": result.result, - "steps": result.steps, - "elapsed_seconds": round(elapsed, 3), - "artifacts_dir": str(artifacts.root_dir.resolve()), - }, - ensure_ascii=False, - indent=2, - ) + result, artifacts = run_job( + api_key=config.openai_api_key, + objective=args.job, + options=options, + runs_base=config.runs_dir, + no_failsafe=args.no_failsafe, ) - return 0 if result.completed else 1 except KeyboardInterrupt: - logger.warning("Interrupted by user.") - print( - json.dumps( - { - "completed": False, - "result": "Interrupted by user.", - "steps": agent.step, - "artifacts_dir": str(artifacts.root_dir.resolve()), - }, - ensure_ascii=False, - indent=2, - ) - ) + print(json.dumps({"completed": False, "result": "Interrupted by user."}, ensure_ascii=False, indent=2)) return 130 except Exception as exc: # noqa: BLE001 - logger.exception("Fatal runtime error.") print( json.dumps( - { - "completed": False, - "result": f"Fatal error: {type(exc).__name__}: {exc}", - "steps": agent.step, - "artifacts_dir": str(artifacts.root_dir.resolve()), - }, + {"completed": False, "result": f"Fatal error: {type(exc).__name__}: {exc}"}, ensure_ascii=False, indent=2, ), @@ -149,3 +99,16 @@ def main() -> int: ) return 1 + payload = { + "completed": result.completed, + "result": result.result, + "steps": result.steps, + "elapsed_seconds": round(result.ended_at - result.started_at, 3), + "artifacts_dir": str(artifacts.root_dir.resolve()), + "usage": result.usage.to_dict(), + "error": result.error, + "cancelled": result.cancelled, + } + print(json.dumps(payload, ensure_ascii=False, indent=2)) + return 0 if result.completed else 1 + diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..ed12c2e --- /dev/null +++ b/src/config.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +from dotenv import load_dotenv + + +def _env_bool(name: str, default: bool = False) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +@dataclass(frozen=True) +class AppConfig: + openai_api_key: str + screenjob_token: str + disable_ui: bool + default_model: str + safety_model: str + host: str + port: int + runs_dir: Path + db_path: Path + + +def load_app_config(cwd: Path) -> AppConfig: + load_dotenv() + openai_api_key = os.getenv("OPENAI_API_KEY", "").strip() + screenjob_token = os.getenv("SCREENJOB_TOKEN", "").strip() + default_model = os.getenv("SCREENJOB_DEFAULT_MODEL", "gpt-5.4-mini").strip() or "gpt-5.4-mini" + safety_model = os.getenv("SCREENJOB_SAFETY_MODEL", "gpt-5.4-mini").strip() or "gpt-5.4-mini" + host = os.getenv("SCREENJOB_HOST", "127.0.0.1").strip() or "127.0.0.1" + port = int(os.getenv("SCREENJOB_PORT", "8787").strip() or "8787") + runs_dir = cwd / "screenjob_runs" + db_path = cwd / "screenjob.db" + disable_ui = _env_bool("DISABLE_UI", default=False) + return AppConfig( + openai_api_key=openai_api_key, + screenjob_token=screenjob_token, + disable_ui=disable_ui, + default_model=default_model, + safety_model=safety_model, + host=host, + port=port, + runs_dir=runs_dir, + db_path=db_path, + ) + diff --git a/src/models.py b/src/models.py index ade7587..6528cbe 100644 --- a/src/models.py +++ b/src/models.py @@ -2,6 +2,7 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path +from typing import Any @dataclass @@ -21,4 +22,38 @@ class AgentResult: steps: int started_at: float ended_at: float + usage: "UsageSummary" + error: str | None = None + cancelled: bool = False + +@dataclass +class UsageSummary: + input_tokens: int = 0 + cached_input_tokens: int = 0 + output_tokens: int = 0 + reasoning_tokens: int = 0 + total_tokens: int = 0 + estimated_cost_usd: float | None = None + model_for_pricing: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "input_tokens": self.input_tokens, + "cached_input_tokens": self.cached_input_tokens, + "output_tokens": self.output_tokens, + "reasoning_tokens": self.reasoning_tokens, + "total_tokens": self.total_tokens, + "estimated_cost_usd": self.estimated_cost_usd, + "model_for_pricing": self.model_for_pricing, + } + + +@dataclass +class RuntimeOptions: + model: str + max_steps: int = 60 + command_timeout: int = 45 + type_interval: float = 0.02 + click_pause: float = 0.10 + disable_tools: set[str] | None = None diff --git a/src/pricing.py b/src/pricing.py new file mode 100644 index 0000000..daf265d --- /dev/null +++ b/src/pricing.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from .models import UsageSummary + + +@dataclass(frozen=True) +class ModelRate: + input_per_million: float + cached_input_per_million: float | None + output_per_million: float + + +# Prices are USD per 1M tokens (standard tier). +# Source: OpenAI API pricing pages (captured 2026-05-27). +MODEL_RATES: dict[str, ModelRate] = { + "gpt-5.4-mini": ModelRate(0.75, 0.075, 4.50), + "gpt-5.4": ModelRate(2.50, 0.25, 15.00), + "gpt-5.2": ModelRate(1.75, 0.175, 14.00), + "gpt-5.2-chat-latest": ModelRate(1.75, 0.175, 14.00), + "gpt-5-mini": ModelRate(0.25, 0.025, 2.00), + "gpt-5-nano": ModelRate(0.05, 0.005, 0.40), +} + + +def normalize_model_for_pricing(model: str) -> str: + model_name = model.strip().lower() + for key in MODEL_RATES: + if model_name == key or model_name.startswith(f"{key}-"): + return key + return model_name + + +def estimate_cost_usd(model: str, usage: UsageSummary) -> tuple[float | None, str]: + normalized = normalize_model_for_pricing(model) + rate = MODEL_RATES.get(normalized) + if rate is None: + return None, normalized + + cached = max(0, usage.cached_input_tokens) + non_cached_input = max(0, usage.input_tokens - cached) + input_cost = (non_cached_input / 1_000_000.0) * rate.input_per_million + cached_cost = 0.0 + if rate.cached_input_per_million is not None: + cached_cost = (cached / 1_000_000.0) * rate.cached_input_per_million + output_cost = (max(0, usage.output_tokens) / 1_000_000.0) * rate.output_per_million + return input_cost + cached_cost + output_cost, normalized + diff --git a/src/runtime.py b/src/runtime.py new file mode 100644 index 0000000..a5f35bd --- /dev/null +++ b/src/runtime.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import logging +import threading +from pathlib import Path +from typing import Any, Callable + +from openai import OpenAI + +from .agent import ScreenJobAgent +from .models import AgentResult, RunArtifacts, RuntimeOptions +from .utils import setup_artifacts, setup_logger + +try: + import pyautogui +except Exception as import_exc: + raise RuntimeError( + "pyautogui is required. Install dependencies with: pip install pyautogui pillow" + ) from import_exc + + +def create_openai_client(api_key: str) -> OpenAI: + return OpenAI(api_key=api_key) + + +def run_job( + *, + api_key: str, + objective: str, + options: RuntimeOptions, + runs_base: Path, + no_failsafe: bool = False, + cancel_event: threading.Event | None = None, + event_callback: Callable[[dict[str, Any]], None] | None = None, + logger: logging.Logger | None = None, +) -> tuple[AgentResult, RunArtifacts]: + pyautogui.FAILSAFE = not no_failsafe + pyautogui.PAUSE = 0.05 + + artifacts = setup_artifacts(runs_base) + active_logger = logger or setup_logger(artifacts.log_file, verbose=True) + active_logger.info("ScreenJob booting. Artifacts: %s", str(artifacts.root_dir.resolve())) + active_logger.info("PyAutoGUI FAILSAFE=%s", pyautogui.FAILSAFE) + + client = create_openai_client(api_key) + agent = ScreenJobAgent( + client=client, + logger=active_logger, + artifacts=artifacts, + options=options, + cancel_event=cancel_event, + event_callback=event_callback, + ) + result = agent.run(objective) + active_logger.info("Run finished. completed=%s elapsed=%.2fs", result.completed, result.ended_at - result.started_at) + return result, artifacts + diff --git a/src/safety.py b/src/safety.py new file mode 100644 index 0000000..ba0a661 --- /dev/null +++ b/src/safety.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import json +from typing import Any + +from openai import OpenAI + + +SAFETY_PROMPT = """ +You are a safety gate for a desktop automation agent that can execute terminal commands, click/type on screen, and browse websites. +Classify whether the user objective can be executed safely and legally. + +Return strict JSON only: +{ + "safe": true|false, + "reason": "short reason", + "risk_level": "low|medium|high", + "recommended_disabled_tools": ["optional_tool_names"] +} +""" + + +def _extract_json(text: str) -> dict[str, Any]: + text = text.strip() + try: + parsed = json.loads(text) + return parsed if isinstance(parsed, dict) else {} + except Exception: + pass + start = text.find("{") + end = text.rfind("}") + if start >= 0 and end > start: + try: + parsed = json.loads(text[start : end + 1]) + return parsed if isinstance(parsed, dict) else {} + except Exception: + return {} + return {} + + +def assess_task_safety( + client: OpenAI, + *, + model: str, + objective: str, + disabled_tools: list[str], +) -> tuple[bool, str, dict[str, Any]]: + try: + response = client.responses.create( + model=model, + instructions=SAFETY_PROMPT, + input=[ + { + "role": "user", + "content": [ + { + "type": "input_text", + "text": ( + "Objective:\n" + f"{objective}\n\n" + f"Disabled tools: {json.dumps(disabled_tools, ensure_ascii=False)}\n" + "Answer with strict JSON only." + ), + } + ], + } + ], + ) + except Exception as exc: # noqa: BLE001 + return False, f"Safety check failed: {type(exc).__name__}: {exc}", {"safe": False} + + raw_text = getattr(response, "output_text", "") or "" + parsed = _extract_json(raw_text) + safe = bool(parsed.get("safe", False)) + reason = str(parsed.get("reason", "")).strip() or "No reason provided by safety check." + if not parsed: + safe = False + reason = "Safety check returned unparseable response." + return safe, reason, parsed + diff --git a/src/server.py b/src/server.py new file mode 100644 index 0000000..3b796cd --- /dev/null +++ b/src/server.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import asyncio +import secrets +from pathlib import Path +from typing import Any + +from fastapi import Depends, FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse, JSONResponse +from pydantic import BaseModel, Field + +from .config import AppConfig, load_app_config +from .storage import HistoryDB +from .task_manager import JobManager +from .ui import monitoring_page_html + + +class CreateJobRequest(BaseModel): + job: str = Field(..., min_length=1) + model: str | None = None + max_steps: int = Field(60, ge=1, le=400) + command_timeout: int = Field(45, ge=1, le=600) + type_interval: float = Field(0.02, ge=0.0, le=1.0) + click_pause: float = Field(0.10, ge=0.0, le=2.0) + disabled_tools: list[str] = Field(default_factory=list) + safety_override: bool = False + no_failsafe: bool = False + + +class _WebSocketHub: + def __init__(self) -> None: + self._connections: set[WebSocket] = set() + self._lock = asyncio.Lock() + self._loop: asyncio.AbstractEventLoop | None = None + + def set_loop(self, loop: asyncio.AbstractEventLoop) -> None: + self._loop = loop + + async def connect(self, websocket: WebSocket) -> None: + await websocket.accept() + async with self._lock: + self._connections.add(websocket) + + async def disconnect(self, websocket: WebSocket) -> None: + async with self._lock: + self._connections.discard(websocket) + + async def broadcast(self, message: dict[str, Any]) -> None: + async with self._lock: + clients = list(self._connections) + dead: list[WebSocket] = [] + for ws in clients: + try: + await ws.send_json(message) + except Exception: # noqa: BLE001 + dead.append(ws) + if dead: + async with self._lock: + for ws in dead: + self._connections.discard(ws) + + def broadcast_from_thread(self, message: dict[str, Any]) -> None: + if self._loop is None: + return + asyncio.run_coroutine_threadsafe(self.broadcast(message), self._loop) + + +def create_app(config: AppConfig | None = None) -> FastAPI: + app_config = config or load_app_config(cwd=Path.cwd()) + if not app_config.openai_api_key: + raise RuntimeError("OPENAI_API_KEY is required in environment or .env.") + if not app_config.screenjob_token: + raise RuntimeError("SCREENJOB_TOKEN is required in environment or .env.") + + app = FastAPI(title="ScreenJob API", version="1.0.0") + db = HistoryDB(app_config.db_path) + ws_hub = _WebSocketHub() + manager = JobManager(config=app_config, db=db, broadcast=ws_hub.broadcast_from_thread) + + app.state.config = app_config + app.state.db = db + app.state.ws_hub = ws_hub + app.state.manager = manager + + @app.on_event("startup") + async def _on_startup() -> None: + ws_hub.set_loop(asyncio.get_running_loop()) + + def _extract_token(authorization: str | None, x_screenjob_token: str | None) -> str: + if x_screenjob_token: + return x_screenjob_token.strip() + if authorization: + token = authorization.strip() + if token.lower().startswith("bearer "): + return token[7:].strip() + return token + return "" + + def require_token( + authorization: str | None = Header(default=None), + x_screenjob_token: str | None = Header(default=None), + ) -> None: + token = _extract_token(authorization, x_screenjob_token) + if not token or not secrets.compare_digest(token, app_config.screenjob_token): + raise HTTPException(status_code=401, detail="Unauthorized") + + @app.post("/api/jobs") + def create_job(payload: CreateJobRequest, _: None = Depends(require_token)) -> dict[str, str]: + job_id = manager.submit_job( + objective=payload.job, + model=payload.model, + max_steps=payload.max_steps, + command_timeout=payload.command_timeout, + type_interval=payload.type_interval, + click_pause=payload.click_pause, + disabled_tools=payload.disabled_tools, + safety_override=payload.safety_override, + no_failsafe=payload.no_failsafe, + ) + return {"job_id": job_id} + + @app.get("/api/jobs") + def list_jobs(limit: int = Query(default=100, ge=1, le=500), _: None = Depends(require_token)) -> dict[str, Any]: + return {"jobs": manager.list_jobs(limit=limit)} + + @app.get("/api/jobs/{job_id}") + def get_job(job_id: str, _: None = Depends(require_token)) -> dict[str, Any]: + job = manager.get_job(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Job not found") + return job + + @app.get("/api/jobs/{job_id}/events") + def get_job_events( + job_id: str, + limit: int = Query(default=500, ge=1, le=5000), + _: None = Depends(require_token), + ) -> dict[str, Any]: + job = manager.get_job(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Job not found") + return {"events": manager.get_events(job_id, limit=limit)} + + @app.post("/api/jobs/{job_id}/cancel") + def cancel_job(job_id: str, _: None = Depends(require_token)) -> dict[str, Any]: + job = manager.get_job(job_id) + if job is None: + raise HTTPException(status_code=404, detail="Job not found") + accepted = manager.cancel_job(job_id) + return {"job_id": job_id, "cancel_requested": bool(accepted)} + + @app.get("/api/stats") + def stats(_: None = Depends(require_token)) -> dict[str, Any]: + return manager.stats() + + if not app_config.disable_ui: + @app.get("/", response_class=HTMLResponse) + def ui_root() -> str: + return monitoring_page_html() + + @app.websocket("/ws") + async def ws_endpoint(websocket: WebSocket, token: str = Query(default="")) -> None: + if not token or not secrets.compare_digest(token, app_config.screenjob_token): + await websocket.close(code=1008) + return + await ws_hub.connect(websocket) + try: + await websocket.send_json({"event_type": "connected", "payload": {"ok": True}}) + while True: + await websocket.receive_text() + except WebSocketDisconnect: + await ws_hub.disconnect(websocket) + except Exception: + await ws_hub.disconnect(websocket) + else: + @app.get("/", response_class=JSONResponse) + def ui_disabled() -> dict[str, Any]: + return {"ok": True, "ui_disabled": True} + + return app + + +def main() -> None: + import uvicorn + + app = create_app(load_app_config(Path.cwd())) + config = app.state.config + uvicorn.run(app, host=config.host, port=config.port, log_level="info") diff --git a/src/storage.py b/src/storage.py new file mode 100644 index 0000000..c4ab525 --- /dev/null +++ b/src/storage.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +import json +import sqlite3 +import threading +from pathlib import Path +from typing import Any + + +class HistoryDB: + def __init__(self, db_path: Path) -> None: + self.db_path = db_path + self._lock = threading.Lock() + self._init_schema() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path, check_same_thread=False) + conn.row_factory = sqlite3.Row + return conn + + def _init_schema(self) -> None: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS jobs ( + job_id TEXT PRIMARY KEY, + objective TEXT NOT NULL, + model TEXT NOT NULL, + status TEXT NOT NULL, + created_at TEXT NOT NULL, + started_at TEXT, + ended_at TEXT, + result TEXT, + error TEXT, + steps INTEGER DEFAULT 0, + cancelled INTEGER DEFAULT 0, + safety_checked INTEGER DEFAULT 0, + safety_passed INTEGER DEFAULT 0, + safety_reason TEXT, + safety_override INTEGER DEFAULT 0, + disabled_tools_json TEXT NOT NULL, + artifacts_dir TEXT, + input_tokens INTEGER DEFAULT 0, + cached_input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + reasoning_tokens INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + estimated_cost_usd REAL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS job_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL, + ts TEXT NOT NULL, + step INTEGER DEFAULT 0, + event_type TEXT NOT NULL, + payload_json TEXT NOT NULL, + FOREIGN KEY(job_id) REFERENCES jobs(job_id) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_job_events_job_id_id ON job_events(job_id, id)" + ) + conn.commit() + + def create_job( + self, + *, + job_id: str, + objective: str, + model: str, + created_at: str, + safety_override: bool, + disabled_tools: list[str], + ) -> None: + with self._lock, self._connect() as conn: + conn.execute( + """ + INSERT INTO jobs ( + job_id, objective, model, status, created_at, + safety_override, disabled_tools_json + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + job_id, + objective, + model, + "queued", + created_at, + int(safety_override), + json.dumps(disabled_tools, ensure_ascii=False), + ), + ) + conn.commit() + + def update_job(self, job_id: str, **fields: Any) -> None: + if not fields: + return + keys = sorted(fields.keys()) + assignments = ", ".join([f"{k} = ?" for k in keys]) + values = [fields[k] for k in keys] + with self._lock, self._connect() as conn: + conn.execute(f"UPDATE jobs SET {assignments} WHERE job_id = ?", (*values, job_id)) + conn.commit() + + def add_event(self, *, job_id: str, ts: str, step: int, event_type: str, payload: dict[str, Any]) -> None: + with self._lock, self._connect() as conn: + conn.execute( + """ + INSERT INTO job_events (job_id, ts, step, event_type, payload_json) + VALUES (?, ?, ?, ?, ?) + """, + (job_id, ts, int(step), event_type, json.dumps(payload, ensure_ascii=False)), + ) + conn.commit() + + def get_job(self, job_id: str) -> dict[str, Any] | None: + with self._connect() as conn: + row = conn.execute("SELECT * FROM jobs WHERE job_id = ?", (job_id,)).fetchone() + return self._row_to_job(row) if row else None + + def list_jobs(self, limit: int = 100) -> list[dict[str, Any]]: + bounded = min(max(int(limit), 1), 1000) + with self._connect() as conn: + rows = conn.execute( + "SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?", + (bounded,), + ).fetchall() + return [self._row_to_job(row) for row in rows] + + def get_job_events(self, job_id: str, limit: int = 500) -> list[dict[str, Any]]: + bounded = min(max(int(limit), 1), 5000) + with self._connect() as conn: + rows = conn.execute( + """ + SELECT id, job_id, ts, step, event_type, payload_json + FROM job_events + WHERE job_id = ? + ORDER BY id ASC + LIMIT ? + """, + (job_id, bounded), + ).fetchall() + events: list[dict[str, Any]] = [] + for row in rows: + payload = {} + try: + payload = json.loads(row["payload_json"]) if row["payload_json"] else {} + except Exception: + payload = {"_raw": row["payload_json"]} + events.append( + { + "id": row["id"], + "job_id": row["job_id"], + "ts": row["ts"], + "step": row["step"], + "event_type": row["event_type"], + "payload": payload, + } + ) + return events + + def stats(self) -> dict[str, Any]: + with self._connect() as conn: + totals = conn.execute( + """ + SELECT + COUNT(*) AS total_jobs, + SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_jobs, + SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed_jobs, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed_jobs, + SUM(CASE WHEN cancelled = 1 THEN 1 ELSE 0 END) AS cancelled_jobs, + COALESCE(SUM(estimated_cost_usd), 0) AS total_estimated_cost + FROM jobs + """ + ).fetchone() + return dict(totals) if totals else {} + + def _row_to_job(self, row: sqlite3.Row) -> dict[str, Any]: + disabled_tools: list[str] = [] + try: + disabled_tools = json.loads(row["disabled_tools_json"]) if row["disabled_tools_json"] else [] + except Exception: + disabled_tools = [] + return { + "job_id": row["job_id"], + "objective": row["objective"], + "model": row["model"], + "status": row["status"], + "created_at": row["created_at"], + "started_at": row["started_at"], + "ended_at": row["ended_at"], + "result": row["result"], + "error": row["error"], + "steps": row["steps"], + "cancelled": bool(row["cancelled"]), + "safety_checked": bool(row["safety_checked"]), + "safety_passed": bool(row["safety_passed"]), + "safety_reason": row["safety_reason"], + "safety_override": bool(row["safety_override"]), + "disabled_tools": disabled_tools, + "artifacts_dir": row["artifacts_dir"], + "usage": { + "input_tokens": row["input_tokens"] or 0, + "cached_input_tokens": row["cached_input_tokens"] or 0, + "output_tokens": row["output_tokens"] or 0, + "reasoning_tokens": row["reasoning_tokens"] or 0, + "total_tokens": row["total_tokens"] or 0, + "estimated_cost_usd": row["estimated_cost_usd"], + }, + } + diff --git a/src/task_manager.py b/src/task_manager.py new file mode 100644 index 0000000..0db25ef --- /dev/null +++ b/src/task_manager.py @@ -0,0 +1,326 @@ +from __future__ import annotations + +import threading +import time +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Callable + +from .config import AppConfig +from .models import RuntimeOptions +from .runtime import create_openai_client, run_job +from .safety import assess_task_safety +from .storage import HistoryDB +from .utils import utc_now_iso + + +@dataclass +class _RunningJob: + thread: threading.Thread + cancel_event: threading.Event + started_at: str + objective: str + model: str + + +class JobManager: + def __init__( + self, + *, + config: AppConfig, + db: HistoryDB, + broadcast: Callable[[dict[str, Any]], None] | None = None, + ) -> None: + self.config = config + self.db = db + self.broadcast = broadcast + self._running: dict[str, _RunningJob] = {} + self._lock = threading.Lock() + + def submit_job( + self, + *, + objective: str, + model: str | None = None, + max_steps: int = 60, + command_timeout: int = 45, + type_interval: float = 0.02, + click_pause: float = 0.10, + disabled_tools: list[str] | None = None, + safety_override: bool = False, + no_failsafe: bool = False, + ) -> str: + job_id = f"job_{int(time.time())}_{uuid.uuid4().hex[:8]}" + created_at = utc_now_iso() + selected_model = (model or self.config.default_model).strip() or self.config.default_model + disabled = sorted({tool.strip() for tool in (disabled_tools or []) if tool.strip()}) + self.db.create_job( + job_id=job_id, + objective=objective, + model=selected_model, + created_at=created_at, + safety_override=safety_override, + disabled_tools=disabled, + ) + self._publish( + job_id, + { + "ts": created_at, + "step": 0, + "event_type": "job_queued", + "payload": { + "job_id": job_id, + "objective": objective, + "model": selected_model, + "disabled_tools": disabled, + "safety_override": bool(safety_override), + }, + }, + ) + + cancel_event = threading.Event() + thread = threading.Thread( + target=self._execute_job, + kwargs={ + "job_id": job_id, + "objective": objective, + "model": selected_model, + "disabled_tools": disabled, + "safety_override": safety_override, + "max_steps": max_steps, + "command_timeout": command_timeout, + "type_interval": type_interval, + "click_pause": click_pause, + "no_failsafe": no_failsafe, + "cancel_event": cancel_event, + }, + daemon=True, + ) + with self._lock: + self._running[job_id] = _RunningJob( + thread=thread, + cancel_event=cancel_event, + started_at=created_at, + objective=objective, + model=selected_model, + ) + thread.start() + return job_id + + def _execute_job( + self, + *, + job_id: str, + objective: str, + model: str, + disabled_tools: list[str], + safety_override: bool, + max_steps: int, + command_timeout: int, + type_interval: float, + click_pause: float, + no_failsafe: bool, + cancel_event: threading.Event, + ) -> None: + started_at = utc_now_iso() + self.db.update_job(job_id, status="running", started_at=started_at) + self._publish(job_id, {"ts": started_at, "step": 0, "event_type": "job_started", "payload": {"job_id": job_id}}) + + if not safety_override: + client = create_openai_client(self.config.openai_api_key) + safe, reason, raw = assess_task_safety( + client, + model=self.config.safety_model, + objective=objective, + disabled_tools=disabled_tools, + ) + self.db.update_job( + job_id, + safety_checked=1, + safety_passed=1 if safe else 0, + safety_reason=reason, + ) + self._publish( + job_id, + { + "ts": utc_now_iso(), + "step": 0, + "event_type": "safety_check", + "payload": {"safe": safe, "reason": reason, "raw": raw}, + }, + ) + if not safe: + ended_at = utc_now_iso() + error_text = f"Task blocked by safety gate: {reason}" + self.db.update_job( + job_id, + status="failed", + ended_at=ended_at, + error=error_text, + result=error_text, + ) + self._publish( + job_id, + { + "ts": ended_at, + "step": 0, + "event_type": "job_rejected", + "payload": {"error": error_text}, + }, + ) + with self._lock: + self._running.pop(job_id, None) + return + else: + self.db.update_job( + job_id, + safety_checked=1, + safety_passed=1, + safety_reason="Safety check bypassed by override.", + ) + self._publish( + job_id, + { + "ts": utc_now_iso(), + "step": 0, + "event_type": "safety_override", + "payload": {"enabled": True}, + }, + ) + + def on_event(event: dict[str, Any]) -> None: + self._publish(job_id, event) + if event.get("event_type") == "usage_update": + usage = (event.get("payload") or {}).get("usage") or {} + self.db.update_job( + job_id, + input_tokens=int(usage.get("input_tokens", 0) or 0), + cached_input_tokens=int(usage.get("cached_input_tokens", 0) or 0), + output_tokens=int(usage.get("output_tokens", 0) or 0), + reasoning_tokens=int(usage.get("reasoning_tokens", 0) or 0), + total_tokens=int(usage.get("total_tokens", 0) or 0), + estimated_cost_usd=usage.get("estimated_cost_usd"), + ) + + options = RuntimeOptions( + model=model, + max_steps=max_steps, + command_timeout=command_timeout, + type_interval=type_interval, + click_pause=click_pause, + disable_tools=set(disabled_tools), + ) + try: + result, artifacts = run_job( + api_key=self.config.openai_api_key, + objective=objective, + options=options, + runs_base=self.config.runs_dir, + no_failsafe=no_failsafe, + cancel_event=cancel_event, + event_callback=on_event, + ) + except Exception as exc: # noqa: BLE001 + ended_at = utc_now_iso() + err = f"Fatal runtime error: {type(exc).__name__}: {exc}" + self.db.update_job( + job_id, + status="failed", + ended_at=ended_at, + error=err, + result=err, + ) + self._publish(job_id, {"ts": ended_at, "step": 0, "event_type": "job_failed", "payload": {"error": err}}) + with self._lock: + self._running.pop(job_id, None) + return + + ended_at = utc_now_iso() + status = "completed" if result.completed else "failed" + if result.cancelled: + status = "cancelled" + self.db.update_job( + job_id, + status=status, + ended_at=ended_at, + result=result.result, + error=result.error, + steps=result.steps, + cancelled=1 if result.cancelled else 0, + artifacts_dir=str(Path(artifacts.root_dir).resolve()), + input_tokens=result.usage.input_tokens, + cached_input_tokens=result.usage.cached_input_tokens, + output_tokens=result.usage.output_tokens, + reasoning_tokens=result.usage.reasoning_tokens, + total_tokens=result.usage.total_tokens, + estimated_cost_usd=result.usage.estimated_cost_usd, + ) + self._publish( + job_id, + { + "ts": ended_at, + "step": result.steps, + "event_type": "job_finished", + "payload": { + "status": status, + "result": result.result, + "error": result.error, + "cancelled": result.cancelled, + "usage": result.usage.to_dict(), + }, + }, + ) + with self._lock: + self._running.pop(job_id, None) + + def _publish(self, job_id: str, event: dict[str, Any]) -> None: + ts = str(event.get("ts") or utc_now_iso()) + step = int(event.get("step", 0) or 0) + event_type = str(event.get("event_type", "event")) + payload = event.get("payload") or {} + self.db.add_event(job_id=job_id, ts=ts, step=step, event_type=event_type, payload=payload) + if self.broadcast is not None: + self.broadcast( + { + "job_id": job_id, + "ts": ts, + "step": step, + "event_type": event_type, + "payload": payload, + } + ) + + def cancel_job(self, job_id: str) -> bool: + with self._lock: + running = self._running.get(job_id) + if running is None: + job = self.db.get_job(job_id) + return bool(job and job.get("status") == "cancelled") + running.cancel_event.set() + self.db.update_job(job_id, status="cancelling") + self._publish(job_id, {"ts": utc_now_iso(), "step": 0, "event_type": "cancel_requested", "payload": {}}) + return True + + def get_job(self, job_id: str) -> dict[str, Any] | None: + job = self.db.get_job(job_id) + if job is None: + return None + live = self._running.get(job_id) + if live and job["status"] in {"queued", "running", "cancelling"}: + job["is_running_thread"] = live.thread.is_alive() + else: + job["is_running_thread"] = False + return job + + def list_jobs(self, limit: int = 100) -> list[dict[str, Any]]: + return self.db.list_jobs(limit=limit) + + def get_events(self, job_id: str, limit: int = 500) -> list[dict[str, Any]]: + return self.db.get_job_events(job_id, limit=limit) + + def stats(self) -> dict[str, Any]: + stats = self.db.stats() + with self._lock: + stats["live_running_threads"] = sum(1 for job in self._running.values() if job.thread.is_alive()) + return stats diff --git a/src/ui.py b/src/ui.py new file mode 100644 index 0000000..96a11c6 --- /dev/null +++ b/src/ui.py @@ -0,0 +1,193 @@ +from __future__ import annotations + + +def monitoring_page_html() -> str: + return """ + + + + + ScreenJob Monitor + + + +
+
+
+

ScreenJob Monitor

+

Read-only monitoring for active and historical tasks.

+
+
+ + +
+
+ +
+ +
+
+
+

Jobs

+ +
+
+
+ +
+

Job Detail

+

+        

Live Events

+
+
+
+
+ + + + +""" +