chore: initialize screenjob project baseline

This commit is contained in:
Space-Banane
2026-05-27 17:31:49 +02:00
commit 84b0df520c
9 changed files with 1045 additions and 0 deletions

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Root package marker for local imports like: from src.cli import main

625
src/agent.py Normal file
View File

@@ -0,0 +1,625 @@
from __future__ import annotations
import json
import logging
import subprocess
import time
import traceback
from typing import Any
from openai import OpenAI
from PIL import Image, ImageEnhance, ImageFilter, ImageOps
from .models import AgentResult, RunArtifacts
from .utils import clamp, draw_global_grid, image_to_data_url, utc_now_iso
try:
import pyautogui
except Exception as import_exc:
raise RuntimeError(
"pyautogui is required. Install dependencies with: pip install pyautogui pillow"
) from import_exc
# System instructions sent to the model on every request: ScreenJobAgent._call_model
# passes this string as the `instructions` argument. The numbered rules refer to
# the tool names declared in ScreenJobAgent._tool_schemas.
SYSTEM_PROMPT = """
You are ScreenJob, an autonomous desktop-and-terminal task executor.
Rules:
1) Use tools to act. Do not claim actions without tool calls.
2) Prefer execute_command for deterministic actions:
- opening URLs/websites (Windows: start https://amazon.de)
- launching apps or running terminal checks
3) For UI tasks, inspect with see_screen before clicking/typing.
4) Coordinates are absolute screen pixels (x, y) from top-left.
5) Use enhance(coordinate) when text/UI is unclear.
6) For keyboard-heavy interactions, prefer press_key for special keys.
7) You may call multiple tools in one step. If needed, do click then sleep.
8) Never spam repeated clicks on the same coordinate; switch strategy.
9) Keep tool arguments valid JSON and concise.
10) When objective is fully complete, call task_complete(result="...").
"""
class ScreenJobAgent:
def __init__(
    self,
    client: OpenAI,
    logger: logging.Logger,
    artifacts: RunArtifacts,
    model: str,
    max_steps: int,
    command_timeout: int,
    type_interval: float,
    click_pause: float,
) -> None:
    """Store collaborators and settings, then reset all per-run state.

    Args:
        client: OpenAI client used for model requests.
        logger: Logger that receives run progress and tool results.
        artifacts: Directories/files where screenshots and logs are stored.
        model: Model name passed to the API.
        max_steps: Upper bound on agent loop iterations.
        command_timeout: Timeout in seconds for execute_command.
        type_interval: Delay in seconds between typed characters.
        click_pause: Mouse move duration in seconds before a click.
    """
    # Injected collaborators.
    self.client, self.logger, self.artifacts = client, logger, artifacts

    # Run configuration.
    self.model, self.max_steps = model, max_steps
    self.command_timeout = command_timeout
    self.type_interval, self.click_pause = type_interval, click_pause

    # Loop progress and outcome.
    self.step = 0
    self.completed = False
    self.final_result = ""

    # Conversation / screen state carried between steps.
    self.previous_response_id: str | None = None
    self.last_screen_data_url: str | None = None
    self.last_screen_meta: dict[str, Any] | None = None

    # (x, y, timestamp) entries used by the repeated-click guard.
    self.click_history: list[tuple[int, int, float]] = []
def _tool_schemas(self) -> list[dict[str, Any]]:
    """Return the function-tool declarations passed to the model as `tools`.

    Each entry declares one tool name together with a JSON-schema description
    of its arguments. The names match the keys handled in `_dispatch_tool`.
    """
    return [
        # Signals that the job is finished; `result` is the final summary text.
        {
            "type": "function",
            "name": "task_complete",
            "description": "Call this when the job objective is fully done.",
            "parameters": {
                "type": "object",
                "properties": {
                    "result": {"type": "string"},
                },
                "required": ["result"],
                "additionalProperties": False,
            },
        },
        # Runs a shell command string.
        {
            "type": "function",
            "name": "execute_command",
            "description": (
                "Run a shell command and return stdout/stderr/exit code. "
                "Prefer this for deterministic operations like opening URLs."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {"type": "string"},
                },
                "required": ["command"],
                "additionalProperties": False,
            },
        },
        # Pause; `seconds` may be a number or a string.
        {
            "type": "function",
            "name": "sleep",
            "description": (
                "Pause execution for a short time. "
                "Use this instead of shell sleep commands."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "seconds": {"type": ["number", "string"]},
                },
                "required": ["seconds"],
                "additionalProperties": False,
            },
        },
        # Full-screen capture; takes no arguments.
        {
            "type": "function",
            "name": "see_screen",
            "description": "Capture full screen with coordinate grid overlay.",
            "parameters": {
                "type": "object",
                "properties": {},
                "additionalProperties": False,
            },
        },
        # Zoomed view around one coordinate.
        {
            "type": "function",
            "name": "enhance",
            "description": "Create enhanced zoom around a coordinate for readability.",
            "parameters": {
                "type": "object",
                "properties": {
                    "coordinate": {
                        "type": "object",
                        "properties": {
                            "x": {"type": "integer"},
                            "y": {"type": "integer"},
                        },
                        "required": ["x", "y"],
                        "additionalProperties": False,
                    }
                },
                "required": ["coordinate"],
                "additionalProperties": False,
            },
        },
        # Types literal text.
        {
            "type": "function",
            "name": "type",
            "description": "Type literal text into the active focused element.",
            "parameters": {
                "type": "object",
                "properties": {"text": {"type": "string"}},
                "required": ["text"],
                "additionalProperties": False,
            },
        },
        # Presses one named key, optionally several times.
        {
            "type": "function",
            "name": "press_key",
            "description": "Press a specific key (enter, tab, esc, arrows, etc).",
            "parameters": {
                "type": "object",
                "properties": {
                    "key": {"type": "string"},
                    "repeats": {"type": "integer", "minimum": 1},
                },
                "required": ["key"],
                "additionalProperties": False,
            },
        },
        # Click with optional offsets: an `offset` object with x/y, plus
        # directional offset_* fields that accept integers or strings.
        {
            "type": "function",
            "name": "click",
            "description": (
                "Click absolute screen coordinate with simple directional offsets. "
                "Use offset_up/down/left/right values like 2 or '2px'. "
                "Optional sleep_after_seconds performs a pause immediately after click."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "coordinate": {
                        "type": "object",
                        "properties": {
                            "x": {"type": "integer"},
                            "y": {"type": "integer"},
                        },
                        "required": ["x", "y"],
                        "additionalProperties": False,
                    },
                    "offset": {
                        "type": "object",
                        "properties": {
                            "x": {"type": "integer"},
                            "y": {"type": "integer"},
                        },
                        "required": [],
                        "additionalProperties": False,
                    },
                    "offset_up": {"type": ["integer", "string"]},
                    "offset_down": {"type": ["integer", "string"]},
                    "offset_left": {"type": ["integer", "string"]},
                    "offset_right": {"type": ["integer", "string"]},
                    "sleep_after_seconds": {"type": ["number", "string"]},
                },
                "required": ["coordinate"],
                "additionalProperties": False,
            },
        },
    ]
def _capture_screen(self, with_grid: bool = True) -> tuple[Image.Image, dict[str, Any]]:
    """Take a full-screen RGB screenshot, optionally with the grid overlay.

    Returns:
        A tuple ``(image, meta)`` where ``meta`` records the screenshot's
        width/height, the capture timestamp and whether the grid was drawn.
    """
    raw_shot = pyautogui.screenshot().convert("RGB")
    shot_width, shot_height = raw_shot.size

    if with_grid:
        rendered = draw_global_grid(raw_shot)
    else:
        rendered = raw_shot

    info: dict[str, Any] = {}
    info["width"] = shot_width
    info["height"] = shot_height
    info["captured_at"] = utc_now_iso()
    info["grid"] = bool(with_grid)
    return rendered, info
def _save_image(self, image: Image.Image, path) -> None:
    """Write ``image`` to ``path`` as PNG, creating parent directories first."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    image.save(path, format="PNG")
def _build_visual_message(self, title: str, data_url: str, meta: dict[str, Any]) -> dict[str, Any]:
    """Build a user message pairing a text caption with an image.

    The caption contains the title, the JSON-encoded metadata and a fixed
    hint about using the image's coordinates; the image is attached by data
    URL with high detail.
    """
    meta_json = json.dumps(meta, ensure_ascii=False)
    caption = "\n".join(
        (
            title,
            f"Metadata: {meta_json}",
            "Use coordinates from this image for click/enhance actions.",
        )
    )
    text_part = {"type": "input_text", "text": caption}
    image_part = {"type": "input_image", "image_url": data_url, "detail": "high"}
    return {"role": "user", "content": [text_part, image_part]}
def _parse_px(self, value: Any) -> int:
    """Convert a pixel offset given as a number or string into an int.

    Accepts ``None`` (-> 0), bools, ints, floats and strings such as
    ``"2"``, ``"2px"`` or ``" 12 PX "``. Fractional values are rounded the
    same way for floats and strings. Anything that cannot be interpreted
    as a finite number (garbage text, NaN, infinity) yields 0 instead of
    raising.

    Args:
        value: Raw offset value taken from tool arguments.

    Returns:
        The offset in whole pixels, or 0 when it cannot be parsed.
    """
    if value is None:
        return 0
    # bool is a subclass of int; handle it first so True/False map to 1/0.
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value

    if isinstance(value, float):
        number = value
    else:
        text = str(value).strip().lower()
        if text.endswith("px"):
            text = text[:-2].strip()
        try:
            number = float(text)
        except ValueError:
            return 0

    try:
        return int(round(number))
    except (ValueError, OverflowError):
        # round() raises ValueError for NaN and OverflowError for +/-inf.
        return 0
def _parse_seconds(self, value: Any, default: float = 0.0, max_seconds: float = 60.0) -> float:
    """Convert a duration given as a number or string into clamped seconds.

    Strings may carry an ``ms`` suffix (milliseconds) or an ``s`` suffix
    (seconds); surrounding whitespace and letter case are ignored.
    Unparseable text and NaN fall back to ``default``. The result is then
    clamped to ``[0, max_seconds]``.

    Args:
        value: Raw duration from tool arguments; ``None`` returns
            ``default`` unchanged.
        default: Fallback used when the value cannot be interpreted.
        max_seconds: Upper bound applied to the parsed duration.

    Returns:
        The duration in seconds.
    """
    if value is None:
        return default

    if isinstance(value, (int, float)):
        sec = float(value)
    else:
        text = str(value).strip().lower()
        divisor = 1.0
        if text.endswith("ms"):
            text, divisor = text[:-2], 1000.0
        elif text.endswith("s"):
            text = text[:-1]
        try:
            sec = float(text.strip()) / divisor
        except ValueError:
            sec = default

    # NaN compares false against every bound, so it would slip through the
    # clamps below and later make time.sleep() raise. Treat it as unparseable.
    if sec != sec:
        sec = default

    if sec < 0:
        sec = 0.0
    if sec > max_seconds:
        sec = max_seconds
    return sec
def _tool_see_screen(self, _: dict[str, Any]) -> dict[str, Any]:
    """Capture the screen with the grid overlay, save it and remember it.

    The PNG is written to the run's screenshot directory under a name
    derived from the current step number. The data URL and metadata are
    stored on the instance as the latest screen.
    """
    grid_image, capture_meta = self._capture_screen(with_grid=True)

    target = self.artifacts.shots_dir / f"screen_step_{self.step:03d}.png"
    self._save_image(grid_image, target)
    saved_at = str(target.resolve())

    self.last_screen_data_url = image_to_data_url(grid_image, "PNG")
    self.last_screen_meta = {**capture_meta, "path": saved_at}

    return {
        "ok": True,
        "path": saved_at,
        "meta": self.last_screen_meta,
        "message": "Screen captured with coordinate grid.",
    }
def _tool_enhance(self, args: dict[str, Any]) -> dict[str, Any]:
    """Produce a 2x, sharpened crop of the screen around a coordinate.

    A fresh screenshot without the grid is cropped to a box of up to 180
    pixels on each side of the requested point (clamped to the screen),
    upscaled, contrast/sharpness enhanced and saved to the enhance
    directory. The result becomes the instance's latest screen image.
    """
    target = args.get("coordinate") or {}
    x = int(target.get("x", 0))
    y = int(target.get("y", 0))

    screen, screen_meta = self._capture_screen(with_grid=False)
    width, height = screen.size

    # Crop box: clamped so it always stays on-screen and is at least 1px wide/tall.
    half = 180
    left = clamp(x - half, 0, width - 1)
    top = clamp(y - half, 0, height - 1)
    right = clamp(x + half, left + 1, width)
    bottom = clamp(y + half, top + 1, height)

    region = screen.crop((left, top, right, bottom))
    doubled_size = (region.width * 2, region.height * 2)
    picture = ImageOps.autocontrast(region.resize(doubled_size, Image.Resampling.BICUBIC))
    for enhancer_cls, amount in ((ImageEnhance.Sharpness, 2.0), (ImageEnhance.Contrast, 1.25)):
        picture = enhancer_cls(picture).enhance(amount)
    picture = picture.filter(ImageFilter.UnsharpMask(radius=1.8, percent=180, threshold=2))

    destination = self.artifacts.enhance_dir / f"enhance_step_{self.step:03d}_{x}_{y}.png"
    self._save_image(picture, destination)
    encoded = image_to_data_url(picture, "PNG")

    details = {
        "captured_at": utc_now_iso(),
        "source_coord": {"x": x, "y": y},
        "source_box": {"left": left, "top": top, "right": right, "bottom": bottom},
        "scale": 2,
        "path": str(destination.resolve()),
        "screen_size": {"width": width, "height": height},
        "base_capture_meta": screen_meta,
    }
    self.last_screen_data_url = encoded
    self.last_screen_meta = details
    return {"ok": True, "meta": details, "message": "Enhanced view generated."}
def _tool_click(self, args: dict[str, Any]) -> dict[str, Any]:
    """Click an absolute screen coordinate, with optional offsets and pause.

    The final position is ``coordinate`` plus the ``offset`` object (x/y)
    plus the directional ``offset_*`` values, clamped to the screen size.
    A guard refuses the click when four or more recent clicks (within 25
    seconds, including this one) landed within 6 pixels of the same spot.

    Args:
        args: Tool arguments. ``coordinate`` with ``x`` and ``y`` is
            required; ``offset``, ``offset_up/down/left/right`` and
            ``sleep_after_seconds`` are optional.

    Returns:
        A result dict. ``ok`` is False when the coordinate is missing or
        invalid, or when the repeated-click guard blocks the action.
    """
    # Refuse to guess a position: a missing coordinate used to fall back to
    # (0, 0) and click the top-left corner of the screen.
    coord = args.get("coordinate")
    if not isinstance(coord, dict) or "x" not in coord or "y" not in coord:
        return {
            "ok": False,
            "error": "Missing coordinate: expected an object with integer x and y.",
        }
    try:
        base_x = int(coord["x"])
        base_y = int(coord["y"])
    except (TypeError, ValueError):
        return {"ok": False, "error": "Invalid coordinate: x and y must be integers."}

    offset = args.get("offset")
    if not isinstance(offset, dict):
        offset = {}
    legacy_dx = self._parse_px(offset.get("x", 0))
    legacy_dy = self._parse_px(offset.get("y", 0))
    up = self._parse_px(args.get("offset_up", 0))
    down = self._parse_px(args.get("offset_down", 0))
    left = self._parse_px(args.get("offset_left", 0))
    right = self._parse_px(args.get("offset_right", 0))

    x = base_x + legacy_dx + right - left
    y = base_y + legacy_dy + down - up
    width, height = pyautogui.size()
    x = clamp(x, 0, max(0, width - 1))
    y = clamp(y, 0, max(0, height - 1))

    # Record the attempt first so blocked attempts also count towards the guard.
    now = time.time()
    self.click_history.append((x, y, now))
    self.click_history = self.click_history[-20:]
    near_same = [
        (cx, cy, ts)
        for (cx, cy, ts) in self.click_history
        if now - ts <= 25 and abs(cx - x) <= 6 and abs(cy - y) <= 6
    ]
    if len(near_same) >= 4:
        return {
            "ok": False,
            "blocked": True,
            "error": (
                "Repeated click loop detected at nearly same coordinate. "
                "Switch strategy: call see_screen/enhance and use execute_command."
            ),
            "clicked": {"x": x, "y": y},
            "recent_similar_clicks": len(near_same),
        }

    pyautogui.moveTo(x, y, duration=self.click_pause)
    pyautogui.click(x=x, y=y)

    sleep_after = self._parse_seconds(args.get("sleep_after_seconds", 0), default=0.0, max_seconds=30.0)
    # Always pause briefly after a click, even when no explicit pause was requested.
    time.sleep(sleep_after if sleep_after > 0 else 0.15)

    return {
        "ok": True,
        "clicked": {"x": x, "y": y},
        "base_coordinate": {"x": base_x, "y": base_y},
        "applied_offset": {
            "legacy": {"x": legacy_dx, "y": legacy_dy},
            "directional": {"up": up, "down": down, "left": left, "right": right},
        },
        "sleep_after_seconds": sleep_after,
        "screen_size": {"width": width, "height": height},
        "message": "Click executed.",
    }
def _tool_type(self, args: dict[str, Any]) -> dict[str, Any]:
    """Type literal text via PyAutoGUI and report characters it cannot send.

    ``pyautogui.write`` silently skips characters it has no key mapping
    for (for example accented letters), so the result lists every
    character not found in ``pyautogui.KEYBOARD_KEYS``. This lets the
    caller choose another input strategy instead of assuming the whole
    text arrived.

    Args:
        args: Tool arguments; ``text`` is the string to type. A missing or
            null ``text`` is treated as an empty string.

    Returns:
        A result dict with ``typed_length``. When characters were skipped
        it also contains ``skipped_characters`` and a ``warning``, and
        ``typed_length`` counts only the characters that could be sent.
    """
    raw_text = args.get("text", "")
    # str(None) would type the literal word "None".
    text = "" if raw_text is None else str(raw_text)

    # KEYBOARD_KEYS lists lowercase letters only; uppercase is typed via shift.
    known_keys = set(pyautogui.KEYBOARD_KEYS)
    skipped = [ch for ch in text if ch.lower() not in known_keys]

    pyautogui.write(text, interval=self.type_interval)

    result: dict[str, Any] = {
        "ok": True,
        "typed_length": len(text) - len(skipped),
        "message": "Text typed.",
    }
    if skipped:
        result["skipped_characters"] = sorted(set(skipped))
        result["warning"] = (
            "Some characters have no key mapping and were not typed."
        )
    return result
def _tool_press_key(self, args: dict[str, Any]) -> dict[str, Any]:
    """Press a named key one or more times.

    The key is validated before anything else, so a missing or null key
    always yields the "Missing key." error. ``repeats`` is limited to
    1..50; a value that cannot be read as an integer counts as 1.

    Args:
        args: Tool arguments with ``key`` (required) and ``repeats``
            (optional).

    Returns:
        A result dict; ``ok`` is False when no key was given.
    """
    raw_key = args.get("key")
    # str(None) would otherwise turn a null key into the literal "none".
    key = "" if raw_key is None else str(raw_key).strip().lower()
    if not key:
        return {"ok": False, "error": "Missing key."}

    try:
        repeats = int(args.get("repeats", 1))
    except (TypeError, ValueError, OverflowError):
        repeats = 1
    repeats = max(1, min(repeats, 50))

    for _ in range(repeats):
        pyautogui.press(key)
        time.sleep(0.03)
    return {"ok": True, "key": key, "repeats": repeats, "message": "Key press executed."}
def _tool_sleep(self, args: dict[str, Any]) -> dict[str, Any]:
    """Pause for the requested duration, capped at 60 seconds."""
    requested = args.get("seconds")
    pause = self._parse_seconds(requested, default=0.0, max_seconds=60.0)
    time.sleep(pause)
    outcome = {"ok": True}
    outcome["slept_seconds"] = pause
    outcome["message"] = "Sleep completed."
    return outcome
def _tool_execute_command(self, args: dict[str, Any]) -> dict[str, Any]:
    """Run a shell command and return its exit code and captured output.

    The command is executed through the shell with ``self.command_timeout``
    as the time limit. Output is always returned as text: undecodable
    bytes are replaced rather than raising, and partial output attached to
    a timeout is decoded if it arrives as bytes, so the result can always
    be JSON-encoded. Only the last 12000 characters of each stream are
    kept.

    Args:
        args: Tool arguments; ``command`` is the shell command line.

    Returns:
        A result dict. ``ok`` is True whenever the command ran to
        completion (check ``exit_code`` for its status) and False for an
        empty command, a timeout or a failure to start.
    """
    command = str(args.get("command", "")).strip()
    if not command:
        return {"ok": False, "error": "Empty command."}

    def _tail(stream: Any) -> str:
        """Return the last 12000 characters of a captured stream as text."""
        if not stream:
            return ""
        if isinstance(stream, bytes):
            stream = stream.decode("utf-8", errors="replace")
        return stream[-12000:]

    # NOTE: the command string comes from the model and is run with
    # shell=True by design; this tool is only as safe as the model's output.
    started = time.time()
    try:
        completed = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=self.command_timeout,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        elapsed_ms = int((time.time() - started) * 1000)
        return {
            "ok": False,
            "command": command,
            "error": "Command timed out.",
            "elapsed_ms": elapsed_ms,
            "timeout_seconds": self.command_timeout,
            # Partial output on timeout may be bytes even in text mode.
            "stdout": _tail(exc.stdout),
            "stderr": _tail(exc.stderr),
        }
    except Exception as exc:  # noqa: BLE001
        return {"ok": False, "command": command, "error": f"{type(exc).__name__}: {exc}"}

    elapsed_ms = int((time.time() - started) * 1000)
    return {
        "ok": True,
        "command": command,
        "exit_code": completed.returncode,
        "stdout": _tail(completed.stdout),
        "stderr": _tail(completed.stderr),
        "elapsed_ms": elapsed_ms,
    }
def _tool_task_complete(self, args: dict[str, Any]) -> dict[str, Any]:
    """Mark the run as finished and store the final result text.

    A missing or blank ``result`` is replaced by "Task completed.".
    """
    summary = str(args.get("result", "")).strip()
    if not summary:
        summary = "Task completed."
    self.final_result = summary
    self.completed = True
    return {"ok": True, "result": summary}
def _dispatch_tool(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
    """Route a tool call to its handler method and return the handler's result.

    Unknown tool names produce an error dict instead of raising.
    """
    registry = dict(
        see_screen=self._tool_see_screen,
        enhance=self._tool_enhance,
        click=self._tool_click,
        type=self._tool_type,
        press_key=self._tool_press_key,
        sleep=self._tool_sleep,
        execute_command=self._tool_execute_command,
        task_complete=self._tool_task_complete,
    )
    if name not in registry:
        return {"ok": False, "error": f"Unknown tool: {name}"}
    return registry[name](args)
def _safe_parse_args(self, raw: str | None) -> dict[str, Any]:
    """Decode a tool call's JSON argument string without ever raising.

    Returns ``{}`` for empty input, the decoded object when it is a dict,
    ``{"value": ...}`` for any other JSON value, and ``{"_raw": raw}`` when
    decoding fails.
    """
    if not raw:
        return {}
    try:
        decoded = json.loads(raw)
    except Exception:  # noqa: BLE001
        return {"_raw": raw}
    if isinstance(decoded, dict):
        return decoded
    return {"value": decoded}
def _call_model(self, input_items: list[dict[str, Any]]) -> Any:
    """Send the pending input items to the model and return its response.

    The request carries the system prompt, the tool declarations and the
    previous response id, allows parallel tool calls and caps tool calls
    at 8.
    """
    request: dict[str, Any] = {
        "model": self.model,
        "instructions": SYSTEM_PROMPT,
        "tools": self._tool_schemas(),
        "input": input_items,
        "previous_response_id": self.previous_response_id,
        "parallel_tool_calls": True,
        "max_tool_calls": 8,
    }
    return self.client.responses.create(**request)
def run(self, job: str) -> AgentResult:
    """Drive the model/tool loop for one job until completion or step limit.

    Takes an initial screenshot, sends the job text (plus that screenshot)
    to the model, then repeatedly executes the tool calls the model
    returns and feeds their results back as the next input.

    Args:
        job: Task objective given to the model.

    Returns:
        An AgentResult with ``completed=True`` and the stored final result
        when ``task_complete`` was called, otherwise ``completed=False``
        with a message naming the step limit.

    Raises:
        RuntimeError: When the model API call fails.
    """
    started_at = time.time()
    self.logger.info("Starting run_id=%s model=%s", self.artifacts.run_id, self.model)
    self.logger.info("Job: %s", job)
    # Initial capture happens before the first step, while self.step is still 0.
    self._tool_see_screen({})
    init_input: list[dict[str, Any]] = [
        {
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": (
                        f"JOB: {job}\n"
                        "You are in an action loop. Prefer execute_command for deterministic actions. "
                        "You can return multiple tool calls in one step (example: click then sleep). "
                        "Call task_complete(result=...) only when truly done."
                    ),
                }
            ],
        }
    ]
    if self.last_screen_data_url and self.last_screen_meta:
        init_input.append(
            self._build_visual_message("Initial screen capture", self.last_screen_data_url, self.last_screen_meta)
        )
    pending_input = init_input
    while self.step < self.max_steps and not self.completed:
        self.step += 1
        self.logger.info("---- Agent step %d/%d ----", self.step, self.max_steps)
        try:
            response = self._call_model(pending_input)
        except Exception as exc:  # noqa: BLE001
            # API failures abort the run; they are re-raised as RuntimeError.
            self.logger.exception("OpenAI API call failed on step %d", self.step)
            raise RuntimeError(f"OpenAI API call failed: {type(exc).__name__}: {exc}") from exc
        # Remember the response id so the next request continues from it.
        self.previous_response_id = response.id
        output_items = list(response.output or [])
        text_preview = getattr(response, "output_text", "") or ""
        if text_preview.strip():
            self.logger.info("Model text: %s", text_preview.strip()[:500])
        tool_calls = [item for item in output_items if getattr(item, "type", None) == "function_call"]
        if not tool_calls:
            # A text-only reply still consumes a step; ask the model to use tools.
            self.logger.warning("No tool calls returned; nudging model to continue with tools.")
            pending_input = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": (
                                "No function call was returned. Continue by using tools. "
                                "You may call multiple tools in one step. "
                                "When complete, call task_complete(result=...)."
                            ),
                        }
                    ],
                }
            ]
            continue
        next_input: list[dict[str, Any]] = []
        # Every call in the batch is executed, including any that follow task_complete.
        for tool_call in tool_calls:
            name = str(getattr(tool_call, "name", ""))
            call_id = str(getattr(tool_call, "call_id", ""))
            args_raw = getattr(tool_call, "arguments", "{}")
            args = self._safe_parse_args(args_raw)
            self.logger.info("Tool call: %s args=%s", name, json.dumps(args, ensure_ascii=False))
            try:
                result = self._dispatch_tool(name, args)
            except Exception as exc:  # noqa: BLE001
                # Tool failures are reported back to the model instead of aborting the run.
                self.logger.exception("Tool execution failed: %s", name)
                result = {
                    "ok": False,
                    "error": f"{type(exc).__name__}: {exc}",
                    "traceback": traceback.format_exc()[-8000:],
                }
            self.logger.debug(
                "Tool result for %s: %s",
                name,
                json.dumps(result, ensure_ascii=False)[:2500],
            )
            next_input.append(
                {
                    "type": "function_call_output",
                    "call_id": call_id,
                    "output": json.dumps(result, ensure_ascii=False),
                }
            )
            # Visual tools also attach the freshly stored image as a user message.
            if name in ("see_screen", "enhance") and self.last_screen_data_url and self.last_screen_meta:
                title = "Updated screen capture" if name == "see_screen" else "Enhanced screen region"
                next_input.append(
                    self._build_visual_message(title, self.last_screen_data_url, self.last_screen_meta)
                )
        pending_input = next_input
    ended_at = time.time()
    if self.completed:
        self.logger.info("Task completed in %d step(s).", self.step)
        return AgentResult(
            completed=True,
            result=self.final_result,
            steps=self.step,
            started_at=started_at,
            ended_at=ended_at,
        )
    self.logger.warning("Stopped due to step limit (%d).", self.max_steps)
    return AgentResult(
        completed=False,
        result=f"Stopped after max steps ({self.max_steps}) without task_complete.",
        steps=self.step,
        started_at=started_at,
        ended_at=ended_at,
    )

151
src/cli.py Normal file
View File

@@ -0,0 +1,151 @@
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from openai import OpenAI
from .agent import ScreenJobAgent
from .utils import setup_artifacts, setup_logger
try:
import pyautogui
except Exception as import_exc:
raise RuntimeError(
"pyautogui is required. Install dependencies with: pip install pyautogui pillow"
) from import_exc
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ScreenJob CLI.

    Defines the positional ``job`` argument, the value options (model,
    step limit, command timeout, typing interval, click pause) and the
    ``--no-failsafe`` switch.
    """
    usage_notes = "".join(
        [
            "Examples:\n",
            ' python main.py "Open amazon.de"\n',
            ' python main.py "Open amazon.de and search for mechanical keyboard" --max-steps 80\n\n',
            "Artifacts:\n",
            " Each run stores logs/screens in ./screenjob_runs/run_YYYYMMDD_HHMMSS/",
        ]
    )
    parser = argparse.ArgumentParser(
        description="Run an autonomous desktop task agent using OpenAI + UI tools.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )
    add = parser.add_argument

    add("job", type=str, help="Task objective for the agent.")

    # (flag, value type, default, help text) for every option that takes a value.
    value_options = (
        ("--model", str, "gpt-5.2", "OpenAI model name."),
        ("--max-steps", int, 60, "Max tool-iteration steps."),
        ("--command-timeout", int, 45, "Timeout (seconds) for execute_command tool."),
        ("--type-interval", float, 0.02, "Seconds between typed characters."),
        ("--click-pause", float, 0.10, "Mouse move duration before click (seconds)."),
    )
    for flag, value_type, fallback, blurb in value_options:
        add(flag, type=value_type, default=fallback, help=blurb)

    add(
        "--no-failsafe",
        action="store_true",
        help="Disable PyAutoGUI fail-safe. Not recommended.",
    )
    return parser
def main() -> int:
    """Run the CLI: parse arguments, set up the agent, execute the job.

    Returns:
        Process exit code: 0 when the job completed, 1 when it did not or
        a fatal error occurred, 2 for a missing API key or client setup
        failure, 130 when interrupted by the user.
    """
    load_dotenv()
    args = build_parser().parse_args()

    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        print("ERROR: Missing OPENAI_API_KEY (expected in environment or .env).", file=sys.stderr)
        return 2

    pyautogui.FAILSAFE = not args.no_failsafe
    pyautogui.PAUSE = 0.05

    artifacts = setup_artifacts(Path.cwd() / "screenjob_runs")
    logger = setup_logger(artifacts.log_file, verbose=True)
    logger.info("ScreenJob booting. Artifacts: %s", str(artifacts.root_dir.resolve()))
    logger.info("PyAutoGUI FAILSAFE=%s", pyautogui.FAILSAFE)

    def emit(summary, stream=None):
        # Every summary ends with the artifacts directory and is printed as
        # indented JSON; stream=None means standard output.
        summary["artifacts_dir"] = str(artifacts.root_dir.resolve())
        print(json.dumps(summary, ensure_ascii=False, indent=2), file=stream)

    try:
        client = OpenAI(api_key=api_key)
    except Exception as exc:  # noqa: BLE001
        logger.exception("Failed to create OpenAI client.")
        print(f"ERROR: Could not initialize OpenAI client: {exc}", file=sys.stderr)
        return 2

    agent = ScreenJobAgent(
        client=client,
        logger=logger,
        artifacts=artifacts,
        model=args.model,
        max_steps=args.max_steps,
        command_timeout=args.command_timeout,
        type_interval=args.type_interval,
        click_pause=args.click_pause,
    )

    try:
        result = agent.run(args.job)
        elapsed = result.ended_at - result.started_at
        logger.info("Run finished. completed=%s elapsed=%.2fs", result.completed, elapsed)
        emit(
            {
                "completed": result.completed,
                "result": result.result,
                "steps": result.steps,
                "elapsed_seconds": round(elapsed, 3),
            }
        )
        return 0 if result.completed else 1
    except KeyboardInterrupt:
        logger.warning("Interrupted by user.")
        emit(
            {
                "completed": False,
                "result": "Interrupted by user.",
                "steps": agent.step,
            }
        )
        return 130
    except Exception as exc:  # noqa: BLE001
        logger.exception("Fatal runtime error.")
        emit(
            {
                "completed": False,
                "result": f"Fatal error: {type(exc).__name__}: {exc}",
                "steps": agent.step,
            },
            stream=sys.stderr,
        )
        return 1

24
src/models.py Normal file
View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass
class RunArtifacts:
    """Filesystem locations belonging to a single agent run."""

    # Identifier of the run; also logged when the agent starts.
    run_id: str
    # Top-level directory of the run.
    root_dir: Path
    # Directory holding log files.
    logs_dir: Path
    # Directory where full-screen captures are saved.
    shots_dir: Path
    # Directory where enhanced (zoomed) crops are saved.
    enhance_dir: Path
    # Path of the run's log file.
    log_file: Path
@dataclass
class AgentResult:
    """Outcome of one agent run."""

    # True when the run ended through task_complete, False otherwise.
    completed: bool
    # Final result text, or a message explaining why the run stopped.
    result: str
    # Number of loop steps that were executed.
    steps: int
    # time.time() timestamps taken at the start and end of the run.
    started_at: float
    ended_at: float

111
src/utils.py Normal file
View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import base64
import io
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
from PIL import Image, ImageDraw
from .models import RunArtifacts
def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def ensure_dir(path: Path) -> None:
    """Create ``path`` and any missing parents; do nothing if it already exists."""
    path.mkdir(parents=True, exist_ok=True)
def clamp(value: int, minimum: int, maximum: int) -> int:
    """Limit ``value`` to the range ``[minimum, maximum]``.

    The upper bound is applied first and the lower bound second, so when
    ``minimum`` exceeds ``maximum`` the result is ``minimum``.
    """
    capped = value if value < maximum else maximum
    return capped if capped > minimum else minimum
def image_to_data_url(image: Image.Image, fmt: str = "PNG") -> str:
    """Encode an image as a base64 ``data:`` URL.

    The MIME subtype is derived from ``fmt`` (case-insensitive), with
    ``JPG`` normalised to ``jpeg``. Formats other than PNG and JPEG are
    labelled with their own subtype rather than being reported as JPEG.

    Args:
        image: Object providing ``save(buffer, format=...)``.
        fmt: Image format name passed to ``save``.

    Returns:
        A string of the form ``data:image/<subtype>;base64,<payload>``.
    """
    buf = io.BytesIO()
    image.save(buf, format=fmt)
    encoded = base64.b64encode(buf.getvalue()).decode("ascii")

    subtype = fmt.strip().lower()
    if subtype == "jpg":
        subtype = "jpeg"
    return f"data:image/{subtype};base64,{encoded}"
def draw_global_grid(image: Image.Image, step: int = 100) -> Image.Image:
    """Return an RGB copy of ``image`` with a labelled coordinate grid drawn on it.

    Vertical and horizontal lines are drawn every ``step`` pixels, each
    labelled with its x or y value. The input image is not modified.

    Args:
        image: Source image.
        step: Distance in pixels between grid lines.

    Returns:
        The annotated copy.
    """
    canvas = image.convert("RGB").copy()
    draw = ImageDraw.Draw(canvas)
    width, height = canvas.size
    grid_color = (30, 200, 255)
    minor_color = (180, 220, 240)
    text_bg = (0, 0, 0)
    text_fg = (255, 255, 255)
    # Outline the whole image.
    draw.rectangle([0, 0, width - 1, height - 1], outline=(255, 80, 80), width=2)
    # Vertical lines; every fifth line uses grid_color, the others minor_color.
    for x in range(0, width, step):
        color = grid_color if x % (step * 5) == 0 else minor_color
        draw.line([(x, 0), (x, height)], fill=color, width=1)
        label = f"x={x}"
        # Filled box behind the label, drawn near the top edge.
        draw.rectangle([x + 2, 2, x + 58, 18], fill=text_bg)
        draw.text((x + 4, 4), label, fill=text_fg)
    # Horizontal lines, labelled near the left edge.
    for y in range(0, height, step):
        color = grid_color if y % (step * 5) == 0 else minor_color
        draw.line([(0, y), (width, y)], fill=color, width=1)
        label = f"y={y}"
        draw.rectangle([2, y + 2, 58, y + 18], fill=text_bg)
        draw.text((4, y + 4), label, fill=text_fg)
    # Banner in the top-left corner; drawn last, so it covers labels beneath it.
    draw.rectangle([5, 5, 520, 35], fill=text_bg)
    draw.text(
        (10, 12),
        "Coordinate system: origin at top-left, values in pixels",
        fill=text_fg,
    )
    return canvas
def setup_artifacts(base_dir: Path) -> RunArtifacts:
    """Create the directory tree for a new run and describe it.

    The run id is the local time formatted as ``YYYYMMDD_HHMMSS``; the run
    directory ``run_<id>`` gets ``logs``, ``screens`` and ``enhanced``
    subdirectories.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_root = base_dir / f"run_{stamp}"
    subdirs = {label: run_root / label for label in ("logs", "screens", "enhanced")}

    ensure_dir(run_root)
    for folder in subdirs.values():
        ensure_dir(folder)

    return RunArtifacts(
        run_id=stamp,
        root_dir=run_root,
        logs_dir=subdirs["logs"],
        shots_dir=subdirs["screens"],
        enhance_dir=subdirs["enhanced"],
        log_file=subdirs["logs"] / "screenjob.log",
    )
def setup_logger(log_file: Path, verbose: bool = True) -> logging.Logger:
    """Configure and return the "screenjob" logger.

    Any handlers already attached to the logger are removed and closed
    first, so calling this again does not leak the previous log file
    handle. Two handlers are then installed: a stdout handler (INFO when
    ``verbose``, otherwise WARNING) and a UTF-8 file handler at DEBUG.

    Args:
        log_file: Path of the log file to write.
        verbose: Whether the stdout handler shows INFO messages.

    Returns:
        The configured logger.
    """
    logger = logging.getLogger("screenjob")
    logger.setLevel(logging.DEBUG)

    # Detach and close stale handlers; clearing the list alone would leave
    # an earlier FileHandler's stream open.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO if verbose else logging.WARNING)
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s")
    )

    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s | %(levelname)-8s | %(name)s | %(filename)s:%(lineno)d | %(message)s"
        )
    )

    logger.addHandler(stream_handler)
    logger.addHandler(file_handler)
    return logger