diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index c9c3fcb..3090633 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -6,10 +6,11 @@ import subprocess from datetime import datetime, timezone from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch +from zoneinfo import ZoneInfo import pytest -from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE +from src.scheduler import Scheduler, JOBS_FILE, JOBS_DIR, _NAME_RE, _MARKER_RE # --------------------------------------------------------------------------- @@ -548,3 +549,411 @@ class TestFullLifecycle: # Remove assert sched.remove_job("lifecycle") is True assert sched.list_jobs() == [] + + +# --------------------------------------------------------------------------- +# Timezone +# --------------------------------------------------------------------------- + + +class TestTimezone: + def test_scheduler_uses_bucharest_tz(self, sched): + tz = sched._scheduler.timezone + # Support zoneinfo.ZoneInfo (py3.9+) and pytz-style tzinfo + key = getattr(tz, "key", None) or getattr(tz, "zone", None) or str(tz) + assert key == "Europe/Bucharest" + + +# --------------------------------------------------------------------------- +# Shell-kind validation (add_shell_job) +# --------------------------------------------------------------------------- + + +class TestShellKind: + def test_add_shell_job_creates_file(self, sched, tmp_jobs): + job = sched.add_shell_job( + "shelly", "0 9 * * *", "general", + ["/bin/echo", "hello"], + ) + assert job["name"] == "shelly" + assert job["kind"] == "shell" + assert job["command"] == ["/bin/echo", "hello"] + assert job["report_on"] == "changes" + assert job["timeout"] is None + assert job["enabled"] is True + assert tmp_jobs["file"].exists() + + data = json.loads(tmp_jobs["file"].read_text()) + assert len(data) == 1 + assert data[0]["kind"] == "shell" + assert data[0]["command"] == ["/bin/echo", "hello"] + + def test_add_shell_job_with_report_on_always(self, sched, tmp_jobs): + job = sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], report_on="always", + ) + assert job["report_on"] == "always" + + def test_add_shell_job_with_custom_timeout(self, sched, tmp_jobs): + job = sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], timeout=60, + ) + assert job["timeout"] == 60 + + def test_add_shell_job_duplicate_name_with_claude(self, sched, tmp_jobs): + sched.add_job("x", "0 * * * *", "ch", "prompt") + with pytest.raises(ValueError, match="already exists"): + sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/true"]) + + def test_add_shell_job_duplicate_name_with_shell(self, sched, tmp_jobs): + sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/true"]) + with pytest.raises(ValueError, match="already exists"): + sched.add_shell_job("x", "0 10 * * *", "ch", ["/bin/true"]) + + def test_add_shell_job_invalid_cron(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid cron"): + sched.add_shell_job("x", "not a cron", "ch", ["/bin/true"]) + + def test_add_shell_job_invalid_name(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid job name"): + sched.add_shell_job("BadName", "0 9 * * *", "ch", ["/bin/true"]) + + def test_add_shell_job_empty_command_list(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty list"): + sched.add_shell_job("x", "0 9 * * *", "ch", []) + + def test_add_shell_job_command_not_list(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty list"): + sched.add_shell_job("x", "0 9 * * *", "ch", "/bin/echo hi") # type: ignore[arg-type] + + def test_add_shell_job_command_with_empty_string(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty list"): + sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/echo", ""]) + + def test_add_shell_job_command_non_string_element(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty list"): + sched.add_shell_job("x", "0 9 * * *", "ch", ["/bin/echo", 1]) # type: ignore[list-item] + + def test_add_shell_job_bad_report_on(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="Invalid report_on"): + sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], report_on="maybe", + ) + + def test_add_shell_job_bad_timeout_zero(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="between 1 and 3600"): + sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], timeout=0, + ) + + def test_add_shell_job_bad_timeout_negative(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="between 1 and 3600"): + sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], timeout=-5, + ) + + def test_add_shell_job_bad_timeout_too_big(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="between 1 and 3600"): + sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], timeout=7200, + ) + + def test_add_shell_job_bad_timeout_not_int(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="must be an int"): + sched.add_shell_job( + "x", "0 9 * * *", "ch", ["/bin/true"], timeout="60", # type: ignore[arg-type] + ) + + def test_add_shell_job_empty_channel(self, sched, tmp_jobs): + with pytest.raises(ValueError, match="non-empty"): + sched.add_shell_job("x", "0 9 * * *", " ", ["/bin/true"]) + + +# --------------------------------------------------------------------------- +# Shell-kind execution +# --------------------------------------------------------------------------- + + +def _mock_proc(returncode=0, stdout="", stderr=""): + p = MagicMock() + p.returncode = returncode + p.stdout = stdout + p.stderr = stderr + return p + + +@pytest.fixture +def sched_with_shell_job(sched, tmp_jobs): + """Scheduler pre-loaded with a shell job 'shelly' (report_on=changes).""" + sched.add_shell_job( + "shelly", "0 9 * * *", "general", ["/bin/true"], report_on="changes", + ) + return sched + + +class TestShellExecute: + @pytest.mark.asyncio + async def test_shell_exit_zero_changes_marker_positive( + self, sched_with_shell_job, callback, + ): + stdout = "some body\nGSTACK-CRON: changes=3\nfooter\n" + with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")): + result = await sched_with_shell_job.run_job("shelly") + assert result == stdout[:1500] + assert sched_with_shell_job._jobs[0]["last_status"] == "ok" + callback.assert_awaited_once_with("general", stdout[:1500]) + + @pytest.mark.asyncio + async def test_shell_exit_zero_changes_marker_zero( + self, sched_with_shell_job, callback, + ): + stdout = "nothing here\nGSTACK-CRON: changes=0\n" + with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")): + result = await sched_with_shell_job.run_job("shelly") + assert result == "" + assert sched_with_shell_job._jobs[0]["last_status"] == "ok" + callback.assert_not_awaited() + + @pytest.mark.asyncio + async def test_shell_exit_zero_no_marker( + self, sched_with_shell_job, callback, caplog, + ): + stdout = "completely unrelated output\n" + import logging + caplog.set_level(logging.WARNING, logger="src.scheduler") + with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")): + result = await sched_with_shell_job.run_job("shelly") + assert result == "" + assert sched_with_shell_job._jobs[0]["last_status"] == "ok" + callback.assert_not_awaited() + assert any( + "missing GSTACK-CRON marker" in rec.message + for rec in caplog.records + ) + + @pytest.mark.asyncio + async def test_shell_exit_zero_report_on_always(self, sched, callback): + sched.add_shell_job( + "alw", "0 9 * * *", "general", ["/bin/true"], report_on="always", + ) + stdout = "hello world, no marker here" + with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")): + result = await sched.run_job("alw") + assert result == stdout + callback.assert_awaited_once_with("general", stdout) + + @pytest.mark.asyncio + async def test_shell_exit_zero_report_on_never(self, sched, callback): + sched.add_shell_job( + "nev", "0 9 * * *", "general", ["/bin/true"], report_on="never", + ) + stdout = "GSTACK-CRON: changes=99\nbig change" + with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")): + result = await sched.run_job("nev") + assert result == "" + callback.assert_not_awaited() + + @pytest.mark.asyncio + async def test_shell_exit_nonzero_always_reports( + self, sched_with_shell_job, callback, + ): + with patch("subprocess.run", return_value=_mock_proc(1, "", "ANAF 502")): + result = await sched_with_shell_job.run_job("shelly") + assert result == "[cron:shelly] exit 1: ANAF 502" + assert sched_with_shell_job._jobs[0]["last_status"] == "error" + callback.assert_awaited_once_with( + "general", "[cron:shelly] exit 1: ANAF 502", + ) + + @pytest.mark.asyncio + async def test_shell_exit_nonzero_never_still_reports( + self, sched, callback, + ): + """report_on=never must NOT suppress error reports.""" + sched.add_shell_job( + "nev", "0 9 * * *", "ch", ["/bin/false"], report_on="never", + ) + with patch("subprocess.run", return_value=_mock_proc(2, "", "boom")): + result = await sched.run_job("nev") + assert "exit 2" in result + assert "boom" in result + callback.assert_awaited_once() + + @pytest.mark.asyncio + async def test_shell_timeout_reports(self, sched_with_shell_job, callback): + with patch( + "subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd="x", timeout=300), + ): + result = await sched_with_shell_job.run_job("shelly") + assert "timed out after 300s" in result + assert sched_with_shell_job._jobs[0]["last_status"] == "error" + callback.assert_awaited_once() + + @pytest.mark.asyncio + async def test_shell_subprocess_launch_exception( + self, sched_with_shell_job, callback, + ): + with patch( + "subprocess.run", + side_effect=FileNotFoundError("no such binary"), + ): + result = await sched_with_shell_job.run_job("shelly") + assert "Error" in result + assert "no such binary" in result + assert sched_with_shell_job._jobs[0]["last_status"] == "error" + callback.assert_awaited_once() + + @pytest.mark.asyncio + async def test_shell_respects_per_job_timeout(self, sched, callback): + sched.add_shell_job( + "t", "0 9 * * *", "ch", ["/bin/true"], timeout=60, + ) + stdout = "GSTACK-CRON: changes=1\nok\n" + with patch( + "subprocess.run", return_value=_mock_proc(0, stdout, ""), + ) as mock_run: + await sched.run_job("t") + assert mock_run.call_args.kwargs["timeout"] == 60 + + @pytest.mark.asyncio + async def test_shell_default_timeout_when_none( + self, sched_with_shell_job, callback, + ): + """timeout=None in job dict falls back to JOB_TIMEOUT (300).""" + stdout = "GSTACK-CRON: changes=1\n" + with patch( + "subprocess.run", return_value=_mock_proc(0, stdout, ""), + ) as mock_run: + await sched_with_shell_job.run_job("shelly") + assert mock_run.call_args.kwargs["timeout"] == 300 + + @pytest.mark.asyncio + async def test_shell_executes_command_list( + self, sched_with_shell_job, callback, + ): + """subprocess.run is invoked with the job's command list verbatim.""" + stdout = "GSTACK-CRON: changes=1\n" + with patch( + "subprocess.run", return_value=_mock_proc(0, stdout, ""), + ) as mock_run: + await sched_with_shell_job.run_job("shelly") + cmd = mock_run.call_args.args[0] + assert cmd == ["/bin/true"] + + @pytest.mark.asyncio + async def test_shell_marker_anywhere_in_output(self, sched, callback): + """Marker can appear on any line, not just last.""" + sched.add_shell_job( + "m", "0 9 * * *", "ch", ["/bin/true"], report_on="changes", + ) + stdout = "GSTACK-CRON: changes=5\nbody follows\nmore body\n" + with patch("subprocess.run", return_value=_mock_proc(0, stdout, "")): + result = await sched.run_job("m") + assert result == stdout + callback.assert_awaited_once() + + @pytest.mark.asyncio + async def test_shell_stderr_trimmed_to_500( + self, sched_with_shell_job, callback, + ): + long_stderr = "E" * 2000 + with patch( + "subprocess.run", return_value=_mock_proc(1, "", long_stderr), + ): + result = await sched_with_shell_job.run_job("shelly") + # 500 chars of stderr max + assert "E" * 500 in result + assert "E" * 501 not in result + + @pytest.mark.asyncio + async def test_shell_stdout_trimmed_to_1500(self, sched, callback): + sched.add_shell_job( + "big", "0 9 * * *", "ch", ["/bin/true"], report_on="always", + ) + big_stdout = "x" * 5000 + with patch("subprocess.run", return_value=_mock_proc(0, big_stdout, "")): + result = await sched.run_job("big") + assert len(result) == 1500 + + +# --------------------------------------------------------------------------- +# Backward compatibility — jobs without kind field default to "claude" +# --------------------------------------------------------------------------- + + +class TestBackwardCompat: + def test_legacy_job_without_kind_defaults_to_claude(self, sched, tmp_jobs): + """A jobs.json written before kind existed must still dispatch to claude.""" + legacy = [_sample_job(name="legacy")] + # Explicitly ensure no 'kind' key is present + assert "kind" not in legacy[0] + tmp_jobs["file"].write_text(json.dumps(legacy)) + sched._jobs = sched._load_jobs() + + assert sched._jobs[0].get("kind") is None # still legacy shape on disk + # Dispatch: ensure we go through the claude path, NOT the shell path + with patch.object( + sched, "_execute_claude_job", new=AsyncMock(return_value="ok") + ) as claude, patch.object( + sched, "_execute_shell_job", new=AsyncMock(return_value="never") + ) as shell: + asyncio.get_event_loop() # keep lints happy; actual run below + asyncio.run(sched._execute_job(sched._jobs[0])) + claude.assert_awaited_once() + shell.assert_not_awaited() + + @pytest.mark.asyncio + async def test_claude_jobs_still_work_unchanged( + self, sched, tmp_jobs, callback, + ): + """Existing Claude job pattern: add_job + run_job path unchanged.""" + sched.add_job("claude-j", "0 * * * *", "general", "hi") + + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = json.dumps({"result": "response"}) + mock_proc.stderr = "" + + with patch("src.scheduler.build_system_prompt", return_value="sys"), \ + patch("subprocess.run", return_value=mock_proc) as mock_run: + result = await sched.run_job("claude-j") + + assert result == "response" + # First positional arg is the CLI command list; confirm it targets CLAUDE_BIN + cmd = mock_run.call_args[0][0] + assert "-p" in cmd + assert "hi" in cmd + assert sched._jobs[0]["last_status"] == "ok" + callback.assert_awaited_once_with("general", "response") + + +# --------------------------------------------------------------------------- +# Marker regex unit checks +# --------------------------------------------------------------------------- + + +class TestMarkerRegex: + @pytest.mark.parametrize("stdout,expected", [ + ("GSTACK-CRON: changes=0\n", 0), + ("GSTACK-CRON: changes=1\n", 1), + ("GSTACK-CRON: changes=42\n", 42), + ("foo\nbar\nGSTACK-CRON: changes=7\nbaz\n", 7), + ("GSTACK-CRON: changes=3", 3), # no trailing newline + ]) + def test_marker_matches(self, stdout, expected): + m = _MARKER_RE.search(stdout) + assert m is not None + assert int(m.group(1)) == expected + + @pytest.mark.parametrize("stdout", [ + "", + "hello world", + "gstack-cron: changes=5\n", # lowercase + "GSTACK-CRON:changes=5\n", # no space after colon + "GSTACK-CRON: changes=\n", # empty int + "GSTACK-CRON: changes=abc\n", # non-int + "prefix GSTACK-CRON: changes=5\n", # not at start of line + ]) + def test_marker_no_match(self, stdout): + assert _MARKER_RE.search(stdout) is None