"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key.""" from __future__ import annotations import logging import os import tempfile import pytest from fastapi.testclient import TestClient from app.security import ( MASK, CredentialRedactingFilter, scrub, scrub_text, ) # --------------------------------------------------------------------------- # # Redactare — pur (scrub / scrub_text / repr) # # --------------------------------------------------------------------------- # def test_scrub_masks_password_keys(): data = {"email": "x@y.ro", "rar_credentials": {"email": "a", "password": "SECRET"}, "vin": "ABC"} out = scrub(data) assert out["rar_credentials"] == MASK # subarbore creds mascat integral assert out["vin"] == "ABC" # date non-sensibile neatinse assert "SECRET" not in repr(out) def test_scrub_nested_password_in_list(): data = {"items": [{"password": "p1"}, {"denumire": "ok"}]} out = scrub(data) assert out["items"][0]["password"] == MASK assert out["items"][1]["denumire"] == "ok" def test_scrub_text_json_form(): s = '{"email": "x@y.ro", "password": "hunter2"}' assert "hunter2" not in scrub_text(s) assert MASK in scrub_text(s) def test_scrub_text_kwargs_and_bearer(): assert "topsecret" not in scrub_text("login password=topsecret done") assert "eyJabc.def" not in scrub_text("Authorization: Bearer eyJabc.def") def test_scrub_text_keeps_innocent(): s = "submission 5 -> queued (vin=ABC123)" assert scrub_text(s) == s def test_credentials_repr_hides_password(): from app.models import RarCredentials c = RarCredentials(email="x@y.ro", password="hunter2") assert "hunter2" not in repr(c) assert c.password == "hunter2" # valoarea ramane accesibila in cod def test_logging_filter_redacts(caplog): logger = logging.getLogger("test.redact") logger.addFilter(CredentialRedactingFilter()) with caplog.at_level(logging.INFO, logger="test.redact"): logger.info('payload {"password": "hunter2"}') assert "hunter2" not in caplog.text assert MASK in caplog.text # --------------------------------------------------------------------------- # # Integrare API — 422 fara echo creds + auth API-key # # --------------------------------------------------------------------------- # @pytest.fixture() def env(monkeypatch): tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) from app.config import get_settings get_settings.cache_clear() yield monkeypatch get_settings.cache_clear() def _client(): from app.main import app return TestClient(app) def _body(**over): prez = { "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST", "data_prestatie": "2026-06-15", "odometru_final": "123456", "prestatii": [{"cod_prestatie": "OE-1"}], } prez.update(over) return {"rar_credentials": {"email": "x@y.ro", "password": "HUNTER2SECRET"}, "prezentari": [prez]} def test_422_does_not_echo_password(env): with _client() as c: # Lipseste odometru_final -> 422 de shape; parola NU trebuie sa apara in raspuns. bad = _body() del bad["prezentari"][0]["odometru_final"] r = c.post("/v1/prezentari", json=bad) assert r.status_code == 422 assert "HUNTER2SECRET" not in r.text # Pastram totusi info utila clientului (ce camp lipseste). assert any("odometru_final" in str(e.get("loc")) for e in r.json()["detail"]) # input/ctx eliminate (vectorul de scurgere). assert all("input" not in e for e in r.json()["detail"]) def test_422_missing_credentials_no_leak(env): with _client() as c: bad = _body() bad["rar_credentials"]["password"] = 12345 # tip gresit -> 422 cu input=parola r = c.post("/v1/prezentari", json=bad) assert r.status_code == 422 assert "12345" not in r.text def test_dev_no_key_uses_default_account(env): # Flag off (default): fara cheie -> cont implicit, merge ca inainte. with _client() as c: r = c.post("/v1/prezentari", json=_body()) assert r.status_code == 200 assert r.json()["results"][0]["status"] == "queued" def test_prod_requires_key(env): env.setenv("AUTOPASS_REQUIRE_API_KEY", "true") from app.config import get_settings get_settings.cache_clear() with _client() as c: r = c.post("/v1/prezentari", json=_body()) assert r.status_code == 401 # Cu cheie valida emisa via lifecycle -> trece. from app.auth import create_api_key from app.db import get_connection conn = get_connection() try: key = create_api_key(conn, 1) # Testul verifica autentificarea, nu planul — tier='pro' ca sa treaca gate-ul API (T4). conn.execute("UPDATE accounts SET tier='pro' WHERE id=1") finally: conn.close() r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key}) assert r2.status_code == 200 assert r2.json()["results"][0]["status"] == "queued" def test_invalid_key_rejected_even_in_dev(env): # Flag off, dar o cheie PREZENTA si gresita nu trebuie sa treaca tacit. with _client() as c: r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": "rfak_gresita"}) assert r.status_code == 401 def test_bearer_header_accepted(env): with _client() as c: from app.auth import create_api_key from app.db import get_connection conn = get_connection() try: key = create_api_key(conn, 1) finally: conn.close() r = c.post("/v1/prezentari", json=_body(), headers={"Authorization": f"Bearer {key}"}) assert r.status_code == 200 def test_key_account_routes_idempotency(env): # Doua conturi, aceeasi prezentare -> submission-uri distincte (account_id in cheie). with _client() as c: from app.auth import create_api_key from app.db import get_connection conn = get_connection() try: # tier='pro' pe ambele conturi — testul verifica idempotenta, nu planuri (T4 PRD 5.17). conn.execute("UPDATE accounts SET tier='pro' WHERE id=1") conn.execute("INSERT INTO accounts (id, name, tier) VALUES (2, 'al-doilea', 'pro')") k1 = create_api_key(conn, 1) k2 = create_api_key(conn, 2) finally: conn.close() r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1}) r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k2}) id1 = r1.json()["results"][0]["submission_id"] res2 = r2.json()["results"][0] assert res2["submission_id"] != id1 assert res2["deduped"] is False # --------------------------------------------------------------------------- # # Lifecycle chei — create / rotate / revoke # # --------------------------------------------------------------------------- # def test_key_lifecycle(env): from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key from app.db import get_connection, init_db init_db() conn = get_connection() try: key = create_api_key(conn, 1) assert key.startswith("rfak_") assert account_for_key(conn, key) == 1 # Rotire: cheia veche moare, una noua o inlocuieste. new_key = rotate_api_key(conn, 1) assert account_for_key(conn, key) is None assert account_for_key(conn, new_key) == 1 # Revocare directa dupa id. kid = conn.execute( "SELECT id FROM api_keys WHERE key_hash=?", (hash_key(new_key),) ).fetchone()["id"] assert revoke_api_key(conn, kid) is True assert account_for_key(conn, new_key) is None assert revoke_api_key(conn, kid) is False # deja revocata finally: conn.close() def test_create_key_unknown_account(env): from app.auth import create_api_key from app.db import get_connection, init_db init_db() conn = get_connection() try: with pytest.raises(ValueError): create_api_key(conn, 999) finally: conn.close()