Files
gomag-vending/api/app/routers/sync.py
Marius Mutu 9c42187f02 feat: add FastAPI admin dashboard with sync orchestration and test suite
Replace Flask admin with FastAPI app (api/app/) featuring:
- Dashboard with stat cards, sync control, and history
- Mappings CRUD for ARTICOLE_TERTI with CSV import/export
- Article autocomplete from NOM_ARTICOLE
- SKU pre-validation before import
- Sync orchestration: read JSONs -> validate -> import -> log to SQLite
- APScheduler for periodic sync from UI
- File logging to logs/sync_comenzi_YYYYMMDD_HHMMSS.log
- Oracle pool None guard (503 vs 500 on unavailable)

Test suite:
- test_app_basic.py: 30 tests (imports + routes) without Oracle
- test_integration.py: 9 integration tests with Oracle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 14:35:16 +02:00

91 lines
2.8 KiB
Python

from fastapi import APIRouter, Request, BackgroundTasks
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from pathlib import Path
from typing import Optional
from ..services import sync_service, scheduler_service, sqlite_service
# Router collecting the sync HTML pages and the /api/sync/* JSON endpoints
# defined in this module; every route is tagged "sync".
router = APIRouter(tags=["sync"])
# Template loader pointed at the "templates" directory located one level above
# the directory that contains this file.
templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates"))
class ScheduleConfig(BaseModel):
    # Request body for PUT /api/sync/schedule.
    # `enabled` selects between starting and stopping the scheduler.
    enabled: bool
    # Interval handed to the scheduler when enabled; defaults to 5 when omitted.
    interval_minutes: int = 5
# HTML pages
@router.get("/sync", response_class=HTMLResponse)
async def sync_page(request: Request):
    """Render the ``dashboard.html`` template for the sync page."""
    context = {"request": request}
    return templates.TemplateResponse("dashboard.html", context)
@router.get("/sync/run/{run_id}", response_class=HTMLResponse)
async def sync_detail_page(request: Request, run_id: str):
    """Render ``sync_detail.html`` with the requested ``run_id`` in its context."""
    context = {
        "request": request,
        "run_id": run_id,
    }
    return templates.TemplateResponse("sync_detail.html", context)
# API endpoints
@router.post("/api/sync/start")
async def start_sync(background_tasks: BackgroundTasks):
    """Queue a sync run as a background task unless one is already running.

    Returns a ``message`` payload when the run was queued, or an ``error``
    payload carrying the active ``run_id`` when the reported status is
    ``"running"``.
    """
    current = await sync_service.get_sync_status()
    already_running = current.get("status") == "running"

    if not already_running:
        background_tasks.add_task(sync_service.run_sync)
        return {"message": "Sync started"}

    # NOTE(review): the "already running" case is reported with a normal 200
    # response; callers must inspect the "error" key.
    return {"error": "Sync already running", "run_id": current.get("run_id")}
@router.post("/api/sync/stop")
async def stop_sync():
    """Ask the sync service to stop and acknowledge that the signal was sent."""
    sync_service.stop_sync()
    acknowledgement = {"message": "Stop signal sent"}
    return acknowledgement
@router.get("/api/sync/status")
async def sync_status():
    """Return the current sync status merged with dashboard statistics.

    The statistics are placed under the ``"stats"`` key, overriding any key of
    the same name in the status mapping.
    """
    current = await sync_service.get_sync_status()
    dashboard_stats = await sqlite_service.get_dashboard_stats()

    # Copy first so the mapping returned by the service is not mutated.
    payload = dict(current)
    payload["stats"] = dashboard_stats
    return payload
@router.get("/api/sync/history")
async def sync_history(page: int = 1, per_page: int = 20):
    """Get one page of sync run history.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Number of runs per page; values below 1 are treated as 1.

    Returns:
        Whatever ``sqlite_service.get_sync_runs`` returns for the normalized
        paging arguments.
    """
    # Both values arrive straight from the query string. Normalize them here so
    # zero or negative numbers are never forwarded to the storage layer
    # (presumably they feed a LIMIT/OFFSET computation there -- TODO confirm).
    page = max(page, 1)
    per_page = max(per_page, 1)
    return await sqlite_service.get_sync_runs(page, per_page)
@router.get("/api/sync/run/{run_id}")
async def sync_run_detail(run_id: str):
    """Look up one sync run by id.

    Returns the stored detail, or an ``error`` payload when the lookup yields
    nothing (or any other falsy value).
    """
    run = await sqlite_service.get_sync_run_detail(run_id)
    return run if run else {"error": "Run not found"}
@router.put("/api/sync/schedule")
async def update_schedule(config: ScheduleConfig):
    """Update scheduler configuration and persist it.

    When ``config.enabled`` is true the scheduler is started with
    ``config.interval_minutes``; otherwise it is stopped. Both settings are
    then stored as strings through ``sqlite_service.set_scheduler_config``.

    Returns:
        The value of ``scheduler_service.get_scheduler_status()`` after the
        change has been applied.

    Raises:
        HTTPException: 422 when the scheduler is being enabled with an
            interval smaller than one minute.
    """
    # Function-scope import keeps this handler independent of the module's
    # top-level import list.
    from fastapi import HTTPException

    if config.enabled:
        # The interval is forwarded to the scheduler unchanged, so reject
        # non-positive values before touching any state.
        if config.interval_minutes < 1:
            raise HTTPException(
                status_code=422,
                detail="interval_minutes must be at least 1",
            )
        scheduler_service.start_scheduler(config.interval_minutes)
    else:
        scheduler_service.stop_scheduler()

    # Persist config
    await sqlite_service.set_scheduler_config("enabled", str(config.enabled))
    await sqlite_service.set_scheduler_config("interval_minutes", str(config.interval_minutes))
    return scheduler_service.get_scheduler_status()
@router.get("/api/sync/schedule")
async def get_schedule():
    """Report the scheduler status as provided by ``scheduler_service``."""
    scheduler_state = scheduler_service.get_scheduler_status()
    return scheduler_state