"""Tests for when ``JobManager._execute_job`` shows the desktop completion overlay."""

from __future__ import annotations

import threading
from pathlib import Path
from typing import Any

import src.task_manager as task_manager_module
from src.config import AppConfig
from src.models import AgentResult, RunArtifacts, UsageSummary
from src.storage import HistoryDB
from src.task_manager import JobManager

# Model name used for the config, the stored job rows and every executed job.
_TEST_MODEL = "gpt-5.4-mini"


class _OverlayRecorder:
    """Overlay stand-in that records the keyword arguments of each call."""

    def __init__(self) -> None:
        # One dict per ``show_completion`` call, in call order.
        self.calls: list[dict[str, Any]] = []

    def show_completion(self, **kwargs: Any) -> None:
        """Record ``kwargs`` instead of displaying anything."""
        self.calls.append(kwargs)


def _build_manager(
    tmp_path: Path, overlay_manager: _OverlayRecorder
) -> tuple[JobManager, HistoryDB, AppConfig]:
    """Create a ``JobManager`` whose runs directory and database live under ``tmp_path``.

    Returns:
        The manager, the ``HistoryDB`` it was given, and the ``AppConfig``
        used to build both.
    """
    config = AppConfig(
        openai_api_key="test-key",
        screenjob_token="test-token",
        disable_ui=False,
        default_model=_TEST_MODEL,
        safety_model=_TEST_MODEL,
        host="127.0.0.1",
        port=8787,
        runs_dir=tmp_path / "runs",
        db_path=tmp_path / "screenjob.db",
    )
    db = HistoryDB(config.db_path)
    manager = JobManager(config=config, db=db, overlay_manager=overlay_manager)
    return manager, db, config


def _artifacts(tmp_path: Path) -> RunArtifacts:
    """Build a ``RunArtifacts`` whose paths all sit under ``tmp_path / "run_artifacts"``."""
    root = tmp_path / "run_artifacts"
    return RunArtifacts(
        run_id="test_run",
        root_dir=root,
        logs_dir=root / "logs",
        shots_dir=root / "shots",
        enhance_dir=root / "enhanced",
        log_file=root / "logs" / "screenjob.log",
    )


def _create_job(db: HistoryDB, job_id: str, objective: str) -> None:
    """Insert a job row for ``job_id`` with fixed model, timestamp and flags."""
    db.create_job(
        job_id=job_id,
        objective=objective,
        model=_TEST_MODEL,
        created_at="2026-05-30T12:00:00+00:00",
        safety_override=True,
        disabled_tools=[],
    )


def _execute(
    manager: JobManager,
    job_id: str,
    objective: str,
    *,
    safety_override: bool = True,
) -> None:
    """Call ``manager._execute_job`` with the settings shared by every test.

    Only ``job_id``, ``objective`` and ``safety_override`` vary between tests;
    every other argument is identical, so it is kept here in one place. A fresh
    ``threading.Event`` is passed as ``cancel_event`` on each call.
    """
    manager._execute_job(
        job_id=job_id,
        objective=objective,
        model=_TEST_MODEL,
        disabled_tools=[],
        safety_override=safety_override,
        max_steps=60,
        command_timeout=45,
        type_interval=0.02,
        click_pause=0.10,
        reasoning_effort="medium",
        screen_context_decay_steps=4,
        max_visual_context_images=3,
        native_automation_mode="prefer",
        dialog_timeout_seconds=12.0,
        focus_timeout_seconds=8.0,
        ui_element_timeout_seconds=8.0,
        max_retries_per_surface=3,
        pretty_logs=False,
        no_failsafe=False,
        cancel_event=threading.Event(),
    )


def test_completed_job_triggers_desktop_overlay(tmp_path: Path, monkeypatch) -> None:
    """A completed run produces exactly one overlay call and a "completed" status."""
    overlay = _OverlayRecorder()
    manager, db, _config = _build_manager(tmp_path, overlay)
    job_id = "job_overlay_complete"
    objective = "Save todo-demo.txt in Documents"
    _create_job(db, job_id, objective)

    result = AgentResult(
        completed=True,
        result="Saved todo-demo.txt",
        return_message="Saved todo-demo.txt",
        data={"observed_result": "todo-demo.txt - Notepad is visible"},
        steps=11,
        started_at=100.0,
        ended_at=112.6,
        usage=UsageSummary(),
    )
    # Replace the real runner so no agent is started; it returns the canned result.
    monkeypatch.setattr(
        task_manager_module,
        "run_job",
        lambda **_kwargs: (result, _artifacts(tmp_path)),
    )

    _execute(manager, job_id, objective)

    assert overlay.calls == [
        {
            "job_id": job_id,
            "objective": objective,
            "return_message": "Saved todo-demo.txt",
            "steps": 11,
            # Exact binary floating-point value of 112.6 - 100.0; presumably
            # the manager derives it as ended_at - started_at (confirm there).
            "elapsed_seconds": 12.599999999999994,
        }
    ]
    assert db.get_job(job_id)["status"] == "completed"


def test_non_completed_jobs_do_not_trigger_desktop_overlay(tmp_path: Path, monkeypatch) -> None:
    """Neither a failed nor a cancelled run produces an overlay call."""
    overlay = _OverlayRecorder()
    manager, db, _config = _build_manager(tmp_path, overlay)

    # Failed run: not completed, carries an error, not flagged as cancelled.
    failed_job_id = "job_overlay_failed"
    _create_job(db, failed_job_id, "Fail intentionally")
    failed_result = AgentResult(
        completed=False,
        result="Failure",
        return_message="Failure",
        data=None,
        steps=7,
        started_at=10.0,
        ended_at=18.0,
        usage=UsageSummary(),
        error="Failure",
    )
    monkeypatch.setattr(
        task_manager_module,
        "run_job",
        lambda **_kwargs: (failed_result, _artifacts(tmp_path)),
    )
    _execute(manager, failed_job_id, "Fail intentionally")

    # Cancelled run: not completed and explicitly flagged as cancelled.
    cancelled_job_id = "job_overlay_cancelled"
    _create_job(db, cancelled_job_id, "Cancel intentionally")
    cancelled_result = AgentResult(
        completed=False,
        result="Cancelled",
        return_message="Cancelled",
        data=None,
        steps=4,
        started_at=20.0,
        ended_at=23.0,
        usage=UsageSummary(),
        error="Cancelled",
        cancelled=True,
    )
    monkeypatch.setattr(
        task_manager_module,
        "run_job",
        lambda **_kwargs: (cancelled_result, _artifacts(tmp_path)),
    )
    _execute(manager, cancelled_job_id, "Cancel intentionally")

    assert overlay.calls == []


def test_rejected_job_does_not_trigger_desktop_overlay(tmp_path: Path, monkeypatch) -> None:
    """A job blocked by the safety assessment shows no overlay and ends with "job_rejected"."""
    overlay = _OverlayRecorder()
    manager, db, _config = _build_manager(tmp_path, overlay)
    job_id = "job_overlay_rejected"
    _create_job(db, job_id, "Do something unsafe")

    # Stub the client factory and make the safety assessment report "blocked".
    monkeypatch.setattr(
        task_manager_module,
        "create_openai_client",
        lambda *_args, **_kwargs: object(),
    )
    monkeypatch.setattr(
        task_manager_module,
        "assess_task_safety",
        lambda *_args, **_kwargs: (False, "Unsafe request", {"decision": "blocked"}),
    )

    # The other tests pass safety_override=True; this one passes False.
    _execute(manager, job_id, "Do something unsafe", safety_override=False)

    assert overlay.calls == []
    events = db.get_job_events(job_id)
    assert events[-1]["event_type"] == "job_rejected"