diff --git a/cli.py b/cli.py index 215b485..8419b36 100755 --- a/cli.py +++ b/cli.py @@ -292,6 +292,119 @@ def cmd_send(args): print(response) +def cmd_cron(args): + """Handle cron subcommand.""" + if args.cron_action == "list": + _cron_list() + elif args.cron_action == "run": + _cron_run(args.name) + elif args.cron_action == "add": + tools = [t.strip() for t in args.tools.split(",")] if args.tools else [] + if args.prompt == "-": + prompt = sys.stdin.read().strip() + else: + prompt = args.prompt + _cron_add(args.name, args.expression, args.channel, prompt, + args.model, tools) + elif args.cron_action == "remove": + _cron_remove(args.name) + elif args.cron_action == "enable": + _cron_enable(args.name) + elif args.cron_action == "disable": + _cron_disable(args.name) + + +def _cron_list(): + """List scheduled jobs in tabular format.""" + from src.scheduler import Scheduler + s = Scheduler() + s._jobs = s._load_jobs() + jobs = s.list_jobs() + if not jobs: + print("No scheduled jobs.") + return + + print(f"{'Name':<24} {'Cron':<16} {'Channel':<10} {'Model':<8} {'Enabled':<8} {'Last Status':<12} {'Next Run'}") + print(f"{'-'*24} {'-'*16} {'-'*10} {'-'*8} {'-'*8} {'-'*12} {'-'*20}") + + for job in jobs: + name = job.get("name", "?") + cron = job.get("cron", "?") + channel = job.get("channel", "?") + model = job.get("model", "?") + enabled = "yes" if job.get("enabled") else "no" + last_status = job.get("last_status") or "-" + next_run = job.get("next_run") or "-" + if next_run != "-" and len(next_run) > 19: + next_run = next_run[:19].replace("T", " ") + print(f"{name:<24} {cron:<16} {channel:<10} {model:<8} {enabled:<8} {last_status:<12} {next_run}") + + +def _cron_run(name: str): + """Force-run a job and print output.""" + import asyncio + from src.scheduler import Scheduler + + async def _run(): + s = Scheduler() + s._jobs = s._load_jobs() + return await s.run_job(name) + + try: + result = asyncio.run(_run()) + print(result) + except KeyError as exc: + print(f"Error: {exc}") + sys.exit(1) + + +def _cron_add(name: str, cron: str, channel: str, prompt: str, + model: str, tools: list[str]): + """Add a new cron job.""" + from src.scheduler import Scheduler + s = Scheduler() + s._jobs = s._load_jobs() + try: + job = s.add_job(name, cron, channel, prompt, model, tools or None) + print(f"Job '{job['name']}' added (cron: {job['cron']}, channel: {job['channel']}, model: {job['model']})") + except ValueError as exc: + print(f"Error: {exc}") + sys.exit(1) + + +def _cron_remove(name: str): + """Remove a cron job.""" + from src.scheduler import Scheduler + s = Scheduler() + s._jobs = s._load_jobs() + if s.remove_job(name): + print(f"Job '{name}' removed.") + else: + print(f"Job '{name}' not found.") + + +def _cron_enable(name: str): + """Enable a cron job.""" + from src.scheduler import Scheduler + s = Scheduler() + s._jobs = s._load_jobs() + if s.enable_job(name): + print(f"Job '{name}' enabled.") + else: + print(f"Job '{name}' not found.") + + +def _cron_disable(name: str): + """Disable a cron job.""" + from src.scheduler import Scheduler + s = Scheduler() + s._jobs = s._load_jobs() + if s.disable_job(name): + print(f"Job '{name}' disabled.") + else: + print(f"Job '{name}' not found.") + + def cmd_secrets(args): """Handle secrets subcommand.""" if args.secrets_action == "set": @@ -396,6 +509,32 @@ def main(): secrets_sub.add_parser("test", help="Check required secrets") + # cron + cron_parser = sub.add_parser("cron", help="Manage scheduled jobs") + cron_sub = cron_parser.add_subparsers(dest="cron_action") + + cron_sub.add_parser("list", help="List scheduled jobs") + + cron_run_p = cron_sub.add_parser("run", help="Force-run a job") + cron_run_p.add_argument("name", help="Job name") + + cron_add_p = cron_sub.add_parser("add", help="Add a scheduled job") + cron_add_p.add_argument("name", help="Job name") + cron_add_p.add_argument("expression", help="Cron expression (e.g. '30 6 * * *')") + cron_add_p.add_argument("--channel", required=True, help="Channel alias") + cron_add_p.add_argument("--prompt", required=True, help="Prompt text (use '-' for stdin)") + cron_add_p.add_argument("--model", default="sonnet", help="Model (default: sonnet)") + cron_add_p.add_argument("--tools", default=None, help="Comma-separated allowed tools") + + cron_remove_p = cron_sub.add_parser("remove", help="Remove a job") + cron_remove_p.add_argument("name", help="Job name") + + cron_enable_p = cron_sub.add_parser("enable", help="Enable a job") + cron_enable_p.add_argument("name", help="Job name") + + cron_disable_p = cron_sub.add_parser("disable", help="Disable a job") + cron_disable_p.add_argument("name", help="Job name") + # Parse and dispatch args = parser.parse_args() @@ -415,6 +554,9 @@ def main(): cmd_channel(a) if a.channel_action else (channel_parser.print_help() or sys.exit(0)) ), "send": cmd_send, + "cron": lambda a: ( + cmd_cron(a) if a.cron_action else (cron_parser.print_help() or sys.exit(0)) + ), "secrets": lambda a: ( cmd_secrets(a) if a.secrets_action else (secrets_parser.print_help() or sys.exit(0)) ), diff --git a/cron/jobs.json b/cron/jobs.json new file mode 100644 index 0000000..fe51488 --- /dev/null +++ b/cron/jobs.json @@ -0,0 +1 @@ +[] diff --git a/src/adapters/discord_bot.py b/src/adapters/discord_bot.py index c18ead8..8a607df 100644 --- a/src/adapters/discord_bot.py +++ b/src/adapters/discord_bot.py @@ -52,6 +52,15 @@ def is_registered_channel(channel_id: str) -> bool: return any(ch.get("id") == channel_id for ch in channels.values()) +def _channel_alias_for_id(channel_id: str) -> str | None: + """Resolve a Discord channel ID to its config alias.""" + channels = _get_config().get("channels", {}) + for alias, info in channels.items(): + if info.get("id") == channel_id: + return alias + return None + + # --- Message splitting helper --- @@ -114,6 +123,14 @@ def create_bot(config: Config) -> discord.Client: "`/model ` — Change model for this channel's session", "`/logs [n]` — Show last N log lines (default 10)", "`/restart` — Restart the bot process (owner only)", + "", + "**Cron Jobs**", + "`/cron list` — List all scheduled jobs", + "`/cron run ` — Force-run a job now", + "`/cron add [model]` — Create a scheduled job (admin)", + "`/cron remove ` — Remove a job (admin)", + "`/cron enable ` — Enable a job (admin)", + "`/cron disable ` — Disable a job (admin)", ] await interaction.response.send_message( "\n".join(lines), ephemeral=True @@ -182,6 +199,207 @@ def create_bot(config: Config) -> discord.Client: tree.add_command(admin_group) + # --- Cron commands --- + + cron_group = app_commands.Group( + name="cron", description="Manage scheduled jobs" + ) + + @cron_group.command(name="list", description="List all scheduled jobs") + async def cron_list(interaction: discord.Interaction) -> None: + scheduler = getattr(client, "scheduler", None) + if scheduler is None: + await interaction.response.send_message( + "Scheduler not available.", ephemeral=True + ) + return + jobs = scheduler.list_jobs() + if not jobs: + await interaction.response.send_message( + "No scheduled jobs.", ephemeral=True + ) + return + lines = [ + f"{'Name':<24} {'Cron':<14} {'Channel':<10} {'Model':<8} {'On':<5} {'Status':<8} {'Next Run'}" + ] + for j in jobs: + enabled = "yes" if j.get("enabled") else "no" + last_status = j.get("last_status") or "\u2014" + next_run = j.get("next_run") or "\u2014" + if next_run != "\u2014" and len(next_run) > 19: + next_run = next_run[:19] + lines.append( + f"{j['name']:<24} {j['cron']:<14} {j['channel']:<10} {j['model']:<8} {enabled:<5} {last_status:<8} {next_run}" + ) + table = "```\n" + "\n".join(lines) + "\n```" + await interaction.response.send_message(table, ephemeral=True) + + @cron_group.command(name="run", description="Force-run a scheduled job") + @app_commands.describe(name="Job name to run") + async def cron_run(interaction: discord.Interaction, name: str) -> None: + scheduler = getattr(client, "scheduler", None) + if scheduler is None: + await interaction.response.send_message( + "Scheduler not available.", ephemeral=True + ) + return + await interaction.response.defer() + try: + result = await scheduler.run_job(name) + truncated = result[:1900] if len(result) > 1900 else result + await interaction.followup.send(truncated) + except KeyError: + await interaction.followup.send(f"Job '{name}' not found.") + except Exception as e: + await interaction.followup.send(f"Error running job: {e}") + + @cron_group.command(name="add", description="Create a new scheduled job") + @app_commands.describe( + name="Job name (lowercase, hyphens allowed)", + expression="Cron expression (e.g. '30 6 * * *')", + model="AI model to use (default: sonnet)", + ) + @app_commands.choices(model=[ + app_commands.Choice(name="opus", value="opus"), + app_commands.Choice(name="sonnet", value="sonnet"), + app_commands.Choice(name="haiku", value="haiku"), + ]) + async def cron_add( + interaction: discord.Interaction, + name: str, + expression: str, + model: app_commands.Choice[str] | None = None, + ) -> None: + if not is_admin(str(interaction.user.id)): + await interaction.response.send_message( + "Admin only.", ephemeral=True + ) + return + scheduler = getattr(client, "scheduler", None) + if scheduler is None: + await interaction.response.send_message( + "Scheduler not available.", ephemeral=True + ) + return + channel_alias = _channel_alias_for_id(str(interaction.channel_id)) + if channel_alias is None: + await interaction.response.send_message( + "This channel is not registered. Use `/channel add` first.", + ephemeral=True, + ) + return + model_value = model.value if model else "sonnet" + await interaction.response.send_message( + f"Creating job **{name}** (`{expression}`, model: {model_value}, channel: {channel_alias}).\n" + "Send your prompt text as the next message in this channel.", + ) + + def check(m: discord.Message) -> bool: + return ( + m.author == interaction.user + and m.channel.id == interaction.channel_id + ) + + try: + prompt_msg = await client.wait_for( + "message", check=check, timeout=120 + ) + except asyncio.TimeoutError: + await interaction.followup.send("Timed out waiting for prompt.") + return + + try: + job = scheduler.add_job( + name=name, + cron=expression, + channel=channel_alias, + prompt=prompt_msg.content, + model=model_value, + ) + next_run = job.get("next_run") or "\u2014" + await interaction.channel.send( + f"Job **{name}** created.\n" + f"Cron: `{expression}` | Channel: {channel_alias} | Model: {model_value}\n" + f"Next run: {next_run}" + ) + except ValueError as e: + await interaction.channel.send(f"Error creating job: {e}") + + @cron_group.command(name="remove", description="Remove a scheduled job") + @app_commands.describe(name="Job name to remove") + async def cron_remove(interaction: discord.Interaction, name: str) -> None: + if not is_admin(str(interaction.user.id)): + await interaction.response.send_message( + "Admin only.", ephemeral=True + ) + return + scheduler = getattr(client, "scheduler", None) + if scheduler is None: + await interaction.response.send_message( + "Scheduler not available.", ephemeral=True + ) + return + if scheduler.remove_job(name): + await interaction.response.send_message( + f"Job '{name}' removed.", ephemeral=True + ) + else: + await interaction.response.send_message( + f"Job '{name}' not found.", ephemeral=True + ) + + @cron_group.command(name="enable", description="Enable a scheduled job") + @app_commands.describe(name="Job name to enable") + async def cron_enable( + interaction: discord.Interaction, name: str + ) -> None: + if not is_admin(str(interaction.user.id)): + await interaction.response.send_message( + "Admin only.", ephemeral=True + ) + return + scheduler = getattr(client, "scheduler", None) + if scheduler is None: + await interaction.response.send_message( + "Scheduler not available.", ephemeral=True + ) + return + if scheduler.enable_job(name): + await interaction.response.send_message( + f"Job '{name}' enabled.", ephemeral=True + ) + else: + await interaction.response.send_message( + f"Job '{name}' not found.", ephemeral=True + ) + + @cron_group.command(name="disable", description="Disable a scheduled job") + @app_commands.describe(name="Job name to disable") + async def cron_disable( + interaction: discord.Interaction, name: str + ) -> None: + if not is_admin(str(interaction.user.id)): + await interaction.response.send_message( + "Admin only.", ephemeral=True + ) + return + scheduler = getattr(client, "scheduler", None) + if scheduler is None: + await interaction.response.send_message( + "Scheduler not available.", ephemeral=True + ) + return + if scheduler.disable_job(name): + await interaction.response.send_message( + f"Job '{name}' disabled.", ephemeral=True + ) + else: + await interaction.response.send_message( + f"Job '{name}' not found.", ephemeral=True + ) + + tree.add_command(cron_group) + @tree.command(name="channels", description="List registered channels") async def channels(interaction: discord.Interaction) -> None: ch_map = config.get("channels", {}) @@ -340,6 +558,9 @@ def create_bot(config: Config) -> discord.Client: @client.event async def on_ready() -> None: await tree.sync() + scheduler = getattr(client, "scheduler", None) + if scheduler is not None: + await scheduler.start() logger.info("Echo Core online as %s", client.user) async def _handle_chat(message: discord.Message) -> None: diff --git a/src/main.py b/src/main.py index eda664a..599c34c 100644 --- a/src/main.py +++ b/src/main.py @@ -9,7 +9,8 @@ from pathlib import Path from src.config import load_config from src.secrets import get_secret -from src.adapters.discord_bot import create_bot +from src.adapters.discord_bot import create_bot, split_message +from src.scheduler import Scheduler PROJECT_ROOT = Path(__file__).resolve().parent.parent PID_FILE = PROJECT_ROOT / "echo-core.pid" @@ -43,6 +44,26 @@ def main(): config = load_config() client = create_bot(config) + # Scheduler setup + async def _send_to_channel(channel_alias: str, text: str) -> None: + """Callback: resolve alias and send text to Discord channel.""" + channels = config.get("channels", {}) + ch_info = channels.get(channel_alias) + if not ch_info: + logger.warning("Cron: unknown channel alias '%s'", channel_alias) + return + channel_id = ch_info.get("id") + channel = client.get_channel(int(channel_id)) + if channel is None: + logger.warning("Cron: channel %s not found in Discord cache", channel_id) + return + chunks = split_message(text) + for chunk in chunks: + await channel.send(chunk) + + scheduler = Scheduler(send_callback=_send_to_channel, config=config) + client.scheduler = scheduler # type: ignore[attr-defined] + # PID file PID_FILE.write_text(str(os.getpid())) @@ -51,6 +72,7 @@ def main(): def handle_signal(sig, frame): logger.info("Received signal %s, shutting down...", sig) + loop.create_task(scheduler.stop()) loop.create_task(client.close()) signal.signal(signal.SIGTERM, handle_signal) @@ -59,6 +81,7 @@ def main(): try: loop.run_until_complete(client.start(token)) except KeyboardInterrupt: + loop.run_until_complete(scheduler.stop()) loop.run_until_complete(client.close()) finally: PID_FILE.unlink(missing_ok=True) diff --git a/src/scheduler.py b/src/scheduler.py new file mode 100644 index 0000000..e66e185 --- /dev/null +++ b/src/scheduler.py @@ -0,0 +1,349 @@ +""" +Cron-like job scheduler for Echo-Core. + +Wraps APScheduler AsyncIOScheduler to run Claude CLI prompts on a schedule, +sending output to designated Discord channels. +""" + +import asyncio +import json +import logging +import os +import re +import subprocess +import tempfile +from datetime import datetime, timezone +from pathlib import Path +from typing import Awaitable, Callable + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger + +from src.claude_session import ( + CLAUDE_BIN, + PROJECT_ROOT, + VALID_MODELS, + _safe_env, + build_system_prompt, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +JOBS_DIR = PROJECT_ROOT / "cron" +JOBS_FILE = JOBS_DIR / "jobs.json" +JOB_TIMEOUT = 300 # 5-minute default per job execution + +_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$") +_MAX_PROMPT_LEN = 10_000 + +# --------------------------------------------------------------------------- +# Scheduler class +# --------------------------------------------------------------------------- + + +class Scheduler: + """Wraps APScheduler AsyncIOScheduler for Echo Core cron jobs.""" + + def __init__( + self, + send_callback: Callable[[str, str], Awaitable[None]] | None = None, + config=None, + ) -> None: + self._send_callback = send_callback + self._config = config + self._scheduler = AsyncIOScheduler() + self._jobs: list[dict] = [] + + # ------------------------------------------------------------------ + # Public methods + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Load jobs from jobs.json, schedule enabled ones, start scheduler.""" + self._jobs = self._load_jobs() + for job in self._jobs: + if job.get("enabled", False): + self._schedule_job(job) + self._scheduler.start() + logger.info("Scheduler started with %d jobs (%d enabled)", + len(self._jobs), + sum(1 for j in self._jobs if j.get("enabled"))) + + async def stop(self) -> None: + """Shut down APScheduler gracefully.""" + self._scheduler.shutdown(wait=False) + logger.info("Scheduler stopped") + + def add_job( + self, + name: str, + cron: str, + channel: str, + prompt: str, + model: str = "sonnet", + allowed_tools: list[str] | None = None, + ) -> dict: + """Validate, add job to list, save, and schedule. Returns new job dict.""" + # Validate name + if not _NAME_RE.match(name): + raise ValueError( + f"Invalid job name '{name}'. Must match: lowercase alphanumeric " + "and hyphens, 1-63 chars, starting with alphanumeric." + ) + + # Duplicate check + if any(j["name"] == name for j in self._jobs): + raise ValueError(f"Job '{name}' already exists") + + # Validate cron expression + try: + CronTrigger.from_crontab(cron) + except (ValueError, KeyError) as exc: + raise ValueError(f"Invalid cron expression '{cron}': {exc}") + + # Validate model + if model not in VALID_MODELS: + raise ValueError( + f"Invalid model '{model}'. Must be one of: {', '.join(sorted(VALID_MODELS))}" + ) + + # Validate prompt + if not prompt or not prompt.strip(): + raise ValueError("Prompt must be non-empty") + if len(prompt) > _MAX_PROMPT_LEN: + raise ValueError(f"Prompt too long ({len(prompt)} chars, max {_MAX_PROMPT_LEN})") + + job = { + "name": name, + "cron": cron, + "channel": channel, + "model": model, + "prompt": prompt, + "allowed_tools": allowed_tools or [], + "enabled": True, + "last_run": None, + "last_status": None, + "next_run": None, + } + + self._jobs.append(job) + self._schedule_job(job) + self._update_next_run(job) + self._save_jobs() + + logger.info("Added job '%s' (cron: %s, channel: %s)", name, cron, channel) + return job + + def remove_job(self, name: str) -> bool: + """Remove job from list and APScheduler. Returns True if found.""" + for i, job in enumerate(self._jobs): + if job["name"] == name: + self._jobs.pop(i) + try: + self._scheduler.remove_job(name) + except Exception: + pass + self._save_jobs() + logger.info("Removed job '%s'", name) + return True + return False + + def enable_job(self, name: str) -> bool: + """Enable job and schedule in APScheduler. Returns True if found.""" + for job in self._jobs: + if job["name"] == name: + job["enabled"] = True + self._schedule_job(job) + self._update_next_run(job) + self._save_jobs() + logger.info("Enabled job '%s'", name) + return True + return False + + def disable_job(self, name: str) -> bool: + """Disable job and remove from APScheduler. Returns True if found.""" + for job in self._jobs: + if job["name"] == name: + job["enabled"] = False + job["next_run"] = None + try: + self._scheduler.remove_job(name) + except Exception: + pass + self._save_jobs() + logger.info("Disabled job '%s'", name) + return True + return False + + async def run_job(self, name: str) -> str: + """Force-execute a job immediately. Returns Claude response text.""" + job = self._find_job(name) + if job is None: + raise KeyError(f"Job '{name}' not found") + + return await self._execute_job(job) + + def list_jobs(self) -> list[dict]: + """Return a copy of all jobs with current state.""" + return [dict(j) for j in self._jobs] + + # ------------------------------------------------------------------ + # Internal methods + # ------------------------------------------------------------------ + + def _find_job(self, name: str) -> dict | None: + """Find a job by name.""" + for job in self._jobs: + if job["name"] == name: + return job + return None + + def _load_jobs(self) -> list[dict]: + """Read and parse jobs.json. Returns [] if missing or corrupt.""" + try: + text = JOBS_FILE.read_text(encoding="utf-8") + if not text.strip(): + return [] + data = json.loads(text) + if not isinstance(data, list): + logger.error("jobs.json is not a list, treating as empty") + return [] + return data + except FileNotFoundError: + return [] + except json.JSONDecodeError as exc: + logger.error("jobs.json corrupt (%s), treating as empty", exc) + return [] + + def _save_jobs(self) -> None: + """Atomically write current jobs list to jobs.json.""" + JOBS_DIR.mkdir(parents=True, exist_ok=True) + fd, tmp_path = tempfile.mkstemp( + dir=JOBS_DIR, prefix=".jobs_", suffix=".json" + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(self._jobs, f, indent=2, ensure_ascii=False) + f.write("\n") + os.replace(tmp_path, JOBS_FILE) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + def _schedule_job(self, job: dict) -> None: + """Add a single job to APScheduler.""" + # Remove existing schedule if any + try: + self._scheduler.remove_job(job["name"]) + except Exception: + pass + + trigger = CronTrigger.from_crontab(job["cron"]) + self._scheduler.add_job( + self._job_callback, + trigger=trigger, + id=job["name"], + args=[job["name"]], + max_instances=1, + ) + + async def _job_callback(self, job_name: str) -> None: + """APScheduler callback — finds job and executes.""" + job = self._find_job(job_name) + if job is None: + logger.error("Scheduled callback for unknown job '%s'", job_name) + return + await self._execute_job(job) + + async def _execute_job(self, job: dict) -> str: + """Execute a job: run Claude CLI, update state, send output.""" + name = job["name"] + job["last_run"] = datetime.now(timezone.utc).isoformat() + + # Build CLI command + cmd = [ + CLAUDE_BIN, "-p", job["prompt"], + "--model", job["model"], + "--output-format", "json", + ] + + try: + system_prompt = build_system_prompt() + cmd += ["--system-prompt", system_prompt] + except FileNotFoundError: + pass + + if job.get("allowed_tools"): + cmd += ["--allowedTools"] + job["allowed_tools"] + + # Run in thread to not block event loop + result_text = "" + try: + proc = await asyncio.to_thread( + subprocess.run, + cmd, + capture_output=True, + text=True, + timeout=JOB_TIMEOUT, + env=_safe_env(), + cwd=PROJECT_ROOT, + ) + + if proc.returncode != 0: + error_msg = proc.stderr[:500] if proc.stderr else "unknown error" + raise RuntimeError( + f"Claude CLI error (exit {proc.returncode}): {error_msg}" + ) + + data = json.loads(proc.stdout) + result_text = data.get("result", "") + job["last_status"] = "ok" + logger.info("Job '%s' completed successfully", name) + + except subprocess.TimeoutExpired: + job["last_status"] = "error" + result_text = f"[cron:{name}] Error: timed out after {JOB_TIMEOUT}s" + logger.error("Job '%s' timed out", name) + + except (RuntimeError, json.JSONDecodeError) as exc: + job["last_status"] = "error" + result_text = f"[cron:{name}] Error: {exc}" + logger.error("Job '%s' failed: %s", name, exc) + + except Exception as exc: + job["last_status"] = "error" + result_text = f"[cron:{name}] Error: {exc}" + logger.error("Job '%s' unexpected error: %s", name, exc) + + # Update next_run from APScheduler + self._update_next_run(job) + + # Save state + self._save_jobs() + + # Send output via callback + if self._send_callback and result_text: + try: + await self._send_callback(job["channel"], result_text) + except Exception as exc: + logger.error("Job '%s' send_callback failed: %s", name, exc) + + return result_text + + def _update_next_run(self, job: dict) -> None: + """Update job's next_run from APScheduler.""" + try: + aps_job = self._scheduler.get_job(job["name"]) + if aps_job and aps_job.next_run_time: + job["next_run"] = aps_job.next_run_time.isoformat() + else: + job["next_run"] = None + except Exception: + job["next_run"] = None diff --git a/tests/test_cli.py b/tests/test_cli.py index ddf745e..f09c126 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -364,3 +364,124 @@ class TestSend: with pytest.raises(SystemExit): cli.cmd_send(_args(alias="nope", message=["hi"])) assert "unknown channel" in capsys.readouterr().out.lower() + + +# --------------------------------------------------------------------------- +# cron list +# --------------------------------------------------------------------------- + + +class TestCronList: + def test_list_empty(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.list_jobs.return_value = [] + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_list() + assert "No scheduled jobs" in capsys.readouterr().out + + def test_list_shows_table(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [ + { + "name": "daily-run", + "cron": "30 6 * * *", + "channel": "work", + "model": "sonnet", + "enabled": True, + "last_status": "ok", + "next_run": None, + } + ] + mock_sched.list_jobs.return_value = mock_sched._load_jobs.return_value + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_list() + out = capsys.readouterr().out + assert "daily-run" in out + assert "30 6 * * *" in out + assert "sonnet" in out + + +# --------------------------------------------------------------------------- +# cron add / remove / enable / disable +# --------------------------------------------------------------------------- + + +class TestCronAdd: + def test_add_success(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.add_job.return_value = { + "name": "new-job", + "cron": "0 * * * *", + "channel": "ch", + "model": "sonnet", + } + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_add("new-job", "0 * * * *", "ch", "hello prompt", "sonnet", []) + out = capsys.readouterr().out + assert "new-job" in out + assert "added" in out.lower() + + def test_add_error(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.add_job.side_effect = ValueError("duplicate name") + with patch("src.scheduler.Scheduler", return_value=mock_sched): + with pytest.raises(SystemExit): + cli._cron_add("dup", "0 * * * *", "ch", "prompt", "sonnet", []) + assert "duplicate name" in capsys.readouterr().out + + +class TestCronRemove: + def test_remove_found(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.remove_job.return_value = True + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_remove("old-job") + assert "removed" in capsys.readouterr().out.lower() + + def test_remove_not_found(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.remove_job.return_value = False + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_remove("ghost") + assert "not found" in capsys.readouterr().out.lower() + + +class TestCronEnable: + def test_enable_found(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.enable_job.return_value = True + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_enable("my-job") + assert "enabled" in capsys.readouterr().out.lower() + + def test_enable_not_found(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.enable_job.return_value = False + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_enable("nope") + assert "not found" in capsys.readouterr().out.lower() + + +class TestCronDisable: + def test_disable_found(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.disable_job.return_value = True + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_disable("my-job") + assert "disabled" in capsys.readouterr().out.lower() + + def test_disable_not_found(self, iso, capsys): + mock_sched = MagicMock() + mock_sched._load_jobs.return_value = [] + mock_sched.disable_job.return_value = False + with patch("src.scheduler.Scheduler", return_value=mock_sched): + cli._cron_disable("nope") + assert "not found" in capsys.readouterr().out.lower() diff --git a/tests/test_discord.py b/tests/test_discord.py index cc58be0..25078b9 100644 --- a/tests/test_discord.py +++ b/tests/test_discord.py @@ -656,3 +656,235 @@ class TestLogsSlashCommand: msg = interaction.response.send_message.call_args assert "no log file" in msg.args[0].lower() + + +# --- /cron slash commands --- + + +class TestCronList: + @pytest.mark.asyncio + async def test_cron_list_shows_table(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.list_jobs.return_value = [ + { + "name": "daily-summary", + "cron": "30 6 * * *", + "channel": "work", + "model": "sonnet", + "enabled": True, + "last_status": "ok", + "next_run": "2025-01-15T06:30:00+00:00", + } + ] + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "list") + assert cmd is not None + interaction = _mock_interaction() + await cmd.callback(interaction) + + msg = interaction.response.send_message.call_args + text = msg.args[0] + assert "daily-summary" in text + assert "30 6 * * *" in text + assert "```" in text + + @pytest.mark.asyncio + async def test_cron_list_empty(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.list_jobs.return_value = [] + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "list") + interaction = _mock_interaction() + await cmd.callback(interaction) + + msg = interaction.response.send_message.call_args + assert "no scheduled jobs" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_list_no_scheduler(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "list") + interaction = _mock_interaction() + await cmd.callback(interaction) + + msg = interaction.response.send_message.call_args + assert "not available" in msg.args[0].lower() + + +class TestCronRun: + @pytest.mark.asyncio + async def test_cron_run_defers_and_runs(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.run_job = AsyncMock(return_value="Job output here") + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "run") + assert cmd is not None + interaction = _mock_interaction() + interaction.followup = AsyncMock() + await cmd.callback(interaction, name="my-job") + + interaction.response.defer.assert_awaited_once() + mock_scheduler.run_job.assert_awaited_once_with("my-job") + interaction.followup.send.assert_awaited_once_with("Job output here") + + @pytest.mark.asyncio + async def test_cron_run_not_found(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.run_job = AsyncMock(side_effect=KeyError("Job 'nope' not found")) + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "run") + interaction = _mock_interaction() + interaction.followup = AsyncMock() + await cmd.callback(interaction, name="nope") + + interaction.response.defer.assert_awaited_once() + msg = interaction.followup.send.call_args + assert "not found" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_run_no_scheduler(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "run") + interaction = _mock_interaction() + await cmd.callback(interaction, name="test") + + msg = interaction.response.send_message.call_args + assert "not available" in msg.args[0].lower() + + +class TestCronAdd: + @pytest.mark.asyncio + async def test_cron_add_admin_only(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "add") + assert cmd is not None + # Non-admin user + interaction = _mock_interaction(user_id="999") + await cmd.callback(interaction, name="test", expression="0 * * * *", model=None) + + msg = interaction.response.send_message.call_args + assert "admin only" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_add_rejects_non_admin(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "add") + interaction = _mock_interaction(user_id="888") + await cmd.callback(interaction, name="test", expression="0 * * * *", model=None) + + msg = interaction.response.send_message.call_args + assert "admin only" in msg.args[0].lower() + + +class TestCronRemove: + @pytest.mark.asyncio + async def test_cron_remove_admin_only(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "remove") + assert cmd is not None + interaction = _mock_interaction(user_id="999") + await cmd.callback(interaction, name="test") + + msg = interaction.response.send_message.call_args + assert "admin only" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_remove_success(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.remove_job.return_value = True + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "remove") + interaction = _mock_interaction(user_id="111") # owner + await cmd.callback(interaction, name="my-job") + + msg = interaction.response.send_message.call_args + assert "removed" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_remove_not_found(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.remove_job.return_value = False + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "remove") + interaction = _mock_interaction(user_id="111") + await cmd.callback(interaction, name="ghost") + + msg = interaction.response.send_message.call_args + assert "not found" in msg.args[0].lower() + + +class TestCronEnable: + @pytest.mark.asyncio + async def test_cron_enable_admin_only(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "enable") + assert cmd is not None + interaction = _mock_interaction(user_id="999") + await cmd.callback(interaction, name="test") + + msg = interaction.response.send_message.call_args + assert "admin only" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_enable_success(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.enable_job.return_value = True + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "enable") + interaction = _mock_interaction(user_id="111") + await cmd.callback(interaction, name="my-job") + + msg = interaction.response.send_message.call_args + assert "enabled" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_enable_not_found(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.enable_job.return_value = False + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "enable") + interaction = _mock_interaction(user_id="111") + await cmd.callback(interaction, name="ghost") + + msg = interaction.response.send_message.call_args + assert "not found" in msg.args[0].lower() + + +class TestCronDisable: + @pytest.mark.asyncio + async def test_cron_disable_admin_only(self, owned_bot): + cmd = _find_subcommand(owned_bot.tree, "cron", "disable") + assert cmd is not None + interaction = _mock_interaction(user_id="999") + await cmd.callback(interaction, name="test") + + msg = interaction.response.send_message.call_args + assert "admin only" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_disable_success(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.disable_job.return_value = True + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "disable") + interaction = _mock_interaction(user_id="111") + await cmd.callback(interaction, name="my-job") + + msg = interaction.response.send_message.call_args + assert "disabled" in msg.args[0].lower() + + @pytest.mark.asyncio + async def test_cron_disable_not_found(self, owned_bot): + mock_scheduler = MagicMock() + mock_scheduler.disable_job.return_value = False + owned_bot.scheduler = mock_scheduler + + cmd = _find_subcommand(owned_bot.tree, "cron", "disable") + interaction = _mock_interaction(user_id="111") + await cmd.callback(interaction, name="ghost") + + msg = interaction.response.send_message.call_args + assert "not found" in msg.args[0].lower() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..c9c3fcb --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,550 @@ +"""Tests for src/scheduler.py — Cron job scheduler.""" + +import asyncio +import json +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def tmp_jobs(tmp_path, monkeypatch): + """Redirect JOBS_DIR / JOBS_FILE to tmp_path for isolation.""" + jobs_dir = tmp_path / "cron" + jobs_dir.mkdir() + jobs_file = jobs_dir / "jobs.json" + + monkeypatch.setattr("src.scheduler.JOBS_DIR", jobs_dir) + monkeypatch.setattr("src.scheduler.JOBS_FILE", jobs_file) + return {"dir": jobs_dir, "file": jobs_file} + + +@pytest.fixture +def callback(): + """Async mock send_callback.""" + return AsyncMock() + + +@pytest.fixture +def sched(tmp_jobs, callback): + """Create a Scheduler with mocked file paths and callback.""" + return Scheduler(send_callback=callback) + + +def _sample_job(**overrides): + """Return a minimal valid job dict.""" + base = { + "name": "test-job", + "cron": "0 6 * * *", + "channel": "general", + "model": "sonnet", + "prompt": "Hello world", + "allowed_tools": [], + "enabled": True, + "last_run": None, + "last_status": None, + "next_run": None, + } + base.update(overrides) + return base + + +# --------------------------------------------------------------------------- +# _load_jobs / _save_jobs +# --------------------------------------------------------------------------- + + +class TestLoadJobs: + def test_load_empty_file(self, sched, tmp_jobs): + tmp_jobs["file"].write_text("") + result = sched._load_jobs() + assert result == [] + + def test_load_missing_file(self, sched, tmp_jobs): + # File doesn't exist yet + if tmp_jobs["file"].exists(): + tmp_jobs["file"].unlink() + result = sched._load_jobs() + assert result == [] + + def test_load_valid_jobs(self, sched, tmp_jobs): + jobs = [_sample_job()] + tmp_jobs["file"].write_text(json.dumps(jobs)) + result = sched._load_jobs() + assert len(result) == 1 + assert result[0]["name"] == "test-job" + + def test_load_corrupt_json(self, sched, tmp_jobs): + tmp_jobs["file"].write_text("{broken json!!!") + result = sched._load_jobs() + assert result == [] + + def test_load_non_list_json(self, sched, tmp_jobs): + tmp_jobs["file"].write_text('{"not": "a list"}') + result = sched._load_jobs() + assert result == [] + + +class TestSaveJobs: + def test_save_creates_file(self, sched, tmp_jobs): + if tmp_jobs["file"].exists(): + tmp_jobs["file"].unlink() + sched._jobs = [_sample_job()] + sched._save_jobs() + assert tmp_jobs["file"].exists() + data = json.loads(tmp_jobs["file"].read_text()) + assert len(data) == 1 + assert data[0]["name"] == "test-job" + + def test_save_roundtrip(self, sched, tmp_jobs): + jobs = [_sample_job(), _sample_job(name="second-job")] + sched._jobs = jobs + sched._save_jobs() + + loaded = sched._load_jobs() + assert len(loaded) == 2 + assert loaded[0]["name"] == "test-job" + assert loaded[1]["name"] == "second-job" + + def test_save_creates_dir_if_missing(self, tmp_path, monkeypatch, callback): + new_dir = tmp_path / "new_cron" + new_file = new_dir / "jobs.json" + monkeypatch.setattr("src.scheduler.JOBS_DIR", new_dir) + monkeypatch.setattr("src.scheduler.JOBS_FILE", new_file) + + s = Scheduler(send_callback=callback) + s._jobs = [_sample_job()] + s._save_jobs() + + assert new_dir.exists() + assert new_file.exists() + + def test_save_atomic_pattern(self, sched, tmp_jobs): + """Verify file is intact after save (atomic write via os.replace).""" + sched._jobs = [_sample_job()] + sched._save_jobs() + + # File should be valid JSON + data = json.loads(tmp_jobs["file"].read_text()) + assert data[0]["name"] == "test-job" + + # Overwrite with different data + sched._jobs = [_sample_job(name="updated")] + sched._save_jobs() + + data = json.loads(tmp_jobs["file"].read_text()) + assert data[0]["name"] == "updated" + + +# --------------------------------------------------------------------------- +# add_job +# --------------------------------------------------------------------------- + + +class TestAddJob: + def test_add_job_creates_file(self, sched, tmp_jobs): + job = sched.add_job("my-job", "30 6 * * *", "general", "Do something") + assert job["name"] == "my-job" + assert job["cron"] == "30 6 * * *" + assert job["channel"] == "general" + assert job["model"] == "sonnet" + assert job["enabled"] is True + assert tmp_jobs["file"].exists() + + data = json.loads(tmp_jobs["file"].read_text()) + assert len(data) == 1 + + def test_add_job_with_model(self, sched, tmp_jobs): + job = sched.add_job("my-job", "0 * * * *", "ch", "prompt", model="haiku") + assert job["model"] == "haiku" + + def test_add_job_with_tools(self, sched, tmp_jobs): + job = sched.add_job( + "my-job", "0 * * * *", "ch", "prompt", + allowed_tools=["Read", "Bash"] + ) + assert job["allowed_tools"] == ["Read", "Bash"] + + def test_add_job_duplicate_name_raises(self, sched, tmp_jobs): + sched.add_job("dupe", "0 * * * *", "ch", "prompt") + with pytest.raises(ValueError, match="already exists"): + sched.add_job("dupe", "0 * * * *", "ch", "prompt") + + def test_add_job_invalid_cron_raises(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid cron"): + sched.add_job("job1", "bad cron", "ch", "prompt") + + def test_add_job_invalid_model_raises(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid model"): + sched.add_job("job1", "0 * * * *", "ch", "prompt", model="gpt4") + + def test_add_job_invalid_name_uppercase(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid job name"): + sched.add_job("MyJob", "0 * * * *", "ch", "prompt") + + def test_add_job_invalid_name_spaces(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid job name"): + sched.add_job("my job", "0 * * * *", "ch", "prompt") + + def test_add_job_invalid_name_special(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid job name"): + sched.add_job("my_job!", "0 * * * *", "ch", "prompt") + + def test_add_job_empty_prompt_raises(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty"): + sched.add_job("job1", "0 * * * *", "ch", "") + + def test_add_job_whitespace_prompt_raises(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty"): + sched.add_job("job1", "0 * * * *", "ch", " ") + + def test_add_job_prompt_too_long(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="too long"): + sched.add_job("job1", "0 * * * *", "ch", "x" * 10_001) + + +# --------------------------------------------------------------------------- +# remove_job +# --------------------------------------------------------------------------- + + +class TestRemoveJob: + def test_remove_job(self, sched, tmp_jobs): + sched.add_job("to-remove", "0 * * * *", "ch", "prompt") + assert sched.remove_job("to-remove") is True + assert len(sched.list_jobs()) == 0 + + data = json.loads(tmp_jobs["file"].read_text()) + assert len(data) == 0 + + def test_remove_job_not_found(self, sched, tmp_jobs): + assert sched.remove_job("nonexistent") is False + + +# --------------------------------------------------------------------------- +# enable_job / disable_job +# --------------------------------------------------------------------------- + + +class TestEnableDisableJob: + def test_enable_job(self, sched, tmp_jobs): + sched.add_job("toggler", "0 * * * *", "ch", "prompt") + sched.disable_job("toggler") + + jobs = sched.list_jobs() + assert jobs[0]["enabled"] is False + + assert sched.enable_job("toggler") is True + jobs = sched.list_jobs() + assert jobs[0]["enabled"] is True + + def test_disable_job(self, sched, tmp_jobs): + sched.add_job("toggler", "0 * * * *", "ch", "prompt") + assert sched.disable_job("toggler") is True + + jobs = sched.list_jobs() + assert jobs[0]["enabled"] is False + assert jobs[0]["next_run"] is None + + def test_enable_not_found(self, sched, tmp_jobs): + assert sched.enable_job("nope") is False + + def test_disable_not_found(self, sched, tmp_jobs): + assert sched.disable_job("nope") is False + + def test_enable_persists(self, sched, tmp_jobs): + sched.add_job("persist", "0 * * * *", "ch", "prompt") + sched.disable_job("persist") + sched.enable_job("persist") + + data = json.loads(tmp_jobs["file"].read_text()) + assert data[0]["enabled"] is True + + def test_disable_persists(self, sched, tmp_jobs): + sched.add_job("persist", "0 * * * *", "ch", "prompt") + sched.disable_job("persist") + + data = json.loads(tmp_jobs["file"].read_text()) + assert data[0]["enabled"] is False + + +# --------------------------------------------------------------------------- +# list_jobs +# --------------------------------------------------------------------------- + + +class TestListJobs: + def test_list_jobs_empty(self, sched, tmp_jobs): + assert sched.list_jobs() == [] + + def test_list_jobs_returns_copy(self, sched, tmp_jobs): + sched.add_job("job1", "0 * * * *", "ch", "prompt") + + jobs = sched.list_jobs() + jobs[0]["name"] = "MUTATED" + + # Internal state should not be affected + internal = sched.list_jobs() + assert internal[0]["name"] == "job1" + + def test_list_jobs_returns_all(self, sched, tmp_jobs): + sched.add_job("a", "0 * * * *", "ch", "p1") + sched.add_job("b", "0 * * * *", "ch", "p2") + assert len(sched.list_jobs()) == 2 + + +# --------------------------------------------------------------------------- +# run_job / _execute_job +# --------------------------------------------------------------------------- + + +class TestRunJob: + @pytest.mark.asyncio + async def test_run_job_not_found(self, sched, tmp_jobs): + with pytest.raises(KeyError, match="not found"): + await sched.run_job("nonexistent") + + @pytest.mark.asyncio + async def test_execute_job_success(self, sched, tmp_jobs, callback): + sched.add_job("runner", "0 * * * *", "general", "test prompt") + + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = json.dumps({"result": "Claude says hello"}) + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc): + result = await sched.run_job("runner") + + assert result == "Claude says hello" + assert sched._jobs[0]["last_status"] == "ok" + assert sched._jobs[0]["last_run"] is not None + callback.assert_awaited_once_with("general", "Claude says hello") + + @pytest.mark.asyncio + async def test_execute_job_timeout(self, sched, tmp_jobs, callback): + sched.add_job("timeout-job", "0 * * * *", "ch", "prompt") + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="claude", timeout=300)): + result = await sched.run_job("timeout-job") + + assert "Error" in result + assert "timed out" in result + assert sched._jobs[0]["last_status"] == "error" + callback.assert_awaited_once() + + @pytest.mark.asyncio + async def test_execute_job_cli_error(self, sched, tmp_jobs, callback): + sched.add_job("err-job", "0 * * * *", "ch", "prompt") + + mock_proc = MagicMock() + mock_proc.returncode = 1 + mock_proc.stdout = "" + mock_proc.stderr = "Some CLI error" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc): + result = await sched.run_job("err-job") + + assert "Error" in result + assert sched._jobs[0]["last_status"] == "error" + + @pytest.mark.asyncio + async def test_execute_job_invalid_json(self, sched, tmp_jobs, callback): + sched.add_job("json-err", "0 * * * *", "ch", "prompt") + + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = "not valid json" + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc): + result = await sched.run_job("json-err") + + assert "Error" in result + assert sched._jobs[0]["last_status"] == "error" + + @pytest.mark.asyncio + async def test_execute_job_with_allowed_tools(self, sched, tmp_jobs, callback): + sched.add_job( + "tools-job", "0 * * * *", "ch", "prompt", + allowed_tools=["Read", "Bash"] + ) + + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = json.dumps({"result": "ok"}) + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc) as mock_run: + await sched.run_job("tools-job") + + # Inspect the cmd passed to subprocess.run + cmd = mock_run.call_args[0][0] + assert "--allowedTools" in cmd + assert "Read" in cmd + assert "Bash" in cmd + + @pytest.mark.asyncio + async def test_execute_job_no_callback(self, tmp_jobs): + """Scheduler with no send_callback should not error on execution.""" + s = Scheduler(send_callback=None) + s.add_job("no-cb", "0 * * * *", "ch", "prompt") + + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = json.dumps({"result": "response"}) + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc): + result = await s.run_job("no-cb") + + assert result == "response" + + @pytest.mark.asyncio + async def test_execute_job_updates_last_run(self, sched, tmp_jobs, callback): + sched.add_job("time-job", "0 * * * *", "ch", "prompt") + assert sched._jobs[0]["last_run"] is None + + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = json.dumps({"result": "ok"}) + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc): + await sched.run_job("time-job") + + assert sched._jobs[0]["last_run"] is not None + # Verify it's a valid ISO timestamp + datetime.fromisoformat(sched._jobs[0]["last_run"]) + + +# --------------------------------------------------------------------------- +# start / stop +# --------------------------------------------------------------------------- + + +class TestStartStop: + @pytest.mark.asyncio + async def test_start_loads_and_schedules(self, sched, tmp_jobs): + jobs = [_sample_job(name="enabled-job", enabled=True)] + tmp_jobs["file"].write_text(json.dumps(jobs)) + + with patch.object(sched._scheduler, "start"), \ + patch.object(sched._scheduler, "add_job") as mock_add: + await sched.start() + + assert len(sched._jobs) == 1 + mock_add.assert_called_once() + + @pytest.mark.asyncio + async def test_start_skips_disabled(self, sched, tmp_jobs): + jobs = [ + _sample_job(name="on", enabled=True), + _sample_job(name="off", enabled=False), + ] + tmp_jobs["file"].write_text(json.dumps(jobs)) + + with patch.object(sched._scheduler, "start"), \ + patch.object(sched._scheduler, "add_job") as mock_add: + await sched.start() + + assert len(sched._jobs) == 2 + # Only the enabled job should be scheduled + assert mock_add.call_count == 1 + + @pytest.mark.asyncio + async def test_start_empty_file(self, sched, tmp_jobs): + with patch.object(sched._scheduler, "start"), \ + patch.object(sched._scheduler, "add_job") as mock_add: + await sched.start() + + assert sched._jobs == [] + mock_add.assert_not_called() + + @pytest.mark.asyncio + async def test_stop(self, sched): + with patch.object(sched._scheduler, "shutdown") as mock_shutdown: + await sched.stop() + mock_shutdown.assert_called_once_with(wait=False) + + +# --------------------------------------------------------------------------- +# Name regex validation +# --------------------------------------------------------------------------- + + +class TestNameRegex: + @pytest.mark.parametrize("name", [ + "a", "abc", "my-job", "daily-email-summary", + "a1", "123", "0-test", + ]) + def test_valid_names(self, name): + assert _NAME_RE.match(name) is not None + + @pytest.mark.parametrize("name", [ + "", "A", "MyJob", "my_job", "my job", "-start", + "a" * 64, # too long (max 63) + "job!", "job@work", + ]) + def test_invalid_names(self, name): + assert _NAME_RE.match(name) is None + + +# --------------------------------------------------------------------------- +# Integration test: full lifecycle +# --------------------------------------------------------------------------- + + +class TestFullLifecycle: + @pytest.mark.asyncio + async def test_add_list_enable_run_disable_remove(self, sched, tmp_jobs, callback): + # Add + job = sched.add_job("lifecycle", "0 6 * * *", "ch", "test prompt") + assert job["enabled"] is True + + # List + jobs = sched.list_jobs() + assert len(jobs) == 1 + assert jobs[0]["name"] == "lifecycle" + + # Disable + assert sched.disable_job("lifecycle") is True + assert sched.list_jobs()[0]["enabled"] is False + + # Enable + assert sched.enable_job("lifecycle") is True + assert sched.list_jobs()[0]["enabled"] is True + + # Run + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = json.dumps({"result": "lifecycle output"}) + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc): + result = await sched.run_job("lifecycle") + + assert result == "lifecycle output" + assert sched.list_jobs()[0]["last_status"] == "ok" + + # Remove + assert sched.remove_job("lifecycle") is True + assert sched.list_jobs() == []