Merges workspace.html + ralph.html into a single unified project hub with: - Cookie-based auth (DASHBOARD_TOKEN, HttpOnly, SameSite=Strict) - 9-state project badge system (running-ralph/manual, planning, approved, pending, blocked, failed, complete, idle) with BUTTONS_FOR_STATE matrix - SSE realtime + polling fallback, version-based optimistic concurrency (If-Match) - Planning chat modal (phase stepper, markdown bubbles, 50s+ wait state, auto-resume) - Propose modal (Variant B: inline Plan-with-Echo checkbox) - 5-type toast taxonomy (success/info/warning/busy/error, 3px colored left-bar) - Inter font self-hosted + shared tokens.css design system + DESIGN.md - src/jsonlock.py (flock helper, sidecar .lock for stable inode) - src/approved_tasks_cli.py (shell-safe wrapper for cron/ralph.sh) - 55 new tests (T#1–T#30) + real jsonlock bug fix caught by T#16/T#28 - No emoji anywhere (enforced by test_dashboard_no_emoji.py) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
166 lines
5.3 KiB
Python
166 lines
5.3 KiB
Python
"""Tests for src.jsonlock — flock-based JSON locking primitives.
|
||
|
||
Covers:
|
||
- read_locked: missing file, valid JSON
|
||
- write_locked: create-on-missing, mutator chain, atomic temp file
|
||
- re-entrant write_locked (same thread, no deadlock)
|
||
- LockTimeoutError on contended lock (cross-thread)
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import fcntl
|
||
import json
|
||
import os
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
|
||
from src import jsonlock
|
||
from src.jsonlock import (
|
||
LockTimeoutError,
|
||
read_locked,
|
||
write_locked,
|
||
)
|
||
|
||
|
||
# ── read_locked ──────────────────────────────────────────────────────
|
||
|
||
|
||
def test_read_locked_missing_file(tmp_path):
|
||
"""read_locked surfaces FileNotFoundError on missing path.
|
||
|
||
(Callers are expected to catch and default to {}; the helper itself
|
||
does not swallow this error.)
|
||
"""
|
||
target = tmp_path / "nope.json"
|
||
with pytest.raises(FileNotFoundError):
|
||
read_locked(str(target))
|
||
|
||
|
||
def test_read_locked_valid_json(tmp_path):
|
||
target = tmp_path / "ok.json"
|
||
target.write_text(json.dumps({"a": 1, "b": [2, 3]}), encoding="utf-8")
|
||
assert read_locked(str(target)) == {"a": 1, "b": [2, 3]}
|
||
|
||
|
||
# ── write_locked ─────────────────────────────────────────────────────
|
||
|
||
|
||
def test_write_locked_creates_file(tmp_path):
|
||
target = tmp_path / "new.json"
|
||
seen: list = []
|
||
|
||
def _mut(data):
|
||
seen.append(dict(data))
|
||
data["created"] = True
|
||
return data
|
||
|
||
out = write_locked(str(target), _mut)
|
||
assert out == {"created": True}
|
||
assert seen == [{}]
|
||
assert target.exists()
|
||
assert json.loads(target.read_text()) == {"created": True}
|
||
|
||
|
||
def test_write_locked_mutator_applied(tmp_path):
|
||
target = tmp_path / "exists.json"
|
||
target.write_text(json.dumps({"counter": 1}), encoding="utf-8")
|
||
|
||
seen: list = []
|
||
|
||
def _mut(data):
|
||
seen.append(dict(data))
|
||
data["counter"] = data.get("counter", 0) + 1
|
||
return data
|
||
|
||
out = write_locked(str(target), _mut)
|
||
assert out == {"counter": 2}
|
||
assert seen == [{"counter": 1}]
|
||
assert json.loads(target.read_text()) == {"counter": 2}
|
||
|
||
|
||
def test_write_locked_atomic(tmp_path):
|
||
"""After write_locked returns, the .tmp sibling must be gone (rename clean)."""
|
||
target = tmp_path / "atomic.json"
|
||
write_locked(str(target), lambda d: {"x": 42})
|
||
assert target.exists()
|
||
assert not (tmp_path / "atomic.json.tmp").exists()
|
||
assert json.loads(target.read_text()) == {"x": 42}
|
||
|
||
|
||
# ── re-entry guard ───────────────────────────────────────────────────
|
||
|
||
|
||
def test_reentrant_write_locked(tmp_path):
|
||
"""Same-thread write_locked nested inside the mutator must not deadlock."""
|
||
target = tmp_path / "reentry.json"
|
||
|
||
def _outer(data):
|
||
data["outer"] = True
|
||
# Re-entrant call to the same path — must skip flock acquisition
|
||
# (would deadlock otherwise on the held LOCK_EX).
|
||
write_locked(str(target), _inner)
|
||
# Re-read after inner; the inner write replaced the file.
|
||
return data
|
||
|
||
def _inner(data):
|
||
# Inner observes whatever is currently on disk (data is read at start
|
||
# of write_locked; we just stash an inner marker).
|
||
data["inner"] = True
|
||
return data
|
||
|
||
out = write_locked(str(target), _outer)
|
||
# The outer mutator's `data` is independent of the inner write and
|
||
# is what gets persisted last (replaces the inner-only file).
|
||
persisted = json.loads(target.read_text())
|
||
assert persisted.get("outer") is True
|
||
# `out` is the dict the outer mutator returned.
|
||
assert out is not None
|
||
|
||
|
||
# ── timeout ──────────────────────────────────────────────────────────
|
||
|
||
|
||
def test_lock_timeout_raises(tmp_path, monkeypatch):
|
||
"""If another thread holds LOCK_EX, write_locked must give up with LockTimeoutError.
|
||
|
||
We patch _TIMEOUT_SEC down to keep the test fast (the real value of 5s
|
||
× 2 retries would make the test take 10s).
|
||
"""
|
||
monkeypatch.setattr(jsonlock, "_TIMEOUT_SEC", 0.1)
|
||
monkeypatch.setattr(jsonlock, "_POLL_INTERVAL", 0.01)
|
||
|
||
target = tmp_path / "contended.json"
|
||
target.write_text("{}", encoding="utf-8")
|
||
lock_path = str(target) + ".lock"
|
||
|
||
holder_acquired = threading.Event()
|
||
holder_release = threading.Event()
|
||
|
||
def _holder():
|
||
fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o644)
|
||
try:
|
||
fcntl.flock(fd, fcntl.LOCK_EX)
|
||
holder_acquired.set()
|
||
# Hold until the test signals release.
|
||
holder_release.wait(timeout=5.0)
|
||
try:
|
||
fcntl.flock(fd, fcntl.LOCK_UN)
|
||
except OSError:
|
||
pass
|
||
finally:
|
||
os.close(fd)
|
||
|
||
t = threading.Thread(target=_holder, daemon=True)
|
||
t.start()
|
||
assert holder_acquired.wait(timeout=2.0), "holder thread did not acquire lock"
|
||
|
||
try:
|
||
with pytest.raises(LockTimeoutError):
|
||
write_locked(str(target), lambda d: {"should": "fail"})
|
||
finally:
|
||
holder_release.set()
|
||
t.join(timeout=2.0)
|