Files
gomag-vending/api/app/services/anaf_service.py
Claude Agent 6620b28ed1 feat(sync): gate CUI invalid/ANAF-notFound → ERROR inainte de import Oracle
Incident 22.04.2026 (#485225171 NONA ROYAL SRL): clientul a inversat
cod_fiscal cu registru in GoMag → sistem a creat partener cu CUI=J1994000194225.

Adauga evaluate_cui_gate() care blocheaza comanda (ERROR) daca:
- CUI format invalid (ex: J.. in loc de cifre)
- CUI nu trece cifra de control
- ANAF returneaza explicit notFound (scpTVA=None + denumire_anaf="")

ANAF down (anaf_data=None) → fallback pass, comportament existent pastrat.
_record_order_error() DRY helper evita duplicarea upsert/add_items.
Contract ANAF down/notFound/found documentat in anaf_service._call_anaf_api.
9 teste unit (inclusiv T5 CRITIC: ANAF down nu blocheaza) + T7 COALESCE.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 09:40:07 +00:00

217 lines
8.0 KiB
Python

import re
import logging
import httpx
import asyncio
from datetime import datetime
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Translation table that maps Romanian diacritics to plain ASCII letters
# (noted as the same 14 chars used by import_service). Each of S and T appears
# twice per case so both encoded variants of the letter are covered.
_DIACRITICS = str.maketrans('ĂăÂâÎîȘșȚțŞşŢţ', 'AAAAIISSTTSSTT')
def strip_ro_prefix(cod_fiscal: str) -> str:
    """Return the bare CUI extracted from a raw fiscal-code string.

    The input is trimmed and upper-cased, a leading "RO" (with any whitespace
    that follows it) is dropped, and letters commonly typed instead of digits
    are replaced: O→0, I→1, L→1. No other characters are altered.

    Empty or missing input yields "".
    """
    if not cod_fiscal:
        return ""
    # Look-alike letters that get mistaken for digits when a CUI is typed.
    lookalike_table = str.maketrans('OIL', '011')
    upper_trimmed = cod_fiscal.strip().upper()
    without_prefix = re.sub(r'^RO\s*', '', upper_trimmed)
    return without_prefix.translate(lookalike_table)
def validate_cui(bare_cui: str) -> bool:
    """Check that a bare CUI has a valid format.

    A valid bare CUI is made of 2 to 10 ASCII digits ("0"-"9") and nothing
    else.

    Args:
        bare_cui: CUI with the "RO" prefix already removed.

    Returns:
        True when the format is valid; False otherwise, including for empty
        or missing input.
    """
    if not bare_cui:
        return False
    # str.isdigit() alone also accepts non-ASCII digits (e.g. "²" or "١٢٣").
    # int() rejects some of them, so callers that convert the CUI to a number
    # would raise ValueError. Require ASCII before the digit check.
    return bare_cui.isascii() and bare_cui.isdigit() and 2 <= len(bare_cui) <= 10
# Romanian CUI check key: 9 weights, right-aligned with the digits that
# precede the check digit.
_CUI_KEY = [7, 5, 3, 2, 1, 7, 5, 3, 2]


def validate_cui_checksum(bare_cui: str) -> bool:
    """Verify the trailing check digit of a bare CUI.

    The digits before the check digit are left-padded with zeros to nine
    positions and multiplied position-wise by the key 753217532. The sum of
    the products, times 10, modulo 11 gives the expected check digit; a
    remainder of 10 counts as 0.

    Returns False when the value fails the format check done by validate_cui.
    """
    if not validate_cui(bare_cui):
        return False
    # Everything except the last character, aligned to the 9-weight key.
    payload = bare_cui[:-1].zfill(9)
    weighted_sum = sum(int(ch) * weight for ch, weight in zip(payload, _CUI_KEY))
    expected = (weighted_sum * 10) % 11
    if expected == 10:
        expected = 0
    return int(bare_cui[-1]) == expected
def sanitize_cui(raw_cf: str) -> tuple[str, str | None]:
    """Sanitize a raw fiscal code and report whether it looks valid.

    The raw value is normalized with strip_ro_prefix and then checked.

    Returns:
        A (bare_cui, warning) pair. bare_cui is always the normalized value.
        warning is None when the input is empty or the CUI passes both the
        format and the check-digit validation; otherwise it is a message
        saying which validation failed.
    """
    bare = strip_ro_prefix(raw_cf)
    if not bare:
        return bare, None

    # The format is wrong even after sanitizing.
    if not validate_cui(bare):
        return bare, f"CUI {raw_cf!r} contine caractere invalide dupa sanitizare: {bare!r}"

    # The format is fine, so only the check digit decides.
    if validate_cui_checksum(bare):
        return bare, None
    return bare, f"CUI {bare} nu trece verificarea cifrei de control"
async def check_vat_status_batch(cui_list: list[str], date: str = None, log_fn=None) -> dict[str, dict]:
"""POST to ANAF API to check VAT status for a batch of CUIs.
Chunks in batches of 500 (ANAF API limit).
Returns {cui_str: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}, ...}
"""
if not cui_list:
return {}
check_date = date or datetime.now().strftime("%Y-%m-%d")
results = {}
for i in range(0, len(cui_list), 500):
chunk = cui_list[i:i+500]
body = [{"cui": int(cui), "data": check_date} for cui in chunk if cui.isdigit()]
if not body:
continue
chunk_results = await _call_anaf_api(body, log_fn=log_fn)
results.update(chunk_results)
return results
def _parse_anaf_entries(found_list: list, notfound_list: list, checked_at: str) -> dict[str, dict]:
    """Turn the "found" / "notFound" lists of an ANAF reply into per-CUI results."""
    parsed: dict[str, dict] = {}

    for item in found_list:
        if not isinstance(item, dict):
            continue
        # A null sub-object is treated like a missing one.
        general = item.get("date_generale") or {}
        cui = general.get("cui")
        if cui is None or str(cui).strip() == "":
            # Without a CUI there is no key to store the entry under.
            continue
        vat = item.get("inregistrare_scop_Tva") or {}
        parsed[str(cui).strip()] = {
            "scpTVA": vat.get("scpTVA"),
            "denumire_anaf": general.get("denumire", ""),
            "checked_at": checked_at,
        }

    # Not found CUIs — ANAF returns plain integers (CUI values), not dicts.
    # Numeric strings and dict-shaped entries are accepted as well.
    for item in notfound_list:
        if isinstance(item, (int, str)):
            cui = item
        elif isinstance(item, dict):
            general = item.get("date_generale") or {}
            cui = general.get("cui", item.get("cui"))
        else:
            continue
        if cui is None or str(cui).strip() == "":
            continue
        parsed[str(cui).strip()] = {
            "scpTVA": None,
            "denumire_anaf": "",
            "checked_at": checked_at,
        }

    return parsed


async def _call_anaf_api(body: list[dict], retry: int = 0, log_fn=None) -> dict[str, dict]:
    """Perform a single ANAF API call, retrying at most once.

    Args:
        body: Request payload, a list of {"cui": int, "data": "YYYY-MM-DD"}.
        retry: Number of retries already made; a retry happens only while
            this is below 1.
        log_fn: Optional callable that also receives warning/error messages.

    Returns:
        CONTRACT (consumed by sync_service.evaluate_cui_gate):
          {}                                           → transient error
                                                         (down/429/5xx/timeout), or a 4xx reply
          {cui: {scpTVA: None, denumire_anaf: ""}}     → ANAF notFound explicit
          {cui: {scpTVA: bool, denumire_anaf: str}}    → ANAF found
        Every entry also carries "checked_at" (ISO timestamp).
        If you change these semantics, update the gate in sync_service too.
    """
    url = "https://webservicesp.anaf.ro/api/PlatitorTvaRest/v9/tva"
    results = {}

    def _log_error(msg: str):
        logger.error(msg)
        if log_fn:
            log_fn(f"ANAF eroare: {msg}")

    def _log_warning(msg: str):
        logger.warning(msg)
        if log_fn:
            log_fn(f"ANAF warn: {msg}")

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(url, json=body)

            # Rate limited: wait longer, then try once more.
            if response.status_code == 429:
                if retry < 1:
                    _log_warning("ANAF API rate limited (429), retrying in 10s...")
                    await asyncio.sleep(10)
                    return await _call_anaf_api(body, retry + 1, log_fn)
                _log_error("ANAF API rate limited after retry")
                return {}

            # Server-side failure: short wait, one retry.
            if response.status_code >= 500:
                if retry < 1:
                    _log_warning(f"ANAF API server error ({response.status_code}), retrying in 3s...")
                    await asyncio.sleep(3)
                    return await _call_anaf_api(body, retry + 1, log_fn)
                _log_error(f"ANAF API server error after retry: {response.status_code}")
                return {}

            # Other client errors are not retried.
            if 400 <= response.status_code < 500:
                _log_error(f"ANAF API client error {response.status_code} (nu se reincearca)")
                return {}

            response.raise_for_status()
            data = response.json()
            checked_at = datetime.now().isoformat()

            # A null list is treated like an absent one.
            found_list = data.get("found") or []
            notfound_list = data.get("notFound") or []
            results = _parse_anaf_entries(found_list, notfound_list, checked_at)

            logger.info(f"ANAF batch: {len(body)} CUIs → {len(found_list)} found, {len(notfound_list)} not found")
    except httpx.TimeoutException:
        if retry < 1:
            _log_warning("ANAF API timeout, retrying in 3s...")
            await asyncio.sleep(3)
            return await _call_anaf_api(body, retry + 1, log_fn)
        _log_error("ANAF API timeout after retry")
    except Exception as e:
        # Best-effort boundary: any other failure (network, bad JSON, ...) is
        # retried once and then reported as a transient error.
        if retry < 1:
            _log_warning(f"ANAF API error: {e}, retrying in 3s...")
            await asyncio.sleep(3)
            return await _call_anaf_api(body, retry + 1, log_fn)
        _log_error(f"ANAF API error after retry: {e}")
    return results
def determine_correct_cod_fiscal(bare_cui: str, is_vat_payer: bool | None) -> str:
"""Determine the correct cod_fiscal format based on ANAF VAT status.
True → "RO" + bare, False → bare, None → bare (conservative)
"""
if is_vat_payer is True:
return "RO" + bare_cui
return bare_cui
def normalize_company_name(name: str) -> str:
    """Reduce a company name to a canonical form suitable for comparison.

    The name is trimmed, upper-cased and stripped of Romanian diacritics.
    Legal-form tokens (SRL, SA, SC, SNC, SCS, PFA, INTREPRINDERE INDIVIDUALA,
    with or without dots) are removed, as is a leading "II". Punctuation is
    dropped and runs of whitespace collapse to single spaces.

    Empty or missing input yields "".
    """
    if not name:
        return ""
    text = name.strip().upper().translate(_DIACRITICS)
    # Applied in order; each pair is (pattern, replacement).
    substitutions = (
        # Common suffixes and legal forms.
        (r'\b(S\.?R\.?L\.?|S\.?A\.?|S\.?C\.?|S\.?N\.?C\.?|S\.?C\.?S\.?|P\.?F\.?A\.?|INTREPRINDERE\s+INDIVIDUALA)\b', ''),
        # "II" only at the very start, so a Roman numeral II inside the name
        # (e.g. "TEHNICA II SRL") is kept.
        (r'^I\.?I\.?\s+', ''),
        # Punctuation.
        (r'[^\w\s]', ''),
        # Collapse whitespace runs.
        (r'\s+', ' '),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()