test(dashboard): cover constants, git helper, cron endpoint, files sandbox

This commit is contained in:
2026-04-21 07:22:48 +00:00
parent fa7c0fd1c6
commit d741541e23
4 changed files with 486 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
"""Validate the dashboard `constants` module exposes the post-consolidation paths."""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DASH = PROJECT_ROOT / "dashboard"
# Make dashboard/ importable (same trick api.py does at runtime)
if str(DASH) not in sys.path:
sys.path.insert(0, str(DASH))
@pytest.fixture(scope="module")
def constants():
# Imported as a module, not via `from dashboard import constants`,
# because handlers use bare `import constants`.
import constants as _c # type: ignore
return _c
def test_base_dir_is_echo_core(constants):
assert constants.BASE_DIR == PROJECT_ROOT
assert constants.BASE_DIR.name == "echo-core"
def test_echo_core_dir_equals_base_dir(constants):
"""Post-consolidation: ECHO_CORE_DIR and BASE_DIR point at the same place."""
assert constants.ECHO_CORE_DIR == constants.BASE_DIR
def test_git_workspace_is_echo_core(constants):
"""Legacy clawd workspace must be gone."""
assert constants.GIT_WORKSPACE == constants.BASE_DIR
assert "clawd" not in str(constants.GIT_WORKSPACE)
def test_allowed_workspaces_do_not_include_clawd(constants):
for w in constants.ALLOWED_WORKSPACES:
assert "clawd" not in str(w), f"clawd leaked into ALLOWED_WORKSPACES: {w}"
def test_allowed_workspaces_include_echo_core_and_workspace(constants):
allowed = {str(p) for p in constants.ALLOWED_WORKSPACES}
assert str(constants.BASE_DIR) in allowed
assert str(constants.WORKSPACE_DIR) in allowed
def test_venv_python_is_dot_venv(constants):
"""The dashboard must spawn the .venv python (not legacy `venv/`)."""
assert constants.VENV_PYTHON == constants.BASE_DIR / ".venv" / "bin" / "python3"
assert ".venv" in str(constants.VENV_PYTHON)
def test_notes_dir_points_at_in_repo_memory(constants):
"""memory/ lives inside echo-core post-consolidation."""
assert constants.NOTES_DIR == constants.BASE_DIR / "memory" / "kb" / "youtube"
def test_eco_services_list(constants):
assert "echo-core" in constants.ECO_SERVICES
assert "echo-whatsapp-bridge" in constants.ECO_SERVICES
assert "echo-taskboard" in constants.ECO_SERVICES
def test_echo_log_and_sessions_paths(constants):
assert constants.ECHO_LOG_FILE == constants.BASE_DIR / "logs" / "echo-core.log"
assert constants.ECHO_SESSIONS_FILE == constants.BASE_DIR / "sessions" / "active.json"
def test_habits_file_is_inside_dashboard(constants):
assert constants.HABITS_FILE == constants.KANBAN_DIR / "habits.json"
assert constants.KANBAN_DIR == constants.BASE_DIR / "dashboard"

View File

@@ -0,0 +1,170 @@
"""Tests for the /api/cron endpoint (echo-core flat schema, no UTC→local conversion)."""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DASH = PROJECT_ROOT / "dashboard"
if str(DASH) not in sys.path:
sys.path.insert(0, str(DASH))
@pytest.fixture(scope="module")
def cron_module():
from handlers import cron as _c # type: ignore
return _c
@pytest.fixture
def handler(cron_module):
class _Stub(cron_module.CronHandlers):
def __init__(self):
self.captured = None
self.captured_code = None
def send_json(self, data, code=200):
self.captured = data
self.captured_code = code
return _Stub()
def _write_jobs(tmp_path: Path, jobs: list) -> Path:
cron_dir = tmp_path / "cron"
cron_dir.mkdir(parents=True, exist_ok=True)
(cron_dir / "jobs.json").write_text(json.dumps(jobs), encoding="utf-8")
return tmp_path
def test_parse_cron_time_no_utc_conversion(cron_module):
"""Echo-core cron strings are already Europe/Bucharest; no +3 shift."""
# 10:00 Bucharest should stay 10:00 in the display.
assert cron_module._parse_cron_time("0 10 * * *") == "10:00"
assert cron_module._parse_cron_time("30 8 * * 1-5") == "08:30"
def test_parse_cron_time_handles_hour_range(cron_module):
"""A cron like `0 9-17 * * *` should display the starting hour."""
assert cron_module._parse_cron_time("0 9-17 * * *") == "09:00"
def test_parse_cron_time_falls_back_on_unexpected_expr(cron_module):
assert cron_module._parse_cron_time("@hourly") == "@hourly"[:15]
assert cron_module._parse_cron_time("bad") == "bad"
def test_iso_to_epoch_ms_handles_empty(cron_module):
assert cron_module._iso_to_epoch_ms("") == 0
assert cron_module._iso_to_epoch_ms(None) == 0
assert cron_module._iso_to_epoch_ms("not a date") == 0
def test_iso_to_epoch_ms_parses_iso(cron_module):
# Well-known epoch
ms = cron_module._iso_to_epoch_ms("1970-01-01T00:00:00+00:00")
assert ms == 0
def test_missing_jobs_file_returns_empty(tmp_path, monkeypatch, handler):
import constants # type: ignore
monkeypatch.setattr(constants, "BASE_DIR", tmp_path)
handler.handle_cron_status()
assert handler.captured["jobs"] == []
assert "No jobs file" in handler.captured["error"]
def test_non_list_jobs_file_is_rejected(tmp_path, monkeypatch, handler):
import constants # type: ignore
cron_dir = tmp_path / "cron"
cron_dir.mkdir()
(cron_dir / "jobs.json").write_text('{"not": "a list"}', encoding="utf-8")
monkeypatch.setattr(constants, "BASE_DIR", tmp_path)
handler.handle_cron_status()
assert handler.captured["jobs"] == []
assert "shape" in handler.captured["error"].lower()
def test_disabled_jobs_are_filtered_out(tmp_path, monkeypatch, handler):
import constants # type: ignore
_write_jobs(tmp_path, [
{"name": "foo", "cron": "0 10 * * *", "enabled": True},
{"name": "bar", "cron": "0 11 * * *", "enabled": False},
])
monkeypatch.setattr(constants, "BASE_DIR", tmp_path)
handler.handle_cron_status()
names = [j["name"] for j in handler.captured["jobs"]]
assert names == ["foo"]
assert handler.captured["total"] == 1
def test_frontend_shape_is_preserved(tmp_path, monkeypatch, handler):
"""Output must carry: id, name, time, schedule, ranToday, lastStatus,
lastRunAtMs, nextRunAtMs."""
import constants # type: ignore
_write_jobs(tmp_path, [
{
"name": "anaf-monitor",
"cron": "0 10 * * 1-5",
"enabled": True,
"last_run": "2026-04-21T10:00:00+03:00",
"next_run": "2026-04-22T10:00:00+03:00",
"last_status": "ok",
},
])
monkeypatch.setattr(constants, "BASE_DIR", tmp_path)
handler.handle_cron_status()
jobs = handler.captured["jobs"]
assert len(jobs) == 1
j = jobs[0]
for k in ("id", "name", "time", "schedule", "ranToday",
"lastStatus", "lastRunAtMs", "nextRunAtMs"):
assert k in j, f"missing key {k!r} in response"
# Echo-core has no separate id — fallback is name.
assert j["id"] == "anaf-monitor"
assert j["name"] == "anaf-monitor"
assert j["time"] == "10:00"
assert j["schedule"] == "0 10 * * 1-5"
assert j["lastRunAtMs"] > 0
assert j["nextRunAtMs"] is not None and j["nextRunAtMs"] > 0
def test_ran_today_is_based_on_last_run_ms(tmp_path, monkeypatch, handler):
"""If last_run is in the past (before today 00:00), ranToday is False and lastStatus is None."""
import constants # type: ignore
_write_jobs(tmp_path, [
{
"name": "ancient",
"cron": "0 5 * * *",
"enabled": True,
"last_run": "2020-01-01T05:00:00+03:00",
"next_run": "2026-04-22T05:00:00+03:00",
"last_status": "ok",
},
])
monkeypatch.setattr(constants, "BASE_DIR", tmp_path)
handler.handle_cron_status()
j = handler.captured["jobs"][0]
assert j["ranToday"] is False
# lastStatus is only surfaced when ranToday — else it's None.
assert j["lastStatus"] is None
def test_jobs_are_sorted_by_display_time(tmp_path, monkeypatch, handler):
import constants # type: ignore
_write_jobs(tmp_path, [
{"name": "late", "cron": "0 22 * * *", "enabled": True},
{"name": "early", "cron": "0 5 * * *", "enabled": True},
{"name": "mid", "cron": "0 12 * * *", "enabled": True},
])
monkeypatch.setattr(constants, "BASE_DIR", tmp_path)
handler.handle_cron_status()
names = [j["name"] for j in handler.captured["jobs"]]
assert names == ["early", "mid", "late"]

View File

@@ -0,0 +1,107 @@
"""Tests for the /api/files sandbox — _resolve_sandboxed must block path traversal."""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DASH = PROJECT_ROOT / "dashboard"
if str(DASH) not in sys.path:
sys.path.insert(0, str(DASH))
@pytest.fixture(scope="module")
def files_module():
from handlers import files as _f # type: ignore
return _f
@pytest.fixture
def handler(files_module):
class _Stub(files_module.FilesHandlers):
def __init__(self):
self.captured = None
self.captured_code = None
def send_json(self, data, code=200):
self.captured = data
self.captured_code = code
return _Stub()
@pytest.fixture
def sandboxed(tmp_path, monkeypatch):
"""Replace ALLOWED_WORKSPACES with a single tmp_path to isolate tests."""
import constants # type: ignore
root = tmp_path / "repo"
root.mkdir()
monkeypatch.setattr(constants, "ALLOWED_WORKSPACES", [root])
return root
def test_resolve_accepts_file_inside_workspace(handler, sandboxed):
(sandboxed / "notes.md").write_text("hello", encoding="utf-8")
target, workspace = handler._resolve_sandboxed("notes.md")
assert target is not None
assert target == (sandboxed / "notes.md").resolve()
assert workspace == sandboxed
def test_resolve_accepts_nested_file(handler, sandboxed):
(sandboxed / "sub").mkdir()
(sandboxed / "sub" / "file.txt").write_text("x", encoding="utf-8")
target, workspace = handler._resolve_sandboxed("sub/file.txt")
assert target == (sandboxed / "sub" / "file.txt").resolve()
assert workspace == sandboxed
def test_resolve_rejects_parent_traversal(handler, sandboxed):
"""../etc/passwd-style requests must resolve OUTSIDE the workspace and
be refused by returning (None, None)."""
target, workspace = handler._resolve_sandboxed("../../../etc/passwd")
assert target is None
assert workspace is None
def test_resolve_rejects_absolute_escape(handler, sandboxed):
"""Absolute path that lands outside the workspace must be refused.
Note: Path('/base') / '/etc/passwd' == Path('/etc/passwd') because the
second path is absolute. The resolver must still refuse it.
"""
target, workspace = handler._resolve_sandboxed("/etc/passwd")
assert target is None, "absolute outside path must be refused"
assert workspace is None
def test_handle_files_get_returns_403_on_denied(handler, sandboxed):
"""End-to-end: /api/files with a traversal path must 403."""
# Simulate HTTP path parsing: the endpoint reads self.path
handler.path = "/api/files?path=../../../etc/passwd&action=list"
handler.handle_files_get()
assert handler.captured_code == 403
assert handler.captured["error"] == "Access denied"
def test_handle_files_get_reads_a_file(handler, sandboxed):
(sandboxed / "hello.txt").write_text("hi there", encoding="utf-8")
handler.path = "/api/files?path=hello.txt&action=list"
handler.handle_files_get()
assert handler.captured_code == 200 or handler.captured_code is None
assert handler.captured["type"] == "file"
assert handler.captured["content"] == "hi there"
def test_handle_files_get_lists_a_dir(handler, sandboxed):
(sandboxed / "a.md").write_text("a", encoding="utf-8")
(sandboxed / "b.md").write_text("b", encoding="utf-8")
(sandboxed / "nested").mkdir()
handler.path = "/api/files?path=&action=list"
handler.handle_files_get()
assert handler.captured["type"] == "dir"
names = {item["name"] for item in handler.captured["items"]}
assert names == {"a.md", "b.md", "nested"}

133
tests/test_dashboard_git.py Normal file
View File

@@ -0,0 +1,133 @@
"""Unit tests for dashboard.handlers.git — _run_git helper + endpoint shapes."""
from __future__ import annotations
import subprocess
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DASH = PROJECT_ROOT / "dashboard"
if str(DASH) not in sys.path:
sys.path.insert(0, str(DASH))
@pytest.fixture(scope="module")
def git_module():
from handlers import git as _g # type: ignore
return _g
@pytest.fixture
def handler(git_module):
"""A bare instance of GitHandlers with a captured send_json."""
class _Stub(git_module.GitHandlers):
def __init__(self):
self.captured = None
self.captured_code = None
def send_json(self, data, code=200):
self.captured = data
self.captured_code = code
return _Stub()
def test_run_git_is_a_subprocess_call(handler, git_module, tmp_path):
"""_run_git must use subprocess.run with the supplied cwd + timeout."""
with patch.object(git_module.subprocess, "run") as mock_run:
mock_run.return_value = subprocess.CompletedProcess(
args=["git", "status"], returncode=0, stdout="clean\n", stderr=""
)
result = handler._run_git(tmp_path, ["status"], timeout=3)
mock_run.assert_called_once()
args, kwargs = mock_run.call_args
assert args[0] == ["git", "status"]
assert kwargs["cwd"] == str(tmp_path)
assert kwargs["timeout"] == 3
assert kwargs["capture_output"] is True
assert kwargs["text"] is True
assert result.stdout == "clean\n"
def test_run_git_default_timeout_is_5(handler, git_module, tmp_path):
with patch.object(git_module.subprocess, "run") as mock_run:
mock_run.return_value = subprocess.CompletedProcess(
args=["git", "log"], returncode=0, stdout="", stderr=""
)
handler._run_git(tmp_path, ["log"])
_, kwargs = mock_run.call_args
assert kwargs["timeout"] == 5
def test_legacy_git_commit_handler_is_gone(git_module):
"""/api/git-commit was consolidated into /api/eco/git-commit."""
assert not hasattr(git_module.GitHandlers, "handle_git_commit")
def test_git_status_uses_echo_core_workspace(handler, git_module):
"""handle_git_status must target constants.GIT_WORKSPACE (echo-core), not clawd."""
captured_workspaces = []
def fake_run(_self, workspace, args, timeout=5):
captured_workspaces.append(workspace)
stdout_map = {
("branch", "--show-current"): "master\n",
("log", "-1", "--format=%h|%s|%cr"): "abc1234|test commit|1 hour ago\n",
("status", "--short"): "",
("diff", "--stat", "--cached"): "",
("diff", "--stat"): "",
}
return subprocess.CompletedProcess(
args=["git", *args],
returncode=0,
stdout=stdout_map.get(tuple(args), ""),
stderr="",
)
with patch.object(git_module.GitHandlers, "_run_git", fake_run):
handler.handle_git_status()
import constants # type: ignore
assert all(w == constants.GIT_WORKSPACE for w in captured_workspaces)
assert handler.captured is not None
assert handler.captured["branch"] == "master"
def test_git_status_parses_uncommitted_paths(handler, git_module):
def fake_run(_self, workspace, args, timeout=5):
if args == ["status", "--short"]:
return subprocess.CompletedProcess(
args=["git"] + args, returncode=0,
stdout=" M src/foo.py\n?? new_file.txt\n", stderr="",
)
if args[:1] == ["branch"]:
return subprocess.CompletedProcess(
args=["git"] + args, returncode=0, stdout="feat-x\n", stderr="",
)
if args[:1] == ["log"]:
return subprocess.CompletedProcess(
args=["git"] + args, returncode=0,
stdout="deadbee|msg|2 minutes ago\n", stderr="",
)
return subprocess.CompletedProcess(
args=["git"] + args, returncode=0, stdout="", stderr="",
)
with patch.object(git_module.GitHandlers, "_run_git", fake_run):
handler.handle_git_status()
data = handler.captured
assert data is not None
assert data["uncommittedCount"] == 2
assert data["clean"] is False
paths = [u["path"] for u in data["uncommittedParsed"]]
statuses = {u["status"] for u in data["uncommittedParsed"]}
assert "src/foo.py" in paths
assert "new_file.txt" in paths
assert "M" in statuses
assert "??" in statuses