Files
echo-core/tests/test_scheduler.py
Marius Mutu b3ed653bb3 test(scheduler): cover shell-kind validation, execution, timezone, backward-compat
Adds five new test groups to tests/test_scheduler.py:

- TestTimezone: asserts AsyncIOScheduler is constructed with Europe/Bucharest.
- TestShellKind: 17 cases covering add_shell_job validation (duplicate name
  across claude/shell, invalid cron, empty/non-list/non-string command,
  bad report_on, bad timeout bounds/type, empty channel, custom report_on
  and timeout pass-through).
- TestShellExecute: 15 cases covering the report_on contract:
  - exit 0 + marker N>0 → forwards stdout
  - exit 0 + marker N==0 → silent
  - exit 0 + no marker   → silent + warning
  - report_on=always and =never variants
  - non-zero exit reports stderr even when report_on='never'
  - TimeoutExpired and launch exceptions report '[cron:X] Error: ...'
  - per-job timeout passed to subprocess.run; default 300 when None
  - subprocess.run receives the job's command list verbatim
  - stdout trimmed to 1500 ch; stderr trimmed to 500 ch
- TestBackwardCompat: a jobs.json entry without a 'kind' field dispatches
  to _execute_claude_job (never to _execute_shell_job); the existing Claude
  add_job/run_job round-trip still works with the old CLI invocation.
- TestMarkerRegex: parametrised positive/negative cases for _MARKER_RE.
2026-04-21 07:05:29 +00:00

960 lines
35 KiB
Python

"""Tests for src/scheduler.py — Cron job scheduler."""
import asyncio
import json
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from zoneinfo import ZoneInfo
import pytest
from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE, _MARKER_RE
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_jobs(tmp_path, monkeypatch):
    """Point src.scheduler's JOBS_DIR / JOBS_FILE at a per-test temp directory.

    Returns a dict holding the patched locations under "dir" and "file".
    Only the directory is created; the jobs file itself is left absent.
    """
    cron_dir = tmp_path / "cron"
    cron_dir.mkdir()
    cron_file = cron_dir / "jobs.json"
    for attr, target in (("JOBS_DIR", cron_dir), ("JOBS_FILE", cron_file)):
        monkeypatch.setattr(f"src.scheduler.{attr}", target)
    return dict(dir=cron_dir, file=cron_file)
@pytest.fixture
def callback():
    """Provide an awaitable mock to use as the scheduler's send_callback."""
    send_mock = AsyncMock()
    return send_mock
@pytest.fixture
def sched(tmp_jobs, callback):
    """Build a Scheduler that reports through the mock callback.

    Depending on tmp_jobs ensures the job paths are redirected first.
    """
    scheduler = Scheduler(send_callback=callback)
    return scheduler
def _sample_job(**overrides):
"""Return a minimal valid job dict."""
base = {
"name": "test-job",
"cron": "0 6 * * *",
"channel": "general",
"model": "sonnet",
"prompt": "Hello world",
"allowed_tools": [],
"enabled": True,
"last_run": None,
"last_status": None,
"next_run": None,
}
base.update(overrides)
return base
# ---------------------------------------------------------------------------
# _load_jobs / _save_jobs
# ---------------------------------------------------------------------------
class TestLoadJobs:
    """_load_jobs returns the stored job list, or [] when the file is unusable."""

    def test_load_empty_file(self, sched, tmp_jobs):
        # A zero-byte jobs file loads as an empty job list.
        tmp_jobs["file"].write_text("")
        assert sched._load_jobs() == []

    def test_load_missing_file(self, sched, tmp_jobs):
        jobs_path = tmp_jobs["file"]
        # Make sure no jobs file is on disk before loading.
        if jobs_path.exists():
            jobs_path.unlink()
        assert sched._load_jobs() == []

    def test_load_valid_jobs(self, sched, tmp_jobs):
        payload = json.dumps([_sample_job()])
        tmp_jobs["file"].write_text(payload)
        loaded = sched._load_jobs()
        assert len(loaded) == 1
        assert loaded[0]["name"] == "test-job"

    def test_load_corrupt_json(self, sched, tmp_jobs):
        # Unparseable content must not raise; it loads as no jobs.
        tmp_jobs["file"].write_text("{broken json!!!")
        assert sched._load_jobs() == []

    def test_load_non_list_json(self, sched, tmp_jobs):
        # Valid JSON whose top level is an object instead of a list.
        tmp_jobs["file"].write_text('{"not": "a list"}')
        assert sched._load_jobs() == []
class TestSaveJobs:
    """_save_jobs writes sched._jobs to the jobs file as JSON."""

    @staticmethod
    def _read(path):
        """Parse and return the JSON stored at *path*."""
        return json.loads(path.read_text())

    def test_save_creates_file(self, sched, tmp_jobs):
        jobs_path = tmp_jobs["file"]
        # Start from a clean slate so the save has to create the file.
        if jobs_path.exists():
            jobs_path.unlink()
        sched._jobs = [_sample_job()]
        sched._save_jobs()
        assert jobs_path.exists()
        stored = self._read(jobs_path)
        assert len(stored) == 1
        assert stored[0]["name"] == "test-job"

    def test_save_roundtrip(self, sched, tmp_jobs):
        sched._jobs = [_sample_job(), _sample_job(name="second-job")]
        sched._save_jobs()
        reloaded = sched._load_jobs()
        assert len(reloaded) == 2
        assert [job["name"] for job in reloaded] == ["test-job", "second-job"]

    def test_save_creates_dir_if_missing(self, tmp_path, monkeypatch, callback):
        # Point the scheduler at a directory that does not exist yet.
        fresh_dir = tmp_path / "new_cron"
        fresh_file = fresh_dir / "jobs.json"
        monkeypatch.setattr("src.scheduler.JOBS_DIR", fresh_dir)
        monkeypatch.setattr("src.scheduler.JOBS_FILE", fresh_file)
        scheduler = Scheduler(send_callback=callback)
        scheduler._jobs = [_sample_job()]
        scheduler._save_jobs()
        assert fresh_dir.exists()
        assert fresh_file.exists()

    def test_save_atomic_pattern(self, sched, tmp_jobs):
        """Each save leaves valid JSON on disk reflecting the latest job list.

        NOTE(review): the name suggests an atomic write in the scheduler;
        this test only observes the end state of two consecutive saves.
        """
        jobs_path = tmp_jobs["file"]
        for job_name in ("test-job", "updated"):
            sched._jobs = [_sample_job(name=job_name)]
            sched._save_jobs()
            assert self._read(jobs_path)[0]["name"] == job_name
# ---------------------------------------------------------------------------
# add_job
# ---------------------------------------------------------------------------
class TestAddJob:
    """add_job persists valid Claude jobs and rejects bad input with ValueError."""

    HOURLY = "0 * * * *"

    @staticmethod
    def _assert_rejected(sched, match, *args, **kwargs):
        """Expect add_job(*args, **kwargs) to raise ValueError matching *match*."""
        with pytest.raises(ValueError, match=match):
            sched.add_job(*args, **kwargs)

    def test_add_job_creates_file(self, sched, tmp_jobs):
        created = sched.add_job("my-job", "30 6 * * *", "general", "Do something")
        expected = {
            "name": "my-job",
            "cron": "30 6 * * *",
            "channel": "general",
            "model": "sonnet",
        }
        for field, value in expected.items():
            assert created[field] == value
        assert created["enabled"] is True
        assert tmp_jobs["file"].exists()
        on_disk = json.loads(tmp_jobs["file"].read_text())
        assert len(on_disk) == 1

    def test_add_job_with_model(self, sched, tmp_jobs):
        created = sched.add_job("my-job", self.HOURLY, "ch", "prompt", model="haiku")
        assert created["model"] == "haiku"

    def test_add_job_with_tools(self, sched, tmp_jobs):
        created = sched.add_job(
            "my-job", self.HOURLY, "ch", "prompt",
            allowed_tools=["Read", "Bash"],
        )
        assert created["allowed_tools"] == ["Read", "Bash"]

    def test_add_job_duplicate_name_raises(self, sched, tmp_jobs):
        call = ("dupe", self.HOURLY, "ch", "prompt")
        sched.add_job(*call)
        # The very same call a second time must be refused.
        self._assert_rejected(sched, "already exists", *call)

    def test_add_job_invalid_cron_raises(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid cron", "job1", "bad cron", "ch", "prompt")

    def test_add_job_invalid_model_raises(self, sched, tmp_jobs):
        self._assert_rejected(
            sched, "Invalid model", "job1", self.HOURLY, "ch", "prompt", model="gpt4",
        )

    def test_add_job_invalid_name_uppercase(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid job name", "MyJob", self.HOURLY, "ch", "prompt")

    def test_add_job_invalid_name_spaces(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid job name", "my job", self.HOURLY, "ch", "prompt")

    def test_add_job_invalid_name_special(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid job name", "my_job!", self.HOURLY, "ch", "prompt")

    def test_add_job_empty_prompt_raises(self, sched, tmp_jobs):
        self._assert_rejected(sched, "non-empty", "job1", self.HOURLY, "ch", "")

    def test_add_job_whitespace_prompt_raises(self, sched, tmp_jobs):
        self._assert_rejected(sched, "non-empty", "job1", self.HOURLY, "ch", " ")

    def test_add_job_prompt_too_long(self, sched, tmp_jobs):
        oversized = "x" * 10_001
        self._assert_rejected(sched, "too long", "job1", self.HOURLY, "ch", oversized)
# ---------------------------------------------------------------------------
# remove_job
# ---------------------------------------------------------------------------
class TestRemoveJob:
    """remove_job returns True when a job was dropped and False otherwise."""

    def test_remove_job(self, sched, tmp_jobs):
        sched.add_job("to-remove", "0 * * * *", "ch", "prompt")
        removed = sched.remove_job("to-remove")
        assert removed is True
        # Gone from the in-memory listing ...
        assert len(sched.list_jobs()) == 0
        # ... and from the persisted file.
        on_disk = json.loads(tmp_jobs["file"].read_text())
        assert len(on_disk) == 0

    def test_remove_job_not_found(self, sched, tmp_jobs):
        outcome = sched.remove_job("nonexistent")
        assert outcome is False
# ---------------------------------------------------------------------------
# enable_job / disable_job
# ---------------------------------------------------------------------------
class TestEnableDisableJob:
    """enable_job / disable_job toggle the 'enabled' flag and persist it."""

    HOURLY = "0 * * * *"

    @staticmethod
    def _stored_enabled(tmp_jobs):
        """Return the 'enabled' flag of the first job in the jobs file."""
        return json.loads(tmp_jobs["file"].read_text())[0]["enabled"]

    def test_enable_job(self, sched, tmp_jobs):
        sched.add_job("toggler", self.HOURLY, "ch", "prompt")
        sched.disable_job("toggler")
        before = sched.list_jobs()
        assert before[0]["enabled"] is False
        assert sched.enable_job("toggler") is True
        after = sched.list_jobs()
        assert after[0]["enabled"] is True

    def test_disable_job(self, sched, tmp_jobs):
        sched.add_job("toggler", self.HOURLY, "ch", "prompt")
        assert sched.disable_job("toggler") is True
        first = sched.list_jobs()[0]
        assert first["enabled"] is False
        # A disabled job reports no upcoming run.
        assert first["next_run"] is None

    def test_enable_not_found(self, sched, tmp_jobs):
        outcome = sched.enable_job("nope")
        assert outcome is False

    def test_disable_not_found(self, sched, tmp_jobs):
        outcome = sched.disable_job("nope")
        assert outcome is False

    def test_enable_persists(self, sched, tmp_jobs):
        sched.add_job("persist", self.HOURLY, "ch", "prompt")
        sched.disable_job("persist")
        sched.enable_job("persist")
        assert self._stored_enabled(tmp_jobs) is True

    def test_disable_persists(self, sched, tmp_jobs):
        sched.add_job("persist", self.HOURLY, "ch", "prompt")
        sched.disable_job("persist")
        assert self._stored_enabled(tmp_jobs) is False
# ---------------------------------------------------------------------------
# list_jobs
# ---------------------------------------------------------------------------
class TestListJobs:
    """list_jobs exposes every job without leaking internal state."""

    def test_list_jobs_empty(self, sched, tmp_jobs):
        listing = sched.list_jobs()
        assert listing == []

    def test_list_jobs_returns_copy(self, sched, tmp_jobs):
        sched.add_job("job1", "0 * * * *", "ch", "prompt")
        snapshot = sched.list_jobs()
        snapshot[0]["name"] = "MUTATED"
        # Mutating the returned dicts must not reach the scheduler's own data.
        assert sched.list_jobs()[0]["name"] == "job1"

    def test_list_jobs_returns_all(self, sched, tmp_jobs):
        for job_name, prompt in (("a", "p1"), ("b", "p2")):
            sched.add_job(job_name, "0 * * * *", "ch", prompt)
        assert len(sched.list_jobs()) == 2
# ---------------------------------------------------------------------------
# run_job / _execute_job
# ---------------------------------------------------------------------------
class TestRunJob:
    """run_job for Claude jobs, with subprocess.run and the system prompt mocked."""

    HOURLY = "0 * * * *"

    @staticmethod
    def _cli_ok(payload):
        """Fake a successful CLI process whose stdout is JSON {"result": payload}."""
        return _mock_proc(0, json.dumps({"result": payload}), "")

    @pytest.mark.asyncio
    async def test_run_job_not_found(self, sched, tmp_jobs):
        with pytest.raises(KeyError, match="not found"):
            await sched.run_job("nonexistent")

    @pytest.mark.asyncio
    async def test_execute_job_success(self, sched, tmp_jobs, callback):
        sched.add_job("runner", self.HOURLY, "general", "test prompt")
        proc = self._cli_ok("Claude says hello")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc):
                result = await sched.run_job("runner")
        assert result == "Claude says hello"
        job = sched._jobs[0]
        assert job["last_status"] == "ok"
        assert job["last_run"] is not None
        callback.assert_awaited_once_with("general", "Claude says hello")

    @pytest.mark.asyncio
    async def test_execute_job_timeout(self, sched, tmp_jobs, callback):
        sched.add_job("timeout-job", self.HOURLY, "ch", "prompt")
        expired = subprocess.TimeoutExpired(cmd="claude", timeout=300)
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", side_effect=expired):
                result = await sched.run_job("timeout-job")
        assert "Error" in result
        assert "timed out" in result
        assert sched._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_execute_job_cli_error(self, sched, tmp_jobs, callback):
        sched.add_job("err-job", self.HOURLY, "ch", "prompt")
        # Non-zero exit with a message on stderr.
        proc = _mock_proc(1, "", "Some CLI error")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc):
                result = await sched.run_job("err-job")
        assert "Error" in result
        assert sched._jobs[0]["last_status"] == "error"

    @pytest.mark.asyncio
    async def test_execute_job_invalid_json(self, sched, tmp_jobs, callback):
        sched.add_job("json-err", self.HOURLY, "ch", "prompt")
        # Exit code 0, but stdout is not the JSON document the scheduler parses.
        proc = _mock_proc(0, "not valid json", "")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc):
                result = await sched.run_job("json-err")
        assert "Error" in result
        assert sched._jobs[0]["last_status"] == "error"

    @pytest.mark.asyncio
    async def test_execute_job_with_allowed_tools(self, sched, tmp_jobs, callback):
        sched.add_job(
            "tools-job", self.HOURLY, "ch", "prompt",
            allowed_tools=["Read", "Bash"],
        )
        proc = self._cli_ok("ok")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc) as mock_run:
                await sched.run_job("tools-job")
        # The first positional argument of subprocess.run is the command list.
        argv = mock_run.call_args.args[0]
        for token in ("--allowedTools", "Read", "Bash"):
            assert token in argv

    @pytest.mark.asyncio
    async def test_execute_job_no_callback(self, tmp_jobs):
        """A Scheduler built with send_callback=None still runs jobs."""
        scheduler = Scheduler(send_callback=None)
        scheduler.add_job("no-cb", self.HOURLY, "ch", "prompt")
        proc = self._cli_ok("response")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc):
                result = await scheduler.run_job("no-cb")
        assert result == "response"

    @pytest.mark.asyncio
    async def test_execute_job_updates_last_run(self, sched, tmp_jobs, callback):
        sched.add_job("time-job", self.HOURLY, "ch", "prompt")
        assert sched._jobs[0]["last_run"] is None
        proc = self._cli_ok("ok")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc):
                await sched.run_job("time-job")
        stamp = sched._jobs[0]["last_run"]
        assert stamp is not None
        # Raises ValueError if the stored value is not an ISO-format timestamp.
        datetime.fromisoformat(stamp)
# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------
class TestStartStop:
    """start() loads and schedules enabled jobs; stop() shuts the scheduler down."""

    @staticmethod
    def _write_jobs(tmp_jobs, *jobs):
        """Persist *jobs* as the JSON list the scheduler reads on start."""
        tmp_jobs["file"].write_text(json.dumps(list(jobs)))

    @pytest.mark.asyncio
    async def test_start_loads_and_schedules(self, sched, tmp_jobs):
        self._write_jobs(tmp_jobs, _sample_job(name="enabled-job", enabled=True))
        with patch.object(sched._scheduler, "start"):
            with patch.object(sched._scheduler, "add_job") as mock_add:
                await sched.start()
        assert len(sched._jobs) == 1
        mock_add.assert_called_once()

    @pytest.mark.asyncio
    async def test_start_skips_disabled(self, sched, tmp_jobs):
        self._write_jobs(
            tmp_jobs,
            _sample_job(name="on", enabled=True),
            _sample_job(name="off", enabled=False),
        )
        with patch.object(sched._scheduler, "start"):
            with patch.object(sched._scheduler, "add_job") as mock_add:
                await sched.start()
        # Both jobs are loaded, but only the enabled one gets scheduled.
        assert len(sched._jobs) == 2
        assert mock_add.call_count == 1

    @pytest.mark.asyncio
    async def test_start_empty_file(self, sched, tmp_jobs):
        # No jobs file is written for this test.
        with patch.object(sched._scheduler, "start"):
            with patch.object(sched._scheduler, "add_job") as mock_add:
                await sched.start()
        assert sched._jobs == []
        mock_add.assert_not_called()

    @pytest.mark.asyncio
    async def test_stop(self, sched):
        with patch.object(sched._scheduler, "shutdown") as mock_shutdown:
            await sched.stop()
        mock_shutdown.assert_called_once_with(wait=False)
# ---------------------------------------------------------------------------
# Name regex validation
# ---------------------------------------------------------------------------
class TestNameRegex:
    """Accepted and rejected job names according to _NAME_RE."""

    ACCEPTED = [
        "a", "abc", "my-job", "daily-email-summary",
        "a1", "123", "0-test",
    ]
    REJECTED = [
        "", "A", "MyJob", "my_job", "my job", "-start",
        "a" * 64,  # too long (max 63)
        "job!", "job@work",
    ]

    @pytest.mark.parametrize("name", ACCEPTED)
    def test_valid_names(self, name):
        found = _NAME_RE.match(name)
        assert found is not None

    @pytest.mark.parametrize("name", REJECTED)
    def test_invalid_names(self, name):
        found = _NAME_RE.match(name)
        assert found is None
# ---------------------------------------------------------------------------
# Integration test: full lifecycle
# ---------------------------------------------------------------------------
class TestFullLifecycle:
    """End-to-end walk through one job's life: add, toggle, run, remove."""

    @pytest.mark.asyncio
    async def test_add_list_enable_run_disable_remove(self, sched, tmp_jobs, callback):
        name = "lifecycle"

        # Add
        created = sched.add_job(name, "0 6 * * *", "ch", "test prompt")
        assert created["enabled"] is True

        # List
        listing = sched.list_jobs()
        assert len(listing) == 1
        assert listing[0]["name"] == "lifecycle"

        # Disable, then enable again
        assert sched.disable_job(name) is True
        assert sched.list_jobs()[0]["enabled"] is False
        assert sched.enable_job(name) is True
        assert sched.list_jobs()[0]["enabled"] is True

        # Run against a fake CLI process that reports success
        proc = _mock_proc(0, json.dumps({"result": "lifecycle output"}), "")
        with patch("src.scheduler.build_system_prompt", return_value="sys"):
            with patch("subprocess.run", return_value=proc):
                result = await sched.run_job(name)
        assert result == "lifecycle output"
        assert sched.list_jobs()[0]["last_status"] == "ok"

        # Remove
        assert sched.remove_job(name) is True
        assert sched.list_jobs() == []
# ---------------------------------------------------------------------------
# Timezone
# ---------------------------------------------------------------------------
class TestTimezone:
    """The underlying scheduler object is configured for Europe/Bucharest."""

    def test_scheduler_uses_bucharest_tz(self, sched):
        tzinfo = sched._scheduler.timezone
        # zoneinfo.ZoneInfo exposes the zone name as .key, pytz-style tzinfo
        # as .zone; fall back to str() when neither attribute yields a name.
        label = None
        for attr in ("key", "zone"):
            label = getattr(tzinfo, attr, None)
            if label:
                break
        if not label:
            label = str(tzinfo)
        assert label == "Europe/Bucharest"
# ---------------------------------------------------------------------------
# Shell-kind validation (add_shell_job)
# ---------------------------------------------------------------------------
class TestShellKind:
    """add_shell_job: stored fields for valid input, ValueError for invalid input."""

    @staticmethod
    def _add(sched, name="x", cron="0 9 * * *", channel="ch", command=None, **options):
        """Call add_shell_job with test defaults; *command* defaults to ["/bin/true"]."""
        if command is None:
            command = ["/bin/true"]
        return sched.add_shell_job(name, cron, channel, command, **options)

    @classmethod
    def _assert_rejected(cls, sched, match, **call):
        """Expect the add_shell_job call described by *call* to raise ValueError."""
        with pytest.raises(ValueError, match=match):
            cls._add(sched, **call)

    def test_add_shell_job_creates_file(self, sched, tmp_jobs):
        created = sched.add_shell_job(
            "shelly", "0 9 * * *", "general",
            ["/bin/echo", "hello"],
        )
        assert created["name"] == "shelly"
        assert created["kind"] == "shell"
        assert created["command"] == ["/bin/echo", "hello"]
        # Values the job gets when report_on / timeout are not passed.
        assert created["report_on"] == "changes"
        assert created["timeout"] is None
        assert created["enabled"] is True
        assert tmp_jobs["file"].exists()
        on_disk = json.loads(tmp_jobs["file"].read_text())
        assert len(on_disk) == 1
        assert on_disk[0]["kind"] == "shell"
        assert on_disk[0]["command"] == ["/bin/echo", "hello"]

    def test_add_shell_job_with_report_on_always(self, sched, tmp_jobs):
        created = self._add(sched, report_on="always")
        assert created["report_on"] == "always"

    def test_add_shell_job_with_custom_timeout(self, sched, tmp_jobs):
        created = self._add(sched, timeout=60)
        assert created["timeout"] == 60

    def test_add_shell_job_duplicate_name_with_claude(self, sched, tmp_jobs):
        # The name is already taken by a Claude job.
        sched.add_job("x", "0 * * * *", "ch", "prompt")
        self._assert_rejected(sched, "already exists")

    def test_add_shell_job_duplicate_name_with_shell(self, sched, tmp_jobs):
        self._add(sched)
        self._assert_rejected(sched, "already exists", cron="0 10 * * *")

    def test_add_shell_job_invalid_cron(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid cron", cron="not a cron")

    def test_add_shell_job_invalid_name(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid job name", name="BadName")

    def test_add_shell_job_empty_command_list(self, sched, tmp_jobs):
        self._assert_rejected(sched, "non-empty list", command=[])

    def test_add_shell_job_command_not_list(self, sched, tmp_jobs):
        # A shell-style string instead of an argv list.
        self._assert_rejected(sched, "non-empty list", command="/bin/echo hi")

    def test_add_shell_job_command_with_empty_string(self, sched, tmp_jobs):
        self._assert_rejected(sched, "non-empty list", command=["/bin/echo", ""])

    def test_add_shell_job_command_non_string_element(self, sched, tmp_jobs):
        self._assert_rejected(sched, "non-empty list", command=["/bin/echo", 1])

    def test_add_shell_job_bad_report_on(self, sched, tmp_jobs):
        self._assert_rejected(sched, "Invalid report_on", report_on="maybe")

    def test_add_shell_job_bad_timeout_zero(self, sched, tmp_jobs):
        self._assert_rejected(sched, "between 1 and 3600", timeout=0)

    def test_add_shell_job_bad_timeout_negative(self, sched, tmp_jobs):
        self._assert_rejected(sched, "between 1 and 3600", timeout=-5)

    def test_add_shell_job_bad_timeout_too_big(self, sched, tmp_jobs):
        self._assert_rejected(sched, "between 1 and 3600", timeout=7200)

    def test_add_shell_job_bad_timeout_not_int(self, sched, tmp_jobs):
        # A numeric string is not accepted in place of an int.
        self._assert_rejected(sched, "must be an int", timeout="60")

    def test_add_shell_job_empty_channel(self, sched, tmp_jobs):
        self._assert_rejected(sched, "non-empty", channel=" ")
# ---------------------------------------------------------------------------
# Shell-kind execution
# ---------------------------------------------------------------------------
def _mock_proc(returncode=0, stdout="", stderr=""):
p = MagicMock()
p.returncode = returncode
p.stdout = stdout
p.stderr = stderr
return p
@pytest.fixture
def sched_with_shell_job(sched, tmp_jobs):
    """Return the scheduler after registering shell job 'shelly' (report_on=changes)."""
    job_spec = dict(report_on="changes")
    sched.add_shell_job("shelly", "0 9 * * *", "general", ["/bin/true"], **job_spec)
    return sched
class TestShellExecute:
    """Reporting behaviour of shell-kind jobs run via run_job (subprocess mocked)."""

    CRON = "0 9 * * *"

    @staticmethod
    def _fake_run(returncode=0, stdout="", stderr=""):
        """Patch subprocess.run so it returns a canned finished process."""
        return patch(
            "subprocess.run",
            return_value=_mock_proc(returncode, stdout, stderr),
        )

    @staticmethod
    def _failing_run(exc):
        """Patch subprocess.run so that calling it raises *exc*."""
        return patch("subprocess.run", side_effect=exc)

    @pytest.mark.asyncio
    async def test_shell_exit_zero_changes_marker_positive(
        self, sched_with_shell_job, callback,
    ):
        output = "some body\nGSTACK-CRON: changes=3\nfooter\n"
        forwarded = output[:1500]
        with self._fake_run(0, output, ""):
            result = await sched_with_shell_job.run_job("shelly")
        assert result == forwarded
        assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
        callback.assert_awaited_once_with("general", forwarded)

    @pytest.mark.asyncio
    async def test_shell_exit_zero_changes_marker_zero(
        self, sched_with_shell_job, callback,
    ):
        output = "nothing here\nGSTACK-CRON: changes=0\n"
        with self._fake_run(0, output, ""):
            result = await sched_with_shell_job.run_job("shelly")
        # changes=0 means nothing to report: empty result, no message sent.
        assert result == ""
        assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
        callback.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_shell_exit_zero_no_marker(
        self, sched_with_shell_job, callback, caplog,
    ):
        import logging

        output = "completely unrelated output\n"
        caplog.set_level(logging.WARNING, logger="src.scheduler")
        with self._fake_run(0, output, ""):
            result = await sched_with_shell_job.run_job("shelly")
        assert result == ""
        assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
        callback.assert_not_awaited()
        # Staying silent is fine, but a warning about the marker must be logged.
        warned = [
            record for record in caplog.records
            if "missing GSTACK-CRON marker" in record.message
        ]
        assert warned

    @pytest.mark.asyncio
    async def test_shell_exit_zero_report_on_always(self, sched, callback):
        sched.add_shell_job(
            "alw", self.CRON, "general", ["/bin/true"], report_on="always",
        )
        output = "hello world, no marker here"
        with self._fake_run(0, output, ""):
            result = await sched.run_job("alw")
        assert result == output
        callback.assert_awaited_once_with("general", output)

    @pytest.mark.asyncio
    async def test_shell_exit_zero_report_on_never(self, sched, callback):
        sched.add_shell_job(
            "nev", self.CRON, "general", ["/bin/true"], report_on="never",
        )
        # Even a positive change marker is not forwarded with report_on=never.
        output = "GSTACK-CRON: changes=99\nbig change"
        with self._fake_run(0, output, ""):
            result = await sched.run_job("nev")
        assert result == ""
        callback.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_shell_exit_nonzero_always_reports(
        self, sched_with_shell_job, callback,
    ):
        with self._fake_run(1, "", "ANAF 502"):
            result = await sched_with_shell_job.run_job("shelly")
        assert result == "[cron:shelly] exit 1: ANAF 502"
        assert sched_with_shell_job._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once_with(
            "general", "[cron:shelly] exit 1: ANAF 502",
        )

    @pytest.mark.asyncio
    async def test_shell_exit_nonzero_never_still_reports(
        self, sched, callback,
    ):
        """report_on=never must NOT suppress error reports."""
        sched.add_shell_job(
            "nev", self.CRON, "ch", ["/bin/false"], report_on="never",
        )
        with self._fake_run(2, "", "boom"):
            result = await sched.run_job("nev")
        for fragment in ("exit 2", "boom"):
            assert fragment in result
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_timeout_reports(self, sched_with_shell_job, callback):
        expired = subprocess.TimeoutExpired(cmd="x", timeout=300)
        with self._failing_run(expired):
            result = await sched_with_shell_job.run_job("shelly")
        assert "timed out after 300s" in result
        assert sched_with_shell_job._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_subprocess_launch_exception(
        self, sched_with_shell_job, callback,
    ):
        with self._failing_run(FileNotFoundError("no such binary")):
            result = await sched_with_shell_job.run_job("shelly")
        assert "Error" in result
        assert "no such binary" in result
        assert sched_with_shell_job._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_respects_per_job_timeout(self, sched, callback):
        sched.add_shell_job("t", self.CRON, "ch", ["/bin/true"], timeout=60)
        output = "GSTACK-CRON: changes=1\nok\n"
        with self._fake_run(0, output, "") as mock_run:
            await sched.run_job("t")
        assert mock_run.call_args.kwargs["timeout"] == 60

    @pytest.mark.asyncio
    async def test_shell_default_timeout_when_none(
        self, sched_with_shell_job, callback,
    ):
        """A job created without a timeout runs with timeout=300."""
        output = "GSTACK-CRON: changes=1\n"
        with self._fake_run(0, output, "") as mock_run:
            await sched_with_shell_job.run_job("shelly")
        assert mock_run.call_args.kwargs["timeout"] == 300

    @pytest.mark.asyncio
    async def test_shell_executes_command_list(
        self, sched_with_shell_job, callback,
    ):
        """subprocess.run is invoked with the job's command list verbatim."""
        output = "GSTACK-CRON: changes=1\n"
        with self._fake_run(0, output, "") as mock_run:
            await sched_with_shell_job.run_job("shelly")
        argv = mock_run.call_args.args[0]
        assert argv == ["/bin/true"]

    @pytest.mark.asyncio
    async def test_shell_marker_anywhere_in_output(self, sched, callback):
        """Marker can appear on any line, not just last."""
        sched.add_shell_job(
            "m", self.CRON, "ch", ["/bin/true"], report_on="changes",
        )
        output = "GSTACK-CRON: changes=5\nbody follows\nmore body\n"
        with self._fake_run(0, output, ""):
            result = await sched.run_job("m")
        assert result == output
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_stderr_trimmed_to_500(
        self, sched_with_shell_job, callback,
    ):
        noisy_stderr = "E" * 2000
        with self._fake_run(1, "", noisy_stderr):
            result = await sched_with_shell_job.run_job("shelly")
        # Exactly 500 consecutive characters of stderr survive, never 501.
        assert "E" * 500 in result
        assert "E" * 501 not in result

    @pytest.mark.asyncio
    async def test_shell_stdout_trimmed_to_1500(self, sched, callback):
        sched.add_shell_job(
            "big", self.CRON, "ch", ["/bin/true"], report_on="always",
        )
        huge_stdout = "x" * 5000
        with self._fake_run(0, huge_stdout, ""):
            result = await sched.run_job("big")
        assert len(result) == 1500
# ---------------------------------------------------------------------------
# Backward compatibility — jobs without kind field default to "claude"
# ---------------------------------------------------------------------------
class TestBackwardCompat:
    """Jobs stored without a 'kind' field must keep running as Claude jobs."""

    def test_legacy_job_without_kind_defaults_to_claude(self, sched, tmp_jobs):
        """A jobs.json written before kind existed must still dispatch to claude."""
        legacy = [_sample_job(name="legacy")]
        # Guard the premise: the sample entry really has no 'kind' key.
        assert "kind" not in legacy[0]
        tmp_jobs["file"].write_text(json.dumps(legacy))
        sched._jobs = sched._load_jobs()
        # The loaded entry still has no 'kind' key.
        assert sched._jobs[0].get("kind") is None

        # Replace both executors so the test only observes which one is chosen.
        claude_stub = AsyncMock(return_value="ok")
        shell_stub = AsyncMock(return_value="never")
        with patch.object(sched, "_execute_claude_job", new=claude_stub), \
                patch.object(sched, "_execute_shell_job", new=shell_stub):
            # asyncio.run() creates and closes its own event loop, so no
            # asyncio.get_event_loop() call is needed; calling it without a
            # current loop is deprecated and fails on newer Python versions.
            asyncio.run(sched._execute_job(sched._jobs[0]))
        claude_stub.assert_awaited_once()
        shell_stub.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_claude_jobs_still_work_unchanged(
        self, sched, tmp_jobs, callback,
    ):
        """Existing Claude job pattern: add_job + run_job path unchanged."""
        sched.add_job("claude-j", "0 * * * *", "general", "hi")
        # Same fake-process helper the shell tests use, for consistency.
        proc = _mock_proc(0, json.dumps({"result": "response"}), "")
        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc) as mock_run:
            result = await sched.run_job("claude-j")
        assert result == "response"
        # First positional arg is the CLI command list; it must carry the
        # -p flag and the job's prompt text.
        cmd = mock_run.call_args.args[0]
        assert "-p" in cmd
        assert "hi" in cmd
        assert sched._jobs[0]["last_status"] == "ok"
        callback.assert_awaited_once_with("general", "response")
# ---------------------------------------------------------------------------
# Marker regex unit checks
# ---------------------------------------------------------------------------
class TestMarkerRegex:
    """Positive and negative samples for the _MARKER_RE change marker pattern."""

    MATCHING = [
        ("GSTACK-CRON: changes=0\n", 0),
        ("GSTACK-CRON: changes=1\n", 1),
        ("GSTACK-CRON: changes=42\n", 42),
        ("foo\nbar\nGSTACK-CRON: changes=7\nbaz\n", 7),
        ("GSTACK-CRON: changes=3", 3),  # no trailing newline
    ]
    NON_MATCHING = [
        "",
        "hello world",
        "gstack-cron: changes=5\n",  # lowercase
        "GSTACK-CRON:changes=5\n",  # no space after colon
        "GSTACK-CRON: changes=\n",  # empty int
        "GSTACK-CRON: changes=abc\n",  # non-int
        "prefix GSTACK-CRON: changes=5\n",  # not at start of line
    ]

    @pytest.mark.parametrize("stdout,expected", MATCHING)
    def test_marker_matches(self, stdout, expected):
        found = _MARKER_RE.search(stdout)
        assert found is not None
        # Group 1 carries the change count as digits.
        assert int(found.group(1)) == expected

    @pytest.mark.parametrize("stdout", NON_MATCHING)
    def test_marker_no_match(self, stdout):
        found = _MARKER_RE.search(stdout)
        assert found is None