From e747491b858de711af507fe4416ce5b869b3e818 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Tue, 21 Apr 2026 07:05:19 +0000 Subject: [PATCH] feat(scheduler): add kind:"shell" jobs with Bucharest tz and GSTACK-CRON marker - AsyncIOScheduler now runs in Europe/Bucharest so cron strings in jobs.json match local wall-clock time. - New add_shell_job() validates name, cron, command list, channel, report_on (always|changes|never), and optional timeout (1..3600s). Existing add_job() stays untouched for the Claude path. - _execute_job dispatches on job['kind'] (default 'claude'); legacy jobs without the field still route to the Claude executor. Refactored the Claude path into _execute_claude_job; new _execute_shell_job runs subprocess with _safe_env + PROJECT_ROOT cwd. - Shell semantics: non-zero exit always forwards stderr (trimmed to 500 chars) as '[cron:NAME] exit CODE: STDERR' regardless of report_on. On exit 0, 'always' forwards stdout (trimmed to 1500 chars), 'never' stays silent, and 'changes' searches stdout for the GSTACK-CRON marker (regex ^GSTACK-CRON:\s+changes=(\d+)\s*$ with re.MULTILINE) and forwards stdout only when N>0; missing/malformed marker logs a warning and stays silent. - Timeout honoured per-job (falls back to JOB_TIMEOUT=300s). 
--- src/scheduler.py | 218 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 200 insertions(+), 18 deletions(-) diff --git a/src/scheduler.py b/src/scheduler.py index 9c03bbd..a4b1c09 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -15,6 +15,7 @@ import tempfile from datetime import datetime, timezone from pathlib import Path from typing import Awaitable, Callable +from zoneinfo import ZoneInfo from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger @@ -39,6 +40,12 @@ JOB_TIMEOUT = 300 # 5-minute default per job execution _NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") _MAX_PROMPT_LEN = 10_000 +_MAX_SHELL_OUTPUT = 1500 # chars of stdout to forward to channel +_MAX_STDERR_REPORT = 500 # chars of stderr on non-zero exit +_VALID_REPORT_ON = {"always", "changes", "never"} +_VALID_KINDS = {"claude", "shell"} +_SCHEDULER_TZ = ZoneInfo("Europe/Bucharest") +_MARKER_RE = re.compile(r"^GSTACK-CRON:\s+changes=(\d+)\s*$", re.MULTILINE) # --------------------------------------------------------------------------- # Scheduler class @@ -55,7 +62,7 @@ class Scheduler: ) -> None: self._send_callback = send_callback self._config = config - self._scheduler = AsyncIOScheduler() + self._scheduler = AsyncIOScheduler(timezone=_SCHEDULER_TZ) self._jobs: list[dict] = [] # ------------------------------------------------------------------ @@ -138,6 +145,94 @@ class Scheduler: logger.info("Added job '%s' (cron: %s, channel: %s)", name, cron, channel) return job + def add_shell_job( + self, + name: str, + cron: str, + channel: str, + command: list[str], + report_on: str = "changes", + timeout: int | None = None, + ) -> dict: + """Validate, add a shell job to list, save, and schedule. + + Shell jobs execute an arbitrary command via subprocess. 
Reporting policy: + - report_on="always": always forward stdout (truncated) on exit 0 + - report_on="never": never forward stdout on exit 0 + - report_on="changes": parse GSTACK-CRON marker ('changes=N') and + forward stdout only if N>0 + Non-zero exit status ALWAYS reports stderr regardless of report_on. + """ + # Validate name + if not _NAME_RE.match(name): + raise ValueError( + f"Invalid job name '{name}'. Must match: lowercase alphanumeric " + "and hyphens, 1-63 chars, starting with alphanumeric." + ) + + # Duplicate check (across claude+shell) + if any(j["name"] == name for j in self._jobs): + raise ValueError(f"Job '{name}' already exists") + + # Validate cron expression + try: + CronTrigger.from_crontab(cron, timezone=_SCHEDULER_TZ) + except (ValueError, KeyError) as exc: + raise ValueError(f"Invalid cron expression '{cron}': {exc}") + + # Validate channel + if not isinstance(channel, str) or not channel.strip(): + raise ValueError("Channel must be a non-empty string") + + # Validate command + if ( + not isinstance(command, list) + or not command + or not all(isinstance(c, str) and c.strip() for c in command) + ): + raise ValueError( + "Command must be a non-empty list of non-empty strings" + ) + + # Validate report_on + if report_on not in _VALID_REPORT_ON: + raise ValueError( + f"Invalid report_on '{report_on}'. 
Must be one of: " + f"{', '.join(sorted(_VALID_REPORT_ON))}" + ) + + # Validate timeout + if timeout is not None: + if not isinstance(timeout, int) or isinstance(timeout, bool): + raise ValueError("Timeout must be an int (seconds)") + if timeout < 1 or timeout > 3600: + raise ValueError("Timeout must be between 1 and 3600 seconds") + + job = { + "name": name, + "kind": "shell", + "cron": cron, + "channel": channel, + "command": list(command), + "report_on": report_on, + "timeout": timeout, + "enabled": True, + "last_run": None, + "last_status": None, + "next_run": None, + } + + self._jobs.append(job) + self._schedule_job(job) + self._update_next_run(job) + self._save_jobs() + + logger.info( + "Added shell job '%s' (cron: %s, channel: %s, report_on: %s)", + name, cron, channel, report_on, + ) + return job + def remove_job(self, name: str) -> bool: """Remove job from list and APScheduler. Returns True if found.""" for i, job in enumerate(self._jobs): @@ -263,10 +358,36 @@ class Scheduler: await self._execute_job(job) async def _execute_job(self, job: dict) -> str: - """Execute a job: run Claude CLI, update state, send output.""" + """Execute a job: dispatch by kind, update state, forward output.""" name = job["name"] job["last_run"] = datetime.now(timezone.utc).isoformat() + kind = job.get("kind", "claude") + if kind == "shell": + result_text = await self._execute_shell_job(job) + else: + result_text = await self._execute_claude_job(job) + + # Update next_run from APScheduler + self._update_next_run(job) + # Save state + self._save_jobs() + + # Send output via callback if we have something to send + if result_text and self._send_callback: + try: + await self._send_callback(job["channel"], result_text) + except Exception as exc: + logger.error("Job '%s' send_callback failed: %s", name, exc) + elif not result_text: + logger.debug("Job '%s' produced no output, skipping send", name) + + return result_text or "" + + async def _execute_claude_job(self, job: dict) -> str: + 
"""Run a Claude CLI job and return the response text (or error marker).""" + name = job["name"] + # Build CLI command cmd = [ CLAUDE_BIN, "-p", job["prompt"], @@ -283,7 +404,6 @@ class Scheduler: if job.get("allowed_tools"): cmd += ["--allowedTools"] + job["allowed_tools"] - # Run in thread to not block event loop result_text = "" try: proc = await asyncio.to_thread( @@ -322,23 +442,85 @@ class Scheduler: result_text = f"[cron:{name}] Error: {exc}" logger.error("Job '%s' unexpected error: %s", name, exc) - # Update next_run from APScheduler - self._update_next_run(job) - - # Save state - self._save_jobs() - - # Send output via callback - if not result_text: - logger.warning("Job '%s' produced empty result, skipping send", name) - elif self._send_callback: - try: - await self._send_callback(job["channel"], result_text) - except Exception as exc: - logger.error("Job '%s' send_callback failed: %s", name, exc) - return result_text + async def _execute_shell_job(self, job: dict) -> str: + """Run a shell command job, honour report_on policy, return text to forward. + + Exit != 0 always reports stderr (trimmed). Exit == 0 obeys report_on: + 'always' forwards stdout, 'never' stays silent, 'changes' parses the + GSTACK-CRON marker ('changes=N') and forwards stdout only if N>0. + Missing/malformed marker is logged and treated as 'no changes'. 
+ """ + name = job["name"] + cmd = list(job["command"]) + timeout = job.get("timeout") + if not isinstance(timeout, int) or timeout <= 0: + timeout = JOB_TIMEOUT + report_on = job.get("report_on", "changes") + + try: + proc = await asyncio.to_thread( + subprocess.run, + cmd, + capture_output=True, + text=True, + timeout=timeout, + cwd=PROJECT_ROOT, + env=_safe_env(), + ) + except subprocess.TimeoutExpired: + job["last_status"] = "error" + logger.error("Shell job '%s' timed out after %ss", name, timeout) + return f"[cron:{name}] Error: timed out after {timeout}s" + except Exception as exc: + job["last_status"] = "error" + logger.error("Shell job '%s' failed to launch: %s", name, exc) + return f"[cron:{name}] Error: {exc}" + + if proc.returncode != 0: + job["last_status"] = "error" + stderr_trim = (proc.stderr or "").strip()[:_MAX_STDERR_REPORT] + logger.error( + "Shell job '%s' exit %d: %s", name, proc.returncode, stderr_trim, + ) + return f"[cron:{name}] exit {proc.returncode}: {stderr_trim}" + + job["last_status"] = "ok" + stdout = proc.stdout or "" + + if report_on == "never": + logger.info("Shell job '%s' ok (report_on=never, silent)", name) + return "" + + if report_on == "always": + logger.info("Shell job '%s' ok (report_on=always)", name) + return stdout[:_MAX_SHELL_OUTPUT] + + # report_on == "changes" + match = _MARKER_RE.search(stdout) + if not match: + logger.warning( + "Shell job '%s' missing GSTACK-CRON marker " + "(report_on=changes, staying silent)", + name, + ) + return "" + try: + n = int(match.group(1)) + except ValueError: + logger.warning( + "Shell job '%s' GSTACK-CRON marker has non-int payload", name, + ) + return "" + + if n <= 0: + logger.info("Shell job '%s' ok (0 changes, silent)", name) + return "" + + logger.info("Shell job '%s' ok (%d changes, forwarding)", name, n) + return stdout[:_MAX_SHELL_OUTPUT] + def _update_next_run(self, job: dict) -> None: """Update job's next_run from APScheduler.""" try: