Files
echo-core/src/adapters/discord_voice.py
Marius Mutu e4f3177fc1 feat(voice): DAVE E2E receive-side decrypt — unblocks Pas 12
Vendored fork: discord-ext-voice_recv 0.5.3a+echo.dave1

Patches the receive pipeline to handle Discord's mandatory DAVE E2E
encryption on voice gateway v=8. Without this, opus_decode raised
"corrupted stream" on every received packet in a DAVE-active room and
voice-to-voice never connected.

DAVE patch (vendor/discord-ext-voice-recv/reader.py):
- `_maybe_dave_decrypt(rtp_packet)`: gate mirrors discord.py 2.7.1
  `voice_state.can_encrypt`. Uses davey's `can_passthrough(user_id)` to
  branch — peers in passthrough send transport-only packets that pass
  through verbatim; peers in DAVE epoch go through `davey.decrypt`.
- Hooked in `callback()` between transport decrypt and feed_rtp;
  drops on decrypt failure without killing the reader thread.
- Bumps __version__ to '0.5.3a+echo.dave1' (PEP 440 local segment) so a
  contract test can fail fast on accidental upstream-sync overwrite.

Pipeline fixes uncovered while testing DAVE end-to-end:
- src/voice/pipeline.py: silero-vad v6+ requires exactly 512 samples per
  call at 16kHz; our 100ms window (1600 samples) was silently raising
  ValueError → VAD always returned False → STT never fired. Slice the
  window into 512-sample chunks. Bump whisper beam_size 1→5 and add a
  Romanian `initial_prompt` — transcriptions go from "Eco salt." gibberish
  to "Echo, salutare, te rog spune-mi cât este ora."
- src/voice/tts_stream.py: EchoStreamingAudioSource.read() returns a 20ms
  silence frame instead of b'' on empty queue. An empty return is treated
  by discord.py's player as end-of-stream and stops playback, so any TTS
  pushed later would be silently discarded.
- src/adapters/discord_voice.py: actually attach EchoStreamingAudioSource
  to the voice client after the wakeup beep (chained via `after=`),
  which was missing entirely — TTS frames had no consumer.

Tests:
- tests/test_voice_recv_dave.py: 11 unit + callback integration tests
  covering bypass paths, can_passthrough gate, decrypt error handling.
- tests/test_voice_adapter_contract.py: +test_voice_recv_fork_version
  and +test_voice_connection_state_has_dave_attrs guards against
  upstream drift on either side.

Config:
- config.json: voice.allowed_user_ids whitelist for Marius's user id.

Status: voice-to-voice loop closes end-to-end (DAVE → VAD → Whisper →
Claude → Supertonic → audio out). Latency is ~8-13s per turn; reducing
it is out of scope for this commit — see TODOS.md for the real-time UX
follow-up plan.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 19:48:36 +00:00

323 lines
12 KiB
Python

"""Discord voice slash commands (Pas 7 — CONVERGENCE wiring).
Registers the `/voice` slash command group on the existing CommandTree and
exposes an async `warmup_models()` for eager model load at bot startup.
Owns nothing in `src/voice/*` — purely the Discord-facing wiring. Defers
heavy lifting to:
- ``src.voice.pipeline.VoiceSession`` — per-guild session state machine
- ``src.voice.pipeline.EchoVoiceSink`` — discord-ext-voice-recv sink
- ``src.voice.tts_stream.TTSQueue`` / ``EchoStreamingAudioSource``
- ``src.voice._discord_voice_adapter.connect_voice``
"""
from __future__ import annotations
import asyncio
import logging
from typing import Optional
import discord
from discord import app_commands
# Optional DAVE dep (mandatory at runtime when discord.py 2.7.1 is paired with
# Discord voice gateway v=8; tolerated missing in tests / dev environments).
try:
import davey
_HAS_DAVE = True
except ImportError:
_HAS_DAVE = False
from src.config import Config
from src.voice.pipeline import (
VoiceSession,
EchoVoiceSink,
_get_whisper_model,
_get_silero_vad,
)
from src.voice.tts_stream import TTSQueue, EchoStreamingAudioSource
from src.voice._discord_voice_adapter import connect_voice
# Module-level logger for the Discord voice slash-command wiring.
log = logging.getLogger("echo-core.discord.voice")
# Per-guild voice session registry. Key = guild_id.
# `/voice join` stores the session here once it has been constructed;
# `/voice leave` (and a failed sink attach during join) removes it again, so
# a present key is what `/voice join` uses to reject a second join.
_voice_sessions: dict[int, VoiceSession] = {}
# "<ExceptionType>: <message>" string written by `warmup_models()` when model
# warmup fails; None while no failure has been recorded. Surfaces as an
# ephemeral error on /voice join and as a FAIL line in /voice doctor.
_voice_load_error: Optional[str] = None
# Reference to the eager warmup task created in on_ready, so /voice join can
# await it if the user is faster than the background load.
# NOTE(review): nothing visible in this module assigns it — presumably the
# on_ready caller stores the task on this module attribute; confirm.
_models_warmup_future: Optional[asyncio.Task] = None
async def warmup_models() -> None:
    """Eagerly load the voice stack; called from `on_ready()` as a background task.

    Loads libopus if it is not loaded yet, logs the DAVE protocol version when
    the optional `davey` dependency imported, then runs the (synchronous,
    blocking) Whisper and Silero-VAD loaders on worker threads so the event
    loop stays responsive.

    Never raises for ordinary failures: any `Exception` is recorded in the
    module-level `_voice_load_error` (as "<Type>: <message>") so `/voice join`
    can degrade gracefully. A successful run clears a previously recorded
    error, so a later warmup (e.g. after a reconnect re-fires `on_ready`)
    can restore voice instead of leaving it disabled for the process lifetime.
    """
    global _voice_load_error
    try:
        if not discord.opus.is_loaded():
            discord.opus.load_opus("libopus.so.0")
        if _HAS_DAVE:
            log.info("DAVE protocol v%d available (davey %s)",
                     davey.DAVE_PROTOCOL_VERSION, davey.__version__)
        # Loaders are blocking; keep them off the event loop thread.
        await asyncio.to_thread(_get_whisper_model)
        await asyncio.to_thread(_get_silero_vad)
    except Exception as e:
        _voice_load_error = f"{type(e).__name__}: {e}"
        # exc_info keeps the traceback; the one-line summary alone rarely
        # identifies which loader failed.
        log.error("Voice models load failed: %s", _voice_load_error, exc_info=True)
    else:
        # Drop any stale failure from an earlier attempt.
        _voice_load_error = None
        log.info("Voice models warm")
def _get_whitelist() -> set[int]:
    """Read `voice.allowed_user_ids` from config and coerce it to a set of ints.

    A new `Config()` is built on every call so edits made between bot start
    and `/voice join` are picked up (assumes `Config()` re-reads its backing
    store — confirm against `src.config`).

    Returns:
        The ids that could be converted with `int()`. Entries that cannot be
        converted are skipped. A config read failure, a missing/empty value,
        or a value that is neither a scalar id nor iterable yields an empty
        set.
    """
    try:
        raw = Config().get("voice.allowed_user_ids", [])
    except Exception:
        raw = []
    if not raw or isinstance(raw, bool):
        return set()
    # A lone scalar must count as ONE id. Iterating the string "123" would
    # otherwise whitelist ids 1, 2 and 3, and iterating an int raises.
    if isinstance(raw, (str, bytes, int)):
        raw = [raw]
    try:
        candidates = iter(raw)
    except TypeError:
        # Not a scalar id and not iterable (e.g. a float): nothing usable.
        return set()
    out: set[int] = set()
    for v in candidates:
        try:
            out.add(int(v))
        except (TypeError, ValueError):
            continue
    return out
def _get_default_voice() -> str:
    """Return the `voice.default_voice` config value, or "M2" as fallback.

    The fallback applies when the key is missing, when the stored value is
    falsy, and when reading the config raises.
    """
    fallback = "M2"
    try:
        chosen = Config().get("voice.default_voice", fallback) or fallback
    except Exception:
        chosen = fallback
    return chosen
def register(tree: app_commands.CommandTree, bot: discord.Client) -> app_commands.Group:
    """Build the `/voice` slash command group and return it (caller registers).

    Commands defined on the group:
      - `/voice join`   — connect to the caller's voice channel and start a session
      - `/voice leave`  — tear down this guild's session
      - `/voice doctor` — report libopus / warmup status
      - `/voice mirror on|off`, `/voice record on|off` — flip session flags

    Args:
        tree: The bot's command tree. Not used here; the caller adds the
            returned group to it.
        bot: The Discord client, used for presence updates, the bot's own
            user id, and handed to `VoiceSession`.

    Returns:
        The populated `voice` command group.
    """
    voice_group = app_commands.Group(
        name="voice", description="Echo Core voice channel"
    )

    async def _set_session_flag(
        interaction: discord.Interaction, attr: str, value: bool, confirmation: str
    ) -> None:
        """Set boolean `attr` on this guild's session and confirm ephemerally."""
        await interaction.response.defer(ephemeral=True)
        guild_id = interaction.guild.id if interaction.guild else None
        s = _voice_sessions.get(guild_id) if guild_id is not None else None
        if s is None:
            await interaction.followup.send("Nu sunt în voice.", ephemeral=True)
            return
        setattr(s, attr, value)
        await interaction.followup.send(confirmation, ephemeral=True)

    @voice_group.command(name="join", description="Echo intră în voice channel-ul tău")
    async def join(interaction: discord.Interaction) -> None:
        await interaction.response.defer(ephemeral=True)

        # If the background warmup is still running, wait for it first.
        warmup = _models_warmup_future
        if warmup is not None and not warmup.done():
            try:
                await warmup
            except Exception as e:
                await interaction.followup.send(
                    f"Voice unavailable: {type(e).__name__}: {e}", ephemeral=True
                )
                return

        # Checked AFTER the await: warmup_models() reports failure by setting
        # this global rather than raising, so a failure that happens while we
        # were waiting is only visible here.
        if _voice_load_error:
            await interaction.followup.send(
                f"Voice unavailable: {_voice_load_error}", ephemeral=True
            )
            return

        user = interaction.user
        if not isinstance(user, discord.Member) or user.voice is None or user.voice.channel is None:
            await interaction.followup.send(
                "Intră într-un voice channel întâi.", ephemeral=True
            )
            return
        channel = user.voice.channel

        whitelist = _get_whitelist()
        if user.id not in whitelist:
            await interaction.followup.send(
                "Nu ești pe whitelist voice.", ephemeral=True
            )
            return

        # Reject double-join on the same guild.
        guild_id = channel.guild.id
        if guild_id in _voice_sessions:
            await interaction.followup.send(
                "Sunt deja în voice pe acest server. Folosește /voice leave întâi.",
                ephemeral=True,
            )
            return

        # Connect
        try:
            vc = await connect_voice(channel)
        except Exception as e:
            log.exception("connect_voice failed")
            await interaction.followup.send(
                f"Conectare eșuată: {type(e).__name__}: {e}", ephemeral=True
            )
            return

        # Build TTS queue + session
        ttsq = TTSQueue(voice_id=_get_default_voice(), lang="ro")
        ttsq.start()
        try:
            session = VoiceSession(
                channel_id=channel.id,
                guild_id=guild_id,
                voice_client=vc,
                text_channel=interaction.channel,
                record_enabled=False,
                mirror_enabled=True,
                whitelist=whitelist,
                ttsq=ttsq,
                bot=bot,
                loop=asyncio.get_running_loop(),
            )
        except Exception as e:
            log.exception("VoiceSession construction failed")
            # No session exists to own these yet, so undo them by hand.
            ttsq.stop()
            try:
                await vc.disconnect(force=True)
            except Exception:
                # Best-effort: the user-facing error below matters more.
                log.warning("Voice disconnect during rollback skipped", exc_info=True)
            await interaction.followup.send(
                f"Sesiune voice eșuată: {type(e).__name__}: {e}", ephemeral=True
            )
            return
        _voice_sessions[guild_id] = session

        # Start TTS streaming source for the entire session. Chain the
        # wake-up beep via `after=` so streaming takes over when beep ends.
        def _start_stream(error: Optional[Exception] = None) -> None:
            if error is not None:
                log.warning("Beep playback ended with error: %s", error)
            try:
                vc.play(EchoStreamingAudioSource(ttsq))
                log.info("TTS streaming source attached")
            except Exception:
                log.exception("EchoStreamingAudioSource attach failed")

        try:
            vc.play(
                discord.FFmpegPCMAudio("assets/voice/beep_200ms.wav"),
                after=_start_stream,
            )
        except Exception:
            log.warning("Beep playback skipped, starting stream directly", exc_info=True)
            _start_stream()

        # Attach sink
        try:
            bot_user_id = int(bot.user.id) if bot.user is not None else 0
            sink = EchoVoiceSink(session=session, bot_user_id=bot_user_id)
            vc.listen(sink)
        except Exception as e:
            log.exception("Sink attach failed")
            # Unregister first so a retry of /voice join is not rejected.
            _voice_sessions.pop(guild_id, None)
            try:
                session.cleanup("sink_attach_failed")
            except Exception:
                # Best-effort, but keep the traceback (same as /voice leave).
                log.exception("session.cleanup raised")
            await interaction.followup.send(
                f"Atașare sink eșuată: {type(e).__name__}: {e}", ephemeral=True
            )
            return

        # Presence
        try:
            await bot.change_presence(activity=discord.Activity(
                type=discord.ActivityType.listening,
                name=f"{user.display_name} în #{channel.name}",
            ))
        except Exception:
            log.warning("Presence update skipped", exc_info=True)

        await interaction.followup.send(
            f"În voce în #{channel.name}.", ephemeral=True
        )

    @voice_group.command(name="leave", description="Echo iese din voice channel")
    async def leave(interaction: discord.Interaction) -> None:
        await interaction.response.defer(ephemeral=True)
        guild_id = interaction.guild.id if interaction.guild else None
        # Pop (not get) so the guild can re-join even if cleanup raises below.
        session = _voice_sessions.pop(guild_id, None) if guild_id is not None else None
        if session is None:
            await interaction.followup.send(
                "Nu sunt în niciun voice channel aici.", ephemeral=True
            )
            return
        try:
            session.cleanup("user_leave")
        except Exception:
            log.exception("session.cleanup raised")
        try:
            await bot.change_presence(activity=None)
        except Exception:
            log.warning("Presence reset skipped", exc_info=True)
        await interaction.followup.send("Plecat.", ephemeral=True)

    @voice_group.command(name="doctor", description="Verifică voice stack")
    async def doctor(interaction: discord.Interaction) -> None:
        await interaction.response.defer(ephemeral=True)
        checks: list[tuple[str, bool]] = []
        # libopus
        try:
            checks.append(("libopus", bool(discord.opus.is_loaded())))
        except Exception:
            checks.append(("libopus", False))
        # warmup: passes when no load error has been recorded
        checks.append(("voice load error", _voice_load_error is None))
        # Build response
        lines = ["**Voice doctor:**"]
        for label, ok in checks:
            # Space between status and label; previously they ran together.
            lines.append(f"{'OK' if ok else 'FAIL'} {label}")
        if _voice_load_error:
            lines.append(f" details: {_voice_load_error}")
        await interaction.followup.send("\n".join(lines), ephemeral=True)

    # --- /voice mirror on|off ---
    mirror_group = app_commands.Group(
        name="mirror", description="Text mirror", parent=voice_group
    )

    @mirror_group.command(name="on", description="Activează text mirror în canal")
    async def mirror_on(interaction: discord.Interaction) -> None:
        await _set_session_flag(interaction, "mirror_enabled", True, "Mirror ON.")

    @mirror_group.command(name="off", description="Dezactivează text mirror")
    async def mirror_off(interaction: discord.Interaction) -> None:
        await _set_session_flag(interaction, "mirror_enabled", False, "Mirror OFF.")

    # --- /voice record on|off ---
    record_group = app_commands.Group(
        name="record", description="KB recording", parent=voice_group
    )

    @record_group.command(name="on", description="Activează înregistrare în KB")
    async def record_on(interaction: discord.Interaction) -> None:
        await _set_session_flag(interaction, "record_enabled", True, "Record ON.")

    @record_group.command(name="off", description="Dezactivează înregistrare")
    async def record_off(interaction: discord.Interaction) -> None:
        await _set_session_flag(interaction, "record_enabled", False, "Record OFF.")

    return voice_group