"""Ralph UX flow state — short-lived per (adapter, chat, user) state for interactive menus. Tracks state for multi-step flows like "user clicked Propose → next message is description". Persisted in sessions/ralph_flow.json so it survives Echo Core restart. TTL: 10 min default; cleanup_expired() drops stale entries. """ import json import logging import os import tempfile from datetime import datetime, timedelta, timezone from pathlib import Path log = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).resolve().parent.parent SESSIONS_DIR = PROJECT_ROOT / "sessions" _STATE_FILE = SESSIONS_DIR / "ralph_flow.json" DEFAULT_TTL_SECONDS = 600 # 10 minutes # Step values used across adapters STEP_INPUT_DESCRIPTION = "input_description" STEP_IN_PLANNING = "in_planning" # reserved for W2 (planning agent) def _key(adapter: str, chat_id: str, user_id: str) -> str: return f"{adapter}:{chat_id}:{user_id}" def _load() -> dict: try: text = _STATE_FILE.read_text(encoding="utf-8") if not text.strip(): return {} return json.loads(text) except (FileNotFoundError, json.JSONDecodeError): return {} def _save(data: dict) -> None: SESSIONS_DIR.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp( dir=SESSIONS_DIR, prefix=".ralph_flow_", suffix=".json" ) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write("\n") os.replace(tmp_path, _STATE_FILE) except BaseException: try: os.unlink(tmp_path) except OSError: pass raise def _is_expired(entry: dict, now: datetime | None = None) -> bool: expires_at = entry.get("expires_at") if not expires_at: return False try: return datetime.fromisoformat(expires_at) < (now or datetime.now(timezone.utc)) except ValueError: return True def get_state(adapter: str, chat_id: str, user_id: str) -> dict | None: """Return current state or None if absent/expired. Drops expired entries on read.""" data = _load() key = _key(adapter, chat_id, user_id) entry = data.get(key) if entry is None: return None if _is_expired(entry): del data[key] _save(data) return None return entry def set_state( adapter: str, chat_id: str, user_id: str, step: str, project: str | None = None, ttl_seconds: int = DEFAULT_TTL_SECONDS, **extras, ) -> None: """Set state for (adapter, chat, user). Overwrites any previous state.""" data = _load() expires_at = ( datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds) ).isoformat() entry: dict = {"step": step, "expires_at": expires_at} if project is not None: entry["project"] = project entry.update(extras) data[_key(adapter, chat_id, user_id)] = entry _save(data) def clear_state(adapter: str, chat_id: str, user_id: str) -> bool: """Clear state. Returns True if anything was cleared.""" data = _load() key = _key(adapter, chat_id, user_id) if key in data: del data[key] _save(data) return True return False def cleanup_expired() -> int: """Drop all expired entries. Returns count dropped.""" data = _load() now = datetime.now(timezone.utc) dropped = 0 for k in list(data.keys()): if _is_expired(data[k], now=now): del data[k] dropped += 1 if dropped: _save(data) return dropped