Files
Claude Agent 06cbf8fb9d feat(auth,dashboard): 2FA mobile session persistence and sparkline cards
- Persist 2FA state in sessionStorage to survive mobile page reloads
- Reuse existing valid OTP on re-login to avoid rate limiting and duplicate emails
- Add embedded sparkline charts to SolduriCompactCard with expand toggle
- Mobile dashboard redesigned: 2 pages with enriched compact cards + cashflow type
- Login UI simplified: remove gradient bg, subtitle, icon; use design tokens
- Focus OTP input when session is restored from 2FA state

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 14:36:22 +00:00

1021 lines
41 KiB
Python

"""
Authentication Routes Template pentru ROA2WEB FastAPI Applications
Acest modul oferă rute predefinite pentru autentificare care pot fi integrate
în orice aplicație FastAPI din ecosistemul ROA2WEB.
Endpoints disponibile:
- POST /auth/login - Autentificare utilizator
- POST /auth/refresh - Refresh access token
- POST /auth/logout - Deconectare utilizator
- GET /auth/me - Informații utilizator curent
- GET /auth/companies - Firmele utilizatorului
- GET /auth/status - Status autentificare
"""
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, Request, Response
from fastapi.security import HTTPAuthorizationCredentials
from .models import (
LoginRequest, TokenResponse, RefreshTokenRequest, LogoutRequest,
CurrentUser, UserCompany, CompanyAccessRequest, CompanyAccessResponse,
AuthError, AuthStats, CheckEmailRequest, CheckEmailResponse, ServerInfo,
CheckIdentityRequest, CheckIdentityResponse,
LoginRequires2FAResponse, Verify2FARequest, Resend2FARequest,
VerifyBackupCodeRequest,
)
from .auth_service import auth_service, AuthenticationError
from .jwt_handler import jwt_handler
from .dependencies import (
get_current_user, get_optional_user,
security_required, security_optional
)
from .middleware import default_rate_limiter, RateLimiter
from .otp_service import (
create_otp, verify_otp, get_otp_entry, _mask_email
)
from .trusted_device_service import (
create_trusted_device_token, verify_trusted_device_token
)
from .backup_codes_service import (
generate_backup_codes, verify_backup_code,
has_backup_codes, get_remaining_count
)
logger = logging.getLogger(__name__)
def create_auth_router(
prefix: str = "/auth",
tags: Optional[List[str]] = None,
include_admin_routes: bool = False
) -> APIRouter:
"""
Creează un router FastAPI cu toate rutele de autentificare
Args:
prefix: Prefix-ul pentru toate rutele
tags: Tag-urile pentru documentația OpenAPI
include_admin_routes: Dacă să includă rutele de administrare
Returns:
Router-ul FastAPI configurat
"""
router = APIRouter(prefix=prefix, tags=tags or ["authentication"])
# Rate limiter pentru check-identity/check-email: 5 requests per minut per IP
check_identity_rate_limiter = RateLimiter(max_requests=5, time_window=60)
# Rate limitere pentru 2FA
verify_2fa_rate_limiter = RateLimiter(max_requests=10, time_window=300) # 10 req / 5 min per IP
resend_2fa_rate_limiter = RateLimiter(max_requests=3, time_window=600) # 3 req / 10 min per IP
# -------------------------------------------------------------------------
# HELPER FUNCTIONS (private, în scope-ul create_auth_router)
# -------------------------------------------------------------------------
async def _get_email_for_username(username: str, server_id: Optional[str]) -> Optional[str]:
"""
Caută emailul unui utilizator în Oracle după username.
Returnează emailul lowercase sau None dacă nu există / nu e setat.
"""
try:
from shared.database.oracle_pool import oracle_pool
async with oracle_pool.get_connection(server_id) as connection:
with connection.cursor() as cursor:
cursor.execute("""
SELECT LOWER(TRIM(EMAIL))
FROM CONTAFIN_ORACLE.UTILIZATORI
WHERE UPPER(UTILIZATOR) = :username
AND INACTIV = 0
AND STERS = 0
AND EMAIL IS NOT NULL
AND TRIM(EMAIL) IS NOT NULL
""", {"username": username.upper()})
row = cursor.fetchone()
if row and row[0] and "@" in row[0]:
return row[0].strip()
return None
except Exception as e:
logger.error(f"[2FA] Error getting email for username '{username}': {e}")
return None
async def _create_token_response_for_user(
username: str,
server_id: Optional[str],
response: Response
) -> TokenResponse:
"""
Creează TokenResponse complet pentru un utilizator deja verificat.
Folosit după verificarea OTP (pasul 2 al 2FA) — fără re-verificare parolă.
"""
companies = await auth_service.get_user_companies(username, server_id)
permissions = await auth_service.get_user_permissions(
username, companies[0] if companies else "", server_id
)
jwt_tokens = jwt_handler.create_token_response(
username=username,
companies=companies,
user_id=None,
permissions=permissions,
server_id=server_id,
)
current_user = CurrentUser(
username=username,
user_id=None,
companies=companies,
permissions=permissions,
)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
return TokenResponse(
access_token=jwt_tokens.access_token,
refresh_token=jwt_tokens.refresh_token,
token_type=jwt_tokens.token_type,
expires_in=jwt_tokens.expires_in,
user=current_user,
)
@router.post("/check-identity", response_model=CheckIdentityResponse, status_code=status.HTTP_200_OK)
async def check_identity(
check_data: CheckIdentityRequest,
request: Request
) -> CheckIdentityResponse:
"""
Verifică dacă un email sau username există în sistem și pe câte servere Oracle (US-013).
Acest endpoint suportă dual login:
- Input cu '@': tratează ca email și caută în EmailServerCache
- Input fără '@': tratează ca username și caută direct în Oracle
SECURITATE:
- Rate limited: max 5 requests/minut per IP
- NU expune serverele disponibile pentru identități invalide
- Identități invalide returnează {exists: false, servers: []}
Args:
check_data: Identitatea de verificat (email sau username)
request: Request-ul HTTP (pentru rate limiting)
Returns:
CheckIdentityResponse cu exists, servers[] și identity_type
Raises:
HTTPException 429: Rate limit exceeded
"""
# Rate limiting - 5 req/min per IP
client_ip = request.client.host if request.client else "unknown"
if not check_identity_rate_limiter.is_allowed(client_ip):
reset_time = check_identity_rate_limiter.get_reset_time(client_ip)
logger.warning(f"Rate limit exceeded for check-identity from IP {client_ip}")
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many requests. Please try again later.",
headers={
"X-RateLimit-Limit": "5",
"X-RateLimit-Remaining": "0",
"X-RateLimit-Reset": str(reset_time),
"Retry-After": str(max(1, reset_time - int(__import__('time').time())))
}
)
try:
from .email_server_cache import email_server_cache
from backend.config import settings
identity = check_data.identity # Already normalized by validator
is_email = '@' in identity
identity_type = "email" if is_email else "username"
logger.info(f"Check-identity request for '{identity}' (type: {identity_type}) from IP {client_ip}")
# Get server IDs based on identity type
if is_email:
# Email lookup from cache
server_ids = email_server_cache.get_servers_for_email(identity)
else:
# Username lookup directly from Oracle (async)
server_ids = await email_server_cache.get_servers_for_username(identity)
if not server_ids:
# Identity not found - return empty response (don't expose available servers!)
logger.info(f"Identity '{identity}' not found in any server")
return CheckIdentityResponse(exists=False, servers=[], identity_type=identity_type)
# Build server info list with human-readable names
servers: List[ServerInfo] = []
for server_id in server_ids:
server_config = settings.get_oracle_server(server_id)
if server_config:
servers.append(ServerInfo(
id=server_config.id,
name=server_config.name
))
else:
# Fallback if server config not found (shouldn't happen)
logger.warning(f"Server '{server_id}' not found in config")
servers.append(ServerInfo(id=server_id, name=server_id))
logger.info(f"Identity '{identity}' found on {len(servers)} server(s): {[s.id for s in servers]}")
return CheckIdentityResponse(exists=True, servers=servers, identity_type=identity_type)
except Exception as e:
logger.error(f"Error checking identity '{check_data.identity}': {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error checking identity"
)
@router.post("/check-email", response_model=CheckEmailResponse, status_code=status.HTTP_200_OK)
async def check_email(
check_data: CheckEmailRequest,
request: Request
) -> CheckEmailResponse:
"""
Verifică dacă un email există în sistem și pe câte servere Oracle.
DEPRECATED: Folosește /check-identity pentru suport dual email/username.
Păstrat pentru backward compatibility.
Args:
check_data: Email-ul de verificat
request: Request-ul HTTP (pentru rate limiting)
Returns:
CheckEmailResponse cu exists și servers[]
"""
# Rate limiting - shared with check-identity
client_ip = request.client.host if request.client else "unknown"
if not check_identity_rate_limiter.is_allowed(client_ip):
reset_time = check_identity_rate_limiter.get_reset_time(client_ip)
logger.warning(f"Rate limit exceeded for check-email from IP {client_ip}")
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many requests. Please try again later.",
headers={
"X-RateLimit-Limit": "5",
"X-RateLimit-Remaining": "0",
"X-RateLimit-Reset": str(reset_time),
"Retry-After": str(max(1, reset_time - int(__import__('time').time())))
}
)
try:
from .email_server_cache import email_server_cache
from backend.config import settings
email = check_data.email.lower().strip()
logger.info(f"Check-email request for '{email}' from IP {client_ip}")
# Get server IDs from cache
server_ids = email_server_cache.get_servers_for_email(email)
if not server_ids:
# Email not found - return empty response (don't expose available servers!)
logger.info(f"Email '{email}' not found in any server")
return CheckEmailResponse(exists=False, servers=[])
# Build server info list with human-readable names
servers: List[ServerInfo] = []
for server_id in server_ids:
server_config = settings.get_oracle_server(server_id)
if server_config:
servers.append(ServerInfo(
id=server_config.id,
name=server_config.name
))
else:
# Fallback if server config not found (shouldn't happen)
logger.warning(f"Server '{server_id}' not found in config")
servers.append(ServerInfo(id=server_id, name=server_id))
logger.info(f"Email '{email}' found on {len(servers)} server(s): {[s.id for s in servers]}")
return CheckEmailResponse(exists=True, servers=servers)
except Exception as e:
logger.error(f"Error checking email '{check_data.email}': {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error checking email"
)
@router.post("/login", status_code=status.HTTP_200_OK)
async def login(
login_data: LoginRequest,
request: Request,
response: Response,
):
"""
Autentifică un utilizator.
Flow cu 2FA (utilizator are email în Oracle):
1. Verifică credențialele în Oracle
2. Trimite cod OTP pe email
3. Returnează {requires_2fa: true, masked_email, email}
→ Frontend afișează câmpul de cod
→ Userul introduce codul → POST /auth/verify-2fa-code → JWT
Fallback fără 2FA (utilizator fără email):
- Returnează TokenResponse direct (comportament anterior)
Raises:
HTTPException 400: server_id invalid
HTTPException 401: credențiale invalide
HTTPException 429: rate limit OTP depășit
HTTPException 503: email service indisponibil
HTTPException 500: eroare internă
"""
try:
client_ip = request.client.host if request.client else "unknown"
server_info = f" on server {login_data.server_id}" if login_data.server_id else ""
logger.info(f"[LOGIN] Attempt for '{login_data.username}'{server_info} from IP {client_ip}")
# Validare server_id (cod existent, păstrat intact)
if login_data.server_id:
from backend.config import settings
from shared.database.oracle_pool import oracle_pool
server_config = settings.get_oracle_server(login_data.server_id)
if not server_config:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid server_id: '{login_data.server_id}'. Server not found in configuration."
)
if not oracle_pool.is_server_registered(login_data.server_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Server '{login_data.server_id}' is not available."
)
# Pas 1: Rezolvă email → username dacă input conține '@'
actual_username = login_data.username
input_email: Optional[str] = None
if "@" in login_data.username:
input_email = login_data.username.lower().strip()
resolved = await auth_service.get_username_by_email(input_email, login_data.server_id)
if not resolved:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
actual_username = resolved
logger.info(f"[LOGIN] Email '{input_email}' resolved to username '{actual_username}'")
# Pas 2: Verifică credențialele Oracle
is_valid = await auth_service.verify_user_credentials(
actual_username, login_data.password, login_data.server_id
)
if not is_valid:
logger.warning(f"[LOGIN] Failed credentials for '{actual_username}'{server_info}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
# Pas 3: Caută emailul utilizatorului (dacă nu îl știm deja din input)
user_email = input_email
if not user_email:
user_email = await _get_email_for_username(actual_username, login_data.server_id)
# Pas 2.5: Verificare trusted device — skip 2FA dacă tokenul e valid
if login_data.trusted_device_token and user_email:
is_trusted = await verify_trusted_device_token(
login_data.trusted_device_token,
actual_username,
login_data.server_id,
)
if is_trusted:
logger.info(
f"[TRUSTED_DEVICE] Device known for '{actual_username}' — skip 2FA"
)
return await _create_token_response_for_user(
actual_username, login_data.server_id, response
)
# Invalid/expirat → fail silently, continuă cu 2FA normal
# Pas 4: Dacă are email → trimitem OTP (2FA)
if user_email:
# Check for existing valid OTP (mobile page reload scenario)
existing_entry = get_otp_entry(user_email)
if existing_entry:
# OTP already exists and is valid — skip generation and email
logger.info(
f"[2FA] Reusing existing OTP for {user_email[:3]}*** "
f"(user='{actual_username}', skipping email)"
)
else:
# Generate new OTP
code = await create_otp(user_email, actual_username, login_data.server_id)
if code is None:
# Rate limited
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Prea multe cereri de cod. Așteptați 10 minute și încercați din nou."
)
# Trimitem emailul
try:
from backend.modules.telegram.utils.email_service import get_email_service
email_service = get_email_service()
email_sent = await email_service.send_auth_code(user_email, code, actual_username)
if not email_sent:
logger.error(f"[2FA] Failed to send OTP email to {user_email[:3]}***")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Nu s-a putut trimite codul de verificare. Încercați din nou."
)
logger.info(f"[2FA] OTP sent to {user_email[:3]}*** for user '{actual_username}'")
except ImportError:
# Email service nu e disponibil — fallback la login direct
logger.warning("[2FA] Email service not available, falling back to direct login")
user_email = None
# Pas 5: Dacă 2FA activ → returnăm cerere de cod
if user_email:
return LoginRequires2FAResponse(
requires_2fa=True,
masked_email=_mask_email(user_email),
email=user_email,
)
# Pas 6: Fallback — fără email → JWT direct (comportament anterior)
logger.info(f"[LOGIN] No email for '{actual_username}', issuing JWT directly (no 2FA)")
return await _create_token_response_for_user(actual_username, login_data.server_id, response)
except HTTPException:
raise
except AuthenticationError as e:
logger.error(f"[LOGIN] Authentication error for '{login_data.username}': {e}")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
except Exception as e:
logger.error(f"[LOGIN] Unexpected error for '{login_data.username}': {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal authentication error"
)
@router.post("/verify-2fa-code", response_model=TokenResponse, status_code=status.HTTP_200_OK)
async def verify_2fa_code(
verify_data: Verify2FARequest,
request: Request,
response: Response,
) -> TokenResponse:
"""
Verifică codul OTP și emite JWT tokens (pasul 2 al 2FA).
Args:
verify_data: {code: "483921", email: "marius@romfast.ro", server_id: "romfast"}
Returns:
TokenResponse cu JWT tokens
Raises:
HTTPException 400: cod invalid, expirat sau prea multe încercări
HTTPException 429: rate limit depășit (IP)
HTTPException 500: eroare internă
"""
client_ip = request.client.host if request.client else "unknown"
# Rate limiting per IP
if not verify_2fa_rate_limiter.is_allowed(client_ip):
reset_time = verify_2fa_rate_limiter.get_reset_time(client_ip)
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Prea multe cereri. Încercați din nou mai târziu.",
headers={
"X-RateLimit-Limit": "10",
"X-RateLimit-Remaining": "0",
"X-RateLimit-Reset": str(reset_time),
},
)
result = verify_otp(verify_data.email, verify_data.code)
if not result["success"]:
error_code = result.get("error_code", "OTP_ERROR")
http_status = (
status.HTTP_429_TOO_MANY_REQUESTS
if error_code == "OTP_MAX_ATTEMPTS"
else status.HTTP_400_BAD_REQUEST
)
raise HTTPException(status_code=http_status, detail=result["error"])
# OTP valid — creăm JWT
username = result["username"]
server_id = result.get("server_id") or verify_data.server_id
logger.info(f"[2FA] OTP verified OK for '{username}' from IP {client_ip}")
token_response = await _create_token_response_for_user(username, server_id, response)
# Dacă utilizatorul a bifat "Ține minte acest dispozitiv"
if verify_data.trust_device:
trusted_token = await create_trusted_device_token(username, server_id)
token_response.trusted_device_token = trusted_token
logger.info(f"[TRUSTED_DEVICE] Token generated for '{username}' (server={server_id})")
# Generăm backup codes dacă nu există deja
if not await has_backup_codes(username, server_id):
codes = await generate_backup_codes(username, server_id)
token_response.backup_codes = codes
logger.info(f"[BACKUP_CODES] Generated {len(codes)} backup codes for '{username}'")
return token_response
@router.post("/resend-2fa-code", status_code=status.HTTP_200_OK)
async def resend_2fa_code(
resend_data: Resend2FARequest,
request: Request,
) -> Dict[str, Any]:
"""
Retrimite codul OTP pe email (butonul "Retrimite codul" din frontend).
Verifică că există o sesiune OTP activă pentru email înainte de a retrimite.
Returns:
{"message": "Codul a fost retrimis", "masked_email": "m***@romfast.ro"}
Raises:
HTTPException 404: sesiunea OTP nu mai există (expirată sau deja verificată)
HTTPException 429: rate limit depășit
HTTPException 503: email service indisponibil
"""
client_ip = request.client.host if request.client else "unknown"
# Rate limiting per IP
if not resend_2fa_rate_limiter.is_allowed(client_ip):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Prea multe cereri de retrimis. Așteptați și încercați din nou.",
)
email = resend_data.email.lower().strip()
# Verificăm că există sesiune OTP activă pentru email
entry = get_otp_entry(email)
if not entry:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Sesiunea de autentificare a expirat. Reîncepeti procesul de login.",
)
username = entry["username"]
server_id = entry.get("server_id") or resend_data.server_id
# Creăm cod nou (cu rate limiting)
code = await create_otp(email, username, server_id)
if code is None:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Prea multe cereri de cod. Așteptați 10 minute și încercați din nou.",
)
# Trimitem emailul
try:
from backend.modules.telegram.utils.email_service import get_email_service
email_service = get_email_service()
sent = await email_service.send_auth_code(email, code, username)
if not sent:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Nu s-a putut retrimite codul. Încercați din nou.",
)
logger.info(f"[2FA] OTP resent to {email[:3]}*** for user '{username}'")
return {
"message": "Codul a fost retrimis",
"masked_email": _mask_email(email),
}
except ImportError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Serviciul de email nu este disponibil.",
)
# -------------------------------------------------------------------------
# BACKUP CODES ENDPOINT
# -------------------------------------------------------------------------
backup_code_rate_limiter = RateLimiter(max_requests=5, time_window=300) # 5 req / 5 min
@router.post("/verify-backup-code", response_model=TokenResponse, status_code=status.HTTP_200_OK)
async def verify_backup_code_endpoint(
verify_data: VerifyBackupCodeRequest,
request: Request,
response: Response,
) -> TokenResponse:
"""
Verifică un cod de recuperare (backup code) și emite JWT tokens.
Fallback pentru cazul când emailul OTP nu sosește.
Raises:
HTTPException 400: cod invalid sau deja folosit
HTTPException 429: rate limit depășit
"""
client_ip = request.client.host if request.client else "unknown"
if not backup_code_rate_limiter.is_allowed(client_ip):
raise HTTPException(
status_code=429,
detail="Prea multe cereri. Încercați din nou mai târziu."
)
email = verify_data.email.lower().strip()
# Rezolvă username din email (dacă e email) sau e direct username
if "@" in email:
actual_username = await auth_service.get_username_by_email(email, verify_data.server_id)
if not actual_username:
raise HTTPException(status_code=400, detail="Email invalid")
else:
actual_username = email.upper()
is_valid = await verify_backup_code(actual_username, verify_data.server_id, verify_data.code)
if not is_valid:
raise HTTPException(status_code=400, detail="Cod de recuperare invalid sau deja folosit")
logger.info(f"[BACKUP_CODE] Used backup code for '{actual_username}' from {client_ip}")
token_response = await _create_token_response_for_user(actual_username, verify_data.server_id, response)
if verify_data.trust_device:
trusted_token = await create_trusted_device_token(actual_username, verify_data.server_id)
token_response.trusted_device_token = trusted_token
logger.info(f"[TRUSTED_DEVICE] Token generated via backup code for '{actual_username}'")
return token_response
@router.post("/refresh", response_model=TokenResponse, status_code=status.HTTP_200_OK)
async def refresh_token(refresh_data: RefreshTokenRequest) -> TokenResponse:
"""
Reîmprospătează access token-ul folosind refresh token-ul
Args:
refresh_data: Refresh token-ul valid
Returns:
Noul access token și informațiile utilizatorului
Raises:
HTTPException: Pentru refresh token-uri invalide
"""
try:
# Validează refresh token-ul
token_data = jwt_handler.verify_token(refresh_data.refresh_token)
if not token_data or token_data.token_type != "refresh":
logger.warning("Invalid refresh token provided")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
# Obține datele actualizate ale utilizatorului
companies = await auth_service.get_user_companies(token_data.username)
permissions = ["read", "reports"] # Poate fi extins în viitor
# Creează noul access token
new_access_token = jwt_handler.create_access_token(
username=token_data.username,
companies=companies,
user_id=token_data.user_id,
permissions=permissions
)
# Informațiile utilizatorului
current_user = CurrentUser(
username=token_data.username,
user_id=token_data.user_id,
companies=companies,
permissions=permissions
)
token_response = TokenResponse(
access_token=new_access_token,
token_type="bearer",
expires_in=jwt_handler.access_token_expire_minutes * 60,
user=current_user
)
logger.info(f"Token refreshed for user {token_data.username}")
return token_response
except Exception as e:
logger.error(f"Error refreshing token: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token refresh failed"
)
@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout(
logout_data: Optional[LogoutRequest] = None,
current_user: CurrentUser = Depends(get_current_user)
) -> dict:
"""
Deconectează utilizatorul (invalidează token-urile)
Note: În implementarea curentă, token-urile JWT sunt stateless,
deci nu pot fi invalidate direct. În viitor poate fi implementat
un blacklist pentru token-uri.
Args:
logout_data: Date pentru logout (opțional)
current_user: Utilizatorul curent autentificat
Returns:
Confirmarea deconectării
"""
logger.info(f"User {current_user.username} logged out")
# În viitor, aici se poate implementa:
# - Adăugarea token-ului într-un blacklist
# - Invalidarea tuturor sesiunilor utilizatorului
# - Notificări de securitate
return {
"message": "Successfully logged out",
"username": current_user.username,
"logout_time": datetime.now().isoformat()
}
@router.get("/me", response_model=CurrentUser)
async def get_current_user_info(
current_user: CurrentUser = Depends(get_current_user)
) -> CurrentUser:
"""
Returnează informațiile despre utilizatorul curent
Args:
current_user: Utilizatorul curent autentificat
Returns:
Informațiile complete ale utilizatorului
"""
logger.debug(f"User info requested for {current_user.username}")
return current_user
@router.get("/companies", response_model=List[UserCompany])
async def get_user_companies(
current_user: CurrentUser = Depends(get_current_user)
) -> List[UserCompany]:
"""
Returnează lista firmelor la care utilizatorul are acces
Args:
current_user: Utilizatorul curent autentificat
Returns:
Lista firmelor cu permisiunile asociate
"""
try:
# Obține firmele actualizate din baza de date
companies = await auth_service.get_user_companies(current_user.username)
user_companies = []
for i, company_code in enumerate(companies):
# Obține permisiunile pentru fiecare firmă
permissions = await auth_service.get_user_permissions(
current_user.username,
company_code
)
user_company = UserCompany(
code=company_code,
permissions=permissions,
is_default=(i == 0) # Prima firmă ca default
)
user_companies.append(user_company)
logger.debug(f"Returned {len(user_companies)} companies for user {current_user.username}")
return user_companies
except Exception as e:
logger.error(f"Error getting companies for user {current_user.username}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error retrieving user companies"
)
@router.post("/check-company-access", response_model=CompanyAccessResponse)
async def check_company_access(
access_request: CompanyAccessRequest,
current_user: CurrentUser = Depends(get_current_user)
) -> CompanyAccessResponse:
"""
Verifică dacă utilizatorul are acces la o firmă specifică
Args:
access_request: Request-ul de verificare acces
current_user: Utilizatorul curent autentificat
Returns:
Răspunsul cu informații despre acces
"""
try:
has_access = await auth_service.validate_user_company_access(
current_user.username,
access_request.company_code
)
if not has_access:
return CompanyAccessResponse(
has_access=False,
company=None,
missing_permissions=None
)
# Obține permisiunile pentru firmă
permissions = await auth_service.get_user_permissions(
current_user.username,
access_request.company_code
)
# Verifică permisiunile cerute
missing_permissions = []
if access_request.required_permissions:
missing_permissions = [
perm for perm in access_request.required_permissions
if perm not in permissions
]
user_company = UserCompany(
code=access_request.company_code,
permissions=permissions
)
return CompanyAccessResponse(
has_access=True,
company=user_company,
missing_permissions=missing_permissions if missing_permissions else None
)
except Exception as e:
logger.error(f"Error checking company access: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error checking company access"
)
@router.get("/my-servers", response_model=dict)
async def get_my_servers(
current_user: CurrentUser = Depends(get_current_user)
) -> dict:
"""
Returnează lista serverelor la care utilizatorul autentificat are acces (US-006).
Acest endpoint este folosit de frontend pentru a popula dropdown-ul de server switch.
Lookup-ul se face pe baza email-ului sau username-ului utilizatorului curent.
Args:
current_user: Utilizatorul curent autentificat
Returns:
Dict cu lista de servere: {servers: [{id: string, name: string}, ...]}
"""
try:
from .email_server_cache import email_server_cache
from backend.config import settings
logger.info(f"Get my-servers request for user '{current_user.username}'")
# Try email lookup first (faster, from cache)
server_ids: List[str] = []
if current_user.email:
server_ids = email_server_cache.get_servers_for_email(current_user.email)
logger.debug(f"Email lookup for '{current_user.email}': {server_ids}")
# If no email or no results, try username lookup (queries Oracle directly)
if not server_ids:
server_ids = await email_server_cache.get_servers_for_username(current_user.username)
logger.debug(f"Username lookup for '{current_user.username}': {server_ids}")
# Build server info list with human-readable names
servers: List[ServerInfo] = []
for server_id in server_ids:
server_config = settings.get_oracle_server(server_id)
if server_config:
servers.append(ServerInfo(
id=server_config.id,
name=server_config.name
))
else:
# Fallback if server config not found
logger.warning(f"Server '{server_id}' not found in config")
servers.append(ServerInfo(id=server_id, name=server_id))
logger.info(f"User '{current_user.username}' has access to {len(servers)} server(s)")
return {"servers": [s.model_dump() for s in servers]}
except Exception as e:
logger.error(f"Error getting servers for user '{current_user.username}': {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error retrieving user servers"
)
@router.get("/status")
async def get_auth_status(
current_user: Optional[CurrentUser] = Depends(get_optional_user)
) -> dict:
"""
Returnează statusul de autentificare (endpoint public)
Args:
current_user: Utilizatorul curent (opțional)
Returns:
Statusul de autentificare
"""
if current_user:
return {
"authenticated": True,
"username": current_user.username,
"companies_count": len(current_user.companies),
"permissions": current_user.permissions
}
else:
return {
"authenticated": False,
"username": None,
"companies_count": 0,
"permissions": []
}
# Rute de administrare (opționale)
if include_admin_routes:
@router.get("/admin/stats", response_model=AuthStats)
async def get_auth_stats(
current_user: CurrentUser = Depends(get_current_user)
) -> AuthStats:
"""
Returnează statistici despre sistemul de autentificare
Necesită permisiuni de admin.
"""
# Verifică permisiuni admin
if "admin" not in current_user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin permissions required"
)
cache_stats = auth_service.get_cache_stats()
return AuthStats(
total_users=1, # Placeholder - poate fi implementat
active_sessions=1, # Placeholder - poate fi implementat
cache_hit_ratio=cache_stats.get('cache_hit_ratio', 0),
last_cleanup=datetime.now()
)
@router.post("/admin/refresh-cache")
async def refresh_user_cache(
username: Optional[str] = None,
current_user: CurrentUser = Depends(get_current_user)
) -> dict:
"""
Reîmprospătează cache-ul utilizatorilor
Necesită permisiuni de admin.
"""
if "admin" not in current_user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin permissions required"
)
if username:
success = await auth_service.refresh_user_data(username)
return {
"message": f"Cache refreshed for user {username}",
"success": success
}
else:
auth_service.clear_cache()
return {"message": "All user cache cleared"}
return router
# Router implicit pentru folosire rapidă
auth_router = create_auth_router()
# Router cu rute de admin incluse
auth_router_with_admin = create_auth_router(include_admin_routes=True)