"""
Download all audio files from cursuri.aresens.ro NLP Master course.

Logs in, discovers modules and lectures, downloads MP3s, writes manifest.json.
Resumable: skips already-downloaded files and keeps manifest entries from
earlier runs (including modules excluded by ``--modules``).
"""

import json
import logging
import os
import sys
import time
from pathlib import Path
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

BASE_URL = "https://cursuri.aresens.ro"
COURSE_URL = f"{BASE_URL}/curs/26"
LOGIN_URL = f"{BASE_URL}/login"
AUDIO_DIR = Path("audio")
MANIFEST_PATH = Path("manifest.json")
MAX_RETRIES = 3
RETRY_BACKOFF = [5, 15, 30]

# Anything smaller than this is treated as a broken/truncated download.
# Used by the download check, the resume check and the final validation so
# that all three agree on what counts as a complete file.
MIN_AUDIO_BYTES = 1_000_000
# Timeout (seconds) for HTML page requests (login, course and module pages).
PAGE_TIMEOUT = 60
# Timeout (seconds) for audio downloads.
DOWNLOAD_TIMEOUT = 300

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("download_errors.log"),
    ],
)
log = logging.getLogger(__name__)


def login(session: requests.Session, email: str, password: str) -> bool:
    """Log in with the given credentials and return True on success.

    Success is inferred from the final response: it must not be an HTTP
    error, must not have landed back on a ``/login`` URL, and must not
    contain the ``loginform`` marker.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    resp = session.post(
        LOGIN_URL,
        data={
            "email": email,
            "password": password,
            "act": "login",
            "remember": "on",
        },
        allow_redirects=True,
        timeout=PAGE_TIMEOUT,
    )
    # An HTTP error page is not a successful login even if its URL and body
    # happen not to mention the login form.
    if not resp.ok:
        return False
    # Successful login redirects to the course page, not back to /login
    return "/login" not in resp.url and "loginform" not in resp.text


def parse_module_filter(arg: str) -> set[int]:
    """Parse a module filter like '1-3' or '4,5' or '1-3,5' into a set of 1-based indices.

    Empty parts (e.g. a trailing comma) are ignored and a reversed range
    such as '3-1' is treated like '1-3'.

    Raises:
        ValueError: if a part is not an integer or an integer range.
    """
    result: set[int] = set()
    for part in arg.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            a, b = part.split("-", 1)
            low, high = int(a), int(b)
            if low > high:
                low, high = high, low
            result.update(range(low, high + 1))
        else:
            result.add(int(part))
    return result


def discover_modules(session: requests.Session) -> list[dict]:
    """Fetch the course page and return a list of {name, url, module_id}.

    ``module_id`` is the last path segment of the module link.

    Raises:
        requests.RequestException: on network failure, timeout or HTTP error.
    """
    resp = session.get(COURSE_URL, timeout=PAGE_TIMEOUT)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    modules = []
    for div in soup.select("div.module"):
        number_el = div.select_one("div.module__number")
        link_el = div.select_one("a.btn")
        if not number_el or not link_el:
            continue
        href = link_el.get("href", "")
        module_id = href.rstrip("/").split("/")[-1]
        modules.append({
            "name": number_el.get_text(strip=True),
            "url": urljoin(BASE_URL, href),
            "module_id": module_id,
        })

    log.info(f"Found {len(modules)} modules")
    return modules


def discover_lectures(session: requests.Session, module: dict) -> list[dict]:
    """Fetch a module page and return its lectures with audio URLs.

    Each entry has ``title``, ``original_filename`` (last path segment of the
    audio ``src``), ``url`` and ``audio_path`` (inside AUDIO_DIR). Lessons
    without a name element or a non-empty audio source are skipped.

    Raises:
        requests.RequestException: on network failure, timeout or HTTP error.
    """
    resp = session.get(module["url"], timeout=PAGE_TIMEOUT)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    lectures = []
    for lesson_div in soup.select("div.lesson"):
        name_el = lesson_div.select_one("div.module__name")
        source_el = lesson_div.select_one("audio source")
        if not name_el or not source_el:
            continue
        src = source_el.get("src", "").strip()
        if not src:
            continue
        filename = src.split("/")[-1]
        lectures.append({
            "title": name_el.get_text(strip=True),
            "original_filename": filename,
            "url": urljoin(BASE_URL, src),
            "audio_path": str(AUDIO_DIR / filename),
        })

    log.info(f" {module['name']}: {len(lectures)} lectures")
    return lectures


def _is_valid_audio(path: Path) -> bool:
    """Return True when *path* is a regular file of at least MIN_AUDIO_BYTES."""
    try:
        return path.is_file() and path.stat().st_size >= MIN_AUDIO_BYTES
    except OSError:
        return False


def _discard(path: Path) -> None:
    """Best-effort removal of a partial file; never raises."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        pass


def download_file(session: requests.Session, url: str, dest: Path) -> bool:
    """Download a file with retry logic. Returns True on success.

    The body is streamed to ``<dest name>.tmp`` and moved over *dest* only
    once complete, so *dest* never holds a partial download. A body smaller
    than MIN_AUDIO_BYTES is rejected immediately (no retry). Network and
    filesystem errors are retried up to MAX_RETRIES times with the delays in
    RETRY_BACKOFF.
    """
    # Append ".tmp" instead of swapping the suffix, so names that contain
    # dots but no real extension are not mangled.
    tmp = dest.with_name(dest.name + ".tmp")

    for attempt in range(MAX_RETRIES):
        try:
            total = 0
            # The context manager releases the connection even when the
            # transfer fails half-way.
            with session.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT) as resp:
                resp.raise_for_status()
                # Write to temp file first, then move into place (atomic)
                with open(tmp, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
                        total += len(chunk)

            if total < MIN_AUDIO_BYTES:  # < 1MB is suspicious
                log.warning(f"File too small ({total} bytes): {dest.name}")
                _discard(tmp)
                return False

            # Path.replace overwrites an existing (stale/undersized) dest on
            # every platform; Path.rename raises FileExistsError on Windows.
            tmp.replace(dest)
            log.info(f" Downloaded: {dest.name} ({total / 1_000_000:.1f} MB)")
            return True
        except (requests.RequestException, OSError) as e:
            _discard(tmp)
            wait = RETRY_BACKOFF[attempt] if attempt < len(RETRY_BACKOFF) else 30
            log.warning(f" Attempt {attempt + 1}/{MAX_RETRIES} failed for {dest.name}: {e}")
            if attempt < MAX_RETRIES - 1:
                log.info(f" Retrying in {wait}s...")
                time.sleep(wait)

    log.error(f" FAILED after {MAX_RETRIES} attempts: {dest.name}")
    return False


def load_manifest() -> dict | None:
    """Load the existing manifest, or return None if there is none.

    Raises:
        OSError: if the file cannot be read.
        ValueError: if the file is not valid UTF-8 JSON.
    """
    if not MANIFEST_PATH.exists():
        return None
    # save_manifest writes UTF-8 with ensure_ascii=False, so reading must not
    # depend on the platform's default encoding.
    with open(MANIFEST_PATH, encoding="utf-8") as f:
        return json.load(f)


def save_manifest(manifest: dict):
    """Write manifest.json (UTF-8, indented) via a temp file and atomic replace."""
    tmp = MANIFEST_PATH.with_name(MANIFEST_PATH.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2, ensure_ascii=False)
    # An interrupted run must not leave a truncated manifest behind.
    tmp.replace(MANIFEST_PATH)


def _module_filter_from_argv(argv: list[str]) -> set[int] | None:
    """Return the ``--modules`` filter from *argv*, or None if not given.

    Exits with status 1 when the option has no value or an invalid one.
    """
    if "--modules" not in argv:
        return None
    idx = argv.index("--modules")
    if idx + 1 >= len(argv):
        log.error("--modules requires a value, e.g. --modules 1-3,5")
        sys.exit(1)
    raw = argv[idx + 1]
    try:
        return parse_module_filter(raw)
    except ValueError:
        log.error(f"Invalid --modules value: {raw!r}")
        sys.exit(1)


def _index_previous_manifest() -> tuple[dict, dict]:
    """Index the manifest of an earlier run.

    Returns ``(modules_by_id, lectures_by_audio_path)``. Both are empty when
    there is no usable manifest; malformed module entries are ignored.
    """
    modules_by_id: dict = {}
    lectures_by_path: dict = {}
    try:
        previous = load_manifest()
    except (OSError, ValueError) as e:
        log.warning(f"Ignoring unreadable {MANIFEST_PATH}: {e}")
        return modules_by_id, lectures_by_path

    prev_modules = previous.get("modules") if isinstance(previous, dict) else None
    if not isinstance(prev_modules, list):
        return modules_by_id, lectures_by_path

    for mod in prev_modules:
        if not isinstance(mod, dict):
            continue
        lectures = mod.get("lectures")
        if not isinstance(lectures, list):
            continue
        if not all(isinstance(lec, dict) for lec in lectures):
            continue
        modules_by_id[mod.get("module_id")] = mod
        for lec in lectures:
            lectures_by_path[lec.get("audio_path")] = lec
    return modules_by_id, lectures_by_path


def main():
    """Entry point: log in, discover the course, download audio, write the manifest.

    Reads COURSE_USERNAME / COURSE_PASSWORD from the environment (.env) and an
    optional ``--modules`` filter from the command line. Exits with status 1
    on configuration errors, login failure, an empty course, or if any
    download failed.
    """
    load_dotenv()
    email = os.getenv("COURSE_USERNAME", "")
    password = os.getenv("COURSE_PASSWORD", "")
    if not email or not password:
        log.error("Set COURSE_USERNAME and COURSE_PASSWORD in .env")
        sys.exit(1)

    # Parse --modules filter (e.g. "4-5" or "1,3,5")
    module_filter = _module_filter_from_argv(sys.argv)
    if module_filter is not None:
        log.info(f"Module filter: {sorted(module_filter)}")

    AUDIO_DIR.mkdir(exist_ok=True)

    session = requests.Session()
    session.headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})

    log.info("Logging in...")
    if not login(session, email, password):
        log.error("Login failed. Check credentials in .env")
        sys.exit(1)
    log.info("Login successful")

    # Discover structure
    modules = discover_modules(session)
    if not modules:
        log.error("No modules found")
        sys.exit(1)

    # Entries from an earlier run: used to keep modules excluded by the
    # filter and to avoid resetting per-lecture transcription state.
    prev_modules, prev_lectures = _index_previous_manifest()

    manifest = {
        "course": "NLP Master Practitioner Bucuresti 2025",
        "source_url": COURSE_URL,
        "modules": [],
    }

    total_files = 0
    downloaded = 0
    skipped = 0
    failed = 0

    for mod_idx, mod in enumerate(modules, 1):
        if module_filter and mod_idx not in module_filter:
            log.info(f" Skipping module {mod_idx}: {mod['name']}")
            # Keep what an earlier run recorded for this module instead of
            # dropping it from the manifest.
            kept = prev_modules.get(mod["module_id"])
            if kept is not None:
                manifest["modules"].append(kept)
            continue

        lectures = discover_lectures(session, mod)
        module_entry = {
            "name": mod["name"],
            "module_id": mod["module_id"],
            "lectures": [],
        }

        for lec in lectures:
            total_files += 1
            dest = Path(lec["audio_path"])
            stem = dest.stem.replace(" [Audio]", "")
            prev = prev_lectures.get(lec["audio_path"], {})

            lecture_entry = {
                "title": lec["title"],
                "original_filename": lec["original_filename"],
                "url": lec["url"],
                "audio_path": lec["audio_path"],
                "transcript_path": f"transcripts/{stem}.txt",
                "srt_path": f"transcripts/{stem}.srt",
                "summary_path": f"summaries/{stem}_summary.md",
                "download_status": "pending",
                # NOTE(review): presumably a later pipeline step updates this
                # field in manifest.json; carry it over rather than reset it.
                "transcribe_status": prev.get("transcribe_status", "pending"),
                "file_size_bytes": 0,
            }

            # Skip if already downloaded
            if _is_valid_audio(dest):
                lecture_entry["download_status"] = "complete"
                lecture_entry["file_size_bytes"] = dest.stat().st_size
                skipped += 1
                log.info(f" Skipping (exists): {dest.name}")
            elif download_file(session, lec["url"], dest):
                lecture_entry["download_status"] = "complete"
                lecture_entry["file_size_bytes"] = dest.stat().st_size
                downloaded += 1
            else:
                lecture_entry["download_status"] = "failed"
                failed += 1

            module_entry["lectures"].append(lecture_entry)

        manifest["modules"].append(module_entry)
        # Save manifest after each module (checkpoint)
        save_manifest(manifest)

    # Modules carried over after the last checkpoint are only in memory.
    save_manifest(manifest)

    # Final validation
    all_ok = all(
        _is_valid_audio(Path(str(lec.get("audio_path") or "")))
        for mod in manifest["modules"]
        for lec in mod["lectures"]
        if lec.get("download_status") == "complete"
    )

    log.info("=" * 60)
    log.info(f"Downloaded {downloaded}/{total_files} files, {skipped} skipped, {failed} failures.")
    log.info(f"All files > 1MB: {'PASS' if all_ok else 'FAIL'}")
    log.info("=" * 60)

    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()