fix(logs): use status_at_run for per-run order counts and filtering

orders.status preserves IMPORTED over ALREADY_IMPORTED to avoid
overwriting historical data, so per-run journal views must use
sync_run_orders.status_at_run to show what actually happened in
that specific run.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-14 00:12:10 +02:00
parent f52c504c2b
commit 8681a92eec

View File

@@ -21,7 +21,8 @@ async def create_sync_run(run_id: str, json_files: int = 0):
async def update_sync_run(run_id: str, status: str, total_orders: int = 0, async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
imported: int = 0, skipped: int = 0, errors: int = 0, imported: int = 0, skipped: int = 0, errors: int = 0,
error_message: str = None): error_message: str = None,
already_imported: int = 0, new_imported: int = 0):
"""Update sync run with results.""" """Update sync run with results."""
db = await get_sqlite() db = await get_sqlite()
try: try:
@@ -33,9 +34,12 @@ async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
imported = ?, imported = ?,
skipped = ?, skipped = ?,
errors = ?, errors = ?,
error_message = ? error_message = ?,
already_imported = ?,
new_imported = ?
WHERE run_id = ? WHERE run_id = ?
""", (status, total_orders, imported, skipped, errors, error_message, run_id)) """, (status, total_orders, imported, skipped, errors, error_message,
already_imported, new_imported, run_id))
await db.commit() await db.commit()
finally: finally:
await db.close() await db.close()
@@ -58,7 +62,11 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
payment_method, delivery_method) payment_method, delivery_method)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET ON CONFLICT(order_number) DO UPDATE SET
status = excluded.status, status = CASE
WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
THEN orders.status
ELSE excluded.status
END,
error_message = excluded.error_message, error_message = excluded.error_message,
missing_skus = excluded.missing_skus, missing_skus = excluded.missing_skus,
items_count = excluded.items_count, items_count = excluded.items_count,
@@ -96,6 +104,86 @@ async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run:
await db.close() await db.close()
async def save_orders_batch(orders_data: list[dict]):
"""Batch save a list of orders + their sync_run_orders + order_items in one transaction.
Each dict must have: sync_run_id, order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus (list|None), items_count,
shipping_name, billing_name, payment_method, delivery_method, status_at_run,
items (list of item dicts).
"""
if not orders_data:
return
db = await get_sqlite()
try:
# 1. Upsert orders
await db.executemany("""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus, items_count,
last_sync_run_id, shipping_name, billing_name,
payment_method, delivery_method)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET
status = CASE
WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
THEN orders.status
ELSE excluded.status
END,
error_message = excluded.error_message,
missing_skus = excluded.missing_skus,
items_count = excluded.items_count,
id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
id_partener = COALESCE(excluded.id_partener, orders.id_partener),
times_skipped = CASE WHEN excluded.status = 'SKIPPED'
THEN orders.times_skipped + 1
ELSE orders.times_skipped END,
last_sync_run_id = excluded.last_sync_run_id,
shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
billing_name = COALESCE(excluded.billing_name, orders.billing_name),
payment_method = COALESCE(excluded.payment_method, orders.payment_method),
delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
updated_at = datetime('now')
""", [
(d["order_number"], d["order_date"], d["customer_name"], d["status"],
d.get("id_comanda"), d.get("id_partener"), d.get("error_message"),
json.dumps(d["missing_skus"]) if d.get("missing_skus") else None,
d.get("items_count", 0), d["sync_run_id"],
d.get("shipping_name"), d.get("billing_name"),
d.get("payment_method"), d.get("delivery_method"))
for d in orders_data
])
# 2. Sync run orders
await db.executemany("""
INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
VALUES (?, ?, ?)
""", [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders_data])
# 3. Order items
all_items = []
for d in orders_data:
for item in d.get("items", []):
all_items.append((
d["order_number"],
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa")
))
if all_items:
await db.executemany("""
INSERT OR IGNORE INTO order_items
(order_number, sku, product_name, quantity, price, vat,
mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", all_items)
await db.commit()
finally:
await db.close()
async def track_missing_sku(sku: str, product_name: str = "", async def track_missing_sku(sku: str, product_name: str = "",
order_count: int = 0, order_numbers: str = None, order_count: int = 0, order_numbers: str = None,
customers: str = None): customers: str = None):
@@ -338,6 +426,25 @@ async def upsert_web_product(sku: str, product_name: str):
await db.close() await db.close()
async def upsert_web_products_batch(items: list[tuple[str, str]]):
"""Batch upsert web products in a single transaction. items: list of (sku, product_name)."""
if not items:
return
db = await get_sqlite()
try:
await db.executemany("""
INSERT INTO web_products (sku, product_name, order_count)
VALUES (?, ?, 1)
ON CONFLICT(sku) DO UPDATE SET
product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name),
last_seen = datetime('now'),
order_count = web_products.order_count + 1
""", items)
await db.commit()
finally:
await db.close()
async def get_web_product_name(sku: str) -> str: async def get_web_product_name(sku: str) -> str:
"""Lookup product name by SKU.""" """Lookup product name by SKU."""
db = await get_sqlite() db = await get_sqlite()
@@ -444,7 +551,7 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
params = [run_id] params = [run_id]
if status_filter and status_filter != "all": if status_filter and status_filter != "all":
where += " AND UPPER(o.status) = ?" where += " AND UPPER(sro.status_at_run) = ?"
params.append(status_filter.upper()) params.append(status_filter.upper())
allowed_sort = {"order_date", "order_number", "customer_name", "items_count", allowed_sort = {"order_date", "order_number", "customer_name", "items_count",
@@ -462,7 +569,7 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
offset = (page - 1) * per_page offset = (page - 1) * per_page
cursor = await db.execute(f""" cursor = await db.execute(f"""
SELECT o.* FROM orders o SELECT o.*, sro.status_at_run AS run_status FROM orders o
INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
{where} {where}
ORDER BY o.{sort_by} {sort_dir} ORDER BY o.{sort_by} {sort_dir}
@@ -471,16 +578,23 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
rows = await cursor.fetchall() rows = await cursor.fetchall()
cursor = await db.execute(""" cursor = await db.execute("""
SELECT o.status, COUNT(*) as cnt SELECT sro.status_at_run AS status, COUNT(*) as cnt
FROM orders o FROM orders o
INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
WHERE sro.sync_run_id = ? WHERE sro.sync_run_id = ?
GROUP BY o.status GROUP BY sro.status_at_run
""", (run_id,)) """, (run_id,))
status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()} status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()}
# Use run_status (status_at_run) as the status field for each order row
order_rows = []
for r in rows:
d = dict(r)
d["status"] = d.pop("run_status", d.get("status"))
order_rows.append(d)
return { return {
"orders": [dict(r) for r in rows], "orders": order_rows,
"total": total, "total": total,
"page": page, "page": page,
"per_page": per_page, "per_page": per_page,
@@ -489,6 +603,7 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
"imported": status_counts.get("IMPORTED", 0), "imported": status_counts.get("IMPORTED", 0),
"skipped": status_counts.get("SKIPPED", 0), "skipped": status_counts.get("SKIPPED", 0),
"error": status_counts.get("ERROR", 0), "error": status_counts.get("ERROR", 0),
"already_imported": status_counts.get("ALREADY_IMPORTED", 0),
"total": sum(status_counts.values()) "total": sum(status_counts.values())
} }
} }
@@ -565,6 +680,7 @@ async def get_orders(page: int = 1, per_page: int = 50,
"imported": status_counts.get("IMPORTED", 0), "imported": status_counts.get("IMPORTED", 0),
"skipped": status_counts.get("SKIPPED", 0), "skipped": status_counts.get("SKIPPED", 0),
"error": status_counts.get("ERROR", 0), "error": status_counts.get("ERROR", 0),
"already_imported": status_counts.get("ALREADY_IMPORTED", 0),
"total": sum(status_counts.values()) "total": sum(status_counts.values())
} }
} }
@@ -588,3 +704,43 @@ async def update_import_order_addresses(order_number: str,
await db.commit() await db.commit()
finally: finally:
await db.close() await db.close()
# ── Invoice cache ────────────────────────────────
async def get_uninvoiced_imported_orders() -> list:
"""Get all imported orders that don't yet have invoice data cached."""
db = await get_sqlite()
try:
cursor = await db.execute("""
SELECT order_number, id_comanda FROM orders
WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
AND id_comanda IS NOT NULL
AND factura_numar IS NULL
""")
rows = await cursor.fetchall()
return [dict(r) for r in rows]
finally:
await db.close()
async def update_order_invoice(order_number: str, serie: str = None,
numar: str = None, total_fara_tva: float = None,
total_tva: float = None, total_cu_tva: float = None):
"""Cache invoice data from Oracle onto the order record."""
db = await get_sqlite()
try:
await db.execute("""
UPDATE orders SET
factura_serie = ?,
factura_numar = ?,
factura_total_fara_tva = ?,
factura_total_tva = ?,
factura_total_cu_tva = ?,
invoice_checked_at = datetime('now'),
updated_at = datetime('now')
WHERE order_number = ?
""", (serie, numar, total_fara_tva, total_tva, total_cu_tva, order_number))
await db.commit()
finally:
await db.close()