Files
space-booking/backend/app/api/bookings.py
Claude Agent 9c2846cf00 feat: add per-space timezone settings and improve booking management
- Add timezone configuration per space with fallback to system default
- Implement timezone-aware datetime display and editing across frontend
- Add migration for per_space_settings table
- Update booking service to handle timezone conversions properly
- Improve .gitignore to exclude build artifacts
- Add comprehensive testing documentation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-11 15:54:51 +00:00

1186 lines
40 KiB
Python

"""Booking endpoints."""
from datetime import datetime, time, timedelta
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.core.deps import get_current_admin, get_current_user, get_db
from app.models.booking import Booking
from app.models.settings import Settings
from app.models.space import Space
from app.models.user import User
from app.services.audit_service import log_action
from app.services.email_service import send_booking_notification
from app.services.google_calendar_service import create_calendar_event, delete_calendar_event
from app.services.notification_service import create_notification
from app.schemas.booking import (
AdminCancelRequest,
AvailabilityCheck,
BookingAdminCreate,
BookingCalendarAdmin,
BookingCalendarPublic,
BookingCreate,
BookingPendingDetail,
BookingRecurringCreate,
BookingReschedule,
BookingResponse,
BookingUpdate,
BookingWithSpace,
ConflictingBooking,
RecurringBookingResult,
RejectRequest,
)
from app.services.booking_service import validate_booking_rules
from app.utils.timezone import convert_to_utc
# Router for the space-scoped booking routes; every path starts with /spaces.
router = APIRouter(prefix="/spaces", tags=["bookings"])
# Router for the booking routes themselves; every path starts with /bookings.
bookings_router = APIRouter(prefix="/bookings", tags=["bookings"])
@router.get("/{space_id}/bookings")
def get_space_bookings(
    space_id: int,
    start: Annotated[datetime, Query(description="Start datetime (ISO format)")],
    end: Annotated[datetime, Query(description="End datetime (ISO format)")],
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> list[BookingCalendarPublic] | list[BookingCalendarAdmin]:
    """
    List the bookings of a space that intersect a time window.

    - **Users**: receive the public calendar schema
    - **Admins**: receive the admin calendar schema

    Query parameters:
    - **start**: Start datetime in ISO format (e.g., 2024-01-01T00:00:00)
    - **end**: End datetime in ISO format (e.g., 2024-01-31T23:59:59)

    Responds with 404 when the space does not exist. Results are ordered by
    booking start time.
    """
    space_row = db.query(Space).filter(Space.id == space_id).first()
    if not space_row:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Space not found",
        )

    # A booking intersects the window when it starts before the window ends
    # and ends after the window starts.
    in_window = (
        db.query(Booking)
        .filter(Booking.space_id == space_id)
        .filter(Booking.start_datetime < end)
        .filter(Booking.end_datetime > start)
        .order_by(Booking.start_datetime)
        .all()
    )

    # The caller's role decides which schema serializes the rows.
    if current_user.role == "admin":
        schema = BookingCalendarAdmin
    else:
        schema = BookingCalendarPublic
    return [schema.model_validate(row) for row in in_window]
@bookings_router.get("/check-availability", response_model=AvailabilityCheck)
def check_availability(
    space_id: Annotated[int, Query(description="Space ID to check")],
    start_datetime: Annotated[datetime, Query(description="Start datetime (ISO format)")],
    end_datetime: Annotated[datetime, Query(description="End datetime (ISO format)")],
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> AvailabilityCheck:
    """
    Report whether a time slot collides with existing bookings.

    Pending and approved bookings that collide with the slot are listed in
    the response; the request itself is never blocked. The slot counts as
    available as long as none of the collisions is an approved booking.

    Query parameters:
    - **space_id**: ID of the space to check
    - **start_datetime**: Start datetime in ISO format (e.g., 2024-06-15T10:00:00)
    - **end_datetime**: End datetime in ISO format (e.g., 2024-06-15T12:00:00)

    Responds with 404 when the space does not exist.
    """
    from sqlalchemy import and_, or_
    from sqlalchemy.orm import joinedload

    if not db.query(Space).filter(Space.id == space_id).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Space not found",
        )

    # NOTE(review): the query datetimes are compared as received, whereas
    # create_booking converts user-local input to UTC before storing -
    # confirm that callers of this endpoint send comparable values.

    # The three ways an existing booking can collide with the requested slot.
    spans_requested_start = and_(
        Booking.start_datetime <= start_datetime,
        Booking.end_datetime > start_datetime,
    )
    spans_requested_end = and_(
        Booking.start_datetime < end_datetime,
        Booking.end_datetime >= end_datetime,
    )
    inside_requested_slot = and_(
        Booking.start_datetime >= start_datetime,
        Booking.end_datetime <= end_datetime,
    )

    clashes = (
        db.query(Booking)
        .options(joinedload(Booking.user))  # user is read below for full_name
        .filter(
            Booking.space_id == space_id,
            Booking.status.in_(["approved", "pending"]),
            or_(spans_requested_start, spans_requested_end, inside_requested_slot),
        )
        .all()
    )

    if not clashes:
        return AvailabilityCheck(
            available=True, conflicts=[], message="Time slot is available"
        )

    approved_total = len([c for c in clashes if c.status == "approved"])
    pending_total = len([c for c in clashes if c.status == "pending"])

    # Approved collisions take precedence in the message shown to the user.
    if approved_total > 0:
        message = f"Time slot has {approved_total} approved booking(s). Choose another time."
    elif pending_total > 0:
        message = f"Time slot has {pending_total} pending request(s). Your request will wait for admin review."
    else:
        message = ""

    conflict_items = []
    for clash in clashes:
        conflict_items.append(
            ConflictingBooking(
                id=clash.id,
                user_name=clash.user.full_name,
                title=clash.title,
                status=clash.status,
                start_datetime=clash.start_datetime,
                end_datetime=clash.end_datetime,
            )
        )

    return AvailabilityCheck(
        available=approved_total == 0,  # only approved bookings block the slot
        conflicts=conflict_items,
        message=message,
    )
@bookings_router.get("/my", response_model=list[BookingWithSpace])
def get_my_bookings(
    status_filter: Annotated[str | None, Query(alias="status")] = None,
    db: Annotated[Session, Depends(get_db)] = None,  # type: ignore[assignment]
    current_user: Annotated[User, Depends(get_current_user)] = None,  # type: ignore[assignment]
) -> list[BookingWithSpace]:
    """
    List the current user's bookings, newest first.

    Each booking is serialized with the BookingWithSpace schema; ordering is
    by creation time, descending.

    Query parameters:
    - **status** (optional): Filter by booking status (pending/approved/rejected/canceled)
    """
    own_bookings = (
        db.query(Booking)
        .join(Space, Booking.space_id == Space.id)
        .filter(Booking.user_id == current_user.id)
    )

    # An absent or empty status value means "no filtering".
    if status_filter:
        own_bookings = own_bookings.filter(Booking.status == status_filter)

    newest_first = own_bookings.order_by(Booking.created_at.desc()).all()
    return list(map(BookingWithSpace.model_validate, newest_first))
@bookings_router.post("", response_model=BookingResponse, status_code=status.HTTP_201_CREATED)
def create_booking(
    booking_data: BookingCreate,
    background_tasks: BackgroundTasks,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> BookingResponse:
    """
    Submit a new booking request.

    - **space_id**: ID of the space to book
    - **start_datetime**: Booking start time (ISO format, in user's timezone)
    - **end_datetime**: Booking end time (ISO format, in user's timezone)
    - **title**: Booking title (1-200 characters)
    - **description**: Optional description

    The submitted times are converted from the user's timezone to UTC, then
    checked with `validate_booking_rules`; the first reported error is
    returned as a 400. A 404 is returned when the space does not exist.

    On success the booking is stored with status "pending", every admin gets
    an in-app notification plus a queued e-mail, and the booking is returned
    converted back to the user's timezone.
    """
    space = db.query(Space).filter(Space.id == booking_data.space_id).first()
    if not space:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Space not found",
        )

    # Users without a configured timezone are treated as UTC.
    tz_name = current_user.timezone or "UTC"  # type: ignore[attr-defined]
    start_utc = convert_to_utc(booking_data.start_datetime, tz_name)
    end_utc = convert_to_utc(booking_data.end_datetime, tz_name)

    requester_id = int(current_user.id)  # type: ignore[arg-type]
    rule_errors = validate_booking_rules(
        db=db,
        space_id=booking_data.space_id,
        user_id=requester_id,
        start_datetime=start_utc,
        end_datetime=end_utc,
        user_timezone=tz_name,
    )
    if rule_errors:
        # Only the first violation is reported to the client.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=rule_errors[0],
        )

    booking = Booking(
        user_id=requester_id,
        space_id=booking_data.space_id,
        start_datetime=start_utc,
        end_datetime=end_utc,
        title=booking_data.title,
        description=booking_data.description,
        status="pending",
        created_at=datetime.utcnow(),
    )
    db.add(booking)
    db.commit()
    db.refresh(booking)

    # Tell every admin about the request: in-app notification now, e-mail
    # after the response has been sent.
    for admin in db.query(User).filter(User.role == "admin").all():
        admin_message = f"Utilizatorul {current_user.full_name} a solicitat rezervarea spațiului {space.name} pentru {booking.start_datetime.strftime('%d.%m.%Y %H:%M')}"
        create_notification(
            db=db,
            user_id=admin.id,  # type: ignore[arg-type]
            type="booking_created",
            title="Noua Cerere de Rezervare",
            message=admin_message,
            booking_id=booking.id,
        )
        background_tasks.add_task(
            send_booking_notification,
            booking,
            "created",
            admin.email,
            current_user.full_name,
            None,
        )

    return BookingResponse.from_booking_with_timezone(booking, tz_name)
@bookings_router.post(
    "/recurring", response_model=RecurringBookingResult, status_code=status.HTTP_201_CREATED
)
def create_recurring_booking(
    data: BookingRecurringCreate,
    background_tasks: BackgroundTasks,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> RecurringBookingResult:
    """
    Create recurring weekly bookings.

    - **space_id**: ID of the space to book
    - **start_time**: Time only (e.g., "10:00")
    - **duration_minutes**: Duration in minutes
    - **title**: Booking title (1-200 characters)
    - **description**: Optional description
    - **recurrence_days**: List of weekday numbers (0=Monday, 6=Sunday)
    - **start_date**: First occurrence date
    - **end_date**: Last occurrence date (max 1 year from start)
    - **skip_conflicts**: Skip conflicted dates (True) or stop on first conflict (False)

    Every occurrence is built in the user's timezone, converted to UTC and
    checked with `validate_booking_rules`. Occurrences that fail validation
    are reported in `skipped_dates`; with `skip_conflicts=False` processing
    stops at the first failure, but bookings accepted before it are still
    stored. At most the first 52 matching dates are considered.

    Responds with 404 when the space does not exist and with 400 when
    `start_time` is not a valid HH:MM clock time.

    Returns information about created and skipped bookings.
    """
    max_occurrences = 52

    # Validate that space exists
    space = db.query(Space).filter(Space.id == data.space_id).first()
    if not space:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Space not found",
        )

    # Parse AND range-check the clock time here: building the `time` object
    # inside the try turns values such as "25:00" or "10:75" into a 400
    # instead of an unhandled ValueError further down.
    try:
        hour, minute = map(int, data.start_time.split(":"))
        start_clock = time(hour, minute)
    except (ValueError, AttributeError):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid start_time format. Use HH:MM (e.g., '10:00')",
        ) from None

    duration = timedelta(minutes=data.duration_minutes)
    user_timezone = current_user.timezone or "UTC"
    user_id = int(current_user.id)  # type: ignore[arg-type]

    # Collect the matching weekdays, stopping once the cap is reached.
    occurrences = []
    current_date = data.start_date
    while current_date <= data.end_date and len(occurrences) < max_occurrences:
        if current_date.weekday() in data.recurrence_days:
            occurrences.append(current_date)
        current_date += timedelta(days=1)
    total_requested = len(occurrences)

    created_bookings = []
    skipped_dates = []
    for occurrence_date in occurrences:
        # Wall-clock times in the user's timezone ...
        local_start = datetime.combine(occurrence_date, start_clock)
        local_end = local_start + duration
        # ... converted to UTC for validation and storage.
        start_datetime_utc = convert_to_utc(local_start, user_timezone)
        end_datetime_utc = convert_to_utc(local_end, user_timezone)

        errors = validate_booking_rules(
            db=db,
            space_id=data.space_id,
            start_datetime=start_datetime_utc,
            end_datetime=end_datetime_utc,
            user_id=user_id,
            user_timezone=user_timezone,
        )
        if errors:
            skipped_dates.append({
                "date": occurrence_date.isoformat(),
                "reason": ", ".join(errors),
            })
            if not data.skip_conflicts:
                # Stop on first conflict
                break
            continue

        booking = Booking(
            user_id=user_id,
            space_id=data.space_id,
            title=data.title,
            description=data.description,
            start_datetime=start_datetime_utc,
            end_datetime=end_datetime_utc,
            status="pending",
            created_at=datetime.utcnow(),
        )
        db.add(booking)
        created_bookings.append(booking)

    db.commit()
    for booking in created_bookings:
        db.refresh(booking)

    # Serialize before notifying, so the response does not depend on what
    # create_notification does to the session state.
    created_payload = [BookingResponse.model_validate(b) for b in created_bookings]

    # Notify admins synchronously, like the other endpoints in this module:
    # the request-scoped session must not be handed to a background task
    # that runs after the request has finished. `background_tasks` stays in
    # the signature for interface compatibility.
    if created_bookings:
        first_booking_id = created_bookings[0].id
        summary = f"Utilizatorul {current_user.full_name} a creat {len(created_bookings)} rezervări recurente."
        admins = db.query(User).filter(User.role == "admin").all()
        for admin in admins:
            create_notification(
                db=db,
                user_id=admin.id,  # type: ignore[arg-type]
                type="booking_created",
                title="Noi Cereri de Rezervare Recurente",
                message=summary,
                # Link the notification to the first booking of the series.
                booking_id=first_booking_id,
            )

    return RecurringBookingResult(
        total_requested=total_requested,
        total_created=len(created_bookings),
        total_skipped=len(skipped_dates),
        created_bookings=created_payload,
        skipped_dates=skipped_dates,
    )
@bookings_router.put("/{id}", response_model=BookingResponse)
def update_booking(
    id: int,
    data: BookingUpdate,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> Booking:
    """
    Update own booking (pending bookings only).

    Users can only edit their own bookings, and only if the booking is still
    pending. All fields are optional - only provided fields will be updated.
    Provided datetimes are interpreted in the user's timezone and converted
    to UTC.

    The resulting time slot is validated with `validate_booking_rules`
    before anything is written to the model, and exactly the validated UTC
    values are persisted.

    Errors:
    - 404 when the booking does not exist
    - 403 when the booking belongs to another user
    - 400 when the booking is not pending or a booking rule is violated
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    if booking.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Can only edit your own bookings",
        )

    if booking.status != "pending":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Can only edit pending bookings",
        )

    # Users without a configured timezone are treated as UTC.
    user_timezone = current_user.timezone or "UTC"  # type: ignore[attr-defined]

    # Work out the prospective slot without touching the model yet; fields
    # that were not supplied keep their stored (UTC) values.
    if data.start_datetime is not None:
        updated_start = convert_to_utc(data.start_datetime, user_timezone)
    else:
        updated_start = booking.start_datetime
    if data.end_datetime is not None:
        updated_end = convert_to_utc(data.end_datetime, user_timezone)
    else:
        updated_end = booking.end_datetime

    errors = validate_booking_rules(
        db=db,
        space_id=int(booking.space_id),  # type: ignore[arg-type]
        start_datetime=updated_start,  # type: ignore[arg-type]
        end_datetime=updated_end,  # type: ignore[arg-type]
        user_id=int(current_user.id),  # type: ignore[arg-type]
        exclude_booking_id=booking.id,  # Exclude self from overlap check
        user_timezone=user_timezone,
    )
    if errors:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=errors[0],
        )

    # Validation passed: apply the provided fields, reusing the values that
    # were just validated rather than converting the input a second time.
    if data.title is not None:
        booking.title = data.title  # type: ignore[assignment]
    if data.description is not None:
        booking.description = data.description  # type: ignore[assignment]
    if data.start_datetime is not None:
        booking.start_datetime = updated_start  # type: ignore[assignment]
    if data.end_datetime is not None:
        booking.end_datetime = updated_end  # type: ignore[assignment]

    db.commit()
    db.refresh(booking)
    return booking
@bookings_router.put("/{id}/cancel", response_model=BookingResponse)
def cancel_booking(
    id: int,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> Booking:
    """
    Cancel one of the current user's bookings, subject to a notice period.

    The booking must belong to the caller and must start at least
    `min_hours_before_cancel` hours (from the settings row with id 1) after
    the current UTC time. If that settings row is missing, it is created
    with default values first.

    Errors:
    - 404 when the booking does not exist
    - 403 when the booking belongs to another user or the notice period has
      already begun

    Returns the updated booking with status "canceled".
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    if booking.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only cancel your own bookings",
        )

    # NOTE(review): the booking's current status is not inspected here, so
    # an already rejected or canceled booking can be "canceled" again -
    # confirm this is intended.

    # Load the global settings, creating the default row on first use.
    settings = db.query(Settings).filter(Settings.id == 1).first()
    if not settings:
        default_values = {
            "id": 1,
            "min_duration_minutes": 30,
            "max_duration_minutes": 480,
            "working_hours_start": 8,
            "working_hours_end": 20,
            "max_bookings_per_day_per_user": 3,
            "min_hours_before_cancel": 2,
        }
        settings = Settings(**default_values)
        db.add(settings)
        db.commit()
        db.refresh(settings)

    # Remaining notice, in hours; negative when the booking already started.
    time_left = booking.start_datetime - datetime.utcnow()  # type: ignore[operator]
    hours_left = time_left.total_seconds() / 3600
    if hours_left < settings.min_hours_before_cancel:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Cannot cancel booking less than {settings.min_hours_before_cancel} hours before start time",
        )

    booking.status = "canceled"  # type: ignore[assignment]

    # Drop the mirrored Google Calendar event, if one was recorded.
    calendar_event_id = booking.google_calendar_event_id
    if calendar_event_id:
        delete_calendar_event(
            db=db,
            event_id=calendar_event_id,
            user_id=int(current_user.id),  # type: ignore[arg-type]
        )
        booking.google_calendar_event_id = None  # type: ignore[assignment]

    db.commit()
    db.refresh(booking)
    return booking
# Admin endpoints
# Router for the administrator routes; every path starts with /admin/bookings.
admin_router = APIRouter(prefix="/admin/bookings", tags=["admin"])
@admin_router.get("/pending", response_model=list[BookingPendingDetail])
def get_pending_bookings(
    space_id: Annotated[int | None, Query()] = None,
    user_id: Annotated[int | None, Query()] = None,
    db: Annotated[Session, Depends(get_db)] = None,  # type: ignore[assignment]
    current_admin: Annotated[User, Depends(get_current_admin)] = None,  # type: ignore[assignment]
) -> list[BookingPendingDetail]:
    """
    List pending booking requests, oldest first (admin only).

    Bookings are joined with their space and user and ordered by creation
    time ascending, so the longest-waiting request comes first.

    Query parameters:
    - **space_id** (optional): Filter by space ID
    - **user_id** (optional): Filter by user ID
    """
    criteria = [Booking.status == "pending"]
    # Optional narrowing; a value of 0 is still a valid filter, hence the
    # explicit None checks.
    if space_id is not None:
        criteria.append(Booking.space_id == space_id)
    if user_id is not None:
        criteria.append(Booking.user_id == user_id)

    waiting = (
        db.query(Booking)
        .join(Space, Booking.space_id == Space.id)
        .join(User, Booking.user_id == User.id)
        .filter(*criteria)
        .order_by(Booking.created_at.asc())
        .all()
    )
    return list(map(BookingPendingDetail.model_validate, waiting))
@admin_router.put("/{id}/approve", response_model=BookingResponse)
def approve_booking(
    id: int,
    background_tasks: BackgroundTasks,
    db: Annotated[Session, Depends(get_db)],
    current_admin: Annotated[User, Depends(get_current_admin)],
) -> Booking:
    """
    Approve a pending booking request (admin only).

    Steps performed:
    1. Re-run `validate_booking_rules` for the stored slot (excluding the
       booking itself), using the booking owner's timezone
    2. Set the status to "approved" and record the approving admin
    3. Create a Google Calendar event and store its id when one is returned
    4. Write an audit entry, notify the owner in-app and queue an e-mail

    Errors:
    - 404 when the booking does not exist
    - 400 when the booking is not pending
    - 409 when re-validation reports an error
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    if booking.status != "pending":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot approve booking with status '{booking.status}'",
        )

    # Validate in the owner's timezone; fall back to UTC when the owner or
    # the owner's timezone is missing.
    owner = booking.user
    owner_timezone = (owner.timezone or "UTC") if owner else "UTC"

    # The slot may have been taken since the request was submitted.
    violations = validate_booking_rules(
        db=db,
        space_id=int(booking.space_id),  # type: ignore[arg-type]
        user_id=int(booking.user_id),  # type: ignore[arg-type]
        start_datetime=booking.start_datetime,  # type: ignore[arg-type]
        end_datetime=booking.end_datetime,  # type: ignore[arg-type]
        exclude_booking_id=int(booking.id),  # type: ignore[arg-type]
        user_timezone=owner_timezone,
    )
    if violations:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=violations[0],
        )

    booking.status = "approved"  # type: ignore[assignment]
    booking.approved_by = current_admin.id  # type: ignore[assignment]
    db.commit()
    db.refresh(booking)

    # Mirror the booking to Google Calendar; persist the event id only when
    # the service returned one.
    calendar_event_id = create_calendar_event(
        db=db, booking=booking, user_id=int(booking.user_id)  # type: ignore[arg-type]
    )
    if calendar_event_id:
        booking.google_calendar_event_id = calendar_event_id  # type: ignore[assignment]
        db.commit()
        db.refresh(booking)

    log_action(
        db=db,
        action="booking_approved",
        user_id=current_admin.id,
        target_type="booking",
        target_id=booking.id,
        details=None,
    )

    approval_text = f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost aprobată"  # type: ignore[union-attr]
    create_notification(
        db=db,
        user_id=booking.user_id,  # type: ignore[arg-type]
        type="booking_approved",
        title="Rezervare Aprobată",
        message=approval_text,
        booking_id=booking.id,
    )

    # The e-mail goes out after the response has been sent.
    background_tasks.add_task(
        send_booking_notification,
        booking,
        "approved",
        booking.user.email,
        booking.user.full_name,
        None,
    )

    return booking
@admin_router.put("/{id}/reject", response_model=BookingResponse)
def reject_booking(
    id: int,
    reject_data: RejectRequest,
    background_tasks: BackgroundTasks,
    db: Annotated[Session, Depends(get_db)],
    current_admin: Annotated[User, Depends(get_current_admin)],
) -> Booking:
    """
    Reject a pending booking request (admin only).

    Request body:
    - **reason** (optional): Explanation for rejection

    The status becomes "rejected" and the reason is stored on the booking.
    An audit entry is written, the owner is notified in-app and an e-mail is
    queued. When no reason is given, the audit entry and the in-app message
    use a fixed placeholder text instead.

    Errors:
    - 404 when the booking does not exist
    - 400 when the booking is not pending
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    if booking.status != "pending":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot reject booking with status '{booking.status}'",
        )

    booking.status = "rejected"  # type: ignore[assignment]
    booking.rejection_reason = reject_data.reason  # type: ignore[assignment]
    db.commit()
    db.refresh(booking)

    # Placeholder shown wherever a human-readable reason is required.
    shown_reason = reject_data.reason or "Nu a fost specificat"

    log_action(
        db=db,
        action="booking_rejected",
        user_id=current_admin.id,
        target_type="booking",
        target_id=booking.id,
        details={"rejection_reason": shown_reason},
    )

    rejection_text = f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost respinsă. Motiv: {shown_reason}"  # type: ignore[union-attr]
    create_notification(
        db=db,
        user_id=booking.user_id,  # type: ignore[arg-type]
        type="booking_rejected",
        title="Rezervare Respinsă",
        message=rejection_text,
        booking_id=booking.id,
    )

    # The e-mail receives the raw reason (possibly None), not the placeholder.
    background_tasks.add_task(
        send_booking_notification,
        booking,
        "rejected",
        booking.user.email,
        booking.user.full_name,
        {"rejection_reason": reject_data.reason},
    )

    return booking
@admin_router.put("/{id}", response_model=BookingResponse)
def admin_update_booking(
    id: int,
    data: BookingUpdate,
    db: Annotated[Session, Depends(get_db)],
    current_admin: Annotated[User, Depends(get_current_admin)],
) -> Booking:
    """
    Update any booking (admin only).

    Approved bookings whose start time has already passed cannot be edited.
    All fields are optional - only provided fields will be updated.

    The prospective time slot is validated with `validate_booking_rules`
    (in the booking owner's timezone) BEFORE the model is modified, so a
    rejected update never leaves unvalidated values on the session object.

    Errors:
    - 404 when the booking does not exist
    - 400 when the booking already started or a booking rule is violated
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    # Check if booking already started (cannot edit past bookings)
    if booking.start_datetime < datetime.utcnow() and booking.status == "approved":  # type: ignore[operator]
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot edit bookings that already started",
        )

    # Prospective slot: supplied values win, otherwise keep what is stored.
    # NOTE(review): unlike update_booking, the supplied datetimes are stored
    # as received, without a timezone conversion - confirm admins send UTC.
    new_start = data.start_datetime if data.start_datetime is not None else booking.start_datetime
    new_end = data.end_datetime if data.end_datetime is not None else booking.end_datetime

    # Use booking owner's timezone for validation
    user_timezone = (booking.user.timezone or "UTC") if booking.user else "UTC"
    errors = validate_booking_rules(
        db=db,
        space_id=int(booking.space_id),  # type: ignore[arg-type]
        start_datetime=new_start,  # type: ignore[arg-type]
        end_datetime=new_end,  # type: ignore[arg-type]
        user_id=int(booking.user_id),  # type: ignore[arg-type]
        exclude_booking_id=booking.id,  # Exclude self from overlap check
        user_timezone=user_timezone,
    )
    if errors:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=errors[0],
        )

    # Validation passed - apply only the fields that were provided.
    if data.title is not None:
        booking.title = data.title  # type: ignore[assignment]
    if data.description is not None:
        booking.description = data.description  # type: ignore[assignment]
    if data.start_datetime is not None:
        booking.start_datetime = new_start  # type: ignore[assignment]
    if data.end_datetime is not None:
        booking.end_datetime = new_end  # type: ignore[assignment]

    log_action(
        db=db,
        action="booking_updated",
        user_id=current_admin.id,
        target_type="booking",
        target_id=booking.id,
        details={"updated_by": "admin"},
    )

    db.commit()
    db.refresh(booking)
    return booking
@admin_router.put("/{id}/cancel", response_model=BookingResponse)
def admin_cancel_booking(
    id: int,
    cancel_data: AdminCancelRequest,
    background_tasks: BackgroundTasks,
    db: Annotated[Session, Depends(get_db)],
    current_admin: Annotated[User, Depends(get_current_admin)],
) -> Booking:
    """
    Cancel any booking (admin only).

    Neither the booking's status nor its start time is checked, so an admin
    can cancel at any moment (unlike user cancellations).

    Request body:
    - **cancellation_reason** (optional): Explanation for cancellation

    The status becomes "canceled", the reason is stored, a recorded Google
    Calendar event is deleted, an audit entry is written, the owner is
    notified in-app and an e-mail is queued.

    Responds with 404 when the booking does not exist.
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    booking.status = "canceled"  # type: ignore[assignment]
    booking.cancellation_reason = cancel_data.cancellation_reason  # type: ignore[assignment]

    # Remove the mirrored calendar event on behalf of the booking's owner.
    calendar_event_id = booking.google_calendar_event_id
    if calendar_event_id:
        delete_calendar_event(
            db=db,
            event_id=calendar_event_id,
            user_id=int(booking.user_id),  # type: ignore[arg-type]
        )
        booking.google_calendar_event_id = None  # type: ignore[assignment]

    db.commit()
    db.refresh(booking)

    # Placeholder shown wherever a human-readable reason is required.
    shown_reason = cancel_data.cancellation_reason or "Nu a fost specificat"

    log_action(
        db=db,
        action="booking_canceled",
        user_id=current_admin.id,
        target_type="booking",
        target_id=booking.id,
        details={"cancellation_reason": shown_reason},
    )

    cancel_text = f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost anulată de administrator. Motiv: {shown_reason}"  # type: ignore[union-attr]
    create_notification(
        db=db,
        user_id=booking.user_id,  # type: ignore[arg-type]
        type="booking_canceled",
        title="Rezervare Anulată",
        message=cancel_text,
        booking_id=booking.id,
    )

    # The e-mail receives the raw reason (possibly None), not the placeholder.
    background_tasks.add_task(
        send_booking_notification,
        booking,
        "canceled",
        booking.user.email,
        booking.user.full_name,
        {"cancellation_reason": cancel_data.cancellation_reason},
    )

    return booking
@admin_router.put("/{id}/reschedule", response_model=BookingResponse)
def reschedule_booking(
    id: int,
    data: BookingReschedule,
    db: Annotated[Session, Depends(get_db)],
    current_admin: Annotated[User, Depends(get_current_admin)],
) -> Booking:
    """
    Move a booking to a new time slot (admin only, drag-and-drop).

    - **start_datetime**: New start time (ISO format)
    - **end_datetime**: New end time (ISO format)

    A booking whose current start time lies in the past cannot be moved.
    The new slot is validated with `validate_booking_rules` in the booking
    owner's timezone; on success the times are updated, the old and new
    times are written to the audit log, and the owner is notified in-app.

    Errors:
    - 404 when the booking does not exist
    - 400 when the booking already started or the new slot is invalid (all
      validation messages are joined into the detail)
    """
    booking = db.query(Booking).filter(Booking.id == id).first()
    if not booking:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Booking not found",
        )

    if booking.start_datetime < datetime.utcnow():  # type: ignore[operator]
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot reschedule bookings that already started",
        )

    # Keep the previous slot for the audit trail.
    previous_start, previous_end = booking.start_datetime, booking.end_datetime

    # Validate in the owner's timezone; fall back to UTC when the owner or
    # the owner's timezone is missing.
    owner = booking.user
    owner_timezone = (owner.timezone or "UTC") if owner else "UTC"

    violations = validate_booking_rules(
        db=db,
        space_id=int(booking.space_id),  # type: ignore[arg-type]
        start_datetime=data.start_datetime,
        end_datetime=data.end_datetime,
        user_id=int(booking.user_id),  # type: ignore[arg-type]
        exclude_booking_id=booking.id,  # the booking must not clash with itself
        user_timezone=owner_timezone,
    )
    if violations:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=", ".join(violations),
        )

    booking.start_datetime = data.start_datetime  # type: ignore[assignment]
    booking.end_datetime = data.end_datetime  # type: ignore[assignment]

    audit_details = {
        "old_start": previous_start.isoformat(),  # type: ignore[union-attr]
        "new_start": data.start_datetime.isoformat(),
        "old_end": previous_end.isoformat(),  # type: ignore[union-attr]
        "new_end": data.end_datetime.isoformat(),
    }
    log_action(
        db=db,
        action="booking_rescheduled",
        user_id=current_admin.id,
        target_type="booking",
        target_id=booking.id,
        details=audit_details,
    )

    moved_text = f"Rezervarea ta pentru {booking.space.name} a fost reprogramată pentru {data.start_datetime.strftime('%d.%m.%Y %H:%M')}"
    create_notification(
        db=db,
        user_id=booking.user_id,  # type: ignore[arg-type]
        type="booking_rescheduled",
        title="Rezervare Reprogramată",
        message=moved_text,
        booking_id=booking.id,
    )

    db.commit()
    db.refresh(booking)
    return booking
@admin_router.post("", response_model=BookingResponse, status_code=status.HTTP_201_CREATED)
def admin_create_booking(
    booking_data: BookingAdminCreate,
    db: Annotated[Session, Depends(get_db)],
    current_admin: Annotated[User, Depends(get_current_admin)],
) -> Booking:
    """
    Create a booking directly with approved status (admin only, bypass approval workflow).

    - **space_id**: ID of the space to book
    - **user_id**: Optional user ID (defaults to current admin if not provided)
    - **start_datetime**: Booking start time (ISO format)
    - **end_datetime**: Booking end time (ISO format)
    - **title**: Booking title (1-200 characters)
    - **description**: Optional description

    The booking will be validated against:
    - Duration limits (min/max minutes)
    - Working hours: the whole booking must lie between the opening and
      closing hour of the day it starts on
    - Overlap with other approved bookings only (not pending)

    The submitted datetimes are checked and stored as received; this
    function performs no timezone conversion. If the settings row with id 1
    is missing, it is created with default values.

    Errors:
    - 404 when the space or the explicitly given user does not exist
    - 400 with the first validation error

    Returns the created booking with status "approved".
    """
    space = db.query(Space).filter(Space.id == booking_data.space_id).first()
    if not space:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Space not found",
        )

    # Book on behalf of the given user, or for the admin when none is given.
    if booking_data.user_id is not None:
        user = db.query(User).filter(User.id == booking_data.user_id).first()
        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )
        target_user_id = booking_data.user_id
    else:
        target_user_id = int(current_admin.id)  # type: ignore[arg-type]

    # Load the global settings, creating the default row on first use.
    settings = db.query(Settings).filter(Settings.id == 1).first()
    if not settings:
        settings = Settings(
            id=1,
            min_duration_minutes=30,
            max_duration_minutes=480,
            working_hours_start=8,
            working_hours_end=20,
            max_bookings_per_day_per_user=3,
            min_hours_before_cancel=2,
        )
        db.add(settings)
        db.commit()
        db.refresh(settings)

    start = booking_data.start_datetime
    end = booking_data.end_datetime
    errors = []

    # a) Validate duration in range
    duration_minutes = (end - start).total_seconds() / 60
    if not (settings.min_duration_minutes <= duration_minutes <= settings.max_duration_minutes):
        errors.append(
            f"Durata rezervării trebuie să fie între {settings.min_duration_minutes} "
            f"și {settings.max_duration_minutes} minute"
        )

    # b) Validate working hours. Compare full datetimes against the opening
    # and closing instants of the start day: comparing only `.hour` would
    # accept an end at e.g. 20:30 for a 20:00 close, or an end after
    # midnight on the following day.
    midnight = start.replace(hour=0, minute=0, second=0, microsecond=0)
    opens_at = midnight + timedelta(hours=settings.working_hours_start)
    closes_at = midnight + timedelta(hours=settings.working_hours_end)
    if start < opens_at or end > closes_at:
        errors.append(
            f"Rezervările sunt permise doar între {settings.working_hours_start}:00 "
            f"și {settings.working_hours_end}:00"
        )

    # c) Check for overlapping approved bookings only (pending ones do not
    # block an admin booking).
    overlapping_booking = (
        db.query(Booking)
        .filter(
            Booking.space_id == booking_data.space_id,
            Booking.status == "approved",
            Booking.start_datetime < end,
            Booking.end_datetime > start,
        )
        .first()
    )
    if overlapping_booking:
        errors.append("Spațiul este deja rezervat în acest interval")

    if errors:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=errors[0],  # Return first error
        )

    booking = Booking(
        user_id=target_user_id,
        space_id=booking_data.space_id,
        start_datetime=start,
        end_datetime=end,
        title=booking_data.title,
        description=booking_data.description,
        status="approved",  # Direct approval, bypass pending state
        approved_by=current_admin.id,  # type: ignore[assignment]
        created_at=datetime.utcnow(),
    )
    db.add(booking)
    db.commit()
    db.refresh(booking)
    return booking