"""
Comprehensive authentication tests for ROA2WEB.

This module contains tests for the components of the authentication system:
- JWT handler functionality
- Oracle authentication service
- FastAPI dependencies and middleware
- Rate limiting and security features

The tests cover:
- Unit tests for the basic functionality
- Integration tests against a mocked Oracle database
- Security tests for common vulnerabilities
- Performance tests for scalability
"""

import asyncio
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import jwt as pyjwt
import pytest
from fastapi import FastAPI, HTTPException, status
from fastapi.testclient import TestClient
from httpx import AsyncClient

# Modules under test
from .jwt_handler import JWTHandler, TokenData, TokenResponse
from .auth_service import UserAuthService, AuthenticationError
from .models import (
    LoginRequest,
    CurrentUser,
    PermissionType,
    CompanyAccessRequest,
    CompanyAccessResponse,
)
from .middleware import AuthenticationMiddleware, RateLimiter
from .dependencies import (
    get_current_user_from_token,
    require_company_access,
    require_permissions,
    get_current_company_from_header,
)
from .routes import create_auth_router

# Signing parameters shared by every test that decodes a token by hand.
TEST_SECRET_KEY = "test-secret-key"
TEST_ALGORITHM = "HS256"

# Patch targets for the module-level objects replaced during the tests.
ORACLE_POOL_TARGET = 'roa2web.shared.auth.auth_service.oracle_pool'
AUTH_SERVICE_TARGET = 'roa2web.shared.auth.routes.auth_service'
JWT_HANDLER_TARGET = 'roa2web.shared.auth.dependencies.jwt_handler'


def _wire_oracle_mocks(mock_pool, fetchone=None, fetchall=None):
    """Attach a mocked connection and cursor to ``mock_pool``.

    Args:
        mock_pool: The mock standing in for the Oracle connection pool.
        fetchone: Value ``cursor.fetchone()`` should return; left as the
            default ``MagicMock`` behaviour when ``None``.
        fetchall: Value ``cursor.fetchall()`` should return; left as the
            default ``MagicMock`` behaviour when ``None``.

    Returns:
        The cursor mock, so tests can assert on ``cursor.execute``.
    """
    connection = AsyncMock()
    cursor = MagicMock()
    if fetchone is not None:
        cursor.fetchone.return_value = fetchone
    if fetchall is not None:
        cursor.fetchall.return_value = fetchall

    # The connection is used as an async context manager that yields itself,
    # and its cursor as a regular context manager that yields the cursor mock.
    connection.__aenter__.return_value = connection
    connection.cursor.return_value.__enter__.return_value = cursor
    mock_pool.get_connection.return_value = connection
    return cursor


class TestJWTHandler:
    """Test suite for the JWT handler."""

    @pytest.fixture
    def jwt_handler(self):
        """Provide a JWT handler configured with the test secret."""
        return JWTHandler(
            secret_key=TEST_SECRET_KEY,
            algorithm=TEST_ALGORITHM
        )

    def test_create_access_token(self, jwt_handler):
        """An access token is a non-empty string carrying the given claims."""
        username = "testuser"
        companies = ["COMP1", "COMP2"]
        permissions = ["read", "write"]

        token = jwt_handler.create_access_token(
            username=username,
            companies=companies,
            user_id=123,
            permissions=permissions
        )

        assert isinstance(token, str)
        assert len(token) > 0

        # The token must be decodable with the same secret and algorithm.
        payload = pyjwt.decode(token, TEST_SECRET_KEY, algorithms=[TEST_ALGORITHM])
        assert payload["username"] == username
        assert payload["companies"] == companies
        assert payload["permissions"] == permissions
        assert payload["user_id"] == 123
        assert payload["type"] == "access"

    def test_create_refresh_token(self, jwt_handler):
        """A refresh token is a non-empty string typed as ``refresh``."""
        username = "testuser"
        user_id = 123

        token = jwt_handler.create_refresh_token(username, user_id)

        assert isinstance(token, str)
        assert len(token) > 0

        # Check the payload.
        payload = pyjwt.decode(token, TEST_SECRET_KEY, algorithms=[TEST_ALGORITHM])
        assert payload["username"] == username
        assert payload["user_id"] == user_id
        assert payload["type"] == "refresh"

    def test_verify_valid_token(self, jwt_handler):
        """Verifying a freshly created token yields matching ``TokenData``."""
        username = "testuser"
        companies = ["COMP1"]

        token = jwt_handler.create_access_token(username, companies)
        token_data = jwt_handler.verify_token(token)

        assert token_data is not None
        assert isinstance(token_data, TokenData)
        assert token_data.username == username
        assert token_data.companies == companies
        assert token_data.token_type == "access"

    def test_verify_expired_token(self, jwt_handler):
        """A token whose ``exp`` lies in the past is rejected."""
        # Build the token by hand so the expiry can be placed in the past.
        # A timezone-aware UTC datetime replaces the deprecated utcnow().
        past_time = datetime.now(timezone.utc) - timedelta(minutes=10)
        payload = {
            "username": "testuser",
            "companies": ["COMP1"],
            "permissions": ["read"],
            "exp": past_time,
            "iat": past_time - timedelta(minutes=5),
            "type": "access"
        }

        expired_token = pyjwt.encode(payload, TEST_SECRET_KEY, algorithm=TEST_ALGORITHM)
        token_data = jwt_handler.verify_token(expired_token)

        assert token_data is None

    def test_verify_invalid_token(self, jwt_handler):
        """Malformed token strings are rejected."""
        # None is deliberately not exercised here: the original test listed
        # it but skipped it, so verify_token(None) remains untested.
        invalid_tokens = [
            "invalid.token.here",
            "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.invalid",
            "",
        ]

        for invalid_token in invalid_tokens:
            token_data = jwt_handler.verify_token(invalid_token)
            assert token_data is None

    def test_create_token_response(self, jwt_handler):
        """The full token response contains both tokens and bearer metadata."""
        username = "testuser"
        companies = ["COMP1", "COMP2"]
        permissions = ["read", "reports"]

        response = jwt_handler.create_token_response(
            username=username,
            companies=companies,
            permissions=permissions,
            include_refresh=True
        )

        assert isinstance(response, TokenResponse)
        assert response.access_token is not None
        assert response.refresh_token is not None
        assert response.token_type == "bearer"
        assert response.expires_in > 0

    def test_refresh_access_token(self, jwt_handler):
        """A refresh token can be exchanged for a valid access token."""
        username = "testuser"
        refresh_token = jwt_handler.create_refresh_token(username, 123)

        companies = ["COMP1", "COMP2"]
        new_access_token = jwt_handler.refresh_access_token(
            refresh_token, companies, ["read", "write"]
        )

        assert new_access_token is not None

        # Check the new token. Assert non-None first so a failure reads as an
        # assertion rather than an AttributeError on None.
        token_data = jwt_handler.verify_token(new_access_token)
        assert token_data is not None
        assert token_data.username == username
        assert token_data.companies == companies
        assert token_data.token_type == "access"


class TestUserAuthService:
    """Test suite for the user authentication service."""

    @pytest.fixture
    def auth_service(self):
        """Provide a fresh authentication service instance."""
        return UserAuthService()

    @pytest.fixture
    def mock_oracle_pool(self):
        """Replace the Oracle connection pool with a mock for the test."""
        with patch(ORACLE_POOL_TARGET) as mock_pool:
            yield mock_pool

    @pytest.mark.asyncio
    async def test_verify_user_credentials_success(self, auth_service, mock_oracle_pool):
        """Credentials verify when the query returns ``[1]``."""
        mock_cursor = _wire_oracle_mocks(mock_oracle_pool, fetchone=[1])  # Success

        result = await auth_service.verify_user_credentials("testuser", "password")

        assert result is True
        mock_cursor.execute.assert_called_once()

    @pytest.mark.asyncio
    async def test_verify_user_credentials_failure(self, auth_service, mock_oracle_pool):
        """Credentials are rejected when the query returns ``[0]``."""
        _wire_oracle_mocks(mock_oracle_pool, fetchone=[0])  # Failure

        result = await auth_service.verify_user_credentials("testuser", "wrongpassword")

        assert result is False

    @pytest.mark.asyncio
    async def test_get_user_companies(self, auth_service, mock_oracle_pool):
        """The user's companies are flattened from the fetched rows."""
        mock_cursor = _wire_oracle_mocks(
            mock_oracle_pool, fetchall=[["COMP1"], ["COMP2"], ["COMP3"]]
        )

        companies = await auth_service.get_user_companies("testuser")

        assert companies == ["COMP1", "COMP2", "COMP3"]
        mock_cursor.execute.assert_called_once()

    @pytest.mark.asyncio
    async def test_authenticate_and_create_tokens_success(self, auth_service, mock_oracle_pool):
        """Full authentication succeeds and returns a token response."""
        # fetchone serves the credential check (success); fetchall serves the
        # company lookup.
        _wire_oracle_mocks(
            mock_oracle_pool, fetchone=[1], fetchall=[["COMP1"], ["COMP2"]]
        )

        success, token_response, error = await auth_service.authenticate_and_create_tokens(
            "testuser", "password"
        )

        assert success is True
        assert token_response is not None
        assert isinstance(token_response, TokenResponse)
        assert error is None

    @pytest.mark.asyncio
    async def test_authenticate_and_create_tokens_failure(self, auth_service, mock_oracle_pool):
        """Full authentication fails with an error for invalid credentials."""
        _wire_oracle_mocks(mock_oracle_pool, fetchone=[0])  # Invalid credentials

        success, token_response, error = await auth_service.authenticate_and_create_tokens(
            "testuser", "wrongpassword"
        )

        assert success is False
        assert token_response is None
        assert error is not None

    @pytest.mark.asyncio
    async def test_validate_user_company_access(self, auth_service, mock_oracle_pool):
        """Access is granted only for companies returned by the lookup."""
        _wire_oracle_mocks(mock_oracle_pool, fetchall=[["COMP1"], ["COMP2"]])

        # Valid access
        has_access = await auth_service.validate_user_company_access("testuser", "COMP1")
        assert has_access is True

        # Invalid access
        has_access = await auth_service.validate_user_company_access("testuser", "COMP3")
        assert has_access is False

    def test_cache_functionality(self, auth_service):
        """Cache statistics expose the expected keys and reset on clear."""
        # Cache stats
        stats = auth_service.get_cache_stats()
        assert isinstance(stats, dict)
        assert 'total_entries' in stats
        assert 'valid_entries' in stats
        assert 'cache_hit_ratio' in stats

        # Clearing the cache
        auth_service.clear_cache()
        stats_after_clear = auth_service.get_cache_stats()
        assert stats_after_clear['total_entries'] == 0


class TestRateLimiter:
    """Test suite for the rate limiter."""

    @pytest.fixture
    def rate_limiter(self):
        """Provide a limiter allowing 3 requests per window of 5."""
        return RateLimiter(max_requests=3, time_window=5)

    def test_rate_limiting_within_limit(self, rate_limiter):
        """Requests up to the limit pass; the next one is refused."""
        client_ip = "192.168.1.1"

        # The first 3 requests must be allowed.
        for _ in range(3):
            assert rate_limiter.is_allowed(client_ip) is True

        # The 4th request must be refused.
        assert rate_limiter.is_allowed(client_ip) is False

    def test_rate_limiting_reset_after_time(self, rate_limiter):
        """The limit resets once the time window has elapsed."""
        client_ip = "192.168.1.2"

        # Use up every allowed request.
        for _ in range(3):
            assert rate_limiter.is_allowed(client_ip) is True

        # The next request must be refused.
        assert rate_limiter.is_allowed(client_ip) is False

        # Wait past the window (presumably time_window=5 means seconds).
        # NOTE(review): this is a real 6 s sleep; consider injecting or
        # patching the limiter's clock once its time source is confirmed.
        time.sleep(6)

        # Requests should be accepted again.
        assert rate_limiter.is_allowed(client_ip) is True

    def test_rate_limiting_different_ips(self, rate_limiter):
        """Exhausting one IP's quota does not affect another IP."""
        ip1 = "192.168.1.1"
        ip2 = "192.168.1.2"

        # Use up every allowed request for the first IP.
        for _ in range(3):
            assert rate_limiter.is_allowed(ip1) is True

        assert rate_limiter.is_allowed(ip1) is False

        # The second IP should still work normally.
        for _ in range(3):
            assert rate_limiter.is_allowed(ip2) is True


class TestAuthenticationRoutes:
    """Test suite for the authentication routes."""

    @pytest.fixture
    def app(self):
        """Provide a FastAPI application with the auth router mounted."""
        app = FastAPI()
        auth_router = create_auth_router()
        app.include_router(auth_router)
        return app

    @pytest.fixture
    def client(self, app):
        """Provide a test client bound to the test application."""
        return TestClient(app)

    @pytest.fixture
    def mock_auth_service(self):
        """Replace the auth service used by the routes with a mock."""
        with patch(AUTH_SERVICE_TARGET) as mock_service:
            yield mock_service

    def test_login_success(self, client, mock_auth_service):
        """A successful login returns 200 with the token payload."""
        # Mocked successful authentication
        mock_token_response = TokenResponse(
            access_token="test-access-token",
            refresh_token="test-refresh-token",
            token_type="bearer",
            expires_in=1800,
            user=CurrentUser(
                username="testuser",
                companies=["COMP1", "COMP2"],
                permissions=[PermissionType.READ, PermissionType.REPORTS]
            )
        )

        mock_auth_service.authenticate_and_create_tokens.return_value = (
            True, mock_token_response, None
        )
        mock_auth_service.get_user_companies.return_value = ["COMP1", "COMP2"]

        response = client.post("/auth/login", json={
            "username": "testuser",
            "password": "password"
        })

        assert response.status_code == 200
        data = response.json()
        assert data["access_token"] == "test-access-token"
        assert data["token_type"] == "bearer"
        assert data["user"]["username"] == "testuser"

    def test_login_failure(self, client, mock_auth_service):
        """A failed login returns 401 with the error detail."""
        mock_auth_service.authenticate_and_create_tokens.return_value = (
            False, None, "Invalid credentials"
        )

        response = client.post("/auth/login", json={
            "username": "testuser",
            "password": "wrongpassword"
        })

        assert response.status_code == 401
        assert "Invalid credentials" in response.json()["detail"]

    def test_protected_endpoint_without_token(self, client):
        """A protected endpoint answers 401 when no token is sent."""
        response = client.get("/auth/me")
        assert response.status_code == 401

    def test_protected_endpoint_with_valid_token(self, client):
        """A protected endpoint answers 200 for a valid bearer token."""
        # Create a test token.
        jwt_handler = JWTHandler(secret_key=TEST_SECRET_KEY)
        token = jwt_handler.create_access_token(
            username="testuser",
            companies=["COMP1"],
            permissions=["read"]
        )

        # The dependency must verify with the same handler that signed it.
        with patch(JWT_HANDLER_TARGET, jwt_handler):
            response = client.get("/auth/me", headers={
                "Authorization": f"Bearer {token}"
            })

        assert response.status_code == 200
        data = response.json()
        assert data["username"] == "testuser"


class TestSecurityFeatures:
    """Test suite for security features."""

    def test_jwt_token_tampering(self):
        """A token with an altered signature tail is rejected."""
        jwt_handler = JWTHandler(secret_key=TEST_SECRET_KEY)
        token = jwt_handler.create_access_token("testuser", ["COMP1"])

        # Tamper with the token.
        tampered_token = token[:-5] + "XXXXX"

        # The tampered token must be invalid.
        token_data = jwt_handler.verify_token(tampered_token)
        assert token_data is None

    def test_jwt_secret_key_different(self):
        """A token signed with one key does not verify under another."""
        jwt_handler1 = JWTHandler(secret_key="secret1")
        jwt_handler2 = JWTHandler(secret_key="secret2")

        token = jwt_handler1.create_access_token("testuser", ["COMP1"])

        # The token must not be valid under a different key.
        token_data = jwt_handler2.verify_token(token)
        assert token_data is None

    @pytest.mark.asyncio
    async def test_sql_injection_prevention(self):
        """The credential query passes the username as a bound parameter."""
        auth_service = UserAuthService()

        with patch(ORACLE_POOL_TARGET) as mock_pool:
            # Wire the mocks onto the patched pool. The previous version
            # referenced an undefined ``mock_oracle_pool`` here and raised
            # NameError before any assertion ran.
            mock_cursor = _wire_oracle_mocks(mock_pool)

            # Attempt SQL injection through the username.
            malicious_username = "admin'; DROP TABLE users; --"
            await auth_service.verify_user_credentials(malicious_username, "password")

            # The query must use bound parameters.
            mock_cursor.execute.assert_called_once()
            call_args = mock_cursor.execute.call_args
            assert ':username' in call_args[0][0]  # Parameterised query
            assert malicious_username.upper() == call_args[1]['username']  # Bound parameter


@pytest.mark.performance
class TestPerformance:
    """Test suite for performance."""

    def test_jwt_token_creation_performance(self):
        """Creating 1000 tokens takes less than one second."""
        jwt_handler = JWTHandler(secret_key=TEST_SECRET_KEY)

        # perf_counter is monotonic, unlike the wall clock from time.time().
        start_time = time.perf_counter()

        # Create 1000 tokens.
        for i in range(1000):
            jwt_handler.create_access_token(f"user{i}", ["COMP1"])

        total_time = time.perf_counter() - start_time

        # Should take less than 1 second.
        assert total_time < 1.0
        print(f"Created 1000 tokens in {total_time:.4f} seconds")

    def test_jwt_token_verification_performance(self):
        """Verifying 100 tokens takes less than half a second."""
        jwt_handler = JWTHandler(secret_key=TEST_SECRET_KEY)

        # Create 100 tokens outside the timed section.
        tokens = [
            jwt_handler.create_access_token(f"user{i}", ["COMP1"])
            for i in range(100)
        ]

        start_time = time.perf_counter()

        # Verify every token.
        for token in tokens:
            jwt_handler.verify_token(token)

        total_time = time.perf_counter() - start_time

        # Should take less than 0.5 seconds.
        assert total_time < 0.5
        print(f"Verified 100 tokens in {total_time:.4f} seconds")


if __name__ == "__main__":
    # Run the tests.
    pytest.main([__file__, "-v", "--tb=short"])