refactor: parametrize pipeline cu --course flag + suport Vimeo/text

Un singur set de scripturi acum rulează pe orice curs configurat în
courses.py. Master rămâne la rădăcina repo (backward-compat M1-M6);
cursuri noi (ex. practitioner la shop.cursnlp.ro) primesc un root
dedicat (nlp-practitioner/) cu propriile artefacte.

- courses.py: config dict (master, practitioner) + course_paths() +
  validate_manifest_course() (manifest fără course_key = master).
- download.py: --course + --modules; trei tipuri de lecții (audio HTTP,
  Vimeo iframe via yt-dlp audio-only, text-only cu captură HTML);
  merge cu manifest existent în loc de replace; strip [Audio] pentru
  backward-compat paths.
- transcribe.py: --course + --modules; skip type==text; path-uri prin
  course_paths(); validare course_key.
- summarize.py: --course + --compile; template prompt folosește
  course['name']; scrie SUPORT_CURS.md cu LF explicit (WSL2 baseline).
- md_to_pdf.py: --course resolv-ă summaries_dir / pdf_dir per curs.
- run.bat: detectează master|practitioner ca primul argument,
  propagă --course la sub-scripturi; backward-compat run.bat [modules].
- requirements.txt: + yt-dlp.
- .gitignore: nlp-practitioner/audio/, audio_wav/, scratch_recon.py, tmp_recon/.
- tests/test_regression.sh: 5 gate-uri read-only (import, schema,
  disk-coherence, SUPORT_CURS byte-identic, cross-course isolation).

Regression curs master: PASS (manifest + SUPORT_CURS.md hash
identic cu baseline /tmp/suport_before.md).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-22 14:33:19 +03:00
parent ada00e380d
commit d22038d002
9 changed files with 1192 additions and 795 deletions

8
.gitignore vendored
View File

@@ -38,3 +38,11 @@ __pycache__/
# Logs # Logs
*.log *.log
# Second course (practitioner) — artifacts only, scripts partajate
nlp-practitioner/audio/
nlp-practitioner/audio_wav/
# Recon scratch
scratch_recon.py
tmp_recon/

80
courses.py Normal file
View File

@@ -0,0 +1,80 @@
"""
Shared course configuration for the NLP Master pipeline.
A single pipeline (download -> transcribe -> summarize -> pdf) runs on
multiple courses by passing --course <key>. Scripts resolve all artifact
paths against course["root"], so curs master remains in-place at repo root
and subsequent courses land in their own subdirectory.
"""
from pathlib import Path
COURSES = {
"master": {
"name": "NLP Master Practitioner Bucuresti 2025",
"base_url": "https://cursuri.aresens.ro",
"course_path": "/curs/26",
"login_path": "/login",
"env_user": "COURSE_USERNAME",
"env_pass": "COURSE_PASSWORD",
# Curs master stays at repo root for backward-compat with M1-M6 outputs.
"root": Path("."),
},
"practitioner": {
"name": "NLP Practitioner (cursnlp.ro)",
"base_url": "https://shop.cursnlp.ro",
"course_path": "/curs/50",
"login_path": "/login",
"env_user": "PRACTITIONER_USERNAME",
"env_pass": "PRACTITIONER_PASSWORD",
"root": Path("nlp-practitioner"),
},
}
def get_course(key: str) -> dict:
"""Return course config by key; SystemExit on unknown key."""
if key not in COURSES:
raise SystemExit(
f"Unknown course '{key}'. Available: {sorted(COURSES)}"
)
c = dict(COURSES[key])
c["key"] = key
c["course_url"] = c["base_url"] + c["course_path"]
c["login_url"] = c["base_url"] + c["login_path"]
return c
def course_paths(course: dict) -> dict:
    """Resolve artifact paths under course['root']."""
    base = course["root"]
    # pdf output nests under the summaries directory, so build that once.
    summaries = base / "summaries"
    return {
        "root": base,
        "manifest": base / "manifest.json",
        "audio_dir": base / "audio",
        "wav_cache_dir": base / "audio_wav",
        "transcripts_dir": base / "transcripts",
        "summaries_dir": summaries,
        "pdf_dir": summaries / "pdf",
        "master_guide": base / "SUPORT_CURS.md",
    }
def validate_manifest_course(manifest: dict, course_key: str) -> None:
    """
    Ensure a pre-existing manifest belongs to the course currently being run.

    Legacy policy: a manifest without `course_key` (written before this refactor)
    is treated as `master`. This keeps backward-compat with the existing
    curs_26 manifest.json from M1-M6.
    """
    recorded = manifest.get("course_key")
    effective = "master" if recorded is None else recorded
    if effective == course_key:
        return
    raise SystemExit(
        f"Manifest belongs to course '{effective}' but --course='{course_key}'. "
        f"Refusing to corrupt cross-course state. "
        f"Delete {course_key}'s manifest to start fresh, or run with --course={effective}."
    )

View File

@@ -1,277 +1,475 @@
""" """
Download all audio files from cursuri.aresens.ro NLP Master course. Download all lecture media from a configured course (see courses.py).
Logs in, discovers modules and lectures, downloads MP3s, writes manifest.json.
Resumable: skips already-downloaded files. Logs in, discovers modules + lectures, downloads whichever media each
""" lecture exposes, writes <root>/manifest.json. Resumable: skips already-
downloaded files.
import json
import logging Lecture types:
import os - "audio": <audio source> MP3 on the course CDN -> requests stream download
import sys - "vimeo": <iframe src="...player.vimeo.com/video/ID"> -> yt-dlp
import time (audio-only HLS track -> MP3 96kbps, no video bytes fetched)
from pathlib import Path - "text": neither audio nor video -> capture the lecture HTML body as
from urllib.parse import urljoin a plain-text transcript directly (skips whisper entirely)
"""
import requests
from bs4 import BeautifulSoup import argparse
from dotenv import load_dotenv import json
import logging
BASE_URL = "https://cursuri.aresens.ro" import os
COURSE_URL = f"{BASE_URL}/curs/26" import re
LOGIN_URL = f"{BASE_URL}/login" import sys
AUDIO_DIR = Path("audio") import time
MANIFEST_PATH = Path("manifest.json") from pathlib import Path
MAX_RETRIES = 3 from urllib.parse import urljoin, urlparse
RETRY_BACKOFF = [5, 15, 30]
import requests
logging.basicConfig( from bs4 import BeautifulSoup
level=logging.INFO, from dotenv import load_dotenv
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[ from courses import course_paths, get_course, validate_manifest_course
logging.StreamHandler(),
logging.FileHandler("download_errors.log"), MAX_RETRIES = 3
], RETRY_BACKOFF = [5, 15, 30]
)
log = logging.getLogger(__name__) logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
def login(session: requests.Session, email: str, password: str) -> bool: handlers=[
"""Login and return True on success.""" logging.StreamHandler(),
resp = session.post(LOGIN_URL, data={ logging.FileHandler("download_errors.log"),
"email": email, ],
"password": password, )
"act": "login", log = logging.getLogger(__name__)
"remember": "on",
}, allow_redirects=True)
# Successful login redirects to the course page, not back to /login def login(session: requests.Session, course: dict, email: str, password: str) -> bool:
if "/login" in resp.url or "loginform" in resp.text: resp = session.post(course["login_url"], data={
return False "email": email,
return True "password": password,
"act": "login",
"remember": "on",
def parse_module_filter(arg: str) -> set[int]: }, allow_redirects=True)
"""Parse module filter like '1-3' or '4,5' or '1-3,5' into a set of 1-based indices.""" if "/login" in resp.url or "loginform" in resp.text:
result = set() return False
for part in arg.split(","): return True
part = part.strip()
if "-" in part:
a, b = part.split("-", 1) def parse_module_filter(arg: str) -> set[int]:
result.update(range(int(a), int(b) + 1)) result = set()
else: for part in arg.split(","):
result.add(int(part)) part = part.strip()
return result if "-" in part:
a, b = part.split("-", 1)
result.update(range(int(a), int(b) + 1))
def discover_modules(session: requests.Session) -> list[dict]: else:
"""Fetch course page and return list of {name, url, module_id}.""" result.add(int(part))
resp = session.get(COURSE_URL) return result
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
def discover_modules(session: requests.Session, course: dict) -> list[dict]:
modules = [] resp = session.get(course["course_url"])
for div in soup.select("div.module"): resp.raise_for_status()
number_el = div.select_one("div.module__number") soup = BeautifulSoup(resp.text, "html.parser")
link_el = div.select_one("a.btn")
if not number_el or not link_el: modules = []
continue for div in soup.select("div.module"):
href = link_el.get("href", "") number_el = div.select_one("div.module__number")
module_id = href.rstrip("/").split("/")[-1] link_el = div.select_one("a.btn")
modules.append({ if not number_el or not link_el:
"name": number_el.get_text(strip=True), continue
"url": urljoin(BASE_URL, href), href = link_el.get("href", "")
"module_id": module_id, module_id = href.rstrip("/").split("/")[-1]
}) modules.append({
log.info(f"Found {len(modules)} modules") "name": number_el.get_text(strip=True),
return modules "url": urljoin(course["base_url"], href),
"module_id": module_id,
})
def discover_lectures(session: requests.Session, module: dict) -> list[dict]: log.info(f"Found {len(modules)} modules")
"""Fetch a module page and return list of lectures with audio URLs.""" if not modules:
resp = session.get(module["url"]) log.error("No modules found on course page — selectors mismatch or not logged in")
resp.raise_for_status() sys.exit(1)
soup = BeautifulSoup(resp.text, "html.parser") return modules
lectures = []
for lesson_div in soup.select("div.lesson"): VIMEO_ID_RE = re.compile(r"player\.vimeo\.com/video/(\d+)", re.I)
name_el = lesson_div.select_one("div.module__name")
source_el = lesson_div.select_one("audio source")
if not name_el or not source_el: def slugify(text: str) -> str:
continue """Filesystem-safe slug for text lectures (no URL-derived filename)."""
src = source_el.get("src", "").strip() text = text.strip().lower()
if not src: text = re.sub(r"[^\w\s-]", "", text, flags=re.UNICODE)
continue text = re.sub(r"[\s_-]+", "_", text)
audio_url = urljoin(BASE_URL, src) return text[:80] or "untitled"
filename = src.split("/")[-1]
title = name_el.get_text(strip=True)
lectures.append({ def derived_stem(filename: str) -> str:
"title": title, """
"original_filename": filename, Stem used for transcript/srt/summary paths.
"url": audio_url, Strips the ' [Audio]' suffix used on curs master (aresens) filenames
"audio_path": str(AUDIO_DIR / filename), so derived paths stay short and backward-compatible with M1-M6.
}) """
log.info(f" {module['name']}: {len(lectures)} lectures") return Path(filename).stem.replace(" [Audio]", "")
return lectures
def classify_lesson(lesson_div, base_url: str) -> tuple[str, str, str]:
def download_file(session: requests.Session, url: str, dest: Path) -> bool: """
"""Download a file with retry logic. Returns True on success.""" Return (lecture_type, media_url_or_empty, filename_stem).
for attempt in range(MAX_RETRIES):
try: Types:
resp = session.get(url, stream=True, timeout=300) - ("audio", mp3_url, filename_from_url)
resp.raise_for_status() - ("vimeo", vimeo_url, "vimeo_<id>")
- ("text", "", slug_from_title) # no media found
# Write to temp file first, then rename (atomic) """
tmp = dest.with_suffix(".tmp") audio_el = lesson_div.select_one("audio source")
total = 0 if audio_el and audio_el.get("src", "").strip():
with open(tmp, "wb") as f: src = audio_el["src"].strip()
for chunk in resp.iter_content(chunk_size=1024 * 1024): return "audio", urljoin(base_url, src), src.split("/")[-1].rsplit(".", 1)[0]
f.write(chunk)
total += len(chunk) iframe_el = lesson_div.select_one("iframe")
if iframe_el:
if total < 1_000_000: # < 1MB is suspicious src = (iframe_el.get("src") or iframe_el.get("data-src") or "").strip()
log.warning(f"File too small ({total} bytes): {dest.name}") m = VIMEO_ID_RE.search(src)
tmp.unlink(missing_ok=True) if m:
return False vimeo_id = m.group(1)
# Canonical player URL works with yt-dlp + referer.
tmp.rename(dest) return "vimeo", f"https://player.vimeo.com/video/{vimeo_id}", f"vimeo_{vimeo_id}"
log.info(f" Downloaded: {dest.name} ({total / 1_000_000:.1f} MB)")
return True return "text", "", "" # stem filled in by caller using title slug
except Exception as e:
wait = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30 def discover_lectures(session: requests.Session, module: dict, course: dict) -> list[dict]:
log.warning(f" Attempt {attempt + 1}/{MAX_RETRIES} failed for {dest.name}: {e}") resp = session.get(module["url"])
if attempt < MAX_RETRIES - 1: resp.raise_for_status()
log.info(f" Retrying in {wait}s...") soup = BeautifulSoup(resp.text, "html.parser")
time.sleep(wait)
lectures = []
log.error(f" FAILED after {MAX_RETRIES} attempts: {dest.name}") for lesson_div in soup.select("div.lesson"):
return False name_el = lesson_div.select_one("div.module__name")
if not name_el:
continue
def load_manifest() -> dict | None: title = name_el.get_text(strip=True)
"""Load existing manifest if present.""" if not title:
if MANIFEST_PATH.exists(): continue
with open(MANIFEST_PATH) as f:
return json.load(f) ltype, media_url, stem = classify_lesson(lesson_div, course["base_url"])
return None if ltype == "text":
stem = slugify(title)
# Capture the lesson body HTML (source for text -> transcript)
def save_manifest(manifest: dict): # so we don't have to re-request it later.
"""Write manifest.json.""" body_el = lesson_div.select_one("div.module__content") or lesson_div
with open(MANIFEST_PATH, "w", encoding="utf-8") as f: lecture = {
json.dump(manifest, f, indent=2, ensure_ascii=False) "type": "text",
"title": title,
"original_filename": stem + ".txt",
def main(): "url": module["url"], # lesson is inline in module page
load_dotenv() "audio_path": "", # no audio
email = os.getenv("COURSE_USERNAME", "") "text_content": body_el.get_text("\n", strip=True),
password = os.getenv("COURSE_PASSWORD", "") }
if not email or not password: elif ltype == "vimeo":
log.error("Set COURSE_USERNAME and COURSE_PASSWORD in .env") # Target MP3 filename has .mp3 (yt-dlp extract-audio writes .mp3)
sys.exit(1) audio_path = course_paths(course)["audio_dir"] / f"{stem}.mp3"
lecture = {
# Parse --modules filter (e.g. "4-5" or "1,3,5") "type": "vimeo",
module_filter = None "title": title,
if "--modules" in sys.argv: "original_filename": f"{stem}.mp3",
idx = sys.argv.index("--modules") "url": media_url,
if idx + 1 < len(sys.argv): "audio_path": str(audio_path),
module_filter = parse_module_filter(sys.argv[idx + 1]) }
log.info(f"Module filter: {sorted(module_filter)}") else: # "audio"
# Preserve original filename (may contain spaces).
AUDIO_DIR.mkdir(exist_ok=True) filename = media_url.split("/")[-1]
audio_path = course_paths(course)["audio_dir"] / filename
session = requests.Session() lecture = {
session.headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}) "type": "audio",
"title": title,
log.info("Logging in...") "original_filename": filename,
if not login(session, email, password): "url": media_url,
log.error("Login failed. Check credentials in .env") "audio_path": str(audio_path),
sys.exit(1) }
log.info("Login successful")
lectures.append(lecture)
# Discover structure
modules = discover_modules(session) counts = {
if not modules: "audio": sum(1 for L in lectures if L["type"] == "audio"),
log.error("No modules found") "vimeo": sum(1 for L in lectures if L["type"] == "vimeo"),
sys.exit(1) "text": sum(1 for L in lectures if L["type"] == "text"),
}
manifest = { log.info(
"course": "NLP Master Practitioner Bucuresti 2025", f" {module['name']}: {len(lectures)} lectures "
"source_url": COURSE_URL, f"(audio={counts['audio']}, vimeo={counts['vimeo']}, text={counts['text']})"
"modules": [], )
} return lectures
total_files = 0
downloaded = 0 def download_audio_http(session: requests.Session, url: str, dest: Path) -> bool:
skipped = 0 """HTTP stream download with retry. Returns True on success."""
failed = 0 for attempt in range(MAX_RETRIES):
try:
for mod_idx, mod in enumerate(modules, 1): resp = session.get(url, stream=True, timeout=300)
if module_filter and mod_idx not in module_filter: resp.raise_for_status()
log.info(f" Skipping module {mod_idx}: {mod['name']}") tmp = dest.with_suffix(dest.suffix + ".tmp")
continue total = 0
lectures = discover_lectures(session, mod) with open(tmp, "wb") as f:
module_entry = { for chunk in resp.iter_content(chunk_size=1024 * 1024):
"name": mod["name"], f.write(chunk)
"module_id": mod["module_id"], total += len(chunk)
"lectures": [], if total < 1_000_000:
} log.warning(f"File too small ({total} bytes): {dest.name}")
tmp.unlink(missing_ok=True)
for lec in lectures: return False
total_files += 1 tmp.rename(dest)
dest = Path(lec["audio_path"]) log.info(f" Downloaded: {dest.name} ({total / 1_000_000:.1f} MB)")
stem = dest.stem.replace(" [Audio]", "") return True
except Exception as e:
lecture_entry = { wait = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30
"title": lec["title"], log.warning(f" Attempt {attempt + 1}/{MAX_RETRIES} failed for {dest.name}: {e}")
"original_filename": lec["original_filename"], if attempt < MAX_RETRIES - 1:
"url": lec["url"], log.info(f" Retrying in {wait}s...")
"audio_path": lec["audio_path"], time.sleep(wait)
"transcript_path": f"transcripts/{stem}.txt",
"srt_path": f"transcripts/{stem}.srt", log.error(f" FAILED after {MAX_RETRIES} attempts: {dest.name}")
"summary_path": f"summaries/{stem}_summary.md", return False
"download_status": "pending",
"transcribe_status": "pending",
"file_size_bytes": 0, def download_vimeo_audio(vimeo_url: str, referer: str, dest: Path) -> bool:
} """
Download Vimeo audio-only stream via yt-dlp, output as MP3 96kbps.
# Skip if already downloaded No video bytes fetched (Vimeo HLS has separate audio tracks).
if dest.exists() and dest.stat().st_size > 1_000_000: """
lecture_entry["download_status"] = "complete" try:
lecture_entry["file_size_bytes"] = dest.stat().st_size import yt_dlp
skipped += 1 except ImportError:
log.info(f" Skipping (exists): {dest.name}") log.error("yt-dlp not installed. Run: pip install yt-dlp")
else: return False
if download_file(session, lec["url"], dest):
lecture_entry["download_status"] = "complete" dest.parent.mkdir(parents=True, exist_ok=True)
lecture_entry["file_size_bytes"] = dest.stat().st_size # yt-dlp adds .mp3 extension after postprocessing; give it the stem.
downloaded += 1 outtmpl_stem = str(dest.with_suffix(""))
else:
lecture_entry["download_status"] = "failed" ydl_opts = {
failed += 1 "format": "bestaudio",
"outtmpl": outtmpl_stem + ".%(ext)s",
module_entry["lectures"].append(lecture_entry) "http_headers": {"Referer": referer},
"quiet": True,
manifest["modules"].append(module_entry) "no_warnings": True,
# Save manifest after each module (checkpoint) "postprocessors": [{
save_manifest(manifest) "key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
# Final validation "preferredquality": "96",
all_ok = all( }],
Path(lec["audio_path"]).exists() and Path(lec["audio_path"]).stat().st_size > 1_000_000 }
for mod in manifest["modules"]
for lec in mod["lectures"] for attempt in range(MAX_RETRIES):
if lec["download_status"] == "complete" try:
) with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([vimeo_url])
log.info("=" * 60) if dest.exists() and dest.stat().st_size > 100_000:
log.info(f"Downloaded {downloaded}/{total_files} files, {skipped} skipped, {failed} failures.") log.info(f" Downloaded (vimeo): {dest.name} ({dest.stat().st_size / 1_000_000:.1f} MB)")
log.info(f"All files > 1MB: {'PASS' if all_ok else 'FAIL'}") return True
log.info("=" * 60) log.warning(f" yt-dlp produced no file or too small: {dest}")
except Exception as e:
if failed: wait = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30
sys.exit(1) log.warning(f" Vimeo attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
if attempt < MAX_RETRIES - 1:
log.info(f" Retrying in {wait}s...")
if __name__ == "__main__": time.sleep(wait)
main()
log.error(f" FAILED vimeo download after {MAX_RETRIES} attempts: {vimeo_url}")
return False
def capture_text_lecture(lecture: dict, transcripts_dir: Path) -> bool:
    """
    Write the lecture's captured HTML text as a transcript .txt file.
    Text lectures bypass whisper — content is final here.
    """
    transcripts_dir.mkdir(parents=True, exist_ok=True)
    body = lecture.get("text_content", "").strip()
    # Guard against empty/near-empty lesson bodies (selector misses, etc.).
    if len(body) < 50:
        log.warning(f" text lesson '{lecture['title']}' has <50 chars, skipping")
        return False
    out_path = transcripts_dir / (Path(lecture["original_filename"]).stem + ".txt")
    banner = f"[TEXT LECTURE — no audio]\nTitlu: {lecture['title']}\n\n"
    out_path.write_text(banner + body, encoding="utf-8")
    log.info(f" Captured (text): {out_path.name} ({out_path.stat().st_size} bytes)")
    return True
def load_manifest(manifest_path: Path) -> dict | None:
if manifest_path.exists():
with open(manifest_path, encoding="utf-8") as f:
return json.load(f)
return None
def save_manifest(manifest: dict, manifest_path: Path):
    """Write *manifest* as pretty-printed UTF-8 JSON, creating parent dirs as needed."""
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(manifest, indent=2, ensure_ascii=False)
    manifest_path.write_text(payload, encoding="utf-8")
def parse_args():
    """Parse CLI flags: --course (defaults to master) and optional --modules filter."""
    parser = argparse.ArgumentParser(description="Download lecture media for a course")
    parser.add_argument("--course", default="master", help="Course key (see courses.py)")
    parser.add_argument("--modules", default=None, help="Module filter, e.g. '1-3' or '2,4,5'")
    return parser.parse_args()
def main():
    """
    Orchestrate one download run for a single course.

    Flow: parse args -> load credentials from .env -> validate any existing
    manifest belongs to this course -> log in -> discover modules -> for each
    module in the filter, fetch/capture every lecture and record its state ->
    checkpoint the manifest after each module. Exits non-zero on any failure.
    """
    args = parse_args()
    course = get_course(args.course)
    paths = course_paths(course)

    load_dotenv()
    # Each course has its own credential env-var pair (see courses.py).
    email = os.getenv(course["env_user"], "")
    password = os.getenv(course["env_pass"], "")
    if not email or not password:
        log.error(f"Set {course['env_user']} and {course['env_pass']} in .env")
        sys.exit(1)

    module_filter = parse_module_filter(args.modules) if args.modules else None
    if module_filter:
        log.info(f"Module filter: {sorted(module_filter)}")

    paths["audio_dir"].mkdir(parents=True, exist_ok=True)
    paths["transcripts_dir"].mkdir(parents=True, exist_ok=True)

    # Validate existing manifest belongs to this course
    existing = load_manifest(paths["manifest"])
    if existing is not None:
        validate_manifest_course(existing, course["key"])

    session = requests.Session()
    session.headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})

    log.info(f"Course: {course['key']} ({course['name']})")
    log.info(f"Root: {paths['root']}")
    log.info("Logging in...")
    if not login(session, course, email, password):
        log.error("Login failed. Check credentials in .env")
        sys.exit(1)
    log.info("Login successful")

    modules = discover_modules(session, course)

    # Start from existing manifest if present — preserves modules outside
    # the current --modules filter, and preserves per-lecture state (e.g.
    # transcribe_status) for modules in the filter.
    if existing:
        manifest = dict(existing)
        manifest["course_key"] = course["key"]
        manifest["course"] = course["name"]
        manifest["source_url"] = course["course_url"]
        if "modules" not in manifest:
            manifest["modules"] = []
    else:
        manifest = {
            "course_key": course["key"],
            "course": course["name"],
            "source_url": course["course_url"],
            "modules": [],
        }

    # Index of existing modules by name for in-place replacement.
    existing_by_name = {m["name"]: i for i, m in enumerate(manifest["modules"])}
    # Prior lecture state (by title) for preserving transcribe_status.
    prior_lecture_state: dict[str, dict] = {
        lec["title"]: lec
        for m in manifest["modules"]
        for lec in m.get("lectures", [])
    }

    # Run counters for the final summary line.
    total = 0
    downloaded = 0
    skipped = 0
    failed = 0

    for mod_idx, mod in enumerate(modules, 1):
        if module_filter and mod_idx not in module_filter:
            log.info(f" Skipping module {mod_idx}: {mod['name']} (outside --modules filter, preserved in manifest)")
            continue
        lectures = discover_lectures(session, mod, course)
        module_entry = {
            "name": mod["name"],
            "module_id": mod["module_id"],
            "lectures": [],
        }

        for lec in lectures:
            total += 1
            stem = derived_stem(lec["original_filename"])
            # Prior run's entry for this lecture (empty dict if first sight).
            prior = prior_lecture_state.get(lec["title"], {})
            entry = {
                "type": lec["type"],
                "title": lec["title"],
                "original_filename": lec["original_filename"],
                "url": lec["url"],
                "audio_path": lec["audio_path"],
                # Derived paths: reuse prior values when set, else build fresh
                # ones under this course's directories (as_posix keeps the
                # manifest portable across Windows/WSL).
                "transcript_path": prior.get("transcript_path") or f"{paths['transcripts_dir'].as_posix()}/{stem}.txt",
                "srt_path": prior.get("srt_path") or f"{paths['transcripts_dir'].as_posix()}/{stem}.srt",
                "summary_path": prior.get("summary_path") or f"{paths['summaries_dir'].as_posix()}/{stem}_summary.md",
                "download_status": "pending",
                # Preserve transcribe_status from prior run (disk check in transcribe.py will correct it if needed).
                "transcribe_status": prior.get("transcribe_status", "pending"),
                "file_size_bytes": 0,
            }

            if lec["type"] == "text":
                # Captured directly; treated as already-transcribed.
                txt_path = Path(entry["transcript_path"])
                if txt_path.exists() and txt_path.stat().st_size > 50:
                    entry["download_status"] = "complete"
                    entry["transcribe_status"] = "complete"
                    skipped += 1
                    log.info(f" Skipping text (exists): {txt_path.name}")
                elif capture_text_lecture(lec, paths["transcripts_dir"]):
                    entry["download_status"] = "complete"
                    entry["transcribe_status"] = "complete"
                    entry["file_size_bytes"] = txt_path.stat().st_size
                    downloaded += 1
                else:
                    entry["download_status"] = "failed"
                    failed += 1
            else:
                dest = Path(lec["audio_path"])
                # Resumability: >1MB on disk counts as already downloaded.
                if dest.exists() and dest.stat().st_size > 1_000_000:
                    entry["download_status"] = "complete"
                    entry["file_size_bytes"] = dest.stat().st_size
                    skipped += 1
                    log.info(f" Skipping (exists): {dest.name}")
                else:
                    if lec["type"] == "audio":
                        ok = download_audio_http(session, lec["url"], dest)
                    else:  # "vimeo"
                        ok = download_vimeo_audio(lec["url"], course["base_url"] + "/", dest)
                    if ok:
                        entry["download_status"] = "complete"
                        entry["file_size_bytes"] = dest.stat().st_size
                        downloaded += 1
                    else:
                        entry["download_status"] = "failed"
                        failed += 1

            module_entry["lectures"].append(entry)

        # Replace or append module in manifest (preserves order for existing, appends new at end).
        if mod["name"] in existing_by_name:
            manifest["modules"][existing_by_name[mod["name"]]] = module_entry
        else:
            manifest["modules"].append(module_entry)
        # Checkpoint after every module so an interrupted run loses little.
        save_manifest(manifest, paths["manifest"])

    log.info("=" * 60)
    log.info(f"Downloaded {downloaded}/{total} items, {skipped} skipped, {failed} failures.")
    log.info("=" * 60)
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -10,8 +10,7 @@ from concurrent.futures import ProcessPoolExecutor, as_completed
import markdown2 import markdown2
from weasyprint import HTML from weasyprint import HTML
SUMMARIES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "summaries") from courses import course_paths, get_course
PDF_DIR = os.path.join(SUMMARIES_DIR, "pdf")
CSS = """ CSS = """
@page { @page {
@@ -178,9 +177,9 @@ def convert_one(args):
return os.path.basename(md_path), os.path.basename(pdf_path) return os.path.basename(md_path), os.path.basename(pdf_path)
def find_files(modules=None): def find_files(summaries_dir, modules=None):
"""Find all .md files in summaries/, optionally filtered by module numbers.""" """Find all .md files in summaries/, optionally filtered by module numbers."""
pattern = os.path.join(SUMMARIES_DIR, "*.md") pattern = os.path.join(summaries_dir, "*.md")
files = sorted(glob.glob(pattern)) files = sorted(glob.glob(pattern))
if modules: if modules:
@@ -216,32 +215,35 @@ def parse_modules(spec):
def main(): def main():
parser = argparse.ArgumentParser(description="Convert MD summaries to PDF") parser = argparse.ArgumentParser(description="Convert MD summaries to PDF")
parser.add_argument("files", nargs="*", help="Specific MD files to convert") parser.add_argument("files", nargs="*", help="Specific MD files to convert")
parser.add_argument( parser.add_argument("--course", default="master", help="Course key (see courses.py)")
"--modules", "-m", help="Module filter, e.g. '1-3' or '2,4,5'" parser.add_argument("--modules", "-m", help="Module filter, e.g. '1-3' or '2,4,5'")
) parser.add_argument("--workers", "-w", type=int, default=4, help="Parallel workers (default: 4)")
parser.add_argument(
"--workers", "-w", type=int, default=4, help="Parallel workers (default: 4)"
)
args = parser.parse_args() args = parser.parse_args()
os.makedirs(PDF_DIR, exist_ok=True) course = get_course(args.course)
paths = course_paths(course)
summaries_dir = str(paths["summaries_dir"].resolve())
pdf_dir = str(paths["pdf_dir"].resolve())
os.makedirs(pdf_dir, exist_ok=True)
if args.files: if args.files:
md_files = [os.path.abspath(f) for f in args.files] md_files = [os.path.abspath(f) for f in args.files]
else: else:
modules = parse_modules(args.modules) if args.modules else None modules = parse_modules(args.modules) if args.modules else None
md_files = find_files(modules) md_files = find_files(summaries_dir, modules)
if not md_files: if not md_files:
print("No MD files found to convert.") print(f"No MD files found in {summaries_dir}")
sys.exit(1) sys.exit(1)
jobs = [] jobs = []
for md_path in md_files: for md_path in md_files:
basename = os.path.splitext(os.path.basename(md_path))[0] basename = os.path.splitext(os.path.basename(md_path))[0]
pdf_path = os.path.join(PDF_DIR, basename + ".pdf") pdf_path = os.path.join(pdf_dir, basename + ".pdf")
jobs.append((md_path, pdf_path)) jobs.append((md_path, pdf_path))
print(f"Course: {course['key']} ({course['name']})")
print(f"Converting {len(jobs)} file(s) to PDF with {args.workers} workers...") print(f"Converting {len(jobs)} file(s) to PDF with {args.workers} workers...")
with ProcessPoolExecutor(max_workers=args.workers) as pool: with ProcessPoolExecutor(max_workers=args.workers) as pool:
@@ -254,7 +256,7 @@ def main():
md_path = futures[future][0] md_path = futures[future][0]
print(f" ERROR {os.path.basename(md_path)}: {e}", file=sys.stderr) print(f" ERROR {os.path.basename(md_path)}: {e}", file=sys.stderr)
print(f"Done. PDFs saved to {PDF_DIR}") print(f"Done. PDFs saved to {pdf_dir}")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -3,3 +3,4 @@ beautifulsoup4
python-dotenv python-dotenv
markdown2 markdown2
weasyprint weasyprint
yt-dlp

56
run.bat
View File

@@ -2,9 +2,27 @@
setlocal enabledelayedexpansion setlocal enabledelayedexpansion
cd /d "%~dp0" cd /d "%~dp0"
:: ============================================================
:: Course + module filter argument parsing
:: Usage:
:: run.bat -> master, all modules (backward-compat)
:: run.bat 1-3 -> master, modules 1-3 (backward-compat)
:: run.bat practitioner -> practitioner, all modules
:: run.bat practitioner 1-3 -> practitioner, modules 1-3
:: ============================================================
set "COURSE_KEY=master"
set "MODULE_FILTER=%~1"
if /i "%~1"=="master" (
set "COURSE_KEY=master"
set "MODULE_FILTER=%~2"
)
if /i "%~1"=="practitioner" (
set "COURSE_KEY=practitioner"
set "MODULE_FILTER=%~2"
)
echo ============================================================ echo ============================================================
echo NLP Master - Download + Transcribe Pipeline echo NLP Course Pipeline (course: %COURSE_KEY%)
echo ============================================================ echo ============================================================
echo. echo.
@@ -46,20 +64,28 @@ if not defined PYTHON_CMD (
) )
:: --- .env credentials --- :: --- .env credentials ---
:: Each course uses its own env var pair. Check based on selected course.
if /i "%COURSE_KEY%"=="practitioner" (
set "ENV_USER=PRACTITIONER_USERNAME"
set "ENV_PASS=PRACTITIONER_PASSWORD"
) else (
set "ENV_USER=COURSE_USERNAME"
set "ENV_PASS=COURSE_PASSWORD"
)
if exist ".env" ( if exist ".env" (
findstr /m "COURSE_USERNAME=." ".env" >nul 2>&1 findstr /m "!ENV_USER!=." ".env" >nul 2>&1
if errorlevel 1 ( if errorlevel 1 (
echo [X] .env File exists but COURSE_USERNAME is empty echo [X] .env File exists but !ENV_USER! is empty
echo Edit .env and fill in your credentials. echo Edit .env and set !ENV_USER! and !ENV_PASS!.
set "PREREQ_OK=" set "PREREQ_OK="
) else ( ) else (
echo [OK] .env Credentials configured echo [OK] .env Credentials configured for %COURSE_KEY%
) )
) else ( ) else (
echo [X] .env NOT FOUND echo [X] .env NOT FOUND
echo Create .env with: echo Create .env with:
echo COURSE_USERNAME=your_email echo !ENV_USER!=your_email
echo COURSE_PASSWORD=your_password echo !ENV_PASS!=your_password
set "PREREQ_OK=" set "PREREQ_OK="
) )
@@ -265,11 +291,11 @@ echo Done.
echo. echo.
echo [3/4] Downloading audio files... echo [3/4] Downloading audio files...
echo ============================================================ echo ============================================================
if "%~1"=="" ( if "!MODULE_FILTER!"=="" (
.venv\Scripts\python download.py .venv\Scripts\python download.py --course %COURSE_KEY%
) else ( ) else (
echo Modules filter: %~1 echo Modules filter: !MODULE_FILTER!
.venv\Scripts\python download.py --modules %~1 .venv\Scripts\python download.py --course %COURSE_KEY% --modules !MODULE_FILTER!
) )
if errorlevel 1 ( if errorlevel 1 (
echo. echo.
@@ -287,11 +313,11 @@ echo Using: %WHISPER_BIN%
echo Model: %WHISPER_MODEL% echo Model: %WHISPER_MODEL%
echo. echo.
if "%~1"=="" ( if "!MODULE_FILTER!"=="" (
.venv\Scripts\python transcribe.py .venv\Scripts\python transcribe.py --course %COURSE_KEY%
) else ( ) else (
echo Modules filter: %~1 echo Modules filter: !MODULE_FILTER!
.venv\Scripts\python transcribe.py --modules %~1 .venv\Scripts\python transcribe.py --course %COURSE_KEY% --modules !MODULE_FILTER!
) )
if errorlevel 1 ( if errorlevel 1 (
echo. echo.

View File

@@ -1,192 +1,200 @@
""" """
Generate summaries from transcripts using Claude Code. Generate summaries from transcripts using Claude Code.
Reads manifest.json, processes each transcript, outputs per-lecture summaries, Reads <root>/manifest.json, processes each transcript, outputs per-lecture
and compiles SUPORT_CURS.md master study guide. summaries, and compiles <root>/SUPORT_CURS.md master study guide.
Usage: Usage:
python summarize.py # Print prompts for each transcript (pipe to Claude) python summarize.py # master, print prompts
python summarize.py --compile # Compile existing summaries into SUPORT_CURS.md python summarize.py --course practitioner # practitioner, print prompts
""" python summarize.py --compile # master, compile SUPORT_CURS.md
python summarize.py --course practitioner --compile
import json """
import sys
import textwrap import argparse
from pathlib import Path import json
import sys
MANIFEST_PATH = Path("manifest.json") from pathlib import Path
SUMMARIES_DIR = Path("summaries")
TRANSCRIPTS_DIR = Path("transcripts") from courses import course_paths, get_course, validate_manifest_course
MASTER_GUIDE = Path("SUPORT_CURS.md")
MAX_WORDS_PER_CHUNK = 10000
MAX_WORDS_PER_CHUNK = 10000 OVERLAP_WORDS = 500
OVERLAP_WORDS = 500
SUMMARY_PROMPT = """Rezuma aceasta transcriere a unei lectii din cursul {course_name}.
SUMMARY_PROMPT = """Rezuma aceasta transcriere a unei lectii din cursul NLP Master Practitioner.
Ofera:
Ofera: 1. **Prezentare generala** - 3-5 propozitii care descriu subiectul principal al lectiei
1. **Prezentare generala** - 3-5 propozitii care descriu subiectul principal al lectiei 2. **Concepte cheie** - lista cu definitii scurte pentru fiecare concept important
2. **Concepte cheie** - lista cu definitii scurte pentru fiecare concept important 3. **Detalii si exemple importante** - informatii concrete, exercitii practice, exemple relevante mentionate de trainer
3. **Detalii si exemple importante** - informatii concrete, exercitii practice, exemple relevante mentionate de trainer 4. **Citate memorabile** - fraze sau idei remarcabile (daca exista)
4. **Citate memorabile** - fraze sau idei remarcabile (daca exista)
Raspunde in limba romana. Formateaza ca Markdown.
Raspunde in limba romana. Formateaza ca Markdown.
---
--- TITLU LECTIE: {title}
TITLU LECTIE: {title} ---
--- TRANSCRIERE:
TRANSCRIERE: {text}
{text} """
"""
MERGE_PROMPT = """Am mai multe rezumate partiale ale aceleiasi lectii (a fost prea lunga pentru un singur rezumat).
MERGE_PROMPT = """Am mai multe rezumate partiale ale aceleiasi lectii (a fost prea lunga pentru un singur rezumat). Combina-le intr-un singur rezumat coerent, eliminand duplicatele.
Combina-le intr-un singur rezumat coerent, eliminand duplicatele.
Pastreaza structura:
Pastreaza structura: 1. Prezentare generala (3-5 propozitii)
1. Prezentare generala (3-5 propozitii) 2. Concepte cheie cu definitii
2. Concepte cheie cu definitii 3. Detalii si exemple importante
3. Detalii si exemple importante 4. Citate memorabile
4. Citate memorabile
Raspunde in limba romana. Formateaza ca Markdown.
Raspunde in limba romana. Formateaza ca Markdown.
---
--- TITLU LECTIE: {title}
TITLU LECTIE: {title} ---
--- REZUMATE PARTIALE:
REZUMATE PARTIALE: {chunks}
{chunks} """
"""
def load_manifest(manifest_path: Path) -> dict:
def load_manifest() -> dict: with open(manifest_path, encoding="utf-8") as f:
with open(MANIFEST_PATH, encoding="utf-8") as f: return json.load(f)
return json.load(f)
def split_at_sentences(text: str, max_words: int, overlap: int) -> list[str]:
def split_at_sentences(text: str, max_words: int, overlap: int) -> list[str]: words = text.split()
"""Split text into chunks at sentence boundaries with overlap.""" if len(words) <= max_words:
words = text.split() return [text]
if len(words) <= max_words:
return [text] chunks = []
start = 0
chunks = [] while start < len(words):
start = 0 end = min(start + max_words, len(words))
while start < len(words): chunk_words = words[start:end]
end = min(start + max_words, len(words)) chunk_text = " ".join(chunk_words)
chunk_words = words[start:end]
chunk_text = " ".join(chunk_words) if end < len(words):
for sep in [". ", "! ", "? ", ".\n", "!\n", "?\n"]:
# Try to break at sentence boundary (look back from end) last_sep = chunk_text.rfind(sep)
if end < len(words): if last_sep > len(chunk_text) // 2:
for sep in [". ", "! ", "? ", ".\n", "!\n", "?\n"]: chunk_text = chunk_text[:last_sep + 1]
last_sep = chunk_text.rfind(sep) end = start + len(chunk_text.split())
if last_sep > len(chunk_text) // 2: # Don't break too early break
chunk_text = chunk_text[:last_sep + 1]
# Recalculate end based on actual words used chunks.append(chunk_text)
end = start + len(chunk_text.split()) start = max(end - overlap, start + 1)
break
return chunks
chunks.append(chunk_text)
start = max(end - overlap, start + 1) # Overlap, but always advance
def generate_prompts(manifest: dict, course: dict, paths: dict):
return chunks paths["summaries_dir"].mkdir(parents=True, exist_ok=True)
for mod in manifest["modules"]:
def generate_prompts(manifest: dict): for lec in mod["lectures"]:
"""Print summary prompts for each transcript to stdout.""" if lec.get("transcribe_status") != "complete":
SUMMARIES_DIR.mkdir(exist_ok=True) continue
for mod in manifest["modules"]: summary_path = Path(lec["summary_path"])
for lec in mod["lectures"]: if summary_path.exists() and summary_path.stat().st_size > 0:
if lec.get("transcribe_status") != "complete": print(f"# SKIP (exists): {lec['title']}", file=sys.stderr)
continue continue
summary_path = Path(lec["summary_path"]) txt_path = Path(lec["transcript_path"])
if summary_path.exists() and summary_path.stat().st_size > 0: if not txt_path.exists():
print(f"# SKIP (exists): {lec['title']}", file=sys.stderr) print(f"# SKIP (no transcript): {lec['title']}", file=sys.stderr)
continue continue
txt_path = Path(lec["transcript_path"]) text = txt_path.read_text(encoding="utf-8").strip()
if not txt_path.exists(): if not text:
print(f"# SKIP (no transcript): {lec['title']}", file=sys.stderr) print(f"# SKIP (empty): {lec['title']}", file=sys.stderr)
continue continue
text = txt_path.read_text(encoding="utf-8").strip() chunks = split_at_sentences(text, MAX_WORDS_PER_CHUNK, OVERLAP_WORDS)
if not text:
print(f"# SKIP (empty): {lec['title']}", file=sys.stderr) print(f"\n{'='*60}", file=sys.stderr)
continue print(f"Lecture: {lec['title']}", file=sys.stderr)
print(f"Words: {len(text.split())}, Chunks: {len(chunks)}", file=sys.stderr)
chunks = split_at_sentences(text, MAX_WORDS_PER_CHUNK, OVERLAP_WORDS) print(f"Output: {summary_path}", file=sys.stderr)
print(f"\n{'='*60}", file=sys.stderr) if len(chunks) == 1:
print(f"Lecture: {lec['title']}", file=sys.stderr) prompt = SUMMARY_PROMPT.format(
print(f"Words: {len(text.split())}, Chunks: {len(chunks)}", file=sys.stderr) course_name=course["name"], title=lec["title"], text=text,
print(f"Output: {summary_path}", file=sys.stderr) )
print(f"SUMMARY_FILE:{summary_path}")
if len(chunks) == 1: print(prompt)
prompt = SUMMARY_PROMPT.format(title=lec["title"], text=text) print("---END_PROMPT---")
print(f"SUMMARY_FILE:{summary_path}") else:
print(prompt) for i, chunk in enumerate(chunks, 1):
print("---END_PROMPT---") prompt = SUMMARY_PROMPT.format(
else: course_name=course["name"],
# Multi-chunk: generate individual chunk prompts title=f"{lec['title']} (partea {i}/{len(chunks)})",
for i, chunk in enumerate(chunks, 1): text=chunk,
prompt = SUMMARY_PROMPT.format( )
title=f"{lec['title']} (partea {i}/{len(chunks)})", print(f"CHUNK_PROMPT:{i}/{len(chunks)}:{summary_path}")
text=chunk, print(prompt)
) print("---END_PROMPT---")
print(f"CHUNK_PROMPT:{i}/{len(chunks)}:{summary_path}")
print(prompt) print(f"MERGE_FILE:{summary_path}")
print("---END_PROMPT---") merge = MERGE_PROMPT.format(
title=lec["title"],
# Then a merge prompt chunks="{chunk_summaries}",
print(f"MERGE_FILE:{summary_path}") )
merge = MERGE_PROMPT.format( print(merge)
title=lec["title"], print("---END_PROMPT---")
chunks="{chunk_summaries}", # Placeholder for merge step
)
print(merge) def compile_master_guide(manifest: dict, course: dict, paths: dict):
print("---END_PROMPT---") lines = [
f"# SUPORT CURS - {course['name']}\n",
"_Generat automat din transcrierile audio ale cursului._\n",
def compile_master_guide(manifest: dict): "---\n",
"""Compile all summaries into SUPORT_CURS.md.""" ]
lines = [
"# SUPORT CURS - NLP Master Practitioner Bucuresti 2025\n", for mod in manifest["modules"]:
"_Generat automat din transcrierile audio ale cursului._\n", lines.append(f"\n## {mod['name']}\n")
"---\n", for lec in mod["lectures"]:
] summary_path = Path(lec["summary_path"])
lines.append(f"\n### {lec['title']}\n")
for mod in manifest["modules"]: if summary_path.exists():
lines.append(f"\n## {mod['name']}\n") content = summary_path.read_text(encoding="utf-8").strip()
lines.append(f"{content}\n")
for lec in mod["lectures"]: else:
summary_path = Path(lec["summary_path"]) lines.append("_Rezumat indisponibil._\n")
lines.append(f"\n### {lec['title']}\n") lines.append("\n---\n")
if summary_path.exists(): paths["master_guide"].parent.mkdir(parents=True, exist_ok=True)
content = summary_path.read_text(encoding="utf-8").strip() # Write LF-only to match the WSL2 baseline (the documented summary workflow
lines.append(f"{content}\n") # runs from WSL2; Windows text-mode CRLF would break byte-identic compares).
else: with open(paths["master_guide"], "w", encoding="utf-8", newline="\n") as f:
lines.append("_Rezumat indisponibil._\n") f.write("\n".join(lines))
print(f"Compiled {paths['master_guide']} ({paths['master_guide'].stat().st_size} bytes)")
lines.append("\n---\n")
MASTER_GUIDE.write_text("\n".join(lines), encoding="utf-8") def parse_args():
print(f"Compiled {MASTER_GUIDE} ({MASTER_GUIDE.stat().st_size} bytes)") p = argparse.ArgumentParser(description="Generate summaries / compile SUPORT_CURS.md")
p.add_argument("--course", default="master", help="Course key (see courses.py)")
p.add_argument("--compile", action="store_true", help="Compile SUPORT_CURS.md from existing summaries")
def main(): return p.parse_args()
if not MANIFEST_PATH.exists():
print("manifest.json not found. Run download.py and transcribe.py first.")
sys.exit(1) def main():
args = parse_args()
manifest = load_manifest() course = get_course(args.course)
paths = course_paths(course)
if "--compile" in sys.argv:
compile_master_guide(manifest) if not paths["manifest"].exists():
else: print(f"{paths['manifest']} not found. Run download.py and transcribe.py first.")
generate_prompts(manifest) sys.exit(1)
manifest = load_manifest(paths["manifest"])
if __name__ == "__main__": validate_manifest_course(manifest, course["key"])
main()
if args.compile:
compile_master_guide(manifest, course, paths)
else:
generate_prompts(manifest, course, paths)
if __name__ == "__main__":
main()

91
tests/test_regression.sh Normal file
View File

@@ -0,0 +1,91 @@
#!/bin/bash
# Regression test for the master course (cursuri.aresens.ro/curs/26) — run
# after the --course refactor to confirm backward compatibility is intact.
#
# Read-only: no downloads, no re-transcription, no user-visible manifest
# change (summarize.py --compile only rewrites SUPORT_CURS.md, which is
# compared byte-for-byte against the baseline captured pre-refactor).
#
# Baseline: /tmp/suport_before.md (captured pre-refactor).
# Run: bash tests/test_regression.sh

set -euo pipefail

ROOT="$(cd "$(dirname "$0")/.." && pwd)"
cd "$ROOT"

# Resolve the project interpreter: Windows venv layout (Scripts/) first,
# then the POSIX/WSL layout (bin/) — the documented baseline workflow runs
# from WSL2 — and only then whatever `python` is on PATH.
PY="$ROOT/.venv/Scripts/python.exe"
[ -x "$PY" ] || PY="$ROOT/.venv/bin/python"
[ -x "$PY" ] || PY=python

if [ ! -f /tmp/suport_before.md ]; then
    echo "FAIL: baseline /tmp/suport_before.md lipsește. Capturează cu:"
    echo " cp SUPORT_CURS.md /tmp/suport_before.md"
    exit 1
fi

echo "=== [1/5] courses.py importabil + curs 'master' rezolvă ==="
"$PY" -c "
from courses import get_course, course_paths, validate_manifest_course
c = get_course('master')
p = course_paths(c)
assert c['key'] == 'master'
assert str(p['manifest']) == 'manifest.json', p['manifest']
assert str(p['master_guide']) == 'SUPORT_CURS.md'
print('OK: master root=. manifest=manifest.json')
"

echo "=== [2/5] manifest.json: schema backward-compat (course_key absent sau 'master') ==="
"$PY" - <<'PY'
import json
from courses import validate_manifest_course
m = json.load(open("manifest.json", encoding="utf-8"))
# Legacy (no course_key) must be accepted as 'master'.
validate_manifest_course(m, "master")
# Opposite direction must raise.
try:
    validate_manifest_course(m, "practitioner")
except SystemExit as e:
    print(f"OK: cross-course validation refuses: {e}")
else:
    raise SystemExit("FAIL: cross-course validation silently allowed")
assert len(m["modules"]) >= 1, "no modules"
print(f"OK: {len(m['modules'])} modules in manifest")
PY

echo "=== [3/5] transcribe.py --course master (idempotent dry-run — citește manifest, nu re-transcrie) ==="
# Direct invocation of transcribe.py is dominated by the on-disk check of
# transcript_path: when every .txt exists, whisper never runs. Here we only
# verify the manifest/disk coherence that the skip path relies on.
"$PY" -c "
import json
m = json.load(open('manifest.json', encoding='utf-8'))
from pathlib import Path
missing = [l['title'] for mod in m['modules'] for l in mod['lectures']
           if l.get('transcribe_status') == 'complete'
           and l.get('type') != 'text'
           and not Path(l['transcript_path']).exists()]
if missing:
    print('FAIL: transcribe_status=complete but .txt missing for:', missing[:3])
    raise SystemExit(1)
print(f'OK: all completed transcripts present on disk')
"

echo "=== [4/5] summarize.py --course master --compile — SUPORT_CURS.md byte-identic cu baseline ==="
"$PY" summarize.py --course master --compile
if ! diff -q SUPORT_CURS.md /tmp/suport_before.md >/dev/null; then
    echo "FAIL: SUPORT_CURS.md diferă de baseline /tmp/suport_before.md"
    diff /tmp/suport_before.md SUPORT_CURS.md | head -30
    exit 1
fi
echo "OK: SUPORT_CURS.md byte-identic cu baseline."

echo "=== [5/5] cross-course isolation — --course practitioner nu atinge state-ul master ==="
# transcribe.py must refuse to operate on master state: either the
# practitioner manifest is absent ("not found") or the master manifest is
# rejected by validate_manifest_course ("belongs to course ...").
OUT="$("$PY" transcribe.py --course practitioner 2>&1 || true)"
if echo "$OUT" | grep -qiE "belongs to course|not found"; then
    echo "OK: transcribe --course practitioner nu a rulat pe manifest master"
    echo "    (mesaj: $(echo "$OUT" | grep -oE '(belongs to course[^"]*|not found[^"]*)' | head -1))"
else
    echo "FAIL: transcribe --course practitioner output neașteptat:"
    echo "$OUT" | head -3
    exit 1
fi

echo ""
echo "REGRESSION OK — backward-compat curs master intactă."

View File

@@ -1,296 +1,279 @@
""" """
Batch transcription using whisper.cpp. Batch transcription using whisper.cpp.
Reads manifest.json, transcribes each audio file in module order, Reads <root>/manifest.json, transcribes each audio file in module order,
outputs .txt and .srt files, updates manifest status. outputs .txt and .srt files, updates manifest status.
Resumable: skips files with existing transcripts. Resumable: skips files with existing transcripts.
Converts MP3 -> WAV (16kHz mono) via ffmpeg before transcription. Converts MP3 -> WAV (16kHz mono) via ffmpeg before transcription.
"""
Text lectures (type=="text") are skipped — their transcript files are
import json written directly by download.py.
import logging """
import os
import shutil import argparse
import subprocess import json
import sys import logging
from pathlib import Path import os
import shutil
MANIFEST_PATH = Path("manifest.json") import subprocess
TRANSCRIPTS_DIR = Path("transcripts") import sys
WAV_CACHE_DIR = Path("audio_wav") from pathlib import Path
# whisper.cpp defaults — override with env vars or CLI args from courses import course_paths, get_course, validate_manifest_course
WHISPER_BIN = os.getenv("WHISPER_BIN", r"whisper-cli.exe")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", r"models\ggml-medium-q5_0.bin") # whisper.cpp defaults — override with env vars or CLI args.
# Shared across courses (same model + binary).
logging.basicConfig( WHISPER_BIN = os.getenv("WHISPER_BIN", r"whisper-cli.exe")
level=logging.INFO, WHISPER_MODEL = os.getenv("WHISPER_MODEL", r"models\ggml-medium-q5_0.bin")
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[ logging.basicConfig(
logging.StreamHandler(), level=logging.INFO,
logging.FileHandler("transcribe_errors.log"), format="%(asctime)s [%(levelname)s] %(message)s",
], handlers=[
) logging.StreamHandler(),
log = logging.getLogger(__name__) logging.FileHandler("transcribe_errors.log"),
],
)
def find_ffmpeg() -> str: log = logging.getLogger(__name__)
"""Find ffmpeg executable."""
if shutil.which("ffmpeg"):
return "ffmpeg" def find_ffmpeg() -> str:
# Check local directories if shutil.which("ffmpeg"):
for p in [Path("ffmpeg.exe"), Path("ffmpeg-bin/ffmpeg.exe")]: return "ffmpeg"
if p.exists(): for p in [Path("ffmpeg.exe"), Path("ffmpeg-bin/ffmpeg.exe")]:
return str(p.resolve()) if p.exists():
# Try imageio-ffmpeg (pip fallback) return str(p.resolve())
try: try:
import imageio_ffmpeg import imageio_ffmpeg
return imageio_ffmpeg.get_ffmpeg_exe() return imageio_ffmpeg.get_ffmpeg_exe()
except ImportError: except ImportError:
pass pass
return "" return ""
def convert_to_wav(audio_path: str) -> str: def convert_to_wav(audio_path: str, wav_cache_dir: Path) -> str:
""" src = Path(audio_path)
Convert audio file to WAV 16kHz mono (optimal for whisper.cpp). if src.suffix.lower() == ".wav":
Returns path to WAV file. Skips if WAV already exists. return audio_path
"""
src = Path(audio_path) wav_cache_dir.mkdir(parents=True, exist_ok=True)
wav_path = wav_cache_dir / (src.stem + ".wav")
# Already a WAV file, skip
if src.suffix.lower() == ".wav": if wav_path.exists() and wav_path.stat().st_size > 0:
return audio_path log.info(f" WAV cache hit: {wav_path}")
return str(wav_path)
WAV_CACHE_DIR.mkdir(exist_ok=True)
wav_path = WAV_CACHE_DIR / (src.stem + ".wav") ffmpeg = find_ffmpeg()
if not ffmpeg:
# Skip if already converted log.warning(" ffmpeg not found, using original file (may cause bad transcription)")
if wav_path.exists() and wav_path.stat().st_size > 0: return audio_path
log.info(f" WAV cache hit: {wav_path}")
return str(wav_path) log.info(f" Converting to WAV: {src.name} -> {wav_path.name}")
cmd = [
ffmpeg = find_ffmpeg() ffmpeg, "-i", audio_path,
if not ffmpeg: "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
log.warning(" ffmpeg not found, using original file (may cause bad transcription)") "-y", str(wav_path),
return audio_path ]
try:
log.info(f" Converting to WAV: {src.name} -> {wav_path.name}") result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
cmd = [ if result.returncode != 0:
ffmpeg, log.error(f" ffmpeg failed: {result.stderr[:300]}")
"-i", audio_path, return audio_path
"-vn", # no video log.info(f" WAV ready: {wav_path.name} ({wav_path.stat().st_size / 1_048_576:.0f} MB)")
"-acodec", "pcm_s16le", # 16-bit PCM return str(wav_path)
"-ar", "16000", # 16kHz sample rate (whisper standard) except FileNotFoundError:
"-ac", "1", # mono log.warning(f" ffmpeg not found at: {ffmpeg}")
"-y", # overwrite return audio_path
str(wav_path), except subprocess.TimeoutExpired:
] log.error(f" ffmpeg conversion timeout for {audio_path}")
return audio_path
try:
result = subprocess.run(
cmd, def load_manifest(manifest_path: Path) -> dict:
capture_output=True, with open(manifest_path, encoding="utf-8") as f:
text=True, return json.load(f)
timeout=300, # 5 min max for conversion
)
if result.returncode != 0: def save_manifest(manifest: dict, manifest_path: Path):
log.error(f" ffmpeg failed: {result.stderr[:300]}") manifest_path.parent.mkdir(parents=True, exist_ok=True)
return audio_path with open(manifest_path, "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2, ensure_ascii=False)
log.info(f" WAV ready: {wav_path.name} ({wav_path.stat().st_size / 1_048_576:.0f} MB)")
return str(wav_path)
def transcribe_file(audio_path: str, output_base: str) -> bool:
except FileNotFoundError: cmd = [
log.warning(f" ffmpeg not found at: {ffmpeg}") WHISPER_BIN,
return audio_path "--model", WHISPER_MODEL,
except subprocess.TimeoutExpired: "--language", "ro",
log.error(f" ffmpeg conversion timeout for {audio_path}") "--no-gpu",
return audio_path "--threads", str(os.cpu_count() or 4),
"--beam-size", "1",
"--best-of", "1",
def load_manifest() -> dict: "--max-context", "0",
with open(MANIFEST_PATH, encoding="utf-8") as f: "--entropy-thold", "2.4",
return json.load(f) "--max-len", "60",
"--suppress-nst",
"--no-fallback",
def save_manifest(manifest: dict): "--output-txt",
with open(MANIFEST_PATH, "w", encoding="utf-8") as f: "--output-srt",
json.dump(manifest, f, indent=2, ensure_ascii=False) "--output-file", output_base,
"--file", audio_path,
]
def transcribe_file(audio_path: str, output_base: str) -> bool:
""" log.info(f" CMD: {' '.join(cmd)}")
Run whisper.cpp on a single file. try:
Returns True on success. env = os.environ.copy()
""" whisper_dir = str(Path(WHISPER_BIN).resolve().parent)
cmd = [ env["PATH"] = whisper_dir + os.pathsep + env.get("PATH", "")
WHISPER_BIN,
"--model", WHISPER_MODEL, result = subprocess.run(
"--language", "ro", cmd,
"--no-gpu", stdout=sys.stdout,
"--threads", str(os.cpu_count() or 4), stderr=sys.stderr,
"--beam-size", "1", timeout=7200,
"--best-of", "1", env=env,
"--max-context", "0", # don't carry context between segments (prevents hallucination loops) )
"--entropy-thold", "2.4", # reject high-entropy (hallucinated) segments if result.returncode != 0:
"--max-len", "60", # shorter segments reduce drift log.error(f" whisper.cpp failed (exit {result.returncode})")
"--suppress-nst", # suppress non-speech tokens (reduces hallucination on silence) return False
"--no-fallback", # don't retry with higher temperature
"--output-txt", txt_path = Path(f"{output_base}.txt")
"--output-srt", srt_path = Path(f"{output_base}.srt")
"--output-file", output_base, if not txt_path.exists() or txt_path.stat().st_size == 0:
"--file", audio_path, log.error(f" Empty or missing transcript: {txt_path}")
] return False
log.info(f" CMD: {' '.join(cmd)}") log.info(f" Output: {txt_path.name} ({txt_path.stat().st_size} bytes)")
if srt_path.exists():
try: log.info(f" Output: {srt_path.name} ({srt_path.stat().st_size} bytes)")
# Add whisper.exe's directory to PATH so Windows finds its DLLs return True
env = os.environ.copy()
whisper_dir = str(Path(WHISPER_BIN).resolve().parent) except subprocess.TimeoutExpired:
env["PATH"] = whisper_dir + os.pathsep + env.get("PATH", "") log.error(f" Timeout (>2h) for {audio_path}")
return False
result = subprocess.run( except FileNotFoundError:
cmd, log.error(f" whisper.cpp not found at: {WHISPER_BIN}")
stdout=sys.stdout, log.error(f" Set WHISPER_BIN env var or put whisper-cli.exe in PATH")
stderr=sys.stderr, return False
timeout=7200, # 2 hour timeout per file except Exception as e:
env=env, log.error(f" Error: {e}")
) return False
if result.returncode != 0:
log.error(f" whisper.cpp failed (exit {result.returncode})") def parse_module_filter(arg: str) -> set[int]:
return False result = set()
for part in arg.split(","):
# Verify output exists and is non-empty part = part.strip()
txt_path = Path(f"{output_base}.txt") if "-" in part:
srt_path = Path(f"{output_base}.srt") a, b = part.split("-", 1)
result.update(range(int(a), int(b) + 1))
if not txt_path.exists() or txt_path.stat().st_size == 0: else:
log.error(f" Empty or missing transcript: {txt_path}") result.add(int(part))
return False return result
log.info(f" Output: {txt_path.name} ({txt_path.stat().st_size} bytes)")
if srt_path.exists(): def parse_args():
log.info(f" Output: {srt_path.name} ({srt_path.stat().st_size} bytes)") p = argparse.ArgumentParser(description="Transcribe lecture audio for a course")
p.add_argument("--course", default="master", help="Course key (see courses.py)")
return True p.add_argument("--modules", default=None, help="Module filter, e.g. '1-3' or '2,4,5'")
return p.parse_args()
except subprocess.TimeoutExpired:
log.error(f" Timeout (>2h) for {audio_path}")
return False def main():
except FileNotFoundError: args = parse_args()
log.error(f" whisper.cpp not found at: {WHISPER_BIN}") course = get_course(args.course)
log.error(f" Set WHISPER_BIN env var or put whisper-cli.exe in PATH") paths = course_paths(course)
return False
except Exception as e: if not paths["manifest"].exists():
log.error(f" Error: {e}") log.error(f"{paths['manifest']} not found. Run download.py --course {course['key']} first.")
return False sys.exit(1)
module_filter = parse_module_filter(args.modules) if args.modules else None
def parse_module_filter(arg: str) -> set[int]: if module_filter:
"""Parse module filter like '1-3' or '4,5' or '1-3,5' into a set of 1-based indices.""" log.info(f"Module filter: {sorted(module_filter)}")
result = set()
for part in arg.split(","): manifest = load_manifest(paths["manifest"])
part = part.strip() validate_manifest_course(manifest, course["key"])
if "-" in part: paths["transcripts_dir"].mkdir(parents=True, exist_ok=True)
a, b = part.split("-", 1)
result.update(range(int(a), int(b) + 1)) total = 0
else: transcribed = 0
result.add(int(part)) skipped = 0
return result failed = 0
for mod_idx, mod in enumerate(manifest["modules"], 1):
def main(): if module_filter and mod_idx not in module_filter:
if not MANIFEST_PATH.exists(): log.info(f"\nSkipping module {mod_idx}: {mod['name']}")
log.error("manifest.json not found. Run download.py first.") continue
sys.exit(1) log.info(f"\n{'='*60}")
log.info(f"Module: {mod['name']}")
# Parse --modules filter log.info(f"{'='*60}")
module_filter = None
if "--modules" in sys.argv: for lec in mod["lectures"]:
idx = sys.argv.index("--modules") total += 1
if idx + 1 < len(sys.argv):
module_filter = parse_module_filter(sys.argv[idx + 1]) # Text lectures bypass whisper — transcript written by download.py.
log.info(f"Module filter: {sorted(module_filter)}") if lec.get("type") == "text":
lec["transcribe_status"] = "complete"
manifest = load_manifest() skipped += 1
TRANSCRIPTS_DIR.mkdir(exist_ok=True) log.info(f" Skipping text: {lec['title']}")
continue
total = 0
transcribed = 0 if lec.get("download_status") != "complete":
skipped = 0 log.warning(f" Skipping (not downloaded): {lec['title']}")
failed = 0 continue
for mod_idx, mod in enumerate(manifest["modules"], 1): audio_path = lec["audio_path"]
if module_filter and mod_idx not in module_filter: # Reuse the stem already recorded in the manifest for backward-compat
log.info(f"\nSkipping module {mod_idx}: {mod['name']}") # with M1-M6 paths (strips ' [Audio]' for aresens filenames).
continue stem = Path(lec["original_filename"]).stem.replace(" [Audio]", "")
log.info(f"\n{'='*60}") output_base = str(paths["transcripts_dir"] / stem)
log.info(f"Module: {mod['name']}")
log.info(f"{'='*60}") txt_path = Path(f"{output_base}.txt")
if txt_path.exists() and txt_path.stat().st_size > 0:
for lec in mod["lectures"]: lec["transcribe_status"] = "complete"
total += 1 skipped += 1
log.info(f" Skipping (exists): {stem}.txt")
if lec.get("download_status") != "complete": continue
log.warning(f" Skipping (not downloaded): {lec['title']}")
continue log.info(f" Transcribing: {lec['title']}")
log.info(f" File: {audio_path}")
audio_path = lec["audio_path"]
stem = Path(lec["original_filename"]).stem.replace(" [Audio]", "") wav_path = convert_to_wav(audio_path, paths["wav_cache_dir"])
output_base = str(TRANSCRIPTS_DIR / stem)
if transcribe_file(wav_path, output_base):
# Check if already transcribed lec["transcribe_status"] = "complete"
txt_path = Path(f"{output_base}.txt") transcribed += 1
if txt_path.exists() and txt_path.stat().st_size > 0: else:
lec["transcribe_status"] = "complete" lec["transcribe_status"] = "failed"
skipped += 1 failed += 1
log.info(f" Skipping (exists): {stem}.txt")
continue save_manifest(manifest, paths["manifest"])
log.info(f" Transcribing: {lec['title']}") if mod == manifest["modules"][0] and transcribed > 0:
log.info(f" File: {audio_path}") log.info(f"First module complete ({transcribed} files). Continuing automatically...")
# Convert MP3 -> WAV 16kHz mono for reliable whisper.cpp input empty_outputs = [
wav_path = convert_to_wav(audio_path) lec["title"]
for mod in manifest["modules"]
if transcribe_file(wav_path, output_base): for lec in mod["lectures"]
lec["transcribe_status"] = "complete" if lec.get("transcribe_status") == "complete"
transcribed += 1 and lec.get("type") != "text"
else: and not Path(lec.get("transcript_path", "")).exists()
lec["transcribe_status"] = "failed" ]
failed += 1
log.info("\n" + "=" * 60)
# Save manifest after each file (checkpoint) log.info(f"Transcribed {transcribed}/{total} files, {skipped} skipped, {failed} failures.")
save_manifest(manifest) log.info(f"No empty outputs: {'PASS' if not empty_outputs else 'FAIL'}")
if empty_outputs:
# Log milestone after first module (no longer pauses) for t in empty_outputs:
if mod == manifest["modules"][0] and transcribed > 0: log.error(f" Missing transcript: {t}")
log.info(f"First module complete ({transcribed} files). Continuing automatically...") log.info("=" * 60)
# Validation save_manifest(manifest, paths["manifest"])
empty_outputs = [
lec["title"] if failed:
for mod in manifest["modules"] sys.exit(1)
for lec in mod["lectures"]
if lec.get("transcribe_status") == "complete"
and not Path(lec["transcript_path"]).exists() if __name__ == "__main__":
] main()
log.info("\n" + "=" * 60)
log.info(f"Transcribed {transcribed}/{total} files, {skipped} skipped, {failed} failures.")
log.info(f"No empty outputs: {'PASS' if not empty_outputs else 'FAIL'}")
if empty_outputs:
for t in empty_outputs:
log.error(f" Missing transcript: {t}")
log.info("=" * 60)
save_manifest(manifest)
if failed:
sys.exit(1)
if __name__ == "__main__":
main()