US-007: rute web proprii /cont/roteste-cheie + /cont/rar-creds scoped pe sesiune (C13), sectiune "Contul meu" cu cheie afisata o data. US-010: rol admin (users.is_admin) + require_admin->403 + CLI set-admin + bootstrap primul cont=admin (count_admins in BEGIN IMMEDIATE, anti-race). US-011: panou /admin (activare/dezactivare conturi, CSRF + PRG), link admin + logout pe dashboard. US-012: app/email.py notify_signup best-effort degradat fara SMTP + config smtp_*. Fix: migrare defensiva users.is_admin/email_verified in _migrate. VERIFY x2 context curat (PASS) + /code-review high. 393 teste pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
233 lines
8.6 KiB
Python
233 lines
8.6 KiB
Python
"""Teste US-007 (PRD 3.3b): sectiunea 'Contul meu' — rotire cheie API + creds RAR din UI.
|
|
|
|
TDD: testele se scriu INAINTE de implementare; la inceput pica (RED),
|
|
dupa implementare trec (GREEN).
|
|
|
|
Rute testate:
|
|
- GET /_fragments/cont -> card "Contul meu"
|
|
- POST /cont/roteste-cheie -> cheie noua afisata o singura data
|
|
- POST /cont/rar-creds -> seteaza rar_creds_enc per cont din sesiune
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import tempfile
|
|
|
|
import pytest
|
|
from starlette.testclient import TestClient
|
|
|
|
|
|
@pytest.fixture()
|
|
def client(monkeypatch):
|
|
"""Client fara web_auth_required (dev mode) — sesiunea se seteaza manual."""
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.main import app
|
|
with TestClient(app, follow_redirects=False) as c:
|
|
yield c
|
|
get_settings.cache_clear()
|
|
|
|
|
|
@pytest.fixture()
|
|
def client_prod(monkeypatch):
|
|
"""Client cu web_auth_required=True (mod prod) — CSRF enforce."""
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
|
|
monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true")
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.main import app
|
|
with TestClient(app, follow_redirects=False) as c:
|
|
yield c
|
|
get_settings.cache_clear()
|
|
|
|
|
|
def _create_account_user(email: str = "user@test.com", password: str = "parolasecreta10"):
|
|
"""Creeaza cont + user + cheie API initiala. Intoarce (acct_id, user_id, api_key)."""
|
|
from app.accounts import create_account
|
|
from app.users import create_user
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
|
|
conn = get_connection()
|
|
try:
|
|
acct_id = create_account(conn, "Service Test", active=True)
|
|
user_id = create_user(conn, acct_id, email, password)
|
|
api_key = create_api_key(conn, acct_id)
|
|
return acct_id, user_id, api_key
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _login(client, email: str, password: str) -> None:
|
|
"""Face login real prin HTTP si seteaza cookie-ul de sesiune pe client."""
|
|
# Obtine CSRF token de pe pagina de login
|
|
resp = client.get("/login")
|
|
assert resp.status_code == 200
|
|
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text)
|
|
if not m:
|
|
m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text)
|
|
assert m, "csrf_token negasit pe /login"
|
|
csrf = m.group(1)
|
|
|
|
resp = client.post("/login", data={
|
|
"email": email,
|
|
"parola": password,
|
|
"csrf_token": csrf,
|
|
})
|
|
# 303 redirect la / inseamna login reusit
|
|
assert resp.status_code == 303, f"Login esuat: {resp.status_code} {resp.text[:200]}"
|
|
|
|
|
|
def _get_csrf_from_fragment(client) -> str:
|
|
"""Obtine CSRF token din fragmentul /_fragments/cont."""
|
|
resp = client.get("/_fragments/cont")
|
|
assert resp.status_code == 200
|
|
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text)
|
|
if not m:
|
|
m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text)
|
|
assert m, f"csrf_token negasit in /_fragments/cont: {resp.text[:500]}"
|
|
return m.group(1)
|
|
|
|
|
|
# ============================================================
|
|
# test_roteste_cheie_afisata_o_data
|
|
# ============================================================
|
|
|
|
def test_roteste_cheie_afisata_o_data(client):
|
|
"""User logat roteste cheia: raspunsul contine 'rfak_'; cheia veche revocata."""
|
|
acct_id, user_id, api_key_initiala = _create_account_user("roteste@test.com")
|
|
_login(client, "roteste@test.com", "parolasecreta10")
|
|
|
|
csrf = _get_csrf_from_fragment(client)
|
|
resp = client.post("/cont/roteste-cheie", data={"csrf_token": csrf})
|
|
|
|
assert resp.status_code == 200
|
|
assert "rfak_" in resp.text, f"Cheia noua nu apare in raspuns: {resp.text[:500]}"
|
|
|
|
# Verifica in DB: cheia veche revocata, una noua activa
|
|
from app.db import get_connection
|
|
conn = get_connection()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT id, active FROM api_keys WHERE account_id=? ORDER BY id",
|
|
(acct_id,),
|
|
).fetchall()
|
|
# Trebuie sa avem minim 2 chei: cea initiala (active=0) si cea noua (active=1)
|
|
active_keys = [r for r in rows if r["active"] == 1]
|
|
inactive_keys = [r for r in rows if r["active"] == 0]
|
|
assert len(active_keys) == 1, f"Trebuia exact 1 cheie activa, gasit: {len(active_keys)}"
|
|
assert len(inactive_keys) >= 1, "Cheia veche trebuia revocata (active=0)"
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ============================================================
|
|
# test_set_creds_rar_din_sesiune
|
|
# ============================================================
|
|
|
|
def test_set_creds_rar_din_sesiune(client):
|
|
"""User logat seteaza creds RAR: accounts.rar_creds_enc != NULL, decriptabil."""
|
|
acct_id, user_id, _ = _create_account_user("creds@test.com")
|
|
_login(client, "creds@test.com", "parolasecreta10")
|
|
|
|
csrf = _get_csrf_from_fragment(client)
|
|
resp = client.post("/cont/rar-creds", data={
|
|
"csrf_token": csrf,
|
|
"rar_email": "user@rar.ro",
|
|
"rar_parola": "parolaRAR123",
|
|
})
|
|
|
|
assert resp.status_code == 200
|
|
# Mesaj de succes in raspuns
|
|
assert "succes" in resp.text.lower() or "salvat" in resp.text.lower() or "configurat" in resp.text.lower(), \
|
|
f"Mesaj de succes lipsa: {resp.text[:500]}"
|
|
|
|
# Verifica in DB: rar_creds_enc setat si decriptabil
|
|
from app.db import get_connection
|
|
from app.crypto import decrypt_creds
|
|
conn = get_connection()
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT rar_creds_enc FROM accounts WHERE id=?", (acct_id,)
|
|
).fetchone()
|
|
assert row is not None
|
|
assert row["rar_creds_enc"] is not None, "rar_creds_enc trebuia setat"
|
|
creds = decrypt_creds(row["rar_creds_enc"])
|
|
assert creds is not None
|
|
assert creds.get("email") == "user@rar.ro"
|
|
assert creds.get("password") == "parolaRAR123"
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ============================================================
|
|
# test_creds_alt_cont_neafectat
|
|
# ============================================================
|
|
|
|
def test_creds_alt_cont_neafectat(client):
|
|
"""User A seteaza creds -> contul B ramane cu rar_creds_enc NULL."""
|
|
acct_a, user_a, _ = _create_account_user("userA@test.com")
|
|
acct_b, user_b, _ = _create_account_user("userB@test.com")
|
|
|
|
# Logam user A si setam creds
|
|
_login(client, "userA@test.com", "parolasecreta10")
|
|
csrf = _get_csrf_from_fragment(client)
|
|
resp = client.post("/cont/rar-creds", data={
|
|
"csrf_token": csrf,
|
|
"rar_email": "a@rar.ro",
|
|
"rar_parola": "parolaA123",
|
|
})
|
|
assert resp.status_code == 200
|
|
|
|
# Verifica: contul A are creds, contul B ramane NULL
|
|
from app.db import get_connection
|
|
conn = get_connection()
|
|
try:
|
|
row_a = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=?", (acct_a,)).fetchone()
|
|
row_b = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=?", (acct_b,)).fetchone()
|
|
assert row_a["rar_creds_enc"] is not None, "Contul A trebuia sa aiba creds"
|
|
assert row_b["rar_creds_enc"] is None, "Contul B nu trebuia atins"
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ============================================================
|
|
# test_roteste_fara_csrf_403_in_prod
|
|
# ============================================================
|
|
|
|
def test_roteste_fara_csrf_403_in_prod(client_prod):
|
|
"""Prod + sesiune autentificata + CSRF lipsa -> 403."""
|
|
# Cream cont + user
|
|
acct_id, user_id, _ = _create_account_user("csrf_test@test.com")
|
|
|
|
# Login real
|
|
_login(client_prod, "csrf_test@test.com", "parolasecreta10")
|
|
|
|
# POST fara csrf_token (sau cu token gresit)
|
|
resp = client_prod.post("/cont/roteste-cheie", data={"csrf_token": "token_gresit"})
|
|
assert resp.status_code == 403, f"Trebuia 403, got {resp.status_code}"
|
|
|
|
|
|
# ============================================================
|
|
# test_fragment_cont_nelogat_redirect
|
|
# ============================================================
|
|
|
|
def test_fragment_cont_nelogat_redirect(monkeypatch):
|
|
"""Fara sesiune + web_auth_required=True -> 303 redirect /login."""
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t_nl.db"))
|
|
monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true")
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.main import app
|
|
with TestClient(app, follow_redirects=False) as c:
|
|
resp = c.get("/_fragments/cont")
|
|
assert resp.status_code == 303
|
|
assert "/login" in resp.headers.get("location", "")
|
|
get_settings.cache_clear()
|