From b92055eb0135a5861d217985d98e1f6d490f25a2 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Thu, 18 Jun 2026 17:19:06 +0000 Subject: [PATCH] feat(web): self-service cheie/creds + admin web + email signup (PRD 3.3b) US-007: rute web proprii /cont/roteste-cheie + /cont/rar-creds scoped pe sesiune (C13), sectiune "Contul meu" cu cheie afisata o data. US-010: rol admin (users.is_admin) + require_admin->403 + CLI set-admin + bootstrap primul cont=admin (count_admins in BEGIN IMMEDIATE, anti-race). US-011: panou /admin (activare/dezactivare conturi, CSRF + PRG), link admin + logout pe dashboard. US-012: app/email.py notify_signup best-effort degradat fara SMTP + config smtp_*. Fix: migrare defensiva users.is_admin/email_verified in _migrate. VERIFY x2 context curat (PASS) + /code-review high. 393 teste pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/config.py | 9 + app/db.py | 11 + app/email.py | 59 +++++ app/main.py | 9 +- app/users.py | 55 ++++- app/web/admin_routes.py | 129 +++++++++++ app/web/auth_routes.py | 19 +- app/web/routes.py | 119 ++++++++++ app/web/session.py | 24 ++ app/web/templates/_cont.html | 76 +++++++ app/web/templates/admin.html | 105 +++++++++ app/web/templates/dashboard.html | 13 ++ docs/ROADMAP.md | 6 +- docs/prd/prd-3.3-self-onboarding-web.md | 42 +++- tests/test_admin_panel.py | 213 ++++++++++++++++++ tests/test_admin_role.py | 277 ++++++++++++++++++++++++ tests/test_dashboard_admin_link.py | 55 +++++ tests/test_migrate_users.py | 124 +++++++++++ tests/test_signup_notify.py | 181 ++++++++++++++++ tests/test_web_cont.py | 232 ++++++++++++++++++++ tools/account.py | 18 ++ 21 files changed, 1766 insertions(+), 10 deletions(-) create mode 100644 app/email.py create mode 100644 app/web/admin_routes.py create mode 100644 app/web/templates/_cont.html create mode 100644 app/web/templates/admin.html create mode 100644 tests/test_admin_panel.py create mode 100644 tests/test_admin_role.py create mode 100644 tests/test_dashboard_admin_link.py create mode 100644 tests/test_migrate_users.py create mode 100644 tests/test_signup_notify.py create mode 100644 tests/test_web_cont.py diff --git a/app/config.py b/app/config.py index 07a432e..32b615a 100644 --- a/app/config.py +++ b/app/config.py @@ -46,6 +46,15 @@ class Settings(BaseSettings): # False (dev): cookie fara Secure, functioneaza pe HTTP. session_https_only: bool = False + # --- Notificare email admin la signup (US-012, PRD 3.3b) --- + # Nesetat (smtp_host None) -> notificarea e DEGRADATA (doar log SIGNUP); + # follow-up cand exista SMTP real configurat in .env. + smtp_host: str | None = None + smtp_port: int = 587 + smtp_user: str | None = None + smtp_password: str | None = None + smtp_from: str | None = None + # --- Rate-limit signup + login (US-009, PRD 3.3 C5) --- # Max cereri POST /signup per IP in fereastra de timp (in-proces, fara dependinta noua). signup_rate_max: int = 5 diff --git a/app/db.py b/app/db.py index 4f3589c..08a76f3 100644 --- a/app/db.py +++ b/app/db.py @@ -68,6 +68,17 @@ def _migrate(conn: sqlite3.Connection) -> None: "CREATE UNIQUE INDEX IF NOT EXISTS ux_accounts_cui ON accounts(cui) WHERE cui IS NOT NULL" ) + # Coloane users (DB cu users creata inaintea acestor coloane) + user_tbl = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='users'" + ).fetchone() + if user_tbl: + user_cols = {r["name"] for r in conn.execute("PRAGMA table_info(users)").fetchall()} + if "is_admin" not in user_cols: + conn.execute("ALTER TABLE users ADD COLUMN is_admin INTEGER NOT NULL DEFAULT 0") + if "email_verified" not in user_cols: + conn.execute("ALTER TABLE users ADD COLUMN email_verified INTEGER NOT NULL DEFAULT 0") + # Index batch_id pe submissions (poate lipsi pe DB veche) existing_idx = {r["name"] for r in conn.execute( "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='submissions'" diff --git a/app/email.py b/app/email.py new file mode 100644 index 0000000..821596a --- /dev/null +++ b/app/email.py @@ -0,0 +1,59 @@ +"""Helper notificare email admin la signup (US-012, PRD 3.3b). + +Livrare DEGRADATA: daca smtp_host nu e configurat, functia e no-op (log doar). +Orice eroare SMTP e prinsa si logata — signup-ul NU e blocat niciodata. +""" + +from __future__ import annotations + +import smtplib +import textwrap +from email.message import EmailMessage + +from .config import get_settings + + +def notify_signup(admin_emails: list[str], account_id: int, email: str) -> None: + """Notifica adminii despre un cont nou in asteptare (best-effort). + + Daca smtp_host e None SAU admin_emails e gol -> log si return (degradat). + Daca SMTP ridica exceptie -> log eroare si return (NU se propaga). + Timeout mic (5s) pe conexiunea SMTP. + """ + settings = get_settings() + + if not settings.smtp_host or not admin_emails: + print( + f"SIGNUP-NOTIFY degradat (fara SMTP) cont={account_id} " + f"email={email} admins={len(admin_emails)}", + flush=True, + ) + return + + try: + msg = EmailMessage() + expeditor = settings.smtp_from or settings.smtp_user or "autopass@localhost" + msg["From"] = expeditor + msg["To"] = ", ".join(admin_emails) + msg["Subject"] = f"AutoPass: cont nou {account_id} in asteptare" + msg.set_content(textwrap.dedent(f"""\ + Cont nou inregistrat si in asteptare de activare. + + ID cont: {account_id} + Email: {email} + + Actioneaza din panoul admin /admin sau din CLI: + python3 -m tools.account activate --account {account_id} + """)) + + with smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=5) as smtp: + if settings.smtp_user and settings.smtp_password: + smtp.starttls() + smtp.login(settings.smtp_user, settings.smtp_password) + smtp.send_message(msg) + + except Exception as exc: + print( + f"SIGNUP-NOTIFY esuat cont={account_id}: {type(exc).__name__}", + flush=True, + ) diff --git a/app/main.py b/app/main.py index 335f580..f43cdce 100644 --- a/app/main.py +++ b/app/main.py @@ -28,8 +28,9 @@ from .db import get_connection, init_db, queue_depth, read_heartbeat from .security import install_log_redaction from .web.routes import router as web_router from .web.auth_routes import router as auth_router +from .web.admin_routes import router as admin_router from .web.csrf import CsrfError -from .web.session import LoginRequired +from .web.session import AdminRequired, LoginRequired @asynccontextmanager @@ -57,6 +58,11 @@ async def login_required_handler(request: Request, exc: LoginRequired) -> Redire return RedirectResponse("/login", status_code=303) +@app.exception_handler(AdminRequired) +async def admin_required_handler(request: Request, exc: AdminRequired) -> JSONResponse: + return JSONResponse(status_code=403, content={"detail": "acces interzis (necesita admin)"}) + + @app.exception_handler(CsrfError) async def csrf_error_handler(request: Request, exc: CsrfError) -> JSONResponse: return JSONResponse(status_code=403, content={"detail": "CSRF invalid"}) @@ -86,6 +92,7 @@ app.include_router(api_v1_router) app.include_router(import_v1_router) app.include_router(web_router) app.include_router(auth_router) +app.include_router(admin_router) @app.get("/healthz") diff --git a/app/users.py b/app/users.py index cc95900..467bf36 100644 --- a/app/users.py +++ b/app/users.py @@ -46,12 +46,21 @@ def _scrypt_hash(password: str, salt: bytes, n: int = _N, r: int = _R, p: int = ) -def create_user(conn: sqlite3.Connection, account_id: int, email: str, password: str) -> int: +def create_user( + conn: sqlite3.Connection, + account_id: int, + email: str, + password: str, + is_admin: bool = False, +) -> int: """Creeaza un user nou si intoarce id-ul. Valideaza ca: contul exista, parola intre 10 si 128 caractere, emailul nu e duplicat. Stocheaza DOAR hash scrypt + salt (hex), niciodata parola in clar. Email duplicat (case-insensitive, via UNIQUE COLLATE NOCASE) -> ValueError. + + is_admin: daca True, userul e marcat ca admin (is_admin=1). Apelantul decide + logica de bootstrap (count_admins==0 -> primul cont devine admin). """ email = email.strip() @@ -69,9 +78,9 @@ def create_user(conn: sqlite3.Connection, account_id: int, email: str, password: try: cur = conn.execute( - "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params) " - "VALUES (?, ?, ?, ?, ?)", - (account_id, email, pw_hash.hex(), salt.hex(), SCRYPT_PARAMS), + "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params, is_admin) " + "VALUES (?, ?, ?, ?, ?, ?)", + (account_id, email, pw_hash.hex(), salt.hex(), SCRYPT_PARAMS, 1 if is_admin else 0), ) except sqlite3.IntegrityError: raise ValueError("email deja folosit") @@ -79,6 +88,44 @@ def create_user(conn: sqlite3.Connection, account_id: int, email: str, password: return int(cur.lastrowid or 0) +def count_admins(conn: sqlite3.Connection) -> int: + """Numara userii cu is_admin=1 din intreaga baza.""" + row = conn.execute("SELECT COUNT(*) AS n FROM users WHERE is_admin=1").fetchone() + return int(row["n"]) if row else 0 + + +def set_admin(conn: sqlite3.Connection, account_id: int, is_admin: bool = True) -> None: + """Seteaza/sterge rolul admin pe toti userii contului dat. + + Ridica ValueError daca contul nu exista. + Daca contul exista dar nu are useri, e no-op silentios (confom spec US-010). + """ + acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone() + if not acct: + raise ValueError(f"cont inexistent: {account_id}") + conn.execute( + "UPDATE users SET is_admin=? WHERE account_id=?", + (1 if is_admin else 0, account_id), + ) + + +def is_account_admin(conn: sqlite3.Connection, account_id: int) -> bool: + """Returneaza True daca cel putin un user al contului are is_admin=1.""" + row = conn.execute( + "SELECT 1 FROM users WHERE account_id=? AND is_admin=1 LIMIT 1", + (account_id,), + ).fetchone() + return row is not None + + +def list_admin_emails(conn: sqlite3.Connection) -> list[str]: + """Returneaza emailurile tuturor userilor cu is_admin=1 (folosit de US-012).""" + rows = conn.execute( + "SELECT email FROM users WHERE is_admin=1" + ).fetchall() + return [row["email"] for row in rows] + + def verify_password(conn: sqlite3.Connection, email: str, password: str) -> int | None: """Verifica parola pentru email. Intoarce account_id la potrivire, None altfel. diff --git a/app/web/admin_routes.py b/app/web/admin_routes.py new file mode 100644 index 0000000..02cf0d1 --- /dev/null +++ b/app/web/admin_routes.py @@ -0,0 +1,129 @@ +"""Panou admin web /admin. US-011 PRD 3.3b. + +Rute: + GET /admin — listeaza conturi in asteptare + active (require_admin) + POST /admin/activate — activeaza un cont (require_admin + CSRF, PRG) + POST /admin/deactivate — dezactiveaza un cont, nu permite id=1 (require_admin + CSRF, PRG) +""" + +from __future__ import annotations + +from pathlib import Path + +from fastapi import APIRouter, Form, Request +from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi.templating import Jinja2Templates + +from .. import __version__ +from ..accounts import list_accounts, set_active +from ..config import get_settings +from ..db import get_connection +from ..web.csrf import get_csrf_token, verify_csrf +from ..web.session import require_admin + +router = APIRouter() +_TMPL = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates")) + + +def _ctx(request: Request, **extra) -> dict: + settings = get_settings() + return {"rar_env": settings.rar_env, "version": __version__, **extra} + + +def _emails_by_account(conn) -> dict[int, str | None]: + """Intoarce primul email per account_id, intr-un singur query (fara N+1).""" + rows = conn.execute( + "SELECT account_id, email FROM users ORDER BY id" + ).fetchall() + result: dict[int, str | None] = {} + for row in rows: + acc_id = int(row["account_id"]) + if acc_id not in result: + result[acc_id] = row["email"] + return result + + +def _render_admin(request: Request, conn, *, error: str | None = None, status_code: int = 200): + """Randeaza pagina admin.html cu lista de conturi si optional un mesaj de eroare.""" + accounts = list_accounts(conn) + emails = _emails_by_account(conn) + for acct in accounts: + acct["email"] = emails.get(acct["id"]) + pending = [a for a in accounts if not a["active"] and a["id"] != 1] + active = [a for a in accounts if a["active"] and a["id"] != 1] + default = next((a for a in accounts if a["id"] == 1), None) + return _TMPL.TemplateResponse(request, "admin.html", _ctx( + request, + csrf_token=get_csrf_token(request), + pending=pending, + active=active, + default_account=default, + error=error, + ), status_code=status_code) + + +@router.get("/admin", response_class=HTMLResponse) +async def admin_get(request: Request): + """Panou admin: conturi in asteptare + active.""" + require_admin(request) + + conn = get_connection() + try: + return _render_admin(request, conn) + finally: + conn.close() + + +@router.post("/admin/activate", response_class=HTMLResponse) +async def admin_activate( + request: Request, + account_id: int = Form(...), + csrf_token: str = Form(default=""), +): + """Activeaza un cont. PRG: redirect 303 la /admin dupa succes.""" + require_admin(request) + verify_csrf(request, csrf_token) + + conn = get_connection() + try: + try: + set_active(conn, account_id, True) + except ValueError as exc: + return _render_admin(request, conn, error=str(exc), status_code=422) + finally: + conn.close() + + return RedirectResponse("/admin", status_code=303) + + +@router.post("/admin/deactivate", response_class=HTMLResponse) +async def admin_deactivate( + request: Request, + account_id: int = Form(...), + csrf_token: str = Form(default=""), +): + """Dezactiveaza un cont. Nu permite dezactivarea contului default id=1. PRG: redirect 303.""" + require_admin(request) + verify_csrf(request, csrf_token) + + if account_id == 1: + conn = get_connection() + try: + return _render_admin( + request, conn, + error="Contul default (id=1) nu poate fi dezactivat.", + status_code=422, + ) + finally: + conn.close() + + conn = get_connection() + try: + try: + set_active(conn, account_id, False) + except ValueError as exc: + return _render_admin(request, conn, error=str(exc), status_code=422) + finally: + conn.close() + + return RedirectResponse("/admin", status_code=303) diff --git a/app/web/auth_routes.py b/app/web/auth_routes.py index 1963039..45a9236 100644 --- a/app/web/auth_routes.py +++ b/app/web/auth_routes.py @@ -13,7 +13,8 @@ from ..accounts import create_account from ..auth import create_api_key from ..config import get_settings from ..db import get_connection -from ..users import create_user, verify_password +from ..email import notify_signup +from ..users import count_admins, create_user, list_admin_emails, verify_password from ..web.csrf import get_csrf_token, verify_csrf from ..web.ratelimit import check_rate_limit from ..web.session import clear_session, set_session @@ -68,12 +69,15 @@ async def signup_post( name=name, cui=cui, email=email, ), status_code=422) + # Bootstrap admin: count_admins se citeste INAUNTRUL tranzactiei BEGIN IMMEDIATE, + # astfel lock-ul RESERVED serializeaza scriitorii si al doilea signup vede count==1. conn = get_connection() try: conn.execute("BEGIN IMMEDIATE") try: + is_first = count_admins(conn) == 0 account_id = create_account(conn, name, cui.strip() or None, active=False) - user_id = create_user(conn, account_id, email, parola) + user_id = create_user(conn, account_id, email, parola, is_admin=is_first) api_key = create_api_key(conn, account_id) conn.execute("COMMIT") except Exception as exc: @@ -90,6 +94,17 @@ async def signup_post( set_session(request, account_id, user_id) print(f"SIGNUP cont={account_id} email={email}", flush=True) + # Notificare email admin (best-effort, nu blocheaza signup-ul) + try: + conn2 = get_connection() + try: + admins = list_admin_emails(conn2) + finally: + conn2.close() + notify_signup(admins, account_id, email) + except Exception as exc_notify: + print(f"SIGNUP-NOTIFY exceptie neasteptata cont={account_id}: {type(exc_notify).__name__}", flush=True) + return _TMPL.TemplateResponse(request, "signup.html", _ctx( request, csrf_token=get_csrf_token(request), diff --git a/app/web/routes.py b/app/web/routes.py index 2ab8037..dd04c2e 100644 --- a/app/web/routes.py +++ b/app/web/routes.py @@ -22,6 +22,7 @@ from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from .. import __version__ +from ..auth import rotate_api_key from ..web.csrf import get_csrf_token, verify_csrf from ..web.session import require_login from ..api.v1.import_router import ( @@ -37,6 +38,7 @@ from ..crypto import decrypt_creds, encrypt_creds from ..db import get_connection, read_heartbeat from ..idempotency import build_key, canonicalize_row from ..import_parse import FileTooLarge, HeaderError, MultipleSheets, parse_date_value, parse_file +from ..users import is_account_admin from ..mapping import ( DEFAULT_ACCOUNT_ID, account_or_default, @@ -131,6 +133,7 @@ def dashboard(request: Request) -> HTMLResponse: "last_login": hb["last_rar_login_ok"] if hb else None, "rar_state": _rar_state(hb, worker_alive), "account_active": _account_active(conn, account_id), + "is_admin": is_account_admin(conn, account_id), "csrf_token": get_csrf_token(request), } return templates.TemplateResponse("dashboard.html", ctx) @@ -907,3 +910,119 @@ async def web_confirma_import( finally: conn.close() + + +# =========================================================================== # +# US-007 — Sectiune "Contul meu": rotire cheie API + creds RAR din UI # +# Rute web proprii scoped pe sesiune (C13: nu reutilizeaza /v1/conturi/rar-creds +# care cere cheie API; sesiunea web e suficienta ca identitate). # +# =========================================================================== # + +def _render_cont( + request: Request, + *, + api_key: str | None = None, + are_creds: bool = False, + creds_mesaj: str | None = None, + creds_eroare: str | None = None, + rot_eroare: str | None = None, +) -> HTMLResponse: + """Randeaza cardul 'Contul meu'. Parola niciodata in value=.""" + return templates.TemplateResponse( + "_cont.html", + _ctx( + request, + api_key=api_key, + are_creds=are_creds, + creds_mesaj=creds_mesaj, + creds_eroare=creds_eroare, + rot_eroare=rot_eroare, + ), + ) + + +@router.get("/_fragments/cont", response_class=HTMLResponse) +def fragment_cont(request: Request) -> HTMLResponse: + """Fragment HTMX card 'Contul meu': stare cheie + creds RAR (fara a le expune).""" + account_id = require_login(request) + acct = account_or_default(account_id) + conn = get_connection() + try: + row = conn.execute( + "SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,) + ).fetchone() + are_creds = bool(row and row["rar_creds_enc"]) + return _render_cont(request, are_creds=are_creds) + finally: + conn.close() + + +@router.post("/cont/roteste-cheie", response_class=HTMLResponse) +def cont_roteste_cheie( + request: Request, + csrf_token: str | None = Form(None), +) -> HTMLResponse: + """Revoca toate cheile active si emite una noua. Afisata O SINGURA DATA.""" + account_id = require_login(request) + verify_csrf(request, csrf_token) + acct = account_or_default(account_id) + conn = get_connection() + try: + new_key = rotate_api_key(conn, acct) + row = conn.execute( + "SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,) + ).fetchone() + are_creds = bool(row and row["rar_creds_enc"]) + return _render_cont(request, api_key=new_key, are_creds=are_creds) + finally: + conn.close() + + +@router.post("/cont/rar-creds", response_class=HTMLResponse) +def cont_rar_creds( + request: Request, + rar_email: str = Form(""), + rar_parola: str = Form(""), + csrf_token: str | None = Form(None), +) -> HTMLResponse: + """Seteaza creds RAR per cont din sesiune (ruta web proprie, C13). + + Camp parola NICIODATA re-pus in value= la re-randare. + Validare minima: email si parola negoale. + """ + account_id = require_login(request) + verify_csrf(request, csrf_token) + acct = account_or_default(account_id) + + email = rar_email.strip() + parola = rar_parola.strip() + + if not email or not parola: + conn = get_connection() + try: + row = conn.execute( + "SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,) + ).fetchone() + are_creds = bool(row and row["rar_creds_enc"]) + finally: + conn.close() + return _render_cont( + request, + are_creds=are_creds, + creds_eroare="Email si parola sunt obligatorii.", + ) + + enc = encrypt_creds({"email": email, "password": parola}) + conn = get_connection() + try: + conn.execute( + "UPDATE accounts SET rar_creds_enc=? WHERE id=?", + (enc, acct), + ) + return _render_cont( + request, + are_creds=True, + creds_mesaj="Credentialele RAR au fost salvate cu succes.", + ) + finally: + conn.close() diff --git a/app/web/session.py b/app/web/session.py index 1ee9b61..0417370 100644 --- a/app/web/session.py +++ b/app/web/session.py @@ -20,6 +20,10 @@ class LoginRequired(Exception): """Ridica pentru a redirectiona la /login (prinsa de exception_handler in main.py).""" +class AdminRequired(Exception): + """Ridica cand contul sesiunii nu are rol admin (prinsa de exception_handler in main.py).""" + + def current_account(request: Request) -> int | None: """account_id din sesiune sau None daca nu e logat.""" val = request.session.get("account_id") @@ -63,6 +67,26 @@ def require_login(request: Request) -> int: return aid +def require_admin(request: Request) -> int: + """Verifica ca userul logat are rol admin pe contul sesiunii. + + Intai cheama require_login (nelogat -> LoginRequired -> /login redirect). + Daca e logat dar nu e admin -> ridica AdminRequired. + Intoarce account_id la succes. + """ + account_id = require_login(request) + from ..db import get_connection + from ..users import is_account_admin + conn = get_connection() + try: + admin = is_account_admin(conn, account_id) + finally: + conn.close() + if not admin: + raise AdminRequired() + return account_id + + def set_session(request: Request, account_id: int, user_id: int) -> None: """Seteaza sesiunea dupa login. Curata mai intai (C3 anti-fixare sesiune).""" request.session.clear() diff --git a/app/web/templates/_cont.html b/app/web/templates/_cont.html new file mode 100644 index 0000000..e581ff9 --- /dev/null +++ b/app/web/templates/_cont.html @@ -0,0 +1,76 @@ +
+

Contul meu

+ + +
+

Cheia mea API

+ + {% if api_key %} +
Cheia a fost rotita. Salveaz-o acum — nu o vei mai putea vedea.
+ +
+ {{ api_key }} +
+ + + +

+ Atentie: la urmatoarea vizita aceasta cheie dispare. Daca o pierzi, roteste din nou. +

+ {% endif %} + + {% if rot_eroare %} + + {% endif %} + +
+ + + Cheia veche se revoca imediat. +
+
+ + +
+

Credentiale RAR (portal AUTOPASS)

+ + {% if are_creds %} +
Credentiale RAR configurate.
+ {% endif %} + + {% if creds_mesaj %} +
{{ creds_mesaj }}
+ {% endif %} + + {% if creds_eroare %} + + {% endif %} + +
+ +

+
+ +

+

+
+ +

+ + Parola stocata criptat, niciodata in clar. +
+
+
diff --git a/app/web/templates/admin.html b/app/web/templates/admin.html new file mode 100644 index 0000000..4bf6678 --- /dev/null +++ b/app/web/templates/admin.html @@ -0,0 +1,105 @@ +{% extends "base.html" %} +{% block title %}Panou admin — Gateway RAR AUTOPASS{% endblock %} +{% block content %} +
+

Panou admin

+ Inapoi la dashboard +
+ +{% if error %} + +{% endif %} + + +
+

Conturi in asteptare ({{ pending|length }})

+ {% if pending %} +
+ + + + + + + + + + + + + {% for acct in pending %} + + + + + + + + + {% endfor %} + +
IDCompanieCUIEmailInregistratActiune
{{ acct.id }}{{ acct.name }}{{ acct.cui or "—" }}{{ acct.email or "—" }}{{ acct.created_at or "—" }} +
+ + + +
+
+
+ {% else %} +

Niciun cont in asteptare.

+ {% endif %} +
+ + +
+

Conturi active ({{ active|length }})

+ {% if active %} +
+ + + + + + + + + + + + + {% for acct in active %} + + + + + + + + + {% endfor %} + +
IDCompanieCUIEmailInregistratActiune
{{ acct.id }}{{ acct.name }}{{ acct.cui or "—" }}{{ acct.email or "—" }}{{ acct.created_at or "—" }} +
+ + + +
+
+
+ {% else %} +

Niciun cont activ (in afara de contul dev).

+ {% endif %} +
+ + +{% if default_account %} +
+

+ Cont dev implicit (id=1): {{ default_account.name }} + — activ={{ default_account.active }} — fara buton de activare/dezactivare (cont de sistem). +

+
+{% endif %} + +{% endblock %} diff --git a/app/web/templates/dashboard.html b/app/web/templates/dashboard.html index 0489ef8..b1e319a 100644 --- a/app/web/templates/dashboard.html +++ b/app/web/templates/dashboard.html @@ -1,6 +1,15 @@ {% extends "base.html" %} {% block content %} + +
+ {% if is_admin %}Panou admin{% endif %} +
+ + +
+
+ {% include '_upload.html' %} @@ -32,6 +41,10 @@
se incarca mapari…
+
+
se incarca cont…
+
+

Coada submissions

diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 2e3b0c2..5961efd 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -48,7 +48,9 @@ Reguli de contract (detalii in `docs/api-rar-contract.md`): `FINALIZATA` e termi > PRD-uri (`docs/prd/prd-X.Y-*.md`), linkate in coloana Detalii. La fiecare livrabila terminata: > schimba statusul + data + linkul PRD si actualizeaza "Ultima actualizare". -**Ultima actualizare**: 2026-06-17 — 3.3a LIVRAT (self-onboarding web core: `app/users.py` parole scrypt cu eticheta de parametri onorata la verify; `SessionMiddleware` same_site=strict + `app/web/session.py` guard `require_login`→`LoginRequired`; CSRF per-sesiune enforce in prod inclusiv pe login/signup + rate-limit signup & login in-proces; signup `active=0` tranzactie atomica + cheie-o-data + log `SIGNUP`; login/logout; dashboard & import multi-tenant scoped pe sesiune cu regula NULL→cont 1 — toate rutele web care ating date sensibile sub `require_login` + scope; gate worker `claim_one` `LEFT JOIN ... COALESCE(active,1)=1`. 2 runde VERIFY context curat — runda 1 a prins un leak cross-account pe `/_fragments/mapari`, reparat; runda 2 PASS. `/code-review` high a prins 3 findings, reparate. 361 teste pass). Urmeaza 3.3b (self-service cheie/creds + admin web + email). Deferat din 3.1 (P3): `rename`/`set-cui`, `--if-not-exists`. +**Ultima actualizare**: 2026-06-18 — 3.3b LIVRAT (self-service cheie/creds + admin web + email). US-007 rute web proprii pentru rotire cheie + setare creds RAR scoped pe sesiune (C13, nu endpointul API). US-010 rol admin (`users.is_admin`) + `require_admin`→`AdminRequired`→403 + CLI `tools/account.py set-admin` + bootstrap automat (primul cont care se inregistreaza = admin, citit in `BEGIN IMMEDIATE` anti-race). US-011 panou `/admin` (conturi in asteptare/active, activare/dezactivare cu CSRF + PRG, contul dev id=1 protejat) + link "Panou admin" pe dashboard doar pentru admini + buton logout. US-012 `app/email.py notify_signup` best-effort DEGRADAT fara SMTP (no-op + log, prinde orice exceptie, nu blocheaza signup) + config `smtp_*`. Fix migrare defensiva `users.is_admin`/`email_verified` in `_migrate` (gap prins de VERIFY r1, ca C1 pe `accounts.active`). 2 runde VERIFY context curat (r2 PASS, sweep securitate toate rutele noi sub require_login/require_admin + CSRF, scoped sesiune). `/code-review` high: TOCTOU bootstrap mutat in tranzactie + `_render_admin` extras (anti-duplicare + N+1). 393 teste pass. Urmeaza Etapa 4 (4.1 mapare AI/MCP). Deferat din 3.1 (P3): `rename`/`set-cui`, `--if-not-exists`. SMTP real = follow-up pe US-012. + +> 3.3a LIVRAT (self-onboarding web core: `app/users.py` parole scrypt cu eticheta de parametri onorata la verify; `SessionMiddleware` same_site=strict + `app/web/session.py` guard `require_login`→`LoginRequired`; CSRF per-sesiune enforce in prod inclusiv pe login/signup + rate-limit signup & login in-proces; signup `active=0` tranzactie atomica + cheie-o-data + log `SIGNUP`; login/logout; dashboard & import multi-tenant scoped pe sesiune cu regula NULL→cont 1 — toate rutele web care ating date sensibile sub `require_login` + scope; gate worker `claim_one` `LEFT JOIN ... COALESCE(active,1)=1`. 2 runde VERIFY context curat — runda 1 a prins un leak cross-account pe `/_fragments/mapari`, reparat; runda 2 PASS. `/code-review` high a prins 3 findings, reparate. 361 teste pass). Urmeaza 3.3b (self-service cheie/creds + admin web + email). Deferat din 3.1 (P3): `rename`/`set-cui`, `--if-not-exists`. ### Etapa 1 — Canal API ROAAUTO (Treapta 1) @@ -75,7 +77,7 @@ Reguli de contract (detalii in `docs/api-rar-contract.md`): `FINALIZATA` e termi | 3.1 | Creare cont nou (CLI dedicat) | DONE | 2026-06-17 | CLI `tools/account.py` (create/list[--pending]/activate/deactivate, `--with-key` atomic) + `accounts.active` + index unic CUI + `app/accounts.py`. 20 teste noi. PRD: [prd-3.1](prd/prd-3.1-creare-cont.md) | | 3.2 | Filtrare pe cont a GET-urilor de listare | DONE | 2026-06-17 | scope cheie pe `/v1/prezentari(/{id})`, `/v1/mapari(/pending)`, `/v1/audit/export` (NULL→cont 1); nomenclator global; 404 cross-account identic (B3) + allowlist campuri detaliu (B4) + helper `account_scope_clause` (B2) + index (B5). 14 teste noi, 313 pass. PRD: [prd-3.2](prd/prd-3.2-filtrare-cont-get.md) | | 3.3a | Self-onboarding web (core) | DONE | 2026-06-17 | `users` (scrypt) + sesiune (`SessionMiddleware`, same_site=strict) + CSRF (enforce prod, inclusiv login/signup) + rate-limit signup/login + signup/login/logout + dashboard & import scoped pe sesiune (NULL→1, anti-leak C6) + gate worker `active=0` (`COALESCE`). 2 runde VERIFY (leak `/_fragments/mapari` prins+reparat) + code-review (csrf erori, scrypt_params, login rate-limit). 361 teste. PRD: [prd-3.3](prd/prd-3.3-self-onboarding-web.md) | -| 3.3b | Self-service cheie/creds + admin web + email | TODO (PRD aprobat) | | US-007 (rotire cheie + creds RAR pe ruta web), US-010 rol admin (primul cont=admin), US-011 panou `/admin` activare, US-012 email signup (degradat: log+`/admin`+`list --pending`, SMTP follow-up). PRD: [prd-3.3](prd/prd-3.3-self-onboarding-web.md) | +| 3.3b | Self-service cheie/creds + admin web + email | DONE | 2026-06-18 | US-007 (rute web proprii `/cont/roteste-cheie`+`/cont/rar-creds` scoped sesiune, C13), US-010 (rol admin `is_admin` + `require_admin`→403 + CLI `set-admin` + bootstrap primul cont=admin), US-011 (`/admin` activare/dezactivare cu CSRF+PRG, link doar pt admini + logout), US-012 (`app/email.py` notify best-effort degradat fara SMTP + log `SIGNUP`). Fix migrare defensiva `users.is_admin`/`email_verified`. 2 runde VERIFY context curat (r1 a prins migrarea lipsa, reparat; r2 PASS) + `/code-review` high (TOCTOU bootstrap admin mutat in tranzactie + extras `_render_admin` anti-duplicare/N+1). 393 teste. PRD: [prd-3.3](prd/prd-3.3-self-onboarding-web.md) | ### Etapa 4 — Viitor (Treapta 3) diff --git a/docs/prd/prd-3.3-self-onboarding-web.md b/docs/prd/prd-3.3-self-onboarding-web.md index 986fc54..56147db 100644 --- a/docs/prd/prd-3.3-self-onboarding-web.md +++ b/docs/prd/prd-3.3-self-onboarding-web.md @@ -1,6 +1,6 @@ # PRD 3.3 — Self-onboarding web (login email+parola → emite cheie) -**Stare**: verify-pass (3.3a) — 3.3b deschis +**Stare**: inchis (3.3a + 3.3b livrate) > **Decizii la poarta EXECUTE (2026-06-17, confirmate de utilizator):** > - **Livrabila sparta in doua faze** (scope 12 stories prea mare pentru un singur EXECUTE): @@ -479,3 +479,43 @@ izolare pe 2 conturi (`tests/test_mapari_scope.py`). Suita finala: **361 passed, 0 fail.** Findings low/by-design neactionate (documentate): dev-fallback cont 1 cand `web_auth_required=False` (C12, intentionat — atentie ops la deploy prod), 500 rar la DB-locked in signup, `request.client is None` → bucket rate-limit 'unknown' partajat. + +## Progres executie 3.3b (lead) + +> Sub-livrabila 3.3b (self-service cheie/creds + admin web + email). Decizii confirmate la poarta: +> primul cont care se inregistreaza devine admin (bootstrap automat); US-012 livrare DEGRADATA fara +> SMTP (helper `app/email.py` best-effort no-op + log `SIGNUP` deja existent din 3.3a). + +Valuri (fisiere disjuncte intre stories paralele): +- **Val 1:** US-010 (`users.py`/`session.py`/`tools/account.py`/`main.py` handler) ‖ US-007 (`routes.py`/`_cont.html`/`dashboard.html`) +- **Val 2:** US-011 (`admin_routes.py` nou/`admin.html`/`main.py` register) ‖ US-012 (`email.py` nou/`config.py`/`auth_routes.py`) + +- [x] **US-010** — rol admin (`is_admin`) + helper-e (`count_admins`/`set_admin`/`is_account_admin`/`list_admin_emails`) + `require_admin`→`AdminRequired`→403 + CLI `set-admin`. 13 teste. Bootstrap (primul cont=admin) cablat in signup de US-012 (evita conflict pe auth_routes.py). +- [x] **US-007** — sectiune "Contul meu" (`/_fragments/cont`): rotire cheie (afisata o data) + creds RAR pe ruta web proprie scoped pe sesiune (`POST /cont/roteste-cheie`, `POST /cont/rar-creds`, C13, NU endpointul API). 5 teste. +- [x] **US-011** — panou `/admin` (`admin_routes.py`): conturi in asteptare/active + activare/dezactivare (require_admin + CSRF + PRG); contul dev id=1 fara butoane. Link "Panou admin" pe dashboard doar pentru admini + buton logout. 5 + 2 teste. +- [x] **US-012** — `app/email.py` `notify_signup` best-effort (no-op fara `smtp_host`, prinde orice exceptie SMTP, timeout 5s) + config `smtp_*` + cablaj signup: bootstrap admin (primul cont = admin via `count_admins==0`) + notificare degradata dupa `set_session`. 5 teste. +- [x] **Fix migrare (din VERIFY r1):** `_migrate` adauga defensiv `users.is_admin`/`email_verified` pe DB cu tabela `users` fara ele (idempotent, guard pe existenta tabelei). 2 teste. + +## Raport VERIFY (3.3b) + +> Doua runde de verificare independenta (subagent context curat, §5.6). + +**Runda 1 — FAIL (1 criteriu):** suita 391 pass, toate criteriile US-007/010/011/012 confirmate + sweep +securitate complet (toate rutele noi `/cont/*`, `/_fragments/cont`, `/admin*` sub `require_login`/`require_admin` ++ `verify_csrf` pe POST; `/cont/*` scoped strict pe sesiune, nu accepta `account_id` din form; `/admin` nu +expune hash/chei/creds in clar). DAR `_migrate` nu adauga defensiv `users.is_admin`/`email_verified` → +o tabela `users` fara ele ar ceda cu `OperationalError` (acelasi tip de gap ca C1 pe `accounts.active`). → fix. + +**Fix:** bloc `# Coloane users` in `app/db.py::_migrate` (guard pe existenta tabelei + ALTER idempotent). 2 teste. + +**Runda 2 — PASS global (subagent NOU):** +- Suita: **393 passed**, 0 fail. +- Fix migrare confirmat (test pe `users` minima fara coloane → `_migrate` → coloane prezente; idempotent). +- E2E mod prod (`web_auth_required=true`): `GET /admin` fara cookie → 303 `/login`; non-admin logat → 403; + `POST /admin/activate` fara CSRF → 403. Rute `/cont/*` scoped pe sesiune, CSRF enforce, parola RAR niciodata in `value=`. +- US-010 bootstrap (primul signup → `is_admin=1`, al doilea → 0), CLI `set-admin`, `require_admin`→403 confirmate. +- US-012 `notify_signup` best-effort no-op fara SMTP + nu blocheaza signup + log `SIGNUP` pastrat. +- Regresia de aur: `test_import_e2e` + `test_api` + `test_worker_active_gate` = 31 pass. + +**Verdict: PASS.** Send live RAR ramane de confirmat manual la deploy (fara creds/retea in mediul VERIFY). +La deploy prod: `AUTOPASS_session_secret` persistent, `AUTOPASS_WEB_AUTH_REQUIRED=true`, optional `AUTOPASS_smtp_*`. diff --git a/tests/test_admin_panel.py b/tests/test_admin_panel.py new file mode 100644 index 0000000..b3f561f --- /dev/null +++ b/tests/test_admin_panel.py @@ -0,0 +1,213 @@ +"""Teste US-011 (PRD 3.3b): panou admin web /admin — conturi in asteptare + activare. + +TDD strict: testele se scriu INAINTE de implementare; la inceput pica (RED), +dupa implementare trec (GREEN). + +Fisiere testate: app/web/admin_routes.py, app/web/templates/admin.html. +""" + +from __future__ import annotations + +import os +import re +import tempfile + +import pytest +from starlette.testclient import TestClient + + +# --------------------------------------------------------------------------- +# Fixture client (web_auth_required=true -> CSRF enforce) +# --------------------------------------------------------------------------- + +@pytest.fixture() +def client(monkeypatch): + """TestClient pe aplicatia completa, cu DB izolata si web_auth_required=true.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_admin_panel.db")) + monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true") + # Ridica limita rate-limit pentru signup ca testele nu se blocheze intre ele + monkeypatch.setenv("AUTOPASS_SIGNUP_RATE_MAX", "100") + from app.config import get_settings + get_settings.cache_clear() + # Curata hits-urile rate-limit intre teste + from app.web import ratelimit + ratelimit._hits.clear() + from app.main import app + with TestClient(app, follow_redirects=False) as c: + yield c + get_settings.cache_clear() + + +# --------------------------------------------------------------------------- +# Helper-e +# --------------------------------------------------------------------------- + +def _get_csrf(client: TestClient, url: str) -> str: + """Extrage csrf_token din pagina HTML.""" + resp = client.get(url, follow_redirects=True) + m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) + if not m: + m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) + assert m, f"csrf_token negasit in {url}: {resp.text[:500]}" + return m.group(1) + + +def _signup(client: TestClient, name: str, email: str, password: str = "parola_test_001") -> int: + """Creeaza cont via POST /signup si intoarce account_id.""" + token = _get_csrf(client, "/signup") + resp = client.post("/signup", data={ + "name": name, + "email": email, + "parola": password, + "csrf_token": token, + }, follow_redirects=True) + assert resp.status_code == 200, f"signup esuat: {resp.text[:300]}" + # Extrage account_id din raspuns (pagina afiseaza cheia rfak_ + account_id) + m = re.search(r"cont=(\d+)", resp.text) + if not m: + # fallback: citeste din DB + from app.db import get_connection + conn = get_connection() + row = conn.execute( + "SELECT account_id FROM users WHERE email=? COLLATE NOCASE", (email,) + ).fetchone() + conn.close() + assert row, f"userul {email} nu a fost creat" + return int(row["account_id"]) + return int(m.group(1)) + + +def _login(client: TestClient, email: str, password: str = "parola_test_001") -> None: + """Autentifica userul (seteaza sesiunea cookie).""" + token = _get_csrf(client, "/login") + resp = client.post("/login", data={ + "email": email, + "parola": password, + "csrf_token": token, + }, follow_redirects=False) + assert resp.status_code == 303, f"login esuat cu {email}: {resp.status_code} {resp.text[:200]}" + + +def _make_admin(account_id: int) -> None: + """Marcheaza contul ca admin direct in DB.""" + from app.db import get_connection + from app.users import set_admin + conn = get_connection() + try: + set_admin(conn, account_id, is_admin=True) + conn.commit() + finally: + conn.close() + + +def _get_account_active(account_id: int) -> bool: + """Citeste accounts.active din DB.""" + from app.db import get_connection + conn = get_connection() + try: + row = conn.execute("SELECT active FROM accounts WHERE id=?", (account_id,)).fetchone() + return bool(row["active"]) if row else False + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Cazuri de test +# --------------------------------------------------------------------------- + +def test_admin_vede_conturi_pending(client): + """Admin logat -> GET /admin contine numele contului pending (active=0).""" + # Creeaza un cont pending (active=0 implicit la signup) + _signup(client, "Service Pending SRL", "pending@test.ro") + + # Creeaza contul admin + admin_id = _signup(client, "Admin Corp SA", "admin@test.ro") + _make_admin(admin_id) + + # Login ca admin + _login(client, "admin@test.ro") + + resp = client.get("/admin") + assert resp.status_code == 200, f"GET /admin a returnat {resp.status_code}" + assert "Service Pending SRL" in resp.text, ( + f"Contul pending nu apare in /admin. Raspuns: {resp.text[:600]}" + ) + + +def test_activare_din_admin(client): + """POST /admin/activate cu CSRF -> accounts.active=1; redirect 303.""" + # Cont pending + pending_id = _signup(client, "Firma De Activat SRL", "firma@test.ro") + assert not _get_account_active(pending_id), "contul trebuie sa fie inactiv initial" + + # Cont admin + admin_id = _signup(client, "Admin Activator SA", "activator@test.ro") + _make_admin(admin_id) + _login(client, "activator@test.ro") + + # Obtine CSRF din pagina /admin + resp = client.get("/admin") + assert resp.status_code == 200 + m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) + if not m: + m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) + assert m, "csrf_token negasit in /admin" + csrf = m.group(1) + + resp2 = client.post("/admin/activate", data={ + "account_id": str(pending_id), + "csrf_token": csrf, + }) + assert resp2.status_code == 303, ( + f"POST /admin/activate trebuia redirect 303, got {resp2.status_code}: {resp2.text[:300]}" + ) + + assert _get_account_active(pending_id), "contul trebuia sa fie activat dupa POST /admin/activate" + + +def test_non_admin_403(client): + """User logat NON-admin -> GET /admin -> 403. + + Creeaza intai un cont admin (bootstrap: primul user devine admin), + apoi un al doilea cont (non-admin) si verifica ca al doilea primeste 403. + """ + # Primul signup devine automat admin (bootstrap US-010) + _signup(client, "Admin Bootstrap SA", "bootstrap@test.ro") + + # Al doilea user NU e admin + _signup(client, "User Simplu SRL", "user@test.ro") + _login(client, "user@test.ro") + + resp = client.get("/admin") + assert resp.status_code == 403, ( + f"User non-admin trebuia 403 pe /admin, got {resp.status_code}" + ) + + +def test_admin_nelogat_redirect(client): + """Fara sesiune -> GET /admin -> 303 redirect la /login.""" + resp = client.get("/admin") + assert resp.status_code == 303, ( + f"Nelogat pe /admin trebuia 303, got {resp.status_code}" + ) + loc = resp.headers.get("location", "") + assert "/login" in loc, f"Redirect gresit: {loc}" + + +def test_activate_fara_csrf_403(client): + """Admin logat, POST /admin/activate fara token CSRF -> 403.""" + pending_id = _signup(client, "Firma Fara CSRF SRL", "nocsrf@test.ro") + + admin_id = _signup(client, "Admin CSRF Test SA", "csrfadmin@test.ro") + _make_admin(admin_id) + _login(client, "csrfadmin@test.ro") + + # POST fara token (sau token gol) + resp = client.post("/admin/activate", data={ + "account_id": str(pending_id), + "csrf_token": "", + }) + assert resp.status_code == 403, ( + f"POST fara CSRF trebuia 403, got {resp.status_code}" + ) diff --git a/tests/test_admin_role.py b/tests/test_admin_role.py new file mode 100644 index 0000000..8945b6d --- /dev/null +++ b/tests/test_admin_role.py @@ -0,0 +1,277 @@ +"""Teste US-010 (PRD 3.3b): rol admin + bootstrap + guard require_admin. + +Fisiere testate: app/users.py (helper-e admin), app/web/session.py (require_admin), +tools/account.py (subcomanda set-admin). +""" + +from __future__ import annotations + +import os +import tempfile + +import pytest +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +from starlette.middleware.sessions import SessionMiddleware +from starlette.testclient import TestClient + + +# --------------------------------------------------------------------------- +# Fixture DB +# --------------------------------------------------------------------------- + +@pytest.fixture() +def conn(monkeypatch): + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_admin_role.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.db import get_connection, init_db + init_db() + c = get_connection() + yield c + c.close() + get_settings.cache_clear() + + +@pytest.fixture() +def account_id(conn): + """Cont de test (nu default id=1).""" + from app.accounts import create_account + return create_account(conn, "Service Test Admin") + + +@pytest.fixture() +def user_id(conn, account_id): + """User de test pe contul de test.""" + from app.users import create_user + return create_user(conn, account_id, "admin_test@exemplu.ro", "parola_sigura_123") + + +@pytest.fixture() +def env_cli(monkeypatch): + """Fixture pentru CLI: DB separata + clear settings cache.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_admin_cli.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.db import init_db + init_db() + yield + get_settings.cache_clear() + + +# --------------------------------------------------------------------------- +# Helper app minimal pentru require_admin +# --------------------------------------------------------------------------- + +def _make_app_admin() -> FastAPI: + """App FastAPI minimal cu ruta protejata de require_admin.""" + mini = FastAPI() + mini.add_middleware( + SessionMiddleware, + secret_key="test-secret-admin", + session_cookie="autopass_session", + https_only=False, + same_site="strict", + ) + + from app.web.session import LoginRequired, AdminRequired + + @mini.exception_handler(LoginRequired) + async def login_required_handler(request: Request, exc: LoginRequired): + return JSONResponse(status_code=401, content={"detail": "neautentificat"}) + + @mini.exception_handler(AdminRequired) + async def admin_required_handler(request: Request, exc: AdminRequired): + return JSONResponse(status_code=403, content={"detail": "acces interzis (necesita admin)"}) + + @mini.get("/set-session") + def set_sess(request: Request, account_id: int = 1, user_id: int = 1): + from app.web.session import set_session + set_session(request, account_id, user_id) + return {"ok": True} + + @mini.get("/admin-only") + def admin_only(request: Request): + from app.web.session import require_admin + aid = require_admin(request) + return {"account_id": aid} + + return mini + + +# --------------------------------------------------------------------------- +# Teste helper-e app/users.py +# --------------------------------------------------------------------------- + +def test_count_admins_initial_zero(conn): + """Fara niciun user cu is_admin=1, count_admins returneaza 0.""" + from app.users import count_admins + assert count_admins(conn) == 0 + + +def test_set_admin_marcheaza_userii_contului(conn, account_id, user_id): + """set_admin(conn, account_id) seteaza is_admin=1 pe toti userii contului.""" + from app.users import set_admin, count_admins + assert count_admins(conn) == 0 + set_admin(conn, account_id, is_admin=True) + assert count_admins(conn) == 1 + + +def test_create_user_is_admin_flag(conn, account_id): + """create_user cu is_admin=True seteaza coloana la 1.""" + from app.users import create_user, count_admins + create_user(conn, account_id, "newadmin@exemplu.ro", "parola_sigura_456", is_admin=True) + assert count_admins(conn) == 1 + + +def test_is_account_admin(conn, account_id, user_id): + """is_account_admin returneaza False inainte si True dupa set_admin.""" + from app.users import is_account_admin, set_admin + assert is_account_admin(conn, account_id) is False + set_admin(conn, account_id, is_admin=True) + assert is_account_admin(conn, account_id) is True + + +def test_list_admin_emails(conn, account_id): + """list_admin_emails returneaza emailurile userilor cu is_admin=1.""" + from app.users import create_user, set_admin, list_admin_emails + uid = create_user(conn, account_id, "admin1@exemplu.ro", "parola_sigura_789") + # Inainte de set_admin -> lista goala + assert list_admin_emails(conn) == [] + set_admin(conn, account_id, is_admin=True) + emails = list_admin_emails(conn) + assert "admin1@exemplu.ro" in emails + + +def test_set_admin_cont_inexistent_valueerror(conn): + """set_admin pe cont care nu exista ridica ValueError.""" + from app.users import set_admin + with pytest.raises(ValueError, match="cont inexistent"): + set_admin(conn, 9999, is_admin=True) + + +def test_set_admin_cont_fara_users_silentios(conn): + """set_admin pe cont existent fara useri e no-op silentios (nu ridica exceptie).""" + from app.accounts import create_account + from app.users import set_admin, count_admins + acct_fara_user = create_account(conn, "Cont Fara User") + # Nu trebuie sa ridice + set_admin(conn, acct_fara_user, is_admin=True) + assert count_admins(conn) == 0 # niciun user modificat + + +# --------------------------------------------------------------------------- +# Teste require_admin (app/web/session.py) prin TestClient +# --------------------------------------------------------------------------- + +@pytest.fixture() +def client_admin(monkeypatch): + """TestClient pe app cu require_admin; DB cu cont admin + cont non-admin.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_require_admin.db")) + monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true") + from app.config import get_settings + get_settings.cache_clear() + from app.db import get_connection, init_db + init_db() + conn = get_connection() + from app.accounts import create_account + from app.users import create_user, set_admin + # Cont admin (id=2) + acct_admin = create_account(conn, "Admin Corp") + uid_admin = create_user(conn, acct_admin, "admin@corp.ro", "parola_admin_001") + set_admin(conn, acct_admin, is_admin=True) + # Cont non-admin (id=3) + acct_user = create_account(conn, "User Corp") + uid_user = create_user(conn, acct_user, "user@corp.ro", "parola_user_001") + conn.close() + + app = _make_app_admin() + with TestClient(app, follow_redirects=False) as c: + yield c, acct_admin, uid_admin, acct_user, uid_user + get_settings.cache_clear() + + +def test_require_admin_blocheaza_non_admin(client_admin): + """User logat NON-admin pe ruta admin -> 403.""" + client, acct_admin, uid_admin, acct_user, uid_user = client_admin + client.get(f"/set-session?account_id={acct_user}&user_id={uid_user}") + resp = client.get("/admin-only") + assert resp.status_code == 403 + assert "admin" in resp.json()["detail"] + + +def test_require_admin_lasa_admin(client_admin): + """User logat ADMIN pe ruta admin -> 200 cu account_id.""" + client, acct_admin, uid_admin, acct_user, uid_user = client_admin + client.get(f"/set-session?account_id={acct_admin}&user_id={uid_admin}") + resp = client.get("/admin-only") + assert resp.status_code == 200 + assert resp.json()["account_id"] == acct_admin + + +def test_require_admin_nelogat_ridica_login_required(client_admin): + """Fara sesiune, require_admin ridica LoginRequired (-> 401 in app-ul nostru de test).""" + client, *_ = client_admin + resp = client.get("/admin-only") + # In app-ul de test, LoginRequired -> 401 + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# Teste CLI tools/account.py set-admin +# --------------------------------------------------------------------------- + +def _run_account(argv): + from tools.account import main + return main(argv) + + +def test_cli_set_admin_marcheaza_contul(env_cli, capsys): + """CLI set-admin --account N seteaza is_admin=1 pe userii contului.""" + # Creeaza cont cu user + from app.db import get_connection + from app.accounts import create_account + from app.users import create_user, is_account_admin + conn = get_connection() + acct_id = create_account(conn, "Service CLI Admin") + create_user(conn, acct_id, "cli_admin@corp.ro", "parola_cli_admin_1") + conn.close() + + rc = _run_account(["set-admin", "--account", str(acct_id)]) + out = capsys.readouterr().out + assert rc == 0 + assert str(acct_id) in out or "admin" in out.lower() + + conn2 = get_connection() + assert is_account_admin(conn2, acct_id) is True + conn2.close() + + +def test_cli_set_admin_remove(env_cli, capsys): + """CLI set-admin --account N --remove scoate adminul.""" + from app.db import get_connection + from app.accounts import create_account + from app.users import create_user, set_admin, is_account_admin + conn = get_connection() + acct_id = create_account(conn, "Service CLI Remove") + create_user(conn, acct_id, "remove@corp.ro", "parola_remove_001") + set_admin(conn, acct_id, is_admin=True) + conn.close() + + rc = _run_account(["set-admin", "--account", str(acct_id), "--remove"]) + assert rc == 0 + + conn2 = get_connection() + assert is_account_admin(conn2, acct_id) is False + conn2.close() + + +def test_cli_set_admin_cont_inexistent_exit_2(env_cli, capsys): + """CLI set-admin pe cont inexistent -> exit code 2 + mesaj pe stderr.""" + rc = _run_account(["set-admin", "--account", "9999"]) + err = capsys.readouterr().err + assert rc == 2 + assert "inexistent" in err or "eroare" in err.lower() diff --git a/tests/test_dashboard_admin_link.py b/tests/test_dashboard_admin_link.py new file mode 100644 index 0000000..ff0530c --- /dev/null +++ b/tests/test_dashboard_admin_link.py @@ -0,0 +1,55 @@ +"""Test US-011 (discoverability): linkul 'Panou admin' apare pe dashboard doar pentru admini. + +Completeaza intentia US-011 — adminul trebuie sa poata descoperi /admin din UI, nu doar +prin URL direct. +""" + +from __future__ import annotations + +import os +import tempfile + +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture() +def env(monkeypatch): + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.main import app + with TestClient(app, follow_redirects=False) as c: + from app.db import get_connection + conn = get_connection() + yield c, conn + conn.close() + get_settings.cache_clear() + + +def _account_with_user(conn, name, *, is_admin): + from app.accounts import create_account + from app.users import create_user + acct = create_account(conn, name) + email = f"{name.replace(' ', '').lower()}@test.ro" + create_user(conn, acct, email, "parolaSuperSecreta", is_admin=is_admin) + return acct + + +def test_admin_vede_link_panou_admin(env, monkeypatch): + client, conn = env + acct = _account_with_user(conn, "Admin Co", is_admin=True) + monkeypatch.setattr("app.web.routes.require_login", lambda r: acct) + r = client.get("/") + assert r.status_code == 200 + assert 'href="/admin"' in r.text + + +def test_non_admin_nu_vede_link(env, monkeypatch): + client, conn = env + acct = _account_with_user(conn, "Service Normal", is_admin=False) + monkeypatch.setattr("app.web.routes.require_login", lambda r: acct) + r = client.get("/") + assert r.status_code == 200 + assert 'href="/admin"' not in r.text diff --git a/tests/test_migrate_users.py b/tests/test_migrate_users.py new file mode 100644 index 0000000..be9fb50 --- /dev/null +++ b/tests/test_migrate_users.py @@ -0,0 +1,124 @@ +"""Teste migrare defensiva coloane users (is_admin, email_verified). + +TDD: RED -> implementare in _migrate -> GREEN. +""" + +from __future__ import annotations + +import sqlite3 +import tempfile +from pathlib import Path + +import pytest + +from app.db import _migrate, init_db, get_connection +from app.config import get_settings + + +# --------------------------------------------------------------------------- +# Test 1: _migrate adauga is_admin si email_verified pe o tabela users minima +# --------------------------------------------------------------------------- + +def test_migrate_adauga_is_admin_pe_users_veche() -> None: + """Pe un DB cu tabela users creata fara is_admin/email_verified, + _migrate trebuie sa adauge ambele coloane.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = Path(f.name) + + conn = sqlite3.connect(str(db_path), isolation_level=None) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA foreign_keys = OFF") # evitam FK checks pe schema minima + + # Tabela accounts minima (necesara pentru FK la users). + # Trebuie sa aiba cui + rar_creds_enc + active ca _migrate sa nu crape + # pe ALTER/CREATE INDEX care le refera inainte de blocul users. + conn.execute(""" + CREATE TABLE accounts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + cui TEXT, + active INTEGER NOT NULL DEFAULT 1, + rar_creds_enc TEXT + ) + """) + conn.execute("INSERT INTO accounts (id, name) VALUES (1, 'default')") + + # Tabela submissions minima (necesara pentru _migrate -- coloane submissions) + conn.execute(""" + CREATE TABLE submissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + idempotency_key TEXT NOT NULL UNIQUE, + account_id INTEGER, + status TEXT NOT NULL DEFAULT 'queued', + payload_json TEXT NOT NULL, + rar_creds_enc TEXT, + rar_status_code INTEGER, + rar_error TEXT, + id_prezentare INTEGER, + retry_count INTEGER NOT NULL DEFAULT 0, + next_attempt_at TEXT, + sending_since TEXT, + purge_after TEXT, + batch_id INTEGER, + row_index INTEGER, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """) + + # Tabela users MINIMA: fara is_admin, fara email_verified + conn.execute(""" + CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + account_id INTEGER NOT NULL, + email TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL, + salt TEXT NOT NULL, + scrypt_params TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """) + + try: + # Rulam migrarea + _migrate(conn) + + # Verificam ca ambele coloane au fost adaugate + cols = {r["name"] for r in conn.execute("PRAGMA table_info(users)").fetchall()} + assert "is_admin" in cols, f"is_admin lipseste dupa migrare; coloane prezente: {cols}" + assert "email_verified" in cols, f"email_verified lipseste dupa migrare; coloane prezente: {cols}" + finally: + conn.close() + db_path.unlink(missing_ok=True) + + +# --------------------------------------------------------------------------- +# Test 2: _migrate este idempotent pe un DB initializat normal cu init_db() +# --------------------------------------------------------------------------- + +def test_migrate_idempotent_pe_users_curenta(tmp_path: pytest.TempPathFactory, monkeypatch: pytest.MonkeyPatch) -> None: + """Pe un DB initializat normal, re-apelarea _migrate nu ridica exceptie + si coloanele is_admin/email_verified raman prezente.""" + db_file = tmp_path / "test_idem.db" + monkeypatch.setenv("AUTOPASS_DB_PATH", str(db_file)) + + # Resetam settings-ul cached ca sa preia noul AUTOPASS_DB_PATH + get_settings.cache_clear() # type: ignore[attr-defined] + + try: + # Initializare normala + init_db() + + # Re-apelam _migrate direct (idempotenta) + conn = get_connection() + try: + _migrate(conn) # nu trebuie sa ridice + + cols = {r["name"] for r in conn.execute("PRAGMA table_info(users)").fetchall()} + assert "is_admin" in cols + assert "email_verified" in cols + finally: + conn.close() + finally: + get_settings.cache_clear() # type: ignore[attr-defined] diff --git a/tests/test_signup_notify.py b/tests/test_signup_notify.py new file mode 100644 index 0000000..a8ca676 --- /dev/null +++ b/tests/test_signup_notify.py @@ -0,0 +1,181 @@ +"""Teste US-012 (PRD 3.3b): notificare email admin la signup (degradat) + bootstrap admin. + +TDD: testele se scriu INAINTE de implementare; la inceput pica (RED), +dupa implementare trec (GREEN). +""" + +from __future__ import annotations + +import os +import tempfile + +import pytest +from starlette.testclient import TestClient + + +# --------------------------------------------------------------------------- +# Fixture client (pattern identic cu test_web_signup.py) +# --------------------------------------------------------------------------- + +@pytest.fixture() +def client(monkeypatch): + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + monkeypatch.setenv("AUTOPASS_SIGNUP_RATE_MAX", "100") + from app.config import get_settings + get_settings.cache_clear() + from app.web import ratelimit + ratelimit._hits.clear() + from app.main import app + with TestClient(app) as c: + yield c + get_settings.cache_clear() + + +def _csrf(c: TestClient) -> str: + """Obtine un token CSRF proaspat de la GET /signup.""" + import re + resp = c.get("/signup") + assert resp.status_code == 200 + m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) + if not m: + m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) + assert m, "csrf_token negasit in HTML" + return m.group(1) + + +def _do_signup(c: TestClient, name: str, email: str, parola: str = "parolasecreta") -> object: + token = _csrf(c) + return c.post("/signup", data={ + "name": name, + "email": email, + "parola": parola, + "csrf_token": token, + }) + + +# --------------------------------------------------------------------------- +# Teste notify_signup (unitare, fara TestClient) +# --------------------------------------------------------------------------- + +def test_notify_noop_fara_smtp(monkeypatch): + """smtp_host None -> notify_signup nu ridica si nu incearca sa trimita.""" + import smtplib + + # Daca smtplib.SMTP e apelat, testul pica + def fake_smtp(*a, **kw): + raise AssertionError("smtplib.SMTP nu trebuia apelat fara smtp_host configurat") + + monkeypatch.setattr(smtplib, "SMTP", fake_smtp) + + # Asigura smtp_host = None + from app.config import get_settings + get_settings.cache_clear() + monkeypatch.setenv("AUTOPASS_SMTP_HOST", "") # string gol -> None in Settings + get_settings.cache_clear() + + from app.email import notify_signup + # Nu trebuie sa ridice + notify_signup(["admin@test.com"], account_id=1, email="nou@test.com") + + get_settings.cache_clear() + + +def test_notify_nu_blocheaza_la_eroare(monkeypatch): + """smtp_host setat, SMTP ridica exceptie -> notify_signup returneaza normal (best-effort).""" + import smtplib + + class FakeSMTP: + def __init__(self, *a, **kw): + raise ConnectionRefusedError("simulam eroare retea") + + monkeypatch.setattr(smtplib, "SMTP", FakeSMTP) + + monkeypatch.setenv("AUTOPASS_SMTP_HOST", "smtp.test.local") + monkeypatch.setenv("AUTOPASS_SMTP_PORT", "587") + monkeypatch.setenv("AUTOPASS_SMTP_FROM", "autopass@test.local") + from app.config import get_settings + get_settings.cache_clear() + + from app.email import notify_signup + # Nu trebuie sa ridice, chiar daca SMTP esueaza + notify_signup(["admin@test.com"], account_id=5, email="nou@test.com") + + get_settings.cache_clear() + + +# --------------------------------------------------------------------------- +# Teste bootstrap admin (prin TestClient) +# --------------------------------------------------------------------------- + +def test_primul_signup_devine_admin(client): + """Primul signup -> userul are is_admin=1 (count_admins==1). + Al doilea signup cu alt email -> is_admin=0 (count_admins ramane 1).""" + # Primul signup + resp = _do_signup(client, "Primul Service", "primul@test.com") + assert resp.status_code == 200 + assert "rfak_" in resp.text + + from app.db import get_connection + from app.users import count_admins + conn = get_connection() + try: + n_admins = count_admins(conn) + assert n_admins == 1, f"Dupa primul signup, count_admins trebuie sa fie 1, nu {n_admins}" + user = conn.execute( + "SELECT is_admin FROM users WHERE email='primul@test.com'" + ).fetchone() + assert user is not None + assert user["is_admin"] == 1, "Primul user trebuie sa fie admin" + finally: + conn.close() + + # Al doilea signup + resp2 = _do_signup(client, "Al Doilea Service", "aldoilea@test.com") + assert resp2.status_code == 200 + assert "rfak_" in resp2.text + + conn = get_connection() + try: + n_admins = count_admins(conn) + assert n_admins == 1, f"Dupa al doilea signup, count_admins trebuie sa ramana 1, nu {n_admins}" + user2 = conn.execute( + "SELECT is_admin FROM users WHERE email='aldoilea@test.com'" + ).fetchone() + assert user2 is not None + assert user2["is_admin"] == 0, "Al doilea user NU trebuie sa fie admin" + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Teste C16 (log SIGNUP pastrat) si best-effort E2E +# --------------------------------------------------------------------------- + +def test_signup_inca_logheaza_si_notifica(client, capsys): + """Signup reusit -> stdout contine 'SIGNUP cont=' (C16 pastrat).""" + resp = _do_signup(client, "Service Log Test", "log@test.com") + assert resp.status_code == 200 + assert "rfak_" in resp.text + + captured = capsys.readouterr() + assert "SIGNUP cont=" in captured.out, ( + f"Linia de log C16 'SIGNUP cont=' lipseste din stdout. Capturat: {captured.out!r}" + ) + + +def test_signup_neblocat_de_notify(monkeypatch, client): + """notify_signup ridica -> signup returneaza totusi 200 cu cheia (best-effort E2E).""" + # Monkeypatch notify_signup sa ridice + def notify_always_raises(*a, **kw): + raise RuntimeError("simulam eroare fatala in notify") + + # Importam modulul inainte de monkeypatch + import app.email as email_mod + monkeypatch.setattr(email_mod, "notify_signup", notify_always_raises) + + resp = _do_signup(client, "Service Robust", "robust@test.com") + assert resp.status_code == 200, ( + f"Signup trebuia sa reuseasca indiferent de eroarea din notify. status={resp.status_code}" + ) + assert "rfak_" in resp.text, "Cheia API trebuia afisata chiar daca notify a esuat" diff --git a/tests/test_web_cont.py b/tests/test_web_cont.py new file mode 100644 index 0000000..73273c0 --- /dev/null +++ b/tests/test_web_cont.py @@ -0,0 +1,232 @@ +"""Teste US-007 (PRD 3.3b): sectiunea 'Contul meu' — rotire cheie API + creds RAR din UI. + +TDD: testele se scriu INAINTE de implementare; la inceput pica (RED), +dupa implementare trec (GREEN). + +Rute testate: +- GET /_fragments/cont -> card "Contul meu" +- POST /cont/roteste-cheie -> cheie noua afisata o singura data +- POST /cont/rar-creds -> seteaza rar_creds_enc per cont din sesiune +""" + +from __future__ import annotations + +import os +import re +import tempfile + +import pytest +from starlette.testclient import TestClient + + +@pytest.fixture() +def client(monkeypatch): + """Client fara web_auth_required (dev mode) — sesiunea se seteaza manual.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.main import app + with TestClient(app, follow_redirects=False) as c: + yield c + get_settings.cache_clear() + + +@pytest.fixture() +def client_prod(monkeypatch): + """Client cu web_auth_required=True (mod prod) — CSRF enforce.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true") + from app.config import get_settings + get_settings.cache_clear() + from app.main import app + with TestClient(app, follow_redirects=False) as c: + yield c + get_settings.cache_clear() + + +def _create_account_user(email: str = "user@test.com", password: str = "parolasecreta10"): + """Creeaza cont + user + cheie API initiala. Intoarce (acct_id, user_id, api_key).""" + from app.accounts import create_account + from app.users import create_user + from app.auth import create_api_key + from app.db import get_connection + + conn = get_connection() + try: + acct_id = create_account(conn, "Service Test", active=True) + user_id = create_user(conn, acct_id, email, password) + api_key = create_api_key(conn, acct_id) + return acct_id, user_id, api_key + finally: + conn.close() + + +def _login(client, email: str, password: str) -> None: + """Face login real prin HTTP si seteaza cookie-ul de sesiune pe client.""" + # Obtine CSRF token de pe pagina de login + resp = client.get("/login") + assert resp.status_code == 200 + m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) + if not m: + m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) + assert m, "csrf_token negasit pe /login" + csrf = m.group(1) + + resp = client.post("/login", data={ + "email": email, + "parola": password, + "csrf_token": csrf, + }) + # 303 redirect la / inseamna login reusit + assert resp.status_code == 303, f"Login esuat: {resp.status_code} {resp.text[:200]}" + + +def _get_csrf_from_fragment(client) -> str: + """Obtine CSRF token din fragmentul /_fragments/cont.""" + resp = client.get("/_fragments/cont") + assert resp.status_code == 200 + m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) + if not m: + m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) + assert m, f"csrf_token negasit in /_fragments/cont: {resp.text[:500]}" + return m.group(1) + + +# ============================================================ +# test_roteste_cheie_afisata_o_data +# ============================================================ + +def test_roteste_cheie_afisata_o_data(client): + """User logat roteste cheia: raspunsul contine 'rfak_'; cheia veche revocata.""" + acct_id, user_id, api_key_initiala = _create_account_user("roteste@test.com") + _login(client, "roteste@test.com", "parolasecreta10") + + csrf = _get_csrf_from_fragment(client) + resp = client.post("/cont/roteste-cheie", data={"csrf_token": csrf}) + + assert resp.status_code == 200 + assert "rfak_" in resp.text, f"Cheia noua nu apare in raspuns: {resp.text[:500]}" + + # Verifica in DB: cheia veche revocata, una noua activa + from app.db import get_connection + conn = get_connection() + try: + rows = conn.execute( + "SELECT id, active FROM api_keys WHERE account_id=? ORDER BY id", + (acct_id,), + ).fetchall() + # Trebuie sa avem minim 2 chei: cea initiala (active=0) si cea noua (active=1) + active_keys = [r for r in rows if r["active"] == 1] + inactive_keys = [r for r in rows if r["active"] == 0] + assert len(active_keys) == 1, f"Trebuia exact 1 cheie activa, gasit: {len(active_keys)}" + assert len(inactive_keys) >= 1, "Cheia veche trebuia revocata (active=0)" + finally: + conn.close() + + +# ============================================================ +# test_set_creds_rar_din_sesiune +# ============================================================ + +def test_set_creds_rar_din_sesiune(client): + """User logat seteaza creds RAR: accounts.rar_creds_enc != NULL, decriptabil.""" + acct_id, user_id, _ = _create_account_user("creds@test.com") + _login(client, "creds@test.com", "parolasecreta10") + + csrf = _get_csrf_from_fragment(client) + resp = client.post("/cont/rar-creds", data={ + "csrf_token": csrf, + "rar_email": "user@rar.ro", + "rar_parola": "parolaRAR123", + }) + + assert resp.status_code == 200 + # Mesaj de succes in raspuns + assert "succes" in resp.text.lower() or "salvat" in resp.text.lower() or "configurat" in resp.text.lower(), \ + f"Mesaj de succes lipsa: {resp.text[:500]}" + + # Verifica in DB: rar_creds_enc setat si decriptabil + from app.db import get_connection + from app.crypto import decrypt_creds + conn = get_connection() + try: + row = conn.execute( + "SELECT rar_creds_enc FROM accounts WHERE id=?", (acct_id,) + ).fetchone() + assert row is not None + assert row["rar_creds_enc"] is not None, "rar_creds_enc trebuia setat" + creds = decrypt_creds(row["rar_creds_enc"]) + assert creds is not None + assert creds.get("email") == "user@rar.ro" + assert creds.get("password") == "parolaRAR123" + finally: + conn.close() + + +# ============================================================ +# test_creds_alt_cont_neafectat +# ============================================================ + +def test_creds_alt_cont_neafectat(client): + """User A seteaza creds -> contul B ramane cu rar_creds_enc NULL.""" + acct_a, user_a, _ = _create_account_user("userA@test.com") + acct_b, user_b, _ = _create_account_user("userB@test.com") + + # Logam user A si setam creds + _login(client, "userA@test.com", "parolasecreta10") + csrf = _get_csrf_from_fragment(client) + resp = client.post("/cont/rar-creds", data={ + "csrf_token": csrf, + "rar_email": "a@rar.ro", + "rar_parola": "parolaA123", + }) + assert resp.status_code == 200 + + # Verifica: contul A are creds, contul B ramane NULL + from app.db import get_connection + conn = get_connection() + try: + row_a = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=?", (acct_a,)).fetchone() + row_b = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=?", (acct_b,)).fetchone() + assert row_a["rar_creds_enc"] is not None, "Contul A trebuia sa aiba creds" + assert row_b["rar_creds_enc"] is None, "Contul B nu trebuia atins" + finally: + conn.close() + + +# ============================================================ +# test_roteste_fara_csrf_403_in_prod +# ============================================================ + +def test_roteste_fara_csrf_403_in_prod(client_prod): + """Prod + sesiune autentificata + CSRF lipsa -> 403.""" + # Cream cont + user + acct_id, user_id, _ = _create_account_user("csrf_test@test.com") + + # Login real + _login(client_prod, "csrf_test@test.com", "parolasecreta10") + + # POST fara csrf_token (sau cu token gresit) + resp = client_prod.post("/cont/roteste-cheie", data={"csrf_token": "token_gresit"}) + assert resp.status_code == 403, f"Trebuia 403, got {resp.status_code}" + + +# ============================================================ +# test_fragment_cont_nelogat_redirect +# ============================================================ + +def test_fragment_cont_nelogat_redirect(monkeypatch): + """Fara sesiune + web_auth_required=True -> 303 redirect /login.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t_nl.db")) + monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true") + from app.config import get_settings + get_settings.cache_clear() + from app.main import app + with TestClient(app, follow_redirects=False) as c: + resp = c.get("/_fragments/cont") + assert resp.status_code == 303 + assert "/login" in resp.headers.get("location", "") + get_settings.cache_clear() diff --git a/tools/account.py b/tools/account.py index c1ea8d6..19c020b 100644 --- a/tools/account.py +++ b/tools/account.py @@ -25,6 +25,7 @@ import sys from app.accounts import create_account, list_accounts, set_active from app.auth import create_api_key from app.db import get_connection, init_db +from app.users import set_admin def _create(conn: sqlite3.Connection, args: argparse.Namespace) -> int: @@ -67,6 +68,17 @@ def _set_active(conn: sqlite3.Connection, account_id: int, active: bool) -> int: return 0 +def _set_admin(conn: sqlite3.Connection, account_id: int, is_admin: bool) -> int: + try: + set_admin(conn, account_id, is_admin=is_admin) + except ValueError as exc: + print(f"eroare: {exc}", file=sys.stderr) + return 2 + actiune = "admin" if is_admin else "non-admin" + print(f"Cont {account_id}: marcat ca {actiune}") + return 0 + + def _list(conn: sqlite3.Connection, pending_only: bool) -> int: rows = list_accounts(conn) if pending_only: @@ -102,6 +114,10 @@ def main(argv: list[str] | None = None) -> int: p_deact = sub.add_parser("deactivate", help="dezactiveaza un cont") p_deact.add_argument("--account", type=int, required=True, help="account_id") + p_sadmin = sub.add_parser("set-admin", help="seteaza/sterge rol admin pe un cont") + p_sadmin.add_argument("--account", type=int, required=True, help="account_id") + p_sadmin.add_argument("--remove", action="store_true", help="sterge rolul admin (implicit: adauga)") + args = parser.parse_args(argv) init_db() # asigura schema (accounts.active + index CUI) + cont default @@ -115,6 +131,8 @@ def main(argv: list[str] | None = None) -> int: return _set_active(conn, args.account, True) if args.cmd == "deactivate": return _set_active(conn, args.account, False) + if args.cmd == "set-admin": + return _set_admin(conn, args.account, is_admin=not args.remove) finally: conn.close() return 0