stage-4: discord bot minimal with channel/admin management
Discord.py bot with slash commands (/ping, /help, /setup, /channel, /admin), PID file, graceful shutdown. 30 new tests (119 total). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
198
src/adapters/discord_bot.py
Normal file
198
src/adapters/discord_bot.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""Discord bot adapter — slash commands and event handlers."""
|
||||
|
||||
import logging
|
||||
|
||||
import discord
|
||||
from discord import app_commands
|
||||
|
||||
from src.config import Config
|
||||
|
||||
logger = logging.getLogger("echo-core.discord")
|
||||
|
||||
# Module-level config reference, set by create_bot()
|
||||
_config: Config | None = None
|
||||
|
||||
|
||||
def _get_config() -> Config:
|
||||
"""Return the module-level config, raising if not initialized."""
|
||||
if _config is None:
|
||||
raise RuntimeError("Bot not initialized — call create_bot() first")
|
||||
return _config
|
||||
|
||||
|
||||
# --- Authorization helpers ---
|
||||
|
||||
|
||||
def is_owner(user_id: str) -> bool:
|
||||
"""Check if user_id matches config bot.owner."""
|
||||
return _get_config().get("bot.owner") == user_id
|
||||
|
||||
|
||||
def is_admin(user_id: str) -> bool:
|
||||
"""Check if user_id is owner or in admins list."""
|
||||
if is_owner(user_id):
|
||||
return True
|
||||
admins = _get_config().get("bot.admins", [])
|
||||
return user_id in admins
|
||||
|
||||
|
||||
def is_registered_channel(channel_id: str) -> bool:
|
||||
"""Check if channel_id is in any registered channel entry."""
|
||||
channels = _get_config().get("channels", {})
|
||||
return any(ch.get("id") == channel_id for ch in channels.values())
|
||||
|
||||
|
||||
# --- Factory ---
|
||||
|
||||
|
||||
def create_bot(config: Config) -> discord.Client:
|
||||
"""Create and configure the Discord bot with all slash commands."""
|
||||
global _config
|
||||
_config = config
|
||||
|
||||
intents = discord.Intents.default()
|
||||
intents.message_content = True
|
||||
|
||||
client = discord.Client(intents=intents)
|
||||
tree = app_commands.CommandTree(client)
|
||||
client.tree = tree # type: ignore[attr-defined]
|
||||
|
||||
# --- Slash commands ---
|
||||
|
||||
@tree.command(name="ping", description="Check bot latency")
|
||||
async def ping(interaction: discord.Interaction) -> None:
|
||||
latency_ms = round(client.latency * 1000)
|
||||
await interaction.response.send_message(
|
||||
f"Pong! Latency: {latency_ms}ms", ephemeral=True
|
||||
)
|
||||
|
||||
@tree.command(name="help", description="List available commands")
|
||||
async def help_cmd(interaction: discord.Interaction) -> None:
|
||||
lines = [
|
||||
"**Echo Commands**",
|
||||
"`/ping` — Check bot latency",
|
||||
"`/help` — Show this help message",
|
||||
"`/setup` — Claim ownership of the bot (first run only)",
|
||||
"`/channel add <alias>` — Register current channel (owner only)",
|
||||
"`/channels` — List registered channels",
|
||||
"`/admin add <user_id>` — Add an admin (owner only)",
|
||||
]
|
||||
await interaction.response.send_message(
|
||||
"\n".join(lines), ephemeral=True
|
||||
)
|
||||
|
||||
@tree.command(name="setup", description="Claim ownership of the bot")
|
||||
async def setup(interaction: discord.Interaction) -> None:
|
||||
if config.get("bot.owner") is not None:
|
||||
await interaction.response.send_message(
|
||||
"Owner already set.", ephemeral=True
|
||||
)
|
||||
return
|
||||
config.set("bot.owner", str(interaction.user.id))
|
||||
config.save()
|
||||
await interaction.response.send_message(
|
||||
"You are now the owner of Echo.", ephemeral=True
|
||||
)
|
||||
|
||||
channel_group = app_commands.Group(
|
||||
name="channel", description="Channel management"
|
||||
)
|
||||
|
||||
@channel_group.command(name="add", description="Register current channel")
|
||||
@app_commands.describe(alias="Short name for this channel")
|
||||
async def channel_add(
|
||||
interaction: discord.Interaction, alias: str
|
||||
) -> None:
|
||||
if not is_owner(str(interaction.user.id)):
|
||||
await interaction.response.send_message(
|
||||
"Owner only.", ephemeral=True
|
||||
)
|
||||
return
|
||||
config.set(
|
||||
f"channels.{alias}",
|
||||
{"id": str(interaction.channel_id), "default_model": "sonnet"},
|
||||
)
|
||||
config.save()
|
||||
await interaction.response.send_message(
|
||||
f"Channel registered as '{alias}'.", ephemeral=True
|
||||
)
|
||||
|
||||
tree.add_command(channel_group)
|
||||
|
||||
admin_group = app_commands.Group(
|
||||
name="admin", description="Admin management"
|
||||
)
|
||||
|
||||
@admin_group.command(name="add", description="Add an admin user")
|
||||
@app_commands.describe(user_id="Discord user ID to add as admin")
|
||||
async def admin_add(
|
||||
interaction: discord.Interaction, user_id: str
|
||||
) -> None:
|
||||
if not is_owner(str(interaction.user.id)):
|
||||
await interaction.response.send_message(
|
||||
"Owner only.", ephemeral=True
|
||||
)
|
||||
return
|
||||
admins = config.get("bot.admins", [])
|
||||
if user_id not in admins:
|
||||
admins.append(user_id)
|
||||
config.set("bot.admins", admins)
|
||||
config.save()
|
||||
await interaction.response.send_message(
|
||||
f"User {user_id} added as admin.", ephemeral=True
|
||||
)
|
||||
|
||||
tree.add_command(admin_group)
|
||||
|
||||
@tree.command(name="channels", description="List registered channels")
|
||||
async def channels(interaction: discord.Interaction) -> None:
|
||||
ch_map = config.get("channels", {})
|
||||
if not ch_map:
|
||||
await interaction.response.send_message(
|
||||
"No channels registered yet.", ephemeral=True
|
||||
)
|
||||
return
|
||||
lines = []
|
||||
for alias, info in ch_map.items():
|
||||
cid = info.get("id", "?")
|
||||
model = info.get("default_model", "?")
|
||||
lines.append(f"\u2022 {alias} \u2192 <#{cid}> (model: {model})")
|
||||
await interaction.response.send_message(
|
||||
"\n".join(lines), ephemeral=True
|
||||
)
|
||||
|
||||
# --- Events ---
|
||||
|
||||
@client.event
|
||||
async def on_ready() -> None:
|
||||
await tree.sync()
|
||||
logger.info("Echo Core online as %s", client.user)
|
||||
|
||||
@client.event
|
||||
async def on_message(message: discord.Message) -> None:
|
||||
# Ignore bot's own messages
|
||||
if message.author == client.user:
|
||||
return
|
||||
|
||||
# DM handling: ignore if sender not admin
|
||||
if isinstance(message.channel, discord.DMChannel):
|
||||
if not is_admin(str(message.author.id)):
|
||||
return
|
||||
logger.info(
|
||||
"DM from admin %s: %s", message.author, message.content[:100]
|
||||
)
|
||||
return # Stage 5 will add chat integration
|
||||
|
||||
# Guild messages: ignore if channel not registered
|
||||
if not is_registered_channel(str(message.channel.id)):
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Message in registered channel %s from %s: %s",
|
||||
message.channel,
|
||||
message.author,
|
||||
message.content[:100],
|
||||
)
|
||||
# Stage 5 will add chat integration here
|
||||
|
||||
return client
|
||||
69
src/main.py
Normal file
69
src/main.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Echo Core — main entry point."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from src.config import load_config
|
||||
from src.secrets import get_secret
|
||||
from src.adapters.discord_bot import create_bot
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
||||
PID_FILE = PROJECT_ROOT / "echo-core.pid"
|
||||
LOG_DIR = PROJECT_ROOT / "logs"
|
||||
|
||||
|
||||
def setup_logging():
|
||||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler(LOG_DIR / "echo-core.log"),
|
||||
logging.StreamHandler(sys.stderr),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
setup_logging()
|
||||
logger = logging.getLogger("echo-core")
|
||||
|
||||
token = get_secret("discord_token")
|
||||
if not token:
|
||||
logger.error(
|
||||
"discord_token not found in keyring. "
|
||||
"Run: python cli.py secrets set discord_token"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
config = load_config()
|
||||
client = create_bot(config)
|
||||
|
||||
# PID file
|
||||
PID_FILE.write_text(str(os.getpid()))
|
||||
|
||||
# Signal handlers for graceful shutdown
|
||||
loop = asyncio.new_event_loop()
|
||||
|
||||
def handle_signal(sig, frame):
|
||||
logger.info("Received signal %s, shutting down...", sig)
|
||||
loop.create_task(client.close())
|
||||
|
||||
signal.signal(signal.SIGTERM, handle_signal)
|
||||
signal.signal(signal.SIGINT, handle_signal)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(client.start(token))
|
||||
except KeyboardInterrupt:
|
||||
loop.run_until_complete(client.close())
|
||||
finally:
|
||||
PID_FILE.unlink(missing_ok=True)
|
||||
logger.info("Echo Core shut down.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
392
tests/test_discord.py
Normal file
392
tests/test_discord.py
Normal file
@@ -0,0 +1,392 @@
|
||||
"""Tests for src/adapters/discord_bot.py — Discord bot adapter."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import discord
|
||||
|
||||
from src.config import Config
|
||||
from src.adapters import discord_bot
|
||||
from src.adapters.discord_bot import (
|
||||
create_bot,
|
||||
is_admin,
|
||||
is_owner,
|
||||
is_registered_channel,
|
||||
)
|
||||
|
||||
|
||||
# --- Fixtures ---
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_config(tmp_path):
|
||||
"""Create a Config backed by a temp file with default data."""
|
||||
data = {
|
||||
"bot": {
|
||||
"name": "Echo",
|
||||
"default_model": "sonnet",
|
||||
"owner": None,
|
||||
"admins": [],
|
||||
},
|
||||
"channels": {},
|
||||
}
|
||||
config_file = tmp_path / "config.json"
|
||||
config_file.write_text(json.dumps(data, indent=2))
|
||||
return Config(config_file)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def owned_config(tmp_path):
|
||||
"""Config with owner already set."""
|
||||
data = {
|
||||
"bot": {
|
||||
"name": "Echo",
|
||||
"default_model": "sonnet",
|
||||
"owner": "111",
|
||||
"admins": ["222"],
|
||||
},
|
||||
"channels": {
|
||||
"general": {"id": "900", "default_model": "sonnet"},
|
||||
},
|
||||
}
|
||||
config_file = tmp_path / "config.json"
|
||||
config_file.write_text(json.dumps(data, indent=2))
|
||||
return Config(config_file)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bot(tmp_config):
|
||||
"""Create a bot with fresh (no-owner) config, return the client."""
|
||||
return create_bot(tmp_config)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def owned_bot(owned_config):
|
||||
"""Create a bot with owned config, return the client."""
|
||||
return create_bot(owned_config)
|
||||
|
||||
|
||||
def _mock_interaction(user_id="123", channel_id="456"):
|
||||
"""Create a mock discord.Interaction."""
|
||||
interaction = AsyncMock(spec=discord.Interaction)
|
||||
interaction.user = MagicMock()
|
||||
interaction.user.id = int(user_id)
|
||||
interaction.channel_id = int(channel_id)
|
||||
interaction.response = AsyncMock()
|
||||
interaction.response.send_message = AsyncMock()
|
||||
return interaction
|
||||
|
||||
|
||||
def _find_command(tree, name):
|
||||
"""Find a top-level command on the tree by name."""
|
||||
for cmd in tree.get_commands():
|
||||
if cmd.name == name:
|
||||
return cmd
|
||||
return None
|
||||
|
||||
|
||||
def _find_subcommand(tree, group_name, sub_name):
|
||||
"""Find a subcommand inside a group on the tree."""
|
||||
group = _find_command(tree, group_name)
|
||||
if group is None or not isinstance(group, discord.app_commands.Group):
|
||||
return None
|
||||
for cmd in group.commands:
|
||||
if cmd.name == sub_name:
|
||||
return cmd
|
||||
return None
|
||||
|
||||
|
||||
# --- Authorization helpers ---
|
||||
|
||||
|
||||
class TestIsOwner:
|
||||
def test_is_owner_true(self, owned_bot, owned_config):
|
||||
assert is_owner("111") is True
|
||||
|
||||
def test_is_owner_false(self, owned_bot, owned_config):
|
||||
assert is_owner("999") is False
|
||||
|
||||
def test_is_owner_none_owner(self, bot, tmp_config):
|
||||
assert is_owner("123") is False
|
||||
|
||||
|
||||
class TestIsAdmin:
|
||||
def test_is_admin_owner_is_admin(self, owned_bot):
|
||||
assert is_admin("111") is True
|
||||
|
||||
def test_is_admin_listed(self, owned_bot):
|
||||
assert is_admin("222") is True
|
||||
|
||||
def test_is_admin_not_listed(self, owned_bot):
|
||||
assert is_admin("999") is False
|
||||
|
||||
|
||||
class TestIsRegisteredChannel:
|
||||
def test_is_registered_channel_true(self, owned_bot):
|
||||
assert is_registered_channel("900") is True
|
||||
|
||||
def test_is_registered_channel_false(self, owned_bot):
|
||||
assert is_registered_channel("000") is False
|
||||
|
||||
def test_is_registered_channel_empty(self, bot):
|
||||
assert is_registered_channel("900") is False
|
||||
|
||||
|
||||
# --- create_bot ---
|
||||
|
||||
|
||||
class TestCreateBot:
|
||||
def test_create_bot_returns_client(self, bot):
|
||||
assert isinstance(bot, discord.Client)
|
||||
|
||||
def test_create_bot_has_tree(self, bot):
|
||||
assert hasattr(bot, "tree")
|
||||
assert isinstance(bot.tree, discord.app_commands.CommandTree)
|
||||
|
||||
def test_create_bot_intents_message_content(self, bot):
|
||||
assert bot.intents.message_content is True
|
||||
|
||||
|
||||
# --- Slash commands ---
|
||||
|
||||
|
||||
class TestPing:
|
||||
@pytest.mark.asyncio
|
||||
async def test_ping_responds(self, bot):
|
||||
cmd = _find_command(bot.tree, "ping")
|
||||
assert cmd is not None
|
||||
|
||||
interaction = _mock_interaction()
|
||||
with patch.object(type(bot), "latency", new_callable=lambda: property(lambda self: 0.042)):
|
||||
await cmd.callback(interaction)
|
||||
|
||||
interaction.response.send_message.assert_awaited_once()
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "Pong!" in msg.args[0] or "Pong!" in msg.kwargs.get("content", msg.args[0] if msg.args else "")
|
||||
assert msg.kwargs.get("ephemeral") is True
|
||||
|
||||
|
||||
class TestHelp:
|
||||
@pytest.mark.asyncio
|
||||
async def test_help_responds_with_commands(self, bot):
|
||||
cmd = _find_command(bot.tree, "help")
|
||||
assert cmd is not None
|
||||
|
||||
interaction = _mock_interaction()
|
||||
await cmd.callback(interaction)
|
||||
|
||||
interaction.response.send_message.assert_awaited_once()
|
||||
msg_text = interaction.response.send_message.call_args.args[0]
|
||||
assert "/ping" in msg_text
|
||||
assert "/help" in msg_text
|
||||
assert "/setup" in msg_text
|
||||
assert msg_text # non-empty
|
||||
|
||||
|
||||
class TestSetup:
|
||||
@pytest.mark.asyncio
|
||||
async def test_setup_sets_owner(self, bot, tmp_config):
|
||||
cmd = _find_command(bot.tree, "setup")
|
||||
interaction = _mock_interaction(user_id="123")
|
||||
await cmd.callback(interaction)
|
||||
|
||||
interaction.response.send_message.assert_awaited_once()
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "owner of Echo" in msg.args[0]
|
||||
assert tmp_config.get("bot.owner") == "123"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_setup_already_set(self, owned_bot, owned_config):
|
||||
cmd = _find_command(owned_bot.tree, "setup")
|
||||
interaction = _mock_interaction(user_id="999")
|
||||
await cmd.callback(interaction)
|
||||
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "already set" in msg.args[0].lower()
|
||||
|
||||
|
||||
class TestChannelAdd:
|
||||
@pytest.mark.asyncio
|
||||
async def test_channel_add_as_owner(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "channel", "add")
|
||||
assert cmd is not None
|
||||
|
||||
interaction = _mock_interaction(user_id="111", channel_id="456")
|
||||
await cmd.callback(interaction, alias="work")
|
||||
|
||||
interaction.response.send_message.assert_awaited_once()
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "work" in msg.args[0]
|
||||
assert owned_config.get("channels.work") is not None
|
||||
assert owned_config.get("channels.work")["id"] == "456"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_channel_add_not_owner(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "channel", "add")
|
||||
interaction = _mock_interaction(user_id="999", channel_id="456")
|
||||
await cmd.callback(interaction, alias="work")
|
||||
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "owner only" in msg.args[0].lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_channel_add_saves_to_config(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "channel", "add")
|
||||
interaction = _mock_interaction(user_id="111", channel_id="789")
|
||||
await cmd.callback(interaction, alias="dev")
|
||||
|
||||
# Reload config from disk to verify save() was called
|
||||
owned_config.reload()
|
||||
assert owned_config.get("channels.dev")["id"] == "789"
|
||||
assert owned_config.get("channels.dev")["default_model"] == "sonnet"
|
||||
|
||||
|
||||
class TestChannels:
|
||||
@pytest.mark.asyncio
|
||||
async def test_channels_lists(self, owned_bot, owned_config):
|
||||
cmd = _find_command(owned_bot.tree, "channels")
|
||||
interaction = _mock_interaction()
|
||||
await cmd.callback(interaction)
|
||||
|
||||
msg = interaction.response.send_message.call_args
|
||||
text = msg.args[0]
|
||||
assert "general" in text
|
||||
assert "900" in text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_channels_empty(self, bot, tmp_config):
|
||||
cmd = _find_command(bot.tree, "channels")
|
||||
interaction = _mock_interaction()
|
||||
await cmd.callback(interaction)
|
||||
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "no channels" in msg.args[0].lower()
|
||||
|
||||
|
||||
class TestAdminAdd:
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_add_as_owner(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "admin", "add")
|
||||
assert cmd is not None
|
||||
|
||||
interaction = _mock_interaction(user_id="111")
|
||||
await cmd.callback(interaction, user_id="333")
|
||||
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "333" in msg.args[0]
|
||||
assert "333" in owned_config.get("bot.admins")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_add_not_owner(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "admin", "add")
|
||||
interaction = _mock_interaction(user_id="999")
|
||||
await cmd.callback(interaction, user_id="333")
|
||||
|
||||
msg = interaction.response.send_message.call_args
|
||||
assert "owner only" in msg.args[0].lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_add_idempotent(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "admin", "add")
|
||||
interaction = _mock_interaction(user_id="111")
|
||||
# Add "222" which is already in admins list
|
||||
await cmd.callback(interaction, user_id="222")
|
||||
|
||||
admins = owned_config.get("bot.admins")
|
||||
assert admins.count("222") == 1 # no duplicates
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_add_saves_to_config(self, owned_bot, owned_config):
|
||||
cmd = _find_subcommand(owned_bot.tree, "admin", "add")
|
||||
interaction = _mock_interaction(user_id="111")
|
||||
await cmd.callback(interaction, user_id="444")
|
||||
|
||||
owned_config.reload()
|
||||
assert "444" in owned_config.get("bot.admins")
|
||||
|
||||
|
||||
# --- on_message ---
|
||||
|
||||
|
||||
class TestOnMessage:
|
||||
def _get_on_message(self, client):
|
||||
"""Extract the on_message handler from the client's event listeners."""
|
||||
# discord.py Client stores overridden events; we can call it directly
|
||||
# The on_message is registered via @client.event decorator
|
||||
return client.on_message
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ignores_own_messages(self, owned_bot):
|
||||
on_message = self._get_on_message(owned_bot)
|
||||
message = MagicMock(spec=discord.Message)
|
||||
message.author = owned_bot.user # bot's own user
|
||||
message.channel = MagicMock()
|
||||
|
||||
# Should return without logging (no error)
|
||||
await on_message(message)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ignores_unregistered_channel(self, owned_bot, caplog):
|
||||
on_message = self._get_on_message(owned_bot)
|
||||
message = MagicMock(spec=discord.Message)
|
||||
message.author = MagicMock()
|
||||
message.author.id = 999
|
||||
message.author != owned_bot.user # not the bot
|
||||
# Make sure author comparison returns False
|
||||
message.author.__eq__ = lambda self, other: False
|
||||
message.channel = MagicMock(spec=discord.TextChannel)
|
||||
message.channel.id = 12345 # not registered
|
||||
message.content = "hello"
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="echo-core.discord"):
|
||||
await on_message(message)
|
||||
|
||||
assert "registered channel" not in caplog.text.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_logs_registered_channel(self, owned_bot, caplog):
|
||||
on_message = self._get_on_message(owned_bot)
|
||||
message = MagicMock(spec=discord.Message)
|
||||
message.author = MagicMock()
|
||||
message.author.id = 555
|
||||
message.author.__eq__ = lambda self, other: False
|
||||
message.channel = MagicMock(spec=discord.TextChannel)
|
||||
message.channel.id = 900 # registered channel
|
||||
message.content = "hello world"
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="echo-core.discord"):
|
||||
await on_message(message)
|
||||
|
||||
assert "registered channel" in caplog.text.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ignores_non_admin_dm(self, owned_bot, caplog):
|
||||
on_message = self._get_on_message(owned_bot)
|
||||
message = MagicMock(spec=discord.Message)
|
||||
message.author = MagicMock()
|
||||
message.author.id = 999 # not admin
|
||||
message.author.__eq__ = lambda self, other: False
|
||||
message.channel = MagicMock(spec=discord.DMChannel)
|
||||
message.content = "hello"
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="echo-core.discord"):
|
||||
await on_message(message)
|
||||
|
||||
assert "dm from admin" not in caplog.text.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_logs_admin_dm(self, owned_bot, caplog):
|
||||
on_message = self._get_on_message(owned_bot)
|
||||
message = MagicMock(spec=discord.Message)
|
||||
message.author = MagicMock()
|
||||
message.author.id = 222 # in admins list
|
||||
message.author.__eq__ = lambda self, other: False
|
||||
message.channel = MagicMock(spec=discord.DMChannel)
|
||||
message.content = "admin message"
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="echo-core.discord"):
|
||||
await on_message(message)
|
||||
|
||||
assert "dm from admin" in caplog.text.lower()
|
||||
Reference in New Issue
Block a user