Files
echo-core/src/adapters/telegram_bot.py
2026-02-15 23:23:29 +00:00

587 lines
19 KiB
Python

"""Telegram bot adapter — commands and message handlers."""
import asyncio
import logging
from telegram import BotCommand, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction, ChatType
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from src.config import Config
from src.claude_session import (
clear_session,
get_active_session,
set_session_model,
VALID_MODELS,
)
from src.fast_commands import dispatch as fast_dispatch
from src.router import route_message
# General-purpose logger for this adapter.
logger = logging.getLogger("echo-core.telegram")
# Separate logger that receives the "Unauthorized ..." warnings emitted by
# the handlers in this module.
_security_log = logging.getLogger("echo-core.security")
# Module-level config reference, set by create_telegram_bot(); stays None
# until the factory has run, which _get_config() treats as "not initialized".
_config: Config | None = None
def _get_config() -> Config:
    """Hand back the shared Config installed by create_telegram_bot().

    Raises:
        RuntimeError: if the module-level config has not been set yet.
    """
    current = _config
    if current is not None:
        return current
    raise RuntimeError("Bot not initialized — call create_telegram_bot() first")
# --- Authorization helpers ---
def is_owner(user_id: int) -> bool:
    """Tell whether *user_id* equals the configured ``bot.owner``.

    Both values are converted to ``str`` before comparing, so the config may
    hold the owner ID as either a number or a string.
    """
    configured_owner = _get_config().get("bot.owner")
    return str(configured_owner) == str(user_id)
def is_admin(user_id: int) -> bool:
    """Check if user_id is the owner or listed under ``bot.admins``.

    Admin entries are compared as strings, mirroring is_owner(), so IDs
    stored as numbers in the config match as well as IDs stored as strings.

    Args:
        user_id: Telegram user ID to check.

    Returns:
        True when the user is the owner or one of the configured admins.
    """
    if is_owner(user_id):
        return True
    # "or []" tolerates a config key that is present but set to null.
    admins = _get_config().get("bot.admins", []) or []
    # A lone scalar would otherwise be iterated character by character
    # (or substring-matched, as the plain "in" test on a str did).
    if isinstance(admins, (str, int)):
        admins = [admins]
    wanted = str(user_id)
    return any(str(entry) == wanted for entry in admins)
def is_registered_chat(chat_id: int) -> bool:
    """Check if Telegram chat_id is in any registered channel entry.

    The stored ``id`` is stringified before comparing so that entries whose
    ID was saved as a number still match (cmd_register stores strings, but
    the config may also be edited by hand).

    Args:
        chat_id: Telegram chat ID to look up.

    Returns:
        True when some ``telegram_channels`` entry carries this chat ID.
    """
    # "or {}" tolerates a config key that is present but set to null.
    channels = _get_config().get("telegram_channels", {}) or {}
    wanted = str(chat_id)
    return any(str(entry.get("id")) == wanted for entry in channels.values())
def _channel_alias_for_chat(chat_id: int) -> str | None:
    """Resolve a Telegram chat ID to its config alias.

    The stored ``id`` is stringified before comparing so numeric and string
    IDs in the config are treated alike.

    Args:
        chat_id: Telegram chat ID to look up.

    Returns:
        The first alias in ``telegram_channels`` whose ``id`` matches, or
        None when no entry matches.
    """
    # "or {}" tolerates a config key that is present but set to null.
    channels = _get_config().get("telegram_channels", {}) or {}
    wanted = str(chat_id)
    for alias, info in channels.items():
        if str(info.get("id")) == wanted:
            return alias
    return None
# --- Message splitting helper ---
def split_message(text: str, limit: int = 4096) -> list[str]:
    """Split text into chunks that fit Telegram's message limit.

    Text that already fits is returned unchanged as a single-element list.
    Longer text is cut at the last newline found before ``limit``; when no
    newline is available the cut is made at exactly ``limit`` characters.
    Newlines at the cut point are dropped from the start of the next chunk.

    Chunks that would be empty or whitespace-only (for example when the text
    starts with newlines) are skipped, because such a chunk cannot be sent
    as a message on its own.

    Args:
        text: The text to split.
        limit: Maximum length of each chunk.

    Returns:
        The chunks in order. May be empty when an over-long text consists of
        whitespace only.

    Raises:
        ValueError: if the text needs splitting but ``limit`` is not positive
            (splitting could never make progress).
    """
    if len(text) <= limit:
        return [text]
    if limit <= 0:
        raise ValueError("limit must be a positive integer")

    chunks: list[str] = []
    remaining = text
    while remaining:
        if len(remaining) <= limit:
            if remaining.strip():
                chunks.append(remaining)
            break
        split_at = remaining.rfind("\n", 0, limit)
        if split_at == -1:
            # No newline inside the window: hard cut at the limit.
            split_at = limit
        piece = remaining[:split_at]
        # split_at == 0 (text begins with a newline) yields an empty piece.
        if piece.strip():
            chunks.append(piece)
        remaining = remaining[split_at:].lstrip("\n")
    return chunks
# --- Command handlers ---
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /start with the three-line welcome text."""
    welcome_lines = (
        "Echo Core — Telegram adapter.",
        "Send a message to chat with Claude.",
        "Use /help for available commands.",
    )
    await update.message.reply_text("\n".join(welcome_lines))
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /help with the command overview, rendered as Markdown."""
    # (heading, entries) pairs; sections are separated by one blank line.
    sections = (
        ("*Echo Commands*", (
            "/help — Show this help",
            "/clear — Clear session",
            "/status — Session status",
            "/model — View/change AI model",
        )),
        ("*Email*", (
            "/email — Check unread emails",
            "/emailsend <to> <subject> :: <body>",
            "/emailsave — Save emails to KB",
        )),
        ("*Calendar*", (
            "/calendar — Today + tomorrow",
            "/calendarweek — Week schedule",
            "/calendarbusy — Am I busy?",
        )),
        ("*Notes*", (
            "/note <text> — Quick note",
            "/jurnal <text> — Journal entry",
            "/search <query> — Memory search",
            "/kb [category] — KB notes",
        )),
        ("*Reminders*", (
            "/remind <HH:MM> <text>",
        )),
        ("*Git*", (
            "/commit [msg] — Commit changes",
            "/push — Push to remote",
            "/pull — Pull with rebase",
            "/test [pattern] — Run tests",
        )),
        ("*Ops*", (
            "/logs [N] — Log lines",
            "/doctor — Diagnostics",
            "/heartbeat — Health checks",
        )),
    )
    help_text = "\n\n".join(
        "\n".join((heading, *entries)) for heading, entries in sections
    )
    await update.message.reply_text(help_text, parse_mode="Markdown")
async def cmd_clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /clear: drop this chat's session and report the outcome."""
    chat_key = str(update.effective_chat.id)
    fallback_model = _get_config().get("bot.default_model", "sonnet")
    if not clear_session(chat_key):
        await update.message.reply_text("No active session for this chat.")
        return
    confirmation = f"Session cleared. Model reset to {fallback_model}."
    await update.message.reply_text(confirmation)
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /status with model, short session ID, message and token counts."""
    info = get_active_session(str(update.effective_chat.id))
    if not info:
        await update.message.reply_text("No active session.")
        return

    # Only the first 8 characters of the session ID are shown.
    short_id = info.get("session_id", "?")[:8]
    tokens_in = info.get("total_input_tokens", 0)
    tokens_out = info.get("total_output_tokens", 0)
    report = "\n".join((
        f"Model: {info.get('model', '?')}",
        f"Session: {short_id}",
        f"Messages: {info.get('message_count', 0)}",
        f"Tokens: {tokens_in} in / {tokens_out} out",
    ))
    await update.message.reply_text(report)
def _apply_model_choice(chat_id: str, choice: str) -> None:
    """Persist *choice* as the model for *chat_id*.

    When the chat has an active session its model is updated in place.
    Otherwise a placeholder session record is written (empty ``session_id``,
    zero messages) that carries only the chosen model.

    Shared by cmd_model and callback_model, which previously each carried an
    identical copy of this logic.
    """
    if get_active_session(chat_id):
        set_session_model(chat_id, choice)
        return

    # Function-scope imports, as in the original handlers.
    from src.claude_session import _load_sessions, _save_sessions
    from datetime import datetime, timezone

    now = datetime.now(timezone.utc).isoformat()
    sessions = _load_sessions()
    sessions[chat_id] = {
        "session_id": "",
        "model": choice,
        "created_at": now,
        "last_message_at": now,
        "message_count": 0,
    }
    _save_sessions(sessions)


async def cmd_model(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /model — view or change model with inline keyboard.

    ``/model <name>`` switches directly (after validating against
    VALID_MODELS). ``/model`` without arguments shows the current model and
    an inline keyboard with one button per valid model; the button for the
    current model is prefixed with "> ".
    """
    chat_id = str(update.effective_chat.id)
    args = context.args

    if args:
        # /model opus — change model directly
        choice = args[0].lower()
        if choice not in VALID_MODELS:
            await update.message.reply_text(
                f"Invalid model '{choice}'. Choose from: {', '.join(sorted(VALID_MODELS))}"
            )
            return
        _apply_model_choice(chat_id, choice)
        await update.message.reply_text(
            f"Model changed to *{choice}*.", parse_mode="Markdown"
        )
        return

    # No args — show current model + inline keyboard
    session = get_active_session(chat_id)
    if session:
        current = session.get("model", "?")
    else:
        current = _get_config().get("bot.default_model", "sonnet")

    # A single keyboard row; callback data is "model:<name>", which is what
    # callback_model parses.
    keyboard = [
        [
            InlineKeyboardButton(
                f"{'> ' if m == current else ''}{m}",
                callback_data=f"model:{m}",
            )
            for m in sorted(VALID_MODELS)
        ]
    ]
    await update.message.reply_text(
        f"Current model: *{current}*\nSelect a model:",
        reply_markup=InlineKeyboardMarkup(keyboard),
        parse_mode="Markdown",
    )


async def callback_model(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle inline keyboard callback for model selection.

    Acknowledges the callback, validates the model name carried in the
    callback data, stores it for the chat the keyboard message belongs to,
    and replaces the keyboard message with a confirmation.
    """
    query = update.callback_query
    await query.answer()

    # Strip only the leading "model:" tag (str.replace would remove every
    # occurrence); tolerate missing callback data.
    choice = (query.data or "").removeprefix("model:")
    if choice not in VALID_MODELS:
        return

    # Without the originating message there is no chat to apply the choice
    # to and nothing to edit.
    if query.message is None:
        return

    _apply_model_choice(str(query.message.chat_id), choice)
    await query.edit_message_text(
        f"Model changed to *{choice}*.", parse_mode="Markdown"
    )
async def cmd_register(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /register <alias>: store the current chat under an alias.

    Only the owner may do this; other callers are logged to the security
    logger and told "Owner only.". The alias is lower-cased, must not exist
    yet, and the new entry is saved to the config immediately.
    """
    user_id = update.effective_user.id
    if not is_owner(user_id):
        _security_log.warning(
            "Unauthorized owner command /register by user=%s (%s)",
            user_id, update.effective_user.username,
        )
        await update.message.reply_text("Owner only.")
        return

    requested = context.args
    if not requested:
        await update.message.reply_text("Usage: /register <alias>")
        return

    alias = requested[0].lower()
    chat_id = str(update.effective_chat.id)
    cfg = _get_config()
    registry = cfg.get("telegram_channels", {})
    if alias in registry:
        await update.message.reply_text(f"Alias '{alias}' already registered.")
        return

    new_entry = {"id": chat_id, "default_model": "sonnet"}
    registry[alias] = new_entry
    cfg.set("telegram_channels", registry)
    cfg.save()
    confirmation = f"Chat registered as '{alias}' (ID: {chat_id})."
    await update.message.reply_text(confirmation)
# --- Fast command handlers ---
async def _fast_cmd(update: Update, name: str, args: list[str]) -> None:
    """Run a fast command and reply with the result.

    Every fast-command handler in this module funnels through here, so this
    is where access is enforced. A caller is allowed when they are an admin
    (or the owner), or when the command is issued inside a registered group
    chat — the same audiences handle_message serves. Anyone else is logged
    to the security logger and ignored without a reply.

    Updates that carry no ``update.message`` (for example edited messages)
    are ignored, so an edit never re-runs a command.

    Args:
        update: The incoming Telegram update.
        name: Fast-command name passed to fast_dispatch.
        args: Argument list passed to fast_dispatch.
    """
    message = update.message
    if message is None:
        return

    user = update.effective_user
    chat = update.effective_chat
    user_id = user.id if user is not None else None

    allowed = user_id is not None and is_admin(user_id)
    if (
        not allowed
        and chat is not None
        and chat.type in (ChatType.GROUP, ChatType.SUPERGROUP)
    ):
        allowed = is_registered_chat(chat.id)
    if not allowed:
        _security_log.warning(
            "Unauthorized fast command /%s by user=%s (%s)",
            name, user_id, user.username if user is not None else None,
        )
        return

    await message.chat.send_action(ChatAction.TYPING)
    # fast_dispatch is synchronous; run it off the event loop.
    result = await asyncio.to_thread(fast_dispatch, name, args)
    if result:
        for chunk in split_message(result):
            await message.reply_text(chunk)
    else:
        await message.reply_text(f"Unknown command: /{name}")
async def cmd_email(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/email — run the "email" fast command with the user's arguments."""
    forwarded = list(context.args or [])
    await _fast_cmd(update, "email", forwarded)
async def cmd_emailsend(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/emailsend <to> <subject> :: <body> — "email" fast command, "send" first."""
    forwarded = ["send", *(context.args or [])]
    await _fast_cmd(update, "email", forwarded)
async def cmd_emailsave(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/emailsave — run the "email" fast command with the fixed argument "save"."""
    subcommand = ["save"]
    await _fast_cmd(update, "email", subcommand)
async def cmd_calendar(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/calendar — run the "calendar" fast command with the user's arguments."""
    forwarded = list(context.args or [])
    await _fast_cmd(update, "calendar", forwarded)
async def cmd_calendarweek(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/calendarweek — run the "calendar" fast command with the fixed argument "week"."""
    subcommand = ["week"]
    await _fast_cmd(update, "calendar", subcommand)
async def cmd_calendarbusy(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/calendarbusy — run the "calendar" fast command with the fixed argument "busy"."""
    subcommand = ["busy"]
    await _fast_cmd(update, "calendar", subcommand)
async def cmd_note(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/note <text> — run the "note" fast command; show usage when no text is given."""
    words = list(context.args or [])
    if words:
        await _fast_cmd(update, "note", words)
    else:
        await update.message.reply_text("Usage: /note <text>")
async def cmd_jurnal(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/jurnal <text> — run the "jurnal" fast command; show usage when no text is given."""
    words = list(context.args or [])
    if words:
        await _fast_cmd(update, "jurnal", words)
    else:
        await update.message.reply_text("Usage: /jurnal <text>")
async def cmd_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/search <query> — run the "search" fast command; show usage when no query is given."""
    terms = list(context.args or [])
    if terms:
        await _fast_cmd(update, "search", terms)
    else:
        await update.message.reply_text("Usage: /search <query>")
async def cmd_kb(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/kb [category] — run the "kb" fast command with the user's arguments."""
    forwarded = list(context.args or [])
    await _fast_cmd(update, "kb", forwarded)
async def cmd_remind(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/remind <HH:MM> <text> or /remind <YYYY-MM-DD> <HH:MM> <text>

    Needs at least two arguments; otherwise the usage line is sent instead
    of running the "remind" fast command.
    """
    parts = list(context.args or [])
    if len(parts) >= 2:
        await _fast_cmd(update, "remind", parts)
    else:
        await update.message.reply_text("Usage: /remind <HH:MM> <text>")
async def cmd_commit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/commit [message] — run the "commit" fast command with the user's arguments."""
    forwarded = list(context.args or [])
    await _fast_cmd(update, "commit", forwarded)
async def cmd_push(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/push — run the "push" fast command; user arguments are not forwarded."""
    no_args: list[str] = []
    await _fast_cmd(update, "push", no_args)
async def cmd_pull(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/pull — run the "pull" fast command; user arguments are not forwarded."""
    no_args: list[str] = []
    await _fast_cmd(update, "pull", no_args)
async def cmd_test(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/test [pattern] — run the "test" fast command with the user's arguments."""
    forwarded = list(context.args or [])
    await _fast_cmd(update, "test", forwarded)
async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/logs [N] — run the "logs" fast command with the user's arguments."""
    forwarded = list(context.args or [])
    await _fast_cmd(update, "logs", forwarded)
async def cmd_doctor(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/doctor — run the "doctor" fast command; user arguments are not forwarded."""
    no_args: list[str] = []
    await _fast_cmd(update, "doctor", no_args)
async def cmd_heartbeat_tg(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/heartbeat — run the "heartbeat" fast command; user arguments are not forwarded."""
    no_args: list[str] = []
    await _fast_cmd(update, "heartbeat", no_args)
# --- Message handler ---
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Process incoming text messages — route to Claude.

    Access rules:
      * private chats: only admins (others are logged to the security
        logger and ignored);
      * groups / supergroups: only registered chats, and only when the bot
        is @-mentioned or the message replies to one of the bot's messages;
      * any other chat type is ignored.

    The bot's @-mention is stripped from the text before routing.
    route_message runs in a worker thread; text blocks it reports through
    the ``on_text`` callback are sent to the chat as they arrive. The final
    combined response is sent only when no intermediate block was sent.
    """
    message = update.message
    if not message or not message.text:
        return

    user = update.effective_user
    chat = update.effective_chat
    # Without a sender or chat there is nothing to authorize against.
    if user is None or chat is None:
        return
    user_id = user.id
    chat_id = chat.id
    chat_type = chat.type
    bot_username = context.bot.username

    # Private chat: only admins
    if chat_type == ChatType.PRIVATE:
        if not is_admin(user_id):
            _security_log.warning(
                "Unauthorized Telegram DM from user=%s (%s): %s",
                user_id, user.username,
                message.text[:100],
            )
            return
    # Group chat: only registered chats, and bot must be mentioned or replied to
    elif chat_type in (ChatType.GROUP, ChatType.SUPERGROUP):
        if not is_registered_chat(chat_id):
            return
        # In groups, only respond when mentioned or replied to
        is_reply_to_bot = (
            message.reply_to_message
            and message.reply_to_message.from_user
            and message.reply_to_message.from_user.id == context.bot.id
        )
        is_mention = bot_username and f"@{bot_username}" in message.text
        if not is_reply_to_bot and not is_mention:
            return
    else:
        return

    text = message.text
    # Remove bot mention from text if present
    if bot_username:
        text = text.replace(f"@{bot_username}", "").strip()
    if not text:
        return

    logger.info(
        "Telegram message from %s (%s) in chat %s: %s",
        user_id, user.username,
        chat_id, text[:100],
    )

    # Show typing indicator
    await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)

    # Track intermediate messages sent via callback
    sent_count = 0
    # We are inside a coroutine, so the running loop is the one the worker
    # thread must schedule sends on.
    loop = asyncio.get_running_loop()

    def _log_send_failure(future) -> None:
        """Log an intermediate send that failed instead of dropping the error."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(
                "Failed to send intermediate message to chat %s",
                chat_id, exc_info=error,
            )

    def on_text(text_block: str) -> None:
        """Send intermediate Claude text blocks to the chat."""
        nonlocal sent_count
        for chunk in split_message(text_block):
            # Called from the worker thread: hand the send to the event loop.
            future = asyncio.run_coroutine_threadsafe(
                context.bot.send_message(chat_id=chat_id, text=chunk), loop
            )
            future.add_done_callback(_log_send_failure)
            sent_count += 1

    try:
        response, _is_cmd = await asyncio.to_thread(
            route_message, str(chat_id), str(user_id), text,
            on_text=on_text,
        )
        # Only send combined response if no intermediates were delivered
        if sent_count == 0:
            for chunk in split_message(response):
                await message.reply_text(chunk)
    except Exception:
        logger.exception("Error processing Telegram message from %s", user_id)
        await message.reply_text("Sorry, something went wrong processing your message.")
# --- Factory ---
def create_telegram_bot(config: Config, token: str) -> Application:
    """Build the Telegram Application and wire up every handler.

    Stores *config* as the module-level config, registers the command,
    callback-query and text handlers in a fixed order, and installs a
    ``post_init`` hook that publishes the bot's command menu.
    """
    global _config
    _config = config

    app = Application.builder().token(token).build()

    # Core commands
    core_commands = (
        ("start", cmd_start),
        ("help", cmd_help),
        ("clear", cmd_clear),
        ("status", cmd_status),
        ("model", cmd_model),
        ("register", cmd_register),
    )
    for command, callback in core_commands:
        app.add_handler(CommandHandler(command, callback))
    app.add_handler(CallbackQueryHandler(callback_model, pattern="^model:"))

    # Fast commands
    fast_commands = (
        ("email", cmd_email),
        ("emailsend", cmd_emailsend),
        ("emailsave", cmd_emailsave),
        ("calendar", cmd_calendar),
        ("calendarweek", cmd_calendarweek),
        ("calendarbusy", cmd_calendarbusy),
        ("note", cmd_note),
        ("jurnal", cmd_jurnal),
        ("search", cmd_search),
        ("kb", cmd_kb),
        ("remind", cmd_remind),
        ("commit", cmd_commit),
        ("push", cmd_push),
        ("pull", cmd_pull),
        ("test", cmd_test),
        ("logs", cmd_logs),
        ("doctor", cmd_doctor),
        ("heartbeat", cmd_heartbeat_tg),
    )
    for command, callback in fast_commands:
        app.add_handler(CommandHandler(command, callback))

    # Text message handler (must be last)
    plain_text = filters.TEXT & ~filters.COMMAND
    app.add_handler(MessageHandler(plain_text, handle_message))

    # (command, description) pairs shown in the bot's command menu.
    menu_entries = (
        ("help", "List commands"),
        ("email", "Check unread emails"),
        ("emailsend", "Send an email"),
        ("emailsave", "Save emails to KB"),
        ("calendar", "Today + tomorrow events"),
        ("calendarweek", "Week schedule"),
        ("calendarbusy", "Am I busy?"),
        ("note", "Quick note"),
        ("jurnal", "Journal entry"),
        ("search", "Memory search"),
        ("kb", "KB notes"),
        ("remind", "Create reminder"),
        ("commit", "Git commit"),
        ("push", "Git push"),
        ("pull", "Git pull"),
        ("test", "Run tests"),
        ("clear", "Clear session"),
        ("status", "Session status"),
        ("model", "View/change model"),
        ("logs", "Show log lines"),
        ("doctor", "Diagnostics"),
        ("heartbeat", "Health checks"),
    )

    # Register bot menu commands on startup
    async def post_init(application: Application) -> None:
        menu = [BotCommand(command, description) for command, description in menu_entries]
        await application.bot.set_my_commands(menu)

    app.post_init = post_init
    return app