Files
echo-core/dashboard/handlers/projects.py
Marius Mutu 2bcefe1ab4 feat(projects): approval guard + worktree-aware ralph execution
Two structural fixes that together let users manage feature-branch
work without manual intervention:

Approval guard — `/plan/start` returns 409 `already_committed` if the
project status is approved/running/complete, unless the body opts in
with `force=true`. Frontend now renders "Re-planifică" instead of
"Planifică" on approved cards and gates it behind a confirm dialog
that threads `force=true` through. Prevents an accidental click from
wiping `status=approved` and burning a fresh planning subprocess.

Worktree awareness — projects can now declare that they target a
feature branch on an existing Gitea repo, not a repo-per-slug clone.
Three optional fields added to approved-tasks.json: `repo` (default
= slug), `branch` (feature branch to create), `base_branch` (default
main). Wired through `/p` flag parser in router.py, the dashboard
Propose modal's new "Avansat" section, and the night-execute prompt
which clones {repo} and creates {branch} from {base_branch} before
running ralph.

CLAUDE.md updated with both flows + the new schema fields.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 08:27:14 +00:00

1169 lines
44 KiB
Python

"""Unified project endpoints for the new dashboard (Task #4 backend).
Replaces the old separate /api/workspace + /api/ralph payloads with a single
/api/projects view that merges:
- workspace dirs (`~/workspace/<slug>/`)
- approved-tasks.json entries
- per-project Ralph PRD (`scripts/ralph/prd.json`)
Endpoints exposed via the mixin:
GET /api/projects — full list + version
GET /api/projects/signature — change-detection hash for SSE
GET /api/projects/stream — SSE stream of changes
GET /api/projects/<slug>/plan/state
GET /api/projects/<slug>/plan/transcript
POST /api/projects/propose
POST /api/projects/approve (If-Match: <version>)
POST /api/projects/unapprove (If-Match: <version>)
POST /api/projects/cancel (If-Match: <version>)
POST /api/projects/<slug>/plan/start
POST /api/projects/<slug>/plan/respond
POST /api/projects/<slug>/plan/finalize
POST /api/projects/<slug>/plan/cancel
POST /api/projects/<slug>/plan/advance
Status derivation (one of these strings — see `_derive_status`):
running-ralph — has PID, /proc/<pid>/cmdline contains ralph.sh
running-manual — has PID, alive, but not ralph.sh
planning — approved-tasks.json entry has status=planning
approved — entry has status=approved (waiting for night-execute)
pending — entry has status=pending
blocked — prd has stories, all incomplete are blocked
failed — prd has stories, last run had failures
complete — prd has stories, all passes==True
idle — workspace dir exists but no signal
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import parse_qs, urlparse
import constants
from handlers._validators import parse_json_body, validate_description, validate_slug
# Make `from src.x import ...` work — same trick youtube.py uses.
if str(constants.BASE_DIR) not in sys.path:
sys.path.insert(0, str(constants.BASE_DIR))
# Module logger, named after this module.
log = logging.getLogger(__name__)
# Location of the approved-tasks store read and written by the helpers below.
APPROVED_TASKS_FILE = constants.BASE_DIR / "approved-tasks.json"
class _StaleVersion(Exception):
"""Raised inside a write_approved mutator to abort on If-Match mismatch."""
pass
# Module-level signature cache — `git status --porcelain` is only re-run when
# the repo's `.git/index` mtime differs from the cached one. Keeps SSE polling
# cheap.
#   git_mtime — `.git/index` mtime seen when the cache was last refreshed
#   signature — raw porcelain output captured at that time (not the final hash)
#   ts        — time.monotonic() value recorded at the last refresh
_SIG_CACHE: dict = {"git_mtime": None, "signature": None, "ts": 0.0}
# ─── helpers ───────────────────────────────────────────────────────────
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _read_approved() -> dict:
    """Load approved-tasks.json through the locking reader.

    Returns:
        Whatever ``read_locked`` yields, or an empty skeleton
        (no projects, ``version`` 0) when the file is missing, unreadable
        or not valid JSON. Read failures other than "missing" are logged.
    """

    def _empty() -> dict:
        # Built fresh on every call so callers may mutate the result safely.
        return {"projects": [], "last_updated": None, "version": 0}

    try:
        from src.jsonlock import read_locked  # type: ignore

        payload = read_locked(str(APPROVED_TASKS_FILE))
    except FileNotFoundError:
        return _empty()
    except (json.JSONDecodeError, OSError) as exc:  # pragma: no cover
        log.error("approved-tasks.json read failed: %s", exc)
        return _empty()
    return payload
def _write_approved(mutator):
    """Run ``mutator`` against approved-tasks.json via the locking writer.

    Args:
        mutator: Callable taking the current document (a dict that always
            has a ``projects`` list) and returning the document to persist.
            Exceptions it raises propagate to the caller.

    Returns:
        The return value of ``write_locked``.
    """
    from src.jsonlock import write_locked  # type: ignore

    def _apply(current: dict) -> dict:
        # Anything that is not a dict is replaced by an empty document.
        document = current if isinstance(current, dict) else {}
        document.setdefault("projects", [])
        updated = mutator(document)
        updated["last_updated"] = _now_iso()
        return updated

    return write_locked(str(APPROVED_TASKS_FILE), _apply)
def _find_project(data: dict, slug: str) -> dict | None:
for p in data.get("projects", []) or []:
if (p.get("name") or "").lower() == slug.lower():
return p
return None
def _read_prd(project_dir: Path) -> dict | None:
prd_path = project_dir / "scripts" / "ralph" / "prd.json"
if not prd_path.exists():
return None
try:
return json.loads(prd_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
def _pid_alive_with_cmdline(pid: int | None) -> tuple[bool, str]:
"""(alive, cmdline) — cmdline is empty string if not readable."""
if not pid or not isinstance(pid, int):
return False, ""
try:
os.kill(pid, 0)
except (ProcessLookupError, PermissionError, OSError):
return False, ""
cmdline = ""
try:
with open(f"/proc/{pid}/cmdline", "rb") as f:
cmdline = f.read().decode("utf-8", errors="replace").replace("\x00", " ")
except (FileNotFoundError, PermissionError, OSError):
cmdline = ""
return True, cmdline
# ─── status derivation ────────────────────────────────────────────────
def _derive_status(slug, approved_entry, workspace_dir, prd_data) -> str:
    """Compute the UI status string for one project.

    Precedence: a live process, then the status stored on the
    approved-tasks entry, then the Ralph PRD stories, and finally ``idle``.
    ``slug`` and ``workspace_dir`` are accepted but not consulted.
    """
    # 1. A live PID wins over every stored state.
    entry = approved_entry or {}
    alive, cmdline = _pid_alive_with_cmdline(entry.get("pid"))
    if alive:
        return "running-ralph" if "ralph.sh" in cmdline else "running-manual"

    # 2. Statuses that map straight to a UI state. "complete"/"completed"
    #    are deliberately absent: they fall through to the PRD check.
    if approved_entry:
        declared = (approved_entry.get("status") or "").lower()
        direct = {
            "planning": "planning",
            "approved": "approved",
            "pending": "pending",
            "failed": "failed",
            "cancelled": "idle",
            "canceled": "idle",
        }
        if declared in direct:
            return direct[declared]

    # 3. PRD-derived states (only projects that ran Ralph have stories).
    stories = (prd_data or {}).get("userStories") or []
    if not stories:
        return "idle"
    unfinished = [story for story in stories if not story.get("passes")]
    if not unfinished:
        return "complete"
    if all(story.get("blocked") for story in unfinished):
        return "blocked"
    if any(story.get("failed") for story in stories):
        return "failed"
    # Partial or no progress without an active PID: re-runnable.
    return "idle"
def _stories_progress(prd_data: dict | None) -> tuple[int, int]:
stories = (prd_data or {}).get("userStories") or []
total = len(stories)
done = sum(1 for s in stories if s.get("passes"))
return total, done
def _last_iter_mtime(project_dir: Path) -> float | None:
logs = project_dir / "scripts" / "ralph" / "logs"
if not logs.exists():
return None
try:
files = list(logs.glob("iteration-*.log"))
if not files:
return None
return max(f.stat().st_mtime for f in files)
except OSError:
return None
def _eta_seconds(prd_data, total, done, last_iter_mtime) -> int | None:
"""Crude ETA: 12 minutes per remaining story (matches RalphHandlers)."""
if total <= 0:
return None
remaining = total - done
if remaining <= 0:
return 0
return remaining * 12 * 60
def _last_message_preview(slug: str) -> str:
"""Return the most recent planning excerpt for any adapter on this slug.
The dashboard owns adapter="dashboard" but we also surface chats started
via discord/telegram so the unified UI reflects reality.
"""
try:
from src.planning_session import _load_planning_state # type: ignore
except Exception: # pragma: no cover — defensive
return ""
try:
state = _load_planning_state()
except Exception:
return ""
best = ""
best_ts = ""
for _key, entry in state.items():
if (entry.get("slug") or "").lower() != slug.lower():
continue
ts = entry.get("updated_at") or entry.get("started_at") or ""
excerpt = entry.get("last_text_excerpt") or ""
if ts >= best_ts and excerpt:
best = excerpt
best_ts = ts
return best[:200]
# ─── planning transcript helpers ──────────────────────────────────────
# Matches a whole message wrapped in "[EXTERNAL CONTENT]" ... "[END EXTERNAL
# CONTENT]" marker lines; group 1 is the payload between them (DOTALL so the
# payload may span several lines).
_EXTERNAL_CONTENT_RE = re.compile(
r"^\s*\[EXTERNAL CONTENT\]\s*\n(.*)\n\[END EXTERNAL CONTENT\]\s*$",
re.DOTALL,
)
# Captures (non-greedily) the text between <command-args> tags.
_COMMAND_ARGS_RE = re.compile(r"<command-args>(.*?)</command-args>", re.DOTALL)
def _planning_jsonl_path(slug: str, session_id: str) -> Path | None:
"""Path to the Claude CLI JSONL transcript for a planning session.
PlanningSession runs with cwd = `~/workspace/<slug>/` if it exists, else
falls back to the echo-core repo root (see `PlanningSession.cwd`).
Claude CLI stores transcripts under
`~/.claude/projects/<encoded-cwd>/<session_id>.jsonl`, where encoded-cwd
is the absolute path with `/` replaced by `-`.
"""
if not session_id:
return None
workspace = constants.WORKSPACE_DIR / slug
cwd = workspace if workspace.is_dir() else constants.BASE_DIR
encoded = str(cwd).replace("/", "-")
return Path.home() / ".claude" / "projects" / encoded / f"{session_id}.jsonl"
def _clean_user_text(text: str) -> str:
    """Strip the wrappers put around user input sent to the planning subprocess.

    First unwraps an ``[EXTERNAL CONTENT]`` envelope if the whole text is
    one; then, if a ``<command-name>`` tag is present, reduces the text to
    the ``<command-args>`` payload when such a tag pair exists.
    """
    envelope = _EXTERNAL_CONTENT_RE.match(text)
    if envelope is not None:
        text = envelope.group(1).strip()
    if "<command-name>" not in text:
        return text
    args = _COMMAND_ARGS_RE.search(text)
    return args.group(1).strip() if args is not None else text
def _extract_text_blocks(content) -> str:
"""Concatenate `text`-type blocks from a Claude `message.content` field."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
t = block.get("text") or ""
if t:
parts.append(t)
return "\n\n".join(parts)
return ""
def _load_planning_history(slug: str, session_id: str | None) -> list[dict]:
"""Parse the Claude session JSONL into a `[{role, text}, ...]` chat history.
Skips queue / system / tool-result entries. Empty list on any structural
problem so callers can fall back to `last_text_excerpt`.
"""
if not session_id:
return []
path = _planning_jsonl_path(slug, session_id)
if path is None or not path.exists():
return []
history: list[dict] = []
try:
with path.open(encoding="utf-8") as fh:
for raw in fh:
raw = raw.strip()
if not raw:
continue
try:
entry = json.loads(raw)
except json.JSONDecodeError:
continue
etype = entry.get("type")
if etype not in ("user", "assistant"):
continue
if etype == "user" and "toolUseResult" in entry:
continue
if entry.get("isMeta"):
continue
msg = entry.get("message") or {}
text = _extract_text_blocks(msg.get("content"))
if etype == "user":
text = _clean_user_text(text)
text = (text or "").strip()
if not text:
continue
history.append({"role": etype, "text": text})
except OSError:
return []
return history
# ─── version helpers ──────────────────────────────────────────────────
def _get_version_from(data: dict) -> int:
try:
return int(data.get("version") or 0)
except (TypeError, ValueError):
return 0
def _bump_version(data: dict) -> dict:
    """Increment ``data["version"]`` in place and return the same dict."""
    next_version = _get_version_from(data) + 1
    data["version"] = next_version
    return data
# ─── signature for SSE change detection ───────────────────────────────
def _git_index_mtime() -> float | None:
    """Return the mtime of ``BASE_DIR/.git/index``, or None if it cannot be stat'ed."""
    index_file = constants.BASE_DIR.joinpath(".git", "index")
    try:
        info = index_file.stat()
    except OSError:
        return None
    return info.st_mtime
def _compute_signature() -> str:
    """Build the SHA-256 change-detection signature used by the SSE stream.

    Hashed inputs, in order: ``git status --porcelain`` output for the repo,
    the number of visible workspace directories, the approved-tasks version
    and, for each workspace directory, its name plus the mtimes of
    ``scripts/ralph/prd.json`` and ``scripts/ralph/.ralph.pid`` (0 when the
    file is absent or cannot be stat'ed).

    Only the git output is cached: it is reused as long as the
    ``.git/index`` mtime equals the one stored in ``_SIG_CACHE``.
    """
    started = time.monotonic()
    index_mtime = _git_index_mtime()
    remembered_mtime = _SIG_CACHE.get("git_mtime")
    remembered_status = _SIG_CACHE.get("signature")

    def _mtime_or_zero(target):
        try:
            return target.stat().st_mtime if target.exists() else 0
        except OSError:
            return 0

    # Per-project marks are cheap stats and are recomputed on every call.
    marks: list[tuple] = []
    if constants.WORKSPACE_DIR.exists():
        for folder in sorted(constants.WORKSPACE_DIR.iterdir()):
            if not folder.is_dir() or folder.name.startswith("."):
                continue
            ralph_dir = folder / "scripts" / "ralph"
            prd_mtime = _mtime_or_zero(ralph_dir / "prd.json")
            pid_mtime = _mtime_or_zero(ralph_dir / ".ralph.pid")
            marks.append((folder.name, prd_mtime, pid_mtime))
    workspace_total = len(marks)

    version = _get_version_from(_read_approved())

    # Git porcelain — reused while the index mtime is unchanged.
    if index_mtime == remembered_mtime and remembered_status is not None:
        porcelain = remembered_status
    else:
        try:
            proc = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=str(constants.BASE_DIR),
                capture_output=True, text=True, timeout=5,
            )
            porcelain = proc.stdout if proc.returncode == 0 else ""
        except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
            porcelain = ""
        _SIG_CACHE.update(git_mtime=index_mtime, signature=porcelain, ts=started)

    digest = hashlib.sha256()
    digest.update(porcelain.encode("utf-8", errors="replace"))
    digest.update(str(workspace_total).encode())
    digest.update(str(version).encode())
    for folder_name, prd_mtime, pid_mtime in marks:
        digest.update(folder_name.encode("utf-8"))
        digest.update(f"{prd_mtime:.3f}|{pid_mtime:.3f}".encode())
    return digest.hexdigest()
# ─── projects mixin ───────────────────────────────────────────────────
class ProjectsHandlers:
"""Mixin: /api/projects/* endpoints — unified workspace + Ralph + planning."""
# ── helpers (instance) ─────────────────────────────────────────
def _get_version(self) -> int:
    """Return the version currently stored in approved-tasks.json."""
    snapshot = _read_approved()
    return _get_version_from(snapshot)
def _increment_version(self, data: dict) -> dict:
    """Bump ``data["version"]`` in place; thin wrapper over ``_bump_version``."""
    bumped = _bump_version(data)
    return bumped
def _derive_status(self, slug, approved_entry, workspace_entry, prd_data) -> str:
    """Instance-level entry point that delegates to the module-level ``_derive_status``."""
    status = _derive_status(slug, approved_entry, workspace_entry, prd_data)
    return status
def _build_unified_payload(self) -> dict:
    """Assemble the ``/api/projects`` payload.

    Merges workspace directories with approved-tasks entries: every visible
    workspace directory yields a record (paired with its entry, matched by
    lower-cased name), followed by entries that have no workspace yet.

    Returns:
        Dict with ``projects`` (sorted by slug), ``version``, ``fetchedAt``
        and ``count``.
    """
    approved = _read_approved()
    listed = approved.get("projects", []) or []
    by_slug: dict[str, dict] = {}
    for item in listed:
        by_slug[(item.get("name") or "").lower()] = item

    records: list[dict] = []
    covered: set[str] = set()

    # Workspace dirs first, so projects without an entry are still listed.
    if constants.WORKSPACE_DIR.exists():
        for folder in sorted(constants.WORKSPACE_DIR.iterdir()):
            if not folder.is_dir() or folder.name.startswith("."):
                continue
            key = folder.name.lower()
            covered.add(key)
            matching_entry = by_slug.get(key)
            prd = _read_prd(folder)
            records.append(self._project_record(folder.name, folder, matching_entry, prd))

    # Then entries that exist only in approved-tasks.
    for item in listed:
        name = item.get("name") or ""
        if name and name.lower() not in covered:
            records.append(self._project_record(name, None, item, None))

    return {
        "projects": sorted(records, key=lambda rec: rec.get("slug") or ""),
        "version": _get_version_from(approved),
        "fetchedAt": _now_iso(),
        "count": len(records),
    }
def _project_record(self, slug, workspace_dir, approved_entry, prd_data) -> dict:
    """Build the per-project dict served to the dashboard.

    Args:
        slug: Project slug (also emitted as ``name``).
        workspace_dir: Project directory, or None when no workspace exists.
        approved_entry: Matching approved-tasks entry, or None.
        prd_data: Parsed Ralph PRD, or None.
    """
    entry = approved_entry or {}
    status = _derive_status(slug, approved_entry, workspace_dir, prd_data)
    total, done = _stories_progress(prd_data)
    newest_log = None if not workspace_dir else _last_iter_mtime(workspace_dir)
    eta = _eta_seconds(prd_data, total, done, newest_log)

    record = {"slug": slug, "status": status, "name": slug}
    record["description"] = entry.get("description") or ""
    for field in ("proposed_at", "approved_at", "started_at", "pid"):
        record[field] = entry.get(field)
    record.update(
        stories_total=total,
        stories_done=done,
        eta_seconds=eta,
        last_message_preview=_last_message_preview(slug),
        error_reason=entry.get("error") or "",
        has_workspace=workspace_dir is not None,
        has_prd=prd_data is not None,
    )
    return record
# ── GET /api/projects ──────────────────────────────────────────
def handle_unified_status(self):
    """GET /api/projects — send the unified payload, or a 500 with the error text."""
    try:
        payload = self._build_unified_payload()
        self.send_json(payload)
    except Exception as exc:
        log.exception("handle_unified_status failed")
        self.send_json({"error": str(exc)}, 500)
# ── GET /api/projects/signature ────────────────────────────────
def handle_unified_signature(self):
    """GET /api/projects/signature — send the change-detection hash and version."""
    try:
        signature = _compute_signature()
        version = _get_version_from(_read_approved())
        response = {
            "signature": signature,
            "version": version,
            "fetchedAt": _now_iso(),
        }
        self.send_json(response)
    except Exception as exc:
        log.exception("handle_unified_signature failed")
        self.send_json({"error": str(exc)}, 500)
# ── GET /api/projects/stream (SSE) ────────────────────────────
def handle_projects_stream(self):
    """GET /api/projects/stream — server-sent events for project changes.

    Flow:
      1. Explicit cookie check (per the original note, the global POST
         middleware does not cover GET) -> 401 on failure.
      2. Origin/Referer allowlist -> 403 on mismatch. Skipped when neither
         header is present (curl etc).
      3. One initial snapshot, then a poll every 2 seconds: a new payload
         is sent when the signature changes, otherwise a keep-alive ping
         after 15 seconds of silence.

    The handler returns as soon as a write to the client fails.
    """

    def _reject(code: int, body: bytes) -> None:
        # Minimal JSON error response written without send_json.
        try:
            self.send_response(code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            pass

    def _origin_of(url: str):
        # Reduce a URL to (scheme, host[:port]); None when it has neither.
        try:
            parts = urlparse(url.strip())
        except ValueError:
            return None
        if not parts.scheme or not parts.netloc:
            return None
        return parts.scheme.lower(), parts.netloc.lower()

    def _emit(chunk: bytes) -> bool:
        # Write one SSE frame; False means the client is gone.
        try:
            self.wfile.write(b"data: " + chunk + b"\n\n")
            self.wfile.flush()
        except (OSError, ValueError):
            # OSError covers broken pipe / reset / abort / timeout;
            # ValueError is raised when the stream is already closed.
            return False
        return True

    # Auth: require cookie even though this is a GET.
    if hasattr(self, "_check_dashboard_cookie") and not self._check_dashboard_cookie():
        _reject(401, b'{"error":"Unauthorized"}')
        return

    # CSRF/Origin allowlist. Origins are compared exactly on scheme + host +
    # port: a prefix test would also accept look-alike hosts such as
    # "http://localhost:8088.evil.example".
    origin = self.headers.get("Origin", "") or ""
    referer = self.headers.get("Referer", "") or ""
    allowed = ["http://127.0.0.1:8088", "http://localhost:8088"]
    dashboard_host = os.environ.get("DASHBOARD_HOST", "").strip()
    if dashboard_host:
        allowed.append(dashboard_host)
    claimed = origin or referer
    if claimed:
        allowed_origins = {
            parsed for parsed in (_origin_of(item) for item in allowed)
            if parsed is not None
        }
        if _origin_of(claimed) not in allowed_origins:
            _reject(403, b'{"error":"CSRF"}')
            return

    try:
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("X-Accel-Buffering", "no")
        self.send_header("Connection", "keep-alive")
        self.end_headers()
    except (BrokenPipeError, ConnectionResetError):
        return

    # Initial snapshot; a failure here is reported once and ends the stream.
    try:
        payload = self._build_unified_payload()
        last_sig = _compute_signature()
        first_frame = json.dumps(payload).encode("utf-8")
    except Exception as exc:
        _emit(json.dumps({"error": str(exc)}).encode("utf-8"))
        return
    if not _emit(first_frame):
        return

    last_ping = time.monotonic()
    while True:
        time.sleep(2)
        frame = None
        sig = last_sig
        try:
            sig = _compute_signature()
            if sig != last_sig:
                frame = json.dumps(self._build_unified_payload()).encode("utf-8")
        except Exception:
            # Best-effort: one bad refresh shouldn't kill the stream. Fall
            # through to the keep-alive so a dead client is still noticed.
            log.debug("projects stream refresh failed", exc_info=True)
            frame = None
            sig = last_sig
        now = time.monotonic()
        if frame is not None:
            if not _emit(frame):
                return
            last_sig = sig
            last_ping = now
        elif now - last_ping >= 15:
            if not _emit(b'{"type":"ping"}'):
                return
            last_ping = now
# ── POST /api/projects/propose ─────────────────────────────────
def handle_propose(self):
    """POST /api/projects/propose — register a new project as ``pending``.

    Body fields: ``slug`` and ``description`` (validated), plus optional
    ``repo``, ``branch`` and ``base_branch`` (stored as None when blank).

    Responses: 201 on success, 400 ``validation_failed`` with per-field
    messages, 409 ``duplicate_slug``, 500 on unexpected write errors.
    """
    body = parse_json_body(self)
    if body is None:
        return  # parse_json_body already sent 400

    # Every text field must be a JSON string. Falsy values keep counting as
    # "absent"; any other type is rejected instead of crashing on .strip().
    text_fields = ("slug", "description", "repo", "branch", "base_branch")
    values: dict[str, str] = {}
    type_errors: dict[str, str] = {}
    for field in text_fields:
        raw = body.get(field) or ""
        if isinstance(raw, str):
            values[field] = raw.strip()
        else:
            type_errors[field] = "must be a string"
    if type_errors:
        self.send_json({"error": "validation_failed", "fields": type_errors}, 400)
        return

    slug = values["slug"]
    description = values["description"]
    repo = values["repo"] or None
    branch = values["branch"] or None
    base_branch = values["base_branch"] or None

    slug_err = validate_slug(slug)
    desc_err = validate_description(description)
    if slug_err or desc_err:
        errors: dict[str, str] = {}
        if slug_err:
            errors["slug"] = slug_err
        if desc_err:
            errors["description"] = desc_err
        self.send_json({"error": "validation_failed", "fields": errors}, 400)
        return

    # Cheap pre-check so the common duplicate case can report the status.
    existing = _find_project(_read_approved(), slug)
    if existing:
        self.send_json(
            {"error": "duplicate_slug", "slug": slug, "status": existing.get("status")},
            409,
        )
        return

    def _add(data: dict) -> dict:
        # Re-check inside the lock — guard against races.
        if _find_project(data, slug) is not None:
            raise FileExistsError(slug)
        data["projects"].append({
            "name": slug,
            "description": description,
            "status": "pending",
            "planning_session_id": None,
            "final_plan_path": None,
            "repo": repo,
            "branch": branch,
            "base_branch": base_branch,
            "proposed_at": _now_iso(),
            "approved_at": None,
            "started_at": None,
            "pid": None,
        })
        _bump_version(data)
        return data

    try:
        _write_approved(_add)
    except FileExistsError:
        self.send_json({"error": "duplicate_slug", "slug": slug}, 409)
        return
    except Exception as exc:
        log.exception("propose failed")
        self.send_json({"error": str(exc)}, 500)
        return
    self.send_json({"slug": slug, "status": "pending"}, 201)
# ── helpers for approve/unapprove/cancel ──────────────────────
def _read_if_match(self) -> int | None:
raw = self.headers.get("If-Match", "") or self.headers.get("if-match", "")
if not raw:
return None
raw = raw.strip().strip('"')
try:
return int(raw)
except (TypeError, ValueError):
return None
def _set_status_locked(self, slug: str, target_status: str, **extra) -> tuple[bool, dict]:
    """Set a project's status (plus ``extra`` fields) and bump the version.

    When the request carries a parsable ``If-Match`` header, the write is
    refused unless it equals the stored version.

    Returns:
        ``(True, project_entry)`` on success, otherwise ``(False, error)``
        where ``error["error"]`` is ``"stale"`` (with ``current_version``),
        ``"not_found"`` (with ``slug``) or the text of an unexpected
        exception.
    """
    expected = self._read_if_match()
    outcome: dict = {}

    def _apply(data: dict) -> dict:
        stored = _get_version_from(data)
        if expected is not None and stored != expected:
            outcome.update(error="stale", current_version=stored)
            raise _StaleVersion()
        entry = _find_project(data, slug)
        if entry is None:
            outcome["error"] = "not_found"
            raise KeyError(slug)
        entry["status"] = target_status
        # Extra fields are written as given, None included.
        entry.update(extra)
        outcome["project"] = entry
        _bump_version(data)
        return data

    try:
        _write_approved(_apply)
    except _StaleVersion:
        stale = {"error": "stale"}
        stale["current_version"] = outcome.get("current_version", 0)
        return False, stale
    except KeyError:
        return False, {"error": "not_found", "slug": slug}
    except Exception as exc:
        log.exception("status mutate failed")
        return False, {"error": str(exc)}
    return True, outcome.get("project", {})
# ── POST /api/projects/approve ─────────────────────────────────
def handle_approve(self):
    """POST /api/projects/approve — mark a project ``approved`` (honours If-Match).

    Responses: 200 on success, 400 invalid slug, 404 unknown project,
    409 stale version, 500 otherwise.
    """
    body = parse_json_body(self)
    if body is None:
        return
    slug = (body.get("slug") or "").strip()
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    ok, payload = self._set_status_locked(slug, "approved", approved_at=_now_iso())
    if ok:
        self.send_json({"ok": True, "slug": slug, "status": "approved"})
        return
    http_codes = {"stale": 409, "not_found": 404}
    self.send_json(payload, http_codes.get(payload.get("error"), 500))
# ── POST /api/projects/unapprove ──────────────────────────────
def handle_unapprove(self):
    """POST /api/projects/unapprove — return a project to ``pending`` and clear ``approved_at``.

    Responses: 200 on success, 400 invalid slug, 404 unknown project,
    409 stale version, 500 otherwise.
    """
    body = parse_json_body(self)
    if body is None:
        return
    slug = (body.get("slug") or "").strip()
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    ok, payload = self._set_status_locked(slug, "pending", approved_at=None)
    if ok:
        self.send_json({"ok": True, "slug": slug, "status": "pending"})
        return
    http_codes = {"stale": 409, "not_found": 404}
    self.send_json(payload, http_codes.get(payload.get("error"), 500))
# ── POST /api/projects/cancel ─────────────────────────────────
def handle_cancel(self):
    """POST /api/projects/cancel — set status=cancelled (matches router behaviour).

    Responses: 200 on success, 400 invalid slug, 404 unknown project,
    409 stale version, 500 otherwise.
    """
    body = parse_json_body(self)
    if body is None:
        return
    slug = (body.get("slug") or "").strip()
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    ok, payload = self._set_status_locked(slug, "cancelled")
    if ok:
        self.send_json({"ok": True, "slug": slug, "status": "cancelled"})
        return
    http_codes = {"stale": 409, "not_found": 404}
    self.send_json(payload, http_codes.get(payload.get("error"), 500))
# ── planning state helpers ────────────────────────────────────
@staticmethod
def _planning_key(slug: str) -> tuple[str, str]:
"""Dashboard planning sessions live in adapter='dashboard', channel=slug."""
return ("dashboard", slug)
@staticmethod
def _resolve_planning_key(slug: str) -> tuple[str, str]:
"""Find the active session's (adapter, channel) for slug, regardless of
where it was started. Falls back to ('dashboard', slug) if nothing
matches — preserving prior behavior for the no-session case.
Picks the most-recently-updated entry when multiple exist.
"""
try:
from src.planning_session import _load_planning_state # type: ignore
except Exception:
return ("dashboard", slug)
try:
all_state = _load_planning_state()
except Exception:
return ("dashboard", slug)
matches = [
entry for entry in all_state.values()
if (entry.get("slug") or "").lower() == slug.lower()
]
if not matches:
return ("dashboard", slug)
matches.sort(key=lambda e: e.get("updated_at") or "", reverse=True)
best = matches[0]
adapter = best.get("adapter") or "dashboard"
channel = best.get("channel_id") or slug
return (adapter, channel)
# ── POST /api/projects/<slug>/plan/start ──────────────────────
def handle_plan_start(self, slug: str):
    """POST /api/projects/<slug>/plan/start — open a planning session.

    Body fields: ``description`` (optional when the project already has
    one) and ``force`` (opt-in to re-plan a committed project).

    Responses:
        200 with session id, phase and first message on success;
        400 for an invalid slug or description;
        409 ``already_committed`` when the project is
        approved/running/complete and ``force`` is not set;
        500 when the store write or the planning start fails. In the latter
        case the status that was in place before this request is restored.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    body = parse_json_body(self)
    if body is None:
        return
    description = (body.get("description") or "").strip()
    force = bool(body.get("force"))

    # Approval guard: refuse to silently overwrite approved/running/complete
    # projects. Caller must opt in with force=true (the "Re-planifică" path).
    # "completed" is included because status derivation accepts both
    # spellings of the finished state.
    existing_proj = _find_project(_read_approved(), slug)
    if existing_proj and not force:
        current_status = (existing_proj.get("status") or "").lower()
        if current_status in {"approved", "running", "complete", "completed"}:
            self.send_json({
                "error": "already_committed",
                "slug": slug,
                "status": current_status,
                "message": (
                    f"Proiectul `{slug}` e deja `{current_status}`. "
                    "Folosește «Re-planifică» (cu force=true) ca să reiei planning-ul."
                ),
            }, 409)
            return

    # Description fallback: use the existing approved-tasks description so
    # users can re-open planning on a project they already proposed.
    if not description and existing_proj:
        description = (existing_proj.get("description") or "").strip()
    desc_err = validate_description(description) if description else "description required"
    if desc_err:
        self.send_json({"error": "invalid_description", "message": desc_err}, 400)
        return

    # Status observed inside the lock, kept for a possible rollback.
    previous: dict = {"status": None}

    # Set status=planning + ensure entry exists.
    def _mut(data: dict) -> dict:
        proj = _find_project(data, slug)
        if proj is None:
            proj = {
                "name": slug,
                "description": description,
                "status": "planning",
                "planning_session_id": None,
                "final_plan_path": None,
                "repo": None,
                "branch": None,
                "base_branch": None,
                "proposed_at": _now_iso(),
                "approved_at": None,
                "started_at": None,
                "pid": None,
            }
            data["projects"].append(proj)
        else:
            previous["status"] = proj.get("status")
            proj["status"] = "planning"
            if not proj.get("description"):
                proj["description"] = description
        _bump_version(data)
        return data

    try:
        _write_approved(_mut)
    except Exception as exc:
        log.exception("plan/start mutate failed")
        self.send_json({"error": str(exc)}, 500)
        return

    adapter, channel = self._planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        session, first_text = PlanningOrchestrator.start(
            slug=slug,
            description=description,
            channel_id=channel,
            adapter=adapter,
        )
    except Exception as exc:
        log.exception("PlanningOrchestrator.start failed for %s", slug)

        # Roll status back so the dashboard reflects reality. The prior
        # status is restored (a forced re-plan must not wipe "approved");
        # new entries and entries that were already planning go to
        # "pending". Written directly rather than via _set_status_locked:
        # that helper honours If-Match, and the version was just bumped.
        prior = previous["status"]
        restore = prior if (prior or "").lower() not in ("", "planning") else "pending"

        def _rollback(data: dict) -> dict:
            proj = _find_project(data, slug)
            if proj is not None and (proj.get("status") or "").lower() == "planning":
                proj["status"] = restore
                _bump_version(data)
            return data

        try:
            _write_approved(_rollback)
        except Exception:
            log.exception("plan/start rollback failed for %s", slug)
        self.send_json({"error": "planning_start_failed", "message": str(exc)}, 500)
        return

    # Stash planning_session_id on the entry.
    def _stash(data: dict) -> dict:
        proj = _find_project(data, slug)
        if proj is not None:
            proj["planning_session_id"] = session.planning_session_id
        return data

    try:
        _write_approved(_stash)
    except Exception:
        # Non-fatal: the session is running; only the back-reference is lost.
        log.warning("plan/start could not store session id for %s", slug, exc_info=True)

    self.send_json({
        "ok": True,
        "slug": slug,
        "session_id": session.planning_session_id,
        "phase": session.phase,
        "message": first_text,
    })
# ── POST /api/projects/<slug>/plan/respond ────────────────────
def handle_plan_respond(self, slug: str):
    """POST /api/projects/<slug>/plan/respond — forward a user message to the session.

    Responses: 200 with the reply, 400 for an invalid slug or empty
    message, 404 ``no_active_session``, 500 when the orchestrator raises.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    body = parse_json_body(self)
    if body is None:
        return
    user_message = (body.get("message") or "").strip()
    if not user_message:
        self.send_json({"error": "message required"}, 400)
        return

    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        session, reply, phase_ready = PlanningOrchestrator.respond(
            adapter=adapter, channel_id=channel, message=user_message,
        )
    except Exception as exc:
        log.exception("plan/respond failed for %s", slug)
        self.send_json({"error": str(exc)}, 500)
        return

    if session is None:
        missing = {
            "error": "no_active_session",
            "message": "Nu există o sesiune activă pentru acest slug.",
        }
        self.send_json(missing, 404)
        return

    answer = {"ok": True, "slug": slug}
    answer["phase"] = session.phase
    answer["phase_ready"] = phase_ready
    answer["message"] = reply
    self.send_json(answer)
# ── GET /api/projects/<slug>/plan/state ───────────────────────
def handle_plan_state(self, slug: str):
    """GET /api/projects/<slug>/plan/state — report the planning session state.

    The dashboard-owned session is looked up first; when there is none,
    sessions of other adapters are scanned and the most recently updated
    entry for the slug is used, so chats started via discord/telegram are
    surfaced too.

    Responses: 200 with the state (``status`` is ``expired`` when no
    session exists), 400 for an invalid slug, 500 when the planning module
    cannot be imported.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._planning_key(slug)
    try:
        from src.planning_session import get_planning_state  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return

    state = get_planning_state(adapter, channel)
    if state is None:
        # Cross-adapter fallback. The newest entry is chosen — the same rule
        # _resolve_planning_key applies for respond/advance/finalize — so the
        # state shown belongs to the session those endpoints act on.
        try:
            from src.planning_session import _load_planning_state  # type: ignore
            candidates = [
                entry for entry in _load_planning_state().values()
                if (entry.get("slug") or "").lower() == slug.lower()
            ]
            if candidates:
                state = max(candidates, key=lambda entry: entry.get("updated_at") or "")
        except Exception:
            log.debug("plan/state fallback lookup failed for %s", slug, exc_info=True)

    if state is None:
        self.send_json({"slug": slug, "status": "expired", "phase": None, "pending_response": False})
        return

    # Heuristic: phase=='__complete__' means the pipeline finished.
    phase = state.get("phase")
    completed = phase == "__complete__"
    self.send_json({
        "slug": slug,
        "status": "complete" if completed else "active",
        "phase": phase,
        "phases_completed": state.get("phases_completed") or [],
        "phases_planned": state.get("phases_planned") or [],
        "pending_response": False,
        "since": state.get("started_at"),
        "updated_at": state.get("updated_at"),
        "session_id": state.get("session_id"),
        "planning_session_id": state.get("planning_session_id"),
        "last_text_excerpt": state.get("last_text_excerpt") or "",
        "final_plan_path": state.get("final_plan_path"),
    })
# ── GET /api/projects/<slug>/plan/transcript ──────────────────
def handle_plan_transcript(self, slug: str):
    """GET /api/projects/<slug>/plan/transcript — return the planning transcript.

    Combines three sources: the stored planning state (phase, completed
    phases, latest excerpt), the chat history parsed from the Claude CLI
    session JSONL, and the final-plan markdown at
    ``WORKSPACE_DIR/<slug>/scripts/ralph/final-plan.md`` if it exists.
    Missing pieces come back empty rather than as an error.

    The dashboard-owned session is looked up first; otherwise the most
    recently updated session of any adapter for the slug is used.

    Responses: 200 with the transcript, 400 for an invalid slug, 500 when
    the planning module cannot be imported.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._planning_key(slug)
    try:
        from src.planning_session import get_planning_state, _load_planning_state  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return

    state = get_planning_state(adapter, channel)
    if state is None:
        # Cross-adapter fallback. The newest entry is chosen — the same rule
        # _resolve_planning_key applies for respond/advance/finalize — so the
        # transcript shown belongs to the session those endpoints act on.
        try:
            candidates = [
                entry for entry in _load_planning_state().values()
                if (entry.get("slug") or "").lower() == slug.lower()
            ]
            if candidates:
                state = max(candidates, key=lambda entry: entry.get("updated_at") or "")
        except Exception:
            log.debug("plan/transcript fallback lookup failed for %s", slug, exc_info=True)

    final_plan_text = ""
    final_plan_path = None
    try:
        fp = constants.WORKSPACE_DIR / slug / "scripts" / "ralph" / "final-plan.md"
        if fp.exists():
            final_plan_path = str(fp)
            final_plan_text = fp.read_text(encoding="utf-8", errors="replace")
    except OSError:
        # Unreadable plan file: report whatever was gathered so far.
        pass

    known = state or {}
    session_id = known.get("session_id") or ""
    history = _load_planning_history(slug, session_id)
    self.send_json({
        "slug": slug,
        "phase": known.get("phase"),
        "phases_completed": known.get("phases_completed") or [],
        "last_text_excerpt": known.get("last_text_excerpt") or "",
        "final_plan_path": final_plan_path,
        "final_plan": final_plan_text,
        "history": history,
    })
# ── POST /api/projects/<slug>/plan/finalize ───────────────────
def handle_plan_finalize(self, slug: str):
    """POST /api/projects/<slug>/plan/finalize — approve the project and close planning.

    The final-plan path comes from the session state when it has one,
    otherwise from ``PlanningOrchestrator.final_plan_path(slug)``. After
    the entry is marked ``approved`` the planning state is cleared.

    Responses: 200 on success, 400 invalid slug, 404 unknown project,
    500 on import or write failure.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_session import get_planning_state, clear_planning_state  # type: ignore
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return

    state = get_planning_state(adapter, channel)
    stored_path = state.get("final_plan_path") if state else None
    final_plan = stored_path if stored_path else str(PlanningOrchestrator.final_plan_path(slug))

    def _approve(data: dict) -> dict:
        entry = _find_project(data, slug)
        if entry is None:
            raise KeyError(slug)
        entry.update({
            "status": "approved",
            "approved_at": _now_iso(),
            "planning_session_id": None,
            "final_plan_path": final_plan,
        })
        _bump_version(data)
        return data

    try:
        _write_approved(_approve)
    except KeyError:
        self.send_json({"error": "not_found", "slug": slug}, 404)
        return
    except Exception as exc:
        log.exception("plan/finalize mutate failed")
        self.send_json({"error": str(exc)}, 500)
        return

    clear_planning_state(adapter, channel)
    result = {"ok": True, "slug": slug, "status": "approved"}
    result["final_plan_path"] = final_plan
    self.send_json(result)
# ── POST /api/projects/<slug>/plan/cancel ─────────────────────
def handle_plan_cancel_planning(self, slug: str):
    """POST /api/projects/<slug>/plan/cancel — cancel the planning session.

    The session is cancelled through the orchestrator; a project whose
    status is ``planning`` is then reverted to ``pending``. Projects in any
    other status are left untouched.

    Responses: 200 with ``cleared`` (the orchestrator's result) and
    ``status`` — the project's status after the call when it is known,
    otherwise ``pending``; 400 invalid slug; 500 when the orchestrator
    cannot be imported.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return
    cleared = PlanningOrchestrator.cancel(adapter, channel)

    # "pending" stays the reported default when the project is unknown.
    outcome = {"status": "pending"}

    # Revert status to pending if currently planning.
    def _mut(data: dict) -> dict:
        proj = _find_project(data, slug)
        if proj is not None:
            if (proj.get("status") or "").lower() == "planning":
                proj["status"] = "pending"
                proj["planning_session_id"] = None
                _bump_version(data)
            # Report what the entry really holds instead of assuming the
            # revert applied (e.g. an approved project stays approved).
            outcome["status"] = proj.get("status")
        return data

    try:
        _write_approved(_mut)
    except Exception:
        # Best-effort: the session is already cancelled; record the failure.
        log.exception("plan/cancel status revert failed for %s", slug)
    self.send_json({"ok": True, "slug": slug, "cleared": cleared, "status": outcome["status"]})
# ── POST /api/projects/<slug>/plan/advance ────────────────────
def handle_plan_advance(self, slug: str):
    """POST /api/projects/<slug>/plan/advance — move the session to its next phase.

    Responses: 200 with the new phase, 400 invalid slug,
    404 ``no_active_session``, 500 when the orchestrator raises.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        session, reply, completed = PlanningOrchestrator.advance(adapter, channel)
    except Exception as exc:
        log.exception("plan/advance failed for %s", slug)
        self.send_json({"error": str(exc)}, 500)
        return

    if session is None:
        self.send_json({"error": "no_active_session", "message": reply}, 404)
        return

    answer = {"ok": True, "slug": slug}
    answer["phase"] = session.phase
    answer["completed"] = completed
    answer["message"] = reply
    self.send_json(answer)