feat: add 19 fast commands (no-LLM) + incremental embeddings indexing

Fast commands for git, email, calendar, notes, search, reminders, and
diagnostics — all execute instantly without Claude CLI. Incremental
embeddings indexing in heartbeat (1h cooldown) + inline indexing after
/note, /jurnal, /email save. Fix Ollama URL (localhost → 10.0.20.161),
fix email_process.py KB path (kb/ → memory/kb/).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MoltBot Service
2026-02-15 15:10:44 +00:00
parent 8b76a2dbf7
commit c8ce94611b
7 changed files with 1300 additions and 3 deletions

View File

@@ -59,7 +59,7 @@
"Bash(scp *10.0.20.*)", "Bash(rsync *10.0.20.*)"
],
"ollama": {
"url": "http://localhost:11434"
"url": "http://10.0.20.161:11434"
},
"paths": {
"personality": "personality/",

712
src/fast_commands.py Normal file
View File

@@ -0,0 +1,712 @@
"""Echo Core fast commands — no-LLM instant responses.
Dispatch table with handlers for quick operations: git, email, calendar,
notes, search, reminders, and diagnostics.
"""
import json
import logging
import os
import shutil
import subprocess
import threading
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Callable
log = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
TOOLS_DIR = PROJECT_ROOT / "tools"
MEMORY_DIR = PROJECT_ROOT / "memory"
LOGS_DIR = PROJECT_ROOT / "logs"
# ---------------------------------------------------------------------------
# Git
# ---------------------------------------------------------------------------
def cmd_commit(args: list[str]) -> str:
"""git add -A + commit with auto or custom message."""
status = _git("status", "--porcelain")
if not status.strip():
return "Nothing to commit."
files = [l for l in status.strip().split("\n") if l.strip()]
file_count = len(files)
if args:
message = " ".join(args)
else:
message = _auto_commit_message(files)
_git("add", "-A")
result = _git("commit", "-m", message)
# Extract short hash
short_hash = _git("rev-parse", "--short", "HEAD").strip()
return f"Committed: {short_hash}{message} ({file_count} files)"
def cmd_push(args: list[str]) -> str:
"""git push with safety checks."""
# Check for uncommitted changes
status = _git("status", "--porcelain")
if status.strip():
uncommitted = len([l for l in status.strip().split("\n") if l.strip()])
return f"Warning: {uncommitted} uncommitted changes. Commit first."
result = _git("push")
if "Everything up-to-date" in result or "up-to-date" in result.lower():
return "Already up to date."
branch = _git("branch", "--show-current").strip()
return f"Pushed to origin/{branch}."
def cmd_pull(args: list[str]) -> str:
"""git pull --rebase."""
result = _git("pull", "--rebase")
if "Already up to date" in result:
return "Already up to date."
if "Fast-forward" in result or "rewinding" in result.lower():
# Count commits pulled
lines = [l for l in result.split("\n") if l.strip()]
return f"Pulled updates ({len(lines)} lines of output)."
return result.strip()[:500]
# ---------------------------------------------------------------------------
# Test & Dev
# ---------------------------------------------------------------------------
def cmd_test(args: list[str]) -> str:
"""Run pytest tests."""
cmd = ["python3", "-m", "pytest", "tests/", "-q", "--tb=short"]
if args:
cmd.extend(["-k", " ".join(args)])
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=120,
cwd=str(PROJECT_ROOT),
)
output = proc.stdout.strip()
if proc.stderr and proc.returncode != 0:
output += "\n" + proc.stderr.strip()
# Return last few lines (summary)
lines = output.split("\n")
summary = "\n".join(lines[-10:]) if len(lines) > 10 else output
return summary
except subprocess.TimeoutExpired:
return "Tests timed out (120s limit)."
# ---------------------------------------------------------------------------
# Email
# ---------------------------------------------------------------------------
def cmd_email(args: list[str]) -> str:
"""Dispatch: /email, /email send ..., /email save."""
if not args:
return _email_check()
sub = args[0].lower()
if sub == "send":
return _email_send(args[1:])
if sub == "save":
return _email_save()
return f"Unknown email sub-command: {sub}. Use: /email, /email send, /email save"
def _email_check() -> str:
"""Check unread emails."""
script = TOOLS_DIR / "email_check.py"
if not script.exists():
return "email_check.py not found."
try:
proc = subprocess.run(
["python3", str(script)],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0:
return f"Email check failed: {proc.stderr.strip()[:200]}"
data = json.loads(proc.stdout)
if not data.get("ok"):
return f"Email error: {data.get('error', 'unknown')}"
count = data.get("unread_count", 0)
if count == 0:
return "Inbox curat."
emails = data.get("emails", [])
subjects = [e.get("subject", "?") for e in emails[:5]]
subject_list = ", ".join(subjects)
return f"{count} necitite: {subject_list}"
except json.JSONDecodeError:
return proc.stdout.strip()[:300] if proc.stdout else "Email check: no output."
except Exception as e:
return f"Email check error: {e}"
def _email_send(args: list[str]) -> str:
"""Send email: /email send <to> <subject> :: <body>."""
raw = " ".join(args)
if "::" not in raw:
return "Format: /email send <to> <subject> :: <body>"
header, body = raw.split("::", 1)
header_parts = header.strip().split(None, 1)
if len(header_parts) < 2:
return "Format: /email send <to> <subject> :: <body>"
to_addr = header_parts[0]
subject = header_parts[1]
body = body.strip()
script = TOOLS_DIR / "email_send.py"
if not script.exists():
return "email_send.py not found."
try:
proc = subprocess.run(
["python3", str(script), to_addr, subject, body],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0:
return f"Email send failed: {proc.stderr.strip()[:200]}"
data = json.loads(proc.stdout)
if data.get("ok"):
return f"Email trimis catre {to_addr}."
return f"Email error: {data.get('error', 'unknown')}"
except Exception as e:
return f"Email send error: {e}"
def _email_save() -> str:
"""Save unread emails as KB notes."""
script = TOOLS_DIR / "email_process.py"
if not script.exists():
return "email_process.py not found."
try:
proc = subprocess.run(
["python3", str(script), "--save"],
capture_output=True, text=True, timeout=60,
cwd=str(PROJECT_ROOT),
)
output = proc.stdout.strip()
if not output or "Niciun email" in output:
return "Nimic de salvat."
# Count saved lines and index new files
saved = output.count("Salvat")
if saved:
# Index newly saved email files
kb_emails = MEMORY_DIR / "kb" / "emails"
if kb_emails.exists():
for md in sorted(kb_emails.glob("*.md"))[-saved:]:
_index_file_async(md)
return f"Salvate {saved} emailuri in KB."
return output[:300]
except Exception as e:
return f"Email save error: {e}"
# ---------------------------------------------------------------------------
# Calendar
# ---------------------------------------------------------------------------
def cmd_calendar(args: list[str]) -> str:
"""Dispatch: /calendar, /calendar week, /calendar busy."""
script = TOOLS_DIR / "calendar_check.py"
if not script.exists():
return "calendar_check.py not found."
if not args:
return _calendar_today(script)
sub = args[0].lower()
if sub == "week":
return _calendar_week(script)
if sub == "busy":
return _calendar_busy(script)
return f"Unknown calendar sub-command: {sub}. Use: /calendar, /calendar week, /calendar busy"
def _calendar_today(script: Path) -> str:
"""Today + tomorrow events."""
try:
proc = subprocess.run(
["python3", str(script), "today"],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0:
return f"Calendar error: {proc.stderr.strip()[:200]}"
data = json.loads(proc.stdout)
parts = []
today_events = data.get("today", [])
tomorrow_events = data.get("tomorrow", [])
if today_events:
parts.append("Program azi:")
for ev in today_events:
parts.append(f" {ev.get('time', '?')}{ev.get('summary', '?')}")
else:
parts.append("Azi: nimic programat.")
if tomorrow_events:
parts.append("Maine:")
for ev in tomorrow_events:
parts.append(f" {ev.get('time', '?')}{ev.get('summary', '?')}")
return "\n".join(parts)
except Exception as e:
return f"Calendar error: {e}"
def _calendar_week(script: Path) -> str:
"""Week events."""
try:
proc = subprocess.run(
["python3", str(script), "week"],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0:
return f"Calendar error: {proc.stderr.strip()[:200]}"
data = json.loads(proc.stdout)
events = data.get("events", [])
if not events:
return "Saptamana libera."
week_range = f"{data.get('week_start', '?')}{data.get('week_end', '?')}"
parts = [f"Saptamana {week_range}:"]
current_date = None
for ev in events:
ev_date = ev.get("date", "?")
if ev_date != current_date:
current_date = ev_date
parts.append(f"\n {ev_date}:")
parts.append(f" {ev.get('time', '?')}{ev.get('summary', '?')}")
return "\n".join(parts)
except Exception as e:
return f"Calendar error: {e}"
def _calendar_busy(script: Path) -> str:
"""Am I busy right now?"""
try:
proc = subprocess.run(
["python3", str(script), "busy"],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0:
return f"Calendar error: {proc.stderr.strip()[:200]}"
data = json.loads(proc.stdout)
if data.get("busy"):
event = data.get("event", "?")
ends = data.get("ends", "?")
return f"Ocupat: {event} (pana la {ends})"
return "Liber."
except Exception as e:
return f"Calendar error: {e}"
# ---------------------------------------------------------------------------
# Notes & Journal
# ---------------------------------------------------------------------------
def cmd_note(args: list[str]) -> str:
"""Append a quick note to today's daily file."""
if not args:
return "Usage: /note <text>"
text = " ".join(args)
today = date.today().isoformat()
now = datetime.now().strftime("%H:%M")
filepath = MEMORY_DIR / f"{today}.md"
if not filepath.exists():
filepath.write_text(f"# {today}\n\n", encoding="utf-8")
with open(filepath, "a", encoding="utf-8") as f:
f.write(f"- [{now}] {text}\n")
_index_file_async(filepath)
return "Notat."
def cmd_jurnal(args: list[str]) -> str:
"""Append a journal entry under ## Jurnal section in today's file."""
if not args:
return "Usage: /jurnal <text>"
text = " ".join(args)
today = date.today().isoformat()
now = datetime.now().strftime("%H:%M")
filepath = MEMORY_DIR / f"{today}.md"
if not filepath.exists():
filepath.write_text(f"# {today}\n\n## Jurnal\n\n", encoding="utf-8")
content = filepath.read_text(encoding="utf-8")
else:
content = filepath.read_text(encoding="utf-8")
if "## Jurnal" not in content:
with open(filepath, "a", encoding="utf-8") as f:
f.write("\n## Jurnal\n\n")
content = filepath.read_text(encoding="utf-8")
# Append under ## Jurnal
entry = f"**{now}** — {text}\n"
# Find position after ## Jurnal and any existing entries
jurnal_idx = content.index("## Jurnal")
# Find next section or end of file
rest = content[jurnal_idx + len("## Jurnal"):]
next_section = rest.find("\n## ")
if next_section == -1:
# Append at end
with open(filepath, "a", encoding="utf-8") as f:
f.write(f"{entry}\n")
else:
# Insert before next section
insert_pos = jurnal_idx + len("## Jurnal") + next_section
new_content = content[:insert_pos] + f"\n{entry}" + content[insert_pos:]
filepath.write_text(new_content, encoding="utf-8")
_index_file_async(filepath)
return "Jurnal actualizat."
# ---------------------------------------------------------------------------
# Search & KB
# ---------------------------------------------------------------------------
def cmd_search(args: list[str]) -> str:
"""Semantic search over memory/ files."""
if not args:
return "Usage: /search <query>"
query = " ".join(args)
try:
from src.memory_search import search
results = search(query, top_k=5)
if not results:
return "Niciun rezultat."
parts = []
for r in results:
score = f"{r['score']:.2f}"
snippet = r["chunk"][:120].replace("\n", " ")
parts.append(f" [{score}] {r['file']}{snippet}")
return "Rezultate:\n" + "\n".join(parts)
except ConnectionError as e:
return f"Search error (Ollama): {e}"
except Exception as e:
return f"Search error: {e}"
def cmd_kb(args: list[str]) -> str:
"""List recent KB notes, optionally filtered by category."""
index_file = MEMORY_DIR / "kb" / "index.json"
if not index_file.exists():
return "KB index not found. Run reindex."
try:
data = json.loads(index_file.read_text(encoding="utf-8"))
notes = data.get("notes", [])
if not notes:
return "KB gol."
category = args[0].lower() if args else None
if category:
notes = [n for n in notes if n.get("category", "").lower() == category]
if not notes:
return f"Nicio nota in categoria '{category}'."
# Sort by date desc, take top 10
notes.sort(key=lambda n: n.get("date", ""), reverse=True)
notes = notes[:10]
parts = []
for n in notes:
title = n.get("title", "?")
d = n.get("date", "?")
cat = n.get("category", "")
parts.append(f" {d} [{cat}] {title}")
return "Ultimele note:\n" + "\n".join(parts)
except Exception as e:
return f"KB error: {e}"
# ---------------------------------------------------------------------------
# Reminders
# ---------------------------------------------------------------------------
def cmd_remind(args: list[str]) -> str:
"""Create a Google Calendar reminder event.
/remind HH:MM text — today at HH:MM
/remind YYYY-MM-DD HH:MM text — specific date
"""
if len(args) < 2:
return "Usage: /remind <HH:MM> <text> or /remind <YYYY-MM-DD> <HH:MM> <text>"
# Try to detect date vs time in first arg
first = args[0]
if "-" in first and len(first) == 10:
# YYYY-MM-DD format
if len(args) < 3:
return "Usage: /remind <YYYY-MM-DD> <HH:MM> <text>"
day = first
time_str = args[1]
text = " ".join(args[2:])
else:
# HH:MM — today
day = date.today().isoformat()
time_str = first
text = " ".join(args[1:])
# Validate time
try:
hour, minute = time_str.split(":")
int(hour)
int(minute)
except (ValueError, AttributeError):
return f"Invalid time format: {time_str}. Use HH:MM."
start_iso = f"{day}T{time_str}:00"
try:
# Import create_event from calendar_check.py
import sys
sys.path.insert(0, str(TOOLS_DIR))
from calendar_check import create_event
result = create_event(text, start_iso, duration_minutes=30)
time_display = time_str
return f"Reminder setat: {time_display}{text}"
except Exception as e:
return f"Remind error: {e}"
finally:
# Clean up sys.path
if str(TOOLS_DIR) in sys.path:
sys.path.remove(str(TOOLS_DIR))
# ---------------------------------------------------------------------------
# Ops & Diagnostics
# ---------------------------------------------------------------------------
def cmd_logs(args: list[str]) -> str:
"""Show last N lines from echo-core.log."""
n = 20
if args:
try:
n = int(args[0])
except ValueError:
return "Usage: /logs [N] (default 20)"
log_file = LOGS_DIR / "echo-core.log"
if not log_file.exists():
return "No log file found."
try:
lines = log_file.read_text(encoding="utf-8").split("\n")
tail = lines[-n:] if len(lines) > n else lines
return "\n".join(tail).strip() or "Log empty."
except Exception as e:
return f"Log read error: {e}"
def cmd_doctor(args: list[str]) -> str:
"""Quick diagnostic checks."""
checks = []
# Claude CLI
claude_found = shutil.which("claude") is not None
checks.append(("Claude CLI", claude_found))
# Keyring secrets count
try:
from src.credential_store import get_secret
import keyring
keyring.get_password("echo-core", "_registry")
checks.append(("Keyring", True))
except Exception:
checks.append(("Keyring", False))
# Config valid
config_file = PROJECT_ROOT / "config.json"
try:
json.loads(config_file.read_text(encoding="utf-8"))
checks.append(("config.json", True))
except Exception:
checks.append(("config.json", False))
# Ollama reachable (read URL from config)
try:
import httpx
ollama_url = "http://10.0.20.161:11434"
try:
cfg_data = json.loads(config_file.read_text(encoding="utf-8"))
ollama_url = cfg_data.get("ollama", {}).get("url", ollama_url).rstrip("/")
except Exception:
pass
resp = httpx.get(f"{ollama_url}/api/version", timeout=5.0)
checks.append(("Ollama", resp.status_code == 200))
except Exception:
checks.append(("Ollama", False))
# Disk space
try:
stat = os.statvfs(str(PROJECT_ROOT))
free_gb = (stat.f_bavail * stat.f_frsize) / (1024 ** 3)
checks.append((f"Disk ({free_gb:.1f}GB free)", free_gb >= 1.0))
except OSError:
checks.append(("Disk", False))
# Format output
lines = []
for name, ok in checks:
icon = "OK" if ok else "FAIL"
lines.append(f" [{icon}] {name}")
passed = sum(1 for _, ok in checks if ok)
total = len(checks)
header = f"Doctor: {passed}/{total} checks passed"
return header + "\n" + "\n".join(lines)
def cmd_heartbeat(args: list[str]) -> str:
"""Force a heartbeat run and return the result."""
try:
from src.heartbeat import run_heartbeat
from src.config import Config
config = Config()
result = run_heartbeat(config.data)
return f"Heartbeat: {result}"
except Exception as e:
return f"Heartbeat error: {e}"
def cmd_help(args: list[str]) -> str:
"""List all available fast commands."""
return """Comenzi disponibile:
Git:
/commit [msg] — Commit all changes (auto-message or custom)
/push — Push to remote (with safety checks)
/pull — Pull with rebase
Dev:
/test [pattern] — Run tests (optional -k filter)
Email:
/email — Check unread emails
/email send <to> <subject> :: <body> — Send email
/email save — Save unread emails to KB
Calendar:
/calendar — Today + tomorrow events
/calendar week — This week's schedule
/calendar busy — Am I in a meeting now?
Notes:
/note <text> — Quick note in daily file
/jurnal <text> — Journal entry in daily file
Search:
/search <query> — Semantic search over memory/
/kb [category] — Recent KB notes
Reminders:
/remind <HH:MM> <text> — Reminder today
/remind <YYYY-MM-DD> <HH:MM> <text> — Reminder on date
Ops:
/logs [N] — Last N log lines (default 20)
/doctor — System diagnostics
/heartbeat — Force heartbeat check
/help — This message
Session:
/clear — Clear Claude session
/status — Session info
/model [name] — Show/change model"""
# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------
COMMANDS: dict[str, Callable] = {
"commit": cmd_commit,
"push": cmd_push,
"pull": cmd_pull,
"test": cmd_test,
"email": cmd_email,
"calendar": cmd_calendar,
"note": cmd_note,
"jurnal": cmd_jurnal,
"search": cmd_search,
"kb": cmd_kb,
"remind": cmd_remind,
"logs": cmd_logs,
"doctor": cmd_doctor,
"heartbeat": cmd_heartbeat,
"help": cmd_help,
}
def dispatch(name: str, args: list[str]) -> str | None:
"""Dispatch a fast command by name. Returns response or None if unknown."""
handler = COMMANDS.get(name)
if handler is None:
return None
try:
return handler(args)
except Exception as e:
log.exception("Fast command /%s failed", name)
return f"Error in /{name}: {e}"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _index_file_async(filepath: Path) -> None:
"""Index a file for semantic search in a background thread."""
def _do_index():
try:
from src.memory_search import index_file
index_file(filepath)
except Exception as e:
log.debug("Background index of %s failed: %s", filepath.name, e)
threading.Thread(target=_do_index, daemon=True).start()
def _git(*args: str) -> str:
"""Run a git command and return stdout."""
proc = subprocess.run(
["git", *args],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0 and proc.stderr:
return proc.stderr.strip()
return proc.stdout.strip()
def _auto_commit_message(files: list[str]) -> str:
"""Generate a commit message from changed files list (porcelain output)."""
areas = set()
added = modified = deleted = 0
for line in files:
if not line.strip():
continue
status_code = line[:2].strip()
filename = line[3:]
if "/" in filename:
areas.add(filename.split("/")[0])
else:
areas.add("root")
if "A" in status_code or "?" in status_code:
added += 1
elif "D" in status_code:
deleted += 1
else:
modified += 1
parts = []
if added:
parts.append(f"+{added}")
if modified:
parts.append(f"~{modified}")
if deleted:
parts.append(f"-{deleted}")
change_summary = " ".join(parts) if parts else "changes"
area_list = ", ".join(sorted(areas)[:3])
if len(areas) > 3:
area_list += f" +{len(areas) - 3} more"
return f"Update {area_list} ({change_summary})"

View File

@@ -24,6 +24,7 @@ DEFAULT_CHECKS = {
"calendar": True,
"kb_index": True,
"git": True,
"embeddings": True,
}
DEFAULT_COOLDOWNS = {
@@ -31,6 +32,7 @@ DEFAULT_COOLDOWNS = {
"calendar": 0, # every run
"kb_index": 14400, # 4h
"git": 14400, # 4h
"embeddings": 3600, # 1h
}
DEFAULT_QUIET_HOURS = [23, 8]
@@ -84,6 +86,13 @@ def run_heartbeat(config: dict | None = None) -> str:
results.append(git_result)
checks["git"] = now.isoformat()
# Check 5: Incremental embeddings index
if check_flags.get("embeddings") and _should_run("embeddings", checks, now, cooldowns):
emb_result = _check_embeddings()
if emb_result:
results.append(emb_result)
checks["embeddings"] = now.isoformat()
# Claude CLI: run if HEARTBEAT.md has extra instructions
if not is_quiet:
claude_result = _run_claude_extra(
@@ -316,6 +325,24 @@ def _run_reindex() -> None:
log.warning("KB reindex failed: %s", e)
def _check_embeddings() -> str | None:
"""Incremental re-index of memory/ embeddings for semantic search."""
try:
from src.memory_search import incremental_index
result = incremental_index()
indexed = result.get("indexed", 0)
if indexed > 0:
chunks = result.get("chunks", 0)
return f"Embeddings: {indexed} fisiere reindexate ({chunks} chunks)"
return None
except ConnectionError:
log.warning("Embeddings check skipped: Ollama unreachable")
return None
except Exception as e:
log.warning("Embeddings check failed: %s", e)
return None
def _check_git() -> str | None:
"""Check for uncommitted files in project."""
try:

View File

@@ -222,6 +222,48 @@ def reindex() -> dict:
return {"files": files_count, "chunks": chunks_count}
def incremental_index() -> dict:
"""Index only new or modified .md files. Returns {"indexed": N, "chunks": M}."""
conn = get_db()
try:
# Get latest updated_at per file from DB
rows = conn.execute(
"SELECT file_path, MAX(updated_at) FROM chunks GROUP BY file_path"
).fetchall()
db_times = {}
for rel_path, updated_at in rows:
try:
db_times[rel_path] = datetime.fromisoformat(updated_at)
except (ValueError, TypeError):
pass
finally:
conn.close()
files_indexed = 0
chunks_total = 0
for md_file in sorted(MEMORY_DIR.rglob("*.md")):
rel_path = str(md_file.relative_to(MEMORY_DIR))
file_mtime = datetime.fromtimestamp(
md_file.stat().st_mtime, tz=timezone.utc
)
db_time = db_times.get(rel_path)
if db_time is not None:
# Ensure both are offset-aware for comparison
if db_time.tzinfo is None:
db_time = db_time.replace(tzinfo=timezone.utc)
if file_mtime <= db_time:
continue
try:
n = index_file(md_file)
files_indexed += 1
chunks_total += n
log.info("Incremental indexed %s (%d chunks)", md_file.name, n)
except Exception as e:
log.warning("Failed to index %s: %s", md_file, e)
return {"indexed": files_indexed, "chunks": chunks_total}
def search(query: str, top_k: int = 5) -> list[dict]:
"""Search for query. Returns list of {"file": str, "chunk": str, "score": float}."""
query_embedding = get_embedding(query)

View File

@@ -4,6 +4,7 @@ import logging
from typing import Callable
from src.config import Config
from src.fast_commands import dispatch as fast_dispatch
from src.claude_session import (
send_message,
clear_session,
@@ -59,7 +60,13 @@ def route_message(
return _model_command(channel_id, text), True
if text.startswith("/"):
return f"Unknown command: {text.split()[0]}", True
parts = text[1:].split()
cmd_name = parts[0].lower()
cmd_args = parts[1:]
result = fast_dispatch(cmd_name, cmd_args)
if result is not None:
return result, True
return f"Unknown command: /{cmd_name}", True
# Regular message → Claude
if not model:

509
tests/test_fast_commands.py Normal file
View File

@@ -0,0 +1,509 @@
"""Tests for src/fast_commands.py — fast (non-LLM) commands."""
import json
import pytest
from datetime import date, datetime
from pathlib import Path
from unittest.mock import patch, MagicMock
from src.fast_commands import (
dispatch,
cmd_commit,
cmd_push,
cmd_pull,
cmd_test,
cmd_email,
cmd_calendar,
cmd_note,
cmd_jurnal,
cmd_search,
cmd_kb,
cmd_remind,
cmd_logs,
cmd_doctor,
cmd_heartbeat,
cmd_help,
COMMANDS,
_auto_commit_message,
)
# --- Dispatch ---
class TestDispatch:
def test_known_command(self):
"""Known commands return a string."""
result = dispatch("help", [])
assert result is not None
assert "Comenzi disponibile" in result
def test_unknown_command(self):
"""Unknown commands return None."""
assert dispatch("nonexistent", []) is None
def test_all_commands_registered(self):
expected = {
"commit", "push", "pull", "test", "email", "calendar",
"note", "jurnal", "search", "kb", "remind", "logs",
"doctor", "heartbeat", "help",
}
assert set(COMMANDS.keys()) == expected
def test_handler_exception_caught(self):
"""Exceptions in handlers are caught and returned as error strings."""
with patch.dict(COMMANDS, {"boom": lambda args: 1 / 0}):
result = dispatch("boom", [])
assert result is not None
assert "Error in /boom" in result
# --- Git ---
class TestGitCommit:
@patch("src.fast_commands._git")
def test_nothing_to_commit(self, mock_git):
mock_git.return_value = ""
assert cmd_commit([]) == "Nothing to commit."
@patch("src.fast_commands._git")
def test_commit_with_custom_message(self, mock_git):
mock_git.side_effect = [
" M file.py\n", # status --porcelain
"", # add -A
"", # commit -m
"abc1234", # rev-parse --short HEAD
]
result = cmd_commit(["fix", "bug"])
assert "abc1234" in result
assert "fix bug" in result
assert "1 files" in result
@patch("src.fast_commands._git")
def test_commit_auto_message(self, mock_git):
mock_git.side_effect = [
" M src/router.py\n?? newfile.txt\n", # status
"", # add -A
"", # commit -m
"def5678", # rev-parse
]
result = cmd_commit([])
assert "def5678" in result
assert "2 files" in result
class TestGitPush:
@patch("src.fast_commands._git")
def test_push_with_uncommitted(self, mock_git):
mock_git.return_value = " M dirty.py\n"
result = cmd_push([])
assert "uncommitted" in result.lower()
@patch("src.fast_commands._git")
def test_push_up_to_date(self, mock_git):
mock_git.side_effect = [
"", # status --porcelain
"Everything up-to-date", # push
]
result = cmd_push([])
assert "up to date" in result.lower()
@patch("src.fast_commands._git")
def test_push_success(self, mock_git):
mock_git.side_effect = [
"", # status --porcelain
"abc..def master -> master", # push
"master", # branch --show-current
]
result = cmd_push([])
assert "origin/master" in result
class TestGitPull:
@patch("src.fast_commands._git")
def test_pull_up_to_date(self, mock_git):
mock_git.return_value = "Already up to date."
assert "up to date" in cmd_pull([]).lower()
@patch("src.fast_commands._git")
def test_pull_with_changes(self, mock_git):
mock_git.return_value = "Fast-forward\n file.py | 2 +-"
result = cmd_pull([])
assert "Pulled" in result
# --- Test ---
class TestTestCmd:
@patch("subprocess.run")
def test_basic_run(self, mock_run):
mock_run.return_value = MagicMock(
stdout="5 passed in 1.2s",
stderr="",
returncode=0,
)
result = cmd_test([])
assert "passed" in result
@patch("subprocess.run")
def test_with_pattern(self, mock_run):
mock_run.return_value = MagicMock(
stdout="2 passed",
stderr="",
returncode=0,
)
cmd_test(["router"])
args = mock_run.call_args[0][0]
assert "-k" in args
assert "router" in args
@patch("subprocess.run")
def test_timeout(self, mock_run):
import subprocess as sp
mock_run.side_effect = sp.TimeoutExpired(cmd="pytest", timeout=120)
result = cmd_test([])
assert "timed out" in result.lower()
# --- Email ---
class TestEmail:
@patch("subprocess.run")
def test_email_check_clean(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({"ok": True, "unread_count": 0, "emails": []}),
stderr="",
returncode=0,
)
result = cmd_email([])
assert "curat" in result.lower()
@patch("subprocess.run")
def test_email_check_unread(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({
"ok": True,
"unread_count": 2,
"emails": [
{"subject": "Hello"},
{"subject": "Meeting"},
],
}),
stderr="",
returncode=0,
)
result = cmd_email([])
assert "2 necitite" in result
assert "Hello" in result
def test_email_unknown_sub(self):
result = cmd_email(["foo"])
assert "Unknown email sub-command" in result
@patch("subprocess.run")
def test_email_send(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({"ok": True}),
stderr="",
returncode=0,
)
result = cmd_email(["send", "test@x.com", "Subject", "::", "Body"])
assert "trimis" in result.lower()
def test_email_send_bad_format(self):
result = cmd_email(["send", "no-separator"])
assert "Format" in result
@patch("subprocess.run")
def test_email_save_nothing(self, mock_run):
mock_run.return_value = MagicMock(
stdout="Niciun email nou de la adrese whitelisted.",
stderr="",
returncode=0,
)
result = cmd_email(["save"])
assert "Nimic de salvat" in result
# --- Calendar ---
class TestCalendar:
@patch("subprocess.run")
def test_calendar_today(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({
"today": [{"time": "09:00", "summary": "Standup"}],
"tomorrow": [],
}),
stderr="",
returncode=0,
)
result = cmd_calendar([])
assert "Standup" in result
assert "09:00" in result
@patch("subprocess.run")
def test_calendar_week(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({
"week_start": "10 Feb",
"week_end": "16 Feb",
"events": [{"date": "10 Feb", "time": "10:00", "summary": "Review"}],
}),
stderr="",
returncode=0,
)
result = cmd_calendar(["week"])
assert "Review" in result
@patch("subprocess.run")
def test_calendar_busy_free(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({"busy": False}),
stderr="",
returncode=0,
)
result = cmd_calendar(["busy"])
assert "Liber" in result
@patch("subprocess.run")
def test_calendar_busy_occupied(self, mock_run):
mock_run.return_value = MagicMock(
stdout=json.dumps({"busy": True, "event": "Meeting", "ends": "16:30"}),
stderr="",
returncode=0,
)
result = cmd_calendar(["busy"])
assert "Ocupat" in result
assert "Meeting" in result
def test_calendar_unknown_sub(self):
result = cmd_calendar(["foo"])
assert "Unknown calendar sub-command" in result
# --- Notes ---
class TestNote:
def test_note_empty(self):
assert "Usage" in cmd_note([])
def test_note_creates_file(self, tmp_path):
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
result = cmd_note(["test", "note"])
assert result == "Notat."
today = date.today().isoformat()
filepath = tmp_path / f"{today}.md"
assert filepath.exists()
content = filepath.read_text()
assert "test note" in content
assert f"# {today}" in content
def test_note_appends(self, tmp_path):
today = date.today().isoformat()
filepath = tmp_path / f"{today}.md"
filepath.write_text(f"# {today}\n\n- existing\n")
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
cmd_note(["new", "note"])
content = filepath.read_text()
assert "existing" in content
assert "new note" in content
class TestJurnal:
def test_jurnal_empty(self):
assert "Usage" in cmd_jurnal([])
def test_jurnal_creates_file(self, tmp_path):
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
result = cmd_jurnal(["my", "entry"])
assert result == "Jurnal actualizat."
today = date.today().isoformat()
filepath = tmp_path / f"{today}.md"
content = filepath.read_text()
assert "## Jurnal" in content
assert "my entry" in content
def test_jurnal_adds_section_if_missing(self, tmp_path):
today = date.today().isoformat()
filepath = tmp_path / f"{today}.md"
filepath.write_text(f"# {today}\n\n- some note\n")
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
cmd_jurnal(["entry"])
content = filepath.read_text()
assert "## Jurnal" in content
assert "entry" in content
# --- Search ---
class TestSearch:
def test_search_empty(self):
assert "Usage" in cmd_search([])
@patch("src.memory_search.search")
def test_search_no_results(self, mock_search):
mock_search.return_value = []
result = cmd_search(["query"])
assert "Niciun rezultat" in result
@patch("src.memory_search.search")
def test_search_with_results(self, mock_search):
mock_search.return_value = [
{"file": "notes.md", "chunk": "important info here", "score": 0.92},
]
result = cmd_search(["important"])
assert "Rezultate" in result
assert "notes.md" in result
assert "0.92" in result
# --- KB ---
class TestKB:
def test_kb_no_index(self, tmp_path):
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
result = cmd_kb([])
assert "not found" in result.lower()
def test_kb_list(self, tmp_path):
kb_dir = tmp_path / "kb"
kb_dir.mkdir()
index = {
"notes": [
{"title": "Note 1", "date": "2026-02-15", "category": "insights"},
{"title": "Note 2", "date": "2026-02-14", "category": "projects"},
]
}
(kb_dir / "index.json").write_text(json.dumps(index))
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
result = cmd_kb([])
assert "Note 1" in result
assert "Note 2" in result
def test_kb_filter_category(self, tmp_path):
kb_dir = tmp_path / "kb"
kb_dir.mkdir()
index = {
"notes": [
{"title": "Note A", "date": "2026-02-15", "category": "insights"},
{"title": "Note B", "date": "2026-02-14", "category": "projects"},
]
}
(kb_dir / "index.json").write_text(json.dumps(index))
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
result = cmd_kb(["insights"])
assert "Note A" in result
assert "Note B" not in result
def test_kb_filter_no_match(self, tmp_path):
kb_dir = tmp_path / "kb"
kb_dir.mkdir()
index = {"notes": [{"title": "X", "date": "2026-01-01", "category": "a"}]}
(kb_dir / "index.json").write_text(json.dumps(index))
with patch("src.fast_commands.MEMORY_DIR", tmp_path):
result = cmd_kb(["nonexistent"])
assert "Nicio nota" in result
# --- Remind ---
class TestRemind:
def test_remind_too_few_args(self):
assert "Usage" in cmd_remind([])
assert "Usage" in cmd_remind(["15:00"])
def test_remind_bad_time(self):
result = cmd_remind(["nottime", "test"])
assert "Invalid time" in result
@patch("src.fast_commands.TOOLS_DIR", Path("/tmp/fake_tools"))
def test_remind_with_date(self):
with patch.dict("sys.modules", {
"calendar_check": MagicMock(
create_event=MagicMock(return_value={"id": "123"})
),
}):
result = cmd_remind(["2026-03-01", "15:00", "Test", "reminder"])
assert "Reminder setat" in result
assert "15:00" in result
# --- Logs ---
class TestLogs:
def test_logs_no_file(self, tmp_path):
with patch("src.fast_commands.LOGS_DIR", tmp_path):
result = cmd_logs([])
assert "No log file" in result
def test_logs_default(self, tmp_path):
log_file = tmp_path / "echo-core.log"
lines = [f"line {i}" for i in range(30)]
log_file.write_text("\n".join(lines))
with patch("src.fast_commands.LOGS_DIR", tmp_path):
result = cmd_logs([])
assert "line 29" in result
assert "line 10" in result
assert "line 0" not in result
def test_logs_custom_count(self, tmp_path):
log_file = tmp_path / "echo-core.log"
lines = [f"line {i}" for i in range(30)]
log_file.write_text("\n".join(lines))
with patch("src.fast_commands.LOGS_DIR", tmp_path):
result = cmd_logs(["5"])
assert "line 29" in result
assert "line 20" not in result
# --- Doctor ---
class TestDoctor:
@patch("shutil.which", return_value="/usr/bin/claude")
def test_doctor_runs(self, mock_which):
result = cmd_doctor([])
assert "Doctor:" in result
assert "Claude CLI" in result
# --- Help ---
class TestHelp:
def test_help_lists_commands(self):
result = cmd_help([])
assert "/commit" in result
assert "/email" in result
assert "/calendar" in result
assert "/search" in result
assert "/help" in result
assert "/clear" in result
# --- Auto commit message ---
class TestAutoCommitMessage:
def test_single_modified(self):
msg = _auto_commit_message([" M src/router.py"])
assert "src" in msg
assert "~1" in msg
def test_mixed_changes(self):
files = [
" M src/a.py",
"?? tests/new.py",
" D old.py",
]
msg = _auto_commit_message(files)
assert "+1" in msg
assert "~1" in msg
assert "-1" in msg
def test_many_areas(self):
files = [
" M a/x.py",
" M b/y.py",
" M c/z.py",
" M d/w.py",
]
msg = _auto_commit_message(files)
assert "+1 more" in msg

View File

@@ -54,7 +54,7 @@ WHITELIST = [
'marius.mutu@romfast.ro',
]
KB_PATH = Path(__file__).parent.parent / 'kb' / 'emails'
KB_PATH = Path(__file__).parent.parent / 'memory' / 'kb' / 'emails'
def slugify(text: str, max_len: int = 50) -> str:
"""Convert text to URL-friendly slug"""