"""Booking endpoints.""" from datetime import datetime, time, timedelta from typing import Annotated from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from app.core.deps import get_current_admin, get_current_user, get_db from app.models.booking import Booking from app.models.settings import Settings from app.models.space import Space from app.models.user import User from app.services.audit_service import log_action from app.services.email_service import send_booking_notification from app.services.google_calendar_service import ( create_calendar_event, delete_calendar_event, update_calendar_event, ) from app.services.notification_service import create_notification from app.schemas.booking import ( AdminCancelRequest, AvailabilityCheck, BookingAdminCreate, BookingCalendarAdmin, BookingCalendarPublic, BookingCreate, BookingPendingDetail, BookingRecurringCreate, BookingReschedule, BookingResponse, BookingUpdate, BookingWithSpace, ConflictingBooking, RecurringBookingResult, RejectRequest, ) from app.services.booking_service import validate_booking_rules from app.utils.timezone import convert_to_utc router = APIRouter(prefix="/spaces", tags=["bookings"]) bookings_router = APIRouter(prefix="/bookings", tags=["bookings"]) @router.get("/{space_id}/bookings") def get_space_bookings( space_id: int, start: Annotated[datetime, Query(description="Start datetime (ISO format)")], end: Annotated[datetime, Query(description="End datetime (ISO format)")], db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> list[BookingCalendarPublic] | list[BookingCalendarAdmin]: """ Get bookings for a space in a given time range. - **Users**: See only public data (start, end, status, title) - **Admins**: See all details including user info and descriptions Query parameters: - **start**: Start datetime in ISO format (e.g., 2024-01-01T00:00:00) - **end**: End datetime in ISO format (e.g., 2024-01-31T23:59:59) """ # Check if space exists space = db.query(Space).filter(Space.id == space_id).first() if not space: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Space not found", ) # Query bookings in the time range query = db.query(Booking).filter( Booking.space_id == space_id, Booking.start_datetime < end, Booking.end_datetime > start, ) bookings = query.order_by(Booking.start_datetime).all() # Return different schemas based on user role if current_user.role == "admin": return [BookingCalendarAdmin.model_validate(b) for b in bookings] else: return [BookingCalendarPublic.model_validate(b) for b in bookings] @bookings_router.get("/check-availability", response_model=AvailabilityCheck) def check_availability( space_id: Annotated[int, Query(description="Space ID to check")], start_datetime: Annotated[datetime, Query(description="Start datetime (ISO format)")], end_datetime: Annotated[datetime, Query(description="End datetime (ISO format)")], db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> AvailabilityCheck: """ Check if time slot is available (returns warning if conflicts exist). Returns information about conflicting bookings (pending + approved) without blocking the request. This allows users to make informed decisions about their booking times. Query parameters: - **space_id**: ID of the space to check - **start_datetime**: Start datetime in ISO format (e.g., 2024-06-15T10:00:00) - **end_datetime**: End datetime in ISO format (e.g., 2024-06-15T12:00:00) """ from sqlalchemy import and_, or_ from sqlalchemy.orm import joinedload # Check if space exists space = db.query(Space).filter(Space.id == space_id).first() if not space: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Space not found", ) # Find conflicting bookings (approved + pending) conflicts = ( db.query(Booking) .options(joinedload(Booking.user)) .filter( Booking.space_id == space_id, Booking.status.in_(["approved", "pending"]), or_( # Conflict starts during this booking and_( Booking.start_datetime <= start_datetime, Booking.end_datetime > start_datetime, ), # Conflict ends during this booking and_( Booking.start_datetime < end_datetime, Booking.end_datetime >= end_datetime, ), # Conflict is completely within this booking and_( Booking.start_datetime >= start_datetime, Booking.end_datetime <= end_datetime, ), ), ) .all() ) if not conflicts: return AvailabilityCheck( available=True, conflicts=[], message="Time slot is available" ) # Count pending vs approved pending_count = sum(1 for b in conflicts if b.status == "pending") approved_count = sum(1 for b in conflicts if b.status == "approved") message = "" if approved_count > 0: message = f"Time slot has {approved_count} approved booking(s). Choose another time." elif pending_count > 0: message = f"Time slot has {pending_count} pending request(s). Your request will wait for admin review." return AvailabilityCheck( available=approved_count == 0, # Available if no approved conflicts conflicts=[ ConflictingBooking( id=b.id, user_name=b.user.full_name, title=b.title, status=b.status, start_datetime=b.start_datetime, end_datetime=b.end_datetime, ) for b in conflicts ], message=message, ) @bookings_router.get("/my", response_model=list[BookingWithSpace]) def get_my_bookings( status_filter: Annotated[str | None, Query(alias="status")] = None, db: Annotated[Session, Depends(get_db)] = None, # type: ignore[assignment] current_user: Annotated[User, Depends(get_current_user)] = None, # type: ignore[assignment] ) -> list[BookingWithSpace]: """ Get all bookings for the current user. Returns bookings with associated space details, sorted by most recent first. Query parameters: - **status** (optional): Filter by booking status (pending/approved/rejected/canceled) """ # Base query: user's bookings with space join query = ( db.query(Booking) .join(Space, Booking.space_id == Space.id) .filter(Booking.user_id == current_user.id) ) # Apply status filter if provided if status_filter: query = query.filter(Booking.status == status_filter) # Order by most recent first bookings = query.order_by(Booking.created_at.desc()).all() return [BookingWithSpace.model_validate(b) for b in bookings] @bookings_router.get("/my/calendar", response_model=list[BookingWithSpace]) def get_my_bookings_calendar( start: Annotated[datetime, Query(description="Start datetime (ISO format)")], end: Annotated[datetime, Query(description="End datetime (ISO format)")], db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> list[BookingWithSpace]: """ Get user's bookings for calendar view within date range. Query parameters: - **start**: Start datetime in ISO format (e.g., 2024-01-01T00:00:00) - **end**: End datetime in ISO format (e.g., 2024-01-31T23:59:59) Returns bookings with status approved or pending, sorted by start time. """ bookings = ( db.query(Booking) .join(Space, Booking.space_id == Space.id) .filter( Booking.user_id == current_user.id, Booking.start_datetime < end, Booking.end_datetime > start, Booking.status.in_(["approved", "pending"]), ) .order_by(Booking.start_datetime) .all() ) return [BookingWithSpace.model_validate(b) for b in bookings] @bookings_router.post("", response_model=BookingResponse, status_code=status.HTTP_201_CREATED) def create_booking( booking_data: BookingCreate, background_tasks: BackgroundTasks, db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> BookingResponse: """ Create a new booking request. - **space_id**: ID of the space to book - **start_datetime**: Booking start time (ISO format, in user's timezone) - **end_datetime**: Booking end time (ISO format, in user's timezone) - **title**: Booking title (1-200 characters) - **description**: Optional description The booking will be validated against: - Duration limits (min/max minutes) - Working hours - Existing bookings (no overlaps) - User's daily booking limit Times are converted from user's timezone to UTC for storage. Returns the created booking with status "pending" (requires admin approval). """ # Validate that space exists space = db.query(Space).filter(Space.id == booking_data.space_id).first() if not space: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Space not found", ) # Convert input times from user timezone to UTC user_timezone = current_user.timezone or "UTC" # type: ignore[attr-defined] start_datetime_utc = convert_to_utc(booking_data.start_datetime, user_timezone) end_datetime_utc = convert_to_utc(booking_data.end_datetime, user_timezone) # Validate booking rules (using UTC times) user_id = int(current_user.id) # type: ignore[arg-type] errors = validate_booking_rules( db=db, space_id=booking_data.space_id, user_id=user_id, start_datetime=start_datetime_utc, end_datetime=end_datetime_utc, user_timezone=user_timezone, ) if errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=errors[0], # Return first error ) # Create booking (with UTC times) booking = Booking( user_id=user_id, space_id=booking_data.space_id, start_datetime=start_datetime_utc, end_datetime=end_datetime_utc, title=booking_data.title, description=booking_data.description, status="pending", created_at=datetime.utcnow(), ) db.add(booking) db.commit() db.refresh(booking) # Notify all admins about the new booking request admins = db.query(User).filter(User.role == "admin").all() for admin in admins: create_notification( db=db, user_id=admin.id, # type: ignore[arg-type] type="booking_created", title="Noua Cerere de Rezervare", message=f"Utilizatorul {current_user.full_name} a solicitat rezervarea spațiului {space.name} pentru {booking.start_datetime.strftime('%d.%m.%Y %H:%M')}", booking_id=booking.id, ) # Send email notification to admin background_tasks.add_task( send_booking_notification, booking, "created", admin.email, current_user.full_name, None, ) # Return with timezone conversion return BookingResponse.from_booking_with_timezone(booking, user_timezone) @bookings_router.post( "/recurring", response_model=RecurringBookingResult, status_code=status.HTTP_201_CREATED ) def create_recurring_booking( data: BookingRecurringCreate, background_tasks: BackgroundTasks, db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> RecurringBookingResult: """ Create recurring weekly bookings. - **space_id**: ID of the space to book - **start_time**: Time only (e.g., "10:00") - **duration_minutes**: Duration in minutes - **title**: Booking title (1-200 characters) - **description**: Optional description - **recurrence_days**: List of weekday numbers (0=Monday, 6=Sunday) - **start_date**: First occurrence date - **end_date**: Last occurrence date (max 1 year from start) - **skip_conflicts**: Skip conflicted dates (True) or stop on first conflict (False) Returns information about created and skipped bookings. Maximum 52 occurrences allowed. """ created_bookings = [] skipped_dates = [] # Validate that space exists space = db.query(Space).filter(Space.id == data.space_id).first() if not space: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Space not found", ) # Parse time try: hour, minute = map(int, data.start_time.split(':')) except (ValueError, AttributeError): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid start_time format. Use HH:MM (e.g., '10:00')", ) duration = timedelta(minutes=data.duration_minutes) # Get user timezone user_timezone = current_user.timezone or "UTC" # Generate occurrence dates occurrences = [] current_date = data.start_date while current_date <= data.end_date: if current_date.weekday() in data.recurrence_days: occurrences.append(current_date) current_date += timedelta(days=1) # Limit to 52 occurrences if len(occurrences) > 52: occurrences = occurrences[:52] total_requested = len(occurrences) # Create bookings for each occurrence for occurrence_date in occurrences: # Build datetime in user timezone start_datetime = datetime.combine(occurrence_date, time(hour, minute)) end_datetime = start_datetime + duration # Convert to UTC for validation and storage start_datetime_utc = convert_to_utc(start_datetime, user_timezone) end_datetime_utc = convert_to_utc(end_datetime, user_timezone) # Validate user_id = int(current_user.id) # type: ignore[arg-type] errors = validate_booking_rules( db=db, space_id=data.space_id, start_datetime=start_datetime_utc, end_datetime=end_datetime_utc, user_id=user_id, user_timezone=user_timezone, ) if errors: skipped_dates.append({ "date": occurrence_date.isoformat(), "reason": ", ".join(errors), }) if not data.skip_conflicts: # Stop on first conflict break else: # Skip and continue continue # Create booking (store UTC times) booking = Booking( user_id=user_id, space_id=data.space_id, title=data.title, description=data.description, start_datetime=start_datetime_utc, end_datetime=end_datetime_utc, status="pending", created_at=datetime.utcnow(), ) db.add(booking) created_bookings.append(booking) db.commit() # Refresh all created bookings for booking in created_bookings: db.refresh(booking) # Send notifications to admins (in background) if created_bookings: admins = db.query(User).filter(User.role == "admin").all() for admin in admins: background_tasks.add_task( create_notification, db=db, user_id=admin.id, # type: ignore[arg-type] type="booking_created", title="Noi Cereri de Rezervare Recurente", message=f"Utilizatorul {current_user.full_name} a creat {len(created_bookings)} rezervări recurente.", ) return RecurringBookingResult( total_requested=total_requested, total_created=len(created_bookings), total_skipped=len(skipped_dates), created_bookings=[BookingResponse.model_validate(b) for b in created_bookings], skipped_dates=skipped_dates, ) @bookings_router.put("/{id}", response_model=BookingResponse) def update_booking( id: int, data: BookingUpdate, db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> Booking: """ Update own booking (pending bookings only). Users can only edit their own bookings, and only if the booking is still pending. All fields are optional - only provided fields will be updated. The booking will be re-validated against all rules after updating fields. """ # Get booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Check ownership if booking.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Can only edit your own bookings", ) # Check status (only pending) if booking.status != "pending": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Can only edit pending bookings", ) # Convert input times from user timezone to UTC user_timezone = current_user.timezone or "UTC" # type: ignore[attr-defined] # Prepare updated values (don't update model yet - validate first) # Convert datetimes to UTC if provided if data.start_datetime is not None: updated_start = convert_to_utc(data.start_datetime, user_timezone) # type: ignore[assignment] else: updated_start = booking.start_datetime # type: ignore[assignment] if data.end_datetime is not None: updated_end = convert_to_utc(data.end_datetime, user_timezone) # type: ignore[assignment] else: updated_end = booking.end_datetime # type: ignore[assignment] # Re-validate booking rules BEFORE updating the model user_id = int(current_user.id) # type: ignore[arg-type] errors = validate_booking_rules( db=db, space_id=int(booking.space_id), # type: ignore[arg-type] start_datetime=updated_start, # type: ignore[arg-type] end_datetime=updated_end, # type: ignore[arg-type] user_id=user_id, exclude_booking_id=booking.id, # Exclude self from overlap check user_timezone=user_timezone, ) if errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=errors[0], ) # Update fields (only if provided) - validation passed if data.title is not None: booking.title = data.title # type: ignore[assignment] if data.description is not None: booking.description = data.description # type: ignore[assignment] if data.start_datetime is not None: booking.start_datetime = convert_to_utc(data.start_datetime, user_timezone) # type: ignore[assignment] if data.end_datetime is not None: booking.end_datetime = convert_to_utc(data.end_datetime, user_timezone) # type: ignore[assignment] db.commit() db.refresh(booking) return booking @bookings_router.put("/{id}/cancel", response_model=BookingResponse) def cancel_booking( id: int, db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_user)], ) -> Booking: """ Cancel own booking with time restrictions. Users can only cancel their own bookings, and only if there is enough time before the booking start (based on min_hours_before_cancel setting). Returns the updated booking with status "canceled". """ # Find booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Check if user owns this booking if booking.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You can only cancel your own bookings", ) # Get settings to check min_hours_before_cancel settings = db.query(Settings).filter(Settings.id == 1).first() if not settings: settings = Settings( id=1, min_duration_minutes=30, max_duration_minutes=480, working_hours_start=8, working_hours_end=20, max_bookings_per_day_per_user=3, min_hours_before_cancel=2, ) db.add(settings) db.commit() db.refresh(settings) # Calculate hours until booking start now = datetime.utcnow() hours_until_start = (booking.start_datetime - now).total_seconds() / 3600 # type: ignore[operator] # Check if there's enough time to cancel if hours_until_start < settings.min_hours_before_cancel: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Cannot cancel booking less than {settings.min_hours_before_cancel} hours before start time", ) # Cancel booking booking.status = "canceled" # type: ignore[assignment] # Delete from Google Calendar if event exists if booking.google_calendar_event_id: delete_calendar_event( db=db, event_id=booking.google_calendar_event_id, user_id=int(current_user.id), # type: ignore[arg-type] ) booking.google_calendar_event_id = None # type: ignore[assignment] db.commit() db.refresh(booking) return booking # Admin endpoints admin_router = APIRouter(prefix="/admin/bookings", tags=["admin"]) @admin_router.get("/pending", response_model=list[BookingPendingDetail]) def get_pending_bookings( space_id: Annotated[int | None, Query()] = None, user_id: Annotated[int | None, Query()] = None, db: Annotated[Session, Depends(get_db)] = None, # type: ignore[assignment] current_admin: Annotated[User, Depends(get_current_admin)] = None, # type: ignore[assignment] ) -> list[BookingPendingDetail]: """ Get all pending booking requests (admin only). Returns pending bookings with user and space details, sorted by creation time (FIFO). Query parameters: - **space_id** (optional): Filter by space ID - **user_id** (optional): Filter by user ID """ # Base query: pending bookings with joins query = ( db.query(Booking) .join(Space, Booking.space_id == Space.id) .join(User, Booking.user_id == User.id) .filter(Booking.status == "pending") ) # Apply filters if provided if space_id is not None: query = query.filter(Booking.space_id == space_id) if user_id is not None: query = query.filter(Booking.user_id == user_id) # Order by created_at ascending (FIFO - oldest first) bookings = query.order_by(Booking.created_at.asc()).all() return [BookingPendingDetail.model_validate(b) for b in bookings] @admin_router.put("/{id}/approve", response_model=BookingResponse) def approve_booking( id: int, background_tasks: BackgroundTasks, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> Booking: """ Approve a pending booking request (admin only). The booking must be in "pending" status. This endpoint will: 1. Re-validate booking rules to prevent race conditions (overlap check) 2. Update status to "approved" if validation passes 3. Record the admin who approved it Returns the updated booking or an error if validation fails. """ # Find booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Check if booking is pending if booking.status != "pending": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Cannot approve booking with status '{booking.status}'", ) # Re-validate booking rules to prevent race conditions # Use booking owner's timezone for validation user_timezone = booking.user.timezone or "UTC" if booking.user else "UTC" errors = validate_booking_rules( db=db, space_id=int(booking.space_id), # type: ignore[arg-type] user_id=int(booking.user_id), # type: ignore[arg-type] start_datetime=booking.start_datetime, # type: ignore[arg-type] end_datetime=booking.end_datetime, # type: ignore[arg-type] exclude_booking_id=int(booking.id), # type: ignore[arg-type] user_timezone=user_timezone, ) if errors: # If overlap or other validation error detected, return 409 Conflict raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=errors[0], ) # Approve booking booking.status = "approved" # type: ignore[assignment] booking.approved_by = current_admin.id # type: ignore[assignment] db.commit() db.refresh(booking) # Create Google Calendar event if user has connected their calendar google_event_id = create_calendar_event( db=db, booking=booking, user_id=int(booking.user_id) # type: ignore[arg-type] ) if google_event_id: booking.google_calendar_event_id = google_event_id # type: ignore[assignment] db.commit() db.refresh(booking) # Log the action log_action( db=db, action="booking_approved", user_id=current_admin.id, target_type="booking", target_id=booking.id, details=None ) # Notify the user about approval create_notification( db=db, user_id=booking.user_id, # type: ignore[arg-type] type="booking_approved", title="Rezervare Aprobată", message=f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost aprobată", # type: ignore[union-attr] booking_id=booking.id, ) # Send email notification to user background_tasks.add_task( send_booking_notification, booking, "approved", booking.user.email, booking.user.full_name, None, ) return booking @admin_router.put("/{id}/reject", response_model=BookingResponse) def reject_booking( id: int, reject_data: RejectRequest, background_tasks: BackgroundTasks, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> Booking: """ Reject a pending booking request (admin only). The booking must be in "pending" status. Optionally provide a rejection reason. Request body: - **reason** (optional): Explanation for rejection Returns the updated booking with status "rejected". """ # Find booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Check if booking is pending if booking.status != "pending": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Cannot reject booking with status '{booking.status}'", ) # Reject booking booking.status = "rejected" # type: ignore[assignment] booking.rejection_reason = reject_data.reason # type: ignore[assignment] db.commit() db.refresh(booking) # Log the action log_action( db=db, action="booking_rejected", user_id=current_admin.id, target_type="booking", target_id=booking.id, details={"rejection_reason": reject_data.reason or "Nu a fost specificat"} ) # Notify the user about rejection create_notification( db=db, user_id=booking.user_id, # type: ignore[arg-type] type="booking_rejected", title="Rezervare Respinsă", message=f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost respinsă. Motiv: {reject_data.reason or 'Nu a fost specificat'}", # type: ignore[union-attr] booking_id=booking.id, ) # Send email notification to user background_tasks.add_task( send_booking_notification, booking, "rejected", booking.user.email, booking.user.full_name, {"rejection_reason": reject_data.reason}, ) return booking @admin_router.put("/{id}", response_model=BookingResponse) def admin_update_booking( id: int, data: BookingUpdate, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> Booking: """ Update any booking (admin only). Admin can edit any booking (pending or approved), but cannot edit bookings that have already started. All fields are optional - only provided fields will be updated. The booking will be re-validated against all rules after updating fields. """ # Get booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Check if booking already started (cannot edit past bookings) if booking.start_datetime < datetime.utcnow() and booking.status == "approved": # type: ignore[operator] raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot edit bookings that already started", ) # Update fields (only if provided) if data.title is not None: booking.title = data.title # type: ignore[assignment] if data.description is not None: booking.description = data.description # type: ignore[assignment] if data.start_datetime is not None: booking.start_datetime = data.start_datetime # type: ignore[assignment] if data.end_datetime is not None: booking.end_datetime = data.end_datetime # type: ignore[assignment] # Re-validate booking rules # Use booking owner's timezone for validation user_timezone = booking.user.timezone or "UTC" if booking.user else "UTC" errors = validate_booking_rules( db=db, space_id=int(booking.space_id), # type: ignore[arg-type] start_datetime=booking.start_datetime, # type: ignore[arg-type] end_datetime=booking.end_datetime, # type: ignore[arg-type] user_id=int(booking.user_id), # type: ignore[arg-type] exclude_booking_id=booking.id, # Exclude self from overlap check user_timezone=user_timezone, ) if errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=errors[0], ) # Sync with Google Calendar if event exists if booking.google_calendar_event_id: update_calendar_event( db=db, booking=booking, user_id=int(booking.user_id), # type: ignore[arg-type] event_id=booking.google_calendar_event_id, ) # Log audit log_action( db=db, action="booking_updated", user_id=current_admin.id, target_type="booking", target_id=booking.id, details={"updated_by": "admin"} ) db.commit() db.refresh(booking) return booking @admin_router.put("/{id}/cancel", response_model=BookingResponse) def admin_cancel_booking( id: int, cancel_data: AdminCancelRequest, background_tasks: BackgroundTasks, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> Booking: """ Cancel any booking (admin only). Admin can cancel any booking at any time, regardless of status or timing. No time restrictions apply (unlike user cancellations). Request body: - **cancellation_reason** (optional): Explanation for cancellation Returns the updated booking with status "canceled". """ # Find booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Admin can cancel any booking (no status check needed) # Update booking status booking.status = "canceled" # type: ignore[assignment] booking.cancellation_reason = cancel_data.cancellation_reason # type: ignore[assignment] # Delete from Google Calendar if event exists if booking.google_calendar_event_id: delete_calendar_event( db=db, event_id=booking.google_calendar_event_id, user_id=int(booking.user_id), # type: ignore[arg-type] ) booking.google_calendar_event_id = None # type: ignore[assignment] db.commit() db.refresh(booking) # Log the action log_action( db=db, action="booking_canceled", user_id=current_admin.id, target_type="booking", target_id=booking.id, details={"cancellation_reason": cancel_data.cancellation_reason or "Nu a fost specificat"} ) # Notify the user about cancellation create_notification( db=db, user_id=booking.user_id, # type: ignore[arg-type] type="booking_canceled", title="Rezervare Anulată", message=f"Rezervarea ta pentru {booking.space.name} din {booking.start_datetime.strftime('%d.%m.%Y %H:%M')} a fost anulată de administrator. Motiv: {cancel_data.cancellation_reason or 'Nu a fost specificat'}", # type: ignore[union-attr] booking_id=booking.id, ) # Send email notification to user background_tasks.add_task( send_booking_notification, booking, "canceled", booking.user.email, booking.user.full_name, {"cancellation_reason": cancel_data.cancellation_reason}, ) return booking @admin_router.put("/{id}/reschedule", response_model=BookingResponse) def reschedule_booking( id: int, data: BookingReschedule, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> Booking: """ Reschedule booking to new time slot (admin only, drag-and-drop). Validates the new time slot and updates the booking times. Only approved bookings that haven't started yet can be rescheduled. - **start_datetime**: New start time (ISO format) - **end_datetime**: New end time (ISO format) Returns the updated booking or an error if validation fails. """ # Get booking booking = db.query(Booking).filter(Booking.id == id).first() if not booking: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Booking not found", ) # Check if booking already started (cannot reschedule past bookings) if booking.start_datetime < datetime.utcnow(): # type: ignore[operator] raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot reschedule bookings that already started", ) # Store old times for audit log old_start = booking.start_datetime old_end = booking.end_datetime # Validate new time slot # Use booking owner's timezone for validation user_timezone = booking.user.timezone or "UTC" if booking.user else "UTC" errors = validate_booking_rules( db=db, space_id=int(booking.space_id), # type: ignore[arg-type] start_datetime=data.start_datetime, end_datetime=data.end_datetime, user_id=int(booking.user_id), # type: ignore[arg-type] exclude_booking_id=booking.id, # Exclude self from overlap check user_timezone=user_timezone, ) if errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=", ".join(errors), ) # Update times booking.start_datetime = data.start_datetime # type: ignore[assignment] booking.end_datetime = data.end_datetime # type: ignore[assignment] # Sync with Google Calendar if event exists if booking.google_calendar_event_id: update_calendar_event( db=db, booking=booking, user_id=int(booking.user_id), # type: ignore[arg-type] event_id=booking.google_calendar_event_id, ) # Log audit log_action( db=db, action="booking_rescheduled", user_id=current_admin.id, target_type="booking", target_id=booking.id, details={ "old_start": old_start.isoformat(), # type: ignore[union-attr] "new_start": data.start_datetime.isoformat(), "old_end": old_end.isoformat(), # type: ignore[union-attr] "new_end": data.end_datetime.isoformat(), }, ) # Notify user about reschedule create_notification( db=db, user_id=booking.user_id, # type: ignore[arg-type] type="booking_rescheduled", title="Rezervare Reprogramată", message=f"Rezervarea ta pentru {booking.space.name} a fost reprogramată pentru {data.start_datetime.strftime('%d.%m.%Y %H:%M')}", booking_id=booking.id, ) db.commit() db.refresh(booking) return booking @admin_router.post("", response_model=BookingResponse, status_code=status.HTTP_201_CREATED) def admin_create_booking( booking_data: BookingAdminCreate, db: Annotated[Session, Depends(get_db)], current_admin: Annotated[User, Depends(get_current_admin)], ) -> Booking: """ Create a booking directly with approved status (admin only, bypass approval workflow). - **space_id**: ID of the space to book - **user_id**: Optional user ID (defaults to current admin if not provided) - **start_datetime**: Booking start time (ISO format) - **end_datetime**: Booking end time (ISO format) - **title**: Booking title (1-200 characters) - **description**: Optional description The booking will be validated against: - Duration limits (min/max minutes) - Working hours - Overlap with other approved bookings only (not pending) Returns the created booking with status "approved" (bypasses normal approval workflow). """ # Validate that space exists space = db.query(Space).filter(Space.id == booking_data.space_id).first() if not space: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Space not found", ) # Use current admin ID if user_id not provided target_user_id = booking_data.user_id if booking_data.user_id is not None else int(current_admin.id) # type: ignore[arg-type] # Validate user exists if user_id was provided if booking_data.user_id is not None: user = db.query(User).filter(User.id == booking_data.user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Validate booking rules (we need to check overlap with approved bookings only) # For admin direct booking, we need custom validation: # 1. Duration limits # 2. Working hours # 3. Overlap with approved bookings only (not pending) from app.models.settings import Settings from sqlalchemy import and_ errors = [] # Fetch settings settings = db.query(Settings).filter(Settings.id == 1).first() if not settings: settings = Settings( id=1, min_duration_minutes=30, max_duration_minutes=480, working_hours_start=8, working_hours_end=20, max_bookings_per_day_per_user=3, min_hours_before_cancel=2, ) db.add(settings) db.commit() db.refresh(settings) # a) Validate duration in range duration_minutes = (booking_data.end_datetime - booking_data.start_datetime).total_seconds() / 60 if ( duration_minutes < settings.min_duration_minutes or duration_minutes > settings.max_duration_minutes ): errors.append( f"Durata rezervării trebuie să fie între {settings.min_duration_minutes} " f"și {settings.max_duration_minutes} minute" ) # b) Validate working hours if ( booking_data.start_datetime.hour < settings.working_hours_start or booking_data.end_datetime.hour > settings.working_hours_end ): errors.append( f"Rezervările sunt permise doar între {settings.working_hours_start}:00 " f"și {settings.working_hours_end}:00" ) # c) Check for overlapping approved bookings only overlapping_bookings = db.query(Booking).filter( Booking.space_id == booking_data.space_id, Booking.status == "approved", # Only check approved bookings and_( Booking.start_datetime < booking_data.end_datetime, Booking.end_datetime > booking_data.start_datetime, ), ).first() if overlapping_bookings: errors.append("Spațiul este deja rezervat în acest interval") if errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=errors[0], # Return first error ) # Create booking with approved status booking = Booking( user_id=target_user_id, space_id=booking_data.space_id, start_datetime=booking_data.start_datetime, end_datetime=booking_data.end_datetime, title=booking_data.title, description=booking_data.description, status="approved", # Direct approval, bypass pending state approved_by=current_admin.id, # type: ignore[assignment] created_at=datetime.utcnow(), ) db.add(booking) db.commit() db.refresh(booking) return booking