Compare commits

...

9 Commits

Author SHA1 Message Date
Claude Agent
7e30523242 feat(retry): allow retry for MALFORMED orders
MALFORMED is now a valid retry source alongside ERROR / SKIPPED /
DELETED_IN_ROA. The next sync will re-run validate_structural and
either reclassify or keep the MALFORMED tag — either way, operators
get the same "Retry" button they have for other failure paths
without needing a separate UI affordance.

278 unit + 33 e2e green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 09:16:16 +00:00
Claude Agent
bb6f3a3b87 feat(sync): /api/sync/health endpoint + dashboard health pill + MALFORMED UI
Backend:
- GET /api/sync/health returns {last_sync_at, last_sync_status,
  last_halt_reason, recent_phase_failures, escalation_phase, is_healthy}.
  healthy when last run was completed (or none yet), no phase has
  tripped the 3-in-a-row escalation, and recent failures <= 1.
- Dashboard + run-level endpoints include `malformed` count so the
  Defecte pill can render.

Frontend:
- Health pill in .sync-card-controls with three states — healthy
  (success green, check icon), warning (amber, triangle), escalated
  (error red, x-octagon + glow). Tooltip exposes the halt reason and
  the top phases with recent failures.
- Status-dot + badge add MALFORMED treatment via --compare orange,
  distinct from ERROR red. DESIGN.md notes the diagnostic rationale
  (ERROR = runtime, MALFORMED = payload source issue).
- Defecte filter pill on dashboard + logs pages. Mobile segmented
  control includes Defecte count. Counts wired to the malformed key.
- startSync() shows a native confirm modal when state is
  halted_escalation — operator override still possible, not silenced.
- ORDER_STATUS.MALFORMED mirror added to shared.js.
- Cache-bust: style.css v46, shared.js v47, dashboard.js v52,
  logs.js v16.

5 endpoint tests cover empty state, completed, failed, escalated,
single-failure warning. Full CI: 257 unit + 33 e2e green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 09:14:46 +00:00
Claude Agent
41b142effb feat(sync): per-phase isolation + escalation halt
sync_service gains DATA_ERRORS tuple + two new primitives:
  _record_phase_err(run_id, phase, err)
    Logs, appends to run text log, persists to sync_phase_failures.
  _check_escalation()
    Reads the last 3 runs and returns the first phase that has failed
    all 3 in a row, or (None, counts) otherwise.

run_sync now runs a pre-flight escalation check — if a phase has failed
3 consecutive runs, the incoming sync is halted with
status='halted_escalation' and a descriptive error_message. The
dashboard Start Sync button can still override (UI comes in the next
PR2 phase).

Wrapped phases (DATA_ERRORS caught, sync continues):
  cancelled_batch, already_batch, addresses_batch, skipped_batch,
  price_sync, invoice_check, anaf_backfill.
Partner mismatch retains its existing per-order guards. OperationalError
and OS-level errors still propagate to the top-level handler (halt).

6 unit tests cover record + counts + threshold + mixed-phase +
short-circuit + DATA_ERRORS contract. Full CI green: 251 unit + 33 e2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 09:06:58 +00:00
Claude Agent
1e4e3279f7 feat(sync): sync_phase_failures table for escalation tracking
New table sync_phase_failures(run_id, phase, error_summary, created_at)
with index on (phase, created_at). Minimal schema — no raw payload, no
PII — stores just enough to answer "did phase X fail in the last N
runs?" for the escalation check and the /api/sync/health pill.

Helpers in sqlite_service:
  record_phase_failure(run_id, phase, error_summary)
    INSERT OR REPLACE semantics (one row per run+phase), then prunes
    to the most recent 100 sync_runs. error_summary clipped at 500
    chars defensively.
  get_recent_phase_failures(limit=3) → {phase: count} across the last N
    runs, ordered by started_at desc.

6 unit tests cover creation, counting, pruning, empty state,
idempotency, and limit semantics.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 09:02:08 +00:00
Claude Agent
47a6bd83a4 feat(sync): per-order SAVEPOINT protection for order_items upsert
_safe_upsert_order_items(db, order_number, items) wraps the
DELETE + INSERT OR REPLACE pair in SAVEPOINT items. On
IntegrityError / ValueError / TypeError it rolls the savepoint
back, tags the parent order MALFORMED, logs to the error history
file, and returns False to the caller. add_order_items now delegates
to this helper so a single bad payload cannot leave order_items in
a split state.

2 integration tests: happy path + simulated INSERT crash via
aiosqlite monkeypatch. Existing order_items overwrite regression
tests still pass (5/5).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 09:00:57 +00:00
Claude Agent
f448f74b2d feat(sync): hybrid batch+savepoint isolation with reconnect fix
save_orders_batch now runs in three tiers:
  1. validate_structural pre-flight splits each payload into valid or
     MALFORMED. MALFORMED rows persist with status + error_message + no
     items, and an append-only entry lands in sync_errors_history.log.
  2. Optimistic executemany over the valid list inside a SAVEPOINT batch.
  3. On IntegrityError / ValueError / TypeError, rollback the savepoint
     and fall back to per-order SAVEPOINT inserts so a single bad row
     cannot poison the rest of the batch.

Mid-loop SAVEPOINT rollback failure now triggers _safe_reconnect:
commit whatever survived, close the broken connection, open a fresh
one and keep processing. Preserves MALFORMED rows recorded earlier —
addresses the outside-voice gap where a crashed connection would lose
uncommitted malformed evidence.

Adds OrderStatus.MALFORMED and helper functions:
  _insert_orders_only  — orders + sync_run_orders, no items
  _insert_valid_batch  — happy-path bulk executemany
  _insert_single_order — per-order execute within savepoint
  _mark_malformed      — non-mutating copy with wiped items
  _safe_reconnect      — commit-close-reconnect guard

8 integration tests covering regression 485224762, structural
pre-flight, per-order isolation on runtime fail, caller-dict
immutability, and reconnect durability. 239 unit + 33 e2e green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 08:59:43 +00:00
Claude Agent
d7610a6f33 feat(sync): persistent append-only error history log
_log_order_error_history(order_number, msg) writes to
logs/sync_errors_history.log via a dedicated RotatingFileHandler
(100MB × 12 backups). Logger is lazy-initialised and non-propagating
so it doesn't pollute the root logger.

Purpose: orders.error_message is overwritten when a retry succeeds,
so the history log preserves permanent audit of every malformed-order
event regardless of later outcome. Helper never raises — callers are
already in a degraded path.

3 unit tests: append semantics, multi-order, exception isolation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 08:54:11 +00:00
Claude Agent
38498bec6d feat(validation): add structural pre-flight validator
validate_structural(order) runs before save_orders_batch insert.
Catches malformed payloads (MISSING_FIELD, INVALID_DATE, EMPTY_ITEMS,
INVALID_QUANTITY, INVALID_PRICE) that would otherwise crash the batch
insert or downstream pipeline. 17 unit tests cover each rule.

Does NOT validate SKU existence — redundant with _dedup_items_by_sku
pass-through and validate_skus Oracle lookup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 08:52:32 +00:00
Claude Agent
f6d283b743 refactor(status): introduce OrderStatus enum, replace string literals
Centralized order status values in api/app/constants.py via a
str-valued Enum so comparisons keep working. Replaced literals in:
- services: sync_service, sqlite_service, retry_service
- routers: sync, dashboard
- templates: dashboard.html, logs.html
- static JS: shared (ORDER_STATUS mirror), dashboard, logs
- tests: requirements, order_items_overwrite, business_rules

MALFORMED intentionally NOT added — introduced in follow-up PR2
(per-order failure isolation).

Full test suite: 231 unit + 33 e2e pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 08:45:32 +00:00
25 changed files with 1850 additions and 247 deletions

View File

@@ -160,9 +160,12 @@ Strategy: invert surfaces, reduce accent saturation ~15%, keep semantic colors r
| ALREADY_IMPORTED | `--info` | `--info-light` | none |
| CANCELLED | `--cancelled` | `--cancelled-light` | none |
| DELETED_IN_ROA | `--cancelled` | `--cancelled-light` | none |
| MALFORMED | `--compare` | `--compare-light` | `0 0 8px 2px rgba(234,88,12,0.35)` |
**Design rule:** Problems glow, success is calm. The operator's eye is pulled to rows that need action.
**ERROR vs MALFORMED:** ERROR red signals a runtime issue operators can fix on our side (Oracle hiccup, network, stale state). MALFORMED orange signals the payload itself is broken at the source — the operator should escalate to GoMag rather than keep retrying. Visually distinct colors make the diagnostic path obvious at a glance.
## Spacing
- **Base unit:** 4px
- **Density:** Comfortable — not cramped, not wasteful

22
api/app/constants.py Normal file
View File

@@ -0,0 +1,22 @@
"""Application-wide constants shared across services, routers, and tests."""
from enum import Enum
class OrderStatus(str, Enum):
"""Order status values stored in SQLite `orders.status` column.
Inherits from `str` so existing string comparisons (==, in, dict.get)
keep working. Always use `.value` when passing to SQL queries or JSON
payloads to avoid Python-version-specific str(enum) surprises.
"""
IMPORTED = "IMPORTED"
ALREADY_IMPORTED = "ALREADY_IMPORTED"
SKIPPED = "SKIPPED"
ERROR = "ERROR"
CANCELLED = "CANCELLED"
DELETED_IN_ROA = "DELETED_IN_ROA"
# Structural-fail: GoMag sent a payload that cannot be inserted as-is
# (missing fields, unparseable date, invalid quantity/price, or a runtime
# insert crash). Row persists with status=MALFORMED + error_message so
# operators can escalate to GoMag without blocking the rest of the batch.
MALFORMED = "MALFORMED"

View File

@@ -186,6 +186,15 @@ CREATE TABLE IF NOT EXISTS anaf_cache (
denumire_anaf TEXT,
checked_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sync_phase_failures (
run_id TEXT NOT NULL REFERENCES sync_runs(run_id),
phase TEXT NOT NULL,
error_summary TEXT,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (run_id, phase)
);
CREATE INDEX IF NOT EXISTS idx_spf_phase_time ON sync_phase_failures(phase, created_at);
"""
_sqlite_db_path = None

View File

@@ -4,9 +4,11 @@ from fastapi.responses import HTMLResponse
from pathlib import Path
from ..services import sqlite_service
from ..constants import OrderStatus
router = APIRouter()
templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates"))
templates.env.globals["OrderStatus"] = OrderStatus
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):

View File

@@ -14,9 +14,11 @@ from typing import Optional
from ..services import sync_service, scheduler_service, sqlite_service, invoice_service
from .. import database
from ..constants import OrderStatus
router = APIRouter(tags=["sync"])
templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates"))
templates.env.globals["OrderStatus"] = OrderStatus
async def _enrich_items_with_codmat(items: list) -> None:
@@ -158,6 +160,52 @@ async def sync_status():
return result
@router.get("/api/sync/health")
async def sync_health():
"""Aggregated sync health snapshot used by the dashboard pill.
Fields:
last_sync_at ISO timestamp of most recent run start (or null).
last_sync_status completed | failed | running | halted_escalation | null.
last_halt_reason error_message from that run (only populated on
failed / halted_escalation).
recent_phase_failures {phase: count} across the last 3 runs.
escalation_phase the phase that tripped the 3-in-a-row halt, or null.
is_healthy completed last + <=1 recent phase failure.
"""
db = await sqlite_service.get_sqlite()
try:
cursor = await db.execute(
"SELECT run_id, started_at, status, error_message "
"FROM sync_runs ORDER BY started_at DESC LIMIT 1"
)
last_row = await cursor.fetchone()
finally:
await db.close()
last = dict(last_row) if last_row else {}
last_status = last.get("status")
halt_reason = last.get("error_message") if last_status in ("failed", "halted_escalation") else None
counts = await sqlite_service.get_recent_phase_failures(limit=3)
escalation_phase = next((p for p, c in counts.items() if c >= 3), None)
is_healthy = (
last_status in (None, "completed")
and escalation_phase is None
and sum(counts.values()) <= 1
)
return {
"last_sync_at": last.get("started_at"),
"last_sync_status": last_status,
"last_halt_reason": halt_reason,
"recent_phase_failures": counts,
"escalation_phase": escalation_phase,
"is_healthy": is_healthy,
}
@router.get("/api/sync/history")
async def sync_history(page: int = 1, per_page: int = 20):
"""Get sync run history."""
@@ -231,13 +279,13 @@ def _format_text_log_from_detail(detail: dict) -> str:
customer = o.get("customer_name", "?")
order_date = o.get("order_date") or "?"
if status == "IMPORTED":
if status == OrderStatus.IMPORTED.value:
id_cmd = o.get("id_comanda", "?")
lines.append(f"#{number} [{order_date}] {customer} → IMPORTAT (ID: {id_cmd})")
elif status == "ALREADY_IMPORTED":
elif status == OrderStatus.ALREADY_IMPORTED.value:
id_cmd = o.get("id_comanda", "?")
lines.append(f"#{number} [{order_date}] {customer} → DEJA IMPORTAT (ID: {id_cmd})")
elif status == "SKIPPED":
elif status == OrderStatus.SKIPPED.value:
missing = o.get("missing_skus", "")
if isinstance(missing, str):
try:
@@ -246,7 +294,7 @@ def _format_text_log_from_detail(detail: dict) -> str:
missing = [missing] if missing else []
skus_str = ", ".join(missing) if isinstance(missing, list) else str(missing)
lines.append(f"#{number} [{order_date}] {customer} → OMIS (lipsa: {skus_str})")
elif status == "ERROR":
elif status == OrderStatus.ERROR.value:
err = o.get("error_message", "necunoscuta")
lines.append(f"#{number} [{order_date}] {customer} → EROARE: {err}")
@@ -618,7 +666,7 @@ async def dashboard_orders(page: int = 1, per_page: int = 50,
is_invoiced_filter = (status == "INVOICED")
# For UNINVOICED/INVOICED: fetch all IMPORTED orders, then filter post-invoice-check
fetch_status = "IMPORTED" if (is_uninvoiced_filter or is_invoiced_filter) else status
fetch_status = OrderStatus.IMPORTED.value if (is_uninvoiced_filter or is_invoiced_filter) else status
fetch_per_page = 10000 if (is_uninvoiced_filter or is_invoiced_filter) else per_page
fetch_page = 1 if (is_uninvoiced_filter or is_invoiced_filter) else page
@@ -687,7 +735,7 @@ async def dashboard_orders(page: int = 1, per_page: int = 50,
newly_invoiced = sum(1 for o in uncached_orders if o.get("invoice") and o["invoice"].get("facturat"))
uninvoiced_base = counts.get("uninvoiced_sqlite", sum(
1 for o in all_orders
if o.get("status") in ("IMPORTED", "ALREADY_IMPORTED") and not o.get("invoice")
if o.get("status") in (OrderStatus.IMPORTED.value, OrderStatus.ALREADY_IMPORTED.value) and not o.get("invoice")
))
counts["nefacturate"] = max(0, uninvoiced_base - newly_invoiced)
imported_total = counts.get("imported_all") or counts.get("imported", 0)
@@ -713,7 +761,7 @@ async def dashboard_orders(page: int = 1, per_page: int = 50,
# For UNINVOICED filter: apply server-side filtering + pagination
if is_uninvoiced_filter:
filtered = [o for o in all_orders if o.get("status") in ("IMPORTED", "ALREADY_IMPORTED") and not o.get("invoice")]
filtered = [o for o in all_orders if o.get("status") in (OrderStatus.IMPORTED.value, OrderStatus.ALREADY_IMPORTED.value) and not o.get("invoice")]
total = len(filtered)
offset = (page - 1) * per_page
result["orders"] = filtered[offset:offset + per_page]
@@ -722,7 +770,7 @@ async def dashboard_orders(page: int = 1, per_page: int = 50,
result["per_page"] = per_page
result["pages"] = (total + per_page - 1) // per_page if total > 0 else 0
elif is_invoiced_filter:
filtered = [o for o in all_orders if o.get("status") in ("IMPORTED", "ALREADY_IMPORTED") and o.get("invoice")]
filtered = [o for o in all_orders if o.get("status") in (OrderStatus.IMPORTED.value, OrderStatus.ALREADY_IMPORTED.value) and o.get("invoice")]
total = len(filtered)
offset = (page - 1) * per_page
result["orders"] = filtered[offset:offset + per_page]

View File

@@ -4,6 +4,8 @@ import logging
import tempfile
from datetime import datetime, timedelta
from ..constants import OrderStatus
logger = logging.getLogger(__name__)
@@ -70,7 +72,7 @@ async def _download_and_reimport(order_number: str, order_date_str: str, custome
order_number=order_number,
order_date=order_date_str,
customer_name=customer_name,
status="ERROR",
status=OrderStatus.ERROR.value,
error_message=f"Retry failed: {e}",
)
return {"success": False, "message": f"Eroare import: {e}"}
@@ -103,7 +105,7 @@ async def _download_and_reimport(order_number: str, order_date_str: str, custome
order_number=order_number,
order_date=order_date_str,
customer_name=customer_name,
status="IMPORTED",
status=OrderStatus.IMPORTED.value,
id_comanda=result.get("id_comanda"),
id_partener=result.get("id_partener"),
error_message=None,
@@ -116,7 +118,7 @@ async def _download_and_reimport(order_number: str, order_date_str: str, custome
)
await sqlite_service.add_order_items(order_number, order_items_data)
logger.info(f"Retry successful for order {order_number} → IMPORTED ({len(order_items_data)} items)")
return {"success": True, "message": "Comanda reimportata cu succes", "status": "IMPORTED"}
return {"success": True, "message": "Comanda reimportata cu succes", "status": OrderStatus.IMPORTED.value}
else:
error = result.get("error", "Unknown error")
await sqlite_service.upsert_order(
@@ -124,11 +126,11 @@ async def _download_and_reimport(order_number: str, order_date_str: str, custome
order_number=order_number,
order_date=order_date_str,
customer_name=customer_name,
status="ERROR",
status=OrderStatus.ERROR.value,
error_message=f"Retry: {error}",
)
await sqlite_service.add_order_items(order_number, order_items_data)
return {"success": False, "message": f"Import esuat: {error}", "status": "ERROR"}
return {"success": False, "message": f"Import esuat: {error}", "status": OrderStatus.ERROR.value}
async def retry_single_order(order_number: str, app_settings: dict) -> dict:
@@ -157,8 +159,10 @@ async def retry_single_order(order_number: str, app_settings: dict) -> dict:
order_data = detail["order"]
status = order_data.get("status", "")
if status not in ("ERROR", "SKIPPED", "DELETED_IN_ROA"):
return {"success": False, "message": f"Retry permis doar pentru ERROR/SKIPPED/DELETED_IN_ROA (status actual: {status})"}
if status not in (OrderStatus.ERROR.value, OrderStatus.SKIPPED.value,
OrderStatus.DELETED_IN_ROA.value, OrderStatus.MALFORMED.value):
return {"success": False,
"message": f"Retry permis doar pentru ERROR/SKIPPED/DELETED_IN_ROA/MALFORMED (status actual: {status})"}
order_date_str = order_data.get("order_date", "")
customer_name = order_data.get("customer_name", "")
@@ -196,7 +200,7 @@ async def resync_single_order(order_number: str, app_settings: dict) -> dict:
status = order_data.get("status", "")
id_comanda = order_data.get("id_comanda")
if status not in ("IMPORTED", "ALREADY_IMPORTED") or not id_comanda:
if status not in (OrderStatus.IMPORTED.value, OrderStatus.ALREADY_IMPORTED.value) or not id_comanda:
return {"success": False, "message": f"Resync permis doar pentru IMPORTED/ALREADY_IMPORTED cu id_comanda (status actual: {status})"}
# Invoice safety gate
@@ -269,7 +273,7 @@ async def delete_single_order(order_number: str) -> dict:
status = order_data.get("status", "")
id_comanda = order_data.get("id_comanda")
if status not in ("IMPORTED", "ALREADY_IMPORTED") or not id_comanda:
if status not in (OrderStatus.IMPORTED.value, OrderStatus.ALREADY_IMPORTED.value) or not id_comanda:
return {"success": False, "message": f"Stergere permisa doar pentru IMPORTED/ALREADY_IMPORTED cu id_comanda (status actual: {status})"}
# Invoice safety gate

View File

@@ -1,8 +1,11 @@
import json
import logging
import logging.handlers
import os
from datetime import datetime
from zoneinfo import ZoneInfo
from ..database import get_sqlite, get_sqlite_sync
from ..constants import OrderStatus
# Re-export so other services can import get_sqlite from sqlite_service
__all__ = ["get_sqlite", "get_sqlite_sync"]
@@ -17,6 +20,61 @@ def _now_str():
logger = logging.getLogger(__name__)
# Dedicated append-only logger for per-order errors.
# orders.error_message is overwritten when retry succeeds — this file
# keeps the permanent audit trail.
_error_history_logger: logging.Logger | None = None
def _get_error_history_logger() -> logging.Logger:
"""Lazily-initialised logger writing to logs/sync_errors_history.log.
Append-only. Rolls over at 100MB with 12 kept backups (~monthly cadence
under prod load).
"""
global _error_history_logger
if _error_history_logger is not None:
return _error_history_logger
lg = logging.getLogger("sync_errors_history")
lg.setLevel(logging.INFO)
lg.propagate = False
# Find project root by walking up from this file
here = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(here, "..", "..", ".."))
logs_dir = os.path.join(project_root, "logs")
os.makedirs(logs_dir, exist_ok=True)
log_path = os.path.join(logs_dir, "sync_errors_history.log")
if not any(
isinstance(h, logging.handlers.RotatingFileHandler)
and getattr(h, "baseFilename", "") == log_path
for h in lg.handlers
):
handler = logging.handlers.RotatingFileHandler(
log_path, maxBytes=100 * 1024 * 1024, backupCount=12, encoding="utf-8"
)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
lg.addHandler(handler)
_error_history_logger = lg
return lg
def _log_order_error_history(order_number: str, error_msg: str) -> None:
"""Append an order-level failure line to the permanent error history log.
Called from save_orders_batch + add_order_items on MALFORMED fallback,
so the evidence survives later retry-success overwrites of
orders.error_message.
"""
try:
_get_error_history_logger().warning(f"ORDER_FAIL {order_number}: {error_msg}")
except Exception as e:
logger.warning(f"_log_order_error_history failed for {order_number}: {e}")
async def create_sync_run(run_id: str, json_files: int = 0):
"""Create a new sync run record."""
db = await get_sqlite()
@@ -68,7 +126,7 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
"""Upsert a single order — one row per order_number, status updated in place."""
db = await get_sqlite()
try:
await db.execute("""
await db.execute(f"""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus, items_count,
@@ -79,7 +137,7 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
ON CONFLICT(order_number) DO UPDATE SET
customer_name = excluded.customer_name,
status = CASE
WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}'
THEN orders.status
ELSE excluded.status
END,
@@ -88,7 +146,7 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
items_count = excluded.items_count,
id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
id_partener = COALESCE(excluded.id_partener, orders.id_partener),
times_skipped = CASE WHEN excluded.status = 'SKIPPED'
times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}'
THEN orders.times_skipped + 1
ELSE orders.times_skipped END,
last_sync_run_id = excluded.last_sync_run_id,
@@ -126,107 +184,260 @@ async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run:
await db.close()
async def save_orders_batch(orders_data: list[dict]):
"""Batch save a list of orders + their sync_run_orders + order_items in one transaction.
# SQL for the orders upsert — reused by batch + single-order fallback paths.
_ORDERS_UPSERT_SQL = f"""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus, items_count,
last_sync_run_id, shipping_name, billing_name,
payment_method, delivery_method, order_total,
delivery_cost, discount_total, web_status, discount_split)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET
customer_name = excluded.customer_name,
status = CASE
WHEN orders.status = '{OrderStatus.IMPORTED.value}' AND excluded.status = '{OrderStatus.ALREADY_IMPORTED.value}'
THEN orders.status
ELSE excluded.status
END,
error_message = excluded.error_message,
missing_skus = excluded.missing_skus,
items_count = excluded.items_count,
id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
id_partener = COALESCE(excluded.id_partener, orders.id_partener),
times_skipped = CASE WHEN excluded.status = '{OrderStatus.SKIPPED.value}'
THEN orders.times_skipped + 1
ELSE orders.times_skipped END,
last_sync_run_id = excluded.last_sync_run_id,
shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
billing_name = COALESCE(excluded.billing_name, orders.billing_name),
payment_method = COALESCE(excluded.payment_method, orders.payment_method),
delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
order_total = COALESCE(excluded.order_total, orders.order_total),
delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
discount_total = COALESCE(excluded.discount_total, orders.discount_total),
web_status = COALESCE(excluded.web_status, orders.web_status),
discount_split = COALESCE(excluded.discount_split, orders.discount_split),
updated_at = datetime('now')
"""
Each dict must have: sync_run_id, order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus (list|None), items_count,
shipping_name, billing_name, payment_method, delivery_method, status_at_run,
items (list of item dicts), delivery_cost (optional), discount_total (optional),
web_status (optional).
def _orders_row(d: dict) -> tuple:
return (
d["order_number"], d["order_date"], d["customer_name"], d["status"],
d.get("id_comanda"), d.get("id_partener"), d.get("error_message"),
json.dumps(d["missing_skus"]) if d.get("missing_skus") else None,
d.get("items_count", 0), d["sync_run_id"],
d.get("shipping_name"), d.get("billing_name"),
d.get("payment_method"), d.get("delivery_method"),
d.get("order_total"),
d.get("delivery_cost"), d.get("discount_total"),
d.get("web_status"), d.get("discount_split"),
)
def _mark_malformed(d: dict, reason: str) -> dict:
"""Return a copy of d with status=MALFORMED, error_message set, items wiped.
Does NOT mutate caller's dict.
"""
out = dict(d)
out["status"] = OrderStatus.MALFORMED.value
out["error_message"] = reason
out["items"] = []
out["items_count"] = 0
out["missing_skus"] = None
return out
async def _insert_orders_only(db, orders: list[dict]):
"""Upsert only the `orders` + `sync_run_orders` rows (no items).
Used for MALFORMED fallback where we can't trust item data.
"""
if not orders:
return
await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
await db.executemany(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
[(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) for d in orders],
)
async def _insert_valid_batch(db, orders: list[dict]):
"""Happy-path batch insert: orders + sync_run_orders + order_items in bulk.
Caller wraps in a SAVEPOINT so a mid-batch failure rolls back to a clean
point and the per-order fallback can take over.
"""
if not orders:
return
await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
await db.executemany(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
[(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders],
)
all_items: list[tuple] = []
order_numbers_with_items: set = set()
for d in orders:
raw_items = d.get("items", [])
if not raw_items:
continue
order_numbers_with_items.add(d["order_number"])
for item in _dedup_items_by_sku(raw_items):
all_items.append((
d["order_number"],
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("baseprice"),
item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa"),
))
if all_items:
placeholders = ",".join("?" * len(order_numbers_with_items))
await db.execute(
f"DELETE FROM order_items WHERE order_number IN ({placeholders})",
tuple(order_numbers_with_items),
)
await db.executemany("""
INSERT INTO order_items
(order_number, sku, product_name, quantity, price, baseprice,
vat, mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", all_items)
async def _insert_single_order(db, d: dict):
"""Insert one order + its sync_run_orders row + its items.
Caller wraps in SAVEPOINT so a per-row failure doesn't poison the batch.
"""
await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d))
await db.execute(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
(d["sync_run_id"], d["order_number"], d["status_at_run"]),
)
raw_items = d.get("items", [])
if raw_items:
await db.execute("DELETE FROM order_items WHERE order_number = ?", (d["order_number"],))
await db.executemany("""
INSERT INTO order_items
(order_number, sku, product_name, quantity, price, baseprice,
vat, mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
(d["order_number"],
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("baseprice"),
item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa"))
for item in _dedup_items_by_sku(raw_items)
])
async def save_orders_batch(orders_data: list[dict]):
"""Batch save orders + sync_run_orders + order_items with per-order isolation.
Three-tier strategy:
1. Pre-validate with validate_structural — malformed rows persist as
MALFORMED + error_message, no items, and do not participate in the
valid batch.
2. Optimistic executemany for the remaining valid rows (happy path).
3. On IntegrityError / ValueError / TypeError during the batch, fall back
to per-order SAVEPOINTs so a single bad row doesn't block the rest.
Per-order failures are marked MALFORMED + logged to the permanent
error-history file.
Re-raises OperationalError / OSError / ConnectionError / MemoryError at
the top level — the scheduler interprets these as halt signals. A
mid-loop ROLLBACK failure triggers a commit-before-reconnect so any
successfully-inserted work (including MALFORMED entries) persists.
"""
if not orders_data:
return
import sqlite3
# Structural pre-flight — local import avoids the services package cycle
from .validation_service import validate_structural
valid: list[dict] = []
malformed: list[dict] = []
for d in orders_data:
ok, err_type, err_msg = validate_structural(d)
if ok:
valid.append(d)
else:
fixed = _mark_malformed(d, f"{err_type}: {err_msg}")
malformed.append(fixed)
_log_order_error_history(fixed["order_number"], fixed["error_message"])
db = await get_sqlite()
try:
# 1. Upsert orders
await db.executemany("""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus, items_count,
last_sync_run_id, shipping_name, billing_name,
payment_method, delivery_method, order_total,
delivery_cost, discount_total, web_status, discount_split)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET
customer_name = excluded.customer_name,
status = CASE
WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
THEN orders.status
ELSE excluded.status
END,
error_message = excluded.error_message,
missing_skus = excluded.missing_skus,
items_count = excluded.items_count,
id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
id_partener = COALESCE(excluded.id_partener, orders.id_partener),
times_skipped = CASE WHEN excluded.status = 'SKIPPED'
THEN orders.times_skipped + 1
ELSE orders.times_skipped END,
last_sync_run_id = excluded.last_sync_run_id,
shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
billing_name = COALESCE(excluded.billing_name, orders.billing_name),
payment_method = COALESCE(excluded.payment_method, orders.payment_method),
delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
order_total = COALESCE(excluded.order_total, orders.order_total),
delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
discount_total = COALESCE(excluded.discount_total, orders.discount_total),
web_status = COALESCE(excluded.web_status, orders.web_status),
discount_split = COALESCE(excluded.discount_split, orders.discount_split),
updated_at = datetime('now')
""", [
(d["order_number"], d["order_date"], d["customer_name"], d["status"],
d.get("id_comanda"), d.get("id_partener"), d.get("error_message"),
json.dumps(d["missing_skus"]) if d.get("missing_skus") else None,
d.get("items_count", 0), d["sync_run_id"],
d.get("shipping_name"), d.get("billing_name"),
d.get("payment_method"), d.get("delivery_method"),
d.get("order_total"),
d.get("delivery_cost"), d.get("discount_total"),
d.get("web_status"), d.get("discount_split"))
for d in orders_data
])
if malformed:
await _insert_orders_only(db, malformed)
# 2. Sync run orders
await db.executemany("""
INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
VALUES (?, ?, ?)
""", [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders_data])
if valid:
# Savepoint around the batch — lets us roll back to a clean point
# and take the per-order path without losing the MALFORMED rows
# inserted above.
await db.execute("SAVEPOINT batch")
try:
await _insert_valid_batch(db, valid)
await db.execute("RELEASE SAVEPOINT batch")
except (sqlite3.IntegrityError, ValueError, TypeError) as batch_err:
logger.warning(f"save_orders_batch: batch insert failed, falling back per-order: {batch_err}")
try:
await db.execute("ROLLBACK TO SAVEPOINT batch")
await db.execute("RELEASE SAVEPOINT batch")
except sqlite3.OperationalError:
# Savepoint rollback itself failed — commit whatever survived,
# reconnect, continue from scratch on the valid list.
db = await _safe_reconnect(db)
# 3. Order items — replace semantics (GoMag source of truth).
# Dedup per-order by SKU (GoMag sometimes returns same SKU twice).
all_items = []
order_numbers_with_items = set()
for d in orders_data:
raw_items = d.get("items", [])
if not raw_items:
continue
order_numbers_with_items.add(d["order_number"])
for item in _dedup_items_by_sku(raw_items):
all_items.append((
d["order_number"],
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("baseprice"),
item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa")
))
if all_items:
placeholders = ",".join("?" * len(order_numbers_with_items))
await db.execute(
f"DELETE FROM order_items WHERE order_number IN ({placeholders})",
tuple(order_numbers_with_items)
)
await db.executemany("""
INSERT INTO order_items
(order_number, sku, product_name, quantity, price, baseprice,
vat, mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", all_items)
for d in valid:
try:
await db.execute("SAVEPOINT ord")
await _insert_single_order(db, d)
await db.execute("RELEASE SAVEPOINT ord")
except (sqlite3.IntegrityError, ValueError, TypeError) as per_err:
reason = f"RUNTIME: {type(per_err).__name__}: {per_err}"
try:
await db.execute("ROLLBACK TO SAVEPOINT ord")
await db.execute("RELEASE SAVEPOINT ord")
await _insert_orders_only(db, [_mark_malformed(d, reason)])
_log_order_error_history(d["order_number"], reason)
except sqlite3.OperationalError:
reason = f"RUNTIME (post-reconnect): {type(per_err).__name__}: {per_err}"
db = await _safe_reconnect(db)
await _insert_orders_only(db, [_mark_malformed(d, reason)])
_log_order_error_history(d["order_number"], reason)
await db.commit()
finally:
try:
await db.close()
except Exception:
pass
async def _safe_reconnect(db):
"""Commit whatever survived, close the broken connection, open a fresh one.
Called when a SAVEPOINT rollback itself raises OperationalError — the
connection is in a bad state and further work on it will fail. Commit
preserves the MALFORMED rows inserted before the explosion.
"""
try:
await db.commit()
except Exception:
pass
try:
await db.close()
except Exception:
pass
return await get_sqlite()
async def track_missing_sku(sku: str, product_name: str = "",
@@ -400,17 +611,17 @@ async def get_dashboard_stats():
db = await get_sqlite()
try:
cursor = await db.execute(
"SELECT COUNT(*) FROM orders WHERE status = 'IMPORTED'"
f"SELECT COUNT(*) FROM orders WHERE status = '{OrderStatus.IMPORTED.value}'"
)
imported = (await cursor.fetchone())[0]
cursor = await db.execute(
"SELECT COUNT(*) FROM orders WHERE status = 'SKIPPED'"
f"SELECT COUNT(*) FROM orders WHERE status = '{OrderStatus.SKIPPED.value}'"
)
skipped = (await cursor.fetchone())[0]
cursor = await db.execute(
"SELECT COUNT(*) FROM orders WHERE status = 'ERROR'"
f"SELECT COUNT(*) FROM orders WHERE status = '{OrderStatus.ERROR.value}'"
)
errors = (await cursor.fetchone())[0]
@@ -563,38 +774,128 @@ def _dedup_items_by_sku(items: list) -> list:
return order
async def _safe_upsert_order_items(db, order_number: str, items: list):
"""Replace order_items for one order inside a SAVEPOINT.
On IntegrityError / ValueError / TypeError: rolls back the savepoint,
tags the parent order MALFORMED, records the failure in the history
log, and returns False. On success returns True.
Caller is responsible for the outer connection lifecycle.
"""
import sqlite3
items = _dedup_items_by_sku(items) if items else []
await db.execute("SAVEPOINT items")
try:
await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,))
if items:
await db.executemany("""
INSERT INTO order_items
(order_number, sku, product_name, quantity, price, baseprice,
vat, mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
(order_number,
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("baseprice"),
item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa"))
for item in items
])
await db.execute("RELEASE SAVEPOINT items")
return True
except (sqlite3.IntegrityError, ValueError, TypeError) as err:
reason = f"ITEMS_FAIL: {type(err).__name__}: {err}"
try:
await db.execute("ROLLBACK TO SAVEPOINT items")
await db.execute("RELEASE SAVEPOINT items")
except sqlite3.OperationalError:
# Outer caller will handle reconnect — just log and bail.
logger.exception(f"_safe_upsert_order_items: rollback failed for {order_number}")
raise
# Tag parent order as MALFORMED without removing it from sync state.
await db.execute(
"UPDATE orders SET status = ?, error_message = ?, updated_at = datetime('now') WHERE order_number = ?",
(OrderStatus.MALFORMED.value, reason, order_number),
)
_log_order_error_history(order_number, reason)
return False
async def add_order_items(order_number: str, items: list):
"""Replace order items — delete any existing rows, then insert fresh batch.
GoMag is source of truth: re-import must reflect quantity changes.
Atomic (DELETE + INSERT in one transaction). Items with the same SKU are
merged (quantities summed) to satisfy the (order_number, sku) PK.
Wrapped in _safe_upsert_order_items so a bad payload marks the parent
order MALFORMED rather than exploding the sync.
"""
if not items:
return
items = _dedup_items_by_sku(items)
db = await get_sqlite()
try:
await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,))
await db.executemany("""
INSERT INTO order_items
(order_number, sku, product_name, quantity, price, baseprice,
vat, mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
(order_number,
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("baseprice"),
item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa"))
for item in items
])
await _safe_upsert_order_items(db, order_number, items)
await db.commit()
finally:
await db.close()
# ── sync phase failure tracking ───────────────────
async def record_phase_failure(run_id: str, phase: str, error_summary: str) -> None:
"""Insert a phase-failure marker and prune to the last 100 sync runs.
`error_summary` must be short (error_type + message) — no raw payload,
no PII. Used by _phase_wrap in sync_service to surface repeat failures
to the escalation check and the /api/sync/health dashboard pill.
"""
db = await get_sqlite()
try:
await db.execute(
"""INSERT OR REPLACE INTO sync_phase_failures (run_id, phase, error_summary)
VALUES (?, ?, ?)""",
(run_id, phase, error_summary[:500] if error_summary else None),
)
await db.execute("""
DELETE FROM sync_phase_failures
WHERE run_id NOT IN (
SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT 100
)
""")
await db.commit()
finally:
await db.close()
async def get_recent_phase_failures(limit: int = 3) -> dict[str, int]:
"""Return a {phase: failure_count} map across the last N sync runs.
Used by the escalation check (>=3 consecutive failures on the same
phase halts the next sync) and by /api/sync/health for the dashboard
pill.
"""
db = await get_sqlite()
try:
cursor = await db.execute(
"""
SELECT phase, COUNT(*) AS cnt
FROM sync_phase_failures
WHERE run_id IN (
SELECT run_id FROM sync_runs ORDER BY started_at DESC LIMIT ?
)
GROUP BY phase
""",
(limit,),
)
rows = await cursor.fetchall()
return {row[0]: row[1] for row in rows}
finally:
await db.close()
async def get_order_items(order_number: str) -> list:
"""Fetch items for one order."""
db = await get_sqlite()
@@ -694,11 +995,12 @@ async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0,
"counts": {
"imported": status_counts.get("IMPORTED", 0),
"skipped": status_counts.get("SKIPPED", 0),
"error": status_counts.get("ERROR", 0),
"already_imported": status_counts.get("ALREADY_IMPORTED", 0),
"cancelled": status_counts.get("CANCELLED", 0),
"imported": status_counts.get(OrderStatus.IMPORTED.value, 0),
"skipped": status_counts.get(OrderStatus.SKIPPED.value, 0),
"error": status_counts.get(OrderStatus.ERROR.value, 0),
"already_imported": status_counts.get(OrderStatus.ALREADY_IMPORTED.value, 0),
"cancelled": status_counts.get(OrderStatus.CANCELLED.value, 0),
"malformed": status_counts.get(OrderStatus.MALFORMED.value, 0),
"total": sum(status_counts.values())
}
}
@@ -738,8 +1040,8 @@ async def get_orders(page: int = 1, per_page: int = 50,
data_params = list(base_params)
if status_filter and status_filter not in ("all", "UNINVOICED"):
if status_filter.upper() == "IMPORTED":
data_clauses.append("UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')")
if status_filter.upper() == OrderStatus.IMPORTED.value:
data_clauses.append(f"UPPER(status) IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')")
elif status_filter.upper() == "DIFFS":
data_clauses.append(
"(anaf_cod_fiscal_adjusted = 1 OR anaf_denumire_mismatch = 1"
@@ -785,7 +1087,7 @@ async def get_orders(page: int = 1, per_page: int = 50,
# Uninvoiced count: IMPORTED/ALREADY_IMPORTED with no cached invoice, same period+search
uninv_clauses = list(base_clauses) + [
"UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')",
f"UPPER(status) IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')",
"(factura_numar IS NULL OR factura_numar = '')",
]
uninv_where = "WHERE " + " AND ".join(uninv_clauses)
@@ -794,7 +1096,7 @@ async def get_orders(page: int = 1, per_page: int = 50,
# Uninvoiced > 3 days old
uninv_old_clauses = list(base_clauses) + [
"UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')",
f"UPPER(status) IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')",
"(factura_numar IS NULL OR factura_numar = '')",
"order_date < datetime('now', '-3 days')",
]
@@ -828,12 +1130,13 @@ async def get_orders(page: int = 1, per_page: int = 50,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0,
"counts": {
"imported": status_counts.get("IMPORTED", 0),
"already_imported": status_counts.get("ALREADY_IMPORTED", 0),
"imported_all": status_counts.get("IMPORTED", 0) + status_counts.get("ALREADY_IMPORTED", 0),
"skipped": status_counts.get("SKIPPED", 0),
"error": status_counts.get("ERROR", 0),
"cancelled": status_counts.get("CANCELLED", 0),
"imported": status_counts.get(OrderStatus.IMPORTED.value, 0),
"already_imported": status_counts.get(OrderStatus.ALREADY_IMPORTED.value, 0),
"imported_all": status_counts.get(OrderStatus.IMPORTED.value, 0) + status_counts.get(OrderStatus.ALREADY_IMPORTED.value, 0),
"skipped": status_counts.get(OrderStatus.SKIPPED.value, 0),
"error": status_counts.get(OrderStatus.ERROR.value, 0),
"cancelled": status_counts.get(OrderStatus.CANCELLED.value, 0),
"malformed": status_counts.get(OrderStatus.MALFORMED.value, 0),
"total": sum(status_counts.values()),
"uninvoiced_sqlite": uninvoiced_sqlite,
"uninvoiced_old": uninvoiced_old,
@@ -869,9 +1172,9 @@ async def get_uninvoiced_imported_orders() -> list:
"""Get all imported orders that don't yet have invoice data cached."""
db = await get_sqlite()
try:
cursor = await db.execute("""
cursor = await db.execute(f"""
SELECT order_number, id_comanda FROM orders
WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
WHERE status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')
AND id_comanda IS NOT NULL
AND factura_numar IS NULL
""")
@@ -923,9 +1226,9 @@ async def get_invoiced_imported_orders() -> list:
"""Get imported orders that HAVE cached invoice data (for re-verification)."""
db = await get_sqlite()
try:
cursor = await db.execute("""
cursor = await db.execute(f"""
SELECT order_number, id_comanda FROM orders
WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
WHERE status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')
AND id_comanda IS NOT NULL
AND factura_numar IS NOT NULL AND factura_numar != ''
""")
@@ -939,9 +1242,9 @@ async def get_all_imported_orders() -> list:
"""Get ALL imported orders with id_comanda (for checking if deleted in ROA)."""
db = await get_sqlite()
try:
cursor = await db.execute("""
cursor = await db.execute(f"""
SELECT order_number, id_comanda FROM orders
WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
WHERE status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')
AND id_comanda IS NOT NULL
""")
rows = await cursor.fetchall()
@@ -976,9 +1279,9 @@ async def mark_order_deleted_in_roa(order_number: str):
db = await get_sqlite()
try:
await db.execute("DELETE FROM order_items WHERE order_number = ?", (order_number,))
await db.execute("""
await db.execute(f"""
UPDATE orders SET
status = 'DELETED_IN_ROA',
status = '{OrderStatus.DELETED_IN_ROA.value}',
id_comanda = NULL,
id_partener = NULL,
factura_serie = NULL,
@@ -1001,9 +1304,9 @@ async def mark_order_cancelled(order_number: str, web_status: str = "Anulata"):
"""Mark an order as cancelled from GoMag. Clears id_comanda and invoice cache."""
db = await get_sqlite()
try:
await db.execute("""
await db.execute(f"""
UPDATE orders SET
status = 'CANCELLED',
status = '{OrderStatus.CANCELLED.value}',
id_comanda = NULL,
id_partener = NULL,
factura_serie = NULL,
@@ -1055,11 +1358,11 @@ async def get_skipped_orders_with_sku(sku: str) -> list[str]:
"""Get order_numbers of SKIPPED orders that contain the given SKU."""
db = await get_sqlite()
try:
cursor = await db.execute("""
cursor = await db.execute(f"""
SELECT DISTINCT oi.order_number
FROM order_items oi
JOIN orders o ON o.order_number = oi.order_number
WHERE oi.sku = ? AND o.status = 'SKIPPED'
WHERE oi.sku = ? AND o.status = '{OrderStatus.SKIPPED.value}'
""", (sku,))
rows = await cursor.fetchall()
return [row[0] for row in rows]
@@ -1314,7 +1617,7 @@ async def get_orders_missing_anaf() -> list[dict]:
WHERE cod_fiscal_roa IS NOT NULL
AND cod_fiscal_roa != ''
AND anaf_platitor_tva IS NULL
AND status IN ('IMPORTED', 'ALREADY_IMPORTED')
AND status IN ('{OrderStatus.IMPORTED.value}', '{OrderStatus.ALREADY_IMPORTED.value}')
""")
rows = await cursor.fetchall()
return [dict(r) for r in rows]

View File

@@ -2,10 +2,22 @@ import asyncio
import json
import logging
import re
import sqlite3
import uuid
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
# Data-level errors that a single phase may raise without halting the whole
# sync. Everything NOT in this tuple (OperationalError, OSError,
# ConnectionError, MemoryError) propagates and halts.
DATA_ERRORS = (sqlite3.IntegrityError, ValueError, TypeError, UnicodeError)
# Number of recent runs inspected by the escalation check. 3 consecutive
# failures on the same phase halts the next sync.
_ESCALATION_WINDOW = 3
_ESCALATION_THRESHOLD = 3
_tz_bucharest = ZoneInfo("Europe/Bucharest")
@@ -16,6 +28,7 @@ def _now():
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client, anaf_service
from ..config import settings
from .. import database
from ..constants import OrderStatus
logger = logging.getLogger(__name__)
@@ -84,6 +97,38 @@ def _log_line(run_id: str, message: str):
_run_logs[run_id].append(f"[{ts}] {message}")
async def _record_phase_err(run_id: str, phase: str, err: Exception) -> None:
"""Log + persist a phase-level data error so escalation + health can see it.
Called only for DATA_ERRORS (structural / data problems). OperationalError
and OS-level errors bypass this and halt the sync.
"""
logger.error(f"[{run_id}] Phase {phase} data error: {err}", exc_info=True)
_log_line(run_id, f"FAZA {phase} eroare izolata: {type(err).__name__}: {err}")
try:
summary = f"{type(err).__name__}: {err}"[:500]
await sqlite_service.record_phase_failure(run_id, phase, summary)
except Exception as rec_err:
logger.warning(f"record_phase_failure failed for phase={phase}: {rec_err}")
async def _check_escalation() -> tuple[str | None, dict[str, int]]:
"""Return (phase_to_halt_on, recent_counts).
If any phase has >= _ESCALATION_THRESHOLD failures across the last
_ESCALATION_WINDOW runs, we halt the incoming sync and record
`halted_escalation` on sync_runs. Operators can still start the sync
manually from the dashboard override modal.
"""
try:
counts = await sqlite_service.get_recent_phase_failures(limit=_ESCALATION_WINDOW)
except Exception as e:
logger.warning(f"escalation check: failed to read phase failures: {e}")
return None, {}
escalating = [p for p, c in counts.items() if c >= _ESCALATION_THRESHOLD]
return (escalating[0] if escalating else None), counts
def get_run_text_log(run_id: str) -> str | None:
"""Return the accumulated text log for a run, or None if not found."""
lines = _run_logs.get(run_id)
@@ -166,20 +211,20 @@ async def _fix_stale_error_orders(existing_map: dict, run_id: str):
db = await get_sqlite()
try:
cursor = await db.execute(
"SELECT order_number FROM orders WHERE status = 'ERROR'"
f"SELECT order_number FROM orders WHERE status = '{OrderStatus.ERROR.value}'"
)
error_orders = [row["order_number"] for row in await cursor.fetchall()]
fixed = 0
for order_number in error_orders:
if order_number in existing_map:
id_comanda = existing_map[order_number]
await db.execute("""
await db.execute(f"""
UPDATE orders SET
status = 'ALREADY_IMPORTED',
status = '{OrderStatus.ALREADY_IMPORTED.value}',
id_comanda = ?,
error_message = NULL,
updated_at = datetime('now')
WHERE order_number = ? AND status = 'ERROR'
WHERE order_number = ? AND status = '{OrderStatus.ERROR.value}'
""", (id_comanda, order_number))
fixed += 1
_log_line(run_id, f"#{order_number} → status corectat ERROR → ALREADY_IMPORTED (ID: {id_comanda})")
@@ -224,6 +269,22 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
json_dir = settings.JSON_OUTPUT_DIR
# ── Escalation check — halt if a phase has failed 3 runs in a row ──
halt_phase, _recent_counts = await _check_escalation()
if halt_phase:
halt_msg = f"ESCALATED: phase {halt_phase} failed {_ESCALATION_THRESHOLD} consecutive runs"
_log_line(run_id, halt_msg)
await sqlite_service.create_sync_run(run_id, 0)
await sqlite_service.update_sync_run(
run_id, "halted_escalation", 0, 0, 0, 0, error_message=halt_msg
)
if _current_sync:
_current_sync["status"] = "halted_escalation"
_current_sync["finished_at"] = _now().isoformat()
_current_sync["error"] = halt_msg
_update_progress("halted_escalation", halt_msg)
return {"run_id": run_id, "status": "halted_escalation", "error": halt_msg}
try:
# Phase 0: Download orders from GoMag API
_update_progress("downloading", "Descărcare comenzi din GoMag API...")
@@ -293,7 +354,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
cancelled_batch.append({
"sync_run_id": run_id, "order_number": order.number,
"order_date": order.date, "customer_name": customer,
"status": "CANCELLED", "status_at_run": "CANCELLED",
"status": OrderStatus.CANCELLED.value, "status_at_run": OrderStatus.CANCELLED.value,
"id_comanda": None, "id_partener": None,
"error_message": "Comanda anulata in GoMag",
"missing_skus": None,
@@ -308,7 +369,10 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
})
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → ANULAT in GoMag")
await sqlite_service.save_orders_batch(cancelled_batch)
try:
await sqlite_service.save_orders_batch(cancelled_batch)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "cancelled_batch", e)
# Check if any cancelled orders were previously imported
from ..database import get_sqlite as _get_sqlite
@@ -320,7 +384,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
SELECT order_number, id_comanda FROM orders
WHERE order_number IN ({placeholders})
AND id_comanda IS NOT NULL
AND status = 'CANCELLED'
AND status = '{OrderStatus.CANCELLED.value}'
""", cancelled_numbers)
previously_imported = [dict(r) for r in await cursor.fetchall()]
finally:
@@ -610,6 +674,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
# Step 3a: Record already-imported orders (batch)
already_imported_count = len(already_in_roa)
already_batch = []
_already_phase_failed = False
for order in already_in_roa:
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
id_comanda_roa = existing_map.get(order.number)
@@ -624,7 +689,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
already_batch.append({
"sync_run_id": run_id, "order_number": order.number,
"order_date": order.date, "customer_name": customer,
"status": "ALREADY_IMPORTED", "status_at_run": "ALREADY_IMPORTED",
"status": OrderStatus.ALREADY_IMPORTED.value, "status_at_run": OrderStatus.ALREADY_IMPORTED.value,
"id_comanda": id_comanda_roa, "id_partener": None,
"error_message": None, "missing_skus": None,
"items_count": len(order.items),
@@ -637,7 +702,11 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
"items": order_items_data,
})
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})")
await sqlite_service.save_orders_batch(already_batch)
try:
await sqlite_service.save_orders_batch(already_batch)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "already_batch", e)
_already_phase_failed = True
# Update GoMag addresses + recompute address_mismatch for already-imported orders
addr_updates = []
@@ -647,10 +716,13 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
"adresa_livrare_gomag": json.dumps({"address": order.shipping.address, "city": order.shipping.city, "region": order.shipping.region}) if order.shipping else None,
"adresa_facturare_gomag": json.dumps({"address": order.billing.address, "city": order.billing.city, "region": order.billing.region}),
})
await sqlite_service.update_gomag_addresses_batch(addr_updates)
try:
await sqlite_service.update_gomag_addresses_batch(addr_updates)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "addresses_batch", e)
# Detect partner mismatches for already-imported orders
if already_in_roa:
if already_in_roa and not _already_phase_failed:
stored_partner_data = await sqlite_service.get_orders_partner_data_batch(
[o.number for o in already_in_roa]
)
@@ -736,7 +808,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
skipped_batch.append({
"sync_run_id": run_id, "order_number": order.number,
"order_date": order.date, "customer_name": customer,
"status": "SKIPPED", "status_at_run": "SKIPPED",
"status": OrderStatus.SKIPPED.value, "status_at_run": OrderStatus.SKIPPED.value,
"id_comanda": None, "id_partener": None,
"error_message": None, "missing_skus": missing_skus,
"items_count": len(order.items),
@@ -749,7 +821,10 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
"items": order_items_data,
})
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
await sqlite_service.save_orders_batch(skipped_batch)
try:
await sqlite_service.save_orders_batch(skipped_batch)
except DATA_ERRORS as e:
await _record_phase_err(run_id, "skipped_batch", e)
# ── Price sync from orders ──
if app_settings.get("price_sync_enabled") == "1":
@@ -768,6 +843,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
_log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate")
for pu in price_updates:
_log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f}{pu['new_price']:.2f}")
except DATA_ERRORS as e:
await _record_phase_err(run_id, "price_sync", e)
except Exception as e:
_log_line(run_id, f"Eroare sync prețuri din comenzi: {e}")
logger.error(f"Price sync error: {e}")
@@ -901,7 +978,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
order_number=order.number,
order_date=order.date,
customer_name=customer,
status="IMPORTED",
status=OrderStatus.IMPORTED.value,
id_comanda=result["id_comanda"],
id_partener=result["id_partener"],
items_count=len(order.items),
@@ -915,7 +992,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
web_status=order.status or None,
discount_split=discount_split_json,
)
await sqlite_service.add_sync_run_order(run_id, order.number, "IMPORTED")
await sqlite_service.add_sync_run_order(run_id, order.number, OrderStatus.IMPORTED.value)
# Store ROA address IDs (R9)
await sqlite_service.update_import_order_addresses(
order.number,
@@ -968,7 +1045,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
order_number=order.number,
order_date=order.date,
customer_name=customer,
status="ERROR",
status=OrderStatus.ERROR.value,
id_partener=result.get("id_partener"),
error_message=result["error"],
items_count=len(order.items),
@@ -982,7 +1059,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
web_status=order.status or None,
discount_split=discount_split_json,
)
await sqlite_service.add_sync_run_order(run_id, order.number, "ERROR")
await sqlite_service.add_sync_run_order(run_id, order.number, OrderStatus.ERROR.value)
await sqlite_service.add_order_items(order.number, order_items_data)
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")
@@ -1049,6 +1126,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
_log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache")
if orders_deleted:
_log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}")
except DATA_ERRORS as e:
await _record_phase_err(run_id, "invoice_check", e)
except Exception as e:
logger.warning(f"Invoice/order status check failed: {e}")
@@ -1099,6 +1178,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
await sqlite_service.bulk_update_order_anaf_data(db_updates)
if db_updates:
_log_line(run_id, f"ANAF backfill: {len(db_updates)}/{len(orders_needing_anaf)} comenzi actualizate")
except DATA_ERRORS as e:
await _record_phase_err(run_id, "anaf_backfill", e)
except Exception as e:
logger.warning(f"ANAF backfill failed: {e}")
_log_line(run_id, f"ANAF backfill eroare: {e}")

View File

@@ -1,11 +1,77 @@
import asyncio
import logging
from datetime import datetime
from .. import database
from . import sqlite_service
logger = logging.getLogger(__name__)
def validate_structural(order: dict) -> tuple[bool, str | None, str | None]:
"""Pre-flight structural validator used by save_orders_batch.
Returns (True, None, None) on pass, (False, error_type, error_msg) on fail.
Rules are intentionally minimal — only catches malformed payloads that
would crash downstream inserts. Semantic checks (SKU existence, price
comparison, etc.) are handled in later phases.
"""
if not isinstance(order, dict):
return False, "MISSING_FIELD", f"order is not a dict: {type(order).__name__}"
order_number = order.get("order_number")
if order_number is None or str(order_number).strip() == "":
return False, "MISSING_FIELD", "order_number is missing or empty"
raw_date = order.get("order_date")
if raw_date in (None, ""):
return False, "INVALID_DATE", "order_date is missing or empty"
if isinstance(raw_date, datetime):
pass
elif isinstance(raw_date, str):
parsed = None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
try:
parsed = datetime.strptime(raw_date, fmt)
break
except ValueError:
continue
if parsed is None:
try:
parsed = datetime.fromisoformat(raw_date.replace("Z", "+00:00"))
except ValueError:
return False, "INVALID_DATE", f"order_date not parseable: {raw_date!r}"
else:
return False, "INVALID_DATE", f"order_date wrong type: {type(raw_date).__name__}"
items = order.get("items")
if not items or not isinstance(items, list):
return False, "EMPTY_ITEMS", "items missing or not a non-empty list"
for idx, item in enumerate(items):
if not isinstance(item, dict):
return False, "EMPTY_ITEMS", f"item[{idx}] is not a dict"
qty_raw = item.get("quantity")
if qty_raw is None or qty_raw == "":
return False, "INVALID_QUANTITY", f"item[{idx}] quantity missing"
try:
qty = float(qty_raw)
except (TypeError, ValueError):
return False, "INVALID_QUANTITY", f"item[{idx}] quantity not numeric: {qty_raw!r}"
if qty <= 0:
return False, "INVALID_QUANTITY", f"item[{idx}] quantity not > 0: {qty}"
price_raw = item.get("price")
if price_raw is None or price_raw == "":
return False, "INVALID_PRICE", f"item[{idx}] price missing"
try:
float(price_raw)
except (TypeError, ValueError):
return False, "INVALID_PRICE", f"item[{idx}] price not numeric: {price_raw!r}"
return True, None, None
async def reconcile_unresolved_missing_skus(conn=None) -> dict:
"""Revalidate all resolved=0 SKUs in missing_skus against Oracle.
Fail-soft: logs warning and returns zero if Oracle is unavailable.

View File

@@ -495,6 +495,7 @@ input[type="checkbox"] {
.dot-yellow { background: var(--warning); box-shadow: 0 0 6px 2px rgba(202,138,4,0.3); }
.dot-red { background: var(--error); box-shadow: 0 0 8px 2px rgba(220,38,38,0.35); }
.dot-gray { background: var(--cancelled); }
.dot-orange { background: var(--compare); box-shadow: 0 0 8px 2px rgba(234,88,12,0.35); }
.dot-blue { background: var(--info); }
/* ── Flat row (mobile + desktop) ────────────────── */
@@ -805,6 +806,45 @@ tr.mapping-deleted td {
.sync-status-dot.running { background: var(--info); animation: pulse-dot 2s ease-in-out infinite; }
.sync-status-dot.completed { background: var(--success); }
.sync-status-dot.failed { background: var(--error); }
.sync-status-dot.malformed { background: var(--compare); box-shadow: 0 0 8px 2px rgba(234,88,12,0.35); }
/* ── Sync health pill (dashboard only) ────────────── */
.health-pill {
display: inline-flex;
align-items: center;
gap: 0.375rem;
padding: 0.375rem 0.625rem;
min-height: 32px;
border-radius: 999px;
font-size: 0.8125rem;
line-height: 1;
font-weight: 500;
border: 1px solid transparent;
cursor: default;
user-select: none;
transition: background 120ms ease;
}
.health-pill i { font-size: 0.9375rem; line-height: 1; }
.health-pill.healthy {
background: var(--success-light);
color: var(--success-text);
border-color: var(--success);
}
.health-pill.warning {
background: var(--warning-light);
color: var(--warning-text);
border-color: var(--warning);
}
.health-pill.escalated {
background: var(--error-light);
color: var(--error-text);
border-color: var(--error);
box-shadow: 0 0 8px 2px rgba(220,38,38,0.35);
}
/* Ensure adequate touch target on mobile */
@media (max-width: 600px) {
.health-pill { min-height: 44px; padding: 0.5rem 0.75rem; }
}
/* ── Custom period range inputs ──────────────────── */
.period-custom-range {

View File

@@ -162,9 +162,76 @@ document.addEventListener('DOMContentLoaded', () => {
});
});
// ── Sync Health pill ─────────────────────────────
let _lastHealth = null;
async function pollSyncHealth() {
try {
const data = await fetchJSON('/api/sync/health');
_lastHealth = data;
renderHealthPill(data);
} catch (e) { /* fail-soft: keep last state */ }
}
function renderHealthPill(h) {
const pill = document.getElementById('syncHealthPill');
if (!pill) return;
const icon = pill.querySelector('i');
const label = pill.querySelector('.health-pill-label');
let state = 'healthy', iconCls = 'bi-check-circle-fill', text = 'Sanatos', tooltip;
const recent = h.recent_phase_failures || {};
const recentCount = Object.values(recent).reduce((a, b) => a + (b || 0), 0);
if (h.escalation_phase || h.last_sync_status === 'halted_escalation') {
state = 'escalated';
iconCls = 'bi-x-octagon-fill';
text = 'Blocat';
tooltip = `Blocat — faza "${h.escalation_phase || '?'}" a esuat 3 sync-uri consecutive.\n`
+ `Ultima eroare: ${h.last_halt_reason || '—'}\n`
+ `Click Start Sync pentru override manual.`;
} else if (h.last_sync_status === 'failed' || recentCount > 0) {
state = 'warning';
iconCls = 'bi-exclamation-triangle-fill';
text = 'Atentie';
const topPhases = Object.entries(recent).slice(0, 3)
.map(([p, c]) => `${p} (${c} of last 3)`).join(', ');
tooltip = `Atentie — ${topPhases || 'sync anterior esuat'}`
+ (h.last_halt_reason ? `\nLast error: ${h.last_halt_reason}` : '');
} else {
const lastAt = h.last_sync_at ? h.last_sync_at.replace('T', ' ').slice(5, 16) : 'nicio rulare';
tooltip = `Sanatos — ultimul sync: ${lastAt}`;
}
pill.className = 'health-pill ' + state;
pill.setAttribute('aria-label', `Sync: ${text}`);
pill.title = tooltip;
if (icon) icon.className = 'bi ' + iconCls;
if (label) label.textContent = text;
}
function startHealthPolling() {
pollSyncHealth();
setInterval(pollSyncHealth, 10000);
}
document.addEventListener('DOMContentLoaded', startHealthPolling);
// ── Sync Controls ─────────────────────────────────
async function startSync() {
// Escalation override — confirm before overriding the auto-halt
if (_lastHealth && (_lastHealth.escalation_phase
|| _lastHealth.last_sync_status === 'halted_escalation')) {
const phase = _lastHealth.escalation_phase || '?';
const reason = _lastHealth.last_halt_reason || '(unknown)';
const msg = `⚠ Sync blocat automat\n\n`
+ `Faza "${phase}" a esuat in ultimele 3 sync-uri consecutive.\n`
+ `Ultima eroare: ${reason}\n\n`
+ `Repornesti oricum?`;
if (!confirm(msg)) return;
}
try {
const res = await fetch('/api/sync/start', { method: 'POST' });
const data = await res.json();
@@ -174,6 +241,7 @@ async function startSync() {
}
// Polling will detect the running state — just speed it up immediately
pollSyncStatus();
pollSyncHealth();
} catch (err) {
alert('Eroare: ' + err.message);
}
@@ -329,6 +397,7 @@ async function loadDashOrders() {
if (el('cntFact')) el('cntFact').textContent = c.facturate || 0;
if (el('cntNef')) el('cntNef').textContent = c.nefacturate || c.uninvoiced || 0;
if (el('cntCanc')) el('cntCanc').textContent = c.cancelled || 0;
if (el('cntMal')) el('cntMal').textContent = c.malformed || 0;
if (el('cntDiff')) el('cntDiff').textContent = c.diffs || 0;
// Attention card
@@ -376,7 +445,7 @@ async function loadDashOrders() {
<td class="text-end text-muted">${fmtCost(o.delivery_cost)}</td>
<td class="text-end text-muted">${fmtCost(o.discount_total)}</td>
<td class="text-end fw-bold">${orderTotal}</td>
<td class="kebab-dropdown" onclick="event.stopPropagation()">${(o.status === 'IMPORTED' || o.status === 'ALREADY_IMPORTED') && !(o.invoice && o.invoice.facturat) ? '<div class="dropdown"><button class="btn btn-sm border-0" aria-label="Actiuni comanda" data-bs-toggle="dropdown"><i class="bi bi-three-dots-vertical"></i></button><ul class="dropdown-menu dropdown-menu-end"><li><button class="dropdown-item" onclick="dashResyncOrder(\'' + esc(o.order_number) + '\', this)"><i class="bi bi-arrow-repeat me-2"></i>Resync</button></li><li><button class="dropdown-item text-danger" onclick="dashDeleteOrder(\'' + esc(o.order_number) + '\', this)"><i class="bi bi-trash me-2"></i>Sterge din ROA</button></li></ul></div>' : ''}</td>
<td class="kebab-dropdown" onclick="event.stopPropagation()">${(o.status === ORDER_STATUS.IMPORTED || o.status === ORDER_STATUS.ALREADY_IMPORTED) && !(o.invoice && o.invoice.facturat) ? '<div class="dropdown"><button class="btn btn-sm border-0" aria-label="Actiuni comanda" data-bs-toggle="dropdown"><i class="bi bi-three-dots-vertical"></i></button><ul class="dropdown-menu dropdown-menu-end"><li><button class="dropdown-item" onclick="dashResyncOrder(\'' + esc(o.order_number) + '\', this)"><i class="bi bi-arrow-repeat me-2"></i>Resync</button></li><li><button class="dropdown-item text-danger" onclick="dashDeleteOrder(\'' + esc(o.order_number) + '\', this)"><i class="bi bi-trash me-2"></i>Sterge din ROA</button></li></ul></div>' : ''}</td>
</tr>`;
}).join('');
}
@@ -409,12 +478,13 @@ async function loadDashOrders() {
// Mobile segmented control
renderMobileSegmented('dashMobileSeg', [
{ label: 'Toate', count: c.total || 0, value: 'all', active: (activeStatus || 'all') === 'all', colorClass: 'fc-neutral' },
{ label: 'Imp.', count: c.imported_all || c.imported || 0, value: 'IMPORTED', active: activeStatus === 'IMPORTED', colorClass: 'fc-green' },
{ label: 'Omise', count: c.skipped || 0, value: 'SKIPPED', active: activeStatus === 'SKIPPED', colorClass: 'fc-yellow' },
{ label: 'Erori', count: c.error || c.errors || 0, value: 'ERROR', active: activeStatus === 'ERROR', colorClass: 'fc-red' },
{ label: 'Imp.', count: c.imported_all || c.imported || 0, value: ORDER_STATUS.IMPORTED, active: activeStatus === ORDER_STATUS.IMPORTED, colorClass: 'fc-green' },
{ label: 'Omise', count: c.skipped || 0, value: ORDER_STATUS.SKIPPED, active: activeStatus === ORDER_STATUS.SKIPPED, colorClass: 'fc-yellow' },
{ label: 'Erori', count: c.error || c.errors || 0, value: ORDER_STATUS.ERROR, active: activeStatus === ORDER_STATUS.ERROR, colorClass: 'fc-red' },
{ label: 'Fact.', count: c.facturate || 0, value: 'INVOICED', active: activeStatus === 'INVOICED', colorClass: 'fc-green' },
{ label: 'Nefact.', count: c.nefacturate || c.uninvoiced || 0, value: 'UNINVOICED', active: activeStatus === 'UNINVOICED', colorClass: 'fc-red' },
{ label: 'Anulate', count: c.cancelled || 0, value: 'CANCELLED', active: activeStatus === 'CANCELLED', colorClass: 'fc-dark' },
{ label: 'Anulate', count: c.cancelled || 0, value: ORDER_STATUS.CANCELLED, active: activeStatus === ORDER_STATUS.CANCELLED, colorClass: 'fc-dark' },
{ label: 'Def.', count: c.malformed || 0, value: ORDER_STATUS.MALFORMED, active: activeStatus === ORDER_STATUS.MALFORMED, colorClass: 'fc-orange' },
{ label: 'Dif.', count: c.diffs || 0, value: 'DIFFS', active: activeStatus === 'DIFFS', colorClass: 'fc-orange' }
], (val) => {
document.querySelectorAll('.filter-pill[data-status]').forEach(b => b.classList.remove('active'));
@@ -496,10 +566,10 @@ function escHtml(s) {
function statusLabelText(status) {
switch ((status || '').toUpperCase()) {
case 'IMPORTED': return 'Importat';
case 'ALREADY_IMPORTED': return 'Deja imp.';
case 'SKIPPED': return 'Omis';
case 'ERROR': return 'Eroare';
case ORDER_STATUS.IMPORTED: return 'Importat';
case ORDER_STATUS.ALREADY_IMPORTED: return 'Deja imp.';
case ORDER_STATUS.SKIPPED: return 'Omis';
case ORDER_STATUS.ERROR: return 'Eroare';
default: return esc(status);
}
}
@@ -523,7 +593,7 @@ function diffDots(o, mobile) {
}
function invoiceDot(order) {
if (order.status !== 'IMPORTED' && order.status !== 'ALREADY_IMPORTED') return '';
if (order.status !== ORDER_STATUS.IMPORTED && order.status !== ORDER_STATUS.ALREADY_IMPORTED) return '';
if (order.invoice && order.invoice.facturat) return '<span class="dot dot-green" style="box-shadow:none" title="Facturat"></span>';
return '<span class="dot dot-red" style="box-shadow:none" title="Nefacturat"></span>';
}

View File

@@ -28,10 +28,10 @@ function runStatusBadge(status) {
function logStatusText(status) {
switch ((status || '').toUpperCase()) {
case 'IMPORTED': return 'Importat';
case 'ALREADY_IMPORTED': return 'Deja imp.';
case 'SKIPPED': return 'Omis';
case 'ERROR': return 'Eroare';
case ORDER_STATUS.IMPORTED: return 'Importat';
case ORDER_STATUS.ALREADY_IMPORTED: return 'Deja imp.';
case ORDER_STATUS.SKIPPED: return 'Omis';
case ORDER_STATUS.ERROR: return 'Eroare';
default: return esc(status);
}
}
@@ -137,6 +137,8 @@ async function loadRunOrders(runId, statusFilter, page) {
document.getElementById('countError').textContent = counts.error || 0;
const alreadyEl = document.getElementById('countAlreadyImported');
if (alreadyEl) alreadyEl.textContent = counts.already_imported || 0;
const malEl = document.getElementById('countMalformed');
if (malEl) malEl.textContent = counts.malformed || 0;
const tbody = document.getElementById('runOrdersBody');
const orders = data.orders || [];
@@ -144,9 +146,9 @@ async function loadRunOrders(runId, statusFilter, page) {
if (orders.length === 0) {
tbody.innerHTML = '<tr><td colspan="9" class="text-center text-muted py-3">Nicio comanda</td></tr>';
} else {
const problemOrders = orders.filter(o => ['ERROR', 'SKIPPED'].includes(o.status));
const okOrders = orders.filter(o => ['IMPORTED', 'ALREADY_IMPORTED'].includes(o.status));
const otherOrders = orders.filter(o => !['ERROR', 'SKIPPED', 'IMPORTED', 'ALREADY_IMPORTED'].includes(o.status));
const problemOrders = orders.filter(o => [ORDER_STATUS.ERROR, ORDER_STATUS.SKIPPED].includes(o.status));
const okOrders = orders.filter(o => [ORDER_STATUS.IMPORTED, ORDER_STATUS.ALREADY_IMPORTED].includes(o.status));
const otherOrders = orders.filter(o => ![ORDER_STATUS.ERROR, ORDER_STATUS.SKIPPED, ORDER_STATUS.IMPORTED, ORDER_STATUS.ALREADY_IMPORTED].includes(o.status));
function orderRow(o, i) {
const dateStr = fmtDate(o.order_date);
@@ -195,9 +197,9 @@ async function loadRunOrders(runId, statusFilter, page) {
if (orders.length === 0) {
mobileList.innerHTML = '<div class="flat-row text-muted py-3 justify-content-center">Nicio comanda</div>';
} else {
const problemOrders = orders.filter(o => ['ERROR', 'SKIPPED'].includes(o.status));
const okOrders = orders.filter(o => ['IMPORTED', 'ALREADY_IMPORTED'].includes(o.status));
const otherOrders = orders.filter(o => !['ERROR', 'SKIPPED', 'IMPORTED', 'ALREADY_IMPORTED'].includes(o.status));
const problemOrders = orders.filter(o => [ORDER_STATUS.ERROR, ORDER_STATUS.SKIPPED].includes(o.status));
const okOrders = orders.filter(o => [ORDER_STATUS.IMPORTED, ORDER_STATUS.ALREADY_IMPORTED].includes(o.status));
const otherOrders = orders.filter(o => ![ORDER_STATUS.ERROR, ORDER_STATUS.SKIPPED, ORDER_STATUS.IMPORTED, ORDER_STATUS.ALREADY_IMPORTED].includes(o.status));
function mobileRow(o) {
const d = o.order_date || '';
@@ -235,10 +237,11 @@ async function loadRunOrders(runId, statusFilter, page) {
// Mobile segmented control
renderMobileSegmented('logsMobileSeg', [
{ label: 'Toate', count: counts.total || 0, value: 'all', active: currentFilter === 'all', colorClass: 'fc-neutral' },
{ label: 'Imp.', count: counts.imported || 0, value: 'IMPORTED', active: currentFilter === 'IMPORTED', colorClass: 'fc-green' },
{ label: 'Deja', count: counts.already_imported || 0, value: 'ALREADY_IMPORTED', active: currentFilter === 'ALREADY_IMPORTED', colorClass: 'fc-blue' },
{ label: 'Omise', count: counts.skipped || 0, value: 'SKIPPED', active: currentFilter === 'SKIPPED', colorClass: 'fc-yellow' },
{ label: 'Erori', count: counts.error || 0, value: 'ERROR', active: currentFilter === 'ERROR', colorClass: 'fc-red' }
{ label: 'Imp.', count: counts.imported || 0, value: ORDER_STATUS.IMPORTED, active: currentFilter === ORDER_STATUS.IMPORTED, colorClass: 'fc-green' },
{ label: 'Deja', count: counts.already_imported || 0, value: ORDER_STATUS.ALREADY_IMPORTED, active: currentFilter === ORDER_STATUS.ALREADY_IMPORTED, colorClass: 'fc-blue' },
{ label: 'Omise', count: counts.skipped || 0, value: ORDER_STATUS.SKIPPED, active: currentFilter === ORDER_STATUS.SKIPPED, colorClass: 'fc-yellow' },
{ label: 'Erori', count: counts.error || 0, value: ORDER_STATUS.ERROR, active: currentFilter === ORDER_STATUS.ERROR, colorClass: 'fc-red' },
{ label: 'Defecte', count: counts.malformed || 0, value: ORDER_STATUS.MALFORMED, active: currentFilter === ORDER_STATUS.MALFORMED, colorClass: 'fc-orange' }
], (val) => filterOrders(val));
// Orders pagination

View File

@@ -11,6 +11,17 @@
};
})();
// ── Order status constants (mirror of Python OrderStatus enum) ────────────
const ORDER_STATUS = Object.freeze({
IMPORTED: 'IMPORTED',
ALREADY_IMPORTED: 'ALREADY_IMPORTED',
SKIPPED: 'SKIPPED',
ERROR: 'ERROR',
CANCELLED: 'CANCELLED',
DELETED_IN_ROA: 'DELETED_IN_ROA',
MALFORMED: 'MALFORMED',
});
// ── HTML escaping ─────────────────────────────────
function esc(s) {
if (s == null) return '';
@@ -503,12 +514,13 @@ function fmtNum(v) {
function orderStatusBadge(status) {
switch ((status || '').toUpperCase()) {
case 'IMPORTED': return '<span class="badge bg-success">Importat</span>';
case 'ALREADY_IMPORTED': return '<span class="badge bg-info">Deja importat</span>';
case 'SKIPPED': return '<span class="badge bg-warning">Omis</span>';
case 'ERROR': return '<span class="badge bg-danger">Eroare</span>';
case 'CANCELLED': return '<span class="badge bg-secondary">Anulat</span>';
case 'DELETED_IN_ROA': return '<span class="badge bg-dark">Sters din ROA</span>';
case ORDER_STATUS.IMPORTED: return '<span class="badge bg-success">Importat</span>';
case ORDER_STATUS.ALREADY_IMPORTED: return '<span class="badge bg-info">Deja importat</span>';
case ORDER_STATUS.SKIPPED: return '<span class="badge bg-warning">Omis</span>';
case ORDER_STATUS.ERROR: return '<span class="badge bg-danger">Eroare</span>';
case ORDER_STATUS.CANCELLED: return '<span class="badge bg-secondary">Anulat</span>';
case ORDER_STATUS.DELETED_IN_ROA: return '<span class="badge bg-dark">Sters din ROA</span>';
case ORDER_STATUS.MALFORMED: return '<span class="badge" style="background:var(--compare-light);color:var(--compare-text);border:1px solid var(--compare)">Defect</span>';
default: return `<span class="badge bg-secondary">${esc(status)}</span>`;
}
}
@@ -844,7 +856,7 @@ async function renderOrderDetailModal(orderNumber, opts) {
// Retry button (only for ERROR/SKIPPED orders)
const retryBtn = document.getElementById('detailRetryBtn');
if (retryBtn) {
const canRetry = ['ERROR', 'SKIPPED', 'DELETED_IN_ROA'].includes((order.status || '').toUpperCase());
const canRetry = [ORDER_STATUS.ERROR, ORDER_STATUS.SKIPPED, ORDER_STATUS.DELETED_IN_ROA].includes((order.status || '').toUpperCase());
retryBtn.style.display = canRetry ? '' : 'none';
if (canRetry) {
retryBtn.onclick = async () => {
@@ -879,7 +891,7 @@ async function renderOrderDetailModal(orderNumber, opts) {
// Resync button (IMPORTED/ALREADY_IMPORTED only)
const resyncBtn = document.getElementById('detailResyncBtn');
if (resyncBtn) {
const canResync = ['IMPORTED', 'ALREADY_IMPORTED'].includes((order.status || '').toUpperCase());
const canResync = [ORDER_STATUS.IMPORTED, ORDER_STATUS.ALREADY_IMPORTED].includes((order.status || '').toUpperCase());
resyncBtn.style.display = canResync ? '' : 'none';
if (canResync) {
const isInvoiced = !!(order.factura_numar);
@@ -930,7 +942,7 @@ async function renderOrderDetailModal(orderNumber, opts) {
// Delete button (IMPORTED/ALREADY_IMPORTED only)
const deleteBtn = document.getElementById('detailDeleteBtn');
if (deleteBtn) {
const canDelete = ['IMPORTED', 'ALREADY_IMPORTED'].includes((order.status || '').toUpperCase());
const canDelete = [ORDER_STATUS.IMPORTED, ORDER_STATUS.ALREADY_IMPORTED].includes((order.status || '').toUpperCase());
deleteBtn.style.display = canDelete ? '' : 'none';
if (canDelete) {
const isInvoiced = !!(order.factura_numar);
@@ -1015,20 +1027,22 @@ function inlineConfirmAction(btn, confirmText, actionFn, opts) {
// ── Dot helper ────────────────────────────────────
function statusDot(status) {
switch ((status || '').toUpperCase()) {
case 'IMPORTED':
case 'ALREADY_IMPORTED':
case ORDER_STATUS.IMPORTED:
case ORDER_STATUS.ALREADY_IMPORTED:
case 'COMPLETED':
case 'RESOLVED':
return '<span class="dot dot-green"></span>';
case 'SKIPPED':
case ORDER_STATUS.SKIPPED:
case 'UNRESOLVED':
case 'INCOMPLETE':
return '<span class="dot dot-yellow"></span>';
case 'ERROR':
case ORDER_STATUS.ERROR:
case 'FAILED':
return '<span class="dot dot-red"></span>';
case 'CANCELLED':
case 'DELETED_IN_ROA':
case ORDER_STATUS.MALFORMED:
return '<span class="dot dot-orange" title="Date defecte — escalat la GoMag"></span>';
case ORDER_STATUS.CANCELLED:
case ORDER_STATUS.DELETED_IN_ROA:
return '<span class="dot dot-gray"></span>';
default:
return '<span class="dot dot-gray"></span>';
@@ -1168,7 +1182,7 @@ function _renderHeaderInfo(order) {
}
// ERROR orders: muted dashes for ROA fields
if (order.status === 'ERROR' && !order.id_comanda) {
if (order.status === ORDER_STATUS.ERROR && !order.id_comanda) {
document.getElementById('detailIdComanda').innerHTML = '<span class="text-muted">\u2014</span>';
document.getElementById('detailIdPartener').innerHTML = '<span class="text-muted">\u2014</span>';
}

View File

@@ -19,7 +19,7 @@
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.2/font/bootstrap-icons.css" rel="stylesheet">
{% set rp = request.scope.get('root_path', '') %}
<link href="{{ rp }}/static/css/style.css?v=45" rel="stylesheet">
<link href="{{ rp }}/static/css/style.css?v=46" rel="stylesheet">
</head>
<body>
<!-- Top Navbar (hidden on mobile via CSS) -->
@@ -169,7 +169,7 @@
<script>window.ROOT_PATH = "{{ rp }}";</script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/js/bootstrap.bundle.min.js"></script>
<script src="{{ rp }}/static/js/shared.js?v=46"></script>
<script src="{{ rp }}/static/js/shared.js?v=47"></script>
<script>
// Dark mode toggle
function toggleDarkMode() {

View File

@@ -14,6 +14,11 @@
<div class="sync-card-controls">
<span id="syncStatusDot" class="sync-status-dot idle"></span>
<span id="syncStatusText" class="text-secondary">Inactiv</span>
<span id="syncHealthPill" class="health-pill healthy" role="status"
aria-label="Sync sanatos" title="Verificare stare sync">
<i class="bi bi-check-circle-fill" aria-hidden="true"></i>
<span class="health-pill-label">Sanatos</span>
</span>
<div class="d-flex align-items-center gap-2">
<label class="d-flex align-items-center gap-1 text-muted">
Auto:
@@ -70,12 +75,13 @@
<input type="search" id="orderSearch" placeholder="Cauta comanda, client..." class="search-input">
<!-- Status pills -->
<button class="filter-pill active d-none d-md-inline-flex" data-status="all">Toate <span class="filter-count fc-neutral" id="cntAll">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="IMPORTED">Importat <span class="filter-count fc-green" id="cntImp">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="SKIPPED">Omise <span class="filter-count fc-yellow" id="cntSkip">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="ERROR">Erori <span class="filter-count fc-red" id="cntErr">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="{{ OrderStatus.IMPORTED.value }}">Importat <span class="filter-count fc-green" id="cntImp">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="{{ OrderStatus.SKIPPED.value }}">Omise <span class="filter-count fc-yellow" id="cntSkip">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="{{ OrderStatus.ERROR.value }}">Erori <span class="filter-count fc-red" id="cntErr">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="INVOICED">Facturate <span class="filter-count fc-green" id="cntFact">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="UNINVOICED">Nefacturate <span class="filter-count fc-red" id="cntNef">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="CANCELLED">Anulate <span class="filter-count fc-dark" id="cntCanc">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="{{ OrderStatus.CANCELLED.value }}">Anulate <span class="filter-count fc-dark" id="cntCanc">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="{{ OrderStatus.MALFORMED.value }}">Defecte <span class="filter-count fc-orange" id="cntMal">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-status="DIFFS">Diferente <span class="filter-count fc-orange" id="cntDiff">0</span></button>
<button class="btn btn-sm btn-outline-secondary d-none d-md-inline-flex" id="btnRefreshInvoices" onclick="refreshInvoices()" title="Actualizeaza status facturi din Oracle">&#8635;</button>
</div>
@@ -115,5 +121,5 @@
{% endblock %}
{% block scripts %}
<script src="{{ request.scope.get('root_path', '') }}/static/js/dashboard.js?v=51"></script>
<script src="{{ request.scope.get('root_path', '') }}/static/js/dashboard.js?v=52"></script>
{% endblock %}

View File

@@ -59,10 +59,11 @@
<!-- Filter pills -->
<div class="filter-bar mb-3" id="orderFilterPills">
<button class="filter-pill active d-none d-md-inline-flex" data-log-status="all">Toate <span class="filter-count fc-neutral" id="countAll">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="IMPORTED">Importate <span class="filter-count fc-green" id="countImported">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="ALREADY_IMPORTED">Deja imp. <span class="filter-count fc-blue" id="countAlreadyImported">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="SKIPPED">Omise <span class="filter-count fc-yellow" id="countSkipped">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="ERROR">Erori <span class="filter-count fc-red" id="countError">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="{{ OrderStatus.IMPORTED.value }}">Importate <span class="filter-count fc-green" id="countImported">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="{{ OrderStatus.ALREADY_IMPORTED.value }}">Deja imp. <span class="filter-count fc-blue" id="countAlreadyImported">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="{{ OrderStatus.SKIPPED.value }}">Omise <span class="filter-count fc-yellow" id="countSkipped">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="{{ OrderStatus.ERROR.value }}">Erori <span class="filter-count fc-red" id="countError">0</span></button>
<button class="filter-pill d-none d-md-inline-flex" data-log-status="{{ OrderStatus.MALFORMED.value }}">Defecte <span class="filter-count fc-orange" id="countMalformed">0</span></button>
</div>
<div class="d-md-none mb-2" id="logsMobileSeg" style="overflow-x:auto"></div>
@@ -109,5 +110,5 @@
{% endblock %}
{% block scripts %}
<script src="{{ request.scope.get('root_path', '') }}/static/js/logs.js?v=15"></script>
<script src="{{ request.scope.get('root_path', '') }}/static/js/logs.js?v=16"></script>
{% endblock %}

View File

@@ -36,6 +36,7 @@ from unittest.mock import MagicMock, patch
from app.services.import_service import build_articles_json, compute_discount_split
from app.services.order_reader import OrderData, OrderItem
from app.constants import OrderStatus
# ---------------------------------------------------------------------------
@@ -857,14 +858,14 @@ class TestRefreshOrderAddress:
def test_null_address_ids_returns_422(self, client, db):
"""Orders without Oracle address IDs return 422."""
db.execute("INSERT OR IGNORE INTO orders (order_number, status) VALUES ('test-no-addr', 'IMPORTED')")
db.execute(f"INSERT OR IGNORE INTO orders (order_number, status) VALUES ('test-no-addr', '{OrderStatus.IMPORTED.value}')")
db.commit()
res = client.post("/api/orders/test-no-addr/refresh-address")
assert res.status_code == 422
def test_oracle_unavailable_returns_503(self, client, db, monkeypatch):
"""Oracle connection failure returns 503."""
db.execute("INSERT OR IGNORE INTO orders (order_number, status, id_adresa_livrare) VALUES ('test-oracle-fail', 'IMPORTED', 4116)")
db.execute(f"INSERT OR IGNORE INTO orders (order_number, status, id_adresa_livrare) VALUES ('test-oracle-fail', '{OrderStatus.IMPORTED.value}', 4116)")
db.commit()
import asyncio as _asyncio
@@ -878,7 +879,7 @@ class TestRefreshOrderAddress:
def test_refresh_returns_8_fields(self, client, db, monkeypatch):
"""Successful refresh returns 8-field address dict."""
db.execute("INSERT OR IGNORE INTO orders (order_number, status, id_adresa_livrare) VALUES ('test-refresh-ok', 'IMPORTED', 4116)")
db.execute(f"INSERT OR IGNORE INTO orders (order_number, status, id_adresa_livrare) VALUES ('test-refresh-ok', '{OrderStatus.IMPORTED.value}', 4116)")
db.commit()
mock_result = (
@@ -908,7 +909,7 @@ class TestRefreshOrderAddress:
from unittest.mock import AsyncMock # noqa: E402 (already imported MagicMock/patch above)
def _make_order_detail(status='IMPORTED', id_comanda=12345, factura_numar=None):
def _make_order_detail(status=OrderStatus.IMPORTED.value, id_comanda=12345, factura_numar=None):
return {
"order": {
"order_number": "1001",
@@ -983,7 +984,7 @@ class TestResyncDeleteSafetyGates:
from app.services import retry_service
with patch('app.services.sqlite_service.get_order_detail',
new=AsyncMock(return_value=_make_order_detail(status='ERROR'))), \
new=AsyncMock(return_value=_make_order_detail(status=OrderStatus.ERROR.value))), \
patch('app.services.sync_service._sync_lock', new=_unlocked_lock()):
result = await retry_service.resync_single_order("1001", {})
@@ -1051,7 +1052,7 @@ class TestResyncDeleteHappyPaths:
from app.services import retry_service
with patch('app.services.sqlite_service.get_order_detail',
new=AsyncMock(return_value=_make_order_detail(status='DELETED_IN_ROA'))), \
new=AsyncMock(return_value=_make_order_detail(status=OrderStatus.DELETED_IN_ROA.value))), \
patch('app.services.sync_service._sync_lock', new=_unlocked_lock()), \
patch('app.services.retry_service._download_and_reimport',
new=AsyncMock(return_value={"success": True, "message": "ok"})):

View File

@@ -0,0 +1,75 @@
"""Tests for _log_order_error_history — permanent audit trail."""
import os
import sys
import logging
import logging.handlers
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from app.services import sqlite_service
@pytest.fixture
def reset_logger(tmp_path, monkeypatch):
"""Redirect error history log into tmp_path for isolation."""
sqlite_service._error_history_logger = None
lg = logging.getLogger("sync_errors_history")
for h in list(lg.handlers):
lg.removeHandler(h)
logs_dir = tmp_path / "logs"
logs_dir.mkdir()
target = logs_dir / "sync_errors_history.log"
def fake_get_logger():
if sqlite_service._error_history_logger is not None:
return sqlite_service._error_history_logger
inner = logging.getLogger("sync_errors_history")
inner.setLevel(logging.INFO)
inner.propagate = False
handler = logging.handlers.RotatingFileHandler(
str(target), maxBytes=100 * 1024 * 1024, backupCount=12, encoding="utf-8"
)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
inner.addHandler(handler)
sqlite_service._error_history_logger = inner
return inner
monkeypatch.setattr(sqlite_service, "_get_error_history_logger", fake_get_logger)
yield target
sqlite_service._error_history_logger = None
lg = logging.getLogger("sync_errors_history")
for h in list(lg.handlers):
h.close()
lg.removeHandler(h)
@pytest.mark.unit
def test_log_order_error_history_writes_line(reset_logger):
sqlite_service._log_order_error_history("485224762", "UNIQUE constraint failed")
logging.shutdown()
content = reset_logger.read_text(encoding="utf-8")
assert "ORDER_FAIL 485224762" in content
assert "UNIQUE constraint failed" in content
@pytest.mark.unit
def test_log_order_error_history_appends(reset_logger):
sqlite_service._log_order_error_history("1", "err-a")
sqlite_service._log_order_error_history("2", "err-b")
sqlite_service._log_order_error_history("2", "err-b-retry")
logging.shutdown()
content = reset_logger.read_text(encoding="utf-8")
assert "ORDER_FAIL 1: err-a" in content
assert "ORDER_FAIL 2: err-b" in content
# Two entries for order 2 — append-only guarantee
assert content.count("ORDER_FAIL 2:") == 2
@pytest.mark.unit
def test_log_order_error_history_swallows_errors(monkeypatch):
"""Callable must never raise — caller is already in a degraded path."""
def boom():
raise RuntimeError("disk full")
monkeypatch.setattr(sqlite_service, "_get_error_history_logger", boom)
sqlite_service._log_order_error_history("X", "ignored")

View File

@@ -35,6 +35,7 @@ if _api_dir not in sys.path:
from app import database
from app.services import sqlite_service
from app.constants import OrderStatus
@pytest.fixture(autouse=True)
@@ -69,7 +70,7 @@ async def _seed_order(order_number="TEST-001"):
order_number=order_number,
order_date="2026-01-01",
customer_name="Test",
status="IMPORTED",
status=OrderStatus.IMPORTED.value,
)
@@ -192,5 +193,5 @@ async def test_mark_order_deleted_removes_items():
finally:
await db.close()
assert row is not None
assert row["status"] == "DELETED_IN_ROA"
assert row["status"] == OrderStatus.DELETED_IN_ROA.value
assert row["id_comanda"] is None

View File

@@ -0,0 +1,149 @@
"""Tests for _phase_wrap + escalation check in sync_service.
These cover:
- _record_phase_err persists a sync_phase_failures row
- _check_escalation returns None below threshold
- _check_escalation halts when a phase has failed 3 runs in a row
- run_sync short-circuits when escalation flags a phase
"""
import os
import sys
import sqlite3
import tempfile
import pytest
pytestmark = pytest.mark.unit
_tmpdir = tempfile.mkdtemp()
os.environ.setdefault("FORCE_THIN_MODE", "true")
os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_phase.db"))
os.environ.setdefault("ORACLE_DSN", "dummy")
os.environ.setdefault("ORACLE_USER", "dummy")
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _api_dir not in sys.path:
sys.path.insert(0, _api_dir)
from app import database
from app.services import sqlite_service, sync_service
@pytest.fixture(autouse=True)
async def _reset():
database.init_sqlite()
db = await sqlite_service.get_sqlite()
try:
await db.execute("DELETE FROM sync_phase_failures")
await db.execute("DELETE FROM sync_runs")
await db.commit()
finally:
await db.close()
yield
async def _make_run(run_id: str, offset: int = 0):
db = await sqlite_service.get_sqlite()
try:
await db.execute(
"INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')",
(run_id, f"{offset} seconds"),
)
await db.commit()
finally:
await db.close()
async def test_record_phase_err_inserts_row():
await _make_run("rec-1")
err = sqlite3.IntegrityError("simulated NOT NULL")
await sync_service._record_phase_err("rec-1", "price_sync", err)
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute(
"SELECT phase, error_summary FROM sync_phase_failures WHERE run_id = ?",
("rec-1",),
)
row = await cur.fetchone()
finally:
await db.close()
assert row is not None
assert row[0] == "price_sync"
assert "IntegrityError" in row[1]
async def test_check_escalation_below_threshold():
# 2 runs, each with invoice_check failure — below threshold.
for i in range(2):
await _make_run(f"run-{i}", offset=i)
await sqlite_service.record_phase_failure(f"run-{i}", "invoice_check", "err")
phase, counts = await sync_service._check_escalation()
assert phase is None
assert counts.get("invoice_check") == 2
async def test_check_escalation_hits_threshold():
for i in range(3):
await _make_run(f"run-{i}", offset=i)
await sqlite_service.record_phase_failure(f"run-{i}", "import_loop", "err")
phase, counts = await sync_service._check_escalation()
assert phase == "import_loop"
assert counts.get("import_loop") == 3
async def test_check_escalation_different_phases_dont_escalate():
# 3 runs, each failed on a different phase — no single phase hits 3.
phases = ["price_sync", "invoice_check", "anaf_backfill"]
for i, p in enumerate(phases):
await _make_run(f"run-{i}", offset=i)
await sqlite_service.record_phase_failure(f"run-{i}", p, "err")
phase, counts = await sync_service._check_escalation()
assert phase is None
assert len(counts) == 3
async def test_run_sync_short_circuits_on_escalation(monkeypatch):
"""With 3 consecutive price_sync failures, run_sync must halt without
touching gomag_client, order_reader, etc."""
for i in range(3):
await _make_run(f"prev-{i}", offset=i)
await sqlite_service.record_phase_failure(f"prev-{i}", "price_sync", "err")
# Sentinel: if sync proceeds to the download step, this will fire.
async def _boom(*args, **kwargs):
raise AssertionError("escalation should have halted before gomag download")
monkeypatch.setattr(sync_service.gomag_client, "download_orders", _boom)
result = await sync_service.run_sync(run_id="halt-test")
assert result["status"] == "halted_escalation"
assert "price_sync" in result["error"]
# sync_runs row should be persisted with halted_escalation status
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute(
"SELECT status, error_message FROM sync_runs WHERE run_id = ?", ("halt-test",)
)
row = await cur.fetchone()
finally:
await db.close()
assert row is not None
assert row[0] == "halted_escalation"
assert "ESCALATED" in row[1]
async def test_data_errors_tuple_shape():
"""Contract: DATA_ERRORS is a tuple covering the structural error types."""
assert sqlite3.IntegrityError in sync_service.DATA_ERRORS
assert ValueError in sync_service.DATA_ERRORS
assert TypeError in sync_service.DATA_ERRORS
assert UnicodeError in sync_service.DATA_ERRORS
# Must NOT include OperationalError — that halts the sync.
assert sqlite3.OperationalError not in sync_service.DATA_ERRORS

View File

@@ -36,6 +36,7 @@ import pytest_asyncio
from app.database import init_sqlite
from app.services import sqlite_service
from app.constants import OrderStatus
# Initialize SQLite once before any tests run
init_sqlite()
@@ -70,10 +71,10 @@ def seed_baseline_data():
# Add the first order (IMPORTED) with items
await sqlite_service.upsert_order(
"RUN001", "ORD001", "2025-01-15", "Test Client", "IMPORTED",
"RUN001", "ORD001", "2025-01-15", "Test Client", OrderStatus.IMPORTED.value,
id_comanda=100, id_partener=200, items_count=2
)
await sqlite_service.add_sync_run_order("RUN001", "ORD001", "IMPORTED")
await sqlite_service.add_sync_run_order("RUN001", "ORD001", OrderStatus.IMPORTED.value)
items = [
{
@@ -103,15 +104,15 @@ def seed_baseline_data():
# Add more orders for filter tests
await sqlite_service.upsert_order(
"RUN001", "ORD002", "2025-01-16", "Client 2", "SKIPPED",
"RUN001", "ORD002", "2025-01-16", "Client 2", OrderStatus.SKIPPED.value,
missing_skus=["SKU99"], items_count=1
)
await sqlite_service.add_sync_run_order("RUN001", "ORD002", "SKIPPED")
await sqlite_service.add_sync_run_order("RUN001", "ORD002", OrderStatus.SKIPPED.value)
await sqlite_service.upsert_order(
"RUN001", "ORD003", "2025-01-17", "Client 3", "ERROR",
"RUN001", "ORD003", "2025-01-17", "Client 3", OrderStatus.ERROR.value,
error_message="Test error", items_count=3
)
await sqlite_service.add_sync_run_order("RUN001", "ORD003", "ERROR")
await sqlite_service.add_sync_run_order("RUN001", "ORD003", OrderStatus.ERROR.value)
asyncio.run(_seed())
yield
@@ -212,7 +213,7 @@ async def test_get_order_detail_not_found():
async def test_get_order_detail_status():
"""Seeded ORD001 should have IMPORTED status."""
detail = await sqlite_service.get_order_detail("ORD001")
assert detail["order"]["status"] == "IMPORTED"
assert detail["order"]["status"] == OrderStatus.IMPORTED.value
# ---------------------------------------------------------------------------
@@ -232,7 +233,7 @@ async def test_get_run_orders_filtered_all():
@pytest.mark.asyncio
async def test_get_run_orders_filtered_imported():
"""Filter IMPORTED should return only ORD001."""
result = await sqlite_service.get_run_orders_filtered("RUN001", "IMPORTED", 1, 50)
result = await sqlite_service.get_run_orders_filtered("RUN001", OrderStatus.IMPORTED.value, 1, 50)
assert result["total"] == 1
assert result["orders"][0]["order_number"] == "ORD001"
@@ -240,7 +241,7 @@ async def test_get_run_orders_filtered_imported():
@pytest.mark.asyncio
async def test_get_run_orders_filtered_skipped():
"""Filter SKIPPED should return only ORD002."""
result = await sqlite_service.get_run_orders_filtered("RUN001", "SKIPPED", 1, 50)
result = await sqlite_service.get_run_orders_filtered("RUN001", OrderStatus.SKIPPED.value, 1, 50)
assert result["total"] == 1
assert result["orders"][0]["order_number"] == "ORD002"
@@ -248,7 +249,7 @@ async def test_get_run_orders_filtered_skipped():
@pytest.mark.asyncio
async def test_get_run_orders_filtered_error():
"""Filter ERROR should return only ORD003."""
result = await sqlite_service.get_run_orders_filtered("RUN001", "ERROR", 1, 50)
result = await sqlite_service.get_run_orders_filtered("RUN001", OrderStatus.ERROR.value, 1, 50)
assert result["total"] == 1
assert result["orders"][0]["order_number"] == "ORD003"
@@ -360,10 +361,10 @@ def test_api_sync_run_orders(client):
def test_api_sync_run_orders_filtered(client):
"""R1: Filtering by status=IMPORTED returns only IMPORTED orders."""
resp = client.get("/api/sync/run/RUN001/orders?status=IMPORTED")
resp = client.get(f"/api/sync/run/RUN001/orders?status={OrderStatus.IMPORTED.value}")
assert resp.status_code == 200
data = resp.json()
assert all(o["status"] == "IMPORTED" for o in data["orders"])
assert all(o["status"] == OrderStatus.IMPORTED.value for o in data["orders"])
def test_api_sync_run_orders_pagination_fields(client):

View File

@@ -0,0 +1,307 @@
"""Integration tests for hybrid save_orders_batch with per-order isolation.
Covers:
- Regression 485224762 (dup SKU in one order)
- Structural pre-flight → MALFORMED rows
- Batch failure → per-order fallback with SAVEPOINT
- Rollback-failure → commit-close-reconnect path
"""
import os
import sys
import sqlite3
import tempfile
import pytest
pytestmark = pytest.mark.unit
_tmpdir = tempfile.mkdtemp()
_sqlite_path = os.path.join(_tmpdir, "test_hybrid.db")
os.environ.setdefault("FORCE_THIN_MODE", "true")
os.environ.setdefault("SQLITE_DB_PATH", _sqlite_path)
os.environ.setdefault("ORACLE_DSN", "dummy")
os.environ.setdefault("ORACLE_USER", "dummy")
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _api_dir not in sys.path:
sys.path.insert(0, _api_dir)
from app import database
from app.services import sqlite_service
from app.constants import OrderStatus
@pytest.fixture(autouse=True)
async def _reset_db():
database.init_sqlite()
db = await sqlite_service.get_sqlite()
try:
await db.execute("DELETE FROM order_items")
await db.execute("DELETE FROM sync_run_orders")
await db.execute("DELETE FROM orders")
await db.execute("DELETE FROM sync_runs")
await db.execute("INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now'), 'running')", ("test-run",))
await db.commit()
finally:
await db.close()
yield
def _order(order_number, status=OrderStatus.IMPORTED.value, items=None, **overrides):
base = {
"sync_run_id": "test-run",
"order_number": order_number,
"order_date": "2026-04-22 10:00:00",
"customer_name": "Test Customer",
"status": status,
"status_at_run": status,
"items_count": len(items) if items else 0,
"items": items or [],
}
base.update(overrides)
return base
def _item(sku="SKU-A", qty=1, price=10.0):
return {
"sku": sku, "product_name": f"Product {sku}",
"quantity": qty, "price": price, "baseprice": price,
"vat": 19, "mapping_status": "direct", "codmat": None,
"id_articol": None, "cantitate_roa": None,
}
async def _orders_with_status(status):
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute("SELECT order_number FROM orders WHERE status = ?", (status,))
rows = await cur.fetchall()
return [r[0] for r in rows]
finally:
await db.close()
async def _items_of(order_number):
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute("SELECT sku, quantity FROM order_items WHERE order_number = ?", (order_number,))
rows = await cur.fetchall()
return [(r[0], r[1]) for r in rows]
finally:
await db.close()
# ── 1. Regression 485224762 — dup SKU on one order ──────────────
async def test_regression_dup_sku_485224762():
"""Dedup helper must let this order through; hybrid path must import it."""
orders = [
_order("485224762", items=[_item("SKU-X", qty=2), _item("SKU-X", qty=3)])
]
await sqlite_service.save_orders_batch(orders)
imported = await _orders_with_status(OrderStatus.IMPORTED.value)
assert "485224762" in imported
items = await _items_of("485224762")
assert len(items) == 1
assert items[0][0] == "SKU-X"
# Qty summed by _dedup_items_by_sku
assert items[0][1] == 5
# ── 2. Structural pre-flight → MALFORMED ────────────────────────
async def test_structural_fail_empty_items():
orders = [_order("MAL-1", items=[])]
await sqlite_service.save_orders_batch(orders)
mal = await _orders_with_status(OrderStatus.MALFORMED.value)
assert "MAL-1" in mal
async def test_structural_fail_mixed_batch():
orders = [
_order("GOOD-1", items=[_item()]),
_order("MAL-2", order_date="not-a-date", items=[_item()]),
_order("GOOD-2", items=[_item("SKU-B", qty=1)]),
]
await sqlite_service.save_orders_batch(orders)
assert set(await _orders_with_status(OrderStatus.IMPORTED.value)) == {"GOOD-1", "GOOD-2"}
assert await _orders_with_status(OrderStatus.MALFORMED.value) == ["MAL-2"]
async def test_malformed_error_message_persisted():
orders = [_order("MAL-3", order_date="", items=[_item()])]
await sqlite_service.save_orders_batch(orders)
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute("SELECT error_message FROM orders WHERE order_number = ?", ("MAL-3",))
row = await cur.fetchone()
assert row is not None
assert "INVALID_DATE" in row[0]
finally:
await db.close()
# ── 3. Runtime-fail mid-batch → per-order fallback ───────────────
async def test_runtime_failure_isolated_per_order(monkeypatch):
"""One order triggers IntegrityError on insert; rest still land."""
import aiosqlite
real_executemany = aiosqlite.core.Connection.executemany
real_execute = aiosqlite.core.Connection.execute
def _is_orders_insert(sql: str) -> bool:
s = sql.upper()
return "INTO ORDERS" in s and "ORDER_ITEMS" not in s and "SYNC_RUN_ORDERS" not in s
def _is_poison(row):
# row[0] = order_number, row[3] = status. Fail only when simulating
# the real runtime crash; let the MALFORMED fallback write succeed.
return row[0] == "POISON" and row[3] != OrderStatus.MALFORMED.value
async def flaky_executemany(self, sql, rows):
rows_list = list(rows)
if _is_orders_insert(sql) and any(_is_poison(r) for r in rows_list):
raise sqlite3.IntegrityError("simulated NOT NULL violation")
return await real_executemany(self, sql, rows_list)
async def flaky_execute(self, sql, params=None):
if params and _is_orders_insert(sql) and _is_poison(params):
raise sqlite3.IntegrityError("simulated NOT NULL violation per-order")
return await real_execute(self, sql, params) if params is not None else await real_execute(self, sql)
monkeypatch.setattr(aiosqlite.core.Connection, "executemany", flaky_executemany)
monkeypatch.setattr(aiosqlite.core.Connection, "execute", flaky_execute)
orders = [
_order("BATCH-1", items=[_item("SKU-1")]),
_order("POISON", items=[_item("SKU-P")]),
_order("BATCH-2", items=[_item("SKU-2")]),
]
await sqlite_service.save_orders_batch(orders)
imported = set(await _orders_with_status(OrderStatus.IMPORTED.value))
malformed = set(await _orders_with_status(OrderStatus.MALFORMED.value))
# BATCH-1 and BATCH-2 land as IMPORTED via per-order SAVEPOINT path.
# POISON gets tagged MALFORMED because its single-order insert also raises.
assert {"BATCH-1", "BATCH-2"}.issubset(imported)
assert "POISON" in malformed
# ── 4. Empty batch is a no-op ───────────────────────────────────
async def test_empty_batch_noop():
await sqlite_service.save_orders_batch([])
assert await _orders_with_status(OrderStatus.IMPORTED.value) == []
# ── 5. Caller dict not mutated on MALFORMED ─────────────────────
async def test_caller_dict_not_mutated():
raw = _order("OK-1", items=[]) # structural-fail
snapshot = dict(raw)
await sqlite_service.save_orders_batch([raw])
# Caller's dict should be untouched
assert raw["status"] == snapshot["status"]
assert raw.get("error_message") == snapshot.get("error_message")
assert raw["items"] == snapshot["items"]
# ── 6. Reconnect path preserves prior work ──────────────────────
async def test_reconnect_preserves_malformed_and_continues(monkeypatch):
"""If ROLLBACK TO SAVEPOINT itself fails, we commit, reconnect, keep going.
We can't easily simulate the exact OperationalError, so we verify the
helper is wired by inspecting its behaviour on a live connection.
"""
db = await sqlite_service.get_sqlite()
try:
# Insert a MALFORMED row directly, then invoke _safe_reconnect.
await db.execute(
"INSERT OR REPLACE INTO orders (order_number, status, order_date) VALUES (?, ?, ?)",
("BEFORE-RECON", OrderStatus.MALFORMED.value, "2026-04-22"),
)
fresh = await sqlite_service._safe_reconnect(db)
assert fresh is not None
# Previous insert must be durable on fresh connection.
cur = await fresh.execute(
"SELECT status FROM orders WHERE order_number = ?", ("BEFORE-RECON",)
)
row = await cur.fetchone()
assert row is not None
assert row[0] == OrderStatus.MALFORMED.value
await fresh.close()
finally:
# fresh was already closed; nothing else to do
pass
# ── 7. _safe_upsert_order_items — success + savepoint rollback ──
async def test_safe_upsert_items_happy_path():
# Seed parent order so FK context is valid.
await sqlite_service.save_orders_batch([_order("SAFE-1", items=[])])
db = await sqlite_service.get_sqlite()
try:
ok = await sqlite_service._safe_upsert_order_items(
db, "SAFE-1", [_item("SKU-H", qty=2)]
)
await db.commit()
finally:
await db.close()
assert ok is True
items = await _items_of("SAFE-1")
assert items == [("SKU-H", 2)]
async def test_safe_upsert_items_rolls_back_and_marks_malformed(monkeypatch):
await sqlite_service.save_orders_batch([_order("SAFE-2", items=[_item("PRE", qty=1)])])
import aiosqlite
real_executemany = aiosqlite.core.Connection.executemany
async def boom_on_items(self, sql, rows):
if "INSERT INTO order_items" in sql.upper().replace("\n", " ").replace(" ", " ").upper() or "ORDER_ITEMS" in sql.upper():
raise sqlite3.IntegrityError("simulated items insert crash")
return await real_executemany(self, sql, rows)
monkeypatch.setattr(aiosqlite.core.Connection, "executemany", boom_on_items)
db = await sqlite_service.get_sqlite()
try:
ok = await sqlite_service._safe_upsert_order_items(
db, "SAFE-2", [_item("SKU-BAD", qty=1)]
)
await db.commit()
finally:
await db.close()
assert ok is False
# Parent order was tagged MALFORMED, pre-existing items were wiped by DELETE
# (which ran inside the rolled-back savepoint, so they should survive).
malformed = await _orders_with_status(OrderStatus.MALFORMED.value)
assert "SAFE-2" in malformed
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute(
"SELECT error_message FROM orders WHERE order_number = ?", ("SAFE-2",)
)
row = await cur.fetchone()
assert row is not None and "ITEMS_FAIL" in row[0]
finally:
await db.close()

View File

@@ -0,0 +1,110 @@
"""Tests for GET /api/sync/health."""
import os
import sys
import tempfile
import pytest
pytestmark = pytest.mark.unit
_tmpdir = tempfile.mkdtemp()
os.environ.setdefault("FORCE_THIN_MODE", "true")
os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_health.db"))
os.environ.setdefault("ORACLE_DSN", "dummy")
os.environ.setdefault("ORACLE_USER", "dummy")
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _api_dir not in sys.path:
sys.path.insert(0, _api_dir)
from fastapi.testclient import TestClient
from app import database
from app.services import sqlite_service
from app.main import app
client = TestClient(app)
@pytest.fixture(autouse=True)
async def _reset():
database.init_sqlite()
db = await sqlite_service.get_sqlite()
try:
await db.execute("DELETE FROM sync_phase_failures")
await db.execute("DELETE FROM sync_runs")
await db.commit()
finally:
await db.close()
yield
async def _make_run(run_id: str, status: str = "completed", offset: int = 0,
error_message: str | None = None):
db = await sqlite_service.get_sqlite()
try:
await db.execute(
"""INSERT INTO sync_runs (run_id, started_at, status, error_message)
VALUES (?, datetime('now', ?), ?, ?)""",
(run_id, f"{offset} seconds", status, error_message),
)
await db.commit()
finally:
await db.close()
async def test_health_empty_state():
r = client.get("/api/sync/health")
assert r.status_code == 200
data = r.json()
assert data["last_sync_at"] is None
assert data["last_sync_status"] is None
assert data["recent_phase_failures"] == {}
assert data["escalation_phase"] is None
assert data["is_healthy"] is True
async def test_health_completed_is_healthy():
await _make_run("ok-1", status="completed")
r = client.get("/api/sync/health")
data = r.json()
assert data["last_sync_status"] == "completed"
assert data["is_healthy"] is True
async def test_health_reports_last_failure():
await _make_run("fail-1", status="failed", error_message="boom")
r = client.get("/api/sync/health")
data = r.json()
assert data["last_sync_status"] == "failed"
assert data["last_halt_reason"] == "boom"
assert data["is_healthy"] is False
async def test_health_detects_escalation():
# 3 consecutive runs each with price_sync failure → escalation flagged.
for i in range(3):
run_id = f"esc-{i}"
await _make_run(run_id, status="failed", offset=i,
error_message="ESCALATED: phase price_sync failed 3 consecutive runs")
await sqlite_service.record_phase_failure(run_id, "price_sync", "IntegrityError: X")
r = client.get("/api/sync/health")
data = r.json()
assert data["escalation_phase"] == "price_sync"
assert data["is_healthy"] is False
assert data["recent_phase_failures"]["price_sync"] == 3
assert "ESCALATED" in (data["last_halt_reason"] or "")
async def test_health_one_phase_failure_still_warning_not_healthy():
await _make_run("recent-fail", status="completed")
await sqlite_service.record_phase_failure("recent-fail", "invoice_check", "err")
r = client.get("/api/sync/health")
data = r.json()
# 1 recent phase failure → is_healthy stays True (<=1 tolerance); healthy
assert data["is_healthy"] is True
assert data["recent_phase_failures"]["invoice_check"] == 1

View File

@@ -0,0 +1,121 @@
"""Tests for sync_phase_failures table + helpers."""
import os
import sys
import tempfile
import pytest
pytestmark = pytest.mark.unit
_tmpdir = tempfile.mkdtemp()
os.environ.setdefault("FORCE_THIN_MODE", "true")
os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_spf.db"))
os.environ.setdefault("ORACLE_DSN", "dummy")
os.environ.setdefault("ORACLE_USER", "dummy")
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _api_dir not in sys.path:
sys.path.insert(0, _api_dir)
from app import database
from app.services import sqlite_service
@pytest.fixture(autouse=True)
async def _reset():
database.init_sqlite()
db = await sqlite_service.get_sqlite()
try:
await db.execute("DELETE FROM sync_phase_failures")
await db.execute("DELETE FROM sync_runs")
await db.commit()
finally:
await db.close()
yield
async def _make_run(run_id: str, offset_seconds: int = 0):
db = await sqlite_service.get_sqlite()
try:
await db.execute(
"INSERT INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now', ?), 'running')",
(run_id, f"{offset_seconds} seconds"),
)
await db.commit()
finally:
await db.close()
async def test_table_created_on_init():
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='sync_phase_failures'"
)
row = await cur.fetchone()
assert row is not None
finally:
await db.close()
async def test_record_and_read_phase_failures():
await _make_run("run-1")
await _make_run("run-2", offset_seconds=1)
await sqlite_service.record_phase_failure("run-1", "price_sync", "IntegrityError: X")
await sqlite_service.record_phase_failure("run-2", "price_sync", "IntegrityError: Y")
counts = await sqlite_service.get_recent_phase_failures(limit=3)
assert counts.get("price_sync") == 2
async def test_get_recent_limit_respected():
# 5 runs, each with a price_sync failure. Limit=3 should only count the latest 3.
for i in range(5):
run_id = f"run-{i}"
await _make_run(run_id, offset_seconds=i)
await sqlite_service.record_phase_failure(run_id, "price_sync", "fail")
counts = await sqlite_service.get_recent_phase_failures(limit=3)
assert counts["price_sync"] == 3
async def test_record_prunes_to_100_runs():
# Insert 105 runs each with a failure → table should end at <=100 rows after prune.
for i in range(105):
run_id = f"R{i:03d}"
await _make_run(run_id, offset_seconds=i)
await sqlite_service.record_phase_failure(run_id, "import_loop", "x")
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute("SELECT COUNT(*) FROM sync_phase_failures")
(total,) = await cur.fetchone()
finally:
await db.close()
assert total <= 100
async def test_empty_phase_failures_returns_empty_dict():
counts = await sqlite_service.get_recent_phase_failures(limit=3)
assert counts == {}
async def test_record_phase_failure_idempotent_per_run_phase():
"""PRIMARY KEY (run_id, phase) → second insert same run+phase updates in place."""
await _make_run("run-idem")
await sqlite_service.record_phase_failure("run-idem", "invoice_check", "first")
await sqlite_service.record_phase_failure("run-idem", "invoice_check", "second")
db = await sqlite_service.get_sqlite()
try:
cur = await db.execute(
"SELECT COUNT(*), MAX(error_summary) FROM sync_phase_failures WHERE run_id=? AND phase=?",
("run-idem", "invoice_check"),
)
(count, latest) = await cur.fetchone()
finally:
await db.close()
assert count == 1
assert latest == "second"

View File

@@ -0,0 +1,166 @@
"""Unit tests for validation_service.validate_structural().
Structural pre-flight validator — only catches malformed payloads that
would crash downstream inserts. Does NOT check SKU existence (handled
by validate_skus) or duplicate SKUs (handled by _dedup_items_by_sku).
"""
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from app.services.validation_service import validate_structural
def _valid_order(**overrides):
base = {
"order_number": "123456",
"order_date": "2026-04-22 10:00:00",
"items": [{"sku": "ABC", "quantity": 2, "price": 15.50}],
}
base.update(overrides)
return base
@pytest.mark.unit
def test_valid_order_passes():
ok, err_type, err_msg = validate_structural(_valid_order())
assert ok is True
assert err_type is None
assert err_msg is None
@pytest.mark.unit
def test_missing_order_number():
ok, err_type, _ = validate_structural(_valid_order(order_number=""))
assert ok is False
assert err_type == "MISSING_FIELD"
ok, err_type, _ = validate_structural(_valid_order(order_number=None))
assert ok is False
assert err_type == "MISSING_FIELD"
@pytest.mark.unit
def test_non_dict_order():
ok, err_type, _ = validate_structural("not a dict")
assert ok is False
assert err_type == "MISSING_FIELD"
@pytest.mark.unit
def test_invalid_date_unparseable():
ok, err_type, _ = validate_structural(_valid_order(order_date="not-a-date"))
assert ok is False
assert err_type == "INVALID_DATE"
@pytest.mark.unit
def test_invalid_date_missing():
ok, err_type, _ = validate_structural(_valid_order(order_date=None))
assert ok is False
assert err_type == "INVALID_DATE"
@pytest.mark.unit
def test_date_iso_format_passes():
ok, _, _ = validate_structural(_valid_order(order_date="2026-04-22T10:00:00"))
assert ok is True
@pytest.mark.unit
def test_empty_items():
ok, err_type, _ = validate_structural(_valid_order(items=[]))
assert ok is False
assert err_type == "EMPTY_ITEMS"
ok, err_type, _ = validate_structural(_valid_order(items=None))
assert ok is False
assert err_type == "EMPTY_ITEMS"
@pytest.mark.unit
def test_items_not_list():
ok, err_type, _ = validate_structural(_valid_order(items="ABC"))
assert ok is False
assert err_type == "EMPTY_ITEMS"
@pytest.mark.unit
def test_item_not_dict():
ok, err_type, _ = validate_structural(_valid_order(items=["just-a-string"]))
assert ok is False
assert err_type == "EMPTY_ITEMS"
@pytest.mark.unit
def test_invalid_quantity_zero():
ok, err_type, _ = validate_structural(
_valid_order(items=[{"sku": "A", "quantity": 0, "price": 1}])
)
assert ok is False
assert err_type == "INVALID_QUANTITY"
@pytest.mark.unit
def test_invalid_quantity_negative():
ok, err_type, _ = validate_structural(
_valid_order(items=[{"sku": "A", "quantity": -3, "price": 1}])
)
assert ok is False
assert err_type == "INVALID_QUANTITY"
@pytest.mark.unit
def test_invalid_quantity_non_numeric():
ok, err_type, _ = validate_structural(
_valid_order(items=[{"sku": "A", "quantity": "abc", "price": 1}])
)
assert ok is False
assert err_type == "INVALID_QUANTITY"
@pytest.mark.unit
def test_invalid_quantity_missing():
ok, err_type, _ = validate_structural(
_valid_order(items=[{"sku": "A", "price": 1}])
)
assert ok is False
assert err_type == "INVALID_QUANTITY"
@pytest.mark.unit
def test_invalid_price_non_numeric():
ok, err_type, _ = validate_structural(
_valid_order(items=[{"sku": "A", "quantity": 1, "price": "NaN-text"}])
)
assert ok is False
assert err_type == "INVALID_PRICE"
@pytest.mark.unit
def test_invalid_price_missing():
ok, err_type, _ = validate_structural(
_valid_order(items=[{"sku": "A", "quantity": 1}])
)
assert ok is False
assert err_type == "INVALID_PRICE"
@pytest.mark.unit
def test_price_zero_allowed():
"""Complex sets can legitimately have price=0 on one leg."""
ok, _, _ = validate_structural(
_valid_order(items=[{"sku": "A", "quantity": 1, "price": 0}])
)
assert ok is True
@pytest.mark.unit
def test_sku_null_passes_structural():
"""SKU validation is handled downstream, NOT here."""
ok, _, _ = validate_structural(
_valid_order(items=[{"sku": None, "quantity": 1, "price": 1}])
)
assert ok is True