import json import logging import logging.handlers import os from datetime import datetime from zoneinfo import ZoneInfo from ..database import get_sqlite, get_sqlite_sync from ..constants import OrderStatus # Re-export so other services can import get_sqlite from sqlite_service __all__ = ["get_sqlite", "get_sqlite_sync"] _tz_bucharest = ZoneInfo("Europe/Bucharest") def _now_str(): """Return current Bucharest time as ISO string.""" return datetime.now(_tz_bucharest).replace(tzinfo=None).isoformat() logger = logging.getLogger(__name__) # Dedicated append-only logger for per-order errors. # orders.error_message is overwritten when retry succeeds — this file # keeps the permanent audit trail. _error_history_logger: logging.Logger | None = None def _get_error_history_logger() -> logging.Logger: """Lazily-initialised logger writing to logs/sync_errors_history.log. Append-only. Rolls over at 100MB with 12 kept backups (~monthly cadence under prod load). """ global _error_history_logger if _error_history_logger is not None: return _error_history_logger lg = logging.getLogger("sync_errors_history") lg.setLevel(logging.INFO) lg.propagate = False # Find project root by walking up from this file here = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(here, "..", "..", "..")) logs_dir = os.path.join(project_root, "logs") os.makedirs(logs_dir, exist_ok=True) log_path = os.path.join(logs_dir, "sync_errors_history.log") if not any( isinstance(h, logging.handlers.RotatingFileHandler) and getattr(h, "baseFilename", "") == log_path for h in lg.handlers ): handler = logging.handlers.RotatingFileHandler( log_path, maxBytes=100 * 1024 * 1024, backupCount=12, encoding="utf-8" ) handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) lg.addHandler(handler) _error_history_logger = lg return lg def _log_order_error_history(order_number: str, error_msg: str) -> None: """Append an order-level failure line to the permanent error history log. Called from save_orders_batch + add_order_items on MALFORMED fallback, so the evidence survives later retry-success overwrites of orders.error_message. """ try: _get_error_history_logger().warning(f"ORDER_FAIL {order_number}: {error_msg}") except Exception as e: logger.warning(f"_log_order_error_history failed for {order_number}: {e}") async def create_sync_run(run_id: str, json_files: int = 0): """Create a new sync run record.""" db = await get_sqlite() try: await db.execute(""" INSERT INTO sync_runs (run_id, started_at, status, json_files) VALUES (?, ?, 'running', ?) """, (run_id, _now_str(), json_files)) await db.commit() finally: await db.close() async def update_sync_run(run_id: str, status: str, total_orders: int = 0, imported: int = 0, skipped: int = 0, errors: int = 0, error_message: str = None, already_imported: int = 0, new_imported: int = 0): """Update sync run with results.""" db = await get_sqlite() try: await db.execute(""" UPDATE sync_runs SET finished_at = ?, status = ?, total_orders = ?, imported = ?, skipped = ?, errors = ?, error_message = ?, already_imported = ?, new_imported = ? WHERE run_id = ? """, (_now_str(), status, total_orders, imported, skipped, errors, error_message, already_imported, new_imported, run_id)) await db.commit() finally: await db.close() async def upsert_order(sync_run_id: str, order_number: str, order_date: str, customer_name: str, status: str, id_comanda: int = None, id_partener: int = None, error_message: str = None, missing_skus: list = None, items_count: int = 0, shipping_name: str = None, billing_name: str = None, payment_method: str = None, delivery_method: str = None, order_total: float = None, delivery_cost: float = None, discount_total: float = None, web_status: str = None, discount_split: str = None): """Upsert a single order — one row per order_number, status updated in place.""" db = await get_sqlite() try: await db.execute(f""" INSERT INTO orders (order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, missing_skus, items_count, last_sync_run_id, shipping_name, billing_name, payment_method, delivery_method, order_total, delivery_cost, discount_total, web_status, discount_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(order_number) DO UPDATE SET customer_name = excluded.customer_name, status = CASE WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}' THEN orders.status ELSE excluded.status END, error_message = excluded.error_message, missing_skus = excluded.missing_skus, items_count = excluded.items_count, id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), id_partener = COALESCE(excluded.id_partener, orders.id_partener), times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}' THEN orders.times_skipped + 1 ELSE orders.times_skipped END, last_sync_run_id = excluded.last_sync_run_id, shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), billing_name = COALESCE(excluded.billing_name, orders.billing_name), payment_method = COALESCE(excluded.payment_method, orders.payment_method), delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), order_total = COALESCE(excluded.order_total, orders.order_total), delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost), discount_total = COALESCE(excluded.discount_total, orders.discount_total), web_status = COALESCE(excluded.web_status, orders.web_status), discount_split = COALESCE(excluded.discount_split, orders.discount_split), updated_at = datetime('now') """, (order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, json.dumps(missing_skus) if missing_skus else None, items_count, sync_run_id, shipping_name, billing_name, payment_method, delivery_method, order_total, delivery_cost, discount_total, web_status, discount_split)) await db.commit() finally: await db.close() async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str): """Record that this run processed this order (junction table).""" db = await get_sqlite() try: await db.execute(""" INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?) """, (sync_run_id, order_number, status_at_run)) await db.commit() finally: await db.close() # SQL for the orders upsert — reused by batch + single-order fallback paths. _ORDERS_UPSERT_SQL = f""" INSERT INTO orders (order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, missing_skus, items_count, last_sync_run_id, shipping_name, billing_name, payment_method, delivery_method, order_total, delivery_cost, discount_total, web_status, discount_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(order_number) DO UPDATE SET customer_name = excluded.customer_name, status = CASE WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}' THEN orders.status ELSE excluded.status END, error_message = excluded.error_message, missing_skus = excluded.missing_skus, items_count = excluded.items_count, id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), id_partener = COALESCE(excluded.id_partener, orders.id_partener), times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}' THEN orders.times_skipped + 1 ELSE orders.times_skipped END, last_sync_run_id = excluded.last_sync_run_id, shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), billing_name = COALESCE(excluded.billing_name, orders.billing_name), payment_method = COALESCE(excluded.payment_method, orders.payment_method), delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), order_total = COALESCE(excluded.order_total, orders.order_total), delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost), discount_total = COALESCE(excluded.discount_total, orders.discount_total), web_status = COALESCE(excluded.web_status, orders.web_status), discount_split = COALESCE(excluded.discount_split, orders.discount_split), updated_at = datetime('now') """ def _orders_row(d: dict) -> tuple: return ( d["order_number"], d["order_date"], d["customer_name"], d["status"], d.get("id_comanda"), d.get("id_partener"), d.get("error_message"), json.dumps(d["missing_skus"]) if d.get("missing_skus") else None, d.get("items_count", 0), d["sync_run_id"], d.get("shipping_name"), d.get("billing_name"), d.get("payment_method"), d.get("delivery_method"), d.get("order_total"), d.get("delivery_cost"), d.get("discount_total"), d.get("web_status"), d.get("discount_split"), ) def _mark_malformed(d: dict, reason: str) -> dict: """Return a copy of d with status=MALFORMED, error_message set, items wiped. Does NOT mutate caller's dict. """ out = dict(d) out["status"] = OrderStatus.MALFORMED.value out["error_message"] = reason out["items"] = [] out["items_count"] = 0 out["missing_skus"] = None return out async def _insert_orders_only(db, orders: list[dict]): """Upsert only the `orders` + `sync_run_orders` rows (no items). Used for MALFORMED fallback where we can't trust item data. """ if not orders: return await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders]) await db.executemany( "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", [(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) for d in orders], ) async def _insert_valid_batch(db, orders: list[dict]): """Happy-path batch insert: orders + sync_run_orders + order_items in bulk. Caller wraps in a SAVEPOINT so a mid-batch failure rolls back to a clean point and the per-order fallback can take over. """ if not orders: return await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders]) await db.executemany( "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders], ) all_items: list[tuple] = [] order_numbers_with_items: set = set() for d in orders: raw_items = d.get("items", []) if not raw_items: continue order_numbers_with_items.add(d["order_number"]) for item in _dedup_items_by_sku(raw_items): all_items.append(( d["order_number"], item.get("sku"), item.get("product_name"), item.get("quantity"), item.get("price"), item.get("baseprice"), item.get("vat"), item.get("mapping_status"), item.get("codmat"), item.get("id_articol"), item.get("cantitate_roa"), )) if all_items: placeholders = ",".join("?" * len(order_numbers_with_items)) await db.execute( f"DELETE FROM order_items WHERE order_number IN ({placeholders})", tuple(order_numbers_with_items), ) await db.executemany(""" INSERT INTO order_items (order_number, sku, product_name, quantity, price, baseprice, vat, mapping_status, codmat, id_articol, cantitate_roa) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, all_items) async def _insert_single_order(db, d: dict): """Insert one order + its sync_run_orders row + its items. Caller wraps in SAVEPOINT so a per-row failure doesn't poison the batch. """ await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d)) await db.execute( "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", (d["sync_run_id"], d["order_number"], d["status_at_run"]), ) raw_items = d.get("items", []) if raw_items: await db.execute("DELETE FROM order_items WHERE order_number = ?", (d["order_number"],)) await db.executemany(""" INSERT INTO order_items (order_number, sku, product_name, quantity, price, baseprice, vat, mapping_status, codmat, id_articol, cantitate_roa) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ (d["order_number"], item.get("sku"), item.get("product_name"), item.get("quantity"), item.get("price"), item.get("baseprice"), item.get("vat"), item.get("mapping_status"), item.get("codmat"), item.get("id_articol"), item.get("cantitate_roa")) for item in _dedup_items_by_sku(raw_items) ]) async def save_orders_batch(orders_data: list[dict]): """Batch save orders + sync_run_orders + order_items with per-order isolation. Three-tier strategy: 1. Pre-validate with validate_structural — malformed rows persist as MALFORMED + error_message, no items, and do not participate in the valid batch. 2. Optimistic executemany for the remaining valid rows (happy path). 3. On IntegrityError / ValueError / TypeError during the batch, fall back to per-order SAVEPOINTs so a single bad row doesn't block the rest. Per-order failures are marked MALFORMED + logged to the permanent error-history file. Re-raises OperationalError / OSError / ConnectionError / MemoryError at the top level — the scheduler interprets these as halt signals. A mid-loop ROLLBACK failure triggers a commit-before-reconnect so any successfully-inserted work (including MALFORMED entries) persists. """ if not orders_data: return import sqlite3 # Structural pre-flight — local import avoids the services package cycle from .validation_service import validate_structural valid: list[dict] = [] malformed: list[dict] = [] for d in orders_data: ok, err_type, err_msg = validate_structural(d) if ok: valid.append(d) else: fixed = _mark_malformed(d, f"{err_type}: {err_msg}") malformed.append(fixed) _log_order_error_history(fixed["order_number"], fixed["error_message"]) db = await get_sqlite() try: if malformed: await _insert_orders_only(db, malformed) if valid: # Savepoint around the batch — lets us roll back to a clean point # and take the per-order path without losing the MALFORMED rows # inserted above. await db.execute("SAVEPOINT batch") try: await _insert_valid_batch(db, valid) await db.execute("RELEASE SAVEPOINT batch") except (sqlite3.IntegrityError, ValueError, TypeError) as batch_err: logger.warning(f"save_orders_batch: batch insert failed, falling back per-order: {batch_err}") try: await db.execute("ROLLBACK TO SAVEPOINT batch") await db.execute("RELEASE SAVEPOINT batch") except sqlite3.OperationalError: # Savepoint rollback itself failed — commit whatever survived, # reconnect, continue from scratch on the valid list. db = await _safe_reconnect(db) for d in valid: try: await db.execute("SAVEPOINT ord") await _insert_single_order(db, d) await db.execute("RELEASE SAVEPOINT ord") except (sqlite3.IntegrityError, ValueError, TypeError) as per_err: reason = f"RUNTIME: {type(per_err).__name__}: {per_err}" try: await db.execute("ROLLBACK TO SAVEPOINT ord") await db.execute("RELEASE SAVEPOINT ord") await _insert_orders_only(db, [_mark_malformed(d, reason)]) _log_order_error_history(d["order_number"], reason) except sqlite3.OperationalError: reason = f"RUNTIME (post-reconnect): {type(per_err).__name__}: {per_err}" db = await _safe_reconnect(db) await _insert_orders_only(db, [_mark_malformed(d, reason)]) _log_order_error_history(d["order_number"], reason) await db.commit() finally: try: await db.close() except Exception: pass async def _safe_reconnect(db): """Commit whatever survived, close the broken connection, open a fresh one. Called when a SAVEPOINT rollback itself raises OperationalError — the connection is in a bad state and further work on it will fail. Commit preserves the MALFORMED rows inserted before the explosion. """ try: await db.commit() except Exception: pass try: await db.close() except Exception: pass return await get_sqlite() async def track_missing_sku(sku: str, product_name: str = "", order_count: int = 0, order_numbers: str = None, customers: str = None): """Track a missing SKU with order context.""" db = await get_sqlite() try: await db.execute(""" INSERT OR IGNORE INTO missing_skus (sku, product_name) VALUES (?, ?) """, (sku, product_name)) if order_count or order_numbers or customers: await db.execute(""" UPDATE missing_skus SET order_count = ?, order_numbers = ?, customers = ? WHERE sku = ? """, (order_count, order_numbers, customers, sku)) await db.commit() finally: await db.close() async def resolve_missing_skus_batch(skus: set): """Mark multiple missing SKUs as resolved (they now have mappings).""" if not skus: return 0 db = await get_sqlite() try: placeholders = ",".join("?" for _ in skus) cursor = await db.execute(f""" UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now') WHERE sku IN ({placeholders}) AND resolved = 0 """, list(skus)) await db.commit() return cursor.rowcount finally: await db.close() async def resolve_missing_sku(sku: str): """Mark a missing SKU as resolved.""" db = await get_sqlite() try: await db.execute(""" UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now') WHERE sku = ? """, (sku,)) await db.commit() finally: await db.close() async def get_missing_skus_paginated(page: int = 1, per_page: int = 20, resolved: int = 0, search: str = None): """Get paginated missing SKUs. resolved=-1 means show all. Optional search filters by sku or product_name (LIKE).""" db = await get_sqlite() try: offset = (page - 1) * per_page # Build WHERE clause parts where_parts = [] params_count = [] params_data = [] if resolved != -1: where_parts.append("resolved = ?") params_count.append(resolved) params_data.append(resolved) if search: like = f"%{search}%" where_parts.append("(LOWER(sku) LIKE LOWER(?) OR LOWER(COALESCE(product_name,'')) LIKE LOWER(?))") params_count.extend([like, like]) params_data.extend([like, like]) where_clause = ("WHERE " + " AND ".join(where_parts)) if where_parts else "" order_clause = ( "ORDER BY resolved ASC, order_count DESC, first_seen DESC" if resolved == -1 else "ORDER BY order_count DESC, first_seen DESC" ) cursor = await db.execute( f"SELECT COUNT(*) FROM missing_skus {where_clause}", params_count ) total = (await cursor.fetchone())[0] cursor = await db.execute(f""" SELECT sku, product_name, first_seen, resolved, resolved_at, order_count, order_numbers, customers FROM missing_skus {where_clause} {order_clause} LIMIT ? OFFSET ? """, params_data + [per_page, offset]) rows = await cursor.fetchall() return { "missing_skus": [dict(row) for row in rows], "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if total > 0 else 0 } finally: await db.close() async def get_sync_runs(page: int = 1, per_page: int = 20): """Get paginated sync run history.""" db = await get_sqlite() try: offset = (page - 1) * per_page cursor = await db.execute("SELECT COUNT(*) FROM sync_runs") total = (await cursor.fetchone())[0] cursor = await db.execute(""" SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT ? OFFSET ? """, (per_page, offset)) rows = await cursor.fetchall() return { "runs": [dict(row) for row in rows], "total": total, "page": page, "pages": (total + per_page - 1) // per_page if total > 0 else 0 } finally: await db.close() async def get_sync_run_detail(run_id: str): """Get details for a specific sync run including its orders via sync_run_orders.""" db = await get_sqlite() try: cursor = await db.execute( "SELECT * FROM sync_runs WHERE run_id = ?", (run_id,) ) run = await cursor.fetchone() if not run: return None cursor = await db.execute(""" SELECT o.* FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number WHERE sro.sync_run_id = ? ORDER BY o.order_date """, (run_id,)) orders = await cursor.fetchall() return { "run": dict(run), "orders": [dict(o) for o in orders] } finally: await db.close() async def get_dashboard_stats(): """Get stats for the dashboard.""" db = await get_sqlite() try: cursor = await db.execute( f"SELECT COUNT(*) FROM orders WHERE status = '{OrderStatus.IMPORTED.value}'" ) imported = (await cursor.fetchone())[0] cursor = await db.execute( f"SELECT COUNT(*) FROM orders WHERE status = '{OrderStatus.SKIPPED.value}'" ) skipped = (await cursor.fetchone())[0] cursor = await db.execute( f"SELECT COUNT(*) FROM orders WHERE status = '{OrderStatus.ERROR.value}'" ) errors = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(*) FROM missing_skus WHERE resolved = 0" ) missing = (await cursor.fetchone())[0] cursor = await db.execute("SELECT COUNT(DISTINCT sku) FROM missing_skus") total_missing_skus = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(DISTINCT sku) FROM missing_skus WHERE resolved = 0" ) unresolved_skus = (await cursor.fetchone())[0] cursor = await db.execute(""" SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT 1 """) last_run = await cursor.fetchone() return { "imported": imported, "skipped": skipped, "errors": errors, "missing_skus": missing, "total_tracked_skus": total_missing_skus, "unresolved_skus": unresolved_skus, "last_run": dict(last_run) if last_run else None } finally: await db.close() async def get_scheduler_config(): """Get scheduler configuration from SQLite.""" db = await get_sqlite() try: cursor = await db.execute("SELECT key, value FROM scheduler_config") rows = await cursor.fetchall() return {row["key"]: row["value"] for row in rows} finally: await db.close() async def set_scheduler_config(key: str, value: str): """Set a scheduler configuration value.""" db = await get_sqlite() try: await db.execute(""" INSERT OR REPLACE INTO scheduler_config (key, value) VALUES (?, ?) """, (key, value)) await db.commit() finally: await db.close() # ── web_products ───────────────────────────────── async def upsert_web_product(sku: str, product_name: str): """Insert or update a web product, incrementing order_count.""" db = await get_sqlite() try: await db.execute(""" INSERT INTO web_products (sku, product_name, order_count) VALUES (?, ?, 1) ON CONFLICT(sku) DO UPDATE SET product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name), last_seen = datetime('now'), order_count = web_products.order_count + 1 """, (sku, product_name)) await db.commit() finally: await db.close() async def upsert_web_products_batch(items: list[tuple[str, str]]): """Batch upsert web products in a single transaction. items: list of (sku, product_name).""" if not items: return db = await get_sqlite() try: await db.executemany(""" INSERT INTO web_products (sku, product_name, order_count) VALUES (?, ?, 1) ON CONFLICT(sku) DO UPDATE SET product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name), last_seen = datetime('now'), order_count = web_products.order_count + 1 """, items) await db.commit() finally: await db.close() async def get_web_product_name(sku: str) -> str: """Lookup product name by SKU.""" db = await get_sqlite() try: cursor = await db.execute( "SELECT product_name FROM web_products WHERE sku = ?", (sku,) ) row = await cursor.fetchone() return row["product_name"] if row else "" finally: await db.close() async def get_web_products_batch(skus: list) -> dict: """Batch lookup product names by SKU list. Returns {sku: product_name}.""" if not skus: return {} db = await get_sqlite() try: placeholders = ",".join("?" for _ in skus) cursor = await db.execute( f"SELECT sku, product_name FROM web_products WHERE sku IN ({placeholders})", list(skus) ) rows = await cursor.fetchall() return {row["sku"]: row["product_name"] for row in rows} finally: await db.close() # ── order_items ────────────────────────────────── def _dedup_items_by_sku(items: list) -> list: """Deduplicate items by SKU within a single order. Sums quantities on collision. GoMag occasionally returns the same SKU on multiple lines (configurable products, promo splits). The order_items primary key is (order_number, sku) so the raw rows would violate UNIQUE. Keeps first price/vat/name; sums quantity + baseprice*qty. """ if not items: return items merged: dict = {} order: list = [] for item in items: sku = item.get("sku") if sku is None: order.append(item) continue if sku in merged: prev = merged[sku] prev["quantity"] = (prev.get("quantity") or 0) + (item.get("quantity") or 0) else: merged[sku] = dict(item) order.append(merged[sku]) return order async def _safe_upsert_order_items(db, order_number: str, items: list): """Replace order_items for one order inside a SAVEPOINT. On IntegrityError / ValueError / TypeError: rolls back the savepoint, tags the parent order MALFORMED, records the failure in the history log, and returns False. On success returns True. Caller is responsible for the outer connection lifecycle. """ import sqlite3 items = _dedup_items_by_sku(items) if items else [] await db.execute("SAVEPOINT items") try: await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,)) if items: await db.executemany(""" INSERT INTO order_items (order_number, sku, product_name, quantity, price, baseprice, vat, mapping_status, codmat, id_articol, cantitate_roa) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ (order_number, item.get("sku"), item.get("product_name"), item.get("quantity"), item.get("price"), item.get("baseprice"), item.get("vat"), item.get("mapping_status"), item.get("codmat"), item.get("id_articol"), item.get("cantitate_roa")) for item in items ]) await db.execute("RELEASE SAVEPOINT items") return True except (sqlite3.IntegrityError, ValueError, TypeError) as err: reason = f"ITEMS_FAIL: {type(err).__name__}: {err}" try: await db.execute("ROLLBACK TO SAVEPOINT items") await db.execute("RELEASE SAVEPOINT items") except sqlite3.OperationalError: # Outer caller will handle reconnect — just log and bail. logger.exception(f"_safe_upsert_order_items: rollback failed for {order_number}") raise # Tag parent order as MALFORMED without removing it from sync state. await db.execute( "UPDATE orders SET status = ?, error_message = ?, updated_at = datetime('now') WHERE order_number = ?", (OrderStatus.MALFORMED.value, reason, order_number), ) _log_order_error_history(order_number, reason) return False async def add_order_items(order_number: str, items: list): """Replace order items — delete any existing rows, then insert fresh batch. GoMag is source of truth: re-import must reflect quantity changes. Wrapped in _safe_upsert_order_items so a bad payload marks the parent order MALFORMED rather than exploding the sync. """ if not items: return db = await get_sqlite() try: await _safe_upsert_order_items(db, order_number, items) await db.commit() finally: await db.close() # ── sync phase failure tracking ─────────────────── async def record_phase_failure(run_id: str, phase: str, error_summary: str) -> None: """Insert a phase-failure marker and prune to the last 100 sync runs. `error_summary` must be short (error_type + message) — no raw payload, no PII. Used by _phase_wrap in sync_service to surface repeat failures to the escalation check and the /api/sync/health dashboard pill. """ db = await get_sqlite() try: await db.execute( """INSERT OR REPLACE INTO sync_phase_failures (run_id, phase, error_summary) VALUES (?, ?, ?)""", (run_id, phase, error_summary[:500] if error_summary else None), ) await db.execute(""" DELETE FROM sync_phase_failures WHERE run_id NOT IN ( SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT 100 ) """) await db.commit() finally: await db.close() async def get_recent_phase_failures(limit: int = 3) -> dict[str, int]: """Return a {phase: failure_count} map across the last N sync runs. Used by the escalation check (>=3 consecutive failures on the same phase halts the next sync) and by /api/sync/health for the dashboard pill. """ db = await get_sqlite() try: cursor = await db.execute( """ SELECT phase, COUNT(*) AS cnt FROM sync_phase_failures WHERE run_id IN ( SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT ? ) GROUP BY phase """, (limit,), ) rows = await cursor.fetchall() return {row[0]: row[1] for row in rows} finally: await db.close() async def get_order_items(order_number: str) -> list: """Fetch items for one order.""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT * FROM order_items WHERE order_number = ? ORDER BY sku """, (order_number,)) rows = await cursor.fetchall() return [dict(row) for row in rows] finally: await db.close() async def get_order_detail(order_number: str) -> dict: """Get full order detail: order metadata + items.""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT * FROM orders WHERE order_number = ? """, (order_number,)) order = await cursor.fetchone() if not order: return None cursor = await db.execute(""" SELECT * FROM order_items WHERE order_number = ? ORDER BY sku """, (order_number,)) items = await cursor.fetchall() return { "order": dict(order), "items": [dict(i) for i in items] } finally: await db.close() async def get_run_orders_filtered(run_id: str, status_filter: str = "all", page: int = 1, per_page: int = 50, sort_by: str = "order_date", sort_dir: str = "asc"): """Get paginated orders for a run via sync_run_orders junction table.""" db = await get_sqlite() try: where = "WHERE sro.sync_run_id = ?" params = [run_id] if status_filter and status_filter != "all": where += " AND UPPER(sro.status_at_run) = ?" params.append(status_filter.upper()) allowed_sort = {"order_date", "order_number", "customer_name", "items_count", "status", "first_seen_at", "updated_at"} if sort_by not in allowed_sort: sort_by = "order_date" if sort_dir.lower() not in ("asc", "desc"): sort_dir = "asc" cursor = await db.execute( f"SELECT COUNT(*) FROM orders o INNER JOIN sync_run_orders sro " f"ON sro.order_number = o.order_number {where}", params ) total = (await cursor.fetchone())[0] offset = (page - 1) * per_page cursor = await db.execute(f""" SELECT o.*, sro.status_at_run AS run_status FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number {where} ORDER BY o.{sort_by} {sort_dir} LIMIT ? OFFSET ? """, params + [per_page, offset]) rows = await cursor.fetchall() cursor = await db.execute(""" SELECT sro.status_at_run AS status, COUNT(*) as cnt FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number WHERE sro.sync_run_id = ? GROUP BY sro.status_at_run """, (run_id,)) status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()} # Use run_status (status_at_run) as the status field for each order row order_rows = [] for r in rows: d = dict(r) d["status"] = d.pop("run_status", d.get("status")) order_rows.append(d) return { "orders": order_rows, "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if total > 0 else 0, "counts": { "imported": status_counts.get(OrderStatus.IMPORTED.value, 0), "skipped": status_counts.get(OrderStatus.SKIPPED.value, 0), "error": status_counts.get(OrderStatus.ERROR.value, 0), "already_imported": status_counts.get(OrderStatus.ALREADY_IMPORTED.value, 0), "cancelled": status_counts.get(OrderStatus.CANCELLED.value, 0), "malformed": status_counts.get(OrderStatus.MALFORMED.value, 0), "total": sum(status_counts.values()) } } finally: await db.close() async def get_orders(page: int = 1, per_page: int = 50, search: str = "", status_filter: str = "all", sort_by: str = "order_date", sort_dir: str = "desc", period_days: int = 7, period_start: str = "", period_end: str = ""): """Get orders with filters, sorting, and period. period_days=0 with period_start/period_end uses custom date range. period_days=0 without dates means all time. """ db = await get_sqlite() try: # Period + search clauses (used for counts — never include status filter) base_clauses = [] base_params = [] if period_days and period_days > 0: base_clauses.append("order_date >= date('now', ?)") base_params.append(f"-{period_days} days") elif period_days == 0 and period_start and period_end: base_clauses.append("order_date BETWEEN ? AND ?") base_params.extend([period_start, period_end]) if search: base_clauses.append("(order_number LIKE ? OR customer_name LIKE ?)") base_params.extend([f"%{search}%", f"%{search}%"]) # Data query adds status filter on top of base filters data_clauses = list(base_clauses) data_params = list(base_params) if status_filter and status_filter not in ("all", "UNINVOICED"): if status_filter.upper() == OrderStatus.IMPORTED.value: data_clauses.append(f"UPPER(status) IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')") elif status_filter.upper() == "DIFFS": data_clauses.append( "(anaf_cod_fiscal_adjusted = 1 OR anaf_denumire_mismatch = 1" " OR partner_mismatch = 1" " OR (cod_fiscal_gomag IS NOT NULL AND cod_fiscal_gomag != '' AND anaf_platitor_tva IS NOT NULL" " AND anaf_cod_fiscal_adjusted != 1" " AND ((UPPER(cod_fiscal_gomag) LIKE 'RO%' AND anaf_platitor_tva = 0)" " OR (UPPER(cod_fiscal_gomag) NOT LIKE 'RO%' AND anaf_platitor_tva = 1))))" ) else: data_clauses.append("UPPER(status) = ?") data_params.append(status_filter.upper()) where = ("WHERE " + " AND ".join(data_clauses)) if data_clauses else "" counts_where = ("WHERE " + " AND ".join(base_clauses)) if base_clauses else "" allowed_sort = {"order_date", "order_number", "customer_name", "items_count", "status", "first_seen_at", "updated_at"} if sort_by not in allowed_sort: sort_by = "order_date" if sort_dir.lower() not in ("asc", "desc"): sort_dir = "desc" cursor = await db.execute(f"SELECT COUNT(*) FROM orders {where}", data_params) total = (await cursor.fetchone())[0] offset = (page - 1) * per_page cursor = await db.execute(f""" SELECT * FROM orders {where} ORDER BY {sort_by} {sort_dir} LIMIT ? OFFSET ? """, data_params + [per_page, offset]) rows = await cursor.fetchall() # Counts by status — always on full period+search, never filtered by status cursor = await db.execute(f""" SELECT status, COUNT(*) as cnt FROM orders {counts_where} GROUP BY status """, base_params) status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()} # Uninvoiced count: IMPORTED/ALREADY_IMPORTED with no cached invoice, same period+search uninv_clauses = list(base_clauses) + [ f"UPPER(status) IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')", "(factura_numar IS NULL OR factura_numar = '')", ] uninv_where = "WHERE " + " AND ".join(uninv_clauses) cursor = await db.execute(f"SELECT COUNT(*) FROM orders {uninv_where}", base_params) uninvoiced_sqlite = (await cursor.fetchone())[0] # Uninvoiced > 3 days old uninv_old_clauses = list(base_clauses) + [ f"UPPER(status) IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')", "(factura_numar IS NULL OR factura_numar = '')", "order_date < datetime('now', '-3 days')", ] uninv_old_where = "WHERE " + " AND ".join(uninv_old_clauses) cursor = await db.execute(f"SELECT COUNT(*) FROM orders {uninv_old_where}", base_params) uninvoiced_old = (await cursor.fetchone())[0] # Diffs count: orders with ANAF adjustments, TVA mismatch, or partner mismatch diffs_clauses = list(base_clauses) + [ "(anaf_cod_fiscal_adjusted = 1 OR anaf_denumire_mismatch = 1" " OR partner_mismatch = 1" " OR (cod_fiscal_gomag IS NOT NULL AND cod_fiscal_gomag != '' AND anaf_platitor_tva IS NOT NULL" " AND anaf_cod_fiscal_adjusted != 1" " AND ((UPPER(cod_fiscal_gomag) LIKE 'RO%' AND anaf_platitor_tva = 0)" " OR (UPPER(cod_fiscal_gomag) NOT LIKE 'RO%' AND anaf_platitor_tva = 1))))" ] diffs_where = "WHERE " + " AND ".join(diffs_clauses) cursor = await db.execute(f"SELECT COUNT(*) FROM orders {diffs_where}", base_params) diffs_count = (await cursor.fetchone())[0] # Partner mismatches count pm_clauses = list(base_clauses) + ["partner_mismatch = 1"] pm_where = "WHERE " + " AND ".join(pm_clauses) cursor = await db.execute(f"SELECT COUNT(*) FROM orders {pm_where}", base_params) partner_mismatches_count = (await cursor.fetchone())[0] return { "orders": [dict(r) for r in rows], "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if total > 0 else 0, "counts": { "imported": status_counts.get(OrderStatus.IMPORTED.value, 0), "already_imported": status_counts.get(OrderStatus.ALREADY_IMPORTED.value, 0), "imported_all": status_counts.get(OrderStatus.IMPORTED.value, 0) + status_counts.get(OrderStatus.ALREADY_IMPORTED.value, 0), "skipped": status_counts.get(OrderStatus.SKIPPED.value, 0), "error": status_counts.get(OrderStatus.ERROR.value, 0), "cancelled": status_counts.get(OrderStatus.CANCELLED.value, 0), "malformed": status_counts.get(OrderStatus.MALFORMED.value, 0), "total": sum(status_counts.values()), "uninvoiced_sqlite": uninvoiced_sqlite, "uninvoiced_old": uninvoiced_old, "diffs": diffs_count, "partner_mismatches": partner_mismatches_count, } } finally: await db.close() async def update_import_order_addresses(order_number: str, id_adresa_facturare: int = None, id_adresa_livrare: int = None): """Update ROA address IDs on an order record.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET id_adresa_facturare = ?, id_adresa_livrare = ?, updated_at = datetime('now') WHERE order_number = ? """, (id_adresa_facturare, id_adresa_livrare, order_number)) await db.commit() finally: await db.close() # ── Invoice cache ──────────────────────────────── async def get_uninvoiced_imported_orders() -> list: """Get all imported orders that don't yet have invoice data cached.""" db = await get_sqlite() try: cursor = await db.execute(f""" SELECT order_number, id_comanda FROM orders WHERE status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}') AND id_comanda IS NOT NULL AND factura_numar IS NULL """) rows = await cursor.fetchall() return [dict(r) for r in rows] finally: await db.close() async def update_order_invoice(order_number: str, serie: str = None, numar: str = None, total_fara_tva: float = None, total_tva: float = None, total_cu_tva: float = None, data_act: str = None): """Cache invoice data from Oracle onto the order record.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET factura_serie = ?, factura_numar = ?, factura_total_fara_tva = ?, factura_total_tva = ?, factura_total_cu_tva = ?, factura_data = ?, invoice_checked_at = datetime('now'), updated_at = datetime('now') WHERE order_number = ? """, (serie, numar, total_fara_tva, total_tva, total_cu_tva, data_act, order_number)) await db.commit() finally: await db.close() async def update_order_price_match(order_number: str, match: bool | None): """Cache price_match result (True=OK, False=mismatch, None=unavailable).""" db = await get_sqlite() try: val = None if match is None else (1 if match else 0) await db.execute( "UPDATE orders SET price_match = ?, updated_at = datetime('now') WHERE order_number = ?", (val, order_number), ) await db.commit() finally: await db.close() async def get_invoiced_imported_orders() -> list: """Get imported orders that HAVE cached invoice data (for re-verification).""" db = await get_sqlite() try: cursor = await db.execute(f""" SELECT order_number, id_comanda FROM orders WHERE status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}') AND id_comanda IS NOT NULL AND factura_numar IS NOT NULL AND factura_numar != '' """) rows = await cursor.fetchall() return [dict(r) for r in rows] finally: await db.close() async def get_all_imported_orders() -> list: """Get ALL imported orders with id_comanda (for checking if deleted in ROA).""" db = await get_sqlite() try: cursor = await db.execute(f""" SELECT order_number, id_comanda FROM orders WHERE status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}') AND id_comanda IS NOT NULL """) rows = await cursor.fetchall() return [dict(r) for r in rows] finally: await db.close() async def get_deleted_in_roa_order_numbers() -> set[str]: """Return set of order_numbers marked DELETED_IN_ROA (sticky-excluded from auto-sync).""" db = await get_sqlite() try: cursor = await db.execute( f"SELECT order_number FROM orders WHERE status = '{OrderStatus.DELETED_IN_ROA.value}'" ) rows = await cursor.fetchall() return {r[0] for r in rows} finally: await db.close() async def clear_order_invoice(order_number: str): """Clear cached invoice data when invoice was deleted in ROA.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET factura_serie = NULL, factura_numar = NULL, factura_total_fara_tva = NULL, factura_total_tva = NULL, factura_total_cu_tva = NULL, factura_data = NULL, invoice_checked_at = datetime('now'), updated_at = datetime('now') WHERE order_number = ? """, (order_number,)) await db.commit() finally: await db.close() async def mark_order_deleted_in_roa(order_number: str): """Mark an order as deleted in ROA — clears id_comanda + invoice cache. order_items are preserved so the detail view can still show what was originally ordered. On 'Reimporta', add_order_items replaces them. """ db = await get_sqlite() try: await db.execute(f""" UPDATE orders SET status = '{OrderStatus.DELETED_IN_ROA.value}', id_comanda = NULL, id_partener = NULL, factura_serie = NULL, factura_numar = NULL, factura_total_fara_tva = NULL, factura_total_tva = NULL, factura_total_cu_tva = NULL, factura_data = NULL, invoice_checked_at = NULL, error_message = 'Comanda stearsa din ROA', updated_at = datetime('now') WHERE order_number = ? """, (order_number,)) await db.commit() finally: await db.close() async def mark_order_cancelled(order_number: str, web_status: str = "Anulata"): """Mark an order as cancelled from GoMag. Clears id_comanda and invoice cache.""" db = await get_sqlite() try: await db.execute(f""" UPDATE orders SET status = '{OrderStatus.CANCELLED.value}', id_comanda = NULL, id_partener = NULL, factura_serie = NULL, factura_numar = NULL, factura_total_fara_tva = NULL, factura_total_tva = NULL, factura_total_cu_tva = NULL, factura_data = NULL, invoice_checked_at = NULL, web_status = ?, error_message = 'Comanda anulata in GoMag', updated_at = datetime('now') WHERE order_number = ? """, (web_status, order_number)) await db.commit() finally: await db.close() # ── App Settings ───────────────────────────────── async def get_app_settings() -> dict: """Get all app settings as a dict.""" db = await get_sqlite() try: cursor = await db.execute("SELECT key, value FROM app_settings") rows = await cursor.fetchall() return {row["key"]: row["value"] for row in rows} finally: await db.close() async def set_app_setting(key: str, value: str): """Set a single app setting value.""" db = await get_sqlite() try: await db.execute(""" INSERT OR REPLACE INTO app_settings (key, value) VALUES (?, ?) """, (key, value)) await db.commit() finally: await db.close() # ── SKU-based order lookup ──────────────────────── async def get_skipped_orders_with_sku(sku: str) -> list[str]: """Get order_numbers of SKIPPED orders that contain the given SKU.""" db = await get_sqlite() try: cursor = await db.execute(f""" SELECT DISTINCT oi.order_number FROM order_items oi JOIN orders o ON o.order_number = oi.order_number WHERE oi.sku = ? AND o.status = '{OrderStatus.SKIPPED.value}' """, (sku,)) rows = await cursor.fetchall() return [row[0] for row in rows] finally: await db.close() # ── Price Sync Runs ─────────────────────────────── # ── ANAF Cache ─────────────────────────────────── async def get_anaf_cache(bare_cui: str) -> dict | None: """Get cached ANAF data for a CUI (valid for 7 days).""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT scp_tva, denumire_anaf, checked_at FROM anaf_cache WHERE cui = ? AND checked_at > datetime('now', '-7 days') """, (bare_cui,)) row = await cursor.fetchone() if not row: return None return { "scpTVA": bool(row["scp_tva"]) if row["scp_tva"] is not None else None, "denumire_anaf": row["denumire_anaf"] or "", "checked_at": row["checked_at"], } finally: await db.close() async def upsert_anaf_cache(cui: str, scp_tva: int | None, denumire_anaf: str): """Insert or update ANAF cache entry.""" db = await get_sqlite() try: await db.execute(""" INSERT OR REPLACE INTO anaf_cache (cui, scp_tva, denumire_anaf, checked_at) VALUES (?, ?, ?, datetime('now')) """, (cui, scp_tva, denumire_anaf)) await db.commit() finally: await db.close() async def bulk_populate_anaf_cache(results: dict[str, dict]): """Batch insert/update ANAF cache entries. results format: {cui: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}, ...} """ if not results: return db = await get_sqlite() try: rows = [] for cui, data in results.items(): scp = None if data.get("scpTVA") is True: scp = 1 elif data.get("scpTVA") is False: scp = 0 rows.append((cui, scp, data.get("denumire_anaf", ""), data.get("checked_at", _now_str()))) await db.executemany(""" INSERT OR REPLACE INTO anaf_cache (cui, scp_tva, denumire_anaf, checked_at) VALUES (?, ?, ?, ?) """, rows) await db.commit() finally: await db.close() async def get_expired_cuis_for_prepopulate() -> list[str]: """Get CUIs from recent orders that need ANAF cache refresh.""" from . import anaf_service db = await get_sqlite() try: cursor = await db.execute(""" SELECT DISTINCT cod_fiscal_gomag FROM orders WHERE cod_fiscal_gomag IS NOT NULL AND cod_fiscal_gomag != '' AND order_date >= date('now', '-3 months') """) rows = await cursor.fetchall() cuis_to_check = [] for row in rows: raw = row["cod_fiscal_gomag"] bare = anaf_service.strip_ro_prefix(raw) if not anaf_service.validate_cui(bare): continue # Check if cache is valid cached = await get_anaf_cache(bare) if cached is None: cuis_to_check.append(bare) return cuis_to_check finally: await db.close() # ── Partner/Address Data on Orders ───────────────── async def update_order_partner_data(order_number: str, partner_data: dict): """Update order with partner/ANAF/address comparison data. partner_data keys: cod_fiscal_gomag, cod_fiscal_roa, denumire_roa, anaf_platitor_tva, anaf_checked_at, anaf_cod_fiscal_adjusted, adresa_livrare_gomag, adresa_facturare_gomag, adresa_livrare_roa, adresa_facturare_roa, anaf_denumire_mismatch, denumire_anaf """ db = await get_sqlite() try: await db.execute(""" UPDATE orders SET cod_fiscal_gomag = ?, cod_fiscal_roa = ?, denumire_roa = ?, anaf_platitor_tva = ?, anaf_checked_at = ?, anaf_cod_fiscal_adjusted = ?, adresa_livrare_gomag = ?, adresa_facturare_gomag = ?, adresa_livrare_roa = ?, adresa_facturare_roa = ?, anaf_denumire_mismatch = ?, denumire_anaf = ?, address_mismatch = ?, updated_at = datetime('now') WHERE order_number = ? """, ( partner_data.get("cod_fiscal_gomag"), partner_data.get("cod_fiscal_roa"), partner_data.get("denumire_roa"), partner_data.get("anaf_platitor_tva"), partner_data.get("anaf_checked_at"), partner_data.get("anaf_cod_fiscal_adjusted", 0), partner_data.get("adresa_livrare_gomag"), partner_data.get("adresa_facturare_gomag"), partner_data.get("adresa_livrare_roa"), partner_data.get("adresa_facturare_roa"), partner_data.get("anaf_denumire_mismatch", 0), partner_data.get("denumire_anaf"), partner_data.get("address_mismatch", 0), order_number, )) await db.commit() finally: await db.close() async def update_gomag_addresses_batch(updates: list[dict]): """Update GoMag addresses and recompute address_mismatch for a batch of orders. Each dict: {order_number, adresa_livrare_gomag, adresa_facturare_gomag} """ if not updates: return from ..services.sync_service import _addr_match db = await get_sqlite() try: for u in updates: order_number = u["order_number"] livr_gomag = u.get("adresa_livrare_gomag") fact_gomag = u.get("adresa_facturare_gomag") # Update GoMag addresses await db.execute(""" UPDATE orders SET adresa_livrare_gomag = COALESCE(?, adresa_livrare_gomag), adresa_facturare_gomag = COALESCE(?, adresa_facturare_gomag), updated_at = datetime('now') WHERE order_number = ? """, (livr_gomag, fact_gomag, order_number)) # Recompute address_mismatch from stored addresses cursor = await db.execute( "SELECT adresa_livrare_gomag, adresa_livrare_roa, " "adresa_facturare_gomag, adresa_facturare_roa FROM orders WHERE order_number = ?", (order_number,) ) row = await cursor.fetchone() if row and (row[1] or row[3]): # has at least one ROA address livr_ok = _addr_match(row[0], row[1]) fact_ok = _addr_match(row[2], row[3]) new_val = 1 if (not livr_ok or not fact_ok) else 0 await db.execute( "UPDATE orders SET address_mismatch = ? WHERE order_number = ?", (new_val, order_number) ) await db.commit() finally: await db.close() async def get_order_address_ids(order_number: str) -> dict | None: """Return id_adresa_livrare, id_adresa_facturare, adresa_*_gomag for an order.""" db = await get_sqlite() try: cursor = await db.execute("""SELECT id_adresa_livrare, id_adresa_facturare, adresa_livrare_gomag, adresa_facturare_gomag, adresa_livrare_roa FROM orders WHERE order_number = ?""", [order_number]) row = await cursor.fetchone() return dict(row) if row else None finally: await db.close() async def update_order_address_cache(order_number: str, livr_roa: dict | None, fact_roa: dict | None, mismatch: bool): """Update ONLY the 3 address-cache columns — does NOT touch ANAF/partner fields.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET adresa_livrare_roa = ?, adresa_facturare_roa = ?, address_mismatch = ?, updated_at = datetime('now') WHERE order_number = ? """, ( json.dumps(livr_roa) if livr_roa else None, json.dumps(fact_roa) if fact_roa else None, 1 if mismatch else 0, order_number, )) await db.commit() finally: await db.close() async def get_orders_with_address_ids() -> list[dict]: """Get all orders that have Oracle address IDs stored (for batch refresh).""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT order_number, id_adresa_livrare, id_adresa_facturare, adresa_livrare_gomag, adresa_facturare_gomag FROM orders WHERE id_adresa_livrare IS NOT NULL OR id_adresa_facturare IS NOT NULL """) rows = await cursor.fetchall() return [dict(r) for r in rows] finally: await db.close() async def get_orders_missing_anaf() -> list[dict]: """Get orders with cod_fiscal_roa set but no ANAF data (for backfill).""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT order_number, cod_fiscal_roa, denumire_roa, customer_name FROM orders WHERE cod_fiscal_roa IS NOT NULL AND cod_fiscal_roa != '' AND anaf_platitor_tva IS NULL AND status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}') """) rows = await cursor.fetchall() return [dict(r) for r in rows] finally: await db.close() async def get_anaf_cache_batch(bare_cuis: list[str]) -> dict[str, dict]: """Get cached ANAF data for multiple CUIs (valid for 7 days).""" if not bare_cuis: return {} db = await get_sqlite() try: placeholders = ",".join("?" for _ in bare_cuis) cursor = await db.execute(f""" SELECT cui, scp_tva, denumire_anaf, checked_at FROM anaf_cache WHERE cui IN ({placeholders}) AND checked_at > datetime('now', '-7 days') """, bare_cuis) rows = await cursor.fetchall() return { r["cui"]: { "scpTVA": bool(r["scp_tva"]) if r["scp_tva"] is not None else None, "denumire_anaf": r["denumire_anaf"] or "", "checked_at": r["checked_at"], } for r in rows } finally: await db.close() async def bulk_update_order_anaf_data(updates: list[tuple]): """Batch update orders with ANAF data. updates: list of (anaf_platitor_tva, anaf_checked_at, anaf_denumire_mismatch, denumire_anaf, order_number) """ if not updates: return db = await get_sqlite() try: await db.executemany(""" UPDATE orders SET anaf_platitor_tva = ?, anaf_checked_at = ?, anaf_denumire_mismatch = ?, denumire_anaf = ?, updated_at = datetime('now') WHERE order_number = ? """, updates) await db.commit() finally: await db.close() # ── Address Quality Cache (via app_settings) ────── async def get_incomplete_addresses_count() -> int: """Get cached count of orders with incomplete ROA addresses. Returns -1 if cache is stale (> 1 hour old) or not set. """ db = await get_sqlite() try: cursor = await db.execute( "SELECT value FROM app_settings WHERE key = 'incomplete_addresses_checked_at'" ) row = await cursor.fetchone() if not row or not row["value"]: return -1 # Check freshness from datetime import datetime, timedelta try: checked_at = datetime.fromisoformat(row["value"]) if datetime.now() - checked_at > timedelta(hours=1): return -1 except (ValueError, TypeError): return -1 cursor = await db.execute( "SELECT value FROM app_settings WHERE key = 'incomplete_addresses_count'" ) row = await cursor.fetchone() return int(row["value"]) if row and row["value"] else 0 finally: await db.close() async def set_incomplete_addresses_count(count: int): """Cache incomplete addresses count in app_settings.""" db = await get_sqlite() try: await db.execute( "INSERT OR REPLACE INTO app_settings (key, value) VALUES ('incomplete_addresses_count', ?)", (str(count),) ) await db.execute( "INSERT OR REPLACE INTO app_settings (key, value) VALUES ('incomplete_addresses_checked_at', ?)", (_now_str(),) ) await db.commit() finally: await db.close() # ── Partner Mismatch ────────────────────────────── async def get_orders_partner_data_batch(order_numbers: list) -> dict: """Return {order_number: {cod_fiscal_gomag, denumire_roa, id_partener, factura_numar, id_comanda}}.""" if not order_numbers: return {} db = await get_sqlite() try: result = {} for i in range(0, len(order_numbers), 500): batch = order_numbers[i:i+500] placeholders = ",".join("?" * len(batch)) cursor = await db.execute( f"SELECT order_number, cod_fiscal_gomag, denumire_roa, id_partener, " f"factura_numar, id_comanda FROM orders WHERE order_number IN ({placeholders})", batch ) for row in await cursor.fetchall(): result[row[0]] = { "cod_fiscal_gomag": row[1], "denumire_roa": row[2], "id_partener": row[3], "factura_numar": row[4], "id_comanda": row[5], } return result finally: await db.close() async def update_partner_mismatch_batch(updates: list) -> None: """Update partner_mismatch flag for a batch of orders. Each item: {order_number, partner_mismatch: 0|1} """ if not updates: return db = await get_sqlite() try: await db.executemany( "UPDATE orders SET partner_mismatch = ?, updated_at = datetime('now') WHERE order_number = ?", [(u["partner_mismatch"], u["order_number"]) for u in updates] ) await db.commit() finally: await db.close() async def clear_stale_partner_mismatches_no_cui(exclude_numbers: set) -> int: """Clear partner_mismatch=1 for orders with cod_fiscal_gomag=NULL that are NOT in the current sync batch. These were flagged by old code (before the no-CUI fix) and will never self-correct because they fall outside the active sync window. Returns number of rows cleared. """ db = await get_sqlite() try: if exclude_numbers: placeholders = ",".join("?" * len(exclude_numbers)) sql = f""" UPDATE orders SET partner_mismatch = 0, updated_at = datetime('now') WHERE partner_mismatch = 1 AND cod_fiscal_gomag IS NULL AND order_number NOT IN ({placeholders}) """ await db.execute(sql, list(exclude_numbers)) else: await db.execute(""" UPDATE orders SET partner_mismatch = 0, updated_at = datetime('now') WHERE partner_mismatch = 1 AND cod_fiscal_gomag IS NULL """) await db.commit() cursor = await db.execute("SELECT changes()") row = await cursor.fetchone() return row[0] if row else 0 finally: await db.close() async def update_partner_resync_data(order_number: str, data: dict) -> None: """Update partner fields + clear partner_mismatch after a successful resync.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET id_partener = ?, cod_fiscal_gomag = ?, cod_fiscal_roa = ?, denumire_roa = ?, partner_mismatch = ?, updated_at = datetime('now') WHERE order_number = ? """, ( data.get("id_partener"), data.get("cod_fiscal_gomag"), data.get("cod_fiscal_roa"), data.get("denumire_roa"), data.get("partner_mismatch", 0), order_number, )) await db.commit() finally: await db.close()