diff --git a/api/app/database.py b/api/app/database.py index cb3584b..abee56f 100644 --- a/api/app/database.py +++ b/api/app/database.py @@ -186,6 +186,15 @@ CREATE TABLE IF NOT EXISTS anaf_cache ( denumire_anaf TEXT, checked_at TEXT NOT NULL ); + +CREATE TABLE IF NOT EXISTS sync_phase_failures ( + run_id TEXT NOT NULL REFERENCES sync_runs(run_id), + phase TEXT NOT NULL, + error_summary TEXT, + created_at TEXT DEFAULT (datetime('now')), + PRIMARY KEY (run_id, phase) +); +CREATE INDEX IF NOT EXISTS idx_spf_phase_time ON sync_phase_failures(phase, created_at); """ _sqlite_db_path = None diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index 89d4934..30c548e 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -842,6 +842,60 @@ async def add_order_items(order_number: str, items: list): await db.close() +# ── sync phase failure tracking ─────────────────── + + +async def record_phase_failure(run_id: str, phase: str, error_summary: str) -> None: + """Insert a phase-failure marker and prune to the last 100 sync runs. + + `error_summary` must be short (error_type + message) — no raw payload, + no PII. Used by _phase_wrap in sync_service to surface repeat failures + to the escalation check and the /api/sync/health dashboard pill. + """ + db = await get_sqlite() + try: + await db.execute( + """INSERT OR REPLACE INTO sync_phase_failures (run_id, phase, error_summary) + VALUES (?, ?, ?)""", + (run_id, phase, error_summary[:500] if error_summary else None), + ) + await db.execute(""" + DELETE FROM sync_phase_failures + WHERE run_id NOT IN ( + SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT 100 + ) + """) + await db.commit() + finally: + await db.close() + + +async def get_recent_phase_failures(limit: int = 3) -> dict[str, int]: + """Return a {phase: failure_count} map across the last N sync runs. + + Used by the escalation check (>=3 consecutive failures on the same + phase halts the next sync) and by /api/sync/health for the dashboard + pill. + """ + db = await get_sqlite() + try: + cursor = await db.execute( + """ + SELECT phase, COUNT(*) AS cnt + FROM sync_phase_failures + WHERE run_id IN ( + SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT ? + ) + GROUP BY phase + """, + (limit,), + ) + rows = await cursor.fetchall() + return {row[0]: row[1] for row in rows} + finally: + await db.close() + + async def get_order_items(order_number: str) -> list: """Fetch items for one order.""" db = await get_sqlite() diff --git a/api/tests/test_sync_phase_failures.py b/api/tests/test_sync_phase_failures.py new file mode 100644 index 0000000..7931f7e --- /dev/null +++ b/api/tests/test_sync_phase_failures.py @@ -0,0 +1,121 @@ +"""Tests for sync_phase_failures table + helpers.""" +import os +import sys +import tempfile + +import pytest + +pytestmark = pytest.mark.unit + +_tmpdir = tempfile.mkdtemp() +os.environ.setdefault("FORCE_THIN_MODE", "true") +os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_spf.db")) +os.environ.setdefault("ORACLE_DSN", "dummy") +os.environ.setdefault("ORACLE_USER", "dummy") +os.environ.setdefault("ORACLE_PASSWORD", "dummy") +os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) + +_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _api_dir not in sys.path: + sys.path.insert(0, _api_dir) + +from app import database +from app.services import sqlite_service + + +@pytest.fixture(autouse=True) +async def _reset(): + database.init_sqlite() + db = await sqlite_service.get_sqlite() + try: + await db.execute("DELETE FROM sync_phase_failures") + await db.execute("DELETE FROM sync_runs") + await db.commit() + finally: + await db.close() + yield + + +async def _make_run(run_id: str, offset_seconds: int = 0): + db = await sqlite_service.get_sqlite() + try: + await db.execute( + "INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')", + (run_id, f"{offset_seconds} seconds"), + ) + await db.commit() + finally: + await db.close() + + +async def test_table_created_on_init(): + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_phase_failures'" + ) + row = await cur.fetchone() + assert row is not None + finally: + await db.close() + + +async def test_record_and_read_phase_failures(): + await _make_run("run-1") + await _make_run("run-2", offset_seconds=1) + await sqlite_service.record_phase_failure("run-1", "price_sync", "IntegrityError: X") + await sqlite_service.record_phase_failure("run-2", "price_sync", "IntegrityError: Y") + + counts = await sqlite_service.get_recent_phase_failures(limit=3) + assert counts.get("price_sync") == 2 + + +async def test_get_recent_limit_respected(): + # 5 runs, each with a price_sync failure. Limit=3 should only count the latest 3. + for i in range(5): + run_id = f"run-{i}" + await _make_run(run_id, offset_seconds=i) + await sqlite_service.record_phase_failure(run_id, "price_sync", "fail") + + counts = await sqlite_service.get_recent_phase_failures(limit=3) + assert counts["price_sync"] == 3 + + +async def test_record_prunes_to_100_runs(): + # Insert 105 runs each with a failure → table should end at <=100 rows after prune. + for i in range(105): + run_id = f"R{i:03d}" + await _make_run(run_id, offset_seconds=i) + await sqlite_service.record_phase_failure(run_id, "import_loop", "x") + + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute("SELECT COUNT(*) FROM sync_phase_failures") + (total,) = await cur.fetchone() + finally: + await db.close() + assert total <= 100 + + +async def test_empty_phase_failures_returns_empty_dict(): + counts = await sqlite_service.get_recent_phase_failures(limit=3) + assert counts == {} + + +async def test_record_phase_failure_idempotent_per_run_phase(): + """PRIMARY KEY (run_id, phase) → second insert same run+phase updates in place.""" + await _make_run("run-idem") + await sqlite_service.record_phase_failure("run-idem", "invoice_check", "first") + await sqlite_service.record_phase_failure("run-idem", "invoice_check", "second") + + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute( + "SELECT COUNT(*), MAX(error_summary) FROM sync_phase_failures WHERE run_id=? AND phase=?", + ("run-idem", "invoice_check"), + ) + (count, latest) = await cur.fetchone() + finally: + await db.close() + assert count == 1 + assert latest == "second"