fix telegram
This commit is contained in:
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
Email authentication logic with crypto-secure code generation
|
||||
"""
|
||||
import secrets
|
||||
import re
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict
|
||||
from collections import defaultdict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# RATE LIMITING (In-Memory)
|
||||
# ============================================================================
|
||||
# NOTE: For production with multiple bot instances, migrate to Redis
|
||||
# See "Optional Future Enhancements" section in plan
|
||||
|
||||
_rate_limit_store: Dict[str, list] = defaultdict(list)
|
||||
|
||||
|
||||
async def check_rate_limit(
|
||||
identifier: str,
|
||||
max_attempts: int = 3,
|
||||
window_minutes: int = 60
|
||||
) -> bool:
|
||||
"""
|
||||
Check if identifier is within rate limit
|
||||
|
||||
Args:
|
||||
identifier: Email or telegram_user_id (as string)
|
||||
max_attempts: Maximum attempts allowed
|
||||
window_minutes: Time window in minutes
|
||||
|
||||
Returns:
|
||||
True if within limit (can proceed), False if exceeded
|
||||
|
||||
NOTE: In-memory implementation - resets on bot restart
|
||||
"""
|
||||
now = datetime.now()
|
||||
cutoff = now - timedelta(minutes=window_minutes)
|
||||
|
||||
# Clean old attempts
|
||||
_rate_limit_store[identifier] = [
|
||||
attempt for attempt in _rate_limit_store[identifier]
|
||||
if attempt > cutoff
|
||||
]
|
||||
|
||||
# Check limit
|
||||
if len(_rate_limit_store[identifier]) >= max_attempts:
|
||||
logger.warning(f"Rate limit exceeded for {identifier}")
|
||||
return False
|
||||
|
||||
# Add new attempt
|
||||
_rate_limit_store[identifier].append(now)
|
||||
return True
|
||||
|
||||
|
||||
def clear_rate_limit(identifier: str) -> None:
|
||||
"""Clear rate limit for identifier (e.g., after successful auth)"""
|
||||
if identifier in _rate_limit_store:
|
||||
del _rate_limit_store[identifier]
|
||||
logger.debug(f"Rate limit cleared for {identifier}")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CODE GENERATION (Crypto-Secure)
|
||||
# ============================================================================
|
||||
|
||||
def generate_email_code() -> str:
|
||||
"""
|
||||
Generate crypto-secure 6-digit code
|
||||
|
||||
Uses secrets module (not random) for cryptographic security
|
||||
|
||||
Returns:
|
||||
6-digit string (000000 - 999999)
|
||||
"""
|
||||
# Generate 6-digit code using secrets (crypto-secure)
|
||||
code = ''.join(secrets.choice('0123456789') for _ in range(6))
|
||||
|
||||
logger.debug(f"Generated email auth code (length: {len(code)})")
|
||||
return code
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# EMAIL VALIDATION
|
||||
# ============================================================================
|
||||
|
||||
def is_valid_email_format(email: str) -> bool:
|
||||
"""
|
||||
Validate email format (basic regex)
|
||||
|
||||
Args:
|
||||
email: Email address to validate
|
||||
|
||||
Returns:
|
||||
True if format is valid
|
||||
"""
|
||||
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
||||
return bool(re.match(pattern, email))
|
||||
|
||||
|
||||
async def verify_email_in_oracle(email: str, server_id: Optional[str] = None) -> Optional[str]:
|
||||
"""
|
||||
Verify email exists in Oracle UTILIZATORI table via backend API
|
||||
|
||||
Args:
|
||||
email: Email address to check
|
||||
server_id: Optional Oracle server ID (for multi-server mode)
|
||||
|
||||
Returns:
|
||||
Oracle username if found and active, None otherwise
|
||||
|
||||
NOTE: Uses backend API endpoint /api/telegram/auth/verify-email
|
||||
"""
|
||||
try:
|
||||
from backend.modules.telegram.api.client import get_backend_client
|
||||
|
||||
backend_client = get_backend_client()
|
||||
|
||||
# Call backend API to verify email (on specific server if provided)
|
||||
response = await backend_client.verify_email(email, server_id=server_id)
|
||||
|
||||
if response.get('success'):
|
||||
username = response.get('username')
|
||||
logger.info(f"Email verified via backend: {email} -> {username}")
|
||||
return username
|
||||
else:
|
||||
logger.warning(f"Email not found or inactive: {email}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error verifying email via backend: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# SESSION TOKEN GENERATION (Prevent User ID Spoofing)
|
||||
# ============================================================================
|
||||
|
||||
def generate_session_token(telegram_user_id: int, email: str) -> str:
|
||||
"""
|
||||
Generate signed session token for backend verification
|
||||
|
||||
This prevents user ID spoofing attacks where malicious clients
|
||||
could impersonate Telegram users by sending arbitrary user IDs
|
||||
|
||||
Args:
|
||||
telegram_user_id: Telegram user ID
|
||||
email: Verified email address
|
||||
|
||||
Returns:
|
||||
Signed token (simple implementation - upgrade to JWT in future)
|
||||
|
||||
NOTE: For production, use proper JWT signing with shared secret
|
||||
"""
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
# Get secret from env (should match backend)
|
||||
secret = os.getenv("AUTH_SESSION_SECRET", "change-me-in-production")
|
||||
|
||||
# Create signature: HMAC-like hash
|
||||
payload = f"{telegram_user_id}:{email}:{secret}"
|
||||
signature = hashlib.sha256(payload.encode()).hexdigest()[:16]
|
||||
|
||||
# Token format: user_id:email:signature
|
||||
token = f"{telegram_user_id}:{email}:{signature}"
|
||||
|
||||
logger.debug(f"Generated session token for user {telegram_user_id}")
|
||||
return token
|
||||
Reference in New Issue
Block a user