- audio/Modul {N}/filename.mp3 — fiecare modul în subdirector separat
pentru copiere pe telefon și transfer între PC-uri.
- PDF-urile se păstrează ca sursă în summaries/pdf/ (fără extract txt).
- transcribe_status="pdf_source_only" pentru lecțiile PDF → summarize.py
le filtrează automat.
- Fix coliziune manifest transcript_path (stem-based, nu preserve prior).
- .bat per modul (M2-M8) + dispatchers run_pc1_all (M2-M5) + run_pc2_all
(M6-M8) pentru partajare work pe 2 PC-uri.
- prepare_pc2_bundle.py: zip cu scripts + manifest + .env + PDFs pentru
PC2 (self-installs whisper.cpp/model/ffmpeg la primul run).
- M1 whisper complete (49/49 audio+vimeo transcrise).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
287 lines
9.3 KiB
Python
287 lines
9.3 KiB
Python
"""
|
|
Batch transcription using whisper.cpp.
|
|
Reads <root>/manifest.json, transcribes each audio file in module order,
|
|
outputs .txt and .srt files, updates manifest status.
|
|
Resumable: skips files with existing transcripts.
|
|
Converts MP3 -> WAV (16kHz mono) via ffmpeg before transcription.
|
|
|
|
Text lectures (type=="text") are skipped — their transcript files are
|
|
written directly by download.py.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from courses import course_paths, get_course, validate_manifest_course
|
|
|
|
# whisper.cpp defaults — override with env vars or CLI args.
|
|
# Shared across courses (same model + binary).
|
|
WHISPER_BIN = os.getenv("WHISPER_BIN", r"whisper-cli.exe")
|
|
WHISPER_MODEL = os.getenv("WHISPER_MODEL", r"models\ggml-medium-q5_0.bin")
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
handlers=[
|
|
logging.StreamHandler(),
|
|
logging.FileHandler("transcribe_errors.log"),
|
|
],
|
|
)
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def find_ffmpeg() -> str:
|
|
if shutil.which("ffmpeg"):
|
|
return "ffmpeg"
|
|
for p in [Path("ffmpeg.exe"), Path("ffmpeg-bin/ffmpeg.exe")]:
|
|
if p.exists():
|
|
return str(p.resolve())
|
|
try:
|
|
import imageio_ffmpeg
|
|
return imageio_ffmpeg.get_ffmpeg_exe()
|
|
except ImportError:
|
|
pass
|
|
return ""
|
|
|
|
|
|
def convert_to_wav(audio_path: str, wav_cache_dir: Path) -> str:
|
|
src = Path(audio_path)
|
|
if src.suffix.lower() == ".wav":
|
|
return audio_path
|
|
|
|
wav_cache_dir.mkdir(parents=True, exist_ok=True)
|
|
wav_path = wav_cache_dir / (src.stem + ".wav")
|
|
|
|
if wav_path.exists() and wav_path.stat().st_size > 0:
|
|
log.info(f" WAV cache hit: {wav_path}")
|
|
return str(wav_path)
|
|
|
|
ffmpeg = find_ffmpeg()
|
|
if not ffmpeg:
|
|
log.warning(" ffmpeg not found, using original file (may cause bad transcription)")
|
|
return audio_path
|
|
|
|
log.info(f" Converting to WAV: {src.name} -> {wav_path.name}")
|
|
cmd = [
|
|
ffmpeg, "-i", audio_path,
|
|
"-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
|
|
"-y", str(wav_path),
|
|
]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
|
if result.returncode != 0:
|
|
log.error(f" ffmpeg failed: {result.stderr[:300]}")
|
|
return audio_path
|
|
log.info(f" WAV ready: {wav_path.name} ({wav_path.stat().st_size / 1_048_576:.0f} MB)")
|
|
return str(wav_path)
|
|
except FileNotFoundError:
|
|
log.warning(f" ffmpeg not found at: {ffmpeg}")
|
|
return audio_path
|
|
except subprocess.TimeoutExpired:
|
|
log.error(f" ffmpeg conversion timeout for {audio_path}")
|
|
return audio_path
|
|
|
|
|
|
def load_manifest(manifest_path: Path) -> dict:
|
|
with open(manifest_path, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_manifest(manifest: dict, manifest_path: Path):
|
|
manifest_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(manifest_path, "w", encoding="utf-8") as f:
|
|
json.dump(manifest, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def transcribe_file(audio_path: str, output_base: str) -> bool:
|
|
cmd = [
|
|
WHISPER_BIN,
|
|
"--model", WHISPER_MODEL,
|
|
"--language", "ro",
|
|
"--no-gpu",
|
|
"--threads", str(os.cpu_count() or 4),
|
|
"--beam-size", "1",
|
|
"--best-of", "1",
|
|
"--max-context", "0",
|
|
"--entropy-thold", "2.4",
|
|
"--max-len", "60",
|
|
"--suppress-nst",
|
|
"--no-fallback",
|
|
"--output-txt",
|
|
"--output-srt",
|
|
"--output-file", output_base,
|
|
"--file", audio_path,
|
|
]
|
|
|
|
log.info(f" CMD: {' '.join(cmd)}")
|
|
try:
|
|
env = os.environ.copy()
|
|
whisper_dir = str(Path(WHISPER_BIN).resolve().parent)
|
|
env["PATH"] = whisper_dir + os.pathsep + env.get("PATH", "")
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
stdout=sys.stdout,
|
|
stderr=sys.stderr,
|
|
timeout=7200,
|
|
env=env,
|
|
)
|
|
if result.returncode != 0:
|
|
log.error(f" whisper.cpp failed (exit {result.returncode})")
|
|
return False
|
|
|
|
txt_path = Path(f"{output_base}.txt")
|
|
srt_path = Path(f"{output_base}.srt")
|
|
if not txt_path.exists() or txt_path.stat().st_size == 0:
|
|
log.error(f" Empty or missing transcript: {txt_path}")
|
|
return False
|
|
|
|
log.info(f" Output: {txt_path.name} ({txt_path.stat().st_size} bytes)")
|
|
if srt_path.exists():
|
|
log.info(f" Output: {srt_path.name} ({srt_path.stat().st_size} bytes)")
|
|
return True
|
|
|
|
except subprocess.TimeoutExpired:
|
|
log.error(f" Timeout (>2h) for {audio_path}")
|
|
return False
|
|
except FileNotFoundError:
|
|
log.error(f" whisper.cpp not found at: {WHISPER_BIN}")
|
|
log.error(f" Set WHISPER_BIN env var or put whisper-cli.exe in PATH")
|
|
return False
|
|
except Exception as e:
|
|
log.error(f" Error: {e}")
|
|
return False
|
|
|
|
|
|
def parse_module_filter(arg: str) -> set[int]:
|
|
result = set()
|
|
for part in arg.split(","):
|
|
part = part.strip()
|
|
if "-" in part:
|
|
a, b = part.split("-", 1)
|
|
result.update(range(int(a), int(b) + 1))
|
|
else:
|
|
result.add(int(part))
|
|
return result
|
|
|
|
|
|
def parse_args():
|
|
p = argparse.ArgumentParser(description="Transcribe lecture audio for a course")
|
|
p.add_argument("--course", default="master", help="Course key (see courses.py)")
|
|
p.add_argument("--modules", default=None, help="Module filter, e.g. '1-3' or '2,4,5'")
|
|
return p.parse_args()
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
course = get_course(args.course)
|
|
paths = course_paths(course)
|
|
|
|
if not paths["manifest"].exists():
|
|
log.error(f"{paths['manifest']} not found. Run download.py --course {course['key']} first.")
|
|
sys.exit(1)
|
|
|
|
module_filter = parse_module_filter(args.modules) if args.modules else None
|
|
if module_filter:
|
|
log.info(f"Module filter: {sorted(module_filter)}")
|
|
|
|
manifest = load_manifest(paths["manifest"])
|
|
validate_manifest_course(manifest, course["key"])
|
|
paths["transcripts_dir"].mkdir(parents=True, exist_ok=True)
|
|
|
|
total = 0
|
|
transcribed = 0
|
|
skipped = 0
|
|
failed = 0
|
|
|
|
for mod_idx, mod in enumerate(manifest["modules"], 1):
|
|
if module_filter and mod_idx not in module_filter:
|
|
log.info(f"\nSkipping module {mod_idx}: {mod['name']}")
|
|
continue
|
|
log.info(f"\n{'='*60}")
|
|
log.info(f"Module: {mod['name']}")
|
|
log.info(f"{'='*60}")
|
|
|
|
for lec in mod["lectures"]:
|
|
total += 1
|
|
|
|
# Text lectures have transcript written by download.py.
|
|
if lec.get("type") == "text":
|
|
lec["transcribe_status"] = "complete"
|
|
skipped += 1
|
|
log.info(f" Skipping text: {lec['title']}")
|
|
continue
|
|
|
|
# PDF lectures are source-only (no transcript, no whisper). Preserve
|
|
# download.py's 'pdf_source_only' state so summarize.py filters them out.
|
|
if lec.get("type") == "pdf":
|
|
skipped += 1
|
|
log.info(f" Skipping pdf (source-only): {lec['title']}")
|
|
continue
|
|
|
|
if lec.get("download_status") != "complete":
|
|
log.warning(f" Skipping (not downloaded): {lec['title']}")
|
|
continue
|
|
|
|
audio_path = lec["audio_path"]
|
|
# Reuse the stem already recorded in the manifest for backward-compat
|
|
# with M1-M6 paths (strips ' [Audio]' for aresens filenames).
|
|
stem = Path(lec["original_filename"]).stem.replace(" [Audio]", "")
|
|
output_base = str(paths["transcripts_dir"] / stem)
|
|
|
|
txt_path = Path(f"{output_base}.txt")
|
|
if txt_path.exists() and txt_path.stat().st_size > 0:
|
|
lec["transcribe_status"] = "complete"
|
|
skipped += 1
|
|
log.info(f" Skipping (exists): {stem}.txt")
|
|
continue
|
|
|
|
log.info(f" Transcribing: {lec['title']}")
|
|
log.info(f" File: {audio_path}")
|
|
|
|
wav_path = convert_to_wav(audio_path, paths["wav_cache_dir"])
|
|
|
|
if transcribe_file(wav_path, output_base):
|
|
lec["transcribe_status"] = "complete"
|
|
transcribed += 1
|
|
else:
|
|
lec["transcribe_status"] = "failed"
|
|
failed += 1
|
|
|
|
save_manifest(manifest, paths["manifest"])
|
|
|
|
if mod == manifest["modules"][0] and transcribed > 0:
|
|
log.info(f"First module complete ({transcribed} files). Continuing automatically...")
|
|
|
|
empty_outputs = [
|
|
lec["title"]
|
|
for mod in manifest["modules"]
|
|
for lec in mod["lectures"]
|
|
if lec.get("transcribe_status") == "complete"
|
|
and lec.get("type") != "text"
|
|
and not Path(lec.get("transcript_path", "")).exists()
|
|
]
|
|
|
|
log.info("\n" + "=" * 60)
|
|
log.info(f"Transcribed {transcribed}/{total} files, {skipped} skipped, {failed} failures.")
|
|
log.info(f"No empty outputs: {'PASS' if not empty_outputs else 'FAIL'}")
|
|
if empty_outputs:
|
|
for t in empty_outputs:
|
|
log.error(f" Missing transcript: {t}")
|
|
log.info("=" * 60)
|
|
|
|
save_manifest(manifest, paths["manifest"])
|
|
|
|
if failed:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|