Files
echo-core/tools/voice_bench.py
Marius Mutu c6d11bdf9f chore(voice): spike STT latency benchmark + HT contention lesson
Pas 1 (BLOCKING) din Discord voice-to-voice test plan. Sweet spot empiric
pe i7-6700T: faster-whisper small int8 @ cpu_threads=4 → p50 2.25s,
p95 2.64s, mean RTF 0.46. Curba HT: 2t=3.25s → 4t=2.25s (sweet) →
6t=2.79s (regres +24% prin contention). tiny respins — halucinează în RO.

- tools/voice_bench.py: harness benchmark cu 8 sample-uri RO sintetizate
  via Supertonic API, măsoară p50/p95/RTF pentru small+tiny pe N threads.
- tools/voice_bench_results*.json: raw output 3 pass-uri (threads 2/4/6).
- tasks/voice-bench-results*.md: summary markdown per pass.
- tasks/lessons.md: HT contention rule — cpu_threads = physical cores,
  rulează sweep nu single-point pentru ML inference compute-bound.

Budget updated în plan-uri: STT p50 1.5s → 2.5s, perceived 4s → 5s p50.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 12:52:11 +00:00

376 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Voice latency spike benchmark — BLOCKING Pas 1 pentru voice-to-voice Discord.
Confirmă (sau infirmă) budget-ul STT p50 <1.5s pe hardware-ul curent.
Generează audio RO via Supertonic la :7788, rulează faster-whisper pe sample-uri,
raportează p50/p95 per model.
Decision logic:
small.p50 < 1.5s → PASS (use small)
small fail, tiny.p50 < 1.5s → FALLBACK_TINY (use tiny, document trade-off)
ambele fail → FAIL (re-plan model sau hardware)
Output:
tools/voice_bench_results.json — raw per-utterance + summary
tasks/voice-bench-results.md — sumar uman cu decizie + recomandări
exit 0 (PASS/FALLBACK_TINY) sau 1 (FAIL)
Usage:
python3 tools/voice_bench.py
python3 tools/voice_bench.py --models small,tiny --trials 3 --budget-s 1.5
"""
from __future__ import annotations
import argparse
import json
import os
import statistics
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import httpx
# Directory two levels above this file (the parent of the folder that holds it).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Base URL of the Supertonic TTS service used to synthesize the test audio.
SUPERTONIC_URL = "http://127.0.0.1:7788"
# Defaults for the --budget-s, --models and --trials command-line options.
DEFAULT_BUDGET_S = 1.5
DEFAULT_MODELS = ("small", "tiny")
DEFAULT_TRIALS = 3
# Output artifacts: raw JSON results and the human-readable Markdown summary.
RESULTS_JSON = PROJECT_ROOT / "tools" / "voice_bench_results.json"
RESULTS_MD = PROJECT_ROOT / "tasks" / "voice-bench-results.md"
# Romanian benchmark utterances as (sample name, text to synthesize) pairs.
# The sample name is also used as the WAV file stem, so names must be unique.
UTTERANCES_RO: list[tuple[str, str]] = [
("short", "Salut, ce mai faci?"),
("conversational", "Stai puțin să mă gândesc la asta."),
("medium", "Am verificat în calendar și avem ședință cu echipa la trei după-amiază."),
("numbers", "Costul total este o sută douăzeci și trei de lei și cincizeci de bani."),
("question", "Marius, vrei să-ți pun pe agenda de mâine să suni la NOAA?"),
("longer", "Vreau să-mi reamintești diseară să verific dacă scriptul de backup a rulat corect și să trimit raportul către echipă."),
]
@dataclass
class SampleResult:
    """One benchmark utterance plus the timings measured for it.

    Attributes:
        name: Short identifier of the sample.
        text: Text that was sent to the TTS service.
        wav_path: Path of the synthesized WAV file, as a string.
        audio_duration_s: Length of the WAV audio in seconds.
        transcribe_latencies_s: One wall-clock latency per transcription trial.
        transcribed_text: Transcript kept for reporting.
    """

    name: str
    text: str
    wav_path: str
    audio_duration_s: float
    transcribe_latencies_s: list[float] = field(default_factory=list)
    transcribed_text: str = ""

    @property
    def median_latency_s(self) -> float:
        """Median recorded latency, or ``inf`` when no trial has been recorded."""
        timings = self.transcribe_latencies_s
        if not timings:
            return float("inf")
        return statistics.median(timings)

    @property
    def real_time_factor(self) -> float:
        """Median latency divided by audio duration; ``inf`` for a zero/unset duration."""
        duration = self.audio_duration_s
        return self.median_latency_s / duration if duration else float("inf")
@dataclass
class ModelSummary:
    """Aggregated benchmark outcome for a single model.

    Attributes:
        model: Model name the samples were transcribed with.
        sample_results: Per-sample results carrying the individual latencies.
        load_time_s: Seconds spent constructing the model.
        cpu_threads: Thread count the model was created with.
    """

    model: str
    sample_results: list[SampleResult]
    load_time_s: float
    cpu_threads: int

    @property
    def all_latencies(self) -> list[float]:
        """Every recorded latency, flattened in sample order."""
        return [
            latency
            for sample in self.sample_results
            for latency in sample.transcribe_latencies_s
        ]

    @property
    def p50_s(self) -> float:
        """Median over all latencies, or ``inf`` when there are none."""
        pooled = self.all_latencies
        if not pooled:
            return float("inf")
        return statistics.median(pooled)

    @property
    def p95_s(self) -> float:
        """95th percentile picked by rounding the fractional rank; ``inf`` when empty."""
        ordered = sorted(self.all_latencies)
        if ordered:
            rank = int(round(0.95 * (len(ordered) - 1)))
            return ordered[max(0, rank)]
        return float("inf")

    @property
    def mean_rtf(self) -> float:
        """Arithmetic mean of the per-sample real-time factors; ``inf`` when empty."""
        factors = [sample.real_time_factor for sample in self.sample_results]
        if not factors:
            return float("inf")
        return statistics.mean(factors)
def log(msg: str) -> None:
    """Print *msg* to stdout with the ``[voice_bench]`` prefix and flush immediately."""
    print("[voice_bench]", msg, flush=True)
def check_supertonic() -> None:
    """Send a tiny synthesis request to Supertonic and abort if it fails.

    Any error (connection problem, timeout, non-success HTTP status) is logged
    together with a start-up hint, then the process exits with status 2.
    """
    endpoint = f"{SUPERTONIC_URL}/v1/audio/speech"
    probe = {
        "model": "supertonic-3",
        "input": "test",
        "voice": "M2",
        "response_format": "wav",
        "lang": "ro",
    }
    try:
        httpx.post(endpoint, json=probe, timeout=10.0).raise_for_status()
    except Exception as err:
        log(f"FATAL: Supertonic la {SUPERTONIC_URL} nu răspunde: {err}")
        log("Pornește cu: systemctl --user start supertonic-tts")
        sys.exit(2)
def synthesize_sample(name: str, text: str, out_dir: Path) -> tuple[Path, float]:
    """Synthesize *text* to ``<out_dir>/<name>.wav`` and measure its duration.

    The duration is read from the WAV header with the stdlib ``wave`` module,
    so no ffmpeg dependency is needed.

    Returns:
        ``(wav_path, duration_in_seconds)``.

    Raises:
        httpx.HTTPStatusError: if the TTS service answers with an error status.
    """
    import wave

    request_body = {
        "model": "supertonic-3",
        "input": text,
        "voice": "M2",
        "response_format": "wav",
        "lang": "ro",
    }
    response = httpx.post(
        f"{SUPERTONIC_URL}/v1/audio/speech", json=request_body, timeout=60.0
    )
    response.raise_for_status()

    wav_file = out_dir / f"{name}.wav"
    wav_file.write_bytes(response.content)

    with wave.open(str(wav_file), "rb") as reader:
        frame_count = reader.getnframes()
        sample_rate = reader.getframerate()
    return wav_file, frame_count / float(sample_rate)
def benchmark_model(model_name: str, samples: list[SampleResult], trials: int, threads: int) -> ModelSummary:
    """Load one faster-whisper model on CPU (int8) and time it on every sample.

    Each sample is transcribed ``trials`` times. Every wall-clock latency is
    appended to ``sample.transcribe_latencies_s`` (the passed-in samples are
    mutated in place) and the transcript of the first trial is stored on the
    sample.

    Returns:
        A ``ModelSummary`` wrapping the same sample objects, the model load
        time and the thread count.
    """
    from faster_whisper import WhisperModel

    log(f"Loading model '{model_name}' (compute_type=int8, threads={threads})…")
    load_started = time.perf_counter()
    model = WhisperModel(model_name, device="cpu", compute_type="int8", cpu_threads=threads)
    load_time = time.perf_counter() - load_started
    log(f" loaded in {load_time:.2f}s")

    decode_options = {
        "language": "ro",
        "beam_size": 1,
        "vad_filter": False,
        "without_timestamps": True,
    }
    for sample in samples:
        log(f"'{sample.name}' ({sample.audio_duration_s:.2f}s audio) ×{trials} trials")
        for attempt in range(1, trials + 1):
            started = time.perf_counter()
            segments, _ = model.transcribe(sample.wav_path, **decode_options)
            # Consuming the segments stays inside the timed region on purpose.
            # NOTE(review): faster-whisper appears to decode lazily while the
            # segments are iterated — confirm against its documentation.
            pieces = [segment.text.strip() for segment in segments]
            transcript = " ".join(pieces)
            elapsed = time.perf_counter() - started

            sample.transcribe_latencies_s.append(elapsed)
            cleaned = transcript.strip()
            if attempt == 1:
                sample.transcribed_text = cleaned
            log(f" trial {attempt}: {elapsed:.2f}s → \"{cleaned[:70]}\"")

    return ModelSummary(
        model=model_name,
        sample_results=samples,
        load_time_s=load_time,
        cpu_threads=threads,
    )
def decide(summaries: dict[str, ModelSummary], budget_s: float) -> tuple[str, str]:
    """Pick the benchmark verdict from the ``small`` and ``tiny`` summaries.

    Args:
        summaries: Model summaries keyed by model name; only the ``"small"``
            and ``"tiny"`` entries are consulted, and a missing entry counts
            as an infinite p50.
        budget_s: Latency budget in seconds that a model's p50 must stay under.

    Returns:
        ``(decision, rationale)`` where decision is ``"PASS"`` (small fits),
        ``"FALLBACK_TINY"`` (only tiny fits) or ``"FAIL"`` (neither fits).
    """
    unmeasured = float("inf")
    small = summaries.get("small")
    tiny = summaries.get("tiny")
    small_p50 = small.p50_s if small else unmeasured
    tiny_p50 = tiny.p50_s if tiny else unmeasured

    if small_p50 < budget_s:
        verdict = "PASS"
        why = (
            f"small.p50={small_p50:.2f}s < budget {budget_s:.2f}s. "
            f"Folosește 'small'. RTF mediu {small.mean_rtf:.2f}."
        )
    elif tiny_p50 < budget_s:
        verdict = "FALLBACK_TINY"
        why = (
            f"small.p50={small_p50:.2f}s >= budget; "
            f"tiny.p50={tiny_p50:.2f}s < budget {budget_s:.2f}s. "
            f"Document fallback la 'tiny' în plan (accuracy mai slabă, latency OK)."
        )
    else:
        verdict = "FAIL"
        why = (
            f"Ambele modele depășesc budget-ul {budget_s:.2f}s "
            f"(small.p50={small_p50:.2f}s, tiny.p50={tiny_p50:.2f}s). "
            f"Re-plan: model extern (Groq/Deepgram), upgrade hardware, sau "
            f"acceptă latență mai mare."
        )
    return verdict, why
def write_json(summaries: dict[str, ModelSummary], decision: str, rationale: str,
               budget_s: float, trials: int) -> None:
    """Write the raw benchmark results to ``RESULTS_JSON``.

    Args:
        summaries: Per-model summaries keyed by model name.
        decision: Verdict string stored under ``"decision"``.
        rationale: Human-readable explanation stored under ``"rationale"``.
        budget_s: Latency budget in seconds that the run was judged against.
        trials: Number of transcription trials per sample.

    The file is always written as UTF-8 because the payload keeps non-ASCII
    characters unescaped (``ensure_ascii=False``). Non-finite metrics (for
    example the ``inf`` produced when a sample has no latencies) are stored as
    ``null``, since ``Infinity`` is not valid JSON.
    """
    import math

    def _metric(value: float) -> float | None:
        # Round finite values to millisecond precision; map inf/nan to null.
        return round(value, 3) if math.isfinite(value) else None

    def _sample_entry(sr: SampleResult) -> dict[str, Any]:
        return {
            "name": sr.name,
            "text_in": sr.text,
            "text_out": sr.transcribed_text,
            "audio_duration_s": _metric(sr.audio_duration_s),
            "latencies_s": [_metric(x) for x in sr.transcribe_latencies_s],
            "median_latency_s": _metric(sr.median_latency_s),
            "rtf": _metric(sr.real_time_factor),
        }

    payload: dict[str, Any] = {
        "schema_version": 1,
        "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "decision": decision,
        "rationale": rationale,
        "budget_s": budget_s if math.isfinite(budget_s) else None,
        "trials_per_sample": trials,
        "models": {
            name: {
                "p50_s": _metric(s.p50_s),
                "p95_s": _metric(s.p95_s),
                "mean_rtf": _metric(s.mean_rtf),
                "load_time_s": _metric(s.load_time_s),
                "cpu_threads": s.cpu_threads,
                "samples": [_sample_entry(sr) for sr in s.sample_results],
            }
            for name, s in summaries.items()
        },
    }

    RESULTS_JSON.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the locale default may be unable to encode diacritics.
    RESULTS_JSON.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    log(f"Wrote {RESULTS_JSON}")
def write_markdown(summaries: dict[str, ModelSummary], decision: str, rationale: str,
                   budget_s: float, trials: int) -> None:
    """Write the human-readable benchmark report to ``RESULTS_MD``.

    Args:
        summaries: Per-model summaries keyed by model name.
        decision: Verdict string shown in the "Decision" heading.
        rationale: Explanation printed under the decision.
        budget_s: Latency budget in seconds; each model's p50 is marked
            PASS/FAIL against it.
        trials: Number of transcription trials per sample.

    The report holds a per-model table, a per-utterance table for each model,
    a best-effort hardware section and a pointer to the raw JSON file. It is
    written as UTF-8 because rationale and transcripts contain diacritics.
    """
    import multiprocessing
    import platform

    lines: list[str] = [
        "# Voice Bench Results — Discord Voice-to-Voice Spike",
        "",
        f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}",
        f"Budget: STT p50 < {budget_s:.2f}s (per CEO plan + eng review)",
        f"Trials per sample: {trials}",
        "",
        f"## Decision: **{decision}**",
        "",
        rationale,
        "",
        "## Per-Model Summary",
        "",
        "| Model | p50 (s) | p95 (s) | Mean RTF | Load (s) | Threads |",
        "|-------|--------:|--------:|---------:|---------:|--------:|",
    ]
    for name, s in summaries.items():
        pass_mark = "PASS" if s.p50_s < budget_s else "FAIL"
        lines.append(
            f"| {name} | {s.p50_s:.2f} ({pass_mark}) | {s.p95_s:.2f} | "
            f"{s.mean_rtf:.2f} | {s.load_time_s:.2f} | {s.cpu_threads} |"
        )
    lines += ["", "## Per-Utterance Detail", ""]

    for name, s in summaries.items():
        lines += [
            f"### {name}",
            "",
            "| Sample | Audio (s) | Median lat (s) | RTF | Trials | Transcript |",
            "|--------|----------:|---------------:|----:|--------|------------|",
        ]
        for sr in s.sample_results:
            trials_str = ", ".join(f"{x:.2f}" for x in sr.transcribe_latencies_s)
            # Pipes and line breaks would break the Markdown table row.
            transcript = (
                sr.transcribed_text[:80]
                .replace("|", "\\|")
                .replace("\r", " ")
                .replace("\n", " ")
            )
            lines.append(
                f"| {sr.name} | {sr.audio_duration_s:.2f} | {sr.median_latency_s:.2f} | "
                f"{sr.real_time_factor:.2f} | {trials_str} | {transcript} |"
            )
        lines.append("")

    # The hardware section is best-effort: anything unavailable is left out.
    lines += ["## Hardware Context", ""]
    lines.append(f"- Platform: {platform.platform()}")
    try:
        lines.append(f"- CPU count (logical): {multiprocessing.cpu_count()}")
    except NotImplementedError:
        pass  # the CPU count cannot be determined on this platform

    try:
        with open("/proc/cpuinfo", encoding="utf-8", errors="replace") as f:
            cpu_model = next((ln.strip() for ln in f if "model name" in ln), None)
    except OSError:
        cpu_model = None  # no readable /proc/cpuinfo on this system
    if cpu_model:
        lines.append(f"- {cpu_model}")

    try:
        with open("/proc/meminfo", encoding="utf-8", errors="replace") as f:
            mem_lines = f.read().split("\n")[:3]
    except OSError:
        mem_lines = []  # no readable /proc/meminfo on this system
    lines.extend(f"- {ln.strip()}" for ln in mem_lines)

    lines += [
        "",
        "## Raw Data",
        "",
        f"Vezi `{RESULTS_JSON.relative_to(PROJECT_ROOT)}` pentru JSON complet.",
        "",
    ]

    RESULTS_MD.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the locale default may be unable to encode diacritics.
    RESULTS_MD.write_text("\n".join(lines), encoding="utf-8")
    log(f"Wrote {RESULTS_MD}")
def main() -> int:
    """Run the whole benchmark: synthesize samples, time each model, write reports.

    Stages:
        1. Synthesize every entry of ``UTTERANCES_RO`` into a temporary directory.
        2. Benchmark each requested model on fresh copies of the samples.
        3. Decide the verdict and write the JSON and Markdown artifacts.

    The temporary directory is removed even when a stage raises, unless
    ``--keep-wavs`` is given.

    Returns:
        0 when the decision is PASS or FALLBACK_TINY, 1 otherwise. Invalid
        option values terminate through ``argparse`` and an unreachable TTS
        service terminates inside ``check_supertonic`` (both exit status 2).
    """
    import shutil  # local import: only needed for the temp-dir cleanup below

    ap = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring is laid out by hand; keep its line breaks in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--models", default=",".join(DEFAULT_MODELS),
                    help="CSV listă de modele faster-whisper (default: small,tiny)")
    ap.add_argument("--trials", type=int, default=DEFAULT_TRIALS,
                    help=f"Trials per sample (default {DEFAULT_TRIALS})")
    ap.add_argument("--budget-s", type=float, default=DEFAULT_BUDGET_S,
                    help=f"STT p50 budget secunde (default {DEFAULT_BUDGET_S})")
    ap.add_argument("--threads", type=int, default=int(os.environ.get("VOICE_BENCH_THREADS", "2")),
                    help="cpu_threads pentru faster-whisper (default 2 — Proxmox VM)")
    ap.add_argument("--keep-wavs", action="store_true", help="Nu șterge WAV-urile temp")
    args = ap.parse_args()

    # Without at least one trial every metric would be inf and the verdict moot.
    if args.trials < 1:
        ap.error("--trials trebuie să fie >= 1")
    # Drop empty entries and duplicates, keeping the order given by the user.
    model_names = list(dict.fromkeys(
        part.strip() for part in args.models.split(",") if part.strip()
    ))
    if not model_names:
        ap.error("--models nu conține niciun nume de model")

    log(f"Budget: p50 < {args.budget_s:.2f}s | Models: {args.models} | Trials: {args.trials}")
    check_supertonic()

    work_dir = Path(tempfile.mkdtemp(prefix="voice_bench_"))
    log(f"Working dir: {work_dir}")
    try:
        log("Stage 1/3: Generating RO audio samples via Supertonic…")
        samples: list[SampleResult] = []
        for name, text in UTTERANCES_RO:
            log(f" TTS '{name}': {text!r}")
            path, duration = synthesize_sample(name, text, work_dir)
            log(f"{path.name} ({duration:.2f}s)")
            samples.append(SampleResult(name=name, text=text, wav_path=str(path),
                                        audio_duration_s=duration))

        log("Stage 2/3: Running faster-whisper benchmarks…")
        summaries: dict[str, ModelSummary] = {}
        for model_name in model_names:
            # Each model gets its own SampleResult objects so latencies never mix.
            fresh_samples = [
                SampleResult(name=s.name, text=s.text, wav_path=s.wav_path,
                             audio_duration_s=s.audio_duration_s)
                for s in samples
            ]
            summaries[model_name] = benchmark_model(model_name, fresh_samples,
                                                    args.trials, args.threads)

        log("Stage 3/3: Decision & artifacts…")
        decision, rationale = decide(summaries, args.budget_s)
        log(f"DECISION: {decision}")
        log(f"WHY: {rationale}")
        write_json(summaries, decision, rationale, args.budget_s, args.trials)
        write_markdown(summaries, decision, rationale, args.budget_s, args.trials)
    finally:
        if not args.keep_wavs:
            # Best-effort cleanup of our own mkdtemp directory, also on failure.
            shutil.rmtree(work_dir, ignore_errors=True)

    return 0 if decision in ("PASS", "FALLBACK_TINY") else 1
if __name__ == "__main__":
sys.exit(main())