""" Email authentication logic with crypto-secure code generation """ import secrets import re import logging from datetime import datetime, timedelta from typing import Optional, Dict from collections import defaultdict logger = logging.getLogger(__name__) # ============================================================================ # RATE LIMITING (In-Memory) # ============================================================================ # NOTE: For production with multiple bot instances, migrate to Redis # See "Optional Future Enhancements" section in plan _rate_limit_store: Dict[str, list] = defaultdict(list) async def check_rate_limit( identifier: str, max_attempts: int = 3, window_minutes: int = 60 ) -> bool: """ Check if identifier is within rate limit Args: identifier: Email or telegram_user_id (as string) max_attempts: Maximum attempts allowed window_minutes: Time window in minutes Returns: True if within limit (can proceed), False if exceeded NOTE: In-memory implementation - resets on bot restart """ now = datetime.now() cutoff = now - timedelta(minutes=window_minutes) # Clean old attempts _rate_limit_store[identifier] = [ attempt for attempt in _rate_limit_store[identifier] if attempt > cutoff ] # Check limit if len(_rate_limit_store[identifier]) >= max_attempts: logger.warning(f"Rate limit exceeded for {identifier}") return False # Add new attempt _rate_limit_store[identifier].append(now) return True def clear_rate_limit(identifier: str) -> None: """Clear rate limit for identifier (e.g., after successful auth)""" if identifier in _rate_limit_store: del _rate_limit_store[identifier] logger.debug(f"Rate limit cleared for {identifier}") # ============================================================================ # CODE GENERATION (Crypto-Secure) # ============================================================================ def generate_email_code() -> str: """ Generate crypto-secure 6-digit code Uses secrets module (not random) for cryptographic security Returns: 6-digit string (000000 - 999999) """ # Generate 6-digit code using secrets (crypto-secure) code = ''.join(secrets.choice('0123456789') for _ in range(6)) logger.debug(f"Generated email auth code (length: {len(code)})") return code # ============================================================================ # EMAIL VALIDATION # ============================================================================ def is_valid_email_format(email: str) -> bool: """ Validate email format (basic regex) Args: email: Email address to validate Returns: True if format is valid """ pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) async def verify_email_in_oracle(email: str) -> Optional[str]: """ Verify email exists in Oracle UTILIZATORI table via backend API Args: email: Email address to check Returns: Oracle username if found and active, None otherwise NOTE: Uses backend API endpoint /api/telegram/auth/verify-email """ try: from app.api.client import get_backend_client backend_client = get_backend_client() # Call backend API to verify email response = await backend_client.verify_email(email) if response.get('success'): username = response.get('username') logger.info(f"Email verified via backend: {email} -> {username}") return username else: logger.warning(f"Email not found or inactive: {email}") return None except Exception as e: logger.error(f"Error verifying email via backend: {e}", exc_info=True) return None # ============================================================================ # SESSION TOKEN GENERATION (Prevent User ID Spoofing) # ============================================================================ def generate_session_token(telegram_user_id: int, email: str) -> str: """ Generate signed session token for backend verification This prevents user ID spoofing attacks where malicious clients could impersonate Telegram users by sending arbitrary user IDs Args: telegram_user_id: Telegram user ID email: Verified email address Returns: Signed token (simple implementation - upgrade to JWT in future) NOTE: For production, use proper JWT signing with shared secret """ import hashlib import os # Get secret from env (should match backend) secret = os.getenv("AUTH_SESSION_SECRET", "change-me-in-production") # Create signature: HMAC-like hash payload = f"{telegram_user_id}:{email}:{secret}" signature = hashlib.sha256(payload.encode()).hexdigest()[:16] # Token format: user_id:email:signature token = f"{telegram_user_id}:{email}:{signature}" logger.debug(f"Generated session token for user {telegram_user_id}") return token