_resolve_planning_key searches all active sessions by slug regardless of adapter, so respond/finalize/cancel/advance work even when planning was initiated from Discord or Telegram. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1044 lines
40 KiB
Python
1044 lines
40 KiB
Python
"""Unified project endpoints for the new dashboard (Task #4 backend).
|
|
|
|
Replaces the old separate /api/workspace + /api/ralph payloads with a single
|
|
/api/projects view that merges:
|
|
|
|
- workspace dirs (`~/workspace/<slug>/`)
|
|
- approved-tasks.json entries
|
|
- per-project Ralph PRD (`scripts/ralph/prd.json`)
|
|
|
|
Endpoints exposed via the mixin:
|
|
|
|
GET /api/projects — full list + version
|
|
GET /api/projects/signature — change-detection hash for SSE
|
|
GET /api/projects/stream — SSE stream of changes
|
|
GET /api/projects/<slug>/plan/state
|
|
GET /api/projects/<slug>/plan/transcript
|
|
|
|
POST /api/projects/propose
|
|
POST /api/projects/approve (If-Match: <version>)
|
|
POST /api/projects/unapprove (If-Match: <version>)
|
|
POST /api/projects/cancel (If-Match: <version>)
|
|
POST /api/projects/<slug>/plan/start
|
|
POST /api/projects/<slug>/plan/respond
|
|
POST /api/projects/<slug>/plan/finalize
|
|
POST /api/projects/<slug>/plan/cancel
|
|
POST /api/projects/<slug>/plan/advance
|
|
|
|
Status derivation (one of these strings — see `_derive_status`):
|
|
|
|
running-ralph — has PID, /proc/<pid>/cmdline contains ralph.sh
|
|
running-manual — has PID, alive, but not ralph.sh
|
|
planning — approved-tasks.json entry has status=planning
|
|
approved — entry has status=approved (waiting for night-execute)
|
|
pending — entry has status=pending
|
|
blocked — prd has stories, all incomplete are blocked
|
|
failed — prd has stories, last run had failures
|
|
complete — prd has stories, all passes==True
|
|
idle — workspace dir exists but no signal
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
import constants
|
|
|
|
from handlers._validators import parse_json_body, validate_description, validate_slug
|
|
|
|
# Make `from src.x import ...` work — same trick youtube.py uses.
# BASE_DIR is put at the front of sys.path once; the membership test keeps
# repeated imports of this module from inserting it again.
if str(constants.BASE_DIR) not in sys.path:
    sys.path.insert(0, str(constants.BASE_DIR))

# Module-level logger used by every helper and handler in this file.
log = logging.getLogger(__name__)

# Location of the approved-tasks store that _read_approved/_write_approved use.
APPROVED_TASKS_FILE = constants.BASE_DIR / "approved-tasks.json"
|
|
|
|
|
|
class _StaleVersion(Exception):
    """Signal an If-Match version mismatch from inside a write mutator.

    `_set_status_locked` raises this from its mutator when the client's
    version differs from the stored one, then catches it to report "stale".
    """
|
|
|
|
# Module-level signature cache — only re-run `git status --porcelain` when the
# repo's `.git/index` mtime advances. Keeps SSE polling cheap.
#
# Keys (all written by _compute_signature):
#   git_mtime — the `.git/index` mtime the cached output belongs to
#   signature — the cached `git status --porcelain` stdout (not the final hash)
#   ts        — time.monotonic() reading taken when the entry was stored
_SIG_CACHE: dict = {"git_mtime": None, "signature": None, "ts": 0.0}
|
|
|
|
|
|
# ─── helpers ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (with +00:00 offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def _read_approved() -> dict:
    """Load approved-tasks.json through `src.jsonlock.read_locked`.

    Returns the parsed content, or an empty skeleton
    (`projects=[]`, `last_updated=None`, `version=0`) when the file is
    missing or cannot be read/decoded. Decode and OS errors are logged.
    """
    skeleton = {"projects": [], "last_updated": None, "version": 0}
    try:
        from src.jsonlock import read_locked  # type: ignore

        contents = read_locked(str(APPROVED_TASKS_FILE))
    except FileNotFoundError:
        # A missing store is normal before the first proposal; stay quiet.
        return skeleton
    except (json.JSONDecodeError, OSError) as exc:  # pragma: no cover
        log.error("approved-tasks.json read failed: %s", exc)
        return skeleton
    return contents
|
|
|
|
|
|
def _write_approved(mutator):
    """Apply `mutator` to approved-tasks.json via `src.jsonlock.write_locked`.

    The mutator receives the current content (replaced by `{}` if it is not a
    dict, and always carrying a "projects" key) and must return the dict to
    persist. `last_updated` is stamped after the mutator ran. Exceptions
    raised by the mutator propagate to the caller. Returns whatever
    `write_locked` returns.
    """
    from src.jsonlock import write_locked  # type: ignore

    def _apply(current: dict) -> dict:
        base = current if isinstance(current, dict) else {}
        base.setdefault("projects", [])
        updated = mutator(base)
        updated["last_updated"] = _now_iso()
        return updated

    return write_locked(str(APPROVED_TASKS_FILE), _apply)
|
|
|
|
|
|
def _find_project(data: dict, slug: str) -> dict | None:
|
|
for p in data.get("projects", []) or []:
|
|
if (p.get("name") or "").lower() == slug.lower():
|
|
return p
|
|
return None
|
|
|
|
|
|
def _read_prd(project_dir: Path) -> dict | None:
|
|
prd_path = project_dir / "scripts" / "ralph" / "prd.json"
|
|
if not prd_path.exists():
|
|
return None
|
|
try:
|
|
return json.loads(prd_path.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
return None
|
|
|
|
|
|
def _pid_alive_with_cmdline(pid: int | None) -> tuple[bool, str]:
|
|
"""(alive, cmdline) — cmdline is empty string if not readable."""
|
|
if not pid or not isinstance(pid, int):
|
|
return False, ""
|
|
try:
|
|
os.kill(pid, 0)
|
|
except (ProcessLookupError, PermissionError, OSError):
|
|
return False, ""
|
|
cmdline = ""
|
|
try:
|
|
with open(f"/proc/{pid}/cmdline", "rb") as f:
|
|
cmdline = f.read().decode("utf-8", errors="replace").replace("\x00", " ")
|
|
except (FileNotFoundError, PermissionError, OSError):
|
|
cmdline = ""
|
|
return True, cmdline
|
|
|
|
|
|
# ─── status derivation ────────────────────────────────────────────────
|
|
|
|
|
|
def _derive_status(slug, approved_entry, workspace_dir, prd_data) -> str:
    """Compute one of nine UI states (see module docstring).

    Precedence: a live process first, then the explicit approved-tasks
    status, then states derived from the PRD's user stories, else "idle".
    `slug` and `workspace_dir` are accepted but not consulted here.
    """
    # A live PID wins over every stored status.
    entry = approved_entry or {}
    alive, cmdline = _pid_alive_with_cmdline(entry.get("pid"))
    if alive:
        return "running-ralph" if "ralph.sh" in cmdline else "running-manual"

    # Explicit status set by the user beats anything inferred from the PRD.
    if approved_entry:
        explicit = (approved_entry.get("status") or "").lower()
        if explicit in ("planning", "approved", "pending", "failed"):
            return explicit
        if explicit in ("cancelled", "canceled"):
            return "idle"
        # "complete"/"completed" (and unknown values) fall through to the
        # PRD check below.

    # PRD-derived states (only for projects that ran Ralph).
    stories = (prd_data or {}).get("userStories") or []
    if not stories:
        return "idle"
    unfinished = [story for story in stories if not story.get("passes")]
    if not unfinished:
        return "complete"
    if all(story.get("blocked") for story in unfinished):
        return "blocked"
    if any(story.get("failed") for story in stories):
        return "failed"
    # Partial or no progress without an active PID: re-runnable.
    return "idle"
|
|
|
|
|
|
def _stories_progress(prd_data: dict | None) -> tuple[int, int]:
|
|
stories = (prd_data or {}).get("userStories") or []
|
|
total = len(stories)
|
|
done = sum(1 for s in stories if s.get("passes"))
|
|
return total, done
|
|
|
|
|
|
def _last_iter_mtime(project_dir: Path) -> float | None:
|
|
logs = project_dir / "scripts" / "ralph" / "logs"
|
|
if not logs.exists():
|
|
return None
|
|
try:
|
|
files = list(logs.glob("iteration-*.log"))
|
|
if not files:
|
|
return None
|
|
return max(f.stat().st_mtime for f in files)
|
|
except OSError:
|
|
return None
|
|
|
|
|
|
def _eta_seconds(prd_data, total, done, last_iter_mtime) -> int | None:
|
|
"""Crude ETA: 12 minutes per remaining story (matches RalphHandlers)."""
|
|
if total <= 0:
|
|
return None
|
|
remaining = total - done
|
|
if remaining <= 0:
|
|
return 0
|
|
return remaining * 12 * 60
|
|
|
|
|
|
def _last_message_preview(slug: str) -> str:
    """Return up to 200 characters of the newest planning excerpt for `slug`.

    Sessions from every adapter are considered, so chats started outside the
    dashboard show up too. "Newest" compares `updated_at` (falling back to
    `started_at`) as plain values; sessions without an excerpt are ignored.
    Returns "" when the planning module or its state cannot be loaded.
    """
    try:
        from src.planning_session import _load_planning_state  # type: ignore
    except Exception:  # pragma: no cover — defensive
        return ""
    try:
        sessions = _load_planning_state()
    except Exception:
        return ""

    latest_text = ""
    latest_stamp = ""
    for session in sessions.values():
        if (session.get("slug") or "").lower() != slug.lower():
            continue
        stamp = session.get("updated_at") or session.get("started_at") or ""
        text = session.get("last_text_excerpt") or ""
        if stamp >= latest_stamp and text:
            latest_text, latest_stamp = text, stamp
    return latest_text[:200]
|
|
|
|
|
|
# ─── version helpers ──────────────────────────────────────────────────
|
|
|
|
|
|
def _get_version_from(data: dict) -> int:
    """Return `data["version"]` as an int, or 0 if missing, falsy or invalid."""
    raw = data.get("version") or 0
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 0
|
|
|
|
|
|
def _bump_version(data: dict) -> dict:
    """Increment `data["version"]` in place and return the same dict."""
    next_version = _get_version_from(data) + 1
    data["version"] = next_version
    return data
|
|
|
|
|
|
# ─── signature for SSE change detection ───────────────────────────────
|
|
|
|
|
|
def _git_index_mtime() -> float | None:
    """Return the mtime of `<BASE_DIR>/.git/index`, or None if it cannot be stat-ed."""
    index_path = constants.BASE_DIR / ".git" / "index"
    try:
        info = index_path.stat()
    except OSError:
        return None
    return info.st_mtime
|
|
|
|
|
|
def _compute_signature() -> str:
    """Hash of (git porcelain status, workspace count, approved-tasks version).

    Per-project mtimes of `prd.json` and `.ralph.pid` are folded in as well.
    Only the `git status --porcelain` output is cached; it is reused while
    the `.git/index` mtime equals the cached one. Returns a SHA-256 hex digest.
    """

    def _mtime_or_zero(path):
        # Missing or unreadable files contribute 0 to the signature.
        try:
            return path.stat().st_mtime if path.exists() else 0
        except OSError:
            return 0

    started = time.monotonic()
    index_mtime = _git_index_mtime()
    cached_mtime = _SIG_CACHE.get("git_mtime")
    cached_output = _SIG_CACHE.get("signature")

    # Cheap stat pass over every visible workspace directory.
    marks: list[tuple] = []
    if constants.WORKSPACE_DIR.exists():
        for child in sorted(constants.WORKSPACE_DIR.iterdir()):
            if not child.is_dir() or child.name.startswith("."):
                continue
            ralph_dir = child / "scripts" / "ralph"
            marks.append(
                (
                    child.name,
                    _mtime_or_zero(ralph_dir / "prd.json"),
                    _mtime_or_zero(ralph_dir / ".ralph.pid"),
                )
            )
    ws_count = len(marks)

    version = _get_version_from(_read_approved())

    # Git porcelain — cached on .git/index mtime.
    if index_mtime == cached_mtime and cached_output is not None:
        porcelain = cached_output
    else:
        porcelain = ""
        try:
            proc = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=str(constants.BASE_DIR),
                capture_output=True, text=True, timeout=5,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
            pass
        else:
            if proc.returncode == 0:
                porcelain = proc.stdout
        _SIG_CACHE["git_mtime"] = index_mtime
        _SIG_CACHE["signature"] = porcelain
        _SIG_CACHE["ts"] = started

    digest = hashlib.sha256()
    digest.update(porcelain.encode("utf-8", errors="replace"))
    digest.update(str(ws_count).encode())
    digest.update(str(version).encode())
    for name, prd_mtime, pid_mtime in marks:
        digest.update(name.encode("utf-8"))
        digest.update(f"{prd_mtime:.3f}|{pid_mtime:.3f}".encode())
    return digest.hexdigest()
|
|
|
|
|
|
# ─── projects mixin ───────────────────────────────────────────────────
|
|
|
|
|
|
class ProjectsHandlers:
|
|
"""Mixin: /api/projects/* endpoints — unified workspace + Ralph + planning."""
|
|
|
|
# ── helpers (instance) ─────────────────────────────────────────
|
|
def _get_version(self) -> int:
    """Return the version currently stored in approved-tasks.json."""
    snapshot = _read_approved()
    return _get_version_from(snapshot)
|
|
|
|
def _increment_version(self, data: dict) -> dict:
    """Instance-level alias for the module helper `_bump_version`."""
    bumped = _bump_version(data)
    return bumped
|
|
|
|
def _derive_status(self, slug, approved_entry, workspace_entry, prd_data) -> str:
    """Instance-level alias for the module-level `_derive_status` helper."""
    status = _derive_status(slug, approved_entry, workspace_entry, prd_data)
    return status
|
|
|
|
def _build_unified_payload(self) -> dict:
    """Build the /api/projects payload.

    Merges workspace directories with approved-tasks entries: every visible
    workspace directory yields a record (joined with its entry by lowercase
    name), followed by entries that have no workspace directory yet. Returns
    the records sorted by slug together with version, fetch time and count.
    """
    approved = _read_approved()
    listed = approved.get("projects", []) or []
    by_slug: dict[str, dict] = {}
    for item in listed:
        by_slug[(item.get("name") or "").lower()] = item

    records: list[dict] = []
    seen: set[str] = set()

    # Workspace directories first, so projects without an entry still show.
    if constants.WORKSPACE_DIR.exists():
        for directory in sorted(constants.WORKSPACE_DIR.iterdir()):
            if not directory.is_dir() or directory.name.startswith("."):
                continue
            key = directory.name.lower()
            seen.add(key)
            records.append(
                self._project_record(
                    directory.name, directory, by_slug.get(key), _read_prd(directory)
                )
            )

    # Then entries that exist only in approved-tasks (no workspace yet).
    for item in listed:
        name = item.get("name") or ""
        if name and name.lower() not in seen:
            records.append(self._project_record(name, None, item, None))

    return {
        "projects": sorted(records, key=lambda rec: rec.get("slug") or ""),
        "version": _get_version_from(approved),
        "fetchedAt": _now_iso(),
        "count": len(records),
    }
|
|
|
|
def _project_record(self, slug, workspace_dir, approved_entry, prd_data) -> dict:
    """Assemble the JSON record for one project.

    `workspace_dir` and `prd_data` may be None (project known only from
    approved-tasks); `approved_entry` may be None (workspace without entry).
    """
    status = _derive_status(slug, approved_entry, workspace_dir, prd_data)
    total, done = _stories_progress(prd_data)
    newest_log = None if not workspace_dir else _last_iter_mtime(workspace_dir)
    eta = _eta_seconds(prd_data, total, done, newest_log)
    entry = approved_entry or {}

    record = {
        "slug": slug,
        "status": status,
        "name": slug,
        "description": entry.get("description") or "",
    }
    # Timestamps and PID are passed through unchanged from the entry.
    for field in ("proposed_at", "approved_at", "started_at", "pid"):
        record[field] = entry.get(field)
    record["stories_total"] = total
    record["stories_done"] = done
    record["eta_seconds"] = eta
    record["last_message_preview"] = _last_message_preview(slug)
    record["error_reason"] = entry.get("error") or ""
    record["has_workspace"] = workspace_dir is not None
    record["has_prd"] = prd_data is not None
    return record
|
|
|
|
# ── GET /api/projects ──────────────────────────────────────────
|
|
def handle_unified_status(self):
    """GET /api/projects — send the unified payload, or a 500 with the error text."""
    try:
        payload = self._build_unified_payload()
        self.send_json(payload)
    except Exception as exc:
        log.exception("handle_unified_status failed")
        self.send_json({"error": str(exc)}, 500)
|
|
|
|
# ── GET /api/projects/signature ────────────────────────────────
|
|
def handle_unified_signature(self):
    """GET /api/projects/signature — send the change hash plus stored version."""
    try:
        signature = _compute_signature()
        version = _get_version_from(_read_approved())
        reply = {
            "signature": signature,
            "version": version,
            "fetchedAt": _now_iso(),
        }
        self.send_json(reply)
    except Exception as exc:
        log.exception("handle_unified_signature failed")
        self.send_json({"error": str(exc)}, 500)
|
|
|
|
# ── GET /api/projects/stream (SSE) ────────────────────────────
|
|
def handle_projects_stream(self):
    """Stream `data: <payload>\\n\\n` whenever the signature changes.

    Explicit cookie check (the global POST middleware doesn't cover GET).
    Origin/Referer, when present, must have the same scheme, host and (if the
    allowlist entry names one) port as an allowed origin — a plain prefix
    test would also accept `http://localhost:8088.evil.com`.
    Keep-alive ping every 15s. Returns as soon as a write to the client
    fails, so a dead connection cannot leave the loop spinning.
    """

    def _reject(status: int, body: bytes) -> None:
        # Small JSON error response; the client may already be gone.
        try:
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            pass

    def _same_origin(candidate: str, allowed_entry: str) -> bool:
        # Compare parsed components instead of string prefixes.
        try:
            got, want = urlparse(candidate), urlparse(allowed_entry)
            got_port, want_port = got.port, want.port
        except ValueError:
            # Raised for malformed or out-of-range ports.
            return False
        if not want.scheme or not want.hostname:
            return False
        if got.scheme != want.scheme or got.hostname != want.hostname:
            return False
        return want_port is None or got_port == want_port

    def _emit(chunk: bytes) -> None:
        self.wfile.write(b"data: " + chunk + b"\n\n")
        self.wfile.flush()

    # Auth: require cookie even though this is a GET.
    if hasattr(self, "_check_dashboard_cookie") and not self._check_dashboard_cookie():
        _reject(401, b'{"error":"Unauthorized"}')
        return

    # CSRF/Origin allowlist (skipped if no Origin/Referer — curl etc).
    origin = self.headers.get("Origin", "") or ""
    referer = self.headers.get("Referer", "") or ""
    allowed = ["http://127.0.0.1:8088", "http://localhost:8088"]
    dashboard_host = os.environ.get("DASHBOARD_HOST", "").strip()
    if dashboard_host:
        allowed.append(dashboard_host)
    check = origin or referer
    if check and not any(_same_origin(check, entry) for entry in allowed):
        _reject(403, b'{"error":"CSRF"}')
        return

    try:
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("X-Accel-Buffering", "no")
        self.send_header("Connection", "keep-alive")
        self.end_headers()
    except (BrokenPipeError, ConnectionResetError):
        return

    last_sig: str | None = None
    last_ping = time.monotonic()

    # Initial snapshot
    try:
        payload = self._build_unified_payload()
        last_sig = _compute_signature()
        _emit(json.dumps(payload).encode("utf-8"))
    except (BrokenPipeError, ConnectionResetError):
        return
    except Exception as exc:
        try:
            _emit(json.dumps({"error": str(exc)}).encode("utf-8"))
        except Exception:
            pass
        return

    while True:
        time.sleep(2)
        # Step 1: decide what (if anything) to send. Failures here are
        # best-effort: one bad iteration shouldn't kill the stream.
        try:
            sig = _compute_signature()
            now = time.monotonic()
            frame = None
            if sig != last_sig:
                frame = json.dumps(self._build_unified_payload()).encode("utf-8")
            elif now - last_ping >= 15:
                frame = b'{"type":"ping"}'
        except Exception:
            continue
        if frame is None:
            continue
        # Step 2: send it. Any write failure means the client is gone.
        try:
            _emit(frame)
        except (OSError, ValueError):
            # OSError covers BrokenPipe/ConnectionReset/aborted/timeouts;
            # ValueError is raised when the stream is already closed.
            return
        last_sig = sig
        last_ping = now
|
|
|
|
# ── POST /api/projects/propose ─────────────────────────────────
|
|
def handle_propose(self):
    """POST /api/projects/propose — add a new project with status "pending".

    Responds 400 on validation errors (per-field messages), 409 when the slug
    already exists (checked before and again inside the locked write), 500 on
    any other write failure, and 201 on success.
    """
    body = parse_json_body(self)
    if body is None:
        return  # parse_json_body already sent 400
    slug = (body.get("slug") or "").strip()
    description = (body.get("description") or "").strip()

    checks = (
        ("slug", validate_slug(slug)),
        ("description", validate_description(description)),
    )
    field_errors: dict[str, str] = {
        field: problem for field, problem in checks if problem
    }
    if field_errors:
        self.send_json({"error": "validation_failed", "fields": field_errors}, 400)
        return

    clash = _find_project(_read_approved(), slug)
    if clash:
        self.send_json(
            {"error": "duplicate_slug", "slug": slug, "status": clash.get("status")},
            409,
        )
        return

    def _append(data: dict) -> dict:
        # Re-check inside the lock — guard against races.
        if _find_project(data, slug) is not None:
            raise FileExistsError(slug)
        fresh = {
            "name": slug,
            "description": description,
            "status": "pending",
            "planning_session_id": None,
            "final_plan_path": None,
            "proposed_at": _now_iso(),
            "approved_at": None,
            "started_at": None,
            "pid": None,
        }
        data["projects"].append(fresh)
        _bump_version(data)
        return data

    try:
        _write_approved(_append)
    except FileExistsError:
        self.send_json({"error": "duplicate_slug", "slug": slug}, 409)
    except Exception as exc:
        log.exception("propose failed")
        self.send_json({"error": str(exc)}, 500)
    else:
        self.send_json({"slug": slug, "status": "pending"}, 201)
|
|
|
|
# ── helpers for approve/unapprove/cancel ──────────────────────
|
|
def _read_if_match(self) -> int | None:
|
|
raw = self.headers.get("If-Match", "") or self.headers.get("if-match", "")
|
|
if not raw:
|
|
return None
|
|
raw = raw.strip().strip('"')
|
|
try:
|
|
return int(raw)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
def _set_status_locked(self, slug: str, target_status: str, **extra) -> tuple[bool, dict]:
    """Mutate approved-tasks: set status + extra fields. Bumps version.

    When the request carries a parsable If-Match header, the stored version
    must equal it, otherwise nothing is written.

    Returns (ok, project_entry_or_error_dict). Error dicts carry
    "error" = "stale" (with "current_version"), "not_found" (with "slug"),
    or the text of any other exception.
    """
    expected = self._read_if_match()
    outcome: dict = {}

    def _apply(data: dict) -> dict:
        actual = _get_version_from(data)
        if expected is not None and actual != expected:
            outcome["error"] = "stale"
            outcome["current_version"] = actual
            raise _StaleVersion()
        entry = _find_project(data, slug)
        if entry is None:
            outcome["error"] = "not_found"
            raise KeyError(slug)
        entry["status"] = target_status
        # Every extra field is stored as given, including explicit None.
        entry.update(extra)
        outcome["project"] = entry
        _bump_version(data)
        return data

    try:
        _write_approved(_apply)
    except _StaleVersion:
        return False, {
            "error": "stale",
            "current_version": outcome.get("current_version", 0),
        }
    except KeyError:
        return False, {"error": "not_found", "slug": slug}
    except Exception as exc:
        log.exception("status mutate failed")
        return False, {"error": str(exc)}
    return True, outcome.get("project", {})
|
|
|
|
# ── POST /api/projects/approve ─────────────────────────────────
|
|
def handle_approve(self):
    """POST /api/projects/approve — set status "approved" and stamp approved_at.

    Responds 400 for an invalid slug, 409 for a stale If-Match version,
    404 for an unknown slug and 500 for any other failure.
    """
    body = parse_json_body(self)
    if body is None:
        return
    slug = (body.get("slug") or "").strip()
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    ok, payload = self._set_status_locked(slug, "approved", approved_at=_now_iso())
    if ok:
        self.send_json({"ok": True, "slug": slug, "status": "approved"})
        return
    http_code = {"stale": 409, "not_found": 404}.get(payload.get("error"), 500)
    self.send_json(payload, http_code)
|
|
|
|
# ── POST /api/projects/unapprove ──────────────────────────────
|
|
def handle_unapprove(self):
    """POST /api/projects/unapprove — set status "pending" and clear approved_at.

    Responds 400 for an invalid slug, 409 for a stale If-Match version,
    404 for an unknown slug and 500 for any other failure.
    """
    body = parse_json_body(self)
    if body is None:
        return
    slug = (body.get("slug") or "").strip()
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    ok, payload = self._set_status_locked(slug, "pending", approved_at=None)
    if ok:
        self.send_json({"ok": True, "slug": slug, "status": "pending"})
        return
    http_code = {"stale": 409, "not_found": 404}.get(payload.get("error"), 500)
    self.send_json(payload, http_code)
|
|
|
|
# ── POST /api/projects/cancel ─────────────────────────────────
|
|
def handle_cancel(self):
    """Cancel a project. Sets status=cancelled (matches router behaviour).

    Responds 400 for an invalid slug, 409 for a stale If-Match version,
    404 for an unknown slug and 500 for any other failure.
    """
    body = parse_json_body(self)
    if body is None:
        return
    slug = (body.get("slug") or "").strip()
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    ok, payload = self._set_status_locked(slug, "cancelled")
    if ok:
        self.send_json({"ok": True, "slug": slug, "status": "cancelled"})
        return
    http_code = {"stale": 409, "not_found": 404}.get(payload.get("error"), 500)
    self.send_json(payload, http_code)
|
|
|
|
# ── planning state helpers ────────────────────────────────────
|
|
@staticmethod
|
|
def _planning_key(slug: str) -> tuple[str, str]:
|
|
"""Dashboard planning sessions live in adapter='dashboard', channel=slug."""
|
|
return ("dashboard", slug)
|
|
|
|
@staticmethod
|
|
def _resolve_planning_key(slug: str) -> tuple[str, str]:
|
|
"""Find the active session's (adapter, channel) for slug, regardless of
|
|
where it was started. Falls back to ('dashboard', slug) if nothing
|
|
matches — preserving prior behavior for the no-session case.
|
|
|
|
Picks the most-recently-updated entry when multiple exist.
|
|
"""
|
|
try:
|
|
from src.planning_session import _load_planning_state # type: ignore
|
|
except Exception:
|
|
return ("dashboard", slug)
|
|
try:
|
|
all_state = _load_planning_state()
|
|
except Exception:
|
|
return ("dashboard", slug)
|
|
|
|
matches = [
|
|
entry for entry in all_state.values()
|
|
if (entry.get("slug") or "").lower() == slug.lower()
|
|
]
|
|
if not matches:
|
|
return ("dashboard", slug)
|
|
matches.sort(key=lambda e: e.get("updated_at") or "", reverse=True)
|
|
best = matches[0]
|
|
adapter = best.get("adapter") or "dashboard"
|
|
channel = best.get("channel_id") or slug
|
|
return (adapter, channel)
|
|
|
|
# ── POST /api/projects/<slug>/plan/start ──────────────────────
|
|
def handle_plan_start(self, slug: str):
    """POST /api/projects/<slug>/plan/start — open a dashboard planning session.

    Marks the project as "planning" (creating the entry if needed), then
    starts the orchestrator under adapter "dashboard". If the orchestrator
    fails, the status the project had before this request is restored
    ("pending" for a new entry or one that was already planning) instead of
    unconditionally forcing "pending". The rollback is a direct locked write:
    going through `_set_status_locked` would re-check the request's If-Match
    against a version this handler has just bumped and silently do nothing.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    body = parse_json_body(self)
    if body is None:
        return
    description = (body.get("description") or "").strip()

    # Description fallback: use the existing approved-tasks description so
    # users can re-open planning on a project they already proposed.
    if not description:
        existing = _find_project(_read_approved(), slug)
        if existing:
            description = (existing.get("description") or "").strip()
    desc_err = validate_description(description) if description else "description required"
    if desc_err:
        self.send_json({"error": "invalid_description", "message": desc_err}, 400)
        return

    # Status to restore if the orchestrator cannot start.
    prior = {"status": "pending"}

    # Set status=planning + ensure entry exists.
    def _mut(data: dict) -> dict:
        proj = _find_project(data, slug)
        if proj is None:
            proj = {
                "name": slug,
                "description": description,
                "status": "planning",
                "planning_session_id": None,
                "final_plan_path": None,
                "proposed_at": _now_iso(),
                "approved_at": None,
                "started_at": None,
                "pid": None,
            }
            data["projects"].append(proj)
        else:
            previous = proj.get("status")
            if previous and str(previous).lower() != "planning":
                prior["status"] = previous
            proj["status"] = "planning"
            if not proj.get("description"):
                proj["description"] = description
        _bump_version(data)
        return data

    try:
        _write_approved(_mut)
    except Exception as exc:
        log.exception("plan/start mutate failed")
        self.send_json({"error": str(exc)}, 500)
        return

    adapter, channel = self._planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        session, first_text = PlanningOrchestrator.start(
            slug=slug,
            description=description,
            channel_id=channel,
            adapter=adapter,
        )
    except Exception as exc:
        log.exception("PlanningOrchestrator.start failed for %s", slug)

        # Roll status back so the dashboard reflects reality.
        def _rollback(data: dict) -> dict:
            proj = _find_project(data, slug)
            if proj is not None and (proj.get("status") or "").lower() == "planning":
                proj["status"] = prior["status"]
                _bump_version(data)
            return data

        try:
            _write_approved(_rollback)
        except Exception:
            log.exception("plan/start rollback failed for %s", slug)
        self.send_json({"error": "planning_start_failed", "message": str(exc)}, 500)
        return

    # Stash planning_session_id on the entry.
    def _stash(data: dict) -> dict:
        proj = _find_project(data, slug)
        if proj is not None:
            proj["planning_session_id"] = session.planning_session_id
        return data

    try:
        _write_approved(_stash)
    except Exception:
        # Best-effort: the session itself is running, so still answer OK.
        log.exception("plan/start could not store planning_session_id for %s", slug)

    self.send_json({
        "ok": True,
        "slug": slug,
        "session_id": session.planning_session_id,
        "phase": session.phase,
        "message": first_text,
    })
|
|
|
|
# ── POST /api/projects/<slug>/plan/respond ────────────────────
|
|
def handle_plan_respond(self, slug: str):
    """POST /api/projects/<slug>/plan/respond — forward a user message.

    The target session is resolved across adapters. Responds 400 for an
    invalid slug or empty message, 500 when the orchestrator raises, and 404
    when it reports no session.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    body = parse_json_body(self)
    if body is None:
        return
    message = (body.get("message") or "").strip()
    if not message:
        self.send_json({"error": "message required"}, 400)
        return

    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        session, text, phase_ready = PlanningOrchestrator.respond(
            adapter=adapter, channel_id=channel, message=message,
        )
    except Exception as exc:
        log.exception("plan/respond failed for %s", slug)
        self.send_json({"error": str(exc)}, 500)
        return

    if session is None:
        missing = {"error": "no_active_session", "message": "Nu există o sesiune activă pentru acest slug."}
        self.send_json(missing, 404)
        return
    reply = {
        "ok": True,
        "slug": slug,
        "phase": session.phase,
        "phase_ready": phase_ready,
        "message": text,
    }
    self.send_json(reply)
|
|
|
|
# ── GET /api/projects/<slug>/plan/state ───────────────────────
|
|
def handle_plan_state(self, slug: str):
    """GET /api/projects/<slug>/plan/state — report the planning session state.

    The dashboard's own session is looked up first. If there is none, the
    most recently updated session for the slug from any adapter is used —
    the same choice `_resolve_planning_key` makes, so the state shown is the
    session that respond/advance/finalize will act on. Without any session
    the reply has status "expired".
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._planning_key(slug)
    try:
        from src.planning_session import get_planning_state  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return
    state = get_planning_state(adapter, channel)
    # Also try cross-adapter fallback so the dashboard surfaces sessions
    # started via discord/telegram.
    if state is None:
        try:
            from src.planning_session import _load_planning_state  # type: ignore
            candidates = [
                entry for entry in _load_planning_state().values()
                if (entry.get("slug") or "").lower() == slug.lower()
            ]
            if candidates:
                state = max(candidates, key=lambda entry: entry.get("updated_at") or "")
        except Exception:
            # Best-effort lookup: fall through to the "expired" reply.
            pass
    if state is None:
        self.send_json({"slug": slug, "status": "expired", "phase": None, "pending_response": False})
        return
    # Heuristic: phase=='__complete__' means the pipeline finished.
    phase = state.get("phase")
    completed = phase == "__complete__"
    self.send_json({
        "slug": slug,
        "status": "complete" if completed else "active",
        "phase": phase,
        "phases_completed": state.get("phases_completed") or [],
        "phases_planned": state.get("phases_planned") or [],
        "pending_response": False,
        "since": state.get("started_at"),
        "updated_at": state.get("updated_at"),
        "session_id": state.get("session_id"),
        "planning_session_id": state.get("planning_session_id"),
        "last_text_excerpt": state.get("last_text_excerpt") or "",
        "final_plan_path": state.get("final_plan_path"),
    })
|
|
|
|
# ── GET /api/projects/<slug>/plan/transcript ──────────────────
|
|
def handle_plan_transcript(self, slug: str):
    """Return planning transcript.

    The planning subprocess persists only the latest excerpt + metadata
    in `sessions/planning.json` (full transcripts live inside Claude CLI
    session state). We return what's available, plus the final-plan
    markdown if it has been written.

    The dashboard's own session is preferred; otherwise the most recently
    updated session for the slug from any adapter is used, matching the
    choice made by `_resolve_planning_key`.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._planning_key(slug)
    try:
        from src.planning_session import get_planning_state, _load_planning_state  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return
    state = get_planning_state(adapter, channel)
    if state is None:
        try:
            candidates = [
                entry for entry in _load_planning_state().values()
                if (entry.get("slug") or "").lower() == slug.lower()
            ]
            if candidates:
                state = max(candidates, key=lambda entry: entry.get("updated_at") or "")
        except Exception:
            # Best-effort lookup: reply with empty session fields instead.
            pass
    final_plan_text = ""
    final_plan_path = None
    try:
        fp = constants.WORKSPACE_DIR / slug / "scripts" / "ralph" / "final-plan.md"
        if fp.exists():
            final_plan_path = str(fp)
            final_plan_text = fp.read_text(encoding="utf-8", errors="replace")
    except OSError:
        pass
    session_info = state or {}
    self.send_json({
        "slug": slug,
        "phase": session_info.get("phase"),
        "phases_completed": session_info.get("phases_completed") or [],
        "last_text_excerpt": session_info.get("last_text_excerpt") or "",
        "final_plan_path": final_plan_path,
        "final_plan": final_plan_text,
        "history": [],  # reserved for future per-turn JSONL sidecar
    })
|
|
|
|
# ── POST /api/projects/<slug>/plan/finalize ───────────────────
|
|
def handle_plan_finalize(self, slug: str):
    """POST /api/projects/<slug>/plan/finalize — approve the planned project.

    Records the final plan path (taken from the session state when it has
    one, otherwise from `PlanningOrchestrator.final_plan_path`), sets status
    "approved", clears `planning_session_id`, then clears the planning state.
    Responds 404 when the project entry does not exist.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_session import get_planning_state, clear_planning_state  # type: ignore
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return

    state = get_planning_state(adapter, channel)
    recorded_path = state.get("final_plan_path") if state else None
    final_plan = recorded_path if recorded_path else str(PlanningOrchestrator.final_plan_path(slug))

    def _approve(data: dict) -> dict:
        entry = _find_project(data, slug)
        if entry is None:
            raise KeyError(slug)
        entry.update(
            status="approved",
            approved_at=_now_iso(),
            planning_session_id=None,
            final_plan_path=final_plan,
        )
        _bump_version(data)
        return data

    try:
        _write_approved(_approve)
    except KeyError:
        self.send_json({"error": "not_found", "slug": slug}, 404)
        return
    except Exception as exc:
        log.exception("plan/finalize mutate failed")
        self.send_json({"error": str(exc)}, 500)
        return

    clear_planning_state(adapter, channel)
    reply = {
        "ok": True,
        "slug": slug,
        "status": "approved",
        "final_plan_path": final_plan,
    }
    self.send_json(reply)
|
|
|
|
# ── POST /api/projects/<slug>/plan/cancel ─────────────────────
|
|
def handle_plan_cancel_planning(self, slug: str):
    """POST /api/projects/<slug>/plan/cancel — cancel the planning session.

    Cancels the session resolved across adapters, then reverts the project
    from "planning" to "pending" (other statuses are left untouched). A
    failure of that status write is logged but does not change the reply,
    which always carries `"status": "pending"`.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
    except Exception as exc:
        self.send_json({"error": str(exc)}, 500)
        return

    cleared = PlanningOrchestrator.cancel(adapter, channel)

    # Revert status to pending if currently planning.
    def _mut(data: dict) -> dict:
        proj = _find_project(data, slug)
        if proj is not None and (proj.get("status") or "").lower() == "planning":
            proj["status"] = "pending"
            proj["planning_session_id"] = None
            _bump_version(data)
        return data

    try:
        _write_approved(_mut)
    except Exception:
        # Still best-effort, but leave a trace: otherwise the project stays
        # in "planning" with no hint as to why.
        log.exception("plan/cancel status revert failed for %s", slug)
    self.send_json({"ok": True, "slug": slug, "cleared": cleared, "status": "pending"})
|
|
|
|
# ── POST /api/projects/<slug>/plan/advance ────────────────────
|
|
def handle_plan_advance(self, slug: str):
    """POST /api/projects/<slug>/plan/advance — move the session to its next phase.

    The session is resolved across adapters. Responds 400 for an invalid
    slug, 500 when the orchestrator raises, and 404 (with the orchestrator's
    text as message) when it reports no session.
    """
    if validate_slug(slug):
        self.send_json({"error": "invalid_slug"}, 400)
        return
    adapter, channel = self._resolve_planning_key(slug)
    try:
        from src.planning_orchestrator import PlanningOrchestrator  # type: ignore
        session, text, completed = PlanningOrchestrator.advance(adapter, channel)
    except Exception as exc:
        log.exception("plan/advance failed for %s", slug)
        self.send_json({"error": str(exc)}, 500)
        return
    if session is None:
        self.send_json({"error": "no_active_session", "message": text}, 404)
        return
    reply = {
        "ok": True,
        "slug": slug,
        "phase": session.phase,
        "completed": completed,
        "message": text,
    }
    self.send_json(reply)
|
|
|
|
|