feat(ocr): add /ocr endpoint for screen, region, and image input
All checks were successful
python-syntax / syntax-check (push) Successful in 5s
python-syntax / syntax-check (pull_request) Successful in 4s

This commit is contained in:
2026-04-06 13:48:33 +02:00
parent 2955426f14
commit 097c6a095c
4 changed files with 240 additions and 0 deletions

View File

@@ -146,6 +146,27 @@ class ExecRequest(BaseModel):
dry_run: bool = False
class OCRRequest(BaseModel):
mode: Literal["screen", "region", "image"] = "screen"
region_x: int | None = Field(default=None, ge=0)
region_y: int | None = Field(default=None, ge=0)
region_width: int | None = Field(default=None, gt=0)
region_height: int | None = Field(default=None, gt=0)
image_base64: str | None = None
language_hint: str | None = Field(default=None, min_length=1, max_length=64)
min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)
@model_validator(mode="after")
def _validate_mode_inputs(self):
if self.mode == "region":
required = [self.region_x, self.region_y, self.region_width, self.region_height]
if any(v is None for v in required):
raise ValueError("region_x, region_y, region_width, region_height are required for mode=region")
if self.mode == "image" and not self.image_base64:
raise ValueError("image_base64 is required for mode=image")
return self
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
token = SETTINGS["token"]
if token and x_clickthrough_token != token:
@@ -275,6 +296,101 @@ def _import_input_lib():
raise HTTPException(status_code=500, detail=f"input backend unavailable: {exc}") from exc
def _import_ocr_libs():
try:
import pytesseract
from pytesseract import Output
return pytesseract, Output
except Exception as exc:
raise HTTPException(status_code=500, detail=f"ocr backend unavailable: {exc}") from exc
def _decode_image_base64(value: str):
Image, _, _ = _import_capture_libs()
payload = value.strip()
if payload.startswith("data:"):
parts = payload.split(",", 1)
if len(parts) != 2:
raise HTTPException(status_code=400, detail="invalid data URL image payload")
payload = parts[1]
try:
image_bytes = base64.b64decode(payload, validate=True)
except Exception as exc:
raise HTTPException(status_code=400, detail="invalid image_base64 payload") from exc
try:
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
except Exception as exc:
raise HTTPException(status_code=400, detail="unsupported or unreadable image bytes") from exc
return image
def _run_ocr(image, language_hint: str | None, min_confidence: float, offset_x: int = 0, offset_y: int = 0) -> list[dict]:
pytesseract, Output = _import_ocr_libs()
config = "--oem 3 --psm 6"
kwargs = {
"image": image,
"output_type": Output.DICT,
"config": config,
}
if language_hint:
kwargs["lang"] = language_hint
try:
data = pytesseract.image_to_data(**kwargs)
except pytesseract.TesseractNotFoundError as exc:
raise HTTPException(status_code=500, detail="tesseract executable not found") from exc
except pytesseract.TesseractError as exc:
raise HTTPException(status_code=400, detail=f"ocr failed: {exc}") from exc
blocks = []
count = len(data.get("text", []))
for idx in range(count):
text = (data["text"][idx] or "").strip()
if not text:
continue
raw_conf = str(data["conf"][idx]).strip()
try:
conf_0_100 = float(raw_conf)
except ValueError:
conf_0_100 = -1.0
if conf_0_100 < 0:
continue
confidence = round(conf_0_100 / 100.0, 4)
if confidence < min_confidence:
continue
left = int(data["left"][idx])
top = int(data["top"][idx])
width = int(data["width"][idx])
height = int(data["height"][idx])
blocks.append(
{
"text": text,
"confidence": confidence,
"bbox": {
"x": left + offset_x,
"y": top + offset_y,
"width": width,
"height": height,
},
"_sort": [top + offset_y, left + offset_x, idx],
}
)
blocks.sort(key=lambda b: (b["_sort"][0], b["_sort"][1], b["_sort"][2]))
for block in blocks:
block.pop("_sort", None)
return blocks
def _pick_shell(explicit_shell: str | None) -> str:
shell_name = (explicit_shell or SETTINGS["exec_default_shell"] or "powershell").lower().strip()
if shell_name not in {"powershell", "bash", "cmd"}:
@@ -600,6 +716,55 @@ def exec_command(
}
@app.post("/ocr")
def ocr(req: OCRRequest, _: None = Depends(_auth)):
source = req.mode
if source == "image":
image = _decode_image_base64(req.image_base64 or "")
region = {"x": 0, "y": 0, "width": image.size[0], "height": image.size[1]}
blocks = _run_ocr(image, req.language_hint, req.min_confidence, 0, 0)
else:
base_img, mon = _capture_screen()
if source == "screen":
image = base_img
region = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}
offset_x = mon["x"]
offset_y = mon["y"]
else:
left = req.region_x - mon["x"]
top = req.region_y - mon["y"]
right = left + req.region_width
bottom = top + req.region_height
if left < 0 or top < 0 or right > base_img.size[0] or bottom > base_img.size[1]:
raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")
image = base_img.crop((left, top, right, bottom))
region = {
"x": req.region_x,
"y": req.region_y,
"width": req.region_width,
"height": req.region_height,
}
offset_x = req.region_x
offset_y = req.region_y
blocks = _run_ocr(image, req.language_hint, req.min_confidence, offset_x, offset_y)
return {
"ok": True,
"request_id": _request_id(),
"time_ms": _now_ms(),
"result": {
"mode": source,
"language_hint": req.language_hint,
"min_confidence": req.min_confidence,
"region": region,
"blocks": blocks,
},
}
@app.post("/batch")
def batch(req: BatchRequest, _: None = Depends(_auth)):
results = []