#!/usr/bin/env python3
r"""
Parallel OCR test for Windows.

Submits every single-page receipt listed in expected_receipts.json to the OCR
endpoint, then waits for all jobs concurrently and reports timing, confidence
and (when psutil is installed) peak backend memory.

Run from backend directory:
python tests\ocr-validation\test_receipts_parallel_windows.py
"""
import argparse
import json
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests
from jose import jwt

try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False
    print("Warning: psutil not installed, memory tracking disabled")

# Paths are derived from this script's location, not the current directory.
SCRIPT_DIR = Path(__file__).parent
# NOTE(review): BACKEND_DIR is not used in this file; if the script lives under
# backend/tests/ocr-validation/ this resolves to backend/backend - confirm.
BACKEND_DIR = SCRIPT_DIR.parent.parent / "backend"
PDF_FOLDER = SCRIPT_DIR.parent.parent / "tests" / "fixtures" / "ocr-samples"
EXPECTED_FILE = SCRIPT_DIR / "expected_receipts.json"

# Number of threads long-polling job status at the same time. Jobs beyond this
# count are only waited on (and timed) once one of the threads frees up.
MAX_WAIT_THREADS = 26


class MemoryMonitor:
    """Track memory usage of the backend process and its children (OCR workers).

    The backend is located via the PID owning the listening socket on ``port``.
    A daemon thread samples the combined RSS every 0.5 s and records the peak
    in MB. All methods are no-ops when psutil is not installed.
    """

    def __init__(self, port=8006):
        self.port = port
        self.peak_memory_mb = 0
        self.current_memory_mb = 0
        self._stop_event = threading.Event()
        self._thread = None
        self._process = None

    def _find_backend_process(self):
        """Return the psutil.Process listening on ``self.port``, or None."""
        if not PSUTIL_AVAILABLE:
            return None
        try:
            for conn in psutil.net_connections(kind="inet"):
                # ``laddr`` can be an empty tuple and ``pid`` can be None when
                # the owner is not visible to us. psutil.Process(None) would
                # silently return *this* process, so such entries are skipped.
                if (
                    conn.status == "LISTEN"
                    and conn.laddr
                    and conn.laddr.port == self.port
                    and conn.pid is not None
                ):
                    return psutil.Process(conn.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            pass
        return None

    def _get_total_memory(self):
        """Return RSS of the backend plus all descendants in MB (0 if unknown)."""
        if not self._process:
            self._process = self._find_backend_process()
            if not self._process:
                return 0
        try:
            total = self._process.memory_info().rss
            # OCR workers are expected to be (possibly nested) child processes.
            for child in self._process.children(recursive=True):
                try:
                    total += child.memory_info().rss
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass  # child exited or is inaccessible; skip it
            return total / (1024 * 1024)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Backend process is gone or inaccessible; forget it so the next
            # sample looks it up again.
            self._process = None
            return 0

    def _monitor_loop(self):
        """Sample memory every 0.5 s until stop() is called."""
        while not self._stop_event.is_set():
            mem = self._get_total_memory()
            if mem > 0:
                self.current_memory_mb = mem
                if mem > self.peak_memory_mb:
                    self.peak_memory_mb = mem
            self._stop_event.wait(0.5)

    def start(self):
        """Start background sampling (no-op without psutil)."""
        if not PSUTIL_AVAILABLE:
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        # Give the sampler time to take an initial reading.
        time.sleep(1)

    def stop(self):
        """Stop sampling and return the peak memory in MB (0 if never measured)."""
        if self._thread:
            self._stop_event.set()
            self._thread.join(timeout=2)
        return self.peak_memory_mb


def get_jwt_token():
    """Build a one-hour HS256 access token for the hard-coded test user.

    The key comes from JWT_SECRET_KEY; presumably it must match the backend's
    secret, and the fallback string looks like a placeholder - confirm.
    """
    secret_key = os.getenv("JWT_SECRET_KEY", "generate_with_secrets_token_urlsafe_32")
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
    now = datetime.now(timezone.utc)
    payload = {
        "username": "MARIUS",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
        "exp": now + timedelta(hours=1),
        "iat": now,
        "type": "access",
    }
    return jwt.encode(payload, secret_key, algorithm="HS256")


def submit_job(pdf_path, headers, api_base):
    """Upload one PDF to the OCR endpoint without waiting for the result.

    Returns:
        (job_id, filename, None) on success, or (None, filename, error_message).
    """
    filename = os.path.basename(pdf_path)
    try:
        with open(pdf_path, "rb") as f:
            files = {"file": (filename, f, "application/pdf")}
            response = requests.post(
                f"{api_base}/api/data-entry/ocr/extract?engine=doctr_plus",
                files=files,
                headers=headers,
                timeout=30,
            )
        if response.status_code != 200:
            return None, filename, f"HTTP {response.status_code}: {response.text[:100]}"
        job_id = response.json().get("job_id")
        if not job_id:
            return None, filename, f"No job_id in response: {response.text[:100]}"
        return job_id, filename, None
    except Exception as e:  # best-effort per file: report and move on
        return None, filename, str(e)


def wait_for_job(job_id, filename, headers, api_base, timeout=180):
    """Long-poll the job status endpoint until completion, failure or timeout.

    Returns a dict with ``success``, ``time`` (seconds since this call started),
    ``filename`` and either ``conf`` (success) or ``error`` (failure).
    """
    start = time.time()
    last_error = None
    while time.time() - start < timeout:
        try:
            resp = requests.get(
                f"{api_base}/api/data-entry/ocr/jobs/{job_id}/wait?timeout=30",
                headers=headers,
                timeout=35,  # slightly above the server-side long-poll timeout
            )
            if resp.status_code == 200:
                data = resp.json()
                status = data.get("status")
                if status == "completed":
                    # The server may send explicit nulls; treat them as empty / 0.
                    result = data.get("result") or {}
                    conf = result.get("overall_confidence") or 0
                    return {"success": True, "conf": conf,
                            "time": time.time() - start, "filename": filename}
                if status in ("error", "failed"):
                    return {"success": False, "error": data.get("error", "unknown"),
                            "time": time.time() - start, "filename": filename}
            else:
                last_error = f"HTTP {resp.status_code}"
        except Exception as e:  # transient network/JSON problems: retry
            last_error = str(e)
        # Always pause between polls so non-200 responses cannot busy-loop.
        time.sleep(1)
    error = "timeout" if last_error is None else f"timeout (last error: {last_error})"
    return {"success": False, "error": error,
            "time": time.time() - start, "filename": filename}


def _load_receipts():
    """Return single-page receipts from EXPECTED_FILE, or None if it is missing."""
    if not EXPECTED_FILE.exists():
        print(f"ERROR: {EXPECTED_FILE} not found!")
        return None
    # JSON is UTF-8; "utf-8-sig" also accepts a BOM written by Windows editors.
    with open(EXPECTED_FILE, encoding="utf-8-sig") as f:
        data = json.load(f)
    # Accept either {"receipts": [...]} or a bare list of receipts.
    receipts = data.get("receipts", data) if isinstance(data, dict) else data
    return [r for r in receipts if r.get("pages", 1) == 1]


def _submit_all(receipts, headers, api_base):
    """Submit every receipt PDF that exists; return a list of (job_id, filename)."""
    jobs = []
    for r in receipts:
        pdf_path = PDF_FOLDER / r["filename"]
        if not pdf_path.exists():
            print(f" File not found: {r['filename']}")
            continue
        job_id, filename, error = submit_job(str(pdf_path), headers, api_base)
        if job_id:
            jobs.append((job_id, filename))
        else:
            print(f" Submit failed: {filename} - {error}")
    return jobs


def _wait_all(jobs, headers, api_base):
    """Wait for all jobs concurrently, printing each result as it arrives."""
    results = []
    with ThreadPoolExecutor(max_workers=MAX_WAIT_THREADS) as executor:
        futures = [
            executor.submit(wait_for_job, job_id, fn, headers, api_base)
            for job_id, fn in jobs
        ]
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            if result["success"]:
                print(f" OK: {result['filename'][:45]:47} {result['time']:5.1f}s conf={result['conf']:.0%}")
            else:
                print(f" ERR: {result['filename'][:45]:47} {result['time']:5.1f}s {result.get('error', '?')}")
    return results


def _append_result(output_file, result_data):
    """Append result_data to the JSON list in output_file, creating it if needed."""
    path = Path(output_file)
    all_results = []
    if path.exists():
        try:
            with open(path, encoding="utf-8-sig") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Warning: could not read {output_file} ({e}); starting a new results list")
        else:
            # Keep a non-list payload instead of crashing on append or dropping it.
            all_results = loaded if isinstance(loaded, list) else [loaded]
    all_results.append(result_data)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)
    print(f"\nResults saved to: {output_file}")


def run_test(api_base, workers, output_file=None, port=8006):
    """Run the submit-then-wait OCR test and return a summary dict.

    Args:
        api_base: Base URL of the backend, e.g. "http://localhost:8006".
        workers: Number of OCR workers; used only for labeling the output.
        output_file: Optional JSON file; the summary is appended to its list.
        port: Backend port, used to locate the process for memory tracking.

    Returns:
        The summary dict, or None when EXPECTED_FILE does not exist.
    """
    receipts = _load_receipts()
    if receipts is None:
        return None

    token = get_jwt_token()
    headers = {"Authorization": f"Bearer {token}"}

    memory_monitor = MemoryMonitor(port=port)
    memory_monitor.start()

    print()
    print("=" * 60)
    print(f"TEST: {len(receipts)} receipts, {workers} worker(s)")
    print(f"Backend: {api_base}")
    print("=" * 60)
    print()

    # PHASE 1: submit every job as fast as possible.
    print("Phase 1: Submitting all jobs...")
    total_start = time.time()
    jobs = _submit_all(receipts, headers, api_base)
    submit_time = time.time() - total_start
    print(f"Submitted {len(jobs)} jobs in {submit_time:.1f}s")
    print()

    # PHASE 2: wait for all results in parallel.
    print("Phase 2: Waiting for results...")
    wait_start = time.time()
    results = _wait_all(jobs, headers, api_base)
    total_time = time.time() - total_start
    wait_time = time.time() - wait_start

    peak_memory_mb = memory_monitor.stop()

    print()
    print("=" * 60)
    print(f"SUMMARY - {workers} WORKER(S)")
    print("=" * 60)

    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]

    print(f"Success: {len(successful)}/{len(results)}")
    print(f"Submit phase: {submit_time:.1f}s")
    print(f"Wait phase: {wait_time:.1f}s")
    print(f"TOTAL TIME: {total_time:.1f}s")
    if peak_memory_mb > 0:
        print(f"PEAK MEMORY: {peak_memory_mb:.0f} MB")

    times = [r["time"] for r in successful]
    avg_time = sum(times) / len(times) if times else 0
    min_time = min(times, default=0)
    max_time = max(times, default=0)
    avg_conf = sum(r["conf"] for r in successful) / len(successful) if successful else 0

    if successful:
        print(f"\nPer-job: avg={avg_time:.1f}s, min={min_time:.1f}s, max={max_time:.1f}s")

    if failed:
        print(f"\nFailed jobs ({len(failed)}):")
        for r in failed:
            print(f" - {r['filename']}: {r.get('error', '?')}")

    result_data = {
        "workers": workers,
        "total_receipts": len(receipts),
        "submitted": len(jobs),
        "successful": len(successful),
        "failed": len(failed),
        "submit_time": round(submit_time, 1),
        "wait_time": round(wait_time, 1),
        "total_time": round(total_time, 1),
        "avg_time": round(avg_time, 1),
        "min_time": round(min_time, 1),
        "max_time": round(max_time, 1),
        "avg_confidence": round(avg_conf * 100, 1),
        "peak_memory_mb": round(peak_memory_mb, 0),
        "timestamp": datetime.now().isoformat(),
    }

    if output_file:
        _append_result(output_file, result_data)

    return result_data


def main():
    """Parse CLI arguments and run the test; return a process exit code."""
    parser = argparse.ArgumentParser(description="Parallel OCR Test")
    parser.add_argument("--port", type=int, default=8006, help="Backend port")
    parser.add_argument("--host", default="localhost", help="Backend host")
    parser.add_argument("--workers", type=int, default=1, help="Number of OCR workers (for labeling)")
    parser.add_argument("--output", type=str, help="Output JSON file for results")
    args = parser.parse_args()

    api_base = f"http://{args.host}:{args.port}"
    result = run_test(api_base, args.workers, args.output, port=args.port)
    # Non-zero exit when the test could not run at all (e.g. missing fixture).
    return 0 if result is not None else 1


if __name__ == "__main__":
    sys.exit(main())