""" Claude CLI session manager for Echo-Core. Wraps the Claude Code CLI to provide conversation management. Each Discord channel maps to at most one active Claude session, tracked in sessions/active.json. """ import json import logging import os import shutil import subprocess import tempfile from datetime import datetime, timezone from pathlib import Path logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants & configuration # --------------------------------------------------------------------------- PROJECT_ROOT = Path(__file__).resolve().parent.parent PERSONALITY_DIR = PROJECT_ROOT / "personality" SESSIONS_DIR = PROJECT_ROOT / "sessions" _SESSIONS_FILE = SESSIONS_DIR / "active.json" VALID_MODELS = {"haiku", "sonnet", "opus"} DEFAULT_MODEL = "sonnet" DEFAULT_TIMEOUT = 120 # seconds CLAUDE_BIN = os.environ.get("CLAUDE_BIN", "claude") PERSONALITY_FILES = [ "IDENTITY.md", "SOUL.md", "USER.md", "AGENTS.md", "TOOLS.md", "HEARTBEAT.md", ] # Environment variables allowed through to the Claude subprocess _ENV_PASSTHROUGH = { "PATH", "HOME", "USER", "LANG", "LC_ALL", "TERM", "SHELL", "TMPDIR", "XDG_CONFIG_HOME", "XDG_DATA_HOME", "CLAUDE_CONFIG_DIR", } # --------------------------------------------------------------------------- # Module-level binary check # --------------------------------------------------------------------------- if not shutil.which(CLAUDE_BIN): logger.warning( "Claude CLI (%s) not found on PATH. " "Session functions will raise FileNotFoundError.", CLAUDE_BIN, ) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _safe_env() -> dict[str, str]: """Return a filtered copy of os.environ for subprocess calls.""" return {k: v for k, v in os.environ.items() if k in _ENV_PASSTHROUGH} def _load_sessions() -> dict: """Load sessions from active.json. Returns {} if missing or empty.""" try: text = _SESSIONS_FILE.read_text(encoding="utf-8") if not text.strip(): return {} return json.loads(text) except (FileNotFoundError, json.JSONDecodeError): return {} def _save_sessions(data: dict) -> None: """Atomically write sessions to active.json via tempfile + os.replace.""" SESSIONS_DIR.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp( dir=SESSIONS_DIR, prefix=".active_", suffix=".json" ) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write("\n") os.replace(tmp_path, _SESSIONS_FILE) except BaseException: # Clean up temp file on any failure try: os.unlink(tmp_path) except OSError: pass raise def _run_claude(cmd: list[str], timeout: int) -> dict: """Run a Claude CLI command and return the parsed JSON output.""" if not shutil.which(CLAUDE_BIN): raise FileNotFoundError( "Claude CLI not found. " "Install: https://docs.anthropic.com/en/docs/claude-code" ) try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, env=_safe_env(), cwd=PROJECT_ROOT, ) except subprocess.TimeoutExpired: raise TimeoutError(f"Claude CLI timed out after {timeout}s") if proc.returncode != 0: raise RuntimeError( f"Claude CLI error (exit {proc.returncode}): " f"{proc.stderr[:500]}" ) try: data = json.loads(proc.stdout) except json.JSONDecodeError as exc: raise RuntimeError(f"Failed to parse Claude CLI output: {exc}") return data # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def build_system_prompt() -> str: """Concatenate personality/*.md files into a single system prompt.""" if not PERSONALITY_DIR.is_dir(): raise FileNotFoundError( f"Personality directory not found: {PERSONALITY_DIR}" ) parts: list[str] = [] for filename in PERSONALITY_FILES: filepath = PERSONALITY_DIR / filename if filepath.is_file(): parts.append(filepath.read_text(encoding="utf-8")) return "\n\n---\n\n".join(parts) def start_session( channel_id: str, message: str, model: str = DEFAULT_MODEL, timeout: int = DEFAULT_TIMEOUT, ) -> tuple[str, str]: """Start a new Claude CLI session for a channel. Returns (response_text, session_id). """ if model not in VALID_MODELS: raise ValueError( f"Invalid model '{model}'. Must be one of: haiku, sonnet, opus" ) system_prompt = build_system_prompt() cmd = [ CLAUDE_BIN, "-p", message, "--model", model, "--output-format", "json", "--system-prompt", system_prompt, ] data = _run_claude(cmd, timeout) for field in ("result", "session_id"): if field not in data: raise RuntimeError( f"Claude CLI response missing required field: {field}" ) response_text = data["result"] session_id = data["session_id"] # Save session metadata now = datetime.now(timezone.utc).isoformat() sessions = _load_sessions() sessions[channel_id] = { "session_id": session_id, "model": model, "created_at": now, "last_message_at": now, "message_count": 1, } _save_sessions(sessions) return response_text, session_id def resume_session( session_id: str, message: str, timeout: int = DEFAULT_TIMEOUT, ) -> str: """Resume an existing Claude session by ID. Returns response text.""" cmd = [ CLAUDE_BIN, "-p", message, "--resume", session_id, "--output-format", "json", ] data = _run_claude(cmd, timeout) if "result" not in data: raise RuntimeError( "Claude CLI response missing required field: result" ) response_text = data["result"] # Update session metadata now = datetime.now(timezone.utc).isoformat() sessions = _load_sessions() for channel_id, session in sessions.items(): if session.get("session_id") == session_id: session["last_message_at"] = now session["message_count"] = session.get("message_count", 0) + 1 break _save_sessions(sessions) return response_text def send_message( channel_id: str, message: str, model: str = DEFAULT_MODEL, timeout: int = DEFAULT_TIMEOUT, ) -> str: """High-level convenience: auto start or resume based on channel state.""" session = get_active_session(channel_id) if session is not None: return resume_session(session["session_id"], message, timeout) response_text, _session_id = start_session( channel_id, message, model, timeout ) return response_text def clear_session(channel_id: str) -> bool: """Remove a channel's session entry. Returns True if removed.""" sessions = _load_sessions() if channel_id not in sessions: return False del sessions[channel_id] _save_sessions(sessions) return True def get_active_session(channel_id: str) -> dict | None: """Return session metadata for a channel, or None.""" sessions = _load_sessions() return sessions.get(channel_id) def list_sessions() -> dict: """Return all channel→session mappings from active.json.""" return _load_sessions()