feat: Space Booking System - MVP complet

Sistem web pentru rezervarea de birouri și săli de ședință
cu flux de aprobare administrativă.

Stack: FastAPI + Vue.js 3 + SQLite + TypeScript

Features implementate:
- Autentificare JWT + Self-registration cu email verification
- CRUD Spații, Utilizatori, Settings (Admin)
- Calendar interactiv (FullCalendar) cu drag-and-drop
- Creare rezervări cu validare (durată, program, overlap, max/zi)
- Rezervări recurente (săptămânal)
- Admin: aprobare/respingere/anulare cereri
- Admin: creare directă rezervări (bypass approval)
- Admin: editare orice rezervare
- User: editare/anulare rezervări proprii
- Notificări in-app (bell icon + dropdown)
- Notificări email (async SMTP cu BackgroundTasks)
- Jurnal acțiuni administrative (audit log)
- Rapoarte avansate (utilizare, top users, approval rate)
- Șabloane rezervări (booking templates)
- Atașamente fișiere (upload/download)
- Conflict warnings (verificare disponibilitate real-time)
- Integrare Google Calendar (OAuth2)
- Suport timezone (UTC storage + user preference)
- 225+ teste backend

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-02-09 17:51:29 +00:00
commit df4031d99c
113 changed files with 24491 additions and 0 deletions

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Space Booking Backend

View File

@@ -0,0 +1 @@
# API module

View File

@@ -0,0 +1,192 @@
"""Attachments API endpoints."""
import os
import uuid
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session, joinedload
from app.core.deps import get_current_user, get_db
from app.models.attachment import Attachment
from app.models.booking import Booking
from app.models.user import User
from app.schemas.attachment import AttachmentRead
router = APIRouter()
UPLOAD_DIR = Path(__file__).parent.parent.parent / "uploads"
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
MAX_FILES_PER_BOOKING = 5
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".pptx", ".xlsx", ".xls", ".png", ".jpg", ".jpeg"}
@router.post("/bookings/{booking_id}/attachments", response_model=AttachmentRead, status_code=201)
async def upload_attachment(
booking_id: int,
file: Annotated[UploadFile, File()],
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> AttachmentRead:
"""Upload file attachment to booking."""
# Check booking exists and user is owner
booking = db.query(Booking).filter(Booking.id == booking_id).first()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
if booking.user_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="Can only attach files to your own bookings")
# Check max files limit
existing_count = db.query(Attachment).filter(Attachment.booking_id == booking_id).count()
if existing_count >= MAX_FILES_PER_BOOKING:
raise HTTPException(
status_code=400, detail=f"Maximum {MAX_FILES_PER_BOOKING} files per booking"
)
# Validate file extension
if not file.filename:
raise HTTPException(status_code=400, detail="Filename is required")
file_ext = Path(file.filename).suffix.lower()
if file_ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400, detail=f"File type not allowed. Allowed: {', '.join(ALLOWED_EXTENSIONS)}"
)
# Read file
content = await file.read()
file_size = len(content)
# Check file size
if file_size > MAX_FILE_SIZE:
raise HTTPException(
status_code=400, detail=f"File too large. Max size: {MAX_FILE_SIZE // (1024*1024)}MB"
)
# Generate unique filename
unique_filename = f"{uuid.uuid4()}{file_ext}"
filepath = UPLOAD_DIR / unique_filename
# Save file
with open(filepath, "wb") as f:
f.write(content)
# Create attachment record
attachment = Attachment(
booking_id=booking_id,
filename=file.filename,
stored_filename=unique_filename,
filepath=str(filepath),
size=file_size,
content_type=file.content_type or "application/octet-stream",
uploaded_by=current_user.id,
)
db.add(attachment)
db.commit()
db.refresh(attachment)
return AttachmentRead(
id=attachment.id,
booking_id=attachment.booking_id,
filename=attachment.filename,
size=attachment.size,
content_type=attachment.content_type,
uploaded_by=attachment.uploaded_by,
uploader_name=current_user.full_name,
created_at=attachment.created_at,
)
@router.get("/bookings/{booking_id}/attachments", response_model=list[AttachmentRead])
def list_attachments(
booking_id: int,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> list[AttachmentRead]:
"""List all attachments for a booking."""
booking = db.query(Booking).filter(Booking.id == booking_id).first()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
attachments = (
db.query(Attachment)
.options(joinedload(Attachment.uploader))
.filter(Attachment.booking_id == booking_id)
.all()
)
return [
AttachmentRead(
id=a.id,
booking_id=a.booking_id,
filename=a.filename,
size=a.size,
content_type=a.content_type,
uploaded_by=a.uploaded_by,
uploader_name=a.uploader.full_name,
created_at=a.created_at,
)
for a in attachments
]
@router.get("/attachments/{attachment_id}/download")
def download_attachment(
attachment_id: int,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> FileResponse:
"""Download attachment file."""
attachment = (
db.query(Attachment)
.options(joinedload(Attachment.booking))
.filter(Attachment.id == attachment_id)
.first()
)
if not attachment:
raise HTTPException(status_code=404, detail="Attachment not found")
# Check if user can access (owner or admin)
if attachment.booking.user_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="Cannot access this attachment")
# Return file
return FileResponse(
path=attachment.filepath, filename=attachment.filename, media_type=attachment.content_type
)
@router.delete("/attachments/{attachment_id}", status_code=204)
def delete_attachment(
attachment_id: int,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> None:
"""Delete attachment."""
attachment = (
db.query(Attachment)
.options(joinedload(Attachment.booking))
.filter(Attachment.id == attachment_id)
.first()
)
if not attachment:
raise HTTPException(status_code=404, detail="Attachment not found")
# Check permission
if attachment.uploaded_by != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="Can only delete your own attachments")
# Delete file from disk
if os.path.exists(attachment.filepath):
os.remove(attachment.filepath)
# Delete record
db.delete(attachment)
db.commit()

View File

@@ -0,0 +1,59 @@
"""Audit log API endpoints."""
from datetime import datetime
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session, joinedload
from app.core.deps import get_current_admin, get_db
from app.models.audit_log import AuditLog
from app.models.user import User
from app.schemas.audit_log import AuditLogRead
router = APIRouter()
@router.get("/admin/audit-log", response_model=list[AuditLogRead])
def get_audit_logs(
action: Annotated[Optional[str], Query()] = None,
start_date: Annotated[Optional[datetime], Query()] = None,
end_date: Annotated[Optional[datetime], Query()] = None,
page: Annotated[int, Query(ge=1)] = 1,
limit: Annotated[int, Query(ge=1, le=100)] = 50,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin),
) -> list[AuditLogRead]:
"""
Get audit logs with filtering and pagination.
Admin only endpoint to view audit trail of administrative actions.
"""
query = db.query(AuditLog).options(joinedload(AuditLog.user))
# Apply filters
if action:
query = query.filter(AuditLog.action == action)
if start_date:
query = query.filter(AuditLog.created_at >= start_date)
if end_date:
query = query.filter(AuditLog.created_at <= end_date)
# Pagination
offset = (page - 1) * limit
logs = query.order_by(AuditLog.created_at.desc()).offset(offset).limit(limit).all()
# Map to response schema with user details
return [
AuditLogRead(
id=log.id,
action=log.action,
user_id=log.user_id,
user_name=log.user.full_name,
user_email=log.user.email,
target_type=log.target_type,
target_id=log.target_id,
details=log.details,
created_at=log.created_at,
)
for log in logs
]

217
backend/app/api/auth.py Normal file
View File

@@ -0,0 +1,217 @@
"""Authentication endpoints."""
from datetime import datetime, timedelta
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.deps import get_db
from app.core.security import create_access_token, get_password_hash, verify_password
from app.models.user import User
from app.schemas.auth import EmailVerificationRequest, LoginRequest, UserRegister
from app.schemas.user import Token
from app.services.email_service import send_email
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login", response_model=Token)
def login(
login_data: LoginRequest,
db: Annotated[Session, Depends(get_db)],
) -> Token:
"""
Login with email and password.
Returns JWT token for authenticated requests.
"""
user = db.query(User).filter(User.email == login_data.email).first()
if not user or not verify_password(login_data.password, str(user.hashed_password)):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User account is disabled",
)
access_token = create_access_token(subject=int(user.id))
return Token(access_token=access_token, token_type="bearer")
@router.post("/register", status_code=201)
async def register(
data: UserRegister,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
) -> dict:
"""
Register new user with email verification.
Creates an inactive user account and sends verification email.
"""
# Check if email already exists
existing = db.query(User).filter(User.email == data.email).first()
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
# Create user (inactive until verified)
user = User(
email=data.email,
hashed_password=get_password_hash(data.password),
full_name=data.full_name,
organization=data.organization,
role="user", # Default role
is_active=False, # Inactive until email verified
)
db.add(user)
db.commit()
db.refresh(user)
# Generate verification token (JWT, expires in 24h)
verification_token = jwt.encode(
{
"sub": str(user.id),
"type": "email_verification",
"exp": datetime.utcnow() + timedelta(hours=24),
},
settings.secret_key,
algorithm="HS256",
)
# Send verification email (background task)
verification_link = f"{settings.frontend_url}/verify?token={verification_token}"
subject = "Verifică contul tău - Space Booking"
body = f"""Bună ziua {user.full_name},
Bine ai venit pe platforma Space Booking!
Pentru a-ți activa contul, te rugăm să accesezi link-ul de mai jos:
{verification_link}
Link-ul va expira în 24 de ore.
Dacă nu ai creat acest cont, te rugăm să ignori acest email.
Cu stimă,
Echipa Space Booking
"""
background_tasks.add_task(send_email, user.email, subject, body)
return {
"message": "Registration successful. Please check your email to verify your account.",
"email": user.email,
}
@router.post("/verify")
def verify_email(
data: EmailVerificationRequest,
db: Annotated[Session, Depends(get_db)],
) -> dict:
"""
Verify email address with token.
Activates the user account if token is valid.
"""
try:
# Decode token
payload = jwt.decode(
data.token,
settings.secret_key,
algorithms=["HS256"],
)
# Check token type
if payload.get("type") != "email_verification":
raise HTTPException(status_code=400, detail="Invalid verification token")
user_id = int(payload.get("sub"))
# Get user
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Check if already verified
if user.is_active:
return {"message": "Email already verified"}
# Activate user
user.is_active = True
db.commit()
return {"message": "Email verified successfully. You can now log in."}
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=400,
detail="Verification link expired. Please request a new one.",
)
except JWTError:
raise HTTPException(status_code=400, detail="Invalid verification token")
@router.post("/resend-verification")
async def resend_verification(
email: str,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
) -> dict:
"""
Resend verification email.
For security, always returns success even if email doesn't exist.
"""
user = db.query(User).filter(User.email == email).first()
if not user:
# Don't reveal if email exists
return {"message": "If the email exists, a verification link has been sent."}
if user.is_active:
raise HTTPException(status_code=400, detail="Account already verified")
# Generate new token
verification_token = jwt.encode(
{
"sub": str(user.id),
"type": "email_verification",
"exp": datetime.utcnow() + timedelta(hours=24),
},
settings.secret_key,
algorithm="HS256",
)
# Send email
verification_link = f"{settings.frontend_url}/verify?token={verification_token}"
subject = "Verifică contul tău - Space Booking"
body = f"""Bună ziua {user.full_name},
Ai solicitat un nou link de verificare pentru contul tău pe Space Booking.
Pentru a-ți activa contul, te rugăm să accesezi link-ul de mai jos:
{verification_link}
Link-ul va expira în 24 de ore.
Cu stimă,
Echipa Space Booking
"""
background_tasks.add_task(send_email, user.email, subject, body)
return {"message": "If the email exists, a verification link has been sent."}

View File

@@ -0,0 +1,229 @@
"""Booking template endpoints."""
from datetime import datetime, timedelta
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from sqlalchemy.orm import Session, joinedload
from app.core.deps import get_current_user, get_db
from app.models.booking import Booking
from app.models.booking_template import BookingTemplate
from app.models.user import User
from app.schemas.booking import BookingResponse
from app.schemas.booking_template import BookingTemplateCreate, BookingTemplateRead
from app.services.booking_service import validate_booking_rules
from app.services.email_service import send_booking_notification
from app.services.notification_service import create_notification
router = APIRouter(prefix="/booking-templates", tags=["booking-templates"])
@router.post("", response_model=BookingTemplateRead, status_code=status.HTTP_201_CREATED)
def create_template(
data: BookingTemplateCreate,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> BookingTemplateRead:
"""
Create a new booking template.
- **name**: Template name (e.g., "Weekly Team Sync")
- **space_id**: Optional default space ID
- **duration_minutes**: Default duration in minutes
- **title**: Default booking title
- **description**: Optional default description
Returns the created template.
"""
template = BookingTemplate(
user_id=current_user.id, # type: ignore[arg-type]
name=data.name,
space_id=data.space_id,
duration_minutes=data.duration_minutes,
title=data.title,
description=data.description,
)
db.add(template)
db.commit()
db.refresh(template)
return BookingTemplateRead(
id=template.id, # type: ignore[arg-type]
user_id=template.user_id, # type: ignore[arg-type]
name=template.name, # type: ignore[arg-type]
space_id=template.space_id, # type: ignore[arg-type]
space_name=template.space.name if template.space else None, # type: ignore[union-attr]
duration_minutes=template.duration_minutes, # type: ignore[arg-type]
title=template.title, # type: ignore[arg-type]
description=template.description, # type: ignore[arg-type]
usage_count=template.usage_count, # type: ignore[arg-type]
)
@router.get("", response_model=list[BookingTemplateRead])
def list_templates(
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> list[BookingTemplateRead]:
"""
List all booking templates for the current user.
Returns templates sorted by name.
"""
templates = (
db.query(BookingTemplate)
.options(joinedload(BookingTemplate.space))
.filter(BookingTemplate.user_id == current_user.id)
.order_by(BookingTemplate.name)
.all()
)
return [
BookingTemplateRead(
id=t.id, # type: ignore[arg-type]
user_id=t.user_id, # type: ignore[arg-type]
name=t.name, # type: ignore[arg-type]
space_id=t.space_id, # type: ignore[arg-type]
space_name=t.space.name if t.space else None, # type: ignore[union-attr]
duration_minutes=t.duration_minutes, # type: ignore[arg-type]
title=t.title, # type: ignore[arg-type]
description=t.description, # type: ignore[arg-type]
usage_count=t.usage_count, # type: ignore[arg-type]
)
for t in templates
]
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_template(
id: int,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> None:
"""
Delete a booking template.
Users can only delete their own templates.
"""
template = (
db.query(BookingTemplate)
.filter(
BookingTemplate.id == id,
BookingTemplate.user_id == current_user.id,
)
.first()
)
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
db.delete(template)
db.commit()
return None
@router.post("/from-template/{template_id}", response_model=BookingResponse, status_code=status.HTTP_201_CREATED)
def create_booking_from_template(
template_id: int,
start_datetime: datetime,
background_tasks: BackgroundTasks,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> Booking:
"""
Create a booking from a template.
- **template_id**: ID of the template to use
- **start_datetime**: When the booking should start (ISO format)
The booking will use the template's space, title, description, and duration.
The end time is calculated automatically based on the template's duration.
Returns the created booking with status "pending" (requires admin approval).
"""
# Find template
template = (
db.query(BookingTemplate)
.filter(
BookingTemplate.id == template_id,
BookingTemplate.user_id == current_user.id,
)
.first()
)
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if not template.space_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Template does not have a default space",
)
# Calculate end time
end_datetime = start_datetime + timedelta(minutes=template.duration_minutes) # type: ignore[arg-type]
# Validate booking rules
user_id = int(current_user.id) # type: ignore[arg-type]
errors = validate_booking_rules(
db=db,
space_id=int(template.space_id), # type: ignore[arg-type]
start_datetime=start_datetime,
end_datetime=end_datetime,
user_id=user_id,
)
if errors:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=errors[0], # Return first error
)
# Create booking
booking = Booking(
user_id=user_id,
space_id=template.space_id, # type: ignore[arg-type]
title=template.title, # type: ignore[arg-type]
description=template.description, # type: ignore[arg-type]
start_datetime=start_datetime,
end_datetime=end_datetime,
status="pending",
created_at=datetime.utcnow(),
)
db.add(booking)
# Increment usage count
template.usage_count = int(template.usage_count) + 1 # type: ignore[arg-type, assignment]
db.commit()
db.refresh(booking)
# Notify all admins about the new booking request
admins = db.query(User).filter(User.role == "admin").all()
for admin in admins:
create_notification(
db=db,
user_id=admin.id, # type: ignore[arg-type]
type="booking_created",
title="Noua Cerere de Rezervare",
message=f"Utilizatorul {current_user.full_name} a solicitat rezervarea spațiului {template.space.name} pentru {booking.start_datetime.strftime('%d.%m.%Y %H:%M')}", # type: ignore[union-attr, union-attr]
booking_id=booking.id,
)
# Send email notification to admin
background_tasks.add_task(
send_booking_notification,
booking,
"created",
admin.email,
current_user.full_name,
None,
)
return booking

1155
backend/app/api/bookings.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,176 @@
"""Google Calendar integration endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, status
from google_auth_oauthlib.flow import Flow
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.deps import get_current_user, get_db
from app.models.google_calendar_token import GoogleCalendarToken
from app.models.user import User
router = APIRouter()
@router.get("/integrations/google/connect")
def connect_google(
current_user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
"""
Start Google OAuth flow.
Returns authorization URL that user should visit to grant access.
"""
if not settings.google_client_id or not settings.google_client_secret:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Google Calendar integration not configured",
)
try:
flow = Flow.from_client_config(
{
"web": {
"client_id": settings.google_client_id,
"client_secret": settings.google_client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [settings.google_redirect_uri],
}
},
scopes=[
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/calendar.events",
],
redirect_uri=settings.google_redirect_uri,
)
authorization_url, state = flow.authorization_url(
access_type="offline", include_granted_scopes="true", prompt="consent"
)
# Note: In production, store state in session/cache and validate it in callback
return {"authorization_url": authorization_url, "state": state}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to start OAuth flow: {str(e)}",
)
@router.get("/integrations/google/callback")
def google_callback(
code: Annotated[str, Query()],
state: Annotated[str, Query()],
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
"""
Handle Google OAuth callback.
Exchange authorization code for tokens and store them.
"""
if not settings.google_client_id or not settings.google_client_secret:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Google Calendar integration not configured",
)
try:
flow = Flow.from_client_config(
{
"web": {
"client_id": settings.google_client_id,
"client_secret": settings.google_client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [settings.google_redirect_uri],
}
},
scopes=[
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/calendar.events",
],
redirect_uri=settings.google_redirect_uri,
state=state,
)
# Exchange code for tokens
flow.fetch_token(code=code)
credentials = flow.credentials
# Store tokens
token_record = (
db.query(GoogleCalendarToken)
.filter(GoogleCalendarToken.user_id == current_user.id)
.first()
)
if token_record:
token_record.access_token = credentials.token # type: ignore[assignment]
token_record.refresh_token = credentials.refresh_token # type: ignore[assignment]
token_record.token_expiry = credentials.expiry # type: ignore[assignment]
else:
token_record = GoogleCalendarToken(
user_id=current_user.id, # type: ignore[arg-type]
access_token=credentials.token,
refresh_token=credentials.refresh_token,
token_expiry=credentials.expiry,
)
db.add(token_record)
db.commit()
return {"message": "Google Calendar connected successfully"}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"OAuth failed: {str(e)}",
)
@router.delete("/integrations/google/disconnect")
def disconnect_google(
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
"""
Disconnect Google Calendar.
Removes stored tokens for the current user.
"""
token_record = (
db.query(GoogleCalendarToken)
.filter(GoogleCalendarToken.user_id == current_user.id)
.first()
)
if token_record:
db.delete(token_record)
db.commit()
return {"message": "Google Calendar disconnected"}
@router.get("/integrations/google/status")
def google_status(
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> dict[str, bool | str | None]:
"""
Check Google Calendar connection status.
Returns whether user has connected their Google Calendar account.
"""
token_record = (
db.query(GoogleCalendarToken)
.filter(GoogleCalendarToken.user_id == current_user.id)
.first()
)
return {
"connected": token_record is not None,
"expires_at": token_record.token_expiry.isoformat() if token_record and token_record.token_expiry else None,
}

View File

@@ -0,0 +1,67 @@
"""Notifications API endpoints."""
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.core.deps import get_current_user, get_db
from app.models.notification import Notification
from app.models.user import User
from app.schemas.notification import NotificationRead
router = APIRouter(prefix="/notifications", tags=["notifications"])
@router.get("", response_model=list[NotificationRead])
def get_notifications(
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
is_read: Optional[bool] = Query(None, description="Filter by read status"),
) -> list[Notification]:
"""
Get notifications for the current user.
Optional filter by read status (true/false/all).
Returns notifications ordered by created_at DESC.
"""
query = db.query(Notification).filter(Notification.user_id == current_user.id)
if is_read is not None:
query = query.filter(Notification.is_read == is_read)
notifications = query.order_by(Notification.created_at.desc()).all()
return notifications
@router.put("/{notification_id}/read", response_model=NotificationRead)
def mark_notification_as_read(
notification_id: int,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> Notification:
"""
Mark a notification as read.
Verifies the notification belongs to the current user.
"""
notification = (
db.query(Notification).filter(Notification.id == notification_id).first()
)
if not notification:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Notification not found",
)
if notification.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only mark your own notifications as read",
)
notification.is_read = True
db.commit()
db.refresh(notification)
return notification

218
backend/app/api/reports.py Normal file
View File

@@ -0,0 +1,218 @@
"""Reports API endpoints."""
from datetime import date, datetime
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from sqlalchemy import and_, case, func
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.deps import get_current_admin, get_db
from app.models.booking import Booking
from app.models.space import Space
from app.models.user import User
from app.schemas.reports import (
ApprovalRateReport,
SpaceUsageItem,
SpaceUsageReport,
TopUserItem,
TopUsersReport,
)
router = APIRouter()
def calculate_hours_expr() -> any:
"""Get database-specific expression for calculating hours between datetimes."""
if "sqlite" in settings.database_url:
# SQLite: Use julianday function
# Returns difference in days, multiply by 24 to get hours
return (
func.julianday(Booking.end_datetime)
- func.julianday(Booking.start_datetime)
) * 24
else:
# PostgreSQL: Use EXTRACT(EPOCH)
return func.extract("epoch", Booking.end_datetime - Booking.start_datetime) / 3600
@router.get("/admin/reports/usage", response_model=SpaceUsageReport)
def get_usage_report(
start_date: date | None = Query(None),
end_date: date | None = Query(None),
space_id: int | None = Query(None),
db: Annotated[Session, Depends(get_db)] = None,
current_admin: Annotated[User, Depends(get_current_admin)] = None,
) -> SpaceUsageReport:
"""Get booking usage report by space."""
query = (
db.query(
Booking.space_id,
Space.name.label("space_name"),
func.count(Booking.id).label("total_bookings"),
func.sum(case((Booking.status == "approved", 1), else_=0)).label(
"approved_bookings"
),
func.sum(case((Booking.status == "pending", 1), else_=0)).label(
"pending_bookings"
),
func.sum(case((Booking.status == "rejected", 1), else_=0)).label(
"rejected_bookings"
),
func.sum(case((Booking.status == "canceled", 1), else_=0)).label(
"canceled_bookings"
),
func.sum(calculate_hours_expr()).label("total_hours"),
)
.join(Space)
.group_by(Booking.space_id, Space.name)
)
# Apply filters
filters = []
if start_date:
filters.append(
Booking.start_datetime
>= datetime.combine(start_date, datetime.min.time())
)
if end_date:
filters.append(
Booking.start_datetime <= datetime.combine(end_date, datetime.max.time())
)
if space_id:
filters.append(Booking.space_id == space_id)
if filters:
query = query.filter(and_(*filters))
results = query.all()
items = [
SpaceUsageItem(
space_id=r.space_id,
space_name=r.space_name,
total_bookings=r.total_bookings,
approved_bookings=r.approved_bookings or 0,
pending_bookings=r.pending_bookings or 0,
rejected_bookings=r.rejected_bookings or 0,
canceled_bookings=r.canceled_bookings or 0,
total_hours=round(float(r.total_hours or 0), 2),
)
for r in results
]
return SpaceUsageReport(
items=items,
total_bookings=sum(item.total_bookings for item in items),
date_range={"start": start_date, "end": end_date},
)
@router.get("/admin/reports/top-users", response_model=TopUsersReport)
def get_top_users_report(
start_date: date | None = Query(None),
end_date: date | None = Query(None),
limit: int = Query(10, ge=1, le=100),
db: Annotated[Session, Depends(get_db)] = None,
current_admin: Annotated[User, Depends(get_current_admin)] = None,
) -> TopUsersReport:
"""Get top users by booking count."""
query = (
db.query(
Booking.user_id,
User.full_name.label("user_name"),
User.email.label("user_email"),
func.count(Booking.id).label("total_bookings"),
func.sum(case((Booking.status == "approved", 1), else_=0)).label(
"approved_bookings"
),
func.sum(calculate_hours_expr()).label("total_hours"),
)
.join(User, Booking.user_id == User.id)
.group_by(Booking.user_id, User.full_name, User.email)
)
# Apply date filters
if start_date:
query = query.filter(
Booking.start_datetime
>= datetime.combine(start_date, datetime.min.time())
)
if end_date:
query = query.filter(
Booking.start_datetime <= datetime.combine(end_date, datetime.max.time())
)
# Order by total bookings desc
query = query.order_by(func.count(Booking.id).desc()).limit(limit)
results = query.all()
items = [
TopUserItem(
user_id=r.user_id,
user_name=r.user_name,
user_email=r.user_email,
total_bookings=r.total_bookings,
approved_bookings=r.approved_bookings or 0,
total_hours=round(float(r.total_hours or 0), 2),
)
for r in results
]
return TopUsersReport(
items=items,
date_range={"start": start_date, "end": end_date},
)
@router.get("/admin/reports/approval-rate", response_model=ApprovalRateReport)
def get_approval_rate_report(
start_date: date | None = Query(None),
end_date: date | None = Query(None),
db: Annotated[Session, Depends(get_db)] = None,
current_admin: Annotated[User, Depends(get_current_admin)] = None,
) -> ApprovalRateReport:
"""Get approval/rejection rate report."""
query = db.query(
func.count(Booking.id).label("total"),
func.sum(case((Booking.status == "approved", 1), else_=0)).label("approved"),
func.sum(case((Booking.status == "rejected", 1), else_=0)).label("rejected"),
func.sum(case((Booking.status == "pending", 1), else_=0)).label("pending"),
func.sum(case((Booking.status == "canceled", 1), else_=0)).label("canceled"),
)
# Apply date filters
if start_date:
query = query.filter(
Booking.start_datetime
>= datetime.combine(start_date, datetime.min.time())
)
if end_date:
query = query.filter(
Booking.start_datetime <= datetime.combine(end_date, datetime.max.time())
)
result = query.first()
total = result.total or 0
approved = result.approved or 0
rejected = result.rejected or 0
pending = result.pending or 0
canceled = result.canceled or 0
# Calculate rates (exclude pending from denominator)
decided = approved + rejected
approval_rate = (approved / decided * 100) if decided > 0 else 0
rejection_rate = (rejected / decided * 100) if decided > 0 else 0
return ApprovalRateReport(
total_requests=total,
approved=approved,
rejected=rejected,
pending=pending,
canceled=canceled,
approval_rate=round(approval_rate, 2),
rejection_rate=round(rejection_rate, 2),
date_range={"start": start_date, "end": end_date},
)

131
backend/app/api/settings.py Normal file
View File

@@ -0,0 +1,131 @@
"""Settings management endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.deps import get_current_admin, get_db
from app.models.settings import Settings
from app.models.user import User
from app.schemas.settings import SettingsResponse, SettingsUpdate
from app.services.audit_service import log_action
router = APIRouter(prefix="/admin/settings", tags=["admin"])
@router.get("", response_model=SettingsResponse)
def get_settings(
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(get_current_admin)],
) -> Settings:
"""
Get global settings (admin only).
Returns the current global booking rules.
"""
settings = db.query(Settings).filter(Settings.id == 1).first()
if not settings:
# Create default settings if not exist
settings = Settings(
id=1,
min_duration_minutes=30,
max_duration_minutes=480,
working_hours_start=8,
working_hours_end=20,
max_bookings_per_day_per_user=3,
min_hours_before_cancel=2,
)
db.add(settings)
db.commit()
db.refresh(settings)
return settings
@router.put("", response_model=SettingsResponse)
def update_settings(
settings_data: SettingsUpdate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_admin)],
) -> Settings:
"""
Update global settings (admin only).
All booking rules are validated on the client side and applied
to all new booking requests.
"""
settings = db.query(Settings).filter(Settings.id == 1).first()
if not settings:
# Create if not exist
settings = Settings(id=1)
db.add(settings)
# Validate: min_duration <= max_duration
if settings_data.min_duration_minutes > settings_data.max_duration_minutes:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Minimum duration cannot be greater than maximum duration",
)
# Validate: working_hours_start < working_hours_end
if settings_data.working_hours_start >= settings_data.working_hours_end:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Working hours start must be before working hours end",
)
# Track which fields changed
changed_fields = {}
if settings.min_duration_minutes != settings_data.min_duration_minutes:
changed_fields["min_duration_minutes"] = {
"old": settings.min_duration_minutes,
"new": settings_data.min_duration_minutes
}
if settings.max_duration_minutes != settings_data.max_duration_minutes:
changed_fields["max_duration_minutes"] = {
"old": settings.max_duration_minutes,
"new": settings_data.max_duration_minutes
}
if settings.working_hours_start != settings_data.working_hours_start:
changed_fields["working_hours_start"] = {
"old": settings.working_hours_start,
"new": settings_data.working_hours_start
}
if settings.working_hours_end != settings_data.working_hours_end:
changed_fields["working_hours_end"] = {
"old": settings.working_hours_end,
"new": settings_data.working_hours_end
}
if settings.max_bookings_per_day_per_user != settings_data.max_bookings_per_day_per_user:
changed_fields["max_bookings_per_day_per_user"] = {
"old": settings.max_bookings_per_day_per_user,
"new": settings_data.max_bookings_per_day_per_user
}
if settings.min_hours_before_cancel != settings_data.min_hours_before_cancel:
changed_fields["min_hours_before_cancel"] = {
"old": settings.min_hours_before_cancel,
"new": settings_data.min_hours_before_cancel
}
# Update all fields
setattr(settings, "min_duration_minutes", settings_data.min_duration_minutes)
setattr(settings, "max_duration_minutes", settings_data.max_duration_minutes)
setattr(settings, "working_hours_start", settings_data.working_hours_start)
setattr(settings, "working_hours_end", settings_data.working_hours_end)
setattr(settings, "max_bookings_per_day_per_user", settings_data.max_bookings_per_day_per_user)
setattr(settings, "min_hours_before_cancel", settings_data.min_hours_before_cancel)
db.commit()
db.refresh(settings)
# Log the action
log_action(
db=db,
action="settings_updated",
user_id=current_admin.id,
target_type="settings",
target_id=1, # Settings ID is always 1 (singleton)
details={"changed_fields": changed_fields}
)
return settings

169
backend/app/api/spaces.py Normal file
View File

@@ -0,0 +1,169 @@
"""Space management endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.deps import get_current_admin, get_current_user, get_db
from app.models.space import Space
from app.models.user import User
from app.schemas.space import SpaceCreate, SpaceResponse, SpaceStatusUpdate, SpaceUpdate
from app.services.audit_service import log_action
router = APIRouter(prefix="/spaces", tags=["spaces"])
admin_router = APIRouter(prefix="/admin/spaces", tags=["admin"])
@router.get("", response_model=list[SpaceResponse])
def list_spaces(
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> list[Space]:
"""
Get list of spaces.
- Users see only active spaces
- Admins see all spaces (active + inactive)
"""
query = db.query(Space)
# Filter by active status for non-admin users
if current_user.role != "admin":
query = query.filter(Space.is_active == True) # noqa: E712
spaces = query.order_by(Space.name).all()
return spaces
@admin_router.post("", response_model=SpaceResponse, status_code=status.HTTP_201_CREATED)
def create_space(
space_data: SpaceCreate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_admin)],
) -> Space:
"""
Create a new space (admin only).
- name: required, non-empty
- type: "sala" or "birou"
- capacity: must be > 0
- description: optional
"""
# Check if space with same name exists
existing = db.query(Space).filter(Space.name == space_data.name).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Space with name '{space_data.name}' already exists",
)
space = Space(
name=space_data.name,
type=space_data.type,
capacity=space_data.capacity,
description=space_data.description,
is_active=True,
)
db.add(space)
db.commit()
db.refresh(space)
# Log the action
log_action(
db=db,
action="space_created",
user_id=current_admin.id,
target_type="space",
target_id=space.id,
details={"name": space.name, "type": space.type, "capacity": space.capacity}
)
return space
@admin_router.put("/{space_id}", response_model=SpaceResponse)
def update_space(
space_id: int,
space_data: SpaceUpdate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_admin)],
) -> Space:
"""
Update an existing space (admin only).
All fields are required (full update).
"""
space = db.query(Space).filter(Space.id == space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
# Check if new name conflicts with another space
if space_data.name != space.name:
existing = db.query(Space).filter(Space.name == space_data.name).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Space with name '{space_data.name}' already exists",
)
# Track what changed
updated_fields = []
if space.name != space_data.name:
updated_fields.append("name")
if space.type != space_data.type:
updated_fields.append("type")
if space.capacity != space_data.capacity:
updated_fields.append("capacity")
if space.description != space_data.description:
updated_fields.append("description")
setattr(space, "name", space_data.name)
setattr(space, "type", space_data.type)
setattr(space, "capacity", space_data.capacity)
setattr(space, "description", space_data.description)
db.commit()
db.refresh(space)
# Log the action
log_action(
db=db,
action="space_updated",
user_id=current_admin.id,
target_type="space",
target_id=space.id,
details={"updated_fields": updated_fields}
)
return space
@admin_router.patch("/{space_id}/status", response_model=SpaceResponse)
def update_space_status(
space_id: int,
status_data: SpaceStatusUpdate,
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(get_current_admin)],
) -> Space:
"""
Activate or deactivate a space (admin only).
Deactivated spaces will not appear in booking lists for users.
"""
space = db.query(Space).filter(Space.id == space_id).first()
if not space:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Space not found",
)
setattr(space, "is_active", status_data.is_active)
db.commit()
db.refresh(space)
return space

267
backend/app/api/users.py Normal file
View File

@@ -0,0 +1,267 @@
"""User endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.deps import get_current_admin, get_current_user, get_db
from app.core.security import get_password_hash
from app.models.user import User
from app.schemas.user import (
ResetPasswordRequest,
UserCreate,
UserResponse,
UserStatusUpdate,
UserUpdate,
)
from app.services.audit_service import log_action
from app.utils.timezone import get_available_timezones
router = APIRouter(prefix="/users", tags=["users"])
admin_router = APIRouter(prefix="/admin/users", tags=["admin"])
class TimezoneUpdate(BaseModel):
"""Schema for updating user timezone."""
timezone: str
@router.get("/me", response_model=UserResponse)
def get_current_user_info(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
"""Get current user information."""
return current_user
@router.get("/timezones", response_model=list[str])
def list_timezones() -> list[str]:
"""Get list of available timezones."""
return get_available_timezones()
@router.put("/me/timezone")
def update_timezone(
data: TimezoneUpdate,
db: Annotated[Session, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
):
"""Update user timezone preference."""
# Validate timezone
import pytz
try:
pytz.timezone(data.timezone)
except pytz.exceptions.UnknownTimeZoneError:
raise HTTPException(status_code=400, detail="Invalid timezone")
current_user.timezone = data.timezone # type: ignore[assignment]
db.commit()
return {"message": "Timezone updated", "timezone": data.timezone}
@admin_router.get("", response_model=list[UserResponse])
def list_users(
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(get_current_admin)],
role: str | None = None,
organization: str | None = None,
) -> list[User]:
"""
Get list of users (admin only).
Supports filtering by role and organization.
"""
query = db.query(User)
if role:
query = query.filter(User.role == role)
if organization:
query = query.filter(User.organization == organization)
users = query.order_by(User.full_name).all()
return users
@admin_router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
user_data: UserCreate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_admin)],
) -> User:
"""
Create a new user (admin only).
- email: must be unique
- password: will be hashed
- role: "admin" or "user"
- organization: optional
"""
# Check if user with same email exists
existing = db.query(User).filter(User.email == user_data.email).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"User with email '{user_data.email}' already exists",
)
# Validate role
if user_data.role not in ["admin", "user"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Role must be 'admin' or 'user'",
)
user = User(
email=user_data.email,
full_name=user_data.full_name,
hashed_password=get_password_hash(user_data.password),
role=user_data.role,
organization=user_data.organization,
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
# Log the action
log_action(
db=db,
action="user_created",
user_id=current_admin.id,
target_type="user",
target_id=user.id,
details={"email": user.email, "role": user.role}
)
return user
@admin_router.put("/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
user_data: UserUpdate,
db: Annotated[Session, Depends(get_db)],
current_admin: Annotated[User, Depends(get_current_admin)],
) -> User:
"""
Update an existing user (admin only).
Only fields provided in request will be updated (partial update).
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Check if new email conflicts with another user
if user_data.email and user_data.email != user.email:
existing = db.query(User).filter(User.email == user_data.email).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"User with email '{user_data.email}' already exists",
)
# Validate role
if user_data.role and user_data.role not in ["admin", "user"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Role must be 'admin' or 'user'",
)
# Track what changed
updated_fields = []
if user_data.email is not None and user_data.email != user.email:
updated_fields.append("email")
if user_data.full_name is not None and user_data.full_name != user.full_name:
updated_fields.append("full_name")
if user_data.role is not None and user_data.role != user.role:
updated_fields.append("role")
if user_data.organization is not None and user_data.organization != user.organization:
updated_fields.append("organization")
# Update only provided fields
if user_data.email is not None:
setattr(user, "email", user_data.email)
if user_data.full_name is not None:
setattr(user, "full_name", user_data.full_name)
if user_data.role is not None:
setattr(user, "role", user_data.role)
if user_data.organization is not None:
setattr(user, "organization", user_data.organization)
db.commit()
db.refresh(user)
# Log the action
log_action(
db=db,
action="user_updated",
user_id=current_admin.id,
target_type="user",
target_id=user.id,
details={"updated_fields": updated_fields}
)
return user
@admin_router.patch("/{user_id}/status", response_model=UserResponse)
def update_user_status(
user_id: int,
status_data: UserStatusUpdate,
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(get_current_admin)],
) -> User:
"""
Activate or deactivate a user (admin only).
Deactivated users cannot log in.
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
setattr(user, "is_active", status_data.is_active)
db.commit()
db.refresh(user)
return user
@admin_router.post("/{user_id}/reset-password", response_model=UserResponse)
def reset_user_password(
user_id: int,
reset_data: ResetPasswordRequest,
db: Annotated[Session, Depends(get_db)],
_: Annotated[User, Depends(get_current_admin)],
) -> User:
"""
Reset a user's password (admin only).
Password will be hashed before storing.
"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
setattr(user, "hashed_password", get_password_hash(reset_data.new_password))
db.commit()
db.refresh(user)
return user

View File

@@ -0,0 +1 @@
# Core module

View File

@@ -0,0 +1,48 @@
"""Application configuration."""
from typing import List
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False
)
# App
app_name: str = "Space Booking API"
debug: bool = False
# Database
database_url: str = "sqlite:///./space_booking.db"
# JWT
secret_key: str = "your-secret-key-change-in-production"
algorithm: str = "HS256"
access_token_expire_minutes: int = 1440 # 24 hours
# SMTP
smtp_host: str = "localhost"
smtp_port: int = 1025 # MailHog default
smtp_user: str = ""
smtp_password: str = ""
smtp_from_address: str = "noreply@space-booking.local"
smtp_enabled: bool = False # Disable by default for dev
# Frontend
frontend_url: str = "http://localhost:5173"
# Google Calendar OAuth
google_client_id: str = ""
google_client_secret: str = ""
google_redirect_uri: str = "http://localhost:8000/api/integrations/google/callback"
google_scopes: List[str] = [
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/calendar.events"
]
settings = Settings()

52
backend/app/core/deps.py Normal file
View File

@@ -0,0 +1,52 @@
"""Dependencies for FastAPI routes."""
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
security = HTTPBearer()
def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
db: Annotated[Session, Depends(get_db)],
) -> User:
"""Get current authenticated user from JWT token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token = credentials.credentials
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
user_id: str | None = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.id == int(user_id)).first()
if user is None or not user.is_active:
raise credentials_exception
return user
def get_current_admin(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
"""Verify current user is admin."""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
)
return current_user

View File

@@ -0,0 +1,36 @@
"""Security utilities for authentication and authorization."""
from datetime import datetime, timedelta
from typing import Any
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash."""
result: bool = pwd_context.verify(plain_password, hashed_password)
return result
def get_password_hash(password: str) -> str:
"""Generate password hash."""
hashed: str = pwd_context.hash(password)
return hashed
def create_access_token(subject: str | int, expires_delta: timedelta | None = None) -> str:
"""Create JWT access token."""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.access_token_expire_minutes
)
to_encode: dict[str, Any] = {"exp": expire, "sub": str(subject)}
encoded_jwt: str = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt

View File

@@ -0,0 +1 @@
# Database module

25
backend/app/db/session.py Normal file
View File

@@ -0,0 +1,25 @@
"""Database session management."""
from collections.abc import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker
from app.core.config import settings
engine = create_engine(
settings.database_url,
connect_args={"check_same_thread": False} if "sqlite" in settings.database_url else {},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db() -> Generator[Session, None, None]:
"""Get database session."""
db = SessionLocal()
try:
yield db
finally:
db.close()

64
backend/app/main.py Normal file
View File

@@ -0,0 +1,64 @@
"""FastAPI application entry point."""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.attachments import router as attachments_router
from app.api.audit_log import router as audit_log_router
from app.api.auth import router as auth_router
from app.api.booking_templates import router as booking_templates_router
from app.api.bookings import admin_router as bookings_admin_router
from app.api.bookings import bookings_router
from app.api.bookings import router as spaces_bookings_router
from app.api.google_calendar import router as google_calendar_router
from app.api.notifications import router as notifications_router
from app.api.reports import router as reports_router
from app.api.settings import router as settings_router
from app.api.spaces import admin_router as spaces_admin_router
from app.api.spaces import router as spaces_router
from app.api.users import admin_router as users_admin_router
from app.api.users import router as users_router
from app.core.config import settings
from app.db.session import Base, engine
# Create database tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title=settings.app_name)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173"], # Frontend dev server
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth_router, prefix="/api")
app.include_router(users_router, prefix="/api")
app.include_router(users_admin_router, prefix="/api")
app.include_router(spaces_router, prefix="/api")
app.include_router(spaces_admin_router, prefix="/api")
app.include_router(spaces_bookings_router, prefix="/api")
app.include_router(bookings_router, prefix="/api")
app.include_router(bookings_admin_router, prefix="/api")
app.include_router(booking_templates_router, prefix="/api")
app.include_router(settings_router, prefix="/api")
app.include_router(notifications_router, prefix="/api")
app.include_router(audit_log_router, prefix="/api", tags=["audit-log"])
app.include_router(attachments_router, prefix="/api", tags=["attachments"])
app.include_router(reports_router, prefix="/api", tags=["reports"])
app.include_router(google_calendar_router, prefix="/api", tags=["google-calendar"])
@app.get("/")
def root() -> dict[str, str]:
"""Root endpoint."""
return {"message": "Space Booking API"}
@app.get("/health")
def health() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "ok"}

View File

@@ -0,0 +1,11 @@
"""Models package."""
from app.models.attachment import Attachment
from app.models.audit_log import AuditLog
from app.models.booking import Booking
from app.models.booking_template import BookingTemplate
from app.models.notification import Notification
from app.models.settings import Settings
from app.models.space import Space
from app.models.user import User
__all__ = ["User", "Space", "Settings", "Booking", "BookingTemplate", "Notification", "AuditLog", "Attachment"]

View File

@@ -0,0 +1,27 @@
"""Attachment model."""
from datetime import datetime
from sqlalchemy import BigInteger, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.session import Base
class Attachment(Base):
"""Attachment model for booking files."""
__tablename__ = "attachments"
id = Column(Integer, primary_key=True, index=True)
booking_id = Column(Integer, ForeignKey("bookings.id"), nullable=False, index=True)
filename = Column(String(255), nullable=False) # Original filename
stored_filename = Column(String(255), nullable=False) # UUID-based filename
filepath = Column(String(500), nullable=False) # Full path
size = Column(BigInteger, nullable=False) # File size in bytes
content_type = Column(String(100), nullable=False)
uploaded_by = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
# Relationships
booking = relationship("Booking", back_populates="attachments")
uploader = relationship("User")

View File

@@ -0,0 +1,24 @@
"""AuditLog model for tracking admin actions."""
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, JSON, String
from sqlalchemy.orm import relationship
from app.db.session import Base
class AuditLog(Base):
"""Audit log for tracking admin actions."""
__tablename__ = "audit_logs"
id = Column(Integer, primary_key=True, index=True)
action = Column(String(100), nullable=False, index=True) # booking_approved, space_created, etc
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
target_type = Column(String(50), nullable=False, index=True) # booking, space, user, settings
target_id = Column(Integer, nullable=False)
details = Column(JSON, nullable=True) # Additional info (reasons, changed fields, etc)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
# Relationships
user = relationship("User", back_populates="audit_logs")

View File

@@ -0,0 +1,36 @@
"""Booking model."""
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.session import Base
class Booking(Base):
"""Booking model for space reservations."""
__tablename__ = "bookings"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
space_id = Column(Integer, ForeignKey("spaces.id"), nullable=False, index=True)
title = Column(String, nullable=False)
description = Column(String, nullable=True)
start_datetime = Column(DateTime, nullable=False, index=True)
end_datetime = Column(DateTime, nullable=False, index=True)
status = Column(
String, nullable=False, default="pending", index=True
) # pending/approved/rejected/canceled
rejection_reason = Column(String, nullable=True)
cancellation_reason = Column(String, nullable=True)
approved_by = Column(Integer, ForeignKey("users.id"), nullable=True)
google_calendar_event_id = Column(String, nullable=True) # Store Google Calendar event ID
created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
# Relationships
user = relationship("User", foreign_keys=[user_id], backref="bookings")
space = relationship("Space", foreign_keys=[space_id], backref="bookings")
approver = relationship("User", foreign_keys=[approved_by], backref="approved_bookings")
notifications = relationship("Notification", back_populates="booking")
attachments = relationship("Attachment", back_populates="booking", cascade="all, delete-orphan")

View File

@@ -0,0 +1,24 @@
"""Booking template model."""
from sqlalchemy import Column, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from app.db.session import Base
class BookingTemplate(Base):
"""Booking template model for reusable booking configurations."""
__tablename__ = "booking_templates"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
name = Column(String(200), nullable=False) # Template name
space_id = Column(Integer, ForeignKey("spaces.id"), nullable=True) # Optional default space
duration_minutes = Column(Integer, nullable=False) # Default duration
title = Column(String(200), nullable=False) # Default title
description = Column(Text, nullable=True) # Default description
usage_count = Column(Integer, default=0) # Track usage
# Relationships
user = relationship("User", back_populates="booking_templates")
space = relationship("Space")

View File

@@ -0,0 +1,26 @@
"""Google Calendar Token model."""
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Text
from sqlalchemy.orm import relationship
from app.db.session import Base
class GoogleCalendarToken(Base):
"""Google Calendar OAuth token storage."""
__tablename__ = "google_calendar_tokens"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), unique=True, nullable=False)
access_token = Column(Text, nullable=False)
refresh_token = Column(Text, nullable=False)
token_expiry = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
)
# Relationships
user = relationship("User", back_populates="google_calendar_token")

View File

@@ -0,0 +1,26 @@
"""Notification model."""
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from app.db.session import Base
class Notification(Base):
"""Notification model for in-app notifications."""
__tablename__ = "notifications"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
type = Column(String(50), nullable=False) # booking_created, booking_approved, etc
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)
booking_id = Column(Integer, ForeignKey("bookings.id"), nullable=True)
is_read = Column(Boolean, default=False, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
# Relationships
user = relationship("User", back_populates="notifications")
booking = relationship("Booking", back_populates="notifications")

View File

@@ -0,0 +1,18 @@
"""Settings model."""
from sqlalchemy import Column, Integer
from app.db.session import Base
class Settings(Base):
"""Global settings model (singleton - only one row with id=1)."""
__tablename__ = "settings"
id = Column(Integer, primary_key=True, default=1)
min_duration_minutes = Column(Integer, nullable=False, default=30)
max_duration_minutes = Column(Integer, nullable=False, default=480) # 8 hours
working_hours_start = Column(Integer, nullable=False, default=8) # 8 AM
working_hours_end = Column(Integer, nullable=False, default=20) # 8 PM
max_bookings_per_day_per_user = Column(Integer, nullable=False, default=3)
min_hours_before_cancel = Column(Integer, nullable=False, default=2)

View File

@@ -0,0 +1,17 @@
"""Space model."""
from sqlalchemy import Boolean, Column, Integer, String
from app.db.session import Base
class Space(Base):
"""Space model for offices and meeting rooms."""
__tablename__ = "spaces"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False, index=True)
type = Column(String, nullable=False) # "sala" or "birou"
capacity = Column(Integer, nullable=False)
description = Column(String, nullable=True)
is_active = Column(Boolean, default=True, nullable=False)

View File

@@ -0,0 +1,28 @@
"""User model."""
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.session import Base
class User(Base):
"""User model."""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
full_name = Column(String, nullable=False)
hashed_password = Column(String, nullable=False)
role = Column(String, nullable=False, default="user") # "admin" or "user"
organization = Column(String, nullable=True)
is_active = Column(Boolean, default=True, nullable=False)
timezone = Column(String(50), default="UTC", nullable=False) # IANA timezone
# Relationships
notifications = relationship("Notification", back_populates="user")
audit_logs = relationship("AuditLog", back_populates="user")
booking_templates = relationship("BookingTemplate", back_populates="user")
google_calendar_token = relationship(
"GoogleCalendarToken", back_populates="user", uselist=False
)

View File

@@ -0,0 +1 @@
# Schemas module

View File

@@ -0,0 +1,22 @@
"""Attachment schemas."""
from datetime import datetime
from pydantic import BaseModel
class AttachmentRead(BaseModel):
"""Attachment read schema."""
id: int
booking_id: int
filename: str
size: int
content_type: str
uploaded_by: int
uploader_name: str
created_at: datetime
class Config:
"""Pydantic config."""
from_attributes = True

View File

@@ -0,0 +1,21 @@
"""Audit log schemas."""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict
class AuditLogRead(BaseModel):
"""Schema for reading audit log entries."""
id: int
action: str
user_id: int
user_name: str # From relationship
user_email: str # From relationship
target_type: str
target_id: int
details: Optional[dict[str, Any]] = None
created_at: datetime
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,49 @@
"""Authentication schemas."""
import re
from pydantic import BaseModel, EmailStr, Field, field_validator
class LoginRequest(BaseModel):
"""Login request schema."""
email: EmailStr
password: str
class UserRegister(BaseModel):
"""User registration schema."""
email: EmailStr
password: str = Field(..., min_length=8)
confirm_password: str
full_name: str = Field(..., min_length=2, max_length=200)
organization: str = Field(..., min_length=2, max_length=200)
@field_validator("password")
@classmethod
def validate_password(cls, v: str) -> str:
"""Validate password strength."""
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
if not re.search(r"[A-Z]", v):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r"[a-z]", v):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r"[0-9]", v):
raise ValueError("Password must contain at least one digit")
return v
@field_validator("confirm_password")
@classmethod
def passwords_match(cls, v: str, info) -> str:
"""Ensure passwords match."""
if "password" in info.data and v != info.data["password"]:
raise ValueError("Passwords do not match")
return v
class EmailVerificationRequest(BaseModel):
"""Email verification request schema."""
token: str

View File

@@ -0,0 +1,244 @@
"""Booking schemas for request/response."""
from datetime import datetime, date
from typing import Optional
from pydantic import BaseModel, Field, field_validator
class BookingCalendarPublic(BaseModel):
"""Public booking data for regular users (calendar view)."""
id: int
start_datetime: datetime
end_datetime: datetime
status: str
title: str
model_config = {"from_attributes": True}
class BookingCalendarAdmin(BaseModel):
"""Full booking data for admins (calendar view)."""
id: int
user_id: int
space_id: int
start_datetime: datetime
end_datetime: datetime
status: str
title: str
description: str | None
rejection_reason: str | None
cancellation_reason: str | None
approved_by: int | None
created_at: datetime
model_config = {"from_attributes": True}
class BookingCreate(BaseModel):
"""Schema for creating a new booking."""
space_id: int
start_datetime: datetime
end_datetime: datetime
title: str = Field(..., min_length=1, max_length=200)
description: str | None = None
class BookingResponse(BaseModel):
"""Schema for booking response after creation."""
id: int
user_id: int
space_id: int
start_datetime: datetime
end_datetime: datetime
status: str
title: str
description: str | None
created_at: datetime
# Timezone-aware formatted strings (optional, set by endpoint)
start_datetime_tz: Optional[str] = None
end_datetime_tz: Optional[str] = None
model_config = {"from_attributes": True}
@classmethod
def from_booking_with_timezone(cls, booking, user_timezone: str = "UTC"):
"""Create response with timezone conversion."""
from app.utils.timezone import format_datetime_tz
return cls(
id=booking.id,
user_id=booking.user_id,
space_id=booking.space_id,
start_datetime=booking.start_datetime,
end_datetime=booking.end_datetime,
status=booking.status,
title=booking.title,
description=booking.description,
created_at=booking.created_at,
start_datetime_tz=format_datetime_tz(booking.start_datetime, user_timezone),
end_datetime_tz=format_datetime_tz(booking.end_datetime, user_timezone)
)
class SpaceInBooking(BaseModel):
"""Space details embedded in booking response."""
id: int
name: str
type: str
model_config = {"from_attributes": True}
class BookingWithSpace(BaseModel):
"""Booking with associated space details for user's booking list."""
id: int
space_id: int
space: SpaceInBooking
start_datetime: datetime
end_datetime: datetime
status: str
title: str
description: str | None
created_at: datetime
model_config = {"from_attributes": True}
class UserInBooking(BaseModel):
"""User details embedded in booking response."""
id: int
full_name: str
email: str
organization: str | None
model_config = {"from_attributes": True}
class BookingPendingDetail(BaseModel):
"""Detailed booking information for admin pending list."""
id: int
space_id: int
space: SpaceInBooking
user_id: int
user: UserInBooking
start_datetime: datetime
end_datetime: datetime
status: str
title: str
description: str | None
created_at: datetime
model_config = {"from_attributes": True}
class RejectRequest(BaseModel):
"""Schema for rejecting a booking."""
reason: str | None = None
class BookingAdminCreate(BaseModel):
"""Schema for admin to create a booking directly (bypass approval)."""
space_id: int
user_id: int | None = None
start_datetime: datetime
end_datetime: datetime
title: str = Field(..., min_length=1, max_length=200)
description: str | None = None
class AdminCancelRequest(BaseModel):
"""Schema for admin cancelling a booking."""
cancellation_reason: str | None = None
class BookingUpdate(BaseModel):
"""Schema for updating a booking."""
title: str | None = None
description: str | None = None
start_datetime: datetime | None = None
end_datetime: datetime | None = None
class ConflictingBooking(BaseModel):
"""Schema for a conflicting booking in availability check."""
id: int
user_name: str
title: str
status: str
start_datetime: datetime
end_datetime: datetime
model_config = {"from_attributes": True}
class AvailabilityCheck(BaseModel):
"""Schema for availability check response."""
available: bool
conflicts: list[ConflictingBooking]
message: str
class BookingRecurringCreate(BaseModel):
"""Schema for creating recurring weekly bookings."""
space_id: int
start_time: str # Time only (e.g., "10:00")
duration_minutes: int
title: str = Field(..., min_length=1, max_length=200)
description: str | None = None
recurrence_days: list[int] = Field(..., min_length=1, max_length=7) # 0=Monday, 6=Sunday
start_date: date # First occurrence date
end_date: date # Last occurrence date
skip_conflicts: bool = True # Skip conflicted dates or stop
@field_validator('recurrence_days')
@classmethod
def validate_days(cls, v: list[int]) -> list[int]:
"""Validate recurrence days are valid weekdays."""
if not all(0 <= day <= 6 for day in v):
raise ValueError('Days must be 0-6 (0=Monday, 6=Sunday)')
return sorted(list(set(v))) # Remove duplicates and sort
@field_validator('end_date')
@classmethod
def validate_date_range(cls, v: date, info) -> date:
"""Validate end date is after start date and within 1 year."""
if 'start_date' in info.data and v < info.data['start_date']:
raise ValueError('end_date must be after start_date')
# Max 1 year
if 'start_date' in info.data and (v - info.data['start_date']).days > 365:
raise ValueError('Recurrence period cannot exceed 1 year')
return v
class RecurringBookingResult(BaseModel):
"""Schema for recurring booking creation result."""
total_requested: int
total_created: int
total_skipped: int
created_bookings: list[BookingResponse]
skipped_dates: list[dict] # [{"date": "2024-01-01", "reason": "..."}, ...]
class BookingReschedule(BaseModel):
"""Schema for rescheduling a booking (drag-and-drop)."""
start_datetime: datetime
end_datetime: datetime

View File

@@ -0,0 +1,28 @@
"""Booking template schemas for request/response."""
from pydantic import BaseModel, Field
class BookingTemplateCreate(BaseModel):
"""Schema for creating a new booking template."""
name: str = Field(..., min_length=1, max_length=200)
space_id: int | None = None
duration_minutes: int = Field(..., gt=0)
title: str = Field(..., min_length=1, max_length=200)
description: str | None = None
class BookingTemplateRead(BaseModel):
"""Schema for reading booking template data."""
id: int
user_id: int
name: str
space_id: int | None
space_name: str | None # From relationship
duration_minutes: int
title: str
description: str | None
usage_count: int
model_config = {"from_attributes": True}

View File

@@ -0,0 +1,23 @@
"""Notification schemas."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class NotificationRead(BaseModel):
"""Schema for reading notifications."""
id: int
user_id: int
type: str
title: str
message: str
booking_id: Optional[int]
is_read: bool
created_at: datetime
class Config:
"""Pydantic config."""
from_attributes = True

View File

@@ -0,0 +1,64 @@
"""Report schemas."""
from datetime import date
from typing import Any
from pydantic import BaseModel
class DateRangeFilter(BaseModel):
"""Date range filter for reports."""
start_date: date | None = None
end_date: date | None = None
class SpaceUsageItem(BaseModel):
"""Space usage report item."""
space_id: int
space_name: str
total_bookings: int
approved_bookings: int
pending_bookings: int
rejected_bookings: int
canceled_bookings: int
total_hours: float
class SpaceUsageReport(BaseModel):
"""Space usage report."""
items: list[SpaceUsageItem]
total_bookings: int
date_range: dict[str, Any]
class TopUserItem(BaseModel):
"""Top user report item."""
user_id: int
user_name: str
user_email: str
total_bookings: int
approved_bookings: int
total_hours: float
class TopUsersReport(BaseModel):
"""Top users report."""
items: list[TopUserItem]
date_range: dict[str, Any]
class ApprovalRateReport(BaseModel):
"""Approval rate report."""
total_requests: int
approved: int
rejected: int
pending: int
canceled: int
approval_rate: float
rejection_rate: float
date_range: dict[str, Any]

View File

@@ -0,0 +1,30 @@
"""Settings schemas."""
from pydantic import BaseModel, Field
class SettingsBase(BaseModel):
"""Base settings schema."""
min_duration_minutes: int = Field(ge=15, le=480, default=30)
max_duration_minutes: int = Field(ge=30, le=1440, default=480)
working_hours_start: int = Field(ge=0, le=23, default=8)
working_hours_end: int = Field(ge=1, le=24, default=20)
max_bookings_per_day_per_user: int = Field(ge=1, le=20, default=3)
min_hours_before_cancel: int = Field(ge=0, le=72, default=2)
class SettingsUpdate(SettingsBase):
"""Settings update schema."""
pass
class SettingsResponse(SettingsBase):
"""Settings response schema."""
id: int
class Config:
"""Pydantic config."""
from_attributes = True

View File

@@ -0,0 +1,38 @@
"""Space schemas for request/response."""
from pydantic import BaseModel, Field
class SpaceBase(BaseModel):
"""Base space schema."""
name: str = Field(..., min_length=1)
type: str = Field(..., pattern="^(sala|birou)$")
capacity: int = Field(..., gt=0)
description: str | None = None
class SpaceCreate(SpaceBase):
"""Space creation schema."""
pass
class SpaceUpdate(SpaceBase):
"""Space update schema."""
pass
class SpaceStatusUpdate(BaseModel):
"""Space status update schema."""
is_active: bool
class SpaceResponse(SpaceBase):
"""Space response schema."""
id: int
is_active: bool
model_config = {"from_attributes": True}

View File

@@ -0,0 +1,62 @@
"""User schemas for request/response."""
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
"""Base user schema."""
email: EmailStr
full_name: str
organization: str | None = None
class UserCreate(UserBase):
"""User creation schema."""
password: str
role: str = "user"
class UserResponse(UserBase):
"""User response schema."""
id: int
role: str
is_active: bool
timezone: str = "UTC"
model_config = {"from_attributes": True}
class Token(BaseModel):
"""Token response schema."""
access_token: str
token_type: str
class UserUpdate(BaseModel):
"""User update schema."""
email: EmailStr | None = None
full_name: str | None = None
role: str | None = None
organization: str | None = None
class UserStatusUpdate(BaseModel):
"""User status update schema."""
is_active: bool
class ResetPasswordRequest(BaseModel):
"""Reset password request schema."""
new_password: str
class TokenData(BaseModel):
"""Token data schema."""
user_id: int | None = None

View File

@@ -0,0 +1,41 @@
"""Audit service for logging admin actions."""
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
from app.models.audit_log import AuditLog
def log_action(
db: Session,
action: str,
user_id: int,
target_type: str,
target_id: int,
details: Optional[Dict[str, Any]] = None
) -> AuditLog:
"""
Log an admin action to the audit log.
Args:
db: Database session
action: Action performed (e.g., 'booking_approved', 'space_created')
user_id: ID of the admin user who performed the action
target_type: Type of target entity ('booking', 'space', 'user', 'settings')
target_id: ID of the target entity
details: Optional dictionary with additional information
Returns:
Created AuditLog instance
"""
audit_log = AuditLog(
action=action,
user_id=user_id,
target_type=target_type,
target_id=target_id,
details=details or {}
)
db.add(audit_log)
db.commit()
db.refresh(audit_log)
return audit_log

View File

@@ -0,0 +1,112 @@
"""Booking validation service."""
from datetime import datetime
from sqlalchemy import and_
from sqlalchemy.orm import Session
from app.models.booking import Booking
from app.models.settings import Settings
def validate_booking_rules(
db: Session,
space_id: int,
user_id: int,
start_datetime: datetime,
end_datetime: datetime,
exclude_booking_id: int | None = None,
) -> list[str]:
"""
Validate booking against global settings rules.
Args:
db: Database session
space_id: ID of the space to book
user_id: ID of the user making the booking
start_datetime: Booking start time
end_datetime: Booking end time
exclude_booking_id: Optional booking ID to exclude from overlap check
(used when re-validating an existing booking)
Returns:
List of validation error messages (empty list = validation OK)
"""
errors = []
# Fetch settings (create default if not exists)
settings = db.query(Settings).filter(Settings.id == 1).first()
if not settings:
settings = Settings(
id=1,
min_duration_minutes=30,
max_duration_minutes=480,
working_hours_start=8,
working_hours_end=20,
max_bookings_per_day_per_user=3,
min_hours_before_cancel=2,
)
db.add(settings)
db.commit()
db.refresh(settings)
# a) Validate duration in range
duration_minutes = (end_datetime - start_datetime).total_seconds() / 60
if (
duration_minutes < settings.min_duration_minutes
or duration_minutes > settings.max_duration_minutes
):
errors.append(
f"Durata rezervării trebuie să fie între {settings.min_duration_minutes} "
f"și {settings.max_duration_minutes} minute"
)
# b) Validate working hours
if (
start_datetime.hour < settings.working_hours_start
or end_datetime.hour > settings.working_hours_end
):
errors.append(
f"Rezervările sunt permise doar între {settings.working_hours_start}:00 "
f"și {settings.working_hours_end}:00"
)
# c) Check for overlapping bookings
query = db.query(Booking).filter(
Booking.space_id == space_id,
Booking.status.in_(["approved", "pending"]),
and_(
Booking.start_datetime < end_datetime,
Booking.end_datetime > start_datetime,
),
)
# Exclude current booking if re-validating
if exclude_booking_id is not None:
query = query.filter(Booking.id != exclude_booking_id)
overlapping_bookings = query.first()
if overlapping_bookings:
errors.append("Spațiul este deja rezervat în acest interval")
# d) Check max bookings per day per user
booking_date = start_datetime.date()
start_of_day = datetime.combine(booking_date, datetime.min.time())
end_of_day = datetime.combine(booking_date, datetime.max.time())
user_bookings_count = (
db.query(Booking)
.filter(
Booking.user_id == user_id,
Booking.status.in_(["approved", "pending"]),
Booking.start_datetime >= start_of_day,
Booking.start_datetime <= end_of_day,
)
.count()
)
if user_bookings_count >= settings.max_bookings_per_day_per_user:
errors.append(
f"Ai atins limita de {settings.max_bookings_per_day_per_user} rezervări pe zi"
)
return errors

View File

@@ -0,0 +1,153 @@
"""Email service for sending booking notifications."""
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Optional
import aiosmtplib
from app.core.config import settings
from app.models.booking import Booking
logger = logging.getLogger(__name__)
async def send_email(to: str, subject: str, body: str) -> bool:
"""Send email via SMTP. Returns True if successful."""
if not settings.smtp_enabled:
# Development mode: just log the email
logger.info(f"[EMAIL] To: {to}")
logger.info(f"[EMAIL] Subject: {subject}")
logger.info(f"[EMAIL] Body:\n{body}")
print(f"\n--- EMAIL ---")
print(f"To: {to}")
print(f"Subject: {subject}")
print(f"Body:\n{body}")
print(f"--- END EMAIL ---\n")
return True
try:
message = MIMEMultipart()
message["From"] = settings.smtp_from_address
message["To"] = to
message["Subject"] = subject
message.attach(MIMEText(body, "plain", "utf-8"))
await aiosmtplib.send(
message,
hostname=settings.smtp_host,
port=settings.smtp_port,
username=settings.smtp_user if settings.smtp_user else None,
password=settings.smtp_password if settings.smtp_password else None,
)
return True
except Exception as e:
logger.error(f"Failed to send email to {to}: {e}")
return False
def generate_booking_email(
booking: Booking,
event_type: str,
user_email: str,
user_name: str,
extra_data: Optional[dict] = None,
) -> tuple[str, str]:
"""Generate email subject and body for booking events.
Returns: (subject, body)
"""
extra_data = extra_data or {}
space_name = booking.space.name if booking.space else "Unknown Space"
start_str = booking.start_datetime.strftime("%d.%m.%Y %H:%M")
end_str = booking.end_datetime.strftime("%H:%M")
if event_type == "created":
subject = "Cerere Nouă de Rezervare"
body = f"""Bună ziua,
O nouă cerere de rezervare necesită aprobarea dumneavoastră:
Utilizator: {user_name}
Spațiu: {space_name}
Data și ora: {start_str} - {end_str}
Titlu: {booking.title}
Descriere: {booking.description or 'N/A'}
Vă rugăm să accesați panoul de administrare pentru a aproba sau respinge această cerere.
Cu stimă,
Sistemul de Rezervări
"""
elif event_type == "approved":
subject = "Rezervare Aprobată"
body = f"""Bună ziua {user_name},
Rezervarea dumneavoastră a fost aprobată:
Spațiu: {space_name}
Data și ora: {start_str} - {end_str}
Titlu: {booking.title}
Vă așteptăm!
Cu stimă,
Sistemul de Rezervări
"""
elif event_type == "rejected":
reason = extra_data.get("rejection_reason", "Nu a fost specificat")
subject = "Rezervare Respinsă"
body = f"""Bună ziua {user_name},
Rezervarea dumneavoastră a fost respinsă:
Spațiu: {space_name}
Data și ora: {start_str} - {end_str}
Titlu: {booking.title}
Motiv: {reason}
Vă rugăm să contactați administratorul pentru detalii.
Cu stimă,
Sistemul de Rezervări
"""
elif event_type == "canceled":
reason = extra_data.get("cancellation_reason", "Nu a fost specificat")
subject = "Rezervare Anulată"
body = f"""Bună ziua {user_name},
Rezervarea dumneavoastră a fost anulată de către administrator:
Spațiu: {space_name}
Data și ora: {start_str} - {end_str}
Titlu: {booking.title}
Motiv: {reason}
Vă rugăm să contactați administratorul pentru detalii.
Cu stimă,
Sistemul de Rezervări
"""
else:
subject = "Notificare Rezervare"
body = f"Notificare despre rezervarea pentru {space_name} din {start_str}"
return subject, body
async def send_booking_notification(
booking: Booking,
event_type: str,
user_email: str,
user_name: str,
extra_data: Optional[dict] = None,
) -> bool:
"""Send booking notification email."""
subject, body = generate_booking_email(
booking, event_type, user_email, user_name, extra_data
)
return await send_email(user_email, subject, body)

View File

@@ -0,0 +1,173 @@
"""Google Calendar integration service."""
from datetime import datetime
from typing import Optional
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.booking import Booking
from app.models.google_calendar_token import GoogleCalendarToken
def get_google_calendar_service(db: Session, user_id: int):
"""
Get authenticated Google Calendar service for user.
Args:
db: Database session
user_id: User ID
Returns:
Google Calendar service object or None if not connected
"""
token_record = (
db.query(GoogleCalendarToken)
.filter(GoogleCalendarToken.user_id == user_id)
.first()
)
if not token_record:
return None
# Create credentials
creds = Credentials(
token=token_record.access_token,
refresh_token=token_record.refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=settings.google_client_id,
client_secret=settings.google_client_secret,
)
# Refresh if expired
if creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
# Update tokens in DB
token_record.access_token = creds.token # type: ignore[assignment]
if creds.expiry:
token_record.token_expiry = creds.expiry # type: ignore[assignment]
db.commit()
except Exception as e:
print(f"Failed to refresh Google token: {e}")
return None
# Build service
try:
service = build("calendar", "v3", credentials=creds)
return service
except Exception as e:
print(f"Failed to build Google Calendar service: {e}")
return None
def create_calendar_event(
db: Session, booking: Booking, user_id: int
) -> Optional[str]:
"""
Create Google Calendar event for booking.
Args:
db: Database session
booking: Booking object
user_id: User ID
Returns:
Google Calendar event ID or None if failed
"""
try:
service = get_google_calendar_service(db, user_id)
if not service:
return None
# Create event
event = {
"summary": f"{booking.space.name}: {booking.title}",
"description": booking.description or "",
"start": {
"dateTime": booking.start_datetime.isoformat(), # type: ignore[union-attr]
"timeZone": "UTC",
},
"end": {
"dateTime": booking.end_datetime.isoformat(), # type: ignore[union-attr]
"timeZone": "UTC",
},
}
created_event = service.events().insert(calendarId="primary", body=event).execute()
return created_event.get("id")
except Exception as e:
print(f"Failed to create Google Calendar event: {e}")
return None
def update_calendar_event(
db: Session, booking: Booking, user_id: int, event_id: str
) -> bool:
"""
Update Google Calendar event for booking.
Args:
db: Database session
booking: Booking object
user_id: User ID
event_id: Google Calendar event ID
Returns:
True if successful, False otherwise
"""
try:
service = get_google_calendar_service(db, user_id)
if not service:
return False
# Update event
event = {
"summary": f"{booking.space.name}: {booking.title}",
"description": booking.description or "",
"start": {
"dateTime": booking.start_datetime.isoformat(), # type: ignore[union-attr]
"timeZone": "UTC",
},
"end": {
"dateTime": booking.end_datetime.isoformat(), # type: ignore[union-attr]
"timeZone": "UTC",
},
}
service.events().update(
calendarId="primary", eventId=event_id, body=event
).execute()
return True
except Exception as e:
print(f"Failed to update Google Calendar event: {e}")
return False
def delete_calendar_event(db: Session, event_id: str, user_id: int) -> bool:
"""
Delete Google Calendar event.
Args:
db: Database session
event_id: Google Calendar event ID
user_id: User ID
Returns:
True if successful, False otherwise
"""
try:
service = get_google_calendar_service(db, user_id)
if not service:
return False
service.events().delete(calendarId="primary", eventId=event_id).execute()
return True
except Exception as e:
print(f"Failed to delete Google Calendar event: {e}")
return False

View File

@@ -0,0 +1,41 @@
"""Notification service."""
from typing import Optional
from sqlalchemy.orm import Session
from app.models.notification import Notification
def create_notification(
db: Session,
user_id: int,
type: str,
title: str,
message: str,
booking_id: Optional[int] = None,
) -> Notification:
"""
Create a new notification.
Args:
db: Database session
user_id: ID of the user to notify
type: Notification type (e.g., 'booking_created', 'booking_approved')
title: Notification title
message: Notification message
booking_id: Optional booking ID this notification relates to
Returns:
Created notification object
"""
notification = Notification(
user_id=user_id,
type=type,
title=title,
message=message,
booking_id=booking_id,
)
db.add(notification)
db.commit()
db.refresh(notification)
return notification

View File

@@ -0,0 +1 @@
"""Utilities module."""

View File

@@ -0,0 +1,79 @@
"""Timezone utilities for converting between UTC and user timezones."""
from datetime import datetime
from typing import Optional
import pytz
from dateutil import parser
def convert_to_utc(dt: datetime, from_timezone: str = "UTC") -> datetime:
"""Convert datetime from user timezone to UTC.
Args:
dt: Datetime to convert (naive or aware)
from_timezone: IANA timezone name (e.g., "Europe/Bucharest")
Returns:
Naive datetime in UTC
"""
if dt.tzinfo is None:
# Naive datetime, assume it's in user timezone
tz = pytz.timezone(from_timezone)
dt = tz.localize(dt)
# Convert to UTC
return dt.astimezone(pytz.UTC).replace(tzinfo=None)
def convert_from_utc(dt: datetime, to_timezone: str = "UTC") -> datetime:
"""Convert datetime from UTC to user timezone.
Args:
dt: Datetime in UTC (naive or aware)
to_timezone: IANA timezone name (e.g., "Europe/Bucharest")
Returns:
Timezone-aware datetime in target timezone
"""
if dt.tzinfo is None:
# Add UTC timezone if naive
dt = pytz.UTC.localize(dt)
# Convert to target timezone
tz = pytz.timezone(to_timezone)
return dt.astimezone(tz)
def format_datetime_tz(dt: datetime, timezone: str = "UTC", format_str: str = "%Y-%m-%d %H:%M %Z") -> str:
"""Format datetime with timezone abbreviation.
Args:
dt: Datetime in UTC (naive or aware)
timezone: IANA timezone name for display
format_str: Format string (default includes timezone abbreviation)
Returns:
Formatted datetime string with timezone
"""
dt_tz = convert_from_utc(dt, timezone)
return dt_tz.strftime(format_str)
def get_available_timezones():
"""Get list of common timezones for user selection."""
common_timezones = [
"UTC",
"Europe/Bucharest",
"Europe/London",
"Europe/Paris",
"Europe/Berlin",
"Europe/Amsterdam",
"America/New_York",
"America/Los_Angeles",
"America/Chicago",
"America/Denver",
"Asia/Tokyo",
"Asia/Shanghai",
"Asia/Dubai",
"Australia/Sydney",
]
return common_timezones