diff --git a/api/app/database.py b/api/app/database.py index fbce776..c5bca76 100644 --- a/api/app/database.py +++ b/api/app/database.py @@ -19,6 +19,10 @@ def init_oracle(): instantclient_path = settings.INSTANTCLIENTPATH dsn = settings.ORACLE_DSN + # Ensure TNS_ADMIN is set as OS env var so oracledb can find tnsnames.ora + if settings.TNS_ADMIN: + os.environ['TNS_ADMIN'] = settings.TNS_ADMIN + if force_thin: logger.info(f"FORCE_THIN_MODE=true: thin mode for {dsn}") elif instantclient_path: @@ -68,7 +72,8 @@ CREATE TABLE IF NOT EXISTS sync_runs ( imported INTEGER DEFAULT 0, skipped INTEGER DEFAULT 0, errors INTEGER DEFAULT 0, - json_files INTEGER DEFAULT 0 + json_files INTEGER DEFAULT 0, + error_message TEXT ); CREATE TABLE IF NOT EXISTS import_orders ( @@ -129,6 +134,12 @@ def init_sqlite(): if col not in cols: conn.execute(f"ALTER TABLE missing_skus ADD COLUMN {col} {typedef}") logger.info(f"Migrated missing_skus: added column {col}") + # Migrate sync_runs: add error_message column + cursor = conn.execute("PRAGMA table_info(sync_runs)") + sync_cols = {row[1] for row in cursor.fetchall()} + if "error_message" not in sync_cols: + conn.execute("ALTER TABLE sync_runs ADD COLUMN error_message TEXT") + logger.info("Migrated sync_runs: added column error_message") conn.commit() except Exception as e: logger.warning(f"Migration check failed: {e}") diff --git a/api/app/routers/sync.py b/api/app/routers/sync.py index a4827f9..c8d7418 100644 --- a/api/app/routers/sync.py +++ b/api/app/routers/sync.py @@ -1,6 +1,10 @@ +import asyncio +import json + from fastapi import APIRouter, Request, BackgroundTasks from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse +from starlette.responses import StreamingResponse from pydantic import BaseModel from pathlib import Path from typing import Optional @@ -16,27 +20,46 @@ class ScheduleConfig(BaseModel): interval_minutes: int = 5 -# HTML pages -@router.get("/sync", response_class=HTMLResponse) -async def sync_page(request: Request): - return templates.TemplateResponse("dashboard.html", {"request": request}) +# SSE streaming endpoint +@router.get("/api/sync/stream") +async def sync_stream(request: Request): + """SSE stream for real-time sync progress.""" + q = sync_service.subscribe() + async def event_generator(): + try: + while True: + # Check if client disconnected + if await request.is_disconnected(): + break + try: + event = await asyncio.wait_for(q.get(), timeout=15.0) + yield f"data: {json.dumps(event)}\n\n" + if event.get("type") in ("completed", "failed"): + break + except asyncio.TimeoutError: + yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" + finally: + sync_service.unsubscribe(q) -@router.get("/sync/run/{run_id}", response_class=HTMLResponse) -async def sync_detail_page(request: Request, run_id: str): - return templates.TemplateResponse("sync_detail.html", {"request": request, "run_id": run_id}) + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"} + ) # API endpoints @router.post("/api/sync/start") async def start_sync(background_tasks: BackgroundTasks): """Trigger a sync run in the background.""" - status = await sync_service.get_sync_status() - if status.get("status") == "running": - return {"error": "Sync already running", "run_id": status.get("run_id")} + result = await sync_service.prepare_sync() + if result.get("error"): + return {"error": result["error"], "run_id": result.get("run_id")} - background_tasks.add_task(sync_service.run_sync) - return {"message": "Sync started"} + run_id = result["run_id"] + background_tasks.add_task(sync_service.run_sync, run_id=run_id) + return {"message": "Sync started", "run_id": run_id} @router.post("/api/sync/stop") @@ -61,8 +84,8 @@ async def sync_history(page: int = 1, per_page: int = 20): @router.get("/logs", response_class=HTMLResponse) -async def logs_page(request: Request): - return templates.TemplateResponse("logs.html", {"request": request}) +async def logs_page(request: Request, run: str = None): + return templates.TemplateResponse("logs.html", {"request": request, "selected_run": run or ""}) @router.get("/api/sync/run/{run_id}") diff --git a/api/app/services/import_service.py b/api/app/services/import_service.py index f24f7ae..5278032 100644 --- a/api/app/services/import_service.py +++ b/api/app/services/import_service.py @@ -91,6 +91,8 @@ def import_single_order(order, id_pol: int = None, id_sectie: int = None) -> dic order_number = clean_web_text(order.number) order_date = convert_web_date(order.date) + if database.pool is None: + raise RuntimeError("Oracle pool not initialized") with database.pool.acquire() as conn: with conn.cursor() as cur: # Step 1: Process partner diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index 895c5b2..a8bf136 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -20,7 +20,8 @@ async def create_sync_run(run_id: str, json_files: int = 0): async def update_sync_run(run_id: str, status: str, total_orders: int = 0, - imported: int = 0, skipped: int = 0, errors: int = 0): + imported: int = 0, skipped: int = 0, errors: int = 0, + error_message: str = None): """Update sync run with results.""" db = await get_sqlite() try: @@ -31,9 +32,10 @@ async def update_sync_run(run_id: str, status: str, total_orders: int = 0, total_orders = ?, imported = ?, skipped = ?, - errors = ? + errors = ?, + error_message = ? WHERE run_id = ? - """, (status, total_orders, imported, skipped, errors, run_id)) + """, (status, total_orders, imported, skipped, errors, error_message, run_id)) await db.commit() finally: await db.close() diff --git a/api/app/services/sync_service.py b/api/app/services/sync_service.py index ca8a2ec..58061d0 100644 --- a/api/app/services/sync_service.py +++ b/api/app/services/sync_service.py @@ -13,6 +13,33 @@ logger = logging.getLogger(__name__) _sync_lock = asyncio.Lock() _current_sync = None # dict with run_id, status, progress info +# SSE subscriber system +_subscribers: list[asyncio.Queue] = [] + + +def subscribe() -> asyncio.Queue: + """Subscribe to sync events. Returns a queue that will receive event dicts.""" + q = asyncio.Queue() + _subscribers.append(q) + return q + + +def unsubscribe(q: asyncio.Queue): + """Unsubscribe from sync events.""" + try: + _subscribers.remove(q) + except ValueError: + pass + + +async def _emit(event: dict): + """Push an event to all subscriber queues.""" + for q in _subscribers: + try: + q.put_nowait(event) + except asyncio.QueueFull: + pass + async def get_sync_status(): """Get current sync status.""" @@ -21,7 +48,25 @@ async def get_sync_status(): return {"status": "idle"} -async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: +async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict: + """Prepare a sync run - creates run_id and sets initial state. + Returns {"run_id": ..., "status": "starting"} or {"error": ...} if already running. + """ + global _current_sync + if _sync_lock.locked(): + return {"error": "Sync already running", "run_id": _current_sync.get("run_id") if _current_sync else None} + + run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6] + _current_sync = { + "run_id": run_id, + "status": "running", + "started_at": datetime.now().isoformat(), + "progress": "Starting..." + } + return {"run_id": run_id, "status": "starting"} + + +async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict: """Run a full sync cycle. Returns summary dict.""" global _current_sync @@ -29,30 +74,33 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: return {"error": "Sync already running"} async with _sync_lock: - run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6] - _current_sync = { - "run_id": run_id, - "status": "running", - "started_at": datetime.now().isoformat(), - "progress": "Reading JSON files..." - } + # Use provided run_id or generate one + if not run_id: + run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6] + _current_sync = { + "run_id": run_id, + "status": "running", + "started_at": datetime.now().isoformat(), + "progress": "Reading JSON files..." + } + + _current_sync["progress"] = "Reading JSON files..." + await _emit({"type": "phase", "run_id": run_id, "message": "Reading JSON files..."}) try: # Step 1: Read orders orders, json_count = order_reader.read_json_orders() await sqlite_service.create_sync_run(run_id, json_count) + await _emit({"type": "phase", "run_id": run_id, "message": f"Found {len(orders)} orders in {json_count} files"}) if not orders: await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0) - _current_sync = None - return { - "run_id": run_id, - "status": "completed", - "message": "No orders found", - "json_files": json_count - } + summary = {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count} + await _emit({"type": "completed", "run_id": run_id, "summary": summary}) + return summary _current_sync["progress"] = f"Validating {len(orders)} orders..." + await _emit({"type": "phase", "run_id": run_id, "message": f"Validating {len(orders)} orders..."}) # Step 2a: Find new orders (not yet in Oracle) all_order_numbers = [o.number for o in orders] @@ -65,6 +113,8 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: validation = await asyncio.to_thread(validation_service.validate_skus, all_skus) importable, skipped = validation_service.classify_orders(orders, validation) + await _emit({"type": "phase", "run_id": run_id, "message": f"{len(importable)} importable, {len(skipped)} skipped (missing SKUs)"}) + # Step 2c: Build SKU context from skipped orders sku_context = {} # {sku: {"orders": [], "customers": []}} for order, missing_skus_list in skipped: @@ -100,6 +150,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: id_pol = id_pol or settings.ID_POL if id_pol and importable: _current_sync["progress"] = "Validating prices..." + await _emit({"type": "phase", "run_id": run_id, "message": "Validating prices..."}) # Gather all CODMATs from importable orders all_codmats = set() for order in importable: @@ -124,7 +175,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: price_result["missing_price"], id_pol ) - # Step 3: Record skipped orders + # Step 3: Record skipped orders + emit events for order, missing_skus in skipped: customer = order.billing.company_name or \ f"{order.billing.firstname} {order.billing.lastname}" @@ -137,13 +188,20 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: missing_skus=missing_skus, items_count=len(order.items) ) + await _emit({ + "type": "order_result", "run_id": run_id, + "order_number": order.number, "customer_name": customer, + "status": "SKIPPED", "missing_skus": missing_skus, + "items_count": len(order.items), "progress": f"0/{len(importable)}" + }) # Step 4: Import valid orders imported_count = 0 error_count = 0 for i, order in enumerate(importable): - _current_sync["progress"] = f"Importing {i+1}/{len(importable)}: #{order.number}" + progress_str = f"{i+1}/{len(importable)}" + _current_sync["progress"] = f"Importing {progress_str}: #{order.number}" result = await asyncio.to_thread( import_service.import_single_order, @@ -164,6 +222,12 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: id_partener=result["id_partener"], items_count=len(order.items) ) + await _emit({ + "type": "order_result", "run_id": run_id, + "order_number": order.number, "customer_name": customer, + "status": "IMPORTED", "items_count": len(order.items), + "id_comanda": result["id_comanda"], "progress": progress_str + }) else: error_count += 1 await sqlite_service.add_import_order( @@ -176,6 +240,12 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: error_message=result["error"], items_count=len(order.items) ) + await _emit({ + "type": "order_result", "run_id": run_id, + "order_number": order.number, "customer_name": customer, + "status": "ERROR", "error_message": result["error"], + "items_count": len(order.items), "progress": progress_str + }) # Safety: stop if too many errors if error_count > 10: @@ -204,14 +274,22 @@ async def run_sync(id_pol: int = None, id_sectie: int = None) -> dict: f"Sync {run_id} completed: {imported_count} imported, " f"{len(skipped)} skipped, {error_count} errors" ) + await _emit({"type": "completed", "run_id": run_id, "summary": summary}) return summary except Exception as e: logger.error(f"Sync {run_id} failed: {e}") - await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1) + await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e)) + _current_sync["error"] = str(e) + await _emit({"type": "failed", "run_id": run_id, "error": str(e)}) return {"run_id": run_id, "status": "failed", "error": str(e)} finally: - _current_sync = None + # Keep _current_sync for 10 seconds so status endpoint can show final result + async def _clear_current_sync(): + await asyncio.sleep(10) + global _current_sync + _current_sync = None + asyncio.ensure_future(_clear_current_sync()) def stop_sync(): diff --git a/api/app/services/validation_service.py b/api/app/services/validation_service.py index 63cfcaa..7798bac 100644 --- a/api/app/services/validation_service.py +++ b/api/app/services/validation_service.py @@ -17,7 +17,8 @@ def validate_skus(skus: set[str]) -> dict: direct = set() sku_list = list(skus) - with database.pool.acquire() as conn: + conn = database.get_oracle_connection() + try: with conn.cursor() as cur: # Check in batches of 500 for i in range(0, len(sku_list), 500): @@ -44,6 +45,8 @@ def validate_skus(skus: set[str]) -> dict: """, params2) for row in cur: direct.add(row[0]) + finally: + database.pool.release(conn) missing = skus - mapped - direct @@ -80,7 +83,8 @@ def find_new_orders(order_numbers: list[str]) -> set[str]: existing = set() num_list = list(order_numbers) - with database.pool.acquire() as conn: + conn = database.get_oracle_connection() + try: with conn.cursor() as cur: for i in range(0, len(num_list), 500): batch = num_list[i:i+500] @@ -93,6 +97,8 @@ def find_new_orders(order_numbers: list[str]) -> set[str]: """, params) for row in cur: existing.add(row[0]) + finally: + database.pool.release(conn) new_orders = set(order_numbers) - existing logger.info(f"Order check: {len(new_orders)} new, {len(existing)} already exist out of {len(order_numbers)} total") @@ -109,7 +115,8 @@ def validate_prices(codmats: set[str], id_pol: int) -> dict: ids_with_price = set() codmat_list = list(codmats) - with database.pool.acquire() as conn: + conn = database.get_oracle_connection() + try: with conn.cursor() as cur: # Step 1: Get ID_ARTICOL for each CODMAT for i in range(0, len(codmat_list), 500): @@ -138,6 +145,8 @@ def validate_prices(codmats: set[str], id_pol: int) -> dict: """, params) for row in cur: ids_with_price.add(row[0]) + finally: + database.pool.release(conn) # Map back to CODMATs has_price = {cm for cm, aid in codmat_to_id.items() if aid in ids_with_price} @@ -151,7 +160,8 @@ def ensure_prices(codmats: set[str], id_pol: int): if not codmats: return - with database.pool.acquire() as conn: + conn = database.get_oracle_connection() + try: with conn.cursor() as cur: # Get ID_VALUTA for this policy cur.execute(""" @@ -176,16 +186,18 @@ def ensure_prices(codmats: set[str], id_pol: int): cur.execute(""" INSERT INTO CRM_POLITICI_PRET_ART - (ID_POL_ART, ID_POL, ID_ARTICOL, PRET, ID_COMANDA, ID_VALUTA, - ID_UTIL, DATAORA, PROC_TVAV, ID_PARTR, ID_PARTZ, - PRETFTVA, PRETCTVA, CANTITATE, ID_UM, PRET_MIN, PRET_MIN_TVA) + (ID_POL_ART, ID_POL, ID_ARTICOL, PRET, ID_VALUTA, + ID_UTIL, DATAORA, PROC_TVAV, + PRETFTVA, PRETCTVA) VALUES - (SEQ_CRM_POLITICI_PRET_ART.NEXTVAL, :id_pol, :id_articol, 0, NULL, :id_valuta, - -3, SYSDATE, 1.19, NULL, NULL, - 0, 0, 0, NULL, 0, 0) + (SEQ_CRM_POLITICI_PRET_ART.NEXTVAL, :id_pol, :id_articol, 0, :id_valuta, + -3, SYSDATE, 1.19, + 0, 0) """, {"id_pol": id_pol, "id_articol": id_articol, "id_valuta": id_valuta}) logger.info(f"Pret 0 adaugat pentru CODMAT {codmat} in politica {id_pol}") conn.commit() + finally: + database.pool.release(conn) logger.info(f"Ensure prices done: {len(codmats)} CODMATs processed for policy {id_pol}") diff --git a/api/app/static/css/style.css b/api/app/static/css/style.css index fbda7d5..9f8c226 100644 --- a/api/app/static/css/style.css +++ b/api/app/static/css/style.css @@ -212,3 +212,73 @@ body { align-items: center; justify-content: center; } + +/* Live Feed */ +.live-feed { + max-height: 300px; + overflow-y: auto; + font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace; + font-size: 0.8125rem; + scroll-behavior: smooth; +} + +.feed-entry { + padding: 0.35rem 0.75rem; + border-bottom: 1px solid #f1f5f9; + display: flex; + align-items: baseline; + gap: 0.5rem; +} + +.feed-entry:last-child { + border-bottom: none; +} + +.feed-entry.phase { + background-color: #eff6ff; + color: #1e40af; +} + +.feed-entry.error { + background-color: #fef2f2; + color: #991b1b; +} + +.feed-entry.success { + color: #166534; +} + +.feed-entry .feed-time { + color: #94a3b8; + white-space: nowrap; + min-width: 5rem; +} + +.feed-entry .feed-icon { + min-width: 1.25rem; + text-align: center; +} + +.feed-entry .feed-msg { + flex: 1; + word-break: break-word; +} + +/* Live pulse animation */ +.live-pulse { + animation: pulse 1.5s ease-in-out infinite; +} + +@keyframes pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.5; } +} + +/* Clickable table rows */ +.table-hover tbody tr[data-href] { + cursor: pointer; +} + +.table-hover tbody tr[data-href]:hover { + background-color: #e2e8f0; +} diff --git a/api/app/static/js/dashboard.js b/api/app/static/js/dashboard.js index c5cd722..1a4a8ee 100644 --- a/api/app/static/js/dashboard.js +++ b/api/app/static/js/dashboard.js @@ -114,7 +114,7 @@ async function loadSyncHistory() { } const statusClass = r.status === 'completed' ? 'bg-success' : r.status === 'running' ? 'bg-primary' : 'bg-danger'; - return `
${esc(s)}`).join('')}${esc(event.order_number || '-')}${esc(s)}`).join('') +
- '${esc(s)}`).join('') + '';
}
- } catch (e) {
- // malformed JSON — skip
- }
+ } catch (e) { /* skip */ }
}
-
const details = order.error_message
? `${esc(order.error_message)}${missingSkuTags}`
: missingSkuTags || '-';
@@ -142,75 +345,61 @@ async function loadRunLog(runId) {
`;
}).join('');
- filterRow.style.display = '';
- // Reset filter to "Toate"
+ // Reset filter
document.querySelectorAll('[data-filter]').forEach(btn => {
btn.classList.toggle('active', btn.dataset.filter === 'all');
});
applyFilter('all');
} catch (err) {
- tbody.innerHTML = `