From 84b0df520c65f7b10074d1bee7856cb43105c8b7 Mon Sep 17 00:00:00 2001 From: Space-Banane Date: Wed, 27 May 2026 17:31:49 +0200 Subject: [PATCH] chore: initialize screenjob project baseline --- .gitignore | 22 ++ README.md | 94 ++++++++ main.py | 5 + screenjob.py | 12 + src/__init__.py | 1 + src/agent.py | 625 ++++++++++++++++++++++++++++++++++++++++++++++++ src/cli.py | 151 ++++++++++++ src/models.py | 24 ++ src/utils.py | 111 +++++++++ 9 files changed, 1045 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 main.py create mode 100644 screenjob.py create mode 100644 src/__init__.py create mode 100644 src/agent.py create mode 100644 src/cli.py create mode 100644 src/models.py create mode 100644 src/utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d2c46fa --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd + +# Virtual envs +.venv/ +venv/ +env/ + +# Secrets +.env + +# Runtime artifacts +screenjob_runs/ +result.json + +# IDE +.vscode/ +.idea/ + diff --git a/README.md b/README.md new file mode 100644 index 0000000..b98ea69 --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# ScreenJob + +Single-file behavior, split into maintainable modules under `src/`. 
+ +## Entry point + +- Primary: `python main.py "<job>"` +- Backward compatible: `python screenjob.py "<job>"` + +## Install + +```powershell +pip install openai pillow pyautogui python-dotenv +``` + +## Configure + +Create a `.env` file in project root: + +```env +OPENAI_API_KEY=your_key_here +``` + +## Usage + +```powershell +python main.py "Open amazon.de and go to my orders" +``` + +Optional flags: + +```powershell +python main.py "Open amazon.de" --model gpt-5.2 --max-steps 80 +``` + +## Tools exposed to the model + +- `execute_command(command)` +- `sleep(seconds)` (replaces shell-based sleep calls) +- `see_screen()` +- `enhance(coordinate)` +- `click(coordinate, offset_up/down/left/right, sleep_after_seconds)` +- `type(text)` +- `press_key(key, repeats=1)` +- `task_complete(result)` + +### Offset examples + +- `{"coordinate":{"x":1000,"y":500},"offset_up":"2px"}` +- `{"coordinate":{"x":1000,"y":500},"offset_right":4}` + +### Multi-tool calls in one step + +The agent supports multiple tool calls in a single model response and executes them in order. +Example sequence in one step: + +1. `click(...)` +2. `sleep({"seconds": 1.5})` + +You can also use `click(..., sleep_after_seconds=1.5)` for a one-call variant. + +## Output + +Each run creates: + +- `screenjob_runs/run_YYYYMMDD_HHMMSS/logs/screenjob.log` +- `screenjob_runs/run_YYYYMMDD_HHMMSS/screens/*.png` +- `screenjob_runs/run_YYYYMMDD_HHMMSS/enhanced/*.png` + +Final stdout is JSON: + +```json +{ + "completed": true, + "result": "...", + "steps": 13, + "elapsed_seconds": 59.691, + "artifacts_dir": "C:\\...\\screenjob_runs\\run_..." 
+} +``` + +## Project layout + +```text +main.py +screenjob.py +src/ + __init__.py + cli.py + agent.py + models.py + utils.py +``` + diff --git a/main.py b/main.py new file mode 100644 index 0000000..69c5bfc --- /dev/null +++ b/main.py @@ -0,0 +1,5 @@ +from src.cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/screenjob.py b/screenjob.py new file mode 100644 index 0000000..748ade4 --- /dev/null +++ b/screenjob.py @@ -0,0 +1,12 @@ +""" +Compatibility wrapper. + +Preferred entry point: + python main.py "" +""" + +from src.cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..6e6874d --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +# Root package marker for local imports like: from src.cli import main diff --git a/src/agent.py b/src/agent.py new file mode 100644 index 0000000..28694da --- /dev/null +++ b/src/agent.py @@ -0,0 +1,625 @@ +from __future__ import annotations + +import json +import logging +import subprocess +import time +import traceback +from typing import Any + +from openai import OpenAI +from PIL import Image, ImageEnhance, ImageFilter, ImageOps + +from .models import AgentResult, RunArtifacts +from .utils import clamp, draw_global_grid, image_to_data_url, utc_now_iso + +try: + import pyautogui +except Exception as import_exc: + raise RuntimeError( + "pyautogui is required. Install dependencies with: pip install pyautogui pillow" + ) from import_exc + + +SYSTEM_PROMPT = """ +You are ScreenJob, an autonomous desktop-and-terminal task executor. + +Rules: +1) Use tools to act. Do not claim actions without tool calls. +2) Prefer execute_command for deterministic actions: + - opening URLs/websites (Windows: start https://amazon.de) + - launching apps or running terminal checks +3) For UI tasks, inspect with see_screen before clicking/typing. +4) Coordinates are absolute screen pixels (x, y) from top-left. 
+5) Use enhance(coordinate) when text/UI is unclear. +6) For keyboard-heavy interactions, prefer press_key for special keys. +7) You may call multiple tools in one step. If needed, do click then sleep. +8) Never spam repeated clicks on the same coordinate; switch strategy. +9) Keep tool arguments valid JSON and concise. +10) When objective is fully complete, call task_complete(result="..."). +""" + + +class ScreenJobAgent: + def __init__( + self, + client: OpenAI, + logger: logging.Logger, + artifacts: RunArtifacts, + model: str, + max_steps: int, + command_timeout: int, + type_interval: float, + click_pause: float, + ) -> None: + self.client = client + self.logger = logger + self.artifacts = artifacts + self.model = model + self.max_steps = max_steps + self.command_timeout = command_timeout + self.type_interval = type_interval + self.click_pause = click_pause + + self.step = 0 + self.completed = False + self.final_result = "" + self.previous_response_id: str | None = None + + self.last_screen_data_url: str | None = None + self.last_screen_meta: dict[str, Any] | None = None + self.click_history: list[tuple[int, int, float]] = [] + + def _tool_schemas(self) -> list[dict[str, Any]]: + return [ + { + "type": "function", + "name": "task_complete", + "description": "Call this when the job objective is fully done.", + "parameters": { + "type": "object", + "properties": { + "result": {"type": "string"}, + }, + "required": ["result"], + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "execute_command", + "description": ( + "Run a shell command and return stdout/stderr/exit code. " + "Prefer this for deterministic operations like opening URLs." + ), + "parameters": { + "type": "object", + "properties": { + "command": {"type": "string"}, + }, + "required": ["command"], + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "sleep", + "description": ( + "Pause execution for a short time. 
" + "Use this instead of shell sleep commands." + ), + "parameters": { + "type": "object", + "properties": { + "seconds": {"type": ["number", "string"]}, + }, + "required": ["seconds"], + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "see_screen", + "description": "Capture full screen with coordinate grid overlay.", + "parameters": { + "type": "object", + "properties": {}, + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "enhance", + "description": "Create enhanced zoom around a coordinate for readability.", + "parameters": { + "type": "object", + "properties": { + "coordinate": { + "type": "object", + "properties": { + "x": {"type": "integer"}, + "y": {"type": "integer"}, + }, + "required": ["x", "y"], + "additionalProperties": False, + } + }, + "required": ["coordinate"], + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "type", + "description": "Type literal text into the active focused element.", + "parameters": { + "type": "object", + "properties": {"text": {"type": "string"}}, + "required": ["text"], + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "press_key", + "description": "Press a specific key (enter, tab, esc, arrows, etc).", + "parameters": { + "type": "object", + "properties": { + "key": {"type": "string"}, + "repeats": {"type": "integer", "minimum": 1}, + }, + "required": ["key"], + "additionalProperties": False, + }, + }, + { + "type": "function", + "name": "click", + "description": ( + "Click absolute screen coordinate with simple directional offsets. " + "Use offset_up/down/left/right values like 2 or '2px'. " + "Optional sleep_after_seconds performs a pause immediately after click." 
+ ), + "parameters": { + "type": "object", + "properties": { + "coordinate": { + "type": "object", + "properties": { + "x": {"type": "integer"}, + "y": {"type": "integer"}, + }, + "required": ["x", "y"], + "additionalProperties": False, + }, + "offset": { + "type": "object", + "properties": { + "x": {"type": "integer"}, + "y": {"type": "integer"}, + }, + "required": [], + "additionalProperties": False, + }, + "offset_up": {"type": ["integer", "string"]}, + "offset_down": {"type": ["integer", "string"]}, + "offset_left": {"type": ["integer", "string"]}, + "offset_right": {"type": ["integer", "string"]}, + "sleep_after_seconds": {"type": ["number", "string"]}, + }, + "required": ["coordinate"], + "additionalProperties": False, + }, + }, + ] + + def _capture_screen(self, with_grid: bool = True) -> tuple[Image.Image, dict[str, Any]]: + screenshot = pyautogui.screenshot().convert("RGB") + width, height = screenshot.size + image = draw_global_grid(screenshot) if with_grid else screenshot + meta = { + "width": width, + "height": height, + "captured_at": utc_now_iso(), + "grid": bool(with_grid), + } + return image, meta + + def _save_image(self, image: Image.Image, path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + image.save(path, format="PNG") + + def _build_visual_message(self, title: str, data_url: str, meta: dict[str, Any]) -> dict[str, Any]: + text = ( + f"{title}\n" + f"Metadata: {json.dumps(meta, ensure_ascii=False)}\n" + "Use coordinates from this image for click/enhance actions." 
+ ) + return { + "role": "user", + "content": [ + {"type": "input_text", "text": text}, + {"type": "input_image", "image_url": data_url, "detail": "high"}, + ], + } + + def _parse_px(self, value: Any) -> int: + if value is None: + return 0 + if isinstance(value, bool): + return int(value) + if isinstance(value, int): + return value + if isinstance(value, float): + return int(round(value)) + text = str(value).strip().lower() + if text.endswith("px"): + text = text[:-2].strip() + try: + return int(float(text)) + except Exception: # noqa: BLE001 + return 0 + + def _parse_seconds(self, value: Any, default: float = 0.0, max_seconds: float = 60.0) -> float: + if value is None: + return default + if isinstance(value, (int, float)): + sec = float(value) + else: + text = str(value).strip().lower() + if text.endswith("ms"): + try: + sec = float(text[:-2].strip()) / 1000.0 + except Exception: # noqa: BLE001 + sec = default + else: + if text.endswith("s"): + text = text[:-1].strip() + try: + sec = float(text) + except Exception: # noqa: BLE001 + sec = default + if sec < 0: + sec = 0.0 + if sec > max_seconds: + sec = max_seconds + return sec + + def _tool_see_screen(self, _: dict[str, Any]) -> dict[str, Any]: + image, meta = self._capture_screen(with_grid=True) + out_path = self.artifacts.shots_dir / f"screen_step_{self.step:03d}.png" + self._save_image(image, out_path) + data_url = image_to_data_url(image, "PNG") + + self.last_screen_data_url = data_url + self.last_screen_meta = meta | {"path": str(out_path.resolve())} + return { + "ok": True, + "path": str(out_path.resolve()), + "meta": self.last_screen_meta, + "message": "Screen captured with coordinate grid.", + } + + def _tool_enhance(self, args: dict[str, Any]) -> dict[str, Any]: + coord = args.get("coordinate") or {} + x = int(coord.get("x", 0)) + y = int(coord.get("y", 0)) + base, base_meta = self._capture_screen(with_grid=False) + width, height = base.size + + region_half = 180 + left = clamp(x - region_half, 0, width 
- 1) + top = clamp(y - region_half, 0, height - 1) + right = clamp(x + region_half, left + 1, width) + bottom = clamp(y + region_half, top + 1, height) + + crop = base.crop((left, top, right, bottom)) + upscaled = crop.resize((crop.width * 2, crop.height * 2), Image.Resampling.BICUBIC) + enhanced = ImageOps.autocontrast(upscaled) + enhanced = ImageEnhance.Sharpness(enhanced).enhance(2.0) + enhanced = ImageEnhance.Contrast(enhanced).enhance(1.25) + enhanced = enhanced.filter(ImageFilter.UnsharpMask(radius=1.8, percent=180, threshold=2)) + + out_path = self.artifacts.enhance_dir / f"enhance_step_{self.step:03d}_{x}_{y}.png" + self._save_image(enhanced, out_path) + data_url = image_to_data_url(enhanced, "PNG") + + meta = { + "captured_at": utc_now_iso(), + "source_coord": {"x": x, "y": y}, + "source_box": {"left": left, "top": top, "right": right, "bottom": bottom}, + "scale": 2, + "path": str(out_path.resolve()), + "screen_size": {"width": width, "height": height}, + "base_capture_meta": base_meta, + } + self.last_screen_data_url = data_url + self.last_screen_meta = meta + return {"ok": True, "meta": meta, "message": "Enhanced view generated."} + + def _tool_click(self, args: dict[str, Any]) -> dict[str, Any]: + coord = args.get("coordinate") or {} + offset = args.get("offset") or {} + base_x = int(coord.get("x", 0)) + base_y = int(coord.get("y", 0)) + + legacy_dx = self._parse_px(offset.get("x", 0)) + legacy_dy = self._parse_px(offset.get("y", 0)) + up = self._parse_px(args.get("offset_up", 0)) + down = self._parse_px(args.get("offset_down", 0)) + left = self._parse_px(args.get("offset_left", 0)) + right = self._parse_px(args.get("offset_right", 0)) + + x = base_x + legacy_dx + right - left + y = base_y + legacy_dy + down - up + width, height = pyautogui.size() + x = clamp(x, 0, max(0, width - 1)) + y = clamp(y, 0, max(0, height - 1)) + + now = time.time() + self.click_history.append((x, y, now)) + self.click_history = self.click_history[-20:] + very_recent = [(cx, 
cy, ts) for (cx, cy, ts) in self.click_history if now - ts <= 25] + near_same = [ + (cx, cy, ts) + for (cx, cy, ts) in very_recent + if abs(cx - x) <= 6 and abs(cy - y) <= 6 + ] + if len(near_same) >= 4: + return { + "ok": False, + "blocked": True, + "error": ( + "Repeated click loop detected at nearly same coordinate. " + "Switch strategy: call see_screen/enhance and use execute_command." + ), + "clicked": {"x": x, "y": y}, + "recent_similar_clicks": len(near_same), + } + + pyautogui.moveTo(x, y, duration=self.click_pause) + pyautogui.click(x=x, y=y) + sleep_after = self._parse_seconds(args.get("sleep_after_seconds", 0), default=0.0, max_seconds=30.0) + if sleep_after > 0: + time.sleep(sleep_after) + else: + time.sleep(0.15) + + return { + "ok": True, + "clicked": {"x": x, "y": y}, + "base_coordinate": {"x": base_x, "y": base_y}, + "applied_offset": { + "legacy": {"x": legacy_dx, "y": legacy_dy}, + "directional": {"up": up, "down": down, "left": left, "right": right}, + }, + "sleep_after_seconds": sleep_after, + "screen_size": {"width": width, "height": height}, + "message": "Click executed.", + } + + def _tool_type(self, args: dict[str, Any]) -> dict[str, Any]: + text = str(args.get("text", "")) + pyautogui.write(text, interval=self.type_interval) + return {"ok": True, "typed_length": len(text), "message": "Text typed."} + + def _tool_press_key(self, args: dict[str, Any]) -> dict[str, Any]: + key = str(args.get("key", "")).strip().lower() + repeats = max(1, int(args.get("repeats", 1))) + if not key: + return {"ok": False, "error": "Missing key."} + repeats = min(repeats, 50) + for _ in range(repeats): + pyautogui.press(key) + time.sleep(0.03) + return {"ok": True, "key": key, "repeats": repeats, "message": "Key press executed."} + + def _tool_sleep(self, args: dict[str, Any]) -> dict[str, Any]: + seconds = self._parse_seconds(args.get("seconds"), default=0.0, max_seconds=60.0) + time.sleep(seconds) + return {"ok": True, "slept_seconds": seconds, "message": "Sleep 
completed."} + + def _tool_execute_command(self, args: dict[str, Any]) -> dict[str, Any]: + command = str(args.get("command", "")).strip() + if not command: + return {"ok": False, "error": "Empty command."} + + started = time.time() + try: + completed = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + timeout=self.command_timeout, + check=False, + ) + elapsed_ms = int((time.time() - started) * 1000) + return { + "ok": True, + "command": command, + "exit_code": completed.returncode, + "stdout": completed.stdout[-12000:], + "stderr": completed.stderr[-12000:], + "elapsed_ms": elapsed_ms, + } + except subprocess.TimeoutExpired as exc: + elapsed_ms = int((time.time() - started) * 1000) + return { + "ok": False, + "command": command, + "error": "Command timed out.", + "elapsed_ms": elapsed_ms, + "timeout_seconds": self.command_timeout, + "stdout": (exc.stdout or "")[-12000:], + "stderr": (exc.stderr or "")[-12000:], + } + except Exception as exc: # noqa: BLE001 + return {"ok": False, "command": command, "error": f"{type(exc).__name__}: {exc}"} + + def _tool_task_complete(self, args: dict[str, Any]) -> dict[str, Any]: + result = str(args.get("result", "")).strip() or "Task completed." 
+ self.completed = True + self.final_result = result + return {"ok": True, "result": result} + + def _dispatch_tool(self, name: str, args: dict[str, Any]) -> dict[str, Any]: + handlers = { + "see_screen": self._tool_see_screen, + "enhance": self._tool_enhance, + "click": self._tool_click, + "type": self._tool_type, + "press_key": self._tool_press_key, + "sleep": self._tool_sleep, + "execute_command": self._tool_execute_command, + "task_complete": self._tool_task_complete, + } + handler = handlers.get(name) + if handler is None: + return {"ok": False, "error": f"Unknown tool: {name}"} + return handler(args) + + def _safe_parse_args(self, raw: str | None) -> dict[str, Any]: + if not raw: + return {} + try: + parsed = json.loads(raw) + return parsed if isinstance(parsed, dict) else {"value": parsed} + except Exception: # noqa: BLE001 + return {"_raw": raw} + + def _call_model(self, input_items: list[dict[str, Any]]) -> Any: + return self.client.responses.create( + model=self.model, + instructions=SYSTEM_PROMPT, + tools=self._tool_schemas(), + input=input_items, + previous_response_id=self.previous_response_id, + parallel_tool_calls=True, + max_tool_calls=8, + ) + + def run(self, job: str) -> AgentResult: + started_at = time.time() + self.logger.info("Starting run_id=%s model=%s", self.artifacts.run_id, self.model) + self.logger.info("Job: %s", job) + + self._tool_see_screen({}) + init_input: list[dict[str, Any]] = [ + { + "role": "user", + "content": [ + { + "type": "input_text", + "text": ( + f"JOB: {job}\n" + "You are in an action loop. Prefer execute_command for deterministic actions. " + "You can return multiple tool calls in one step (example: click then sleep). " + "Call task_complete(result=...) only when truly done." 
+ ), + } + ], + } + ] + if self.last_screen_data_url and self.last_screen_meta: + init_input.append( + self._build_visual_message("Initial screen capture", self.last_screen_data_url, self.last_screen_meta) + ) + + pending_input = init_input + + while self.step < self.max_steps and not self.completed: + self.step += 1 + self.logger.info("---- Agent step %d/%d ----", self.step, self.max_steps) + try: + response = self._call_model(pending_input) + except Exception as exc: # noqa: BLE001 + self.logger.exception("OpenAI API call failed on step %d", self.step) + raise RuntimeError(f"OpenAI API call failed: {type(exc).__name__}: {exc}") from exc + + self.previous_response_id = response.id + output_items = list(response.output or []) + text_preview = getattr(response, "output_text", "") or "" + if text_preview.strip(): + self.logger.info("Model text: %s", text_preview.strip()[:500]) + + tool_calls = [item for item in output_items if getattr(item, "type", None) == "function_call"] + if not tool_calls: + self.logger.warning("No tool calls returned; nudging model to continue with tools.") + pending_input = [ + { + "role": "user", + "content": [ + { + "type": "input_text", + "text": ( + "No function call was returned. Continue by using tools. " + "You may call multiple tools in one step. " + "When complete, call task_complete(result=...)." 
+ ), + } + ], + } + ] + continue + + next_input: list[dict[str, Any]] = [] + for tool_call in tool_calls: + name = str(getattr(tool_call, "name", "")) + call_id = str(getattr(tool_call, "call_id", "")) + args_raw = getattr(tool_call, "arguments", "{}") + args = self._safe_parse_args(args_raw) + + self.logger.info("Tool call: %s args=%s", name, json.dumps(args, ensure_ascii=False)) + try: + result = self._dispatch_tool(name, args) + except Exception as exc: # noqa: BLE001 + self.logger.exception("Tool execution failed: %s", name) + result = { + "ok": False, + "error": f"{type(exc).__name__}: {exc}", + "traceback": traceback.format_exc()[-8000:], + } + + self.logger.debug( + "Tool result for %s: %s", + name, + json.dumps(result, ensure_ascii=False)[:2500], + ) + next_input.append( + { + "type": "function_call_output", + "call_id": call_id, + "output": json.dumps(result, ensure_ascii=False), + } + ) + + if name in ("see_screen", "enhance") and self.last_screen_data_url and self.last_screen_meta: + title = "Updated screen capture" if name == "see_screen" else "Enhanced screen region" + next_input.append( + self._build_visual_message(title, self.last_screen_data_url, self.last_screen_meta) + ) + + pending_input = next_input + + ended_at = time.time() + if self.completed: + self.logger.info("Task completed in %d step(s).", self.step) + return AgentResult( + completed=True, + result=self.final_result, + steps=self.step, + started_at=started_at, + ended_at=ended_at, + ) + + self.logger.warning("Stopped due to step limit (%d).", self.max_steps) + return AgentResult( + completed=False, + result=f"Stopped after max steps ({self.max_steps}) without task_complete.", + steps=self.step, + started_at=started_at, + ended_at=ended_at, + ) + diff --git a/src/cli.py b/src/cli.py new file mode 100644 index 0000000..4d4cec5 --- /dev/null +++ b/src/cli.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path 
+ +from dotenv import load_dotenv +from openai import OpenAI + +from .agent import ScreenJobAgent +from .utils import setup_artifacts, setup_logger + +try: + import pyautogui +except Exception as import_exc: + raise RuntimeError( + "pyautogui is required. Install dependencies with: pip install pyautogui pillow" + ) from import_exc + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Run an autonomous desktop task agent using OpenAI + UI tools.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Examples:\n" + ' python main.py "Open amazon.de"\n' + ' python main.py "Open amazon.de and search for mechanical keyboard" --max-steps 80\n\n' + "Artifacts:\n" + " Each run stores logs/screens in ./screenjob_runs/run_YYYYMMDD_HHMMSS/" + ), + ) + parser.add_argument("job", type=str, help="Task objective for the agent.") + parser.add_argument("--model", type=str, default="gpt-5.2", help="OpenAI model name.") + parser.add_argument("--max-steps", type=int, default=60, help="Max tool-iteration steps.") + parser.add_argument( + "--command-timeout", + type=int, + default=45, + help="Timeout (seconds) for execute_command tool.", + ) + parser.add_argument( + "--type-interval", + type=float, + default=0.02, + help="Seconds between typed characters.", + ) + parser.add_argument( + "--click-pause", + type=float, + default=0.10, + help="Mouse move duration before click (seconds).", + ) + parser.add_argument( + "--no-failsafe", + action="store_true", + help="Disable PyAutoGUI fail-safe. 
Not recommended.", + ) + return parser + + +def main() -> int: + load_dotenv() + parser = build_parser() + args = parser.parse_args() + + api_key = os.getenv("OPENAI_API_KEY", "").strip() + if not api_key: + print("ERROR: Missing OPENAI_API_KEY (expected in environment or .env).", file=sys.stderr) + return 2 + + pyautogui.FAILSAFE = not args.no_failsafe + pyautogui.PAUSE = 0.05 + + runs_base = Path.cwd() / "screenjob_runs" + artifacts = setup_artifacts(runs_base) + logger = setup_logger(artifacts.log_file, verbose=True) + + logger.info("ScreenJob booting. Artifacts: %s", str(artifacts.root_dir.resolve())) + logger.info("PyAutoGUI FAILSAFE=%s", pyautogui.FAILSAFE) + + try: + client = OpenAI(api_key=api_key) + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to create OpenAI client.") + print(f"ERROR: Could not initialize OpenAI client: {exc}", file=sys.stderr) + return 2 + + agent = ScreenJobAgent( + client=client, + logger=logger, + artifacts=artifacts, + model=args.model, + max_steps=args.max_steps, + command_timeout=args.command_timeout, + type_interval=args.type_interval, + click_pause=args.click_pause, + ) + + try: + result = agent.run(args.job) + elapsed = result.ended_at - result.started_at + logger.info("Run finished. 
completed=%s elapsed=%.2fs", result.completed, elapsed) + print( + json.dumps( + { + "completed": result.completed, + "result": result.result, + "steps": result.steps, + "elapsed_seconds": round(elapsed, 3), + "artifacts_dir": str(artifacts.root_dir.resolve()), + }, + ensure_ascii=False, + indent=2, + ) + ) + return 0 if result.completed else 1 + except KeyboardInterrupt: + logger.warning("Interrupted by user.") + print( + json.dumps( + { + "completed": False, + "result": "Interrupted by user.", + "steps": agent.step, + "artifacts_dir": str(artifacts.root_dir.resolve()), + }, + ensure_ascii=False, + indent=2, + ) + ) + return 130 + except Exception as exc: # noqa: BLE001 + logger.exception("Fatal runtime error.") + print( + json.dumps( + { + "completed": False, + "result": f"Fatal error: {type(exc).__name__}: {exc}", + "steps": agent.step, + "artifacts_dir": str(artifacts.root_dir.resolve()), + }, + ensure_ascii=False, + indent=2, + ), + file=sys.stderr, + ) + return 1 + diff --git a/src/models.py b/src/models.py new file mode 100644 index 0000000..ade7587 --- /dev/null +++ b/src/models.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class RunArtifacts: + run_id: str + root_dir: Path + logs_dir: Path + shots_dir: Path + enhance_dir: Path + log_file: Path + + +@dataclass +class AgentResult: + completed: bool + result: str + steps: int + started_at: float + ended_at: float + diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..b872e83 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import base64 +import io +import logging +import sys +from datetime import datetime, timezone +from pathlib import Path + +from PIL import Image, ImageDraw + +from .models import RunArtifacts + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def ensure_dir(path: Path) -> None: + path.mkdir(parents=True, 
exist_ok=True) + + +def clamp(value: int, minimum: int, maximum: int) -> int: + return max(minimum, min(maximum, value)) + + +def image_to_data_url(image: Image.Image, fmt: str = "PNG") -> str: + buf = io.BytesIO() + image.save(buf, format=fmt) + encoded = base64.b64encode(buf.getvalue()).decode("ascii") + mime = "image/png" if fmt.upper() == "PNG" else "image/jpeg" + return f"data:{mime};base64,{encoded}" + + +def draw_global_grid(image: Image.Image, step: int = 100) -> Image.Image: + canvas = image.convert("RGB").copy() + draw = ImageDraw.Draw(canvas) + width, height = canvas.size + + grid_color = (30, 200, 255) + minor_color = (180, 220, 240) + text_bg = (0, 0, 0) + text_fg = (255, 255, 255) + + draw.rectangle([0, 0, width - 1, height - 1], outline=(255, 80, 80), width=2) + + for x in range(0, width, step): + color = grid_color if x % (step * 5) == 0 else minor_color + draw.line([(x, 0), (x, height)], fill=color, width=1) + label = f"x={x}" + draw.rectangle([x + 2, 2, x + 58, 18], fill=text_bg) + draw.text((x + 4, 4), label, fill=text_fg) + + for y in range(0, height, step): + color = grid_color if y % (step * 5) == 0 else minor_color + draw.line([(0, y), (width, y)], fill=color, width=1) + label = f"y={y}" + draw.rectangle([2, y + 2, 58, y + 18], fill=text_bg) + draw.text((4, y + 4), label, fill=text_fg) + + draw.rectangle([5, 5, 520, 35], fill=text_bg) + draw.text( + (10, 12), + "Coordinate system: origin at top-left, values in pixels", + fill=text_fg, + ) + return canvas + + +def setup_artifacts(base_dir: Path) -> RunArtifacts: + run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + root = base_dir / f"run_{run_id}" + logs_dir = root / "logs" + shots_dir = root / "screens" + enhance_dir = root / "enhanced" + for path in (root, logs_dir, shots_dir, enhance_dir): + ensure_dir(path) + return RunArtifacts( + run_id=run_id, + root_dir=root, + logs_dir=logs_dir, + shots_dir=shots_dir, + enhance_dir=enhance_dir, + log_file=logs_dir / "screenjob.log", + ) + + +def 
def setup_logger(log_file: Path, verbose: bool = True) -> logging.Logger:
    """Configure and return the shared ``screenjob`` logger.

    The logger always records DEBUG and above to ``log_file`` (UTF-8) and
    mirrors messages to stdout at INFO when ``verbose`` is true, otherwise
    at WARNING.

    Calling this function again reconfigures the same named logger: handlers
    installed by an earlier call are closed and removed before the new ones
    are attached, so repeated calls neither duplicate output nor leak the
    previously opened log file.

    Args:
        log_file: Destination of the detailed log. Missing parent
            directories are created.
        verbose: Selects the stdout threshold (INFO if true, WARNING if not).

    Returns:
        The configured ``logging.Logger`` named ``"screenjob"``.

    Raises:
        OSError: If the log directory or file cannot be created or opened.
    """
    logger = logging.getLogger("screenjob")
    logger.setLevel(logging.DEBUG)

    # Detach *and close* handlers left by a previous call. Merely clearing
    # the list would keep the old FileHandler's file descriptor open (and,
    # on Windows, keep the old log file locked). Closing a StreamHandler
    # does not close the underlying sys.stdout stream.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    stream_level = logging.INFO if verbose else logging.WARNING
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(stream_level)
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s")
    )

    # FileHandler opens the file eagerly and fails if the directory is absent.
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s | %(levelname)-8s | %(name)s | %(filename)s:%(lineno)d | %(message)s"
        )
    )

    logger.addHandler(stream_handler)
    logger.addHandler(file_handler)
    return logger