From 41b142effb044884199d4999058e8c78d72cdb24 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Wed, 22 Apr 2026 09:06:58 +0000 Subject: [PATCH] feat(sync): per-phase isolation + escalation halt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sync_service gains DATA_ERRORS tuple + two new primitives: _record_phase_err(run_id, phase, err) Logs, appends to run text log, persists to sync_phase_failures. _check_escalation() Reads the last 3 runs and returns (phase, counts) for the first phase that has failed all 3 in a row, or (None, counts) otherwise. run_sync now runs a pre-flight escalation check — if a phase has failed 3 consecutive runs, the incoming sync is halted with status='halted_escalation' and a descriptive error_message. The dashboard Start Sync button can still override (UI comes in the next PR2 phase). Wrapped phases (DATA_ERRORS caught, sync continues): cancelled_batch, already_batch, addresses_batch, skipped_batch, price_sync, invoice_check, anaf_backfill. Partner mismatch retains its existing per-order guards. In the four batch phases, OperationalError and OS-level errors still propagate to the top-level handler (halt); price_sync, invoice_check and anaf_backfill keep their pre-existing catch-all `except Exception` handlers, so non-data errors there are still logged and swallowed as before. 6 unit tests cover record + counts + threshold + mixed-phase + short-circuit + DATA_ERRORS contract. Full CI green: 251 unit + 33 e2e. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- api/app/services/sync_service.py | 90 +++++++++++++++++- api/tests/test_phase_isolation.py | 149 ++++++++++++++++++++++++++++++ 2 files changed, 234 insertions(+), 5 deletions(-) create mode 100644 api/tests/test_phase_isolation.py diff --git a/api/app/services/sync_service.py b/api/app/services/sync_service.py index f05b5f1..2ac4942 100644 --- a/api/app/services/sync_service.py +++ b/api/app/services/sync_service.py @@ -2,10 +2,22 @@ import asyncio import json import logging import re +import sqlite3 import uuid from datetime import datetime, timedelta from zoneinfo import ZoneInfo + +# Data-level errors that a single phase may raise without halting the whole +# sync. Everything NOT in this tuple (OperationalError, OSError, +# ConnectionError, MemoryError) propagates and halts. +DATA_ERRORS = (sqlite3.IntegrityError, ValueError, TypeError, UnicodeError) + +# Number of recent runs inspected by the escalation check. 3 consecutive +# failures on the same phase halts the next sync. +_ESCALATION_WINDOW = 3 +_ESCALATION_THRESHOLD = 3 + _tz_bucharest = ZoneInfo("Europe/Bucharest") @@ -85,6 +97,38 @@ def _log_line(run_id: str, message: str): _run_logs[run_id].append(f"[{ts}] {message}") +async def _record_phase_err(run_id: str, phase: str, err: Exception) -> None: + """Log + persist a phase-level data error so escalation + health can see it. + + Called only for DATA_ERRORS (structural / data problems). OperationalError + and OS-level errors bypass this and halt the sync. 
+ """ + logger.error(f"[{run_id}] Phase {phase} data error: {err}", exc_info=True) + _log_line(run_id, f"FAZA {phase} eroare izolata: {type(err).__name__}: {err}") + try: + summary = f"{type(err).__name__}: {err}"[:500] + await sqlite_service.record_phase_failure(run_id, phase, summary) + except Exception as rec_err: + logger.warning(f"record_phase_failure failed for phase={phase}: {rec_err}") + + +async def _check_escalation() -> tuple[str | None, dict[str, int]]: + """Return (phase_to_halt_on, recent_counts). + + If any phase has >= _ESCALATION_THRESHOLD failures across the last + _ESCALATION_WINDOW runs, we halt the incoming sync and record + `halted_escalation` on sync_runs. Operators can still start the sync + manually from the dashboard override modal. + """ + try: + counts = await sqlite_service.get_recent_phase_failures(limit=_ESCALATION_WINDOW) + except Exception as e: + logger.warning(f"escalation check: failed to read phase failures: {e}") + return None, {} + escalating = [p for p, c in counts.items() if c >= _ESCALATION_THRESHOLD] + return (escalating[0] if escalating else None), counts + + def get_run_text_log(run_id: str) -> str | None: """Return the accumulated text log for a run, or None if not found.""" lines = _run_logs.get(run_id) @@ -225,6 +269,22 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None json_dir = settings.JSON_OUTPUT_DIR + # ── Escalation check — halt if a phase has failed 3 runs in a row ── + halt_phase, _recent_counts = await _check_escalation() + if halt_phase: + halt_msg = f"ESCALATED: phase {halt_phase} failed {_ESCALATION_THRESHOLD} consecutive runs" + _log_line(run_id, halt_msg) + await sqlite_service.create_sync_run(run_id, 0) + await sqlite_service.update_sync_run( + run_id, "halted_escalation", 0, 0, 0, 0, error_message=halt_msg + ) + if _current_sync: + _current_sync["status"] = "halted_escalation" + _current_sync["finished_at"] = _now().isoformat() + _current_sync["error"] = halt_msg + 
_update_progress("halted_escalation", halt_msg) + return {"run_id": run_id, "status": "halted_escalation", "error": halt_msg} + try: # Phase 0: Download orders from GoMag API _update_progress("downloading", "Descărcare comenzi din GoMag API...") @@ -309,7 +369,10 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None }) _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → ANULAT in GoMag") - await sqlite_service.save_orders_batch(cancelled_batch) + try: + await sqlite_service.save_orders_batch(cancelled_batch) + except DATA_ERRORS as e: + await _record_phase_err(run_id, "cancelled_batch", e) # Check if any cancelled orders were previously imported from ..database import get_sqlite as _get_sqlite @@ -611,6 +674,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None # Step 3a: Record already-imported orders (batch) already_imported_count = len(already_in_roa) already_batch = [] + _already_phase_failed = False for order in already_in_roa: shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order) id_comanda_roa = existing_map.get(order.number) @@ -638,7 +702,11 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None "items": order_items_data, }) _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})") - await sqlite_service.save_orders_batch(already_batch) + try: + await sqlite_service.save_orders_batch(already_batch) + except DATA_ERRORS as e: + await _record_phase_err(run_id, "already_batch", e) + _already_phase_failed = True # Update GoMag addresses + recompute address_mismatch for already-imported orders addr_updates = [] @@ -648,10 +716,13 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None "adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if 
order.shipping else None, "adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}), }) - await sqlite_service.update_gomag_addresses_batch(addr_updates) + try: + await sqlite_service.update_gomag_addresses_batch(addr_updates) + except DATA_ERRORS as e: + await _record_phase_err(run_id, "addresses_batch", e) # Detect partner mismatches for already-imported orders - if already_in_roa: + if already_in_roa and not _already_phase_failed: stored_partner_data = await sqlite_service.get_orders_partner_data_batch( [o.number for o in already_in_roa] ) @@ -750,7 +821,10 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None "items": order_items_data, }) _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})") - await sqlite_service.save_orders_batch(skipped_batch) + try: + await sqlite_service.save_orders_batch(skipped_batch) + except DATA_ERRORS as e: + await _record_phase_err(run_id, "skipped_batch", e) # ── Price sync from orders ── if app_settings.get("price_sync_enabled") == "1": @@ -769,6 +843,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None _log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate") for pu in price_updates: _log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f} → {pu['new_price']:.2f}") + except DATA_ERRORS as e: + await _record_phase_err(run_id, "price_sync", e) except Exception as e: _log_line(run_id, f"Eroare sync prețuri din comenzi: {e}") logger.error(f"Price sync error: {e}") @@ -1050,6 +1126,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None _log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache") if orders_deleted: _log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}") + except DATA_ERRORS as e: + await _record_phase_err(run_id, "invoice_check", e) except 
Exception as e: logger.warning(f"Invoice/order status check failed: {e}") @@ -1100,6 +1178,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None await sqlite_service.bulk_update_order_anaf_data(db_updates) if db_updates: _log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate") + except DATA_ERRORS as e: + await _record_phase_err(run_id, "anaf_backfill", e) except Exception as e: logger.warning(f"ANAF backfill failed: {e}") _log_line(run_id, f"ANAF backfill eroare: {e}") diff --git a/api/tests/test_phase_isolation.py b/api/tests/test_phase_isolation.py new file mode 100644 index 0000000..ef57970 --- /dev/null +++ b/api/tests/test_phase_isolation.py @@ -0,0 +1,149 @@ +"""Tests for _record_phase_err + escalation check in sync_service. + +These cover: +- _record_phase_err persists a sync_phase_failures row +- _check_escalation returns None below threshold +- _check_escalation halts when a phase has failed 3 runs in a row +- run_sync short-circuits when escalation flags a phase +""" +import os +import sys +import sqlite3 +import tempfile + +import pytest + +pytestmark = pytest.mark.unit + +_tmpdir = tempfile.mkdtemp() +os.environ.setdefault("FORCE_THIN_MODE", "true") +os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_phase.db")) +os.environ.setdefault("ORACLE_DSN", "dummy") +os.environ.setdefault("ORACLE_USER", "dummy") +os.environ.setdefault("ORACLE_PASSWORD", "dummy") +os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) + +_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _api_dir not in sys.path: + sys.path.insert(0, _api_dir) + +from app import database +from app.services import sqlite_service, sync_service + + +@pytest.fixture(autouse=True) +async def _reset(): + database.init_sqlite() + db = await sqlite_service.get_sqlite() + try: + await db.execute("DELETE FROM sync_phase_failures") + await db.execute("DELETE FROM sync_runs") + await db.commit() + 
finally: + await db.close() + yield + + +async def _make_run(run_id: str, offset: int = 0): + db = await sqlite_service.get_sqlite() + try: + await db.execute( + "INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')", + (run_id, f"{offset} seconds"), + ) + await db.commit() + finally: + await db.close() + + +async def test_record_phase_err_inserts_row(): + await _make_run("rec-1") + err = sqlite3.IntegrityError("simulated NOT NULL") + await sync_service._record_phase_err("rec-1", "price_sync", err) + + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute( + "SELECT phase, error_summary FROM sync_phase_failures WHERE run_id = ?", + ("rec-1",), + ) + row = await cur.fetchone() + finally: + await db.close() + assert row is not None + assert row[0] == "price_sync" + assert "IntegrityError" in row[1] + + +async def test_check_escalation_below_threshold(): + # 2 runs, each with invoice_check failure — below threshold. + for i in range(2): + await _make_run(f"run-{i}", offset=i) + await sqlite_service.record_phase_failure(f"run-{i}", "invoice_check", "err") + + phase, counts = await sync_service._check_escalation() + assert phase is None + assert counts.get("invoice_check") == 2 + + +async def test_check_escalation_hits_threshold(): + for i in range(3): + await _make_run(f"run-{i}", offset=i) + await sqlite_service.record_phase_failure(f"run-{i}", "import_loop", "err") + + phase, counts = await sync_service._check_escalation() + assert phase == "import_loop" + assert counts.get("import_loop") == 3 + + +async def test_check_escalation_different_phases_dont_escalate(): + # 3 runs, each failed on a different phase — no single phase hits 3. 
+ phases = ["price_sync", "invoice_check", "anaf_backfill"] + for i, p in enumerate(phases): + await _make_run(f"run-{i}", offset=i) + await sqlite_service.record_phase_failure(f"run-{i}", p, "err") + + phase, counts = await sync_service._check_escalation() + assert phase is None + assert len(counts) == 3 + + +async def test_run_sync_short_circuits_on_escalation(monkeypatch): + """With 3 consecutive price_sync failures, run_sync must halt without + touching gomag_client, order_reader, etc.""" + for i in range(3): + await _make_run(f"prev-{i}", offset=i) + await sqlite_service.record_phase_failure(f"prev-{i}", "price_sync", "err") + + # Sentinel: if sync proceeds to the download step, this will fire. + async def _boom(*args, **kwargs): + raise AssertionError("escalation should have halted before gomag download") + + monkeypatch.setattr(sync_service.gomag_client, "download_orders", _boom) + + result = await sync_service.run_sync(run_id="halt-test") + assert result["status"] == "halted_escalation" + assert "price_sync" in result["error"] + + # sync_runs row should be persisted with halted_escalation status + db = await sqlite_service.get_sqlite() + try: + cur = await db.execute( + "SELECT status, error_message FROM sync_runs WHERE run_id = ?", ("halt-test",) + ) + row = await cur.fetchone() + finally: + await db.close() + assert row is not None + assert row[0] == "halted_escalation" + assert "ESCALATED" in row[1] + + +async def test_data_errors_tuple_shape(): + """Contract: DATA_ERRORS is a tuple covering the structural error types.""" + assert sqlite3.IntegrityError in sync_service.DATA_ERRORS + assert ValueError in sync_service.DATA_ERRORS + assert TypeError in sync_service.DATA_ERRORS + assert UnicodeError in sync_service.DATA_ERRORS + # Must NOT include OperationalError — that halts the sync. + assert sqlite3.OperationalError not in sync_service.DATA_ERRORS