- Replace all ~/clawd and ~/.clawdbot paths with ~/echo-core equivalents in tools (git_commit, ralph_prd_generator, backup_config, lead-gen) - Update personality files: TOOLS.md repo/paths, AGENTS.md security audit cmd - Migrate HANDOFF.md architectural decisions to docs/architecture.md - Tighten credentials/ dir to 700, add to .gitignore - Add .claude/ and *.pid to .gitignore - Various adapter, router, and session improvements from prior work Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
532 lines
20 KiB
Python
532 lines
20 KiB
Python
"""Comprehensive tests for cli.py subcommands."""
|
|
|
|
import argparse
|
|
import json
|
|
import time
|
|
from contextlib import ExitStack
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
import cli
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers / fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _args(**kwargs):
|
|
"""Create an argparse.Namespace with given keyword attrs."""
|
|
return argparse.Namespace(**kwargs)
|
|
|
|
|
|
@pytest.fixture
|
|
def iso(tmp_path, monkeypatch):
|
|
"""Redirect all cli module-level paths to tmp_path for isolation."""
|
|
pid_file = tmp_path / "echo-core.pid"
|
|
logs_dir = tmp_path / "logs"
|
|
logs_dir.mkdir()
|
|
log_file = logs_dir / "echo-core.log"
|
|
sess_dir = tmp_path / "sessions"
|
|
sess_dir.mkdir()
|
|
sessions_file = sess_dir / "active.json"
|
|
config_file = tmp_path / "config.json"
|
|
|
|
monkeypatch.setattr(cli, "PID_FILE", pid_file)
|
|
monkeypatch.setattr(cli, "LOG_FILE", log_file)
|
|
monkeypatch.setattr(cli, "SESSIONS_FILE", sessions_file)
|
|
monkeypatch.setattr(cli, "CONFIG_FILE", config_file)
|
|
monkeypatch.setattr(cli, "PROJECT_ROOT", tmp_path)
|
|
|
|
return {
|
|
"pid_file": pid_file,
|
|
"log_file": log_file,
|
|
"sessions_file": sessions_file,
|
|
"config_file": config_file,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cmd_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStatus:
|
|
def _mock_service(self, active="active", pid="1234", ts="Fri 2026-02-13 22:00:00 UTC"):
|
|
return {"ActiveState": active, "MainPID": pid, "ActiveEnterTimestamp": ts}
|
|
|
|
def test_offline(self, iso, capsys):
|
|
with patch("cli._get_service_status", return_value=self._mock_service(active="inactive", pid="0")):
|
|
cli.cmd_status(_args())
|
|
out = capsys.readouterr().out
|
|
assert "OFFLINE" in out
|
|
|
|
def test_online(self, iso, capsys):
|
|
iso["sessions_file"].write_text(json.dumps({"ch1": {}}))
|
|
with patch("cli._get_service_status", return_value=self._mock_service()):
|
|
cli.cmd_status(_args())
|
|
out = capsys.readouterr().out
|
|
assert "ONLINE" in out
|
|
assert "1234" in out
|
|
assert "1 active" in out
|
|
|
|
def test_sessions_count_zero(self, iso, capsys):
|
|
with patch("cli._get_service_status", return_value=self._mock_service(active="inactive")):
|
|
cli.cmd_status(_args())
|
|
out = capsys.readouterr().out
|
|
assert "0 active" in out
|
|
|
|
def test_bridge_status(self, iso, capsys):
|
|
with patch("cli._get_service_status", side_effect=[
|
|
self._mock_service(),
|
|
self._mock_service(pid="5678"),
|
|
]):
|
|
cli.cmd_status(_args())
|
|
out = capsys.readouterr().out
|
|
assert "WA Bridge: ONLINE" in out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cmd_doctor
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDoctor:
|
|
"""Tests for the doctor diagnostic command."""
|
|
|
|
def _run_doctor(self, iso, capsys, *, token="tok",
|
|
claude="/usr/bin/claude",
|
|
disk_bavail=1_000_000, disk_frsize=4096,
|
|
setup_full=False):
|
|
"""Run cmd_doctor with mocked externals, return (stdout, exit_code)."""
|
|
import os as _os
|
|
stat = MagicMock(f_bavail=disk_bavail, f_frsize=disk_frsize)
|
|
|
|
# Mock subprocess.run for claude --version
|
|
mock_proc = MagicMock(returncode=0, stdout="1.0.0", stderr="")
|
|
|
|
# Mock urllib for Ollama reachability
|
|
mock_resp = MagicMock(status=200)
|
|
|
|
patches = [
|
|
patch("cli.get_secret", return_value=token),
|
|
patch("keyring.get_password", return_value=None),
|
|
patch("shutil.which", return_value=claude),
|
|
patch("os.statvfs", return_value=stat),
|
|
patch("subprocess.run", return_value=mock_proc),
|
|
patch("urllib.request.urlopen", return_value=mock_resp),
|
|
patch("cli._get_service_status", return_value={"ActiveState": "active", "MainPID": "123"}),
|
|
]
|
|
|
|
if setup_full:
|
|
# Create .gitignore with required entries
|
|
gi_path = cli.PROJECT_ROOT / ".gitignore"
|
|
gi_path.write_text("sessions/\nlogs/\n.env\n*.sqlite\n")
|
|
# Set config.json not world-readable
|
|
iso["config_file"].chmod(0o600)
|
|
# Create sessions dir not world-readable
|
|
sessions_dir = cli.PROJECT_ROOT / "sessions"
|
|
sessions_dir.mkdir(exist_ok=True)
|
|
sessions_dir.chmod(0o700)
|
|
|
|
with ExitStack() as stack:
|
|
for p in patches:
|
|
stack.enter_context(p)
|
|
try:
|
|
cli.cmd_doctor(_args())
|
|
return capsys.readouterr().out, 0
|
|
except SystemExit as exc:
|
|
return capsys.readouterr().out, exc.code
|
|
|
|
def test_all_pass(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
out, code = self._run_doctor(iso, capsys, setup_full=True)
|
|
assert "All checks passed" in out
|
|
assert "[FAIL]" not in out
|
|
assert code == 0
|
|
|
|
def test_missing_token(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
out, code = self._run_doctor(iso, capsys, token=None)
|
|
assert "[FAIL] Discord token" in out
|
|
assert code == 1
|
|
|
|
def test_missing_claude(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
out, code = self._run_doctor(iso, capsys, claude=None)
|
|
assert "[FAIL] Claude CLI found" in out
|
|
assert code == 1
|
|
|
|
def test_invalid_config(self, iso, capsys):
|
|
# config_file not created -> FileNotFoundError
|
|
out, code = self._run_doctor(iso, capsys)
|
|
assert "[FAIL] config.json valid" in out
|
|
assert code == 1
|
|
|
|
def test_low_disk_space(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
out, code = self._run_doctor(iso, capsys, disk_bavail=10, disk_frsize=4096)
|
|
assert "[FAIL]" in out
|
|
assert "Disk space" in out
|
|
assert code == 1
|
|
|
|
def test_config_with_token_fails(self, iso, capsys):
|
|
iso["config_file"].write_text('{"discord_token": "sk-abcdefghijklmnopqrstuvwxyz"}')
|
|
out, code = self._run_doctor(iso, capsys)
|
|
assert "[FAIL] config.json no plain text secrets" in out
|
|
assert code == 1
|
|
|
|
def test_gitignore_check(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
# No .gitignore → FAIL
|
|
out, code = self._run_doctor(iso, capsys)
|
|
assert "[FAIL] .gitignore" in out
|
|
assert code == 1
|
|
|
|
def test_ollama_check(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
out, code = self._run_doctor(iso, capsys, setup_full=True)
|
|
assert "Ollama reachable" in out
|
|
|
|
def test_claude_functional_check(self, iso, capsys):
|
|
iso["config_file"].write_text('{"bot":{}}')
|
|
out, code = self._run_doctor(iso, capsys, setup_full=True)
|
|
assert "Claude CLI functional" in out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cmd_restart
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRestart:
|
|
def test_restart_success(self, iso, capsys):
|
|
with patch("cli._systemctl", return_value=(0, "")), \
|
|
patch("cli._get_service_status", return_value={"ActiveState": "active", "MainPID": "999"}), \
|
|
patch("time.sleep"):
|
|
cli.cmd_restart(_args(bridge=False))
|
|
out = capsys.readouterr().out
|
|
assert "restarted" in out.lower()
|
|
assert "999" in out
|
|
|
|
def test_restart_with_bridge(self, iso, capsys):
|
|
calls = []
|
|
def mock_ctl(*args):
|
|
calls.append(args)
|
|
return (0, "")
|
|
with patch("cli._systemctl", side_effect=mock_ctl), \
|
|
patch("cli._get_service_status", return_value={"ActiveState": "active", "MainPID": "100"}), \
|
|
patch("time.sleep"):
|
|
cli.cmd_restart(_args(bridge=True))
|
|
# kill+start bridge, restart core
|
|
assert len(calls) == 3
|
|
|
|
def test_restart_fails(self, iso, capsys):
|
|
with patch("cli._systemctl", return_value=(0, "")), \
|
|
patch("cli._get_service_status", return_value={"ActiveState": "failed"}), \
|
|
patch("time.sleep"):
|
|
with pytest.raises(SystemExit):
|
|
cli.cmd_restart(_args(bridge=False))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cmd_logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLogs:
|
|
def test_missing_file(self, iso, capsys):
|
|
cli.cmd_logs(_args(lines=20))
|
|
assert "No log file" in capsys.readouterr().out
|
|
|
|
def test_empty_file(self, iso, capsys):
|
|
iso["log_file"].write_text("")
|
|
cli.cmd_logs(_args(lines=20))
|
|
assert capsys.readouterr().out == ""
|
|
|
|
def test_last_n_lines(self, iso, capsys):
|
|
iso["log_file"].write_text("\n".join(f"L{i}" for i in range(50)))
|
|
cli.cmd_logs(_args(lines=5))
|
|
lines = capsys.readouterr().out.strip().splitlines()
|
|
assert len(lines) == 5
|
|
assert lines[0] == "L45"
|
|
assert lines[-1] == "L49"
|
|
|
|
def test_fewer_than_n(self, iso, capsys):
|
|
iso["log_file"].write_text("a\nb\nc")
|
|
cli.cmd_logs(_args(lines=20))
|
|
lines = capsys.readouterr().out.strip().splitlines()
|
|
assert len(lines) == 3
|
|
assert lines == ["a", "b", "c"]
|
|
|
|
def test_exact_n(self, iso, capsys):
|
|
iso["log_file"].write_text("\n".join(f"x{i}" for i in range(5)))
|
|
cli.cmd_logs(_args(lines=5))
|
|
lines = capsys.readouterr().out.strip().splitlines()
|
|
assert len(lines) == 5
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# sessions list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSessionsList:
|
|
def test_empty(self, iso, capsys):
|
|
cli._sessions_list()
|
|
assert "No active sessions" in capsys.readouterr().out
|
|
|
|
def test_missing_file(self, iso, capsys):
|
|
# sessions_file doesn't exist yet
|
|
iso["sessions_file"].parent.joinpath("active.json") # no write
|
|
cli._sessions_list()
|
|
assert "No active sessions" in capsys.readouterr().out
|
|
|
|
def test_shows_sessions(self, iso, capsys):
|
|
data = {
|
|
"ch1": {"model": "sonnet", "message_count": 5,
|
|
"last_message_at": "2025-01-15T10:30:00.000+00:00"},
|
|
"ch2": {"model": "opus", "message_count": 12,
|
|
"last_message_at": "2025-01-15T11:00:00.000+00:00"},
|
|
}
|
|
iso["sessions_file"].write_text(json.dumps(data))
|
|
cli._sessions_list()
|
|
out = capsys.readouterr().out
|
|
assert "ch1" in out
|
|
assert "sonnet" in out
|
|
assert "ch2" in out
|
|
assert "opus" in out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# sessions clear
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSessionsClear:
|
|
def test_clear_specific(self, iso, capsys):
|
|
with patch("src.claude_session.clear_session", return_value=True) as m:
|
|
cli._sessions_clear("ch1")
|
|
m.assert_called_once_with("ch1")
|
|
assert "cleared" in capsys.readouterr().out.lower()
|
|
|
|
def test_clear_specific_not_found(self, iso, capsys):
|
|
with patch("src.claude_session.clear_session", return_value=False):
|
|
cli._sessions_clear("missing")
|
|
assert "No session found" in capsys.readouterr().out
|
|
|
|
def test_clear_all(self, iso, capsys):
|
|
with patch("src.claude_session.list_sessions",
|
|
return_value={"a": {}, "b": {}}), \
|
|
patch("src.claude_session.clear_session") as mc:
|
|
cli._sessions_clear(None)
|
|
assert mc.call_count == 2
|
|
assert "2 session(s)" in capsys.readouterr().out
|
|
|
|
def test_clear_all_empty(self, iso, capsys):
|
|
with patch("src.claude_session.list_sessions", return_value={}):
|
|
cli._sessions_clear(None)
|
|
assert "No active sessions" in capsys.readouterr().out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# channel add
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestChannelAdd:
|
|
def test_adds_channel(self, iso, capsys):
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.get.return_value = {}
|
|
with patch("src.config.Config", return_value=mock_cfg):
|
|
cli._channel_add("123456", "general")
|
|
mock_cfg.set.assert_called_once_with(
|
|
"channels", {"general": {"id": "123456"}}
|
|
)
|
|
mock_cfg.save.assert_called_once()
|
|
assert "added" in capsys.readouterr().out.lower()
|
|
|
|
def test_duplicate_alias(self, iso, capsys):
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.get.return_value = {"general": {"id": "111"}}
|
|
with patch("src.config.Config", return_value=mock_cfg):
|
|
with pytest.raises(SystemExit):
|
|
cli._channel_add("999", "general")
|
|
assert "already exists" in capsys.readouterr().out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# channel list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestChannelList:
|
|
def test_empty(self, iso, capsys):
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.get.return_value = {}
|
|
with patch("src.config.Config", return_value=mock_cfg):
|
|
cli._channel_list()
|
|
assert "No channels registered" in capsys.readouterr().out
|
|
|
|
def test_shows_channels(self, iso, capsys):
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.get.return_value = {
|
|
"general": {"id": "111"},
|
|
"dev": {"id": "222"},
|
|
}
|
|
with patch("src.config.Config", return_value=mock_cfg):
|
|
cli._channel_list()
|
|
out = capsys.readouterr().out
|
|
assert "general" in out
|
|
assert "111" in out
|
|
assert "dev" in out
|
|
assert "222" in out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cmd_send
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSend:
|
|
def test_send_message(self, iso, capsys):
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.get.return_value = {"test": {"id": "chan1"}}
|
|
with patch("src.config.Config", return_value=mock_cfg), \
|
|
patch("src.router.route_message",
|
|
return_value=("Hello!", False)) as mock_route:
|
|
cli.cmd_send(_args(alias="test", message=["hi", "there"]))
|
|
mock_route.assert_called_once_with("chan1", "cli-user", "hi there")
|
|
out = capsys.readouterr().out
|
|
assert "Hello!" in out
|
|
|
|
def test_unknown_alias(self, iso, capsys):
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.get.return_value = {}
|
|
with patch("src.config.Config", return_value=mock_cfg):
|
|
with pytest.raises(SystemExit):
|
|
cli.cmd_send(_args(alias="nope", message=["hi"]))
|
|
assert "unknown channel" in capsys.readouterr().out.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cron list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCronList:
|
|
def test_list_empty(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.list_jobs.return_value = []
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_list()
|
|
assert "No scheduled jobs" in capsys.readouterr().out
|
|
|
|
def test_list_shows_table(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = [
|
|
{
|
|
"name": "daily-run",
|
|
"cron": "30 6 * * *",
|
|
"channel": "work",
|
|
"model": "sonnet",
|
|
"enabled": True,
|
|
"last_status": "ok",
|
|
"next_run": None,
|
|
}
|
|
]
|
|
mock_sched.list_jobs.return_value = mock_sched._load_jobs.return_value
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_list()
|
|
out = capsys.readouterr().out
|
|
assert "daily-run" in out
|
|
assert "30 6 * * *" in out
|
|
assert "sonnet" in out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# cron add / remove / enable / disable
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCronAdd:
|
|
def test_add_success(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.add_job.return_value = {
|
|
"name": "new-job",
|
|
"cron": "0 * * * *",
|
|
"channel": "ch",
|
|
"model": "sonnet",
|
|
}
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_add("new-job", "0 * * * *", "ch", "hello prompt", "sonnet", [])
|
|
out = capsys.readouterr().out
|
|
assert "new-job" in out
|
|
assert "added" in out.lower()
|
|
|
|
def test_add_error(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.add_job.side_effect = ValueError("duplicate name")
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
with pytest.raises(SystemExit):
|
|
cli._cron_add("dup", "0 * * * *", "ch", "prompt", "sonnet", [])
|
|
assert "duplicate name" in capsys.readouterr().out
|
|
|
|
|
|
class TestCronRemove:
|
|
def test_remove_found(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.remove_job.return_value = True
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_remove("old-job")
|
|
assert "removed" in capsys.readouterr().out.lower()
|
|
|
|
def test_remove_not_found(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.remove_job.return_value = False
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_remove("ghost")
|
|
assert "not found" in capsys.readouterr().out.lower()
|
|
|
|
|
|
class TestCronEnable:
|
|
def test_enable_found(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.enable_job.return_value = True
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_enable("my-job")
|
|
assert "enabled" in capsys.readouterr().out.lower()
|
|
|
|
def test_enable_not_found(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.enable_job.return_value = False
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_enable("nope")
|
|
assert "not found" in capsys.readouterr().out.lower()
|
|
|
|
|
|
class TestCronDisable:
|
|
def test_disable_found(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.disable_job.return_value = True
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_disable("my-job")
|
|
assert "disabled" in capsys.readouterr().out.lower()
|
|
|
|
def test_disable_not_found(self, iso, capsys):
|
|
mock_sched = MagicMock()
|
|
mock_sched._load_jobs.return_value = []
|
|
mock_sched.disable_job.return_value = False
|
|
with patch("src.scheduler.Scheduler", return_value=mock_sched):
|
|
cli._cron_disable("nope")
|
|
assert "not found" in capsys.readouterr().out.lower()
|