Files
roa2web-service-auto/backend/modules/telegram/auth/email_auth.py
Claude Agent 30f55cf18b feat: Add A-Z filter for clients/suppliers in Telegram bot
- Add A-Z alphabetical filter keyboard for clients and suppliers lists
  (same pattern as company selection, without emoji)
- Increase clients/suppliers list pagination from 10 to 20 items per page
- Remove emoji from company A-Z filter button for consistency
- Add 6 new callback handlers: clients_alpha_menu, clients_alpha:LETTER,
  clients_alpha_page:PAGE:LETTER, and supplier equivalents
- Dashboard service and models updates
- Telegram bot: email handlers, auth, DB operations, internal API improvements
- Frontend: dashboard cards updates (CashFlow, Clienti, Furnizori, Treasury)
- Frontend: SolduriCompactCard and CollapsibleCard improvements
- DashboardView enhancements
- start.sh and run-with-restart.sh script updates
- IIS web.config and service worker updates

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 14:34:15 +00:00

173 lines
5.2 KiB
Python

"""
Email authentication logic with crypto-secure code generation
"""
import secrets
import re
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict
from collections import defaultdict
logger = logging.getLogger(__name__)
# ============================================================================
# RATE LIMITING (In-Memory)
# ============================================================================
# NOTE: For production with multiple bot instances, migrate to Redis
# See "Optional Future Enhancements" section in plan
_rate_limit_store: Dict[str, list] = defaultdict(list)
async def check_rate_limit(
identifier: str,
max_attempts: int = 3,
window_minutes: int = 60
) -> bool:
"""
Check if identifier is within rate limit
Args:
identifier: Email or telegram_user_id (as string)
max_attempts: Maximum attempts allowed
window_minutes: Time window in minutes
Returns:
True if within limit (can proceed), False if exceeded
NOTE: In-memory implementation - resets on bot restart
"""
now = datetime.now()
cutoff = now - timedelta(minutes=window_minutes)
# Clean old attempts
_rate_limit_store[identifier] = [
attempt for attempt in _rate_limit_store[identifier]
if attempt > cutoff
]
# Check limit
if len(_rate_limit_store[identifier]) >= max_attempts:
logger.warning(f"Rate limit exceeded for {identifier}")
return False
# Add new attempt
_rate_limit_store[identifier].append(now)
return True
def clear_rate_limit(identifier: str) -> None:
"""Clear rate limit for identifier (e.g., after successful auth)"""
if identifier in _rate_limit_store:
del _rate_limit_store[identifier]
logger.debug(f"Rate limit cleared for {identifier}")
# ============================================================================
# CODE GENERATION (Crypto-Secure)
# ============================================================================
def generate_email_code() -> str:
"""
Generate crypto-secure 6-digit code
Uses secrets module (not random) for cryptographic security
Returns:
6-digit string (000000 - 999999)
"""
# Generate 6-digit code using secrets (crypto-secure)
code = ''.join(secrets.choice('0123456789') for _ in range(6))
logger.debug(f"Generated email auth code (length: {len(code)})")
return code
# ============================================================================
# EMAIL VALIDATION
# ============================================================================
def is_valid_email_format(email: str) -> bool:
"""
Validate email format (basic regex)
Args:
email: Email address to validate
Returns:
True if format is valid
"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
async def verify_email_in_oracle(email: str, server_id: Optional[str] = None) -> Optional[str]:
"""
Verify email exists in Oracle UTILIZATORI table via backend API
Args:
email: Email address to check
server_id: Optional Oracle server ID (for multi-server mode)
Returns:
Oracle username if found and active, None otherwise
NOTE: Uses backend API endpoint /api/telegram/auth/verify-email
"""
try:
from backend.modules.telegram.api.client import get_backend_client
backend_client = get_backend_client()
# Call backend API to verify email (on specific server if provided)
response = await backend_client.verify_email(email, server_id=server_id)
if response.get('success'):
username = response.get('username')
logger.info(f"Email verified via backend: {email} -> {username}")
return username
else:
logger.warning(f"Email not found or inactive: {email}")
return None
except Exception as e:
logger.error(f"Error verifying email via backend: {e}", exc_info=True)
return None
# ============================================================================
# SESSION TOKEN GENERATION (Prevent User ID Spoofing)
# ============================================================================
def generate_session_token(telegram_user_id: int, email: str) -> str:
"""
Generate signed session token for backend verification
This prevents user ID spoofing attacks where malicious clients
could impersonate Telegram users by sending arbitrary user IDs
Args:
telegram_user_id: Telegram user ID
email: Verified email address
Returns:
Signed token (simple implementation - upgrade to JWT in future)
NOTE: For production, use proper JWT signing with shared secret
"""
import hashlib
import os
# Get secret from env (should match backend)
secret = os.getenv("AUTH_SESSION_SECRET", "change-me-in-production")
# Create signature: HMAC-like hash
payload = f"{telegram_user_id}:{email}:{secret}"
signature = hashlib.sha256(payload.encode()).hexdigest()[:16]
# Token format: user_id:email:signature
token = f"{telegram_user_id}:{email}:{signature}"
logger.debug(f"Generated session token for user {telegram_user_id}")
return token