#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ normalize_sources.py — walk data/carti-camp-jocuri/ and write data/sources/.txt. Output files keep the existing header format: SOURCE: CONVERTED: FORMAT: NEEDS_REVIEW: (optional — legacy .doc conversions) ================================================== --- PAGE 1 --- ... Each source gets a stable id = <8-hex hash of relative path>_, so two files with the same name in different folders never collide. The pipeline is script-only: this normalizes formats, it does NOT run extraction. Run `--check-deps` before a long job. """ from __future__ import annotations import argparse import datetime as _dt import hashlib import re import sys from pathlib import Path SCRIPT_DIR = Path(__file__).resolve().parent if str(SCRIPT_DIR) not in sys.path: sys.path.insert(0, str(SCRIPT_DIR)) from extract_common import ( # noqa: E402 count_page_markers, dedupe_texts, detect_format, extract_file, extract_html, is_junk, join_pages, preflight, split_pages, ) HEADER_RULE = "=" * 50 # -------------------------------------------------------------------------- # stable source id # -------------------------------------------------------------------------- def sanitize_stem(stem: str) -> str: s = re.sub(r"[^\w]+", "_", stem, flags=re.UNICODE).strip("_").lower() return s[:60] or "source" def stable_id(relative_path: str | Path) -> str: """Collision-proof id derived from the path relative to the corpus root.""" rel = str(relative_path).replace("\\", "/") digest = hashlib.sha1(rel.encode("utf-8")).hexdigest()[:8] stem = sanitize_stem(Path(rel).stem) return f"{digest}_{stem}" # -------------------------------------------------------------------------- # header # -------------------------------------------------------------------------- def build_header( source_rel: str, fmt: str, needs_review: str | None = None ) -> str: today = _dt.date.today().isoformat() lines = [ f"SOURCE: {source_rel}", f"CONVERTED: {today}", f"FORMAT: {fmt}", ] if 
needs_review: lines.append(f"NEEDS_REVIEW: {needs_review}") lines.append(HEADER_RULE) return "\n".join(lines) + "\n\n" # -------------------------------------------------------------------------- # mirror-site directories # -------------------------------------------------------------------------- MIRROR_PAGE_EXTS = {".html", ".htm"} def is_mirror_dir(path: Path) -> bool: """A directory counts as a site mirror if it contains HTML pages.""" if not path.is_dir(): return False if path.name.endswith("_files"): return False return any( p.suffix.lower() in MIRROR_PAGE_EXTS for p in path.rglob("*") if p.is_file() ) def normalize_mirror(mirror_dir: Path) -> str: """Extract every HTML page in a mirror dir, dedupe near-duplicates, join.""" pages: list[tuple[str, str]] = [] for html in sorted(mirror_dir.rglob("*")): if not html.is_file() or html.suffix.lower() not in MIRROR_PAGE_EXTS: continue if "_files" in html.parts: continue try: body = extract_html(html) except Exception: continue text = "\n".join(t for _, t in split_pages(body)) if text.strip(): pages.append((str(html.relative_to(mirror_dir)), text)) pages = dedupe_texts(pages) return join_pages([t for _, t in pages]) # -------------------------------------------------------------------------- # one source # -------------------------------------------------------------------------- def normalize_one( path: Path, corpus_root: Path, out_dir: Path ) -> dict | None: """ Normalize a single file or mirror directory → data/sources/.txt. Returns a result dict, or None if the entry was skipped (junk / ignored). 
""" rel = path.relative_to(corpus_root) sid = stable_id(rel) if path.is_dir(): if not is_mirror_dir(path): return None fmt, needs_review = "html-mirror", None body = normalize_mirror(path) else: if is_junk(path): return None fmt = detect_format(path) if fmt in ("unknown", "epub", "txt"): return None # epub duplicates PDFs; txt is not a source format here needs_review = "legacy .doc conversion is imperfect" if fmt == "doc" else None try: body = extract_file(path) except Exception as exc: # noqa: BLE001 return {"id": sid, "source": str(rel), "status": "error", "error": str(exc)} if not body.strip(): return {"id": sid, "source": str(rel), "status": "empty"} out_path = out_dir / f"{sid}.txt" out_path.write_text(build_header(str(rel), fmt, needs_review) + body, encoding="utf-8") return { "id": sid, "source": str(rel), "status": "ok", "format": fmt, "pages": count_page_markers(body), "needs_review": bool(needs_review), } # -------------------------------------------------------------------------- # walk # -------------------------------------------------------------------------- def iter_corpus_entries(corpus_root: Path): """Yield top-level files and mirror directories under the corpus root.""" for entry in sorted(corpus_root.iterdir()): if entry.name.startswith("."): continue if entry.is_dir(): if is_mirror_dir(entry): yield entry else: yield entry def run(corpus_root: Path, out_dir: Path) -> dict: out_dir.mkdir(parents=True, exist_ok=True) results: list[dict] = [] for entry in iter_corpus_entries(corpus_root): res = normalize_one(entry, corpus_root, out_dir) if res is not None: results.append(res) summary = { "total": len(results), "ok": sum(1 for r in results if r["status"] == "ok"), "errors": sum(1 for r in results if r["status"] == "error"), "empty": sum(1 for r in results if r["status"] == "empty"), "needs_review": sum(1 for r in results if r.get("needs_review")), "results": results, } return summary # 
# --------------------------------------------------------------------------
# CLI
# --------------------------------------------------------------------------
def print_preflight(report: dict) -> int:
    """Print a dependency preflight report and return a process exit status.

    Args:
        report: Mapping with the keys ``missing_python``, ``missing_system``,
            ``warnings`` (iterables of strings) and ``ok`` (truthy when the
            environment is ready).

    Returns:
        0 when ``report["ok"]`` is truthy, otherwise 1.
    """
    print("Dependency preflight")
    print("--------------------")
    if report["missing_python"]:
        print(" MISSING Python packages: " + ", ".join(report["missing_python"]))
    else:
        print(" Python packages: OK")
    if report["missing_system"]:
        print(" MISSING system tools : " + ", ".join(report["missing_system"]))
    for w in report["warnings"]:
        print(f" WARNING: {w}")
    print(" => " + ("READY" if report["ok"] else "NOT READY — install the above"))
    return 0 if report["ok"] else 1


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Exit status: the preflight status for ``--check-deps``; 2 when the
        corpus root is not an existing directory; 1 when required Python
        packages are missing; otherwise 0 after the run summary is printed.
    """
    parser = argparse.ArgumentParser(description="Normalize mixed sources to .txt")
    parser.add_argument(
        "--corpus", default="data/carti-camp-jocuri", help="corpus root to walk"
    )
    parser.add_argument("--out", default="data/sources", help="output directory")
    parser.add_argument(
        "--check-deps",
        action="store_true",
        help="run dependency preflight and exit",
    )
    parser.add_argument(
        "--ocr",
        action="store_true",
        help="include OCR (tesseract) in the preflight check",
    )
    args = parser.parse_args(argv)

    if args.check_deps:
        return print_preflight(preflight(check_ocr=args.ocr))

    # Fail fast with a readable message instead of a traceback from iterdir().
    corpus_root = Path(args.corpus)
    if not corpus_root.is_dir():
        print(f"error: corpus root is not a directory: {corpus_root}", file=sys.stderr)
        return 2

    report = preflight(check_ocr=args.ocr)
    if report["missing_python"]:
        print_preflight(report)
        return 1
    for w in report["warnings"]:
        print(f"WARNING: {w}")

    summary = run(corpus_root, Path(args.out))
    print(f"normalized : {summary['ok']}/{summary['total']}")
    print(f"errors : {summary['errors']}")
    print(f"empty : {summary['empty']}")
    print(f"needs_review: {summary['needs_review']}")
    # List every entry that did not normalize cleanly.
    for r in summary["results"]:
        if r["status"] != "ok":
            print(f" [{r['status']}] {r['source']}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())