Files
game-library/scripts/build_database.py
Claude Agent bcfb6841eb Faza 1 complete: bilingual+enrichment plumbing, UI/filters, frozen DB
Extraction finished (575/588 chunks; 6 content-filter-blocked, 7 await
re-extraction). DB rebuilt and frozen at 9418 activities — content_keys
are now stable for the enrichment overlay.

Part A (plumbing + UI):
- database.py: name_ro/description_ro/rules_ro/variations_ro, indoor_outdoor,
  space_needed, estimated_fields, source_id/source_ids/chunk_key columns;
  FTS5 indexes the 4 *_ro columns across CREATE + all 3 triggers; new equality
  filters + category counts for both axes.
- activity.py: new fields + bilingual display helpers (get_display_*,
  is_estimated, axis displays).
- config_taxonomy.py: INDOOR_OUTDOOR/SPACE_NEEDED enums + normalizers
  (None on unrecognised, no fabrication).
- search.py / routes.py / config.py / templates / css: new dropdowns,
  RO-primary rendering with "(estimat)" markers and collapsible original
  text, and a /source/<id> download route shipped DARK behind
  SOURCE_DOWNLOAD_ENABLED (copyright opt-in).
- build_database.py: source_id/chunk_key in dict_to_activity; merge_cluster
  unions source_ids without touching enrichment fields.

Part B (enrichment pipeline, built not yet run):
- build_database.py: load_enrichment + apply_enrichment (post-dedup, keyed on
  content_key) + --enrichment CLI + stated-vs-estimated QA.
- run_enrichment.py (resumable, --source/--limit pilot scoping, --collect),
  ENRICHMENT_PROMPT.md.

Repair: scripts/repair_extractions.py fixes the subagents' systematic
unescaped-ASCII-quote bug with a faithful char-scanner (escapes, never
truncates) + schema validation + a strictly-more-text guard. json_repair was
tried first, truncated silently, and is NOT used. build_database has no repair
dependency.

Tests: tests/test_enrichment.py added; 99 pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 18:10:13 +00:00

781 lines
30 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
build_database.py — build data/activities.db from the subagent extraction JSON.
Replaces the old import_claude_activities.py. Pipeline (plan §4):
1. `--rebuild` builds into data/activities.db.tmp; on success the live DB is
backed up to data/activities.db.bak and the tmp file is swapped in with an
atomic os.replace. A mid-build crash leaves the live DB untouched.
2. Every data/extracted/*.json is validated against scripts/activity_schema.json;
invalid files are moved to data/extracted/_rejected/ with an error log.
2b. Each source_excerpt must appear as a fuzzy substring (rapidfuzz
partial_ratio >= 90) of its source chunk — non-matches are hallucinations
and the activity is dropped (logged to _rejected/).
3. `category` is normalized to a valid taxonomy slug (fallback `altele`).
4. Dedup (D5): group by exact normalized_name, never across languages; within a
group rapidfuzz on descriptions — >=85 auto-merge, 60-85 borderline (keep
both, needs_review), <60 separate variants.
5. data/review_decisions.json is applied before insert.
6. Bulk insert into the tmp DB, populate the categories table, rebuild FTS.
7. A QA report is printed.
Usage:
python scripts/build_database.py --rebuild
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Optional
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
for _p in (str(SCRIPT_DIR), str(REPO_ROOT)):
if _p not in sys.path:
sys.path.insert(0, _p)
from app.config_taxonomy import ( # noqa: E402
category_display_name,
normalize_category,
normalize_content_type,
)
from app.models.activity import Activity # noqa: E402
from app.models.database import DatabaseManager # noqa: E402
from import_common import ( # noqa: E402
DEFAULT_SCHEMA_PATH,
content_key,
excerpt_matches,
find_chunk_text,
iter_extraction_files,
load_schema,
normalize_name,
source_path_for,
)
# dedup thresholds (rapidfuzz token_sort_ratio, 0..100 scale)
AUTO_MERGE_THRESHOLD = 85.0
BORDERLINE_THRESHOLD = 60.0
# --------------------------------------------------------------------------
# extraction dict -> Activity
# --------------------------------------------------------------------------
def _csv(value: Any) -> Optional[str]:
"""Schema arrays -> comma string for the (TEXT) DB columns."""
if value is None:
return None
if isinstance(value, str):
return value.strip() or None
if isinstance(value, (list, tuple)):
parts = [str(v).strip() for v in value if str(v).strip()]
return ", ".join(parts) or None
return str(value)
def _split_csv(value: Optional[str]) -> list[str]:
if not value:
return []
return [p.strip() for p in str(value).split(",") if p.strip()]
def dict_to_activity(
adict: dict,
source_file: str,
source_id: Optional[str] = None,
chunk_key: Optional[str] = None,
) -> Activity:
"""Build an Activity from one extraction-JSON activity object."""
tags = adict.get("tags") or []
if isinstance(tags, str):
tags = _split_csv(tags)
source_files = adict.get("source_files") or []
if isinstance(source_files, str):
source_files = _split_csv(source_files)
if source_file and source_file not in source_files:
source_files = [source_file, *source_files]
return Activity(
source_id=source_id,
source_ids=[source_id] if source_id else [],
chunk_key=chunk_key,
name=(adict.get("name") or "").strip(),
description=(adict.get("description") or "").strip(),
rules=adict.get("rules"),
variations=adict.get("variations"),
category=normalize_category(adict.get("category", "")),
subcategory=adict.get("subcategory"),
content_type=normalize_content_type(adict.get("content_type", "")),
source_file=source_file,
source_files=list(source_files),
page_reference=adict.get("page_reference"),
source_excerpt=adict.get("source_excerpt"),
age_group_min=adict.get("age_group_min"),
age_group_max=adict.get("age_group_max"),
participants_min=adict.get("participants_min"),
participants_max=adict.get("participants_max"),
duration_min=adict.get("duration_min"),
duration_max=adict.get("duration_max"),
materials_category=adict.get("materials_category"),
materials_list=_csv(adict.get("materials_list")),
skills_developed=_csv(adict.get("skills_developed")),
difficulty_level=adict.get("difficulty_level"),
keywords=_csv(adict.get("keywords")),
tags=list(tags),
language=adict.get("language"),
extraction_confidence=adict.get("extraction_confidence"),
)
# --------------------------------------------------------------------------
# step 3 — category normalization is done in dict_to_activity; a non-taxonomy
# value silently falls back to `altele`. This logs the substitutions.
# --------------------------------------------------------------------------
def log_category_fallbacks(raw_pairs: list[tuple[str, str]]) -> list[str]:
"""raw_pairs = (original, slug); return human-readable fallback messages."""
msgs = []
for original, slug in raw_pairs:
if slug == "altele" and normalize_name(original or "") not in ("", "altele"):
msgs.append(f"category '{original}' -> altele (not in taxonomy)")
return msgs
# --------------------------------------------------------------------------
# step 4 — dedup
# --------------------------------------------------------------------------
def _longest(*values: Optional[str]) -> Optional[str]:
best: Optional[str] = None
for v in values:
if v and (best is None or len(v) > len(best)):
best = v
return best
def _union_csv(values: list[Optional[str]]) -> Optional[str]:
seen: list[str] = []
for value in values:
for item in _split_csv(value):
if item not in seen:
seen.append(item)
return ", ".join(seen) or None
def merge_cluster(cluster: list[Activity]) -> Activity:
"""Collapse a cluster of duplicate activities into one merged Activity."""
if len(cluster) == 1:
return cluster[0]
# representative = the one with the longest description
rep = max(cluster, key=lambda a: len(a.description or ""))
merged = Activity(
name=rep.name,
description=_longest(*(a.description for a in cluster)) or rep.description,
rules=_longest(*(a.rules for a in cluster)),
variations=_longest(*(a.variations for a in cluster)),
category=rep.category,
subcategory=rep.subcategory,
content_type=rep.content_type,
source_file=rep.source_file,
page_reference=rep.page_reference,
source_excerpt=rep.source_excerpt,
age_group_min=rep.age_group_min,
age_group_max=rep.age_group_max,
participants_min=rep.participants_min,
participants_max=rep.participants_max,
duration_min=rep.duration_min,
duration_max=rep.duration_max,
materials_category=rep.materials_category,
materials_list=_union_csv([a.materials_list for a in cluster]),
skills_developed=_union_csv([a.skills_developed for a in cluster]),
difficulty_level=rep.difficulty_level,
keywords=_union_csv([a.keywords for a in cluster]),
language=rep.language,
extraction_confidence=rep.extraction_confidence,
)
# union of tags
tags: list[str] = []
for a in cluster:
for t in a.tags or []:
if t not in tags:
tags.append(t)
merged.tags = tags
# accumulate every source the activity was seen in
sources: list[str] = []
for a in cluster:
for s in [a.source_file, *(a.source_files or [])]:
if s and s not in sources:
sources.append(s)
merged.source_files = sources
# source provenance: keep rep's chunk_key/source_id as primary, union the
# source_ids for the download route. Enrichment fields (name_ro,
# description_ro, indoor_outdoor, ...) are intentionally NOT carried here:
# enrichment is applied AFTER dedup (plan Part B2), keyed on the merged
# row's content_key, so merging must not pre-populate them.
merged.source_id = rep.source_id
merged.chunk_key = rep.chunk_key
source_ids: list[str] = []
for a in cluster:
for sid in [a.source_id, *(a.source_ids or [])]:
if sid and sid not in source_ids:
source_ids.append(sid)
merged.source_ids = source_ids
# popularity_score++ per merged duplicate (plan §4)
merged.popularity_score = max(a.popularity_score for a in cluster) + (len(cluster) - 1)
return merged
def dedup_activities(activities: list[Activity]) -> tuple[list[Activity], dict]:
"""
Dedup per plan D5.
Groups by (normalized_name, language) — different languages are NEVER
merged. Within a group, descriptions are clustered with rapidfuzz:
>= 85 -> same cluster (auto-merge)
60-85 -> borderline: kept as separate clusters, both flagged needs_review
< 60 -> separate variants
"""
from rapidfuzz import fuzz
groups: dict[tuple, list[Activity]] = defaultdict(list)
for act in activities:
key = (act.normalized_name or normalize_name(act.name), act.language)
groups[key].append(act)
result: list[Activity] = []
stats = {"input": len(activities), "auto_merged": 0, "borderline": 0, "output": 0}
for members in groups.values():
clusters: list[list[Activity]] = []
borderline_idx: set[int] = set()
for act in members:
best_idx, best_score = -1, -1.0
borderline_here: list[int] = []
for idx, cluster in enumerate(clusters):
score = fuzz.token_sort_ratio(
act.description or "", cluster[0].description or ""
)
if score >= AUTO_MERGE_THRESHOLD:
if score > best_score:
best_idx, best_score = idx, score
elif score >= BORDERLINE_THRESHOLD:
borderline_here.append(idx)
if best_idx >= 0:
clusters[best_idx].append(act)
else:
clusters.append([act])
new_idx = len(clusters) - 1
for bidx in borderline_here:
borderline_idx.add(bidx)
borderline_idx.add(new_idx)
for idx, cluster in enumerate(clusters):
merged = merge_cluster(cluster)
if len(cluster) > 1:
stats["auto_merged"] += len(cluster) - 1
if idx in borderline_idx:
merged.needs_review = 1
stats["borderline"] += 1
result.append(merged)
stats["output"] = len(result)
return result, stats
# --------------------------------------------------------------------------
# step 5 — review decisions
# --------------------------------------------------------------------------
def load_review_decisions(path: Path) -> dict:
if path and path.is_file():
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
except (json.JSONDecodeError, OSError):
pass
return {}
def apply_review_decisions(
activities: list[Activity], decisions: dict
) -> tuple[list[Activity], dict]:
"""
Apply data/review_decisions.json (plan §5c).
Keyed by the stable content_key. A decision of `drop` removes the row;
`keep-separate` / `merge` clear needs_review (the user has resolved it).
Rows with no decision keep needs_review and resurface in the queue.
"""
kept: list[Activity] = []
stats = {"dropped": 0, "resolved": 0}
for act in activities:
key = content_key(
act.normalized_name or normalize_name(act.name),
act.language,
act.description or "",
)
entry = decisions.get(key)
decision = entry.get("decision") if isinstance(entry, dict) else entry
if decision == "drop":
stats["dropped"] += 1
continue
if decision in ("keep-separate", "merge"):
act.needs_review = 0
stats["resolved"] += 1
kept.append(act)
return kept, stats
# --------------------------------------------------------------------------
# step 5b — enrichment overlay (plan Part B)
# --------------------------------------------------------------------------
# Translation / inferred-filter fields written by run_enrichment.py. Applied
# AFTER dedup + review decisions, keyed on the same stable content_key, so the
# overlay survives rebuilds as long as extraction text is frozen.
_ENRICHMENT_TEXT_FIELDS = ("name_ro", "description_ro", "rules_ro", "variations_ro")
_ENRICHMENT_INT_FIELDS = (
"participants_min", "participants_max",
"duration_min", "duration_max",
"age_group_min", "age_group_max",
)
def load_enrichment(path: Path) -> dict:
"""Load data/enrichment.json (flat map content_key -> field dict)."""
if path and path.is_file():
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
except (json.JSONDecodeError, OSError):
pass
return {}
def apply_enrichment(activities: list[Activity], enrichment: dict) -> dict:
"""
Overlay enrichment fields onto the post-dedup activity list (plan B2).
Keyed by content_key. Only fields PRESENT in an entry are written; absent
fields leave the underlying DB value untouched. indoor_outdoor /
space_needed are normalized to slugs (None on unrecognised). Inferred
fields are recorded in `estimated_fields`. Translated / expanded text is
NOT re-validated against the source here — expansion fidelity is the
enrichment prompt's responsibility (plan B2 comment).
Returns {entries, matched, orphaned, fields_stated, fields_estimated}.
"""
from app.config_taxonomy import normalize_indoor_outdoor, normalize_space_needed
matched_keys: set[str] = set()
fields_stated: dict[str, int] = defaultdict(int)
fields_estimated: dict[str, int] = defaultdict(int)
for act in activities:
key = content_key(
act.normalized_name or normalize_name(act.name),
act.language,
act.description or "",
)
entry = enrichment.get(key)
if not isinstance(entry, dict):
continue
matched_keys.add(key)
estimated = set(entry.get("estimated_fields") or [])
# bilingual text twins
for fld in _ENRICHMENT_TEXT_FIELDS:
val = entry.get(fld)
if isinstance(val, str) and val.strip():
setattr(act, fld, val.strip())
# inferred / clarified structured numeric fields
for fld in _ENRICHMENT_INT_FIELDS:
if entry.get(fld) is not None:
try:
setattr(act, fld, int(entry[fld]))
except (TypeError, ValueError):
pass
# enum filters — normalized to slug, dropped if unrecognised
if entry.get("indoor_outdoor") is not None:
slug = normalize_indoor_outdoor(entry["indoor_outdoor"])
if slug:
act.indoor_outdoor = slug
if entry.get("space_needed") is not None:
slug = normalize_space_needed(entry["space_needed"])
if slug:
act.space_needed = slug
act.estimated_fields = sorted(estimated)
# QA tally: stated vs estimated population, per field
for fld in (*_ENRICHMENT_INT_FIELDS, "indoor_outdoor", "space_needed"):
if entry.get(fld) is None:
continue
if fld in estimated:
fields_estimated[fld] += 1
else:
fields_stated[fld] += 1
return {
"entries": len(enrichment),
"matched": len(matched_keys),
"orphaned": len(enrichment) - len(matched_keys),
"fields_stated": dict(fields_stated),
"fields_estimated": dict(fields_estimated),
}
# --------------------------------------------------------------------------
# golden-set recall (plan §7)
# --------------------------------------------------------------------------
def _golden_names(data: Any) -> list[str]:
items = data.get("activities", data) if isinstance(data, dict) else data
names: list[str] = []
for item in items or []:
if isinstance(item, str):
names.append(item)
elif isinstance(item, dict) and item.get("name"):
names.append(item["name"])
return names
def golden_recall(golden_dir: Path, activities: list[Activity]) -> Optional[dict]:
if not golden_dir or not golden_dir.is_dir():
return None
found = {normalize_name(a.name) for a in activities}
expected, hits = 0, 0
for gf in sorted(golden_dir.glob("*.json")):
try:
data = json.loads(gf.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
continue
for name in _golden_names(data):
expected += 1
if normalize_name(name) in found:
hits += 1
if expected == 0:
return None
return {"expected": expected, "found": hits, "recall": round(hits / expected, 3)}
# --------------------------------------------------------------------------
# load + validate + excerpt-check the extraction files
# --------------------------------------------------------------------------
def collect_activities(
extracted_dir: Path,
chunks_dir: Path,
sources_dir: Path,
schema: dict,
) -> dict:
"""Validate, excerpt-check and convert every extraction file."""
rejected_dir = extracted_dir / "_rejected"
activities: list[Activity] = []
report = {
"files_total": 0,
"files_valid": 0,
"files_rejected_schema": 0,
"activities_raw": 0,
"activities_hallucinated": 0,
"category_fallbacks": [],
}
raw_categories: list[tuple[str, str]] = []
from import_common import chunk_key_for # local import to avoid clutter
for json_path in iter_extraction_files(extracted_dir):
report["files_total"] += 1
try:
data = json.loads(json_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
_reject_file(json_path, rejected_dir, [f"invalid JSON: {exc}"])
report["files_rejected_schema"] += 1
continue
from import_common import validate_extraction
errors = validate_extraction(data, schema)
if errors:
_reject_file(json_path, rejected_dir, errors)
report["files_rejected_schema"] += 1
continue
report["files_valid"] += 1
header = data.get("header", {})
chunk_text = find_chunk_text(json_path, header, chunks_dir)
chunk_key = chunk_key_for(json_path, header)
source_id = header.get("source_id") or chunk_key.rsplit(".part", 1)[0]
fallback_source = (
source_path_for(source_id, sources_dir) or source_id or json_path.stem
)
hallucinated: list[dict] = []
for adict in data.get("activities", []):
report["activities_raw"] += 1
excerpt = adict.get("source_excerpt") or ""
# if the chunk text is unavailable we cannot verify — keep but the
# QA report still counts it under activities_raw.
if chunk_text is not None and not excerpt_matches(excerpt, chunk_text):
hallucinated.append(adict)
report["activities_hallucinated"] += 1
continue
src = adict.get("source_file") or fallback_source
raw_categories.append((adict.get("category", ""), normalize_category(adict.get("category", ""))))
activities.append(dict_to_activity(adict, src, source_id, chunk_key))
if hallucinated:
_log_hallucinations(json_path, rejected_dir, hallucinated)
report["category_fallbacks"] = log_category_fallbacks(raw_categories)
report["activities"] = activities
return report
def _reject_file(json_path: Path, rejected_dir: Path, errors: list[str]) -> None:
rejected_dir.mkdir(parents=True, exist_ok=True)
dest = rejected_dir / json_path.name
shutil.move(str(json_path), str(dest))
log = rejected_dir / f"{json_path.stem}.errors.txt"
log.write_text(
f"REJECTED (schema validation): {json_path.name}\n\n"
+ "\n".join(f" - {e}" for e in errors)
+ "\n",
encoding="utf-8",
)
def _log_hallucinations(
json_path: Path, rejected_dir: Path, hallucinated: list[dict]
) -> None:
rejected_dir.mkdir(parents=True, exist_ok=True)
log = rejected_dir / f"{json_path.stem}.hallucinations.txt"
lines = [f"DROPPED activities (source_excerpt not found in chunk): {json_path.name}", ""]
for a in hallucinated:
lines.append(f" - {a.get('name')!r}")
lines.append(f" excerpt: {a.get('source_excerpt')!r}")
log.write_text("\n".join(lines) + "\n", encoding="utf-8")
# --------------------------------------------------------------------------
# DB write + atomic swap
# --------------------------------------------------------------------------
def _enrich_category_display_names(db_path: Path) -> None:
"""Give the categories table proper Romanian display names for slugs."""
import sqlite3
conn = sqlite3.connect(db_path)
try:
rows = conn.execute(
"SELECT value FROM categories WHERE type = 'category'"
).fetchall()
for (slug,) in rows:
conn.execute(
"UPDATE categories SET display_name = ? WHERE type='category' AND value = ?",
(category_display_name(slug), slug),
)
conn.commit()
finally:
conn.close()
def write_database(db_tmp_path: Path, activities: list[Activity]) -> None:
"""Create a fresh tmp DB, bulk insert, populate categories, rebuild FTS."""
if db_tmp_path.exists():
db_tmp_path.unlink()
db = DatabaseManager(str(db_tmp_path))
db.bulk_insert_activities(activities)
_enrich_category_display_names(db_tmp_path)
db.rebuild_fts_index()
def atomic_swap(db_tmp_path: Path, db_path: Path) -> Optional[Path]:
"""Back up the live DB then atomically swap the tmp file in."""
backup: Optional[Path] = None
if db_path.exists():
backup = db_path.with_suffix(db_path.suffix + ".bak")
shutil.copy2(db_path, backup)
os.replace(db_tmp_path, db_path)
return backup
# --------------------------------------------------------------------------
# orchestration
# --------------------------------------------------------------------------
def rebuild(
*,
extracted_dir: Path,
chunks_dir: Path,
sources_dir: Path,
db_path: Path,
decisions_path: Optional[Path] = None,
enrichment_path: Optional[Path] = None,
schema_path: Path = DEFAULT_SCHEMA_PATH,
golden_dir: Optional[Path] = None,
do_swap: bool = True,
) -> dict:
"""
Full rebuild. Everything is built into <db_path>.tmp; the live DB is only
touched by the final atomic swap, so a crash anywhere above leaves it intact.
"""
extracted_dir = Path(extracted_dir)
db_path = Path(db_path)
db_tmp_path = db_path.with_suffix(db_path.suffix + ".tmp")
schema = load_schema(schema_path)
collected = collect_activities(extracted_dir, Path(chunks_dir), Path(sources_dir), schema)
activities: list[Activity] = collected.pop("activities")
deduped, dedup_stats = dedup_activities(activities)
decisions = load_review_decisions(Path(decisions_path)) if decisions_path else {}
final, decision_stats = apply_review_decisions(deduped, decisions)
# Enrichment overlay — applied immediately after review decisions, on the
# post-dedup list, keyed on the same stable content_key (plan B2).
enrichment = load_enrichment(Path(enrichment_path)) if enrichment_path else {}
enrichment_stats = apply_enrichment(final, enrichment)
try:
write_database(db_tmp_path, final)
backup = atomic_swap(db_tmp_path, db_path) if do_swap else None
except Exception:
if db_tmp_path.exists():
db_tmp_path.unlink()
raise
report = {
**collected,
"dedup": dedup_stats,
"decisions": decision_stats,
"enrichment": enrichment_stats,
"final_count": len(final),
"backup": str(backup) if backup else None,
"swapped": do_swap,
"qa": _qa_report(final, collected, golden_dir),
}
return report
def _qa_report(
activities: list[Activity], collected: dict, golden_dir: Optional[Path]
) -> dict:
per_category: dict[str, int] = defaultdict(int)
per_content_type: dict[str, int] = defaultdict(int)
confidence: dict[str, int] = defaultdict(int)
with_rules = 0
for a in activities:
per_category[a.category] += 1
per_content_type[a.content_type or "?"] += 1
confidence[a.extraction_confidence or "?"] += 1
if a.rules and a.rules.strip():
with_rules += 1
raw = collected.get("activities_raw", 0)
hallucinated = collected.get("activities_hallucinated", 0)
return {
"total": len(activities),
"per_category": dict(per_category),
"per_content_type": dict(per_content_type),
"extraction_confidence": dict(confidence),
"pct_with_rules": round(100 * with_rules / len(activities), 1) if activities else 0.0,
"needs_review": sum(1 for a in activities if a.needs_review),
"hallucination_rate": round(100 * hallucinated / raw, 2) if raw else 0.0,
"golden_recall": golden_recall(Path(golden_dir), activities) if golden_dir else None,
}
def print_report(report: dict) -> None:
qa = report["qa"]
print("=" * 60)
print("BUILD DATABASE — QA REPORT")
print("=" * 60)
print(f"extraction files : {report['files_total']} "
f"(valid {report['files_valid']}, schema-rejected {report['files_rejected_schema']})")
print(f"activities raw : {report['activities_raw']}")
print(f" hallucinated drop : {report['activities_hallucinated']} "
f"({qa['hallucination_rate']}%)")
d = report["dedup"]
print(f"dedup : {d['input']} -> {d['output']} "
f"(auto-merged {d['auto_merged']}, borderline {d['borderline']})")
print(f"review decisions : dropped {report['decisions']['dropped']}, "
f"resolved {report['decisions']['resolved']}")
enr = report.get("enrichment")
if enr and enr.get("entries"):
print(f"enrichment : {enr['entries']} entries "
f"(matched {enr['matched']}, orphaned {enr['orphaned']})")
stated, estimated = enr.get("fields_stated", {}), enr.get("fields_estimated", {})
all_fields = sorted(set(stated) | set(estimated))
if all_fields:
print(" field population : (stated / estimated)")
for fld in all_fields:
print(f" {fld:<18}: {stated.get(fld, 0)} / {estimated.get(fld, 0)}")
print(f"final inserted : {report['final_count']}")
print(f"% with rules : {qa['pct_with_rules']}")
print(f"needs_review rows : {qa['needs_review']}")
print("per category :")
for slug, n in sorted(qa["per_category"].items(), key=lambda kv: -kv[1]):
print(f" {slug:<24}: {n}")
print("per content_type :")
for ct, n in sorted(qa["per_content_type"].items(), key=lambda kv: -kv[1]):
print(f" {ct:<24}: {n}")
print("extraction_confidence:")
for c, n in sorted(qa["extraction_confidence"].items()):
print(f" {c:<24}: {n}")
if qa["golden_recall"]:
g = qa["golden_recall"]
print(f"golden recall : {g['found']}/{g['expected']} = {g['recall']}")
if report["category_fallbacks"]:
print("category fallbacks :")
for msg in report["category_fallbacks"]:
print(f" {msg}")
if report["backup"]:
print(f"live DB backed up to : {report['backup']}")
print("=" * 60)
# --------------------------------------------------------------------------
# CLI
# --------------------------------------------------------------------------
def main(argv: Optional[list[str]] = None) -> int:
parser = argparse.ArgumentParser(description="Build activities.db from extraction JSON.")
parser.add_argument("--rebuild", action="store_true",
help="rebuild the database from scratch (only mode supported)")
parser.add_argument("--extracted", default="data/extracted")
parser.add_argument("--chunks", default="data/chunks")
parser.add_argument("--sources", default="data/sources")
parser.add_argument("--db", default="data/activities.db")
parser.add_argument("--decisions", default="data/review_decisions.json")
parser.add_argument("--enrichment", default="data/enrichment.json")
parser.add_argument("--golden", default="data/golden")
parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH))
args = parser.parse_args(argv)
if not args.rebuild:
parser.error("only --rebuild is supported (full rebuild, no incremental merge)")
report = rebuild(
extracted_dir=Path(args.extracted),
chunks_dir=Path(args.chunks),
sources_dir=Path(args.sources),
db_path=Path(args.db),
decisions_path=Path(args.decisions),
enrichment_path=Path(args.enrichment),
schema_path=Path(args.schema),
golden_dir=Path(args.golden),
)
print_report(report)
return 0
if __name__ == "__main__":
raise SystemExit(main())