Files
roa2web-service-auto/tests/ocr-validation/test_receipts_parallel_windows.py
Claude Agent 62f86250cc refactor(docs): consolidate and cleanup documentation
- Delete 9 deprecated/obsolete docs (~6,300 lines removed)
- Move test PDFs to tests/fixtures/ocr-samples/
- Create docs/DEPLOYMENT.md as principal guide
- Create tests/ocr-validation/README.md
- Update all refs for ultrathin monolith architecture
- Update OCR tests to use relative paths

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 09:14:51 +00:00

315 lines
11 KiB
Python

#!/usr/bin/env python3
r"""
Parallel OCR test for Windows.

Submits the single-page receipt PDFs listed in expected_receipts.json to the
backend OCR endpoint, waits for all jobs concurrently, and prints per-job
timing and confidence plus (when psutil is installed) peak backend memory.

Run from backend directory: python tests\ocr-validation\test_receipts_parallel_windows.py
"""
import argparse
import json
import os
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path
import requests
from jose import jwt
try:
import psutil
PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
print("Warning: psutil not installed, memory tracking disabled")
# Paths - all derived from this script's own location (Path(__file__)),
# not from the current working directory.
# Directory that contains this script.
SCRIPT_DIR = Path(__file__).parent
# "backend" directory two levels above the script directory.
BACKEND_DIR = SCRIPT_DIR.parent.parent / "backend"
# Folder holding the sample PDFs that are submitted to the OCR endpoint.
PDF_FOLDER = SCRIPT_DIR.parent.parent / "tests" / "fixtures" / "ocr-samples"
# JSON manifest of the receipts to test, stored next to this script.
EXPECTED_FILE = SCRIPT_DIR / "expected_receipts.json"
class MemoryMonitor:
    """Monitor memory usage of backend process and its children (OCR workers).

    The backend is identified as the process listening on ``port``. Memory
    (RSS of the process plus all of its descendants) is sampled every 0.5s
    on a daemon thread, and the highest value seen is kept in
    ``peak_memory_mb``. When psutil is not installed the monitor does
    nothing and the peak stays at 0.
    """

    def __init__(self, port=8006):
        """Create a monitor for the process listening on ``port``."""
        self.port = port
        self.peak_memory_mb = 0
        self.current_memory_mb = 0
        self._stop_event = threading.Event()
        self._thread = None
        self._process = None

    def _find_backend_process(self):
        """Find the backend process by port.

        Returns:
            A ``psutil.Process`` for the listener on ``self.port``, or None
            when psutil is unavailable, no owned listener is found, or the
            lookup is denied.
        """
        if not PSUTIL_AVAILABLE:
            return None
        try:
            for conn in psutil.net_connections(kind='inet'):
                # Skip sockets without a local address (defensive) and
                # sockets whose owner could not be determined: with
                # pid=None, psutil.Process() would silently return the
                # *current* process, i.e. this test script, not the backend.
                if not conn.laddr or conn.pid is None:
                    continue
                if conn.laddr.port == self.port and conn.status == 'LISTEN':
                    return psutil.Process(conn.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            pass
        return None

    def _get_total_memory(self):
        """Get total memory of backend + all child processes (OCR workers).

        Returns:
            Combined RSS in MB, or 0 when the backend process cannot be
            found or inspected.
        """
        if not self._process:
            self._process = self._find_backend_process()
        if not self._process:
            return 0
        try:
            # Get memory of main process
            total = self._process.memory_info().rss
            # Add memory of all child processes (OCR workers)
            for child in self._process.children(recursive=True):
                try:
                    total += child.memory_info().rss
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # A child that exited or is inaccessible is just skipped.
                    pass
            return total / (1024 * 1024)  # Convert to MB
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Forget the cached handle so the next sample looks it up again.
            self._process = None
            return 0

    def _monitor_loop(self):
        """Background thread that monitors memory every 0.5s."""
        while not self._stop_event.is_set():
            mem = self._get_total_memory()
            if mem > 0:
                self.current_memory_mb = mem
                if mem > self.peak_memory_mb:
                    self.peak_memory_mb = mem
            # wait() instead of sleep() so stop() takes effect promptly.
            self._stop_event.wait(0.5)

    def start(self):
        """Start monitoring in background thread (no-op without psutil)."""
        if not PSUTIL_AVAILABLE:
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        # Wait a bit to get initial reading
        time.sleep(1)

    def stop(self):
        """Stop monitoring and return peak memory in MB (0 if never sampled)."""
        if self._thread:
            self._stop_event.set()
            self._thread.join(timeout=2)
        return self.peak_memory_mb
def get_jwt_token():
    """Build a one-hour HS256 access token for the test user.

    The signing key is read from the JWT_SECRET_KEY environment variable,
    falling back to a placeholder string when the variable is unset.

    Returns:
        The encoded JWT as returned by ``jwt.encode``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    secret_key = os.getenv('JWT_SECRET_KEY', 'generate_with_secrets_token_urlsafe_32')
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and returns
    # a naive value that is easy to misinterpret as local time.
    now = datetime.now(timezone.utc)
    payload = {
        "username": "MARIUS",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
        "exp": now + timedelta(hours=1),
        "iat": now,
        "type": "access",
    }
    return jwt.encode(payload, secret_key, algorithm="HS256")
def submit_job(pdf_path, headers, api_base):
    """Submit OCR job and return job_id immediately.

    Args:
        pdf_path: Path of the PDF file to upload.
        headers: HTTP headers to send (carries the Authorization token).
        api_base: Base URL of the backend, e.g. "http://localhost:8006".

    Returns:
        A ``(job_id, filename, error)`` tuple. On success ``error`` is None;
        on any failure ``job_id`` is None and ``error`` is a short message.
    """
    filename = os.path.basename(pdf_path)
    try:
        with open(pdf_path, "rb") as f:
            files = {"file": (filename, f, "application/pdf")}
            response = requests.post(
                f"{api_base}/api/data-entry/ocr/extract?engine=doctr_plus",
                files=files,
                headers=headers,
                timeout=30,
            )
        if response.status_code != 200:
            return None, filename, f"HTTP {response.status_code}: {response.text[:100]}"
        job_id = response.json().get("job_id")
        if not job_id:
            # A 200 without a job id is still a failed submission; say so
            # instead of handing the caller an error of None.
            return None, filename, f"no job_id in response: {response.text[:100]}"
        return job_id, filename, None
    except Exception as e:
        # Best-effort: any I/O, network or decoding problem is reported to
        # the caller as a message so one bad file does not abort the run.
        return None, filename, str(e)
def wait_for_job(job_id, filename, headers, api_base, timeout=180):
    """Wait for job completion.

    Repeatedly calls the job's ``/wait`` endpoint until the job reports a
    final status or ``timeout`` seconds have elapsed.

    Args:
        job_id: Identifier returned when the job was submitted.
        filename: Name of the submitted file; echoed back in the result.
        headers: HTTP headers to send (carries the Authorization token).
        api_base: Base URL of the backend.
        timeout: Overall time budget in seconds.

    Returns:
        A dict with ``success``, ``time`` (seconds spent waiting) and
        ``filename``; plus ``conf`` on success or ``error`` on failure.
    """
    # Monotonic clock: elapsed time must not jump if the wall clock changes.
    start = time.monotonic()
    last_error = None
    while time.monotonic() - start < timeout:
        try:
            resp = requests.get(
                f"{api_base}/api/data-entry/ocr/jobs/{job_id}/wait?timeout=30",
                headers=headers,
                timeout=35,
            )
            if resp.status_code == 200:
                last_error = None
                data = resp.json()
                status = data.get("status")
                if status == "completed":
                    # Tolerate an explicit null result / confidence.
                    result = data.get("result") or {}
                    conf = result.get("overall_confidence") or 0
                    return {"success": True, "conf": conf, "time": time.monotonic() - start, "filename": filename}
                if status in ("error", "failed"):
                    return {"success": False, "error": data.get("error", "unknown"), "time": time.monotonic() - start, "filename": filename}
            else:
                last_error = f"HTTP {resp.status_code}"
        except Exception as e:
            # Keep polling, but remember why the last attempt failed so a
            # timeout caused by e.g. a dead backend is distinguishable.
            last_error = str(e) or type(e).__name__
        time.sleep(1)
    error = "timeout" if last_error is None else f"timeout (last error: {last_error})"
    return {"success": False, "error": error, "time": time.monotonic() - start, "filename": filename}
def _load_single_page_receipts(expected_file):
    """Return the single-page receipt entries from the expected-results JSON."""
    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(expected_file, encoding="utf-8") as f:
        data = json.load(f)
    # Accept either {"receipts": [...]} or a bare top-level list.
    receipts = data.get("receipts", []) if isinstance(data, dict) else data
    return [r for r in receipts if r.get("pages", 1) == 1]


def _append_result(output_file, result_data):
    """Append one run's result dict to the JSON list stored in output_file."""
    all_results = []
    if Path(output_file).exists():
        try:
            with open(output_file, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt history: start a fresh list.
            loaded = []
        # Only a list can be appended to; anything else is discarded.
        all_results = loaded if isinstance(loaded, list) else []
    all_results.append(result_data)
    with open(output_file, 'w', encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)


def run_test(api_base, workers, output_file=None, port=8006):
    """Run test and return results dict.

    Submits every single-page receipt from EXPECTED_FILE whose PDF exists in
    PDF_FOLDER, waits for all jobs concurrently, prints a summary and
    optionally appends the summary to a JSON file.

    Args:
        api_base: Base URL of the backend.
        workers: Number of OCR workers; used only for labeling the output.
        output_file: Optional path of a JSON file to append the results to.
        port: Backend port, used to locate the process for memory tracking.

    Returns:
        The summary dict, or None when EXPECTED_FILE does not exist.
    """
    # Load receipts
    if not EXPECTED_FILE.exists():
        print(f"ERROR: {EXPECTED_FILE} not found!")
        return None
    receipts = _load_single_page_receipts(EXPECTED_FILE)

    token = get_jwt_token()
    headers = {"Authorization": f"Bearer {token}"}

    # Start memory monitoring
    memory_monitor = MemoryMonitor(port=port)
    memory_monitor.start()
    try:
        header = f"TEST: {len(receipts)} receipts, {workers} worker(s)"
        print()
        print("=" * 60)
        print(header)
        print(f"Backend: {api_base}")
        print("=" * 60)
        print()

        # PHASE 1: Submit ALL jobs rapidly
        print("Phase 1: Submitting all jobs...")
        total_start = time.time()
        jobs = []
        for r in receipts:
            pdf_path = PDF_FOLDER / r["filename"]
            if not pdf_path.exists():
                print(f" File not found: {r['filename']}")
                continue
            job_id, filename, error = submit_job(str(pdf_path), headers, api_base)
            if job_id:
                jobs.append((job_id, filename))
            else:
                print(f" Submit failed: {filename} - {error}")
        submit_time = time.time() - total_start
        print(f"Submitted {len(jobs)} jobs in {submit_time:.1f}s")
        print()

        # PHASE 2: Wait for ALL results in parallel
        print("Phase 2: Waiting for results...")
        wait_start = time.time()
        results = []
        # One waiter per job so no wait is queued behind another (a queued
        # waiter would start its own timer late and skew per-job times).
        with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as executor:
            futures = {
                executor.submit(wait_for_job, job_id, fn, headers, api_base): fn
                for job_id, fn in jobs
            }
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                if result["success"]:
                    print(f" OK: {result['filename'][:45]:47} {result['time']:5.1f}s conf={result['conf']:.0%}")
                else:
                    print(f" ERR: {result['filename'][:45]:47} {result['time']:5.1f}s {result.get('error', '?')}")
        total_time = time.time() - total_start
        wait_time = time.time() - wait_start
    finally:
        # Stop memory monitoring and get peak, even if a phase raised.
        peak_memory_mb = memory_monitor.stop()

    # Summary
    print()
    print("=" * 60)
    print(f"SUMMARY - {workers} WORKER(S)")
    print("=" * 60)
    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]
    print(f"Success: {len(successful)}/{len(results)}")
    print(f"Submit phase: {submit_time:.1f}s")
    print(f"Wait phase: {wait_time:.1f}s")
    print(f"TOTAL TIME: {total_time:.1f}s")
    if peak_memory_mb > 0:
        print(f"PEAK MEMORY: {peak_memory_mb:.0f} MB")

    avg_time = min_time = max_time = avg_conf = 0
    if successful:
        times = [r["time"] for r in successful]
        avg_time = sum(times) / len(times)
        min_time = min(times)
        max_time = max(times)
        avg_conf = sum(r["conf"] for r in successful) / len(successful)
        print(f"\nPer-job: avg={avg_time:.1f}s, min={min_time:.1f}s, max={max_time:.1f}s")
    if failed:
        print(f"\nFailed jobs ({len(failed)}):")
        for r in failed:
            print(f" - {r['filename']}: {r.get('error', '?')}")

    # Build result dict
    result_data = {
        "workers": workers,
        "total_receipts": len(receipts),
        "submitted": len(jobs),
        "successful": len(successful),
        "failed": len(failed),
        "submit_time": round(submit_time, 1),
        "wait_time": round(wait_time, 1),
        "total_time": round(total_time, 1),
        "avg_time": round(avg_time, 1),
        "min_time": round(min_time, 1),
        "max_time": round(max_time, 1),
        "avg_confidence": round(avg_conf * 100, 1),
        "peak_memory_mb": round(peak_memory_mb, 0),
        "timestamp": datetime.now().isoformat(),
    }

    # Write to file if specified (appends to existing results)
    if output_file:
        _append_result(output_file, result_data)
        print(f"\nResults saved to: {output_file}")
    return result_data
def main():
    """Command-line entry point: parse the options and run one test pass."""
    option_specs = (
        ("--port", {"type": int, "default": 8006, "help": "Backend port"}),
        ("--host", {"default": "localhost", "help": "Backend host"}),
        ("--workers", {"type": int, "default": 1, "help": "Number of OCR workers (for labeling)"}),
        ("--output", {"type": str, "help": "Output JSON file for results"}),
    )
    cli = argparse.ArgumentParser(description="Parallel OCR Test")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    # The same port serves two purposes: building the URL and locating the
    # backend process for memory tracking.
    base_url = "http://" + opts.host + ":" + str(opts.port)
    run_test(base_url, opts.workers, opts.output, port=opts.port)


if __name__ == "__main__":
    main()