#!/usr/bin/env python3 """ Ralph DAG + tag-validation helpers (W3 smart gates). Două responsabilități: 1. Tag validation heuristic — anti-silent-regression. Forțează tags pe baza diff content (chiar dacă Opus le-a omis). Rulat ÎNAINTE de smart gate dispatch. 2. DAG topological sort — alege următorul story eligibil (passes/failed/blocked propagation). Rulat de ralph.sh la începutul fiecărei iterații. CLI subcommands (apelate din ralph.sh): python3 ralph_dag.py infer-tags → printează tags inferate (newline-separated) pe baza git diff HEAD~1. python3 ralph_dag.py next-story → printează story_id eligibil (DAG-aware) sau exit 1 dacă nimic. python3 ralph_dag.py mark-failed → marchează story.failed=true cu motiv; propagă blocked la dependenți. python3 ralph_dag.py incr-retry → +1 retries; dacă >=3, mark failed cu reason="max_retries"; print new count. python3 ralph_dag.py force-tags → adaugă tags inferate DIN DIFF în story.tags (idempotent, deduplicat); dacă tags sunt vide după → fallback la "run-all" (NU modifică, doar print "EMPTY"). """ import json import re import subprocess import sys from pathlib import Path from typing import List, Optional VALID_TAGS = {"ui", "db", "vercel", "refactor", "docs", "backend", "infra"} # Heuristici diff → tag. Sortate stabil pentru determinism în teste. UI_PATTERN = re.compile(r'\.(vue|tsx|jsx|html|css|scss|svelte)$', re.IGNORECASE) DB_MIGRATIONS = re.compile(r'(^|/)migrations?/', re.IGNORECASE) DB_SQL = re.compile(r'\.sql$', re.IGNORECASE) def infer_tags_from_paths(paths: List[str], has_vercel_json: bool = False) -> List[str]: """Pure function: dat list de file paths atinse + flag vercel.json, întoarce tags inferate. Returnează lista deduplicată, ordonată: ui, db, vercel. """ tags = set() for p in paths: if not p: continue if UI_PATTERN.search(p): tags.add("ui") if DB_MIGRATIONS.search(p) or DB_SQL.search(p): tags.add("db") if has_vercel_json: tags.add("vercel") # Ordine stabilă pentru determinism (teste + diff-uri reproducibile) return sorted(tags, key=lambda t: ("ui", "db", "vercel").index(t) if t in ("ui", "db", "vercel") else 99) def force_include_tags(existing_tags: List[str], diff_paths: List[str], has_vercel_json: bool) -> List[str]: """Combinator: existing tags ∪ inferred din diff. Filtrează la VALID_TAGS. Garanție anti-silent-regression: chiar dacă Opus a marcat story=docs, dacă diff atinge .vue/.tsx → ui e forțat. Story=docs care realmente atinge UI ar fi avut silent skip /qa. """ inferred = infer_tags_from_paths(diff_paths, has_vercel_json) combined = [] seen = set() for t in list(existing_tags) + inferred: if t in VALID_TAGS and t not in seen: combined.append(t) seen.add(t) return combined def get_diff_paths(project_dir: Path, ref: str = "HEAD~1") -> List[str]: """Întoarce file paths din `git diff --name-only ` în project_dir. Lista vidă dacă git nu e disponibil sau nu există commit anterior. """ try: result = subprocess.run( ["git", "-C", str(project_dir), "diff", "--name-only", ref], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: # Fallback: diff vs HEAD (uncommitted changes) — util pe primul commit result = subprocess.run( ["git", "-C", str(project_dir), "diff", "--name-only", "HEAD"], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: return [] return [p.strip() for p in result.stdout.splitlines() if p.strip()] except (subprocess.TimeoutExpired, FileNotFoundError): return [] def topological_eligible(stories: List[dict]) -> Optional[dict]: """Întoarce primul story eligibil (DAG-aware) — sau None dacă nimic. Eligibil = !passes ∧ !failed ∧ !blocked ∧ toate `dependsOn` au `passes==True`. Dacă story.dependsOn conține un ID `failed` → story-ul DEVINE blocked (mutat in-place). Sortare: priority asc. """ by_id = {s.get("id"): s for s in stories} # Pas 1: propagă blocked dacă vreun dep e failed changed = True while changed: changed = False for s in stories: if s.get("passes") or s.get("failed") or s.get("blocked"): continue for dep_id in s.get("dependsOn") or []: dep = by_id.get(dep_id) if dep and (dep.get("failed") or dep.get("blocked")): s["blocked"] = True s["failureReason"] = f"blocked_by:{dep_id}" changed = True break # Pas 2: găsește story eligibil cu cea mai mică priority eligible = [] for s in stories: if s.get("passes") or s.get("failed") or s.get("blocked"): continue deps = s.get("dependsOn") or [] if all(by_id.get(d, {}).get("passes") for d in deps): eligible.append(s) if not eligible: return None eligible.sort(key=lambda s: (s.get("priority", 999), s.get("id", ""))) return eligible[0] def _load_prd(prd_path: Path) -> dict: with open(prd_path, encoding="utf-8") as f: return json.load(f) def _save_prd(prd_path: Path, data: dict) -> None: """Atomic write — temp file + rename, evită corruption mid-write.""" tmp = prd_path.with_suffix(".json.tmp") with open(tmp, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) tmp.replace(prd_path) def cmd_next_story(prd_path: Path) -> int: data = _load_prd(prd_path) stories = data.get("userStories", []) chosen = topological_eligible(stories) # Salvăm dacă topological_eligible a propagat blocked (mutație in-place) _save_prd(prd_path, data) if not chosen: return 1 print(chosen.get("id", "")) return 0 def cmd_mark_failed(prd_path: Path, story_id: str, reason: str) -> int: data = _load_prd(prd_path) found = False for s in data.get("userStories", []): if s.get("id") == story_id: s["failed"] = True s["passes"] = False s["failureReason"] = reason or "unknown" found = True break if not found: print(f"Story {story_id} not found", file=sys.stderr) return 1 # Propagate blocked la dependenți topological_eligible(data.get("userStories", [])) _save_prd(prd_path, data) print(f"failed: {story_id} ({reason})") return 0 def cmd_incr_retry(prd_path: Path, story_id: str) -> int: data = _load_prd(prd_path) for s in data.get("userStories", []): if s.get("id") == story_id: s["retries"] = int(s.get("retries", 0)) + 1 new = s["retries"] if new >= 3: s["failed"] = True s["failureReason"] = "max_retries" topological_eligible(data.get("userStories", [])) _save_prd(prd_path, data) print(new) return 0 print(f"Story {story_id} not found", file=sys.stderr) return 1 def cmd_force_tags(prd_path: Path, story_id: str, project_dir: Path) -> int: data = _load_prd(prd_path) diff_paths = get_diff_paths(project_dir) has_vercel = (project_dir / "vercel.json").exists() for s in data.get("userStories", []): if s.get("id") == story_id: existing = s.get("tags") or [] forced = force_include_tags(existing, diff_paths, has_vercel) s["tags"] = forced _save_prd(prd_path, data) if not forced: print("EMPTY") # ralph.sh interpretează ca run-all-gates fallback else: for t in forced: print(t) return 0 print(f"Story {story_id} not found", file=sys.stderr) return 1 def cmd_infer_tags(story_id: str, project_dir: Path) -> int: """Variant care NU modifică prd.json — doar print tags inferate din diff curent.""" diff_paths = get_diff_paths(project_dir) has_vercel = (project_dir / "vercel.json").exists() inferred = infer_tags_from_paths(diff_paths, has_vercel) for t in inferred: print(t) return 0 if inferred else 1 def main() -> int: if len(sys.argv) < 2: print(__doc__) return 2 cmd = sys.argv[1] args = sys.argv[2:] try: if cmd == "next-story" and len(args) == 1: return cmd_next_story(Path(args[0])) if cmd == "mark-failed" and len(args) == 3: return cmd_mark_failed(Path(args[0]), args[1], args[2]) if cmd == "incr-retry" and len(args) == 2: return cmd_incr_retry(Path(args[0]), args[1]) if cmd == "force-tags" and len(args) == 3: return cmd_force_tags(Path(args[0]), args[1], Path(args[2])) if cmd == "infer-tags" and len(args) == 2: return cmd_infer_tags(args[0], Path(args[1])) print(f"Unknown command: {cmd}", file=sys.stderr) print(__doc__, file=sys.stderr) return 2 except FileNotFoundError as exc: print(f"File not found: {exc}", file=sys.stderr) return 3 except json.JSONDecodeError as exc: print(f"Invalid JSON: {exc}", file=sys.stderr) return 3 if __name__ == "__main__": sys.exit(main())