Files
echo-core/cli.py
2026-02-17 09:30:35 +00:00

909 lines
30 KiB
Python
Executable File

#!/home/moltbot/echo-core/.venv/bin/python3
"""Echo Core CLI tool."""
import argparse
import getpass
import json
import os
import shutil
import signal
import sys
from datetime import datetime, timezone
from pathlib import Path
# Add project root to path
PROJECT_ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(PROJECT_ROOT))
from src.credential_store import set_secret, get_secret, list_secrets, delete_secret, check_secrets
# Runtime paths, all resolved relative to the project root.
# NOTE(review): PID_FILE is not referenced by any handler visible in this file — confirm it is still needed.
PID_FILE = PROJECT_ROOT / "echo-core.pid"
# Log file tailed by the `logs` subcommand (cmd_logs).
LOG_FILE = PROJECT_ROOT / "logs" / "echo-core.log"
# JSON file of active sessions, read by _load_sessions_file().
SESSIONS_FILE = PROJECT_ROOT / "sessions" / "active.json"
# Main configuration file; validated by cmd_doctor and read by the WhatsApp helpers.
CONFIG_FILE = PROJECT_ROOT / "config.json"
# ---------------------------------------------------------------------------
# Subcommand handlers
# ---------------------------------------------------------------------------
# systemd user units that this CLI queries and controls through `systemctl --user`.
SERVICE_NAME = "echo-core.service"
BRIDGE_SERVICE_NAME = "echo-whatsapp-bridge.service"
def _systemctl(*cmd_args) -> tuple[int, str]:
    """Invoke ``systemctl --user`` with the given arguments.

    Returns a ``(returncode, stdout)`` pair; stdout has surrounding
    whitespace removed. The child process is given at most 30 seconds.
    """
    import subprocess

    command = ["systemctl", "--user"]
    command.extend(cmd_args)
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=30,
    )
    output = completed.stdout.strip()
    return completed.returncode, output
def _get_service_status(service: str) -> dict:
    """Query systemd for the state of a user unit.

    Runs ``systemctl --user show`` for *service*, requesting the
    ActiveState, SubState, MainPID and ActiveEnterTimestamp properties, and
    parses the ``KEY=VALUE`` output lines into a dict of strings.

    Returns an empty dict when ``systemctl`` cannot be executed or does not
    answer within 30 seconds, so callers relying on ``dict.get`` report the
    unit as unknown instead of crashing with a traceback.
    """
    import subprocess

    try:
        result = subprocess.run(
            ["systemctl", "--user", "show", service,
             "--property=ActiveState,SubState,MainPID,ActiveEnterTimestamp"],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        # systemctl missing / not executable, or it hung.
        return {}

    info = {}
    for line in result.stdout.strip().splitlines():
        # Split on the first "=" only: values may themselves contain "=".
        key, sep, value = line.partition("=")
        if sep:
            info[key] = value
    return info
def _format_uptime(timestamp: str) -> str:
    """Convert systemd's ActiveEnterTimestamp into an ``Xh Ym Zs`` string.

    Returns "" for an empty timestamp and "?" when it cannot be parsed.
    """
    if not timestamp:
        return ""
    text = timestamp.strip()
    try:
        # strptime with %Z yields a naive datetime (the zone name is only
        # validated, not applied).
        started = datetime.strptime(text, "%a %Y-%m-%d %H:%M:%S %Z")
        if text.rsplit(" ", 1)[-1] in ("UTC", "GMT"):
            started = started.replace(tzinfo=timezone.utc)
        else:
            # systemctl prints timestamps in the host's local time zone;
            # astimezone() on a naive value interprets it as local time.
            started = started.astimezone(timezone.utc)
    except (ValueError, OSError, OverflowError):
        return "?"
    # Clamp so clock adjustments never produce a negative uptime.
    elapsed = max(0, int((datetime.now(timezone.utc) - started).total_seconds()))
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours}h {minutes}m {seconds}s"


def cmd_status(args):
    """Show bot status: online/offline, uptime, active sessions.

    Prints the state of the Echo Core service (with PID and uptime when it
    is active), the state of the WhatsApp bridge service, and the number of
    active sessions. *args* is the parsed argparse namespace (unused).
    """
    # Echo Core service
    info = _get_service_status(SERVICE_NAME)
    active = info.get("ActiveState", "unknown")
    pid = info.get("MainPID", "0")
    if active == "active":
        print(f"Echo Core: ONLINE (PID {pid})")
        uptime_str = _format_uptime(info.get("ActiveEnterTimestamp", ""))
        if uptime_str:
            print(f"Uptime: {uptime_str}")
    else:
        print(f"Echo Core: OFFLINE ({active})")

    # WhatsApp bridge service
    bridge_info = _get_service_status(BRIDGE_SERVICE_NAME)
    bridge_active = bridge_info.get("ActiveState", "unknown")
    bridge_pid = bridge_info.get("MainPID", "0")
    if bridge_active == "active":
        print(f"WA Bridge: ONLINE (PID {bridge_pid})")
    else:
        print(f"WA Bridge: OFFLINE ({bridge_active})")

    _print_session_count()
def _print_session_count():
    """Report how many sessions the sessions file currently holds."""
    active_total = len(_load_sessions_file())
    print(f"Sessions: {active_total} active")
def _load_sessions_file() -> dict:
    """Load sessions/active.json, return {} on any error.

    "Any error" covers a missing or unreadable file, bytes that are not
    valid UTF-8, malformed JSON, an empty file, and JSON whose top-level
    value is not an object (callers iterate the result with ``.items()``).
    """
    try:
        text = SESSIONS_FILE.read_text(encoding="utf-8")
        data = json.loads(text) if text.strip() else {}
    except (OSError, ValueError):
        # OSError covers FileNotFoundError; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}
def cmd_doctor(args):
    """Run diagnostic checks.

    Each check contributes a ``(label, passed)`` pair; after all checks ran
    a ``[PASS]``/``[FAIL]`` line is printed per check, followed by a summary.
    Exits with status 1 when at least one check failed. *args* is the parsed
    argparse namespace (unused).
    """
    import re
    import subprocess
    import urllib.request

    checks = []   # (label, passed) in execution order
    details = {}  # label -> extra hint printed right below that result

    # 1. Discord token present
    token = get_secret("discord_token")
    checks.append(("Discord token in keyring", bool(token)))

    # 2. Keyring working
    try:
        import keyring
        keyring.get_password("echo-core", "_registry")
        checks.append(("Keyring accessible", True))
    except Exception:
        # Any failure (import error, backend error) means the keyring is unusable.
        checks.append(("Keyring accessible", False))

    # 3. Claude CLI found and functional
    claude_found = shutil.which("claude") is not None
    checks.append(("Claude CLI found", claude_found))
    if claude_found:
        try:
            result = subprocess.run(
                ["claude", "--version"], capture_output=True, text=True, timeout=10,
            )
            checks.append(("Claude CLI functional", result.returncode == 0))
        except Exception:
            checks.append(("Claude CLI functional", False))

    # 4. Disk space (warn if <1GB free)
    try:
        fs_stat = os.statvfs(str(PROJECT_ROOT))
        free_gb = (fs_stat.f_bavail * fs_stat.f_frsize) / (1024 ** 3)
        checks.append((f"Disk space ({free_gb:.1f} GB free)", free_gb >= 1.0))
    except OSError:
        checks.append(("Disk space", False))

    # 5. config.json valid + no tokens/secrets in plain text
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            config_text = f.read()
        json.loads(config_text)
    except (OSError, ValueError):
        # ValueError covers malformed JSON and undecodable bytes.
        checks.append(("config.json valid", False))
    else:
        checks.append(("config.json valid", True))
        # Scan for token-like patterns
        token_patterns = re.compile(
            r'(sk-[a-zA-Z0-9]{20,}|xoxb-|xoxp-|ghp_|gho_|discord.*token.*["\']:\s*["\'][A-Za-z0-9._-]{20,})',
            re.IGNORECASE,
        )
        has_tokens = bool(token_patterns.search(config_text))
        checks.append(("config.json no plain text secrets", not has_tokens))

    # 6. Logs dir writable
    logs_dir = PROJECT_ROOT / "logs"
    try:
        logs_dir.mkdir(parents=True, exist_ok=True)
        test_file = logs_dir / ".write_test"
        test_file.write_text("ok")
        test_file.unlink()
        checks.append(("Logs dir writable", True))
    except OSError:
        checks.append(("Logs dir writable", False))

    # 7. .gitignore correct (must contain key entries)
    gitignore = PROJECT_ROOT / ".gitignore"
    required_gitignore = {"sessions/", "logs/", ".env", "*.sqlite"}
    try:
        gi_text = gitignore.read_text(encoding="utf-8")
    except FileNotFoundError:
        checks.append((".gitignore exists", False))
    except (OSError, UnicodeDecodeError):
        # Present but unreadable: report it instead of aborting the run.
        checks.append((".gitignore readable", False))
    else:
        gi_lines = {entry.strip() for entry in gi_text.splitlines()}
        missing = required_gitignore - gi_lines
        checks.append((".gitignore complete", not missing))
        if missing:
            # Shown under the result line rather than before the whole list.
            details[".gitignore complete"] = (
                f" (missing from .gitignore: {', '.join(sorted(missing))})"
            )

    # 8. File permissions: sessions/ and config.json not world-readable
    for sensitive in (PROJECT_ROOT / "sessions", CONFIG_FILE):
        try:
            mode = sensitive.stat().st_mode
        except OSError:
            continue  # missing paths are skipped, not failed
        world_read = mode & 0o004  # "others: read" permission bit
        checks.append((f"{sensitive.name} not world-readable", not world_read))

    # 9. Ollama reachable
    try:
        # Context manager closes the HTTP response/socket.
        with urllib.request.urlopen("http://10.0.20.161:11434/api/tags", timeout=5) as resp:
            ollama_ok = resp.status == 200
    except Exception:
        ollama_ok = False
    checks.append(("Ollama reachable", ollama_ok))

    # 10. Telegram token (optional)
    tg_token = get_secret("telegram_token")
    if tg_token:
        checks.append(("Telegram token in keyring", True))
    else:
        checks.append(("Telegram token (optional)", True))  # not required

    # 11. Echo Core service running
    info = _get_service_status(SERVICE_NAME)
    checks.append(("Echo Core service running", info.get("ActiveState") == "active"))

    # 12. WhatsApp bridge service running (optional)
    bridge_info = _get_service_status(BRIDGE_SERVICE_NAME)
    bridge_active = bridge_info.get("ActiveState") == "active"
    if bridge_active:
        checks.append(("WhatsApp bridge running", True))
    else:
        checks.append(("WhatsApp bridge (optional)", True))

    # Print results
    all_pass = True
    for label, passed in checks:
        status = "PASS" if passed else "FAIL"
        print(f" [{status}] {label}")
        if label in details:
            print(details[label])
        if not passed:
            all_pass = False
    print()
    if all_pass:
        print("All checks passed.")
    else:
        print("Some checks failed!")
        sys.exit(1)
def cmd_restart(args):
    """Restart Echo Core through systemctl.

    When ``args.bridge`` is set, the WhatsApp bridge is killed and started
    again first. After restarting Echo Core the unit state is re-read and
    reported; any state other than active/activating exits with status 1.
    """
    import time

    wants_bridge = getattr(args, "bridge", False)
    if wants_bridge:
        print("Restarting WhatsApp bridge...")
        _systemctl("kill", BRIDGE_SERVICE_NAME)
        time.sleep(2)
        _systemctl("start", BRIDGE_SERVICE_NAME)

    print("Restarting Echo Core...")
    _systemctl("restart", SERVICE_NAME)
    time.sleep(3)

    status = _get_service_status(SERVICE_NAME)
    state = status.get("ActiveState")
    if state == "active":
        print(f"Echo Core restarted (PID {status.get('MainPID')}).")
        return
    if state == "activating":
        print("Echo Core starting...")
        return
    print(f"Warning: Echo Core status is {state}")
    sys.exit(1)
def cmd_stop(args):
    """Ask systemd to stop Echo Core and report the resulting unit state.

    The stop request is issued with ``--no-block``; the state is read back
    after a two-second pause.
    """
    import time

    print("Stopping Echo Core...")
    _systemctl("stop", "--no-block", SERVICE_NAME)
    time.sleep(2)

    state = _get_service_status(SERVICE_NAME).get("ActiveState")
    stopped = state in ("inactive", "deactivating")
    if not stopped:
        print(f"Echo Core status: {state}")
        return
    print("Echo Core stopped.")
def cmd_logs(args):
    """Show last N lines from the log file.

    ``args.lines`` is the number of trailing lines to print; zero or a
    negative number prints nothing. A missing log file is reported without
    failing; any other read error prints a message and exits with status 1.
    """
    from collections import deque

    # A slice like lines[-0:] would return the whole file, so clamp instead.
    wanted = max(args.lines, 0)
    try:
        with LOG_FILE.open(encoding="utf-8", errors="replace") as fh:
            # A bounded deque keeps only the tail while streaming the file,
            # so large logs are never held in memory in full.
            tail = deque(fh, maxlen=wanted)
    except FileNotFoundError:
        print(f"No log file found at {LOG_FILE}")
        return
    except OSError as e:
        print(f"Error reading log file: {e}")
        sys.exit(1)

    for line in tail:
        print(line.rstrip("\n"))
def cmd_sessions(args):
    """Route the ``sessions`` subcommand to its list or clear implementation."""
    action = args.sessions_action
    if action == "list":
        _sessions_list()
        return
    if action == "clear":
        _sessions_clear(args.channel)
def _sessions_list():
    """List active sessions in tabular format.

    Reads the sessions file and prints one row per channel with its model,
    message count and last-message timestamp. Missing, null or non-string
    fields are rendered as text ("?" / "0") rather than raising, since the
    column format specs only accept strings and numbers.
    """
    sessions = _load_sessions_file()
    if not sessions:
        print("No active sessions.")
        return

    # Header
    print(f"{'Channel':<22} {'Model':<8} {'Messages':>8} {'Last message'}")
    print(f"{'-'*22} {'-'*8} {'-'*8} {'-'*20}")

    for channel_id, info in sessions.items():
        if not isinstance(info, dict):
            info = {}  # malformed entry: show placeholders for every column
        model = info.get("model")
        model = "?" if model is None else str(model)
        count = info.get("message_count")
        count = "0" if count is None else str(count)
        last = info.get("last_message_at")
        last = "?" if last is None else str(last)
        # Truncate ISO timestamp to readable form
        if last != "?" and len(last) > 19:
            last = last[:19].replace("T", " ")
        print(f"{str(channel_id):<22} {model:<8} {count:>8} {last}")
def _sessions_clear(channel: str | None):
    """Clear the session of *channel*, or every session when it is omitted."""
    from src.claude_session import clear_session, list_sessions as ls_sessions

    if not channel:
        sessions = ls_sessions()
        if not sessions:
            print("No active sessions.")
            return
        count = len(sessions)
        # Snapshot the keys first: clearing may mutate the mapping.
        for ch in tuple(sessions.keys()):
            clear_session(ch)
        print(f"Cleared {count} session(s).")
        return

    if clear_session(channel):
        print(f"Session for '{channel}' cleared.")
    else:
        print(f"No session found for '{channel}'.")
def cmd_channel(args):
    """Route the ``channel`` subcommand to its add or list implementation."""
    action = args.channel_action
    if action == "add":
        _channel_add(args.id, args.alias)
        return
    if action == "list":
        _channel_list()
def _channel_add(channel_id: str, alias: str):
    """Register *channel_id* under *alias* in the configuration.

    Exits with status 1 when the alias is already taken; otherwise the
    updated channel mapping is stored and the configuration saved.
    """
    from src.config import Config

    cfg = Config()
    registered = cfg.get("channels", {})
    if alias in registered:
        print(f"Error: alias '{alias}' already exists")
        sys.exit(1)

    registered[alias] = {"id": channel_id}
    cfg.set("channels", registered)
    cfg.save()
    print(f"Channel '{alias}' (ID: {channel_id}) added.")
def _channel_list():
    """Print a two-column table of channel aliases and their IDs."""
    from src.config import Config

    registered = Config().get("channels", {})
    if not registered:
        print("No channels registered.")
        return

    print(f"{'Alias':<20} {'Channel ID'}")
    print(f"{'-'*20} {'-'*22}")
    for alias, entry in registered.items():
        # Entries are normally {"id": ...}; anything else is shown as text.
        if isinstance(entry, dict):
            ch_id = entry.get("id", "?")
        else:
            ch_id = str(entry)
        print(f"{alias:<20} {ch_id}")
def cmd_send(args):
    """Route a message to the channel registered under ``args.alias``.

    The message words in ``args.message`` are joined with spaces, passed to
    the router as user "cli-user", and the router's response is printed.
    Exits with status 1 when the alias is unknown.
    """
    from src.config import Config
    from src.router import route_message

    channels = Config().get("channels", {})
    entry = channels.get(args.alias)
    if not entry:
        available = ', '.join(channels.keys()) if channels else '(none)'
        print(f"Error: unknown channel alias '{args.alias}'")
        print(f"Available: {available}")
        sys.exit(1)

    if isinstance(entry, dict):
        channel_id = entry.get("id")
    else:
        channel_id = str(entry)
    text = " ".join(args.message)

    print(f"Sending to '{args.alias}' ({channel_id})...")
    # The router returns (response, is_command); only the response is shown.
    response, _is_cmd = route_message(channel_id, "cli-user", text)
    print(response)
def cmd_cron(args):
    """Handle cron subcommand.

    Dispatches on ``args.cron_action`` (list, run, add, remove, enable,
    disable). For ``add``, ``--tools`` is split on commas with blank entries
    dropped, and a prompt of ``-`` is read from standard input.
    """
    action = args.cron_action
    if action == "list":
        _cron_list()
    elif action == "run":
        _cron_run(args.name)
    elif action == "add":
        # Skip empty names produced by "a,,b" or a trailing comma.
        raw_tools = args.tools.split(",") if args.tools else []
        tools = [name for name in (t.strip() for t in raw_tools) if name]
        if args.prompt == "-":
            prompt = sys.stdin.read().strip()
        else:
            prompt = args.prompt
        _cron_add(args.name, args.expression, args.channel, prompt,
                  args.model, tools)
    elif action == "remove":
        _cron_remove(args.name)
    elif action == "enable":
        _cron_enable(args.name)
    elif action == "disable":
        _cron_disable(args.name)
def _cron_list():
    """Print every scheduled job as one row of a fixed-width table."""
    from src.scheduler import Scheduler

    scheduler = Scheduler()
    scheduler._jobs = scheduler._load_jobs()
    jobs = scheduler.list_jobs()
    if not jobs:
        print("No scheduled jobs.")
        return

    print(f"{'Name':<24} {'Cron':<16} {'Channel':<10} {'Model':<8} {'Enabled':<8} {'Last Status':<12} {'Next Run'}")
    print(f"{'-'*24} {'-'*16} {'-'*10} {'-'*8} {'-'*8} {'-'*12} {'-'*20}")
    for job in jobs:
        name = job.get("name", "?")
        cron = job.get("cron", "?")
        channel = job.get("channel", "?")
        model = job.get("model", "?")
        if job.get("enabled"):
            enabled = "yes"
        else:
            enabled = "no"
        last_status = job.get("last_status") or "-"
        next_run = job.get("next_run") or "-"
        # Shorten an ISO timestamp to "YYYY-MM-DD HH:MM:SS".
        if next_run != "-" and len(next_run) > 19:
            next_run = next_run[:19].replace("T", " ")
        print(f"{name:<24} {cron:<16} {channel:<10} {model:<8} {enabled:<8} {last_status:<12} {next_run}")
def _cron_run(name: str):
    """Run the job called *name* immediately and print what it returns.

    A ``KeyError`` from the scheduler is reported and exits with status 1.
    """
    import asyncio
    from src.scheduler import Scheduler

    async def _execute():
        scheduler = Scheduler()
        scheduler._jobs = scheduler._load_jobs()
        return await scheduler.run_job(name)

    try:
        output = asyncio.run(_execute())
        print(output)
    except KeyError as exc:
        print(f"Error: {exc}")
        sys.exit(1)
def _cron_add(name: str, cron: str, channel: str, prompt: str,
              model: str, tools: list[str]):
    """Create a scheduled job and confirm it on stdout.

    An empty *tools* list is passed to the scheduler as ``None``. A
    ``ValueError`` from the scheduler is reported and exits with status 1.
    """
    from src.scheduler import Scheduler

    scheduler = Scheduler()
    scheduler._jobs = scheduler._load_jobs()
    try:
        job = scheduler.add_job(name, cron, channel, prompt, model, tools or None)
        print(f"Job '{job['name']}' added (cron: {job['cron']}, channel: {job['channel']}, model: {job['model']})")
    except ValueError as exc:
        print(f"Error: {exc}")
        sys.exit(1)
def _cron_remove(name: str):
    """Delete the job called *name* and report whether it existed."""
    from src.scheduler import Scheduler

    scheduler = Scheduler()
    scheduler._jobs = scheduler._load_jobs()
    removed = scheduler.remove_job(name)
    message = f"Job '{name}' removed." if removed else f"Job '{name}' not found."
    print(message)
def _cron_enable(name: str):
    """Switch the job called *name* on and report whether it existed."""
    from src.scheduler import Scheduler

    scheduler = Scheduler()
    scheduler._jobs = scheduler._load_jobs()
    enabled = scheduler.enable_job(name)
    message = f"Job '{name}' enabled." if enabled else f"Job '{name}' not found."
    print(message)
def _cron_disable(name: str):
    """Switch the job called *name* off and report whether it existed."""
    from src.scheduler import Scheduler

    scheduler = Scheduler()
    scheduler._jobs = scheduler._load_jobs()
    disabled = scheduler.disable_job(name)
    message = f"Job '{name}' disabled." if disabled else f"Job '{name}' not found."
    print(message)
def cmd_memory(args):
    """Route the ``memory`` subcommand to its search or reindex implementation."""
    action = args.memory_action
    if action == "search":
        _memory_search(args.query)
        return
    if action == "reindex":
        _memory_reindex()
def _memory_search(query: str):
    """Run a memory search for *query* and print each hit.

    Every hit shows its score, source file and the first 200 characters of
    the matching chunk. A ``ConnectionError`` from the search backend is
    reported and exits with status 1.
    """
    from src.memory_search import search

    try:
        hits = search(query)
    except ConnectionError as e:
        print(f"Error: {e}")
        sys.exit(1)

    if not hits:
        print("No results found (index may be empty — run `echo memory reindex`).")
        return

    for position, hit in enumerate(hits, 1):
        print(f"\n--- Result {position} (score: {hit['score']:.3f}) ---")
        print(f"File: {hit['file']}")
        chunk = hit["chunk"]
        # Ellipsis marks a chunk that was cut off at 200 characters.
        suffix = "..." if len(chunk) > 200 else ""
        print(chunk[:200] + suffix)
def _memory_reindex():
    """Rebuild the memory search index and print how much was indexed.

    A ``ConnectionError`` from the indexer is reported and exits with
    status 1.
    """
    from src.memory_search import reindex

    print("Reindexing memory files...")
    try:
        summary = reindex()
    except ConnectionError as e:
        print(f"Error: {e}")
        sys.exit(1)
    print(f"Done. Indexed {summary['files']} files, {summary['chunks']} chunks.")
def cmd_heartbeat(args):
    """Execute the heartbeat health checks and print their report."""
    from src.heartbeat import run_heartbeat

    report = run_heartbeat()
    print(report)
def cmd_whatsapp(args):
    """Route the ``whatsapp`` subcommand to its status or qr implementation."""
    action = args.whatsapp_action
    if action == "status":
        _whatsapp_status()
        return
    if action == "qr":
        _whatsapp_qr()
def cmd_openrouter(args):
    """Turn OpenRouter mode on or off, or show its current state.

    The mode is represented by a marker file ``.use_openrouter`` in the
    project root. ``on`` requires ``~/.claude-env.sh`` to exist (exit status
    1 otherwise) and warns when expected variable names are absent from it.
    ``status`` lists the ANTHROPIC/OPENROUTER exports found in that file
    with their values masked.
    """
    # NOTE(review): nesting of the env-file listing under the "enabled"
    # check was reconstructed from flattened source — confirm against the
    # original file.
    marker = PROJECT_ROOT / ".use_openrouter"
    action = args.openrouter_action

    if action == "on":
        env_file = Path.home() / ".claude-env.sh"
        if not env_file.exists():
            print(f"Error: {env_file} not found")
            sys.exit(1)
        # Verify required vars exist in file
        contents = env_file.read_text()
        has_required = "ANTHROPIC_BASE_URL" in contents and "OPENROUTER_API_KEY" in contents
        if not has_required:
            print(f"Warning: {env_file} may be missing ANTHROPIC_BASE_URL or OPENROUTER_API_KEY")
        marker.write_text("# OpenRouter mode enabled\n")
        print("OpenRouter mode: ENABLED")
        print("Restart echo-core for changes to take effect:")
        print(" systemctl --user restart echo-core")
        return

    if action == "off":
        if not marker.exists():
            print("OpenRouter mode: already disabled")
            return
        marker.unlink()
        print("OpenRouter mode: DISABLED")
        print("Restart echo-core to use Anthropic API:")
        print(" systemctl --user restart echo-core")
        return

    if action == "status":
        status = "ENABLED" if marker.exists() else "DISABLED"
        print(f"OpenRouter mode: {status}")
        if marker.exists():
            print(f"Semafor: {marker}")
            env_file = Path.home() / ".claude-env.sh"
            if env_file.exists():
                # Show which vars will be loaded
                print(f"Env file: {env_file}")
                for raw in env_file.read_text().splitlines():
                    stripped = raw.strip()
                    if not stripped.startswith("export "):
                        continue
                    # Text between "export " and the first "=" is the name.
                    if "=" in stripped:
                        var_name = stripped[7:].partition("=")[0]
                    else:
                        var_name = "?"
                    if "ANTHROPIC" in var_name or "OPENROUTER" in var_name:
                        print(f" {var_name}=***")
def _whatsapp_status():
"""Check WhatsApp bridge connection status."""
import urllib.request
import urllib.error
cfg_file = CONFIG_FILE
bridge_url = "http://127.0.0.1:8098"
try:
text = cfg_file.read_text(encoding="utf-8")
cfg = json.loads(text)
bridge_url = cfg.get("whatsapp", {}).get("bridge_url", bridge_url)
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
try:
req = urllib.request.urlopen(f"{bridge_url}/status", timeout=5)
data = json.loads(req.read().decode())
except (urllib.error.URLError, OSError) as e:
print(f"Bridge not reachable at {bridge_url}")
print(f" Error: {e}")
return
connected = data.get("connected", False)
phone = data.get("phone", "unknown")
has_qr = data.get("qr", False)
if connected:
print(f"Status: CONNECTED")
print(f"Phone: {phone}")
elif has_qr:
print(f"Status: WAITING FOR QR SCAN")
print(f"Run 'echo whatsapp qr' for QR code instructions.")
else:
print(f"Status: DISCONNECTED")
print(f"Start the bridge and scan the QR code to connect.")
def _whatsapp_qr():
"""Show QR code instructions from the bridge."""
import urllib.request
import urllib.error
cfg_file = CONFIG_FILE
bridge_url = "http://127.0.0.1:8098"
try:
text = cfg_file.read_text(encoding="utf-8")
cfg = json.loads(text)
bridge_url = cfg.get("whatsapp", {}).get("bridge_url", bridge_url)
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
try:
req = urllib.request.urlopen(f"{bridge_url}/qr", timeout=5)
data = json.loads(req.read().decode())
except (urllib.error.URLError, OSError) as e:
print(f"Bridge not reachable at {bridge_url}")
print(f" Error: {e}")
return
qr = data.get("qr")
if not qr:
if data.get("connected"):
print("Already connected — no QR code needed.")
else:
print("No QR code available yet. Wait for the bridge to initialize.")
return
print("QR code is available at the bridge.")
print(f"Open {bridge_url}/qr in a browser to scan,")
print("or check the bridge terminal output for the QR code.")
def cmd_secrets(args):
    """Manage stored secrets: set, list, delete or test.

    ``set`` takes the value either from ``--file`` (the file is deleted
    once the secret is stored) or from an interactive hidden prompt.
    ``test`` exits with status 1 when a required secret is missing.
    """
    action = args.secrets_action

    if action == "set":
        if not args.file:
            value = getpass.getpass(f"Enter value for '{args.name}': ")
            set_secret(args.name, value)
            print(f"Secret '{args.name}' set")
            return
        source = Path(args.file)
        if not source.exists():
            print(f"Error: file {args.file} not found")
            sys.exit(1)
        value = source.read_text().strip()
        set_secret(args.name, value)
        source.unlink()  # Delete source file after storing
        print(f"Secret '{args.name}' set from file (file deleted)")
        return

    if action == "list":
        names = list_secrets()
        if not names:
            print("No secrets stored")
            return
        for name in names:
            print(f" - {name}")
        return

    if action == "delete":
        if delete_secret(args.name):
            print(f"Secret '{args.name}' deleted")
        else:
            print(f"Secret '{args.name}' not found")
        return

    if action == "test":
        results = check_secrets()
        for name, exists in results.items():
            state = 'OK' if exists else 'MISSING'
            print(f" {name}: {state}")
        if not all(results.values()):
            print("\nWARNING: Some required secrets are missing!")
            sys.exit(1)
        print("\nAll required secrets present.")
# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(prog="echo", description="Echo Core CLI")
sub = parser.add_subparsers(dest="command")
# status
sub.add_parser("status", help="Show bot status")
# doctor
sub.add_parser("doctor", help="Run diagnostic checks")
# restart
restart_parser = sub.add_parser("restart", help="Restart the bot via systemctl")
restart_parser.add_argument("--bridge", action="store_true",
help="Also restart WhatsApp bridge")
# stop
sub.add_parser("stop", help="Stop the bot via systemctl")
# logs
logs_parser = sub.add_parser("logs", help="Show recent log lines")
logs_parser.add_argument("lines", nargs="?", type=int, default=20,
help="Number of lines (default: 20)")
# sessions
sessions_parser = sub.add_parser("sessions", help="Manage sessions")
sessions_sub = sessions_parser.add_subparsers(dest="sessions_action")
sessions_sub.add_parser("list", help="List active sessions")
sessions_clear_p = sessions_sub.add_parser("clear", help="Clear session(s)")
sessions_clear_p.add_argument("channel", nargs="?", default=None,
help="Channel ID to clear (omit for all)")
# channel
channel_parser = sub.add_parser("channel", help="Manage channels")
channel_sub = channel_parser.add_subparsers(dest="channel_action")
channel_add_p = channel_sub.add_parser("add", help="Add a channel")
channel_add_p.add_argument("--id", required=True, help="Discord channel ID")
channel_add_p.add_argument("--alias", required=True, help="Channel alias")
channel_sub.add_parser("list", help="List registered channels")
# send
send_parser = sub.add_parser("send", help="Send a message via router")
send_parser.add_argument("alias", help="Channel alias")
send_parser.add_argument("message", nargs="+", help="Message text")
# secrets
secrets_parser = sub.add_parser("secrets", help="Manage secrets")
secrets_sub = secrets_parser.add_subparsers(dest="secrets_action")
set_p = secrets_sub.add_parser("set", help="Set a secret")
set_p.add_argument("name", help="Secret name")
set_p.add_argument("--file", help="Read value from file (file deleted after)")
secrets_sub.add_parser("list", help="List secret names")
del_p = secrets_sub.add_parser("delete", help="Delete a secret")
del_p.add_argument("name", help="Secret name")
secrets_sub.add_parser("test", help="Check required secrets")
# memory
memory_parser = sub.add_parser("memory", help="Memory search commands")
memory_sub = memory_parser.add_subparsers(dest="memory_action")
memory_search_p = memory_sub.add_parser("search", help="Search memory files")
memory_search_p.add_argument("query", help="Search query text")
memory_sub.add_parser("reindex", help="Rebuild memory search index")
# heartbeat
sub.add_parser("heartbeat", help="Run heartbeat health checks")
# cron
cron_parser = sub.add_parser("cron", help="Manage scheduled jobs")
cron_sub = cron_parser.add_subparsers(dest="cron_action")
cron_sub.add_parser("list", help="List scheduled jobs")
cron_run_p = cron_sub.add_parser("run", help="Force-run a job")
cron_run_p.add_argument("name", help="Job name")
cron_add_p = cron_sub.add_parser("add", help="Add a scheduled job")
cron_add_p.add_argument("name", help="Job name")
cron_add_p.add_argument("expression", help="Cron expression (e.g. '30 6 * * *')")
cron_add_p.add_argument("--channel", required=True, help="Channel alias")
cron_add_p.add_argument("--prompt", required=True, help="Prompt text (use '-' for stdin)")
cron_add_p.add_argument("--model", default="sonnet", help="Model (default: sonnet)")
cron_add_p.add_argument("--tools", default=None, help="Comma-separated allowed tools")
cron_remove_p = cron_sub.add_parser("remove", help="Remove a job")
cron_remove_p.add_argument("name", help="Job name")
cron_enable_p = cron_sub.add_parser("enable", help="Enable a job")
cron_enable_p.add_argument("name", help="Job name")
cron_disable_p = cron_sub.add_parser("disable", help="Disable a job")
cron_disable_p.add_argument("name", help="Job name")
# whatsapp
whatsapp_parser = sub.add_parser("whatsapp", help="WhatsApp bridge commands")
whatsapp_sub = whatsapp_parser.add_subparsers(dest="whatsapp_action")
whatsapp_sub.add_parser("status", help="Check bridge connection status")
whatsapp_sub.add_parser("qr", help="Show QR code instructions")
# openrouter
openrouter_parser = sub.add_parser("openrouter", help="Toggle OpenRouter mode")
openrouter_sub = openrouter_parser.add_subparsers(dest="openrouter_action")
openrouter_sub.add_parser("on", help="Enable OpenRouter mode")
openrouter_sub.add_parser("off", help="Disable OpenRouter mode")
openrouter_sub.add_parser("status", help="Check OpenRouter status")
# Parse and dispatch
args = parser.parse_args()
if args.command is None:
parser.print_help()
sys.exit(0)
dispatch = {
"status": cmd_status,
"doctor": cmd_doctor,
"restart": cmd_restart,
"stop": cmd_stop,
"logs": cmd_logs,
"sessions": lambda a: (
cmd_sessions(a) if a.sessions_action else (sessions_parser.print_help() or sys.exit(0))
),
"channel": lambda a: (
cmd_channel(a) if a.channel_action else (channel_parser.print_help() or sys.exit(0))
),
"send": cmd_send,
"memory": lambda a: (
cmd_memory(a) if a.memory_action else (memory_parser.print_help() or sys.exit(0))
),
"heartbeat": cmd_heartbeat,
"cron": lambda a: (
cmd_cron(a) if a.cron_action else (cron_parser.print_help() or sys.exit(0))
),
"secrets": lambda a: (
cmd_secrets(a) if a.secrets_action else (secrets_parser.print_help() or sys.exit(0))
),
"whatsapp": lambda a: (
cmd_whatsapp(a) if a.whatsapp_action else (whatsapp_parser.print_help() or sys.exit(0))
),
"openrouter": lambda a: (
cmd_openrouter(a) if a.openrouter_action else (openrouter_parser.print_help() or sys.exit(0))
),
}
handler = dispatch.get(args.command)
if handler:
handler(args)
else:
parser.print_help()
if __name__ == "__main__":
main()