feat: add FastAPI admin dashboard with sync orchestration and test suite

Replace Flask admin with FastAPI app (api/app/) featuring:
- Dashboard with stat cards, sync control, and history
- Mappings CRUD for ARTICOLE_TERTI with CSV import/export
- Article autocomplete from NOM_ARTICOLE
- SKU pre-validation before import
- Sync orchestration: read JSONs -> validate -> import -> log to SQLite
- APScheduler for periodic sync from UI
- File logging to logs/sync_comenzi_YYYYMMDD_HHMMSS.log
- Oracle pool None guard (503 vs 500 on unavailable)

Test suite:
- test_app_basic.py: 30 tests (imports + routes) without Oracle
- test_integration.py: 9 integration tests with Oracle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 14:35:16 +02:00
parent 902f99c507
commit 9c42187f02
35 changed files with 3730 additions and 54 deletions

View File

@@ -0,0 +1,71 @@
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
logger = logging.getLogger(__name__)
_scheduler = None
_is_running = False
def init_scheduler():
"""Initialize the APScheduler instance."""
global _scheduler
_scheduler = AsyncIOScheduler()
logger.info("Scheduler initialized")
def start_scheduler(interval_minutes: int = 5):
"""Start the scheduler with the given interval."""
global _is_running
if _scheduler is None:
init_scheduler()
# Remove existing job if any
if _scheduler.get_job("sync_job"):
_scheduler.remove_job("sync_job")
from . import sync_service
_scheduler.add_job(
sync_service.run_sync,
trigger=IntervalTrigger(minutes=interval_minutes),
id="sync_job",
name="GoMag Sync",
replace_existing=True
)
if not _scheduler.running:
_scheduler.start()
_is_running = True
logger.info(f"Scheduler started with interval {interval_minutes}min")
def stop_scheduler():
"""Stop the scheduler."""
global _is_running
if _scheduler and _scheduler.running:
if _scheduler.get_job("sync_job"):
_scheduler.remove_job("sync_job")
_is_running = False
logger.info("Scheduler stopped")
def shutdown_scheduler():
"""Shutdown the scheduler completely."""
global _scheduler, _is_running
if _scheduler and _scheduler.running:
_scheduler.shutdown(wait=False)
_scheduler = None
_is_running = False
def get_scheduler_status():
"""Get current scheduler status."""
job = _scheduler.get_job("sync_job") if _scheduler else None
return {
"enabled": _is_running,
"next_run": job.next_run_time.isoformat() if job and job.next_run_time else None,
"interval_minutes": int(job.trigger.interval.total_seconds() / 60) if job else None
}