Files
roa2web-service-auto/backend/modules/telegram/auth/email_auth.py
Marius Mutu c5e051ad80 feat: Migrate to ultrathin monolith architecture
Consolidate 3 separate applications (reports-app, data-entry-app, telegram-bot) into a unified
architecture with single backend and frontend:

Backend Changes:
- Unified FastAPI backend at backend/ with modular structure
- Modules: reports, data_entry, telegram in backend/modules/
- Centralized config.py and main.py with all routers registered
- Single worker mode (--workers 1) for Telegram bot compatibility
- Shared Oracle connection pool and JWT authentication
- Unified requirements.txt and environment configuration

Frontend Changes:
- Single Vue.js SPA with module-based routing
- Unified frontend at src/ with modules in src/modules/{reports,data-entry}/
- Shared components and stores in src/shared/
- Error boundaries for module isolation
- Dual API proxy in Vite for module communication

Infrastructure:
- New unified startup scripts: start-prod.sh, start-test.sh, start-backend.sh
- Environment templates: .env.dev.example, .env.test.example, .env.prod.example
- Updated deployment scripts for Windows IIS
- Simplified SSH tunnel management

Documentation:
- Comprehensive CLAUDE.md with architecture overview
- Module-specific docs in docs/{data-entry,telegram}/
- Architecture decision records in docs/ARCHITECTURE-DECISIONS.md
- Deployment guides consolidated in deployment/windows/docs/

This migration reduces complexity, improves maintainability, and enables easier
deployment while maintaining all existing functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-29 23:48:14 +02:00

172 lines
5.1 KiB
Python

"""
Email authentication logic with crypto-secure code generation
"""
import secrets
import re
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict
from collections import defaultdict
logger = logging.getLogger(__name__)
# ============================================================================
# RATE LIMITING (In-Memory)
# ============================================================================
# NOTE: For production with multiple bot instances, migrate to Redis
# See "Optional Future Enhancements" section in plan
_rate_limit_store: Dict[str, list] = defaultdict(list)
async def check_rate_limit(
identifier: str,
max_attempts: int = 3,
window_minutes: int = 60
) -> bool:
"""
Check if identifier is within rate limit
Args:
identifier: Email or telegram_user_id (as string)
max_attempts: Maximum attempts allowed
window_minutes: Time window in minutes
Returns:
True if within limit (can proceed), False if exceeded
NOTE: In-memory implementation - resets on bot restart
"""
now = datetime.now()
cutoff = now - timedelta(minutes=window_minutes)
# Clean old attempts
_rate_limit_store[identifier] = [
attempt for attempt in _rate_limit_store[identifier]
if attempt > cutoff
]
# Check limit
if len(_rate_limit_store[identifier]) >= max_attempts:
logger.warning(f"Rate limit exceeded for {identifier}")
return False
# Add new attempt
_rate_limit_store[identifier].append(now)
return True
def clear_rate_limit(identifier: str) -> None:
"""Clear rate limit for identifier (e.g., after successful auth)"""
if identifier in _rate_limit_store:
del _rate_limit_store[identifier]
logger.debug(f"Rate limit cleared for {identifier}")
# ============================================================================
# CODE GENERATION (Crypto-Secure)
# ============================================================================
def generate_email_code() -> str:
"""
Generate crypto-secure 6-digit code
Uses secrets module (not random) for cryptographic security
Returns:
6-digit string (000000 - 999999)
"""
# Generate 6-digit code using secrets (crypto-secure)
code = ''.join(secrets.choice('0123456789') for _ in range(6))
logger.debug(f"Generated email auth code (length: {len(code)})")
return code
# ============================================================================
# EMAIL VALIDATION
# ============================================================================
def is_valid_email_format(email: str) -> bool:
"""
Validate email format (basic regex)
Args:
email: Email address to validate
Returns:
True if format is valid
"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
async def verify_email_in_oracle(email: str) -> Optional[str]:
"""
Verify email exists in Oracle UTILIZATORI table via backend API
Args:
email: Email address to check
Returns:
Oracle username if found and active, None otherwise
NOTE: Uses backend API endpoint /api/telegram/auth/verify-email
"""
try:
from backend.modules.telegram.api.client import get_backend_client
backend_client = get_backend_client()
# Call backend API to verify email
response = await backend_client.verify_email(email)
if response.get('success'):
username = response.get('username')
logger.info(f"Email verified via backend: {email} -> {username}")
return username
else:
logger.warning(f"Email not found or inactive: {email}")
return None
except Exception as e:
logger.error(f"Error verifying email via backend: {e}", exc_info=True)
return None
# ============================================================================
# SESSION TOKEN GENERATION (Prevent User ID Spoofing)
# ============================================================================
def generate_session_token(telegram_user_id: int, email: str) -> str:
"""
Generate signed session token for backend verification
This prevents user ID spoofing attacks where malicious clients
could impersonate Telegram users by sending arbitrary user IDs
Args:
telegram_user_id: Telegram user ID
email: Verified email address
Returns:
Signed token (simple implementation - upgrade to JWT in future)
NOTE: For production, use proper JWT signing with shared secret
"""
import hashlib
import os
# Get secret from env (should match backend)
secret = os.getenv("AUTH_SESSION_SECRET", "change-me-in-production")
# Create signature: HMAC-like hash
payload = f"{telegram_user_id}:{email}:{secret}"
signature = hashlib.sha256(payload.encode()).hexdigest()[:16]
# Token format: user_id:email:signature
token = f"{telegram_user_id}:{email}:{signature}"
logger.debug(f"Generated session token for user {telegram_user_id}")
return token