feat(ocr): add higher-level text search helpers
All checks were successful
python-syntax / syntax-check (push) Successful in 6s

This commit is contained in:
2026-05-01 16:23:16 +02:00
parent 8857feaf7b
commit f00c525721
4 changed files with 190 additions and 35 deletions

View File

@@ -239,6 +239,12 @@ class WaitRequest(BaseModel):
poll_interval_ms: int = Field(default=250, ge=50, le=10000)
class OCRFindRequest(OCRRequest):
    """Request body accepted by the ``/ocr/find`` endpoint.

    Inherits every field of ``OCRRequest`` and adds the text-search
    parameters declared below.
    """

    # Text to search for in the OCR output; 1 to 512 characters.
    query: str = Field(min_length=1, max_length=512)
    # Matching strategy forwarded to the text matcher; defaults to "contains".
    match: Literal["contains", "exact", "regex"] = "contains"
    # When true, OCR blocks are grouped into lines before being matched.
    group_lines: bool = True
    # Maximum number of matches returned (1 to 200, default 20).
    max_results: int = Field(default=20, ge=1, le=200)
def _auth(x_clickthrough_token: Optional[str] = Header(default=None)):
@@ -445,7 +451,11 @@ def _import_ocr_libs():
def _decode_image_base64(value: str):
Image, _, _ = _import_capture_libs()
try:
from PIL import Image
except Exception as exc:
raise HTTPException(status_code=500, detail=f"image decode backend unavailable: {exc}") from exc
payload = value.strip()
if payload.startswith("data:"):
parts = payload.split(",", 1)
@@ -773,6 +783,36 @@ def _capture_region_image(screen: int, region_x: int | None, region_y: int | Non
return crop, region, mon, displays, screen_selection
def _capture_ocr_source(req: OCRRequest, screen: int = 0):
    """Resolve the image that an OCR request should be run against.

    The image comes from one of three places, selected by ``req.mode``:

    * ``"image"``  - decoded from ``req.image_base64``; nothing is captured.
    * ``"screen"`` - the full capture returned by ``_capture_screen``.
    * otherwise    - a crop of that capture described by the request's
      ``region_x`` / ``region_y`` / ``region_width`` / ``region_height``.

    Args:
        req: The OCR request describing the source.
        screen: Screen selector forwarded to ``_capture_screen``.

    Returns:
        A 6-tuple ``(image, region, mon, displays, screen_selection, mode)``.
        ``mon``, ``displays`` and ``screen_selection`` are ``None`` in
        ``"image"`` mode.

    Raises:
        HTTPException: 400 when the requested region does not fit inside the
            captured image.
    """
    mode = req.mode

    if mode == "image":
        decoded = _decode_image_base64(req.image_base64 or "")
        whole = {"x": 0, "y": 0, "width": decoded.size[0], "height": decoded.size[1]}
        return decoded, whole, None, None, None, mode

    capture, mon, displays, screen_selection = _capture_screen(screen)

    if mode == "screen":
        monitor_area = {key: mon[key] for key in ("x", "y", "width", "height")}
        return capture, monitor_area, mon, displays, screen_selection, mode

    # Region coordinates are offset by the monitor origin to obtain
    # coordinates relative to the captured image.
    x0 = req.region_x - mon["x"]
    y0 = req.region_y - mon["y"]
    x1 = x0 + req.region_width
    y1 = y0 + req.region_height

    inside = x0 >= 0 and y0 >= 0 and x1 <= capture.size[0] and y1 <= capture.size[1]
    if not inside:
        raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")

    cropped = capture.crop((x0, y0, x1, y1))
    requested = {
        "x": req.region_x,
        "y": req.region_y,
        "width": req.region_width,
        "height": req.region_height,
    }
    return cropped, requested, mon, displays, screen_selection, mode
def _image_diff_ratio(before, after) -> float:
diff = ImageChops.difference(before, after)
stat = ImageStat.Stat(diff)
@@ -780,6 +820,84 @@ def _image_diff_ratio(before, after) -> float:
return float(sum(channel_means) / (len(channel_means) * 255.0))
def _merge_bbox(blocks: list[dict]) -> dict:
xs = [b["bbox"]["x"] for b in blocks]
ys = [b["bbox"]["y"] for b in blocks]
rights = [b["bbox"]["x"] + b["bbox"]["width"] for b in blocks]
bottoms = [b["bbox"]["y"] + b["bbox"]["height"] for b in blocks]
return {
"x": min(xs),
"y": min(ys),
"width": max(rights) - min(xs),
"height": max(bottoms) - min(ys),
}
def _group_ocr_lines(blocks: list[dict]) -> list[dict]:
if not blocks:
return []
sorted_blocks = sorted(blocks, key=lambda b: (b["bbox"]["y"], b["bbox"]["x"]))
lines: list[list[dict]] = []
current: list[dict] = []
current_center = None
for block in sorted_blocks:
bbox = block["bbox"]
center_y = bbox["y"] + (bbox["height"] / 2)
tolerance = max(10.0, bbox["height"] * 0.8)
if current and current_center is not None and abs(center_y - current_center) > tolerance:
lines.append(sorted(current, key=lambda item: item["bbox"]["x"]))
current = []
current_center = None
current.append(block)
current_center = sum(item["bbox"]["y"] + (item["bbox"]["height"] / 2) for item in current) / len(current)
if current:
lines.append(sorted(current, key=lambda item: item["bbox"]["x"]))
grouped = []
for idx, line_blocks in enumerate(lines):
text = " ".join(item["text"] for item in line_blocks).strip()
if not text:
continue
grouped.append(
{
"text": text,
"confidence": round(sum(item["confidence"] for item in line_blocks) / len(line_blocks), 4),
"bbox": _merge_bbox(line_blocks),
"blocks": line_blocks,
"line_index": idx,
}
)
return grouped
def _find_text_matches(blocks: list[dict], query: str, match_mode: str, group_lines: bool, max_results: int) -> list[dict]:
    """Search OCR output for text matching ``query``.

    Args:
        blocks: OCR blocks, each with ``text``, ``confidence`` and ``bbox``.
        query: Text to look for; it is normalized with ``_normalize_text``
            before comparison.
        match_mode: Matching strategy passed through to ``_matches_text``.
        group_lines: When true, blocks are first merged into lines with
            ``_group_ocr_lines`` and each match also carries the line's
            ``blocks`` and ``line_index``.
        max_results: Maximum number of matches to return.

    Returns:
        Up to ``max_results`` match dicts ordered by descending confidence,
        then by top and left edge of their bounding box. Empty when the
        normalized query is empty.
    """
    target = _normalize_text(query)
    if not target:
        # Candidate text that normalizes to nothing is skipped below; treat
        # the query the same way. An empty needle would otherwise be handed
        # to the matcher, where it presumably matches every candidate
        # (an empty substring / empty pattern matches anything).
        return []

    candidates = _group_ocr_lines(blocks) if group_lines else blocks
    matches = []
    for item in candidates:
        normalized = _normalize_text(item["text"])
        if not normalized or not _matches_text(normalized, target, match_mode):
            continue
        match = {
            "text": item["text"],
            "normalized_text": normalized,
            "confidence": item["confidence"],
            "bbox": item["bbox"],
            "grouped": group_lines,
        }
        if group_lines:
            match["blocks"] = item["blocks"]
            match["line_index"] = item["line_index"]
        matches.append(match)

    # Best confidence first; ties resolved in reading order.
    matches.sort(key=lambda item: (-item["confidence"], item["bbox"]["y"], item["bbox"]["x"]))
    return matches[:max_results]
def _wait_for_condition(req: WaitRequest, screen: int = 0) -> dict:
condition = req.condition
deadline = time.time() + (req.timeout_ms / 1000.0)
@@ -1317,38 +1435,10 @@ def wait(req: WaitRequest, screen: int = 0, _: None = Depends(_auth)):
@app.post("/ocr")
def ocr(req: OCRRequest, screen: int = 0, _: None = Depends(_auth)):
source = req.mode
if source == "image":
image = _decode_image_base64(req.image_base64 or "")
region = {"x": 0, "y": 0, "width": image.size[0], "height": image.size[1]}
blocks = _run_ocr(image, req.language_hint, req.min_confidence, 0, 0)
else:
base_img, mon, displays, screen_selection = _capture_screen(screen)
if source == "screen":
image = base_img
region = {"x": mon["x"], "y": mon["y"], "width": mon["width"], "height": mon["height"]}
offset_x = mon["x"]
offset_y = mon["y"]
else:
left = req.region_x - mon["x"]
top = req.region_y - mon["y"]
right = left + req.region_width
bottom = top + req.region_height
if left < 0 or top < 0 or right > base_img.size[0] or bottom > base_img.size[1]:
raise HTTPException(status_code=400, detail="requested region is outside the captured monitor")
image = base_img.crop((left, top, right, bottom))
region = {
"x": req.region_x,
"y": req.region_y,
"width": req.region_width,
"height": req.region_height,
}
offset_x = req.region_x
offset_y = req.region_y
blocks = _run_ocr(image, req.language_hint, req.min_confidence, offset_x, offset_y)
image, region, mon, displays, screen_selection, source = _capture_ocr_source(req, screen)
offset_x = region["x"] if source != "image" else 0
offset_y = region["y"] if source != "image" else 0
blocks = _run_ocr(image, req.language_hint, req.min_confidence, offset_x, offset_y)
return {
"ok": True,
@@ -1366,6 +1456,35 @@ def ocr(req: OCRRequest, screen: int = 0, _: None = Depends(_auth)):
}
@app.post("/ocr/find")
def ocr_find(req: OCRFindRequest, screen: int = 0, _: None = Depends(_auth)):
    """Run OCR on the requested source and return the text matches.

    The source image is resolved with ``_capture_ocr_source``, OCR is run
    with the request's language hint and confidence floor, and the resulting
    blocks are searched with ``_find_text_matches``.

    Args:
        req: Search request (source selection plus query parameters).
        screen: Screen selector forwarded to the capture helper.
        _: Authentication dependency; its value is unused.

    Returns:
        A response envelope (``ok``, ``request_id``, ``time_ms``) whose
        ``result`` echoes the request parameters and lists the matches.
        ``screen`` and ``display`` are ``None`` for ``"image"`` sources.
    """
    image, region, mon, _displays, screen_selection, source = _capture_ocr_source(req, screen)
    from_capture = source != "image"

    # OCR coordinates are shifted by the region origin for captured sources;
    # uploaded images are reported with a zero offset.
    if from_capture:
        offset_x, offset_y = region["x"], region["y"]
    else:
        offset_x = offset_y = 0

    blocks = _run_ocr(image, req.language_hint, req.min_confidence, offset_x, offset_y)
    matches = _find_text_matches(blocks, req.query, req.match, req.group_lines, req.max_results)

    envelope = {
        "ok": True,
        "request_id": _request_id(),
        "time_ms": _now_ms(),
    }
    envelope["result"] = {
        "mode": source,
        "screen": screen_selection if from_capture else None,
        "display": mon if from_capture else None,
        "language_hint": req.language_hint,
        "min_confidence": req.min_confidence,
        "query": req.query,
        "match": req.match,
        "group_lines": req.group_lines,
        "region": region,
        "matches": matches,
        "match_count": len(matches),
        "blocks_considered": len(blocks),
    }
    return envelope
@app.post("/batch")
def batch(req: BatchRequest, screen: int = 0, _: None = Depends(_auth)):
results = []