refactor: simplify to see/interact/exec and split server modules
All checks were successful
python-syntax / syntax-check (push) Successful in 6s

This commit is contained in:
2026-05-03 20:07:12 +02:00
parent aced5be25e
commit 1c03cab457
8 changed files with 911 additions and 1928 deletions

File diff suppressed because it is too large Load Diff

42
server/config.py Normal file
View File

@@ -0,0 +1,42 @@
import os
from typing import Optional
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=False)
def _env_bool(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _parse_allowed_region() -> Optional[tuple[int, int, int, int]]:
raw = os.getenv("CLICKTHROUGH_ALLOWED_REGION")
if not raw:
return None
parts = [p.strip() for p in raw.split(",")]
if len(parts) != 4:
raise ValueError("CLICKTHROUGH_ALLOWED_REGION must be x,y,width,height")
x, y, w, h = (int(p) for p in parts)
if w <= 0 or h <= 0:
raise ValueError("CLICKTHROUGH_ALLOWED_REGION width/height must be > 0")
return x, y, w, h
SETTINGS = {
"host": os.getenv("CLICKTHROUGH_HOST", "127.0.0.1"),
"port": int(os.getenv("CLICKTHROUGH_PORT", "8123")),
"token": os.getenv("CLICKTHROUGH_TOKEN", "").strip(),
"dry_run": _env_bool("CLICKTHROUGH_DRY_RUN", False),
"allowed_region": _parse_allowed_region(),
"exec_enabled": _env_bool("CLICKTHROUGH_EXEC_ENABLED", True),
"exec_default_shell": os.getenv("CLICKTHROUGH_EXEC_DEFAULT_SHELL", "powershell").strip().lower(),
"exec_default_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_TIMEOUT_S", "30")),
"exec_max_timeout_s": int(os.getenv("CLICKTHROUGH_EXEC_MAX_TIMEOUT_S", "120")),
"exec_max_output_chars": int(os.getenv("CLICKTHROUGH_EXEC_MAX_OUTPUT_CHARS", "20000")),
"exec_secret": os.getenv("CLICKTHROUGH_EXEC_SECRET", "").strip(),
}

124
server/models.py Normal file
View File

@@ -0,0 +1,124 @@
from typing import Literal, Optional
from pydantic import BaseModel, Field, model_validator
class PixelTarget(BaseModel):
mode: Literal["pixel"]
x: int
y: int
dx: int = 0
dy: int = 0
class GridTarget(BaseModel):
mode: Literal["grid"]
region_x: int
region_y: int
region_width: int = Field(gt=0)
region_height: int = Field(gt=0)
rows: int = Field(gt=0)
cols: int = Field(gt=0)
row: int = Field(ge=0)
col: int = Field(ge=0)
dx: float = 0.0
dy: float = 0.0
@model_validator(mode="after")
def _validate_indices(self):
if self.row >= self.rows or self.col >= self.cols:
raise ValueError("row/col must be inside rows/cols")
if not -1.0 <= self.dx <= 1.0:
raise ValueError("dx must be in [-1, 1]")
if not -1.0 <= self.dy <= 1.0:
raise ValueError("dy must be in [-1, 1]")
return self
Target = PixelTarget | GridTarget
class ActionRequest(BaseModel):
action: Literal[
"move",
"click",
"right_click",
"double_click",
"middle_click",
"scroll",
"type",
"hotkey",
]
target: Optional[Target] = None
duration_ms: int = Field(default=0, ge=0, le=20000)
button: Literal["left", "right", "middle"] = "left"
clicks: int = Field(default=1, ge=1, le=10)
scroll_amount: int = 0
text: str = ""
keys: list[str] = Field(default_factory=list)
interval_ms: int = Field(default=20, ge=0, le=5000)
dry_run: bool = False
class ExecRequest(BaseModel):
command: str = Field(min_length=1, max_length=10000)
shell: Literal["powershell", "bash", "cmd"] | None = None
timeout_s: int | None = Field(default=None, ge=1, le=600)
cwd: str | None = None
dry_run: bool = False
class WindowQuery(BaseModel):
title_contains: str | None = Field(default=None, max_length=512)
title_regex: str | None = Field(default=None, max_length=512)
process_name: str | None = Field(default=None, max_length=260)
hwnd: int | None = Field(default=None, ge=1)
visible_only: bool = True
class WindowActionRequest(WindowQuery):
action: Literal["focus", "restore", "minimize", "maximize", "close"]
timeout_ms: int = Field(default=3000, ge=0, le=60000)
class LaunchRequest(BaseModel):
executable: str = Field(min_length=1, max_length=2048)
args: list[str] = Field(default_factory=list, max_length=100)
cwd: str | None = None
wait_for_window: bool = False
match: WindowQuery | None = None
timeout_ms: int = Field(default=5000, ge=0, le=120000)
dry_run: bool = False
class SeeRequest(BaseModel):
screen: int = 0
region_x: int | None = Field(default=None, ge=0)
region_y: int | None = Field(default=None, ge=0)
region_width: int | None = Field(default=None, gt=0)
region_height: int | None = Field(default=None, gt=0)
with_grid: bool = True
grid_rows: int = Field(default=12, ge=1, le=300)
grid_cols: int = Field(default=12, ge=1, le=300)
include_labels: bool = True
image_format: Literal["png", "jpeg"] = "png"
jpeg_quality: int = Field(default=85, ge=1, le=100)
class SeeZoomRequest(BaseModel):
screen: int = 0
center_x: int = Field(ge=0)
center_y: int = Field(ge=0)
width: int = Field(default=500, ge=10)
height: int = Field(default=350, ge=10)
with_grid: bool = True
grid_rows: int = Field(default=20, ge=1, le=300)
grid_cols: int = Field(default=20, ge=1, le=300)
include_labels: bool = True
image_format: Literal["png", "jpeg"] = "png"
jpeg_quality: int = Field(default=90, ge=1, le=100)
class InteractRequest(BaseModel):
screen: int = 0
action: ActionRequest

462
server/services.py Normal file
View File

@@ -0,0 +1,462 @@
import ctypes
import io
import os
import re
import subprocess
import sys
import time
from typing import Literal
from fastapi import HTTPException
from PIL import ImageChops, ImageStat
from .config import SETTINGS
from .models import ActionRequest, GridTarget, LaunchRequest, PixelTarget, Target, WindowActionRequest, WindowQuery
def import_capture_libs():
try:
from PIL import Image, ImageDraw
import mss
return Image, ImageDraw, mss
except Exception as exc:
raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc
def display_region(mon: dict, screen: int, mss_index: int, primary: bool) -> dict:
return {
"screen": screen,
"mss_index": mss_index,
"primary": primary,
"x": mon["left"],
"y": mon["top"],
"width": mon["width"],
"height": mon["height"],
}
def ordered_displays(sct) -> list[dict]:
raw_monitors = list(enumerate(sct.monitors[1:], start=1))
if not raw_monitors:
raise HTTPException(status_code=500, detail="no displays detected")
primary_pos = next((idx for idx, (_, mon) in enumerate(raw_monitors) if mon["left"] == 0 and mon["top"] == 0), 0)
ordered = [raw_monitors[primary_pos]] + [item for idx, item in enumerate(raw_monitors) if idx != primary_pos]
return [display_region(mon, screen=index, mss_index=mss_index, primary=(index == 0)) for index, (mss_index, mon) in enumerate(ordered)]
def get_displays() -> list[dict]:
_, _, mss = import_capture_libs()
with mss.mss() as sct:
return ordered_displays(sct)
def select_display(screen: int) -> tuple[dict, list[dict], dict]:
displays = get_displays()
selected = displays[screen] if 0 <= screen < len(displays) else displays[0]
return selected, displays, {"requested": screen, "selected": selected["screen"], "fallback": selected["screen"] != screen}
def capture_screen(screen: int = 0):
Image, _, mss = import_capture_libs()
with mss.mss() as sct:
displays = ordered_displays(sct)
mon = displays[screen] if 0 <= screen < len(displays) else displays[0]
shot = sct.grab({"left": mon["x"], "top": mon["y"], "width": mon["width"], "height": mon["height"]})
image = Image.frombytes("RGB", shot.size, shot.rgb)
selection = {"requested": screen, "selected": mon["screen"], "fallback": mon["screen"] != screen}
return image, mon, displays, selection
def capture_region_image(screen: int, region_x: int | None, region_y: int | None, region_width: int | None, region_height: int | None):
base_img, mon, displays, screen_selection = capture_screen(screen)
if None in {region_x, region_y, region_width, region_height}:
return base_img, {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}, mon, displays, screen_selection
left = region_x - mon["x"]
top = region_y - mon["y"]
right = left + region_width
bottom = top + region_height
if left < 0 or top < 0 or right > base_img.size[0] or bottom > base_img.size[1]:
raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")
crop = base_img.crop((left, top, right, bottom))
return crop, {"x": region_x, "y": region_y, "width": region_width, "height": region_height}, mon, displays, screen_selection
def serialize_image(image, image_format: str, jpeg_quality: int) -> bytes:
buf = io.BytesIO()
if image_format == "jpeg":
image.save(buf, format="JPEG", quality=jpeg_quality)
else:
image.save(buf, format="PNG")
return buf.getvalue()
def encode_image(image, image_format: str, jpeg_quality: int) -> str:
import base64
return base64.b64encode(serialize_image(image, image_format, jpeg_quality)).decode("ascii")
def draw_grid(image, region_x: int, region_y: int, rows: int, cols: int, include_labels: bool):
_, ImageDraw, _ = import_capture_libs()
out = image.copy()
draw = ImageDraw.Draw(out)
w, h = out.size
cell_w = w / cols
cell_h = h / rows
for c in range(1, cols):
x = int(round(c * cell_w))
draw.line([(x, 0), (x, h)], fill=(255, 0, 0), width=1)
for r in range(1, rows):
y = int(round(r * cell_h))
draw.line([(0, y), (w, y)], fill=(255, 0, 0), width=1)
draw.rectangle([(0, 0), (w - 1, h - 1)], outline=(255, 0, 0), width=2)
if include_labels:
for r in range(rows):
for c in range(cols):
cx = int((c + 0.5) * cell_w)
cy = int((r + 0.5) * cell_h)
draw.text((cx - 12, cy - 6), f"{r},{c}", fill=(255, 255, 0))
meta = {
"region": {"x": region_x, "y": region_y, "width": w, "height": h},
"grid": {
"rows": rows,
"cols": cols,
"cell_width": cell_w,
"cell_height": cell_h,
"indexing": "zero-based",
"point_formula": {
"pixel_x": "region.x + ((col + 0.5 + dx*0.5) * cell_width)",
"pixel_y": "region.y + ((row + 0.5 + dy*0.5) * cell_height)",
"dx_range": "[-1,1]",
"dy_range": "[-1,1]",
},
},
}
return out, meta
def resolve_target(target: Target) -> tuple[int, int, dict]:
if isinstance(target, PixelTarget):
x = target.x + target.dx
y = target.y + target.dy
return x, y, {"mode": "pixel", "source": target.model_dump()}
cell_w = target.region_width / target.cols
cell_h = target.region_height / target.rows
x = target.region_x + int(round((target.col + 0.5 + (target.dx * 0.5)) * cell_w))
y = target.region_y + int(round((target.row + 0.5 + (target.dy * 0.5)) * cell_h))
return x, y, {"mode": "grid", "source": target.model_dump(), "derived": {"cell_width": cell_w, "cell_height": cell_h}}
def enforce_allowed_region(x: int, y: int):
region = SETTINGS["allowed_region"]
if region is None:
return
rx, ry, rw, rh = region
if not (rx <= x < rx + rw and ry <= y < ry + rh):
raise HTTPException(status_code=403, detail="point outside allowed region")
def import_input_lib():
try:
import pyautogui
pyautogui.FAILSAFE = True
return pyautogui
except Exception as exc:
raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc
def exec_action(req: ActionRequest, screen: int = 0) -> dict:
run_dry = SETTINGS["dry_run"] or req.dry_run
selected_display, _, screen_selection = select_display(screen)
pyautogui = None if run_dry else import_input_lib()
resolved_target = None
if req.target is not None:
x, y, info = resolve_target(req.target)
enforce_allowed_region(x, y)
resolved_target = {"x": x, "y": y, "target_info": info}
duration_sec = req.duration_ms / 1000.0
if req.action in {"move", "click", "right_click", "double_click", "middle_click"} and resolved_target is None:
raise HTTPException(status_code=400, detail="target is required for pointer actions")
if req.action == "scroll" and resolved_target is None:
raise HTTPException(status_code=400, detail="target is required for scroll")
if not run_dry:
if req.action == "move":
pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
elif req.action == "click":
pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], clicks=req.clicks, interval=req.interval_ms / 1000.0, button=req.button, duration=duration_sec)
elif req.action == "right_click":
pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="right", duration=duration_sec)
elif req.action == "double_click":
pyautogui.doubleClick(x=resolved_target["x"], y=resolved_target["y"], interval=req.interval_ms / 1000.0)
elif req.action == "middle_click":
pyautogui.click(x=resolved_target["x"], y=resolved_target["y"], button="middle", duration=duration_sec)
elif req.action == "scroll":
pyautogui.moveTo(resolved_target["x"], resolved_target["y"], duration=duration_sec)
pyautogui.scroll(req.scroll_amount)
elif req.action == "type":
pyautogui.write(req.text, interval=req.interval_ms / 1000.0)
elif req.action == "hotkey":
if len(req.keys) < 1:
raise HTTPException(status_code=400, detail="keys is required for hotkey")
pyautogui.hotkey(*req.keys)
return {"action": req.action, "executed": not run_dry, "dry_run": run_dry, "screen": screen_selection, "display": selected_display, "resolved_target": resolved_target}
def windows_only(feature: str):
if sys.platform != "win32":
raise HTTPException(status_code=501, detail=f"{feature} is currently supported on Windows hosts only")
def tasklist_process_name(pid: int) -> str | None:
try:
completed = subprocess.run(["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"], capture_output=True, text=True, timeout=5, check=False)
except Exception:
return None
line = (completed.stdout or "").strip().splitlines()
if not line:
return None
row = line[0].strip()
if not row or row.startswith("INFO:"):
return None
if row.startswith('"') and '","' in row:
return row.split('","', 1)[0].strip('"')
return None
def list_windows(query: WindowQuery | None = None) -> list[dict]:
windows_only("window endpoints")
query = query or WindowQuery()
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
psapi = ctypes.windll.psapi
user32.GetWindowTextLengthW.argtypes = [ctypes.c_void_p]
user32.GetWindowTextLengthW.restype = ctypes.c_int
user32.GetWindowTextW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
user32.GetWindowTextW.restype = ctypes.c_int
user32.IsWindowVisible.argtypes = [ctypes.c_void_p]
user32.IsWindowVisible.restype = ctypes.c_bool
user32.IsWindowEnabled.argtypes = [ctypes.c_void_p]
user32.IsWindowEnabled.restype = ctypes.c_bool
user32.IsIconic.argtypes = [ctypes.c_void_p]
user32.IsIconic.restype = ctypes.c_bool
user32.IsZoomed.argtypes = [ctypes.c_void_p]
user32.IsZoomed.restype = ctypes.c_bool
user32.GetForegroundWindow.restype = ctypes.c_void_p
user32.GetWindowRect.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.wintypes.RECT)]
user32.GetWindowRect.restype = ctypes.c_bool
user32.GetClassNameW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
user32.GetClassNameW.restype = ctypes.c_int
kernel32.OpenProcess.argtypes = [ctypes.wintypes.DWORD, ctypes.wintypes.BOOL, ctypes.wintypes.DWORD]
kernel32.OpenProcess.restype = ctypes.wintypes.HANDLE
kernel32.CloseHandle.argtypes = [ctypes.wintypes.HANDLE]
kernel32.CloseHandle.restype = ctypes.wintypes.BOOL
psapi.GetModuleBaseNameW.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.HMODULE, ctypes.c_wchar_p, ctypes.wintypes.DWORD]
psapi.GetModuleBaseNameW.restype = ctypes.wintypes.DWORD
foreground = int(user32.GetForegroundWindow() or 0)
results: list[dict] = []
def callback(hwnd, _lparam):
hwnd_int = int(hwnd)
if query.hwnd and hwnd_int != query.hwnd:
return True
visible = bool(user32.IsWindowVisible(hwnd))
if query.visible_only and not visible:
return True
length = user32.GetWindowTextLengthW(hwnd)
title_buf = ctypes.create_unicode_buffer(max(1, length + 1))
user32.GetWindowTextW(hwnd, title_buf, len(title_buf))
title = title_buf.value or ""
if query.title_contains and query.title_contains.lower() not in title.lower():
return True
if query.title_regex and re.search(query.title_regex, title, flags=re.IGNORECASE) is None:
return True
pid = ctypes.wintypes.DWORD(0)
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
process_name = tasklist_process_name(pid.value)
if query.process_name and (process_name or "").lower() != query.process_name.lower():
return True
class_buf = ctypes.create_unicode_buffer(256)
user32.GetClassNameW(hwnd, class_buf, len(class_buf))
rect = ctypes.wintypes.RECT()
user32.GetWindowRect(hwnd, ctypes.byref(rect))
results.append(
{
"hwnd": hwnd_int,
"title": title,
"class_name": class_buf.value,
"pid": int(pid.value),
"process_name": process_name,
"visible": visible,
"enabled": bool(user32.IsWindowEnabled(hwnd)),
"minimized": bool(user32.IsIconic(hwnd)),
"maximized": bool(user32.IsZoomed(hwnd)),
"foreground": hwnd_int == foreground,
"rect": {"x": int(rect.left), "y": int(rect.top), "width": int(rect.right - rect.left), "height": int(rect.bottom - rect.top)},
}
)
return True
enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p)(callback)
user32.EnumWindows(enum_proc, 0)
results.sort(key=lambda item: (not item["foreground"], item["title"].lower(), item["hwnd"]))
return results
def _pick_single_window(query: WindowQuery) -> dict:
matches = list_windows(query)
if not matches:
raise HTTPException(status_code=404, detail="no window matched")
if len(matches) > 1:
raise HTTPException(status_code=409, detail={"message": "multiple windows matched", "matches": matches[:10]})
return matches[0]
def apply_window_action(req: WindowActionRequest) -> dict:
windows_only("window endpoints")
match = _pick_single_window(req)
hwnd = match["hwnd"]
user32 = ctypes.windll.user32
SW_RESTORE, SW_MINIMIZE, SW_MAXIMIZE = 9, 6, 3
WM_CLOSE = 0x0010
if req.action == "focus":
user32.ShowWindow(hwnd, SW_RESTORE)
ok = bool(user32.SetForegroundWindow(hwnd))
if not ok:
raise HTTPException(status_code=500, detail="failed to focus window")
elif req.action == "restore":
user32.ShowWindow(hwnd, SW_RESTORE)
elif req.action == "minimize":
user32.ShowWindow(hwnd, SW_MINIMIZE)
elif req.action == "maximize":
user32.ShowWindow(hwnd, SW_MAXIMIZE)
elif req.action == "close":
user32.PostMessageW(hwnd, WM_CLOSE, 0, 0)
deadline = time.time() + (req.timeout_ms / 1000.0)
final = None
while time.time() <= deadline:
current = list_windows(WindowQuery(hwnd=hwnd, visible_only=False))
if not current:
if req.action == "close":
return {"matched": match, "closed": True, "final": None}
time.sleep(0.05)
continue
final = current[0]
if req.action == "focus" and final.get("foreground"):
break
if req.action in {"restore", "minimize", "maximize"}:
break
time.sleep(0.05)
return {"matched": match, "closed": False, "final": final}
def launch_app(req: LaunchRequest) -> dict:
if req.cwd and not os.path.isdir(req.cwd):
raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")
argv = [req.executable, *req.args]
cwd = req.cwd or None
if req.dry_run or SETTINGS["dry_run"]:
return {"executed": False, "dry_run": True, "argv": argv, "cwd": cwd}
try:
proc = subprocess.Popen(argv, cwd=cwd)
except FileNotFoundError as exc:
raise HTTPException(status_code=400, detail=f"executable not found: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=400, detail=f"failed to launch process: {exc}") from exc
result = {"executed": True, "dry_run": False, "argv": argv, "cwd": cwd, "pid": proc.pid}
if req.wait_for_window:
query = req.match or WindowQuery(process_name=os.path.basename(req.executable), visible_only=True)
deadline = time.time() + (req.timeout_ms / 1000.0)
match = None
while time.time() <= deadline:
matches = list_windows(query)
if matches:
match = matches[0]
break
time.sleep(0.2)
result["window"] = match
result["window_found"] = match is not None
return result
def _truncate_text(text: str, limit: int) -> tuple[str, bool]:
if len(text) <= limit:
return text, False
return text[:limit], True
def _resolve_exec_program(shell_name: str, command: str) -> list[str]:
if shell_name == "powershell":
return ["powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command", command]
if shell_name == "bash":
return ["bash", "-lc", command]
if shell_name == "cmd":
return ["cmd", "/c", command]
raise HTTPException(status_code=400, detail="unsupported shell")
def exec_command(req):
if not SETTINGS["exec_enabled"]:
raise HTTPException(status_code=403, detail="exec endpoint disabled")
if not SETTINGS["exec_secret"]:
raise HTTPException(status_code=403, detail="exec secret not configured")
shell_name = (req.shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
if shell_name not in {"powershell", "bash", "cmd"}:
raise HTTPException(status_code=400, detail="unsupported shell")
run_dry = SETTINGS["dry_run"] or req.dry_run
timeout_s = req.timeout_s if req.timeout_s is not None else SETTINGS["exec_default_timeout_s"]
timeout_s = min(timeout_s, SETTINGS["exec_max_timeout_s"])
cwd = None
if req.cwd:
cwd = os.path.abspath(req.cwd)
if not os.path.isdir(cwd):
raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")
argv = _resolve_exec_program(shell_name, req.command)
if run_dry:
return {"executed": False, "dry_run": True, "shell": shell_name, "command": req.command, "argv": argv, "timeout_s": timeout_s, "cwd": cwd}
start = time.time()
try:
completed = subprocess.run(argv, cwd=cwd, capture_output=True, text=True, timeout=timeout_s, check=False)
except subprocess.TimeoutExpired as exc:
stdout, stdout_truncated = _truncate_text(str(exc.stdout or ""), SETTINGS["exec_max_output_chars"])
stderr, stderr_truncated = _truncate_text(str(exc.stderr or ""), SETTINGS["exec_max_output_chars"])
return {"executed": True, "timed_out": True, "shell": shell_name, "command": req.command, "argv": argv, "timeout_s": timeout_s, "cwd": cwd, "duration_ms": int((time.time() - start) * 1000), "exit_code": None, "stdout": stdout, "stderr": stderr, "stdout_truncated": stdout_truncated, "stderr_truncated": stderr_truncated}
except FileNotFoundError as exc:
raise HTTPException(status_code=400, detail=f"shell executable not found: {exc}") from exc
stdout, stdout_truncated = _truncate_text(completed.stdout or "", SETTINGS["exec_max_output_chars"])
stderr, stderr_truncated = _truncate_text(completed.stderr or "", SETTINGS["exec_max_output_chars"])
return {"executed": True, "timed_out": False, "shell": shell_name, "command": req.command, "argv": argv, "timeout_s": timeout_s, "cwd": cwd, "duration_ms": int((time.time() - start) * 1000), "exit_code": completed.returncode, "stdout": stdout, "stderr": stderr, "stdout_truncated": stdout_truncated, "stderr_truncated": stderr_truncated}