Files
space-booking/backend/app/core/deps.py
Claude Agent e21cf03a16 feat: add multi-tenant system with properties, organizations, and public booking
Implement complete multi-property architecture:
- Properties (groups of spaces) with public/private visibility
- Property managers (many-to-many) with role-based permissions
- Organizations with member management
- Anonymous/guest booking support via public API (/api/public/*)
- Property-scoped spaces, bookings, and settings
- Frontend: property selector, organization management, public booking views
- Migration script and updated seed data

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 00:17:21 +00:00

99 lines
3.1 KiB
Python

"""Dependencies for FastAPI routes."""
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
# Bearer-token scheme used by get_current_user for routes that require a token.
security = HTTPBearer()
# Same scheme with auto_error=False; get_optional_user depends on it and
# treats a None credentials value as an anonymous request.
optional_security = HTTPBearer(auto_error=False)
def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    db: Annotated[Session, Depends(get_db)],
) -> User:
    """Get current authenticated user from JWT token.

    Decodes the bearer token with ``settings.secret_key`` and
    ``settings.algorithm``, reads the user id from the ``sub`` claim and
    loads the matching ``User`` row.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session used to look the user up.

    Returns:
        The active ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, the ``sub``
            claim is missing or is not an integer, or the user does not
            exist or is inactive.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body limited to the call that can raise JWTError.
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except JWTError as exc:
        raise credentials_exception from exc

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    # A correctly signed token can still carry a non-numeric subject; treat
    # that as invalid credentials (401) rather than letting int() bubble up
    # as an unhandled server error.
    try:
        user_id = int(subject)
    except (TypeError, ValueError) as exc:
        raise credentials_exception from exc

    user = db.query(User).filter(User.id == user_id).first()
    if user is None or not user.is_active:
        raise credentials_exception
    return user
def get_optional_user(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(optional_security)],
    db: Annotated[Session, Depends(get_db)],
) -> User | None:
    """Get current user or None for anonymous access.

    Never raises for authentication problems: any missing, undecodable or
    malformed token, and any unknown or inactive user, results in ``None``.

    Args:
        credentials: Bearer credentials from ``optional_security``, or
            ``None`` when the request carried none.
        db: Database session used to look the user up.

    Returns:
        The active ``User`` identified by the token's ``sub`` claim, or
        ``None`` when the request should be treated as anonymous.
    """
    if credentials is None:
        return None

    # Keep the try body limited to the call that can raise JWTError.
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except JWTError:
        return None

    subject = payload.get("sub")
    if subject is None:
        return None

    # A non-numeric subject must fall back to anonymous access like every
    # other bad-token case, not escape as an unhandled ValueError.
    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        return None

    user = db.query(User).filter(User.id == user_id).first()
    if user is None or not user.is_active:
        return None
    return user
def get_current_admin(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Require an admin-level role (superadmin or legacy admin).

    Returns:
        The authenticated user, unchanged, when their role is ``"admin"``
        or ``"superadmin"``.

    Raises:
        HTTPException: 403 when the user's role is anything else.
    """
    has_admin_role = current_user.role in ("admin", "superadmin")
    if has_admin_role:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
def get_current_superadmin(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Require superadmin-level access.

    The check accepts both the ``"superadmin"`` and ``"admin"`` roles.
    NOTE(review): ``"admin"`` appears to be kept as a legacy equivalent of
    superadmin -- confirm this is intended before tightening the check.

    Returns:
        The authenticated user, unchanged, when their role is accepted.

    Raises:
        HTTPException: 403 when the user's role is anything else.
    """
    accepted_roles = ("admin", "superadmin")
    if current_user.role in accepted_roles:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Superadmin access required",
    )
def get_current_manager_or_superadmin(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Require a manager, admin or superadmin role.

    Returns:
        The authenticated user, unchanged, when their role is ``"admin"``,
        ``"superadmin"`` or ``"manager"``.

    Raises:
        HTTPException: 403 when the user's role is anything else.
    """
    privileged_roles = ("admin", "superadmin", "manager")
    role_is_privileged = current_user.role in privileged_roles
    if role_is_privileged:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Manager or admin access required",
    )