Files
roa2web-service-auto/shared/auth/jwt_handler.py
Claude Agent b137e80b71 feat: multi-Oracle server support with runtime switching
Complete implementation of multi-server Oracle database support:

Backend:
- Multi-pool Oracle with lazy loading per server
- Email-to-server cache for automatic server discovery
- JWT tokens include server_id claim
- /auth/check-identity and /auth/check-email endpoints
- /auth/my-servers endpoint for listing user's accessible servers
- Server switch with password re-authentication

Frontend:
- New ServerSelector component for header dropdown
- Multi-step login flow (identity → server → password)
- Server switching from header with password modal
- Mobile drawer menu with server selection
- Dark mode support for all new components
- URL bookmark support with ?server= query param

Scripts:
- Unified start.sh replacing start-prod.sh/start-test.sh
- Unified ssh-tunnel.sh with multi-server support
- Updated status.sh for new architecture

Tests:
- E2E tests for multi-server and single-server login flows
- Backend unit tests for all new endpoints
- Oracle multi-pool integration tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 22:39:06 +00:00

264 lines
9.2 KiB
Python

"""
JWT Authentication Handler - Shared între toate aplicațiile ROA2WEB
Acest modul gestionează crearea, validarea și refresh-ul token-urilor JWT
pentru autentificarea utilizatorilor în ecosistemul ROA2WEB.
Payload structure:
{
"username": "string",
"user_id": "integer",
"companies": ["schema1", "schema2"],
"permissions": ["read", "write", "admin"],
"server_id": "string|null", // ID-ul serverului Oracle (multi-server mode)
"exp": "timestamp",
"iat": "timestamp",
"type": "access|refresh"
}
"""
from jose import jwt
import os
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
import logging
logger = logging.getLogger(__name__)
class TokenData(BaseModel):
"""Date conținute în token"""
username: str = Field(description="Numele utilizatorului")
user_id: Optional[int] = Field(default=None, description="ID-ul utilizatorului")
companies: List[str] = Field(default_factory=list, description="Lista firmelor accesibile")
permissions: List[str] = Field(default_factory=list, description="Lista permisiunilor")
server_id: Optional[str] = Field(default=None, description="ID-ul serverului Oracle (pentru multi-server mode)")
exp: datetime = Field(description="Data expirării")
iat: datetime = Field(description="Data creării")
token_type: str = Field(alias="type", description="Tipul token-ului (access/refresh)")
class TokenResponse(BaseModel):
"""Răspuns pentru token-uri"""
access_token: str = Field(description="JWT access token")
refresh_token: Optional[str] = Field(default=None, description="JWT refresh token")
token_type: str = Field(default="bearer", description="Tipul token-ului")
expires_in: int = Field(description="Timpul de expirare în secunde")
class JWTHandler:
"""
Gestionarea JWT tokens pentru autentificare
Această clasă oferă funcționalități pentru:
- Crearea token-urilor access și refresh
- Validarea și decodificarea token-urilor
- Gestionarea expirării token-urilor
"""
def __init__(self, secret_key: Optional[str] = None, algorithm: str = "HS256"):
"""
Inițializează JWT handler
Args:
secret_key: Cheia secretă pentru semnarea token-urilor
algorithm: Algoritmul de criptare (default: HS256)
"""
self.secret_key = secret_key or os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-in-production')
self.algorithm = algorithm
self.access_token_expire_minutes = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', 30))
self.refresh_token_expire_days = int(os.getenv('REFRESH_TOKEN_EXPIRE_DAYS', 7))
# Warning pentru development
if self.secret_key == 'your-secret-key-change-in-production':
logger.warning("Using default JWT secret key! Change JWT_SECRET_KEY in production!")
def create_access_token(
self,
username: str,
companies: List[str],
user_id: Optional[int] = None,
permissions: Optional[List[str]] = None,
server_id: Optional[str] = None
) -> str:
"""
Creează un JWT access token
Args:
username: Numele utilizatorului
companies: Lista firmelor la care utilizatorul are acces
user_id: ID-ul utilizatorului în baza de date
permissions: Lista permisiunilor utilizatorului
server_id: ID-ul serverului Oracle (pentru multi-server mode)
Returns:
Token JWT ca string
"""
now = datetime.utcnow()
expire = now + timedelta(minutes=self.access_token_expire_minutes)
payload = {
"username": username,
"user_id": user_id,
"companies": companies or [],
"permissions": permissions or ["read"],
"server_id": server_id,
"exp": expire,
"iat": now,
"type": "access"
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
logger.debug(f"Created access token for user {username} on server {server_id or 'default'} with companies: {companies}")
return token
def create_refresh_token(
self,
username: str,
user_id: Optional[int] = None,
server_id: Optional[str] = None
) -> str:
"""
Creează un refresh token cu durată mai mare
Args:
username: Numele utilizatorului
user_id: ID-ul utilizatorului
server_id: ID-ul serverului Oracle (pentru multi-server mode)
Returns:
Refresh token JWT ca string
"""
now = datetime.utcnow()
expire = now + timedelta(days=self.refresh_token_expire_days)
payload = {
"username": username,
"user_id": user_id,
"server_id": server_id,
"exp": expire,
"iat": now,
"type": "refresh"
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
logger.debug(f"Created refresh token for user {username} on server {server_id or 'default'}")
return token
def verify_token(self, token: str) -> Optional[TokenData]:
"""
Verifică și decodează un JWT token
Args:
token: Token-ul JWT de verificat
Returns:
TokenData cu informațiile din token sau None dacă token-ul e invalid
"""
try:
logger.debug(f"Using JWT secret key (first 10 chars): {self.secret_key[:10]}...")
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
token_data = TokenData(**payload)
logger.debug(f"Token verified successfully for user {token_data.username}")
return token_data
except jwt.ExpiredSignatureError:
logger.warning("Token has expired")
return None
except jwt.JWTError as e:
logger.warning(f"Invalid token: {str(e)}")
logger.debug(f"Token that failed verification: {token[:50]}...")
return None
def refresh_access_token(
self,
refresh_token: str,
companies: List[str],
permissions: Optional[List[str]] = None
) -> Optional[str]:
"""
Creează un nou access token folosind refresh token-ul
Args:
refresh_token: Refresh token-ul valid
companies: Lista actualizată a firmelor (poate fi modificată între refresh-uri)
permissions: Lista actualizată a permisiunilor
Returns:
Noul access token sau None dacă refresh token-ul e invalid
"""
token_data = self.verify_token(refresh_token)
if not token_data or token_data.token_type != "refresh":
logger.warning("Invalid refresh token")
return None
# Creează nou access token cu datele din refresh token
# Păstrează server_id din refresh token pentru consistență multi-server
return self.create_access_token(
username=token_data.username,
companies=companies,
user_id=token_data.user_id,
permissions=permissions,
server_id=token_data.server_id
)
def create_token_response(
self,
username: str,
companies: List[str],
user_id: Optional[int] = None,
permissions: Optional[List[str]] = None,
include_refresh: bool = True,
server_id: Optional[str] = None
) -> TokenResponse:
"""
Creează un răspuns complet cu access și refresh token
Args:
username: Numele utilizatorului
companies: Lista firmelor accesibile
user_id: ID-ul utilizatorului
permissions: Lista permisiunilor
include_refresh: Dacă să includă și refresh token
server_id: ID-ul serverului Oracle (pentru multi-server mode)
Returns:
TokenResponse cu toate token-urile
"""
access_token = self.create_access_token(
username, companies, user_id, permissions, server_id
)
refresh_token = self.create_refresh_token(
username, user_id, server_id
) if include_refresh else None
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=self.access_token_expire_minutes * 60
)
def decode_token_payload(self, token: str) -> Optional[Dict[str, Any]]:
"""
Decodează token-ul fără verificare (pentru debugging)
Args:
token: Token-ul de decodat
Returns:
Payload-ul token-ului sau None
"""
try:
# Decodare fără verificare - doar pentru debugging
payload = jwt.decode(token, key="", algorithms=[self.algorithm], options={"verify_signature": False})
return payload
except Exception as e:
logger.error(f"Error decoding token payload: {str(e)}")
return None
# Instance globală pentru folosire în toate aplicațiile
jwt_handler = JWTHandler()