Centralized order status values in api/app/constants.py via a str-valued Enum so comparisons keep working. Replaced literals in: - services: sync_service, sqlite_service, retry_service - routers: sync, dashboard - templates: dashboard.html, logs.html - static JS: shared (ORDER_STATUS mirror), dashboard, logs - tests: requirements, order_items_overwrite, business_rules MALFORMED intentionally NOT added — introduced in follow-up PR2 (per-order failure isolation). Full test suite: 231 unit + 33 e2e pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1384 lines
72 KiB
Python
1384 lines
72 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
|
|
_tz_bucharest = ZoneInfo("Europe/Bucharest")
|
|
|
|
|
|
def _now():
|
|
"""Return current time in Bucharest timezone (naive, for display/storage)."""
|
|
return datetime.now(_tz_bucharest).replace(tzinfo=None)
|
|
|
|
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client, anaf_service
|
|
from ..config import settings
|
|
from .. import database
|
|
from ..constants import OrderStatus
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _addr_match(gomag_json, roa_json):
    """Server-side address comparison matching JS addrMatch().

    Compares street / city / county between a GoMag address and a ROA address
    after aggressive normalization.  Deliberately lenient: when either side is
    missing or unparseable it returns True ("no evidence of a mismatch").

    Args:
        gomag_json: GoMag address as a JSON string or already-parsed dict
            (keys like ``address``/``strada``, ``city``/``localitate``,
            ``region``/``judet``).
        roa_json: ROA address as a JSON string or dict (keys ``strada``,
            ``numar``, ``bloc``, ``scara``, ``etaj``, ``apart``,
            ``localitate``, ``judet``).

    Returns:
        bool: True when the two addresses are considered equivalent.
    """
    # Missing data on either side → treat as matching (cannot prove mismatch).
    if not gomag_json or not roa_json:
        return True
    try:
        g = json.loads(gomag_json) if isinstance(gomag_json, str) else gomag_json
        r = json.loads(roa_json) if isinstance(roa_json, str) else roa_json
    except (json.JSONDecodeError, TypeError):
        # Unparseable payload → same lenient policy as missing data.
        return True
    # Romanian address keywords (STR, BL, SC, AP, SECTOR, ...) are stripped
    # before comparing so formatting differences don't trigger false mismatches.
    _ADDR_WORDS = re.compile(
        r'\bSECTORUL\s*\d*'
        r'|\b(STR|STRADA|NR|NUMAR|NUMARUL|BL|BLOC|SC|SCARA|AP|APART|APARTAMENT|'
        r'ET|ETAJ|COM|COMUNA|SAT|MUN|MUNICIPIUL|JUD|JUDETUL|CARTIER|PARTER|SECTOR|SECTORUL|ORAS)(?:\b|(?=\d))'
    )

    def norm(s):
        # Uppercase, strip diacritics (shared table from import_service),
        # drop address keywords, then keep only [A-Z0-9].
        s = (s or '').translate(import_service._DIACRITICS).upper()
        s = _ADDR_WORDS.sub('', s)
        return re.sub(r'[^A-Z0-9]', '', s)

    def _soundex(s):
        """SOUNDEX matching Oracle's implementation — for city fuzzy compare."""
        if not s:
            return ''
        _code = {'B':'1','F':'1','P':'1','V':'1',
                 'C':'2','G':'2','J':'2','K':'2','Q':'2','S':'2','X':'2','Z':'2',
                 'D':'3','T':'3','L':'4','M':'5','N':'5','R':'6'}
        # Code is built from the first letter plus up to three digits.
        result = s[0]
        prev = _code.get(s[0], '0')
        for c in s[1:]:
            if len(result) >= 4:
                break
            if c in 'AEIOU':
                # Vowels are not coded but reset the previous-digit state,
                # so a repeated consonant after a vowel is coded again.
                prev = '0'
            elif c not in 'HW':
                # H/W are skipped entirely and leave `prev` unchanged.
                d = _code.get(c, '')
                if d and d != prev:
                    result += d
                if d:
                    prev = d
        return result.ljust(4, '0')

    # ROA stores street components separately — concatenate before normalizing
    # so they compare against GoMag's single free-text street field.
    g_street = norm(g.get('address') or g.get('strada') or '')
    r_street = norm((r.get('strada') or '') + (r.get('numar') or '') + (r.get('bloc') or '') + (r.get('scara') or '') + (r.get('etaj') or '') + (r.get('apart') or ''))
    g_city = norm(g.get('city') or g.get('localitate') or '')
    r_city = norm(r.get('localitate') or '')
    g_region = norm(g.get('region') or g.get('judet') or '')
    r_region = norm(r.get('judet') or '')
    # Street and county must match exactly; city is matched fuzzily via SOUNDEX.
    return g_street == r_street and _soundex(g_city) == _soundex(r_city) and g_region == r_region
|
|
|
|
|
|
# Sync state — module-level singletons; at most one sync runs at a time.
_sync_lock = asyncio.Lock()
_current_sync: dict | None = None  # dict with run_id, status, progress info

# In-memory text log buffer per run: run_id -> list of timestamped lines
_run_logs: dict[str, list[str]] = {}
|
|
|
|
|
|
def _log_line(run_id: str, message: str):
    """Append a timestamped line to the in-memory log buffer.

    Args:
        run_id: Sync-run identifier keying the buffer in ``_run_logs``.
        message: Text to record; prefixed with an HH:MM:SS timestamp
            taken from ``_now()``.
    """
    ts = _now().strftime("%H:%M:%S")
    # setdefault creates the buffer on first use instead of a separate
    # membership check + assignment.
    _run_logs.setdefault(run_id, []).append(f"[{ts}] {message}")
|
|
|
|
|
|
def get_run_text_log(run_id: str) -> str | None:
    """Return the accumulated text log for a run, or None if not found."""
    buffered = _run_logs.get(run_id)
    # Distinguish "unknown run" (None) from "known run with no lines" ("").
    return None if buffered is None else "\n".join(buffered)
|
|
|
|
|
|
def _update_progress(phase: str, phase_text: str, current: int = 0, total: int = 0,
                     counts: dict | None = None):
    """Update _current_sync with progress details for polling.

    Args:
        phase: Machine-readable phase key (e.g. "reading", "validation").
        phase_text: Human-readable description shown in the UI.
        current: Items processed so far in this phase.
        total: Total items expected in this phase (0 when unknown).
        counts: Optional running counters; a zeroed default is used when
            omitted.  (Annotation fixed: the parameter defaults to None,
            so it is ``dict | None``, not ``dict``.)
    """
    global _current_sync
    # No-op when no sync is active (nothing to report progress on).
    if _current_sync is None:
        return
    _current_sync.update({
        "phase": phase,
        "phase_text": phase_text,
        "progress_current": current,
        "progress_total": total,
        "counts": counts or {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0},
    })
|
|
|
|
|
|
async def get_sync_status():
    """Get current sync status."""
    # Return a shallow copy so callers cannot mutate the live state dict.
    if not _current_sync:
        return {"status": "idle"}
    return dict(_current_sync)
|
|
|
|
|
|
async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
    """Prepare a sync run - creates run_id and sets initial state.
    Returns {"run_id": ..., "status": "starting"} or {"error": ...} if already running.
    """
    global _current_sync
    # Refuse to start while another run holds the lock; surface its run_id.
    if _sync_lock.locked():
        active_id = _current_sync.get("run_id") if _current_sync else None
        return {"error": "Sync already running", "run_id": active_id}

    # Timestamp + short random suffix keeps run ids sortable yet unique.
    run_id = "{}_{}".format(_now().strftime("%Y%m%d_%H%M%S"), uuid.uuid4().hex[:6])
    _current_sync = {
        "run_id": run_id,
        "status": "running",
        "started_at": _now().isoformat(),
        "finished_at": None,
        "phase": "starting",
        "phase_text": "Starting...",
        "progress_current": 0,
        "progress_total": 0,
        "counts": {"imported": 0, "skipped": 0, "errors": 0,
                   "already_imported": 0, "cancelled": 0},
    }
    return {"run_id": run_id, "status": "starting"}
|
|
|
|
|
|
def _derive_customer_info(order):
|
|
"""Extract shipping/billing names and customer from an order.
|
|
customer = who appears on the invoice (partner in ROA):
|
|
- company name if billing is on a company
|
|
- shipping person name otherwise (consistent with import_service partner logic)
|
|
"""
|
|
shipping_name = ""
|
|
if order.shipping:
|
|
shipping_name = f"{getattr(order.shipping, 'lastname', '') or ''} {getattr(order.shipping, 'firstname', '') or ''}".strip()
|
|
billing_name = f"{getattr(order.billing, 'lastname', '') or ''} {getattr(order.billing, 'firstname', '') or ''}".strip()
|
|
if not shipping_name:
|
|
shipping_name = billing_name
|
|
if order.billing.is_company and order.billing.company_name:
|
|
customer = order.billing.company_name
|
|
else:
|
|
customer = shipping_name or billing_name
|
|
payment_method = getattr(order, 'payment_name', None) or None
|
|
delivery_method = getattr(order, 'delivery_name', None) or None
|
|
return shipping_name.upper(), billing_name.upper(), customer.upper(), payment_method, delivery_method
|
|
|
|
|
|
async def _fix_stale_error_orders(existing_map: dict, run_id: str):
    """Fix orders stuck in ERROR status that are actually in Oracle.

    This can happen when a previous import committed partially (no rollback on
    error).  If the order exists in Oracle COMENZI, update SQLite status to
    ALREADY_IMPORTED and attach the Oracle order id.

    Args:
        existing_map: order_number -> id_comanda for orders found in Oracle.
        run_id: Current sync-run id, used for the in-memory text log.
    """
    from ..database import get_sqlite
    db = await get_sqlite()
    try:
        # Bind status values as parameters instead of interpolating them into
        # the SQL text — safer and lets SQLite reuse the prepared statement.
        cursor = await db.execute(
            "SELECT order_number FROM orders WHERE status = ?",
            (OrderStatus.ERROR.value,),
        )
        error_orders = [row["order_number"] for row in await cursor.fetchall()]
        fixed = 0
        for order_number in error_orders:
            if order_number in existing_map:
                id_comanda = existing_map[order_number]
                # Re-check status in WHERE to avoid clobbering a concurrent update.
                await db.execute(
                    """
                    UPDATE orders SET
                        status = ?,
                        id_comanda = ?,
                        error_message = NULL,
                        updated_at = datetime('now')
                    WHERE order_number = ? AND status = ?
                    """,
                    (OrderStatus.ALREADY_IMPORTED.value, id_comanda,
                     order_number, OrderStatus.ERROR.value),
                )
                fixed += 1
                _log_line(run_id, f"#{order_number} → status corectat ERROR → ALREADY_IMPORTED (ID: {id_comanda})")
        if fixed:
            await db.commit()
            logger.info(f"Fixed {fixed} stale ERROR orders that exist in Oracle")
    finally:
        await db.close()
|
|
|
|
|
|
async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict:
|
|
"""Run a full sync cycle. Returns summary dict."""
|
|
global _current_sync
|
|
|
|
if _sync_lock.locked():
|
|
return {"error": "Sync already running"}
|
|
|
|
async with _sync_lock:
|
|
# Use provided run_id or generate one
|
|
if not run_id:
|
|
run_id = _now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
|
|
_current_sync = {
|
|
"run_id": run_id,
|
|
"status": "running",
|
|
"started_at": _now().isoformat(),
|
|
"finished_at": None,
|
|
"phase": "reading",
|
|
"phase_text": "Reading JSON files...",
|
|
"progress_current": 0,
|
|
"progress_total": 0,
|
|
"counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0, "cancelled": 0},
|
|
}
|
|
|
|
_update_progress("reading", "Reading JSON files...")
|
|
|
|
started_dt = _now()
|
|
_run_logs[run_id] = [
|
|
f"=== Sync Run {run_id} ===",
|
|
f"Inceput: {started_dt.strftime('%d.%m.%Y %H:%M:%S')}",
|
|
""
|
|
]
|
|
|
|
json_dir = settings.JSON_OUTPUT_DIR
|
|
|
|
try:
|
|
# Phase 0: Download orders from GoMag API
|
|
_update_progress("downloading", "Descărcare comenzi din GoMag API...")
|
|
_log_line(run_id, "Descărcare comenzi din GoMag API...")
|
|
# Read GoMag settings from SQLite (override config defaults)
|
|
dl_settings = await sqlite_service.get_app_settings()
|
|
gomag_key = dl_settings.get("gomag_api_key") or None
|
|
gomag_shop = dl_settings.get("gomag_api_shop") or None
|
|
gomag_days_str = dl_settings.get("gomag_order_days_back")
|
|
gomag_days = int(gomag_days_str) if gomag_days_str else None
|
|
gomag_limit_str = dl_settings.get("gomag_limit")
|
|
gomag_limit = int(gomag_limit_str) if gomag_limit_str else None
|
|
dl_result = await gomag_client.download_orders(
|
|
json_dir, log_fn=lambda msg: _log_line(run_id, msg),
|
|
api_key=gomag_key, api_shop=gomag_shop,
|
|
days_back=gomag_days, limit=gomag_limit,
|
|
)
|
|
if dl_result["files"]:
|
|
_log_line(run_id, f"GoMag: {dl_result['total']} comenzi în {dl_result['pages']} pagini → {len(dl_result['files'])} fișiere")
|
|
|
|
_update_progress("reading", "Citire fisiere JSON...")
|
|
_log_line(run_id, "Citire fisiere JSON...")
|
|
|
|
# Step 1: Read orders and sort chronologically (oldest first - R3)
|
|
orders, json_count = order_reader.read_json_orders()
|
|
orders.sort(key=lambda o: o.date or '')
|
|
await sqlite_service.create_sync_run(run_id, json_count)
|
|
_update_progress("reading", f"Found {len(orders)} orders in {json_count} files", 0, len(orders))
|
|
_log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")
|
|
|
|
# Populate web_products catalog from all orders (R4)
|
|
web_product_items = [
|
|
(item.sku, item.name)
|
|
for order in orders
|
|
for item in order.items
|
|
if item.sku and item.name
|
|
]
|
|
await sqlite_service.upsert_web_products_batch(web_product_items)
|
|
|
|
if not orders:
|
|
_log_line(run_id, "Nicio comanda gasita.")
|
|
await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
|
|
_update_progress("completed", "No orders found")
|
|
summary = {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count}
|
|
return summary
|
|
|
|
# ── Separate cancelled orders (GoMag status "Anulata" / statusId "7") ──
|
|
cancelled_orders = [o for o in orders if o.status_id == "7" or (o.status and o.status.lower() == "anulata")]
|
|
active_orders = [o for o in orders if o not in cancelled_orders]
|
|
cancelled_count = len(cancelled_orders)
|
|
|
|
if cancelled_orders:
|
|
_log_line(run_id, f"Comenzi anulate in GoMag: {cancelled_count}")
|
|
|
|
# Record cancelled orders in SQLite
|
|
cancelled_batch = []
|
|
for order in cancelled_orders:
|
|
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
|
order_items_data = [
|
|
{"sku": item.sku, "product_name": item.name,
|
|
"quantity": item.quantity, "price": item.price,
|
|
"baseprice": item.baseprice, "vat": item.vat,
|
|
"mapping_status": "unknown", "codmat": None,
|
|
"id_articol": None, "cantitate_roa": None}
|
|
for item in order.items
|
|
]
|
|
cancelled_batch.append({
|
|
"sync_run_id": run_id, "order_number": order.number,
|
|
"order_date": order.date, "customer_name": customer,
|
|
"status": OrderStatus.CANCELLED.value, "status_at_run": OrderStatus.CANCELLED.value,
|
|
"id_comanda": None, "id_partener": None,
|
|
"error_message": "Comanda anulata in GoMag",
|
|
"missing_skus": None,
|
|
"items_count": len(order.items),
|
|
"shipping_name": shipping_name, "billing_name": billing_name,
|
|
"payment_method": payment_method, "delivery_method": delivery_method,
|
|
"order_total": order.total or None,
|
|
"delivery_cost": order.delivery_cost or None,
|
|
"discount_total": order.discount_total or None,
|
|
"web_status": order.status or "Anulata",
|
|
"items": order_items_data,
|
|
})
|
|
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → ANULAT in GoMag")
|
|
|
|
await sqlite_service.save_orders_batch(cancelled_batch)
|
|
|
|
# Check if any cancelled orders were previously imported
|
|
from ..database import get_sqlite as _get_sqlite
|
|
db_check = await _get_sqlite()
|
|
try:
|
|
cancelled_numbers = [o.number for o in cancelled_orders]
|
|
placeholders = ",".join("?" for _ in cancelled_numbers)
|
|
cursor = await db_check.execute(f"""
|
|
SELECT order_number, id_comanda FROM orders
|
|
WHERE order_number IN ({placeholders})
|
|
AND id_comanda IS NOT NULL
|
|
AND status = '{OrderStatus.CANCELLED.value}'
|
|
""", cancelled_numbers)
|
|
previously_imported = [dict(r) for r in await cursor.fetchall()]
|
|
finally:
|
|
await db_check.close()
|
|
|
|
if previously_imported:
|
|
_log_line(run_id, f"Verificare {len(previously_imported)} comenzi anulate care erau importate in Oracle...")
|
|
# Check which have invoices
|
|
id_comanda_list = [o["id_comanda"] for o in previously_imported]
|
|
invoice_data = await asyncio.to_thread(
|
|
invoice_service.check_invoices_for_orders, id_comanda_list
|
|
)
|
|
|
|
for o in previously_imported:
|
|
idc = o["id_comanda"]
|
|
order_num = o["order_number"]
|
|
if idc in invoice_data:
|
|
# Invoiced — keep in Oracle, just log warning
|
|
_log_line(run_id,
|
|
f"#{order_num} → ANULAT dar FACTURAT (factura {invoice_data[idc].get('serie_act', '')}"
|
|
f"{invoice_data[idc].get('numar_act', '')}) — NU se sterge din Oracle")
|
|
# Update web_status but keep CANCELLED status (already set by batch above)
|
|
else:
|
|
# Not invoiced — soft-delete in Oracle
|
|
del_result = await asyncio.to_thread(
|
|
import_service.soft_delete_order_in_roa, idc
|
|
)
|
|
if del_result["success"]:
|
|
# Clear id_comanda via mark_order_cancelled
|
|
await sqlite_service.mark_order_cancelled(order_num, "Anulata")
|
|
_log_line(run_id,
|
|
f"#{order_num} → ANULAT + STERS din Oracle (ID: {idc}, "
|
|
f"{del_result['details_deleted']} detalii)")
|
|
else:
|
|
_log_line(run_id,
|
|
f"#{order_num} → ANULAT dar EROARE la stergere Oracle: {del_result['error']}")
|
|
|
|
orders = active_orders
|
|
|
|
if not orders:
|
|
_log_line(run_id, "Nicio comanda activa dupa filtrare anulate.")
|
|
await sqlite_service.update_sync_run(run_id, "completed", cancelled_count, 0, 0, 0)
|
|
_update_progress("completed", f"No active orders ({cancelled_count} cancelled)")
|
|
summary = {"run_id": run_id, "status": "completed",
|
|
"message": f"No active orders ({cancelled_count} cancelled)",
|
|
"json_files": json_count, "cancelled": cancelled_count}
|
|
return summary
|
|
|
|
_update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))
|
|
|
|
# ── Single Oracle connection for entire validation phase ──
|
|
conn = await asyncio.to_thread(database.get_oracle_connection)
|
|
try:
|
|
# Step 2a: Find orders already in Oracle (date-range query)
|
|
order_dates = [o.date for o in orders if o.date]
|
|
if order_dates:
|
|
min_date_str = min(order_dates)
|
|
try:
|
|
min_date = datetime.strptime(min_date_str[:10], "%Y-%m-%d") - timedelta(days=1)
|
|
except (ValueError, TypeError):
|
|
min_date = _now() - timedelta(days=90)
|
|
else:
|
|
min_date = _now() - timedelta(days=90)
|
|
|
|
existing_map = await asyncio.to_thread(
|
|
validation_service.check_orders_in_roa, min_date, conn
|
|
)
|
|
|
|
# Step 2a-fix: Fix ERROR orders that are actually in Oracle
|
|
# (can happen if previous import committed partially without rollback)
|
|
await _fix_stale_error_orders(existing_map, run_id)
|
|
|
|
# Load app settings early (needed for id_gestiune in SKU validation)
|
|
app_settings = await sqlite_service.get_app_settings()
|
|
id_pol = id_pol or int(app_settings.get("id_pol") or 0) or settings.ID_POL
|
|
id_sectie = id_sectie or int(app_settings.get("id_sectie") or 0) or settings.ID_SECTIE
|
|
# Parse multi-gestiune CSV: "1,3" → [1, 3], "" → None
|
|
id_gestiune_raw = (app_settings.get("id_gestiune") or "").strip()
|
|
if id_gestiune_raw and id_gestiune_raw != "0":
|
|
id_gestiuni = [int(g) for g in id_gestiune_raw.split(",") if g.strip()]
|
|
else:
|
|
id_gestiuni = None # None = orice gestiune
|
|
logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")
|
|
_log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")
|
|
|
|
# Step 2b: Validate SKUs (reuse same connection)
|
|
all_skus = order_reader.get_all_skus(orders)
|
|
validation = await asyncio.to_thread(validation_service.validate_skus, all_skus, conn, id_gestiuni)
|
|
importable, skipped = validation_service.classify_orders(orders, validation)
|
|
|
|
# ── Split importable into truly_importable vs already_in_roa ──
|
|
truly_importable = []
|
|
already_in_roa = []
|
|
for order in importable:
|
|
if order.number in existing_map:
|
|
already_in_roa.append(order)
|
|
else:
|
|
truly_importable.append(order)
|
|
|
|
_update_progress("validation",
|
|
f"{len(truly_importable)} new, {len(already_in_roa)} already imported, {len(skipped)} skipped",
|
|
0, len(truly_importable))
|
|
_log_line(run_id, f"Validare: {len(truly_importable)} noi, {len(already_in_roa)} deja importate, {len(skipped)} nemapate")
|
|
|
|
# Step 2c: Build SKU context from skipped orders
|
|
sku_context = {}
|
|
for order, missing_skus_list in skipped:
|
|
if order.billing.is_company and order.billing.company_name:
|
|
customer = order.billing.company_name
|
|
else:
|
|
ship_name = ""
|
|
if order.shipping:
|
|
ship_name = f"{order.shipping.lastname} {order.shipping.firstname}".strip()
|
|
customer = ship_name or f"{order.billing.lastname} {order.billing.firstname}"
|
|
for sku in missing_skus_list:
|
|
if sku not in sku_context:
|
|
sku_context[sku] = {"orders": [], "customers": []}
|
|
if order.number not in sku_context[sku]["orders"]:
|
|
sku_context[sku]["orders"].append(order.number)
|
|
if customer not in sku_context[sku]["customers"]:
|
|
sku_context[sku]["customers"].append(customer)
|
|
|
|
# Track missing SKUs with context
|
|
for sku in validation["missing"]:
|
|
product_name = ""
|
|
for order in orders:
|
|
for item in order.items:
|
|
if item.sku == sku:
|
|
product_name = item.name
|
|
break
|
|
if product_name:
|
|
break
|
|
ctx = sku_context.get(sku, {})
|
|
await sqlite_service.track_missing_sku(
|
|
sku, product_name,
|
|
order_count=len(ctx.get("orders", [])),
|
|
order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
|
|
customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
|
|
)
|
|
|
|
# Auto-resolve missing SKUs that now have mappings
|
|
resolved_skus = validation["mapped"] | validation["direct"]
|
|
if resolved_skus:
|
|
resolved_count = await sqlite_service.resolve_missing_skus_batch(resolved_skus)
|
|
if resolved_count:
|
|
_log_line(run_id, f"Auto-resolved {resolved_count} previously missing SKUs")
|
|
|
|
# Reconcile stale unresolved SKUs that got mappings outside the current JSON batch
|
|
rec = await validation_service.reconcile_unresolved_missing_skus(conn=conn)
|
|
if rec["resolved"]:
|
|
_log_line(run_id, f"Reconciliere: {rec['resolved']} SKU rezolvate suplimentar")
|
|
|
|
# Step 2d: Pre-validate prices for importable articles
|
|
if id_pol and (truly_importable or already_in_roa):
|
|
_update_progress("validation", "Validating prices...", 0, len(truly_importable))
|
|
_log_line(run_id, "Validare preturi...")
|
|
all_codmats = set()
|
|
for order in (truly_importable + already_in_roa):
|
|
for item in order.items:
|
|
if item.sku in validation["mapped"]:
|
|
pass
|
|
elif item.sku in validation["direct"]:
|
|
all_codmats.add(item.sku)
|
|
# Get standard VAT rate from settings for PROC_TVAV metadata
|
|
cota_tva = float(app_settings.get("discount_vat") or 21)
|
|
|
|
# Dual pricing policy support
|
|
id_pol_productie = int(app_settings.get("id_pol_productie") or 0) or None
|
|
codmat_policy_map = {}
|
|
|
|
if all_codmats:
|
|
if id_pol_productie:
|
|
# Dual-policy: classify articles by cont (sales vs production)
|
|
codmat_policy_map = await asyncio.to_thread(
|
|
validation_service.validate_and_ensure_prices_dual,
|
|
all_codmats, id_pol, id_pol_productie,
|
|
conn, validation.get("direct_id_map"),
|
|
cota_tva=cota_tva
|
|
)
|
|
_log_line(run_id,
|
|
f"Politici duale: {sum(1 for v in codmat_policy_map.values() if v == id_pol)} vanzare, "
|
|
f"{sum(1 for v in codmat_policy_map.values() if v == id_pol_productie)} productie")
|
|
else:
|
|
# Single-policy (backward compatible)
|
|
price_result = await asyncio.to_thread(
|
|
validation_service.validate_prices, all_codmats, id_pol,
|
|
conn, validation.get("direct_id_map")
|
|
)
|
|
if price_result["missing_price"]:
|
|
logger.info(
|
|
f"Auto-adding price 0 for {len(price_result['missing_price'])} "
|
|
f"direct articles in policy {id_pol}"
|
|
)
|
|
await asyncio.to_thread(
|
|
validation_service.ensure_prices,
|
|
price_result["missing_price"], id_pol,
|
|
conn, validation.get("direct_id_map"),
|
|
cota_tva=cota_tva
|
|
)
|
|
|
|
# Also validate mapped SKU prices (cherry-pick 1)
|
|
mapped_skus_in_orders = set()
|
|
for order in (truly_importable + already_in_roa):
|
|
for item in order.items:
|
|
if item.sku in validation["mapped"]:
|
|
mapped_skus_in_orders.add(item.sku)
|
|
|
|
mapped_codmat_data = {}
|
|
if mapped_skus_in_orders:
|
|
mapped_codmat_data = await asyncio.to_thread(
|
|
validation_service.resolve_mapped_codmats, mapped_skus_in_orders, conn,
|
|
id_gestiuni=id_gestiuni
|
|
)
|
|
# Build id_map for mapped codmats and validate/ensure their prices
|
|
mapped_id_map = {}
|
|
for sku, entries in mapped_codmat_data.items():
|
|
for entry in entries:
|
|
mapped_id_map[entry["codmat"]] = {
|
|
"id_articol": entry["id_articol"],
|
|
"cont": entry.get("cont")
|
|
}
|
|
mapped_codmats = set(mapped_id_map.keys())
|
|
if mapped_codmats:
|
|
if id_pol_productie:
|
|
mapped_policy_map = await asyncio.to_thread(
|
|
validation_service.validate_and_ensure_prices_dual,
|
|
mapped_codmats, id_pol, id_pol_productie,
|
|
conn, mapped_id_map, cota_tva=cota_tva
|
|
)
|
|
codmat_policy_map.update(mapped_policy_map)
|
|
else:
|
|
mp_result = await asyncio.to_thread(
|
|
validation_service.validate_prices,
|
|
mapped_codmats, id_pol, conn, mapped_id_map
|
|
)
|
|
if mp_result["missing_price"]:
|
|
await asyncio.to_thread(
|
|
validation_service.ensure_prices,
|
|
mp_result["missing_price"], id_pol,
|
|
conn, mapped_id_map, cota_tva=cota_tva
|
|
)
|
|
|
|
# Add SKU → policy entries for mapped articles (1:1 and kits)
|
|
# codmat_policy_map has CODMAT keys, but build_articles_json
|
|
# looks up by GoMag SKU — bridge the gap here
|
|
if codmat_policy_map and mapped_codmat_data:
|
|
for sku, entries in mapped_codmat_data.items():
|
|
if len(entries) == 1:
|
|
# 1:1 mapping: SKU inherits the CODMAT's policy
|
|
codmat = entries[0]["codmat"]
|
|
if codmat in codmat_policy_map:
|
|
codmat_policy_map[sku] = codmat_policy_map[codmat]
|
|
|
|
# Pass codmat_policy_map to import via app_settings
|
|
if codmat_policy_map:
|
|
app_settings["_codmat_policy_map"] = codmat_policy_map
|
|
|
|
# ── Kit component price validation ──
|
|
kit_pricing_mode = app_settings.get("kit_pricing_mode")
|
|
if kit_pricing_mode and mapped_codmat_data:
|
|
id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
|
|
kit_missing = await asyncio.to_thread(
|
|
validation_service.validate_kit_component_prices,
|
|
mapped_codmat_data, id_pol, id_pol_prod, conn
|
|
)
|
|
if kit_missing:
|
|
kit_skus_missing = set(kit_missing.keys())
|
|
for sku, missing_codmats in kit_missing.items():
|
|
_log_line(run_id, f"Kit {sku}: prețuri lipsă pentru {', '.join(missing_codmats)}")
|
|
new_truly = []
|
|
for order in truly_importable:
|
|
order_skus = {item.sku for item in order.items}
|
|
if order_skus & kit_skus_missing:
|
|
missing_list = list(order_skus & kit_skus_missing)
|
|
skipped.append((order, missing_list))
|
|
else:
|
|
new_truly.append(order)
|
|
truly_importable = new_truly
|
|
|
|
# Mode B config validation
|
|
if kit_pricing_mode == "separate_line":
|
|
if not app_settings.get("kit_discount_codmat"):
|
|
_log_line(run_id, "EROARE: Kit mode 'separate_line' dar kit_discount_codmat nu e configurat!")
|
|
finally:
|
|
await asyncio.to_thread(database.pool.release, conn)
|
|
|
|
# Step 3a: Record already-imported orders (batch)
|
|
already_imported_count = len(already_in_roa)
|
|
already_batch = []
|
|
for order in already_in_roa:
|
|
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
|
id_comanda_roa = existing_map.get(order.number)
|
|
order_items_data = [
|
|
{"sku": item.sku, "product_name": item.name,
|
|
"quantity": item.quantity, "price": item.price,
|
|
"baseprice": item.baseprice, "vat": item.vat,
|
|
"mapping_status": "mapped" if item.sku in validation["mapped"] else "direct",
|
|
"codmat": None, "id_articol": None, "cantitate_roa": None}
|
|
for item in order.items
|
|
]
|
|
already_batch.append({
|
|
"sync_run_id": run_id, "order_number": order.number,
|
|
"order_date": order.date, "customer_name": customer,
|
|
"status": OrderStatus.ALREADY_IMPORTED.value, "status_at_run": OrderStatus.ALREADY_IMPORTED.value,
|
|
"id_comanda": id_comanda_roa, "id_partener": None,
|
|
"error_message": None, "missing_skus": None,
|
|
"items_count": len(order.items),
|
|
"shipping_name": shipping_name, "billing_name": billing_name,
|
|
"payment_method": payment_method, "delivery_method": delivery_method,
|
|
"order_total": order.total or None,
|
|
"delivery_cost": order.delivery_cost or None,
|
|
"discount_total": order.discount_total or None,
|
|
"web_status": order.status or None,
|
|
"items": order_items_data,
|
|
})
|
|
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})")
|
|
await sqlite_service.save_orders_batch(already_batch)
|
|
|
|
# Update GoMag addresses + recompute address_mismatch for already-imported orders
|
|
addr_updates = []
|
|
for order in already_in_roa:
|
|
addr_updates.append({
|
|
"order_number": order.number,
|
|
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
|
|
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
|
|
})
|
|
await sqlite_service.update_gomag_addresses_batch(addr_updates)
|
|
|
|
# Detect partner mismatches for already-imported orders
|
|
if already_in_roa:
|
|
stored_partner_data = await sqlite_service.get_orders_partner_data_batch(
|
|
[o.number for o in already_in_roa]
|
|
)
|
|
mismatch_map = {}
|
|
mismatch_updates = []
|
|
for order in already_in_roa:
|
|
stored = stored_partner_data.get(order.number, {})
|
|
stored_cf = stored.get("cod_fiscal_gomag")
|
|
new_data = import_service.determine_partner_data(order)
|
|
new_cf = new_data["cod_fiscal"]
|
|
|
|
def _strip_ro(cf):
|
|
if not cf:
|
|
return ""
|
|
# Strip optional "RO" prefix + any surrounding whitespace
|
|
return re.sub(r'^RO\s*', '', cf.strip().upper()).strip()
|
|
|
|
is_mismatch = False
|
|
if new_data["is_pj"] and new_cf and not stored_cf:
|
|
is_mismatch = True # PF→PJ (doar dacă are CUI — fără CUI nu putem confirma)
|
|
elif not new_data["is_pj"] and stored_cf:
|
|
is_mismatch = True # PJ→PF
|
|
elif new_data["is_pj"] and stored_cf and _strip_ro(new_cf) != _strip_ro(stored_cf):
|
|
is_mismatch = True # CUI schimbat
|
|
|
|
val = 1 if is_mismatch else 0
|
|
mismatch_map[order.number] = val
|
|
mismatch_updates.append({"order_number": order.number, "partner_mismatch": val})
|
|
|
|
await sqlite_service.update_partner_mismatch_batch(mismatch_updates)
|
|
|
|
# Clear stale mismatches for orders outside the current sync window
|
|
# that have no CUI stored (flagged by old code before the no-CUI fix)
|
|
current_batch_numbers = {o.number for o in already_in_roa}
|
|
cleared = await sqlite_service.clear_stale_partner_mismatches_no_cui(current_batch_numbers)
|
|
if cleared:
|
|
logger.info(f"Partner mismatch: cleared {cleared} stale no-CUI flags from previous sync window")
|
|
|
|
# Auto-resync uninvoiced orders with partner mismatch (max 5/cycle)
|
|
MAX_PARTNER_RESYNC_PER_CYCLE = 5
|
|
total_mismatched = sum(1 for v in mismatch_map.values() if v == 1)
|
|
logger.info(f"Partner mismatch detection: {len(already_in_roa)} orders checked, {total_mismatched} mismatches found")
|
|
mismatched_uninvoiced = [
|
|
o for o in already_in_roa
|
|
if mismatch_map.get(o.number) == 1
|
|
and not stored_partner_data.get(o.number, {}).get("factura_numar")
|
|
][:MAX_PARTNER_RESYNC_PER_CYCLE]
|
|
logger.info(f"Partner auto-resync: {len(mismatched_uninvoiced)} uninvoiced orders queued")
|
|
|
|
if mismatched_uninvoiced:
|
|
resync_ok = 0
|
|
for _order in mismatched_uninvoiced:
|
|
logger.info(f"Partner resync attempt: #{_order.number}")
|
|
try:
|
|
await _resync_partner_for_order(
|
|
order=_order,
|
|
stored=stored_partner_data.get(_order.number, {}),
|
|
app_settings=app_settings,
|
|
run_id=run_id,
|
|
)
|
|
resync_ok += 1
|
|
logger.info(f"Partner resync success: #{_order.number}")
|
|
except Exception as _e:
|
|
_log_line(run_id, f"#{_order.number} EROARE resync partener: {_e}")
|
|
logger.error(f"Partner resync error for {_order.number}: {_e}")
|
|
if resync_ok:
|
|
_log_line(run_id, f"Resync parteneri: {resync_ok} comenzi actualizate")
|
|
|
|
# Step 3b: Record skipped orders + store items (batch)
|
|
skipped_count = len(skipped)
|
|
skipped_batch = []
|
|
for order, missing_skus in skipped:
|
|
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
|
order_items_data = [
|
|
{"sku": item.sku, "product_name": item.name,
|
|
"quantity": item.quantity, "price": item.price,
|
|
"baseprice": item.baseprice, "vat": item.vat,
|
|
"mapping_status": "missing" if item.sku in validation["missing"] else
|
|
"mapped" if item.sku in validation["mapped"] else "direct",
|
|
"codmat": None, "id_articol": None, "cantitate_roa": None}
|
|
for item in order.items
|
|
]
|
|
skipped_batch.append({
|
|
"sync_run_id": run_id, "order_number": order.number,
|
|
"order_date": order.date, "customer_name": customer,
|
|
"status": OrderStatus.SKIPPED.value, "status_at_run": OrderStatus.SKIPPED.value,
|
|
"id_comanda": None, "id_partener": None,
|
|
"error_message": None, "missing_skus": missing_skus,
|
|
"items_count": len(order.items),
|
|
"shipping_name": shipping_name, "billing_name": billing_name,
|
|
"payment_method": payment_method, "delivery_method": delivery_method,
|
|
"order_total": order.total or None,
|
|
"delivery_cost": order.delivery_cost or None,
|
|
"discount_total": order.discount_total or None,
|
|
"web_status": order.status or None,
|
|
"items": order_items_data,
|
|
})
|
|
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
|
|
await sqlite_service.save_orders_batch(skipped_batch)
|
|
|
|
# ── Price sync from orders ──
|
|
if app_settings.get("price_sync_enabled") == "1":
|
|
try:
|
|
all_sync_orders = truly_importable + already_in_roa
|
|
direct_id_map = validation.get("direct_id_map", {})
|
|
id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
|
|
price_updates = await asyncio.to_thread(
|
|
validation_service.sync_prices_from_order,
|
|
all_sync_orders, mapped_codmat_data,
|
|
direct_id_map, codmat_policy_map, id_pol,
|
|
id_pol_productie=id_pol_prod,
|
|
settings=app_settings
|
|
)
|
|
if price_updates:
|
|
_log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate")
|
|
for pu in price_updates:
|
|
_log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f} → {pu['new_price']:.2f}")
|
|
except Exception as e:
|
|
_log_line(run_id, f"Eroare sync prețuri din comenzi: {e}")
|
|
logger.error(f"Price sync error: {e}")
|
|
|
|
_update_progress("skipped", f"Skipped {skipped_count}",
|
|
0, len(truly_importable),
|
|
{"imported": 0, "skipped": skipped_count, "errors": 0, "already_imported": already_imported_count})
|
|
|
|
# ANAF cache pre-population: CUIs from last 3 months with expired/missing cache
|
|
try:
|
|
prepop_cuis = await sqlite_service.get_expired_cuis_for_prepopulate()
|
|
if prepop_cuis:
|
|
_log_line(run_id, f"ANAF pre-populare: {len(prepop_cuis)} CUI-uri cu cache expirat")
|
|
prepop_results = await anaf_service.check_vat_status_batch(
|
|
prepop_cuis, log_fn=lambda msg: _log_line(run_id, msg)
|
|
)
|
|
if prepop_results:
|
|
await sqlite_service.bulk_populate_anaf_cache(prepop_results)
|
|
_log_line(run_id, f"ANAF pre-populare: {len(prepop_results)} rezultate stocate")
|
|
else:
|
|
_log_line(run_id, "ANAF pre-populare: cache complet")
|
|
except Exception as e:
|
|
_log_line(run_id, f"ANAF pre-populare eroare: {e}")
|
|
logger.warning(f"ANAF cache pre-population failed: {e}")
|
|
|
|
# Step 4: ANAF batch verification for company CUIs (RO companies only)
|
|
company_cuis = set()
|
|
for order in truly_importable:
|
|
is_ro = (order.billing.country or "").strip().lower() == "romania"
|
|
if order.billing.is_company and order.billing.company_code and is_ro:
|
|
raw_cf = import_service.clean_web_text(order.billing.company_code) or ""
|
|
bare, _ = anaf_service.sanitize_cui(raw_cf)
|
|
if anaf_service.validate_cui(bare):
|
|
company_cuis.add(bare)
|
|
|
|
# Check anaf_cache for already-known CUIs (7-day validity)
|
|
uncached_cuis = []
|
|
cached_results = {}
|
|
for cui in company_cuis:
|
|
cached = await sqlite_service.get_anaf_cache(cui)
|
|
if cached:
|
|
cached_results[cui] = cached
|
|
else:
|
|
uncached_cuis.append(cui)
|
|
|
|
# Batch ANAF call for uncached CUIs only
|
|
if uncached_cuis:
|
|
_log_line(run_id, f"ANAF: verificare {len(uncached_cuis)} CUI-uri noi...")
|
|
anaf_results = await anaf_service.check_vat_status_batch(
|
|
uncached_cuis, log_fn=lambda msg: _log_line(run_id, msg)
|
|
)
|
|
if anaf_results:
|
|
await sqlite_service.bulk_populate_anaf_cache(anaf_results)
|
|
cached_results.update(anaf_results)
|
|
else:
|
|
_log_line(run_id, "ANAF: batch call esuat, continua fara corectie CUI")
|
|
|
|
# Step 5: Import only truly new orders
|
|
imported_count = 0
|
|
error_count = 0
|
|
|
|
for i, order in enumerate(truly_importable):
|
|
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
|
|
|
_update_progress("import",
|
|
f"Import {i+1}/{len(truly_importable)}: #{order.number} {customer}",
|
|
i + 1, len(truly_importable),
|
|
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
|
|
"already_imported": already_imported_count})
|
|
|
|
# Determine cod_fiscal override from ANAF data
|
|
cod_fiscal_override = None
|
|
anaf_data_for_order = None
|
|
raw_cf = ""
|
|
if order.billing.is_company and order.billing.company_code:
|
|
raw_cf = import_service.clean_web_text(order.billing.company_code) or ""
|
|
bare_cui, cui_warning = anaf_service.sanitize_cui(raw_cf)
|
|
if cui_warning:
|
|
_log_line(run_id, f"#{order.number} WARN: {cui_warning}")
|
|
anaf_data_for_order = cached_results.get(bare_cui)
|
|
if anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None:
|
|
correct_cf = anaf_service.determine_correct_cod_fiscal(bare_cui, anaf_data_for_order["scpTVA"])
|
|
if correct_cf != raw_cf:
|
|
_log_line(run_id, f"#{order.number} CUI corectat: {raw_cf} → {correct_cf}")
|
|
cod_fiscal_override = correct_cf
|
|
|
|
# Determine strict search mode: only when RO company + ANAF data available
|
|
is_ro_company = (order.billing.is_company
|
|
and (order.billing.country or "").strip().lower() == "romania")
|
|
anaf_strict = None
|
|
if is_ro_company and anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None:
|
|
anaf_strict = 1 # ANAF data available → strict search
|
|
|
|
# ANAF official name override: used at partner creation (not lookup).
|
|
# Strip before truthy check → reject whitespace-only values.
|
|
denumire_override = None
|
|
if is_ro_company and anaf_data_for_order:
|
|
anaf_name_clean = (anaf_data_for_order.get("denumire_anaf") or "").strip()
|
|
if anaf_name_clean:
|
|
denumire_override = anaf_name_clean.upper()
|
|
|
|
result = await asyncio.to_thread(
|
|
import_service.import_single_order,
|
|
order, id_pol=id_pol, id_sectie=id_sectie,
|
|
app_settings=app_settings, id_gestiuni=id_gestiuni,
|
|
cod_fiscal_override=cod_fiscal_override,
|
|
anaf_strict=anaf_strict,
|
|
denumire_override=denumire_override,
|
|
)
|
|
|
|
# Build order items data for storage (R9)
|
|
order_items_data = []
|
|
for item in order.items:
|
|
ms = "mapped" if item.sku in validation["mapped"] else "direct"
|
|
order_items_data.append({
|
|
"sku": item.sku, "product_name": item.name,
|
|
"quantity": item.quantity, "price": item.price,
|
|
"baseprice": item.baseprice, "vat": item.vat,
|
|
"mapping_status": ms, "codmat": None, "id_articol": None,
|
|
"cantitate_roa": None
|
|
})
|
|
|
|
# Compute discount split for SQLite storage
|
|
ds = import_service.compute_discount_split(order, app_settings)
|
|
discount_split_json = json.dumps(ds) if ds else None
|
|
|
|
if result["success"]:
|
|
imported_count += 1
|
|
await sqlite_service.upsert_order(
|
|
sync_run_id=run_id,
|
|
order_number=order.number,
|
|
order_date=order.date,
|
|
customer_name=customer,
|
|
status=OrderStatus.IMPORTED.value,
|
|
id_comanda=result["id_comanda"],
|
|
id_partener=result["id_partener"],
|
|
items_count=len(order.items),
|
|
shipping_name=shipping_name,
|
|
billing_name=billing_name,
|
|
payment_method=payment_method,
|
|
delivery_method=delivery_method,
|
|
order_total=order.total or None,
|
|
delivery_cost=order.delivery_cost or None,
|
|
discount_total=order.discount_total or None,
|
|
web_status=order.status or None,
|
|
discount_split=discount_split_json,
|
|
)
|
|
await sqlite_service.add_sync_run_order(run_id, order.number, OrderStatus.IMPORTED.value)
|
|
# Store ROA address IDs (R9)
|
|
await sqlite_service.update_import_order_addresses(
|
|
order.number,
|
|
id_adresa_facturare=result.get("id_adresa_facturare"),
|
|
id_adresa_livrare=result.get("id_adresa_livrare")
|
|
)
|
|
await sqlite_service.add_order_items(order.number, order_items_data)
|
|
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
|
|
|
|
# Save partner + ANAF + address data to SQLite
|
|
if result["success"] or result.get("id_partener"):
|
|
partner_data = {
|
|
"cod_fiscal_gomag": raw_cf if order.billing.is_company else None,
|
|
"cod_fiscal_roa": result.get("cod_fiscal_roa"),
|
|
"denumire_roa": result.get("denumire_roa"),
|
|
"anaf_platitor_tva": (1 if anaf_data_for_order.get("scpTVA") else 0) if anaf_data_for_order and anaf_data_for_order.get("scpTVA") is not None else None,
|
|
"anaf_checked_at": anaf_data_for_order.get("checked_at") if anaf_data_for_order else None,
|
|
"anaf_cod_fiscal_adjusted": 1 if (
|
|
cod_fiscal_override
|
|
and result.get("cod_fiscal_roa")
|
|
and anaf_service.strip_ro_prefix(result["cod_fiscal_roa"]) == anaf_service.strip_ro_prefix(raw_cf)
|
|
and result["cod_fiscal_roa"].strip().upper().replace("RO ", "RO") != raw_cf.strip().upper().replace("RO ", "RO")
|
|
) else 0,
|
|
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
|
|
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
|
|
"adresa_livrare_roa": json.dumps(result.get("adresa_livrare_roa")) if result.get("adresa_livrare_roa") else None,
|
|
"adresa_facturare_roa": json.dumps(result.get("adresa_facturare_roa")) if result.get("adresa_facturare_roa") else None,
|
|
"anaf_denumire_mismatch": 0,
|
|
"denumire_anaf": None,
|
|
}
|
|
# Denomination mismatch check
|
|
if anaf_data_for_order and anaf_data_for_order.get("denumire_anaf") and order.billing.is_company:
|
|
norm_gomag = anaf_service.normalize_company_name(order.billing.company_name or "")
|
|
norm_anaf = anaf_service.normalize_company_name(anaf_data_for_order["denumire_anaf"])
|
|
if norm_gomag and norm_anaf and norm_gomag != norm_anaf:
|
|
partner_data["anaf_denumire_mismatch"] = 1
|
|
partner_data["denumire_anaf"] = anaf_data_for_order["denumire_anaf"]
|
|
|
|
# Address mismatch check (server-side, mirrors JS addrMatch)
|
|
livr_match = _addr_match(partner_data.get("adresa_livrare_gomag"), partner_data.get("adresa_livrare_roa"))
|
|
fact_match = _addr_match(partner_data.get("adresa_facturare_gomag"), partner_data.get("adresa_facturare_roa"))
|
|
partner_data["address_mismatch"] = 1 if (not livr_match or not fact_match) else 0
|
|
|
|
await sqlite_service.update_order_partner_data(order.number, partner_data)
|
|
|
|
if not result["success"]:
|
|
error_count += 1
|
|
await sqlite_service.upsert_order(
|
|
sync_run_id=run_id,
|
|
order_number=order.number,
|
|
order_date=order.date,
|
|
customer_name=customer,
|
|
status=OrderStatus.ERROR.value,
|
|
id_partener=result.get("id_partener"),
|
|
error_message=result["error"],
|
|
items_count=len(order.items),
|
|
shipping_name=shipping_name,
|
|
billing_name=billing_name,
|
|
payment_method=payment_method,
|
|
delivery_method=delivery_method,
|
|
order_total=order.total or None,
|
|
delivery_cost=order.delivery_cost or None,
|
|
discount_total=order.discount_total or None,
|
|
web_status=order.status or None,
|
|
discount_split=discount_split_json,
|
|
)
|
|
await sqlite_service.add_sync_run_order(run_id, order.number, OrderStatus.ERROR.value)
|
|
await sqlite_service.add_order_items(order.number, order_items_data)
|
|
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")
|
|
|
|
# Safety: stop if too many errors
|
|
if error_count > 10:
|
|
logger.warning("Too many errors, stopping sync")
|
|
break
|
|
|
|
# Step 4b: Invoice & order status check — sync with Oracle
|
|
_update_progress("invoices", "Checking invoices & order status...", 0, 0)
|
|
invoices_updated = 0
|
|
invoices_cleared = 0
|
|
orders_deleted = 0
|
|
try:
|
|
# 4b-1: Uninvoiced → check for new invoices
|
|
uninvoiced = await sqlite_service.get_uninvoiced_imported_orders()
|
|
if uninvoiced:
|
|
id_comanda_list = [o["id_comanda"] for o in uninvoiced]
|
|
invoice_data = await asyncio.to_thread(
|
|
invoice_service.check_invoices_for_orders, id_comanda_list
|
|
)
|
|
id_to_order = {o["id_comanda"]: o["order_number"] for o in uninvoiced}
|
|
for idc, inv in invoice_data.items():
|
|
order_num = id_to_order.get(idc)
|
|
if order_num and inv.get("facturat"):
|
|
await sqlite_service.update_order_invoice(
|
|
order_num,
|
|
serie=inv.get("serie_act"),
|
|
numar=str(inv.get("numar_act", "")),
|
|
total_fara_tva=inv.get("total_fara_tva"),
|
|
total_tva=inv.get("total_tva"),
|
|
total_cu_tva=inv.get("total_cu_tva"),
|
|
data_act=inv.get("data_act"),
|
|
)
|
|
invoices_updated += 1
|
|
|
|
# 4b-2: Invoiced → check for deleted invoices
|
|
invoiced = await sqlite_service.get_invoiced_imported_orders()
|
|
if invoiced:
|
|
id_comanda_list = [o["id_comanda"] for o in invoiced]
|
|
invoice_data = await asyncio.to_thread(
|
|
invoice_service.check_invoices_for_orders, id_comanda_list
|
|
)
|
|
for o in invoiced:
|
|
if o["id_comanda"] not in invoice_data:
|
|
await sqlite_service.clear_order_invoice(o["order_number"])
|
|
invoices_cleared += 1
|
|
|
|
# 4b-3: All imported → check for deleted orders in ROA
|
|
all_imported = await sqlite_service.get_all_imported_orders()
|
|
if all_imported:
|
|
id_comanda_list = [o["id_comanda"] for o in all_imported]
|
|
existing_ids = await asyncio.to_thread(
|
|
invoice_service.check_orders_exist, id_comanda_list
|
|
)
|
|
for o in all_imported:
|
|
if o["id_comanda"] not in existing_ids:
|
|
await sqlite_service.mark_order_deleted_in_roa(o["order_number"])
|
|
orders_deleted += 1
|
|
|
|
if invoices_updated:
|
|
_log_line(run_id, f"Facturi noi: {invoices_updated} comenzi facturate")
|
|
if invoices_cleared:
|
|
_log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache")
|
|
if orders_deleted:
|
|
_log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}")
|
|
except Exception as e:
|
|
logger.warning(f"Invoice/order status check failed: {e}")
|
|
|
|
# Step 4c: ANAF backfill — populate anaf_platitor_tva for orders with CUI but no ANAF data
|
|
try:
|
|
orders_needing_anaf = await sqlite_service.get_orders_missing_anaf()
|
|
if orders_needing_anaf:
|
|
# Group orders by unique CUI
|
|
from collections import defaultdict
|
|
cui_to_orders = defaultdict(list)
|
|
for o in orders_needing_anaf:
|
|
bare = anaf_service.strip_ro_prefix(o["cod_fiscal_roa"])
|
|
if anaf_service.validate_cui(bare):
|
|
cui_to_orders[bare].append(o)
|
|
|
|
# Batch cache lookup
|
|
unique_cuis = list(cui_to_orders.keys())
|
|
anaf_cache = await sqlite_service.get_anaf_cache_batch(unique_cuis)
|
|
|
|
# Single ANAF API call for uncached CUIs
|
|
uncached = [c for c in unique_cuis if c not in anaf_cache]
|
|
if uncached:
|
|
fresh = await anaf_service.check_vat_status_batch(uncached)
|
|
if fresh:
|
|
await sqlite_service.bulk_populate_anaf_cache(fresh)
|
|
anaf_cache.update(fresh)
|
|
|
|
# Build batch updates
|
|
db_updates = []
|
|
for cui, orders_for_cui in cui_to_orders.items():
|
|
data = anaf_cache.get(cui)
|
|
if not data or data.get("scpTVA") is None:
|
|
continue
|
|
platitor = 1 if data["scpTVA"] else 0
|
|
checked_at = data.get("checked_at")
|
|
denumire_anaf = data.get("denumire_anaf") or ""
|
|
for o in orders_for_cui:
|
|
mismatch = 0
|
|
den_store = None
|
|
if denumire_anaf:
|
|
norm_roa = anaf_service.normalize_company_name(o.get("denumire_roa") or o.get("customer_name") or "")
|
|
norm_anaf = anaf_service.normalize_company_name(denumire_anaf)
|
|
if norm_roa and norm_anaf and norm_roa != norm_anaf:
|
|
mismatch = 1
|
|
den_store = denumire_anaf
|
|
db_updates.append((platitor, checked_at, mismatch, den_store, o["order_number"]))
|
|
|
|
await sqlite_service.bulk_update_order_anaf_data(db_updates)
|
|
if db_updates:
|
|
_log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate")
|
|
except Exception as e:
|
|
logger.warning(f"ANAF backfill failed: {e}")
|
|
_log_line(run_id, f"ANAF backfill eroare: {e}")
|
|
|
|
# Step 5: Update sync run
|
|
total_imported = imported_count + already_imported_count # backward-compat
|
|
status = "completed" if error_count <= 10 else "failed"
|
|
await sqlite_service.update_sync_run(
|
|
run_id, status, len(orders), total_imported, len(skipped), error_count,
|
|
already_imported=already_imported_count, new_imported=imported_count
|
|
)
|
|
|
|
summary = {
|
|
"run_id": run_id,
|
|
"status": status,
|
|
"json_files": json_count,
|
|
"total_orders": len(orders) + cancelled_count,
|
|
"new_orders": len(truly_importable),
|
|
"imported": total_imported,
|
|
"new_imported": imported_count,
|
|
"already_imported": already_imported_count,
|
|
"skipped": len(skipped),
|
|
"errors": error_count,
|
|
"cancelled": cancelled_count,
|
|
"missing_skus": len(validation["missing"]),
|
|
"invoices_updated": invoices_updated,
|
|
"invoices_cleared": invoices_cleared,
|
|
"orders_deleted_in_roa": orders_deleted,
|
|
}
|
|
|
|
_update_progress("completed",
|
|
f"Completed: {imported_count} new, {already_imported_count} already, {len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled",
|
|
len(truly_importable), len(truly_importable),
|
|
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
|
|
"already_imported": already_imported_count, "cancelled": cancelled_count})
|
|
if _current_sync:
|
|
_current_sync["status"] = status
|
|
_current_sync["finished_at"] = _now().isoformat()
|
|
|
|
logger.info(
|
|
f"Sync {run_id} completed: {imported_count} new, {already_imported_count} already imported, "
|
|
f"{len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled"
|
|
)
|
|
|
|
duration = (_now() - started_dt).total_seconds()
|
|
_log_line(run_id, "")
|
|
cancelled_text = f", {cancelled_count} anulate" if cancelled_count else ""
|
|
_run_logs[run_id].append(
|
|
f"Finalizat: {imported_count} importate, {already_imported_count} deja importate, "
|
|
f"{len(skipped)} nemapate, {error_count} erori{cancelled_text} din {len(orders) + cancelled_count} comenzi | Durata: {int(duration)}s"
|
|
)
|
|
|
|
return summary
|
|
|
|
except Exception as e:
|
|
logger.error(f"Sync {run_id} failed: {e}")
|
|
_log_line(run_id, f"EROARE FATALA: {e}")
|
|
await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
|
|
if _current_sync:
|
|
_current_sync["status"] = "failed"
|
|
_current_sync["finished_at"] = _now().isoformat()
|
|
_current_sync["error"] = str(e)
|
|
return {"run_id": run_id, "status": "failed", "error": str(e)}
|
|
finally:
|
|
# Keep _current_sync for 10 seconds so status endpoint can show final result
|
|
async def _clear_current_sync():
|
|
await asyncio.sleep(10)
|
|
global _current_sync
|
|
_current_sync = None
|
|
asyncio.ensure_future(_clear_current_sync())
|
|
|
|
async def _clear_run_logs():
|
|
await asyncio.sleep(300) # 5 minutes
|
|
_run_logs.pop(run_id, None)
|
|
asyncio.ensure_future(_clear_run_logs())
|
|
|
|
|
|
def stop_sync():
    """Request cancellation of a running sync.

    No-op for now: the sync loop has no cancellation checkpoints, so an
    in-flight run always proceeds to completion.
    """
    return None
|
|
|
|
|
|
async def _resync_partner_for_order(order, stored: dict, app_settings: dict, run_id: str) -> None:
    """Resync partner for a single already-imported uninvoiced order.

    Re-resolves the partner from the current order data and, when a
    different partner results, repoints the existing Oracle comanda to it
    (new partner id + freshly created/found addresses) via
    PACK_COMENZI.modifica_comanda, then mirrors the outcome into SQLite
    and clears the partner_mismatch flag.

    Args:
        order: GoMag order object (project type; the code reads .number,
            .billing and .shipping — attribute shape assumed from
            import_service usage, confirm against order_reader).
        stored: Previously stored SQLite partner data for this order
            (keys read here: id_comanda, id_partener, denumire_roa).
        app_settings: App settings dict — not referenced in this body;
            presumably kept for signature parity with the caller.
        run_id: Current sync run id, used only for per-run log lines.

    Safety: double-checks factura_numar before the Oracle call.
    Reads existing comanda row and calls PACK_COMENZI.modifica_comanda.
    """
    # Local import: oracledb is only needed for the DB_TYPE_NUMBER bind vars below.
    import oracledb

    order_number = order.number
    id_comanda = stored.get("id_comanda")
    if not id_comanda:
        # Without the Oracle order id there is nothing to repoint — bail out.
        _log_line(run_id, f"#{order_number} SKIP resync partener: id_comanda lipsa")
        return

    # Double-check factura_numar — may have been invoiced since mismatch detection.
    # An invoiced order must not have its partner changed.
    current_detail = await sqlite_service.get_order_detail(order_number)
    if current_detail and current_detail.get("order", {}).get("factura_numar"):
        _log_line(run_id, f"#{order_number} SKIP resync partener: comanda facturata in tranzit")
        return

    # Snapshot the old partner identity for the audit log line at the end.
    old_partner_id = stored.get("id_partener")
    old_partner_name = stored.get("denumire_roa") or "?"

    # Re-derive partner data (PF vs PJ, cod fiscal, denumire) from the current order.
    new_partner_data = import_service.determine_partner_data(order)

    # ANAF check for PF→PJ transition: when the order now carries a CUI,
    # consult the ANAF cache (falling back to a live batch call) so the
    # cod fiscal can be normalized (RO prefix per VAT-payer status).
    cod_fiscal_override = None
    anaf_data = None
    if new_partner_data["is_pj"] and new_partner_data["cod_fiscal"]:
        raw_cf = new_partner_data["cod_fiscal"]
        bare_cui, _ = anaf_service.sanitize_cui(raw_cf)
        if bare_cui:
            anaf_data = await sqlite_service.get_anaf_cache(bare_cui)
            if not anaf_data:
                # Cache miss → single-CUI live check; failure is non-fatal,
                # resync proceeds without the cod fiscal correction.
                try:
                    fresh = await anaf_service.check_vat_status_batch([bare_cui])
                    if fresh:
                        await sqlite_service.bulk_populate_anaf_cache(fresh)
                        anaf_data = fresh.get(bare_cui)
                except Exception as e:
                    logger.warning(f"ANAF check failed for {bare_cui}: {e}")
            if anaf_data and anaf_data.get("scpTVA") is not None:
                cod_fiscal_override = anaf_service.determine_correct_cod_fiscal(
                    bare_cui, anaf_data["scpTVA"]
                )

    def _do_resync() -> dict:
        # Runs in a worker thread (asyncio.to_thread): blocking Oracle work only.
        # Returns a small result dict consumed below; raises on any failure
        # after attempting a best-effort rollback.
        if database.pool is None:
            raise RuntimeError("Oracle pool not initialized")
        conn = database.pool.acquire()
        try:
            with conn.cursor() as cur:
                # Create/find partner. anaf_strict=1 only when ANAF data is
                # available — mirrors the strict-search rule of the main import.
                id_partener_var = cur.var(oracledb.DB_TYPE_NUMBER)
                anaf_strict = 1 if (anaf_data and anaf_data.get("scpTVA") is not None) else None
                cur.callproc("PACK_IMPORT_PARTENERI.cauta_sau_creeaza_partener", [
                    cod_fiscal_override or new_partner_data["cod_fiscal"],
                    new_partner_data["denumire"],
                    new_partner_data["registru"],
                    new_partner_data["is_pj"],
                    anaf_strict,
                    id_partener_var,
                ])
                new_partner_id = id_partener_var.getvalue()
                if not new_partner_id or new_partner_id <= 0:
                    raise RuntimeError(f"Partner creation failed for {new_partner_data['denumire']}")
                new_partner_id = int(new_partner_id)

                # Same partner — just clear mismatch (no comanda modification needed).
                if new_partner_id == (old_partner_id or -1):
                    return {"same_partner": True, "new_partner_id": new_partner_id}

                # Get new partner details for audit log
                cur.execute(
                    "SELECT denumire, cod_fiscal FROM nom_parteneri WHERE id_part = :1",
                    [new_partner_id]
                )
                row = cur.fetchone()
                new_partner_name = row[0] if row else new_partner_data["denumire"]
                new_cod_fiscal_roa = row[1] if row else None

                # Create addresses under new partner (addresses belong to a
                # partner in ROA, so the repointed comanda needs fresh ids).
                addr_livr_id = None
                shipping_addr = None
                if order.shipping:
                    id_adresa_livr = cur.var(oracledb.DB_TYPE_NUMBER)
                    shipping_addr = import_service.format_address_for_oracle(
                        order.shipping.address, order.shipping.city, order.shipping.region
                    )
                    # Fall back to billing contact info when shipping lacks it.
                    shipping_phone = order.shipping.phone or order.billing.phone or ""
                    shipping_email = order.shipping.email or order.billing.email or ""
                    cur.callproc("PACK_IMPORT_PARTENERI.cauta_sau_creeaza_adresa", [
                        new_partner_id, shipping_addr, shipping_phone, shipping_email, id_adresa_livr
                    ])
                    addr_livr_id = id_adresa_livr.getvalue()
                    if addr_livr_id is None:
                        raise RuntimeError(f"Shipping address creation failed for partner {new_partner_id}")
                    addr_livr_id = int(addr_livr_id)

                # Compare normalized billing vs shipping person names to decide
                # whether billing can reuse the shipping address.
                billing_name_str = import_service.clean_web_text(
                    f"{order.billing.lastname} {order.billing.firstname}"
                ).strip().upper()
                ship_name_str = ""
                if order.shipping:
                    ship_name_str = import_service.clean_web_text(
                        f"{order.shipping.lastname} {order.shipping.firstname}"
                    ).strip().upper()
                different_person = bool(ship_name_str and billing_name_str and ship_name_str != billing_name_str)

                if different_person and addr_livr_id:
                    # NOTE(review): when recipient differs from the payer the
                    # shipping address id is reused for billing — assumed to be
                    # an intentional business rule mirrored from the importer.
                    addr_fact_id = addr_livr_id
                else:
                    billing_addr = import_service.format_address_for_oracle(
                        order.billing.address, order.billing.city, order.billing.region
                    )
                    if addr_livr_id and order.shipping and billing_addr == shipping_addr:
                        # Identical formatted addresses → reuse, avoid a duplicate row.
                        addr_fact_id = addr_livr_id
                    else:
                        id_adresa_fact = cur.var(oracledb.DB_TYPE_NUMBER)
                        cur.callproc("PACK_IMPORT_PARTENERI.cauta_sau_creeaza_adresa", [
                            new_partner_id, billing_addr,
                            order.billing.phone or "",
                            order.billing.email or "",
                            id_adresa_fact,
                        ])
                        addr_fact_id = id_adresa_fact.getvalue()
                        if addr_fact_id is None:
                            raise RuntimeError(f"Billing address creation failed for partner {new_partner_id}")
                        addr_fact_id = int(addr_fact_id)

                # Read existing comanda row for modifica_comanda params —
                # everything except partner/address ids is passed back unchanged.
                cur.execute("""
                    SELECT nr_comanda, data_comanda, data_livrare, proc_discount,
                           interna, id_util_um, id_codclient, comanda_externa, id_ctr
                    FROM comenzi WHERE id_comanda = :1
                """, [id_comanda])
                row = cur.fetchone()
                if not row:
                    raise RuntimeError(f"Comanda {id_comanda} not found in Oracle")
                nr_comanda, data_comanda, data_livrare, proc_discount, interna, id_util_um, id_codclient, comanda_externa, id_ctr = row

                # Repoint the comanda: only partner id and the two address ids change.
                cur.callproc("PACK_COMENZI.modifica_comanda", [
                    id_comanda,
                    nr_comanda,
                    data_comanda,
                    new_partner_id,
                    data_livrare,
                    proc_discount,
                    interna,
                    id_util_um,
                    addr_fact_id,
                    addr_livr_id,
                    id_codclient,
                    comanda_externa,
                    id_ctr,
                ])
            conn.commit()
            return {
                "same_partner": False,
                "new_partner_id": new_partner_id,
                "new_partner_name": new_partner_name,
                "new_cod_fiscal_roa": new_cod_fiscal_roa,
            }
        except Exception:
            # Best-effort rollback; re-raise the original error either way.
            try:
                conn.rollback()
            except Exception:
                pass
            raise
        finally:
            database.pool.release(conn)

    resync_result = await asyncio.to_thread(_do_resync)

    if resync_result.get("same_partner"):
        # Update cod_fiscal_gomag so next detection doesn't re-flag this order
        await sqlite_service.update_partner_resync_data(order_number, {
            "id_partener": resync_result["new_partner_id"],
            "cod_fiscal_gomag": cod_fiscal_override or new_partner_data["cod_fiscal"],
            "cod_fiscal_roa": None,
            "denumire_roa": stored.get("denumire_roa"),
            "partner_mismatch": 0,
        })
        _log_line(run_id, f"#{order_number} RESYNC: partener neschimbat, mismatch cleared")
    else:
        # Partner actually changed: persist the new ROA identity and log the transition.
        new_partner_id = resync_result["new_partner_id"]
        new_partner_name = resync_result.get("new_partner_name", "?")
        new_cod_fiscal_roa = resync_result.get("new_cod_fiscal_roa")
        await sqlite_service.update_partner_resync_data(order_number, {
            "id_partener": new_partner_id,
            "cod_fiscal_gomag": cod_fiscal_override or new_partner_data["cod_fiscal"],
            "cod_fiscal_roa": new_cod_fiscal_roa,
            "denumire_roa": new_partner_name,
            "partner_mismatch": 0,
        })
        _log_line(
            run_id,
            f"#{order_number} RESYNC partener: {old_partner_id} ({old_partner_name}) → {new_partner_id} ({new_partner_name})"
        )
|