"""Tests for src/scheduler.py — Cron job scheduler.""" import asyncio import json import subprocess from datetime import datetime, timezone from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import pytest from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def tmp_jobs(tmp_path, monkeypatch): """Redirect JOBS_DIR / JOBS_FILE to tmp_path for isolation.""" jobs_dir = tmp_path / "cron" jobs_dir.mkdir() jobs_file = jobs_dir / "jobs.json" monkeypatch.setattr("src.scheduler.JOBS_DIR", jobs_dir) monkeypatch.setattr("src.scheduler.JOBS_FILE", jobs_file) return {"dir": jobs_dir, "file": jobs_file} @pytest.fixture def callback(): """Async mock send_callback.""" return AsyncMock() @pytest.fixture def sched(tmp_jobs, callback): """Create a Scheduler with mocked file paths and callback.""" return Scheduler(send_callback=callback) def _sample_job(**overrides): """Return a minimal valid job dict.""" base = { "name": "test-job", "cron": "0 6 * * *", "channel": "general", "model": "sonnet", "prompt": "Hello world", "allowed_tools": [], "enabled": True, "last_run": None, "last_status": None, "next_run": None, } base.update(overrides) return base # --------------------------------------------------------------------------- # _load_jobs / _save_jobs # --------------------------------------------------------------------------- class TestLoadJobs: def test_load_empty_file(self, sched, tmp_jobs): tmp_jobs["file"].write_text("") result = sched._load_jobs() assert result == [] def test_load_missing_file(self, sched, tmp_jobs): # File doesn't exist yet if tmp_jobs["file"].exists(): tmp_jobs["file"].unlink() result = sched._load_jobs() assert result == [] def test_load_valid_jobs(self, sched, tmp_jobs): jobs = [_sample_job()] tmp_jobs["file"].write_text(json.dumps(jobs)) result = sched._load_jobs() assert len(result) == 1 assert result[0]["name"] == "test-job" def test_load_corrupt_json(self, sched, tmp_jobs): tmp_jobs["file"].write_text("{broken json!!!") result = sched._load_jobs() assert result == [] def test_load_non_list_json(self, sched, tmp_jobs): tmp_jobs["file"].write_text('{"not": "a list"}') result = sched._load_jobs() assert result == [] class TestSaveJobs: def test_save_creates_file(self, sched, tmp_jobs): if tmp_jobs["file"].exists(): tmp_jobs["file"].unlink() sched._jobs = [_sample_job()] sched._save_jobs() assert tmp_jobs["file"].exists() data = json.loads(tmp_jobs["file"].read_text()) assert len(data) == 1 assert data[0]["name"] == "test-job" def test_save_roundtrip(self, sched, tmp_jobs): jobs = [_sample_job(), _sample_job(name="second-job")] sched._jobs = jobs sched._save_jobs() loaded = sched._load_jobs() assert len(loaded) == 2 assert loaded[0]["name"] == "test-job" assert loaded[1]["name"] == "second-job" def test_save_creates_dir_if_missing(self, tmp_path, monkeypatch, callback): new_dir = tmp_path / "new_cron" new_file = new_dir / "jobs.json" monkeypatch.setattr("src.scheduler.JOBS_DIR", new_dir) monkeypatch.setattr("src.scheduler.JOBS_FILE", new_file) s = Scheduler(send_callback=callback) s._jobs = [_sample_job()] s._save_jobs() assert new_dir.exists() assert new_file.exists() def test_save_atomic_pattern(self, sched, tmp_jobs): """Verify file is intact after save (atomic write via os.replace).""" sched._jobs = [_sample_job()] sched._save_jobs() # File should be valid JSON data = json.loads(tmp_jobs["file"].read_text()) assert data[0]["name"] == "test-job" # Overwrite with different data sched._jobs = [_sample_job(name="updated")] sched._save_jobs() data = json.loads(tmp_jobs["file"].read_text()) assert data[0]["name"] == "updated" # --------------------------------------------------------------------------- # add_job # --------------------------------------------------------------------------- class TestAddJob: def test_add_job_creates_file(self, sched, tmp_jobs): job = sched.add_job("my-job", "30 6 * * *", "general", "Do something") assert job["name"] == "my-job" assert job["cron"] == "30 6 * * *" assert job["channel"] == "general" assert job["model"] == "sonnet" assert job["enabled"] is True assert tmp_jobs["file"].exists() data = json.loads(tmp_jobs["file"].read_text()) assert len(data) == 1 def test_add_job_with_model(self, sched, tmp_jobs): job = sched.add_job("my-job", "0 * * * *", "ch", "prompt", model="haiku") assert job["model"] == "haiku" def test_add_job_with_tools(self, sched, tmp_jobs): job = sched.add_job( "my-job", "0 * * * *", "ch", "prompt", allowed_tools=["Read", "Bash"] ) assert job["allowed_tools"] == ["Read", "Bash"] def test_add_job_duplicate_name_raises(self, sched, tmp_jobs): sched.add_job("dupe", "0 * * * *", "ch", "prompt") with pytest.raises(ValueError, match="already exists"): sched.add_job("dupe", "0 * * * *", "ch", "prompt") def test_add_job_invalid_cron_raises(self, sched, tmp_jobs): with pytest.raises(ValueError, match="Invalid cron"): sched.add_job("job1", "bad cron", "ch", "prompt") def test_add_job_invalid_model_raises(self, sched, tmp_jobs): with pytest.raises(ValueError, match="Invalid model"): sched.add_job("job1", "0 * * * *", "ch", "prompt", model="gpt4") def test_add_job_invalid_name_uppercase(self, sched, tmp_jobs): with pytest.raises(ValueError, match="Invalid job name"): sched.add_job("MyJob", "0 * * * *", "ch", "prompt") def test_add_job_invalid_name_spaces(self, sched, tmp_jobs): with pytest.raises(ValueError, match="Invalid job name"): sched.add_job("my job", "0 * * * *", "ch", "prompt") def test_add_job_invalid_name_special(self, sched, tmp_jobs): with pytest.raises(ValueError, match="Invalid job name"): sched.add_job("my_job!", "0 * * * *", "ch", "prompt") def test_add_job_empty_prompt_raises(self, sched, tmp_jobs): with pytest.raises(ValueError, match="non-empty"): sched.add_job("job1", "0 * * * *", "ch", "") def test_add_job_whitespace_prompt_raises(self, sched, tmp_jobs): with pytest.raises(ValueError, match="non-empty"): sched.add_job("job1", "0 * * * *", "ch", " ") def test_add_job_prompt_too_long(self, sched, tmp_jobs): with pytest.raises(ValueError, match="too long"): sched.add_job("job1", "0 * * * *", "ch", "x" * 10_001) # --------------------------------------------------------------------------- # remove_job # --------------------------------------------------------------------------- class TestRemoveJob: def test_remove_job(self, sched, tmp_jobs): sched.add_job("to-remove", "0 * * * *", "ch", "prompt") assert sched.remove_job("to-remove") is True assert len(sched.list_jobs()) == 0 data = json.loads(tmp_jobs["file"].read_text()) assert len(data) == 0 def test_remove_job_not_found(self, sched, tmp_jobs): assert sched.remove_job("nonexistent") is False # --------------------------------------------------------------------------- # enable_job / disable_job # --------------------------------------------------------------------------- class TestEnableDisableJob: def test_enable_job(self, sched, tmp_jobs): sched.add_job("toggler", "0 * * * *", "ch", "prompt") sched.disable_job("toggler") jobs = sched.list_jobs() assert jobs[0]["enabled"] is False assert sched.enable_job("toggler") is True jobs = sched.list_jobs() assert jobs[0]["enabled"] is True def test_disable_job(self, sched, tmp_jobs): sched.add_job("toggler", "0 * * * *", "ch", "prompt") assert sched.disable_job("toggler") is True jobs = sched.list_jobs() assert jobs[0]["enabled"] is False assert jobs[0]["next_run"] is None def test_enable_not_found(self, sched, tmp_jobs): assert sched.enable_job("nope") is False def test_disable_not_found(self, sched, tmp_jobs): assert sched.disable_job("nope") is False def test_enable_persists(self, sched, tmp_jobs): sched.add_job("persist", "0 * * * *", "ch", "prompt") sched.disable_job("persist") sched.enable_job("persist") data = json.loads(tmp_jobs["file"].read_text()) assert data[0]["enabled"] is True def test_disable_persists(self, sched, tmp_jobs): sched.add_job("persist", "0 * * * *", "ch", "prompt") sched.disable_job("persist") data = json.loads(tmp_jobs["file"].read_text()) assert data[0]["enabled"] is False # --------------------------------------------------------------------------- # list_jobs # --------------------------------------------------------------------------- class TestListJobs: def test_list_jobs_empty(self, sched, tmp_jobs): assert sched.list_jobs() == [] def test_list_jobs_returns_copy(self, sched, tmp_jobs): sched.add_job("job1", "0 * * * *", "ch", "prompt") jobs = sched.list_jobs() jobs[0]["name"] = "MUTATED" # Internal state should not be affected internal = sched.list_jobs() assert internal[0]["name"] == "job1" def test_list_jobs_returns_all(self, sched, tmp_jobs): sched.add_job("a", "0 * * * *", "ch", "p1") sched.add_job("b", "0 * * * *", "ch", "p2") assert len(sched.list_jobs()) == 2 # --------------------------------------------------------------------------- # run_job / _execute_job # --------------------------------------------------------------------------- class TestRunJob: @pytest.mark.asyncio async def test_run_job_not_found(self, sched, tmp_jobs): with pytest.raises(KeyError, match="not found"): await sched.run_job("nonexistent") @pytest.mark.asyncio async def test_execute_job_success(self, sched, tmp_jobs, callback): sched.add_job("runner", "0 * * * *", "general", "test prompt") mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.stdout = json.dumps({"result": "Claude says hello"}) mock_proc.stderr = "" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc): result = await sched.run_job("runner") assert result == "Claude says hello" assert sched._jobs[0]["last_status"] == "ok" assert sched._jobs[0]["last_run"] is not None callback.assert_awaited_once_with("general", "Claude says hello") @pytest.mark.asyncio async def test_execute_job_timeout(self, sched, tmp_jobs, callback): sched.add_job("timeout-job", "0 * * * *", "ch", "prompt") with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="claude", timeout=300)): result = await sched.run_job("timeout-job") assert "Error" in result assert "timed out" in result assert sched._jobs[0]["last_status"] == "error" callback.assert_awaited_once() @pytest.mark.asyncio async def test_execute_job_cli_error(self, sched, tmp_jobs, callback): sched.add_job("err-job", "0 * * * *", "ch", "prompt") mock_proc = MagicMock() mock_proc.returncode = 1 mock_proc.stdout = "" mock_proc.stderr = "Some CLI error" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc): result = await sched.run_job("err-job") assert "Error" in result assert sched._jobs[0]["last_status"] == "error" @pytest.mark.asyncio async def test_execute_job_invalid_json(self, sched, tmp_jobs, callback): sched.add_job("json-err", "0 * * * *", "ch", "prompt") mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.stdout = "not valid json" mock_proc.stderr = "" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc): result = await sched.run_job("json-err") assert "Error" in result assert sched._jobs[0]["last_status"] == "error" @pytest.mark.asyncio async def test_execute_job_with_allowed_tools(self, sched, tmp_jobs, callback): sched.add_job( "tools-job", "0 * * * *", "ch", "prompt", allowed_tools=["Read", "Bash"] ) mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.stdout = json.dumps({"result": "ok"}) mock_proc.stderr = "" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc) as mock_run: await sched.run_job("tools-job") # Inspect the cmd passed to subprocess.run cmd = mock_run.call_args[0][0] assert "--allowedTools" in cmd assert "Read" in cmd assert "Bash" in cmd @pytest.mark.asyncio async def test_execute_job_no_callback(self, tmp_jobs): """Scheduler with no send_callback should not error on execution.""" s = Scheduler(send_callback=None) s.add_job("no-cb", "0 * * * *", "ch", "prompt") mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.stdout = json.dumps({"result": "response"}) mock_proc.stderr = "" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc): result = await s.run_job("no-cb") assert result == "response" @pytest.mark.asyncio async def test_execute_job_updates_last_run(self, sched, tmp_jobs, callback): sched.add_job("time-job", "0 * * * *", "ch", "prompt") assert sched._jobs[0]["last_run"] is None mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.stdout = json.dumps({"result": "ok"}) mock_proc.stderr = "" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc): await sched.run_job("time-job") assert sched._jobs[0]["last_run"] is not None # Verify it's a valid ISO timestamp datetime.fromisoformat(sched._jobs[0]["last_run"]) # --------------------------------------------------------------------------- # start / stop # --------------------------------------------------------------------------- class TestStartStop: @pytest.mark.asyncio async def test_start_loads_and_schedules(self, sched, tmp_jobs): jobs = [_sample_job(name="enabled-job", enabled=True)] tmp_jobs["file"].write_text(json.dumps(jobs)) with patch.object(sched._scheduler, "start"), \ patch.object(sched._scheduler, "add_job") as mock_add: await sched.start() assert len(sched._jobs) == 1 mock_add.assert_called_once() @pytest.mark.asyncio async def test_start_skips_disabled(self, sched, tmp_jobs): jobs = [ _sample_job(name="on", enabled=True), _sample_job(name="off", enabled=False), ] tmp_jobs["file"].write_text(json.dumps(jobs)) with patch.object(sched._scheduler, "start"), \ patch.object(sched._scheduler, "add_job") as mock_add: await sched.start() assert len(sched._jobs) == 2 # Only the enabled job should be scheduled assert mock_add.call_count == 1 @pytest.mark.asyncio async def test_start_empty_file(self, sched, tmp_jobs): with patch.object(sched._scheduler, "start"), \ patch.object(sched._scheduler, "add_job") as mock_add: await sched.start() assert sched._jobs == [] mock_add.assert_not_called() @pytest.mark.asyncio async def test_stop(self, sched): with patch.object(sched._scheduler, "shutdown") as mock_shutdown: await sched.stop() mock_shutdown.assert_called_once_with(wait=False) # --------------------------------------------------------------------------- # Name regex validation # --------------------------------------------------------------------------- class TestNameRegex: @pytest.mark.parametrize("name", [ "a", "abc", "my-job", "daily-email-summary", "a1", "123", "0-test", ]) def test_valid_names(self, name): assert _NAME_RE.match(name) is not None @pytest.mark.parametrize("name", [ "", "A", "MyJob", "my_job", "my job", "-start", "a" * 64, # too long (max 63) "job!", "job@work", ]) def test_invalid_names(self, name): assert _NAME_RE.match(name) is None # --------------------------------------------------------------------------- # Integration test: full lifecycle # --------------------------------------------------------------------------- class TestFullLifecycle: @pytest.mark.asyncio async def test_add_list_enable_run_disable_remove(self, sched, tmp_jobs, callback): # Add job = sched.add_job("lifecycle", "0 6 * * *", "ch", "test prompt") assert job["enabled"] is True # List jobs = sched.list_jobs() assert len(jobs) == 1 assert jobs[0]["name"] == "lifecycle" # Disable assert sched.disable_job("lifecycle") is True assert sched.list_jobs()[0]["enabled"] is False # Enable assert sched.enable_job("lifecycle") is True assert sched.list_jobs()[0]["enabled"] is True # Run mock_proc = MagicMock() mock_proc.returncode = 0 mock_proc.stdout = json.dumps({"result": "lifecycle output"}) mock_proc.stderr = "" with patch("src.scheduler.build_system_prompt", return_value="sys"), \ patch("subprocess.run", return_value=mock_proc): result = await sched.run_job("lifecycle") assert result == "lifecycle output" assert sched.list_jobs()[0]["last_status"] == "ok" # Remove assert sched.remove_job("lifecycle") is True assert sched.list_jobs() == []