Files
roaauto/backend/app/users/service.py
Marius Mutu 8c0346e41f feat(backend): invite system + user management
- InviteToken model with unique token for each invite
- POST /users/invite - create invite by email with role (admin/mecanic)
- POST /auth/accept-invite - accept invite, set password, return JWT
- GET /users - list all users in tenant
- DELETE /users/{id} - deactivate user (cannot deactivate owner)
- Alembic migration for invites table
- 25 passing tests (auth + sync + orders + pdf + portal + invoices + users)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 17:37:06 +02:00

99 lines
2.5 KiB
Python

from datetime import UTC, datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.service import hash_password
from app.db.base import uuid7
from app.db.models.invite import InviteToken
from app.db.models.user import User
async def invite_user(
db: AsyncSession, tenant_id: str, email: str, rol: str
) -> InviteToken:
# Check if user already exists in this tenant
r = await db.execute(
select(User).where(User.email == email, User.tenant_id == tenant_id)
)
if r.scalar_one_or_none():
raise ValueError("User already exists in this tenant")
token = uuid7()
invite = InviteToken(
id=uuid7(),
tenant_id=tenant_id,
email=email,
rol=rol,
token=token,
)
db.add(invite)
await db.commit()
await db.refresh(invite)
return invite
async def accept_invite(
db: AsyncSession, token: str, password: str
) -> User:
r = await db.execute(
select(InviteToken).where(
InviteToken.token == token, InviteToken.used == None
)
)
invite = r.scalar_one_or_none()
if not invite:
raise ValueError("Invalid or already used invite token")
# Check if email already registered
r = await db.execute(select(User).where(User.email == invite.email))
if r.scalar_one_or_none():
raise ValueError("Email already registered")
user = User(
id=uuid7(),
tenant_id=invite.tenant_id,
email=invite.email,
password_hash=hash_password(password),
nume=invite.email.split("@")[0],
rol=invite.rol,
)
db.add(user)
invite.used = datetime.now(UTC).isoformat()
await db.commit()
await db.refresh(user)
return user
async def list_users(db: AsyncSession, tenant_id: str) -> list:
r = await db.execute(
select(User).where(User.tenant_id == tenant_id)
)
users = r.scalars().all()
return [
{
"id": u.id,
"email": u.email,
"rol": u.rol,
"activ": u.activ,
}
for u in users
]
async def deactivate_user(
db: AsyncSession, tenant_id: str, user_id: str
) -> bool:
r = await db.execute(
select(User).where(User.id == user_id, User.tenant_id == tenant_id)
)
user = r.scalar_one_or_none()
if not user:
raise ValueError("User not found")
if user.rol == "owner":
raise ValueError("Cannot deactivate owner")
user.activ = False
await db.commit()
return True