""" Authentication Routes Template pentru ROA2WEB FastAPI Applications Acest modul oferă rute predefinite pentru autentificare care pot fi integrate în orice aplicație FastAPI din ecosistemul ROA2WEB. Endpoints disponibile: - POST /auth/login - Autentificare utilizator - POST /auth/refresh - Refresh access token - POST /auth/logout - Deconectare utilizator - GET /auth/me - Informații utilizator curent - GET /auth/companies - Firmele utilizatorului - GET /auth/status - Status autentificare """ import logging from typing import List, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, Request, Response from fastapi.security import HTTPAuthorizationCredentials from .models import ( LoginRequest, TokenResponse, RefreshTokenRequest, LogoutRequest, CurrentUser, UserCompany, CompanyAccessRequest, CompanyAccessResponse, AuthError, AuthStats ) from .auth_service import auth_service, AuthenticationError from .jwt_handler import jwt_handler from .dependencies import ( get_current_user, get_optional_user, security_required, security_optional ) from .middleware import default_rate_limiter logger = logging.getLogger(__name__) def create_auth_router( prefix: str = "/auth", tags: Optional[List[str]] = None, include_admin_routes: bool = False ) -> APIRouter: """ Creează un router FastAPI cu toate rutele de autentificare Args: prefix: Prefix-ul pentru toate rutele tags: Tag-urile pentru documentația OpenAPI include_admin_routes: Dacă să includă rutele de administrare Returns: Router-ul FastAPI configurat """ router = APIRouter(prefix=prefix, tags=tags or ["authentication"]) @router.post("/login", response_model=TokenResponse, status_code=status.HTTP_200_OK) async def login( login_data: LoginRequest, request: Request, response: Response ) -> TokenResponse: """ Autentifică un utilizator și returnează token-urile JWT Acest endpoint: - Validează credențialele utilizatorului în Oracle - Obține firmele la care utilizatorul are acces - Generează access și refresh token-uri JWT - Aplică rate limiting pentru securitate Args: login_data: Datele de autentificare (username, password) request: Request-ul HTTP (pentru rate limiting) response: Response-ul HTTP (pentru header-e) Returns: Token-urile JWT și informațiile utilizatorului Raises: HTTPException: Pentru credențiale invalide sau erori de sistem """ try: # Log tentativa de autentificare client_ip = request.client.host if request.client else "unknown" logger.info(f"Login attempt for user {login_data.username} from IP {client_ip}") # Autentifică și creează token-urile success, token_response, error_message = await auth_service.authenticate_and_create_tokens( login_data.username, login_data.password ) if not success: logger.warning(f"Failed login attempt for user {login_data.username}: {error_message}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=error_message or "Authentication failed" ) # Adaugă informațiile utilizatorului în răspuns companies = await auth_service.get_user_companies(login_data.username) current_user = CurrentUser( username=login_data.username, companies=companies, permissions=["read", "reports"], # Permisiuni de bază last_login=datetime.now() ) token_response.user = current_user # Header-e de securitate response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" logger.info(f"Successful login for user {login_data.username}") return token_response except AuthenticationError as e: logger.error(f"Authentication error for user {login_data.username}: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e) ) except Exception as e: logger.error(f"Unexpected error during login for user {login_data.username}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal authentication error" ) @router.post("/refresh", response_model=TokenResponse, status_code=status.HTTP_200_OK) async def refresh_token(refresh_data: RefreshTokenRequest) -> TokenResponse: """ Reîmprospătează access token-ul folosind refresh token-ul Args: refresh_data: Refresh token-ul valid Returns: Noul access token și informațiile utilizatorului Raises: HTTPException: Pentru refresh token-uri invalide """ try: # Validează refresh token-ul token_data = jwt_handler.verify_token(refresh_data.refresh_token) if not token_data or token_data.token_type != "refresh": logger.warning("Invalid refresh token provided") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) # Obține datele actualizate ale utilizatorului companies = await auth_service.get_user_companies(token_data.username) permissions = ["read", "reports"] # Poate fi extins în viitor # Creează noul access token new_access_token = jwt_handler.create_access_token( username=token_data.username, companies=companies, user_id=token_data.user_id, permissions=permissions ) # Informațiile utilizatorului current_user = CurrentUser( username=token_data.username, user_id=token_data.user_id, companies=companies, permissions=permissions ) token_response = TokenResponse( access_token=new_access_token, token_type="bearer", expires_in=jwt_handler.access_token_expire_minutes * 60, user=current_user ) logger.info(f"Token refreshed for user {token_data.username}") return token_response except Exception as e: logger.error(f"Error refreshing token: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token refresh failed" ) @router.post("/logout", status_code=status.HTTP_200_OK) async def logout( logout_data: Optional[LogoutRequest] = None, current_user: CurrentUser = Depends(get_current_user) ) -> dict: """ Deconectează utilizatorul (invalidează token-urile) Note: În implementarea curentă, token-urile JWT sunt stateless, deci nu pot fi invalidate direct. În viitor poate fi implementat un blacklist pentru token-uri. Args: logout_data: Date pentru logout (opțional) current_user: Utilizatorul curent autentificat Returns: Confirmarea deconectării """ logger.info(f"User {current_user.username} logged out") # În viitor, aici se poate implementa: # - Adăugarea token-ului într-un blacklist # - Invalidarea tuturor sesiunilor utilizatorului # - Notificări de securitate return { "message": "Successfully logged out", "username": current_user.username, "logout_time": datetime.now().isoformat() } @router.get("/me", response_model=CurrentUser) async def get_current_user_info( current_user: CurrentUser = Depends(get_current_user) ) -> CurrentUser: """ Returnează informațiile despre utilizatorul curent Args: current_user: Utilizatorul curent autentificat Returns: Informațiile complete ale utilizatorului """ logger.debug(f"User info requested for {current_user.username}") return current_user @router.get("/companies", response_model=List[UserCompany]) async def get_user_companies( current_user: CurrentUser = Depends(get_current_user) ) -> List[UserCompany]: """ Returnează lista firmelor la care utilizatorul are acces Args: current_user: Utilizatorul curent autentificat Returns: Lista firmelor cu permisiunile asociate """ try: # Obține firmele actualizate din baza de date companies = await auth_service.get_user_companies(current_user.username) user_companies = [] for i, company_code in enumerate(companies): # Obține permisiunile pentru fiecare firmă permissions = await auth_service.get_user_permissions( current_user.username, company_code ) user_company = UserCompany( code=company_code, permissions=permissions, is_default=(i == 0) # Prima firmă ca default ) user_companies.append(user_company) logger.debug(f"Returned {len(user_companies)} companies for user {current_user.username}") return user_companies except Exception as e: logger.error(f"Error getting companies for user {current_user.username}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error retrieving user companies" ) @router.post("/check-company-access", response_model=CompanyAccessResponse) async def check_company_access( access_request: CompanyAccessRequest, current_user: CurrentUser = Depends(get_current_user) ) -> CompanyAccessResponse: """ Verifică dacă utilizatorul are acces la o firmă specifică Args: access_request: Request-ul de verificare acces current_user: Utilizatorul curent autentificat Returns: Răspunsul cu informații despre acces """ try: has_access = await auth_service.validate_user_company_access( current_user.username, access_request.company_code ) if not has_access: return CompanyAccessResponse( has_access=False, company=None, missing_permissions=None ) # Obține permisiunile pentru firmă permissions = await auth_service.get_user_permissions( current_user.username, access_request.company_code ) # Verifică permisiunile cerute missing_permissions = [] if access_request.required_permissions: missing_permissions = [ perm for perm in access_request.required_permissions if perm not in permissions ] user_company = UserCompany( code=access_request.company_code, permissions=permissions ) return CompanyAccessResponse( has_access=True, company=user_company, missing_permissions=missing_permissions if missing_permissions else None ) except Exception as e: logger.error(f"Error checking company access: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error checking company access" ) @router.get("/status") async def get_auth_status( current_user: Optional[CurrentUser] = Depends(get_optional_user) ) -> dict: """ Returnează statusul de autentificare (endpoint public) Args: current_user: Utilizatorul curent (opțional) Returns: Statusul de autentificare """ if current_user: return { "authenticated": True, "username": current_user.username, "companies_count": len(current_user.companies), "permissions": current_user.permissions } else: return { "authenticated": False, "username": None, "companies_count": 0, "permissions": [] } # Rute de administrare (opționale) if include_admin_routes: @router.get("/admin/stats", response_model=AuthStats) async def get_auth_stats( current_user: CurrentUser = Depends(get_current_user) ) -> AuthStats: """ Returnează statistici despre sistemul de autentificare Necesită permisiuni de admin. """ # Verifică permisiuni admin if "admin" not in current_user.permissions: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin permissions required" ) cache_stats = auth_service.get_cache_stats() return AuthStats( total_users=1, # Placeholder - poate fi implementat active_sessions=1, # Placeholder - poate fi implementat cache_hit_ratio=cache_stats.get('cache_hit_ratio', 0), last_cleanup=datetime.now() ) @router.post("/admin/refresh-cache") async def refresh_user_cache( username: Optional[str] = None, current_user: CurrentUser = Depends(get_current_user) ) -> dict: """ Reîmprospătează cache-ul utilizatorilor Necesită permisiuni de admin. """ if "admin" not in current_user.permissions: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin permissions required" ) if username: success = await auth_service.refresh_user_data(username) return { "message": f"Cache refreshed for user {username}", "success": success } else: auth_service.clear_cache() return {"message": "All user cache cleared"} return router # Router implicit pentru folosire rapidă auth_router = create_auth_router() # Router cu rute de admin incluse auth_router_with_admin = create_auth_router(include_admin_routes=True)