"""Tests for src/adapters/discord_bot.py — Discord bot adapter.""" import json import logging import signal import pytest from unittest.mock import AsyncMock, MagicMock, patch import discord from src.config import Config from src.adapters import discord_bot from src.adapters.discord_bot import ( create_bot, is_admin, is_owner, is_registered_channel, split_message, ) # --- Fixtures --- @pytest.fixture def tmp_config(tmp_path): """Create a Config backed by a temp file with default data.""" data = { "bot": { "name": "Echo", "default_model": "sonnet", "owner": None, "admins": [], }, "channels": {}, } config_file = tmp_path / "config.json" config_file.write_text(json.dumps(data, indent=2)) return Config(config_file) @pytest.fixture def owned_config(tmp_path): """Config with owner already set.""" data = { "bot": { "name": "Echo", "default_model": "sonnet", "owner": "111", "admins": ["222"], }, "channels": { "general": {"id": "900", "default_model": "sonnet"}, }, } config_file = tmp_path / "config.json" config_file.write_text(json.dumps(data, indent=2)) return Config(config_file) @pytest.fixture def bot(tmp_config): """Create a bot with fresh (no-owner) config, return the client.""" return create_bot(tmp_config) @pytest.fixture def owned_bot(owned_config): """Create a bot with owned config, return the client.""" return create_bot(owned_config) def _mock_interaction(user_id="123", channel_id="456"): """Create a mock discord.Interaction.""" interaction = AsyncMock(spec=discord.Interaction) interaction.user = MagicMock() interaction.user.id = int(user_id) interaction.channel_id = int(channel_id) interaction.response = AsyncMock() interaction.response.send_message = AsyncMock() return interaction def _find_command(tree, name): """Find a top-level command on the tree by name.""" for cmd in tree.get_commands(): if cmd.name == name: return cmd return None def _find_subcommand(tree, group_name, sub_name): """Find a subcommand inside a group on the tree.""" group = _find_command(tree, group_name) if group is None or not isinstance(group, discord.app_commands.Group): return None for cmd in group.commands: if cmd.name == sub_name: return cmd return None # --- Authorization helpers --- class TestIsOwner: def test_is_owner_true(self, owned_bot, owned_config): assert is_owner("111") is True def test_is_owner_false(self, owned_bot, owned_config): assert is_owner("999") is False def test_is_owner_none_owner(self, bot, tmp_config): assert is_owner("123") is False class TestIsAdmin: def test_is_admin_owner_is_admin(self, owned_bot): assert is_admin("111") is True def test_is_admin_listed(self, owned_bot): assert is_admin("222") is True def test_is_admin_not_listed(self, owned_bot): assert is_admin("999") is False class TestIsRegisteredChannel: def test_is_registered_channel_true(self, owned_bot): assert is_registered_channel("900") is True def test_is_registered_channel_false(self, owned_bot): assert is_registered_channel("000") is False def test_is_registered_channel_empty(self, bot): assert is_registered_channel("900") is False # --- create_bot --- class TestCreateBot: def test_create_bot_returns_client(self, bot): assert isinstance(bot, discord.Client) def test_create_bot_has_tree(self, bot): assert hasattr(bot, "tree") assert isinstance(bot.tree, discord.app_commands.CommandTree) def test_create_bot_intents_message_content(self, bot): assert bot.intents.message_content is True # --- Slash commands --- class TestPing: @pytest.mark.asyncio async def test_ping_responds(self, bot): cmd = _find_command(bot.tree, "ping") assert cmd is not None interaction = _mock_interaction() with patch.object(type(bot), "latency", new_callable=lambda: property(lambda self: 0.042)): await cmd.callback(interaction) interaction.response.send_message.assert_awaited_once() msg = interaction.response.send_message.call_args assert "Pong!" in msg.args[0] or "Pong!" in msg.kwargs.get("content", msg.args[0] if msg.args else "") assert msg.kwargs.get("ephemeral") is True class TestHelp: @pytest.mark.asyncio async def test_help_responds_with_commands(self, bot): cmd = _find_command(bot.tree, "help") assert cmd is not None interaction = _mock_interaction() await cmd.callback(interaction) interaction.response.send_message.assert_awaited_once() msg_text = interaction.response.send_message.call_args.args[0] assert "/ping" in msg_text assert "/help" in msg_text assert "/setup" in msg_text assert msg_text # non-empty class TestSetup: @pytest.mark.asyncio async def test_setup_sets_owner(self, bot, tmp_config): cmd = _find_command(bot.tree, "setup") interaction = _mock_interaction(user_id="123") await cmd.callback(interaction) interaction.response.send_message.assert_awaited_once() msg = interaction.response.send_message.call_args assert "owner of Echo" in msg.args[0] assert tmp_config.get("bot.owner") == "123" @pytest.mark.asyncio async def test_setup_already_set(self, owned_bot, owned_config): cmd = _find_command(owned_bot.tree, "setup") interaction = _mock_interaction(user_id="999") await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "already set" in msg.args[0].lower() class TestChannelAdd: @pytest.mark.asyncio async def test_channel_add_as_owner(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "channel", "add") assert cmd is not None interaction = _mock_interaction(user_id="111", channel_id="456") await cmd.callback(interaction, alias="work") interaction.response.send_message.assert_awaited_once() msg = interaction.response.send_message.call_args assert "work" in msg.args[0] assert owned_config.get("channels.work") is not None assert owned_config.get("channels.work")["id"] == "456" @pytest.mark.asyncio async def test_channel_add_not_owner(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "channel", "add") interaction = _mock_interaction(user_id="999", channel_id="456") await cmd.callback(interaction, alias="work") msg = interaction.response.send_message.call_args assert "owner only" in msg.args[0].lower() @pytest.mark.asyncio async def test_channel_add_saves_to_config(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "channel", "add") interaction = _mock_interaction(user_id="111", channel_id="789") await cmd.callback(interaction, alias="dev") # Reload config from disk to verify save() was called owned_config.reload() assert owned_config.get("channels.dev")["id"] == "789" assert owned_config.get("channels.dev")["default_model"] == "sonnet" class TestChannels: @pytest.mark.asyncio async def test_channels_lists(self, owned_bot, owned_config): cmd = _find_command(owned_bot.tree, "channels") interaction = _mock_interaction() await cmd.callback(interaction) msg = interaction.response.send_message.call_args text = msg.args[0] assert "general" in text assert "900" in text @pytest.mark.asyncio async def test_channels_empty(self, bot, tmp_config): cmd = _find_command(bot.tree, "channels") interaction = _mock_interaction() await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "no channels" in msg.args[0].lower() class TestAdminAdd: @pytest.mark.asyncio async def test_admin_add_as_owner(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "admin", "add") assert cmd is not None interaction = _mock_interaction(user_id="111") await cmd.callback(interaction, user_id="333") msg = interaction.response.send_message.call_args assert "333" in msg.args[0] assert "333" in owned_config.get("bot.admins") @pytest.mark.asyncio async def test_admin_add_not_owner(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "admin", "add") interaction = _mock_interaction(user_id="999") await cmd.callback(interaction, user_id="333") msg = interaction.response.send_message.call_args assert "owner only" in msg.args[0].lower() @pytest.mark.asyncio async def test_admin_add_idempotent(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "admin", "add") interaction = _mock_interaction(user_id="111") # Add "222" which is already in admins list await cmd.callback(interaction, user_id="222") admins = owned_config.get("bot.admins") assert admins.count("222") == 1 # no duplicates @pytest.mark.asyncio async def test_admin_add_saves_to_config(self, owned_bot, owned_config): cmd = _find_subcommand(owned_bot.tree, "admin", "add") interaction = _mock_interaction(user_id="111") await cmd.callback(interaction, user_id="444") owned_config.reload() assert "444" in owned_config.get("bot.admins") # --- on_message --- class TestOnMessage: def _get_on_message(self, client): """Extract the on_message handler from the client's event listeners.""" # discord.py Client stores overridden events; we can call it directly # The on_message is registered via @client.event decorator return client.on_message @pytest.mark.asyncio async def test_ignores_own_messages(self, owned_bot): on_message = self._get_on_message(owned_bot) message = MagicMock(spec=discord.Message) message.author = owned_bot.user # bot's own user message.channel = MagicMock() # Should return without logging (no error) await on_message(message) @pytest.mark.asyncio async def test_ignores_unregistered_channel(self, owned_bot, caplog): on_message = self._get_on_message(owned_bot) message = MagicMock(spec=discord.Message) message.author = MagicMock() message.author.id = 999 message.author != owned_bot.user # not the bot # Make sure author comparison returns False message.author.__eq__ = lambda self, other: False message.channel = MagicMock(spec=discord.TextChannel) message.channel.id = 12345 # not registered message.content = "hello" with caplog.at_level(logging.INFO, logger="echo-core.discord"): await on_message(message) assert "registered channel" not in caplog.text.lower() @pytest.mark.asyncio async def test_logs_registered_channel(self, owned_bot, caplog): on_message = self._get_on_message(owned_bot) message = MagicMock(spec=discord.Message) message.author = MagicMock() message.author.id = 555 message.author.__eq__ = lambda self, other: False message.channel = MagicMock(spec=discord.TextChannel) message.channel.id = 900 # registered channel message.content = "hello world" with caplog.at_level(logging.INFO, logger="echo-core.discord"): await on_message(message) assert "registered channel" in caplog.text.lower() @pytest.mark.asyncio async def test_ignores_non_admin_dm(self, owned_bot, caplog): on_message = self._get_on_message(owned_bot) message = MagicMock(spec=discord.Message) message.author = MagicMock() message.author.id = 999 # not admin message.author.__eq__ = lambda self, other: False message.channel = MagicMock(spec=discord.DMChannel) message.content = "hello" with caplog.at_level(logging.INFO, logger="echo-core.discord"): await on_message(message) assert "dm from admin" not in caplog.text.lower() @pytest.mark.asyncio async def test_logs_admin_dm(self, owned_bot, caplog): on_message = self._get_on_message(owned_bot) message = MagicMock(spec=discord.Message) message.author = MagicMock() message.author.id = 222 # in admins list message.author.__eq__ = lambda self, other: False message.channel = MagicMock(spec=discord.DMChannel) message.content = "admin message" with caplog.at_level(logging.INFO, logger="echo-core.discord"): await on_message(message) assert "dm from admin" in caplog.text.lower() @pytest.mark.asyncio @patch("src.adapters.discord_bot.route_message") async def test_chat_flow(self, mock_route, owned_bot): """on_message chat flow: reaction, typing, route, send, cleanup.""" mock_route.return_value = "Hello from Claude!" on_message = self._get_on_message(owned_bot) message = AsyncMock(spec=discord.Message) message.author = MagicMock() message.author.id = 555 message.author.__eq__ = lambda self, other: False message.channel = AsyncMock(spec=discord.TextChannel) message.channel.id = 900 # registered channel message.content = "test message" await on_message(message) # Verify eyes reaction added message.add_reaction.assert_awaited_once_with("\U0001f440") # Verify typing indicator was triggered message.channel.typing.assert_called_once() # Verify response sent message.channel.send.assert_awaited_once_with("Hello from Claude!") # Verify eyes reaction removed message.remove_reaction.assert_awaited_once() # --- split_message --- class TestSplitMessage: def test_short_text_no_split(self): result = split_message("hello") assert result == ["hello"] def test_long_text_split_at_newline(self): text = "a" * 10 + "\n" + "b" * 10 result = split_message(text, limit=15) assert result == ["a" * 10, "b" * 10] def test_very_long_without_newlines_hard_split(self): text = "a" * 30 result = split_message(text, limit=10) assert result == ["a" * 10, "a" * 10, "a" * 10] # --- /clear slash command --- class TestClearSlashCommand: @pytest.mark.asyncio @patch("src.adapters.discord_bot.clear_session") async def test_clear_with_session(self, mock_clear, owned_bot): mock_clear.return_value = True cmd = _find_command(owned_bot.tree, "clear") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "session cleared" in msg.args[0].lower() @pytest.mark.asyncio @patch("src.adapters.discord_bot.clear_session") async def test_clear_no_session(self, mock_clear, owned_bot): mock_clear.return_value = False cmd = _find_command(owned_bot.tree, "clear") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "no active session" in msg.args[0].lower() # --- /status slash command --- class TestStatusSlashCommand: @pytest.mark.asyncio @patch("src.adapters.discord_bot.get_active_session") async def test_status_with_session(self, mock_get, owned_bot): mock_get.return_value = { "session_id": "abcdef1234567890", "model": "sonnet", "message_count": 3, } cmd = _find_command(owned_bot.tree, "status") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction) msg = interaction.response.send_message.call_args text = msg.args[0] if msg.args else msg.kwargs.get("content", "") assert "sonnet" in text assert "3" in text @pytest.mark.asyncio @patch("src.adapters.discord_bot.get_active_session") async def test_status_no_session(self, mock_get, owned_bot): mock_get.return_value = None cmd = _find_command(owned_bot.tree, "status") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "no active session" in msg.args[0].lower() # --- /clear mentions model reset --- class TestClearMentionsModelReset: @pytest.mark.asyncio @patch("src.adapters.discord_bot.clear_session") async def test_clear_mentions_model_reset(self, mock_clear, owned_bot): mock_clear.return_value = True cmd = _find_command(owned_bot.tree, "clear") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction) msg = interaction.response.send_message.call_args text = msg.args[0] assert "model reset" in text.lower() assert "sonnet" in text.lower() # --- /model slash command --- class TestModelSlashCommand: @pytest.mark.asyncio @patch("src.adapters.discord_bot.get_active_session") async def test_model_no_args_shows_current_with_session(self, mock_get, owned_bot): mock_get.return_value = {"model": "opus"} cmd = _find_command(owned_bot.tree, "model") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction, choice=None) msg = interaction.response.send_message.call_args text = msg.args[0] assert "opus" in text.lower() assert "haiku" in text assert "sonnet" in text assert msg.kwargs.get("ephemeral") is True @pytest.mark.asyncio @patch("src.adapters.discord_bot.get_active_session") async def test_model_no_args_shows_default_without_session(self, mock_get, owned_bot): mock_get.return_value = None cmd = _find_command(owned_bot.tree, "model") interaction = _mock_interaction(channel_id="900") await cmd.callback(interaction, choice=None) msg = interaction.response.send_message.call_args text = msg.args[0] assert "sonnet" in text # default from config @pytest.mark.asyncio @patch("src.adapters.discord_bot.set_session_model") @patch("src.adapters.discord_bot.get_active_session") async def test_model_with_choice_changes_existing_session(self, mock_get, mock_set, owned_bot): mock_get.return_value = {"model": "sonnet", "session_id": "abc"} cmd = _find_command(owned_bot.tree, "model") interaction = _mock_interaction(channel_id="900") choice = MagicMock() choice.value = "opus" await cmd.callback(interaction, choice=choice) mock_set.assert_called_once_with("900", "opus") msg = interaction.response.send_message.call_args assert "opus" in msg.args[0] @pytest.mark.asyncio @patch("src.claude_session._save_sessions") @patch("src.claude_session._load_sessions") @patch("src.adapters.discord_bot.get_active_session") async def test_model_with_choice_presets_when_no_session(self, mock_get, mock_load, mock_save, owned_bot): mock_get.return_value = None mock_load.return_value = {} cmd = _find_command(owned_bot.tree, "model") interaction = _mock_interaction(channel_id="900") choice = MagicMock() choice.value = "haiku" await cmd.callback(interaction, choice=choice) mock_save.assert_called_once() saved_data = mock_save.call_args[0][0] assert saved_data["900"]["model"] == "haiku" assert saved_data["900"]["session_id"] == "" msg = interaction.response.send_message.call_args assert "haiku" in msg.args[0] # --- /restart slash command --- class TestRestartSlashCommand: @pytest.mark.asyncio async def test_restart_owner_succeeds(self, owned_bot, tmp_path): pid_file = tmp_path / "echo-core.pid" pid_file.write_text("12345") with patch.object(discord_bot, "PROJECT_ROOT", tmp_path), \ patch("os.kill") as mock_kill: cmd = _find_command(owned_bot.tree, "restart") interaction = _mock_interaction(user_id="111") await cmd.callback(interaction) mock_kill.assert_called_once_with(12345, signal.SIGTERM) msg = interaction.response.send_message.call_args assert "restarting" in msg.args[0].lower() @pytest.mark.asyncio async def test_restart_non_owner_rejected(self, owned_bot): cmd = _find_command(owned_bot.tree, "restart") interaction = _mock_interaction(user_id="999") await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "owner only" in msg.args[0].lower() @pytest.mark.asyncio async def test_restart_no_pid_file(self, owned_bot, tmp_path): with patch.object(discord_bot, "PROJECT_ROOT", tmp_path): cmd = _find_command(owned_bot.tree, "restart") interaction = _mock_interaction(user_id="111") await cmd.callback(interaction) msg = interaction.response.send_message.call_args assert "no pid file" in msg.args[0].lower() # --- /logs slash command --- class TestLogsSlashCommand: @pytest.mark.asyncio async def test_logs_returns_code_block(self, owned_bot, tmp_path): log_dir = tmp_path / "logs" log_dir.mkdir() log_file = log_dir / "echo-core.log" log_file.write_text("line1\nline2\nline3\n") with patch.object(discord_bot, "PROJECT_ROOT", tmp_path): cmd = _find_command(owned_bot.tree, "logs") interaction = _mock_interaction() await cmd.callback(interaction, n=10) msg = interaction.response.send_message.call_args text = msg.args[0] assert "```" in text assert "line1" in text assert "line3" in text @pytest.mark.asyncio async def test_logs_no_file(self, owned_bot, tmp_path): with patch.object(discord_bot, "PROJECT_ROOT", tmp_path): cmd = _find_command(owned_bot.tree, "logs") interaction = _mock_interaction() await cmd.callback(interaction, n=10) msg = interaction.response.send_message.call_args assert "no log file" in msg.args[0].lower()