""" JWT Authentication Handler - Shared între toate aplicațiile ROA2WEB Acest modul gestionează crearea, validarea și refresh-ul token-urilor JWT pentru autentificarea utilizatorilor în ecosistemul ROA2WEB. Payload structure: { "username": "string", "user_id": "integer", "companies": ["schema1", "schema2"], "permissions": ["read", "write", "admin"], "server_id": "string|null", // ID-ul serverului Oracle (multi-server mode) "exp": "timestamp", "iat": "timestamp", "type": "access|refresh" } """ from jose import jwt import os from datetime import datetime, timedelta from typing import Optional, Dict, Any, List from pydantic import BaseModel, Field import logging logger = logging.getLogger(__name__) class TokenData(BaseModel): """Date conținute în token""" username: str = Field(description="Numele utilizatorului") user_id: Optional[int] = Field(default=None, description="ID-ul utilizatorului") companies: List[str] = Field(default_factory=list, description="Lista firmelor accesibile") permissions: List[str] = Field(default_factory=list, description="Lista permisiunilor") server_id: Optional[str] = Field(default=None, description="ID-ul serverului Oracle (pentru multi-server mode)") exp: datetime = Field(description="Data expirării") iat: datetime = Field(description="Data creării") token_type: str = Field(alias="type", description="Tipul token-ului (access/refresh)") class TokenResponse(BaseModel): """Răspuns pentru token-uri""" access_token: str = Field(description="JWT access token") refresh_token: Optional[str] = Field(default=None, description="JWT refresh token") token_type: str = Field(default="bearer", description="Tipul token-ului") expires_in: int = Field(description="Timpul de expirare în secunde") class JWTHandler: """ Gestionarea JWT tokens pentru autentificare Această clasă oferă funcționalități pentru: - Crearea token-urilor access și refresh - Validarea și decodificarea token-urilor - Gestionarea expirării token-urilor """ def __init__(self, secret_key: Optional[str] = None, algorithm: str = "HS256"): """ Inițializează JWT handler Args: secret_key: Cheia secretă pentru semnarea token-urilor algorithm: Algoritmul de criptare (default: HS256) """ self.secret_key = secret_key or os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-in-production') self.algorithm = algorithm self.access_token_expire_minutes = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', 30)) self.refresh_token_expire_days = int(os.getenv('REFRESH_TOKEN_EXPIRE_DAYS', 7)) # Warning pentru development if self.secret_key == 'your-secret-key-change-in-production': logger.warning("Using default JWT secret key! Change JWT_SECRET_KEY in production!") def create_access_token( self, username: str, companies: List[str], user_id: Optional[int] = None, permissions: Optional[List[str]] = None, server_id: Optional[str] = None ) -> str: """ Creează un JWT access token Args: username: Numele utilizatorului companies: Lista firmelor la care utilizatorul are acces user_id: ID-ul utilizatorului în baza de date permissions: Lista permisiunilor utilizatorului server_id: ID-ul serverului Oracle (pentru multi-server mode) Returns: Token JWT ca string """ now = datetime.utcnow() expire = now + timedelta(minutes=self.access_token_expire_minutes) payload = { "username": username, "user_id": user_id, "companies": companies or [], "permissions": permissions or ["read"], "server_id": server_id, "exp": expire, "iat": now, "type": "access" } token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm) logger.debug(f"Created access token for user {username} on server {server_id or 'default'} with companies: {companies}") return token def create_refresh_token( self, username: str, user_id: Optional[int] = None, server_id: Optional[str] = None ) -> str: """ Creează un refresh token cu durată mai mare Args: username: Numele utilizatorului user_id: ID-ul utilizatorului server_id: ID-ul serverului Oracle (pentru multi-server mode) Returns: Refresh token JWT ca string """ now = datetime.utcnow() expire = now + timedelta(days=self.refresh_token_expire_days) payload = { "username": username, "user_id": user_id, "server_id": server_id, "exp": expire, "iat": now, "type": "refresh" } token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm) logger.debug(f"Created refresh token for user {username} on server {server_id or 'default'}") return token def verify_token(self, token: str) -> Optional[TokenData]: """ Verifică și decodează un JWT token Args: token: Token-ul JWT de verificat Returns: TokenData cu informațiile din token sau None dacă token-ul e invalid """ try: logger.debug(f"Using JWT secret key (first 10 chars): {self.secret_key[:10]}...") payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) token_data = TokenData(**payload) logger.debug(f"Token verified successfully for user {token_data.username}") return token_data except jwt.ExpiredSignatureError: logger.warning("Token has expired") return None except jwt.JWTError as e: logger.warning(f"Invalid token: {str(e)}") logger.debug(f"Token that failed verification: {token[:50]}...") return None def refresh_access_token( self, refresh_token: str, companies: List[str], permissions: Optional[List[str]] = None ) -> Optional[str]: """ Creează un nou access token folosind refresh token-ul Args: refresh_token: Refresh token-ul valid companies: Lista actualizată a firmelor (poate fi modificată între refresh-uri) permissions: Lista actualizată a permisiunilor Returns: Noul access token sau None dacă refresh token-ul e invalid """ token_data = self.verify_token(refresh_token) if not token_data or token_data.token_type != "refresh": logger.warning("Invalid refresh token") return None # Creează nou access token cu datele din refresh token # Păstrează server_id din refresh token pentru consistență multi-server return self.create_access_token( username=token_data.username, companies=companies, user_id=token_data.user_id, permissions=permissions, server_id=token_data.server_id ) def create_token_response( self, username: str, companies: List[str], user_id: Optional[int] = None, permissions: Optional[List[str]] = None, include_refresh: bool = True, server_id: Optional[str] = None ) -> TokenResponse: """ Creează un răspuns complet cu access și refresh token Args: username: Numele utilizatorului companies: Lista firmelor accesibile user_id: ID-ul utilizatorului permissions: Lista permisiunilor include_refresh: Dacă să includă și refresh token server_id: ID-ul serverului Oracle (pentru multi-server mode) Returns: TokenResponse cu toate token-urile """ access_token = self.create_access_token( username, companies, user_id, permissions, server_id ) refresh_token = self.create_refresh_token( username, user_id, server_id ) if include_refresh else None return TokenResponse( access_token=access_token, refresh_token=refresh_token, token_type="bearer", expires_in=self.access_token_expire_minutes * 60 ) def decode_token_payload(self, token: str) -> Optional[Dict[str, Any]]: """ Decodează token-ul fără verificare (pentru debugging) Args: token: Token-ul de decodat Returns: Payload-ul token-ului sau None """ try: # Decodare fără verificare - doar pentru debugging payload = jwt.decode(token, key="", algorithms=[self.algorithm], options={"verify_signature": False}) return payload except Exception as e: logger.error(f"Error decoding token payload: {str(e)}") return None # Instance globală pentru folosire în toate aplicațiile jwt_handler = JWTHandler()