"""Unified project endpoints for the new dashboard (Task #4 backend). Replaces the old separate /api/workspace + /api/ralph payloads with a single /api/projects view that merges: - workspace dirs (`~/workspace//`) - approved-tasks.json entries - per-project Ralph PRD (`scripts/ralph/prd.json`) Endpoints exposed via the mixin: GET /api/projects — full list + version GET /api/projects/signature — change-detection hash for SSE GET /api/projects/stream — SSE stream of changes GET /api/projects//plan/state GET /api/projects//plan/transcript POST /api/projects/propose POST /api/projects/approve (If-Match: ) POST /api/projects/unapprove (If-Match: ) POST /api/projects/cancel (If-Match: ) POST /api/projects//plan/start POST /api/projects//plan/respond POST /api/projects//plan/finalize POST /api/projects//plan/cancel POST /api/projects//plan/advance Status derivation (one of these strings — see `_derive_status`): running-ralph — has PID, /proc//cmdline contains ralph.sh running-manual — has PID, alive, but not ralph.sh planning — approved-tasks.json entry has status=planning approved — entry has status=approved (waiting for night-execute) pending — entry has status=pending blocked — prd has stories, all incomplete are blocked failed — prd has stories, last run had failures complete — prd has stories, all passes==True idle — workspace dir exists but no signal """ from __future__ import annotations import hashlib import json import logging import os import re import subprocess import sys import time from datetime import datetime, timezone from pathlib import Path from urllib.parse import parse_qs, urlparse import constants from handlers._validators import parse_json_body, validate_description, validate_slug # Make `from src.x import ...` work — same trick youtube.py uses. if str(constants.BASE_DIR) not in sys.path: sys.path.insert(0, str(constants.BASE_DIR)) log = logging.getLogger(__name__) APPROVED_TASKS_FILE = constants.BASE_DIR / "approved-tasks.json" class _StaleVersion(Exception): """Raised inside a write_approved mutator to abort on If-Match mismatch.""" pass # Module-level signature cache — only re-run `git status --porcelain` when the # repo's `.git/index` mtime advances. Keeps SSE polling cheap. _SIG_CACHE: dict = {"git_mtime": None, "signature": None, "ts": 0.0} # ─── helpers ─────────────────────────────────────────────────────────── def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _read_approved() -> dict: """Read approved-tasks.json under a shared flock.""" try: from src.jsonlock import read_locked # type: ignore return read_locked(str(APPROVED_TASKS_FILE)) except FileNotFoundError: return {"projects": [], "last_updated": None, "version": 0} except (json.JSONDecodeError, OSError) as exc: # pragma: no cover log.error("approved-tasks.json read failed: %s", exc) return {"projects": [], "last_updated": None, "version": 0} def _write_approved(mutator): """Persist approved-tasks.json under an exclusive flock + atomic replace.""" from src.jsonlock import write_locked # type: ignore def _wrap(data: dict) -> dict: if not isinstance(data, dict): data = {} data.setdefault("projects", []) data = mutator(data) data["last_updated"] = _now_iso() return data return write_locked(str(APPROVED_TASKS_FILE), _wrap) def _find_project(data: dict, slug: str) -> dict | None: for p in data.get("projects", []) or []: if (p.get("name") or "").lower() == slug.lower(): return p return None def _read_prd(project_dir: Path) -> dict | None: prd_path = project_dir / "scripts" / "ralph" / "prd.json" if not prd_path.exists(): return None try: return json.loads(prd_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return None def _pid_alive_with_cmdline(pid: int | None) -> tuple[bool, str]: """(alive, cmdline) — cmdline is empty string if not readable.""" if not pid or not isinstance(pid, int): return False, "" try: os.kill(pid, 0) except (ProcessLookupError, PermissionError, OSError): return False, "" cmdline = "" try: with open(f"/proc/{pid}/cmdline", "rb") as f: cmdline = f.read().decode("utf-8", errors="replace").replace("\x00", " ") except (FileNotFoundError, PermissionError, OSError): cmdline = "" return True, cmdline # ─── status derivation ──────────────────────────────────────────────── def _derive_status(slug, approved_entry, workspace_dir, prd_data) -> str: """Compute one of nine UI states (see module docstring).""" # Process signal trumps everything else. pid = (approved_entry or {}).get("pid") alive, cmdline = _pid_alive_with_cmdline(pid) if alive: if "ralph.sh" in cmdline: return "running-ralph" return "running-manual" # Approved-tasks entry status (planning/approved/pending takes precedence # over PRD-derived states because the user explicitly set it). if approved_entry: s = (approved_entry.get("status") or "").lower() if s == "planning": return "planning" if s == "approved": return "approved" if s == "pending": return "pending" if s in ("complete", "completed"): # fall through to prd check — explicit complete wins if prd matches pass if s == "failed": return "failed" if s in ("cancelled", "canceled"): return "idle" # PRD-derived states (only for projects that ran Ralph). stories = (prd_data or {}).get("userStories") or [] if stories: all_pass = all(bool(s.get("passes")) for s in stories) any_pass = any(bool(s.get("passes")) for s in stories) if all_pass: return "complete" # Distinguish blocked vs failed using per-story flags. incomplete = [s for s in stories if not s.get("passes")] if incomplete and all(s.get("blocked") for s in incomplete): return "blocked" if any(s.get("failed") for s in stories): return "failed" if any_pass: # partial progress without active PID — call it idle (re-runnable) return "idle" return "idle" def _stories_progress(prd_data: dict | None) -> tuple[int, int]: stories = (prd_data or {}).get("userStories") or [] total = len(stories) done = sum(1 for s in stories if s.get("passes")) return total, done def _last_iter_mtime(project_dir: Path) -> float | None: logs = project_dir / "scripts" / "ralph" / "logs" if not logs.exists(): return None try: files = list(logs.glob("iteration-*.log")) if not files: return None return max(f.stat().st_mtime for f in files) except OSError: return None def _eta_seconds(prd_data, total, done, last_iter_mtime) -> int | None: """Crude ETA: 12 minutes per remaining story (matches RalphHandlers).""" if total <= 0: return None remaining = total - done if remaining <= 0: return 0 return remaining * 12 * 60 def _last_message_preview(slug: str) -> str: """Return the most recent planning excerpt for any adapter on this slug. The dashboard owns adapter="dashboard" but we also surface chats started via discord/telegram so the unified UI reflects reality. """ try: from src.planning_session import _load_planning_state # type: ignore except Exception: # pragma: no cover — defensive return "" try: state = _load_planning_state() except Exception: return "" best = "" best_ts = "" for _key, entry in state.items(): if (entry.get("slug") or "").lower() != slug.lower(): continue ts = entry.get("updated_at") or entry.get("started_at") or "" excerpt = entry.get("last_text_excerpt") or "" if ts >= best_ts and excerpt: best = excerpt best_ts = ts return best[:200] # ─── planning transcript helpers ────────────────────────────────────── _EXTERNAL_CONTENT_RE = re.compile( r"^\s*\[EXTERNAL CONTENT\]\s*\n(.*)\n\[END EXTERNAL CONTENT\]\s*$", re.DOTALL, ) _COMMAND_ARGS_RE = re.compile(r"(.*?)", re.DOTALL) def _planning_jsonl_path(slug: str, session_id: str) -> Path | None: """Path to the Claude CLI JSONL transcript for a planning session. PlanningSession runs with cwd = `~/workspace//` if it exists, else falls back to the echo-core repo root (see `PlanningSession.cwd`). Claude CLI stores transcripts under `~/.claude/projects//.jsonl`, where encoded-cwd is the absolute path with `/` replaced by `-`. """ if not session_id: return None workspace = constants.WORKSPACE_DIR / slug cwd = workspace if workspace.is_dir() else constants.BASE_DIR encoded = str(cwd).replace("/", "-") return Path.home() / ".claude" / "projects" / encoded / f"{session_id}.jsonl" def _clean_user_text(text: str) -> str: """Strip wrappers Echo adds around user input sent to the planning subprocess.""" m = _EXTERNAL_CONTENT_RE.match(text) if m: text = m.group(1).strip() if "" in text: m = _COMMAND_ARGS_RE.search(text) if m: text = m.group(1).strip() return text def _extract_text_blocks(content) -> str: """Concatenate `text`-type blocks from a Claude `message.content` field.""" if isinstance(content, str): return content if isinstance(content, list): parts = [] for block in content: if isinstance(block, dict) and block.get("type") == "text": t = block.get("text") or "" if t: parts.append(t) return "\n\n".join(parts) return "" def _load_planning_history(slug: str, session_id: str | None) -> list[dict]: """Parse the Claude session JSONL into a `[{role, text}, ...]` chat history. Skips queue / system / tool-result entries. Empty list on any structural problem so callers can fall back to `last_text_excerpt`. """ if not session_id: return [] path = _planning_jsonl_path(slug, session_id) if path is None or not path.exists(): return [] history: list[dict] = [] try: with path.open(encoding="utf-8") as fh: for raw in fh: raw = raw.strip() if not raw: continue try: entry = json.loads(raw) except json.JSONDecodeError: continue etype = entry.get("type") if etype not in ("user", "assistant"): continue if etype == "user" and "toolUseResult" in entry: continue if entry.get("isMeta"): continue msg = entry.get("message") or {} text = _extract_text_blocks(msg.get("content")) if etype == "user": text = _clean_user_text(text) text = (text or "").strip() if not text: continue history.append({"role": etype, "text": text}) except OSError: return [] return history # ─── version helpers ────────────────────────────────────────────────── def _get_version_from(data: dict) -> int: try: return int(data.get("version") or 0) except (TypeError, ValueError): return 0 def _bump_version(data: dict) -> dict: data["version"] = _get_version_from(data) + 1 return data # ─── signature for SSE change detection ─────────────────────────────── def _git_index_mtime() -> float | None: idx = constants.BASE_DIR / ".git" / "index" try: return idx.stat().st_mtime except OSError: return None def _compute_signature() -> str: """Hash of (git porcelain status, workspace count, approved-tasks version). Uses the cached value when `.git/index` mtime hasn't changed. """ now = time.monotonic() git_mtime = _git_index_mtime() cached_mtime = _SIG_CACHE.get("git_mtime") cached_sig = _SIG_CACHE.get("signature") # Always include workspace count + approved version + per-project mtimes # in the signature (cheap stat). Only the git porcelain piece is cached. ws_count = 0 project_marks: list[tuple] = [] if constants.WORKSPACE_DIR.exists(): for entry in sorted(constants.WORKSPACE_DIR.iterdir()): if not entry.is_dir() or entry.name.startswith("."): continue ws_count += 1 prd = entry / "scripts" / "ralph" / "prd.json" pid_file = entry / "scripts" / "ralph" / ".ralph.pid" try: pmt = prd.stat().st_mtime if prd.exists() else 0 except OSError: pmt = 0 try: pid_mt = pid_file.stat().st_mtime if pid_file.exists() else 0 except OSError: pid_mt = 0 project_marks.append((entry.name, pmt, pid_mt)) # Approved version approved = _read_approved() version = _get_version_from(approved) # Git porcelain — cached on .git/index mtime if git_mtime == cached_mtime and cached_sig is not None: git_part = cached_sig else: try: res = subprocess.run( ["git", "status", "--porcelain"], cwd=str(constants.BASE_DIR), capture_output=True, text=True, timeout=5, ) git_part = res.stdout if res.returncode == 0 else "" except (subprocess.TimeoutExpired, FileNotFoundError, OSError): git_part = "" _SIG_CACHE["git_mtime"] = git_mtime _SIG_CACHE["signature"] = git_part _SIG_CACHE["ts"] = now h = hashlib.sha256() h.update(git_part.encode("utf-8", errors="replace")) h.update(str(ws_count).encode()) h.update(str(version).encode()) for name, pmt, pid_mt in project_marks: h.update(name.encode("utf-8")) h.update(f"{pmt:.3f}|{pid_mt:.3f}".encode()) return h.hexdigest() # ─── projects mixin ─────────────────────────────────────────────────── class ProjectsHandlers: """Mixin: /api/projects/* endpoints — unified workspace + Ralph + planning.""" # ── helpers (instance) ───────────────────────────────────────── def _get_version(self) -> int: return _get_version_from(_read_approved()) def _increment_version(self, data: dict) -> dict: return _bump_version(data) def _derive_status(self, slug, approved_entry, workspace_entry, prd_data) -> str: return _derive_status(slug, approved_entry, workspace_entry, prd_data) def _build_unified_payload(self) -> dict: approved = _read_approved() approved_entries: dict[str, dict] = { (p.get("name") or "").lower(): p for p in approved.get("projects", []) or [] } # Walk workspace dirs first so we capture projects even without an # approved-tasks entry. slugs_seen: set[str] = set() result: list[dict] = [] if constants.WORKSPACE_DIR.exists(): for entry in sorted(constants.WORKSPACE_DIR.iterdir()): if not entry.is_dir() or entry.name.startswith("."): continue slug = entry.name slugs_seen.add(slug.lower()) ae = approved_entries.get(slug.lower()) prd = _read_prd(entry) result.append(self._project_record(slug, entry, ae, prd)) # Then projects that exist only in approved-tasks (no workspace yet). for ae in approved.get("projects", []) or []: slug = ae.get("name") or "" if not slug or slug.lower() in slugs_seen: continue result.append(self._project_record(slug, None, ae, None)) return { "projects": sorted(result, key=lambda p: p.get("slug") or ""), "version": _get_version_from(approved), "fetchedAt": _now_iso(), "count": len(result), } def _project_record(self, slug, workspace_dir, approved_entry, prd_data) -> dict: status = _derive_status(slug, approved_entry, workspace_dir, prd_data) total, done = _stories_progress(prd_data) last_iter = _last_iter_mtime(workspace_dir) if workspace_dir else None eta = _eta_seconds(prd_data, total, done, last_iter) ae = approved_entry or {} return { "slug": slug, "status": status, "name": slug, "description": ae.get("description") or "", "proposed_at": ae.get("proposed_at"), "approved_at": ae.get("approved_at"), "started_at": ae.get("started_at"), "pid": ae.get("pid"), "stories_total": total, "stories_done": done, "eta_seconds": eta, "last_message_preview": _last_message_preview(slug), "error_reason": ae.get("error") or "", "has_workspace": workspace_dir is not None, "has_prd": prd_data is not None, } # ── GET /api/projects ────────────────────────────────────────── def handle_unified_status(self): try: self.send_json(self._build_unified_payload()) except Exception as exc: log.exception("handle_unified_status failed") self.send_json({"error": str(exc)}, 500) # ── GET /api/projects/signature ──────────────────────────────── def handle_unified_signature(self): try: sig = _compute_signature() approved = _read_approved() self.send_json({ "signature": sig, "version": _get_version_from(approved), "fetchedAt": _now_iso(), }) except Exception as exc: log.exception("handle_unified_signature failed") self.send_json({"error": str(exc)}, 500) # ── GET /api/projects/stream (SSE) ──────────────────────────── def handle_projects_stream(self): """Stream `data: \\n\\n` whenever the signature changes. Explicit cookie check (the global POST middleware doesn't cover GET). Keep-alive ping every 15s. Exits cleanly on BrokenPipe. """ # Auth: require cookie even though this is a GET. if hasattr(self, "_check_dashboard_cookie") and not self._check_dashboard_cookie(): body = b'{"error":"Unauthorized"}' try: self.send_response(401) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) except (BrokenPipeError, ConnectionResetError): pass return # CSRF/Origin allowlist (skipped if no Origin/Referer — curl etc). origin = self.headers.get("Origin", "") or "" referer = self.headers.get("Referer", "") or "" allowed = ["http://127.0.0.1:8088", "http://localhost:8088"] dh = os.environ.get("DASHBOARD_HOST", "").strip() if dh: allowed.append(dh) if origin or referer: check = origin or referer if not any(check.startswith(a) for a in allowed): body = b'{"error":"CSRF"}' try: self.send_response(403) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) except (BrokenPipeError, ConnectionResetError): pass return try: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("X-Accel-Buffering", "no") self.send_header("Connection", "keep-alive") self.end_headers() except (BrokenPipeError, ConnectionResetError): return last_sig: str | None = None last_ping = time.monotonic() # Initial snapshot try: payload = self._build_unified_payload() last_sig = _compute_signature() self.wfile.write(b"data: " + json.dumps(payload).encode("utf-8") + b"\n\n") self.wfile.flush() except (BrokenPipeError, ConnectionResetError): return except Exception as exc: try: err = json.dumps({"error": str(exc)}).encode("utf-8") self.wfile.write(b"data: " + err + b"\n\n") self.wfile.flush() except Exception: pass return while True: try: time.sleep(2) sig = _compute_signature() now = time.monotonic() if sig != last_sig: payload = self._build_unified_payload() body = json.dumps(payload).encode("utf-8") self.wfile.write(b"data: " + body + b"\n\n") self.wfile.flush() last_sig = sig last_ping = now elif now - last_ping >= 15: self.wfile.write(b'data: {"type":"ping"}\n\n') self.wfile.flush() last_ping = now except (BrokenPipeError, ConnectionResetError): return except Exception: # Best-effort: one bad iter shouldn't kill the stream. continue # ── POST /api/projects/propose ───────────────────────────────── def handle_propose(self): body = parse_json_body(self) if body is None: return # parse_json_body already sent 400 slug = (body.get("slug") or "").strip() description = (body.get("description") or "").strip() slug_err = validate_slug(slug) desc_err = validate_description(description) if slug_err or desc_err: errors: dict[str, str] = {} if slug_err: errors["slug"] = slug_err if desc_err: errors["description"] = desc_err self.send_json({"error": "validation_failed", "fields": errors}, 400) return existing = _find_project(_read_approved(), slug) if existing: self.send_json( {"error": "duplicate_slug", "slug": slug, "status": existing.get("status")}, 409, ) return def _add(data: dict) -> dict: # Re-check inside the lock — guard against races. if _find_project(data, slug) is not None: raise FileExistsError(slug) data["projects"].append({ "name": slug, "description": description, "status": "pending", "planning_session_id": None, "final_plan_path": None, "proposed_at": _now_iso(), "approved_at": None, "started_at": None, "pid": None, }) _bump_version(data) return data try: _write_approved(_add) except FileExistsError: self.send_json({"error": "duplicate_slug", "slug": slug}, 409) return except Exception as exc: log.exception("propose failed") self.send_json({"error": str(exc)}, 500) return self.send_json({"slug": slug, "status": "pending"}, 201) # ── helpers for approve/unapprove/cancel ────────────────────── def _read_if_match(self) -> int | None: raw = self.headers.get("If-Match", "") or self.headers.get("if-match", "") if not raw: return None raw = raw.strip().strip('"') try: return int(raw) except (TypeError, ValueError): return None def _set_status_locked(self, slug: str, target_status: str, **extra) -> tuple[bool, dict]: """Mutate approved-tasks: set status + extra fields. Bumps version. Returns (ok, project_entry_or_error_dict). """ client_ver = self._read_if_match() captured: dict = {} def _mut(data: dict) -> dict: current_ver = _get_version_from(data) if client_ver is not None and current_ver != client_ver: captured["error"] = "stale" captured["current_version"] = current_ver raise _StaleVersion() proj = _find_project(data, slug) if proj is None: captured["error"] = "not_found" raise KeyError(slug) proj["status"] = target_status for k, v in extra.items(): if v is None and k in proj: proj[k] = None else: proj[k] = v captured["project"] = proj _bump_version(data) return data try: _write_approved(_mut) except _StaleVersion: return False, { "error": "stale", "current_version": captured.get("current_version", 0), } except KeyError: return False, {"error": "not_found", "slug": slug} except Exception as exc: log.exception("status mutate failed") return False, {"error": str(exc)} return True, captured.get("project", {}) # ── POST /api/projects/approve ───────────────────────────────── def handle_approve(self): body = parse_json_body(self) if body is None: return slug = (body.get("slug") or "").strip() if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return ok, payload = self._set_status_locked(slug, "approved", approved_at=_now_iso()) if not ok: code = 409 if payload.get("error") == "stale" else 404 if payload.get("error") == "not_found" else 500 self.send_json(payload, code) return self.send_json({"ok": True, "slug": slug, "status": "approved"}) # ── POST /api/projects/unapprove ────────────────────────────── def handle_unapprove(self): body = parse_json_body(self) if body is None: return slug = (body.get("slug") or "").strip() if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return ok, payload = self._set_status_locked(slug, "pending", approved_at=None) if not ok: code = 409 if payload.get("error") == "stale" else 404 if payload.get("error") == "not_found" else 500 self.send_json(payload, code) return self.send_json({"ok": True, "slug": slug, "status": "pending"}) # ── POST /api/projects/cancel ───────────────────────────────── def handle_cancel(self): """Cancel a project. Sets status=cancelled (matches router behaviour).""" body = parse_json_body(self) if body is None: return slug = (body.get("slug") or "").strip() if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return ok, payload = self._set_status_locked(slug, "cancelled") if not ok: code = 409 if payload.get("error") == "stale" else 404 if payload.get("error") == "not_found" else 500 self.send_json(payload, code) return self.send_json({"ok": True, "slug": slug, "status": "cancelled"}) # ── planning state helpers ──────────────────────────────────── @staticmethod def _planning_key(slug: str) -> tuple[str, str]: """Dashboard planning sessions live in adapter='dashboard', channel=slug.""" return ("dashboard", slug) @staticmethod def _resolve_planning_key(slug: str) -> tuple[str, str]: """Find the active session's (adapter, channel) for slug, regardless of where it was started. Falls back to ('dashboard', slug) if nothing matches — preserving prior behavior for the no-session case. Picks the most-recently-updated entry when multiple exist. """ try: from src.planning_session import _load_planning_state # type: ignore except Exception: return ("dashboard", slug) try: all_state = _load_planning_state() except Exception: return ("dashboard", slug) matches = [ entry for entry in all_state.values() if (entry.get("slug") or "").lower() == slug.lower() ] if not matches: return ("dashboard", slug) matches.sort(key=lambda e: e.get("updated_at") or "", reverse=True) best = matches[0] adapter = best.get("adapter") or "dashboard" channel = best.get("channel_id") or slug return (adapter, channel) # ── POST /api/projects//plan/start ────────────────────── def handle_plan_start(self, slug: str): if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return body = parse_json_body(self) if body is None: return description = (body.get("description") or "").strip() # Description fallback: use the existing approved-tasks description so # users can re-open planning on a project they already proposed. if not description: existing = _find_project(_read_approved(), slug) if existing: description = (existing.get("description") or "").strip() desc_err = validate_description(description) if description else "description required" if desc_err: self.send_json({"error": "invalid_description", "message": desc_err}, 400) return # Set status=planning + ensure entry exists. def _mut(data: dict) -> dict: proj = _find_project(data, slug) if proj is None: proj = { "name": slug, "description": description, "status": "planning", "planning_session_id": None, "final_plan_path": None, "proposed_at": _now_iso(), "approved_at": None, "started_at": None, "pid": None, } data["projects"].append(proj) else: proj["status"] = "planning" if not proj.get("description"): proj["description"] = description _bump_version(data) return data try: _write_approved(_mut) except Exception as exc: log.exception("plan/start mutate failed") self.send_json({"error": str(exc)}, 500) return adapter, channel = self._planning_key(slug) try: from src.planning_orchestrator import PlanningOrchestrator # type: ignore session, first_text = PlanningOrchestrator.start( slug=slug, description=description, channel_id=channel, adapter=adapter, ) except Exception as exc: log.exception("PlanningOrchestrator.start failed for %s", slug) # Roll status back so the dashboard reflects reality. try: self._set_status_locked(slug, "pending") except Exception: pass self.send_json({"error": "planning_start_failed", "message": str(exc)}, 500) return # Stash planning_session_id on the entry. def _stash(data: dict) -> dict: proj = _find_project(data, slug) if proj is not None: proj["planning_session_id"] = session.planning_session_id return data try: _write_approved(_stash) except Exception: pass self.send_json({ "ok": True, "slug": slug, "session_id": session.planning_session_id, "phase": session.phase, "message": first_text, }) # ── POST /api/projects//plan/respond ──────────────────── def handle_plan_respond(self, slug: str): if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return body = parse_json_body(self) if body is None: return message = (body.get("message") or "").strip() if not message: self.send_json({"error": "message required"}, 400) return adapter, channel = self._resolve_planning_key(slug) try: from src.planning_orchestrator import PlanningOrchestrator # type: ignore session, text, phase_ready = PlanningOrchestrator.respond( adapter=adapter, channel_id=channel, message=message, ) except Exception as exc: log.exception("plan/respond failed for %s", slug) self.send_json({"error": str(exc)}, 500) return if session is None: self.send_json( {"error": "no_active_session", "message": "Nu există o sesiune activă pentru acest slug."}, 404, ) return self.send_json({ "ok": True, "slug": slug, "phase": session.phase, "phase_ready": phase_ready, "message": text, }) # ── GET /api/projects//plan/state ─────────────────────── def handle_plan_state(self, slug: str): if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return adapter, channel = self._planning_key(slug) try: from src.planning_session import get_planning_state # type: ignore except Exception as exc: self.send_json({"error": str(exc)}, 500) return state = get_planning_state(adapter, channel) # Also try cross-adapter fallback so the dashboard surfaces sessions # started via discord/telegram. if state is None: try: from src.planning_session import _load_planning_state # type: ignore all_state = _load_planning_state() for _k, entry in all_state.items(): if (entry.get("slug") or "").lower() == slug.lower(): state = entry break except Exception: pass if state is None: self.send_json({"slug": slug, "status": "expired", "phase": None, "pending_response": False}) return # Heuristic: phase=='__complete__' means the pipeline finished. phase = state.get("phase") completed = phase == "__complete__" self.send_json({ "slug": slug, "status": "complete" if completed else "active", "phase": phase, "phases_completed": state.get("phases_completed") or [], "phases_planned": state.get("phases_planned") or [], "pending_response": False, "since": state.get("started_at"), "updated_at": state.get("updated_at"), "session_id": state.get("session_id"), "planning_session_id": state.get("planning_session_id"), "last_text_excerpt": state.get("last_text_excerpt") or "", "final_plan_path": state.get("final_plan_path"), }) # ── GET /api/projects//plan/transcript ────────────────── def handle_plan_transcript(self, slug: str): """Return planning transcript. The planning subprocess persists only the latest excerpt + metadata in `sessions/planning.json` (full transcripts live inside Claude CLI session state). We return what's available, plus the final-plan markdown if it has been written. """ if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return adapter, channel = self._planning_key(slug) try: from src.planning_session import get_planning_state, _load_planning_state # type: ignore except Exception as exc: self.send_json({"error": str(exc)}, 500) return state = get_planning_state(adapter, channel) if state is None: try: all_state = _load_planning_state() for _k, entry in all_state.items(): if (entry.get("slug") or "").lower() == slug.lower(): state = entry break except Exception: pass final_plan_text = "" final_plan_path = None try: fp = constants.WORKSPACE_DIR / slug / "scripts" / "ralph" / "final-plan.md" if fp.exists(): final_plan_path = str(fp) final_plan_text = fp.read_text(encoding="utf-8", errors="replace") except OSError: pass session_id = (state or {}).get("session_id") or "" history = _load_planning_history(slug, session_id) self.send_json({ "slug": slug, "phase": (state or {}).get("phase"), "phases_completed": (state or {}).get("phases_completed") or [], "last_text_excerpt": (state or {}).get("last_text_excerpt") or "", "final_plan_path": final_plan_path, "final_plan": final_plan_text, "history": history, }) # ── POST /api/projects//plan/finalize ─────────────────── def handle_plan_finalize(self, slug: str): if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return adapter, channel = self._resolve_planning_key(slug) try: from src.planning_session import get_planning_state, clear_planning_state # type: ignore from src.planning_orchestrator import PlanningOrchestrator # type: ignore except Exception as exc: self.send_json({"error": str(exc)}, 500) return state = get_planning_state(adapter, channel) final_plan = None if state and state.get("final_plan_path"): final_plan = state["final_plan_path"] else: final_plan = str(PlanningOrchestrator.final_plan_path(slug)) def _mut(data: dict) -> dict: proj = _find_project(data, slug) if proj is None: raise KeyError(slug) proj["status"] = "approved" proj["approved_at"] = _now_iso() proj["planning_session_id"] = None proj["final_plan_path"] = final_plan _bump_version(data) return data try: _write_approved(_mut) except KeyError: self.send_json({"error": "not_found", "slug": slug}, 404) return except Exception as exc: log.exception("plan/finalize mutate failed") self.send_json({"error": str(exc)}, 500) return clear_planning_state(adapter, channel) self.send_json({ "ok": True, "slug": slug, "status": "approved", "final_plan_path": final_plan, }) # ── POST /api/projects//plan/cancel ───────────────────── def handle_plan_cancel_planning(self, slug: str): if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return adapter, channel = self._resolve_planning_key(slug) try: from src.planning_orchestrator import PlanningOrchestrator # type: ignore except Exception as exc: self.send_json({"error": str(exc)}, 500) return cleared = PlanningOrchestrator.cancel(adapter, channel) # Revert status to pending if currently planning. def _mut(data: dict) -> dict: proj = _find_project(data, slug) if proj is not None and (proj.get("status") or "").lower() == "planning": proj["status"] = "pending" proj["planning_session_id"] = None _bump_version(data) return data try: _write_approved(_mut) except Exception: pass self.send_json({"ok": True, "slug": slug, "cleared": cleared, "status": "pending"}) # ── POST /api/projects//plan/advance ──────────────────── def handle_plan_advance(self, slug: str): if validate_slug(slug): self.send_json({"error": "invalid_slug"}, 400) return adapter, channel = self._resolve_planning_key(slug) try: from src.planning_orchestrator import PlanningOrchestrator # type: ignore session, text, completed = PlanningOrchestrator.advance(adapter, channel) except Exception as exc: log.exception("plan/advance failed for %s", slug) self.send_json({"error": str(exc)}, 500) return if session is None: self.send_json({"error": "no_active_session", "message": text}, 404) return self.send_json({ "ok": True, "slug": slug, "phase": session.phase, "completed": completed, "message": text, })