Files
rar-autopass/tests/test_users.py
Claude Agent 504b490d3b feat(web): self-onboarding multi-tenant + auth sesiune (PRD 3.3a)
Canalul web trece de la 100% deschis (hardcodat cont 1) la autentificat si
multi-tenant. Un service nou se inregistreaza din browser, primeste o cheie API
(o singura data) si o sesiune; contul se creeaza "in asteptare" (active=0) si nu
trimite la RAR pana la activarea de catre admin (tools/account.py activate).

- users + app/users.py: parole scrypt (salt per-user, eticheta parametri onorata
  la verify pentru migrare cost), email unic case-insensitive
- sesiune: SessionMiddleware (same_site=strict, https_only config) + app/web/session.py
  (current_account/web_account/require_login->LoginRequired, set_session clear-inainte)
- CSRF (app/web/csrf.py) enforce in prod inclusiv pe login/signup + rate-limit
  in-proces (app/web/ratelimit.py) pe signup si login
- signup/login/logout (app/web/auth_routes.py): signup tranzactie atomica,
  cheie-o-data, log SIGNUP pentru descoperire admin
- dashboard + import scoped pe contul sesiunii (regula NULL->cont 1); toate rutele
  web care ating date sensibile sub require_login; nomenclator ramane global
- banner "cont in asteptare" pentru conturi active=0
- gate worker: claim_one LEFT JOIN accounts COALESCE(active,1)=1 (account_id NULL=activ)

VERIFY context curat (2 runde): leak cross-account /_fragments/mapari prins+reparat.
/code-review high: csrf_token lipsa pe re-randari de eroare, scrypt_params ignorat,
login fara rate-limit -- toate reparate. 361 teste pass (de la 313).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 16:43:21 +00:00

194 lines
7.1 KiB
Python

"""Teste US-001 (PRD 3.3): tabela users + helper-e parole scrypt in app/users.py."""
from __future__ import annotations
import os
import sqlite3
import tempfile
import pytest
@pytest.fixture()
def conn(monkeypatch):
    """Yield a connection to a freshly initialised, throwaway database.

    AUTOPASS_DB_PATH is pointed at a file inside a new temporary directory,
    the cached settings are cleared so the new path is picked up, and
    ``init_db()`` is run before the connection is opened.

    Teardown closes the connection, clears the settings cache again and
    removes the temporary directory, even when setup itself fails.
    """
    import shutil

    tmp = tempfile.mkdtemp()
    try:
        monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_users.db"))
        from app.config import get_settings

        try:
            get_settings.cache_clear()
            from app.db import get_connection, init_db

            init_db()
            c = get_connection()
            try:
                yield c
            finally:
                c.close()
        finally:
            # Do not let settings bound to the temporary DB leak into other tests.
            get_settings.cache_clear()
    finally:
        # mkdtemp() never cleans up after itself; ignore errors so a stray
        # open handle cannot turn teardown into a test failure.
        shutil.rmtree(tmp, ignore_errors=True)
@pytest.fixture()
def account_id(conn):
    """Create a dedicated test account (not the default id=1) and return its id."""
    from app.accounts import create_account

    new_account = create_account(conn, "Service Test")
    return new_account
def test_create_user_hash_nu_e_plaintext(conn, account_id):
    """The stored password_hash neither contains nor equals the clear-text password."""
    from app.users import create_user

    plaintext = "parola_sigura_123"
    new_id = create_user(conn, account_id, "test@exemplu.ro", plaintext)
    assert isinstance(new_id, int)

    cursor = conn.execute(
        "SELECT password_hash, salt FROM users WHERE id=?", (new_id,)
    )
    stored = cursor.fetchone()
    assert stored is not None

    stored_hash = stored["password_hash"]
    assert stored_hash != plaintext
    assert plaintext not in stored_hash

    # The salt column must not hold the password either.
    assert stored["salt"] != plaintext
def test_verify_parola_corecta_si_gresita(conn, account_id):
    """verify_password returns account_id for the right password and None otherwise."""
    from app.users import create_user, verify_password

    email = "user@exemplu.ro"
    good_password = "parola_corecta_99"
    create_user(conn, account_id, email, good_password)

    # Correct credentials resolve to the owning account.
    assert verify_password(conn, email, good_password) == account_id

    # Wrong password for an existing user.
    assert verify_password(conn, email, "parola_gresita_00") is None

    # Unknown email, even with a password that is valid for someone else.
    assert verify_password(conn, "inexistent@exemplu.ro", good_password) is None
def test_email_unic_global(conn, account_id):
    """A second create_user whose email differs only in letter case raises ValueError."""
    from app.users import create_user

    create_user(conn, account_id, "Unic@exemplu.ro", "parola_unica_001")

    duplicate_rejected = pytest.raises(ValueError, match="email deja folosit")
    with duplicate_rejected:
        create_user(conn, account_id, "unic@exemplu.ro", "alta_parola_002")
def test_get_user_by_email(conn, account_id):
    """get_user_by_email returns metadata without password_hash and salt."""
    from app.users import create_user, get_user_by_email

    email = "meta@exemplu.ro"
    create_user(conn, account_id, email, "parola_meta_xyz")

    user = get_user_by_email(conn, email)
    assert user is not None
    assert user["email"].lower() == "meta@exemplu.ro"
    assert user["account_id"] == account_id

    # Metadata fields that must be exposed.
    for exposed in ("id", "is_admin", "email_verified", "created_at"):
        assert exposed in user

    # Credential material that must never be returned.
    for secret in ("password_hash", "salt"):
        assert secret not in user

    missing = get_user_by_email(conn, "inexistent@exemplu.ro")
    assert missing is None
def test_parola_scurta_si_lunga_eroare(conn, account_id):
    """Passwords shorter than 10 or longer than 128 characters raise ValueError (C9 anti-DoS).

    Both limits are probed on each side: 9 and 129 characters must be
    rejected, while exactly 10 and exactly 128 characters must be accepted.
    """
    from app.users import create_user

    with pytest.raises(ValueError):
        create_user(conn, account_id, "scurta@ex.ro", "scurt")
    # Lower boundary: one character below the documented minimum, so an
    # off-by-one in the length check cannot slip through.
    with pytest.raises(ValueError):
        create_user(conn, account_id, "scurta9@ex.ro", "a" * 9)
    with pytest.raises(ValueError):
        create_user(conn, account_id, "lunga@ex.ro", "x" * 129)

    # Exactly 10 characters must be accepted.
    uid = create_user(conn, account_id, "exact10@ex.ro", "a" * 10)
    assert uid > 0

    # Exactly 128 characters must be accepted.
    uid2 = create_user(conn, account_id, "exact128@ex.ro", "b" * 128)
    assert uid2 > 0
def test_verify_honoreaza_scrypt_params(conn, account_id, monkeypatch):
    """verify_password uses the parameters stored in the DB (scrypt_params), not the global constants.

    Simulates a cost migration: the hash is created with n=4 (old) and the
    module constant _N is then raised to 2**15 (new). verify_password must
    still return account_id by using n=4 from the DB rather than the global _N.
    """
    import hashlib
    import secrets
    import app.users as users_mod

    legacy_email = "legacy@test.com"
    legacy_password = "parolasecreta"

    # Hash with the "old" parameters (n=4 keeps the test fast).
    old_cost = {"n": 4, "r": 8, "p": 1}
    salt_bytes = secrets.token_bytes(16)
    digest = hashlib.scrypt(
        legacy_password.encode("utf-8"),
        salt=salt_bytes,
        maxmem=64 * 1024 * 1024,
        dklen=32,
        **old_cost,
    )

    insert_sql = (
        "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    row_values = (account_id, legacy_email, digest.hex(), salt_bytes.hex(), "n4_r8_p1")
    conn.execute(insert_sql, row_values)

    # Simulate the cost increase: _N is now larger than what the row was hashed with.
    monkeypatch.setattr(users_mod, "_N", 2**15)

    # The stored n=4 must be honoured instead of _N=2**15.
    outcome = users_mod.verify_password(conn, legacy_email, legacy_password)
    assert outcome == account_id, "verify_password trebuia sa onoreze scrypt_params din DB"

    wrong_outcome = users_mod.verify_password(conn, legacy_email, "gresita123456")
    assert wrong_outcome is None
def test_verify_params_corupti_return_none(conn, account_id):
    """A corrupt/unknown scrypt_params label makes verify return None (no crash)."""
    import hashlib
    import secrets

    bad_email = "corupt@test.com"
    clear_password = "parolasecreta"

    salt_bytes = secrets.token_bytes(16)
    digest = hashlib.scrypt(
        clear_password.encode(),
        salt=salt_bytes,
        n=4,
        r=8,
        p=1,
        maxmem=64 * 1024 * 1024,
        dklen=32,
    )

    # Store a valid hash, but tag it with a label that is not a known format.
    insert_sql = (
        "INSERT INTO users (account_id, email, password_hash, salt, scrypt_params) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    row_values = (account_id, bad_email, digest.hex(), salt_bytes.hex(), "FORMAT_NECUNOSCUT")
    conn.execute(insert_sql, row_values)

    from app.users import verify_password

    outcome = verify_password(conn, bad_email, clear_password)
    assert outcome is None, "Eticheta corupta trebuia sa returneze None, nu crash"
def test_init_db_pe_db_fara_users_creeaza_tabela(monkeypatch, tmp_path):
    """init_db on an existing DB that lacks the users table creates it without error.

    This exercises the idempotent-migration path: a database containing only
    the accounts table is built by hand, then ``init_db()`` is run against it
    and the resulting table list is checked for ``users``.
    """
    from contextlib import closing

    db_path = tmp_path / "veche.db"
    monkeypatch.setenv("AUTOPASS_DB_PATH", str(db_path))
    from app.config import get_settings

    get_settings.cache_clear()
    try:
        # Build a DB without the users table (simulates an old database).
        with closing(sqlite3.connect(str(db_path))) as legacy:
            legacy.execute("PRAGMA journal_mode = WAL")
            legacy.execute("PRAGMA foreign_keys = ON")
            legacy.execute(
                "CREATE TABLE IF NOT EXISTS accounts "
                "(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, "
                "cui TEXT, active INTEGER NOT NULL DEFAULT 1, "
                "rar_creds_enc TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"
            )
            legacy.execute(
                "INSERT OR IGNORE INTO accounts (id, name) VALUES (1, 'default')"
            )
            legacy.commit()

        # init_db must create the users table without raising.
        from app.db import init_db

        init_db()
        from app.db import get_connection

        with closing(get_connection()) as c:
            tables = {
                r[0]
                for r in c.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                ).fetchall()
            }
        assert "users" in tables
    finally:
        # Clear the cache even when the assertion fails, so settings bound to
        # this temporary DB do not leak into later tests.
        get_settings.cache_clear()