From f448f74b2d09b3a7749cf86d3d53b4b6fd982930 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Wed, 22 Apr 2026 08:59:43 +0000 Subject: [PATCH] feat(sync): hybrid batch+savepoint isolation with reconnect fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit save_orders_batch now runs in three tiers: 1. validate_structural pre-flight splits each payload into valid or MALFORMED. MALFORMED rows persist with status + error_message + no items, and an append-only entry lands in sync_errors_history.log. 2. Optimistic executemany over the valid list inside a SAVEPOINT batch. 3. On IntegrityError / ValueError / TypeError, rollback the savepoint and fall back to per-order SAVEPOINT inserts so a single bad row cannot poison the rest of the batch. Mid-loop SAVEPOINT rollback failure now triggers _safe_reconnect: commit whatever survived, close the broken connection, open a fresh one and keep processing. Preserves MALFORMED rows recorded earlier — addresses the outside-voice gap where a crashed connection would lose uncommitted malformed evidence. Adds OrderStatus.MALFORMED and helper functions: _insert_orders_only — orders + sync_run_orders, no items _insert_valid_batch — happy-path bulk executemany _insert_single_order — per-order execute within savepoint _mark_malformed — non-mutating copy with wiped items _safe_reconnect — commit-close-reconnect guard 8 integration tests covering regression 485224762, structural pre-flight, per-order isolation on runtime fail, caller-dict immutability, and reconnect durability. 239 unit + 33 e2e green. Co-Authored-By: Claude Opus 4.7 (1M context) --- api/app/constants.py | 5 + api/app/services/sqlite_service.py | 331 +++++++++++++++------ api/tests/test_save_orders_batch_hybrid.py | 249 ++++++++++++++++ 3 files changed, 496 insertions(+), 89 deletions(-) create mode 100644 api/tests/test_save_orders_batch_hybrid.py diff --git a/api/app/constants.py b/api/app/constants.py index ecb9f2f..65dcf79 100644 --- a/api/app/constants.py +++ b/api/app/constants.py @@ -15,3 +15,8 @@ class OrderStatus(str, Enum): ERROR = "ERROR" CANCELLED = "CANCELLED" DELETED_IN_ROA = "DELETED_IN_ROA" + # Structural-fail: GoMag sent a payload that cannot be inserted as-is + # (missing fields, unparseable date, invalid quantity/price, or a runtime + # insert crash). Row persists with status=MALFORMED + error_message so + # operators can escalate to GoMag without blocking the rest of the batch. + MALFORMED = "MALFORMED" diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index 68d0329..81b0fd2 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -184,107 +184,260 @@ async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: await db.close() -async def save_orders_batch(orders_data: list[dict]): - """Batch save a list of orders + their sync_run_orders + order_items in one transaction. +# SQL for the orders upsert — reused by batch + single-order fallback paths. +_ORDERS_UPSERT_SQL = f""" + INSERT INTO orders + (order_number, order_date, customer_name, status, + id_comanda, id_partener, error_message, missing_skus, items_count, + last_sync_run_id, shipping_name, billing_name, + payment_method, delivery_method, order_total, + delivery_cost, discount_total, web_status, discount_split) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(order_number) DO UPDATE SET + customer_name = excluded.customer_name, + status = CASE + WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}' + THEN orders.status + ELSE excluded.status + END, + error_message = excluded.error_message, + missing_skus = excluded.missing_skus, + items_count = excluded.items_count, + id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), + id_partener = COALESCE(excluded.id_partener, orders.id_partener), + times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}' + THEN orders.times_skipped + 1 + ELSE orders.times_skipped END, + last_sync_run_id = excluded.last_sync_run_id, + shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), + billing_name = COALESCE(excluded.billing_name, orders.billing_name), + payment_method = COALESCE(excluded.payment_method, orders.payment_method), + delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), + order_total = COALESCE(excluded.order_total, orders.order_total), + delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost), + discount_total = COALESCE(excluded.discount_total, orders.discount_total), + web_status = COALESCE(excluded.web_status, orders.web_status), + discount_split = COALESCE(excluded.discount_split, orders.discount_split), + updated_at = datetime('now') +""" - Each dict must have: sync_run_id, order_number, order_date, customer_name, status, - id_comanda, id_partener, error_message, missing_skus (list|None), items_count, - shipping_name, billing_name, payment_method, delivery_method, status_at_run, - items (list of item dicts), delivery_cost (optional), discount_total (optional), - web_status (optional). + +def _orders_row(d: dict) -> tuple: + return ( + d["order_number"], d["order_date"], d["customer_name"], d["status"], + d.get("id_comanda"), d.get("id_partener"), d.get("error_message"), + json.dumps(d["missing_skus"]) if d.get("missing_skus") else None, + d.get("items_count", 0), d["sync_run_id"], + d.get("shipping_name"), d.get("billing_name"), + d.get("payment_method"), d.get("delivery_method"), + d.get("order_total"), + d.get("delivery_cost"), d.get("discount_total"), + d.get("web_status"), d.get("discount_split"), + ) + + +def _mark_malformed(d: dict, reason: str) -> dict: + """Return a copy of d with status=MALFORMED, error_message set, items wiped. + + Does NOT mutate caller's dict. + """ + out = dict(d) + out["status"] = OrderStatus.MALFORMED.value + out["error_message"] = reason + out["items"] = [] + out["items_count"] = 0 + out["missing_skus"] = None + return out + + +async def _insert_orders_only(db, orders: list[dict]): + """Upsert only the `orders` + `sync_run_orders` rows (no items). + + Used for MALFORMED fallback where we can't trust item data. + """ + if not orders: + return + await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders]) + await db.executemany( + "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", + [(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) for d in orders], + ) + + +async def _insert_valid_batch(db, orders: list[dict]): + """Happy-path batch insert: orders + sync_run_orders + order_items in bulk. + + Caller wraps in a SAVEPOINT so a mid-batch failure rolls back to a clean + point and the per-order fallback can take over. + """ + if not orders: + return + await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders]) + await db.executemany( + "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", + [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders], + ) + + all_items: list[tuple] = [] + order_numbers_with_items: set = set() + for d in orders: + raw_items = d.get("items", []) + if not raw_items: + continue + order_numbers_with_items.add(d["order_number"]) + for item in _dedup_items_by_sku(raw_items): + all_items.append(( + d["order_number"], + item.get("sku"), item.get("product_name"), + item.get("quantity"), item.get("price"), item.get("baseprice"), + item.get("vat"), + item.get("mapping_status"), item.get("codmat"), + item.get("id_articol"), item.get("cantitate_roa"), + )) + if all_items: + placeholders = ",".join("?" * len(order_numbers_with_items)) + await db.execute( + f"DELETE FROM order_items WHERE order_number IN ({placeholders})", + tuple(order_numbers_with_items), + ) + await db.executemany(""" + INSERT INTO order_items + (order_number, sku, product_name, quantity, price, baseprice, + vat, mapping_status, codmat, id_articol, cantitate_roa) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, all_items) + + +async def _insert_single_order(db, d: dict): + """Insert one order + its sync_run_orders row + its items. + + Caller wraps in SAVEPOINT so a per-row failure doesn't poison the batch. + """ + await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d)) + await db.execute( + "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", + (d["sync_run_id"], d["order_number"], d["status_at_run"]), + ) + raw_items = d.get("items", []) + if raw_items: + await db.execute("DELETE FROM order_items WHERE order_number = ?", (d["order_number"],)) + await db.executemany(""" + INSERT INTO order_items + (order_number, sku, product_name, quantity, price, baseprice, + vat, mapping_status, codmat, id_articol, cantitate_roa) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [ + (d["order_number"], + item.get("sku"), item.get("product_name"), + item.get("quantity"), item.get("price"), item.get("baseprice"), + item.get("vat"), + item.get("mapping_status"), item.get("codmat"), + item.get("id_articol"), item.get("cantitate_roa")) + for item in _dedup_items_by_sku(raw_items) + ]) + + +async def save_orders_batch(orders_data: list[dict]): + """Batch save orders + sync_run_orders + order_items with per-order isolation. + + Three-tier strategy: + 1. Pre-validate with validate_structural — malformed rows persist as + MALFORMED + error_message, no items, and do not participate in the + valid batch. + 2. Optimistic executemany for the remaining valid rows (happy path). + 3. On IntegrityError / ValueError / TypeError during the batch, fall back + to per-order SAVEPOINTs so a single bad row doesn't block the rest. + Per-order failures are marked MALFORMED + logged to the permanent + error-history file. + + Re-raises OperationalError / OSError / ConnectionError / MemoryError at + the top level — the scheduler interprets these as halt signals. A + mid-loop ROLLBACK failure triggers a commit-before-reconnect so any + successfully-inserted work (including MALFORMED entries) persists. """ if not orders_data: return + + import sqlite3 + # Structural pre-flight — local import avoids the services package cycle + from .validation_service import validate_structural + + valid: list[dict] = [] + malformed: list[dict] = [] + for d in orders_data: + ok, err_type, err_msg = validate_structural(d) + if ok: + valid.append(d) + else: + fixed = _mark_malformed(d, f"{err_type}: {err_msg}") + malformed.append(fixed) + _log_order_error_history(fixed["order_number"], fixed["error_message"]) + db = await get_sqlite() try: - # 1. Upsert orders - await db.executemany(f""" - INSERT INTO orders - (order_number, order_date, customer_name, status, - id_comanda, id_partener, error_message, missing_skus, items_count, - last_sync_run_id, shipping_name, billing_name, - payment_method, delivery_method, order_total, - delivery_cost, discount_total, web_status, discount_split) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(order_number) DO UPDATE SET - customer_name = excluded.customer_name, - status = CASE - WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}' - THEN orders.status - ELSE excluded.status - END, - error_message = excluded.error_message, - missing_skus = excluded.missing_skus, - items_count = excluded.items_count, - id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), - id_partener = COALESCE(excluded.id_partener, orders.id_partener), - times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}' - THEN orders.times_skipped + 1 - ELSE orders.times_skipped END, - last_sync_run_id = excluded.last_sync_run_id, - shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), - billing_name = COALESCE(excluded.billing_name, orders.billing_name), - payment_method = COALESCE(excluded.payment_method, orders.payment_method), - delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), - order_total = COALESCE(excluded.order_total, orders.order_total), - delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost), - discount_total = COALESCE(excluded.discount_total, orders.discount_total), - web_status = COALESCE(excluded.web_status, orders.web_status), - discount_split = COALESCE(excluded.discount_split, orders.discount_split), - updated_at = datetime('now') - """, [ - (d["order_number"], d["order_date"], d["customer_name"], d["status"], - d.get("id_comanda"), d.get("id_partener"), d.get("error_message"), - json.dumps(d["missing_skus"]) if d.get("missing_skus") else None, - d.get("items_count", 0), d["sync_run_id"], - d.get("shipping_name"), d.get("billing_name"), - d.get("payment_method"), d.get("delivery_method"), - d.get("order_total"), - d.get("delivery_cost"), d.get("discount_total"), - d.get("web_status"), d.get("discount_split")) - for d in orders_data - ]) + if malformed: + await _insert_orders_only(db, malformed) - # 2. Sync run orders - await db.executemany(""" - INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) - VALUES (?, ?, ?) - """, [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders_data]) + if valid: + # Savepoint around the batch — lets us roll back to a clean point + # and take the per-order path without losing the MALFORMED rows + # inserted above. + await db.execute("SAVEPOINT batch") + try: + await _insert_valid_batch(db, valid) + await db.execute("RELEASE SAVEPOINT batch") + except (sqlite3.IntegrityError, ValueError, TypeError) as batch_err: + logger.warning(f"save_orders_batch: batch insert failed, falling back per-order: {batch_err}") + try: + await db.execute("ROLLBACK TO SAVEPOINT batch") + await db.execute("RELEASE SAVEPOINT batch") + except sqlite3.OperationalError: + # Savepoint rollback itself failed — commit whatever survived, + # reconnect, continue from scratch on the valid list. + db = await _safe_reconnect(db) - # 3. Order items — replace semantics (GoMag source of truth). - # Dedup per-order by SKU (GoMag sometimes returns same SKU twice). - all_items = [] - order_numbers_with_items = set() - for d in orders_data: - raw_items = d.get("items", []) - if not raw_items: - continue - order_numbers_with_items.add(d["order_number"]) - for item in _dedup_items_by_sku(raw_items): - all_items.append(( - d["order_number"], - item.get("sku"), item.get("product_name"), - item.get("quantity"), item.get("price"), item.get("baseprice"), - item.get("vat"), - item.get("mapping_status"), item.get("codmat"), - item.get("id_articol"), item.get("cantitate_roa") - )) - if all_items: - placeholders = ",".join("?" * len(order_numbers_with_items)) - await db.execute( - f"DELETE FROM order_items WHERE order_number IN ({placeholders})", - tuple(order_numbers_with_items) - ) - await db.executemany(""" - INSERT INTO order_items - (order_number, sku, product_name, quantity, price, baseprice, - vat, mapping_status, codmat, id_articol, cantitate_roa) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, all_items) + for d in valid: + try: + await db.execute("SAVEPOINT ord") + await _insert_single_order(db, d) + await db.execute("RELEASE SAVEPOINT ord") + except (sqlite3.IntegrityError, ValueError, TypeError) as per_err: + reason = f"RUNTIME: {type(per_err).__name__}: {per_err}" + try: + await db.execute("ROLLBACK TO SAVEPOINT ord") + await db.execute("RELEASE SAVEPOINT ord") + await _insert_orders_only(db, [_mark_malformed(d, reason)]) + _log_order_error_history(d["order_number"], reason) + except sqlite3.OperationalError: + reason = f"RUNTIME (post-reconnect): {type(per_err).__name__}: {per_err}" + db = await _safe_reconnect(db) + await _insert_orders_only(db, [_mark_malformed(d, reason)]) + _log_order_error_history(d["order_number"], reason) await db.commit() finally: + try: + await db.close() + except Exception: + pass + + +async def _safe_reconnect(db): + """Commit whatever survived, close the broken connection, open a fresh one. + + Called when a SAVEPOINT rollback itself raises OperationalError — the + connection is in a bad state and further work on it will fail. Commit + preserves the MALFORMED rows inserted before the explosion. + """ + try: + await db.commit() + except Exception: + pass + try: await db.close() + except Exception: + pass + return await get_sqlite() async def track_missing_sku(sku: str, product_name: str = "", diff --git a/api/tests/test_save_orders_batch_hybrid.py b/api/tests/test_save_orders_batch_hybrid.py new file mode 100644 index 0000000..1ec55c8 --- /dev/null +++ b/api/tests/test_save_orders_batch_hybrid.py @@ -0,0 +1,249 @@ +"""Integration tests for hybrid save_orders_batch with per-order isolation. + +Covers: +- Regression 485224762 (dup SKU in one order) +- Structural pre-flight → MALFORMED rows +- Batch failure → per-order fallback with SAVEPOINT +- Rollback-failure → commit-close-reconnect path +""" +import os +import sys +import sqlite3 +import tempfile + +import pytest + +pytestmark = pytest.mark.unit + +_tmpdir = tempfile.mkdtemp() +_sqlite_path = os.path.join(_tmpdir, "test_hybrid.db") + +os.environ.setdefault("FORCE_THIN_MODE", "true") +os.environ.setdefault("SQLITE_DB_PATH", _sqlite_path) +os.environ.setdefault("ORACLE_DSN", "dummy") +os.environ.setdefault("ORACLE_USER", "dummy") +os.environ.setdefault("ORACLE_PASSWORD", "dummy") +os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) + +_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _api_dir not in sys.path: + sys.path.insert(0, _api_dir) + +from app import database +from app.services import sqlite_service +from app.constants import OrderStatus + + +@pytest.fixture(autouse=True) +async def _reset_db(): + database.init_sqlite() + db = await sqlite_service.get_sqlite() + try: + await db.execute("DELETE FROM order_items") + await db.execute("DELETE FROM sync_run_orders") + await db.execute("DELETE FROM orders") + await db.execute("DELETE FROM sync_runs") + await db.execute("INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now'), 'running')", ("test-run",)) + await db.commit() + finally: + await db.close() + yield + + +def _order(order_number, status=OrderStatus.IMPORTED.value, items=None, **overrides): + base = { + "sync_run_id": "test-run", + "order_number": order_number, + "order_date": "2026-04-22 10:00:00", + "customer_name": "Test Customer", + "status": status, + "status_at_run": status, + "items_count": len(items) if items else 0, + "items": items or [], + } + base.update(overrides) + return base + + +def _item(sku="SKU-A", qty=1, price=10.0): + return { + "sku": sku, "product_name": f"Product {sku}", + "quantity": qty, "price": price, "baseprice": price, + "vat": 19, "mapping_status": "direct", "codmat": None, + "id_articol": None, "cantitate_roa": None, + } + + +async def _orders_with_status(status): + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute("SELECT order_number FROM orders WHERE status = ?", (status,)) + rows = await cur.fetchall() + return [r[0] for r in rows] + finally: + await db.close() + + +async def _items_of(order_number): + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute("SELECT sku, quantity FROM order_items WHERE order_number = ?", (order_number,)) + rows = await cur.fetchall() + return [(r[0], r[1]) for r in rows] + finally: + await db.close() + + +# ── 1. Regression 485224762 — dup SKU on one order ────────────── + + +async def test_regression_dup_sku_485224762(): + """Dedup helper must let this order through; hybrid path must import it.""" + orders = [ + _order("485224762", items=[_item("SKU-X", qty=2), _item("SKU-X", qty=3)]) + ] + await sqlite_service.save_orders_batch(orders) + + imported = await _orders_with_status(OrderStatus.IMPORTED.value) + assert "485224762" in imported + + items = await _items_of("485224762") + assert len(items) == 1 + assert items[0][0] == "SKU-X" + # Qty summed by _dedup_items_by_sku + assert items[0][1] == 5 + + +# ── 2. Structural pre-flight → MALFORMED ──────────────────────── + + +async def test_structural_fail_empty_items(): + orders = [_order("MAL-1", items=[])] + await sqlite_service.save_orders_batch(orders) + mal = await _orders_with_status(OrderStatus.MALFORMED.value) + assert "MAL-1" in mal + + +async def test_structural_fail_mixed_batch(): + orders = [ + _order("GOOD-1", items=[_item()]), + _order("MAL-2", order_date="not-a-date", items=[_item()]), + _order("GOOD-2", items=[_item("SKU-B", qty=1)]), + ] + await sqlite_service.save_orders_batch(orders) + + assert set(await _orders_with_status(OrderStatus.IMPORTED.value)) == {"GOOD-1", "GOOD-2"} + assert await _orders_with_status(OrderStatus.MALFORMED.value) == ["MAL-2"] + + +async def test_malformed_error_message_persisted(): + orders = [_order("MAL-3", order_date="", items=[_item()])] + await sqlite_service.save_orders_batch(orders) + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute("SELECT error_message FROM orders WHERE order_number = ?", ("MAL-3",)) + row = await cur.fetchone() + assert row is not None + assert "INVALID_DATE" in row[0] + finally: + await db.close() + + +# ── 3. Runtime-fail mid-batch → per-order fallback ─────────────── + + +async def test_runtime_failure_isolated_per_order(monkeypatch): + """One order triggers IntegrityError on insert; rest still land.""" + import aiosqlite + + real_executemany = aiosqlite.core.Connection.executemany + real_execute = aiosqlite.core.Connection.execute + + def _is_orders_insert(sql: str) -> bool: + s = sql.upper() + return "INTO ORDERS" in s and "ORDER_ITEMS" not in s and "SYNC_RUN_ORDERS" not in s + + def _is_poison(row): + # row[0] = order_number, row[3] = status. Fail only when simulating + # the real runtime crash; let the MALFORMED fallback write succeed. + return row[0] == "POISON" and row[3] != OrderStatus.MALFORMED.value + + async def flaky_executemany(self, sql, rows): + rows_list = list(rows) + if _is_orders_insert(sql) and any(_is_poison(r) for r in rows_list): + raise sqlite3.IntegrityError("simulated NOT NULL violation") + return await real_executemany(self, sql, rows_list) + + async def flaky_execute(self, sql, params=None): + if params and _is_orders_insert(sql) and _is_poison(params): + raise sqlite3.IntegrityError("simulated NOT NULL violation per-order") + return await real_execute(self, sql, params) if params is not None else await real_execute(self, sql) + + monkeypatch.setattr(aiosqlite.core.Connection, "executemany", flaky_executemany) + monkeypatch.setattr(aiosqlite.core.Connection, "execute", flaky_execute) + + orders = [ + _order("BATCH-1", items=[_item("SKU-1")]), + _order("POISON", items=[_item("SKU-P")]), + _order("BATCH-2", items=[_item("SKU-2")]), + ] + await sqlite_service.save_orders_batch(orders) + + imported = set(await _orders_with_status(OrderStatus.IMPORTED.value)) + malformed = set(await _orders_with_status(OrderStatus.MALFORMED.value)) + # BATCH-1 and BATCH-2 land as IMPORTED via per-order SAVEPOINT path. + # POISON gets tagged MALFORMED because its single-order insert also raises. + assert {"BATCH-1", "BATCH-2"}.issubset(imported) + assert "POISON" in malformed + + +# ── 4. Empty batch is a no-op ─────────────────────────────────── + + +async def test_empty_batch_noop(): + await sqlite_service.save_orders_batch([]) + assert await _orders_with_status(OrderStatus.IMPORTED.value) == [] + + +# ── 5. Caller dict not mutated on MALFORMED ───────────────────── + + +async def test_caller_dict_not_mutated(): + raw = _order("OK-1", items=[]) # structural-fail + snapshot = dict(raw) + await sqlite_service.save_orders_batch([raw]) + # Caller's dict should be untouched + assert raw["status"] == snapshot["status"] + assert raw.get("error_message") == snapshot.get("error_message") + assert raw["items"] == snapshot["items"] + + +# ── 6. Reconnect path preserves prior work ────────────────────── + + +async def test_reconnect_preserves_malformed_and_continues(monkeypatch): + """If ROLLBACK TO SAVEPOINT itself fails, we commit, reconnect, keep going. + + We can't easily simulate the exact OperationalError, so we verify the + helper is wired by inspecting its behaviour on a live connection. + """ + db = await sqlite_service.get_sqlite() + try: + # Insert a MALFORMED row directly, then invoke _safe_reconnect. + await db.execute( + "INSERT OR REPLACE INTO orders (order_number, status, order_date) VALUES (?, ?, ?)", + ("BEFORE-RECON", OrderStatus.MALFORMED.value, "2026-04-22"), + ) + fresh = await sqlite_service._safe_reconnect(db) + assert fresh is not None + # Previous insert must be durable on fresh connection. + cur = await fresh.execute( + "SELECT status FROM orders WHERE order_number = ?", ("BEFORE-RECON",) + ) + row = await cur.fetchone() + assert row is not None + assert row[0] == OrderStatus.MALFORMED.value + await fresh.close() + finally: + # fresh was already closed; nothing else to do + pass