"""User endpoints.""" from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy.orm import Session from app.core.deps import get_current_admin, get_current_manager_or_superadmin, get_current_user, get_db from app.core.security import get_password_hash from app.models.user import User from app.schemas.user import ( ResetPasswordRequest, UserCreate, UserResponse, UserStatusUpdate, UserUpdate, ) from app.services.audit_service import log_action from app.utils.timezone import get_available_timezones router = APIRouter(prefix="/users", tags=["users"]) admin_router = APIRouter(prefix="/admin/users", tags=["admin"]) class TimezoneUpdate(BaseModel): """Schema for updating user timezone.""" timezone: str @router.get("/me", response_model=UserResponse) def get_current_user_info( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """Get current user information.""" return current_user @router.get("/timezones", response_model=list[str]) def list_timezones() -> list[str]: """Get list of available timezones.""" return get_available_timezones() @router.put("/me/timezone") def update_timezone( data: TimezoneUpdate, db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ): """Update user timezone preference.""" # Validate timezone import pytz try: pytz.timezone(data.timezone) except pytz.exceptions.UnknownTimeZoneError: raise HTTPException(status_code=400, detail="Invalid timezone") current_user.timezone = data.timezone # type: ignore[assignment] db.commit() return {"message": "Timezone updated", "timezone": data.timezone} @admin_router.get("", response_model=list[UserResponse]) def list_users( db: Annotated[Session, Depends(get_db)], _: Annotated[User, Depends(get_current_manager_or_superadmin)], role: str | None = None, organization: str | None = None, ) -> list[User]: """ Get list of users (manager or admin). Supports filtering by role and organization. """ query = db.query(User) if role: query = query.filter(User.role == role) if organization: query = query.filter(User.organization == organization) users = query.order_by(User.full_name).all() return users @admin_router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user_data: UserCreate, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> User: """ Create a new user (admin only). - email: must be unique - password: will be hashed - role: "admin" or "user" - organization: optional """ # Check if user with same email exists existing = db.query(User).filter(User.email == user_data.email).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"User with email '{user_data.email}' already exists", ) # Validate role if user_data.role not in ["admin", "superadmin", "manager", "user"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Role must be 'superadmin', 'manager', or 'user'", ) user = User( email=user_data.email, full_name=user_data.full_name, hashed_password=get_password_hash(user_data.password), role=user_data.role, organization=user_data.organization, is_active=True, ) db.add(user) db.commit() db.refresh(user) # Log the action log_action( db=db, action="user_created", user_id=current_admin.id, target_type="user", target_id=user.id, details={"email": user.email, "role": user.role} ) return user @admin_router.put("/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_data: UserUpdate, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> User: """ Update an existing user (admin only). Only fields provided in request will be updated (partial update). """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Check if new email conflicts with another user if user_data.email and user_data.email != user.email: existing = db.query(User).filter(User.email == user_data.email).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"User with email '{user_data.email}' already exists", ) # Validate role if user_data.role and user_data.role not in ["admin", "superadmin", "manager", "user"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Role must be 'superadmin', 'manager', or 'user'", ) # Track what changed updated_fields = [] if user_data.email is not None and user_data.email != user.email: updated_fields.append("email") if user_data.full_name is not None and user_data.full_name != user.full_name: updated_fields.append("full_name") if user_data.role is not None and user_data.role != user.role: updated_fields.append("role") if user_data.organization is not None and user_data.organization != user.organization: updated_fields.append("organization") # Update only provided fields if user_data.email is not None: setattr(user, "email", user_data.email) if user_data.full_name is not None: setattr(user, "full_name", user_data.full_name) if user_data.role is not None: setattr(user, "role", user_data.role) if user_data.organization is not None: setattr(user, "organization", user_data.organization) db.commit() db.refresh(user) # Log the action log_action( db=db, action="user_updated", user_id=current_admin.id, target_type="user", target_id=user.id, details={"updated_fields": updated_fields} ) return user @admin_router.patch("/{user_id}/status", response_model=UserResponse) def update_user_status( user_id: int, status_data: UserStatusUpdate, db: Annotated[Session, Depends(get_db)], _: Annotated[User, Depends(get_current_admin)], ) -> User: """ Activate or deactivate a user (admin only). Deactivated users cannot log in. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) setattr(user, "is_active", status_data.is_active) db.commit() db.refresh(user) return user @admin_router.post("/{user_id}/reset-password", response_model=UserResponse) def reset_user_password( user_id: int, reset_data: ResetPasswordRequest, db: Annotated[Session, Depends(get_db)], _: Annotated[User, Depends(get_current_admin)], ) -> User: """ Reset a user's password (admin only). Password will be hashed before storing. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) setattr(user, "hashed_password", get_password_hash(reset_data.new_password)) db.commit() db.refresh(user) return user