"""Teste US-004: POST /integrare/test-cheie — verifica cheia API lipita de utilizator. TDD: testele se scriu INAINTE de implementare; la inceput pica (RED), dupa implementare trec (GREEN). Ruta testata: - POST /integrare/test-cheie -> fragment HTML cu rezultat validare """ from __future__ import annotations import os import re import tempfile import pytest from starlette.testclient import TestClient # --------------------------------------------------------------------------- # Fixture-uri # --------------------------------------------------------------------------- @pytest.fixture() def client(monkeypatch): """Client fara web_auth_required (dev mode) — sesiunea se seteaza manual.""" tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) from app.config import get_settings get_settings.cache_clear() from app.main import app with TestClient(app, follow_redirects=False) as c: yield c get_settings.cache_clear() @pytest.fixture() def client_prod(monkeypatch): """Client cu web_auth_required=True (mod prod) — CSRF enforce.""" tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true") from app.config import get_settings get_settings.cache_clear() from app.main import app with TestClient(app, follow_redirects=False) as c: yield c get_settings.cache_clear() # --------------------------------------------------------------------------- # Helper-e # --------------------------------------------------------------------------- def _create_account_user(email: str = "user@test.com", password: str = "parolasecreta10"): """Creeaza cont + user + cheie API initiala. Intoarce (acct_id, user_id, api_key).""" from app.accounts import create_account from app.users import create_user from app.auth import create_api_key from app.db import get_connection conn = get_connection() try: acct_id = create_account(conn, "Service Test", active=True) user_id = create_user(conn, acct_id, email, password) api_key = create_api_key(conn, acct_id) return acct_id, user_id, api_key finally: conn.close() def _login(client, email: str, password: str) -> None: """Face login real prin HTTP si seteaza cookie-ul de sesiune pe client.""" resp = client.get("/login") assert resp.status_code == 200 m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) if not m: m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) assert m, "csrf_token negasit pe /login" csrf = m.group(1) resp = client.post("/login", data={ "email": email, "parola": password, "csrf_token": csrf, }) assert resp.status_code == 303, f"Login esuat: {resp.status_code} {resp.text[:200]}" def _get_csrf_from_fragment(client) -> str: """Obtine CSRF token din fragmentul /_fragments/cont.""" resp = client.get("/_fragments/cont") assert resp.status_code == 200 m = re.search(r'name="csrf_token"\s+value="([^"]+)"', resp.text) if not m: m = re.search(r'value="([^"]+)"\s+name="csrf_token"', resp.text) assert m, f"csrf_token negasit in /_fragments/cont: {resp.text[:500]}" return m.group(1) # --------------------------------------------------------------------------- # Teste # --------------------------------------------------------------------------- def test_cheie_valida_a_contului_curent_ok(client): """Cheie activa a contului sesiunii -> raspuns de succes cu mention cont.""" acct_id, user_id, api_key = _create_account_user("cheie_ok@test.com") _login(client, "cheie_ok@test.com", "parolasecreta10") csrf = _get_csrf_from_fragment(client) resp = client.post("/integrare/test-cheie", data={ "csrf_token": csrf, "api_key": api_key, }) assert resp.status_code == 200 body = resp.text.lower() # Trebuie sa contina un mesaj de succes (clasa flash sau text "valida") assert "valida" in body or "succes" in body, f"Mesaj de succes lipsa: {resp.text[:500]}" # Trebuie sa mentioneze contul (account_id) assert str(acct_id) in resp.text, f"account_id {acct_id} absent din raspuns: {resp.text[:500]}" def test_cheie_a_altui_cont_respinsa(client): """Cheia unui alt cont -> mesaj neutru 'nu apartine contului tau', fara sa spuna care cont.""" acct_a, user_a, key_a = _create_account_user("altcont_a@test.com") acct_b, user_b, key_b = _create_account_user("altcont_b@test.com") # Logam user A; testam cu cheia lui B _login(client, "altcont_a@test.com", "parolasecreta10") csrf = _get_csrf_from_fragment(client) resp = client.post("/integrare/test-cheie", data={ "csrf_token": csrf, "api_key": key_b, }) assert resp.status_code == 200 body = resp.text.lower() # Trebuie sa respinga - mesaj ca nu apartine contului assert "nu apartine" in body or "alt cont" in body or "nu este" in body, \ f"Mesaj de respingere lipsa: {resp.text[:500]}" # NU trebuie sa dezvaluie ca e cheia contului B (nu mentionam alt account_id) assert str(acct_b) not in resp.text, \ f"account_id-ul contului {acct_b} (terta) nu trebuia dezvaluit: {resp.text[:500]}" def test_cheie_invalida_mesaj_clar(client): """Cheie inexistenta -> mesaj clar de eroare (nu valida).""" acct_id, user_id, _ = _create_account_user("invalida@test.com") _login(client, "invalida@test.com", "parolasecreta10") csrf = _get_csrf_from_fragment(client) resp = client.post("/integrare/test-cheie", data={ "csrf_token": csrf, "api_key": "rfak_cheie_inexistenta_total_falsa", }) assert resp.status_code == 200 body = resp.text.lower() # Trebuie sa contina un mesaj de eroare (nu succes) assert "invalida" in body or "inexistenta" in body or "negasita" in body or "eroare" in body, \ f"Mesaj de eroare lipsa: {resp.text[:500]}" assert "valida" not in body or "invalida" in body, \ f"Raspuns da fals pozitiv 'valida': {resp.text[:500]}" def test_cheie_revocata_dupa_rotire_respinsa(client): """Cheia veche (revocata dupa rotire) -> tratata ca invalida.""" acct_id, user_id, cheie_veche = _create_account_user("rotire@test.com") _login(client, "rotire@test.com", "parolasecreta10") # Rotim cheia (cheia_veche devine revocata) from app.auth import rotate_api_key from app.db import get_connection conn = get_connection() try: cheie_noua = rotate_api_key(conn, acct_id) finally: conn.close() csrf = _get_csrf_from_fragment(client) resp = client.post("/integrare/test-cheie", data={ "csrf_token": csrf, "api_key": cheie_veche, }) assert resp.status_code == 200 body = resp.text.lower() # Cheia veche revocata trebuie respinsa (invalida sau revocata) assert "invalida" in body or "revocata" in body or "negasita" in body or "eroare" in body, \ f"Cheia revocata nu a fost respinsa: {resp.text[:500]}" assert "valida — cont" not in body.lower(), \ f"Cheia revocata a primit fals pozitiv: {resp.text[:500]}" def test_cheie_goala_nu_da_fals_pozitiv_in_dev(client): """Cheie goala sau whitespace -> eroare clara, NU 'valida cont 1' (fals pozitiv dev).""" acct_id, user_id, _ = _create_account_user("goala@test.com") _login(client, "goala@test.com", "parolasecreta10") csrf = _get_csrf_from_fragment(client) for cheie_goala in ["", " ", "\t"]: resp = client.post("/integrare/test-cheie", data={ "csrf_token": csrf, "api_key": cheie_goala, }) assert resp.status_code == 200 body = resp.text.lower() assert "valida — cont" not in body, \ f"Cheie goala '{repr(cheie_goala)}' da fals pozitiv: {resp.text[:500]}" assert "goala" in body or "lipsa" in body or "invalida" in body or "eroare" in body, \ f"Cheie goala '{repr(cheie_goala)}' nu da mesaj de eroare: {resp.text[:500]}" def test_fara_login_redirect_sau_401(monkeypatch): """Fara sesiune + web_auth_required=True -> 303 redirect /login.""" tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t_nl.db")) monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true") from app.config import get_settings get_settings.cache_clear() from app.main import app with TestClient(app, follow_redirects=False) as c: resp = c.post("/integrare/test-cheie", data={ "api_key": "rfak_orice", }) assert resp.status_code in (303, 401), \ f"Trebuia redirect sau 401 fara login, got: {resp.status_code}" if resp.status_code == 303: assert "/login" in resp.headers.get("location", "") get_settings.cache_clear() def test_csrf_lipsa_respinsa(client): """Post cu CSRF invalid/lipsa si sesiune activa -> 403.""" acct_id, user_id, api_key = _create_account_user("csrf_test@test.com") _login(client, "csrf_test@test.com", "parolasecreta10") resp = client.post("/integrare/test-cheie", data={ "csrf_token": "token_gresit_total", "api_key": api_key, }) assert resp.status_code == 403, f"Trebuia 403 la CSRF invalid, got: {resp.status_code}" def test_cheia_nu_apare_in_raspuns_sau_log(client): """Cheia lipita nu trebuie sa apara in body-ul raspunsului (no-echo).""" acct_id, user_id, api_key = _create_account_user("noecho@test.com") _login(client, "noecho@test.com", "parolasecreta10") csrf = _get_csrf_from_fragment(client) resp = client.post("/integrare/test-cheie", data={ "csrf_token": csrf, "api_key": api_key, }) assert resp.status_code == 200 # Cheia completa nu trebuie sa apara in raspuns (nici macar partial daca e unica) assert api_key not in resp.text, \ f"Cheia API a aparut in body raspuns (no-echo violation): {resp.text[:500]}"