feat(anaf-dedup): ANAF partner dedup + address fix + UI enrichment

Prevent partner duplicates via ANAF CUI verification and dual PL/SQL
search. Fix address matching with street-level comparison and diacritics
normalization. Show partner/address comparison in order detail modal.

- New anaf_service.py: batch ANAF API client with chunking, retry, cache
- PL/SQL: dual CUI search (bare/RO+bare/RO space+bare), 3-tier address
  search (street+city+id_loc → city+id_loc → create), strip_diacritics
  at storage for addresses and partner names
- SQLite: anaf_cache table, 12 new order columns for partner/address data
- import_service: cod_fiscal_override param, return partner/address from Oracle
- sync_service: ANAF batch integration, denomination mismatch detection,
  cache pre-population trigger
- Router: enriched order_detail with partner_info + addresses JSON
- UI: collapsible Detalii Partener + Adrese Comparativ sections in modal,
  auto-expand on mismatch, ANAF badges, mobile address cards
- Dashboard: address quality attention indicator
- New scan_duplicate_partners.py script for one-time duplicate audit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-04-01 14:36:52 +00:00
parent 3b9198d742
commit 2f593c30f6
12 changed files with 925 additions and 64 deletions

View File

@@ -0,0 +1,142 @@
import re
import logging
import httpx
import asyncio
from datetime import datetime
logger = logging.getLogger(__name__)
# Romanian diacritics to ASCII mapping (same 14 chars as import_service).
# Covers both the comma-below letters (Ș/Ț) and the cedilla variants (Ş/Ţ).
# Lowercase diacritics map to UPPERCASE ASCII, so the output is only
# case-consistent when the input has already been upper-cased.
_DIACRITICS = str.maketrans('ĂăÂâÎîȘșȚțŞşŢţ', 'AAAAIISSTTSSTT')
def strip_ro_prefix(cod_fiscal: str) -> str:
    """Return the bare CUI: trimmed, upper-cased, without a leading 'RO'.

    Whitespace directly after the 'RO' prefix is dropped as well. A falsy
    input (``None`` or empty string) yields an empty string.
    """
    normalized = cod_fiscal.strip().upper() if cod_fiscal else ""
    # Anchored at the start, so an 'RO' elsewhere in the value is untouched.
    return re.sub(r'^RO\s*', '', normalized)
def validate_cui(bare_cui: str) -> bool:
    """Validate a bare CUI: ASCII digits only, length 1-13.

    Args:
        bare_cui: CUI with any 'RO' prefix already removed.

    Returns:
        True when the value consists solely of the characters 0-9 and is
        between 1 and 13 characters long; False otherwise (including for
        ``None`` or an empty string).
    """
    if not bare_cui:
        return False
    # str.isdigit() alone also accepts characters such as '²' or
    # Arabic-Indic digits; '²' in particular makes a later int() conversion
    # raise ValueError, so restrict the check to plain ASCII digits.
    return bare_cui.isascii() and bare_cui.isdigit() and 1 <= len(bare_cui) <= 13
async def check_vat_status_batch(cui_list: list[str], date: str = None) -> dict[str, dict]:
"""POST to ANAF API to check VAT status for a batch of CUIs.
Chunks in batches of 500 (ANAF API limit).
Returns {cui_str: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}, ...}
"""
if not cui_list:
return {}
check_date = date or datetime.now().strftime("%Y-%m-%d")
results = {}
for i in range(0, len(cui_list), 500):
chunk = cui_list[i:i+500]
body = [{"cui": int(cui), "data": check_date} for cui in chunk if cui.isdigit()]
if not body:
continue
chunk_results = await _call_anaf_api(body)
results.update(chunk_results)
return results
def _anaf_entry_cui(entry) -> str:
    """Return the CUI carried by one ANAF response entry as a string ('' if none)."""
    if isinstance(entry, dict):
        cui = entry.get("cui")
        if cui is None:
            general = entry.get("date_generale")
            if isinstance(general, dict):
                cui = general.get("cui")
    else:
        # Bare value (e.g. a CUI number listed directly in the not-found list).
        cui = entry
    return "" if cui is None else str(cui).strip()


def _parse_anaf_payload(data: dict, checked_at: str) -> tuple[dict[str, dict], int, int]:
    """Convert a decoded ANAF response into (results, found_count, notfound_count)."""
    # NOTE(review): the published v8/v9 schema appears to nest `cui` under
    # `date_generale` and to name the miss list `notFound`; both that layout
    # and the flat/lowercase one are accepted here — confirm against ANAF docs.
    found_list = data.get("found") or []
    notfound_list = data.get("notfound") or data.get("notFound") or []
    results: dict[str, dict] = {}

    for entry in found_list:
        if not isinstance(entry, dict):
            continue
        cui_str = _anaf_entry_cui(entry)
        if not cui_str:
            # Without a CUI the entry cannot be matched back to a request.
            continue
        general = entry.get("date_generale")
        vat = entry.get("inregistrare_scop_Tva")
        results[cui_str] = {
            "scpTVA": vat.get("scpTVA") if isinstance(vat, dict) else None,
            "denumire_anaf": (general.get("denumire") if isinstance(general, dict) else "") or "",
            "checked_at": checked_at,
        }

    for entry in notfound_list:
        cui_str = _anaf_entry_cui(entry)
        if not cui_str:
            continue
        results[cui_str] = {
            "scpTVA": None,
            "denumire_anaf": "",
            "checked_at": checked_at,
        }

    return results, len(found_list), len(notfound_list)


async def _call_anaf_api(body: list[dict], retry: int = 0) -> dict[str, dict]:
    """Internal: single ANAF API call with one retry.

    Args:
        body: JSON payload, a list of {"cui": int, "data": "YYYY-MM-DD"}.
        retry: Number of retries already performed; a retry is attempted
            only while this is below 1.

    Returns:
        {cui_str: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}}.
        An empty dict when the call ultimately fails (rate limit, server
        error, timeout or any other exception). Errors are logged, not raised.
    """
    url = "https://webservicesp.anaf.ro/api/PlatitorTvaRest/v9/tva"
    last_attempt = retry >= 1
    try:
        # The client is closed before any back-off sleep / retry so that no
        # connection is held open while waiting.
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(url, json=body)
        status = response.status_code
        if status == 429:
            if last_attempt:
                logger.error("ANAF API rate limited after retry")
                return {}
            logger.warning("ANAF API rate limited (429), retrying in 10s...")
            delay = 10
        elif status >= 500:
            if last_attempt:
                logger.error(f"ANAF API server error after retry: {status}")
                return {}
            logger.warning(f"ANAF API server error ({status}), retrying in 3s...")
            delay = 3
        else:
            response.raise_for_status()
            results, found_count, notfound_count = _parse_anaf_payload(
                response.json(), datetime.now().isoformat()
            )
            logger.info(f"ANAF batch: {len(body)} CUIs → {found_count} found, {notfound_count} not found")
            return results
    except httpx.TimeoutException:
        if last_attempt:
            logger.error("ANAF API timeout after retry")
            return {}
        logger.warning("ANAF API timeout, retrying in 3s...")
        delay = 3
    except Exception as e:
        # Best-effort boundary: any other failure (HTTP 4xx, bad JSON,
        # unexpected payload shape) gets one retry, then an empty result.
        if last_attempt:
            logger.error(f"ANAF API error after retry: {e}")
            return {}
        logger.warning(f"ANAF API error: {e}, retrying in 3s...")
        delay = 3
    await asyncio.sleep(delay)
    return await _call_anaf_api(body, retry + 1)
def determine_correct_cod_fiscal(bare_cui: str, is_vat_payer: bool | None) -> str:
"""Determine the correct cod_fiscal format based on ANAF VAT status.
True → "RO" + bare, False → bare, None → bare (conservative)
"""
if is_vat_payer is True:
return "RO" + bare_cui
return bare_cui
def normalize_company_name(name: str) -> str:
    """Normalize a company name for comparison.

    Steps: trim and upper-case, fold Romanian diacritics via ``_DIACRITICS``,
    drop legal-form tokens (SRL, SA, SC, SNC, SCS, with or without dots),
    remove remaining punctuation and collapse whitespace.

    Args:
        name: Raw company name; ``None`` or empty yields "".

    Returns:
        The normalized name, possibly empty.
    """
    if not name:
        return ""
    result = name.strip().upper()
    # Strip diacritics
    result = result.translate(_DIACRITICS)
    # Remove common legal-form tokens. Longer alternatives come first:
    # with 'S.C.' tried before 'S.C.S.', a dotted "S.C.S." matched only
    # its "S.C." part and left a stray "S" in the result.
    result = re.sub(
        r'\b(S\.?C\.?S\.?|S\.?N\.?C\.?|S\.?R\.?L\.?|S\.?C\.?|S\.?A\.?)\b',
        '',
        result,
    )
    # Remove punctuation and extra spaces
    result = re.sub(r'[^\w\s]', '', result)
    return re.sub(r'\s+', ' ', result).strip()

View File

@@ -201,7 +201,7 @@ def build_articles_json(items, order=None, settings=None) -> str:
return json.dumps(articles)
def import_single_order(order, id_pol: int = None, id_sectie: int = None, app_settings: dict = None, id_gestiuni: list[int] = None) -> dict:
def import_single_order(order, id_pol: int = None, id_sectie: int = None, app_settings: dict = None, id_gestiuni: list[int] = None, cod_fiscal_override: str = None) -> dict:
"""Import a single order into Oracle ROA.
Returns dict with:
@@ -239,7 +239,7 @@ def import_single_order(order, id_pol: int = None, id_sectie: int = None, app_se
if order.billing.is_company:
denumire = clean_web_text(order.billing.company_name).upper()
cod_fiscal = clean_web_text(order.billing.company_code) or None
cod_fiscal = cod_fiscal_override or clean_web_text(order.billing.company_code) or None
registru = clean_web_text(order.billing.company_reg) or None
is_pj = 1
else:
@@ -267,6 +267,12 @@ def import_single_order(order, id_pol: int = None, id_sectie: int = None, app_se
result["id_partener"] = int(partner_id)
# Query partner data from Oracle for sync back to SQLite
cur.execute("SELECT denumire, cod_fiscal FROM nom_parteneri WHERE id_part = :1", [partner_id])
row = cur.fetchone()
result["denumire_roa"] = row[0] if row else None
result["cod_fiscal_roa"] = row[1] if row else None
# Determine if billing and shipping are different persons
billing_name = clean_web_text(
f"{order.billing.lastname} {order.billing.firstname}"
@@ -350,6 +356,16 @@ def import_single_order(order, id_pol: int = None, id_sectie: int = None, app_se
if addr_livr_id is not None:
result["id_adresa_livrare"] = int(addr_livr_id)
# Query address details from Oracle for sync back to SQLite
if addr_livr_id:
cur.execute("SELECT strada, numar, localitate, judet FROM vadrese_parteneri WHERE id_adresa = :1", [int(addr_livr_id)])
row = cur.fetchone()
result["adresa_livrare_roa"] = {"strada": row[0], "numar": row[1], "localitate": row[2], "judet": row[3]} if row else None
if addr_fact_id and addr_fact_id != addr_livr_id:
cur.execute("SELECT strada, numar, localitate, judet FROM vadrese_parteneri WHERE id_adresa = :1", [int(addr_fact_id)])
row = cur.fetchone()
result["adresa_facturare_roa"] = {"strada": row[0], "numar": row[1], "localitate": row[2], "judet": row[3]} if row else None
# Step 4: Build articles JSON and import order
articles_json = build_articles_json(order.items, order, app_settings)

View File

@@ -1009,3 +1009,161 @@ async def get_price_sync_runs(page: int = 1, per_page: int = 20):
return {"runs": runs, "total": total, "page": page, "pages": (total + per_page - 1) // per_page}
finally:
await db.close()
# ── ANAF Cache ───────────────────────────────────
async def get_anaf_cache(bare_cui: str) -> dict | None:
    """Get cached ANAF data for a CUI (valid for 7 days).

    Args:
        bare_cui: CUI without the 'RO' prefix, used as the cache key.

    Returns:
        {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str} for a
        row checked within the last 7 days, otherwise None.
    """
    db = await get_sqlite()
    try:
        # checked_at is wrapped in datetime() so the comparison is made on
        # SQLite's canonical 'YYYY-MM-DD HH:MM:SS' form. Stored values can be
        # ISO strings with a 'T' separator, which as raw text sort after any
        # space-separated timestamp of the same day and would therefore pass
        # the freshness filter too long.
        # NOTE(review): datetime('now') is UTC; if writers store local time
        # the window is shifted by the UTC offset — confirm intended.
        cursor = await db.execute("""
            SELECT scp_tva, denumire_anaf, checked_at
            FROM anaf_cache
            WHERE cui = ? AND datetime(checked_at) > datetime('now', '-7 days')
        """, (bare_cui,))
        row = await cursor.fetchone()
        if not row:
            return None
        scp_tva = row["scp_tva"]
        return {
            # Keep NULL (status unknown) distinct from 0 (not a VAT payer).
            "scpTVA": None if scp_tva is None else bool(scp_tva),
            "denumire_anaf": row["denumire_anaf"] or "",
            "checked_at": row["checked_at"],
        }
    finally:
        await db.close()
async def upsert_anaf_cache(cui: str, scp_tva: int | None, denumire_anaf: str):
    """Write one ANAF cache row, replacing any existing row for the same CUI.

    ``checked_at`` is set by SQLite to ``datetime('now')`` at write time.
    """
    values = (cui, scp_tva, denumire_anaf)
    conn = await get_sqlite()
    try:
        await conn.execute("""
            INSERT OR REPLACE INTO anaf_cache (cui, scp_tva, denumire_anaf, checked_at)
            VALUES (?, ?, ?, datetime('now'))
        """, values)
        await conn.commit()
    finally:
        await conn.close()
async def bulk_populate_anaf_cache(results: dict[str, dict]):
    """Write many ANAF cache rows in one transaction.

    results format: {cui: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}, ...}

    Existing rows with the same CUI are replaced. Nothing is done (and no
    connection is opened) when ``results`` is empty or None.
    """
    if not results:
        return

    def _to_flag(value):
        # Only the literal booleans are stored as 1/0; anything else is NULL.
        if value is True:
            return 1
        if value is False:
            return 0
        return None

    conn = await get_sqlite()
    try:
        payload = [
            (
                cui,
                _to_flag(entry.get("scpTVA")),
                entry.get("denumire_anaf", ""),
                entry.get("checked_at", _now_str()),
            )
            for cui, entry in results.items()
        ]
        await conn.executemany("""
            INSERT OR REPLACE INTO anaf_cache (cui, scp_tva, denumire_anaf, checked_at)
            VALUES (?, ?, ?, ?)
        """, payload)
        await conn.commit()
    finally:
        await conn.close()
# ── Partner/Address Data on Orders ─────────────────
async def update_order_partner_data(order_number: str, partner_data: dict):
    """Store partner / ANAF / address comparison data on an existing order row.

    Keys read from ``partner_data``: cod_fiscal_gomag, cod_fiscal_roa,
    denumire_roa, anaf_platitor_tva, anaf_checked_at,
    anaf_cod_fiscal_adjusted, adresa_livrare_gomag, adresa_facturare_gomag,
    adresa_livrare_roa, adresa_facturare_roa, anaf_denumire_mismatch,
    denumire_anaf. Missing keys are written as NULL, except the two flag
    columns (anaf_cod_fiscal_adjusted, anaf_denumire_mismatch) which fall
    back to 0. ``updated_at`` is refreshed by SQLite.
    """
    # (key, fallback) pairs, in the same order as the placeholders in the
    # SET clause below.
    column_defaults = (
        ("cod_fiscal_gomag", None),
        ("cod_fiscal_roa", None),
        ("denumire_roa", None),
        ("anaf_platitor_tva", None),
        ("anaf_checked_at", None),
        ("anaf_cod_fiscal_adjusted", 0),
        ("adresa_livrare_gomag", None),
        ("adresa_facturare_gomag", None),
        ("adresa_livrare_roa", None),
        ("adresa_facturare_roa", None),
        ("anaf_denumire_mismatch", 0),
        ("denumire_anaf", None),
    )
    conn = await get_sqlite()
    try:
        bound = tuple(
            partner_data.get(key, fallback) for key, fallback in column_defaults
        ) + (order_number,)
        await conn.execute("""
            UPDATE orders SET
                cod_fiscal_gomag = ?,
                cod_fiscal_roa = ?,
                denumire_roa = ?,
                anaf_platitor_tva = ?,
                anaf_checked_at = ?,
                anaf_cod_fiscal_adjusted = ?,
                adresa_livrare_gomag = ?,
                adresa_facturare_gomag = ?,
                adresa_livrare_roa = ?,
                adresa_facturare_roa = ?,
                anaf_denumire_mismatch = ?,
                denumire_anaf = ?,
                updated_at = datetime('now')
            WHERE order_number = ?
        """, bound)
        await conn.commit()
    finally:
        await conn.close()
# ── Address Quality Cache (via app_settings) ──────
async def get_incomplete_addresses_count() -> int:
    """Read the cached count of orders with incomplete ROA addresses.

    Both values live in ``app_settings``. Returns -1 when the timestamp is
    missing, unparseable, or more than one hour old; otherwise the stored
    count (0 if the count row is missing or empty).
    """
    from datetime import datetime, timedelta

    conn = await get_sqlite()
    try:
        cur = await conn.execute(
            "SELECT value FROM app_settings WHERE key = 'incomplete_addresses_checked_at'"
        )
        stamp_row = await cur.fetchone()
        stamp = stamp_row["value"] if stamp_row else None
        if not stamp:
            return -1

        # Freshness check against the local clock.
        # NOTE(review): presumably the stored stamp uses the same clock and
        # timezone as datetime.now() — confirm against _now_str().
        try:
            age = datetime.now() - datetime.fromisoformat(stamp)
        except (ValueError, TypeError):
            return -1
        if age > timedelta(hours=1):
            return -1

        cur = await conn.execute(
            "SELECT value FROM app_settings WHERE key = 'incomplete_addresses_count'"
        )
        count_row = await cur.fetchone()
        if count_row and count_row["value"]:
            return int(count_row["value"])
        return 0
    finally:
        await conn.close()
async def set_incomplete_addresses_count(count: int):
    """Persist the incomplete-addresses count and its timestamp in app_settings.

    The count is stored as text; the timestamp comes from ``_now_str()``.
    Both rows are written before a single commit.
    """
    count_sql = "INSERT OR REPLACE INTO app_settings (key, value) VALUES ('incomplete_addresses_count', ?)"
    stamp_sql = "INSERT OR REPLACE INTO app_settings (key, value) VALUES ('incomplete_addresses_checked_at', ?)"
    conn = await get_sqlite()
    try:
        await conn.execute(count_sql, (str(count),))
        await conn.execute(stamp_sql, (_now_str(),))
        await conn.commit()
    finally:
        await conn.close()

View File

@@ -12,7 +12,7 @@ def _now():
"""Return current time in Bucharest timezone (naive, for display/storage)."""
return datetime.now(_tz_bucharest).replace(tzinfo=None)
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client, anaf_service
from ..config import settings
from .. import database
@@ -638,7 +638,51 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
0, len(truly_importable),
{"imported": 0, "skipped": skipped_count, "errors": 0, "already_imported": already_imported_count})
# Step 4: Import only truly new orders
# ANAF cache pre-population check
try:
db_check = await sqlite_service.get_sqlite()
try:
cursor = await db_check.execute("SELECT COUNT(*) FROM anaf_cache WHERE checked_at > datetime('now', '-7 days')")
row = await cursor.fetchone()
cache_count = row[0] if row else 0
finally:
await db_check.close()
if cache_count < 10:
_log_line(run_id, "ANAF pre-populare cache...")
except Exception as e:
logger.warning(f"ANAF cache pre-population check failed: {e}")
# Step 4: ANAF batch verification for company CUIs
company_cuis = set()
for order in truly_importable:
if order.billing.is_company and order.billing.company_code:
raw_cf = import_service.clean_web_text(order.billing.company_code) or ""
bare = anaf_service.strip_ro_prefix(raw_cf)
if anaf_service.validate_cui(bare):
company_cuis.add(bare)
# Check anaf_cache for already-known CUIs (7-day validity)
uncached_cuis = []
cached_results = {}
for cui in company_cuis:
cached = await sqlite_service.get_anaf_cache(cui)
if cached:
cached_results[cui] = cached
else:
uncached_cuis.append(cui)
# Batch ANAF call for uncached CUIs only
if uncached_cuis:
_log_line(run_id, f"ANAF: verificare {len(uncached_cuis)} CUI-uri noi...")
anaf_results = await anaf_service.check_vat_status_batch(uncached_cuis)
if anaf_results:
await sqlite_service.bulk_populate_anaf_cache(anaf_results)
cached_results.update(anaf_results)
else:
_log_line(run_id, "ANAF: batch call esuat, continua fara corectie CUI")
# Step 5: Import only truly new orders
imported_count = 0
error_count = 0
@@ -651,10 +695,25 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
"already_imported": already_imported_count})
# Determine cod_fiscal override from ANAF data
cod_fiscal_override = None
anaf_data_for_order = None
raw_cf = ""
if order.billing.is_company and order.billing.company_code:
raw_cf = import_service.clean_web_text(order.billing.company_code) or ""
bare_cui = anaf_service.strip_ro_prefix(raw_cf)
anaf_data_for_order = cached_results.get(bare_cui)
if anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None:
correct_cf = anaf_service.determine_correct_cod_fiscal(bare_cui, anaf_data_for_order["scpTVA"])
if correct_cf != raw_cf:
_log_line(run_id, f"#{order.number} CUI corectat: {raw_cf}{correct_cf}")
cod_fiscal_override = correct_cf
result = await asyncio.to_thread(
import_service.import_single_order,
order, id_pol=id_pol, id_sectie=id_sectie,
app_settings=app_settings, id_gestiuni=id_gestiuni
app_settings=app_settings, id_gestiuni=id_gestiuni,
cod_fiscal_override=cod_fiscal_override
)
# Build order items data for storage (R9)
@@ -702,7 +761,34 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
)
await sqlite_service.add_order_items(order.number, order_items_data)
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
else:
# Save partner + ANAF + address data to SQLite
if result["success"] or result.get("id_partener"):
partner_data = {
"cod_fiscal_gomag": raw_cf if order.billing.is_company else None,
"cod_fiscal_roa": result.get("cod_fiscal_roa"),
"denumire_roa": result.get("denumire_roa"),
"anaf_platitor_tva": (1 if anaf_data_for_order.get("scpTVA") else 0) if anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None else None,
"anaf_checked_at": anaf_data_for_order.get("checked_at") if anaf_data_for_order else None,
"anaf_cod_fiscal_adjusted": 1 if cod_fiscal_override and cod_fiscal_override != raw_cf else 0,
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
"adresa_livrare_roa": json.dumps(result.get("adresa_livrare_roa")) if result.get("adresa_livrare_roa") else None,
"adresa_facturare_roa": json.dumps(result.get("adresa_facturare_roa")) if result.get("adresa_facturare_roa") else None,
"anaf_denumire_mismatch": 0,
"denumire_anaf": None,
}
# Denomination mismatch check
if anaf_data_for_order and anaf_data_for_order.get("denumire_anaf") and order.billing.is_company:
norm_gomag = anaf_service.normalize_company_name(order.billing.company_name or "")
norm_anaf = anaf_service.normalize_company_name(anaf_data_for_order["denumire_anaf"])
if norm_gomag and norm_anaf and norm_gomag != norm_anaf:
partner_data["anaf_denumire_mismatch"] = 1
partner_data["denumire_anaf"] = anaf_data_for_order["denumire_anaf"]
await sqlite_service.update_order_partner_data(order.number, partner_data)
if not result["success"]:
error_count += 1
await sqlite_service.upsert_order(
sync_run_id=run_id,