feat(voice): DAVE E2E + full voice UX (squash of voice/dave-recv)

Squashed branch: voice/dave-recv → master. Closes Pas 12 (DAVE E2E) and lands
voice-mode UX polish + verbal voice control on top of the Pas 1-10 scaffolding
already on master.

## DAVE E2E receive-side decrypt (e4f3177)

Vendored fork: discord-ext-voice-recv 0.5.3a+echo.dave1. Patches the receive
pipeline to handle Discord's mandatory DAVE encryption on voice gateway v=8.
- `_maybe_dave_decrypt`: uses davey.can_passthrough(user_id) as primary gate,
  falls through to dave.decrypt for DAVE-epoch peers, drops on decrypt failure
  without killing the reader thread.
- VAD fix: silero-vad v5+ requires exactly 512 samples; our 100ms window
  (1600 samples) was silently raising ValueError → STT never fired. Now slice
  into 512-sample chunks.
- Whisper: bumped beam_size 1→5 and added RO initial_prompt.
- Tests: 11 DAVE unit tests + 2 callback integration tests + contract test
  with fork-version guard.

## Voice UX polish (d1bc77e)

- Killed the 3s "mă gândesc" filler (always collided with Claude p50 4-7s).
- Barge-in via `ttsq.clear()` at top of `on_segment_done`.
- DTX silence-flush poller (200ms tick) — Discord stops sending RTP packets
  when silent, so the inline silence-check in sink.write() never fired for
  trailing audio; background thread handles it.
- `EchoStreamingAudioSource.read()` non-blocking — old `get_frame(timeout=0.1)`
  wrecked Discord's 20ms cadence and the client interpreted bursts as
  stuttering (Marius heard "4 de minute" instead of full sentence).
- RO time expansion: 23:09 → "douăzeci și trei și nouă minute".
- Supertonic Unicode sanitize centralized in tools/tts.py.
- Whisper local_files_only=True — no HF metadata GET on each startup.
- Diagnostic logging through sink → VAD → Claude stream → TTS chain.

## Voice mode iteration (e589e48)

- `personality/VOICE_MODE.md` — voice-tailored system prompt (short, no
  markdown, no abbreviations, time without seconds, distances in
  "mii"/"milioane"); plumbed via build_system_prompt(voice_mode=True).
- Isolated voice session key `voice:<channel_id>` — voice doesn't share
  context with text adapter on the same channel; auto-applied without
  /clear ceremony. /clear drops both keys.
- Metric units + Romanian thousands (normalize.py): "384.000 km" →
  "trei sute optzeci și patru de mii de kilometri" with feminine-correct
  pluralization and "de" particle for ≥20.
- `/voice setvoice <M1-F5>` slash command with native autocomplete; swaps
  live + persists voice.default_voice to config.json.
- Verbal voice change (src/voice/voice_commands.py + 29 tests) — "schimbă
  vocea pe M5", "voce em cinci", with permissive substring fallback for
  Whisper-mangled forms like "Mâcinci"=M5 and "unul cinci"=M5. Whisper
  initial_prompt now lists voice vocabulary to bias STT toward clean
  outputs.
- Fast barge-in: VAD ≥2 consecutive windows (~200ms) on Marius's user
  while Echo has pending TTS frames → cut him off mid-sentence so user
  doesn't wait the full silence + STT cycle. Acoustic echo bleed-through
  still requires headphones (no AEC).

## Test suite

130 voice + router tests pass (test_voice_recv_dave, test_voice_session_cleanup,
test_voice_adapter_contract, test_voice_normalize, test_voice_commands,
test_router).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 21:00:27 +00:00
parent 13931db953
commit 4be70440e8
18 changed files with 1118 additions and 140 deletions

View File

@@ -104,6 +104,14 @@
"ollama": {
"url": "http://10.0.20.161:11434"
},
"voice": {
"allowed_user_ids": [
"949388626146517022"
],
"user_name": "Marius",
"default_voice": "M5",
"auto_leave_minutes": 5
},
"paths": {
"personality": "personality/",
"tools": "tools/",

30
personality/VOICE_MODE.md Normal file
View File

@@ -0,0 +1,30 @@
# Voice Mode
Răspunzi prin voce (TTS). Marius te aude — nu citește. Reguli care contează:
## Lungime și ton
- **Scurt**: 1-2 propoziții, max ~30 cuvinte per turn. Marius vorbește cu tine — nu redactezi un document.
- **Conversațional**: ca un om viu. Fără "Sigur, iată...", "Permite-mi să...", "Te rog să...". Direct la subiect.
- **Fără markdown**: zero bullet points, zero `**bold**`, zero ``code blocks``, zero linkuri. Totul e citit cu voce.
## Numere și unități
- **Ora**: fără secunde. Spune "ora 23 și 9 minute" sau "9 și jumătate", nu "23:09:42".
- **Distanțe mari**: rotunjește în "mii" sau "milioane". Pentru Pământ-Lună spune "384 mii de kilometri", nu "384.000 km".
- **Zecimale**: omite-le când nu adaugă informație. "5 lei" nu "5,00 lei". "două ore" nu "2,0 ore". "20 de minute" nu "20,5 minute".
- **Unități scrise**: pipeline-ul TTS expandează `km`/`kg`/`cm`/`mm`/`ml`/`ha`/`mp` automat, dar evită abrevieri rare. Scrie "metri" nu "m." dacă e ambiguu.
## Structură
- Listă scurtă verbală: "Trei lucruri: întâi X, apoi Y, plus Z."
- Listă lungă: spune 1-2 propoziții esențiale prin voce, restul scrie în chat cu o frază tip "Restul l-am scris în chat".
- Întrebări clarificatoare: pune UNA, nu trei.
## Punctuație
- Doar virgule și puncte. Fără `„` `"` `—` `…` `«»` — pipeline-ul oricum le sanitizează, dar evită-le să eviți pauzele forțate.
## Tu ești Marius's prieten în mașină
Imaginează-ți că Marius conduce și te-a întrebat ceva pe difuzor. Răspunzi natural, scurt, la subiect — fără ceremonii.

View File

@@ -21,6 +21,14 @@ from typing import Optional
import discord
from discord import app_commands
# Optional DAVE dep (mandatory at runtime when discord.py 2.7.1 is paired with
# Discord voice gateway v=8; tolerated missing in tests / dev environments).
try:
import davey
_HAS_DAVE = True
except ImportError:
_HAS_DAVE = False
from src.config import Config
from src.voice.pipeline import (
VoiceSession,
@@ -28,7 +36,7 @@ from src.voice.pipeline import (
_get_whisper_model,
_get_silero_vad,
)
from src.voice.tts_stream import TTSQueue
from src.voice.tts_stream import TTSQueue, EchoStreamingAudioSource
from src.voice._discord_voice_adapter import connect_voice
log = logging.getLogger("echo-core.discord.voice")
@@ -53,6 +61,11 @@ async def warmup_models() -> None:
"""
global _voice_load_error
try:
if not discord.opus.is_loaded():
discord.opus.load_opus("libopus.so.0")
if _HAS_DAVE:
log.info("DAVE protocol v%d available (davey %s)",
davey.DAVE_PROTOCOL_VERSION, davey.__version__)
await asyncio.to_thread(_get_whisper_model)
await asyncio.to_thread(_get_silero_vad)
log.info("Voice models warm")
@@ -167,11 +180,24 @@ def register(tree: app_commands.CommandTree, bot: discord.Client) -> app_command
)
return
_voice_sessions[guild_id] = session
# Wake-up beep
# Start TTS streaming source for the entire session. Chain the
# wake-up beep via `after=` so streaming takes over when beep ends.
def _start_stream(error: Optional[Exception] = None) -> None:
if error is not None:
log.warning("Beep playback ended with error: %s", error)
try:
vc.play(discord.FFmpegPCMAudio("assets/voice/beep_200ms.wav"))
vc.play(EchoStreamingAudioSource(ttsq))
log.info("TTS streaming source attached")
except Exception:
log.warning("Beep playback skipped", exc_info=True)
log.exception("EchoStreamingAudioSource attach failed")
try:
vc.play(
discord.FFmpegPCMAudio("assets/voice/beep_200ms.wav"),
after=_start_stream,
)
except Exception:
log.warning("Beep playback skipped, starting stream directly", exc_info=True)
_start_stream()
# Attach sink
try:
bot_user_id = int(bot.user.id) if bot.user is not None else 0
@@ -220,6 +246,45 @@ def register(tree: app_commands.CommandTree, bot: discord.Client) -> app_command
log.warning("Presence reset skipped", exc_info=True)
await interaction.followup.send("Plecat.", ephemeral=True)
_VOICE_CHOICES = [
app_commands.Choice(name=v, value=v)
for v in ("M1", "M2", "M3", "M4", "M5", "F1", "F2", "F3", "F4", "F5")
]
@voice_group.command(name="setvoice", description="Schimbă vocea Echo (M1-M5 sau F1-F5)")
@app_commands.describe(voice="Voce nouă")
@app_commands.choices(voice=_VOICE_CHOICES)
async def setvoice(
interaction: discord.Interaction,
voice: app_commands.Choice[str],
) -> None:
await interaction.response.defer(ephemeral=True)
new_voice = voice.value
# Live-swap on the active session if Echo is in voice on this guild.
guild_id = interaction.guild.id if interaction.guild else None
session = _voice_sessions.get(guild_id) if guild_id is not None else None
live_swapped = False
if session is not None and session.ttsq is not None:
session.ttsq.voice_id = new_voice
live_swapped = True
# Persist as the new default for future sessions.
try:
cfg = Config()
cfg.set("voice.default_voice", new_voice)
cfg.save()
except Exception as e:
log.warning("config save failed for new default voice: %s", e)
await interaction.followup.send(
f"Voce schimbată live ({new_voice}), dar config-ul nu s-a salvat: {e}",
ephemeral=True,
)
return
if live_swapped:
msg = f"Vocea schimbată **live** pe {new_voice}. Următoarea frază va folosi vocea nouă."
else:
msg = f"Default voce setată {new_voice}. Va intra în vigoare la următorul /voice join."
await interaction.followup.send(msg, ephemeral=True)
@voice_group.command(name="doctor", description="Verifică voice stack")
async def doctor(interaction: discord.Interaction) -> None:
await interaction.response.defer(ephemeral=True)

View File

@@ -399,15 +399,23 @@ def _run_claude(
# ---------------------------------------------------------------------------
def build_system_prompt() -> str:
"""Concatenate personality/*.md files into a single system prompt."""
def build_system_prompt(voice_mode: bool = False) -> str:
"""Concatenate personality/*.md files into a single system prompt.
When ``voice_mode=True``, appends ``VOICE_MODE.md`` so the model knows
its reply will be read aloud (terse, no markdown, no abbreviations, etc.).
"""
if not PERSONALITY_DIR.is_dir():
raise FileNotFoundError(
f"Personality directory not found: {PERSONALITY_DIR}"
)
files = list(PERSONALITY_FILES)
if voice_mode:
files.append("VOICE_MODE.md")
parts: list[str] = []
for filename in PERSONALITY_FILES:
for filename in files:
filepath = PERSONALITY_DIR / filename
if filepath.is_file():
parts.append(filepath.read_text(encoding="utf-8"))
@@ -434,6 +442,7 @@ def start_session(
model: str = DEFAULT_MODEL,
timeout: int = DEFAULT_TIMEOUT,
on_text: Callable[[str], None] | None = None,
voice_mode: bool = False,
) -> tuple[str, str]:
"""Start a new Claude CLI session for a channel.
@@ -441,13 +450,16 @@ def start_session(
If *on_text* is provided, each intermediate Claude text block is passed
to the callback as soon as it arrives.
*voice_mode* — when True, ``VOICE_MODE.md`` is appended to the system
prompt so the model produces short, TTS-friendly responses.
"""
if model not in VALID_MODELS:
raise ValueError(
f"Invalid model '{model}'. Must be one of: haiku, sonnet, opus"
)
system_prompt = build_system_prompt()
system_prompt = build_system_prompt(voice_mode=voice_mode)
# Wrap external user message with injection protection markers
wrapped_message = f"[EXTERNAL CONTENT]\n{message}\n[END EXTERNAL CONTENT]"
@@ -578,6 +590,7 @@ def send_message(
model: str = DEFAULT_MODEL,
timeout: int = DEFAULT_TIMEOUT,
on_text: Callable[[str], None] | None = None,
voice_mode: bool = False,
) -> str:
"""High-level convenience: auto start or resume based on channel state.
@@ -598,7 +611,8 @@ def send_message(
if session is not None and session.get("model"):
effective_model = session["model"]
response_text, _session_id = start_session(
channel_id, message, effective_model, timeout, on_text=on_text
channel_id, message, effective_model, timeout,
on_text=on_text, voice_mode=voice_mode,
)
return response_text

View File

@@ -123,8 +123,10 @@ def route_message(
# Text-based commands (not slash commands — these work in any adapter)
if text.lower() == "/clear":
default_model = _get_config().get("bot.default_model", "sonnet")
cleared = clear_session(channel_id)
if cleared:
cleared_text = clear_session(channel_id)
# Also drop the isolated voice session if one exists on this channel.
clear_session(f"voice:{channel_id}")
if cleared_text:
return f"Session cleared. Model reset to {default_model}.", True
return "No active session.", True
@@ -159,12 +161,19 @@ def route_message(
# (Engineering decision #14 in the plan.) Only the discord-voice adapter
# triggers it — text adapters keep the message verbatim.
claude_text = text
if adapter_name == "discord-voice":
voice_mode = adapter_name == "discord-voice"
if voice_mode:
user_name = _get_config().get("voice.user_name", "user") or "user"
claude_text = f"[speaker:{user_name}] {text}"
# Voice sessions use an isolated session key so they start fresh with
# VOICE_MODE.md and don't pollute the text channel's conversation.
session_key = f"voice:{channel_id}" if voice_mode else channel_id
try:
response = send_message(channel_id, claude_text, model=model, on_text=on_text)
response = send_message(
session_key, claude_text, model=model, on_text=on_text,
voice_mode=voice_mode,
)
_set_last_response(channel_id, response)
return response, False
except Exception as e:

View File

@@ -94,6 +94,96 @@ def expand_numbers_ro(text: str) -> str:
return _NUM_TOKEN.sub(_sub, text)
# ---------- Thousands separator ----------
# Romanian uses dot or space as thousands separator: 384.000 / 384 000. The
# decimal expander would read "384.000" as "trei sute optzeci și patru virgulă
# zero zero zero" — wrong. Collapse the dots so expand_numbers_ro reads the
# whole integer. Only 1-3 leading digits followed by ≥1 group of exactly 3
# digits, never adjacent to other digits.
_THOUSANDS_DOT = re.compile(r'(?<!\d)(\d{1,3}(?:\.\d{3})+)(?!\d)')
def normalize_thousands(text: str) -> str:
"""Strip the dot from Romanian thousands-separator integers."""
return _THOUSANDS_DOT.sub(lambda m: m.group(1).replace('.', ''), text)
# ---------- Metric units ----------
# (regex_matching_<n><unit>, singular, plural). Matches an integer or decimal
# followed by the abbreviation as a whole word. Skipping bare ``m`` and ``l``
# because they collide with too many tokens ("M2" voice id, list markers).
_UNIT_PATTERNS: list[tuple[re.Pattern, str, str]] = [
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*km\b', re.IGNORECASE), 'kilometru', 'kilometri'),
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*kg\b', re.IGNORECASE), 'kilogram', 'kilograme'),
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*cm\b', re.IGNORECASE), 'centimetru', 'centimetri'),
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*mm\b', re.IGNORECASE), 'milimetru', 'milimetri'),
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*ml\b', re.IGNORECASE), 'mililitru', 'mililitri'),
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*ha\b', re.IGNORECASE), 'hectar', 'hectare'),
(re.compile(r'(?<!\w)(\d+(?:[.,]\d+)?)\s*mp\b', re.IGNORECASE), 'metru pătrat', 'metri pătrați'),
]
def _format_unit(amount_str: str, singular: str, plural: str) -> str:
"""Mirror ``_format_currency_unit`` for metric units. Decimals fall through
to the generic decimal expander (which leaves them with plural form)."""
if '.' in amount_str or ',' in amount_str:
return f"{_decimal_to_ro(amount_str.replace(',', '.'))} {plural}"
return _format_currency_unit(int(amount_str), singular, plural)
def expand_units(text: str) -> str:
"""Expand metric unit abbreviations into spoken Romanian."""
for pattern, singular, plural in _UNIT_PATTERNS:
text = pattern.sub(
lambda m, sg=singular, pl=plural: _format_unit(m.group(1), sg, pl),
text,
)
return text
# ---------- Time ----------
_TIME_PATTERN = re.compile(r'(?<!\d)([01]?\d|2[0-3]):([0-5]?\d)(?!\d)')
def _format_minutes_ro(n: int) -> str:
"""Romanian-correct feminine forms for minute counts (0-59)."""
if n == 1:
return "un minut"
if n == 2:
return "două minute"
if n < 20:
return f"{_int_to_ro(n)} minute"
last = n % 10
rest = n - last
if last == 0:
return f"{_int_to_ro(n)} de minute"
if last == 1:
return f"{_int_to_ro(rest)} și una de minute"
if last == 2:
return f"{_int_to_ro(rest)} și două de minute"
return f"{_int_to_ro(rest)} și {_int_to_ro(last)} de minute"
def expand_time(text: str) -> str:
"""Expand ``HH:MM`` clock times into colloquial Romanian.
23:09 -> "douăzeci și trei și nouă minute"
23:00 -> "douăzeci și trei fix"
"""
def _sub(match: re.Match) -> str:
h = int(match.group(1))
m = int(match.group(2))
hour_str = _int_to_ro(h)
if m == 0:
return f"{hour_str} fix"
return f"{hour_str} și {_format_minutes_ro(m)}"
return _TIME_PATTERN.sub(_sub, text)
# ---------- Currency ----------
_CURRENCY_MAIN = {
@@ -177,6 +267,9 @@ def expand_symbols(text: str) -> str:
return text
from tools.tts import sanitize_for_supertonic as sanitize_punctuation
# ---------- Abbreviations ----------
# Longer patterns first so 'ș.a.m.d.' wins over 'ș.a.'
@@ -211,8 +304,12 @@ def normalize_for_tts(text: str) -> str:
response continues in the text channel mirror.
"""
text = strip_markdown(text)
text = sanitize_punctuation(text)
text = expand_abbreviations(text)
text = normalize_thousands(text)
text = expand_time(text)
text = expand_currency(text)
text = expand_units(text)
text = expand_numbers_ro(text)
text = expand_symbols(text)
words = text.split()

View File

@@ -19,7 +19,7 @@ the bot transcribe itself.
See plan: ``src/voice/pipeline.py`` (Pas 5), Engineering decisions #4
(VAD 100ms batched), #5 (cleanup centralizat), #7 (bot.user.id explicit
guard), #8 (filler audio ``thinking.wav`` at 3s pre-first-block).
guard).
"""
from __future__ import annotations
@@ -34,6 +34,7 @@ from typing import Any, Callable, Optional
import numpy as np
from src.voice._discord_voice_adapter import AudioSink, VoiceData
from src.voice.voice_commands import detect_voice_change
log = logging.getLogger(__name__)
@@ -48,7 +49,6 @@ VAD_WINDOW_BYTES = PACKET_BYTES * (VAD_WINDOW_MS // PACKET_MS)
VAD_THRESHOLD = 0.5
SILENCE_FLUSH_MS = 800
NO_SPEECH_DROP_THRESHOLD = 0.6
FILLER_DELAY_S = 3.0
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
LOGS_DIR = PROJECT_ROOT / "logs"
@@ -76,6 +76,7 @@ def _get_whisper_model() -> Any:
from faster_whisper import WhisperModel
_whisper_model = WhisperModel(
"small", device="cpu", compute_type="int8", cpu_threads=4,
local_files_only=True,
)
return _whisper_model
@@ -164,8 +165,6 @@ class VoiceSession:
self._lock = threading.Lock()
self._cleaned_up = False
self._lock_owner_thread: Optional[int] = None
self._filler_task: Optional[asyncio.Task] = None
self._first_block_seen = False
# ----- context manager -----
@@ -244,14 +243,7 @@ class VoiceSession:
except Exception as e: # noqa: BLE001
log.warning("ttsq.stop failed: %s", e)
# 5. Cancel pending filler task.
if self._filler_task is not None and not self._filler_task.done():
try:
self._filler_task.cancel()
except Exception: # noqa: BLE001
pass
# 6. Release the session lock (held since __enter__).
# 5. Release the session lock (held since __enter__).
try:
if self._lock.locked():
self._lock.release()
@@ -276,6 +268,19 @@ class VoiceSession:
self.last_activity_ts = time.monotonic()
speaker_name = self._resolve_speaker_name(speaker_id)
# Drop any TTS frames from the previous turn so a new utterance cuts off
# stale Echo speech (barge-in) and never mixes with the new response.
try:
self.ttsq.clear()
except Exception as e: # noqa: BLE001
log.warning("ttsq.clear failed: %s", e)
# In-band voice command: change TTS voice without round-tripping Claude.
new_voice = detect_voice_change(text)
if new_voice is not None:
await self._handle_voice_change(speaker_name, text, new_voice)
return
# 1. Mirror to text channel (one Unicode 🎤 — exception per plan).
if self.mirror_enabled and self.text_channel is not None:
try:
@@ -302,33 +307,19 @@ class VoiceSession:
except Exception as e: # noqa: BLE001
log.warning("voice transcript write failed: %s", e)
# 3. Arm the 3s filler timer — fires only if no Claude block arrives.
self._first_block_seen = False
if self._filler_task is not None and not self._filler_task.done():
self._filler_task.cancel()
try:
self._filler_task = asyncio.create_task(self._filler_after_delay())
except RuntimeError:
# No running loop (test path). Skip the timer.
self._filler_task = None
block_count = [0]
def voice_stream_callback(block: str) -> None:
"""Called once per Claude streamed text block — pushes to TTS
and cancels the filler on first arrival."""
if not self._first_block_seen:
self._first_block_seen = True
ft = self._filler_task
if ft is not None and not ft.done():
try:
ft.cancel()
except Exception: # noqa: BLE001
pass
"""Called once per Claude streamed text block — pushes to TTS."""
block_count[0] += 1
log.info("voice stream block #%d (%d chars): %r",
block_count[0], len(block or ""), (block or "")[:80])
try:
self.ttsq.push_text(block)
except Exception as e: # noqa: BLE001
log.warning("ttsq.push_text failed: %s", e)
# 4. Dispatch to Claude. send_message is sync subprocess, run on
# Dispatch to Claude. send_message is sync subprocess, run on
# a worker thread so the loop stays responsive for mirror/TTS.
try:
await asyncio.to_thread(
@@ -343,19 +334,44 @@ class VoiceSession:
except Exception as e: # noqa: BLE001
log.error("route_message voice path failed: %s", e)
async def _filler_after_delay(self) -> None:
"""Push ``assets/voice/thinking.wav`` after FILLER_DELAY_S if Claude
hasn't produced a first block yet."""
async def _handle_voice_change(
self, speaker_name: str, original_text: str, new_voice: str,
) -> None:
"""Apply an in-band 'change voice' command: swap live, persist to
config, mirror to chat, speak a short acknowledgment in the new voice.
Does NOT forward the utterance to Claude."""
# 1. Live-swap on the TTS queue. Next clause synth uses the new voice.
try:
await asyncio.sleep(FILLER_DELAY_S)
except asyncio.CancelledError:
return
if self._first_block_seen or self._cleaned_up:
return
try:
self.ttsq.push_filler()
self.ttsq.voice_id = new_voice
except Exception as e: # noqa: BLE001
log.warning("ttsq.push_filler failed: %s", e)
log.warning("ttsq voice swap failed: %s", e)
# 2. Persist as the new default for future sessions.
try:
from src.config import Config
cfg = Config()
cfg.set("voice.default_voice", new_voice)
cfg.save()
except Exception as e: # noqa: BLE001
log.warning("voice default persist failed: %s", e)
# 3. Mirror what was heard + show the swap in the text channel.
if self.mirror_enabled and self.text_channel is not None:
try:
send = getattr(self.text_channel, "send", None)
if callable(send):
coro = send(
f"\U0001f3a4 {speaker_name}: \"{original_text}\"\n"
f"\U0001f50a Voce → **{new_voice}**"
)
if asyncio.iscoroutine(coro):
await coro
except Exception as e: # noqa: BLE001
log.warning("voice mirror send failed: %s", e)
# 4. Verbal acknowledgment in the NEW voice.
try:
self.ttsq.push_text(f"Vocea {new_voice}.")
except Exception as e: # noqa: BLE001
log.warning("voice ack push failed: %s", e)
self._log_metric({"event": "voice_change", "new_voice": new_voice})
# ----- helpers -----
@@ -406,11 +422,32 @@ class EchoVoiceSink(AudioSink):
self._last_speech_ts: dict[int, float] = {}
self._has_speech: dict[int, bool] = {}
self._sink_lock = threading.Lock()
# Diagnostics: log once-per-user when packets first arrive and when
# VAD first detects speech. Cheap, but tells us exactly where the
# chain breaks when "I spoke but Echo heard nothing" happens.
self._first_packet_logged: set[int] = set()
self._first_speech_logged: set[int] = set()
# Track consecutive VAD-positive windows per user. Used to delay
# barge-in (don't cut Echo off on a single jittery VAD hit; require
# ≥2 windows ≈ 200ms of sustained speech).
self._vad_consecutive: dict[int, int] = {}
# Background poller that triggers the silence flush even when Discord
# DTX stops delivering RTP packets after the user stops speaking. Without
# this, sink.write would stop firing and STT would never run on the
# final utterance.
self._poller_stop = threading.Event()
self._poller_thread = threading.Thread(
target=self._silence_flush_poller,
name="echo-voice-flush-poller",
daemon=True,
)
self._poller_thread.start()
def wants_opus(self) -> bool:
return False
def cleanup(self) -> None:
self._poller_stop.set()
with self._sink_lock:
self._user_buffers.clear()
self._packet_accum.clear()
@@ -435,6 +472,10 @@ class EchoVoiceSink(AudioSink):
if not pcm:
return
if uid not in self._first_packet_logged:
self._first_packet_logged.add(uid)
log.info("voice sink: first PCM packet from user %s (%d bytes)", uid, len(pcm))
window_pcm: Optional[bytes] = None
pcm_for_stt: Optional[bytes] = None
@@ -450,30 +491,75 @@ class EchoVoiceSink(AudioSink):
if window_pcm is not None:
if self._vad_detects_speech(window_pcm):
if uid not in self._first_speech_logged:
self._first_speech_logged.add(uid)
log.info("voice sink: VAD detected speech from user %s", uid)
self._vad_consecutive[uid] = self._vad_consecutive.get(uid, 0) + 1
with self._sink_lock:
self._last_speech_ts[uid] = time.monotonic()
self._has_speech[uid] = True
# Fast barge-in: after ≥2 consecutive VAD windows (~200ms
# of sustained speech), cut Echo's TTS mid-sentence so the
# user doesn't have to wait the full silence-flush + STT
# cycle (~3s).
if self._vad_consecutive[uid] >= 2:
try:
ttsq = self.session.ttsq
if ttsq is not None and not ttsq.is_empty():
ttsq.clear()
log.info(
"voice sink: barge-in cleared TTS queue (user=%s)",
uid,
)
except Exception as e: # noqa: BLE001
log.warning("barge-in clear failed: %s", e)
else:
self._vad_consecutive[uid] = 0
with self._sink_lock:
if self._has_speech.get(uid):
last = self._last_speech_ts.get(uid, 0.0)
silence_ms = (time.monotonic() - last) * 1000.0
if silence_ms >= SILENCE_FLUSH_MS:
pcm_for_stt = bytes(self._user_buffers.get(uid, b""))
self._user_buffers[uid] = bytearray()
self._packet_accum[uid] = bytearray()
self._has_speech[uid] = False
pcm_for_stt = self._take_flushable_pcm(uid)
if pcm_for_stt:
self._flush_to_stt(uid, pcm_for_stt)
except Exception as e: # noqa: BLE001
log.warning("EchoVoiceSink.write failed: %s", e)
def _take_flushable_pcm(self, uid: int) -> Optional[bytes]:
"""If user `uid` has buffered speech that's been silent ≥SILENCE_FLUSH_MS,
consume the buffer and return it. Otherwise return None."""
with self._sink_lock:
if not self._has_speech.get(uid):
return None
last = self._last_speech_ts.get(uid, 0.0)
silence_ms = (time.monotonic() - last) * 1000.0
if silence_ms < SILENCE_FLUSH_MS:
return None
pcm = bytes(self._user_buffers.get(uid, b""))
self._user_buffers[uid] = bytearray()
self._packet_accum[uid] = bytearray()
self._has_speech[uid] = False
return pcm if pcm else None
def _silence_flush_poller(self) -> None:
"""Background tick: Discord DTX stops sending RTP packets when the user
goes silent, so the inline flush check in `write()` never fires for the
last utterance. Poll every 200ms so the trailing audio actually reaches
Whisper."""
while not self._poller_stop.wait(0.2):
try:
with self._sink_lock:
pending = [uid for uid, has in self._has_speech.items() if has]
for uid in pending:
pcm = self._take_flushable_pcm(uid)
if pcm:
self._flush_to_stt(uid, pcm)
except Exception as e: # noqa: BLE001
log.warning("silence flush poller iter failed: %s", e)
# ----- VAD -----
def _vad_detects_speech(self, pcm48_stereo: bytes) -> bool:
"""Run silero-vad on a 100ms window. Falls back to an RMS energy
threshold if torch / silero are unavailable."""
"""Run silero-vad on a 100ms window. silero-vad v5+ requires exactly
512 samples per call at 16kHz, so we slice the window into 512-sample
chunks and return True if any chunk crosses the threshold."""
try:
mono16 = _pcm48_stereo_to_16_mono(pcm48_stereo)
if mono16.size == 0:
@@ -484,10 +570,17 @@ class EchoVoiceSink(AudioSink):
rms = float(np.sqrt(np.mean(mono16.astype(np.float64) ** 2)))
return rms > 0.02
model, _ = _get_silero_vad()
chunk = 512 # silero-vad v5+ hard requirement at 16kHz
max_prob = 0.0
with torch.no_grad():
prob = float(model(torch.from_numpy(mono16),
SAMPLE_RATE_WHISPER).item())
return prob >= VAD_THRESHOLD
for start in range(0, mono16.size - chunk + 1, chunk):
seg = mono16[start:start + chunk]
p = float(model(torch.from_numpy(seg), SAMPLE_RATE_WHISPER).item())
if p > max_prob:
max_prob = p
if p >= VAD_THRESHOLD:
return True
return False
except Exception as e: # noqa: BLE001
log.debug("VAD inference failed: %s", e)
return False
@@ -502,7 +595,15 @@ class EchoVoiceSink(AudioSink):
return
model = _get_whisper_model()
segments, _info = model.transcribe(
mono16, language="ro", beam_size=1,
mono16, language="ro", beam_size=5,
initial_prompt=(
"Echo Core, asistent personal AI românesc al lui Marius. "
"Conversație colocvială în română. "
"Comenzi voce recunoscute: schimbă vocea pe M1, M2, M3, M4, M5, "
"F1, F2, F3, F4, F5. Exemple: vorbește cu vocea M5, voce F3, "
"treci pe vocea F1."
),
condition_on_previous_text=False,
)
text_parts: list[str] = []
worst_no_speech = 0.0
@@ -543,7 +644,6 @@ class EchoVoiceSink(AudioSink):
__all__ = [
"VoiceSession",
"EchoVoiceSink",
"FILLER_DELAY_S",
"SILENCE_FLUSH_MS",
"VAD_THRESHOLD",
"VAD_WINDOW_MS",

View File

@@ -10,6 +10,7 @@ Engineering decisions #6, #8, #15.
from __future__ import annotations
import io
import logging
import queue
import re
import subprocess
@@ -24,6 +25,9 @@ from src.voice.normalize import normalize_for_tts
from tools.tts import synthesize
log = logging.getLogger(__name__)
# Discord wants 20ms of 16-bit 48kHz stereo PCM per frame.
# 48000 Hz * 0.020 s * 2 channels * 2 bytes = 3840 bytes.
FRAME_BYTES = 3840
@@ -31,13 +35,6 @@ TARGET_SAMPLE_RATE = 48000
TARGET_CHANNELS = 2
TARGET_SAMPLE_WIDTH = 2
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
_THINKING_WAV = _PROJECT_ROOT / "assets" / "voice" / "thinking.wav"
# Cached filler frames (load + resample once, reuse forever).
_thinking_frames_cache: Optional[List[bytes]] = None
_thinking_cache_lock = threading.Lock()
# Sentinel pushed onto the text queue to ask the worker to exit cleanly.
_POISON = object()
@@ -149,27 +146,6 @@ def wav_to_pcm_20ms_frames(wav_bytes: bytes) -> List[bytes]:
return frames
def load_thinking_wav() -> List[bytes]:
"""Load ``assets/voice/thinking.wav`` and cache it as 20ms PCM frames.
Safe to call from multiple threads; the load happens at most once.
Returns an empty list if the asset is missing or fails to decode
(callers treat that as a no-op filler).
"""
global _thinking_frames_cache
if _thinking_frames_cache is not None:
return _thinking_frames_cache
with _thinking_cache_lock:
if _thinking_frames_cache is not None:
return _thinking_frames_cache
try:
wav_bytes = _THINKING_WAV.read_bytes()
_thinking_frames_cache = wav_to_pcm_20ms_frames(wav_bytes)
except (FileNotFoundError, OSError, RuntimeError):
_thinking_frames_cache = []
return _thinking_frames_cache
# ---------- TTS worker queue ----------
class TTSQueue:
@@ -227,19 +203,13 @@ class TTSQueue:
if not text:
return
cleaned = normalize_for_tts(text)
n = 0
for clause in clause_segments(cleaned):
clause = clause.strip()
if clause:
self._text_queue.put(clause)
def push_filler(self) -> None:
"""Enqueue pre-rendered filler frames (``thinking.wav``) directly.
Bypasses synthesis -- the filler plays even if Supertonic is down
or slow. No-op if the asset failed to load.
"""
for frame in load_thinking_wav():
self._pcm_queue.put(frame)
n += 1
log.info("ttsq.push_text: input %d chars → %d clauses queued", len(text), n)
def clear(self) -> None:
"""Drop everything pending (used for barge-in)."""
@@ -251,10 +221,14 @@ class TTSQueue:
# --- consumer side (called by EchoStreamingAudioSource) ---
def get_frame(self, timeout: float = 0.1) -> Optional[bytes]:
"""Block up to ``timeout`` seconds for the next 20ms PCM frame."""
def get_frame_nowait(self) -> Optional[bytes]:
"""Return the next PCM frame if available, else None — no blocking.
Blocking inside the player's read() loop wrecks Discord's 20ms cadence
and the client interprets the stream as stuttering / out-of-order.
"""
try:
return self._pcm_queue.get(timeout=timeout)
return self._pcm_queue.get_nowait()
except queue.Empty:
return None
@@ -278,24 +252,25 @@ class TTSQueue:
break
if not isinstance(item, str):
continue
preview = item[:60]
try:
result = synthesize(item, voice=self.voice_id, lang=self.lang)
except Exception:
# Synth crashes shouldn't kill the worker -- log path is the
# caller's job (we have no logger here on purpose).
except Exception as e:
log.warning("TTS synth raised for %r: %s", preview, e)
continue
if not result.get('ok'):
log.warning("TTS synth not ok for %r: %s", preview, result.get('error'))
continue
path = result.get('path')
if not path:
log.warning("TTS synth ok but no path for %r", preview)
continue
wav_bytes = b''
try:
wav_bytes = Path(path).read_bytes()
except OSError:
pass
except OSError as e:
log.warning("TTS WAV read failed for %r: %s", preview, e)
finally:
# Best-effort cleanup of the synth tempfile.
try:
Path(path).unlink(missing_ok=True)
except OSError:
@@ -304,12 +279,18 @@ class TTSQueue:
continue
try:
frames = wav_to_pcm_20ms_frames(wav_bytes)
except RuntimeError:
except RuntimeError as e:
log.warning("TTS WAV-to-PCM failed for %r: %s", preview, e)
continue
if not frames:
log.warning("TTS WAV-to-PCM produced 0 frames for %r", preview)
continue
for frame in frames:
if self._stop_event.is_set():
return
self._pcm_queue.put(frame)
log.info("TTS pushed %d frames (%.1fs) for %r",
len(frames), len(frames) * 0.02, preview)
# ---------- Discord audio source ----------
@@ -318,13 +299,17 @@ class EchoStreamingAudioSource(discord.AudioSource):
"""Pull PCM frames from a ``TTSQueue`` into Discord's audio thread.
A single ``voice_client.play(EchoStreamingAudioSource(ttsq))`` call
spans the whole turn. The audio thread blocks on the PCM queue for
up to 100ms per ``read()``; if it stays empty past that, ``read()``
returns ``b''`` which Discord interprets as end-of-stream and stops
the player (which is exactly what we want at end-of-turn or after
``ttsq.clear()`` on barge-in).
spans the whole session. When the TTS queue is empty, ``read()``
returns a 20ms silence frame to keep the player alive — otherwise
Discord would interpret an empty return as end-of-stream and stop
the player, so real TTS frames pushed later would be silently
discarded. The player is explicitly terminated only via
``cleanup()`` (called on voice session teardown).
"""
# 20ms of s16le stereo at 48kHz silence (960 samples × 2 channels × 2 bytes).
_SILENCE_FRAME = b'\x00' * (960 * 2 * 2)
def __init__(self, ttsq: TTSQueue):
self._ttsq = ttsq
self._closed = False
@@ -332,9 +317,9 @@ class EchoStreamingAudioSource(discord.AudioSource):
def read(self) -> bytes:
if self._closed:
return b''
frame = self._ttsq.get_frame(timeout=0.1)
frame = self._ttsq.get_frame_nowait()
if frame is None:
return b''
return self._SILENCE_FRAME
return frame
def is_opus(self) -> bool:

118
src/voice/voice_commands.py Normal file
View File

@@ -0,0 +1,118 @@
"""Detect in-band voice commands from STT transcripts.
The voice pipeline transcribes Marius's speech via Whisper and dispatches the
text to Claude. Some utterances are not questions for Claude — they're
control commands for the voice stack itself. This module parses those out
*before* the Claude round-trip so they take effect instantly and don't waste
a Claude session turn.
Currently handled:
* change TTS voice — "schimbă vocea pe M5", "vorbește cu vocea F3",
"voce em cinci", "voce feminină 3", etc.
The parser is intentionally conservative: it requires BOTH a voice trigger
word ("voce", "vorbește", "schimbă", "treci pe") AND a recognizable voice
ID. A bare "M5" without context is NOT a command — Marius might be quoting
a string.
"""
from __future__ import annotations
import re
from typing import Optional
_VALID_VOICES = {f"M{i}" for i in range(1, 6)} | {f"F{i}" for i in range(1, 6)}
# Trigger words that suggest the user is talking ABOUT the voice, not just
# saying something that happens to contain a voice-ID-looking substring.
_VOICE_TRIGGER_RE = re.compile(
r'\b(voce|vocea|voci|voice|vorbe[șs]te|schimb[aăÎ]|treci\s+pe)\b',
re.IGNORECASE,
)
# Direct form: "M5", "F 3", "m5", etc.
_VOICE_ID_DIRECT_RE = re.compile(
r'\b([MF])\s*([1-5])\b',
re.IGNORECASE,
)
# Word form: "em cinci", "M trei", "masculin doi", "feminină patru", etc.
# Whisper often transcribes "M5" as "em cinci" / "M cinci" because letter
# names are spelled out phonetically in Romanian.
_VOICE_ID_WORDS_RE = re.compile(
r'\b(em|m|masculin[aăe]?|ef|f|feminin[aăe]?)\s+(unu|una|doi|dou[ăa]|trei|patru|cinci|[1-5])\b',
re.IGNORECASE,
)
_DIGIT_WORD_TO_INT = {
'unu': 1, 'una': 1, 'unul': 1, '1': 1,
'doi': 2, 'două': 2, 'doua': 2, '2': 2,
'trei': 3, '3': 3,
'patru': 4, '4': 4,
'cinci': 5, '5': 5,
}
# Substring fallback: matches digit roots even when Whisper glues them into
# compound non-words like "Mâcinci" (for "M cinci"=M5).
_DIGIT_SUBSTR_RE = re.compile(
r'(cinci|patru|trei|dou[ăa]|unul|unu|una)',
re.IGNORECASE,
)
_F_GENDER_HINT_RE = re.compile(r'feminin|\bef\b|\bF\d?\b', re.IGNORECASE)
def _normalize_gender(word: str) -> Optional[str]:
"""Map gender word to 'M' or 'F'."""
w = word.lower()
if w in ('m', 'em') or w.startswith('masculin'):
return 'M'
if w in ('f', 'ef') or w.startswith('feminin'):
return 'F'
return None
def detect_voice_change(text: str) -> Optional[str]:
"""Parse a transcript for a 'change voice' command.
Returns the target voice id (one of M1-M5, F1-F5) or None if no command
was detected. Requires both a voice trigger word and a voice ID.
"""
if not text:
return None
if not _VOICE_TRIGGER_RE.search(text):
return None
# Try the direct form first (M5, F3, etc.)
m = _VOICE_ID_DIRECT_RE.search(text)
if m:
candidate = f"{m.group(1).upper()}{m.group(2)}"
if candidate in _VALID_VOICES:
return candidate
# Fall back to the word form ("em cinci", "feminin trei", ...).
m = _VOICE_ID_WORDS_RE.search(text)
if m:
gender = _normalize_gender(m.group(1))
digit = _DIGIT_WORD_TO_INT.get(m.group(2).lower())
if gender is not None and digit is not None:
candidate = f"{gender}{digit}"
if candidate in _VALID_VOICES:
return candidate
# Permissive fallback: Whisper sometimes glues the letter into the next
# word ("Mâcinci" for "M cinci") or replaces it ("unul cinci" for
# "M unu cinci"). After a voice trigger word, scan for any digit-word
# substring and infer gender (F if a feminine marker is present, else M).
digit_hits = _DIGIT_SUBSTR_RE.findall(text)
digits = [_DIGIT_WORD_TO_INT[d.lower()] for d in digit_hits
if d.lower() in _DIGIT_WORD_TO_INT]
digits = [d for d in digits if 1 <= d <= 5]
if digits:
gender = 'F' if _F_GENDER_HINT_RE.search(text) else 'M'
# Last digit wins — handles "M unu cinci" → M5 since "unu" is a
# mangled letter-name prefix, "cinci" is the actual target.
return f"{gender}{digits[-1]}"
return None
__all__ = ["detect_voice_change"]

View File

@@ -17,6 +17,13 @@ Lecții capturate din corectările lui Marius. Citește acest fișier la începu
<!-- Lecțiile se adaugă mai jos, cele mai noi sus. -->
## Supertonic rejectează ghilimelele curly (Unicode) cu HTTP 500
**Data:** 2026-05-27
**Context:** Marius a dat o comandă audio pe Discord cu un URL, iar răspunsul lui Claude conținea `„foo"` (ghilimele românești curly). Supertonic a returnat `HTTP 500: synthesis failed: Found 1 unsupported character(s): ['„']` și răspunsul nu s-a mai auzit. Fără retry logic vizibil în UX — pur și simplu tace.
**Greșeala:** Am presupus că `normalize_for_tts` produce text deja "TTS-safe" pentru Supertonic. În realitate `strip_markdown` păstrează ghilimelele Unicode (`„` U+201E, `"` U+201D, `—` U+2014, `…` U+2026, etc.) pe care Supertonic le refuză.
**Regula:** Înainte de orice apel HTTP la Supertonic, **sanitizează punctuația Unicode** la echivalentele ASCII (`„` `"` `"``"`, `'` `'` ```'`, `` `—``-`, `…``...`, `«` `»``"`). Funcția `sanitize_punctuation` în `src/voice/normalize.py` face asta și e apelată chiar după `strip_markdown` în pipeline. Dacă apar caractere noi care crapă Supertonic (ex: simboluri matematice, săgeți), adaugă-le în `_TTS_PUNCT_MAP`.
**Când se aplică:** Orice cod care trimite text la Supertonic (`tools/tts.py`, `src/voice/tts_stream.py`). Inclusiv testare manuală cu `curl` — folosește text românesc realistic (include `„foo"`, em-dash `—`, ellipsis `…`).
## Mai multe threads ≠ mai rapid — fitează `cpu_threads` pe physical cores, nu logical
**Data:** 2026-05-27
**Context:** Benchmark `tools/voice_bench.py` pentru faster-whisper `small` int8 pe i7-6700T (4 physical / 8 logical cores). Marius a urcat VM-ul de la 2 → 4 → 6 cores online, așteptând că mai multe = mai rapid.

View File

@@ -30,7 +30,10 @@ class TestClearCommand:
response, is_cmd = route_message("ch-1", "user-1", "/clear")
assert response == "Session cleared. Model reset to sonnet."
assert is_cmd is True
mock_clear.assert_called_once_with("ch-1")
# /clear drops both the text-adapter session and the isolated voice
# session for the same Discord channel.
mock_clear.assert_any_call("ch-1")
mock_clear.assert_any_call("voice:ch-1")
@patch("src.router._get_config")
@patch("src.router.clear_session")
@@ -191,7 +194,7 @@ class TestRegularMessage:
response, is_cmd = route_message("ch-1", "user-1", "hello")
assert response == "Hello from Claude!"
assert is_cmd is False
mock_send.assert_called_once_with("ch-1", "hello", model="sonnet", on_text=None)
mock_send.assert_called_once_with("ch-1", "hello", model="sonnet", on_text=None, voice_mode=False)
@patch("src.router.send_message")
def test_model_override(self, mock_send):
@@ -199,7 +202,7 @@ class TestRegularMessage:
response, is_cmd = route_message("ch-1", "user-1", "hello", model="opus")
assert response == "Response"
assert is_cmd is False
mock_send.assert_called_once_with("ch-1", "hello", model="opus", on_text=None)
mock_send.assert_called_once_with("ch-1", "hello", model="opus", on_text=None, voice_mode=False)
@patch("src.router._get_channel_config")
@patch("src.router._get_config")
@@ -227,7 +230,7 @@ class TestRegularMessage:
cb = lambda t: None
route_message("ch-1", "user-1", "hello", on_text=cb)
mock_send.assert_called_once_with("ch-1", "hello", model="sonnet", on_text=cb)
mock_send.assert_called_once_with("ch-1", "hello", model="sonnet", on_text=cb, voice_mode=False)
# --- _get_channel_config ---
@@ -269,7 +272,7 @@ class TestModelResolution:
mock_chan_cfg.return_value = {"id": "ch-1", "default_model": "haiku"}
route_message("ch-1", "user-1", "hello")
mock_send.assert_called_once_with("ch-1", "hello", model="haiku", on_text=None)
mock_send.assert_called_once_with("ch-1", "hello", model="haiku", on_text=None, voice_mode=False)
@patch("src.router._get_channel_config")
@patch("src.router._get_config")
@@ -283,7 +286,7 @@ class TestModelResolution:
mock_get_config.return_value = mock_cfg
route_message("ch-1", "user-1", "hello")
mock_send.assert_called_once_with("ch-1", "hello", model="opus", on_text=None)
mock_send.assert_called_once_with("ch-1", "hello", model="opus", on_text=None, voice_mode=False)
@patch("src.router._get_channel_config")
@patch("src.router._get_config")
@@ -297,7 +300,7 @@ class TestModelResolution:
mock_get_config.return_value = mock_cfg
route_message("ch-1", "user-1", "hello")
mock_send.assert_called_once_with("ch-1", "hello", model="sonnet", on_text=None)
mock_send.assert_called_once_with("ch-1", "hello", model="sonnet", on_text=None, voice_mode=False)
@patch("src.router.get_active_session")
@patch("src.router.send_message")
@@ -307,4 +310,4 @@ class TestModelResolution:
mock_get_session.return_value = {"model": "opus", "session_id": "abc"}
route_message("ch-1", "user-1", "hello")
mock_send.assert_called_once_with("ch-1", "hello", model="opus", on_text=None)
mock_send.assert_called_once_with("ch-1", "hello", model="opus", on_text=None, voice_mode=False)

View File

@@ -169,3 +169,54 @@ def test_voice_data_has_opus_property():
opus_attr = inspect.getattr_static(VoiceData, "opus", None)
assert isinstance(opus_attr, property), "VoiceData.opus must be a property"
# --- Echo-core DAVE-decrypt fork guards -------------------------------------
#
# Two contract tests pinned by the DAVE receive-side decrypt patch.
# See plan: /home/moltbot/.claude/plans/wiggly-exploring-glade.md
#
# These fail fast on either:
# 1. An upstream voice-recv re-install wiping the fork's version marker
# (i.e. our patch is gone), OR
# 2. A discord.py upgrade renaming the connection-level DAVE attrs the
# patch reads (`dave_session`, `dave_protocol_version`).
def test_voice_recv_fork_version():
"""Echo-core fork tag for the DAVE-decrypt patch.
Lane A bumps `voice_recv.__version__` to `'0.5.3a+echo.dave1'` (PEP 440
local segment). If this assertion fails after a vendor reinstall, the
fork patch has been lost — re-apply `_maybe_dave_decrypt` + the
`callback()` hook before deploying, or live voice will regress to the
`opus_decode: corrupted stream` error chain.
"""
from discord.ext import voice_recv
assert voice_recv.__version__ == "0.5.3a+echo.dave1", (
f"voice_recv.__version__ is {voice_recv.__version__!r}; expected "
"'0.5.3a+echo.dave1'. The DAVE-decrypt fork patch has been "
"overwritten — re-apply before reinstalling the vendored package."
)
def test_voice_connection_state_has_dave_attrs():
"""`_maybe_dave_decrypt` reads `dave_session` and `dave_protocol_version`
off the discord.py `VoiceConnectionState`. If a future discord.py upgrade
renames either attr, fail loudly here rather than in a live voice call
(where the symptom is silent packet drops).
"""
from discord import voice_state
src = inspect.getsource(voice_state.VoiceConnectionState)
assert "dave_session" in src, (
"discord.voice_state.VoiceConnectionState source no longer mentions "
"'dave_session' — discord.py may have renamed the attr. Update "
"vendor/discord-ext-voice-recv/.../reader.py::_maybe_dave_decrypt."
)
assert "dave_protocol_version" in src, (
"discord.voice_state.VoiceConnectionState source no longer mentions "
"'dave_protocol_version' — discord.py may have renamed the attr. "
"Update _maybe_dave_decrypt accordingly."
)

View File

@@ -0,0 +1,55 @@
"""Tests for src/voice/voice_commands.detect_voice_change."""
from __future__ import annotations
import pytest
from src.voice.voice_commands import detect_voice_change
class TestDetectVoiceChange:
# --- positive cases (direct form) ---
@pytest.mark.parametrize("text,expected", [
("schimbă vocea pe M5", "M5"),
("Schimbă vocea pe F3.", "F3"),
("vorbește cu vocea M1", "M1"),
("vorbește cu vocea F2", "F2"),
("voce M4", "M4"),
("Voce F5.", "F5"),
("treci pe vocea F1", "F1"),
("Echo, treci pe M2.", "M2"),
("voice M3", "M3"),
])
def test_direct_form(self, text, expected):
assert detect_voice_change(text) == expected
# --- positive cases (word form, what Whisper actually produces) ---
@pytest.mark.parametrize("text,expected", [
("schimbă vocea pe em cinci", "M5"),
("vorbește cu vocea em trei", "M3"),
("voce em unu", "M1"),
("schimbă vocea pe ef doi", "F2"),
("voce ef cinci", "F5"),
("vorbește cu vocea masculină cinci", "M5"),
("schimbă vocea pe feminină trei", "F3"),
("voce masculin patru", "M4"),
("schimbă vocea pe M cinci", "M5"),
("voce F două", "F2"),
])
def test_word_form(self, text, expected):
assert detect_voice_change(text) == expected
# --- negative cases ---
@pytest.mark.parametrize("text", [
"",
"cât este ora",
"M5", # no trigger word
"Salut Echo, sunt în M3", # M3 here is a location/etc, no trigger
"vocea ta este foarte bună", # trigger but no voice id
"schimbă te rog", # trigger but no id
"voce M6", # out of range
"voce M0", # out of range
"voce F8", # out of range
"schimbă vocea pe șapte", # digit out of range
])
def test_no_match(self, text):
assert detect_voice_change(text) is None

View File

@@ -0,0 +1,302 @@
"""DAVE receive-side decrypt tests for the vendored voice-recv fork.
Exercises Lane A's patch on
`vendor/discord-ext-voice-recv/discord/ext/voice_recv/reader.py`:
* `_maybe_dave_decrypt(rtp_packet)` — DAVE E2E layer sandwiched between the
transport-layer decrypt and the routing into the opus decoder. No-op when
the room is non-DAVE, when davey isn't installed, or when the SSRC map
hasn't caught up to a new speaker yet.
* `callback()` hook — feeds the DAVE-unwrapped plaintext into
`packet_router.feed_rtp()` on success, drops the packet on failure WITHOUT
killing the reader thread.
The test fixtures mirror `tests/test_voice_session_cleanup.py:33-54`:
* Construct `AudioReader` via `AudioReader.__new__(AudioReader)` + manual
attr set so the reader thread is never started.
* `MagicMock` everything below the unit under test.
`_HAS_DAVE` / `_MEDIA_TYPE_AUDIO` on the reader module are monkey-patched per
test so the suite passes whether or not `davey` is importable in the venv.
The assertions only become meaningful once Lane A's patch has landed and the
package has been re-installed (`pip install -e vendor/discord-ext-voice-recv
--force-reinstall`); the FILE itself is valid Python regardless.
See plan: /home/moltbot/.claude/plans/wiggly-exploring-glade.md
"""
from __future__ import annotations
from unittest.mock import MagicMock
import pytest
from discord.ext.voice_recv.reader import AudioReader
# Sentinel for `_MEDIA_TYPE_AUDIO`. Using a plain object() keeps the tests
# independent of whether davey is importable — we just assert the value
# flows through to `dave_session.decrypt()` unchanged.
_FAKE_MEDIA_TYPE_AUDIO = object()
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def fake_dave_session():
sess = MagicMock(name="dave_session")
sess.ready = True
# Default: this user is NOT in passthrough — DAVE decrypt must run.
# Individual tests can override to True to exercise the passthrough path.
sess.can_passthrough = MagicMock(return_value=False)
sess.decrypt = MagicMock(return_value=b"plaintext_opus")
return sess
@pytest.fixture
def fake_connection(fake_dave_session):
conn = MagicMock(name="_connection")
conn.dave_protocol_version = 1
conn.dave_session = fake_dave_session
return conn
@pytest.fixture
def fake_voice_client(fake_connection):
vc = MagicMock(name="voice_client")
vc._connection = fake_connection
vc._ssrc_to_id = {12345: 999_000}
return vc
@pytest.fixture
def fake_rtp_packet():
pkt = MagicMock(name="rtp_packet")
pkt.ssrc = 12345
pkt.decrypted_data = b"ciphertext_after_transport_decrypt"
pkt.is_silence = MagicMock(return_value=False)
return pkt
@pytest.fixture
def reader(fake_voice_client):
"""`AudioReader` instance with no reader thread spawned.
Same pattern used by `tests/test_voice_session_cleanup.py` for
`VoiceSession` — bypass `__init__` so we can drive the public surface
against pure mocks.
"""
r = AudioReader.__new__(AudioReader)
r.voice_client = fake_voice_client
r.error = None
return r
@pytest.fixture
def dave_enabled(monkeypatch):
"""Force the reader module's DAVE-availability flags ON.
Pins `_MEDIA_TYPE_AUDIO` to a known sentinel so the happy-path test can
assert exactly what gets passed to `dave_session.decrypt`. `raising=False`
keeps the monkeypatch valid even if Lane A's patch hasn't landed yet —
the tests will still fail (no `_maybe_dave_decrypt` attr), just for the
right reason.
"""
import discord.ext.voice_recv.reader as reader_mod
monkeypatch.setattr(reader_mod, "_HAS_DAVE", True, raising=False)
monkeypatch.setattr(
reader_mod, "_MEDIA_TYPE_AUDIO", _FAKE_MEDIA_TYPE_AUDIO, raising=False
)
return reader_mod
# ---------------------------------------------------------------------------
# Unit tests: `_maybe_dave_decrypt`
# ---------------------------------------------------------------------------
class TestMaybeDaveDecrypt:
"""Seven unit tests on the DAVE-decrypt gate.
The gate mirrors `voice_client.can_encrypt` in discord.py 2.7.1 exactly
(`voice_state.py:272-273`). Bypass semantics on every "DAVE inactive"
branch let non-DAVE rooms and davey-less environments keep working.
"""
def test_protocol_version_zero_bypasses_decrypt(
self, dave_enabled, reader, fake_connection, fake_dave_session, fake_rtp_packet,
):
"""`dave_protocol_version == 0` → return the transport-decrypted
payload unchanged; never touch `dave_session.decrypt`."""
fake_connection.dave_protocol_version = 0
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is fake_rtp_packet.decrypted_data
fake_dave_session.decrypt.assert_not_called()
def test_dave_session_none_bypasses_decrypt(
self, dave_enabled, reader, fake_connection, fake_rtp_packet,
):
"""`dave_session is None` → bypass. Pre-MLS-handshake state."""
fake_connection.dave_session = None
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is fake_rtp_packet.decrypted_data
def test_dave_session_not_ready_bypasses_decrypt(
self, dave_enabled, reader, fake_dave_session, fake_rtp_packet,
):
"""`dave_session.ready is False` → bypass. Pre-MLS-epoch-1 packets
are transport-only on the wire."""
fake_dave_session.ready = False
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is fake_rtp_packet.decrypted_data
fake_dave_session.decrypt.assert_not_called()
def test_unknown_ssrc_returns_none(
self, dave_enabled, reader, fake_voice_client, fake_dave_session, fake_rtp_packet,
):
"""SSRC not in `_ssrc_to_id` → drop (return None).
Accepted regression: davey requires per-user keys; when SPEAKING
events race behind the first audio packet, 1-5 packets per new
speaker per session are dropped. See plan §Edge cases.
"""
fake_voice_client._ssrc_to_id.clear()
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is None
fake_dave_session.decrypt.assert_not_called()
def test_happy_path_invokes_decrypt_and_returns_plaintext(
self, dave_enabled, reader, fake_dave_session, fake_rtp_packet,
):
"""Full DAVE-active path: `decrypt(user_id, MediaType.audio, ciphertext)`
called exactly once with the expected args; method returns the
davey plaintext bytes verbatim."""
ciphertext = fake_rtp_packet.decrypted_data
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result == b"plaintext_opus"
fake_dave_session.decrypt.assert_called_once_with(
999_000, _FAKE_MEDIA_TYPE_AUDIO, ciphertext,
)
def test_decrypt_raises_returns_none_no_crash(
self, dave_enabled, reader, fake_dave_session, fake_rtp_packet,
):
"""davey.decrypt raising → drop the packet, don't propagate, and
leave `reader.error` untouched so the reader thread stays alive.
MLS epoch transitions can produce transient decrypt failures —
bumping `reader.error` would call `self.stop()` and kill the whole
receive pipeline."""
fake_dave_session.decrypt.side_effect = RuntimeError(
"simulated MLS epoch transition fail"
)
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is None
assert reader.error is None
def test_has_dave_false_bypasses_even_with_session_present(
self, monkeypatch, reader, fake_dave_session, fake_rtp_packet,
):
"""`_HAS_DAVE = False` → bypass everything, even if a real session
somehow showed up on the connection. Defensive shim that keeps the
tests (and any davey-less deploys) green."""
import discord.ext.voice_recv.reader as reader_mod
monkeypatch.setattr(reader_mod, "_HAS_DAVE", False, raising=False)
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is fake_rtp_packet.decrypted_data
fake_dave_session.decrypt.assert_not_called()
def test_can_passthrough_true_returns_payload_without_decrypt(
self, dave_enabled, reader, fake_dave_session, fake_rtp_packet,
):
"""`can_passthrough(user_id) == True` → return the transport-decrypted
payload as-is; never call `decrypt`. Mirrors Discord's protocol where
a passthrough-mode peer sends non-DAVE-wrapped packets that the
receiver must accept verbatim."""
fake_dave_session.can_passthrough = MagicMock(return_value=True)
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result is fake_rtp_packet.decrypted_data
fake_dave_session.can_passthrough.assert_called_once_with(999_000)
fake_dave_session.decrypt.assert_not_called()
def test_can_passthrough_raises_falls_through_to_decrypt(
self, dave_enabled, reader, fake_dave_session, fake_rtp_packet,
):
"""`can_passthrough` raising → swallow the error and try `decrypt`.
Defensive: an older davey build or transient internal state shouldn't
break the receive pipeline."""
fake_dave_session.can_passthrough = MagicMock(
side_effect=RuntimeError("simulated davey internal error")
)
result = reader._maybe_dave_decrypt(fake_rtp_packet)
assert result == b"plaintext_opus"
fake_dave_session.decrypt.assert_called_once()
# ---------------------------------------------------------------------------
# Integration tests: `callback()` exercises the DAVE hook
# ---------------------------------------------------------------------------
class TestCallbackIntegration:
"""Two integration tests for the hook Lane A inserts between transport
decrypt (reader.py:141) and the post-decrypt routing (reader.py:159).
Strategy: stub the transport-decrypt and RTP parsing path so `callback()`
reaches the hook, then mock `_maybe_dave_decrypt` directly on the reader
instance. The assertion focuses on `feed_rtp` being called (test 8) vs.
not called (test 9). The transport path correctness is covered by
voice-recv's own upstream tests.
"""
@staticmethod
def _wire_callback(reader, monkeypatch, fake_rtp_packet):
import discord.ext.voice_recv.reader as reader_mod
# Redirect rtp parsing — we want an RTP path (not RTCP) so the hook fires.
monkeypatch.setattr(reader_mod.rtp, "is_rtcp", lambda data: False)
monkeypatch.setattr(reader_mod.rtp, "decode_rtp", lambda data: fake_rtp_packet)
# Stub the instance attrs `callback()` touches besides the hook.
reader.decryptor = MagicMock(name="decryptor")
reader.decryptor.decrypt_rtp = MagicMock(return_value=b"ciphertext")
reader.packet_router = MagicMock(name="packet_router")
reader.packet_router.feed_rtp = MagicMock()
reader.speaking_timer = MagicMock(name="speaking_timer")
reader.sink = MagicMock(name="sink")
def test_callback_feeds_when_dave_returns_bytes(
self, monkeypatch, reader, fake_rtp_packet,
):
"""Hook returns plaintext → `feed_rtp` called once with the
rtp_packet whose `decrypted_data` is now the post-DAVE plaintext."""
self._wire_callback(reader, monkeypatch, fake_rtp_packet)
plaintext = b"dave_unwrapped_opus_payload"
reader._maybe_dave_decrypt = MagicMock(return_value=plaintext)
reader.callback(b"raw_packet_bytes")
reader._maybe_dave_decrypt.assert_called_once_with(fake_rtp_packet)
assert reader.packet_router.feed_rtp.call_count == 1
called_with = reader.packet_router.feed_rtp.call_args[0][0]
assert called_with is fake_rtp_packet
assert fake_rtp_packet.decrypted_data == plaintext
assert reader.error is None
def test_callback_drops_when_dave_returns_none(
self, monkeypatch, reader, fake_rtp_packet,
):
"""Hook returns None → `feed_rtp` NOT called, no exception propagated,
`reader.error` stays None (reader thread survives the drop)."""
self._wire_callback(reader, monkeypatch, fake_rtp_packet)
reader._maybe_dave_decrypt = MagicMock(return_value=None)
reader.callback(b"raw_packet_bytes")
reader._maybe_dave_decrypt.assert_called_once_with(fake_rtp_packet)
reader.packet_router.feed_rtp.assert_not_called()
assert reader.error is None

View File

@@ -23,6 +23,24 @@ VOICES = {"M1", "M2", "M3", "M4", "M5", "F1", "F2", "F3", "F4", "F5"}
DEFAULT_VOICE = "M2"
DEFAULT_LANG = "ro"
# Punctuation Supertonic synthesis rejects with HTTP 500 (Romanian curly quotes,
# smart dashes, ellipsis, angle quotes). Mapped to ASCII so a stray „foo" in
# any caller's text doesn't kill the whole request.
_TTS_PUNCT_MAP = {
'': '"', '': '"', '': '"',
'': "'", '': "'", '': "'",
'«': '"', '»': '"',
'': '-', '': '-',
'': '...',
}
def sanitize_for_supertonic(text: str) -> str:
"""Replace Unicode punctuation Supertonic rejects with ASCII equivalents."""
for src, dst in _TTS_PUNCT_MAP.items():
text = text.replace(src, dst)
return text
def synthesize(text: str, voice: str = DEFAULT_VOICE, lang: str = DEFAULT_LANG) -> dict:
"""Call Supertonic server and save audio to a temp WAV file.
@@ -34,6 +52,8 @@ def synthesize(text: str, voice: str = DEFAULT_VOICE, lang: str = DEFAULT_LANG)
if not text or not text.strip():
return {"ok": False, "error": "Text gol."}
text = sanitize_for_supertonic(text)
voice = voice.upper()
if voice not in VOICES:
voice = DEFAULT_VOICE

View File

@@ -1,22 +1,76 @@
# Vendored: discord-ext-voice-recv
**Upstream:** https://github.com/imayhaveborkedit/discord-ext-voice-recv
**Pinned commit:** `ac04ea7b0941112e83767cf1c1469b408fa06748` (bump version 0.5.3a)
**Pinned commit:** `ac04ea7b0941112e83767cf1c1469b408fa06748` (bump version 0.5.3a, master HEAD Jun 2025)
**Vendored at:** 2026-05-27
**Echo Core fork version:** `0.5.3a+echo.dave1` (PEP 440 local segment)
**Reason:** Discord voice protocol is fragile, upstream is hobby fork. Adapter
layer in `src/voice/_discord_voice_adapter.py` isolates upstream churn — if this
package breaks, swap to py-cord by rewriting only that file.
## Update procedure
## Echo Core patch: `+echo.dave1` (DAVE E2E receive-side decrypt)
### Why
Discord enforces DAVE (E2E media encryption) on voice gateway `v=8` whenever the
bot advertises `max_dave_protocol_version > 0` in IDENTIFY. discord.py 2.7.1 (the
version Echo Core pins) does so unconditionally — Discord then closes the WS
with code **4017** if the bot opts out by sending `max_dave_protocol_version=0`.
DAVE is **mandatory**.
Audio received from a DAVE-active room is **dual-wrapped**: transport layer
(`aead_xchacha20_poly1305_rtpsize`) + DAVE E2E. Upstream voice-recv decrypts
only the transport layer, then hands DAVE ciphertext to libopus, which raises
`OpusError: corrupted stream` on every packet.
### Patch shape
~30 lines, all in `discord/ext/voice_recv/reader.py`:
1. Module-level optional `davey` import (no-op when missing).
2. `AudioReader._maybe_dave_decrypt(rtp_packet) -> Optional[bytes]` — gate logic
mirrors discord.py 2.7.1 send-side `can_encrypt` exactly. Returns the
DAVE-unwrapped payload, the original payload (DAVE inactive), or `None` to
drop the packet (unknown SSRC, decrypt failure).
3. 4-line hook in `callback()` between transport-decrypt and `feed_rtp`:
overwrites `rtp_packet.decrypted_data` in place, or returns early to drop.
The post-decrypt `is_silence()` check (formerly at reader.py:172) still works
because we overwrite `decrypted_data` in place — silence frames produced by
davey reach the existing check unchanged.
### Dependency
`davey==0.1.5` — matches discord.py 2.7.1 expectation. Pin in
`echo-core/requirements.txt`. The import is optional at module level so tests
and non-DAVE environments still run; the gate degrades to a bypass.
### Re-sync strategy
When upstream voice-recv adds DAVE support natively:
1. Drop the three patch hunks in `reader.py` (davey import block,
`_maybe_dave_decrypt` method, hook in `callback()`).
2. Revert `__version__` to upstream value in `__init__.py`.
3. Update `Pinned commit` below.
4. Run `pytest tests/test_voice_recv_dave.py tests/test_voice_adapter_contract.py`.
The contract test `test_voice_recv_fork_version` asserts `__version__ ==
'0.5.3a+echo.dave1'` and will fail fast on any accidental wipe during a careless
upstream sync — forcing a conscious decision to either re-port or drop the
patch.
## Update procedure (vanilla upstream sync)
```bash
cd vendor/discord-ext-voice-recv
git fetch origin master
git log HEAD..origin/master --oneline # review what changed
git checkout <new-commit>
# RE-APPLY the +echo.dave1 patch if upstream still lacks DAVE
cd ../..
source .venv/bin/activate && pip install -e vendor/discord-ext-voice-recv --force-reinstall
pytest tests/test_voice_adapter_contract.py -v # MUST PASS — contract guard
pytest tests/test_voice_adapter_contract.py tests/test_voice_recv_dave.py -v # MUST PASS — contract + DAVE guards
```
Update this file's `Pinned commit` after a successful upgrade.

View File

@@ -17,4 +17,4 @@ __title__ = 'discord.ext.voice_recv'
__author__ = 'Imayhaveborkedit'
__license__ = 'MIT'
__copyright__ = 'Copyright 2021-present Imayhaveborkedit'
__version__ = '0.5.3a'
__version__ = '0.5.3a+echo.dave1'

View File

@@ -19,6 +19,15 @@ try:
except ImportError as e:
raise RuntimeError("pynacl is required") from e
# Echo Core +echo.dave1 patch: DAVE E2E receive-side decrypt. See VENDOR_INFO.md.
try:
import davey
_MEDIA_TYPE_AUDIO = davey.MediaType.audio
_HAS_DAVE = True
except ImportError:
_MEDIA_TYPE_AUDIO = None
_HAS_DAVE = False
if TYPE_CHECKING:
from typing import Optional, Callable, Any, Dict, Literal, Union
@@ -133,12 +142,63 @@ class AudioReader:
def _is_ip_discovery_packet(self, data: bytes) -> bool:
return len(data) == 74 and data[1] == 0x02
def _maybe_dave_decrypt(self, rtp_packet) -> Optional[bytes]:
"""DAVE E2E layer applied after transport decrypt.
Returns the (possibly DAVE-unwrapped) opus payload, or None to drop the
packet. No-op when DAVE is inactive — non-DAVE rooms and environments
without `davey` installed pass through unchanged.
NOTE: `is_silence()` is NOT checked here. In a DAVE-active room the
transport-decrypted payload is ciphertext, so `is_silence()` (which
compares to plaintext OPUS_SILENCE ``b'\\xf8\\xff\\xfe'``) never matches.
Silence frames are handled either by davey.decrypt returning plaintext
silence (then caught at the existing post-decrypt silence check on
``decrypted_data``), or dropped via the decrypt-raises path. The
existing post-decrypt silence check continues to work because we
overwrite ``decrypted_data`` in place.
"""
if not _HAS_DAVE:
return rtp_packet.decrypted_data
conn = self.voice_client._connection
if getattr(conn, 'dave_protocol_version', 0) == 0:
return rtp_packet.decrypted_data
dave = getattr(conn, 'dave_session', None)
if dave is None or not dave.ready:
return rtp_packet.decrypted_data
user_id = self.voice_client._ssrc_to_id.get(rtp_packet.ssrc)
if user_id is None:
# ACCEPTED REGRESSION: davey requires per-user key. When SPEAKING
# event races behind the first audio packet, we drop 1-5 packets
# (~40-200ms) per new speaker per session.
return None
# can_passthrough(user_id) mirrors Discord's protocol: when this user's
# decryptor is in passthrough mode, packets are not DAVE-wrapped and
# must be returned as-is. Otherwise davey.decrypt unwraps DAVE E2E.
try:
if dave.can_passthrough(user_id):
return rtp_packet.decrypted_data
except Exception as e:
log.debug("can_passthrough check failed for ssrc=%s user=%s: %s: %s",
rtp_packet.ssrc, user_id, type(e).__name__, e)
try:
return dave.decrypt(user_id, _MEDIA_TYPE_AUDIO, rtp_packet.decrypted_data)
except Exception as e:
log.debug("DAVE decrypt failed for ssrc=%s user=%s: %s: %s",
rtp_packet.ssrc, user_id, type(e).__name__, e)
return None
def callback(self, packet_data: bytes) -> None:
packet = rtp_packet = rtcp_packet = None
try:
if not rtp.is_rtcp(packet_data):
packet = rtp_packet = rtp.decode_rtp(packet_data)
packet.decrypted_data = self.decryptor.decrypt_rtp(packet)
# Echo Core +echo.dave1: DAVE E2E layer (no-op when inactive).
dave_payload = self._maybe_dave_decrypt(rtp_packet)
if dave_payload is None:
return # drop packet, do not feed_rtp; reader thread stays alive
rtp_packet.decrypted_data = dave_payload
else:
packet = rtcp_packet = rtp.decode_rtcp(self.decryptor.decrypt_rtcp(packet_data))