Implements the approved plan to replace the broken regex/index-master extraction with an LLM-subagent pipeline. Four parallel lanes: Lane A — scripts/extract_common.py (PDF/docx/doc/pptx/html/zip, no max_pages truncation), normalize_sources.py, chunk_sources.py (~20pg chunks + overlap, manifest registry), activity_schema.json. Lane B — app/config_taxonomy.py (16 fixed category slugs), schema rebuilt from scratch in app/models/ with content_type, language, source_files, source_excerpt, normalized_name, extraction_confidence, needs_review; FTS5 + 3 triggers extended with materials_list and skills_developed. Lane C — build_database.py (--rebuild, atomic swap, schema + fuzzy source_excerpt validation, dedup with needs_review band), validate_extractions.py, review_queue.py, new run_extraction.py orchestrator, SUBAGENT_PROMPT.md. Lane D — search.py content_type/language filters (default search excludes non-game content), E7 schema-compat audit; fixed a NULL keywords AttributeError in _boost_search_relevance. Removes 8 orphaned/dead scripts and app/services/parser.py + indexer.py. Adds tests/ (70 passing, 1 skipped — libreoffice absent). Note: Lane D made one additive edit to app/models/database.py (_update_category_counts) to surface content_type/language in get_filter_options, outside its nominal lane boundary but after Lane B completed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
362 lines
12 KiB
Python
362 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
extract_common.py — single home for per-format text extraction.
|
|
|
|
Every extractor returns a plain text *body* with synthetic page markers
|
|
(`--- PAGE N ---`). The file-level header (`SOURCE:` / `CONVERTED:`) is added
|
|
by normalize_sources.py, not here.
|
|
|
|
Critical fix vs. the old pdf_to_text_converter.py: there is NO `max_pages` cap.
|
|
Large books are extracted in full.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import importlib
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import zipfile
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
# Matches one synthetic page-marker line (`--- PAGE N ---`) and captures N.
# MULTILINE lets `^`/`$` anchor on every line of a body, not just the first.
PAGE_MARKER_RE = re.compile(
    r"^--- PAGE (\d+) ---\s*$",
    re.MULTILINE,
)

# Number of paragraphs grouped under one synthetic page for flow-paginated
# formats (docx), which carry no real page boundaries.
DOCX_PARAS_PER_PAGE = 40

# Extensions that are deliberately not extracted
# (epub duplicates existing PDFs — plan §1).
IGNORED_EXTENSIONS = {
    ".epub",
}

# Exact (lower-cased) filenames treated as junk during a walk.
JUNK_NAMES = {
    "desktop.ini",
    "linkuri-jocuri.txt",
}

# (Lower-cased) suffixes treated as junk during a walk.
JUNK_SUFFIXES = {
    ".bak",
    ".tmp",
    ".ini",
}
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# page assembly helpers
|
|
# --------------------------------------------------------------------------
|
|
def join_pages(pages: list[str], start: int = 1) -> str:
    """Render page texts as one body, each preceded by `--- PAGE N ---`.

    Pages are numbered consecutively from ``start``. Each page text is
    stripped of surrounding whitespace; a falsy entry (``None`` or ``""``)
    yields an empty page that still gets its marker. An empty list gives
    an empty string.
    """
    return "".join(
        f"\n--- PAGE {number} ---\n{(content or '').strip()}\n"
        for number, content in enumerate(pages, start)
    )
|
|
|
|
|
|
def split_pages(body: str) -> list[tuple[int, str]]:
    """Parse a page-marked body back into ``[(page_number, text), ...]``.

    The text of a page is everything between the end of its marker and the
    start of the next marker (or the end of ``body`` for the last page),
    stripped of surrounding whitespace. Anything before the first marker is
    discarded. A body without markers yields an empty list.
    """
    markers = list(PAGE_MARKER_RE.finditer(body))
    # Each page ends where the following marker begins; the final page runs
    # to the end of the body.
    segment_ends = [following.start() for following in markers[1:]]
    segment_ends.append(len(body))
    return [
        (int(marker.group(1)), body[marker.end():stop].strip())
        for marker, stop in zip(markers, segment_ends)
    ]
|
|
|
|
|
|
def count_page_markers(body: str) -> int:
    """Return how many `--- PAGE N ---` marker lines ``body`` contains."""
    return sum(1 for _ in PAGE_MARKER_RE.finditer(body))
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# format detection
|
|
# --------------------------------------------------------------------------
|
|
# Lower-cased file extension → format key. Note that `.ppt` shares the
# "pptx" key and `.htm`/`.html` share "html".
FORMAT_BY_EXT = {
    ".pdf": "pdf",
    ".docx": "docx",
    ".doc": "doc",
    ".pptx": "pptx",
    ".ppt": "pptx",
    ".htm": "html",
    ".html": "html",
    ".zip": "zip",
    ".epub": "epub",
    ".txt": "txt",
}


def detect_format(path: str | os.PathLike) -> str:
    """Map a path to its format key using only the (case-insensitive) suffix.

    Returns ``"unknown"`` when the suffix is missing or not listed in
    ``FORMAT_BY_EXT``. The file's content is never inspected.
    """
    suffix = Path(path).suffix.lower()
    try:
        return FORMAT_BY_EXT[suffix]
    except KeyError:
        return "unknown"
|
|
|
|
|
|
def is_junk(path: str | os.PathLike) -> bool:
    """Tell whether a path should be skipped as junk during a walk.

    A path is junk when its lower-cased filename is in ``JUNK_NAMES``, when
    it is a ``readme*.md`` file, or when its lower-cased suffix is in
    ``JUNK_SUFFIXES``.
    """
    entry = Path(path)
    lowered_name = entry.name.lower()
    lowered_suffix = entry.suffix.lower()
    return (
        lowered_name in JUNK_NAMES
        or (lowered_suffix == ".md" and lowered_name.startswith("readme"))
        or lowered_suffix in JUNK_SUFFIXES
    )
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# content hashing + near-duplicate elimination
|
|
# --------------------------------------------------------------------------
|
|
def _normalize_for_hash(text: str) -> str:
    """Collapse whitespace runs to single spaces, trim, and lower-case.

    A falsy ``text`` (``None`` or ``""``) normalizes to the empty string.
    """
    collapsed = re.sub(r"\s+", " ", text or "")
    trimmed = collapsed.strip()
    return trimmed.lower()


def content_hash(text: str) -> str:
    """Return the SHA1 hex digest of the normalized, UTF-8 encoded text.

    Texts that differ only in whitespace layout or letter case produce the
    same digest, which is what exact-duplicate detection relies on.
    """
    canonical = _normalize_for_hash(text)
    digest = hashlib.sha1(canonical.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def near_duplicate_ratio(a: str, b: str) -> float:
    """Score the similarity of two texts with rapidfuzz's token-sort ratio.

    Both inputs are whitespace-collapsed and lower-cased before comparison.
    rapidfuzz is imported lazily so the module can be loaded without it.
    """
    from rapidfuzz import fuzz

    left = _normalize_for_hash(a)
    right = _normalize_for_hash(b)
    return fuzz.token_sort_ratio(left, right)
|
|
|
|
|
|
def dedupe_texts(
    items: list[tuple[str, str]], threshold: float = 95.0
) -> list[tuple[str, str]]:
    """
    Filter a list of (key, text) pairs down to its first-seen distinct texts.

    A pair is dropped when its content hash equals that of an already-kept
    pair, or when its similarity to any already-kept text reaches
    ``threshold``. Input order is preserved among the survivors. The fuzzy
    comparison runs only against kept items, so its cost grows with the
    number of survivors rather than with the full input size.
    """
    survivors: list[tuple[str, str]] = []
    kept_digests: set[str] = set()
    for key, text in items:
        digest = content_hash(text)
        if digest in kept_digests:
            continue
        resembles_kept = False
        for _, earlier_text in survivors:
            if near_duplicate_ratio(text, earlier_text) >= threshold:
                resembles_kept = True
                break
        if not resembles_kept:
            kept_digests.add(digest)
            survivors.append((key, text))
    return survivors
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# preflight dependency check
|
|
# --------------------------------------------------------------------------
|
|
# Import name → pip distribution name, for every Python module that
# preflight() tries to import.
REQUIRED_PYTHON_MODULES = {
    "pdfplumber": "pdfplumber",
    "PyPDF2": "pypdf2",
    "docx": "python-docx",
    "pptx": "python-pptx",
    "bs4": "beautifulsoup4",
    "lxml": "lxml",
    "jsonschema": "jsonschema",
    "rapidfuzz": "rapidfuzz",
    "chardet": "chardet",
}


def preflight(check_ocr: bool = False) -> dict:
    """
    Probe Python and system dependencies and report what is missing.

    Returns a dict with four keys:
      'missing_python' — pip names of modules that failed to import,
      'missing_system' — required system tools that are absent,
      'warnings'       — non-fatal notes,
      'ok'             — True only when both 'missing_*' lists are empty.

    A missing libreoffice/soffice binary is only a warning. tesseract is
    looked up only when check_ocr=True, and then counts as missing_system.
    """

    def _importable(module_name: str) -> bool:
        # Actually import (rather than just locate) the module so that a
        # broken install is reported as missing too.
        try:
            importlib.import_module(module_name)
        except ImportError:
            return False
        return True

    missing_python: list[str] = [
        pip_name
        for module_name, pip_name in REQUIRED_PYTHON_MODULES.items()
        if not _importable(module_name)
    ]

    office_binary = shutil.which("libreoffice") or shutil.which("soffice")
    warnings: list[str] = (
        []
        if office_binary
        else ["libreoffice not found — legacy .doc files cannot be converted"]
    )

    tesseract_absent = check_ocr and not shutil.which("tesseract")
    missing_system: list[str] = (
        ["tesseract (OCR requested but not installed)"] if tesseract_absent else []
    )

    return {
        "ok": not (missing_python or missing_system),
        "missing_python": missing_python,
        "missing_system": missing_system,
        "warnings": warnings,
    }
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# per-format extractors
|
|
# --------------------------------------------------------------------------
|
|
def extract_pdf(path: str | os.PathLike) -> str:
    """PDF → body. Tries pdfplumber first; on any error retries with PyPDF2.

    No page cap is applied by either backend. An exception raised by the
    PyPDF2 fallback propagates to the caller.
    """
    pdf_path = str(path)
    try:
        body = _extract_pdf_pdfplumber(pdf_path)
    except Exception:
        # Best-effort fallback: whatever pdfplumber choked on (including
        # the library being absent), give PyPDF2 a chance.
        body = _extract_pdf_pypdf2(pdf_path)
    return body
|
|
|
|
|
|
def _extract_pdf_pdfplumber(path: str) -> str:
    """Extract every page of a PDF with pdfplumber into a page-marked body.

    A page whose text extraction raises is kept as an empty page so page
    numbering stays aligned with the document.
    """
    import pdfplumber

    def _page_text(page) -> str:
        try:
            return page.extract_text() or ""
        except Exception:
            return ""

    with pdfplumber.open(path) as pdf:
        # ALL pages — no max_pages
        texts = [_page_text(page) for page in pdf.pages]
    return join_pages(texts)
|
|
|
|
|
|
def _extract_pdf_pypdf2(path: str) -> str:
    """Extract every page of a PDF with PyPDF2 into a page-marked body.

    A page whose text extraction raises is kept as an empty page so page
    numbering stays aligned with the document.
    """
    import PyPDF2

    def _page_text(page) -> str:
        try:
            return page.extract_text() or ""
        except Exception:
            return ""

    with open(path, "rb") as handle:
        reader = PyPDF2.PdfReader(handle)
        # ALL pages — no max_pages. Pages are read while the file is open.
        texts = [_page_text(page) for page in reader.pages]
    return join_pages(texts)
|
|
|
|
|
|
def extract_docx(path: str | os.PathLike) -> str:
    """docx → body, one synthetic page per DOCX_PARAS_PER_PAGE paragraphs.

    Only the text of ``document.paragraphs`` is read. A document with no
    paragraphs still produces a single (empty) page.
    """
    import docx

    document = docx.Document(str(path))
    texts = [para.text for para in document.paragraphs]
    # max(..., 1) guarantees one iteration, hence one page, for empty docs.
    span = max(len(texts), 1)
    pages = [
        "\n".join(texts[offset : offset + DOCX_PARAS_PER_PAGE])
        for offset in range(0, span, DOCX_PARAS_PER_PAGE)
    ]
    return join_pages(pages)
|
|
|
|
|
|
def extract_doc(path: str | os.PathLike) -> str:
    """
    Legacy .doc → body, by converting to docx with headless libreoffice.

    The conversion writes into a temporary directory that is removed on
    return; the converted file is then read with extract_docx.

    Raises RuntimeError when no libreoffice/soffice binary is on PATH or
    when the conversion leaves no `<stem>.docx` behind. Because the
    subprocess runs with check=True and timeout=300, a non-zero exit or a
    timeout surfaces as the corresponding subprocess exception.
    """
    binary = shutil.which("libreoffice") or shutil.which("soffice")
    if binary is None:
        raise RuntimeError("libreoffice/soffice not available — cannot convert .doc")

    source = Path(path).resolve()
    with tempfile.TemporaryDirectory() as workdir:
        command = [
            binary,
            "--headless",
            "--convert-to",
            "docx",
            "--outdir",
            workdir,
            str(source),
        ]
        subprocess.run(command, check=True, capture_output=True, timeout=300)
        output = Path(workdir) / (source.stem + ".docx")
        if output.exists():
            # Must be read before the temporary directory is cleaned up.
            return extract_docx(output)
        raise RuntimeError(f"libreoffice produced no output for {source.name}")
|
|
|
|
|
|
def extract_pptx(path: str | os.PathLike) -> str:
    """pptx → body. One page per slide: shape text + speaker notes.

    For each slide, the stripped text of every shape that has a text frame
    is collected in shape order; non-empty speaker notes are appended with
    a ``[NOTES] `` prefix. A slide with no text still yields an (empty)
    page, so page numbers match slide numbers.
    """
    from pptx import Presentation

    presentation = Presentation(str(path))
    pages: list[str] = []
    for slide in presentation.slides:
        parts: list[str] = []
        for shape in slide.shapes:
            if not shape.has_text_frame:
                continue
            text = shape.text_frame.text.strip()
            if text:
                parts.append(text)
        if slide.has_notes_slide:
            # notes_text_frame is None when the notes slide has no notes
            # placeholder; treat that the same as empty notes instead of
            # crashing on `.text`.
            notes_frame = slide.notes_slide.notes_text_frame
            notes = notes_frame.text.strip() if notes_frame is not None else ""
            if notes:
                parts.append(f"[NOTES] {notes}")
        pages.append("\n".join(parts))
    return join_pages(pages)
|
|
|
|
|
|
def extract_html(path: str | os.PathLike) -> str:
    """HTML mirror page → body. Strips nav/script/style/footer/header/aside.

    The file is read as bytes and decoded with the encoding chardet
    reports (UTF-8 when it reports none, or when Python has no codec for
    the reported name); undecodable bytes are replaced. Page chrome is
    removed, then the remaining text is emitted as a single page with
    blank lines dropped and every line stripped.
    """
    import chardet
    from bs4 import BeautifulSoup

    raw = Path(path).read_bytes()
    enc = chardet.detect(raw).get("encoding") or "utf-8"
    try:
        markup = raw.decode(enc, errors="replace")
    except LookupError:
        # chardet can name an encoding Python ships no codec for; fall
        # back to UTF-8 rather than failing the whole file.
        markup = raw.decode("utf-8", errors="replace")
    soup = BeautifulSoup(markup, "lxml")

    for tag in soup(["script", "style", "nav", "footer", "header", "aside", "noscript"]):
        tag.decompose()
    # also drop common chrome identified by ARIA landmark role
    for tag in soup.find_all(attrs={"role": ["navigation", "banner", "contentinfo"]}):
        tag.decompose()

    text = soup.get_text(separator="\n")
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    return join_pages(["\n".join(lines)])
|
|
|
|
|
|
def extract_zip(path: str | os.PathLike) -> str:
    """
    zip → body. Unzips into a temp dir and extracts every supported inner
    file (in sorted path order); nested zips are handled by recursion.
    Inner files are page-renumbered into one continuous body.

    Returns "" when ``path`` is not a valid zip archive. Junk files and
    files of unknown/epub format are skipped. Any inner file — including a
    nested zip — whose extraction raises is skipped, so that one bad member
    does not discard the rest of the archive.
    """
    path = str(path)
    pages: list[str] = []
    with tempfile.TemporaryDirectory() as tmp:
        try:
            with zipfile.ZipFile(path) as zf:
                zf.extractall(tmp)
        except zipfile.BadZipFile:
            return ""
        for inner in sorted(Path(tmp).rglob("*")):
            if not inner.is_file() or is_junk(inner):
                continue
            fmt = detect_format(inner)
            if fmt in ("unknown", "epub"):
                continue
            try:
                # Nested archives go through the same guarded path as every
                # other member (previously their errors escaped the guard).
                body = extract_zip(inner) if fmt == "zip" else extract_file(inner)
            except Exception:
                continue
            # Drop the inner page numbers; join_pages renumbers from 1.
            pages.extend(text for _, text in split_pages(body))
    return join_pages(pages)
|
|
|
|
|
|
# Format key (as returned by detect_format) → extractor function.
# "txt" is absent on purpose: extract_file handles it inline.
EXTRACTORS: dict[str, Callable[[str | os.PathLike], str]] = {
    "pdf": extract_pdf,
    "docx": extract_docx,
    "doc": extract_doc,
    "pptx": extract_pptx,
    "html": extract_html,
    "zip": extract_zip,
}


def extract_file(path: str | os.PathLike) -> str:
    """Route one file to its extractor and return the page-marked body.

    Plain-text files are read as UTF-8 (undecodable bytes replaced); text
    that already contains page markers is returned untouched, otherwise it
    is wrapped as a single page. Raises ValueError for a format that has
    no registered extractor.
    """
    kind = detect_format(path)
    if kind != "txt":
        handler = EXTRACTORS.get(kind)
        if handler is None:
            raise ValueError(f"No extractor for format '{kind}': {path}")
        return handler(path)

    raw_text = Path(path).read_text(encoding="utf-8", errors="replace")
    if count_page_markers(raw_text):
        # already paginated — pass through unchanged
        return raw_text
    return join_pages([raw_text])
|