#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
build_database.py — build data/activities.db from the subagent extraction JSON.

Replaces the old import_claude_activities.py. Pipeline (plan §4):

  1. `--rebuild` builds into data/activities.db.tmp; on success the live DB is
     backed up to data/activities.db.bak and the tmp file is swapped in with an
     atomic os.replace. A mid-build crash leaves the live DB untouched.
  2. Every data/extracted/*.json is validated against
     scripts/activity_schema.json; invalid files are moved to
     data/extracted/_rejected/ with an error log.
  2b. Each source_excerpt must appear as a fuzzy substring (rapidfuzz
      partial_ratio >= 90) of its source chunk — non-matches are hallucinations
      and the activity is dropped (logged to _rejected/).
  3. `category` is normalized to a valid taxonomy slug (fallback `altele`).
  4. Dedup (D5): group by exact normalized_name, never across languages; within
     a group rapidfuzz on descriptions — >=85 auto-merge, 60-85 borderline
     (keep both, needs_review), <60 separate variants.
  5. data/review_decisions.json is applied before insert.
  5b. data/enrichment.json is overlaid on the post-dedup rows (plan Part B).
  6. Bulk insert into the tmp DB, populate the categories table, rebuild FTS.
  7. A QA report is printed.

Usage:
    python scripts/build_database.py --rebuild
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Optional

SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
# Make both `app.*` (repo root) and `import_common` (scripts dir) importable
# when this file is run directly as a script.
for _p in (str(SCRIPT_DIR), str(REPO_ROOT)):
    if _p not in sys.path:
        sys.path.insert(0, _p)

from app.config_taxonomy import (  # noqa: E402
    category_display_name,
    normalize_category,
    normalize_content_type,
)
from app.models.activity import Activity  # noqa: E402
from app.models.database import DatabaseManager  # noqa: E402
from import_common import (  # noqa: E402
    DEFAULT_SCHEMA_PATH,
    content_key,
    excerpt_matches,
    find_chunk_text,
    iter_extraction_files,
    load_schema,
    normalize_name,
    source_path_for,
)

# dedup thresholds (rapidfuzz token_sort_ratio, 0..100 scale)
AUTO_MERGE_THRESHOLD = 85.0
BORDERLINE_THRESHOLD = 60.0


# --------------------------------------------------------------------------
# extraction dict -> Activity
# --------------------------------------------------------------------------

def _csv(value: Any) -> Optional[str]:
    """Schema arrays -> comma string for the (TEXT) DB columns.

    * ``None`` stays ``None``.
    * A string is stripped; an empty result becomes ``None``.
    * A list/tuple is joined with ``", "`` after stripping each item; ``None``
      items and items that are empty after stripping are skipped, and an
      all-empty sequence becomes ``None``.
    * Any other value is returned as ``str(value)``.
    """
    if value is None:
        return None
    if isinstance(value, str):
        return value.strip() or None
    if isinstance(value, (list, tuple)):
        # Skip None explicitly: str(None) is the truthy text "None", which
        # would otherwise be stored as if it were a real list entry.
        stripped = (str(item).strip() for item in value if item is not None)
        parts = [text for text in stripped if text]
        return ", ".join(parts) or None
    return str(value)


def _split_csv(value: Optional[str]) -> list[str]:
    """Split a comma string into stripped, non-empty items (``[]`` if falsy)."""
    if not value:
        return []
    return [p.strip() for p in str(value).split(",") if p.strip()]


def dict_to_activity(
    adict: dict,
    source_file: str,
    source_id: Optional[str] = None,
    chunk_key: Optional[str] = None,
) -> Activity:
    """Build an Activity from one extraction-JSON activity object.

    Args:
        adict: one activity object from an extraction file.
        source_file: primary source for the row; it is put first in
            ``source_files`` when not already listed there.
        source_id: id of the source; also seeds ``source_ids`` when set.
        chunk_key: key of the chunk the activity was extracted from.

    Returns:
        A new ``Activity``. ``adict`` is not modified.
    """
    tags = adict.get("tags") or []
    if isinstance(tags, str):
        tags = _split_csv(tags)

    source_files = adict.get("source_files") or []
    if isinstance(source_files, str):
        source_files = _split_csv(source_files)
    if source_file and source_file not in source_files:
        source_files = [source_file, *source_files]

    return Activity(
        source_id=source_id,
        source_ids=[source_id] if source_id else [],
        chunk_key=chunk_key,
        name=(adict.get("name") or "").strip(),
        description=(adict.get("description") or "").strip(),
        rules=adict.get("rules"),
        variations=adict.get("variations"),
        # `or ""` rather than a .get() default: an explicit JSON null would
        # otherwise reach the normalizer as None instead of the empty string.
        category=normalize_category(adict.get("category") or ""),
        subcategory=adict.get("subcategory"),
        content_type=normalize_content_type(adict.get("content_type") or ""),
        source_file=source_file,
        source_files=list(source_files),
        page_reference=adict.get("page_reference"),
        source_excerpt=adict.get("source_excerpt"),
        age_group_min=adict.get("age_group_min"),
        age_group_max=adict.get("age_group_max"),
        participants_min=adict.get("participants_min"),
        participants_max=adict.get("participants_max"),
        duration_min=adict.get("duration_min"),
        duration_max=adict.get("duration_max"),
        materials_category=adict.get("materials_category"),
        materials_list=_csv(adict.get("materials_list")),
        skills_developed=_csv(adict.get("skills_developed")),
        difficulty_level=adict.get("difficulty_level"),
        keywords=_csv(adict.get("keywords")),
        tags=list(tags),
        language=adict.get("language"),
        extraction_confidence=adict.get("extraction_confidence"),
    )


# --------------------------------------------------------------------------
# step 3 — category normalization is done in dict_to_activity; a non-taxonomy
# value silently falls back to `altele`. This logs the substitutions.
# --------------------------------------------------------------------------

def log_category_fallbacks(raw_pairs: list[tuple[str, str]]) -> list[str]:
    """raw_pairs = (original, slug); return human-readable fallback messages.

    A pair is reported when the slug is `altele` but the original value,
    once normalized, is neither empty nor `altele` itself.
    """
    return [
        f"category '{original}' -> altele (not in taxonomy)"
        for original, slug in raw_pairs
        if slug == "altele" and normalize_name(original or "") not in ("", "altele")
    ]


# --------------------------------------------------------------------------
# step 4 — dedup
# --------------------------------------------------------------------------

def _ordered_unique(items: Any) -> list:
    """Return the items of an iterable in first-seen order, without repeats."""
    unique: list = []
    for item in items:
        if item not in unique:
            unique.append(item)
    return unique


def _longest(*values: Optional[str]) -> Optional[str]:
    """Return the longest non-empty value (first one on ties), else None."""
    return max((v for v in values if v), key=len, default=None)


def _union_csv(values: list[Optional[str]]) -> Optional[str]:
    """Union several comma strings into one, keeping first-seen order."""
    merged = _ordered_unique(
        item for value in values for item in _split_csv(value)
    )
    return ", ".join(merged) or None


def merge_cluster(cluster: list[Activity]) -> Activity:
    """Collapse a cluster of duplicate activities into one merged Activity.

    A single-member cluster is returned as-is (same object). Otherwise the
    member with the longest description is the representative: scalar fields
    come from it, long-text fields take the longest value across the cluster,
    and list-like fields (materials, skills, keywords, tags, sources) are
    unioned in first-seen order.

    Raises:
        ValueError: if ``cluster`` is empty.
    """
    if not cluster:
        raise ValueError("merge_cluster() requires at least one activity")
    if len(cluster) == 1:
        return cluster[0]

    # representative = the one with the longest description
    rep = max(cluster, key=lambda a: len(a.description or ""))
    merged = Activity(
        name=rep.name,
        description=_longest(*(a.description for a in cluster)) or rep.description,
        rules=_longest(*(a.rules for a in cluster)),
        variations=_longest(*(a.variations for a in cluster)),
        category=rep.category,
        subcategory=rep.subcategory,
        content_type=rep.content_type,
        source_file=rep.source_file,
        page_reference=rep.page_reference,
        source_excerpt=rep.source_excerpt,
        age_group_min=rep.age_group_min,
        age_group_max=rep.age_group_max,
        participants_min=rep.participants_min,
        participants_max=rep.participants_max,
        duration_min=rep.duration_min,
        duration_max=rep.duration_max,
        materials_category=rep.materials_category,
        materials_list=_union_csv([a.materials_list for a in cluster]),
        skills_developed=_union_csv([a.skills_developed for a in cluster]),
        difficulty_level=rep.difficulty_level,
        keywords=_union_csv([a.keywords for a in cluster]),
        language=rep.language,
        extraction_confidence=rep.extraction_confidence,
    )

    # union of tags
    merged.tags = _ordered_unique(t for a in cluster for t in (a.tags or []))

    # accumulate every source the activity was seen in
    merged.source_files = _ordered_unique(
        s
        for a in cluster
        for s in (a.source_file, *(a.source_files or []))
        if s
    )

    # source provenance: keep rep's chunk_key/source_id as primary, union the
    # source_ids for the download route. Enrichment fields (name_ro,
    # description_ro, indoor_outdoor, ...) are intentionally NOT carried here:
    # enrichment is applied AFTER dedup (plan Part B2), keyed on the merged
    # row's content_key, so merging must not pre-populate them.
    merged.source_id = rep.source_id
    merged.chunk_key = rep.chunk_key
    merged.source_ids = _ordered_unique(
        sid
        for a in cluster
        for sid in (a.source_id, *(a.source_ids or []))
        if sid
    )

    # popularity_score++ per merged duplicate (plan §4); an unset (None)
    # score is treated as 0 so the arithmetic cannot fail.
    merged.popularity_score = (
        max(a.popularity_score or 0 for a in cluster) + (len(cluster) - 1)
    )
    return merged


def dedup_activities(activities: list[Activity]) -> tuple[list[Activity], dict]:
    """
    Dedup per plan D5.

    Groups by (normalized_name, language) — different languages are NEVER
    merged. Within a group, each activity's description is compared with
    rapidfuzz token_sort_ratio against the FIRST member of every existing
    cluster:
      >= 85  -> joins the best-scoring such cluster (auto-merge)
      60-85  -> borderline: if the activity joins no cluster it starts its
                own, and both it and each borderline cluster are flagged
                needs_review
      < 60   -> separate variants

    Returns (merged activities, stats) where stats has the keys
    input / auto_merged / borderline / output.
    """
    from rapidfuzz import fuzz

    groups: dict[tuple, list[Activity]] = defaultdict(list)
    for act in activities:
        group_key = (act.normalized_name or normalize_name(act.name), act.language)
        groups[group_key].append(act)

    result: list[Activity] = []
    stats = {"input": len(activities), "auto_merged": 0, "borderline": 0, "output": 0}

    for members in groups.values():
        clusters: list[list[Activity]] = []
        borderline_idx: set[int] = set()

        for act in members:
            best_idx, best_score = -1, -1.0
            borderline_here: list[int] = []
            for idx, cluster in enumerate(clusters):
                score = fuzz.token_sort_ratio(
                    act.description or "", cluster[0].description or ""
                )
                if score >= AUTO_MERGE_THRESHOLD:
                    if score > best_score:
                        best_idx, best_score = idx, score
                elif score >= BORDERLINE_THRESHOLD:
                    borderline_here.append(idx)

            if best_idx >= 0:
                clusters[best_idx].append(act)
                continue

            clusters.append([act])
            if borderline_here:
                # flag both sides of every borderline pair
                borderline_idx.update(borderline_here)
                borderline_idx.add(len(clusters) - 1)

        for idx, cluster in enumerate(clusters):
            merged = merge_cluster(cluster)
            stats["auto_merged"] += len(cluster) - 1
            if idx in borderline_idx:
                merged.needs_review = 1
                stats["borderline"] += 1
            result.append(merged)

    stats["output"] = len(result)
    return result, stats


# --------------------------------------------------------------------------
# step 5 — review decisions
# --------------------------------------------------------------------------

def _load_json_dict(path: Optional[Path], label: str) -> dict:
    """Read a JSON object from ``path``; return ``{}`` when unusable.

    A missing path is silently treated as "nothing to apply". A file that
    exists but cannot be read, decoded or parsed, or whose top-level value is
    not an object, is reported on stderr and then ignored: the build stays
    best-effort, but dropping curated data is never invisible.
    """
    if not path or not path.is_file():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (ValueError, OSError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"WARNING: ignoring unreadable {label} file {path}: {exc}", file=sys.stderr)
        return {}
    if not isinstance(data, dict):
        print(
            f"WARNING: ignoring {label} file {path}: top-level JSON value is not an object",
            file=sys.stderr,
        )
        return {}
    return data


def load_review_decisions(path: Path) -> dict:
    """Load data/review_decisions.json (map content_key -> decision).

    Returns ``{}`` when the file is missing or unusable; an unusable file is
    reported on stderr.
    """
    return _load_json_dict(path, "review decisions")


def apply_review_decisions(
    activities: list[Activity], decisions: dict
) -> tuple[list[Activity], dict]:
    """
    Apply data/review_decisions.json (plan §5c).

    Keyed by the stable content_key. A decision of `drop` removes the row;
    `keep-separate` / `merge` clear needs_review (the user has resolved it).
    Rows with no decision keep needs_review and resurface in the queue.

    An entry may be either the decision string itself or a dict carrying it
    under "decision". Returns (kept activities, {"dropped", "resolved"}).
    """
    kept: list[Activity] = []
    stats = {"dropped": 0, "resolved": 0}

    for act in activities:
        key = content_key(
            act.normalized_name or normalize_name(act.name),
            act.language,
            act.description or "",
        )
        entry = decisions.get(key)
        decision = entry.get("decision") if isinstance(entry, dict) else entry

        if decision == "drop":
            stats["dropped"] += 1
            continue
        if decision in ("keep-separate", "merge"):
            act.needs_review = 0
            stats["resolved"] += 1
        kept.append(act)

    return kept, stats


# --------------------------------------------------------------------------
# step 5b — enrichment overlay (plan Part B)
# --------------------------------------------------------------------------
# Translation / inferred-filter fields written by run_enrichment.py. Applied
# AFTER dedup + review decisions, keyed on the same stable content_key, so the
# overlay survives rebuilds as long as extraction text is frozen.

_ENRICHMENT_TEXT_FIELDS = ("name_ro", "description_ro", "rules_ro", "variations_ro")
_ENRICHMENT_INT_FIELDS = (
    "participants_min",
    "participants_max",
    "duration_min",
    "duration_max",
    "age_group_min",
    "age_group_max",
)


def load_enrichment(path: Path) -> dict:
    """Load data/enrichment.json (flat map content_key -> field dict).

    Returns ``{}`` when the file is missing or unusable; an unusable file is
    reported on stderr.
    """
    return _load_json_dict(path, "enrichment")


def apply_enrichment(activities: list[Activity], enrichment: dict) -> dict:
    """
    Overlay enrichment fields onto the post-dedup activity list (plan B2).

    Keyed by content_key. Only fields PRESENT in an entry are written; absent
    fields leave the underlying value untouched. Numeric fields are coerced
    with int(); indoor_outdoor / space_needed are normalized to slugs. A value
    that cannot be coerced or is not a recognised slug is skipped, leaving the
    existing value in place. Inferred fields are recorded in
    `estimated_fields`. Translated / expanded text is NOT re-validated against
    the source here — expansion fidelity is the enrichment prompt's
    responsibility (plan B2 comment).

    Activities are modified in place.

    Returns {entries, matched, orphaned, fields_stated, fields_estimated}.
    The two field tallies count only values that were actually written.
    """
    from app.config_taxonomy import normalize_indoor_outdoor, normalize_space_needed

    enum_normalizers = (
        ("indoor_outdoor", normalize_indoor_outdoor),
        ("space_needed", normalize_space_needed),
    )

    matched_keys: set[str] = set()
    fields_stated: dict[str, int] = defaultdict(int)
    fields_estimated: dict[str, int] = defaultdict(int)

    for act in activities:
        key = content_key(
            act.normalized_name or normalize_name(act.name),
            act.language,
            act.description or "",
        )
        entry = enrichment.get(key)
        if not isinstance(entry, dict):
            continue
        matched_keys.add(key)
        estimated = set(entry.get("estimated_fields") or [])
        applied: list[str] = []  # structured fields actually written

        # bilingual text twins
        for fld in _ENRICHMENT_TEXT_FIELDS:
            val = entry.get(fld)
            if isinstance(val, str) and val.strip():
                setattr(act, fld, val.strip())

        # inferred / clarified structured numeric fields
        for fld in _ENRICHMENT_INT_FIELDS:
            raw = entry.get(fld)
            if raw is None:
                continue
            try:
                setattr(act, fld, int(raw))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")), which JSON can produce
                continue
            applied.append(fld)

        # enum filters — normalized to slug, dropped if unrecognised
        for fld, normalize in enum_normalizers:
            raw = entry.get(fld)
            if raw is None:
                continue
            slug = normalize(raw)
            if slug:
                setattr(act, fld, slug)
                applied.append(fld)

        act.estimated_fields = sorted(estimated)

        # QA tally: stated vs estimated population, per field. Only applied
        # values count, so rejected ones do not inflate the report.
        for fld in applied:
            if fld in estimated:
                fields_estimated[fld] += 1
            else:
                fields_stated[fld] += 1

    return {
        "entries": len(enrichment),
        "matched": len(matched_keys),
        "orphaned": len(enrichment) - len(matched_keys),
        "fields_stated": dict(fields_stated),
        "fields_estimated": dict(fields_estimated),
    }


# --------------------------------------------------------------------------
# golden-set recall (plan §7)
# --------------------------------------------------------------------------

def _golden_names(data: Any) -> list[str]:
    """Extract expected activity names from one parsed golden-set file.

    Accepts either a list, or a dict whose "activities" key holds the list
    (a dict without that key is iterated directly, i.e. over its keys).
    Each item may be a bare name string or a dict with a non-empty "name".
    Any other top-level value (number, string, null) yields no names.
    """
    items = data.get("activities", data) if isinstance(data, dict) else data
    if not isinstance(items, (list, tuple, dict)):
        # a scalar top-level value cannot be iterated as a list of names
        return []

    names: list[str] = []
    for item in items:
        if isinstance(item, str):
            names.append(item)
        elif isinstance(item, dict) and item.get("name"):
            names.append(item["name"])
    return names


def golden_recall(golden_dir: Path, activities: list[Activity]) -> Optional[dict]:
    """Measure how many golden-set names appear among the built activities.

    Names are compared after normalize_name(). Unreadable or malformed golden
    files are skipped. Returns None when the directory is missing or yields
    no expected names, else {"expected", "found", "recall"} with recall
    rounded to 3 decimals.
    """
    if not golden_dir or not golden_dir.is_dir():
        return None

    found = {normalize_name(a.name) for a in activities}
    expected, hits = 0, 0
    for gf in sorted(golden_dir.glob("*.json")):
        try:
            data = json.loads(gf.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            continue
        for name in _golden_names(data):
            expected += 1
            if normalize_name(name) in found:
                hits += 1

    if expected == 0:
        return None
    return {"expected": expected, "found": hits, "recall": round(hits / expected, 3)}


# --------------------------------------------------------------------------
# load + validate + excerpt-check the extraction files
# --------------------------------------------------------------------------

def collect_activities(
    extracted_dir: Path,
    chunks_dir: Path,
    sources_dir: Path,
    schema: dict,
) -> dict:
    """Validate, excerpt-check and convert every extraction file.

    Files that are not valid UTF-8 / JSON, or that fail schema validation,
    are moved to ``extracted_dir/_rejected`` with an error log. Activities
    whose source_excerpt does not match the chunk text are dropped and logged
    there too.

    Returns a report dict with the counters files_total, files_valid,
    files_rejected_schema, activities_raw, activities_hallucinated, plus
    category_fallbacks (messages) and activities (the converted list).
    """
    from import_common import chunk_key_for, validate_extraction  # local import to avoid clutter

    rejected_dir = extracted_dir / "_rejected"
    activities: list[Activity] = []
    report = {
        "files_total": 0,
        "files_valid": 0,
        "files_rejected_schema": 0,
        "activities_raw": 0,
        "activities_hallucinated": 0,
        "category_fallbacks": [],
    }
    raw_categories: list[tuple[str, str]] = []

    for json_path in iter_extraction_files(extracted_dir):
        report["files_total"] += 1

        try:
            data = json.loads(json_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            _reject_file(json_path, rejected_dir, [f"invalid JSON: {exc}"])
            report["files_rejected_schema"] += 1
            continue
        except UnicodeDecodeError as exc:
            # A mis-encoded file is just another invalid input: reject it
            # rather than abort the whole build.
            _reject_file(json_path, rejected_dir, [f"not valid UTF-8: {exc}"])
            report["files_rejected_schema"] += 1
            continue

        errors = validate_extraction(data, schema)
        if errors:
            _reject_file(json_path, rejected_dir, errors)
            report["files_rejected_schema"] += 1
            continue
        report["files_valid"] += 1

        header = data.get("header", {})
        chunk_text = find_chunk_text(json_path, header, chunks_dir)
        chunk_key = chunk_key_for(json_path, header)
        source_id = header.get("source_id") or chunk_key.rsplit(".part", 1)[0]
        fallback_source = (
            source_path_for(source_id, sources_dir) or source_id or json_path.stem
        )

        hallucinated: list[dict] = []
        for adict in data.get("activities", []):
            report["activities_raw"] += 1
            excerpt = adict.get("source_excerpt") or ""
            # if the chunk text is unavailable we cannot verify — keep but the
            # QA report still counts it under activities_raw.
            if chunk_text is not None and not excerpt_matches(excerpt, chunk_text):
                hallucinated.append(adict)
                report["activities_hallucinated"] += 1
                continue

            src = adict.get("source_file") or fallback_source
            raw_category = adict.get("category", "")
            # `or ""`: an explicit JSON null must not reach the normalizer
            raw_categories.append((raw_category, normalize_category(raw_category or "")))
            activities.append(dict_to_activity(adict, src, source_id, chunk_key))

        if hallucinated:
            _log_hallucinations(json_path, rejected_dir, hallucinated)

    report["category_fallbacks"] = log_category_fallbacks(raw_categories)
    report["activities"] = activities
    return report


def _reject_file(json_path: Path, rejected_dir: Path, errors: list[str]) -> None:
    """Move an invalid extraction file into ``rejected_dir`` and log why.

    The log is written next to it as ``<stem>.errors.txt``.
    """
    rejected_dir.mkdir(parents=True, exist_ok=True)
    dest = rejected_dir / json_path.name
    shutil.move(str(json_path), str(dest))

    log = rejected_dir / f"{json_path.stem}.errors.txt"
    log.write_text(
        f"REJECTED (schema validation): {json_path.name}\n\n"
        + "\n".join(f" - {e}" for e in errors)
        + "\n",
        encoding="utf-8",
    )


def _log_hallucinations(
    json_path: Path, rejected_dir: Path, hallucinated: list[dict]
) -> None:
    """Write ``<stem>.hallucinations.txt`` listing the dropped activities.

    The extraction file itself is left where it is.
    """
    rejected_dir.mkdir(parents=True, exist_ok=True)
    log = rejected_dir / f"{json_path.stem}.hallucinations.txt"
    lines = [f"DROPPED activities (source_excerpt not found in chunk): {json_path.name}", ""]
    for a in hallucinated:
        lines.append(f" - {a.get('name')!r}")
        lines.append(f" excerpt: {a.get('source_excerpt')!r}")
    log.write_text("\n".join(lines) + "\n", encoding="utf-8")


# --------------------------------------------------------------------------
# DB write + atomic swap
# --------------------------------------------------------------------------

def _enrich_category_display_names(db_path: Path) -> None:
    """Give the categories table proper Romanian display names for slugs."""
    import sqlite3

    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT value FROM categories WHERE type = 'category'"
        ).fetchall()
        conn.executemany(
            "UPDATE categories SET display_name = ? WHERE type='category' AND value = ?",
            [(category_display_name(slug), slug) for (slug,) in rows],
        )
        conn.commit()
    finally:
        conn.close()


def write_database(db_tmp_path: Path, activities: list[Activity]) -> None:
    """Create a fresh tmp DB, bulk insert, populate categories, rebuild FTS.

    Any stale file already at ``db_tmp_path`` is removed first.
    """
    if db_tmp_path.exists():
        db_tmp_path.unlink()
    # NOTE(review): the DatabaseManager is never explicitly closed before the
    # caller swaps the file — confirm it holds no open connection.
    db = DatabaseManager(str(db_tmp_path))
    db.bulk_insert_activities(activities)
    _enrich_category_display_names(db_tmp_path)
    db.rebuild_fts_index()


def atomic_swap(db_tmp_path: Path, db_path: Path) -> Optional[Path]:
    """Back up the live DB then atomically swap the tmp file in.

    Returns the backup path (``<db_path>.bak``), or None when there was no
    live DB to back up.
    """
    backup: Optional[Path] = None
    if db_path.exists():
        backup = db_path.with_suffix(db_path.suffix + ".bak")
        shutil.copy2(db_path, backup)
    os.replace(db_tmp_path, db_path)
    return backup


# --------------------------------------------------------------------------
# orchestration
# --------------------------------------------------------------------------

def rebuild(
    *,
    extracted_dir: Path,
    chunks_dir: Path,
    sources_dir: Path,
    db_path: Path,
    decisions_path: Optional[Path] = None,
    enrichment_path: Optional[Path] = None,
    schema_path: Path = DEFAULT_SCHEMA_PATH,
    golden_dir: Optional[Path] = None,
    do_swap: bool = True,
) -> dict:
    """
    Full rebuild. Everything is built into .tmp; the live DB is only touched
    by the final atomic swap, so a crash anywhere above leaves it intact.

    With ``do_swap=False`` the tmp DB is written but not swapped in. If the
    write or the swap fails, the tmp file is deleted and the error re-raised.

    Returns the report dict consumed by print_report().
    """
    extracted_dir = Path(extracted_dir)
    db_path = Path(db_path)
    db_tmp_path = db_path.with_suffix(db_path.suffix + ".tmp")

    schema = load_schema(schema_path)
    collected = collect_activities(extracted_dir, Path(chunks_dir), Path(sources_dir), schema)
    activities: list[Activity] = collected.pop("activities")

    deduped, dedup_stats = dedup_activities(activities)

    decisions = load_review_decisions(Path(decisions_path)) if decisions_path else {}
    final, decision_stats = apply_review_decisions(deduped, decisions)

    # Enrichment overlay — applied immediately after review decisions, on the
    # post-dedup list, keyed on the same stable content_key (plan B2).
    enrichment = load_enrichment(Path(enrichment_path)) if enrichment_path else {}
    enrichment_stats = apply_enrichment(final, enrichment)

    try:
        write_database(db_tmp_path, final)
        backup = atomic_swap(db_tmp_path, db_path) if do_swap else None
    except BaseException:
        # Cleanup only, then re-raise. BaseException so an interrupted build
        # (Ctrl-C) does not leave a half-written tmp DB behind either.
        if db_tmp_path.exists():
            db_tmp_path.unlink()
        raise

    report = {
        **collected,
        "dedup": dedup_stats,
        "decisions": decision_stats,
        "enrichment": enrichment_stats,
        "final_count": len(final),
        "backup": str(backup) if backup else None,
        "swapped": do_swap,
        "qa": _qa_report(final, collected, golden_dir),
    }
    return report


def _qa_report(
    activities: list[Activity], collected: dict, golden_dir: Optional[Path]
) -> dict:
    """Compute the QA section of the build report.

    Tallies rows per category, content_type and extraction_confidence
    (missing values are counted under "?" for the latter two), the share of
    rows with non-blank rules, the needs_review count, the hallucination rate
    from the collection counters, and golden-set recall when a directory is
    given.
    """
    per_category: dict[str, int] = defaultdict(int)
    per_content_type: dict[str, int] = defaultdict(int)
    confidence: dict[str, int] = defaultdict(int)
    with_rules = 0

    for a in activities:
        per_category[a.category] += 1
        per_content_type[a.content_type or "?"] += 1
        confidence[a.extraction_confidence or "?"] += 1
        if a.rules and a.rules.strip():
            with_rules += 1

    raw = collected.get("activities_raw", 0)
    hallucinated = collected.get("activities_hallucinated", 0)
    return {
        "total": len(activities),
        "per_category": dict(per_category),
        "per_content_type": dict(per_content_type),
        "extraction_confidence": dict(confidence),
        "pct_with_rules": round(100 * with_rules / len(activities), 1) if activities else 0.0,
        "needs_review": sum(1 for a in activities if a.needs_review),
        "hallucination_rate": round(100 * hallucinated / raw, 2) if raw else 0.0,
        "golden_recall": golden_recall(Path(golden_dir), activities) if golden_dir else None,
    }


def print_report(report: dict) -> None:
    """Print the report returned by rebuild() to stdout."""
    qa = report["qa"]
    print("=" * 60)
    print("BUILD DATABASE — QA REPORT")
    print("=" * 60)
    print(f"extraction files : {report['files_total']} "
          f"(valid {report['files_valid']}, schema-rejected {report['files_rejected_schema']})")
    print(f"activities raw : {report['activities_raw']}")
    print(f" hallucinated drop : {report['activities_hallucinated']} "
          f"({qa['hallucination_rate']}%)")
    d = report["dedup"]
    print(f"dedup : {d['input']} -> {d['output']} "
          f"(auto-merged {d['auto_merged']}, borderline {d['borderline']})")
    print(f"review decisions : dropped {report['decisions']['dropped']}, "
          f"resolved {report['decisions']['resolved']}")

    enr = report.get("enrichment")
    if enr and enr.get("entries"):
        print(f"enrichment : {enr['entries']} entries "
              f"(matched {enr['matched']}, orphaned {enr['orphaned']})")
        stated, estimated = enr.get("fields_stated", {}), enr.get("fields_estimated", {})
        all_fields = sorted(set(stated) | set(estimated))
        if all_fields:
            print(" field population : (stated / estimated)")
            for fld in all_fields:
                print(f" {fld:<18}: {stated.get(fld, 0)} / {estimated.get(fld, 0)}")

    print(f"final inserted : {report['final_count']}")
    print(f"% with rules : {qa['pct_with_rules']}")
    print(f"needs_review rows : {qa['needs_review']}")

    print("per category :")
    for slug, n in sorted(qa["per_category"].items(), key=lambda kv: -kv[1]):
        print(f" {slug:<24}: {n}")
    print("per content_type :")
    for ct, n in sorted(qa["per_content_type"].items(), key=lambda kv: -kv[1]):
        print(f" {ct:<24}: {n}")
    print("extraction_confidence:")
    # Sort on str(key): a non-string confidence value next to the "?" bucket
    # would otherwise make the comparison raise TypeError.
    for c, n in sorted(qa["extraction_confidence"].items(), key=lambda kv: str(kv[0])):
        print(f" {c:<24}: {n}")

    if qa["golden_recall"]:
        g = qa["golden_recall"]
        print(f"golden recall : {g['found']}/{g['expected']} = {g['recall']}")
    if report["category_fallbacks"]:
        print("category fallbacks :")
        for msg in report["category_fallbacks"]:
            print(f" {msg}")
    if report["backup"]:
        print(f"live DB backed up to : {report['backup']}")
    print("=" * 60)


# --------------------------------------------------------------------------
# CLI
# --------------------------------------------------------------------------

def main(argv: Optional[list[str]] = None) -> int:
    """CLI entry point: parse arguments, run a full rebuild, print the report.

    Exits through parser.error() when --rebuild is not given; returns 0 on
    success.
    """
    parser = argparse.ArgumentParser(description="Build activities.db from extraction JSON.")
    parser.add_argument("--rebuild", action="store_true",
                        help="rebuild the database from scratch (only mode supported)")
    parser.add_argument("--extracted", default="data/extracted")
    parser.add_argument("--chunks", default="data/chunks")
    parser.add_argument("--sources", default="data/sources")
    parser.add_argument("--db", default="data/activities.db")
    parser.add_argument("--decisions", default="data/review_decisions.json")
    parser.add_argument("--enrichment", default="data/enrichment.json")
    parser.add_argument("--golden", default="data/golden")
    parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH))
    args = parser.parse_args(argv)

    if not args.rebuild:
        parser.error("only --rebuild is supported (full rebuild, no incremental merge)")

    report = rebuild(
        extracted_dir=Path(args.extracted),
        chunks_dir=Path(args.chunks),
        sources_dir=Path(args.sources),
        db_path=Path(args.db),
        decisions_path=Path(args.decisions),
        enrichment_path=Path(args.enrichment),
        schema_path=Path(args.schema),
        golden_dir=Path(args.golden),
    )
    print_report(report)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())