Files
roa2web-service-auto/tests/ocr-validation/test_receipts_parallel.py
Marius Mutu 495790411f feat(ocr): Add docTR OCR engine with metrics infrastructure
Add docTR as primary OCR engine with 2-tier sequential processing,
OCR metrics tracking, and simplified engine selection.

Features:
- docTR OCR engine with light+medium preprocessing tiers
- doctr_plus mode with early exit optimization (~65% fast path)
- OCR metrics dashboard with per-engine statistics
- User OCR preference persistence
- Parallel worker pool for OCR processing
- Cross-validation for extraction quality

Engine options: tesseract, doctr, doctr_plus (recommended), paddleocr

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 05:37:16 +02:00

136 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""Test receipts in PARALLEL to measure real worker benefit."""
import json
import os
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from jose import jwt
# Base URL of the API server under test. Set OCR_API_BASE to target another
# host or port; the default is unchanged.
API_BASE = os.getenv("OCR_API_BASE", "http://localhost:8000")
# Folder containing the receipt PDFs named in the expected-results file.
# The default is a machine-specific worktree path, so OCR_PDF_FOLDER lets
# the script run elsewhere without editing it.
PDF_FOLDER = os.getenv(
    "OCR_PDF_FOLDER",
    "/mnt/e/proiecte/ab-worktrees/doctr-ocr-metrics/docs/data-entry",
)
# JSON file listing the receipts to submit. Override with OCR_EXPECTED_FILE.
EXPECTED_FILE = os.getenv(
    "OCR_EXPECTED_FILE",
    "/mnt/e/proiecte/ab-worktrees/doctr-ocr-metrics/tests/ocr-validation/expected_receipts.json",
)
def get_jwt_token():
    """Build a signed access token with hard-coded test claims.

    The signing key is read from the ``JWT_SECRET_KEY`` environment variable,
    falling back to a placeholder string when the variable is unset. The
    token carries fixed user/company/permission claims, is issued "now" and
    expires one hour later, and is signed with HS256.

    Returns:
        Whatever ``jwt.encode`` returns for the payload (the encoded token).
    """
    # Function-scope import: the module header only imports ``datetime`` and
    # ``timedelta`` from the datetime module.
    from datetime import timezone

    secret_key = os.getenv('JWT_SECRET_KEY', 'generate_with_secrets_token_urlsafe_32')
    # Use an aware UTC timestamp; ``datetime.utcnow()`` returns a naive value
    # and is deprecated since Python 3.12.
    # NOTE(review): assumes the JWT library converts aware datetimes in
    # "exp"/"iat" to epoch seconds the same way it did naive UTC ones - confirm.
    now = datetime.now(timezone.utc)
    payload = {
        "username": "MARIUS",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
        "exp": now + timedelta(hours=1),
        "iat": now,
        "type": "access",
    }
    return jwt.encode(payload, secret_key, algorithm="HS256")
def submit_job(pdf_path, headers, engine="doctr_plus"):
    """Upload one PDF to the OCR extract endpoint and return immediately.

    Args:
        pdf_path: Path of the PDF file to upload.
        headers: HTTP headers sent with the request.
        engine: Value of the ``engine`` query parameter. Defaults to
            ``"doctr_plus"``, the value that used to be hard-coded in the URL.

    Returns:
        A ``(job_id, filename, error)`` tuple. On success ``job_id`` is the
        ``job_id`` field of the JSON response and ``error`` is ``None``. On
        any failure ``job_id`` is ``None`` and ``error`` is a short
        description of what went wrong.
    """
    filename = os.path.basename(pdf_path)
    try:
        with open(pdf_path, "rb") as f:
            files = {"file": (filename, f, "application/pdf")}
            response = requests.post(
                f"{API_BASE}/api/data-entry/ocr/extract",
                params={"engine": engine},
                files=files,
                headers=headers,
                timeout=30,
            )
        if response.status_code != 200:
            return None, filename, f"HTTP {response.status_code}"
        job_id = response.json().get("job_id")
        if not job_id:
            # Without this, a 200 response lacking a job id was reported to
            # the caller as a failure with error ``None``.
            return None, filename, "HTTP 200 but response has no job_id"
        return job_id, filename, None
    except Exception as e:
        # Deliberately broad: any failure (missing file, network error, bad
        # JSON) is reported per file instead of aborting the whole run.
        return None, filename, str(e)
def wait_for_job(job_id, filename, headers, timeout=180):
    """Poll the job's wait endpoint until it finishes or ``timeout`` elapses.

    Args:
        job_id: Identifier of the job to wait for.
        filename: Not used by this function; kept so existing callers that
            pass it continue to work.
        headers: HTTP headers sent with every poll request.
        timeout: Overall budget in seconds. A poll already in flight is not
            interrupted, so the call can overrun by up to one request timeout.

    Returns:
        A dict with ``"success"`` (bool) and ``"time"`` (elapsed seconds),
        plus ``"conf"`` on success or ``"error"`` on failure. When the budget
        runs out the error is ``"timeout"``, extended with the most recent
        request failure if the last poll did not succeed.
    """
    # Monotonic clock: elapsed time must not jump if the wall clock changes.
    start = time.monotonic()
    last_error = None
    while time.monotonic() - start < timeout:
        try:
            resp = requests.get(
                f"{API_BASE}/api/data-entry/ocr/jobs/{job_id}/wait?timeout=30",
                headers=headers, timeout=35
            )
            if resp.status_code == 200:
                data = resp.json()
                status = data.get("status")
                if status == "completed":
                    # ``or`` also covers an explicit null "result" or
                    # confidence, which ``.get(key, default)`` passes through.
                    result = data.get("result") or {}
                    conf = result.get("overall_confidence") or 0
                    return {"success": True, "conf": conf, "time": time.monotonic() - start}
                if status == "error":
                    return {"success": False, "error": data.get("error", "unknown"), "time": time.monotonic() - start}
                # Healthy poll, job simply not finished yet.
                last_error = None
            else:
                last_error = f"HTTP {resp.status_code}"
        except Exception as e:
            # Keep polling on transient failures, but remember the cause so a
            # final timeout does not hide it.
            last_error = str(e) or type(e).__name__
        time.sleep(1)
    error = "timeout" if last_error is None else f"timeout (last error: {last_error})"
    return {"success": False, "error": error, "time": time.monotonic() - start}
def _load_single_page_receipts(expected_file):
    """Return the receipt entries from the JSON file whose page count is 1."""
    with open(expected_file, encoding="utf-8") as f:
        data = json.load(f)
    # Accept both {"receipts": [...]} and a bare top-level list; calling
    # ``.get`` on a list would raise AttributeError.
    receipts = data.get("receipts", data) if isinstance(data, dict) else data
    return [r for r in receipts if r.get("pages", 1) == 1]


def _submit_all(receipts, headers):
    """Submit each receipt whose PDF exists; return (job_id, filename) pairs."""
    jobs = []
    for r in receipts:
        pdf_path = os.path.join(PDF_FOLDER, r["filename"])
        if not os.path.exists(pdf_path):
            # Report the skip so the job count is not silently lower than
            # the receipt count.
            print(f" Skipped (PDF not found): {r['filename']}")
            continue
        job_id, filename, error = submit_job(pdf_path, headers)
        if job_id:
            jobs.append((job_id, filename))
        else:
            print(f" Submit failed: {filename} - {error}")
    return jobs


def _wait_all(jobs, headers):
    """Wait for every job concurrently, printing one line per finished job."""
    results = []
    # One worker per job so no wait is queued behind another; ``max`` keeps
    # the pool size valid when nothing was submitted.
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as executor:
        futures = {executor.submit(wait_for_job, job_id, fn, headers): fn
                   for job_id, fn in jobs}
        for future in as_completed(futures):
            filename = futures[future]
            result = future.result()
            result["filename"] = filename
            results.append(result)
            if result["success"]:
                print(f" OK: {filename[:45]:47} {result['time']:5.1f}s conf={result['conf']:.0%}")
            else:
                print(f" ERR: {filename[:45]:47} {result['time']:5.1f}s {result.get('error','?')}")
    return results


def _print_summary(results, submit_time, wait_time, total_time):
    """Print success counts, phase durations and per-job timing statistics."""
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    successful = [r for r in results if r["success"]]
    print(f"Success: {len(successful)}/{len(results)}")
    print(f"Submit phase: {submit_time:.1f}s")
    print(f"Wait phase: {wait_time:.1f}s")
    print(f"TOTAL TIME: {total_time:.1f}s")
    if successful:
        times = [r["time"] for r in successful]
        print(f"\nPer-job: avg={sum(times)/len(times):.1f}s, min={min(times):.1f}s, max={max(times):.1f}s")


def main():
    """Submit all single-page receipts, wait for them in parallel, report timings.

    Phase 1 submits every job sequentially. Phase 2 waits for all of them
    concurrently. A summary with success counts and durations is printed at
    the end.
    """
    receipts = _load_single_page_receipts(EXPECTED_FILE)
    token = get_jwt_token()
    headers = {"Authorization": f"Bearer {token}"}
    print(f"\n{'='*60}")
    print(f"PARALLEL TEST: {len(receipts)} receipts")
    print(f"{'='*60}\n")

    # PHASE 1: Submit ALL jobs rapidly
    print("Phase 1: Submitting all jobs...")
    total_start = time.time()
    jobs = _submit_all(receipts, headers)
    submit_time = time.time() - total_start
    print(f"Submitted {len(jobs)} jobs in {submit_time:.1f}s")

    # PHASE 2: Wait for ALL results in parallel
    print("\nPhase 2: Waiting for results...")
    wait_start = time.time()
    results = _wait_all(jobs, headers)
    # Capture both durations right after the pool drains so the time spent
    # printing the summary is not counted.
    finished = time.time()
    wait_time = finished - wait_start
    total_time = finished - total_start

    _print_summary(results, submit_time, wait_time, total_time)


if __name__ == "__main__":
    main()