stage-8: cron scheduler with APScheduler

Scheduler class, cron/jobs.json, Discord /cron commands, CLI cron subcommand, job lifecycle management. 88 new tests (281 total).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MoltBot Service
2026-02-13 16:12:56 +00:00
parent 09d3de003a
commit 24a4d87f8c
8 changed files with 1640 additions and 1 deletions

View File

@@ -52,6 +52,15 @@ def is_registered_channel(channel_id: str) -> bool:
return any(ch.get("id") == channel_id for ch in channels.values())
def _channel_alias_for_id(channel_id: str) -> str | None:
"""Resolve a Discord channel ID to its config alias."""
channels = _get_config().get("channels", {})
for alias, info in channels.items():
if info.get("id") == channel_id:
return alias
return None
# --- Message splitting helper ---
@@ -114,6 +123,14 @@ def create_bot(config: Config) -> discord.Client:
"`/model <choice>` — Change model for this channel's session",
"`/logs [n]` — Show last N log lines (default 10)",
"`/restart` — Restart the bot process (owner only)",
"",
"**Cron Jobs**",
"`/cron list` — List all scheduled jobs",
"`/cron run <name>` — Force-run a job now",
"`/cron add <name> <expr> [model]` — Create a scheduled job (admin)",
"`/cron remove <name>` — Remove a job (admin)",
"`/cron enable <name>` — Enable a job (admin)",
"`/cron disable <name>` — Disable a job (admin)",
]
await interaction.response.send_message(
"\n".join(lines), ephemeral=True
@@ -182,6 +199,207 @@ def create_bot(config: Config) -> discord.Client:
tree.add_command(admin_group)
# --- Cron commands ---
cron_group = app_commands.Group(
name="cron", description="Manage scheduled jobs"
)
@cron_group.command(name="list", description="List all scheduled jobs")
async def cron_list(interaction: discord.Interaction) -> None:
scheduler = getattr(client, "scheduler", None)
if scheduler is None:
await interaction.response.send_message(
"Scheduler not available.", ephemeral=True
)
return
jobs = scheduler.list_jobs()
if not jobs:
await interaction.response.send_message(
"No scheduled jobs.", ephemeral=True
)
return
lines = [
f"{'Name':<24} {'Cron':<14} {'Channel':<10} {'Model':<8} {'On':<5} {'Status':<8} {'Next Run'}"
]
for j in jobs:
enabled = "yes" if j.get("enabled") else "no"
last_status = j.get("last_status") or "\u2014"
next_run = j.get("next_run") or "\u2014"
if next_run != "\u2014" and len(next_run) > 19:
next_run = next_run[:19]
lines.append(
f"{j['name']:<24} {j['cron']:<14} {j['channel']:<10} {j['model']:<8} {enabled:<5} {last_status:<8} {next_run}"
)
table = "```\n" + "\n".join(lines) + "\n```"
await interaction.response.send_message(table, ephemeral=True)
@cron_group.command(name="run", description="Force-run a scheduled job")
@app_commands.describe(name="Job name to run")
async def cron_run(interaction: discord.Interaction, name: str) -> None:
scheduler = getattr(client, "scheduler", None)
if scheduler is None:
await interaction.response.send_message(
"Scheduler not available.", ephemeral=True
)
return
await interaction.response.defer()
try:
result = await scheduler.run_job(name)
truncated = result[:1900] if len(result) > 1900 else result
await interaction.followup.send(truncated)
except KeyError:
await interaction.followup.send(f"Job '{name}' not found.")
except Exception as e:
await interaction.followup.send(f"Error running job: {e}")
@cron_group.command(name="add", description="Create a new scheduled job")
@app_commands.describe(
name="Job name (lowercase, hyphens allowed)",
expression="Cron expression (e.g. '30 6 * * *')",
model="AI model to use (default: sonnet)",
)
@app_commands.choices(model=[
app_commands.Choice(name="opus", value="opus"),
app_commands.Choice(name="sonnet", value="sonnet"),
app_commands.Choice(name="haiku", value="haiku"),
])
async def cron_add(
interaction: discord.Interaction,
name: str,
expression: str,
model: app_commands.Choice[str] | None = None,
) -> None:
if not is_admin(str(interaction.user.id)):
await interaction.response.send_message(
"Admin only.", ephemeral=True
)
return
scheduler = getattr(client, "scheduler", None)
if scheduler is None:
await interaction.response.send_message(
"Scheduler not available.", ephemeral=True
)
return
channel_alias = _channel_alias_for_id(str(interaction.channel_id))
if channel_alias is None:
await interaction.response.send_message(
"This channel is not registered. Use `/channel add` first.",
ephemeral=True,
)
return
model_value = model.value if model else "sonnet"
await interaction.response.send_message(
f"Creating job **{name}** (`{expression}`, model: {model_value}, channel: {channel_alias}).\n"
"Send your prompt text as the next message in this channel.",
)
def check(m: discord.Message) -> bool:
return (
m.author == interaction.user
and m.channel.id == interaction.channel_id
)
try:
prompt_msg = await client.wait_for(
"message", check=check, timeout=120
)
except asyncio.TimeoutError:
await interaction.followup.send("Timed out waiting for prompt.")
return
try:
job = scheduler.add_job(
name=name,
cron=expression,
channel=channel_alias,
prompt=prompt_msg.content,
model=model_value,
)
next_run = job.get("next_run") or "\u2014"
await interaction.channel.send(
f"Job **{name}** created.\n"
f"Cron: `{expression}` | Channel: {channel_alias} | Model: {model_value}\n"
f"Next run: {next_run}"
)
except ValueError as e:
await interaction.channel.send(f"Error creating job: {e}")
@cron_group.command(name="remove", description="Remove a scheduled job")
@app_commands.describe(name="Job name to remove")
async def cron_remove(interaction: discord.Interaction, name: str) -> None:
if not is_admin(str(interaction.user.id)):
await interaction.response.send_message(
"Admin only.", ephemeral=True
)
return
scheduler = getattr(client, "scheduler", None)
if scheduler is None:
await interaction.response.send_message(
"Scheduler not available.", ephemeral=True
)
return
if scheduler.remove_job(name):
await interaction.response.send_message(
f"Job '{name}' removed.", ephemeral=True
)
else:
await interaction.response.send_message(
f"Job '{name}' not found.", ephemeral=True
)
@cron_group.command(name="enable", description="Enable a scheduled job")
@app_commands.describe(name="Job name to enable")
async def cron_enable(
interaction: discord.Interaction, name: str
) -> None:
if not is_admin(str(interaction.user.id)):
await interaction.response.send_message(
"Admin only.", ephemeral=True
)
return
scheduler = getattr(client, "scheduler", None)
if scheduler is None:
await interaction.response.send_message(
"Scheduler not available.", ephemeral=True
)
return
if scheduler.enable_job(name):
await interaction.response.send_message(
f"Job '{name}' enabled.", ephemeral=True
)
else:
await interaction.response.send_message(
f"Job '{name}' not found.", ephemeral=True
)
@cron_group.command(name="disable", description="Disable a scheduled job")
@app_commands.describe(name="Job name to disable")
async def cron_disable(
interaction: discord.Interaction, name: str
) -> None:
if not is_admin(str(interaction.user.id)):
await interaction.response.send_message(
"Admin only.", ephemeral=True
)
return
scheduler = getattr(client, "scheduler", None)
if scheduler is None:
await interaction.response.send_message(
"Scheduler not available.", ephemeral=True
)
return
if scheduler.disable_job(name):
await interaction.response.send_message(
f"Job '{name}' disabled.", ephemeral=True
)
else:
await interaction.response.send_message(
f"Job '{name}' not found.", ephemeral=True
)
tree.add_command(cron_group)
@tree.command(name="channels", description="List registered channels")
async def channels(interaction: discord.Interaction) -> None:
ch_map = config.get("channels", {})
@@ -340,6 +558,9 @@ def create_bot(config: Config) -> discord.Client:
@client.event
async def on_ready() -> None:
await tree.sync()
scheduler = getattr(client, "scheduler", None)
if scheduler is not None:
await scheduler.start()
logger.info("Echo Core online as %s", client.user)
async def _handle_chat(message: discord.Message) -> None:

View File

@@ -9,7 +9,8 @@ from pathlib import Path
from src.config import load_config
from src.secrets import get_secret
from src.adapters.discord_bot import create_bot
from src.adapters.discord_bot import create_bot, split_message
from src.scheduler import Scheduler
PROJECT_ROOT = Path(__file__).resolve().parent.parent
PID_FILE = PROJECT_ROOT / "echo-core.pid"
@@ -43,6 +44,26 @@ def main():
config = load_config()
client = create_bot(config)
# Scheduler setup
async def _send_to_channel(channel_alias: str, text: str) -> None:
"""Callback: resolve alias and send text to Discord channel."""
channels = config.get("channels", {})
ch_info = channels.get(channel_alias)
if not ch_info:
logger.warning("Cron: unknown channel alias '%s'", channel_alias)
return
channel_id = ch_info.get("id")
channel = client.get_channel(int(channel_id))
if channel is None:
logger.warning("Cron: channel %s not found in Discord cache", channel_id)
return
chunks = split_message(text)
for chunk in chunks:
await channel.send(chunk)
scheduler = Scheduler(send_callback=_send_to_channel, config=config)
client.scheduler = scheduler # type: ignore[attr-defined]
# PID file
PID_FILE.write_text(str(os.getpid()))
@@ -51,6 +72,7 @@ def main():
def handle_signal(sig, frame):
logger.info("Received signal %s, shutting down...", sig)
loop.create_task(scheduler.stop())
loop.create_task(client.close())
signal.signal(signal.SIGTERM, handle_signal)
@@ -59,6 +81,7 @@ def main():
try:
loop.run_until_complete(client.start(token))
except KeyboardInterrupt:
loop.run_until_complete(scheduler.stop())
loop.run_until_complete(client.close())
finally:
PID_FILE.unlink(missing_ok=True)

349
src/scheduler.py Normal file
View File

@@ -0,0 +1,349 @@
"""
Cron-like job scheduler for Echo-Core.
Wraps APScheduler AsyncIOScheduler to run Claude CLI prompts on a schedule,
sending output to designated Discord channels.
"""
import asyncio
import json
import logging
import os
import re
import subprocess
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Awaitable, Callable
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from src.claude_session import (
CLAUDE_BIN,
PROJECT_ROOT,
VALID_MODELS,
_safe_env,
build_system_prompt,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
JOBS_DIR = PROJECT_ROOT / "cron"
JOBS_FILE = JOBS_DIR / "jobs.json"
JOB_TIMEOUT = 300 # 5-minute default per job execution
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
_MAX_PROMPT_LEN = 10_000
# ---------------------------------------------------------------------------
# Scheduler class
# ---------------------------------------------------------------------------
class Scheduler:
"""Wraps APScheduler AsyncIOScheduler for Echo Core cron jobs."""
def __init__(
self,
send_callback: Callable[[str, str], Awaitable[None]] | None = None,
config=None,
) -> None:
self._send_callback = send_callback
self._config = config
self._scheduler = AsyncIOScheduler()
self._jobs: list[dict] = []
# ------------------------------------------------------------------
# Public methods
# ------------------------------------------------------------------
async def start(self) -> None:
"""Load jobs from jobs.json, schedule enabled ones, start scheduler."""
self._jobs = self._load_jobs()
for job in self._jobs:
if job.get("enabled", False):
self._schedule_job(job)
self._scheduler.start()
logger.info("Scheduler started with %d jobs (%d enabled)",
len(self._jobs),
sum(1 for j in self._jobs if j.get("enabled")))
async def stop(self) -> None:
"""Shut down APScheduler gracefully."""
self._scheduler.shutdown(wait=False)
logger.info("Scheduler stopped")
def add_job(
self,
name: str,
cron: str,
channel: str,
prompt: str,
model: str = "sonnet",
allowed_tools: list[str] | None = None,
) -> dict:
"""Validate, add job to list, save, and schedule. Returns new job dict."""
# Validate name
if not _NAME_RE.match(name):
raise ValueError(
f"Invalid job name '{name}'. Must match: lowercase alphanumeric "
"and hyphens, 1-63 chars, starting with alphanumeric."
)
# Duplicate check
if any(j["name"] == name for j in self._jobs):
raise ValueError(f"Job '{name}' already exists")
# Validate cron expression
try:
CronTrigger.from_crontab(cron)
except (ValueError, KeyError) as exc:
raise ValueError(f"Invalid cron expression '{cron}': {exc}")
# Validate model
if model not in VALID_MODELS:
raise ValueError(
f"Invalid model '{model}'. Must be one of: {', '.join(sorted(VALID_MODELS))}"
)
# Validate prompt
if not prompt or not prompt.strip():
raise ValueError("Prompt must be non-empty")
if len(prompt) > _MAX_PROMPT_LEN:
raise ValueError(f"Prompt too long ({len(prompt)} chars, max {_MAX_PROMPT_LEN})")
job = {
"name": name,
"cron": cron,
"channel": channel,
"model": model,
"prompt": prompt,
"allowed_tools": allowed_tools or [],
"enabled": True,
"last_run": None,
"last_status": None,
"next_run": None,
}
self._jobs.append(job)
self._schedule_job(job)
self._update_next_run(job)
self._save_jobs()
logger.info("Added job '%s' (cron: %s, channel: %s)", name, cron, channel)
return job
def remove_job(self, name: str) -> bool:
"""Remove job from list and APScheduler. Returns True if found."""
for i, job in enumerate(self._jobs):
if job["name"] == name:
self._jobs.pop(i)
try:
self._scheduler.remove_job(name)
except Exception:
pass
self._save_jobs()
logger.info("Removed job '%s'", name)
return True
return False
def enable_job(self, name: str) -> bool:
"""Enable job and schedule in APScheduler. Returns True if found."""
for job in self._jobs:
if job["name"] == name:
job["enabled"] = True
self._schedule_job(job)
self._update_next_run(job)
self._save_jobs()
logger.info("Enabled job '%s'", name)
return True
return False
def disable_job(self, name: str) -> bool:
"""Disable job and remove from APScheduler. Returns True if found."""
for job in self._jobs:
if job["name"] == name:
job["enabled"] = False
job["next_run"] = None
try:
self._scheduler.remove_job(name)
except Exception:
pass
self._save_jobs()
logger.info("Disabled job '%s'", name)
return True
return False
async def run_job(self, name: str) -> str:
"""Force-execute a job immediately. Returns Claude response text."""
job = self._find_job(name)
if job is None:
raise KeyError(f"Job '{name}' not found")
return await self._execute_job(job)
def list_jobs(self) -> list[dict]:
"""Return a copy of all jobs with current state."""
return [dict(j) for j in self._jobs]
# ------------------------------------------------------------------
# Internal methods
# ------------------------------------------------------------------
def _find_job(self, name: str) -> dict | None:
"""Find a job by name."""
for job in self._jobs:
if job["name"] == name:
return job
return None
def _load_jobs(self) -> list[dict]:
"""Read and parse jobs.json. Returns [] if missing or corrupt."""
try:
text = JOBS_FILE.read_text(encoding="utf-8")
if not text.strip():
return []
data = json.loads(text)
if not isinstance(data, list):
logger.error("jobs.json is not a list, treating as empty")
return []
return data
except FileNotFoundError:
return []
except json.JSONDecodeError as exc:
logger.error("jobs.json corrupt (%s), treating as empty", exc)
return []
def _save_jobs(self) -> None:
"""Atomically write current jobs list to jobs.json."""
JOBS_DIR.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(
dir=JOBS_DIR, prefix=".jobs_", suffix=".json"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(self._jobs, f, indent=2, ensure_ascii=False)
f.write("\n")
os.replace(tmp_path, JOBS_FILE)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
def _schedule_job(self, job: dict) -> None:
"""Add a single job to APScheduler."""
# Remove existing schedule if any
try:
self._scheduler.remove_job(job["name"])
except Exception:
pass
trigger = CronTrigger.from_crontab(job["cron"])
self._scheduler.add_job(
self._job_callback,
trigger=trigger,
id=job["name"],
args=[job["name"]],
max_instances=1,
)
async def _job_callback(self, job_name: str) -> None:
"""APScheduler callback — finds job and executes."""
job = self._find_job(job_name)
if job is None:
logger.error("Scheduled callback for unknown job '%s'", job_name)
return
await self._execute_job(job)
async def _execute_job(self, job: dict) -> str:
"""Execute a job: run Claude CLI, update state, send output."""
name = job["name"]
job["last_run"] = datetime.now(timezone.utc).isoformat()
# Build CLI command
cmd = [
CLAUDE_BIN, "-p", job["prompt"],
"--model", job["model"],
"--output-format", "json",
]
try:
system_prompt = build_system_prompt()
cmd += ["--system-prompt", system_prompt]
except FileNotFoundError:
pass
if job.get("allowed_tools"):
cmd += ["--allowedTools"] + job["allowed_tools"]
# Run in thread to not block event loop
result_text = ""
try:
proc = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
timeout=JOB_TIMEOUT,
env=_safe_env(),
cwd=PROJECT_ROOT,
)
if proc.returncode != 0:
error_msg = proc.stderr[:500] if proc.stderr else "unknown error"
raise RuntimeError(
f"Claude CLI error (exit {proc.returncode}): {error_msg}"
)
data = json.loads(proc.stdout)
result_text = data.get("result", "")
job["last_status"] = "ok"
logger.info("Job '%s' completed successfully", name)
except subprocess.TimeoutExpired:
job["last_status"] = "error"
result_text = f"[cron:{name}] Error: timed out after {JOB_TIMEOUT}s"
logger.error("Job '%s' timed out", name)
except (RuntimeError, json.JSONDecodeError) as exc:
job["last_status"] = "error"
result_text = f"[cron:{name}] Error: {exc}"
logger.error("Job '%s' failed: %s", name, exc)
except Exception as exc:
job["last_status"] = "error"
result_text = f"[cron:{name}] Error: {exc}"
logger.error("Job '%s' unexpected error: %s", name, exc)
# Update next_run from APScheduler
self._update_next_run(job)
# Save state
self._save_jobs()
# Send output via callback
if self._send_callback and result_text:
try:
await self._send_callback(job["channel"], result_text)
except Exception as exc:
logger.error("Job '%s' send_callback failed: %s", name, exc)
return result_text
def _update_next_run(self, job: dict) -> None:
"""Update job's next_run from APScheduler."""
try:
aps_job = self._scheduler.get_job(job["name"])
if aps_job and aps_job.next_run_time:
job["next_run"] = aps_job.next_run_time.isoformat()
else:
job["next_run"] = None
except Exception:
job["next_run"] = None