Files
echo-core/tools/ralph_dag.py
Marius Mutu 655ed3ae09 feat(ralph): smart gates + DAG + dashboard live (W3)
Restructurare Ralph QC loop pe smart gate dispatcher tag-driven (în loc de
5 faze fixe), DAG dependsOn cu propagare blocked, retry guard 3-strike, rate
limit detection, plus dashboard live cu polling 5s.

Changes:
- tools/ralph_prd_generator.py: parametru optional final_plan_path; când e
  furnizat, invocă Claude Opus pe final-plan.md pentru extragere user stories
  cu schema extinsă (tags, dependsOn, acceptanceCriteria 3-5). Backward compat
  păstrat — fără final_plan_path, fallback la heuristic-ul vechi.
- tools/ralph/prd-template.json: schema W3 (tags[], dependsOn[], retries,
  failed, blocked, failureReason, requiresDesignReview).
- tools/ralph/prompt.md: 4 faze (impl, base quality, smart gates, commit) +
  dispatcher pe story.tags. Tags vide → run-all-gates fallback (safe default).
- tools/ralph_dag.py (nou): tag validation heuristic anti-silent-regression
  (force ui dacă diff atinge .vue/.tsx/.html/.css/.scss; force db pentru
  migrations sau .sql; force vercel dacă există vercel.json) + topological
  sort cu blocked propagation + atomic prd.json updates.
- tools/ralph/ralph.sh: --max-turns 30, DAG-aware story selection, retry
  counter cu auto-fail la 3, rate limit detection (sleep 30min + 1 retry),
  CLI subcommands prin tools/ralph_dag.py helper.
- dashboard/handlers/ralph.py (nou): /api/ralph/status + /<slug>/log + /prd
  + /stop. Defensive vs corrupt prd.json. Sandbox-ed PID kill.
- dashboard/ralph.html (nou): live cards 3/2/1 col responsive, polling 5s,
  drawer pentru log/PRD viewer, status colors (--status-running/blocked/
  failed/complete declarate inline), Lucide icons cu aria-labels.
- dashboard/api.py: mount /api/ralph/* (GET status/log/prd, POST stop).
- tests/: 72 teste noi (smart gates, DAG, retry, dashboard endpoint).

Note arhitecturale:
- Polling 5s ales peste SSE/WebSocket (suficient pentru iter Ralph 8-15min)
- Tag validation rulează POST-iter pe diff git pentru anti-silent-regression
- Rate limit retry: 1 dată per rulare, apoi mark failed=rate_limited

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 18:36:35 +00:00

268 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Ralph DAG + tag-validation helpers (W3 smart gates).
Două responsabilități:
1. Tag validation heuristic — anti-silent-regression. Forțează tags pe baza
diff content (chiar dacă Opus le-a omis). Rulat ÎNAINTE de smart gate dispatch.
2. DAG topological sort — alege următorul story eligibil (passes/failed/blocked
propagation). Rulat de ralph.sh la începutul fiecărei iterații.
CLI subcommands (apelate din ralph.sh):
python3 ralph_dag.py infer-tags <story_id> <project_dir>
→ printează tags inferate (newline-separated) pe baza git diff HEAD~1.
python3 ralph_dag.py next-story <prd.json>
→ printează story_id eligibil (DAG-aware) sau exit 1 dacă nimic.
python3 ralph_dag.py mark-failed <prd.json> <story_id> <reason>
→ marchează story.failed=true cu motiv; propagă blocked la dependenți.
python3 ralph_dag.py incr-retry <prd.json> <story_id>
→ +1 retries; dacă >=3, mark failed cu reason="max_retries"; print new count.
python3 ralph_dag.py force-tags <prd.json> <story_id> <project_dir>
→ adaugă tags inferate DIN DIFF în story.tags (idempotent, deduplicat);
dacă tags sunt vide după → fallback la "run-all" (NU modifică, doar print "EMPTY").
"""
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import List, Optional
VALID_TAGS = {"ui", "db", "vercel", "refactor", "docs", "backend", "infra"}
# Heuristici diff → tag. Sortate stabil pentru determinism în teste.
UI_PATTERN = re.compile(r'\.(vue|tsx|jsx|html|css|scss|svelte)$', re.IGNORECASE)
DB_MIGRATIONS = re.compile(r'(^|/)migrations?/', re.IGNORECASE)
DB_SQL = re.compile(r'\.sql$', re.IGNORECASE)
def infer_tags_from_paths(paths: List[str], has_vercel_json: bool = False) -> List[str]:
"""Pure function: dat list de file paths atinse + flag vercel.json, întoarce tags inferate.
Returnează lista deduplicată, ordonată: ui, db, vercel.
"""
tags = set()
for p in paths:
if not p:
continue
if UI_PATTERN.search(p):
tags.add("ui")
if DB_MIGRATIONS.search(p) or DB_SQL.search(p):
tags.add("db")
if has_vercel_json:
tags.add("vercel")
# Ordine stabilă pentru determinism (teste + diff-uri reproducibile)
return sorted(tags, key=lambda t: ("ui", "db", "vercel").index(t) if t in ("ui", "db", "vercel") else 99)
def force_include_tags(existing_tags: List[str], diff_paths: List[str], has_vercel_json: bool) -> List[str]:
"""Combinator: existing tags inferred din diff. Filtrează la VALID_TAGS.
Garanție anti-silent-regression: chiar dacă Opus a marcat story=docs, dacă diff
atinge .vue/.tsx → ui e forțat. Story=docs care realmente atinge UI ar fi avut
silent skip /qa.
"""
inferred = infer_tags_from_paths(diff_paths, has_vercel_json)
combined = []
seen = set()
for t in list(existing_tags) + inferred:
if t in VALID_TAGS and t not in seen:
combined.append(t)
seen.add(t)
return combined
def get_diff_paths(project_dir: Path, ref: str = "HEAD~1") -> List[str]:
"""Întoarce file paths din `git diff --name-only <ref>` în project_dir.
Lista vidă dacă git nu e disponibil sau nu există commit anterior.
"""
try:
result = subprocess.run(
["git", "-C", str(project_dir), "diff", "--name-only", ref],
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
# Fallback: diff vs HEAD (uncommitted changes) — util pe primul commit
result = subprocess.run(
["git", "-C", str(project_dir), "diff", "--name-only", "HEAD"],
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
return []
return [p.strip() for p in result.stdout.splitlines() if p.strip()]
except (subprocess.TimeoutExpired, FileNotFoundError):
return []
def topological_eligible(stories: List[dict]) -> Optional[dict]:
"""Întoarce primul story eligibil (DAG-aware) — sau None dacă nimic.
Eligibil = !passes ∧ !failed ∧ !blocked ∧ toate `dependsOn` au `passes==True`.
Dacă story.dependsOn conține un ID `failed` → story-ul DEVINE blocked (mutat in-place).
Sortare: priority asc.
"""
by_id = {s.get("id"): s for s in stories}
# Pas 1: propagă blocked dacă vreun dep e failed
changed = True
while changed:
changed = False
for s in stories:
if s.get("passes") or s.get("failed") or s.get("blocked"):
continue
for dep_id in s.get("dependsOn") or []:
dep = by_id.get(dep_id)
if dep and (dep.get("failed") or dep.get("blocked")):
s["blocked"] = True
s["failureReason"] = f"blocked_by:{dep_id}"
changed = True
break
# Pas 2: găsește story eligibil cu cea mai mică priority
eligible = []
for s in stories:
if s.get("passes") or s.get("failed") or s.get("blocked"):
continue
deps = s.get("dependsOn") or []
if all(by_id.get(d, {}).get("passes") for d in deps):
eligible.append(s)
if not eligible:
return None
eligible.sort(key=lambda s: (s.get("priority", 999), s.get("id", "")))
return eligible[0]
def _load_prd(prd_path: Path) -> dict:
with open(prd_path, encoding="utf-8") as f:
return json.load(f)
def _save_prd(prd_path: Path, data: dict) -> None:
"""Atomic write — temp file + rename, evită corruption mid-write."""
tmp = prd_path.with_suffix(".json.tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
tmp.replace(prd_path)
def cmd_next_story(prd_path: Path) -> int:
data = _load_prd(prd_path)
stories = data.get("userStories", [])
chosen = topological_eligible(stories)
# Salvăm dacă topological_eligible a propagat blocked (mutație in-place)
_save_prd(prd_path, data)
if not chosen:
return 1
print(chosen.get("id", ""))
return 0
def cmd_mark_failed(prd_path: Path, story_id: str, reason: str) -> int:
data = _load_prd(prd_path)
found = False
for s in data.get("userStories", []):
if s.get("id") == story_id:
s["failed"] = True
s["passes"] = False
s["failureReason"] = reason or "unknown"
found = True
break
if not found:
print(f"Story {story_id} not found", file=sys.stderr)
return 1
# Propagate blocked la dependenți
topological_eligible(data.get("userStories", []))
_save_prd(prd_path, data)
print(f"failed: {story_id} ({reason})")
return 0
def cmd_incr_retry(prd_path: Path, story_id: str) -> int:
data = _load_prd(prd_path)
for s in data.get("userStories", []):
if s.get("id") == story_id:
s["retries"] = int(s.get("retries", 0)) + 1
new = s["retries"]
if new >= 3:
s["failed"] = True
s["failureReason"] = "max_retries"
topological_eligible(data.get("userStories", []))
_save_prd(prd_path, data)
print(new)
return 0
print(f"Story {story_id} not found", file=sys.stderr)
return 1
def cmd_force_tags(prd_path: Path, story_id: str, project_dir: Path) -> int:
data = _load_prd(prd_path)
diff_paths = get_diff_paths(project_dir)
has_vercel = (project_dir / "vercel.json").exists()
for s in data.get("userStories", []):
if s.get("id") == story_id:
existing = s.get("tags") or []
forced = force_include_tags(existing, diff_paths, has_vercel)
s["tags"] = forced
_save_prd(prd_path, data)
if not forced:
print("EMPTY") # ralph.sh interpretează ca run-all-gates fallback
else:
for t in forced:
print(t)
return 0
print(f"Story {story_id} not found", file=sys.stderr)
return 1
def cmd_infer_tags(story_id: str, project_dir: Path) -> int:
"""Variant care NU modifică prd.json — doar print tags inferate din diff curent."""
diff_paths = get_diff_paths(project_dir)
has_vercel = (project_dir / "vercel.json").exists()
inferred = infer_tags_from_paths(diff_paths, has_vercel)
for t in inferred:
print(t)
return 0 if inferred else 1
def main() -> int:
if len(sys.argv) < 2:
print(__doc__)
return 2
cmd = sys.argv[1]
args = sys.argv[2:]
try:
if cmd == "next-story" and len(args) == 1:
return cmd_next_story(Path(args[0]))
if cmd == "mark-failed" and len(args) == 3:
return cmd_mark_failed(Path(args[0]), args[1], args[2])
if cmd == "incr-retry" and len(args) == 2:
return cmd_incr_retry(Path(args[0]), args[1])
if cmd == "force-tags" and len(args) == 3:
return cmd_force_tags(Path(args[0]), args[1], Path(args[2]))
if cmd == "infer-tags" and len(args) == 2:
return cmd_infer_tags(args[0], Path(args[1]))
print(f"Unknown command: {cmd}", file=sys.stderr)
print(__doc__, file=sys.stderr)
return 2
except FileNotFoundError as exc:
print(f"File not found: {exc}", file=sys.stderr)
return 3
except json.JSONDecodeError as exc:
print(f"Invalid JSON: {exc}", file=sys.stderr)
return 3
if __name__ == "__main__":
sys.exit(main())