Files
gomag-vending/api/app/services/scheduler_service.py
Claude Agent aa9e580e1b fix(scheduler): prevent auto-enable with wrong interval on page load
Root cause: GET /api/sync/schedule returned interval_minutes=null when
scheduler was stopped, causing dropdown to stay on first HTML option
(1 min). Setting .value programmatically could trigger onchange, sending
a second PUT with interval=1 right after the user's intended interval.

- GET schedule falls back to DB/default (10 min) when scheduler is off
- Add _schedulerLoading flag to block onchange during loadSchedulerStatus
- Default interval 10 min everywhere (was 5 in backend)
- Cache bust dashboard.js v=33

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 13:57:22 +00:00

72 lines
1.9 KiB
Python

import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared scheduler instance. None until init_scheduler() creates one, and
# reset to None again by shutdown_scheduler().
_scheduler = None
# Flag reported as "enabled" by get_scheduler_status(); set by
# start_scheduler() and cleared by stop_scheduler() / shutdown_scheduler().
_is_running = False
def init_scheduler():
    """Create a new ``AsyncIOScheduler`` and store it as the module's scheduler.

    The instance is only constructed here, not started. Whatever was stored
    in the module-level slot before is overwritten.
    """
    global _scheduler
    fresh_instance = AsyncIOScheduler()
    _scheduler = fresh_instance
    logger.info("Scheduler initialized")
def start_scheduler(interval_minutes: int = 10):
    """Start (or restart) the periodic sync job.

    Registers ``sync_service.run_sync`` under the job id ``"sync_job"`` with
    an interval trigger, replacing any job already registered under that id,
    starts the underlying scheduler if it is not running yet, and marks the
    module as running.

    Args:
        interval_minutes: Minutes between two sync runs. Must be at least 1.

    Raises:
        ValueError: If ``interval_minutes`` is smaller than 1.
    """
    global _is_running

    # Validate before touching any scheduler state. APScheduler 3.x coerces a
    # zero-length interval to one second, which would silently hammer the
    # sync every second; negative values make no sense for an interval.
    if interval_minutes < 1:
        raise ValueError(
            f"interval_minutes must be >= 1, got {interval_minutes!r}"
        )

    if _scheduler is None:
        init_scheduler()

    # Remove existing job if any, so the new interval fully replaces the old.
    if _scheduler.get_job("sync_job"):
        _scheduler.remove_job("sync_job")

    # Imported at call time rather than at module top
    # (presumably to avoid a circular import -- TODO confirm).
    from . import sync_service

    _scheduler.add_job(
        sync_service.run_sync,
        trigger=IntervalTrigger(minutes=interval_minutes),
        id="sync_job",
        name="GoMag Sync",
        replace_existing=True,
    )

    if not _scheduler.running:
        _scheduler.start()

    _is_running = True
    logger.info("Scheduler started with interval %smin", interval_minutes)
def stop_scheduler():
    """Disable the periodic sync without shutting the scheduler down.

    If the scheduler exists and is running, the ``"sync_job"`` job is removed
    when present. The running flag is cleared and a log line is written in
    every case.
    """
    global _is_running
    scheduler_active = _scheduler and _scheduler.running
    if scheduler_active and _scheduler.get_job("sync_job"):
        _scheduler.remove_job("sync_job")
    _is_running = False
    logger.info("Scheduler stopped")
def shutdown_scheduler():
    """Tear the scheduler down and reset the module state.

    A running scheduler is shut down with ``wait=False``. Afterwards the
    module-level scheduler reference is dropped and the running flag cleared.
    """
    global _scheduler, _is_running
    live = _scheduler
    if live and live.running:
        live.shutdown(wait=False)
    _scheduler, _is_running = None, False
def get_scheduler_status():
    """Report the scheduler state as a plain dict.

    Returns:
        A dict with three keys:
        ``"enabled"`` -- the module's running flag;
        ``"next_run"`` -- ISO-formatted next run time of ``"sync_job"``, or
        ``None`` when there is no job or no next run time;
        ``"interval_minutes"`` -- the job's trigger interval truncated to
        whole minutes, or ``None`` when there is no job.
    """
    sync_job = None if not _scheduler else _scheduler.get_job("sync_job")

    enabled = _is_running
    upcoming = None
    minutes = None
    if sync_job:
        if sync_job.next_run_time:
            upcoming = sync_job.next_run_time.isoformat()
        minutes = int(sync_job.trigger.interval.total_seconds() / 60)

    return {
        "enabled": enabled,
        "next_run": upcoming,
        "interval_minutes": minutes,
    }