# File-viewer header (not part of the module): 961 lines, 36 KiB, Python.
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from PIL import Image
|
|
|
|
import src.agent as agent_module
|
|
from src.models import RunArtifacts, RuntimeOptions
|
|
|
|
|
|
class _DummyPyAutoGUI:
|
|
FAILSAFE = True
|
|
PAUSE = 0.0
|
|
|
|
def __init__(self) -> None:
|
|
self.last_move_to: tuple[int, int] | None = None
|
|
self.last_move_duration: float | None = None
|
|
self.last_click: dict[str, object] | None = None
|
|
self.last_hotkey: tuple[str, ...] | None = None
|
|
self.last_drag_to: dict[str, object] | None = None
|
|
self.last_scroll: int | None = None
|
|
self.current_position: tuple[int, int] = (640, 360)
|
|
|
|
def screenshot(self) -> Image.Image:
|
|
return Image.new("RGB", (1280, 720), color=(24, 24, 24))
|
|
|
|
def size(self) -> tuple[int, int]:
|
|
return (1280, 720)
|
|
|
|
def moveTo(self, x: int, y: int, duration: float = 0.0) -> None: # noqa: N802
|
|
self.last_move_to = (x, y)
|
|
self.last_move_duration = duration
|
|
self.current_position = (x, y)
|
|
|
|
def click(
|
|
self,
|
|
x: int,
|
|
y: int,
|
|
clicks: int = 1,
|
|
interval: float = 0.0,
|
|
button: str = "left",
|
|
) -> None:
|
|
self.last_click = {"x": x, "y": y, "clicks": clicks, "interval": interval, "button": button}
|
|
self.current_position = (x, y)
|
|
|
|
def dragTo(self, x: int, y: int, duration: float = 0.0, button: str = "left") -> None: # noqa: N802
|
|
self.last_drag_to = {"x": x, "y": y, "duration": duration, "button": button}
|
|
self.current_position = (x, y)
|
|
|
|
def scroll(self, amount: int) -> None:
|
|
self.last_scroll = amount
|
|
|
|
def write(self, _: str, interval: float = 0.0) -> None:
|
|
return None
|
|
|
|
def press(self, _: str) -> None:
|
|
return None
|
|
|
|
def hotkey(self, *keys: str) -> None:
|
|
self.last_hotkey = tuple(keys)
|
|
|
|
def position(self):
|
|
x, y = self.current_position
|
|
return type("Point", (), {"x": x, "y": y})()
|
|
|
|
|
|
def _build_agent(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> agent_module.ScreenJobAgent:
    """Build a ``ScreenJobAgent`` wired to a dummy GUI and a temp run directory.

    Replaces ``agent_module.pyautogui`` with a fresh ``_DummyPyAutoGUI`` and
    turns ``agent_module.time.sleep`` into a no-op so tests never drive the
    real desktop or wait.
    """
    dummy_gui = _DummyPyAutoGUI()
    monkeypatch.setattr(agent_module, "pyautogui", dummy_gui)
    monkeypatch.setattr(agent_module.time, "sleep", lambda _: None)

    run_dir = tmp_path / "run"
    run_dir.mkdir(parents=True, exist_ok=True)
    artifacts = RunArtifacts(
        run_id="test_run",
        root_dir=run_dir,
        logs_dir=run_dir / "logs",
        shots_dir=run_dir / "shots",
        enhance_dir=run_dir / "enhance",
        log_file=run_dir / "screenjob.log",
    )
    options = RuntimeOptions(model="gpt-5.4-mini")
    logger = logging.getLogger("screenjob-test-agent")
    return agent_module.ScreenJobAgent(
        client=object(),  # type: ignore[arg-type]
        logger=logger,
        artifacts=artifacts,
        options=options,
    )
|
|
|
|
|
|
def test_task_complete_captures_return_and_data(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """task_complete echoes ``return``/``data`` and stores both on the agent."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_task_complete({"return": "Task completed successfully", "data": "file1\nfile2"})
    assert result["ok"] is True
    assert result["return"] == "Task completed successfully"
    assert result["data"] == "file1\nfile2"
    assert "verification" not in result
    assert agent.final_result == "Task completed successfully"
    assert agent.final_data == "file1\nfile2"
|
|
|
|
|
|
def test_click_supports_directional_offsets(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Click applies directional and x/y offsets and forwards button/count/interval."""
    agent = _build_agent(tmp_path, monkeypatch)
    click_result = agent._tool_click(
        {
            "coordinate": {"x": 100, "y": 100},
            "offset_up": "2px",
            "offset_right": 7,
            "offset": {"x": 3, "y": 4},
            "button": "right",
            "click_count": 2,
            "interval_seconds": "0.5s",
            "duration_seconds": "0.2s",
            "sleep_after_seconds": 0,
        }
    )
    assert click_result["ok"] is True
    assert click_result["clicked"] == {"x": 110, "y": 102}
    assert click_result["button"] == "right"
    assert click_result["click_count"] == 2
    assert click_result["interval_seconds"] == 0.5
    assert click_result["duration_seconds"] == 0.2
    assert agent_module.pyautogui.last_click == {
        "x": 110,
        "y": 102,
        "clicks": 2,
        "interval": 0.5,
        "button": "right",
    }
|
|
|
|
|
|
def test_scroll_supports_direction_and_amount(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Scrolling "down" negates the amount and clamps the hover coordinate."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_scroll(
        {
            "amount": 8,
            "direction": "down",
            "coordinate": {"x": 1400, "y": -5},
            "sleep_after_seconds": 0,
        }
    )

    assert result["ok"] is True
    assert result["amount"] == -8
    assert result["direction"] == "down"
    assert result["moved_to"] == {"x": 1279, "y": 0}
    assert agent_module.pyautogui.last_scroll == -8
|
|
|
|
|
|
def test_drag_translates_coordinates_and_button(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Drag clamps both endpoints to the screen and forwards button and duration."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_drag(
        {
            "start_coordinate": {"x": -10, "y": 100},
            "end_coordinate": {"x": 1285, "y": 800},
            "button": "middle",
            "duration_seconds": "0.3s",
            "sleep_after_seconds": 0,
        }
    )

    assert result["ok"] is True
    assert result["from"] == {"x": 0, "y": 100}
    assert result["to"] == {"x": 1279, "y": 719}
    assert result["button"] == "middle"
    assert result["duration_seconds"] == 0.3
    assert agent_module.pyautogui.last_drag_to == {
        "x": 1279,
        "y": 719,
        "duration": 0.3,
        "button": "middle",
    }
|
|
|
|
|
|
def test_move_mouse_clamps_target_coordinate(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """move_mouse clamps an off-screen target to the 1280x720 screen bounds."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_move_mouse({"coordinate": {"x": 1500, "y": -5}, "duration_seconds": "0.4s"})

    assert result["ok"] is True
    assert result["moved_to"] == {"x": 1279, "y": 0}
    assert result["duration_seconds"] == 0.4
    assert agent_module.pyautogui.last_move_to == (1279, 0)
|
|
|
|
|
|
def test_clipboard_get_and_set_round_trip(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """clipboard_set followed by clipboard_get returns the text plus metadata."""
    agent = _build_agent(tmp_path, monkeypatch)
    # Dict-backed fake clipboard shared by the patched getter and setter.
    state = {"text": ""}
    monkeypatch.setattr(agent, "_clipboard_set_text", lambda text: state.__setitem__("text", text))
    monkeypatch.setattr(agent, "_clipboard_get_text", lambda: state["text"])
    monkeypatch.setattr(
        agent,
        "_clipboard_get_metadata",
        lambda: {"has_text": bool(state["text"]), "has_image": True, "available_formats": ["CF_UNICODETEXT", "CF_DIB"]},
    )

    set_result = agent._tool_clipboard_set({"text": "hello clipboard"})
    get_result = agent._tool_clipboard_get({})

    assert set_result["ok"] is True
    assert set_result["length"] == 15
    assert get_result["ok"] is True
    assert get_result["text"] == "hello clipboard"
    assert get_result["length"] == 15
    assert get_result["has_text"] is True
    assert get_result["has_image"] is True
    assert get_result["available_formats"] == ["CF_UNICODETEXT", "CF_DIB"]
|
|
|
|
|
|
def test_clipboard_set_falls_back_to_powershell_when_native_path_fails(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A failing native clipboard write is retried through the shell fallback."""
    agent = _build_agent(tmp_path, monkeypatch)
    state = {"text": ""}

    def fail_native(_: str) -> None:
        raise OSError("[WinError 6] The handle is invalid.")

    def shell_fallback(text: str) -> None:
        state["text"] = text

    monkeypatch.setattr(agent, "_clipboard_set_text", fail_native)
    monkeypatch.setattr(agent, "_clipboard_set_text_via_shell", shell_fallback)

    result = agent._tool_clipboard_set({"text": "Example Domain"})

    assert result["ok"] is True
    assert result["used_shell_fallback"] is True
    assert "WinError 6" in result["native_error"]
    assert state["text"] == "Example Domain"
|
|
|
|
|
|
def test_get_cursor_position_returns_current_mouse_location(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """get_cursor_position reports the pointer location exposed by pyautogui."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent_module.pyautogui.current_position = (321, 654)

    result = agent._tool_get_cursor_position({})

    assert result["ok"] is True
    assert result["position"] == {"x": 321, "y": 654}
|
|
|
|
|
|
def test_get_active_window_returns_metadata_shape(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """get_active_window returns the patched window info under ``window``."""
    agent = _build_agent(tmp_path, monkeypatch)
    monkeypatch.setattr(
        agent,
        "_get_active_window_info",
        lambda: {
            "available": True,
            "hwnd": 1234,
            "title": "Settings",
            "class_name": "ApplicationFrameWindow",
            "thread_id": 44,
            "process_id": 77,
            "is_visible": True,
            "rect": {"left": 10, "top": 20, "right": 410, "bottom": 320, "width": 400, "height": 300},
        },
    )

    result = agent._tool_get_active_window({})

    assert result["ok"] is True
    assert result["window"]["title"] == "Settings"
    assert result["window"]["rect"]["width"] == 400
|
|
|
|
|
|
def test_enhance_defaults_to_small_ui_preset(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """With only a coordinate, enhance uses region "small", mode "ui", scale 4."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_enhance({"coordinate": {"x": 100, "y": 120}})

    assert result["ok"] is True
    meta = result["meta"]
    assert meta["region"] == "small"
    assert meta["mode"] == "ui"
    assert meta["scale"] == 4
    assert Path(meta["path"]).exists()
    assert meta["target_pixel"]["x"] >= 0
    assert meta["target_pixel"]["y"] >= 0
|
|
|
|
|
|
def test_enhance_supports_text_mode_and_scale_clamp(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Enhance honours region/mode, clamps scale to 6, and clamps the source coord."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_enhance(
        {
            "coordinate": {"x": -99, "y": 9999},
            "region": "medium",
            "mode": "text",
            "scale": 99,
        }
    )

    assert result["ok"] is True
    meta = result["meta"]
    assert meta["region"] == "medium"
    assert meta["mode"] == "text"
    assert meta["scale"] == 6
    assert meta["requested_coord"] == {"x": -99, "y": 9999}
    assert meta["source_coord"] == {"x": 0, "y": 719}
    assert Path(meta["path"]).exists()
|
|
|
|
|
|
def test_press_key_supports_hotkey_combo(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """"meta+r" is reported as "win+r" and sent as the ("win", "r") hotkey."""
    agent = _build_agent(tmp_path, monkeypatch)
    result = agent._tool_press_key({"key": "meta+r"})
    assert result["ok"] is True
    assert result["key"] == "win+r"
    assert result["message"] == "Key combo executed."
    assert agent_module.pyautogui.last_hotkey == ("win", "r")
|
|
|
|
|
|
def test_press_key_blocks_prohibited_combo(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """A combo listed in the prohibited set is blocked with an error and a hint."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.prohibited_key_combos = {"ctrl+shift+s"}
    agent.prohibited_key_combos = agent._normalize_prohibited_key_combos(agent.options.prohibited_key_combos)

    result = agent._tool_press_key({"key": "ctrl+shift+s"})

    assert result["ok"] is False
    assert result["blocked"] is True
    assert result["key"] == "ctrl+shift+s"
    assert "prohibited by runtime configuration" in result["error"]
    assert "another allowed route" in result["hint"]
|
|
|
|
|
|
def test_press_key_blocks_prohibited_combo_after_alias_normalization(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Prohibiting "meta+r" also blocks the equivalent "win+r" request."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.prohibited_key_combos = {"meta+r"}
    agent.prohibited_key_combos = agent._normalize_prohibited_key_combos(agent.options.prohibited_key_combos)

    result = agent._tool_press_key({"key": "win+r"})

    assert result["ok"] is False
    assert result["blocked"] is True
    assert result["key"] == "win+r"
|
|
|
|
|
|
def test_context_compaction_trigger_and_payload(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Compaction triggers at the decay step and emits the expected summary text."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open settings app"
    agent.previous_response_id = "resp_123"
    agent.step = 4
    agent.last_context_compact_step = 0
    agent.options.screen_context_decay_steps = 4
    agent.recent_tool_summaries = ["step=1 tool=see_screen status=ok"]
    agent.last_screen_data_url = "data:image/png;base64,abc"
    agent.last_screen_meta = {"width": 1280, "height": 720, "path": "C:/tmp/frame.png"}

    assert agent._should_compact_context() is True
    visual_message = agent._build_visual_message("Current screen", "data:image/png;base64,abc", agent.last_screen_meta)
    agent._register_visual_context_message(visual_message, agent.last_screen_meta, tool_name="see_screen")
    compacted = agent._build_compacted_pending_input("decay")
    assert len(compacted) == 2
    # All remaining checks target the text of the first compacted message.
    summary_text = compacted[0]["content"][0]["text"]
    assert "Context compaction activated due to stale context decay." in summary_text
    assert "Open settings app" in summary_text
    assert "Treat prior reasoning as stale" in summary_text
    assert "Retained visual observations:" in summary_text
    assert "do not call see_screen again only because compaction happened" in summary_text
    assert "observe -> decide -> act -> verify" in summary_text
|
|
|
|
|
|
def test_context_compaction_drops_function_call_outputs_from_rebased_input(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Carry-over ``function_call_output`` items are dropped; user messages are kept."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open settings app"
    visual_meta = {"path": "C:/tmp/frame.png"}
    visual_message = agent._build_visual_message("Current screen", "data:image/png;base64,abc", visual_meta)
    agent._register_visual_context_message(visual_message, visual_meta, tool_name="see_screen")

    compacted = agent._build_compacted_pending_input(
        "decay",
        carryover_items=[
            {"type": "function_call_output", "call_id": "call_123", "output": "{\"ok\": true}"},
            {"role": "user", "content": [{"type": "input_text", "text": "blocked hint"}]},
        ],
    )

    assert len(compacted) == 3
    assert compacted[1]["role"] == "user"
    assert compacted[1]["content"][0]["text"] == "blocked hint"
    assert all(item.get("type") != "function_call_output" for item in compacted)
|
|
|
|
|
|
def test_visual_context_budget_keeps_only_latest_three_images(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """With a budget of three, the frame with the oldest capture time is dropped."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.max_visual_context_images = 3

    # Registered deliberately out of chronological order.
    captured_times = [
        "2026-05-30T10:00:03+00:00",
        "2026-05-30T10:00:01+00:00",
        "2026-05-30T10:00:04+00:00",
        "2026-05-30T10:00:02+00:00",
    ]
    for idx, captured_at in enumerate(captured_times):
        meta = {"path": f"C:/tmp/frame_{idx}.png", "captured_at": captured_at}
        message = agent._build_visual_message(f"frame {idx}", f"data:image/png;base64,{idx}", meta)
        agent._register_visual_context_message(message, meta, tool_name="see_screen")

    assert agent.visual_context_overflow_pending is True
    # Survivors are expected oldest-first by capture time (:02, :03, :04).
    assert [entry["meta"]["path"] for entry in agent.visual_context_messages] == [
        "C:/tmp/frame_3.png",
        "C:/tmp/frame_0.png",
        "C:/tmp/frame_2.png",
    ]
|
|
|
|
|
|
def test_compacted_input_uses_latest_visuals_by_capture_time(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Compacted input carries the three newest frames, ordered by capture time."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.max_visual_context_images = 3
    agent.objective = "Verify the current app window"

    for idx, captured_at in enumerate(
        [
            "2026-05-30T10:00:04+00:00",
            "2026-05-30T10:00:01+00:00",
            "2026-05-30T10:00:03+00:00",
            "2026-05-30T10:00:02+00:00",
        ]
    ):
        meta = {"path": f"C:/tmp/frame_{idx}.png", "captured_at": captured_at}
        message = agent._build_visual_message(f"frame {idx}", f"data:image/png;base64,{idx}", meta)
        agent._register_visual_context_message(message, meta, tool_name="see_screen")

    compacted = agent._build_compacted_pending_input("visual_budget")
    # Keep only the messages that actually carry an image part.
    visual_messages = [
        item
        for item in compacted
        if isinstance(item.get("content"), list)
        and any(part.get("type") == "input_image" for part in item["content"] if isinstance(part, dict))
    ]

    assert len(visual_messages) == 3
    # The frame path is recovered from the JSON after "Metadata: " on the first text line.
    assert [
        json.loads(message["content"][0]["text"].split("Metadata: ", 1)[1].split("\n", 1)[0])["path"]
        for message in visual_messages
    ] == [
        "C:/tmp/frame_3.png",
        "C:/tmp/frame_2.png",
        "C:/tmp/frame_0.png",
    ]
|
|
|
|
|
|
def test_context_compaction_event_includes_visual_budget_reason_and_paths(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """The ``context_compacted`` event carries the rebuild reason and visual paths."""
    # Payloads are indexed below, so the event values are typed as Any.
    events: list[dict[str, Any]] = []
    agent = _build_agent(tmp_path, monkeypatch)
    agent.event_callback = events.append
    agent.step = 5
    agent.recent_tool_summaries = ["step=4 tool=enhance status=ok"]
    agent.visual_context_messages = [
        {"message": {"role": "user", "content": []}, "meta": {"path": "C:/tmp/1.png"}},
        {"message": {"role": "user", "content": []}, "meta": {"path": "C:/tmp/2.png"}},
        {"message": {"role": "user", "content": []}, "meta": {"path": "C:/tmp/3.png"}},
    ]

    agent._emit_context_compacted("visual_budget")

    assert events[-1]["event_type"] == "context_compacted"
    payload = events[-1]["payload"]
    assert payload["rebuild_reason"] == "visual_budget"
    assert payload["visual_context_paths"] == ["C:/tmp/1.png", "C:/tmp/2.png", "C:/tmp/3.png"]
|
|
|
|
|
|
def test_observation_loop_blocks_repeated_broad_reobservation(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """A fourth see_screen on an unchanged window is blocked as an observation loop."""
    agent = _build_agent(tmp_path, monkeypatch)
    # Three consecutive observe-only steps on the same "Save as" dialog.
    agent.step_history = [
        {
            "step": step,
            "tool_names": ["get_active_window", "see_screen"],
            "window_signature": "123|#32770|Save as",
            "window_summary": "Save as [#32770]",
            "had_visual": True,
        }
        for step in (21, 22, 23)
    ]

    blocked = agent._dispatch_tool("see_screen", {})

    assert blocked["ok"] is False
    assert blocked["blocked"] is True
    assert blocked["blocked_reason"] == "observation_loop"
    assert "unchanged foreground window" in blocked["error"]
    assert blocked["window_summary"] == "Save as [#32770]"
|
|
|
|
|
|
def test_repeated_ambiguous_action_requires_verification_and_then_blocks(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Repeating an identical ``type`` call needs verification and has a retry budget."""
    agent = _build_agent(tmp_path, monkeypatch)
    type_args = {"text": "repeat me"}

    first = agent._dispatch_tool("type", type_args)
    assert first["ok"] is True
    assert first["verification_required"] is True
    assert first["verification_channels"] == ["enhance", "get_active_window", "see_screen"]

    # Repeating without any verification in between is refused.
    blocked_without_verification = agent._dispatch_tool("type", type_args)
    assert blocked_without_verification["blocked"] is True
    assert "see_screen" in blocked_without_verification["error"]

    # Verified repeats are allowed until the budget is spent.
    assert agent._dispatch_tool("see_screen", {})["ok"] is True
    assert agent._dispatch_tool("type", type_args)["ok"] is True
    assert agent._dispatch_tool("see_screen", {})["ok"] is True
    assert agent._dispatch_tool("type", type_args)["ok"] is True
    assert agent._dispatch_tool("see_screen", {})["ok"] is True

    blocked_after_retry_budget = agent._dispatch_tool("type", type_args)
    assert blocked_after_retry_budget["blocked"] is True
    assert "3 time(s) on the same surface" in blocked_after_retry_budget["error"]

    # After one more observation the same action is accepted again.
    assert agent._dispatch_tool("see_screen", {})["ok"] is True
    reset_attempt = agent._dispatch_tool("type", type_args)
    assert reset_attempt["ok"] is True
|
|
|
|
|
|
def test_copy_shortcut_prefers_clipboard_verification(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """ctrl+c names clipboard_get as its verification channel before a repeat."""
    agent = _build_agent(tmp_path, monkeypatch)
    monkeypatch.setattr(
        agent,
        "_clipboard_get_metadata",
        lambda: {"has_text": True, "has_image": False, "available_formats": ["CF_UNICODETEXT"]},
    )
    monkeypatch.setattr(agent, "_clipboard_get_text", lambda: "copied")

    first = agent._dispatch_tool("press_key", {"key": "ctrl+c"})
    assert first["ok"] is True
    assert first["verification_channels"] == ["clipboard_get"]

    blocked = agent._dispatch_tool("press_key", {"key": "ctrl+c"})
    assert blocked["blocked"] is True
    assert "clipboard_get" in blocked["error"]

    observed = agent._dispatch_tool("clipboard_get", {})
    assert observed["ok"] is True
    assert observed["has_text"] is True

    second = agent._dispatch_tool("press_key", {"key": "ctrl+c"})
    assert second["ok"] is True
|
|
|
|
|
|
def test_execute_command_blocks_unrequested_recursive_file_search(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A recursive file search is blocked as out of scope for a save-only objective."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Save the current note in Notepad"

    result = agent._tool_execute_command({"command": "Get-ChildItem -Recurse -Filter *.txt"})

    assert result["ok"] is False
    assert result["blocked"] is True
    assert "out of scope" in result["error"]
|
|
|
|
|
|
def test_execute_command_allows_recursive_file_search_when_objective_requests_it(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """The same recursive search runs when the objective is to find a file."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Find the saved text file path"

    called: dict[str, Any] = {}

    class _FakeProcess:
        """Process stub that has already exited with code 0."""

        returncode = 0

        def poll(self) -> int:
            return 0

        def communicate(self, timeout: int = 2) -> tuple[str, str]:
            return ("ok", "")

    def fake_popen(*args: Any, **kwargs: Any) -> _FakeProcess:
        # Capture the command instead of spawning a real process.
        called["command"] = args[0]
        return _FakeProcess()

    monkeypatch.setattr(agent_module.subprocess, "Popen", fake_popen)

    result = agent._tool_execute_command({"command": "Get-ChildItem -Recurse -Filter *.txt"})

    assert result["ok"] is True
    assert called["command"] == "Get-ChildItem -Recurse -Filter *.txt"
|
|
|
|
|
|
def test_execute_command_launch_requires_focus_verification(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """A launch command is flagged as background and must be verified before a repeat."""
    agent = _build_agent(tmp_path, monkeypatch)
    called: dict[str, Any] = {}

    class _FakeProcess:
        """Process stub that has already exited with code 0 and no output."""

        returncode = 0

        def poll(self) -> int:
            return 0

        def communicate(self, timeout: int = 2) -> tuple[str, str]:
            return ("", "")

    def fake_popen(*args: Any, **kwargs: Any) -> _FakeProcess:
        # Capture the command instead of spawning a real process.
        called["command"] = args[0]
        return _FakeProcess()

    monkeypatch.setattr(agent_module.subprocess, "Popen", fake_popen)

    first = agent._dispatch_tool("execute_command", {"command": "start notepad"})

    assert first["ok"] is True
    assert first["background_launch_assumed"] is True
    assert first["focus_change_assumed"] is False
    assert first["verification_required"] is True
    assert first["verification_channels"] == ["get_active_window", "see_screen"]
    assert called["command"] == "start notepad"

    blocked = agent._dispatch_tool("execute_command", {"command": "start notepad"})
    assert blocked["blocked"] is True
    assert "get_active_window" in blocked["error"]

    # NOTE(review): get_active_window is not patched here; another test in this
    # file shows it is refused off Windows, so this likely needs a Windows host - confirm.
    observed = agent._dispatch_tool("get_active_window", {})
    assert observed["ok"] is True

    second = agent._dispatch_tool("execute_command", {"command": "start notepad"})
    assert second["ok"] is True
|
|
|
|
|
|
def test_system_prompt_emphasizes_situational_awareness() -> None:
    """SYSTEM_PROMPT contains the expected guidance phrases and tool names."""
    prompt = agent_module.SYSTEM_PROMPT

    assert "Maintain a live mental model" in prompt
    assert "classify -> choose control channel -> execute one meaningful transition -> verify" in prompt
    assert "First classify, then act." in prompt
    assert "Use see_screen at a balanced cadence" in prompt
    assert "get_active_window" in prompt
    assert "detect_dialog" in prompt
    assert "dialog_set_filename" in prompt
    assert "list_ui_elements" in prompt
    assert "clipboard_get" in prompt
    assert "Do not invent new subgoals" in prompt
    assert "verify-and-finish" in prompt
    assert "data.observed_result" in prompt
    assert "Treat command-launched apps or URLs as background" in prompt
    assert "#32770" in prompt
    assert "secure desktop" in prompt.lower()
|
|
|
|
|
|
def test_observation_loop_prompt_pushes_action_or_finish() -> None:
    """The observation-loop prompt names the window, the step count, and next moves."""
    prompt = agent_module.build_observation_loop_prompt("Save as [#32770]", repeated_steps=3)

    assert "same stable window for 3 step(s)" in prompt
    assert "Save as [#32770]" in prompt
    assert "Do not keep calling broad observation tools" in prompt
    assert "native window/dialog/element tool" in prompt
    assert "Use enhance only if a small or text-heavy control must be read before acting." in prompt
    assert "#32770 dialog" in prompt
|
|
|
|
|
|
def test_finish_likely_prompt_pushes_verification_then_completion() -> None:
    """The finish-likely prompt asks for one see_screen and then task_complete."""
    prompt = agent_module.build_finish_likely_prompt(
        'Save dialog closed and focus returned to "todo-demo.txt - Notepad". | Command verification confirms "todo-demo.txt" exists.',
        prohibited_key_combos={"ctrl+shift+s"},
    )

    assert "objective is likely already satisfied" in prompt
    assert "todo-demo.txt - Notepad" in prompt
    assert "call see_screen" in prompt
    assert "then call task_complete" in prompt
    assert "Do not reopen menus" in prompt
    assert "Prohibited key combos for this run: ctrl+shift+s." in prompt
|
|
|
|
|
|
def test_initial_action_prompt_reinforces_observation_and_verification() -> None:
    """The initial prompt embeds the job, the working loop, and the prohibited combos."""
    prompt = agent_module.build_initial_action_prompt("Open calculator", {"ctrl+shift+s"})

    assert "JOB: Open calculator" in prompt
    assert "First classify the current UI state from the latest evidence." in prompt
    assert "Identify what changed since the last action or screen capture." in prompt
    assert "classify -> choose control channel -> execute one meaningful transition -> verify" in prompt
    assert "Prefer native window/dialog/element tools" in prompt
    assert "get_active_window plus detect_dialog" in prompt
    assert "click then see_screen" in prompt
    assert "Do not invent new subgoals" in prompt
    assert "Prefer non-visual verification when available" in prompt
    assert "wait_for_focus_change" in prompt
    assert "#32770 dialogs" in prompt
    assert "Prohibited key combos for this run: ctrl+shift+s." in prompt
    assert "do not re-capture the screen just to reconfirm an obvious large input area" in prompt
    assert 'task_complete(return=..., data={"observed_result": ...})' in prompt
|
|
|
|
|
|
def test_no_tool_prompt_recovers_by_reobserving() -> None:
    """The no-tool prompt tells the model to re-observe and classify before acting."""
    prompt = agent_module.build_no_tool_prompt({"ctrl+shift+s"})

    assert "Recover by re-observing the current desktop state instead of guessing." in prompt
    assert "Start by classifying the surface." in prompt
    assert "get_active_window" in prompt
    assert "detect_dialog" in prompt
    assert "clipboard_get" in prompt
    assert "native window/dialog/element tools" in prompt
    assert "Do not assume execute_command launches changed the foreground window" in prompt
    assert "Prohibited key combos for this run: ctrl+shift+s." in prompt
    assert "If a modal, picker, or browser download/upload surface is likely" in prompt
|
|
|
|
|
|
def test_blocked_action_prompt_reanchors_on_screen_state() -> None:
    """The blocked-action prompt names the tool and pushes a fresh classification."""
    prompt = agent_module.build_blocked_action_prompt("click", prohibited_key_combos={"ctrl+shift+s"})

    assert "The last action using click was blocked or unreliable." in prompt
    assert "Do not retry blindly." in prompt
    assert "classify the current surface" in prompt
    assert "detect_dialog" in prompt
    assert "dialog_set_filename" in prompt
    assert "get_active_window" in prompt
    assert "get_cursor_position before move_mouse or drag" in prompt
    assert "wait_for_focus_change" in prompt
    assert "secure desktop or UAC" in prompt
    assert "Switch strategy after the fresh classification" in prompt
    assert "Prohibited key combos for this run: ctrl+shift+s." in prompt
    assert "native control instead of pixels" in prompt
|
|
|
|
|
|
def test_tool_schemas_include_completion_and_desktop_awareness_guidance(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Tool descriptions contain the expected guidance phrase for each tool."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.prohibited_key_combos = {"ctrl+shift+s"}
    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    assert "data.observed_result" in schemas["task_complete"]["description"]
    assert "before task_complete" in schemas["see_screen"]["description"]
    assert "text-heavy targets" in schemas["enhance"]["description"]
    assert "verify copy or cut results" in schemas["clipboard_get"]["description"]
    assert "pointer state matters" in schemas["get_cursor_position"]["description"]
    assert "verify focus and active app" in schemas["get_active_window"]["description"]
    assert "foreground focus" in schemas["execute_command"]["description"]
    assert "Prohibited for this run: ctrl+shift+s." in schemas["press_key"]["description"]
    assert "dialog classification" in schemas["get_active_window"]["description"]
    assert "visible top-level windows" in schemas["list_windows"]["description"]
    assert "#32770 or picker surface" in schemas["detect_dialog"]["description"]
    assert "filename or path field" in schemas["dialog_set_filename"]["description"]
    assert "native child controls" in schemas["list_ui_elements"]["description"]
|
|
|
|
|
|
def test_tool_schemas_hide_optional_native_tools_when_mode_off(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """With native automation "off", only get_active_window of the native tools remains."""
    agent = _build_agent(tmp_path, monkeypatch)
    agent.options.native_automation_mode = "off"

    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    assert "get_active_window" in schemas
    assert "list_windows" not in schemas
    assert "detect_dialog" not in schemas
    assert "list_ui_elements" not in schemas
|
|
|
|
|
|
def test_tool_schemas_hide_windows_only_tools_on_non_windows_host(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """On a "linux" platform the Windows-only tools are hidden and refused."""
    agent = _build_agent(tmp_path, monkeypatch)
    monkeypatch.setattr(agent_module.sys, "platform", "linux")

    schemas = {tool["name"]: tool for tool in agent._tool_schemas()}

    assert "get_active_window" not in schemas
    assert "list_windows" not in schemas
    assert "detect_dialog" not in schemas
    assert "list_ui_elements" not in schemas

    # Dispatching a hidden tool directly must fail with an explicit message.
    result = agent._dispatch_tool("get_active_window", {})

    assert result["ok"] is False
    assert result["error"] == "Tool 'get_active_window' is only available on Windows."
|
|
|
|
|
|
def test_list_windows_returns_structured_surface_metadata(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """list_windows reports count, surface/dialog kind, and a recommended next tool."""
    agent = _build_agent(tmp_path, monkeypatch)
    monkeypatch.setattr(
        agent,
        "_list_windows_info",
        lambda visible_only=True: [
            {
                "available": True,
                "hwnd": 111,
                "title": "Open",
                "class_name": "#32770",
                "executable_name": "notepad.exe",
                "surface_kind": "file_dialog",
                "dialog_kind": "file_open",
            }
        ],
    )
    monkeypatch.setattr(
        agent,
        "_get_active_window_info",
        lambda: {
            "available": True,
            "hwnd": 111,
            "title": "Open",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        },
    )

    result = agent._tool_list_windows({})

    assert result["ok"] is True
    assert result["count"] == 1
    assert result["surface_kind"] == "file_dialog"
    assert result["dialog_kind"] == "file_open"
    assert result["recommended_next_tools"][0] == "dialog_set_filename"
|
|
|
|
|
|
def test_detect_dialog_returns_buttons_and_target_handle(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """detect_dialog classifies a "Save as" dialog and lists its buttons."""
    agent = _build_agent(tmp_path, monkeypatch)
    monkeypatch.setattr(
        agent,
        "_find_dialog_info",
        lambda title_contains="": {
            "available": True,
            "hwnd": 222,
            "title": "Save as",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        },
    )
    monkeypatch.setattr(
        agent,
        "_get_active_window_info",
        lambda: {
            "available": True,
            "hwnd": 222,
            "title": "Save as",
            "class_name": "#32770",
            "executable_name": "notepad.exe",
        },
    )
    monkeypatch.setattr(
        agent,
        "_list_ui_elements_for_window",
        lambda hwnd, include_hidden=False: [
            {
                "handle": 10,
                "role": "button",
                "text": "Save",
                "target": {"type": "ui_element", "handle": 10, "window_handle": hwnd},
            }
        ],
    )

    result = agent._tool_detect_dialog({})

    assert result["ok"] is True
    assert result["dialog_kind"] == "file_save"
    assert result["target"]["type"] == "dialog"
    assert result["buttons"][0]["text"] == "Save"
|
|
|
|
|
|
def test_notepad_save_pattern_enters_finish_likely_mode(tmp_path: Path, monkeypatch) -> None:
    """Walk ``_update_finish_likely_from_tool`` through a Notepad save sequence.

    Step 24 feeds a ``get_active_window`` result whose title names the target
    file while the last observed window was the "Save as" dialog; this must
    yield two evidence items without activating finish-likely mode. Step 25
    feeds a successful ``execute_command`` result, after which the mode must
    be active and both event types must have been emitted.
    """
    events: list[dict[str, object]] = []

    def _emitted(event_type: str) -> bool:
        # Short-circuits on the first matching event.
        return any(event["event_type"] == event_type for event in events)

    agent = _build_agent(tmp_path, monkeypatch)
    agent.event_callback = events.append
    agent.objective = "Open Notepad, type a short to-do list, save it as todo-demo.txt in Documents"
    agent.finish_likely_state["target_filename"] = agent._infer_target_filename(agent.objective)
    agent.last_observed_window = {
        "available": True,
        "title": "Save as",
        "class_name": "#32770",
    }

    # Step 24: focus is reported on the Notepad window titled after the target.
    agent.step = 24
    notepad_window = {
        "available": True,
        "title": "todo-demo.txt - Notepad",
        "class_name": "Notepad",
    }
    window_result = agent._update_finish_likely_from_tool(
        "get_active_window",
        {},
        {"ok": True, "window": notepad_window},
    )

    assert agent.finish_likely_state["active"] is False
    evidence_kinds = [item["kind"] for item in window_result["completion_evidence"]]
    assert evidence_kinds == [
        "active_window_title_matches_target",
        "save_dialog_closed_to_target_window",
    ]

    # Step 25: a command result with exit code 0 that prints the file path.
    agent.last_visual_signature = "stable-post-save"
    agent.step = 25
    command_args = {"command": "powershell -NoProfile -Command \"Test-Path ... todo-demo.txt\""}
    command_output = {
        "ok": True,
        "exit_code": 0,
        "stdout": r"C:\Users\paulw\Documents\todo-demo.txt",
    }
    command_result = agent._update_finish_likely_from_tool(
        "execute_command",
        command_args,
        command_output,
    )

    assert agent.finish_likely_state["active"] is True
    assert agent.finish_likely_state["summary"]
    assert command_result["finish_likely"]["target_filename"] == "todo-demo.txt"
    assert _emitted("completion_evidence")
    assert _emitted("finish_likely")
|
|
|
|
|
|
def test_finish_likely_guard_blocks_reopening_menu_after_fresh_verification(tmp_path: Path, monkeypatch) -> None:
    """Check the finish-likely guard in ``_dispatch_tool``.

    With finish-likely mode pre-activated and no verification recorded, a
    ``see_screen`` call must succeed and mark fresh verification as done; a
    following ``press_key`` with ``alt+f`` must then be rejected with the
    ``finish_likely`` blocked reason.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.objective = "Open Notepad, type a short to-do list, save it as todo-demo.txt in Documents"

    # Seed state: mode active since step 24, verification still outstanding.
    seeded_state = {
        "active": True,
        "activated_at_step": 24,
        "target_filename": "todo-demo.txt",
        "summary": 'Save dialog closed and focus returned to "todo-demo.txt - Notepad". | Command verification confirms "todo-demo.txt" exists.',
        "fresh_verification_done": False,
        "verification_step": 0,
        "post_completion_visual_signature": "",
    }
    agent.finish_likely_state.update(seeded_state)

    agent.step = 25
    verify_result = agent._dispatch_tool("see_screen", {})
    assert verify_result["ok"] is True
    assert verify_result["finish_likely_verification_done"] is True
    assert agent.finish_likely_state["fresh_verification_done"] is True

    menu_attempt = {"key": "alt+f"}
    blocked = agent._dispatch_tool("press_key", menu_attempt)
    assert blocked["ok"] is False
    assert blocked["blocked"] is True
    assert blocked["blocked_reason"] == "finish_likely"
    assert "appears satisfied" in blocked["error"]
    assert "reopen menus" in blocked["hint"].lower()
|
|
|
|
|
|
def test_dispatch_rejects_unknown_and_disabled_tools(tmp_path: Path, monkeypatch) -> None:
    """Check the exact error payloads ``_dispatch_tool`` returns for rejected tools.

    One call uses a name that is not a tool at all; the other uses ``scroll``
    after it has been placed in ``disabled_tools``.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.disabled_tools = {"scroll"}

    unknown_error = {"ok": False, "error": "Unknown tool: unknown_tool"}
    disabled_error = {"ok": False, "error": "Tool 'scroll' is disabled for this job."}

    assert agent._dispatch_tool("unknown_tool", {}) == unknown_error
    assert agent._dispatch_tool("scroll", {}) == disabled_error
|
|
|
|
|
|
def test_tool_schemas_filter_disabled_tools(tmp_path: Path, monkeypatch) -> None:
    """Check that ``_tool_schemas`` omits disabled tools and keeps the others.

    ``scroll`` and ``clipboard_get`` are disabled; ``click`` and
    ``task_complete`` are not and must still be listed.
    """
    agent = _build_agent(tmp_path, monkeypatch)
    agent.disabled_tools = {"scroll", "clipboard_get"}

    tool_names = {tool["name"] for tool in agent._tool_schemas()}

    for hidden_name in ("scroll", "clipboard_get"):
        assert hidden_name not in tool_names
    for exposed_name in ("click", "task_complete"):
        assert exposed_name in tool_names
|
|
|
|
|
|
def test_normalize_disabled_tools_rejects_invalid_and_required_names() -> None:
    """Check that ``normalize_disabled_tools`` raises ``ValueError`` for bad input.

    Two inputs are tried in turn: a name that is not a tool, and
    ``task_complete``; each must fail with its own error message.
    """
    rejected_cases = (
        (["not_a_real_tool"], "Unknown disabled tool"),
        (["task_complete"], "Cannot disable required tool"),
    )
    for tool_names, expected_message in rejected_cases:
        with pytest.raises(ValueError, match=expected_message):
            agent_module.normalize_disabled_tools(tool_names)
|