#!/usr/bin/env python3
"""Voice latency spike benchmark — BLOCKING Pas 1 pentru voice-to-voice Discord.

Confirmă (sau infirmă) budget-ul STT p50 <1.5s pe hardware-ul curent.
Generează audio RO via Supertonic la :7788, rulează faster-whisper pe sample-uri,
raportează p50/p95 per model.

Decision logic:
    small.p50 < 1.5s → PASS (use small)
    small fail, tiny.p50 < 1.5s → FALLBACK_TINY (use tiny, document trade-off)
    ambele fail → FAIL (re-plan model sau hardware)

Output:
    tools/voice_bench_results.json — raw per-utterance + summary
    tasks/voice-bench-results.md — sumar uman cu decizie + recomandări
    exit 0 (PASS/FALLBACK_TINY) sau 1 (FAIL)

Usage:
    python3 tools/voice_bench.py
    python3 tools/voice_bench.py --models small,tiny --trials 3 --budget-s 1.5
"""
# NOTE: the module docstring above doubles as the ``--help`` description
# (see main()), so it is runtime text and is kept verbatim.

from __future__ import annotations

import argparse
import json
import math
import os
import platform
import shutil
import statistics
import sys
import tempfile
import time
import wave
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

try:
    import httpx
except ImportError:
    # Keep the module importable (e.g. to reuse decide()/build_payload())
    # without the HTTP client; check_supertonic() reports the missing package.
    httpx = None  # type: ignore[assignment]

PROJECT_ROOT = Path(__file__).resolve().parent.parent
SUPERTONIC_URL = "http://127.0.0.1:7788"
DEFAULT_BUDGET_S = 1.5
DEFAULT_MODELS = ("small", "tiny")
DEFAULT_TRIALS = 3
RESULTS_JSON = PROJECT_ROOT / "tools" / "voice_bench_results.json"
RESULTS_MD = PROJECT_ROOT / "tasks" / "voice-bench-results.md"

# (sample name, Romanian text sent to the TTS service) pairs.
UTTERANCES_RO: list[tuple[str, str]] = [
    ("short", "Salut, ce mai faci?"),
    ("conversational", "Stai puțin să mă gândesc la asta."),
    ("medium", "Am verificat în calendar și avem ședință cu echipa la trei după-amiază."),
    ("numbers", "Costul total este o sută douăzeci și trei de lei și cincizeci de bani."),
    ("question", "Marius, vrei să-ți pun pe agenda de mâine să suni la NOAA?"),
    ("longer", "Vreau să-mi reamintești diseară să verific dacă scriptul de backup a rulat corect și să trimit raportul către echipă."),
]


@dataclass
class SampleResult:
    """One synthesized utterance plus the transcription timings recorded for it."""

    name: str
    text: str
    wav_path: str
    audio_duration_s: float
    transcribe_latencies_s: list[float] = field(default_factory=list)
    transcribed_text: str = ""

    @property
    def median_latency_s(self) -> float:
        """Median of the recorded latencies, or ``inf`` when none were recorded."""
        if not self.transcribe_latencies_s:
            return float("inf")
        return statistics.median(self.transcribe_latencies_s)

    @property
    def real_time_factor(self) -> float:
        """Median latency divided by audio duration (``inf`` for zero-length audio)."""
        if not self.audio_duration_s:
            return float("inf")
        return self.median_latency_s / self.audio_duration_s


@dataclass
class ModelSummary:
    """Aggregate latency statistics for one model across all samples."""

    model: str
    sample_results: list[SampleResult]
    load_time_s: float
    cpu_threads: int

    @property
    def all_latencies(self) -> list[float]:
        """Every recorded latency of every sample, flattened in sample order."""
        return [
            latency
            for sample in self.sample_results
            for latency in sample.transcribe_latencies_s
        ]

    @property
    def p50_s(self) -> float:
        """Median over all latencies, or ``inf`` when there are none."""
        latencies = self.all_latencies
        return statistics.median(latencies) if latencies else float("inf")

    @property
    def p95_s(self) -> float:
        """Nearest-rank 95th percentile over all latencies (``inf`` when empty)."""
        latencies = sorted(self.all_latencies)
        if not latencies:
            return float("inf")
        idx = max(0, int(round(0.95 * (len(latencies) - 1))))
        return latencies[idx]

    @property
    def mean_rtf(self) -> float:
        """Mean of the per-sample real-time factors (``inf`` without samples)."""
        rtfs = [sample.real_time_factor for sample in self.sample_results]
        return statistics.mean(rtfs) if rtfs else float("inf")


def log(msg: str) -> None:
    """Print *msg* with the script prefix, flushing so progress shows up live."""
    print(f"[voice_bench] {msg}", flush=True)


def _request_speech(text: str, timeout: float) -> "httpx.Response":
    """POST *text* to the Supertonic speech endpoint; raise on a non-2xx status."""
    response = httpx.post(
        f"{SUPERTONIC_URL}/v1/audio/speech",
        json={
            "model": "supertonic-3",
            "input": text,
            "voice": "M2",
            "response_format": "wav",
            "lang": "ro",
        },
        timeout=timeout,
    )
    response.raise_for_status()
    return response


def check_supertonic() -> None:
    """Preflight: exit with status 2 unless the TTS service answers a test request."""
    if httpx is None:
        log("FATAL: Python package 'httpx' is not installed (pip install httpx)")
        sys.exit(2)
    try:
        _request_speech("test", timeout=10.0)
    except Exception as e:
        # Deliberately broad: any failure here means the benchmark cannot run.
        log(f"FATAL: Supertonic la {SUPERTONIC_URL} nu răspunde: {e}")
        log("Pornește cu: systemctl --user start supertonic-tts")
        sys.exit(2)


def synthesize_sample(name: str, text: str, out_dir: Path) -> tuple[Path, float]:
    """Synthesize *text* to ``<out_dir>/<name>.wav`` and measure its duration.

    The duration is read with the stdlib ``wave`` module (no ffmpeg dependency).

    Returns:
        ``(wav_path, duration_in_seconds)``.

    Raises:
        Whatever the HTTP client raises for transport errors or non-2xx replies.
    """
    out_path = out_dir / f"{name}.wav"
    response = _request_speech(text, timeout=60.0)
    out_path.write_bytes(response.content)
    with wave.open(str(out_path), "rb") as wf:
        duration = wf.getnframes() / float(wf.getframerate())
    return out_path, duration


def benchmark_model(model_name: str, samples: list[SampleResult], trials: int, threads: int) -> ModelSummary:
    """Load *model_name* on CPU (int8) and time ``trials`` transcriptions per sample.

    Latencies are appended to each ``SampleResult`` in *samples* (the list is
    mutated in place); the transcript of the first trial is stored as well.

    Returns:
        A ``ModelSummary`` wrapping the same *samples* list.
    """
    from faster_whisper import WhisperModel

    log(f"Loading model '{model_name}' (compute_type=int8, threads={threads})…")
    t0 = time.perf_counter()
    model = WhisperModel(model_name, device="cpu", compute_type="int8", cpu_threads=threads)
    load_time = time.perf_counter() - t0
    log(f" loaded in {load_time:.2f}s")

    for sample in samples:
        log(f" → '{sample.name}' ({sample.audio_duration_s:.2f}s audio) ×{trials} trials")
        for trial in range(trials):
            t0 = time.perf_counter()
            segments, _info = model.transcribe(
                sample.wav_path,
                language="ro",
                beam_size=1,
                vad_filter=False,
                without_timestamps=True,
            )
            # The segments iterable is consumed inside the timed region so the
            # measured latency covers producing the full transcript.
            text = " ".join(seg.text.strip() for seg in segments).strip()
            latency = time.perf_counter() - t0
            sample.transcribe_latencies_s.append(latency)
            if trial == 0:
                sample.transcribed_text = text
            log(f" trial {trial+1}: {latency:.2f}s → \"{text[:70]}\"")

    return ModelSummary(model=model_name, sample_results=samples, load_time_s=load_time, cpu_threads=threads)


def decide(summaries: dict[str, ModelSummary], budget_s: float) -> tuple[str, str]:
    """Return ``(decision, rationale)`` for the measured models.

    ``PASS`` when ``small`` meets the p50 budget, ``FALLBACK_TINY`` when only
    ``tiny`` does, ``FAIL`` otherwise. A model missing from *summaries* is
    treated as having an infinite p50.
    """
    small = summaries.get("small")
    tiny = summaries.get("tiny")
    small_p50 = small.p50_s if small is not None else float("inf")
    tiny_p50 = tiny.p50_s if tiny is not None else float("inf")

    if small is not None and small_p50 < budget_s:
        return "PASS", (
            f"small.p50={small_p50:.2f}s < budget {budget_s:.2f}s. "
            f"Folosește 'small'. RTF mediu {small.mean_rtf:.2f}."
        )
    if tiny is not None and tiny_p50 < budget_s:
        return "FALLBACK_TINY", (
            f"small.p50={small_p50:.2f}s >= budget; "
            f"tiny.p50={tiny_p50:.2f}s < budget {budget_s:.2f}s. "
            f"Document fallback la 'tiny' în plan (accuracy mai slabă, latency OK)."
        )
    return "FAIL", (
        f"Ambele modele depășesc budget-ul {budget_s:.2f}s "
        f"(small.p50={small_p50:.2f}s, tiny.p50={tiny_p50:.2f}s). "
        f"Re-plan: model extern (Groq/Deepgram), upgrade hardware, sau "
        f"acceptă latență mai mare."
    )


def _json_number(value: float) -> float | None:
    """Round to 3 decimals; map inf/NaN to ``None`` because JSON has no such literals."""
    return round(value, 3) if math.isfinite(value) else None


def build_payload(summaries: dict[str, ModelSummary], decision: str, rationale: str,
                  budget_s: float, trials: int) -> dict[str, Any]:
    """Build the JSON-serializable result document written by ``write_json``.

    Non-finite statistics (no latencies recorded, zero-length audio) are
    emitted as ``None`` so the serialized file is strictly valid JSON.
    """
    models: dict[str, Any] = {}
    for name, summary in summaries.items():
        models[name] = {
            "p50_s": _json_number(summary.p50_s),
            "p95_s": _json_number(summary.p95_s),
            "mean_rtf": _json_number(summary.mean_rtf),
            "load_time_s": _json_number(summary.load_time_s),
            "cpu_threads": summary.cpu_threads,
            "samples": [
                {
                    "name": sr.name,
                    "text_in": sr.text,
                    "text_out": sr.transcribed_text,
                    "audio_duration_s": _json_number(sr.audio_duration_s),
                    "latencies_s": [_json_number(x) for x in sr.transcribe_latencies_s],
                    "median_latency_s": _json_number(sr.median_latency_s),
                    "rtf": _json_number(sr.real_time_factor),
                }
                for sr in summary.sample_results
            ],
        }
    return {
        "schema_version": 1,
        "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "decision": decision,
        "rationale": rationale,
        "budget_s": budget_s,
        "trials_per_sample": trials,
        "models": models,
    }


def write_json(summaries: dict[str, ModelSummary], decision: str, rationale: str,
               budget_s: float, trials: int) -> None:
    """Write the raw results document to ``RESULTS_JSON`` as UTF-8 JSON."""
    payload = build_payload(summaries, decision, rationale, budget_s, trials)
    RESULTS_JSON.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False keeps the diacritics readable, so the file encoding
    # must be explicit instead of inheriting the locale default.
    RESULTS_JSON.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False, allow_nan=False),
        encoding="utf-8",
    )
    log(f"Wrote {RESULTS_JSON}")


def _md_cell(text: str) -> str:
    """Make *text* safe for one Markdown table cell (80 chars, single line, escaped pipes)."""
    return text[:80].replace("\n", " ").replace("|", "\\|")


def _hardware_context_lines() -> list[str]:
    """Collect best-effort host details; the ``/proc`` files are skipped when unreadable."""
    lines = [f"- Platform: {platform.platform()}"]
    cpu_count = os.cpu_count()
    if cpu_count is not None:
        lines.append(f"- CPU count (logical): {cpu_count}")
    try:
        with open("/proc/cpuinfo", encoding="utf-8", errors="replace") as f:
            cpu_model = next((ln.strip() for ln in f if "model name" in ln), None)
    except OSError:
        cpu_model = None
    if cpu_model:
        lines.append(f"- {cpu_model}")
    try:
        with open("/proc/meminfo", encoding="utf-8", errors="replace") as f:
            lines.extend(f"- {ln.strip()}" for ln in f.read().splitlines()[:3])
    except OSError:
        pass
    return lines


def render_markdown(summaries: dict[str, ModelSummary], decision: str, rationale: str,
                    budget_s: float, trials: int) -> str:
    """Render the human-readable Markdown report and return it as one string."""
    lines: list[str] = [
        "# Voice Bench Results — Discord Voice-to-Voice Spike",
        "",
        f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}",
        f"Budget: STT p50 < {budget_s:.2f}s (per CEO plan + eng review)",
        f"Trials per sample: {trials}",
        "",
        f"## Decision: **{decision}**",
        "",
        rationale,
        "",
        "## Per-Model Summary",
        "",
        "| Model | p50 (s) | p95 (s) | Mean RTF | Load (s) | Threads |",
        "|-------|--------:|--------:|---------:|---------:|--------:|",
    ]
    for name, s in summaries.items():
        pass_mark = "PASS" if s.p50_s < budget_s else "FAIL"
        lines.append(
            f"| {name} | {s.p50_s:.2f} ({pass_mark}) | {s.p95_s:.2f} | "
            f"{s.mean_rtf:.2f} | {s.load_time_s:.2f} | {s.cpu_threads} |"
        )
    lines += ["", "## Per-Utterance Detail", ""]

    for name, s in summaries.items():
        lines += [
            f"### {name}",
            "",
            "| Sample | Audio (s) | Median lat (s) | RTF | Trials | Transcript |",
            "|--------|----------:|---------------:|----:|--------|------------|",
        ]
        for sr in s.sample_results:
            trials_str = ", ".join(f"{x:.2f}" for x in sr.transcribe_latencies_s)
            transcript = _md_cell(sr.transcribed_text)
            lines.append(
                f"| {sr.name} | {sr.audio_duration_s:.2f} | {sr.median_latency_s:.2f} | "
                f"{sr.real_time_factor:.2f} | {trials_str} | {transcript} |"
            )
        lines.append("")

    lines += ["## Hardware Context", ""]
    lines += _hardware_context_lines()
    lines += [
        "",
        "## Raw Data",
        "",
        f"Vezi `{RESULTS_JSON.relative_to(PROJECT_ROOT)}` pentru JSON complet.",
        "",
    ]
    return "\n".join(lines)


def write_markdown(summaries: dict[str, ModelSummary], decision: str, rationale: str,
                   budget_s: float, trials: int) -> None:
    """Write the human-readable report to ``RESULTS_MD`` as UTF-8."""
    report = render_markdown(summaries, decision, rationale, budget_s, trials)
    RESULTS_MD.parent.mkdir(parents=True, exist_ok=True)
    RESULTS_MD.write_text(report, encoding="utf-8")
    log(f"Wrote {RESULTS_MD}")


def main(argv: list[str] | None = None) -> int:
    """Run the benchmark end to end.

    Args:
        argv: Argument list to parse; ``None`` (the default) uses ``sys.argv``.

    Returns:
        ``0`` for PASS/FALLBACK_TINY, ``1`` for FAIL. Invalid arguments and an
        unreachable TTS service terminate the process with status 2 instead.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        # The docstring has a hand-laid-out structure; do not let argparse reflow it.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--models", default=",".join(DEFAULT_MODELS),
                    help="CSV listă de modele faster-whisper (default: small,tiny)")
    ap.add_argument("--trials", type=int, default=DEFAULT_TRIALS,
                    help=f"Trials per sample (default {DEFAULT_TRIALS})")
    ap.add_argument("--budget-s", type=float, default=DEFAULT_BUDGET_S,
                    help=f"STT p50 budget secunde (default {DEFAULT_BUDGET_S})")
    ap.add_argument("--threads", type=int,
                    default=int(os.environ.get("VOICE_BENCH_THREADS", "2")),
                    help="cpu_threads pentru faster-whisper (default 2 — Proxmox VM)")
    ap.add_argument("--keep-wavs", action="store_true",
                    help="Nu șterge WAV-urile temp")
    args = ap.parse_args(argv)

    # Without at least one trial and one model every statistic is "inf" and the
    # run would report a meaningless FAIL, so reject such input up front.
    if args.trials < 1:
        ap.error("--trials must be >= 1")
    # dict.fromkeys de-duplicates while preserving the order given by the user.
    model_names = list(dict.fromkeys(m.strip() for m in args.models.split(",") if m.strip()))
    if not model_names:
        ap.error("--models must name at least one model")

    log(f"Budget: p50 < {args.budget_s:.2f}s | Models: {args.models} | Trials: {args.trials}")
    check_supertonic()

    work_dir = Path(tempfile.mkdtemp(prefix="voice_bench_"))
    log(f"Working dir: {work_dir}")
    try:
        log("Stage 1/3: Generating RO audio samples via Supertonic…")
        samples: list[SampleResult] = []
        for name, text in UTTERANCES_RO:
            log(f" TTS '{name}': {text!r}")
            path, duration = synthesize_sample(name, text, work_dir)
            log(f" → {path.name} ({duration:.2f}s)")
            samples.append(SampleResult(name=name, text=text, wav_path=str(path),
                                        audio_duration_s=duration))

        log("Stage 2/3: Running faster-whisper benchmarks…")
        summaries: dict[str, ModelSummary] = {}
        for model_name in model_names:
            # Each model gets its own result objects so latencies do not mix.
            fresh_samples = [
                SampleResult(name=s.name, text=s.text, wav_path=s.wav_path,
                             audio_duration_s=s.audio_duration_s)
                for s in samples
            ]
            summaries[model_name] = benchmark_model(model_name, fresh_samples,
                                                    args.trials, args.threads)

        log("Stage 3/3: Decision & artifacts…")
        decision, rationale = decide(summaries, args.budget_s)
        log(f"DECISION: {decision}")
        log(f"WHY: {rationale}")
        write_json(summaries, decision, rationale, args.budget_s, args.trials)
        write_markdown(summaries, decision, rationale, args.budget_s, args.trials)
    finally:
        # Best-effort cleanup that also runs when a stage fails part-way.
        if not args.keep_wavs:
            shutil.rmtree(work_dir, ignore_errors=True)

    return 0 if decision in ("PASS", "FALLBACK_TINY") else 1


if __name__ == "__main__":
    sys.exit(main())