PRD 5.16 — propagare design finalizata (system font stack, fara IBM Plex self-hostat): - US-001/002/008: tokeni --font-ui/--font-mono (system stack) + scala --fs-*; zero @font-face si zero /static/fonts/; landing aliniat la acelasi stack - US-003: RAR online = dot compact in antet + meniu burger; banda rosie DOAR pe blocat (invariant zero-silent-failures pastrat) - US-010: antet "ROMFAST AUTOPASS" + nume service + /login brandeit 2 coloane + badge plan; meniu burger cu separatoare; gate strict pe is_authenticated - US-011: selector tema pill icon+eticheta (reuse THEMES) - US-004/005/006/007: bug-fix editor prestatii (picker cod+denumire, add_extra in mod operatii, cod ales se salveaza fara "+", Renunta inchide via closest) - US-012/013: landing Autentificare->/login; wizard import colapsat + 4 pasi pe tokeni - fix VERIFY E2E: contoare duplicate pe 390px (inline display:flex batea @media) -> CSS + test-lock PRD 5.17 — tipuri de cont + trial Pro 30z + enforcement DUR: - US-001/002/008: accounts.tier + trial_until (migrare aditiva defensiva); app/plans.py sursa unica (PLANS, FREE_MONTHLY_LIMIT=60, effective_tier(now injectabil), monthly_usage, CONSUMED_STATUSES); create_account trial Pro 30z; CLI set-tier (protejat id=1, audit) - US-003/004/005: enforce volum 60/luna INAINTE de build_key pe ambele canale (PLAN_LIMITA_LUNARA, 3 niveluri + log_event); gate API Pro+ (PLAN_FARA_API 403 actionabil); valideaza/nomenclator raman permise; downgrade lazy; flag AUTOPASS_ENFORCE_PLANS (kill-switch) - US-006: badge plan antet + linie burger + consum N/60 + warn>=80% + 6 stari + copy RO pluralizat + banner one-time trial->Gratuit + pagina Cont Regresie: 1380 passed, 0 failed, 1 deselected (live). E2E browser pe 390/1280 confirmat. Backend trimitere (worker/masina stari/idempotenta/contract RAR) NEATINS. Lucrul 5.18 (corpus kNN) ramane separat, necomis. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
246 lines
8.2 KiB
Python
246 lines
8.2 KiB
Python
"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from app.security import (
|
|
MASK,
|
|
CredentialRedactingFilter,
|
|
scrub,
|
|
scrub_text,
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Redactare — pur (scrub / scrub_text / repr) #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def test_scrub_masks_password_keys():
    """scrub() replaces the credentials subtree with MASK and keeps other keys."""
    payload = {
        "email": "x@y.ro",
        "rar_credentials": {"email": "a", "password": "SECRET"},
        "vin": "ABC",
    }
    scrubbed = scrub(payload)
    # The whole credentials subtree is masked, not just the password leaf.
    assert scrubbed["rar_credentials"] == MASK
    # Non-sensitive data is left untouched.
    assert scrubbed["vin"] == "ABC"
    # The secret must not survive anywhere in the printable form.
    assert "SECRET" not in repr(scrubbed)
|
|
|
|
|
|
def test_scrub_nested_password_in_list():
    """scrub() masks a password key found inside a list element."""
    scrubbed = scrub({"items": [{"password": "p1"}, {"denumire": "ok"}]})
    items = scrubbed["items"]
    # First element carried a password -> masked.
    assert items[0]["password"] == MASK
    # Second element has no sensitive key -> unchanged.
    assert items[1]["denumire"] == "ok"
|
|
|
|
|
|
def test_scrub_text_json_form():
    """scrub_text() masks the password value in a JSON-shaped string."""
    raw = '{"email": "x@y.ro", "password": "hunter2"}'
    cleaned = scrub_text(raw)
    assert "hunter2" not in cleaned
    assert MASK in cleaned
|
|
|
|
|
|
def test_scrub_text_kwargs_and_bearer():
    """scrub_text() hides key=value passwords and Bearer tokens."""
    kwarg_line = scrub_text("login password=topsecret done")
    assert "topsecret" not in kwarg_line

    bearer_line = scrub_text("Authorization: Bearer eyJabc.def")
    assert "eyJabc.def" not in bearer_line
|
|
|
|
|
|
def test_scrub_text_keeps_innocent():
    """A string without credentials comes back from scrub_text() unchanged."""
    harmless = "submission 5 -> queued (vin=ABC123)"
    cleaned = scrub_text(harmless)
    assert cleaned == harmless
|
|
|
|
|
|
def test_credentials_repr_hides_password():
    """repr() of RarCredentials omits the password while the attribute keeps it."""
    from app.models import RarCredentials

    creds = RarCredentials(email="x@y.ro", password="hunter2")
    printable = repr(creds)
    assert "hunter2" not in printable
    # The real value must remain reachable from code.
    assert creds.password == "hunter2"
|
|
|
|
|
|
def test_logging_filter_redacts(caplog):
    """CredentialRedactingFilter masks a password in an emitted log message.

    The filter is attached to the ``test.redact`` logger only for the
    duration of the test. ``logging.getLogger`` returns the same logger object
    for the same name on every call, so a filter left attached would stay on
    it for the rest of the session and pile up on every re-run.
    """
    logger = logging.getLogger("test.redact")
    redactor = CredentialRedactingFilter()
    logger.addFilter(redactor)
    try:
        with caplog.at_level(logging.INFO, logger="test.redact"):
            logger.info('payload {"password": "hunter2"}')
    finally:
        # Always detach, even if logging itself raised.
        logger.removeFilter(redactor)

    assert "hunter2" not in caplog.text
    assert MASK in caplog.text
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Integrare API — 422 fara echo creds + auth API-key #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
@pytest.fixture()
def env(monkeypatch):
    """Point the app at a throwaway database file and yield ``monkeypatch``.

    Sets ``AUTOPASS_DB_PATH`` to ``t.db`` inside a fresh temporary directory
    and clears the ``get_settings`` cache before and after the test so the
    setting is re-read. Tests receive the ``monkeypatch`` object itself so
    they can set further environment variables.

    On teardown the temporary directory is removed (best effort), so test
    runs no longer leave directories behind.
    """
    import shutil  # local import: only needed for teardown cleanup

    tmp = tempfile.mkdtemp()
    monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
    from app.config import get_settings

    get_settings.cache_clear()
    try:
        yield monkeypatch
    finally:
        get_settings.cache_clear()
        # Best effort: a lingering file handle must not fail the test run.
        shutil.rmtree(tmp, ignore_errors=True)
|
|
|
|
|
|
def _client():
    """Return a TestClient wrapping the application object from app.main.

    The import is done at call time rather than at module import —
    presumably so the ``env`` fixture has configured the environment first
    (TODO confirm).
    """
    from app.main import app as application

    client = TestClient(application)
    return client
|
|
|
|
|
|
def _body(**over):
    """Build a request payload with fixed credentials and one presentation.

    Keyword arguments override or extend the fields of the single entry in
    ``prezentari``. A fresh dict is built on every call.
    """
    defaults = {
        "vin": "WVWZZZ1KZAW000123",
        "nr_inmatriculare": "B999TST",
        "data_prestatie": "2026-06-15",
        "odometru_final": "123456",
        "prestatii": [{"cod_prestatie": "OE-1"}],
    }
    # Later keys win, so caller overrides replace the defaults.
    presentation = {**defaults, **over}
    credentials = {"email": "x@y.ro", "password": "HUNTER2SECRET"}
    return {"rar_credentials": credentials, "prezentari": [presentation]}
|
|
|
|
|
|
def test_422_does_not_echo_password(env):
    """A shape-validation 422 names the missing field without echoing input.

    Checks that the password never appears in the response, that the client
    still learns which field is missing, and that neither ``input`` nor
    ``ctx`` is present in any error entry.
    """
    with _client() as c:
        # odometru_final is missing -> shape 422; the password must NOT appear in the response.
        bad = _body()
        del bad["prezentari"][0]["odometru_final"]
        r = c.post("/v1/prezentari", json=bad)
        assert r.status_code == 422
        assert "HUNTER2SECRET" not in r.text

        detail = r.json()["detail"]
        # Still keep the information useful to the client (which field is missing).
        assert any("odometru_final" in str(e.get("loc")) for e in detail)
        # input/ctx removed (the leak vector) — both keys are verified.
        assert all("input" not in e and "ctx" not in e for e in detail)
|
|
|
|
|
|
def test_422_missing_credentials_no_leak(env):
    """A wrongly typed password yields a 422 that does not contain the value."""
    with _client() as client:
        payload = _body()
        # Wrong type -> 422 whose `input` would be the password itself.
        payload["rar_credentials"]["password"] = 12345
        resp = client.post("/v1/prezentari", json=payload)
        assert resp.status_code == 422
        assert "12345" not in resp.text
|
|
|
|
|
|
def test_dev_no_key_uses_default_account(env):
    """Without an API key and with the flag unset, the request is queued."""
    # Flag off (default): no key -> default account, works as before.
    with _client() as client:
        resp = client.post("/v1/prezentari", json=_body())
        assert resp.status_code == 200
        first_result = resp.json()["results"][0]
        assert first_result["status"] == "queued"
|
|
|
|
|
|
def test_prod_requires_key(env):
    """With AUTOPASS_REQUIRE_API_KEY=true a keyless request gets 401; a valid key passes.

    The settings cache is cleared after setting the flag so it is re-read.
    The tier change for account 1 is committed explicitly so it does not
    depend on the connection being in autocommit mode: it runs after
    ``create_api_key`` and would otherwise be discarded by ``close()`` on a
    connection with an open implicit transaction.
    """
    env.setenv("AUTOPASS_REQUIRE_API_KEY", "true")
    from app.config import get_settings

    get_settings.cache_clear()
    with _client() as c:
        r = c.post("/v1/prezentari", json=_body())
        assert r.status_code == 401

        # With a valid key issued via the lifecycle helpers -> accepted.
        from app.auth import create_api_key
        from app.db import get_connection

        conn = get_connection()
        try:
            key = create_api_key(conn, 1)
            # This test checks authentication, not the plan — tier='pro' so the API gate (T4) lets it through.
            conn.execute("UPDATE accounts SET tier='pro' WHERE id=1")
            # Persist the tier change before closing; a no-op under autocommit.
            conn.commit()
        finally:
            conn.close()

        r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key})
        assert r2.status_code == 200
        assert r2.json()["results"][0]["status"] == "queued"
|
|
|
|
|
|
def test_invalid_key_rejected_even_in_dev(env):
    """A wrong API key is rejected with 401 even when keys are not required."""
    # Flag off, but a key that IS PRESENT and wrong must not pass silently.
    wrong_key_headers = {"X-API-Key": "rfak_gresita"}
    with _client() as client:
        resp = client.post("/v1/prezentari", json=_body(), headers=wrong_key_headers)
        assert resp.status_code == 401
|
|
|
|
|
|
def test_bearer_header_accepted(env):
    """An API key sent as ``Authorization: Bearer <key>`` is accepted (200)."""
    with _client() as client:
        from app.auth import create_api_key
        from app.db import get_connection

        db = get_connection()
        try:
            api_key = create_api_key(db, 1)
        finally:
            db.close()

        bearer_headers = {"Authorization": f"Bearer {api_key}"}
        resp = client.post("/v1/prezentari", json=_body(), headers=bearer_headers)
        assert resp.status_code == 200
|
|
|
|
|
|
def test_key_account_routes_idempotency(env):
    """The same presentation sent under two accounts gives two submissions."""
    # Two accounts, same presentation -> distinct submissions (account_id is part of the key).
    with _client() as client:
        from app.auth import create_api_key
        from app.db import get_connection

        db = get_connection()
        try:
            # tier='pro' on both accounts — this test checks idempotency, not plans (T4 PRD 5.17).
            db.execute("UPDATE accounts SET tier='pro' WHERE id=1")
            db.execute("INSERT INTO accounts (id, name, tier) VALUES (2, 'al-doilea', 'pro')")
            key_first = create_api_key(db, 1)
            key_second = create_api_key(db, 2)
        finally:
            db.close()

        def submit(api_key):
            # Identical body each time; only the authenticating key differs.
            return client.post("/v1/prezentari", json=_body(), headers={"X-API-Key": api_key})

        resp_first = submit(key_first)
        resp_second = submit(key_second)

        first_id = resp_first.json()["results"][0]["submission_id"]
        second = resp_second.json()["results"][0]
        assert second["submission_id"] != first_id
        assert second["deduped"] is False
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Lifecycle chei — create / rotate / revoke #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def test_key_lifecycle(env):
    """Walk an API key for account 1 through creation, rotation and revocation."""
    from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key
    from app.db import get_connection, init_db

    init_db()
    db = get_connection()
    try:
        original_key = create_api_key(db, 1)
        assert original_key.startswith("rfak_")
        assert account_for_key(db, original_key) == 1

        # Rotation: the old key dies and a new one replaces it.
        rotated_key = rotate_api_key(db, 1)
        assert account_for_key(db, original_key) is None
        assert account_for_key(db, rotated_key) == 1

        # Direct revocation by id: look the row up through the key's hash.
        lookup_sql = "SELECT id FROM api_keys WHERE key_hash=?"
        row = db.execute(lookup_sql, (hash_key(rotated_key),)).fetchone()
        key_id = row["id"]
        assert revoke_api_key(db, key_id) is True
        assert account_for_key(db, rotated_key) is None
        # Already revoked, so a second attempt reports False.
        assert revoke_api_key(db, key_id) is False
    finally:
        db.close()
|
|
|
|
|
|
def test_create_key_unknown_account(env):
    """create_api_key raises ValueError for account id 999, which this test never creates."""
    from app.auth import create_api_key
    from app.db import get_connection, init_db

    init_db()
    db = get_connection()
    try:
        with pytest.raises(ValueError):
            create_api_key(db, 999)
    finally:
        db.close()
|