Replace import_orders (insert-per-run) with orders table (one row per order, upsert on conflict). Eliminates dedup CTE on every dashboard query and prevents unbounded row growth at 4-500 orders/sync. Key changes: - orders table: PK order_number, upsert via ON CONFLICT DO UPDATE; COALESCE preserves id_comanda once set; times_skipped auto-increments - sync_run_orders: lightweight junction (sync_run_id, order_number) replaces sync_run_id column on orders - order_items: PK changed to (order_number, sku), INSERT OR IGNORE - Auto-migration in init_sqlite(): import_orders → orders on first boot, old table renamed to import_orders_bak - /api/dashboard/orders: period_days param (3/7/30/0=all, default 7) - Dashboard: period selector buttons in orders card header - start.sh: stop existing process on port 5003 before restart; remove --reload (broken on WSL2 /mnt/e/) - Add invoice_service, E2E Playwright tests, Oracle package updates Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
352 lines
13 KiB
Python
352 lines
13 KiB
Python
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Request, BackgroundTasks
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import HTMLResponse
|
|
from starlette.responses import StreamingResponse
|
|
from pydantic import BaseModel
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from ..services import sync_service, scheduler_service, sqlite_service, invoice_service
|
|
|
|
# Router for all sync-related pages and API endpoints.
router = APIRouter(tags=["sync"])

# Jinja2 templates live next to this package in ../templates.
_templates_dir = Path(__file__).parent.parent / "templates"
templates = Jinja2Templates(directory=str(_templates_dir))
|
|
|
|
|
|
class ScheduleConfig(BaseModel):
    """Request body for PUT /api/sync/schedule."""
    enabled: bool               # turn the background scheduler on or off
    interval_minutes: int = 5   # minutes between automatic sync runs
|
|
|
|
|
|
# SSE streaming endpoint
@router.get("/api/sync/stream")
async def sync_stream(request: Request):
    """SSE stream for real-time sync progress."""
    queue = sync_service.subscribe()

    async def event_generator():
        try:
            # Stop streaming once the client goes away.
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=15.0)
                except asyncio.TimeoutError:
                    # Nothing for 15s: send a keepalive so intermediaries
                    # don't close the idle connection, then wait again.
                    yield f"data: {json.dumps({'type': 'keepalive'})}\n\n"
                    continue
                yield f"data: {json.dumps(event)}\n\n"
                # Terminal events end the stream.
                if event.get("type") in ("completed", "failed"):
                    break
        finally:
            sync_service.unsubscribe(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
|
|
|
|
|
|
# API endpoints
@router.post("/api/sync/start")
async def start_sync(background_tasks: BackgroundTasks):
    """Trigger a sync run in the background."""
    prep = await sync_service.prepare_sync()
    error = prep.get("error")
    if error:
        # Preparation failed (or refused); surface the reason to the caller.
        return {"error": error, "run_id": prep.get("run_id")}

    run_id = prep["run_id"]
    # The actual sync runs after the response is sent.
    background_tasks.add_task(sync_service.run_sync, run_id=run_id)
    return {"message": "Sync started", "run_id": run_id}
|
|
|
|
|
|
@router.post("/api/sync/stop")
async def stop_sync():
    """Ask any in-flight sync run to stop."""
    # Fire-and-forget: the running sync observes the signal asynchronously.
    sync_service.stop_sync()
    return {"message": "Stop signal sent"}
|
|
|
|
|
|
@router.get("/api/sync/status")
async def sync_status():
    """Current sync status, merged with dashboard statistics."""
    current = await sync_service.get_sync_status()
    dashboard_stats = await sqlite_service.get_dashboard_stats()
    # Merge: status fields at the top level, stats nested under "stats".
    return dict(current, stats=dashboard_stats)
|
|
|
|
|
|
@router.get("/api/sync/history")
async def sync_history(page: int = 1, per_page: int = 20):
    """Paginated history of sync runs."""
    runs = await sqlite_service.get_sync_runs(page, per_page)
    return runs
|
|
|
|
|
|
@router.get("/logs", response_class=HTMLResponse)
async def logs_page(request: Request, run: Optional[str] = None):
    """Render the logs page.

    Args:
        request: Incoming request (required by Jinja2Templates).
        run: Optional run_id to preselect in the viewer; empty string when absent.
    """
    # Fix: `run: str = None` was an implicit-Optional annotation (PEP 484
    # disallows it); behavior is unchanged.
    context = {"request": request, "selected_run": run or ""}
    return templates.TemplateResponse("logs.html", context)
|
|
|
|
|
|
@router.get("/api/sync/run/{run_id}")
async def sync_run_detail(run_id: str):
    """Details for one sync run, or an error payload when the run is unknown."""
    detail = await sqlite_service.get_sync_run_detail(run_id)
    if detail:
        return detail
    # NOTE: errors are reported in-body with HTTP 200, matching sibling endpoints.
    return {"error": "Run not found"}
|
|
|
|
|
|
@router.get("/api/sync/run/{run_id}/log")
async def sync_run_log(run_id: str):
    """Per-order detail log for a sync run."""
    detail = await sqlite_service.get_sync_run_detail(run_id)
    if not detail:
        return {"error": "Run not found", "status_code": 404}

    # Project each stored order row down to the fields the log view needs.
    fields = (
        "order_number", "order_date", "customer_name", "items_count",
        "status", "id_comanda", "id_partener", "error_message", "missing_skus",
    )
    return {
        "run_id": run_id,
        "run": detail.get("run", {}),
        "orders": [
            {name: o.get(name) for name in fields}
            for o in detail.get("orders", [])
        ],
    }
|
|
|
|
|
|
def _format_text_log_from_detail(detail: dict) -> str:
    """Build a human-readable (Romanian) text log from SQLite-stored run data.

    Used for completed runs whose live in-memory log is no longer available.
    Orders with a status other than IMPORTED/SKIPPED/ERROR are omitted from
    the per-order section.

    Args:
        detail: Dict with "run" (run metadata) and "orders" (per-order rows).

    Returns:
        Multi-line text: header, one line per order, blank line, summary line.
    """
    run = detail.get("run", {})
    orders = detail.get("orders", [])

    # Fix: rows loaded from SQLite can carry the key with a None value
    # (SQL NULL), in which case dict.get's default never applies and the
    # log would show literal "None". Use `or` fallbacks throughout.
    run_id = run.get("run_id") or "?"
    started = run.get("started_at", "")

    lines = [f"=== Sync Run {run_id} ==="]
    if started:
        try:
            dt = datetime.fromisoformat(started)
            lines.append(f"Inceput: {dt.strftime('%d.%m.%Y %H:%M:%S')}")
        except (ValueError, TypeError):
            # Not ISO-formatted; show the raw value rather than fail.
            lines.append(f"Inceput: {started}")
    lines.append("")

    for o in orders:
        status = (o.get("status") or "").upper()
        number = o.get("order_number") or "?"
        customer = o.get("customer_name") or "?"
        order_date = o.get("order_date") or "?"

        if status == "IMPORTED":
            id_cmd = o.get("id_comanda") or "?"
            lines.append(f"#{number} [{order_date}] {customer} → IMPORTAT (ID: {id_cmd})")
        elif status == "SKIPPED":
            missing = o.get("missing_skus", "")
            if isinstance(missing, str):
                # missing_skus is stored as a JSON array string; fall back to
                # wrapping the raw string when it doesn't parse.
                try:
                    missing = json.loads(missing)
                except (json.JSONDecodeError, TypeError):
                    missing = [missing] if missing else []
            skus_str = ", ".join(missing) if isinstance(missing, list) else str(missing)
            lines.append(f"#{number} [{order_date}] {customer} → OMIS (lipsa: {skus_str})")
        elif status == "ERROR":
            err = o.get("error_message") or "necunoscuta"
            lines.append(f"#{number} [{order_date}] {customer} → EROARE: {err}")

    # Summary line
    lines.append("")
    total = run.get("total_orders") or 0
    imported = run.get("imported") or 0
    skipped = run.get("skipped") or 0
    errors = run.get("errors") or 0

    duration_str = ""
    finished = run.get("finished_at", "")
    if started and finished:
        try:
            dt_start = datetime.fromisoformat(started)
            dt_end = datetime.fromisoformat(finished)
            secs = int((dt_end - dt_start).total_seconds())
            duration_str = f" | Durata: {secs}s"
        except (ValueError, TypeError):
            # Unparseable timestamps: omit the duration entirely.
            pass

    lines.append(f"Finalizat: {imported} importate, {skipped} nemapate, {errors} erori din {total} comenzi{duration_str}")

    return "\n".join(lines)
|
|
|
|
|
|
@router.get("/api/sync/run/{run_id}/text-log")
async def sync_run_text_log(run_id: str):
    """Text log for a sync run - live from memory or reconstructed from SQLite."""
    # Active/recent runs keep their text log in process memory.
    live_log = sync_service.get_run_text_log(run_id)
    if live_log is not None:
        current = await sync_service.get_sync_status()
        is_live = (current.get("run_id") == run_id
                   and current.get("status") == "running")
        return {
            "text": live_log,
            "status": "running" if is_live else "completed",
            "finished": not is_live,
        }

    # Historical runs: rebuild the text from persisted rows.
    detail = await sqlite_service.get_sync_run_detail(run_id)
    if not detail:
        return {"error": "Run not found", "text": "", "status": "unknown", "finished": True}

    run = detail.get("run", {})
    return {
        "text": _format_text_log_from_detail(detail),
        "status": run.get("status", "completed"),
        "finished": True,
    }
|
|
|
|
|
|
@router.get("/api/sync/run/{run_id}/orders")
async def sync_run_orders(run_id: str, status: str = "all", page: int = 1, per_page: int = 50,
                          sort_by: str = "created_at", sort_dir: str = "asc"):
    """Get filtered, paginated orders for a sync run (R1)."""
    result = await sqlite_service.get_run_orders_filtered(
        run_id, status, page, per_page, sort_by=sort_by, sort_dir=sort_dir
    )
    return result
|
|
|
|
|
|
def _get_articole_terti_for_skus(skus: set) -> dict:
    """Query ARTICOLE_TERTI for all active codmat/cantitate/procent per SKU."""
    # Imported lazily to avoid a module-level Oracle dependency.
    from .. import database

    mappings = {}
    sku_list = list(skus)
    conn = database.get_oracle_connection()
    try:
        with conn.cursor() as cur:
            # Bind SKUs in batches of 500 to keep the IN-list bounded.
            for start in range(0, len(sku_list), 500):
                batch = sku_list[start:start + 500]
                placeholders = ",".join(f":s{j}" for j in range(len(batch)))
                params = {f"s{j}": sku for j, sku in enumerate(batch)}
                cur.execute(f"""
                SELECT at.sku, at.codmat, at.cantitate_roa, at.procent_pret,
                       na.denumire
                FROM ARTICOLE_TERTI at
                LEFT JOIN NOM_ARTICOLE na ON na.codmat = at.codmat AND na.sters = 0 AND na.inactiv = 0
                WHERE at.sku IN ({placeholders}) AND at.activ = 1 AND at.sters = 0
                ORDER BY at.sku, at.codmat
                """, params)
                for sku, codmat, cantitate, procent, denumire in cur:
                    mappings.setdefault(sku, []).append({
                        "codmat": codmat,
                        # NULL/0 quantities fall back to 1x at 100% price.
                        "cantitate_roa": float(cantitate) if cantitate else 1,
                        "procent_pret": float(procent) if procent else 100,
                        "denumire": denumire or ""
                    })
    finally:
        # Return the connection to the pool even on query failure.
        database.pool.release(conn)
    return mappings
|
|
|
|
|
|
@router.get("/api/sync/order/{order_number}")
async def order_detail(order_number: str):
    """Get order detail with line items (R9), enriched with ARTICOLE_TERTI data."""
    detail = await sqlite_service.get_order_detail(order_number)
    if not detail:
        return {"error": "Order not found"}

    # Enrich items with ARTICOLE_TERTI mappings from Oracle.
    items = detail.get("items", [])
    skus = {entry["sku"] for entry in items if entry.get("sku")}
    if skus:
        # The Oracle lookup is blocking; keep it off the event loop.
        codmat_map = await asyncio.to_thread(_get_articole_terti_for_skus, skus)
        for entry in items:
            mapped = codmat_map.get(entry.get("sku"))
            if mapped is not None:
                entry["codmat_details"] = mapped

    return detail
|
|
|
|
|
|
@router.get("/api/dashboard/orders")
async def dashboard_orders(page: int = 1, per_page: int = 50,
                           search: str = "", status: str = "all",
                           sort_by: str = "order_date", sort_dir: str = "desc",
                           period_days: int = 7):
    """Get orders for dashboard, enriched with invoice data. period_days=0 means all time."""
    uninvoiced_mode = (status == "UNINVOICED")

    # UNINVOICED can't be filtered in SQLite (invoice state comes from Oracle):
    # fetch the full IMPORTED set for the period and filter/paginate below.
    if uninvoiced_mode:
        fetch_status, fetch_page, fetch_per_page = "IMPORTED", 1, 10000
    else:
        fetch_status, fetch_page, fetch_per_page = status, page, per_page

    result = await sqlite_service.get_orders(
        page=fetch_page, per_page=fetch_per_page, search=search,
        status_filter=fetch_status, sort_by=sort_by, sort_dir=sort_dir,
        period_days=period_days
    )
    all_orders = result["orders"]

    # Attach invoice info (from Oracle) to every imported order.
    id_comanda_list = [o["id_comanda"] for o in all_orders if o.get("id_comanda")]
    invoice_data = {}
    if id_comanda_list:
        invoice_data = await asyncio.to_thread(
            invoice_service.check_invoices_for_orders, id_comanda_list
        )
    for o in all_orders:
        o["invoice"] = invoice_data.get(o.get("id_comanda"))

    # Count IMPORTED orders that still have no invoice.
    uninvoiced_count = sum(
        1 for o in all_orders
        if o.get("status") == "IMPORTED" and not o.get("invoice")
    )
    result["counts"]["uninvoiced"] = uninvoiced_count

    if uninvoiced_mode:
        # Server-side filter + pagination over the full IMPORTED set.
        filtered = [o for o in all_orders
                    if o.get("status") == "IMPORTED" and not o.get("invoice")]
        total = len(filtered)
        offset = (page - 1) * per_page
        result.update(
            orders=filtered[offset:offset + per_page],
            total=total,
            page=page,
            per_page=per_page,
            pages=(total + per_page - 1) // per_page if total > 0 else 0,
        )

    return result
|
|
|
|
|
|
@router.put("/api/sync/schedule")
async def update_schedule(config: ScheduleConfig):
    """Apply and persist scheduler configuration."""
    if config.enabled:
        scheduler_service.start_scheduler(config.interval_minutes)
    else:
        scheduler_service.stop_scheduler()

    # Persist both settings so the schedule survives restarts.
    for key, value in (("enabled", config.enabled),
                       ("interval_minutes", config.interval_minutes)):
        await sqlite_service.set_scheduler_config(key, str(value))

    return scheduler_service.get_scheduler_status()
|
|
|
|
|
|
@router.get("/api/sync/schedule")
async def get_schedule():
    """Report the scheduler's current status."""
    # Pure pass-through; all state lives in scheduler_service.
    return scheduler_service.get_scheduler_status()
|