"""Sync orchestration: download GoMag orders, validate them and import into Oracle (ROA).

The module keeps a small amount of in-memory state for the UI to poll:
the descriptor of the current run (`_current_sync`) and a per-run text log
(`_run_logs`). Persistent bookkeeping is delegated to `sqlite_service`.
"""
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

_tz_bucharest = ZoneInfo("Europe/Bucharest")


def _now():
    """Return current time in Bucharest timezone (naive, for display/storage)."""
    return datetime.now(_tz_bucharest).replace(tzinfo=None)


# NOTE(review): the project imports intentionally stay below `_now`, exactly as
# in the original file order — presumably a sibling module imports `_now` from
# here during package initialisation; confirm before moving them to the top.
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client
from ..config import settings
from .. import database

logger = logging.getLogger(__name__)

# Sync state
_sync_lock = asyncio.Lock()
_current_sync = None  # dict with run_id, status, progress info

# In-memory text log buffer per run
_run_logs: dict[str, list[str]] = {}

# Strong references to fire-and-forget cleanup tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks: set = set()

# How long the final state of a run stays visible to the status endpoint.
_STATUS_LINGER_SECONDS = 10
# How long the text log of a finished run is kept in memory.
_LOG_RETENTION_SECONDS = 300  # 5 minutes
# The import loop stops (and the run is marked "failed") above this many errors.
_MAX_IMPORT_ERRORS = 10


def _log_line(run_id: str, message: str):
    """Append a timestamped line to the in-memory log buffer."""
    lines = _run_logs.setdefault(run_id, [])
    ts = _now().strftime("%H:%M:%S")
    lines.append(f"[{ts}] {message}")


def get_run_text_log(run_id: str) -> str | None:
    """Return the accumulated text log for a run, or None if not found."""
    lines = _run_logs.get(run_id)
    if lines is None:
        return None
    return "\n".join(lines)


def _counts(imported: int = 0, skipped: int = 0, errors: int = 0,
            already_imported: int = 0, cancelled: int = 0) -> dict:
    """Build the `counts` dict exposed to pollers, always with the same keys."""
    return {
        "imported": imported,
        "skipped": skipped,
        "errors": errors,
        "already_imported": already_imported,
        "cancelled": cancelled,
    }


def _new_run_id() -> str:
    """Return a run id made of the current timestamp plus a short random suffix."""
    return _now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]


def _initial_sync_state(run_id: str, phase: str, phase_text: str) -> dict:
    """Build the initial `_current_sync` descriptor for a run that is starting."""
    return {
        "run_id": run_id,
        "status": "running",
        "started_at": _now().isoformat(),
        "finished_at": None,
        "phase": phase,
        "phase_text": phase_text,
        "progress_current": 0,
        "progress_total": 0,
        "counts": _counts(),
    }


def _update_progress(phase: str, phase_text: str, current: int = 0, total: int = 0,
                     counts: dict | None = None):
    """Update _current_sync with progress details for polling.

    A missing or empty `counts` replaces the stored counters with zeros.
    Does nothing when no sync descriptor exists.
    """
    if _current_sync is None:
        return
    _current_sync["phase"] = phase
    _current_sync["phase_text"] = phase_text
    _current_sync["progress_current"] = current
    _current_sync["progress_total"] = total
    _current_sync["counts"] = counts or _counts()


def _finish_current_sync(status: str, error: str | None = None):
    """Record the final status (and optional error text) on the current run."""
    if not _current_sync:
        return
    _current_sync["status"] = status
    _current_sync["finished_at"] = _now().isoformat()
    if error is not None:
        _current_sync["error"] = error


async def get_sync_status():
    """Get current sync status (a shallow copy), or {"status": "idle"}."""
    if _current_sync:
        return {**_current_sync}
    return {"status": "idle"}


async def prepare_sync(id_pol: int | None = None, id_sectie: int | None = None) -> dict:
    """Prepare a sync run - creates run_id and sets initial state.

    `id_pol` and `id_sectie` are accepted but not used here.

    Returns {"run_id": ..., "status": "starting"} or {"error": ...} if already running.
    """
    global _current_sync
    if _sync_lock.locked():
        return {"error": "Sync already running",
                "run_id": _current_sync.get("run_id") if _current_sync else None}

    run_id = _new_run_id()
    _current_sync = _initial_sync_state(run_id, "starting", "Starting...")
    return {"run_id": run_id, "status": "starting"}


def _derive_customer_info(order):
    """Extract shipping/billing names and customer from an order.

    customer = who appears on the invoice (partner in ROA):
    - company name if billing is on a company
    - shipping person name otherwise (consistent with import_service partner logic)

    Returns a tuple (shipping_name, billing_name, customer, payment_method,
    delivery_method); the three names are upper-cased, the two methods are
    None when absent or empty.
    """
    def _full_name(person):
        first = getattr(person, 'firstname', '') or ''
        last = getattr(person, 'lastname', '') or ''
        return f"{first} {last}".strip()

    shipping_name = _full_name(order.shipping) if order.shipping else ""
    billing_name = _full_name(order.billing)
    if not shipping_name:
        shipping_name = billing_name

    if order.billing.is_company and order.billing.company_name:
        customer = order.billing.company_name
    else:
        customer = shipping_name or billing_name

    payment_method = getattr(order, 'payment_name', None) or None
    delivery_method = getattr(order, 'delivery_name', None) or None
    return shipping_name.upper(), billing_name.upper(), customer.upper(), payment_method, delivery_method


async def _fix_stale_error_orders(existing_map: dict, run_id: str):
    """Fix orders stuck in ERROR status that are actually in Oracle.

    This can happen when a previous import committed partially (no rollback on error).
    If the order exists in Oracle COMENZI, update SQLite status to ALREADY_IMPORTED.

    `existing_map` maps order_number -> id_comanda for orders found in Oracle.
    """
    from ..database import get_sqlite

    db = await get_sqlite()
    try:
        cursor = await db.execute(
            "SELECT order_number FROM orders WHERE status = 'ERROR'"
        )
        error_orders = [row["order_number"] for row in await cursor.fetchall()]

        fixed = 0
        for order_number in error_orders:
            if order_number not in existing_map:
                continue
            id_comanda = existing_map[order_number]
            await db.execute("""
                UPDATE orders SET
                    status = 'ALREADY_IMPORTED',
                    id_comanda = ?,
                    error_message = NULL,
                    updated_at = datetime('now')
                WHERE order_number = ? AND status = 'ERROR'
            """, (id_comanda, order_number))
            fixed += 1
            _log_line(run_id, f"#{order_number} → status corectat ERROR → ALREADY_IMPORTED (ID: {id_comanda})")

        if fixed:
            await db.commit()
            logger.info(f"Fixed {fixed} stale ERROR orders that exist in Oracle")
    finally:
        await db.close()


def _is_cancelled(order) -> bool:
    """Return True for orders cancelled in GoMag (status "Anulata" / statusId "7")."""
    return bool(order.status_id == "7" or (order.status and order.status.lower() == "anulata"))


def _mapping_status(sku, validation: dict, include_missing: bool = False) -> str:
    """Classify a SKU as "missing" (optional), "mapped" or "direct" from the validation result."""
    if include_missing and sku in validation["missing"]:
        return "missing"
    return "mapped" if sku in validation["mapped"] else "direct"


def _items_payload(order, mapping_status_for) -> list:
    """Build the per-item dicts stored in SQLite; `mapping_status_for(sku)` supplies the status."""
    return [
        {"sku": item.sku, "product_name": item.name,
         "quantity": item.quantity, "price": item.price, "vat": item.vat,
         "mapping_status": mapping_status_for(item.sku),
         "codmat": None, "id_articol": None, "cantitate_roa": None}
        for item in order.items
    ]


def _build_order_record(order, run_id: str, status: str, items: list, *,
                        id_comanda=None, error_message=None, missing_skus=None,
                        web_status_fallback=None) -> dict:
    """Build one row for `sqlite_service.save_orders_batch`.

    `status` is stored both as the current status and as the status at this run.
    `web_status_fallback` is used when the order carries no web status.
    """
    shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
    return {
        "sync_run_id": run_id,
        "order_number": order.number,
        "order_date": order.date,
        "customer_name": customer,
        "status": status,
        "status_at_run": status,
        "id_comanda": id_comanda,
        "id_partener": None,
        "error_message": error_message,
        "missing_skus": missing_skus,
        "items_count": len(order.items),
        "shipping_name": shipping_name,
        "billing_name": billing_name,
        "payment_method": payment_method,
        "delivery_method": delivery_method,
        "order_total": order.total or None,
        "delivery_cost": order.delivery_cost or None,
        "discount_total": order.discount_total or None,
        "web_status": order.status or web_status_fallback,
        "items": items,
    }


def _first_product_name(orders, sku) -> str:
    """Return the name of the first item matching `sku`, scanning orders until a non-empty name is found."""
    product_name = ""
    for order in orders:
        for item in order.items:
            if item.sku == sku:
                product_name = item.name
                break
        if product_name:
            break
    return product_name


async def _download_orders(run_id: str, json_dir):
    """Phase 0: download orders from the GoMag API into `json_dir`."""
    # Read GoMag settings from SQLite (override config defaults)
    dl_settings = await sqlite_service.get_app_settings()
    gomag_key = dl_settings.get("gomag_api_key") or None
    gomag_shop = dl_settings.get("gomag_api_shop") or None
    gomag_days_str = dl_settings.get("gomag_order_days_back")
    gomag_days = int(gomag_days_str) if gomag_days_str else None
    gomag_limit_str = dl_settings.get("gomag_limit")
    gomag_limit = int(gomag_limit_str) if gomag_limit_str else None

    dl_result = await gomag_client.download_orders(
        json_dir, log_fn=lambda msg: _log_line(run_id, msg),
        api_key=gomag_key, api_shop=gomag_shop,
        days_back=gomag_days, limit=gomag_limit,
    )
    if dl_result["files"]:
        _log_line(run_id, f"GoMag: {dl_result['total']} comenzi în {dl_result['pages']} pagini → {len(dl_result['files'])} fișiere")


async def _process_cancelled_orders(cancelled_orders: list, run_id: str):
    """Record cancelled orders in SQLite and soft-delete the uninvoiced ones from Oracle.

    Orders that were previously imported (they still carry an id_comanda) are
    checked for invoices: invoiced ones are kept in Oracle and only logged,
    the others are soft-deleted through `import_service`.
    """
    _log_line(run_id, f"Comenzi anulate in GoMag: {len(cancelled_orders)}")

    # Record cancelled orders in SQLite
    cancelled_batch = []
    for order in cancelled_orders:
        record = _build_order_record(
            order, run_id, "CANCELLED",
            _items_payload(order, lambda _sku: "unknown"),
            error_message="Comanda anulata in GoMag",
            web_status_fallback="Anulata",
        )
        cancelled_batch.append(record)
        _log_line(run_id, f"#{order.number} [{order.date or '?'}] {record['customer_name']} → ANULAT in GoMag")

    await sqlite_service.save_orders_batch(cancelled_batch)

    # Check if any cancelled orders were previously imported
    from ..database import get_sqlite as _get_sqlite

    db_check = await _get_sqlite()
    try:
        cancelled_numbers = [o.number for o in cancelled_orders]
        # Only "?" markers are interpolated; the values themselves are bound.
        placeholders = ",".join("?" for _ in cancelled_numbers)
        cursor = await db_check.execute(f"""
            SELECT order_number, id_comanda FROM orders
            WHERE order_number IN ({placeholders})
              AND id_comanda IS NOT NULL
              AND status = 'CANCELLED'
        """, cancelled_numbers)
        previously_imported = [dict(r) for r in await cursor.fetchall()]
    finally:
        await db_check.close()

    if not previously_imported:
        return

    _log_line(run_id, f"Verificare {len(previously_imported)} comenzi anulate care erau importate in Oracle...")

    # Check which have invoices
    id_comanda_list = [o["id_comanda"] for o in previously_imported]
    invoice_data = await asyncio.to_thread(
        invoice_service.check_invoices_for_orders, id_comanda_list
    )

    for o in previously_imported:
        idc = o["id_comanda"]
        order_num = o["order_number"]

        if idc in invoice_data:
            # Invoiced — keep in Oracle, just log warning.
            # The CANCELLED status was already stored by the batch above.
            _log_line(run_id,
                      f"#{order_num} → ANULAT dar FACTURAT (factura {invoice_data[idc].get('serie_act', '')}"
                      f"{invoice_data[idc].get('numar_act', '')}) — NU se sterge din Oracle")
            continue

        # Not invoiced — soft-delete in Oracle
        del_result = await asyncio.to_thread(
            import_service.soft_delete_order_in_roa, idc
        )
        if del_result["success"]:
            # Clear id_comanda via mark_order_cancelled
            await sqlite_service.mark_order_cancelled(order_num, "Anulata")
            _log_line(run_id,
                      f"#{order_num} → ANULAT + STERS din Oracle (ID: {idc}, "
                      f"{del_result['details_deleted']} detalii)")
        else:
            _log_line(run_id,
                      f"#{order_num} → ANULAT dar EROARE la stergere Oracle: {del_result['error']}")


async def _refresh_invoice_status(run_id: str) -> tuple:
    """Step 4b: reconcile invoice and order-existence state between SQLite and Oracle.

    Best-effort: any exception is logged as a warning and the counters
    gathered so far are returned.

    Returns (invoices_updated, invoices_cleared, orders_deleted).
    """
    invoices_updated = 0
    invoices_cleared = 0
    orders_deleted = 0
    try:
        # 4b-1: Uninvoiced → check for new invoices
        uninvoiced = await sqlite_service.get_uninvoiced_imported_orders()
        if uninvoiced:
            id_comanda_list = [o["id_comanda"] for o in uninvoiced]
            invoice_data = await asyncio.to_thread(
                invoice_service.check_invoices_for_orders, id_comanda_list
            )
            id_to_order = {o["id_comanda"]: o["order_number"] for o in uninvoiced}
            for idc, inv in invoice_data.items():
                order_num = id_to_order.get(idc)
                if order_num and inv.get("facturat"):
                    await sqlite_service.update_order_invoice(
                        order_num,
                        serie=inv.get("serie_act"),
                        numar=str(inv.get("numar_act", "")),
                        total_fara_tva=inv.get("total_fara_tva"),
                        total_tva=inv.get("total_tva"),
                        total_cu_tva=inv.get("total_cu_tva"),
                        data_act=inv.get("data_act"),
                    )
                    invoices_updated += 1

        # 4b-2: Invoiced → check for deleted invoices
        invoiced = await sqlite_service.get_invoiced_imported_orders()
        if invoiced:
            id_comanda_list = [o["id_comanda"] for o in invoiced]
            invoice_data = await asyncio.to_thread(
                invoice_service.check_invoices_for_orders, id_comanda_list
            )
            for o in invoiced:
                if o["id_comanda"] not in invoice_data:
                    await sqlite_service.clear_order_invoice(o["order_number"])
                    invoices_cleared += 1

        # 4b-3: All imported → check for deleted orders in ROA
        all_imported = await sqlite_service.get_all_imported_orders()
        if all_imported:
            id_comanda_list = [o["id_comanda"] for o in all_imported]
            existing_ids = await asyncio.to_thread(
                invoice_service.check_orders_exist, id_comanda_list
            )
            for o in all_imported:
                if o["id_comanda"] not in existing_ids:
                    await sqlite_service.mark_order_deleted_in_roa(o["order_number"])
                    orders_deleted += 1

        if invoices_updated:
            _log_line(run_id, f"Facturi noi: {invoices_updated} comenzi facturate")
        if invoices_cleared:
            _log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache")
        if orders_deleted:
            _log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}")
    except Exception as e:
        logger.warning(f"Invoice/order status check failed: {e}")

    return invoices_updated, invoices_cleared, orders_deleted


def _spawn_background(coro):
    """Schedule `coro` as a task and hold a strong reference until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _schedule_cleanup(run_id: str):
    """Schedule the delayed removal of the run's status descriptor and text log."""
    async def _clear_current_sync():
        global _current_sync
        # Keep _current_sync for a few seconds so status endpoint can show final result
        await asyncio.sleep(_STATUS_LINGER_SECONDS)
        # Only clear our own state: a newer run may have started in the meantime.
        if _current_sync is not None and _current_sync.get("run_id") == run_id:
            _current_sync = None

    async def _clear_run_logs():
        await asyncio.sleep(_LOG_RETENTION_SECONDS)
        _run_logs.pop(run_id, None)

    _spawn_background(_clear_current_sync())
    _spawn_background(_clear_run_logs())


async def run_sync(id_pol: int | None = None, id_sectie: int | None = None,
                   run_id: str | None = None) -> dict:
    """Run a full sync cycle. Returns summary dict.

    Steps: download from GoMag, read the JSON orders, set aside cancelled
    orders, validate against Oracle (existing orders, SKUs, prices), record
    already-imported and skipped orders, import the new ones, then reconcile
    invoice/order status.

    Args:
        id_pol: price policy id; falls back to the app settings, then to config.
        id_sectie: section id; same fallback chain as `id_pol`.
        run_id: id created by `prepare_sync`; a new one is generated if omitted.

    Returns:
        {"error": ...} when a sync is already running; otherwise a summary dict
        whose "status" is "completed" or "failed". Unexpected exceptions are
        caught and reported as a "failed" summary with an "error" message.
    """
    global _current_sync
    if _sync_lock.locked():
        return {"error": "Sync already running"}

    async with _sync_lock:
        # Use provided run_id or generate one
        if not run_id:
            run_id = _new_run_id()
        _current_sync = _initial_sync_state(run_id, "reading", "Reading JSON files...")
        _update_progress("reading", "Reading JSON files...")

        started_dt = _now()
        _run_logs[run_id] = [
            f"=== Sync Run {run_id} ===",
            f"Inceput: {started_dt.strftime('%d.%m.%Y %H:%M:%S')}",
            ""
        ]

        json_dir = settings.JSON_OUTPUT_DIR

        try:
            # Phase 0: Download orders from GoMag API
            _update_progress("downloading", "Descărcare comenzi din GoMag API...")
            _log_line(run_id, "Descărcare comenzi din GoMag API...")
            await _download_orders(run_id, json_dir)

            _update_progress("reading", "Citire fisiere JSON...")
            _log_line(run_id, "Citire fisiere JSON...")

            # Step 1: Read orders and sort chronologically (oldest first - R3)
            orders, json_count = order_reader.read_json_orders()
            orders.sort(key=lambda o: o.date or '')
            await sqlite_service.create_sync_run(run_id, json_count)
            _update_progress("reading", f"Found {len(orders)} orders in {json_count} files", 0, len(orders))
            _log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")

            # Populate web_products catalog from all orders (R4)
            web_product_items = [
                (item.sku, item.name)
                for order in orders
                for item in order.items
                if item.sku and item.name
            ]
            await sqlite_service.upsert_web_products_batch(web_product_items)

            if not orders:
                _log_line(run_id, "Nicio comanda gasita.")
                await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
                _update_progress("completed", "No orders found")
                # Without this the poller keeps seeing status "running" until cleanup.
                _finish_current_sync("completed")
                return {"run_id": run_id, "status": "completed",
                        "message": "No orders found", "json_files": json_count}

            # ── Separate cancelled orders (GoMag status "Anulata" / statusId "7") ──
            cancelled_orders = []
            active_orders = []
            for o in orders:
                (cancelled_orders if _is_cancelled(o) else active_orders).append(o)
            cancelled_count = len(cancelled_orders)

            if cancelled_orders:
                await _process_cancelled_orders(cancelled_orders, run_id)

            orders = active_orders

            if not orders:
                _log_line(run_id, "Nicio comanda activa dupa filtrare anulate.")
                await sqlite_service.update_sync_run(run_id, "completed", cancelled_count, 0, 0, 0)
                _update_progress("completed", f"No active orders ({cancelled_count} cancelled)")
                _finish_current_sync("completed")
                return {"run_id": run_id, "status": "completed",
                        "message": f"No active orders ({cancelled_count} cancelled)",
                        "json_files": json_count, "cancelled": cancelled_count}

            _update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))

            # Bound up-front: the price-sync step below reads both even when the
            # price-validation branch is skipped (no id_pol / nothing importable).
            codmat_policy_map = {}
            mapped_codmat_data = {}

            # ── Single Oracle connection for entire validation phase ──
            conn = await asyncio.to_thread(database.get_oracle_connection)
            try:
                # Step 2a: Find orders already in Oracle (date-range query)
                order_dates = [o.date for o in orders if o.date]
                if order_dates:
                    min_date_str = min(order_dates)
                    try:
                        min_date = datetime.strptime(min_date_str[:10], "%Y-%m-%d") - timedelta(days=1)
                    except (ValueError, TypeError):
                        min_date = _now() - timedelta(days=90)
                else:
                    min_date = _now() - timedelta(days=90)

                existing_map = await asyncio.to_thread(
                    validation_service.check_orders_in_roa, min_date, conn
                )

                # Step 2a-fix: Fix ERROR orders that are actually in Oracle
                # (can happen if previous import committed partially without rollback)
                await _fix_stale_error_orders(existing_map, run_id)

                # Load app settings early (needed for id_gestiune in SKU validation)
                app_settings = await sqlite_service.get_app_settings()
                id_pol = id_pol or int(app_settings.get("id_pol") or 0) or settings.ID_POL
                id_sectie = id_sectie or int(app_settings.get("id_sectie") or 0) or settings.ID_SECTIE
                # Parse multi-gestiune CSV: "1,3" → [1, 3], "" → None
                id_gestiune_raw = (app_settings.get("id_gestiune") or "").strip()
                if id_gestiune_raw and id_gestiune_raw != "0":
                    id_gestiuni = [int(g) for g in id_gestiune_raw.split(",") if g.strip()]
                else:
                    id_gestiuni = None  # None = any gestiune (no filter)
                logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")
                _log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")

                # Step 2b: Validate SKUs (reuse same connection)
                all_skus = order_reader.get_all_skus(orders)
                validation = await asyncio.to_thread(validation_service.validate_skus, all_skus, conn, id_gestiuni)
                importable, skipped = validation_service.classify_orders(orders, validation)

                # ── Split importable into truly_importable vs already_in_roa ──
                truly_importable = []
                already_in_roa = []
                for order in importable:
                    if order.number in existing_map:
                        already_in_roa.append(order)
                    else:
                        truly_importable.append(order)

                _update_progress("validation",
                                 f"{len(truly_importable)} new, {len(already_in_roa)} already imported, {len(skipped)} skipped",
                                 0, len(truly_importable))
                _log_line(run_id, f"Validare: {len(truly_importable)} noi, {len(already_in_roa)} deja importate, {len(skipped)} nemapate")

                # Step 2c: Build SKU context from skipped orders
                sku_context = {}
                for order, missing_skus_list in skipped:
                    if order.billing.is_company and order.billing.company_name:
                        customer = order.billing.company_name
                    else:
                        ship_name = ""
                        if order.shipping:
                            ship_name = f"{order.shipping.firstname} {order.shipping.lastname}".strip()
                        customer = ship_name or f"{order.billing.firstname} {order.billing.lastname}"
                    for sku in missing_skus_list:
                        ctx = sku_context.setdefault(sku, {"orders": [], "customers": []})
                        if order.number not in ctx["orders"]:
                            ctx["orders"].append(order.number)
                        if customer not in ctx["customers"]:
                            ctx["customers"].append(customer)

                # Track missing SKUs with context
                for sku in validation["missing"]:
                    product_name = _first_product_name(orders, sku)
                    ctx = sku_context.get(sku, {})
                    ctx_orders = ctx.get("orders", [])
                    ctx_customers = ctx.get("customers", [])
                    await sqlite_service.track_missing_sku(
                        sku, product_name,
                        order_count=len(ctx_orders),
                        order_numbers=json.dumps(ctx_orders) if ctx_orders else None,
                        customers=json.dumps(ctx_customers) if ctx_customers else None,
                    )

                # Step 2d: Pre-validate prices for importable articles
                if id_pol and (truly_importable or already_in_roa):
                    _update_progress("validation", "Validating prices...", 0, len(truly_importable))
                    _log_line(run_id, "Validare preturi...")
                    candidate_orders = truly_importable + already_in_roa

                    # Direct articles only; mapped SKUs are resolved separately below.
                    all_codmats = {
                        item.sku
                        for order in candidate_orders
                        for item in order.items
                        if item.sku not in validation["mapped"] and item.sku in validation["direct"]
                    }

                    # Get standard VAT rate from settings for PROC_TVAV metadata
                    cota_tva = float(app_settings.get("discount_vat") or 21)

                    # Dual pricing policy support
                    id_pol_productie = int(app_settings.get("id_pol_productie") or 0) or None

                    if all_codmats:
                        if id_pol_productie:
                            # Dual-policy: classify articles by cont (sales vs production)
                            codmat_policy_map = await asyncio.to_thread(
                                validation_service.validate_and_ensure_prices_dual,
                                all_codmats, id_pol, id_pol_productie,
                                conn, validation.get("direct_id_map"),
                                cota_tva=cota_tva
                            )
                            _log_line(run_id,
                                      f"Politici duale: {sum(1 for v in codmat_policy_map.values() if v == id_pol)} vanzare, "
                                      f"{sum(1 for v in codmat_policy_map.values() if v == id_pol_productie)} productie")
                        else:
                            # Single-policy (backward compatible)
                            price_result = await asyncio.to_thread(
                                validation_service.validate_prices, all_codmats, id_pol,
                                conn, validation.get("direct_id_map")
                            )
                            if price_result["missing_price"]:
                                logger.info(
                                    f"Auto-adding price 0 for {len(price_result['missing_price'])} "
                                    f"direct articles in policy {id_pol}"
                                )
                                await asyncio.to_thread(
                                    validation_service.ensure_prices,
                                    price_result["missing_price"], id_pol,
                                    conn, validation.get("direct_id_map"),
                                    cota_tva=cota_tva
                                )

                    # Also validate mapped SKU prices (cherry-pick 1)
                    mapped_skus_in_orders = {
                        item.sku
                        for order in candidate_orders
                        for item in order.items
                        if item.sku in validation["mapped"]
                    }

                    if mapped_skus_in_orders:
                        mapped_codmat_data = await asyncio.to_thread(
                            validation_service.resolve_mapped_codmats, mapped_skus_in_orders, conn,
                            id_gestiuni=id_gestiuni
                        )
                        # Build id_map for mapped codmats and validate/ensure their prices
                        mapped_id_map = {}
                        for entries in mapped_codmat_data.values():
                            for entry in entries:
                                mapped_id_map[entry["codmat"]] = {
                                    "id_articol": entry["id_articol"],
                                    "cont": entry.get("cont")
                                }
                        mapped_codmats = set(mapped_id_map.keys())
                        if mapped_codmats:
                            if id_pol_productie:
                                mapped_policy_map = await asyncio.to_thread(
                                    validation_service.validate_and_ensure_prices_dual,
                                    mapped_codmats, id_pol, id_pol_productie,
                                    conn, mapped_id_map, cota_tva=cota_tva
                                )
                                codmat_policy_map.update(mapped_policy_map)
                            else:
                                mp_result = await asyncio.to_thread(
                                    validation_service.validate_prices,
                                    mapped_codmats, id_pol, conn, mapped_id_map
                                )
                                if mp_result["missing_price"]:
                                    await asyncio.to_thread(
                                        validation_service.ensure_prices,
                                        mp_result["missing_price"], id_pol,
                                        conn, mapped_id_map, cota_tva=cota_tva
                                    )

                    # Add SKU → policy entries for mapped articles (1:1 and kits)
                    # codmat_policy_map has CODMAT keys, but build_articles_json
                    # looks up by GoMag SKU — bridge the gap here
                    if codmat_policy_map and mapped_codmat_data:
                        for sku, entries in mapped_codmat_data.items():
                            if len(entries) == 1:
                                # 1:1 mapping: SKU inherits the CODMAT's policy
                                codmat = entries[0]["codmat"]
                                if codmat in codmat_policy_map:
                                    codmat_policy_map[sku] = codmat_policy_map[codmat]

                    # Pass codmat_policy_map to import via app_settings
                    if codmat_policy_map:
                        app_settings["_codmat_policy_map"] = codmat_policy_map

                    # ── Kit component price validation ──
                    kit_pricing_mode = app_settings.get("kit_pricing_mode")
                    if kit_pricing_mode and mapped_codmat_data:
                        id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
                        kit_missing = await asyncio.to_thread(
                            validation_service.validate_kit_component_prices,
                            mapped_codmat_data, id_pol, id_pol_prod, conn
                        )
                        if kit_missing:
                            kit_skus_missing = set(kit_missing.keys())
                            for sku, missing_codmats in kit_missing.items():
                                _log_line(run_id, f"Kit {sku}: prețuri lipsă pentru {', '.join(missing_codmats)}")
                            # Orders containing such a kit move from importable to skipped.
                            new_truly = []
                            for order in truly_importable:
                                order_skus = {item.sku for item in order.items}
                                affected = order_skus & kit_skus_missing
                                if affected:
                                    skipped.append((order, list(affected)))
                                else:
                                    new_truly.append(order)
                            truly_importable = new_truly

                    # Mode B config validation
                    if kit_pricing_mode == "separate_line":
                        if not app_settings.get("kit_discount_codmat"):
                            _log_line(run_id, "EROARE: Kit mode 'separate_line' dar kit_discount_codmat nu e configurat!")
            finally:
                await asyncio.to_thread(database.pool.release, conn)

            # Step 3a: Record already-imported orders (batch)
            already_imported_count = len(already_in_roa)
            already_batch = []
            for order in already_in_roa:
                id_comanda_roa = existing_map.get(order.number)
                record = _build_order_record(
                    order, run_id, "ALREADY_IMPORTED",
                    _items_payload(order, lambda sku: _mapping_status(sku, validation)),
                    id_comanda=id_comanda_roa,
                )
                already_batch.append(record)
                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {record['customer_name']} → DEJA IMPORTAT (ID: {id_comanda_roa})")
            await sqlite_service.save_orders_batch(already_batch)

            # Step 3b: Record skipped orders + store items (batch)
            skipped_count = len(skipped)
            skipped_batch = []
            for order, missing_skus in skipped:
                record = _build_order_record(
                    order, run_id, "SKIPPED",
                    _items_payload(order, lambda sku: _mapping_status(sku, validation, include_missing=True)),
                    missing_skus=missing_skus,
                )
                skipped_batch.append(record)
                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {record['customer_name']} → OMIS (lipsa: {', '.join(missing_skus)})")
            await sqlite_service.save_orders_batch(skipped_batch)

            # ── Price sync from orders ──
            if app_settings.get("price_sync_enabled") == "1":
                # Best-effort: a price-sync failure is logged and the run continues.
                try:
                    all_sync_orders = truly_importable + already_in_roa
                    direct_id_map = validation.get("direct_id_map", {})
                    id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
                    price_updates = await asyncio.to_thread(
                        validation_service.sync_prices_from_order,
                        all_sync_orders, mapped_codmat_data,
                        direct_id_map, codmat_policy_map, id_pol,
                        id_pol_productie=id_pol_prod,
                        settings=app_settings
                    )
                    if price_updates:
                        _log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate")
                        for pu in price_updates:
                            _log_line(run_id, f"  {pu['codmat']}: {pu['old_price']:.2f} → {pu['new_price']:.2f}")
                except Exception as e:
                    _log_line(run_id, f"Eroare sync prețuri din comenzi: {e}")
                    logger.exception(f"Price sync error: {e}")

            _update_progress("skipped", f"Skipped {skipped_count}",
                             0, len(truly_importable),
                             _counts(skipped=skipped_count,
                                     already_imported=already_imported_count,
                                     cancelled=cancelled_count))

            # Step 4: Import only truly new orders
            imported_count = 0
            error_count = 0

            for i, order in enumerate(truly_importable):
                shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)

                _update_progress("import",
                                 f"Import {i+1}/{len(truly_importable)}: #{order.number} {customer}",
                                 i + 1, len(truly_importable),
                                 _counts(imported=imported_count, skipped=len(skipped),
                                         errors=error_count,
                                         already_imported=already_imported_count,
                                         cancelled=cancelled_count))

                result = await asyncio.to_thread(
                    import_service.import_single_order,
                    order, id_pol=id_pol, id_sectie=id_sectie,
                    app_settings=app_settings, id_gestiuni=id_gestiuni
                )

                # Build order items data for storage (R9)
                order_items_data = _items_payload(order, lambda sku: _mapping_status(sku, validation))

                # Compute discount split for SQLite storage
                ds = import_service.compute_discount_split(order, app_settings)
                discount_split_json = json.dumps(ds) if ds else None

                # Fields stored identically for successful and failed imports.
                common_fields = dict(
                    sync_run_id=run_id,
                    order_number=order.number,
                    order_date=order.date,
                    customer_name=customer,
                    items_count=len(order.items),
                    shipping_name=shipping_name,
                    billing_name=billing_name,
                    payment_method=payment_method,
                    delivery_method=delivery_method,
                    order_total=order.total or None,
                    delivery_cost=order.delivery_cost or None,
                    discount_total=order.discount_total or None,
                    web_status=order.status or None,
                    discount_split=discount_split_json,
                )

                if result["success"]:
                    imported_count += 1
                    await sqlite_service.upsert_order(
                        status="IMPORTED",
                        id_comanda=result["id_comanda"],
                        id_partener=result["id_partener"],
                        **common_fields,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "IMPORTED")
                    # Store ROA address IDs (R9)
                    await sqlite_service.update_import_order_addresses(
                        order.number,
                        id_adresa_facturare=result.get("id_adresa_facturare"),
                        id_adresa_livrare=result.get("id_adresa_livrare")
                    )
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
                else:
                    error_count += 1
                    await sqlite_service.upsert_order(
                        status="ERROR",
                        id_partener=result.get("id_partener"),
                        error_message=result["error"],
                        **common_fields,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "ERROR")
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")

                # Safety: stop if too many errors
                if error_count > _MAX_IMPORT_ERRORS:
                    logger.warning("Too many errors, stopping sync")
                    break

            # Step 4b: Invoice & order status check — sync with Oracle.
            # The current counters are passed so pollers do not see them reset.
            _update_progress("invoices", "Checking invoices & order status...", 0, 0,
                             _counts(imported=imported_count, skipped=len(skipped),
                                     errors=error_count,
                                     already_imported=already_imported_count,
                                     cancelled=cancelled_count))
            invoices_updated, invoices_cleared, orders_deleted = await _refresh_invoice_status(run_id)

            # Step 5: Update sync run
            total_imported = imported_count + already_imported_count  # backward-compat
            status = "completed" if error_count <= _MAX_IMPORT_ERRORS else "failed"
            await sqlite_service.update_sync_run(
                run_id, status, len(orders), total_imported, len(skipped), error_count,
                already_imported=already_imported_count, new_imported=imported_count
            )

            summary = {
                "run_id": run_id,
                "status": status,
                "json_files": json_count,
                "total_orders": len(orders) + cancelled_count,
                "new_orders": len(truly_importable),
                "imported": total_imported,
                "new_imported": imported_count,
                "already_imported": already_imported_count,
                "skipped": len(skipped),
                "errors": error_count,
                "cancelled": cancelled_count,
                "missing_skus": len(validation["missing"]),
                "invoices_updated": invoices_updated,
                "invoices_cleared": invoices_cleared,
                "orders_deleted_in_roa": orders_deleted,
            }

            _update_progress("completed",
                             f"Completed: {imported_count} new, {already_imported_count} already, {len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled",
                             len(truly_importable), len(truly_importable),
                             _counts(imported=imported_count, skipped=len(skipped),
                                     errors=error_count,
                                     already_imported=already_imported_count,
                                     cancelled=cancelled_count))
            _finish_current_sync(status)

            logger.info(
                f"Sync {run_id} completed: {imported_count} new, {already_imported_count} already imported, "
                f"{len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled"
            )

            duration = (_now() - started_dt).total_seconds()
            _log_line(run_id, "")
            cancelled_text = f", {cancelled_count} anulate" if cancelled_count else ""
            _run_logs[run_id].append(
                f"Finalizat: {imported_count} importate, {already_imported_count} deja importate, "
                f"{len(skipped)} nemapate, {error_count} erori{cancelled_text} din {len(orders) + cancelled_count} comenzi | Durata: {int(duration)}s"
            )

            return summary

        except Exception as e:
            # Top-level boundary: report the failure instead of propagating it.
            logger.exception(f"Sync {run_id} failed: {e}")
            _log_line(run_id, f"EROARE FATALA: {e}")
            # Update the in-memory state first so pollers see the failure even
            # if persisting it to SQLite raises as well.
            _finish_current_sync("failed", error=str(e))
            await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
            return {"run_id": run_id, "status": "failed", "error": str(e)}
        finally:
            _schedule_cleanup(run_id)


def stop_sync():
    """Signal sync to stop. Currently sync runs to completion."""
    pass