Major OCR infrastructure improvements: - Add persistent SQLite-based job queue for OCR tasks - Implement worker pool with process isolation and auto-restart - Add OCR engine selector dropdown (Tesseract/PaddleOCR) in upload zone - Optimize Tesseract preprocessing based on benchmark results (8x faster) - Add recognize_cif_optimized() with multi-strategy CIF extraction - Add Romanian CIF checksum validation - Increase Telegram long polling timeout from 10s to 30s Squashed commits: - feat(ocr): Implement persistent worker pool with SQLite job queue - feat(ocr): Add OCR engine selector dropdown to upload zone - perf(telegram): Increase long polling timeout from 10s to 30s - perf(ocr): Optimize Tesseract preprocessing based on benchmark results 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
656 lines
23 KiB
Python
656 lines
23 KiB
Python
"""
|
||
Optimized Tesseract Engine for OCR - SPEED + QUALITY OPTIMIZED

Performance optimizations (vs previous version):
- Single PSM mode (PSM 4) instead of multi-PSM (previously 4 modes × 2 calls = 8 invocations, so ~8x faster)
- Single Tesseract call per image in fast mode (skips image_to_data for speed)
- Lighter preprocessing (no over-binarization)
- --dpi 300 flag for proper scaling
- OEM 3 (Tesseract's default engine mode, selected from what the installed models support)

Quality optimizations for Romanian receipts:
- PSM 4: Single column layout (optimal for receipts)
- Polarity correction: ensures black text on white background
- Language: Romanian only (-l ron) for faster recognition
- Fallback to PSM 6 if PSM 4 produces poor results

Previous issues fixed:
- Was 8x slower than PaddleOCR due to multi-PSM + dual calls
- Produced gibberish on clear PDFs due to over-binarization
|
||
"""
|
||
|
||
import logging
import os
import re
from collections import Counter
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import cv2
import numpy as np
|
||
|
||
# Check Tesseract availability.
# pytesseract is treated as optional: when the import fails the module still
# loads, TESSERACT_AVAILABLE is False and the name `pytesseract` is bound to
# None so later references do not raise NameError.
try:
    import pytesseract
    TESSERACT_AVAILABLE = True
except ImportError:
    TESSERACT_AVAILABLE = False
    pytesseract = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class OCRResult:
    """Container for the raw output of one Tesseract recognition run."""

    # Recognized text exactly as returned by the OCR call.
    text: str
    # Confidence score; the producers in this module emit values in 0.0-1.0.
    confidence: float
    # Per-word entries; each instance gets its own fresh list by default.
    boxes: List[dict] = field(default_factory=list)
    # Label identifying the engine/strategy that produced the result.
    engine: str = "tesseract"
|
||
|
||
|
||
class TesseractEngine:
    """
    Optimized Tesseract engine for receipt OCR.

    TESTED OPTIMAL SETTINGS (from comprehensive benchmark):
    - DPI 200 for PDF loading (not 300!); PDF loading itself is not done by
      this class, which only passes ``--dpi 300`` to Tesseract as a hint
    - Padding 40px for edge protection
    - PSM 6 for complex receipts, PSM 4 for simple ones
    - Multi-pass strategy when quality is critical

    SPEED vs QUALITY tradeoff:
    - Fast mode (single pass): ~0.9s, ~6-7 keywords
    - Quality mode (multi-pass): ~1.7s, ~8-9 keywords (+2 more keywords)

    BENCHMARK RESULTS:
    - padded_psm6_40: Best for complex receipts (igiena, five-holding)
    - baseline_psm4: Best for simple receipts (rechizite, benzina)
    - multi-pass: Best overall quality but slower

    Every recognition method catches OCR errors, logs them and returns an
    empty result (or ``None`` for CIF extraction) instead of raising.
    """

    # PSM modes for receipts
    PSM_SINGLE_COLUMN = 4   # Best for simple vertical receipts
    PSM_UNIFORM_BLOCK = 6   # Best for complex layouts
    PSM_SPARSE_TEXT = 11    # Fallback for difficult receipts

    # Optimal padding (from benchmark)
    DEFAULT_PADDING = 40

    # Substrings that raise the heuristic confidence in fast mode.
    _CONFIDENCE_HINT_WORDS = ('total', 'lei', 'ron', 'buc', 'tva', 'cif', 'bon')

    # Substrings counted when scoring the passes of recognize_multipass().
    _MULTIPASS_KEYWORDS = (
        'cif', 'total', 'tva', 'lei', 'ron', 'buc', 'fiscal', 'bon',
        'hartie', 'prosop', 'saci', 'creion', 'constanta', 'bucuresti',
    )

    # Detection patterns (applied to upper-cased text). The "RO" prefix is
    # optional after a CIF/CUI label, consistent with the extraction patterns.
    _CIF_DETECT_PATTERNS = tuple(re.compile(p) for p in (
        r'CIF[:\s]*R?O?\d{6,10}',
        r'CUI[:\s]*R?O?\d{6,10}',
        r'C\.?I\.?F\.?[:\s]*R?O?\d{6,10}',
        r'RO\d{7,10}',
    ))

    # Extraction patterns (applied to upper-cased text with spaces removed);
    # group 1 captures the digits. Order matters: first match wins.
    _CIF_EXTRACT_PATTERNS = tuple(re.compile(p) for p in (
        r'CIF[:\s]*R?O?(\d{6,10})',
        r'CUI[:\s]*R?O?(\d{6,10})',
        r'C\.?I\.?F\.?[:\s]*R?O?(\d{6,10})',
        r'RO(\d{7,10})',
        r'R\.?O\.?[\s:]*(\d{6,10})',
    ))

    # Standalone TOTAL: preceded by line start/whitespace, ended by a word
    # boundary so "TOTAL:" and a trailing "TOTAL" also count.
    _TOTAL_PATTERN = re.compile(r'(?:^|\s)TOTAL\b', re.MULTILINE)

    # Control key for the Romanian CUI check digit, aligned with the body
    # (all digits except the last) left-padded with zeros to 9 digits.
    _CIF_CHECK_WEIGHTS = (7, 5, 3, 2, 1, 7, 5, 3, 2)

    def __init__(self):
        """
        Initialize Tesseract engine.

        Raises:
            RuntimeError: if pytesseract could not be imported, or if the
                Tesseract version query fails (binary missing / not in PATH).
        """
        if not TESSERACT_AVAILABLE:
            raise RuntimeError("pytesseract not available. Install with: pip install pytesseract")

        # Verify Tesseract installation
        try:
            self._version = pytesseract.get_tesseract_version()
        except Exception as e:
            # Chain the cause so the original failure stays in the traceback.
            raise RuntimeError(f"Tesseract not installed or not in PATH: {e}") from e

        logger.info(f"[TesseractEngine] Initialized (v{self._version})")

    def recognize(self, image: np.ndarray, fast_mode: bool = True) -> "OCRResult":
        """
        Perform OCR recognition on image (OPTIMIZED).

        Runs PSM 4 first; when that yields no text or a confidence below 0.3,
        runs PSM 6 and keeps whichever result has the longer text. This means
        1-2 Tesseract passes instead of the previous 8 invocations.

        Args:
            image: Preprocessed image (DO NOT binarize for clear PDFs!).
                3-dimensional input is converted with BGR2GRAY.
            fast_mode: If True, confidence is estimated heuristically from the
                text instead of being read from image_to_data (faster).

        Returns:
            OCRResult with text and confidence
        """
        if not TESSERACT_AVAILABLE:
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

        # Ensure grayscale
        if len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Fix polarity (black text on white background)
        image = self._ensure_correct_polarity(image)

        # Try PSM 4 first (single column - best for receipts)
        result = self._recognize_fast(image, self.PSM_SINGLE_COLUMN, fast_mode)

        # If poor result, try PSM 6 as fallback
        if not result.text.strip() or result.confidence < 0.3:
            logger.debug(f"[Tesseract] PSM {self.PSM_SINGLE_COLUMN} poor result, trying PSM {self.PSM_UNIFORM_BLOCK}")
            fallback = self._recognize_fast(image, self.PSM_UNIFORM_BLOCK, fast_mode)
            if len(fallback.text) > len(result.text):
                result = fallback

        if result.text.strip():
            logger.info(f"[TesseractEngine] Result: {len(result.text)} chars, conf={result.confidence:.0%}")

        return result

    def _recognize_fast(self, image: np.ndarray, psm: int, fast_mode: bool = True) -> "OCRResult":
        """
        Run one recognition pass with the given page segmentation mode.

        Args:
            image: Grayscale image
            psm: Page segmentation mode
            fast_mode: If True, only image_to_string is called and confidence
                comes from _estimate_confidence(); otherwise image_to_data is
                called as well and the mean word confidence is used.

        Returns:
            OCRResult; empty with confidence 0.0 when the OCR call fails.
        """
        # --oem 3  = Tesseract's default engine mode (picked from what the
        #            installed traineddata supports)
        # --dpi 300 = scaling hint
        # -l ron   = Romanian only (faster, avoids eng confusion)
        config = f'--psm {psm} --oem 3 --dpi 300 -l ron'

        try:
            text = pytesseract.image_to_string(image, config=config)
            if fast_mode:
                # Fast path: estimate confidence from the text itself
                confidence = self._estimate_confidence(text)
            else:
                # Accurate path: real per-word confidences
                data = pytesseract.image_to_data(
                    image, config=config, output_type=pytesseract.Output.DICT
                )
                confidence = self._mean_word_confidence(data['conf'])

            return OCRResult(
                text=text,
                confidence=confidence,
                boxes=[],
                engine="tesseract"
            )

        except Exception as e:
            logger.warning(f"[Tesseract] PSM {psm} error: {e}")
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

    @staticmethod
    def _parse_confidence(raw) -> int:
        """
        Convert one entry of the image_to_data ``conf`` column to an int.

        Accepts ints, floats and numeric strings (including float-formatted
        ones such as ``'96.27'``, which a plain ``int()`` rejects); fractional
        parts are truncated. Anything unparsable yields -1, the same value
        used for "no confidence", so callers filtering on ``> 0`` skip it.
        """
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return -1

    @classmethod
    def _mean_word_confidence(cls, raw_confidences) -> float:
        """Return the mean of the positive confidences scaled to 0-1 (0.0 if none)."""
        scores = [score for score in map(cls._parse_confidence, raw_confidences) if score > 0]
        return sum(scores) / len(scores) / 100 if scores else 0.0

    def _estimate_confidence(self, text: str) -> float:
        """
        Estimate OCR confidence based on text quality.

        The base value is the share of characters that are alphanumeric or one
        of ``.,;:-/``, newline or space. If any receipt hint word appears as a
        substring of the lower-cased text, 0.1 is added (capped at 1.0).

        Returns:
            0.0 for blank text, otherwise a value in (0.0, 1.0].
        """
        if not text.strip():
            return 0.0

        accepted = sum(1 for ch in text if ch.isalnum() or ch in '.,;:-/\n ')
        confidence = accepted / len(text)

        # Boost for Romanian receipt patterns
        lowered = text.lower()
        if any(word in lowered for word in self._CONFIDENCE_HINT_WORDS):
            confidence = min(confidence + 0.1, 1.0)

        return confidence

    def recognize_multipass(self, image: np.ndarray) -> "OCRResult":
        """
        Multi-pass OCR for maximum quality (slower but more accurate).

        Strategy (from benchmark testing):
        - Pass 1: PSM 4 (single column) - no padding, fast baseline
        - Pass 2: PSM 6 (uniform block) - with 40px padding, better for complex layouts
        - Pass 3: PSM 11 (sparse text) - with 40px padding + stronger CLAHE, for difficult receipts

        Each pass is scored as 10 points per keyword found, +15 if a CIF
        pattern is present and +10 if a standalone TOTAL is present. The
        highest-scoring pass wins; ties keep the earlier pass.
        On average finds +2.1 more keywords than single-pass (~8.7 vs 6.6).

        Time: ~1.7s (vs ~0.9s for single pass)

        Args:
            image: Input image; 3-dimensional input is converted with BGR2GRAY.

        Returns:
            OCRResult from the best pass, or an empty result when every pass
            failed or pytesseract is unavailable.
        """
        if not TESSERACT_AVAILABLE:
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

        # Ensure grayscale. No defensive copy is needed: every step below
        # returns a new array instead of modifying its input.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image

        passes = [
            # Pass 1: Fast baseline (no padding) - good for simple receipts
            {"name": "pass1_psm4", "psm": self.PSM_SINGLE_COLUMN, "padding": 0, "clahe_clip": 1.5},
            # Pass 2: Padded PSM 6 - good for complex receipts
            {"name": "pass2_psm6_padded", "psm": self.PSM_UNIFORM_BLOCK,
             "padding": self.DEFAULT_PADDING, "clahe_clip": 1.5},
            # Pass 3: Sparse text with stronger enhancement - for difficult cases
            {"name": "pass3_psm11", "psm": self.PSM_SPARSE_TEXT,
             "padding": self.DEFAULT_PADDING, "clahe_clip": 2.0},
        ]

        best_result = None
        best_score = -1
        all_keywords = set()

        for p in passes:
            processed = gray

            # Add a white border if specified
            pad = p["padding"]
            if pad > 0:
                processed = cv2.copyMakeBorder(
                    processed, pad, pad, pad, pad,
                    cv2.BORDER_CONSTANT, value=255
                )

            # Apply CLAHE
            clahe = cv2.createCLAHE(clipLimit=p["clahe_clip"], tileGridSize=(8, 8))
            processed = clahe.apply(processed)

            # Ensure correct polarity
            processed = self._ensure_correct_polarity(processed)

            # Run OCR
            config = f'--psm {p["psm"]} --oem 3 -l ron'
            try:
                text = pytesseract.image_to_string(processed, config=config)
                confidence = self._estimate_confidence(text)

                # Score based on Romanian receipt keywords
                lowered = text.lower()
                found_keywords = [kw for kw in self._MULTIPASS_KEYWORDS if kw in lowered]
                all_keywords.update(found_keywords)

                # Score: keywords + CIF bonus + TOTAL bonus
                score = len(found_keywords) * 10
                if self._has_cif_pattern(text):
                    score += 15
                if self._has_total_pattern(text):
                    score += 10

                logger.debug(f"[Tesseract] {p['name']}: {len(found_keywords)} keywords, score={score}")

                if score > best_score:
                    best_score = score
                    best_result = OCRResult(
                        text=text,
                        confidence=confidence,
                        boxes=[],
                        engine=f"tesseract-multipass-{p['name']}"
                    )

            except Exception as e:
                logger.warning(f"[Tesseract] {p['name']} failed: {e}")
                continue

        if best_result is not None:
            logger.info(f"[TesseractEngine] Multi-pass best: {best_result.engine}, "
                        f"{len(all_keywords)} total keywords found")
            return best_result

        return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract-multipass")

    def _has_cif_pattern(self, text: str) -> bool:
        """
        Check if text contains something shaped like a CIF/CUI.

        Matches (case-insensitively) a CIF / CUI / C.I.F. label followed by an
        optional RO prefix and 6-10 digits, or a bare ``RO`` followed by 7-10
        digits. Only the shape is checked, not the checksum.
        """
        upper = text.upper()
        return any(pattern.search(upper) for pattern in self._CIF_DETECT_PATTERNS)

    def _has_total_pattern(self, text: str) -> bool:
        """
        Check if TOTAL is properly recognized (not truncated to BTOTAL/OTAL).

        TOTAL must start a line or follow whitespace and must end at a word
        boundary, so ``TOTAL 12``, ``TOTAL:`` and a trailing ``TOTAL`` match
        while ``SUBTOTAL`` and ``BTOTAL`` do not.
        """
        return bool(self._TOTAL_PATTERN.search(text.upper()))

    def recognize_with_boxes(self, image: np.ndarray, psm: int = 4) -> "OCRResult":
        """
        Recognition with bounding boxes (slower, for debugging/visualization).

        Use this only when you need box coordinates.
        For normal OCR, use recognize() which is faster.

        Args:
            image: Image; 3-dimensional input is converted with BGR2GRAY.
            psm: Page segmentation mode (default: 4 for receipts)

        Returns:
            OCRResult with text, mean word confidence and one box entry per
            non-blank word with positive confidence. Each entry has keys
            ``text``, ``confidence`` (0-1) and ``box`` = [left, top, width,
            height]. Empty result on OCR failure.
        """
        if not TESSERACT_AVAILABLE:
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

        # Ensure grayscale
        if len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        image = self._ensure_correct_polarity(image)
        config = f'--psm {psm} --oem 3 --dpi 300 -l ron'

        try:
            text = pytesseract.image_to_string(image, config=config)
            data = pytesseract.image_to_data(
                image, config=config, output_type=pytesseract.Output.DICT
            )

            avg_conf = self._mean_word_confidence(data['conf'])

            rows = zip(data['text'], data['conf'], data['left'],
                       data['top'], data['width'], data['height'])
            boxes = []
            for word, raw_conf, left, top, width, height in rows:
                word_conf = self._parse_confidence(raw_conf)
                if word.strip() and word_conf > 0:
                    boxes.append({
                        'text': word,
                        'confidence': word_conf / 100,
                        'box': [left, top, width, height]
                    })

            return OCRResult(text=text, confidence=avg_conf, boxes=boxes, engine="tesseract")

        except Exception as e:
            logger.warning(f"[Tesseract] recognize_with_boxes error: {e}")
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

    def _ensure_correct_polarity(self, image: np.ndarray) -> np.ndarray:
        """
        Ensure image has black text on white background.

        Receipts should have dark text on light background. An image whose
        mean pixel value is below 127 is treated as inverted (light text on
        dark) and is returned as ``255 - image``; otherwise the input array
        itself is returned unchanged.

        Args:
            image: Grayscale image

        Returns:
            Polarity-corrected image
        """
        mean_value = np.mean(image)

        if mean_value < 127:
            # Image is mostly dark = inverted (white text on black)
            logger.debug(f"[TesseractEngine] Detected inverted polarity (mean={mean_value:.1f}), correcting...")
            return 255 - image

        return image

    def recognize_numbers_only(self, image: np.ndarray) -> "OCRResult":
        """
        OCR optimized for numeric content (amounts, totals).

        Uses a character whitelist to reduce errors on numbers.

        Args:
            image: Preprocessed image; 3-dimensional input is converted with
                BGR2GRAY.

        Returns:
            OCRResult with stripped text, mean word confidence and engine
            ``"tesseract-numeric"``; an empty ``"tesseract"`` result on error.
        """
        if not TESSERACT_AVAILABLE:
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

        # Ensure grayscale
        if len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Fix polarity
        image = self._ensure_correct_polarity(image)

        # Whitelist restricts output to digits, period, comma and hyphen.
        # NOTE(review): the trailing space is probably dropped when the config
        # string is tokenized, so it is likely not part of the whitelist - confirm.
        config = '--psm 6 --oem 1 -c tessedit_char_whitelist=0123456789.,- '

        try:
            text = pytesseract.image_to_string(image, config=config)
            data = pytesseract.image_to_data(
                image,
                config=config,
                output_type=pytesseract.Output.DICT
            )

            return OCRResult(
                text=text.strip(),
                confidence=self._mean_word_confidence(data['conf']),
                boxes=[],
                engine="tesseract-numeric"
            )

        except Exception as e:
            logger.error(f"[TesseractEngine] Numeric OCR error: {e}")
            return OCRResult(text="", confidence=0.0, boxes=[], engine="tesseract")

    def recognize_cif_optimized(self, image: np.ndarray) -> Optional[str]:
        """
        Optimized CIF extraction using multi-strategy approach.

        BENCHMARK RESULTS (from test_critical_fields.py):
        - digit_opt_dpi200: 33% accuracy (best)
        - digit_whitelist: Works well on specific receipts
        - basic_ron_eng: Good backup

        Strategy (all run on the top 35% of the image, where the CIF is
        typically found):
        1. digit_opt: 2x scale + CLAHE + Otsu binarization
        2. whitelist: padded, 2x scale, characters limited to RO + digits
        3. ron_eng: padded + light CLAHE, standard ron+eng config
        4. Pick among the candidates, preferring ones whose check digit
           validates (see _select_cif_candidate)

        A strategy that raises is logged at debug level and skipped.

        Args:
            image: Input image (RGB from pdf2image or BGR from OpenCV);
                3-dimensional input is converted with RGB2GRAY.

        Returns:
            Extracted CIF string (e.g., "RO10562600") or None
        """
        if not TESSERACT_AVAILABLE:
            return None

        # Ensure grayscale
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image

        # Extract top 35% of image (where CIF is typically found)
        top_region = gray[:int(gray.shape[0] * 0.35), :]

        strategies = (
            ('digit_opt', self._ocr_cif_digit_opt),
            ('whitelist', self._ocr_cif_whitelist),
            ('ron_eng', self._ocr_cif_ron_eng),
        )

        candidates = []
        for label, run_ocr in strategies:
            try:
                cif = self._extract_cif_from_text(run_ocr(top_region))
            except Exception as e:
                logger.debug(f"[TesseractEngine] {label} strategy failed: {e}")
                continue
            if cif:
                candidates.append((label, cif))

        selection = self._select_cif_candidate(candidates)
        if selection is None:
            return None

        # Log all candidates
        for label, cif in candidates:
            logger.debug(f"[TesseractEngine] CIF candidate from {label}: {cif}")

        chosen, votes, label = selection
        if votes > 1:
            logger.info(f"[TesseractEngine] CIF extracted (majority {votes} strategies): {chosen}")
        elif label == 'digit_opt':
            logger.info(f"[TesseractEngine] CIF extracted via digit_opt (preferred): {chosen}")
        else:
            logger.info(f"[TesseractEngine] CIF extracted via {label}: {chosen}")
        return chosen

    def _ocr_cif_digit_opt(self, region: np.ndarray) -> str:
        """Strategy 1: 2x upscale, CLAHE (clip 3.0), Otsu binarization, then OCR with ron."""
        scaled = cv2.resize(region, None, fx=2.0, fy=2.0, interpolation=cv2.INTER_CUBIC)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(scaled)
        _, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        # Keep dark text on a light background after thresholding.
        if np.mean(binary) < 127:
            binary = 255 - binary
        return pytesseract.image_to_string(binary, config='--psm 6 --oem 3 -l ron')

    def _ocr_cif_whitelist(self, region: np.ndarray) -> str:
        """Strategy 2: white padding, 2x upscale, OCR limited to digits and R/O letters."""
        pad = self.DEFAULT_PADDING
        padded = cv2.copyMakeBorder(region, pad, pad, pad, pad, cv2.BORDER_CONSTANT, value=255)
        scaled = cv2.resize(padded, None, fx=2.0, fy=2.0, interpolation=cv2.INTER_CUBIC)
        config = '--psm 6 --oem 1 -c tessedit_char_whitelist=0123456789ROro'
        return pytesseract.image_to_string(scaled, config=config)

    def _ocr_cif_ron_eng(self, region: np.ndarray) -> str:
        """Strategy 3: white padding, light CLAHE (clip 1.5), OCR with ron+eng."""
        pad = self.DEFAULT_PADDING
        padded = cv2.copyMakeBorder(region, pad, pad, pad, pad, cv2.BORDER_CONSTANT, value=255)
        clahe = cv2.createCLAHE(clipLimit=1.5, tileGridSize=(8, 8))
        enhanced = clahe.apply(padded)
        return pytesseract.image_to_string(enhanced, config='--psm 6 --oem 3 -l ron+eng')

    @classmethod
    def _select_cif_candidate(
        cls, candidates: List[Tuple[str, str]]
    ) -> Optional[Tuple[str, int, str]]:
        """
        Choose the most trustworthy CIF among ``(strategy, cif)`` candidates.

        Candidates whose check digit validates are preferred: when at least
        one validates, only those are considered. Within the considered pool:
        1. a CIF reported by more than one strategy wins (majority vote);
        2. otherwise the ``digit_opt`` candidate wins (best in benchmarks);
        3. otherwise the first candidate wins.

        Returns:
            ``(cif, votes, strategy)`` or None when there are no candidates.
            ``votes`` is how many pooled strategies reported the returned CIF.
        """
        if not candidates:
            return None

        validated = [entry for entry in candidates if cls.validate_romanian_cif(entry[1])]
        pool = validated or candidates

        tally = Counter(cif for _, cif in pool)
        leader, votes = tally.most_common(1)[0]
        if votes > 1:
            first_label = next(label for label, cif in pool if cif == leader)
            return leader, votes, first_label

        for label, cif in pool:
            if label == 'digit_opt':
                return cif, 1, label

        label, cif = pool[0]
        return cif, 1, label

    def _extract_cif_from_text(self, text: str) -> Optional[str]:
        """
        Extract CIF/CUI from OCR text.

        The text is upper-cased and space characters are removed (so digit
        groups split by OCR are rejoined) before the patterns are tried in
        order. Leading zeros are dropped from the captured digits.

        Returns:
            ``"RO<digits>"`` for the first matching pattern, or None.
        """
        compact = text.upper().replace(' ', '')

        for pattern in self._CIF_EXTRACT_PATTERNS:
            match = pattern.search(compact)
            if match:
                digits = match.group(1).lstrip('0') or '0'
                return f"RO{digits}"

        return None

    @staticmethod
    def validate_romanian_cif(cif: str) -> bool:
        """
        Validate Romanian CIF/CUI using checksum algorithm.

        Romanian CIF format: optional RO prefix + 2-10 digits, the last digit
        being a control digit.

        Algorithm:
        1. Take all digits except the last and left-pad them with zeros to 9
        2. Multiply them position by position with the key 7,5,3,2,1,7,5,3,2
        3. Sum the products, multiply by 10 and take the remainder modulo 11
        4. If the remainder is 10, the control digit is 0

        Args:
            cif: CIF string (e.g., "RO10562600", "10562600"); whitespace and
                letter case are ignored.

        Returns:
            True if CIF is valid, False otherwise
        """
        normalized = ''.join(cif.split()).upper()
        # Strip the country code only where it belongs: at the start.
        if normalized.startswith('RO'):
            normalized = normalized[2:]

        # Must be 2-10 plain ASCII digits (isdigit alone also accepts
        # characters such as superscripts that int() cannot parse).
        if not (normalized.isascii() and normalized.isdigit()):
            return False
        if not 2 <= len(normalized) <= 10:
            return False

        body = normalized[:-1].zfill(9)
        weighted_sum = sum(
            int(digit) * weight
            for digit, weight in zip(body, TesseractEngine._CIF_CHECK_WEIGHTS)
        )

        # "% 10" maps a remainder of 10 to control digit 0.
        expected_control = (weighted_sum * 10) % 11 % 10
        return int(normalized[-1]) == expected_control

    @staticmethod
    def is_available() -> bool:
        """Check if Tesseract is available (pytesseract imported and version query succeeds)."""
        if not TESSERACT_AVAILABLE:
            return False

        try:
            pytesseract.get_tesseract_version()
            return True
        except Exception:
            return False

    @staticmethod
    def get_version() -> Optional[str]:
        """Get Tesseract version string, or None when it cannot be determined."""
        if not TESSERACT_AVAILABLE:
            return None

        try:
            return str(pytesseract.get_tesseract_version())
        except Exception:
            return None
|