"""Comprehensive tests for cli.py subcommands.""" import argparse import json import time from contextlib import ExitStack from unittest.mock import patch, MagicMock import pytest import cli # --------------------------------------------------------------------------- # Helpers / fixtures # --------------------------------------------------------------------------- def _args(**kwargs): """Create an argparse.Namespace with given keyword attrs.""" return argparse.Namespace(**kwargs) @pytest.fixture def iso(tmp_path, monkeypatch): """Redirect all cli module-level paths to tmp_path for isolation.""" pid_file = tmp_path / "echo-core.pid" logs_dir = tmp_path / "logs" logs_dir.mkdir() log_file = logs_dir / "echo-core.log" sess_dir = tmp_path / "sessions" sess_dir.mkdir() sessions_file = sess_dir / "active.json" config_file = tmp_path / "config.json" monkeypatch.setattr(cli, "PID_FILE", pid_file) monkeypatch.setattr(cli, "LOG_FILE", log_file) monkeypatch.setattr(cli, "SESSIONS_FILE", sessions_file) monkeypatch.setattr(cli, "CONFIG_FILE", config_file) monkeypatch.setattr(cli, "PROJECT_ROOT", tmp_path) return { "pid_file": pid_file, "log_file": log_file, "sessions_file": sessions_file, "config_file": config_file, } # --------------------------------------------------------------------------- # cmd_status # --------------------------------------------------------------------------- class TestStatus: def _mock_service(self, active="active", pid="1234", ts="Fri 2026-02-13 22:00:00 UTC"): return {"ActiveState": active, "MainPID": pid, "ActiveEnterTimestamp": ts} def test_offline(self, iso, capsys): with patch("cli._get_service_status", return_value=self._mock_service(active="inactive", pid="0")): cli.cmd_status(_args()) out = capsys.readouterr().out assert "OFFLINE" in out def test_online(self, iso, capsys): iso["sessions_file"].write_text(json.dumps({"ch1": {}})) with patch("cli._get_service_status", return_value=self._mock_service()): cli.cmd_status(_args()) out = capsys.readouterr().out assert "ONLINE" in out assert "1234" in out assert "1 active" in out def test_sessions_count_zero(self, iso, capsys): with patch("cli._get_service_status", return_value=self._mock_service(active="inactive")): cli.cmd_status(_args()) out = capsys.readouterr().out assert "0 active" in out def test_bridge_status(self, iso, capsys): with patch("cli._get_service_status", side_effect=[ self._mock_service(), self._mock_service(pid="5678"), ]): cli.cmd_status(_args()) out = capsys.readouterr().out assert "WA Bridge: ONLINE" in out # --------------------------------------------------------------------------- # cmd_doctor # --------------------------------------------------------------------------- class TestDoctor: """Tests for the doctor diagnostic command.""" def _run_doctor(self, iso, capsys, *, token="tok", claude="/usr/bin/claude", disk_bavail=1_000_000, disk_frsize=4096, setup_full=False): """Run cmd_doctor with mocked externals, return (stdout, exit_code).""" import os as _os stat = MagicMock(f_bavail=disk_bavail, f_frsize=disk_frsize) # Mock subprocess.run for claude --version mock_proc = MagicMock(returncode=0, stdout="1.0.0", stderr="") # Mock urllib for Ollama reachability mock_resp = MagicMock(status=200) patches = [ patch("cli.get_secret", return_value=token), patch("keyring.get_password", return_value=None), patch("shutil.which", return_value=claude), patch("os.statvfs", return_value=stat), patch("subprocess.run", return_value=mock_proc), patch("urllib.request.urlopen", return_value=mock_resp), patch("cli._get_service_status", return_value={"ActiveState": "active", "MainPID": "123"}), ] if setup_full: # Create .gitignore with required entries gi_path = cli.PROJECT_ROOT / ".gitignore" gi_path.write_text("sessions/\nlogs/\n.env\n*.sqlite\n") # Set config.json not world-readable iso["config_file"].chmod(0o600) # Create sessions dir not world-readable sessions_dir = cli.PROJECT_ROOT / "sessions" sessions_dir.mkdir(exist_ok=True) sessions_dir.chmod(0o700) with ExitStack() as stack: for p in patches: stack.enter_context(p) try: cli.cmd_doctor(_args()) return capsys.readouterr().out, 0 except SystemExit as exc: return capsys.readouterr().out, exc.code def test_all_pass(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') out, code = self._run_doctor(iso, capsys, setup_full=True) assert "All checks passed" in out assert "[FAIL]" not in out assert code == 0 def test_missing_token(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') out, code = self._run_doctor(iso, capsys, token=None) assert "[FAIL] Discord token" in out assert code == 1 def test_missing_claude(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') out, code = self._run_doctor(iso, capsys, claude=None) assert "[FAIL] Claude CLI found" in out assert code == 1 def test_invalid_config(self, iso, capsys): # config_file not created -> FileNotFoundError out, code = self._run_doctor(iso, capsys) assert "[FAIL] config.json valid" in out assert code == 1 def test_low_disk_space(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') out, code = self._run_doctor(iso, capsys, disk_bavail=10, disk_frsize=4096) assert "[FAIL]" in out assert "Disk space" in out assert code == 1 def test_config_with_token_fails(self, iso, capsys): iso["config_file"].write_text('{"discord_token": "sk-abcdefghijklmnopqrstuvwxyz"}') out, code = self._run_doctor(iso, capsys) assert "[FAIL] config.json no plain text secrets" in out assert code == 1 def test_gitignore_check(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') # No .gitignore → FAIL out, code = self._run_doctor(iso, capsys) assert "[FAIL] .gitignore" in out assert code == 1 def test_ollama_check(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') out, code = self._run_doctor(iso, capsys, setup_full=True) assert "Ollama reachable" in out def test_claude_functional_check(self, iso, capsys): iso["config_file"].write_text('{"bot":{}}') out, code = self._run_doctor(iso, capsys, setup_full=True) assert "Claude CLI functional" in out # --------------------------------------------------------------------------- # cmd_restart # --------------------------------------------------------------------------- class TestRestart: def test_restart_success(self, iso, capsys): with patch("cli._systemctl", return_value=(0, "")), \ patch("cli._get_service_status", return_value={"ActiveState": "active", "MainPID": "999"}), \ patch("time.sleep"): cli.cmd_restart(_args(bridge=False)) out = capsys.readouterr().out assert "restarted" in out.lower() assert "999" in out def test_restart_with_bridge(self, iso, capsys): calls = [] def mock_ctl(*args): calls.append(args) return (0, "") with patch("cli._systemctl", side_effect=mock_ctl), \ patch("cli._get_service_status", return_value={"ActiveState": "active", "MainPID": "100"}), \ patch("time.sleep"): cli.cmd_restart(_args(bridge=True)) # Should have called kill+start for both bridge and core assert len(calls) == 4 def test_restart_fails(self, iso, capsys): with patch("cli._systemctl", return_value=(0, "")), \ patch("cli._get_service_status", return_value={"ActiveState": "failed"}), \ patch("time.sleep"): with pytest.raises(SystemExit): cli.cmd_restart(_args(bridge=False)) # --------------------------------------------------------------------------- # cmd_logs # --------------------------------------------------------------------------- class TestLogs: def test_missing_file(self, iso, capsys): cli.cmd_logs(_args(lines=20)) assert "No log file" in capsys.readouterr().out def test_empty_file(self, iso, capsys): iso["log_file"].write_text("") cli.cmd_logs(_args(lines=20)) assert capsys.readouterr().out == "" def test_last_n_lines(self, iso, capsys): iso["log_file"].write_text("\n".join(f"L{i}" for i in range(50))) cli.cmd_logs(_args(lines=5)) lines = capsys.readouterr().out.strip().splitlines() assert len(lines) == 5 assert lines[0] == "L45" assert lines[-1] == "L49" def test_fewer_than_n(self, iso, capsys): iso["log_file"].write_text("a\nb\nc") cli.cmd_logs(_args(lines=20)) lines = capsys.readouterr().out.strip().splitlines() assert len(lines) == 3 assert lines == ["a", "b", "c"] def test_exact_n(self, iso, capsys): iso["log_file"].write_text("\n".join(f"x{i}" for i in range(5))) cli.cmd_logs(_args(lines=5)) lines = capsys.readouterr().out.strip().splitlines() assert len(lines) == 5 # --------------------------------------------------------------------------- # sessions list # --------------------------------------------------------------------------- class TestSessionsList: def test_empty(self, iso, capsys): cli._sessions_list() assert "No active sessions" in capsys.readouterr().out def test_missing_file(self, iso, capsys): # sessions_file doesn't exist yet iso["sessions_file"].parent.joinpath("active.json") # no write cli._sessions_list() assert "No active sessions" in capsys.readouterr().out def test_shows_sessions(self, iso, capsys): data = { "ch1": {"model": "sonnet", "message_count": 5, "last_message_at": "2025-01-15T10:30:00.000+00:00"}, "ch2": {"model": "opus", "message_count": 12, "last_message_at": "2025-01-15T11:00:00.000+00:00"}, } iso["sessions_file"].write_text(json.dumps(data)) cli._sessions_list() out = capsys.readouterr().out assert "ch1" in out assert "sonnet" in out assert "ch2" in out assert "opus" in out # --------------------------------------------------------------------------- # sessions clear # --------------------------------------------------------------------------- class TestSessionsClear: def test_clear_specific(self, iso, capsys): with patch("src.claude_session.clear_session", return_value=True) as m: cli._sessions_clear("ch1") m.assert_called_once_with("ch1") assert "cleared" in capsys.readouterr().out.lower() def test_clear_specific_not_found(self, iso, capsys): with patch("src.claude_session.clear_session", return_value=False): cli._sessions_clear("missing") assert "No session found" in capsys.readouterr().out def test_clear_all(self, iso, capsys): with patch("src.claude_session.list_sessions", return_value={"a": {}, "b": {}}), \ patch("src.claude_session.clear_session") as mc: cli._sessions_clear(None) assert mc.call_count == 2 assert "2 session(s)" in capsys.readouterr().out def test_clear_all_empty(self, iso, capsys): with patch("src.claude_session.list_sessions", return_value={}): cli._sessions_clear(None) assert "No active sessions" in capsys.readouterr().out # --------------------------------------------------------------------------- # channel add # --------------------------------------------------------------------------- class TestChannelAdd: def test_adds_channel(self, iso, capsys): mock_cfg = MagicMock() mock_cfg.get.return_value = {} with patch("src.config.Config", return_value=mock_cfg): cli._channel_add("123456", "general") mock_cfg.set.assert_called_once_with( "channels", {"general": {"id": "123456"}} ) mock_cfg.save.assert_called_once() assert "added" in capsys.readouterr().out.lower() def test_duplicate_alias(self, iso, capsys): mock_cfg = MagicMock() mock_cfg.get.return_value = {"general": {"id": "111"}} with patch("src.config.Config", return_value=mock_cfg): with pytest.raises(SystemExit): cli._channel_add("999", "general") assert "already exists" in capsys.readouterr().out # --------------------------------------------------------------------------- # channel list # --------------------------------------------------------------------------- class TestChannelList: def test_empty(self, iso, capsys): mock_cfg = MagicMock() mock_cfg.get.return_value = {} with patch("src.config.Config", return_value=mock_cfg): cli._channel_list() assert "No channels registered" in capsys.readouterr().out def test_shows_channels(self, iso, capsys): mock_cfg = MagicMock() mock_cfg.get.return_value = { "general": {"id": "111"}, "dev": {"id": "222"}, } with patch("src.config.Config", return_value=mock_cfg): cli._channel_list() out = capsys.readouterr().out assert "general" in out assert "111" in out assert "dev" in out assert "222" in out # --------------------------------------------------------------------------- # cmd_send # --------------------------------------------------------------------------- class TestSend: def test_send_message(self, iso, capsys): mock_cfg = MagicMock() mock_cfg.get.return_value = {"test": {"id": "chan1"}} with patch("src.config.Config", return_value=mock_cfg), \ patch("src.router.route_message", return_value=("Hello!", False)) as mock_route: cli.cmd_send(_args(alias="test", message=["hi", "there"])) mock_route.assert_called_once_with("chan1", "cli-user", "hi there") out = capsys.readouterr().out assert "Hello!" in out def test_unknown_alias(self, iso, capsys): mock_cfg = MagicMock() mock_cfg.get.return_value = {} with patch("src.config.Config", return_value=mock_cfg): with pytest.raises(SystemExit): cli.cmd_send(_args(alias="nope", message=["hi"])) assert "unknown channel" in capsys.readouterr().out.lower() # --------------------------------------------------------------------------- # cron list # --------------------------------------------------------------------------- class TestCronList: def test_list_empty(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.list_jobs.return_value = [] with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_list() assert "No scheduled jobs" in capsys.readouterr().out def test_list_shows_table(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [ { "name": "daily-run", "cron": "30 6 * * *", "channel": "work", "model": "sonnet", "enabled": True, "last_status": "ok", "next_run": None, } ] mock_sched.list_jobs.return_value = mock_sched._load_jobs.return_value with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_list() out = capsys.readouterr().out assert "daily-run" in out assert "30 6 * * *" in out assert "sonnet" in out # --------------------------------------------------------------------------- # cron add / remove / enable / disable # --------------------------------------------------------------------------- class TestCronAdd: def test_add_success(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.add_job.return_value = { "name": "new-job", "cron": "0 * * * *", "channel": "ch", "model": "sonnet", } with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_add("new-job", "0 * * * *", "ch", "hello prompt", "sonnet", []) out = capsys.readouterr().out assert "new-job" in out assert "added" in out.lower() def test_add_error(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.add_job.side_effect = ValueError("duplicate name") with patch("src.scheduler.Scheduler", return_value=mock_sched): with pytest.raises(SystemExit): cli._cron_add("dup", "0 * * * *", "ch", "prompt", "sonnet", []) assert "duplicate name" in capsys.readouterr().out class TestCronRemove: def test_remove_found(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.remove_job.return_value = True with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_remove("old-job") assert "removed" in capsys.readouterr().out.lower() def test_remove_not_found(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.remove_job.return_value = False with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_remove("ghost") assert "not found" in capsys.readouterr().out.lower() class TestCronEnable: def test_enable_found(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.enable_job.return_value = True with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_enable("my-job") assert "enabled" in capsys.readouterr().out.lower() def test_enable_not_found(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.enable_job.return_value = False with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_enable("nope") assert "not found" in capsys.readouterr().out.lower() class TestCronDisable: def test_disable_found(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.disable_job.return_value = True with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_disable("my-job") assert "disabled" in capsys.readouterr().out.lower() def test_disable_not_found(self, iso, capsys): mock_sched = MagicMock() mock_sched._load_jobs.return_value = [] mock_sched.disable_job.return_value = False with patch("src.scheduler.Scheduler", return_value=mock_sched): cli._cron_disable("nope") assert "not found" in capsys.readouterr().out.lower()