Files
echo-core/dashboard/handlers/projects.py
Marius Mutu 8594f98bff fix(dashboard): resolve planning 404 for sessions started outside dashboard
_resolve_planning_key searches all active sessions by slug regardless of
adapter, so respond/finalize/cancel/advance work even when planning was
initiated from Discord or Telegram.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-28 10:57:34 +00:00

1044 lines
40 KiB
Python

"""Unified project endpoints for the new dashboard (Task #4 backend).
Replaces the old separate /api/workspace + /api/ralph payloads with a single
/api/projects view that merges:
- workspace dirs (`~/workspace/<slug>/`)
- approved-tasks.json entries
- per-project Ralph PRD (`scripts/ralph/prd.json`)
Endpoints exposed via the mixin:
GET /api/projects — full list + version
GET /api/projects/signature — change-detection hash for SSE
GET /api/projects/stream — SSE stream of changes
GET /api/projects/<slug>/plan/state
GET /api/projects/<slug>/plan/transcript
POST /api/projects/propose
POST /api/projects/approve (If-Match: <version>)
POST /api/projects/unapprove (If-Match: <version>)
POST /api/projects/cancel (If-Match: <version>)
POST /api/projects/<slug>/plan/start
POST /api/projects/<slug>/plan/respond
POST /api/projects/<slug>/plan/finalize
POST /api/projects/<slug>/plan/cancel
POST /api/projects/<slug>/plan/advance
Status derivation (one of these strings — see `_derive_status`):
running-ralph — has PID, /proc/<pid>/cmdline contains ralph.sh
running-manual — has PID, alive, but not ralph.sh
planning — approved-tasks.json entry has status=planning
approved — entry has status=approved (waiting for night-execute)
pending — entry has status=pending
blocked — prd has stories, all incomplete are blocked
failed — prd has stories, last run had failures
complete — prd has stories, all passes==True
idle — workspace dir exists but no signal
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import parse_qs, urlparse
import constants
from handlers._validators import parse_json_body, validate_description, validate_slug
# Make `from src.x import ...` work — same trick youtube.py uses.
if str(constants.BASE_DIR) not in sys.path:
    sys.path.insert(0, str(constants.BASE_DIR))

# Module-level logger, named after this module (standard logging convention).
log = logging.getLogger(__name__)

# Single store for project proposals/approvals; read/written under flock below.
APPROVED_TASKS_FILE = constants.BASE_DIR / "approved-tasks.json"
class _StaleVersion(Exception):
    """Raised inside a _write_approved mutator to abort on If-Match mismatch.

    Caught by _set_status_locked and translated into a {"error": "stale"}
    payload that the HTTP handlers map to a 409 response.
    """
    pass
# Module-level signature cache — only re-run `git status --porcelain` when the
# repo's `.git/index` mtime advances. Keeps SSE polling cheap.
# Keys: git_mtime (last seen .git/index mtime), signature (cached porcelain
# output), ts (monotonic timestamp of the last refresh).
_SIG_CACHE: dict = {"git_mtime": None, "signature": None, "ts": 0.0}
# ─── helpers ───────────────────────────────────────────────────────────
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _read_approved() -> dict:
    """Read approved-tasks.json under a shared flock.

    A missing or unreadable file degrades to an empty skeleton payload.
    """
    try:
        from src.jsonlock import read_locked  # type: ignore
        return read_locked(str(APPROVED_TASKS_FILE))
    except FileNotFoundError:
        pass
    except (json.JSONDecodeError, OSError) as exc:  # pragma: no cover
        log.error("approved-tasks.json read failed: %s", exc)
    return {"projects": [], "last_updated": None, "version": 0}
def _write_approved(mutator):
    """Persist approved-tasks.json under an exclusive flock + atomic replace.

    Wraps *mutator* so the payload is always a dict with a "projects" list
    and a refreshed "last_updated" timestamp before being written back.
    """
    from src.jsonlock import write_locked  # type: ignore

    def _wrap(payload: dict) -> dict:
        normalized = payload if isinstance(payload, dict) else {}
        normalized.setdefault("projects", [])
        mutated = mutator(normalized)
        mutated["last_updated"] = _now_iso()
        return mutated

    return write_locked(str(APPROVED_TASKS_FILE), _wrap)
def _find_project(data: dict, slug: str) -> dict | None:
for p in data.get("projects", []) or []:
if (p.get("name") or "").lower() == slug.lower():
return p
return None
def _read_prd(project_dir: Path) -> dict | None:
prd_path = project_dir / "scripts" / "ralph" / "prd.json"
if not prd_path.exists():
return None
try:
return json.loads(prd_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
def _pid_alive_with_cmdline(pid: int | None) -> tuple[bool, str]:
"""(alive, cmdline) — cmdline is empty string if not readable."""
if not pid or not isinstance(pid, int):
return False, ""
try:
os.kill(pid, 0)
except (ProcessLookupError, PermissionError, OSError):
return False, ""
cmdline = ""
try:
with open(f"/proc/{pid}/cmdline", "rb") as f:
cmdline = f.read().decode("utf-8", errors="replace").replace("\x00", " ")
except (FileNotFoundError, PermissionError, OSError):
cmdline = ""
return True, cmdline
# ─── status derivation ────────────────────────────────────────────────
def _derive_status(slug, approved_entry, workspace_dir, prd_data) -> str:
    """Compute one of nine UI states (see module docstring).

    Precedence: live process > explicit approved-tasks status > PRD-derived
    state > idle.
    """
    entry = approved_entry or {}
    # Process signal trumps everything else.
    alive, cmdline = _pid_alive_with_cmdline(entry.get("pid"))
    if alive:
        return "running-ralph" if "ralph.sh" in cmdline else "running-manual"
    # Approved-tasks entry status (planning/approved/pending takes precedence
    # over PRD-derived states because the user explicitly set it).
    if approved_entry:
        status = (approved_entry.get("status") or "").lower()
        direct = {
            "planning": "planning",
            "approved": "approved",
            "pending": "pending",
            "failed": "failed",
            "cancelled": "idle",
            "canceled": "idle",
        }
        if status in direct:
            return direct[status]
        # "complete"/"completed" deliberately fall through to the PRD check.
    # PRD-derived states (only for projects that ran Ralph).
    stories = (prd_data or {}).get("userStories") or []
    if stories:
        if all(bool(story.get("passes")) for story in stories):
            return "complete"
        # Distinguish blocked vs failed using per-story flags.
        pending_stories = [story for story in stories if not story.get("passes")]
        if pending_stories and all(story.get("blocked") for story in pending_stories):
            return "blocked"
        if any(story.get("failed") for story in stories):
            return "failed"
        # Partial progress without an active PID — idle (re-runnable).
    return "idle"
def _stories_progress(prd_data: dict | None) -> tuple[int, int]:
stories = (prd_data or {}).get("userStories") or []
total = len(stories)
done = sum(1 for s in stories if s.get("passes"))
return total, done
def _last_iter_mtime(project_dir: Path) -> float | None:
logs = project_dir / "scripts" / "ralph" / "logs"
if not logs.exists():
return None
try:
files = list(logs.glob("iteration-*.log"))
if not files:
return None
return max(f.stat().st_mtime for f in files)
except OSError:
return None
def _eta_seconds(prd_data, total, done, last_iter_mtime) -> int | None:
"""Crude ETA: 12 minutes per remaining story (matches RalphHandlers)."""
if total <= 0:
return None
remaining = total - done
if remaining <= 0:
return 0
return remaining * 12 * 60
def _last_message_preview(slug: str) -> str:
"""Return the most recent planning excerpt for any adapter on this slug.
The dashboard owns adapter="dashboard" but we also surface chats started
via discord/telegram so the unified UI reflects reality.
"""
try:
from src.planning_session import _load_planning_state # type: ignore
except Exception: # pragma: no cover — defensive
return ""
try:
state = _load_planning_state()
except Exception:
return ""
best = ""
best_ts = ""
for _key, entry in state.items():
if (entry.get("slug") or "").lower() != slug.lower():
continue
ts = entry.get("updated_at") or entry.get("started_at") or ""
excerpt = entry.get("last_text_excerpt") or ""
if ts >= best_ts and excerpt:
best = excerpt
best_ts = ts
return best[:200]
# ─── version helpers ──────────────────────────────────────────────────
def _get_version_from(data: dict) -> int:
try:
return int(data.get("version") or 0)
except (TypeError, ValueError):
return 0
def _bump_version(data: dict) -> dict:
data["version"] = _get_version_from(data) + 1
return data
# ─── signature for SSE change detection ───────────────────────────────
def _git_index_mtime() -> float | None:
    """mtime of the repo's .git/index, or None when it cannot be stat'ed."""
    index_path = constants.BASE_DIR / ".git" / "index"
    try:
        return index_path.stat().st_mtime
    except OSError:
        return None
def _compute_signature() -> str:
    """Hash of (git porcelain status, workspace count, approved-tasks version).
    Uses the cached value when `.git/index` mtime hasn't changed.

    Returns a hex SHA-256 digest used by the SSE stream for change detection.
    Per-project prd.json / .ralph.pid mtimes are folded in so story progress
    and run start/stop flip the signature without a git change.
    """
    now = time.monotonic()
    git_mtime = _git_index_mtime()
    cached_mtime = _SIG_CACHE.get("git_mtime")
    cached_sig = _SIG_CACHE.get("signature")
    # Always include workspace count + approved version + per-project mtimes
    # in the signature (cheap stat). Only the git porcelain piece is cached.
    ws_count = 0
    project_marks: list[tuple] = []
    if constants.WORKSPACE_DIR.exists():
        # sorted() keeps project_marks deterministic across calls.
        for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
            if not entry.is_dir() or entry.name.startswith("."):
                continue
            ws_count += 1
            prd = entry / "scripts" / "ralph" / "prd.json"
            pid_file = entry / "scripts" / "ralph" / ".ralph.pid"
            try:
                pmt = prd.stat().st_mtime if prd.exists() else 0
            except OSError:
                pmt = 0
            try:
                pid_mt = pid_file.stat().st_mtime if pid_file.exists() else 0
            except OSError:
                pid_mt = 0
            project_marks.append((entry.name, pmt, pid_mt))
    # Approved version
    approved = _read_approved()
    version = _get_version_from(approved)
    # Git porcelain — cached on .git/index mtime
    if git_mtime == cached_mtime and cached_sig is not None:
        git_part = cached_sig
    else:
        try:
            res = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=str(constants.BASE_DIR),
                capture_output=True, text=True, timeout=5,
            )
            git_part = res.stdout if res.returncode == 0 else ""
        except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
            git_part = ""
        # Refresh the module-level cache even when git failed ("" is cached
        # too, so a broken git doesn't re-run every poll).
        _SIG_CACHE["git_mtime"] = git_mtime
        _SIG_CACHE["signature"] = git_part
        _SIG_CACHE["ts"] = now
    h = hashlib.sha256()
    h.update(git_part.encode("utf-8", errors="replace"))
    h.update(str(ws_count).encode())
    h.update(str(version).encode())
    for name, pmt, pid_mt in project_marks:
        h.update(name.encode("utf-8"))
        h.update(f"{pmt:.3f}|{pid_mt:.3f}".encode())
    return h.hexdigest()
# ─── projects mixin ───────────────────────────────────────────────────
class ProjectsHandlers:
"""Mixin: /api/projects/* endpoints — unified workspace + Ralph + planning."""
# ── helpers (instance) ─────────────────────────────────────────
def _get_version(self) -> int:
return _get_version_from(_read_approved())
    def _increment_version(self, data: dict) -> dict:
        """Bump data['version'] in place via the module-level _bump_version."""
        return _bump_version(data)
    def _derive_status(self, slug, approved_entry, workspace_entry, prd_data) -> str:
        """Thin instance wrapper over the module-level _derive_status."""
        return _derive_status(slug, approved_entry, workspace_entry, prd_data)
def _build_unified_payload(self) -> dict:
approved = _read_approved()
approved_entries: dict[str, dict] = {
(p.get("name") or "").lower(): p for p in approved.get("projects", []) or []
}
# Walk workspace dirs first so we capture projects even without an
# approved-tasks entry.
slugs_seen: set[str] = set()
result: list[dict] = []
if constants.WORKSPACE_DIR.exists():
for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
if not entry.is_dir() or entry.name.startswith("."):
continue
slug = entry.name
slugs_seen.add(slug.lower())
ae = approved_entries.get(slug.lower())
prd = _read_prd(entry)
result.append(self._project_record(slug, entry, ae, prd))
# Then projects that exist only in approved-tasks (no workspace yet).
for ae in approved.get("projects", []) or []:
slug = ae.get("name") or ""
if not slug or slug.lower() in slugs_seen:
continue
result.append(self._project_record(slug, None, ae, None))
return {
"projects": sorted(result, key=lambda p: p.get("slug") or ""),
"version": _get_version_from(approved),
"fetchedAt": _now_iso(),
"count": len(result),
}
def _project_record(self, slug, workspace_dir, approved_entry, prd_data) -> dict:
status = _derive_status(slug, approved_entry, workspace_dir, prd_data)
total, done = _stories_progress(prd_data)
last_iter = _last_iter_mtime(workspace_dir) if workspace_dir else None
eta = _eta_seconds(prd_data, total, done, last_iter)
ae = approved_entry or {}
return {
"slug": slug,
"status": status,
"name": slug,
"description": ae.get("description") or "",
"proposed_at": ae.get("proposed_at"),
"approved_at": ae.get("approved_at"),
"started_at": ae.get("started_at"),
"pid": ae.get("pid"),
"stories_total": total,
"stories_done": done,
"eta_seconds": eta,
"last_message_preview": _last_message_preview(slug),
"error_reason": ae.get("error") or "",
"has_workspace": workspace_dir is not None,
"has_prd": prd_data is not None,
}
# ── GET /api/projects ──────────────────────────────────────────
def handle_unified_status(self):
try:
self.send_json(self._build_unified_payload())
except Exception as exc:
log.exception("handle_unified_status failed")
self.send_json({"error": str(exc)}, 500)
# ── GET /api/projects/signature ────────────────────────────────
def handle_unified_signature(self):
try:
sig = _compute_signature()
approved = _read_approved()
self.send_json({
"signature": sig,
"version": _get_version_from(approved),
"fetchedAt": _now_iso(),
})
except Exception as exc:
log.exception("handle_unified_signature failed")
self.send_json({"error": str(exc)}, 500)
# ── GET /api/projects/stream (SSE) ────────────────────────────
    def handle_projects_stream(self):
        """Stream `data: <payload>\\n\\n` whenever the signature changes.
        Explicit cookie check (the global POST middleware doesn't cover GET).
        Keep-alive ping every 15s. Exits cleanly on BrokenPipe.
        """
        # Auth: require cookie even though this is a GET.
        if hasattr(self, "_check_dashboard_cookie") and not self._check_dashboard_cookie():
            body = b'{"error":"Unauthorized"}'
            try:
                self.send_response(401)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)
            except (BrokenPipeError, ConnectionResetError):
                pass
            return
        # CSRF/Origin allowlist (skipped if no Origin/Referer — curl etc).
        origin = self.headers.get("Origin", "") or ""
        referer = self.headers.get("Referer", "") or ""
        allowed = ["http://127.0.0.1:8088", "http://localhost:8088"]
        dh = os.environ.get("DASHBOARD_HOST", "").strip()
        if dh:
            allowed.append(dh)
        if origin or referer:
            check = origin or referer
            # Prefix match, so any path under an allowed origin is accepted.
            if not any(check.startswith(a) for a in allowed):
                body = b'{"error":"CSRF"}'
                try:
                    self.send_response(403)
                    self.send_header("Content-Type", "application/json")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                except (BrokenPipeError, ConnectionResetError):
                    pass
                return
        # Switch the connection to SSE; headers must go out before any event.
        try:
            self.send_response(200)
            self.send_header("Content-Type", "text/event-stream")
            self.send_header("Cache-Control", "no-cache")
            self.send_header("X-Accel-Buffering", "no")  # disable proxy buffering
            self.send_header("Connection", "keep-alive")
            self.end_headers()
        except (BrokenPipeError, ConnectionResetError):
            return
        last_sig: str | None = None
        last_ping = time.monotonic()
        # Initial snapshot
        try:
            payload = self._build_unified_payload()
            last_sig = _compute_signature()
            self.wfile.write(b"data: " + json.dumps(payload).encode("utf-8") + b"\n\n")
            self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            return
        except Exception as exc:
            # Surface the failure to the client as one SSE event, then stop.
            try:
                err = json.dumps({"error": str(exc)}).encode("utf-8")
                self.wfile.write(b"data: " + err + b"\n\n")
                self.wfile.flush()
            except Exception:
                pass
            return
        # Poll loop: 2s cadence; full payload on signature change, ping at 15s.
        while True:
            try:
                time.sleep(2)
                sig = _compute_signature()
                now = time.monotonic()
                if sig != last_sig:
                    payload = self._build_unified_payload()
                    body = json.dumps(payload).encode("utf-8")
                    self.wfile.write(b"data: " + body + b"\n\n")
                    self.wfile.flush()
                    last_sig = sig
                    last_ping = now
                elif now - last_ping >= 15:
                    self.wfile.write(b'data: {"type":"ping"}\n\n')
                    self.wfile.flush()
                    last_ping = now
            except (BrokenPipeError, ConnectionResetError):
                return
            except Exception:
                # Best-effort: one bad iter shouldn't kill the stream.
                continue
# ── POST /api/projects/propose ─────────────────────────────────
    def handle_propose(self):
        """POST /api/projects/propose — create a new pending project entry.

        Validates slug + description, rejects duplicates with 409, appends
        the entry under the exclusive flock, responds 201 on success.
        """
        body = parse_json_body(self)
        if body is None:
            return  # parse_json_body already sent 400
        slug = (body.get("slug") or "").strip()
        description = (body.get("description") or "").strip()
        slug_err = validate_slug(slug)
        desc_err = validate_description(description)
        if slug_err or desc_err:
            # Report all field errors at once so the UI can show both.
            errors: dict[str, str] = {}
            if slug_err:
                errors["slug"] = slug_err
            if desc_err:
                errors["description"] = desc_err
            self.send_json({"error": "validation_failed", "fields": errors}, 400)
            return
        # Cheap duplicate pre-check outside the lock; the authoritative
        # re-check happens inside _add while the flock is held.
        existing = _find_project(_read_approved(), slug)
        if existing:
            self.send_json(
                {"error": "duplicate_slug", "slug": slug, "status": existing.get("status")},
                409,
            )
            return
        def _add(data: dict) -> dict:
            # Re-check inside the lock — guard against races.
            if _find_project(data, slug) is not None:
                raise FileExistsError(slug)
            data["projects"].append({
                "name": slug,
                "description": description,
                "status": "pending",
                "planning_session_id": None,
                "final_plan_path": None,
                "proposed_at": _now_iso(),
                "approved_at": None,
                "started_at": None,
                "pid": None,
            })
            _bump_version(data)
            return data
        try:
            _write_approved(_add)
        except FileExistsError:
            self.send_json({"error": "duplicate_slug", "slug": slug}, 409)
            return
        except Exception as exc:
            log.exception("propose failed")
            self.send_json({"error": str(exc)}, 500)
            return
        self.send_json({"slug": slug, "status": "pending"}, 201)
# ── helpers for approve/unapprove/cancel ──────────────────────
def _read_if_match(self) -> int | None:
raw = self.headers.get("If-Match", "") or self.headers.get("if-match", "")
if not raw:
return None
raw = raw.strip().strip('"')
try:
return int(raw)
except (TypeError, ValueError):
return None
def _set_status_locked(self, slug: str, target_status: str, **extra) -> tuple[bool, dict]:
"""Mutate approved-tasks: set status + extra fields. Bumps version.
Returns (ok, project_entry_or_error_dict).
"""
client_ver = self._read_if_match()
captured: dict = {}
def _mut(data: dict) -> dict:
current_ver = _get_version_from(data)
if client_ver is not None and current_ver != client_ver:
captured["error"] = "stale"
captured["current_version"] = current_ver
raise _StaleVersion()
proj = _find_project(data, slug)
if proj is None:
captured["error"] = "not_found"
raise KeyError(slug)
proj["status"] = target_status
for k, v in extra.items():
if v is None and k in proj:
proj[k] = None
else:
proj[k] = v
captured["project"] = proj
_bump_version(data)
return data
try:
_write_approved(_mut)
except _StaleVersion:
return False, {
"error": "stale",
"current_version": captured.get("current_version", 0),
}
except KeyError:
return False, {"error": "not_found", "slug": slug}
except Exception as exc:
log.exception("status mutate failed")
return False, {"error": str(exc)}
return True, captured.get("project", {})
# ── POST /api/projects/approve ─────────────────────────────────
def handle_approve(self):
body = parse_json_body(self)
if body is None:
return
slug = (body.get("slug") or "").strip()
if validate_slug(slug):
self.send_json({"error": "invalid_slug"}, 400)
return
ok, payload = self._set_status_locked(slug, "approved", approved_at=_now_iso())
if not ok:
code = 409 if payload.get("error") == "stale" else 404 if payload.get("error") == "not_found" else 500
self.send_json(payload, code)
return
self.send_json({"ok": True, "slug": slug, "status": "approved"})
# ── POST /api/projects/unapprove ──────────────────────────────
def handle_unapprove(self):
body = parse_json_body(self)
if body is None:
return
slug = (body.get("slug") or "").strip()
if validate_slug(slug):
self.send_json({"error": "invalid_slug"}, 400)
return
ok, payload = self._set_status_locked(slug, "pending", approved_at=None)
if not ok:
code = 409 if payload.get("error") == "stale" else 404 if payload.get("error") == "not_found" else 500
self.send_json(payload, code)
return
self.send_json({"ok": True, "slug": slug, "status": "pending"})
# ── POST /api/projects/cancel ─────────────────────────────────
def handle_cancel(self):
"""Cancel a project. Sets status=cancelled (matches router behaviour)."""
body = parse_json_body(self)
if body is None:
return
slug = (body.get("slug") or "").strip()
if validate_slug(slug):
self.send_json({"error": "invalid_slug"}, 400)
return
ok, payload = self._set_status_locked(slug, "cancelled")
if not ok:
code = 409 if payload.get("error") == "stale" else 404 if payload.get("error") == "not_found" else 500
self.send_json(payload, code)
return
self.send_json({"ok": True, "slug": slug, "status": "cancelled"})
# ── planning state helpers ────────────────────────────────────
@staticmethod
def _planning_key(slug: str) -> tuple[str, str]:
"""Dashboard planning sessions live in adapter='dashboard', channel=slug."""
return ("dashboard", slug)
@staticmethod
def _resolve_planning_key(slug: str) -> tuple[str, str]:
"""Find the active session's (adapter, channel) for slug, regardless of
where it was started. Falls back to ('dashboard', slug) if nothing
matches — preserving prior behavior for the no-session case.
Picks the most-recently-updated entry when multiple exist.
"""
try:
from src.planning_session import _load_planning_state # type: ignore
except Exception:
return ("dashboard", slug)
try:
all_state = _load_planning_state()
except Exception:
return ("dashboard", slug)
matches = [
entry for entry in all_state.values()
if (entry.get("slug") or "").lower() == slug.lower()
]
if not matches:
return ("dashboard", slug)
matches.sort(key=lambda e: e.get("updated_at") or "", reverse=True)
best = matches[0]
adapter = best.get("adapter") or "dashboard"
channel = best.get("channel_id") or slug
return (adapter, channel)
# ── POST /api/projects/<slug>/plan/start ──────────────────────
    def handle_plan_start(self, slug: str):
        """POST /api/projects/<slug>/plan/start — begin a planning session.

        Marks the approved-tasks entry as planning (creating it if missing),
        then starts the orchestrator under the dashboard adapter key. On
        orchestrator failure the status is rolled back to pending.
        """
        if validate_slug(slug):
            self.send_json({"error": "invalid_slug"}, 400)
            return
        body = parse_json_body(self)
        if body is None:
            return  # parse_json_body already sent the 400 response
        description = (body.get("description") or "").strip()
        # Description fallback: use the existing approved-tasks description so
        # users can re-open planning on a project they already proposed.
        if not description:
            existing = _find_project(_read_approved(), slug)
            if existing:
                description = (existing.get("description") or "").strip()
        desc_err = validate_description(description) if description else "description required"
        if desc_err:
            self.send_json({"error": "invalid_description", "message": desc_err}, 400)
            return
        # Set status=planning + ensure entry exists.
        def _mut(data: dict) -> dict:
            proj = _find_project(data, slug)
            if proj is None:
                # First contact with this slug — create a full entry.
                proj = {
                    "name": slug,
                    "description": description,
                    "status": "planning",
                    "planning_session_id": None,
                    "final_plan_path": None,
                    "proposed_at": _now_iso(),
                    "approved_at": None,
                    "started_at": None,
                    "pid": None,
                }
                data["projects"].append(proj)
            else:
                proj["status"] = "planning"
                # Keep an existing description; only fill in when blank.
                if not proj.get("description"):
                    proj["description"] = description
            _bump_version(data)
            return data
        try:
            _write_approved(_mut)
        except Exception as exc:
            log.exception("plan/start mutate failed")
            self.send_json({"error": str(exc)}, 500)
            return
        adapter, channel = self._planning_key(slug)
        try:
            from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
            session, first_text = PlanningOrchestrator.start(
                slug=slug,
                description=description,
                channel_id=channel,
                adapter=adapter,
            )
        except Exception as exc:
            log.exception("PlanningOrchestrator.start failed for %s", slug)
            # Roll status back so the dashboard reflects reality.
            try:
                self._set_status_locked(slug, "pending")
            except Exception:
                pass
            self.send_json({"error": "planning_start_failed", "message": str(exc)}, 500)
            return
        # Stash planning_session_id on the entry.
        # Best-effort: failure here leaves the session running but unrecorded.
        def _stash(data: dict) -> dict:
            proj = _find_project(data, slug)
            if proj is not None:
                proj["planning_session_id"] = session.planning_session_id
            return data
        try:
            _write_approved(_stash)
        except Exception:
            pass
        self.send_json({
            "ok": True,
            "slug": slug,
            "session_id": session.planning_session_id,
            "phase": session.phase,
            "message": first_text,
        })
# ── POST /api/projects/<slug>/plan/respond ────────────────────
def handle_plan_respond(self, slug: str):
if validate_slug(slug):
self.send_json({"error": "invalid_slug"}, 400)
return
body = parse_json_body(self)
if body is None:
return
message = (body.get("message") or "").strip()
if not message:
self.send_json({"error": "message required"}, 400)
return
adapter, channel = self._resolve_planning_key(slug)
try:
from src.planning_orchestrator import PlanningOrchestrator # type: ignore
session, text, phase_ready = PlanningOrchestrator.respond(
adapter=adapter, channel_id=channel, message=message,
)
except Exception as exc:
log.exception("plan/respond failed for %s", slug)
self.send_json({"error": str(exc)}, 500)
return
if session is None:
self.send_json(
{"error": "no_active_session", "message": "Nu există o sesiune activă pentru acest slug."},
404,
)
return
self.send_json({
"ok": True,
"slug": slug,
"phase": session.phase,
"phase_ready": phase_ready,
"message": text,
})
# ── GET /api/projects/<slug>/plan/state ───────────────────────
    def handle_plan_state(self, slug: str):
        """GET /api/projects/<slug>/plan/state — current planning-session state.

        Looks up the dashboard-owned key first, then scans all adapters so
        sessions started via discord/telegram appear too.
        NOTE(review): this fallback takes the *first* slug match while
        _resolve_planning_key picks the most-recently-updated entry — confirm
        whether the two should agree.
        """
        if validate_slug(slug):
            self.send_json({"error": "invalid_slug"}, 400)
            return
        adapter, channel = self._planning_key(slug)
        try:
            from src.planning_session import get_planning_state  # type: ignore
        except Exception as exc:
            self.send_json({"error": str(exc)}, 500)
            return
        state = get_planning_state(adapter, channel)
        # Also try cross-adapter fallback so the dashboard surfaces sessions
        # started via discord/telegram.
        if state is None:
            try:
                from src.planning_session import _load_planning_state  # type: ignore
                all_state = _load_planning_state()
                for _k, entry in all_state.items():
                    if (entry.get("slug") or "").lower() == slug.lower():
                        state = entry
                        break
            except Exception:
                pass
        if state is None:
            self.send_json({"slug": slug, "status": "expired", "phase": None, "pending_response": False})
            return
        # Heuristic: phase=='__complete__' means the pipeline finished.
        phase = state.get("phase")
        completed = phase == "__complete__"
        self.send_json({
            "slug": slug,
            "status": "complete" if completed else "active",
            "phase": phase,
            "phases_completed": state.get("phases_completed") or [],
            "phases_planned": state.get("phases_planned") or [],
            "pending_response": False,
            "since": state.get("started_at"),
            "updated_at": state.get("updated_at"),
            "session_id": state.get("session_id"),
            "planning_session_id": state.get("planning_session_id"),
            "last_text_excerpt": state.get("last_text_excerpt") or "",
            "final_plan_path": state.get("final_plan_path"),
        })
# ── GET /api/projects/<slug>/plan/transcript ──────────────────
    def handle_plan_transcript(self, slug: str):
        """Return planning transcript.
        The planning subprocess persists only the latest excerpt + metadata
        in `sessions/planning.json` (full transcripts live inside Claude CLI
        session state). We return what's available, plus the final-plan
        markdown if it has been written.
        """
        if validate_slug(slug):
            self.send_json({"error": "invalid_slug"}, 400)
            return
        adapter, channel = self._planning_key(slug)
        try:
            from src.planning_session import get_planning_state, _load_planning_state  # type: ignore
        except Exception as exc:
            self.send_json({"error": str(exc)}, 500)
            return
        state = get_planning_state(adapter, channel)
        # Cross-adapter fallback — first slug match wins (see also
        # _resolve_planning_key, which prefers the most-recent entry).
        if state is None:
            try:
                all_state = _load_planning_state()
                for _k, entry in all_state.items():
                    if (entry.get("slug") or "").lower() == slug.lower():
                        state = entry
                        break
            except Exception:
                pass
        final_plan_text = ""
        final_plan_path = None
        try:
            fp = constants.WORKSPACE_DIR / slug / "scripts" / "ralph" / "final-plan.md"
            if fp.exists():
                final_plan_path = str(fp)
                final_plan_text = fp.read_text(encoding="utf-8", errors="replace")
        except OSError:
            pass
        # `state` may still be None here — (state or {}) keeps the response shape.
        self.send_json({
            "slug": slug,
            "phase": (state or {}).get("phase"),
            "phases_completed": (state or {}).get("phases_completed") or [],
            "last_text_excerpt": (state or {}).get("last_text_excerpt") or "",
            "final_plan_path": final_plan_path,
            "final_plan": final_plan_text,
            "history": [],  # reserved for future per-turn JSONL sidecar
        })
# ── POST /api/projects/<slug>/plan/finalize ───────────────────
    def handle_plan_finalize(self, slug: str):
        """POST /api/projects/<slug>/plan/finalize — approve the planned project.

        Records the final-plan path, flips status to approved, bumps the
        version, and clears planning state only after the mutation succeeds.
        """
        if validate_slug(slug):
            self.send_json({"error": "invalid_slug"}, 400)
            return
        adapter, channel = self._resolve_planning_key(slug)
        try:
            from src.planning_session import get_planning_state, clear_planning_state  # type: ignore
            from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        except Exception as exc:
            self.send_json({"error": str(exc)}, 500)
            return
        state = get_planning_state(adapter, channel)
        final_plan = None
        # Prefer the path recorded on the session; otherwise fall back to the
        # orchestrator's canonical location for this slug.
        if state and state.get("final_plan_path"):
            final_plan = state["final_plan_path"]
        else:
            final_plan = str(PlanningOrchestrator.final_plan_path(slug))
        def _mut(data: dict) -> dict:
            proj = _find_project(data, slug)
            if proj is None:
                raise KeyError(slug)
            proj["status"] = "approved"
            proj["approved_at"] = _now_iso()
            proj["planning_session_id"] = None
            proj["final_plan_path"] = final_plan
            _bump_version(data)
            return data
        try:
            _write_approved(_mut)
        except KeyError:
            self.send_json({"error": "not_found", "slug": slug}, 404)
            return
        except Exception as exc:
            log.exception("plan/finalize mutate failed")
            self.send_json({"error": str(exc)}, 500)
            return
        # Only clear the session once the approved-tasks mutation succeeded.
        clear_planning_state(adapter, channel)
        self.send_json({
            "ok": True,
            "slug": slug,
            "status": "approved",
            "final_plan_path": final_plan,
        })
# ── POST /api/projects/<slug>/plan/cancel ─────────────────────
def handle_plan_cancel_planning(self, slug: str):
if validate_slug(slug):
self.send_json({"error": "invalid_slug"}, 400)
return
adapter, channel = self._resolve_planning_key(slug)
try:
from src.planning_orchestrator import PlanningOrchestrator # type: ignore
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
return
cleared = PlanningOrchestrator.cancel(adapter, channel)
# Revert status to pending if currently planning.
def _mut(data: dict) -> dict:
proj = _find_project(data, slug)
if proj is not None and (proj.get("status") or "").lower() == "planning":
proj["status"] = "pending"
proj["planning_session_id"] = None
_bump_version(data)
return data
try:
_write_approved(_mut)
except Exception:
pass
self.send_json({"ok": True, "slug": slug, "cleared": cleared, "status": "pending"})
# ── POST /api/projects/<slug>/plan/advance ────────────────────
    def handle_plan_advance(self, slug: str):
        """POST /api/projects/<slug>/plan/advance — force the next phase.

        Resolves the session cross-adapter; 404 when no session is active.
        """
        if validate_slug(slug):
            self.send_json({"error": "invalid_slug"}, 400)
            return
        adapter, channel = self._resolve_planning_key(slug)
        try:
            from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
            session, text, completed = PlanningOrchestrator.advance(adapter, channel)
        except Exception as exc:
            log.exception("plan/advance failed for %s", slug)
            self.send_json({"error": str(exc)}, 500)
            return
        if session is None:
            # `text` carries the orchestrator's explanation for the miss.
            self.send_json({"error": "no_active_session", "message": text}, 404)
            return
        self.send_json({
            "ok": True,
            "slug": slug,
            "phase": session.phase,
            "completed": completed,
            "message": text,
        })