Implement complete multi-property architecture: - Properties (groups of spaces) with public/private visibility - Property managers (many-to-many) with role-based permissions - Organizations with member management - Anonymous/guest booking support via public API (/api/public/*) - Property-scoped spaces, bookings, and settings - Frontend: property selector, organization management, public booking views - Migration script and updated seed data Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
116 lines
3.9 KiB
Python
116 lines
3.9 KiB
Python
"""Property access permission utilities."""
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.organization_member import OrganizationMember
|
|
from app.models.property import Property
|
|
from app.models.property_access import PropertyAccess
|
|
from app.models.property_manager import PropertyManager
|
|
from app.models.user import User
|
|
|
|
from fastapi import HTTPException, status
|
|
|
|
|
|
def verify_property_access(
    db: Session, user: User | None, property_id: int, require_manager: bool = False
) -> bool:
    """Check that ``user`` may use the property identified by ``property_id``.

    Rules, applied in order:

    1. The property must exist.
    2. Anonymous callers (``user is None``) are admitted only to public
       properties, and never when ``require_manager`` is set.
    3. Users whose role is ``"superadmin"`` or ``"admin"`` are always admitted.
    4. Users whose role is ``"manager"`` are admitted when a
       ``PropertyManager`` row links them to the property.
    5. When ``require_manager`` is set, nobody else is admitted.
    6. Otherwise the property is open to everyone if it is public, and to
       users holding a ``PropertyAccess`` row either directly or through one
       of the organizations they are a member of.

    Args:
        db: Session used for every lookup.
        user: The caller, or ``None`` for an anonymous request.
        property_id: Primary key of the property being accessed.
        require_manager: When true, only admin-level roles and managers
            assigned to this property pass the check.

    Returns:
        ``True`` whenever access is granted; denial is always signalled by an
        exception rather than a ``False`` return.

    Raises:
        HTTPException: 404 when no property has the given id; 403 when the
            caller is not allowed to access it.
    """

    def forbidden(detail: str) -> HTTPException:
        # Every denial in this function is a 403 that differs only in its message.
        return HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=detail)

    def manages_property() -> bool:
        # Only called for authenticated users, so ``user.id`` is available.
        assignment = (
            db.query(PropertyManager)
            .filter(
                PropertyManager.property_id == property_id,
                PropertyManager.user_id == user.id,
            )
            .first()
        )
        return bool(assignment)

    target = db.query(Property).filter(Property.id == property_id).first()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Property not found")

    # Anonymous callers: public properties only, and never manager-level access.
    if user is None:
        if not target.is_public:
            raise forbidden("Property is private")
        if require_manager:
            raise forbidden("Not enough permissions")
        return True

    # Admin-level roles bypass every further check.
    if user.role in ("superadmin", "admin"):
        return True

    # A manager assigned to this property passes whether or not manager-level
    # access was requested, so a single lookup serves both cases.
    if user.role == "manager" and manages_property():
        return True

    # Past this point the caller is not an assigned manager of the property.
    if require_manager:
        raise forbidden("Not enough permissions")

    # Public properties are open to any authenticated user.
    if target.is_public:
        return True

    # Access granted to the user directly.
    user_grant = (
        db.query(PropertyAccess)
        .filter(
            PropertyAccess.property_id == property_id,
            PropertyAccess.user_id == user.id,
        )
        .first()
    )
    if user_grant:
        return True

    # Access granted to an organization the user is a member of.
    memberships = (
        db.query(OrganizationMember).filter(OrganizationMember.user_id == user.id).all()
    )
    member_org_ids = [membership.organization_id for membership in memberships]
    if member_org_ids:
        organization_grant = (
            db.query(PropertyAccess)
            .filter(
                PropertyAccess.property_id == property_id,
                PropertyAccess.organization_id.in_(member_org_ids),
            )
            .first()
        )
        if organization_grant:
            return True

    raise forbidden("No access to this property")
|
|
|
|
|
|
def get_manager_property_ids(db: Session, user_id: int) -> list[int]:
    """Return the ``property_id`` of every ``PropertyManager`` row for a user.

    Args:
        db: Session used for the lookup.
        user_id: Id of the user whose manager assignments are collected.

    Returns:
        One property id per matching row, in the order the query returns
        them; empty when the user has no ``PropertyManager`` rows.
    """
    assignments = db.query(PropertyManager).filter(PropertyManager.user_id == user_id).all()
    return [assignment.property_id for assignment in assignments]
|
|
|
|
|
|
def get_user_accessible_property_ids(db: Session, user_id: int) -> list[int]:
    """Collect the ids of all properties a user can reach.

    Three sources are combined:

    * properties that are both public and active,
    * properties with a ``PropertyAccess`` row for the user,
    * properties with a ``PropertyAccess`` row for any organization the user
      is a member of.

    Args:
        db: Session used for the lookups.
        user_id: Id of the user whose accessible properties are collected.

    Returns:
        The property ids with duplicates removed. The result passes through a
        ``set``, so its ordering is not meaningful.
    """
    # Public, active properties are visible to everyone.
    public_properties = (
        db.query(Property)
        .filter(Property.is_public == True, Property.is_active == True)  # noqa: E712
        .all()
    )
    collected = [public_property.id for public_property in public_properties]

    # Grants made to the user directly.
    user_grants = db.query(PropertyAccess).filter(PropertyAccess.user_id == user_id).all()
    collected.extend(grant.property_id for grant in user_grants)

    # Grants made to organizations the user is a member of.
    memberships = (
        db.query(OrganizationMember).filter(OrganizationMember.user_id == user_id).all()
    )
    member_org_ids = [membership.organization_id for membership in memberships]
    if member_org_ids:
        # Skip the query entirely when the user has no memberships.
        organization_grants = (
            db.query(PropertyAccess)
            .filter(PropertyAccess.organization_id.in_(member_org_ids))
            .all()
        )
        collected.extend(grant.property_id for grant in organization_grants)

    # The same property may be reachable through several sources.
    return list(set(collected))
|