Files
nlp-master/download.py
Marius Mutu 6ee53133b7 feat(practitioner): structură per-modul + PDF-uri sursă + split 2-PC
- audio/Modul {N}/filename.mp3 — fiecare modul în subdirector separat
  pentru copiere pe telefon și transfer între PC-uri.
- PDF-urile se păstrează ca sursă în summaries/pdf/ (fără extract txt).
- transcribe_status="pdf_source_only" pentru lecțiile PDF → summarize.py
  le filtrează automat.
- Fix coliziune manifest transcript_path (stem-based, nu preserve prior).
- .bat per modul (M2-M8) + dispatchers run_pc1_all (M2-M5) + run_pc2_all
  (M6-M8) pentru partajare work pe 2 PC-uri.
- prepare_pc2_bundle.py: zip cu scripts + manifest + .env + PDFs pentru
  PC2 (self-installs whisper.cpp/model/ffmpeg la primul run).
- M1 whisper complete (49/49 audio+vimeo transcrise).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 08:48:58 +03:00

553 lines
21 KiB
Python

"""
Download all lecture media from a configured course (see courses.py).
Logs in, discovers modules + lectures, downloads whichever media each
lecture exposes, writes <root>/manifest.json. Resumable: skips already-
downloaded files.
Lecture types:
- "audio": <audio source> MP3 on the course CDN -> requests stream download
- "vimeo": <iframe src="...player.vimeo.com/video/ID"> -> yt-dlp
  (audio-only HLS track -> MP3 96kbps, no video bytes fetched)
- "pdf": no audio/vimeo, but a link ending in .pdf -> the PDF is downloaded
  and kept as a source file (no transcript, no whisper)
- "text": no audio, video or PDF -> capture the lecture HTML body as
  a plain-text transcript directly (skips whisper entirely)
"""
import argparse
import json
import logging
import os
import re
import sys
import time
from pathlib import Path
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from courses import course_paths, get_course, validate_manifest_course
# Retry policy used by the download helpers: at most MAX_RETRIES attempts,
# sleeping RETRY_BACKOFF[i] seconds after failed attempt number i.
MAX_RETRIES = 3
RETRY_BACKOFF = [5, 15, 30]

# Root logging setup: INFO and above, sent both to the console and to
# download_errors.log (path is relative to the current working directory).
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(), logging.FileHandler("download_errors.log")],
)
log = logging.getLogger(__name__)
def login(session: requests.Session, course: dict, email: str, password: str) -> bool:
    """Authenticate *session* against the course site.

    Posts the login form to ``course["login_url"]`` and follows redirects.

    Args:
        session: Session that will carry the login cookies afterwards.
        course: Course config; only ``login_url`` is read here.
        email: Account e-mail sent as the ``email`` form field.
        password: Account password sent as the ``password`` form field.

    Returns:
        False when the final URL still contains ``/login`` or the response
        body contains ``loginform`` (i.e. we were bounced back to the form),
        True otherwise.

    Raises:
        requests.RequestException: on network errors, including a timeout.
    """
    form = {
        "email": email,
        "password": password,
        "act": "login",
        "remember": "on",
    }
    # Bounded wait: without a timeout a stalled server would hang the whole run.
    resp = session.post(course["login_url"], data=form, allow_redirects=True, timeout=60)
    return "/login" not in resp.url and "loginform" not in resp.text
def parse_module_filter(arg: str) -> set[int]:
    """Parse a ``--modules`` value into a set of 1-based module numbers.

    Accepts comma-separated integers and inclusive ranges, e.g. ``"1-3"`` or
    ``"2,4,5"``. Whitespace around items is ignored, empty items (such as a
    trailing comma) are skipped, and a reversed range like ``"5-3"`` is
    treated the same as ``"3-5"``.

    Raises:
        ValueError: if an item is not an integer or an integer range.
    """
    selected: set[int] = set()
    for part in arg.split(","):
        part = part.strip()
        if not part:
            # Tolerate "1,2," and "1,,2" instead of failing on int("").
            continue
        if "-" in part:
            first, last = (int(bound) for bound in part.split("-", 1))
            low, high = sorted((first, last))
            selected.update(range(low, high + 1))
        else:
            selected.add(int(part))
    return selected
def discover_modules(session: requests.Session, course: dict) -> list[dict]:
    """Fetch the course page and return its modules in page order.

    Each ``div.module`` that has both a ``div.module__number`` and an
    ``a.btn`` link becomes one dict with:

    - ``name``: text of the number element,
    - ``url``: the link resolved against ``course["base_url"]``,
    - ``module_id``: last path segment of the link's href.

    Exits the process with status 1 when no module is found (the selectors
    no longer match, or the session is not logged in).

    Raises:
        requests.RequestException: on network/HTTP errors, including a timeout.
    """
    # Bounded wait so a stalled server cannot hang discovery forever.
    resp = session.get(course["course_url"], timeout=60)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    modules = []
    for div in soup.select("div.module"):
        number_el = div.select_one("div.module__number")
        link_el = div.select_one("a.btn")
        if not number_el or not link_el:
            continue
        href = link_el.get("href", "")
        modules.append({
            "name": number_el.get_text(strip=True),
            "url": urljoin(course["base_url"], href),
            "module_id": href.rstrip("/").split("/")[-1],
        })

    log.info(f"Found {len(modules)} modules")
    if not modules:
        log.error("No modules found on course page — selectors mismatch or not logged in")
        sys.exit(1)
    return modules
# Captures the numeric video id from a Vimeo player URL
# ("player.vimeo.com/video/<digits>"), case-insensitively.
VIMEO_ID_RE = re.compile(r"player\.vimeo\.com/video/(\d+)", re.I)
def slugify(text: str) -> str:
    """Turn a lecture title into a filesystem-safe slug.

    Used for text lectures, which have no URL to derive a filename from.
    The result is lower-case, at most 80 characters long, and falls back to
    ``"untitled"`` when nothing usable is left.
    """
    lowered = text.strip().lower()
    # Keep only word characters, whitespace and hyphens.
    cleaned = re.sub(r"[^\w\s-]", "", lowered, flags=re.UNICODE)
    # Collapse every run of whitespace / underscores / hyphens to one "_".
    slug = re.sub(r"[\s_-]+", "_", cleaned)[:80]
    if slug:
        return slug
    return "untitled"
def derived_stem(filename: str) -> str:
    """Return the stem used for transcript / srt / summary paths.

    This is the filename without its final extension, with every
    ``" [Audio]"`` marker removed so that derived paths stay short and
    match the names already used for earlier modules.
    """
    stem = Path(filename).stem
    audio_marker = " [Audio]"
    return stem.replace(audio_marker, "")
def classify_lesson(lesson_div, base_url: str) -> tuple[str, str, str]:
    """Classify one lesson element by the media it exposes.

    Checks, in order: an ``<audio><source src>``, an ``<iframe>`` pointing at
    the Vimeo player, then any link whose target is a ``.pdf``.

    Args:
        lesson_div: Parsed lesson element; must offer ``select_one`` and
            ``select`` (BeautifulSoup-style).
        base_url: Base used to resolve relative media URLs.

    Returns:
        ``(lecture_type, media_url_or_empty, filename_stem)``, one of:

        - ``("audio", mp3_url, stem_from_url)``
        - ``("vimeo", player_url, "vimeo_<id>")``
        - ``("pdf", pdf_url, stem_from_url)`` -- only attachment is a PDF
        - ``("text", "", "")`` -- no media found; the caller fills in the
          stem from the title slug

        Stems are taken from the URL's last path segment with any
        ``?query`` / ``#fragment`` removed.
    """
    audio_el = lesson_div.select_one("audio source")
    if audio_el:
        src = (audio_el.get("src") or "").strip()
        if src:
            stem = _url_basename(src).rsplit(".", 1)[0]
            return "audio", urljoin(base_url, src), stem

    iframe_el = lesson_div.select_one("iframe")
    if iframe_el:
        src = (iframe_el.get("src") or iframe_el.get("data-src") or "").strip()
        m = VIMEO_ID_RE.search(src)
        if m:
            vimeo_id = m.group(1)
            # Canonical player URL works with yt-dlp + referer.
            return "vimeo", f"https://player.vimeo.com/video/{vimeo_id}", f"vimeo_{vimeo_id}"

    # PDF-only lecture: look for an attachment link whose target is a .pdf.
    for a in lesson_div.select("a[href]"):
        href = (a.get("href") or "").strip()
        # Prefer the query-free basename so "doc.pdf?dl=1" is recognised and
        # yields the stem "doc"; fall back to the raw last segment so hrefs
        # that merely end in ".pdf" keep matching as before.
        for candidate in (_url_basename(href), href.rsplit("/", 1)[-1]):
            if candidate.lower().endswith(".pdf"):
                return "pdf", urljoin(base_url, href), candidate.rsplit(".", 1)[0]

    return "text", "", ""  # stem filled in by caller using title slug


def _url_basename(url: str) -> str:
    """Return the last path segment of *url* without ``?query`` or ``#fragment``."""
    path = url.split("#", 1)[0].split("?", 1)[0]
    return path.rsplit("/", 1)[-1]
def discover_lectures(session: requests.Session, module: dict, course: dict, mod_idx: int) -> list[dict]:
    """Fetch one module page and describe every lecture found on it.

    Args:
        session: Logged-in session used to fetch ``module["url"]``.
        module: Module dict; ``url`` and ``name`` are read.
        course: Course config passed to ``course_paths``; ``base_url`` is
            used to resolve media URLs.
        mod_idx: 1-based module number; audio for this module is placed
            under ``<audio_dir>/Modul <mod_idx>/``.

    Returns:
        One dict per ``div.lesson`` that has a non-empty title, with keys
        ``type`` ("audio" | "vimeo" | "pdf" | "text"), ``title``,
        ``original_filename``, ``url`` and ``audio_path``. Text lectures
        additionally carry ``text_content`` (the lesson body as plain text)
        and an empty ``audio_path``; for PDF lectures ``audio_path`` holds
        the PDF destination inside ``pdf_dir``.

    Raises:
        requests.RequestException: on network/HTTP errors, including a timeout.
    """
    # Bounded wait so a stalled server cannot hang discovery forever.
    resp = session.get(module["url"], timeout=60)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    paths = course_paths(course)
    audio_mod_dir = paths["audio_dir"] / f"Modul {mod_idx}"
    pdf_dir = paths["pdf_dir"]

    lectures = []
    for lesson_div in soup.select("div.lesson"):
        name_el = lesson_div.select_one("div.module__name")
        if not name_el:
            continue
        title = name_el.get_text(strip=True)
        if not title:
            continue

        ltype, media_url, stem = classify_lesson(lesson_div, course["base_url"])
        if ltype == "text":
            stem = slugify(title)
            # Capture the lesson body now (source for text -> transcript)
            # so it does not have to be requested again later.
            body_el = lesson_div.select_one("div.module__content") or lesson_div
            lecture = {
                "type": "text",
                "title": title,
                "original_filename": stem + ".txt",
                "url": module["url"],  # lesson is inline in module page
                "audio_path": "",  # no audio
                "text_content": body_el.get_text("\n", strip=True),
            }
        elif ltype == "pdf":
            # PDF source is kept as-is (not extracted, not deleted) and lives
            # flat in the PDF directory.
            lecture = {
                "type": "pdf",
                "title": title,
                "original_filename": f"{stem}.pdf",
                "url": media_url,
                "audio_path": str(pdf_dir / f"{stem}.pdf"),
            }
        elif ltype == "vimeo":
            lecture = {
                "type": "vimeo",
                "title": title,
                "original_filename": f"{stem}.mp3",
                "url": media_url,
                "audio_path": str(audio_mod_dir / f"{stem}.mp3"),
            }
        else:  # "audio"
            # Drop any ?query / #fragment before taking the last path segment:
            # those characters are not part of the file name and "?" is not
            # even a legal filename character on Windows.
            url_path = media_url.split("#", 1)[0].split("?", 1)[0]
            filename = url_path.rsplit("/", 1)[-1]
            lecture = {
                "type": "audio",
                "title": title,
                "original_filename": filename,
                "url": media_url,
                "audio_path": str(audio_mod_dir / filename),
            }
        lectures.append(lecture)

    # Single pass over the lectures instead of one pass per type.
    counts = dict.fromkeys(("audio", "vimeo", "pdf", "text"), 0)
    for lecture in lectures:
        counts[lecture["type"]] += 1
    log.info(
        f" {module['name']}: {len(lectures)} lectures "
        f"(audio={counts['audio']}, vimeo={counts['vimeo']}, "
        f"pdf={counts['pdf']}, text={counts['text']})"
    )
    return lectures
def download_audio_http(session: requests.Session, url: str, dest: Path) -> bool:
    """Stream *url* to *dest* over HTTP, retrying on errors.

    The body is written to ``<dest>.tmp`` and moved onto *dest* only after a
    complete download, so *dest* never holds a partial file.

    Args:
        session: Session used for the request (carries the login cookies).
        url: Direct media URL.
        dest: Final file path; its parent directory is created if missing.

    Returns:
        True on success. False when the body is smaller than 1,000,000 bytes
        (treated as "not a real audio file", not retried) or when all
        MAX_RETRIES attempts raised.
    """
    # dest may sit in a per-module subdirectory that nothing else creates;
    # without this every attempt would fail on open().
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".tmp")

    for attempt in range(MAX_RETRIES):
        try:
            total = 0
            # Context manager releases the streamed connection even on error.
            with session.get(url, stream=True, timeout=300) as resp:
                resp.raise_for_status()
                with open(tmp, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
                        total += len(chunk)
            if total < 1_000_000:
                log.warning(f"File too small ({total} bytes): {dest.name}")
                tmp.unlink(missing_ok=True)
                return False
            # replace() overwrites an existing dest on every platform;
            # rename() raises on Windows when the target exists.
            tmp.replace(dest)
            log.info(f" Downloaded: {dest.name} ({total / 1_000_000:.1f} MB)")
            return True
        except Exception as e:
            # Best-effort cleanup of the partial file; a cleanup failure must
            # not mask the download error or abort the retry loop.
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass
            wait = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30
            log.warning(f" Attempt {attempt + 1}/{MAX_RETRIES} failed for {dest.name}: {e}")
            if attempt < MAX_RETRIES - 1:
                log.info(f" Retrying in {wait}s...")
                time.sleep(wait)

    log.error(f" FAILED after {MAX_RETRIES} attempts: {dest.name}")
    return False
def download_vimeo_audio(vimeo_url: str, referer: str, dest: Path) -> bool:
    """Fetch the audio of a Vimeo video with yt-dlp and store it as MP3.

    Requests the ``bestaudio`` format and converts it to MP3 at quality
    "96" through the FFmpegExtractAudio postprocessor. *referer* is sent
    as the HTTP ``Referer`` header.

    Returns:
        True once *dest* exists and is larger than 100,000 bytes; False when
        yt-dlp is not installed or every one of MAX_RETRIES attempts failed.
    """
    try:
        import yt_dlp
    except ImportError:
        log.error("yt-dlp not installed. Run: pip install yt-dlp")
        return False

    dest.parent.mkdir(parents=True, exist_ok=True)

    extract_mp3 = {
        "key": "FFmpegExtractAudio",
        "preferredcodec": "mp3",
        "preferredquality": "96",
    }
    options = {
        "format": "bestaudio",
        # The postprocessor adds the .mp3 extension itself, so the output
        # template is built from the path without its suffix.
        "outtmpl": f"{dest.with_suffix('')}.%(ext)s",
        "http_headers": {"Referer": referer},
        "quiet": True,
        "no_warnings": True,
        "postprocessors": [extract_mp3],
    }

    last_attempt = MAX_RETRIES - 1
    for attempt in range(MAX_RETRIES):
        try:
            with yt_dlp.YoutubeDL(options) as ydl:
                ydl.download([vimeo_url])
            if not dest.exists() or dest.stat().st_size <= 100_000:
                # No exception but no usable file either: try again at once.
                log.warning(f" yt-dlp produced no file or too small: {dest}")
                continue
            log.info(f" Downloaded (vimeo): {dest.name} ({dest.stat().st_size / 1_000_000:.1f} MB)")
            return True
        except Exception as e:
            log.warning(f" Vimeo attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
            if attempt < last_attempt:
                delay = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30
                log.info(f" Retrying in {delay}s...")
                time.sleep(delay)

    log.error(f" FAILED vimeo download after {MAX_RETRIES} attempts: {vimeo_url}")
    return False
def capture_text_lecture(lecture: dict, transcripts_dir: Path) -> bool:
    """Save a text lecture's captured body as its transcript file.

    The transcript is written to ``<transcripts_dir>/<stem>.txt`` (stem taken
    from ``lecture["original_filename"]``) with a short header in front.
    Text lectures never go through whisper, so this file is final.

    Returns:
        False (nothing written) when the stripped text is shorter than 50
        characters, True after the file has been written.
    """
    transcripts_dir.mkdir(parents=True, exist_ok=True)
    out_name = Path(lecture["original_filename"]).stem + ".txt"
    out_path = transcripts_dir / out_name

    body = lecture.get("text_content", "").strip()
    if len(body) < 50:
        log.warning(f" text lesson '{lecture['title']}' has <50 chars, skipping")
        return False

    preamble = f"[TEXT LECTURE — no audio]\nTitlu: {lecture['title']}\n\n"
    out_path.write_text(preamble + body, encoding="utf-8")
    log.info(f" Captured (text): {out_path.name} ({out_path.stat().st_size} bytes)")
    return True
def download_pdf(session: requests.Session, lecture: dict, pdf_path: Path) -> bool:
    """Download a lecture's PDF through the authenticated session.

    The file is saved as-is (no text extraction). It is streamed to
    ``<pdf_path>.tmp`` and moved onto *pdf_path* only after a complete
    download.

    Args:
        session: Logged-in session used for the request.
        lecture: Lecture dict; only ``url`` is read.
        pdf_path: Final file path; its parent directory is created if missing.

    Returns:
        True on success. False when the body is smaller than 1000 bytes
        (not retried) or when all MAX_RETRIES attempts raised.
    """
    pdf_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = pdf_path.with_name(pdf_path.name + ".tmp")

    for attempt in range(MAX_RETRIES):
        try:
            total = 0
            # Context manager releases the streamed connection even on error.
            with session.get(lecture["url"], stream=True, timeout=120) as resp:
                resp.raise_for_status()
                with open(tmp, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=256 * 1024):
                        f.write(chunk)
                        total += len(chunk)
            if total < 1000:
                log.warning(f" PDF too small ({total} bytes): {pdf_path.name}")
                tmp.unlink(missing_ok=True)
                return False
            # replace() overwrites an existing target on every platform;
            # rename() raises on Windows when the target exists.
            tmp.replace(pdf_path)
            log.info(f" Downloaded (pdf): {pdf_path.name} ({total / 1024:.0f} KB)")
            return True
        except Exception as e:
            # Best-effort cleanup of the partial file; a cleanup failure must
            # not mask the download error or abort the retry loop.
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass
            wait = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30
            log.warning(f" PDF attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(wait)

    log.error(f" FAILED PDF download: {lecture['url']}")
    return False
def load_manifest(manifest_path: Path) -> dict | None:
if manifest_path.exists():
with open(manifest_path, encoding="utf-8") as f:
return json.load(f)
return None
def save_manifest(manifest: dict, manifest_path: Path):
    """Write *manifest* to *manifest_path* as UTF-8 JSON.

    Creates the parent directory when needed. Output is indented by two
    spaces and non-ASCII characters are written unescaped.
    """
    target_dir = manifest_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with manifest_path.open("w", encoding="utf-8") as out:
        json.dump(manifest, out, ensure_ascii=False, indent=2)
def parse_args():
    """Parse the command line.

    Options:
        --course   course key, defaults to "master"
        --modules  optional module filter string, defaults to None
    """
    parser = argparse.ArgumentParser(description="Download lecture media for a course")
    options = (
        ("--course", "master", "Course key (see courses.py)"),
        ("--modules", None, "Module filter, e.g. '1-3' or '2,4,5'"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser.parse_args()
def main():
    """Run one download pass for a course and update its manifest.

    Steps: parse the CLI, read credentials from ``.env``, log in, discover
    the modules, then for every selected module discover its lectures,
    fetch each one (or detect it on disk) and record the result. The
    manifest is saved after every processed module, so an interrupted run
    keeps its progress, and once more at the end.

    Modules outside the ``--modules`` filter keep their existing manifest
    entries untouched. ``transcribe_status`` of a lecture is carried over
    from the previous manifest when one exists.

    Exits with status 1 when credentials are missing, login fails, or at
    least one lecture failed to download.
    """
    args = parse_args()
    course = get_course(args.course)
    paths = course_paths(course)

    load_dotenv()
    email = os.getenv(course["env_user"], "")
    password = os.getenv(course["env_pass"], "")
    if not email or not password:
        log.error(f"Set {course['env_user']} and {course['env_pass']} in .env")
        sys.exit(1)

    # None means "no filter". An explicit filter that selects nothing must
    # not silently turn into "download everything", hence the identity test.
    module_filter = parse_module_filter(args.modules) if args.modules else None
    if module_filter is not None:
        log.info(f"Module filter: {sorted(module_filter)}")
        if not module_filter:
            log.warning("Module filter selects no modules; nothing will be downloaded")

    paths["audio_dir"].mkdir(parents=True, exist_ok=True)
    paths["transcripts_dir"].mkdir(parents=True, exist_ok=True)

    # Validate existing manifest belongs to this course
    existing = load_manifest(paths["manifest"])
    if existing is not None:
        validate_manifest_course(existing, course["key"])

    session = requests.Session()
    session.headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})

    log.info(f"Course: {course['key']} ({course['name']})")
    log.info(f"Root: {paths['root']}")
    log.info("Logging in...")
    if not login(session, course, email, password):
        log.error("Login failed. Check credentials in .env")
        sys.exit(1)
    log.info("Login successful")

    modules = discover_modules(session, course)

    # Start from the existing manifest if present: this preserves modules
    # outside the current --modules filter and per-lecture state (e.g.
    # transcribe_status) for modules inside it.
    manifest = dict(existing) if existing else {}
    manifest["course_key"] = course["key"]
    manifest["course"] = course["name"]
    manifest["source_url"] = course["course_url"]
    manifest.setdefault("modules", [])

    # Index of existing modules by name for in-place replacement.
    existing_by_name = {m["name"]: i for i, m in enumerate(manifest["modules"])}

    # Prior lecture state. Titles can repeat across modules, so look up by
    # (module name, title) first and only then by title alone.
    prior_by_module: dict[tuple[str, str], dict] = {}
    prior_by_title: dict[str, dict] = {}
    for m in manifest["modules"]:
        for prev in m.get("lectures", []):
            prior_by_module[(m["name"], prev["title"])] = prev
            prior_by_title[prev["title"]] = prev

    total = 0
    outcomes = {"downloaded": 0, "skipped": 0, "failed": 0}

    for mod_idx, mod in enumerate(modules, 1):
        if module_filter is not None and mod_idx not in module_filter:
            log.info(f" Skipping module {mod_idx}: {mod['name']} (outside --modules filter, preserved in manifest)")
            continue

        lectures = discover_lectures(session, mod, course, mod_idx)
        module_entry = {
            "name": mod["name"],
            "module_id": mod["module_id"],
            "lectures": [],
        }
        for lec in lectures:
            total += 1
            prior = (
                prior_by_module.get((mod["name"], lec["title"]))
                or prior_by_title.get(lec["title"], {})
            )
            entry, outcome = _process_lecture(session, lec, prior, course, paths)
            outcomes[outcome] += 1
            module_entry["lectures"].append(entry)

        # Replace or append module in manifest (keeps the position of an
        # existing module, appends a new one at the end).
        if mod["name"] in existing_by_name:
            manifest["modules"][existing_by_name[mod["name"]]] = module_entry
        else:
            existing_by_name[mod["name"]] = len(manifest["modules"])
            manifest["modules"].append(module_entry)

        # Checkpoint after every module so an interrupted run keeps progress.
        save_manifest(manifest, paths["manifest"])

    # Final write, also covering the case where every module was filtered out.
    save_manifest(manifest, paths["manifest"])

    log.info("=" * 60)
    log.info(
        f"Downloaded {outcomes['downloaded']}/{total} items, "
        f"{outcomes['skipped']} skipped, {outcomes['failed']} failures."
    )
    log.info("=" * 60)
    if outcomes["failed"]:
        sys.exit(1)


def _process_lecture(session, lec: dict, prior: dict, course: dict, paths: dict) -> tuple[dict, str]:
    """Fetch one lecture (or detect it on disk) and build its manifest entry.

    Returns ``(entry, outcome)`` where outcome is ``"downloaded"``,
    ``"skipped"`` (already on disk) or ``"failed"``.
    """
    stem = derived_stem(lec["original_filename"])
    transcripts = paths["transcripts_dir"].as_posix()
    entry = {
        "type": lec["type"],
        "title": lec["title"],
        "original_filename": lec["original_filename"],
        "url": lec["url"],
        "audio_path": lec["audio_path"],
        "transcript_path": f"{transcripts}/{stem}.txt",
        "srt_path": f"{transcripts}/{stem}.srt",
        "summary_path": f"{paths['summaries_dir'].as_posix()}/{stem}_summary.md",
        "download_status": "pending",
        # Carried over from the prior run when available.
        "transcribe_status": prior.get("transcribe_status", "pending"),
        "file_size_bytes": 0,
    }
    kind = lec["type"]

    if kind == "text":
        # Captured directly; treated as already-transcribed.
        txt_path = Path(entry["transcript_path"])
        already_there = txt_path.exists() and txt_path.stat().st_size > 50
        if already_there:
            log.info(f" Skipping text (exists): {txt_path.name}")
        elif not capture_text_lecture(lec, paths["transcripts_dir"]):
            entry["download_status"] = "failed"
            return entry, "failed"
        entry["download_status"] = "complete"
        entry["transcribe_status"] = "complete"
        # Recorded on both paths so a re-run does not reset the size to 0.
        entry["file_size_bytes"] = txt_path.stat().st_size
        return entry, "skipped" if already_there else "downloaded"

    if kind == "pdf":
        # PDF source is downloaded and kept as-is: no transcript, no whisper.
        pdf_path = Path(lec["audio_path"])  # for PDFs this points into pdf_dir
        already_there = pdf_path.exists() and pdf_path.stat().st_size > 1000
        if already_there:
            log.info(f" Skipping pdf (source exists): {pdf_path.name}")
        elif not download_pdf(session, lec, pdf_path):
            entry["download_status"] = "failed"
            return entry, "failed"
        entry["download_status"] = "complete"
        entry["transcribe_status"] = "pdf_source_only"
        entry["file_size_bytes"] = pdf_path.stat().st_size
        return entry, "skipped" if already_there else "downloaded"

    # "audio" or "vimeo"
    dest = Path(lec["audio_path"])
    # Use the same size threshold the matching downloader accepts; otherwise
    # short vimeo files (100 KB - 1 MB) would be re-downloaded on every run.
    min_size = 100_000 if kind == "vimeo" else 1_000_000
    already_there = dest.exists() and dest.stat().st_size > min_size
    if already_there:
        log.info(f" Skipping (exists): {dest.name}")
    else:
        if kind == "audio":
            ok = download_audio_http(session, lec["url"], dest)
        else:  # "vimeo"
            ok = download_vimeo_audio(lec["url"], course["base_url"] + "/", dest)
        if not ok:
            entry["download_status"] = "failed"
            return entry, "failed"
    entry["download_status"] = "complete"
    entry["file_size_bytes"] = dest.stat().st_size
    return entry, "skipped" if already_there else "downloaded"
# Run the downloader only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()