"""Tests for the unified /api/projects/* endpoints + auth + concurrency.
Covers (Lane C2 — Tasks #1–#29):
- T#1 unified status merges workspace + approved
- T#2 /propose validation + 201
- T#3 /approve
- T#4 /unapprove
- T#5 /cancel
- T#7 signature mtime cache
- T#8–T#12, T#19 planning endpoints
- T#13 legacy /ralph.html → /echo/workspace.html redirect
- T#14 cookie-required POST + SSE GET
- T#15 wrong-cookie 401
- T#16 flock serializes concurrent writes
- T#23 _derive_status table
- T#24 router planning unaffected by jsonlock
- T#25 lock timeout surfaces (LockTimeoutError → 503)
- T#26 transcript endpoint returns raw markdown (DOMPurify is client-side)
- T#27 If-Match version mismatch → 409
- T#28 cross-process write_locked serialization
- T#29 login/logout flow
"""
from __future__ import annotations
import io
import json
import multiprocessing
import os
import sys
import threading
import time
from pathlib import Path
from unittest.mock import patch
import pytest
# Two levels up from this file: the parent of the directory that contains it.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Directory put on sys.path so the ``handlers`` / ``constants`` imports used
# by the fixtures below can resolve — presumably those modules live here.
DASH = PROJECT_ROOT / "dashboard"
# Guarded so the entry is only prepended when it is not already present.
if str(DASH) not in sys.path:
    sys.path.insert(0, str(DASH))
# ── shared stub handler ──────────────────────────────────────────────
class _Headers(dict):
    """Header mapping whose ``.get()`` ignores key case, like http.server's.

    Only ``get`` is case-insensitive; item access and assignment keep the
    ordinary ``dict`` behaviour.
    """

    def get(self, key, default=None):
        """Return the value stored under the first key matching *key*.

        Keys are compared after lower-casing both sides. *default* is
        returned when no stored key matches.
        """
        # Lazy generator: stops at the first hit, in insertion order.
        hits = (self[name] for name in self if name.lower() == key.lower())
        return next(hits, default)
@pytest.fixture
def projects_module():
    """Provide the ``handlers.projects`` module.

    The import is done inside the fixture, i.e. only when a test requests it.
    """
    from handlers import projects  # type: ignore
    return projects
@pytest.fixture
def auth_module():
    """Provide the ``handlers.auth`` module.

    The import is done inside the fixture, i.e. only when a test requests it.
    """
    from handlers import auth  # type: ignore
    return auth
@pytest.fixture
def stub(projects_module, auth_module, tmp_path, monkeypatch):
    """Return a fake request handler combining ProjectsHandlers + AuthHandlers.

    ``APPROVED_TASKS_FILE`` and ``WORKSPACE_DIR`` are redirected into
    ``tmp_path``. The returned object records what the handlers send, both
    through ``send_json`` / ``send_error`` and through the raw
    ``send_response`` / ``send_header`` / ``end_headers`` sequence.
    """
    import constants  # type: ignore

    # Redirect on-disk state into the per-test temporary directory.
    workspace_root = tmp_path / "workspace"
    workspace_root.mkdir()
    monkeypatch.setattr(
        projects_module, "APPROVED_TASKS_FILE", tmp_path / "approved-tasks.json"
    )
    monkeypatch.setattr(constants, "WORKSPACE_DIR", workspace_root)

    # Every test starts with an empty signature cache.
    projects_module._SIG_CACHE.update(git_mtime=None, signature=None, ts=0.0)

    # Known token via the environment; the module-level value is reset to
    # None — presumably so the auth module re-reads the env var (confirm).
    monkeypatch.setenv("DASHBOARD_TOKEN", "test-token")
    monkeypatch.setattr(auth_module, "_DASHBOARD_TOKEN", None)

    class _Stub(projects_module.ProjectsHandlers, auth_module.AuthHandlers):
        """Handler double: request attributes in, captured responses out."""

        def __init__(self):
            # Request side.
            self.path = "/api/projects"
            self.command = "GET"
            self.headers = _Headers()
            self.rfile = io.BytesIO(b"")
            self.wfile = io.BytesIO()
            # Last payload / status seen by send_json or send_error.
            self.captured = None
            self.captured_code = None
            # Raw response state, filled by handlers that write status and
            # headers themselves (login, logout, the SSE stream).
            self.response_code = None
            self.response_headers: list[tuple[str, str]] = []
            self.response_ended = False
            # Chronological log of every response, whichever API produced it.
            self.responses: list[dict] = []

        # JSON response API — used by most endpoints.
        def send_json(self, data, code=200):
            self.captured, self.captured_code = data, code
            # Also logged as a raw response so codes can be checked uniformly.
            self.responses.append(
                {"code": code, "headers": [], "body": json.dumps(data).encode()}
            )

        def send_error(self, code, message=None):  # pragma: no cover — fallthrough
            self.captured = {"error_code": code, "message": message}
            self.captured_code = code
            self.responses.append({"code": code, "headers": [], "body": b""})

        # Raw response API — used by manual flows.
        def send_response(self, code):
            # A new status line discards headers from any earlier response.
            self.response_code = code
            self.response_headers = []
            self.response_ended = False

        def send_header(self, name, value):
            self.response_headers.append((name, value))

        def end_headers(self):
            self.response_ended = True
            # Copy the header list so later mutation cannot alter the log.
            snapshot = {
                "code": self.response_code,
                "headers": list(self.response_headers),
                "body": b"",
            }
            self.responses.append(snapshot)

        # Helpers for tests.
        def set_body(self, payload):
            """Install *payload* as the request body and set Content-Length.

            dict/list payloads are JSON-encoded (and Content-Type is set to
            JSON), str payloads are encoded, anything else goes through
            ``bytes()`` with falsy values treated as an empty body.
            """
            if isinstance(payload, str):
                raw = payload.encode()
            elif isinstance(payload, (dict, list)):
                raw = json.dumps(payload).encode()
                self.headers["Content-Type"] = "application/json"
            else:
                raw = bytes(payload or b"")
            self.headers["Content-Length"] = str(len(raw))
            self.rfile = io.BytesIO(raw)

        def set_cookie(self, value):
            """Set the request Cookie header to ``dashboard=<value>``."""
            self.headers["Cookie"] = f"dashboard={value}"

    return _Stub()
# ─────────────────────────────────────────────────────────────────────
# T#14 / T#15 / T#29 — auth (cookie + login/logout)
# ─────────────────────────────────────────────────────────────────────
class TestAuth:
    """T#14 / T#15 / T#29 — cookie check plus the login / logout handlers."""

    @staticmethod
    def _submit_login(stub, content_type, body):
        """Load *body* into the stub as a login request and run handle_login."""
        stub.headers["Content-Type"] = content_type
        stub.headers["Content-Length"] = str(len(body))
        stub.rfile = io.BytesIO(body)
        stub.handle_login()

    # ── cookie check ───────────────────────────────────────────────
    def test_no_cookie_check_returns_false(self, stub):
        # _check_dashboard_cookie is what the do_POST middleware uses.
        assert stub._check_dashboard_cookie() is False

    def test_wrong_cookie_check_returns_false(self, stub):
        stub.set_cookie("not-the-token")
        assert stub._check_dashboard_cookie() is False

    def test_correct_cookie_check_returns_true(self, stub):
        stub.set_cookie("test-token")
        assert stub._check_dashboard_cookie() is True

    # ── login flow ─────────────────────────────────────────────────
    def test_login_sets_cookie(self, stub):
        self._submit_login(
            stub, "application/x-www-form-urlencoded", b"token=test-token"
        )
        assert stub.response_code == 302
        sent = dict(stub.response_headers)
        assert "Set-Cookie" in sent
        assert "dashboard=test-token" in sent["Set-Cookie"]
        assert sent.get("Location") == "/echo/workspace.html"

    def test_login_wrong_token_returns_401(self, stub):
        self._submit_login(stub, "application/x-www-form-urlencoded", b"token=wrong")
        assert stub.response_code == 401

    def test_login_accepts_json_body(self, stub):
        payload = json.dumps({"token": "test-token"}).encode()
        self._submit_login(stub, "application/json", payload)
        assert stub.response_code == 302

    # ── logout flow ────────────────────────────────────────────────
    def test_logout_clears_cookie(self, stub):
        stub.handle_logout()
        assert stub.response_code == 200
        sent = dict(stub.response_headers)
        assert "Set-Cookie" in sent
        # Max-Age=0 in the Set-Cookie value is what the test treats as "cleared".
        assert "Max-Age=0" in sent["Set-Cookie"]
# ─────────────────────────────────────────────────────────────────────
# T#1 — unified status (workspace + approved-tasks merge)
# ─────────────────────────────────────────────────────────────────────
def _seed_approved(stub, projects_module, projects):
    """Store *projects* (with ``version`` set to 1) through ``_write_approved``.

    *stub* is accepted for call-site symmetry but is not used here.
    """
    def _install(doc):
        # Overwrite both keys on the document handed in by the writer.
        doc.update(projects=projects, version=1)
        return doc

    projects_module._write_approved(_install)
def _make_workspace(stub, projects_module, slug):
    """Create the directory ``constants.WORKSPACE_DIR / slug`` if missing.

    *stub* and *projects_module* are accepted but not used here.
    """
    import constants  # type: ignore

    project_dir = constants.WORKSPACE_DIR / slug
    project_dir.mkdir(parents=True, exist_ok=True)
class TestUnifiedStatus:
    """T#1 — unified status merges workspace directories with approved tasks."""

    def test_unified_status_merges_workspace_and_approved(self, stub, projects_module):
        # Lifecycle fields that are unset (None) for both seeded entries.
        unset = dict.fromkeys((
            "proposed_at", "approved_at", "started_at",
            "pid", "planning_session_id", "final_plan_path",
        ))
        # "alpha" has a workspace directory and an approved entry; "ghost"
        # exists only in the approved-tasks file.
        _make_workspace(stub, projects_module, "alpha")
        _seed_approved(stub, projects_module, [
            {"name": "alpha", "description": "the alpha project",
             "status": "approved", **unset},
            {"name": "ghost", "description": "no workspace yet",
             "status": "pending", **unset},
        ])

        stub.handle_unified_status()

        assert stub.captured_code == 200
        payload = stub.captured
        assert "version" in payload
        found = sorted(entry["slug"] for entry in payload["projects"])
        assert found == ["alpha", "ghost"]
        assert payload["count"] == 2
        assert "fetchedAt" in payload
# ─────────────────────────────────────────────────────────────────────
# T#23 — _derive_status table
# ─────────────────────────────────────────────────────────────────────
class TestDeriveStatus:
    """T#23 — ``_derive_status`` results for approved-entry / PRD combinations."""

    # Each row: (approved-tasks entry, PRD document, expected status).
    _TABLE = [
        (None, None, "idle"),
        ({"status": "pending"}, None, "pending"),
        ({"status": "approved"}, None, "approved"),
        ({"status": "planning"}, None, "planning"),
        ({"status": "failed"}, None, "failed"),
        (None, {"userStories": [{"passes": True}]}, "complete"),
        (None, {"userStories": [{"passes": False, "blocked": True}]}, "blocked"),
        (None, {"userStories": [{"passes": False, "failed": True}]}, "failed"),
        (None, {"userStories": [{"passes": True}, {"passes": False}]}, "idle"),
    ]

    @staticmethod
    def _pretend_pid_alive(monkeypatch, projects_module, cmdline):
        """Make the PID probe report a live process running *cmdline*."""
        monkeypatch.setattr(
            projects_module,
            "_pid_alive_with_cmdline",
            lambda pid: (True, cmdline),
        )

    @pytest.mark.parametrize("approved,prd,expected", _TABLE)
    def test_table(self, stub, approved, prd, expected, monkeypatch, projects_module):
        # The PID probe always reports "not alive" so it cannot affect the row.
        def _never_alive(pid):
            return (False, "")

        monkeypatch.setattr(projects_module, "_pid_alive_with_cmdline", _never_alive)
        result = stub._derive_status("slug", approved, None, prd)
        assert result == expected, f"approved={approved} prd={prd}"

    def test_running_ralph_wins_over_manual(self, stub, projects_module, monkeypatch):
        """When PID is alive and cmdline contains ralph.sh → running-ralph."""
        self._pretend_pid_alive(
            monkeypatch, projects_module, "/bin/bash tools/ralph/ralph.sh demo"
        )
        entry = {"pid": 12345, "status": "approved"}
        assert stub._derive_status("demo", entry, None, None) == "running-ralph"

    def test_running_manual_when_pid_alive_no_ralph(self, stub, projects_module, monkeypatch):
        """A live PID whose cmdline lacks ralph.sh → running-manual."""
        self._pretend_pid_alive(
            monkeypatch, projects_module, "/usr/bin/python3 some_script.py"
        )
        entry = {"pid": 12345, "status": "approved"}
        assert stub._derive_status("demo", entry, None, None) == "running-manual"
# ─────────────────────────────────────────────────────────────────────
# T#2 — /propose
# ─────────────────────────────────────────────────────────────────────
class TestPropose:
    """T#2 — ``/api/projects/propose`` validation and creation."""

    def _post(self, stub, payload):
        """Turn the stub into a POST to the propose endpoint carrying *payload*."""
        stub.path = "/api/projects/propose"
        stub.command = "POST"
        stub.set_body(payload)

    def test_propose_valid_creates_pending_entry(self, stub, projects_module):
        self._post(stub, {"slug": "new-proj", "description": "a brand new project"})
        stub.handle_propose()
        assert stub.captured_code == 201
        assert stub.captured["slug"] == "new-proj"
        assert stub.captured["status"] == "pending"
        # Verify on disk
        data = projects_module._read_approved()
        assert any(p["name"] == "new-proj" for p in data["projects"])

    def test_propose_duplicate_slug_returns_409(self, stub, projects_module):
        self._post(stub, {"slug": "dup-proj", "description": "first time around"})
        stub.handle_propose()
        # Precondition: the first propose must have been accepted, otherwise
        # the 409 below would not prove it came from the duplicate slug.
        assert stub.captured_code == 201
        # Second propose with same slug
        self._post(stub, {"slug": "dup-proj", "description": "second time around"})
        stub.handle_propose()
        assert stub.captured_code == 409

    def test_propose_invalid_slug_returns_400(self, stub):
        self._post(stub, {"slug": "AB", "description": "too short uppercase slug"})
        stub.handle_propose()
        assert stub.captured_code == 400
        assert stub.captured.get("error") == "validation_failed"

    def test_propose_short_description_returns_400(self, stub):
        self._post(stub, {"slug": "good-slug", "description": "short"})
        stub.handle_propose()
        assert stub.captured_code == 400
        # The error payload names the offending field.
        assert "description" in stub.captured.get("fields", {})
# ─────────────────────────────────────────────────────────────────────
# T#3 / T#4 / T#5 / T#27 — approve / unapprove / cancel + If-Match
# ─────────────────────────────────────────────────────────────────────
class TestStatusMutators:
    """T#3 / T#4 / T#5 / T#27 — approve, unapprove, cancel and If-Match."""

    def _seed(self, stub, projects_module, slug, status="pending"):
        """Seed the approved-tasks file with one entry named *slug*."""
        entry = {
            "name": slug, "description": "x" * 12, "status": status,
            "proposed_at": None, "approved_at": None, "started_at": None,
            "pid": None, "planning_session_id": None, "final_plan_path": None,
        }
        _seed_approved(stub, projects_module, [entry])

    def _post(self, stub, payload, version=None):
        """Prepare a POST with *payload*; send If-Match when *version* is given."""
        if version is not None:
            stub.headers["If-Match"] = str(version)
        stub.command = "POST"
        stub.set_body(payload)

    def test_approve_pending_returns_200(self, stub, projects_module):
        self._seed(stub, projects_module, "to-approve")
        self._post(stub, {"slug": "to-approve"})
        stub.handle_approve()
        assert stub.captured_code == 200
        assert stub.captured["status"] == "approved"

    def test_unapprove_approved_returns_200(self, stub, projects_module):
        self._seed(stub, projects_module, "to-flip", status="approved")
        self._post(stub, {"slug": "to-flip"})
        stub.handle_unapprove()
        assert stub.captured_code == 200
        assert stub.captured["status"] == "pending"

    def test_cancel_returns_200(self, stub, projects_module):
        self._seed(stub, projects_module, "kill-me")
        self._post(stub, {"slug": "kill-me"})
        stub.handle_cancel()
        assert stub.captured_code == 200
        assert stub.captured["status"] == "cancelled"

    def test_action_on_changed_version_returns_409(self, stub, projects_module):
        self._seed(stub, projects_module, "stale-target")
        # Remember the version a client would have seen at this point...
        stale_version = projects_module._get_version_from(
            projects_module._read_approved()
        )

        # ...then move the file on with an unrelated write.
        def _add_noise(doc):
            doc["projects"].append({
                "name": "noise", "description": "y" * 12, "status": "pending",
            })
            projects_module._bump_version(doc)
            return doc

        projects_module._write_approved(_add_noise)

        # Approving with the outdated version must be rejected as stale.
        self._post(stub, {"slug": "stale-target"}, version=stale_version)
        stub.handle_approve()
        assert stub.captured_code == 409
        assert stub.captured.get("error") == "stale"

    def test_invalid_slug_returns_400(self, stub):
        self._post(stub, {"slug": "AA"})
        stub.handle_approve()
        assert stub.captured_code == 400
# ─────────────────────────────────────────────────────────────────────
# T#7 — signature stability + mtime cache
# ─────────────────────────────────────────────────────────────────────
class TestSignature:
    """T#7 — project signature changes and the git-mtime cache."""

    def test_signature_changes_when_project_added(self, stub, projects_module):
        sig1 = projects_module._compute_signature()
        _make_workspace(stub, projects_module, "newone")
        sig2 = projects_module._compute_signature()
        assert sig1 != sig2

    def test_signature_mtime_cache_skips_git(self, stub, projects_module, monkeypatch):
        """Same `.git/index` mtime → cached porcelain output (subprocess not called twice)."""
        # Pin the mtime so cache hits.
        monkeypatch.setattr(projects_module, "_git_index_mtime", lambda: 1234.0)
        calls = []

        def _fake_run(*args, **kwargs):
            # Record the call and report a successful run with empty output.
            calls.append(args)

            class _R:
                returncode = 0
                stdout = ""

            return _R()

        monkeypatch.setattr(projects_module.subprocess, "run", _fake_run)
        sig1 = projects_module._compute_signature()
        sig2 = projects_module._compute_signature()
        assert sig1 == sig2
        # Only the FIRST call should have invoked git status.
        assert len(calls) == 1
# ─────────────────────────────────────────────────────────────────────
# T#8 – T#12 / T#19 — planning endpoints
# ─────────────────────────────────────────────────────────────────────
class _FakeSession:
    """Minimal session object carrying ``planning_session_id`` and ``phase``."""

    def __init__(self, planning_session_id="ps-1", phase="/office-hours"):
        self.planning_session_id, self.phase = planning_session_id, phase
class TestPlanningEndpoints:
    """T#8–T#12 / T#19 — planning endpoints with orchestrator/session faked."""

    @staticmethod
    def _planning_entry(slug):
        """Approved-tasks entry for *slug* in ``planning`` state, session ps-1."""
        return {
            "name": slug, "description": "x" * 12, "status": "planning",
            "proposed_at": None, "approved_at": None, "started_at": None,
            "pid": None, "planning_session_id": "ps-1", "final_plan_path": None,
        }

    def test_plan_start_sets_planning_status(self, stub, projects_module, monkeypatch):
        from src import planning_orchestrator as po

        # Mock the orchestrator so we don't hit Claude CLI.
        def _fake_start(**kw):
            return (_FakeSession(planning_session_id="ps-1"), "first message")

        monkeypatch.setattr(po.PlanningOrchestrator, "start", _fake_start)
        stub.command = "POST"
        stub.set_body({"description": "a fresh planning project"})
        stub.handle_plan_start("plan-target")
        assert stub.captured_code == 200
        # The entry written to disk is in "planning" state with the session id.
        stored = projects_module._read_approved()
        entry = next(p for p in stored["projects"] if p["name"] == "plan-target")
        assert entry["status"] == "planning"
        assert entry["planning_session_id"] == "ps-1"

    def test_plan_respond_returns_message(self, stub, monkeypatch):
        from src import planning_orchestrator as po

        def _fake_respond(**kw):
            return (_FakeSession(phase="/office-hours"), "second message", False)

        monkeypatch.setattr(po.PlanningOrchestrator, "respond", _fake_respond)
        stub.command = "POST"
        stub.set_body({"message": "hello"})
        stub.handle_plan_respond("plan-target")
        assert stub.captured_code == 200
        assert stub.captured["message"] == "second message"

    def test_plan_respond_no_active_session_returns_404(self, stub, monkeypatch):
        from src import planning_orchestrator as po

        # The faked orchestrator hands back no session object.
        def _fake_respond(**kw):
            return (None, "no session", False)

        monkeypatch.setattr(po.PlanningOrchestrator, "respond", _fake_respond)
        stub.command = "POST"
        stub.set_body({"message": "hello"})
        stub.handle_plan_respond("missing")
        assert stub.captured_code == 404

    def test_plan_state_returns_active(self, stub, monkeypatch):
        from src import planning_session as ps

        # State exists only for the "active-slug" channel.
        def _fake_state(adapter, channel):
            if channel != "active-slug":
                return None
            return {
                "phase": "/office-hours",
                "phases_planned": ["/office-hours"],
                "phases_completed": [],
                "session_id": "s",
                "planning_session_id": "ps-1",
            }

        monkeypatch.setattr(ps, "get_planning_state", _fake_state)
        stub.handle_plan_state("active-slug")
        assert stub.captured_code == 200
        assert stub.captured["status"] == "active"
        assert stub.captured["phase"] == "/office-hours"

    def test_plan_finalize_sets_approved(self, stub, projects_module, monkeypatch):
        # Seed an approved-tasks entry that is currently in "planning" state.
        _seed_approved(stub, projects_module, [self._planning_entry("fin-target")])
        from src import planning_session as ps
        from src import planning_orchestrator as po

        def _fake_state(adapter, channel):
            return {"final_plan_path": "/tmp/fin.md"}

        def _fake_clear(*a, **kw):
            return True

        def _fake_plan_path(slug):
            return Path("/tmp/fin.md")

        monkeypatch.setattr(ps, "get_planning_state", _fake_state)
        monkeypatch.setattr(ps, "clear_planning_state", _fake_clear)
        monkeypatch.setattr(po.PlanningOrchestrator, "final_plan_path", _fake_plan_path)
        stub.command = "POST"
        stub.set_body({})
        stub.handle_plan_finalize("fin-target")
        assert stub.captured_code == 200
        assert stub.captured["status"] == "approved"
        stored = projects_module._read_approved()
        entry = next(p for p in stored["projects"] if p["name"] == "fin-target")
        assert entry["status"] == "approved"
        assert entry["final_plan_path"]

    def test_plan_cancel_sets_pending(self, stub, projects_module, monkeypatch):
        _seed_approved(stub, projects_module, [self._planning_entry("cancel-target")])
        from src import planning_orchestrator as po

        def _fake_cancel(*a, **kw):
            return True

        monkeypatch.setattr(po.PlanningOrchestrator, "cancel", _fake_cancel)
        stub.command = "POST"
        stub.set_body({})
        stub.handle_plan_cancel_planning("cancel-target")
        assert stub.captured_code == 200
        assert stub.captured["status"] == "pending"
        stored = projects_module._read_approved()
        entry = next(p for p in stored["projects"] if p["name"] == "cancel-target")
        assert entry["status"] == "pending"

    def test_plan_respond_slow_propagates_session_id(self, stub, monkeypatch):
        """T#19 surrogate — slow planning still returns the message + phase."""
        from src import planning_orchestrator as po

        def _slow_respond(**kw):
            # Short delay standing in for a slow planning turn.
            time.sleep(0.05)
            return (_FakeSession(phase="/office-hours"), "delayed", False)

        monkeypatch.setattr(po.PlanningOrchestrator, "respond", _slow_respond)
        stub.command = "POST"
        stub.set_body({"message": "yo"})
        stub.handle_plan_respond("slow-target")
        assert stub.captured_code == 200
        assert stub.captured["message"] == "delayed"
# ─────────────────────────────────────────────────────────────────────
# T#14 ext — SSE auth
# ─────────────────────────────────────────────────────────────────────
class TestSseAuth:
    """T#14 ext — the SSE stream endpoint rejects requests without a cookie."""

    def test_sse_stream_requires_cookie(self, stub):
        # The stub starts with no Cookie header; the handler is expected to
        # answer 401 through the raw response API.
        stub.command = "GET"
        stub.path = "/api/projects/stream"
        stub.handle_projects_stream()
        assert stub.response_code == 401
# ─────────────────────────────────────────────────────────────────────
# T#26 — markdown pipeline (server returns raw, not pre-rendered)
# ─────────────────────────────────────────────────────────────────────
class TestMarkdownPipeline:
def test_planning_response_marked_dompurify_dom_pipeline(
self, stub, projects_module, monkeypatch, tmp_path,
):
"""The transcript endpoint must return raw markdown (no HTML rendering).
Client-side DOMPurify is the only sanitiser; the server should be a
dumb passthrough.
"""
import constants # type: ignore
slug = "xss-test"
# Drop a final-plan.md with a script tag — must come back verbatim.
plan_dir = constants.WORKSPACE_DIR / slug / "scripts" / "ralph"
plan_dir.mkdir(parents=True)
raw_md = "# Plan\n\n\n- item one"
(plan_dir / "final-plan.md").write_text(raw_md, encoding="utf-8")
from src import planning_session as ps
monkeypatch.setattr(
ps, "get_planning_state",
lambda adapter, channel: {
"phase": "__complete__",
"phases_completed": ["/office-hours"],
"last_text_excerpt": "",
},
)
stub.handle_plan_transcript(slug)
assert stub.captured_code == 200
assert stub.captured["final_plan"] == raw_md
# The excerpt is also passed through raw — DOMPurify lives in the page.
assert "