feat(ralph): conversational planning agent (W2)

Echo Core devine planning agent: poartă o conversație multi-fază cu Marius
folosind skill-urile gstack (/office-hours → /plan-ceo-review →
/plan-eng-review → /plan-design-review opt) și produce final-plan.md în
~/workspace/<slug>/scripts/ralph/, gata să fie consumat de Ralph PRD
generator (W3) noaptea.

Decizii arhitecturale (din eng review + spike findings):
- PlanningSession ca clasă SEPARATĂ de chat-ul main (NU mode=string param)
  — separation explicit. claude_session.py rămâne strict pentru chat;
  planning trăiește în src/planning_session.py + src/planning_orchestrator.py.
  Inheritance literală nu se aplică (claude_session.py expune funcții
  module-level, nu o clasă) — separation e satisfacută prin module distinct.
- Fresh subprocess PER skill phase, NU single resumed session — phase-urile
  coordinează via disk artifacts (gstack convention în
  ~/.gstack/projects/<slug>/). Avoids context window growth.
- --max-turns 20 default + retry pe error_max_turns la --max-turns 30.
  Spike a arătat că prompt-uri complexe pot exploda turn budget-ul.
- approved-tasks.json schema extins cu planning_session_id + final_plan_path
  (Status flow: pending → planning → approved → running → complete).
- State separat în sessions/planning.json (NU active.json), keyed pe
  (adapter, channel_id) pentru re-resume la restart echo-core.

Trigger-e:
- Discord: slash command /plan <slug> [descriere] cu autocomplete pe pending,
  buton "🧠 Planifică" în RalphProjectView, și /cancel slash command.
- Telegram: /plan + /cancel commands, plus buton "🧠 Planifică" în
  ralph project keyboard.
- Router: state-aware routing — dacă chat-ul e în planning, mesajele plain
  trec la PlanningOrchestrator.respond() prin --resume; /cancel revine la
  status pending; /advance / "Continuă faza" advance fază nouă (fresh
  subprocess); /finalize sau "Dau drumul" promote la status approved.

Discord defer pattern: toate butoanele noi (PlanningActiveView,
PlanningFinalView, "🧠 Planifică") apelează await
interaction.response.defer(ephemeral=True) ÎNAINTE de orice IO — evită
"Interaction failed" pe IO >3s.

UX strings warm + colaborativ (per design review): "🧠 Pornesc planning
pentru ...", "Răspunde aici", "Continuă faza", "Dau drumul tonight",
"Anulează" — niciun "Submit/Approve/Cancel" generic.

Tests: 23 noi (test_planning_session, test_planning_orchestrator,
test_router_planning) — toate pass. Mock pe _run_claude pentru a evita
subprocess Claude real în CI.

Files new:
  prompts/planning_agent.md
  src/planning_session.py
  src/planning_orchestrator.py
  tests/test_planning_session.py
  tests/test_planning_orchestrator.py
  tests/test_router_planning.py

Files modified:
  src/claude_session.py        — _run_claude(cwd=...) optional + surface subtype/is_error
  src/router.py                — state-aware routing, start_planning_session, planning_advance/approve/cancel, _ralph_propose schema cu planning_session_id + final_plan_path
  src/adapters/discord_bot.py  — /plan + /cancel slash commands; planning views imported
  src/adapters/discord_views.py — PlanningActiveView, PlanningFinalView, "Planifică" button în RalphProjectView, _split_chunks helper
  src/adapters/telegram_bot.py — /plan + /cancel handlers, callback_ralph extins cu plan/planadvance/plancancel/planapprove, planning keyboards

Status testelor pe modulele atinse: 75 passed, 0 failed
(test_claude_session security_section preexistent — neatins).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-26 18:38:51 +00:00
parent e06a79d98c
commit 51e56af557
11 changed files with 2244 additions and 7 deletions

67
prompts/planning_agent.md Normal file
View File

@@ -0,0 +1,67 @@
# Echo planning agent — system prompt
Ești **Echo**, asistentul lui Marius, în rol de **agent de planning conversational**. Marius
te-a chemat să porți cu el o conversație multi-fază despre un feature, până se naște un plan
implementabil. La final, tu (sau o fază ulterioară) scrii `final-plan.md` în repo-ul țintă,
iar Ralph îl execută noaptea pe stories.
## Context curent
- **Slug proiect:** `{slug}`
- **Descriere inițială:** {description}
- **Faza curentă:** `{phase}`
- **Repo țintă (CWD):** `~/workspace/{slug}/`
- **Artefacte gstack anterioare:** `~/.gstack/projects/{slug}/` (citește înainte să întrebi
lucruri pe care alte faze le-au lămurit deja)
- **Output final:** `~/workspace/{slug}/scripts/ralph/final-plan.md`
## Voce / ton
Cald + colaborativ, ca un coleg cu care construiești ceva. „Hai să...", „ce-ți dorești", „noi"
— niciodată „Please provide", „Submit", „Approve". Răspunde în limba lui Marius (română default;
dacă scrie EN, mergi EN). Concis: 3-6 propoziții per turn, nu eseuri.
## Cum coordonezi cu skill-urile gstack
Faza curentă e numele unui skill gstack (`/office-hours`, `/plan-ceo-review`,
`/plan-eng-review`, `/plan-design-review`). Când primești prima invocare a fazei, urmează skill-ul
ca de obicei — el îți dă structura. Nu re-rula skill-ul în interiorul aceleiași sesiuni decât
dacă Marius cere explicit.
Fiecare fază rulează într-un **subprocess Claude separat** (fresh `claude -p`). Sesiunea
precedentă a salvat un artifact pe disc (`~/.gstack/projects/{slug}/...`); citește-l ca să nu îl
întrebi pe Marius lucruri lămurite deja.
## Reguli de output
1. **Întrebări pentru Marius** — pune-i 13 întrebări la rând, nu 10. AskUserQuestion gstack se
serializează ca text simplu — nu te bloca în tool-use când ești în `-p` mode.
2. **Marker de progres** — când consideri faza completă în mintea ta, închide turnul cu o
linie pe ultim rand:
```
PHASE_STATUS: ready_to_advance
```
Echo (orchestratorul) o citește și îi prezintă lui Marius butonul „Continuă faza".
Dacă mai ai nevoie de input, închide cu `PHASE_STATUS: needs_input`.
3. **Artifact pe disc** — la sfârșitul fazei tale, scrie sau actualizează artifactul în
`~/.gstack/projects/{slug}/{user}-{phase}-...md` conform convenției skill-ului. Nu inventa
path-uri noi — folosește exact ce skill-ul gstack creează implicit.
4. **Final plan** — în ultima fază (sau când Marius spune explicit „gata"), scrie
`~/workspace/{slug}/scripts/ralph/final-plan.md` cu secțiunile:
- Context (de ce această schimbare)
- Architecture overview
- User stories preliminare (Ralph PRD generator le va structura ulterior)
- Implementation hints
- Verification approach (smoke tests, ce gates relevante)
5. **Niciodată nu rula** comenzi destructive fără confirmare. Nu modifica fișiere în afara
`~/workspace/{slug}/` și `~/.gstack/projects/{slug}/`.
## Granițe
- Nu ai voie să atingi `src/router.py`, `src/claude_session.py`, `src/planning_session.py`,
`src/planning_orchestrator.py` sau alte fișiere core din `echo-core` — chiar dacă Marius îți
cere ceva care ar implica asta, întoarce-te la el cu „asta e core Echo, fac eu pe master".
- Nu inventa decizii arhitecturale fără să ai semnal de la Marius. Dacă te blochează lipsă de
context, întreabă-l pe el direct.
- Cost / rate-limit: Marius e pe subscription Anthropic, deci ignoră US$. Dar ține-te scurt —
fiecare turn consumă rate-limit budget.

View File

@@ -23,8 +23,17 @@ from src.router import (
_ralph_status,
_ralph_stop,
_load_approved_tasks,
planning_advance,
planning_approve,
planning_cancel,
start_planning_session,
)
from src.adapters.discord_views import (
RalphRootView,
PlanningActiveView,
PlanningFinalView,
_split_chunks,
)
from src.adapters.discord_views import RalphRootView
logger = logging.getLogger("echo-core.discord")
_security_log = logging.getLogger("echo-core.security")
@@ -985,6 +994,67 @@ def create_bot(config: Config) -> discord.Client:
) -> None:
await interaction.response.send_message(_ralph_stop(slug))
# ---- Planning agent (W2) ---------------------------------------------
@tree.command(name="plan", description="Pornește o sesiune de planning conversational pentru un proiect")
@app_commands.describe(
slug="Project slug (folosește /p ca să-l adaugi întâi)",
description="Descriere opțională (default: cea din approved-tasks.json)",
)
@app_commands.autocomplete(slug=_ralph_autocomplete_pending)
async def plan_cmd(
interaction: discord.Interaction,
slug: str,
description: str | None = None,
) -> None:
await interaction.response.defer(ephemeral=True)
# Resolve description: explicit param wins, else look up in approved-tasks.
desc = (description or "").strip()
if not desc:
try:
data = _load_approved_tasks()
for p in data.get("projects", []):
if p.get("name", "").lower() == slug.lower():
desc = p.get("description") or ""
break
except Exception:
logger.exception("approved-tasks lookup failed")
if not desc:
await interaction.followup.send(
f"Nu am descriere pentru `{slug}`. Adaugă cu `/p {slug} <descriere>` "
"sau pasează `description` la `/plan`.",
ephemeral=True,
)
return
channel_id = str(interaction.channel_id)
await interaction.followup.send(
f"🧠 Pornesc planning pentru `{slug}`… (durează ~60s)", ephemeral=True
)
try:
first = await asyncio.to_thread(
start_planning_session, slug, desc, channel_id, "discord",
)
except Exception as e:
logger.exception("start_planning_session failed for %s", slug)
await interaction.followup.send(f"Planning blocat: {e}", ephemeral=True)
return
for chunk in _split_chunks(first):
await interaction.followup.send(chunk, ephemeral=True)
await interaction.followup.send(
"Răspunde aici. Apasă **Continuă faza** când ești gata să trec la următoarea.",
view=PlanningActiveView(),
ephemeral=True,
)
@tree.command(name="cancel", description="Anulează sesiunea de planning curentă")
async def cancel_planning_cmd(interaction: discord.Interaction) -> None:
await interaction.response.defer(ephemeral=True)
text = await asyncio.to_thread(
planning_cancel, str(interaction.channel_id), "discord",
)
await interaction.followup.send(text, ephemeral=True)
# --- Events ---
@client.event
@@ -1029,6 +1099,7 @@ def create_bot(config: Config) -> discord.Client:
response, _is_cmd = await asyncio.to_thread(
route_message, channel_id, user_id, text,
on_text=on_text,
adapter_name="discord",
)
# Only send the final combined response if no intermediates

View File

@@ -20,7 +20,12 @@ from src.router import (
_ralph_propose,
_ralph_status,
_ralph_stop,
planning_advance,
planning_approve,
planning_cancel,
start_planning_session,
)
from src.planning_session import is_in_planning
log = logging.getLogger(__name__)
@@ -168,7 +173,49 @@ class RalphProjectView(discord.ui.View):
text = _ralph_stop(self.slug)
await interaction.followup.send(text, ephemeral=True)
@discord.ui.button(label="🔙 Înapoi", style=discord.ButtonStyle.secondary, row=1)
@discord.ui.button(label="🧠 Planifică", style=discord.ButtonStyle.primary, row=2)
async def plan(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
await interaction.response.defer(ephemeral=True)
# Look up description from approved-tasks.json
description = ""
try:
data = _load_approved_tasks()
for p in data.get("projects", []):
if p.get("name", "").lower() == self.slug.lower():
description = p.get("description") or ""
break
except Exception:
log.exception("approved-tasks lookup failed")
if not description:
await interaction.followup.send(
f"Nu am descriere pentru `{self.slug}`. "
f"Adaugă mai întâi cu `/p {self.slug} <descriere>`.",
ephemeral=True,
)
return
channel_id = str(interaction.channel_id)
await interaction.followup.send(
f"🧠 Pornesc planning pentru `{self.slug}`… (durează ~60s)",
ephemeral=True,
)
try:
first = start_planning_session(
self.slug, description, channel_id, "discord",
)
except Exception as e:
log.exception("start_planning_session failed for %s", self.slug)
await interaction.followup.send(f"Planning blocat: {e}", ephemeral=True)
return
# Send first message of the planning agent + active keyboard
for chunk in _split_chunks(first, 1900):
await interaction.followup.send(chunk, ephemeral=True)
await interaction.followup.send(
"Răspunde aici. Apasă **Continuă faza** când ești gata să trec la următoarea.",
view=PlanningActiveView(),
ephemeral=True,
)
@discord.ui.button(label="🔙 Înapoi", style=discord.ButtonStyle.secondary, row=2)
async def back(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
await interaction.response.defer(ephemeral=True)
view = RalphRootView()
@@ -231,6 +278,80 @@ class _CloseButton(discord.ui.Button):
await interaction.edit_original_response(content="Închis.", view=None)
def _split_chunks(text: str, limit: int = 1900) -> list[str]:
"""Split a long message into Discord-safe chunks."""
if len(text) <= limit:
return [text]
chunks: list[str] = []
while text:
if len(text) <= limit:
chunks.append(text)
break
cut = text.rfind("\n", 0, limit)
if cut == -1:
cut = limit
chunks.append(text[:cut])
text = text[cut:].lstrip("\n")
return chunks
# ---------------------------------------------------------------------------
# Planning views (W2) — buttons that drive the planning conversation
# ---------------------------------------------------------------------------
class PlanningActiveView(discord.ui.View):
"""Buttons shown DURING an active planning session: advance phase / cancel."""
def __init__(self) -> None:
super().__init__(timeout=VIEW_TIMEOUT)
@discord.ui.button(label="▶️ Continuă faza", style=discord.ButtonStyle.primary, row=0)
async def advance(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
await interaction.response.defer(ephemeral=True)
channel_id = str(interaction.channel_id)
try:
text, completed = planning_advance(channel_id, "discord")
except Exception as e:
log.exception("planning advance failed")
await interaction.followup.send(f"Eroare: {e}", ephemeral=True)
return
for chunk in _split_chunks(text):
await interaction.followup.send(chunk, ephemeral=True)
view: discord.ui.View = (
PlanningFinalView() if completed else PlanningActiveView()
)
await interaction.followup.send(
("Plan gata. Confirmi?" if completed else "Continuăm?"),
view=view, ephemeral=True,
)
@discord.ui.button(label="🛑 Anulează", style=discord.ButtonStyle.danger, row=0)
async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
await interaction.response.defer(ephemeral=True)
text = planning_cancel(str(interaction.channel_id), "discord")
await interaction.followup.send(text, ephemeral=True)
class PlanningFinalView(discord.ui.View):
"""Buttons shown when ALL planning phases finished — Dau drumul / Anulează."""
def __init__(self) -> None:
super().__init__(timeout=VIEW_TIMEOUT)
@discord.ui.button(label="✅ Dau drumul tonight", style=discord.ButtonStyle.success, row=0)
async def approve(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
await interaction.response.defer(ephemeral=True)
text = planning_approve(str(interaction.channel_id), "discord")
await interaction.followup.send(text, ephemeral=True)
@discord.ui.button(label="🛑 Anulează", style=discord.ButtonStyle.danger, row=0)
async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
await interaction.response.defer(ephemeral=True)
text = planning_cancel(str(interaction.channel_id), "discord")
await interaction.followup.send(text, ephemeral=True)
class RalphRootView(discord.ui.View):
"""Landing view: workspace projects with status emoji + refresh + close."""

View File

@@ -38,7 +38,12 @@ from src.router import (
_ralph_approve,
_ralph_status,
_ralph_stop,
planning_advance,
planning_approve,
planning_cancel,
start_planning_session,
)
from src.planning_session import is_in_planning
WORKSPACE_DIR = Path("/home/moltbot/workspace")
ADAPTER_NAME = "telegram"
@@ -408,19 +413,42 @@ def _build_ralph_project_keyboard(slug: str) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup([
[
InlineKeyboardButton(" Propune feature", callback_data=f"ralph:propose:{slug}"),
InlineKeyboardButton("🧠 Planifică", callback_data=f"ralph:plan:{slug}"),
],
[
InlineKeyboardButton("👁 Vezi PRD", callback_data=f"ralph:prd:{slug}"),
],
[
InlineKeyboardButton("📊 Status", callback_data=f"ralph:status:{slug}"),
InlineKeyboardButton("✅ Aprobă tonight", callback_data=f"ralph:approve:{slug}"),
],
[
InlineKeyboardButton("✅ Aprobă tonight", callback_data=f"ralph:approve:{slug}"),
InlineKeyboardButton("🛑 Stop", callback_data=f"ralph:stop:{slug}"),
],
[
InlineKeyboardButton("🔙 Înapoi", callback_data="ralph:menu"),
],
])
def _build_planning_active_keyboard() -> InlineKeyboardMarkup:
"""Keyboard shown DURING an active planning session (after each turn)."""
return InlineKeyboardMarkup([
[
InlineKeyboardButton("▶️ Continuă faza", callback_data="ralph:planadvance"),
InlineKeyboardButton("🛑 Anulează", callback_data="ralph:plancancel"),
],
])
def _build_planning_final_keyboard() -> InlineKeyboardMarkup:
"""Keyboard shown when the planning pipeline has finished all phases."""
return InlineKeyboardMarkup([
[
InlineKeyboardButton("✅ Dau drumul tonight", callback_data="ralph:planapprove"),
InlineKeyboardButton("🛑 Anulează", callback_data="ralph:plancancel"),
],
])
def _render_ralph_root_summary() -> str:
try:
data = _load_approved_tasks()
@@ -468,6 +496,73 @@ async def cmd_ralph_k(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non
await update.message.reply_text(result)
def split_planning_chunks(text: str, limit: int = 4096) -> list[str]:
"""Telegram-safe split (mirrors split_message but local to avoid forward ref)."""
if len(text) <= limit:
return [text]
chunks = []
while text:
if len(text) <= limit:
chunks.append(text)
break
cut = text.rfind("\n", 0, limit)
if cut == -1:
cut = limit
chunks.append(text[:cut])
text = text[cut:].lstrip("\n")
return chunks
async def cmd_plan(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/plan <slug> [descriere] — pornește o sesiune de planning conversational."""
args = list(context.args or [])
if not args:
await update.message.reply_text("Folosire: /plan <slug> [descriere]")
return
slug = args[0]
description = " ".join(args[1:]).strip()
if not description:
# Look up from approved-tasks
try:
data = _load_approved_tasks()
except Exception:
data = {"projects": []}
for p in data.get("projects", []):
if p.get("name", "").lower() == slug.lower():
description = p.get("description") or ""
break
if not description:
await update.message.reply_text(
f"Nu am descriere pentru `{slug}`. Adaugă cu /p {slug} <descriere>.",
parse_mode="Markdown",
)
return
chat_id = update.message.chat_id
await update.message.reply_text(
f"🧠 Pornesc planning pentru *{slug}*… (durează ~60s)",
parse_mode="Markdown",
)
first = await asyncio.to_thread(
start_planning_session, slug, description, str(chat_id), ADAPTER_NAME,
)
for chunk in split_planning_chunks(first):
await context.bot.send_message(chat_id=chat_id, text=chunk)
await context.bot.send_message(
chat_id=chat_id,
text="Răspunde aici. Apasă _Continuă faza_ când ești gata să trec la următoarea.",
reply_markup=_build_planning_active_keyboard(),
parse_mode="Markdown",
)
async def cmd_cancel_planning(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/cancel — anulează sesiunea de planning curentă."""
text = await asyncio.to_thread(
planning_cancel, str(update.message.chat_id), ADAPTER_NAME,
)
await update.message.reply_text(text)
async def callback_ralph(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle inline keyboard callbacks for Ralph (pattern ^ralph:)."""
query = update.callback_query
@@ -576,6 +671,71 @@ async def callback_ralph(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
await context.bot.send_message(chat_id=int(chat_id), text=result)
return
# ---- Planning agent (W2) ---------------------------------------------
if action == "plan":
# Look up project description from approved-tasks.json (or workspace fallback).
try:
data = _load_approved_tasks()
except Exception:
data = {"projects": []}
description = ""
for p in data.get("projects", []):
if p.get("name", "").lower() == (slug or "").lower():
description = p.get("description") or ""
break
if not description:
await context.bot.send_message(
chat_id=int(chat_id),
text=(
f"Nu am descriere pentru `{slug}`. "
f"Adaugă mai întâi cu `/p {slug} <descriere>`."
),
parse_mode="Markdown",
)
return
await context.bot.send_message(
chat_id=int(chat_id),
text=f"🧠 Pornesc planning pentru *{slug}*… (durează ~60s)",
parse_mode="Markdown",
)
first = await asyncio.to_thread(
start_planning_session, slug, description, str(chat_id), ADAPTER_NAME,
)
for chunk in split_message(first):
await context.bot.send_message(chat_id=int(chat_id), text=chunk)
await context.bot.send_message(
chat_id=int(chat_id),
text="Răspunde aici. Apasă _Continuă faza_ când ești gata să trec la următoarea.",
reply_markup=_build_planning_active_keyboard(),
parse_mode="Markdown",
)
return
if action == "planadvance":
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
text, completed = await asyncio.to_thread(
planning_advance, str(chat_id), ADAPTER_NAME,
)
for chunk in split_message(text):
await context.bot.send_message(chat_id=int(chat_id), text=chunk)
kb = _build_planning_final_keyboard() if completed else _build_planning_active_keyboard()
await context.bot.send_message(
chat_id=int(chat_id),
text=("Plan gata. Confirmi?" if completed else "Continuăm?"),
reply_markup=kb,
)
return
if action == "plancancel":
text = await asyncio.to_thread(planning_cancel, str(chat_id), ADAPTER_NAME)
await context.bot.send_message(chat_id=int(chat_id), text=text)
return
if action == "planapprove":
text = await asyncio.to_thread(planning_approve, str(chat_id), ADAPTER_NAME)
await context.bot.send_message(chat_id=int(chat_id), text=text)
return
# --- Fast command handlers ---
@@ -826,6 +986,10 @@ def create_telegram_bot(config: Config, token: str) -> Application:
app.add_handler(CommandHandler("l", cmd_ralph_l))
app.add_handler(CommandHandler("k", cmd_ralph_k))
# Planning agent (W2)
app.add_handler(CommandHandler("plan", cmd_plan))
app.add_handler(CommandHandler("cancel", cmd_cancel_planning))
# Fast commands
app.add_handler(CommandHandler("email", cmd_email))
app.add_handler(CommandHandler("emailsend", cmd_emailsend))
@@ -880,6 +1044,8 @@ def create_telegram_bot(config: Config, token: str) -> Application:
BotCommand("a", "Ralph: approve project for tonight"),
BotCommand("l", "Ralph: list projects status"),
BotCommand("k", "Ralph: stop running project"),
BotCommand("plan", "Planning conversational pentru un proiect"),
BotCommand("cancel", "Anulează planning în curs"),
])
app.post_init = post_init

View File

@@ -227,6 +227,7 @@ def _run_claude(
cmd: list[str],
timeout: int,
on_text: Callable[[str], None] | None = None,
cwd: Path | str | None = None,
) -> dict:
"""Run a Claude CLI command and return parsed output.
@@ -237,6 +238,10 @@ def _run_claude(
If *on_text* is provided it is called with each intermediate text block
as soon as it arrives (before the process finishes), enabling real-time
streaming to adapters.
*cwd* — optional working directory override (default: PROJECT_ROOT).
Used by PlanningSession to scope the subprocess to ``~/workspace/<slug>/``
so artifacts land in the target repo.
"""
if not shutil.which(CLAUDE_BIN):
raise FileNotFoundError(
@@ -250,7 +255,7 @@ def _run_claude(
stderr=subprocess.PIPE,
text=True,
env=_safe_env(),
cwd=PROJECT_ROOT,
cwd=str(cwd) if cwd else PROJECT_ROOT,
)
# Watchdog thread: kill the process if it exceeds the timeout
@@ -346,6 +351,10 @@ def _run_claude(
"duration_ms": result_obj.get("duration_ms", 0),
"num_turns": result_obj.get("num_turns", 0),
"intermediate_count": intermediate_count,
# Surface subtype/is_error for callers that retry on `error_max_turns`
# (PlanningSession does this — spike findings recommended retry strategy).
"subtype": result_obj.get("subtype", ""),
"is_error": bool(result_obj.get("is_error", False)),
}

View File

@@ -0,0 +1,299 @@
"""PlanningOrchestrator — multi-phase planning coordinator.
Sequences `/office-hours → /plan-ceo-review → /plan-eng-review →
/plan-design-review` (last only when description hints at UI scope), each in a
**fresh subprocess** (per W2 plan + spike findings). Phases coordinate via
disk artifacts (gstack convention: `~/.gstack/projects/<slug>/...`).
API used by router/adapters:
PlanningOrchestrator.start(slug, description, channel_id, adapter)
→ (session, first_response_text)
PlanningOrchestrator.respond(adapter, channel_id, message)
→ (session, response_text, phase_ready: bool)
PlanningOrchestrator.advance(adapter, channel_id)
→ (session, first_response_text, completed: bool)
# completed=True when no further phase remains; final-plan stub
# written by the orchestrator if the planning agent didn't.
PlanningOrchestrator.cancel(adapter, channel_id)
→ bool
PlanningOrchestrator.has_ui_scope(description) → bool
The orchestrator writes the final-plan stub path even if planning agent did
not (so PRD generator always has something to read in W3).
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
from typing import Callable
from src.planning_session import (
GSTACK_PROJECTS_ROOT,
PHASE_READY_MARKER,
WORKSPACE_ROOT,
PlanningSession,
_load_planning_state,
_save_planning_state,
_channel_key,
clear_planning_state,
get_planning_state,
)
log = logging.getLogger(__name__)
# Ordered phase pipeline. The design phase is appended only when the
# description hints at UI scope (heuristic — see has_ui_scope).
BASE_PHASES = ["/office-hours", "/plan-ceo-review", "/plan-eng-review"]
DESIGN_PHASE = "/plan-design-review"
UI_HINT_PATTERN = re.compile(
r"\b(ui|ux|frontend|design|button|page|css|html|interfa[țt]?[aăă]?|"
r"layout|component|view|dashboard|modal|form|screen|"
# Romanian variants
r"pagin[ăa]|buton|ecran|formular)",
re.IGNORECASE,
)
def has_ui_scope(description: str) -> bool:
"""Cheap heuristic — adds /plan-design-review when description mentions UI."""
return bool(UI_HINT_PATTERN.search(description or ""))
def _phases_for(description: str) -> list[str]:
phases = list(BASE_PHASES)
if has_ui_scope(description):
phases.append(DESIGN_PHASE)
return phases
def _final_plan_path(slug: str) -> Path:
return WORKSPACE_ROOT / slug / "scripts" / "ralph" / "final-plan.md"
def _ensure_final_plan_dir(slug: str) -> Path:
target = _final_plan_path(slug)
target.parent.mkdir(parents=True, exist_ok=True)
return target
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class PlanningOrchestrator:
"""Stateless coordinator — all state is in `sessions/planning.json`."""
@staticmethod
def start(
slug: str,
description: str,
channel_id: str,
adapter: str = "echo",
on_text: Callable[[str], None] | None = None,
) -> tuple[PlanningSession, str]:
"""Begin planning at phase 0 (`/office-hours`). Persists state.
Returns (PlanningSession, first_response_text).
"""
phases = _phases_for(description)
first_phase = phases[0]
log.info(
"planning.start slug=%s adapter=%s channel=%s phases=%s",
slug, adapter, channel_id, phases,
)
# Wipe any prior state for this channel (start fresh).
clear_planning_state(adapter, channel_id)
session = PlanningSession.start(
slug=slug,
description=description,
phase=first_phase,
channel_id=channel_id,
adapter=adapter,
on_text=on_text,
)
# Stash phase plan into disk state so advance() knows the pipeline.
data = _load_planning_state()
key = _channel_key(adapter, channel_id)
if key in data:
data[key]["phases_planned"] = phases
data[key]["phase_index"] = 0
data[key]["phases_completed"] = []
_save_planning_state(data)
return session, session.last_response
@staticmethod
def respond(
adapter: str,
channel_id: str,
message: str,
on_text: Callable[[str], None] | None = None,
) -> tuple[PlanningSession | None, str, bool]:
"""Forward `message` into the active phase via `--resume`.
Returns (session, response_text, phase_ready).
`phase_ready=True` means the planning agent emitted PHASE_READY_MARKER
— the adapter should surface a "Continuă faza" / "Finalizează" button.
"""
session = PlanningSession.from_state(adapter, channel_id)
if session is None:
return None, "Nu există o sesiune de planning activă pe acest canal.", False
text = session.respond(message, on_text=on_text)
return session, text, session.is_phase_ready()
@staticmethod
def advance(
adapter: str,
channel_id: str,
on_text: Callable[[str], None] | None = None,
) -> tuple[PlanningSession | None, str, bool]:
"""Move to the next phase (fresh subprocess). Returns (session, text, completed).
If no more phases remain, writes a final-plan.md stub (if the agent
didn't) and returns (last_session, summary_text, completed=True).
"""
state = get_planning_state(adapter, channel_id)
if not state:
return None, "Nu există o sesiune de planning activă.", False
phases = state.get("phases_planned") or _phases_for(state.get("description", ""))
completed = list(state.get("phases_completed") or [])
current_phase = state.get("phase")
if current_phase and current_phase not in completed:
completed.append(current_phase)
# Find next phase
try:
cur_idx = phases.index(current_phase) if current_phase in phases else -1
except ValueError:
cur_idx = -1
next_idx = cur_idx + 1
slug = state["slug"]
description = state.get("description", "")
if next_idx >= len(phases):
# Pipeline complete — ensure final-plan.md exists, return summary.
target = _ensure_final_plan_dir(slug)
if not target.exists():
stub = _build_final_plan_stub(slug, description, completed, state)
target.write_text(stub, encoding="utf-8")
log.info("planning.advance wrote final-plan stub: %s", target)
# Persist completion marker but keep state so adapter can show
# "Dau drumul tonight?" buttons.
data = _load_planning_state()
key = _channel_key(adapter, channel_id)
if key in data:
data[key]["phases_completed"] = completed
data[key]["phase"] = "__complete__"
data[key]["final_plan_path"] = str(target)
_save_planning_state(data)
session = PlanningSession.from_state(adapter, channel_id)
summary = _summary_text(slug, completed, target)
return session, summary, True
next_phase = phases[next_idx]
log.info(
"planning.advance slug=%s adapter=%s channel=%s %s%s",
slug, adapter, channel_id, current_phase, next_phase,
)
# Fresh subprocess for the next phase. Phase coordinates with prior
# phase via gstack disk artifacts (~/.gstack/projects/<slug>/).
session = PlanningSession.start(
slug=slug,
description=description,
phase=next_phase,
channel_id=channel_id,
adapter=adapter,
on_text=on_text,
)
data = _load_planning_state()
key = _channel_key(adapter, channel_id)
if key in data:
data[key]["phases_completed"] = completed
data[key]["phase_index"] = next_idx
data[key]["phases_planned"] = phases
_save_planning_state(data)
return session, session.last_response, False
@staticmethod
def cancel(adapter: str, channel_id: str) -> bool:
"""Drop planning state. Returns True if anything was cleared."""
return clear_planning_state(adapter, channel_id)
@staticmethod
def final_plan_path(slug: str) -> Path:
return _final_plan_path(slug)
# Re-exported for convenience.
has_ui_scope = staticmethod(has_ui_scope)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _summary_text(slug: str, completed_phases: list[str], plan_path: Path) -> str:
phases_str = "".join(completed_phases) if completed_phases else "(nicio fază)"
return (
f"✅ **Plan gata pentru `{slug}`**\n\n"
f"Faze rulate: {phases_str}\n"
f"Plan salvat: `{plan_path}`\n\n"
"Apasă **Dau drumul tonight** ca Ralph să-l implementeze la 23:00, "
"sau **Anulează** dacă vrei să mai gândim."
)
def _build_final_plan_stub(
slug: str, description: str, completed_phases: list[str], state: dict
) -> str:
"""Emit a minimal final-plan.md when the planning agent didn't write one.
Captures what we know so PRD generator (W3) has something concrete to read.
"""
phases_lines = "\n".join(f"- `{p}`" for p in completed_phases) or "- (none)"
last_excerpt = (state.get("last_text_excerpt") or "").strip()
last_block = (
f"\n\n## Last agent output excerpt\n\n```\n{last_excerpt[:2000]}\n```"
if last_excerpt
else ""
)
return f"""# Final plan — {slug}
## Context
{description}
## Phases completed
{phases_lines}
## Architecture overview
(To be filled by planning agent. Stub written by PlanningOrchestrator because the
agent didn't write its own `final-plan.md` before pipeline completion.)
## User stories preliminare
(Stub. Ralph PRD generator will infer concrete stories from this plan + repo state.)
## Implementation hints
(Stub.)
## Verification approach
- typecheck + lint + tests pe modulele atinse
- smart gates Ralph pe tags inferred per story
{last_block}
"""

495
src/planning_session.py Normal file
View File

@@ -0,0 +1,495 @@
"""PlanningSession — Claude CLI wrapper for conversational planning phases.
Per the Echo Core conversational planning agent plan (W2), this is intentionally
a SEPARATE class from the chat session — NOT a `mode=string` parameter on
`ClaudeSession`. The plan calls it "PlanningSession(ClaudeSession) ca SUBCLASĂ".
Since `claude_session.py` exposes module-level functions (not a class) we
implement PlanningSession as a sibling class that REUSES the shared subprocess
helpers (`_run_claude`, `_safe_env`, `CLAUDE_BIN`, `SESSIONS_DIR`) but keeps:
- its own state file (`sessions/planning.json`)
- its own system prompt (`prompts/planning_agent.md`)
- per-slug working directory (`~/workspace/<slug>/`)
- `--add-dir` flags for skills + gstack project artifacts
- `--max-turns 20` default with retry on `error_max_turns`
Spike findings (`tasks/spike-planning-findings.md`):
- `claude -p '/skill'` → text serialization of AskUserQuestion. ✅
- `claude --resume <id> -p '<reply>'` round-trip preserves context. ✅
- Complex prompts can blow turn budget → MUST handle `error_max_turns`.
- Cost ~ $0.51.1/turn Opus 4.7 1M; Marius on subscription so ignore USD.
Architectural decisions captured for the W2 commit message:
1. Separate class (not mode parameter) — clean separation, easy to remove
planning entirely without touching chat session.
2. Fresh subprocess PER skill phase, NOT a single resumed session — phases
coordinate via disk artifacts (gstack convention:
`~/.gstack/projects/<slug>/{user}-{branch}-{phase}-*.md`).
3. State per `(adapter, channel)` keyed string — same convention as
`claude_session.active.json`. Re-resume on restart is supported via
`claude --resume <stored_id>`.
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess
import tempfile
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from src.claude_session import (
CLAUDE_BIN,
PROJECT_ROOT,
SESSIONS_DIR,
_run_claude,
_safe_env,
)
logger = logging.getLogger(__name__)
_invoke_log = logging.getLogger("echo-core.invoke")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
PLANNING_STATE_FILE = SESSIONS_DIR / "planning.json"
PROMPTS_DIR = PROJECT_ROOT / "prompts"
PLANNING_PROMPT_FILE = PROMPTS_DIR / "planning_agent.md"
# Roots scoped into each planning subprocess via --add-dir
WORKSPACE_ROOT = Path("/home/moltbot/workspace")
GSTACK_PROJECTS_ROOT = Path.home() / ".gstack" / "projects"
SKILLS_ROOT = Path.home() / ".claude" / "skills"
# Spike: prompts deep-tool-use can blow small budgets; 20 default with retry.
DEFAULT_MAX_TURNS = 20
RETRY_MAX_TURNS = 30 # boost on `error_max_turns`
DEFAULT_TIMEOUT = 600 # seconds — planning turns are slower than chat
# Marker the planning agent emits when a phase is conceptually done.
# Orchestrator scans for this to decide when to surface the "Continuă faza"
# button. Convention pinned in `prompts/planning_agent.md`.
PHASE_READY_MARKER = "PHASE_STATUS: ready_to_advance"
PHASE_NEEDS_INPUT_MARKER = "PHASE_STATUS: needs_input"
# ---------------------------------------------------------------------------
# Disk state — sessions/planning.json
# Schema:
# {
# "<adapter>:<channel_id>": {
# "slug": "...",
# "description": "...",
# "phase": "/office-hours" | "/plan-ceo-review" | ...,
# "phases_completed": ["/office-hours", ...],
# "session_id": "<claude session uuid>",
# "planning_session_id": "<echo internal uuid>",
# "started_at": "...",
# "updated_at": "...",
# "last_text_excerpt": "...", # 500 char excerpt for debugging
# "last_subtype": "success" | "error_max_turns" | ...,
# }
# }
# ---------------------------------------------------------------------------
def _channel_key(adapter: str, channel_id: str) -> str:
return f"{adapter}:{channel_id}"
def _load_planning_state() -> dict:
"""Load planning sessions from disk. Returns {} if missing or empty."""
try:
text = PLANNING_STATE_FILE.read_text(encoding="utf-8")
if not text.strip():
return {}
return json.loads(text)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _save_planning_state(data: dict) -> None:
"""Atomically write planning sessions via tempfile + os.replace."""
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(
dir=SESSIONS_DIR, prefix=".planning_", suffix=".json"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
f.write("\n")
os.replace(tmp_path, PLANNING_STATE_FILE)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
def build_planning_system_prompt(slug: str, description: str, phase: str) -> str:
"""Render `prompts/planning_agent.md` with phase-specific values.
Returns empty string if the prompt file does not exist (skill-only mode).
"""
if not PLANNING_PROMPT_FILE.exists():
logger.warning(
"Planning prompt missing: %s — falling back to skill-only mode.",
PLANNING_PROMPT_FILE,
)
return ""
template = PLANNING_PROMPT_FILE.read_text(encoding="utf-8")
# Use simple replacement (NOT format()) — markdown contains literal `{}`
# in code blocks which would explode `.format()`.
return (
template
.replace("{slug}", slug)
.replace("{description}", description)
.replace("{phase}", phase)
)
# ---------------------------------------------------------------------------
# PlanningSession class
# ---------------------------------------------------------------------------
class PlanningSession:
"""One Claude CLI subprocess scoped to a planning phase.
Lifecycle:
1. ``PlanningSession.start(slug, description, phase, channel, adapter)``
— fresh subprocess; first prompt is the skill invocation.
2. ``session.respond(message)`` — `claude --resume <session_id>`
per user reply. Returns response text + retry hint.
3. ``session.is_phase_ready()`` — True when output contains
``PHASE_STATUS: ready_to_advance`` (orchestrator advances).
4. State persisted in `sessions/planning.json` so restart is recoverable.
"""
def __init__(
self,
slug: str,
description: str,
phase: str,
channel_id: str,
adapter: str = "echo",
session_id: str | None = None,
planning_session_id: str | None = None,
):
self.slug = slug
self.description = description
self.phase = phase
self.channel_id = channel_id
self.adapter = adapter
self.session_id = session_id
self.planning_session_id = planning_session_id or str(uuid.uuid4())
self._last_response: str = ""
self._last_subtype: str = ""
self._last_is_error: bool = False
# -- working directory & --add-dir scoping ------------------------------
@property
def cwd(self) -> Path:
"""Working directory for the subprocess.
Uses `~/workspace/<slug>/` if it exists; otherwise falls back to
Echo Core repo root (test mode / pre-clone scenarios).
"""
target = WORKSPACE_ROOT / self.slug
if target.is_dir():
return target
return PROJECT_ROOT
def _add_dirs(self) -> list[str]:
"""Build `--add-dir` arguments. Skip dirs that don't exist."""
candidates = [
SKILLS_ROOT,
GSTACK_PROJECTS_ROOT / self.slug,
GSTACK_PROJECTS_ROOT, # fallback in case slug-specific dir missing
]
seen: set[str] = set()
flags: list[str] = []
for d in candidates:
if d.exists() and str(d) not in seen:
flags.extend(["--add-dir", str(d)])
seen.add(str(d))
return flags
# -- command construction ----------------------------------------------
def _build_cmd(
self,
prompt: str,
*,
resume: str | None,
max_turns: int,
with_system_prompt: bool,
) -> list[str]:
cmd = [CLAUDE_BIN, "-p", prompt]
if resume:
cmd += ["--resume", resume]
cmd += [
"--output-format", "stream-json",
"--verbose",
"--max-turns", str(max_turns),
]
if with_system_prompt:
sys_prompt = build_planning_system_prompt(
self.slug, self.description, self.phase
)
if sys_prompt:
cmd += ["--system-prompt", sys_prompt]
cmd += self._add_dirs()
cmd += ["--dangerously-skip-permissions"]
return cmd
# -- subprocess invocation ---------------------------------------------
def _invoke(
self,
prompt: str,
*,
resume: str | None,
timeout: int,
max_turns: int,
with_system_prompt: bool,
on_text: Callable[[str], None] | None,
) -> dict:
cmd = self._build_cmd(
prompt,
resume=resume,
max_turns=max_turns,
with_system_prompt=with_system_prompt,
)
_t0 = time.monotonic()
result = _run_claude(cmd, timeout=timeout, on_text=on_text, cwd=self.cwd)
_elapsed = int((time.monotonic() - _t0) * 1000)
_invoke_log.info(
"planning slug=%s phase=%s adapter=%s channel=%s duration_ms=%d "
"tokens_in=%d tokens_out=%d session=%s subtype=%s cost=%.4f",
self.slug, self.phase, self.adapter, self.channel_id, _elapsed,
result.get("usage", {}).get("input_tokens", 0),
result.get("usage", {}).get("output_tokens", 0),
(result.get("session_id") or "")[:8],
result.get("subtype", ""),
float(result.get("total_cost_usd", 0) or 0),
)
return result
# -- public API: start/resume ------------------------------------------
@classmethod
def start(
cls,
slug: str,
description: str,
phase: str,
channel_id: str,
adapter: str = "echo",
timeout: int = DEFAULT_TIMEOUT,
on_text: Callable[[str], None] | None = None,
) -> "PlanningSession":
"""Kick off a new phase subprocess. First prompt is the skill call.
Returns a `PlanningSession` with `session_id`, `_last_response` set.
Persists state in `sessions/planning.json` keyed by `(adapter, channel_id)`.
Retries once with `RETRY_MAX_TURNS` if first run hits `error_max_turns`.
"""
session = cls(
slug=slug,
description=description,
phase=phase,
channel_id=channel_id,
adapter=adapter,
)
# Compose initial prompt — skill name + slug + description so the skill
# has enough hook to start.
initial_prompt = f"{phase} {description}".strip()
result = session._invoke(
initial_prompt,
resume=None,
timeout=timeout,
max_turns=DEFAULT_MAX_TURNS,
with_system_prompt=True,
on_text=on_text,
)
# Retry on error_max_turns — spike found this happens with deep tool-use.
if result.get("subtype") == "error_max_turns" and not result.get("session_id"):
logger.warning(
"planning start hit error_max_turns for %s/%s — retrying with %d turns",
slug, phase, RETRY_MAX_TURNS,
)
result = session._invoke(
initial_prompt,
resume=None,
timeout=timeout,
max_turns=RETRY_MAX_TURNS,
with_system_prompt=True,
on_text=on_text,
)
session.session_id = result.get("session_id") or None
session._last_response = result.get("result", "")
session._last_subtype = result.get("subtype", "")
session._last_is_error = bool(result.get("is_error", False))
session._persist(action="start", cost_usd=float(result.get("total_cost_usd", 0) or 0))
return session
def respond(
self,
message: str,
*,
timeout: int = DEFAULT_TIMEOUT,
on_text: Callable[[str], None] | None = None,
) -> str:
"""Send the user's reply to the running phase session via `--resume`.
Returns the response text. Updates persistent state.
"""
if not self.session_id:
raise RuntimeError(
"PlanningSession.respond called without an active session_id"
)
wrapped = f"[EXTERNAL CONTENT]\n{message}\n[END EXTERNAL CONTENT]"
result = self._invoke(
wrapped,
resume=self.session_id,
timeout=timeout,
max_turns=DEFAULT_MAX_TURNS,
with_system_prompt=False, # already in session
on_text=on_text,
)
# Retry once on error_max_turns
if result.get("subtype") == "error_max_turns":
logger.warning(
"planning respond hit error_max_turns for %s/%s — retrying",
self.slug, self.phase,
)
result = self._invoke(
wrapped,
resume=self.session_id,
timeout=timeout,
max_turns=RETRY_MAX_TURNS,
with_system_prompt=False,
on_text=on_text,
)
self._last_response = result.get("result", "")
self._last_subtype = result.get("subtype", "")
self._last_is_error = bool(result.get("is_error", False))
self._persist(
action="respond", cost_usd=float(result.get("total_cost_usd", 0) or 0)
)
return self._last_response
# -- introspection ------------------------------------------------------
def is_phase_ready(self) -> bool:
"""True if last response contained the ready-to-advance marker."""
return PHASE_READY_MARKER in (self._last_response or "")
@property
def last_response(self) -> str:
return self._last_response
@property
def last_subtype(self) -> str:
return self._last_subtype
# -- persistence --------------------------------------------------------
def _persist(self, *, action: str, cost_usd: float = 0.0) -> None:
data = _load_planning_state()
key = _channel_key(self.adapter, self.channel_id)
existing = data.get(key, {})
now = datetime.now(timezone.utc).isoformat()
phases_completed = existing.get("phases_completed", [])
# If this session changed phase, the orchestrator handles transition;
# we just keep our own slot consistent with the current phase.
entry = {
"slug": self.slug,
"description": self.description,
"phase": self.phase,
"phases_completed": phases_completed,
"session_id": self.session_id,
"planning_session_id": self.planning_session_id,
"adapter": self.adapter,
"channel_id": self.channel_id,
"started_at": existing.get("started_at", now),
"updated_at": now,
"last_text_excerpt": (self._last_response or "")[:500],
"last_subtype": self._last_subtype,
"total_cost_usd": (
float(existing.get("total_cost_usd") or 0.0) + float(cost_usd or 0.0)
),
}
data[key] = entry
_save_planning_state(data)
@classmethod
def from_state(cls, adapter: str, channel_id: str) -> "PlanningSession | None":
"""Reconstruct a session from `sessions/planning.json` (post-restart)."""
data = _load_planning_state()
entry = data.get(_channel_key(adapter, channel_id))
if not entry or not entry.get("session_id"):
return None
sess = cls(
slug=entry["slug"],
description=entry.get("description", ""),
phase=entry["phase"],
channel_id=channel_id,
adapter=adapter,
session_id=entry["session_id"],
planning_session_id=entry.get("planning_session_id"),
)
sess._last_subtype = entry.get("last_subtype", "")
sess._last_response = entry.get("last_text_excerpt", "")
return sess
# ---------------------------------------------------------------------------
# Module-level helpers consumed by router/orchestrator/adapters
# ---------------------------------------------------------------------------
def get_planning_state(adapter: str, channel_id: str) -> dict | None:
"""Return persisted planning state for a channel, or None."""
return _load_planning_state().get(_channel_key(adapter, channel_id))
def is_in_planning(adapter: str, channel_id: str) -> bool:
"""True if there is an active planning session for this channel."""
return get_planning_state(adapter, channel_id) is not None
def clear_planning_state(adapter: str, channel_id: str) -> bool:
"""Drop persisted planning state. Returns True if anything was cleared."""
data = _load_planning_state()
key = _channel_key(adapter, channel_id)
if key in data:
del data[key]
_save_planning_state(data)
return True
return False
def list_planning_sessions() -> dict:
"""Return all persisted planning sessions (for diagnostics)."""
return _load_planning_state()

View File

@@ -18,6 +18,12 @@ from src.claude_session import (
set_session_model,
VALID_MODELS,
)
from src.planning_orchestrator import PlanningOrchestrator
from src.planning_session import (
clear_planning_state,
get_planning_state,
is_in_planning,
)
log = logging.getLogger(__name__)
@@ -56,6 +62,57 @@ def route_message(
"""
text = text.strip()
# ---- Planning state-aware routing -----------------------------------
# If the channel is in an active planning session, the user's message is
# part of that conversation — route it to the orchestrator (NOT Claude
# main session, NOT slash commands except explicit /cancel and /advance).
in_planning = is_in_planning(adapter_name or "echo", channel_id)
if in_planning:
low = text.lower().strip()
if low in ("/cancel", "/anuleaza", "/anulează", "anulează planning", "anuleaza planning"):
# Capture slug BEFORE clearing state so we can revert approved-tasks status.
adapter_key = adapter_name or "echo"
state_snapshot = get_planning_state(adapter_key, channel_id)
cleared = PlanningOrchestrator.cancel(adapter_key, channel_id)
if state_snapshot and state_snapshot.get("slug"):
_revert_status_for_slug(state_snapshot["slug"], to="pending")
if cleared:
return "Planning anulat. Status revenit la pending.", True
return "Nu era nicio sesiune activă.", True
if low in ("/advance", "/continua", "/continuă", "continuă faza", "continua faza"):
session, response, completed = PlanningOrchestrator.advance(
adapter_name or "echo", channel_id, on_text=on_text,
)
return response, True
if low in ("/finalize", "/dau drumul", "dau drumul"):
return _approve_from_planning(channel_id, adapter_name or "echo"), True
if text.startswith("/"):
# Allow other commands to fall through (e.g. /status, /clear),
# but skip Ralph dispatch and Claude routing below.
pass
else:
# Plain message → planning conversation.
try:
session, response, phase_ready = PlanningOrchestrator.respond(
adapter_name or "echo", channel_id, text, on_text=on_text,
)
if session is None:
# State raced — drop planning marker, fall through.
log.warning(
"planning state vanished mid-respond for channel=%s", channel_id
)
else:
if phase_ready:
response = (
response
+ "\n\n— Apasă **Continuă faza** ca să trec la următoarea, "
"sau **Anulează** dacă te-ai răzgândit."
)
return response, False
except Exception as e:
log.error("Planning respond failed for %s: %s", channel_id, e)
return f"Planning blocat: {e}", False
# Ralph commands — short form (/p /a /l /k) and legacy aliases (!propose !approve !status !stop)
ralph_response = _try_ralph_dispatch(text, adapter_name=adapter_name)
if ralph_response is not None:
@@ -220,7 +277,11 @@ def _try_ralph_dispatch(text: str, adapter_name: str | None = None) -> str | Non
def _ralph_propose(slug: str, description: str) -> str:
"""Adaugă un proiect cu status pending în approved-tasks.json."""
"""Adaugă un proiect cu status pending în approved-tasks.json.
Schema includes the W2 planning fields (`planning_session_id`,
`final_plan_path`) so the orchestrator and PRD generator can find them.
"""
data = _load_approved_tasks()
for p in data["projects"]:
@@ -231,6 +292,8 @@ def _ralph_propose(slug: str, description: str) -> str:
"name": slug,
"description": description,
"status": "pending",
"planning_session_id": None,
"final_plan_path": None,
"proposed_at": datetime.now(timezone.utc).isoformat(),
"approved_at": None,
"started_at": None,
@@ -371,3 +434,155 @@ def _get_channel_config(channel_id: str) -> dict | None:
if ch.get("id") == channel_id:
return ch
return None
# ---------------------------------------------------------------------------
# Planning session entry points (W2)
# ---------------------------------------------------------------------------
def start_planning_session(
slug: str,
description: str,
channel_id: str,
adapter_name: str,
on_text: Callable[[str], None] | None = None,
) -> str:
"""Begin a conversational planning session for `slug` on this channel.
Updates approved-tasks.json: status `planning`, `planning_session_id` set.
Returns the first response text from the planning agent — the adapter
will display it and the user replies in the same channel.
"""
data = _load_approved_tasks()
# Locate or create the project entry.
entry = None
for p in data["projects"]:
if p["name"].lower() == slug.lower():
entry = p
break
if entry is None:
entry = {
"name": slug,
"description": description,
"status": "pending",
"planning_session_id": None,
"final_plan_path": None,
"proposed_at": datetime.now(timezone.utc).isoformat(),
"approved_at": None,
"started_at": None,
"pid": None,
}
data["projects"].append(entry)
# Kick off orchestrator (this can take ~60s on first turn — caller should
# have already shown a "Echo se gândește..." indicator).
try:
session, first_response = PlanningOrchestrator.start(
slug=slug,
description=description,
channel_id=channel_id,
adapter=adapter_name or "echo",
on_text=on_text,
)
except Exception as e:
log.error("Planning session start failed for %s: %s", slug, e)
return f"Planning blocat: {e}\n\nÎncearcă din nou cu /plan {slug} <descriere>."
entry["status"] = "planning"
entry["planning_session_id"] = session.planning_session_id
if not entry.get("description"):
entry["description"] = description
_save_approved_tasks(data)
return first_response
def _revert_status_for_slug(slug: str, to: str = "pending") -> None:
"""Revert a project's status (planning → `to`) given its slug."""
if not slug:
return
data = _load_approved_tasks()
changed = False
for p in data["projects"]:
if p["name"].lower() == slug.lower() and p.get("status") == "planning":
p["status"] = to
p["planning_session_id"] = None
changed = True
break
if changed:
_save_approved_tasks(data)
def _approve_from_planning(channel_id: str, adapter_name: str) -> str:
"""User clicked 'Dau drumul' inside an active planning session.
Promotes status `planning` → `approved` and clears planning state.
Returns confirmation text.
"""
state = get_planning_state(adapter_name, channel_id)
if not state:
return "Nu există o sesiune de planning activă."
slug = state.get("slug")
if not slug:
return "Sesiunea de planning nu are slug — anulează cu /cancel și ia-o de la capăt."
data = _load_approved_tasks()
final_plan_path = state.get("final_plan_path") or str(
PlanningOrchestrator.final_plan_path(slug)
)
found = False
for p in data["projects"]:
if p["name"].lower() == slug.lower():
p["status"] = "approved"
p["approved_at"] = datetime.now(timezone.utc).isoformat()
p["planning_session_id"] = None
p["final_plan_path"] = final_plan_path
found = True
break
if not found:
return f"Proiectul `{slug}` lipsește din approved-tasks.json. Anulează cu /cancel."
_save_approved_tasks(data)
clear_planning_state(adapter_name, channel_id)
return (
f"✅ Aprobat: `{slug}`. Ralph începe la 23:00.\n"
f" Plan: `{final_plan_path}`"
)
# Public helpers — re-exported for adapter wiring.
def planning_state_for(channel_id: str, adapter_name: str) -> dict | None:
"""Return current planning state for (adapter, channel) — adapter helper."""
return get_planning_state(adapter_name, channel_id)
def planning_advance(
channel_id: str,
adapter_name: str,
on_text: Callable[[str], None] | None = None,
) -> tuple[str, bool]:
"""Advance the planning pipeline by one phase.
Returns (response_text, completed_bool).
"""
_session, text, completed = PlanningOrchestrator.advance(
adapter_name, channel_id, on_text=on_text,
)
return text, completed
def planning_cancel(channel_id: str, adapter_name: str) -> str:
"""Cancel an active planning session and revert project status."""
state = get_planning_state(adapter_name, channel_id)
if not state:
return "Nu era nicio sesiune de planning activă."
slug = state.get("slug")
PlanningOrchestrator.cancel(adapter_name, channel_id)
if slug:
_revert_status_for_slug(slug, to="pending")
return "Planning anulat. Status revenit la pending."
def planning_approve(channel_id: str, adapter_name: str) -> str:
"""Promote planning → approved (e.g. button click 'Dau drumul')."""
return _approve_from_planning(channel_id, adapter_name)

View File

@@ -0,0 +1,243 @@
"""Tests for src/planning_orchestrator.py — phase pipeline coordinator."""
from __future__ import annotations
from unittest.mock import patch
import pytest
from src import planning_orchestrator, planning_session
from src.planning_orchestrator import (
BASE_PHASES,
DESIGN_PHASE,
PlanningOrchestrator,
_phases_for,
has_ui_scope,
)
@pytest.fixture
def tmp_planning_state(tmp_path, monkeypatch):
fake_sessions_dir = tmp_path / "sessions"
fake_sessions_dir.mkdir()
fake_state = fake_sessions_dir / "planning.json"
monkeypatch.setattr(planning_session, "SESSIONS_DIR", fake_sessions_dir)
monkeypatch.setattr(planning_session, "PLANNING_STATE_FILE", fake_state)
yield fake_state
@pytest.fixture
def fake_workspace(tmp_path, monkeypatch):
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "demo").mkdir()
monkeypatch.setattr(planning_session, "WORKSPACE_ROOT", workspace)
monkeypatch.setattr(planning_orchestrator, "WORKSPACE_ROOT", workspace)
yield workspace
# ---------------------------------------------------------------------------
# has_ui_scope / _phases_for
# ---------------------------------------------------------------------------
class TestUiScopeHeuristic:
@pytest.mark.parametrize("text,expected", [
("redesign UI for the dashboard", True),
("add a button on settings page", True),
("frontend cleanup", True),
("Adaugă filtru genuri pe pagina de game-library", True), # ro
("schimbă culoarea butonului de submit", True), # ro
("refactor utility helpers", False),
("rewrite the database migration scripts", False),
("tweak the rate limiter", False),
])
def test_detects_ui_keywords(self, text, expected):
assert has_ui_scope(text) is expected
def test_phases_for_excludes_design_when_no_ui(self):
phases = _phases_for("refactor utility")
assert phases == BASE_PHASES
assert DESIGN_PHASE not in phases
def test_phases_for_appends_design_for_ui(self):
phases = _phases_for("add login page")
assert phases[-1] == DESIGN_PHASE
assert phases[: len(BASE_PHASES)] == BASE_PHASES
# ---------------------------------------------------------------------------
# Orchestrator start / respond / advance / cancel — mock subprocess
# ---------------------------------------------------------------------------
def _fake_result(session_id="sess-1", text="hi"):
return {
"result": text,
"session_id": session_id,
"usage": {"input_tokens": 1000, "output_tokens": 200},
"total_cost_usd": 0.4,
"subtype": "success",
"is_error": False,
}
class TestOrchestratorStart:
def test_start_persists_phases_planned(
self, tmp_planning_state, fake_workspace
):
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1"),
):
sess, first = PlanningOrchestrator.start(
"demo", "Add login button", "ch-1", adapter="discord"
)
assert sess.session_id == "s-1"
from src.planning_session import get_planning_state
state = get_planning_state("discord", "ch-1")
assert state["phase"] == BASE_PHASES[0] # /office-hours
# UI scope → design phase included
assert state["phases_planned"][-1] == DESIGN_PHASE
assert state["phase_index"] == 0
assert state["phases_completed"] == []
def test_start_no_ui_scope_no_design_phase(
self, tmp_planning_state, fake_workspace
):
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1"),
):
PlanningOrchestrator.start(
"demo", "refactor utility helpers", "ch-1", adapter="discord"
)
from src.planning_session import get_planning_state
state = get_planning_state("discord", "ch-1")
assert DESIGN_PHASE not in state["phases_planned"]
class TestOrchestratorRespond:
def test_respond_returns_text_and_phase_ready_marker(
self, tmp_planning_state, fake_workspace
):
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1", text="initial"),
):
PlanningOrchestrator.start("demo", "Add login button", "ch-1", "discord")
ready_text = "ok we are done. PHASE_STATUS: ready_to_advance"
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1", text=ready_text),
):
sess, response, ready = PlanningOrchestrator.respond(
"discord", "ch-1", "user reply"
)
assert response == ready_text
assert ready is True
assert sess is not None
def test_respond_returns_none_when_no_state(self, tmp_planning_state):
sess, text, ready = PlanningOrchestrator.respond(
"discord", "ch-missing", "hi"
)
assert sess is None
assert "Nu există" in text
assert ready is False
class TestOrchestratorAdvance:
def test_advance_starts_next_phase_fresh_subprocess(
self, tmp_planning_state, fake_workspace
):
# Phase 1 → office-hours
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1"),
):
PlanningOrchestrator.start(
"demo", "Add login button", "ch-1", "discord"
)
# Advance → /plan-ceo-review fresh subprocess
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-2", text="ceo phase started"),
) as mock_run:
sess, text, completed = PlanningOrchestrator.advance(
"discord", "ch-1"
)
mock_run.assert_called_once()
# Verify the new subprocess has /plan-ceo-review in prompt (NOT --resume)
cmd = mock_run.call_args[0][0]
assert "/plan-ceo-review" in cmd[2]
assert "--resume" not in cmd
assert sess.session_id == "s-2"
assert sess.phase == "/plan-ceo-review"
assert completed is False
from src.planning_session import get_planning_state
state = get_planning_state("discord", "ch-1")
assert "/office-hours" in state["phases_completed"]
assert state["phase_index"] == 1
def test_advance_writes_final_plan_when_pipeline_complete(
self, tmp_planning_state, fake_workspace
):
# Manually seed state at the last phase.
from src.planning_session import _save_planning_state, _channel_key
# Build phase plan with 2 phases for brevity (skip design for non-UI).
state = {
_channel_key("discord", "ch-1"): {
"slug": "demo",
"description": "refactor utility",
"phase": "/plan-eng-review",
"phases_planned": ["/office-hours", "/plan-ceo-review", "/plan-eng-review"],
"phase_index": 2,
"phases_completed": ["/office-hours", "/plan-ceo-review"],
"session_id": "s-eng",
"planning_session_id": "ps-uuid",
"adapter": "discord",
"channel_id": "ch-1",
"started_at": "2026-04-26T20:00:00+00:00",
"updated_at": "2026-04-26T20:30:00+00:00",
"last_text_excerpt": "eng review done",
"last_subtype": "success",
}
}
_save_planning_state(state)
# Advance with no more phases — should write final-plan stub, no claude call.
with patch("src.planning_session._run_claude") as mock_run:
sess, text, completed = PlanningOrchestrator.advance(
"discord", "ch-1"
)
mock_run.assert_not_called()
assert completed is True
plan_path = PlanningOrchestrator.final_plan_path("demo")
assert plan_path.exists()
content = plan_path.read_text(encoding="utf-8")
assert "demo" in content
assert "refactor utility" in content
# All phases listed
assert "/office-hours" in content
assert "/plan-ceo-review" in content
assert "/plan-eng-review" in content
class TestOrchestratorCancel:
def test_cancel_clears_state(self, tmp_planning_state, fake_workspace):
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1"),
):
PlanningOrchestrator.start("demo", "x", "ch-1", "discord")
from src.planning_session import is_in_planning
assert is_in_planning("discord", "ch-1") is True
assert PlanningOrchestrator.cancel("discord", "ch-1") is True
assert is_in_planning("discord", "ch-1") is False
def test_cancel_returns_false_when_no_state(self, tmp_planning_state):
assert PlanningOrchestrator.cancel("discord", "ch-x") is False

View File

@@ -0,0 +1,278 @@
"""Tests for src/planning_session.py — PlanningSession + state persistence."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from src import planning_session
from src.planning_session import (
PHASE_NEEDS_INPUT_MARKER,
PHASE_READY_MARKER,
PlanningSession,
_channel_key,
build_planning_system_prompt,
clear_planning_state,
get_planning_state,
is_in_planning,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_planning_state(tmp_path, monkeypatch):
"""Redirect planning state file into a tmp dir for each test."""
fake_sessions_dir = tmp_path / "sessions"
fake_sessions_dir.mkdir()
fake_state = fake_sessions_dir / "planning.json"
monkeypatch.setattr(planning_session, "SESSIONS_DIR", fake_sessions_dir)
monkeypatch.setattr(planning_session, "PLANNING_STATE_FILE", fake_state)
yield fake_state
@pytest.fixture
def fake_workspace(tmp_path, monkeypatch):
"""Pretend ~/workspace/<slug>/ exists so PlanningSession.cwd resolves."""
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "demo").mkdir()
monkeypatch.setattr(planning_session, "WORKSPACE_ROOT", workspace)
yield workspace
# ---------------------------------------------------------------------------
# build_planning_system_prompt
# ---------------------------------------------------------------------------
class TestBuildPlanningSystemPrompt:
def test_substitutes_slug_phase_description(self):
prompt = build_planning_system_prompt(
slug="demo", description="Add filter X", phase="/office-hours"
)
# Even if the prompt template differs, the values must appear at least
# once each.
assert "demo" in prompt
assert "Add filter X" in prompt
assert "/office-hours" in prompt
def test_returns_empty_when_template_missing(self, tmp_path, monkeypatch):
monkeypatch.setattr(
planning_session, "PLANNING_PROMPT_FILE", tmp_path / "missing.md"
)
assert build_planning_system_prompt("a", "b", "/x") == ""
# ---------------------------------------------------------------------------
# state get/set/clear
# ---------------------------------------------------------------------------
class TestPlanningState:
def test_clear_returns_false_when_absent(self, tmp_planning_state):
assert clear_planning_state("discord", "ch-1") is False
def test_get_returns_none_when_absent(self, tmp_planning_state):
assert get_planning_state("discord", "ch-1") is None
assert is_in_planning("discord", "ch-1") is False
def test_persist_and_recover(self, tmp_planning_state, fake_workspace):
# Build a session WITHOUT actually invoking claude — call _persist directly.
sess = PlanningSession(
slug="demo",
description="desc",
phase="/office-hours",
channel_id="ch-1",
adapter="discord",
session_id="sess-uuid-1",
)
sess._last_response = "hello world " + PHASE_NEEDS_INPUT_MARKER
sess._last_subtype = "success"
sess._persist(action="start", cost_usd=0.42)
assert is_in_planning("discord", "ch-1") is True
state = get_planning_state("discord", "ch-1")
assert state is not None
assert state["slug"] == "demo"
assert state["session_id"] == "sess-uuid-1"
assert state["phase"] == "/office-hours"
assert state["last_subtype"] == "success"
assert "hello world" in state["last_text_excerpt"]
recovered = PlanningSession.from_state("discord", "ch-1")
assert recovered is not None
assert recovered.slug == "demo"
assert recovered.session_id == "sess-uuid-1"
assert recovered.phase == "/office-hours"
assert clear_planning_state("discord", "ch-1") is True
assert get_planning_state("discord", "ch-1") is None
# ---------------------------------------------------------------------------
# is_phase_ready
# ---------------------------------------------------------------------------
class TestIsPhaseReady:
def test_returns_true_when_marker_present(self, fake_workspace):
sess = PlanningSession("demo", "d", "/x", "ch", session_id="abc")
sess._last_response = f"some text {PHASE_READY_MARKER}"
assert sess.is_phase_ready() is True
def test_returns_false_when_marker_absent(self, fake_workspace):
sess = PlanningSession("demo", "d", "/x", "ch", session_id="abc")
sess._last_response = "some text without marker"
assert sess.is_phase_ready() is False
# ---------------------------------------------------------------------------
# cwd resolution
# ---------------------------------------------------------------------------
class TestCwd:
def test_workspace_dir_used_when_present(self, fake_workspace):
sess = PlanningSession("demo", "d", "/x", "ch")
assert sess.cwd == fake_workspace / "demo"
def test_falls_back_to_project_root_when_missing(
self, fake_workspace, monkeypatch
):
sess = PlanningSession("nonexistent-slug", "d", "/x", "ch")
# Falls back to PROJECT_ROOT
assert sess.cwd == planning_session.PROJECT_ROOT
# ---------------------------------------------------------------------------
# command construction
# ---------------------------------------------------------------------------
class TestBuildCmd:
def test_start_includes_skill_phase_and_max_turns(self, fake_workspace):
sess = PlanningSession("demo", "Add filter", "/office-hours", "ch")
cmd = sess._build_cmd(
"/office-hours Add filter",
resume=None,
max_turns=20,
with_system_prompt=False,
)
assert cmd[0:3] == [planning_session.CLAUDE_BIN, "-p", "/office-hours Add filter"]
assert "--max-turns" in cmd
assert "20" in cmd
assert "--output-format" in cmd
assert "stream-json" in cmd
assert "--dangerously-skip-permissions" in cmd
# No --resume on a fresh start
assert "--resume" not in cmd
def test_resume_includes_resume_flag(self, fake_workspace):
sess = PlanningSession(
"demo", "Add filter", "/office-hours", "ch", session_id="abc"
)
cmd = sess._build_cmd(
"user reply",
resume="abc",
max_turns=20,
with_system_prompt=False,
)
assert "--resume" in cmd
assert "abc" in cmd
def test_with_system_prompt_appends_flag(self, fake_workspace, tmp_path, monkeypatch):
# Create a tiny prompt file so build_planning_system_prompt returns text.
fake = tmp_path / "planning_agent.md"
fake.write_text("phase={phase} slug={slug}", encoding="utf-8")
monkeypatch.setattr(planning_session, "PLANNING_PROMPT_FILE", fake)
sess = PlanningSession("demo", "d", "/office-hours", "ch")
cmd = sess._build_cmd(
"prompt", resume=None, max_turns=20, with_system_prompt=True
)
assert "--system-prompt" in cmd
idx = cmd.index("--system-prompt")
assert "phase=/office-hours" in cmd[idx + 1]
assert "slug=demo" in cmd[idx + 1]
# ---------------------------------------------------------------------------
# start() — integration-flavoured, mocks _run_claude
# ---------------------------------------------------------------------------
class TestStart:
def test_persists_session_id_and_response(
self, tmp_planning_state, fake_workspace
):
fake_result = {
"result": "Bună! Câteva întrebări… " + PHASE_NEEDS_INPUT_MARKER,
"session_id": "claude-uuid-99",
"usage": {"input_tokens": 1000, "output_tokens": 200},
"total_cost_usd": 0.55,
"subtype": "success",
"is_error": False,
}
with patch("src.planning_session._run_claude", return_value=fake_result) as mock_run:
sess = PlanningSession.start(
slug="demo",
description="Add filter X",
phase="/office-hours",
channel_id="ch-1",
adapter="discord",
)
mock_run.assert_called_once()
# cwd kw passed
_, kwargs = mock_run.call_args
assert "cwd" in kwargs
assert sess.session_id == "claude-uuid-99"
assert "Bună" in sess.last_response
state = get_planning_state("discord", "ch-1")
assert state["session_id"] == "claude-uuid-99"
assert state["slug"] == "demo"
assert state["phase"] == "/office-hours"
def test_retries_on_error_max_turns(self, tmp_planning_state, fake_workspace):
# First call returns error_max_turns with no session_id; second returns success.
first = {
"result": "deep tool use",
"session_id": "",
"usage": {},
"total_cost_usd": 0.6,
"subtype": "error_max_turns",
"is_error": True,
}
second = {
"result": "now I have a question",
"session_id": "claude-uuid-2",
"usage": {},
"total_cost_usd": 0.7,
"subtype": "success",
"is_error": False,
}
with patch(
"src.planning_session._run_claude", side_effect=[first, second]
) as mock_run:
sess = PlanningSession.start(
slug="demo",
description="Add filter X",
phase="/office-hours",
channel_id="ch-1",
adapter="discord",
)
assert mock_run.call_count == 2
assert sess.session_id == "claude-uuid-2"
# Second call uses RETRY_MAX_TURNS
second_args = mock_run.call_args_list[1][0][0]
assert "30" in second_args # RETRY_MAX_TURNS
def test_respond_requires_session_id(self, fake_workspace):
sess = PlanningSession("demo", "d", "/x", "ch") # no session_id
with pytest.raises(RuntimeError):
sess.respond("hello")

View File

@@ -0,0 +1,273 @@
"""Tests for src/router.py planning integration (W2 — state-aware routing,
start_planning_session, planning_approve, planning_cancel)."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from src import planning_session, planning_orchestrator, router
from src.planning_session import _channel_key, _save_planning_state
@pytest.fixture
def tmp_state(tmp_path, monkeypatch):
"""Redirect planning + approved-tasks into tmp."""
sessions_dir = tmp_path / "sessions"
sessions_dir.mkdir()
monkeypatch.setattr(planning_session, "SESSIONS_DIR", sessions_dir)
monkeypatch.setattr(
planning_session, "PLANNING_STATE_FILE", sessions_dir / "planning.json"
)
# approved-tasks.json — point router at a tmp file
approved = tmp_path / "approved-tasks.json"
approved.write_text(json.dumps({"projects": [], "last_updated": None}))
monkeypatch.setattr(router, "APPROVED_TASKS_FILE", approved)
# workspace dir for planning orchestrator final-plan.md target
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "demo").mkdir()
monkeypatch.setattr(planning_session, "WORKSPACE_ROOT", workspace)
monkeypatch.setattr(planning_orchestrator, "WORKSPACE_ROOT", workspace)
yield {"sessions": sessions_dir, "approved": approved, "workspace": workspace}
def _fake_result(session_id="s-1", text="hi"):
return {
"result": text,
"session_id": session_id,
"usage": {"input_tokens": 10, "output_tokens": 5},
"total_cost_usd": 0.1,
"subtype": "success",
"is_error": False,
}
# ---------------------------------------------------------------------------
# start_planning_session
# ---------------------------------------------------------------------------
class TestStartPlanningSession:
def test_creates_entry_and_sets_status_planning(self, tmp_state):
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-1", text="first"),
):
response = router.start_planning_session(
"demo", "Add filter X", "ch-1", "discord"
)
assert response == "first"
approved = json.loads(tmp_state["approved"].read_text())
assert len(approved["projects"]) == 1
entry = approved["projects"][0]
assert entry["name"] == "demo"
assert entry["status"] == "planning"
assert entry["planning_session_id"] # uuid
def test_promotes_existing_pending_entry(self, tmp_state):
# Pre-seed an existing pending entry
approved_data = {
"projects": [
{
"name": "demo",
"description": "from earlier",
"status": "pending",
"planning_session_id": None,
"final_plan_path": None,
"proposed_at": "2026-04-26T18:00:00+00:00",
"approved_at": None,
"started_at": None,
"pid": None,
}
],
"last_updated": None,
}
tmp_state["approved"].write_text(json.dumps(approved_data))
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(session_id="s-2", text="hi"),
):
router.start_planning_session(
"demo", "Add filter X", "ch-1", "discord"
)
approved = json.loads(tmp_state["approved"].read_text())
assert len(approved["projects"]) == 1
assert approved["projects"][0]["status"] == "planning"
# ---------------------------------------------------------------------------
# planning_approve
# ---------------------------------------------------------------------------
class TestPlanningApprove:
def test_promotes_status_and_clears_state(self, tmp_state):
# Seed an active planning state + approved-tasks pending entry
_save_planning_state({
_channel_key("discord", "ch-1"): {
"slug": "demo",
"description": "x",
"phase": "__complete__",
"phases_planned": ["/office-hours"],
"phases_completed": ["/office-hours"],
"phase_index": 1,
"session_id": "s-uuid",
"planning_session_id": "ps-uuid",
"final_plan_path": "/tmp/final-plan.md",
"adapter": "discord",
"channel_id": "ch-1",
"started_at": "2026-04-26T20:00:00+00:00",
"updated_at": "2026-04-26T20:30:00+00:00",
"last_text_excerpt": "done",
"last_subtype": "success",
}
})
approved_data = {
"projects": [
{
"name": "demo",
"description": "x",
"status": "planning",
"planning_session_id": "ps-uuid",
"final_plan_path": None,
"proposed_at": "2026-04-26T18:00:00+00:00",
"approved_at": None,
"started_at": None,
"pid": None,
}
],
"last_updated": None,
}
tmp_state["approved"].write_text(json.dumps(approved_data))
msg = router.planning_approve("ch-1", "discord")
assert "Aprobat" in msg or "" in msg
approved = json.loads(tmp_state["approved"].read_text())
entry = approved["projects"][0]
assert entry["status"] == "approved"
assert entry["approved_at"] is not None
assert entry["planning_session_id"] is None
assert entry["final_plan_path"] # set
# Planning state cleared
from src.planning_session import is_in_planning
assert is_in_planning("discord", "ch-1") is False
def test_no_state_returns_error_message(self, tmp_state):
msg = router.planning_approve("ch-missing", "discord")
assert "Nu există" in msg
# ---------------------------------------------------------------------------
# planning_cancel via route_message /cancel
# ---------------------------------------------------------------------------
class TestRouteMessagePlanningCancel:
def test_slash_cancel_in_planning_clears_state(self, tmp_state):
# Seed a planning session and approved-tasks pending entry
_save_planning_state({
_channel_key("discord", "ch-1"): {
"slug": "demo",
"description": "x",
"phase": "/office-hours",
"phases_planned": ["/office-hours", "/plan-ceo-review"],
"phases_completed": [],
"phase_index": 0,
"session_id": "s-uuid",
"planning_session_id": "ps-uuid",
"adapter": "discord",
"channel_id": "ch-1",
"started_at": "2026-04-26T20:00:00+00:00",
"updated_at": "2026-04-26T20:00:00+00:00",
"last_text_excerpt": "Hi",
"last_subtype": "success",
}
})
approved_data = {
"projects": [
{
"name": "demo",
"description": "x",
"status": "planning",
"planning_session_id": "ps-uuid",
"final_plan_path": None,
"proposed_at": "2026-04-26T18:00:00+00:00",
"approved_at": None,
"started_at": None,
"pid": None,
}
],
"last_updated": None,
}
tmp_state["approved"].write_text(json.dumps(approved_data))
response, is_cmd = router.route_message(
"ch-1", "user-1", "/cancel", adapter_name="discord"
)
assert is_cmd is True
assert "anulat" in response.lower()
approved = json.loads(tmp_state["approved"].read_text())
assert approved["projects"][0]["status"] == "pending"
from src.planning_session import is_in_planning
assert is_in_planning("discord", "ch-1") is False
class TestRouteMessagePlanningRespond:
def test_plain_message_in_planning_routes_to_orchestrator(self, tmp_state):
# Seed a planning session
_save_planning_state({
_channel_key("discord", "ch-1"): {
"slug": "demo",
"description": "x",
"phase": "/office-hours",
"phases_planned": ["/office-hours"],
"phases_completed": [],
"phase_index": 0,
"session_id": "s-uuid",
"planning_session_id": "ps-uuid",
"adapter": "discord",
"channel_id": "ch-1",
"started_at": "2026-04-26T20:00:00+00:00",
"updated_at": "2026-04-26T20:00:00+00:00",
"last_text_excerpt": "Hi",
"last_subtype": "success",
}
})
with patch(
"src.planning_session._run_claude",
return_value=_fake_result(
session_id="s-uuid", text="thanks PHASE_STATUS: needs_input"
),
) as mock_run:
response, is_cmd = router.route_message(
"ch-1", "user-1", "Vreau așa ceva.", adapter_name="discord"
)
mock_run.assert_called_once()
# respond uses --resume
cmd = mock_run.call_args[0][0]
assert "--resume" in cmd
assert is_cmd is False
assert "thanks" in response
def test_no_planning_state_falls_through_to_normal_routing(self, tmp_state):
# No planning state — should go to ralph dispatch / Claude.
with patch(
"src.router.send_message", return_value="claude says hi"
) as mock_send:
response, is_cmd = router.route_message(
"ch-1", "user-1", "hello",
adapter_name="discord",
model="sonnet",
)
mock_send.assert_called_once()
assert response == "claude says hi"
assert is_cmd is False