#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
extract_common.py — single home for per-format text extraction.

Every extractor returns a plain text *body* with synthetic page markers
(`--- PAGE N ---`). The file-level header (`SOURCE:` / `CONVERTED:`) is added
by normalize_sources.py, not here.

Critical fix vs. the old pdf_to_text_converter.py: there is NO `max_pages`
cap. Large books are extracted in full.
"""
from __future__ import annotations

import hashlib
import importlib
import os
import re
import shutil
import subprocess
import tempfile
import zipfile
from pathlib import Path
from typing import Callable

# A page marker must occupy a whole line of the body.
PAGE_MARKER_RE = re.compile(r"^--- PAGE (\d+) ---\s*$", re.MULTILINE)

# paragraphs per synthetic page for paginated-by-flow formats (docx)
DOCX_PARAS_PER_PAGE = 40

# formats we deliberately ignore (epub duplicates existing PDFs — plan §1)
IGNORED_EXTENSIONS = {".epub"}

# obvious junk filenames skipped during a walk
JUNK_NAMES = {"desktop.ini", "linkuri-jocuri.txt"}
JUNK_SUFFIXES = {".bak", ".tmp", ".ini"}


# --------------------------------------------------------------------------
# page assembly helpers
# --------------------------------------------------------------------------

def join_pages(pages: list[str], start: int = 1) -> str:
    """Join a list of page texts into a body string with `--- PAGE N ---`.

    Each page is stripped of surrounding whitespace; a falsy page (``None``
    or ``""``) still gets its marker, followed by an empty text line.
    Numbering begins at ``start``.
    """
    return "".join(
        f"\n--- PAGE {number} ---\n{(text or '').strip()}\n"
        for number, text in enumerate(pages, start)
    )


def split_pages(body: str) -> list[tuple[int, str]]:
    """Inverse of join_pages: parse a body into [(page_number, text), ...].

    Returns an empty list when the body contains no page marker. Text that
    precedes the first marker is not part of any page and is dropped.
    """
    matches = list(PAGE_MARKER_RE.finditer(body))
    pages: list[tuple[int, str]] = []
    for idx, marker in enumerate(matches):
        # A page runs from the end of its marker to the start of the next
        # marker, or to the end of the body for the last page.
        seg_end = matches[idx + 1].start() if idx + 1 < len(matches) else len(body)
        pages.append((int(marker.group(1)), body[marker.end():seg_end].strip()))
    return pages


def count_page_markers(body: str) -> int:
    """Return the number of `--- PAGE N ---` marker lines in ``body``."""
    return len(PAGE_MARKER_RE.findall(body))


# --------------------------------------------------------------------------
# format detection
# --------------------------------------------------------------------------

# NOTE(review): legacy binary ".ppt" is routed to the pptx extractor, which
# opens files with python-pptx. Confirm that library can actually read the
# .ppt files in the corpus, or that the resulting failure is handled upstream.
FORMAT_BY_EXT = {
    ".pdf": "pdf",
    ".docx": "docx",
    ".doc": "doc",
    ".pptx": "pptx",
    ".ppt": "pptx",
    ".htm": "html",
    ".html": "html",
    ".zip": "zip",
    ".epub": "epub",
    ".txt": "txt",
}


def detect_format(path: str | os.PathLike) -> str:
    """Return a format key for a path based on its extension.

    The lookup is case-insensitive; unmapped extensions yield ``"unknown"``.
    """
    ext = Path(path).suffix.lower()
    return FORMAT_BY_EXT.get(ext, "unknown")


def is_junk(path: str | os.PathLike) -> bool:
    """Return True for files that should be skipped during a walk.

    A file is junk when its lowercased name is in JUNK_NAMES, when it is a
    ``readme*.md`` file, or when its lowercased suffix is in JUNK_SUFFIXES.
    """
    p = Path(path)
    name = p.name.lower()
    suffix = p.suffix.lower()
    if name in JUNK_NAMES:
        return True
    if name.startswith("readme") and suffix == ".md":
        return True
    return suffix in JUNK_SUFFIXES


# --------------------------------------------------------------------------
# content hashing + near-duplicate elimination
# --------------------------------------------------------------------------

def _normalize_for_hash(text: str) -> str:
    """Collapse whitespace runs to single spaces, trim, and lowercase."""
    return re.sub(r"\s+", " ", (text or "")).strip().lower()


def content_hash(text: str) -> str:
    """Stable SHA1 of whitespace-normalized text — used for exact-dup detection."""
    return hashlib.sha1(_normalize_for_hash(text).encode("utf-8")).hexdigest()


def near_duplicate_ratio(a: str, b: str) -> float:
    """Similarity score in [0, 100] between two texts (rapidfuzz token ratio).

    Both texts are normalized the same way as for ``content_hash`` first.
    rapidfuzz is imported lazily so the module loads without it.
    """
    from rapidfuzz import fuzz

    return fuzz.token_sort_ratio(_normalize_for_hash(a), _normalize_for_hash(b))


def dedupe_texts(
    items: list[tuple[str, str]], threshold: float = 95.0
) -> list[tuple[str, str]]:
    """
    Drop exact and near-duplicate texts from a list of (key, text) pairs.

    Used for HTML mirror pages (print copies, repeated index/footer pages).
    Keeps the first occurrence; O(n) on exact hash, O(n*k) fuzzy only against
    already-kept items. A text counts as a near-duplicate when its
    ``near_duplicate_ratio`` against any kept text is >= ``threshold``.
    """
    kept: list[tuple[str, str]] = []
    seen_hashes: set[str] = set()
    for key, text in items:
        digest = content_hash(text)
        if digest in seen_hashes:
            continue
        # Record the hash whether or not the text ends up kept: an exact copy
        # of a text already rejected as a near-duplicate would be rejected by
        # the same comparison, so it can skip the fuzzy pass entirely.
        seen_hashes.add(digest)
        if any(
            near_duplicate_ratio(text, kept_text) >= threshold
            for _, kept_text in kept
        ):
            continue
        kept.append((key, text))
    return kept


# --------------------------------------------------------------------------
# preflight dependency check
# --------------------------------------------------------------------------

# import name -> pip package name reported when the import fails
REQUIRED_PYTHON_MODULES = {
    "pdfplumber": "pdfplumber",
    "PyPDF2": "pypdf2",
    "docx": "python-docx",
    "pptx": "python-pptx",
    "bs4": "beautifulsoup4",
    "lxml": "lxml",
    "jsonschema": "jsonschema",
    "rapidfuzz": "rapidfuzz",
    "chardet": "chardet",
}


def preflight(check_ocr: bool = False) -> dict:
    """
    Check system + Python dependencies before a long normalization run.

    Returns {'ok': bool, 'missing_python': [...], 'missing_system': [...],
    'warnings': [...]}. libreoffice is a *warning* (only .doc needs it),
    tesseract only checked when check_ocr=True. 'ok' is False as soon as any
    Python module or system tool is listed as missing; warnings do not
    affect it.
    """
    missing_python: list[str] = []
    for module, pip_name in REQUIRED_PYTHON_MODULES.items():
        try:
            importlib.import_module(module)
        except ImportError:
            missing_python.append(pip_name)

    warnings: list[str] = []
    missing_system: list[str] = []
    if not (shutil.which("libreoffice") or shutil.which("soffice")):
        warnings.append("libreoffice not found — legacy .doc files cannot be converted")
    if check_ocr and not shutil.which("tesseract"):
        missing_system.append("tesseract (OCR requested but not installed)")

    return {
        "ok": not missing_python and not missing_system,
        "missing_python": missing_python,
        "missing_system": missing_system,
        "warnings": warnings,
    }


# --------------------------------------------------------------------------
# per-format extractors
# --------------------------------------------------------------------------

def extract_pdf(path: str | os.PathLike) -> str:
    """PDF → body. pdfplumber primary, PyPDF2 fallback. No page cap.

    Any exception from the pdfplumber path (including a missing pdfplumber
    install) triggers the PyPDF2 path; exceptions from PyPDF2 propagate.
    """
    path = str(path)
    try:
        return _extract_pdf_pdfplumber(path)
    except Exception:
        # Deliberately broad: whatever went wrong, give PyPDF2 a chance.
        return _extract_pdf_pypdf2(path)


def _extract_pdf_pdfplumber(path: str) -> str:
    """Extract every page with pdfplumber; a page that fails becomes empty."""
    import pdfplumber

    pages: list[str] = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:  # ALL pages — no max_pages
            try:
                pages.append(page.extract_text() or "")
            except Exception:
                # Keep the slot so later pages retain their page numbers.
                pages.append("")
    return join_pages(pages)


def _extract_pdf_pypdf2(path: str) -> str:
    """Extract every page with PyPDF2; a page that fails becomes empty."""
    import PyPDF2

    pages: list[str] = []
    with open(path, "rb") as fh:
        reader = PyPDF2.PdfReader(fh)
        for page in reader.pages:  # ALL pages — no max_pages
            try:
                pages.append(page.extract_text() or "")
            except Exception:
                # Keep the slot so later pages retain their page numbers.
                pages.append("")
    return join_pages(pages)


def extract_docx(path: str | os.PathLike) -> str:
    """docx → body. Synthetic page marker every DOCX_PARAS_PER_PAGE paragraphs.

    Only ``document.paragraphs`` is read. A document with no paragraphs still
    yields a single empty page.
    """
    import docx

    document = docx.Document(str(path))
    paragraphs = [p.text for p in document.paragraphs]
    pages: list[str] = []
    # max(..., 1) makes the loop run once for an empty document.
    for i in range(0, max(len(paragraphs), 1), DOCX_PARAS_PER_PAGE):
        chunk = paragraphs[i : i + DOCX_PARAS_PER_PAGE]
        pages.append("\n".join(chunk))
    return join_pages(pages)


def extract_doc(path: str | os.PathLike) -> str:
    """
    Legacy .doc → body via `libreoffice --headless --convert-to docx`.

    Raises RuntimeError if libreoffice is unavailable or produced no output
    file — the caller marks the resulting source `needs_review` regardless
    (conversion is imperfect). A non-zero exit of the converter raises
    subprocess.CalledProcessError, and exceeding the 300 s timeout raises
    subprocess.TimeoutExpired.
    """
    soffice = shutil.which("libreoffice") or shutil.which("soffice")
    if not soffice:
        raise RuntimeError("libreoffice/soffice not available — cannot convert .doc")

    src = Path(path).resolve()
    with tempfile.TemporaryDirectory() as tmp:
        subprocess.run(
            [soffice, "--headless", "--convert-to", "docx", "--outdir", tmp, str(src)],
            check=True,
            capture_output=True,
            timeout=300,
        )
        converted = Path(tmp) / (src.stem + ".docx")
        if not converted.exists():
            raise RuntimeError(f"libreoffice produced no output for {src.name}")
        # Extract while the temp dir still exists.
        return extract_docx(converted)


def extract_pptx(path: str | os.PathLike) -> str:
    """pptx → body. One page per slide: title + body text + speaker notes.

    Shapes without text are skipped; speaker notes, when present and
    non-empty, are appended with a ``[NOTES]`` prefix.
    """
    from pptx import Presentation

    presentation = Presentation(str(path))
    pages: list[str] = []
    for slide in presentation.slides:
        parts: list[str] = []
        for shape in slide.shapes:
            if shape.has_text_frame and shape.text_frame.text.strip():
                parts.append(shape.text_frame.text.strip())
        if slide.has_notes_slide:
            # notes_text_frame is None when the notes slide has no notes
            # placeholder; treat that as "no notes" instead of crashing.
            notes_frame = slide.notes_slide.notes_text_frame
            notes = notes_frame.text.strip() if notes_frame is not None else ""
            if notes:
                parts.append(f"[NOTES] {notes}")
        pages.append("\n".join(parts))
    return join_pages(pages)


def extract_html(path: str | os.PathLike) -> str:
    """HTML mirror page → body. Strips nav/script/style/footer/header/aside.

    The encoding is guessed with chardet (UTF-8 when it has no guess) and
    undecodable bytes are replaced. The result is a single page made of the
    non-empty, stripped text lines.
    """
    import chardet
    from bs4 import BeautifulSoup

    raw = Path(path).read_bytes()
    enc = chardet.detect(raw).get("encoding") or "utf-8"
    try:
        markup = raw.decode(enc, errors="replace")
    except LookupError:
        # The detector may name an encoding Python has no codec for;
        # errors="replace" does not cover that, so fall back to UTF-8.
        markup = raw.decode("utf-8", errors="replace")

    soup = BeautifulSoup(markup, "lxml")
    for tag in soup(["script", "style", "nav", "footer", "header", "aside", "noscript"]):
        tag.decompose()
    # also drop common chrome by role/class
    for tag in soup.find_all(attrs={"role": ["navigation", "banner", "contentinfo"]}):
        tag.decompose()

    text = soup.get_text(separator="\n")
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    return join_pages(["\n".join(lines)])


def extract_zip(path: str | os.PathLike) -> str:
    """
    zip → body. Unzips into a temp dir and recurses on every extractable
    inner file. Inner files are page-renumbered into one continuous body.

    Returns "" when the archive itself is not a valid zip. Inner files are
    visited in sorted path order; junk, unknown and epub files are skipped,
    and an inner file (including a nested zip) that fails to extract is
    skipped rather than aborting the whole archive.
    """
    path = str(path)
    pages: list[str] = []
    with tempfile.TemporaryDirectory() as tmp:
        try:
            with zipfile.ZipFile(path) as zf:
                zf.extractall(tmp)
        except zipfile.BadZipFile:
            return ""

        for inner in sorted(Path(tmp).rglob("*")):
            if not inner.is_file() or is_junk(inner):
                continue
            fmt = detect_format(inner)
            if fmt in ("unknown", "epub"):
                continue
            try:
                # Nested zips recurse; they get the same best-effort
                # protection as every other inner file.
                body = extract_zip(inner) if fmt == "zip" else extract_file(inner)
            except Exception:
                continue
            # Drop the inner page numbers; join_pages renumbers below.
            pages.extend(text for _, text in split_pages(body))
    return join_pages(pages)


# format key -> extractor; "txt" is handled inline by extract_file
EXTRACTORS: dict[str, Callable[[str | os.PathLike], str]] = {
    "pdf": extract_pdf,
    "docx": extract_docx,
    "doc": extract_doc,
    "pptx": extract_pptx,
    "html": extract_html,
    "zip": extract_zip,
}


def extract_file(path: str | os.PathLike) -> str:
    """Dispatch a single file to the right extractor. Returns a page-marked body.

    Raises ValueError when the detected format has no extractor (this
    includes "unknown" and "epub").
    """
    fmt = detect_format(path)
    if fmt == "txt":
        body = Path(path).read_text(encoding="utf-8", errors="replace")
        # already paginated? pass through; else wrap as one page
        return body if count_page_markers(body) else join_pages([body])
    extractor = EXTRACTORS.get(fmt)
    if extractor is None:
        raise ValueError(f"No extractor for format '{fmt}': {path}")
    return extractor(path)