"""Tests for GET /api/sync/health.""" import os import sys import tempfile import pytest pytestmark = pytest.mark.unit _tmpdir = tempfile.mkdtemp() os.environ.setdefault("FORCE_THIN_MODE", "true") os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_health.db")) os.environ.setdefault("ORACLE_DSN", "dummy") os.environ.setdefault("ORACLE_USER", "dummy") os.environ.setdefault("ORACLE_PASSWORD", "dummy") os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) _api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _api_dir not in sys.path: sys.path.insert(0, _api_dir) from fastapi.testclient import TestClient from app import database from app.services import sqlite_service from app.main import app client = TestClient(app) @pytest.fixture(autouse=True) async def _reset(): database.init_sqlite() db = await sqlite_service.get_sqlite() try: await db.execute("DELETE FROM sync_phase_failures") await db.execute("DELETE FROM sync_runs") await db.commit() finally: await db.close() yield async def _make_run(run_id: str, status: str = "completed", offset: int = 0, error_message: str | None = None): db = await sqlite_service.get_sqlite() try: await db.execute( """INSERT INTO sync_runs (run_id, started_at, status, error_message) VALUES (?, datetime('now', ?), ?, ?)""", (run_id, f"{offset} seconds", status, error_message), ) await db.commit() finally: await db.close() async def test_health_empty_state(): r = client.get("/api/sync/health") assert r.status_code == 200 data = r.json() assert data["last_sync_at"] is None assert data["last_sync_status"] is None assert data["recent_phase_failures"] == {} assert data["escalation_phase"] is None assert data["is_healthy"] is True async def test_health_completed_is_healthy(): await _make_run("ok-1", status="completed") r = client.get("/api/sync/health") data = r.json() assert data["last_sync_status"] == "completed" assert data["is_healthy"] is True async def test_health_reports_last_failure(): await _make_run("fail-1", status="failed", error_message="boom") r = client.get("/api/sync/health") data = r.json() assert data["last_sync_status"] == "failed" assert data["last_halt_reason"] == "boom" assert data["is_healthy"] is False async def test_health_detects_escalation(): # 3 consecutive runs each with price_sync failure → escalation flagged. for i in range(3): run_id = f"esc-{i}" await _make_run(run_id, status="failed", offset=i, error_message="ESCALATED: phase price_sync failed 3 consecutive runs") await sqlite_service.record_phase_failure(run_id, "price_sync", "IntegrityError: X") r = client.get("/api/sync/health") data = r.json() assert data["escalation_phase"] == "price_sync" assert data["is_healthy"] is False assert data["recent_phase_failures"]["price_sync"] == 3 assert "ESCALATED" in (data["last_halt_reason"] or "") async def test_health_one_phase_failure_still_warning_not_healthy(): await _make_run("recent-fail", status="completed") await sqlite_service.record_phase_failure("recent-fail", "invoice_check", "err") r = client.get("/api/sync/health") data = r.json() # 1 recent phase failure → is_healthy stays True (<=1 tolerance); healthy assert data["is_healthy"] is True assert data["recent_phase_failures"]["invoice_check"] == 1