Complete implementation of multi-server Oracle database support: Backend: - Multi-pool Oracle with lazy loading per server - Email-to-server cache for automatic server discovery - JWT tokens include server_id claim - /auth/check-identity and /auth/check-email endpoints - /auth/my-servers endpoint for listing user's accessible servers - Server switch with password re-authentication Frontend: - New ServerSelector component for header dropdown - Multi-step login flow (identity → server → password) - Server switching from header with password modal - Mobile drawer menu with server selection - Dark mode support for all new components - URL bookmark support with ?server= query param Scripts: - Unified start.sh replacing start-prod.sh/start-test.sh - Unified ssh-tunnel.sh with multi-server support - Updated status.sh for new architecture Tests: - E2E tests for multi-server and single-server login flows - Backend unit tests for all new endpoints - Oracle multi-pool integration tests Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
264 lines
9.2 KiB
Python
264 lines
9.2 KiB
Python
"""
|
|
JWT Authentication Handler - Shared între toate aplicațiile ROA2WEB
|
|
|
|
Acest modul gestionează crearea, validarea și refresh-ul token-urilor JWT
|
|
pentru autentificarea utilizatorilor în ecosistemul ROA2WEB.
|
|
|
|
Payload structure:
|
|
{
|
|
"username": "string",
|
|
"user_id": "integer",
|
|
"companies": ["schema1", "schema2"],
|
|
"permissions": ["read", "write", "admin"],
|
|
"server_id": "string|null", // ID-ul serverului Oracle (multi-server mode)
|
|
"exp": "timestamp",
|
|
"iat": "timestamp",
|
|
"type": "access|refresh"
|
|
}
|
|
"""
|
|
from jose import jwt
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any, List
|
|
from pydantic import BaseModel, Field
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TokenData(BaseModel):
|
|
"""Date conținute în token"""
|
|
username: str = Field(description="Numele utilizatorului")
|
|
user_id: Optional[int] = Field(default=None, description="ID-ul utilizatorului")
|
|
companies: List[str] = Field(default_factory=list, description="Lista firmelor accesibile")
|
|
permissions: List[str] = Field(default_factory=list, description="Lista permisiunilor")
|
|
server_id: Optional[str] = Field(default=None, description="ID-ul serverului Oracle (pentru multi-server mode)")
|
|
exp: datetime = Field(description="Data expirării")
|
|
iat: datetime = Field(description="Data creării")
|
|
token_type: str = Field(alias="type", description="Tipul token-ului (access/refresh)")
|
|
|
|
|
|
class TokenResponse(BaseModel):
|
|
"""Răspuns pentru token-uri"""
|
|
access_token: str = Field(description="JWT access token")
|
|
refresh_token: Optional[str] = Field(default=None, description="JWT refresh token")
|
|
token_type: str = Field(default="bearer", description="Tipul token-ului")
|
|
expires_in: int = Field(description="Timpul de expirare în secunde")
|
|
|
|
|
|
class JWTHandler:
|
|
"""
|
|
Gestionarea JWT tokens pentru autentificare
|
|
|
|
Această clasă oferă funcționalități pentru:
|
|
- Crearea token-urilor access și refresh
|
|
- Validarea și decodificarea token-urilor
|
|
- Gestionarea expirării token-urilor
|
|
"""
|
|
|
|
def __init__(self, secret_key: Optional[str] = None, algorithm: str = "HS256"):
|
|
"""
|
|
Inițializează JWT handler
|
|
|
|
Args:
|
|
secret_key: Cheia secretă pentru semnarea token-urilor
|
|
algorithm: Algoritmul de criptare (default: HS256)
|
|
"""
|
|
self.secret_key = secret_key or os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-in-production')
|
|
self.algorithm = algorithm
|
|
self.access_token_expire_minutes = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', 30))
|
|
self.refresh_token_expire_days = int(os.getenv('REFRESH_TOKEN_EXPIRE_DAYS', 7))
|
|
|
|
# Warning pentru development
|
|
if self.secret_key == 'your-secret-key-change-in-production':
|
|
logger.warning("Using default JWT secret key! Change JWT_SECRET_KEY in production!")
|
|
|
|
def create_access_token(
|
|
self,
|
|
username: str,
|
|
companies: List[str],
|
|
user_id: Optional[int] = None,
|
|
permissions: Optional[List[str]] = None,
|
|
server_id: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Creează un JWT access token
|
|
|
|
Args:
|
|
username: Numele utilizatorului
|
|
companies: Lista firmelor la care utilizatorul are acces
|
|
user_id: ID-ul utilizatorului în baza de date
|
|
permissions: Lista permisiunilor utilizatorului
|
|
server_id: ID-ul serverului Oracle (pentru multi-server mode)
|
|
|
|
Returns:
|
|
Token JWT ca string
|
|
"""
|
|
now = datetime.utcnow()
|
|
expire = now + timedelta(minutes=self.access_token_expire_minutes)
|
|
|
|
payload = {
|
|
"username": username,
|
|
"user_id": user_id,
|
|
"companies": companies or [],
|
|
"permissions": permissions or ["read"],
|
|
"server_id": server_id,
|
|
"exp": expire,
|
|
"iat": now,
|
|
"type": "access"
|
|
}
|
|
|
|
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
|
|
logger.debug(f"Created access token for user {username} on server {server_id or 'default'} with companies: {companies}")
|
|
|
|
return token
|
|
|
|
def create_refresh_token(
|
|
self,
|
|
username: str,
|
|
user_id: Optional[int] = None,
|
|
server_id: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Creează un refresh token cu durată mai mare
|
|
|
|
Args:
|
|
username: Numele utilizatorului
|
|
user_id: ID-ul utilizatorului
|
|
server_id: ID-ul serverului Oracle (pentru multi-server mode)
|
|
|
|
Returns:
|
|
Refresh token JWT ca string
|
|
"""
|
|
now = datetime.utcnow()
|
|
expire = now + timedelta(days=self.refresh_token_expire_days)
|
|
|
|
payload = {
|
|
"username": username,
|
|
"user_id": user_id,
|
|
"server_id": server_id,
|
|
"exp": expire,
|
|
"iat": now,
|
|
"type": "refresh"
|
|
}
|
|
|
|
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
|
|
logger.debug(f"Created refresh token for user {username} on server {server_id or 'default'}")
|
|
|
|
return token
|
|
|
|
def verify_token(self, token: str) -> Optional[TokenData]:
|
|
"""
|
|
Verifică și decodează un JWT token
|
|
|
|
Args:
|
|
token: Token-ul JWT de verificat
|
|
|
|
Returns:
|
|
TokenData cu informațiile din token sau None dacă token-ul e invalid
|
|
"""
|
|
try:
|
|
logger.debug(f"Using JWT secret key (first 10 chars): {self.secret_key[:10]}...")
|
|
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
|
|
token_data = TokenData(**payload)
|
|
logger.debug(f"Token verified successfully for user {token_data.username}")
|
|
return token_data
|
|
except jwt.ExpiredSignatureError:
|
|
logger.warning("Token has expired")
|
|
return None
|
|
except jwt.JWTError as e:
|
|
logger.warning(f"Invalid token: {str(e)}")
|
|
logger.debug(f"Token that failed verification: {token[:50]}...")
|
|
return None
|
|
|
|
def refresh_access_token(
|
|
self,
|
|
refresh_token: str,
|
|
companies: List[str],
|
|
permissions: Optional[List[str]] = None
|
|
) -> Optional[str]:
|
|
"""
|
|
Creează un nou access token folosind refresh token-ul
|
|
|
|
Args:
|
|
refresh_token: Refresh token-ul valid
|
|
companies: Lista actualizată a firmelor (poate fi modificată între refresh-uri)
|
|
permissions: Lista actualizată a permisiunilor
|
|
|
|
Returns:
|
|
Noul access token sau None dacă refresh token-ul e invalid
|
|
"""
|
|
token_data = self.verify_token(refresh_token)
|
|
|
|
if not token_data or token_data.token_type != "refresh":
|
|
logger.warning("Invalid refresh token")
|
|
return None
|
|
|
|
# Creează nou access token cu datele din refresh token
|
|
# Păstrează server_id din refresh token pentru consistență multi-server
|
|
return self.create_access_token(
|
|
username=token_data.username,
|
|
companies=companies,
|
|
user_id=token_data.user_id,
|
|
permissions=permissions,
|
|
server_id=token_data.server_id
|
|
)
|
|
|
|
def create_token_response(
|
|
self,
|
|
username: str,
|
|
companies: List[str],
|
|
user_id: Optional[int] = None,
|
|
permissions: Optional[List[str]] = None,
|
|
include_refresh: bool = True,
|
|
server_id: Optional[str] = None
|
|
) -> TokenResponse:
|
|
"""
|
|
Creează un răspuns complet cu access și refresh token
|
|
|
|
Args:
|
|
username: Numele utilizatorului
|
|
companies: Lista firmelor accesibile
|
|
user_id: ID-ul utilizatorului
|
|
permissions: Lista permisiunilor
|
|
include_refresh: Dacă să includă și refresh token
|
|
server_id: ID-ul serverului Oracle (pentru multi-server mode)
|
|
|
|
Returns:
|
|
TokenResponse cu toate token-urile
|
|
"""
|
|
access_token = self.create_access_token(
|
|
username, companies, user_id, permissions, server_id
|
|
)
|
|
refresh_token = self.create_refresh_token(
|
|
username, user_id, server_id
|
|
) if include_refresh else None
|
|
|
|
return TokenResponse(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
token_type="bearer",
|
|
expires_in=self.access_token_expire_minutes * 60
|
|
)
|
|
|
|
def decode_token_payload(self, token: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Decodează token-ul fără verificare (pentru debugging)
|
|
|
|
Args:
|
|
token: Token-ul de decodat
|
|
|
|
Returns:
|
|
Payload-ul token-ului sau None
|
|
"""
|
|
try:
|
|
# Decodare fără verificare - doar pentru debugging
|
|
payload = jwt.decode(token, key="", algorithms=[self.algorithm], options={"verify_signature": False})
|
|
return payload
|
|
except Exception as e:
|
|
logger.error(f"Error decoding token payload: {str(e)}")
|
|
return None
|
|
|
|
|
|
# Instance globală pentru folosire în toate aplicațiile
|
|
jwt_handler = JWTHandler() |