""" OTP (One-Time Password) Service pentru Web 2FA Stochează OTP-urile în memorie (dict singleton). Nu e nevoie de SQLite — OTP-urile expiră în 5 minute și backend-ul rulează cu --workers 1 (single-worker garantat). Utilizare: code = await create_otp(email, username, server_id) result = verify_otp(email, code) # result: {"success": True, "username": ..., "server_id": ...} # result: {"success": False, "error": ..., "error_code": ...} """ import logging from datetime import datetime, timedelta from typing import Any, Dict, Optional from backend.modules.telegram.auth.email_auth import generate_email_code logger = logging.getLogger(__name__) # ============================================================================ # CONSTANTE # ============================================================================ OTP_EXPIRY_MINUTES = 5 # Codul expiră după 5 minute OTP_MAX_ATTEMPTS = 5 # Max încercări greșite per cod OTP_MAX_SENDS_PER_WINDOW = 3 # Max coduri trimise în fereastra de rate limit OTP_RATE_WINDOW_MINUTES = 10 # Fereastra de rate limiting (minute) # ============================================================================ # IN-MEMORY STORE # ============================================================================ # Structura entry: # { # "email@domeniu.ro": { # "code": "483921", # "username": "MARIUS M", # "server_id": "romfast" | None, # "expires_at": datetime, # "attempts": 0, # "send_count": 1, # "created_at": datetime, # pentru rate limiting (fereastra de 10 min) # } # } _otp_store: Dict[str, Dict[str, Any]] = {} # ============================================================================ # FUNCȚII UTILITARE # ============================================================================ def _mask_email(email: str) -> str: """ Maschează emailul pentru afișare sigură. Exemple: "marius@romfast.ro" → "m***@romfast.ro" "ab@domeniu.ro" → "a***@domeniu.ro" "x@y.com" → "x***@y.com" "@invalid" → "***" """ if "@" not in email: return "***" local, domain = email.split("@", 1) masked_local = (local[0] + "***") if local else "***" return f"{masked_local}@{domain}" def _is_rate_limited(email: str) -> bool: """ Verifică dacă emailul a depășit limita de trimiteri OTP. Limita: OTP_MAX_SENDS_PER_WINDOW trimiteri în OTP_RATE_WINDOW_MINUTES minute. Returnează True dacă este rate limited (nu mai poate trimite cod). """ entry = _otp_store.get(email) if not entry: return False # Dacă entry-ul a expirat, îl ștergem și permitem un nou OTP if datetime.now() > entry["expires_at"]: del _otp_store[email] return False # Verificăm fereastra de rate limit (separată de expiry-ul codului) window_end = entry["created_at"] + timedelta(minutes=OTP_RATE_WINDOW_MINUTES) if datetime.now() > window_end: # Fereastra de rate limit a expirat — permitem trimitere nouă return False return entry.get("send_count", 0) >= OTP_MAX_SENDS_PER_WINDOW def _cleanup_expired() -> None: """Curăță OTP-urile expirate din store. Apelat automat la create_otp.""" now = datetime.now() expired = [email for email, entry in _otp_store.items() if now > entry["expires_at"]] for email in expired: del _otp_store[email] logger.debug(f"[OTP] Cleaned up expired OTP for {email}") # ============================================================================ # API PUBLICĂ # ============================================================================ async def create_otp( email: str, username: str, server_id: Optional[str] ) -> Optional[str]: """ Generează și stochează un OTP nou pentru email. Args: email: Adresa de email a utilizatorului (lowercase) username: Username Oracle (uppercase) — salvat pentru JWT la verificare server_id: ID-ul serverului Oracle (pentru multi-server mode) Returns: Codul generat (str, 6 cifre) sau None dacă este rate limited """ email = email.lower().strip() # Curățăm OTP-urile expirate periodic _cleanup_expired() # Verificăm rate limiting if _is_rate_limited(email): logger.warning(f"[OTP] Rate limit exceeded for {email[:3]}*** (max {OTP_MAX_SENDS_PER_WINDOW} sends/{OTP_RATE_WINDOW_MINUTES}min)") return None # Generăm cod crypto-secure (6 cifre) code = generate_email_code() now = datetime.now() existing = _otp_store.get(email) # Păstrăm created_at și send_count din entry-ul anterior (pentru rate limiting corect) if existing: send_count = existing.get("send_count", 0) + 1 created_at = existing.get("created_at", now) else: send_count = 1 created_at = now _otp_store[email] = { "code": code, "username": username, "server_id": server_id, "expires_at": now + timedelta(minutes=OTP_EXPIRY_MINUTES), "attempts": 0, "send_count": send_count, "created_at": created_at, } logger.info(f"[OTP] Created OTP for {email[:3]}*** (username={username}, server={server_id}, send_count={send_count})") return code def verify_otp(email: str, code: str) -> Dict[str, Any]: """ Verifică OTP-ul introdus de utilizator. Args: email: Adresa de email code: Codul de 6 cifre introdus de utilizator Returns: Succes: {"success": True, "username": "MARIUS M", "server_id": "romfast"} Eroare: {"success": False, "error": "...", "error_code": "..."} Error codes: OTP_NOT_FOUND — nu există OTP activ pentru email OTP_EXPIRED — OTP-ul a expirat (5 minute) OTP_MAX_ATTEMPTS — prea multe încercări greșite (5) OTP_INVALID — codul este greșit """ email = email.lower().strip() entry = _otp_store.get(email) # OTP inexistent if not entry: logger.warning(f"[OTP] No active OTP for {email[:3]}***") return { "success": False, "error": "Cod invalid sau expirat. Solicitați un cod nou.", "error_code": "OTP_NOT_FOUND", } # OTP expirat if datetime.now() > entry["expires_at"]: delete_otp(email) logger.info(f"[OTP] Expired OTP attempt for {email[:3]}***") return { "success": False, "error": "Codul a expirat. Solicitați un cod nou.", "error_code": "OTP_EXPIRED", } # Prea multe încercări greșite if entry["attempts"] >= OTP_MAX_ATTEMPTS: delete_otp(email) logger.warning(f"[OTP] Max attempts reached for {email[:3]}***") return { "success": False, "error": "Prea multe încercări greșite. Solicitați un cod nou.", "error_code": "OTP_MAX_ATTEMPTS", } # Incrementăm contorul de încercări ÎNAINTE de verificare entry["attempts"] += 1 # Cod greșit if entry["code"] != code.strip(): remaining = OTP_MAX_ATTEMPTS - entry["attempts"] logger.warning(f"[OTP] Invalid code for {email[:3]}*** (attempt {entry['attempts']}, {remaining} remaining)") # La ultima încercare permisă, blocăm imediat și ștergem OTP-ul if remaining <= 0: delete_otp(email) return { "success": False, "error": "Prea multe încercări greșite. Solicitați un cod nou.", "error_code": "OTP_MAX_ATTEMPTS", } return { "success": False, "error": f"Cod incorect. Mai aveți {remaining} {'încercare' if remaining == 1 else 'încercări'}.", "error_code": "OTP_INVALID", } # Cod corect — ștergem din store și returnăm datele salvate username = entry["username"] server_id = entry["server_id"] delete_otp(email) logger.info(f"[OTP] OTP verified successfully for {email[:3]}*** (username={username})") return { "success": True, "username": username, "server_id": server_id, } def delete_otp(email: str) -> None: """ Șterge OTP-ul pentru email din store. Apelat după verificare reușită sau când utilizatorul renunță. """ email = email.lower().strip() if email in _otp_store: del _otp_store[email] logger.debug(f"[OTP] Deleted OTP for {email[:3]}***") def get_otp_entry(email: str) -> Optional[Dict[str, Any]]: """ Returnează entry-ul OTP pentru email (read-only). Folosit de endpoint-ul /resend-2fa-code pentru a verifica că sesiunea există. Returns: Dict cu datele OTP sau None dacă nu există / a expirat """ email = email.lower().strip() entry = _otp_store.get(email) if not entry: return None # Verificăm că nu e expirat if datetime.now() > entry["expires_at"]: delete_otp(email) return None return entry