"""Helper-e utilizatori web (email + parola scrypt). Parola NICIODATA stocata in clar. Fiecare user are un salt per-user generat cu secrets.token_bytes(16). Parametrii scrypt stocati ca eticheta de versiune pentru migrare cost viitoare. """ from __future__ import annotations import hashlib import hmac import secrets import sqlite3 SCRYPT_PARAMS = "n16384_r8_p1" _N = 2**14 _R = 8 _P = 1 _DKLEN = 32 _MAXMEM = 64 * 1024 * 1024 _PASSWORD_MIN = 10 _PASSWORD_MAX = 128 def _parse_scrypt_params(label: str) -> tuple[int, int, int] | None: """Parseaza 'nN_rR_pP' -> (N, R, P). Returneaza None la format necunoscut/corupt.""" try: parts = label.split("_") if len(parts) != 3 or parts[0][0] != "n" or parts[1][0] != "r" or parts[2][0] != "p": return None return (int(parts[0][1:]), int(parts[1][1:]), int(parts[2][1:])) except (ValueError, IndexError): return None def _scrypt_hash(password: str, salt: bytes, n: int = _N, r: int = _R, p: int = _P) -> bytes: return hashlib.scrypt( password.encode("utf-8"), salt=salt, n=n, r=r, p=p, maxmem=_MAXMEM, dklen=_DKLEN, ) def create_user( conn: sqlite3.Connection, account_id: int, email: str, password: str, is_admin: bool = False, ) -> int: """Creeaza un user nou si intoarce id-ul. Valideaza ca: contul exista, parola intre 10 si 128 caractere, emailul nu e duplicat. Stocheaza DOAR hash scrypt + salt (hex), niciodata parola in clar. Email duplicat (case-insensitive, via UNIQUE COLLATE NOCASE) -> ValueError. is_admin: daca True, userul e marcat ca admin (is_admin=1). Apelantul decide logica de bootstrap (count_admins==0 -> primul cont devine admin). """ email = email.strip() acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone() if not acct: raise ValueError(f"cont inexistent: {account_id}") if len(password) < _PASSWORD_MIN: raise ValueError(f"parola prea scurta (minim {_PASSWORD_MIN} caractere)") if len(password) > _PASSWORD_MAX: raise ValueError(f"parola prea lunga (maxim {_PASSWORD_MAX} caractere, anti-DoS)") salt = secrets.token_bytes(16) pw_hash = _scrypt_hash(password, salt) try: cur = conn.execute( "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params, is_admin) " "VALUES (?, ?, ?, ?, ?, ?)", (account_id, email, pw_hash.hex(), salt.hex(), SCRYPT_PARAMS, 1 if is_admin else 0), ) except sqlite3.IntegrityError: raise ValueError("email deja folosit") return int(cur.lastrowid or 0) def count_admins(conn: sqlite3.Connection) -> int: """Numara userii cu is_admin=1 din intreaga baza.""" row = conn.execute("SELECT COUNT(*) AS n FROM users WHERE is_admin=1").fetchone() return int(row["n"]) if row else 0 def set_admin(conn: sqlite3.Connection, account_id: int, is_admin: bool = True) -> None: """Seteaza/sterge rolul admin pe toti userii contului dat. Ridica ValueError daca contul nu exista. Daca contul exista dar nu are useri, e no-op silentios. """ acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone() if not acct: raise ValueError(f"cont inexistent: {account_id}") conn.execute( "UPDATE users SET is_admin=? WHERE account_id=?", (1 if is_admin else 0, account_id), ) def is_account_admin(conn: sqlite3.Connection, account_id: int) -> bool: """Returneaza True daca cel putin un user al contului are is_admin=1.""" row = conn.execute( "SELECT 1 FROM users WHERE account_id=? AND is_admin=1 LIMIT 1", (account_id,), ).fetchone() return row is not None def list_admin_emails(conn: sqlite3.Connection) -> list[str]: """Returneaza emailurile tuturor userilor cu is_admin=1.""" rows = conn.execute( "SELECT email FROM users WHERE is_admin=1" ).fetchall() return [row["email"] for row in rows] def verify_password(conn: sqlite3.Connection, email: str, password: str) -> int | None: """Verifica parola pentru email. Intoarce account_id la potrivire, None altfel. Nu distinge intre email inexistent si parola gresita (evita enumerare useri). Comparatie constant-time cu hmac.compare_digest. """ row = conn.execute( "SELECT account_id, password_hash, salt, scrypt_params FROM users " "WHERE email=? COLLATE NOCASE", (email.strip(),), ).fetchone() if row is None: # Executa un hash dummy pentru a evita timing oracle pe email inexistent _scrypt_hash(password, b"\x00" * 16) return None salt = bytes.fromhex(row["salt"]) expected = bytes.fromhex(row["password_hash"]) params = _parse_scrypt_params(row["scrypt_params"] or "") if params is None: return None n, r, p = params actual = _scrypt_hash(password, salt, n=n, r=r, p=p) if hmac.compare_digest(actual, expected): return int(row["account_id"]) return None def get_user_by_email(conn: sqlite3.Connection, email: str) -> dict | None: """Metadate user dupa email (FARA password_hash si salt).""" row = conn.execute( "SELECT id, account_id, email, is_admin, email_verified, created_at " "FROM users WHERE email=? COLLATE NOCASE", (email.strip(),), ).fetchone() return dict(row) if row else None