#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
enrichment_wave.py — throttled, window-paced wave preparation for the corpus
enrichment pipeline.

The enrichment backlog (~9541 keys) does NOT fit in one 5-hour Anthropic usage
window. Launching all remaining batches at once always runs the window to
EXHAUSTION (the "subagent completed without calling StructuredOutput"
signature), consuming 100% and blocking other work. There is no readable
real-time window meter, so pacing must be BLIND: cap each wave to a fixed KEY
COUNT (sized to ~75% of empirical window capacity, ~950 keys), and let an
external scheduler (cron, every 6h) space waves across windows.

This script encapsulates the reconcile + bounded-wave preparation that used to
live as ad-hoc inline Python. It does NOT call the LLM and does NOT launch
workflows — it only prepares files on disk and prints what to launch.

Modes:
  --status
      read-only: print done / missing / pct
  --prepare --keys N --shards K
      drop corrupt parts; take the FIRST N missing keys (sorted,
      deterministic); write batch files for ONLY those; regenerate K shard JS
      files covering exactly those batches; print machine-greppable
      WAVE:/SHARD: lines.

Idempotency:
  a key is "done" iff data/enrichment_parts/<key>.json exists AND parses.
  Re-running --prepare with the same args is deterministic (same sorted
  first-N keys), so a re-fire never reshuffles work. Parts on disk are the
  durable checkpoint.

Output contract (parsed by the cron wave-runner):
  WAVE: COMPLETE -> backlog empty; run collect+rebuild
  WAVE: PREPARED keys=.. batches=.. shards=.. remaining_after=..
  SHARD: data/enrichment_wf/shard_0.js -> one line per workflow to launch
  ...

Usage:
  python3 scripts/enrichment_wave.py --status
  python3 scripts/enrichment_wave.py --prepare --keys 700 --shards 8
"""
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent

PROMPT_SUFFIX = ".prompt.md"
PART_SUFFIX = ".json"
BATCH_SIZE_DEFAULT = 12
KEYS_DEFAULT = 700
SHARDS_DEFAULT = 8

# Resolved relative to REPO_ROOT so the script works from any cwd.
DEF_PROMPTS = "data/enrichment_prompts"
DEF_PARTS = "data/enrichment_parts"
DEF_BATCHES = "data/enrichment_batches"
DEF_WF = "data/enrichment_wf"
TEMPLATE_NAME = "shard.js.tmpl"


# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _abs(p: str) -> Path:
    """Return *p* as a Path, anchoring relative paths at REPO_ROOT."""
    candidate = Path(p)
    if candidate.is_absolute():
        return candidate
    return REPO_ROOT / candidate


def part_ok(path: Path) -> bool:
    """Return True iff *path* can be read and parses as a JSON object.

    Any read or parse failure (missing file, bad encoding, invalid JSON,
    pathologically deep nesting) and any non-object top-level value counts
    as "not done".
    """
    try:
        # Context manager so the handle is closed even when parsing fails.
        with open(path, encoding="utf-8") as fh:
            return isinstance(json.load(fh), dict)
    except (OSError, ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return False


def corrupt_parts(parts_dir: Path) -> list[Path]:
    """Return every ``*.json`` entry in *parts_dir* that fails `part_ok`.

    A missing *parts_dir* yields an empty list.
    """
    if not parts_dir.is_dir():
        return []
    return [p for p in parts_dir.glob("*" + PART_SUFFIX) if not part_ok(p)]


def compute_missing(prompts_dir: Path, parts_dir: Path) -> list[str]:
    """Return the sorted keys whose prompt exists but whose part is absent.

    Only the part file's existence is checked here, not whether it parses;
    `cmd_prepare` removes corrupt parts before calling this. Sorting makes
    the result deterministic.
    """
    suffix_len = len(PROMPT_SUFFIX)
    keys = (p.name[:-suffix_len] for p in prompts_dir.glob("*" + PROMPT_SUFFIX))
    return sorted(k for k in keys if not (parts_dir / (k + PART_SUFFIX)).exists())


def count_done(prompts_dir: Path, parts_dir: Path) -> tuple[int, int]:
    """Return ``(good_parts_with_prompt, total_prompts)``.

    A prompt counts as good when its matching part exists and passes
    `part_ok`.
    """
    suffix_len = len(PROMPT_SUFFIX)
    total = 0
    good = 0
    for prompt in prompts_dir.glob("*" + PROMPT_SUFFIX):
        total += 1
        part = parts_dir / (prompt.name[:-suffix_len] + PART_SUFFIX)
        if part.exists() and part_ok(part):
            good += 1
    return good, total


def write_batches(keys: list[str], batches_dir: Path, size: int) -> int:
    """Replace all ``batch_*.txt`` with fresh files of at most *size* keys.

    Each file holds one key per line with a trailing newline and is named
    ``batch_NNNN.txt`` (zero-padded, starting at 0000).

    Returns:
        The number of batch files written.
    """
    batches_dir.mkdir(parents=True, exist_ok=True)
    for stale in batches_dir.glob("batch_*.txt"):
        stale.unlink()

    chunks = [keys[i : i + size] for i in range(0, len(keys), size)]
    for idx, chunk in enumerate(chunks):
        target = batches_dir / f"batch_{idx:04d}.txt"
        target.write_text("\n".join(chunk) + "\n", encoding="utf-8")
    return len(chunks)


def shard_ranges(nb: int, k: int) -> list[tuple[int, int]]:
    """Split ``[0, nb)`` into *k* contiguous, disjoint, total-covering ranges.

    Even distribution: the first ``nb % k`` shards carry one extra batch.
    When ``nb < k`` the trailing ranges are empty ``[x, x)``; the caller is
    expected to drop them. Returns ``[]`` when *nb* or *k* is not positive.
    """
    if nb <= 0 or k <= 0:
        return []
    base, extra = divmod(nb, k)
    ranges: list[tuple[int, int]] = []
    start = 0
    for i in range(k):
        end = start + base + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges


def render_shard(template: str, shard: int, start: int, end: int, nshards: int) -> str:
    """Fill the ``__SHARD__``/``__START__``/``__END__``/``__NSHARDS__`` slots.

    Placeholders are substituted in that order by plain string replacement.
    """
    substitutions = (
        ("__SHARD__", shard),
        ("__START__", start),
        ("__END__", end),
        ("__NSHARDS__", nshards),
    )
    rendered = template
    for placeholder, value in substitutions:
        rendered = rendered.replace(placeholder, str(value))
    return rendered


def write_shards(ranges: list[tuple[int, int]], template: str, wf_dir: Path) -> list[Path]:
    """Delete stale ``shard_*.js``, then write one file per NON-EMPTY range.

    Returns:
        The written shard paths, in shard-index order.
    """
    wf_dir.mkdir(parents=True, exist_ok=True)
    for stale in wf_dir.glob("shard_*.js"):
        stale.unlink()

    non_empty = [(s, e) for s, e in ranges if e > s]
    nshards = len(non_empty)
    paths: list[Path] = []
    # Re-index shards 0..nshards-1 so labels/meta stay contiguous even if some
    # trailing ranges were empty (tiny final wave with fewer batches than K).
    for new_idx, (s, e) in enumerate(non_empty):
        path = wf_dir / f"shard_{new_idx}.js"
        path.write_text(render_shard(template, new_idx, s, e, nshards), encoding="utf-8")
        paths.append(path)
    return paths


def rel(path: Path) -> str:
    """Return *path* relative to REPO_ROOT, or unchanged if it lies outside."""
    try:
        return str(path.relative_to(REPO_ROOT))
    except ValueError:
        return str(path)


# --------------------------------------------------------------------------- #
# Modes
# --------------------------------------------------------------------------- #
def cmd_status(prompts_dir: Path, parts_dir: Path) -> int:
    """Print a progress report without touching the filesystem.

    A missing *parts_dir* is reported as zero parts. The ``WAVE:`` line is
    printed only when at least one prompt exists.

    Returns:
        Always 0.
    """
    good, total = count_done(prompts_dir, parts_dir)
    if parts_dir.is_dir():
        parts_on_disk = len(list(parts_dir.glob("*" + PART_SUFFIX)))
    else:
        parts_on_disk = 0
    bad = len(corrupt_parts(parts_dir))
    missing = total - good
    pct = (100.0 * good / total) if total else 0.0

    print("=== enrichment status ===")
    print(f"prompts (universe) : {total}")
    print(f"parts on disk : {parts_on_disk}")
    print(f"good (done) : {good}")
    print(f"corrupt parts : {bad} (reported only; --prepare drops them)")
    print(f"missing : {missing}")
    print(f"done : {pct:.1f}%")
    if total:
        print(f"WAVE: {'COMPLETE' if missing == 0 else 'PENDING'} missing={missing}")
    return 0


def cmd_prepare(
    prompts_dir: Path,
    parts_dir: Path,
    batches_dir: Path,
    wf_dir: Path,
    keys: int,
    shards: int,
    batch_size: int,
    make_shards: bool = True,
) -> int:
    """Prepare one bounded wave on disk and print the WAVE:/SHARD: lines.

    Steps: check the shard template (unless *make_shards* is false), delete
    corrupt parts, compute the sorted missing keys, keep the first *keys* of
    them, write batch files of *batch_size* keys, and render up to *shards*
    shard scripts covering exactly those batches.

    Returns:
        0 on success (including the empty-backlog ``WAVE: COMPLETE`` case),
        2 when shard generation is requested but the template is missing.
    """
    template = ""
    if make_shards:
        template_path = wf_dir / TEMPLATE_NAME
        if not template_path.is_file():
            print(f"ERROR: missing shard template {rel(template_path)}", file=sys.stderr)
            return 2
        template = template_path.read_text(encoding="utf-8")

    # 1) drop corrupt parts (only mutation to parts/)
    dropped = 0
    for p in corrupt_parts(parts_dir):
        p.unlink()
        dropped += 1

    # 2) compute missing (deterministic)
    missing = compute_missing(prompts_dir, parts_dir)

    # 3) empty -> COMPLETE sentinel, no files written
    if not missing:
        print(f"dropped_corrupt={dropped}")
        print("WAVE: COMPLETE")
        return 0

    # 4) clamp to first N
    take = missing[:keys]

    # 5) batches for ONLY those keys
    nb = write_batches(take, batches_dir, batch_size)

    # 6) shard scripts covering exactly those batches (skipped on the bash path)
    paths: list[Path] = []
    if make_shards:
        paths = write_shards(shard_ranges(nb, shards), template, wf_dir)

    remaining_after = len(missing) - len(take)
    print(f"dropped_corrupt={dropped}")
    print(
        f"WAVE: PREPARED keys={len(take)} batches={nb} "
        f"shards={len(paths)} remaining_after={remaining_after}"
    )
    for p in paths:
        print(f"SHARD: {rel(p)}")
    return 0


def main(argv: list[str] | None = None) -> int:
    """Parse the command line and dispatch to ``--prepare`` or status.

    Status is the default mode; when both flags are given, ``--prepare``
    takes precedence.

    Args:
        argv: Argument list without the program name; ``None`` lets argparse
            read ``sys.argv``.

    Returns:
        Process exit code: 0 on success, 2 on a usage or setup error.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--status", action="store_true", help="read-only progress report")
    ap.add_argument("--prepare", action="store_true", help="prepare one bounded wave")
    ap.add_argument("--keys", type=int, default=KEYS_DEFAULT,
                    help=f"max keys this wave (default {KEYS_DEFAULT})")
    ap.add_argument("--shards", type=int, default=SHARDS_DEFAULT,
                    help=f"workflow shards (default {SHARDS_DEFAULT})")
    ap.add_argument("--batch-size", type=int, default=BATCH_SIZE_DEFAULT,
                    help=f"keys per batch (default {BATCH_SIZE_DEFAULT})")
    ap.add_argument("--no-shards", action="store_true",
                    help="prepare batch files only; skip shard JS generation (bash/headless path)")
    ap.add_argument("--prompts", default=DEF_PROMPTS)
    ap.add_argument("--parts", default=DEF_PARTS)
    ap.add_argument("--batches", default=DEF_BATCHES)
    ap.add_argument("--wf-dir", default=DEF_WF)
    args = ap.parse_args(argv)

    prompts_dir = _abs(args.prompts)
    parts_dir = _abs(args.parts)
    batches_dir = _abs(args.batches)
    wf_dir = _abs(args.wf_dir)

    if not prompts_dir.is_dir():
        print(f"ERROR: prompts dir not found: {rel(prompts_dir)}", file=sys.stderr)
        return 2
    if args.keys < 1 or args.shards < 1 or args.batch_size < 1:
        print("ERROR: --keys/--shards/--batch-size must be >= 1", file=sys.stderr)
        return 2

    if args.prepare:
        # Only the mutating mode may create parts/; status stays read-only.
        parts_dir.mkdir(parents=True, exist_ok=True)
        return cmd_prepare(
            prompts_dir,
            parts_dir,
            batches_dir,
            wf_dir,
            args.keys,
            args.shards,
            args.batch_size,
            make_shards=not args.no_shards,
        )

    # default to status
    return cmd_status(prompts_dir, parts_dir)


if __name__ == "__main__":
    raise SystemExit(main())