- New src/adapters/telegram_bot.py: full Telegram adapter with python-telegram-bot v22 - Commands: /start, /help, /clear, /status, /model, /register - Inline keyboards for model selection - Message routing through existing router.py - Private chat: admin-only access - Group chat: responds to @mentions and replies to bot - Security logging for unauthorized access attempts - Message splitting for 4096 char limit - Updated main.py: runs Discord + Telegram bots concurrently - Telegram is optional (gracefully skipped if no telegram_token) - Updated requirements.txt: added python-telegram-bot>=21.0 - Updated config.json: added telegram_channels section - Updated cli.py doctor: telegram token check (optional) - 37 new tests (410 total, zero failures) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
433 lines
14 KiB
Python
433 lines
14 KiB
Python
"""Tests for src/adapters/telegram_bot.py — Telegram bot adapter."""
|
|
|
|
import json
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from src.config import Config
|
|
from src.adapters import telegram_bot
|
|
from src.adapters.telegram_bot import (
|
|
create_telegram_bot,
|
|
is_admin,
|
|
is_owner,
|
|
is_registered_chat,
|
|
split_message,
|
|
cmd_start,
|
|
cmd_help,
|
|
cmd_clear,
|
|
cmd_status,
|
|
cmd_model,
|
|
cmd_register,
|
|
callback_model,
|
|
handle_message,
|
|
)
|
|
|
|
|
|
# --- Fixtures ---
|
|
|
|
|
|
@pytest.fixture
def tmp_config(tmp_path):
    """Build a Config pointed at a freshly written temp ``config.json``.

    The written file has a ``bot`` section named "Echo" with ``owner`` set to
    null and an empty ``admins`` list, plus empty ``channels`` and
    ``telegram_channels`` sections.
    """
    bot_section = {
        "name": "Echo",
        "default_model": "sonnet",
        "owner": None,
        "admins": [],
    }
    payload = {
        "bot": bot_section,
        "channels": {},
        "telegram_channels": {},
    }
    path = tmp_path / "config.json"
    path.write_text(json.dumps(payload, indent=2))
    return Config(path)
|
|
|
|
|
|
@pytest.fixture
def owned_config(tmp_path):
    """Build a Config whose file has an owner, one admin and one Telegram chat.

    Owner is "111", the only listed admin is "222", and ``telegram_channels``
    holds a single entry "general" with id "900".
    """
    bot_section = {
        "name": "Echo",
        "default_model": "sonnet",
        "owner": "111",
        "admins": ["222"],
    }
    telegram_section = {
        "general": {"id": "900", "default_model": "sonnet"},
    }
    payload = {
        "bot": bot_section,
        "channels": {},
        "telegram_channels": telegram_section,
    }
    path = tmp_path / "config.json"
    path.write_text(json.dumps(payload, indent=2))
    return Config(path)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _set_config(tmp_config):
    """Install ``tmp_config`` as the adapter's module-level ``_config`` for each test.

    The value that was in place before the test is captured and put back on
    teardown, so the fixture never leaves the module in a different state
    than it found it (previously it always overwrote the value with None).
    """
    # getattr: do not assume the attribute already exists on the module.
    previous = getattr(telegram_bot, "_config", None)
    telegram_bot._config = tmp_config
    yield
    telegram_bot._config = previous
|
|
|
|
|
|
@pytest.fixture
def _set_owned(owned_config):
    """Swap the adapter's module-level ``_config`` to ``owned_config`` for one test.

    Opt-in counterpart of the autouse config fixture: tests request it when
    they need an owner, an admin and a registered chat. The value found
    before the swap is restored on teardown instead of being replaced by None.
    """
    # getattr: do not assume the attribute already exists on the module.
    previous = getattr(telegram_bot, "_config", None)
    telegram_bot._config = owned_config
    yield
    telegram_bot._config = previous
|
|
|
|
|
|
def _mock_update(
    user_id: int = 123,
    chat_id: int = 456,
    text: str = "hello",
    chat_type: str = "private",
    username: str = "testuser",
) -> MagicMock:
    """Build a MagicMock standing in for a telegram ``Update``.

    Args:
        user_id: Value exposed as ``update.effective_user.id``.
        chat_id: Value exposed as ``update.effective_chat.id``.
        text: Value exposed as ``update.message.text``.
        chat_type: Value exposed as ``update.effective_chat.type``.
        username: Value exposed as ``update.effective_user.username``.

    Returns:
        A MagicMock whose ``message.reply_text`` is an AsyncMock (so it can be
        awaited and inspected) and whose ``message.reply_to_message`` is None.
    """
    user = MagicMock()
    user.id = user_id
    user.username = username

    chat = MagicMock()
    chat.id = chat_id
    chat.type = chat_type

    message = MagicMock()
    message.text = text
    message.reply_text = AsyncMock()
    # Explicit None: a bare MagicMock attribute would be truthy and look
    # like a reply to some message.
    message.reply_to_message = None

    update = MagicMock()
    update.effective_user = user
    update.effective_chat = chat
    update.message = message
    return update
|
|
|
|
|
|
def _mock_context(bot_id=999, bot_username="echo_bot", args=None):
    """Build a MagicMock standing in for a handler context.

    Args:
        bot_id: Value exposed as ``context.bot.id``.
        bot_username: Value exposed as ``context.bot.username``.
        args: Optional iterable of command arguments copied into
            ``context.args``; defaults to an empty list.

    Returns:
        A MagicMock with ``args`` set to a fresh list and a ``bot`` whose
        ``send_chat_action`` is an AsyncMock.
    """
    bot = MagicMock()
    bot.id = bot_id
    bot.username = bot_username
    bot.send_chat_action = AsyncMock()

    context = MagicMock()
    # Always a new list so one test mutating args cannot leak into another.
    context.args = [] if args is None else list(args)
    context.bot = bot
    return context
|
|
|
|
|
|
# --- Authorization helpers ---
|
|
|
|
|
|
class TestIsOwner:
    """Checks for is_owner() against the active module config."""

    def test_is_owner_true(self, _set_owned):
        """The id matching the configured owner ("111") is reported as owner."""
        verdict = is_owner(111)
        assert verdict is True

    def test_is_owner_false(self, _set_owned):
        """An id other than the configured owner is not the owner."""
        verdict = is_owner(999)
        assert verdict is False

    def test_is_owner_none_owner(self):
        """With the default config (owner is None) nobody is the owner."""
        verdict = is_owner(123)
        assert verdict is False
|
|
|
|
|
|
class TestIsAdmin:
    """Checks for is_admin() against the owned config (owner "111", admin "222")."""

    def test_is_admin_owner_is_admin(self, _set_owned):
        """The owner id counts as an admin."""
        verdict = is_admin(111)
        assert verdict is True

    def test_is_admin_listed(self, _set_owned):
        """An id present in the admins list is an admin."""
        verdict = is_admin(222)
        assert verdict is True

    def test_is_admin_not_listed(self, _set_owned):
        """An id that is neither owner nor listed is not an admin."""
        verdict = is_admin(999)
        assert verdict is False
|
|
|
|
|
|
class TestIsRegisteredChat:
    """Checks for is_registered_chat() against ``telegram_channels``."""

    def test_is_registered_true(self, _set_owned):
        """Chat 900 is listed in the owned config."""
        verdict = is_registered_chat(900)
        assert verdict is True

    def test_is_registered_false(self, _set_owned):
        """Chat 0 is not listed in the owned config."""
        verdict = is_registered_chat(0)
        assert verdict is False

    def test_is_registered_empty(self):
        """With the default config (no telegram channels) nothing is registered."""
        verdict = is_registered_chat(900)
        assert verdict is False
|
|
|
|
|
|
# --- split_message ---
|
|
|
|
|
|
class TestSplitMessage:
    """Checks for split_message() chunking behaviour."""

    def test_short_message_not_split(self):
        """A short text comes back as a single-element list."""
        parts = split_message("hello")
        assert parts == ["hello"]

    def test_long_message_split(self):
        """8192 characters at limit 4096 give two chunks that rejoin losslessly."""
        payload = "a" * 8192
        parts = split_message(payload, limit=4096)
        assert len(parts) == 2
        for part in parts:
            assert len(part) <= 4096
        assert "".join(parts) == payload

    def test_split_at_newline(self):
        """Newline-heavy text never yields a chunk longer than the limit."""
        payload = "line1\n" * 1000
        parts = split_message(payload, limit=100)
        for part in parts:
            assert len(part) <= 100

    def test_empty_string(self):
        """An empty input yields a list holding one empty string."""
        parts = split_message("")
        assert parts == [""]
|
|
|
|
|
|
# --- Command handlers ---
|
|
|
|
|
|
class TestCmdStart:
    """Checks for the /start command handler."""

    @pytest.mark.asyncio
    async def test_start_responds(self):
        """cmd_start replies exactly once and the reply mentions "Echo Core"."""
        update, context = _mock_update(), _mock_context()

        await cmd_start(update, context)

        reply = update.message.reply_text
        reply.assert_called_once()
        sent = reply.call_args.args[0]
        assert "Echo Core" in sent
|
|
|
|
|
|
class TestCmdHelp:
    """Checks for the /help command handler."""

    @pytest.mark.asyncio
    async def test_help_responds(self):
        """cmd_help replies exactly once and lists /clear and /model."""
        update, context = _mock_update(), _mock_context()

        await cmd_help(update, context)

        reply = update.message.reply_text
        reply.assert_called_once()
        sent = reply.call_args.args[0]
        for command in ("/clear", "/model"):
            assert command in sent
|
|
|
|
|
|
class TestCmdClear:
    """Checks for the /clear command handler with clear_session patched out."""

    _CLEAR_TARGET = "src.adapters.telegram_bot.clear_session"

    @pytest.mark.asyncio
    async def test_clear_no_session(self):
        """When clear_session returns False the reply says "No active session"."""
        update, context = _mock_update(), _mock_context()

        with patch(self._CLEAR_TARGET, return_value=False):
            await cmd_clear(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "No active session" in sent

    @pytest.mark.asyncio
    async def test_clear_with_session(self):
        """When clear_session returns True the reply says "Session cleared"."""
        update, context = _mock_update(), _mock_context()

        with patch(self._CLEAR_TARGET, return_value=True):
            await cmd_clear(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "Session cleared" in sent
|
|
|
|
|
|
class TestCmdStatus:
    """Checks for the /status command handler with get_active_session patched out."""

    _SESSION_TARGET = "src.adapters.telegram_bot.get_active_session"

    @pytest.mark.asyncio
    async def test_status_no_session(self):
        """Without an active session the reply says "No active session"."""
        update, context = _mock_update(), _mock_context()

        with patch(self._SESSION_TARGET, return_value=None):
            await cmd_status(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "No active session" in sent

    @pytest.mark.asyncio
    async def test_status_with_session(self):
        """With an active session the reply shows its model and message count."""
        update, context = _mock_update(), _mock_context()
        active = {
            "model": "opus",
            "session_id": "sess-abc-12345678",
            "message_count": 5,
            "total_input_tokens": 1000,
            "total_output_tokens": 500,
        }

        with patch(self._SESSION_TARGET, return_value=active):
            await cmd_status(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "opus" in sent
        assert "5" in sent
|
|
|
|
|
|
class TestCmdModel:
    """Checks for the /model command handler."""

    _SESSION_TARGET = "src.adapters.telegram_bot.get_active_session"

    @pytest.mark.asyncio
    async def test_model_with_arg(self, tmp_path, monkeypatch):
        """Passing a valid model name produces a reply that mentions it."""
        from src import claude_session

        update = _mock_update()
        context = _mock_context()
        context.args = ["opus"]

        # Redirect the session store to a temp directory so anything the
        # handler saves stays inside tmp_path.
        store = tmp_path / "sessions"
        store.mkdir()
        monkeypatch.setattr(claude_session, "SESSIONS_DIR", store)
        monkeypatch.setattr(claude_session, "_SESSIONS_FILE", store / "active.json")

        with patch(self._SESSION_TARGET, return_value=None):
            await cmd_model(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "opus" in sent

    @pytest.mark.asyncio
    async def test_model_invalid(self):
        """An unknown model name is rejected with "Invalid model"."""
        update = _mock_update()
        context = _mock_context()
        context.args = ["gpt4"]

        await cmd_model(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "Invalid model" in sent

    @pytest.mark.asyncio
    async def test_model_keyboard(self):
        """With no argument the reply is sent with a ``reply_markup`` keyword."""
        update = _mock_update()
        context = _mock_context()
        context.args = []

        with patch(self._SESSION_TARGET, return_value=None):
            await cmd_model(update, context)

        keyword_args = update.message.reply_text.call_args.kwargs
        assert "reply_markup" in keyword_args
|
|
|
|
|
|
class TestCmdRegister:
    """Checks for the /register command handler."""

    @pytest.mark.asyncio
    async def test_register_not_owner(self):
        """A caller who is not the owner is told "Owner only"."""
        update = _mock_update(user_id=999)
        context = _mock_context()
        context.args = ["test"]

        await cmd_register(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "Owner only" in sent

    @pytest.mark.asyncio
    async def test_register_owner(self, _set_owned):
        """The owner registering a chat gets a confirmation naming the alias."""
        update = _mock_update(user_id=111, chat_id=777)
        context = _mock_context()
        context.args = ["mychat"]

        await cmd_register(update, context)

        sent = update.message.reply_text.call_args.args[0]
        for fragment in ("registered", "mychat"):
            assert fragment in sent

    @pytest.mark.asyncio
    async def test_register_no_args(self, _set_owned):
        """The owner calling /register without arguments gets a usage hint."""
        update = _mock_update(user_id=111)
        context = _mock_context()
        context.args = []

        await cmd_register(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "Usage" in sent
|
|
|
|
|
|
# --- Message handler ---
|
|
|
|
|
|
class TestHandleMessage:
    """Checks for handle_message() routing, access control and reply delivery.

    All tests run against the owned config: owner "111", registered chat "900".
    """

    _ROUTE_TARGET = "src.adapters.telegram_bot.route_message"

    @pytest.mark.asyncio
    async def test_private_chat_admin(self, _set_owned):
        """A private message from the owner is routed and the answer sent back."""
        update = _mock_update(user_id=111, chat_type="private", text="Hello Claude")
        context = _mock_context()

        with patch(self._ROUTE_TARGET, return_value=("Hi!", False)) as router:
            await handle_message(update, context)

        router.assert_called_once()
        update.message.reply_text.assert_called_with("Hi!")

    @pytest.mark.asyncio
    async def test_private_chat_unauthorized(self, _set_owned):
        """A private message from a non-admin is neither routed nor answered."""
        update = _mock_update(user_id=999, chat_type="private", text="Hello")
        context = _mock_context()

        with patch(self._ROUTE_TARGET) as router:
            await handle_message(update, context)

        router.assert_not_called()
        update.message.reply_text.assert_not_called()

    @pytest.mark.asyncio
    async def test_group_chat_unregistered(self, _set_owned):
        """A message in a group that is not registered is not routed."""
        update = _mock_update(
            user_id=111,
            chat_id=999,
            chat_type="supergroup",
            text="Hello",
        )
        context = _mock_context()

        with patch(self._ROUTE_TARGET) as router:
            await handle_message(update, context)

        router.assert_not_called()

    @pytest.mark.asyncio
    async def test_group_chat_registered_mention(self, _set_owned):
        """An @mention of the bot in a registered group gets the routed answer."""
        update = _mock_update(
            user_id=111,
            chat_id=900,
            chat_type="supergroup",
            text="@echo_bot what is the weather?",
        )
        context = _mock_context(bot_username="echo_bot")

        with patch(self._ROUTE_TARGET, return_value=("Sunny!", False)):
            await handle_message(update, context)

        update.message.reply_text.assert_called_with("Sunny!")

    @pytest.mark.asyncio
    async def test_group_chat_registered_no_mention(self, _set_owned):
        """A registered-group message without a mention is not routed."""
        update = _mock_update(
            user_id=111,
            chat_id=900,
            chat_type="supergroup",
            text="just chatting",
        )
        context = _mock_context(bot_username="echo_bot")

        with patch(self._ROUTE_TARGET) as router:
            await handle_message(update, context)

        router.assert_not_called()

    @pytest.mark.asyncio
    async def test_group_chat_reply_to_bot(self, _set_owned):
        """A reply to one of the bot's own messages gets the routed answer."""
        update = _mock_update(
            user_id=111,
            chat_id=900,
            chat_type="supergroup",
            text="follow up",
        )
        # The replied-to message was authored by user 999, which is also the
        # bot id given to the context below.
        replied_to = MagicMock()
        replied_to.from_user = MagicMock()
        replied_to.from_user.id = 999
        update.message.reply_to_message = replied_to
        context = _mock_context(bot_id=999)

        with patch(self._ROUTE_TARGET, return_value=("Response", False)):
            await handle_message(update, context)

        update.message.reply_text.assert_called_with("Response")

    @pytest.mark.asyncio
    async def test_long_response_split(self, _set_owned):
        """An 8000-character answer is delivered in two reply_text calls."""
        update = _mock_update(user_id=111, chat_type="private", text="Hello")
        context = _mock_context()
        oversized = "x" * 8000

        with patch(self._ROUTE_TARGET, return_value=(oversized, False)):
            await handle_message(update, context)

        assert update.message.reply_text.call_count == 2

    @pytest.mark.asyncio
    async def test_error_handling(self, _set_owned):
        """If routing raises, the user still gets a reply containing "Sorry"."""
        update = _mock_update(user_id=111, chat_type="private", text="Hello")
        context = _mock_context()

        with patch(self._ROUTE_TARGET, side_effect=Exception("boom")):
            await handle_message(update, context)

        sent = update.message.reply_text.call_args.args[0]
        assert "Sorry" in sent
|
|
|
|
|
|
# --- Security logging ---
|
|
|
|
|
|
class TestSecurityLogging:
    """Checks that rejected requests reach the adapter's security logger."""

    @pytest.mark.asyncio
    async def test_unauthorized_dm_logged(self, _set_owned):
        """A private message from a non-admin logs one "Unauthorized" warning."""
        update = _mock_update(user_id=999, chat_type="private", text="hack attempt")
        context = _mock_context()

        with patch.object(telegram_bot._security_log, "warning") as warn:
            await handle_message(update, context)

        warn.assert_called_once()
        logged = warn.call_args.args[0]
        assert "Unauthorized" in logged

    @pytest.mark.asyncio
    async def test_unauthorized_register_logged(self):
        """/register from a non-owner logs exactly one warning."""
        update = _mock_update(user_id=999)
        context = _mock_context()
        context.args = ["test"]

        with patch.object(telegram_bot._security_log, "warning") as warn:
            await cmd_register(update, context)

        warn.assert_called_once()
|
|
|
|
|
|
# --- Factory ---
|
|
|
|
|
|
class TestCreateTelegramBot:
    """Checks for the create_telegram_bot() factory."""

    _TOKEN = "fake-token-123"

    def test_creates_application(self, tmp_config):
        """The factory returns a ``telegram.ext.Application`` instance."""
        from telegram.ext import Application

        built = create_telegram_bot(tmp_config, self._TOKEN)
        assert isinstance(built, Application)

    def test_sets_config(self, tmp_config):
        """The factory stores the given config on the adapter module."""
        create_telegram_bot(tmp_config, self._TOKEN)
        stored = telegram_bot._config
        assert stored is tmp_config
|