Files
roa2web-service-auto/shared/auth/test_auth.py
Marius Mutu a7a1bef375 Add missing test files and update .gitignore to allow test files
This commit addresses the overly restrictive .gitignore pattern that
was excluding all test files (test_*.py), including legitimate pytest
and unittest test suites essential for code quality and CI/CD.

Changes to .gitignore:
- Added negation patterns !**/tests/test_*.py and !**/test_*.py
  to allow proper test files while still blocking temporary scripts
- This enables pytest test suites to be tracked by git

Added test files (17 files):

Telegram Bot Tests (15 files):
- reports-app/telegram-bot/tests/test_auth.py
  Tests for authentication and account linking flow
- reports-app/telegram-bot/tests/test_callbacks.py
  Tests for callback query handlers
- reports-app/telegram-bot/tests/test_formatters.py
  Tests for message formatting utilities
- reports-app/telegram-bot/tests/test_formatters_extended.py
  Extended formatter tests
- reports-app/telegram-bot/tests/test_handlers_menu.py
  Tests for menu handlers
- reports-app/telegram-bot/tests/test_helpers.py
  Tests for helper functions
- reports-app/telegram-bot/tests/test_helpers_extended.py
  Extended helper tests
- reports-app/telegram-bot/tests/test_helpers_real.py
  Real integration tests for helpers
- reports-app/telegram-bot/tests/test_helpers_real_simple.py
  Simplified integration tests
- reports-app/telegram-bot/tests/test_login_flow.py
  Complete login flow integration tests
- reports-app/telegram-bot/tests/test_menus.py
  Menu system tests
- reports-app/telegram-bot/tests/test_session_company.py
  Session and company management tests
- reports-app/telegram-bot/test_claude_integration.py
  Manual integration test (Claude AI)
- reports-app/telegram-bot/test_claude_response.py
  Response formatting test
- reports-app/telegram-bot/test_db.py
  Database operations manual test

Shared Module Tests (2 files):
- shared/auth/test_auth.py
  Authentication system tests
- shared/database/test_pool.py
  Oracle connection pool tests

Security verification:
 All test files use mock objects, fixtures, and environment variables
 No hardcoded credentials or secrets found
 Safe for version control

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-25 15:09:43 +03:00

559 lines
20 KiB
Python

"""
Comprehensive Authentication Tests pentru ROA2WEB
Acest modul conține teste pentru toate componentele sistemului de autentificare:
- JWT Handler functionality
- Oracle authentication service
- FastAPI dependencies și middleware
- Rate limiting și security features
Testele acoperă:
- Unit tests pentru funcționalitatea de bază
- Integration tests cu Oracle database (mock)
- Security tests pentru vulnerabilități comune
- Performance tests pentru scalabilitate
"""
import pytest
import asyncio
import os
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from typing import List, Dict, Any, Optional
import jwt as pyjwt
from fastapi import FastAPI, HTTPException, status
from fastapi.testclient import TestClient
from httpx import AsyncClient
# Import modulele de testat
from .jwt_handler import JWTHandler, TokenData, TokenResponse
from .auth_service import UserAuthService, AuthenticationError
from .models import (
LoginRequest, CurrentUser, PermissionType,
CompanyAccessRequest, CompanyAccessResponse
)
from .middleware import AuthenticationMiddleware, RateLimiter
from .dependencies import (
get_current_user_from_token, require_company_access,
require_permissions, get_current_company_from_header
)
from .routes import create_auth_router
class TestJWTHandler:
"""Test suite pentru JWT Handler"""
@pytest.fixture
def jwt_handler(self):
"""Fixture pentru JWT handler cu configurare de test"""
return JWTHandler(
secret_key="test-secret-key",
algorithm="HS256"
)
def test_create_access_token(self, jwt_handler):
"""Test pentru crearea access token-urilor"""
username = "testuser"
companies = ["COMP1", "COMP2"]
permissions = ["read", "write"]
token = jwt_handler.create_access_token(
username=username,
companies=companies,
user_id=123,
permissions=permissions
)
assert isinstance(token, str)
assert len(token) > 0
# Verifică că token-ul poate fi decodat
payload = pyjwt.decode(token, "test-secret-key", algorithms=["HS256"])
assert payload["username"] == username
assert payload["companies"] == companies
assert payload["permissions"] == permissions
assert payload["user_id"] == 123
assert payload["type"] == "access"
def test_create_refresh_token(self, jwt_handler):
"""Test pentru crearea refresh token-urilor"""
username = "testuser"
user_id = 123
token = jwt_handler.create_refresh_token(username, user_id)
assert isinstance(token, str)
assert len(token) > 0
# Verifică payload-ul
payload = pyjwt.decode(token, "test-secret-key", algorithms=["HS256"])
assert payload["username"] == username
assert payload["user_id"] == user_id
assert payload["type"] == "refresh"
def test_verify_valid_token(self, jwt_handler):
"""Test pentru verificarea token-urilor valide"""
username = "testuser"
companies = ["COMP1"]
token = jwt_handler.create_access_token(username, companies)
token_data = jwt_handler.verify_token(token)
assert token_data is not None
assert isinstance(token_data, TokenData)
assert token_data.username == username
assert token_data.companies == companies
assert token_data.token_type == "access"
def test_verify_expired_token(self, jwt_handler):
"""Test pentru token-uri expirate"""
# Creează token cu expirare în trecut
past_time = datetime.utcnow() - timedelta(minutes=10)
payload = {
"username": "testuser",
"companies": ["COMP1"],
"permissions": ["read"],
"exp": past_time,
"iat": past_time - timedelta(minutes=5),
"type": "access"
}
expired_token = pyjwt.encode(payload, "test-secret-key", algorithm="HS256")
token_data = jwt_handler.verify_token(expired_token)
assert token_data is None
def test_verify_invalid_token(self, jwt_handler):
"""Test pentru token-uri invalide"""
invalid_tokens = [
"invalid.token.here",
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.invalid",
"",
None
]
for invalid_token in invalid_tokens:
if invalid_token is not None:
token_data = jwt_handler.verify_token(invalid_token)
assert token_data is None
def test_create_token_response(self, jwt_handler):
"""Test pentru crearea răspunsului complet cu token-uri"""
username = "testuser"
companies = ["COMP1", "COMP2"]
permissions = ["read", "reports"]
response = jwt_handler.create_token_response(
username=username,
companies=companies,
permissions=permissions,
include_refresh=True
)
assert isinstance(response, TokenResponse)
assert response.access_token is not None
assert response.refresh_token is not None
assert response.token_type == "bearer"
assert response.expires_in > 0
def test_refresh_access_token(self, jwt_handler):
"""Test pentru refresh-ul access token-urilor"""
username = "testuser"
refresh_token = jwt_handler.create_refresh_token(username, 123)
companies = ["COMP1", "COMP2"]
new_access_token = jwt_handler.refresh_access_token(
refresh_token, companies, ["read", "write"]
)
assert new_access_token is not None
# Verifică noul token
token_data = jwt_handler.verify_token(new_access_token)
assert token_data.username == username
assert token_data.companies == companies
assert token_data.token_type == "access"
class TestUserAuthService:
"""Test suite pentru User Authentication Service"""
@pytest.fixture
def auth_service(self):
"""Fixture pentru auth service cu mock database"""
return UserAuthService()
@pytest.fixture
def mock_oracle_pool(self):
"""Mock pentru Oracle connection pool"""
with patch('roa2web.shared.auth.auth_service.oracle_pool') as mock_pool:
yield mock_pool
@pytest.mark.asyncio
async def test_verify_user_credentials_success(self, auth_service, mock_oracle_pool):
"""Test pentru verificarea cu succes a credențialelor"""
# Mock pentru conexiunea Oracle
mock_connection = AsyncMock()
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = [1] # Success
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
result = await auth_service.verify_user_credentials("testuser", "password")
assert result is True
mock_cursor.execute.assert_called_once()
@pytest.mark.asyncio
async def test_verify_user_credentials_failure(self, auth_service, mock_oracle_pool):
"""Test pentru verificarea eșuată a credențialelor"""
# Mock pentru conexiunea Oracle
mock_connection = AsyncMock()
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = [0] # Failure
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
result = await auth_service.verify_user_credentials("testuser", "wrongpassword")
assert result is False
@pytest.mark.asyncio
async def test_get_user_companies(self, auth_service, mock_oracle_pool):
"""Test pentru obținerea firmelor utilizatorului"""
# Mock pentru conexiunea Oracle
mock_connection = AsyncMock()
mock_cursor = MagicMock()
mock_cursor.fetchall.return_value = [["COMP1"], ["COMP2"], ["COMP3"]]
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
companies = await auth_service.get_user_companies("testuser")
assert companies == ["COMP1", "COMP2", "COMP3"]
mock_cursor.execute.assert_called_once()
@pytest.mark.asyncio
async def test_authenticate_and_create_tokens_success(self, auth_service, mock_oracle_pool):
"""Test pentru autentificare completă cu succes"""
# Mock pentru conexiunea Oracle
mock_connection = AsyncMock()
mock_cursor = MagicMock()
# Prima chiamată pentru verificare credențiale (succes)
# A doua chiamată pentru obținerea firmelor
mock_cursor.fetchone.return_value = [1]
mock_cursor.fetchall.return_value = [["COMP1"], ["COMP2"]]
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
success, token_response, error = await auth_service.authenticate_and_create_tokens(
"testuser", "password"
)
assert success is True
assert token_response is not None
assert isinstance(token_response, TokenResponse)
assert error is None
@pytest.mark.asyncio
async def test_authenticate_and_create_tokens_failure(self, auth_service, mock_oracle_pool):
"""Test pentru autentificare eșuată"""
# Mock pentru conexiunea Oracle
mock_connection = AsyncMock()
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = [0] # Credențiale invalide
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
success, token_response, error = await auth_service.authenticate_and_create_tokens(
"testuser", "wrongpassword"
)
assert success is False
assert token_response is None
assert error is not None
@pytest.mark.asyncio
async def test_validate_user_company_access(self, auth_service, mock_oracle_pool):
"""Test pentru validarea accesului la firmă"""
# Mock pentru conexiunea Oracle
mock_connection = AsyncMock()
mock_cursor = MagicMock()
mock_cursor.fetchall.return_value = [["COMP1"], ["COMP2"]]
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
# Test acces valid
has_access = await auth_service.validate_user_company_access("testuser", "COMP1")
assert has_access is True
# Test acces invalid
has_access = await auth_service.validate_user_company_access("testuser", "COMP3")
assert has_access is False
def test_cache_functionality(self, auth_service):
"""Test pentru funcționalitatea de cache"""
# Test cache stats
stats = auth_service.get_cache_stats()
assert isinstance(stats, dict)
assert 'total_entries' in stats
assert 'valid_entries' in stats
assert 'cache_hit_ratio' in stats
# Test clear cache
auth_service.clear_cache()
stats_after_clear = auth_service.get_cache_stats()
assert stats_after_clear['total_entries'] == 0
class TestRateLimiter:
"""Test suite pentru Rate Limiter"""
@pytest.fixture
def rate_limiter(self):
"""Fixture pentru rate limiter"""
return RateLimiter(max_requests=3, time_window=5)
def test_rate_limiting_within_limit(self, rate_limiter):
"""Test pentru request-uri în limita permisă"""
client_ip = "192.168.1.1"
# Primele 3 request-uri trebuie să fie permise
for i in range(3):
assert rate_limiter.is_allowed(client_ip) is True
# Al 4-lea request trebuie refuzat
assert rate_limiter.is_allowed(client_ip) is False
def test_rate_limiting_reset_after_time(self, rate_limiter):
"""Test pentru resetarea rate limiting după expirarea ferestrei"""
client_ip = "192.168.1.2"
# Consumă toate request-urile
for i in range(3):
assert rate_limiter.is_allowed(client_ip) is True
# Request-ul următor trebuie refuzat
assert rate_limiter.is_allowed(client_ip) is False
# Simulează trecerea timpului
time.sleep(6)
# Acum ar trebui să funcționeze din nou
assert rate_limiter.is_allowed(client_ip) is True
def test_rate_limiting_different_ips(self, rate_limiter):
"""Test pentru rate limiting pe IP-uri diferite"""
ip1 = "192.168.1.1"
ip2 = "192.168.1.2"
# Consumă toate request-urile pentru primul IP
for i in range(3):
assert rate_limiter.is_allowed(ip1) is True
assert rate_limiter.is_allowed(ip1) is False
# Al doilea IP ar trebui să funcționeze normal
for i in range(3):
assert rate_limiter.is_allowed(ip2) is True
class TestAuthenticationRoutes:
"""Test suite pentru rutele de autentificare"""
@pytest.fixture
def app(self):
"""Fixture pentru aplicația FastAPI de test"""
app = FastAPI()
auth_router = create_auth_router()
app.include_router(auth_router)
return app
@pytest.fixture
def client(self, app):
"""Fixture pentru client de test"""
return TestClient(app)
@pytest.fixture
def mock_auth_service(self):
"""Mock pentru auth service"""
with patch('roa2web.shared.auth.routes.auth_service') as mock_service:
yield mock_service
def test_login_success(self, client, mock_auth_service):
"""Test pentru login cu succes"""
# Mock pentru autentificare cu succes
mock_token_response = TokenResponse(
access_token="test-access-token",
refresh_token="test-refresh-token",
token_type="bearer",
expires_in=1800,
user=CurrentUser(
username="testuser",
companies=["COMP1", "COMP2"],
permissions=[PermissionType.READ, PermissionType.REPORTS]
)
)
mock_auth_service.authenticate_and_create_tokens.return_value = (
True, mock_token_response, None
)
mock_auth_service.get_user_companies.return_value = ["COMP1", "COMP2"]
response = client.post("/auth/login", json={
"username": "testuser",
"password": "password"
})
assert response.status_code == 200
data = response.json()
assert data["access_token"] == "test-access-token"
assert data["token_type"] == "bearer"
assert data["user"]["username"] == "testuser"
def test_login_failure(self, client, mock_auth_service):
"""Test pentru login eșuat"""
mock_auth_service.authenticate_and_create_tokens.return_value = (
False, None, "Invalid credentials"
)
response = client.post("/auth/login", json={
"username": "testuser",
"password": "wrongpassword"
})
assert response.status_code == 401
assert "Invalid credentials" in response.json()["detail"]
def test_protected_endpoint_without_token(self, client):
"""Test pentru endpoint protejat fără token"""
response = client.get("/auth/me")
assert response.status_code == 401
def test_protected_endpoint_with_valid_token(self, client):
"""Test pentru endpoint protejat cu token valid"""
# Creează un token de test
jwt_handler = JWTHandler(secret_key="test-secret-key")
token = jwt_handler.create_access_token(
username="testuser",
companies=["COMP1"],
permissions=["read"]
)
with patch('roa2web.shared.auth.dependencies.jwt_handler', jwt_handler):
response = client.get("/auth/me", headers={
"Authorization": f"Bearer {token}"
})
assert response.status_code == 200
data = response.json()
assert data["username"] == "testuser"
class TestSecurityFeatures:
"""Test suite pentru funcții de securitate"""
def test_jwt_token_tampering(self):
"""Test pentru detectarea modificării token-urilor JWT"""
jwt_handler = JWTHandler(secret_key="test-secret-key")
token = jwt_handler.create_access_token("testuser", ["COMP1"])
# Modifică token-ul
tampered_token = token[:-5] + "XXXXX"
# Token-ul modificat trebuie să fie invalid
token_data = jwt_handler.verify_token(tampered_token)
assert token_data is None
def test_jwt_secret_key_different(self):
"""Test pentru token-uri semnate cu chei diferite"""
jwt_handler1 = JWTHandler(secret_key="secret1")
jwt_handler2 = JWTHandler(secret_key="secret2")
token = jwt_handler1.create_access_token("testuser", ["COMP1"])
# Token-ul nu trebuie să fie valid cu o cheie diferită
token_data = jwt_handler2.verify_token(token)
assert token_data is None
@pytest.mark.asyncio
async def test_sql_injection_prevention(self):
"""Test pentru prevenirea SQL injection"""
auth_service = UserAuthService()
with patch('roa2web.shared.auth.auth_service.oracle_pool') as mock_pool:
mock_connection = AsyncMock()
mock_cursor = MagicMock()
mock_connection.__aenter__.return_value = mock_connection
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
mock_oracle_pool.get_connection.return_value = mock_connection
# Încearcă SQL injection în username
malicious_username = "admin'; DROP TABLE users; --"
await auth_service.verify_user_credentials(malicious_username, "password")
# Verifică că query-ul folosește parametri legați
mock_cursor.execute.assert_called_once()
call_args = mock_cursor.execute.call_args
assert ':username' in call_args[0][0] # Query cu parametri
assert malicious_username.upper() == call_args[1]['username'] # Parametri legați
@pytest.mark.performance
class TestPerformance:
"""Test suite pentru performanță"""
def test_jwt_token_creation_performance(self):
"""Test pentru performanța creării token-urilor"""
jwt_handler = JWTHandler(secret_key="test-secret-key")
start_time = time.time()
# Creează 1000 de token-uri
for i in range(1000):
jwt_handler.create_access_token(f"user{i}", ["COMP1"])
end_time = time.time()
total_time = end_time - start_time
# Ar trebui să dureze mai puțin de 1 secundă
assert total_time < 1.0
print(f"Created 1000 tokens in {total_time:.4f} seconds")
def test_jwt_token_verification_performance(self):
"""Test pentru performanța verificării token-urilor"""
jwt_handler = JWTHandler(secret_key="test-secret-key")
# Creează 100 de token-uri
tokens = []
for i in range(100):
token = jwt_handler.create_access_token(f"user{i}", ["COMP1"])
tokens.append(token)
start_time = time.time()
# Verifică toate token-urile
for token in tokens:
jwt_handler.verify_token(token)
end_time = time.time()
total_time = end_time - start_time
# Ar trebui să dureze mai puțin de 0.5 secunde
assert total_time < 0.5
print(f"Verified 100 tokens in {total_time:.4f} seconds")
if __name__ == "__main__":
# Rulează testele
pytest.main([__file__, "-v", "--tb=short"])