# Change summary — adds four new test groups to tests/test_scheduler.py:
#   - TestTimezone: asserts AsyncIOScheduler is constructed with Europe/Bucharest.
#   - TestShellKind: 16 cases covering add_shell_job validation (duplicate name
#     across claude/shell, invalid cron, empty/non-list/non-string command, bad
#     report_on, bad timeout bounds/type, empty channel, custom report_on and
#     timeout pass-through).
#   - TestShellExecute: 14 cases covering the report_on contract:
#       * exit 0 + marker N>0  -> forwards stdout
#       * exit 0 + marker N==0 -> silent
#       * exit 0 + no marker   -> silent + warning
#       * report_on=always and =never variants
#       * non-zero exit reports stderr even when report_on='never'
#       * TimeoutExpired and launch exceptions report '[cron:X] Error: ...'
#       * per-job timeout passed to subprocess.run; default 300 when None
#       * subprocess.run receives the job's command list verbatim
#       * stdout trimmed to 1500 ch; stderr trimmed to 500 ch
#   - TestBackwardCompat: a jobs.json entry without a 'kind' field dispatches to
#     _execute_claude_job (never to _execute_shell_job); the existing Claude
#     add_job/run_job round-trip still works with the old CLI invocation.
#   - TestMarkerRegex: parametrised positive/negative cases for _MARKER_RE.
# (File stats: 960 lines, 35 KiB, Python.)
"""Tests for src/scheduler.py — Cron job scheduler."""
|
|
|
|
import asyncio
|
|
import json
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import pytest
|
|
|
|
from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE, _MARKER_RE
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def tmp_jobs(tmp_path, monkeypatch):
    """Point src.scheduler's JOBS_DIR / JOBS_FILE at a temp dir for isolation."""
    cron_dir = tmp_path / "cron"
    cron_dir.mkdir()
    cron_file = cron_dir / "jobs.json"

    monkeypatch.setattr("src.scheduler.JOBS_DIR", cron_dir)
    monkeypatch.setattr("src.scheduler.JOBS_FILE", cron_file)
    return {"dir": cron_dir, "file": cron_file}
|
|
|
|
|
|
@pytest.fixture
def callback():
    """An AsyncMock standing in for the scheduler's send_callback."""
    return AsyncMock()
|
|
|
|
|
|
@pytest.fixture
def sched(tmp_jobs, callback):
    """A Scheduler wired to the temp jobs file and the mock send_callback."""
    return Scheduler(send_callback=callback)
|
|
|
|
|
|
def _sample_job(**overrides):
|
|
"""Return a minimal valid job dict."""
|
|
base = {
|
|
"name": "test-job",
|
|
"cron": "0 6 * * *",
|
|
"channel": "general",
|
|
"model": "sonnet",
|
|
"prompt": "Hello world",
|
|
"allowed_tools": [],
|
|
"enabled": True,
|
|
"last_run": None,
|
|
"last_status": None,
|
|
"next_run": None,
|
|
}
|
|
base.update(overrides)
|
|
return base
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _load_jobs / _save_jobs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLoadJobs:
    """_load_jobs must tolerate missing/empty/corrupt files by returning []."""

    def test_load_empty_file(self, sched, tmp_jobs):
        tmp_jobs["file"].write_text("")
        assert sched._load_jobs() == []

    def test_load_missing_file(self, sched, tmp_jobs):
        # Make sure the file is truly absent before loading.
        if tmp_jobs["file"].exists():
            tmp_jobs["file"].unlink()
        assert sched._load_jobs() == []

    def test_load_valid_jobs(self, sched, tmp_jobs):
        tmp_jobs["file"].write_text(json.dumps([_sample_job()]))
        loaded = sched._load_jobs()
        assert len(loaded) == 1
        assert loaded[0]["name"] == "test-job"

    def test_load_corrupt_json(self, sched, tmp_jobs):
        tmp_jobs["file"].write_text("{broken json!!!")
        assert sched._load_jobs() == []

    def test_load_non_list_json(self, sched, tmp_jobs):
        # A JSON object (not a list) is rejected wholesale.
        tmp_jobs["file"].write_text('{"not": "a list"}')
        assert sched._load_jobs() == []
|
|
|
|
|
|
class TestSaveJobs:
    """_save_jobs persists self._jobs as JSON, creating directories as needed."""

    def test_save_creates_file(self, sched, tmp_jobs):
        if tmp_jobs["file"].exists():
            tmp_jobs["file"].unlink()
        sched._jobs = [_sample_job()]
        sched._save_jobs()

        assert tmp_jobs["file"].exists()
        on_disk = json.loads(tmp_jobs["file"].read_text())
        assert len(on_disk) == 1
        assert on_disk[0]["name"] == "test-job"

    def test_save_roundtrip(self, sched, tmp_jobs):
        sched._jobs = [_sample_job(), _sample_job(name="second-job")]
        sched._save_jobs()

        reloaded = sched._load_jobs()
        assert len(reloaded) == 2
        assert [j["name"] for j in reloaded] == ["test-job", "second-job"]

    def test_save_creates_dir_if_missing(self, tmp_path, monkeypatch, callback):
        target_dir = tmp_path / "new_cron"
        target_file = target_dir / "jobs.json"
        monkeypatch.setattr("src.scheduler.JOBS_DIR", target_dir)
        monkeypatch.setattr("src.scheduler.JOBS_FILE", target_file)

        scheduler = Scheduler(send_callback=callback)
        scheduler._jobs = [_sample_job()]
        scheduler._save_jobs()

        assert target_dir.exists()
        assert target_file.exists()

    def test_save_atomic_pattern(self, sched, tmp_jobs):
        """File stays valid JSON across successive saves (os.replace pattern)."""
        sched._jobs = [_sample_job()]
        sched._save_jobs()
        assert json.loads(tmp_jobs["file"].read_text())[0]["name"] == "test-job"

        # Overwrite with different data — still intact afterwards.
        sched._jobs = [_sample_job(name="updated")]
        sched._save_jobs()
        assert json.loads(tmp_jobs["file"].read_text())[0]["name"] == "updated"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# add_job
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAddJob:
    """add_job validates its inputs and persists the new job immediately."""

    def test_add_job_creates_file(self, sched, tmp_jobs):
        created = sched.add_job("my-job", "30 6 * * *", "general", "Do something")
        assert created["name"] == "my-job"
        assert created["cron"] == "30 6 * * *"
        assert created["channel"] == "general"
        assert created["model"] == "sonnet"  # default model
        assert created["enabled"] is True
        assert tmp_jobs["file"].exists()
        assert len(json.loads(tmp_jobs["file"].read_text())) == 1

    def test_add_job_with_model(self, sched, tmp_jobs):
        created = sched.add_job("my-job", "0 * * * *", "ch", "prompt", model="haiku")
        assert created["model"] == "haiku"

    def test_add_job_with_tools(self, sched, tmp_jobs):
        created = sched.add_job(
            "my-job", "0 * * * *", "ch", "prompt",
            allowed_tools=["Read", "Bash"],
        )
        assert created["allowed_tools"] == ["Read", "Bash"]

    def test_add_job_duplicate_name_raises(self, sched, tmp_jobs):
        sched.add_job("dupe", "0 * * * *", "ch", "prompt")
        with pytest.raises(ValueError, match="already exists"):
            sched.add_job("dupe", "0 * * * *", "ch", "prompt")

    def test_add_job_invalid_cron_raises(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid cron"):
            sched.add_job("job1", "bad cron", "ch", "prompt")

    def test_add_job_invalid_model_raises(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid model"):
            sched.add_job("job1", "0 * * * *", "ch", "prompt", model="gpt4")

    def test_add_job_invalid_name_uppercase(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid job name"):
            sched.add_job("MyJob", "0 * * * *", "ch", "prompt")

    def test_add_job_invalid_name_spaces(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid job name"):
            sched.add_job("my job", "0 * * * *", "ch", "prompt")

    def test_add_job_invalid_name_special(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid job name"):
            sched.add_job("my_job!", "0 * * * *", "ch", "prompt")

    def test_add_job_empty_prompt_raises(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty"):
            sched.add_job("job1", "0 * * * *", "ch", "")

    def test_add_job_whitespace_prompt_raises(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty"):
            sched.add_job("job1", "0 * * * *", "ch", " ")

    def test_add_job_prompt_too_long(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="too long"):
            sched.add_job("job1", "0 * * * *", "ch", "x" * 10_001)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# remove_job
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRemoveJob:
    """remove_job deletes from memory and disk; returns False when absent."""

    def test_remove_job(self, sched, tmp_jobs):
        sched.add_job("to-remove", "0 * * * *", "ch", "prompt")
        assert sched.remove_job("to-remove") is True
        assert sched.list_jobs() == []
        assert json.loads(tmp_jobs["file"].read_text()) == []

    def test_remove_job_not_found(self, sched, tmp_jobs):
        assert sched.remove_job("nonexistent") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# enable_job / disable_job
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEnableDisableJob:
    """enable_job / disable_job toggle the flag, persist it, and clear next_run."""

    def test_enable_job(self, sched, tmp_jobs):
        sched.add_job("toggler", "0 * * * *", "ch", "prompt")
        sched.disable_job("toggler")
        assert sched.list_jobs()[0]["enabled"] is False

        assert sched.enable_job("toggler") is True
        assert sched.list_jobs()[0]["enabled"] is True

    def test_disable_job(self, sched, tmp_jobs):
        sched.add_job("toggler", "0 * * * *", "ch", "prompt")
        assert sched.disable_job("toggler") is True

        current = sched.list_jobs()[0]
        assert current["enabled"] is False
        assert current["next_run"] is None  # disabling clears the schedule

    def test_enable_not_found(self, sched, tmp_jobs):
        assert sched.enable_job("nope") is False

    def test_disable_not_found(self, sched, tmp_jobs):
        assert sched.disable_job("nope") is False

    def test_enable_persists(self, sched, tmp_jobs):
        sched.add_job("persist", "0 * * * *", "ch", "prompt")
        sched.disable_job("persist")
        sched.enable_job("persist")
        assert json.loads(tmp_jobs["file"].read_text())[0]["enabled"] is True

    def test_disable_persists(self, sched, tmp_jobs):
        sched.add_job("persist", "0 * * * *", "ch", "prompt")
        sched.disable_job("persist")
        assert json.loads(tmp_jobs["file"].read_text())[0]["enabled"] is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_jobs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestListJobs:
    """list_jobs returns a defensive copy of every known job."""

    def test_list_jobs_empty(self, sched, tmp_jobs):
        assert sched.list_jobs() == []

    def test_list_jobs_returns_copy(self, sched, tmp_jobs):
        sched.add_job("job1", "0 * * * *", "ch", "prompt")

        mutated = sched.list_jobs()
        mutated[0]["name"] = "MUTATED"

        # Mutating the returned list must not leak into internal state.
        assert sched.list_jobs()[0]["name"] == "job1"

    def test_list_jobs_returns_all(self, sched, tmp_jobs):
        sched.add_job("a", "0 * * * *", "ch", "p1")
        sched.add_job("b", "0 * * * *", "ch", "p2")
        assert len(sched.list_jobs()) == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# run_job / _execute_job
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRunJob:
    """run_job / _execute_job: Claude CLI invocation and result handling."""

    @staticmethod
    def _cli_proc(returncode=0, stdout="", stderr=""):
        # Stand-in for the CompletedProcess returned by subprocess.run.
        proc = MagicMock()
        proc.returncode = returncode
        proc.stdout = stdout
        proc.stderr = stderr
        return proc

    @pytest.mark.asyncio
    async def test_run_job_not_found(self, sched, tmp_jobs):
        with pytest.raises(KeyError, match="not found"):
            await sched.run_job("nonexistent")

    @pytest.mark.asyncio
    async def test_execute_job_success(self, sched, tmp_jobs, callback):
        sched.add_job("runner", "0 * * * *", "general", "test prompt")
        proc = self._cli_proc(0, json.dumps({"result": "Claude says hello"}))

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc):
            reply = await sched.run_job("runner")

        assert reply == "Claude says hello"
        assert sched._jobs[0]["last_status"] == "ok"
        assert sched._jobs[0]["last_run"] is not None
        callback.assert_awaited_once_with("general", "Claude says hello")

    @pytest.mark.asyncio
    async def test_execute_job_timeout(self, sched, tmp_jobs, callback):
        sched.add_job("timeout-job", "0 * * * *", "ch", "prompt")
        boom = subprocess.TimeoutExpired(cmd="claude", timeout=300)

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", side_effect=boom):
            reply = await sched.run_job("timeout-job")

        assert "Error" in reply
        assert "timed out" in reply
        assert sched._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_execute_job_cli_error(self, sched, tmp_jobs, callback):
        sched.add_job("err-job", "0 * * * *", "ch", "prompt")
        proc = self._cli_proc(1, "", "Some CLI error")

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc):
            reply = await sched.run_job("err-job")

        assert "Error" in reply
        assert sched._jobs[0]["last_status"] == "error"

    @pytest.mark.asyncio
    async def test_execute_job_invalid_json(self, sched, tmp_jobs, callback):
        sched.add_job("json-err", "0 * * * *", "ch", "prompt")
        proc = self._cli_proc(0, "not valid json")

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc):
            reply = await sched.run_job("json-err")

        assert "Error" in reply
        assert sched._jobs[0]["last_status"] == "error"

    @pytest.mark.asyncio
    async def test_execute_job_with_allowed_tools(self, sched, tmp_jobs, callback):
        sched.add_job(
            "tools-job", "0 * * * *", "ch", "prompt",
            allowed_tools=["Read", "Bash"],
        )
        proc = self._cli_proc(0, json.dumps({"result": "ok"}))

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc) as mock_run:
            await sched.run_job("tools-job")

        # The CLI command must carry the per-job tool whitelist.
        cmd = mock_run.call_args[0][0]
        assert "--allowedTools" in cmd
        assert "Read" in cmd
        assert "Bash" in cmd

    @pytest.mark.asyncio
    async def test_execute_job_no_callback(self, tmp_jobs):
        """Scheduler with no send_callback should not error on execution."""
        quiet = Scheduler(send_callback=None)
        quiet.add_job("no-cb", "0 * * * *", "ch", "prompt")
        proc = self._cli_proc(0, json.dumps({"result": "response"}))

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc):
            reply = await quiet.run_job("no-cb")

        assert reply == "response"

    @pytest.mark.asyncio
    async def test_execute_job_updates_last_run(self, sched, tmp_jobs, callback):
        sched.add_job("time-job", "0 * * * *", "ch", "prompt")
        assert sched._jobs[0]["last_run"] is None
        proc = self._cli_proc(0, json.dumps({"result": "ok"}))

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc):
            await sched.run_job("time-job")

        stamp = sched._jobs[0]["last_run"]
        assert stamp is not None
        datetime.fromisoformat(stamp)  # must parse as a valid ISO timestamp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# start / stop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStartStop:
    """start() loads jobs.json and schedules only enabled jobs; stop() shuts down."""

    @pytest.mark.asyncio
    async def test_start_loads_and_schedules(self, sched, tmp_jobs):
        payload = [_sample_job(name="enabled-job", enabled=True)]
        tmp_jobs["file"].write_text(json.dumps(payload))

        with patch.object(sched._scheduler, "start"), \
                patch.object(sched._scheduler, "add_job") as mock_add:
            await sched.start()

        assert len(sched._jobs) == 1
        mock_add.assert_called_once()

    @pytest.mark.asyncio
    async def test_start_skips_disabled(self, sched, tmp_jobs):
        payload = [
            _sample_job(name="on", enabled=True),
            _sample_job(name="off", enabled=False),
        ]
        tmp_jobs["file"].write_text(json.dumps(payload))

        with patch.object(sched._scheduler, "start"), \
                patch.object(sched._scheduler, "add_job") as mock_add:
            await sched.start()

        assert len(sched._jobs) == 2
        # Only the enabled job is registered with the underlying scheduler.
        assert mock_add.call_count == 1

    @pytest.mark.asyncio
    async def test_start_empty_file(self, sched, tmp_jobs):
        with patch.object(sched._scheduler, "start"), \
                patch.object(sched._scheduler, "add_job") as mock_add:
            await sched.start()

        assert sched._jobs == []
        mock_add.assert_not_called()

    @pytest.mark.asyncio
    async def test_stop(self, sched):
        with patch.object(sched._scheduler, "shutdown") as mock_shutdown:
            await sched.stop()
        mock_shutdown.assert_called_once_with(wait=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Name regex validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNameRegex:
    """_NAME_RE: lowercase kebab/digit names, max 63 chars (per the cases below)."""

    @pytest.mark.parametrize("name", [
        "a", "abc", "my-job", "daily-email-summary",
        "a1", "123", "0-test",
    ])
    def test_valid_names(self, name):
        assert _NAME_RE.match(name) is not None

    @pytest.mark.parametrize("name", [
        "", "A", "MyJob", "my_job", "my job", "-start",
        "a" * 64,  # one past the 63-char limit
        "job!", "job@work",
    ])
    def test_invalid_names(self, name):
        assert _NAME_RE.match(name) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Integration test: full lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFullLifecycle:
    """End-to-end: add → list → disable → enable → run → remove."""

    @pytest.mark.asyncio
    async def test_add_list_enable_run_disable_remove(self, sched, tmp_jobs, callback):
        # Add
        created = sched.add_job("lifecycle", "0 6 * * *", "ch", "test prompt")
        assert created["enabled"] is True

        # List
        listing = sched.list_jobs()
        assert len(listing) == 1
        assert listing[0]["name"] == "lifecycle"

        # Disable, then re-enable
        assert sched.disable_job("lifecycle") is True
        assert sched.list_jobs()[0]["enabled"] is False
        assert sched.enable_job("lifecycle") is True
        assert sched.list_jobs()[0]["enabled"] is True

        # Run against a mocked CLI
        proc = MagicMock()
        proc.returncode = 0
        proc.stdout = json.dumps({"result": "lifecycle output"})
        proc.stderr = ""

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc):
            reply = await sched.run_job("lifecycle")

        assert reply == "lifecycle output"
        assert sched.list_jobs()[0]["last_status"] == "ok"

        # Remove
        assert sched.remove_job("lifecycle") is True
        assert sched.list_jobs() == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Timezone
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTimezone:
    def test_scheduler_uses_bucharest_tz(self, sched):
        """The underlying scheduler must be pinned to Europe/Bucharest."""
        tz = sched._scheduler.timezone
        # zoneinfo exposes .key, pytz-style tzinfo exposes .zone; str() last.
        tz_name = getattr(tz, "key", None) or getattr(tz, "zone", None) or str(tz)
        assert tz_name == "Europe/Bucharest"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shell-kind validation (add_shell_job)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestShellKind:
    """add_shell_job input validation and field pass-through."""

    def test_add_shell_job_creates_file(self, sched, tmp_jobs):
        created = sched.add_shell_job(
            "shelly", "0 9 * * *", "general",
            ["/bin/echo", "hello"],
        )
        assert created["name"] == "shelly"
        assert created["kind"] == "shell"
        assert created["command"] == ["/bin/echo", "hello"]
        assert created["report_on"] == "changes"  # default reporting mode
        assert created["timeout"] is None  # default: fall back at run time
        assert created["enabled"] is True
        assert tmp_jobs["file"].exists()

        on_disk = json.loads(tmp_jobs["file"].read_text())
        assert len(on_disk) == 1
        assert on_disk[0]["kind"] == "shell"
        assert on_disk[0]["command"] == ["/bin/echo", "hello"]

    def test_add_shell_job_with_report_on_always(self, sched, tmp_jobs):
        created = sched.add_shell_job(
            "x", "0 9 * * *", "ch", ["/bin/true"], report_on="always",
        )
        assert created["report_on"] == "always"

    def test_add_shell_job_with_custom_timeout(self, sched, tmp_jobs):
        created = sched.add_shell_job(
            "x", "0 9 * * *", "ch", ["/bin/true"], timeout=60,
        )
        assert created["timeout"] == 60

    def test_add_shell_job_duplicate_name_with_claude(self, sched, tmp_jobs):
        # Names are unique across both job kinds.
        sched.add_job("x", "0 * * * *", "ch", "prompt")
        with pytest.raises(ValueError, match="already exists"):
            sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/true"])

    def test_add_shell_job_duplicate_name_with_shell(self, sched, tmp_jobs):
        sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/true"])
        with pytest.raises(ValueError, match="already exists"):
            sched.add_shell_job("x", "0 10 * * *", "ch", ["/bin/true"])

    def test_add_shell_job_invalid_cron(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid cron"):
            sched.add_shell_job("x", "not a cron", "ch", ["/bin/true"])

    def test_add_shell_job_invalid_name(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid job name"):
            sched.add_shell_job("BadName", "0 9 * * *", "ch", ["/bin/true"])

    def test_add_shell_job_empty_command_list(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty list"):
            sched.add_shell_job("x", "0 9 * * *", "ch", [])

    def test_add_shell_job_command_not_list(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty list"):
            sched.add_shell_job("x", "0 9 * * *", "ch", "/bin/echo hi")  # type: ignore[arg-type]

    def test_add_shell_job_command_with_empty_string(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty list"):
            sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/echo", ""])

    def test_add_shell_job_command_non_string_element(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty list"):
            sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/echo", 1])  # type: ignore[list-item]

    def test_add_shell_job_bad_report_on(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="Invalid report_on"):
            sched.add_shell_job(
                "x", "0 9 * * *", "ch", ["/bin/true"], report_on="maybe",
            )

    def test_add_shell_job_bad_timeout_zero(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="between 1 and 3600"):
            sched.add_shell_job(
                "x", "0 9 * * *", "ch", ["/bin/true"], timeout=0,
            )

    def test_add_shell_job_bad_timeout_negative(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="between 1 and 3600"):
            sched.add_shell_job(
                "x", "0 9 * * *", "ch", ["/bin/true"], timeout=-5,
            )

    def test_add_shell_job_bad_timeout_too_big(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="between 1 and 3600"):
            sched.add_shell_job(
                "x", "0 9 * * *", "ch", ["/bin/true"], timeout=7200,
            )

    def test_add_shell_job_bad_timeout_not_int(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="must be an int"):
            sched.add_shell_job(
                "x", "0 9 * * *", "ch", ["/bin/true"], timeout="60",  # type: ignore[arg-type]
            )

    def test_add_shell_job_empty_channel(self, sched, tmp_jobs):
        with pytest.raises(ValueError, match="non-empty"):
            sched.add_shell_job("x", "0 9 * * *", " ", ["/bin/true"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shell-kind execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _mock_proc(returncode=0, stdout="", stderr=""):
|
|
p = MagicMock()
|
|
p.returncode = returncode
|
|
p.stdout = stdout
|
|
p.stderr = stderr
|
|
return p
|
|
|
|
|
|
@pytest.fixture
def sched_with_shell_job(sched, tmp_jobs):
    """Scheduler pre-loaded with one shell job 'shelly' using report_on=changes."""
    sched.add_shell_job(
        "shelly", "0 9 * * *", "general", ["/bin/true"], report_on="changes",
    )
    return sched
|
|
|
|
|
|
class TestShellExecute:
    """Shell-job execution: the report_on contract, error paths, and trimming."""

    @pytest.mark.asyncio
    async def test_shell_exit_zero_changes_marker_positive(
        self, sched_with_shell_job, callback,
    ):
        out = "some body\nGSTACK-CRON: changes=3\nfooter\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")):
            reply = await sched_with_shell_job.run_job("shelly")
        assert reply == out[:1500]
        assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
        callback.assert_awaited_once_with("general", out[:1500])

    @pytest.mark.asyncio
    async def test_shell_exit_zero_changes_marker_zero(
        self, sched_with_shell_job, callback,
    ):
        out = "nothing here\nGSTACK-CRON: changes=0\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")):
            reply = await sched_with_shell_job.run_job("shelly")
        assert reply == ""
        assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
        callback.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_shell_exit_zero_no_marker(
        self, sched_with_shell_job, callback, caplog,
    ):
        import logging

        caplog.set_level(logging.WARNING, logger="src.scheduler")
        out = "completely unrelated output\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")):
            reply = await sched_with_shell_job.run_job("shelly")
        assert reply == ""
        assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
        callback.assert_not_awaited()
        messages = [rec.message for rec in caplog.records]
        assert any("missing GSTACK-CRON marker" in msg for msg in messages)

    @pytest.mark.asyncio
    async def test_shell_exit_zero_report_on_always(self, sched, callback):
        sched.add_shell_job(
            "alw", "0 9 * * *", "general", ["/bin/true"], report_on="always",
        )
        out = "hello world, no marker here"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")):
            reply = await sched.run_job("alw")
        assert reply == out
        callback.assert_awaited_once_with("general", out)

    @pytest.mark.asyncio
    async def test_shell_exit_zero_report_on_never(self, sched, callback):
        sched.add_shell_job(
            "nev", "0 9 * * *", "general", ["/bin/true"], report_on="never",
        )
        out = "GSTACK-CRON: changes=99\nbig change"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")):
            reply = await sched.run_job("nev")
        assert reply == ""
        callback.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_shell_exit_nonzero_always_reports(
        self, sched_with_shell_job, callback,
    ):
        with patch("subprocess.run", return_value=_mock_proc(1, "", "ANAF 502")):
            reply = await sched_with_shell_job.run_job("shelly")
        assert reply == "[cron:shelly] exit 1: ANAF 502"
        assert sched_with_shell_job._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once_with(
            "general", "[cron:shelly] exit 1: ANAF 502",
        )

    @pytest.mark.asyncio
    async def test_shell_exit_nonzero_never_still_reports(self, sched, callback):
        """report_on=never must NOT suppress error reports."""
        sched.add_shell_job(
            "nev", "0 9 * * *", "ch", ["/bin/false"], report_on="never",
        )
        with patch("subprocess.run", return_value=_mock_proc(2, "", "boom")):
            reply = await sched.run_job("nev")
        assert "exit 2" in reply
        assert "boom" in reply
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_timeout_reports(self, sched_with_shell_job, callback):
        boom = subprocess.TimeoutExpired(cmd="x", timeout=300)
        with patch("subprocess.run", side_effect=boom):
            reply = await sched_with_shell_job.run_job("shelly")
        assert "timed out after 300s" in reply
        assert sched_with_shell_job._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_subprocess_launch_exception(
        self, sched_with_shell_job, callback,
    ):
        with patch("subprocess.run", side_effect=FileNotFoundError("no such binary")):
            reply = await sched_with_shell_job.run_job("shelly")
        assert "Error" in reply
        assert "no such binary" in reply
        assert sched_with_shell_job._jobs[0]["last_status"] == "error"
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_respects_per_job_timeout(self, sched, callback):
        sched.add_shell_job(
            "t", "0 9 * * *", "ch", ["/bin/true"], timeout=60,
        )
        out = "GSTACK-CRON: changes=1\nok\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")) as mock_run:
            await sched.run_job("t")
        assert mock_run.call_args.kwargs["timeout"] == 60

    @pytest.mark.asyncio
    async def test_shell_default_timeout_when_none(
        self, sched_with_shell_job, callback,
    ):
        """timeout=None in the job dict falls back to JOB_TIMEOUT (300)."""
        out = "GSTACK-CRON: changes=1\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")) as mock_run:
            await sched_with_shell_job.run_job("shelly")
        assert mock_run.call_args.kwargs["timeout"] == 300

    @pytest.mark.asyncio
    async def test_shell_executes_command_list(
        self, sched_with_shell_job, callback,
    ):
        """subprocess.run is invoked with the job's command list verbatim."""
        out = "GSTACK-CRON: changes=1\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")) as mock_run:
            await sched_with_shell_job.run_job("shelly")
        assert mock_run.call_args.args[0] == ["/bin/true"]

    @pytest.mark.asyncio
    async def test_shell_marker_anywhere_in_output(self, sched, callback):
        """The marker may appear on any line, not just the last."""
        sched.add_shell_job(
            "m", "0 9 * * *", "ch", ["/bin/true"], report_on="changes",
        )
        out = "GSTACK-CRON: changes=5\nbody follows\nmore body\n"
        with patch("subprocess.run", return_value=_mock_proc(0, out, "")):
            reply = await sched.run_job("m")
        assert reply == out
        callback.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_shell_stderr_trimmed_to_500(
        self, sched_with_shell_job, callback,
    ):
        noisy = "E" * 2000
        with patch("subprocess.run", return_value=_mock_proc(1, "", noisy)):
            reply = await sched_with_shell_job.run_job("shelly")
        # At most 500 chars of stderr survive trimming.
        assert "E" * 500 in reply
        assert "E" * 501 not in reply

    @pytest.mark.asyncio
    async def test_shell_stdout_trimmed_to_1500(self, sched, callback):
        sched.add_shell_job(
            "big", "0 9 * * *", "ch", ["/bin/true"], report_on="always",
        )
        with patch("subprocess.run", return_value=_mock_proc(0, "x" * 5000, "")):
            reply = await sched.run_job("big")
        assert len(reply) == 1500
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backward compatibility — jobs without kind field default to "claude"
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBackwardCompat:
    """jobs.json entries written before 'kind' existed must still run as Claude jobs."""

    @pytest.mark.asyncio
    async def test_legacy_job_without_kind_defaults_to_claude(self, sched, tmp_jobs):
        """A jobs.json written before kind existed must still dispatch to claude.

        Fix: the previous version called the deprecated asyncio.get_event_loop()
        (a pure no-op here, DeprecationWarning on 3.10+ outside a running loop)
        and then spun up a second event loop with asyncio.run() inside a sync
        test. Awaiting directly under pytest.mark.asyncio — as every other async
        test in this file does — avoids both problems.
        """
        legacy = [_sample_job(name="legacy")]
        # Explicitly ensure no 'kind' key is present (truly pre-'kind' shape).
        assert "kind" not in legacy[0]
        tmp_jobs["file"].write_text(json.dumps(legacy))
        sched._jobs = sched._load_jobs()

        assert sched._jobs[0].get("kind") is None  # still legacy shape on disk

        # Dispatch: ensure we go through the claude path, NOT the shell path.
        with patch.object(
            sched, "_execute_claude_job", new=AsyncMock(return_value="ok")
        ) as claude, patch.object(
            sched, "_execute_shell_job", new=AsyncMock(return_value="never")
        ) as shell:
            await sched._execute_job(sched._jobs[0])
        claude.assert_awaited_once()
        shell.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_claude_jobs_still_work_unchanged(
        self, sched, tmp_jobs, callback,
    ):
        """Existing Claude job pattern: add_job + run_job path unchanged."""
        sched.add_job("claude-j", "0 * * * *", "general", "hi")

        proc = MagicMock()
        proc.returncode = 0
        proc.stdout = json.dumps({"result": "response"})
        proc.stderr = ""

        with patch("src.scheduler.build_system_prompt", return_value="sys"), \
                patch("subprocess.run", return_value=proc) as mock_run:
            reply = await sched.run_job("claude-j")

        assert reply == "response"
        # First positional arg is the CLI command list; confirm old flags survive.
        cmd = mock_run.call_args[0][0]
        assert "-p" in cmd
        assert "hi" in cmd
        assert sched._jobs[0]["last_status"] == "ok"
        callback.assert_awaited_once_with("general", "response")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Marker regex unit checks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMarkerRegex:
    """_MARKER_RE unit checks: exact-case marker at start of a line, integer payload."""

    @pytest.mark.parametrize("stdout,expected", [
        ("GSTACK-CRON: changes=0\n", 0),
        ("GSTACK-CRON: changes=1\n", 1),
        ("GSTACK-CRON: changes=42\n", 42),
        ("foo\nbar\nGSTACK-CRON: changes=7\nbaz\n", 7),
        ("GSTACK-CRON: changes=3", 3),  # no trailing newline
    ])
    def test_marker_matches(self, stdout, expected):
        found = _MARKER_RE.search(stdout)
        assert found is not None
        assert int(found.group(1)) == expected

    @pytest.mark.parametrize("stdout", [
        "",
        "hello world",
        "gstack-cron: changes=5\n",  # lowercase
        "GSTACK-CRON:changes=5\n",  # no space after colon
        "GSTACK-CRON: changes=\n",  # empty int
        "GSTACK-CRON: changes=abc\n",  # non-int
        "prefix GSTACK-CRON: changes=5\n",  # not at start of line
    ])
    def test_marker_no_match(self, stdout):
        assert _MARKER_RE.search(stdout) is None
|