Files
screenjob/tests/test_agent_tools.py
Space-Banane f0058d1057
Some checks failed
CI / test (push) Failing after 10s
feat: add support for Windows-only tools and enhance platform checks
2026-05-31 21:02:56 +02:00

961 lines
36 KiB
Python

from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any
import pytest
from PIL import Image
import src.agent as agent_module
from src.models import RunArtifacts, RuntimeOptions
class _DummyPyAutoGUI:
FAILSAFE = True
PAUSE = 0.0
def __init__(self) -> None:
self.last_move_to: tuple[int, int] | None = None
self.last_move_duration: float | None = None
self.last_click: dict[str, object] | None = None
self.last_hotkey: tuple[str, ...] | None = None
self.last_drag_to: dict[str, object] | None = None
self.last_scroll: int | None = None
self.current_position: tuple[int, int] = (640, 360)
def screenshot(self) -> Image.Image:
return Image.new("RGB", (1280, 720), color=(24, 24, 24))
def size(self) -> tuple[int, int]:
return (1280, 720)
def moveTo(self, x: int, y: int, duration: float = 0.0) -> None: # noqa: N802
self.last_move_to = (x, y)
self.last_move_duration = duration
self.current_position = (x, y)
def click(
self,
x: int,
y: int,
clicks: int = 1,
interval: float = 0.0,
button: str = "left",
) -> None:
self.last_click = {"x": x, "y": y, "clicks": clicks, "interval": interval, "button": button}
self.current_position = (x, y)
def dragTo(self, x: int, y: int, duration: float = 0.0, button: str = "left") -> None: # noqa: N802
self.last_drag_to = {"x": x, "y": y, "duration": duration, "button": button}
self.current_position = (x, y)
def scroll(self, amount: int) -> None:
self.last_scroll = amount
def write(self, _: str, interval: float = 0.0) -> None:
return None
def press(self, _: str) -> None:
return None
def hotkey(self, *keys: str) -> None:
self.last_hotkey = tuple(keys)
def position(self):
x, y = self.current_position
return type("Point", (), {"x": x, "y": y})()
def _build_agent(tmp_path: Path, monkeypatch) -> agent_module.ScreenJobAgent:
    """Create a ``ScreenJobAgent`` wired to test doubles.

    ``agent_module.pyautogui`` is replaced with a ``_DummyPyAutoGUI`` and
    ``agent_module.time.sleep`` with a no-op, both through ``monkeypatch``.
    Run artifacts point at ``tmp_path / "run"``; only that root directory is
    created here. The client is a bare ``object()`` placeholder.
    """
    monkeypatch.setattr(agent_module, "pyautogui", _DummyPyAutoGUI())
    monkeypatch.setattr(agent_module.time, "sleep", lambda _: None)

    workspace = tmp_path / "run"
    workspace.mkdir(parents=True, exist_ok=True)

    return agent_module.ScreenJobAgent(
        client=object(),  # type: ignore[arg-type]
        logger=logging.getLogger("screenjob-test-agent"),
        artifacts=RunArtifacts(
            run_id="test_run",
            root_dir=workspace,
            logs_dir=workspace / "logs",
            shots_dir=workspace / "shots",
            enhance_dir=workspace / "enhance",
            log_file=workspace / "screenjob.log",
        ),
        options=RuntimeOptions(model="gpt-5.4-mini"),
    )
def test_task_complete_captures_return_and_data(tmp_path: Path, monkeypatch) -> None:
    """task_complete echoes ``return``/``data`` and stores both on the agent."""
    summary = "Task completed successfully"
    listing = "file1\nfile2"
    agent = _build_agent(tmp_path, monkeypatch)

    outcome = agent._tool_task_complete({"return": summary, "data": listing})

    assert outcome["ok"] is True
    assert outcome["return"] == summary
    assert outcome["data"] == listing
    assert "verification" not in outcome
    assert agent.final_result == summary
    assert agent.final_data == listing
def test_click_supports_directional_offsets(tmp_path: Path, monkeypatch) -> None:
    """A click with directional and x/y offsets lands at the combined position.

    Also checks that button, click count, and the string-valued interval and
    duration are reported as numbers and forwarded to the GUI backend.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    click_request = {
        "coordinate": {"x": 100, "y": 100},
        "offset_up": "2px",
        "offset_right": 7,
        "offset": {"x": 3, "y": 4},
        "button": "right",
        "click_count": 2,
        "interval_seconds": "0.5s",
        "duration_seconds": "0.2s",
        "sleep_after_seconds": 0,
    }

    outcome = agent._tool_click(click_request)

    assert outcome["ok"] is True
    assert outcome["clicked"] == {"x": 110, "y": 102}
    assert outcome["button"] == "right"
    assert outcome["click_count"] == 2
    assert outcome["interval_seconds"] == 0.5
    assert outcome["duration_seconds"] == 0.2

    # The dummy backend must have received the same resolved values.
    expected_backend_call = {
        "x": 110,
        "y": 102,
        "clicks": 2,
        "interval": 0.5,
        "button": "right",
    }
    assert agent_module.pyautogui.last_click == expected_backend_call
def test_scroll_supports_direction_and_amount(tmp_path: Path, monkeypatch) -> None:
    """Scrolling "down" by 8 reports -8 and clamps the off-screen coordinate."""
    agent = _build_agent(tmp_path, monkeypatch)
    scroll_request = {
        "amount": 8,
        "direction": "down",
        "coordinate": {"x": 1400, "y": -5},
        "sleep_after_seconds": 0,
    }

    outcome = agent._tool_scroll(scroll_request)

    assert outcome["ok"] is True
    assert outcome["amount"] == -8
    assert outcome["direction"] == "down"
    assert outcome["moved_to"] == {"x": 1279, "y": 0}
    assert agent_module.pyautogui.last_scroll == -8
def test_drag_translates_coordinates_and_button(tmp_path: Path, monkeypatch) -> None:
    """Drag endpoints outside the fake screen are clamped; button and duration pass through."""
    agent = _build_agent(tmp_path, monkeypatch)
    drag_request = {
        "start_coordinate": {"x": -10, "y": 100},
        "end_coordinate": {"x": 1285, "y": 800},
        "button": "middle",
        "duration_seconds": "0.3s",
        "sleep_after_seconds": 0,
    }

    outcome = agent._tool_drag(drag_request)

    assert outcome["ok"] is True
    assert outcome["from"] == {"x": 0, "y": 100}
    assert outcome["to"] == {"x": 1279, "y": 719}
    assert outcome["button"] == "middle"
    assert outcome["duration_seconds"] == 0.3

    expected_backend_call = {
        "x": 1279,
        "y": 719,
        "duration": 0.3,
        "button": "middle",
    }
    assert agent_module.pyautogui.last_drag_to == expected_backend_call
def test_move_mouse_clamps_target_coordinate(tmp_path: Path, monkeypatch) -> None:
    """move_mouse clamps an off-screen target to the fake 1280x720 surface."""
    agent = _build_agent(tmp_path, monkeypatch)
    move_request = {"coordinate": {"x": 1500, "y": -5}, "duration_seconds": "0.4s"}

    outcome = agent._tool_move_mouse(move_request)

    assert outcome["ok"] is True
    assert outcome["moved_to"] == {"x": 1279, "y": 0}
    assert outcome["duration_seconds"] == 0.4
    assert agent_module.pyautogui.last_move_to == (1279, 0)
def test_clipboard_get_and_set_round_trip(tmp_path: Path, monkeypatch) -> None:
    """Text written via clipboard_set is read back by clipboard_get with metadata.

    The agent's three clipboard primitives are replaced with in-memory fakes
    backed by a local dict, so no real clipboard is involved.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    fake_clipboard = {"text": ""}

    def store_text(text):
        fake_clipboard["text"] = text

    def read_text():
        return fake_clipboard["text"]

    def read_metadata():
        return {
            "has_text": bool(fake_clipboard["text"]),
            "has_image": True,
            "available_formats": ["CF_UNICODETEXT", "CF_DIB"],
        }

    monkeypatch.setattr(agent, "_clipboard_set_text", store_text)
    monkeypatch.setattr(agent, "_clipboard_get_text", read_text)
    monkeypatch.setattr(agent, "_clipboard_get_metadata", read_metadata)

    written = agent._tool_clipboard_set({"text": "hello clipboard"})
    fetched = agent._tool_clipboard_get({})

    assert written["ok"] is True
    assert written["length"] == 15
    assert fetched["ok"] is True
    assert fetched["text"] == "hello clipboard"
    assert fetched["length"] == 15
    assert fetched["has_text"] is True
    assert fetched["has_image"] is True
    assert fetched["available_formats"] == ["CF_UNICODETEXT", "CF_DIB"]
def test_clipboard_set_falls_back_to_powershell_when_native_path_fails(tmp_path: Path, monkeypatch) -> None:
    """clipboard_set reports a shell fallback when the native setter raises ``OSError``."""
    agent = _build_agent(tmp_path, monkeypatch)
    shell_clipboard = {"text": ""}

    def shell_fallback(text: str) -> None:
        shell_clipboard["text"] = text

    def fail_native(_: str) -> None:
        raise OSError("[WinError 6] The handle is invalid.")

    monkeypatch.setattr(agent, "_clipboard_set_text", fail_native)
    monkeypatch.setattr(agent, "_clipboard_set_text_via_shell", shell_fallback)

    outcome = agent._tool_clipboard_set({"text": "Example Domain"})

    assert outcome["ok"] is True
    assert outcome["used_shell_fallback"] is True
    assert "WinError 6" in outcome["native_error"]
    # The text must have reached the fallback path.
    assert shell_clipboard["text"] == "Example Domain"
def test_get_cursor_position_returns_current_mouse_location(tmp_path: Path, monkeypatch) -> None:
    """get_cursor_position reports the pointer location held by the GUI backend."""
    pointer_x, pointer_y = 321, 654
    agent = _build_agent(tmp_path, monkeypatch)
    agent_module.pyautogui.current_position = (pointer_x, pointer_y)

    outcome = agent._tool_get_cursor_position({})

    assert outcome["ok"] is True
    assert outcome["position"] == {"x": pointer_x, "y": pointer_y}
def test_get_active_window_returns_metadata_shape(tmp_path: Path, monkeypatch) -> None:
    """get_active_window exposes the stubbed window metadata under ``window``."""
    agent = _build_agent(tmp_path, monkeypatch)

    def fake_active_window():
        # A fresh dict per call, matching what the original lambda produced.
        return {
            "available": True,
            "hwnd": 1234,
            "title": "Settings",
            "class_name": "ApplicationFrameWindow",
            "thread_id": 44,
            "process_id": 77,
            "is_visible": True,
            "rect": {"left": 10, "top": 20, "right": 410, "bottom": 320, "width": 400, "height": 300},
        }

    monkeypatch.setattr(agent, "_get_active_window_info", fake_active_window)

    outcome = agent._tool_get_active_window({})

    assert outcome["ok"] is True
    assert outcome["window"]["title"] == "Settings"
    assert outcome["window"]["rect"]["width"] == 400
def test_enhance_defaults_to_small_ui_preset(tmp_path: Path, monkeypatch) -> None:
    """With only a coordinate, enhance reports region "small", mode "ui", scale 4."""
    agent = _build_agent(tmp_path, monkeypatch)

    outcome = agent._tool_enhance({"coordinate": {"x": 100, "y": 120}})

    assert outcome["ok"] is True
    details = outcome["meta"]
    assert details["region"] == "small"
    assert details["mode"] == "ui"
    assert details["scale"] == 4
    # The enhanced image must exist on disk.
    assert Path(details["path"]).exists()
    for axis in ("x", "y"):
        assert details["target_pixel"][axis] >= 0
def test_enhance_supports_text_mode_and_scale_clamp(tmp_path: Path, monkeypatch) -> None:
    """enhance honours region/mode and reports scale 6 and a clamped source for out-of-range input."""
    agent = _build_agent(tmp_path, monkeypatch)
    requested_coordinate = {"x": -99, "y": 9999}
    enhance_request = {
        "coordinate": requested_coordinate,
        "region": "medium",
        "mode": "text",
        "scale": 99,
    }

    outcome = agent._tool_enhance(enhance_request)

    assert outcome["ok"] is True
    details = outcome["meta"]
    assert details["region"] == "medium"
    assert details["mode"] == "text"
    assert details["scale"] == 6
    assert details["requested_coord"] == {"x": -99, "y": 9999}
    assert details["source_coord"] == {"x": 0, "y": 719}
    assert Path(details["path"]).exists()
def test_press_key_supports_hotkey_combo(tmp_path: Path, monkeypatch) -> None:
    """press_key with "meta+r" reports "win+r" and sends ("win", "r") as a hotkey."""
    agent = _build_agent(tmp_path, monkeypatch)

    outcome = agent._tool_press_key({"key": "meta+r"})

    assert outcome["ok"] is True
    assert outcome["key"] == "win+r"
    assert outcome["message"] == "Key combo executed."
    sent_keys = agent_module.pyautogui.last_hotkey
    assert sent_keys == ("win", "r")
def test_press_key_blocks_prohibited_combo(tmp_path: Path, monkeypatch) -> None:
    """press_key refuses a combo listed in the runtime's prohibited set."""
    forbidden = "ctrl+shift+s"
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.prohibited_key_combos = {forbidden}
    # Re-derive the agent's normalized set from the updated options.
    agent.prohibited_key_combos = agent._normalize_prohibited_key_combos(agent.options.prohibited_key_combos)

    outcome = agent._tool_press_key({"key": forbidden})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert outcome["key"] == "ctrl+shift+s"
    assert "prohibited by runtime configuration" in outcome["error"]
    assert "another allowed route" in outcome["hint"]
def test_press_key_blocks_prohibited_combo_after_alias_normalization(tmp_path: Path, monkeypatch) -> None:
    """A combo prohibited as "meta+r" is also blocked when requested as "win+r"."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.prohibited_key_combos = {"meta+r"}
    normalized = agent._normalize_prohibited_key_combos(agent.options.prohibited_key_combos)
    agent.prohibited_key_combos = normalized

    outcome = agent._tool_press_key({"key": "win+r"})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert outcome["key"] == "win+r"
def test_context_compaction_trigger_and_payload(tmp_path: Path, monkeypatch) -> None:
    """Decay-based compaction triggers and yields a two-item input with the expected guidance text."""
    agent = _build_agent(tmp_path, monkeypatch)

    # Agent state in which step 4 has reached the 4-step decay window.
    agent.objective = "Open settings app"
    agent.previous_response_id = "resp_123"
    agent.step = 4
    agent.last_context_compact_step = 0
    agent.options.screen_context_decay_steps = 4
    agent.recent_tool_summaries = ["step=1 tool=see_screen status=ok"]
    agent.last_screen_data_url = "data:image/png;base64,abc"
    agent.last_screen_meta = {"width": 1280, "height": 720, "path": "C:/tmp/frame.png"}

    assert agent._should_compact_context() is True

    screen_message = agent._build_visual_message(
        "Current screen",
        "data:image/png;base64,abc",
        agent.last_screen_meta,
    )
    agent._register_visual_context_message(screen_message, agent.last_screen_meta, tool_name="see_screen")
    compacted = agent._build_compacted_pending_input("decay")

    assert len(compacted) == 2
    expected_fragments = (
        "Context compaction activated due to stale context decay.",
        "Open settings app",
        "Treat prior reasoning as stale",
        "Retained visual observations:",
        "do not call see_screen again only because compaction happened",
        "observe -> decide -> act -> verify",
    )
    for fragment in expected_fragments:
        assert fragment in compacted[0]["content"][0]["text"]
def test_context_compaction_drops_function_call_outputs_from_rebased_input(tmp_path: Path, monkeypatch) -> None:
    """Carry-over ``function_call_output`` items are dropped; user messages are kept."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open settings app"

    frame_meta = {"path": "C:/tmp/frame.png"}
    frame_message = agent._build_visual_message("Current screen", "data:image/png;base64,abc", frame_meta)
    agent._register_visual_context_message(frame_message, frame_meta, tool_name="see_screen")

    tool_output_item = {"type": "function_call_output", "call_id": "call_123", "output": "{\"ok\": true}"}
    user_hint_item = {"role": "user", "content": [{"type": "input_text", "text": "blocked hint"}]}
    compacted = agent._build_compacted_pending_input(
        "decay",
        carryover_items=[tool_output_item, user_hint_item],
    )

    assert len(compacted) == 3
    assert compacted[1]["role"] == "user"
    assert compacted[1]["content"][0]["text"] == "blocked hint"
    assert all(entry.get("type") != "function_call_output" for entry in compacted)
def test_visual_context_budget_keeps_only_latest_three_images(tmp_path: Path, monkeypatch) -> None:
    """With a budget of three, four out-of-order frames reduce to the three latest captures."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.max_visual_context_images = 3

    # Registration order deliberately differs from capture-time order.
    capture_stamps = (
        "2026-05-30T10:00:03+00:00",
        "2026-05-30T10:00:01+00:00",
        "2026-05-30T10:00:04+00:00",
        "2026-05-30T10:00:02+00:00",
    )
    for frame_no, stamp in enumerate(capture_stamps):
        frame_meta = {"path": f"C:/tmp/frame_{frame_no}.png", "captured_at": stamp}
        frame_message = agent._build_visual_message(
            f"frame {frame_no}",
            f"data:image/png;base64,{frame_no}",
            frame_meta,
        )
        agent._register_visual_context_message(frame_message, frame_meta, tool_name="see_screen")

    assert agent.visual_context_overflow_pending is True
    retained_paths = [record["meta"]["path"] for record in agent.visual_context_messages]
    assert retained_paths == [
        "C:/tmp/frame_3.png",
        "C:/tmp/frame_0.png",
        "C:/tmp/frame_2.png",
    ]
def test_compacted_input_uses_latest_visuals_by_capture_time(tmp_path: Path, monkeypatch) -> None:
    """Compacted input carries the three latest frames, ordered by capture time.

    Each retained frame is identified by the JSON that follows ``"Metadata: "``
    on the first text part of its visual message.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.max_visual_context_images = 3
    agent.objective = "Verify the current app window"

    capture_stamps = (
        "2026-05-30T10:00:04+00:00",
        "2026-05-30T10:00:01+00:00",
        "2026-05-30T10:00:03+00:00",
        "2026-05-30T10:00:02+00:00",
    )
    for frame_no, stamp in enumerate(capture_stamps):
        frame_meta = {"path": f"C:/tmp/frame_{frame_no}.png", "captured_at": stamp}
        frame_message = agent._build_visual_message(
            f"frame {frame_no}",
            f"data:image/png;base64,{frame_no}",
            frame_meta,
        )
        agent._register_visual_context_message(frame_message, frame_meta, tool_name="see_screen")

    compacted = agent._build_compacted_pending_input("visual_budget")

    def carries_image(item) -> bool:
        # True when the item has a list content with at least one input_image part.
        content = item.get("content")
        if not isinstance(content, list):
            return False
        return any(part.get("type") == "input_image" for part in content if isinstance(part, dict))

    def metadata_path(message) -> str:
        # Parse the JSON between "Metadata: " and the next newline.
        caption = message["content"][0]["text"]
        after_marker = caption.split("Metadata: ", 1)[1]
        metadata_json = after_marker.split("\n", 1)[0]
        return json.loads(metadata_json)["path"]

    image_messages = [item for item in compacted if carries_image(item)]

    assert len(image_messages) == 3
    assert [metadata_path(message) for message in image_messages] == [
        "C:/tmp/frame_3.png",
        "C:/tmp/frame_2.png",
        "C:/tmp/frame_0.png",
    ]
def test_context_compaction_event_includes_visual_budget_reason_and_paths(tmp_path: Path, monkeypatch) -> None:
    """The context_compacted event carries the rebuild reason and retained image paths."""
    captured_events: list[dict[str, object]] = []
    agent = _build_agent(tmp_path, monkeypatch)
    agent.event_callback = captured_events.append
    agent.step = 5
    agent.recent_tool_summaries = ["step=4 tool=enhance status=ok"]
    agent.visual_context_messages = [
        {"message": {"role": "user", "content": []}, "meta": {"path": image_path}}
        for image_path in ("C:/tmp/1.png", "C:/tmp/2.png", "C:/tmp/3.png")
    ]

    agent._emit_context_compacted("visual_budget")

    latest_event = captured_events[-1]
    assert latest_event["event_type"] == "context_compacted"
    event_payload = latest_event["payload"]
    assert event_payload["rebuild_reason"] == "visual_budget"
    assert event_payload["visual_context_paths"] == ["C:/tmp/1.png", "C:/tmp/2.png", "C:/tmp/3.png"]
def test_observation_loop_blocks_repeated_broad_reobservation(tmp_path: Path, monkeypatch) -> None:
    """After three identical observe-only steps, another see_screen is blocked as an observation loop."""
    agent = _build_agent(tmp_path, monkeypatch)

    # Steps 21-23 each re-observed the same "Save as" dialog.
    agent.step_history = [
        {
            "step": step_no,
            "tool_names": ["get_active_window", "see_screen"],
            "window_signature": "123|#32770|Save as",
            "window_summary": "Save as [#32770]",
            "had_visual": True,
        }
        for step_no in (21, 22, 23)
    ]

    refusal = agent._dispatch_tool("see_screen", {})

    assert refusal["ok"] is False
    assert refusal["blocked"] is True
    assert refusal["blocked_reason"] == "observation_loop"
    assert "unchanged foreground window" in refusal["error"]
    assert refusal["window_summary"] == "Save as [#32770]"
def test_repeated_ambiguous_action_requires_verification_and_then_blocks(tmp_path: Path, monkeypatch) -> None:
    """Repeating the same ``type`` call needs verification between attempts and is eventually blocked.

    Sequence: the first call succeeds and asks for verification; an immediate
    repeat is blocked; with a see_screen between attempts two more repeats
    succeed; the next repeat is blocked for exceeding the retry budget; after
    one more see_screen the action is accepted again.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    payload = {"text": "repeat me"}

    def attempt():
        return agent._dispatch_tool("type", payload)

    def observe():
        return agent._dispatch_tool("see_screen", {})

    initial = attempt()
    assert initial["ok"] is True
    assert initial["verification_required"] is True
    assert initial["verification_channels"] == ["enhance", "get_active_window", "see_screen"]

    unverified_repeat = attempt()
    assert unverified_repeat["blocked"] is True
    assert "see_screen" in unverified_repeat["error"]

    assert observe()["ok"] is True
    for _ in range(2):
        assert attempt()["ok"] is True
        assert observe()["ok"] is True

    over_budget = attempt()
    assert over_budget["blocked"] is True
    assert "3 time(s) on the same surface" in over_budget["error"]

    assert observe()["ok"] is True
    after_reset = attempt()
    assert after_reset["ok"] is True
def test_copy_shortcut_prefers_clipboard_verification(tmp_path: Path, monkeypatch) -> None:
    """After ctrl+c the only verification channel is clipboard_get, which unblocks a repeat."""
    agent = _build_agent(tmp_path, monkeypatch)

    def fake_metadata():
        return {"has_text": True, "has_image": False, "available_formats": ["CF_UNICODETEXT"]}

    def fake_text():
        return "copied"

    monkeypatch.setattr(agent, "_clipboard_get_metadata", fake_metadata)
    monkeypatch.setattr(agent, "_clipboard_get_text", fake_text)

    def press_copy():
        return agent._dispatch_tool("press_key", {"key": "ctrl+c"})

    initial = press_copy()
    assert initial["ok"] is True
    assert initial["verification_channels"] == ["clipboard_get"]

    unverified_repeat = press_copy()
    assert unverified_repeat["blocked"] is True
    assert "clipboard_get" in unverified_repeat["error"]

    clipboard_check = agent._dispatch_tool("clipboard_get", {})
    assert clipboard_check["ok"] is True
    assert clipboard_check["has_text"] is True

    verified_repeat = press_copy()
    assert verified_repeat["ok"] is True
def test_execute_command_blocks_unrequested_recursive_file_search(tmp_path: Path, monkeypatch) -> None:
    """A recursive file search is rejected as out of scope for a save-the-note objective."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Save the current note in Notepad"
    search_command = "Get-ChildItem -Recurse -Filter *.txt"

    outcome = agent._tool_execute_command({"command": search_command})

    assert outcome["ok"] is False
    assert outcome["blocked"] is True
    assert "out of scope" in outcome["error"]
def test_execute_command_allows_recursive_file_search_when_objective_requests_it(tmp_path: Path, monkeypatch) -> None:
    """The same recursive search is allowed, and passed to Popen, when the objective asks to find a file."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Find the saved text file path"
    search_command = "Get-ChildItem -Recurse -Filter *.txt"
    launched: dict[str, Any] = {}

    class _FinishedProcess:
        """Process double that has already exited with code 0."""

        returncode = 0

        def poll(self) -> int:
            return 0

        def communicate(self, timeout: int = 2):
            return ("ok", "")

    def fake_popen(*args, **kwargs):
        # Remember the first positional argument for the assertion below.
        launched["command"] = args[0]
        return _FinishedProcess()

    monkeypatch.setattr(agent_module.subprocess, "Popen", fake_popen)

    outcome = agent._tool_execute_command({"command": search_command})

    assert outcome["ok"] is True
    assert launched["command"] == "Get-ChildItem -Recurse -Filter *.txt"
def test_execute_command_launch_requires_focus_verification(tmp_path: Path, monkeypatch) -> None:
    """A launch command is treated as a background launch that must be verified before repeating."""
    agent = _build_agent(tmp_path, monkeypatch)
    launched: dict[str, Any] = {}

    class _FinishedProcess:
        """Process double that has already exited with code 0 and no output."""

        returncode = 0

        def poll(self) -> int:
            return 0

        def communicate(self, timeout: int = 2):
            return ("", "")

    def fake_popen(*args, **kwargs):
        launched["command"] = args[0]
        return _FinishedProcess()

    monkeypatch.setattr(agent_module.subprocess, "Popen", fake_popen)

    def launch_notepad():
        return agent._dispatch_tool("execute_command", {"command": "start notepad"})

    initial = launch_notepad()
    assert initial["ok"] is True
    assert initial["background_launch_assumed"] is True
    assert initial["focus_change_assumed"] is False
    assert initial["verification_required"] is True
    assert initial["verification_channels"] == ["get_active_window", "see_screen"]
    assert launched["command"] == "start notepad"

    unverified_repeat = launch_notepad()
    assert unverified_repeat["blocked"] is True
    assert "get_active_window" in unverified_repeat["error"]

    focus_check = agent._dispatch_tool("get_active_window", {})
    assert focus_check["ok"] is True

    verified_repeat = launch_notepad()
    assert verified_repeat["ok"] is True
def test_system_prompt_emphasizes_situational_awareness() -> None:
    """SYSTEM_PROMPT contains each required guidance phrase and tool name."""
    system_prompt = agent_module.SYSTEM_PROMPT
    required_fragments = (
        "Maintain a live mental model",
        "classify -> choose control channel -> execute one meaningful transition -> verify",
        "First classify, then act.",
        "Use see_screen at a balanced cadence",
        "get_active_window",
        "detect_dialog",
        "dialog_set_filename",
        "list_ui_elements",
        "clipboard_get",
        "Do not invent new subgoals",
        "verify-and-finish",
        "data.observed_result",
        "Treat command-launched apps or URLs as background",
        "#32770",
    )
    for fragment in required_fragments:
        assert fragment in system_prompt
    # This phrase is matched case-insensitively.
    assert "secure desktop" in system_prompt.lower()
def test_observation_loop_prompt_pushes_action_or_finish() -> None:
    """The observation-loop prompt names the stuck window and steers away from more observing."""
    window_summary = "Save as [#32770]"
    loop_prompt = agent_module.build_observation_loop_prompt(window_summary, repeated_steps=3)

    required_fragments = (
        "same stable window for 3 step(s)",
        "Save as [#32770]",
        "Do not keep calling broad observation tools",
        "native window/dialog/element tool",
        "Use enhance only if a small or text-heavy control must be read before acting.",
        "#32770 dialog",
    )
    for fragment in required_fragments:
        assert fragment in loop_prompt
def test_finish_likely_prompt_pushes_verification_then_completion() -> None:
    """The finish-likely prompt asks for one see_screen, then task_complete, and lists prohibited combos."""
    evidence_summary = (
        'Save dialog closed and focus returned to "todo-demo.txt - Notepad". | Command verification confirms "todo-demo.txt" exists.'
    )
    finish_prompt = agent_module.build_finish_likely_prompt(
        evidence_summary,
        prohibited_key_combos={"ctrl+shift+s"},
    )

    required_fragments = (
        "objective is likely already satisfied",
        "todo-demo.txt - Notepad",
        "call see_screen",
        "then call task_complete",
        "Do not reopen menus",
        "Prohibited key combos for this run: ctrl+shift+s.",
    )
    for fragment in required_fragments:
        assert fragment in finish_prompt
def test_initial_action_prompt_reinforces_observation_and_verification() -> None:
    """The initial action prompt embeds the job and each classify/verify guidance phrase."""
    initial_prompt = agent_module.build_initial_action_prompt("Open calculator", {"ctrl+shift+s"})

    required_fragments = (
        "JOB: Open calculator",
        "First classify the current UI state from the latest evidence.",
        "Identify what changed since the last action or screen capture.",
        "classify -> choose control channel -> execute one meaningful transition -> verify",
        "Prefer native window/dialog/element tools",
        "get_active_window plus detect_dialog",
        "click then see_screen",
        "Do not invent new subgoals",
        "Prefer non-visual verification when available",
        "wait_for_focus_change",
        "#32770 dialogs",
        "Prohibited key combos for this run: ctrl+shift+s.",
        "do not re-capture the screen just to reconfirm an obvious large input area",
        'task_complete(return=..., data={"observed_result": ...})',
    )
    for fragment in required_fragments:
        assert fragment in initial_prompt
def test_no_tool_prompt_recovers_by_reobserving() -> None:
    """The no-tool prompt tells the model to re-observe and classify rather than guess."""
    recovery_prompt = agent_module.build_no_tool_prompt({"ctrl+shift+s"})

    required_fragments = (
        "Recover by re-observing the current desktop state instead of guessing.",
        "Start by classifying the surface.",
        "get_active_window",
        "detect_dialog",
        "clipboard_get",
        "native window/dialog/element tools",
        "Do not assume execute_command launches changed the foreground window",
        "Prohibited key combos for this run: ctrl+shift+s.",
        "If a modal, picker, or browser download/upload surface is likely",
    )
    for fragment in required_fragments:
        assert fragment in recovery_prompt
def test_blocked_action_prompt_reanchors_on_screen_state() -> None:
    """The blocked-action prompt names the blocked tool and asks for a fresh classification."""
    blocked_prompt = agent_module.build_blocked_action_prompt("click", prohibited_key_combos={"ctrl+shift+s"})

    required_fragments = (
        "The last action using click was blocked or unreliable.",
        "Do not retry blindly.",
        "classify the current surface",
        "detect_dialog",
        "dialog_set_filename",
        "get_active_window",
        "get_cursor_position before move_mouse or drag",
        "wait_for_focus_change",
        "secure desktop or UAC",
        "Switch strategy after the fresh classification",
        "Prohibited key combos for this run: ctrl+shift+s.",
        "native control instead of pixels",
    )
    for fragment in required_fragments:
        assert fragment in blocked_prompt
def test_tool_schemas_include_completion_and_desktop_awareness_guidance(tmp_path: Path, monkeypatch) -> None:
    """Tool schema descriptions carry the completion and desktop-awareness guidance.

    Several of the inspected tools (``get_active_window``, ``list_windows``,
    ``detect_dialog``, ``list_ui_elements``) are left out of the schema list
    when ``agent_module.sys.platform`` is not Windows, as exercised by
    ``test_tool_schemas_hide_windows_only_tools_on_non_windows_host``. The
    platform is therefore pinned to ``"win32"`` so the lookups below do not
    raise ``KeyError`` on a non-Windows host.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    # Make the result independent of the machine running the tests.
    monkeypatch.setattr(agent_module.sys, "platform", "win32")
    agent.prohibited_key_combos = {"ctrl+shift+s"}

    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    # (tool name, phrase that must appear in that tool's description)
    expected_guidance = (
        ("task_complete", "data.observed_result"),
        ("see_screen", "before task_complete"),
        ("enhance", "text-heavy targets"),
        ("clipboard_get", "verify copy or cut results"),
        ("get_cursor_position", "pointer state matters"),
        ("get_active_window", "verify focus and active app"),
        ("execute_command", "foreground focus"),
        ("press_key", "Prohibited for this run: ctrl+shift+s."),
        ("get_active_window", "dialog classification"),
        ("list_windows", "visible top-level windows"),
        ("detect_dialog", "#32770 or picker surface"),
        ("dialog_set_filename", "filename or path field"),
        ("list_ui_elements", "native child controls"),
    )
    for tool_name, phrase in expected_guidance:
        assert phrase in schemas[tool_name]["description"]
def test_tool_schemas_hide_optional_native_tools_when_mode_off(tmp_path: Path, monkeypatch) -> None:
    """With native automation off, optional native tools are hidden but get_active_window stays.

    The platform is pinned to ``"win32"`` because Windows-only tools are also
    hidden on a non-Windows host (see
    ``test_tool_schemas_hide_windows_only_tools_on_non_windows_host``). Without
    the pin, the ``get_active_window`` assertion would fail on such a host and
    the ``not in`` assertions would pass for the wrong reason.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    # Isolate the effect of native_automation_mode from the host platform.
    monkeypatch.setattr(agent_module.sys, "platform", "win32")
    agent.options.native_automation_mode = "off"

    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    assert "get_active_window" in schemas
    for hidden_tool in ("list_windows", "detect_dialog", "list_ui_elements"):
        assert hidden_tool not in schemas
def test_tool_schemas_hide_windows_only_tools_on_non_windows_host(tmp_path: Path, monkeypatch) -> None:
    """With the platform reported as "linux", Windows-only tools are hidden and refuse to dispatch."""
    agent = _build_agent(tmp_path, monkeypatch)
    monkeypatch.setattr(agent_module.sys, "platform", "linux")

    advertised = {tool["name"]: tool for tool in agent._tool_schemas()}
    windows_only_tools = ("get_active_window", "list_windows", "detect_dialog", "list_ui_elements")
    for tool_name in windows_only_tools:
        assert tool_name not in advertised

    refusal = agent._dispatch_tool("get_active_window", {})
    assert refusal["ok"] is False
    assert refusal["error"] == "Tool 'get_active_window' is only available on Windows."
def test_list_windows_returns_structured_surface_metadata(tmp_path: Path, monkeypatch) -> None:
    """list_windows reports count, surface and dialog kind, and recommends dialog_set_filename first."""
    agent = _build_agent(tmp_path, monkeypatch)

    def fake_list_windows(visible_only=True):
        return [
            {
                "available": True,
                "hwnd": 111,
                "title": "Open",
                "class_name": "#32770",
                "executable_name": "notepad.exe",
                "surface_kind": "file_dialog",
                "dialog_kind": "file_open",
            }
        ]

    def fake_active_window():
        return {
            "available": True,
            "hwnd": 111,
            "title": "Open",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        }

    monkeypatch.setattr(agent, "_list_windows_info", fake_list_windows)
    monkeypatch.setattr(agent, "_get_active_window_info", fake_active_window)

    outcome = agent._tool_list_windows({})

    assert outcome["ok"] is True
    assert outcome["count"] == 1
    assert outcome["surface_kind"] == "file_dialog"
    assert outcome["dialog_kind"] == "file_open"
    assert outcome["recommended_next_tools"][0] == "dialog_set_filename"
def test_detect_dialog_returns_buttons_and_target_handle(tmp_path: Path, monkeypatch) -> None:
    """detect_dialog classifies a stubbed "Save as" dialog and lists its Save button."""
    agent = _build_agent(tmp_path, monkeypatch)

    def save_dialog_info():
        # Shared shape for the dialog lookup and the active-window lookup.
        return {
            "available": True,
            "hwnd": 222,
            "title": "Save as",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        }

    def fake_find_dialog(title_contains=""):
        return save_dialog_info()

    def fake_active_window():
        return save_dialog_info()

    def fake_list_elements(hwnd, include_hidden=False):
        return [
            {
                "handle": 10,
                "role": "button",
                "text": "Save",
                "target": {"type": "ui_element", "handle": 10, "window_handle": hwnd},
            }
        ]

    monkeypatch.setattr(agent, "_find_dialog_info", fake_find_dialog)
    monkeypatch.setattr(agent, "_get_active_window_info", fake_active_window)
    monkeypatch.setattr(agent, "_list_ui_elements_for_window", fake_list_elements)

    outcome = agent._tool_detect_dialog({})

    assert outcome["ok"] is True
    assert outcome["dialog_kind"] == "file_save"
    assert outcome["target"]["type"] == "dialog"
    assert outcome["buttons"][0]["text"] == "Save"
def test_notepad_save_pattern_enters_finish_likely_mode(tmp_path: Path, monkeypatch) -> None:
    """Window evidence alone does not activate finish-likely; adding command evidence does.

    Step 24 feeds a get_active_window result whose title matches the target
    file while the last observed window was the "Save as" dialog. Step 25
    feeds an execute_command result whose stdout names the target file.
    """
    captured_events: list[dict[str, object]] = []
    agent = _build_agent(tmp_path, monkeypatch)
    agent.event_callback = captured_events.append
    agent.objective = "Open Notepad, type a short to-do list, save it as todo-demo.txt in Documents"
    agent.finish_likely_state["target_filename"] = agent._infer_target_filename(agent.objective)
    agent.last_observed_window = {
        "available": True,
        "title": "Save as",
        "class_name": "#32770",
    }

    # Step 24: focus is now on the saved document's window.
    agent.step = 24
    post_save_window = {
        "ok": True,
        "window": {
            "available": True,
            "title": "todo-demo.txt - Notepad",
            "class_name": "Notepad",
        },
    }
    window_update = agent._update_finish_likely_from_tool("get_active_window", {}, post_save_window)

    assert agent.finish_likely_state["active"] is False
    evidence_kinds = [item["kind"] for item in window_update["completion_evidence"]]
    assert evidence_kinds == [
        "active_window_title_matches_target",
        "save_dialog_closed_to_target_window",
    ]

    # Step 25: a command result confirms the file path.
    agent.last_visual_signature = "stable-post-save"
    agent.step = 25
    command_args = {"command": "powershell -NoProfile -Command \"Test-Path ... todo-demo.txt\""}
    command_output = {
        "ok": True,
        "exit_code": 0,
        "stdout": r"C:\Users\paulw\Documents\todo-demo.txt",
    }
    command_update = agent._update_finish_likely_from_tool("execute_command", command_args, command_output)

    assert agent.finish_likely_state["active"] is True
    assert agent.finish_likely_state["summary"]
    assert command_update["finish_likely"]["target_filename"] == "todo-demo.txt"
    for expected_type in ("completion_evidence", "finish_likely"):
        assert any(event["event_type"] == expected_type for event in captured_events)
def test_finish_likely_guard_blocks_reopening_menu_after_fresh_verification(tmp_path: Path, monkeypatch) -> None:
    """With finish-likely active, see_screen counts as verification and a later alt+f is blocked."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open Notepad, type a short to-do list, save it as todo-demo.txt in Documents"

    # Finish-likely was activated at step 24 and has not yet been verified.
    pending_finish_state = {
        "active": True,
        "activated_at_step": 24,
        "target_filename": "todo-demo.txt",
        "summary": 'Save dialog closed and focus returned to "todo-demo.txt - Notepad". | Command verification confirms "todo-demo.txt" exists.',
        "fresh_verification_done": False,
        "verification_step": 0,
        "post_completion_visual_signature": "",
    }
    agent.finish_likely_state.update(pending_finish_state)
    agent.step = 25

    verification = agent._dispatch_tool("see_screen", {})
    assert verification["ok"] is True
    assert verification["finish_likely_verification_done"] is True
    assert agent.finish_likely_state["fresh_verification_done"] is True

    refusal = agent._dispatch_tool("press_key", {"key": "alt+f"})
    assert refusal["ok"] is False
    assert refusal["blocked"] is True
    assert refusal["blocked_reason"] == "finish_likely"
    assert "appears satisfied" in refusal["error"]
    assert "reopen menus" in refusal["hint"].lower()
def test_dispatch_rejects_unknown_and_disabled_tools(tmp_path: Path, monkeypatch) -> None:
    """Dispatch returns exact error payloads for an unknown tool and for a disabled tool."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.disabled_tools = {"scroll"}

    unknown_outcome = agent._dispatch_tool("unknown_tool", {})
    assert unknown_outcome == {"ok": False, "error": "Unknown tool: unknown_tool"}

    disabled_outcome = agent._dispatch_tool("scroll", {})
    assert disabled_outcome == {"ok": False, "error": "Tool 'scroll' is disabled for this job."}
def test_tool_schemas_filter_disabled_tools(tmp_path: Path, monkeypatch) -> None:
    """Disabled tools are absent from the advertised schemas; other tools remain."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.disabled_tools = {"scroll", "clipboard_get"}

    advertised = [tool["name"] for tool in agent._tool_schemas()]

    for disabled_name in ("scroll", "clipboard_get"):
        assert disabled_name not in advertised
    for enabled_name in ("click", "task_complete"):
        assert enabled_name in advertised
def test_normalize_disabled_tools_rejects_invalid_and_required_names() -> None:
    """normalize_disabled_tools raises ``ValueError`` for unknown names and for required tools."""
    rejected_inputs = (
        (["not_a_real_tool"], "Unknown disabled tool"),
        (["task_complete"], "Cannot disable required tool"),
    )
    for tool_names, message_pattern in rejected_inputs:
        with pytest.raises(ValueError, match=message_pattern):
            agent_module.normalize_disabled_tools(tool_names)