"""Integration tests for hybrid save_orders_batch with per-order isolation. Covers: - Regression 485224762 (dup SKU in one order) - Structural pre-flight → MALFORMED rows - Batch failure → per-order fallback with SAVEPOINT - Rollback-failure → commit-close-reconnect path """ import os import sys import sqlite3 import tempfile import pytest pytestmark = pytest.mark.unit _tmpdir = tempfile.mkdtemp() _sqlite_path = os.path.join(_tmpdir, "test_hybrid.db") os.environ.setdefault("FORCE_THIN_MODE", "true") os.environ.setdefault("SQLITE_DB_PATH", _sqlite_path) os.environ.setdefault("ORACLE_DSN", "dummy") os.environ.setdefault("ORACLE_USER", "dummy") os.environ.setdefault("ORACLE_PASSWORD", "dummy") os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) _api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _api_dir not in sys.path: sys.path.insert(0, _api_dir) from app import database from app.services import sqlite_service from app.constants import OrderStatus @pytest.fixture(autouse=True) async def _reset_db(): database.init_sqlite() db = await sqlite_service.get_sqlite() try: await db.execute("DELETE FROM order_items") await db.execute("DELETE FROM sync_run_orders") await db.execute("DELETE FROM orders") await db.execute("DELETE FROM sync_runs") await db.execute("INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now'), 'running')", ("test-run",)) await db.commit() finally: await db.close() yield def _order(order_number, status=OrderStatus.IMPORTED.value, items=None, **overrides): base = { "sync_run_id": "test-run", "order_number": order_number, "order_date": "2026-04-22 10:00:00", "customer_name": "Test Customer", "status": status, "status_at_run": status, "items_count": len(items) if items else 0, "items": items or [], } base.update(overrides) return base def _item(sku="SKU-A", qty=1, price=10.0): return { "sku": sku, "product_name": f"Product {sku}", "quantity": qty, "price": price, "baseprice": price, "vat": 19, "mapping_status": "direct", "codmat": None, "id_articol": None, "cantitate_roa": None, } async def _orders_with_status(status): db = await sqlite_service.get_sqlite() try: cur = await db.execute("SELECT order_number FROM orders WHERE status = ?", (status,)) rows = await cur.fetchall() return [r[0] for r in rows] finally: await db.close() async def _items_of(order_number): db = await sqlite_service.get_sqlite() try: cur = await db.execute("SELECT sku, quantity FROM order_items WHERE order_number = ?", (order_number,)) rows = await cur.fetchall() return [(r[0], r[1]) for r in rows] finally: await db.close() # ── 1. Regression 485224762 — dup SKU on one order ────────────── async def test_regression_dup_sku_485224762(): """Dedup helper must let this order through; hybrid path must import it.""" orders = [ _order("485224762", items=[_item("SKU-X", qty=2), _item("SKU-X", qty=3)]) ] await sqlite_service.save_orders_batch(orders) imported = await _orders_with_status(OrderStatus.IMPORTED.value) assert "485224762" in imported items = await _items_of("485224762") assert len(items) == 1 assert items[0][0] == "SKU-X" # Qty summed by _dedup_items_by_sku assert items[0][1] == 5 # ── 2. Structural pre-flight → MALFORMED ──────────────────────── async def test_structural_fail_empty_items(): orders = [_order("MAL-1", items=[])] await sqlite_service.save_orders_batch(orders) mal = await _orders_with_status(OrderStatus.MALFORMED.value) assert "MAL-1" in mal async def test_structural_fail_mixed_batch(): orders = [ _order("GOOD-1", items=[_item()]), _order("MAL-2", order_date="not-a-date", items=[_item()]), _order("GOOD-2", items=[_item("SKU-B", qty=1)]), ] await sqlite_service.save_orders_batch(orders) assert set(await _orders_with_status(OrderStatus.IMPORTED.value)) == {"GOOD-1", "GOOD-2"} assert await _orders_with_status(OrderStatus.MALFORMED.value) == ["MAL-2"] async def test_malformed_error_message_persisted(): orders = [_order("MAL-3", order_date="", items=[_item()])] await sqlite_service.save_orders_batch(orders) db = await sqlite_service.get_sqlite() try: cur = await db.execute("SELECT error_message FROM orders WHERE order_number = ?", ("MAL-3",)) row = await cur.fetchone() assert row is not None assert "INVALID_DATE" in row[0] finally: await db.close() # ── 3. Runtime-fail mid-batch → per-order fallback ─────────────── async def test_runtime_failure_isolated_per_order(monkeypatch): """One order triggers IntegrityError on insert; rest still land.""" import aiosqlite real_executemany = aiosqlite.core.Connection.executemany real_execute = aiosqlite.core.Connection.execute def _is_orders_insert(sql: str) -> bool: s = sql.upper() return "INTO ORDERS" in s and "ORDER_ITEMS" not in s and "SYNC_RUN_ORDERS" not in s def _is_poison(row): # row[0] = order_number, row[3] = status. Fail only when simulating # the real runtime crash; let the MALFORMED fallback write succeed. return row[0] == "POISON" and row[3] != OrderStatus.MALFORMED.value async def flaky_executemany(self, sql, rows): rows_list = list(rows) if _is_orders_insert(sql) and any(_is_poison(r) for r in rows_list): raise sqlite3.IntegrityError("simulated NOT NULL violation") return await real_executemany(self, sql, rows_list) async def flaky_execute(self, sql, params=None): if params and _is_orders_insert(sql) and _is_poison(params): raise sqlite3.IntegrityError("simulated NOT NULL violation per-order") return await real_execute(self, sql, params) if params is not None else await real_execute(self, sql) monkeypatch.setattr(aiosqlite.core.Connection, "executemany", flaky_executemany) monkeypatch.setattr(aiosqlite.core.Connection, "execute", flaky_execute) orders = [ _order("BATCH-1", items=[_item("SKU-1")]), _order("POISON", items=[_item("SKU-P")]), _order("BATCH-2", items=[_item("SKU-2")]), ] await sqlite_service.save_orders_batch(orders) imported = set(await _orders_with_status(OrderStatus.IMPORTED.value)) malformed = set(await _orders_with_status(OrderStatus.MALFORMED.value)) # BATCH-1 and BATCH-2 land as IMPORTED via per-order SAVEPOINT path. # POISON gets tagged MALFORMED because its single-order insert also raises. assert {"BATCH-1", "BATCH-2"}.issubset(imported) assert "POISON" in malformed # ── 4. Empty batch is a no-op ─────────────────────────────────── async def test_empty_batch_noop(): await sqlite_service.save_orders_batch([]) assert await _orders_with_status(OrderStatus.IMPORTED.value) == [] # ── 5. Caller dict not mutated on MALFORMED ───────────────────── async def test_caller_dict_not_mutated(): raw = _order("OK-1", items=[]) # structural-fail snapshot = dict(raw) await sqlite_service.save_orders_batch([raw]) # Caller's dict should be untouched assert raw["status"] == snapshot["status"] assert raw.get("error_message") == snapshot.get("error_message") assert raw["items"] == snapshot["items"] # ── 6. Reconnect path preserves prior work ────────────────────── async def test_reconnect_preserves_malformed_and_continues(monkeypatch): """If ROLLBACK TO SAVEPOINT itself fails, we commit, reconnect, keep going. We can't easily simulate the exact OperationalError, so we verify the helper is wired by inspecting its behaviour on a live connection. """ db = await sqlite_service.get_sqlite() try: # Insert a MALFORMED row directly, then invoke _safe_reconnect. await db.execute( "INSERT OR REPLACE INTO orders (order_number, status, order_date) VALUES (?, ?, ?)", ("BEFORE-RECON", OrderStatus.MALFORMED.value, "2026-04-22"), ) fresh = await sqlite_service._safe_reconnect(db) assert fresh is not None # Previous insert must be durable on fresh connection. cur = await fresh.execute( "SELECT status FROM orders WHERE order_number = ?", ("BEFORE-RECON",) ) row = await cur.fetchone() assert row is not None assert row[0] == OrderStatus.MALFORMED.value await fresh.close() finally: # fresh was already closed; nothing else to do pass # ── 7. _safe_upsert_order_items — success + savepoint rollback ── async def test_safe_upsert_items_happy_path(): # Seed parent order so FK context is valid. await sqlite_service.save_orders_batch([_order("SAFE-1", items=[])]) db = await sqlite_service.get_sqlite() try: ok = await sqlite_service._safe_upsert_order_items( db, "SAFE-1", [_item("SKU-H", qty=2)] ) await db.commit() finally: await db.close() assert ok is True items = await _items_of("SAFE-1") assert items == [("SKU-H", 2)] async def test_safe_upsert_items_rolls_back_and_marks_malformed(monkeypatch): await sqlite_service.save_orders_batch([_order("SAFE-2", items=[_item("PRE", qty=1)])]) import aiosqlite real_executemany = aiosqlite.core.Connection.executemany async def boom_on_items(self, sql, rows): if "INSERT INTO order_items" in sql.upper().replace("\n", " ").replace(" ", " ").upper() or "ORDER_ITEMS" in sql.upper(): raise sqlite3.IntegrityError("simulated items insert crash") return await real_executemany(self, sql, rows) monkeypatch.setattr(aiosqlite.core.Connection, "executemany", boom_on_items) db = await sqlite_service.get_sqlite() try: ok = await sqlite_service._safe_upsert_order_items( db, "SAFE-2", [_item("SKU-BAD", qty=1)] ) await db.commit() finally: await db.close() assert ok is False # Parent order was tagged MALFORMED, pre-existing items were wiped by DELETE # (which ran inside the rolled-back savepoint, so they should survive). malformed = await _orders_with_status(OrderStatus.MALFORMED.value) assert "SAFE-2" in malformed db = await sqlite_service.get_sqlite() try: cur = await db.execute( "SELECT error_message FROM orders WHERE order_number = ?", ("SAFE-2",) ) row = await cur.fetchone() assert row is not None and "ITEMS_FAIL" in row[0] finally: await db.close()