Files
echo-core/dashboard/handlers/ralph.py
Marius Mutu ff9b9a0d1d feat(dashboard): SSE realtime + story rollback button
Replaces 5s polling on /echo/ralph.html with EventSource streaming and adds
a rollback control for the running Ralph cards.

Server (dashboard/handlers/ralph.py):
- /api/ralph/stream — Server-Sent Events. Emits `event: status` whenever a
  signature over the projects' state changes (poll filesystem at 2s); emits
  `event: heartbeat` every 30s to keep proxies happy. Disables proxy
  buffering via X-Accel-Buffering:no.
- /api/ralph/<slug>/rollback (POST) — runs `git revert --no-edit HEAD` in
  the project; falls back to `git reset --hard HEAD~1` only if revert
  reports conflict. After rolling back the commit, decrements `passes` on
  the last user story marked complete in prd.json (atomic temp+rename
  write, same pattern as ralph_dag.py). Returns
  `{success, message, reverted_commit, story_reverted, method}`.
- _ralph_validate_slug tightened to a strict regex (alphanum + dash +
  underscore, ≤64 chars) plus explicit ../, /, \ rejection. All previously
  accepted slugs still pass; URL-encoded traversal and shell metachars
  now blocked before the filesystem is touched.
- _ralph_collect_status / _ralph_signature factored out of
  handle_ralph_status so the SSE loop can reuse them and detect changes
  cheaply.

Server (dashboard/api.py):
- HTTPServer → ThreadingHTTPServer with daemon_threads=True. SSE is a
  long-lived response; without threading a single client would block all
  other dashboard endpoints.
- /api/ralph/stream (GET) and /api/ralph/<slug>/rollback (POST) wired
  into the dispatch.

Client (dashboard/ralph.html):
- EventSource('/api/ralph/stream') with permanent fallback to 5s polling
  when readyState=CLOSED (no server, CORS blocked, browser without SSE).
- Indicator badge: 🟢 Live (SSE), ⏱ Polling (fallback), Offline.
- Rollback button (undo-2 icon) on running cards; native confirm() with
  message: "Asta va da git revert HEAD pe <slug> și va decrementa ultima
  story trecută. Continui?"

Tests (tests/test_dashboard_ralph_endpoint.py, +20 cases):
- Strict slug validator: underscore allowed, >64 rejected, special chars
  / backslash / URL-encoded traversal rejected.
- _ralph_collect_status + _ralph_signature: stable when nothing changes,
  flips when project added or `passes` toggles.
- Rollback: invalid slug → 400, non-git project → 400, real two-commit
  repo revert succeeds and decrements last passing story (US-002 goes
  passes:false while US-001 stays passes:true), no-passing-stories case
  succeeds with story_reverted=None, response shape contract, atomic
  helper leaves no .tmp file behind.
- API routing smoke: confirms ThreadingHTTPServer + stream + rollback
  references present in dashboard/api.py.

39/39 tests pass on tests/test_dashboard_ralph_endpoint.py. Pre-existing
failures in test_dashboard_constants.py::test_base_dir_is_echo_core (the
worktree dir is `echo-core-realtime`, not `echo-core`) and
test_dashboard_unified_index.py::test_index_has_all_panels are unrelated
to this change and reproduced on master.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 19:07:13 +00:00

560 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Ralph live dashboard endpoints (W3 + dashboard-realtime).
Endpoints:
GET /api/ralph/status — toate proiectele Ralph (cards data)
GET /api/ralph/stream — Server-Sent Events stream (realtime)
GET /api/ralph/<slug>/log — tail progress.txt (default 100 lines)
GET /api/ralph/<slug>/prd — full prd.json content
POST /api/ralph/<slug>/stop — SIGTERM la Ralph PID
POST /api/ralph/<slug>/rollback — git revert HEAD + decrement last passing story
SSE detail: stream emite `event: status\\ndata: <json>\\n\\n` la schimbări (poll
fişiere la 2s); heartbeat la 30s pentru ca clientul să nu reseze conexiunea.
Necesită ThreadingHTTPServer în api.py — altfel un singur stream blochează tot.
Citește status din `~/workspace/<slug>/scripts/ralph/`:
- prd.json → stories (passes/failed/blocked/retries)
- progress.txt → log human-readable
- logs/iteration-*.log → mtime ultimului iter
- .ralph.pid → PID activ (verificat cu os.kill 0)
Reuse path constants din `dashboard/constants.py` (WORKSPACE_DIR).
"""
import json
import os
import re
import signal
import subprocess
import time
from datetime import datetime
from pathlib import Path
import constants
# Slug strict: alphanum + dash + underscore, max 64 chars. Reject path traversal explicit.
_SLUG_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
# Path Ralph per proiect (mereu în scripts/ralph/)
def _ralph_dir(project_dir: Path) -> Path:
return project_dir / "scripts" / "ralph"
# Estimare ETA simplistă: avg iter time × stories rămase
DEFAULT_ITER_MINUTES = 12 # midpoint din intervalul 8-15min menționat în plan
class RalphHandlers:
"""Mixin pentru /api/ralph/* — Ralph live status + control."""
# ── helpers ────────────────────────────────────────────────
def _ralph_validate_slug(self, slug: str):
"""Validează slug-ul + returnează project_dir sau None.
Strict: alphanum + dash + underscore, ≤64 chars. Path traversal sequences
(`..`, `/`, `\\`) sau caractere ne-alfanumerice sunt respinse înainte de
orice atingere a filesystem-ului.
"""
if not slug:
return None
# Defense-in-depth: explicit path-traversal/separator reject (regex îl
# acoperă, dar îl ţinem explicit ca safety net dacă regex-ul se relaxează).
if ".." in slug or "/" in slug or "\\" in slug:
return None
if not _SLUG_RE.match(slug):
return None
project_dir = constants.WORKSPACE_DIR / slug
try:
resolved = project_dir.resolve()
workspace_resolved = constants.WORKSPACE_DIR.resolve()
resolved.relative_to(workspace_resolved)
except (ValueError, OSError):
return None
if not project_dir.exists() or not project_dir.is_dir():
return None
return project_dir
def _ralph_pid_alive(self, ralph_dir: Path):
"""Întoarce (running: bool, pid: int|None)."""
pid_file = ralph_dir / ".ralph.pid"
if not pid_file.exists():
return False, None
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, 0) # signal 0 = check existence
return True, pid
except (ValueError, ProcessLookupError, PermissionError, OSError):
return False, None
def _ralph_eta_minutes(self, stories_remaining: int, last_iter_mtime: float | None) -> int | None:
"""Estimează minute rămase — None dacă nu avem date."""
if stories_remaining <= 0:
return 0
return stories_remaining * DEFAULT_ITER_MINUTES
def _ralph_summarize_project(self, project_dir: Path) -> dict | None:
"""Construiește dict de status per proiect — None dacă nu e Ralph project."""
ralph_dir = _ralph_dir(project_dir)
prd_json = ralph_dir / "prd.json"
if not prd_json.exists():
return None
# Defensive parse — corupt prd.json nu trebuie să dărâme dashboard
try:
prd = json.loads(prd_json.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return {
"slug": project_dir.name,
"status": "error",
"error": "prd.json invalid sau ilizibil",
"running": False,
"pid": None,
"stories": [],
"storiesTotal": 0,
"storiesComplete": 0,
"storiesFailed": 0,
"storiesBlocked": 0,
}
stories = prd.get("userStories", []) or []
total = len(stories)
complete = sum(1 for s in stories if s.get("passes"))
failed = sum(1 for s in stories if s.get("failed"))
blocked = sum(1 for s in stories if s.get("blocked"))
remaining = total - complete - failed - blocked
running, pid = self._ralph_pid_alive(ralph_dir)
# Last iteration mtime (pentru "acum X")
logs_dir = ralph_dir / "logs"
last_iter_mtime = None
last_iter_iso = None
if logs_dir.exists():
iter_logs = sorted(logs_dir.glob("iteration-*.log"), key=lambda f: f.stat().st_mtime, reverse=True)
if iter_logs:
last_iter_mtime = iter_logs[0].stat().st_mtime
last_iter_iso = datetime.fromtimestamp(last_iter_mtime).isoformat()
# Status compus pentru UI cards
if running:
top_status = "running"
elif failed > 0 and remaining == 0:
top_status = "failed"
elif complete == total and total > 0:
top_status = "complete"
elif blocked > 0 and running is False:
top_status = "blocked"
else:
top_status = "idle"
# Current story (DAG-eligible cel mai mic priority)
current_story = None
if running:
eligible = [
s for s in stories
if not s.get("passes") and not s.get("failed") and not s.get("blocked")
]
eligible.sort(key=lambda s: (s.get("priority", 999), s.get("id", "")))
if eligible:
current_story = {
"id": eligible[0].get("id"),
"title": eligible[0].get("title"),
"tags": eligible[0].get("tags", []),
"retries": eligible[0].get("retries", 0),
}
return {
"slug": project_dir.name,
"status": top_status,
"running": running,
"pid": pid,
"branchName": prd.get("branchName", ""),
"storiesTotal": total,
"storiesComplete": complete,
"storiesFailed": failed,
"storiesBlocked": blocked,
"storiesRemaining": remaining,
"currentStory": current_story,
"lastIterAt": last_iter_iso,
"etaMinutes": self._ralph_eta_minutes(remaining, last_iter_mtime),
"stories": [
{
"id": s.get("id"),
"title": s.get("title"),
"passes": bool(s.get("passes")),
"failed": bool(s.get("failed")),
"blocked": bool(s.get("blocked")),
"retries": int(s.get("retries", 0)),
"tags": s.get("tags", []),
"failureReason": s.get("failureReason", ""),
}
for s in stories
],
}
def _ralph_collect_status(self) -> dict:
"""Construieşte payload-ul de status pentru toate proiectele.
Folosit de `/api/ralph/status` (GET single-shot) şi de `/api/ralph/stream`
(SSE — emis la schimbări).
"""
projects: list[dict] = []
if constants.WORKSPACE_DIR.exists():
for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
if not entry.is_dir() or entry.name.startswith("."):
continue
summary = self._ralph_summarize_project(entry)
if summary is not None:
projects.append(summary)
return {
"projects": projects,
"fetchedAt": datetime.now().isoformat(),
"count": len(projects),
}
def _ralph_signature(self, snapshot: dict) -> tuple:
"""Compactă semnătură pentru change-detection în SSE — doar fields care
contează pentru UI (status, counts, current story). Timestamps de iter
au granularitate de second pentru a evita flicker pe nanosecond drift.
"""
sig: list[tuple] = []
for p in snapshot.get("projects", []) or []:
cs = p.get("currentStory") or {}
sig.append((
p.get("slug"),
p.get("status"),
bool(p.get("running")),
p.get("storiesTotal"),
p.get("storiesComplete"),
p.get("storiesFailed"),
p.get("storiesBlocked"),
p.get("lastIterAt"),
cs.get("id"),
cs.get("retries"),
))
return tuple(sorted(sig, key=lambda t: t[0] or ""))
# ── /api/ralph/status (GET) ────────────────────────────────
def handle_ralph_status(self):
"""Întoarce status pentru toate proiectele Ralph din workspace."""
try:
self.send_json(self._ralph_collect_status())
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/stream (GET, SSE) ───────────────────────────
def handle_ralph_stream(self):
"""Server-Sent Events: emite snapshot la schimbări (poll fişiere 2s).
Heartbeat la 30s pentru a evita timeout pe proxy-uri. Loop-ul iese
curat la BrokenPipe (clientul închis tab-ul). Necesită
ThreadingHTTPServer în api.py — altfel blochează toate request-urile.
"""
try:
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
# Disable proxy buffering (nginx/cloudflare) — flush imediat
self.send_header("X-Accel-Buffering", "no")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
except (BrokenPipeError, ConnectionResetError):
return
last_signature: tuple | None = None
last_heartbeat = time.monotonic()
# Initial snapshot — clientul nu aşteaptă primul change
try:
snapshot = self._ralph_collect_status()
last_signature = self._ralph_signature(snapshot)
payload = json.dumps(snapshot).encode("utf-8")
self.wfile.write(b"event: status\ndata: " + payload + b"\n\n")
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
return
except Exception as exc:
try:
err = json.dumps({"error": str(exc)}).encode("utf-8")
self.wfile.write(b"event: error\ndata: " + err + b"\n\n")
self.wfile.flush()
except Exception:
pass
return
# Stream loop
while True:
try:
time.sleep(2)
snapshot = self._ralph_collect_status()
signature = self._ralph_signature(snapshot)
now = time.monotonic()
if signature != last_signature:
payload = json.dumps(snapshot).encode("utf-8")
self.wfile.write(b"event: status\ndata: " + payload + b"\n\n")
self.wfile.flush()
last_signature = signature
last_heartbeat = now
elif now - last_heartbeat >= 30:
self.wfile.write(b"event: heartbeat\ndata: {}\n\n")
self.wfile.flush()
last_heartbeat = now
except (BrokenPipeError, ConnectionResetError):
return
except Exception:
# Best-effort: o iteraţie eşuată nu trebuie să termine stream-ul,
# dar dacă socketul e mort BrokenPipe va prinde next loop.
continue
# ── /api/ralph/<slug>/log (GET) ────────────────────────────
def handle_ralph_log(self, slug: str):
"""Tail progress.txt pentru un slug. Default last 100 lines."""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({"error": "Invalid project slug"}, 400)
return
from urllib.parse import parse_qs, urlparse
qs = parse_qs(urlparse(self.path).query)
try:
lines_n = min(int(qs.get("lines", ["100"])[0]), 1000)
except ValueError:
lines_n = 100
progress = _ralph_dir(project_dir) / "progress.txt"
if not progress.exists():
self.send_json({"slug": slug, "lines": [], "total": 0})
return
try:
content = progress.read_text(encoding="utf-8", errors="replace")
except OSError as exc:
self.send_json({"error": f"read failed: {exc}"}, 500)
return
all_lines = content.splitlines()
tail = all_lines[-lines_n:] if len(all_lines) > lines_n else all_lines
self.send_json({
"slug": slug,
"lines": tail,
"total": len(all_lines),
})
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/<slug>/prd (GET) ────────────────────────────
def handle_ralph_prd(self, slug: str):
"""Returnează full prd.json pentru un slug."""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({"error": "Invalid project slug"}, 400)
return
prd_json = _ralph_dir(project_dir) / "prd.json"
if not prd_json.exists():
self.send_json({"error": "prd.json not found"}, 404)
return
try:
data = json.loads(prd_json.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
self.send_json({"error": f"prd.json invalid: {exc}"}, 500)
return
self.send_json(data)
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/<slug>/stop (POST) ──────────────────────────
def handle_ralph_stop(self, slug: str):
"""Trimite SIGTERM la Ralph PID. Verifică că PID-ul e în WORKSPACE_DIR."""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({"success": False, "error": "Invalid project slug"}, 400)
return
ralph_dir = _ralph_dir(project_dir)
pid_file = ralph_dir / ".ralph.pid"
if not pid_file.exists():
self.send_json({"success": False, "error": "No PID file"}, 404)
return
try:
pid = int(pid_file.read_text().strip())
except (ValueError, OSError) as exc:
self.send_json({"success": False, "error": f"Invalid PID file: {exc}"}, 500)
return
# Sandbox: verifică că procesul e în workspace (nu omoară random PID)
try:
proc_cwd = Path(f"/proc/{pid}/cwd").resolve()
if not str(proc_cwd).startswith(str(constants.WORKSPACE_DIR)):
self.send_json({"success": False, "error": "PID not in workspace"}, 403)
return
except (FileNotFoundError, PermissionError):
# Procesul nu mai există — best-effort cleanup
self.send_json({"success": True, "message": "Process already stopped"})
return
try:
os.killpg(os.getpgid(pid), signal.SIGTERM)
except ProcessLookupError:
self.send_json({"success": True, "message": "Process already stopped"})
return
except PermissionError:
self.send_json({"success": False, "error": "Permission denied"}, 403)
return
self.send_json({"success": True, "message": f"Ralph stopped (PID {pid})"})
except Exception as exc:
self.send_json({"success": False, "error": str(exc)}, 500)
# ── /api/ralph/<slug>/rollback (POST) ──────────────────────
def _ralph_decrement_last_pass(self, project_dir: Path) -> str | None:
"""Marchează ultima story `passes=True` (din ordinea din prd.json) ca
incompletă (`passes=False`, şterge `failed`/`blocked`/`failureReason`,
retries=0). Atomic write (temp + rename). Întoarce id-ul story-ului
sau None dacă nu există nimic de decrementat / prd.json invalid.
"""
prd_path = _ralph_dir(project_dir) / "prd.json"
if not prd_path.exists():
return None
try:
prd = json.loads(prd_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
stories = prd.get("userStories", []) or []
target_idx: int | None = None
# ultima poziţională cu passes=True (DAG-order = ordine de finalizare)
for i in range(len(stories) - 1, -1, -1):
if stories[i].get("passes"):
target_idx = i
break
if target_idx is None:
return None
story_id = stories[target_idx].get("id")
stories[target_idx]["passes"] = False
# Reset stare derivată — story-ul e disponibil pentru re-run
stories[target_idx].pop("failed", None)
stories[target_idx].pop("blocked", None)
stories[target_idx].pop("failureReason", None)
stories[target_idx]["retries"] = 0
# Atomic write (acelaşi pattern ca W3 ralph_dag.py)
tmp = prd_path.with_suffix(".json.tmp")
try:
tmp.write_text(json.dumps(prd, indent=2), encoding="utf-8")
tmp.replace(prd_path)
except OSError:
tmp.unlink(missing_ok=True)
return None
return story_id
def handle_ralph_rollback(self, slug: str):
"""Rollback ultimul commit într-un proiect Ralph.
Strategy: `git revert --no-edit HEAD` (history-preserving). Fallback la
`git reset --hard HEAD~1` doar dacă revert eşuează (conflict, binary
file). După succes, decrementează `passes` pe ultima story marcată
complete în prd.json (atomic write).
Returns: `{success, message, reverted_commit, story_reverted, method}`.
"""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({
"success": False,
"message": "Invalid project slug",
"reverted_commit": None,
"story_reverted": None,
}, 400)
return
git_dir = project_dir / ".git"
if not git_dir.exists():
self.send_json({
"success": False,
"message": "Not a git repository",
"reverted_commit": None,
"story_reverted": None,
}, 400)
return
# Read HEAD before any operation (raportăm SHA-ul afectat)
head_proc = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=str(project_dir), capture_output=True, text=True, timeout=10,
)
if head_proc.returncode != 0:
self.send_json({
"success": False,
"message": f"git rev-parse HEAD failed: {head_proc.stderr.strip()}",
"reverted_commit": None,
"story_reverted": None,
}, 500)
return
commit_to_revert = head_proc.stdout.strip()
# Try revert (preserves history, recommended)
method = "revert"
revert = subprocess.run(
["git", "revert", "--no-edit", "HEAD"],
cwd=str(project_dir), capture_output=True, text=True, timeout=30,
)
if revert.returncode != 0:
# Conflict / binary file — abort & fall back to reset --hard
subprocess.run(
["git", "revert", "--abort"],
cwd=str(project_dir), capture_output=True, timeout=10,
)
reset = subprocess.run(
["git", "reset", "--hard", "HEAD~1"],
cwd=str(project_dir), capture_output=True, text=True, timeout=30,
)
if reset.returncode != 0:
self.send_json({
"success": False,
"message": (
f"revert failed ({revert.stderr.strip()[:200]}), "
f"reset failed ({reset.stderr.strip()[:200]})"
),
"reverted_commit": commit_to_revert,
"story_reverted": None,
}, 500)
return
method = "reset"
# Best-effort: decrement story passes (nu fail dacă lipseşte prd.json)
story_reverted = self._ralph_decrement_last_pass(project_dir)
short_sha = commit_to_revert[:8]
msg_bits = [f"Rolled back {short_sha} via git {method}"]
if story_reverted:
msg_bits.append(f"story {story_reverted} marked incomplete")
self.send_json({
"success": True,
"message": "; ".join(msg_bits),
"reverted_commit": commit_to_revert,
"story_reverted": story_reverted,
"method": method,
})
except subprocess.TimeoutExpired:
self.send_json({
"success": False,
"message": "git operation timed out",
"reverted_commit": None,
"story_reverted": None,
}, 500)
except Exception as exc:
self.send_json({
"success": False,
"message": str(exc),
"reverted_commit": None,
"story_reverted": None,
}, 500)