"""Echo Core memory search — semantic search over memory/*.md files. Uses Ollama all-minilm embeddings stored in SQLite for cosine similarity search. """ import logging import math import re import sqlite3 import struct from datetime import datetime, timezone from pathlib import Path import httpx log = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).resolve().parent.parent DB_PATH = PROJECT_ROOT / "memory" / "echo.sqlite" MEMORY_DIR = PROJECT_ROOT / "memory" # Defaults — overridable via config.json ollama/memory sections _OLLAMA_BASE_URL = "http://localhost:11434" _OLLAMA_MODEL = "all-minilm" _EMBEDDING_DIM = 384 _CHUNK_TARGET = 500 _CHUNK_MAX = 1000 _CHUNK_MIN = 100 # Runtime config (populated by init_config) OLLAMA_URL = f"{_OLLAMA_BASE_URL}/api/embeddings" OLLAMA_MODEL = _OLLAMA_MODEL EMBEDDING_DIM = _EMBEDDING_DIM def init_config(config=None) -> None: """Load settings from config object. Call once at startup.""" global OLLAMA_URL, OLLAMA_MODEL, EMBEDDING_DIM if config is None: # Try loading from config.json directly config_file = PROJECT_ROOT / "config.json" if config_file.exists(): import json try: with open(config_file, encoding="utf-8") as f: data = json.load(f) base_url = data.get("ollama", {}).get("url", _OLLAMA_BASE_URL) OLLAMA_URL = f"{base_url.rstrip('/')}/api/embeddings" OLLAMA_MODEL = data.get("ollama", {}).get("model", _OLLAMA_MODEL) EMBEDDING_DIM = data.get("ollama", {}).get("embedding_dim", _EMBEDDING_DIM) except (json.JSONDecodeError, OSError): pass return # Config object with .get() method base_url = config.get("ollama.url", _OLLAMA_BASE_URL) OLLAMA_URL = f"{base_url.rstrip('/')}/api/embeddings" OLLAMA_MODEL = config.get("ollama.model", _OLLAMA_MODEL) EMBEDDING_DIM = config.get("ollama.embedding_dim", _EMBEDDING_DIM) # Auto-init from config.json on import init_config() def _is_indexable(md_file: Path) -> bool: """Skip generated navigation files so they aren't embedded as if they were notes.""" return md_file.name != "index.md" def get_db() -> sqlite3.Connection: """Get SQLite connection, create table if needed.""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH)) conn.execute( """CREATE TABLE IF NOT EXISTS chunks ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT NOT NULL, chunk_index INTEGER NOT NULL, chunk_text TEXT NOT NULL, embedding BLOB NOT NULL, updated_at TEXT NOT NULL, UNIQUE(file_path, chunk_index) )""" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_file_path ON chunks(file_path)" ) conn.commit() return conn def get_embedding(text: str) -> list[float]: """Get embedding vector from Ollama. Returns list of 384 floats.""" try: resp = httpx.post( OLLAMA_URL, json={"model": OLLAMA_MODEL, "prompt": text}, timeout=30.0, ) resp.raise_for_status() embedding = resp.json()["embedding"] if len(embedding) != EMBEDDING_DIM: raise ValueError( f"Expected {EMBEDDING_DIM} dimensions, got {len(embedding)}" ) return embedding except httpx.ConnectError: raise ConnectionError( f"Cannot connect to Ollama at {OLLAMA_URL}. Is Ollama running?" ) except httpx.HTTPStatusError as e: raise ConnectionError(f"Ollama API error: {e.response.status_code}") def serialize_embedding(embedding: list[float]) -> bytes: """Pack floats to bytes for SQLite storage.""" return struct.pack(f"{len(embedding)}f", *embedding) def deserialize_embedding(data: bytes) -> list[float]: """Unpack bytes to floats.""" n = len(data) // 4 return list(struct.unpack(f"{n}f", data)) def cosine_similarity(a: list[float], b: list[float]) -> float: """Compute cosine similarity between two vectors.""" dot = sum(x * y for x, y in zip(a, b)) norm_a = math.sqrt(sum(x * x for x in a)) norm_b = math.sqrt(sum(x * x for x in b)) if norm_a == 0 or norm_b == 0: return 0.0 return dot / (norm_a * norm_b) def chunk_file(file_path: Path) -> list[str]: """Split .md file into chunks of ~500 chars.""" text = file_path.read_text(encoding="utf-8") if not text.strip(): return [] # Split by double newlines or headers raw_parts: list[str] = [] current = "" for line in text.split("\n"): # Split on headers or empty lines (paragraph boundaries) if line.startswith("#") and current.strip(): raw_parts.append(current.strip()) current = line + "\n" elif line.strip() == "" and current.strip(): raw_parts.append(current.strip()) current = "" else: current += line + "\n" if current.strip(): raw_parts.append(current.strip()) # Merge small chunks with next, split large ones chunks: list[str] = [] buffer = "" for part in raw_parts: if buffer and len(buffer) + len(part) + 1 > _CHUNK_MAX: chunks.append(buffer) buffer = part elif buffer: buffer = buffer + "\n\n" + part else: buffer = part # If buffer exceeds max, flush if len(buffer) > _CHUNK_MAX: chunks.append(buffer) buffer = "" if buffer: # Merge tiny trailing chunk with previous if len(buffer) < _CHUNK_MIN and chunks: chunks[-1] = chunks[-1] + "\n\n" + buffer else: chunks.append(buffer) return chunks def index_file(file_path: Path) -> int: """Index a single file. Returns number of chunks created.""" rel_path = str(file_path.relative_to(MEMORY_DIR)) chunks = chunk_file(file_path) if not chunks: return 0 now = datetime.now(timezone.utc).isoformat() conn = get_db() try: conn.execute("DELETE FROM chunks WHERE file_path = ?", (rel_path,)) for i, chunk_text in enumerate(chunks): embedding = get_embedding(chunk_text) conn.execute( """INSERT INTO chunks (file_path, chunk_index, chunk_text, embedding, updated_at) VALUES (?, ?, ?, ?, ?)""", (rel_path, i, chunk_text, serialize_embedding(embedding), now), ) conn.commit() return len(chunks) finally: conn.close() def reindex() -> dict: """Rebuild entire index. Returns {"files": N, "chunks": M}.""" conn = get_db() conn.execute("DELETE FROM chunks") conn.commit() conn.close() files_count = 0 chunks_count = 0 for md_file in sorted(MEMORY_DIR.rglob("*.md")): if not _is_indexable(md_file): continue try: n = index_file(md_file) files_count += 1 chunks_count += n log.info("Indexed %s (%d chunks)", md_file.name, n) except Exception as e: log.warning("Failed to index %s: %s", md_file, e) return {"files": files_count, "chunks": chunks_count} def incremental_index() -> dict: """Index only new or modified .md files. Returns {"indexed": N, "chunks": M}.""" conn = get_db() try: # Get latest updated_at per file from DB rows = conn.execute( "SELECT file_path, MAX(updated_at) FROM chunks GROUP BY file_path" ).fetchall() db_times = {} for rel_path, updated_at in rows: try: db_times[rel_path] = datetime.fromisoformat(updated_at) except (ValueError, TypeError): pass finally: conn.close() files_indexed = 0 chunks_total = 0 for md_file in sorted(MEMORY_DIR.rglob("*.md")): if not _is_indexable(md_file): continue rel_path = str(md_file.relative_to(MEMORY_DIR)) file_mtime = datetime.fromtimestamp( md_file.stat().st_mtime, tz=timezone.utc ) db_time = db_times.get(rel_path) if db_time is not None: # Ensure both are offset-aware for comparison if db_time.tzinfo is None: db_time = db_time.replace(tzinfo=timezone.utc) if file_mtime <= db_time: continue try: n = index_file(md_file) files_indexed += 1 chunks_total += n log.info("Incremental indexed %s (%d chunks)", md_file.name, n) except Exception as e: log.warning("Failed to index %s: %s", md_file, e) return {"indexed": files_indexed, "chunks": chunks_total} def _keyword_fallback(query: str, top_k: int = 5) -> list[dict]: """Keyword search over indexed chunks. Used when the embedding backend is down. Returns the same shape as search() plus "degraded": True so callers can tell the user that semantic recall was unavailable. Ranks best-chunk-per-file by raw term-occurrence count. """ terms = [t for t in re.findall(r"\w+", query.lower()) if len(t) > 2] conn = get_db() try: rows = conn.execute("SELECT file_path, chunk_text FROM chunks").fetchall() finally: conn.close() best: dict[str, dict] = {} for file_path, chunk_text in rows: low = chunk_text.lower() hits = sum(low.count(t) for t in terms) if terms else 0 if hits == 0: continue cur = best.get(file_path) if cur is None or hits > cur["score"]: best[file_path] = { "file": file_path, "chunk": chunk_text, "score": float(hits), "degraded": True, } scored = sorted(best.values(), key=lambda x: x["score"], reverse=True) return scored[:top_k] def search(query: str, top_k: int = 5) -> list[dict]: """Search for query. Returns list of {"file": str, "chunk": str, "score": float}. Results are deduped to the best-scoring chunk per file, so a relevant note can't be buried by another file contributing several chunks. If the embedding backend (Ollama) is unreachable, falls back to keyword search and tags each result with "degraded": True instead of raising. """ try: query_embedding = get_embedding(query) except ConnectionError as e: log.warning( "Embedding backend unavailable (%s); falling back to keyword search", e ) return _keyword_fallback(query, top_k) conn = get_db() try: rows = conn.execute( "SELECT file_path, chunk_text, embedding FROM chunks" ).fetchall() finally: conn.close() if not rows: return [] best: dict[str, dict] = {} for file_path, chunk_text, emb_blob in rows: emb = deserialize_embedding(emb_blob) score = cosine_similarity(query_embedding, emb) cur = best.get(file_path) if cur is None or score > cur["score"]: best[file_path] = {"file": file_path, "chunk": chunk_text, "score": score} scored = sorted(best.values(), key=lambda x: x["score"], reverse=True) return scored[:top_k]