Files
space-booking/backend/app/api/bookings.py
Claude Agent 953f3121cf fix(bookings): increase admin bookings limit validation to 500
Frontend sends limit=200 which exceeded the le=100 validation,
causing 422 errors on the History page for managers.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-04 14:41:48 +00:00

1398 lines
49 KiB
Python

"""Booking endpoints."""
from datetime import datetime, time, timedelta
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.core.deps import get_current_admin, get_current_manager_or_superadmin, get_current_user, get_db
from app.core.permissions import get_manager_property_ids, verify_property_access
from app.models.booking import Booking
from app.models.property_manager import PropertyManager
from app.models.settings import Settings
from app.models.space import Space
from app.models.user import User
from app.services.audit_service import log_action
from app.services.email_service import send_booking_notification
from app.services.google_calendar_service import (
create_calendar_event,
delete_calendar_event,
update_calendar_event,
)
from app.services.notification_service import create_notification
from app.schemas.booking import (
AdminCancelRequest,
AvailabilityCheck,
BookingAdminCreate,
BookingCalendarAdmin,
BookingCalendarPublic,
BookingCreate,
BookingPendingDetail,
BookingRecurringCreate,
BookingReschedule,
BookingResponse,
BookingUpdate,
BookingWithSpace,
ConflictingBooking,
RecurringBookingResult,
RejectRequest,
)
from app.services.booking_service import validate_booking_rules
from app.utils.timezone import convert_to_utc
router = APIRouter(prefix="/spaces", tags=["bookings"])
def _verify_manager_booking_access(db: Session, booking: Booking, current_user: User) -> None:
"""Verify that a manager has access to the booking's property.
Superadmins always have access. Managers can only act on bookings
for spaces within their managed properties.
"""
if current_user.role in ("superadmin", "admin"):
return
if current_user.role == "manager":
managed_ids = get_manager_property_ids(db, current_user.id)
space = booking.space
if space and space.property_id and space.property_id not in managed_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this property's bookings",
)
def _verify_manager_space_access(db: Session, space: Space, current_user: User) -> None:
"""Verify that a manager has access to a space's property.
Used for creating bookings where we have the space but no booking yet.
"""
if current_user.role in ("superadmin", "admin"):
return
if current_user.role == "manager":
managed_ids = get_manager_property_ids(db, current_user.id)
if space.property_id and space.property_id not in managed_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this property's spaces",
)
bookings_router = APIRouter(prefix="/bookings", tags=["bookings"])
@router.get("/{space_id}/bookings")
def get_space_bookings(
space_id: int,
start: Annotated[datetime, Query(description="Start datetime (ISO format)")],
end: Annotated[datetime, Query(description="End datetime (ISO format)")],
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> list[BookingCalendarPublic] | list[BookingCalendarAdmin]:
"""
Get bookings for a space in a given time range.
- **Users**: See only public data (start, end, status, title)
- **Admins**: See all details including user info and descriptions
Query parameters:
- **start**: Start datetime in ISO format (e.g., 2024-01-01T00:00:00)
- **end**: End datetime in ISO format (e.g., 2024-01-31T23:59:59)
"""
# Check if space exists
space = db.query(Space).filter(Space.id == space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
# Verify user has access to the space's property
if space.property_id:
verify_property_access(db, current_user, space.property_id)
# Query bookings in the time range (only active bookings)
query = db.query(Booking).filter(
Booking.space_id == space_id,
Booking.status.in_(["approved", "pending"]),
Booking.start_datetime < end,
Booking.end_datetime > start,
)
bookings = query.order_by(Booking.start_datetime).all()
# Return different schemas based on user role
if current_user.role in ("admin", "superadmin", "manager"):
return [BookingCalendarAdmin.model_validate(b) for b in bookings]
else:
return [BookingCalendarPublic.model_validate(b) for b in bookings]
@bookings_router.get("/check-availability", response_model=AvailabilityCheck)
def check_availability(
space_id: Annotated[int, Query(description="Space ID to check")],
start_datetime: Annotated[datetime, Query(description="Start datetime (ISO format)")],
end_datetime: Annotated[datetime, Query(description="End datetime (ISO format)")],
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> AvailabilityCheck:
"""
Check if time slot is available (returns warning if conflicts exist).
Returns information about conflicting bookings (pending + approved) without
blocking the request. This allows users to make informed decisions about
their booking times.
Query parameters:
- **space_id**: ID of the space to check
- **start_datetime**: Start datetime in ISO format (e.g., 2024-06-15T10:00:00)
- **end_datetime**: End datetime in ISO format (e.g., 2024-06-15T12:00:00)
"""
from sqlalchemy import and_, or_
from sqlalchemy.orm import joinedload
# Check if space exists
space = db.query(Space).filter(Space.id == space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
# Verify user has access to the space's property
if space.property_id:
verify_property_access(db, current_user, space.property_id)
# Find conflicting bookings (approved + pending)
conflicts = (
db.query(Booking)
.options(joinedload(Booking.user))
.filter(
Booking.space_id == space_id,
Booking.status.in_(["approved", "pending"]),
or_(
# Conflict starts during this booking
and_(
Booking.start_datetime <= start_datetime,
Booking.end_datetime > start_datetime,
),
# Conflict ends during this booking
and_(
Booking.start_datetime < end_datetime,
Booking.end_datetime >= end_datetime,
),
# Conflict is completely within this booking
and_(
Booking.start_datetime >= start_datetime,
Booking.end_datetime <= end_datetime,
),
),
)
.all()
)
if not conflicts:
return AvailabilityCheck(
available=True, conflicts=[], message="Time slot is available"
)
# Count pending vs approved
pending_count = sum(1 for b in conflicts if b.status == "pending")
approved_count = sum(1 for b in conflicts if b.status == "approved")
message = ""
if approved_count > 0:
message = f"Time slot has {approved_count} approved booking(s). Choose another time."
elif pending_count > 0:
message = f"Time slot has {pending_count} pending request(s). Your request will wait for admin review."
return AvailabilityCheck(
available=approved_count == 0, # Available if no approved conflicts
conflicts=[
ConflictingBooking(
id=b.id,
user_name=b.user.full_name,
title=b.title,
status=b.status,
start_datetime=b.start_datetime,
end_datetime=b.end_datetime,
)
for b in conflicts
],
message=message,
)
@bookings_router.get("/my", response_model=list[BookingWithSpace])
def get_my_bookings(
status_filter: Annotated[str | None, Query(alias="status")] = None,
db: Annotated[Session, Depends(get_db)] = None, # type: ignore[assignment]
current_user: Annotated[User, Depends(get_current_user)] = None, # type: ignore[assignment]
) -> list[BookingWithSpace]:
"""
Get all bookings for the current user.
Returns bookings with associated space details, sorted by most recent first.
Query parameters:
- **status** (optional): Filter by booking status (pending/approved/rejected/canceled)
"""
# Base query: user's bookings with space join
query = (
db.query(Booking)
.join(Space, Booking.space_id == Space.id)
.filter(Booking.user_id == current_user.id)
)
# Apply status filter if provided
if status_filter:
query = query.filter(Booking.status == status_filter)
# Order by most recent first
bookings = query.order_by(Booking.created_at.desc()).all()
return [BookingWithSpace.model_validate(b) for b in bookings]
@bookings_router.get("/my/calendar", response_model=list[BookingWithSpace])
def get_my_bookings_calendar(
start: Annotated[datetime, Query(description="Start datetime (ISO format)")],
end: Annotated[datetime, Query(description="End datetime (ISO format)")],
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> list[BookingWithSpace]:
"""
Get user's bookings for calendar view within date range.
Query parameters:
- **start**: Start datetime in ISO format (e.g., 2024-01-01T00:00:00)
- **end**: End datetime in ISO format (e.g., 2024-01-31T23:59:59)
Returns bookings with status approved or pending, sorted by start time.
"""
bookings = (
db.query(Booking)
.join(Space, Booking.space_id == Space.id)
.filter(
Booking.user_id == current_user.id,
Booking.start_datetime < end,
Booking.end_datetime > start,
Booking.status.in_(["approved", "pending"]),
)
.order_by(Booking.start_datetime)
.all()
)
return [BookingWithSpace.model_validate(b) for b in bookings]
@bookings_router.post("", response_model=BookingResponse, status_code=status.HTTP_201_CREATED)
def create_booking(
booking_data: BookingCreate,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> BookingResponse:
"""
Create a new booking request.
- **space_id**: ID of the space to book
- **start_datetime**: Booking start time (ISO format, in user's timezone)
- **end_datetime**: Booking end time (ISO format, in user's timezone)
- **title**: Booking title (1-200 characters)
- **description**: Optional description
The booking will be validated against:
- Duration limits (min/max minutes)
- Working hours
- Existing bookings (no overlaps)
- User's daily booking limit
Times are converted from user's timezone to UTC for storage.
Returns the created booking with status "pending" (requires admin approval).
"""
# Validate that space exists
space = db.query(Space).filter(Space.id == booking_data.space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
# Verify user has access to the space's property
if space.property_id:
verify_property_access(db, current_user, space.property_id)
# Convert input times from user timezone to UTC
user_timezone = current_user.timezone or "UTC" # type: ignore[attr-defined]
start_datetime_utc = convert_to_utc(booking_data.start_datetime, user_timezone)
end_datetime_utc = convert_to_utc(booking_data.end_datetime, user_timezone)
# Validate booking rules (using UTC times)
user_id = int(current_user.id) # type: ignore[arg-type]
errors = validate_booking_rules(
db=db,
space_id=booking_data.space_id,
user_id=user_id,
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
user_timezone=user_timezone,
)
if errors:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=errors[0], # Return first error
)
# Auto-approve if admin/superadmin, otherwise pending
is_admin = current_user.role in ("admin", "superadmin")
# Create booking (with UTC times)
booking = Booking(
user_id=user_id,
space_id=booking_data.space_id,
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
title=booking_data.title,
description=booking_data.description,
status="approved" if is_admin else "pending",
approved_by=current_user.id if is_admin else None, # type: ignore[assignment]
created_at=datetime.utcnow(),
)
db.add(booking)
db.commit()
db.refresh(booking)
if not is_admin:
# Notify admins and property managers
notify_users = {}
# Get superadmins/admins
admins = db.query(User).filter(User.role.in_(["admin", "superadmin"])).all()
for admin in admins:
notify_users[admin.id] = admin
# Get property managers for the space's property
if space.property_id:
manager_ids = [
pm.user_id
for pm in db.query(PropertyManager).filter(PropertyManager.property_id == space.property_id).all()
]
managers = db.query(User).filter(User.id.in_(manager_ids)).all() if manager_ids else []
for mgr in managers:
notify_users[mgr.id] = mgr
for user in notify_users.values():
create_notification(
db=db,
user_id=user.id, # type: ignore[arg-type]
type="booking_created",
title="Noua Cerere de Rezervare",
message=f"Utilizatorul {current_user.full_name} a solicitat rezervarea spațiului {space.name} pentru {booking.start_datetime.strftime('%d.%m.%Y %H:%M')}",
booking_id=booking.id,
)
# Send email notification
background_tasks.add_task(
send_booking_notification,
booking,
"created",
user.email,
current_user.full_name,
None,
)
# Return with timezone conversion
return BookingResponse.from_booking_with_timezone(booking, user_timezone)
@bookings_router.post(
"/recurring", response_model=RecurringBookingResult, status_code=status.HTTP_201_CREATED
)
def create_recurring_booking(
data: BookingRecurringCreate,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> RecurringBookingResult:
"""
Create recurring weekly bookings.
- **space_id**: ID of the space to book
- **start_time**: Time only (e.g., "10:00")
- **duration_minutes**: Duration in minutes
- **title**: Booking title (1-200 characters)
- **description**: Optional description
- **recurrence_days**: List of weekday numbers (0=Monday, 6=Sunday)
- **start_date**: First occurrence date
- **end_date**: Last occurrence date (max 1 year from start)
- **skip_conflicts**: Skip conflicted dates (True) or stop on first conflict (False)
Returns information about created and skipped bookings.
Maximum 52 occurrences allowed.
"""
created_bookings = []
skipped_dates = []
# Validate that space exists
space = db.query(Space).filter(Space.id == data.space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
# Verify user has access to the space's property
if space.property_id:
verify_property_access(db, current_user, space.property_id)
# Parse time
try:
hour, minute = map(int, data.start_time.split(':'))
except (ValueError, AttributeError):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid start_time format. Use HH:MM (e.g., '10:00')",
)
duration = timedelta(minutes=data.duration_minutes)
# Get user timezone
user_timezone = current_user.timezone or "UTC"
# Generate occurrence dates
occurrences = []
current_date = data.start_date
while current_date <= data.end_date:
if current_date.weekday() in data.recurrence_days:
occurrences.append(current_date)
current_date += timedelta(days=1)
# Limit to 52 occurrences
if len(occurrences) > 52:
occurrences = occurrences[:52]
total_requested = len(occurrences)
# Create bookings for each occurrence
for occurrence_date in occurrences:
# Build datetime in user timezone
start_datetime = datetime.combine(occurrence_date, time(hour, minute))
end_datetime = start_datetime + duration
# Convert to UTC for validation and storage
start_datetime_utc = convert_to_utc(start_datetime, user_timezone)
end_datetime_utc = convert_to_utc(end_datetime, user_timezone)
# Validate
user_id = int(current_user.id) # type: ignore[arg-type]
errors = validate_booking_rules(
db=db,
space_id=data.space_id,
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
user_id=user_id,
user_timezone=user_timezone,
)
if errors:
skipped_dates.append({
"date": occurrence_date.isoformat(),
"reason": ", ".join(errors),
})
if not data.skip_conflicts:
# Stop on first conflict
break
else:
# Skip and continue
continue
# Create booking (store UTC times)
booking = Booking(
user_id=user_id,
space_id=data.space_id,
title=data.title,
description=data.description,
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
status="pending",
created_at=datetime.utcnow(),
)
db.add(booking)
created_bookings.append(booking)
db.commit()
# Refresh all created bookings
for booking in created_bookings:
db.refresh(booking)
# Send notifications to admins and property managers (in background)
if created_bookings:
notify_users = {}
admins = db.query(User).filter(User.role.in_(["admin", "superadmin"])).all()
for admin in admins:
notify_users[admin.id] = admin
if space.property_id:
manager_ids = [
pm.user_id
for pm in db.query(PropertyManager).filter(PropertyManager.property_id == space.property_id).all()
]
managers = db.query(User).filter(User.id.in_(manager_ids)).all() if manager_ids else []
for mgr in managers:
notify_users[mgr.id] = mgr
for user in notify_users.values():
background_tasks.add_task(
create_notification,
db=db,
user_id=user.id, # type: ignore[arg-type]
type="booking_created",
title="Noi Cereri de Rezervare Recurente",
message=f"Utilizatorul {current_user.full_name} a creat {len(created_bookings)} rezervări recurente.",
)
return RecurringBookingResult(
total_requested=total_requested,
total_created=len(created_bookings),
total_skipped=len(skipped_dates),
created_bookings=[BookingResponse.model_validate(b) for b in created_bookings],
skipped_dates=skipped_dates,
)
@bookings_router.put("/{id}", response_model=BookingResponse)
def update_booking(
id: int,
data: BookingUpdate,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> Booking:
"""
Update own booking (pending bookings only).
Users can only edit their own bookings, and only if the booking is still pending.
All fields are optional - only provided fields will be updated.
The booking will be re-validated against all rules after updating fields.
"""
# Get booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Check ownership
if booking.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Can only edit your own bookings",
)
# Check status (only pending)
if booking.status != "pending":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Can only edit pending bookings",
)
# Convert input times from user timezone to UTC
user_timezone = current_user.timezone or "UTC" # type: ignore[attr-defined]
# Prepare updated values (don't update model yet - validate first)
# Convert datetimes to UTC if provided
if data.start_datetime is not None:
updated_start = convert_to_utc(data.start_datetime, user_timezone) # type: ignore[assignment]
else:
updated_start = booking.start_datetime # type: ignore[assignment]
if data.end_datetime is not None:
updated_end = convert_to_utc(data.end_datetime, user_timezone) # type: ignore[assignment]
else:
updated_end = booking.end_datetime # type: ignore[assignment]
# Re-validate booking rules BEFORE updating the model
user_id = int(current_user.id) # type: ignore[arg-type]
errors = validate_booking_rules(
db=db,
space_id=int(booking.space_id), # type: ignore[arg-type]
start_datetime=updated_start, # type: ignore[arg-type]
end_datetime=updated_end, # type: ignore[arg-type]
user_id=user_id,
exclude_booking_id=booking.id, # Exclude self from overlap check
user_timezone=user_timezone,
)
if errors:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=errors[0],
)
# Update fields (only if provided) - validation passed
if data.title is not None:
booking.title = data.title # type: ignore[assignment]
if data.description is not None:
booking.description = data.description # type: ignore[assignment]
if data.start_datetime is not None:
booking.start_datetime = convert_to_utc(data.start_datetime, user_timezone) # type: ignore[assignment]
if data.end_datetime is not None:
booking.end_datetime = convert_to_utc(data.end_datetime, user_timezone) # type: ignore[assignment]
db.commit()
db.refresh(booking)
return booking
@bookings_router.put("/{id}/cancel", response_model=BookingResponse)
def cancel_booking(
id: int,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> Booking:
"""
Cancel own booking with time restrictions.
Users can only cancel their own bookings, and only if there is enough time
before the booking start (based on min_hours_before_cancel setting).
Returns the updated booking with status "canceled".
"""
# Find booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Check if user owns this booking
if booking.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only cancel your own bookings",
)
# Get settings to check min_hours_before_cancel
settings = db.query(Settings).filter(Settings.id == 1).first()
if not settings:
settings = Settings(
id=1,
min_duration_minutes=30,
max_duration_minutes=480,
working_hours_start=8,
working_hours_end=20,
max_bookings_per_day_per_user=3,
min_hours_before_cancel=2,
)
db.add(settings)
db.commit()
db.refresh(settings)
# Calculate hours until booking start
now = datetime.utcnow()
hours_until_start = (booking.start_datetime - now).total_seconds() / 3600 # type: ignore[operator]
# Check if there's enough time to cancel
if hours_until_start < settings.min_hours_before_cancel:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Cannot cancel booking less than {settings.min_hours_before_cancel} hours before start time",
)
# Cancel booking
booking.status = "canceled" # type: ignore[assignment]
# Delete from Google Calendar if event exists
if booking.google_calendar_event_id:
delete_calendar_event(
db=db,
event_id=booking.google_calendar_event_id,
user_id=int(current_user.id), # type: ignore[arg-type]
)
booking.google_calendar_event_id = None # type: ignore[assignment]
db.commit()
db.refresh(booking)
return booking
# Admin endpoints
admin_router = APIRouter(prefix="/admin/bookings", tags=["admin"])
@admin_router.get("/all", response_model=list[BookingPendingDetail])
def get_all_bookings(
status_filter: Annotated[str | None, Query(alias="status")] = None,
space_id: Annotated[int | None, Query()] = None,
user_id: Annotated[int | None, Query()] = None,
property_id: Annotated[int | None, Query()] = None,
start: Annotated[datetime | None, Query(description="Start datetime (ISO format)")] = None,
limit: Annotated[int, Query(ge=1, le=500)] = 20,
db: Annotated[Session, Depends(get_db)] = None, # type: ignore[assignment]
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)] = None, # type: ignore[assignment]
) -> list[BookingPendingDetail]:
"""
Get all bookings across all users (admin/manager).
Returns bookings with user and space details.
Query parameters:
- **status** (optional): Filter by status (pending/approved/rejected/canceled)
- **space_id** (optional): Filter by space ID
- **user_id** (optional): Filter by user ID
- **property_id** (optional): Filter by property ID
- **start** (optional): Only bookings starting from this datetime
- **limit** (optional): Max results (1-100, default 20)
"""
query = (
db.query(Booking)
.join(Space, Booking.space_id == Space.id)
.outerjoin(User, Booking.user_id == User.id)
)
# Property scoping for managers
if current_admin.role == "manager":
managed_ids = get_manager_property_ids(db, current_admin.id)
query = query.filter(Space.property_id.in_(managed_ids))
if property_id is not None:
query = query.filter(Space.property_id == property_id)
if status_filter is not None:
query = query.filter(Booking.status == status_filter)
if space_id is not None:
query = query.filter(Booking.space_id == space_id)
if user_id is not None:
query = query.filter(Booking.user_id == user_id)
if start is not None:
query = query.filter(Booking.end_datetime > start)
bookings = (
query.order_by(Booking.start_datetime.asc())
.limit(limit)
.all()
)
return [BookingPendingDetail.model_validate(b) for b in bookings]
@admin_router.get("/pending", response_model=list[BookingPendingDetail])
def get_pending_bookings(
space_id: Annotated[int | None, Query()] = None,
user_id: Annotated[int | None, Query()] = None,
property_id: Annotated[int | None, Query()] = None,
db: Annotated[Session, Depends(get_db)] = None, # type: ignore[assignment]
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)] = None, # type: ignore[assignment]
) -> list[BookingPendingDetail]:
"""
Get all pending booking requests (admin/manager).
Returns pending bookings with user and space details, sorted by creation time (FIFO).
Query parameters:
- **space_id** (optional): Filter by space ID
- **user_id** (optional): Filter by user ID
- **property_id** (optional): Filter by property ID
"""
# Base query: pending bookings with joins (outerjoin for anonymous bookings)
query = (
db.query(Booking)
.join(Space, Booking.space_id == Space.id)
.outerjoin(User, Booking.user_id == User.id)
.filter(Booking.status == "pending")
)
# Property scoping for managers
if current_admin.role == "manager":
managed_ids = get_manager_property_ids(db, current_admin.id)
query = query.filter(Space.property_id.in_(managed_ids))
if property_id is not None:
query = query.filter(Space.property_id == property_id)
# Apply filters if provided
if space_id is not None:
query = query.filter(Booking.space_id == space_id)
if user_id is not None:
query = query.filter(Booking.user_id == user_id)
# Order by created_at ascending (FIFO - oldest first)
bookings = query.order_by(Booking.created_at.asc()).all()
return [BookingPendingDetail.model_validate(b) for b in bookings]
@admin_router.put("/{id}/approve", response_model=BookingResponse)
def approve_booking(
id: int,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)],
) -> Booking:
"""
Approve a pending booking request (admin only).
The booking must be in "pending" status. This endpoint will:
1. Re-validate booking rules to prevent race conditions (overlap check)
2. Update status to "approved" if validation passes
3. Record the admin who approved it
Returns the updated booking or an error if validation fails.
"""
# Find booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Verify manager has access to this booking's property
_verify_manager_booking_access(db, booking, current_admin)
# Check if booking is pending
if booking.status != "pending":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot approve booking with status '{booking.status}'",
)
# Re-validate booking rules to prevent race conditions
# Use booking owner's timezone for validation
user_timezone = (booking.user.timezone or "UTC") if booking.user else "UTC"
booking_user_id = int(booking.user_id) if booking.user_id else 0
errors = validate_booking_rules(
db=db,
space_id=int(booking.space_id), # type: ignore[arg-type]
user_id=booking_user_id,
start_datetime=booking.start_datetime, # type: ignore[arg-type]
end_datetime=booking.end_datetime, # type: ignore[arg-type]
exclude_booking_id=int(booking.id), # type: ignore[arg-type]
user_timezone=user_timezone,
)
if errors:
# If overlap or other validation error detected, return 409 Conflict
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=errors[0],
)
# Approve booking
booking.status = "approved" # type: ignore[assignment]
booking.approved_by = current_admin.id # type: ignore[assignment]
db.commit()
db.refresh(booking)
# Create Google Calendar event if user has connected their calendar
if booking.user_id:
google_event_id = create_calendar_event(
db=db, booking=booking, user_id=int(booking.user_id) # type: ignore[arg-type]
)
if google_event_id:
booking.google_calendar_event_id = google_event_id # type: ignore[assignment]
db.commit()
db.refresh(booking)
# Log the action
log_action(
db=db,
action="booking_approved",
user_id=current_admin.id,
target_type="booking",
target_id=booking.id,
details=None
)
# Notify the user about approval
if booking.user_id and booking.user:
create_notification(
db=db,
user_id=booking.user_id, # type: ignore[arg-type]
type="booking_approved",
title="Rezervare Aprobată",
message=f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost aprobată", # type: ignore[union-attr]
booking_id=booking.id,
)
# Send email notification to user
background_tasks.add_task(
send_booking_notification,
booking,
"approved",
booking.user.email,
booking.user.full_name,
None,
)
elif booking.guest_email:
# Send email notification to anonymous guest
background_tasks.add_task(
send_booking_notification,
booking,
"anonymous_approved",
booking.guest_email,
booking.guest_name or "Guest",
None,
)
return booking
@admin_router.put("/{id}/reject", response_model=BookingResponse)
def reject_booking(
id: int,
reject_data: RejectRequest,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)],
) -> Booking:
"""
Reject a pending booking request (admin only).
The booking must be in "pending" status. Optionally provide a rejection reason.
Request body:
- **reason** (optional): Explanation for rejection
Returns the updated booking with status "rejected".
"""
# Find booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Verify manager has access to this booking's property
_verify_manager_booking_access(db, booking, current_admin)
# Check if booking is pending
if booking.status != "pending":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot reject booking with status '{booking.status}'",
)
# Reject booking
booking.status = "rejected" # type: ignore[assignment]
booking.rejection_reason = reject_data.reason # type: ignore[assignment]
db.commit()
db.refresh(booking)
# Log the action
log_action(
db=db,
action="booking_rejected",
user_id=current_admin.id,
target_type="booking",
target_id=booking.id,
details={"rejection_reason": reject_data.reason or "Nu a fost specificat"}
)
# Notify the user about rejection
if booking.user_id and booking.user:
create_notification(
db=db,
user_id=booking.user_id, # type: ignore[arg-type]
type="booking_rejected",
title="Rezervare Respinsă",
message=f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost respinsă. Motiv: {reject_data.reason or 'Nu a fost specificat'}", # type: ignore[union-attr]
booking_id=booking.id,
)
# Send email notification to user
background_tasks.add_task(
send_booking_notification,
booking,
"rejected",
booking.user.email,
booking.user.full_name,
{"rejection_reason": reject_data.reason},
)
elif booking.guest_email:
background_tasks.add_task(
send_booking_notification,
booking,
"anonymous_rejected",
booking.guest_email,
booking.guest_name or "Guest",
{"rejection_reason": reject_data.reason},
)
return booking
@admin_router.put("/{id}", response_model=BookingResponse)
def admin_update_booking(
id: int,
data: BookingUpdate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)],
) -> Booking:
"""
Update any booking (admin/manager).
Admin can edit any booking (pending or approved), but cannot edit bookings
that have already started.
All fields are optional - only provided fields will be updated.
The booking will be re-validated against all rules after updating fields.
"""
# Get booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Verify manager has access to this booking's property
_verify_manager_booking_access(db, booking, current_admin)
# Check if booking already started (cannot edit past bookings)
if booking.start_datetime < datetime.utcnow() and booking.status == "approved": # type: ignore[operator]
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot edit bookings that already started",
)
# Update fields (only if provided)
# Convert datetimes from admin timezone to UTC for storage
user_timezone = current_admin.timezone or "UTC" # type: ignore[attr-defined]
if data.title is not None:
booking.title = data.title # type: ignore[assignment]
if data.description is not None:
booking.description = data.description # type: ignore[assignment]
if data.start_datetime is not None:
booking.start_datetime = convert_to_utc(data.start_datetime, user_timezone) # type: ignore[assignment]
if data.end_datetime is not None:
booking.end_datetime = convert_to_utc(data.end_datetime, user_timezone) # type: ignore[assignment]
# Re-validate booking rules
# Use booking owner's timezone for validation
user_timezone = (booking.user.timezone or "UTC") if booking.user else "UTC"
booking_user_id = int(booking.user_id) if booking.user_id else 0
errors = validate_booking_rules(
db=db,
space_id=int(booking.space_id), # type: ignore[arg-type]
start_datetime=booking.start_datetime, # type: ignore[arg-type]
end_datetime=booking.end_datetime, # type: ignore[arg-type]
user_id=booking_user_id,
exclude_booking_id=booking.id, # Exclude self from overlap check
user_timezone=user_timezone,
)
if errors:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=errors[0],
)
# Sync with Google Calendar if event exists
if booking.google_calendar_event_id and booking.user_id:
update_calendar_event(
db=db,
booking=booking,
user_id=int(booking.user_id), # type: ignore[arg-type]
event_id=booking.google_calendar_event_id,
)
# Log audit
log_action(
db=db,
action="booking_updated",
user_id=current_admin.id,
target_type="booking",
target_id=booking.id,
details={"updated_by": "admin"}
)
db.commit()
db.refresh(booking)
return booking
@admin_router.put("/{id}/cancel", response_model=BookingResponse)
def admin_cancel_booking(
id: int,
cancel_data: AdminCancelRequest,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)],
) -> Booking:
"""
Cancel any booking (admin/manager).
Admin can cancel any booking at any time, regardless of status or timing.
No time restrictions apply (unlike user cancellations).
Request body:
- **cancellation_reason** (optional): Explanation for cancellation
Returns the updated booking with status "canceled".
"""
# Find booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Verify manager has access to this booking's property
_verify_manager_booking_access(db, booking, current_admin)
# Admin can cancel any booking (no status check needed)
# Update booking status
booking.status = "canceled" # type: ignore[assignment]
booking.cancellation_reason = cancel_data.cancellation_reason # type: ignore[assignment]
# Delete from Google Calendar if event exists
if booking.google_calendar_event_id and booking.user_id:
delete_calendar_event(
db=db,
event_id=booking.google_calendar_event_id,
user_id=int(booking.user_id), # type: ignore[arg-type]
)
booking.google_calendar_event_id = None # type: ignore[assignment]
db.commit()
db.refresh(booking)
# Log the action
log_action(
db=db,
action="booking_canceled",
user_id=current_admin.id,
target_type="booking",
target_id=booking.id,
details={"cancellation_reason": cancel_data.cancellation_reason or "Nu a fost specificat"}
)
# Notify the user about cancellation
if booking.user_id and booking.user:
create_notification(
db=db,
user_id=booking.user_id, # type: ignore[arg-type]
type="booking_canceled",
title="Rezervare Anulată",
message=f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost anulată de administrator. Motiv: {cancel_data.cancellation_reason or 'Nu a fost specificat'}", # type: ignore[union-attr]
booking_id=booking.id,
)
# Send email notification to user
background_tasks.add_task(
send_booking_notification,
booking,
"canceled",
booking.user.email,
booking.user.full_name,
{"cancellation_reason": cancel_data.cancellation_reason},
)
return booking
@admin_router.put("/{id}/reschedule", response_model=BookingResponse)
def reschedule_booking(
id: int,
data: BookingReschedule,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)],
) -> Booking:
"""
Reschedule booking to new time slot (admin/manager, drag-and-drop).
Validates the new time slot and updates the booking times.
Only approved bookings that haven't started yet can be rescheduled.
- **start_datetime**: New start time (ISO format)
- **end_datetime**: New end time (ISO format)
Returns the updated booking or an error if validation fails.
"""
# Get booking
booking = db.query(Booking).filter(Booking.id == id).first()
if not booking:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Booking not found",
)
# Verify manager has access to this booking's property
_verify_manager_booking_access(db, booking, current_admin)
# Check if booking already started (cannot reschedule past bookings)
if booking.start_datetime < datetime.utcnow(): # type: ignore[operator]
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot reschedule bookings that already started",
)
# Store old times for audit log
old_start = booking.start_datetime
old_end = booking.end_datetime
# Convert input times from admin timezone to UTC
user_timezone = current_admin.timezone or "UTC" # type: ignore[attr-defined]
start_datetime_utc = convert_to_utc(data.start_datetime, user_timezone)
end_datetime_utc = convert_to_utc(data.end_datetime, user_timezone)
# Validate new time slot (using UTC times)
booking_user_id = int(booking.user_id) if booking.user_id else 0
errors = validate_booking_rules(
db=db,
space_id=int(booking.space_id), # type: ignore[arg-type]
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
user_id=booking_user_id,
exclude_booking_id=booking.id, # Exclude self from overlap check
user_timezone=user_timezone,
)
if errors:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=", ".join(errors),
)
# Update times (already UTC)
booking.start_datetime = start_datetime_utc # type: ignore[assignment]
booking.end_datetime = end_datetime_utc # type: ignore[assignment]
# Sync with Google Calendar if event exists
if booking.google_calendar_event_id and booking.user_id:
update_calendar_event(
db=db,
booking=booking,
user_id=int(booking.user_id), # type: ignore[arg-type]
event_id=booking.google_calendar_event_id,
)
# Log audit
log_action(
db=db,
action="booking_rescheduled",
user_id=current_admin.id,
target_type="booking",
target_id=booking.id,
details={
"old_start": old_start.isoformat(), # type: ignore[union-attr]
"new_start": data.start_datetime.isoformat(),
"old_end": old_end.isoformat(), # type: ignore[union-attr]
"new_end": data.end_datetime.isoformat(),
},
)
# Notify user about reschedule
if booking.user_id:
create_notification(
db=db,
user_id=booking.user_id, # type: ignore[arg-type]
type="booking_rescheduled",
title="Rezervare Reprogramată",
message=f"Rezervarea ta pentru {booking.space.name} a fost reprogramată pentru {data.start_datetime.strftime('%d.%m.%Y %H:%M')}",
booking_id=booking.id,
)
db.commit()
db.refresh(booking)
return booking
@admin_router.post("", response_model=BookingResponse, status_code=status.HTTP_201_CREATED)
def admin_create_booking(
booking_data: BookingAdminCreate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_manager_or_superadmin)],
) -> Booking:
"""
Create a booking directly with approved status (admin/manager, bypass approval workflow).
- **space_id**: ID of the space to book
- **user_id**: Optional user ID (defaults to current admin if not provided)
- **start_datetime**: Booking start time (ISO format)
- **end_datetime**: Booking end time (ISO format)
- **title**: Booking title (1-200 characters)
- **description**: Optional description
The booking will be validated against:
- Duration limits (min/max minutes)
- Working hours
- Overlap with other approved bookings only (not pending)
Returns the created booking with status "approved" (bypasses normal approval workflow).
"""
# Validate that space exists
space = db.query(Space).filter(Space.id == booking_data.space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
# Verify manager has access to this space's property
_verify_manager_space_access(db, space, current_admin)
# Use current admin ID if user_id not provided
target_user_id = booking_data.user_id if booking_data.user_id is not None else int(current_admin.id) # type: ignore[arg-type]
# Validate user exists if user_id was provided
if booking_data.user_id is not None:
user = db.query(User).filter(User.id == booking_data.user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Convert input times from admin timezone to UTC
user_timezone = current_admin.timezone or "UTC" # type: ignore[attr-defined]
start_datetime_utc = convert_to_utc(booking_data.start_datetime, user_timezone)
end_datetime_utc = convert_to_utc(booking_data.end_datetime, user_timezone)
# Validate booking rules (using UTC times, same as create_booking)
errors = validate_booking_rules(
db=db,
space_id=booking_data.space_id,
user_id=target_user_id,
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
user_timezone=user_timezone,
)
if errors:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=errors[0], # Return first error
)
# Create booking with approved status (UTC times)
booking = Booking(
user_id=target_user_id,
space_id=booking_data.space_id,
start_datetime=start_datetime_utc,
end_datetime=end_datetime_utc,
title=booking_data.title,
description=booking_data.description,
status="approved", # Direct approval, bypass pending state
approved_by=current_admin.id, # type: ignore[assignment]
created_at=datetime.utcnow(),
)
db.add(booking)
db.commit()
db.refresh(booking)
return booking