Files
roa2web-service-auto/scripts/generate_store_profile.py
Claude Agent 099556213d feat(ocr): Add modular store profiles with hot-reload support
## Store Profiles System
- Add ProfileRegistry for CUI-based profile lookup
- Add BaseStoreProfile with generic extraction patterns
- Implement hot-reload via POST /api/data-entry/ocr/profiles/reload

## 12 Store Profiles
- LIDL: Multi-rate TVA (A, B, C, D codes)
- OMV, SOCAR: B2B with client CUI, YYYY.MM.DD dates
- BRICK, DEDEMAN: Standard TVA, e-factura support
- KINETERRA, BEST PRINT: Non-VAT payers (returns [])
- STEPOUT MARKET: TVA 5% (books/reduced rate)
- UNLIMITED KEYS: NUMERAR payment detection
- GAMA INK, ELECTROBERING, PICTUS VELUM: Standard TVA

## Flexible TVA Patterns
- All patterns use (\d{1,2})% to accept any rate
- Supports historical (19%, 9%, 5%) and current (21%, 11%)

## Payment Methods Fix
- Fixed base.py to support multiple payments of same type
- Changed deduplication from method-only to (method, amount) tuple
- Returns separate entries for split payments

## Tools
- Add generate_store_profile.py for automatic profile generation
- Analyzes PDFs via OCR API and detects patterns

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 23:07:07 +00:00

601 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Store Profile Generator Script
Analyzes PDF receipts from a store and generates a Python profile class
for the OCR extraction system.
Usage:
python scripts/generate_store_profile.py \
--name "Magazin Exemplu" \
--cui "12345678" \
--receipts "docs/data-entry/MagazinExemplu*.pdf" \
--output "backend/modules/data_entry/services/ocr/profiles/magazin_exemplu.py"
Features:
- Submits PDFs to OCR API
- Analyzes extracted text for patterns (TVA, total, date, payment)
- Generates a BaseStoreProfile subclass with detected patterns
- Supports hot-reload via ProfileRegistry
Requirements:
- Backend server running on localhost:8000
- JWT authentication
- python-jose, requests packages
"""
import argparse
import glob
import json
import os
import re
import sys
import time
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
try:
import requests
from jose import jwt
except ImportError:
print("Error: Required packages not installed.")
print("Run: pip install python-jose requests")
sys.exit(1)
# Configuration
# Base URL of the backend API; overridable through the API_BASE environment variable.
API_BASE = os.getenv("API_BASE", "http://localhost:8000")
# Secret used to sign the JWT built by create_jwt_token(); read from JWT_SECRET_KEY.
# NOTE(review): the literal fallback below is committed in source. Presumably it has to
# match the backend's signing secret - confirm it is only ever used for local development.
JWT_SECRET = os.getenv("JWT_SECRET_KEY", "GENERATE_NEW_SECRET_FOR_PRODUCTION3334!")
def create_jwt_token() -> str:
    """Build a short-lived HS256 access token for authenticating against the API.

    The claims are fixed test values. The token expires one hour after it is
    created and is signed with the module-level ``JWT_SECRET``.

    Returns:
        The encoded JWT.
    """
    lifetime = timedelta(hours=1)
    claims = {
        "username": "PROFILE_GENERATOR",
        "user_id": 1,
        "companies": ["604"],
        "permissions": ["read", "write"],
    }
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    claims["iat"] = datetime.now(timezone.utc)
    claims["type"] = "access"
    return jwt.encode(claims, JWT_SECRET, algorithm="HS256")
def submit_ocr(pdf_path: str, token: str, api_base: str = API_BASE, timeout: int = 120) -> Optional[Dict]:
    """
    Submit a PDF to the OCR API and wait for the extraction result.

    The file is uploaded to the extract endpoint (engine ``doctr_plus``), then
    the job's ``wait`` endpoint is long-polled until the job reports
    ``completed`` or ``error``, or until ``timeout`` seconds have elapsed.
    Progress is printed on a single line per file.

    Args:
        pdf_path: Path to PDF file
        token: JWT authentication token
        api_base: API base URL
        timeout: Max seconds to wait for completion

    Returns:
        Extraction result dict or None on failure
    """
    headers = {"Authorization": f"Bearer {token}"}
    filename = os.path.basename(pdf_path)
    print(f" Submitting: {filename}...", end=" ", flush=True)
    try:
        with open(pdf_path, "rb") as f:
            response = requests.post(
                f"{api_base}/api/data-entry/ocr/extract",
                params={"engine": "doctr_plus"},
                files={"file": (filename, f, "application/pdf")},
                headers=headers,
                timeout=30,
            )
        if response.status_code != 200:
            print(f"FAILED (HTTP {response.status_code})")
            return None

        job_id = response.json().get("job_id")
        if not job_id:
            print("FAILED (no job_id)")
            return None

        # Poll for completion. A monotonic clock keeps the budget immune to
        # wall-clock adjustments.
        started = time.monotonic()
        deadline = started + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never ask the server to hold the request longer than the time
            # we have left, so the overall call honours `timeout`.
            wait_seconds = max(1, min(30, int(remaining)))
            poll_response = requests.get(
                f"{api_base}/api/data-entry/ocr/jobs/{job_id}/wait",
                params={"timeout": wait_seconds},
                headers=headers,
                timeout=wait_seconds + 5,  # small margin over the server-side wait
            )
            if poll_response.status_code == 200:
                job_result = poll_response.json()
                status = job_result.get("status")
                if status == "completed":
                    elapsed = time.monotonic() - started
                    print(f"OK ({elapsed:.1f}s)")
                    return job_result.get("result", {})
                if status == "error":
                    print(f"ERROR: {job_result.get('error', 'Unknown')}")
                    return None
            # Back off briefly before the next poll, without overshooting the deadline.
            time.sleep(min(2.0, max(0.0, deadline - time.monotonic())))

        print("TIMEOUT")
        return None
    except Exception as e:
        # Deliberately broad: one unreadable file or network hiccup must not
        # abort the whole batch; the caller simply skips this PDF.
        print(f"EXCEPTION: {e}")
        return None
# (label, pattern) pairs matched against the upper-cased receipt text, in this order.
_TVA_FORMAT_PATTERNS = (
    # Standard format: "TVA 19% 10.50" or "TVA: 19% 10.50"
    ("standard", re.compile(r'TVA\s*:?\s*\d{1,2}%')),
    # Lidl format: "TVA A 21% 7.71"
    ("lidl_multi_rate", re.compile(r'TVA\s+[A-D]\s+\d{1,2}')),
    # Table format: "BAZA TVA | % TVA | VALOARE TVA"
    ("table", re.compile(r'BAZA\s+TVA')),
    # No TVA (neplatitor)
    ("non_vat", re.compile(r'NEPLATITOR|NON.?TVA')),
)


def analyze_tva_patterns(results: List[Dict]) -> Dict:
    """
    Analyze TVA patterns from multiple extraction results.

    Args:
        results: Extraction result dicts; the ``tva_entries`` and ``raw_text``
            keys are read when present and non-empty.

    Returns:
        Dict with detected patterns and statistics:
        ``codes`` / ``percents`` (occurrence counts over all TVA entries),
        ``formats`` (number of receipts whose text matched each layout),
        ``has_multi_rate`` (more than one distinct TVA code seen),
        ``is_non_vat`` (at least one receipt matched the non-VAT marker) and
        ``dominant_format`` (most frequent layout, "standard" when none matched).
    """
    tva_entries = []
    raw_texts = []
    for r in results:
        if r.get("tva_entries"):
            tva_entries.extend(r["tva_entries"])
        if r.get("raw_text"):
            raw_texts.append(r["raw_text"])

    # Analyze TVA code patterns (A, B, C, etc.)
    codes = Counter(e["code"] for e in tva_entries if e.get("code"))

    # Analyze TVA percentage patterns. Only missing values are skipped: a 0%
    # rate is a real rate and must not be dropped by a truthiness test.
    percents = Counter(
        e["percent"] for e in tva_entries
        if e.get("percent") is not None and e.get("percent") != ""
    )

    # Detect TVA format from raw text (each receipt counts at most once per format).
    tva_formats = Counter()
    for text in raw_texts:
        text_upper = text.upper()
        for label, pattern in _TVA_FORMAT_PATTERNS:
            if pattern.search(text_upper):
                tva_formats[label] += 1

    return {
        "codes": dict(codes),
        "percents": dict(percents),
        "formats": dict(tva_formats),
        "has_multi_rate": len(codes) > 1,
        "is_non_vat": tva_formats.get("non_vat", 0) > 0,
        "dominant_format": max(tva_formats, key=tva_formats.get) if tva_formats else "standard"
    }
def _parse_amount(value) -> Optional[float]:
    """Convert an extracted amount to float, or return None when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        pass
    if isinstance(value, str):
        try:
            # Accept a decimal comma such as "12,50".
            return float(value.strip().replace(",", "."))
        except ValueError:
            return None
    return None


def analyze_total_patterns(results: List[Dict]) -> Dict:
    """
    Analyze TOTAL patterns from extraction results.

    Args:
        results: Extraction result dicts; the ``amount`` and ``raw_text`` keys
            are read when present and non-empty.

    Returns:
        Dict with ``count`` (number of results carrying a numeric amount),
        ``formats`` (number of receipts whose text matched each TOTAL label)
        and ``dominant_format`` (most frequent label, "TOTAL" when none matched).
    """
    totals = []
    raw_texts = []
    for r in results:
        if r.get("amount"):
            # An unparseable amount is skipped instead of aborting the whole
            # analysis: only the count of usable totals is reported.
            amount = _parse_amount(r["amount"])
            if amount is not None:
                totals.append(amount)
        if r.get("raw_text"):
            raw_texts.append(r["raw_text"])

    # (label, pattern) pairs matched against the upper-cased text, in this order.
    label_patterns = (
        ("TOTAL:", r'TOTAL\s*:?\s*[\d.,]+'),
        ("TOTAL DE PLATA", r'TOTAL\s+DE\s+PLAT'),
        ("SUMA TOTALA", r'SUMA\s+TOTAL'),
        ("GRAND TOTAL", r'GRAND\s*TOTAL'),
    )
    total_formats = Counter()
    for text in raw_texts:
        text_upper = text.upper()
        for label, pattern in label_patterns:
            if re.search(pattern, text_upper):
                total_formats[label] += 1

    return {
        "count": len(totals),
        "formats": dict(total_formats),
        "dominant_format": max(total_formats, key=total_formats.get) if total_formats else "TOTAL"
    }
def analyze_date_patterns(results: List[Dict]) -> Dict:
    """
    Summarise which date layouts appear in the OCR text of the receipts.

    Returns:
        Dict with ``extracted_dates`` (the non-empty ``receipt_date`` values),
        ``formats`` (number of receipts whose raw text contains each layout)
        and ``dominant_format`` (most frequent layout, "DD.MM.YYYY" when none
        was found).
    """
    # Layouts are probed in this order for every receipt text.
    layouts = (
        ("DD.MM.YYYY", r'\d{2}\.\d{2}\.\d{4}'),
        ("YYYY.MM.DD", r'\d{4}\.\d{2}\.\d{2}'),  # OMV/SOCAR style
        ("DD-MM-YYYY", r'\d{2}-\d{2}-\d{4}'),
        ("DD/MM/YYYY", r'\d{2}/\d{2}/\d{4}'),
    )

    found_dates = [item["receipt_date"] for item in results if item.get("receipt_date")]
    texts = [item["raw_text"] for item in results if item.get("raw_text")]

    hits = {}
    for body in texts:
        for label, regex in layouts:
            if re.search(regex, body) is not None:
                hits[label] = hits.get(label, 0) + 1

    if hits:
        dominant = max(hits, key=hits.get)
    else:
        dominant = "DD.MM.YYYY"

    return {
        "extracted_dates": found_dates,
        "formats": dict(hits),
        "dominant_format": dominant,
    }
def analyze_payment_patterns(results: List[Dict]) -> Dict:
    """
    Analyze payment method patterns.

    Args:
        results: Extraction result dicts; ``payment_methods`` is expected to be
            a list of dicts with a ``method`` key, but may be missing or null.

    Returns:
        Dict with ``methods`` (occurrence count per payment method, entries
        without a method are counted as "UNKNOWN") and ``has_mixed_payments``
        (more than one distinct method seen across all receipts).
    """
    payment_counts = Counter()
    for r in results:
        # `or []` also covers an explicit null, which .get(key, []) would pass through.
        for m in r.get("payment_methods") or []:
            payment_counts[m.get("method") or "UNKNOWN"] += 1
    return {
        "methods": dict(payment_counts),
        "has_mixed_payments": len(payment_counts) > 1
    }
def analyze_client_patterns(results: List[Dict]) -> Dict:
    """
    Summarise how often receipts carry buyer (B2B) details.

    Returns:
        Dict with ``has_client_cui`` / ``has_client_name`` (whether any result
        has a non-empty value for that field) and ``b2b_ratio`` (share of
        results with a client CUI, 0 for an empty input).
    """
    with_cui = sum(1 for item in results if item.get("client_cui"))
    with_name = sum(1 for item in results if item.get("client_name"))

    if results:
        ratio = with_cui / len(results)
    else:
        ratio = 0

    return {
        "has_client_cui": with_cui > 0,
        "has_client_name": with_name > 0,
        "b2b_ratio": ratio,
    }
def _profile_class_name(store_name: str) -> str:
    """Derive a valid CamelCase class name ("<Words>Profile") from the store name."""
    words = re.sub(r'[^a-zA-Z0-9\s]', '', store_name).split()
    class_name = "".join(word.capitalize() for word in words) + "Profile"
    # A Python identifier cannot start with a digit (e.g. "5 To Go").
    if class_name[0].isdigit():
        class_name = "Store" + class_name
    return class_name


def _ocr_name_patterns(store_name: str) -> List[str]:
    """Build up to four upper-case name patterns, including common OCR misreads."""
    upper_name = store_name.upper()
    name_words = upper_name.split()
    primary_word = name_words[0] if name_words else upper_name
    patterns = [
        primary_word,
        upper_name.replace(".", "").replace(",", ""),
    ]
    # Letters that OCR commonly confuses with digits.
    ocr_variants = {
        'O': '0', 'I': '1', 'L': '1', 'S': '5', 'B': '8', 'E': '3'
    }
    for char, replacement in ocr_variants.items():
        if char in primary_word:
            patterns.append(primary_word.replace(char, replacement, 1))
    # Unique (order-preserving), non-empty, max 4.
    return [p for p in dict.fromkeys(patterns) if p][:4]


def _docstring_safe(text: str) -> str:
    """Escape text so it can be embedded inside a triple-double-quoted docstring."""
    single_line = " ".join(text.split())
    return single_line.replace("\\", "\\\\").replace('"', '\\"')


def generate_profile_code(
    store_name: str,
    cui: str,
    tva_analysis: Dict,
    total_analysis: Dict,
    date_analysis: Dict,
    payment_analysis: Dict,
    client_analysis: Dict
) -> str:
    """
    Generate Python profile class code.

    The emitted module defines a ``BaseStoreProfile`` subclass registered with
    ``ProfileRegistry``. Depending on the analyses it additionally overrides
    the date patterns (YYYY.MM.DD receipts) and ``extract_tva_entries``
    (non-VAT payers, or multi-rate receipts in the Lidl layout).

    Args:
        store_name: Human-readable store name
        cui: CUI number (without RO prefix)
        tva_analysis: Reads ``is_non_vat``, ``has_multi_rate``, ``dominant_format``
        total_analysis: Accepted for interface symmetry; currently not read
        date_analysis: Reads ``dominant_format``
        payment_analysis: Accepted for interface symmetry; currently not read
        client_analysis: Reads ``has_client_cui``

    Returns:
        Python source code for the profile class
    """
    class_name = _profile_class_name(store_name)
    name_patterns = _ocr_name_patterns(store_name)
    # The raw name goes into string literals via repr(); docstrings get an
    # escaped copy so quotes or backslashes cannot break the generated file.
    doc_name = _docstring_safe(store_name)
    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M")

    # Determine profile characteristics
    is_non_vat = bool(tva_analysis.get("is_non_vat", False))
    has_multi_rate = bool(tva_analysis.get("has_multi_rate", False))
    has_client_cui = bool(client_analysis.get("has_client_cui", False))
    uses_yyyy_mm_dd = date_analysis.get("dominant_format") == "YYYY.MM.DD"

    # Build the code. Every emitted line carries its own indentation
    # (4 spaces per level) so the result is valid Python as written.
    code_lines = [
        '"""',
        f'{doc_name} store profile for OCR extraction.',
        '',
        'Auto-generated by generate_store_profile.py',
        f'Generated: {generated_at}',
        '"""',
        '',
        'import re',
        'from decimal import Decimal, InvalidOperation',
        'from typing import List, Dict, Any',
        '',
        'from .base import BaseStoreProfile',
        'from . import ProfileRegistry',
        '',
        '',
        '@ProfileRegistry.register',
        f'class {class_name}(BaseStoreProfile):',
        '    """',
        f'    {doc_name} - OCR extraction profile.',
        '',
    ]

    # Add characteristics to docstring
    characteristics = []
    if is_non_vat:
        characteristics.append("Non-VAT payer (neplatitor TVA)")
    if has_multi_rate:
        characteristics.append("Multi-rate TVA")
    if has_client_cui:
        characteristics.append("B2B receipts with client CUI")
    if uses_yyyy_mm_dd:
        characteristics.append("Date format: YYYY.MM.DD")
    if characteristics:
        code_lines.append('    Key characteristics:')
        code_lines.extend(f'    - {c}' for c in characteristics)
        code_lines.append('')

    code_lines.extend([
        '    """',
        '',
        f'    CUI_LIST = [{cui!r}]',
        f'    NAME_PATTERNS = {name_patterns!r}',
        f'    STORE_NAME = {store_name!r}',
        '',
    ])

    # Add date patterns override for YYYY.MM.DD format
    if uses_yyyy_mm_dd:
        code_lines.extend([
            '    # Override date patterns for YYYY.MM.DD format',
            '    DATE_PATTERNS_OCR_SPACES = [',
            r"        r'(\d{4})[.,]\s*(\d{2})[.,]\s*(\d{2})',  # YYYY. MM. DD with spaces",
            r"        r'(\d{4})[.,](\d{2})[.,](\d{2})',  # YYYY.MM.DD",
            '    ]',
            '',
        ])

    # Add TVA extraction method for multi-rate or non-VAT
    if is_non_vat:
        code_lines.extend([
            '    def extract_tva_entries(self, text: str) -> List[dict]:',
            '        """Non-VAT payer - returns empty list."""',
            '        return []',
            '',
        ])
    elif has_multi_rate and tva_analysis.get("dominant_format") == "lidl_multi_rate":
        code_lines.extend([
            '    # Store-specific TVA patterns',
            '    TVA_PATTERNS = [',
            r"        r'T[VU][AR]\s+([A-D])\s+(\d{1,2})[.,]?\d{0,2}\s*%\s+([\d.,]+)',",
            '    ]',
            '',
            '    def extract_tva_entries(self, text: str) -> List[dict]:',
            '        """Extract multi-rate TVA entries."""',
            '        entries = []',
            '        seen = set()',
            '',
            '        for pattern in self.TVA_PATTERNS:',
            '            for match in re.finditer(pattern, text, re.IGNORECASE):',
            '                try:',
            '                    code = match.group(1).upper()',
            '                    percent = int(match.group(2))',
            '                    amount = self._parse_decimal(match.group(3))',
            '',
            '                    if amount and amount > 0:',
            '                        entry_key = (code, percent)',
            '                        if entry_key not in seen:',
            '                            entries.append({',
            "                                'code': code,",
            "                                'percent': percent,",
            "                                'amount': amount",
            '                            })',
            '                            seen.add(entry_key)',
            '                except (ValueError, InvalidOperation):',
            '                    continue',
            '',
            '        return entries',
            '',
        ])

    # Add validation hints method
    code_lines.extend([
        '    def get_validation_hints(self) -> Dict[str, Any]:',
        f'        """Return {doc_name}-specific validation hints."""',
        '        return {',
        f'            "has_multi_rate_tva": {has_multi_rate},',
        '            "card_equals_total": True,',
        f'            "has_client_cui": {has_client_cui},',
        '            "has_efactura": False,',
        f'            "is_non_vat_payer": {is_non_vat},',
        '        }',
    ])
    return '\n'.join(code_lines) + '\n'
def main():
    """
    Command-line entry point: OCR the receipts, analyze them, emit a profile.

    Steps: parse arguments, validate the CUI, resolve the receipt glob, submit
    every PDF to the OCR API, run the pattern analyses, generate the profile
    source and either print it (``--dry-run``) or write it to disk and
    syntax-check it. Exits with status 1 on invalid input, when no PDF could
    be extracted, or when the written file fails the syntax check.
    """
    parser = argparse.ArgumentParser(
        description="Generate store profile from PDF receipts",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Generate profile from a single PDF
python scripts/generate_store_profile.py \\
--name "Magazin Nou" --cui "12345678" \\
--receipts "docs/data-entry/magazin_nou.pdf"
# Generate profile from multiple PDFs (glob pattern)
python scripts/generate_store_profile.py \\
--name "Carrefour" --cui "2475489" \\
--receipts "docs/data-entry/Carrefour*.pdf" \\
--output backend/modules/data_entry/services/ocr/profiles/carrefour.py
# Dry run (analyze only, don't write file)
python scripts/generate_store_profile.py \\
--name "Test Store" --cui "11111111" \\
--receipts "docs/data-entry/test*.pdf" \\
--dry-run
"""
    )
    parser.add_argument("--name", required=True, help="Store name (e.g., 'LIDL DISCOUNT S.R.L.')")
    parser.add_argument("--cui", required=True, help="CUI number without RO prefix")
    parser.add_argument("--receipts", required=True, help="PDF file path or glob pattern")
    parser.add_argument("--output", help="Output file path (default: auto-generated)")
    parser.add_argument("--dry-run", action="store_true", help="Analyze only, don't write file")
    parser.add_argument("--api-base", default=API_BASE, help=f"API base URL (default: {API_BASE})")
    args = parser.parse_args()

    # Update API base if provided
    api_base = args.api_base

    # Validate CUI format; the optional country prefix is accepted in any case ("RO"/"ro").
    cui = args.cui.strip().upper().replace("RO", "").replace(" ", "")
    if not cui.isdigit() or len(cui) < 6 or len(cui) > 10:
        print(f"Error: Invalid CUI format: {args.cui}")
        sys.exit(1)

    # Find PDF files; sorted so runs are reproducible regardless of filesystem order.
    pdf_files = sorted(glob.glob(args.receipts))
    if not pdf_files:
        print(f"Error: No PDF files found matching: {args.receipts}")
        sys.exit(1)

    print(f"\n{'='*60}")
    print("Store Profile Generator")
    print(f"{'='*60}")
    print(f"Store: {args.name}")
    print(f"CUI: {cui}")
    print(f"PDFs: {len(pdf_files)} files")
    print(f"{'='*60}\n")

    # Generate JWT token
    token = create_jwt_token()

    # Submit PDFs to OCR
    print("Step 1: Submitting PDFs to OCR API...")
    results = []
    for pdf_path in pdf_files:
        result = submit_ocr(pdf_path, token, api_base=api_base)
        if result:
            results.append(result)
    if not results:
        print("\nError: No successful extractions. Check if backend is running.")
        sys.exit(1)
    print(f"\nSuccessfully extracted: {len(results)}/{len(pdf_files)} PDFs")

    # Analyze patterns
    print("\nStep 2: Analyzing patterns...")
    tva_analysis = analyze_tva_patterns(results)
    total_analysis = analyze_total_patterns(results)
    date_analysis = analyze_date_patterns(results)
    payment_analysis = analyze_payment_patterns(results)
    client_analysis = analyze_client_patterns(results)
    print(f" TVA: {tva_analysis['dominant_format']} format, multi-rate={tva_analysis['has_multi_rate']}")
    print(f" Date: {date_analysis['dominant_format']} format")
    print(f" Payments: {list(payment_analysis['methods'].keys())}")
    print(f" B2B: {client_analysis['has_client_cui']}")

    # Generate profile code
    print("\nStep 3: Generating profile code...")
    code = generate_profile_code(
        store_name=args.name,
        cui=cui,
        tva_analysis=tva_analysis,
        total_analysis=total_analysis,
        date_analysis=date_analysis,
        payment_analysis=payment_analysis,
        client_analysis=client_analysis
    )

    # Determine output path
    if args.output:
        output_path = args.output
    else:
        module_name = re.sub(r'[^a-z0-9]', '_', args.name.lower()).strip('_')
        output_path = f"backend/modules/data_entry/services/ocr/profiles/{module_name}.py"

    if args.dry_run:
        print(f"\n[DRY RUN] Would write to: {output_path}")
        print(f"\n{'='*60}")
        print("Generated code:")
        print(f"{'='*60}")
        print(code)
    else:
        # Write file. A bare filename has no directory part, and
        # os.makedirs("") would raise, so only create a directory when there is one.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        # Explicit UTF-8: store names may contain diacritics and the platform
        # default encoding is not guaranteed to handle them.
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(code)
        print(f" Written to: {output_path}")

        # Verify syntax
        import py_compile
        try:
            py_compile.compile(output_path, doraise=True)
            print(" Syntax check: OK")
        except py_compile.PyCompileError as e:
            print(f" Syntax check: FAILED - {e}")
            # Do not report success for a file that cannot be imported.
            sys.exit(1)

    print(f"\n{'='*60}")
    print("Profile generation complete!")
    print(f"{'='*60}")
    if not args.dry_run:
        print("\nNext steps:")
        print(f"1. Review the generated code: {output_path}")
        print("2. Customize patterns if needed")
        # Point the hint at the server that was actually used for this run.
        print(f"3. Hot-reload profiles: curl -X POST {api_base}/api/data-entry/ocr/profiles/reload")
        print("4. Test with a sample receipt")


if __name__ == "__main__":
    main()