Files
clawd/tools/update_notes_index.py
Echo 10fb3d6fb5 refactor: mutat kb/ -> memory/kb/ pentru memory search
- Mutat toate fișierele din kb/ în memory/kb/
- Actualizat toate referințele în fișiere (.md, .py, .html)
- Actualizat 10 joburi cron cu noi căi
- Memory search indexează acum 58 fișiere din memory/
- TOOLS.md actualizat cu documentație completă
2026-02-01 21:18:45 +00:00

290 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Generează index.json pentru KB din fișierele .md
Scanează: memory/kb/, memory/, conversations/
Extrage titlu, dată, tags, și domenii (@work, @health, etc.)
"""
import os
import re
import json
from pathlib import Path
from datetime import datetime
BASE_DIR = Path(__file__).parent.parent
KB_ROOT = BASE_DIR / "memory" / "kb"
MEMORY_DIR = BASE_DIR / "memory"
CONVERSATIONS_DIR = BASE_DIR / "conversations"
INDEX_FILE = KB_ROOT / "index.json"
# Domenii de agenți
VALID_DOMAINS = ['work', 'health', 'growth', 'sprijin', 'scout']
# Tipuri speciale (pentru grup-sprijin etc.)
VALID_TYPES = ['exercitiu', 'meditatie', 'reflectie', 'intrebare', 'fisa', 'project', 'memory', 'conversation', 'coaching']
# Cache for rules files
_rules_cache = {}
def load_rules(filepath):
"""Încarcă regulile din .rules.json din directorul fișierului sau părinți"""
dir_path = filepath.parent
# Check cache
if str(dir_path) in _rules_cache:
return _rules_cache[str(dir_path)]
# Look for .rules.json in current dir and parents (up to memory/kb/)
rules = {
"defaultDomains": [],
"defaultTypes": [],
"defaultTags": [],
"inferTypeFromFilename": False,
"filenameTypeMap": {}
}
# Collect rules from all levels (child rules override parent)
rules_chain = []
current = dir_path
while current >= KB_ROOT:
rules_file = current / ".rules.json"
if rules_file.exists():
try:
with open(rules_file, 'r', encoding='utf-8') as f:
rules_chain.insert(0, json.load(f)) # Parent first
except:
pass
current = current.parent
# Merge rules (child overrides parent)
for r in rules_chain:
for key in rules:
if key in r:
if isinstance(rules[key], list):
# Extend lists (don't override)
rules[key] = list(set(rules[key] + r[key]))
else:
rules[key] = r[key]
_rules_cache[str(dir_path)] = rules
return rules
def extract_metadata(filepath, category, subcategory=None):
"""Extrage metadata din fișierul markdown"""
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
# Extrage titlul (prima linie cu #)
title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
title = title_match.group(1) if title_match else filepath.stem
# Extrage tags (linia cu **Tags:** sau tags:)
tags = []
domains = []
types = []
tags_match = re.search(r'\*\*Tags?:\*\*\s*(.+)$|^Tags?:\s*(.+)$', content, re.MULTILINE | re.IGNORECASE)
if tags_match:
tags_str = tags_match.group(1) or tags_match.group(2)
# Extrage domenii (@work, @health, etc.)
domain_matches = re.findall(r'@(\w+)', tags_str)
domains = [d for d in domain_matches if d in VALID_DOMAINS]
types = [d for d in domain_matches if d in VALID_TYPES]
# Extrage tags normale (#tag)
all_tags = re.findall(r'#([\w-]+)', tags_str)
tags = [t for t in all_tags if t not in VALID_DOMAINS and t not in VALID_TYPES]
# Aplică reguli din .rules.json (dacă există)
rules = load_rules(filepath)
# Adaugă domains implicite (dacă nu sunt deja)
for d in rules.get("defaultDomains", []):
if d not in domains:
domains.append(d)
# Adaugă types implicite
for t in rules.get("defaultTypes", []):
if t not in types:
types.append(t)
# Adaugă tags implicite
for t in rules.get("defaultTags", []):
if t not in tags:
tags.append(t)
# Inferă type din filename (dacă e configurat)
if rules.get("inferTypeFromFilename"):
filename_lower = filepath.stem.lower()
for pattern, type_name in rules.get("filenameTypeMap", {}).items():
if pattern in filename_lower and type_name not in types:
types.append(type_name)
break
# Extrage data din filename (YYYY-MM-DD_slug.md sau YYYY-MM-DD.md)
date_match = re.match(r'(\d{4}-\d{2}-\d{2})', filepath.name)
date = date_match.group(1) if date_match else ""
# Pentru fișiere fără dată în nume, folosește mtime
if not date:
mtime = filepath.stat().st_mtime
date = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d')
# Extrage video URL
video_match = re.search(r'\*\*(?:Video|Link):\*\*\s*(https?://[^\s]+)', content)
video_url = video_match.group(1) if video_match else ""
# Extrage TL;DR sau primele 200 caractere de conținut
tldr = ""
tldr_match = re.search(r'##\s*📋?\s*TL;DR\s*\n+(.+?)(?=\n##|\n---|\Z)', content, re.DOTALL)
if tldr_match:
tldr = tldr_match.group(1).strip()[:200]
else:
# Fallback: primul paragraf după titlu
para_match = re.search(r'^#.+\n+(.+?)(?=\n\n|\n#|\Z)', content, re.DOTALL)
if para_match:
tldr = para_match.group(1).strip()[:200]
if len(tldr) >= 200:
tldr += "..."
# Construiește path-ul relativ pentru web (din dashboard/)
# Dashboard are symlinks: notes-data -> ../kb, memory -> ../memory, conversations -> ../conversations
rel_path = str(filepath.relative_to(BASE_DIR))
# Transformă memory/kb/... în notes-data/... pentru web
if rel_path.startswith('memory/kb/'):
rel_path = 'notes-data/' + rel_path[3:]
return {
"file": rel_path,
"title": title,
"date": date,
"tags": tags,
"domains": domains,
"types": types,
"category": category,
"project": subcategory, # primul nivel sub projects/ (grup-sprijin, vending-master)
"subdir": None, # se setează în scan_directory pentru niveluri mai adânci
"video": video_url,
"tldr": tldr
}
def scan_directory(dir_path, category, subcategory=None, recursive=False):
"""Scanează un director pentru fișiere .md"""
notes = []
if not dir_path.exists():
return notes
# Defaults pentru categorii speciale (memory/, conversations/)
category_defaults = {
"memory": {"types": ["memory"], "domains": []},
"conversations": {"types": ["conversation"], "domains": []}
}
if recursive:
# Scanează recursiv
for filepath in dir_path.rglob("*.md"):
if filepath.name.startswith('.') or 'template' in filepath.name.lower():
continue
try:
# Determină project și subdir din path
# Ex: projects/grup-sprijin/biblioteca/file.md
# project = grup-sprijin, subdir = biblioteca
rel_to_dir = filepath.relative_to(dir_path)
parts = rel_to_dir.parts[:-1] # exclude filename
project = parts[0] if len(parts) > 0 else None
subdir = parts[1] if len(parts) > 1 else None
metadata = extract_metadata(filepath, category, project)
metadata['subdir'] = subdir
notes.append(metadata)
except Exception as e:
print(f" ! Error processing {filepath}: {e}")
else:
# Scanează doar fișierele din director (nu subdirectoare)
for filepath in sorted(dir_path.glob("*.md"), reverse=True):
if filepath.name.startswith('.') or 'template' in filepath.name.lower():
continue
try:
metadata = extract_metadata(filepath, category, subcategory)
# Aplică defaults pentru categoria specială
if category in category_defaults:
defaults = category_defaults[category]
for t in defaults.get("types", []):
if t not in metadata["types"]:
metadata["types"].append(t)
for d in defaults.get("domains", []):
if d not in metadata["domains"]:
metadata["domains"].append(d)
notes.append(metadata)
except Exception as e:
print(f" ! Error processing {filepath}: {e}")
return notes
def generate_index():
"""Generează index.json din toate sursele"""
all_notes = []
# Stats
domain_stats = {d: 0 for d in VALID_DOMAINS}
category_stats = {}
# Scanează TOATE subdirectoarele din memory/kb/ recursiv
print("Scanning memory/kb/ (all subdirectories)...")
for subdir in sorted(KB_ROOT.iterdir()):
if subdir.is_dir() and not subdir.name.startswith('.'):
category = subdir.name
print(f" [{category}]")
notes = scan_directory(subdir, category, recursive=True)
all_notes.extend(notes)
category_stats[category] = len(notes)
for n in notes:
sub = f"/{n['subcategory']}" if n.get('subcategory') else ""
print(f" + {n['title'][:42]}...")
for d in n['domains']:
domain_stats[d] += 1
# 4. Scanează memory/
print("Scanning memory/...")
memory_notes = scan_directory(MEMORY_DIR, "memory")
all_notes.extend(memory_notes)
category_stats["memory"] = len(memory_notes)
for n in memory_notes:
print(f" + {n['title'][:45]}...")
# 5. Scanează conversations/
print("Scanning conversations/...")
conv_notes = scan_directory(CONVERSATIONS_DIR, "conversations")
all_notes.extend(conv_notes)
category_stats["conversations"] = len(conv_notes)
for n in conv_notes:
print(f" + {n['title'][:45]}...")
# Sortează după dată descrescător
all_notes.sort(key=lambda x: x['date'], reverse=True)
# Adaugă metadata globală
output = {
"notes": all_notes,
"stats": {
"total": len(all_notes),
"by_domain": domain_stats,
"by_category": category_stats
},
"domains": VALID_DOMAINS,
"types": VALID_TYPES,
"categories": list(category_stats.keys())
}
with open(INDEX_FILE, 'w', encoding='utf-8') as f:
json.dump(output, f, indent=2, ensure_ascii=False)
print(f"\n✅ Generated {INDEX_FILE} with {len(all_notes)} notes")
print(f" Categories: {category_stats}")
return output
if __name__ == "__main__":
generate_index()