feat(backend): FastAPI + libSQL + auth register/login/me + tests (TDD)

- FastAPI app with lifespan, CORS, health endpoint
- SQLAlchemy 2.0 async with aiosqlite, Base/UUIDMixin/TenantMixin/TimestampMixin
- Tenant and User models with multi-tenant isolation
- Auth: register (creates tenant+user), login, /me endpoint
- JWT HS256 tokens, bcrypt password hashing
- Alembic async setup with initial migration
- 6 passing tests (register, login, wrong password, me, no token, health)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-13 17:26:31 +02:00
parent c3482bba8d
commit 907b7be0fd
24 changed files with 623 additions and 0 deletions

View File

View File

@@ -0,0 +1,58 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth import schemas, service
from app.db.session import get_db
from app.deps import get_current_user
router = APIRouter()
@router.post("/register", response_model=schemas.TokenResponse)
async def register(
data: schemas.RegisterRequest, db: AsyncSession = Depends(get_db)
):
user, tenant = await service.register(
db, data.email, data.password, data.tenant_name, data.telefon
)
return schemas.TokenResponse(
access_token=service.create_token(user.id, tenant.id, tenant.plan),
tenant_id=tenant.id,
plan=tenant.plan,
)
@router.post("/login", response_model=schemas.TokenResponse)
async def login(data: schemas.LoginRequest, db: AsyncSession = Depends(get_db)):
user, tenant = await service.authenticate(db, data.email, data.password)
if not user:
raise HTTPException(status_code=401, detail="Credentiale invalide")
return schemas.TokenResponse(
access_token=service.create_token(user.id, tenant.id, tenant.plan),
tenant_id=tenant.id,
plan=tenant.plan,
)
@router.get("/me", response_model=schemas.UserResponse)
async def me(
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
from sqlalchemy import select
from app.db.models.user import User
from app.db.models.tenant import Tenant
r = await db.execute(select(User).where(User.id == current_user["sub"]))
user = r.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
r = await db.execute(select(Tenant).where(Tenant.id == user.tenant_id))
tenant = r.scalar_one()
return schemas.UserResponse(
id=user.id,
email=user.email,
tenant_id=user.tenant_id,
plan=tenant.plan,
rol=user.rol,
)

View File

@@ -0,0 +1,28 @@
from pydantic import BaseModel, EmailStr
class RegisterRequest(BaseModel):
email: EmailStr
password: str
tenant_name: str
telefon: str
class LoginRequest(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
tenant_id: str
plan: str
class UserResponse(BaseModel):
id: str
email: str
tenant_id: str
plan: str
rol: str

View File

@@ -0,0 +1,62 @@
from datetime import UTC, datetime, timedelta
from jose import jwt
import bcrypt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.db.base import uuid7
from app.db.models.tenant import Tenant
from app.db.models.user import User
def hash_password(p: str) -> str:
return bcrypt.hashpw(p.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
def create_token(user_id: str, tenant_id: str, plan: str) -> str:
exp = datetime.now(UTC) + timedelta(days=settings.ACCESS_TOKEN_EXPIRE_DAYS)
return jwt.encode(
{"sub": user_id, "tenant_id": tenant_id, "plan": plan, "exp": exp},
settings.SECRET_KEY,
algorithm="HS256",
)
async def register(
db: AsyncSession, email: str, password: str, tenant_name: str, telefon: str
):
trial_exp = (datetime.now(UTC) + timedelta(days=settings.TRIAL_DAYS)).isoformat()
tenant = Tenant(
id=uuid7(),
nume=tenant_name,
telefon=telefon,
plan="trial",
trial_expires_at=trial_exp,
)
db.add(tenant)
user = User(
id=uuid7(),
tenant_id=tenant.id,
email=email,
password_hash=hash_password(password),
nume=email.split("@")[0],
rol="owner",
)
db.add(user)
await db.commit()
return user, tenant
async def authenticate(db: AsyncSession, email: str, password: str):
r = await db.execute(select(User).where(User.email == email))
user = r.scalar_one_or_none()
if not user or not verify_password(password, user.password_hash):
return None, None
r = await db.execute(select(Tenant).where(Tenant.id == user.tenant_id))
return user, r.scalar_one_or_none()