feat(sync): hybrid batch+savepoint isolation with reconnect fix

save_orders_batch now runs in three tiers:
  1. validate_structural pre-flight splits each payload into valid or
     MALFORMED. MALFORMED rows persist with status + error_message + no
     items, and an append-only entry lands in sync_errors_history.log.
  2. Optimistic executemany over the valid list inside a SAVEPOINT batch.
  3. On IntegrityError / ValueError / TypeError, roll back the savepoint
     and fall back to per-order SAVEPOINT inserts so a single bad row
     cannot poison the rest of the batch.

Mid-loop SAVEPOINT rollback failure now triggers _safe_reconnect:
commit whatever survived, close the broken connection, open a fresh
one and keep processing. Preserves MALFORMED rows recorded earlier —
addresses the outside-voice gap where a crashed connection would lose
uncommitted malformed evidence.

Adds OrderStatus.MALFORMED and helper functions:
  _insert_orders_only  — orders + sync_run_orders, no items
  _insert_valid_batch  — happy-path bulk executemany
  _insert_single_order — per-order execute within savepoint
  _mark_malformed      — non-mutating copy with wiped items
  _safe_reconnect      — commit-close-reconnect guard

8 integration tests covering regression 485224762, structural
pre-flight, per-order isolation on runtime fail, caller-dict
immutability, and reconnect durability. 239 unit + 33 e2e green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-04-22 08:59:43 +00:00
parent d7610a6f33
commit f448f74b2d
3 changed files with 496 additions and 89 deletions

View File

@@ -15,3 +15,8 @@ class OrderStatus(str, Enum):
ERROR = "ERROR"
CANCELLED = "CANCELLED"
DELETED_IN_ROA = "DELETED_IN_ROA"
# Structural-fail: GoMag sent a payload that cannot be inserted as-is
# (missing fields, unparseable date, invalid quantity/price, or a runtime
# insert crash). Row persists with status=MALFORMED + error_message so
# operators can escalate to GoMag without blocking the rest of the batch.
MALFORMED = "MALFORMED"

View File

@@ -184,107 +184,260 @@ async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run:
await db.close()
async def save_orders_batch(orders_data: list[dict]):
"""Batch save a list of orders + their sync_run_orders + order_items in one transaction.
# SQL for the orders upsert — reused by batch + single-order fallback paths
# (_insert_orders_only, _insert_valid_batch and _insert_single_order).
#
# The statement takes 19 positional parameters in the column order listed in
# the INSERT clause; _orders_row() builds the matching tuple.
#
# On an order_number conflict the existing row is updated as follows:
#   - customer_name, error_message, missing_skus, items_count and
#     last_sync_run_id are always overwritten with the incoming values.
#   - status takes the incoming value, except that a stored IMPORTED is kept
#     when the incoming status is ALREADY_IMPORTED.
#   - times_skipped is incremented only when the incoming status is SKIPPED.
#   - Every COALESCE(...) column keeps the stored value when the incoming
#     value is NULL.
#   - order_date is not part of the SET list, so it is never changed by the
#     conflict branch.
#   - updated_at is refreshed with datetime('now').
#
# The f-string only interpolates OrderStatus enum values; no caller-supplied
# data is formatted into the SQL text.
_ORDERS_UPSERT_SQL = f"""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus, items_count,
last_sync_run_id, shipping_name, billing_name,
payment_method, delivery_method, order_total,
delivery_cost, discount_total, web_status, discount_split)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET
customer_name = excluded.customer_name,
status = CASE
WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}'
THEN orders.status
ELSE excluded.status
END,
error_message = excluded.error_message,
missing_skus = excluded.missing_skus,
items_count = excluded.items_count,
id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
id_partener = COALESCE(excluded.id_partener, orders.id_partener),
times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}'
THEN orders.times_skipped + 1
ELSE orders.times_skipped END,
last_sync_run_id = excluded.last_sync_run_id,
shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
billing_name = COALESCE(excluded.billing_name, orders.billing_name),
payment_method = COALESCE(excluded.payment_method, orders.payment_method),
delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
order_total = COALESCE(excluded.order_total, orders.order_total),
delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
discount_total = COALESCE(excluded.discount_total, orders.discount_total),
web_status = COALESCE(excluded.web_status, orders.web_status),
discount_split = COALESCE(excluded.discount_split, orders.discount_split),
updated_at = datetime('now')
"""
Each dict must have: sync_run_id, order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus (list|None), items_count,
shipping_name, billing_name, payment_method, delivery_method, status_at_run,
items (list of item dicts), delivery_cost (optional), discount_total (optional),
web_status (optional).
def _orders_row(d: dict) -> tuple:
    """Build the 19-value parameter tuple for the orders upsert from one order dict.

    ``order_number``, ``order_date``, ``customer_name``, ``status`` and
    ``sync_run_id`` are required (a missing one raises ``KeyError``); every
    other column falls back to ``None``, except ``items_count`` which falls
    back to ``0``. A truthy ``missing_skus`` value is stored as a JSON string,
    a falsy or absent one as ``None``.
    """
    identity = tuple(
        d[key] for key in ("order_number", "order_date", "customer_name", "status")
    )
    import_state = tuple(
        d.get(key) for key in ("id_comanda", "id_partener", "error_message")
    )

    skus = d.get("missing_skus")
    skus_json = json.dumps(skus) if skus else None

    run_info = (d.get("items_count", 0), d["sync_run_id"])
    optional_tail = tuple(
        d.get(key)
        for key in (
            "shipping_name",
            "billing_name",
            "payment_method",
            "delivery_method",
            "order_total",
            "delivery_cost",
            "discount_total",
            "web_status",
            "discount_split",
        )
    )
    return identity + import_state + (skus_json,) + run_info + optional_tail
def _mark_malformed(d: dict, reason: str) -> dict:
    """Produce a MALFORMED variant of an order dict without touching the input.

    The returned dict is a shallow copy of ``d`` in which ``status`` is
    ``OrderStatus.MALFORMED``, ``error_message`` is ``reason``, the item list
    is empty, ``items_count`` is 0 and ``missing_skus`` is cleared.
    """
    return {
        **d,
        "status": OrderStatus.MALFORMED.value,
        "error_message": reason,
        "items": [],
        "items_count": 0,
        "missing_skus": None,
    }
async def _insert_orders_only(db, orders: list[dict]):
    """Persist just the `orders` and `sync_run_orders` rows for the given orders.

    No `order_items` rows are written; this is the path for MALFORMED orders
    whose item data cannot be trusted. When an order dict has no
    ``status_at_run`` key, its ``status`` is recorded in `sync_run_orders`
    instead. An empty list is a no-op.
    """
    if not orders:
        return

    order_rows = list(map(_orders_row, orders))
    await db.executemany(_ORDERS_UPSERT_SQL, order_rows)

    run_link_sql = "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)"
    run_link_rows = []
    for entry in orders:
        run_link_rows.append((
            entry["sync_run_id"],
            entry["order_number"],
            entry.get("status_at_run", entry["status"]),
        ))
    await db.executemany(run_link_sql, run_link_rows)
async def _insert_valid_batch(db, orders: list[dict]):
    """Happy-path batch insert: orders + sync_run_orders + order_items in bulk.

    The caller wraps this call in a SAVEPOINT so a mid-batch failure rolls
    back to a clean point and the per-order fallback can take over.

    Items use replace semantics: for every order that carries at least one
    item, its existing `order_items` rows are deleted and the incoming items
    (after `_dedup_items_by_sku`) are inserted. Orders without items keep
    whatever `order_items` rows they already have.

    Args:
        db: Open connection whose ``execute`` / ``executemany`` are awaited.
        orders: Order dicts. Each needs the keys read by ``_orders_row`` plus
            ``status_at_run``; ``items`` is optional.

    Raises:
        KeyError: A required key is missing from an order dict.
        Whatever the database driver raises for a failed statement; nothing
        is caught here.
    """
    if not orders:
        return

    await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
    await db.executemany(
        "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
        [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders],
    )

    all_items: list[tuple] = []
    order_numbers_with_items: set = set()
    for d in orders:
        raw_items = d.get("items", [])
        if not raw_items:
            continue
        order_numbers_with_items.add(d["order_number"])
        all_items.extend(
            (
                d["order_number"],
                item.get("sku"), item.get("product_name"),
                item.get("quantity"), item.get("price"), item.get("baseprice"),
                item.get("vat"),
                item.get("mapping_status"), item.get("codmat"),
                item.get("id_articol"), item.get("cantitate_roa"),
            )
            for item in _dedup_items_by_sku(raw_items)
        )

    if not all_items:
        return

    # One parameterised DELETE per order instead of a dynamically built
    # "IN (?, ?, ...)" list: the IN form binds one host parameter per order and
    # fails once a batch exceeds SQLite's per-statement parameter limit. This
    # is also the same statement _insert_single_order uses.
    await db.executemany(
        "DELETE FROM order_items WHERE order_number = ?",
        [(order_number,) for order_number in order_numbers_with_items],
    )
    await db.executemany("""
        INSERT INTO order_items
        (order_number, sku, product_name, quantity, price, baseprice,
        vat, mapping_status, codmat, id_articol, cantitate_roa)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, all_items)
async def _insert_single_order(db, d: dict):
    """Write a single order: its `orders` row, `sync_run_orders` link and items.

    Intended to run inside a caller-managed SAVEPOINT so that a failure on
    this one order can be rolled back without affecting the rest of the batch.
    When the order carries items, its existing `order_items` rows are removed
    first and the SKU-deduplicated incoming items are inserted; an order
    without items leaves `order_items` untouched.
    """
    order_number = d["order_number"]

    await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d))
    await db.execute(
        "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
        (d["sync_run_id"], order_number, d["status_at_run"]),
    )

    incoming_items = d.get("items", [])
    if not incoming_items:
        return

    await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,))

    item_fields = (
        "sku", "product_name", "quantity", "price", "baseprice",
        "vat", "mapping_status", "codmat", "id_articol", "cantitate_roa",
    )
    item_rows = []
    for item in _dedup_items_by_sku(incoming_items):
        item_rows.append((order_number, *[item.get(field) for field in item_fields]))

    await db.executemany("""
        INSERT INTO order_items
        (order_number, sku, product_name, quantity, price, baseprice,
        vat, mapping_status, codmat, id_articol, cantitate_roa)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, item_rows)
async def save_orders_batch(orders_data: list[dict]):
    """Batch save orders + sync_run_orders + order_items with per-order isolation.

    Three-tier strategy:
      1. Pre-validate every order with ``validate_structural``. Orders that
         fail are converted with ``_mark_malformed`` (status MALFORMED,
         error_message set, no items), logged to the error-history file and
         persisted via ``_insert_orders_only``; they never join the valid batch.
      2. Optimistic bulk insert of the remaining valid orders inside
         ``SAVEPOINT batch`` (happy path).
      3. On IntegrityError / ValueError / TypeError during the bulk insert,
         roll the savepoint back and retry each valid order inside its own
         ``SAVEPOINT ord``. An order that still fails is persisted as
         MALFORMED and logged to the error-history file.

    If rolling back a savepoint itself raises ``sqlite3.OperationalError``,
    ``_safe_reconnect`` commits what it can, replaces the connection and
    processing continues on the new one.

    Args:
        orders_data: Order dicts as consumed by ``_orders_row``, each also
            carrying ``status_at_run`` and optionally ``items``. An empty
            list is a no-op and opens no database connection.

    Raises:
        Any exception other than the ones handled above (for example
        ``sqlite3.OperationalError`` outside a rollback, ``OSError``,
        ``ConnectionError``, ``MemoryError``) propagates to the caller; in
        that case the final commit is skipped.
    """
    if not orders_data:
        return

    # Function-scope imports: sqlite3 is needed only for its exception
    # classes; the local validation import avoids the services package cycle.
    import sqlite3
    from .validation_service import validate_structural

    # Errors attributed to the data of a row rather than to the connection.
    row_errors = (sqlite3.IntegrityError, ValueError, TypeError)

    # Tier 1: structural pre-flight.
    valid: list[dict] = []
    malformed: list[dict] = []
    for d in orders_data:
        ok, err_type, err_msg = validate_structural(d)
        if ok:
            valid.append(d)
        else:
            fixed = _mark_malformed(d, f"{err_type}: {err_msg}")
            malformed.append(fixed)
            _log_order_error_history(fixed["order_number"], fixed["error_message"])

    db = await get_sqlite()
    try:
        if malformed:
            await _insert_orders_only(db, malformed)

        if valid:
            # Tier 2: savepoint around the bulk insert, so a failure can be
            # undone without losing the MALFORMED rows inserted above.
            await db.execute("SAVEPOINT batch")
            try:
                await _insert_valid_batch(db, valid)
                await db.execute("RELEASE SAVEPOINT batch")
            except row_errors as batch_err:
                logger.warning(
                    "save_orders_batch: batch insert failed, falling back per-order: %s",
                    batch_err,
                )
                try:
                    await db.execute("ROLLBACK TO SAVEPOINT batch")
                    await db.execute("RELEASE SAVEPOINT batch")
                except sqlite3.OperationalError:
                    # The savepoint rollback itself failed: commit whatever
                    # survived, reconnect and redo the valid list from scratch.
                    db = await _safe_reconnect(db)

                # Tier 3: one savepoint per order.
                for d in valid:
                    try:
                        await db.execute("SAVEPOINT ord")
                        await _insert_single_order(db, d)
                        await db.execute("RELEASE SAVEPOINT ord")
                    except row_errors as per_err:
                        reason = f"RUNTIME: {type(per_err).__name__}: {per_err}"
                        try:
                            await db.execute("ROLLBACK TO SAVEPOINT ord")
                            await db.execute("RELEASE SAVEPOINT ord")
                            await _insert_orders_only(db, [_mark_malformed(d, reason)])
                            _log_order_error_history(d["order_number"], reason)
                        except sqlite3.OperationalError:
                            # Connection is unusable: reconnect, then record
                            # the order as MALFORMED on the fresh connection.
                            reason = f"RUNTIME (post-reconnect): {type(per_err).__name__}: {per_err}"
                            db = await _safe_reconnect(db)
                            await _insert_orders_only(db, [_mark_malformed(d, reason)])
                            _log_order_error_history(d["order_number"], reason)

        await db.commit()
    finally:
        try:
            await db.close()
        except Exception:
            # Best effort: a failed close must not mask the real outcome,
            # but leave a trace for diagnosis.
            logger.debug("save_orders_batch: closing the SQLite connection failed", exc_info=True)
async def _safe_reconnect(db):
    """Commit whatever survived, close the broken connection, open a fresh one.

    Called when a SAVEPOINT rollback itself raises OperationalError — the
    connection is in a bad state and further work on it will fail. The commit
    is an attempt to preserve rows (such as MALFORMED entries) written before
    the failure.

    Both the commit and the close are best effort: a failure in either is
    logged and never raised, so the caller always gets a replacement
    connection unless ``get_sqlite()`` itself fails.

    Args:
        db: The connection considered broken.

    Returns:
        A new connection obtained from ``get_sqlite()``.
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        await db.commit()
    except Exception:
        # A failed commit means the pending rows were not persisted. Operators
        # need to see that, even though processing continues.
        log.warning(
            "_safe_reconnect: commit on the broken connection failed; uncommitted rows may be lost",
            exc_info=True,
        )

    try:
        await db.close()
    except Exception:
        log.debug("_safe_reconnect: closing the broken connection failed", exc_info=True)

    return await get_sqlite()
async def track_missing_sku(sku: str, product_name: str = "",