Implements the approved plan to replace the broken regex/index-master extraction with an LLM-subagent pipeline. Four parallel lanes: Lane A — scripts/extract_common.py (PDF/docx/doc/pptx/html/zip, no max_pages truncation), normalize_sources.py, chunk_sources.py (~20pg chunks + overlap, manifest registry), activity_schema.json. Lane B — app/config_taxonomy.py (16 fixed category slugs), schema rebuilt from scratch in app/models/ with content_type, language, source_files, source_excerpt, normalized_name, extraction_confidence, needs_review; FTS5 + 3 triggers extended with materials_list and skills_developed. Lane C — build_database.py (--rebuild, atomic swap, schema + fuzzy source_excerpt validation, dedup with needs_review band), validate_extractions.py, review_queue.py, new run_extraction.py orchestrator, SUBAGENT_PROMPT.md. Lane D — search.py content_type/language filters (default search excludes non-game content), E7 schema-compat audit; fixed a NULL keywords AttributeError in _boost_search_relevance. Removes 8 orphaned/dead scripts and app/services/parser.py + indexer.py. Adds tests/ (70 passing, 1 skipped — libreoffice absent). Note: Lane D made one additive edit to app/models/database.py (_update_category_counts) to surface content_type/language in get_filter_options, outside its nominal lane boundary but after Lane B completed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
180 lines
6.4 KiB
Python
180 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
import_common.py — shared helpers for the import / validation side of the
|
|
extraction pipeline (Lane C).
|
|
|
|
Used by build_database.py and validate_extractions.py:
|
|
* JSON-schema validation of subagent extraction files,
|
|
* the anti-hallucination source_excerpt substring check (E5),
|
|
* locating the source chunk that an extraction file came from,
|
|
* the stable content key used by the needs_review queue.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import unicodedata
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
# Absolute directory holding this module, and the directory one level above
# it (used as the repository root).
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent

# Default location of the activity JSON schema: next to this module.
DEFAULT_SCHEMA_PATH = SCRIPT_DIR.joinpath("activity_schema.json")

# rapidfuzz.fuzz.partial_ratio scores on a 0..100 scale. An excerpt is accepted
# as a genuine quote from the source when its score against the chunk text is
# greater than or equal to this value.
EXCERPT_MATCH_THRESHOLD = 90.0
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# schema validation
|
|
# --------------------------------------------------------------------------
|
|
def load_schema(schema_path: str | Path = DEFAULT_SCHEMA_PATH) -> dict:
    """
    Read and parse the activity JSON schema (the one produced by Lane A).

    `schema_path` may be a string or a Path and defaults to
    DEFAULT_SCHEMA_PATH. The file is decoded as UTF-8 and the parsed JSON
    document is returned. Missing files and malformed JSON propagate as the
    usual OSError / json.JSONDecodeError.
    """
    with Path(schema_path).open(encoding="utf-8") as schema_file:
        return json.load(schema_file)
|
|
|
|
|
|
def validate_extraction(data: Any, schema: dict) -> list[str]:
    """
    Check one parsed extraction document against `schema` (Draft 7 rules).

    Every violation is reported as "<location>: <message>", where the
    location is the slash-joined path into `data` ("<root>" for top-level
    problems). Messages are ordered by that path. An empty list means the
    document is valid.
    """
    # Imported lazily so the module can be loaded without jsonschema installed.
    import jsonschema

    def _describe(problem) -> str:
        # Path segments may be ints (array indices) or strs (object keys).
        where = "/".join(map(str, problem.path)) or "<root>"
        return f"{where}: {problem.message}"

    checker = jsonschema.Draft7Validator(schema)
    problems = list(checker.iter_errors(data))
    problems.sort(key=lambda e: list(e.path))
    return [_describe(problem) for problem in problems]
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# excerpt verification (E5 — anti-hallucination)
|
|
# --------------------------------------------------------------------------
|
|
def _normalize_text(text: str) -> str:
|
|
return re.sub(r"\s+", " ", (text or "")).strip().lower()
|
|
|
|
|
|
def excerpt_score(excerpt: str, chunk_text: str) -> float:
    """
    Best fuzzy-substring score (0..100) of `excerpt` inside `chunk_text`.

    Both strings are normalised with `_normalize_text` (whitespace collapsed,
    lowercased) before scoring. Returns 0.0 when either side is empty *after*
    normalisation, so a whitespace-only excerpt or chunk can never be scored
    as a match.
    """
    # Imported lazily so the module can be loaded without rapidfuzz installed.
    from rapidfuzz import fuzz

    needle = _normalize_text(excerpt)
    haystack = _normalize_text(chunk_text)
    # Guard on the normalised values: checking the raw arguments would let
    # "   " through and hand an empty string to partial_ratio.
    if not needle or not haystack:
        return 0.0
    # NOTE(review): partial_ratio aligns the shorter string inside the longer
    # one, so an excerpt longer than the whole chunk is scored "backwards".
    # Chunks are expected to be far longer than excerpts; confirm with callers.
    return float(fuzz.partial_ratio(needle, haystack))
|
|
|
|
|
|
def excerpt_matches(
    excerpt: str, chunk_text: str, threshold: float = EXCERPT_MATCH_THRESHOLD
) -> bool:
    """
    Decide whether `excerpt` occurs (fuzzily) as a substring of `chunk_text`.

    True when the `excerpt_score` of the pair is at least `threshold`
    (0..100 scale, defaulting to EXCERPT_MATCH_THRESHOLD).
    """
    score = excerpt_score(excerpt, chunk_text)
    return score >= threshold
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# locating the source chunk an extraction file came from
|
|
# --------------------------------------------------------------------------
|
|
def chunk_key_for(json_path: Path, header: Optional[dict]) -> str:
    """
    Work out which chunk an extraction file belongs to.

    A truthy `chunk_key` entry in `header` wins and is returned as a string.
    Without one (no header, key absent, or value empty) the key is the stem of
    `json_path`, since extraction files are named `<chunk_key>.json`.
    """
    declared = header.get("chunk_key") if header else None
    return str(declared) if declared else json_path.stem
|
|
|
|
|
|
def source_id_for(chunk_key: str, header: Optional[dict]) -> str:
    """
    Resolve the source id; `<source_id>.partNN` → `<source_id>`.

    A truthy `source_id` entry in `header` wins and is returned as a string.
    Otherwise the id is derived from `chunk_key` by removing a trailing
    `.part<digits>` suffix. A key without such a suffix is returned
    unchanged, so ids that merely contain ".part" (e.g. "doc.partners") are
    not truncated.
    """
    declared = header.get("source_id") if header else None
    if declared:
        return str(declared)
    # Only a genuine chunk suffix at the very end is stripped; \Z (not $)
    # so a trailing newline cannot hide the suffix boundary.
    return re.sub(r"\.part\d+\Z", "", chunk_key)
|
|
|
|
|
|
def find_chunk_text(
    json_path: Path, header: Optional[dict], chunks_dir: Path
) -> Optional[str]:
    """
    Return the text of the source chunk for an extraction file, or None.

    Looks for `<chunks_dir>/<source_id>/<chunk_key>.txt` first. If that file
    does not exist, falls back to a recursive search of `chunks_dir` for a
    file literally named `<chunk_key>.txt`; when several exist the one whose
    path sorts first is used, so the result is the same on every run.

    Files are decoded as UTF-8 with undecodable bytes replaced.
    """
    # Function-scope import: only needed to escape the fallback pattern.
    import glob

    chunk_key = chunk_key_for(json_path, header)
    source_id = source_id_for(chunk_key, header)
    filename = f"{chunk_key}.txt"

    candidate = chunks_dir / source_id / filename
    if candidate.is_file():
        return candidate.read_text(encoding="utf-8", errors="replace")

    # Chunk keys derive from source file names and may contain glob
    # metacharacters ("[", "]", "*", "?"); escape them so the search is for
    # the literal file name rather than a wildcard pattern.
    literal_pattern = glob.escape(filename)
    for match in sorted(chunks_dir.rglob(literal_pattern)):
        # Skip directories that happen to carry the same name.
        if match.is_file():
            return match.read_text(encoding="utf-8", errors="replace")
    return None
|
|
|
|
|
|
def source_path_for(source_id: str, sources_dir: Path) -> Optional[str]:
    """
    Recover the original document path recorded in a normalized source file.

    `<sources_dir>/<source_id>.txt` is expected to begin with a header that
    contains a `SOURCE: <relative path>` line. The header is scanned line by
    line and the text after the first colon of that line is returned,
    stripped. Scanning stops at the first line starting with "=" or
    "--- PAGE ", i.e. once the header is over.

    Returns None when the file is missing, unreadable, or has no SOURCE line
    in its header.
    """
    header_file = sources_dir / f"{source_id}.txt"
    if not header_file.is_file():
        return None

    end_of_header = ("=", "--- PAGE ")
    try:
        with header_file.open(encoding="utf-8", errors="replace") as stream:
            for raw_line in stream:
                if raw_line.startswith("SOURCE:"):
                    # Split on the first colon only: the path itself may
                    # contain further colons.
                    _, _, value = raw_line.partition(":")
                    return value.strip()
                if raw_line.startswith(end_of_header):
                    break
    except OSError:
        return None
    return None
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# stable content key for the needs_review queue (plan §5c)
|
|
# --------------------------------------------------------------------------
|
|
def normalize_name(name: str) -> str:
    """
    Build the dedup key for an activity name.

    The name is NFKD-decomposed, combining marks (accents and similar) are
    dropped, and the remainder is lowercased, trimmed and has whitespace runs
    collapsed to single spaces. Falsy input yields "".
    """
    if not name:
        return ""
    base_chars = [
        ch
        for ch in unicodedata.normalize("NFKD", name)
        if unicodedata.combining(ch) == 0
    ]
    lowered = "".join(base_chars).lower().strip()
    return re.sub(r"\s+", " ", lowered)
|
|
|
|
|
|
def content_key(normalized_name: str, language: Optional[str], description: str) -> str:
    """
    Stable hash identifying a row for the review queue.

    The key is the SHA-1 hex digest of the normalized name, the language
    (empty when None) and the whitespace/case-normalised description, joined
    with the ASCII unit separator (0x1F) and encoded as UTF-8.

    Per the pipeline design, only borderline-kept-separate rows and legacy
    `.doc` rows carry needs_review and neither is auto-merged, so this triple
    stays the same across rebuilds.
    """
    language_part = language or ""
    description_part = _normalize_text(description)
    payload = f"{normalized_name}\x1f{language_part}\x1f{description_part}"

    digest = hashlib.sha1()
    digest.update(payload.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# iteration
|
|
# --------------------------------------------------------------------------
|
|
def iter_extraction_files(extracted_dir: Path):
    """
    Yield the extraction files that sit directly inside `extracted_dir`.

    Only regular files matching `*.json` at the top level are produced, in
    sorted path order; sub-directories such as `_rejected/` are never
    descended into. Yields nothing when `extracted_dir` is not a directory.
    """
    if extracted_dir.is_dir():
        ordered = sorted(extracted_dir.glob("*.json"))
        yield from (entry for entry in ordered if entry.is_file())
|