refactor(heartbeat): config-driven checks, channel delivery, remove hardcoded values

Heartbeat system overhaul:
- Fix email/calendar checks to parse JSON output correctly
- Add per-check cooldowns and quiet hours config
- Send findings to Discord channel instead of just logging
- Auto-reindex KB when stale files detected
- Claude CLI called only if HEARTBEAT.md has extra instructions
- All settings configurable via config.json heartbeat section

Move hardcoded values to config.json:
- allowed_tools list (claude_session.py)
- Ollama URL/model (memory_search.py now reads ollama.url from config)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MoltBot Service
2026-02-14 23:46:04 +00:00
parent ad31b25af3
commit 9c1f9f94e7
6 changed files with 588 additions and 141 deletions

View File

@@ -26,8 +26,46 @@
}, },
"heartbeat": { "heartbeat": {
"enabled": true, "enabled": true,
"interval_minutes": 30 "interval_minutes": 30,
"channel": "echo-core",
"model": "haiku",
"quiet_hours": [23, 8],
"checks": {
"email": true,
"calendar": true,
"kb_index": true,
"git": true
},
"cooldowns": {
"email": 1800,
"calendar": 0,
"kb_index": 14400,
"git": 14400
}
}, },
"allowed_tools": [
"Read", "Edit", "Write", "Glob", "Grep",
"WebFetch", "WebSearch",
"Bash(python3 *)", "Bash(.venv/bin/python3 *)",
"Bash(pip *)", "Bash(pytest *)",
"Bash(git add *)", "Bash(git commit *)",
"Bash(git push)", "Bash(git push *)",
"Bash(git pull)", "Bash(git pull *)",
"Bash(git status)", "Bash(git status *)",
"Bash(git diff)", "Bash(git diff *)",
"Bash(git log)", "Bash(git log *)",
"Bash(git checkout *)",
"Bash(git branch)", "Bash(git branch *)",
"Bash(git stash)", "Bash(git stash *)",
"Bash(npm *)", "Bash(node *)", "Bash(npx *)",
"Bash(systemctl --user *)",
"Bash(trash *)", "Bash(mkdir *)", "Bash(cp *)",
"Bash(mv *)", "Bash(ls *)", "Bash(cat *)", "Bash(chmod *)",
"Bash(docker *)", "Bash(docker-compose *)", "Bash(docker compose *)",
"Bash(ssh *@10.0.20.*)", "Bash(ssh root@10.0.20.*)",
"Bash(ssh echo@10.0.20.*)",
"Bash(scp *10.0.20.*)", "Bash(rsync *10.0.20.*)"
],
"ollama": { "ollama": {
"url": "http://localhost:11434" "url": "http://localhost:11434"
}, },

View File

@@ -47,62 +47,50 @@ PERSONALITY_FILES = [
] ]
# Tools allowed in non-interactive (-p) mode. # Tools allowed in non-interactive (-p) mode.
# NOTE: curl/wget intentionally excluded (data exfiltration risk). # Loaded from config.json "allowed_tools" at init, with hardcoded defaults.
# Use WebFetch/WebSearch for safe, read-only web access. _DEFAULT_ALLOWED_TOOLS = [
# SSH/SCP/rsync restricted to local network (10.0.20.*).
ALLOWED_TOOLS = [
"Read", "Edit", "Write", "Glob", "Grep", "Read", "Edit", "Write", "Glob", "Grep",
# Read-only web (safe — cannot POST data) "WebFetch", "WebSearch",
"WebFetch", "Bash(python3 *)", "Bash(.venv/bin/python3 *)",
"WebSearch", "Bash(pip *)", "Bash(pytest *)",
# Python scripts "Bash(git add *)", "Bash(git commit *)",
"Bash(python3 *)", "Bash(git push)", "Bash(git push *)",
"Bash(.venv/bin/python3 *)", "Bash(git pull)", "Bash(git pull *)",
"Bash(pip *)", "Bash(git status)", "Bash(git status *)",
"Bash(pytest *)", "Bash(git diff)", "Bash(git diff *)",
# Git — both bare commands and commands with arguments "Bash(git log)", "Bash(git log *)",
"Bash(git add *)",
"Bash(git commit *)",
"Bash(git push)",
"Bash(git push *)",
"Bash(git pull)",
"Bash(git pull *)",
"Bash(git status)",
"Bash(git status *)",
"Bash(git diff)",
"Bash(git diff *)",
"Bash(git log)",
"Bash(git log *)",
"Bash(git checkout *)", "Bash(git checkout *)",
"Bash(git branch)", "Bash(git branch)", "Bash(git branch *)",
"Bash(git branch *)", "Bash(git stash)", "Bash(git stash *)",
"Bash(git stash)", "Bash(npm *)", "Bash(node *)", "Bash(npx *)",
"Bash(git stash *)",
# Node/npm
"Bash(npm *)",
"Bash(node *)",
"Bash(npx *)",
# System
"Bash(systemctl --user *)", "Bash(systemctl --user *)",
"Bash(trash *)", "Bash(trash *)", "Bash(mkdir *)", "Bash(cp *)",
"Bash(mkdir *)", "Bash(mv *)", "Bash(ls *)", "Bash(cat *)", "Bash(chmod *)",
"Bash(cp *)", "Bash(docker *)", "Bash(docker-compose *)", "Bash(docker compose *)",
"Bash(mv *)", "Bash(ssh *@10.0.20.*)", "Bash(ssh root@10.0.20.*)",
"Bash(ls *)",
"Bash(cat *)",
"Bash(chmod *)",
# Docker (local daemon only)
"Bash(docker *)",
"Bash(docker-compose *)",
"Bash(docker compose *)",
# SSH — local network only (no external hosts)
"Bash(ssh *@10.0.20.*)",
"Bash(ssh root@10.0.20.*)",
"Bash(ssh echo@10.0.20.*)", "Bash(ssh echo@10.0.20.*)",
"Bash(scp *10.0.20.*)", "Bash(scp *10.0.20.*)", "Bash(rsync *10.0.20.*)",
"Bash(rsync *10.0.20.*)",
] ]
def _load_allowed_tools() -> list[str]:
"""Load allowed_tools from config.json, falling back to defaults."""
config_file = PROJECT_ROOT / "config.json"
if config_file.exists():
try:
import json as _json
with open(config_file, encoding="utf-8") as f:
data = _json.load(f)
tools = data.get("allowed_tools")
if isinstance(tools, list) and tools:
return tools
except (ValueError, OSError):
pass
return list(_DEFAULT_ALLOWED_TOOLS)
ALLOWED_TOOLS = _load_allowed_tools()
# Environment variables to REMOVE from Claude subprocess # Environment variables to REMOVE from Claude subprocess
# (secrets, tokens, and vars that cause nested-session errors) # (secrets, tokens, and vars that cause nested-session errors)
_ENV_STRIP = { _ENV_STRIP = {

View File

@@ -1,4 +1,9 @@
"""Echo Core heartbeat — periodic health checks.""" """Echo Core heartbeat — periodic health checks.
Python checks are configured via config.json heartbeat section.
If personality/HEARTBEAT.md has extra instructions beyond basic rules,
a Claude CLI session is triggered to handle them.
"""
import json import json
import logging import logging
@@ -11,51 +16,114 @@ log = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).resolve().parent.parent PROJECT_ROOT = Path(__file__).resolve().parent.parent
STATE_FILE = PROJECT_ROOT / "memory" / "heartbeat-state.json" STATE_FILE = PROJECT_ROOT / "memory" / "heartbeat-state.json"
TOOLS_DIR = PROJECT_ROOT / "tools" TOOLS_DIR = PROJECT_ROOT / "tools"
HEARTBEAT_MD = PROJECT_ROOT / "personality" / "HEARTBEAT.md"
# Defaults (overridable via config.json heartbeat section)
DEFAULT_CHECKS = {
"email": True,
"calendar": True,
"kb_index": True,
"git": True,
}
DEFAULT_COOLDOWNS = {
"email": 1800, # 30 min
"calendar": 0, # every run
"kb_index": 14400, # 4h
"git": 14400, # 4h
}
DEFAULT_QUIET_HOURS = [23, 8]
def run_heartbeat(quiet_hours: tuple[int, int] = (23, 8)) -> str: def run_heartbeat(config: dict | None = None) -> str:
"""Run all heartbeat checks. Returns summary string. """Run all heartbeat checks. Returns summary string.
During quiet hours, returns "HEARTBEAT_OK" unless something critical. Config is read from config["heartbeat"]. Python checks run first,
then Claude CLI is called if HEARTBEAT.md has extra instructions.
""" """
hb_config = (config or {}).get("heartbeat", {})
quiet_hours = tuple(hb_config.get("quiet_hours", DEFAULT_QUIET_HOURS))
check_flags = {**DEFAULT_CHECKS, **hb_config.get("checks", {})}
cooldowns = {**DEFAULT_COOLDOWNS, **hb_config.get("cooldowns", {})}
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
hour = datetime.now().hour # local hour hour = datetime.now().hour # local hour
is_quiet = _is_quiet_hour(hour, quiet_hours) is_quiet = _is_quiet_hour(hour, quiet_hours)
state = _load_state() state = _load_state()
checks = state.setdefault("checks", {})
results = [] results = []
critical = []
# Check 1: Email # Check 1: Email
email_result = _check_email(state) if check_flags.get("email") and _should_run("email", checks, now, cooldowns):
if email_result: email_result = _check_email(state)
results.append(email_result) if email_result:
results.append(email_result)
checks["email"] = now.isoformat()
# Check 2: Calendar # Check 2: Calendar (critical — pierces quiet hours)
cal_result = _check_calendar(state) if check_flags.get("calendar"):
if cal_result: cal_result = _check_calendar(state)
results.append(cal_result) if cal_result:
critical.append(cal_result)
checks["calendar"] = now.isoformat()
# Check 3: KB index freshness # Check 3: KB index freshness + auto-reindex
kb_result = _check_kb_index() if check_flags.get("kb_index") and _should_run("kb_index", checks, now, cooldowns):
if kb_result: kb_result = _check_kb_index()
results.append(kb_result) if kb_result:
results.append(kb_result)
checks["kb_index"] = now.isoformat()
# Check 4: Git status # Check 4: Git status
git_result = _check_git() if check_flags.get("git") and _should_run("git", checks, now, cooldowns):
if git_result: git_result = _check_git()
results.append(git_result) if git_result:
results.append(git_result)
checks["git"] = now.isoformat()
# Claude CLI: run if HEARTBEAT.md has extra instructions
if not is_quiet:
claude_result = _run_claude_extra(
hb_config, critical + results, is_quiet
)
if claude_result:
results.append(claude_result)
# Update state # Update state
state["last_run"] = now.isoformat() state["last_run"] = now.isoformat()
_save_state(state) _save_state(state)
if not results: # Critical items always get through (even quiet hours)
return "HEARTBEAT_OK"
if is_quiet: if is_quiet:
if critical:
return " | ".join(critical)
return "HEARTBEAT_OK" return "HEARTBEAT_OK"
return " | ".join(results) all_results = critical + results
if not all_results:
return "HEARTBEAT_OK"
return " | ".join(all_results)
def _should_run(check_name: str, checks: dict, now: datetime,
cooldowns: dict | None = None) -> bool:
"""Check if enough time has passed since last run of this check."""
cd = cooldowns or DEFAULT_COOLDOWNS
cooldown = cd.get(check_name, 0)
if cooldown == 0:
return True
last_run_str = checks.get(check_name)
if not last_run_str:
return True
try:
last_run = datetime.fromisoformat(last_run_str)
return (now - last_run).total_seconds() >= cooldown
except (ValueError, TypeError):
return True
def _is_quiet_hour(hour: int, quiet_hours: tuple[int, int]) -> bool: def _is_quiet_hour(hour: int, quiet_hours: tuple[int, int]) -> bool:
@@ -67,7 +135,7 @@ def _is_quiet_hour(hour: int, quiet_hours: tuple[int, int]) -> bool:
def _check_email(state: dict) -> str | None: def _check_email(state: dict) -> str | None:
"""Check for new emails via tools/email_check.py.""" """Check for new emails via tools/email_check.py. Parses JSON output."""
script = TOOLS_DIR / "email_check.py" script = TOOLS_DIR / "email_check.py"
if not script.exists(): if not script.exists():
return None return None
@@ -77,18 +145,34 @@ def _check_email(state: dict) -> str | None:
capture_output=True, text=True, timeout=30, capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT) cwd=str(PROJECT_ROOT)
) )
if result.returncode == 0: if result.returncode != 0:
output = result.stdout.strip() return None
if output and output != "0": output = result.stdout.strip()
return f"Email: {output}" if not output:
return None
data = json.loads(output)
if not data.get("ok"):
return None
count = data.get("unread_count", 0)
if count == 0:
return None
emails = data.get("emails", [])
subjects = [e.get("subject", "?") for e in emails[:5]]
subject_list = ", ".join(subjects)
return f"Email: {count} necitite ({subject_list})"
except json.JSONDecodeError:
# Fallback: treat as plain text
output = result.stdout.strip()
if output and output != "0":
return f"Email: {output}"
return None return None
except Exception as e: except Exception as e:
log.warning(f"Email check failed: {e}") log.warning("Email check failed: %s", e)
return None return None
def _check_calendar(state: dict) -> str | None: def _check_calendar(state: dict) -> str | None:
"""Check upcoming calendar events via tools/calendar_check.py.""" """Check upcoming calendar events via tools/calendar_check.py. Parses JSON."""
script = TOOLS_DIR / "calendar_check.py" script = TOOLS_DIR / "calendar_check.py"
if not script.exists(): if not script.exists():
return None return None
@@ -98,21 +182,39 @@ def _check_calendar(state: dict) -> str | None:
capture_output=True, text=True, timeout=30, capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT) cwd=str(PROJECT_ROOT)
) )
if result.returncode == 0: if result.returncode != 0:
output = result.stdout.strip() return None
if output: output = result.stdout.strip()
return f"Calendar: {output}" if not output:
return None
data = json.loads(output)
upcoming = data.get("upcoming", [])
if not upcoming:
return None
parts = []
for event in upcoming:
mins = event.get("minutes_until", "?")
name = event.get("summary", "?")
time = event.get("time", "")
parts.append(f"in {mins} min — {name} ({time})")
return "Calendar: " + "; ".join(parts)
except json.JSONDecodeError:
# Fallback: treat as plain text
output = result.stdout.strip()
if output:
return f"Calendar: {output}"
return None return None
except Exception as e: except Exception as e:
log.warning(f"Calendar check failed: {e}") log.warning("Calendar check failed: %s", e)
return None return None
def _check_kb_index() -> str | None: def _check_kb_index() -> str | None:
"""Check if any .md files in memory/kb/ are newer than index.json.""" """Check if .md files in memory/kb/ are newer than index.json. Auto-reindex."""
index_file = PROJECT_ROOT / "memory" / "kb" / "index.json" index_file = PROJECT_ROOT / "memory" / "kb" / "index.json"
if not index_file.exists(): if not index_file.exists():
return "KB: index missing" _run_reindex()
return "KB: index regenerat"
index_mtime = index_file.stat().st_mtime index_mtime = index_file.stat().st_mtime
kb_dir = PROJECT_ROOT / "memory" / "kb" kb_dir = PROJECT_ROOT / "memory" / "kb"
@@ -123,10 +225,27 @@ def _check_kb_index() -> str | None:
newer += 1 newer += 1
if newer > 0: if newer > 0:
return f"KB: {newer} files need reindex" _run_reindex()
return f"KB: {newer} fișiere reindexate"
return None return None
def _run_reindex() -> None:
"""Run tools/update_notes_index.py to regenerate KB index."""
script = TOOLS_DIR / "update_notes_index.py"
if not script.exists():
log.warning("KB reindex script not found: %s", script)
return
try:
subprocess.run(
["python3", str(script)],
capture_output=True, text=True, timeout=60,
cwd=str(PROJECT_ROOT)
)
except Exception as e:
log.warning("KB reindex failed: %s", e)
def _check_git() -> str | None: def _check_git() -> str | None:
"""Check for uncommitted files in project.""" """Check for uncommitted files in project."""
try: try:
@@ -144,6 +263,96 @@ def _check_git() -> str | None:
return None return None
def _get_extra_instructions() -> str | None:
"""Read HEARTBEAT.md and return extra instructions if any.
Skips the basic structure (title, quiet hours rules).
Returns None if only boilerplate remains.
"""
if not HEARTBEAT_MD.exists():
return None
content = HEARTBEAT_MD.read_text(encoding="utf-8").strip()
if not content:
return None
# Strip lines that are just headers, empty, or the standard rules
meaningful = []
for line in content.split("\n"):
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("# HEARTBEAT"):
continue
if stripped.startswith("## Reguli"):
continue
if "HEARTBEAT_OK" in stripped:
continue
if "quiet" in stripped.lower() or "noapte" in stripped.lower():
continue
if "nu spama" in stripped.lower() or "nu deranja" in stripped.lower():
continue
meaningful.append(line)
if not meaningful:
return None
return "\n".join(meaningful).strip()
def _run_claude_extra(hb_config: dict, python_results: list[str],
is_quiet: bool) -> str | None:
"""Run Claude CLI if HEARTBEAT.md has extra instructions."""
from src.claude_session import CLAUDE_BIN, _safe_env
extra = _get_extra_instructions()
if not extra:
return None
model = hb_config.get("model", "haiku")
# Build prompt with context
context_parts = ["Heartbeat tick."]
if python_results:
context_parts.append(
f"Check-uri Python: {' | '.join(python_results)}"
)
else:
context_parts.append("Check-urile Python nu au găsit nimic.")
context_parts.append(f"Instrucțiuni extra din HEARTBEAT.md:\n{extra}")
context_parts.append(
"Execută instrucțiunile de mai sus. "
"Răspunde DOAR cu rezultatul (scurt, fără explicații). "
"Dacă nu e nimic de raportat, răspunde cu HEARTBEAT_OK."
)
prompt = "\n\n".join(context_parts)
cmd = [
CLAUDE_BIN, "-p", prompt,
"--model", model,
"--output-format", "json",
]
try:
proc = subprocess.run(
cmd,
capture_output=True, text=True, timeout=120,
env=_safe_env(),
cwd=str(PROJECT_ROOT),
)
if proc.returncode != 0:
log.warning("Claude heartbeat extra failed (exit %d): %s",
proc.returncode, proc.stderr[:200])
return None
data = json.loads(proc.stdout)
result = data.get("result", "").strip()
if not result or result == "HEARTBEAT_OK":
return None
return result
except subprocess.TimeoutExpired:
log.warning("Claude heartbeat extra timed out")
return None
except Exception as e:
log.warning("Claude heartbeat extra error: %s", e)
return None
def _load_state() -> dict: def _load_state() -> dict:
"""Load heartbeat state from JSON file.""" """Load heartbeat state from JSON file."""
if STATE_FILE.exists(): if STATE_FILE.exists():

View File

@@ -87,12 +87,15 @@ def main():
from src.heartbeat import run_heartbeat from src.heartbeat import run_heartbeat
interval_min = hb_config.get("interval_minutes", 30) interval_min = hb_config.get("interval_minutes", 30)
hb_channel = hb_config.get("channel", "echo-core")
async def _heartbeat_tick() -> None: async def _heartbeat_tick() -> None:
"""Run heartbeat and log result.""" """Run heartbeat and send findings to channel."""
try: try:
result = await asyncio.to_thread(run_heartbeat) result = await asyncio.to_thread(run_heartbeat, config)
logger.info("Heartbeat: %s", result) logger.info("Heartbeat: %s", result)
if result != "HEARTBEAT_OK":
await _send_to_channel(hb_channel, result)
except Exception as exc: except Exception as exc:
logger.error("Heartbeat failed: %s", exc) logger.error("Heartbeat failed: %s", exc)
@@ -105,7 +108,8 @@ def main():
max_instances=1, max_instances=1,
) )
logger.info( logger.info(
"Heartbeat registered (every %d min)", interval_min "Heartbeat registered (every %d min, channel: %s)",
interval_min, hb_channel,
) )
# Telegram bot (optional — only if telegram_token exists) # Telegram bot (optional — only if telegram_token exists)

View File

@@ -14,16 +14,53 @@ import httpx
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
OLLAMA_URL = "http://10.0.20.161:11434/api/embeddings" PROJECT_ROOT = Path(__file__).resolve().parent.parent
OLLAMA_MODEL = "all-minilm" DB_PATH = PROJECT_ROOT / "memory" / "echo.sqlite"
EMBEDDING_DIM = 384 MEMORY_DIR = PROJECT_ROOT / "memory"
DB_PATH = Path(__file__).resolve().parent.parent / "memory" / "echo.sqlite"
MEMORY_DIR = Path(__file__).resolve().parent.parent / "memory"
# Defaults — overridable via config.json ollama/memory sections
_OLLAMA_BASE_URL = "http://localhost:11434"
_OLLAMA_MODEL = "all-minilm"
_EMBEDDING_DIM = 384
_CHUNK_TARGET = 500 _CHUNK_TARGET = 500
_CHUNK_MAX = 1000 _CHUNK_MAX = 1000
_CHUNK_MIN = 100 _CHUNK_MIN = 100
# Runtime config (populated by init_config)
OLLAMA_URL = f"{_OLLAMA_BASE_URL}/api/embeddings"
OLLAMA_MODEL = _OLLAMA_MODEL
EMBEDDING_DIM = _EMBEDDING_DIM
def init_config(config=None) -> None:
"""Load settings from config object. Call once at startup."""
global OLLAMA_URL, OLLAMA_MODEL, EMBEDDING_DIM
if config is None:
# Try loading from config.json directly
config_file = PROJECT_ROOT / "config.json"
if config_file.exists():
import json
try:
with open(config_file, encoding="utf-8") as f:
data = json.load(f)
base_url = data.get("ollama", {}).get("url", _OLLAMA_BASE_URL)
OLLAMA_URL = f"{base_url.rstrip('/')}/api/embeddings"
OLLAMA_MODEL = data.get("ollama", {}).get("model", _OLLAMA_MODEL)
EMBEDDING_DIM = data.get("ollama", {}).get("embedding_dim", _EMBEDDING_DIM)
except (json.JSONDecodeError, OSError):
pass
return
# Config object with .get() method
base_url = config.get("ollama.url", _OLLAMA_BASE_URL)
OLLAMA_URL = f"{base_url.rstrip('/')}/api/embeddings"
OLLAMA_MODEL = config.get("ollama.model", _OLLAMA_MODEL)
EMBEDDING_DIM = config.get("ollama.embedding_dim", _EMBEDDING_DIM)
# Auto-init from config.json on import
init_config()
def get_db() -> sqlite3.Connection: def get_db() -> sqlite3.Connection:
"""Get SQLite connection, create table if needed.""" """Get SQLite connection, create table if needed."""

View File

@@ -2,6 +2,7 @@
import json import json
import time import time
from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
@@ -12,10 +13,13 @@ from src.heartbeat import (
_check_email, _check_email,
_check_git, _check_git,
_check_kb_index, _check_kb_index,
_get_extra_instructions,
_is_quiet_hour, _is_quiet_hour,
_load_state, _load_state,
_save_state, _save_state,
_should_run,
run_heartbeat, run_heartbeat,
DEFAULT_COOLDOWNS,
) )
@@ -26,19 +30,25 @@ from src.heartbeat import (
@pytest.fixture @pytest.fixture
def tmp_env(tmp_path, monkeypatch): def tmp_env(tmp_path, monkeypatch):
"""Redirect PROJECT_ROOT, STATE_FILE, TOOLS_DIR to tmp_path.""" """Redirect PROJECT_ROOT, STATE_FILE, TOOLS_DIR, HEARTBEAT_MD to tmp_path."""
root = tmp_path / "project" root = tmp_path / "project"
root.mkdir() root.mkdir()
tools = root / "tools" tools = root / "tools"
tools.mkdir() tools.mkdir()
mem = root / "memory" mem = root / "memory"
mem.mkdir() mem.mkdir()
personality = root / "personality"
personality.mkdir()
state_file = mem / "heartbeat-state.json" state_file = mem / "heartbeat-state.json"
monkeypatch.setattr("src.heartbeat.PROJECT_ROOT", root) monkeypatch.setattr("src.heartbeat.PROJECT_ROOT", root)
monkeypatch.setattr("src.heartbeat.STATE_FILE", state_file) monkeypatch.setattr("src.heartbeat.STATE_FILE", state_file)
monkeypatch.setattr("src.heartbeat.TOOLS_DIR", tools) monkeypatch.setattr("src.heartbeat.TOOLS_DIR", tools)
return {"root": root, "tools": tools, "memory": mem, "state_file": state_file} monkeypatch.setattr("src.heartbeat.HEARTBEAT_MD", personality / "HEARTBEAT.md")
return {
"root": root, "tools": tools, "memory": mem,
"state_file": state_file, "personality": personality,
}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -59,7 +69,6 @@ class TestIsQuietHour:
assert _is_quiet_hour(12, (23, 8)) is False assert _is_quiet_hour(12, (23, 8)) is False
def test_overnight_range_at_end_boundary(self): def test_overnight_range_at_end_boundary(self):
# hour == end is NOT quiet (end is exclusive)
assert _is_quiet_hour(8, (23, 8)) is False assert _is_quiet_hour(8, (23, 8)) is False
def test_daytime_range_inside(self): def test_daytime_range_inside(self):
@@ -75,28 +84,97 @@ class TestIsQuietHour:
assert _is_quiet_hour(20, (9, 17)) is False assert _is_quiet_hour(20, (9, 17)) is False
# ---------------------------------------------------------------------------
# _should_run (cooldowns)
# ---------------------------------------------------------------------------
class TestShouldRun:
"""Test cooldown logic for checks."""
def test_no_previous_run(self):
assert _should_run("email", {}, datetime.now(timezone.utc)) is True
def test_within_cooldown(self):
now = datetime.now(timezone.utc)
checks = {"email": (now - timedelta(minutes=10)).isoformat()}
assert _should_run("email", checks, now) is False
def test_past_cooldown(self):
now = datetime.now(timezone.utc)
checks = {"email": (now - timedelta(minutes=35)).isoformat()}
assert _should_run("email", checks, now) is True
def test_zero_cooldown_always_runs(self):
now = datetime.now(timezone.utc)
checks = {"calendar": now.isoformat()}
assert _should_run("calendar", checks, now) is True
def test_corrupt_timestamp(self):
now = datetime.now(timezone.utc)
checks = {"email": "not-a-date"}
assert _should_run("email", checks, now) is True
def test_custom_cooldowns(self):
"""Accepts custom cooldowns dict."""
now = datetime.now(timezone.utc)
checks = {"email": (now - timedelta(minutes=5)).isoformat()}
# Default 1800s (30min) — should NOT run
assert _should_run("email", checks, now) is False
# Custom 60s — should run (5 min > 60s)
assert _should_run("email", checks, now, cooldowns={"email": 60}) is True
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _check_email # _check_email
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class TestCheckEmail: class TestCheckEmail:
"""Test email check via tools/email_check.py.""" """Test email check via tools/email_check.py with JSON parsing."""
def test_no_script(self, tmp_env): def test_no_script(self, tmp_env):
"""Returns None when email_check.py does not exist."""
assert _check_email({}) is None assert _check_email({}) is None
def test_with_output(self, tmp_env): def test_json_with_unread(self, tmp_env):
"""Returns formatted email string when script outputs something.""" script = tmp_env["tools"] / "email_check.py"
script.write_text("pass")
output = json.dumps({
"ok": True, "unread_count": 2,
"emails": [
{"subject": "Meeting azi", "from": "boss@work.com"},
{"subject": "Factura", "from": "billing@x.com"},
]
})
mock_result = MagicMock(returncode=0, stdout=output)
with patch("src.heartbeat.subprocess.run", return_value=mock_result):
result = _check_email({})
assert result == "Email: 2 necitite (Meeting azi, Factura)"
def test_json_zero_unread(self, tmp_env):
script = tmp_env["tools"] / "email_check.py"
script.write_text("pass")
output = json.dumps({"ok": True, "unread_count": 0, "emails": []})
mock_result = MagicMock(returncode=0, stdout=output)
with patch("src.heartbeat.subprocess.run", return_value=mock_result):
assert _check_email({}) is None
def test_json_not_ok(self, tmp_env):
script = tmp_env["tools"] / "email_check.py"
script.write_text("pass")
output = json.dumps({"ok": False, "error": "auth failed"})
mock_result = MagicMock(returncode=0, stdout=output)
with patch("src.heartbeat.subprocess.run", return_value=mock_result):
assert _check_email({}) is None
def test_plaintext_fallback(self, tmp_env):
script = tmp_env["tools"] / "email_check.py" script = tmp_env["tools"] / "email_check.py"
script.write_text("pass") script.write_text("pass")
mock_result = MagicMock(returncode=0, stdout="3 new messages\n") mock_result = MagicMock(returncode=0, stdout="3 new messages\n")
with patch("src.heartbeat.subprocess.run", return_value=mock_result): with patch("src.heartbeat.subprocess.run", return_value=mock_result):
assert _check_email({}) == "Email: 3 new messages" assert _check_email({}) == "Email: 3 new messages"
def test_zero_output(self, tmp_env): def test_plaintext_zero(self, tmp_env):
"""Returns None when script outputs '0' (no new mail)."""
script = tmp_env["tools"] / "email_check.py" script = tmp_env["tools"] / "email_check.py"
script.write_text("pass") script.write_text("pass")
mock_result = MagicMock(returncode=0, stdout="0\n") mock_result = MagicMock(returncode=0, stdout="0\n")
@@ -104,7 +182,6 @@ class TestCheckEmail:
assert _check_email({}) is None assert _check_email({}) is None
def test_empty_output(self, tmp_env): def test_empty_output(self, tmp_env):
"""Returns None when script outputs empty string."""
script = tmp_env["tools"] / "email_check.py" script = tmp_env["tools"] / "email_check.py"
script.write_text("pass") script.write_text("pass")
mock_result = MagicMock(returncode=0, stdout="\n") mock_result = MagicMock(returncode=0, stdout="\n")
@@ -112,7 +189,6 @@ class TestCheckEmail:
assert _check_email({}) is None assert _check_email({}) is None
def test_nonzero_returncode(self, tmp_env): def test_nonzero_returncode(self, tmp_env):
"""Returns None when script exits with error."""
script = tmp_env["tools"] / "email_check.py" script = tmp_env["tools"] / "email_check.py"
script.write_text("pass") script.write_text("pass")
mock_result = MagicMock(returncode=1, stdout="error") mock_result = MagicMock(returncode=1, stdout="error")
@@ -120,7 +196,6 @@ class TestCheckEmail:
assert _check_email({}) is None assert _check_email({}) is None
def test_subprocess_exception(self, tmp_env): def test_subprocess_exception(self, tmp_env):
"""Returns None when subprocess raises (e.g. timeout)."""
script = tmp_env["tools"] / "email_check.py" script = tmp_env["tools"] / "email_check.py"
script.write_text("pass") script.write_text("pass")
with patch("src.heartbeat.subprocess.run", side_effect=TimeoutError): with patch("src.heartbeat.subprocess.run", side_effect=TimeoutError):
@@ -133,14 +208,33 @@ class TestCheckEmail:
class TestCheckCalendar: class TestCheckCalendar:
"""Test calendar check via tools/calendar_check.py.""" """Test calendar check via tools/calendar_check.py with JSON parsing."""
def test_no_script(self, tmp_env): def test_no_script(self, tmp_env):
"""Returns None when calendar_check.py does not exist."""
assert _check_calendar({}) is None assert _check_calendar({}) is None
def test_with_events(self, tmp_env): def test_json_with_upcoming(self, tmp_env):
"""Returns formatted calendar string when script outputs events.""" script = tmp_env["tools"] / "calendar_check.py"
script.write_text("pass")
output = json.dumps({
"upcoming": [
{"summary": "NLP Session", "minutes_until": 45, "time": "15:00"},
]
})
mock_result = MagicMock(returncode=0, stdout=output)
with patch("src.heartbeat.subprocess.run", return_value=mock_result):
result = _check_calendar({})
assert result == "Calendar: in 45 min — NLP Session (15:00)"
def test_json_no_upcoming(self, tmp_env):
script = tmp_env["tools"] / "calendar_check.py"
script.write_text("pass")
output = json.dumps({"upcoming": []})
mock_result = MagicMock(returncode=0, stdout=output)
with patch("src.heartbeat.subprocess.run", return_value=mock_result):
assert _check_calendar({}) is None
def test_plaintext_fallback(self, tmp_env):
script = tmp_env["tools"] / "calendar_check.py" script = tmp_env["tools"] / "calendar_check.py"
script.write_text("pass") script.write_text("pass")
mock_result = MagicMock(returncode=0, stdout="Meeting at 3pm\n") mock_result = MagicMock(returncode=0, stdout="Meeting at 3pm\n")
@@ -148,7 +242,6 @@ class TestCheckCalendar:
assert _check_calendar({}) == "Calendar: Meeting at 3pm" assert _check_calendar({}) == "Calendar: Meeting at 3pm"
def test_empty_output(self, tmp_env): def test_empty_output(self, tmp_env):
"""Returns None when no upcoming events."""
script = tmp_env["tools"] / "calendar_check.py" script = tmp_env["tools"] / "calendar_check.py"
script.write_text("pass") script.write_text("pass")
mock_result = MagicMock(returncode=0, stdout="\n") mock_result = MagicMock(returncode=0, stdout="\n")
@@ -156,7 +249,6 @@ class TestCheckCalendar:
assert _check_calendar({}) is None assert _check_calendar({}) is None
def test_subprocess_exception(self, tmp_env): def test_subprocess_exception(self, tmp_env):
"""Returns None when subprocess raises."""
script = tmp_env["tools"] / "calendar_check.py" script = tmp_env["tools"] / "calendar_check.py"
script.write_text("pass") script.write_text("pass")
with patch("src.heartbeat.subprocess.run", side_effect=OSError("fail")): with patch("src.heartbeat.subprocess.run", side_effect=OSError("fail")):
@@ -169,14 +261,15 @@ class TestCheckCalendar:
class TestCheckKbIndex: class TestCheckKbIndex:
"""Test KB index freshness check.""" """Test KB index freshness check with auto-reindex."""
def test_missing_index(self, tmp_env): def test_missing_index(self, tmp_env):
"""Returns warning when index.json does not exist.""" with patch("src.heartbeat._run_reindex") as mock_reindex:
assert _check_kb_index() == "KB: index missing" result = _check_kb_index()
assert result == "KB: index regenerat"
mock_reindex.assert_called_once()
def test_up_to_date(self, tmp_env): def test_up_to_date(self, tmp_env):
"""Returns None when all .md files are older than index."""
kb_dir = tmp_env["root"] / "memory" / "kb" kb_dir = tmp_env["root"] / "memory" / "kb"
kb_dir.mkdir(parents=True) kb_dir.mkdir(parents=True)
md_file = kb_dir / "notes.md" md_file = kb_dir / "notes.md"
@@ -187,7 +280,6 @@ class TestCheckKbIndex:
assert _check_kb_index() is None assert _check_kb_index() is None
def test_needs_reindex(self, tmp_env): def test_needs_reindex(self, tmp_env):
"""Returns reindex warning when .md files are newer than index."""
kb_dir = tmp_env["root"] / "memory" / "kb" kb_dir = tmp_env["root"] / "memory" / "kb"
kb_dir.mkdir(parents=True) kb_dir.mkdir(parents=True)
index = kb_dir / "index.json" index = kb_dir / "index.json"
@@ -197,7 +289,10 @@ class TestCheckKbIndex:
md1.write_text("new") md1.write_text("new")
md2 = kb_dir / "b.md" md2 = kb_dir / "b.md"
md2.write_text("also new") md2.write_text("also new")
assert _check_kb_index() == "KB: 2 files need reindex" with patch("src.heartbeat._run_reindex") as mock_reindex:
result = _check_kb_index()
assert result == "KB: 2 fișiere reindexate"
mock_reindex.assert_called_once()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -209,13 +304,11 @@ class TestCheckGit:
"""Test git status check.""" """Test git status check."""
def test_clean(self, tmp_env): def test_clean(self, tmp_env):
"""Returns None when working tree is clean."""
mock_result = MagicMock(returncode=0, stdout="\n") mock_result = MagicMock(returncode=0, stdout="\n")
with patch("src.heartbeat.subprocess.run", return_value=mock_result): with patch("src.heartbeat.subprocess.run", return_value=mock_result):
assert _check_git() is None assert _check_git() is None
def test_dirty(self, tmp_env): def test_dirty(self, tmp_env):
"""Returns uncommitted count when there are changes."""
mock_result = MagicMock( mock_result = MagicMock(
returncode=0, returncode=0,
stdout=" M file1.py\n?? file2.py\n M file3.py\n", stdout=" M file1.py\n?? file2.py\n M file3.py\n",
@@ -224,11 +317,50 @@ class TestCheckGit:
assert _check_git() == "Git: 3 uncommitted" assert _check_git() == "Git: 3 uncommitted"
def test_subprocess_exception(self, tmp_env): def test_subprocess_exception(self, tmp_env):
"""Returns None when git command fails."""
with patch("src.heartbeat.subprocess.run", side_effect=OSError): with patch("src.heartbeat.subprocess.run", side_effect=OSError):
assert _check_git() is None assert _check_git() is None
# ---------------------------------------------------------------------------
# _get_extra_instructions
# ---------------------------------------------------------------------------
class TestGetExtraInstructions:
"""Test HEARTBEAT.md parsing for extra instructions."""
def test_no_file(self, tmp_env):
"""Returns None when HEARTBEAT.md doesn't exist."""
assert _get_extra_instructions() is None
def test_only_boilerplate(self, tmp_env):
"""Returns None when HEARTBEAT.md has only standard rules."""
hb = tmp_env["personality"] / "HEARTBEAT.md"
hb.write_text(
"# HEARTBEAT.md\n\n"
"## Reguli\n\n"
"- **Noapte (23:00-08:00):** Doar HEARTBEAT_OK, nu deranja\n"
"- **Nu spama:** Dacă nu e nimic, HEARTBEAT_OK\n"
)
assert _get_extra_instructions() is None
def test_with_extra(self, tmp_env):
"""Returns extra instructions when present."""
hb = tmp_env["personality"] / "HEARTBEAT.md"
hb.write_text(
"# HEARTBEAT.md\n\n"
"## Reguli\n\n"
"- **Noapte (23:00-08:00):** Doar HEARTBEAT_OK, nu deranja\n\n"
"## Extra\n\n"
"- Verifică dacă backup-ul s-a făcut\n"
"- Raportează uptime-ul serverului\n"
)
result = _get_extra_instructions()
assert result is not None
assert "backup" in result
assert "uptime" in result
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _load_state / _save_state # _load_state / _save_state
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -238,25 +370,21 @@ class TestState:
"""Test state persistence.""" """Test state persistence."""
def test_load_missing_file(self, tmp_env): def test_load_missing_file(self, tmp_env):
"""Returns default state when file does not exist."""
state = _load_state() state = _load_state()
assert state == {"last_run": None, "checks": {}} assert state == {"last_run": None, "checks": {}}
def test_round_trip(self, tmp_env): def test_round_trip(self, tmp_env):
"""State survives save then load.""" original = {"last_run": "2025-01-01T00:00:00", "checks": {"email": "2025-01-01T00:00:00"}}
original = {"last_run": "2025-01-01T00:00:00", "checks": {"email": True}}
_save_state(original) _save_state(original)
loaded = _load_state() loaded = _load_state()
assert loaded == original assert loaded == original
def test_load_corrupt_json(self, tmp_env): def test_load_corrupt_json(self, tmp_env):
"""Returns default state when JSON is corrupt."""
tmp_env["state_file"].write_text("not valid json {{{") tmp_env["state_file"].write_text("not valid json {{{")
state = _load_state() state = _load_state()
assert state == {"last_run": None, "checks": {}} assert state == {"last_run": None, "checks": {}}
def test_save_creates_parent_dir(self, tmp_path, monkeypatch): def test_save_creates_parent_dir(self, tmp_path, monkeypatch):
"""_save_state creates parent directory if missing."""
state_file = tmp_path / "deep" / "nested" / "state.json" state_file = tmp_path / "deep" / "nested" / "state.json"
monkeypatch.setattr("src.heartbeat.STATE_FILE", state_file) monkeypatch.setattr("src.heartbeat.STATE_FILE", state_file)
_save_state({"last_run": None, "checks": {}}) _save_state({"last_run": None, "checks": {}})
@@ -272,41 +400,84 @@ class TestRunHeartbeat:
"""Test the top-level run_heartbeat orchestrator.""" """Test the top-level run_heartbeat orchestrator."""
def test_all_ok(self, tmp_env): def test_all_ok(self, tmp_env):
"""Returns HEARTBEAT_OK when all checks pass with no issues."""
with patch("src.heartbeat._check_email", return_value=None), \ with patch("src.heartbeat._check_email", return_value=None), \
patch("src.heartbeat._check_calendar", return_value=None), \ patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_kb_index", return_value=None), \ patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value=None): patch("src.heartbeat._check_git", return_value=None), \
patch("src.heartbeat._run_claude_extra", return_value=None):
result = run_heartbeat() result = run_heartbeat()
assert result == "HEARTBEAT_OK" assert result == "HEARTBEAT_OK"
def test_with_results(self, tmp_env): def test_with_results(self, tmp_env):
"""Returns joined results when checks report issues.""" with patch("src.heartbeat._check_email", return_value="Email: 2 necitite (X, Y)"), \
with patch("src.heartbeat._check_email", return_value="Email: 2 new"), \
patch("src.heartbeat._check_calendar", return_value=None), \ patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_kb_index", return_value="KB: 1 files need reindex"), \ patch("src.heartbeat._check_kb_index", return_value="KB: 1 fișiere reindexate"), \
patch("src.heartbeat._check_git", return_value=None), \ patch("src.heartbeat._check_git", return_value=None), \
patch("src.heartbeat._is_quiet_hour", return_value=False): patch("src.heartbeat._is_quiet_hour", return_value=False), \
patch("src.heartbeat._run_claude_extra", return_value=None):
result = run_heartbeat() result = run_heartbeat()
assert result == "Email: 2 new | KB: 1 files need reindex" assert result == "Email: 2 necitite (X, Y) | KB: 1 fișiere reindexate"
def test_quiet_hours_suppression(self, tmp_env): def test_quiet_hours_suppresses_normal(self, tmp_env):
"""Returns HEARTBEAT_OK during quiet hours even with issues.""" with patch("src.heartbeat._check_email", return_value="Email: 5 necitite (A)"), \
with patch("src.heartbeat._check_email", return_value="Email: 5 new"), \ patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_calendar", return_value="Calendar: meeting"), \
patch("src.heartbeat._check_kb_index", return_value=None), \ patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value="Git: 2 uncommitted"), \ patch("src.heartbeat._check_git", return_value="Git: 2 uncommitted"), \
patch("src.heartbeat._is_quiet_hour", return_value=True): patch("src.heartbeat._is_quiet_hour", return_value=True):
result = run_heartbeat() result = run_heartbeat()
assert result == "HEARTBEAT_OK" assert result == "HEARTBEAT_OK"
def test_saves_state_after_run(self, tmp_env): def test_quiet_hours_allows_critical_calendar(self, tmp_env):
"""State file is updated after heartbeat runs.""" with patch("src.heartbeat._check_email", return_value="Email: 5 necitite (A)"), \
patch("src.heartbeat._check_calendar", return_value="Calendar: in 30 min — Meeting (15:00)"), \
patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value="Git: 2 uncommitted"), \
patch("src.heartbeat._is_quiet_hour", return_value=True):
result = run_heartbeat()
assert result == "Calendar: in 30 min — Meeting (15:00)"
def test_config_disables_check(self, tmp_env):
"""Checks can be disabled via config."""
config = {"heartbeat": {"checks": {"git": False}}}
with patch("src.heartbeat._check_email", return_value=None), \ with patch("src.heartbeat._check_email", return_value=None), \
patch("src.heartbeat._check_calendar", return_value=None), \ patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_kb_index", return_value=None), \ patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value=None): patch("src.heartbeat._check_git", return_value="Git: 5 uncommitted") as mock_git, \
patch("src.heartbeat._run_claude_extra", return_value=None):
result = run_heartbeat(config)
mock_git.assert_not_called()
assert result == "HEARTBEAT_OK"
def test_config_custom_quiet_hours(self, tmp_env):
"""Quiet hours can be overridden via config."""
config = {"heartbeat": {"quiet_hours": [0, 1]}} # only 0-1 is quiet
with patch("src.heartbeat._check_email", return_value=None), \
patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value="Git: 3 uncommitted"), \
patch("src.heartbeat._is_quiet_hour", return_value=False), \
patch("src.heartbeat._run_claude_extra", return_value=None):
result = run_heartbeat(config)
assert "Git: 3 uncommitted" in result
def test_saves_state_after_run(self, tmp_env):
with patch("src.heartbeat._check_email", return_value=None), \
patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value=None), \
patch("src.heartbeat._run_claude_extra", return_value=None):
run_heartbeat() run_heartbeat()
state = json.loads(tmp_env["state_file"].read_text()) state = json.loads(tmp_env["state_file"].read_text())
assert "last_run" in state assert "last_run" in state
assert state["last_run"] is not None assert state["last_run"] is not None
def test_saves_check_timestamps(self, tmp_env):
with patch("src.heartbeat._check_email", return_value=None), \
patch("src.heartbeat._check_calendar", return_value=None), \
patch("src.heartbeat._check_kb_index", return_value=None), \
patch("src.heartbeat._check_git", return_value=None), \
patch("src.heartbeat._run_claude_extra", return_value=None):
run_heartbeat()
state = json.loads(tmp_env["state_file"].read_text())
assert "checks" in state
assert "calendar" in state["checks"]