Files
echo-core/dashboard/handlers/ralph.py
Marius Mutu 03d875974b Merge branch 'ralph/dashboard-realtime' — SSE realtime + story rollback
Server-Sent Events (TODO P3):
- GET /api/ralph/stream — signature-based change detection (poll FS 2s, emit
  doar la diff), heartbeat 30s, X-Accel-Buffering:no
- HTTPServer → ThreadingHTTPServer (altfel SSE blochează toate endpoint-urile)
- ralph.html: EventSource cu fallback permanent la polling 5s când CLOSED.
  Badge: 🟢 Live / ⏱ Polling / Offline

Story rollback (TODO P3):
- POST /api/ralph/<slug>/rollback — git revert --no-edit HEAD; fallback
  git reset --hard HEAD~1 doar la conflict
- Decrementează passes pe ultima story complete; clears failed/blocked/retries
  (atomic temp+rename)
- Slug strict regex ^[A-Za-z0-9_-]{1,64}$ + reject path traversal explicit
- Buton ↩️ pe card-uri running; confirm dialog înainte de execuție
- Response: {success, message, reverted_commit, story_reverted, method}

Tests: 39/39 pe test_dashboard_ralph_endpoint (era 19; +20 cazuri noi).

# Conflicts:
#	dashboard/api.py
#	dashboard/handlers/ralph.py
2026-04-26 19:14:17 +00:00

625 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Ralph live dashboard endpoints (W3 + instrumentation + realtime).

Endpoints:
    GET  /api/ralph/status            — all Ralph projects (card data)
    GET  /api/ralph/stream            — Server-Sent Events stream (realtime)
    GET  /api/ralph/<slug>/log        — tail of progress.txt (default 100 lines)
    GET  /api/ralph/<slug>/prd        — full prd.json content
    GET  /api/ralph/usage[?days=N]    — rate-limit budget summary (cross-project)
    POST /api/ralph/<slug>/stop       — SIGTERM to the Ralph PID
    POST /api/ralph/<slug>/rollback   — git revert HEAD + un-pass the last passing story

SSE detail: the stream emits `event: status\\ndata: <json>\\n\\n` when the
status changes (files are polled every 2s), plus a heartbeat every 30s so
the client does not reset the connection. Requires ThreadingHTTPServer in
api.py — otherwise a single open stream blocks every other request.

Status is read from `~/workspace/<slug>/scripts/ralph/`:
    - prd.json              → stories (passes/failed/blocked/retries)
    - progress.txt          → human-readable log
    - logs/iteration-*.log  → mtime of the latest iteration
    - .ralph.pid            → active PID (checked with os.kill(pid, 0))
    - usage.jsonl           → per-iteration token/cost log (instrumentation MVP)

Path constants are reused from `dashboard/constants.py` (WORKSPACE_DIR).
"""
import json
import os
import re
import signal
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
import constants
# Best-effort import of pure functions for /api/ralph/usage (instrumentation MVP).
# Helper lives at <repo>/tools/ralph_usage.py — sibling of `dashboard/`.
_TOOLS_DIR = Path(__file__).resolve().parents[2] / "tools"
if str(_TOOLS_DIR) not in sys.path:
sys.path.insert(0, str(_TOOLS_DIR))
try:
import ralph_usage # type: ignore
except ImportError: # pragma: no cover — diagnostic only
ralph_usage = None # type: ignore
# Slug strict: alphanum + dash + underscore, max 64 chars. Reject path traversal explicit.
_SLUG_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
# Path Ralph per proiect (mereu în scripts/ralph/)
def _ralph_dir(project_dir: Path) -> Path:
return project_dir / "scripts" / "ralph"
# Estimare ETA simplistă: avg iter time × stories rămase
DEFAULT_ITER_MINUTES = 12 # midpoint din intervalul 8-15min menționat în plan
class RalphHandlers:
"""Mixin pentru /api/ralph/* — Ralph live status + control."""
# ── helpers ────────────────────────────────────────────────
def _ralph_validate_slug(self, slug: str):
"""Validează slug-ul + returnează project_dir sau None.
Strict: alphanum + dash + underscore, ≤64 chars. Path traversal sequences
(`..`, `/`, `\\`) sau caractere ne-alfanumerice sunt respinse înainte de
orice atingere a filesystem-ului.
"""
if not slug:
return None
# Defense-in-depth: explicit path-traversal/separator reject (regex îl
# acoperă, dar îl ţinem explicit ca safety net dacă regex-ul se relaxează).
if ".." in slug or "/" in slug or "\\" in slug:
return None
if not _SLUG_RE.match(slug):
return None
project_dir = constants.WORKSPACE_DIR / slug
try:
resolved = project_dir.resolve()
workspace_resolved = constants.WORKSPACE_DIR.resolve()
resolved.relative_to(workspace_resolved)
except (ValueError, OSError):
return None
if not project_dir.exists() or not project_dir.is_dir():
return None
return project_dir
def _ralph_pid_alive(self, ralph_dir: Path):
"""Întoarce (running: bool, pid: int|None)."""
pid_file = ralph_dir / ".ralph.pid"
if not pid_file.exists():
return False, None
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, 0) # signal 0 = check existence
return True, pid
except (ValueError, ProcessLookupError, PermissionError, OSError):
return False, None
def _ralph_eta_minutes(self, stories_remaining: int, last_iter_mtime: float | None) -> int | None:
"""Estimează minute rămase — None dacă nu avem date."""
if stories_remaining <= 0:
return 0
return stories_remaining * DEFAULT_ITER_MINUTES
def _ralph_summarize_project(self, project_dir: Path) -> dict | None:
"""Construiește dict de status per proiect — None dacă nu e Ralph project."""
ralph_dir = _ralph_dir(project_dir)
prd_json = ralph_dir / "prd.json"
if not prd_json.exists():
return None
# Defensive parse — corupt prd.json nu trebuie să dărâme dashboard
try:
prd = json.loads(prd_json.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return {
"slug": project_dir.name,
"status": "error",
"error": "prd.json invalid sau ilizibil",
"running": False,
"pid": None,
"stories": [],
"storiesTotal": 0,
"storiesComplete": 0,
"storiesFailed": 0,
"storiesBlocked": 0,
}
stories = prd.get("userStories", []) or []
total = len(stories)
complete = sum(1 for s in stories if s.get("passes"))
failed = sum(1 for s in stories if s.get("failed"))
blocked = sum(1 for s in stories if s.get("blocked"))
remaining = total - complete - failed - blocked
running, pid = self._ralph_pid_alive(ralph_dir)
# Last iteration mtime (pentru "acum X")
logs_dir = ralph_dir / "logs"
last_iter_mtime = None
last_iter_iso = None
if logs_dir.exists():
iter_logs = sorted(logs_dir.glob("iteration-*.log"), key=lambda f: f.stat().st_mtime, reverse=True)
if iter_logs:
last_iter_mtime = iter_logs[0].stat().st_mtime
last_iter_iso = datetime.fromtimestamp(last_iter_mtime).isoformat()
# Status compus pentru UI cards
if running:
top_status = "running"
elif failed > 0 and remaining == 0:
top_status = "failed"
elif complete == total and total > 0:
top_status = "complete"
elif blocked > 0 and running is False:
top_status = "blocked"
else:
top_status = "idle"
# Current story (DAG-eligible cel mai mic priority)
current_story = None
if running:
eligible = [
s for s in stories
if not s.get("passes") and not s.get("failed") and not s.get("blocked")
]
eligible.sort(key=lambda s: (s.get("priority", 999), s.get("id", "")))
if eligible:
current_story = {
"id": eligible[0].get("id"),
"title": eligible[0].get("title"),
"tags": eligible[0].get("tags", []),
"retries": eligible[0].get("retries", 0),
}
return {
"slug": project_dir.name,
"status": top_status,
"running": running,
"pid": pid,
"branchName": prd.get("branchName", ""),
"storiesTotal": total,
"storiesComplete": complete,
"storiesFailed": failed,
"storiesBlocked": blocked,
"storiesRemaining": remaining,
"currentStory": current_story,
"lastIterAt": last_iter_iso,
"etaMinutes": self._ralph_eta_minutes(remaining, last_iter_mtime),
"stories": [
{
"id": s.get("id"),
"title": s.get("title"),
"passes": bool(s.get("passes")),
"failed": bool(s.get("failed")),
"blocked": bool(s.get("blocked")),
"retries": int(s.get("retries", 0)),
"tags": s.get("tags", []),
"failureReason": s.get("failureReason", ""),
}
for s in stories
],
}
def _ralph_collect_status(self) -> dict:
"""Construieşte payload-ul de status pentru toate proiectele.
Folosit de `/api/ralph/status` (GET single-shot) şi de `/api/ralph/stream`
(SSE — emis la schimbări).
"""
projects: list[dict] = []
if constants.WORKSPACE_DIR.exists():
for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
if not entry.is_dir() or entry.name.startswith("."):
continue
summary = self._ralph_summarize_project(entry)
if summary is not None:
projects.append(summary)
return {
"projects": projects,
"fetchedAt": datetime.now().isoformat(),
"count": len(projects),
}
def _ralph_signature(self, snapshot: dict) -> tuple:
"""Compactă semnătură pentru change-detection în SSE — doar fields care
contează pentru UI (status, counts, current story). Timestamps de iter
au granularitate de second pentru a evita flicker pe nanosecond drift.
"""
sig: list[tuple] = []
for p in snapshot.get("projects", []) or []:
cs = p.get("currentStory") or {}
sig.append((
p.get("slug"),
p.get("status"),
bool(p.get("running")),
p.get("storiesTotal"),
p.get("storiesComplete"),
p.get("storiesFailed"),
p.get("storiesBlocked"),
p.get("lastIterAt"),
cs.get("id"),
cs.get("retries"),
))
return tuple(sorted(sig, key=lambda t: t[0] or ""))
# ── /api/ralph/status (GET) ────────────────────────────────
def handle_ralph_status(self):
"""Întoarce status pentru toate proiectele Ralph din workspace."""
try:
self.send_json(self._ralph_collect_status())
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/stream (GET, SSE) ───────────────────────────
def handle_ralph_stream(self):
"""Server-Sent Events: emite snapshot la schimbări (poll fişiere 2s).
Heartbeat la 30s pentru a evita timeout pe proxy-uri. Loop-ul iese
curat la BrokenPipe (clientul închis tab-ul). Necesită
ThreadingHTTPServer în api.py — altfel blochează toate request-urile.
"""
try:
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
# Disable proxy buffering (nginx/cloudflare) — flush imediat
self.send_header("X-Accel-Buffering", "no")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
except (BrokenPipeError, ConnectionResetError):
return
last_signature: tuple | None = None
last_heartbeat = time.monotonic()
# Initial snapshot — clientul nu aşteaptă primul change
try:
snapshot = self._ralph_collect_status()
last_signature = self._ralph_signature(snapshot)
payload = json.dumps(snapshot).encode("utf-8")
self.wfile.write(b"event: status\ndata: " + payload + b"\n\n")
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
return
except Exception as exc:
try:
err = json.dumps({"error": str(exc)}).encode("utf-8")
self.wfile.write(b"event: error\ndata: " + err + b"\n\n")
self.wfile.flush()
except Exception:
pass
return
# Stream loop
while True:
try:
time.sleep(2)
snapshot = self._ralph_collect_status()
signature = self._ralph_signature(snapshot)
now = time.monotonic()
if signature != last_signature:
payload = json.dumps(snapshot).encode("utf-8")
self.wfile.write(b"event: status\ndata: " + payload + b"\n\n")
self.wfile.flush()
last_signature = signature
last_heartbeat = now
elif now - last_heartbeat >= 30:
self.wfile.write(b"event: heartbeat\ndata: {}\n\n")
self.wfile.flush()
last_heartbeat = now
except (BrokenPipeError, ConnectionResetError):
return
except Exception:
# Best-effort: o iteraţie eşuată nu trebuie să termine stream-ul,
# dar dacă socketul e mort BrokenPipe va prinde next loop.
continue
# ── /api/ralph/<slug>/log (GET) ────────────────────────────
def handle_ralph_log(self, slug: str):
"""Tail progress.txt pentru un slug. Default last 100 lines."""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({"error": "Invalid project slug"}, 400)
return
from urllib.parse import parse_qs, urlparse
qs = parse_qs(urlparse(self.path).query)
try:
lines_n = min(int(qs.get("lines", ["100"])[0]), 1000)
except ValueError:
lines_n = 100
progress = _ralph_dir(project_dir) / "progress.txt"
if not progress.exists():
self.send_json({"slug": slug, "lines": [], "total": 0})
return
try:
content = progress.read_text(encoding="utf-8", errors="replace")
except OSError as exc:
self.send_json({"error": f"read failed: {exc}"}, 500)
return
all_lines = content.splitlines()
tail = all_lines[-lines_n:] if len(all_lines) > lines_n else all_lines
self.send_json({
"slug": slug,
"lines": tail,
"total": len(all_lines),
})
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/<slug>/prd (GET) ────────────────────────────
def handle_ralph_prd(self, slug: str):
"""Returnează full prd.json pentru un slug."""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({"error": "Invalid project slug"}, 400)
return
prd_json = _ralph_dir(project_dir) / "prd.json"
if not prd_json.exists():
self.send_json({"error": "prd.json not found"}, 404)
return
try:
data = json.loads(prd_json.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
self.send_json({"error": f"prd.json invalid: {exc}"}, 500)
return
self.send_json(data)
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/usage (GET) ─────────────────────────────────
def handle_ralph_usage(self):
"""Returnează rate limit budget summary cross-project.
Citește toate `~/workspace/<slug>/scripts/ralph/usage.jsonl`, le concatenează,
rulează `ralph_usage.summarize` cu `?days=N` (default 7).
Răspuns:
{
"today": "YYYY-MM-DD",
"today_cost": float,
"today_runs": int,
"window_days": N,
"window_cost": float,
"window_runs": int,
"by_project": {...},
"by_day": {...},
"total_cost": float,
"total_runs": int
}
"""
try:
from urllib.parse import parse_qs, urlparse
qs = parse_qs(urlparse(self.path).query)
try:
days = int(qs.get("days", ["7"])[0])
if days <= 0:
days = 7
if days > 365:
days = 365
except ValueError:
days = 7
if ralph_usage is None:
self.send_json({"error": "ralph_usage helper unavailable"}, 500)
return
entries: list[dict] = []
if constants.WORKSPACE_DIR.exists():
for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
if not entry.is_dir() or entry.name.startswith("."):
continue
usage_path = _ralph_dir(entry) / "usage.jsonl"
if usage_path.exists():
entries.extend(ralph_usage.parse_usage_jsonl(usage_path))
summary = ralph_usage.summarize(entries, days=days)
summary["fetchedAt"] = datetime.now().isoformat()
self.send_json(summary)
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/<slug>/stop (POST) ──────────────────────────
def handle_ralph_stop(self, slug: str):
"""Trimite SIGTERM la Ralph PID. Verifică că PID-ul e în WORKSPACE_DIR."""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({"success": False, "error": "Invalid project slug"}, 400)
return
ralph_dir = _ralph_dir(project_dir)
pid_file = ralph_dir / ".ralph.pid"
if not pid_file.exists():
self.send_json({"success": False, "error": "No PID file"}, 404)
return
try:
pid = int(pid_file.read_text().strip())
except (ValueError, OSError) as exc:
self.send_json({"success": False, "error": f"Invalid PID file: {exc}"}, 500)
return
# Sandbox: verifică că procesul e în workspace (nu omoară random PID)
try:
proc_cwd = Path(f"/proc/{pid}/cwd").resolve()
if not str(proc_cwd).startswith(str(constants.WORKSPACE_DIR)):
self.send_json({"success": False, "error": "PID not in workspace"}, 403)
return
except (FileNotFoundError, PermissionError):
# Procesul nu mai există — best-effort cleanup
self.send_json({"success": True, "message": "Process already stopped"})
return
try:
os.killpg(os.getpgid(pid), signal.SIGTERM)
except ProcessLookupError:
self.send_json({"success": True, "message": "Process already stopped"})
return
except PermissionError:
self.send_json({"success": False, "error": "Permission denied"}, 403)
return
self.send_json({"success": True, "message": f"Ralph stopped (PID {pid})"})
except Exception as exc:
self.send_json({"success": False, "error": str(exc)}, 500)
# ── /api/ralph/<slug>/rollback (POST) ──────────────────────
def _ralph_decrement_last_pass(self, project_dir: Path) -> str | None:
"""Marchează ultima story `passes=True` (din ordinea din prd.json) ca
incompletă (`passes=False`, şterge `failed`/`blocked`/`failureReason`,
retries=0). Atomic write (temp + rename). Întoarce id-ul story-ului
sau None dacă nu există nimic de decrementat / prd.json invalid.
"""
prd_path = _ralph_dir(project_dir) / "prd.json"
if not prd_path.exists():
return None
try:
prd = json.loads(prd_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
stories = prd.get("userStories", []) or []
target_idx: int | None = None
# ultima poziţională cu passes=True (DAG-order = ordine de finalizare)
for i in range(len(stories) - 1, -1, -1):
if stories[i].get("passes"):
target_idx = i
break
if target_idx is None:
return None
story_id = stories[target_idx].get("id")
stories[target_idx]["passes"] = False
# Reset stare derivată — story-ul e disponibil pentru re-run
stories[target_idx].pop("failed", None)
stories[target_idx].pop("blocked", None)
stories[target_idx].pop("failureReason", None)
stories[target_idx]["retries"] = 0
# Atomic write (acelaşi pattern ca W3 ralph_dag.py)
tmp = prd_path.with_suffix(".json.tmp")
try:
tmp.write_text(json.dumps(prd, indent=2), encoding="utf-8")
tmp.replace(prd_path)
except OSError:
tmp.unlink(missing_ok=True)
return None
return story_id
def handle_ralph_rollback(self, slug: str):
"""Rollback ultimul commit într-un proiect Ralph.
Strategy: `git revert --no-edit HEAD` (history-preserving). Fallback la
`git reset --hard HEAD~1` doar dacă revert eşuează (conflict, binary
file). După succes, decrementează `passes` pe ultima story marcată
complete în prd.json (atomic write).
Returns: `{success, message, reverted_commit, story_reverted, method}`.
"""
try:
project_dir = self._ralph_validate_slug(slug)
if not project_dir:
self.send_json({
"success": False,
"message": "Invalid project slug",
"reverted_commit": None,
"story_reverted": None,
}, 400)
return
git_dir = project_dir / ".git"
if not git_dir.exists():
self.send_json({
"success": False,
"message": "Not a git repository",
"reverted_commit": None,
"story_reverted": None,
}, 400)
return
# Read HEAD before any operation (raportăm SHA-ul afectat)
head_proc = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=str(project_dir), capture_output=True, text=True, timeout=10,
)
if head_proc.returncode != 0:
self.send_json({
"success": False,
"message": f"git rev-parse HEAD failed: {head_proc.stderr.strip()}",
"reverted_commit": None,
"story_reverted": None,
}, 500)
return
commit_to_revert = head_proc.stdout.strip()
# Try revert (preserves history, recommended)
method = "revert"
revert = subprocess.run(
["git", "revert", "--no-edit", "HEAD"],
cwd=str(project_dir), capture_output=True, text=True, timeout=30,
)
if revert.returncode != 0:
# Conflict / binary file — abort & fall back to reset --hard
subprocess.run(
["git", "revert", "--abort"],
cwd=str(project_dir), capture_output=True, timeout=10,
)
reset = subprocess.run(
["git", "reset", "--hard", "HEAD~1"],
cwd=str(project_dir), capture_output=True, text=True, timeout=30,
)
if reset.returncode != 0:
self.send_json({
"success": False,
"message": (
f"revert failed ({revert.stderr.strip()[:200]}), "
f"reset failed ({reset.stderr.strip()[:200]})"
),
"reverted_commit": commit_to_revert,
"story_reverted": None,
}, 500)
return
method = "reset"
# Best-effort: decrement story passes (nu fail dacă lipseşte prd.json)
story_reverted = self._ralph_decrement_last_pass(project_dir)
short_sha = commit_to_revert[:8]
msg_bits = [f"Rolled back {short_sha} via git {method}"]
if story_reverted:
msg_bits.append(f"story {story_reverted} marked incomplete")
self.send_json({
"success": True,
"message": "; ".join(msg_bits),
"reverted_commit": commit_to_revert,
"story_reverted": story_reverted,
"method": method,
})
except subprocess.TimeoutExpired:
self.send_json({
"success": False,
"message": "git operation timed out",
"reverted_commit": None,
"story_reverted": None,
}, 500)
except Exception as exc:
self.send_json({
"success": False,
"message": str(exc),
"reverted_commit": None,
"story_reverted": None,
}, 500)