"""Teste US-013 (PRD 5.20): retragere coloana legacy accounts.rar_creds_enc. Teste: test_niciun_read_pe_coloana_veche -- DB fresh: accounts N-are rar_creds_enc, submissions ARE test_conturi_rar_creds_env_aware -- POST /v1/conturi/rar-creds scrie in slotul per-env corect test_are_creds_pe_per_env -- are_creds (web/integrare) reflecta sloturile per-env test_drop_cu_garda_blocat_daca_lipsa_copiere -- garda refuza DROP cand exista orfane """ from __future__ import annotations import os import sqlite3 import tempfile import pytest from cryptography.fernet import Fernet from starlette.testclient import TestClient # --------------------------------------------------------------------------- # Fixture # --------------------------------------------------------------------------- @pytest.fixture() def client(monkeypatch): """Client izolat cu DB temporara + cheie Fernet pentru criptare creds.""" tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t_retragere.db")) monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode()) monkeypatch.setenv("AUTOPASS_RAR_ENV", "test") monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "false") from app.config import get_settings from app import crypto get_settings.cache_clear() crypto.reset_cache() from app.main import app with TestClient(app, follow_redirects=False) as c: yield c get_settings.cache_clear() crypto.reset_cache() @pytest.fixture() def conn_izolat(monkeypatch): """Conexiune directa la DB temporara (fara app), pentru teste de migrare in-process.""" tmp = tempfile.mkdtemp() db_path = os.path.join(tmp, "t_migrare.db") monkeypatch.setenv("AUTOPASS_DB_PATH", db_path) monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode()) monkeypatch.setenv("AUTOPASS_RAR_ENV", "test") from app.config import get_settings from app import crypto get_settings.cache_clear() crypto.reset_cache() from app.db import init_db, get_connection init_db() conn = get_connection() conn.row_factory = sqlite3.Row yield conn conn.close() get_settings.cache_clear() crypto.reset_cache() # --------------------------------------------------------------------------- # Helpere # --------------------------------------------------------------------------- def _create_account_user( name: str = "Service Test SRL", email: str = "user@test.com", password: str = "parolasecreta10", ): """Creeaza cont + user. Returneaza (acct_id, user_id).""" from app.accounts import create_account from app.users import create_user from app.db import get_connection conn = get_connection() try: acct_id = create_account(conn, name, active=True) user_id = create_user(conn, acct_id, email, password) return acct_id, user_id finally: conn.close() def _login_web(client, email: str, password: str) -> None: """Face login prin HTTP si seteaza cookie-ul de sesiune.""" import re resp = client.get("/login") assert resp.status_code == 200 m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) if not m: m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) assert m, "csrf_token negasit pe /login" csrf = m.group(1) resp = client.post("/login", data={"email": email, "parola": password, "csrf_token": csrf}) assert resp.status_code == 303, f"Login esuat: {resp.status_code} {resp.text[:200]}" # --------------------------------------------------------------------------- # Teste # --------------------------------------------------------------------------- def test_niciun_read_pe_coloana_veche(client): """DB fresh dupa init_db NU are accounts.rar_creds_enc; submissions INCA are rar_creds_enc.""" from app.db import get_connection conn = get_connection() try: acc_cols = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()} sub_cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()} finally: conn.close() assert "rar_creds_enc" not in acc_cols, ( "accounts.rar_creds_enc INCA prezenta dupa init_db pe DB fresh — DROP esuat sau schema neactualizata" ) assert "rar_creds_enc" in sub_cols, ( "submissions.rar_creds_enc TREBUIE sa ramana (creds efemere per-cerere)" ) def test_conturi_rar_creds_env_aware(client): """POST /v1/conturi/rar-creds cu rar_target='test' scrie in rar_creds_test_enc + rar_test_enabled=1. Fara rar_target -> scrie in slotul ancorei globale (AUTOPASS_RAR_ENV='test'). """ from app.db import get_connection acct_id, _ = _create_account_user("Firma ENV1", "env1@test.com") acct2_id, _ = _create_account_user("Firma ENV2", "env2@test.com") # Cu rar_target='test' explicit resp = client.post( "/v1/conturi/rar-creds", json={"email": "rar@firma.ro", "password": "parolatest", "rar_target": "test"}, headers={"X-Account-ID": str(acct_id)}, ) # Fara cheie API in dev, cont implicit id=1; dar testam prin conn directa # POST cu rar_target='test' pe contul 1 (dev, WEB_AUTH_REQUIRED=false) resp_t = client.post( "/v1/conturi/rar-creds", json={"email": "rar_test@firma.ro", "password": "parolatest123", "rar_target": "test"}, ) assert resp_t.status_code == 200, f"set_rar_creds(test) esuat: {resp_t.text}" body_t = resp_t.json() assert body_t.get("ok") is True assert body_t.get("rar_env") == "test", f"rar_env asteptat 'test', primit: {body_t}" conn = get_connection() try: row_t = conn.execute( "SELECT rar_creds_test_enc, rar_test_enabled FROM accounts WHERE id=1" ).fetchone() finally: conn.close() assert row_t["rar_creds_test_enc"] is not None, "rar_creds_test_enc trebuia scris" assert row_t["rar_test_enabled"] == 1, "rar_test_enabled trebuia setat la 1" # Fara rar_target -> ancora globala = 'test' (AUTOPASS_RAR_ENV=test) resp_n = client.post( "/v1/conturi/rar-creds", json={"email": "rar_noenv@firma.ro", "password": "paroladefault"}, ) assert resp_n.status_code == 200, f"set_rar_creds(no env) esuat: {resp_n.text}" body_n = resp_n.json() assert body_n.get("rar_env") == "test", f"rar_env implicit asteptat 'test': {body_n}" # Cu rar_target='prod' -> scrie in rar_creds_prod_enc resp_p = client.post( "/v1/conturi/rar-creds", json={"email": "rar_prod@firma.ro", "password": "parolaprod456", "rar_target": "prod"}, ) assert resp_p.status_code == 200, f"set_rar_creds(prod) esuat: {resp_p.text}" body_p = resp_p.json() assert body_p.get("rar_env") == "prod", f"rar_env asteptat 'prod': {body_p}" conn = get_connection() try: row_p = conn.execute( "SELECT rar_creds_prod_enc, rar_prod_enabled FROM accounts WHERE id=1" ).fetchone() finally: conn.close() assert row_p["rar_creds_prod_enc"] is not None, "rar_creds_prod_enc trebuia scris la rar_target=prod" assert row_p["rar_prod_enabled"] == 1, "rar_prod_enabled trebuia setat la 1" def test_are_creds_pe_per_env(client): """GET /v1/ping are_creds_rar/are_creds_test/are_creds_prod reflecta sloturile per-env. Fara creds -> are_creds_rar=False. Cu creds test -> are_creds_test=True, are_creds_rar=True. """ # Stare initiala: niciun slot cu creds resp0 = client.get("/v1/ping") assert resp0.status_code == 200 body0 = resp0.json() assert body0.get("are_creds_rar") is False, f"are_creds_rar asteptat False initial: {body0}" assert body0.get("are_creds_test") is False, f"are_creds_test asteptat False: {body0}" assert body0.get("are_creds_prod") is False, f"are_creds_prod asteptat False: {body0}" # Seteaza creds test client.post( "/v1/conturi/rar-creds", json={"email": "rar@test.ro", "password": "parola123", "rar_target": "test"}, ) resp1 = client.get("/v1/ping") assert resp1.status_code == 200 body1 = resp1.json() assert body1.get("are_creds_rar") is True, f"are_creds_rar asteptat True dupa set test: {body1}" assert body1.get("are_creds_test") is True, f"are_creds_test asteptat True: {body1}" assert body1.get("are_creds_prod") is False, f"are_creds_prod asteptat False: {body1}" # DELETE ?env=test -> test dispare, prod ramane False resp_d = client.delete("/v1/conturi/rar-creds?env=test") assert resp_d.status_code == 200 resp2 = client.get("/v1/ping") body2 = resp2.json() assert body2.get("are_creds_rar") is False, f"are_creds_rar asteptat False dupa delete test: {body2}" assert body2.get("are_creds_test") is False def test_drop_cu_garda_blocat_daca_lipsa_copiere(conn_izolat): """Garda refuza DROP cand exista cont cu rar_creds_enc dar ambele sloturi per-env goale. Apelam _garda_si_drop direct (fara re-backfill 6c) pe o stare adversariala: cont cu valoare in coloana legacy, rar_creds_test_enc=NULL, rar_creds_prod_enc=NULL. Rezultat asteptat: DROP refuzat (coloana inca prezenta pe accounts). """ conn = conn_izolat # Adaugam manual coloana legacy daca nu exista (DB fresh a dropat-o deja prin _drop_legacy) acc_cols = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()} if "rar_creds_enc" not in acc_cols: conn.execute("ALTER TABLE accounts ADD COLUMN rar_creds_enc TEXT") conn.commit() # Stare adversariala: insert valoare dummy in coloana legacy, sloturi per-env goale conn.execute( "UPDATE accounts SET rar_creds_enc='DUMMY_CRIPTAT', " "rar_creds_test_enc=NULL, rar_creds_prod_enc=NULL WHERE id=1" ) conn.commit() # Verifica ca starea adversariala e corecta inainte de test row_before = conn.execute( "SELECT rar_creds_enc, rar_creds_test_enc, rar_creds_prod_enc FROM accounts WHERE id=1" ).fetchone() assert row_before["rar_creds_enc"] == "DUMMY_CRIPTAT" assert row_before["rar_creds_test_enc"] is None assert row_before["rar_creds_prod_enc"] is None # Apelam _garda_si_drop direct (fara re-backfill 6c) from app.db import _garda_si_drop _garda_si_drop(conn) # Garda trebuia sa refuze DROP-ul: coloana rar_creds_enc inca prezenta pe accounts acc_cols_after = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()} assert "rar_creds_enc" in acc_cols_after, ( "Garda a permis DROP cand exista orfane — pierdere de date (BUG CRITIC)" ) # Valoarea legacy trebuie sa fie inca prezenta (neatinsa) row_after = conn.execute( "SELECT rar_creds_enc FROM accounts WHERE id=1" ).fetchone() assert row_after["rar_creds_enc"] == "DUMMY_CRIPTAT", ( "Valoarea rar_creds_enc disparuta desi DROP a fost refuzat" )