Files
game-library/scripts/chunk_sources.py
Claude Agent 66ae831c36 Rebuild extraction pipeline infrastructure (Faza 0 prep)
Implements the approved plan to replace the broken regex/index-master
extraction with an LLM-subagent pipeline. Four parallel lanes:

Lane A — scripts/extract_common.py (PDF/docx/doc/pptx/html/zip, no
  max_pages truncation), normalize_sources.py, chunk_sources.py
  (~20pg chunks + overlap, manifest registry), activity_schema.json.
Lane B — app/config_taxonomy.py (16 fixed category slugs), schema
  rebuilt from scratch in app/models/ with content_type, language,
  source_files, source_excerpt, normalized_name, extraction_confidence,
  needs_review; FTS5 + 3 triggers extended with materials_list and
  skills_developed.
Lane C — build_database.py (--rebuild, atomic swap, schema + fuzzy
  source_excerpt validation, dedup with needs_review band),
  validate_extractions.py, review_queue.py, new run_extraction.py
  orchestrator, SUBAGENT_PROMPT.md.
Lane D — search.py content_type/language filters (default search
  excludes non-game content), E7 schema-compat audit; fixed a NULL
  keywords AttributeError in _boost_search_relevance.

Removes 8 orphaned/dead scripts and app/services/parser.py +
indexer.py. Adds tests/ (70 passing, 1 skipped — libreoffice absent).

Note: Lane D made one additive edit to app/models/database.py
(_update_category_counts) to surface content_type/language in
get_filter_options, outside its nominal lane boundary but after
Lane B completed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 17:43:38 +00:00

252 lines
8.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
chunk_sources.py — split normalized data/sources/*.txt into ~20-page chunks
for subagent extraction, and maintain data/chunks/manifest.json.
Paginated text → ~20-page chunks, ~4-page overlap (plan D8).
Unpaginated text → ~10000-word windows, ~2000-word overlap.
The manifest is a cache derived from the filesystem + per-chunk state. Re-running
this script is idempotent: existing chunk states (pending/assigned/done/rejected)
survive as long as the source content hash is unchanged.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
if str(SCRIPT_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPT_DIR))
from extract_common import content_hash, split_pages # noqa: E402
# Version tag stored under "schema_version" in manifest.json.
SCHEMA_VERSION = "1.0"
# Paginated sources: default pages per chunk, and pages shared by neighbouring chunks.
PAGES_PER_CHUNK = 20
PAGE_OVERLAP = 4
# Unpaginated sources: default words per window, and words shared by neighbouring windows.
WORD_WINDOW = 10_000
WORD_OVERLAP = 2_000
# Chunk states that chunk_source_file() carries over from an existing manifest
# entry; any other recorded state is reset to "pending".
VALID_STATES = {"pending", "assigned", "done", "rejected"}
# --------------------------------------------------------------------------
# header parsing
# --------------------------------------------------------------------------
def parse_source(text: str) -> tuple[dict, str]:
    """
    Separate a normalized source file into its header fields and its body.

    Header lines of the form ``key: value`` are collected (split on the first
    colon) until a rule line made only of ``=`` characters closes the header.
    The body starts right after that rule line, unless a ``--- PAGE `` marker
    is found later, in which case the body starts at the first such marker.
    With neither a rule line nor a page marker the body is the whole text.

    Returns:
        ``(header, body)`` where ``header`` maps stripped keys to stripped
        values and ``body`` is the remaining lines joined with ``"\\n"``.
    """
    rows = text.splitlines()
    fields: dict = {}
    cut = 0
    header_open = True
    for idx, row in enumerate(rows):
        if row.startswith("--- PAGE "):
            # The first page marker always wins as the body start.
            cut = idx
            break
        if header_open:
            stripped = row.strip()
            if stripped and set(stripped) == {"="}:
                # Rule line: header is finished, body begins on the next line.
                cut = idx + 1
                header_open = False
            elif ":" in row:
                name, _, value = row.partition(":")
                fields[name.strip()] = value.strip()
    return fields, "\n".join(rows[cut:])
# --------------------------------------------------------------------------
# chunking — pure functions
# --------------------------------------------------------------------------
def chunk_pages(
    pages: list[tuple[int, str]],
    pages_per_chunk: int = PAGES_PER_CHUNK,
    overlap: int = PAGE_OVERLAP,
) -> list[dict]:
    """
    Split an ordered list of (page_no, text) into overlapping chunks.

    Consecutive chunks start ``pages_per_chunk - overlap`` pages apart, so
    neighbouring chunks share ``overlap`` pages. For
    ``0 <= overlap < pages_per_chunk`` any run of up to ``overlap + 1``
    consecutive pages appears whole in at least one chunk.

    Args:
        pages: ordered ``(page_no, text)`` pairs.
        pages_per_chunk: maximum number of pages in one chunk (must be >= 1).
        overlap: pages shared between neighbouring chunks. Values
            ``>= pages_per_chunk`` degrade to a one-page stride; negative
            values are treated as no overlap so that no page is skipped.

    Returns:
        A list of dicts with ``page_start``, ``page_end``, ``chunk_range``
        and ``text``; empty when ``pages`` is empty.

    Raises:
        ValueError: if ``pages`` is non-empty and ``pages_per_chunk`` < 1.
    """
    if not pages:
        return []
    if pages_per_chunk < 1:
        raise ValueError(
            f"pages_per_chunk must be >= 1, got {pages_per_chunk}"
        )
    # Keep the stride within [1, pages_per_chunk]: at least 1 so the loop
    # advances, at most the window size so no page falls between two chunks.
    stride = min(pages_per_chunk, max(1, pages_per_chunk - overlap))
    chunks: list[dict] = []
    total = len(pages)
    for start in range(0, total, stride):
        window = pages[start : start + pages_per_chunk]
        first, last = window[0][0], window[-1][0]
        text = "".join(
            f"\n--- PAGE {num} ---\n{txt}\n" for num, txt in window
        )
        chunks.append(
            {"page_start": first, "page_end": last,
             "chunk_range": f"pages {first}-{last}", "text": text}
        )
        # This window already reaches the last page; a further chunk would
        # only repeat the tail.
        if start + pages_per_chunk >= total:
            break
    return chunks
def chunk_words(
    text: str, window: int = WORD_WINDOW, overlap: int = WORD_OVERLAP
) -> list[dict]:
    """
    Split unpaginated text into overlapping word windows.

    The text is split on whitespace and each chunk's text is its words joined
    with single spaces. Consecutive windows start ``window - overlap`` words
    apart.

    Args:
        text: body text without page markers.
        window: maximum number of words per chunk (must be >= 1).
        overlap: words shared between neighbouring chunks. Values
            ``>= window`` degrade to a one-word stride; negative values are
            treated as no overlap so that no word is skipped.

    Returns:
        A list of dicts with ``word_start`` (inclusive index), ``word_end``
        (exclusive index), ``chunk_range`` and ``text``; empty when the text
        contains no words.

    Raises:
        ValueError: if the text contains words and ``window`` < 1.
    """
    words = text.split()
    if not words:
        return []
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")
    # Keep the stride within [1, window]: at least 1 so the loop advances,
    # at most the window size so no word falls between two chunks.
    stride = min(window, max(1, window - overlap))
    chunks: list[dict] = []
    total = len(words)
    for start in range(0, total, stride):
        seg = words[start : start + window]
        end = start + len(seg)
        chunks.append(
            {"word_start": start, "word_end": end,
             "chunk_range": f"words {start}-{end}", "text": " ".join(seg)}
        )
        # This window already reaches the last word; stop here.
        if start + window >= total:
            break
    return chunks
def make_chunks(source_text: str) -> list[dict]:
    """
    Chunk the body of one normalized source file.

    The header is parsed off and discarded. If split_pages() finds pages in
    the body, page-based chunking is used with the default sizes; otherwise
    the body is chunked by word windows.
    """
    body = parse_source(source_text)[1]
    # NOTE(review): split_pages comes from extract_common; presumably it
    # returns (page_no, text) pairs and an empty result when the body has no
    # page markers — confirm against that module.
    paginated = split_pages(body)
    return chunk_pages(paginated) if paginated else chunk_words(body)
# --------------------------------------------------------------------------
# manifest
# --------------------------------------------------------------------------
def _empty_manifest() -> dict:
    """Return a fresh manifest: the current schema version and no chunks."""
    skeleton: dict = {}
    skeleton["schema_version"] = SCHEMA_VERSION
    skeleton["chunks"] = {}
    return skeleton
def load_manifest(manifest_path: Path) -> dict:
    """
    Load the chunk manifest, falling back to an empty one.

    A missing file yields an empty manifest silently. A file that cannot be
    read, is not valid UTF-8 JSON, or does not have the expected shape (a JSON
    object whose ``"chunks"`` value, if present, is an object) also yields an
    empty manifest, but a warning is printed to stderr because all previously
    recorded chunk states are discarded in that case.

    Args:
        manifest_path: location of manifest.json.

    Returns:
        The manifest dict, guaranteed to contain ``"schema_version"`` and a
        dict under ``"chunks"``.
    """
    if not manifest_path.exists():
        return _empty_manifest()
    try:
        data = json.loads(manifest_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(
            f"warning: ignoring unreadable manifest {manifest_path}: {exc}",
            file=sys.stderr,
        )
        return _empty_manifest()
    # Valid JSON can still be the wrong shape (list, null, "chunks": [...]);
    # callers index manifest["chunks"] as a dict, so reject it here.
    if not isinstance(data, dict) or not isinstance(data.get("chunks", {}), dict):
        print(
            f"warning: ignoring malformed manifest {manifest_path}",
            file=sys.stderr,
        )
        return _empty_manifest()
    data.setdefault("schema_version", SCHEMA_VERSION)
    data.setdefault("chunks", {})
    return data
def save_manifest(manifest: dict, manifest_path: Path) -> None:
    """
    Write the manifest as indented UTF-8 JSON, replacing any previous file.

    The JSON is written to a sibling ``<name>.tmp`` file and then moved over
    ``manifest_path``, so an interrupted run leaves either the old manifest or
    the new one, never a truncated file. Parent directories are created as
    needed.

    Args:
        manifest: JSON-serializable manifest dict.
        manifest_path: destination path of manifest.json.
    """
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(manifest, indent=2, ensure_ascii=False)
    tmp_path = manifest_path.with_name(manifest_path.name + ".tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        # Path.replace overwrites the destination in a single rename step.
        tmp_path.replace(manifest_path)
    except BaseException:
        # Do not leave a half-written temp file behind on failure.
        tmp_path.unlink(missing_ok=True)
        raise
def chunk_source_file(
    source_path: Path, chunks_dir: Path, manifest: dict
) -> list[str]:
    """
    Chunk one source file and register its chunks in ``manifest``.

    The source ``<id>.txt`` is read as UTF-8 (undecodable bytes replaced),
    split with make_chunks(), and each chunk is written to
    ``<chunks_dir>/<id>/<id>.partNN.txt``. Every chunk gets a manifest entry
    under ``manifest["chunks"]``; an existing entry's state is kept only when
    its recorded source hash equals the current one and the state is one of
    VALID_STATES, otherwise the state becomes "pending".

    Args:
        source_path: path of the normalized source file.
        chunks_dir: root directory for chunk output.
        manifest: manifest dict, mutated in place.

    Returns:
        The chunk keys written, in part order.
    """
    source_id = source_path.stem
    raw = source_path.read_text(encoding="utf-8", errors="replace")
    digest = content_hash(raw)
    pieces = make_chunks(raw)

    target_dir = chunks_dir / source_id
    target_dir.mkdir(parents=True, exist_ok=True)

    keys: list[str] = []
    for part_no, piece in enumerate(pieces, start=1):
        key = f"{source_id}.part{part_no:02d}"
        path = target_dir / f"{key}.txt"
        path.write_text(piece["text"], encoding="utf-8")

        registry = manifest["chunks"]
        previous = registry.get(key)
        # Carry the old state over only for an unchanged source and a
        # recognised state value.
        keep_state = (
            bool(previous)
            and previous.get("source_hash") == digest
            and previous.get("state") in VALID_STATES
        )
        registry[key] = {
            "source_id": source_id,
            "source_hash": digest,
            "part": part_no,
            "chunk_range": piece["chunk_range"],
            # Stored relative to the parent of chunks_dir.
            "chunk_file": str(path.relative_to(chunks_dir.parent)),
            "expected_json": f"{key}.json",
            "state": previous["state"] if keep_state else "pending",
        }
        keys.append(key)
    return keys
def prune_stale(manifest: dict, live_keys: set[str]) -> list[str]:
    """
    Remove manifest entries whose key is not in ``live_keys``.

    ``manifest["chunks"]`` is modified in place. Returns the removed keys in
    their original manifest order.
    """
    registry = manifest["chunks"]
    dropped: list[str] = []
    # Iterate over a snapshot of the keys because entries are removed as we go.
    for key in list(registry):
        if key in live_keys:
            continue
        registry.pop(key)
        dropped.append(key)
    return dropped
# --------------------------------------------------------------------------
# CLI
# --------------------------------------------------------------------------
def run(sources_dir: Path, chunks_dir: Path) -> dict:
    """
    Chunk every ``*.txt`` in ``sources_dir`` and refresh the manifest.

    Sources are processed in sorted filename order. Manifest entries for
    chunks that were not produced by this run are pruned, and the manifest is
    saved to ``<chunks_dir>/manifest.json``.

    Args:
        sources_dir: directory holding the normalized source files.
        chunks_dir: directory receiving chunk files and manifest.json.

    Returns:
        A summary dict with ``sources`` (files processed), ``chunks`` (chunk
        keys written), ``pruned`` (manifest entries removed) and ``states``
        (count of manifest entries per state).

    Raises:
        FileNotFoundError: if ``sources_dir`` is not an existing directory.
    """
    # Without this guard a mistyped sources path globs to nothing, every
    # manifest entry is then pruned as stale, and the emptied manifest is
    # saved — silently discarding all recorded chunk states.
    if not sources_dir.is_dir():
        raise FileNotFoundError(
            f"sources directory does not exist or is not a directory: {sources_dir}"
        )
    manifest_path = chunks_dir / "manifest.json"
    manifest = load_manifest(manifest_path)
    live_keys: set[str] = set()
    source_files = sorted(sources_dir.glob("*.txt"))
    for src in source_files:
        live_keys.update(chunk_source_file(src, chunks_dir, manifest))
    stale = prune_stale(manifest, live_keys)
    save_manifest(manifest, manifest_path)
    # Tally the remaining manifest entries by state for the summary.
    states: dict[str, int] = {}
    for meta in manifest["chunks"].values():
        states[meta["state"]] = states.get(meta["state"], 0) + 1
    return {
        "sources": len(source_files),
        "chunks": len(live_keys),
        "pruned": len(stale),
        "states": states,
    }
def main(argv: list[str] | None = None) -> int:
    """
    Command-line entry point: chunk the sources and print a summary.

    Args:
        argv: argument list to parse; ``None`` lets argparse read the
            process arguments.

    Returns:
        0 after the summary has been printed.
    """
    cli = argparse.ArgumentParser(description="Chunk normalized sources.")
    cli.add_argument("--sources", default="data/sources", help="sources dir")
    cli.add_argument("--chunks", default="data/chunks", help="chunks output dir")
    opts = cli.parse_args(argv)

    report = run(Path(opts.sources), Path(opts.chunks))

    # Headline counters, printed in a fixed order.
    headline = (
        ("sources processed", "sources"),
        ("chunks written", "chunks"),
        ("stale pruned", "pruned"),
    )
    for label, field in headline:
        print(f"{label} : {report[field]}")

    # Per-state counts, ordered by state name.
    per_state = report["states"]
    for state in sorted(per_state):
        count = per_state[state]
        print(f" {state:<10}: {count}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())