"""Property access permission utilities."""

from fastapi import HTTPException, status
from sqlalchemy.orm import Session

from app.models.organization_member import OrganizationMember
from app.models.property import Property
from app.models.property_access import PropertyAccess
from app.models.property_manager import PropertyManager
from app.models.user import User

# Roles that bypass every per-property check (note: "admin" as well as
# "superadmin").
_UNRESTRICTED_ROLES = ("superadmin", "admin")


def _is_property_manager(db: Session, user_id: int, property_id: int) -> bool:
    """Return True if a PropertyManager row links the user to the property."""
    link = (
        db.query(PropertyManager)
        .filter(
            PropertyManager.property_id == property_id,
            PropertyManager.user_id == user_id,
        )
        .first()
    )
    return link is not None


def _user_organization_ids(db: Session, user_id: int) -> list[int]:
    """Return the IDs of every organization the user is a member of."""
    memberships = (
        db.query(OrganizationMember)
        .filter(OrganizationMember.user_id == user_id)
        .all()
    )
    return [membership.organization_id for membership in memberships]


def verify_property_access(
    db: Session,
    user: User | None,
    property_id: int,
    require_manager: bool = False,
) -> bool:
    """Verify that a user may access a property.

    Checks are applied in this order:

    1. The property must exist.
    2. Anonymous callers (``user is None``) may only reach public
       properties, and never satisfy ``require_manager``.
    3. Users whose role is "superadmin" or "admin" are always allowed.
    4. A user with role "manager" who has a PropertyManager row for this
       property is allowed.
    5. If ``require_manager`` is set, nobody else is allowed.
    6. Otherwise access is granted when the property is public, or when a
       PropertyAccess row exists for the user or for one of the user's
       organizations.

    Args:
        db: Database session used for all lookups.
        user: The authenticated user, or None for an anonymous caller.
        property_id: ID of the property being accessed.
        require_manager: When True, only admin-level roles and managers of
            this specific property pass.

    Returns:
        True when access is granted. The function never returns False;
        denial is always signalled by an exception.

    Raises:
        HTTPException: 404 if the property does not exist; 403 if access
            is denied.
    """
    prop = db.query(Property).filter(Property.id == property_id).first()
    if not prop:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Property not found",
        )

    if user is None:
        # Anonymous callers: public properties only, never manager-level.
        if not prop.is_public:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Property is private",
            )
        if require_manager:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
            )
        return True

    if user.role in _UNRESTRICTED_ROLES:
        return True

    # The manager lookup only runs for users holding the "manager" role, and
    # it grants access in both the strict and the regular mode.
    if user.role == "manager" and _is_property_manager(db, user.id, property_id):
        return True

    if require_manager:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

    # Public property - any authenticated user has access.
    if prop.is_public:
        return True

    # Explicit grant to the user.
    user_grant = (
        db.query(PropertyAccess)
        .filter(
            PropertyAccess.property_id == property_id,
            PropertyAccess.user_id == user.id,
        )
        .first()
    )
    if user_grant:
        return True

    # Explicit grant to one of the user's organizations. The grant query is
    # skipped entirely when the user has no memberships.
    org_ids = _user_organization_ids(db, user.id)
    if org_ids:
        org_grant = (
            db.query(PropertyAccess)
            .filter(
                PropertyAccess.property_id == property_id,
                PropertyAccess.organization_id.in_(org_ids),
            )
            .first()
        )
        if org_grant:
            return True

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No access to this property",
    )


def get_manager_property_ids(db: Session, user_id: int) -> list[int]:
    """Get the IDs of all properties that have a PropertyManager row for the user."""
    manager_links = (
        db.query(PropertyManager)
        .filter(PropertyManager.user_id == user_id)
        .all()
    )
    return [link.property_id for link in manager_links]


def get_user_accessible_property_ids(db: Session, user_id: int) -> list[int]:
    """Get all property IDs accessible by the user.

    The result is the union of:

    * properties that are both public and active,
    * properties granted directly to the user via PropertyAccess,
    * properties granted to any organization the user belongs to.

    Only the public branch filters on ``is_active``; granted properties are
    returned regardless of that flag. IDs are de-duplicated through a set,
    so the order of the returned list is unspecified.
    """
    public_ids = [
        prop.id
        for prop in db.query(Property)
        .filter(Property.is_public == True, Property.is_active == True)  # noqa: E712
        .all()
    ]

    direct_ids = [
        grant.property_id
        for grant in db.query(PropertyAccess)
        .filter(PropertyAccess.user_id == user_id)
        .all()
    ]

    org_property_ids: list[int] = []
    org_ids = _user_organization_ids(db, user_id)
    if org_ids:
        org_property_ids = [
            grant.property_id
            for grant in db.query(PropertyAccess)
            .filter(PropertyAccess.organization_id.in_(org_ids))
            .all()
        ]

    return list(set(public_ids + direct_ids + org_property_ids))