stage-7: CLI tool with full subcommands

echo status/doctor/restart/logs/sessions/channel/send commands, symlink at ~/.local/bin/echo. QA fix: discord chat handler tuple unpacking bug. 32 new tests (193 total).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MoltBot Service
2026-02-13 13:29:39 +00:00
parent 5bdceff732
commit 09d3de003a
5 changed files with 716 additions and 11 deletions

353
cli.py
View File

@@ -3,14 +3,294 @@
import argparse
import getpass
import json
import os
import shutil
import signal
import sys
from datetime import datetime, timezone
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).resolve().parent))
PROJECT_ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(PROJECT_ROOT))
from src.secrets import set_secret, get_secret, list_secrets, delete_secret, check_secrets
PID_FILE = PROJECT_ROOT / "echo-core.pid"
LOG_FILE = PROJECT_ROOT / "logs" / "echo-core.log"
SESSIONS_FILE = PROJECT_ROOT / "sessions" / "active.json"
CONFIG_FILE = PROJECT_ROOT / "config.json"
# ---------------------------------------------------------------------------
# Subcommand handlers
# ---------------------------------------------------------------------------
def cmd_status(args):
"""Show bot status: online/offline, uptime, active sessions."""
# Check PID file
if not PID_FILE.exists():
print("Status: OFFLINE (no PID file)")
_print_session_count()
return
try:
pid = int(PID_FILE.read_text().strip())
except (ValueError, OSError):
print("Status: OFFLINE (invalid PID file)")
_print_session_count()
return
# Check if process is alive
try:
os.kill(pid, 0)
except OSError:
print(f"Status: OFFLINE (PID {pid} not running)")
_print_session_count()
return
# Process alive — calculate uptime from PID file mtime
mtime = PID_FILE.stat().st_mtime
started = datetime.fromtimestamp(mtime, tz=timezone.utc)
uptime = datetime.now(timezone.utc) - started
hours, remainder = divmod(int(uptime.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
print(f"Status: ONLINE (PID {pid})")
print(f"Uptime: {hours}h {minutes}m {seconds}s")
_print_session_count()
def _print_session_count():
"""Print the number of active sessions."""
sessions = _load_sessions_file()
count = len(sessions)
print(f"Sessions: {count} active")
def _load_sessions_file() -> dict:
"""Load sessions/active.json, return {} on any error."""
try:
text = SESSIONS_FILE.read_text(encoding="utf-8")
if not text.strip():
return {}
return json.loads(text)
except (FileNotFoundError, json.JSONDecodeError, OSError):
return {}
def cmd_doctor(args):
"""Run diagnostic checks."""
checks = []
# 1. Discord token present
token = get_secret("discord_token")
checks.append(("Discord token", bool(token)))
# 2. Keyring working
try:
import keyring
keyring.get_password("echo-core", "_registry")
checks.append(("Keyring accessible", True))
except Exception:
checks.append(("Keyring accessible", False))
# 3. Claude CLI found
claude_found = shutil.which("claude") is not None
checks.append(("Claude CLI found", claude_found))
# 4. Disk space (warn if <1GB free)
try:
stat = os.statvfs(str(PROJECT_ROOT))
free_gb = (stat.f_bavail * stat.f_frsize) / (1024 ** 3)
checks.append((f"Disk space ({free_gb:.1f} GB free)", free_gb >= 1.0))
except OSError:
checks.append(("Disk space", False))
# 5. config.json valid
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
json.load(f)
checks.append(("config.json valid", True))
except (FileNotFoundError, json.JSONDecodeError, OSError):
checks.append(("config.json valid", False))
# 6. Logs dir writable
logs_dir = PROJECT_ROOT / "logs"
try:
logs_dir.mkdir(parents=True, exist_ok=True)
test_file = logs_dir / ".write_test"
test_file.write_text("ok")
test_file.unlink()
checks.append(("Logs dir writable", True))
except OSError:
checks.append(("Logs dir writable", False))
# Print results
all_pass = True
for label, passed in checks:
status = "PASS" if passed else "FAIL"
print(f" [{status}] {label}")
if not passed:
all_pass = False
print()
if all_pass:
print("All checks passed.")
else:
print("Some checks failed!")
sys.exit(1)
def cmd_restart(args):
"""Restart the bot by sending SIGTERM to the running process."""
if not PID_FILE.exists():
print("Error: no PID file found (bot not running?)")
sys.exit(1)
try:
pid = int(PID_FILE.read_text().strip())
except (ValueError, OSError):
print("Error: invalid PID file")
sys.exit(1)
# Check process alive
try:
os.kill(pid, 0)
except OSError:
print(f"Error: process {pid} is not running")
sys.exit(1)
os.kill(pid, signal.SIGTERM)
print(f"Sent SIGTERM to PID {pid}")
def cmd_logs(args):
"""Show last N lines from the log file."""
n = args.lines
if not LOG_FILE.exists():
print(f"No log file found at {LOG_FILE}")
return
try:
lines = LOG_FILE.read_text(encoding="utf-8", errors="replace").splitlines()
except OSError as e:
print(f"Error reading log file: {e}")
sys.exit(1)
tail = lines[-n:]
for line in tail:
print(line)
def cmd_sessions(args):
"""Handle sessions subcommand."""
if args.sessions_action == "list":
_sessions_list()
elif args.sessions_action == "clear":
_sessions_clear(args.channel)
def _sessions_list():
"""List active sessions in tabular format."""
sessions = _load_sessions_file()
if not sessions:
print("No active sessions.")
return
# Header
print(f"{'Channel':<22} {'Model':<8} {'Messages':>8} {'Last message'}")
print(f"{'-'*22} {'-'*8} {'-'*8} {'-'*20}")
for channel_id, info in sessions.items():
model = info.get("model", "?")
count = info.get("message_count", 0)
last = info.get("last_message_at", "?")
# Truncate ISO timestamp to readable form
if last != "?" and len(last) > 19:
last = last[:19].replace("T", " ")
print(f"{channel_id:<22} {model:<8} {count:>8} {last}")
def _sessions_clear(channel: str | None):
"""Clear one or all sessions."""
from src.claude_session import clear_session, list_sessions as ls_sessions
if channel:
if clear_session(channel):
print(f"Session for '{channel}' cleared.")
else:
print(f"No session found for '{channel}'.")
else:
sessions = ls_sessions()
if not sessions:
print("No active sessions.")
return
count = len(sessions)
for ch in list(sessions.keys()):
clear_session(ch)
print(f"Cleared {count} session(s).")
def cmd_channel(args):
"""Handle channel subcommand."""
if args.channel_action == "add":
_channel_add(args.id, args.alias)
elif args.channel_action == "list":
_channel_list()
def _channel_add(channel_id: str, alias: str):
"""Add a channel to config.json."""
from src.config import Config
cfg = Config()
channels = cfg.get("channels", {})
if alias in channels:
print(f"Error: alias '{alias}' already exists")
sys.exit(1)
channels[alias] = {"id": channel_id}
cfg.set("channels", channels)
cfg.save()
print(f"Channel '{alias}' (ID: {channel_id}) added.")
def _channel_list():
"""List registered channels."""
from src.config import Config
cfg = Config()
channels = cfg.get("channels", {})
if not channels:
print("No channels registered.")
return
print(f"{'Alias':<20} {'Channel ID'}")
print(f"{'-'*20} {'-'*22}")
for alias, info in channels.items():
ch_id = info.get("id", "?") if isinstance(info, dict) else str(info)
print(f"{alias:<20} {ch_id}")
def cmd_send(args):
"""Send a message through the router."""
from src.config import Config
from src.router import route_message
cfg = Config()
channels = cfg.get("channels", {})
channel_info = channels.get(args.alias)
if not channel_info:
print(f"Error: unknown channel alias '{args.alias}'")
print(f"Available: {', '.join(channels.keys()) if channels else '(none)'}")
sys.exit(1)
channel_id = channel_info.get("id") if isinstance(channel_info, dict) else str(channel_info)
message = " ".join(args.message)
print(f"Sending to '{args.alias}' ({channel_id})...")
response, is_cmd = route_message(channel_id, "cli-user", message)
print(response)
def cmd_secrets(args):
"""Handle secrets subcommand."""
@@ -54,10 +334,53 @@ def cmd_secrets(args):
sys.exit(1)
# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(prog="echo", description="Echo Core CLI")
sub = parser.add_subparsers(dest="command")
# status
sub.add_parser("status", help="Show bot status")
# doctor
sub.add_parser("doctor", help="Run diagnostic checks")
# restart
sub.add_parser("restart", help="Restart the bot (send SIGTERM)")
# logs
logs_parser = sub.add_parser("logs", help="Show recent log lines")
logs_parser.add_argument("lines", nargs="?", type=int, default=20,
help="Number of lines (default: 20)")
# sessions
sessions_parser = sub.add_parser("sessions", help="Manage sessions")
sessions_sub = sessions_parser.add_subparsers(dest="sessions_action")
sessions_sub.add_parser("list", help="List active sessions")
sessions_clear_p = sessions_sub.add_parser("clear", help="Clear session(s)")
sessions_clear_p.add_argument("channel", nargs="?", default=None,
help="Channel ID to clear (omit for all)")
# channel
channel_parser = sub.add_parser("channel", help="Manage channels")
channel_sub = channel_parser.add_subparsers(dest="channel_action")
channel_add_p = channel_sub.add_parser("add", help="Add a channel")
channel_add_p.add_argument("--id", required=True, help="Discord channel ID")
channel_add_p.add_argument("--alias", required=True, help="Channel alias")
channel_sub.add_parser("list", help="List registered channels")
# send
send_parser = sub.add_parser("send", help="Send a message via router")
send_parser.add_argument("alias", help="Channel alias")
send_parser.add_argument("message", nargs="+", help="Message text")
# secrets
secrets_parser = sub.add_parser("secrets", help="Manage secrets")
secrets_sub = secrets_parser.add_subparsers(dest="secrets_action")
@@ -73,17 +396,35 @@ def main():
secrets_sub.add_parser("test", help="Check required secrets")
# Parse and dispatch
args = parser.parse_args()
if args.command is None:
parser.print_help()
sys.exit(0)
if args.command == "secrets":
if args.secrets_action is None:
secrets_parser.print_help()
sys.exit(0)
cmd_secrets(args)
dispatch = {
"status": cmd_status,
"doctor": cmd_doctor,
"restart": cmd_restart,
"logs": cmd_logs,
"sessions": lambda a: (
cmd_sessions(a) if a.sessions_action else (sessions_parser.print_help() or sys.exit(0))
),
"channel": lambda a: (
cmd_channel(a) if a.channel_action else (channel_parser.print_help() or sys.exit(0))
),
"send": cmd_send,
"secrets": lambda a: (
cmd_secrets(a) if a.secrets_action else (secrets_parser.print_help() or sys.exit(0))
),
}
handler = dispatch.get(args.command)
if handler:
handler(args)
else:
parser.print_help()
if __name__ == "__main__":