Files
rar-autopass/tests/test_security.py
Claude Agent c17c1aa4f4 feat(securitate-CORE): redactare creds + auth API-key per cont
Redactare:
- handler RequestValidationError dropeaza input/ctx din 422 (vectorul de
  scurgere a rar_credentials.password pe /v1/prezentari); pastreaza type/loc/msg
- app/security.py: scrub/scrub_text + CredentialRedactingFilter pe root+uvicorn
- models.py: password cu repr=False

Auth API-key:
- app/auth.py: hash SHA-256 in api_keys (cheia in clar emisa o singura data),
  header X-API-Key / Authorization: Bearer, dependency resolve_account_id
- enforcement pe flag AUTOPASS_require_api_key (prod on->401, dev off->cont
  default id=1; cheie prezenta invalida->401 mereu)
- account_id real curge din cheie in ingestie + mapare
- tools/apikey.py: CLI create/rotate/revoke/list (fara endpoint HTTP admin)

16 teste noi (tests/test_security.py). 85 pass total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 20:02:07 +00:00

242 lines
7.8 KiB
Python

"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key."""
from __future__ import annotations
import logging
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
from app.security import (
MASK,
CredentialRedactingFilter,
scrub,
scrub_text,
)
# --------------------------------------------------------------------------- #
# Redactare — pur (scrub / scrub_text / repr) #
# --------------------------------------------------------------------------- #
def test_scrub_masks_password_keys():
data = {"email": "x@y.ro", "rar_credentials": {"email": "a", "password": "SECRET"}, "vin": "ABC"}
out = scrub(data)
assert out["rar_credentials"] == MASK # subarbore creds mascat integral
assert out["vin"] == "ABC" # date non-sensibile neatinse
assert "SECRET" not in repr(out)
def test_scrub_nested_password_in_list():
data = {"items": [{"password": "p1"}, {"denumire": "ok"}]}
out = scrub(data)
assert out["items"][0]["password"] == MASK
assert out["items"][1]["denumire"] == "ok"
def test_scrub_text_json_form():
s = '{"email": "x@y.ro", "password": "hunter2"}'
assert "hunter2" not in scrub_text(s)
assert MASK in scrub_text(s)
def test_scrub_text_kwargs_and_bearer():
assert "topsecret" not in scrub_text("login password=topsecret done")
assert "eyJabc.def" not in scrub_text("Authorization: Bearer eyJabc.def")
def test_scrub_text_keeps_innocent():
s = "submission 5 -> queued (vin=ABC123)"
assert scrub_text(s) == s
def test_credentials_repr_hides_password():
from app.models import RarCredentials
c = RarCredentials(email="x@y.ro", password="hunter2")
assert "hunter2" not in repr(c)
assert c.password == "hunter2" # valoarea ramane accesibila in cod
def test_logging_filter_redacts(caplog):
logger = logging.getLogger("test.redact")
logger.addFilter(CredentialRedactingFilter())
with caplog.at_level(logging.INFO, logger="test.redact"):
logger.info('payload {"password": "hunter2"}')
assert "hunter2" not in caplog.text
assert MASK in caplog.text
# --------------------------------------------------------------------------- #
# Integrare API — 422 fara echo creds + auth API-key #
# --------------------------------------------------------------------------- #
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield monkeypatch
get_settings.cache_clear()
def _client():
from app.main import app
return TestClient(app)
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "HUNTER2SECRET"}, "prezentari": [prez]}
def test_422_does_not_echo_password(env):
with _client() as c:
# Lipseste odometru_final -> 422 de shape; parola NU trebuie sa apara in raspuns.
bad = _body()
del bad["prezentari"][0]["odometru_final"]
r = c.post("/v1/prezentari", json=bad)
assert r.status_code == 422
assert "HUNTER2SECRET" not in r.text
# Pastram totusi info utila clientului (ce camp lipseste).
assert any("odometru_final" in str(e.get("loc")) for e in r.json()["detail"])
# input/ctx eliminate (vectorul de scurgere).
assert all("input" not in e for e in r.json()["detail"])
def test_422_missing_credentials_no_leak(env):
with _client() as c:
bad = _body()
bad["rar_credentials"]["password"] = 12345 # tip gresit -> 422 cu input=parola
r = c.post("/v1/prezentari", json=bad)
assert r.status_code == 422
assert "12345" not in r.text
def test_dev_no_key_uses_default_account(env):
# Flag off (default): fara cheie -> cont implicit, merge ca inainte.
with _client() as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 200
assert r.json()["results"][0]["status"] == "queued"
def test_prod_requires_key(env):
env.setenv("AUTOPASS_REQUIRE_API_KEY", "true")
from app.config import get_settings
get_settings.cache_clear()
with _client() as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 401
# Cu cheie valida emisa via lifecycle -> trece.
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
key = create_api_key(conn, 1)
finally:
conn.close()
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key})
assert r2.status_code == 200
assert r2.json()["results"][0]["status"] == "queued"
def test_invalid_key_rejected_even_in_dev(env):
# Flag off, dar o cheie PREZENTA si gresita nu trebuie sa treaca tacit.
with _client() as c:
r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": "rfak_gresita"})
assert r.status_code == 401
def test_bearer_header_accepted(env):
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
key = create_api_key(conn, 1)
finally:
conn.close()
r = c.post("/v1/prezentari", json=_body(), headers={"Authorization": f"Bearer {key}"})
assert r.status_code == 200
def test_key_account_routes_idempotency(env):
# Doua conturi, aceeasi prezentare -> submission-uri distincte (account_id in cheie).
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k2})
id1 = r1.json()["results"][0]["submission_id"]
res2 = r2.json()["results"][0]
assert res2["submission_id"] != id1
assert res2["deduped"] is False
# --------------------------------------------------------------------------- #
# Lifecycle chei — create / rotate / revoke #
# --------------------------------------------------------------------------- #
def test_key_lifecycle(env):
from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key
from app.db import get_connection, init_db
init_db()
conn = get_connection()
try:
key = create_api_key(conn, 1)
assert key.startswith("rfak_")
assert account_for_key(conn, key) == 1
# Rotire: cheia veche moare, una noua o inlocuieste.
new_key = rotate_api_key(conn, 1)
assert account_for_key(conn, key) is None
assert account_for_key(conn, new_key) == 1
# Revocare directa dupa id.
kid = conn.execute(
"SELECT id FROM api_keys WHERE key_hash=?", (hash_key(new_key),)
).fetchone()["id"]
assert revoke_api_key(conn, kid) is True
assert account_for_key(conn, new_key) is None
assert revoke_api_key(conn, kid) is False # deja revocata
finally:
conn.close()
def test_create_key_unknown_account(env):
from app.auth import create_api_key
from app.db import get_connection, init_db
init_db()
conn = get_connection()
try:
with pytest.raises(ValueError):
create_api_key(conn, 999)
finally:
conn.close()