Files
rar-autopass/app/web/auth_routes.py
Claude Agent b26dbb79e1 feat(5.12): modal editare + cont obligatoriu la import; design.md + PRD 5.13 revizuit (/autoplan)
5.12 (livrat): editare in modal a randurilor de preview, cont obligatoriu inainte de
import, formular editare extras (_form_editare, _editare_preview_modal), plus suita de
teste aferenta (preview edit/compact, mapare op, form editare, signup, admin panel).

Design + planificare:
- docs/design.md: sistem de design (tokeni, breakpoints, scara control, componente, a11y).
- docs/prd/prd-5.12-* si prd-5.13-* (5.13 cu raport /autoplan: CEO+Design+Eng, audit trail).

Curatare: sterse PNG-urile de test/mockup temporare din radacina.

Nota: implementarea CSS 5.13 (responsive compact + sistem butoane) NU e inca facuta —
planul revizuit cere refactorul testelor fragile din test_web_responsive.py INAINTE de CSS.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:52:20 +00:00

214 lines
7.7 KiB
Python

"""Rute autentificare web: /signup, /login, /logout."""
from __future__ import annotations
from pathlib import Path
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from .. import __version__
from ..accounts import create_account
from ..auth import create_api_key
from ..config import get_settings
from ..db import get_connection
from ..email import notify_signup
from ..users import count_admins, create_user, list_admin_emails, verify_password
from ..web.csrf import get_csrf_token, verify_csrf
from ..web.ratelimit import check_rate_limit
from ..web.session import clear_session, set_session
# Router that collects the /signup, /login and /logout routes declared in this module.
router = APIRouter()
# Jinja2 template loader rooted at the "templates" directory located next to this file.
_TMPL = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates"))
# User-facing text rendered when the rate limiter rejects a request (sent with HTTP 429).
_RATE_MSG = "Prea multe cereri. Incearca mai tarziu."
# Minimum password length (in characters) accepted by the signup form.
_PASSWORD_MIN = 10
def _ctx(request: Request, **extra) -> dict:
    """Build the template context shared by every page in this module.

    The context always carries ``rar_env`` (read from the application
    settings) and ``version`` (the package version). Any keyword passed in
    ``extra`` is merged on top and therefore overrides those two keys when
    the names collide.

    ``request`` is accepted for call-site symmetry but is not read here.
    """
    context = {"rar_env": get_settings().rar_env, "version": __version__}
    context.update(extra)
    return context
# --- Signup ---
@router.get("/signup", response_class=HTMLResponse)
async def signup_get(request: Request):
    """Render the empty signup form together with a CSRF token."""
    context = _ctx(request, csrf_token=get_csrf_token(request))
    return _TMPL.TemplateResponse(request, "signup.html", context)
# Generic text shown when signup fails for a reason we do not want to expose.
_SIGNUP_UNEXPECTED_MSG = "A aparut o eroare neasteptata. Incearca din nou mai tarziu."


def _signup_form_error(
    request: Request,
    message: str,
    status_code: int,
    *,
    name: str,
    cui: str,
    email: str,
):
    """Re-render the signup form with ``message`` as the error.

    The submitted ``name``, ``cui`` and ``email`` are echoed back so the user
    does not have to retype them; the password is deliberately not echoed.
    """
    return _TMPL.TemplateResponse(request, "signup.html", _ctx(
        request,
        csrf_token=get_csrf_token(request),
        error=message,
        name=name, cui=cui, email=email,
    ), status_code=status_code)


def _signup_value_error_message(exc_msg: str, cui_norm: str, support_email) -> str:
    """Translate a ``ValueError`` text raised during signup into a user-facing message.

    Order matters: the e-mail check must run before the CUI check because
    both underlying messages contain the substring 'deja folosit'.
    """
    if "email deja folosit" in exc_msg:
        # Duplicate e-mail -> e-mail specific message.
        return (
            "Acest email este deja folosit. "
            "Daca ai deja cont, autentifica-te."
        )
    if "deja folosit" in exc_msg or "IntegrityError" in exc_msg:
        # Duplicate CUI -> friendly message instead of the technical one.
        if support_email:
            return (
                f"Aceasta firma (CUI {cui_norm}) e deja inregistrata. "
                f"Cere accesul de la administratorul contului sau contacteaza suportul: "
                f"{support_email}"
            )
        return (
            f"Aceasta firma (CUI {cui_norm}) e deja inregistrata. "
            f"Cere accesul de la administratorul contului."
        )
    # Any other ValueError text is passed through unchanged.
    return exc_msg


def _rollback_quietly(conn) -> None:
    """Issue ROLLBACK without letting a rollback failure mask the original error.

    SQLite may already have rolled the transaction back on its own for some
    error classes, in which case an explicit ROLLBACK fails harmlessly.
    """
    try:
        conn.execute("ROLLBACK")
    except Exception as exc_rb:
        print(f"SIGNUP-ROLLBACK esuat: {type(exc_rb).__name__}", flush=True)


@router.post("/signup", response_class=HTMLResponse)
async def signup_post(
    request: Request,
    name: str = Form(default=""),
    cui: str = Form(default=""),
    email: str = Form(default=""),
    parola: str = Form(default=""),
    csrf_token: str = Form(default=""),
):
    """Handle the signup form: create account, user and API key, then log the user in.

    Steps, in order:

    1. CSRF verification and per-IP rate limiting (HTTP 429 when exceeded).
    2. Validation: password of at least ``_PASSWORD_MIN`` characters and a
       non-empty CUI (HTTP 422 with the form re-rendered otherwise).
    3. Inside one ``BEGIN IMMEDIATE`` transaction: create the account
       (``active=False``), the user (admin only when no admin exists yet)
       and an API key.
    4. On success: store the session, notify the admins by e-mail on a
       best-effort basis and render the page with the new API key.

    ``ValueError`` raised while creating records is mapped to a friendly
    message. Any other exception is logged server-side and answered with a
    generic message, so internal error text is never shown to the user.
    The HTTP status stays 422 in both cases for backward compatibility.
    """
    verify_csrf(request, csrf_token)
    settings = get_settings()
    # Fields echoed back into the form whenever it is re-rendered with an error.
    echo = {"name": name, "cui": cui, "email": email}

    ip = request.client.host if request.client else "unknown"
    if not check_rate_limit(ip, settings.signup_rate_max, settings.signup_rate_window_s):
        return _signup_form_error(request, _RATE_MSG, 429, **echo)

    if len(parola) < _PASSWORD_MIN:
        return _signup_form_error(
            request,
            f"Parola trebuie sa aiba cel putin {_PASSWORD_MIN} caractere.",
            422,
            **echo,
        )

    # CUI is mandatory at signup (US-001, PRD 5.12).
    cui_norm = cui.strip().upper() if cui else ""
    if not cui_norm:
        return _signup_form_error(request, "CUI-ul firmei este obligatoriu.", 422, **echo)

    # Admin bootstrap: count_admins is read INSIDE the BEGIN IMMEDIATE
    # transaction, so concurrent writers are serialized and a second signup
    # sees the admin created by the first one.
    conn = get_connection()
    try:
        conn.execute("BEGIN IMMEDIATE")
        try:
            is_first = count_admins(conn) == 0
            account_id = create_account(conn, name, cui=cui_norm, email=email, active=False)
            user_id = create_user(conn, account_id, email, parola, is_admin=is_first)
            api_key = create_api_key(conn, account_id)
            conn.execute("COMMIT")
        except ValueError as exc:
            _rollback_quietly(conn)
            message = _signup_value_error_message(str(exc), cui_norm, settings.support_email)
            return _signup_form_error(request, message, 422, **echo)
        except Exception as exc:
            _rollback_quietly(conn)
            # Keep the details in the server log only; never echo them to the client.
            print(f"SIGNUP exceptie neasteptata: {type(exc).__name__}: {exc}", flush=True)
            return _signup_form_error(request, _SIGNUP_UNEXPECTED_MSG, 422, **echo)
    finally:
        conn.close()

    set_session(request, account_id, user_id)
    print(f"SIGNUP cont={account_id} email={email}", flush=True)

    # Admin e-mail notification is best-effort and must never fail the signup.
    try:
        conn2 = get_connection()
        try:
            admins = list_admin_emails(conn2)
        finally:
            conn2.close()
        notify_signup(admins, account_id, email)
    except Exception as exc_notify:
        print(f"SIGNUP-NOTIFY exceptie neasteptata cont={account_id}: {type(exc_notify).__name__}", flush=True)

    return _TMPL.TemplateResponse(request, "signup.html", _ctx(
        request,
        csrf_token=get_csrf_token(request),
        api_key=api_key,
        account_id=account_id,
    ))
# --- Login / Logout ---
@router.get("/login", response_class=HTMLResponse)
async def login_get(request: Request):
    """Render the empty login form together with a CSRF token."""
    context = _ctx(request, csrf_token=get_csrf_token(request))
    return _TMPL.TemplateResponse(request, "login.html", context)
@router.post("/login", response_class=HTMLResponse)
async def login_post(
    request: Request,
    email: str = Form(default=""),
    parola: str = Form(default=""),
    csrf_token: str = Form(default=""),
):
    """Authenticate a user from the login form and open a session.

    Flow: CSRF verification, per-IP rate limiting under a ``login:`` key
    (HTTP 429 when exceeded), password verification (HTTP 401 on failure),
    then a 303 redirect to ``/`` once the session is stored.
    """
    verify_csrf(request, csrf_token)

    cfg = get_settings()
    client_ip = "unknown" if not request.client else request.client.host
    # NOTE(review): the login limiter uses signup_rate_window_s as its window;
    # confirm that sharing the signup window is intentional.
    within_limit = check_rate_limit(
        "login:" + client_ip, cfg.login_rate_max, cfg.signup_rate_window_s
    )
    if not within_limit:
        rate_ctx = _ctx(request, csrf_token=get_csrf_token(request), error=_RATE_MSG)
        return _TMPL.TemplateResponse(request, "login.html", rate_ctx, status_code=429)

    db = get_connection()
    try:
        account_id = verify_password(db, email, parola)
        if account_id is None:
            denied_ctx = _ctx(
                request,
                csrf_token=get_csrf_token(request),
                error="Email sau parola incorecte.",
            )
            return _TMPL.TemplateResponse(request, "login.html", denied_ctx, status_code=401)

        # Resolve the user id for the session; the e-mail match is case-insensitive.
        user_row = db.execute(
            "SELECT id FROM users WHERE email=? COLLATE NOCASE", (email.strip(),)
        ).fetchone()
        if user_row:
            user_id = int(user_row["id"])
        else:
            # NOTE(review): falls back to user id 0 when no row is found; verify callers expect this.
            user_id = 0
    finally:
        db.close()

    set_session(request, account_id, user_id)
    return RedirectResponse("/", status_code=303)
@router.post("/logout", response_class=HTMLResponse)
async def logout_post(request: Request, csrf_token: str = Form(default="")):
    """Verify the CSRF token, clear the session and redirect (303) to the login page."""
    verify_csrf(request, csrf_token)
    clear_session(request)
    return RedirectResponse(url="/login", status_code=303)