"""Web authentication routes: /signup, /login, /logout."""
from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path

from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates

from .. import __version__
from ..accounts import VALID_TIERS, create_account
from ..auth import create_api_key
from ..config import get_settings
from ..db import get_connection
from ..email import notify_signup
from ..users import count_admins, create_user, list_admin_emails, verify_password
from ..web.csrf import get_csrf_token, verify_csrf
from ..web.ratelimit import check_rate_limit
from ..web.session import clear_session, set_session

router = APIRouter()
_TMPL = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates"))

# User-facing message shown when a rate limit is exceeded (HTTP 429).
_RATE_MSG = "Prea multe cereri. Incearca mai tarziu."
# Minimum accepted password length at signup.
_PASSWORD_MIN = 10
# Generic user-facing message for unexpected signup failures; the details are
# logged server-side instead of being rendered into the page.
_UNEXPECTED_MSG = "A aparut o eroare neasteptata. Incearca din nou mai tarziu."


def _ctx(request: Request, **extra) -> dict:
    """Build the base template context and merge ``extra`` on top of it.

    The base context carries ``rar_env`` (from settings) and ``version``.
    ``request`` is accepted for call-site symmetry but is not read here.
    """
    settings = get_settings()
    return {"rar_env": settings.rar_env, "version": __version__, **extra}


def _signup_error(
    request: Request,
    message: str,
    status_code: int,
    *,
    name: str,
    cui: str,
    email: str,
    plan: str,
):
    """Re-render the signup form with ``message`` and the submitted values.

    The form fields are echoed back so the user does not have to retype them;
    the password is intentionally never echoed.
    """
    return _TMPL.TemplateResponse(request, "signup.html", _ctx(
        request,
        csrf_token=get_csrf_token(request),
        error=message,
        name=name,
        cui=cui,
        email=email,
        plan=plan,
    ), status_code=status_code)


def _signup_value_error_message(exc_msg: str, cui_norm: str, settings) -> str:
    """Map a ``ValueError`` message raised during signup to user-facing text.

    Order matters: the email check must come before the CUI check, because
    both messages contain 'deja folosit'. Per the original notes, create_user
    raises exactly "email deja folosit" and create_account raises
    "CUI X e deja folosit". Any other message is returned unchanged.
    """
    if "email deja folosit" in exc_msg:
        # Duplicate email -> email-specific message (T3, D#14-email).
        return (
            "Acest email este deja folosit. "
            "Daca ai deja cont, autentifica-te."
        )
    if "deja folosit" in exc_msg or "IntegrityError" in exc_msg:
        # Duplicate CUI -> friendly message, NOT the technical one that
        # mentions 'activate --account' (T3, D#14).
        if settings.support_email:
            return (
                f"Aceasta firma (CUI {cui_norm}) e deja inregistrata. "
                f"Cere accesul de la administratorul contului sau contacteaza suportul: "
                f"{settings.support_email}"
            )
        return (
            f"Aceasta firma (CUI {cui_norm}) e deja inregistrata. "
            f"Cere accesul de la administratorul contului."
        )
    return exc_msg


def _rollback_quietly(conn) -> None:
    """Issue ROLLBACK without letting a rollback failure mask the real error.

    ROLLBACK itself raises when no transaction is active (for example when
    BEGIN IMMEDIATE failed, or the database already rolled back on its own),
    which would otherwise replace the intended error response with an
    unhandled exception.
    """
    try:
        conn.execute("ROLLBACK")
    except Exception as exc_rb:
        print(f"SIGNUP-ROLLBACK esuat: {type(exc_rb).__name__}", flush=True)


# --- Signup ---

@router.get("/signup", response_class=HTMLResponse)
async def signup_get(request: Request):
    """Render the empty signup form with a CSRF token."""
    return _TMPL.TemplateResponse(request, "signup.html", _ctx(
        request, csrf_token=get_csrf_token(request)
    ))


@router.post("/signup", response_class=HTMLResponse)
async def signup_post(
    request: Request,
    name: str = Form(default=""),
    cui: str = Form(default=""),
    email: str = Form(default=""),
    parola: str = Form(default=""),
    plan: str = Form(default=""),
    consent: str = Form(default=""),
    csrf_token: str = Form(default=""),
):
    """Handle the signup form: validate, create account + user + API key.

    Validation order: CSRF, rate limit (429), password length, CUI presence,
    consent (all 422). On success the session is set, admins are notified on
    a best-effort basis, and the signup page is rendered with the new API key.
    On failure the form is re-rendered with an error and the submitted values.
    """
    verify_csrf(request, csrf_token)

    # REQUESTED plan (an intent, not an entitlement): keep only valid values,
    # anything else becomes 'free'. Per the original note, the real tier stays
    # 'free' at creation and the chosen plan is honoured after payment
    # (admin/webhook).
    requested_plan = plan.strip().lower() if plan else ""
    if requested_plan not in VALID_TIERS:
        requested_plan = "free"

    def fail(message: str, status_code: int = 422):
        # Shared error path: same template, same echoed form values.
        return _signup_error(
            request, message, status_code,
            name=name, cui=cui, email=email, plan=requested_plan,
        )

    settings = get_settings()
    ip = request.client.host if request.client else "unknown"
    if not check_rate_limit(ip, settings.signup_rate_max, settings.signup_rate_window_s):
        return fail(_RATE_MSG, 429)

    if len(parola) < _PASSWORD_MIN:
        return fail(f"Parola trebuie sa aiba cel putin {_PASSWORD_MIN} caractere.")

    # CUI is mandatory at signup (US-001, PRD 5.12).
    cui_norm = cui.strip().upper() if cui else ""
    if not cui_norm:
        return fail("CUI-ul firmei este obligatoriu.")

    # Terms + GDPR consent is mandatory (kept as proof). A ticked checkbox
    # submits a non-empty value.
    if not (consent and consent.strip()):
        return fail(
            "Trebuie sa accepti Termenii si prelucrarea datelor (GDPR) pentru a crea cont."
        )
    consent_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    # Admin bootstrap: count_admins is read INSIDE the BEGIN IMMEDIATE
    # transaction, so the RESERVED lock serialises writers and a second
    # concurrent signup sees count == 1.
    conn = get_connection()
    try:
        conn.execute("BEGIN IMMEDIATE")
        is_first = count_admins(conn) == 0
        account_id = create_account(
            conn, name, cui=cui_norm, email=email, active=False,
            requested_plan=requested_plan, consent_at=consent_at,
        )
        user_id = create_user(conn, account_id, email, parola, is_admin=is_first)
        api_key = create_api_key(conn, account_id)
        conn.execute("COMMIT")
    except ValueError as exc:
        _rollback_quietly(conn)
        return fail(_signup_value_error_message(str(exc), cui_norm, settings))
    except Exception as exc:
        _rollback_quietly(conn)
        # Do not render raw exception text into the page (it may expose
        # internal/SQL details); log it server-side and show a generic message.
        print(f"SIGNUP exceptie neasteptata: {type(exc).__name__}: {exc}", flush=True)
        return fail(_UNEXPECTED_MSG)
    finally:
        conn.close()

    set_session(request, account_id, user_id)
    print(f"SIGNUP cont={account_id} email={email}", flush=True)

    # Admin email notification (best-effort, must not block the signup).
    try:
        conn2 = get_connection()
        try:
            admins = list_admin_emails(conn2)
        finally:
            conn2.close()
        notify_signup(admins, account_id, email)
    except Exception as exc_notify:
        print(
            f"SIGNUP-NOTIFY exceptie neasteptata cont={account_id}: {type(exc_notify).__name__}",
            flush=True,
        )

    return _TMPL.TemplateResponse(request, "signup.html", _ctx(
        request,
        csrf_token=get_csrf_token(request),
        api_key=api_key,
        account_id=account_id,
    ))


# --- Login / Logout ---

@router.get("/login", response_class=HTMLResponse)
async def login_get(request: Request):
    """Render the login form with a CSRF token."""
    return _TMPL.TemplateResponse(request, "login.html", _ctx(
        request, csrf_token=get_csrf_token(request)
    ))


@router.post("/login", response_class=HTMLResponse)
async def login_post(
    request: Request,
    email: str = Form(default=""),
    parola: str = Form(default=""),
    csrf_token: str = Form(default=""),
):
    """Verify credentials, set the session and redirect to "/" (303).

    Returns the login form with 429 when rate-limited and with 401 when the
    email/password pair is rejected.
    """
    verify_csrf(request, csrf_token)

    settings = get_settings()
    ip = request.client.host if request.client else "unknown"
    # NOTE(review): the window comes from signup_rate_window_s while the max
    # comes from login_rate_max -- confirm this reuse is intentional.
    if not check_rate_limit(
        "login:" + ip, settings.login_rate_max, settings.signup_rate_window_s
    ):
        return _TMPL.TemplateResponse(request, "login.html", _ctx(
            request, csrf_token=get_csrf_token(request), error=_RATE_MSG,
        ), status_code=429)

    conn = get_connection()
    try:
        account_id = verify_password(conn, email, parola)
        if account_id is None:
            return _TMPL.TemplateResponse(request, "login.html", _ctx(
                request,
                csrf_token=get_csrf_token(request),
                error="Email sau parola incorecte.",
            ), status_code=401)
        # verify_password yields only the account id; look up the user id
        # separately (case-insensitive email match). Falls back to 0 when no
        # row is found.
        row = conn.execute(
            "SELECT id FROM users WHERE email=? COLLATE NOCASE", (email.strip(),)
        ).fetchone()
        user_id = int(row["id"]) if row else 0
    finally:
        conn.close()

    set_session(request, account_id, user_id)
    return RedirectResponse("/", status_code=303)


@router.post("/logout", response_class=HTMLResponse)
async def logout_post(
    request: Request,
    csrf_token: str = Form(default=""),
):
    """Clear the session (after CSRF verification) and redirect to /login."""
    verify_csrf(request, csrf_token)
    clear_session(request)
    return RedirectResponse("/login", status_code=303)