From 47a6bd83a4d3e001ad5995d37fbf7dc164c19150 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Wed, 22 Apr 2026 09:00:57 +0000 Subject: [PATCH] feat(sync): per-order SAVEPOINT protection for order_items upsert _safe_upsert_order_items(db, order_number, items) wraps the DELETE + INSERT OR REPLACE pair in SAVEPOINT items. On IntegrityError / ValueError / TypeError it rolls the savepoint back, tags the parent order MALFORMED, logs to the error history file, and returns False to the caller. add_order_items now delegates to this helper so a single bad payload cannot leave order_items in a split state. 2 integration tests: happy path + simulated INSERT crash via aiosqlite monkeypatch. Existing order_items overwrite regression tests still pass (5/5). Co-Authored-By: Claude Opus 4.7 (1M context) --- api/app/services/sqlite_service.py | 72 ++++++++++++++++------ api/tests/test_save_orders_batch_hybrid.py | 58 +++++++++++++++++ 2 files changed, 112 insertions(+), 18 deletions(-) diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index 81b0fd2..89d4934 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -774,33 +774,69 @@ def _dedup_items_by_sku(items: list) -> list: return order +async def _safe_upsert_order_items(db, order_number: str, items: list): + """Replace order_items for one order inside a SAVEPOINT. + + On IntegrityError / ValueError / TypeError: rolls back the savepoint, + tags the parent order MALFORMED, records the failure in the history + log, and returns False. On success returns True. + + Caller is responsible for the outer connection lifecycle. + """ + import sqlite3 + + items = _dedup_items_by_sku(items) if items else [] + + await db.execute("SAVEPOINT items") + try: + await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,)) + if items: + await db.executemany(""" + INSERT INTO order_items + (order_number, sku, product_name, quantity, price, baseprice, + vat, mapping_status, codmat, id_articol, cantitate_roa) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [ + (order_number, + item.get("sku"), item.get("product_name"), + item.get("quantity"), item.get("price"), item.get("baseprice"), + item.get("vat"), + item.get("mapping_status"), item.get("codmat"), + item.get("id_articol"), item.get("cantitate_roa")) + for item in items + ]) + await db.execute("RELEASE SAVEPOINT items") + return True + except (sqlite3.IntegrityError, ValueError, TypeError) as err: + reason = f"ITEMS_FAIL: {type(err).__name__}: {err}" + try: + await db.execute("ROLLBACK TO SAVEPOINT items") + await db.execute("RELEASE SAVEPOINT items") + except sqlite3.OperationalError: + # Outer caller will handle reconnect — just log and bail. + logger.exception(f"_safe_upsert_order_items: rollback failed for {order_number}") + raise + # Tag parent order as MALFORMED without removing it from sync state. + await db.execute( + "UPDATE orders SET status = ?, error_message = ?, updated_at = datetime('now') WHERE order_number = ?", + (OrderStatus.MALFORMED.value, reason, order_number), + ) + _log_order_error_history(order_number, reason) + return False + + async def add_order_items(order_number: str, items: list): """Replace order items — delete any existing rows, then insert fresh batch. GoMag is source of truth: re-import must reflect quantity changes. - Atomic (DELETE + INSERT in one transaction). Items with the same SKU are - merged (quantities summed) to satisfy the (order_number, sku) PK. + Wrapped in _safe_upsert_order_items so a bad payload marks the parent + order MALFORMED rather than exploding the sync. """ if not items: return - items = _dedup_items_by_sku(items) db = await get_sqlite() try: - await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,)) - await db.executemany(""" - INSERT INTO order_items - (order_number, sku, product_name, quantity, price, baseprice, - vat, mapping_status, codmat, id_articol, cantitate_roa) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, [ - (order_number, - item.get("sku"), item.get("product_name"), - item.get("quantity"), item.get("price"), item.get("baseprice"), - item.get("vat"), - item.get("mapping_status"), item.get("codmat"), - item.get("id_articol"), item.get("cantitate_roa")) - for item in items - ]) + await _safe_upsert_order_items(db, order_number, items) await db.commit() finally: await db.close() diff --git a/api/tests/test_save_orders_batch_hybrid.py b/api/tests/test_save_orders_batch_hybrid.py index 1ec55c8..60a4d61 100644 --- a/api/tests/test_save_orders_batch_hybrid.py +++ b/api/tests/test_save_orders_batch_hybrid.py @@ -247,3 +247,61 @@ async def test_reconnect_preserves_malformed_and_continues(monkeypatch): finally: # fresh was already closed; nothing else to do pass + + +# ── 7. _safe_upsert_order_items — success + savepoint rollback ── + + +async def test_safe_upsert_items_happy_path(): + # Seed parent order so FK context is valid. + await sqlite_service.save_orders_batch([_order("SAFE-1", items=[])]) + db = await sqlite_service.get_sqlite() + try: + ok = await sqlite_service._safe_upsert_order_items( + db, "SAFE-1", [_item("SKU-H", qty=2)] + ) + await db.commit() + finally: + await db.close() + assert ok is True + items = await _items_of("SAFE-1") + assert items == [("SKU-H", 2)] + + +async def test_safe_upsert_items_rolls_back_and_marks_malformed(monkeypatch): + await sqlite_service.save_orders_batch([_order("SAFE-2", items=[_item("PRE", qty=1)])]) + + import aiosqlite + real_executemany = aiosqlite.core.Connection.executemany + + async def boom_on_items(self, sql, rows): + if "INSERT INTO order_items" in sql.upper().replace("\n", " ").replace(" ", " ").upper() or "ORDER_ITEMS" in sql.upper(): + raise sqlite3.IntegrityError("simulated items insert crash") + return await real_executemany(self, sql, rows) + + monkeypatch.setattr(aiosqlite.core.Connection, "executemany", boom_on_items) + + db = await sqlite_service.get_sqlite() + try: + ok = await sqlite_service._safe_upsert_order_items( + db, "SAFE-2", [_item("SKU-BAD", qty=1)] + ) + await db.commit() + finally: + await db.close() + + assert ok is False + # Parent order was tagged MALFORMED, pre-existing items were wiped by DELETE + # (which ran inside the rolled-back savepoint, so they should survive). + malformed = await _orders_with_status(OrderStatus.MALFORMED.value) + assert "SAFE-2" in malformed + + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute( + "SELECT error_message FROM orders WHERE order_number = ?", ("SAFE-2",) + ) + row = await cur.fetchone() + assert row is not None and "ITEMS_FAIL" in row[0] + finally: + await db.close()