Replace Flask admin with FastAPI app (api/app/) featuring: - Dashboard with stat cards, sync control, and history - Mappings CRUD for ARTICOLE_TERTI with CSV import/export - Article autocomplete from NOM_ARTICOLE - SKU pre-validation before import - Sync orchestration: read JSONs -> validate -> import -> log to SQLite - APScheduler for periodic sync from UI - File logging to logs/sync_comenzi_YYYYMMDD_HHMMSS.log - Oracle pool None guard (503 vs 500 on unavailable) Test suite: - test_app_basic.py: 30 tests (imports + routes) without Oracle - test_integration.py: 9 integration tests with Oracle Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
72 lines
1.9 KiB
Python
72 lines
1.9 KiB
Python
import logging
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The scheduler instance shared by every function in this module. It is None
# until init_scheduler() creates one, and again after shutdown_scheduler().
_scheduler = None

# Flag reported as "enabled" by get_scheduler_status(); set by
# start_scheduler() and cleared by stop_scheduler() / shutdown_scheduler().
_is_running = False
|
|
|
|
|
|
def init_scheduler():
    """Create a new AsyncIOScheduler and store it in the module-level slot.

    The new instance is only constructed here, not started. Whatever was
    previously held in ``_scheduler`` is simply replaced.
    """
    global _scheduler

    fresh_scheduler = AsyncIOScheduler()
    _scheduler = fresh_scheduler
    logger.info("Scheduler initialized")
|
|
|
|
|
|
def start_scheduler(interval_minutes: int = 5):
    """Schedule the periodic sync job and make sure the scheduler is running.

    An existing ``sync_job`` is removed and re-added with the new interval,
    so calling this again re-schedules an already active sync.

    Args:
        interval_minutes: Minutes between two sync runs; must be positive.

    Raises:
        ValueError: If ``interval_minutes`` is zero or negative.
    """
    global _is_running

    # Validate before touching the scheduler so a bad value leaves an existing
    # schedule untouched. NOTE(review): IntervalTrigger does not reject these
    # values itself (a zero-length interval appears to be coerced to one
    # second, which would run the sync almost continuously).
    if interval_minutes <= 0:
        raise ValueError(
            f"interval_minutes must be a positive number, got {interval_minutes!r}"
        )

    if _scheduler is None:
        init_scheduler()

    # Remove existing job if any
    if _scheduler.get_job("sync_job"):
        _scheduler.remove_job("sync_job")

    # Imported at call time rather than at module top -- presumably to avoid
    # a circular import with sync_service; confirm before moving it.
    from . import sync_service

    _scheduler.add_job(
        sync_service.run_sync,
        trigger=IntervalTrigger(minutes=interval_minutes),
        id="sync_job",
        name="GoMag Sync",
        replace_existing=True,
    )

    if not _scheduler.running:
        _scheduler.start()

    _is_running = True
    logger.info("Scheduler started with interval %smin", interval_minutes)
|
|
|
|
|
|
def stop_scheduler():
    """Cancel the periodic sync job and mark scheduling as disabled.

    Only ``sync_job`` is removed, and only when a scheduler exists, is
    running, and actually holds that job; the scheduler itself is not shut
    down here. The flag reset and the log line happen on every call.
    """
    global _is_running

    has_sync_job = (
        _scheduler
        and _scheduler.running
        and _scheduler.get_job("sync_job")
    )
    if has_sync_job:
        _scheduler.remove_job("sync_job")

    _is_running = False
    logger.info("Scheduler stopped")
|
|
|
|
|
|
def shutdown_scheduler():
    """Shut the scheduler down and reset this module's scheduling state.

    A running scheduler is shut down with ``wait=False``. Afterwards the
    module-level scheduler reference is cleared and the running flag is
    reset, regardless of whether a scheduler was active.
    """
    global _scheduler, _is_running

    current = _scheduler
    if current and current.running:
        current.shutdown(wait=False)

    _scheduler, _is_running = None, False
|
|
|
|
|
|
def get_scheduler_status():
    """Report whether periodic sync is enabled and when it will run next.

    Returns:
        A dict with three keys:
            "enabled": the module-level running flag.
            "next_run": ISO-8601 string of the sync job's next run time, or
                None when there is no job or no next run time is known.
            "interval_minutes": the job's trigger interval in whole minutes,
                or None when there is no job.
    """
    job = _scheduler.get_job("sync_job") if _scheduler else None
    if job is None:
        return {
            "enabled": _is_running,
            "next_run": None,
            "interval_minutes": None,
        }

    # A job that was added while the scheduler is not running is still
    # pending and has no ``next_run_time`` attribute yet, so read it
    # defensively instead of letting AttributeError escape.
    next_run_time = getattr(job, "next_run_time", None)

    return {
        "enabled": _is_running,
        "next_run": next_run_time.isoformat() if next_run_time else None,
        "interval_minutes": int(job.trigger.interval.total_seconds() / 60),
    }
|