Romanian CUI check digit algorithm (key 753217532) validates CUIs before ANAF lookup. New sanitize_cui() fixes OCR typos (O→0, I→1) and verifies checksum, logging warnings for invalid CUIs. Applied at both ANAF batch verification and per-order import steps. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
198 lines
7.0 KiB
Python
198 lines
7.0 KiB
Python
import re
|
|
import logging
|
|
import httpx
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)

# Romanian diacritics to ASCII mapping (same 14 chars as import_service).
# Both the comma-below (Ș, Ț) and the cedilla (Ş, Ţ) spellings are listed, and
# every entry - lowercase ones included - maps to an UPPERCASE ASCII letter.
_DIACRITICS = str.maketrans('ĂăÂâÎîȘșȚțŞşŢţ', 'AAAAIISSTTSSTT')
|
|
|
|
|
|
def strip_ro_prefix(cod_fiscal: str) -> str:
    """Return the bare form of a Romanian fiscal code (CUI).

    The value is trimmed, upper-cased and relieved of one leading ``RO``
    (together with any whitespace right after it). Afterwards the letters
    O, I and L are replaced by the digits 0, 1 and 1, which they are commonly
    mistaken for. No other character is rewritten.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    if cod_fiscal:
        without_prefix = re.sub(r'^RO\s*', '', cod_fiscal.strip().upper())
        # The letter swap runs after the prefix removal, so the "O" of "RO"
        # is never turned into a zero before the prefix can be recognised.
        return without_prefix.replace('O', '0').replace('I', '1').replace('L', '1')
    return ""
|
|
|
|
|
|
def validate_cui(bare_cui: str) -> bool:
    """Check the format of a bare CUI: 2 to 10 ASCII digits and nothing else.

    Args:
        bare_cui: CUI without the ``RO`` prefix.

    Returns:
        ``True`` when the value consists solely of the characters 0-9 and is
        between 2 and 10 characters long; ``False`` otherwise, including for
        ``None`` and the empty string. The check digit is not verified here.
    """
    if not bare_cui:
        return False
    # str.isdigit() on its own also accepts non-ASCII digit characters such as
    # "²", which int() cannot convert; isascii() restricts the match to 0-9.
    return bare_cui.isascii() and bare_cui.isdigit() and 2 <= len(bare_cui) <= 10
|
|
|
|
|
|
# Romanian CUI test key: 9 weights, right-aligned against the CUI digits that
# precede the check digit.
_CUI_KEY = [7, 5, 3, 2, 1, 7, 5, 3, 2]
|
|
|
|
|
|
def validate_cui_checksum(bare_cui: str) -> bool:
    """Verify the check digit of a bare CUI with the Romanian algorithm.

    The last digit is the check digit. The digits before it are weighted with
    the key 753217532, aligned on the right (equivalent to left-padding them
    with zeros to 9 positions). The weighted sum is multiplied by 10 and taken
    modulo 11; a remainder of 10 counts as 0. The CUI is valid when that value
    equals the check digit.

    Values that fail ``validate_cui`` are rejected without further checks.
    """
    if not validate_cui(bare_cui):
        return False

    *leading, control = map(int, bare_cui)
    # Walking both sequences from the right pairs each digit with the same
    # weight it would get after zero-padding on the left.
    weighted_sum = sum(
        digit * weight
        for digit, weight in zip(reversed(leading), reversed(_CUI_KEY))
    )
    remainder = weighted_sum * 10 % 11
    expected = 0 if remainder == 10 else remainder
    return expected == control
|
|
|
|
|
|
def sanitize_cui(raw_cf: str) -> tuple[str, str | None]:
    """Sanitize and validate a CUI. Returns ``(clean_cui, warning_or_none)``.

    The input is normalized with ``strip_ro_prefix`` (trim, upper-case, drop
    the ``RO`` prefix, replace look-alike letters with digits) and the result
    is checked with ``validate_cui_checksum``.

    Args:
        raw_cf: Fiscal code as received; may be empty or ``None``.

    Returns:
        A ``(bare, warning)`` tuple. ``bare`` is always the normalized value,
        never the raw input. ``warning`` is ``None`` when the input is empty
        or the normalized value passes the check-digit test. Otherwise it is
        a message stating either that the check digit is wrong or that the
        value still contains invalid characters after sanitizing.
    """
    bare = strip_ro_prefix(raw_cf)
    if not bare:
        return bare, None

    # validate_cui_checksum() itself rejects anything failing validate_cui(),
    # so a separate format check is not needed on the success path.
    if validate_cui_checksum(bare):
        return bare, None

    # There is deliberately no retry with the un-fixed text: it can only differ
    # from `bare` by containing the letters O, I or L, and a value containing
    # letters can never pass the digits-only format check.

    if validate_cui(bare):
        # Well-formed, but the check digit does not match.
        return bare, f"CUI {bare} nu trece verificarea cifrei de control"

    # Not even a valid format after sanitizing.
    return bare, f"CUI {raw_cf!r} contine caractere invalide dupa sanitizare: {bare!r}"
|
|
|
|
|
|
async def check_vat_status_batch(cui_list: list[str], date: str = None) -> dict[str, dict]:
|
|
"""POST to ANAF API to check VAT status for a batch of CUIs.
|
|
|
|
Chunks in batches of 500 (ANAF API limit).
|
|
Returns {cui_str: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}, ...}
|
|
"""
|
|
if not cui_list:
|
|
return {}
|
|
|
|
check_date = date or datetime.now().strftime("%Y-%m-%d")
|
|
results = {}
|
|
|
|
for i in range(0, len(cui_list), 500):
|
|
chunk = cui_list[i:i+500]
|
|
body = [{"cui": int(cui), "data": check_date} for cui in chunk if cui.isdigit()]
|
|
if not body:
|
|
continue
|
|
|
|
chunk_results = await _call_anaf_api(body)
|
|
results.update(chunk_results)
|
|
|
|
return results
|
|
|
|
|
|
def _parse_anaf_response(data: dict, checked_at: str) -> tuple[dict[str, dict], int, int]:
    """Convert a decoded ANAF payload into ``(results, found_count, not_found_count)``."""
    results: dict[str, dict] = {}

    # The "or" fallbacks make a key that is present but JSON null behave
    # exactly like a missing key instead of raising on .get() / iteration.
    found_list = data.get("found") or []
    for item in found_list:
        general = item.get("date_generale") or {}
        vat = item.get("inregistrare_scop_Tva") or {}
        results[str(general.get("cui", ""))] = {
            "scpTVA": vat.get("scpTVA"),
            "denumire_anaf": general.get("denumire", ""),
            "checked_at": checked_at,
        }

    notfound_list = data.get("notFound") or []
    for item in notfound_list:
        if isinstance(item, dict):
            general = item.get("date_generale") or {}
            cui_value = general.get("cui", item.get("cui", ""))
        else:
            # Entry is not an object: treat the value itself as the CUI.
            # NOTE(review): confirm the notFound element shape for this API version.
            cui_value = item
        results[str(cui_value)] = {
            "scpTVA": None,
            "denumire_anaf": "",
            "checked_at": checked_at,
        }

    return results, len(found_list), len(notfound_list)


async def _call_anaf_api(body: list[dict], retry: int = 0) -> dict[str, dict]:
    """Internal: single ANAF API call with retry logic.

    Args:
        body: JSON payload, a list of ``{"cui": int, "data": "YYYY-MM-DD"}``.
        retry: Number of attempts already made; one retry is allowed.

    Returns:
        ``{cui_str: {"scpTVA": ..., "denumire_anaf": ..., "checked_at": ...}}``
        for every entry of the response's ``found`` and ``notFound`` lists
        (``scpTVA`` is ``None`` for not-found CUIs). An empty dict is returned
        when the call still fails after the retry. Failures are logged and
        never raised to the caller.

    Retry policy: HTTP 429 waits 10 s; HTTP 5xx, timeouts and any other error
    (including other HTTP error statuses and undecodable bodies) wait 3 s.
    """
    url = "https://webservicesp.anaf.ro/api/PlatitorTvaRest/v9/tva"
    may_retry = retry < 1

    try:
        # The client is closed as soon as the response has been received, so
        # no connection is held open during the back-off sleep or the retry.
        # A non-streaming post() has already read the whole body by then.
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(url, json=body)

        if response.status_code == 429:
            if not may_retry:
                logger.error("ANAF API rate limited after retry")
                return {}
            logger.warning("ANAF API rate limited (429), retrying in 10s...")
            delay = 10
        elif response.status_code >= 500:
            if not may_retry:
                logger.error(f"ANAF API server error after retry: {response.status_code}")
                return {}
            logger.warning(f"ANAF API server error ({response.status_code}), retrying in 3s...")
            delay = 3
        else:
            # Remaining error statuses raise here and are handled by the
            # generic handler below.
            response.raise_for_status()
            results, found_count, notfound_count = _parse_anaf_response(
                response.json(), datetime.now().isoformat()
            )
            logger.info(f"ANAF batch: {len(body)} CUIs → {found_count} found, {notfound_count} not found")
            return results

    except httpx.TimeoutException:
        if not may_retry:
            logger.error("ANAF API timeout after retry")
            return {}
        logger.warning("ANAF API timeout, retrying in 3s...")
        delay = 3
    except Exception as e:
        # Deliberately broad: this is a best-effort lookup and callers expect
        # a dict, never an exception.
        if not may_retry:
            logger.error(f"ANAF API error after retry: {e}")
            return {}
        logger.warning(f"ANAF API error: {e}, retrying in 3s...")
        delay = 3

    await asyncio.sleep(delay)
    return await _call_anaf_api(body, retry + 1)
|
|
|
|
|
|
def determine_correct_cod_fiscal(bare_cui: str, is_vat_payer: bool | None) -> str:
|
|
"""Determine the correct cod_fiscal format based on ANAF VAT status.
|
|
True → "RO" + bare, False → bare, None → bare (conservative)
|
|
"""
|
|
if is_vat_payer is True:
|
|
return "RO" + bare_cui
|
|
return bare_cui
|
|
|
|
|
|
def normalize_company_name(name: str) -> str:
    """Reduce a company name to a canonical form suitable for comparison.

    The name is trimmed, upper-cased and transliterated through
    ``_DIACRITICS``. Three regex passes then run in order: legal-form tokens
    (SRL, SA, SC, SNC, SCS, with or without dots) are removed, every character
    that is neither a word character nor whitespace is removed, and runs of
    whitespace collapse to a single space. Falsy input yields ``""``.
    """
    if not name:
        return ""

    text = name.strip().upper().translate(_DIACRITICS)
    passes = (
        # Legal-form suffixes/prefixes, dotted or not.
        (r'\b(S\.?R\.?L\.?|S\.?A\.?|S\.?C\.?|S\.?N\.?C\.?|S\.?C\.?S\.?)\b', ''),
        # Leftover punctuation.
        (r'[^\w\s]', ''),
        # Whitespace runs.
        (r'\s+', ' '),
    )
    for pattern, replacement in passes:
        text = re.sub(pattern, replacement, text)
    return text.strip()
|