Files
echo-core/src/planning_session.py
Marius Mutu 8432fe3150 feat(planning): full chat history + auto-advance phases
Three fixes that together restore the planning UX:

- Dashboard reopen showed only a 500-char truncated excerpt of the last
  assistant message. Backend now reads the Claude session JSONL directly
  and returns full per-turn history; frontend iterates and renders all
  bubbles, falling back to last_text_excerpt when the JSONL is missing.
- Phases never advanced because the agent ran /plan-* skills inline as
  tool calls and the marker protocol was loose. Tightened the planning
  prompt (mandatory PHASE_STATUS marker on the last line of every turn,
  ban on inline phase invocation), and the frontend now auto-calls
  /plan/advance when phase_ready=true.
- The phase strip never showed visual state because data-phase values
  ("office-hours") didn't match orchestrator phase names ("/office-hours").
  Added normalizePhase + cleanup of PHASE_STATUS markers from rendered
  bubbles.

Also bumps eco.py session-content truncation from 2k to 20k so /eco
session views aren't cut mid-response either.

Bumps last_text_excerpt fallback in planning_session.py from 500 to
50_000 so even when the JSONL is unavailable, the fallback bubble is no
longer cut off after 500 characters (only responses over 50K chars are truncated).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 07:47:10 +00:00

480 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PlanningSession — Claude CLI wrapper for conversational planning phases.

Per the Echo Core conversational planning agent plan (W2), this is intentionally
a SEPARATE class from the chat session — NOT a `mode=string` parameter on
`ClaudeSession`. The plan calls for "PlanningSession(ClaudeSession) as a
SUBCLASS". Since `claude_session.py` exposes module-level functions (not a
class), we implement PlanningSession as a sibling class that REUSES the shared
subprocess helpers (`_run_claude`, `_safe_env`, `CLAUDE_BIN`, `SESSIONS_DIR`)
but keeps:

- its own state file (`sessions/planning.json`)
- its own system prompt (`prompts/planning_agent.md`)
- a per-slug working directory (`~/workspace/<slug>/`)
- `--add-dir` flags for skills + gstack project artifacts
- `--max-turns 20` default with retry on `error_max_turns`

Spike findings (`tasks/spike-planning-findings.md`):

- `claude -p '/skill'` → text serialization of AskUserQuestion. ✅
- `claude --resume <id> -p '<reply>'` round-trip preserves context. ✅
- Complex prompts can blow the turn budget → MUST handle `error_max_turns`.
- Cost ~$0.5-1.1/turn on Opus 4.7 1M; Marius is on a subscription, so USD
  cost is informational only.

Architectural decisions captured for the W2 commit message:

1. Separate class (not a mode parameter) — clean separation, easy to remove
   planning entirely without touching the chat session.
2. Fresh subprocess PER skill phase, NOT a single resumed session — phases
   coordinate via disk artifacts (gstack convention:
   `~/.gstack/projects/<slug>/{user}-{branch}-{phase}-*.md`).
3. State per `(adapter, channel)` keyed string — same convention as
   `claude_session.active.json`. Re-resume on restart is supported via
   `claude --resume <stored_id>`.
"""
from __future__ import annotations

import json
import logging
import os
import re
import shutil
import subprocess
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable

from src.claude_session import (
    CLAUDE_BIN,
    PROJECT_ROOT,
    SESSIONS_DIR,
    _run_claude,
    _safe_env,
)
from src.jsonlock import read_locked, write_locked
logger = logging.getLogger(__name__)
# Per-invocation metrics log: slug, phase, duration, tokens, session, cost.
_invoke_log = logging.getLogger("echo-core.invoke")

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
PLANNING_STATE_FILE = SESSIONS_DIR / "planning.json"
PROMPTS_DIR = PROJECT_ROOT / "prompts"
PLANNING_PROMPT_FILE = PROMPTS_DIR / "planning_agent.md"

# Filesystem roots for each planning subprocess: `WORKSPACE_ROOT / <slug>` is
# the working directory when it exists; the skills root and the gstack
# project roots are passed via --add-dir when they exist.
# NOTE(review): WORKSPACE_ROOT is an absolute path pinned to /home/moltbot,
# unlike the Path.home()-based roots below — confirm this is intended when
# running as another user.
WORKSPACE_ROOT = Path("/home/moltbot/workspace")
GSTACK_PROJECTS_ROOT = Path.home() / ".gstack" / "projects"
SKILLS_ROOT = Path.home() / ".claude" / "skills"

# Spike: prompts with deep tool use can blow small budgets, so start at 20
# turns and retry once with a larger budget.
DEFAULT_MAX_TURNS = 20
RETRY_MAX_TURNS = 30  # boost on `error_max_turns`
DEFAULT_TIMEOUT = 600  # seconds — planning turns are slower than chat

# Marker the planning agent emits when a phase is conceptually done.
# Orchestrator scans for this to decide when to surface the "Continuă faza"
# ("Continue phase") button. Convention pinned in `prompts/planning_agent.md`.
PHASE_READY_MARKER = "PHASE_STATUS: ready_to_advance"
# Counterpart marker; presumably emitted while the agent waits on the user.
PHASE_NEEDS_INPUT_MARKER = "PHASE_STATUS: needs_input"

# ---------------------------------------------------------------------------
# Disk state — sessions/planning.json
# Schema (as written by PlanningSession._persist):
# {
#   "<adapter>:<channel_id>": {
#     "slug": "...",
#     "description": "...",
#     "phase": "/office-hours" | "/plan-ceo-review" | ...,
#     "phases_completed": ["/office-hours", ...],  # carried over as-is by _persist
#     "session_id": "<claude session uuid>" | null,
#     "planning_session_id": "<echo internal uuid>",
#     "adapter": "...",
#     "channel_id": "...",
#     "started_at": "...",  # ISO-8601 UTC, kept from the first write
#     "updated_at": "...",  # ISO-8601 UTC, refreshed on every write
#     "last_text_excerpt": "...",  # first ≤50K chars of the last response; full transcript lives in Claude's session JSONL
#     "last_subtype": "success" | "error_max_turns" | ...,
#     "total_cost_usd": 0.0  # running sum of reported per-call costs
#   }
# }
# ---------------------------------------------------------------------------
def _channel_key(adapter: str, channel_id: str) -> str:
return f"{adapter}:{channel_id}"
def _load_planning_state() -> dict:
    """Read ``sessions/planning.json`` through ``read_locked`` (shared flock).

    A missing file or unparsable JSON yields an empty dict instead of an
    error, so callers treat "no file" and "no sessions" identically.
    """
    state_path = str(PLANNING_STATE_FILE)
    try:
        loaded = read_locked(state_path)
    except (FileNotFoundError, json.JSONDecodeError):
        loaded = {}
    return loaded
def _save_planning_state(data: dict) -> None:
    """Write ``data`` as the complete contents of ``sessions/planning.json``.

    Creates ``SESSIONS_DIR`` if needed, then delegates to ``write_locked``
    (exclusive flock + atomic replace, per the jsonlock helper). The callback
    ignores whatever is currently on disk and returns ``data``, so this is a
    whole-file replace.

    NOTE(review): the lock covers only this write. Callers that do
    ``_load_planning_state()`` → modify → ``_save_planning_state()`` take the
    lock twice, so a concurrent writer's changes made in between are lost.
    """
    SESSIONS_DIR.mkdir(exist_ok=True, parents=True)

    def _replace_with_data(_current):
        return data

    write_locked(str(PLANNING_STATE_FILE), _replace_with_data)
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
def build_planning_system_prompt(slug: str, description: str, phase: str) -> str:
    """Render `prompts/planning_agent.md` with phase-specific values.

    The ``{slug}``, ``{description}`` and ``{phase}`` placeholders are
    substituted in a single pass. A value that itself contains placeholder
    text (e.g. a description mentioning ``{phase}``) is therefore inserted
    verbatim. It is not re-expanded by a later substitution.

    Returns an empty string if the prompt file does not exist (skill-only
    mode).
    """
    try:
        template = PLANNING_PROMPT_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        logger.warning(
            "Planning prompt missing: %s — falling back to skill-only mode.",
            PLANNING_PROMPT_FILE,
        )
        return ""
    values = {"slug": slug, "description": description, "phase": phase}
    # Targeted regex substitution (NOT str.format()) — the markdown contains
    # literal `{}` in code blocks which would explode `.format()`. A callable
    # replacement also keeps backslashes in values from being interpreted.
    return re.sub(
        r"\{(slug|description|phase)\}",
        lambda match: values[match.group(1)],
        template,
    )
# ---------------------------------------------------------------------------
# PlanningSession class
# ---------------------------------------------------------------------------
class PlanningSession:
    """One Claude CLI subprocess scoped to a planning phase.

    Lifecycle:
      1. ``PlanningSession.start(slug, description, phase, channel_id, adapter)``
         — fresh subprocess; the first prompt is the skill invocation.
      2. ``session.respond(message)`` — ``claude --resume <session_id>`` per
         user reply; returns the response text.
      3. ``session.is_phase_ready()`` — True when the last ``PHASE_STATUS:``
         marker in the output is ``PHASE_STATUS: ready_to_advance``
         (the orchestrator then advances).
      4. State is persisted in ``sessions/planning.json`` so a restart can
         rebuild the session via ``PlanningSession.from_state``.
    """

    def __init__(
        self,
        slug: str,
        description: str,
        phase: str,
        channel_id: str,
        adapter: str = "echo",
        session_id: str | None = None,
        planning_session_id: str | None = None,
    ):
        self.slug = slug
        self.description = description
        self.phase = phase
        self.channel_id = channel_id
        self.adapter = adapter
        # Claude CLI session id passed to `--resume`; None until a run returns one.
        self.session_id = session_id
        # Echo-internal id: generated once, persisted, and restored by from_state.
        self.planning_session_id = planning_session_id or str(uuid.uuid4())
        # Snapshot of the most recent CLI result.
        self._last_response: str = ""
        self._last_subtype: str = ""
        self._last_is_error: bool = False

    # -- working directory & --add-dir scoping ------------------------------
    @property
    def cwd(self) -> Path:
        """Working directory for the subprocess.

        Uses `WORKSPACE_ROOT/<slug>/` if it exists; otherwise falls back to
        the Echo Core repo root (test mode / pre-clone scenarios).
        """
        target = WORKSPACE_ROOT / self.slug
        if target.is_dir():
            return target
        return PROJECT_ROOT

    def _add_dirs(self) -> list[str]:
        """Build ``--add-dir`` arguments, skipping dirs that don't exist.

        Candidates, in order: the skills root, this slug's gstack project dir,
        and the gstack projects root. The root is added whenever it exists,
        so it also covers a missing slug dir. Each path is emitted at most once.
        """
        candidates = [
            SKILLS_ROOT,
            GSTACK_PROJECTS_ROOT / self.slug,
            GSTACK_PROJECTS_ROOT,  # fallback in case slug-specific dir missing
        ]
        seen: set[str] = set()
        flags: list[str] = []
        for d in candidates:
            if d.exists() and str(d) not in seen:
                flags.extend(["--add-dir", str(d)])
                seen.add(str(d))
        return flags

    # -- command construction ----------------------------------------------
    def _build_cmd(
        self,
        prompt: str,
        *,
        resume: str | None,
        max_turns: int,
        with_system_prompt: bool,
    ) -> list[str]:
        """Assemble the ``claude`` argv for one non-interactive (``-p``) run.

        ``--system-prompt`` is only added when requested AND the rendered
        planning prompt is non-empty; resumed turns skip it.
        """
        cmd = [CLAUDE_BIN, "-p", prompt]
        if resume:
            cmd += ["--resume", resume]
        cmd += [
            "--output-format", "stream-json",
            "--verbose",
            "--max-turns", str(max_turns),
        ]
        if with_system_prompt:
            sys_prompt = build_planning_system_prompt(
                self.slug, self.description, self.phase
            )
            if sys_prompt:
                cmd += ["--system-prompt", sys_prompt]
        cmd += self._add_dirs()
        cmd += ["--dangerously-skip-permissions"]
        return cmd

    # -- subprocess invocation ---------------------------------------------
    def _invoke(
        self,
        prompt: str,
        *,
        resume: str | None,
        timeout: int,
        max_turns: int,
        with_system_prompt: bool,
        on_text: Callable[[str], None] | None,
    ) -> dict:
        """Run one CLI invocation in ``self.cwd`` and log its metrics.

        Returns the result dict produced by ``_run_claude`` unchanged.
        """
        cmd = self._build_cmd(
            prompt,
            resume=resume,
            max_turns=max_turns,
            with_system_prompt=with_system_prompt,
        )
        _t0 = time.monotonic()
        result = _run_claude(cmd, timeout=timeout, on_text=on_text, cwd=self.cwd)
        _elapsed = int((time.monotonic() - _t0) * 1000)
        # `usage` or its counters may be missing/null; `%d` would fail on None.
        usage = result.get("usage") or {}
        _invoke_log.info(
            "planning slug=%s phase=%s adapter=%s channel=%s duration_ms=%d "
            "tokens_in=%d tokens_out=%d session=%s subtype=%s cost=%.4f",
            self.slug, self.phase, self.adapter, self.channel_id, _elapsed,
            usage.get("input_tokens") or 0,
            usage.get("output_tokens") or 0,
            (result.get("session_id") or "")[:8],
            result.get("subtype", ""),
            float(result.get("total_cost_usd") or 0),
        )
        return result

    def _record_result(self, result: dict) -> None:
        """Copy the fields of a CLI result dict onto this session.

        Keeps the current ``session_id`` when the result carries none. If the
        CLI reports a different id (e.g. after ``--resume``), that id is used
        for the next resume. A null ``result`` text is stored as "".
        """
        self.session_id = result.get("session_id") or self.session_id
        self._last_response = result.get("result") or ""
        self._last_subtype = result.get("subtype", "")
        self._last_is_error = bool(result.get("is_error", False))

    # -- public API: start/resume ------------------------------------------
    @classmethod
    def start(
        cls,
        slug: str,
        description: str,
        phase: str,
        channel_id: str,
        adapter: str = "echo",
        timeout: int = DEFAULT_TIMEOUT,
        on_text: Callable[[str], None] | None = None,
    ) -> "PlanningSession":
        """Kick off a new phase subprocess. The first prompt is the skill call.

        Returns a `PlanningSession` with `session_id` and `last_response` set,
        and persists state in `sessions/planning.json` keyed by
        `(adapter, channel_id)`.

        If the first run hits `error_max_turns` AND returned no session id, it
        is retried once from scratch with `RETRY_MAX_TURNS`. When a session
        was created, it is kept so that a later `respond` can resume it. The
        reported cost of every attempt is added to `total_cost_usd`.
        """
        session = cls(
            slug=slug,
            description=description,
            phase=phase,
            channel_id=channel_id,
            adapter=adapter,
        )
        # Compose initial prompt — skill name + description so the skill
        # has enough hook to start.
        initial_prompt = f"{phase} {description}".strip()
        result = session._invoke(
            initial_prompt,
            resume=None,
            timeout=timeout,
            max_turns=DEFAULT_MAX_TURNS,
            with_system_prompt=True,
            on_text=on_text,
        )
        cost_usd = float(result.get("total_cost_usd") or 0)
        # Retry on error_max_turns — spike found this happens with deep tool-use.
        if result.get("subtype") == "error_max_turns" and not result.get("session_id"):
            logger.warning(
                "planning start hit error_max_turns for %s/%s — retrying with %d turns",
                slug, phase, RETRY_MAX_TURNS,
            )
            result = session._invoke(
                initial_prompt,
                resume=None,
                timeout=timeout,
                max_turns=RETRY_MAX_TURNS,
                with_system_prompt=True,
                on_text=on_text,
            )
            cost_usd += float(result.get("total_cost_usd") or 0)
        session._record_result(result)
        session._persist(action="start", cost_usd=cost_usd)
        return session

    def respond(
        self,
        message: str,
        *,
        timeout: int = DEFAULT_TIMEOUT,
        on_text: Callable[[str], None] | None = None,
    ) -> str:
        """Send the user's reply to the running phase session via `--resume`.

        The reply is wrapped in ``[EXTERNAL CONTENT]`` delimiters. On
        ``error_max_turns`` the same wrapped reply is re-sent once with
        ``RETRY_MAX_TURNS``. The reported cost of both attempts is added to
        the persisted ``total_cost_usd``.

        Returns:
            The response text of the final attempt ("" if none).

        Raises:
            RuntimeError: if there is no ``session_id`` to resume.
        """
        if not self.session_id:
            raise RuntimeError(
                "PlanningSession.respond called without an active session_id"
            )
        wrapped = f"[EXTERNAL CONTENT]\n{message}\n[END EXTERNAL CONTENT]"
        result = self._invoke(
            wrapped,
            resume=self.session_id,
            timeout=timeout,
            max_turns=DEFAULT_MAX_TURNS,
            with_system_prompt=False,  # already part of the resumed session
            on_text=on_text,
        )
        cost_usd = float(result.get("total_cost_usd") or 0)
        # Retry once on error_max_turns
        if result.get("subtype") == "error_max_turns":
            logger.warning(
                "planning respond hit error_max_turns for %s/%s — retrying",
                self.slug, self.phase,
            )
            result = self._invoke(
                wrapped,
                resume=self.session_id,
                timeout=timeout,
                max_turns=RETRY_MAX_TURNS,
                with_system_prompt=False,
                on_text=on_text,
            )
            cost_usd += float(result.get("total_cost_usd") or 0)
        self._record_result(result)
        self._persist(action="respond", cost_usd=cost_usd)
        return self._last_response

    # -- introspection ------------------------------------------------------
    def is_phase_ready(self) -> bool:
        """True if the LAST phase-status marker in the response is ready_to_advance.

        Per the planning-prompt convention, the status marker closes each
        turn, so only its final occurrence counts. An earlier mention of
        ``PHASE_STATUS: ready_to_advance`` (e.g. the agent quoting the protocol)
        followed by a ``needs_input`` marker must not trigger an advance.
        """
        text = self._last_response or ""
        status_prefix = PHASE_READY_MARKER.partition(":")[0] + ":"  # "PHASE_STATUS:"
        last_marker = text.rfind(status_prefix)
        return last_marker != -1 and text.startswith(PHASE_READY_MARKER, last_marker)

    @property
    def last_response(self) -> str:
        """Text of the most recent response ("" before the first run)."""
        return self._last_response

    @property
    def last_subtype(self) -> str:
        """CLI result subtype of the most recent run (e.g. "success")."""
        return self._last_subtype

    # -- persistence --------------------------------------------------------
    def _persist(self, *, action: str, cost_usd: float = 0.0) -> None:
        """Rewrite this channel's slot in ``sessions/planning.json``.

        ``phases_completed`` and ``started_at`` are carried over from the
        existing entry. ``cost_usd`` is added to the stored running total.
        ``action`` is accepted for call-site readability but is not stored.

        NOTE(review): load and save take the file lock separately, so a
        concurrent writer between the two calls can be overwritten.
        """
        data = _load_planning_state()
        key = _channel_key(self.adapter, self.channel_id)
        existing = data.get(key, {})
        now = datetime.now(timezone.utc).isoformat()
        phases_completed = existing.get("phases_completed", [])
        # If this session changed phase, the orchestrator handles transition;
        # we just keep our own slot consistent with the current phase.
        entry = {
            "slug": self.slug,
            "description": self.description,
            "phase": self.phase,
            "phases_completed": phases_completed,
            "session_id": self.session_id,
            "planning_session_id": self.planning_session_id,
            "adapter": self.adapter,
            "channel_id": self.channel_id,
            "started_at": existing.get("started_at", now),
            "updated_at": now,
            # Keeps the HEAD of the response: for responses over 50K chars the
            # trailing PHASE_STATUS marker is not in the excerpt.
            "last_text_excerpt": (self._last_response or "")[:50000],
            "last_subtype": self._last_subtype,
            "total_cost_usd": (
                float(existing.get("total_cost_usd") or 0.0) + float(cost_usd or 0.0)
            ),
        }
        data[key] = entry
        _save_planning_state(data)

    @classmethod
    def from_state(cls, adapter: str, channel_id: str) -> "PlanningSession | None":
        """Reconstruct a session from `sessions/planning.json` (post-restart).

        Returns None when the channel has no entry or the entry has no
        ``session_id`` to resume. The restored ``last_response`` is the
        persisted excerpt, not necessarily the full text.
        """
        data = _load_planning_state()
        entry = data.get(_channel_key(adapter, channel_id))
        if not entry or not entry.get("session_id"):
            return None
        sess = cls(
            slug=entry["slug"],
            description=entry.get("description", ""),
            phase=entry["phase"],
            channel_id=channel_id,
            adapter=adapter,
            session_id=entry["session_id"],
            planning_session_id=entry.get("planning_session_id"),
        )
        sess._last_subtype = entry.get("last_subtype") or ""
        sess._last_response = entry.get("last_text_excerpt") or ""
        return sess
# ---------------------------------------------------------------------------
# Module-level helpers consumed by router/orchestrator/adapters
# ---------------------------------------------------------------------------
def get_planning_state(adapter: str, channel_id: str) -> dict | None:
    """Look up the persisted planning entry for ``adapter:channel_id``.

    Returns the stored entry dict, or None when the channel has no entry.
    """
    all_state = _load_planning_state()
    channel_slot = _channel_key(adapter, channel_id)
    return all_state.get(channel_slot)
def is_in_planning(adapter: str, channel_id: str) -> bool:
    """Report whether ``adapter:channel_id`` has a persisted planning entry."""
    stored_entry = get_planning_state(adapter, channel_id)
    return stored_entry is not None
def clear_planning_state(adapter: str, channel_id: str) -> bool:
    """Remove the persisted planning entry for a channel.

    Returns True when an entry existed and the file was rewritten without it.
    Returns False when there was nothing to remove; the file is not written.

    NOTE(review): load and save take the file lock separately, so a
    concurrent writer between the two calls can be overwritten.
    """
    state = _load_planning_state()
    channel_slot = _channel_key(adapter, channel_id)
    if channel_slot not in state:
        return False
    state.pop(channel_slot)
    _save_planning_state(state)
    return True
def list_planning_sessions() -> dict:
    """Snapshot every persisted planning session, keyed by ``adapter:channel_id``.

    Meant for diagnostics; yields ``{}`` when the state file is missing or
    unreadable JSON.
    """
    snapshot = _load_planning_state()
    return snapshot