"""Tests for _phase_wrap + escalation check in sync_service. These cover: - _record_phase_err persists a sync_phase_failures row - _check_escalation returns None below threshold - _check_escalation halts when a phase has failed 3 runs in a row - run_sync short-circuits when escalation flags a phase """ import os import sys import sqlite3 import tempfile import pytest pytestmark = pytest.mark.unit _tmpdir = tempfile.mkdtemp() os.environ.setdefault("FORCE_THIN_MODE", "true") os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_phase.db")) os.environ.setdefault("ORACLE_DSN", "dummy") os.environ.setdefault("ORACLE_USER", "dummy") os.environ.setdefault("ORACLE_PASSWORD", "dummy") os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) _api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _api_dir not in sys.path: sys.path.insert(0, _api_dir) from app import database from app.services import sqlite_service, sync_service @pytest.fixture(autouse=True) async def _reset(): database.init_sqlite() db = await sqlite_service.get_sqlite() try: await db.execute("DELETE FROM sync_phase_failures") await db.execute("DELETE FROM sync_runs") await db.commit() finally: await db.close() yield async def _make_run(run_id: str, offset: int = 0): db = await sqlite_service.get_sqlite() try: await db.execute( "INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')", (run_id, f"{offset} seconds"), ) await db.commit() finally: await db.close() async def test_record_phase_err_inserts_row(): await _make_run("rec-1") err = sqlite3.IntegrityError("simulated NOT NULL") await sync_service._record_phase_err("rec-1", "price_sync", err) db = await sqlite_service.get_sqlite() try: cur = await db.execute( "SELECT phase, error_summary FROM sync_phase_failures WHERE run_id = ?", ("rec-1",), ) row = await cur.fetchone() finally: await db.close() assert row is not None assert row[0] == "price_sync" assert "IntegrityError" in row[1] async def test_check_escalation_below_threshold(): # 2 runs, each with invoice_check failure — below threshold. for i in range(2): await _make_run(f"run-{i}", offset=i) await sqlite_service.record_phase_failure(f"run-{i}", "invoice_check", "err") phase, counts = await sync_service._check_escalation() assert phase is None assert counts.get("invoice_check") == 2 async def test_check_escalation_hits_threshold(): for i in range(3): await _make_run(f"run-{i}", offset=i) await sqlite_service.record_phase_failure(f"run-{i}", "import_loop", "err") phase, counts = await sync_service._check_escalation() assert phase == "import_loop" assert counts.get("import_loop") == 3 async def test_check_escalation_different_phases_dont_escalate(): # 3 runs, each failed on a different phase — no single phase hits 3. phases = ["price_sync", "invoice_check", "anaf_backfill"] for i, p in enumerate(phases): await _make_run(f"run-{i}", offset=i) await sqlite_service.record_phase_failure(f"run-{i}", p, "err") phase, counts = await sync_service._check_escalation() assert phase is None assert len(counts) == 3 async def test_run_sync_short_circuits_on_escalation(monkeypatch): """With 3 consecutive price_sync failures, run_sync must halt without touching gomag_client, order_reader, etc.""" for i in range(3): await _make_run(f"prev-{i}", offset=i) await sqlite_service.record_phase_failure(f"prev-{i}", "price_sync", "err") # Sentinel: if sync proceeds to the download step, this will fire. async def _boom(*args, **kwargs): raise AssertionError("escalation should have halted before gomag download") monkeypatch.setattr(sync_service.gomag_client, "download_orders", _boom) result = await sync_service.run_sync(run_id="halt-test") assert result["status"] == "halted_escalation" assert "price_sync" in result["error"] # sync_runs row should be persisted with halted_escalation status db = await sqlite_service.get_sqlite() try: cur = await db.execute( "SELECT status, error_message FROM sync_runs WHERE run_id = ?", ("halt-test",) ) row = await cur.fetchone() finally: await db.close() assert row is not None assert row[0] == "halted_escalation" assert "ESCALATED" in row[1] async def test_data_errors_tuple_shape(): """Contract: DATA_ERRORS is a tuple covering the structural error types.""" assert sqlite3.IntegrityError in sync_service.DATA_ERRORS assert ValueError in sync_service.DATA_ERRORS assert TypeError in sync_service.DATA_ERRORS assert UnicodeError in sync_service.DATA_ERRORS # Must NOT include OperationalError — that halts the sync. assert sqlite3.OperationalError not in sync_service.DATA_ERRORS