feat(voice): Pas 5 — voice/pipeline.py VoiceSession + EchoVoiceSink + cleanup
Central voice pipeline (~250 LOC + docstrings = ~430 lines):
VoiceSession (context manager + idempotent cleanup pe 5 căi):
- __enter__: acquire _lock, open JSONL (record=on)
- __exit__: calls cleanup("exit"), nu suprimă exceptions
- cleanup(reason): IDEMPOTENT, side effects o singură dată — JSONL
flush+close (record=on) sau delete (record=off), bot presence cleared,
voice_client.cleanup(), ttsq.stop(), cancel filler task, lock release,
structured log la logs/voice_metrics.jsonl
- on_segment_done(speaker_id, text, no_speech_prob): mirror text channel,
append JSONL, arm 3s filler timer, route_message cu on_text callback
+ cancel filler la first block
- last_activity_ts: time.monotonic() — caller-driven 5min auto-leave
EchoVoiceSink(session, bot_user_id):
- wants_opus() False (PCM)
- write() runs în voice_recv reader thread (threading primitives only):
- GUARD 1: user None/id==0/id==bot_user_id → return (load-bearing
echo prevention)
- GUARD 2: whitelist filter (empty = allow all)
- Buffer 20ms packets per-user → batch 100ms (5×20ms = 19200 bytes)
→ silero-vad threshold 0.5 → 800ms cumulative silence flush
- _flush_to_stt: faster-whisper small int8 cpu_threads=4 lang=ro
beam_size=1, no_speech_prob > 0.6 drop, schedule on_segment_done
via run_coroutine_threadsafe pe session.loop
Module helpers (lazy thread-safe singletons): _get_whisper_model,
_get_silero_vad. Constants: FILLER_DELAY_S=3.0, SILENCE_FLUSH_MS=800,
VAD_THRESHOLD=0.5, VAD_WINDOW_MS=100, NO_SPEECH_DROP_THRESHOLD=0.6.
Decisions:
- STT runs in audio thread — acceptable la 2.25s p50 (user just stopped
talking, no batching contention). Wrap în ThreadPoolExecutor.submit
if perf bites later.
- Downsample 48k→16k via 3-sample averaging (no scipy dep). Whisper
robust la mild aliasing.
- Energy-RMS VAD fallback dacă torch import fail — graceful degrade.
- router_route_message injection seam ca kwarg pentru testabilitate.
- bot.change_presence handling cross-thread via run_coroutine_threadsafe.
tests/test_voice_session_cleanup.py — 6 tests:
- voice_leave / disconnect / crash via __exit__ / auto_leave /
user_left_channel (5 cleanup paths each verified for: JSONL state,
presence cleared, voice_client.cleanup, ttsq.stop, lock release,
idempotency)
- 1 robustness cross-cut (double-cleanup safety)
6/6 PASS. Regression suite 63/63 PASS (normalize + adapter + mutex).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
551
src/voice/pipeline.py
Normal file
551
src/voice/pipeline.py
Normal file
@@ -0,0 +1,551 @@
|
|||||||
|
"""Central voice pipeline: VAD -> STT -> Claude -> TTS for Discord voice.
|
||||||
|
|
||||||
|
``VoiceSession`` binds per-call state — voice_client, TTS queue, transcript
|
||||||
|
JSONL buffer, whitelist, presence — and exposes a single idempotent
|
||||||
|
``cleanup()`` invoked from every exit path (user /voice leave, network
|
||||||
|
disconnect, crash via ``__exit__``, auto-leave timer, user leaves channel).
|
||||||
|
|
||||||
|
``EchoVoiceSink`` is the discord-ext-voice-recv ``AudioSink`` subclass that
|
||||||
|
runs in the voice_recv reader thread. It batches 20ms PCM packets into
|
||||||
|
100ms windows for silero-vad inference, marks per-user speech timestamps,
|
||||||
|
and on 800ms cumulative silence flushes the accumulated audio through
|
||||||
|
faster-whisper. Hallucinated segments (``no_speech_prob > 0.6``) are
|
||||||
|
dropped. Valid transcripts are scheduled onto the session's event loop
|
||||||
|
via ``asyncio.run_coroutine_threadsafe``.
|
||||||
|
|
||||||
|
The bot's own ``user.id`` is filtered FIRST inside ``write()`` — load-bearing
|
||||||
|
echo prevention so a future whitelist expansion (Bianca, etc.) never lets
|
||||||
|
the bot transcribe itself.
|
||||||
|
|
||||||
|
See plan: ``src/voice/pipeline.py`` (Pas 5), Engineering decisions #4
|
||||||
|
(VAD 100ms batched), #5 (cleanup centralizat), #7 (bot.user.id explicit
|
||||||
|
guard), #8 (filler audio ``thinking.wav`` at 3s pre-first-block).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Callable, Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from src.voice._discord_voice_adapter import AudioSink, VoiceData
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# Discord delivers 48kHz s16le stereo PCM, 20ms per packet (3840 bytes).
|
||||||
|
SAMPLE_RATE_DISCORD = 48000
|
||||||
|
SAMPLE_RATE_WHISPER = 16000
|
||||||
|
PACKET_MS = 20
|
||||||
|
PACKET_BYTES = 3840 # 48000 Hz * 0.020 s * 2 channels * 2 bytes
|
||||||
|
VAD_WINDOW_MS = 100 # batch 5 * 20ms packets per VAD inference (Decision #4)
|
||||||
|
VAD_WINDOW_BYTES = PACKET_BYTES * (VAD_WINDOW_MS // PACKET_MS)
|
||||||
|
VAD_THRESHOLD = 0.5
|
||||||
|
SILENCE_FLUSH_MS = 800
|
||||||
|
NO_SPEECH_DROP_THRESHOLD = 0.6
|
||||||
|
FILLER_DELAY_S = 3.0
|
||||||
|
|
||||||
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||||
|
LOGS_DIR = PROJECT_ROOT / "logs"
|
||||||
|
VOICE_METRICS_PATH = LOGS_DIR / "voice_metrics.jsonl"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- Lazy model singletons ----------
|
||||||
|
|
||||||
|
_whisper_model: Any = None
|
||||||
|
_whisper_lock = threading.Lock()
|
||||||
|
_silero_model: Any = None
|
||||||
|
_silero_get_timestamps: Any = None
|
||||||
|
_silero_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_whisper_model() -> Any:
|
||||||
|
"""Lazy-load faster-whisper ``small`` int8 with the spike-validated
|
||||||
|
``cpu_threads=4`` (see ``tasks/voice-bench-results.md``)."""
|
||||||
|
global _whisper_model
|
||||||
|
if _whisper_model is not None:
|
||||||
|
return _whisper_model
|
||||||
|
with _whisper_lock:
|
||||||
|
if _whisper_model is not None:
|
||||||
|
return _whisper_model
|
||||||
|
from faster_whisper import WhisperModel
|
||||||
|
_whisper_model = WhisperModel(
|
||||||
|
"small", device="cpu", compute_type="int8", cpu_threads=4,
|
||||||
|
)
|
||||||
|
return _whisper_model
|
||||||
|
|
||||||
|
|
||||||
|
def _get_silero_vad():
|
||||||
|
"""Lazy-load silero-vad. Returns ``(model, get_speech_timestamps)``."""
|
||||||
|
global _silero_model, _silero_get_timestamps
|
||||||
|
if _silero_model is not None:
|
||||||
|
return _silero_model, _silero_get_timestamps
|
||||||
|
with _silero_lock:
|
||||||
|
if _silero_model is not None:
|
||||||
|
return _silero_model, _silero_get_timestamps
|
||||||
|
from silero_vad import get_speech_timestamps, load_silero_vad
|
||||||
|
_silero_model = load_silero_vad()
|
||||||
|
_silero_get_timestamps = get_speech_timestamps
|
||||||
|
return _silero_model, _silero_get_timestamps
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- Audio helpers ----------
|
||||||
|
|
||||||
|
def _pcm48_stereo_to_16_mono(pcm: bytes) -> np.ndarray:
|
||||||
|
"""Discord 48kHz s16le stereo bytes -> 16kHz mono float32 in [-1, 1].
|
||||||
|
|
||||||
|
Cheap downsample: average the two channels, then average every 3
|
||||||
|
samples (48k / 3 = 16k). faster-whisper + silero-vad accept the
|
||||||
|
resulting ``np.float32`` array directly.
|
||||||
|
"""
|
||||||
|
if not pcm:
|
||||||
|
return np.zeros(0, dtype=np.float32)
|
||||||
|
samples = np.frombuffer(pcm, dtype=np.int16)
|
||||||
|
if samples.size % 2 != 0:
|
||||||
|
samples = samples[:-1]
|
||||||
|
stereo = samples.reshape(-1, 2)
|
||||||
|
mono = stereo.mean(axis=1).astype(np.float32) / 32768.0
|
||||||
|
if mono.size == 0:
|
||||||
|
return mono
|
||||||
|
trim = (mono.size // 3) * 3
|
||||||
|
if trim == 0:
|
||||||
|
return np.zeros(0, dtype=np.float32)
|
||||||
|
mono = mono[:trim].reshape(-1, 3).mean(axis=1)
|
||||||
|
return mono.astype(np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- VoiceSession ----------
|
||||||
|
|
||||||
|
class VoiceSession:
|
||||||
|
"""Per-voice-call state with a single idempotent ``cleanup()``."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
channel_id: int,
|
||||||
|
guild_id: int,
|
||||||
|
text_channel: Any,
|
||||||
|
voice_client: Any,
|
||||||
|
bot: Any,
|
||||||
|
ttsq: Any,
|
||||||
|
whitelist: Optional[set] = None,
|
||||||
|
record_enabled: bool = False,
|
||||||
|
mirror_enabled: bool = True,
|
||||||
|
transcripts_jsonl_path: Optional[Path] = None,
|
||||||
|
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||||||
|
router_route_message: Optional[Callable] = None,
|
||||||
|
):
|
||||||
|
self.channel_id = int(channel_id)
|
||||||
|
self.guild_id = int(guild_id)
|
||||||
|
self.text_channel = text_channel
|
||||||
|
self.voice_client = voice_client
|
||||||
|
self.bot = bot
|
||||||
|
self.ttsq = ttsq
|
||||||
|
self.whitelist: set = set(whitelist or set())
|
||||||
|
self.record_enabled = bool(record_enabled)
|
||||||
|
self.mirror_enabled = bool(mirror_enabled)
|
||||||
|
self.transcripts_jsonl_path = transcripts_jsonl_path
|
||||||
|
self.loop = loop
|
||||||
|
# Injection seam so tests can replace router.route_message without
|
||||||
|
# mocking the whole module.
|
||||||
|
if router_route_message is None:
|
||||||
|
from src.router import route_message as _rm
|
||||||
|
self._route_message = _rm
|
||||||
|
else:
|
||||||
|
self._route_message = router_route_message
|
||||||
|
|
||||||
|
self.last_activity_ts = time.monotonic()
|
||||||
|
self._jsonl_fh = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._cleaned_up = False
|
||||||
|
self._lock_owner_thread: Optional[int] = None
|
||||||
|
self._filler_task: Optional[asyncio.Task] = None
|
||||||
|
self._first_block_seen = False
|
||||||
|
|
||||||
|
# ----- context manager -----
|
||||||
|
|
||||||
|
def __enter__(self) -> "VoiceSession":
|
||||||
|
self._lock.acquire()
|
||||||
|
self._lock_owner_thread = threading.get_ident()
|
||||||
|
if self.record_enabled and self.transcripts_jsonl_path is not None:
|
||||||
|
try:
|
||||||
|
self.transcripts_jsonl_path.parent.mkdir(
|
||||||
|
parents=True, exist_ok=True,
|
||||||
|
)
|
||||||
|
self._jsonl_fh = open(
|
||||||
|
self.transcripts_jsonl_path, "a",
|
||||||
|
buffering=1, encoding="utf-8",
|
||||||
|
)
|
||||||
|
except OSError as e:
|
||||||
|
log.warning("voice transcript open failed: %s", e)
|
||||||
|
self._jsonl_fh = None
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
|
||||||
|
self.cleanup("exit")
|
||||||
|
return False # never suppress exceptions
|
||||||
|
|
||||||
|
# ----- cleanup (centralized, idempotent) -----
|
||||||
|
|
||||||
|
def cleanup(self, reason: str) -> None:
|
||||||
|
"""Single drain path for ALL 5 exit scenarios. Safe to call twice."""
|
||||||
|
if self._cleaned_up:
|
||||||
|
return
|
||||||
|
self._cleaned_up = True
|
||||||
|
|
||||||
|
# 1. Flush or discard JSONL transcript.
|
||||||
|
if self._jsonl_fh is not None:
|
||||||
|
try:
|
||||||
|
self._jsonl_fh.flush()
|
||||||
|
self._jsonl_fh.close()
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("voice transcript flush failed: %s", e)
|
||||||
|
self._jsonl_fh = None
|
||||||
|
if (not self.record_enabled
|
||||||
|
and self.transcripts_jsonl_path is not None
|
||||||
|
and self.transcripts_jsonl_path.exists()):
|
||||||
|
try:
|
||||||
|
self.transcripts_jsonl_path.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 2. Restore bot presence (clear Listening activity).
|
||||||
|
if self.bot is not None:
|
||||||
|
try:
|
||||||
|
change = getattr(self.bot, "change_presence", None)
|
||||||
|
if callable(change):
|
||||||
|
coro = change(activity=None)
|
||||||
|
if asyncio.iscoroutine(coro):
|
||||||
|
if self.loop is not None and self.loop.is_running():
|
||||||
|
asyncio.run_coroutine_threadsafe(coro, self.loop)
|
||||||
|
else:
|
||||||
|
# Best-effort: close the coroutine so Python
|
||||||
|
# doesn't emit "coroutine was never awaited".
|
||||||
|
coro.close()
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("voice presence restore failed: %s", e)
|
||||||
|
|
||||||
|
# 3. Tear down the voice client.
|
||||||
|
if self.voice_client is not None:
|
||||||
|
try:
|
||||||
|
self.voice_client.cleanup()
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("voice_client.cleanup failed: %s", e)
|
||||||
|
|
||||||
|
# 4. Stop the TTS queue worker.
|
||||||
|
if self.ttsq is not None:
|
||||||
|
try:
|
||||||
|
self.ttsq.stop()
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("ttsq.stop failed: %s", e)
|
||||||
|
|
||||||
|
# 5. Cancel pending filler task.
|
||||||
|
if self._filler_task is not None and not self._filler_task.done():
|
||||||
|
try:
|
||||||
|
self._filler_task.cancel()
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 6. Release the session lock (held since __enter__).
|
||||||
|
try:
|
||||||
|
if self._lock.locked():
|
||||||
|
self._lock.release()
|
||||||
|
except RuntimeError:
|
||||||
|
# Released from a different thread than acquired it — already
|
||||||
|
# free for the next caller; nothing to do.
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._log_metric({"event": "cleanup", "reason": reason})
|
||||||
|
|
||||||
|
# ----- segment completion (scheduled from sink) -----
|
||||||
|
|
||||||
|
async def on_segment_done(
|
||||||
|
self,
|
||||||
|
speaker_id: int,
|
||||||
|
text: str,
|
||||||
|
no_speech_prob: float,
|
||||||
|
) -> None:
|
||||||
|
"""Mirror, persist, route to Claude, drive TTS via streaming callback."""
|
||||||
|
if self._cleaned_up:
|
||||||
|
return
|
||||||
|
self.last_activity_ts = time.monotonic()
|
||||||
|
speaker_name = self._resolve_speaker_name(speaker_id)
|
||||||
|
|
||||||
|
# 1. Mirror to text channel (one Unicode 🎤 — exception per plan).
|
||||||
|
if self.mirror_enabled and self.text_channel is not None:
|
||||||
|
try:
|
||||||
|
send = getattr(self.text_channel, "send", None)
|
||||||
|
if callable(send):
|
||||||
|
coro = send(f"\U0001f3a4 {speaker_name}: \"{text}\"")
|
||||||
|
if asyncio.iscoroutine(coro):
|
||||||
|
await coro
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("voice mirror send failed: %s", e)
|
||||||
|
|
||||||
|
# 2. Append to JSONL transcript buffer if recording.
|
||||||
|
if self._jsonl_fh is not None:
|
||||||
|
try:
|
||||||
|
self._jsonl_fh.write(
|
||||||
|
json.dumps({
|
||||||
|
"ts": time.time(),
|
||||||
|
"speaker_id": speaker_id,
|
||||||
|
"speaker": speaker_name,
|
||||||
|
"text": text,
|
||||||
|
"no_speech_prob": no_speech_prob,
|
||||||
|
}, ensure_ascii=False) + "\n"
|
||||||
|
)
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("voice transcript write failed: %s", e)
|
||||||
|
|
||||||
|
# 3. Arm the 3s filler timer — fires only if no Claude block arrives.
|
||||||
|
self._first_block_seen = False
|
||||||
|
if self._filler_task is not None and not self._filler_task.done():
|
||||||
|
self._filler_task.cancel()
|
||||||
|
try:
|
||||||
|
self._filler_task = asyncio.create_task(self._filler_after_delay())
|
||||||
|
except RuntimeError:
|
||||||
|
# No running loop (test path). Skip the timer.
|
||||||
|
self._filler_task = None
|
||||||
|
|
||||||
|
def voice_stream_callback(block: str) -> None:
|
||||||
|
"""Called once per Claude streamed text block — pushes to TTS
|
||||||
|
and cancels the filler on first arrival."""
|
||||||
|
if not self._first_block_seen:
|
||||||
|
self._first_block_seen = True
|
||||||
|
ft = self._filler_task
|
||||||
|
if ft is not None and not ft.done():
|
||||||
|
try:
|
||||||
|
ft.cancel()
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
self.ttsq.push_text(block)
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("ttsq.push_text failed: %s", e)
|
||||||
|
|
||||||
|
# 4. Dispatch to Claude. send_message is sync subprocess, run on
|
||||||
|
# a worker thread so the loop stays responsive for mirror/TTS.
|
||||||
|
try:
|
||||||
|
await asyncio.to_thread(
|
||||||
|
self._route_message,
|
||||||
|
str(self.channel_id),
|
||||||
|
str(speaker_id),
|
||||||
|
text,
|
||||||
|
None, # model
|
||||||
|
voice_stream_callback, # on_text
|
||||||
|
"discord-voice", # adapter_name
|
||||||
|
)
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.error("route_message voice path failed: %s", e)
|
||||||
|
|
||||||
|
async def _filler_after_delay(self) -> None:
|
||||||
|
"""Push ``assets/voice/thinking.wav`` after FILLER_DELAY_S if Claude
|
||||||
|
hasn't produced a first block yet."""
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(FILLER_DELAY_S)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
return
|
||||||
|
if self._first_block_seen or self._cleaned_up:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.ttsq.push_filler()
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("ttsq.push_filler failed: %s", e)
|
||||||
|
|
||||||
|
# ----- helpers -----
|
||||||
|
|
||||||
|
def _resolve_speaker_name(self, speaker_id: int) -> str:
|
||||||
|
"""Best-effort display name lookup via the bot user cache."""
|
||||||
|
try:
|
||||||
|
if self.bot is not None and hasattr(self.bot, "get_user"):
|
||||||
|
user = self.bot.get_user(speaker_id)
|
||||||
|
if user is not None:
|
||||||
|
name = getattr(user, "display_name", None) or getattr(
|
||||||
|
user, "name", None,
|
||||||
|
)
|
||||||
|
if name:
|
||||||
|
return str(name)
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
pass
|
||||||
|
return str(speaker_id)
|
||||||
|
|
||||||
|
def _log_metric(self, payload: dict) -> None:
|
||||||
|
"""Append a structured event to ``logs/voice_metrics.jsonl``."""
|
||||||
|
event = {"ts": time.time(), "channel_id": self.channel_id, **payload}
|
||||||
|
try:
|
||||||
|
LOGS_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(VOICE_METRICS_PATH, "a", buffering=1, encoding="utf-8") as f:
|
||||||
|
f.write(json.dumps(event, ensure_ascii=False) + "\n")
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- EchoVoiceSink ----------
|
||||||
|
|
||||||
|
class EchoVoiceSink(AudioSink):
|
||||||
|
"""PCM-in sink: per-user 20ms buffer -> 100ms VAD windows -> 800ms
|
||||||
|
silence triggers Whisper STT -> schedules ``on_segment_done`` on the
|
||||||
|
session loop.
|
||||||
|
|
||||||
|
Lives in the voice_recv reader thread; uses ``threading`` primitives
|
||||||
|
only (no asyncio in the hot path).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, session: VoiceSession, bot_user_id: int):
|
||||||
|
super().__init__()
|
||||||
|
self.session = session
|
||||||
|
self.bot_user_id = int(bot_user_id) if bot_user_id is not None else 0
|
||||||
|
self.whitelist: set = set(session.whitelist or set())
|
||||||
|
self._user_buffers: dict[int, bytearray] = {}
|
||||||
|
self._packet_accum: dict[int, bytearray] = {}
|
||||||
|
self._last_speech_ts: dict[int, float] = {}
|
||||||
|
self._has_speech: dict[int, bool] = {}
|
||||||
|
self._sink_lock = threading.Lock()
|
||||||
|
|
||||||
|
def wants_opus(self) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
with self._sink_lock:
|
||||||
|
self._user_buffers.clear()
|
||||||
|
self._packet_accum.clear()
|
||||||
|
self._last_speech_ts.clear()
|
||||||
|
self._has_speech.clear()
|
||||||
|
|
||||||
|
def write(self, user, voice_data: VoiceData) -> None:
|
||||||
|
# ---- FIRST GUARD (LOAD-BEARING): bot's own voice ---------------
|
||||||
|
if user is None:
|
||||||
|
return
|
||||||
|
uid = int(getattr(user, "id", 0) or 0)
|
||||||
|
if uid == 0:
|
||||||
|
return
|
||||||
|
if uid == self.bot_user_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
# ---- SECOND GUARD: whitelist filter ----------------------------
|
||||||
|
if self.whitelist and uid not in self.whitelist:
|
||||||
|
return
|
||||||
|
|
||||||
|
pcm = getattr(voice_data, "pcm", None)
|
||||||
|
if not pcm:
|
||||||
|
return
|
||||||
|
|
||||||
|
window_pcm: Optional[bytes] = None
|
||||||
|
pcm_for_stt: Optional[bytes] = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self._sink_lock:
|
||||||
|
buf = self._user_buffers.setdefault(uid, bytearray())
|
||||||
|
accum = self._packet_accum.setdefault(uid, bytearray())
|
||||||
|
buf.extend(pcm)
|
||||||
|
accum.extend(pcm)
|
||||||
|
if len(accum) >= VAD_WINDOW_BYTES:
|
||||||
|
window_pcm = bytes(accum[:VAD_WINDOW_BYTES])
|
||||||
|
del accum[:VAD_WINDOW_BYTES]
|
||||||
|
|
||||||
|
if window_pcm is not None:
|
||||||
|
if self._vad_detects_speech(window_pcm):
|
||||||
|
with self._sink_lock:
|
||||||
|
self._last_speech_ts[uid] = time.monotonic()
|
||||||
|
self._has_speech[uid] = True
|
||||||
|
|
||||||
|
with self._sink_lock:
|
||||||
|
if self._has_speech.get(uid):
|
||||||
|
last = self._last_speech_ts.get(uid, 0.0)
|
||||||
|
silence_ms = (time.monotonic() - last) * 1000.0
|
||||||
|
if silence_ms >= SILENCE_FLUSH_MS:
|
||||||
|
pcm_for_stt = bytes(self._user_buffers.get(uid, b""))
|
||||||
|
self._user_buffers[uid] = bytearray()
|
||||||
|
self._packet_accum[uid] = bytearray()
|
||||||
|
self._has_speech[uid] = False
|
||||||
|
|
||||||
|
if pcm_for_stt:
|
||||||
|
self._flush_to_stt(uid, pcm_for_stt)
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("EchoVoiceSink.write failed: %s", e)
|
||||||
|
|
||||||
|
# ----- VAD -----
|
||||||
|
|
||||||
|
def _vad_detects_speech(self, pcm48_stereo: bytes) -> bool:
|
||||||
|
"""Run silero-vad on a 100ms window. Falls back to an RMS energy
|
||||||
|
threshold if torch / silero are unavailable."""
|
||||||
|
try:
|
||||||
|
mono16 = _pcm48_stereo_to_16_mono(pcm48_stereo)
|
||||||
|
if mono16.size == 0:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
import torch
|
||||||
|
except ImportError:
|
||||||
|
rms = float(np.sqrt(np.mean(mono16.astype(np.float64) ** 2)))
|
||||||
|
return rms > 0.02
|
||||||
|
model, _ = _get_silero_vad()
|
||||||
|
with torch.no_grad():
|
||||||
|
prob = float(model(torch.from_numpy(mono16),
|
||||||
|
SAMPLE_RATE_WHISPER).item())
|
||||||
|
return prob >= VAD_THRESHOLD
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.debug("VAD inference failed: %s", e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# ----- STT flush -----
|
||||||
|
|
||||||
|
def _flush_to_stt(self, user_id: int, pcm48_stereo: bytes) -> None:
|
||||||
|
"""Downsample, Whisper-transcribe RO, drop hallucinations, dispatch."""
|
||||||
|
try:
|
||||||
|
mono16 = _pcm48_stereo_to_16_mono(pcm48_stereo)
|
||||||
|
if mono16.size == 0:
|
||||||
|
return
|
||||||
|
model = _get_whisper_model()
|
||||||
|
segments, _info = model.transcribe(
|
||||||
|
mono16, language="ro", beam_size=1,
|
||||||
|
)
|
||||||
|
text_parts: list[str] = []
|
||||||
|
worst_no_speech = 0.0
|
||||||
|
for seg in segments:
|
||||||
|
no_sp = float(getattr(seg, "no_speech_prob", 0.0) or 0.0)
|
||||||
|
if no_sp > worst_no_speech:
|
||||||
|
worst_no_speech = no_sp
|
||||||
|
if no_sp > NO_SPEECH_DROP_THRESHOLD:
|
||||||
|
continue
|
||||||
|
seg_text = (getattr(seg, "text", "") or "").strip()
|
||||||
|
if seg_text:
|
||||||
|
text_parts.append(seg_text)
|
||||||
|
if not text_parts:
|
||||||
|
return
|
||||||
|
text = " ".join(text_parts).strip()
|
||||||
|
if not text:
|
||||||
|
return
|
||||||
|
self._schedule_segment_done(user_id, text, worst_no_speech)
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("Whisper transcribe failed: %s", e)
|
||||||
|
|
||||||
|
def _schedule_segment_done(
|
||||||
|
self, user_id: int, text: str, no_speech_prob: float,
|
||||||
|
) -> None:
|
||||||
|
loop = self.session.loop
|
||||||
|
if loop is None or not loop.is_running():
|
||||||
|
log.debug("voice session loop missing — dropping segment")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
asyncio.run_coroutine_threadsafe(
|
||||||
|
self.session.on_segment_done(user_id, text, no_speech_prob),
|
||||||
|
loop,
|
||||||
|
)
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
log.warning("voice segment dispatch failed: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"VoiceSession",
|
||||||
|
"EchoVoiceSink",
|
||||||
|
"FILLER_DELAY_S",
|
||||||
|
"SILENCE_FLUSH_MS",
|
||||||
|
"VAD_THRESHOLD",
|
||||||
|
"VAD_WINDOW_MS",
|
||||||
|
"NO_SPEECH_DROP_THRESHOLD",
|
||||||
|
]
|
||||||
319
tests/test_voice_session_cleanup.py
Normal file
319
tests/test_voice_session_cleanup.py
Normal file
@@ -0,0 +1,319 @@
|
|||||||
|
"""Cleanup-path tests for ``src/voice/pipeline.py::VoiceSession``.
|
||||||
|
|
||||||
|
Pins the centralized ``cleanup()`` contract from the voice plan
|
||||||
|
(Engineering decision #5): every one of the FIVE exit paths must drain
|
||||||
|
state cleanly and idempotently — lock released, JSONL flushed or
|
||||||
|
discarded, presence cleared, ``voice_client.cleanup()`` invoked,
|
||||||
|
``ttsq.stop()`` invoked, and a second call to ``cleanup()`` MUST be a
|
||||||
|
no-op (side effects happen exactly once).
|
||||||
|
|
||||||
|
The 5 paths under test:
|
||||||
|
1. ``test_cleanup_on_voice_leave`` — explicit ``/voice leave``
|
||||||
|
2. ``test_cleanup_on_disconnect`` — Discord-level disconnect
|
||||||
|
3. ``test_cleanup_on_crash`` — exception via ``__exit__``
|
||||||
|
4. ``test_cleanup_on_auto_leave`` — 5-min inactivity timer
|
||||||
|
5. ``test_cleanup_on_user_leaves_channel`` — user leaves voice channel
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from src.voice.pipeline import VoiceSession
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fixtures
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_bot():
|
||||||
|
bot = MagicMock(name="bot")
|
||||||
|
bot.user = MagicMock()
|
||||||
|
bot.user.id = 999_999
|
||||||
|
bot.change_presence = AsyncMock(name="change_presence")
|
||||||
|
bot.get_user = MagicMock(return_value=None)
|
||||||
|
return bot
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_voice_client():
|
||||||
|
vc = MagicMock(name="voice_client")
|
||||||
|
vc.cleanup = MagicMock(name="vc_cleanup")
|
||||||
|
return vc
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_ttsq():
|
||||||
|
ttsq = MagicMock(name="ttsq")
|
||||||
|
ttsq.stop = MagicMock(name="ttsq_stop")
|
||||||
|
return ttsq
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_text_channel():
|
||||||
|
tc = MagicMock(name="text_channel")
|
||||||
|
tc.send = AsyncMock(name="text_send")
|
||||||
|
return tc
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _make_session(
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_bot,
|
||||||
|
mock_voice_client,
|
||||||
|
mock_ttsq,
|
||||||
|
mock_text_channel,
|
||||||
|
*,
|
||||||
|
record_enabled: bool = True,
|
||||||
|
) -> VoiceSession:
|
||||||
|
jsonl = tmp_path / ("transcripts.jsonl" if record_enabled else "noop.jsonl")
|
||||||
|
return VoiceSession(
|
||||||
|
channel_id=1001,
|
||||||
|
guild_id=42,
|
||||||
|
text_channel=mock_text_channel,
|
||||||
|
voice_client=mock_voice_client,
|
||||||
|
bot=mock_bot,
|
||||||
|
ttsq=mock_ttsq,
|
||||||
|
whitelist={1234},
|
||||||
|
record_enabled=record_enabled,
|
||||||
|
mirror_enabled=True,
|
||||||
|
transcripts_jsonl_path=jsonl,
|
||||||
|
loop=None,
|
||||||
|
router_route_message=MagicMock(name="route_message"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_clean_post_cleanup(
|
||||||
|
session: VoiceSession,
|
||||||
|
voice_client,
|
||||||
|
ttsq,
|
||||||
|
bot,
|
||||||
|
jsonl_path: Path,
|
||||||
|
record_enabled: bool,
|
||||||
|
) -> None:
|
||||||
|
"""Assertions shared across all five cleanup-path tests."""
|
||||||
|
# 1. Lock released — non-blocking acquire from this thread returns True.
|
||||||
|
acquired = session._lock.acquire(blocking=False)
|
||||||
|
assert acquired, "session._lock must be released after cleanup()"
|
||||||
|
session._lock.release()
|
||||||
|
|
||||||
|
# 2. voice_client.cleanup() called exactly once.
|
||||||
|
assert voice_client.cleanup.call_count == 1, (
|
||||||
|
f"voice_client.cleanup() called {voice_client.cleanup.call_count}x, "
|
||||||
|
f"expected 1"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 3. ttsq.stop() called exactly once.
|
||||||
|
assert ttsq.stop.call_count == 1, (
|
||||||
|
f"ttsq.stop() called {ttsq.stop.call_count}x, expected 1"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 4. bot.change_presence(activity=None) called at least once with that kwarg.
|
||||||
|
assert bot.change_presence.call_count >= 1, (
|
||||||
|
"bot.change_presence was never called — presence not restored"
|
||||||
|
)
|
||||||
|
bot.change_presence.assert_called_with(activity=None)
|
||||||
|
|
||||||
|
# 5. JSONL flushed (record=on) OR absent (record=off).
|
||||||
|
if record_enabled:
|
||||||
|
assert jsonl_path.exists(), (
|
||||||
|
"record=on: JSONL file must exist (was created by __enter__ and "
|
||||||
|
"left in place by cleanup so transcript can be persisted)"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# record=off: cleanup unlinks the file if it ever existed.
|
||||||
|
assert not jsonl_path.exists() or jsonl_path.stat().st_size == 0
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scenario 1 — explicit /voice leave
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanupOnVoiceLeave:
|
||||||
|
def test_cleanup_on_voice_leave(
|
||||||
|
self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
):
|
||||||
|
session = _make_session(
|
||||||
|
tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
record_enabled=True,
|
||||||
|
)
|
||||||
|
jsonl_path = session.transcripts_jsonl_path
|
||||||
|
|
||||||
|
with session:
|
||||||
|
# Simulate one transcript line.
|
||||||
|
session._jsonl_fh.write(json.dumps({"text": "salut"}) + "\n")
|
||||||
|
session.cleanup("voice_leave")
|
||||||
|
assert session._cleaned_up is True
|
||||||
|
|
||||||
|
# __exit__ called cleanup("exit") — must be a no-op the second time.
|
||||||
|
_assert_clean_post_cleanup(
|
||||||
|
session, mock_voice_client, mock_ttsq, mock_bot,
|
||||||
|
jsonl_path, record_enabled=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Idempotency: a third explicit call still doesn't bump counts.
|
||||||
|
session.cleanup("redundant")
|
||||||
|
assert mock_voice_client.cleanup.call_count == 1
|
||||||
|
assert mock_ttsq.stop.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scenario 2 — Discord-level voice disconnect
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanupOnDisconnect:
|
||||||
|
def test_cleanup_on_disconnect(
|
||||||
|
self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
):
|
||||||
|
session = _make_session(
|
||||||
|
tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
record_enabled=False,
|
||||||
|
)
|
||||||
|
jsonl_path = session.transcripts_jsonl_path
|
||||||
|
|
||||||
|
session.__enter__()
|
||||||
|
# Network drop arrives outside the with-block.
|
||||||
|
session.cleanup("disconnect")
|
||||||
|
_assert_clean_post_cleanup(
|
||||||
|
session, mock_voice_client, mock_ttsq, mock_bot,
|
||||||
|
jsonl_path, record_enabled=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Idempotency.
|
||||||
|
session.cleanup("disconnect-again")
|
||||||
|
assert mock_voice_client.cleanup.call_count == 1
|
||||||
|
assert mock_ttsq.stop.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scenario 3 — crash / exception via __exit__
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanupOnCrash:
|
||||||
|
def test_cleanup_on_crash(
|
||||||
|
self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
):
|
||||||
|
session = _make_session(
|
||||||
|
tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
record_enabled=True,
|
||||||
|
)
|
||||||
|
jsonl_path = session.transcripts_jsonl_path
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="simulated crash"):
|
||||||
|
with session:
|
||||||
|
# Pipeline raises mid-call.
|
||||||
|
raise RuntimeError("simulated crash")
|
||||||
|
|
||||||
|
# __exit__ must have driven cleanup — every side effect happened once.
|
||||||
|
_assert_clean_post_cleanup(
|
||||||
|
session, mock_voice_client, mock_ttsq, mock_bot,
|
||||||
|
jsonl_path, record_enabled=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Idempotency: explicit follow-up call (e.g. an outer error handler
|
||||||
|
# also tries to cleanup) MUST be a no-op.
|
||||||
|
session.cleanup("post-crash")
|
||||||
|
assert mock_voice_client.cleanup.call_count == 1
|
||||||
|
assert mock_ttsq.stop.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scenario 4 — auto-leave timer fires after 5 min inactivity
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanupOnAutoLeave:
|
||||||
|
def test_cleanup_on_auto_leave(
|
||||||
|
self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
):
|
||||||
|
session = _make_session(
|
||||||
|
tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
record_enabled=True,
|
||||||
|
)
|
||||||
|
jsonl_path = session.transcripts_jsonl_path
|
||||||
|
|
||||||
|
session.__enter__()
|
||||||
|
# The auto-leave timer trips outside the with-block.
|
||||||
|
session.cleanup("auto_leave")
|
||||||
|
|
||||||
|
_assert_clean_post_cleanup(
|
||||||
|
session, mock_voice_client, mock_ttsq, mock_bot,
|
||||||
|
jsonl_path, record_enabled=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Idempotency.
|
||||||
|
session.cleanup("auto_leave_redundant")
|
||||||
|
assert mock_voice_client.cleanup.call_count == 1
|
||||||
|
assert mock_ttsq.stop.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scenario 5 — user leaves voice channel themselves
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanupOnUserLeaves:
|
||||||
|
def test_cleanup_on_user_leaves_channel(
|
||||||
|
self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
):
|
||||||
|
session = _make_session(
|
||||||
|
tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
record_enabled=False,
|
||||||
|
)
|
||||||
|
jsonl_path = session.transcripts_jsonl_path
|
||||||
|
|
||||||
|
session.__enter__()
|
||||||
|
# voice_state_update event handler invokes cleanup directly.
|
||||||
|
session.cleanup("user_left_channel")
|
||||||
|
|
||||||
|
_assert_clean_post_cleanup(
|
||||||
|
session, mock_voice_client, mock_ttsq, mock_bot,
|
||||||
|
jsonl_path, record_enabled=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Idempotency.
|
||||||
|
session.cleanup("user_left_again")
|
||||||
|
assert mock_voice_client.cleanup.call_count == 1
|
||||||
|
assert mock_ttsq.stop.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Cross-cutting: failures inside cleanup don't propagate
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCleanupRobustness:
|
||||||
|
def test_cleanup_swallows_voice_client_errors(
|
||||||
|
self, tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
):
|
||||||
|
"""If voice_client.cleanup() raises, ttsq.stop() must still run and
|
||||||
|
the lock must still release — otherwise a broken Discord state would
|
||||||
|
deadlock the channel forever."""
|
||||||
|
mock_voice_client.cleanup.side_effect = RuntimeError("vc died")
|
||||||
|
|
||||||
|
session = _make_session(
|
||||||
|
tmp_path, mock_bot, mock_voice_client, mock_ttsq, mock_text_channel,
|
||||||
|
record_enabled=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
with session:
|
||||||
|
session.cleanup("voice_leave")
|
||||||
|
|
||||||
|
# ttsq.stop still ran exactly once.
|
||||||
|
assert mock_ttsq.stop.call_count == 1
|
||||||
|
# Lock released.
|
||||||
|
acquired = session._lock.acquire(blocking=False)
|
||||||
|
assert acquired, "lock must release even when voice_client.cleanup raises"
|
||||||
|
session._lock.release()
|
||||||
Reference in New Issue
Block a user