- FastAPI app with lifespan, CORS, health endpoint - SQLAlchemy 2.0 async with aiosqlite, Base/UUIDMixin/TenantMixin/TimestampMixin - Tenant and User models with multi-tenant isolation - Auth: register (creates tenant+user), login, /me endpoint - JWT HS256 tokens, bcrypt password hashing - Alembic async setup with initial migration - 6 passing tests (register, login, wrong password, me, no token, health) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
from datetime import UTC, datetime, timedelta
|
|
|
|
from jose import jwt
|
|
import bcrypt
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.db.base import uuid7
|
|
from app.db.models.tenant import Tenant
|
|
from app.db.models.user import User
|
|
|
|
|
|
def hash_password(p: str) -> str:
    """Return the bcrypt hash of the plaintext password *p* as text.

    The password is encoded to bytes, hashed with a freshly generated
    bcrypt salt, and the resulting hash bytes are decoded back to ``str``.
    """
    secret_bytes = p.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(secret_bytes, salt)
    return digest.decode()
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Both arguments are encoded to bytes and handed to ``bcrypt.checkpw``,
    whose result is returned unchanged.
    """
    candidate = plain.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)
|
|
|
|
|
|
def create_token(user_id: str, tenant_id: str, plan: str) -> str:
    """Build a signed JWT access token for a user.

    The token carries the user id as ``sub``, plus ``tenant_id``, ``plan``
    and an ``exp`` claim set to the current UTC time plus
    ``settings.ACCESS_TOKEN_EXPIRE_DAYS`` days. It is signed with
    ``settings.SECRET_KEY`` using HS256.
    """
    issued_at = datetime.now(UTC)
    expires_at = issued_at + timedelta(days=settings.ACCESS_TOKEN_EXPIRE_DAYS)
    claims = {
        "sub": user_id,
        "tenant_id": tenant_id,
        "plan": plan,
        "exp": expires_at,
    }
    return jwt.encode(claims, settings.SECRET_KEY, algorithm="HS256")
|
|
|
|
|
|
async def register(
    db: AsyncSession, email: str, password: str, tenant_name: str, telefon: str
):
    """Create a trial tenant and its owner user, then commit both.

    The tenant is created with plan ``"trial"`` and a ``trial_expires_at``
    value equal to the current UTC time plus ``settings.TRIAL_DAYS`` days,
    formatted as an ISO-8601 string. The user is attached to that tenant
    with role ``"owner"``, a hashed password, and a display name taken from
    the part of *email* before the first ``@``.

    Returns:
        A ``(user, tenant)`` tuple of the newly created objects.
    """
    trial_deadline = datetime.now(UTC) + timedelta(days=settings.TRIAL_DAYS)
    new_tenant = Tenant(
        id=uuid7(),
        nume=tenant_name,
        telefon=telefon,
        plan="trial",
        trial_expires_at=trial_deadline.isoformat(),
    )
    # The tenant is registered with the session before the user is built,
    # so it is already pending if user construction fails.
    db.add(new_tenant)

    local_part = email.partition("@")[0]
    owner = User(
        id=uuid7(),
        tenant_id=new_tenant.id,
        email=email,
        password_hash=hash_password(password),
        nume=local_part,
        rol="owner",
    )
    db.add(owner)

    await db.commit()
    return owner, new_tenant
|
|
|
|
|
|
async def authenticate(db: AsyncSession, email: str, password: str):
    """Look up a user by email and verify the supplied password.

    Returns:
        ``(user, tenant)`` when a user with this email exists and the
        password matches its stored hash; the tenant is fetched by the
        user's ``tenant_id`` and is ``None`` if no such row is found.
        ``(None, None)`` when the user is missing or the password is wrong.
    """
    user_rows = await db.execute(select(User).where(User.email == email))
    account = user_rows.scalar_one_or_none()

    # Guard clauses: the password is only checked when a user was found.
    if not account:
        return None, None
    if not verify_password(password, account.password_hash):
        return None, None

    tenant_rows = await db.execute(
        select(Tenant).where(Tenant.id == account.tenant_id)
    )
    owning_tenant = tenant_rows.scalar_one_or_none()
    return account, owning_tenant
|