Files
roa2web-service-auto/reports-app/telegram-bot/app/auth/email_auth.py
Marius Mutu 706062dc0f Implement email-based 2FA authentication for Telegram bot with Oracle integration fixes
This commit adds a complete email authentication flow for the Telegram bot, allowing users to login with email + password instead of web app linking codes. Includes critical bug fixes for Oracle integration.

**New Features:**
- Email-based 2FA authentication with 6-digit codes sent via SMTP
- Backend endpoints: verify-email and login-with-email
- ConversationHandler for email authentication flow in Telegram bot
- Session token verification to prevent user ID spoofing
- Rate limiting (5 attempts per 5 minutes)
- Email code expiry (5 minutes) with automatic cleanup

**Bug Fixes:**
- Fixed Oracle column name: ACTIV → INACTIV (with inverted logic)
- Fixed Oracle password verification: verificautilizator returns checksum, not user_id
- Fixed username case sensitivity: Oracle usernames must be uppercase
- Fixed SMTP connection: use start_tls parameter instead of manual STARTTLS
- Added middleware exclusions for public email auth endpoints

**Backend Changes:**
- Added verify-email endpoint (public) in telegram.py
- Added login-with-email endpoint (public) with rate limiting and session verification
- Updated middleware exclusions in main.py and auth_middleware_wrapper.py
- Added AUTH_SESSION_SECRET configuration for session token signing

**Telegram Bot Changes:**
- New modules: app/auth/email_auth.py, app/bot/email_handlers.py
- New utilities: app/utils/email_service.py (SMTP email sending)
- Updated handlers.py: ignore callbacks handled by ConversationHandler
- Updated menus.py: show Login button for unauthenticated users
- Updated API client: verify_email() and login_with_email() methods
- Database: email_auth_codes table with cleanup task

**Configuration:**
- Added SMTP configuration to telegram-bot .env.example
- Added AUTH_SESSION_SECRET to backend .env.example
- Updated .gitignore: exclude temporary files (*.pid, *.checksum, test scripts)

**Dependencies:**
- Added aiosmtplib for async SMTP email sending

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 12:00:46 +02:00

172 lines
5.1 KiB
Python

"""
Email authentication logic with crypto-secure code generation
"""
import secrets
import re
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict
from collections import defaultdict
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# ============================================================================
# RATE LIMITING (In-Memory)
# ============================================================================
# NOTE: For production with multiple bot instances, migrate to Redis
# See "Optional Future Enhancements" section in plan
# Maps identifier -> list of time.monotonic() readings (seconds), one per
# recorded attempt that is still inside the most recently checked window.
_rate_limit_store: Dict[str, list] = defaultdict(list)


async def check_rate_limit(
    identifier: str,
    max_attempts: int = 3,
    window_minutes: int = 60
) -> bool:
    """
    Check if identifier is within rate limit and, if so, record the attempt

    Attempts are timestamped with the monotonic clock rather than local
    wall-clock time, so DST transitions and system clock adjustments can
    neither extend a lockout nor let a caller bypass the limit.

    Args:
        identifier: Email or telegram_user_id (as string)
        max_attempts: Maximum attempts allowed inside the window
        window_minutes: Length of the sliding window in minutes

    Returns:
        True if within limit (can proceed; this attempt is recorded),
        False if exceeded (this attempt is not recorded)

    NOTE: In-memory implementation - resets on bot restart
    """
    # Function-scope import: the module header does not import time.
    import time

    now = time.monotonic()
    cutoff = now - window_minutes * 60

    # Keep only the attempts that are still inside the window
    recent = [stamp for stamp in _rate_limit_store[identifier] if stamp > cutoff]
    _rate_limit_store[identifier] = recent

    if len(recent) >= max_attempts:
        logger.warning(f"Rate limit exceeded for {identifier}")
        return False

    # Within the limit: count this attempt
    recent.append(now)
    return True
def clear_rate_limit(identifier: str) -> None:
    """Forget every recorded attempt for identifier (e.g., once auth succeeds)"""
    try:
        del _rate_limit_store[identifier]
    except KeyError:
        # Nothing was tracked for this identifier, so there is nothing to clear
        return
    logger.debug(f"Rate limit cleared for {identifier}")
# ============================================================================
# CODE GENERATION (Crypto-Secure)
# ============================================================================
def generate_email_code() -> str:
    """
    Produce a cryptographically secure 6-digit verification code

    Draws from the secrets module (not random), so the value is suitable
    for use as an authentication code.

    Returns:
        String of exactly six decimal digits (000000 - 999999)
    """
    # One uniform draw from [0, 1_000_000), zero-padded so leading zeros survive
    code = f"{secrets.randbelow(10 ** 6):06d}"
    logger.debug(f"Generated email auth code (length: {len(code)})")
    return code
# ============================================================================
# EMAIL VALIDATION
# ============================================================================
def is_valid_email_format(email: str) -> bool:
    """
    Validate email format (basic regex)

    The entire string must consist of: a local part made of letters,
    digits and the characters . _ % + -, then "@", then a domain made of
    letters, digits, dots and hyphens, then a final dot followed by at
    least two letters.

    Args:
        email: Email address to validate

    Returns:
        True if format is valid, False otherwise (including non-string input)
    """
    # Non-string input (e.g. None) is simply not a valid address
    if not isinstance(email, str):
        return False

    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would accept "user@example.com\n".
    return re.fullmatch(pattern, email) is not None
async def verify_email_in_oracle(email: str) -> Optional[str]:
    """
    Look up an email address through the backend API

    The actual lookup (per the original note: the Oracle UTILIZATORI table,
    via endpoint /api/telegram/auth/verify-email) happens in the backend;
    this function only forwards the email and interprets the response.

    Args:
        email: Email address to check

    Returns:
        The 'username' value of the backend response when the response
        reports success; None when it does not, or when any error occurs

    NOTE: Every exception - including a failed import of the backend
    client - is logged and converted into None
    """
    try:
        # Imported lazily, inside the try, so an import failure is caught too
        from app.api.client import get_backend_client

        result = await get_backend_client().verify_email(email)

        if not result.get('success'):
            logger.warning(f"Email not found or inactive: {email}")
            return None

        oracle_username = result.get('username')
        logger.info(f"Email verified via backend: {email} -> {oracle_username}")
        return oracle_username
    except Exception as exc:
        logger.error(f"Error verifying email via backend: {exc}", exc_info=True)
        return None
# ============================================================================
# SESSION TOKEN GENERATION (Prevent User ID Spoofing)
# ============================================================================
def generate_session_token(telegram_user_id: int, email: str) -> str:
    """
    Generate signed session token for backend verification

    This prevents user ID spoofing attacks where malicious clients
    could impersonate Telegram users by sending arbitrary user IDs

    Args:
        telegram_user_id: Telegram user ID
        email: Verified email address

    Returns:
        Token of the form "user_id:email:signature", where signature is the
        first 16 hex characters of SHA-256 over "user_id:email:secret"

    NOTE: The secret is read from AUTH_SESSION_SECRET. When that variable
    is unset, a built-in default is used and a warning is logged, because
    tokens signed with the default can be forged by anyone.

    NOTE(review): this is a truncated plain hash, not a real HMAC or JWT.
    Moving to hmac/JWT needs a coordinated change in the backend verifier,
    so the token scheme itself is intentionally left unchanged here.
    """
    import hashlib
    import os

    # Get secret from env (should match backend)
    secret = os.getenv("AUTH_SESSION_SECRET")
    if secret is None:
        # Do not fall back silently: the default value is publicly known
        logger.warning(
            "AUTH_SESSION_SECRET is not set; signing session token with the "
            "insecure built-in default secret"
        )
        secret = "change-me-in-production"

    # Signature: truncated SHA-256 over the secret-suffixed payload
    payload = f"{telegram_user_id}:{email}:{secret}"
    signature = hashlib.sha256(payload.encode()).hexdigest()[:16]

    # Token format: user_id:email:signature
    token = f"{telegram_user_id}:{email}:{signature}"
    logger.debug(f"Generated session token for user {telegram_user_id}")
    return token