- Add SSE event bus in sync_service (subscribe/unsubscribe/_emit) - Add GET /api/sync/stream SSE endpoint for real-time sync progress - Rewrite logs.html: unified runs table + live feed + summary + filters - Rewrite logs.js: SSE EventSource client, run selection, pagination - Dashboard: clickable runs navigate to /logs?run=, sync started banner - Remove "Import Comenzi" nav item, delete sync_detail.html - Add error_message column to sync_runs table with migration - Fix: export TNS_ADMIN as OS env var so oracledb finds tnsnames.ora - Fix: use get_oracle_connection() instead of direct pool.acquire() - Fix: CRM_POLITICI_PRET_ART INSERT to match actual table schema Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
300 lines
13 KiB
Python
300 lines
13 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
from . import order_reader, validation_service, import_service, sqlite_service
|
|
from ..config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Sync state
|
|
_sync_lock = asyncio.Lock()
|
|
_current_sync = None # dict with run_id, status, progress info
|
|
|
|
# SSE subscriber system
|
|
_subscribers: list[asyncio.Queue] = []
|
|
|
|
|
|
def subscribe() -> asyncio.Queue:
|
|
"""Subscribe to sync events. Returns a queue that will receive event dicts."""
|
|
q = asyncio.Queue()
|
|
_subscribers.append(q)
|
|
return q
|
|
|
|
|
|
def unsubscribe(q: asyncio.Queue):
|
|
"""Unsubscribe from sync events."""
|
|
try:
|
|
_subscribers.remove(q)
|
|
except ValueError:
|
|
pass
|
|
|
|
|
|
async def _emit(event: dict):
|
|
"""Push an event to all subscriber queues."""
|
|
for q in _subscribers:
|
|
try:
|
|
q.put_nowait(event)
|
|
except asyncio.QueueFull:
|
|
pass
|
|
|
|
|
|
async def get_sync_status():
|
|
"""Get current sync status."""
|
|
if _current_sync:
|
|
return {**_current_sync}
|
|
return {"status": "idle"}
|
|
|
|
|
|
async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
|
|
"""Prepare a sync run - creates run_id and sets initial state.
|
|
Returns {"run_id": ..., "status": "starting"} or {"error": ...} if already running.
|
|
"""
|
|
global _current_sync
|
|
if _sync_lock.locked():
|
|
return {"error": "Sync already running", "run_id": _current_sync.get("run_id") if _current_sync else None}
|
|
|
|
run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
|
|
_current_sync = {
|
|
"run_id": run_id,
|
|
"status": "running",
|
|
"started_at": datetime.now().isoformat(),
|
|
"progress": "Starting..."
|
|
}
|
|
return {"run_id": run_id, "status": "starting"}
|
|
|
|
|
|
async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict:
|
|
"""Run a full sync cycle. Returns summary dict."""
|
|
global _current_sync
|
|
|
|
if _sync_lock.locked():
|
|
return {"error": "Sync already running"}
|
|
|
|
async with _sync_lock:
|
|
# Use provided run_id or generate one
|
|
if not run_id:
|
|
run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
|
|
_current_sync = {
|
|
"run_id": run_id,
|
|
"status": "running",
|
|
"started_at": datetime.now().isoformat(),
|
|
"progress": "Reading JSON files..."
|
|
}
|
|
|
|
_current_sync["progress"] = "Reading JSON files..."
|
|
await _emit({"type": "phase", "run_id": run_id, "message": "Reading JSON files..."})
|
|
|
|
try:
|
|
# Step 1: Read orders
|
|
orders, json_count = order_reader.read_json_orders()
|
|
await sqlite_service.create_sync_run(run_id, json_count)
|
|
await _emit({"type": "phase", "run_id": run_id, "message": f"Found {len(orders)} orders in {json_count} files"})
|
|
|
|
if not orders:
|
|
await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
|
|
summary = {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count}
|
|
await _emit({"type": "completed", "run_id": run_id, "summary": summary})
|
|
return summary
|
|
|
|
_current_sync["progress"] = f"Validating {len(orders)} orders..."
|
|
await _emit({"type": "phase", "run_id": run_id, "message": f"Validating {len(orders)} orders..."})
|
|
|
|
# Step 2a: Find new orders (not yet in Oracle)
|
|
all_order_numbers = [o.number for o in orders]
|
|
new_orders = await asyncio.to_thread(
|
|
validation_service.find_new_orders, all_order_numbers
|
|
)
|
|
|
|
# Step 2b: Validate SKUs (blocking Oracle call -> run in thread)
|
|
all_skus = order_reader.get_all_skus(orders)
|
|
validation = await asyncio.to_thread(validation_service.validate_skus, all_skus)
|
|
importable, skipped = validation_service.classify_orders(orders, validation)
|
|
|
|
await _emit({"type": "phase", "run_id": run_id, "message": f"{len(importable)} importable, {len(skipped)} skipped (missing SKUs)"})
|
|
|
|
# Step 2c: Build SKU context from skipped orders
|
|
sku_context = {} # {sku: {"orders": [], "customers": []}}
|
|
for order, missing_skus_list in skipped:
|
|
customer = order.billing.company_name or \
|
|
f"{order.billing.firstname} {order.billing.lastname}"
|
|
for sku in missing_skus_list:
|
|
if sku not in sku_context:
|
|
sku_context[sku] = {"orders": [], "customers": []}
|
|
if order.number not in sku_context[sku]["orders"]:
|
|
sku_context[sku]["orders"].append(order.number)
|
|
if customer not in sku_context[sku]["customers"]:
|
|
sku_context[sku]["customers"].append(customer)
|
|
|
|
# Track missing SKUs with context
|
|
for sku in validation["missing"]:
|
|
product_name = ""
|
|
for order in orders:
|
|
for item in order.items:
|
|
if item.sku == sku:
|
|
product_name = item.name
|
|
break
|
|
if product_name:
|
|
break
|
|
ctx = sku_context.get(sku, {})
|
|
await sqlite_service.track_missing_sku(
|
|
sku, product_name,
|
|
order_count=len(ctx.get("orders", [])),
|
|
order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
|
|
customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
|
|
)
|
|
|
|
# Step 2d: Pre-validate prices for importable articles
|
|
id_pol = id_pol or settings.ID_POL
|
|
if id_pol and importable:
|
|
_current_sync["progress"] = "Validating prices..."
|
|
await _emit({"type": "phase", "run_id": run_id, "message": "Validating prices..."})
|
|
# Gather all CODMATs from importable orders
|
|
all_codmats = set()
|
|
for order in importable:
|
|
for item in order.items:
|
|
if item.sku in validation["mapped"]:
|
|
# Mapped SKUs resolve to codmat via ARTICOLE_TERTI (handled by import)
|
|
pass
|
|
elif item.sku in validation["direct"]:
|
|
all_codmats.add(item.sku)
|
|
# For mapped SKUs, we'd need the ARTICOLE_TERTI lookup - direct SKUs = codmat
|
|
if all_codmats:
|
|
price_result = await asyncio.to_thread(
|
|
validation_service.validate_prices, all_codmats, id_pol
|
|
)
|
|
if price_result["missing_price"]:
|
|
logger.info(
|
|
f"Auto-adding price 0 for {len(price_result['missing_price'])} "
|
|
f"direct articles in policy {id_pol}"
|
|
)
|
|
await asyncio.to_thread(
|
|
validation_service.ensure_prices,
|
|
price_result["missing_price"], id_pol
|
|
)
|
|
|
|
# Step 3: Record skipped orders + emit events
|
|
for order, missing_skus in skipped:
|
|
customer = order.billing.company_name or \
|
|
f"{order.billing.firstname} {order.billing.lastname}"
|
|
await sqlite_service.add_import_order(
|
|
sync_run_id=run_id,
|
|
order_number=order.number,
|
|
order_date=order.date,
|
|
customer_name=customer,
|
|
status="SKIPPED",
|
|
missing_skus=missing_skus,
|
|
items_count=len(order.items)
|
|
)
|
|
await _emit({
|
|
"type": "order_result", "run_id": run_id,
|
|
"order_number": order.number, "customer_name": customer,
|
|
"status": "SKIPPED", "missing_skus": missing_skus,
|
|
"items_count": len(order.items), "progress": f"0/{len(importable)}"
|
|
})
|
|
|
|
# Step 4: Import valid orders
|
|
imported_count = 0
|
|
error_count = 0
|
|
|
|
for i, order in enumerate(importable):
|
|
progress_str = f"{i+1}/{len(importable)}"
|
|
_current_sync["progress"] = f"Importing {progress_str}: #{order.number}"
|
|
|
|
result = await asyncio.to_thread(
|
|
import_service.import_single_order,
|
|
order, id_pol=id_pol, id_sectie=id_sectie
|
|
)
|
|
customer = order.billing.company_name or \
|
|
f"{order.billing.firstname} {order.billing.lastname}"
|
|
|
|
if result["success"]:
|
|
imported_count += 1
|
|
await sqlite_service.add_import_order(
|
|
sync_run_id=run_id,
|
|
order_number=order.number,
|
|
order_date=order.date,
|
|
customer_name=customer,
|
|
status="IMPORTED",
|
|
id_comanda=result["id_comanda"],
|
|
id_partener=result["id_partener"],
|
|
items_count=len(order.items)
|
|
)
|
|
await _emit({
|
|
"type": "order_result", "run_id": run_id,
|
|
"order_number": order.number, "customer_name": customer,
|
|
"status": "IMPORTED", "items_count": len(order.items),
|
|
"id_comanda": result["id_comanda"], "progress": progress_str
|
|
})
|
|
else:
|
|
error_count += 1
|
|
await sqlite_service.add_import_order(
|
|
sync_run_id=run_id,
|
|
order_number=order.number,
|
|
order_date=order.date,
|
|
customer_name=customer,
|
|
status="ERROR",
|
|
id_partener=result.get("id_partener"),
|
|
error_message=result["error"],
|
|
items_count=len(order.items)
|
|
)
|
|
await _emit({
|
|
"type": "order_result", "run_id": run_id,
|
|
"order_number": order.number, "customer_name": customer,
|
|
"status": "ERROR", "error_message": result["error"],
|
|
"items_count": len(order.items), "progress": progress_str
|
|
})
|
|
|
|
# Safety: stop if too many errors
|
|
if error_count > 10:
|
|
logger.warning("Too many errors, stopping sync")
|
|
break
|
|
|
|
# Step 5: Update sync run
|
|
status = "completed" if error_count <= 10 else "failed"
|
|
await sqlite_service.update_sync_run(
|
|
run_id, status, len(orders), imported_count, len(skipped), error_count
|
|
)
|
|
|
|
summary = {
|
|
"run_id": run_id,
|
|
"status": status,
|
|
"json_files": json_count,
|
|
"total_orders": len(orders),
|
|
"new_orders": len(new_orders),
|
|
"imported": imported_count,
|
|
"skipped": len(skipped),
|
|
"errors": error_count,
|
|
"missing_skus": len(validation["missing"])
|
|
}
|
|
|
|
logger.info(
|
|
f"Sync {run_id} completed: {imported_count} imported, "
|
|
f"{len(skipped)} skipped, {error_count} errors"
|
|
)
|
|
await _emit({"type": "completed", "run_id": run_id, "summary": summary})
|
|
return summary
|
|
|
|
except Exception as e:
|
|
logger.error(f"Sync {run_id} failed: {e}")
|
|
await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
|
|
_current_sync["error"] = str(e)
|
|
await _emit({"type": "failed", "run_id": run_id, "error": str(e)})
|
|
return {"run_id": run_id, "status": "failed", "error": str(e)}
|
|
finally:
|
|
# Keep _current_sync for 10 seconds so status endpoint can show final result
|
|
async def _clear_current_sync():
|
|
await asyncio.sleep(10)
|
|
global _current_sync
|
|
_current_sync = None
|
|
asyncio.ensure_future(_clear_current_sync())
|
|
|
|
|
|
def stop_sync():
|
|
"""Signal sync to stop. Currently sync runs to completion."""
|
|
# For now, sync runs are not cancellable mid-flight.
|
|
# Future: use an asyncio.Event for cooperative cancellation.
|
|
pass
|