From d741541e2386c26c108b837521a91145feb55e1a Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Tue, 21 Apr 2026 07:22:48 +0000 Subject: [PATCH] test(dashboard): cover constants, git helper, cron endpoint, files sandbox --- tests/test_dashboard_constants.py | 76 ++++++++++++ tests/test_dashboard_cron_endpoint.py | 170 ++++++++++++++++++++++++++ tests/test_dashboard_files_sandbox.py | 107 ++++++++++++++++ tests/test_dashboard_git.py | 133 ++++++++++++++++++++ 4 files changed, 486 insertions(+) create mode 100644 tests/test_dashboard_constants.py create mode 100644 tests/test_dashboard_cron_endpoint.py create mode 100644 tests/test_dashboard_files_sandbox.py create mode 100644 tests/test_dashboard_git.py diff --git a/tests/test_dashboard_constants.py b/tests/test_dashboard_constants.py new file mode 100644 index 0000000..e5c14a5 --- /dev/null +++ b/tests/test_dashboard_constants.py @@ -0,0 +1,76 @@ +"""Validate the dashboard `constants` module exposes the post-consolidation paths.""" +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +DASH = PROJECT_ROOT / "dashboard" + +# Make dashboard/ importable (same trick api.py does at runtime) +if str(DASH) not in sys.path: + sys.path.insert(0, str(DASH)) + + +@pytest.fixture(scope="module") +def constants(): + # Imported as a module, not via `from dashboard import constants`, + # because handlers use bare `import constants`. + import constants as _c # type: ignore + return _c + + +def test_base_dir_is_echo_core(constants): + assert constants.BASE_DIR == PROJECT_ROOT + assert constants.BASE_DIR.name == "echo-core" + + +def test_echo_core_dir_equals_base_dir(constants): + """Post-consolidation: ECHO_CORE_DIR and BASE_DIR point at the same place.""" + assert constants.ECHO_CORE_DIR == constants.BASE_DIR + + +def test_git_workspace_is_echo_core(constants): + """Legacy clawd workspace must be gone.""" + assert constants.GIT_WORKSPACE == constants.BASE_DIR + assert "clawd" not in str(constants.GIT_WORKSPACE) + + +def test_allowed_workspaces_do_not_include_clawd(constants): + for w in constants.ALLOWED_WORKSPACES: + assert "clawd" not in str(w), f"clawd leaked into ALLOWED_WORKSPACES: {w}" + + +def test_allowed_workspaces_include_echo_core_and_workspace(constants): + allowed = {str(p) for p in constants.ALLOWED_WORKSPACES} + assert str(constants.BASE_DIR) in allowed + assert str(constants.WORKSPACE_DIR) in allowed + + +def test_venv_python_is_dot_venv(constants): + """The dashboard must spawn the .venv python (not legacy `venv/`).""" + assert constants.VENV_PYTHON == constants.BASE_DIR / ".venv" / "bin" / "python3" + assert ".venv" in str(constants.VENV_PYTHON) + + +def test_notes_dir_points_at_in_repo_memory(constants): + """memory/ lives inside echo-core post-consolidation.""" + assert constants.NOTES_DIR == constants.BASE_DIR / "memory" / "kb" / "youtube" + + +def test_eco_services_list(constants): + assert "echo-core" in constants.ECO_SERVICES + assert "echo-whatsapp-bridge" in constants.ECO_SERVICES + assert "echo-taskboard" in constants.ECO_SERVICES + + +def test_echo_log_and_sessions_paths(constants): + assert constants.ECHO_LOG_FILE == constants.BASE_DIR / "logs" / "echo-core.log" + assert constants.ECHO_SESSIONS_FILE == constants.BASE_DIR / "sessions" / "active.json" + + +def test_habits_file_is_inside_dashboard(constants): + assert constants.HABITS_FILE == constants.KANBAN_DIR / "habits.json" + assert constants.KANBAN_DIR == constants.BASE_DIR / "dashboard" diff --git a/tests/test_dashboard_cron_endpoint.py b/tests/test_dashboard_cron_endpoint.py new file mode 100644 index 0000000..e9c0ebc --- /dev/null +++ b/tests/test_dashboard_cron_endpoint.py @@ -0,0 +1,170 @@ +"""Tests for the /api/cron endpoint (echo-core flat schema, no UTC→local conversion).""" +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +DASH = PROJECT_ROOT / "dashboard" + +if str(DASH) not in sys.path: + sys.path.insert(0, str(DASH)) + + +@pytest.fixture(scope="module") +def cron_module(): + from handlers import cron as _c # type: ignore + return _c + + +@pytest.fixture +def handler(cron_module): + class _Stub(cron_module.CronHandlers): + def __init__(self): + self.captured = None + self.captured_code = None + + def send_json(self, data, code=200): + self.captured = data + self.captured_code = code + + return _Stub() + + +def _write_jobs(tmp_path: Path, jobs: list) -> Path: + cron_dir = tmp_path / "cron" + cron_dir.mkdir(parents=True, exist_ok=True) + (cron_dir / "jobs.json").write_text(json.dumps(jobs), encoding="utf-8") + return tmp_path + + +def test_parse_cron_time_no_utc_conversion(cron_module): + """Echo-core cron strings are already Europe/Bucharest; no +3 shift.""" + # 10:00 Bucharest should stay 10:00 in the display. + assert cron_module._parse_cron_time("0 10 * * *") == "10:00" + assert cron_module._parse_cron_time("30 8 * * 1-5") == "08:30" + + +def test_parse_cron_time_handles_hour_range(cron_module): + """A cron like `0 9-17 * * *` should display the starting hour.""" + assert cron_module._parse_cron_time("0 9-17 * * *") == "09:00" + + +def test_parse_cron_time_falls_back_on_unexpected_expr(cron_module): + assert cron_module._parse_cron_time("@hourly") == "@hourly"[:15] + assert cron_module._parse_cron_time("bad") == "bad" + + +def test_iso_to_epoch_ms_handles_empty(cron_module): + assert cron_module._iso_to_epoch_ms("") == 0 + assert cron_module._iso_to_epoch_ms(None) == 0 + assert cron_module._iso_to_epoch_ms("not a date") == 0 + + +def test_iso_to_epoch_ms_parses_iso(cron_module): + # Well-known epoch + ms = cron_module._iso_to_epoch_ms("1970-01-01T00:00:00+00:00") + assert ms == 0 + + +def test_missing_jobs_file_returns_empty(tmp_path, monkeypatch, handler): + import constants # type: ignore + monkeypatch.setattr(constants, "BASE_DIR", tmp_path) + handler.handle_cron_status() + assert handler.captured["jobs"] == [] + assert "No jobs file" in handler.captured["error"] + + +def test_non_list_jobs_file_is_rejected(tmp_path, monkeypatch, handler): + import constants # type: ignore + cron_dir = tmp_path / "cron" + cron_dir.mkdir() + (cron_dir / "jobs.json").write_text('{"not": "a list"}', encoding="utf-8") + monkeypatch.setattr(constants, "BASE_DIR", tmp_path) + handler.handle_cron_status() + assert handler.captured["jobs"] == [] + assert "shape" in handler.captured["error"].lower() + + +def test_disabled_jobs_are_filtered_out(tmp_path, monkeypatch, handler): + import constants # type: ignore + _write_jobs(tmp_path, [ + {"name": "foo", "cron": "0 10 * * *", "enabled": True}, + {"name": "bar", "cron": "0 11 * * *", "enabled": False}, + ]) + monkeypatch.setattr(constants, "BASE_DIR", tmp_path) + handler.handle_cron_status() + names = [j["name"] for j in handler.captured["jobs"]] + assert names == ["foo"] + assert handler.captured["total"] == 1 + + +def test_frontend_shape_is_preserved(tmp_path, monkeypatch, handler): + """Output must carry: id, name, time, schedule, ranToday, lastStatus, + lastRunAtMs, nextRunAtMs.""" + import constants # type: ignore + _write_jobs(tmp_path, [ + { + "name": "anaf-monitor", + "cron": "0 10 * * 1-5", + "enabled": True, + "last_run": "2026-04-21T10:00:00+03:00", + "next_run": "2026-04-22T10:00:00+03:00", + "last_status": "ok", + }, + ]) + monkeypatch.setattr(constants, "BASE_DIR", tmp_path) + handler.handle_cron_status() + + jobs = handler.captured["jobs"] + assert len(jobs) == 1 + j = jobs[0] + for k in ("id", "name", "time", "schedule", "ranToday", + "lastStatus", "lastRunAtMs", "nextRunAtMs"): + assert k in j, f"missing key {k!r} in response" + # Echo-core has no separate id — fallback is name. + assert j["id"] == "anaf-monitor" + assert j["name"] == "anaf-monitor" + assert j["time"] == "10:00" + assert j["schedule"] == "0 10 * * 1-5" + assert j["lastRunAtMs"] > 0 + assert j["nextRunAtMs"] is not None and j["nextRunAtMs"] > 0 + + +def test_ran_today_is_based_on_last_run_ms(tmp_path, monkeypatch, handler): + """If last_run is in the past (before today 00:00), ranToday is False and lastStatus is None.""" + import constants # type: ignore + _write_jobs(tmp_path, [ + { + "name": "ancient", + "cron": "0 5 * * *", + "enabled": True, + "last_run": "2020-01-01T05:00:00+03:00", + "next_run": "2026-04-22T05:00:00+03:00", + "last_status": "ok", + }, + ]) + monkeypatch.setattr(constants, "BASE_DIR", tmp_path) + handler.handle_cron_status() + + j = handler.captured["jobs"][0] + assert j["ranToday"] is False + # lastStatus is only surfaced when ranToday — else it's None. + assert j["lastStatus"] is None + + +def test_jobs_are_sorted_by_display_time(tmp_path, monkeypatch, handler): + import constants # type: ignore + _write_jobs(tmp_path, [ + {"name": "late", "cron": "0 22 * * *", "enabled": True}, + {"name": "early", "cron": "0 5 * * *", "enabled": True}, + {"name": "mid", "cron": "0 12 * * *", "enabled": True}, + ]) + monkeypatch.setattr(constants, "BASE_DIR", tmp_path) + handler.handle_cron_status() + + names = [j["name"] for j in handler.captured["jobs"]] + assert names == ["early", "mid", "late"] diff --git a/tests/test_dashboard_files_sandbox.py b/tests/test_dashboard_files_sandbox.py new file mode 100644 index 0000000..986bcd6 --- /dev/null +++ b/tests/test_dashboard_files_sandbox.py @@ -0,0 +1,107 @@ +"""Tests for the /api/files sandbox — _resolve_sandboxed must block path traversal.""" +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +DASH = PROJECT_ROOT / "dashboard" + +if str(DASH) not in sys.path: + sys.path.insert(0, str(DASH)) + + +@pytest.fixture(scope="module") +def files_module(): + from handlers import files as _f # type: ignore + return _f + + +@pytest.fixture +def handler(files_module): + class _Stub(files_module.FilesHandlers): + def __init__(self): + self.captured = None + self.captured_code = None + + def send_json(self, data, code=200): + self.captured = data + self.captured_code = code + + return _Stub() + + +@pytest.fixture +def sandboxed(tmp_path, monkeypatch): + """Replace ALLOWED_WORKSPACES with a single tmp_path to isolate tests.""" + import constants # type: ignore + root = tmp_path / "repo" + root.mkdir() + monkeypatch.setattr(constants, "ALLOWED_WORKSPACES", [root]) + return root + + +def test_resolve_accepts_file_inside_workspace(handler, sandboxed): + (sandboxed / "notes.md").write_text("hello", encoding="utf-8") + target, workspace = handler._resolve_sandboxed("notes.md") + assert target is not None + assert target == (sandboxed / "notes.md").resolve() + assert workspace == sandboxed + + +def test_resolve_accepts_nested_file(handler, sandboxed): + (sandboxed / "sub").mkdir() + (sandboxed / "sub" / "file.txt").write_text("x", encoding="utf-8") + target, workspace = handler._resolve_sandboxed("sub/file.txt") + assert target == (sandboxed / "sub" / "file.txt").resolve() + assert workspace == sandboxed + + +def test_resolve_rejects_parent_traversal(handler, sandboxed): + """../etc/passwd-style requests must resolve OUTSIDE the workspace and + be refused by returning (None, None).""" + target, workspace = handler._resolve_sandboxed("../../../etc/passwd") + assert target is None + assert workspace is None + + +def test_resolve_rejects_absolute_escape(handler, sandboxed): + """Absolute path that lands outside the workspace must be refused. + + Note: Path('/base') / '/etc/passwd' == Path('/etc/passwd') because the + second path is absolute. The resolver must still refuse it. + """ + target, workspace = handler._resolve_sandboxed("/etc/passwd") + assert target is None, "absolute outside path must be refused" + assert workspace is None + + +def test_handle_files_get_returns_403_on_denied(handler, sandboxed): + """End-to-end: /api/files with a traversal path must 403.""" + # Simulate HTTP path parsing: the endpoint reads self.path + handler.path = "/api/files?path=../../../etc/passwd&action=list" + handler.handle_files_get() + assert handler.captured_code == 403 + assert handler.captured["error"] == "Access denied" + + +def test_handle_files_get_reads_a_file(handler, sandboxed): + (sandboxed / "hello.txt").write_text("hi there", encoding="utf-8") + handler.path = "/api/files?path=hello.txt&action=list" + handler.handle_files_get() + assert handler.captured_code == 200 or handler.captured_code is None + assert handler.captured["type"] == "file" + assert handler.captured["content"] == "hi there" + + +def test_handle_files_get_lists_a_dir(handler, sandboxed): + (sandboxed / "a.md").write_text("a", encoding="utf-8") + (sandboxed / "b.md").write_text("b", encoding="utf-8") + (sandboxed / "nested").mkdir() + handler.path = "/api/files?path=&action=list" + handler.handle_files_get() + assert handler.captured["type"] == "dir" + names = {item["name"] for item in handler.captured["items"]} + assert names == {"a.md", "b.md", "nested"} diff --git a/tests/test_dashboard_git.py b/tests/test_dashboard_git.py new file mode 100644 index 0000000..c675dae --- /dev/null +++ b/tests/test_dashboard_git.py @@ -0,0 +1,133 @@ +"""Unit tests for dashboard.handlers.git — _run_git helper + endpoint shapes.""" +from __future__ import annotations + +import subprocess +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +DASH = PROJECT_ROOT / "dashboard" + +if str(DASH) not in sys.path: + sys.path.insert(0, str(DASH)) + + +@pytest.fixture(scope="module") +def git_module(): + from handlers import git as _g # type: ignore + return _g + + +@pytest.fixture +def handler(git_module): + """A bare instance of GitHandlers with a captured send_json.""" + class _Stub(git_module.GitHandlers): + def __init__(self): + self.captured = None + self.captured_code = None + + def send_json(self, data, code=200): + self.captured = data + self.captured_code = code + + return _Stub() + + +def test_run_git_is_a_subprocess_call(handler, git_module, tmp_path): + """_run_git must use subprocess.run with the supplied cwd + timeout.""" + with patch.object(git_module.subprocess, "run") as mock_run: + mock_run.return_value = subprocess.CompletedProcess( + args=["git", "status"], returncode=0, stdout="clean\n", stderr="" + ) + result = handler._run_git(tmp_path, ["status"], timeout=3) + + mock_run.assert_called_once() + args, kwargs = mock_run.call_args + assert args[0] == ["git", "status"] + assert kwargs["cwd"] == str(tmp_path) + assert kwargs["timeout"] == 3 + assert kwargs["capture_output"] is True + assert kwargs["text"] is True + assert result.stdout == "clean\n" + + +def test_run_git_default_timeout_is_5(handler, git_module, tmp_path): + with patch.object(git_module.subprocess, "run") as mock_run: + mock_run.return_value = subprocess.CompletedProcess( + args=["git", "log"], returncode=0, stdout="", stderr="" + ) + handler._run_git(tmp_path, ["log"]) + _, kwargs = mock_run.call_args + assert kwargs["timeout"] == 5 + + +def test_legacy_git_commit_handler_is_gone(git_module): + """/api/git-commit was consolidated into /api/eco/git-commit.""" + assert not hasattr(git_module.GitHandlers, "handle_git_commit") + + +def test_git_status_uses_echo_core_workspace(handler, git_module): + """handle_git_status must target constants.GIT_WORKSPACE (echo-core), not clawd.""" + captured_workspaces = [] + + def fake_run(_self, workspace, args, timeout=5): + captured_workspaces.append(workspace) + stdout_map = { + ("branch", "--show-current"): "master\n", + ("log", "-1", "--format=%h|%s|%cr"): "abc1234|test commit|1 hour ago\n", + ("status", "--short"): "", + ("diff", "--stat", "--cached"): "", + ("diff", "--stat"): "", + } + return subprocess.CompletedProcess( + args=["git", *args], + returncode=0, + stdout=stdout_map.get(tuple(args), ""), + stderr="", + ) + + with patch.object(git_module.GitHandlers, "_run_git", fake_run): + handler.handle_git_status() + + import constants # type: ignore + assert all(w == constants.GIT_WORKSPACE for w in captured_workspaces) + assert handler.captured is not None + assert handler.captured["branch"] == "master" + + +def test_git_status_parses_uncommitted_paths(handler, git_module): + def fake_run(_self, workspace, args, timeout=5): + if args == ["status", "--short"]: + return subprocess.CompletedProcess( + args=["git"] + args, returncode=0, + stdout=" M src/foo.py\n?? new_file.txt\n", stderr="", + ) + if args[:1] == ["branch"]: + return subprocess.CompletedProcess( + args=["git"] + args, returncode=0, stdout="feat-x\n", stderr="", + ) + if args[:1] == ["log"]: + return subprocess.CompletedProcess( + args=["git"] + args, returncode=0, + stdout="deadbee|msg|2 minutes ago\n", stderr="", + ) + return subprocess.CompletedProcess( + args=["git"] + args, returncode=0, stdout="", stderr="", + ) + + with patch.object(git_module.GitHandlers, "_run_git", fake_run): + handler.handle_git_status() + + data = handler.captured + assert data is not None + assert data["uncommittedCount"] == 2 + assert data["clean"] is False + paths = [u["path"] for u in data["uncommittedParsed"]] + statuses = {u["status"] for u in data["uncommittedParsed"]} + assert "src/foo.py" in paths + assert "new_file.txt" in paths + assert "M" in statuses + assert "??" in statuses