- Persist 2FA state in sessionStorage to survive mobile page reloads - Reuse existing valid OTP on re-login to avoid rate limiting and duplicate emails - Add embedded sparkline charts to SolduriCompactCard with expand toggle - Mobile dashboard redesigned: 2 pages with enriched compact cards + cashflow type - Login UI simplified: remove gradient bg, subtitle, icon; use design tokens - Focus OTP input when session is restored from 2FA state Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1021 lines
41 KiB
Python
1021 lines
41 KiB
Python
"""
Authentication route templates for ROA2WEB FastAPI applications.

This module provides predefined authentication routes that can be integrated
into any FastAPI application in the ROA2WEB ecosystem.

Available endpoints:
- POST /auth/login - User authentication
- POST /auth/refresh - Refresh the access token
- POST /auth/logout - User logout
- GET /auth/me - Current user information
- GET /auth/companies - The user's companies
- GET /auth/status - Authentication status
"""
|
|
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request, Response
|
|
from fastapi.security import HTTPAuthorizationCredentials
|
|
|
|
from .models import (
|
|
LoginRequest, TokenResponse, RefreshTokenRequest, LogoutRequest,
|
|
CurrentUser, UserCompany, CompanyAccessRequest, CompanyAccessResponse,
|
|
AuthError, AuthStats, CheckEmailRequest, CheckEmailResponse, ServerInfo,
|
|
CheckIdentityRequest, CheckIdentityResponse,
|
|
LoginRequires2FAResponse, Verify2FARequest, Resend2FARequest,
|
|
VerifyBackupCodeRequest,
|
|
)
|
|
from .auth_service import auth_service, AuthenticationError
|
|
from .jwt_handler import jwt_handler
|
|
from .dependencies import (
|
|
get_current_user, get_optional_user,
|
|
security_required, security_optional
|
|
)
|
|
from .middleware import default_rate_limiter, RateLimiter
|
|
from .otp_service import (
|
|
create_otp, verify_otp, get_otp_entry, _mask_email
|
|
)
|
|
from .trusted_device_service import (
|
|
create_trusted_device_token, verify_trusted_device_token
|
|
)
|
|
from .backup_codes_service import (
|
|
generate_backup_codes, verify_backup_code,
|
|
has_backup_codes, get_remaining_count
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_auth_router(
    prefix: str = "/auth",
    tags: Optional[List[str]] = None,
    include_admin_routes: bool = False
) -> APIRouter:
    """
    Build a FastAPI router that carries all the authentication routes.

    Args:
        prefix: Path prefix applied to every route.
        tags: OpenAPI documentation tags; ["authentication"] when omitted or empty.
        include_admin_routes: Whether the admin routes are registered as well.

    Returns:
        The configured FastAPI router.
    """
    router_tags = tags if tags else ["authentication"]
    router = APIRouter(prefix=prefix, tags=router_tags)

    # Rate limiter shared by check-identity/check-email: 5 requests per minute per IP
    check_identity_rate_limiter = RateLimiter(max_requests=5, time_window=60)

    # Rate limiters for the 2FA endpoints
    verify_2fa_rate_limiter = RateLimiter(max_requests=10, time_window=300)  # 10 req / 5 min per IP
    resend_2fa_rate_limiter = RateLimiter(max_requests=3, time_window=600)  # 3 req / 10 min per IP
|
|
|
|
# -------------------------------------------------------------------------
|
|
# HELPER FUNCTIONS (private, în scope-ul create_auth_router)
|
|
# -------------------------------------------------------------------------
|
|
|
|
async def _get_email_for_username(username: str, server_id: Optional[str]) -> Optional[str]:
    """
    Look up a user's e-mail address in Oracle by username.

    Returns the lowercased, trimmed address, or None when no active,
    non-deleted row with a usable address is found. Any exception raised by
    the lookup is logged and also results in None.
    """
    lookup_sql = """
        SELECT LOWER(TRIM(EMAIL))
        FROM CONTAFIN_ORACLE.UTILIZATORI
        WHERE UPPER(UTILIZATOR) = :username
        AND INACTIV = 0
        AND STERS = 0
        AND EMAIL IS NOT NULL
        AND TRIM(EMAIL) IS NOT NULL
    """
    try:
        from shared.database.oracle_pool import oracle_pool

        async with oracle_pool.get_connection(server_id) as connection:
            with connection.cursor() as cursor:
                cursor.execute(lookup_sql, {"username": username.upper()})
                first_row = cursor.fetchone()

                candidate = first_row[0] if first_row else None
                # Only accept values that at least look like an address
                if candidate and "@" in candidate:
                    return candidate.strip()
                return None
    except Exception as e:
        logger.error(f"[2FA] Error getting email for username '{username}': {e}")
        return None
|
|
|
|
async def _create_token_response_for_user(
    username: str,
    server_id: Optional[str],
    response: Response
) -> TokenResponse:
    """
    Assemble the full TokenResponse for a user that is already verified.

    Used once the second factor (OTP, backup code or trusted device) has been
    accepted, so the password is not checked again here. Also sets the
    X-Content-Type-Options and X-Frame-Options headers on ``response``.
    """
    user_companies = await auth_service.get_user_companies(username, server_id)

    # Permissions are resolved against the first company ("" when there is none)
    first_company = user_companies[0] if user_companies else ""
    user_permissions = await auth_service.get_user_permissions(
        username, first_company, server_id
    )

    tokens = jwt_handler.create_token_response(
        username=username,
        companies=user_companies,
        user_id=None,
        permissions=user_permissions,
        server_id=server_id,
    )

    user = CurrentUser(
        username=username,
        user_id=None,
        companies=user_companies,
        permissions=user_permissions,
    )

    for header_name, header_value in (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
    ):
        response.headers[header_name] = header_value

    return TokenResponse(
        access_token=tokens.access_token,
        refresh_token=tokens.refresh_token,
        token_type=tokens.token_type,
        expires_in=tokens.expires_in,
        user=user,
    )
|
|
|
|
@router.post("/check-identity", response_model=CheckIdentityResponse, status_code=status.HTTP_200_OK)
async def check_identity(
    check_data: CheckIdentityRequest,
    request: Request
) -> CheckIdentityResponse:
    """
    Check whether an e-mail or username exists and on which Oracle servers (US-013).

    Dual login support:
    - Input containing '@' is treated as an e-mail and looked up in the
      e-mail/server cache.
    - Input without '@' is treated as a username and resolved through the
      cache's async username lookup.

    SECURITY:
    - Rate limited: at most 5 requests/minute per client IP.
    - Unknown identities get {exists: false, servers: []}, so the available
      servers are never exposed for them.

    Args:
        check_data: Identity to check (e-mail or username).
        request: HTTP request, used for per-IP rate limiting.

    Returns:
        CheckIdentityResponse with exists, servers[] and identity_type.

    Raises:
        HTTPException 429: Rate limit exceeded.
        HTTPException 500: Unexpected error during the lookup.
    """
    import time  # function-local so the block does not depend on a file-level import

    # Rate limiting - 5 req/min per IP
    client_ip = request.client.host if request.client else "unknown"

    if not check_identity_rate_limiter.is_allowed(client_ip):
        reset_time = check_identity_rate_limiter.get_reset_time(client_ip)
        logger.warning(f"Rate limit exceeded for check-identity from IP {client_ip}")
        # presumably reset_time is an epoch timestamp in seconds; never advertise < 1s
        retry_after = max(1, reset_time - int(time.time()))
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Too many requests. Please try again later.",
            headers={
                "X-RateLimit-Limit": "5",
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_time),
                "Retry-After": str(retry_after)
            }
        )

    try:
        from .email_server_cache import email_server_cache
        from backend.config import settings

        identity = check_data.identity  # Already normalized by validator
        is_email = '@' in identity

        identity_type = "email" if is_email else "username"
        logger.info(f"Check-identity request for '{identity}' (type: {identity_type}) from IP {client_ip}")

        # Get server IDs based on identity type
        if is_email:
            # E-mail lookup from cache (synchronous)
            server_ids = email_server_cache.get_servers_for_email(identity)
        else:
            # Username lookup (async)
            server_ids = await email_server_cache.get_servers_for_username(identity)

        if not server_ids:
            # Identity not found - return empty response (don't expose available servers!)
            logger.info(f"Identity '{identity}' not found in any server")
            return CheckIdentityResponse(exists=False, servers=[], identity_type=identity_type)

        # Build server info list with human-readable names
        servers: List[ServerInfo] = []
        for server_id in server_ids:
            server_config = settings.get_oracle_server(server_id)
            if server_config:
                servers.append(ServerInfo(
                    id=server_config.id,
                    name=server_config.name
                ))
            else:
                # Fallback if server config not found (shouldn't happen)
                logger.warning(f"Server '{server_id}' not found in config")
                servers.append(ServerInfo(id=server_id, name=server_id))

        logger.info(f"Identity '{identity}' found on {len(servers)} server(s): {[s.id for s in servers]}")
        return CheckIdentityResponse(exists=True, servers=servers, identity_type=identity_type)

    except Exception as e:
        logger.error(f"Error checking identity '{check_data.identity}': {str(e)}")
        # Chain the cause so the original traceback is not lost
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error checking identity"
        ) from e
|
|
|
|
@router.post("/check-email", response_model=CheckEmailResponse, status_code=status.HTTP_200_OK)
async def check_email(
    check_data: CheckEmailRequest,
    request: Request
) -> CheckEmailResponse:
    """
    Check whether an e-mail exists and on which Oracle servers.

    DEPRECATED: use /check-identity, which supports both e-mail and username.
    Kept for backward compatibility.

    Args:
        check_data: E-mail address to check.
        request: HTTP request, used for per-IP rate limiting.

    Returns:
        CheckEmailResponse with exists and servers[].

    Raises:
        HTTPException 429: Rate limit exceeded (limiter shared with check-identity).
        HTTPException 500: Unexpected error during the lookup.
    """
    import time  # function-local so the block does not depend on a file-level import

    # Rate limiting - shared with check-identity
    client_ip = request.client.host if request.client else "unknown"

    if not check_identity_rate_limiter.is_allowed(client_ip):
        reset_time = check_identity_rate_limiter.get_reset_time(client_ip)
        logger.warning(f"Rate limit exceeded for check-email from IP {client_ip}")
        # presumably reset_time is an epoch timestamp in seconds; never advertise < 1s
        retry_after = max(1, reset_time - int(time.time()))
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Too many requests. Please try again later.",
            headers={
                "X-RateLimit-Limit": "5",
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_time),
                "Retry-After": str(retry_after)
            }
        )

    try:
        from .email_server_cache import email_server_cache
        from backend.config import settings

        email = check_data.email.lower().strip()
        logger.info(f"Check-email request for '{email}' from IP {client_ip}")

        # Get server IDs from cache
        server_ids = email_server_cache.get_servers_for_email(email)

        if not server_ids:
            # Email not found - return empty response (don't expose available servers!)
            logger.info(f"Email '{email}' not found in any server")
            return CheckEmailResponse(exists=False, servers=[])

        # Build server info list with human-readable names
        servers: List[ServerInfo] = []
        for server_id in server_ids:
            server_config = settings.get_oracle_server(server_id)
            if server_config:
                servers.append(ServerInfo(
                    id=server_config.id,
                    name=server_config.name
                ))
            else:
                # Fallback if server config not found (shouldn't happen)
                logger.warning(f"Server '{server_id}' not found in config")
                servers.append(ServerInfo(id=server_id, name=server_id))

        logger.info(f"Email '{email}' found on {len(servers)} server(s): {[s.id for s in servers]}")
        return CheckEmailResponse(exists=True, servers=servers)

    except Exception as e:
        logger.error(f"Error checking email '{check_data.email}': {str(e)}")
        # Chain the cause so the original traceback is not lost
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error checking email"
        ) from e
|
|
|
|
@router.post("/login", status_code=status.HTTP_200_OK)
async def login(
    login_data: LoginRequest,
    request: Request,
    response: Response,
):
    """
    Authenticate a user.

    2FA flow (the user has an e-mail address):
    1. Verify the credentials.
    2. Send an OTP code by e-mail, or reuse a still-active OTP session when it
       was issued for the same username and server (page-reload scenario).
    3. Return {requires_2fa: true, masked_email, email}; the frontend then
       posts the code to POST /auth/verify-2fa-code to obtain the JWT.

    A valid trusted-device token skips the OTP step and returns tokens directly.

    Fallback without 2FA (no e-mail address, or the e-mail service module
    cannot be imported):
    - Returns TokenResponse directly.

    Raises:
        HTTPException 400: invalid or unavailable server_id
        HTTPException 401: invalid credentials
        HTTPException 429: OTP rate limit exceeded
        HTTPException 503: the OTP e-mail could not be sent
        HTTPException 500: internal error
    """
    try:
        client_ip = request.client.host if request.client else "unknown"
        server_info = f" on server {login_data.server_id}" if login_data.server_id else ""
        logger.info(f"[LOGIN] Attempt for '{login_data.username}'{server_info} from IP {client_ip}")

        # Validate server_id when one was supplied
        if login_data.server_id:
            from backend.config import settings
            from shared.database.oracle_pool import oracle_pool

            server_config = settings.get_oracle_server(login_data.server_id)
            if not server_config:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid server_id: '{login_data.server_id}'. Server not found in configuration."
                )
            if not oracle_pool.is_server_registered(login_data.server_id):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Server '{login_data.server_id}' is not available."
                )

        # Step 1: resolve e-mail -> username when the input contains '@'
        actual_username = login_data.username
        input_email: Optional[str] = None

        if "@" in login_data.username:
            input_email = login_data.username.lower().strip()
            resolved = await auth_service.get_username_by_email(input_email, login_data.server_id)
            if not resolved:
                # Same message as a wrong password, so the two cases look alike
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid username or password"
                )
            actual_username = resolved
            logger.info(f"[LOGIN] Email '{input_email}' resolved to username '{actual_username}'")

        # Step 2: verify the credentials
        is_valid = await auth_service.verify_user_credentials(
            actual_username, login_data.password, login_data.server_id
        )
        if not is_valid:
            logger.warning(f"[LOGIN] Failed credentials for '{actual_username}'{server_info}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid username or password"
            )

        # Step 3: find the user's e-mail (unless it was the login input itself)
        user_email = input_email
        if not user_email:
            user_email = await _get_email_for_username(actual_username, login_data.server_id)

        # Step 4: trusted device - skip 2FA when the token is valid
        if login_data.trusted_device_token and user_email:
            is_trusted = await verify_trusted_device_token(
                login_data.trusted_device_token,
                actual_username,
                login_data.server_id,
            )
            if is_trusted:
                logger.info(
                    f"[TRUSTED_DEVICE] Device known for '{actual_username}' — skip 2FA"
                )
                return await _create_token_response_for_user(
                    actual_username, login_data.server_id, response
                )
            # Invalid/expired token: fail silently and continue with normal 2FA

        # Step 5: the user has an e-mail -> OTP (2FA)
        if user_email:
            # Check for an existing valid OTP (mobile page reload scenario)
            existing_entry = get_otp_entry(user_email)

            # Reuse the pending OTP only when it was issued for this very
            # username and server. verify_otp takes the username from the
            # entry, so reusing an entry created for another account sharing
            # this e-mail would issue tokens for that other account.
            can_reuse = bool(existing_entry) and (
                str(existing_entry.get("username") or "").upper() == actual_username.upper()
                and (existing_entry.get("server_id") or None) == (login_data.server_id or None)
            )

            if can_reuse:
                # OTP already exists and is valid — skip generation and email
                logger.info(
                    f"[2FA] Reusing existing OTP for {user_email[:3]}*** "
                    f"(user='{actual_username}', skipping email)"
                )
            else:
                # Generate new OTP
                code = await create_otp(user_email, actual_username, login_data.server_id)

                if code is None:
                    # Rate limited
                    raise HTTPException(
                        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                        detail="Prea multe cereri de cod. Așteptați 10 minute și încercați din nou."
                    )

                # Send the e-mail
                try:
                    from backend.modules.telegram.utils.email_service import get_email_service
                    email_service = get_email_service()
                    email_sent = await email_service.send_auth_code(user_email, code, actual_username)

                    if not email_sent:
                        # NOTE(review): the OTP entry created above is not removed here,
                        # so a retry may hit the reuse branch without any e-mail having
                        # been delivered — confirm against otp_service.
                        logger.error(f"[2FA] Failed to send OTP email to {user_email[:3]}***")
                        raise HTTPException(
                            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                            detail="Nu s-a putut trimite codul de verificare. Încercați din nou."
                        )

                    logger.info(f"[2FA] OTP sent to {user_email[:3]}*** for user '{actual_username}'")

                except ImportError:
                    # E-mail service not available — fall back to direct login.
                    # NOTE(review): this path issues tokens without a second factor.
                    logger.warning("[2FA] Email service not available, falling back to direct login")
                    user_email = None

            # Step 6: 2FA active -> ask the client for the code
            if user_email:
                return LoginRequires2FAResponse(
                    requires_2fa=True,
                    masked_email=_mask_email(user_email),
                    email=user_email,
                )

        # Step 7: fallback — no e-mail -> JWT directly (previous behaviour)
        logger.info(f"[LOGIN] No email for '{actual_username}', issuing JWT directly (no 2FA)")
        return await _create_token_response_for_user(actual_username, login_data.server_id, response)

    except HTTPException:
        raise
    except AuthenticationError as e:
        logger.error(f"[LOGIN] Authentication error for '{login_data.username}': {e}")
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
    except Exception as e:
        # logger.exception keeps the traceback of the unexpected failure
        logger.exception(f"[LOGIN] Unexpected error for '{login_data.username}': {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal authentication error"
        )
|
|
|
|
@router.post("/verify-2fa-code", response_model=TokenResponse, status_code=status.HTTP_200_OK)
async def verify_2fa_code(
    verify_data: Verify2FARequest,
    request: Request,
    response: Response,
) -> TokenResponse:
    """
    Verify the OTP code and issue JWT tokens (step 2 of 2FA).

    Args:
        verify_data: {code: "483921", email: "marius@romfast.ro", server_id: "romfast"},
            plus the trust_device flag.
        request: HTTP request, used for per-IP rate limiting.
        response: HTTP response that receives the security headers.

    Returns:
        TokenResponse with the JWT tokens; also carries a trusted-device token
        when trust_device is set, and freshly generated backup codes when the
        user had none.

    Raises:
        HTTPException 400: invalid or expired code
        HTTPException 429: IP rate limit exceeded, or too many OTP attempts
    """
    client_ip = request.client.host if request.client else "unknown"

    # Rate limiting per IP
    if not verify_2fa_rate_limiter.is_allowed(client_ip):
        reset_time = verify_2fa_rate_limiter.get_reset_time(client_ip)
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Prea multe cereri. Încercați din nou mai târziu.",
            headers={
                "X-RateLimit-Limit": "10",
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_time),
            },
        )

    # Normalize like /login (which stores the OTP under a lowercased address)
    # and /resend-2fa-code do, so "User@X.ro " still matches its OTP session.
    email = verify_data.email.lower().strip()
    result = verify_otp(email, verify_data.code)

    if not result["success"]:
        error_code = result.get("error_code", "OTP_ERROR")
        http_status = (
            status.HTTP_429_TOO_MANY_REQUESTS
            if error_code == "OTP_MAX_ATTEMPTS"
            else status.HTTP_400_BAD_REQUEST
        )
        raise HTTPException(status_code=http_status, detail=result["error"])

    # OTP valid — build the JWT; the OTP entry's own server_id wins over the request's
    username = result["username"]
    server_id = result.get("server_id") or verify_data.server_id

    logger.info(f"[2FA] OTP verified OK for '{username}' from IP {client_ip}")

    token_response = await _create_token_response_for_user(username, server_id, response)

    # The user ticked "remember this device"
    if verify_data.trust_device:
        trusted_token = await create_trusted_device_token(username, server_id)
        token_response.trusted_device_token = trusted_token
        logger.info(f"[TRUSTED_DEVICE] Token generated for '{username}' (server={server_id})")

    # Generate backup codes if the user has none yet
    if not await has_backup_codes(username, server_id):
        codes = await generate_backup_codes(username, server_id)
        token_response.backup_codes = codes
        logger.info(f"[BACKUP_CODES] Generated {len(codes)} backup codes for '{username}'")

    return token_response
|
|
|
|
@router.post("/resend-2fa-code", status_code=status.HTTP_200_OK)
async def resend_2fa_code(
    resend_data: Resend2FARequest,
    request: Request,
) -> Dict[str, Any]:
    """
    Re-send the OTP code by e-mail (the frontend's "resend code" button).

    An active OTP session must exist for the e-mail; a new code is then
    created and mailed to the same address.

    Returns:
        {"message": "Codul a fost retrimis", "masked_email": "m***@romfast.ro"}

    Raises:
        HTTPException 400: no active OTP session for this e-mail
        HTTPException 429: rate limit exceeded (per IP, or per OTP creation)
        HTTPException 503: the e-mail could not be sent or the service is unavailable
    """
    caller_ip = request.client.host if request.client else "unknown"

    # Per-IP rate limiting
    allowed = resend_2fa_rate_limiter.is_allowed(caller_ip)
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Prea multe cereri de retrimis. Așteptați și încercați din nou.",
        )

    email = resend_data.email.lower().strip()

    # An active OTP session is required before anything is re-sent
    otp_session = get_otp_entry(email)
    if not otp_session:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Sesiunea de autentificare a expirat. Reîncepeti procesul de login.",
        )

    username = otp_session["username"]
    # The session's own server wins; the request value is only a fallback
    server_id = otp_session.get("server_id") or resend_data.server_id

    # Create a new code (create_otp returns None when rate limited)
    new_code = await create_otp(email, username, server_id)
    if new_code is None:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Prea multe cereri de cod. Așteptați 10 minute și încercați din nou.",
        )

    # Send the e-mail
    try:
        from backend.modules.telegram.utils.email_service import get_email_service

        mailer = get_email_service()
        delivered = await mailer.send_auth_code(email, new_code, username)

        if not delivered:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Nu s-a putut retrimite codul. Încercați din nou.",
            )

        logger.info(f"[2FA] OTP resent to {email[:3]}*** for user '{username}'")
        payload = {
            "message": "Codul a fost retrimis",
            "masked_email": _mask_email(email),
        }
        return payload

    except ImportError:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Serviciul de email nu este disponibil.",
        )
|
|
|
|
# -------------------------------------------------------------------------
# BACKUP CODES ENDPOINT
# -------------------------------------------------------------------------
backup_code_rate_limiter = RateLimiter(max_requests=5, time_window=300)  # 5 req / 5 min

@router.post("/verify-backup-code", response_model=TokenResponse, status_code=status.HTTP_200_OK)
async def verify_backup_code_endpoint(
    verify_data: VerifyBackupCodeRequest,
    request: Request,
    response: Response,
) -> TokenResponse:
    """
    Verify a recovery (backup) code and issue JWT tokens.

    Fallback for the case where the OTP e-mail does not arrive.

    Raises:
        HTTPException 400: unknown e-mail, or code invalid / already used
        HTTPException 429: rate limit exceeded
    """
    # NOTE(review): nothing in this handler checks the password or an active
    # OTP session — confirm that a backup code alone is meant to be sufficient.
    caller_ip = request.client.host if request.client else "unknown"
    if not backup_code_rate_limiter.is_allowed(caller_ip):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Prea multe cereri. Încercați din nou mai târziu."
        )

    identity = verify_data.email.lower().strip()
    target_server = verify_data.server_id

    # The "email" field may hold an address (resolved to a username) or a bare username
    if "@" not in identity:
        actual_username = identity.upper()
    else:
        actual_username = await auth_service.get_username_by_email(identity, target_server)
        if not actual_username:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email invalid")

    code_accepted = await verify_backup_code(actual_username, target_server, verify_data.code)
    if not code_accepted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cod de recuperare invalid sau deja folosit",
        )

    logger.info(f"[BACKUP_CODE] Used backup code for '{actual_username}' from {caller_ip}")
    token_response = await _create_token_response_for_user(actual_username, target_server, response)

    if verify_data.trust_device:
        token_response.trusted_device_token = await create_trusted_device_token(
            actual_username, target_server
        )
        logger.info(f"[TRUSTED_DEVICE] Token generated via backup code for '{actual_username}'")

    return token_response
|
|
|
|
@router.post("/refresh", response_model=TokenResponse, status_code=status.HTTP_200_OK)
async def refresh_token(refresh_data: RefreshTokenRequest) -> TokenResponse:
    """
    Issue a new access token from a valid refresh token.

    Args:
        refresh_data: Carries the refresh token.

    Returns:
        TokenResponse with the new access token and the user's information.

    Raises:
        HTTPException 401: "Invalid refresh token" when the token is invalid or
            not a refresh token; "Token refresh failed" on any other error.
    """
    try:
        # Validate the refresh token
        token_data = jwt_handler.verify_token(refresh_data.refresh_token)

        if not token_data or token_data.token_type != "refresh":
            logger.warning("Invalid refresh token provided")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token"
            )

        # Fetch the user's current companies.
        # NOTE(review): unlike the login path, no server_id is passed here and
        # the new access token is created without one — confirm for multi-server setups.
        companies = await auth_service.get_user_companies(token_data.username)
        # NOTE(review): hard-coded, whereas login resolves permissions through
        # auth_service.get_user_permissions — confirm this is intended.
        permissions = ["read", "reports"]

        # Create the new access token
        new_access_token = jwt_handler.create_access_token(
            username=token_data.username,
            companies=companies,
            user_id=token_data.user_id,
            permissions=permissions
        )

        # User information
        current_user = CurrentUser(
            username=token_data.username,
            user_id=token_data.user_id,
            companies=companies,
            permissions=permissions
        )

        token_response = TokenResponse(
            access_token=new_access_token,
            token_type="bearer",
            expires_in=jwt_handler.access_token_expire_minutes * 60,
            user=current_user
        )

        logger.info(f"Token refreshed for user {token_data.username}")
        return token_response

    except HTTPException:
        # Let the deliberate 401 above through instead of re-wrapping it below
        raise
    except Exception as e:
        logger.error(f"Error refreshing token: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token refresh failed"
        ) from e
|
|
|
|
@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout(
    logout_data: Optional[LogoutRequest] = None,
    current_user: CurrentUser = Depends(get_current_user)
) -> dict:
    """
    Log the user out.

    Note: this handler only logs the event and returns a confirmation; no
    token is invalidated here. A token blacklist could be added later.

    Args:
        logout_data: Optional logout payload (currently unused).
        current_user: The authenticated user.

    Returns:
        Confirmation with the username and the logout time.
    """
    who = current_user.username
    logger.info(f"User {who} logged out")

    # Possible future work here:
    # - add the token to a blacklist
    # - invalidate all of the user's sessions
    # - security notifications

    confirmation = {
        "message": "Successfully logged out",
        "username": who,
        "logout_time": datetime.now().isoformat()
    }
    return confirmation
|
|
|
|
@router.get("/me", response_model=CurrentUser)
async def get_current_user_info(
    current_user: CurrentUser = Depends(get_current_user)
) -> CurrentUser:
    """
    Return the information about the current user.

    Args:
        current_user: The authenticated user, resolved by the dependency.

    Returns:
        The same CurrentUser object, unchanged.
    """
    requested_for = current_user.username
    logger.debug(f"User info requested for {requested_for}")
    return current_user
|
|
|
|
@router.get("/companies", response_model=List[UserCompany])
async def get_user_companies(
    current_user: CurrentUser = Depends(get_current_user)
) -> List[UserCompany]:
    """
    Return the companies the user has access to.

    Args:
        current_user: The authenticated user.

    Returns:
        One UserCompany per company code with its permissions; the first
        company is flagged as the default.

    Raises:
        HTTPException 500: any error while fetching companies or permissions.
    """
    try:
        # Fetch the up-to-date company codes from the auth service
        company_codes = await auth_service.get_user_companies(current_user.username)

        result: List[UserCompany] = []
        for position, code in enumerate(company_codes):
            # Permissions are looked up per company, one call at a time
            company_permissions = await auth_service.get_user_permissions(
                current_user.username,
                code
            )
            result.append(
                UserCompany(
                    code=code,
                    permissions=company_permissions,
                    is_default=(position == 0)  # first company is the default
                )
            )

        logger.debug(f"Returned {len(result)} companies for user {current_user.username}")
        return result

    except Exception as e:
        logger.error(f"Error getting companies for user {current_user.username}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving user companies"
        )
|
|
|
|
@router.post("/check-company-access", response_model=CompanyAccessResponse)
async def check_company_access(
    access_request: CompanyAccessRequest,
    current_user: CurrentUser = Depends(get_current_user)
) -> CompanyAccessResponse:
    """
    Check whether the user has access to a specific company.

    Args:
        access_request: Company code plus optional required permissions.
        current_user: The authenticated user.

    Returns:
        CompanyAccessResponse; when access is granted it carries the company
        with its permissions and any required permissions that are missing
        (None when nothing is missing).

    Raises:
        HTTPException 500: any error during the check.
    """
    try:
        company_code = access_request.company_code
        allowed = await auth_service.validate_user_company_access(
            current_user.username,
            company_code
        )

        if not allowed:
            return CompanyAccessResponse(
                has_access=False,
                company=None,
                missing_permissions=None
            )

        # Permissions the user holds for this company
        granted = await auth_service.get_user_permissions(
            current_user.username,
            company_code
        )

        # Required permissions the user does not hold
        missing: List[str] = []
        if access_request.required_permissions:
            for wanted in access_request.required_permissions:
                if wanted not in granted:
                    missing.append(wanted)

        company = UserCompany(
            code=company_code,
            permissions=granted
        )

        return CompanyAccessResponse(
            has_access=True,
            company=company,
            missing_permissions=missing or None
        )

    except Exception as e:
        logger.error(f"Error checking company access: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error checking company access"
        )
|
|
|
|
@router.get("/my-servers", response_model=dict)
async def get_my_servers(
    current_user: CurrentUser = Depends(get_current_user)
) -> dict:
    """
    Return the servers the authenticated user has access to (US-006).

    The lookup uses the user's e-mail first and falls back to the username
    when there is no e-mail or the e-mail lookup yields nothing.

    Args:
        current_user: The authenticated user.

    Returns:
        Dict with the server list: {servers: [{id: string, name: string}, ...]}

    Raises:
        HTTPException 500: any error during the lookup.
    """
    try:
        from .email_server_cache import email_server_cache
        from backend.config import settings

        username = current_user.username
        logger.info(f"Get my-servers request for user '{username}'")

        # E-mail lookup first (served from the cache)
        server_ids: List[str] = []
        if current_user.email:
            server_ids = email_server_cache.get_servers_for_email(current_user.email)
            logger.debug(f"Email lookup for '{current_user.email}': {server_ids}")

        # No e-mail or no results: fall back to the async username lookup
        if not server_ids:
            server_ids = await email_server_cache.get_servers_for_username(username)
            logger.debug(f"Username lookup for '{username}': {server_ids}")

        # Build the server info list with human-readable names
        servers: List[ServerInfo] = []
        for sid in server_ids:
            config = settings.get_oracle_server(sid)
            if not config:
                # Fallback when the server is missing from the configuration
                logger.warning(f"Server '{sid}' not found in config")
                servers.append(ServerInfo(id=sid, name=sid))
                continue
            servers.append(ServerInfo(
                id=config.id,
                name=config.name
            ))

        logger.info(f"User '{username}' has access to {len(servers)} server(s)")
        return {"servers": [entry.model_dump() for entry in servers]}

    except Exception as e:
        logger.error(f"Error getting servers for user '{current_user.username}': {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving user servers"
        )
|
|
|
|
@router.get("/status")
async def get_auth_status(
    current_user: Optional[CurrentUser] = Depends(get_optional_user)
) -> dict:
    """
    Return the authentication status (public endpoint).

    Args:
        current_user: The current user, or None when the request is anonymous.

    Returns:
        Dict with authenticated, username, companies_count and permissions.
    """
    # Anonymous request: fixed "not authenticated" payload
    if not current_user:
        return {
            "authenticated": False,
            "username": None,
            "companies_count": 0,
            "permissions": []
        }

    return {
        "authenticated": True,
        "username": current_user.username,
        "companies_count": len(current_user.companies),
        "permissions": current_user.permissions
    }
|
|
|
|
# Rute de administrare (opționale)
|
|
if include_admin_routes:
|
|
|
|
@router.get("/admin/stats", response_model=AuthStats)
async def get_auth_stats(
    current_user: CurrentUser = Depends(get_current_user)
) -> AuthStats:
    """
    Return statistics about the authentication system.

    Requires the "admin" permission.

    Raises:
        HTTPException 403: the caller lacks the "admin" permission.
    """
    # Admin permission check
    is_admin = "admin" in current_user.permissions
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin permissions required"
        )

    stats = auth_service.get_cache_stats()
    hit_ratio = stats.get('cache_hit_ratio', 0)

    return AuthStats(
        total_users=1,  # Placeholder - not implemented yet
        active_sessions=1,  # Placeholder - not implemented yet
        cache_hit_ratio=hit_ratio,
        last_cleanup=datetime.now()
    )
|
|
|
|
@router.post("/admin/refresh-cache")
async def refresh_user_cache(
    username: Optional[str] = None,
    current_user: CurrentUser = Depends(get_current_user)
) -> dict:
    """
    Refresh the user cache.

    With a username, only that user's data is refreshed; without one, the
    whole cache is cleared. Requires the "admin" permission.

    Raises:
        HTTPException 403: the caller lacks the "admin" permission.
    """
    is_admin = "admin" in current_user.permissions
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin permissions required"
        )

    # No username given: drop everything
    if not username:
        auth_service.clear_cache()
        return {"message": "All user cache cleared"}

    refreshed = await auth_service.refresh_user_data(username)
    return {
        "message": f"Cache refreshed for user {username}",
        "success": refreshed
    }
|
|
|
|
return router
|
|
|
|
|
|
# Default router for quick use (admin routes not registered)
auth_router = create_auth_router()

# Router that also registers the admin routes
auth_router_with_admin = create_auth_router(include_admin_routes=True)