feat(sync): add batched ANAF backfill for orders missing TVA status
Orders imported before the ANAF dedup feature had no anaf_platitor_tva. Step 4c now auto-backfills on each sync: batch cache lookup, single ANAF API call for uncached CUIs, bulk DB update in one transaction. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1085,7 +1085,7 @@ async def bulk_populate_anaf_cache(results: dict[str, dict]):
|
|||||||
|
|
||||||
async def get_expired_cuis_for_prepopulate() -> list[str]:
|
async def get_expired_cuis_for_prepopulate() -> list[str]:
|
||||||
"""Get CUIs from recent orders that need ANAF cache refresh."""
|
"""Get CUIs from recent orders that need ANAF cache refresh."""
|
||||||
from ..services import anaf_service
|
from . import anaf_service
|
||||||
db = await get_sqlite()
|
db = await get_sqlite()
|
||||||
try:
|
try:
|
||||||
cursor = await db.execute("""
|
cursor = await db.execute("""
|
||||||
@@ -1160,6 +1160,72 @@ async def update_order_partner_data(order_number: str, partner_data: dict):
|
|||||||
await db.close()
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def get_orders_missing_anaf() -> list[dict]:
|
||||||
|
"""Get orders with cod_fiscal_roa set but no ANAF data (for backfill)."""
|
||||||
|
db = await get_sqlite()
|
||||||
|
try:
|
||||||
|
cursor = await db.execute("""
|
||||||
|
SELECT order_number, cod_fiscal_roa, denumire_roa, customer_name
|
||||||
|
FROM orders
|
||||||
|
WHERE cod_fiscal_roa IS NOT NULL
|
||||||
|
AND cod_fiscal_roa != ''
|
||||||
|
AND anaf_platitor_tva IS NULL
|
||||||
|
AND status IN ('IMPORTED', 'ALREADY_IMPORTED')
|
||||||
|
""")
|
||||||
|
rows = await cursor.fetchall()
|
||||||
|
return [dict(r) for r in rows]
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def get_anaf_cache_batch(bare_cuis: list[str]) -> dict[str, dict]:
|
||||||
|
"""Get cached ANAF data for multiple CUIs (valid for 7 days)."""
|
||||||
|
if not bare_cuis:
|
||||||
|
return {}
|
||||||
|
db = await get_sqlite()
|
||||||
|
try:
|
||||||
|
placeholders = ",".join("?" for _ in bare_cuis)
|
||||||
|
cursor = await db.execute(f"""
|
||||||
|
SELECT cui, scp_tva, denumire_anaf, checked_at
|
||||||
|
FROM anaf_cache
|
||||||
|
WHERE cui IN ({placeholders}) AND checked_at > datetime('now', '-7 days')
|
||||||
|
""", bare_cuis)
|
||||||
|
rows = await cursor.fetchall()
|
||||||
|
return {
|
||||||
|
r["cui"]: {
|
||||||
|
"scpTVA": bool(r["scp_tva"]) if r["scp_tva"] is not None else None,
|
||||||
|
"denumire_anaf": r["denumire_anaf"] or "",
|
||||||
|
"checked_at": r["checked_at"],
|
||||||
|
}
|
||||||
|
for r in rows
|
||||||
|
}
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def bulk_update_order_anaf_data(updates: list[tuple]):
|
||||||
|
"""Batch update orders with ANAF data.
|
||||||
|
|
||||||
|
updates: list of (anaf_platitor_tva, anaf_checked_at, anaf_denumire_mismatch, denumire_anaf, order_number)
|
||||||
|
"""
|
||||||
|
if not updates:
|
||||||
|
return
|
||||||
|
db = await get_sqlite()
|
||||||
|
try:
|
||||||
|
await db.executemany("""
|
||||||
|
UPDATE orders SET
|
||||||
|
anaf_platitor_tva = ?,
|
||||||
|
anaf_checked_at = ?,
|
||||||
|
anaf_denumire_mismatch = ?,
|
||||||
|
denumire_anaf = ?,
|
||||||
|
updated_at = datetime('now')
|
||||||
|
WHERE order_number = ?
|
||||||
|
""", updates)
|
||||||
|
await db.commit()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
# ── Address Quality Cache (via app_settings) ──────
|
# ── Address Quality Cache (via app_settings) ──────
|
||||||
|
|
||||||
async def get_incomplete_addresses_count() -> int:
|
async def get_incomplete_addresses_count() -> int:
|
||||||
|
|||||||
@@ -893,6 +893,57 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Invoice/order status check failed: {e}")
|
logger.warning(f"Invoice/order status check failed: {e}")
|
||||||
|
|
||||||
|
# Step 4c: ANAF backfill — populate anaf_platitor_tva for orders with CUI but no ANAF data
|
||||||
|
try:
|
||||||
|
orders_needing_anaf = await sqlite_service.get_orders_missing_anaf()
|
||||||
|
if orders_needing_anaf:
|
||||||
|
# Group orders by unique CUI
|
||||||
|
from collections import defaultdict
|
||||||
|
cui_to_orders = defaultdict(list)
|
||||||
|
for o in orders_needing_anaf:
|
||||||
|
bare = anaf_service.strip_ro_prefix(o["cod_fiscal_roa"])
|
||||||
|
if anaf_service.validate_cui(bare):
|
||||||
|
cui_to_orders[bare].append(o)
|
||||||
|
|
||||||
|
# Batch cache lookup
|
||||||
|
unique_cuis = list(cui_to_orders.keys())
|
||||||
|
anaf_cache = await sqlite_service.get_anaf_cache_batch(unique_cuis)
|
||||||
|
|
||||||
|
# Single ANAF API call for uncached CUIs
|
||||||
|
uncached = [c for c in unique_cuis if c not in anaf_cache]
|
||||||
|
if uncached:
|
||||||
|
fresh = await anaf_service.check_vat_status_batch(uncached)
|
||||||
|
if fresh:
|
||||||
|
await sqlite_service.bulk_populate_anaf_cache(fresh)
|
||||||
|
anaf_cache.update(fresh)
|
||||||
|
|
||||||
|
# Build batch updates
|
||||||
|
db_updates = []
|
||||||
|
for cui, orders_for_cui in cui_to_orders.items():
|
||||||
|
data = anaf_cache.get(cui)
|
||||||
|
if not data or data.get("scpTVA") is None:
|
||||||
|
continue
|
||||||
|
platitor = 1 if data["scpTVA"] else 0
|
||||||
|
checked_at = data.get("checked_at")
|
||||||
|
denumire_anaf = data.get("denumire_anaf") or ""
|
||||||
|
for o in orders_for_cui:
|
||||||
|
mismatch = 0
|
||||||
|
den_store = None
|
||||||
|
if denumire_anaf:
|
||||||
|
norm_roa = anaf_service.normalize_company_name(o.get("denumire_roa") or o.get("customer_name") or "")
|
||||||
|
norm_anaf = anaf_service.normalize_company_name(denumire_anaf)
|
||||||
|
if norm_roa and norm_anaf and norm_roa != norm_anaf:
|
||||||
|
mismatch = 1
|
||||||
|
den_store = denumire_anaf
|
||||||
|
db_updates.append((platitor, checked_at, mismatch, den_store, o["order_number"]))
|
||||||
|
|
||||||
|
await sqlite_service.bulk_update_order_anaf_data(db_updates)
|
||||||
|
if db_updates:
|
||||||
|
_log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"ANAF backfill failed: {e}")
|
||||||
|
_log_line(run_id, f"ANAF backfill eroare: {e}")
|
||||||
|
|
||||||
# Step 5: Update sync run
|
# Step 5: Update sync run
|
||||||
total_imported = imported_count + already_imported_count # backward-compat
|
total_imported = imported_count + already_imported_count # backward-compat
|
||||||
status = "completed" if error_count <= 10 else "failed"
|
status = "completed" if error_count <= 10 else "failed"
|
||||||
|
|||||||
Reference in New Issue
Block a user