Implement email-based 2FA authentication for Telegram bot with Oracle integration fixes
This commit adds a complete email authentication flow for the Telegram bot, allowing users to login with email + password instead of web app linking codes. Includes critical bug fixes for Oracle integration. **New Features:** - Email-based 2FA authentication with 6-digit codes sent via SMTP - Backend endpoints: verify-email and login-with-email - ConversationHandler for email authentication flow in Telegram bot - Session token verification to prevent user ID spoofing - Rate limiting (5 attempts per 5 minutes) - Email code expiry (5 minutes) with automatic cleanup **Bug Fixes:** - Fixed Oracle column name: ACTIV → INACTIV (with inverted logic) - Fixed Oracle password verification: verificautilizator returns checksum, not user_id - Fixed username case sensitivity: Oracle usernames must be uppercase - Fixed SMTP connection: use start_tls parameter instead of manual STARTTLS - Added middleware exclusions for public email auth endpoints **Backend Changes:** - Added verify-email endpoint (public) in telegram.py - Added login-with-email endpoint (public) with rate limiting and session verification - Updated middleware exclusions in main.py and auth_middleware_wrapper.py - Added AUTH_SESSION_SECRET configuration for session token signing **Telegram Bot Changes:** - New modules: app/auth/email_auth.py, app/bot/email_handlers.py - New utilities: app/utils/email_service.py (SMTP email sending) - Updated handlers.py: ignore callbacks handled by ConversationHandler - Updated menus.py: show Login button for unauthenticated users - Updated API client: verify_email() and login_with_email() methods - Database: email_auth_codes table with cleanup task **Configuration:** - Added SMTP configuration to telegram-bot .env.example - Added AUTH_SESSION_SECRET to backend .env.example - Updated .gitignore: exclude temporary files (*.pid, *.checksum, test scripts) **Dependencies:** - Added aiosmtplib for async SMTP email sending 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
171
reports-app/telegram-bot/app/auth/email_auth.py
Normal file
171
reports-app/telegram-bot/app/auth/email_auth.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""
|
||||
Email authentication logic with crypto-secure code generation
|
||||
"""
|
||||
import secrets
|
||||
import re
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict
|
||||
from collections import defaultdict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# RATE LIMITING (In-Memory)
|
||||
# ============================================================================
|
||||
# NOTE: For production with multiple bot instances, migrate to Redis
|
||||
# See "Optional Future Enhancements" section in plan
|
||||
|
||||
_rate_limit_store: Dict[str, list] = defaultdict(list)
|
||||
|
||||
|
||||
async def check_rate_limit(
|
||||
identifier: str,
|
||||
max_attempts: int = 3,
|
||||
window_minutes: int = 60
|
||||
) -> bool:
|
||||
"""
|
||||
Check if identifier is within rate limit
|
||||
|
||||
Args:
|
||||
identifier: Email or telegram_user_id (as string)
|
||||
max_attempts: Maximum attempts allowed
|
||||
window_minutes: Time window in minutes
|
||||
|
||||
Returns:
|
||||
True if within limit (can proceed), False if exceeded
|
||||
|
||||
NOTE: In-memory implementation - resets on bot restart
|
||||
"""
|
||||
now = datetime.now()
|
||||
cutoff = now - timedelta(minutes=window_minutes)
|
||||
|
||||
# Clean old attempts
|
||||
_rate_limit_store[identifier] = [
|
||||
attempt for attempt in _rate_limit_store[identifier]
|
||||
if attempt > cutoff
|
||||
]
|
||||
|
||||
# Check limit
|
||||
if len(_rate_limit_store[identifier]) >= max_attempts:
|
||||
logger.warning(f"Rate limit exceeded for {identifier}")
|
||||
return False
|
||||
|
||||
# Add new attempt
|
||||
_rate_limit_store[identifier].append(now)
|
||||
return True
|
||||
|
||||
|
||||
def clear_rate_limit(identifier: str) -> None:
|
||||
"""Clear rate limit for identifier (e.g., after successful auth)"""
|
||||
if identifier in _rate_limit_store:
|
||||
del _rate_limit_store[identifier]
|
||||
logger.debug(f"Rate limit cleared for {identifier}")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CODE GENERATION (Crypto-Secure)
|
||||
# ============================================================================
|
||||
|
||||
def generate_email_code() -> str:
|
||||
"""
|
||||
Generate crypto-secure 6-digit code
|
||||
|
||||
Uses secrets module (not random) for cryptographic security
|
||||
|
||||
Returns:
|
||||
6-digit string (000000 - 999999)
|
||||
"""
|
||||
# Generate 6-digit code using secrets (crypto-secure)
|
||||
code = ''.join(secrets.choice('0123456789') for _ in range(6))
|
||||
|
||||
logger.debug(f"Generated email auth code (length: {len(code)})")
|
||||
return code
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# EMAIL VALIDATION
|
||||
# ============================================================================
|
||||
|
||||
def is_valid_email_format(email: str) -> bool:
|
||||
"""
|
||||
Validate email format (basic regex)
|
||||
|
||||
Args:
|
||||
email: Email address to validate
|
||||
|
||||
Returns:
|
||||
True if format is valid
|
||||
"""
|
||||
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
||||
return bool(re.match(pattern, email))
|
||||
|
||||
|
||||
async def verify_email_in_oracle(email: str) -> Optional[str]:
|
||||
"""
|
||||
Verify email exists in Oracle UTILIZATORI table via backend API
|
||||
|
||||
Args:
|
||||
email: Email address to check
|
||||
|
||||
Returns:
|
||||
Oracle username if found and active, None otherwise
|
||||
|
||||
NOTE: Uses backend API endpoint /api/telegram/auth/verify-email
|
||||
"""
|
||||
try:
|
||||
from app.api.client import get_backend_client
|
||||
|
||||
backend_client = get_backend_client()
|
||||
|
||||
# Call backend API to verify email
|
||||
response = await backend_client.verify_email(email)
|
||||
|
||||
if response.get('success'):
|
||||
username = response.get('username')
|
||||
logger.info(f"Email verified via backend: {email} -> {username}")
|
||||
return username
|
||||
else:
|
||||
logger.warning(f"Email not found or inactive: {email}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error verifying email via backend: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# SESSION TOKEN GENERATION (Prevent User ID Spoofing)
|
||||
# ============================================================================
|
||||
|
||||
def generate_session_token(telegram_user_id: int, email: str) -> str:
|
||||
"""
|
||||
Generate signed session token for backend verification
|
||||
|
||||
This prevents user ID spoofing attacks where malicious clients
|
||||
could impersonate Telegram users by sending arbitrary user IDs
|
||||
|
||||
Args:
|
||||
telegram_user_id: Telegram user ID
|
||||
email: Verified email address
|
||||
|
||||
Returns:
|
||||
Signed token (simple implementation - upgrade to JWT in future)
|
||||
|
||||
NOTE: For production, use proper JWT signing with shared secret
|
||||
"""
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
# Get secret from env (should match backend)
|
||||
secret = os.getenv("AUTH_SESSION_SECRET", "change-me-in-production")
|
||||
|
||||
# Create signature: HMAC-like hash
|
||||
payload = f"{telegram_user_id}:{email}:{secret}"
|
||||
signature = hashlib.sha256(payload.encode()).hexdigest()[:16]
|
||||
|
||||
# Token format: user_id:email:signature
|
||||
token = f"{telegram_user_id}:{email}:{signature}"
|
||||
|
||||
logger.debug(f"Generated session token for user {telegram_user_id}")
|
||||
return token
|
||||
Reference in New Issue
Block a user