Files
gomag-vending/api/app/services/sync_service.py
Claude Agent 5eba87976b fix(address): use SOUNDEX city matching and strip SECTORUL from city
Fixes false negatives where city spellings differ slightly (e.g.
"Sfântu Ilie" vs "SFINTU ILIE") or ROA stores "BUCURESTI SECTORUL 1"
while GoMag sends "Municipiul București". Both backend (_addr_match)
and frontend (addrMatch) now use identical SOUNDEX logic mirroring
Oracle's implementation.

Also fixes field order: etaj before apart in r_street concatenation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 22:31:36 +00:00

1369 lines
71 KiB
Python

import asyncio
import json
import logging
import re
import uuid
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
_tz_bucharest = ZoneInfo("Europe/Bucharest")
def _now():
"""Return current time in Bucharest timezone (naive, for display/storage)."""
return datetime.now(_tz_bucharest).replace(tzinfo=None)
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client, anaf_service
from ..config import settings
from .. import database
logger = logging.getLogger(__name__)
def _addr_match(gomag_json, roa_json):
"""Server-side address comparison matching JS addrMatch()."""
if not gomag_json or not roa_json:
return True
try:
g = json.loads(gomag_json) if isinstance(gomag_json, str) else gomag_json
r = json.loads(roa_json) if isinstance(roa_json, str) else roa_json
except (json.JSONDecodeError, TypeError):
return True
_ADDR_WORDS = re.compile(
r'\bSECTORUL\s*\d*'
r'|\b(STR|STRADA|NR|NUMAR|NUMARUL|BL|BLOC|SC|SCARA|AP|APART|APARTAMENT|'
r'ET|ETAJ|COM|COMUNA|SAT|MUN|MUNICIPIUL|JUD|JUDETUL|CARTIER|PARTER|SECTOR|SECTORUL|ORAS)(?:\b|(?=\d))'
)
def norm(s):
s = (s or '').translate(import_service._DIACRITICS).upper()
s = _ADDR_WORDS.sub('', s)
return re.sub(r'[^A-Z0-9]', '', s)
def _soundex(s):
"""SOUNDEX matching Oracle's implementation — for city fuzzy compare."""
if not s:
return ''
_code = {'B':'1','F':'1','P':'1','V':'1',
'C':'2','G':'2','J':'2','K':'2','Q':'2','S':'2','X':'2','Z':'2',
'D':'3','T':'3','L':'4','M':'5','N':'5','R':'6'}
result = s[0]
prev = _code.get(s[0], '0')
for c in s[1:]:
if len(result) >= 4:
break
if c in 'AEIOU':
prev = '0'
elif c not in 'HW':
d = _code.get(c, '')
if d and d != prev:
result += d
if d:
prev = d
return result.ljust(4, '0')
g_street = norm(g.get('address') or g.get('strada') or '')
r_street = norm((r.get('strada') or '') + (r.get('numar') or '') + (r.get('bloc') or '') + (r.get('scara') or '') + (r.get('etaj') or '') + (r.get('apart') or ''))
g_city = norm(g.get('city') or g.get('localitate') or '')
r_city = norm(r.get('localitate') or '')
g_region = norm(g.get('region') or g.get('judet') or '')
r_region = norm(r.get('judet') or '')
return g_street == r_street and _soundex(g_city) == _soundex(r_city) and g_region == r_region
# Sync state
_sync_lock = asyncio.Lock()
_current_sync = None # dict with run_id, status, progress info
# In-memory text log buffer per run
_run_logs: dict[str, list[str]] = {}
def _log_line(run_id: str, message: str):
"""Append a timestamped line to the in-memory log buffer."""
if run_id not in _run_logs:
_run_logs[run_id] = []
ts = _now().strftime("%H:%M:%S")
_run_logs[run_id].append(f"[{ts}] {message}")
def get_run_text_log(run_id: str) -> str | None:
"""Return the accumulated text log for a run, or None if not found."""
lines = _run_logs.get(run_id)
if lines is None:
return None
return "\n".join(lines)
def _update_progress(phase: str, phase_text: str, current: int = 0, total: int = 0,
counts: dict = None):
"""Update _current_sync with progress details for polling."""
global _current_sync
if _current_sync is None:
return
_current_sync["phase"] = phase
_current_sync["phase_text"] = phase_text
_current_sync["progress_current"] = current
_current_sync["progress_total"] = total
_current_sync["counts"] = counts or {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0}
async def get_sync_status():
"""Get current sync status."""
if _current_sync:
return {**_current_sync}
return {"status": "idle"}
async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
"""Prepare a sync run - creates run_id and sets initial state.
Returns {"run_id": ..., "status": "starting"} or {"error": ...} if already running.
"""
global _current_sync
if _sync_lock.locked():
return {"error": "Sync already running", "run_id": _current_sync.get("run_id") if _current_sync else None}
run_id = _now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
_current_sync = {
"run_id": run_id,
"status": "running",
"started_at": _now().isoformat(),
"finished_at": None,
"phase": "starting",
"phase_text": "Starting...",
"progress_current": 0,
"progress_total": 0,
"counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0, "cancelled": 0},
}
return {"run_id": run_id, "status": "starting"}
def _derive_customer_info(order):
"""Extract shipping/billing names and customer from an order.
customer = who appears on the invoice (partner in ROA):
- company name if billing is on a company
- shipping person name otherwise (consistent with import_service partner logic)
"""
shipping_name = ""
if order.shipping:
shipping_name = f"{getattr(order.shipping, 'lastname', '') or ''} {getattr(order.shipping, 'firstname', '') or ''}".strip()
billing_name = f"{getattr(order.billing, 'lastname', '') or ''} {getattr(order.billing, 'firstname', '') or ''}".strip()
if not shipping_name:
shipping_name = billing_name
if order.billing.is_company and order.billing.company_name:
customer = order.billing.company_name
else:
customer = shipping_name or billing_name
payment_method = getattr(order, 'payment_name', None) or None
delivery_method = getattr(order, 'delivery_name', None) or None
return shipping_name.upper(), billing_name.upper(), customer.upper(), payment_method, delivery_method
async def _fix_stale_error_orders(existing_map: dict, run_id: str):
"""Fix orders stuck in ERROR status that are actually in Oracle.
This can happen when a previous import committed partially (no rollback on error).
If the order exists in Oracle COMENZI, update SQLite status to ALREADY_IMPORTED.
"""
from ..database import get_sqlite
db = await get_sqlite()
try:
cursor = await db.execute(
"SELECT order_number FROM orders WHERE status = 'ERROR'"
)
error_orders = [row["order_number"] for row in await cursor.fetchall()]
fixed = 0
for order_number in error_orders:
if order_number in existing_map:
id_comanda = existing_map[order_number]
await db.execute("""
UPDATE orders SET
status = 'ALREADY_IMPORTED',
id_comanda = ?,
error_message = NULL,
updated_at = datetime('now')
WHERE order_number = ? AND status = 'ERROR'
""", (id_comanda, order_number))
fixed += 1
_log_line(run_id, f"#{order_number} → status corectat ERROR → ALREADY_IMPORTED (ID: {id_comanda})")
if fixed:
await db.commit()
logger.info(f"Fixed {fixed} stale ERROR orders that exist in Oracle")
finally:
await db.close()
async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict:
"""Run a full sync cycle. Returns summary dict."""
global _current_sync
if _sync_lock.locked():
return {"error": "Sync already running"}
async with _sync_lock:
# Use provided run_id or generate one
if not run_id:
run_id = _now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
_current_sync = {
"run_id": run_id,
"status": "running",
"started_at": _now().isoformat(),
"finished_at": None,
"phase": "reading",
"phase_text": "Reading JSON files...",
"progress_current": 0,
"progress_total": 0,
"counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0, "cancelled": 0},
}
_update_progress("reading", "Reading JSON files...")
started_dt = _now()
_run_logs[run_id] = [
f"=== Sync Run {run_id} ===",
f"Inceput: {started_dt.strftime('%d.%m.%Y %H:%M:%S')}",
""
]
json_dir = settings.JSON_OUTPUT_DIR
try:
# Phase 0: Download orders from GoMag API
_update_progress("downloading", "Descărcare comenzi din GoMag API...")
_log_line(run_id, "Descărcare comenzi din GoMag API...")
# Read GoMag settings from SQLite (override config defaults)
dl_settings = await sqlite_service.get_app_settings()
gomag_key = dl_settings.get("gomag_api_key") or None
gomag_shop = dl_settings.get("gomag_api_shop") or None
gomag_days_str = dl_settings.get("gomag_order_days_back")
gomag_days = int(gomag_days_str) if gomag_days_str else None
gomag_limit_str = dl_settings.get("gomag_limit")
gomag_limit = int(gomag_limit_str) if gomag_limit_str else None
dl_result = await gomag_client.download_orders(
json_dir, log_fn=lambda msg: _log_line(run_id, msg),
api_key=gomag_key, api_shop=gomag_shop,
days_back=gomag_days, limit=gomag_limit,
)
if dl_result["files"]:
_log_line(run_id, f"GoMag: {dl_result['total']} comenzi în {dl_result['pages']} pagini → {len(dl_result['files'])} fișiere")
_update_progress("reading", "Citire fisiere JSON...")
_log_line(run_id, "Citire fisiere JSON...")
# Step 1: Read orders and sort chronologically (oldest first - R3)
orders, json_count = order_reader.read_json_orders()
orders.sort(key=lambda o: o.date or '')
await sqlite_service.create_sync_run(run_id, json_count)
_update_progress("reading", f"Found {len(orders)} orders in {json_count} files", 0, len(orders))
_log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")
# Populate web_products catalog from all orders (R4)
web_product_items = [
(item.sku, item.name)
for order in orders
for item in order.items
if item.sku and item.name
]
await sqlite_service.upsert_web_products_batch(web_product_items)
if not orders:
_log_line(run_id, "Nicio comanda gasita.")
await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
_update_progress("completed", "No orders found")
summary = {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count}
return summary
# ── Separate cancelled orders (GoMag status "Anulata" / statusId "7") ──
cancelled_orders = [o for o in orders if o.status_id == "7" or (o.status and o.status.lower() == "anulata")]
active_orders = [o for o in orders if o not in cancelled_orders]
cancelled_count = len(cancelled_orders)
if cancelled_orders:
_log_line(run_id, f"Comenzi anulate in GoMag: {cancelled_count}")
# Record cancelled orders in SQLite
cancelled_batch = []
for order in cancelled_orders:
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
order_items_data = [
{"sku": item.sku, "product_name": item.name,
"quantity": item.quantity, "price": item.price,
"baseprice": item.baseprice, "vat": item.vat,
"mapping_status": "unknown", "codmat": None,
"id_articol": None, "cantitate_roa": None}
for item in order.items
]
cancelled_batch.append({
"sync_run_id": run_id, "order_number": order.number,
"order_date": order.date, "customer_name": customer,
"status": "CANCELLED", "status_at_run": "CANCELLED",
"id_comanda": None, "id_partener": None,
"error_message": "Comanda anulata in GoMag",
"missing_skus": None,
"items_count": len(order.items),
"shipping_name": shipping_name, "billing_name": billing_name,
"payment_method": payment_method, "delivery_method": delivery_method,
"order_total": order.total or None,
"delivery_cost": order.delivery_cost or None,
"discount_total": order.discount_total or None,
"web_status": order.status or "Anulata",
"items": order_items_data,
})
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → ANULAT in GoMag")
await sqlite_service.save_orders_batch(cancelled_batch)
# Check if any cancelled orders were previously imported
from ..database import get_sqlite as _get_sqlite
db_check = await _get_sqlite()
try:
cancelled_numbers = [o.number for o in cancelled_orders]
placeholders = ",".join("?" for _ in cancelled_numbers)
cursor = await db_check.execute(f"""
SELECT order_number, id_comanda FROM orders
WHERE order_number IN ({placeholders})
AND id_comanda IS NOT NULL
AND status = 'CANCELLED'
""", cancelled_numbers)
previously_imported = [dict(r) for r in await cursor.fetchall()]
finally:
await db_check.close()
if previously_imported:
_log_line(run_id, f"Verificare {len(previously_imported)} comenzi anulate care erau importate in Oracle...")
# Check which have invoices
id_comanda_list = [o["id_comanda"] for o in previously_imported]
invoice_data = await asyncio.to_thread(
invoice_service.check_invoices_for_orders, id_comanda_list
)
for o in previously_imported:
idc = o["id_comanda"]
order_num = o["order_number"]
if idc in invoice_data:
# Invoiced — keep in Oracle, just log warning
_log_line(run_id,
f"#{order_num} → ANULAT dar FACTURAT (factura {invoice_data[idc].get('serie_act', '')}"
f"{invoice_data[idc].get('numar_act', '')}) — NU se sterge din Oracle")
# Update web_status but keep CANCELLED status (already set by batch above)
else:
# Not invoiced — soft-delete in Oracle
del_result = await asyncio.to_thread(
import_service.soft_delete_order_in_roa, idc
)
if del_result["success"]:
# Clear id_comanda via mark_order_cancelled
await sqlite_service.mark_order_cancelled(order_num, "Anulata")
_log_line(run_id,
f"#{order_num} → ANULAT + STERS din Oracle (ID: {idc}, "
f"{del_result['details_deleted']} detalii)")
else:
_log_line(run_id,
f"#{order_num} → ANULAT dar EROARE la stergere Oracle: {del_result['error']}")
orders = active_orders
if not orders:
_log_line(run_id, "Nicio comanda activa dupa filtrare anulate.")
await sqlite_service.update_sync_run(run_id, "completed", cancelled_count, 0, 0, 0)
_update_progress("completed", f"No active orders ({cancelled_count} cancelled)")
summary = {"run_id": run_id, "status": "completed",
"message": f"No active orders ({cancelled_count} cancelled)",
"json_files": json_count, "cancelled": cancelled_count}
return summary
_update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))
# ── Single Oracle connection for entire validation phase ──
conn = await asyncio.to_thread(database.get_oracle_connection)
try:
# Step 2a: Find orders already in Oracle (date-range query)
order_dates = [o.date for o in orders if o.date]
if order_dates:
min_date_str = min(order_dates)
try:
min_date = datetime.strptime(min_date_str[:10], "%Y-%m-%d") - timedelta(days=1)
except (ValueError, TypeError):
min_date = _now() - timedelta(days=90)
else:
min_date = _now() - timedelta(days=90)
existing_map = await asyncio.to_thread(
validation_service.check_orders_in_roa, min_date, conn
)
# Step 2a-fix: Fix ERROR orders that are actually in Oracle
# (can happen if previous import committed partially without rollback)
await _fix_stale_error_orders(existing_map, run_id)
# Load app settings early (needed for id_gestiune in SKU validation)
app_settings = await sqlite_service.get_app_settings()
id_pol = id_pol or int(app_settings.get("id_pol") or 0) or settings.ID_POL
id_sectie = id_sectie or int(app_settings.get("id_sectie") or 0) or settings.ID_SECTIE
# Parse multi-gestiune CSV: "1,3" → [1, 3], "" → None
id_gestiune_raw = (app_settings.get("id_gestiune") or "").strip()
if id_gestiune_raw and id_gestiune_raw != "0":
id_gestiuni = [int(g) for g in id_gestiune_raw.split(",") if g.strip()]
else:
id_gestiuni = None # None = orice gestiune
logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")
_log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")
# Step 2b: Validate SKUs (reuse same connection)
all_skus = order_reader.get_all_skus(orders)
validation = await asyncio.to_thread(validation_service.validate_skus, all_skus, conn, id_gestiuni)
importable, skipped = validation_service.classify_orders(orders, validation)
# ── Split importable into truly_importable vs already_in_roa ──
truly_importable = []
already_in_roa = []
for order in importable:
if order.number in existing_map:
already_in_roa.append(order)
else:
truly_importable.append(order)
_update_progress("validation",
f"{len(truly_importable)} new, {len(already_in_roa)} already imported, {len(skipped)} skipped",
0, len(truly_importable))
_log_line(run_id, f"Validare: {len(truly_importable)} noi, {len(already_in_roa)} deja importate, {len(skipped)} nemapate")
# Step 2c: Build SKU context from skipped orders
sku_context = {}
for order, missing_skus_list in skipped:
if order.billing.is_company and order.billing.company_name:
customer = order.billing.company_name
else:
ship_name = ""
if order.shipping:
ship_name = f"{order.shipping.lastname} {order.shipping.firstname}".strip()
customer = ship_name or f"{order.billing.lastname} {order.billing.firstname}"
for sku in missing_skus_list:
if sku not in sku_context:
sku_context[sku] = {"orders": [], "customers": []}
if order.number not in sku_context[sku]["orders"]:
sku_context[sku]["orders"].append(order.number)
if customer not in sku_context[sku]["customers"]:
sku_context[sku]["customers"].append(customer)
# Track missing SKUs with context
for sku in validation["missing"]:
product_name = ""
for order in orders:
for item in order.items:
if item.sku == sku:
product_name = item.name
break
if product_name:
break
ctx = sku_context.get(sku, {})
await sqlite_service.track_missing_sku(
sku, product_name,
order_count=len(ctx.get("orders", [])),
order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
)
# Auto-resolve missing SKUs that now have mappings
resolved_skus = validation["mapped"] | validation["direct"]
if resolved_skus:
resolved_count = await sqlite_service.resolve_missing_skus_batch(resolved_skus)
if resolved_count:
_log_line(run_id, f"Auto-resolved {resolved_count} previously missing SKUs")
# Step 2d: Pre-validate prices for importable articles
if id_pol and (truly_importable or already_in_roa):
_update_progress("validation", "Validating prices...", 0, len(truly_importable))
_log_line(run_id, "Validare preturi...")
all_codmats = set()
for order in (truly_importable + already_in_roa):
for item in order.items:
if item.sku in validation["mapped"]:
pass
elif item.sku in validation["direct"]:
all_codmats.add(item.sku)
# Get standard VAT rate from settings for PROC_TVAV metadata
cota_tva = float(app_settings.get("discount_vat") or 21)
# Dual pricing policy support
id_pol_productie = int(app_settings.get("id_pol_productie") or 0) or None
codmat_policy_map = {}
if all_codmats:
if id_pol_productie:
# Dual-policy: classify articles by cont (sales vs production)
codmat_policy_map = await asyncio.to_thread(
validation_service.validate_and_ensure_prices_dual,
all_codmats, id_pol, id_pol_productie,
conn, validation.get("direct_id_map"),
cota_tva=cota_tva
)
_log_line(run_id,
f"Politici duale: {sum(1 for v in codmat_policy_map.values() if v == id_pol)} vanzare, "
f"{sum(1 for v in codmat_policy_map.values() if v == id_pol_productie)} productie")
else:
# Single-policy (backward compatible)
price_result = await asyncio.to_thread(
validation_service.validate_prices, all_codmats, id_pol,
conn, validation.get("direct_id_map")
)
if price_result["missing_price"]:
logger.info(
f"Auto-adding price 0 for {len(price_result['missing_price'])} "
f"direct articles in policy {id_pol}"
)
await asyncio.to_thread(
validation_service.ensure_prices,
price_result["missing_price"], id_pol,
conn, validation.get("direct_id_map"),
cota_tva=cota_tva
)
# Also validate mapped SKU prices (cherry-pick 1)
mapped_skus_in_orders = set()
for order in (truly_importable + already_in_roa):
for item in order.items:
if item.sku in validation["mapped"]:
mapped_skus_in_orders.add(item.sku)
mapped_codmat_data = {}
if mapped_skus_in_orders:
mapped_codmat_data = await asyncio.to_thread(
validation_service.resolve_mapped_codmats, mapped_skus_in_orders, conn,
id_gestiuni=id_gestiuni
)
# Build id_map for mapped codmats and validate/ensure their prices
mapped_id_map = {}
for sku, entries in mapped_codmat_data.items():
for entry in entries:
mapped_id_map[entry["codmat"]] = {
"id_articol": entry["id_articol"],
"cont": entry.get("cont")
}
mapped_codmats = set(mapped_id_map.keys())
if mapped_codmats:
if id_pol_productie:
mapped_policy_map = await asyncio.to_thread(
validation_service.validate_and_ensure_prices_dual,
mapped_codmats, id_pol, id_pol_productie,
conn, mapped_id_map, cota_tva=cota_tva
)
codmat_policy_map.update(mapped_policy_map)
else:
mp_result = await asyncio.to_thread(
validation_service.validate_prices,
mapped_codmats, id_pol, conn, mapped_id_map
)
if mp_result["missing_price"]:
await asyncio.to_thread(
validation_service.ensure_prices,
mp_result["missing_price"], id_pol,
conn, mapped_id_map, cota_tva=cota_tva
)
# Add SKU → policy entries for mapped articles (1:1 and kits)
# codmat_policy_map has CODMAT keys, but build_articles_json
# looks up by GoMag SKU — bridge the gap here
if codmat_policy_map and mapped_codmat_data:
for sku, entries in mapped_codmat_data.items():
if len(entries) == 1:
# 1:1 mapping: SKU inherits the CODMAT's policy
codmat = entries[0]["codmat"]
if codmat in codmat_policy_map:
codmat_policy_map[sku] = codmat_policy_map[codmat]
# Pass codmat_policy_map to import via app_settings
if codmat_policy_map:
app_settings["_codmat_policy_map"] = codmat_policy_map
# ── Kit component price validation ──
kit_pricing_mode = app_settings.get("kit_pricing_mode")
if kit_pricing_mode and mapped_codmat_data:
id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
kit_missing = await asyncio.to_thread(
validation_service.validate_kit_component_prices,
mapped_codmat_data, id_pol, id_pol_prod, conn
)
if kit_missing:
kit_skus_missing = set(kit_missing.keys())
for sku, missing_codmats in kit_missing.items():
_log_line(run_id, f"Kit {sku}: prețuri lipsă pentru {', '.join(missing_codmats)}")
new_truly = []
for order in truly_importable:
order_skus = {item.sku for item in order.items}
if order_skus & kit_skus_missing:
missing_list = list(order_skus & kit_skus_missing)
skipped.append((order, missing_list))
else:
new_truly.append(order)
truly_importable = new_truly
# Mode B config validation
if kit_pricing_mode == "separate_line":
if not app_settings.get("kit_discount_codmat"):
_log_line(run_id, "EROARE: Kit mode 'separate_line' dar kit_discount_codmat nu e configurat!")
finally:
await asyncio.to_thread(database.pool.release, conn)
# Step 3a: Record already-imported orders (batch)
already_imported_count = len(already_in_roa)
already_batch = []
for order in already_in_roa:
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
id_comanda_roa = existing_map.get(order.number)
order_items_data = [
{"sku": item.sku, "product_name": item.name,
"quantity": item.quantity, "price": item.price,
"baseprice": item.baseprice, "vat": item.vat,
"mapping_status": "mapped" if item.sku in validation["mapped"] else "direct",
"codmat": None, "id_articol": None, "cantitate_roa": None}
for item in order.items
]
already_batch.append({
"sync_run_id": run_id, "order_number": order.number,
"order_date": order.date, "customer_name": customer,
"status": "ALREADY_IMPORTED", "status_at_run": "ALREADY_IMPORTED",
"id_comanda": id_comanda_roa, "id_partener": None,
"error_message": None, "missing_skus": None,
"items_count": len(order.items),
"shipping_name": shipping_name, "billing_name": billing_name,
"payment_method": payment_method, "delivery_method": delivery_method,
"order_total": order.total or None,
"delivery_cost": order.delivery_cost or None,
"discount_total": order.discount_total or None,
"web_status": order.status or None,
"items": order_items_data,
})
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})")
await sqlite_service.save_orders_batch(already_batch)
# Update GoMag addresses + recompute address_mismatch for already-imported orders
addr_updates = []
for order in already_in_roa:
addr_updates.append({
"order_number": order.number,
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
})
await sqlite_service.update_gomag_addresses_batch(addr_updates)
# Detect partner mismatches for already-imported orders
if already_in_roa:
stored_partner_data = await sqlite_service.get_orders_partner_data_batch(
[o.number for o in already_in_roa]
)
mismatch_map = {}
mismatch_updates = []
for order in already_in_roa:
stored = stored_partner_data.get(order.number, {})
stored_cf = stored.get("cod_fiscal_gomag")
new_data = import_service.determine_partner_data(order)
new_cf = new_data["cod_fiscal"]
def _strip_ro(cf):
if not cf:
return ""
# Strip optional "RO" prefix + any surrounding whitespace
return re.sub(r'^RO\s*', '', cf.strip().upper()).strip()
is_mismatch = False
if new_data["is_pj"] and new_cf and not stored_cf:
is_mismatch = True # PF→PJ (doar dacă are CUI — fără CUI nu putem confirma)
elif not new_data["is_pj"] and stored_cf:
is_mismatch = True # PJ→PF
elif new_data["is_pj"] and stored_cf and _strip_ro(new_cf) != _strip_ro(stored_cf):
is_mismatch = True # CUI schimbat
val = 1 if is_mismatch else 0
mismatch_map[order.number] = val
mismatch_updates.append({"order_number": order.number, "partner_mismatch": val})
await sqlite_service.update_partner_mismatch_batch(mismatch_updates)
# Clear stale mismatches for orders outside the current sync window
# that have no CUI stored (flagged by old code before the no-CUI fix)
current_batch_numbers = {o.number for o in already_in_roa}
cleared = await sqlite_service.clear_stale_partner_mismatches_no_cui(current_batch_numbers)
if cleared:
logger.info(f"Partner mismatch: cleared {cleared} stale no-CUI flags from previous sync window")
# Auto-resync uninvoiced orders with partner mismatch (max 5/cycle)
MAX_PARTNER_RESYNC_PER_CYCLE = 5
total_mismatched = sum(1 for v in mismatch_map.values() if v == 1)
logger.info(f"Partner mismatch detection: {len(already_in_roa)} orders checked, {total_mismatched} mismatches found")
mismatched_uninvoiced = [
o for o in already_in_roa
if mismatch_map.get(o.number) == 1
and not stored_partner_data.get(o.number, {}).get("factura_numar")
][:MAX_PARTNER_RESYNC_PER_CYCLE]
logger.info(f"Partner auto-resync: {len(mismatched_uninvoiced)} uninvoiced orders queued")
if mismatched_uninvoiced:
resync_ok = 0
for _order in mismatched_uninvoiced:
logger.info(f"Partner resync attempt: #{_order.number}")
try:
await _resync_partner_for_order(
order=_order,
stored=stored_partner_data.get(_order.number, {}),
app_settings=app_settings,
run_id=run_id,
)
resync_ok += 1
logger.info(f"Partner resync success: #{_order.number}")
except Exception as _e:
_log_line(run_id, f"#{_order.number} EROARE resync partener: {_e}")
logger.error(f"Partner resync error for {_order.number}: {_e}")
if resync_ok:
_log_line(run_id, f"Resync parteneri: {resync_ok} comenzi actualizate")
# Step 3b: Record skipped orders + store items (batch)
skipped_count = len(skipped)
skipped_batch = []
for order, missing_skus in skipped:
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
order_items_data = [
{"sku": item.sku, "product_name": item.name,
"quantity": item.quantity, "price": item.price,
"baseprice": item.baseprice, "vat": item.vat,
"mapping_status": "missing" if item.sku in validation["missing"] else
"mapped" if item.sku in validation["mapped"] else "direct",
"codmat": None, "id_articol": None, "cantitate_roa": None}
for item in order.items
]
skipped_batch.append({
"sync_run_id": run_id, "order_number": order.number,
"order_date": order.date, "customer_name": customer,
"status": "SKIPPED", "status_at_run": "SKIPPED",
"id_comanda": None, "id_partener": None,
"error_message": None, "missing_skus": missing_skus,
"items_count": len(order.items),
"shipping_name": shipping_name, "billing_name": billing_name,
"payment_method": payment_method, "delivery_method": delivery_method,
"order_total": order.total or None,
"delivery_cost": order.delivery_cost or None,
"discount_total": order.discount_total or None,
"web_status": order.status or None,
"items": order_items_data,
})
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
await sqlite_service.save_orders_batch(skipped_batch)
# ── Price sync from orders ──
if app_settings.get("price_sync_enabled") == "1":
try:
all_sync_orders = truly_importable + already_in_roa
direct_id_map = validation.get("direct_id_map", {})
id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
price_updates = await asyncio.to_thread(
validation_service.sync_prices_from_order,
all_sync_orders, mapped_codmat_data,
direct_id_map, codmat_policy_map, id_pol,
id_pol_productie=id_pol_prod,
settings=app_settings
)
if price_updates:
_log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate")
for pu in price_updates:
_log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f}{pu['new_price']:.2f}")
except Exception as e:
_log_line(run_id, f"Eroare sync prețuri din comenzi: {e}")
logger.error(f"Price sync error: {e}")
_update_progress("skipped", f"Skipped {skipped_count}",
0, len(truly_importable),
{"imported": 0, "skipped": skipped_count, "errors": 0, "already_imported": already_imported_count})
# ANAF cache pre-population: CUIs from last 3 months with expired/missing cache
try:
prepop_cuis = await sqlite_service.get_expired_cuis_for_prepopulate()
if prepop_cuis:
_log_line(run_id, f"ANAF pre-populare: {len(prepop_cuis)} CUI-uri cu cache expirat")
prepop_results = await anaf_service.check_vat_status_batch(
prepop_cuis, log_fn=lambda msg: _log_line(run_id, msg)
)
if prepop_results:
await sqlite_service.bulk_populate_anaf_cache(prepop_results)
_log_line(run_id, f"ANAF pre-populare: {len(prepop_results)} rezultate stocate")
else:
_log_line(run_id, "ANAF pre-populare: cache complet")
except Exception as e:
_log_line(run_id, f"ANAF pre-populare eroare: {e}")
logger.warning(f"ANAF cache pre-population failed: {e}")
# Step 4: ANAF batch verification for company CUIs (RO companies only)
company_cuis = set()
for order in truly_importable:
is_ro = (order.billing.country or "").strip().lower() == "romania"
if order.billing.is_company and order.billing.company_code and is_ro:
raw_cf = import_service.clean_web_text(order.billing.company_code) or ""
bare, _ = anaf_service.sanitize_cui(raw_cf)
if anaf_service.validate_cui(bare):
company_cuis.add(bare)
# Check anaf_cache for already-known CUIs (7-day validity)
uncached_cuis = []
cached_results = {}
for cui in company_cuis:
cached = await sqlite_service.get_anaf_cache(cui)
if cached:
cached_results[cui] = cached
else:
uncached_cuis.append(cui)
# Batch ANAF call for uncached CUIs only
if uncached_cuis:
_log_line(run_id, f"ANAF: verificare {len(uncached_cuis)} CUI-uri noi...")
anaf_results = await anaf_service.check_vat_status_batch(
uncached_cuis, log_fn=lambda msg: _log_line(run_id, msg)
)
if anaf_results:
await sqlite_service.bulk_populate_anaf_cache(anaf_results)
cached_results.update(anaf_results)
else:
_log_line(run_id, "ANAF: batch call esuat, continua fara corectie CUI")
# Step 5: Import only truly new orders
imported_count = 0
error_count = 0
for i, order in enumerate(truly_importable):
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
_update_progress("import",
f"Import {i+1}/{len(truly_importable)}: #{order.number} {customer}",
i + 1, len(truly_importable),
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
"already_imported": already_imported_count})
# Determine cod_fiscal override from ANAF data
cod_fiscal_override = None
anaf_data_for_order = None
raw_cf = ""
if order.billing.is_company and order.billing.company_code:
raw_cf = import_service.clean_web_text(order.billing.company_code) or ""
bare_cui, cui_warning = anaf_service.sanitize_cui(raw_cf)
if cui_warning:
_log_line(run_id, f"#{order.number} WARN: {cui_warning}")
anaf_data_for_order = cached_results.get(bare_cui)
if anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None:
correct_cf = anaf_service.determine_correct_cod_fiscal(bare_cui, anaf_data_for_order["scpTVA"])
if correct_cf != raw_cf:
_log_line(run_id, f"#{order.number} CUI corectat: {raw_cf}{correct_cf}")
cod_fiscal_override = correct_cf
# Determine strict search mode: only when RO company + ANAF data available
is_ro_company = (order.billing.is_company
and (order.billing.country or "").strip().lower() == "romania")
anaf_strict = None
if is_ro_company and anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None:
anaf_strict = 1 # ANAF data available → strict search
result = await asyncio.to_thread(
import_service.import_single_order,
order, id_pol=id_pol, id_sectie=id_sectie,
app_settings=app_settings, id_gestiuni=id_gestiuni,
cod_fiscal_override=cod_fiscal_override,
anaf_strict=anaf_strict
)
# Build order items data for storage (R9)
order_items_data = []
for item in order.items:
ms = "mapped" if item.sku in validation["mapped"] else "direct"
order_items_data.append({
"sku": item.sku, "product_name": item.name,
"quantity": item.quantity, "price": item.price,
"baseprice": item.baseprice, "vat": item.vat,
"mapping_status": ms, "codmat": None, "id_articol": None,
"cantitate_roa": None
})
# Compute discount split for SQLite storage
ds = import_service.compute_discount_split(order, app_settings)
discount_split_json = json.dumps(ds) if ds else None
if result["success"]:
imported_count += 1
await sqlite_service.upsert_order(
sync_run_id=run_id,
order_number=order.number,
order_date=order.date,
customer_name=customer,
status="IMPORTED",
id_comanda=result["id_comanda"],
id_partener=result["id_partener"],
items_count=len(order.items),
shipping_name=shipping_name,
billing_name=billing_name,
payment_method=payment_method,
delivery_method=delivery_method,
order_total=order.total or None,
delivery_cost=order.delivery_cost or None,
discount_total=order.discount_total or None,
web_status=order.status or None,
discount_split=discount_split_json,
)
await sqlite_service.add_sync_run_order(run_id, order.number, "IMPORTED")
# Store ROA address IDs (R9)
await sqlite_service.update_import_order_addresses(
order.number,
id_adresa_facturare=result.get("id_adresa_facturare"),
id_adresa_livrare=result.get("id_adresa_livrare")
)
await sqlite_service.add_order_items(order.number, order_items_data)
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
# Save partner + ANAF + address data to SQLite
if result["success"] or result.get("id_partener"):
partner_data = {
"cod_fiscal_gomag": raw_cf if order.billing.is_company else None,
"cod_fiscal_roa": result.get("cod_fiscal_roa"),
"denumire_roa": result.get("denumire_roa"),
"anaf_platitor_tva": (1 if anaf_data_for_order.get("scpTVA") else 0) if anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None else None,
"anaf_checked_at": anaf_data_for_order.get("checked_at") if anaf_data_for_order else None,
"anaf_cod_fiscal_adjusted": 1 if (
cod_fiscal_override
and result.get("cod_fiscal_roa")
and anaf_service.strip_ro_prefix(result["cod_fiscal_roa"]) == anaf_service.strip_ro_prefix(raw_cf)
and result["cod_fiscal_roa"].strip().upper().replace("RO ", "RO") != raw_cf.strip().upper().replace("RO ", "RO")
) else 0,
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
"adresa_livrare_roa": json.dumps(result.get("adresa_livrare_roa")) if result.get("adresa_livrare_roa") else None,
"adresa_facturare_roa": json.dumps(result.get("adresa_facturare_roa")) if result.get("adresa_facturare_roa") else None,
"anaf_denumire_mismatch": 0,
"denumire_anaf": None,
}
# Denomination mismatch check
if anaf_data_for_order and anaf_data_for_order.get("denumire_anaf") and order.billing.is_company:
norm_gomag = anaf_service.normalize_company_name(order.billing.company_name or "")
norm_anaf = anaf_service.normalize_company_name(anaf_data_for_order["denumire_anaf"])
if norm_gomag and norm_anaf and norm_gomag != norm_anaf:
partner_data["anaf_denumire_mismatch"] = 1
partner_data["denumire_anaf"] = anaf_data_for_order["denumire_anaf"]
# Address mismatch check (server-side, mirrors JS addrMatch)
livr_match = _addr_match(partner_data.get("adresa_livrare_gomag"), partner_data.get("adresa_livrare_roa"))
fact_match = _addr_match(partner_data.get("adresa_facturare_gomag"), partner_data.get("adresa_facturare_roa"))
partner_data["address_mismatch"] = 1 if (not livr_match or not fact_match) else 0
await sqlite_service.update_order_partner_data(order.number, partner_data)
if not result["success"]:
error_count += 1
await sqlite_service.upsert_order(
sync_run_id=run_id,
order_number=order.number,
order_date=order.date,
customer_name=customer,
status="ERROR",
id_partener=result.get("id_partener"),
error_message=result["error"],
items_count=len(order.items),
shipping_name=shipping_name,
billing_name=billing_name,
payment_method=payment_method,
delivery_method=delivery_method,
order_total=order.total or None,
delivery_cost=order.delivery_cost or None,
discount_total=order.discount_total or None,
web_status=order.status or None,
discount_split=discount_split_json,
)
await sqlite_service.add_sync_run_order(run_id, order.number, "ERROR")
await sqlite_service.add_order_items(order.number, order_items_data)
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")
# Safety: stop if too many errors
if error_count > 10:
logger.warning("Too many errors, stopping sync")
break
# Step 4b: Invoice & order status check — sync with Oracle
_update_progress("invoices", "Checking invoices & order status...", 0, 0)
invoices_updated = 0
invoices_cleared = 0
orders_deleted = 0
try:
# 4b-1: Uninvoiced → check for new invoices
uninvoiced = await sqlite_service.get_uninvoiced_imported_orders()
if uninvoiced:
id_comanda_list = [o["id_comanda"] for o in uninvoiced]
invoice_data = await asyncio.to_thread(
invoice_service.check_invoices_for_orders, id_comanda_list
)
id_to_order = {o["id_comanda"]: o["order_number"] for o in uninvoiced}
for idc, inv in invoice_data.items():
order_num = id_to_order.get(idc)
if order_num and inv.get("facturat"):
await sqlite_service.update_order_invoice(
order_num,
serie=inv.get("serie_act"),
numar=str(inv.get("numar_act", "")),
total_fara_tva=inv.get("total_fara_tva"),
total_tva=inv.get("total_tva"),
total_cu_tva=inv.get("total_cu_tva"),
data_act=inv.get("data_act"),
)
invoices_updated += 1
# 4b-2: Invoiced → check for deleted invoices
invoiced = await sqlite_service.get_invoiced_imported_orders()
if invoiced:
id_comanda_list = [o["id_comanda"] for o in invoiced]
invoice_data = await asyncio.to_thread(
invoice_service.check_invoices_for_orders, id_comanda_list
)
for o in invoiced:
if o["id_comanda"] not in invoice_data:
await sqlite_service.clear_order_invoice(o["order_number"])
invoices_cleared += 1
# 4b-3: All imported → check for deleted orders in ROA
all_imported = await sqlite_service.get_all_imported_orders()
if all_imported:
id_comanda_list = [o["id_comanda"] for o in all_imported]
existing_ids = await asyncio.to_thread(
invoice_service.check_orders_exist, id_comanda_list
)
for o in all_imported:
if o["id_comanda"] not in existing_ids:
await sqlite_service.mark_order_deleted_in_roa(o["order_number"])
orders_deleted += 1
if invoices_updated:
_log_line(run_id, f"Facturi noi: {invoices_updated} comenzi facturate")
if invoices_cleared:
_log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache")
if orders_deleted:
_log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}")
except Exception as e:
logger.warning(f"Invoice/order status check failed: {e}")
# Step 4c: ANAF backfill — populate anaf_platitor_tva for orders with CUI but no ANAF data
try:
orders_needing_anaf = await sqlite_service.get_orders_missing_anaf()
if orders_needing_anaf:
# Group orders by unique CUI
from collections import defaultdict
cui_to_orders = defaultdict(list)
for o in orders_needing_anaf:
bare = anaf_service.strip_ro_prefix(o["cod_fiscal_roa"])
if anaf_service.validate_cui(bare):
cui_to_orders[bare].append(o)
# Batch cache lookup
unique_cuis = list(cui_to_orders.keys())
anaf_cache = await sqlite_service.get_anaf_cache_batch(unique_cuis)
# Single ANAF API call for uncached CUIs
uncached = [c for c in unique_cuis if c not in anaf_cache]
if uncached:
fresh = await anaf_service.check_vat_status_batch(uncached)
if fresh:
await sqlite_service.bulk_populate_anaf_cache(fresh)
anaf_cache.update(fresh)
# Build batch updates
db_updates = []
for cui, orders_for_cui in cui_to_orders.items():
data = anaf_cache.get(cui)
if not data or data.get("scpTVA") is None:
continue
platitor = 1 if data["scpTVA"] else 0
checked_at = data.get("checked_at")
denumire_anaf = data.get("denumire_anaf") or ""
for o in orders_for_cui:
mismatch = 0
den_store = None
if denumire_anaf:
norm_roa = anaf_service.normalize_company_name(o.get("denumire_roa") or o.get("customer_name") or "")
norm_anaf = anaf_service.normalize_company_name(denumire_anaf)
if norm_roa and norm_anaf and norm_roa != norm_anaf:
mismatch = 1
den_store = denumire_anaf
db_updates.append((platitor, checked_at, mismatch, den_store, o["order_number"]))
await sqlite_service.bulk_update_order_anaf_data(db_updates)
if db_updates:
_log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate")
except Exception as e:
logger.warning(f"ANAF backfill failed: {e}")
_log_line(run_id, f"ANAF backfill eroare: {e}")
# Step 5: Update sync run
total_imported = imported_count + already_imported_count # backward-compat
status = "completed" if error_count <= 10 else "failed"
await sqlite_service.update_sync_run(
run_id, status, len(orders), total_imported, len(skipped), error_count,
already_imported=already_imported_count, new_imported=imported_count
)
summary = {
"run_id": run_id,
"status": status,
"json_files": json_count,
"total_orders": len(orders) + cancelled_count,
"new_orders": len(truly_importable),
"imported": total_imported,
"new_imported": imported_count,
"already_imported": already_imported_count,
"skipped": len(skipped),
"errors": error_count,
"cancelled": cancelled_count,
"missing_skus": len(validation["missing"]),
"invoices_updated": invoices_updated,
"invoices_cleared": invoices_cleared,
"orders_deleted_in_roa": orders_deleted,
}
_update_progress("completed",
f"Completed: {imported_count} new, {already_imported_count} already, {len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled",
len(truly_importable), len(truly_importable),
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
"already_imported": already_imported_count, "cancelled": cancelled_count})
if _current_sync:
_current_sync["status"] = status
_current_sync["finished_at"] = _now().isoformat()
logger.info(
f"Sync {run_id} completed: {imported_count} new, {already_imported_count} already imported, "
f"{len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled"
)
duration = (_now() - started_dt).total_seconds()
_log_line(run_id, "")
cancelled_text = f", {cancelled_count} anulate" if cancelled_count else ""
_run_logs[run_id].append(
f"Finalizat: {imported_count} importate, {already_imported_count} deja importate, "
f"{len(skipped)} nemapate, {error_count} erori{cancelled_text} din {len(orders) + cancelled_count} comenzi | Durata: {int(duration)}s"
)
return summary
except Exception as e:
logger.error(f"Sync {run_id} failed: {e}")
_log_line(run_id, f"EROARE FATALA: {e}")
await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
if _current_sync:
_current_sync["status"] = "failed"
_current_sync["finished_at"] = _now().isoformat()
_current_sync["error"] = str(e)
return {"run_id": run_id, "status": "failed", "error": str(e)}
finally:
# Keep _current_sync for 10 seconds so status endpoint can show final result
async def _clear_current_sync():
await asyncio.sleep(10)
global _current_sync
_current_sync = None
asyncio.ensure_future(_clear_current_sync())
async def _clear_run_logs():
await asyncio.sleep(300) # 5 minutes
_run_logs.pop(run_id, None)
asyncio.ensure_future(_clear_run_logs())
def stop_sync():
"""Signal sync to stop. Currently sync runs to completion."""
pass
async def _resync_partner_for_order(order, stored: dict, app_settings: dict, run_id: str) -> None:
"""Resync partner for a single already-imported uninvoiced order.
Safety: double-checks factura_numar before Oracle call.
Reads existing comanda row and calls PACK_COMENZI.modifica_comanda.
"""
import oracledb
order_number = order.number
id_comanda = stored.get("id_comanda")
if not id_comanda:
_log_line(run_id, f"#{order_number} SKIP resync partener: id_comanda lipsa")
return
# Double-check factura_numar — may have been invoiced since mismatch detection
current_detail = await sqlite_service.get_order_detail(order_number)
if current_detail and current_detail.get("order", {}).get("factura_numar"):
_log_line(run_id, f"#{order_number} SKIP resync partener: comanda facturata in tranzit")
return
old_partner_id = stored.get("id_partener")
old_partner_name = stored.get("denumire_roa") or "?"
new_partner_data = import_service.determine_partner_data(order)
# ANAF check for PF→PJ transition
cod_fiscal_override = None
anaf_data = None
if new_partner_data["is_pj"] and new_partner_data["cod_fiscal"]:
raw_cf = new_partner_data["cod_fiscal"]
bare_cui, _ = anaf_service.sanitize_cui(raw_cf)
if bare_cui:
anaf_data = await sqlite_service.get_anaf_cache(bare_cui)
if not anaf_data:
try:
fresh = await anaf_service.check_vat_status_batch([bare_cui])
if fresh:
await sqlite_service.bulk_populate_anaf_cache(fresh)
anaf_data = fresh.get(bare_cui)
except Exception as e:
logger.warning(f"ANAF check failed for {bare_cui}: {e}")
if anaf_data and anaf_data.get("scpTVA") is not None:
cod_fiscal_override = anaf_service.determine_correct_cod_fiscal(
bare_cui, anaf_data["scpTVA"]
)
def _do_resync():
if database.pool is None:
raise RuntimeError("Oracle pool not initialized")
conn = database.pool.acquire()
try:
with conn.cursor() as cur:
# Create/find partner
id_partener_var = cur.var(oracledb.DB_TYPE_NUMBER)
anaf_strict = 1 if (anaf_data and anaf_data.get("scpTVA") is not None) else None
cur.callproc("PACK_IMPORT_PARTENERI.cauta_sau_creeaza_partener", [
cod_fiscal_override or new_partner_data["cod_fiscal"],
new_partner_data["denumire"],
new_partner_data["registru"],
new_partner_data["is_pj"],
anaf_strict,
id_partener_var,
])
new_partner_id = id_partener_var.getvalue()
if not new_partner_id or new_partner_id <= 0:
raise RuntimeError(f"Partner creation failed for {new_partner_data['denumire']}")
new_partner_id = int(new_partner_id)
# Same partner — just clear mismatch
if new_partner_id == (old_partner_id or -1):
return {"same_partner": True, "new_partner_id": new_partner_id}
# Get new partner details for audit log
cur.execute(
"SELECT denumire, cod_fiscal FROM nom_parteneri WHERE id_part = :1",
[new_partner_id]
)
row = cur.fetchone()
new_partner_name = row[0] if row else new_partner_data["denumire"]
new_cod_fiscal_roa = row[1] if row else None
# Create addresses under new partner
addr_livr_id = None
shipping_addr = None
if order.shipping:
id_adresa_livr = cur.var(oracledb.DB_TYPE_NUMBER)
shipping_addr = import_service.format_address_for_oracle(
order.shipping.address, order.shipping.city, order.shipping.region
)
shipping_phone = order.shipping.phone or order.billing.phone or ""
shipping_email = order.shipping.email or order.billing.email or ""
cur.callproc("PACK_IMPORT_PARTENERI.cauta_sau_creeaza_adresa", [
new_partner_id, shipping_addr, shipping_phone, shipping_email, id_adresa_livr
])
addr_livr_id = id_adresa_livr.getvalue()
if addr_livr_id is None:
raise RuntimeError(f"Shipping address creation failed for partner {new_partner_id}")
addr_livr_id = int(addr_livr_id)
billing_name_str = import_service.clean_web_text(
f"{order.billing.lastname} {order.billing.firstname}"
).strip().upper()
ship_name_str = ""
if order.shipping:
ship_name_str = import_service.clean_web_text(
f"{order.shipping.lastname} {order.shipping.firstname}"
).strip().upper()
different_person = bool(ship_name_str and billing_name_str and ship_name_str != billing_name_str)
if different_person and addr_livr_id:
addr_fact_id = addr_livr_id
else:
billing_addr = import_service.format_address_for_oracle(
order.billing.address, order.billing.city, order.billing.region
)
if addr_livr_id and order.shipping and billing_addr == shipping_addr:
addr_fact_id = addr_livr_id
else:
id_adresa_fact = cur.var(oracledb.DB_TYPE_NUMBER)
cur.callproc("PACK_IMPORT_PARTENERI.cauta_sau_creeaza_adresa", [
new_partner_id, billing_addr,
order.billing.phone or "",
order.billing.email or "",
id_adresa_fact,
])
addr_fact_id = id_adresa_fact.getvalue()
if addr_fact_id is None:
raise RuntimeError(f"Billing address creation failed for partner {new_partner_id}")
addr_fact_id = int(addr_fact_id)
# Read existing comanda row for modifica_comanda params
cur.execute("""
SELECT nr_comanda, data_comanda, data_livrare, proc_discount,
interna, id_util_um, id_codclient, comanda_externa, id_ctr
FROM comenzi WHERE id_comanda = :1
""", [id_comanda])
row = cur.fetchone()
if not row:
raise RuntimeError(f"Comanda {id_comanda} not found in Oracle")
nr_comanda, data_comanda, data_livrare, proc_discount, interna, id_util_um, id_codclient, comanda_externa, id_ctr = row
cur.callproc("PACK_COMENZI.modifica_comanda", [
id_comanda,
nr_comanda,
data_comanda,
new_partner_id,
data_livrare,
proc_discount,
interna,
id_util_um,
addr_fact_id,
addr_livr_id,
id_codclient,
comanda_externa,
id_ctr,
])
conn.commit()
return {
"same_partner": False,
"new_partner_id": new_partner_id,
"new_partner_name": new_partner_name,
"new_cod_fiscal_roa": new_cod_fiscal_roa,
}
except Exception:
try:
conn.rollback()
except Exception:
pass
raise
finally:
database.pool.release(conn)
resync_result = await asyncio.to_thread(_do_resync)
if resync_result.get("same_partner"):
# Update cod_fiscal_gomag so next detection doesn't re-flag this order
await sqlite_service.update_partner_resync_data(order_number, {
"id_partener": resync_result["new_partner_id"],
"cod_fiscal_gomag": cod_fiscal_override or new_partner_data["cod_fiscal"],
"cod_fiscal_roa": None,
"denumire_roa": stored.get("denumire_roa"),
"partner_mismatch": 0,
})
_log_line(run_id, f"#{order_number} RESYNC: partener neschimbat, mismatch cleared")
else:
new_partner_id = resync_result["new_partner_id"]
new_partner_name = resync_result.get("new_partner_name", "?")
new_cod_fiscal_roa = resync_result.get("new_cod_fiscal_roa")
await sqlite_service.update_partner_resync_data(order_number, {
"id_partener": new_partner_id,
"cod_fiscal_gomag": cod_fiscal_override or new_partner_data["cod_fiscal"],
"cod_fiscal_roa": new_cod_fiscal_roa,
"denumire_roa": new_partner_name,
"partner_mismatch": 0,
})
_log_line(
run_id,
f"#{order_number} RESYNC partener: {old_partner_id} ({old_partner_name}) → {new_partner_id} ({new_partner_name})"
)