Files
rar-autopass/tests/test_web_cont.py
Claude Agent b1d825e66b feat(5.20): US-013 retragere accounts.rar_creds_enc -> per-env + DROP cu garda
Toate citirile pe coloana legacy accounts.rar_creds_enc mutate pe sloturile
per-env (rar_creds_test_enc/rar_creds_prod_enc): worker fallback+keepalive,
are_creds (web) si are_creds_rar (integrare, +are_creds_test/_prod), write-back
API la reactivare, purjare la stergere cont, _get_acasa_context/_fetch_cont_env_state.

Contract API (aditiv): POST /v1/conturi/rar-creds primeste rar_target optional
(test/prod), scrie in slotul corect + activeaza mediul; DELETE primeste ?env
(sterge un slot sau ambele). Documentat in docs/api-rar-contract.md.

DROP cu garda in db.py (schema.sql fara coloana pe DB fresh):
- 6a: eliminat ADD COLUMN rar_creds_enc (fara ping-pong re-ADD dupa DROP)
- 6b: try/except fail-safe (nu crapa boot-ul) + garda sqlite_version >= 3.35
- 6c: re-backfill old->new imediat inainte de assert (ancora globala)
- garda orfane: DROP anulat daca vreun creds legacy nu a aterizat in slot per-env
- backup criptat accounts_rar_creds_enc_backup inainte de DROP
- 6d: verificare prin PRAGMA table_info (NU grep — submissions are aceeasi coloana)
Garda one-way, idempotenta la boot repetat (verificat). submissions.rar_creds_enc
ramane neatinsa.

tests/test_retragere_creds_enc.py: niciun read pe coloana veche, conturi rar-creds
env-aware, are_creds per-env, DROP blocat de garda la lipsa copierii. 9 teste
existente actualizate pe sloturi per-env.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-02 21:03:08 +00:00

363 lines
14 KiB
Python

"""Teste US-007 (PRD 3.3b): sectiunea 'Contul meu' — rotire cheie API + creds RAR din UI.
TDD: testele se scriu INAINTE de implementare; la inceput pica (RED),
dupa implementare trec (GREEN).
Routes under test:
- GET /_fragments/cont -> "Contul meu" card
- POST /cont/roteste-cheie -> new API key, shown only once
- POST /cont/rar-creds -> stores RAR creds in a per-env slot
  (rar_creds_test_enc / rar_creds_prod_enc) of the session's account (US-013)
"""
from __future__ import annotations
import os
import re
import tempfile
import pytest
from starlette.testclient import TestClient
@pytest.fixture()
def client(monkeypatch, tmp_path):
    """Yield a TestClient in dev mode (web auth not required).

    Tests establish a session themselves by logging in over HTTP.

    The database path is placed under pytest's ``tmp_path`` so pytest owns the
    directory's lifetime; the previous ``tempfile.mkdtemp()`` directory was
    never removed and accumulated one SQLite file per test.
    """
    monkeypatch.setenv("AUTOPASS_DB_PATH", str(tmp_path / "t.db"))
    from app.config import get_settings

    # Settings are cached; clear them so they are rebuilt after the
    # environment patch above (presumably when the app starts up).
    get_settings.cache_clear()
    from app.main import app

    try:
        with TestClient(app, follow_redirects=False) as c:
            yield c
    finally:
        # Clear again so later tests never inherit this test's settings,
        # even if client start-up or shutdown raised.
        get_settings.cache_clear()
@pytest.fixture()
def client_prod(monkeypatch, tmp_path):
    """Yield a TestClient with web_auth_required=True (prod mode, CSRF enforced).

    The database path is placed under pytest's ``tmp_path`` so pytest owns the
    directory's lifetime; the previous ``tempfile.mkdtemp()`` directory was
    never removed and accumulated one SQLite file per test.
    """
    monkeypatch.setenv("AUTOPASS_DB_PATH", str(tmp_path / "t.db"))
    monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true")
    from app.config import get_settings

    # Settings are cached; clear them so they are rebuilt after the
    # environment patches above (presumably when the app starts up).
    get_settings.cache_clear()
    from app.main import app

    try:
        with TestClient(app, follow_redirects=False) as c:
            yield c
    finally:
        # Clear again so later tests never inherit the prod-mode settings,
        # even if client start-up or shutdown raised.
        get_settings.cache_clear()
def _create_account_user(email: str = "user@test.com", password: str = "parolasecreta10"):
    """Create an account, a user on it, and an initial API key.

    Returns the tuple ``(acct_id, user_id, api_key)``.
    """
    from app.accounts import create_account
    from app.users import create_user
    from app.auth import create_api_key
    from app.db import get_connection

    db = get_connection()
    try:
        account = create_account(db, "Service Test", active=True)
        user = create_user(db, account, email, password)
        key = create_api_key(db, account)
    finally:
        # Release the connection whether or not the inserts succeeded.
        db.close()
    return account, user, key
def _extract_csrf_token(html: str) -> str | None:
    """Return the value of the ``csrf_token`` input in *html*, or ``None``.

    Both attribute orders are accepted: ``name`` followed by ``value`` and
    ``value`` followed by ``name``.
    """
    for pattern in (
        r'name="csrf_token"\s+value="([^"]+)"',
        r'value="([^"]+)"\s+name="csrf_token"',
    ):
        found = re.search(pattern, html)
        if found:
            return found.group(1)
    return None


def _login(client, email: str, password: str) -> None:
    """Log in over HTTP so the session cookie ends up on *client*.

    Fetches ``/login`` to obtain a CSRF token, then posts the credentials.
    Fails the calling test if the page is unavailable, the token is missing,
    or the login does not answer with a 303 redirect.
    """
    # Get the CSRF token from the login page.
    resp = client.get("/login")
    assert resp.status_code == 200
    csrf = _extract_csrf_token(resp.text)
    assert csrf is not None, "csrf_token negasit pe /login"
    resp = client.post("/login", data={
        "email": email,
        "parola": password,
        "csrf_token": csrf,
    })
    # A 303 redirect means the login succeeded.
    assert resp.status_code == 303, f"Login esuat: {resp.status_code} {resp.text[:200]}"


def _get_csrf_from_fragment(client) -> str:
    """Return the CSRF token embedded in the ``/_fragments/cont`` fragment."""
    resp = client.get("/_fragments/cont")
    assert resp.status_code == 200
    csrf = _extract_csrf_token(resp.text)
    assert csrf is not None, f"csrf_token negasit in /_fragments/cont: {resp.text[:500]}"
    return csrf
# ============================================================
# test_roteste_cheie_afisata_o_data
# ============================================================
def test_roteste_cheie_afisata_o_data(client):
    """A logged-in user rotates the key: the response contains 'rfak_' and the old key is revoked."""
    email = "roteste@test.com"
    acct_id, _user_id, _initial_key = _create_account_user(email)
    _login(client, email, "parolasecreta10")
    token = _get_csrf_from_fragment(client)

    resp = client.post("/cont/roteste-cheie", data={"csrf_token": token})
    assert resp.status_code == 200
    assert "rfak_" in resp.text, f"Cheia noua nu apare in raspuns: {resp.text[:500]}"

    # Check the DB: the old key is revoked and exactly one key is active.
    from app.db import get_connection

    conn = get_connection()
    try:
        key_rows = conn.execute(
            "SELECT id, active FROM api_keys WHERE account_id=? ORDER BY id",
            (acct_id,),
        ).fetchall()
        # At least two keys are expected: the initial one (active=0) and the new one (active=1).
        active_count = sum(1 for key in key_rows if key["active"] == 1)
        inactive_count = sum(1 for key in key_rows if key["active"] == 0)
        assert active_count == 1, f"Trebuia exact 1 cheie activa, gasit: {active_count}"
        assert inactive_count >= 1, "Cheia veche trebuia revocata (active=0)"
    finally:
        conn.close()
# ============================================================
# test_set_creds_rar_din_sesiune
# ============================================================
def test_set_creds_rar_din_sesiune(client):
    """A logged-in user sets RAR creds: a per-env slot (US-013) is non-NULL and decryptable."""
    email = "creds@test.com"
    acct_id, _user_id, _api_key = _create_account_user(email)
    _login(client, email, "parolasecreta10")
    form = {
        "csrf_token": _get_csrf_from_fragment(client),
        "rar_email": "user@rar.ro",
        "rar_parola": "parolaRAR123",
    }
    resp = client.post("/cont/rar-creds", data=form)
    assert resp.status_code == 200

    # The response must carry some success wording.
    body = resp.text.lower()
    assert any(word in body for word in ("succes", "salvat", "configurat")), \
        f"Mesaj de succes lipsa: {resp.text[:500]}"

    # Check the DB: a per-env slot is set and decryptable
    # (US-013: the legacy rar_creds_enc column was dropped).
    from app.db import get_connection
    from app.crypto import decrypt_creds

    conn = get_connection()
    try:
        slots = conn.execute(
            "SELECT rar_creds_test_enc, rar_creds_prod_enc FROM accounts WHERE id=?", (acct_id,)
        ).fetchone()
        assert slots is not None
        # Either environment slot counts; the test slot is checked first.
        enc = slots["rar_creds_test_enc"] or slots["rar_creds_prod_enc"]
        assert enc is not None, "Cel putin un slot per-env trebuia setat (US-013)"
        creds = decrypt_creds(enc)
        assert creds is not None
        assert creds.get("email") == "user@rar.ro"
        assert creds.get("password") == "parolaRAR123"
    finally:
        conn.close()
# ============================================================
# test_creds_alt_cont_neafectat
# ============================================================
def test_creds_alt_cont_neafectat(client):
    """User A sets creds -> account B stays without creds (per-env slots NULL, US-013)."""
    acct_a, _user_a, _key_a = _create_account_user("userA@test.com")
    acct_b, _user_b, _key_b = _create_account_user("userB@test.com")

    # Log in as user A and set the creds.
    _login(client, "userA@test.com", "parolasecreta10")
    token = _get_csrf_from_fragment(client)
    resp = client.post("/cont/rar-creds", data={
        "csrf_token": token,
        "rar_email": "a@rar.ro",
        "rar_parola": "parolaA123",
    })
    assert resp.status_code == 200

    # Account A has creds in a per-env slot; account B stays NULL (US-013).
    from app.db import get_connection

    slot_query = "SELECT rar_creds_test_enc, rar_creds_prod_enc FROM accounts WHERE id=?"
    conn = get_connection()
    try:
        rows = [
            conn.execute(slot_query, (account,)).fetchone()
            for account in (acct_a, acct_b)
        ]
        enc_a, enc_b = (
            row["rar_creds_test_enc"] or row["rar_creds_prod_enc"] for row in rows
        )
        assert enc_a is not None, "Contul A trebuia sa aiba creds in slotul per-env"
        assert enc_b is None, "Contul B nu trebuia atins"
    finally:
        conn.close()
# ============================================================
# test_roteste_fara_csrf_403_in_prod
# ============================================================
def test_roteste_fara_csrf_403_in_prod(client_prod):
    """Prod mode + authenticated session + invalid CSRF token -> 403.

    The request deliberately sends a wrong token rather than omitting the field.
    """
    email = "csrf_test@test.com"
    # Create the account and user, then log in for real.
    _create_account_user(email)
    _login(client_prod, email, "parolasecreta10")

    bad_form = {"csrf_token": "token_gresit"}
    resp = client_prod.post("/cont/roteste-cheie", data=bad_form)
    assert resp.status_code == 403, f"Trebuia 403, got {resp.status_code}"
# ============================================================
# test_fragment_cont_nelogat_redirect
# ============================================================
def test_fragment_cont_nelogat_redirect(client_prod):
    """No session + web_auth_required=True -> 303 redirect to /login.

    Uses the ``client_prod`` fixture instead of building an identical client
    inline: the fixture sets the same environment variables and also clears
    the settings cache on teardown, which the inline version skipped whenever
    an assertion failed.
    """
    # No login is performed, so the request carries no session.
    resp = client_prod.get("/_fragments/cont")
    assert resp.status_code == 303
    assert "/login" in resp.headers.get("location", "")
# ============================================================
# US-002: sectiunea 'Date firma' + banner cont incomplet
# ============================================================
def _create_complete_account(
    name: str = "Firma Test SRL",
    login_email: str = "firma_test@test.com",
    account_email: str = "contact@firma.com",
    cui: str = "RO12345678",
    password: str = "parolasecreta10",
):
    """Create a COMPLETE account (name + email + CUI) plus a user.

    Returns the tuple ``(acct_id, user_id)``.
    """
    from app.accounts import create_account
    from app.users import create_user
    from app.db import get_connection

    db = get_connection()
    try:
        account = create_account(db, name, cui=cui, email=account_email, active=True)
        user = create_user(db, account, login_email, password)
    finally:
        # Release the connection whether or not the inserts succeeded.
        db.close()
    return account, user
def test_cont_afiseaza_companie_email_cui(client):
    """The /_fragments/cont fragment has a 'Date firma' section prefilled with company, email and CUI."""
    company = "Test Firma SRL"
    contact_email = "contact_tf@test.com"
    cui = "RO11111111"
    login_email = "tfirma@test.com"
    _create_complete_account(
        name=company,
        login_email=login_email,
        account_email=contact_email,
        cui=cui,
    )
    _login(client, login_email, "parolasecreta10")

    resp = client.get("/_fragments/cont")
    assert resp.status_code == 200
    html = resp.text
    # The section is recognised by its title or by its CSS/id-style marker.
    assert any(marker in html for marker in ("Date firma", "date-firma")), \
        f"Sectiunea 'Date firma' lipseste: {html[:500]}"
    assert company in html, f"Compania nu e prefilled: {html[:500]}"
    assert contact_email in html, f"Email-ul nu e prefilled: {html[:500]}"
    assert cui in html, f"CUI-ul nu e prefilled: {html[:500]}"
def test_post_date_firma_actualizeaza(client):
    """POST /cont/date-firma updates accounts.name, accounts.email and accounts.cui in the DB."""
    login_email = "update_df@test.com"
    acct_id, _user_id, _api_key = _create_account_user(login_email)
    _login(client, login_email, "parolasecreta10")
    token = _get_csrf_from_fragment(client)

    # Expected column values, keyed by the accounts column they land in.
    expected = {
        "name": "Firma Actualizata SRL",
        "email": "contact@firma-act.com",
        "cui": "RO99887766",
    }
    resp = client.post("/cont/date-firma", data={
        "csrf_token": token,
        "companie": expected["name"],
        "email": expected["email"],
        "cui": expected["cui"],
    })
    assert resp.status_code == 200, f"POST /cont/date-firma a returnat {resp.status_code}"

    # Check that the DB row was updated.
    from app.db import get_connection

    conn = get_connection()
    try:
        row = conn.execute(
            "SELECT name, email, cui FROM accounts WHERE id=?", (acct_id,)
        ).fetchone()
        for column, value in expected.items():
            assert row[column] == value, f"{column} neschimbat: {row[column]}"
    finally:
        conn.close()
def test_post_date_firma_cui_duplicat_eroare(client):
    """POST /cont/date-firma with a CUI already used by another account -> error in the response."""
    taken_cui = "ROAAA11111"
    # Account A already owns the CUI.
    _create_complete_account(
        name="Firma A SRL",
        login_email="firma_a_dup@test.com",
        account_email="a_dup@test.com",
        cui=taken_cui,
    )
    # Account B has no CUI yet.
    acct_b, _user_b, _key_b = _create_account_user("firma_b_dup@test.com")
    _login(client, "firma_b_dup@test.com", "parolasecreta10")
    token = _get_csrf_from_fragment(client)

    resp = client.post("/cont/date-firma", data={
        "csrf_token": token,
        "companie": "Firma B SRL",
        "email": "firma_b_dup@test.com",
        "cui": taken_cui,  # A's CUI: a duplicate
    })
    assert resp.status_code == 200
    lowered = resp.text.lower()
    assert any(word in lowered for word in ("deja", "duplicat", "folosit", "eroare")), \
        f"Mesaj eroare CUI duplicat lipsa: {resp.text[:500]}"

    # Account B must not have ended up with A's CUI in the DB.
    from app.db import get_connection

    conn = get_connection()
    try:
        row = conn.execute("SELECT cui FROM accounts WHERE id=?", (acct_b,)).fetchone()
        assert row["cui"] != taken_cui, "CUI-ul duplicat a fost totusi salvat in DB"
    finally:
        conn.close()
def test_banner_cont_incomplet_pe_legacy(client):
    """The home page shows the 'Completeaza datele firmei' banner when the account is incomplete (no email/CUI)."""
    login_email = "legacy_test@test.com"
    # Legacy-style account: created without the email/CUI fields.
    _create_account_user(login_email)
    _login(client, login_email, "parolasecreta10")

    resp = client.get("/")
    assert resp.status_code == 200
    page = resp.text.lower()
    # The banner must be present while the account is incomplete.
    assert any(hint in page for hint in ("completeaza", "date firm", "incomplet")), \
        f"Banner 'Completeaza datele firmei' lipsa pe Acasa: {resp.text[:2000]}"