282 lines
8.9 KiB
Python
282 lines
8.9 KiB
Python
"""
|
|
OTP (One-Time Password) Service pentru Web 2FA
|
|
|
|
Stochează OTP-urile în memorie (dict singleton).
|
|
Nu e nevoie de SQLite — OTP-urile expiră în 5 minute și backend-ul
|
|
rulează cu --workers 1 (single-worker garantat).
|
|
|
|
Utilizare:
|
|
code = await create_otp(email, username, server_id)
|
|
result = verify_otp(email, code)
|
|
# result: {"success": True, "username": ..., "server_id": ...}
|
|
# result: {"success": False, "error": ..., "error_code": ...}
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, Optional
|
|
|
|
from backend.modules.telegram.auth.email_auth import generate_email_code
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ============================================================================
|
|
# CONSTANTE
|
|
# ============================================================================
|
|
|
|
OTP_EXPIRY_MINUTES = 5 # Codul expiră după 5 minute
|
|
OTP_MAX_ATTEMPTS = 5 # Max încercări greșite per cod
|
|
OTP_MAX_SENDS_PER_WINDOW = 3 # Max coduri trimise în fereastra de rate limit
|
|
OTP_RATE_WINDOW_MINUTES = 10 # Fereastra de rate limiting (minute)
|
|
|
|
|
|
# ============================================================================
|
|
# IN-MEMORY STORE
|
|
# ============================================================================
|
|
|
|
# Structura entry:
|
|
# {
|
|
# "email@domeniu.ro": {
|
|
# "code": "483921",
|
|
# "username": "MARIUS M",
|
|
# "server_id": "romfast" | None,
|
|
# "expires_at": datetime,
|
|
# "attempts": 0,
|
|
# "send_count": 1,
|
|
# "created_at": datetime, # pentru rate limiting (fereastra de 10 min)
|
|
# }
|
|
# }
|
|
_otp_store: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
|
# ============================================================================
|
|
# FUNCȚII UTILITARE
|
|
# ============================================================================
|
|
|
|
def _mask_email(email: str) -> str:
|
|
"""
|
|
Maschează emailul pentru afișare sigură.
|
|
|
|
Exemple:
|
|
"marius@romfast.ro" → "m***@romfast.ro"
|
|
"ab@domeniu.ro" → "a***@domeniu.ro"
|
|
"x@y.com" → "x***@y.com"
|
|
"@invalid" → "***"
|
|
"""
|
|
if "@" not in email:
|
|
return "***"
|
|
local, domain = email.split("@", 1)
|
|
masked_local = (local[0] + "***") if local else "***"
|
|
return f"{masked_local}@{domain}"
|
|
|
|
|
|
def _is_rate_limited(email: str) -> bool:
|
|
"""
|
|
Verifică dacă emailul a depășit limita de trimiteri OTP.
|
|
|
|
Limita: OTP_MAX_SENDS_PER_WINDOW trimiteri în OTP_RATE_WINDOW_MINUTES minute.
|
|
Returnează True dacă este rate limited (nu mai poate trimite cod).
|
|
"""
|
|
entry = _otp_store.get(email)
|
|
if not entry:
|
|
return False
|
|
|
|
# Dacă entry-ul a expirat, îl ștergem și permitem un nou OTP
|
|
if datetime.now() > entry["expires_at"]:
|
|
del _otp_store[email]
|
|
return False
|
|
|
|
# Verificăm fereastra de rate limit (separată de expiry-ul codului)
|
|
window_end = entry["created_at"] + timedelta(minutes=OTP_RATE_WINDOW_MINUTES)
|
|
if datetime.now() > window_end:
|
|
# Fereastra de rate limit a expirat — permitem trimitere nouă
|
|
return False
|
|
|
|
return entry.get("send_count", 0) >= OTP_MAX_SENDS_PER_WINDOW
|
|
|
|
|
|
def _cleanup_expired() -> None:
|
|
"""Curăță OTP-urile expirate din store. Apelat automat la create_otp."""
|
|
now = datetime.now()
|
|
expired = [email for email, entry in _otp_store.items() if now > entry["expires_at"]]
|
|
for email in expired:
|
|
del _otp_store[email]
|
|
logger.debug(f"[OTP] Cleaned up expired OTP for {email}")
|
|
|
|
|
|
# ============================================================================
|
|
# API PUBLICĂ
|
|
# ============================================================================
|
|
|
|
async def create_otp(
|
|
email: str,
|
|
username: str,
|
|
server_id: Optional[str]
|
|
) -> Optional[str]:
|
|
"""
|
|
Generează și stochează un OTP nou pentru email.
|
|
|
|
Args:
|
|
email: Adresa de email a utilizatorului (lowercase)
|
|
username: Username Oracle (uppercase) — salvat pentru JWT la verificare
|
|
server_id: ID-ul serverului Oracle (pentru multi-server mode)
|
|
|
|
Returns:
|
|
Codul generat (str, 6 cifre) sau None dacă este rate limited
|
|
"""
|
|
email = email.lower().strip()
|
|
|
|
# Curățăm OTP-urile expirate periodic
|
|
_cleanup_expired()
|
|
|
|
# Verificăm rate limiting
|
|
if _is_rate_limited(email):
|
|
logger.warning(f"[OTP] Rate limit exceeded for {email[:3]}*** (max {OTP_MAX_SENDS_PER_WINDOW} sends/{OTP_RATE_WINDOW_MINUTES}min)")
|
|
return None
|
|
|
|
# Generăm cod crypto-secure (6 cifre)
|
|
code = generate_email_code()
|
|
|
|
now = datetime.now()
|
|
existing = _otp_store.get(email)
|
|
|
|
# Păstrăm created_at și send_count din entry-ul anterior (pentru rate limiting corect)
|
|
if existing:
|
|
send_count = existing.get("send_count", 0) + 1
|
|
created_at = existing.get("created_at", now)
|
|
else:
|
|
send_count = 1
|
|
created_at = now
|
|
|
|
_otp_store[email] = {
|
|
"code": code,
|
|
"username": username,
|
|
"server_id": server_id,
|
|
"expires_at": now + timedelta(minutes=OTP_EXPIRY_MINUTES),
|
|
"attempts": 0,
|
|
"send_count": send_count,
|
|
"created_at": created_at,
|
|
}
|
|
|
|
logger.info(f"[OTP] Created OTP for {email[:3]}*** (username={username}, server={server_id}, send_count={send_count})")
|
|
return code
|
|
|
|
|
|
def verify_otp(email: str, code: str) -> Dict[str, Any]:
|
|
"""
|
|
Verifică OTP-ul introdus de utilizator.
|
|
|
|
Args:
|
|
email: Adresa de email
|
|
code: Codul de 6 cifre introdus de utilizator
|
|
|
|
Returns:
|
|
Succes: {"success": True, "username": "MARIUS M", "server_id": "romfast"}
|
|
Eroare: {"success": False, "error": "...", "error_code": "..."}
|
|
|
|
Error codes:
|
|
OTP_NOT_FOUND — nu există OTP activ pentru email
|
|
OTP_EXPIRED — OTP-ul a expirat (5 minute)
|
|
OTP_MAX_ATTEMPTS — prea multe încercări greșite (5)
|
|
OTP_INVALID — codul este greșit
|
|
"""
|
|
email = email.lower().strip()
|
|
entry = _otp_store.get(email)
|
|
|
|
# OTP inexistent
|
|
if not entry:
|
|
logger.warning(f"[OTP] No active OTP for {email[:3]}***")
|
|
return {
|
|
"success": False,
|
|
"error": "Cod invalid sau expirat. Solicitați un cod nou.",
|
|
"error_code": "OTP_NOT_FOUND",
|
|
}
|
|
|
|
# OTP expirat
|
|
if datetime.now() > entry["expires_at"]:
|
|
delete_otp(email)
|
|
logger.info(f"[OTP] Expired OTP attempt for {email[:3]}***")
|
|
return {
|
|
"success": False,
|
|
"error": "Codul a expirat. Solicitați un cod nou.",
|
|
"error_code": "OTP_EXPIRED",
|
|
}
|
|
|
|
# Prea multe încercări greșite
|
|
if entry["attempts"] >= OTP_MAX_ATTEMPTS:
|
|
delete_otp(email)
|
|
logger.warning(f"[OTP] Max attempts reached for {email[:3]}***")
|
|
return {
|
|
"success": False,
|
|
"error": "Prea multe încercări greșite. Solicitați un cod nou.",
|
|
"error_code": "OTP_MAX_ATTEMPTS",
|
|
}
|
|
|
|
# Incrementăm contorul de încercări ÎNAINTE de verificare
|
|
entry["attempts"] += 1
|
|
|
|
# Cod greșit
|
|
if entry["code"] != code.strip():
|
|
remaining = OTP_MAX_ATTEMPTS - entry["attempts"]
|
|
logger.warning(f"[OTP] Invalid code for {email[:3]}*** (attempt {entry['attempts']}, {remaining} remaining)")
|
|
|
|
# La ultima încercare permisă, blocăm imediat și ștergem OTP-ul
|
|
if remaining <= 0:
|
|
delete_otp(email)
|
|
return {
|
|
"success": False,
|
|
"error": "Prea multe încercări greșite. Solicitați un cod nou.",
|
|
"error_code": "OTP_MAX_ATTEMPTS",
|
|
}
|
|
|
|
return {
|
|
"success": False,
|
|
"error": f"Cod incorect. Mai aveți {remaining} {'încercare' if remaining == 1 else 'încercări'}.",
|
|
"error_code": "OTP_INVALID",
|
|
}
|
|
|
|
# Cod corect — ștergem din store și returnăm datele salvate
|
|
username = entry["username"]
|
|
server_id = entry["server_id"]
|
|
delete_otp(email)
|
|
|
|
logger.info(f"[OTP] OTP verified successfully for {email[:3]}*** (username={username})")
|
|
return {
|
|
"success": True,
|
|
"username": username,
|
|
"server_id": server_id,
|
|
}
|
|
|
|
|
|
def delete_otp(email: str) -> None:
|
|
"""
|
|
Șterge OTP-ul pentru email din store.
|
|
Apelat după verificare reușită sau când utilizatorul renunță.
|
|
"""
|
|
email = email.lower().strip()
|
|
if email in _otp_store:
|
|
del _otp_store[email]
|
|
logger.debug(f"[OTP] Deleted OTP for {email[:3]}***")
|
|
|
|
|
|
def get_otp_entry(email: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Returnează entry-ul OTP pentru email (read-only).
|
|
Folosit de endpoint-ul /resend-2fa-code pentru a verifica că sesiunea există.
|
|
|
|
Returns:
|
|
Dict cu datele OTP sau None dacă nu există / a expirat
|
|
"""
|
|
email = email.lower().strip()
|
|
entry = _otp_store.get(email)
|
|
|
|
if not entry:
|
|
return None
|
|
|
|
# Verificăm că nu e expirat
|
|
if datetime.now() > entry["expires_at"]:
|
|
delete_otp(email)
|
|
return None
|
|
|
|
return entry
|