test(scheduler): cover shell-kind validation, execution, timezone, backward-compat

Adds four new test groups to tests/test_scheduler.py:

- TestTimezone: asserts AsyncIOScheduler is constructed with Europe/Bucharest.
- TestShellKind: 16 cases covering add_shell_job validation (duplicate name
  across claude/shell, invalid cron, empty/non-list/non-string command,
  bad report_on, bad timeout bounds/type, empty channel, custom report_on
  and timeout pass-through).
- TestShellExecute: 14 cases covering the report_on contract:
  - exit 0 + marker N>0 → forwards stdout
  - exit 0 + marker N==0 → silent
  - exit 0 + no marker   → silent + warning
  - report_on=always and =never variants
  - non-zero exit reports stderr even when report_on='never'
  - TimeoutExpired and launch exceptions report '[cron:X] Error: ...'
  - per-job timeout passed to subprocess.run; default 300 when None
  - subprocess.run receives the job's command list verbatim
  - stdout trimmed to 1500 ch; stderr trimmed to 500 ch
- TestBackwardCompat: a jobs.json entry without a 'kind' field dispatches
  to _execute_claude_job (never to _execute_shell_job); the existing Claude
  add_job/run_job round-trip still works with the old CLI invocation.
- TestMarkerRegex: parametrised positive/negative cases for _MARKER_RE.
This commit is contained in:
2026-04-21 07:05:29 +00:00
parent e747491b85
commit b3ed653bb3

View File

@@ -6,10 +6,11 @@ import subprocess
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from zoneinfo import ZoneInfo
import pytest
from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE
from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE, _MARKER_RE
# ---------------------------------------------------------------------------
@@ -548,3 +549,411 @@ class TestFullLifecycle:
# Remove
assert sched.remove_job("lifecycle") is True
assert sched.list_jobs() == []
# ---------------------------------------------------------------------------
# Timezone
# ---------------------------------------------------------------------------
class TestTimezone:
def test_scheduler_uses_bucharest_tz(self, sched):
tz = sched._scheduler.timezone
# Support zoneinfo.ZoneInfo (py3.9+) and pytz-style tzinfo
key = getattr(tz, "key", None) or getattr(tz, "zone", None) or str(tz)
assert key == "Europe/Bucharest"
# ---------------------------------------------------------------------------
# Shell-kind validation (add_shell_job)
# ---------------------------------------------------------------------------
class TestShellKind:
def test_add_shell_job_creates_file(self, sched, tmp_jobs):
job = sched.add_shell_job(
"shelly", "0 9 * * *", "general",
["/bin/echo", "hello"],
)
assert job["name"] == "shelly"
assert job["kind"] == "shell"
assert job["command"] == ["/bin/echo", "hello"]
assert job["report_on"] == "changes"
assert job["timeout"] is None
assert job["enabled"] is True
assert tmp_jobs["file"].exists()
data = json.loads(tmp_jobs["file"].read_text())
assert len(data) == 1
assert data[0]["kind"] == "shell"
assert data[0]["command"] == ["/bin/echo", "hello"]
def test_add_shell_job_with_report_on_always(self, sched, tmp_jobs):
job = sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], report_on="always",
)
assert job["report_on"] == "always"
def test_add_shell_job_with_custom_timeout(self, sched, tmp_jobs):
job = sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], timeout=60,
)
assert job["timeout"] == 60
def test_add_shell_job_duplicate_name_with_claude(self, sched, tmp_jobs):
sched.add_job("x", "0 * * * *", "ch", "prompt")
with pytest.raises(ValueError, match="already exists"):
sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/true"])
def test_add_shell_job_duplicate_name_with_shell(self, sched, tmp_jobs):
sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/true"])
with pytest.raises(ValueError, match="already exists"):
sched.add_shell_job("x", "0 10 * * *", "ch", ["/bin/true"])
def test_add_shell_job_invalid_cron(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="Invalid cron"):
sched.add_shell_job("x", "not a cron", "ch", ["/bin/true"])
def test_add_shell_job_invalid_name(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="Invalid job name"):
sched.add_shell_job("BadName", "0 9 * * *", "ch", ["/bin/true"])
def test_add_shell_job_empty_command_list(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="non-empty list"):
sched.add_shell_job("x", "0 9 * * *", "ch", [])
def test_add_shell_job_command_not_list(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="non-empty list"):
sched.add_shell_job("x", "0 9 * * *", "ch", "/bin/echo hi") # type: ignore[arg-type]
def test_add_shell_job_command_with_empty_string(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="non-empty list"):
sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/echo", ""])
def test_add_shell_job_command_non_string_element(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="non-empty list"):
sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/echo", 1]) # type: ignore[list-item]
def test_add_shell_job_bad_report_on(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="Invalid report_on"):
sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], report_on="maybe",
)
def test_add_shell_job_bad_timeout_zero(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="between 1 and 3600"):
sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], timeout=0,
)
def test_add_shell_job_bad_timeout_negative(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="between 1 and 3600"):
sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], timeout=-5,
)
def test_add_shell_job_bad_timeout_too_big(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="between 1 and 3600"):
sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], timeout=7200,
)
def test_add_shell_job_bad_timeout_not_int(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="must be an int"):
sched.add_shell_job(
"x", "0 9 * * *", "ch", ["/bin/true"], timeout="60", # type: ignore[arg-type]
)
def test_add_shell_job_empty_channel(self, sched, tmp_jobs):
with pytest.raises(ValueError, match="non-empty"):
sched.add_shell_job("x", "0 9 * * *", " ", ["/bin/true"])
# ---------------------------------------------------------------------------
# Shell-kind execution
# ---------------------------------------------------------------------------
def _mock_proc(returncode=0, stdout="", stderr=""):
p = MagicMock()
p.returncode = returncode
p.stdout = stdout
p.stderr = stderr
return p
@pytest.fixture
def sched_with_shell_job(sched, tmp_jobs):
"""Scheduler pre-loaded with a shell job 'shelly' (report_on=changes)."""
sched.add_shell_job(
"shelly", "0 9 * * *", "general", ["/bin/true"], report_on="changes",
)
return sched
class TestShellExecute:
@pytest.mark.asyncio
async def test_shell_exit_zero_changes_marker_positive(
self, sched_with_shell_job, callback,
):
stdout = "some body\nGSTACK-CRON: changes=3\nfooter\n"
with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")):
result = await sched_with_shell_job.run_job("shelly")
assert result == stdout[:1500]
assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
callback.assert_awaited_once_with("general", stdout[:1500])
@pytest.mark.asyncio
async def test_shell_exit_zero_changes_marker_zero(
self, sched_with_shell_job, callback,
):
stdout = "nothing here\nGSTACK-CRON: changes=0\n"
with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")):
result = await sched_with_shell_job.run_job("shelly")
assert result == ""
assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
callback.assert_not_awaited()
@pytest.mark.asyncio
async def test_shell_exit_zero_no_marker(
self, sched_with_shell_job, callback, caplog,
):
stdout = "completely unrelated output\n"
import logging
caplog.set_level(logging.WARNING, logger="src.scheduler")
with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")):
result = await sched_with_shell_job.run_job("shelly")
assert result == ""
assert sched_with_shell_job._jobs[0]["last_status"] == "ok"
callback.assert_not_awaited()
assert any(
"missing GSTACK-CRON marker" in rec.message
for rec in caplog.records
)
@pytest.mark.asyncio
async def test_shell_exit_zero_report_on_always(self, sched, callback):
sched.add_shell_job(
"alw", "0 9 * * *", "general", ["/bin/true"], report_on="always",
)
stdout = "hello world, no marker here"
with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")):
result = await sched.run_job("alw")
assert result == stdout
callback.assert_awaited_once_with("general", stdout)
@pytest.mark.asyncio
async def test_shell_exit_zero_report_on_never(self, sched, callback):
sched.add_shell_job(
"nev", "0 9 * * *", "general", ["/bin/true"], report_on="never",
)
stdout = "GSTACK-CRON: changes=99\nbig change"
with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")):
result = await sched.run_job("nev")
assert result == ""
callback.assert_not_awaited()
@pytest.mark.asyncio
async def test_shell_exit_nonzero_always_reports(
self, sched_with_shell_job, callback,
):
with patch("subprocess.run", return_value=_mock_proc(1, "", "ANAF 502")):
result = await sched_with_shell_job.run_job("shelly")
assert result == "[cron:shelly] exit 1: ANAF 502"
assert sched_with_shell_job._jobs[0]["last_status"] == "error"
callback.assert_awaited_once_with(
"general", "[cron:shelly] exit 1: ANAF 502",
)
@pytest.mark.asyncio
async def test_shell_exit_nonzero_never_still_reports(
self, sched, callback,
):
"""report_on=never must NOT suppress error reports."""
sched.add_shell_job(
"nev", "0 9 * * *", "ch", ["/bin/false"], report_on="never",
)
with patch("subprocess.run", return_value=_mock_proc(2, "", "boom")):
result = await sched.run_job("nev")
assert "exit 2" in result
assert "boom" in result
callback.assert_awaited_once()
@pytest.mark.asyncio
async def test_shell_timeout_reports(self, sched_with_shell_job, callback):
with patch(
"subprocess.run",
side_effect=subprocess.TimeoutExpired(cmd="x", timeout=300),
):
result = await sched_with_shell_job.run_job("shelly")
assert "timed out after 300s" in result
assert sched_with_shell_job._jobs[0]["last_status"] == "error"
callback.assert_awaited_once()
@pytest.mark.asyncio
async def test_shell_subprocess_launch_exception(
self, sched_with_shell_job, callback,
):
with patch(
"subprocess.run",
side_effect=FileNotFoundError("no such binary"),
):
result = await sched_with_shell_job.run_job("shelly")
assert "Error" in result
assert "no such binary" in result
assert sched_with_shell_job._jobs[0]["last_status"] == "error"
callback.assert_awaited_once()
@pytest.mark.asyncio
async def test_shell_respects_per_job_timeout(self, sched, callback):
sched.add_shell_job(
"t", "0 9 * * *", "ch", ["/bin/true"], timeout=60,
)
stdout = "GSTACK-CRON: changes=1\nok\n"
with patch(
"subprocess.run", return_value=_mock_proc(0, stdout, ""),
) as mock_run:
await sched.run_job("t")
assert mock_run.call_args.kwargs["timeout"] == 60
@pytest.mark.asyncio
async def test_shell_default_timeout_when_none(
self, sched_with_shell_job, callback,
):
"""timeout=None in job dict falls back to JOB_TIMEOUT (300)."""
stdout = "GSTACK-CRON: changes=1\n"
with patch(
"subprocess.run", return_value=_mock_proc(0, stdout, ""),
) as mock_run:
await sched_with_shell_job.run_job("shelly")
assert mock_run.call_args.kwargs["timeout"] == 300
@pytest.mark.asyncio
async def test_shell_executes_command_list(
self, sched_with_shell_job, callback,
):
"""subprocess.run is invoked with the job's command list verbatim."""
stdout = "GSTACK-CRON: changes=1\n"
with patch(
"subprocess.run", return_value=_mock_proc(0, stdout, ""),
) as mock_run:
await sched_with_shell_job.run_job("shelly")
cmd = mock_run.call_args.args[0]
assert cmd == ["/bin/true"]
@pytest.mark.asyncio
async def test_shell_marker_anywhere_in_output(self, sched, callback):
"""Marker can appear on any line, not just last."""
sched.add_shell_job(
"m", "0 9 * * *", "ch", ["/bin/true"], report_on="changes",
)
stdout = "GSTACK-CRON: changes=5\nbody follows\nmore body\n"
with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")):
result = await sched.run_job("m")
assert result == stdout
callback.assert_awaited_once()
@pytest.mark.asyncio
async def test_shell_stderr_trimmed_to_500(
self, sched_with_shell_job, callback,
):
long_stderr = "E" * 2000
with patch(
"subprocess.run", return_value=_mock_proc(1, "", long_stderr),
):
result = await sched_with_shell_job.run_job("shelly")
# 500 chars of stderr max
assert "E" * 500 in result
assert "E" * 501 not in result
@pytest.mark.asyncio
async def test_shell_stdout_trimmed_to_1500(self, sched, callback):
sched.add_shell_job(
"big", "0 9 * * *", "ch", ["/bin/true"], report_on="always",
)
big_stdout = "x" * 5000
with patch("subprocess.run", return_value=_mock_proc(0, big_stdout, "")):
result = await sched.run_job("big")
assert len(result) == 1500
# ---------------------------------------------------------------------------
# Backward compatibility — jobs without kind field default to "claude"
# ---------------------------------------------------------------------------
class TestBackwardCompat:
def test_legacy_job_without_kind_defaults_to_claude(self, sched, tmp_jobs):
"""A jobs.json written before kind existed must still dispatch to claude."""
legacy = [_sample_job(name="legacy")]
# Explicitly ensure no 'kind' key is present
assert "kind" not in legacy[0]
tmp_jobs["file"].write_text(json.dumps(legacy))
sched._jobs = sched._load_jobs()
assert sched._jobs[0].get("kind") is None # still legacy shape on disk
# Dispatch: ensure we go through the claude path, NOT the shell path
with patch.object(
sched, "_execute_claude_job", new=AsyncMock(return_value="ok")
) as claude, patch.object(
sched, "_execute_shell_job", new=AsyncMock(return_value="never")
) as shell:
asyncio.get_event_loop() # keep lints happy; actual run below
asyncio.run(sched._execute_job(sched._jobs[0]))
claude.assert_awaited_once()
shell.assert_not_awaited()
@pytest.mark.asyncio
async def test_claude_jobs_still_work_unchanged(
self, sched, tmp_jobs, callback,
):
"""Existing Claude job pattern: add_job + run_job path unchanged."""
sched.add_job("claude-j", "0 * * * *", "general", "hi")
mock_proc = MagicMock()
mock_proc.returncode = 0
mock_proc.stdout = json.dumps({"result": "response"})
mock_proc.stderr = ""
with patch("src.scheduler.build_system_prompt", return_value="sys"), \
patch("subprocess.run", return_value=mock_proc) as mock_run:
result = await sched.run_job("claude-j")
assert result == "response"
# First positional arg is the CLI command list; confirm it targets CLAUDE_BIN
cmd = mock_run.call_args[0][0]
assert "-p" in cmd
assert "hi" in cmd
assert sched._jobs[0]["last_status"] == "ok"
callback.assert_awaited_once_with("general", "response")
# ---------------------------------------------------------------------------
# Marker regex unit checks
# ---------------------------------------------------------------------------
class TestMarkerRegex:
@pytest.mark.parametrize("stdout,expected", [
("GSTACK-CRON: changes=0\n", 0),
("GSTACK-CRON: changes=1\n", 1),
("GSTACK-CRON: changes=42\n", 42),
("foo\nbar\nGSTACK-CRON: changes=7\nbaz\n", 7),
("GSTACK-CRON: changes=3", 3), # no trailing newline
])
def test_marker_matches(self, stdout, expected):
m = _MARKER_RE.search(stdout)
assert m is not None
assert int(m.group(1)) == expected
@pytest.mark.parametrize("stdout", [
"",
"hello world",
"gstack-cron: changes=5\n", # lowercase
"GSTACK-CRON:changes=5\n", # no space after colon
"GSTACK-CRON: changes=\n", # empty int
"GSTACK-CRON: changes=abc\n", # non-int
"prefix GSTACK-CRON: changes=5\n", # not at start of line
])
def test_marker_no_match(self, stdout):
assert _MARKER_RE.search(stdout) is None