Files
roaauto/backend/app/auth/service.py
Marius Mutu 907b7be0fd feat(backend): FastAPI + libSQL + auth register/login/me + tests (TDD)
- FastAPI app with lifespan, CORS, health endpoint
- SQLAlchemy 2.0 async with aiosqlite, Base/UUIDMixin/TenantMixin/TimestampMixin
- Tenant and User models with multi-tenant isolation
- Auth: register (creates tenant+user), login, /me endpoint
- JWT HS256 tokens, bcrypt password hashing
- Alembic async setup with initial migration
- 6 passing tests (register, login, wrong password, me, no token, health)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 17:26:31 +02:00

63 lines
1.8 KiB
Python

from datetime import UTC, datetime, timedelta
from jose import jwt
import bcrypt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.db.base import uuid7
from app.db.models.tenant import Tenant
from app.db.models.user import User
# bcrypt only ever uses the first 72 bytes of a password. bcrypt >= 5.0 raises
# ValueError for longer input instead of truncating silently, so truncate here
# to keep hashing and verification working (and compatible with hashes that
# were created under the older, silently-truncating behaviour).
_BCRYPT_MAX_PASSWORD_BYTES = 72


def _password_bytes(password: str) -> bytes:
    """Encode *password* as UTF-8, cut to bcrypt's 72-byte input limit."""
    return password.encode()[:_BCRYPT_MAX_PASSWORD_BYTES]


def hash_password(p: str) -> str:
    """Return a bcrypt hash of the plaintext password *p*.

    A fresh salt is generated on every call, so hashing the same password
    twice yields different strings. Only the first 72 UTF-8 bytes of *p*
    take part in the hash (bcrypt's input limit).

    Args:
        p: Plaintext password.

    Returns:
        The bcrypt hash decoded to ``str``.
    """
    return bcrypt.hashpw(_password_bytes(p), bcrypt.gensalt()).decode()


def verify_password(plain: str, hashed: str) -> bool:
    """Check the plaintext password *plain* against a stored bcrypt hash.

    The candidate password is truncated to 72 UTF-8 bytes exactly as in
    ``hash_password`` so that both sides agree for long passwords.

    Args:
        plain: Candidate plaintext password.
        hashed: Stored bcrypt hash, as produced by ``hash_password``.

    Returns:
        ``True`` if the password matches the hash, ``False`` otherwise.
    """
    return bcrypt.checkpw(_password_bytes(plain), hashed.encode())
def create_token(user_id: str, tenant_id: str, plan: str) -> str:
    """Build a signed HS256 JWT for the given user.

    The payload carries the user id as ``sub``, plus ``tenant_id`` and
    ``plan``, and an ``exp`` claim set ``settings.ACCESS_TOKEN_EXPIRE_DAYS``
    days after the current UTC time. The token is signed with
    ``settings.SECRET_KEY``.

    Returns:
        The encoded token.
    """
    lifetime = timedelta(days=settings.ACCESS_TOKEN_EXPIRE_DAYS)
    claims = {
        "sub": user_id,
        "tenant_id": tenant_id,
        "plan": plan,
        "exp": datetime.now(UTC) + lifetime,
    }
    return jwt.encode(claims, settings.SECRET_KEY, algorithm="HS256")
async def register(
    db: AsyncSession, email: str, password: str, tenant_name: str, telefon: str
) -> tuple[User, Tenant]:
    """Create a new tenant together with its owner user and commit both.

    The tenant starts on the ``"trial"`` plan; its ``trial_expires_at`` is the
    ISO-8601 string of the current UTC time plus ``settings.TRIAL_DAYS`` days.
    The user gets the ``"owner"`` role and a display name taken from the part
    of *email* before the first ``@``.

    Args:
        db: Session used to persist both rows.
        email: Login email of the new user.
        password: Plaintext password; only its hash is stored.
        tenant_name: Name of the new tenant.
        telefon: Phone number stored on the tenant.

    Returns:
        ``(user, tenant)`` for the newly created rows.

    Raises:
        Exception: Whatever ``db.commit()`` raises is re-raised after the
            session has been rolled back (presumably an integrity error for a
            duplicate email -- depends on the schema, confirm against models).
    """
    # Hash first: if hashing fails, nothing has been attached to the session
    # yet, so no orphan pending tenant is left behind for the caller.
    password_hash = hash_password(password)
    trial_exp = (datetime.now(UTC) + timedelta(days=settings.TRIAL_DAYS)).isoformat()

    tenant = Tenant(
        id=uuid7(),
        nume=tenant_name,
        telefon=telefon,
        plan="trial",
        trial_expires_at=trial_exp,
    )
    user = User(
        id=uuid7(),
        tenant_id=tenant.id,
        email=email,
        password_hash=password_hash,
        nume=email.split("@")[0],
        rol="owner",
    )
    db.add(tenant)
    db.add(user)

    try:
        await db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back; restore it before handing the error to the caller.
        await db.rollback()
        raise
    return user, tenant
async def authenticate(db: AsyncSession, email: str, password: str):
    """Look up a user by email and check the password.

    Args:
        db: Session used for the lookups.
        email: Email to search for (compared as given, no normalisation here).
        password: Plaintext password to verify against the stored hash.

    Returns:
        ``(user, tenant)`` when the credentials match; ``tenant`` is ``None``
        if no tenant row has the user's ``tenant_id``. ``(None, None)`` when
        the email is unknown or the password does not match.
    """
    r = await db.execute(select(User).where(User.email == email))
    user = r.scalar_one_or_none()
    if not user:
        # Spend one bcrypt computation anyway so an unknown email takes about
        # as long as a wrong password; otherwise response time reveals which
        # emails are registered. Input is cut to bcrypt's 72-byte limit so
        # this dummy work can never raise for long passwords.
        bcrypt.hashpw(password.encode()[:72], bcrypt.gensalt())
        return None, None
    if not verify_password(password, user.password_hash):
        return None, None
    r = await db.execute(select(Tenant).where(Tenant.id == user.tenant_id))
    return user, r.scalar_one_or_none()