Redactare: - handler RequestValidationError dropeaza input/ctx din 422 (vectorul de scurgere a rar_credentials.password pe /v1/prezentari); pastreaza type/loc/msg - app/security.py: scrub/scrub_text + CredentialRedactingFilter pe root+uvicorn - models.py: password cu repr=False Auth API-key: - app/auth.py: hash SHA-256 in api_keys (cheia in clar emisa o singura data), header X-API-Key / Authorization: Bearer, dependency resolve_account_id - enforcement pe flag AUTOPASS_require_api_key (prod on->401, dev off->cont default id=1; cheie prezenta invalida->401 mereu) - account_id real curge din cheie in ingestie + mapare - tools/apikey.py: CLI create/rotate/revoke/list (fara endpoint HTTP admin) 16 teste noi (tests/test_security.py). 85 pass total. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
242 lines
7.8 KiB
Python
242 lines
7.8 KiB
Python
"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from app.security import (
|
|
MASK,
|
|
CredentialRedactingFilter,
|
|
scrub,
|
|
scrub_text,
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Redactare — pur (scrub / scrub_text / repr) #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def test_scrub_masks_password_keys():
|
|
data = {"email": "x@y.ro", "rar_credentials": {"email": "a", "password": "SECRET"}, "vin": "ABC"}
|
|
out = scrub(data)
|
|
assert out["rar_credentials"] == MASK # subarbore creds mascat integral
|
|
assert out["vin"] == "ABC" # date non-sensibile neatinse
|
|
assert "SECRET" not in repr(out)
|
|
|
|
|
|
def test_scrub_nested_password_in_list():
|
|
data = {"items": [{"password": "p1"}, {"denumire": "ok"}]}
|
|
out = scrub(data)
|
|
assert out["items"][0]["password"] == MASK
|
|
assert out["items"][1]["denumire"] == "ok"
|
|
|
|
|
|
def test_scrub_text_json_form():
|
|
s = '{"email": "x@y.ro", "password": "hunter2"}'
|
|
assert "hunter2" not in scrub_text(s)
|
|
assert MASK in scrub_text(s)
|
|
|
|
|
|
def test_scrub_text_kwargs_and_bearer():
|
|
assert "topsecret" not in scrub_text("login password=topsecret done")
|
|
assert "eyJabc.def" not in scrub_text("Authorization: Bearer eyJabc.def")
|
|
|
|
|
|
def test_scrub_text_keeps_innocent():
|
|
s = "submission 5 -> queued (vin=ABC123)"
|
|
assert scrub_text(s) == s
|
|
|
|
|
|
def test_credentials_repr_hides_password():
|
|
from app.models import RarCredentials
|
|
|
|
c = RarCredentials(email="x@y.ro", password="hunter2")
|
|
assert "hunter2" not in repr(c)
|
|
assert c.password == "hunter2" # valoarea ramane accesibila in cod
|
|
|
|
|
|
def test_logging_filter_redacts(caplog):
|
|
logger = logging.getLogger("test.redact")
|
|
logger.addFilter(CredentialRedactingFilter())
|
|
with caplog.at_level(logging.INFO, logger="test.redact"):
|
|
logger.info('payload {"password": "hunter2"}')
|
|
assert "hunter2" not in caplog.text
|
|
assert MASK in caplog.text
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Integrare API — 422 fara echo creds + auth API-key #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
@pytest.fixture()
|
|
def env(monkeypatch):
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
|
|
from app.config import get_settings
|
|
|
|
get_settings.cache_clear()
|
|
yield monkeypatch
|
|
get_settings.cache_clear()
|
|
|
|
|
|
def _client():
|
|
from app.main import app
|
|
|
|
return TestClient(app)
|
|
|
|
|
|
def _body(**over):
|
|
prez = {
|
|
"vin": "WVWZZZ1KZAW000123",
|
|
"nr_inmatriculare": "B999TST",
|
|
"data_prestatie": "2026-06-15",
|
|
"odometru_final": "123456",
|
|
"prestatii": [{"cod_prestatie": "OE-1"}],
|
|
}
|
|
prez.update(over)
|
|
return {"rar_credentials": {"email": "x@y.ro", "password": "HUNTER2SECRET"}, "prezentari": [prez]}
|
|
|
|
|
|
def test_422_does_not_echo_password(env):
|
|
with _client() as c:
|
|
# Lipseste odometru_final -> 422 de shape; parola NU trebuie sa apara in raspuns.
|
|
bad = _body()
|
|
del bad["prezentari"][0]["odometru_final"]
|
|
r = c.post("/v1/prezentari", json=bad)
|
|
assert r.status_code == 422
|
|
assert "HUNTER2SECRET" not in r.text
|
|
# Pastram totusi info utila clientului (ce camp lipseste).
|
|
assert any("odometru_final" in str(e.get("loc")) for e in r.json()["detail"])
|
|
# input/ctx eliminate (vectorul de scurgere).
|
|
assert all("input" not in e for e in r.json()["detail"])
|
|
|
|
|
|
def test_422_missing_credentials_no_leak(env):
|
|
with _client() as c:
|
|
bad = _body()
|
|
bad["rar_credentials"]["password"] = 12345 # tip gresit -> 422 cu input=parola
|
|
r = c.post("/v1/prezentari", json=bad)
|
|
assert r.status_code == 422
|
|
assert "12345" not in r.text
|
|
|
|
|
|
def test_dev_no_key_uses_default_account(env):
|
|
# Flag off (default): fara cheie -> cont implicit, merge ca inainte.
|
|
with _client() as c:
|
|
r = c.post("/v1/prezentari", json=_body())
|
|
assert r.status_code == 200
|
|
assert r.json()["results"][0]["status"] == "queued"
|
|
|
|
|
|
def test_prod_requires_key(env):
|
|
env.setenv("AUTOPASS_REQUIRE_API_KEY", "true")
|
|
from app.config import get_settings
|
|
|
|
get_settings.cache_clear()
|
|
with _client() as c:
|
|
r = c.post("/v1/prezentari", json=_body())
|
|
assert r.status_code == 401
|
|
# Cu cheie valida emisa via lifecycle -> trece.
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
|
|
conn = get_connection()
|
|
try:
|
|
key = create_api_key(conn, 1)
|
|
finally:
|
|
conn.close()
|
|
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key})
|
|
assert r2.status_code == 200
|
|
assert r2.json()["results"][0]["status"] == "queued"
|
|
|
|
|
|
def test_invalid_key_rejected_even_in_dev(env):
|
|
# Flag off, dar o cheie PREZENTA si gresita nu trebuie sa treaca tacit.
|
|
with _client() as c:
|
|
r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": "rfak_gresita"})
|
|
assert r.status_code == 401
|
|
|
|
|
|
def test_bearer_header_accepted(env):
|
|
with _client() as c:
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
|
|
conn = get_connection()
|
|
try:
|
|
key = create_api_key(conn, 1)
|
|
finally:
|
|
conn.close()
|
|
r = c.post("/v1/prezentari", json=_body(), headers={"Authorization": f"Bearer {key}"})
|
|
assert r.status_code == 200
|
|
|
|
|
|
def test_key_account_routes_idempotency(env):
|
|
# Doua conturi, aceeasi prezentare -> submission-uri distincte (account_id in cheie).
|
|
with _client() as c:
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
|
|
conn = get_connection()
|
|
try:
|
|
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
|
|
k1 = create_api_key(conn, 1)
|
|
k2 = create_api_key(conn, 2)
|
|
finally:
|
|
conn.close()
|
|
r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
|
|
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k2})
|
|
id1 = r1.json()["results"][0]["submission_id"]
|
|
res2 = r2.json()["results"][0]
|
|
assert res2["submission_id"] != id1
|
|
assert res2["deduped"] is False
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Lifecycle chei — create / rotate / revoke #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def test_key_lifecycle(env):
|
|
from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key
|
|
from app.db import get_connection, init_db
|
|
|
|
init_db()
|
|
conn = get_connection()
|
|
try:
|
|
key = create_api_key(conn, 1)
|
|
assert key.startswith("rfak_")
|
|
assert account_for_key(conn, key) == 1
|
|
|
|
# Rotire: cheia veche moare, una noua o inlocuieste.
|
|
new_key = rotate_api_key(conn, 1)
|
|
assert account_for_key(conn, key) is None
|
|
assert account_for_key(conn, new_key) == 1
|
|
|
|
# Revocare directa dupa id.
|
|
kid = conn.execute(
|
|
"SELECT id FROM api_keys WHERE key_hash=?", (hash_key(new_key),)
|
|
).fetchone()["id"]
|
|
assert revoke_api_key(conn, kid) is True
|
|
assert account_for_key(conn, new_key) is None
|
|
assert revoke_api_key(conn, kid) is False # deja revocata
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_create_key_unknown_account(env):
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection, init_db
|
|
|
|
init_db()
|
|
conn = get_connection()
|
|
try:
|
|
with pytest.raises(ValueError):
|
|
create_api_key(conn, 999)
|
|
finally:
|
|
conn.close()
|