feat(sync): per-phase isolation + escalation halt

sync_service gains DATA_ERRORS tuple + two new primitives:
  _record_phase_err(run_id, phase, err)
    Logs, appends to run text log, persists to sync_phase_failures.
  _check_escalation()
    Reads the last 3 runs and returns the first phase that has failed
    all 3 in a row, or (None, counts) otherwise.

run_sync now runs a pre-flight escalation check — if a phase has failed
3 consecutive runs, the incoming sync is halted with
status='halted_escalation' and a descriptive error_message. The
dashboard Start Sync button can still override the halt (the override
UI lands in the next phase, PR2).

Wrapped phases (DATA_ERRORS caught, sync continues):
  cancelled_batch, already_batch, addresses_batch, skipped_batch,
  price_sync, invoice_check, anaf_backfill.
Partner mismatch retains its existing per-order guards. OperationalError
and OS-level errors still propagate to the top-level handler (halt).

6 unit tests cover record + counts + threshold + mixed-phase +
short-circuit + DATA_ERRORS contract. Full CI green: 251 unit + 33 e2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-04-22 09:06:58 +00:00
parent 1e4e3279f7
commit 41b142effb
2 changed files with 234 additions and 5 deletions

View File

@@ -2,10 +2,22 @@ import asyncio
import json import json
import logging import logging
import re import re
import sqlite3
import uuid import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
# Data-level errors that a single phase may raise without halting the whole
# sync; phases catch this tuple, record the failure, and let the run continue.
# Everything NOT in this tuple (OperationalError, OSError, ConnectionError,
# MemoryError) propagates to the top-level handler and halts the sync.
DATA_ERRORS = (sqlite3.IntegrityError, ValueError, TypeError, UnicodeError)
# Escalation tuning: _ESCALATION_WINDOW is how many recent runs the check
# inspects; _ESCALATION_THRESHOLD is how many failures of one phase within
# that window halt the next sync. Equal values mean "3 consecutive failures".
_ESCALATION_WINDOW = 3
_ESCALATION_THRESHOLD = 3
_tz_bucharest = ZoneInfo("Europe/Bucharest") _tz_bucharest = ZoneInfo("Europe/Bucharest")
@@ -85,6 +97,38 @@ def _log_line(run_id: str, message: str):
_run_logs[run_id].append(f"[{ts}] {message}") _run_logs[run_id].append(f"[{ts}] {message}")
async def _record_phase_err(run_id: str, phase: str, err: Exception) -> None:
    """Record an isolated, phase-level data error for a sync run.

    Writes three things: the application log (with traceback), the run's
    human-readable text log, and — best-effort — a row via
    ``sqlite_service.record_phase_failure`` so the escalation check can
    count it later. Intended strictly for DATA_ERRORS; operational and
    OS-level failures are not routed here and halt the sync instead.
    """
    err_name = type(err).__name__
    logger.error(f"[{run_id}] Phase {phase} data error: {err}", exc_info=True)
    _log_line(run_id, f"FAZA {phase} eroare izolata: {err_name}: {err}")
    # Persistence is best-effort: failing to record the failure must never
    # take the sync down with it.
    try:
        detail = f"{err_name}: {err}"[:500]
        await sqlite_service.record_phase_failure(run_id, phase, detail)
    except Exception as rec_err:
        logger.warning(f"record_phase_failure failed for phase={phase}: {rec_err}")
async def _check_escalation() -> tuple[str | None, dict[str, int]]:
    """Decide whether the incoming sync must be halted by escalation.

    Reads per-phase failure counts across the last _ESCALATION_WINDOW runs
    and returns ``(phase, counts)`` where *phase* is the first phase whose
    count reached _ESCALATION_THRESHOLD, or ``None`` when no phase did.
    A failure to read the failure table is treated as "no escalation" so a
    broken health table never blocks syncs on its own. Operators can still
    start the sync manually from the dashboard override modal.
    """
    try:
        counts = await sqlite_service.get_recent_phase_failures(limit=_ESCALATION_WINDOW)
    except Exception as e:
        logger.warning(f"escalation check: failed to read phase failures: {e}")
        return None, {}
    halt_phase = next(
        (phase for phase, failures in counts.items() if failures >= _ESCALATION_THRESHOLD),
        None,
    )
    return halt_phase, counts
def get_run_text_log(run_id: str) -> str | None: def get_run_text_log(run_id: str) -> str | None:
"""Return the accumulated text log for a run, or None if not found.""" """Return the accumulated text log for a run, or None if not found."""
lines = _run_logs.get(run_id) lines = _run_logs.get(run_id)
@@ -225,6 +269,22 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
json_dir = settings.JSON_OUTPUT_DIR json_dir = settings.JSON_OUTPUT_DIR
# ── Escalation check — halt if a phase has failed 3 runs in a row ──
halt_phase, _recent_counts = await _check_escalation()
if halt_phase:
halt_msg = f"ESCALATED: phase {halt_phase} failed {_ESCALATION_THRESHOLD} consecutive runs"
_log_line(run_id, halt_msg)
await sqlite_service.create_sync_run(run_id, 0)
await sqlite_service.update_sync_run(
run_id, "halted_escalation", 0, 0, 0, 0, error_message=halt_msg
)
if _current_sync:
_current_sync["status"] = "halted_escalation"
_current_sync["finished_at"] = _now().isoformat()
_current_sync["error"] = halt_msg
_update_progress("halted_escalation", halt_msg)
return {"run_id": run_id, "status": "halted_escalation", "error": halt_msg}
try: try:
# Phase 0: Download orders from GoMag API # Phase 0: Download orders from GoMag API
_update_progress("downloading", "Descărcare comenzi din GoMag API...") _update_progress("downloading", "Descărcare comenzi din GoMag API...")
@@ -309,7 +369,10 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
}) })
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → ANULAT in GoMag") _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → ANULAT in GoMag")
try:
await sqlite_service.save_orders_batch(cancelled_batch) await sqlite_service.save_orders_batch(cancelled_batch)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "cancelled_batch", e)
# Check if any cancelled orders were previously imported # Check if any cancelled orders were previously imported
from ..database import get_sqlite as _get_sqlite from ..database import get_sqlite as _get_sqlite
@@ -611,6 +674,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
# Step 3a: Record already-imported orders (batch) # Step 3a: Record already-imported orders (batch)
already_imported_count = len(already_in_roa) already_imported_count = len(already_in_roa)
already_batch = [] already_batch = []
_already_phase_failed = False
for order in already_in_roa: for order in already_in_roa:
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order) shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
id_comanda_roa = existing_map.get(order.number) id_comanda_roa = existing_map.get(order.number)
@@ -638,7 +702,11 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
"items": order_items_data, "items": order_items_data,
}) })
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})") _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})")
try:
await sqlite_service.save_orders_batch(already_batch) await sqlite_service.save_orders_batch(already_batch)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "already_batch", e)
_already_phase_failed = True
# Update GoMag addresses + recompute address_mismatch for already-imported orders # Update GoMag addresses + recompute address_mismatch for already-imported orders
addr_updates = [] addr_updates = []
@@ -648,10 +716,13 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None, "adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}), "adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
}) })
try:
await sqlite_service.update_gomag_addresses_batch(addr_updates) await sqlite_service.update_gomag_addresses_batch(addr_updates)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "addresses_batch", e)
# Detect partner mismatches for already-imported orders # Detect partner mismatches for already-imported orders
if already_in_roa: if already_in_roa and not _already_phase_failed:
stored_partner_data = await sqlite_service.get_orders_partner_data_batch( stored_partner_data = await sqlite_service.get_orders_partner_data_batch(
[o.number for o in already_in_roa] [o.number for o in already_in_roa]
) )
@@ -750,7 +821,10 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
"items": order_items_data, "items": order_items_data,
}) })
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})") _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
try:
await sqlite_service.save_orders_batch(skipped_batch) await sqlite_service.save_orders_batch(skipped_batch)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "skipped_batch", e)
# ── Price sync from orders ── # ── Price sync from orders ──
if app_settings.get("price_sync_enabled") == "1": if app_settings.get("price_sync_enabled") == "1":
@@ -769,6 +843,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
_log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate") _log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate")
for pu in price_updates: for pu in price_updates:
_log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f}{pu['new_price']:.2f}") _log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f}{pu['new_price']:.2f}")
except DATA_ERRORS as e:
await _record_phase_err(run_id, "price_sync", e)
except Exception as e: except Exception as e:
_log_line(run_id, f"Eroare sync prețuri din comenzi: {e}") _log_line(run_id, f"Eroare sync prețuri din comenzi: {e}")
logger.error(f"Price sync error: {e}") logger.error(f"Price sync error: {e}")
@@ -1050,6 +1126,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
_log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache") _log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache")
if orders_deleted: if orders_deleted:
_log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}") _log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}")
except DATA_ERRORS as e:
await _record_phase_err(run_id, "invoice_check", e)
except Exception as e: except Exception as e:
logger.warning(f"Invoice/order status check failed: {e}") logger.warning(f"Invoice/order status check failed: {e}")
@@ -1100,6 +1178,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
await sqlite_service.bulk_update_order_anaf_data(db_updates) await sqlite_service.bulk_update_order_anaf_data(db_updates)
if db_updates: if db_updates:
_log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate") _log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate")
except DATA_ERRORS as e:
await _record_phase_err(run_id, "anaf_backfill", e)
except Exception as e: except Exception as e:
logger.warning(f"ANAF backfill failed: {e}") logger.warning(f"ANAF backfill failed: {e}")
_log_line(run_id, f"ANAF backfill eroare: {e}") _log_line(run_id, f"ANAF backfill eroare: {e}")

View File

@@ -0,0 +1,149 @@
"""Tests for _phase_wrap + escalation check in sync_service.
These cover:
- _record_phase_err persists a sync_phase_failures row
- _check_escalation returns None below threshold
- _check_escalation halts when a phase has failed 3 runs in a row
- run_sync short-circuits when escalation flags a phase
"""
import os
import sys
import sqlite3
import tempfile
import pytest
# Every test in this module is a unit test (selected via `pytest -m unit`).
pytestmark = pytest.mark.unit

# Point the app at a throwaway SQLite DB and dummy Oracle credentials BEFORE
# importing the services, so module import never touches real infrastructure.
# setdefault keeps any values already exported by the environment.
_tmpdir = tempfile.mkdtemp()
os.environ.setdefault("FORCE_THIN_MODE", "true")
os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_phase.db"))
os.environ.setdefault("ORACLE_DSN", "dummy")
os.environ.setdefault("ORACLE_USER", "dummy")
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)

# Make the api package importable regardless of where pytest is invoked from.
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _api_dir not in sys.path:
    sys.path.insert(0, _api_dir)
from app import database
from app.services import sqlite_service, sync_service
@pytest.fixture(autouse=True)
async def _reset():
    """Start every test from an empty sync_runs / sync_phase_failures state."""
    database.init_sqlite()
    conn = await sqlite_service.get_sqlite()
    try:
        # Child table first, then runs.
        for table in ("sync_phase_failures", "sync_runs"):
            await conn.execute(f"DELETE FROM {table}")
        await conn.commit()
    finally:
        await conn.close()
    yield
async def _make_run(run_id: str, offset: int = 0):
    """Seed a minimal 'running' sync_runs row, started_at shifted by *offset* seconds."""
    insert_sql = (
        "INSERT INTO sync_runs (run_id, started_at, status) "
        "VALUES (?, datetime('now', ?), 'running')"
    )
    conn = await sqlite_service.get_sqlite()
    try:
        await conn.execute(insert_sql, (run_id, f"{offset} seconds"))
        await conn.commit()
    finally:
        await conn.close()
async def test_record_phase_err_inserts_row():
    """_record_phase_err must leave a queryable sync_phase_failures row."""
    await _make_run("rec-1")
    await sync_service._record_phase_err(
        "rec-1", "price_sync", sqlite3.IntegrityError("simulated NOT NULL")
    )

    conn = await sqlite_service.get_sqlite()
    try:
        cursor = await conn.execute(
            "SELECT phase, error_summary FROM sync_phase_failures WHERE run_id = ?",
            ("rec-1",),
        )
        stored = await cursor.fetchone()
    finally:
        await conn.close()

    assert stored is not None
    assert stored[0] == "price_sync"
    # Summary embeds the exception class name.
    assert "IntegrityError" in stored[1]
async def test_check_escalation_below_threshold():
    """Two consecutive failures on one phase stay below the halt threshold."""
    for i in range(2):
        run_id = f"run-{i}"
        await _make_run(run_id, offset=i)
        await sqlite_service.record_phase_failure(run_id, "invoice_check", "err")

    phase, counts = await sync_service._check_escalation()

    assert phase is None
    assert counts.get("invoice_check") == 2
async def test_check_escalation_hits_threshold():
    """Three failures in a row on a single phase flags that phase for a halt."""
    for i in range(3):
        run_id = f"run-{i}"
        await _make_run(run_id, offset=i)
        await sqlite_service.record_phase_failure(run_id, "import_loop", "err")

    phase, counts = await sync_service._check_escalation()

    assert phase == "import_loop"
    assert counts.get("import_loop") == 3
async def test_check_escalation_different_phases_dont_escalate():
    """One failure each on three distinct phases must not trigger a halt."""
    failing_phases = ("price_sync", "invoice_check", "anaf_backfill")
    for i, failing_phase in enumerate(failing_phases):
        await _make_run(f"run-{i}", offset=i)
        await sqlite_service.record_phase_failure(f"run-{i}", failing_phase, "err")

    phase, counts = await sync_service._check_escalation()

    assert phase is None
    assert len(counts) == 3
async def test_run_sync_short_circuits_on_escalation(monkeypatch):
    """After 3 consecutive price_sync failures, run_sync must halt before it
    ever reaches gomag_client / order_reader."""
    for i in range(3):
        await _make_run(f"prev-{i}", offset=i)
        await sqlite_service.record_phase_failure(f"prev-{i}", "price_sync", "err")

    # Tripwire: reaching the GoMag download step means the halt did not happen.
    async def _boom(*args, **kwargs):
        raise AssertionError("escalation should have halted before gomag download")

    monkeypatch.setattr(sync_service.gomag_client, "download_orders", _boom)

    result = await sync_service.run_sync(run_id="halt-test")

    assert result["status"] == "halted_escalation"
    assert "price_sync" in result["error"]

    # The halt must also be persisted on the sync_runs row.
    conn = await sqlite_service.get_sqlite()
    try:
        cursor = await conn.execute(
            "SELECT status, error_message FROM sync_runs WHERE run_id = ?", ("halt-test",)
        )
        persisted = await cursor.fetchone()
    finally:
        await conn.close()

    assert persisted is not None
    assert persisted[0] == "halted_escalation"
    assert "ESCALATED" in persisted[1]
async def test_data_errors_tuple_shape():
    """Contract: DATA_ERRORS covers exactly the structural/data error types."""
    expected_isolated = (sqlite3.IntegrityError, ValueError, TypeError, UnicodeError)
    for exc_type in expected_isolated:
        assert exc_type in sync_service.DATA_ERRORS
    # OperationalError must stay out — it halts the sync instead.
    assert sqlite3.OperationalError not in sync_service.DATA_ERRORS