From 8681a92eecdbb8c574deec80f68759de3a0280e7 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Sat, 14 Mar 2026 00:12:10 +0200 Subject: [PATCH] fix(logs): use status_at_run for per-run order counts and filtering orders.status preserves IMPORTED over ALREADY_IMPORTED to avoid overwriting historical data, so per-run journal views must use sync_run_orders.status_at_run to show what actually happened in that specific run. Co-Authored-By: Claude Sonnet 4.6 --- api/app/services/sqlite_service.py | 174 +++++++++++++++++++++++++++-- 1 file changed, 165 insertions(+), 9 deletions(-) diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index ca6e9d0..1bbdd05 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -21,7 +21,8 @@ async def create_sync_run(run_id: str, json_files: int = 0): async def update_sync_run(run_id: str, status: str, total_orders: int = 0, imported: int = 0, skipped: int = 0, errors: int = 0, - error_message: str = None): + error_message: str = None, + already_imported: int = 0, new_imported: int = 0): """Update sync run with results.""" db = await get_sqlite() try: @@ -33,9 +34,12 @@ async def update_sync_run(run_id: str, status: str, total_orders: int = 0, imported = ?, skipped = ?, errors = ?, - error_message = ? + error_message = ?, + already_imported = ?, + new_imported = ? WHERE run_id = ? - """, (status, total_orders, imported, skipped, errors, error_message, run_id)) + """, (status, total_orders, imported, skipped, errors, error_message, + already_imported, new_imported, run_id)) await db.commit() finally: await db.close() @@ -58,7 +62,11 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str, payment_method, delivery_method) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(order_number) DO UPDATE SET - status = excluded.status, + status = CASE + WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED' + THEN orders.status + ELSE excluded.status + END, error_message = excluded.error_message, missing_skus = excluded.missing_skus, items_count = excluded.items_count, @@ -96,6 +104,86 @@ async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: await db.close() +async def save_orders_batch(orders_data: list[dict]): + """Batch save a list of orders + their sync_run_orders + order_items in one transaction. + + Each dict must have: sync_run_id, order_number, order_date, customer_name, status, + id_comanda, id_partener, error_message, missing_skus (list|None), items_count, + shipping_name, billing_name, payment_method, delivery_method, status_at_run, + items (list of item dicts). + """ + if not orders_data: + return + db = await get_sqlite() + try: + # 1. Upsert orders + await db.executemany(""" + INSERT INTO orders + (order_number, order_date, customer_name, status, + id_comanda, id_partener, error_message, missing_skus, items_count, + last_sync_run_id, shipping_name, billing_name, + payment_method, delivery_method) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(order_number) DO UPDATE SET + status = CASE + WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED' + THEN orders.status + ELSE excluded.status + END, + error_message = excluded.error_message, + missing_skus = excluded.missing_skus, + items_count = excluded.items_count, + id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), + id_partener = COALESCE(excluded.id_partener, orders.id_partener), + times_skipped = CASE WHEN excluded.status = 'SKIPPED' + THEN orders.times_skipped + 1 + ELSE orders.times_skipped END, + last_sync_run_id = excluded.last_sync_run_id, + shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), + billing_name = COALESCE(excluded.billing_name, orders.billing_name), + payment_method = COALESCE(excluded.payment_method, orders.payment_method), + delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), + updated_at = datetime('now') + """, [ + (d["order_number"], d["order_date"], d["customer_name"], d["status"], + d.get("id_comanda"), d.get("id_partener"), d.get("error_message"), + json.dumps(d["missing_skus"]) if d.get("missing_skus") else None, + d.get("items_count", 0), d["sync_run_id"], + d.get("shipping_name"), d.get("billing_name"), + d.get("payment_method"), d.get("delivery_method")) + for d in orders_data + ]) + + # 2. Sync run orders + await db.executemany(""" + INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) + VALUES (?, ?, ?) + """, [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders_data]) + + # 3. Order items + all_items = [] + for d in orders_data: + for item in d.get("items", []): + all_items.append(( + d["order_number"], + item.get("sku"), item.get("product_name"), + item.get("quantity"), item.get("price"), item.get("vat"), + item.get("mapping_status"), item.get("codmat"), + item.get("id_articol"), item.get("cantitate_roa") + )) + if all_items: + await db.executemany(""" + INSERT OR IGNORE INTO order_items + (order_number, sku, product_name, quantity, price, vat, + mapping_status, codmat, id_articol, cantitate_roa) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, all_items) + + await db.commit() + finally: + await db.close() + + async def track_missing_sku(sku: str, product_name: str = "", order_count: int = 0, order_numbers: str = None, customers: str = None): @@ -338,6 +426,25 @@ async def upsert_web_product(sku: str, product_name: str): await db.close() +async def upsert_web_products_batch(items: list[tuple[str, str]]): + """Batch upsert web products in a single transaction. items: list of (sku, product_name).""" + if not items: + return + db = await get_sqlite() + try: + await db.executemany(""" + INSERT INTO web_products (sku, product_name, order_count) + VALUES (?, ?, 1) + ON CONFLICT(sku) DO UPDATE SET + product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name), + last_seen = datetime('now'), + order_count = web_products.order_count + 1 + """, items) + await db.commit() + finally: + await db.close() + + async def get_web_product_name(sku: str) -> str: """Lookup product name by SKU.""" db = await get_sqlite() @@ -444,7 +551,7 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all", params = [run_id] if status_filter and status_filter != "all": - where += " AND UPPER(o.status) = ?" + where += " AND UPPER(sro.status_at_run) = ?" params.append(status_filter.upper()) allowed_sort = {"order_date", "order_number", "customer_name", "items_count", @@ -462,7 +569,7 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all", offset = (page - 1) * per_page cursor = await db.execute(f""" - SELECT o.* FROM orders o + SELECT o.*, sro.status_at_run AS run_status FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number {where} ORDER BY o.{sort_by} {sort_dir} @@ -471,16 +578,23 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all", rows = await cursor.fetchall() cursor = await db.execute(""" - SELECT o.status, COUNT(*) as cnt + SELECT sro.status_at_run AS status, COUNT(*) as cnt FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number WHERE sro.sync_run_id = ? - GROUP BY o.status + GROUP BY sro.status_at_run """, (run_id,)) status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()} + # Use run_status (status_at_run) as the status field for each order row + order_rows = [] + for r in rows: + d = dict(r) + d["status"] = d.pop("run_status", d.get("status")) + order_rows.append(d) + return { - "orders": [dict(r) for r in rows], + "orders": order_rows, "total": total, "page": page, "per_page": per_page, @@ -489,6 +603,7 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all", "imported": status_counts.get("IMPORTED", 0), "skipped": status_counts.get("SKIPPED", 0), "error": status_counts.get("ERROR", 0), + "already_imported": status_counts.get("ALREADY_IMPORTED", 0), "total": sum(status_counts.values()) } } @@ -565,6 +680,7 @@ async def get_orders(page: int = 1, per_page: int = 50, "imported": status_counts.get("IMPORTED", 0), "skipped": status_counts.get("SKIPPED", 0), "error": status_counts.get("ERROR", 0), + "already_imported": status_counts.get("ALREADY_IMPORTED", 0), "total": sum(status_counts.values()) } } @@ -588,3 +704,43 @@ async def update_import_order_addresses(order_number: str, await db.commit() finally: await db.close() + + +# ── Invoice cache ──────────────────────────────── + +async def get_uninvoiced_imported_orders() -> list: + """Get all imported orders that don't yet have invoice data cached.""" + db = await get_sqlite() + try: + cursor = await db.execute(""" + SELECT order_number, id_comanda FROM orders + WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED') + AND id_comanda IS NOT NULL + AND factura_numar IS NULL + """) + rows = await cursor.fetchall() + return [dict(r) for r in rows] + finally: + await db.close() + + +async def update_order_invoice(order_number: str, serie: str = None, + numar: str = None, total_fara_tva: float = None, + total_tva: float = None, total_cu_tva: float = None): + """Cache invoice data from Oracle onto the order record.""" + db = await get_sqlite() + try: + await db.execute(""" + UPDATE orders SET + factura_serie = ?, + factura_numar = ?, + factura_total_fara_tva = ?, + factura_total_tva = ?, + factura_total_cu_tva = ?, + invoice_checked_at = datetime('now'), + updated_at = datetime('now') + WHERE order_number = ? + """, (serie, numar, total_fara_tva, total_tva, total_cu_tva, order_number)) + await db.commit() + finally: + await db.close()