feat(sync): sync_phase_failures table for escalation tracking
New table sync_phase_failures(run_id, phase, error_summary, created_at)
with index on (phase, created_at). Minimal schema — no raw payload, no
PII — stores just enough to answer "did phase X fail in the last N
runs?" for the escalation check and the /api/sync/health pill.
Helpers in sqlite_service:
record_phase_failure(run_id, phase, error_summary)
INSERT OR REPLACE semantics (one row per run+phase), then prunes
to the most recent 100 sync_runs. error_summary clipped at 500
chars defensively.
get_recent_phase_failures(limit=3) → {phase: count} across the last N
runs, ordered by started_at desc.
6 unit tests cover creation, counting, pruning, empty state,
idempotency, and limit semantics.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -186,6 +186,15 @@ CREATE TABLE IF NOT EXISTS anaf_cache (
|
|||||||
denumire_anaf TEXT,
|
denumire_anaf TEXT,
|
||||||
checked_at TEXT NOT NULL
|
checked_at TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS sync_phase_failures (
|
||||||
|
run_id TEXT NOT NULL REFERENCES sync_runs(run_id),
|
||||||
|
phase TEXT NOT NULL,
|
||||||
|
error_summary TEXT,
|
||||||
|
created_at TEXT DEFAULT (datetime('now')),
|
||||||
|
PRIMARY KEY (run_id, phase)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_spf_phase_time ON sync_phase_failures(phase, created_at);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
_sqlite_db_path = None
|
_sqlite_db_path = None
|
||||||
|
|||||||
@@ -842,6 +842,60 @@ async def add_order_items(order_number: str, items: list):
|
|||||||
await db.close()
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ── sync phase failure tracking ───────────────────
|
||||||
|
|
||||||
|
|
||||||
|
async def record_phase_failure(run_id: str, phase: str, error_summary: str) -> None:
|
||||||
|
"""Insert a phase-failure marker and prune to the last 100 sync runs.
|
||||||
|
|
||||||
|
`error_summary` must be short (error_type + message) — no raw payload,
|
||||||
|
no PII. Used by _phase_wrap in sync_service to surface repeat failures
|
||||||
|
to the escalation check and the /api/sync/health dashboard pill.
|
||||||
|
"""
|
||||||
|
db = await get_sqlite()
|
||||||
|
try:
|
||||||
|
await db.execute(
|
||||||
|
"""INSERT OR REPLACE INTO sync_phase_failures (run_id, phase, error_summary)
|
||||||
|
VALUES (?, ?, ?)""",
|
||||||
|
(run_id, phase, error_summary[:500] if error_summary else None),
|
||||||
|
)
|
||||||
|
await db.execute("""
|
||||||
|
DELETE FROM sync_phase_failures
|
||||||
|
WHERE run_id NOT IN (
|
||||||
|
SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT 100
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
await db.commit()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def get_recent_phase_failures(limit: int = 3) -> dict[str, int]:
|
||||||
|
"""Return a {phase: failure_count} map across the last N sync runs.
|
||||||
|
|
||||||
|
Used by the escalation check (>=3 consecutive failures on the same
|
||||||
|
phase halts the next sync) and by /api/sync/health for the dashboard
|
||||||
|
pill.
|
||||||
|
"""
|
||||||
|
db = await get_sqlite()
|
||||||
|
try:
|
||||||
|
cursor = await db.execute(
|
||||||
|
"""
|
||||||
|
SELECT phase, COUNT(*) AS cnt
|
||||||
|
FROM sync_phase_failures
|
||||||
|
WHERE run_id IN (
|
||||||
|
SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT ?
|
||||||
|
)
|
||||||
|
GROUP BY phase
|
||||||
|
""",
|
||||||
|
(limit,),
|
||||||
|
)
|
||||||
|
rows = await cursor.fetchall()
|
||||||
|
return {row[0]: row[1] for row in rows}
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
async def get_order_items(order_number: str) -> list:
|
async def get_order_items(order_number: str) -> list:
|
||||||
"""Fetch items for one order."""
|
"""Fetch items for one order."""
|
||||||
db = await get_sqlite()
|
db = await get_sqlite()
|
||||||
|
|||||||
121
api/tests/test_sync_phase_failures.py
Normal file
121
api/tests/test_sync_phase_failures.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
"""Tests for sync_phase_failures table + helpers."""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.unit
|
||||||
|
|
||||||
|
_tmpdir = tempfile.mkdtemp()
|
||||||
|
os.environ.setdefault("FORCE_THIN_MODE", "true")
|
||||||
|
os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_spf.db"))
|
||||||
|
os.environ.setdefault("ORACLE_DSN", "dummy")
|
||||||
|
os.environ.setdefault("ORACLE_USER", "dummy")
|
||||||
|
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
|
||||||
|
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
|
||||||
|
|
||||||
|
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
if _api_dir not in sys.path:
|
||||||
|
sys.path.insert(0, _api_dir)
|
||||||
|
|
||||||
|
from app import database
|
||||||
|
from app.services import sqlite_service
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
async def _reset():
|
||||||
|
database.init_sqlite()
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
await db.execute("DELETE FROM sync_phase_failures")
|
||||||
|
await db.execute("DELETE FROM sync_runs")
|
||||||
|
await db.commit()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
async def _make_run(run_id: str, offset_seconds: int = 0):
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
await db.execute(
|
||||||
|
"INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')",
|
||||||
|
(run_id, f"{offset_seconds} seconds"),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def test_table_created_on_init():
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
cur = await db.execute(
|
||||||
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='sync_phase_failures'"
|
||||||
|
)
|
||||||
|
row = await cur.fetchone()
|
||||||
|
assert row is not None
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def test_record_and_read_phase_failures():
|
||||||
|
await _make_run("run-1")
|
||||||
|
await _make_run("run-2", offset_seconds=1)
|
||||||
|
await sqlite_service.record_phase_failure("run-1", "price_sync", "IntegrityError: X")
|
||||||
|
await sqlite_service.record_phase_failure("run-2", "price_sync", "IntegrityError: Y")
|
||||||
|
|
||||||
|
counts = await sqlite_service.get_recent_phase_failures(limit=3)
|
||||||
|
assert counts.get("price_sync") == 2
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_recent_limit_respected():
|
||||||
|
# 5 runs, each with a price_sync failure. Limit=3 should only count the latest 3.
|
||||||
|
for i in range(5):
|
||||||
|
run_id = f"run-{i}"
|
||||||
|
await _make_run(run_id, offset_seconds=i)
|
||||||
|
await sqlite_service.record_phase_failure(run_id, "price_sync", "fail")
|
||||||
|
|
||||||
|
counts = await sqlite_service.get_recent_phase_failures(limit=3)
|
||||||
|
assert counts["price_sync"] == 3
|
||||||
|
|
||||||
|
|
||||||
|
async def test_record_prunes_to_100_runs():
|
||||||
|
# Insert 105 runs each with a failure → table should end at <=100 rows after prune.
|
||||||
|
for i in range(105):
|
||||||
|
run_id = f"R{i:03d}"
|
||||||
|
await _make_run(run_id, offset_seconds=i)
|
||||||
|
await sqlite_service.record_phase_failure(run_id, "import_loop", "x")
|
||||||
|
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
cur = await db.execute("SELECT COUNT(*) FROM sync_phase_failures")
|
||||||
|
(total,) = await cur.fetchone()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
assert total <= 100
|
||||||
|
|
||||||
|
|
||||||
|
async def test_empty_phase_failures_returns_empty_dict():
|
||||||
|
counts = await sqlite_service.get_recent_phase_failures(limit=3)
|
||||||
|
assert counts == {}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_record_phase_failure_idempotent_per_run_phase():
|
||||||
|
"""PRIMARY KEY (run_id, phase) → second insert same run+phase updates in place."""
|
||||||
|
await _make_run("run-idem")
|
||||||
|
await sqlite_service.record_phase_failure("run-idem", "invoice_check", "first")
|
||||||
|
await sqlite_service.record_phase_failure("run-idem", "invoice_check", "second")
|
||||||
|
|
||||||
|
db = await sqlite_service.get_sqlite()
|
||||||
|
try:
|
||||||
|
cur = await db.execute(
|
||||||
|
"SELECT COUNT(*), MAX(error_summary) FROM sync_phase_failures WHERE run_id=? AND phase=?",
|
||||||
|
("run-idem", "invoice_check"),
|
||||||
|
)
|
||||||
|
(count, latest) = await cur.fetchone()
|
||||||
|
finally:
|
||||||
|
await db.close()
|
||||||
|
assert count == 1
|
||||||
|
assert latest == "second"
|
||||||
Reference in New Issue
Block a user