"""Tests for sync_phase_failures table + helpers.""" import os import sys import tempfile import pytest pytestmark = pytest.mark.unit _tmpdir = tempfile.mkdtemp() os.environ.setdefault("FORCE_THIN_MODE", "true") os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_spf.db")) os.environ.setdefault("ORACLE_DSN", "dummy") os.environ.setdefault("ORACLE_USER", "dummy") os.environ.setdefault("ORACLE_PASSWORD", "dummy") os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) _api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _api_dir not in sys.path: sys.path.insert(0, _api_dir) from app import database from app.services import sqlite_service @pytest.fixture(autouse=True) async def _reset(): database.init_sqlite() db = await sqlite_service.get_sqlite() try: await db.execute("DELETE FROM sync_phase_failures") await db.execute("DELETE FROM sync_runs") await db.commit() finally: await db.close() yield async def _make_run(run_id: str, offset_seconds: int = 0): db = await sqlite_service.get_sqlite() try: await db.execute( "INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')", (run_id, f"{offset_seconds} seconds"), ) await db.commit() finally: await db.close() async def test_table_created_on_init(): db = await sqlite_service.get_sqlite() try: cur = await db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_phase_failures'" ) row = await cur.fetchone() assert row is not None finally: await db.close() async def test_record_and_read_phase_failures(): await _make_run("run-1") await _make_run("run-2", offset_seconds=1) await sqlite_service.record_phase_failure("run-1", "price_sync", "IntegrityError: X") await sqlite_service.record_phase_failure("run-2", "price_sync", "IntegrityError: Y") counts = await sqlite_service.get_recent_phase_failures(limit=3) assert counts.get("price_sync") == 2 async def test_get_recent_limit_respected(): # 5 runs, each with a price_sync failure. Limit=3 should only count the latest 3. for i in range(5): run_id = f"run-{i}" await _make_run(run_id, offset_seconds=i) await sqlite_service.record_phase_failure(run_id, "price_sync", "fail") counts = await sqlite_service.get_recent_phase_failures(limit=3) assert counts["price_sync"] == 3 async def test_record_prunes_to_100_runs(): # Insert 105 runs each with a failure → table should end at <=100 rows after prune. for i in range(105): run_id = f"R{i:03d}" await _make_run(run_id, offset_seconds=i) await sqlite_service.record_phase_failure(run_id, "import_loop", "x") db = await sqlite_service.get_sqlite() try: cur = await db.execute("SELECT COUNT(*) FROM sync_phase_failures") (total,) = await cur.fetchone() finally: await db.close() assert total <= 100 async def test_empty_phase_failures_returns_empty_dict(): counts = await sqlite_service.get_recent_phase_failures(limit=3) assert counts == {} async def test_record_phase_failure_idempotent_per_run_phase(): """PRIMARY KEY (run_id, phase) → second insert same run+phase updates in place.""" await _make_run("run-idem") await sqlite_service.record_phase_failure("run-idem", "invoice_check", "first") await sqlite_service.record_phase_failure("run-idem", "invoice_check", "second") db = await sqlite_service.get_sqlite() try: cur = await db.execute( "SELECT COUNT(*), MAX(error_summary) FROM sync_phase_failures WHERE run_id=? AND phase=?", ("run-idem", "invoice_check"), ) (count, latest) = await cur.fetchone() finally: await db.close() assert count == 1 assert latest == "second"