From 23666f791036d9d175de9404330869bfd8611ed1 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Wed, 27 May 2026 14:55:57 +0000 Subject: [PATCH] =?UTF-8?q?feat(voice):=20Pas=205=20=E2=80=94=20voice/pipe?= =?UTF-8?q?line.py=20VoiceSession=20+=20EchoVoiceSink=20+=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Central voice pipeline (~250 LOC + docstrings = ~430 lines): VoiceSession (context manager + idempotent cleanup pe 5 căi): - __enter__: acquire _lock, open JSONL (record=on) - __exit__: calls cleanup("exit"), nu suprimă exceptions - cleanup(reason): IDEMPOTENT, side effects o singură dată — JSONL flush+close (record=on) sau delete (record=off), bot presence cleared, voice_client.cleanup(), ttsq.stop(), cancel filler task, lock release, structured log la logs/voice_metrics.jsonl - on_segment_done(speaker_id, text, no_speech_prob): mirror text channel, append JSONL, arm 3s filler timer, route_message cu on_text callback + cancel filler la first block - last_activity_ts: time.monotonic() — caller-driven 5min auto-leave EchoVoiceSink(session, bot_user_id): - wants_opus() False (PCM) - write() runs în voice_recv reader thread (threading primitives only): - GUARD 1: user None/id==0/id==bot_user_id → return (load-bearing echo prevention) - GUARD 2: whitelist filter (empty = allow all) - Buffer 20ms packets per-user → batch 100ms (5×20ms = 19200 bytes) → silero-vad threshold 0.5 → 800ms cumulative silence flush - _flush_to_stt: faster-whisper small int8 cpu_threads=4 lang=ro beam_size=1, no_speech_prob > 0.6 drop, schedule on_segment_done via run_coroutine_threadsafe pe session.loop Module helpers (lazy thread-safe singletons): _get_whisper_model, _get_silero_vad. Constants: FILLER_DELAY_S=3.0, SILENCE_FLUSH_MS=800, VAD_THRESHOLD=0.5, VAD_WINDOW_MS=100, NO_SPEECH_DROP_THRESHOLD=0.6. Decisions: - STT runs in audio thread — acceptable la 2.25s p50 (user just stopped talking, no batching contention). Wrap în ThreadPoolExecutor.submit if perf bites later. - Downsample 48k→16k via 3-sample averaging (no scipy dep). Whisper robust la mild aliasing. - Energy-RMS VAD fallback dacă torch import fail — graceful degrade. - router_route_message injection seam ca kwarg pentru testabilitate. - bot.change_presence handling cross-thread via run_coroutine_threadsafe. tests/test_voice_session_cleanup.py — 6 tests: - voice_leave / disconnect / crash via __exit__ / auto_leave / user_left_channel (5 cleanup paths each verified for: JSONL state, presence cleared, voice_client.cleanup, ttsq.stop, lock release, idempotency) - 1 robustness cross-cut (double-cleanup safety) 6/6 PASS. Regression suite 63/63 PASS (normalize + adapter + mutex). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/voice/pipeline.py | 551 ++++++++++++++++++++++++++++ tests/test_voice_session_cleanup.py | 319 ++++++++++++++++ 2 files changed, 870 insertions(+) create mode 100644 src/voice/pipeline.py create mode 100644 tests/test_voice_session_cleanup.py diff --git a/src/voice/pipeline.py b/src/voice/pipeline.py new file mode 100644 index 0000000..87bfe48 --- /dev/null +++ b/src/voice/pipeline.py @@ -0,0 +1,551 @@ +"""Central voice pipeline: VAD -> STT -> Claude -> TTS for Discord voice. + +``VoiceSession`` binds per-call state — voice_client, TTS queue, transcript +JSONL buffer, whitelist, presence — and exposes a single idempotent +``cleanup()`` invoked from every exit path (user /voice leave, network +disconnect, crash via ``__exit__``, auto-leave timer, user leaves channel). + +``EchoVoiceSink`` is the discord-ext-voice-recv ``AudioSink`` subclass that +runs in the voice_recv reader thread. It batches 20ms PCM packets into +100ms windows for silero-vad inference, marks per-user speech timestamps, +and on 800ms cumulative silence flushes the accumulated audio through +faster-whisper. Hallucinated segments (``no_speech_prob > 0.6``) are +dropped. Valid transcripts are scheduled onto the session's event loop +via ``asyncio.run_coroutine_threadsafe``. + +The bot's own ``user.id`` is filtered FIRST inside ``write()`` — load-bearing +echo prevention so a future whitelist expansion (Bianca, etc.) never lets +the bot transcribe itself. + +See plan: ``src/voice/pipeline.py`` (Pas 5), Engineering decisions #4 +(VAD 100ms batched), #5 (cleanup centralizat), #7 (bot.user.id explicit +guard), #8 (filler audio ``thinking.wav`` at 3s pre-first-block). +""" +from __future__ import annotations + +import asyncio +import json +import logging +import threading +import time +from pathlib import Path +from typing import Any, Callable, Optional + +import numpy as np + +from src.voice._discord_voice_adapter import AudioSink, VoiceData + +log = logging.getLogger(__name__) + + +# Discord delivers 48kHz s16le stereo PCM, 20ms per packet (3840 bytes). +SAMPLE_RATE_DISCORD = 48000 +SAMPLE_RATE_WHISPER = 16000 +PACKET_MS = 20 +PACKET_BYTES = 3840 # 48000 Hz * 0.020 s * 2 channels * 2 bytes +VAD_WINDOW_MS = 100 # batch 5 * 20ms packets per VAD inference (Decision #4) +VAD_WINDOW_BYTES = PACKET_BYTES * (VAD_WINDOW_MS // PACKET_MS) +VAD_THRESHOLD = 0.5 +SILENCE_FLUSH_MS = 800 +NO_SPEECH_DROP_THRESHOLD = 0.6 +FILLER_DELAY_S = 3.0 + +PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent +LOGS_DIR = PROJECT_ROOT / "logs" +VOICE_METRICS_PATH = LOGS_DIR / "voice_metrics.jsonl" + + +# ---------- Lazy model singletons ---------- + +_whisper_model: Any = None +_whisper_lock = threading.Lock() +_silero_model: Any = None +_silero_get_timestamps: Any = None +_silero_lock = threading.Lock() + + +def _get_whisper_model() -> Any: + """Lazy-load faster-whisper ``small`` int8 with the spike-validated + ``cpu_threads=4`` (see ``tasks/voice-bench-results.md``).""" + global _whisper_model + if _whisper_model is not None: + return _whisper_model + with _whisper_lock: + if _whisper_model is not None: + return _whisper_model + from faster_whisper import WhisperModel + _whisper_model = WhisperModel( + "small", device="cpu", compute_type="int8", cpu_threads=4, + ) + return _whisper_model + + +def _get_silero_vad(): + """Lazy-load silero-vad. Returns ``(model, get_speech_timestamps)``.""" + global _silero_model, _silero_get_timestamps + if _silero_model is not None: + return _silero_model, _silero_get_timestamps + with _silero_lock: + if _silero_model is not None: + return _silero_model, _silero_get_timestamps + from silero_vad import get_speech_timestamps, load_silero_vad + _silero_model = load_silero_vad() + _silero_get_timestamps = get_speech_timestamps + return _silero_model, _silero_get_timestamps + + +# ---------- Audio helpers ---------- + +def _pcm48_stereo_to_16_mono(pcm: bytes) -> np.ndarray: + """Discord 48kHz s16le stereo bytes -> 16kHz mono float32 in [-1, 1]. + + Cheap downsample: average the two channels, then average every 3 + samples (48k / 3 = 16k). faster-whisper + silero-vad accept the + resulting ``np.float32`` array directly. + """ + if not pcm: + return np.zeros(0, dtype=np.float32) + samples = np.frombuffer(pcm, dtype=np.int16) + if samples.size % 2 != 0: + samples = samples[:-1] + stereo = samples.reshape(-1, 2) + mono = stereo.mean(axis=1).astype(np.float32) / 32768.0 + if mono.size == 0: + return mono + trim = (mono.size // 3) * 3 + if trim == 0: + return np.zeros(0, dtype=np.float32) + mono = mono[:trim].reshape(-1, 3).mean(axis=1) + return mono.astype(np.float32) + + +# ---------- VoiceSession ---------- + +class VoiceSession: + """Per-voice-call state with a single idempotent ``cleanup()``.""" + + def __init__( + self, + *, + channel_id: int, + guild_id: int, + text_channel: Any, + voice_client: Any, + bot: Any, + ttsq: Any, + whitelist: Optional[set] = None, + record_enabled: bool = False, + mirror_enabled: bool = True, + transcripts_jsonl_path: Optional[Path] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + router_route_message: Optional[Callable] = None, + ): + self.channel_id = int(channel_id) + self.guild_id = int(guild_id) + self.text_channel = text_channel + self.voice_client = voice_client + self.bot = bot + self.ttsq = ttsq + self.whitelist: set = set(whitelist or set()) + self.record_enabled = bool(record_enabled) + self.mirror_enabled = bool(mirror_enabled) + self.transcripts_jsonl_path = transcripts_jsonl_path + self.loop = loop + # Injection seam so tests can replace router.route_message without + # mocking the whole module. + if router_route_message is None: + from src.router import route_message as _rm + self._route_message = _rm + else: + self._route_message = router_route_message + + self.last_activity_ts = time.monotonic() + self._jsonl_fh = None + self._lock = threading.Lock() + self._cleaned_up = False + self._lock_owner_thread: Optional[int] = None + self._filler_task: Optional[asyncio.Task] = None + self._first_block_seen = False + + # ----- context manager ----- + + def __enter__(self) -> "VoiceSession": + self._lock.acquire() + self._lock_owner_thread = threading.get_ident() + if self.record_enabled and self.transcripts_jsonl_path is not None: + try: + self.transcripts_jsonl_path.parent.mkdir( + parents=True, exist_ok=True, + ) + self._jsonl_fh = open( + self.transcripts_jsonl_path, "a", + buffering=1, encoding="utf-8", + ) + except OSError as e: + log.warning("voice transcript open failed: %s", e) + self._jsonl_fh = None + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + self.cleanup("exit") + return False # never suppress exceptions + + # ----- cleanup (centralized, idempotent) ----- + + def cleanup(self, reason: str) -> None: + """Single drain path for ALL 5 exit scenarios. Safe to call twice.""" + if self._cleaned_up: + return + self._cleaned_up = True + + # 1. Flush or discard JSONL transcript. + if self._jsonl_fh is not None: + try: + self._jsonl_fh.flush() + self._jsonl_fh.close() + except Exception as e: # noqa: BLE001 + log.warning("voice transcript flush failed: %s", e) + self._jsonl_fh = None + if (not self.record_enabled + and self.transcripts_jsonl_path is not None + and self.transcripts_jsonl_path.exists()): + try: + self.transcripts_jsonl_path.unlink() + except OSError: + pass + + # 2. Restore bot presence (clear Listening activity). + if self.bot is not None: + try: + change = getattr(self.bot, "change_presence", None) + if callable(change): + coro = change(activity=None) + if asyncio.iscoroutine(coro): + if self.loop is not None and self.loop.is_running(): + asyncio.run_coroutine_threadsafe(coro, self.loop) + else: + # Best-effort: close the coroutine so Python + # doesn't emit "coroutine was never awaited". + coro.close() + except Exception as e: # noqa: BLE001 + log.warning("voice presence restore failed: %s", e) + + # 3. Tear down the voice client. + if self.voice_client is not None: + try: + self.voice_client.cleanup() + except Exception as e: # noqa: BLE001 + log.warning("voice_client.cleanup failed: %s", e) + + # 4. Stop the TTS queue worker. + if self.ttsq is not None: + try: + self.ttsq.stop() + except Exception as e: # noqa: BLE001 + log.warning("ttsq.stop failed: %s", e) + + # 5. Cancel pending filler task. + if self._filler_task is not None and not self._filler_task.done(): + try: + self._filler_task.cancel() + except Exception: # noqa: BLE001 + pass + + # 6. Release the session lock (held since __enter__). + try: + if self._lock.locked(): + self._lock.release() + except RuntimeError: + # Released from a different thread than acquired it — already + # free for the next caller; nothing to do. + pass + + self._log_metric({"event": "cleanup", "reason": reason}) + + # ----- segment completion (scheduled from sink) ----- + + async def on_segment_done( + self, + speaker_id: int, + text: str, + no_speech_prob: float, + ) -> None: + """Mirror, persist, route to Claude, drive TTS via streaming callback.""" + if self._cleaned_up: + return + self.last_activity_ts = time.monotonic() + speaker_name = self._resolve_speaker_name(speaker_id) + + # 1. Mirror to text channel (one Unicode 🎤 — exception per plan). + if self.mirror_enabled and self.text_channel is not None: + try: + send = getattr(self.text_channel, "send", None) + if callable(send): + coro = send(f"\U0001f3a4 {speaker_name}: \"{text}\"") + if asyncio.iscoroutine(coro): + await coro + except Exception as e: # noqa: BLE001 + log.warning("voice mirror send failed: %s", e) + + # 2. Append to JSONL transcript buffer if recording. + if self._jsonl_fh is not None: + try: + self._jsonl_fh.write( + json.dumps({ + "ts": time.time(), + "speaker_id": speaker_id, + "speaker": speaker_name, + "text": text, + "no_speech_prob": no_speech_prob, + }, ensure_ascii=False) + "\n" + ) + except Exception as e: # noqa: BLE001 + log.warning("voice transcript write failed: %s", e) + + # 3. Arm the 3s filler timer — fires only if no Claude block arrives. + self._first_block_seen = False + if self._filler_task is not None and not self._filler_task.done(): + self._filler_task.cancel() + try: + self._filler_task = asyncio.create_task(self._filler_after_delay()) + except RuntimeError: + # No running loop (test path). Skip the timer. + self._filler_task = None + + def voice_stream_callback(block: str) -> None: + """Called once per Claude streamed text block — pushes to TTS + and cancels the filler on first arrival.""" + if not self._first_block_seen: + self._first_block_seen = True + ft = self._filler_task + if ft is not None and not ft.done(): + try: + ft.cancel() + except Exception: # noqa: BLE001 + pass + try: + self.ttsq.push_text(block) + except Exception as e: # noqa: BLE001 + log.warning("ttsq.push_text failed: %s", e) + + # 4. Dispatch to Claude. send_message is sync subprocess, run on + # a worker thread so the loop stays responsive for mirror/TTS. + try: + await asyncio.to_thread( + self._route_message, + str(self.channel_id), + str(speaker_id), + text, + None, # model + voice_stream_callback, # on_text + "discord-voice", # adapter_name + ) + except Exception as e: # noqa: BLE001 + log.error("route_message voice path failed: %s", e) + + async def _filler_after_delay(self) -> None: + """Push ``assets/voice/thinking.wav`` after FILLER_DELAY_S if Claude + hasn't produced a first block yet.""" + try: + await asyncio.sleep(FILLER_DELAY_S) + except asyncio.CancelledError: + return + if self._first_block_seen or self._cleaned_up: + return + try: + self.ttsq.push_filler() + except Exception as e: # noqa: BLE001 + log.warning("ttsq.push_filler failed: %s", e) + + # ----- helpers ----- + + def _resolve_speaker_name(self, speaker_id: int) -> str: + """Best-effort display name lookup via the bot user cache.""" + try: + if self.bot is not None and hasattr(self.bot, "get_user"): + user = self.bot.get_user(speaker_id) + if user is not None: + name = getattr(user, "display_name", None) or getattr( + user, "name", None, + ) + if name: + return str(name) + except Exception: # noqa: BLE001 + pass + return str(speaker_id) + + def _log_metric(self, payload: dict) -> None: + """Append a structured event to ``logs/voice_metrics.jsonl``.""" + event = {"ts": time.time(), "channel_id": self.channel_id, **payload} + try: + LOGS_DIR.mkdir(parents=True, exist_ok=True) + with open(VOICE_METRICS_PATH, "a", buffering=1, encoding="utf-8") as f: + f.write(json.dumps(event, ensure_ascii=False) + "\n") + except OSError: + pass + + +# ---------- EchoVoiceSink ---------- + +class EchoVoiceSink(AudioSink): + """PCM-in sink: per-user 20ms buffer -> 100ms VAD windows -> 800ms + silence triggers Whisper STT -> schedules ``on_segment_done`` on the + session loop. + + Lives in the voice_recv reader thread; uses ``threading`` primitives + only (no asyncio in the hot path). + """ + + def __init__(self, session: VoiceSession, bot_user_id: int): + super().__init__() + self.session = session + self.bot_user_id = int(bot_user_id) if bot_user_id is not None else 0 + self.whitelist: set = set(session.whitelist or set()) + self._user_buffers: dict[int, bytearray] = {} + self._packet_accum: dict[int, bytearray] = {} + self._last_speech_ts: dict[int, float] = {} + self._has_speech: dict[int, bool] = {} + self._sink_lock = threading.Lock() + + def wants_opus(self) -> bool: + return False + + def cleanup(self) -> None: + with self._sink_lock: + self._user_buffers.clear() + self._packet_accum.clear() + self._last_speech_ts.clear() + self._has_speech.clear() + + def write(self, user, voice_data: VoiceData) -> None: + # ---- FIRST GUARD (LOAD-BEARING): bot's own voice --------------- + if user is None: + return + uid = int(getattr(user, "id", 0) or 0) + if uid == 0: + return + if uid == self.bot_user_id: + return + + # ---- SECOND GUARD: whitelist filter ---------------------------- + if self.whitelist and uid not in self.whitelist: + return + + pcm = getattr(voice_data, "pcm", None) + if not pcm: + return + + window_pcm: Optional[bytes] = None + pcm_for_stt: Optional[bytes] = None + + try: + with self._sink_lock: + buf = self._user_buffers.setdefault(uid, bytearray()) + accum = self._packet_accum.setdefault(uid, bytearray()) + buf.extend(pcm) + accum.extend(pcm) + if len(accum) >= VAD_WINDOW_BYTES: + window_pcm = bytes(accum[:VAD_WINDOW_BYTES]) + del accum[:VAD_WINDOW_BYTES] + + if window_pcm is not None: + if self._vad_detects_speech(window_pcm): + with self._sink_lock: + self._last_speech_ts[uid] = time.monotonic() + self._has_speech[uid] = True + + with self._sink_lock: + if self._has_speech.get(uid): + last = self._last_speech_ts.get(uid, 0.0) + silence_ms = (time.monotonic() - last) * 1000.0 + if silence_ms >= SILENCE_FLUSH_MS: + pcm_for_stt = bytes(self._user_buffers.get(uid, b"")) + self._user_buffers[uid] = bytearray() + self._packet_accum[uid] = bytearray() + self._has_speech[uid] = False + + if pcm_for_stt: + self._flush_to_stt(uid, pcm_for_stt) + except Exception as e: # noqa: BLE001 + log.warning("EchoVoiceSink.write failed: %s", e) + + # ----- VAD ----- + + def _vad_detects_speech(self, pcm48_stereo: bytes) -> bool: + """Run silero-vad on a 100ms window. Falls back to an RMS energy + threshold if torch / silero are unavailable.""" + try: + mono16 = _pcm48_stereo_to_16_mono(pcm48_stereo) + if mono16.size == 0: + return False + try: + import torch + except ImportError: + rms = float(np.sqrt(np.mean(mono16.astype(np.float64) ** 2))) + return rms > 0.02 + model, _ = _get_silero_vad() + with torch.no_grad(): + prob = float(model(torch.from_numpy(mono16), + SAMPLE_RATE_WHISPER).item()) + return prob >= VAD_THRESHOLD + except Exception as e: # noqa: BLE001 + log.debug("VAD inference failed: %s", e) + return False + + # ----- STT flush ----- + + def _flush_to_stt(self, user_id: int, pcm48_stereo: bytes) -> None: + """Downsample, Whisper-transcribe RO, drop hallucinations, dispatch.""" + try: + mono16 = _pcm48_stereo_to_16_mono(pcm48_stereo) + if mono16.size == 0: + return + model = _get_whisper_model() + segments, _info = model.transcribe( + mono16, language="ro", beam_size=1, + ) + text_parts: list[str] = [] + worst_no_speech = 0.0 + for seg in segments: + no_sp = float(getattr(seg, "no_speech_prob", 0.0) or 0.0) + if no_sp > worst_no_speech: + worst_no_speech = no_sp + if no_sp > NO_SPEECH_DROP_THRESHOLD: + continue + seg_text = (getattr(seg, "text", "") or "").strip() + if seg_text: + text_parts.append(seg_text) + if not text_parts: + return + text = " ".join(text_parts).strip() + if not text: + return + self._schedule_segment_done(user_id, text, worst_no_speech) + except Exception as e: # noqa: BLE001 + log.warning("Whisper transcribe failed: %s", e) + + def _schedule_segment_done( + self, user_id: int, text: str, no_speech_prob: float, + ) -> None: + loop = self.session.loop + if loop is None or not loop.is_running(): + log.debug("voice session loop missing — dropping segment") + return + try: + asyncio.run_coroutine_threadsafe( + self.session.on_segment_done(user_id, text, no_speech_prob), + loop, + ) + except Exception as e: # noqa: BLE001 + log.warning("voice segment dispatch failed: %s", e) + + +__all__ = [ + "VoiceSession", + "EchoVoiceSink", + "FILLER_DELAY_S", + "SILENCE_FLUSH_MS", + "VAD_THRESHOLD", + "VAD_WINDOW_MS", + "NO_SPEECH_DROP_THRESHOLD", +] diff --git a/tests/test_voice_session_cleanup.py b/tests/test_voice_session_cleanup.py new file mode 100644 index 0000000..e067ae0 --- /dev/null +++ b/tests/test_voice_session_cleanup.py @@ -0,0 +1,319 @@ +"""Cleanup-path tests for ``src/voice/pipeline.py::VoiceSession``. + +Pins the centralized ``cleanup()`` contract from the voice plan +(Engineering decision #5): every one of the FIVE exit paths must drain +state cleanly and idempotently — lock released, JSONL flushed or +discarded, presence cleared, ``voice_client.cleanup()`` invoked, +``ttsq.stop()`` invoked, and a second call to ``cleanup()`` MUST be a +no-op (side effects happen exactly once). + +The 5 paths under test: + 1. ``test_cleanup_on_voice_leave`` — explicit ``/voice leave`` + 2. ``test_cleanup_on_disconnect`` — Discord-level disconnect + 3. ``test_cleanup_on_crash`` — exception via ``__exit__`` + 4. ``test_cleanup_on_auto_leave`` — 5-min inactivity timer + 5. ``test_cleanup_on_user_leaves_channel`` — user leaves voice channel +""" +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.voice.pipeline import VoiceSession + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_bot(): + bot = MagicMock(name="bot") + bot.user = MagicMock() + bot.user.id = 999_999 + bot.change_presence = AsyncMock(name="change_presence") + bot.get_user = MagicMock(return_value=None) + return bot + + +@pytest.fixture +def mock_voice_client(): + vc = MagicMock(name="voice_client") + vc.cleanup = MagicMock(name="vc_cleanup") + return vc + + +@pytest.fixture +def mock_ttsq(): + ttsq = MagicMock(name="ttsq") + ttsq.stop = MagicMock(name="ttsq_stop") + return ttsq + + +@pytest.fixture +def mock_text_channel(): + tc = MagicMock(name="text_channel") + tc.send = AsyncMock(name="text_send") + return tc + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_session( + tmp_path: Path, + mock_bot, + mock_voice_client, + mock_ttsq, + mock_text_channel, + *, + record_enabled: bool = True, +) -> VoiceSession: + jsonl = tmp_path / ("transcripts.jsonl" if record_enabled else "noop.jsonl") + return VoiceSession( + channel_id=1001, + guild_id=42, + text_channel=mock_text_channel, + voice_client=mock_voice_client, + bot=mock_bot, + ttsq=mock_ttsq, + whitelist={1234}, + record_enabled=record_enabled, + mirror_enabled=True, + transcripts_jsonl_path=jsonl, + loop=None, + router_route_message=MagicMock(name="route_message"), + ) + + +def _assert_clean_post_cleanup( + session: VoiceSession, + voice_client, + ttsq, + bot, + jsonl_path: Path, + record_enabled: bool, +) -> None: + """Assertions shared across all five cleanup-path tests.""" + # 1. Lock released — non-blocking acquire from this thread returns True. + acquired = session._lock.acquire(blocking=False) + assert acquired, "session._lock must be released after cleanup()" + session._lock.release() + + # 2. voice_client.cleanup() called exactly once. + assert voice_client.cleanup.call_count == 1, ( + f"voice_client.cleanup() called {voice_client.cleanup.call_count}x, " + f"expected 1" + ) + + # 3. ttsq.stop() called exactly once. + assert ttsq.stop.call_count == 1, ( + f"ttsq.stop() called {ttsq.stop.call_count}x, expected 1" + ) + + # 4. bot.change_presence(activity=None) called at least once with that kwarg. + assert bot.change_presence.call_count >= 1, ( + "bot.change_presence was never called — presence not restored" + ) + bot.change_presence.assert_called_with(activity=None) + + # 5. JSONL flushed (record=on) OR absent (record=off). + if record_enabled: + assert jsonl_path.exists(), ( + "record=on: JSONL file must exist (was created by __enter__ and " + "left in place by cleanup so transcript can be persisted)" + ) + else: + # record=off: cleanup unlinks the file if it ever existed. + assert not jsonl_path.exists() or jsonl_path.stat().st_size == 0 + + +# --------------------------------------------------------------------------- +# Scenario 1 — explicit /voice leave +# --------------------------------------------------------------------------- + + +class TestCleanupOnVoiceLeave: + def test_cleanup_on_voice_leave( + self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + ): + session = _make_session( + tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + record_enabled=True, + ) + jsonl_path = session.transcripts_jsonl_path + + with session: + # Simulate one transcript line. + session._jsonl_fh.write(json.dumps({"text": "salut"}) + "\n") + session.cleanup("voice_leave") + assert session._cleaned_up is True + + # __exit__ called cleanup("exit") — must be a no-op the second time. + _assert_clean_post_cleanup( + session, mock_voice_client, mock_ttsq, mock_bot, + jsonl_path, record_enabled=True, + ) + + # Idempotency: a third explicit call still doesn't bump counts. + session.cleanup("redundant") + assert mock_voice_client.cleanup.call_count == 1 + assert mock_ttsq.stop.call_count == 1 + + +# --------------------------------------------------------------------------- +# Scenario 2 — Discord-level voice disconnect +# --------------------------------------------------------------------------- + + +class TestCleanupOnDisconnect: + def test_cleanup_on_disconnect( + self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + ): + session = _make_session( + tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + record_enabled=False, + ) + jsonl_path = session.transcripts_jsonl_path + + session.__enter__() + # Network drop arrives outside the with-block. + session.cleanup("disconnect") + _assert_clean_post_cleanup( + session, mock_voice_client, mock_ttsq, mock_bot, + jsonl_path, record_enabled=False, + ) + + # Idempotency. + session.cleanup("disconnect-again") + assert mock_voice_client.cleanup.call_count == 1 + assert mock_ttsq.stop.call_count == 1 + + +# --------------------------------------------------------------------------- +# Scenario 3 — crash / exception via __exit__ +# --------------------------------------------------------------------------- + + +class TestCleanupOnCrash: + def test_cleanup_on_crash( + self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + ): + session = _make_session( + tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + record_enabled=True, + ) + jsonl_path = session.transcripts_jsonl_path + + with pytest.raises(RuntimeError, match="simulated crash"): + with session: + # Pipeline raises mid-call. + raise RuntimeError("simulated crash") + + # __exit__ must have driven cleanup — every side effect happened once. + _assert_clean_post_cleanup( + session, mock_voice_client, mock_ttsq, mock_bot, + jsonl_path, record_enabled=True, + ) + + # Idempotency: explicit follow-up call (e.g. an outer error handler + # also tries to cleanup) MUST be a no-op. + session.cleanup("post-crash") + assert mock_voice_client.cleanup.call_count == 1 + assert mock_ttsq.stop.call_count == 1 + + +# --------------------------------------------------------------------------- +# Scenario 4 — auto-leave timer fires after 5 min inactivity +# --------------------------------------------------------------------------- + + +class TestCleanupOnAutoLeave: + def test_cleanup_on_auto_leave( + self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + ): + session = _make_session( + tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + record_enabled=True, + ) + jsonl_path = session.transcripts_jsonl_path + + session.__enter__() + # The auto-leave timer trips outside the with-block. + session.cleanup("auto_leave") + + _assert_clean_post_cleanup( + session, mock_voice_client, mock_ttsq, mock_bot, + jsonl_path, record_enabled=True, + ) + + # Idempotency. + session.cleanup("auto_leave_redundant") + assert mock_voice_client.cleanup.call_count == 1 + assert mock_ttsq.stop.call_count == 1 + + +# --------------------------------------------------------------------------- +# Scenario 5 — user leaves voice channel themselves +# --------------------------------------------------------------------------- + + +class TestCleanupOnUserLeaves: + def test_cleanup_on_user_leaves_channel( + self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + ): + session = _make_session( + tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + record_enabled=False, + ) + jsonl_path = session.transcripts_jsonl_path + + session.__enter__() + # voice_state_update event handler invokes cleanup directly. + session.cleanup("user_left_channel") + + _assert_clean_post_cleanup( + session, mock_voice_client, mock_ttsq, mock_bot, + jsonl_path, record_enabled=False, + ) + + # Idempotency. + session.cleanup("user_left_again") + assert mock_voice_client.cleanup.call_count == 1 + assert mock_ttsq.stop.call_count == 1 + + +# --------------------------------------------------------------------------- +# Cross-cutting: failures inside cleanup don't propagate +# --------------------------------------------------------------------------- + + +class TestCleanupRobustness: + def test_cleanup_swallows_voice_client_errors( + self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + ): + """If voice_client.cleanup() raises, ttsq.stop() must still run and + the lock must still release — otherwise a broken Discord state would + deadlock the channel forever.""" + mock_voice_client.cleanup.side_effect = RuntimeError("vc died") + + session = _make_session( + tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel, + record_enabled=False, + ) + + with session: + session.cleanup("voice_leave") + + # ttsq.stop still ran exactly once. + assert mock_ttsq.stop.call_count == 1 + # Lock released. + acquired = session._lock.acquire(blocking=False) + assert acquired, "lock must release even when voice_client.cleanup raises" + session._lock.release()