""" Cron-like job scheduler for Echo-Core. Wraps APScheduler AsyncIOScheduler to run Claude CLI prompts on a schedule, sending output to designated Discord channels. """ import asyncio import json import logging import os import re import subprocess import tempfile from datetime import datetime, timezone from pathlib import Path from typing import Awaitable, Callable from zoneinfo import ZoneInfo from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from src.claude_session import ( CLAUDE_BIN, PROJECT_ROOT, VALID_MODELS, _safe_env, build_system_prompt, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- JOBS_DIR = PROJECT_ROOT / "cron" JOBS_FILE = JOBS_DIR / "jobs.json" JOB_TIMEOUT = 300 # 5-minute default per job execution _NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") _MAX_PROMPT_LEN = 10_000 _MAX_SHELL_OUTPUT = 1500 # chars of stdout to forward to channel _MAX_STDERR_REPORT = 500 # chars of stderr on non-zero exit _VALID_REPORT_ON = {"always", "changes", "never"} _VALID_KINDS = {"claude", "shell"} _SCHEDULER_TZ = ZoneInfo("Europe/Bucharest") _MARKER_RE = re.compile(r"^GSTACK-CRON:\s+changes=(\d+)\s*$", re.MULTILINE) # --------------------------------------------------------------------------- # Scheduler class # --------------------------------------------------------------------------- class Scheduler: """Wraps APScheduler AsyncIOScheduler for Echo Core cron jobs.""" def __init__( self, send_callback: Callable[[str, str], Awaitable[None]] | None = None, config=None, ) -> None: self._send_callback = send_callback self._config = config self._scheduler = AsyncIOScheduler(timezone=_SCHEDULER_TZ) self._jobs: list[dict] = [] # ------------------------------------------------------------------ # Public methods # ------------------------------------------------------------------ async def start(self) -> None: """Load jobs from jobs.json, schedule enabled ones, start scheduler.""" self._jobs = self._load_jobs() for job in self._jobs: if job.get("enabled", False): self._schedule_job(job) self._scheduler.start() logger.info("Scheduler started with %d jobs (%d enabled)", len(self._jobs), sum(1 for j in self._jobs if j.get("enabled"))) async def stop(self) -> None: """Shut down APScheduler gracefully.""" self._scheduler.shutdown(wait=False) logger.info("Scheduler stopped") def add_job( self, name: str, cron: str, channel: str, prompt: str, model: str = "sonnet", allowed_tools: list[str] | None = None, ) -> dict: """Validate, add job to list, save, and schedule. Returns new job dict.""" # Validate name if not _NAME_RE.match(name): raise ValueError( f"Invalid job name '{name}'. Must match: lowercase alphanumeric " "and hyphens, 1-63 chars, starting with alphanumeric." ) # Duplicate check if any(j["name"] == name for j in self._jobs): raise ValueError(f"Job '{name}' already exists") # Validate cron expression try: CronTrigger.from_crontab(cron) except (ValueError, KeyError) as exc: raise ValueError(f"Invalid cron expression '{cron}': {exc}") # Validate model if model not in VALID_MODELS: raise ValueError( f"Invalid model '{model}'. Must be one of: {', '.join(sorted(VALID_MODELS))}" ) # Validate prompt if not prompt or not prompt.strip(): raise ValueError("Prompt must be non-empty") if len(prompt) > _MAX_PROMPT_LEN: raise ValueError(f"Prompt too long ({len(prompt)} chars, max {_MAX_PROMPT_LEN})") job = { "name": name, "cron": cron, "channel": channel, "model": model, "prompt": prompt, "allowed_tools": allowed_tools or [], "enabled": True, "last_run": None, "last_status": None, "next_run": None, } self._jobs.append(job) self._schedule_job(job) self._update_next_run(job) self._save_jobs() logger.info("Added job '%s' (cron: %s, channel: %s)", name, cron, channel) return job def add_shell_job( self, name: str, cron: str, channel: str, command: list[str], report_on: str = "changes", timeout: int | None = None, ) -> dict: """Validate, add a shell job to list, save, and schedule. Shell jobs execute an arbitrary command via subprocess. Reporting policy: - report_on="always": always forward stdout (truncated) on exit 0 - report_on="never": never forward stdout on exit 0 - report_on="changes": parse GSTACK-CRON marker ('changes=N') and forward stdout only if N>0 Non-zero exit status ALWAYS reports stderr regardless of report_on. """ # Validate name if not _NAME_RE.match(name): raise ValueError( f"Invalid job name '{name}'. Must match: lowercase alphanumeric " "and hyphens, 1-63 chars, starting with alphanumeric." ) # Duplicate check (across claude+shell) if any(j["name"] == name for j in self._jobs): raise ValueError(f"Job '{name}' already exists") # Validate cron expression try: CronTrigger.from_crontab(cron, timezone=_SCHEDULER_TZ) except (ValueError, KeyError) as exc: raise ValueError(f"Invalid cron expression '{cron}': {exc}") # Validate channel if not isinstance(channel, str) or not channel.strip(): raise ValueError("Channel must be a non-empty string") # Validate command if ( not isinstance(command, list) or not command or not all(isinstance(c, str) and c.strip() for c in command) ): raise ValueError( "Command must be a non-empty list of non-empty strings" ) # Validate report_on if report_on not in _VALID_REPORT_ON: raise ValueError( f"Invalid report_on '{report_on}'. Must be one of: " f"{', '.join(sorted(_VALID_REPORT_ON))}" ) # Validate timeout if timeout is not None: if not isinstance(timeout, int) or isinstance(timeout, bool): raise ValueError("Timeout must be an int (seconds)") if timeout < 1 or timeout > 3600: raise ValueError("Timeout must be between 1 and 3600 seconds") job = { "name": name, "kind": "shell", "cron": cron, "channel": channel, "command": list(command), "report_on": report_on, "timeout": timeout, "enabled": True, "last_run": None, "last_status": None, "next_run": None, } self._jobs.append(job) self._schedule_job(job) self._update_next_run(job) self._save_jobs() logger.info( "Added shell job '%s' (cron: %s, channel: %s, report_on: %s)", name, cron, channel, report_on, ) return job def remove_job(self, name: str) -> bool: """Remove job from list and APScheduler. Returns True if found.""" for i, job in enumerate(self._jobs): if job["name"] == name: self._jobs.pop(i) try: self._scheduler.remove_job(name) except Exception: pass self._save_jobs() logger.info("Removed job '%s'", name) return True return False def enable_job(self, name: str) -> bool: """Enable job and schedule in APScheduler. Returns True if found.""" for job in self._jobs: if job["name"] == name: job["enabled"] = True self._schedule_job(job) self._update_next_run(job) self._save_jobs() logger.info("Enabled job '%s'", name) return True return False def disable_job(self, name: str) -> bool: """Disable job and remove from APScheduler. Returns True if found.""" for job in self._jobs: if job["name"] == name: job["enabled"] = False job["next_run"] = None try: self._scheduler.remove_job(name) except Exception: pass self._save_jobs() logger.info("Disabled job '%s'", name) return True return False async def run_job(self, name: str) -> str: """Force-execute a job immediately. Returns Claude response text.""" job = self._find_job(name) if job is None: raise KeyError(f"Job '{name}' not found") return await self._execute_job(job) def list_jobs(self) -> list[dict]: """Return a copy of all jobs with current state.""" return [dict(j) for j in self._jobs] # ------------------------------------------------------------------ # Internal methods # ------------------------------------------------------------------ def _find_job(self, name: str) -> dict | None: """Find a job by name.""" for job in self._jobs: if job["name"] == name: return job return None def _load_jobs(self) -> list[dict]: """Read and parse jobs.json. Returns [] if missing or corrupt.""" try: text = JOBS_FILE.read_text(encoding="utf-8") if not text.strip(): return [] data = json.loads(text) if not isinstance(data, list): logger.error("jobs.json is not a list, treating as empty") return [] return data except FileNotFoundError: return [] except json.JSONDecodeError as exc: logger.error("jobs.json corrupt (%s), treating as empty", exc) return [] def _save_jobs(self) -> None: """Atomically write current jobs list to jobs.json.""" JOBS_DIR.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp( dir=JOBS_DIR, prefix=".jobs_", suffix=".json" ) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(self._jobs, f, indent=2, ensure_ascii=False) f.write("\n") os.replace(tmp_path, JOBS_FILE) except BaseException: try: os.unlink(tmp_path) except OSError: pass raise def _schedule_job(self, job: dict) -> None: """Add a single job to APScheduler.""" # Remove existing schedule if any try: self._scheduler.remove_job(job["name"]) except Exception: pass trigger = CronTrigger.from_crontab(job["cron"]) self._scheduler.add_job( self._job_callback, trigger=trigger, id=job["name"], args=[job["name"]], max_instances=1, ) async def _job_callback(self, job_name: str) -> None: """APScheduler callback — finds job and executes.""" job = self._find_job(job_name) if job is None: logger.error("Scheduled callback for unknown job '%s'", job_name) return await self._execute_job(job) async def _execute_job(self, job: dict) -> str: """Execute a job: dispatch by kind, update state, forward output.""" name = job["name"] job["last_run"] = datetime.now(timezone.utc).isoformat() kind = job.get("kind", "claude") if kind == "shell": result_text = await self._execute_shell_job(job) else: result_text = await self._execute_claude_job(job) # Update next_run from APScheduler self._update_next_run(job) # Save state self._save_jobs() # Send output via callback if we have something to send if result_text and "HEARTBEAT_OK" not in result_text and self._send_callback: try: await self._send_callback(job["channel"], result_text) except Exception as exc: logger.error("Job '%s' send_callback failed: %s", name, exc) elif not result_text: logger.debug("Job '%s' produced no output, skipping send", name) return result_text or "" async def _execute_claude_job(self, job: dict) -> str: """Run a Claude CLI job and return the response text (or error marker).""" name = job["name"] # Build CLI command cmd = [ CLAUDE_BIN, "-p", job["prompt"], "--model", job["model"], "--output-format", "json", "--dangerously-skip-permissions", ] try: system_prompt = build_system_prompt() cmd += ["--system-prompt", system_prompt] except FileNotFoundError: pass if job.get("allowed_tools"): cmd += ["--allowedTools"] + job["allowed_tools"] result_text = "" try: proc = await asyncio.to_thread( subprocess.run, cmd, capture_output=True, text=True, timeout=JOB_TIMEOUT, env=_safe_env(), cwd=PROJECT_ROOT, ) if proc.returncode != 0: error_msg = proc.stderr[:500] if proc.stderr else "unknown error" raise RuntimeError( f"Claude CLI error (exit {proc.returncode}): {error_msg}" ) data = json.loads(proc.stdout) result_text = data.get("result", "") job["last_status"] = "ok" logger.info("Job '%s' completed successfully", name) except subprocess.TimeoutExpired: job["last_status"] = "error" result_text = f"[cron:{name}] Error: timed out after {JOB_TIMEOUT}s" logger.error("Job '%s' timed out", name) except (RuntimeError, json.JSONDecodeError) as exc: job["last_status"] = "error" result_text = f"[cron:{name}] Error: {exc}" logger.error("Job '%s' failed: %s", name, exc) except Exception as exc: job["last_status"] = "error" result_text = f"[cron:{name}] Error: {exc}" logger.error("Job '%s' unexpected error: %s", name, exc) return result_text async def _execute_shell_job(self, job: dict) -> str: """Run a shell command job, honour report_on policy, return text to forward. Exit != 0 always reports stderr (trimmed). Exit == 0 obeys report_on: 'always' forwards stdout, 'never' stays silent, 'changes' parses the GSTACK-CRON marker ('changes=N') and forwards stdout only if N>0. Missing/malformed marker is logged and treated as 'no changes'. """ name = job["name"] cmd = list(job["command"]) timeout = job.get("timeout") if not isinstance(timeout, int) or timeout <= 0: timeout = JOB_TIMEOUT report_on = job.get("report_on", "changes") try: proc = await asyncio.to_thread( subprocess.run, cmd, capture_output=True, text=True, timeout=timeout, cwd=PROJECT_ROOT, env=_safe_env(), ) except subprocess.TimeoutExpired: job["last_status"] = "error" logger.error("Shell job '%s' timed out after %ss", name, timeout) return f"[cron:{name}] Error: timed out after {timeout}s" except Exception as exc: job["last_status"] = "error" logger.error("Shell job '%s' failed to launch: %s", name, exc) return f"[cron:{name}] Error: {exc}" if proc.returncode != 0: job["last_status"] = "error" stderr_trim = (proc.stderr or "").strip()[:_MAX_STDERR_REPORT] logger.error( "Shell job '%s' exit %d: %s", name, proc.returncode, stderr_trim, ) return f"[cron:{name}] exit {proc.returncode}: {stderr_trim}" job["last_status"] = "ok" stdout = proc.stdout or "" if report_on == "never": logger.info("Shell job '%s' ok (report_on=never, silent)", name) return "" if report_on == "always": logger.info("Shell job '%s' ok (report_on=always)", name) return stdout[:_MAX_SHELL_OUTPUT] # report_on == "changes" match = _MARKER_RE.search(stdout) if not match: logger.warning( "Shell job '%s' missing GSTACK-CRON marker " "(report_on=changes, staying silent)", name, ) return "" try: n = int(match.group(1)) except ValueError: logger.warning( "Shell job '%s' GSTACK-CRON marker has non-int payload", name, ) return "" if n <= 0: logger.info("Shell job '%s' ok (0 changes, silent)", name) return "" logger.info("Shell job '%s' ok (%d changes, forwarding)", name, n) return stdout[:_MAX_SHELL_OUTPUT] def _update_next_run(self, job: dict) -> None: """Update job's next_run from APScheduler.""" try: aps_job = self._scheduler.get_job(job["name"]) if aps_job and aps_job.next_run_time: job["next_run"] = aps_job.next_run_time.isoformat() else: job["next_run"] = None except Exception: job["next_run"] = None