Commit remaining workspace updates
Some checks failed
CI / test (push) Failing after 8s

This commit is contained in:
Space-Banane
2026-05-31 20:43:25 +02:00
parent 79c9e98842
commit 4123765aba
11 changed files with 4498 additions and 131 deletions

View File

@@ -6,8 +6,10 @@ ScreenJob lets an agent execute tasks that require a real desktop UI plus termin
## Main Features ## Main Features
- Hybrid control model: screenshot grounding plus Windows-native window/dialog/element helpers when available
- Screen perception (`see_screen`, `enhance`) - Screen perception (`see_screen`, `enhance`)
- Mouse/keyboard control (`click`, `type`, `press_key`) - Mouse/keyboard control (`click`, `type`, `press_key`)
- Native window/dialog control (`list_windows`, `find_window`, `focus_window`, `detect_dialog`, `dialog_action`, `dialog_set_filename`, `list_ui_elements`)
- Terminal execution (`execute_command`, `sleep`) - Terminal execution (`execute_command`, `sleep`)
- Structured completion payload (`task_complete(return=..., data=...)`) - Structured completion payload (`task_complete(return=..., data=...)`)
- Safety gate, auth, history, and live monitoring - Safety gate, auth, history, and live monitoring
@@ -45,6 +47,12 @@ Enhance-first click rule:
- Optional zoom control: set `scale` from `2` to `6` (defaults are tuned by region). - Optional zoom control: set `scale` from `2` to `6` (defaults are tuned by region).
- After checking the enhanced image, click using the same target coordinate (or a small directional offset if needed). - After checking the enhanced image, click using the same target coordinate (or a small directional offset if needed).
Windows-native routing rule:
- First classify whether the current surface is a normal app window, browser window, `#32770` dialog, Explorer file picker, or another system surface.
- Prefer native window/dialog/element tools for focus changes, save/open dialogs, modal confirmations, and exposed controls.
- Fall back to screenshots plus mouse/keyboard only when native automation is unavailable or the UI is custom-drawn.
Verification rule: Verification rule:
- Before `task_complete`, verify actual on-screen content matches the expected outcome. - Before `task_complete`, verify actual on-screen content matches the expected outcome.

File diff suppressed because it is too large Load Diff

View File

@@ -30,6 +30,7 @@ def main(argv: list[str] | None = None) -> int:
print(" OPENAI_API_KEY=...") print(" OPENAI_API_KEY=...")
print(" SCREENJOB_TOKEN=...") print(" SCREENJOB_TOKEN=...")
print(" DISABLE_UI=true|false (optional)") print(" DISABLE_UI=true|false (optional)")
print(" SCREENJOB_PROHIBITED_KEY_COMBOS=ctrl+shift+s,alt+f4 (optional)")
return 0 return 0
server.main() server.main()
return 0 return 0

View File

@@ -5,6 +5,7 @@ import json
import sys import sys
from pathlib import Path from pathlib import Path
from .agent import normalize_disabled_tools
from .config import load_app_config from .config import load_app_config
from .models import RuntimeOptions from .models import RuntimeOptions
from .runtime import create_openai_client, run_job from .runtime import create_openai_client, run_job
@@ -40,8 +41,55 @@ def build_parser() -> argparse.ArgumentParser:
default=4, default=4,
help="Compact model context every N steps to decay old screenshots (0 disables).", help="Compact model context every N steps to decay old screenshots (0 disables).",
) )
parser.add_argument(
"--max-visual-context-images",
type=int,
default=3,
help="Maximum screenshots/enhanced images retained in model-visible context during rebases.",
)
parser.add_argument(
"--native-automation-mode",
choices=["off", "prefer", "require_fallback"],
default="prefer",
help="How strongly the agent should prefer Windows-native automation helpers over pixel fallback.",
)
parser.add_argument(
"--dialog-timeout-seconds",
type=float,
default=12.0,
help="Timeout for dialog-oriented waits and retries.",
)
parser.add_argument(
"--focus-timeout-seconds",
type=float,
default=8.0,
help="Timeout for focus-change waits and verification.",
)
parser.add_argument(
"--ui-element-timeout-seconds",
type=float,
default=8.0,
help="Timeout for native UI element lookup waits.",
)
parser.add_argument(
"--max-retries-per-surface",
type=int,
default=3,
help="Maximum repeated retries on the same classified window/dialog surface before the agent must pivot.",
)
parser.add_argument(
"--pretty-logs",
action="store_true",
help="Emit expanded multi-line tool call/result logs for easier debugging.",
)
parser.add_argument("--disable-tool", action="append", default=[], help="Disable a tool by name.") parser.add_argument("--disable-tool", action="append", default=[], help="Disable a tool by name.")
parser.add_argument("--skip-safety-check", action="store_true", help="Bypass pre-flight safety check.") parser.add_argument(
"--skip-safety-check",
"--skip-safety-chec",
dest="skip_safety_check",
action="store_true",
help="Bypass pre-flight safety check.",
)
parser.add_argument("--no-failsafe", action="store_true", help="Disable PyAutoGUI fail-safe.") parser.add_argument("--no-failsafe", action="store_true", help="Disable PyAutoGUI fail-safe.")
return parser return parser
@@ -57,7 +105,10 @@ def main(argv: list[str] | None = None) -> int:
return 2 return 2
model = args.model or config.default_model model = args.model or config.default_model
disabled_tools = sorted({str(x).strip() for x in args.disable_tool if str(x).strip()}) try:
disabled_tools = normalize_disabled_tools(args.disable_tool)
except ValueError as exc:
parser.error(str(exc))
if not args.skip_safety_check: if not args.skip_safety_check:
safety_client = create_openai_client(config.openai_api_key) safety_client = create_openai_client(config.openai_api_key)
@@ -92,7 +143,15 @@ def main(argv: list[str] | None = None) -> int:
click_pause=args.click_pause, click_pause=args.click_pause,
reasoning_effort=args.reasoning_effort, reasoning_effort=args.reasoning_effort,
screen_context_decay_steps=max(0, int(args.screen_context_decay_steps)), screen_context_decay_steps=max(0, int(args.screen_context_decay_steps)),
max_visual_context_images=max(0, int(args.max_visual_context_images)),
native_automation_mode=args.native_automation_mode,
dialog_timeout_seconds=max(0.5, float(args.dialog_timeout_seconds)),
focus_timeout_seconds=max(0.5, float(args.focus_timeout_seconds)),
ui_element_timeout_seconds=max(0.5, float(args.ui_element_timeout_seconds)),
max_retries_per_surface=max(1, int(args.max_retries_per_surface)),
pretty_logs=bool(args.pretty_logs),
disable_tools=set(disabled_tools), disable_tools=set(disabled_tools),
prohibited_key_combos=set(config.prohibited_key_combos),
) )
try: try:
result, artifacts = run_job( result, artifacts = run_job(

View File

@@ -14,6 +14,13 @@ def _env_bool(name: str, default: bool = False) -> bool:
return raw.strip().lower() in {"1", "true", "yes", "on"} return raw.strip().lower() in {"1", "true", "yes", "on"}
def _env_csv(name: str) -> list[str]:
    """Read environment variable *name* as a comma-separated list of strings.

    Returns an empty list when the variable is not set. Every item is stripped
    of surrounding whitespace, and items that are empty after stripping are
    dropped.
    """
    value = os.getenv(name)
    if value is None:
        return []
    entries: list[str] = []
    for piece in value.split(","):
        cleaned = piece.strip()
        if cleaned:
            entries.append(cleaned)
    return entries
@dataclass(frozen=True) @dataclass(frozen=True)
class AppConfig: class AppConfig:
openai_api_key: str openai_api_key: str
@@ -25,6 +32,7 @@ class AppConfig:
port: int port: int
runs_dir: Path runs_dir: Path
db_path: Path db_path: Path
prohibited_key_combos: tuple[str, ...] = ()
def load_app_config(cwd: Path) -> AppConfig: def load_app_config(cwd: Path) -> AppConfig:
@@ -38,6 +46,7 @@ def load_app_config(cwd: Path) -> AppConfig:
runs_dir = cwd / "screenjob_runs" runs_dir = cwd / "screenjob_runs"
db_path = cwd / "screenjob.db" db_path = cwd / "screenjob.db"
disable_ui = _env_bool("DISABLE_UI", default=False) disable_ui = _env_bool("DISABLE_UI", default=False)
prohibited_key_combos = tuple(_env_csv("SCREENJOB_PROHIBITED_KEY_COMBOS"))
return AppConfig( return AppConfig(
openai_api_key=openai_api_key, openai_api_key=openai_api_key,
screenjob_token=screenjob_token, screenjob_token=screenjob_token,
@@ -48,5 +57,5 @@ def load_app_config(cwd: Path) -> AppConfig:
port=port, port=port,
runs_dir=runs_dir, runs_dir=runs_dir,
db_path=db_path, db_path=db_path,
prohibited_key_combos=prohibited_key_combos,
) )

View File

@@ -60,4 +60,12 @@ class RuntimeOptions:
click_pause: float = 0.10 click_pause: float = 0.10
reasoning_effort: str = "medium" reasoning_effort: str = "medium"
screen_context_decay_steps: int = 4 screen_context_decay_steps: int = 4
max_visual_context_images: int = 3
native_automation_mode: str = "prefer"
dialog_timeout_seconds: float = 12.0
focus_timeout_seconds: float = 8.0
ui_element_timeout_seconds: float = 8.0
max_retries_per_surface: int = 3
pretty_logs: bool = False
disable_tools: set[str] | None = None disable_tools: set[str] | None = None
prohibited_key_combos: set[str] | None = None

View File

@@ -12,6 +12,7 @@ from fastapi.responses import FileResponse
from fastapi.responses import HTMLResponse, JSONResponse from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from .agent import normalize_disabled_tools
from .config import AppConfig, load_app_config from .config import AppConfig, load_app_config
from .storage import HistoryDB from .storage import HistoryDB
from .task_manager import JobManager from .task_manager import JobManager
@@ -28,6 +29,13 @@ class CreateJobRequest(BaseModel):
click_pause: float = Field(0.10, ge=0.0, le=2.0) click_pause: float = Field(0.10, ge=0.0, le=2.0)
reasoning_effort: str = Field("medium", pattern="^(low|medium|high)$") reasoning_effort: str = Field("medium", pattern="^(low|medium|high)$")
screen_context_decay_steps: int = Field(4, ge=0, le=50) screen_context_decay_steps: int = Field(4, ge=0, le=50)
max_visual_context_images: int = Field(3, ge=0, le=12)
native_automation_mode: str = Field("prefer", pattern="^(off|prefer|require_fallback)$")
dialog_timeout_seconds: float = Field(12.0, ge=0.5, le=120.0)
focus_timeout_seconds: float = Field(8.0, ge=0.5, le=120.0)
ui_element_timeout_seconds: float = Field(8.0, ge=0.5, le=120.0)
max_retries_per_surface: int = Field(3, ge=1, le=10)
pretty_logs: bool = False
disabled_tools: list[str] = Field(default_factory=list) disabled_tools: list[str] = Field(default_factory=list)
safety_override: bool = False safety_override: bool = False
no_failsafe: bool = False no_failsafe: bool = False
@@ -297,6 +305,8 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
@app.post("/api/jobs") @app.post("/api/jobs")
def create_job(payload: CreateJobRequest, _: None = Depends(require_token)) -> dict[str, str]: def create_job(payload: CreateJobRequest, _: None = Depends(require_token)) -> dict[str, str]:
try:
disabled_tools = normalize_disabled_tools(payload.disabled_tools)
job_id = manager.submit_job( job_id = manager.submit_job(
objective=payload.job, objective=payload.job,
model=payload.model, model=payload.model,
@@ -306,10 +316,19 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
click_pause=payload.click_pause, click_pause=payload.click_pause,
reasoning_effort=payload.reasoning_effort, reasoning_effort=payload.reasoning_effort,
screen_context_decay_steps=payload.screen_context_decay_steps, screen_context_decay_steps=payload.screen_context_decay_steps,
disabled_tools=payload.disabled_tools, max_visual_context_images=payload.max_visual_context_images,
native_automation_mode=payload.native_automation_mode,
dialog_timeout_seconds=payload.dialog_timeout_seconds,
focus_timeout_seconds=payload.focus_timeout_seconds,
ui_element_timeout_seconds=payload.ui_element_timeout_seconds,
max_retries_per_surface=payload.max_retries_per_surface,
pretty_logs=payload.pretty_logs,
disabled_tools=disabled_tools,
safety_override=payload.safety_override, safety_override=payload.safety_override,
no_failsafe=payload.no_failsafe, no_failsafe=payload.no_failsafe,
) )
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
return {"job_id": job_id} return {"job_id": job_id}
@app.get("/api/jobs") @app.get("/api/jobs")

View File

@@ -8,7 +8,9 @@ from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from .agent import normalize_disabled_tools
from .config import AppConfig from .config import AppConfig
from .desktop_overlay import DesktopOverlayManager, get_desktop_overlay_manager
from .models import RuntimeOptions from .models import RuntimeOptions
from .runtime import create_openai_client, run_job from .runtime import create_openai_client, run_job
from .safety import assess_task_safety from .safety import assess_task_safety
@@ -32,10 +34,12 @@ class JobManager:
config: AppConfig, config: AppConfig,
db: HistoryDB, db: HistoryDB,
broadcast: Callable[[dict[str, Any]], None] | None = None, broadcast: Callable[[dict[str, Any]], None] | None = None,
overlay_manager: DesktopOverlayManager | None = None,
) -> None: ) -> None:
self.config = config self.config = config
self.db = db self.db = db
self.broadcast = broadcast self.broadcast = broadcast
self.overlay_manager = overlay_manager or get_desktop_overlay_manager()
self._running: dict[str, _RunningJob] = {} self._running: dict[str, _RunningJob] = {}
self._lock = threading.Lock() self._lock = threading.Lock()
@@ -50,6 +54,13 @@ class JobManager:
click_pause: float = 0.10, click_pause: float = 0.10,
reasoning_effort: str = "medium", reasoning_effort: str = "medium",
screen_context_decay_steps: int = 4, screen_context_decay_steps: int = 4,
max_visual_context_images: int = 3,
native_automation_mode: str = "prefer",
dialog_timeout_seconds: float = 12.0,
focus_timeout_seconds: float = 8.0,
ui_element_timeout_seconds: float = 8.0,
max_retries_per_surface: int = 3,
pretty_logs: bool = False,
disabled_tools: list[str] | None = None, disabled_tools: list[str] | None = None,
safety_override: bool = False, safety_override: bool = False,
no_failsafe: bool = False, no_failsafe: bool = False,
@@ -57,7 +68,7 @@ class JobManager:
job_id = f"job_{int(time.time())}_{uuid.uuid4().hex[:8]}" job_id = f"job_{int(time.time())}_{uuid.uuid4().hex[:8]}"
created_at = utc_now_iso() created_at = utc_now_iso()
selected_model = (model or self.config.default_model).strip() or self.config.default_model selected_model = (model or self.config.default_model).strip() or self.config.default_model
disabled = sorted({tool.strip() for tool in (disabled_tools or []) if tool.strip()}) disabled = normalize_disabled_tools(disabled_tools)
self.db.create_job( self.db.create_job(
job_id=job_id, job_id=job_id,
objective=objective, objective=objective,
@@ -97,6 +108,13 @@ class JobManager:
"click_pause": click_pause, "click_pause": click_pause,
"reasoning_effort": reasoning_effort, "reasoning_effort": reasoning_effort,
"screen_context_decay_steps": screen_context_decay_steps, "screen_context_decay_steps": screen_context_decay_steps,
"max_visual_context_images": max_visual_context_images,
"native_automation_mode": native_automation_mode,
"dialog_timeout_seconds": dialog_timeout_seconds,
"focus_timeout_seconds": focus_timeout_seconds,
"ui_element_timeout_seconds": ui_element_timeout_seconds,
"max_retries_per_surface": max_retries_per_surface,
"pretty_logs": pretty_logs,
"no_failsafe": no_failsafe, "no_failsafe": no_failsafe,
"cancel_event": cancel_event, "cancel_event": cancel_event,
}, },
@@ -127,6 +145,13 @@ class JobManager:
click_pause: float, click_pause: float,
reasoning_effort: str, reasoning_effort: str,
screen_context_decay_steps: int, screen_context_decay_steps: int,
max_visual_context_images: int,
native_automation_mode: str,
dialog_timeout_seconds: float,
focus_timeout_seconds: float,
ui_element_timeout_seconds: float,
max_retries_per_surface: int,
pretty_logs: bool,
no_failsafe: bool, no_failsafe: bool,
cancel_event: threading.Event, cancel_event: threading.Event,
) -> None: ) -> None:
@@ -226,7 +251,15 @@ class JobManager:
click_pause=click_pause, click_pause=click_pause,
reasoning_effort=reasoning_effort, reasoning_effort=reasoning_effort,
screen_context_decay_steps=max(0, int(screen_context_decay_steps)), screen_context_decay_steps=max(0, int(screen_context_decay_steps)),
max_visual_context_images=max(0, int(max_visual_context_images)),
native_automation_mode=str(native_automation_mode or "prefer").strip().lower() or "prefer",
dialog_timeout_seconds=max(0.5, float(dialog_timeout_seconds)),
focus_timeout_seconds=max(0.5, float(focus_timeout_seconds)),
ui_element_timeout_seconds=max(0.5, float(ui_element_timeout_seconds)),
max_retries_per_surface=max(1, int(max_retries_per_surface)),
pretty_logs=bool(pretty_logs),
disable_tools=set(disabled_tools), disable_tools=set(disabled_tools),
prohibited_key_combos=set(self.config.prohibited_key_combos),
) )
try: try:
result, artifacts = run_job( result, artifacts = run_job(
@@ -297,6 +330,14 @@ class JobManager:
}, },
}, },
) )
if status == "completed":
self.overlay_manager.show_completion(
job_id=job_id,
objective=objective,
return_message=result.return_message,
steps=result.steps,
elapsed_seconds=max(0.0, float(result.ended_at - result.started_at)),
)
with self._lock: with self._lock:
self._running.pop(job_id, None) self._running.pop(job_id, None)

View File

@@ -1,8 +1,11 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Any
import pytest
from PIL import Image from PIL import Image
import src.agent as agent_module import src.agent as agent_module
@@ -15,8 +18,12 @@ class _DummyPyAutoGUI:
def __init__(self) -> None: def __init__(self) -> None:
self.last_move_to: tuple[int, int] | None = None self.last_move_to: tuple[int, int] | None = None
self.last_click: tuple[int, int] | None = None self.last_move_duration: float | None = None
self.last_click: dict[str, object] | None = None
self.last_hotkey: tuple[str, ...] | None = None self.last_hotkey: tuple[str, ...] | None = None
self.last_drag_to: dict[str, object] | None = None
self.last_scroll: int | None = None
self.current_position: tuple[int, int] = (640, 360)
def screenshot(self) -> Image.Image: def screenshot(self) -> Image.Image:
return Image.new("RGB", (1280, 720), color=(24, 24, 24)) return Image.new("RGB", (1280, 720), color=(24, 24, 24))
@@ -26,9 +33,26 @@ class _DummyPyAutoGUI:
def moveTo(self, x: int, y: int, duration: float = 0.0) -> None: # noqa: N802 def moveTo(self, x: int, y: int, duration: float = 0.0) -> None: # noqa: N802
self.last_move_to = (x, y) self.last_move_to = (x, y)
self.last_move_duration = duration
self.current_position = (x, y)
def click(self, x: int, y: int) -> None: def click(
self.last_click = (x, y) self,
x: int,
y: int,
clicks: int = 1,
interval: float = 0.0,
button: str = "left",
) -> None:
self.last_click = {"x": x, "y": y, "clicks": clicks, "interval": interval, "button": button}
self.current_position = (x, y)
def dragTo(self, x: int, y: int, duration: float = 0.0, button: str = "left") -> None: # noqa: N802
self.last_drag_to = {"x": x, "y": y, "duration": duration, "button": button}
self.current_position = (x, y)
def scroll(self, amount: int) -> None:
self.last_scroll = amount
def write(self, _: str, interval: float = 0.0) -> None: def write(self, _: str, interval: float = 0.0) -> None:
return None return None
@@ -39,6 +63,10 @@ class _DummyPyAutoGUI:
def hotkey(self, *keys: str) -> None: def hotkey(self, *keys: str) -> None:
self.last_hotkey = tuple(keys) self.last_hotkey = tuple(keys)
def position(self):
x, y = self.current_position
return type("Point", (), {"x": x, "y": y})()
def _build_agent(tmp_path: Path, monkeypatch) -> agent_module.ScreenJobAgent: def _build_agent(tmp_path: Path, monkeypatch) -> agent_module.ScreenJobAgent:
dummy_gui = _DummyPyAutoGUI() dummy_gui = _DummyPyAutoGUI()
@@ -84,11 +112,158 @@ def test_click_supports_directional_offsets(tmp_path: Path, monkeypatch) -> None
"offset_up": "2px", "offset_up": "2px",
"offset_right": 7, "offset_right": 7,
"offset": {"x": 3, "y": 4}, "offset": {"x": 3, "y": 4},
"button": "right",
"click_count": 2,
"interval_seconds": "0.5s",
"duration_seconds": "0.2s",
"sleep_after_seconds": 0, "sleep_after_seconds": 0,
} }
) )
assert click_result["ok"] is True assert click_result["ok"] is True
assert click_result["clicked"] == {"x": 110, "y": 102} assert click_result["clicked"] == {"x": 110, "y": 102}
assert click_result["button"] == "right"
assert click_result["click_count"] == 2
assert click_result["interval_seconds"] == 0.5
assert click_result["duration_seconds"] == 0.2
assert agent_module.pyautogui.last_click == {
"x": 110,
"y": 102,
"clicks": 2,
"interval": 0.5,
"button": "right",
}
def test_scroll_supports_direction_and_amount(tmp_path: Path, monkeypatch) -> None:
    """A "down" scroll of 8 is reported and issued to pyautogui as -8."""
    agent = _build_agent(tmp_path, monkeypatch)
    scroll_request = {
        "amount": 8,
        "direction": "down",
        "coordinate": {"x": 1400, "y": -5},
        "sleep_after_seconds": 0,
    }

    outcome = agent._tool_scroll(scroll_request)

    assert outcome["ok"] is True
    assert outcome["amount"] == -8
    assert outcome["direction"] == "down"
    # The requested (1400, -5) target is expected to come back as (1279, 0).
    assert outcome["moved_to"] == {"x": 1279, "y": 0}
    assert agent_module.pyautogui.last_scroll == -8
def test_drag_translates_coordinates_and_button(tmp_path: Path, monkeypatch) -> None:
    """Drag reports adjusted endpoints and forwards button/duration to pyautogui."""
    agent = _build_agent(tmp_path, monkeypatch)
    drag_request = {
        "start_coordinate": {"x": -10, "y": 100},
        "end_coordinate": {"x": 1285, "y": 800},
        "button": "middle",
        "duration_seconds": "0.3s",
        "sleep_after_seconds": 0,
    }
    expected_drag_call = {
        "x": 1279,
        "y": 719,
        "duration": 0.3,
        "button": "middle",
    }

    outcome = agent._tool_drag(drag_request)

    assert outcome["ok"] is True
    assert outcome["from"] == {"x": 0, "y": 100}
    assert outcome["to"] == {"x": 1279, "y": 719}
    assert outcome["button"] == "middle"
    # The "0.3s" string form is expected to be reported as the float 0.3.
    assert outcome["duration_seconds"] == 0.3
    assert agent_module.pyautogui.last_drag_to == expected_drag_call
def test_move_mouse_clamps_target_coordinate(tmp_path: Path, monkeypatch) -> None:
    """An out-of-range move target (1500, -5) is reported and applied as (1279, 0)."""
    agent = _build_agent(tmp_path, monkeypatch)
    move_request = {"coordinate": {"x": 1500, "y": -5}, "duration_seconds": "0.4s"}

    outcome = agent._tool_move_mouse(move_request)

    assert outcome["ok"] is True
    assert outcome["moved_to"] == {"x": 1279, "y": 0}
    assert outcome["duration_seconds"] == 0.4
    assert agent_module.pyautogui.last_move_to == (1279, 0)
def test_clipboard_get_and_set_round_trip(tmp_path: Path, monkeypatch) -> None:
    """Text stored through clipboard_set is returned by clipboard_get with metadata."""
    agent = _build_agent(tmp_path, monkeypatch)
    # In-memory stand-in for the clipboard, shared by the three stubs below.
    state = {"text": ""}

    def fake_set_text(text):
        state["text"] = text

    def fake_get_text():
        return state["text"]

    def fake_get_metadata():
        return {
            "has_text": bool(state["text"]),
            "has_image": True,
            "available_formats": ["CF_UNICODETEXT", "CF_DIB"],
        }

    monkeypatch.setattr(agent, "_clipboard_set_text", fake_set_text)
    monkeypatch.setattr(agent, "_clipboard_get_text", fake_get_text)
    monkeypatch.setattr(agent, "_clipboard_get_metadata", fake_get_metadata)

    written = agent._tool_clipboard_set({"text": "hello clipboard"})
    read_back = agent._tool_clipboard_get({})

    assert written["ok"] is True
    assert written["length"] == 15
    assert read_back["ok"] is True
    assert read_back["text"] == "hello clipboard"
    assert read_back["length"] == 15
    assert read_back["has_text"] is True
    assert read_back["has_image"] is True
    assert read_back["available_formats"] == ["CF_UNICODETEXT", "CF_DIB"]
def test_clipboard_set_falls_back_to_powershell_when_native_path_fails(tmp_path: Path, monkeypatch) -> None:
    """When the native clipboard setter raises OSError, the shell fallback stores the text."""
    agent = _build_agent(tmp_path, monkeypatch)
    captured = {"text": ""}

    def fail_native(_: str) -> None:
        # Simulated native-path failure.
        raise OSError("[WinError 6] The handle is invalid.")

    def shell_fallback(text: str) -> None:
        captured["text"] = text

    monkeypatch.setattr(agent, "_clipboard_set_text", fail_native)
    monkeypatch.setattr(agent, "_clipboard_set_text_via_shell", shell_fallback)

    outcome = agent._tool_clipboard_set({"text": "Example Domain"})

    assert outcome["ok"] is True
    assert outcome["used_shell_fallback"] is True
    assert "WinError 6" in outcome["native_error"]
    assert captured["text"] == "Example Domain"
def test_get_cursor_position_returns_current_mouse_location(tmp_path: Path, monkeypatch) -> None:
    """get_cursor_position reports the position exposed by the pyautogui stand-in."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent_module.pyautogui.current_position = (321, 654)

    outcome = agent._tool_get_cursor_position({})

    assert outcome["ok"] is True
    assert outcome["position"] == {"x": 321, "y": 654}
def test_get_active_window_returns_metadata_shape(tmp_path: Path, monkeypatch) -> None:
    """get_active_window exposes the stubbed window info under the "window" key."""
    agent = _build_agent(tmp_path, monkeypatch)

    def fake_active_window_info():
        return {
            "available": True,
            "hwnd": 1234,
            "title": "Settings",
            "class_name": "ApplicationFrameWindow",
            "thread_id": 44,
            "process_id": 77,
            "is_visible": True,
            "rect": {
                "left": 10,
                "top": 20,
                "right": 410,
                "bottom": 320,
                "width": 400,
                "height": 300,
            },
        }

    monkeypatch.setattr(agent, "_get_active_window_info", fake_active_window_info)

    outcome = agent._tool_get_active_window({})

    assert outcome["ok"] is True
    assert outcome["window"]["title"] == "Settings"
    assert outcome["window"]["rect"]["width"] == 400
def test_enhance_defaults_to_small_ui_preset(tmp_path: Path, monkeypatch) -> None: def test_enhance_defaults_to_small_ui_preset(tmp_path: Path, monkeypatch) -> None:
@@ -135,6 +310,32 @@ def test_press_key_supports_hotkey_combo(tmp_path: Path, monkeypatch) -> None:
assert agent_module.pyautogui.last_hotkey == ("win", "r") assert agent_module.pyautogui.last_hotkey == ("win", "r")
def test_press_key_blocks_prohibited_combo(tmp_path: Path, monkeypatch) -> None:
    """A key combo listed as prohibited is rejected with a blocked result and a hint."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.prohibited_key_combos = {"ctrl+shift+s"}
    # Re-derive the agent's normalized set from the options that were just changed.
    agent.prohibited_key_combos = agent._normalize_prohibited_key_combos(
        agent.options.prohibited_key_combos
    )

    outcome = agent._tool_press_key({"key": "ctrl+shift+s"})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert outcome["key"] == "ctrl+shift+s"
    assert "prohibited by runtime configuration" in outcome["error"]
    assert "another allowed route" in outcome["hint"]
def test_press_key_blocks_prohibited_combo_after_alias_normalization(tmp_path: Path, monkeypatch) -> None:
    """Prohibiting "meta+r" also blocks a "win+r" key press."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.prohibited_key_combos = {"meta+r"}
    # Re-derive the agent's normalized set from the options that were just changed.
    agent.prohibited_key_combos = agent._normalize_prohibited_key_combos(
        agent.options.prohibited_key_combos
    )

    outcome = agent._tool_press_key({"key": "win+r"})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert outcome["key"] == "win+r"
def test_context_compaction_trigger_and_payload(tmp_path: Path, monkeypatch) -> None: def test_context_compaction_trigger_and_payload(tmp_path: Path, monkeypatch) -> None:
agent = _build_agent(tmp_path, monkeypatch) agent = _build_agent(tmp_path, monkeypatch)
agent.objective = "Open settings app" agent.objective = "Open settings app"
@@ -147,7 +348,596 @@ def test_context_compaction_trigger_and_payload(tmp_path: Path, monkeypatch) ->
agent.last_screen_meta = {"width": 1280, "height": 720, "path": "C:/tmp/frame.png"} agent.last_screen_meta = {"width": 1280, "height": 720, "path": "C:/tmp/frame.png"}
assert agent._should_compact_context() is True assert agent._should_compact_context() is True
compacted = agent._build_compacted_pending_input() visual_message = agent._build_visual_message("Current screen", "data:image/png;base64,abc", agent.last_screen_meta)
agent._register_visual_context_message(visual_message, agent.last_screen_meta, tool_name="see_screen")
compacted = agent._build_compacted_pending_input("decay")
assert len(compacted) == 2 assert len(compacted) == 2
assert "Context compaction activated" in compacted[0]["content"][0]["text"] assert "Context compaction activated due to stale context decay." in compacted[0]["content"][0]["text"]
assert "Open settings app" in compacted[0]["content"][0]["text"] assert "Open settings app" in compacted[0]["content"][0]["text"]
assert "Treat prior reasoning as stale" in compacted[0]["content"][0]["text"]
assert "Retained visual observations:" in compacted[0]["content"][0]["text"]
assert "do not call see_screen again only because compaction happened" in compacted[0]["content"][0]["text"]
assert "observe -> decide -> act -> verify" in compacted[0]["content"][0]["text"]
def test_context_compaction_drops_function_call_outputs_from_rebased_input(tmp_path: Path, monkeypatch) -> None:
    """Carry-over function_call_output items are excluded from the compacted input."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open settings app"
    frame_meta = {"path": "C:/tmp/frame.png"}
    frame_message = agent._build_visual_message("Current screen", "data:image/png;base64,abc", frame_meta)
    agent._register_visual_context_message(frame_message, frame_meta, tool_name="see_screen")

    tool_output_item = {"type": "function_call_output", "call_id": "call_123", "output": "{\"ok\": true}"}
    user_hint_item = {"role": "user", "content": [{"type": "input_text", "text": "blocked hint"}]}

    rebased = agent._build_compacted_pending_input(
        "decay",
        carryover_items=[tool_output_item, user_hint_item],
    )

    assert len(rebased) == 3
    # The user hint is expected at index 1 of the rebased input.
    assert rebased[1]["role"] == "user"
    assert rebased[1]["content"][0]["text"] == "blocked hint"
    assert all(entry.get("type") != "function_call_output" for entry in rebased)
def test_visual_context_budget_keeps_only_latest_three_images(tmp_path: Path, monkeypatch) -> None:
    """With a budget of three, four registered frames leave the three latest captures."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.max_visual_context_images = 3
    # Registration order deliberately differs from capture-time order.
    capture_stamps = [
        "2026-05-30T10:00:03+00:00",
        "2026-05-30T10:00:01+00:00",
        "2026-05-30T10:00:04+00:00",
        "2026-05-30T10:00:02+00:00",
    ]

    for frame_no, stamp in enumerate(capture_stamps):
        frame_meta = {"path": f"C:/tmp/frame_{frame_no}.png", "captured_at": stamp}
        frame_message = agent._build_visual_message(
            f"frame {frame_no}", f"data:image/png;base64,{frame_no}", frame_meta
        )
        agent._register_visual_context_message(frame_message, frame_meta, tool_name="see_screen")

    retained_paths = [entry["meta"]["path"] for entry in agent.visual_context_messages]

    assert agent.visual_context_overflow_pending is True
    assert retained_paths == [
        "C:/tmp/frame_3.png",
        "C:/tmp/frame_0.png",
        "C:/tmp/frame_2.png",
    ]
def test_compacted_input_uses_latest_visuals_by_capture_time(tmp_path: Path, monkeypatch) -> None:
    """The compacted input carries the three latest frames, ordered by capture time."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.max_visual_context_images = 3
    agent.objective = "Verify the current app window"
    capture_stamps = [
        "2026-05-30T10:00:04+00:00",
        "2026-05-30T10:00:01+00:00",
        "2026-05-30T10:00:03+00:00",
        "2026-05-30T10:00:02+00:00",
    ]

    for frame_no, stamp in enumerate(capture_stamps):
        frame_meta = {"path": f"C:/tmp/frame_{frame_no}.png", "captured_at": stamp}
        frame_message = agent._build_visual_message(
            f"frame {frame_no}", f"data:image/png;base64,{frame_no}", frame_meta
        )
        agent._register_visual_context_message(frame_message, frame_meta, tool_name="see_screen")

    def carries_image(item) -> bool:
        # An item counts as visual when its content list has an input_image part.
        if not isinstance(item.get("content"), list):
            return False
        return any(part.get("type") == "input_image" for part in item["content"] if isinstance(part, dict))

    def metadata_path(message) -> str:
        # The first content part embeds the frame metadata as JSON after "Metadata: ".
        header_text = message["content"][0]["text"]
        metadata_json = header_text.split("Metadata: ", 1)[1].split("\n", 1)[0]
        return json.loads(metadata_json)["path"]

    compacted = agent._build_compacted_pending_input("visual_budget")
    visual_messages = [item for item in compacted if carries_image(item)]

    assert len(visual_messages) == 3
    assert [metadata_path(message) for message in visual_messages] == [
        "C:/tmp/frame_3.png",
        "C:/tmp/frame_2.png",
        "C:/tmp/frame_0.png",
    ]
def test_context_compaction_event_includes_visual_budget_reason_and_paths(tmp_path: Path, monkeypatch) -> None:
    """The context_compacted event carries the rebuild reason and retained image paths."""
    events: list[dict[str, object]] = []
    agent = _build_agent(tmp_path, monkeypatch)
    # Collect every emitted event so the last one can be inspected.
    agent.event_callback = events.append
    agent.step = 5
    agent.recent_tool_summaries = ["step=4 tool=enhance status=ok"]
    retained_paths = ["C:/tmp/1.png", "C:/tmp/2.png", "C:/tmp/3.png"]
    agent.visual_context_messages = [
        {"message": {"role": "user", "content": []}, "meta": {"path": image_path}}
        for image_path in retained_paths
    ]

    agent._emit_context_compacted("visual_budget")

    latest_event = events[-1]
    assert latest_event["event_type"] == "context_compacted"
    payload = latest_event["payload"]
    assert payload["rebuild_reason"] == "visual_budget"
    assert payload["visual_context_paths"] == ["C:/tmp/1.png", "C:/tmp/2.png", "C:/tmp/3.png"]
def test_observation_loop_blocks_repeated_broad_reobservation(tmp_path: Path, monkeypatch) -> None:
    """Check that ``see_screen`` is refused after three identical observation-only steps.

    The step history is seeded with three consecutive steps that used the same
    observation tools against the same "Save as" dialog signature; the next
    ``see_screen`` dispatch must come back blocked with the
    ``observation_loop`` reason and the window summary attached.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.step_history = [
        {
            "step": step_number,
            "tool_names": ["get_active_window", "see_screen"],
            "window_signature": "123|#32770|Save as",
            "window_summary": "Save as [#32770]",
            "had_visual": True,
        }
        for step_number in (21, 22, 23)
    ]

    outcome = agent._dispatch_tool("see_screen", {})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert outcome["blocked_reason"] == "observation_loop"
    assert "unchanged foreground window" in outcome["error"]
    assert outcome["window_summary"] == "Save as [#32770]"
def test_repeated_ambiguous_action_requires_verification_and_then_blocks(tmp_path: Path, monkeypatch) -> None:
    """Walk the verify-before-repeat cycle for an identical ``type`` action.

    Sequence checked:
    1. The first ``type`` succeeds and flags that verification is required.
    2. An immediate identical repeat is blocked and points at ``see_screen``.
    3. With a ``see_screen`` before each attempt, two further repeats succeed.
    4. The next repeat is blocked with the "3 time(s) on the same surface" message.
    5. After one more ``see_screen`` the same action is accepted again.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    type_args = {"text": "repeat me"}

    initial = agent._dispatch_tool("type", type_args)
    assert initial["ok"] is True
    assert initial["verification_required"] is True
    assert initial["verification_channels"] == ["enhance", "get_active_window", "see_screen"]

    unverified_repeat = agent._dispatch_tool("type", type_args)
    assert unverified_repeat["blocked"] is True
    assert "see_screen" in unverified_repeat["error"]

    # Each verified repeat is preceded by a fresh observation.
    for _ in range(2):
        assert agent._dispatch_tool("see_screen", {})["ok"] is True
        assert agent._dispatch_tool("type", type_args)["ok"] is True

    assert agent._dispatch_tool("see_screen", {})["ok"] is True
    over_budget = agent._dispatch_tool("type", type_args)
    assert over_budget["blocked"] is True
    assert "3 time(s) on the same surface" in over_budget["error"]

    assert agent._dispatch_tool("see_screen", {})["ok"] is True
    after_reset = agent._dispatch_tool("type", type_args)
    assert after_reset["ok"] is True
def test_copy_shortcut_prefers_clipboard_verification(tmp_path: Path, monkeypatch) -> None:
    """Check that ``ctrl+c`` asks for clipboard verification before it may be repeated.

    Clipboard access is stubbed on the agent. The first copy succeeds and
    lists ``clipboard_get`` as its only verification channel; an unverified
    repeat is blocked; once ``clipboard_get`` has been called the copy is
    accepted again.
    """
    agent = _build_agent(tmp_path, monkeypatch)

    def _stub_clipboard_metadata():
        return {"has_text": True, "has_image": False, "available_formats": ["CF_UNICODETEXT"]}

    def _stub_clipboard_text():
        return "copied"

    monkeypatch.setattr(agent, "_clipboard_get_metadata", _stub_clipboard_metadata)
    monkeypatch.setattr(agent, "_clipboard_get_text", _stub_clipboard_text)

    def _press_copy():
        # A fresh argument dict per call, exactly like separate literal calls.
        return agent._dispatch_tool("press_key", {"key": "ctrl+c"})

    initial = _press_copy()
    assert initial["ok"] is True
    assert initial["verification_channels"] == ["clipboard_get"]

    unverified_repeat = _press_copy()
    assert unverified_repeat["blocked"] is True
    assert "clipboard_get" in unverified_repeat["error"]

    clipboard_state = agent._dispatch_tool("clipboard_get", {})
    assert clipboard_state["ok"] is True
    assert clipboard_state["has_text"] is True

    verified_repeat = _press_copy()
    assert verified_repeat["ok"] is True
def test_execute_command_blocks_unrequested_recursive_file_search(tmp_path: Path, monkeypatch) -> None:
    """Check that a recursive file search is refused when the objective does not ask for one."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Save the current note in Notepad"

    outcome = agent._tool_execute_command({"command": "Get-ChildItem -Recurse -Filter *.txt"})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert "out of scope" in outcome["error"]
def test_execute_command_allows_recursive_file_search_when_objective_requests_it(tmp_path: Path, monkeypatch) -> None:
    """Check that a recursive file search runs when the objective is about finding a file.

    ``subprocess.Popen`` on the agent module is replaced by a recorder so no
    real process is started; the test asserts that the command reached it
    unchanged.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Find the saved text file path"
    called: dict[str, Any] = {}

    class _FakeProcess:
        # Minimal stand-in exposing the attributes this test stubs on a process.
        returncode = 0

        def poll(self) -> int:
            return 0

        def communicate(self, timeout: int = 2):
            return ("ok", "")

    def _recording_popen(*args, **kwargs):
        # Remember the first positional argument (the command) for the assertion below.
        called["command"] = args[0]
        return _FakeProcess()

    monkeypatch.setattr(agent_module.subprocess, "Popen", _recording_popen)

    outcome = agent._tool_execute_command({"command": "Get-ChildItem -Recurse -Filter *.txt"})

    assert outcome["ok"] is True
    assert called["command"] == "Get-ChildItem -Recurse -Filter *.txt"
def test_execute_command_launch_requires_focus_verification(tmp_path: Path, monkeypatch) -> None:
    """Check that an app launch via ``execute_command`` must be followed by a focus check.

    With ``subprocess.Popen`` stubbed, the first ``start notepad`` succeeds and
    is reported as a background launch without an assumed focus change. A
    repeat without verification is blocked and names ``get_active_window``;
    after ``get_active_window`` has been called the launch is accepted again.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    called: dict[str, Any] = {}

    class _FakeProcess:
        # Minimal stand-in exposing the attributes this test stubs on a process.
        returncode = 0

        def poll(self) -> int:
            return 0

        def communicate(self, timeout: int = 2):
            return ("", "")

    def _recording_popen(*args, **kwargs):
        # Remember the first positional argument (the command) for the assertion below.
        called["command"] = args[0]
        return _FakeProcess()

    monkeypatch.setattr(agent_module.subprocess, "Popen", _recording_popen)

    def _launch_notepad():
        # A fresh argument dict per call, exactly like separate literal calls.
        return agent._dispatch_tool("execute_command", {"command": "start notepad"})

    initial = _launch_notepad()
    assert initial["ok"] is True
    assert initial["background_launch_assumed"] is True
    assert initial["focus_change_assumed"] is False
    assert initial["verification_required"] is True
    assert initial["verification_channels"] == ["get_active_window", "see_screen"]
    assert called["command"] == "start notepad"

    unverified_repeat = _launch_notepad()
    assert unverified_repeat["blocked"] is True
    assert "get_active_window" in unverified_repeat["error"]

    focus_check = agent._dispatch_tool("get_active_window", {})
    assert focus_check["ok"] is True

    verified_repeat = _launch_notepad()
    assert verified_repeat["ok"] is True
def test_system_prompt_emphasizes_situational_awareness() -> None:
    """Check that ``SYSTEM_PROMPT`` contains every expected guidance phrase and tool name."""
    prompt = agent_module.SYSTEM_PROMPT

    required_phrases = (
        "Maintain a live mental model",
        "classify -> choose control channel -> execute one meaningful transition -> verify",
        "First classify, then act.",
        "Use see_screen at a balanced cadence",
        "get_active_window",
        "detect_dialog",
        "dialog_set_filename",
        "list_ui_elements",
        "clipboard_get",
        "Do not invent new subgoals",
        "verify-and-finish",
        "data.observed_result",
        "Treat command-launched apps or URLs as background",
        "#32770",
    )
    for phrase in required_phrases:
        assert phrase in prompt

    # This phrase is matched case-insensitively.
    assert "secure desktop" in prompt.lower()
def test_observation_loop_prompt_pushes_action_or_finish() -> None:
    """Check the wording of the observation-loop prompt for a stalled "Save as" dialog."""
    prompt = agent_module.build_observation_loop_prompt("Save as [#32770]", repeated_steps=3)

    required_phrases = (
        "same stable window for 3 step(s)",
        "Save as [#32770]",
        "Do not keep calling broad observation tools",
        "native window/dialog/element tool",
        "Use enhance only if a small or text-heavy control must be read before acting.",
        "#32770 dialog",
    )
    for phrase in required_phrases:
        assert phrase in prompt
def test_finish_likely_prompt_pushes_verification_then_completion() -> None:
    """Check that the finish-likely prompt asks for one verification and then completion.

    The prompt is built from a completion summary plus one prohibited key
    combo; it must echo the window title from the summary and list the
    prohibited combo.
    """
    completion_summary = 'Save dialog closed and focus returned to "todo-demo.txt - Notepad". | Command verification confirms "todo-demo.txt" exists.'
    prompt = agent_module.build_finish_likely_prompt(
        completion_summary,
        prohibited_key_combos={"ctrl+shift+s"},
    )

    required_phrases = (
        "objective is likely already satisfied",
        "todo-demo.txt - Notepad",
        "call see_screen",
        "then call task_complete",
        "Do not reopen menus",
        "Prohibited key combos for this run: ctrl+shift+s.",
    )
    for phrase in required_phrases:
        assert phrase in prompt
def test_initial_action_prompt_reinforces_observation_and_verification() -> None:
    """Check that the initial action prompt states the job and every expected guidance phrase."""
    prompt = agent_module.build_initial_action_prompt("Open calculator", {"ctrl+shift+s"})

    required_phrases = (
        "JOB: Open calculator",
        "First classify the current UI state from the latest evidence.",
        "Identify what changed since the last action or screen capture.",
        "classify -> choose control channel -> execute one meaningful transition -> verify",
        "Prefer native window/dialog/element tools",
        "get_active_window plus detect_dialog",
        "click then see_screen",
        "Do not invent new subgoals",
        "Prefer non-visual verification when available",
        "wait_for_focus_change",
        "#32770 dialogs",
        "Prohibited key combos for this run: ctrl+shift+s.",
        "do not re-capture the screen just to reconfirm an obvious large input area",
        'task_complete(return=..., data={"observed_result": ...})',
    )
    for phrase in required_phrases:
        assert phrase in prompt
def test_no_tool_prompt_recovers_by_reobserving() -> None:
    """Check that the no-tool recovery prompt tells the model to re-observe before acting."""
    prompt = agent_module.build_no_tool_prompt({"ctrl+shift+s"})

    required_phrases = (
        "Recover by re-observing the current desktop state instead of guessing.",
        "Start by classifying the surface.",
        "get_active_window",
        "detect_dialog",
        "clipboard_get",
        "native window/dialog/element tools",
        "Do not assume execute_command launches changed the foreground window",
        "Prohibited key combos for this run: ctrl+shift+s.",
        "If a modal, picker, or browser download/upload surface is likely",
    )
    for phrase in required_phrases:
        assert phrase in prompt
def test_blocked_action_prompt_reanchors_on_screen_state() -> None:
    """Check that the blocked-action prompt names the failed tool and demands reclassification."""
    prompt = agent_module.build_blocked_action_prompt("click", prohibited_key_combos={"ctrl+shift+s"})

    required_phrases = (
        "The last action using click was blocked or unreliable.",
        "Do not retry blindly.",
        "classify the current surface",
        "detect_dialog",
        "dialog_set_filename",
        "get_active_window",
        "get_cursor_position before move_mouse or drag",
        "wait_for_focus_change",
        "secure desktop or UAC",
        "Switch strategy after the fresh classification",
        "Prohibited key combos for this run: ctrl+shift+s.",
        "native control instead of pixels",
    )
    for phrase in required_phrases:
        assert phrase in prompt
def test_tool_schemas_include_completion_and_desktop_awareness_guidance(tmp_path: Path, monkeypatch) -> None:
    """Check that each tool schema description carries its expected guidance snippet.

    The agent is given one prohibited key combo first so that the
    ``press_key`` description can be checked for it.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.prohibited_key_combos = {"ctrl+shift+s"}
    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    # (tool name, snippet expected in that tool's description)
    expected_snippets = (
        ("task_complete", "data.observed_result"),
        ("see_screen", "before task_complete"),
        ("enhance", "text-heavy targets"),
        ("clipboard_get", "verify copy or cut results"),
        ("get_cursor_position", "pointer state matters"),
        ("get_active_window", "verify focus and active app"),
        ("execute_command", "foreground focus"),
        ("press_key", "Prohibited for this run: ctrl+shift+s."),
        ("get_active_window", "dialog classification"),
        ("list_windows", "visible top-level windows"),
        ("detect_dialog", "#32770 or picker surface"),
        ("dialog_set_filename", "filename or path field"),
        ("list_ui_elements", "native child controls"),
    )
    for tool_name, snippet in expected_snippets:
        assert snippet in schemas[tool_name]["description"]
def test_tool_schemas_hide_optional_native_tools_when_mode_off(tmp_path: Path, monkeypatch) -> None:
    """Check which native tools stay exposed when ``native_automation_mode`` is ``"off"``.

    ``get_active_window`` must remain in the schema list while
    ``list_windows``, ``detect_dialog`` and ``list_ui_elements`` must not.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.native_automation_mode = "off"
    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    assert "get_active_window" in schemas
    for hidden_tool in ("list_windows", "detect_dialog", "list_ui_elements"):
        assert hidden_tool not in schemas
def test_list_windows_returns_structured_surface_metadata(tmp_path: Path, monkeypatch) -> None:
    """Check the ``list_windows`` result when the only window is a file-open dialog.

    Window enumeration and active-window lookup are both stubbed on the agent
    to report the same "Open" dialog; the result must surface its
    classification and recommend ``dialog_set_filename`` first.
    """
    agent = _build_agent(tmp_path, monkeypatch)

    def _stub_list_windows_info(visible_only=True):
        return [
            {
                "available": True,
                "hwnd": 111,
                "title": "Open",
                "class_name": "#32770",
                "executable_name": "notepad.exe",
                "surface_kind": "file_dialog",
                "dialog_kind": "file_open",
            }
        ]

    def _stub_active_window_info():
        return {
            "available": True,
            "hwnd": 111,
            "title": "Open",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        }

    monkeypatch.setattr(agent, "_list_windows_info", _stub_list_windows_info)
    monkeypatch.setattr(agent, "_get_active_window_info", _stub_active_window_info)

    listing = agent._tool_list_windows({})

    assert listing["ok"] is True
    assert listing["count"] == 1
    assert listing["surface_kind"] == "file_dialog"
    assert listing["dialog_kind"] == "file_open"
    assert listing["recommended_next_tools"][0] == "dialog_set_filename"
def test_detect_dialog_returns_buttons_and_target_handle(tmp_path: Path, monkeypatch) -> None:
    """Check the ``detect_dialog`` result for a stubbed "Save as" dialog with one button.

    Dialog lookup, active-window lookup and UI element enumeration are stubbed
    on the agent; the result must classify the dialog as ``file_save``, expose
    a ``dialog`` target and list the "Save" button.
    """
    agent = _build_agent(tmp_path, monkeypatch)

    def _save_dialog_info():
        # Built fresh on every call so each stub hands out its own dict.
        return {
            "available": True,
            "hwnd": 222,
            "title": "Save as",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        }

    def _stub_find_dialog_info(title_contains=""):
        return _save_dialog_info()

    def _stub_active_window_info():
        return _save_dialog_info()

    def _stub_list_ui_elements(hwnd, include_hidden=False):
        return [
            {
                "handle": 10,
                "role": "button",
                "text": "Save",
                "target": {"type": "ui_element", "handle": 10, "window_handle": hwnd},
            }
        ]

    monkeypatch.setattr(agent, "_find_dialog_info", _stub_find_dialog_info)
    monkeypatch.setattr(agent, "_get_active_window_info", _stub_active_window_info)
    monkeypatch.setattr(agent, "_list_ui_elements_for_window", _stub_list_ui_elements)

    detection = agent._tool_detect_dialog({})

    assert detection["ok"] is True
    assert detection["dialog_kind"] == "file_save"
    assert detection["target"]["type"] == "dialog"
    assert detection["buttons"][0]["text"] == "Save"
def test_notepad_save_pattern_enters_finish_likely_mode(tmp_path: Path, monkeypatch) -> None:
    """Check that two pieces of evidence flip the agent into finish-likely mode.

    Starting from a "Save as" dialog as the last observed window, a
    ``get_active_window`` result titled with the target filename yields two
    completion-evidence items but does not yet activate finish-likely. A
    following successful ``execute_command`` result whose output contains the
    file name activates it, and both event types must have been emitted.
    """
    events: list[dict[str, object]] = []
    agent = _build_agent(tmp_path, monkeypatch)
    agent.event_callback = events.append
    agent.objective = "Open Notepad, type a short to-do list, save it as todo-demo.txt in Documents"
    agent.finish_likely_state["target_filename"] = agent._infer_target_filename(agent.objective)
    agent.last_observed_window = {
        "available": True,
        "title": "Save as",
        "class_name": "#32770",
    }

    # Step 24: focus is reported on the Notepad window named after the target file.
    agent.step = 24
    notepad_window = {
        "available": True,
        "title": "todo-demo.txt - Notepad",
        "class_name": "Notepad",
    }
    window_update = agent._update_finish_likely_from_tool(
        "get_active_window",
        {},
        {"ok": True, "window": notepad_window},
    )
    assert agent.finish_likely_state["active"] is False
    evidence_kinds = [item["kind"] for item in window_update["completion_evidence"]]
    assert evidence_kinds == [
        "active_window_title_matches_target",
        "save_dialog_closed_to_target_window",
    ]

    # Step 25: a command result that mentions the saved file.
    agent.last_visual_signature = "stable-post-save"
    agent.step = 25
    command_update = agent._update_finish_likely_from_tool(
        "execute_command",
        {"command": "powershell -NoProfile -Command \"Test-Path ... todo-demo.txt\""},
        {
            "ok": True,
            "exit_code": 0,
            "stdout": r"C:\Users\paulw\Documents\todo-demo.txt",
        },
    )
    assert agent.finish_likely_state["active"] is True
    assert agent.finish_likely_state["summary"]
    assert command_update["finish_likely"]["target_filename"] == "todo-demo.txt"

    assert any(event["event_type"] == "completion_evidence" for event in events)
    assert any(event["event_type"] == "finish_likely" for event in events)
def test_finish_likely_guard_blocks_reopening_menu_after_fresh_verification(tmp_path: Path, monkeypatch) -> None:
    """Check that a menu shortcut is blocked once finish-likely has been freshly verified.

    The finish-likely state is pre-activated without a verification. A
    ``see_screen`` call must record the fresh verification, after which
    ``alt+f`` must come back blocked with the ``finish_likely`` reason.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open Notepad, type a short to-do list, save it as todo-demo.txt in Documents"
    agent.finish_likely_state.update(
        active=True,
        activated_at_step=24,
        target_filename="todo-demo.txt",
        summary='Save dialog closed and focus returned to "todo-demo.txt - Notepad". | Command verification confirms "todo-demo.txt" exists.',
        fresh_verification_done=False,
        verification_step=0,
        post_completion_visual_signature="",
    )
    agent.step = 25

    verification = agent._dispatch_tool("see_screen", {})
    assert verification["ok"] is True
    assert verification["finish_likely_verification_done"] is True
    assert agent.finish_likely_state["fresh_verification_done"] is True

    menu_attempt = agent._dispatch_tool("press_key", {"key": "alt+f"})
    assert menu_attempt["ok"] is False
    assert menu_attempt["blocked"] is True
    assert menu_attempt["blocked_reason"] == "finish_likely"
    assert "appears satisfied" in menu_attempt["error"]
    assert "reopen menus" in menu_attempt["hint"].lower()
def test_dispatch_rejects_unknown_and_disabled_tools(tmp_path: Path, monkeypatch) -> None:
    """Check the exact error payloads for an unknown tool and for a disabled tool."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.disabled_tools = {"scroll"}

    unknown_outcome = agent._dispatch_tool("unknown_tool", {})
    assert unknown_outcome == {"ok": False, "error": "Unknown tool: unknown_tool"}

    disabled_outcome = agent._dispatch_tool("scroll", {})
    assert disabled_outcome == {"ok": False, "error": "Tool 'scroll' is disabled for this job."}
def test_tool_schemas_filter_disabled_tools(tmp_path: Path, monkeypatch) -> None:
    """Check that disabled tools are left out of the schema list while others remain."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.disabled_tools = {"scroll", "clipboard_get"}

    tool_names = {tool["name"] for tool in agent._tool_schemas()}

    for removed in ("scroll", "clipboard_get"):
        assert removed not in tool_names
    for kept in ("click", "task_complete"):
        assert kept in tool_names
def test_normalize_disabled_tools_rejects_invalid_and_required_names() -> None:
    """Check that ``normalize_disabled_tools`` raises ``ValueError`` for bad tool names.

    Covers a name that is not a tool at all and the ``task_complete`` tool,
    each with its own error message.
    """
    # (input list, regex the ValueError message must match)
    rejected_cases = (
        (["not_a_real_tool"], "Unknown disabled tool"),
        (["task_complete"], "Cannot disable required tool"),
    )
    for disabled_names, expected_message in rejected_cases:
        with pytest.raises(ValueError, match=expected_message):
            agent_module.normalize_disabled_tools(disabled_names)

View File

@@ -20,6 +20,7 @@ def test_cli_emits_structured_return_and_data(monkeypatch: Any, capsys, tmp_path
port=8787, port=8787,
runs_dir=tmp_path / "runs", runs_dir=tmp_path / "runs",
db_path=tmp_path / "screenjob.db", db_path=tmp_path / "screenjob.db",
prohibited_key_combos=("ctrl+shift+s",),
) )
config.runs_dir.mkdir(parents=True, exist_ok=True) config.runs_dir.mkdir(parents=True, exist_ok=True)
@@ -71,3 +72,11 @@ def test_cli_emits_structured_return_and_data(monkeypatch: Any, capsys, tmp_path
assert payload["data"] == "file1.txt\nfile2.txt" assert payload["data"] == "file1.txt\nfile2.txt"
assert captured_kwargs["options"].reasoning_effort == "medium" assert captured_kwargs["options"].reasoning_effort == "medium"
assert captured_kwargs["options"].screen_context_decay_steps == 4 assert captured_kwargs["options"].screen_context_decay_steps == 4
assert captured_kwargs["options"].max_visual_context_images == 3
assert captured_kwargs["options"].native_automation_mode == "prefer"
assert captured_kwargs["options"].dialog_timeout_seconds == 12.0
assert captured_kwargs["options"].focus_timeout_seconds == 8.0
assert captured_kwargs["options"].ui_element_timeout_seconds == 8.0
assert captured_kwargs["options"].max_retries_per_surface == 3
assert captured_kwargs["options"].pretty_logs is False
assert captured_kwargs["options"].prohibited_key_combos == {"ctrl+shift+s"}

View File

@@ -46,6 +46,13 @@ class FakeJobManager:
click_pause: float = 0.10, click_pause: float = 0.10,
reasoning_effort: str = "medium", reasoning_effort: str = "medium",
screen_context_decay_steps: int = 4, screen_context_decay_steps: int = 4,
max_visual_context_images: int = 3,
native_automation_mode: str = "prefer",
dialog_timeout_seconds: float = 12.0,
focus_timeout_seconds: float = 8.0,
ui_element_timeout_seconds: float = 8.0,
max_retries_per_surface: int = 3,
pretty_logs: bool = False,
disabled_tools: list[str] | None = None, disabled_tools: list[str] | None = None,
safety_override: bool = False, safety_override: bool = False,
no_failsafe: bool = False, no_failsafe: bool = False,
@@ -69,6 +76,13 @@ class FakeJobManager:
"click_pause": click_pause, "click_pause": click_pause,
"reasoning_effort": reasoning_effort, "reasoning_effort": reasoning_effort,
"screen_context_decay_steps": screen_context_decay_steps, "screen_context_decay_steps": screen_context_decay_steps,
"max_visual_context_images": max_visual_context_images,
"native_automation_mode": native_automation_mode,
"dialog_timeout_seconds": dialog_timeout_seconds,
"focus_timeout_seconds": focus_timeout_seconds,
"ui_element_timeout_seconds": ui_element_timeout_seconds,
"max_retries_per_surface": max_retries_per_surface,
"pretty_logs": pretty_logs,
"no_failsafe": no_failsafe, "no_failsafe": no_failsafe,
} }
self._jobs[job_id] = { self._jobs[job_id] = {
@@ -293,6 +307,7 @@ def _build_app(tmp_path: Path, monkeypatch: Any, disable_ui: bool = False):
port=8787, port=8787,
runs_dir=tmp_path / "runs", runs_dir=tmp_path / "runs",
db_path=tmp_path / "screenjob_test.db", db_path=tmp_path / "screenjob_test.db",
prohibited_key_combos=("ctrl+shift+s",),
) )
config.runs_dir.mkdir(parents=True, exist_ok=True) config.runs_dir.mkdir(parents=True, exist_ok=True)
app = server_module.create_app(config) app = server_module.create_app(config)
@@ -326,6 +341,13 @@ def test_create_job_returns_only_job_id_and_defaults_model(tmp_path: Path, monke
assert manager.last_submit_payload["disabled_tools"] == ["click"] assert manager.last_submit_payload["disabled_tools"] == ["click"]
assert manager.last_submit_payload["reasoning_effort"] == "medium" assert manager.last_submit_payload["reasoning_effort"] == "medium"
assert manager.last_submit_payload["screen_context_decay_steps"] == 4 assert manager.last_submit_payload["screen_context_decay_steps"] == 4
assert manager.last_submit_payload["max_visual_context_images"] == 3
assert manager.last_submit_payload["native_automation_mode"] == "prefer"
assert manager.last_submit_payload["dialog_timeout_seconds"] == 12.0
assert manager.last_submit_payload["focus_timeout_seconds"] == 8.0
assert manager.last_submit_payload["ui_element_timeout_seconds"] == 8.0
assert manager.last_submit_payload["max_retries_per_surface"] == 3
assert manager.last_submit_payload["pretty_logs"] is False
status_res = client.get(f"/api/jobs/{job_id}/status", headers=headers) status_res = client.get(f"/api/jobs/{job_id}/status", headers=headers)
assert status_res.status_code == 200 assert status_res.status_code == 200
@@ -334,6 +356,36 @@ def test_create_job_returns_only_job_id_and_defaults_model(tmp_path: Path, monke
assert "data" in status_res.json()["response"] assert "data" in status_res.json()["response"]
def test_create_job_rejects_invalid_disabled_tool_names(tmp_path: Path, monkeypatch: Any) -> None:
    """Check that posting a job with an unknown disabled tool name returns HTTP 400."""
    app, _ = _build_app(tmp_path, monkeypatch, disable_ui=False)
    client = TestClient(app)
    auth_headers = {"Authorization": "Bearer test_token"}
    request_body = {"job": "Open amazon.de", "disabled_tools": ["not_a_real_tool"], "safety_override": True}

    response = client.post("/api/jobs", headers=auth_headers, json=request_body)

    assert response.status_code == 400
    assert "Unknown disabled tool" in response.json()["detail"]
def test_create_job_rejects_disabling_task_complete(tmp_path: Path, monkeypatch: Any) -> None:
    """Check that posting a job which disables ``task_complete`` returns HTTP 400."""
    app, _ = _build_app(tmp_path, monkeypatch, disable_ui=False)
    client = TestClient(app)
    auth_headers = {"Authorization": "Bearer test_token"}
    request_body = {"job": "Open amazon.de", "disabled_tools": ["task_complete"], "safety_override": True}

    response = client.post("/api/jobs", headers=auth_headers, json=request_body)

    assert response.status_code == 400
    assert "Cannot disable required tool" in response.json()["detail"]
def test_cancel_endpoint_and_events(tmp_path: Path, monkeypatch: Any) -> None: def test_cancel_endpoint_and_events(tmp_path: Path, monkeypatch: Any) -> None:
app, _ = _build_app(tmp_path, monkeypatch, disable_ui=False) app, _ = _build_app(tmp_path, monkeypatch, disable_ui=False)
client = TestClient(app) client = TestClient(app)