From 5d631c12faea235ce3897af099f4950bb7e1f4e1 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Thu, 2 Apr 2026 14:27:32 +0000 Subject: [PATCH] feat(sync): add batched ANAF backfill for orders missing TVA status Orders imported before the ANAF dedup feature had no anaf_platitor_tva. Step 4c now auto-backfills on each sync: batch cache lookup, single ANAF API call for uncached CUIs, bulk DB update in one transaction. Co-Authored-By: Claude Opus 4.6 (1M context) --- api/app/services/sqlite_service.py | 68 +++++++++++++++++++++++++++++- api/app/services/sync_service.py | 51 ++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index f04a23d..0c65cc3 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -1085,7 +1085,7 @@ async def bulk_populate_anaf_cache(results: dict[str, dict]): async def get_expired_cuis_for_prepopulate() -> list[str]: """Get CUIs from recent orders that need ANAF cache refresh.""" - from ..services import anaf_service + from . import anaf_service db = await get_sqlite() try: cursor = await db.execute(""" @@ -1160,6 +1160,72 @@ async def update_order_partner_data(order_number: str, partner_data: dict): await db.close() +async def get_orders_missing_anaf() -> list[dict]: + """Get orders with cod_fiscal_roa set but no ANAF data (for backfill).""" + db = await get_sqlite() + try: + cursor = await db.execute(""" + SELECT order_number, cod_fiscal_roa, denumire_roa, customer_name + FROM orders + WHERE cod_fiscal_roa IS NOT NULL + AND cod_fiscal_roa != '' + AND anaf_platitor_tva IS NULL + AND status IN ('IMPORTED', 'ALREADY_IMPORTED') + """) + rows = await cursor.fetchall() + return [dict(r) for r in rows] + finally: + await db.close() + + +async def get_anaf_cache_batch(bare_cuis: list[str]) -> dict[str, dict]: + """Get cached ANAF data for multiple CUIs (valid for 7 days).""" + if not bare_cuis: + return {} + db = await get_sqlite() + try: + placeholders = ",".join("?" for _ in bare_cuis) + cursor = await db.execute(f""" + SELECT cui, scp_tva, denumire_anaf, checked_at + FROM anaf_cache + WHERE cui IN ({placeholders}) AND checked_at > datetime('now', '-7 days') + """, bare_cuis) + rows = await cursor.fetchall() + return { + r["cui"]: { + "scpTVA": bool(r["scp_tva"]) if r["scp_tva"] is not None else None, + "denumire_anaf": r["denumire_anaf"] or "", + "checked_at": r["checked_at"], + } + for r in rows + } + finally: + await db.close() + + +async def bulk_update_order_anaf_data(updates: list[tuple]): + """Batch update orders with ANAF data. + + updates: list of (anaf_platitor_tva, anaf_checked_at, anaf_denumire_mismatch, denumire_anaf, order_number) + """ + if not updates: + return + db = await get_sqlite() + try: + await db.executemany(""" + UPDATE orders SET + anaf_platitor_tva = ?, + anaf_checked_at = ?, + anaf_denumire_mismatch = ?, + denumire_anaf = ?, + updated_at = datetime('now') + WHERE order_number = ? + """, updates) + await db.commit() + finally: + await db.close() + + # ── Address Quality Cache (via app_settings) ────── async def get_incomplete_addresses_count() -> int: diff --git a/api/app/services/sync_service.py b/api/app/services/sync_service.py index 83cb90e..0e558e5 100644 --- a/api/app/services/sync_service.py +++ b/api/app/services/sync_service.py @@ -893,6 +893,57 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None except Exception as e: logger.warning(f"Invoice/order status check failed: {e}") + # Step 4c: ANAF backfill — populate anaf_platitor_tva for orders with CUI but no ANAF data + try: + orders_needing_anaf = await sqlite_service.get_orders_missing_anaf() + if orders_needing_anaf: + # Group orders by unique CUI + from collections import defaultdict + cui_to_orders = defaultdict(list) + for o in orders_needing_anaf: + bare = anaf_service.strip_ro_prefix(o["cod_fiscal_roa"]) + if anaf_service.validate_cui(bare): + cui_to_orders[bare].append(o) + + # Batch cache lookup + unique_cuis = list(cui_to_orders.keys()) + anaf_cache = await sqlite_service.get_anaf_cache_batch(unique_cuis) + + # Single ANAF API call for uncached CUIs + uncached = [c for c in unique_cuis if c not in anaf_cache] + if uncached: + fresh = await anaf_service.check_vat_status_batch(uncached) + if fresh: + await sqlite_service.bulk_populate_anaf_cache(fresh) + anaf_cache.update(fresh) + + # Build batch updates + db_updates = [] + for cui, orders_for_cui in cui_to_orders.items(): + data = anaf_cache.get(cui) + if not data or data.get("scpTVA") is None: + continue + platitor = 1 if data["scpTVA"] else 0 + checked_at = data.get("checked_at") + denumire_anaf = data.get("denumire_anaf") or "" + for o in orders_for_cui: + mismatch = 0 + den_store = None + if denumire_anaf: + norm_roa = anaf_service.normalize_company_name(o.get("denumire_roa") or o.get("customer_name") or "") + norm_anaf = anaf_service.normalize_company_name(denumire_anaf) + if norm_roa and norm_anaf and norm_roa != norm_anaf: + mismatch = 1 + den_store = denumire_anaf + db_updates.append((platitor, checked_at, mismatch, den_store, o["order_number"])) + + await sqlite_service.bulk_update_order_anaf_data(db_updates) + if db_updates: + _log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate") + except Exception as e: + logger.warning(f"ANAF backfill failed: {e}") + _log_line(run_id, f"ANAF backfill eroare: {e}") + # Step 5: Update sync run total_imported = imported_count + already_imported_count # backward-compat status = "completed" if error_count <= 10 else "failed"