Files
gomag-vending/api/app/services/sync_service.py
Marius Mutu 5f8b9b6003 feat(dashboard): redesign UI with smart polling, unified sync card, filter bar
Replace SSE with smart polling (30s idle / 3s when running). Unify sync
panel into single two-row card with live progress text. Add unified filter
bar (period dropdown, status pills, search) with period-total counts.
Add Client/Cont tooltip for different shipping/billing persons. Add SKU
mappings pct_total badges + complete/incomplete filter + 409 duplicate
check. Add missing SKUs search + rescan progress UX. Migrate SQLite
orders schema (shipping_name, billing_name, payment_method,
delivery_method). Fix JSON_OUTPUT_DIR path for server running from
project root. Fix pagination controls showing top+bottom with per-page
selector (25/50/100/250).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 17:55:36 +02:00

411 lines
19 KiB
Python

import asyncio
import json
import logging
import uuid
from datetime import datetime
from . import order_reader, validation_service, import_service, sqlite_service
from ..config import settings
logger = logging.getLogger(__name__)
# --- Module-level sync state, shared by every function in this module ---
# Held by run_sync() for the whole duration of a run; prepare_sync() and
# run_sync() check .locked() to reject a second concurrent run.
_sync_lock = asyncio.Lock()
# State of the current (or just-finished) run, or None when idle. When set it is
# a dict with run_id, status, started_at, finished_at, phase, phase_text,
# progress_current, progress_total and counts.
_current_sync: dict | None = None
# In-memory text log buffer per run: maps run_id -> list of log lines.
_run_logs: dict[str, list[str]] = {}
def _log_line(run_id: str, message: str):
    """Record *message* in the run's in-memory log, prefixed with the wall-clock time.

    The buffer for ``run_id`` is created on first use.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    _run_logs.setdefault(run_id, []).append(f"[{stamp}] {message}")
def get_run_text_log(run_id: str) -> str | None:
    """Join the buffered log lines of ``run_id`` with newlines.

    Returns None when no buffer exists for that run.
    """
    entries = _run_logs.get(run_id)
    return "\n".join(entries) if entries is not None else None
def _update_progress(phase: str, phase_text: str, current: int = 0, total: int = 0,
                     counts: dict = None):
    """Write the latest progress details into ``_current_sync`` for pollers.

    Does nothing when no sync state exists. A missing or empty ``counts`` is
    replaced by an all-zero counter dict.
    """
    state = _current_sync
    if state is None:
        return
    # The dict is mutated in place (never rebound), so no ``global`` is needed.
    state.update({
        "phase": phase,
        "phase_text": phase_text,
        "progress_current": current,
        "progress_total": total,
        "counts": counts if counts else {"imported": 0, "skipped": 0, "errors": 0},
    })
async def get_sync_status():
    """Return a shallow copy of the current sync state, or ``{"status": "idle"}``."""
    if not _current_sync:
        return {"status": "idle"}
    return dict(_current_sync)
async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
    """Reserve a run_id and publish the initial "starting" state.

    ``id_pol`` and ``id_sectie`` are accepted but not used here.

    Returns {"run_id": ..., "status": "starting"}, or
    {"error": ..., "run_id": <active run or None>} when the sync lock is held.
    """
    global _current_sync

    if _sync_lock.locked():
        active_run = _current_sync.get("run_id") if _current_sync else None
        return {"error": "Sync already running", "run_id": active_run}

    # run_id = <timestamp>_<6 hex chars>
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    run_id = stamp + "_" + suffix

    initial_state = {}
    initial_state["run_id"] = run_id
    initial_state["status"] = "running"
    initial_state["started_at"] = datetime.now().isoformat()
    initial_state["finished_at"] = None
    initial_state["phase"] = "starting"
    initial_state["phase_text"] = "Starting..."
    initial_state["progress_current"] = 0
    initial_state["progress_total"] = 0
    initial_state["counts"] = {"imported": 0, "skipped": 0, "errors": 0}
    _current_sync = initial_state

    return {"run_id": run_id, "status": "starting"}
# Strong references to fire-and-forget cleanup tasks. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before its sleep finishes.
_cleanup_tasks: set = set()


def _spawn_cleanup(coro) -> None:
    """Schedule *coro* in the background and keep it referenced until it is done."""
    task = asyncio.ensure_future(coro)
    _cleanup_tasks.add(task)
    task.add_done_callback(_cleanup_tasks.discard)


async def _expire_sync_state(run_id: str, delay: float = 10) -> None:
    """After *delay* seconds, reset ``_current_sync`` if it still belongs to *run_id*."""
    global _current_sync
    await asyncio.sleep(delay)
    # A newer run may have been prepared/started in the meantime; never wipe its state.
    if _current_sync is not None and _current_sync.get("run_id") == run_id:
        _current_sync = None


async def _expire_run_log(run_id: str, delay: float = 300) -> None:
    """After *delay* seconds, drop the in-memory text log of *run_id*."""
    await asyncio.sleep(delay)
    _run_logs.pop(run_id, None)


def _person_name(person) -> str:
    """Return "firstname lastname" of *person*, tolerating missing/None parts."""
    first = getattr(person, "firstname", "") or ""
    last = getattr(person, "lastname", "") or ""
    return f"{first} {last}".strip()


def _order_display_fields(order):
    """Derive the customer label and the name/payment/delivery fields of *order*.

    Returns ``(customer, fields)`` where ``fields`` holds the keyword arguments
    shipping_name, billing_name, payment_method and delivery_method.
    """
    shipping_name = _person_name(order.shipping) if order.shipping else ""
    billing_name = _person_name(order.billing)
    if not shipping_name:
        shipping_name = billing_name
    customer = shipping_name or order.billing.company_name or billing_name
    fields = {
        "shipping_name": shipping_name,
        "billing_name": billing_name,
        "payment_method": getattr(order, 'payment_name', None) or None,
        "delivery_method": getattr(order, 'delivery_name', None) or None,
    }
    return customer, fields


def _order_items_payload(order, validation, flag_missing: bool) -> list:
    """Build the per-item rows stored for *order*, tagging each with its mapping status.

    With ``flag_missing`` the status may be "missing"; otherwise only
    "mapped" or "direct" are produced.
    """
    rows = []
    for item in order.items:
        if flag_missing and item.sku in validation["missing"]:
            mapping_status = "missing"
        elif item.sku in validation["mapped"]:
            mapping_status = "mapped"
        else:
            mapping_status = "direct"
        rows.append({
            "sku": item.sku, "product_name": item.name,
            "quantity": item.quantity, "price": item.price, "vat": item.vat,
            "mapping_status": mapping_status, "codmat": None, "id_articol": None,
            "cantitate_roa": None
        })
    return rows


async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict:
    """Run a full sync cycle and return a summary dict.

    Steps: read and sort orders, validate SKUs (and prices for direct SKUs),
    record skipped orders, import the importable ones, then persist the run
    totals. Progress is published through ``_current_sync`` and a text log is
    accumulated in ``_run_logs``.

    Args:
        id_pol: Price policy id; falls back to ``settings.ID_POL`` when falsy.
        id_sectie: Section id; falls back to ``settings.ID_SECTIE`` when falsy.
        run_id: Id reserved earlier (e.g. by ``prepare_sync``); generated when omitted.

    Returns:
        ``{"error": "Sync already running"}`` when the lock is held; otherwise a
        dict with ``run_id`` and ``status`` plus counters, or ``error`` when the
        run failed with an exception.
    """
    global _current_sync

    if _sync_lock.locked():
        return {"error": "Sync already running"}

    async with _sync_lock:
        # Use provided run_id or generate one
        if not run_id:
            run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
        # Create the state unless it was already set up for this very run;
        # otherwise progress updates would be dropped or land on a stale run.
        if _current_sync is None or _current_sync.get("run_id") != run_id:
            _current_sync = {
                "run_id": run_id,
                "status": "running",
                "started_at": datetime.now().isoformat(),
                "finished_at": None,
                "phase": "reading",
                "phase_text": "Reading JSON files...",
                "progress_current": 0,
                "progress_total": 0,
                "counts": {"imported": 0, "skipped": 0, "errors": 0},
            }
        _update_progress("reading", "Reading JSON files...")

        started_dt = datetime.now()
        _run_logs[run_id] = [
            f"=== Sync Run {run_id} ===",
            f"Inceput: {started_dt.strftime('%d.%m.%Y %H:%M:%S')}",
            ""
        ]
        _log_line(run_id, "Citire fisiere JSON...")

        try:
            # Step 1: Read orders and sort chronologically (oldest first - R3)
            orders, json_count = order_reader.read_json_orders()
            orders.sort(key=lambda o: o.date or '')
            await sqlite_service.create_sync_run(run_id, json_count)
            _update_progress("reading", f"Found {len(orders)} orders in {json_count} files", 0, len(orders))
            _log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")

            # Populate web_products catalog from all orders (R4)
            for order in orders:
                for item in order.items:
                    if item.sku and item.name:
                        await sqlite_service.upsert_web_product(item.sku, item.name)

            if not orders:
                _log_line(run_id, "Nicio comanda gasita.")
                await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
                _update_progress("completed", "No orders found")
                # Mark the in-memory state finished too, as the normal path does,
                # so pollers do not keep seeing status "running".
                if _current_sync:
                    _current_sync["status"] = "completed"
                    _current_sync["finished_at"] = datetime.now().isoformat()
                summary = {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count}
                return summary

            _update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))

            # Step 2a: Find new orders (not yet in Oracle)
            all_order_numbers = [o.number for o in orders]
            new_orders = await asyncio.to_thread(
                validation_service.find_new_orders, all_order_numbers
            )

            # Step 2b: Validate SKUs (blocking Oracle call -> run in thread)
            all_skus = order_reader.get_all_skus(orders)
            validation = await asyncio.to_thread(validation_service.validate_skus, all_skus)
            importable, skipped = validation_service.classify_orders(orders, validation)
            _update_progress("validation", f"{len(importable)} importable, {len(skipped)} skipped (missing SKUs)",
                             0, len(importable))
            _log_line(run_id, f"Validare SKU-uri: {len(importable)} importabile, {len(skipped)} nemapate")

            # Step 2c: Build SKU context from skipped orders
            sku_context = {}  # {sku: {"orders": [], "customers": []}}
            for order, missing_skus_list in skipped:
                customer = order.billing.company_name or \
                    f"{order.billing.firstname} {order.billing.lastname}"
                for sku in missing_skus_list:
                    if sku not in sku_context:
                        sku_context[sku] = {"orders": [], "customers": []}
                    if order.number not in sku_context[sku]["orders"]:
                        sku_context[sku]["orders"].append(order.number)
                    if customer not in sku_context[sku]["customers"]:
                        sku_context[sku]["customers"].append(customer)

            # Track missing SKUs with context
            for sku in validation["missing"]:
                # Product name: first matching item per order, stopping at the
                # first order that yields a non-empty name.
                product_name = ""
                for order in orders:
                    for item in order.items:
                        if item.sku == sku:
                            product_name = item.name
                            break
                    if product_name:
                        break
                ctx = sku_context.get(sku, {})
                await sqlite_service.track_missing_sku(
                    sku, product_name,
                    order_count=len(ctx.get("orders", [])),
                    order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
                    customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
                )

            # Step 2d: Pre-validate prices for importable articles
            id_pol = id_pol or settings.ID_POL
            id_sectie = id_sectie or settings.ID_SECTIE
            logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
            _log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}")

            if id_pol and importable:
                _update_progress("validation", "Validating prices...", 0, len(importable))
                _log_line(run_id, "Validare preturi...")
                # Gather all CODMATs from importable orders
                all_codmats = set()
                for order in importable:
                    for item in order.items:
                        if item.sku in validation["mapped"]:
                            # Mapped SKUs resolve to codmat via ARTICOLE_TERTI (handled by import)
                            pass
                        elif item.sku in validation["direct"]:
                            all_codmats.add(item.sku)
                        # For mapped SKUs, we'd need the ARTICOLE_TERTI lookup - direct SKUs = codmat
                if all_codmats:
                    price_result = await asyncio.to_thread(
                        validation_service.validate_prices, all_codmats, id_pol
                    )
                    if price_result["missing_price"]:
                        logger.info(
                            f"Auto-adding price 0 for {len(price_result['missing_price'])} "
                            f"direct articles in policy {id_pol}"
                        )
                        await asyncio.to_thread(
                            validation_service.ensure_prices,
                            price_result["missing_price"], id_pol
                        )

            # Step 3: Record skipped orders + store items
            skipped_count = 0
            for order, missing_skus in skipped:
                skipped_count += 1
                customer, name_fields = _order_display_fields(order)

                await sqlite_service.upsert_order(
                    sync_run_id=run_id,
                    order_number=order.number,
                    order_date=order.date,
                    customer_name=customer,
                    status="SKIPPED",
                    missing_skus=missing_skus,
                    items_count=len(order.items),
                    **name_fields,
                )
                await sqlite_service.add_sync_run_order(run_id, order.number, "SKIPPED")

                # Store order items with mapping status (R9)
                order_items_data = _order_items_payload(order, validation, flag_missing=True)
                await sqlite_service.add_order_items(order.number, order_items_data)

                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
                _update_progress("skipped", f"Skipped {skipped_count}/{len(skipped)}: #{order.number} {customer}",
                                 0, len(importable),
                                 {"imported": 0, "skipped": skipped_count, "errors": 0})

            # Step 4: Import valid orders
            imported_count = 0
            error_count = 0
            for i, order in enumerate(importable):
                customer, name_fields = _order_display_fields(order)

                _update_progress("import",
                                 f"Import {i+1}/{len(importable)}: #{order.number} {customer}",
                                 i + 1, len(importable),
                                 {"imported": imported_count, "skipped": len(skipped), "errors": error_count})

                # Blocking import call -> run in a worker thread
                result = await asyncio.to_thread(
                    import_service.import_single_order,
                    order, id_pol=id_pol, id_sectie=id_sectie
                )

                # Build order items data for storage (R9)
                order_items_data = _order_items_payload(order, validation, flag_missing=False)

                if result["success"]:
                    imported_count += 1
                    await sqlite_service.upsert_order(
                        sync_run_id=run_id,
                        order_number=order.number,
                        order_date=order.date,
                        customer_name=customer,
                        status="IMPORTED",
                        id_comanda=result["id_comanda"],
                        id_partener=result["id_partener"],
                        items_count=len(order.items),
                        **name_fields,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "IMPORTED")
                    # Store ROA address IDs (R9)
                    await sqlite_service.update_import_order_addresses(
                        order.number,
                        id_adresa_facturare=result.get("id_adresa_facturare"),
                        id_adresa_livrare=result.get("id_adresa_livrare")
                    )
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
                else:
                    error_count += 1
                    await sqlite_service.upsert_order(
                        sync_run_id=run_id,
                        order_number=order.number,
                        order_date=order.date,
                        customer_name=customer,
                        status="ERROR",
                        id_partener=result.get("id_partener"),
                        error_message=result["error"],
                        items_count=len(order.items),
                        **name_fields,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "ERROR")
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")

                # Safety: stop if too many errors
                if error_count > 10:
                    logger.warning("Too many errors, stopping sync")
                    break

            # Step 5: Update sync run
            status = "completed" if error_count <= 10 else "failed"
            await sqlite_service.update_sync_run(
                run_id, status, len(orders), imported_count, len(skipped), error_count
            )

            summary = {
                "run_id": run_id,
                "status": status,
                "json_files": json_count,
                "total_orders": len(orders),
                "new_orders": len(new_orders),
                "imported": imported_count,
                "skipped": len(skipped),
                "errors": error_count,
                "missing_skus": len(validation["missing"])
            }

            _update_progress("completed",
                             f"Completed: {imported_count} imported, {len(skipped)} skipped, {error_count} errors",
                             len(importable), len(importable),
                             {"imported": imported_count, "skipped": len(skipped), "errors": error_count})
            if _current_sync:
                _current_sync["status"] = status
                _current_sync["finished_at"] = datetime.now().isoformat()

            logger.info(
                f"Sync {run_id} completed: {imported_count} imported, "
                f"{len(skipped)} skipped, {error_count} errors"
            )

            duration = (datetime.now() - started_dt).total_seconds()
            _log_line(run_id, "")
            _run_logs[run_id].append(f"Finalizat: {imported_count} importate, {len(skipped)} nemapate, {error_count} erori din {len(orders)} comenzi | Durata: {int(duration)}s")

            return summary

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Sync {run_id} failed: {e}")
            _log_line(run_id, f"EROARE FATALA: {e}")
            # Publish the failure before touching the DB, so pollers see it even
            # if the DB write below raises as well.
            if _current_sync:
                _current_sync["status"] = "failed"
                _current_sync["finished_at"] = datetime.now().isoformat()
                _current_sync["error"] = str(e)
            await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
            return {"run_id": run_id, "status": "failed", "error": str(e)}
        finally:
            # Keep _current_sync for 10 seconds so status endpoint can show final result
            _spawn_cleanup(_expire_sync_state(run_id, 10))
            # Keep the text log available for 5 minutes after the run ends
            _spawn_cleanup(_expire_run_log(run_id, 300))
def stop_sync():
    """Request that the running sync stop (currently a no-op).

    Sync runs cannot be cancelled mid-flight yet; they always run to
    completion. Future: use an asyncio.Event for cooperative cancellation.
    """
    return None