stage-12: Telegram bot adapter
- New src/adapters/telegram_bot.py: full Telegram adapter with python-telegram-bot v22 - Commands: /start, /help, /clear, /status, /model, /register - Inline keyboards for model selection - Message routing through existing router.py - Private chat: admin-only access - Group chat: responds to @mentions and replies to bot - Security logging for unauthorized access attempts - Message splitting for 4096 char limit - Updated main.py: runs Discord + Telegram bots concurrently - Telegram is optional (gracefully skipped if no telegram_token) - Updated requirements.txt: added python-telegram-bot>=21.0 - Updated config.json: added telegram_channels section - Updated cli.py doctor: telegram token check (optional) - 37 new tests (410 total, zero failures) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
9
cli.py
9
cli.py
@@ -174,7 +174,14 @@ def cmd_doctor(args):
|
||||
except Exception:
|
||||
checks.append(("Ollama reachable", False))
|
||||
|
||||
# 10. Discord connection (bot PID running)
|
||||
# 10. Telegram token (optional)
|
||||
tg_token = get_secret("telegram_token")
|
||||
if tg_token:
|
||||
checks.append(("Telegram token in keyring", True))
|
||||
else:
|
||||
checks.append(("Telegram token (optional)", True)) # not required
|
||||
|
||||
# 11. Discord connection (bot PID running)
|
||||
pid_ok = False
|
||||
if PID_FILE.exists():
|
||||
try:
|
||||
|
||||
12
config.json
12
config.json
@@ -1,11 +1,17 @@
|
||||
{
|
||||
"bot": {
|
||||
"name": "Echo",
|
||||
"default_model": "sonnet",
|
||||
"owner": null,
|
||||
"default_model": "opus",
|
||||
"owner": "949388626146517022",
|
||||
"admins": []
|
||||
},
|
||||
"channels": {},
|
||||
"channels": {
|
||||
"echo-core": {
|
||||
"id": "1471916752119009432",
|
||||
"default_model": "opus"
|
||||
}
|
||||
},
|
||||
"telegram_channels": {},
|
||||
"heartbeat": {
|
||||
"enabled": true,
|
||||
"interval_minutes": 30
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
discord.py>=2.3
|
||||
python-telegram-bot>=21.0
|
||||
apscheduler>=3.10
|
||||
keyring>=25.0
|
||||
keyrings.alt>=5.0
|
||||
|
||||
368
src/adapters/telegram_bot.py
Normal file
368
src/adapters/telegram_bot.py
Normal file
@@ -0,0 +1,368 @@
|
||||
"""Telegram bot adapter — commands and message handlers."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.constants import ChatAction, ChatType
|
||||
from telegram.ext import (
|
||||
Application,
|
||||
CallbackQueryHandler,
|
||||
CommandHandler,
|
||||
ContextTypes,
|
||||
MessageHandler,
|
||||
filters,
|
||||
)
|
||||
|
||||
from src.config import Config
|
||||
from src.claude_session import (
|
||||
clear_session,
|
||||
get_active_session,
|
||||
set_session_model,
|
||||
VALID_MODELS,
|
||||
)
|
||||
from src.router import route_message
|
||||
|
||||
logger = logging.getLogger("echo-core.telegram")
|
||||
_security_log = logging.getLogger("echo-core.security")
|
||||
|
||||
# Module-level config reference, set by create_telegram_bot()
|
||||
_config: Config | None = None
|
||||
|
||||
|
||||
def _get_config() -> Config:
|
||||
"""Return the module-level config, raising if not initialized."""
|
||||
if _config is None:
|
||||
raise RuntimeError("Bot not initialized — call create_telegram_bot() first")
|
||||
return _config
|
||||
|
||||
|
||||
# --- Authorization helpers ---
|
||||
|
||||
|
||||
def is_owner(user_id: int) -> bool:
|
||||
"""Check if user_id matches config bot.owner."""
|
||||
owner = _get_config().get("bot.owner")
|
||||
return str(user_id) == str(owner)
|
||||
|
||||
|
||||
def is_admin(user_id: int) -> bool:
|
||||
"""Check if user_id is owner or in admins list."""
|
||||
if is_owner(user_id):
|
||||
return True
|
||||
admins = _get_config().get("bot.admins", [])
|
||||
return str(user_id) in admins
|
||||
|
||||
|
||||
def is_registered_chat(chat_id: int) -> bool:
|
||||
"""Check if Telegram chat_id is in any registered channel entry."""
|
||||
channels = _get_config().get("telegram_channels", {})
|
||||
return any(ch.get("id") == str(chat_id) for ch in channels.values())
|
||||
|
||||
|
||||
def _channel_alias_for_chat(chat_id: int) -> str | None:
|
||||
"""Resolve a Telegram chat ID to its config alias."""
|
||||
channels = _get_config().get("telegram_channels", {})
|
||||
for alias, info in channels.items():
|
||||
if info.get("id") == str(chat_id):
|
||||
return alias
|
||||
return None
|
||||
|
||||
|
||||
# --- Message splitting helper ---
|
||||
|
||||
|
||||
def split_message(text: str, limit: int = 4096) -> list[str]:
|
||||
"""Split text into chunks that fit Telegram's message limit."""
|
||||
if len(text) <= limit:
|
||||
return [text]
|
||||
|
||||
chunks = []
|
||||
while text:
|
||||
if len(text) <= limit:
|
||||
chunks.append(text)
|
||||
break
|
||||
split_at = text.rfind("\n", 0, limit)
|
||||
if split_at == -1:
|
||||
split_at = limit
|
||||
chunks.append(text[:split_at])
|
||||
text = text[split_at:].lstrip("\n")
|
||||
return chunks
|
||||
|
||||
|
||||
# --- Command handlers ---
|
||||
|
||||
|
||||
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /start — welcome message."""
|
||||
await update.message.reply_text(
|
||||
"Echo Core — Telegram adapter.\n"
|
||||
"Send a message to chat with Claude.\n"
|
||||
"Use /help for available commands."
|
||||
)
|
||||
|
||||
|
||||
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /help — list commands."""
|
||||
lines = [
|
||||
"*Echo Commands*",
|
||||
"/start — Welcome message",
|
||||
"/help — Show this help",
|
||||
"/clear — Clear the session for this chat",
|
||||
"/status — Show session status",
|
||||
"/model — View/change AI model",
|
||||
"/register <alias> — Register this chat (owner only)",
|
||||
]
|
||||
await update.message.reply_text("\n".join(lines), parse_mode="Markdown")
|
||||
|
||||
|
||||
async def cmd_clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /clear — clear session for this chat."""
|
||||
chat_id = str(update.effective_chat.id)
|
||||
default_model = _get_config().get("bot.default_model", "sonnet")
|
||||
removed = clear_session(chat_id)
|
||||
if removed:
|
||||
await update.message.reply_text(
|
||||
f"Session cleared. Model reset to {default_model}."
|
||||
)
|
||||
else:
|
||||
await update.message.reply_text("No active session for this chat.")
|
||||
|
||||
|
||||
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /status — show session status."""
|
||||
chat_id = str(update.effective_chat.id)
|
||||
session = get_active_session(chat_id)
|
||||
if not session:
|
||||
await update.message.reply_text("No active session.")
|
||||
return
|
||||
|
||||
model = session.get("model", "?")
|
||||
sid = session.get("session_id", "?")[:8]
|
||||
count = session.get("message_count", 0)
|
||||
in_tok = session.get("total_input_tokens", 0)
|
||||
out_tok = session.get("total_output_tokens", 0)
|
||||
|
||||
await update.message.reply_text(
|
||||
f"Model: {model}\n"
|
||||
f"Session: {sid}\n"
|
||||
f"Messages: {count}\n"
|
||||
f"Tokens: {in_tok} in / {out_tok} out"
|
||||
)
|
||||
|
||||
|
||||
async def cmd_model(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /model — view or change model with inline keyboard."""
|
||||
chat_id = str(update.effective_chat.id)
|
||||
args = context.args
|
||||
|
||||
if args:
|
||||
# /model opus — change model directly
|
||||
choice = args[0].lower()
|
||||
if choice not in VALID_MODELS:
|
||||
await update.message.reply_text(
|
||||
f"Invalid model '{choice}'. Choose from: {', '.join(sorted(VALID_MODELS))}"
|
||||
)
|
||||
return
|
||||
|
||||
session = get_active_session(chat_id)
|
||||
if session:
|
||||
set_session_model(chat_id, choice)
|
||||
else:
|
||||
from src.claude_session import _load_sessions, _save_sessions
|
||||
from datetime import datetime, timezone
|
||||
|
||||
sessions = _load_sessions()
|
||||
sessions[chat_id] = {
|
||||
"session_id": "",
|
||||
"model": choice,
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
"last_message_at": datetime.now(timezone.utc).isoformat(),
|
||||
"message_count": 0,
|
||||
}
|
||||
_save_sessions(sessions)
|
||||
await update.message.reply_text(f"Model changed to *{choice}*.", parse_mode="Markdown")
|
||||
return
|
||||
|
||||
# No args — show current model + inline keyboard
|
||||
session = get_active_session(chat_id)
|
||||
if session:
|
||||
current = session.get("model", "?")
|
||||
else:
|
||||
current = _get_config().get("bot.default_model", "sonnet")
|
||||
|
||||
keyboard = [
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
f"{'> ' if m == current else ''}{m}",
|
||||
callback_data=f"model:{m}",
|
||||
)
|
||||
for m in sorted(VALID_MODELS)
|
||||
]
|
||||
]
|
||||
await update.message.reply_text(
|
||||
f"Current model: *{current}*\nSelect a model:",
|
||||
reply_markup=InlineKeyboardMarkup(keyboard),
|
||||
parse_mode="Markdown",
|
||||
)
|
||||
|
||||
|
||||
async def callback_model(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle inline keyboard callback for model selection."""
|
||||
query = update.callback_query
|
||||
await query.answer()
|
||||
|
||||
choice = query.data.replace("model:", "")
|
||||
if choice not in VALID_MODELS:
|
||||
return
|
||||
|
||||
chat_id = str(query.message.chat_id)
|
||||
session = get_active_session(chat_id)
|
||||
if session:
|
||||
set_session_model(chat_id, choice)
|
||||
else:
|
||||
from src.claude_session import _load_sessions, _save_sessions
|
||||
from datetime import datetime, timezone
|
||||
|
||||
sessions = _load_sessions()
|
||||
sessions[chat_id] = {
|
||||
"session_id": "",
|
||||
"model": choice,
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
"last_message_at": datetime.now(timezone.utc).isoformat(),
|
||||
"message_count": 0,
|
||||
}
|
||||
_save_sessions(sessions)
|
||||
|
||||
await query.edit_message_text(f"Model changed to *{choice}*.", parse_mode="Markdown")
|
||||
|
||||
|
||||
async def cmd_register(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /register <alias> — register current chat (owner only)."""
|
||||
user_id = update.effective_user.id
|
||||
if not is_owner(user_id):
|
||||
_security_log.warning(
|
||||
"Unauthorized owner command /register by user=%s (%s)",
|
||||
user_id, update.effective_user.username,
|
||||
)
|
||||
await update.message.reply_text("Owner only.")
|
||||
return
|
||||
|
||||
if not context.args:
|
||||
await update.message.reply_text("Usage: /register <alias>")
|
||||
return
|
||||
|
||||
alias = context.args[0].lower()
|
||||
chat_id = str(update.effective_chat.id)
|
||||
|
||||
config = _get_config()
|
||||
channels = config.get("telegram_channels", {})
|
||||
if alias in channels:
|
||||
await update.message.reply_text(f"Alias '{alias}' already registered.")
|
||||
return
|
||||
|
||||
channels[alias] = {"id": chat_id, "default_model": "sonnet"}
|
||||
config.set("telegram_channels", channels)
|
||||
config.save()
|
||||
|
||||
await update.message.reply_text(
|
||||
f"Chat registered as '{alias}' (ID: {chat_id})."
|
||||
)
|
||||
|
||||
|
||||
# --- Message handler ---
|
||||
|
||||
|
||||
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Process incoming text messages — route to Claude."""
|
||||
message = update.message
|
||||
if not message or not message.text:
|
||||
return
|
||||
|
||||
user_id = update.effective_user.id
|
||||
chat_id = update.effective_chat.id
|
||||
chat_type = update.effective_chat.type
|
||||
|
||||
# Private chat: only admins
|
||||
if chat_type == ChatType.PRIVATE:
|
||||
if not is_admin(user_id):
|
||||
_security_log.warning(
|
||||
"Unauthorized Telegram DM from user=%s (%s): %s",
|
||||
user_id, update.effective_user.username,
|
||||
message.text[:100],
|
||||
)
|
||||
return
|
||||
|
||||
# Group chat: only registered chats, and bot must be mentioned or replied to
|
||||
elif chat_type in (ChatType.GROUP, ChatType.SUPERGROUP):
|
||||
if not is_registered_chat(chat_id):
|
||||
return
|
||||
|
||||
# In groups, only respond when mentioned or replied to
|
||||
bot_username = context.bot.username
|
||||
is_reply_to_bot = (
|
||||
message.reply_to_message
|
||||
and message.reply_to_message.from_user
|
||||
and message.reply_to_message.from_user.id == context.bot.id
|
||||
)
|
||||
is_mention = bot_username and f"@{bot_username}" in message.text
|
||||
|
||||
if not is_reply_to_bot and not is_mention:
|
||||
return
|
||||
|
||||
else:
|
||||
return
|
||||
|
||||
text = message.text
|
||||
# Remove bot mention from text if present
|
||||
bot_username = context.bot.username
|
||||
if bot_username:
|
||||
text = text.replace(f"@{bot_username}", "").strip()
|
||||
|
||||
if not text:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Telegram message from %s (%s) in chat %s: %s",
|
||||
user_id, update.effective_user.username,
|
||||
chat_id, text[:100],
|
||||
)
|
||||
|
||||
# Show typing indicator
|
||||
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
||||
|
||||
try:
|
||||
response, _is_cmd = await asyncio.to_thread(
|
||||
route_message, str(chat_id), str(user_id), text
|
||||
)
|
||||
|
||||
chunks = split_message(response)
|
||||
for chunk in chunks:
|
||||
await message.reply_text(chunk)
|
||||
except Exception:
|
||||
logger.exception("Error processing Telegram message from %s", user_id)
|
||||
await message.reply_text("Sorry, something went wrong processing your message.")
|
||||
|
||||
|
||||
# --- Factory ---
|
||||
|
||||
|
||||
def create_telegram_bot(config: Config, token: str) -> Application:
|
||||
"""Create and configure the Telegram bot with all handlers."""
|
||||
global _config
|
||||
_config = config
|
||||
|
||||
app = Application.builder().token(token).build()
|
||||
|
||||
app.add_handler(CommandHandler("start", cmd_start))
|
||||
app.add_handler(CommandHandler("help", cmd_help))
|
||||
app.add_handler(CommandHandler("clear", cmd_clear))
|
||||
app.add_handler(CommandHandler("status", cmd_status))
|
||||
app.add_handler(CommandHandler("model", cmd_model))
|
||||
app.add_handler(CommandHandler("register", cmd_register))
|
||||
app.add_handler(CallbackQueryHandler(callback_model, pattern="^model:"))
|
||||
app.add_handler(
|
||||
MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)
|
||||
)
|
||||
|
||||
return app
|
||||
31
src/main.py
31
src/main.py
@@ -108,6 +108,16 @@ def main():
|
||||
"Heartbeat registered (every %d min)", interval_min
|
||||
)
|
||||
|
||||
# Telegram bot (optional — only if telegram_token exists)
|
||||
telegram_token = get_secret("telegram_token")
|
||||
telegram_app = None
|
||||
if telegram_token:
|
||||
from src.adapters.telegram_bot import create_telegram_bot
|
||||
telegram_app = create_telegram_bot(config, telegram_token)
|
||||
logger.info("Telegram bot configured")
|
||||
else:
|
||||
logger.info("No telegram_token — Telegram bot disabled")
|
||||
|
||||
# PID file
|
||||
PID_FILE.write_text(str(os.getpid()))
|
||||
|
||||
@@ -122,8 +132,27 @@ def main():
|
||||
signal.signal(signal.SIGTERM, handle_signal)
|
||||
signal.signal(signal.SIGINT, handle_signal)
|
||||
|
||||
async def _run_all():
|
||||
"""Run Discord + Telegram bots concurrently."""
|
||||
tasks = [asyncio.create_task(client.start(token))]
|
||||
if telegram_app:
|
||||
async def _run_telegram():
|
||||
await telegram_app.initialize()
|
||||
await telegram_app.start()
|
||||
await telegram_app.updater.start_polling()
|
||||
logger.info("Telegram bot started polling")
|
||||
try:
|
||||
loop.run_until_complete(client.start(token))
|
||||
while True:
|
||||
await asyncio.sleep(3600)
|
||||
except asyncio.CancelledError:
|
||||
await telegram_app.updater.stop()
|
||||
await telegram_app.stop()
|
||||
await telegram_app.shutdown()
|
||||
tasks.append(asyncio.create_task(_run_telegram()))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(_run_all())
|
||||
except KeyboardInterrupt:
|
||||
loop.run_until_complete(scheduler.stop())
|
||||
loop.run_until_complete(client.close())
|
||||
|
||||
432
tests/test_telegram_bot.py
Normal file
432
tests/test_telegram_bot.py
Normal file
@@ -0,0 +1,432 @@
|
||||
"""Tests for src/adapters/telegram_bot.py — Telegram bot adapter."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from src.config import Config
|
||||
from src.adapters import telegram_bot
|
||||
from src.adapters.telegram_bot import (
|
||||
create_telegram_bot,
|
||||
is_admin,
|
||||
is_owner,
|
||||
is_registered_chat,
|
||||
split_message,
|
||||
cmd_start,
|
||||
cmd_help,
|
||||
cmd_clear,
|
||||
cmd_status,
|
||||
cmd_model,
|
||||
cmd_register,
|
||||
callback_model,
|
||||
handle_message,
|
||||
)
|
||||
|
||||
|
||||
# --- Fixtures ---
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_config(tmp_path):
|
||||
"""Create a Config backed by a temp file with default data."""
|
||||
data = {
|
||||
"bot": {
|
||||
"name": "Echo",
|
||||
"default_model": "sonnet",
|
||||
"owner": None,
|
||||
"admins": [],
|
||||
},
|
||||
"channels": {},
|
||||
"telegram_channels": {},
|
||||
}
|
||||
config_file = tmp_path / "config.json"
|
||||
config_file.write_text(json.dumps(data, indent=2))
|
||||
return Config(config_file)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def owned_config(tmp_path):
|
||||
"""Config with owner and telegram channels set."""
|
||||
data = {
|
||||
"bot": {
|
||||
"name": "Echo",
|
||||
"default_model": "sonnet",
|
||||
"owner": "111",
|
||||
"admins": ["222"],
|
||||
},
|
||||
"channels": {},
|
||||
"telegram_channels": {
|
||||
"general": {"id": "900", "default_model": "sonnet"},
|
||||
},
|
||||
}
|
||||
config_file = tmp_path / "config.json"
|
||||
config_file.write_text(json.dumps(data, indent=2))
|
||||
return Config(config_file)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _set_config(tmp_config):
|
||||
"""Ensure module config is set for each test."""
|
||||
telegram_bot._config = tmp_config
|
||||
yield
|
||||
telegram_bot._config = None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _set_owned(owned_config):
|
||||
"""Set owned config for specific tests."""
|
||||
telegram_bot._config = owned_config
|
||||
yield
|
||||
telegram_bot._config = None
|
||||
|
||||
|
||||
def _mock_update(user_id=123, chat_id=456, text="hello", chat_type="private", username="testuser"):
|
||||
"""Create a mock telegram Update."""
|
||||
update = MagicMock()
|
||||
update.effective_user = MagicMock()
|
||||
update.effective_user.id = user_id
|
||||
update.effective_user.username = username
|
||||
update.effective_chat = MagicMock()
|
||||
update.effective_chat.id = chat_id
|
||||
update.effective_chat.type = chat_type
|
||||
update.message = MagicMock()
|
||||
update.message.text = text
|
||||
update.message.reply_text = AsyncMock()
|
||||
update.message.reply_to_message = None
|
||||
return update
|
||||
|
||||
|
||||
def _mock_context(bot_id=999, bot_username="echo_bot"):
|
||||
"""Create a mock context."""
|
||||
context = MagicMock()
|
||||
context.args = []
|
||||
context.bot = MagicMock()
|
||||
context.bot.id = bot_id
|
||||
context.bot.username = bot_username
|
||||
context.bot.send_chat_action = AsyncMock()
|
||||
return context
|
||||
|
||||
|
||||
# --- Authorization helpers ---
|
||||
|
||||
|
||||
class TestIsOwner:
|
||||
def test_is_owner_true(self, _set_owned):
|
||||
assert is_owner(111) is True
|
||||
|
||||
def test_is_owner_false(self, _set_owned):
|
||||
assert is_owner(999) is False
|
||||
|
||||
def test_is_owner_none_owner(self):
|
||||
assert is_owner(123) is False
|
||||
|
||||
|
||||
class TestIsAdmin:
|
||||
def test_is_admin_owner_is_admin(self, _set_owned):
|
||||
assert is_admin(111) is True
|
||||
|
||||
def test_is_admin_listed(self, _set_owned):
|
||||
assert is_admin(222) is True
|
||||
|
||||
def test_is_admin_not_listed(self, _set_owned):
|
||||
assert is_admin(999) is False
|
||||
|
||||
|
||||
class TestIsRegisteredChat:
|
||||
def test_is_registered_true(self, _set_owned):
|
||||
assert is_registered_chat(900) is True
|
||||
|
||||
def test_is_registered_false(self, _set_owned):
|
||||
assert is_registered_chat(000) is False
|
||||
|
||||
def test_is_registered_empty(self):
|
||||
assert is_registered_chat(900) is False
|
||||
|
||||
|
||||
# --- split_message ---
|
||||
|
||||
|
||||
class TestSplitMessage:
|
||||
def test_short_message_not_split(self):
|
||||
assert split_message("hello") == ["hello"]
|
||||
|
||||
def test_long_message_split(self):
|
||||
text = "a" * 8192
|
||||
chunks = split_message(text, limit=4096)
|
||||
assert len(chunks) == 2
|
||||
assert all(len(c) <= 4096 for c in chunks)
|
||||
assert "".join(chunks) == text
|
||||
|
||||
def test_split_at_newline(self):
|
||||
text = "line1\n" * 1000
|
||||
chunks = split_message(text, limit=100)
|
||||
assert all(len(c) <= 100 for c in chunks)
|
||||
|
||||
def test_empty_string(self):
|
||||
assert split_message("") == [""]
|
||||
|
||||
|
||||
# --- Command handlers ---
|
||||
|
||||
|
||||
class TestCmdStart:
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_responds(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
await cmd_start(update, context)
|
||||
update.message.reply_text.assert_called_once()
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "Echo Core" in msg
|
||||
|
||||
|
||||
class TestCmdHelp:
|
||||
@pytest.mark.asyncio
|
||||
async def test_help_responds(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
await cmd_help(update, context)
|
||||
update.message.reply_text.assert_called_once()
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "/clear" in msg
|
||||
assert "/model" in msg
|
||||
|
||||
|
||||
class TestCmdClear:
|
||||
@pytest.mark.asyncio
|
||||
async def test_clear_no_session(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.clear_session", return_value=False):
|
||||
await cmd_clear(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "No active session" in msg
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clear_with_session(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.clear_session", return_value=True):
|
||||
await cmd_clear(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "Session cleared" in msg
|
||||
|
||||
|
||||
class TestCmdStatus:
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_no_session(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.get_active_session", return_value=None):
|
||||
await cmd_status(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "No active session" in msg
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_with_session(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
session = {
|
||||
"model": "opus",
|
||||
"session_id": "sess-abc-12345678",
|
||||
"message_count": 5,
|
||||
"total_input_tokens": 1000,
|
||||
"total_output_tokens": 500,
|
||||
}
|
||||
with patch("src.adapters.telegram_bot.get_active_session", return_value=session):
|
||||
await cmd_status(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "opus" in msg
|
||||
assert "5" in msg
|
||||
|
||||
|
||||
class TestCmdModel:
|
||||
@pytest.mark.asyncio
|
||||
async def test_model_with_arg(self, tmp_path, monkeypatch):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
context.args = ["opus"]
|
||||
|
||||
# Need session dir for _save_sessions
|
||||
sessions_dir = tmp_path / "sessions"
|
||||
sessions_dir.mkdir()
|
||||
from src import claude_session
|
||||
monkeypatch.setattr(claude_session, "SESSIONS_DIR", sessions_dir)
|
||||
monkeypatch.setattr(claude_session, "_SESSIONS_FILE", sessions_dir / "active.json")
|
||||
|
||||
with patch("src.adapters.telegram_bot.get_active_session", return_value=None):
|
||||
await cmd_model(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "opus" in msg
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_model_invalid(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
context.args = ["gpt4"]
|
||||
await cmd_model(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "Invalid model" in msg
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_model_keyboard(self):
|
||||
update = _mock_update()
|
||||
context = _mock_context()
|
||||
context.args = []
|
||||
with patch("src.adapters.telegram_bot.get_active_session", return_value=None):
|
||||
await cmd_model(update, context)
|
||||
call_kwargs = update.message.reply_text.call_args[1]
|
||||
assert "reply_markup" in call_kwargs
|
||||
|
||||
|
||||
class TestCmdRegister:
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_not_owner(self):
|
||||
update = _mock_update(user_id=999)
|
||||
context = _mock_context()
|
||||
context.args = ["test"]
|
||||
await cmd_register(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "Owner only" in msg
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_owner(self, _set_owned):
|
||||
update = _mock_update(user_id=111, chat_id=777)
|
||||
context = _mock_context()
|
||||
context.args = ["mychat"]
|
||||
await cmd_register(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "registered" in msg
|
||||
assert "mychat" in msg
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_no_args(self, _set_owned):
|
||||
update = _mock_update(user_id=111)
|
||||
context = _mock_context()
|
||||
context.args = []
|
||||
await cmd_register(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "Usage" in msg
|
||||
|
||||
|
||||
# --- Message handler ---
|
||||
|
||||
|
||||
class TestHandleMessage:
|
||||
@pytest.mark.asyncio
|
||||
async def test_private_chat_admin(self, _set_owned):
|
||||
update = _mock_update(user_id=111, chat_type="private", text="Hello Claude")
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.route_message", return_value=("Hi!", False)) as mock_route:
|
||||
await handle_message(update, context)
|
||||
mock_route.assert_called_once()
|
||||
update.message.reply_text.assert_called_with("Hi!")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_private_chat_unauthorized(self, _set_owned):
|
||||
update = _mock_update(user_id=999, chat_type="private", text="Hello")
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.route_message") as mock_route:
|
||||
await handle_message(update, context)
|
||||
mock_route.assert_not_called()
|
||||
update.message.reply_text.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_group_chat_unregistered(self, _set_owned):
|
||||
update = _mock_update(user_id=111, chat_id=999, chat_type="supergroup", text="Hello")
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.route_message") as mock_route:
|
||||
await handle_message(update, context)
|
||||
mock_route.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_group_chat_registered_mention(self, _set_owned):
|
||||
update = _mock_update(
|
||||
user_id=111, chat_id=900, chat_type="supergroup",
|
||||
text="@echo_bot what is the weather?"
|
||||
)
|
||||
context = _mock_context(bot_username="echo_bot")
|
||||
with patch("src.adapters.telegram_bot.route_message", return_value=("Sunny!", False)):
|
||||
await handle_message(update, context)
|
||||
update.message.reply_text.assert_called_with("Sunny!")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_group_chat_registered_no_mention(self, _set_owned):
|
||||
update = _mock_update(
|
||||
user_id=111, chat_id=900, chat_type="supergroup",
|
||||
text="just chatting"
|
||||
)
|
||||
context = _mock_context(bot_username="echo_bot")
|
||||
with patch("src.adapters.telegram_bot.route_message") as mock_route:
|
||||
await handle_message(update, context)
|
||||
mock_route.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_group_chat_reply_to_bot(self, _set_owned):
|
||||
update = _mock_update(
|
||||
user_id=111, chat_id=900, chat_type="supergroup",
|
||||
text="follow up"
|
||||
)
|
||||
# Set up reply-to-bot
|
||||
update.message.reply_to_message = MagicMock()
|
||||
update.message.reply_to_message.from_user = MagicMock()
|
||||
update.message.reply_to_message.from_user.id = 999 # bot id
|
||||
context = _mock_context(bot_id=999)
|
||||
with patch("src.adapters.telegram_bot.route_message", return_value=("Response", False)):
|
||||
await handle_message(update, context)
|
||||
update.message.reply_text.assert_called_with("Response")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_long_response_split(self, _set_owned):
|
||||
update = _mock_update(user_id=111, chat_type="private", text="Hello")
|
||||
context = _mock_context()
|
||||
long_response = "x" * 8000
|
||||
with patch("src.adapters.telegram_bot.route_message", return_value=(long_response, False)):
|
||||
await handle_message(update, context)
|
||||
assert update.message.reply_text.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_error_handling(self, _set_owned):
|
||||
update = _mock_update(user_id=111, chat_type="private", text="Hello")
|
||||
context = _mock_context()
|
||||
with patch("src.adapters.telegram_bot.route_message", side_effect=Exception("boom")):
|
||||
await handle_message(update, context)
|
||||
msg = update.message.reply_text.call_args[0][0]
|
||||
assert "Sorry" in msg
|
||||
|
||||
|
||||
# --- Security logging ---
|
||||
|
||||
|
||||
class TestSecurityLogging:
|
||||
@pytest.mark.asyncio
|
||||
async def test_unauthorized_dm_logged(self, _set_owned):
|
||||
update = _mock_update(user_id=999, chat_type="private", text="hack attempt")
|
||||
context = _mock_context()
|
||||
with patch.object(telegram_bot._security_log, "warning") as mock_log:
|
||||
await handle_message(update, context)
|
||||
mock_log.assert_called_once()
|
||||
assert "Unauthorized" in mock_log.call_args[0][0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unauthorized_register_logged(self):
|
||||
update = _mock_update(user_id=999)
|
||||
context = _mock_context()
|
||||
context.args = ["test"]
|
||||
with patch.object(telegram_bot._security_log, "warning") as mock_log:
|
||||
await cmd_register(update, context)
|
||||
mock_log.assert_called_once()
|
||||
|
||||
|
||||
# --- Factory ---
|
||||
|
||||
|
||||
class TestCreateTelegramBot:
|
||||
def test_creates_application(self, tmp_config):
|
||||
from telegram.ext import Application
|
||||
app = create_telegram_bot(tmp_config, "fake-token-123")
|
||||
assert isinstance(app, Application)
|
||||
|
||||
def test_sets_config(self, tmp_config):
|
||||
create_telegram_bot(tmp_config, "fake-token-123")
|
||||
assert telegram_bot._config is tmp_config
|
||||
Reference in New Issue
Block a user