Support multi-display screen selection
All checks were successful
python-syntax / syntax-check (push) Successful in 1m33s
All checks were successful
python-syntax / syntax-check (push) Successful in 1m33s
This commit is contained in:
110
server/app.py
110
server/app.py
@@ -192,13 +192,73 @@ def _import_capture_libs():
|
||||
raise HTTPException(status_code=500, detail=f"capture backend unavailable: {exc}") from exc
|
||||
|
||||
|
||||
def _capture_screen():
|
||||
def _display_region(mon: dict, screen: int, mss_index: int, primary: bool) -> dict:
    """Build the display descriptor used for one monitor.

    Args:
        mon: Monitor mapping providing "left", "top", "width" and "height".
        screen: Logical screen number to report under the "screen" key.
        mss_index: Index of this monitor in the capture backend's monitor list.
        primary: Whether this display is flagged as the primary one.

    Returns:
        A dict with the keys "screen", "mss_index", "primary", "x", "y",
        "width" and "height". The monitor's "left"/"top" are re-exposed as
        "x"/"y".

    Raises:
        KeyError: If ``mon`` lacks one of the four geometry keys.
    """
    return {
        "screen": screen,
        "mss_index": mss_index,
        "primary": primary,
        "x": mon["left"],
        "y": mon["top"],
        "width": mon["width"],
        "height": mon["height"],
    }
|
||||
|
||||
|
||||
def _ordered_displays(sct) -> list[dict]:
    """Return display descriptors for every monitor, primary display first.

    Args:
        sct: An open capture session exposing a ``monitors`` sequence of
            mappings with "left", "top", "width" and "height" keys.

    Returns:
        One descriptor per monitor (see ``_display_region``). The "screen"
        value is the position in the returned list, "mss_index" is the
        monitor's index in ``sct.monitors``, and only the first entry has
        "primary" set to True.

    Raises:
        HTTPException: With status 500 when no monitors are found after
            skipping index 0.
    """
    # Entry 0 of sct.monitors is skipped; enumeration starts at 1 so that
    # mss_index still refers to the position in the unsliced list.
    # NOTE(review): in mss, monitors[0] is the combined virtual screen - confirm.
    raw_monitors = list(enumerate(sct.monitors[1:], start=1))
    if not raw_monitors:
        raise HTTPException(status_code=500, detail="no displays detected")

    # The monitor whose origin is (0, 0) is treated as primary; if none
    # matches, the first enumerated monitor is used instead.
    primary_pos = next(
        (idx for idx, (_, mon) in enumerate(raw_monitors) if mon["left"] == 0 and mon["top"] == 0),
        0,
    )
    ordered = [raw_monitors[primary_pos]] + [
        item for idx, item in enumerate(raw_monitors) if idx != primary_pos
    ]
    return [
        _display_region(mon, screen=index, mss_index=mss_index, primary=(index == 0))
        for index, (mss_index, mon) in enumerate(ordered)
    ]
|
||||
|
||||
|
||||
def _get_displays() -> list[dict]:
    """Open a capture session and return the ordered display descriptors.

    Returns:
        The list produced by ``_ordered_displays`` for a freshly opened
        ``mss.mss()`` session, which is closed again before returning.
    """
    # Only the third element returned by _import_capture_libs (mss) is needed here.
    _, _, mss = _import_capture_libs()
    with mss.mss() as sct:
        return _ordered_displays(sct)
|
||||
|
||||
|
||||
def _select_display(screen: int) -> tuple[dict, list[dict], dict]:
    """Resolve a requested screen number to a display descriptor.

    Args:
        screen: Requested position in the ordered display list.

    Returns:
        A ``(selected, displays, selection)`` tuple where ``selected`` is the
        chosen descriptor, ``displays`` is the full ordered list, and
        ``selection`` records the "requested" number, the "selected" number,
        and a "fallback" flag that is True when they differ.
    """
    displays = _get_displays()
    # An out-of-range request does not fail; it falls back to the first display.
    selected = displays[screen] if 0 <= screen < len(displays) else displays[0]
    selection = {
        "requested": screen,
        "selected": selected["screen"],
        "fallback": selected["screen"] != screen,
    }
    return selected, displays, selection
|
||||
|
||||
|
||||
def _capture_screen(screen: int = 0):
    """Grab one display and return the image with its selection metadata.

    Args:
        screen: Requested position in the ordered display list. Defaults to 0.

    Returns:
        A ``(image, mon, displays, selection)`` tuple: the captured RGB image,
        the descriptor of the display that was captured, the full ordered
        display list, and a dict with the "requested" number, the "selected"
        number, and a "fallback" flag that is True when they differ.
    """
    Image, _, mss = _import_capture_libs()
    with mss.mss() as sct:
        displays = _ordered_displays(sct)
        # An out-of-range request does not fail; it falls back to the first display.
        mon = displays[screen] if 0 <= screen < len(displays) else displays[0]
        # The descriptor stores the origin as "x"/"y"; the grab region is
        # passed with "left"/"top" keys instead.
        shot = sct.grab(
            {
                "left": mon["x"],
                "top": mon["y"],
                "width": mon["width"],
                "height": mon["height"],
            }
        )
        image = Image.frombytes("RGB", shot.size, shot.rgb)

    selection = {
        "requested": screen,
        "selected": mon["screen"],
        "fallback": mon["screen"] != screen,
    }
    return image, mon, displays, selection
|
||||
|
||||
|
||||
def _serialize_image(image, image_format: str, jpeg_quality: int) -> bytes:
|
||||
@@ -503,8 +563,9 @@ def _exec_command(req: ExecRequest) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _exec_action(req: ActionRequest) -> dict:
|
||||
def _exec_action(req: ActionRequest, screen: int = 0) -> dict:
|
||||
run_dry = SETTINGS["dry_run"] or req.dry_run
|
||||
selected_display, displays, screen_selection = _select_display(screen)
|
||||
|
||||
pyautogui = None if run_dry else _import_input_lib()
|
||||
resolved_target = None
|
||||
@@ -561,6 +622,8 @@ def _exec_action(req: ActionRequest) -> dict:
|
||||
"action": req.action,
|
||||
"executed": not run_dry,
|
||||
"dry_run": run_dry,
|
||||
"screen": screen_selection,
|
||||
"display": selected_display,
|
||||
"resolved_target": resolved_target,
|
||||
}
|
||||
|
||||
@@ -585,6 +648,18 @@ def health(_: None = Depends(_auth)):
|
||||
}
|
||||
|
||||
|
||||
@app.get("/displays")
def displays(_: None = Depends(_auth)):
    """List the detected displays.

    Returns:
        A response dict with "ok", "request_id", "time_ms", the detected
        "displays" list from ``_get_displays``, and "default_screen" set to 0.
    """
    detected = _get_displays()
    return {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
        "displays": detected,
        "default_screen": 0,
    }
|
||||
|
||||
|
||||
@app.get("/screen")
|
||||
def screen(
|
||||
with_grid: bool = True,
|
||||
@@ -594,6 +669,7 @@ def screen(
|
||||
image_format: Literal["png", "jpeg"] = "png",
|
||||
jpeg_quality: int = 85,
|
||||
asImage: bool = False,
|
||||
screen: int = 0,
|
||||
_: None = Depends(_auth),
|
||||
):
|
||||
req = ScreenRequest(
|
||||
@@ -605,8 +681,8 @@ def screen(
|
||||
jpeg_quality=jpeg_quality,
|
||||
)
|
||||
|
||||
base_img, mon = _capture_screen()
|
||||
meta = {"region": mon}
|
||||
base_img, mon, displays, screen_selection = _capture_screen(screen)
|
||||
meta = {"region": mon, "screen": screen_selection, "displays": displays}
|
||||
out_img = base_img
|
||||
|
||||
if req.with_grid:
|
||||
@@ -634,8 +710,8 @@ def screen(
|
||||
|
||||
|
||||
@app.post("/zoom")
|
||||
def zoom(req: ZoomRequest, asImage: bool = False, _: None = Depends(_auth)):
|
||||
base_img, mon = _capture_screen()
|
||||
def zoom(req: ZoomRequest, asImage: bool = False, screen: int = 0, _: None = Depends(_auth)):
|
||||
base_img, mon, displays, screen_selection = _capture_screen(screen)
|
||||
|
||||
cx = req.center_x - mon["x"]
|
||||
cy = req.center_y - mon["y"]
|
||||
@@ -655,6 +731,8 @@ def zoom(req: ZoomRequest, asImage: bool = False, _: None = Depends(_auth)):
|
||||
|
||||
meta = {
|
||||
"source_monitor": mon,
|
||||
"screen": screen_selection,
|
||||
"displays": displays,
|
||||
"region": {
|
||||
"x": region_x,
|
||||
"y": region_y,
|
||||
@@ -690,8 +768,8 @@ def zoom(req: ZoomRequest, asImage: bool = False, _: None = Depends(_auth)):
|
||||
|
||||
|
||||
@app.post("/action")
|
||||
def action(req: ActionRequest, _: None = Depends(_auth)):
|
||||
result = _exec_action(req)
|
||||
def action(req: ActionRequest, screen: int = 0, _: None = Depends(_auth)):
|
||||
result = _exec_action(req, screen)
|
||||
return {
|
||||
"ok": True,
|
||||
"request_id": _request_id(),
|
||||
@@ -722,14 +800,14 @@ def exec_command(
|
||||
|
||||
|
||||
@app.post("/ocr")
|
||||
def ocr(req: OCRRequest, _: None = Depends(_auth)):
|
||||
def ocr(req: OCRRequest, screen: int = 0, _: None = Depends(_auth)):
|
||||
source = req.mode
|
||||
if source == "image":
|
||||
image = _decode_image_base64(req.image_base64 or "")
|
||||
region = {"x": 0, "y": 0, "width": image.size[0], "height": image.size[1]}
|
||||
blocks = _run_ocr(image, req.language_hint, req.min_confidence, 0, 0)
|
||||
else:
|
||||
base_img, mon = _capture_screen()
|
||||
base_img, mon, displays, screen_selection = _capture_screen(screen)
|
||||
if source == "screen":
|
||||
image = base_img
|
||||
region = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}
|
||||
@@ -762,6 +840,8 @@ def ocr(req: OCRRequest, _: None = Depends(_auth)):
|
||||
"time_ms": _now_ms(),
|
||||
"result": {
|
||||
"mode": source,
|
||||
"screen": screen_selection if source != "image" else None,
|
||||
"display": mon if source != "image" else None,
|
||||
"language_hint": req.language_hint,
|
||||
"min_confidence": req.min_confidence,
|
||||
"region": region,
|
||||
@@ -771,11 +851,11 @@ def ocr(req: OCRRequest, _: None = Depends(_auth)):
|
||||
|
||||
|
||||
@app.post("/batch")
|
||||
def batch(req: BatchRequest, _: None = Depends(_auth)):
|
||||
def batch(req: BatchRequest, screen: int = 0, _: None = Depends(_auth)):
|
||||
results = []
|
||||
for index, item in enumerate(req.actions):
|
||||
try:
|
||||
item_result = _exec_action(item)
|
||||
item_result = _exec_action(item, screen)
|
||||
results.append({"index": index, "ok": True, "result": item_result})
|
||||
except Exception as exc:
|
||||
results.append({"index": index, "ok": False, "error": str(exc)})
|
||||
|
||||
Reference in New Issue
Block a user