feat(window): add window lifecycle and launch endpoints
All checks were successful
python-syntax / syntax-check (push) Successful in 28s

This commit is contained in:
2026-05-01 15:52:02 +02:00
parent 1429e90be2
commit 493e5499e8
4 changed files with 382 additions and 2 deletions

View File

@@ -1,8 +1,10 @@
import base64
import ctypes
import hmac
import io
import os
import subprocess
import sys
import time
import uuid
from typing import Literal, Optional
@@ -168,6 +170,31 @@ class OCRRequest(BaseModel):
return self
class WindowQuery(BaseModel):
    # Filter describing which top-level windows to select.  Every criterion
    # that is set must match for a window to be returned by _list_windows.

    # Case-insensitive substring the window title must contain.
    title_contains: str | None = Field(default=None, max_length=512)
    # Regular expression searched (case-insensitively) within the window title.
    title_regex: str | None = Field(default=None, max_length=512)
    # Process image name, compared case-insensitively for equality.
    process_name: str | None = Field(default=None, max_length=260)
    # Exact window handle to match; must be >= 1 when given.
    hwnd: int | None = Field(default=None, ge=1)
    # When true (the default), windows that are not visible are skipped.
    visible_only: bool = True
class WindowActionRequest(WindowQuery):
    # Request body for a window action: the inherited WindowQuery fields select
    # the target window, `action` says what to do with it.

    # Operation to perform on the matched window.
    action: Literal["focus", "restore", "minimize", "maximize", "close"]
    # Upper bound, in milliseconds (0..60000), on how long the handler polls
    # for the window to reach the state implied by `action`.
    timeout_ms: int = Field(default=3000, ge=0, le=60000)
class LaunchRequest(BaseModel):
    # Request body for launching a process, optionally waiting for one of its
    # windows to appear.

    # Program to start; becomes the first element of the argv passed to Popen.
    executable: str = Field(min_length=1, max_length=2048)
    # Additional command-line arguments (at most 100 items).
    args: list[str] = Field(default_factory=list, max_length=100)
    # Working directory for the new process; must be an existing directory.
    cwd: str | None = None
    # When true, poll the window list after launching until a window matches.
    wait_for_window: bool = False
    # Window filter used while waiting; when omitted the launcher falls back to
    # matching on the executable's base name as the process name.
    match: WindowQuery | None = None
    # Upper bound, in milliseconds (0..120000), on the wait for a window.
    timeout_ms: int = Field(default=5000, ge=0, le=120000)
    # When true, nothing is started and the resolved argv/cwd are returned.
    dry_run: bool = False
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
token = SETTINGS["token"]
if token and x_clickthrough_token != token:
@@ -456,6 +483,221 @@ def _run_ocr(image, language_hint: str | None, min_confidence: float, offset_x:
return blocks
def _windows_only(feature: str):
    """Abort the request with HTTP 501 unless the host platform is Windows.

    Args:
        feature: Human-readable feature name used in the error detail.

    Raises:
        HTTPException: 501 when ``sys.platform`` is not ``"win32"``.
    """
    if sys.platform == "win32":
        return
    raise HTTPException(status_code=501, detail=f"{feature} is currently supported on Windows hosts only")
def _tasklist_process_name(pid: int) -> str | None:
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
except Exception:
return None
line = (completed.stdout or "").strip().splitlines()
if not line:
return None
row = line[0].strip()
if not row or row.startswith("INFO:"):
return None
if row.startswith('"') and '","' in row:
return row.split('","', 1)[0].strip('"')
return None
def _list_windows(query: WindowQuery | None = None) -> list[dict]:
    """Enumerate top-level windows and return those matching ``query``.

    Each window is described by a dict with the keys ``hwnd``, ``title``,
    ``class_name``, ``pid``, ``process_name``, ``visible``, ``enabled``,
    ``minimized``, ``maximized``, ``foreground`` and ``rect`` (``x``, ``y``,
    ``width``, ``height``).  The result is sorted with the foreground window
    first, then by lower-cased title, then by handle.

    Args:
        query: Optional filter; every criterion that is set must match.  With
            no query all enumerated windows are returned, hidden ones included.

    Returns:
        The list of matching window descriptions (possibly empty).

    Raises:
        HTTPException: 501 on non-Windows hosts; 400 when ``title_regex`` is
            not a valid regular expression.
    """
    _windows_only("window endpoints")

    # Function-scope imports: `import ctypes` alone does not load the
    # `ctypes.wintypes` submodule, and `re` is not part of the import lines
    # visible at the top of this file.  Importing here is harmless if the
    # module already imports them elsewhere.
    import re
    from ctypes import wintypes

    title_regex = None
    if query and query.title_regex:
        try:
            title_regex = re.compile(query.title_regex, re.IGNORECASE)
        except re.error as exc:
            # A malformed client-supplied pattern is a client error, not a 500.
            raise HTTPException(status_code=400, detail=f"invalid title_regex: {exc}") from exc

    user32 = ctypes.windll.user32
    enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_void_p)
    user32.EnumWindows.restype = ctypes.c_bool
    user32.EnumWindows.argtypes = [enum_proc, ctypes.c_void_p]
    user32.IsWindowVisible.argtypes = [ctypes.c_void_p]
    user32.IsWindowVisible.restype = ctypes.c_bool
    user32.IsWindowEnabled.argtypes = [ctypes.c_void_p]
    user32.IsWindowEnabled.restype = ctypes.c_bool
    user32.IsIconic.argtypes = [ctypes.c_void_p]
    user32.IsIconic.restype = ctypes.c_bool
    user32.IsZoomed.argtypes = [ctypes.c_void_p]
    user32.IsZoomed.restype = ctypes.c_bool
    user32.GetWindowTextLengthW.argtypes = [ctypes.c_void_p]
    user32.GetWindowTextLengthW.restype = ctypes.c_int
    user32.GetWindowTextW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
    user32.GetClassNameW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_int]
    user32.GetClassNameW.restype = ctypes.c_int
    user32.GetForegroundWindow.restype = ctypes.c_void_p
    user32.GetWindowRect.argtypes = [ctypes.c_void_p, ctypes.POINTER(wintypes.RECT)]
    # Declare the signature so the handle is passed pointer-sized like the
    # other calls instead of as a default C int.
    user32.GetWindowThreadProcessId.argtypes = [ctypes.c_void_p, ctypes.POINTER(wintypes.DWORD)]
    user32.GetWindowThreadProcessId.restype = wintypes.DWORD

    foreground = int(user32.GetForegroundWindow() or 0)
    windows: list[dict] = []
    # Several windows usually belong to the same process; each tasklist lookup
    # spawns a subprocess, so resolve every PID at most once per enumeration.
    process_names: dict[int, str | None] = {}

    def _callback(hwnd, _lparam):
        # Returning True always continues the enumeration; filters simply skip.
        hwnd_int = int(hwnd)
        if query and query.hwnd is not None and hwnd_int != query.hwnd:
            return True
        visible = bool(user32.IsWindowVisible(hwnd))
        if query and query.visible_only and not visible:
            return True

        title_len = user32.GetWindowTextLengthW(hwnd)
        title_buf = ctypes.create_unicode_buffer(max(title_len + 1, 1))
        user32.GetWindowTextW(hwnd, title_buf, len(title_buf))
        title = title_buf.value
        # Apply the cheap title filters before the expensive process lookup.
        if query:
            if query.title_contains and query.title_contains.lower() not in title.lower():
                return True
            if title_regex and not title_regex.search(title):
                return True

        pid = wintypes.DWORD()
        user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
        pid_value = int(pid.value)
        if pid_value not in process_names:
            process_names[pid_value] = _tasklist_process_name(pid_value)
        process_name = process_names[pid_value]
        if query and query.process_name and (process_name or "").lower() != query.process_name.lower():
            return True

        class_buf = ctypes.create_unicode_buffer(256)
        user32.GetClassNameW(hwnd, class_buf, len(class_buf))
        rect = wintypes.RECT()
        user32.GetWindowRect(hwnd, ctypes.byref(rect))
        windows.append(
            {
                "hwnd": hwnd_int,
                "title": title,
                "class_name": class_buf.value,
                "pid": pid_value,
                "process_name": process_name,
                "visible": visible,
                "enabled": bool(user32.IsWindowEnabled(hwnd)),
                "minimized": bool(user32.IsIconic(hwnd)),
                "maximized": bool(user32.IsZoomed(hwnd)),
                "foreground": hwnd_int == foreground,
                "rect": {
                    "x": int(rect.left),
                    "y": int(rect.top),
                    "width": int(rect.right - rect.left),
                    "height": int(rect.bottom - rect.top),
                },
            }
        )
        return True

    # Keep a named reference to the ctypes callback for the duration of the call.
    callback = enum_proc(_callback)
    user32.EnumWindows(callback, 0)
    windows.sort(key=lambda item: (not item["foreground"], item["title"].lower(), item["hwnd"]))
    return windows
def _require_window_match(query: WindowQuery) -> dict:
    """Resolve ``query`` to exactly one window description.

    Args:
        query: Filter passed to ``_list_windows``.

    Returns:
        The first matching window.  When ``query.hwnd`` is set, several
        matches are tolerated and the first one is returned.

    Raises:
        HTTPException: 404 when nothing matches; 409 when more than one window
            matches and no explicit ``hwnd`` was given (the detail carries up
            to ten of the candidates).
    """
    candidates = _list_windows(query)
    if not candidates:
        raise HTTPException(status_code=404, detail="no matching window found")

    ambiguous = query.hwnd is None and len(candidates) > 1
    if ambiguous:
        detail = {"message": "multiple windows matched", "matches": candidates[:10]}
        raise HTTPException(status_code=409, detail=detail)

    return candidates[0]
def _apply_window_action(req: WindowActionRequest) -> dict:
    """Apply a lifecycle action to the single window selected by ``req``.

    After issuing the action the window is polled (every 100 ms, for at most
    ``req.timeout_ms``) until it reaches the state implied by the action.  The
    window state is always sampled at least once, so ``window`` and ``closed``
    in the result reflect an actual observation even with ``timeout_ms=0``.

    Args:
        req: Window filter plus the action and polling timeout.

    Returns:
        A dict with ``ok`` (return value of the Win32 call that performed the
        action), ``matched`` (the window as it was before the action),
        ``window`` (the last observed state, ``None`` if it no longer exists)
        and ``closed`` (whether the window was gone at the last observation).

    Raises:
        HTTPException: 501 on non-Windows hosts, 404/409 from window matching,
            400 for an unsupported action.
    """
    _windows_only("window endpoints")
    match = _require_window_match(req)
    hwnd = match["hwnd"]
    user32 = ctypes.windll.user32
    WM_CLOSE = 0x0010
    SW_RESTORE = 9
    SW_MINIMIZE = 6
    SW_MAXIMIZE = 3

    if req.action in {"focus", "restore"}:
        # SW_RESTORE also un-maximizes a maximized window, so a plain "focus"
        # only issues it when the window is currently minimized.
        if req.action == "restore" or match["minimized"]:
            user32.ShowWindow(hwnd, SW_RESTORE)
        ok = bool(user32.SetForegroundWindow(hwnd))
    elif req.action == "minimize":
        # NOTE: ShowWindow returns the window's previous visibility, not a
        # success flag; the polled `window` state is the reliable indicator.
        ok = bool(user32.ShowWindow(hwnd, SW_MINIMIZE))
    elif req.action == "maximize":
        ok = bool(user32.ShowWindow(hwnd, SW_MAXIMIZE))
    elif req.action == "close":
        ok = bool(user32.PostMessageW(hwnd, WM_CLOSE, 0, 0))
    else:
        raise HTTPException(status_code=400, detail="unsupported window action")

    def _settled(window: dict | None) -> bool:
        # True once the observed window state matches what the action asked for.
        if req.action == "close":
            return window is None
        if window is None:
            return False
        if req.action in {"focus", "restore"}:
            return window["foreground"] and not window["minimized"]
        if req.action == "minimize":
            return window["minimized"]
        return window["maximized"]

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + (req.timeout_ms / 1000.0)
    while True:
        current = _list_windows(WindowQuery(hwnd=hwnd, visible_only=False))
        final_match = current[0] if current else None
        if _settled(final_match):
            break
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(0.1, remaining))

    return {
        "ok": ok,
        "matched": match,
        "window": final_match,
        "closed": final_match is None,
    }
def _launch_app(req: LaunchRequest) -> dict:
    """Start a process and optionally wait for one of its windows to appear.

    Everything that can be rejected up front (working directory, platform
    support for window waiting, the window filter's regex) is checked before
    the process is spawned, so a failing request does not leave a started
    process behind.

    Args:
        req: Launch parameters.

    Returns:
        A dict with ``executed``, ``dry_run``, ``argv`` and ``cwd``; plus
        ``pid`` when a process was started, and ``window`` / ``window_found``
        when ``req.wait_for_window`` is set.

    Raises:
        HTTPException: 400 for a bad ``cwd``, an invalid ``title_regex`` in
            ``req.match`` or a failed launch; 501 when window waiting is
            requested on a non-Windows host.
    """
    if req.cwd:
        cwd = os.path.abspath(req.cwd)
        if not os.path.isdir(cwd):
            raise HTTPException(status_code=400, detail="cwd does not exist or is not a directory")
    else:
        cwd = None

    argv = [req.executable, *req.args]
    if SETTINGS["dry_run"] or req.dry_run:
        return {"executed": False, "dry_run": True, "argv": argv, "cwd": cwd}

    query = None
    if req.wait_for_window:
        # Fail before spawning anything if the window wait cannot work here.
        _windows_only("window endpoints")
        query = req.match
        if query is None:
            image_name = os.path.basename(req.executable)
            # Windows appends ".exe" when launching a name without extension,
            # and that is the image name the process list reports.
            if not os.path.splitext(image_name)[1]:
                image_name += ".exe"
            query = WindowQuery(process_name=image_name, visible_only=True)
        if query.title_regex:
            # Function-scope import: `re` is not part of the import lines
            # visible at the top of this file.
            import re

            try:
                re.compile(query.title_regex)
            except re.error as exc:
                raise HTTPException(status_code=400, detail=f"invalid title_regex: {exc}") from exc

    try:
        proc = subprocess.Popen(argv, cwd=cwd)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=f"executable not found: {exc}") from exc
    except OSError as exc:
        raise HTTPException(status_code=400, detail=f"failed to launch process: {exc}") from exc

    result = {
        "executed": True,
        "dry_run": False,
        "argv": argv,
        "cwd": cwd,
        "pid": proc.pid,
    }

    if query is not None:
        # Monotonic clock so wall-clock adjustments cannot stretch the wait;
        # the window list is checked at least once even with timeout_ms=0.
        deadline = time.monotonic() + (req.timeout_ms / 1000.0)
        match = None
        while True:
            matches = _list_windows(query)
            if matches:
                match = matches[0]
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.2, remaining))
        result["window"] = match
        result["window_found"] = match is not None

    return result
def _pick_shell(explicit_shell: str | None) -> str:
shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
if shell_name not in {"powershell", "bash", "cmd"}:
@@ -799,6 +1041,54 @@ def exec_command(
}
@app.get("/windows")
def windows(
    title_contains: str | None = None,
    title_regex: str | None = None,
    process_name: str | None = None,
    hwnd: int | None = None,
    visible_only: bool = True,
    _: None = Depends(_auth),
):
    # List top-level windows matching the given query parameters.
    #
    # The parameters are plain query-string values, so the WindowQuery
    # constraints (length limits, hwnd >= 1) are only enforced when the model
    # is built below.  A violation there is the client's fault and is reported
    # as 422 instead of surfacing as an unhandled error.
    from pydantic import ValidationError

    try:
        query = WindowQuery(
            title_contains=title_contains,
            title_regex=title_regex,
            process_name=process_name,
            hwnd=hwnd,
            visible_only=visible_only,
        )
    except ValidationError as exc:
        raise HTTPException(status_code=422, detail=exc.errors()) from exc

    matches = _list_windows(query)
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "windows": matches,
        "count": len(matches),
    }
@app.post("/windows/action")
def window_action(req: WindowActionRequest, _: None = Depends(_auth)):
    # Perform the requested action on the matched window and wrap the outcome
    # in the standard response envelope.  The action runs first so that
    # request_id/time_ms are generated after it has completed.
    outcome = _apply_window_action(req)
    envelope = dict(
        ok=True,
        request_id=_request_id(),
        time_ms=_now_ms(),
        result=outcome,
    )
    return envelope
@app.post("/launch")
def launch(req: LaunchRequest, _: None = Depends(_auth)):
    # Launch the requested process and wrap the outcome in the standard
    # response envelope.  The launch runs first so that request_id/time_ms are
    # generated after it has completed.
    outcome = _launch_app(req)
    envelope = dict(
        ok=True,
        request_id=_request_id(),
        time_ms=_now_ms(),
        result=outcome,
    )
    return envelope
@app.post("/ocr")
def ocr(req: OCRRequest, screen: int = 0, _: None = Depends(_auth)):
source = req.mode