- Delete 9 deprecated/obsolete docs (~6,300 lines removed) - Move test PDFs to tests/fixtures/ocr-samples/ - Create docs/DEPLOYMENT.md as principal guide - Create tests/ocr-validation/README.md - Update all refs for ultrathin monolith architecture - Update OCR tests to use relative paths Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
234 lines
8.0 KiB
Python
234 lines
8.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Test each receipt sequentially and report results."""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import requests
|
|
from datetime import datetime, timedelta
|
|
from jose import jwt
|
|
|
|
# Base URL of the API under test.
API_BASE = "http://localhost:8000"

# Paths are anchored to this script's location rather than the working
# directory, so the script can be launched from anywhere.
from pathlib import Path

# resolve() makes the path absolute first.  Without it, a relative __file__
# (possible when a script is run directly on Python < 3.9) gives
# Path("script.py").parent == Path("."), whose .parent is again ".", so
# PROJECT_ROOT would silently point at the current directory.
SCRIPT_DIR = Path(__file__).resolve().parent
# The project root is two directory levels above this script's directory.
PROJECT_ROOT = SCRIPT_DIR.parent.parent
# Kept as plain strings because the rest of the script uses os.path / open().
PDF_FOLDER = str(PROJECT_ROOT / "tests" / "fixtures" / "ocr-samples")
EXPECTED_FILE = str(SCRIPT_DIR / "expected_receipts.json")
|
|
|
|
def get_jwt_token():
    """Create a test JWT access token for API authentication.

    The signing key is read from the ``JWT_SECRET_KEY`` environment variable;
    when it is unset a placeholder string is used instead.  The token is
    signed with HS256 and expires one hour after it is issued.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Function-scope import: the file-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    secret_key = os.getenv('JWT_SECRET_KEY', 'generate_with_secrets_token_urlsafe_32')

    # Timezone-aware "now".  datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to misinterpret as local time.
    now = datetime.now(timezone.utc)
    expire = now + timedelta(hours=1)

    # Hard-coded test identity sent to the API.
    payload = {
        "username": "MARIUS",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
        "exp": expire,
        "iat": now,
        "type": "access",
    }
    return jwt.encode(payload, secret_key, algorithm="HS256")
|
|
|
|
def _normalize_total(val):
    """Return *val* as a float, accepting ``,`` as decimal separator; None stays None."""
    if val is None:
        return None
    return float(str(val).replace(',', '.'))


def _normalize_cui(val):
    """Return *val* upper-cased with ``RO`` and spaces removed; None stays None."""
    if val is None:
        return None
    return str(val).upper().replace('RO', '').replace(' ', '').strip()


def test_receipt(pdf_path: str, expected: dict, headers: dict) -> dict:
    """Run one receipt PDF through the OCR API and compare it with expectations.

    The PDF is uploaded to the ``doctr_plus`` extraction endpoint, the
    resulting job is polled until it completes, and the extracted total,
    date and CUI are compared with the values in *expected*.

    Args:
        pdf_path: Path of the PDF file to upload.
        expected: Expected values; the keys ``total``, ``data_bon`` and
            ``cui_furnizor`` are read.  A missing or empty key disables the
            corresponding check.
        headers: HTTP headers sent with every request.

    Returns:
        A dict with the keys ``filename``, ``success``, ``time_ms``,
        ``error``, ``extracted``, ``matches`` and ``issues``.  ``error`` is
        set (and ``success`` stays False) when the run could not finish;
        ``issues`` lists the fields whose values did not match.  Exceptions
        are not propagated; they are reported through ``error``.
    """
    filename = os.path.basename(pdf_path)
    result = {
        "filename": filename,
        "success": False,
        "time_ms": 0,
        "error": None,
        "extracted": {},
        "matches": {},
        "issues": [],
    }

    start_time = time.time()
    # Bound before polling so the status check after the loop still works
    # when no poll ever returned HTTP 200 (previously a NameError).
    job_result = {}

    try:
        with open(pdf_path, "rb") as f:
            files = {"file": (filename, f, "application/pdf")}
            response = requests.post(
                f"{API_BASE}/api/data-entry/ocr/extract?engine=doctr_plus",
                files=files,
                headers=headers,
                timeout=120,
            )

        if response.status_code != 200:
            result["error"] = f"HTTP {response.status_code}"
            return result

        job_id = response.json().get("job_id")
        if not job_id:
            # Without an id the poll URL would be ".../jobs/None/wait".
            result["error"] = "No job_id in extract response"
            return result

        # Up to 60 polls.  Each request passes timeout=30 to the wait
        # endpoint, so the worst case is far longer than 60 seconds.
        for _ in range(60):
            poll_response = requests.get(
                f"{API_BASE}/api/data-entry/ocr/jobs/{job_id}/wait?timeout=30",
                headers=headers,
                timeout=35,
            )
            if poll_response.status_code == 200:
                job_result = poll_response.json()
                status = job_result.get("status")
                if status == "completed":
                    break
                if status == "error":
                    result["error"] = job_result.get("error", "Unknown error")
                    return result
            time.sleep(1)

        if job_result.get("status") != "completed":
            result["error"] = f"Timeout - status: {job_result.get('status')}"
            return result

        # Map API field names onto the names used in the report.
        # "or {}" also covers a payload whose "result" is present but null.
        extraction = job_result.get("result") or {}
        extracted = {
            "total": extraction.get("amount"),  # API uses "amount" not "total"
            "date": extraction.get("receipt_date"),  # API uses "receipt_date" not "date"
            "cui": extraction.get("cui"),
            "tva_total": extraction.get("tva_total"),
            "confidence": extraction.get("overall_confidence"),
        }
        result["extracted"] = extracted

        # Expected values use the field names of expected_receipts.json.
        exp_total = expected.get("total")
        exp_date = expected.get("data_bon")
        exp_cui = expected.get("cui_furnizor")

        ext_total = _normalize_total(extracted["total"])
        exp_total_norm = _normalize_total(exp_total)
        ext_cui = _normalize_cui(extracted["cui"])
        exp_cui_norm = _normalize_cui(exp_cui)

        # A match value of None means "could not be compared".
        matches = result["matches"]
        if ext_total is not None and exp_total_norm is not None:
            # Explicit None checks so an extracted 0.0 is still compared.
            matches["total"] = abs(ext_total - exp_total_norm) < 0.01
        else:
            matches["total"] = None
        matches["date"] = extracted["date"] == exp_date if exp_date else None
        matches["cui"] = ext_cui == exp_cui_norm if exp_cui else None

        # A field is an issue only when an expectation exists and was not met.
        if exp_total and not matches["total"]:
            result["issues"].append(f"TOTAL: got {extracted['total']}, expected {exp_total}")
        if exp_date and not matches["date"]:
            result["issues"].append(f"DATE: got {extracted['date']}, expected {exp_date}")
        if exp_cui and not matches["cui"]:
            result["issues"].append(f"CUI: got {extracted['cui']}, expected {exp_cui}")

        result["success"] = not result["issues"]

    except Exception as e:
        # Per-receipt boundary: one bad receipt must not abort the whole run.
        result["error"] = str(e)
    finally:
        # Single place for the elapsed time; runs on every exit path,
        # including the early returns above.
        result["time_ms"] = int((time.time() - start_time) * 1000)

    return result


# The name starts with "test_", so pytest would otherwise try to collect this
# helper as a test and fail looking up fixtures for its parameters.
test_receipt.__test__ = False
|
|
|
|
def _print_summary(results, times):
    """Print counts, timing statistics and the list of problematic receipts."""
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")

    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"] and not r["error"]]
    errors = [r for r in results if r["error"]]

    # Guard the percentage: results is empty when every receipt was skipped
    # (or none were listed), which previously raised ZeroDivisionError.
    success_rate = len(successful) * 100 / len(results) if results else 0.0

    print(f"Total: {len(results)}")
    print(f"Success: {len(successful)} ({success_rate:.1f}%)")
    print(f"Failed: {len(failed)}")
    print(f"Errors: {len(errors)}")

    if times:
        avg_time = sum(times) / len(times)
        print(f"\nTiming: avg={avg_time:.0f}ms, min={min(times)}ms, max={max(times)}ms")

        # Flag receipts that took more than twice the average.
        slow_threshold = avg_time * 2
        slow = [r for r in results if r["time_ms"] > slow_threshold and not r["error"]]
        if slow:
            print(f"\nSlow receipts (>{slow_threshold:.0f}ms):")
            for r in slow:
                print(f" - {r['filename']}: {r['time_ms']}ms")

    if failed:
        print("\nFailed receipts:")
        for r in failed:
            print(f" - {r['filename']}: {'; '.join(r['issues'])}")

    if errors:
        print("\nError receipts:")
        for r in errors:
            print(f" - {r['filename']}: {r['error']}")


def main():
    """Test every single-page receipt sequentially and print a report.

    Loads the expected values from ``EXPECTED_FILE``, runs each receipt whose
    PDF exists in ``PDF_FOLDER`` through ``test_receipt`` and prints one
    status line per receipt followed by a summary.  Exits with status 1 if
    no JWT token could be created.
    """
    # Explicit encoding so the JSON is read the same way on every platform.
    with open(EXPECTED_FILE, encoding="utf-8") as f:
        expected_data = json.load(f)

    # Handle both formats: list or dict with "receipts" key
    if isinstance(expected_data, dict) and "receipts" in expected_data:
        all_receipts = expected_data["receipts"]
    else:
        all_receipts = expected_data

    token = get_jwt_token()
    if not token:
        print("ERROR: Could not get JWT token")
        sys.exit(1)

    headers = {"Authorization": f"Bearer {token}"}

    # Only single-page receipts are tested; entries without a "pages" key
    # count as single-page.
    receipts = [r for r in all_receipts if r.get("pages", 1) == 1]
    print(f"\n{'='*60}")
    print(f"Testing {len(receipts)} single-page receipts with doctr_plus")
    print(f"{'='*60}\n")

    results = []
    times = []  # durations of the receipts that finished without an error

    for i, receipt in enumerate(receipts, 1):
        filename = receipt["filename"]
        pdf_path = os.path.join(PDF_FOLDER, filename)

        if not os.path.exists(pdf_path):
            print(f"[{i:02d}/{len(receipts)}] SKIP: {filename} (not found)")
            continue

        print(f"[{i:02d}/{len(receipts)}] Testing: {filename[:50]}...", end=" ", flush=True)

        result = test_receipt(pdf_path, receipt, headers)
        results.append(result)

        if result["error"]:
            print(f"ERROR ({result['time_ms']}ms): {result['error']}")
        elif result["success"]:
            # The key can be present with a None value, which ":.2f" cannot
            # format; fall back to 0 in that case as well.
            confidence = result['extracted'].get('confidence') or 0
            print(f"OK ({result['time_ms']}ms) conf={confidence:.2f}")
            times.append(result["time_ms"])
        else:
            print(f"FAIL ({result['time_ms']}ms): {'; '.join(result['issues'])}")
            times.append(result["time_ms"])

    _print_summary(results, times)


if __name__ == "__main__":
    main()
|