Files
rar-autopass/app/users.py
Claude Agent b92055eb01 feat(web): self-service cheie/creds + admin web + email signup (PRD 3.3b)
US-007: rute web proprii /cont/roteste-cheie + /cont/rar-creds scoped pe
sesiune (C13), sectiune "Contul meu" cu cheie afisata o data.
US-010: rol admin (users.is_admin) + require_admin->403 + CLI set-admin +
bootstrap primul cont=admin (count_admins in BEGIN IMMEDIATE, anti-race).
US-011: panou /admin (activare/dezactivare conturi, CSRF + PRG), link admin
+ logout pe dashboard.
US-012: app/email.py notify_signup best-effort degradat fara SMTP + config smtp_*.
Fix: migrare defensiva users.is_admin/email_verified in _migrate.

VERIFY x2 context curat (PASS) + /code-review high. 393 teste pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:19:06 +00:00

168 lines
5.4 KiB
Python

"""Helper-e utilizatori web (email + parola scrypt). US-001 PRD 3.3.
Parola NICIODATA stocata in clar. Fiecare user are un salt per-user generat cu
secrets.token_bytes(16). Parametrii scrypt stocati ca eticheta de versiune pentru
migrare cost viitoare (C9).
"""
from __future__ import annotations
import hashlib
import hmac
import secrets
import sqlite3
# scrypt cost parameters applied to newly created password hashes.
_N = 2**14
_R = 8
_P = 1
# Version label written to users.scrypt_params next to each hash and parsed
# back at verification time. Derived from the parameters above so the label
# and the values actually used for hashing cannot drift apart.
SCRYPT_PARAMS = f"n{_N}_r{_R}_p{_P}"
_DKLEN = 32  # derived key length, in bytes
_MAXMEM = 64 * 1024 * 1024  # memory ceiling handed to hashlib.scrypt (64 MiB)
# Accepted password length bounds, in characters, enforced by create_user.
_PASSWORD_MIN = 10
_PASSWORD_MAX = 128
def _parse_scrypt_params(label: str) -> tuple[int, int, int] | None:
    """Parse a stored scrypt parameter label of the form ``nN_rR_pP``.

    Args:
        label: Version label such as ``"n16384_r8_p1"``.

    Returns:
        ``(N, R, P)`` as integers, or ``None`` when the label is malformed or
        describes parameters scrypt cannot accept (N must be a power of two
        greater than 1; R and P must be at least 1).
    """
    fields = label.split("_")
    if len(fields) != 3:
        return None
    values = []
    for prefix, field in zip("nrp", fields):
        digits = field[1:]
        # Plain ASCII digits only: int() on its own would also accept signs
        # and surrounding whitespace ("n-1", "n 16"), which are never valid.
        if not field.startswith(prefix) or not (digits.isascii() and digits.isdigit()):
            return None
        try:
            values.append(int(digits))
        except ValueError:
            # Extremely long digit runs can exceed the interpreter's
            # int-conversion limit; treat them as a corrupt label.
            return None
    n, r, p = values
    # Reject values the scrypt primitive would refuse, so a corrupt row is
    # reported as "unknown format" here instead of raising at hashing time.
    if n < 2 or n & (n - 1) or r < 1 or p < 1:
        return None
    return (n, r, p)
def _scrypt_hash(password: str, salt: bytes, n: int = _N, r: int = _R, p: int = _P) -> bytes:
    """Derive the scrypt key for ``password`` under ``salt``.

    The password is UTF-8 encoded before hashing. ``n``, ``r`` and ``p`` are
    forwarded to ``hashlib.scrypt`` and default to the module's cost
    constants; the output length and memory ceiling come from ``_DKLEN`` and
    ``_MAXMEM``.
    """
    secret = password.encode("utf-8")
    cost = {"n": n, "r": r, "p": p}
    return hashlib.scrypt(secret, salt=salt, maxmem=_MAXMEM, dklen=_DKLEN, **cost)
def create_user(
    conn: sqlite3.Connection,
    account_id: int,
    email: str,
    password: str,
    is_admin: bool = False,
) -> int:
    """Create a new user row and return its id.

    Only the scrypt hash and the per-user salt (both hex-encoded) are stored,
    together with the parameter label in effect; the clear-text password is
    never written. This function does not commit; the caller owns the
    transaction.

    Args:
        conn: Open SQLite connection.
        account_id: Id of an existing row in ``accounts``.
        email: User email; surrounding whitespace is stripped before storing.
        password: Clear-text password, between ``_PASSWORD_MIN`` and
            ``_PASSWORD_MAX`` characters long.
        is_admin: When true the user is stored with ``is_admin=1``. Any
            bootstrap policy (e.g. first account becomes admin) is the
            caller's decision.

    Returns:
        The new row id (``0`` if the cursor reports no ``lastrowid``).

    Raises:
        ValueError: The account does not exist, the password length is out of
            bounds, or the INSERT violated an integrity constraint.
    """
    email = email.strip()
    acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone()
    if not acct:
        raise ValueError(f"cont inexistent: {account_id}")
    if len(password) < _PASSWORD_MIN:
        raise ValueError(f"parola prea scurta (minim {_PASSWORD_MIN} caractere)")
    if len(password) > _PASSWORD_MAX:
        raise ValueError(f"parola prea lunga (maxim {_PASSWORD_MAX} caractere, anti-DoS)")
    salt = secrets.token_bytes(16)
    pw_hash = _scrypt_hash(password, salt)
    try:
        cur = conn.execute(
            "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params, is_admin) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (account_id, email, pw_hash.hex(), salt.hex(), SCRYPT_PARAMS, 1 if is_admin else 0),
        )
    except sqlite3.IntegrityError as exc:
        # NOTE(review): every integrity failure is reported as a duplicate
        # email; this presumably relies on a case-insensitive UNIQUE
        # constraint on users.email (schema not visible here). The original
        # error is chained so the real cause stays visible in tracebacks.
        raise ValueError("email deja folosit") from exc
    return int(cur.lastrowid or 0)
def count_admins(conn: sqlite3.Connection) -> int:
    """Return how many users in the whole database have ``is_admin=1``.

    The result row is read by column name, so the connection must use a
    name-addressable row factory such as ``sqlite3.Row``.
    """
    result = conn.execute("SELECT COUNT(*) AS n FROM users WHERE is_admin=1").fetchone()
    if not result:
        return 0
    return int(result["n"])
def set_admin(conn: sqlite3.Connection, account_id: int, is_admin: bool = True) -> None:
    """Grant or revoke the admin role for every user of one account.

    Raises:
        ValueError: No row in ``accounts`` has the given id.

    An existing account without users is left untouched (the UPDATE simply
    matches no rows). Nothing is committed here.
    """
    flag = int(bool(is_admin))
    account_row = conn.execute(
        "SELECT 1 FROM accounts WHERE id=?", (account_id,)
    ).fetchone()
    if not account_row:
        raise ValueError(f"cont inexistent: {account_id}")
    conn.execute("UPDATE users SET is_admin=? WHERE account_id=?", (flag, account_id))
def is_account_admin(conn: sqlite3.Connection, account_id: int) -> bool:
    """Tell whether at least one user of the account has ``is_admin=1``."""
    query = "SELECT 1 FROM users WHERE account_id=? AND is_admin=1 LIMIT 1"
    match = conn.execute(query, (account_id,)).fetchone()
    if match is None:
        return False
    return True
def list_admin_emails(conn: sqlite3.Connection) -> list[str]:
    """Collect the email of every user stored with ``is_admin=1``.

    Rows are read by column name, so the connection must use a
    name-addressable row factory such as ``sqlite3.Row``.
    """
    admin_rows = conn.execute("SELECT email FROM users WHERE is_admin=1").fetchall()
    emails = []
    for record in admin_rows:
        emails.append(record["email"])
    return emails
def verify_password(conn: sqlite3.Connection, email: str, password: str) -> int | None:
    """Check a password for the given email.

    The lookup is case-insensitive on the stripped email. Unknown email and
    wrong password are indistinguishable to the caller (both yield ``None``),
    which avoids user enumeration. The digest comparison is constant-time via
    ``hmac.compare_digest``.

    Rows are read by column name, so the connection must use a
    name-addressable row factory such as ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.
        email: Email to look up.
        password: Clear-text password to verify.

    Returns:
        The user's ``account_id`` on a match; ``None`` when the email is
        unknown, the password does not match, or the stored credentials
        (parameter label, salt, hash) cannot be decoded.
    """
    row = conn.execute(
        "SELECT account_id, password_hash, salt, scrypt_params FROM users "
        "WHERE email=? COLLATE NOCASE",
        (email.strip(),),
    ).fetchone()
    if row is None:
        # Run a dummy hash so an unknown email costs about as much as a real
        # verification (no timing oracle on account existence).
        _scrypt_hash(password, b"\x00" * 16)
        return None
    params = _parse_scrypt_params(row["scrypt_params"] or "")
    if params is None:
        return None
    try:
        salt = bytes.fromhex(row["salt"])
        expected = bytes.fromhex(row["password_hash"])
    except (TypeError, ValueError):
        # Corrupt or NULL stored salt/hash: treat it like a corrupt parameter
        # label and fail the verification instead of raising out of login.
        return None
    n, r, p = params
    actual = _scrypt_hash(password, salt, n=n, r=r, p=p)
    if hmac.compare_digest(actual, expected):
        return int(row["account_id"])
    return None
def get_user_by_email(conn: sqlite3.Connection, email: str) -> dict | None:
    """Fetch user metadata by email, without the password hash or salt.

    The email is stripped and matched case-insensitively. Returns the row as
    a plain dict, or ``None`` when no user matches. The row is converted with
    ``dict(row)``, so the connection must use a mapping-capable row factory
    such as ``sqlite3.Row``.
    """
    lookup = email.strip()
    query = (
        "SELECT id, account_id, email, is_admin, email_verified, created_at "
        "FROM users WHERE email=? COLLATE NOCASE"
    )
    found = conn.execute(query, (lookup,)).fetchone()
    if not found:
        return None
    return dict(found)