Files
rar-autopass/tests/test_security.py
Claude Agent c9f9a1ca0e feat(5.16+5.17): tipografie/antet branded + tipuri cont, planuri si enforcement
PRD 5.16 — propagare design finalizata (system font stack, fara IBM Plex self-hostat):
- US-001/002/008: tokeni --font-ui/--font-mono (system stack) + scala --fs-*; zero
  @font-face si zero /static/fonts/; landing aliniat la acelasi stack
- US-003: RAR online = dot compact in antet + meniu burger; banda rosie DOAR pe blocat
  (invariant zero-silent-failures pastrat)
- US-010: antet "ROMFAST AUTOPASS" + nume service + /login brandeit 2 coloane + badge plan;
  meniu burger cu separatoare; gate strict pe is_authenticated
- US-011: selector tema pill icon+eticheta (reuse THEMES)
- US-004/005/006/007: bug-fix editor prestatii (picker cod+denumire, add_extra in mod
  operatii, cod ales se salveaza fara "+", Renunta inchide via closest)
- US-012/013: landing Autentificare->/login; wizard import colapsat + 4 pasi pe tokeni
- fix VERIFY E2E: contoare duplicate pe 390px (inline display:flex batea @media) -> CSS + test-lock

PRD 5.17 — tipuri de cont + trial Pro 30z + enforcement DUR:
- US-001/002/008: accounts.tier + trial_until (migrare aditiva defensiva); app/plans.py
  sursa unica (PLANS, FREE_MONTHLY_LIMIT=60, effective_tier(now injectabil), monthly_usage,
  CONSUMED_STATUSES); create_account trial Pro 30z; CLI set-tier (protejat id=1, audit)
- US-003/004/005: enforce volum 60/luna INAINTE de build_key pe ambele canale
  (PLAN_LIMITA_LUNARA, 3 niveluri + log_event); gate API Pro+ (PLAN_FARA_API 403 actionabil);
  valideaza/nomenclator raman permise; downgrade lazy; flag AUTOPASS_ENFORCE_PLANS (kill-switch)
- US-006: badge plan antet + linie burger + consum N/60 + warn>=80% + 6 stari + copy RO
  pluralizat + banner one-time trial->Gratuit + pagina Cont

Regresie: 1380 passed, 0 failed, 1 deselected (live). E2E browser pe 390/1280 confirmat.
Backend trimitere (worker/masina stari/idempotenta/contract RAR) NEATINS. Lucrul 5.18
(corpus kNN) ramane separat, necomis.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 06:02:40 +00:00

246 lines
8.2 KiB
Python

"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key."""
from __future__ import annotations
import logging
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
from app.security import (
MASK,
CredentialRedactingFilter,
scrub,
scrub_text,
)
# --------------------------------------------------------------------------- #
# Redactare — pur (scrub / scrub_text / repr) #
# --------------------------------------------------------------------------- #
def test_scrub_masks_password_keys():
data = {"email": "x@y.ro", "rar_credentials": {"email": "a", "password": "SECRET"}, "vin": "ABC"}
out = scrub(data)
assert out["rar_credentials"] == MASK # subarbore creds mascat integral
assert out["vin"] == "ABC" # date non-sensibile neatinse
assert "SECRET" not in repr(out)
def test_scrub_nested_password_in_list():
data = {"items": [{"password": "p1"}, {"denumire": "ok"}]}
out = scrub(data)
assert out["items"][0]["password"] == MASK
assert out["items"][1]["denumire"] == "ok"
def test_scrub_text_json_form():
s = '{"email": "x@y.ro", "password": "hunter2"}'
assert "hunter2" not in scrub_text(s)
assert MASK in scrub_text(s)
def test_scrub_text_kwargs_and_bearer():
assert "topsecret" not in scrub_text("login password=topsecret done")
assert "eyJabc.def" not in scrub_text("Authorization: Bearer eyJabc.def")
def test_scrub_text_keeps_innocent():
s = "submission 5 -> queued (vin=ABC123)"
assert scrub_text(s) == s
def test_credentials_repr_hides_password():
from app.models import RarCredentials
c = RarCredentials(email="x@y.ro", password="hunter2")
assert "hunter2" not in repr(c)
assert c.password == "hunter2" # valoarea ramane accesibila in cod
def test_logging_filter_redacts(caplog):
logger = logging.getLogger("test.redact")
logger.addFilter(CredentialRedactingFilter())
with caplog.at_level(logging.INFO, logger="test.redact"):
logger.info('payload {"password": "hunter2"}')
assert "hunter2" not in caplog.text
assert MASK in caplog.text
# --------------------------------------------------------------------------- #
# Integrare API — 422 fara echo creds + auth API-key #
# --------------------------------------------------------------------------- #
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield monkeypatch
get_settings.cache_clear()
def _client():
from app.main import app
return TestClient(app)
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "HUNTER2SECRET"}, "prezentari": [prez]}
def test_422_does_not_echo_password(env):
with _client() as c:
# Lipseste odometru_final -> 422 de shape; parola NU trebuie sa apara in raspuns.
bad = _body()
del bad["prezentari"][0]["odometru_final"]
r = c.post("/v1/prezentari", json=bad)
assert r.status_code == 422
assert "HUNTER2SECRET" not in r.text
# Pastram totusi info utila clientului (ce camp lipseste).
assert any("odometru_final" in str(e.get("loc")) for e in r.json()["detail"])
# input/ctx eliminate (vectorul de scurgere).
assert all("input" not in e for e in r.json()["detail"])
def test_422_missing_credentials_no_leak(env):
with _client() as c:
bad = _body()
bad["rar_credentials"]["password"] = 12345 # tip gresit -> 422 cu input=parola
r = c.post("/v1/prezentari", json=bad)
assert r.status_code == 422
assert "12345" not in r.text
def test_dev_no_key_uses_default_account(env):
# Flag off (default): fara cheie -> cont implicit, merge ca inainte.
with _client() as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 200
assert r.json()["results"][0]["status"] == "queued"
def test_prod_requires_key(env):
env.setenv("AUTOPASS_REQUIRE_API_KEY", "true")
from app.config import get_settings
get_settings.cache_clear()
with _client() as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 401
# Cu cheie valida emisa via lifecycle -> trece.
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
key = create_api_key(conn, 1)
# Testul verifica autentificarea, nu planul — tier='pro' ca sa treaca gate-ul API (T4).
conn.execute("UPDATE accounts SET tier='pro' WHERE id=1")
finally:
conn.close()
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key})
assert r2.status_code == 200
assert r2.json()["results"][0]["status"] == "queued"
def test_invalid_key_rejected_even_in_dev(env):
# Flag off, dar o cheie PREZENTA si gresita nu trebuie sa treaca tacit.
with _client() as c:
r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": "rfak_gresita"})
assert r.status_code == 401
def test_bearer_header_accepted(env):
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
key = create_api_key(conn, 1)
finally:
conn.close()
r = c.post("/v1/prezentari", json=_body(), headers={"Authorization": f"Bearer {key}"})
assert r.status_code == 200
def test_key_account_routes_idempotency(env):
# Doua conturi, aceeasi prezentare -> submission-uri distincte (account_id in cheie).
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
# tier='pro' pe ambele conturi — testul verifica idempotenta, nu planuri (T4 PRD 5.17).
conn.execute("UPDATE accounts SET tier='pro' WHERE id=1")
conn.execute("INSERT INTO accounts (id, name, tier) VALUES (2, 'al-doilea', 'pro')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k2})
id1 = r1.json()["results"][0]["submission_id"]
res2 = r2.json()["results"][0]
assert res2["submission_id"] != id1
assert res2["deduped"] is False
# --------------------------------------------------------------------------- #
# Lifecycle chei — create / rotate / revoke #
# --------------------------------------------------------------------------- #
def test_key_lifecycle(env):
from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key
from app.db import get_connection, init_db
init_db()
conn = get_connection()
try:
key = create_api_key(conn, 1)
assert key.startswith("rfak_")
assert account_for_key(conn, key) == 1
# Rotire: cheia veche moare, una noua o inlocuieste.
new_key = rotate_api_key(conn, 1)
assert account_for_key(conn, key) is None
assert account_for_key(conn, new_key) == 1
# Revocare directa dupa id.
kid = conn.execute(
"SELECT id FROM api_keys WHERE key_hash=?", (hash_key(new_key),)
).fetchone()["id"]
assert revoke_api_key(conn, kid) is True
assert account_for_key(conn, new_key) is None
assert revoke_api_key(conn, kid) is False # deja revocata
finally:
conn.close()
def test_create_key_unknown_account(env):
from app.auth import create_api_key
from app.db import get_connection, init_db
init_db()
conn = get_connection()
try:
with pytest.raises(ValueError):
create_api_key(conn, 999)
finally:
conn.close()