#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ chunk_sources.py — split normalized data/sources/*.txt into ~20-page chunks for subagent extraction, and maintain data/chunks/manifest.json. Paginated text → ~20-page chunks, ~4-page overlap (plan D8). Unpaginated text → ~10000-word windows, ~2000-word overlap. The manifest is a cache derived from the filesystem + per-chunk state. Re-running this script is idempotent: existing chunk states (pending/assigned/done/rejected) survive as long as the source content hash is unchanged. """ from __future__ import annotations import argparse import json import sys from pathlib import Path SCRIPT_DIR = Path(__file__).resolve().parent if str(SCRIPT_DIR) not in sys.path: sys.path.insert(0, str(SCRIPT_DIR)) from extract_common import content_hash, split_pages # noqa: E402 SCHEMA_VERSION = "1.0" PAGES_PER_CHUNK = 20 PAGE_OVERLAP = 4 WORD_WINDOW = 10_000 WORD_OVERLAP = 2_000 VALID_STATES = {"pending", "assigned", "done", "rejected"} # -------------------------------------------------------------------------- # header parsing # -------------------------------------------------------------------------- def parse_source(text: str) -> tuple[dict, str]: """Split a normalized source file into (header_dict, body).""" lines = text.splitlines() header: dict = {} body_start = 0 in_header = True for i, line in enumerate(lines): if line.startswith("--- PAGE "): body_start = i break if not in_header: continue if set(line.strip()) == {"="} and line.strip(): body_start = i + 1 in_header = False # header ends at the rule line continue if ":" in line: key, _, val = line.partition(":") header[key.strip()] = val.strip() body = "\n".join(lines[body_start:]) return header, body # -------------------------------------------------------------------------- # chunking — pure functions # -------------------------------------------------------------------------- def chunk_pages( pages: list[tuple[int, str]], pages_per_chunk: int = PAGES_PER_CHUNK, overlap: int = PAGE_OVERLAP, ) -> list[dict]: """ Split an ordered list of (page_no, text) into overlapping chunks. stride = pages_per_chunk - overlap. Because stride < pages_per_chunk - 1, any activity straddling a page boundary appears whole in at least one chunk. """ if not pages: return [] stride = max(1, pages_per_chunk - overlap) chunks: list[dict] = [] i = 0 n = len(pages) while i < n: window = pages[i : i + pages_per_chunk] first, last = window[0][0], window[-1][0] text = "".join( f"\n--- PAGE {num} ---\n{txt}\n" for num, txt in window ) chunks.append( {"page_start": first, "page_end": last, "chunk_range": f"pages {first}-{last}", "text": text} ) if i + pages_per_chunk >= n: break i += stride return chunks def chunk_words( text: str, window: int = WORD_WINDOW, overlap: int = WORD_OVERLAP ) -> list[dict]: """Split unpaginated text into overlapping word windows.""" words = text.split() if not words: return [] stride = max(1, window - overlap) chunks: list[dict] = [] i = 0 n = len(words) while i < n: seg = words[i : i + window] chunks.append( {"word_start": i, "word_end": i + len(seg), "chunk_range": f"words {i}-{i + len(seg)}", "text": " ".join(seg)} ) if i + window >= n: break i += stride return chunks def make_chunks(source_text: str) -> list[dict]: """Chunk one normalized source file. Picks page- or word-windowing.""" _, body = parse_source(source_text) pages = split_pages(body) if pages: return chunk_pages(pages) return chunk_words(body) # -------------------------------------------------------------------------- # manifest # -------------------------------------------------------------------------- def _empty_manifest() -> dict: return {"schema_version": SCHEMA_VERSION, "chunks": {}} def load_manifest(manifest_path: Path) -> dict: if manifest_path.exists(): try: data = json.loads(manifest_path.read_text(encoding="utf-8")) data.setdefault("schema_version", SCHEMA_VERSION) data.setdefault("chunks", {}) return data except (json.JSONDecodeError, OSError): pass return _empty_manifest() def save_manifest(manifest: dict, manifest_path: Path) -> None: manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text( json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8" ) def chunk_source_file( source_path: Path, chunks_dir: Path, manifest: dict ) -> list[str]: """ Chunk one data/sources/.txt → data/chunks//.partNN.txt and register every chunk in `manifest`. Preserves prior state when the source content hash is unchanged. Returns the list of chunk keys written. """ source_id = source_path.stem text = source_path.read_text(encoding="utf-8", errors="replace") src_hash = content_hash(text) chunks = make_chunks(text) out_dir = chunks_dir / source_id out_dir.mkdir(parents=True, exist_ok=True) written: list[str] = [] for idx, chunk in enumerate(chunks, 1): key = f"{source_id}.part{idx:02d}" chunk_file = out_dir / f"{key}.txt" chunk_file.write_text(chunk["text"], encoding="utf-8") prior = manifest["chunks"].get(key) # preserve state only if the source content is unchanged if prior and prior.get("source_hash") == src_hash and \ prior.get("state") in VALID_STATES: state = prior["state"] else: state = "pending" manifest["chunks"][key] = { "source_id": source_id, "source_hash": src_hash, "part": idx, "chunk_range": chunk["chunk_range"], "chunk_file": str(chunk_file.relative_to(chunks_dir.parent)), "expected_json": f"{key}.json", "state": state, } written.append(key) return written def prune_stale(manifest: dict, live_keys: set[str]) -> list[str]: """Drop manifest entries whose chunk no longer exists on disk.""" stale = [k for k in manifest["chunks"] if k not in live_keys] for k in stale: del manifest["chunks"][k] return stale # -------------------------------------------------------------------------- # CLI # -------------------------------------------------------------------------- def run(sources_dir: Path, chunks_dir: Path) -> dict: """Chunk every *.txt in sources_dir. Returns a summary dict.""" manifest_path = chunks_dir / "manifest.json" manifest = load_manifest(manifest_path) live_keys: set[str] = set() source_files = sorted(sources_dir.glob("*.txt")) for src in source_files: live_keys.update(chunk_source_file(src, chunks_dir, manifest)) stale = prune_stale(manifest, live_keys) save_manifest(manifest, manifest_path) states: dict[str, int] = {} for meta in manifest["chunks"].values(): states[meta["state"]] = states.get(meta["state"], 0) + 1 return { "sources": len(source_files), "chunks": len(live_keys), "pruned": len(stale), "states": states, } def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Chunk normalized sources.") parser.add_argument("--sources", default="data/sources", help="sources dir") parser.add_argument("--chunks", default="data/chunks", help="chunks output dir") args = parser.parse_args(argv) summary = run(Path(args.sources), Path(args.chunks)) print(f"sources processed : {summary['sources']}") print(f"chunks written : {summary['chunks']}") print(f"stale pruned : {summary['pruned']}") for state, count in sorted(summary["states"].items()): print(f" {state:<10}: {count}") return 0 if __name__ == "__main__": raise SystemExit(main())