"""PlanningOrchestrator — multi-phase planning coordinator. Sequences `/office-hours → /plan-ceo-review → /plan-eng-review → /plan-design-review` (last only when description hints at UI scope), each in a **fresh subprocess** (per W2 plan + spike findings). Phases coordinate via disk artifacts (gstack convention: `~/.gstack/projects//...`). API used by router/adapters: PlanningOrchestrator.start(slug, description, channel_id, adapter) → (session, first_response_text) PlanningOrchestrator.respond(adapter, channel_id, message) → (session, response_text, phase_ready: bool) PlanningOrchestrator.advance(adapter, channel_id) → (session, first_response_text, completed: bool) # completed=True when no further phase remains; final-plan stub # written by the orchestrator if the planning agent didn't. PlanningOrchestrator.cancel(adapter, channel_id) → bool PlanningOrchestrator.has_ui_scope(description) → bool The orchestrator writes the final-plan stub path even if planning agent did not (so PRD generator always has something to read in W3). """ from __future__ import annotations import logging import re from pathlib import Path from typing import Callable from src.planning_session import ( GSTACK_PROJECTS_ROOT, PHASE_READY_MARKER, WORKSPACE_ROOT, PlanningSession, _load_planning_state, _save_planning_state, _channel_key, clear_planning_state, get_planning_state, ) log = logging.getLogger(__name__) # Ordered phase pipeline. The design phase is appended only when the # description hints at UI scope (heuristic — see has_ui_scope). 
BASE_PHASES = ["/office-hours", "/plan-ceo-review", "/plan-eng-review"]
DESIGN_PHASE = "/plan-design-review"

# Sentinel stored in the persisted "phase" field once the pipeline has finished.
_COMPLETE_PHASE = "__complete__"

# Keywords whose presence in a description suggests UI work.
# The very short / ambiguous tokens (`ui`, `ux`, `form`) must match as whole
# words: as bare prefixes they fire on unrelated words such as "uid",
# "uint64", Romanian "uita" or "format". The longer keywords stay prefix
# matches so inflected forms ("buttons", "ecranul", "formularul") still hit.
# The outer group is kept capturing so `match.group(1)` is the keyword.
UI_HINT_PATTERN = re.compile(
    r"\b((?:ui|ux|forms?)\b|frontend|design|button|page|css|html|interfa[țt]?[aăă]?|"
    r"layout|component|view|dashboard|modal|screen|"
    # Romanian variants
    r"pagin[ăa]|buton|ecran|formular)",
    re.IGNORECASE,
)


def has_ui_scope(description: str) -> bool:
    """Return True when `description` mentions a UI-related keyword.

    Cheap heuristic used to decide whether `/plan-design-review` is appended
    to the pipeline. `None` and the empty string are treated as "no UI scope".
    """
    return bool(UI_HINT_PATTERN.search(description or ""))


def _phases_for(description: str) -> list[str]:
    """Return the ordered phase list for `description` (a fresh list each call)."""
    phases = list(BASE_PHASES)
    if has_ui_scope(description):
        phases.append(DESIGN_PHASE)
    return phases


def _final_plan_path(slug: str) -> Path:
    """Return `<WORKSPACE_ROOT>/<slug>/scripts/ralph/final-plan.md`."""
    return WORKSPACE_ROOT / slug / "scripts" / "ralph" / "final-plan.md"


def _ensure_final_plan_dir(slug: str) -> Path:
    """Create the parent directory of the final plan and return the plan path."""
    target = _final_plan_path(slug)
    target.parent.mkdir(parents=True, exist_ok=True)
    return target


def _update_channel_state(adapter: str, channel_id: str, **fields: object) -> None:
    """Merge `fields` into the persisted entry for this channel and save.

    Does nothing when the channel has no persisted entry.
    """
    data = _load_planning_state()
    key = _channel_key(adapter, channel_id)
    if key in data:
        data[key].update(fields)
        _save_planning_state(data)


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------


class PlanningOrchestrator:
    """Stateless coordinator — all state is in `sessions/planning.json`."""

    @staticmethod
    def start(
        slug: str,
        description: str,
        channel_id: str,
        adapter: str = "echo",
        on_text: Callable[[str], None] | None = None,
    ) -> tuple[PlanningSession, str]:
        """Begin planning at phase 0 (`/office-hours`) and persist the pipeline.

        Any prior planning state for the channel is cleared first.

        Returns:
            (PlanningSession, first_response_text).
        """
        phases = _phases_for(description)
        first_phase = phases[0]
        log.info(
            "planning.start slug=%s adapter=%s channel=%s phases=%s",
            slug,
            adapter,
            channel_id,
            phases,
        )

        # Wipe any prior state for this channel (start fresh).
        clear_planning_state(adapter, channel_id)

        session = PlanningSession.start(
            slug=slug,
            description=description,
            phase=first_phase,
            channel_id=channel_id,
            adapter=adapter,
            on_text=on_text,
        )

        # Stash phase plan into disk state so advance() knows the pipeline.
        _update_channel_state(
            adapter,
            channel_id,
            phases_planned=phases,
            phase_index=0,
            phases_completed=[],
        )
        return session, session.last_response

    @staticmethod
    def respond(
        adapter: str,
        channel_id: str,
        message: str,
        on_text: Callable[[str], None] | None = None,
    ) -> tuple[PlanningSession | None, str, bool]:
        """Forward `message` into the active phase of this channel's session.

        Returns:
            (session, response_text, phase_ready). `phase_ready` is whatever
            `session.is_phase_ready()` reports after the response; the adapter
            is expected to surface a "continue / finalize" control when it is
            True. When no session exists, returns (None, <error text>, False).
        """
        session = PlanningSession.from_state(adapter, channel_id)
        if session is None:
            return None, "Nu există o sesiune de planning activă pe acest canal.", False

        text = session.respond(message, on_text=on_text)
        return session, text, session.is_phase_ready()

    @staticmethod
    def _next_phase_index(phases: list[str], current_phase: str | None) -> int:
        """Return the index in `phases` of the phase that follows `current_phase`.

        * A finished pipeline (`current_phase` is the completion sentinel)
          yields `len(phases)`, i.e. "nothing left to run".
        * An unknown or missing current phase yields 0 (start of pipeline).
        """
        if current_phase == _COMPLETE_PHASE:
            return len(phases)
        try:
            return phases.index(current_phase) + 1
        except ValueError:
            return 0

    @staticmethod
    def advance(
        adapter: str,
        channel_id: str,
        on_text: Callable[[str], None] | None = None,
    ) -> tuple[PlanningSession | None, str, bool]:
        """Move to the next phase (fresh subprocess).

        Returns:
            (session, text, completed). When no phase remains, a
            `final-plan.md` stub is written if the file does not exist yet and
            (last_session, summary_text, True) is returned. Calling `advance`
            again on an already completed pipeline returns the same summary
            instead of restarting the pipeline. When the channel has no
            planning state, returns (None, <error text>, False).

        Raises:
            KeyError: if the persisted state has no "slug" entry.
        """
        state = get_planning_state(adapter, channel_id)
        if not state:
            return None, "Nu există o sesiune de planning activă.", False

        description = state.get("description", "")
        phases = state.get("phases_planned") or _phases_for(description)
        current_phase = state.get("phase")

        # The completion sentinel is not a real phase: never record it as
        # completed, otherwise a repeated advance() would pollute the list.
        completed = list(state.get("phases_completed") or [])
        if (
            current_phase
            and current_phase != _COMPLETE_PHASE
            and current_phase not in completed
        ):
            completed.append(current_phase)

        next_idx = PlanningOrchestrator._next_phase_index(phases, current_phase)
        slug = state["slug"]

        if next_idx >= len(phases):
            # Pipeline complete — ensure final-plan.md exists, return summary.
            target = _ensure_final_plan_dir(slug)
            if not target.exists():
                stub = _build_final_plan_stub(slug, description, completed, state)
                target.write_text(stub, encoding="utf-8")
                log.info("planning.advance wrote final-plan stub: %s", target)

            # Persist completion marker but keep state so the adapter can
            # still show its follow-up buttons.
            _update_channel_state(
                adapter,
                channel_id,
                phases_completed=completed,
                phase=_COMPLETE_PHASE,
                final_plan_path=str(target),
            )

            session = PlanningSession.from_state(adapter, channel_id)
            return session, _summary_text(slug, completed, target), True

        next_phase = phases[next_idx]
        log.info(
            "planning.advance slug=%s adapter=%s channel=%s %s → %s",
            slug,
            adapter,
            channel_id,
            current_phase,
            next_phase,
        )

        # Fresh subprocess for the next phase. Phase coordinates with prior
        # phase via gstack disk artifacts (~/.gstack/projects/<slug>/).
        session = PlanningSession.start(
            slug=slug,
            description=description,
            phase=next_phase,
            channel_id=channel_id,
            adapter=adapter,
            on_text=on_text,
        )
        _update_channel_state(
            adapter,
            channel_id,
            phases_completed=completed,
            phase_index=next_idx,
            phases_planned=phases,
        )
        return session, session.last_response, False

    @staticmethod
    def cancel(adapter: str, channel_id: str) -> bool:
        """Drop planning state; return the result of `clear_planning_state`."""
        return clear_planning_state(adapter, channel_id)

    @staticmethod
    def final_plan_path(slug: str) -> Path:
        """Return the path where the final plan for `slug` is stored."""
        return _final_plan_path(slug)

    # Re-exported for convenience.
    has_ui_scope = staticmethod(has_ui_scope)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _summary_text(slug: str, completed_phases: list[str], plan_path: Path) -> str:
    """Build the user-facing (Romanian) completion message for the adapter."""
    phases_str = " → ".join(completed_phases) if completed_phases else "(nicio fază)"
    return (
        f"✅ **Plan gata pentru `{slug}`**\n\n"
        f"Faze rulate: {phases_str}\n"
        f"Plan salvat: `{plan_path}`\n\n"
        "Apasă **Dau drumul tonight** ca Ralph să-l implementeze la 23:00, "
        "sau **Anulează** dacă vrei să mai gândim."
    )


def _build_final_plan_stub(
    slug: str, description: str, completed_phases: list[str], state: dict
) -> str:
    """Emit a minimal final-plan.md when the planning agent didn't write one.

    Captures what we know (description, completed phases and, when present in
    `state["last_text_excerpt"]`, up to 2000 characters of the last agent
    output) so the PRD generator (W3) has something concrete to read.
    """
    phases_lines = "\n".join(f"- `{p}`" for p in completed_phases) or "- (none)"
    last_excerpt = (state.get("last_text_excerpt") or "").strip()
    last_block = (
        f"\n\n## Last agent output excerpt\n\n```\n{last_excerpt[:2000]}\n```"
        if last_excerpt
        else ""
    )
    return f"""# Final plan — {slug}

## Context

{description}

## Phases completed

{phases_lines}

## Architecture overview

(To be filled by planning agent. Stub written by PlanningOrchestrator because the agent didn't write its own `final-plan.md` before pipeline completion.)

## User stories preliminare

(Stub. Ralph PRD generator will infer concrete stories from this plan + repo state.)

## Implementation hints

(Stub.)

## Verification approach

- typecheck + lint + tests pe modulele atinse
- smart gates Ralph pe tags inferred per story
{last_block}
"""