feat(scheduler): add kind:"shell" jobs with Bucharest tz and GSTACK-CRON marker

- AsyncIOScheduler now runs in Europe/Bucharest so cron strings in jobs.json
  match local wall-clock time.
- New add_shell_job() validates name, cron, command list, channel, report_on
  (always|changes|never), and optional timeout (1..3600s). Existing add_job()
  stays untouched for the Claude path.
- _execute_job dispatches on job['kind'] (default 'claude'); legacy jobs
  without the field still route to the Claude executor. Refactored the
  Claude path into _execute_claude_job; new _execute_shell_job runs
  subprocess with _safe_env + PROJECT_ROOT cwd.
- Shell semantics: non-zero exit always forwards stderr (trimmed to 500 chars)
  as '[cron:NAME] exit CODE: STDERR' regardless of report_on. On exit 0,
  'always' forwards stdout (trimmed to 1500 chars), 'never' stays silent, and
  'changes' parses the GSTACK-CRON marker (^GSTACK-CRON: changes=\d+$) and
  forwards stdout only when N>0; a missing or malformed marker logs a warning
  and stays silent.
- Timeout honoured per-job (falls back to JOB_TIMEOUT=300s).
This commit is contained in:
2026-04-21 07:05:19 +00:00
parent ca9167d129
commit e747491b85

View File

@@ -15,6 +15,7 @@ import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Awaitable, Callable
from zoneinfo import ZoneInfo
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -39,6 +40,12 @@ JOB_TIMEOUT = 300 # 5-minute default per job execution
# Job names: 1-63 chars, lowercase alphanumerics and hyphens, starting with
# an alphanumeric (mirrors the error message raised by the add_* methods).
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
# Upper bound on a Claude job's prompt length, in characters.
_MAX_PROMPT_LEN = 10_000
_MAX_SHELL_OUTPUT = 1500 # chars of stdout to forward to channel
_MAX_STDERR_REPORT = 500 # chars of stderr on non-zero exit
# Allowed values for a shell job's report_on policy.
_VALID_REPORT_ON = {"always", "changes", "never"}
# Supported job kinds; jobs missing the 'kind' field are treated as "claude".
_VALID_KINDS = {"claude", "shell"}
# Scheduler timezone: cron expressions match Bucharest wall-clock time.
_SCHEDULER_TZ = ZoneInfo("Europe/Bucharest")
# Marker a report_on="changes" shell job prints on its own line,
# e.g. "GSTACK-CRON: changes=3"; group 1 captures the change count.
_MARKER_RE = re.compile(r"^GSTACK-CRON:\s+changes=(\d+)\s*$", re.MULTILINE)
# ---------------------------------------------------------------------------
# Scheduler class
@@ -55,7 +62,7 @@ class Scheduler:
) -> None:
self._send_callback = send_callback
self._config = config
self._scheduler = AsyncIOScheduler()
self._scheduler = AsyncIOScheduler(timezone=_SCHEDULER_TZ)
self._jobs: list[dict] = []
# ------------------------------------------------------------------
@@ -138,6 +145,94 @@ class Scheduler:
logger.info("Added job '%s' (cron: %s, channel: %s)", name, cron, channel)
return job
def add_shell_job(
self,
name: str,
cron: str,
channel: str,
command: list[str],
report_on: str = "changes",
timeout: int | None = None,
) -> dict:
"""Validate, add a shell job to list, save, and schedule.
Shell jobs execute an arbitrary command via subprocess. Reporting policy:
- report_on="always": always forward stdout (truncated) on exit 0
- report_on="never": never forward stdout on exit 0
- report_on="changes": parse GSTACK-CRON marker ('changes=N') and
forward stdout only if N>0
Non-zero exit status ALWAYS reports stderr regardless of report_on.
"""
# Validate name
if not _NAME_RE.match(name):
raise ValueError(
f"Invalid job name '{name}'. Must match: lowercase alphanumeric "
"and hyphens, 1-63 chars, starting with alphanumeric."
)
# Duplicate check (across claude+shell)
if any(j["name"] == name for j in self._jobs):
raise ValueError(f"Job '{name}' already exists")
# Validate cron expression
try:
CronTrigger.from_crontab(cron, timezone=_SCHEDULER_TZ)
except (ValueError, KeyError) as exc:
raise ValueError(f"Invalid cron expression '{cron}': {exc}")
# Validate channel
if not isinstance(channel, str) or not channel.strip():
raise ValueError("Channel must be a non-empty string")
# Validate command
if (
not isinstance(command, list)
or not command
or not all(isinstance(c, str) and c.strip() for c in command)
):
raise ValueError(
"Command must be a non-empty list of non-empty strings"
)
# Validate report_on
if report_on not in _VALID_REPORT_ON:
raise ValueError(
f"Invalid report_on '{report_on}'. Must be one of: "
f"{', '.join(sorted(_VALID_REPORT_ON))}"
)
# Validate timeout
if timeout is not None:
if not isinstance(timeout, int) or isinstance(timeout, bool):
raise ValueError("Timeout must be an int (seconds)")
if timeout < 1 or timeout > 3600:
raise ValueError("Timeout must be between 1 and 3600 seconds")
job = {
"name": name,
"kind": "shell",
"cron": cron,
"channel": channel,
"command": list(command),
"report_on": report_on,
"timeout": timeout,
"enabled": True,
"last_run": None,
"last_status": None,
"next_run": None,
}
self._jobs.append(job)
self._schedule_job(job)
self._update_next_run(job)
self._save_jobs()
logger.info(
"Added shell job '%s' (cron: %s, channel: %s, report_on: %s)",
name, cron, channel, report_on,
)
return job
def remove_job(self, name: str) -> bool:
"""Remove job from list and APScheduler. Returns True if found."""
for i, job in enumerate(self._jobs):
@@ -263,10 +358,36 @@ class Scheduler:
await self._execute_job(job)
async def _execute_job(self, job: dict) -> str:
"""Execute a job: run Claude CLI, update state, send output."""
"""Execute a job: dispatch by kind, update state, forward output."""
name = job["name"]
job["last_run"] = datetime.now(timezone.utc).isoformat()
kind = job.get("kind", "claude")
if kind == "shell":
result_text = await self._execute_shell_job(job)
else:
result_text = await self._execute_claude_job(job)
# Update next_run from APScheduler
self._update_next_run(job)
# Save state
self._save_jobs()
# Send output via callback if we have something to send
if result_text and self._send_callback:
try:
await self._send_callback(job["channel"], result_text)
except Exception as exc:
logger.error("Job '%s' send_callback failed: %s", name, exc)
elif not result_text:
logger.debug("Job '%s' produced no output, skipping send", name)
return result_text or ""
async def _execute_claude_job(self, job: dict) -> str:
"""Run a Claude CLI job and return the response text (or error marker)."""
name = job["name"]
# Build CLI command
cmd = [
CLAUDE_BIN, "-p", job["prompt"],
@@ -283,7 +404,6 @@ class Scheduler:
if job.get("allowed_tools"):
cmd += ["--allowedTools"] + job["allowed_tools"]
# Run in thread to not block event loop
result_text = ""
try:
proc = await asyncio.to_thread(
@@ -322,23 +442,85 @@ class Scheduler:
result_text = f"[cron:{name}] Error: {exc}"
logger.error("Job '%s' unexpected error: %s", name, exc)
# Update next_run from APScheduler
self._update_next_run(job)
# Save state
self._save_jobs()
# Send output via callback
if not result_text:
logger.warning("Job '%s' produced empty result, skipping send", name)
elif self._send_callback:
try:
await self._send_callback(job["channel"], result_text)
except Exception as exc:
logger.error("Job '%s' send_callback failed: %s", name, exc)
return result_text
async def _execute_shell_job(self, job: dict) -> str:
"""Run a shell command job, honour report_on policy, return text to forward.
Exit != 0 always reports stderr (trimmed). Exit == 0 obeys report_on:
'always' forwards stdout, 'never' stays silent, 'changes' parses the
GSTACK-CRON marker ('changes=N') and forwards stdout only if N>0.
Missing/malformed marker is logged and treated as 'no changes'.
"""
name = job["name"]
cmd = list(job["command"])
timeout = job.get("timeout")
if not isinstance(timeout, int) or timeout <= 0:
timeout = JOB_TIMEOUT
report_on = job.get("report_on", "changes")
try:
proc = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
timeout=timeout,
cwd=PROJECT_ROOT,
env=_safe_env(),
)
except subprocess.TimeoutExpired:
job["last_status"] = "error"
logger.error("Shell job '%s' timed out after %ss", name, timeout)
return f"[cron:{name}] Error: timed out after {timeout}s"
except Exception as exc:
job["last_status"] = "error"
logger.error("Shell job '%s' failed to launch: %s", name, exc)
return f"[cron:{name}] Error: {exc}"
if proc.returncode != 0:
job["last_status"] = "error"
stderr_trim = (proc.stderr or "").strip()[:_MAX_STDERR_REPORT]
logger.error(
"Shell job '%s' exit %d: %s", name, proc.returncode, stderr_trim,
)
return f"[cron:{name}] exit {proc.returncode}: {stderr_trim}"
job["last_status"] = "ok"
stdout = proc.stdout or ""
if report_on == "never":
logger.info("Shell job '%s' ok (report_on=never, silent)", name)
return ""
if report_on == "always":
logger.info("Shell job '%s' ok (report_on=always)", name)
return stdout[:_MAX_SHELL_OUTPUT]
# report_on == "changes"
match = _MARKER_RE.search(stdout)
if not match:
logger.warning(
"Shell job '%s' missing GSTACK-CRON marker "
"(report_on=changes, staying silent)",
name,
)
return ""
try:
n = int(match.group(1))
except ValueError:
logger.warning(
"Shell job '%s' GSTACK-CRON marker has non-int payload", name,
)
return ""
if n <= 0:
logger.info("Shell job '%s' ok (0 changes, silent)", name)
return ""
logger.info("Shell job '%s' ok (%d changes, forwarding)", name, n)
return stdout[:_MAX_SHELL_OUTPUT]
def _update_next_run(self, job: dict) -> None:
"""Update job's next_run from APScheduler."""
try: