Three fixes that together restore the planning UX:
- Dashboard reopen showed only a 500-char truncated excerpt of the last
assistant message. Backend now reads the Claude session JSONL directly
and returns full per-turn history; frontend iterates and renders all
bubbles, falling back to last_text_excerpt when the JSONL is missing.
- Phases never advanced because the agent ran /plan-* skills inline as
tool calls and the marker protocol was loose. Tightened the planning
prompt (mandatory PHASE_STATUS marker on the last line of every turn,
ban on inline phase invocation), and the frontend now auto-calls
/plan/advance when phase_ready=true.
- The phase strip never showed visual state because data-phase values
("office-hours") didn't match orchestrator phase names ("/office-hours").
Added normalizePhase + cleanup of PHASE_STATUS markers from rendered
bubbles.
Also bumps eco.py session-content truncation from 2k to 20k so /eco
session views aren't cut mid-response either.
Bumps last_text_excerpt fallback in planning_session.py from 500 to
50_000 so even when the JSONL is unavailable, the bubble isn't sliced
mid-word.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
480 lines
17 KiB
Python
480 lines
17 KiB
Python
"""PlanningSession — Claude CLI wrapper for conversational planning phases.
|
||
|
||
Per the Echo Core conversational planning agent plan (W2), this is intentionally
|
||
a SEPARATE class from the chat session — NOT a `mode=string` parameter on
|
||
`ClaudeSession`. The plan calls it "PlanningSession(ClaudeSession) ca SUBCLASĂ" ("as a SUBCLASS").
|
||
Since `claude_session.py` exposes module-level functions (not a class) we
|
||
implement PlanningSession as a sibling class that REUSES the shared subprocess
|
||
helpers (`_run_claude`, `_safe_env`, `CLAUDE_BIN`, `SESSIONS_DIR`) but keeps:
|
||
|
||
- its own state file (`sessions/planning.json`)
|
||
- its own system prompt (`prompts/planning_agent.md`)
|
||
- per-slug working directory (`~/workspace/<slug>/`)
|
||
- `--add-dir` flags for skills + gstack project artifacts
|
||
- `--max-turns 20` default with retry on `error_max_turns`
|
||
|
||
Spike findings (`tasks/spike-planning-findings.md`):
|
||
- `claude -p '/skill'` → text serialization of AskUserQuestion. ✅
|
||
- `claude --resume <id> -p '<reply>'` round-trip preserves context. ✅
|
||
- Complex prompts can blow turn budget → MUST handle `error_max_turns`.
|
||
- Cost ~ $0.5–1.1/turn Opus 4.7 1M; Marius on subscription so ignore USD.
|
||
|
||
Architectural decisions captured for the W2 commit message:
|
||
1. Separate class (not mode parameter) — clean separation, easy to remove
|
||
planning entirely without touching chat session.
|
||
2. Fresh subprocess PER skill phase, NOT a single resumed session — phases
|
||
coordinate via disk artifacts (gstack convention:
|
||
`~/.gstack/projects/<slug>/{user}-{branch}-{phase}-*.md`).
|
||
3. State per `(adapter, channel)` keyed string — same convention as
|
||
`claude_session.active.json`. Re-resume on restart is supported via
|
||
`claude --resume <stored_id>`.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Callable
|
||
|
||
from src.claude_session import (
|
||
CLAUDE_BIN,
|
||
PROJECT_ROOT,
|
||
SESSIONS_DIR,
|
||
_run_claude,
|
||
_safe_env,
|
||
)
|
||
from src.jsonlock import read_locked, write_locked
|
||
|
||
logger = logging.getLogger(__name__)
# Separate channel that receives one metrics line per CLI invocation
# (duration, token counts, subtype, cost).
_invoke_log = logging.getLogger("echo-core.invoke")

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# --- state & prompt files ---------------------------------------------------
PLANNING_STATE_FILE: Path = SESSIONS_DIR / "planning.json"
PROMPTS_DIR: Path = PROJECT_ROOT / "prompts"
PLANNING_PROMPT_FILE: Path = PROMPTS_DIR / "planning_agent.md"

# --- roots scoped into each planning subprocess via --add-dir ---------------
# NOTE(review): WORKSPACE_ROOT is a hard-coded host path; when
# <WORKSPACE_ROOT>/<slug> is missing the subprocess runs in PROJECT_ROOT.
WORKSPACE_ROOT: Path = Path("/home/moltbot/workspace")
GSTACK_PROJECTS_ROOT: Path = Path.home().joinpath(".gstack", "projects")
SKILLS_ROOT: Path = Path.home().joinpath(".claude", "skills")

# --- turn / time budgets ----------------------------------------------------
# Spike: deep tool use can blow small budgets, so runs start at 20 turns and
# are retried with a larger budget when the CLI reports `error_max_turns`.
DEFAULT_MAX_TURNS: int = 20
RETRY_MAX_TURNS: int = 30
DEFAULT_TIMEOUT: int = 600  # seconds — planning turns are slower than chat

# --- phase status markers ---------------------------------------------------
# Emitted by the planning agent when a phase is conceptually done (convention
# pinned in `prompts/planning_agent.md`). The orchestrator scans for the
# ready marker to decide when to surface the "Continuă faza" ("Continue
# phase") button.
PHASE_READY_MARKER: str = "PHASE_STATUS: ready_to_advance"
PHASE_NEEDS_INPUT_MARKER: str = "PHASE_STATUS: needs_input"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Disk state — sessions/planning.json
# Schema:
# {
#   "<adapter>:<channel_id>": {
#     "slug": "...",
#     "description": "...",
#     "phase": "/office-hours" | "/plan-ceo-review" | ...,
#     "phases_completed": ["/office-hours", ...],
#     "session_id": "<claude session uuid>",
#     "planning_session_id": "<echo internal uuid>",
#     "adapter": "...",
#     "channel_id": "...",
#     "started_at": "...",         # ISO-8601 UTC timestamp
#     "updated_at": "...",         # ISO-8601 UTC timestamp
#     "last_text_excerpt": "...",  # ≤50K-char fallback excerpt; the full transcript lives in Claude's session JSONL
#     "last_subtype": "success" | "error_max_turns" | ...,
#     "total_cost_usd": 0.0,       # cumulative USD cost of the persisted turns
#   }
# }
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _channel_key(adapter: str, channel_id: str) -> str:
|
||
return f"{adapter}:{channel_id}"
|
||
|
||
|
||
def _load_planning_state() -> dict:
    """Read ``sessions/planning.json`` under a shared flock.

    A missing file or a file that fails to parse as JSON is treated as "no
    planning sessions" and yields an empty dict.
    """
    state_path = str(PLANNING_STATE_FILE)
    try:
        loaded = read_locked(state_path)
    except (FileNotFoundError, json.JSONDecodeError):
        loaded = {}
    return loaded
|
||
|
||
|
||
def _save_planning_state(data: dict) -> None:
    """Overwrite ``sessions/planning.json`` with *data*.

    Writes under an exclusive flock + atomic replace (via ``write_locked``).
    Whatever is currently on disk is discarded, so callers must pass the
    complete new state.
    """
    def _replace_with_data(_current):
        # Ignore the on-disk contents; *data* is the full new state.
        return data

    SESSIONS_DIR.mkdir(exist_ok=True, parents=True)
    write_locked(str(PLANNING_STATE_FILE), _replace_with_data)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# System prompt
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def build_planning_system_prompt(slug: str, description: str, phase: str) -> str:
    """Render `prompts/planning_agent.md` with phase-specific values.

    The template's ``{slug}``, ``{description}`` and ``{phase}`` placeholders
    are substituted in a single pass. Placeholder-looking text inside a
    substituted value (e.g. a user description containing ``{phase}``) is
    therefore inserted verbatim instead of being expanded by a later
    substitution.

    Returns empty string if the prompt file does not exist (skill-only mode).
    """
    import re

    if not PLANNING_PROMPT_FILE.exists():
        logger.warning(
            "Planning prompt missing: %s — falling back to skill-only mode.",
            PLANNING_PROMPT_FILE,
        )
        return ""
    template = PLANNING_PROMPT_FILE.read_text(encoding="utf-8")
    values = {"slug": slug, "description": description, "phase": phase}
    # Targeted substitution (NOT format()) — markdown contains literal `{}`
    # in code blocks which would explode `.format()`. Only the three known
    # placeholders are touched. A callable replacement is used so backslashes
    # in the values are inserted literally rather than read as regex escapes.
    return re.sub(
        r"\{(slug|description|phase)\}",
        lambda match: values[match.group(1)],
        template,
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# PlanningSession class
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class PlanningSession:
    """One Claude CLI subprocess scoped to a planning phase.

    Lifecycle:

    1. ``PlanningSession.start(slug, description, phase, channel_id, adapter)``
       — fresh subprocess; first prompt is the skill invocation.
    2. ``session.respond(message)`` — ``claude --resume <session_id>`` per
       user reply; returns the response text.
    3. ``session.is_phase_ready()`` — True when the last response contains
       ``PHASE_STATUS: ready_to_advance`` (orchestrator advances).
    4. State persisted in ``sessions/planning.json`` so a restart can rebuild
       the session with ``PlanningSession.from_state``.
    """

    def __init__(
        self,
        slug: str,
        description: str,
        phase: str,
        channel_id: str,
        adapter: str = "echo",
        session_id: str | None = None,
        planning_session_id: str | None = None,
    ):
        self.slug = slug
        self.description = description
        self.phase = phase
        self.channel_id = channel_id
        self.adapter = adapter
        # Claude CLI session uuid; None until an invocation reports one.
        self.session_id = session_id
        # Echo-internal id: restored from disk, or freshly minted.
        self.planning_session_id = planning_session_id or str(uuid.uuid4())
        # Outcome of the most recent CLI invocation.
        self._last_response: str = ""
        self._last_subtype: str = ""
        self._last_is_error: bool = False

    # -- working directory & --add-dir scoping ------------------------------

    @property
    def cwd(self) -> Path:
        """Working directory for the subprocess.

        Uses ``WORKSPACE_ROOT/<slug>`` if it is an existing directory;
        otherwise falls back to the Echo Core repo root (test mode /
        pre-clone scenarios).
        """
        target = WORKSPACE_ROOT / self.slug
        if target.is_dir():
            return target
        return PROJECT_ROOT

    def _add_dirs(self) -> list[str]:
        """Build ``--add-dir`` arguments, skipping dirs that don't exist.

        The gstack root is listed after the slug-specific gstack dir as a
        fallback in case the slug dir is missing; duplicate paths are passed
        only once.
        """
        candidates = [
            SKILLS_ROOT,
            GSTACK_PROJECTS_ROOT / self.slug,
            GSTACK_PROJECTS_ROOT,  # fallback in case slug-specific dir missing
        ]
        seen: set[str] = set()
        flags: list[str] = []
        for d in candidates:
            if d.exists() and str(d) not in seen:
                flags.extend(["--add-dir", str(d)])
                seen.add(str(d))
        return flags

    # -- command construction ----------------------------------------------

    def _build_cmd(
        self,
        prompt: str,
        *,
        resume: str | None,
        max_turns: int,
        with_system_prompt: bool,
    ) -> list[str]:
        """Assemble the ``claude`` argv for one non-interactive (``-p``) run.

        ``resume`` adds ``--resume <id>``. ``with_system_prompt`` injects the
        rendered planning prompt, which is skipped when the template is
        missing. Output is requested as ``stream-json``, and the subprocess
        runs with ``--dangerously-skip-permissions``.
        """
        cmd = [CLAUDE_BIN, "-p", prompt]
        if resume:
            cmd += ["--resume", resume]
        cmd += [
            "--output-format", "stream-json",
            "--verbose",
            "--max-turns", str(max_turns),
        ]
        if with_system_prompt:
            sys_prompt = build_planning_system_prompt(
                self.slug, self.description, self.phase
            )
            if sys_prompt:
                cmd += ["--system-prompt", sys_prompt]
        cmd += self._add_dirs()
        cmd += ["--dangerously-skip-permissions"]
        return cmd

    # -- subprocess invocation ---------------------------------------------

    @staticmethod
    def _result_cost(result: dict) -> float:
        """USD cost reported by one CLI result (0.0 when missing or null)."""
        return float(result.get("total_cost_usd", 0) or 0)

    def _invoke(
        self,
        prompt: str,
        *,
        resume: str | None,
        timeout: int,
        max_turns: int,
        with_system_prompt: bool,
        on_text: Callable[[str], None] | None,
    ) -> dict:
        """Run one CLI invocation in ``self.cwd`` and log a metrics line.

        Returns the result dict produced by ``_run_claude`` unchanged.
        """
        cmd = self._build_cmd(
            prompt,
            resume=resume,
            max_turns=max_turns,
            with_system_prompt=with_system_prompt,
        )
        started = time.monotonic()
        result = _run_claude(cmd, timeout=timeout, on_text=on_text, cwd=self.cwd)
        elapsed_ms = int((time.monotonic() - started) * 1000)
        # `usage` and its counters may be absent or null; the `or` fallbacks
        # keep the `%d` placeholders below from receiving None.
        usage = result.get("usage") or {}
        _invoke_log.info(
            "planning slug=%s phase=%s adapter=%s channel=%s duration_ms=%d "
            "tokens_in=%d tokens_out=%d session=%s subtype=%s cost=%.4f",
            self.slug, self.phase, self.adapter, self.channel_id, elapsed_ms,
            usage.get("input_tokens") or 0,
            usage.get("output_tokens") or 0,
            (result.get("session_id") or "")[:8],
            result.get("subtype", ""),
            self._result_cost(result),
        )
        return result

    # -- public API: start/resume ------------------------------------------

    @classmethod
    def start(
        cls,
        slug: str,
        description: str,
        phase: str,
        channel_id: str,
        adapter: str = "echo",
        timeout: int = DEFAULT_TIMEOUT,
        on_text: Callable[[str], None] | None = None,
    ) -> "PlanningSession":
        """Kick off a new phase subprocess. First prompt is the skill call.

        Returns a ``PlanningSession`` with ``session_id`` and the last response
        set. State is persisted in ``sessions/planning.json`` keyed by
        ``(adapter, channel_id)``.

        If the first run hits ``error_max_turns`` without reporting a session
        id, the skill call is re-run from scratch with ``RETRY_MAX_TURNS``.
        The persisted cost includes every attempt, not just the last one.
        """
        session = cls(
            slug=slug,
            description=description,
            phase=phase,
            channel_id=channel_id,
            adapter=adapter,
        )
        # Compose initial prompt — skill name + description so the skill has
        # enough hook to start.
        initial_prompt = f"{phase} {description}".strip()

        result = session._invoke(
            initial_prompt,
            resume=None,
            timeout=timeout,
            max_turns=DEFAULT_MAX_TURNS,
            with_system_prompt=True,
            on_text=on_text,
        )
        cost_usd = cls._result_cost(result)
        # Retry on error_max_turns — spike found this happens with deep
        # tool-use. Only retried when no session id came back.
        # NOTE(review): presumably because a returned session can still be
        # continued via `respond` — confirm.
        if result.get("subtype") == "error_max_turns" and not result.get("session_id"):
            logger.warning(
                "planning start hit error_max_turns for %s/%s — retrying with %d turns",
                slug, phase, RETRY_MAX_TURNS,
            )
            result = session._invoke(
                initial_prompt,
                resume=None,
                timeout=timeout,
                max_turns=RETRY_MAX_TURNS,
                with_system_prompt=True,
                on_text=on_text,
            )
            # Both attempts were billed; keep the first attempt's cost too.
            cost_usd += cls._result_cost(result)

        session.session_id = result.get("session_id") or None
        session._last_response = result.get("result", "")
        session._last_subtype = result.get("subtype", "")
        session._last_is_error = bool(result.get("is_error", False))
        session._persist(action="start", cost_usd=cost_usd)
        return session

    def respond(
        self,
        message: str,
        *,
        timeout: int = DEFAULT_TIMEOUT,
        on_text: Callable[[str], None] | None = None,
    ) -> str:
        """Send the user's reply to the running phase session via ``--resume``.

        The message is wrapped in ``[EXTERNAL CONTENT]`` markers before being
        sent. On ``error_max_turns`` the same reply is re-sent once with
        ``RETRY_MAX_TURNS``; the persisted cost includes both attempts.

        Returns the response text. Updates persistent state.

        Raises:
            RuntimeError: if there is no ``session_id`` to resume.
        """
        if not self.session_id:
            raise RuntimeError(
                "PlanningSession.respond called without an active session_id"
            )
        wrapped = f"[EXTERNAL CONTENT]\n{message}\n[END EXTERNAL CONTENT]"
        result = self._invoke(
            wrapped,
            resume=self.session_id,
            timeout=timeout,
            max_turns=DEFAULT_MAX_TURNS,
            with_system_prompt=False,  # already in session
            on_text=on_text,
        )
        cost_usd = self._result_cost(result)
        # Retry once on error_max_turns
        if result.get("subtype") == "error_max_turns":
            logger.warning(
                "planning respond hit error_max_turns for %s/%s — retrying",
                self.slug, self.phase,
            )
            result = self._invoke(
                wrapped,
                resume=self.session_id,
                timeout=timeout,
                max_turns=RETRY_MAX_TURNS,
                with_system_prompt=False,
                on_text=on_text,
            )
            # Both attempts were billed; keep the first attempt's cost too.
            cost_usd += self._result_cost(result)

        self._last_response = result.get("result", "")
        self._last_subtype = result.get("subtype", "")
        self._last_is_error = bool(result.get("is_error", False))
        self._persist(action="respond", cost_usd=cost_usd)
        return self._last_response

    # -- introspection ------------------------------------------------------

    def is_phase_ready(self) -> bool:
        """True if the last response contained the ready-to-advance marker.

        This is a substring check anywhere in the response, not only on its
        last line.
        """
        return PHASE_READY_MARKER in (self._last_response or "")

    @property
    def last_response(self) -> str:
        """Text of the most recent response (or the persisted excerpt)."""
        return self._last_response

    @property
    def last_subtype(self) -> str:
        """CLI result subtype of the most recent invocation."""
        return self._last_subtype

    # -- persistence --------------------------------------------------------

    def _persist(self, *, action: str, cost_usd: float = 0.0) -> None:
        """Upsert this session's entry in ``sessions/planning.json``.

        The read-merge-write happens inside a single ``write_locked`` call.
        The entry is therefore merged against the file contents seen under
        the exclusive lock. A separate load-then-save would let a concurrent
        writer's changes (e.g. another channel persisting at the same moment)
        be overwritten by a stale snapshot.

        ``cost_usd`` is added to the entry's running ``total_cost_usd``.
        ``action`` is accepted for call-site readability but is not stored.
        """
        key = _channel_key(self.adapter, self.channel_id)
        now = datetime.now(timezone.utc).isoformat()

        def _merge(current):
            data = dict(current) if isinstance(current, dict) else {}
            existing = data.get(key) or {}
            # If this session changed phase, the orchestrator handles the
            # transition; we just keep our own slot consistent with the
            # current phase.
            data[key] = {
                "slug": self.slug,
                "description": self.description,
                "phase": self.phase,
                "phases_completed": existing.get("phases_completed", []),
                "session_id": self.session_id,
                "planning_session_id": self.planning_session_id,
                "adapter": self.adapter,
                "channel_id": self.channel_id,
                "started_at": existing.get("started_at", now),
                "updated_at": now,
                # Fallback only; the full transcript is in Claude's JSONL.
                "last_text_excerpt": (self._last_response or "")[:50000],
                "last_subtype": self._last_subtype,
                "total_cost_usd": (
                    float(existing.get("total_cost_usd") or 0.0)
                    + float(cost_usd or 0.0)
                ),
            }
            return data

        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        write_locked(str(PLANNING_STATE_FILE), _merge)

    @classmethod
    def from_state(cls, adapter: str, channel_id: str) -> "PlanningSession | None":
        """Reconstruct a session from `sessions/planning.json` (post-restart).

        Returns None when the channel has no entry or the entry has no
        ``session_id``. Only the last response excerpt and subtype are
        restored; ``_last_is_error`` starts out False.
        """
        data = _load_planning_state()
        entry = data.get(_channel_key(adapter, channel_id))
        if not entry or not entry.get("session_id"):
            return None
        sess = cls(
            slug=entry["slug"],
            description=entry.get("description", ""),
            phase=entry["phase"],
            channel_id=channel_id,
            adapter=adapter,
            session_id=entry["session_id"],
            planning_session_id=entry.get("planning_session_id"),
        )
        sess._last_subtype = entry.get("last_subtype", "")
        sess._last_response = entry.get("last_text_excerpt", "")
        return sess
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Module-level helpers consumed by router/orchestrator/adapters
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def get_planning_state(adapter: str, channel_id: str) -> dict | None:
    """Look up the persisted planning entry for one channel.

    Returns ``None`` when the channel has no planning session on disk.
    """
    all_sessions = _load_planning_state()
    channel_key = _channel_key(adapter, channel_id)
    return all_sessions.get(channel_key)
|
||
|
||
|
||
def is_in_planning(adapter: str, channel_id: str) -> bool:
    """Report whether this adapter/channel pair has a persisted planning session."""
    state = get_planning_state(adapter, channel_id)
    return state is not None
|
||
|
||
|
||
def clear_planning_state(adapter: str, channel_id: str) -> bool:
    """Drop persisted planning state for one channel.

    Returns True if an entry existed and was removed, False otherwise.

    The deletion runs inside a single ``write_locked`` call, so it applies to
    the file contents seen under the exclusive lock. Deleting from a snapshot
    loaded earlier and then saving that snapshot would silently discard any
    entry another channel wrote in between.
    """
    key = _channel_key(adapter, channel_id)
    # Cheap shared-lock read first, so the "nothing to clear" case never
    # takes the exclusive lock or rewrites the file.
    if key not in _load_planning_state():
        return False

    removed = False

    def _drop_entry(current):
        nonlocal removed
        data = dict(current) if isinstance(current, dict) else {}
        # Re-check under the exclusive lock: another writer may have cleared
        # the entry after the pre-check above.
        if key in data:
            del data[key]
            removed = True
        return data

    write_locked(str(PLANNING_STATE_FILE), _drop_entry)
    return removed
|
||
|
||
|
||
def list_planning_sessions() -> dict:
    """Snapshot every persisted planning session, keyed by ``"<adapter>:<channel_id>"``.

    Intended for diagnostics.
    """
    sessions = _load_planning_state()
    return sessions
|