This commit adds a complete email authentication flow for the Telegram bot, allowing users to login with email + password instead of web app linking codes. Includes critical bug fixes for Oracle integration. **New Features:** - Email-based 2FA authentication with 6-digit codes sent via SMTP - Backend endpoints: verify-email and login-with-email - ConversationHandler for email authentication flow in Telegram bot - Session token verification to prevent user ID spoofing - Rate limiting (5 attempts per 5 minutes) - Email code expiry (5 minutes) with automatic cleanup **Bug Fixes:** - Fixed Oracle column name: ACTIV → INACTIV (with inverted logic) - Fixed Oracle password verification: verificautilizator returns checksum, not user_id - Fixed username case sensitivity: Oracle usernames must be uppercase - Fixed SMTP connection: use start_tls parameter instead of manual STARTTLS - Added middleware exclusions for public email auth endpoints **Backend Changes:** - Added verify-email endpoint (public) in telegram.py - Added login-with-email endpoint (public) with rate limiting and session verification - Updated middleware exclusions in main.py and auth_middleware_wrapper.py - Added AUTH_SESSION_SECRET configuration for session token signing **Telegram Bot Changes:** - New modules: app/auth/email_auth.py, app/bot/email_handlers.py - New utilities: app/utils/email_service.py (SMTP email sending) - Updated handlers.py: ignore callbacks handled by ConversationHandler - Updated menus.py: show Login button for unauthenticated users - Updated API client: verify_email() and login_with_email() methods - Database: email_auth_codes table with cleanup task **Configuration:** - Added SMTP configuration to telegram-bot .env.example - Added AUTH_SESSION_SECRET to backend .env.example - Updated .gitignore: exclude temporary files (*.pid, *.checksum, test scripts) **Dependencies:** - Added aiosmtplib for async SMTP email sending 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
172 lines
5.1 KiB
Python
172 lines
5.1 KiB
Python
"""
|
|
Email authentication logic with crypto-secure code generation
|
|
"""
|
|
import secrets
|
|
import re
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict
|
|
from collections import defaultdict
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ============================================================================
|
|
# RATE LIMITING (In-Memory)
|
|
# ============================================================================
|
|
# NOTE: For production with multiple bot instances, migrate to Redis
|
|
# See "Optional Future Enhancements" section in plan
|
|
|
|
_rate_limit_store: Dict[str, list] = defaultdict(list)
|
|
|
|
|
|
async def check_rate_limit(
|
|
identifier: str,
|
|
max_attempts: int = 3,
|
|
window_minutes: int = 60
|
|
) -> bool:
|
|
"""
|
|
Check if identifier is within rate limit
|
|
|
|
Args:
|
|
identifier: Email or telegram_user_id (as string)
|
|
max_attempts: Maximum attempts allowed
|
|
window_minutes: Time window in minutes
|
|
|
|
Returns:
|
|
True if within limit (can proceed), False if exceeded
|
|
|
|
NOTE: In-memory implementation - resets on bot restart
|
|
"""
|
|
now = datetime.now()
|
|
cutoff = now - timedelta(minutes=window_minutes)
|
|
|
|
# Clean old attempts
|
|
_rate_limit_store[identifier] = [
|
|
attempt for attempt in _rate_limit_store[identifier]
|
|
if attempt > cutoff
|
|
]
|
|
|
|
# Check limit
|
|
if len(_rate_limit_store[identifier]) >= max_attempts:
|
|
logger.warning(f"Rate limit exceeded for {identifier}")
|
|
return False
|
|
|
|
# Add new attempt
|
|
_rate_limit_store[identifier].append(now)
|
|
return True
|
|
|
|
|
|
def clear_rate_limit(identifier: str) -> None:
|
|
"""Clear rate limit for identifier (e.g., after successful auth)"""
|
|
if identifier in _rate_limit_store:
|
|
del _rate_limit_store[identifier]
|
|
logger.debug(f"Rate limit cleared for {identifier}")
|
|
|
|
|
|
# ============================================================================
|
|
# CODE GENERATION (Crypto-Secure)
|
|
# ============================================================================
|
|
|
|
def generate_email_code() -> str:
|
|
"""
|
|
Generate crypto-secure 6-digit code
|
|
|
|
Uses secrets module (not random) for cryptographic security
|
|
|
|
Returns:
|
|
6-digit string (000000 - 999999)
|
|
"""
|
|
# Generate 6-digit code using secrets (crypto-secure)
|
|
code = ''.join(secrets.choice('0123456789') for _ in range(6))
|
|
|
|
logger.debug(f"Generated email auth code (length: {len(code)})")
|
|
return code
|
|
|
|
|
|
# ============================================================================
|
|
# EMAIL VALIDATION
|
|
# ============================================================================
|
|
|
|
def is_valid_email_format(email: str) -> bool:
|
|
"""
|
|
Validate email format (basic regex)
|
|
|
|
Args:
|
|
email: Email address to validate
|
|
|
|
Returns:
|
|
True if format is valid
|
|
"""
|
|
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
|
return bool(re.match(pattern, email))
|
|
|
|
|
|
async def verify_email_in_oracle(email: str) -> Optional[str]:
|
|
"""
|
|
Verify email exists in Oracle UTILIZATORI table via backend API
|
|
|
|
Args:
|
|
email: Email address to check
|
|
|
|
Returns:
|
|
Oracle username if found and active, None otherwise
|
|
|
|
NOTE: Uses backend API endpoint /api/telegram/auth/verify-email
|
|
"""
|
|
try:
|
|
from app.api.client import get_backend_client
|
|
|
|
backend_client = get_backend_client()
|
|
|
|
# Call backend API to verify email
|
|
response = await backend_client.verify_email(email)
|
|
|
|
if response.get('success'):
|
|
username = response.get('username')
|
|
logger.info(f"Email verified via backend: {email} -> {username}")
|
|
return username
|
|
else:
|
|
logger.warning(f"Email not found or inactive: {email}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error verifying email via backend: {e}", exc_info=True)
|
|
return None
|
|
|
|
|
|
# ============================================================================
|
|
# SESSION TOKEN GENERATION (Prevent User ID Spoofing)
|
|
# ============================================================================
|
|
|
|
def generate_session_token(telegram_user_id: int, email: str) -> str:
|
|
"""
|
|
Generate signed session token for backend verification
|
|
|
|
This prevents user ID spoofing attacks where malicious clients
|
|
could impersonate Telegram users by sending arbitrary user IDs
|
|
|
|
Args:
|
|
telegram_user_id: Telegram user ID
|
|
email: Verified email address
|
|
|
|
Returns:
|
|
Signed token (simple implementation - upgrade to JWT in future)
|
|
|
|
NOTE: For production, use proper JWT signing with shared secret
|
|
"""
|
|
import hashlib
|
|
import os
|
|
|
|
# Get secret from env (should match backend)
|
|
secret = os.getenv("AUTH_SESSION_SECRET", "change-me-in-production")
|
|
|
|
# Create signature: HMAC-like hash
|
|
payload = f"{telegram_user_id}:{email}:{secret}"
|
|
signature = hashlib.sha256(payload.encode()).hexdigest()[:16]
|
|
|
|
# Token format: user_id:email:signature
|
|
token = f"{telegram_user_id}:{email}:{signature}"
|
|
|
|
logger.debug(f"Generated session token for user {telegram_user_id}")
|
|
return token
|