Merge branch 'ralph/dashboard-realtime' — SSE realtime + story rollback

Server-Sent Events (TODO P3):
- GET /api/ralph/stream — signature-based change detection (poll FS 2s, emit
  doar la diff), heartbeat 30s, X-Accel-Buffering:no
- HTTPServer → ThreadingHTTPServer (altfel SSE blochează toate endpoint-urile)
- ralph.html: EventSource cu fallback permanent la polling 5s când CLOSED.
  Badge: 🟢 Live / ⏱ Polling / Offline

Story rollback (TODO P3):
- POST /api/ralph/<slug>/rollback — git revert --no-edit HEAD; fallback
  git reset --hard HEAD~1 doar la conflict
- Decrementează passes pe ultima story complete; clears failed/blocked/retries
  (atomic temp+rename)
- Slug strict regex ^[A-Za-z0-9_-]{1,64}$ + reject path traversal explicit
- Buton ↩️ pe card-uri running; confirm dialog înainte de execuție
- Response: {success, message, reverted_commit, story_reverted, method}

Tests: 39/39 pe test_dashboard_ralph_endpoint (era 19; +20 cazuri noi).

# Conflicts:
#	dashboard/api.py
#	dashboard/handlers/ralph.py
This commit is contained in:
2026-04-26 19:14:17 +00:00
4 changed files with 647 additions and 39 deletions

View File

@@ -1,14 +1,17 @@
"""Ralph live dashboard endpoints (W3 + instrumentation).
"""Ralph live dashboard endpoints (W3 + instrumentation + realtime).
Endpoints:
GET /api/ralph/status — toate proiectele Ralph (cards data)
GET /api/ralph/stream — Server-Sent Events stream (realtime)
GET /api/ralph/<slug>/log — tail progress.txt (default 100 lines)
GET /api/ralph/<slug>/prd — full prd.json content
GET /api/ralph/usage[?days=N] — rate limit budget summary (cross-project)
POST /api/ralph/<slug>/stop — SIGTERM la Ralph PID
POST /api/ralph/<slug>/rollback — git revert HEAD + decrement last passing story
Polling: 5s din ralph.html (suficient pentru iter 8-15min Ralph).
NU SSE/WebSocket pentru MVP.
SSE detail: stream emite `event: status\\ndata: <json>\\n\\n` la schimbări (poll
fişiere la 2s); heartbeat la 30s pentru ca clientul să nu reseze conexiunea.
Necesită ThreadingHTTPServer în api.py — altfel un singur stream blochează tot.
Citește status din `~/workspace/<slug>/scripts/ralph/`:
- prd.json → stories (passes/failed/blocked/retries)
@@ -21,11 +24,13 @@ Reuse path constants din `dashboard/constants.py` (WORKSPACE_DIR).
"""
import json
import os
import re
import signal
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote
import constants
@@ -40,6 +45,10 @@ except ImportError: # pragma: no cover — diagnostic only
ralph_usage = None # type: ignore
# Slug strict: alphanum + dash + underscore, max 64 chars. Reject path traversal explicit.
_SLUG_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
# Path Ralph per proiect (mereu în scripts/ralph/)
def _ralph_dir(project_dir: Path) -> Path:
return project_dir / "scripts" / "ralph"
@@ -54,10 +63,20 @@ class RalphHandlers:
# ── helpers ────────────────────────────────────────────────
def _ralph_validate_slug(self, slug: str):
"""Validează slug-ul + returnează project_dir sau None."""
if not slug or "/" in slug or ".." in slug:
"""Validează slug-ul + returnează project_dir sau None.
Strict: alphanum + dash + underscore, ≤64 chars. Path traversal sequences
(`..`, `/`, `\\`) sau caractere ne-alfanumerice sunt respinse înainte de
orice atingere a filesystem-ului.
"""
if not slug:
return None
# Defense-in-depth: explicit path-traversal/separator reject (regex îl
# acoperă, dar îl ţinem explicit ca safety net dacă regex-ul se relaxează).
if ".." in slug or "/" in slug or "\\" in slug:
return None
if not _SLUG_RE.match(slug):
return None
slug = unquote(slug)
project_dir = constants.WORKSPACE_DIR / slug
try:
resolved = project_dir.resolve()
@@ -187,30 +206,121 @@ class RalphHandlers:
],
}
# ── /api/ralph/status (GET) ────────────────────────────────
def handle_ralph_status(self):
"""Întoarce status pentru toate proiectele Ralph din workspace."""
try:
projects = []
if not constants.WORKSPACE_DIR.exists():
self.send_json({"projects": [], "fetchedAt": datetime.now().isoformat()})
return
def _ralph_collect_status(self) -> dict:
"""Construieşte payload-ul de status pentru toate proiectele.
Folosit de `/api/ralph/status` (GET single-shot) şi de `/api/ralph/stream`
(SSE — emis la schimbări).
"""
projects: list[dict] = []
if constants.WORKSPACE_DIR.exists():
for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
if not entry.is_dir() or entry.name.startswith("."):
continue
summary = self._ralph_summarize_project(entry)
if summary is not None:
projects.append(summary)
return {
"projects": projects,
"fetchedAt": datetime.now().isoformat(),
"count": len(projects),
}
self.send_json({
"projects": projects,
"fetchedAt": datetime.now().isoformat(),
"count": len(projects),
})
def _ralph_signature(self, snapshot: dict) -> tuple:
"""Compactă semnătură pentru change-detection în SSE — doar fields care
contează pentru UI (status, counts, current story). Timestamps de iter
au granularitate de second pentru a evita flicker pe nanosecond drift.
"""
sig: list[tuple] = []
for p in snapshot.get("projects", []) or []:
cs = p.get("currentStory") or {}
sig.append((
p.get("slug"),
p.get("status"),
bool(p.get("running")),
p.get("storiesTotal"),
p.get("storiesComplete"),
p.get("storiesFailed"),
p.get("storiesBlocked"),
p.get("lastIterAt"),
cs.get("id"),
cs.get("retries"),
))
return tuple(sorted(sig, key=lambda t: t[0] or ""))
# ── /api/ralph/status (GET) ────────────────────────────────
def handle_ralph_status(self):
"""Întoarce status pentru toate proiectele Ralph din workspace."""
try:
self.send_json(self._ralph_collect_status())
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/stream (GET, SSE) ───────────────────────────
def handle_ralph_stream(self):
"""Server-Sent Events: emite snapshot la schimbări (poll fişiere 2s).
Heartbeat la 30s pentru a evita timeout pe proxy-uri. Loop-ul iese
curat la BrokenPipe (clientul închis tab-ul). Necesită
ThreadingHTTPServer în api.py — altfel blochează toate request-urile.
"""
try:
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
# Disable proxy buffering (nginx/cloudflare) — flush imediat
self.send_header("X-Accel-Buffering", "no")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
except (BrokenPipeError, ConnectionResetError):
return
last_signature: tuple | None = None
last_heartbeat = time.monotonic()
# Initial snapshot — clientul nu aşteaptă primul change
try:
snapshot = self._ralph_collect_status()
last_signature = self._ralph_signature(snapshot)
payload = json.dumps(snapshot).encode("utf-8")
self.wfile.write(b"event: status\ndata: " + payload + b"\n\n")
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
return
except Exception as exc:
try:
err = json.dumps({"error": str(exc)}).encode("utf-8")
self.wfile.write(b"event: error\ndata: " + err + b"\n\n")
self.wfile.flush()
except Exception:
pass
return
# Stream loop
while True:
try:
time.sleep(2)
snapshot = self._ralph_collect_status()
signature = self._ralph_signature(snapshot)
now = time.monotonic()
if signature != last_signature:
payload = json.dumps(snapshot).encode("utf-8")
self.wfile.write(b"event: status\ndata: " + payload + b"\n\n")
self.wfile.flush()
last_signature = signature
last_heartbeat = now
elif now - last_heartbeat >= 30:
self.wfile.write(b"event: heartbeat\ndata: {}\n\n")
self.wfile.flush()
last_heartbeat = now
except (BrokenPipeError, ConnectionResetError):
return
except Exception:
# Best-effort: o iteraţie eşuată nu trebuie să termine stream-ul,
# dar dacă socketul e mort BrokenPipe va prinde next loop.
continue
# ── /api/ralph/<slug>/log (GET) ────────────────────────────
def handle_ralph_log(self, slug: str):
"""Tail progress.txt pentru un slug. Default last 100 lines."""
@@ -368,3 +478,147 @@ class RalphHandlers:
self.send_json({"success": True, "message": f"Ralph stopped (PID {pid})"})
except Exception as exc:
self.send_json({"success": False, "error": str(exc)}, 500)
# ── /api/ralph/<slug>/rollback (POST) ──────────────────────
def _ralph_decrement_last_pass(self, project_dir: Path) -> str | None:
"""Marchează ultima story `passes=True` (din ordinea din prd.json) ca
incompletă (`passes=False`, şterge `failed`/`blocked`/`failureReason`,
retries=0). Atomic write (temp + rename). Întoarce id-ul story-ului
sau None dacă nu există nimic de decrementat / prd.json invalid.
"""
prd_path = _ralph_dir(project_dir) / "prd.json"
if not prd_path.exists():
return None
try:
prd = json.loads(prd_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
stories = prd.get("userStories", []) or []
target_idx: int | None = None
# ultima poziţională cu passes=True (DAG-order = ordine de finalizare)
for i in range(len(stories) - 1, -1, -1):
if stories[i].get("passes"):
target_idx = i
break
if target_idx is None:
return None
story_id = stories[target_idx].get("id")
stories[target_idx]["passes"] = False
# Reset stare derivată — story-ul e disponibil pentru re-run
stories[target_idx].pop("failed", None)
stories[target_idx].pop("blocked", None)
stories[target_idx].pop("failureReason", None)
stories[target_idx]["retries"] = 0
# Atomic write (acelaşi pattern ca W3 ralph_dag.py)
tmp = prd_path.with_suffix(".json.tmp")
try:
tmp.write_text(json.dumps(prd, indent=2), encoding="utf-8")
tmp.replace(prd_path)
except OSError:
tmp.unlink(missing_ok=True)
return None
return story_id
def handle_ralph_rollback(self, slug: str):
"""Rollback ultimul commit într-un proiect Ralph.
Strategy: `git revert --no-edit HEAD` (history-preserving). Fallback la
`git reset --hard HEAD~1` doar dacă revert eşuează (conflict, binary
file). După succes, decrementează `passes` pe ultima story marcată
complete în prd.json (atomic write).
Returns: `{success, message, reverted_commit, story_reverted, method}`.
"""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({
"success": False,
"message": "Invalid project slug",
"reverted_commit": None,
"story_reverted": None,
}, 400)
return
git_dir = project_dir / ".git"
if not git_dir.exists():
self.send_json({
"success": False,
"message": "Not a git repository",
"reverted_commit": None,
"story_reverted": None,
}, 400)
return
# Read HEAD before any operation (raportăm SHA-ul afectat)
head_proc = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=str(project_dir), capture_output=True, text=True, timeout=10,
)
if head_proc.returncode != 0:
self.send_json({
"success": False,
"message": f"git rev-parse HEAD failed: {head_proc.stderr.strip()}",
"reverted_commit": None,
"story_reverted": None,
}, 500)
return
commit_to_revert = head_proc.stdout.strip()
# Try revert (preserves history, recommended)
method = "revert"
revert = subprocess.run(
["git", "revert", "--no-edit", "HEAD"],
cwd=str(project_dir), capture_output=True, text=True, timeout=30,
)
if revert.returncode != 0:
# Conflict / binary file — abort & fall back to reset --hard
subprocess.run(
["git", "revert", "--abort"],
cwd=str(project_dir), capture_output=True, timeout=10,
)
reset = subprocess.run(
["git", "reset", "--hard", "HEAD~1"],
cwd=str(project_dir), capture_output=True, text=True, timeout=30,
)
if reset.returncode != 0:
self.send_json({
"success": False,
"message": (
f"revert failed ({revert.stderr.strip()[:200]}), "
f"reset failed ({reset.stderr.strip()[:200]})"
),
"reverted_commit": commit_to_revert,
"story_reverted": None,
}, 500)
return
method = "reset"
# Best-effort: decrement story passes (nu fail dacă lipseşte prd.json)
story_reverted = self._ralph_decrement_last_pass(project_dir)
short_sha = commit_to_revert[:8]
msg_bits = [f"Rolled back {short_sha} via git {method}"]
if story_reverted:
msg_bits.append(f"story {story_reverted} marked incomplete")
self.send_json({
"success": True,
"message": "; ".join(msg_bits),
"reverted_commit": commit_to_revert,
"story_reverted": story_reverted,
"method": method,
})
except subprocess.TimeoutExpired:
self.send_json({
"success": False,
"message": "git operation timed out",
"reverted_commit": None,
"story_reverted": None,
}, 500)
except Exception as exc:
self.send_json({
"success": False,
"message": str(exc),
"reverted_commit": None,
"story_reverted": None,
}, 500)