stage-9: heartbeat system with periodic checks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MoltBot Service
2026-02-13 16:40:39 +00:00
parent 24a4d87f8c
commit 0bc4b8cb3e
6 changed files with 527 additions and 1 deletions

163
src/heartbeat.py Normal file
View File

@@ -0,0 +1,163 @@
"""Echo Core heartbeat — periodic health checks."""
import json
import logging
import subprocess
from datetime import datetime, timezone
from pathlib import Path
log = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
STATE_FILE = PROJECT_ROOT / "memory" / "heartbeat-state.json"
TOOLS_DIR = PROJECT_ROOT / "tools"
def run_heartbeat(quiet_hours: tuple[int, int] = (23, 8)) -> str:
"""Run all heartbeat checks. Returns summary string.
During quiet hours, returns "HEARTBEAT_OK" unless something critical.
"""
now = datetime.now(timezone.utc)
hour = datetime.now().hour # local hour
is_quiet = _is_quiet_hour(hour, quiet_hours)
state = _load_state()
results = []
# Check 1: Email
email_result = _check_email(state)
if email_result:
results.append(email_result)
# Check 2: Calendar
cal_result = _check_calendar(state)
if cal_result:
results.append(cal_result)
# Check 3: KB index freshness
kb_result = _check_kb_index()
if kb_result:
results.append(kb_result)
# Check 4: Git status
git_result = _check_git()
if git_result:
results.append(git_result)
# Update state
state["last_run"] = now.isoformat()
_save_state(state)
if not results:
return "HEARTBEAT_OK"
if is_quiet:
return "HEARTBEAT_OK"
return " | ".join(results)
def _is_quiet_hour(hour: int, quiet_hours: tuple[int, int]) -> bool:
"""Check if current hour is in quiet range. Handles overnight (23-08)."""
start, end = quiet_hours
if start > end: # overnight
return hour >= start or hour < end
return start <= hour < end
def _check_email(state: dict) -> str | None:
"""Check for new emails via tools/email_check.py."""
script = TOOLS_DIR / "email_check.py"
if not script.exists():
return None
try:
result = subprocess.run(
["python3", str(script)],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT)
)
if result.returncode == 0:
output = result.stdout.strip()
if output and output != "0":
return f"Email: {output}"
return None
except Exception as e:
log.warning(f"Email check failed: {e}")
return None
def _check_calendar(state: dict) -> str | None:
"""Check upcoming calendar events via tools/calendar_check.py."""
script = TOOLS_DIR / "calendar_check.py"
if not script.exists():
return None
try:
result = subprocess.run(
["python3", str(script), "soon"],
capture_output=True, text=True, timeout=30,
cwd=str(PROJECT_ROOT)
)
if result.returncode == 0:
output = result.stdout.strip()
if output:
return f"Calendar: {output}"
return None
except Exception as e:
log.warning(f"Calendar check failed: {e}")
return None
def _check_kb_index() -> str | None:
"""Check if any .md files in memory/kb/ are newer than index.json."""
index_file = PROJECT_ROOT / "memory" / "kb" / "index.json"
if not index_file.exists():
return "KB: index missing"
index_mtime = index_file.stat().st_mtime
kb_dir = PROJECT_ROOT / "memory" / "kb"
newer = 0
for md in kb_dir.rglob("*.md"):
if md.stat().st_mtime > index_mtime:
newer += 1
if newer > 0:
return f"KB: {newer} files need reindex"
return None
def _check_git() -> str | None:
"""Check for uncommitted files in project."""
try:
result = subprocess.run(
["git", "status", "--porcelain"],
capture_output=True, text=True, timeout=10,
cwd=str(PROJECT_ROOT)
)
if result.returncode == 0:
lines = [l for l in result.stdout.strip().split("\n") if l.strip()]
if lines:
return f"Git: {len(lines)} uncommitted"
return None
except Exception:
return None
def _load_state() -> dict:
"""Load heartbeat state from JSON file."""
if STATE_FILE.exists():
try:
return json.loads(STATE_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
return {"last_run": None, "checks": {}}
def _save_state(state: dict) -> None:
"""Save heartbeat state to JSON file."""
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(
json.dumps(state, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8"
)