"""Tests for user registration and email verification.""" from datetime import datetime, timedelta from unittest.mock import patch import pytest from jose import jwt from app.core.config import settings from app.models.user import User def test_register_success(client): """Test successful registration.""" response = client.post( "/api/auth/register", json={ "email": "newuser@example.com", "password": "Test1234", "confirm_password": "Test1234", "full_name": "New User", "organization": "Test Org", }, ) assert response.status_code == 201 data = response.json() assert data["email"] == "newuser@example.com" assert "verify" in data["message"].lower() def test_register_duplicate_email(client, test_user): """Test registering with existing email.""" response = client.post( "/api/auth/register", json={ "email": test_user.email, "password": "Test1234", "confirm_password": "Test1234", "full_name": "Duplicate User", "organization": "Test Org", }, ) assert response.status_code == 400 assert "already registered" in response.json()["detail"].lower() def test_register_weak_password(client): """Test password validation.""" # No uppercase response = client.post( "/api/auth/register", json={ "email": "test@example.com", "password": "test1234", "confirm_password": "test1234", "full_name": "Test User", "organization": "Test Org", }, ) assert response.status_code == 422 # No digit response = client.post( "/api/auth/register", json={ "email": "test@example.com", "password": "Testtest", "confirm_password": "Testtest", "full_name": "Test User", "organization": "Test Org", }, ) assert response.status_code == 422 # Too short response = client.post( "/api/auth/register", json={ "email": "test@example.com", "password": "Test12", "confirm_password": "Test12", "full_name": "Test User", "organization": "Test Org", }, ) assert response.status_code == 422 def test_register_passwords_mismatch(client): """Test password confirmation.""" response = client.post( "/api/auth/register", json={ "email": "test@example.com", "password": "Test1234", "confirm_password": "Different1234", "full_name": "Test User", "organization": "Test Org", }, ) assert response.status_code == 422 assert "password" in response.json()["detail"][0]["msg"].lower() def test_verify_email_success(client, db_session): """Test email verification.""" # Create unverified user user = User( email="verify@example.com", hashed_password="hashed", full_name="Test User", organization="Test Org", role="user", is_active=False, ) db_session.add(user) db_session.commit() db_session.refresh(user) # Generate token token = jwt.encode( { "sub": str(user.id), "type": "email_verification", "exp": datetime.utcnow() + timedelta(hours=24), }, settings.secret_key, algorithm="HS256", ) # Verify response = client.post("/api/auth/verify", json={"token": token}) assert response.status_code == 200 assert "successfully" in response.json()["message"].lower() # Check user is now active db_session.refresh(user) assert user.is_active is True def test_verify_email_expired_token(client, db_session): """Test expired verification token.""" # Create unverified user user = User( email="verify@example.com", hashed_password="hashed", full_name="Test User", organization="Test Org", role="user", is_active=False, ) db_session.add(user) db_session.commit() db_session.refresh(user) # Generate expired token token = jwt.encode( { "sub": str(user.id), "type": "email_verification", "exp": datetime.utcnow() - timedelta(hours=1), # Expired }, settings.secret_key, algorithm="HS256", ) # Try to verify response = client.post("/api/auth/verify", json={"token": token}) assert response.status_code == 400 assert "expired" in response.json()["detail"].lower() def test_verify_email_invalid_token(client): """Test invalid verification token.""" response = client.post("/api/auth/verify", json={"token": "invalid-token"}) assert response.status_code == 400 assert "invalid" in response.json()["detail"].lower() def test_verify_email_wrong_token_type(client, db_session): """Test token with wrong type.""" # Create unverified user user = User( email="verify@example.com", hashed_password="hashed", full_name="Test User", organization="Test Org", role="user", is_active=False, ) db_session.add(user) db_session.commit() db_session.refresh(user) # Generate token with wrong type token = jwt.encode( { "sub": str(user.id), "type": "access_token", # Wrong type "exp": datetime.utcnow() + timedelta(hours=24), }, settings.secret_key, algorithm="HS256", ) # Try to verify response = client.post("/api/auth/verify", json={"token": token}) assert response.status_code == 400 assert "invalid" in response.json()["detail"].lower() def test_verify_email_already_verified(client, db_session): """Test verifying already verified account.""" # Create verified user user = User( email="verify@example.com", hashed_password="hashed", full_name="Test User", organization="Test Org", role="user", is_active=True, # Already active ) db_session.add(user) db_session.commit() db_session.refresh(user) # Generate token token = jwt.encode( { "sub": str(user.id), "type": "email_verification", "exp": datetime.utcnow() + timedelta(hours=24), }, settings.secret_key, algorithm="HS256", ) # Try to verify response = client.post("/api/auth/verify", json={"token": token}) assert response.status_code == 200 assert "already verified" in response.json()["message"].lower() def test_resend_verification(client, db_session): """Test resending verification email.""" # Create unverified user user = User( email="resend@example.com", hashed_password="hashed", full_name="Test User", organization="Test Org", role="user", is_active=False, ) db_session.add(user) db_session.commit() # Request resend response = client.post( "/api/auth/resend-verification", params={"email": user.email} ) assert response.status_code == 200 assert "verification link" in response.json()["message"].lower() def test_resend_verification_nonexistent_email(client): """Test resending to non-existent email.""" response = client.post( "/api/auth/resend-verification", params={"email": "nonexistent@example.com"}, ) # Should not reveal if email exists assert response.status_code == 200 assert "if the email exists" in response.json()["message"].lower() def test_resend_verification_already_verified(client, db_session): """Test resending for already verified account.""" # Create verified user user = User( email="verified@example.com", hashed_password="hashed", full_name="Test User", organization="Test Org", role="user", is_active=True, ) db_session.add(user) db_session.commit() # Try to resend response = client.post( "/api/auth/resend-verification", params={"email": user.email} ) assert response.status_code == 400 assert "already verified" in response.json()["detail"].lower() def test_login_before_verification(client, db_session): """Test that unverified users cannot log in.""" # Create unverified user from app.core.security import get_password_hash password = "Test1234" user = User( email="unverified@example.com", hashed_password=get_password_hash(password), full_name="Test User", organization="Test Org", role="user", is_active=False, # Not verified ) db_session.add(user) db_session.commit() # Try to login response = client.post( "/api/auth/login", json={"email": user.email, "password": password}, ) assert response.status_code == 403 assert "disabled" in response.json()["detail"].lower()