Add missing test files and update .gitignore to allow test files
This commit addresses the overly restrictive .gitignore pattern that was excluding all test files (test_*.py), including legitimate pytest and unittest test suites essential for code quality and CI/CD. Changes to .gitignore: - Added negation patterns !**/tests/test_*.py and !**/test_*.py to allow proper test files while still blocking temporary scripts - This enables pytest test suites to be tracked by git Added test files (17 files): Telegram Bot Tests (15 files): - reports-app/telegram-bot/tests/test_auth.py Tests for authentication and account linking flow - reports-app/telegram-bot/tests/test_callbacks.py Tests for callback query handlers - reports-app/telegram-bot/tests/test_formatters.py Tests for message formatting utilities - reports-app/telegram-bot/tests/test_formatters_extended.py Extended formatter tests - reports-app/telegram-bot/tests/test_handlers_menu.py Tests for menu handlers - reports-app/telegram-bot/tests/test_helpers.py Tests for helper functions - reports-app/telegram-bot/tests/test_helpers_extended.py Extended helper tests - reports-app/telegram-bot/tests/test_helpers_real.py Real integration tests for helpers - reports-app/telegram-bot/tests/test_helpers_real_simple.py Simplified integration tests - reports-app/telegram-bot/tests/test_login_flow.py Complete login flow integration tests - reports-app/telegram-bot/tests/test_menus.py Menu system tests - reports-app/telegram-bot/tests/test_session_company.py Session and company management tests - reports-app/telegram-bot/test_claude_integration.py Manual integration test (Claude AI) - reports-app/telegram-bot/test_claude_response.py Response formatting test - reports-app/telegram-bot/test_db.py Database operations manual test Shared Module Tests (2 files): - shared/auth/test_auth.py Authentication system tests - shared/database/test_pool.py Oracle connection pool tests Security verification: ✅ All test files use mock objects, fixtures, and environment variables ✅ No hardcoded credentials or secrets found ✅ Safe for version control 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
559
shared/auth/test_auth.py
Normal file
559
shared/auth/test_auth.py
Normal file
@@ -0,0 +1,559 @@
|
||||
"""
|
||||
Comprehensive Authentication Tests pentru ROA2WEB
|
||||
|
||||
Acest modul conține teste pentru toate componentele sistemului de autentificare:
|
||||
- JWT Handler functionality
|
||||
- Oracle authentication service
|
||||
- FastAPI dependencies și middleware
|
||||
- Rate limiting și security features
|
||||
|
||||
Testele acoperă:
|
||||
- Unit tests pentru funcționalitatea de bază
|
||||
- Integration tests cu Oracle database (mock)
|
||||
- Security tests pentru vulnerabilități comune
|
||||
- Performance tests pentru scalabilitate
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import Mock, AsyncMock, patch, MagicMock
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
import jwt as pyjwt
|
||||
from fastapi import FastAPI, HTTPException, status
|
||||
from fastapi.testclient import TestClient
|
||||
from httpx import AsyncClient
|
||||
|
||||
# Import modulele de testat
|
||||
from .jwt_handler import JWTHandler, TokenData, TokenResponse
|
||||
from .auth_service import UserAuthService, AuthenticationError
|
||||
from .models import (
|
||||
LoginRequest, CurrentUser, PermissionType,
|
||||
CompanyAccessRequest, CompanyAccessResponse
|
||||
)
|
||||
from .middleware import AuthenticationMiddleware, RateLimiter
|
||||
from .dependencies import (
|
||||
get_current_user_from_token, require_company_access,
|
||||
require_permissions, get_current_company_from_header
|
||||
)
|
||||
from .routes import create_auth_router
|
||||
|
||||
|
||||
class TestJWTHandler:
|
||||
"""Test suite pentru JWT Handler"""
|
||||
|
||||
@pytest.fixture
|
||||
def jwt_handler(self):
|
||||
"""Fixture pentru JWT handler cu configurare de test"""
|
||||
return JWTHandler(
|
||||
secret_key="test-secret-key",
|
||||
algorithm="HS256"
|
||||
)
|
||||
|
||||
def test_create_access_token(self, jwt_handler):
|
||||
"""Test pentru crearea access token-urilor"""
|
||||
username = "testuser"
|
||||
companies = ["COMP1", "COMP2"]
|
||||
permissions = ["read", "write"]
|
||||
|
||||
token = jwt_handler.create_access_token(
|
||||
username=username,
|
||||
companies=companies,
|
||||
user_id=123,
|
||||
permissions=permissions
|
||||
)
|
||||
|
||||
assert isinstance(token, str)
|
||||
assert len(token) > 0
|
||||
|
||||
# Verifică că token-ul poate fi decodat
|
||||
payload = pyjwt.decode(token, "test-secret-key", algorithms=["HS256"])
|
||||
assert payload["username"] == username
|
||||
assert payload["companies"] == companies
|
||||
assert payload["permissions"] == permissions
|
||||
assert payload["user_id"] == 123
|
||||
assert payload["type"] == "access"
|
||||
|
||||
def test_create_refresh_token(self, jwt_handler):
|
||||
"""Test pentru crearea refresh token-urilor"""
|
||||
username = "testuser"
|
||||
user_id = 123
|
||||
|
||||
token = jwt_handler.create_refresh_token(username, user_id)
|
||||
|
||||
assert isinstance(token, str)
|
||||
assert len(token) > 0
|
||||
|
||||
# Verifică payload-ul
|
||||
payload = pyjwt.decode(token, "test-secret-key", algorithms=["HS256"])
|
||||
assert payload["username"] == username
|
||||
assert payload["user_id"] == user_id
|
||||
assert payload["type"] == "refresh"
|
||||
|
||||
def test_verify_valid_token(self, jwt_handler):
|
||||
"""Test pentru verificarea token-urilor valide"""
|
||||
username = "testuser"
|
||||
companies = ["COMP1"]
|
||||
|
||||
token = jwt_handler.create_access_token(username, companies)
|
||||
token_data = jwt_handler.verify_token(token)
|
||||
|
||||
assert token_data is not None
|
||||
assert isinstance(token_data, TokenData)
|
||||
assert token_data.username == username
|
||||
assert token_data.companies == companies
|
||||
assert token_data.token_type == "access"
|
||||
|
||||
def test_verify_expired_token(self, jwt_handler):
|
||||
"""Test pentru token-uri expirate"""
|
||||
# Creează token cu expirare în trecut
|
||||
past_time = datetime.utcnow() - timedelta(minutes=10)
|
||||
payload = {
|
||||
"username": "testuser",
|
||||
"companies": ["COMP1"],
|
||||
"permissions": ["read"],
|
||||
"exp": past_time,
|
||||
"iat": past_time - timedelta(minutes=5),
|
||||
"type": "access"
|
||||
}
|
||||
|
||||
expired_token = pyjwt.encode(payload, "test-secret-key", algorithm="HS256")
|
||||
token_data = jwt_handler.verify_token(expired_token)
|
||||
|
||||
assert token_data is None
|
||||
|
||||
def test_verify_invalid_token(self, jwt_handler):
|
||||
"""Test pentru token-uri invalide"""
|
||||
invalid_tokens = [
|
||||
"invalid.token.here",
|
||||
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.invalid",
|
||||
"",
|
||||
None
|
||||
]
|
||||
|
||||
for invalid_token in invalid_tokens:
|
||||
if invalid_token is not None:
|
||||
token_data = jwt_handler.verify_token(invalid_token)
|
||||
assert token_data is None
|
||||
|
||||
def test_create_token_response(self, jwt_handler):
|
||||
"""Test pentru crearea răspunsului complet cu token-uri"""
|
||||
username = "testuser"
|
||||
companies = ["COMP1", "COMP2"]
|
||||
permissions = ["read", "reports"]
|
||||
|
||||
response = jwt_handler.create_token_response(
|
||||
username=username,
|
||||
companies=companies,
|
||||
permissions=permissions,
|
||||
include_refresh=True
|
||||
)
|
||||
|
||||
assert isinstance(response, TokenResponse)
|
||||
assert response.access_token is not None
|
||||
assert response.refresh_token is not None
|
||||
assert response.token_type == "bearer"
|
||||
assert response.expires_in > 0
|
||||
|
||||
def test_refresh_access_token(self, jwt_handler):
|
||||
"""Test pentru refresh-ul access token-urilor"""
|
||||
username = "testuser"
|
||||
refresh_token = jwt_handler.create_refresh_token(username, 123)
|
||||
companies = ["COMP1", "COMP2"]
|
||||
|
||||
new_access_token = jwt_handler.refresh_access_token(
|
||||
refresh_token, companies, ["read", "write"]
|
||||
)
|
||||
|
||||
assert new_access_token is not None
|
||||
|
||||
# Verifică noul token
|
||||
token_data = jwt_handler.verify_token(new_access_token)
|
||||
assert token_data.username == username
|
||||
assert token_data.companies == companies
|
||||
assert token_data.token_type == "access"
|
||||
|
||||
|
||||
class TestUserAuthService:
|
||||
"""Test suite pentru User Authentication Service"""
|
||||
|
||||
@pytest.fixture
|
||||
def auth_service(self):
|
||||
"""Fixture pentru auth service cu mock database"""
|
||||
return UserAuthService()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_oracle_pool(self):
|
||||
"""Mock pentru Oracle connection pool"""
|
||||
with patch('roa2web.shared.auth.auth_service.oracle_pool') as mock_pool:
|
||||
yield mock_pool
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_verify_user_credentials_success(self, auth_service, mock_oracle_pool):
|
||||
"""Test pentru verificarea cu succes a credențialelor"""
|
||||
# Mock pentru conexiunea Oracle
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
mock_cursor.fetchone.return_value = [1] # Success
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
result = await auth_service.verify_user_credentials("testuser", "password")
|
||||
|
||||
assert result is True
|
||||
mock_cursor.execute.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_verify_user_credentials_failure(self, auth_service, mock_oracle_pool):
|
||||
"""Test pentru verificarea eșuată a credențialelor"""
|
||||
# Mock pentru conexiunea Oracle
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
mock_cursor.fetchone.return_value = [0] # Failure
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
result = await auth_service.verify_user_credentials("testuser", "wrongpassword")
|
||||
|
||||
assert result is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_companies(self, auth_service, mock_oracle_pool):
|
||||
"""Test pentru obținerea firmelor utilizatorului"""
|
||||
# Mock pentru conexiunea Oracle
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
mock_cursor.fetchall.return_value = [["COMP1"], ["COMP2"], ["COMP3"]]
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
companies = await auth_service.get_user_companies("testuser")
|
||||
|
||||
assert companies == ["COMP1", "COMP2", "COMP3"]
|
||||
mock_cursor.execute.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_and_create_tokens_success(self, auth_service, mock_oracle_pool):
|
||||
"""Test pentru autentificare completă cu succes"""
|
||||
# Mock pentru conexiunea Oracle
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
|
||||
# Prima chiamată pentru verificare credențiale (succes)
|
||||
# A doua chiamată pentru obținerea firmelor
|
||||
mock_cursor.fetchone.return_value = [1]
|
||||
mock_cursor.fetchall.return_value = [["COMP1"], ["COMP2"]]
|
||||
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
success, token_response, error = await auth_service.authenticate_and_create_tokens(
|
||||
"testuser", "password"
|
||||
)
|
||||
|
||||
assert success is True
|
||||
assert token_response is not None
|
||||
assert isinstance(token_response, TokenResponse)
|
||||
assert error is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_and_create_tokens_failure(self, auth_service, mock_oracle_pool):
|
||||
"""Test pentru autentificare eșuată"""
|
||||
# Mock pentru conexiunea Oracle
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
mock_cursor.fetchone.return_value = [0] # Credențiale invalide
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
success, token_response, error = await auth_service.authenticate_and_create_tokens(
|
||||
"testuser", "wrongpassword"
|
||||
)
|
||||
|
||||
assert success is False
|
||||
assert token_response is None
|
||||
assert error is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_validate_user_company_access(self, auth_service, mock_oracle_pool):
|
||||
"""Test pentru validarea accesului la firmă"""
|
||||
# Mock pentru conexiunea Oracle
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
mock_cursor.fetchall.return_value = [["COMP1"], ["COMP2"]]
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
# Test acces valid
|
||||
has_access = await auth_service.validate_user_company_access("testuser", "COMP1")
|
||||
assert has_access is True
|
||||
|
||||
# Test acces invalid
|
||||
has_access = await auth_service.validate_user_company_access("testuser", "COMP3")
|
||||
assert has_access is False
|
||||
|
||||
def test_cache_functionality(self, auth_service):
|
||||
"""Test pentru funcționalitatea de cache"""
|
||||
# Test cache stats
|
||||
stats = auth_service.get_cache_stats()
|
||||
assert isinstance(stats, dict)
|
||||
assert 'total_entries' in stats
|
||||
assert 'valid_entries' in stats
|
||||
assert 'cache_hit_ratio' in stats
|
||||
|
||||
# Test clear cache
|
||||
auth_service.clear_cache()
|
||||
stats_after_clear = auth_service.get_cache_stats()
|
||||
assert stats_after_clear['total_entries'] == 0
|
||||
|
||||
|
||||
class TestRateLimiter:
|
||||
"""Test suite pentru Rate Limiter"""
|
||||
|
||||
@pytest.fixture
|
||||
def rate_limiter(self):
|
||||
"""Fixture pentru rate limiter"""
|
||||
return RateLimiter(max_requests=3, time_window=5)
|
||||
|
||||
def test_rate_limiting_within_limit(self, rate_limiter):
|
||||
"""Test pentru request-uri în limita permisă"""
|
||||
client_ip = "192.168.1.1"
|
||||
|
||||
# Primele 3 request-uri trebuie să fie permise
|
||||
for i in range(3):
|
||||
assert rate_limiter.is_allowed(client_ip) is True
|
||||
|
||||
# Al 4-lea request trebuie refuzat
|
||||
assert rate_limiter.is_allowed(client_ip) is False
|
||||
|
||||
def test_rate_limiting_reset_after_time(self, rate_limiter):
|
||||
"""Test pentru resetarea rate limiting după expirarea ferestrei"""
|
||||
client_ip = "192.168.1.2"
|
||||
|
||||
# Consumă toate request-urile
|
||||
for i in range(3):
|
||||
assert rate_limiter.is_allowed(client_ip) is True
|
||||
|
||||
# Request-ul următor trebuie refuzat
|
||||
assert rate_limiter.is_allowed(client_ip) is False
|
||||
|
||||
# Simulează trecerea timpului
|
||||
time.sleep(6)
|
||||
|
||||
# Acum ar trebui să funcționeze din nou
|
||||
assert rate_limiter.is_allowed(client_ip) is True
|
||||
|
||||
def test_rate_limiting_different_ips(self, rate_limiter):
|
||||
"""Test pentru rate limiting pe IP-uri diferite"""
|
||||
ip1 = "192.168.1.1"
|
||||
ip2 = "192.168.1.2"
|
||||
|
||||
# Consumă toate request-urile pentru primul IP
|
||||
for i in range(3):
|
||||
assert rate_limiter.is_allowed(ip1) is True
|
||||
assert rate_limiter.is_allowed(ip1) is False
|
||||
|
||||
# Al doilea IP ar trebui să funcționeze normal
|
||||
for i in range(3):
|
||||
assert rate_limiter.is_allowed(ip2) is True
|
||||
|
||||
|
||||
class TestAuthenticationRoutes:
|
||||
"""Test suite pentru rutele de autentificare"""
|
||||
|
||||
@pytest.fixture
|
||||
def app(self):
|
||||
"""Fixture pentru aplicația FastAPI de test"""
|
||||
app = FastAPI()
|
||||
auth_router = create_auth_router()
|
||||
app.include_router(auth_router)
|
||||
return app
|
||||
|
||||
@pytest.fixture
|
||||
def client(self, app):
|
||||
"""Fixture pentru client de test"""
|
||||
return TestClient(app)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_auth_service(self):
|
||||
"""Mock pentru auth service"""
|
||||
with patch('roa2web.shared.auth.routes.auth_service') as mock_service:
|
||||
yield mock_service
|
||||
|
||||
def test_login_success(self, client, mock_auth_service):
|
||||
"""Test pentru login cu succes"""
|
||||
# Mock pentru autentificare cu succes
|
||||
mock_token_response = TokenResponse(
|
||||
access_token="test-access-token",
|
||||
refresh_token="test-refresh-token",
|
||||
token_type="bearer",
|
||||
expires_in=1800,
|
||||
user=CurrentUser(
|
||||
username="testuser",
|
||||
companies=["COMP1", "COMP2"],
|
||||
permissions=[PermissionType.READ, PermissionType.REPORTS]
|
||||
)
|
||||
)
|
||||
|
||||
mock_auth_service.authenticate_and_create_tokens.return_value = (
|
||||
True, mock_token_response, None
|
||||
)
|
||||
mock_auth_service.get_user_companies.return_value = ["COMP1", "COMP2"]
|
||||
|
||||
response = client.post("/auth/login", json={
|
||||
"username": "testuser",
|
||||
"password": "password"
|
||||
})
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["access_token"] == "test-access-token"
|
||||
assert data["token_type"] == "bearer"
|
||||
assert data["user"]["username"] == "testuser"
|
||||
|
||||
def test_login_failure(self, client, mock_auth_service):
|
||||
"""Test pentru login eșuat"""
|
||||
mock_auth_service.authenticate_and_create_tokens.return_value = (
|
||||
False, None, "Invalid credentials"
|
||||
)
|
||||
|
||||
response = client.post("/auth/login", json={
|
||||
"username": "testuser",
|
||||
"password": "wrongpassword"
|
||||
})
|
||||
|
||||
assert response.status_code == 401
|
||||
assert "Invalid credentials" in response.json()["detail"]
|
||||
|
||||
def test_protected_endpoint_without_token(self, client):
|
||||
"""Test pentru endpoint protejat fără token"""
|
||||
response = client.get("/auth/me")
|
||||
assert response.status_code == 401
|
||||
|
||||
def test_protected_endpoint_with_valid_token(self, client):
|
||||
"""Test pentru endpoint protejat cu token valid"""
|
||||
# Creează un token de test
|
||||
jwt_handler = JWTHandler(secret_key="test-secret-key")
|
||||
token = jwt_handler.create_access_token(
|
||||
username="testuser",
|
||||
companies=["COMP1"],
|
||||
permissions=["read"]
|
||||
)
|
||||
|
||||
with patch('roa2web.shared.auth.dependencies.jwt_handler', jwt_handler):
|
||||
response = client.get("/auth/me", headers={
|
||||
"Authorization": f"Bearer {token}"
|
||||
})
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == "testuser"
|
||||
|
||||
|
||||
class TestSecurityFeatures:
|
||||
"""Test suite pentru funcții de securitate"""
|
||||
|
||||
def test_jwt_token_tampering(self):
|
||||
"""Test pentru detectarea modificării token-urilor JWT"""
|
||||
jwt_handler = JWTHandler(secret_key="test-secret-key")
|
||||
token = jwt_handler.create_access_token("testuser", ["COMP1"])
|
||||
|
||||
# Modifică token-ul
|
||||
tampered_token = token[:-5] + "XXXXX"
|
||||
|
||||
# Token-ul modificat trebuie să fie invalid
|
||||
token_data = jwt_handler.verify_token(tampered_token)
|
||||
assert token_data is None
|
||||
|
||||
def test_jwt_secret_key_different(self):
|
||||
"""Test pentru token-uri semnate cu chei diferite"""
|
||||
jwt_handler1 = JWTHandler(secret_key="secret1")
|
||||
jwt_handler2 = JWTHandler(secret_key="secret2")
|
||||
|
||||
token = jwt_handler1.create_access_token("testuser", ["COMP1"])
|
||||
|
||||
# Token-ul nu trebuie să fie valid cu o cheie diferită
|
||||
token_data = jwt_handler2.verify_token(token)
|
||||
assert token_data is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sql_injection_prevention(self):
|
||||
"""Test pentru prevenirea SQL injection"""
|
||||
auth_service = UserAuthService()
|
||||
|
||||
with patch('roa2web.shared.auth.auth_service.oracle_pool') as mock_pool:
|
||||
mock_connection = AsyncMock()
|
||||
mock_cursor = MagicMock()
|
||||
mock_connection.__aenter__.return_value = mock_connection
|
||||
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
||||
mock_oracle_pool.get_connection.return_value = mock_connection
|
||||
|
||||
# Încearcă SQL injection în username
|
||||
malicious_username = "admin'; DROP TABLE users; --"
|
||||
|
||||
await auth_service.verify_user_credentials(malicious_username, "password")
|
||||
|
||||
# Verifică că query-ul folosește parametri legați
|
||||
mock_cursor.execute.assert_called_once()
|
||||
call_args = mock_cursor.execute.call_args
|
||||
assert ':username' in call_args[0][0] # Query cu parametri
|
||||
assert malicious_username.upper() == call_args[1]['username'] # Parametri legați
|
||||
|
||||
|
||||
@pytest.mark.performance
|
||||
class TestPerformance:
|
||||
"""Test suite pentru performanță"""
|
||||
|
||||
def test_jwt_token_creation_performance(self):
|
||||
"""Test pentru performanța creării token-urilor"""
|
||||
jwt_handler = JWTHandler(secret_key="test-secret-key")
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# Creează 1000 de token-uri
|
||||
for i in range(1000):
|
||||
jwt_handler.create_access_token(f"user{i}", ["COMP1"])
|
||||
|
||||
end_time = time.time()
|
||||
total_time = end_time - start_time
|
||||
|
||||
# Ar trebui să dureze mai puțin de 1 secundă
|
||||
assert total_time < 1.0
|
||||
print(f"Created 1000 tokens in {total_time:.4f} seconds")
|
||||
|
||||
def test_jwt_token_verification_performance(self):
|
||||
"""Test pentru performanța verificării token-urilor"""
|
||||
jwt_handler = JWTHandler(secret_key="test-secret-key")
|
||||
|
||||
# Creează 100 de token-uri
|
||||
tokens = []
|
||||
for i in range(100):
|
||||
token = jwt_handler.create_access_token(f"user{i}", ["COMP1"])
|
||||
tokens.append(token)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# Verifică toate token-urile
|
||||
for token in tokens:
|
||||
jwt_handler.verify_token(token)
|
||||
|
||||
end_time = time.time()
|
||||
total_time = end_time - start_time
|
||||
|
||||
# Ar trebui să dureze mai puțin de 0.5 secunde
|
||||
assert total_time < 0.5
|
||||
print(f"Verified 100 tokens in {total_time:.4f} seconds")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Rulează testele
|
||||
pytest.main([__file__, "-v", "--tb=short"])
|
||||
69
shared/database/test_pool.py
Normal file
69
shared/database/test_pool.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""
|
||||
Test script pentru verificarea conexiunii Oracle pool
|
||||
"""
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
# Load environment variables
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
# Load .env from roa2web root directory
|
||||
env_path = os.path.join(os.path.dirname(__file__), '../../.env')
|
||||
load_dotenv(env_path)
|
||||
print(f"📄 Loaded environment from: {env_path}")
|
||||
except ImportError:
|
||||
print("⚠️ python-dotenv not available, using system environment variables")
|
||||
|
||||
# Adăugare path pentru shared modules
|
||||
sys.path.append(os.path.dirname(__file__))
|
||||
|
||||
from oracle_pool import oracle_pool
|
||||
|
||||
async def test_oracle_pool():
|
||||
"""Test simplu pentru verificarea pool-ului Oracle"""
|
||||
print("🔄 Testing Oracle connection pool...")
|
||||
|
||||
try:
|
||||
# Inițializare pool
|
||||
print("📊 Initializing Oracle pool...")
|
||||
await oracle_pool.initialize()
|
||||
print("✅ Pool initialized successfully")
|
||||
|
||||
# Test conexiune
|
||||
print("🔍 Testing database connection...")
|
||||
async with oracle_pool.get_connection() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("SELECT 1 FROM DUAL")
|
||||
result = cursor.fetchone()
|
||||
print(f"✅ Database connection test successful: {result}")
|
||||
|
||||
print("🎯 Testing connection pool info...")
|
||||
if oracle_pool._pool:
|
||||
print(f"📈 Pool connections opened: {oracle_pool._pool.opened}")
|
||||
print(f"📊 Pool connections busy: {oracle_pool._pool.busy}")
|
||||
|
||||
# Cleanup
|
||||
print("🧹 Closing pool...")
|
||||
await oracle_pool.close_pool()
|
||||
print("✅ Pool closed successfully")
|
||||
|
||||
print("\n🎉 All tests passed! Oracle pool is working correctly.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error testing Oracle pool: {str(e)}")
|
||||
print(f"💡 Make sure environment variables are set:")
|
||||
print(f" - ORACLE_USER: {'✅ SET' if os.getenv('ORACLE_USER') else '❌ NOT SET'}")
|
||||
print(f" - ORACLE_PASSWORD: {'✅ SET' if os.getenv('ORACLE_PASSWORD') else '❌ NOT SET'}")
|
||||
print(f" - ORACLE_DSN: {'✅ SET' if os.getenv('ORACLE_DSN') else '❌ NOT SET'}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(f"🚀 ROA2WEB Oracle Pool Test - {datetime.now()}")
|
||||
print("=" * 50)
|
||||
|
||||
success = asyncio.run(test_oracle_pool())
|
||||
sys.exit(0 if success else 1)
|
||||
Reference in New Issue
Block a user