Recon-ul pe practitioner M1 arată că unele lecții n-au nici audio nici
Vimeo iframe — doar un link "Descarcă rezumat PDF" (/resurse/*.pdf).
Scraperul vechi le clasifica drept "text" și le marca failed (HTML body
avea <50 chars).
- classify_lesson: detectează acum a[href$=".pdf"] → type="pdf".
- download_pdf_and_extract: download PDF via session autentificat
(pypdf reader) → transcript .txt cu header + conținut pe pagini →
șterge PDF sursă (preferință utilizator: nu păstrez sursele).
- Branch în main loop pentru type=="pdf".
- requirements.txt: + pypdf.
- transcribe.py: skip type in ("text", "pdf") — transcript e deja scris
de download.py.
Limitări: PDF-uri cu conținut vizual (infografice, diagrame) extrag
puțin text. Titlul și textul inline sunt capturate; restul rămâne
pentru review manual.
Testat pe 4 PDF-uri M1 practitioner (Premisele NLP, Forme de Pacing,
Gesturi de calmare, Exercitiu Pacing): 3/4 extract bun (877-3068 bytes),
1/4 conținut predominant grafic (203 bytes).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
280 lines
9.0 KiB
Python
280 lines
9.0 KiB
Python
"""
|
|
Batch transcription using whisper.cpp.

Reads <root>/manifest.json, transcribes each audio file in module order,
outputs .txt and .srt files, updates manifest status.
Resumable: skips files with existing transcripts.
Converts non-WAV audio (e.g. MP3) -> WAV (16kHz mono) via ffmpeg before
transcription.

Text and PDF lectures (type in ("text", "pdf")) are skipped — their
transcript files are written directly by download.py.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from courses import course_paths, get_course, validate_manifest_course
|
|
|
|
# whisper.cpp defaults — override with the WHISPER_BIN / WHISPER_MODEL env vars
# (this script defines no CLI flags for them).
# Shared across courses (same model + binary).
WHISPER_BIN = os.getenv("WHISPER_BIN", r"whisper-cli.exe")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", r"models\ggml-medium-q5_0.bin")

# Log to the console and to transcribe_errors.log in the current working
# directory. The file handler is opened as UTF-8 explicitly: log messages
# include lecture titles and paths, which may contain characters that the
# platform default encoding (e.g. a Windows ANSI code page) cannot represent.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("transcribe_errors.log", encoding="utf-8"),
    ],
)
log = logging.getLogger(__name__)
|
|
|
|
|
|
def find_ffmpeg() -> str:
    """Locate an ffmpeg executable.

    Lookup order:
      1. ``ffmpeg`` on PATH (returned as the bare command name),
      2. ``ffmpeg.exe`` or ``ffmpeg-bin/ffmpeg.exe`` relative to the current
         working directory (returned as a resolved path),
      3. the binary reported by the optional ``imageio_ffmpeg`` package.

    Returns:
        The command or path to execute, or ``""`` when none of the above
        is available.
    """
    if shutil.which("ffmpeg"):
        return "ffmpeg"

    local_candidates = (Path("ffmpeg.exe"), Path("ffmpeg-bin/ffmpeg.exe"))
    local_hit = next((c for c in local_candidates if c.exists()), None)
    if local_hit is not None:
        return str(local_hit.resolve())

    # imageio_ffmpeg is optional; its absence simply means "not found".
    try:
        import imageio_ffmpeg

        bundled = imageio_ffmpeg.get_ffmpeg_exe()
    except ImportError:
        bundled = ""
    return bundled
|
|
|
|
|
|
def _discard_partial(path: Path) -> None:
    """Best-effort removal of an incomplete conversion output."""
    try:
        path.unlink()
    except OSError:
        # Already gone (or not removable) — nothing more we can do here.
        pass


def convert_to_wav(audio_path: str, wav_cache_dir: Path) -> str:
    """Return a 16 kHz mono 16-bit PCM WAV for *audio_path*, converting if needed.

    Files that already have a ``.wav`` suffix are returned untouched. Other
    files are converted with ffmpeg into ``wav_cache_dir/<stem>.wav``; an
    existing non-empty file at that location is reused as a cache hit.

    ffmpeg writes to a scratch file (``<stem>.partial.wav``) that is renamed
    onto the cache path only after ffmpeg exits successfully. Writing directly
    to the cache path would leave a truncated but non-empty WAV behind after a
    timeout or failure, and the next run would accept it as a cache hit.

    Args:
        audio_path: Path of the source audio file.
        wav_cache_dir: Directory holding converted WAV files (created on demand).

    Returns:
        Path of the WAV file to transcribe, or *audio_path* unchanged when the
        conversion is unnecessary or could not be performed.
    """
    src = Path(audio_path)
    if src.suffix.lower() == ".wav":
        return audio_path

    wav_cache_dir.mkdir(parents=True, exist_ok=True)
    wav_path = wav_cache_dir / (src.stem + ".wav")

    if wav_path.exists() and wav_path.stat().st_size > 0:
        log.info(f" WAV cache hit: {wav_path}")
        return str(wav_path)

    ffmpeg = find_ffmpeg()
    if not ffmpeg:
        log.warning(" ffmpeg not found, using original file (may cause bad transcription)")
        return audio_path

    # Keep the .wav suffix on the scratch name: ffmpeg picks the container
    # format from the output file extension.
    partial_path = wav_cache_dir / (src.stem + ".partial.wav")

    log.info(f" Converting to WAV: {src.name} -> {wav_path.name}")
    cmd = [
        ffmpeg, "-i", audio_path,
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        "-y", str(partial_path),
    ]
    try:
        # errors="replace": ffmpeg's stderr is only used for diagnostics, so an
        # undecodable byte must not abort the conversion.
        result = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace", timeout=300
        )
    except FileNotFoundError:
        log.warning(f" ffmpeg not found at: {ffmpeg}")
        return audio_path
    except subprocess.TimeoutExpired:
        log.error(f" ffmpeg conversion timeout for {audio_path}")
        _discard_partial(partial_path)
        return audio_path

    if result.returncode != 0:
        log.error(f" ffmpeg failed: {result.stderr[:300]}")
        _discard_partial(partial_path)
        return audio_path

    try:
        # Publish the finished file under its cache name in one step.
        partial_path.replace(wav_path)
    except OSError as exc:
        log.error(f" Could not finalize WAV {wav_path.name}: {exc}")
        _discard_partial(partial_path)
        return audio_path

    log.info(f" WAV ready: {wav_path.name} ({wav_path.stat().st_size / 1_048_576:.0f} MB)")
    return str(wav_path)
|
|
|
|
|
|
def load_manifest(manifest_path: Path) -> dict:
    """Read the UTF-8 encoded JSON manifest at *manifest_path* and return it."""
    raw_text = Path(manifest_path).read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
|
def save_manifest(manifest: dict, manifest_path: Path):
    """Write *manifest* to *manifest_path* as indented UTF-8 JSON.

    The parent directory is created if needed. The JSON is first written to a
    sibling ``<name>.tmp`` file and then renamed over the target, so an
    interrupted run or a serialization error never leaves a truncated manifest
    behind — the previous file stays intact until the new one is complete.

    Args:
        manifest: JSON-serializable manifest data.
        manifest_path: Destination file.

    Raises:
        TypeError: If *manifest* contains values json cannot serialize.
        OSError: If the file cannot be written or renamed.
    """
    manifest_path = Path(manifest_path)
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = manifest_path.with_name(manifest_path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2, ensure_ascii=False)
        tmp_path.replace(manifest_path)
    finally:
        # After a successful rename the temp file no longer exists; after a
        # failure this removes the incomplete leftover.
        try:
            tmp_path.unlink()
        except OSError:
            pass
|
|
|
|
|
|
def transcribe_file(audio_path: str, output_base: str) -> bool:
    """Run whisper.cpp on one audio file and report success.

    The binary named by WHISPER_BIN is invoked with the WHISPER_MODEL model,
    Romanian as the language, and both ``.txt`` and ``.srt`` outputs written
    next to *output_base*. The child process inherits this process's
    stdout/stderr and gets the binary's directory prepended to PATH.

    Args:
        audio_path: Audio file handed to whisper.cpp via ``--file``.
        output_base: Output path without extension (``--output-file``).

    Returns:
        True when whisper.cpp exits with code 0 and ``<output_base>.txt``
        exists and is non-empty; False on any failure (non-zero exit, missing
        or empty transcript, 2-hour timeout, missing binary, other errors).
    """
    decoding_flags = [
        "--model", WHISPER_MODEL,
        "--language", "ro",
        "--no-gpu",
        "--threads", str(os.cpu_count() or 4),
        "--beam-size", "1",
        "--best-of", "1",
        "--max-context", "0",
        "--entropy-thold", "2.4",
        "--max-len", "60",
        "--suppress-nst",
        "--no-fallback",
    ]
    output_flags = ["--output-txt", "--output-srt", "--output-file", output_base]
    cmd = [WHISPER_BIN, *decoding_flags, *output_flags, "--file", audio_path]

    log.info(" CMD: " + " ".join(cmd))
    try:
        # Put the binary's own directory first on PATH for the child process.
        child_env = os.environ.copy()
        bin_dir = str(Path(WHISPER_BIN).resolve().parent)
        child_env["PATH"] = bin_dir + os.pathsep + child_env.get("PATH", "")

        proc = subprocess.run(
            cmd,
            stdout=sys.stdout,
            stderr=sys.stderr,
            timeout=7200,
            env=child_env,
        )
        if proc.returncode != 0:
            log.error(f" whisper.cpp failed (exit {proc.returncode})")
            return False

        transcript = Path(output_base + ".txt")
        subtitles = Path(output_base + ".srt")

        # A zero exit code alone is not trusted: require a non-empty .txt.
        has_text = transcript.exists() and transcript.stat().st_size != 0
        if not has_text:
            log.error(f" Empty or missing transcript: {transcript}")
            return False

        log.info(f" Output: {transcript.name} ({transcript.stat().st_size} bytes)")
        if subtitles.exists():
            log.info(f" Output: {subtitles.name} ({subtitles.stat().st_size} bytes)")
        return True

    except subprocess.TimeoutExpired:
        log.error(f" Timeout (>2h) for {audio_path}")
        return False
    except FileNotFoundError:
        log.error(f" whisper.cpp not found at: {WHISPER_BIN}")
        log.error(" Set WHISPER_BIN env var or put whisper-cli.exe in PATH")
        return False
    except Exception as exc:
        log.error(f" Error: {exc}")
        return False
|
|
|
|
|
|
def parse_module_filter(arg: str) -> set[int]:
    """Parse a module filter such as ``"1-3"`` or ``"2,4,5"`` into a set.

    The filter is a comma-separated list of module numbers and inclusive
    ``a-b`` ranges. Whitespace around items is ignored, empty items (for
    example from a trailing comma) are skipped, and a reversed range such as
    ``"5-3"`` is treated like ``"3-5"`` instead of silently selecting nothing.

    Args:
        arg: The raw filter string.

    Returns:
        The set of selected 1-based module numbers (never empty).

    Raises:
        ValueError: If an item is not an integer or integer range, or if the
            filter selects no module at all.
    """
    selected: set[int] = set()
    for part in arg.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            low_text, high_text = part.split("-", 1)
            low, high = int(low_text), int(high_text)
            if low > high:
                low, high = high, low
            selected.update(range(low, high + 1))
        else:
            selected.add(int(part))

    # An empty selection would be indistinguishable from "no filter" for
    # callers that test the result for truthiness, so reject it explicitly.
    if not selected:
        raise ValueError(f"no module numbers found in filter: {arg!r}")
    return selected
|
|
|
|
|
|
def parse_args():
    """Parse the command line (``--course`` and ``--modules``).

    Returns:
        The argparse namespace with ``course`` (default ``"master"``) and
        ``modules`` (default ``None``).
    """
    parser = argparse.ArgumentParser(description="Transcribe lecture audio for a course")
    option_specs = (
        ("--course", {"default": "master", "help": "Course key (see courses.py)"}),
        ("--modules", {"default": None, "help": "Module filter, e.g. '1-3' or '2,4,5'"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser.parse_args()
|
|
|
|
|
|
# Lecture types that bypass whisper: their transcript is written by download.py.
_NON_AUDIO_TYPES = ("text", "pdf")


def _transcript_stem(lec: dict) -> str:
    """Return the transcript file stem derived from the lecture's original filename."""
    # Reuse the stem already recorded in the manifest for backward-compat
    # with M1-M6 paths (strips ' [Audio]' for aresens filenames).
    return Path(lec["original_filename"]).stem.replace(" [Audio]", "")


def _find_missing_transcripts(manifest: dict, transcripts_dir: Path) -> list[str]:
    """Return titles of whisper-transcribed lectures marked complete without a transcript file."""
    missing = []
    for mod in manifest["modules"]:
        for lec in mod["lectures"]:
            if lec.get("transcribe_status") != "complete":
                continue
            if lec.get("type") in _NON_AUDIO_TYPES:
                continue
            recorded = lec.get("transcript_path")
            if recorded:
                candidate = Path(recorded)
            elif lec.get("original_filename"):
                # No recorded path: check the location main() writes to.
                # (Path("") would resolve to the current directory and make
                # the existence check pass unconditionally.)
                candidate = transcripts_dir / (_transcript_stem(lec) + ".txt")
            else:
                # Nothing in the manifest identifies where the transcript is.
                continue
            if not candidate.exists():
                missing.append(lec["title"])
    return missing


def main():
    """Transcribe every downloaded audio lecture of the selected course.

    Loads the course manifest, walks its modules in order (optionally limited
    by ``--modules``), converts each audio file to WAV and runs whisper.cpp on
    it, recording ``transcribe_status`` per lecture in the manifest. Lectures
    of type "text"/"pdf" and lectures whose transcript already exists are
    marked complete without running whisper; lectures that were not downloaded
    are skipped. A summary is logged at the end.

    Exits with status 1 when the manifest is missing or any transcription
    failed.
    """
    args = parse_args()
    course = get_course(args.course)
    paths = course_paths(course)

    if not paths["manifest"].exists():
        log.error(f"{paths['manifest']} not found. Run download.py --course {course['key']} first.")
        sys.exit(1)

    module_filter = parse_module_filter(args.modules) if args.modules else None
    if module_filter is not None:
        log.info(f"Module filter: {sorted(module_filter)}")

    manifest = load_manifest(paths["manifest"])
    validate_manifest_course(manifest, course["key"])
    paths["transcripts_dir"].mkdir(parents=True, exist_ok=True)

    total = 0
    transcribed = 0
    skipped = 0
    failed = 0

    for mod_idx, mod in enumerate(manifest["modules"], 1):
        # "is not None": an empty filter must not be mistaken for "no filter".
        if module_filter is not None and mod_idx not in module_filter:
            log.info(f"\nSkipping module {mod_idx}: {mod['name']}")
            continue
        log.info(f"\n{'='*60}")
        log.info(f"Module: {mod['name']}")
        log.info(f"{'='*60}")

        for lec in mod["lectures"]:
            total += 1

            # Text and PDF lectures bypass whisper — transcript written by download.py.
            if lec.get("type") in _NON_AUDIO_TYPES:
                lec["transcribe_status"] = "complete"
                skipped += 1
                log.info(f" Skipping {lec.get('type')}: {lec['title']}")
                continue

            if lec.get("download_status") != "complete":
                log.warning(f" Skipping (not downloaded): {lec['title']}")
                continue

            audio_path = lec["audio_path"]
            stem = _transcript_stem(lec)
            output_base = str(paths["transcripts_dir"] / stem)

            # Resume support: a non-empty transcript means this one is done.
            txt_path = Path(f"{output_base}.txt")
            if txt_path.exists() and txt_path.stat().st_size > 0:
                lec["transcribe_status"] = "complete"
                skipped += 1
                log.info(f" Skipping (exists): {stem}.txt")
                continue

            log.info(f" Transcribing: {lec['title']}")
            log.info(f" File: {audio_path}")

            wav_path = convert_to_wav(audio_path, paths["wav_cache_dir"])

            if transcribe_file(wav_path, output_base):
                lec["transcribe_status"] = "complete"
                transcribed += 1
            else:
                lec["transcribe_status"] = "failed"
                failed += 1

            # Persist after every attempt so an interrupted run can resume.
            save_manifest(manifest, paths["manifest"])

        # Also persist statuses set on lectures that were skipped above.
        save_manifest(manifest, paths["manifest"])

        if mod_idx == 1 and transcribed > 0:
            log.info(f"First module complete ({transcribed} files). Continuing automatically...")

    empty_outputs = _find_missing_transcripts(manifest, paths["transcripts_dir"])

    log.info("\n" + "=" * 60)
    log.info(f"Transcribed {transcribed}/{total} files, {skipped} skipped, {failed} failures.")
    log.info(f"No empty outputs: {'PASS' if not empty_outputs else 'FAIL'}")
    if empty_outputs:
        for t in empty_outputs:
            log.error(f" Missing transcript: {t}")
    log.info("=" * 60)

    save_manifest(manifest, paths["manifest"])

    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()
|