"""Cookie-based authentication for the unified dashboard. This mixin provides: - POST /api/auth/login — exchanges a token (form body) for a cookie. - POST /api/auth/logout — clears the cookie. - _check_dashboard_cookie — used by the global POST middleware (and the SSE GET endpoint) to gate access. `DASHBOARD_TOKEN` is read once from `dashboard/.env` (loaded into `os.environ` by `dashboard/constants.py` at import time). When the token is not configured we generate a random one at startup, stash it in-process, and warn loudly to stderr — this means the dashboard is reachable from localhost only with a freshly-printed token (printed once at boot). """ from __future__ import annotations import json import logging import os import secrets import sys from urllib.parse import parse_qs log = logging.getLogger(__name__) # 30 days _COOKIE_MAX_AGE = 60 * 60 * 24 * 30 _COOKIE_NAME = "dashboard" _COOKIE_PATH = "/echo/" # Module-level cache for the resolved token. Set lazily on first call so # importing this module doesn't have a side effect at process boot. _DASHBOARD_TOKEN: str | None = None def _get_dashboard_token() -> str: """Return the dashboard token (cached). Generates a random one if absent. `dashboard/constants.py` already loads `dashboard/.env` into os.environ at import time, so by the time this is called the value (if present) is in `os.environ['DASHBOARD_TOKEN']`. If missing, we mint a 32-byte URL-safe token and warn — operators must read it from the log to log in. """ global _DASHBOARD_TOKEN if _DASHBOARD_TOKEN is not None: return _DASHBOARD_TOKEN token = os.environ.get("DASHBOARD_TOKEN", "").strip() if not token: token = secrets.token_urlsafe(32) msg = ( "[auth] DASHBOARD_TOKEN not set in dashboard/.env — generated a " f"random token for this process: {token}\n" " Add `DASHBOARD_TOKEN=` to dashboard/.env to make it " "stable across restarts.\n" ) print(msg, file=sys.stderr, flush=True) log.warning("DASHBOARD_TOKEN not configured — using ephemeral token") _DASHBOARD_TOKEN = token return token def _parse_cookie_header(raw: str) -> dict[str, str]: """Tiny RFC 6265 cookie-pair parser. Last-write-wins on duplicates.""" out: dict[str, str] = {} if not raw: return out for chunk in raw.split(";"): chunk = chunk.strip() if not chunk or "=" not in chunk: continue k, v = chunk.split("=", 1) out[k.strip()] = v.strip() return out class AuthHandlers: """Mixin: /api/auth/login, /api/auth/logout, plus _check_dashboard_cookie.""" # ── helpers ──────────────────────────────────────────────────────── def _check_dashboard_cookie(self) -> bool: """Return True if the request carries a valid `dashboard` cookie.""" raw = self.headers.get("Cookie", "") or "" cookies = _parse_cookie_header(raw) provided = cookies.get(_COOKIE_NAME, "") if not provided: return False expected = _get_dashboard_token() # Constant-time compare — token guess attacks aren't realistic here # (cookie path is /echo/, HttpOnly), but cheap defense in depth. return secrets.compare_digest(provided, expected) def _read_form_body(self) -> dict[str, str]: """Parse `application/x-www-form-urlencoded` POST body.""" try: length = int(self.headers.get("Content-Length", "0") or "0") except (TypeError, ValueError): length = 0 if length <= 0: return {} try: raw = self.rfile.read(length).decode("utf-8") except (UnicodeDecodeError, OSError): return {} parsed = parse_qs(raw, keep_blank_values=True) # Flatten — single-value form fields only return {k: v[0] for k, v in parsed.items() if v} # ── POST /api/auth/login ─────────────────────────────────────────── def handle_login(self): """Validate token from form body; on success, set cookie + 302 to workspace. On failure, return 401 JSON. The cookie is set with HttpOnly + SameSite=Strict; Path=/echo/ so it scopes to the dashboard reverse proxy mount. """ # Accept JSON body too (login.html might POST JSON in Lane B2) ctype = (self.headers.get("Content-Type", "") or "").lower() if "application/json" in ctype: try: length = int(self.headers.get("Content-Length", "0") or "0") raw = self.rfile.read(length).decode("utf-8") if length > 0 else "" form = json.loads(raw) if raw else {} if not isinstance(form, dict): form = {} except (ValueError, json.JSONDecodeError, UnicodeDecodeError, OSError): form = {} else: form = self._read_form_body() provided = (form.get("token") or "").strip() expected = _get_dashboard_token() if not provided or not secrets.compare_digest(provided, expected): body = json.dumps({"error": "Invalid token"}).encode("utf-8") self.send_response(401) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.send_header("Cache-Control", "no-store") self.end_headers() try: self.wfile.write(body) except (BrokenPipeError, ConnectionResetError): pass return cookie = ( f"{_COOKIE_NAME}={expected}; HttpOnly; SameSite=Strict; " f"Path={_COOKIE_PATH}; Max-Age={_COOKIE_MAX_AGE}" ) self.send_response(302) self.send_header("Set-Cookie", cookie) self.send_header("Location", "/echo/workspace.html") self.send_header("Content-Length", "0") self.send_header("Cache-Control", "no-store") self.end_headers() # ── POST /api/auth/logout ────────────────────────────────────────── def handle_logout(self): """Clear the dashboard cookie. Returns 200 JSON `{"ok": true}`.""" cookie = ( f"{_COOKIE_NAME}=; HttpOnly; SameSite=Strict; " f"Path={_COOKIE_PATH}; Max-Age=0" ) body = json.dumps({"ok": True}).encode("utf-8") self.send_response(200) self.send_header("Set-Cookie", cookie) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.send_header("Cache-Control", "no-store") self.end_headers() try: self.wfile.write(body) except (BrokenPipeError, ConnectionResetError): pass