"""Echo Core heartbeat — periodic health checks. Python checks are configured via config.json heartbeat section. If personality/HEARTBEAT.md has extra instructions beyond basic rules, a Claude CLI session is triggered to handle them. """ import json import logging import subprocess from datetime import datetime, date, timezone from pathlib import Path log = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).resolve().parent.parent STATE_FILE = PROJECT_ROOT / "memory" / "heartbeat-state.json" TOOLS_DIR = PROJECT_ROOT / "tools" HEARTBEAT_MD = PROJECT_ROOT / "personality" / "HEARTBEAT.md" # Defaults (overridable via config.json heartbeat section) DEFAULT_CHECKS = { "email": True, "calendar": True, "kb_index": True, "git": True, "embeddings": True, } DEFAULT_COOLDOWNS = { "email": 1800, # 30 min "calendar": 0, # every run "kb_index": 14400, # 4h "git": 14400, # 4h "embeddings": 3600, # 1h } DEFAULT_QUIET_HOURS = [23, 8] def run_heartbeat(config: dict | None = None) -> str: """Run all heartbeat checks. Returns summary string. Config is read from config["heartbeat"]. Python checks run first, then Claude CLI is called if HEARTBEAT.md has extra instructions. """ hb_config = (config or {}).get("heartbeat", {}) quiet_hours = tuple(hb_config.get("quiet_hours", DEFAULT_QUIET_HOURS)) check_flags = {**DEFAULT_CHECKS, **hb_config.get("checks", {})} cooldowns = {**DEFAULT_COOLDOWNS, **hb_config.get("cooldowns", {})} now = datetime.now(timezone.utc) hour = datetime.now().hour # local hour is_quiet = _is_quiet_hour(hour, quiet_hours) state = _load_state() checks = state.setdefault("checks", {}) results = [] critical = [] # Check 1: Email if check_flags.get("email") and _should_run("email", checks, now, cooldowns): email_result = _check_email(state) if email_result: results.append(email_result) checks["email"] = now.isoformat() # Check 2: Calendar — daily summary + next-event reminder (no quiet hours bypass) if check_flags.get("calendar") and not is_quiet and _should_run("calendar", checks, now, cooldowns): cal_result = _check_calendar_smart(state, quiet_hours) if cal_result: results.append(cal_result) checks["calendar"] = now.isoformat() # Check 3: KB index freshness + auto-reindex if check_flags.get("kb_index") and _should_run("kb_index", checks, now, cooldowns): kb_result = _check_kb_index() if kb_result: results.append(kb_result) checks["kb_index"] = now.isoformat() # Check 4: Git status if check_flags.get("git") and _should_run("git", checks, now, cooldowns): git_result = _check_git() if git_result: results.append(git_result) checks["git"] = now.isoformat() # Check 5: Incremental embeddings index if check_flags.get("embeddings") and _should_run("embeddings", checks, now, cooldowns): emb_result = _check_embeddings() if emb_result: results.append(emb_result) checks["embeddings"] = now.isoformat() # Claude CLI: run if HEARTBEAT.md has extra instructions if not is_quiet: claude_result = _run_claude_extra( hb_config, critical + results, is_quiet ) if claude_result: results.append(claude_result) # Update state state["last_run"] = now.isoformat() _save_state(state) # Critical items always get through (even quiet hours) if is_quiet: if critical: return " | ".join(critical) return "HEARTBEAT_OK" all_results = critical + results if not all_results: return "HEARTBEAT_OK" return " | ".join(all_results) def _should_run(check_name: str, checks: dict, now: datetime, cooldowns: dict | None = None) -> bool: """Check if enough time has passed since last run of this check.""" cd = cooldowns or DEFAULT_COOLDOWNS cooldown = cd.get(check_name, 0) if cooldown == 0: return True last_run_str = checks.get(check_name) if not last_run_str: return True try: last_run = datetime.fromisoformat(last_run_str) return (now - last_run).total_seconds() >= cooldown except (ValueError, TypeError): return True def _is_quiet_hour(hour: int, quiet_hours: tuple[int, int]) -> bool: """Check if current hour is in quiet range. Handles overnight (23-08).""" start, end = quiet_hours if start > end: # overnight return hour >= start or hour < end return start <= hour < end def _check_email(state: dict) -> str | None: """Check for new emails via tools/email_check.py. Parses JSON output.""" script = TOOLS_DIR / "email_check.py" if not script.exists(): return None try: result = subprocess.run( ["python3", str(script)], capture_output=True, text=True, timeout=30, cwd=str(PROJECT_ROOT) ) if result.returncode != 0: return None output = result.stdout.strip() if not output: return None data = json.loads(output) if not data.get("ok"): return None count = data.get("unread_count", 0) if count == 0: return None emails = data.get("emails", []) subjects = [e.get("subject", "?") for e in emails[:5]] subject_list = ", ".join(subjects) return f"Email: {count} necitite ({subject_list})" except json.JSONDecodeError: # Fallback: treat as plain text output = result.stdout.strip() if output and output != "0": return f"Email: {output}" return None except Exception as e: log.warning("Email check failed: %s", e) return None def _check_calendar_smart(state: dict, quiet_hours: tuple) -> str | None: """Smart calendar check: daily summary once, then only next-event reminders. - First run after quiet hours: full day summary (all events today) - Subsequent runs: only the nearest event within 45 min, deduplicated """ script = TOOLS_DIR / "calendar_check.py" if not script.exists(): return None today_str = date.today().isoformat() cal_state = state.setdefault("calendar", {}) sent_summary_date = cal_state.get("daily_summary_date") reminded_events = cal_state.get("reminded_events", {}) # Clean old reminded_events (from previous days) reminded_events = { k: v for k, v in reminded_events.items() if v == today_str } cal_state["reminded_events"] = reminded_events is_first_run_today = sent_summary_date != today_str if is_first_run_today: # Daily summary: all events for today return _calendar_daily_summary(cal_state, today_str) else: # Next-event reminder (within 45 min, deduplicated) return _calendar_next_reminder(cal_state, today_str, reminded_events) def _calendar_daily_summary(cal_state: dict, today_str: str) -> str | None: """Fetch all today's events and return a daily summary.""" script = TOOLS_DIR / "calendar_check.py" try: result = subprocess.run( ["python3", str(script), "today"], capture_output=True, text=True, timeout=30, cwd=str(PROJECT_ROOT) ) if result.returncode != 0: return None output = result.stdout.strip() if not output: return None data = json.loads(output) today_events = data.get("today", []) if not today_events: cal_state["daily_summary_date"] = today_str return None parts = [] for event in today_events: time = event.get("time", "") name = event.get("summary", "?") parts.append(f" {time} — {name}") cal_state["daily_summary_date"] = today_str return "📅 Program azi:\n" + "\n".join(parts) except Exception as e: log.warning("Calendar daily summary failed: %s", e) return None def _calendar_next_reminder(cal_state: dict, today_str: str, reminded_events: dict) -> str | None: """Remind only for the next upcoming event within 45 min, if not already reminded.""" script = TOOLS_DIR / "calendar_check.py" try: result = subprocess.run( ["python3", str(script), "soon", "1"], capture_output=True, text=True, timeout=30, cwd=str(PROJECT_ROOT) ) if result.returncode != 0: return None output = result.stdout.strip() if not output: return None data = json.loads(output) upcoming = data.get("upcoming", []) if not upcoming: return None event = upcoming[0] mins = event.get("minutes_until", 999) name = event.get("summary", "?") time = event.get("time", "") # Only remind if within 45 minutes if isinstance(mins, (int, float)) and mins > 45: return None # Dedup key: event name + time dedup_key = f"{name}@{time}" if dedup_key in reminded_events: return None # Mark as reminded reminded_events[dedup_key] = today_str cal_state["reminded_events"] = reminded_events return f"⏰ in {mins} min — {name} ({time})" except Exception as e: log.warning("Calendar next reminder failed: %s", e) return None def _check_kb_index() -> str | None: """Check if .md files in memory/kb/ are newer than index.json. Auto-reindex.""" index_file = PROJECT_ROOT / "memory" / "kb" / "index.json" if not index_file.exists(): _run_reindex() log.info("KB: index regenerat") return None index_mtime = index_file.stat().st_mtime kb_dir = PROJECT_ROOT / "memory" / "kb" newer = 0 for md in kb_dir.rglob("*.md"): if md.stat().st_mtime > index_mtime: newer += 1 if newer > 0: _run_reindex() log.info("KB: %d fișiere reindexate", newer) return None def _run_reindex() -> None: """Run tools/update_notes_index.py to regenerate KB index.""" script = TOOLS_DIR / "update_notes_index.py" if not script.exists(): log.warning("KB reindex script not found: %s", script) return try: subprocess.run( ["python3", str(script)], capture_output=True, text=True, timeout=60, cwd=str(PROJECT_ROOT) ) except Exception as e: log.warning("KB reindex failed: %s", e) def _check_embeddings() -> str | None: """Incremental re-index of memory/ embeddings for semantic search.""" try: from src.memory_search import incremental_index result = incremental_index() indexed = result.get("indexed", 0) if indexed > 0: chunks = result.get("chunks", 0) log.info("Embeddings: %d fisiere reindexate (%d chunks)", indexed, chunks) return None except ConnectionError: log.warning("Embeddings check skipped: Ollama unreachable") return None except Exception as e: log.warning("Embeddings check failed: %s", e) return None def _check_git() -> str | None: """Check for uncommitted files in project.""" try: result = subprocess.run( ["git", "status", "--porcelain"], capture_output=True, text=True, timeout=10, cwd=str(PROJECT_ROOT) ) if result.returncode == 0: lines = [l for l in result.stdout.strip().split("\n") if l.strip()] if lines: return f"Git: {len(lines)} uncommitted" return None except Exception: return None def _get_extra_instructions() -> str | None: """Read HEARTBEAT.md and return extra instructions if any. Skips the basic structure (title, quiet hours rules). Returns None if only boilerplate remains. """ if not HEARTBEAT_MD.exists(): return None content = HEARTBEAT_MD.read_text(encoding="utf-8").strip() if not content: return None # Strip lines that are just headers, empty, or the standard rules meaningful = [] for line in content.split("\n"): stripped = line.strip() if not stripped: continue if stripped.startswith("# HEARTBEAT"): continue if stripped.startswith("## Reguli"): continue if "HEARTBEAT_OK" in stripped: continue if "quiet" in stripped.lower() or "noapte" in stripped.lower(): continue if "nu spama" in stripped.lower() or "nu deranja" in stripped.lower(): continue meaningful.append(line) if not meaningful: return None return "\n".join(meaningful).strip() def _run_claude_extra(hb_config: dict, python_results: list[str], is_quiet: bool) -> str | None: """Run Claude CLI if HEARTBEAT.md has extra instructions.""" from src.claude_session import CLAUDE_BIN, _safe_env extra = _get_extra_instructions() if not extra: return None model = hb_config.get("model", "haiku") # Build prompt with context context_parts = ["Heartbeat tick."] if python_results: context_parts.append( f"Check-uri Python: {' | '.join(python_results)}" ) else: context_parts.append("Check-urile Python nu au găsit nimic.") context_parts.append(f"Instrucțiuni extra din HEARTBEAT.md:\n{extra}") context_parts.append( "Execută instrucțiunile de mai sus. " "Răspunde DOAR cu rezultatul (scurt, fără explicații). " "Dacă nu e nimic de raportat, răspunde cu HEARTBEAT_OK." ) prompt = "\n\n".join(context_parts) cmd = [ CLAUDE_BIN, "-p", prompt, "--model", model, "--output-format", "json", ] try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=120, env=_safe_env(), cwd=str(PROJECT_ROOT), ) if proc.returncode != 0: log.warning("Claude heartbeat extra failed (exit %d): %s", proc.returncode, proc.stderr[:200]) return None data = json.loads(proc.stdout) result = data.get("result", "").strip() if not result or result == "HEARTBEAT_OK": return None return result except subprocess.TimeoutExpired: log.warning("Claude heartbeat extra timed out") return None except Exception as e: log.warning("Claude heartbeat extra error: %s", e) return None def _load_state() -> dict: """Load heartbeat state from JSON file.""" if STATE_FILE.exists(): try: return json.loads(STATE_FILE.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): pass return {"last_run": None, "checks": {}} def _save_state(state: dict) -> None: """Save heartbeat state to JSON file.""" STATE_FILE.parent.mkdir(parents=True, exist_ok=True) STATE_FILE.write_text( json.dumps(state, indent=2, ensure_ascii=False) + "\n", encoding="utf-8" )