#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
import_common.py — shared helpers for the import / validation side of the
extraction pipeline (Lane C).

Used by build_database.py and validate_extractions.py:

  * JSON-schema validation of subagent extraction files,
  * the anti-hallucination source_excerpt substring check (E5),
  * locating the source chunk that an extraction file came from,
  * the stable content key used by the needs_review queue.
"""

from __future__ import annotations

import glob
import hashlib
import json
import re
import unicodedata
from pathlib import Path
from typing import Any, Iterator, Optional

SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
DEFAULT_SCHEMA_PATH = SCRIPT_DIR / "activity_schema.json"

# rapidfuzz.partial_ratio is on a 0..100 scale; an excerpt counts as a real
# quote from the source when it scores at least this against the chunk text.
EXCERPT_MATCH_THRESHOLD = 90.0

# Runs of any whitespace; collapsed to a single space by the normalizers below.
_WHITESPACE_RE = re.compile(r"\s+")


# --------------------------------------------------------------------------
# schema validation
# --------------------------------------------------------------------------

def load_schema(schema_path: str | Path = DEFAULT_SCHEMA_PATH) -> dict:
    """Load the activity JSON schema produced by Lane A.

    Reads `schema_path` as UTF-8 text and parses it as JSON.
    """
    schema_text = Path(schema_path).read_text(encoding="utf-8")
    return json.loads(schema_text)


def validate_extraction(data: Any, schema: dict) -> list[str]:
    """
    Validate one parsed extraction file against `schema`.

    Returns a list of human-readable error strings of the form
    "<location>: <message>", ordered by instance path; an empty list means
    the data is valid. Errors on the document itself (empty path) are
    reported under the location "<root>".
    """
    # Imported lazily so the rest of this module works without jsonschema.
    import jsonschema

    validator = jsonschema.Draft7Validator(schema)
    errors: list[str] = []
    # err.path is a deque; convert to a list so the sort key is comparable.
    for err in sorted(validator.iter_errors(data), key=lambda e: list(e.path)):
        # An empty path joins to "", which would leave the message with no
        # location at all — label it explicitly instead.
        location = "/".join(str(part) for part in err.path) or "<root>"
        errors.append(f"{location}: {err.message}")
    return errors


# --------------------------------------------------------------------------
# excerpt verification (E5 — anti-hallucination)
# --------------------------------------------------------------------------

def _normalize_text(text: str) -> str:
    """Collapse whitespace runs to one space, trim, and lowercase.

    Falsy input (None, "") normalizes to "". content_key() hashes the output
    of this function, so its behaviour must stay stable.
    """
    return _WHITESPACE_RE.sub(" ", (text or "")).strip().lower()


def excerpt_score(excerpt: str, chunk_text: str) -> float:
    """Best fuzzy-substring score (0..100) of `excerpt` inside `chunk_text`.

    Both sides are whitespace/case-normalized first. If either side is empty
    after normalization the score is 0.0: a blank or whitespace-only excerpt
    is never evidence of a real quote.
    """
    needle = _normalize_text(excerpt)
    haystack = _normalize_text(chunk_text)
    if not needle or not haystack:
        return 0.0

    # Imported only when there is something to compare, so the trivial cases
    # above do not depend on rapidfuzz being installed.
    from rapidfuzz import fuzz

    return float(fuzz.partial_ratio(needle, haystack))


def excerpt_matches(
    excerpt: str, chunk_text: str, threshold: float = EXCERPT_MATCH_THRESHOLD
) -> bool:
    """True when `excerpt` appears (fuzzily) as a substring of `chunk_text`.

    "Appears" means excerpt_score() is at least `threshold` (0..100 scale).
    """
    return excerpt_score(excerpt, chunk_text) >= threshold


# --------------------------------------------------------------------------
# locating the source chunk an extraction file came from
# --------------------------------------------------------------------------

def chunk_key_for(json_path: Path, header: Optional[dict]) -> str:
    """
    Resolve the chunk key for an extraction file.

    Prefers a non-empty `chunk_key` in the header, otherwise falls back to
    the JSON file stem (extraction files are named `<chunk_key>.json`).
    """
    if header and header.get("chunk_key"):
        return str(header["chunk_key"])
    return json_path.stem


def source_id_for(chunk_key: str, header: Optional[dict]) -> str:
    """Resolve the source id; `<source_id>.partNN` → `<source_id>`.

    Prefers a non-empty `source_id` in the header. Otherwise everything
    before the last ".part" in `chunk_key` is used; a key without ".part"
    is returned unchanged.
    """
    if header and header.get("source_id"):
        return str(header["source_id"])
    # chunk keys look like "<source_id>.partNN"
    return chunk_key.rsplit(".part", 1)[0]


def find_chunk_text(
    json_path: Path, header: Optional[dict], chunks_dir: Path
) -> Optional[str]:
    """
    Return the text of the source chunk for an extraction file, or None.

    Looks for `<chunks_dir>/<source_id>/<chunk_key>.txt` first, then falls
    back to a recursive search of `chunks_dir` for `<chunk_key>.txt`. When
    the fallback finds several files, the one with the smallest path is
    used so the result does not depend on filesystem listing order.
    Files are decoded as UTF-8 with undecodable bytes replaced.
    """
    chunk_key = chunk_key_for(json_path, header)
    source_id = source_id_for(chunk_key, header)

    candidate = chunks_dir / source_id / f"{chunk_key}.txt"
    if candidate.is_file():
        return candidate.read_text(encoding="utf-8", errors="replace")

    # The key is a literal file name, not a pattern: escape glob
    # metacharacters ("[", "*", "?") so keys containing them still match.
    pattern = f"{glob.escape(chunk_key)}.txt"
    # Keep regular files only (a directory would make read_text fail) and
    # sort for a deterministic pick.
    matches = sorted(p for p in chunks_dir.rglob(pattern) if p.is_file())
    if matches:
        return matches[0].read_text(encoding="utf-8", errors="replace")
    return None


def source_path_for(source_id: str, sources_dir: Path) -> Optional[str]:
    """
    Read the original `SOURCE:` path from a normalized source header.

    `<sources_dir>/<source_id>.txt` is expected to start with a
    `SOURCE: <path>` line. Scanning stops at the first line beginning with
    "=" or "--- PAGE ". Returns None when the file is missing, cannot be
    read, or has no `SOURCE:` line before that point.
    """
    src_file = sources_dir / f"{source_id}.txt"
    if not src_file.is_file():
        return None
    try:
        # utf-8-sig: a leading byte-order mark would otherwise be glued to
        # the first line and hide its "SOURCE:" prefix.
        with src_file.open(encoding="utf-8-sig", errors="replace") as fh:
            for line in fh:
                if line.startswith("SOURCE:"):
                    return line.split(":", 1)[1].strip()
                if line.startswith(("=", "--- PAGE ")):
                    break
    except OSError:
        return None
    return None


# --------------------------------------------------------------------------
# stable content key for the needs_review queue (plan §5c)
# --------------------------------------------------------------------------

def normalize_name(name: str) -> str:
    """Diacritic-free, lowercased, whitespace-collapsed name (dedup key).

    The name is NFKD-decomposed and combining marks are dropped before
    lowercasing, trimming and collapsing whitespace. Falsy input gives "".
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", name)
    without_marks = "".join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    )
    return _WHITESPACE_RE.sub(" ", without_marks.lower().strip())


def content_key(normalized_name: str, language: Optional[str], description: str) -> str:
    """
    Stable hash identifying a row for the review queue.

    Only borderline-kept-separate rows and legacy `.doc` rows ever carry
    needs_review, and neither is auto-merged — so their
    (normalized_name, language, description) triple is stable across rebuilds.

    The key is the SHA-1 hex digest of the three fields joined with the
    unit-separator character; a missing language counts as "" and the
    description is whitespace/case-normalized before hashing.
    """
    payload = f"{normalized_name}\x1f{language or ''}\x1f{_normalize_text(description)}"
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


# --------------------------------------------------------------------------
# iteration
# --------------------------------------------------------------------------

def iter_extraction_files(extracted_dir: Path) -> Iterator[Path]:
    """Yield every *.json directly under `extracted_dir` (skips _rejected/).

    The search is not recursive, so sub-directories are never entered.
    Paths are yielded in sorted order; nothing is yielded when
    `extracted_dir` is not a directory.
    """
    if not extracted_dir.is_dir():
        return
    for path in sorted(extracted_dir.glob("*.json")):
        if path.is_file():
            yield path