feat(sync): per-order SAVEPOINT protection for order_items upsert
_safe_upsert_order_items(db, order_number, items) wraps the DELETE + INSERT OR REPLACE pair in SAVEPOINT items. On IntegrityError / ValueError / TypeError it rolls the savepoint back, tags the parent order MALFORMED, logs to the error history file, and returns False to the caller. add_order_items now delegates to this helper so a single bad payload cannot leave order_items in a split state. 2 integration tests: happy path + simulated INSERT crash via aiosqlite monkeypatch. Existing order_items overwrite regression tests still pass (5/5). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -774,33 +774,69 @@ def _dedup_items_by_sku(items: list) -> list:
|
|||||||
return order
|
return order
|
||||||
|
|
||||||
|
|
||||||
|
async def _safe_upsert_order_items(db, order_number: str, items: list):
|
||||||
|
"""Replace order_items for one order inside a SAVEPOINT.
|
||||||
|
|
||||||
|
On IntegrityError / ValueError / TypeError: rolls back the savepoint,
|
||||||
|
tags the parent order MALFORMED, records the failure in the history
|
||||||
|
log, and returns False. On success returns True.
|
||||||
|
|
||||||
|
Caller is responsible for the outer connection lifecycle.
|
||||||
|
"""
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
items = _dedup_items_by_sku(items) if items else []
|
||||||
|
|
||||||
|
await db.execute("SAVEPOINT items")
|
||||||
|
try:
|
||||||
|
await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,))
|
||||||
|
if items:
|
||||||
|
await db.executemany("""
|
||||||
|
INSERT INTO order_items
|
||||||
|
(order_number, sku, product_name, quantity, price, baseprice,
|
||||||
|
vat, mapping_status, codmat, id_articol, cantitate_roa)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
""", [
|
||||||
|
(order_number,
|
||||||
|
item.get("sku"), item.get("product_name"),
|
||||||
|
item.get("quantity"), item.get("price"), item.get("baseprice"),
|
||||||
|
item.get("vat"),
|
||||||
|
item.get("mapping_status"), item.get("codmat"),
|
||||||
|
item.get("id_articol"), item.get("cantitate_roa"))
|
||||||
|
for item in items
|
||||||
|
])
|
||||||
|
await db.execute("RELEASE SAVEPOINT items")
|
||||||
|
return True
|
||||||
|
except (sqlite3.IntegrityError, ValueError, TypeError) as err:
|
||||||
|
reason = f"ITEMS_FAIL: {type(err).__name__}: {err}"
|
||||||
|
try:
|
||||||
|
await db.execute("ROLLBACK TO SAVEPOINT items")
|
||||||
|
await db.execute("RELEASE SAVEPOINT items")
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
# Outer caller will handle reconnect — just log and bail.
|
||||||
|
logger.exception(f"_safe_upsert_order_items: rollback failed for {order_number}")
|
||||||
|
raise
|
||||||
|
# Tag parent order as MALFORMED without removing it from sync state.
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE orders SET status = ?, error_message = ?, updated_at = datetime('now') WHERE order_number = ?",
|
||||||
|
(OrderStatus.MALFORMED.value, reason, order_number),
|
||||||
|
)
|
||||||
|
_log_order_error_history(order_number, reason)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def add_order_items(order_number: str, items: list):
|
async def add_order_items(order_number: str, items: list):
|
||||||
"""Replace order items — delete any existing rows, then insert fresh batch.
|
"""Replace order items — delete any existing rows, then insert fresh batch.
|
||||||
|
|
||||||
GoMag is source of truth: re-import must reflect quantity changes.
|
GoMag is source of truth: re-import must reflect quantity changes.
|
||||||
Atomic (DELETE + INSERT in one transaction). Items with the same SKU are
|
Wrapped in _safe_upsert_order_items so a bad payload marks the parent
|
||||||
merged (quantities summed) to satisfy the (order_number, sku) PK.
|
order MALFORMED rather than exploding the sync.
|
||||||
"""
|
"""
|
||||||
if not items:
|
if not items:
|
||||||
return
|
return
|
||||||
items = _dedup_items_by_sku(items)
|
|
||||||
db = await get_sqlite()
|
db = await get_sqlite()
|
||||||
try:
|
try:
|
||||||
await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,))
|
await _safe_upsert_order_items(db, order_number, items)
|
||||||
await db.executemany("""
|
|
||||||
INSERT INTO order_items
|
|
||||||
(order_number, sku, product_name, quantity, price, baseprice,
|
|
||||||
vat, mapping_status, codmat, id_articol, cantitate_roa)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
||||||
""", [
|
|
||||||
(order_number,
|
|
||||||
item.get("sku"), item.get("product_name"),
|
|
||||||
item.get("quantity"), item.get("price"), item.get("baseprice"),
|
|
||||||
item.get("vat"),
|
|
||||||
item.get("mapping_status"), item.get("codmat"),
|
|
||||||
item.get("id_articol"), item.get("cantitate_roa"))
|
|
||||||
for item in items
|
|
||||||
])
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
finally:
|
finally:
|
||||||
await db.close()
|
await db.close()
|
||||||
|
|||||||
@@ -247,3 +247,61 @@ async def test_reconnect_preserves_malformed_and_continues(monkeypatch):
|
|||||||
finally:
|
finally:
|
||||||
# fresh was already closed; nothing else to do
|
# fresh was already closed; nothing else to do
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ── 7. _safe_upsert_order_items — success + savepoint rollback ──
|
||||||
|
|
||||||
|
|
||||||
|
async def test_safe_upsert_items_happy_path():
|
||||||
|
# Seed parent order so FK context is valid.
|
||||||
|
await sqlite_service.save_orders_batch([_order("SAFE-1", items=[])])
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
ok = await sqlite_service._safe_upsert_order_items(
|
||||||
|
db, "SAFE-1", [_item("SKU-H", qty=2)]
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
assert ok is True
|
||||||
|
items = await _items_of("SAFE-1")
|
||||||
|
assert items == [("SKU-H", 2)]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_safe_upsert_items_rolls_back_and_marks_malformed(monkeypatch):
|
||||||
|
await sqlite_service.save_orders_batch([_order("SAFE-2", items=[_item("PRE", qty=1)])])
|
||||||
|
|
||||||
|
import aiosqlite
|
||||||
|
real_executemany = aiosqlite.core.Connection.executemany
|
||||||
|
|
||||||
|
async def boom_on_items(self, sql, rows):
|
||||||
|
if "INSERT INTO order_items" in sql.upper().replace("\n", " ").replace(" ", " ").upper() or "ORDER_ITEMS" in sql.upper():
|
||||||
|
raise sqlite3.IntegrityError("simulated items insert crash")
|
||||||
|
return await real_executemany(self, sql, rows)
|
||||||
|
|
||||||
|
monkeypatch.setattr(aiosqlite.core.Connection, "executemany", boom_on_items)
|
||||||
|
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
ok = await sqlite_service._safe_upsert_order_items(
|
||||||
|
db, "SAFE-2", [_item("SKU-BAD", qty=1)]
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
assert ok is False
|
||||||
|
# Parent order was tagged MALFORMED, pre-existing items were wiped by DELETE
|
||||||
|
# (which ran inside the rolled-back savepoint, so they should survive).
|
||||||
|
malformed = await _orders_with_status(OrderStatus.MALFORMED.value)
|
||||||
|
assert "SAFE-2" in malformed
|
||||||
|
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
cur = await db.execute(
|
||||||
|
"SELECT error_message FROM orders WHERE order_number = ?", ("SAFE-2",)
|
||||||
|
)
|
||||||
|
row = await cur.fetchone()
|
||||||
|
assert row is not None and "ITEMS_FAIL" in row[0]
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user