#!/usr/bin/env python3
"""
Store Profile Generator Script

Analyzes PDF receipts from a store and generates a Python profile class
for the OCR extraction system.

Usage:
    python scripts/generate_store_profile.py \\
        --name "Magazin Exemplu" \\
        --cui "12345678" \\
        --receipts "docs/data-entry/MagazinExemplu*.pdf" \\
        --output "backend/modules/data_entry/services/ocr/profiles/magazin_exemplu.py"

Features:
    - Submits PDFs to OCR API
    - Analyzes extracted text for patterns (TVA, total, date, payment)
    - Generates a BaseStoreProfile subclass with detected patterns
    - Supports hot-reload via ProfileRegistry

Requirements:
    - Backend server running on localhost:8000
    - JWT authentication
    - python-jose, requests packages
"""

import argparse
import glob
import json
import os
import re
import sys
import time
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

try:
    import requests
    from jose import jwt
except ImportError:
    # Do not exit at import time: the pure analysis/generation helpers below
    # need neither package. The check is enforced by _require_dependencies().
    requests = None
    jwt = None

# Configuration
API_BASE = os.getenv("API_BASE", "http://localhost:8000")
# NOTE(review): the fallback below is a hard-coded signing secret. It is only
# suitable for local development; set JWT_SECRET_KEY in any shared environment.
JWT_SECRET = os.getenv("JWT_SECRET_KEY", "GENERATE_NEW_SECRET_FOR_PRODUCTION3334!")


def _require_dependencies() -> None:
    """Exit with status 1 if the third-party packages could not be imported."""
    if requests is None or jwt is None:
        print("Error: Required packages not installed.")
        print("Run: pip install python-jose requests")
        sys.exit(1)


def _parse_amount(value) -> Optional[float]:
    """Convert an extracted amount to float, or return None if it is unparsable.

    Accepts numbers and numeric strings; a decimal comma ("12,50") is
    tolerated because OCR output frequently uses it.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        pass
    if isinstance(value, str):
        try:
            return float(value.strip().replace(",", "."))
        except ValueError:
            return None
    return None


def _docstring_safe(text: str) -> str:
    """Make *text* safe to embed inside a generated triple-double-quoted docstring."""
    # Backslashes would start escape sequences and '"""' would close the
    # docstring early; neutralise both.
    return text.replace("\\", "\\\\").replace('"""', "'''")


def create_jwt_token() -> str:
    """Create a test JWT token (HS256, valid for one hour) for API authentication."""
    _require_dependencies()
    # Single timestamp so "iat" and "exp" are exactly one hour apart.
    now = datetime.now(timezone.utc)
    payload = {
        "username": "PROFILE_GENERATOR",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
        "exp": now + timedelta(hours=1),
        "iat": now,
        "type": "access"
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")


def submit_ocr(pdf_path: str, token: str, api_base: str = API_BASE, timeout: int = 120) -> Optional[Dict]:
    """
    Submit a PDF to OCR API and wait for result.

    Progress is printed to stdout on a single line per file.

    Args:
        pdf_path: Path to PDF file
        token: JWT authentication token
        api_base: API base URL
        timeout: Max seconds to wait for completion

    Returns:
        Extraction result dict or None on failure
    """
    _require_dependencies()
    headers = {"Authorization": f"Bearer {token}"}
    filename = os.path.basename(pdf_path)

    print(f" Submitting: {filename}...", end=" ", flush=True)

    try:
        with open(pdf_path, "rb") as f:
            files = {"file": (filename, f, "application/pdf")}
            response = requests.post(
                f"{api_base}/api/data-entry/ocr/extract?engine=doctr_plus",
                files=files,
                headers=headers,
                timeout=30
            )

        if response.status_code != 200:
            print(f"FAILED (HTTP {response.status_code})")
            return None

        job_data = response.json()
        job_id = job_data.get("job_id")

        if not job_id:
            print("FAILED (no job_id)")
            return None

        # Poll for completion. A monotonic clock keeps the deadline immune to
        # wall-clock adjustments.
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            poll_response = requests.get(
                f"{api_base}/api/data-entry/ocr/jobs/{job_id}/wait?timeout=30",
                headers=headers,
                timeout=35
            )

            if poll_response.status_code == 200:
                job_result = poll_response.json()
                status = job_result.get("status")

                if status == "completed":
                    elapsed = time.monotonic() - start_time
                    print(f"OK ({elapsed:.1f}s)")
                    return job_result.get("result", {})
                elif status == "error":
                    print(f"ERROR: {job_result.get('error', 'Unknown')}")
                    return None

            # Non-200 response or job still running: back off before retrying.
            time.sleep(2)

        print("TIMEOUT")
        return None

    except Exception as e:
        # Deliberate best-effort boundary: one bad file must not abort the batch.
        print(f"EXCEPTION: {e}")
        return None


def analyze_tva_patterns(results: List[Dict]) -> Dict:
    """
    Analyze TVA patterns from multiple extraction results.

    Args:
        results: Extraction result dicts; the "tva_entries" and "raw_text"
            keys are read when present.

    Returns:
        Dict with detected patterns and statistics: "codes", "percents",
        "formats", "has_multi_rate", "is_non_vat" and "dominant_format".
    """
    tva_entries = []
    raw_texts = []

    for r in results:
        if r.get("tva_entries"):
            tva_entries.extend(r["tva_entries"])
        if r.get("raw_text"):
            raw_texts.append(r["raw_text"])

    # Analyze TVA code patterns (A, B, C, etc.)
    codes = Counter(e.get("code") for e in tva_entries if e.get("code"))

    # Analyze TVA percentage patterns
    percents = Counter(e.get("percent") for e in tva_entries if e.get("percent"))

    # Detect TVA format from raw text (each format counted once per receipt)
    tva_formats = defaultdict(int)
    for text in raw_texts:
        text_upper = text.upper()

        # Standard format: "TVA 19% 10.50" or "TVA: 19% 10.50"
        if re.search(r'TVA\s*:?\s*\d{1,2}%', text_upper):
            tva_formats["standard"] += 1

        # Lidl format: "TVA A 21% 7.71"
        if re.search(r'TVA\s+[A-D]\s+\d{1,2}', text_upper):
            tva_formats["lidl_multi_rate"] += 1

        # Table format: "BAZA TVA | % TVA | VALOARE TVA"
        if re.search(r'BAZA\s+TVA', text_upper):
            tva_formats["table"] += 1

        # No TVA (neplatitor)
        if re.search(r'NEPLATITOR|NON.?TVA', text_upper):
            tva_formats["non_vat"] += 1

    return {
        "codes": dict(codes),
        "percents": dict(percents),
        "formats": dict(tva_formats),
        "has_multi_rate": len(codes) > 1,
        "is_non_vat": tva_formats.get("non_vat", 0) > 0,
        "dominant_format": max(tva_formats, key=tva_formats.get) if tva_formats else "standard"
    }


def analyze_total_patterns(results: List[Dict]) -> Dict:
    """
    Analyze TOTAL patterns from extraction results.

    Args:
        results: Extraction result dicts; the "amount" and "raw_text" keys
            are read when present.

    Returns:
        Dict with "count" (results carrying a parsable amount), "formats"
        (label -> number of receipts) and "dominant_format".
    """
    totals = []
    raw_texts = []

    for r in results:
        if r.get("amount"):
            # Amounts that cannot be parsed are skipped instead of aborting
            # the whole analysis.
            amount = _parse_amount(r["amount"])
            if amount is not None:
                totals.append(amount)
        if r.get("raw_text"):
            raw_texts.append(r["raw_text"])

    total_formats = defaultdict(int)
    for text in raw_texts:
        text_upper = text.upper()

        if re.search(r'TOTAL\s*:?\s*[\d.,]+', text_upper):
            total_formats["TOTAL:"] += 1
        if re.search(r'TOTAL\s+DE\s+PLAT', text_upper):
            total_formats["TOTAL DE PLATA"] += 1
        if re.search(r'SUMA\s+TOTAL', text_upper):
            total_formats["SUMA TOTALA"] += 1
        if re.search(r'GRAND\s*TOTAL', text_upper):
            total_formats["GRAND TOTAL"] += 1

    return {
        "count": len(totals),
        "formats": dict(total_formats),
        "dominant_format": max(total_formats, key=total_formats.get) if total_formats else "TOTAL"
    }


def analyze_date_patterns(results: List[Dict]) -> Dict:
    """
    Analyze date patterns from extraction results.

    Args:
        results: Extraction result dicts; the "receipt_date" and "raw_text"
            keys are read when present.

    Returns:
        Dict with "extracted_dates", "formats" (label -> number of receipts)
        and "dominant_format" (defaults to "DD.MM.YYYY").
    """
    dates = []
    raw_texts = []

    for r in results:
        if r.get("receipt_date"):
            dates.append(r["receipt_date"])
        if r.get("raw_text"):
            raw_texts.append(r["raw_text"])

    date_formats = defaultdict(int)
    for text in raw_texts:
        # DD.MM.YYYY
        if re.search(r'\d{2}\.\d{2}\.\d{4}', text):
            date_formats["DD.MM.YYYY"] += 1
        # YYYY.MM.DD (OMV/SOCAR style)
        if re.search(r'\d{4}\.\d{2}\.\d{2}', text):
            date_formats["YYYY.MM.DD"] += 1
        # DD-MM-YYYY
        if re.search(r'\d{2}-\d{2}-\d{4}', text):
            date_formats["DD-MM-YYYY"] += 1
        # DD/MM/YYYY
        if re.search(r'\d{2}/\d{2}/\d{4}', text):
            date_formats["DD/MM/YYYY"] += 1

    return {
        "extracted_dates": dates,
        "formats": dict(date_formats),
        "dominant_format": max(date_formats, key=date_formats.get) if date_formats else "DD.MM.YYYY"
    }


def analyze_payment_patterns(results: List[Dict]) -> Dict:
    """
    Analyze payment method patterns.

    Args:
        results: Extraction result dicts; "payment_methods" is read when
            present and may be missing or None.

    Returns:
        Dict with "methods" (method name -> occurrences) and
        "has_mixed_payments".
    """
    payment_counts = defaultdict(int)

    for r in results:
        # "or []" also covers a key that is present but set to None.
        for m in r.get("payment_methods") or []:
            method_type = m.get("method", "UNKNOWN")
            payment_counts[method_type] += 1

    return {
        "methods": dict(payment_counts),
        "has_mixed_payments": len(payment_counts) > 1
    }


def analyze_client_patterns(results: List[Dict]) -> Dict:
    """
    Analyze client (B2B) patterns.

    Args:
        results: Extraction result dicts; "client_cui" and "client_name"
            are read when present.

    Returns:
        Dict with "has_client_cui", "has_client_name" and "b2b_ratio"
        (share of results carrying a client CUI; 0 for empty input).
    """
    has_client_cui = sum(1 for r in results if r.get("client_cui"))
    has_client_name = sum(1 for r in results if r.get("client_name"))

    return {
        "has_client_cui": has_client_cui > 0,
        "has_client_name": has_client_name > 0,
        "b2b_ratio": has_client_cui / len(results) if results else 0
    }


def generate_profile_code(
    store_name: str,
    cui: str,
    tva_analysis: Dict,
    total_analysis: Dict,
    date_analysis: Dict,
    payment_analysis: Dict,
    client_analysis: Dict
) -> str:
    """
    Generate Python profile class code.

    Args:
        store_name: Human-readable store name
        cui: CUI number (without RO prefix)
        *_analysis: Analysis results from pattern detection
            (total_analysis and payment_analysis are currently not used by
            the generated code)

    Returns:
        Python source code for the profile class
    """
    # Generate class name from store name
    class_name = "".join(
        word.capitalize()
        for word in re.sub(r'[^a-zA-Z0-9\s]', '', store_name).split()
    ) + "Profile"
    # Only ASCII letters/digits survive the filter above, so the single way to
    # end up with an invalid identifier is a leading digit (e.g. "7 Eleven").
    if not class_name[0].isalpha():
        class_name = "Store" + class_name

    # Determine profile characteristics
    is_non_vat = bool(tva_analysis.get("is_non_vat", False))
    has_multi_rate = bool(tva_analysis.get("has_multi_rate", False))
    has_client_cui = bool(client_analysis.get("has_client_cui", False))
    uses_yyyy_mm_dd = date_analysis.get("dominant_format") == "YYYY.MM.DD"

    # Generate OCR name patterns
    name_words = store_name.upper().split()
    primary_word = name_words[0] if name_words else store_name.upper()

    name_patterns = [
        primary_word,
        store_name.upper().replace(".", "").replace(",", ""),
    ]

    # Add OCR error variants (letters commonly misread as digits)
    ocr_variants = {
        'O': '0', 'I': '1', 'L': '1', 'S': '5', 'B': '8', 'E': '3'
    }
    for char, replacement in ocr_variants.items():
        if char in primary_word:
            name_patterns.append(primary_word.replace(char, replacement, 1))

    # Unique (order-preserving), non-empty, max 4
    name_patterns = [p for p in dict.fromkeys(name_patterns) if p][:4]

    # Text embedded in the generated source must be escaped, otherwise a quote
    # or backslash in the store name yields a file that does not compile.
    # json.dumps(..., ensure_ascii=False) emits a double-quoted literal whose
    # escapes are also valid Python and keeps diacritics readable.
    store_name_literal = json.dumps(store_name, ensure_ascii=False)
    cui_literal = json.dumps(str(cui), ensure_ascii=False)
    doc_name = _docstring_safe(store_name)

    # Build the code
    code_lines = [
        '"""',
        f'{doc_name} store profile for OCR extraction.',
        '',
        'Auto-generated by generate_store_profile.py',
        f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M")}',
        '"""',
        '',
        'import re',
        'from decimal import Decimal, InvalidOperation',
        'from typing import List, Dict, Any',
        '',
        'from .base import BaseStoreProfile',
        'from . import ProfileRegistry',
        '',
        '',
        '@ProfileRegistry.register',
        f'class {class_name}(BaseStoreProfile):',
        '    """',
        f'    {doc_name} - OCR extraction profile.',
        '',
    ]

    # Add characteristics to docstring
    characteristics = []
    if is_non_vat:
        characteristics.append("Non-VAT payer (neplatitor TVA)")
    if has_multi_rate:
        characteristics.append("Multi-rate TVA")
    if has_client_cui:
        characteristics.append("B2B receipts with client CUI")
    if uses_yyyy_mm_dd:
        characteristics.append("Date format: YYYY.MM.DD")

    if characteristics:
        code_lines.append('    Key characteristics:')
        code_lines.extend(f'    - {c}' for c in characteristics)
        code_lines.append('')

    code_lines.extend([
        '    """',
        '',
        f'    CUI_LIST = [{cui_literal}]',
        f'    NAME_PATTERNS = {name_patterns}',
        f'    STORE_NAME = {store_name_literal}',
        '',
    ])

    # Add date patterns override for YYYY.MM.DD format
    if uses_yyyy_mm_dd:
        code_lines.extend([
            '    # Override date patterns for YYYY.MM.DD format',
            '    DATE_PATTERNS_OCR_SPACES = [',
            r"        r'(\d{4})[.,]\s*(\d{2})[.,]\s*(\d{2})',  # YYYY. MM. DD with spaces",
            r"        r'(\d{4})[.,](\d{2})[.,](\d{2})',  # YYYY.MM.DD",
            '    ]',
            '',
        ])

    # Add TVA extraction method for multi-rate or non-VAT
    if is_non_vat:
        code_lines.extend([
            '    def extract_tva_entries(self, text: str) -> List[dict]:',
            '        """Non-VAT payer - returns empty list."""',
            '        return []',
            '',
        ])
    elif has_multi_rate and tva_analysis.get("dominant_format") == "lidl_multi_rate":
        code_lines.extend([
            '    # Store-specific TVA patterns',
            '    TVA_PATTERNS = [',
            r"        r'T[VU][AR]\s+([A-D])\s+(\d{1,2})[.,]?\d{0,2}\s*%\s+([\d.,]+)',",
            '    ]',
            '',
            '    def extract_tva_entries(self, text: str) -> List[dict]:',
            '        """Extract multi-rate TVA entries."""',
            '        entries = []',
            '        seen = set()',
            '',
            '        for pattern in self.TVA_PATTERNS:',
            '            for match in re.finditer(pattern, text, re.IGNORECASE):',
            '                try:',
            '                    code = match.group(1).upper()',
            '                    percent = int(match.group(2))',
            '                    amount = self._parse_decimal(match.group(3))',
            '',
            '                    if amount and amount > 0:',
            '                        entry_key = (code, percent)',
            '                        if entry_key not in seen:',
            '                            entries.append({',
            "                                'code': code,",
            "                                'percent': percent,",
            "                                'amount': amount",
            '                            })',
            '                            seen.add(entry_key)',
            '                except (ValueError, InvalidOperation):',
            '                    continue',
            '',
            '        return entries',
            '',
        ])

    # Add validation hints method
    code_lines.extend([
        '    def get_validation_hints(self) -> Dict[str, Any]:',
        f'        """Return {doc_name}-specific validation hints."""',
        '        return {',
        f'            "has_multi_rate_tva": {has_multi_rate},',
        '            "card_equals_total": True,',
        f'            "has_client_cui": {has_client_cui},',
        '            "has_efactura": False,',
        f'            "is_non_vat_payer": {is_non_vat},',
        '        }',
    ])

    return '\n'.join(code_lines) + '\n'


def main():
    """Command-line entry point: run OCR on receipts, analyze them, write the profile."""
    _require_dependencies()

    parser = argparse.ArgumentParser(
        description="Generate store profile from PDF receipts",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Generate profile from a single PDF
  python scripts/generate_store_profile.py \\
      --name "Magazin Nou" --cui "12345678" \\
      --receipts "docs/data-entry/magazin_nou.pdf"

  # Generate profile from multiple PDFs (glob pattern)
  python scripts/generate_store_profile.py \\
      --name "Carrefour" --cui "2475489" \\
      --receipts "docs/data-entry/Carrefour*.pdf" \\
      --output backend/modules/data_entry/services/ocr/profiles/carrefour.py

  # Dry run (analyze only, don't write file)
  python scripts/generate_store_profile.py \\
      --name "Test Store" --cui "11111111" \\
      --receipts "docs/data-entry/test*.pdf" \\
      --dry-run
"""
    )

    parser.add_argument("--name", required=True, help="Store name (e.g., 'LIDL DISCOUNT S.R.L.')")
    parser.add_argument("--cui", required=True, help="CUI number without RO prefix")
    parser.add_argument("--receipts", required=True, help="PDF file path or glob pattern")
    parser.add_argument("--output", help="Output file path (default: auto-generated)")
    parser.add_argument("--dry-run", action="store_true", help="Analyze only, don't write file")
    parser.add_argument("--api-base", default=API_BASE, help=f"API base URL (default: {API_BASE})")

    args = parser.parse_args()

    # Update API base if provided
    api_base = args.api_base

    # Validate CUI format (an "RO"/"ro" prefix and spaces are tolerated)
    cui = args.cui.strip().upper().replace("RO", "").replace(" ", "")
    if not cui.isdigit() or len(cui) < 6 or len(cui) > 10:
        print(f"Error: Invalid CUI format: {args.cui}")
        sys.exit(1)

    # Find PDF files; sorted because glob order is filesystem-dependent
    pdf_files = sorted(glob.glob(args.receipts))
    if not pdf_files:
        print(f"Error: No PDF files found matching: {args.receipts}")
        sys.exit(1)

    print(f"\n{'='*60}")
    print("Store Profile Generator")
    print(f"{'='*60}")
    print(f"Store: {args.name}")
    print(f"CUI: {cui}")
    print(f"PDFs: {len(pdf_files)} files")
    print(f"{'='*60}\n")

    # Generate JWT token
    token = create_jwt_token()

    # Submit PDFs to OCR
    print("Step 1: Submitting PDFs to OCR API...")
    results = []
    for pdf_path in pdf_files:
        result = submit_ocr(pdf_path, token, api_base=api_base)
        if result:
            results.append(result)

    if not results:
        print("\nError: No successful extractions. Check if backend is running.")
        sys.exit(1)

    print(f"\nSuccessfully extracted: {len(results)}/{len(pdf_files)} PDFs")

    # Analyze patterns
    print("\nStep 2: Analyzing patterns...")
    tva_analysis = analyze_tva_patterns(results)
    total_analysis = analyze_total_patterns(results)
    date_analysis = analyze_date_patterns(results)
    payment_analysis = analyze_payment_patterns(results)
    client_analysis = analyze_client_patterns(results)

    print(f" TVA: {tva_analysis['dominant_format']} format, multi-rate={tva_analysis['has_multi_rate']}")
    print(f" Date: {date_analysis['dominant_format']} format")
    print(f" Payments: {list(payment_analysis['methods'].keys())}")
    print(f" B2B: {client_analysis['has_client_cui']}")

    # Generate profile code
    print("\nStep 3: Generating profile code...")
    code = generate_profile_code(
        store_name=args.name,
        cui=cui,
        tva_analysis=tva_analysis,
        total_analysis=total_analysis,
        date_analysis=date_analysis,
        payment_analysis=payment_analysis,
        client_analysis=client_analysis
    )

    # Determine output path
    if args.output:
        output_path = args.output
    else:
        # Fall back to a fixed name when the store name has no ASCII
        # alphanumerics, instead of producing a bare ".py" file.
        module_name = re.sub(r'[^a-z0-9]', '_', args.name.lower()).strip('_') or "store_profile"
        output_path = f"backend/modules/data_entry/services/ocr/profiles/{module_name}.py"

    if args.dry_run:
        print(f"\n[DRY RUN] Would write to: {output_path}")
        print(f"\n{'='*60}")
        print("Generated code:")
        print(f"{'='*60}")
        print(code)
    else:
        # Write file. Path.parent is "." for a bare filename, so this also
        # works when --output has no directory component.
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: store names may contain diacritics and the platform
        # default encoding is not guaranteed to represent them.
        with open(output_path, 'w', encoding="utf-8") as f:
            f.write(code)
        print(f" Written to: {output_path}")

        # Verify syntax
        import py_compile
        try:
            py_compile.compile(output_path, doraise=True)
            print(" Syntax check: OK")
        except py_compile.PyCompileError as e:
            print(f" Syntax check: FAILED - {e}")

    print(f"\n{'='*60}")
    print("Profile generation complete!")
    print(f"{'='*60}")

    if not args.dry_run:
        print("\nNext steps:")
        print(f"1. Review the generated code: {output_path}")
        print("2. Customize patterns if needed")
        print(f"3. Hot-reload profiles: curl -X POST {api_base}/api/data-entry/ocr/profiles/reload")
        print("4. Test with a sample receipt")


if __name__ == "__main__":
    main()