"""PlanningSession — Claude CLI wrapper for conversational planning phases. Per the Echo Core conversational planning agent plan (W2), this is intentionally a SEPARATE class from the chat session — NOT a `mode=string` parameter on `ClaudeSession`. The plan calls it "PlanningSession(ClaudeSession) ca SUBCLASĂ". Since `claude_session.py` exposes module-level functions (not a class) we implement PlanningSession as a sibling class that REUSES the shared subprocess helpers (`_run_claude`, `_safe_env`, `CLAUDE_BIN`, `SESSIONS_DIR`) but keeps: - its own state file (`sessions/planning.json`) - its own system prompt (`prompts/planning_agent.md`) - per-slug working directory (`~/workspace//`) - `--add-dir` flags for skills + gstack project artifacts - `--max-turns 20` default with retry on `error_max_turns` Spike findings (`tasks/spike-planning-findings.md`): - `claude -p '/skill'` → text serialization of AskUserQuestion. ✅ - `claude --resume -p ''` round-trip preserves context. ✅ - Complex prompts can blow turn budget → MUST handle `error_max_turns`. - Cost ~ $0.5–1.1/turn Opus 4.7 1M; Marius on subscription so ignore USD. Architectural decisions captured for the W2 commit message: 1. Separate class (not mode parameter) — clean separation, easy to remove planning entirely without touching chat session. 2. Fresh subprocess PER skill phase, NOT a single resumed session — phases coordinate via disk artifacts (gstack convention: `~/.gstack/projects//{user}-{branch}-{phase}-*.md`). 3. State per `(adapter, channel)` keyed string — same convention as `claude_session.active.json`. Re-resume on restart is supported via `claude --resume `. 
""" from __future__ import annotations import json import logging import os import shutil import subprocess import tempfile import threading import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Callable from src.claude_session import ( CLAUDE_BIN, PROJECT_ROOT, SESSIONS_DIR, _run_claude, _safe_env, ) logger = logging.getLogger(__name__) _invoke_log = logging.getLogger("echo-core.invoke") # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- PLANNING_STATE_FILE = SESSIONS_DIR / "planning.json" PROMPTS_DIR = PROJECT_ROOT / "prompts" PLANNING_PROMPT_FILE = PROMPTS_DIR / "planning_agent.md" # Roots scoped into each planning subprocess via --add-dir WORKSPACE_ROOT = Path("/home/moltbot/workspace") GSTACK_PROJECTS_ROOT = Path.home() / ".gstack" / "projects" SKILLS_ROOT = Path.home() / ".claude" / "skills" # Spike: prompts deep-tool-use can blow small budgets; 20 default with retry. DEFAULT_MAX_TURNS = 20 RETRY_MAX_TURNS = 30 # boost on `error_max_turns` DEFAULT_TIMEOUT = 600 # seconds — planning turns are slower than chat # Marker the planning agent emits when a phase is conceptually done. # Orchestrator scans for this to decide when to surface the "Continuă faza" # button. Convention pinned in `prompts/planning_agent.md`. 
PHASE_READY_MARKER = "PHASE_STATUS: ready_to_advance"
# Emitted by the agent while it waits on the user (not referenced in this module).
PHASE_NEEDS_INPUT_MARKER = "PHASE_STATUS: needs_input"

# ---------------------------------------------------------------------------
# Disk state — sessions/planning.json
# Schema (one entry per channel, written by PlanningSession._persist):
# {
#   "<adapter>:<channel_id>": {
#     "slug": "...",
#     "description": "...",
#     "phase": "/office-hours" | "/plan-ceo-review" | ...,
#     "phases_completed": ["/office-hours", ...],
#     "session_id": "<Claude CLI session id>",
#     "planning_session_id": "<local uuid4>",
#     "adapter": "...",
#     "channel_id": "...",
#     "started_at": "...",          # ISO-8601, UTC
#     "updated_at": "...",          # ISO-8601, UTC
#     "last_text_excerpt": "...",   # 500 char excerpt for debugging
#     "last_subtype": "success" | "error_max_turns" | ...,
#     "total_cost_usd": 0.0,        # accumulated over start/respond calls
#   }
# }
# ---------------------------------------------------------------------------

# Guards read-modify-write cycles on PLANNING_STATE_FILE within this process.
# Each save is atomic, but without this lock two channels persisting at the
# same time can both load the same snapshot, and the later save silently drops
# the other's update. It does not protect against other processes.
_STATE_LOCK = threading.RLock()


def _channel_key(adapter: str, channel_id: str) -> str:
    """Return the state-file key for a channel: ``"<adapter>:<channel_id>"``."""
    return f"{adapter}:{channel_id}"


def _result_cost(result: dict) -> float:
    """Return a run's ``total_cost_usd`` as a float (0.0 when absent or null)."""
    return float(result.get("total_cost_usd") or 0)


def _load_planning_state() -> dict:
    """Load all planning sessions from ``PLANNING_STATE_FILE``.

    Returns ``{}`` when the file is missing, empty, not UTF-8, not valid JSON,
    or holds something other than a JSON object. Corrupt content is logged
    because the next save overwrites it. Other ``OSError``s (e.g. permission
    denied) propagate, so a failed read can never lead the next save to wipe
    every channel's state.
    """
    try:
        text = PLANNING_STATE_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    except UnicodeDecodeError as exc:
        logger.warning(
            "Planning state %s is not valid UTF-8 (%s) — treating as empty.",
            PLANNING_STATE_FILE,
            exc,
        )
        return {}
    if not text.strip():
        return {}
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        logger.warning(
            "Planning state %s is not valid JSON (%s) — treating as empty.",
            PLANNING_STATE_FILE,
            exc,
        )
        return {}
    if not isinstance(data, dict):
        logger.warning(
            "Planning state %s holds a %s, expected an object — treating as empty.",
            PLANNING_STATE_FILE,
            type(data).__name__,
        )
        return {}
    return data


def _save_planning_state(data: dict) -> None:
    """Atomically replace ``PLANNING_STATE_FILE`` with *data* as JSON.

    Writes a temp file inside ``SESSIONS_DIR`` (the directory that holds the
    state file) and ``os.replace``s it over the target, so readers see either
    the old or the new content, never a partial write. The temp file is
    removed on any failure and the exception is re-raised.
    """
    SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(
        dir=SESSIONS_DIR, prefix=".planning_", suffix=".json"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write("\n")
        os.replace(tmp_path, PLANNING_STATE_FILE)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------

def build_planning_system_prompt(slug: str, description: str, phase: str) -> str:
    """Render ``prompts/planning_agent.md`` with phase-specific values.

    The ``{slug}``, ``{description}`` and ``{phase}`` placeholders are
    substituted; every other brace in the template is left untouched.

    Returns:
        The rendered prompt, or ``""`` if the prompt file does not exist
        (skill-only mode).
    """
    if not PLANNING_PROMPT_FILE.exists():
        logger.warning(
            "Planning prompt missing: %s — falling back to skill-only mode.",
            PLANNING_PROMPT_FILE,
        )
        return ""
    import re  # local: only this function needs it

    template = PLANNING_PROMPT_FILE.read_text(encoding="utf-8")
    values = {"slug": slug, "description": description, "phase": phase}
    # One pass over the template — NOT `.format()` (markdown contains literal
    # `{}` in code blocks) and NOT chained `.replace()` (that would also
    # rewrite placeholders appearing inside an inserted value, e.g. a user
    # description containing "{phase}"). A callable replacement also keeps
    # backslashes in values literal.
    return re.sub(
        r"\{(slug|description|phase)\}",
        lambda match: values[match.group(1)],
        template,
    )


# ---------------------------------------------------------------------------
# PlanningSession class
# ---------------------------------------------------------------------------

class PlanningSession:
    """One Claude CLI subprocess scoped to a planning phase.

    Lifecycle:
      1. ``PlanningSession.start(slug, description, phase, channel_id, adapter)``
         — fresh subprocess; the first prompt is the skill invocation.
      2. ``session.respond(message)`` — ``claude --resume <session_id>`` per
         user reply; returns the response text.
      3. ``session.is_phase_ready()`` — True when the last output contains
         ``PHASE_STATUS: ready_to_advance`` (the orchestrator then advances).
      4. State is persisted in ``sessions/planning.json`` so a restart can
         rebuild the session via ``PlanningSession.from_state``.
    """

    def __init__(
        self,
        slug: str,
        description: str,
        phase: str,
        channel_id: str,
        adapter: str = "echo",
        session_id: str | None = None,
        planning_session_id: str | None = None,
    ):
        self.slug = slug
        self.description = description
        self.phase = phase
        self.channel_id = channel_id
        self.adapter = adapter
        # Claude CLI session id used for `--resume`; None until a run reports one.
        self.session_id = session_id
        # Local id for this planning run, independent of Claude's session id.
        self.planning_session_id = planning_session_id or str(uuid.uuid4())
        self._last_response: str = ""
        self._last_subtype: str = ""
        self._last_is_error: bool = False

    # -- working directory & --add-dir scoping ------------------------------

    @property
    def cwd(self) -> Path:
        """Working directory for the subprocess.

        ``WORKSPACE_ROOT / <slug>`` if that directory exists; otherwise the
        Echo Core repo root (test mode / pre-clone scenarios).
        """
        # NOTE(review): an absolute slug, or one containing "..", would escape
        # WORKSPACE_ROOT here — assumes callers pass a sanitized slug.
        target = WORKSPACE_ROOT / self.slug
        if target.is_dir():
            return target
        return PROJECT_ROOT

    def _add_dirs(self) -> list[str]:
        """Build ``--add-dir <path>`` flag pairs for directories that exist.

        Order: skills root, this slug's gstack project dir, the whole gstack
        projects root. Missing and duplicate paths are skipped.
        """
        candidates = [
            SKILLS_ROOT,
            GSTACK_PROJECTS_ROOT / self.slug,
            # Added whenever it exists (not only as a fallback), so gstack
            # artifacts stay reachable even before the slug dir is created.
            GSTACK_PROJECTS_ROOT,
        ]
        seen: set[str] = set()
        flags: list[str] = []
        for d in candidates:
            if d.exists() and str(d) not in seen:
                flags.extend(["--add-dir", str(d)])
                seen.add(str(d))
        return flags

    # -- command construction ----------------------------------------------

    def _build_cmd(
        self,
        prompt: str,
        *,
        resume: str | None,
        max_turns: int,
        with_system_prompt: bool,
    ) -> list[str]:
        """Assemble the argv for one ``claude -p`` call.

        ``--resume`` is added when *resume* is truthy; ``--system-prompt`` only
        when *with_system_prompt* is set and the rendered prompt is non-empty.
        """
        cmd = [CLAUDE_BIN, "-p", prompt]
        if resume:
            cmd += ["--resume", resume]
        cmd += [
            "--output-format", "stream-json",
            "--verbose",
            "--max-turns", str(max_turns),
        ]
        if with_system_prompt:
            sys_prompt = build_planning_system_prompt(
                self.slug, self.description, self.phase
            )
            if sys_prompt:
                cmd += ["--system-prompt", sys_prompt]
        cmd += self._add_dirs()
        cmd += ["--dangerously-skip-permissions"]
        return cmd

    # -- subprocess invocation ---------------------------------------------

    def _invoke(
        self,
        prompt: str,
        *,
        resume: str | None,
        timeout: int,
        max_turns: int,
        with_system_prompt: bool,
        on_text: Callable[[str], None] | None,
    ) -> dict:
        """Run one CLI call through ``_run_claude`` and log a summary line.

        Returns the result dict from ``_run_claude`` unchanged.
        """
        cmd = self._build_cmd(
            prompt,
            resume=resume,
            max_turns=max_turns,
            with_system_prompt=with_system_prompt,
        )
        _t0 = time.monotonic()
        result = _run_claude(cmd, timeout=timeout, on_text=on_text, cwd=self.cwd)
        _elapsed = int((time.monotonic() - _t0) * 1000)
        # `usage` and its token fields may be missing or null; coerce so the
        # `%d` placeholders never receive None.
        usage = result.get("usage") or {}
        _invoke_log.info(
            "planning slug=%s phase=%s adapter=%s channel=%s duration_ms=%d "
            "tokens_in=%d tokens_out=%d session=%s subtype=%s cost=%.4f",
            self.slug,
            self.phase,
            self.adapter,
            self.channel_id,
            _elapsed,
            usage.get("input_tokens") or 0,
            usage.get("output_tokens") or 0,
            (result.get("session_id") or "")[:8],
            result.get("subtype", ""),
            _result_cost(result),
        )
        return result

    def _apply_result(self, result: dict) -> None:
        """Copy the outcome of a CLI run onto this session.

        Adopts the run's ``session_id`` when one is reported and keeps the
        current one otherwise: a failed run never drops a resumable id, and
        if a resume reports a different id, the next turn resumes that one.
        """
        self.session_id = result.get("session_id") or self.session_id
        self._last_response = result.get("result") or ""
        self._last_subtype = result.get("subtype") or ""
        self._last_is_error = bool(result.get("is_error", False))

    # -- public API: start/resume ------------------------------------------

    @classmethod
    def start(
        cls,
        slug: str,
        description: str,
        phase: str,
        channel_id: str,
        adapter: str = "echo",
        timeout: int = DEFAULT_TIMEOUT,
        on_text: Callable[[str], None] | None = None,
    ) -> "PlanningSession":
        """Kick off a new phase subprocess; the first prompt is the skill call.

        Returns a ``PlanningSession`` with ``session_id`` and the last
        response set. The session is persisted in ``sessions/planning.json``
        under ``(adapter, channel_id)``.

        If the first run ends with ``error_max_turns`` AND reported no
        ``session_id`` (nothing to resume), it is re-run once from scratch
        with ``RETRY_MAX_TURNS``. ``on_text`` is passed to both runs. When a
        session id was reported, no retry happens here. The recorded cost
        includes every run made.
        """
        session = cls(
            slug=slug,
            description=description,
            phase=phase,
            channel_id=channel_id,
            adapter=adapter,
        )
        # Skill name + description gives the skill enough to start from.
        initial_prompt = f"{phase} {description}".strip()
        result = session._invoke(
            initial_prompt,
            resume=None,
            timeout=timeout,
            max_turns=DEFAULT_MAX_TURNS,
            with_system_prompt=True,
            on_text=on_text,
        )
        cost_usd = _result_cost(result)
        # Retry on error_max_turns — the spike found this happens with deep tool use.
        if result.get("subtype") == "error_max_turns" and not result.get("session_id"):
            logger.warning(
                "planning start hit error_max_turns for %s/%s — retrying with %d turns",
                slug, phase, RETRY_MAX_TURNS,
            )
            result = session._invoke(
                initial_prompt,
                resume=None,
                timeout=timeout,
                max_turns=RETRY_MAX_TURNS,
                with_system_prompt=True,
                on_text=on_text,
            )
            cost_usd += _result_cost(result)
        session._apply_result(result)
        session._persist(action="start", cost_usd=cost_usd)
        return session

    def respond(
        self,
        message: str,
        *,
        timeout: int = DEFAULT_TIMEOUT,
        on_text: Callable[[str], None] | None = None,
    ) -> str:
        """Send the user's reply to the running phase session via ``--resume``.

        The message is wrapped in ``[EXTERNAL CONTENT]`` markers. On
        ``error_max_turns`` the same message is re-sent once, resuming the
        same session, with ``RETRY_MAX_TURNS``. In-memory and persisted state
        are updated, and any new ``session_id`` the CLI reports is adopted.

        Returns:
            The response text (``""`` if the run produced none).

        Raises:
            RuntimeError: if there is no ``session_id`` to resume.
        """
        if not self.session_id:
            raise RuntimeError(
                "PlanningSession.respond called without an active session_id"
            )
        wrapped = f"[EXTERNAL CONTENT]\n{message}\n[END EXTERNAL CONTENT]"
        resume_id = self.session_id
        result = self._invoke(
            wrapped,
            resume=resume_id,
            timeout=timeout,
            max_turns=DEFAULT_MAX_TURNS,
            with_system_prompt=False,  # already in session
            on_text=on_text,
        )
        cost_usd = _result_cost(result)
        # Retry once on error_max_turns.
        if result.get("subtype") == "error_max_turns":
            logger.warning(
                "planning respond hit error_max_turns for %s/%s — retrying",
                self.slug, self.phase,
            )
            result = self._invoke(
                wrapped,
                resume=resume_id,
                timeout=timeout,
                max_turns=RETRY_MAX_TURNS,
                with_system_prompt=False,
                on_text=on_text,
            )
            cost_usd += _result_cost(result)
        self._apply_result(result)
        self._persist(action="respond", cost_usd=cost_usd)
        return self._last_response

    # -- introspection ------------------------------------------------------

    def is_phase_ready(self) -> bool:
        """True if the last response contained the ready-to-advance marker."""
        return PHASE_READY_MARKER in (self._last_response or "")

    @property
    def last_response(self) -> str:
        return self._last_response

    @property
    def last_subtype(self) -> str:
        return self._last_subtype

    # -- persistence --------------------------------------------------------

    def _persist(self, *, action: str, cost_usd: float = 0.0) -> None:
        """Write this session's entry into ``sessions/planning.json``.

        Keeps ``phases_completed`` and ``started_at`` from any existing entry
        and adds *cost_usd* to ``total_cost_usd``. *action* is accepted for
        call-site readability but is not stored.
        """
        with _STATE_LOCK:
            data = _load_planning_state()
            key = _channel_key(self.adapter, self.channel_id)
            existing = data.get(key, {})
            now = datetime.now(timezone.utc).isoformat()
            # Phase transitions are the orchestrator's job; this slot just
            # mirrors the current phase.
            phases_completed = existing.get("phases_completed", [])
            data[key] = {
                "slug": self.slug,
                "description": self.description,
                "phase": self.phase,
                "phases_completed": phases_completed,
                "session_id": self.session_id,
                "planning_session_id": self.planning_session_id,
                "adapter": self.adapter,
                "channel_id": self.channel_id,
                "started_at": existing.get("started_at", now),
                "updated_at": now,
                "last_text_excerpt": (self._last_response or "")[:500],
                "last_subtype": self._last_subtype,
                "total_cost_usd": (
                    float(existing.get("total_cost_usd") or 0.0)
                    + float(cost_usd or 0.0)
                ),
            }
            _save_planning_state(data)

    @classmethod
    def from_state(cls, adapter: str, channel_id: str) -> "PlanningSession | None":
        """Rebuild a session from ``sessions/planning.json`` (post-restart).

        Returns None when the channel has no entry or the entry has no
        ``session_id`` to resume.
        """
        data = _load_planning_state()
        entry = data.get(_channel_key(adapter, channel_id))
        if not isinstance(entry, dict) or not entry.get("session_id"):
            return None
        sess = cls(
            slug=entry["slug"],
            description=entry.get("description", ""),
            phase=entry["phase"],
            channel_id=channel_id,
            adapter=adapter,
            session_id=entry["session_id"],
            planning_session_id=entry.get("planning_session_id"),
        )
        sess._last_subtype = entry.get("last_subtype", "")
        # NOTE(review): only the 500-char excerpt is persisted, so after a
        # restart is_phase_ready() sees just that prefix of the last response.
        sess._last_response = entry.get("last_text_excerpt", "")
        return sess


# ---------------------------------------------------------------------------
# Module-level helpers consumed by router/orchestrator/adapters
# ---------------------------------------------------------------------------

def get_planning_state(adapter: str, channel_id: str) -> dict | None:
    """Return persisted planning state for a channel, or None."""
    return _load_planning_state().get(_channel_key(adapter, channel_id))


def is_in_planning(adapter: str, channel_id: str) -> bool:
    """True if any planning state is persisted for this channel.

    This does not require a ``session_id``; see ``PlanningSession.from_state``
    for the stricter "resumable" check.
    """
    return get_planning_state(adapter, channel_id) is not None


def clear_planning_state(adapter: str, channel_id: str) -> bool:
    """Drop persisted planning state. Returns True if anything was cleared."""
    with _STATE_LOCK:
        data = _load_planning_state()
        key = _channel_key(adapter, channel_id)
        if key not in data:
            return False
        del data[key]
        _save_planning_state(data)
        return True


def list_planning_sessions() -> dict:
    """Return all persisted planning sessions (for diagnostics)."""
    return _load_planning_state()