Files
echo-core/tests/test_dashboard_projects_endpoint.py
Marius Mutu 5e930ade02 feat(dashboard): unified workspace hub — cookie auth, 9-state projects, planning chat
Merges workspace.html + ralph.html into a single unified project hub with:
- Cookie-based auth (DASHBOARD_TOKEN, HttpOnly, SameSite=Strict)
- 9-state project badge system (running-ralph/manual, planning, approved,
  pending, blocked, failed, complete, idle) with BUTTONS_FOR_STATE matrix
- SSE realtime + polling fallback, version-based optimistic concurrency (If-Match)
- Planning chat modal (phase stepper, markdown bubbles, 50s+ wait state, auto-resume)
- Propose modal (Variant B: inline Plan-with-Echo checkbox)
- 5-type toast taxonomy (success/info/warning/busy/error, 3px colored left-bar)
- Inter font self-hosted + shared tokens.css design system + DESIGN.md
- src/jsonlock.py (flock helper, sidecar .lock for stable inode)
- src/approved_tasks_cli.py (shell-safe wrapper for cron/ralph.sh)
- 55 new tests (T#1–T#30) + real jsonlock bug fix caught by T#16/T#28
- No emoji anywhere (enforced by test_dashboard_no_emoji.py)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 07:26:19 +00:00

812 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for the unified /api/projects/* endpoints + auth + concurrency.
Covers (Lane C2 — Tasks #1–#29):
- T#1 unified status merges workspace + approved
- T#2 /propose validation + 201
- T#3 /approve
- T#4 /unapprove
- T#5 /cancel
- T#7 signature mtime cache
- T#8–T#12, T#19 planning endpoints
- T#13 legacy /ralph.html → /echo/workspace.html redirect
- T#14 cookie-required POST + SSE GET
- T#15 wrong-cookie 401
- T#16 flock serializes concurrent writes
- T#23 _derive_status table
- T#24 router planning unaffected by jsonlock
- T#25 lock timeout surfaces (LockTimeoutError → 503)
- T#26 transcript endpoint returns raw markdown (DOMPurify is client-side)
- T#27 If-Match version mismatch → 409
- T#28 cross-process write_locked serialization
- T#29 login/logout flow
"""
from __future__ import annotations
import io
import json
import multiprocessing
import os
import sys
import threading
import time
from pathlib import Path
from unittest.mock import patch
import pytest
# This file lives in <repo>/tests/, so the repository root is two parents up.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DASH = PROJECT_ROOT / "dashboard"
# Make dashboard/ importable (added once) so `handlers` and `constants`
# resolve as top-level modules in the fixtures below.
if str(DASH) not in sys.path:
    sys.path.insert(0, str(DASH))
# ── shared stub handler ──────────────────────────────────────────────
class _Headers(dict):
"""Mimic http.server's headers — case-insensitive .get()."""
def get(self, key, default=None):
for k in self:
if k.lower() == key.lower():
return self[k]
return default
@pytest.fixture
def projects_module():
    """Provide the dashboard's ``handlers.projects`` module.

    Imported inside the fixture so the import only happens when a test
    actually requests it.
    """
    from handlers import projects  # type: ignore

    return projects
@pytest.fixture
def auth_module():
    """Provide the dashboard's ``handlers.auth`` module.

    Imported inside the fixture so the import only happens when a test
    actually requests it.
    """
    from handlers import auth  # type: ignore

    return auth
@pytest.fixture
def stub(projects_module, auth_module, tmp_path, monkeypatch):
    """Return a fake request handler that mixes ProjectsHandlers + AuthHandlers.

    Before building the handler the fixture:
    - points APPROVED_TASKS_FILE and WORKSPACE_DIR at locations under tmp_path,
    - resets the projects module's signature cache,
    - pins DASHBOARD_TOKEN to "test-token".

    The handler never touches a socket. ``send_json`` / ``send_error`` store
    their arguments in ``captured`` / ``captured_code``; the raw
    send_response / send_header / end_headers calls are recorded in
    ``response_code`` / ``response_headers``. Every finished response is also
    appended to ``responses``.
    """
    import constants  # type: ignore

    # Keep all on-disk state inside the per-test temp directory.
    tasks_path = tmp_path / "approved-tasks.json"
    sandbox = tmp_path / "workspace"
    sandbox.mkdir()
    monkeypatch.setattr(projects_module, "APPROVED_TASKS_FILE", tasks_path)
    monkeypatch.setattr(constants, "WORKSPACE_DIR", sandbox)

    # Cold signature cache for every test.
    projects_module._SIG_CACHE.update({"git_mtime": None, "signature": None, "ts": 0.0})

    # Known token => deterministic cookie checks.
    monkeypatch.setenv("DASHBOARD_TOKEN", "test-token")
    # NOTE(review): presumably clears a token cached by the auth module so the
    # env var above is picked up — confirm against handlers/auth.py.
    monkeypatch.setattr(auth_module, "_DASHBOARD_TOKEN", None)

    class _Stub(projects_module.ProjectsHandlers, auth_module.AuthHandlers):
        """In-memory stand-in for the HTTP request handler."""

        def __init__(self):
            # Request side.
            self.captured = None
            self.captured_code = None
            self.path = "/api/projects"
            self.command = "GET"
            self.headers = _Headers()
            self.rfile = io.BytesIO(b"")
            self.wfile = io.BytesIO()
            # Raw response capture for handlers that emit headers + body by
            # hand (handle_login / handle_logout / handle_projects_stream).
            self.response_code = None
            self.response_headers: list[tuple[str, str]] = []
            self.response_ended = False
            self.responses: list[dict] = []

        # -- JSON helpers used by most endpoints ---------------------

        def send_json(self, data, code=200):
            """Record a JSON response instead of writing it out."""
            self.captured = data
            self.captured_code = code
            # Also log it as a raw response so auth tests can inspect codes
            # the same way for both styles of handler.
            body = json.dumps(data).encode()
            self.responses.append({"code": code, "headers": [], "body": body})

        def send_error(self, code, message=None):  # pragma: no cover — fallthrough
            """Record an error response (empty body)."""
            self.captured = {"error_code": code, "message": message}
            self.captured_code = code
            self.responses.append({"code": code, "headers": [], "body": b""})

        # -- raw response API (login, SSE, ...) ----------------------

        def send_response(self, code):
            """Begin a new raw response, discarding previously staged headers."""
            self.response_code = code
            self.response_headers = []
            self.response_ended = False

        def send_header(self, name, value):
            """Stage one header for the current raw response."""
            self.response_headers.append((name, value))

        def end_headers(self):
            """Mark the header section finished and log the response."""
            self.response_ended = True
            snapshot = {
                "code": self.response_code,
                "headers": list(self.response_headers),
                "body": b"",
            }
            self.responses.append(snapshot)

        # -- helpers for tests ---------------------------------------

        def set_body(self, payload):
            """Load *payload* as the request body and set Content-Length.

            dict/list payloads are JSON-encoded (and Content-Type is set to
            application/json), str payloads are encoded as-is, anything else
            goes through ``bytes()`` with falsy values becoming an empty body.
            """
            if isinstance(payload, str):
                blob = payload.encode()
            elif isinstance(payload, (dict, list)):
                blob = json.dumps(payload).encode()
                self.headers["Content-Type"] = "application/json"
            else:
                blob = bytes(payload or b"")
            self.headers["Content-Length"] = str(len(blob))
            self.rfile = io.BytesIO(blob)

        def set_cookie(self, value):
            """Attach a ``dashboard=<value>`` Cookie header to the request."""
            self.headers["Cookie"] = f"dashboard={value}"

    return _Stub()
# ─────────────────────────────────────────────────────────────────────
# T#14 / T#15 / T#29 — auth (cookie + login/logout)
# ─────────────────────────────────────────────────────────────────────
class TestAuth:
    """Cookie validation plus the login / logout handlers (T#14, T#15, T#29)."""

    @staticmethod
    def _submit_login(stub, content_type, body):
        """Feed *body* to handle_login with the given Content-Type."""
        stub.headers["Content-Type"] = content_type
        stub.headers["Content-Length"] = str(len(body))
        stub.rfile = io.BytesIO(body)
        stub.handle_login()

    # ── cookie check ───────────────────────────────────────────────

    def test_no_cookie_check_returns_false(self, stub):
        # The do_POST middleware relies on _check_dashboard_cookie.
        assert stub._check_dashboard_cookie() is False

    def test_wrong_cookie_check_returns_false(self, stub):
        stub.set_cookie("not-the-token")
        assert stub._check_dashboard_cookie() is False

    def test_correct_cookie_check_returns_true(self, stub):
        stub.set_cookie("test-token")
        assert stub._check_dashboard_cookie() is True

    # ── login flow ─────────────────────────────────────────────────

    def test_login_sets_cookie(self, stub):
        self._submit_login(
            stub, "application/x-www-form-urlencoded", b"token=test-token",
        )
        assert stub.response_code == 302
        sent = dict(stub.response_headers)
        assert "Set-Cookie" in sent
        assert "dashboard=test-token" in sent["Set-Cookie"]
        assert sent.get("Location") == "/echo/workspace.html"

    def test_login_wrong_token_returns_401(self, stub):
        self._submit_login(
            stub, "application/x-www-form-urlencoded", b"token=wrong",
        )
        assert stub.response_code == 401

    def test_login_accepts_json_body(self, stub):
        payload = json.dumps({"token": "test-token"}).encode()
        self._submit_login(stub, "application/json", payload)
        assert stub.response_code == 302

    # ── logout ─────────────────────────────────────────────────────

    def test_logout_clears_cookie(self, stub):
        stub.handle_logout()
        assert stub.response_code == 200
        sent = dict(stub.response_headers)
        assert "Set-Cookie" in sent
        # Max-Age=0 is how the cookie gets expired.
        assert "Max-Age=0" in sent["Set-Cookie"]
# ─────────────────────────────────────────────────────────────────────
# T#1 — unified status (workspace + approved-tasks merge)
# ─────────────────────────────────────────────────────────────────────
def _seed_approved(stub, projects_module, projects):
    """Store *projects* in approved-tasks.json with ``version`` forced to 1.

    Goes through ``projects_module._write_approved`` rather than writing the
    file directly. *stub* is accepted but not used.
    """
    def _install(doc):
        doc["projects"] = projects
        doc["version"] = 1
        return doc

    projects_module._write_approved(_install)
def _make_workspace(stub, projects_module, slug):
    """Create the directory ``WORKSPACE_DIR/<slug>`` if it is not there yet.

    *stub* and *projects_module* are accepted but not used.
    """
    import constants  # type: ignore

    project_dir = constants.WORKSPACE_DIR / slug
    project_dir.mkdir(parents=True, exist_ok=True)
class TestUnifiedStatus:
    """T#1 — the unified status response merges workspace dirs + approved tasks."""

    def test_unified_status_merges_workspace_and_approved(self, stub, projects_module):
        def _entry(name, description, status):
            # Approved-tasks record with every optional field left empty.
            return {
                "name": name,
                "description": description,
                "status": status,
                "proposed_at": None,
                "approved_at": None,
                "started_at": None,
                "pid": None,
                "planning_session_id": None,
                "final_plan_path": None,
            }

        # "alpha" has a workspace directory; "ghost" exists only in the file.
        _make_workspace(stub, projects_module, "alpha")
        _seed_approved(stub, projects_module, [
            _entry("alpha", "the alpha project", "approved"),
            _entry("ghost", "no workspace yet", "pending"),
        ])

        stub.handle_unified_status()

        assert stub.captured_code == 200
        payload = stub.captured
        assert "version" in payload
        found = sorted(project["slug"] for project in payload["projects"])
        assert found == ["alpha", "ghost"]
        assert payload["count"] == 2
        assert "fetchedAt" in payload
# ─────────────────────────────────────────────────────────────────────
# T#23 — _derive_status table
# ─────────────────────────────────────────────────────────────────────
class TestDeriveStatus:
    """T#23 — table-driven checks of ``_derive_status``."""

    @pytest.mark.parametrize(
        "approved,prd,expected",
        [
            # No approved entry and no PRD.
            (None, None, "idle"),
            # Approved-entry status is reported as-is.
            ({"status": "pending"}, None, "pending"),
            ({"status": "approved"}, None, "approved"),
            ({"status": "planning"}, None, "planning"),
            ({"status": "failed"}, None, "failed"),
            # PRD user stories only.
            (None, {"userStories": [{"passes": True}]}, "complete"),
            (None, {"userStories": [{"passes": False, "blocked": True}]}, "blocked"),
            (None, {"userStories": [{"passes": False, "failed": True}]}, "failed"),
            (None, {"userStories": [{"passes": True}, {"passes": False}]}, "idle"),
        ],
    )
    def test_table(self, stub, approved, prd, expected, monkeypatch, projects_module):
        # Report every PID as dead so the running-* branches stay out of it.
        monkeypatch.setattr(
            projects_module, "_pid_alive_with_cmdline", lambda pid: (False, ""),
        )
        derived = stub._derive_status("slug", approved, None, prd)
        assert derived == expected, f"approved={approved} prd={prd}"

    def test_running_ralph_wins_over_manual(self, stub, projects_module, monkeypatch):
        """Live PID whose cmdline contains ralph.sh → running-ralph."""
        def _alive_ralph(pid):
            return (True, "/bin/bash tools/ralph/ralph.sh demo")

        monkeypatch.setattr(projects_module, "_pid_alive_with_cmdline", _alive_ralph)
        entry = {"pid": 12345, "status": "approved"}
        assert stub._derive_status("demo", entry, None, None) == "running-ralph"

    def test_running_manual_when_pid_alive_no_ralph(self, stub, projects_module, monkeypatch):
        """Live PID whose cmdline does not mention ralph.sh → running-manual."""
        def _alive_other(pid):
            return (True, "/usr/bin/python3 some_script.py")

        monkeypatch.setattr(projects_module, "_pid_alive_with_cmdline", _alive_other)
        entry = {"pid": 12345, "status": "approved"}
        assert stub._derive_status("demo", entry, None, None) == "running-manual"
# ─────────────────────────────────────────────────────────────────────
# T#2 — /propose
# ─────────────────────────────────────────────────────────────────────
class TestPropose:
    """T#2 — validation and persistence behaviour of ``handle_propose``."""

    def _post(self, stub, payload):
        """Prepare *stub* as a POST to /api/projects/propose carrying *payload*."""
        stub.path = "/api/projects/propose"
        stub.command = "POST"
        stub.set_body(payload)

    def test_propose_valid_creates_pending_entry(self, stub, projects_module):
        self._post(stub, {"slug": "new-proj", "description": "a brand new project"})
        stub.handle_propose()

        assert stub.captured_code == 201
        assert stub.captured["slug"] == "new-proj"
        assert stub.captured["status"] == "pending"
        # The entry must also have been persisted.
        stored = projects_module._read_approved()
        assert any(entry["name"] == "new-proj" for entry in stored["projects"])

    def test_propose_duplicate_slug_returns_409(self, stub, projects_module):
        self._post(stub, {"slug": "dup-proj", "description": "first time around"})
        stub.handle_propose()
        # Proposing the same slug again has to be rejected.
        self._post(stub, {"slug": "dup-proj", "description": "second time around"})
        stub.handle_propose()

        assert stub.captured_code == 409

    def test_propose_invalid_slug_returns_400(self, stub):
        self._post(stub, {"slug": "AB", "description": "too short uppercase slug"})
        stub.handle_propose()

        assert stub.captured_code == 400
        assert stub.captured.get("error") == "validation_failed"

    def test_propose_short_description_returns_400(self, stub):
        self._post(stub, {"slug": "good-slug", "description": "short"})
        stub.handle_propose()

        assert stub.captured_code == 400
        assert "description" in stub.captured.get("fields", {})
# ─────────────────────────────────────────────────────────────────────
# T#3 / T#4 / T#5 / T#27 — approve / unapprove / cancel + If-Match
# ─────────────────────────────────────────────────────────────────────
class TestStatusMutators:
    """T#3 / T#4 / T#5 / T#27 — approve, unapprove, cancel and If-Match."""

    def _seed(self, stub, projects_module, slug, status="pending"):
        """Seed approved-tasks.json with a single project named *slug*."""
        entry = {
            "name": slug,
            "description": "x" * 12,
            "status": status,
            "proposed_at": None,
            "approved_at": None,
            "started_at": None,
            "pid": None,
            "planning_session_id": None,
            "final_plan_path": None,
        }
        _seed_approved(stub, projects_module, [entry])

    def _post(self, stub, payload, version=None):
        """Prepare a POST with *payload*; *version* (if given) goes in If-Match."""
        stub.command = "POST"
        if version is not None:
            stub.headers["If-Match"] = str(version)
        stub.set_body(payload)

    def test_approve_pending_returns_200(self, stub, projects_module):
        self._seed(stub, projects_module, "to-approve")
        self._post(stub, {"slug": "to-approve"})
        stub.handle_approve()

        assert stub.captured_code == 200
        assert stub.captured["status"] == "approved"

    def test_unapprove_approved_returns_200(self, stub, projects_module):
        self._seed(stub, projects_module, "to-flip", status="approved")
        self._post(stub, {"slug": "to-flip"})
        stub.handle_unapprove()

        assert stub.captured_code == 200
        assert stub.captured["status"] == "pending"

    def test_cancel_returns_200(self, stub, projects_module):
        self._seed(stub, projects_module, "kill-me")
        self._post(stub, {"slug": "kill-me"})
        stub.handle_cancel()

        assert stub.captured_code == 200
        assert stub.captured["status"] == "cancelled"

    def test_action_on_changed_version_returns_409(self, stub, projects_module):
        self._seed(stub, projects_module, "stale-target")
        # Remember the version a client would have seen ...
        seen_version = projects_module._get_version_from(projects_module._read_approved())

        # ... then let an unrelated write move the file past it.
        def _unrelated_write(doc):
            doc["projects"].append({
                "name": "noise", "description": "y" * 12, "status": "pending",
            })
            projects_module._bump_version(doc)
            return doc

        projects_module._write_approved(_unrelated_write)

        # Approving with the *stale* version must be refused with 409.
        self._post(stub, {"slug": "stale-target"}, version=seen_version)
        stub.handle_approve()

        assert stub.captured_code == 409
        assert stub.captured.get("error") == "stale"

    def test_invalid_slug_returns_400(self, stub):
        self._post(stub, {"slug": "AA"})
        stub.handle_approve()

        assert stub.captured_code == 400
# ─────────────────────────────────────────────────────────────────────
# T#7 — signature stability + mtime cache
# ─────────────────────────────────────────────────────────────────────
class TestSignature:
    """T#7 — signature changes with the workspace and caches git output."""

    def test_signature_changes_when_project_added(self, stub, projects_module):
        """Adding a workspace directory must produce a different signature."""
        sig1 = projects_module._compute_signature()
        _make_workspace(stub, projects_module, "newone")
        sig2 = projects_module._compute_signature()
        assert sig1 != sig2

    def test_signature_mtime_cache_skips_git(self, stub, projects_module, monkeypatch):
        """Same `.git/index` mtime → cached porcelain output (subprocess not called twice)."""
        # Pin the mtime so the second call is a cache hit.
        monkeypatch.setattr(projects_module, "_git_index_mtime", lambda: 1234.0)

        calls = []

        def _fake_run(*args, **kwargs):
            # Record the invocation and pretend git succeeded with no output.
            calls.append(args)

            class _R:
                returncode = 0
                stdout = ""

            return _R()

        # monkeypatch restores the real subprocess.run on teardown, so no
        # manual reference to it needs to be kept.
        monkeypatch.setattr(projects_module.subprocess, "run", _fake_run)

        sig1 = projects_module._compute_signature()
        sig2 = projects_module._compute_signature()

        assert sig1 == sig2
        # Only the FIRST call should have invoked git status.
        assert len(calls) == 1
# ─────────────────────────────────────────────────────────────────────
# T#8–T#12 / T#19 — planning endpoints
# ─────────────────────────────────────────────────────────────────────
class _FakeSession:
def __init__(self, planning_session_id="ps-1", phase="/office-hours"):
self.planning_session_id = planning_session_id
self.phase = phase
class TestPlanningEndpoints:
    """Planning endpoints, with the orchestrator / session layer monkeypatched."""

    @staticmethod
    def _planning_entry(slug):
        """Approved-tasks record for *slug* in "planning" state, session "ps-1"."""
        return {
            "name": slug,
            "description": "x" * 12,
            "status": "planning",
            "proposed_at": None,
            "approved_at": None,
            "started_at": None,
            "pid": None,
            "planning_session_id": "ps-1",
            "final_plan_path": None,
        }

    def test_plan_start_sets_planning_status(self, stub, projects_module, monkeypatch):
        # Replace the orchestrator so the Claude CLI is never invoked.
        from src import planning_orchestrator as po

        monkeypatch.setattr(
            po.PlanningOrchestrator,
            "start",
            lambda **kw: (_FakeSession(planning_session_id="ps-1"), "first message"),
        )
        stub.command = "POST"
        stub.set_body({"description": "a fresh planning project"})
        stub.handle_plan_start("plan-target")

        assert stub.captured_code == 200
        # The stored entry must now be in "planning" state.
        stored = projects_module._read_approved()
        entry = next(p for p in stored["projects"] if p["name"] == "plan-target")
        assert entry["status"] == "planning"
        assert entry["planning_session_id"] == "ps-1"

    def test_plan_respond_returns_message(self, stub, monkeypatch):
        from src import planning_orchestrator as po

        monkeypatch.setattr(
            po.PlanningOrchestrator,
            "respond",
            lambda **kw: (_FakeSession(phase="/office-hours"), "second message", False),
        )
        stub.command = "POST"
        stub.set_body({"message": "hello"})
        stub.handle_plan_respond("plan-target")

        assert stub.captured_code == 200
        assert stub.captured["message"] == "second message"

    def test_plan_respond_no_active_session_returns_404(self, stub, monkeypatch):
        from src import planning_orchestrator as po

        # A None session from the orchestrator stands for "nothing active".
        monkeypatch.setattr(
            po.PlanningOrchestrator,
            "respond",
            lambda **kw: (None, "no session", False),
        )
        stub.command = "POST"
        stub.set_body({"message": "hello"})
        stub.handle_plan_respond("missing")

        assert stub.captured_code == 404

    def test_plan_state_returns_active(self, stub, monkeypatch):
        from src import planning_session as ps

        def _state_for(adapter, channel):
            # Only "active-slug" has planning state.
            if channel != "active-slug":
                return None
            return {
                "phase": "/office-hours",
                "phases_planned": ["/office-hours"],
                "phases_completed": [],
                "session_id": "s",
                "planning_session_id": "ps-1",
            }

        monkeypatch.setattr(ps, "get_planning_state", _state_for)
        stub.handle_plan_state("active-slug")

        assert stub.captured_code == 200
        assert stub.captured["status"] == "active"
        assert stub.captured["phase"] == "/office-hours"

    def test_plan_finalize_sets_approved(self, stub, projects_module, monkeypatch):
        # Start from an entry that is currently in planning.
        _seed_approved(stub, projects_module, [self._planning_entry("fin-target")])

        from src import planning_session as ps
        from src import planning_orchestrator as po

        monkeypatch.setattr(
            ps,
            "get_planning_state",
            lambda adapter, channel: {"final_plan_path": "/tmp/fin.md"},
        )
        monkeypatch.setattr(ps, "clear_planning_state", lambda *a, **kw: True)
        monkeypatch.setattr(
            po.PlanningOrchestrator,
            "final_plan_path",
            lambda slug: Path("/tmp/fin.md"),
        )
        stub.command = "POST"
        stub.set_body({})
        stub.handle_plan_finalize("fin-target")

        assert stub.captured_code == 200
        assert stub.captured["status"] == "approved"
        stored = projects_module._read_approved()
        entry = next(p for p in stored["projects"] if p["name"] == "fin-target")
        assert entry["status"] == "approved"
        assert entry["final_plan_path"]

    def test_plan_cancel_sets_pending(self, stub, projects_module, monkeypatch):
        _seed_approved(stub, projects_module, [self._planning_entry("cancel-target")])

        from src import planning_orchestrator as po

        monkeypatch.setattr(po.PlanningOrchestrator, "cancel", lambda *a, **kw: True)
        stub.command = "POST"
        stub.set_body({})
        stub.handle_plan_cancel_planning("cancel-target")

        assert stub.captured_code == 200
        assert stub.captured["status"] == "pending"
        stored = projects_module._read_approved()
        entry = next(p for p in stored["projects"] if p["name"] == "cancel-target")
        assert entry["status"] == "pending"

    def test_plan_respond_slow_propagates_session_id(self, stub, monkeypatch):
        """T#19 surrogate — slow planning still returns the message + phase."""
        from src import planning_orchestrator as po

        def _slow_respond(**kw):
            # Short delay standing in for a slow planning turn.
            time.sleep(0.05)
            return (_FakeSession(phase="/office-hours"), "delayed", False)

        monkeypatch.setattr(po.PlanningOrchestrator, "respond", _slow_respond)
        stub.command = "POST"
        stub.set_body({"message": "yo"})
        stub.handle_plan_respond("slow-target")

        assert stub.captured_code == 200
        assert stub.captured["message"] == "delayed"
# ─────────────────────────────────────────────────────────────────────
# T#14 ext — SSE auth
# ─────────────────────────────────────────────────────────────────────
class TestSseAuth:
    """T#14 ext — the SSE stream endpoint rejects unauthenticated requests."""

    def test_sse_stream_requires_cookie(self, stub):
        # The request carries no Cookie header, so the handler must answer 401.
        stub.command = "GET"
        stub.path = "/api/projects/stream"
        stub.handle_projects_stream()

        assert stub.response_code == 401
# ─────────────────────────────────────────────────────────────────────
# T#26 — markdown pipeline (server returns raw, not pre-rendered)
# ─────────────────────────────────────────────────────────────────────
class TestMarkdownPipeline:
    """T#26 — the transcript endpoint hands back markdown untouched."""

    def test_planning_response_marked_dompurify_dom_pipeline(
        self, stub, projects_module, monkeypatch, tmp_path,
    ):
        """Raw markdown in, raw markdown out — no server-side HTML rendering.

        Sanitising is left to DOMPurify in the browser, so a <script> tag in
        the plan or in the excerpt must survive the round trip unchanged.
        """
        import constants  # type: ignore

        slug = "xss-test"
        raw_md = "# Plan\n<script>alert(1)</script>\n\n- item one"

        # Write a final-plan.md that contains a script tag.
        ralph_dir = constants.WORKSPACE_DIR / slug / "scripts" / "ralph"
        ralph_dir.mkdir(parents=True)
        plan_file = ralph_dir / "final-plan.md"
        plan_file.write_text(raw_md, encoding="utf-8")

        from src import planning_session as ps

        def _completed_state(adapter, channel):
            return {
                "phase": "__complete__",
                "phases_completed": ["/office-hours"],
                "last_text_excerpt": "<script>alert('x')</script>",
            }

        monkeypatch.setattr(ps, "get_planning_state", _completed_state)
        stub.handle_plan_transcript(slug)

        assert stub.captured_code == 200
        assert stub.captured["final_plan"] == raw_md
        # The excerpt is passed through raw as well.
        assert "<script>" in stub.captured["last_text_excerpt"]
# ─────────────────────────────────────────────────────────────────────
# T#13 — legacy /ralph.html → /echo/workspace.html
# ─────────────────────────────────────────────────────────────────────
class TestLegacyRedirect:
    """T#13 — legacy /ralph.html must redirect to /echo/workspace.html."""

    def test_ralph_html_redirects_to_workspace(self):
        """Smoke check that api.py contains the redirect handler.

        We check the source rather than spinning up a real HTTP server —
        matches the api routing test pattern in
        tests/test_dashboard_ralph_endpoint.py.
        """
        # Decode explicitly as UTF-8 so the check does not depend on the
        # machine's locale encoding.
        src = (PROJECT_ROOT / "dashboard" / "api.py").read_text(encoding="utf-8")
        assert "/ralph.html" in src
        assert "/echo/workspace.html" in src
        # The redirect block must use status 302 — keep the user's bookmarks alive.
        # (Sanity check; not exhaustive parse.)
        assert "send_response(302)" in src
# ─────────────────────────────────────────────────────────────────────
# T#16 / T#25 / T#28 — concurrency
# ─────────────────────────────────────────────────────────────────────
def _writer_thread(path: str, key: str, iterations: int = 10):
    """Thread target: increment ``data[key]`` in *path* *iterations* times.

    Each increment is a separate ``write_locked`` call, so sibling threads
    working on the same file contend for the lock on every step.
    """
    from src.jsonlock import write_locked

    def _increment(doc):
        doc[key] = doc.get(key, 0) + 1
        return doc

    for _ in range(iterations):
        write_locked(path, _increment)
class TestConcurrency:
    """T#16 / T#25 — in-process locking behaviour of ``write_locked``."""

    def test_flock_serializes_concurrent_writes(self, tmp_path):
        """T#16 — two threads bumping the same file via write_locked don't corrupt."""
        target = tmp_path / "race.json"
        target.write_text("{}", encoding="utf-8")

        workers = [
            threading.Thread(target=_writer_thread, args=(str(target), "a", 25)),
            threading.Thread(target=_writer_thread, args=(str(target), "b", 25)),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        result = json.loads(target.read_text())
        # A lost update would leave one of the counters short of 25.
        assert result["a"] == 25
        assert result["b"] == 25

    def test_flock_timeout_returns_lock_timeout_error(self, tmp_path, monkeypatch):
        """T#25 — write_locked raises when the lock is held past the deadline.

        The dashboard layer is expected to turn this into a 503; this test
        pins the LockTimeoutError surface that the translation relies on.
        """
        from src import jsonlock
        from src.jsonlock import LockTimeoutError, write_locked

        # Shrink the wait so the timeout fires quickly.
        monkeypatch.setattr(jsonlock, "_TIMEOUT_SEC", 0.1)
        monkeypatch.setattr(jsonlock, "_POLL_INTERVAL", 0.01)

        target = tmp_path / "hostage.json"
        target.write_text("{}", encoding="utf-8")
        # Contention happens on the sidecar lockfile, not on the data file.
        lock_path = str(target) + ".lock"

        acquired = threading.Event()
        release = threading.Event()

        def _hold():
            # Take LOCK_EX on the sidecar and keep it until told to let go.
            import fcntl

            fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o644)
            try:
                fcntl.flock(fd, fcntl.LOCK_EX)
                acquired.set()
                release.wait(timeout=5)
                try:
                    fcntl.flock(fd, fcntl.LOCK_UN)
                except OSError:
                    # Best-effort unlock; the fd is closed right after anyway.
                    pass
            finally:
                os.close(fd)

        holder = threading.Thread(target=_hold, daemon=True)
        holder.start()
        try:
            # Do not attempt the write before the holder really owns the lock.
            assert acquired.wait(timeout=2)
            with pytest.raises(LockTimeoutError):
                write_locked(str(target), lambda d: {"x": 1})
        finally:
            release.set()
            holder.join(timeout=2)
# ── multiprocessing helper (must be top-level for pickling) ──────────
def _proc_writer(target_path: str, key: str, iterations: int):
    """Child-process entry point: bump ``data[key]`` *iterations* times.

    PROJECT_ROOT is put at the front of sys.path before importing
    ``src.jsonlock``; each bump is its own ``write_locked`` call.
    """
    sys.path.insert(0, str(PROJECT_ROOT))
    from src.jsonlock import write_locked

    def _increment(doc, k=key):
        doc[k] = doc.get(k, 0) + 1
        return doc

    for _ in range(iterations):
        write_locked(target_path, _increment)
def test_cron_and_dashboard_concurrent_writes_serialize(tmp_path):
    """T#28 — two separate processes writing via write_locked lose no updates.

    Two spawned children each bump their own counter 15 times in the same
    JSON file through ``_proc_writer``. Both must exit with code 0 and both
    counters must read 15 afterwards.

    A child that is still running after the 30 s join is terminated in the
    ``finally`` block, so a hung writer fails the test without leaving an
    orphan process behind.
    """
    target = tmp_path / "two-procs.json"
    target.write_text("{}", encoding="utf-8")

    ctx = multiprocessing.get_context("spawn")
    p1 = ctx.Process(target=_proc_writer, args=(str(target), "p1", 15))
    p2 = ctx.Process(target=_proc_writer, args=(str(target), "p2", 15))
    children = (p1, p2)

    try:
        for child in children:
            child.start()
        for child in children:
            child.join(timeout=30)

        # exitcode is None while a child is still running, so a timeout
        # also fails here.
        assert p1.exitcode == 0, "p1 didn't exit cleanly"
        assert p2.exitcode == 0, "p2 didn't exit cleanly"

        data = json.loads(target.read_text(encoding="utf-8"))
        assert data.get("p1") == 15, f"p1 lost updates: {data}"
        assert data.get("p2") == 15, f"p2 lost updates: {data}"
    finally:
        # Reap anything that outlived its join timeout.
        for child in children:
            if child.is_alive():
                child.terminate()
                child.join(timeout=5)
# ─────────────────────────────────────────────────────────────────────
# T#24 — router planning unaffected by jsonlock
# ─────────────────────────────────────────────────────────────────────
class TestRouterPlanningRegression:
    """T#24 — router command dispatch still works after the jsonlock wrapping.

    The Claude CLI is not exercised; the test only checks that a slash command
    is recognised as a command. Only ``/a`` is exercised here.
    """

    def test_slash_commands_routed(self, tmp_path, monkeypatch):
        from src import router, planning_session

        # Point the router at a throwaway approved-tasks file so the real one
        # is never touched.
        tasks_file = tmp_path / "approved-tasks.json"
        tasks_file.write_text(json.dumps({"projects": [], "version": 0}))
        monkeypatch.setattr(router, "APPROVED_TASKS_FILE", tasks_file)

        # /a has to be handled as a command without send_message being called.
        with patch("src.router.send_message") as mock_send:
            _reply, is_cmd = router.route_message(
                "ch-1", "user-1", "/a", adapter_name="discord",
            )
            mock_send.assert_not_called()

        assert is_cmd is True
# ─────────────────────────────────────────────────────────────────────
# Misc — version helpers
# ─────────────────────────────────────────────────────────────────────
class TestVersionHelpers:
    """Checks for the ``_get_version_from`` / ``_bump_version`` helpers."""

    def test_get_version_handles_missing(self, projects_module):
        read_version = projects_module._get_version_from
        # A missing or non-numeric version reads as 0.
        assert read_version({}) == 0
        assert read_version({"version": 7}) == 7
        assert read_version({"version": "bad"}) == 0

    def test_bump_version_increments(self, projects_module):
        doc = {"version": 4}
        bumped = projects_module._bump_version(doc)
        assert bumped["version"] == 5