#!/usr/bin/env python3
"""Test receipts in PARALLEL to measure real worker benefit."""
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests
from jose import jwt

API_BASE = "http://localhost:8000"

# Paths - relative to project root
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent.parent
PDF_FOLDER = str(PROJECT_ROOT / "tests" / "fixtures" / "ocr-samples")
EXPECTED_FILE = str(SCRIPT_DIR / "expected_receipts.json")

# Upper bound on threads that poll for job results concurrently.
MAX_WAIT_WORKERS = 26


def get_jwt_token() -> str:
    """Build a signed HS256 access token for the test user.

    The signing key is read from the ``JWT_SECRET_KEY`` environment variable,
    falling back to a placeholder value when it is unset.

    Returns:
        The encoded JWT, valid for one hour from the time of the call.
    """
    secret_key = os.getenv('JWT_SECRET_KEY', 'generate_with_secrets_token_urlsafe_32')
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    now = datetime.now(timezone.utc)
    payload = {
        "username": "MARIUS",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
        "exp": now + timedelta(hours=1),
        "iat": now,
        "type": "access",
    }
    return jwt.encode(payload, secret_key, algorithm="HS256")


def submit_job(pdf_path, headers):
    """Submit OCR job and return job_id immediately.

    Args:
        pdf_path: Path of the PDF file to upload.
        headers: HTTP headers sent with the request (carries the auth token).

    Returns:
        A ``(job_id, filename, error)`` tuple. On success ``job_id`` is set
        and ``error`` is ``None``; on any failure ``job_id`` is ``None`` and
        ``error`` is a short description. Never raises.
    """
    filename = os.path.basename(pdf_path)
    try:
        with open(pdf_path, "rb") as f:
            files = {"file": (filename, f, "application/pdf")}
            response = requests.post(
                f"{API_BASE}/api/data-entry/ocr/extract?engine=doctr_plus",
                files=files,
                headers=headers,
                timeout=30,
            )
        if response.status_code != 200:
            return None, filename, f"HTTP {response.status_code}"
        job_id = response.json().get("job_id")
        if not job_id:
            # A 200 without a job id is still a failed submission; say so
            # instead of reporting an error of None.
            return None, filename, "response did not contain a job_id"
        return job_id, filename, None
    except Exception as e:
        # Best-effort: any I/O, network or decoding problem is reported to
        # the caller as a failed submission rather than aborting the run.
        return None, filename, str(e)


def wait_for_job(job_id, filename, headers, timeout=180):
    """Wait for job completion.

    Polls the job's ``/wait`` endpoint until the job reports ``completed`` or
    ``error``, or until ``timeout`` seconds have elapsed.

    Args:
        job_id: Identifier returned by :func:`submit_job`.
        filename: Not used here; kept so existing callers keep working.
        headers: HTTP headers sent with each poll (carries the auth token).
        timeout: Overall time budget in seconds.

    Returns:
        A dict with ``"success"`` and ``"time"`` (seconds spent waiting),
        plus ``"conf"`` on success or ``"error"`` on failure. When the budget
        runs out, ``"error"`` is ``"timeout"``, extended with the most recent
        polling error if the last poll failed.
    """
    start = time.time()
    last_error = None
    while time.time() - start < timeout:
        try:
            resp = requests.get(
                f"{API_BASE}/api/data-entry/ocr/jobs/{job_id}/wait?timeout=30",
                headers=headers,
                timeout=35,
            )
            if resp.status_code == 200:
                data = resp.json()
                status = data.get("status")
                if status == "completed":
                    # "result" / "overall_confidence" may be present but
                    # null; treat that like a missing value.
                    result = data.get("result") or {}
                    conf = result.get("overall_confidence") or 0
                    return {"success": True, "conf": conf, "time": time.time() - start}
                if status == "error":
                    return {
                        "success": False,
                        "error": data.get("error", "unknown"),
                        "time": time.time() - start,
                    }
                # Healthy poll, job simply not finished yet.
                last_error = None
            else:
                last_error = f"HTTP {resp.status_code}"
        except Exception as e:
            # Best-effort polling: remember the failure and retry until the
            # time budget is exhausted.
            last_error = str(e)
        time.sleep(1)

    error = "timeout" if last_error is None else f"timeout (last error: {last_error})"
    return {"success": False, "error": error, "time": time.time() - start}


def _load_single_page_receipts(path):
    """Return the single-page receipt entries listed in the JSON file at *path*.

    The file may hold either ``{"receipts": [...]}`` or a bare list of
    entries. Entries without a ``"pages"`` key count as single-page.
    """
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    receipts = data["receipts"] if isinstance(data, dict) else data
    return [r for r in receipts if r.get("pages", 1) == 1]


def main() -> None:
    """Submit every single-page receipt, wait for all results, print a summary."""
    # Load receipts
    receipts = _load_single_page_receipts(EXPECTED_FILE)

    token = get_jwt_token()
    headers = {"Authorization": f"Bearer {token}"}

    print(f"\n{'='*60}")
    print(f"PARALLEL TEST: {len(receipts)} receipts")
    print(f"{'='*60}\n")

    # PHASE 1: Submit ALL jobs rapidly
    print("Phase 1: Submitting all jobs...")
    total_start = time.time()

    jobs = []
    for r in receipts:
        pdf_path = os.path.join(PDF_FOLDER, r["filename"])
        if not os.path.exists(pdf_path):
            # Make skipped fixtures visible so the submitted count is
            # explainable from the output.
            print(f" Skipped (file not found): {r['filename']}")
            continue
        job_id, filename, error = submit_job(pdf_path, headers)
        if job_id:
            jobs.append((job_id, filename))
        else:
            print(f" Submit failed: {filename} - {error}")

    submit_time = time.time() - total_start
    print(f"Submitted {len(jobs)} jobs in {submit_time:.1f}s")

    # PHASE 2: Wait for ALL results in parallel
    print("\nPhase 2: Waiting for results...")
    wait_start = time.time()

    results = []
    with ThreadPoolExecutor(max_workers=MAX_WAIT_WORKERS) as executor:
        futures = {
            executor.submit(wait_for_job, job_id, fn, headers): fn
            for job_id, fn in jobs
        }
        for future in as_completed(futures):
            filename = futures[future]
            result = future.result()
            result["filename"] = filename
            results.append(result)
            if result["success"]:
                print(f" OK: {filename[:45]:47} {result['time']:5.1f}s conf={result['conf']:.0%}")
            else:
                print(f" ERR: {filename[:45]:47} {result['time']:5.1f}s {result.get('error','?')}")

    total_time = time.time() - total_start

    # Summary
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")

    successful = [r for r in results if r["success"]]

    print(f"Success: {len(successful)}/{len(results)}")
    print(f"Submit phase: {submit_time:.1f}s")
    print(f"Wait phase: {time.time() - wait_start:.1f}s")
    print(f"TOTAL TIME: {total_time:.1f}s")

    if successful:
        times = [r["time"] for r in successful]
        print(f"\nPer-job: avg={sum(times)/len(times):.1f}s, min={min(times):.1f}s, max={max(times):.1f}s")


if __name__ == "__main__":
    main()