The escape-ASCII-quote rule previously lived only in ephemeral Agent-call strings. Bake it into the durable artifacts so the next session doesn't re-derive it:

- SUBAGENT_PROMPT.md + ENRICHMENT_PROMPT.md: explicit rule to escape any ASCII " inside JSON string values (Romanian „cuvânt" is the trap).
- run_enrichment.py collect_enrichment: repair malformed parts with escape_stray_quotes instead of dropping them. The enrichment path had no repair net (bad parts were silently dropped, losing that activity's enrichment). Extraction already had one; now both do.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
290 lines
11 KiB
Python
290 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
run_enrichment.py — enrichment orchestrator (plan Part B3).
|
|
|
|
Mirror of run_extraction.py, on the *other* side of the rebuild. It reads the
|
|
already-rebuilt data/activities.db, and for every activity emits one subagent
|
|
prompt asking for a single bilingual + inferred-filter enrichment pass. Like
|
|
extraction, this script does NOT call the LLM — the interactive Claude Code
|
|
orchestrator launches waves of subagents on the emitted prompts.
|
|
|
|
Keying is the crux (plan §"Cheia de keying"): each row's overlay is keyed on
|
|
import_common.content_key(normalized_name, language, _normalize_text(description))
|
|
— the SAME function build_database uses to apply the overlay. The key is stable
|
|
only while the extraction text is frozen, so enrichment runs AFTER the freezing
|
|
rebuild.
|
|
|
|
Modes:
|
|
(default) emit one prompt per activity that has no enrichment part yet
|
|
(resumable: data/enrichment_parts/<key>.json present => skip)
|
|
--collect merge data/enrichment_parts/*.json -> data/enrichment.json
|
|
|
|
Pilot scoping (plan B5): --source <source_id substring> and/or --limit N narrow
|
|
the emitted prompts to a single source / category for the sign-off pilot.
|
|
|
|
Usage:
|
|
python scripts/run_enrichment.py --source teambuilding_corbu # pilot
|
|
python scripts/run_enrichment.py # all rows
|
|
python scripts/run_enrichment.py --collect # merge parts
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Absolute directory holding this script, and the directory one level above it.
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent

# Put both directories at the front of sys.path (unless already listed) so the
# sibling-module imports that follow resolve no matter where the script is
# launched from. REPO_ROOT is inserted last and therefore ends up first.
for _p in map(str, (SCRIPT_DIR, REPO_ROOT)):
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)
|
|
|
|
from import_common import ( # noqa: E402
|
|
content_key,
|
|
find_chunk_text,
|
|
normalize_name,
|
|
)
|
|
from repair_extractions import escape_stray_quotes # noqa: E402
|
|
|
|
# Rule sheet that every emitted prompt tells the subagent to follow.
ENRICHMENT_PROMPT = SCRIPT_DIR.joinpath("ENRICHMENT_PROMPT.md")

# Columns selected from the `activities` table for every row. They feed the
# content key, the prompt header and the "current values" context block.
_DB_COLUMNS = (
    "id",
    "name",
    "description",
    "rules",
    "variations",
    "category",
    "content_type",
    "language",
    "normalized_name",
    "page_reference",
    "source_id",
    "chunk_key",
    "participants_min",
    "participants_max",
    "duration_min",
    "duration_max",
    "age_group_min",
    "age_group_max",
)

# Maximum number of characters of source-chunk text inlined into a prompt.
# The cap keeps a dense chunk from blowing the prompt up while still leaving
# enough text to ground the expansion.
_CHUNK_TEXT_CAP = 12_000
|
|
|
|
|
|
def _fetch_rows(db_path: Path, source_substr: Optional[str]) -> list[dict]:
    """Load activity rows from the SQLite database as plain dicts.

    Args:
        db_path: Path of the SQLite database holding the ``activities`` table.
        source_substr: When truthy, keep only rows whose ``source_id`` or
            ``chunk_key`` contains this text as a literal substring.
            ``None`` or ``""`` selects every row.

    Returns:
        One dict per row with the ``_DB_COLUMNS`` keys, ordered by
        ``source_id`` and then ``id``.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    columns = ", ".join(_DB_COLUMNS)
    sql = f"SELECT {columns} FROM activities"
    params: list[str] = []
    if source_substr:
        # `_` and `%` are LIKE wildcards. Source ids routinely contain `_`
        # (e.g. "teambuilding_corbu"), so escape them: the filter must be a
        # literal substring match, not an accidental pattern match.
        escaped = (
            source_substr.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        sql += (
            " WHERE (source_id LIKE ? ESCAPE '\\'"
            " OR chunk_key LIKE ? ESCAPE '\\')"
        )
        params = [pattern, pattern]
    sql += " ORDER BY source_id, id"

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        return [dict(record) for record in conn.execute(sql, params).fetchall()]
    finally:
        # Always release the connection, including when the query fails.
        conn.close()
|
|
|
|
|
|
def _row_content_key(row: dict) -> str:
    """Compute the overlay key for one activity row.

    The stored ``normalized_name`` is used when it is non-empty; otherwise the
    name is normalized on the fly with ``normalize_name``. Missing ``name`` and
    ``description`` values are passed on as empty strings.

    NOTE(review): the module docstring describes the third key component as
    ``_normalize_text(description)``, but the raw description is passed here.
    Confirm that ``content_key`` (or the stored value) already normalizes it.
    """
    name_component = row.get("normalized_name")
    if not name_component:
        name_component = normalize_name(row.get("name") or "")
    description = row.get("description") or ""
    return content_key(name_component, row.get("language"), description)
|
|
|
|
|
|
def _chunk_text_for_row(row: dict, chunks_dir: Path) -> Optional[str]:
    """Return the source-chunk text for a row, capped at ``_CHUNK_TEXT_CAP``.

    Returns ``None`` when the row has no ``chunk_key``. Otherwise the result of
    ``find_chunk_text`` is returned as is, unless it is longer than the cap, in
    which case it is cut to the cap and a truncation marker is appended.
    """
    chunk_key = row.get("chunk_key")
    if not chunk_key:
        return None

    header = {"chunk_key": chunk_key, "source_id": row.get("source_id")}
    # Per the original author's note, find_chunk_text resolves from the header
    # when chunk_key is present and uses the path argument only as a fallback,
    # which is why a synthetic "<chunk_key>.json" path is passed here.
    synthetic_path = Path(f"{chunk_key}.json")
    text = find_chunk_text(synthetic_path, header, chunks_dir)

    if not text or len(text) <= _CHUNK_TEXT_CAP:
        return text
    return text[:_CHUNK_TEXT_CAP] + "\n…[chunk truncated]…"
|
|
|
|
|
|
def _current_fields_block(row: dict) -> str:
    """Render the row's current values as an indented JSON object.

    Only the fields listed below are included, in this order; a field missing
    from ``row`` is rendered as ``null``. Non-ASCII text is kept verbatim
    rather than ``\\u``-escaped.
    """
    field_names = (
        "name",
        "description",
        "rules",
        "variations",
        "category",
        "content_type",
        "language",
        "participants_min",
        "participants_max",
        "duration_min",
        "duration_max",
        "age_group_min",
        "age_group_max",
    )
    snapshot = {field: row.get(field) for field in field_names}
    return json.dumps(snapshot, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def emit_enrichment_prompt(
    row: dict, key: str, chunks_dir: Path, prompts_dir: Path
) -> Path:
    """Write the subagent enrichment prompt for one activity.

    The prompt names the rule sheet, states where the result JSON must be
    written and which ``content_key`` it must carry, then inlines the
    activity's current values and the source chunk text (or a placeholder
    when no chunk text is available).

    Args:
        row: Activity row as a dict of column values.
        key: Content key of the row; used in the prompt and in the file name.
        chunks_dir: Directory searched for the source chunk text.
        prompts_dir: Directory receiving ``<key>.prompt.md``; created if
            missing.

    Returns:
        Path of the prompt file that was written.

    NOTE(review): the result path given to the subagent is hard-coded to
    ``data/enrichment_parts/<key>.json`` and does not follow a custom
    ``--parts`` directory.
    """
    chunk_text = _chunk_text_for_row(row, chunks_dir)
    if chunk_text is None:
        source_block = (
            "[source chunk text unavailable — translate only what is given "
            "above; do NOT invent steps, and mark any inferred filter field "
            "as estimated]"
        )
    else:
        source_block = chunk_text

    rules_doc = ENRICHMENT_PROMPT.relative_to(REPO_ROOT)
    page_ref = row.get("page_reference") or "?"

    instructions = [
        f"# ENRICHMENT — activity `{row.get('name')}` (id {row.get('id')})",
        "",
        f"Follow the rules in `{rules_doc}` EXACTLY.",
        "Single pass. Translate faithfully to Romanian; expand description_ro "
        "ONLY from the source chunk text below; mark inferred filter fields in "
        "`estimated_fields`.",
        "",
        f"Write the result JSON to: `data/enrichment_parts/{key}.json`",
        f'It MUST include `"content_key": "{key}"`.',
        f"Page reference: {page_ref}",
        "",
    ]
    current_values = [
        "## Current activity values (the text to translate / enrich)",
        "```json",
        _current_fields_block(row),
        "```",
        "",
    ]
    source_section = [
        "## Source chunk text (ground description_ro expansion in THIS only)",
        "```",
        source_block,
        "```",
        "",
    ]
    prompt_text = "\n".join(instructions + current_values + source_section)

    prompts_dir.mkdir(parents=True, exist_ok=True)
    prompt_file = prompts_dir / f"{key}.prompt.md"
    prompt_file.write_text(prompt_text, encoding="utf-8")
    return prompt_file
|
|
|
|
|
|
def collect_enrichment(parts_dir: Path, out_path: Path) -> dict:
    """Merge ``<parts_dir>/*.json`` into one flat ``content_key`` map.

    Each part must hold a JSON object. It is stored under its ``content_key``
    value (falling back to the file stem) with the ``content_key`` entry itself
    removed. Parts are processed in sorted file-name order, so a later part
    with the same key replaces an earlier one.

    A part that fails to parse is retried after ``escape_stray_quotes``.
    Parts that cannot be read, are not valid UTF-8, still fail to parse, or do
    not contain a JSON object are skipped and reported, never fatal.

    Args:
        parts_dir: Directory holding the per-activity part files. A missing
            directory yields an empty overlay.
        out_path: File receiving the merged JSON; its parent directory is
            created if needed.

    Returns:
        A dict with ``entries`` (number of merged keys), ``repaired`` (names
        of parts accepted after the quote repair), ``bad_parts`` (names of
        skipped parts) and ``out`` (``str(out_path)``).
    """
    merged: dict = {}
    bad: list[str] = []
    repaired: list[str] = []
    if parts_dir.is_dir():
        for part in sorted(parts_dir.glob("*.json")):
            # Reading is guarded on its own: an unreadable or mis-encoded part
            # must be reported as bad instead of aborting the whole merge.
            try:
                raw = part.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                bad.append(part.name)
                continue

            was_repaired = False
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                # Enrichment subagents hit the same unescaped-ASCII-quote bug
                # as extraction (description_ro is full of Romanian „…").
                # Repair by escaping rather than dropping the enrichment.
                try:
                    data = json.loads(escape_stray_quotes(raw))
                except json.JSONDecodeError:
                    bad.append(part.name)
                    continue
                was_repaired = True

            if not isinstance(data, dict):
                bad.append(part.name)
                continue

            # Count a part as repaired only once it is actually merged, so it
            # is never reported as both repaired and bad.
            if was_repaired:
                repaired.append(part.name)
            key = data.get("content_key") or part.stem
            merged[key] = {k: v for k, v in data.items() if k != "content_key"}

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(merged, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    return {
        "entries": len(merged),
        "repaired": repaired,
        "bad_parts": bad,
        "out": str(out_path),
    }
|
|
|
|
|
|
def run_emit(
    *,
    db_path: Path,
    chunks_dir: Path,
    parts_dir: Path,
    prompts_dir: Path,
    source_substr: Optional[str],
    limit: Optional[int],
) -> dict:
    """Emit one enrichment prompt per activity that has no part file yet.

    Args:
        db_path: SQLite database to read activities from.
        chunks_dir: Directory searched for source chunk text.
        parts_dir: Directory of finished parts; a row whose ``<key>.json``
            already exists there is counted as done and skipped.
        prompts_dir: Directory receiving the emitted prompt files.
        source_substr: Optional row filter forwarded to ``_fetch_rows``.
        limit: Stop once this many prompts were emitted; ``None`` or ``0``
            means no cap.

    Returns:
        A summary dict with ``rows``, ``emitted``, ``skipped_done`` and
        ``prompts_dir``.
    """
    activities = _fetch_rows(db_path, source_substr)
    emitted_count = 0
    skipped_count = 0
    for activity in activities:
        key = _row_content_key(activity)
        if parts_dir.joinpath(f"{key}.json").is_file():
            # Resumable: a part already on disk means this row is done.
            skipped_count += 1
        else:
            emit_enrichment_prompt(activity, key, chunks_dir, prompts_dir)
            emitted_count += 1
            if limit and emitted_count >= limit:
                break

    summary = {
        "rows": len(activities),
        "emitted": emitted_count,
        "skipped_done": skipped_count,
        "prompts_dir": str(prompts_dir),
    }
    return summary
|
|
|
|
|
|
def main(argv: Optional[list[str]] = None) -> int:
    """Command-line entry point: emit prompts, or merge parts with --collect.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process arguments.

    Returns:
        Always ``0``.
    """
    parser = argparse.ArgumentParser(description="Enrichment orchestrator.")
    # The path-like options all share one shape: a flag plus a default.
    path_options = (
        ("--db", "data/activities.db"),
        ("--chunks", "data/chunks"),
        ("--parts", "data/enrichment_parts"),
        ("--prompts", "data/enrichment_prompts"),
        ("--out", "data/enrichment.json"),
    )
    for flag, default in path_options:
        parser.add_argument(flag, default=default)
    parser.add_argument(
        "--source",
        default=None,
        help="only rows whose source_id/chunk_key contains this (pilot)",
    )
    parser.add_argument(
        "--limit", type=int, default=None, help="cap emitted prompts (pilot)"
    )
    parser.add_argument(
        "--collect",
        action="store_true",
        help="merge enrichment parts into the overlay JSON",
    )
    args = parser.parse_args(argv)

    rule = "=" * 60
    print(rule)
    print("ENRICHMENT ORCHESTRATOR")
    print(rule)

    if args.collect:
        # Merge mode: fold the finished parts into the overlay and report.
        result = collect_enrichment(Path(args.parts), Path(args.out))
        print(f"collected : {result['entries']} entries -> {result['out']}")
        repaired = result["repaired"]
        if repaired:
            print(f"repaired : {len(repaired)} parts (unescaped-quote fix)")
        bad_parts = result["bad_parts"]
        if bad_parts:
            print(f"bad parts : {len(bad_parts)} (skipped)")
            for name in bad_parts:
                print(f" - {name}")
        print("Run build_database.py --rebuild to apply the overlay.")
        print(rule)
        return 0

    # Emit mode (default): write prompts for rows that are not enriched yet.
    summary = run_emit(
        db_path=Path(args.db),
        chunks_dir=Path(args.chunks),
        parts_dir=Path(args.parts),
        prompts_dir=Path(args.prompts),
        source_substr=args.source,
        limit=args.limit,
    )
    source_note = f" (filtered by '{args.source}')" if args.source else ""
    print(f"rows in DB : {summary['rows']}" + source_note)
    print(f"already enriched : {summary['skipped_done']}")
    print(f"prompts emitted : {summary['emitted']}")
    if not summary["emitted"]:
        print("Nothing to emit — run --collect then build_database.py --rebuild.")
    else:
        print(f"prompts dir : {summary['prompts_dir']}/")
        print("Launch waves of ~8-16 Sonnet subagents on those prompts, each "
              "writing data/enrichment_parts/<key>.json, then run "
              "run_enrichment.py --collect and build_database.py --rebuild.")
    print(rule)
    return 0
|
|
|
|
|
|
# Run as a script: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|