Eliminat zgomotul de trasabilitate (US-xxx, PRD x.x, Rn, OV-x, Tn, decizii/naratiune istorica) din 41 fisiere app/ + template-uri. Pastrate comentariile care documenteaza invarianti si logica ne-evidenta (idempotenta/hash, reconciliere anti-duplicat, RAR 500 esec definitiv, creds per cont, WAF User-Agent, 422 fara echo de parola, scope NULL->1), curatate doar de tokeni. Verificare: pentru cele 27 module .py curatate, structura de cod (tokeni non-comentariu/ non-string) e IDENTICA fata de HEAD -> doar comentarii/docstring-uri schimbate. Singura schimbare de cod e in tests/test_web_responsive.py (scos 3 assert pe markeri US-006/007/008, inlocuite de asertiunile structurale alaturate). 0 tokeni US/PRD reziduali in app/. Regresie: 896 passed, 1 deselected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
168 lines
5.4 KiB
Python
168 lines
5.4 KiB
Python
"""Helper-e utilizatori web (email + parola scrypt).
|
|
|
|
Parola NICIODATA stocata in clar. Fiecare user are un salt per-user generat cu
|
|
secrets.token_bytes(16). Parametrii scrypt stocati ca eticheta de versiune pentru
|
|
migrare cost viitoare.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import secrets
|
|
import sqlite3
|
|
|
|
SCRYPT_PARAMS = "n16384_r8_p1"
|
|
_N = 2**14
|
|
_R = 8
|
|
_P = 1
|
|
_DKLEN = 32
|
|
_MAXMEM = 64 * 1024 * 1024
|
|
|
|
_PASSWORD_MIN = 10
|
|
_PASSWORD_MAX = 128
|
|
|
|
|
|
def _parse_scrypt_params(label: str) -> tuple[int, int, int] | None:
    """Parse a stored cost label of the form 'n<N>_r<R>_p<P>' into (N, R, P).

    Returns None when the label does not have exactly three '_'-separated
    fields with the prefixes n, r and p, when a field is not an integer, or
    when the numbers cannot be used by scrypt (N must be a power of two
    greater than 1; R and P must be at least 1).
    """
    fields = label.split("_")
    if len(fields) != 3:
        return None

    values = []
    for prefix, field in zip("nrp", fields):
        # An empty field fails the prefix test, so no index error can occur.
        if not field.startswith(prefix):
            return None
        try:
            values.append(int(field[1:]))
        except ValueError:
            return None

    n, r, p = values
    # Reject numbers hashlib.scrypt would refuse, so that a corrupt label is
    # reported as unparseable here instead of raising later while hashing.
    if n < 2 or n & (n - 1) or r < 1 or p < 1:
        return None
    return n, r, p
|
|
|
|
|
|
def _scrypt_hash(password: str, salt: bytes, n: int = _N, r: int = _R, p: int = _P) -> bytes:
    """Derive the scrypt key for *password* with the given *salt*.

    The password is encoded as UTF-8 before hashing. The cost arguments default
    to the module-level settings; the output length and the memory ceiling
    always come from _DKLEN and _MAXMEM.
    """
    secret = password.encode("utf-8")
    return hashlib.scrypt(secret, salt=salt, n=n, r=r, p=p, maxmem=_MAXMEM, dklen=_DKLEN)
|
|
|
|
|
|
def create_user(
    conn: sqlite3.Connection,
    account_id: int,
    email: str,
    password: str,
    is_admin: bool = False,
) -> int:
    """Create a new user under an existing account and return its row id.

    Checks that the account exists and that the password is between
    _PASSWORD_MIN and _PASSWORD_MAX characters long. Only the scrypt hash and
    the salt (both hex-encoded) are stored, together with the SCRYPT_PARAMS
    label; the plaintext password is never written. The email is stripped of
    surrounding whitespace before it is stored. No commit is issued here.

    is_admin: when true the user is stored with is_admin=1. The caller owns
    the bootstrap policy (e.g. count_admins == 0 -> first account is admin).

    Returns the id of the inserted row (0 if the cursor reports no lastrowid).

    Raises ValueError when the account does not exist, when the password is
    too short or too long, or when the INSERT hits an integrity constraint.
    The latter is reported as a duplicate email; presumably users.email is
    UNIQUE COLLATE NOCASE (schema not visible here - confirm).
    """
    email = email.strip()

    acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone()
    if not acct:
        raise ValueError(f"cont inexistent: {account_id}")

    if len(password) < _PASSWORD_MIN:
        raise ValueError(f"parola prea scurta (minim {_PASSWORD_MIN} caractere)")
    if len(password) > _PASSWORD_MAX:
        # The upper bound caps the amount of data fed to the hash function.
        raise ValueError(f"parola prea lunga (maxim {_PASSWORD_MAX} caractere, anti-DoS)")

    # A fresh random salt for every user.
    salt = secrets.token_bytes(16)
    pw_hash = _scrypt_hash(password, salt)

    try:
        cur = conn.execute(
            "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params, is_admin) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (account_id, email, pw_hash.hex(), salt.hex(), SCRYPT_PARAMS, 1 if is_admin else 0),
        )
    except sqlite3.IntegrityError as exc:
        # Chain the driver error so the violated constraint stays visible in
        # tracebacks; callers still only need to catch ValueError.
        raise ValueError("email deja folosit") from exc

    return int(cur.lastrowid or 0)
|
|
|
|
|
|
def count_admins(conn: sqlite3.Connection) -> int:
    """Return how many users have is_admin=1, across the whole database.

    The count is read by column name, so the connection needs a row factory
    that supports name lookup (e.g. sqlite3.Row).
    """
    query = "SELECT COUNT(*) AS n FROM users WHERE is_admin=1"
    result = conn.execute(query).fetchone()
    if not result:
        return 0
    return int(result["n"])
|
|
|
|
|
|
def set_admin(conn: sqlite3.Connection, account_id: int, is_admin: bool = True) -> None:
    """Grant or revoke the admin flag on every user of the given account.

    Raises ValueError when no account has that id. An account that exists but
    has no users is silently left as it is. No commit is issued here.
    """
    exists = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone()
    if not exists:
        raise ValueError(f"cont inexistent: {account_id}")

    flag = 1 if is_admin else 0
    conn.execute("UPDATE users SET is_admin=? WHERE account_id=?", (flag, account_id))
|
|
|
|
|
|
def is_account_admin(conn: sqlite3.Connection, account_id: int) -> bool:
    """Tell whether the account has at least one user with is_admin=1."""
    cursor = conn.execute(
        "SELECT 1 FROM users WHERE account_id=? AND is_admin=1 LIMIT 1",
        (account_id,),
    )
    return cursor.fetchone() is not None
|
|
|
|
|
|
def list_admin_emails(conn: sqlite3.Connection) -> list[str]:
    """Collect the email of every user whose is_admin flag is 1.

    The query has no ORDER BY, so no particular order is guaranteed. Values are
    read by column name, which requires a row factory such as sqlite3.Row.
    """
    query = "SELECT email FROM users WHERE is_admin=1"
    return [record["email"] for record in conn.execute(query).fetchall()]
|
|
|
|
|
|
def verify_password(conn: sqlite3.Connection, email: str, password: str) -> int | None:
    """Check a login attempt; return the account id on success, None otherwise.

    An unknown email and a wrong password both yield None, so callers cannot
    tell which one failed (avoids user enumeration). The stored and computed
    hashes are compared in constant time with hmac.compare_digest. A row whose
    scrypt_params label cannot be parsed is also treated as a failed login.
    """
    record = conn.execute(
        "SELECT account_id, password_hash, salt, scrypt_params FROM users "
        "WHERE email=? COLLATE NOCASE",
        (email.strip(),),
    ).fetchone()

    if record is None:
        # Spend one hash on a dummy salt so that an unknown email takes about
        # as long as a known one (no timing oracle on account existence).
        _scrypt_hash(password, b"\x00" * 16)
        return None

    stored_salt = bytes.fromhex(record["salt"])
    stored_hash = bytes.fromhex(record["password_hash"])

    # Re-hash with the cost settings recorded for this row, not the current
    # module defaults, so hashes created under other settings still verify.
    cost = _parse_scrypt_params(record["scrypt_params"] or "")
    if cost is None:
        return None

    candidate = _scrypt_hash(password, stored_salt, *cost)
    matches = hmac.compare_digest(candidate, stored_hash)
    return int(record["account_id"]) if matches else None
|
|
|
|
|
|
def get_user_by_email(conn: sqlite3.Connection, email: str) -> dict | None:
    """Fetch a user's metadata by email, or None when there is no match.

    password_hash and salt are never selected. The email is stripped and
    compared with COLLATE NOCASE. The row is converted with dict(), which
    requires a mapping-style row factory such as sqlite3.Row.
    """
    lookup = email.strip()
    found = conn.execute(
        "SELECT id, account_id, email, is_admin, email_verified, created_at "
        "FROM users WHERE email=? COLLATE NOCASE",
        (lookup,),
    ).fetchone()
    if not found:
        return None
    return dict(found)
|