Files
rar-autopass/tests/test_creds_delivery.py
Claude Agent fbb2695336 feat(creds): livrare creds per-cerere la worker (criptat efemer + sesiuni per-cont)
Plan sect.5: parola RAR vine per-cerere, stocata CRIPTATA in submission pana la
primul login reusit pe cont, apoi stearsa; JWT 30h acopera restul.

- app/crypto.py: Fernet, cheie din AUTOPASS_CREDS_KEY (nesetata -> efemera la
  runtime, creds nu supravietuiesc restartului). encrypt/decrypt_creds.
- schema + migrare: submissions.rar_creds_enc (creds criptate).
- ingestie: cripteaza rar_credentials, le lipeste de fiecare submission nou.
  Niciodata in clar in DB.
- worker: AccountSessions (login per-cont cu creds decriptate, cache JWT in
  memorie, sterge creds-urile contului dupa primul login + refresh nomenclator).
  401 creds gresite -> error fara retry; token expirat -> invalidare + requeue;
  fara creds (restart) -> requeue "indisponibile" (ROAAUTO re-trimite).
  claim_one intoarce account_id + creds_enc; recover_orphans filtrabil pe cont.
- requirements: cryptography==46.0.5.

Nota: refresh nomenclator e acum lazy la primul login per-cont (nu la pornire);
seed-ul fallback acopera editorul offline.

10 teste noi (tests/test_creds_delivery.py). 95 pass total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 20:16:16 +00:00

253 lines
8.5 KiB
Python

"""Teste livrare creds per-cerere: criptare efemera + sesiuni worker per-cont.
Acopera: round-trip crypto, stocarea creds criptate la ingestie (niciodata in
clar), login per-cont cu stergere creds dupa primul login, fallback creds <test>,
401 creds gresite -> error fara retry.
"""
from __future__ import annotations
import json
import os
import tempfile
import pytest
from cryptography.fernet import Fernet
from fastapi.testclient import TestClient
from app.rar_client import RarAuthError
@pytest.fixture()
def env(monkeypatch):
    """Provide an isolated app environment backed by a throwaway SQLite file.

    Points ``AUTOPASS_DB_PATH`` at a file inside a fresh temporary directory,
    sets ``AUTOPASS_CREDS_KEY`` to a newly generated Fernet key, clears the
    cached settings and the crypto cache, then runs ``init_db()``.

    Yields:
        The ``monkeypatch`` fixture, so a test can apply further overrides.

    On teardown the caches are cleared again and the temporary directory is
    removed (best effort), so repeated runs do not accumulate database files.
    """
    import shutil

    tmp = tempfile.mkdtemp()
    try:
        monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
        monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())

        # App modules are imported only after the environment is patched,
        # presumably so they never cache values from the real environment.
        from app.config import get_settings
        from app import crypto

        get_settings.cache_clear()
        crypto.reset_cache()

        from app.db import init_db

        init_db()

        yield monkeypatch

        get_settings.cache_clear()
        crypto.reset_cache()
    finally:
        # mkdtemp() leaves deletion to the caller; ignore errors so a file
        # that is still locked cannot turn a passing test into a failure.
        shutil.rmtree(tmp, ignore_errors=True)
# --------------------------------------------------------------------------- #
# Crypto round-trip #
# --------------------------------------------------------------------------- #
def test_crypto_roundtrip(env):
    """Encrypting and then decrypting credentials yields the original mapping."""
    from app import crypto

    original = {"email": "x@y.ro", "password": "HUNTER2"}
    token = crypto.encrypt_creds(original)

    # The token must be ciphertext: the plaintext password may not appear in it.
    assert "HUNTER2" not in token
    assert crypto.decrypt_creds(token) == original
def test_crypto_bad_token_returns_none(env):
    """decrypt_creds returns None for a missing, empty or malformed token."""
    from app import crypto

    for bad_token in (None, "", "garbage-not-a-token"):
        assert crypto.decrypt_creds(bad_token) is None
def test_crypto_wrong_key_returns_none(env, monkeypatch):
    """A token produced under one key does not decrypt after the key changes."""
    from app import crypto
    from app.config import get_settings

    stale_token = crypto.encrypt_creds({"email": "a", "password": "b"})

    # Rotate the key: the old token can no longer be decrypted
    # (an accepted degradation).
    rotated_key = Fernet.generate_key().decode()
    monkeypatch.setenv("AUTOPASS_CREDS_KEY", rotated_key)
    get_settings.cache_clear()
    crypto.reset_cache()

    assert crypto.decrypt_creds(stale_token) is None
# --------------------------------------------------------------------------- #
# Ingestie: creds stocate criptat, niciodata in clar #
# --------------------------------------------------------------------------- #
def _body(**over):
    """Build a request body holding credentials and one presentation entry.

    Keyword arguments override, or add to, the fields of the single entry
    under ``"prezentari"``.
    """
    presentation = {
        "vin": "WVWZZZ1KZAW000123",
        "nr_inmatriculare": "B999TST",
        "data_prestatie": "2026-06-15",
        "odometru_final": "123456",
        "prestatii": [{"cod_prestatie": "OE-1"}],
        **over,
    }
    credentials = {"email": "x@y.ro", "password": "SECRETPW"}
    return {"rar_credentials": credentials, "prezentari": [presentation]}
def test_ingestie_stocheaza_creds_criptate(env):
    """Ingestion stores the credentials encrypted, never as plaintext."""
    from app import crypto
    from app.db import get_connection
    from app.main import app

    with TestClient(app) as client:
        response = client.post("/v1/prezentari", json=_body())
        assert response.status_code == 200
        submission_id = response.json()["results"][0]["submission_id"]

    conn = get_connection()
    try:
        cursor = conn.execute(
            "SELECT rar_creds_enc, payload_json FROM submissions WHERE id=?",
            (submission_id,),
        )
        row = cursor.fetchone()
    finally:
        conn.close()

    stored_enc = row["rar_creds_enc"]
    stored_payload = row["payload_json"]

    # Encrypted credentials are present, but the password does not appear in
    # plaintext in either stored column.
    assert stored_enc
    assert "SECRETPW" not in stored_enc
    assert "SECRETPW" not in stored_payload
    assert crypto.decrypt_creds(stored_enc) == {"email": "x@y.ro", "password": "SECRETPW"}
# --------------------------------------------------------------------------- #
# Worker: sesiuni per-cont #
# --------------------------------------------------------------------------- #
class FakeRarClient:
    """Stand-in for RarClient used by the worker session tests.

    ``login`` returns a deterministic token derived from the e-mail (or raises
    the configured exception) and ``get_nomenclator`` always returns an empty
    list. Every instance registers itself in the class-level ``made`` list so
    tests can inspect the clients that were created.
    """

    # Registry of every instance created; tests reset it by rebinding.
    made: list = []

    def __init__(self, settings=None, login_exc=None):
        """Record the optional login failure and register this instance.

        ``settings`` is accepted but not used.
        """
        self._login_exc = login_exc
        self.login_calls = 0
        self.closed = False
        FakeRarClient.made.append(self)

    def login(self, email, password):
        """Count the call, then raise the configured error or return a token."""
        self.login_calls += 1
        failure = self._login_exc
        if failure is None:
            return f"TOK-{email}"
        raise failure

    def get_nomenclator(self, token):
        """Return an empty nomenclator regardless of the token."""
        return []

    def close(self):
        """Mark this client as closed."""
        self.closed = True
def _insert(conn, account_id=1, creds_enc=None, status="queued"):
    """Insert one submission row with a fixed payload and return its row id.

    The idempotency key is ``k-`` followed by eight random hex characters.
    """
    payload = json.dumps(
        {
            "vin": "WVWZZZ1KZAW000123",
            "nr_inmatriculare": "B1",
            "data_prestatie": "2026-06-15",
            "odometru_final": "1",
            "prestatii": [{"cod_prestatie": "OE-1"}],
        }
    )
    idempotency_key = f"k-{os.urandom(4).hex()}"
    sql = (
        "INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_creds_enc) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    params = (idempotency_key, account_id, status, payload, creds_enc)
    cursor = conn.execute(sql, params)
    return int(cursor.lastrowid)
def test_get_token_login_clears_creds(env, monkeypatch):
    """First login wipes the account's stored creds; the token is then cached."""
    import app.worker.__main__ as w
    from app.crypto import encrypt_creds
    from app.db import get_connection

    FakeRarClient.made = []
    monkeypatch.setattr(w, "RarClient", FakeRarClient)

    conn = get_connection()
    try:
        encrypted = encrypt_creds({"email": "a@b.ro", "password": "p"})
        submission_ids = [
            _insert(conn, account_id=1, creds_enc=encrypted) for _ in range(2)
        ]

        sessions = w.AccountSessions(w.get_settings())
        first_token = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
        assert first_token == "TOK-a@b.ro"

        # After the first login the creds are cleared on every submission of
        # the account.
        for submission_id in submission_ids:
            stored = conn.execute(
                "SELECT rar_creds_enc FROM submissions WHERE id=?", (submission_id,)
            ).fetchone()
            assert stored["rar_creds_enc"] is None

        # The token is cached: a second call does not log in again.
        second_token = sessions.get_token(conn, 1, None)
        assert second_token == "TOK-a@b.ro"
        assert FakeRarClient.made[0].login_calls == 1
    finally:
        conn.close()
def test_get_token_no_creds_returns_none(env, monkeypatch):
    """With no cached session and no creds supplied, get_token returns None."""
    import app.worker.__main__ as w
    from app.db import get_connection

    monkeypatch.setattr(w, "RarClient", FakeRarClient)

    conn = get_connection()
    try:
        sessions = w.AccountSessions(w.get_settings())
        token = sessions.get_token(conn, 1, None)
        assert token is None
    finally:
        conn.close()
def test_get_token_bad_creds_raises(env, monkeypatch):
    """A login rejected with RarAuthError propagates out of get_token."""
    import app.worker.__main__ as w
    from app.db import get_connection

    def rejecting_client(settings=None):
        # Each client built by the worker fails its login with a 401 error.
        error = RarAuthError("Credentiale RAR invalide", status_code=401)
        return FakeRarClient(settings, login_exc=error)

    monkeypatch.setattr(w, "RarClient", rejecting_client)

    conn = get_connection()
    try:
        sessions = w.AccountSessions(w.get_settings())
        wrong_creds = {"email": "a", "password": "x"}
        with pytest.raises(RarAuthError):
            sessions.get_token(conn, 1, wrong_creds)
    finally:
        conn.close()
def test_per_account_isolation(env, monkeypatch):
    """Each account gets its own session; invalidating one leaves the other."""
    import app.worker.__main__ as w
    from app.db import get_connection

    FakeRarClient.made = []
    monkeypatch.setattr(w, "RarClient", FakeRarClient)

    conn = get_connection()
    try:
        # The test only inserts account 2; account 1 is expected to be
        # usable already (presumably created by init_db - confirm).
        conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'doi')")
        sessions = w.AccountSessions(w.get_settings())

        token_one = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
        token_two = sessions.get_token(conn, 2, {"email": "c@d.ro", "password": "q"})
        assert token_one == "TOK-a@b.ro"
        assert token_two == "TOK-c@d.ro"
        assert len(sessions.active()) == 2

        # Dropping account 1 closes its client and leaves one active session.
        sessions.invalidate(1)
        assert len(sessions.active()) == 1
        assert FakeRarClient.made[0].closed is True
    finally:
        conn.close()
def test_creds_for_fallback_test_creds(env, monkeypatch):
    """Without encrypted creds, _creds_for uses test creds only when enabled."""
    import app.worker.__main__ as w

    def fake_loader():
        return {"email": "t@test", "password": "tp"}

    monkeypatch.setattr(w, "load_test_credentials", fake_loader)
    settings = w.get_settings()

    # object.__setattr__ bypasses the settings object's own attribute
    # handling (presumably it is frozen - confirm).
    # No encrypted creds + flag on -> the test credentials are returned.
    object.__setattr__(settings, "worker_use_test_creds", True)
    with_flag = w._creds_for({"creds_enc": None}, settings)
    assert with_flag == {"email": "t@test", "password": "tp"}

    # Flag off -> None.
    object.__setattr__(settings, "worker_use_test_creds", False)
    without_flag = w._creds_for({"creds_enc": None}, settings)
    assert without_flag is None
def test_creds_for_prefers_enc(env):
    """_creds_for decrypts and returns the encrypted creds on the submission."""
    import app.worker.__main__ as w
    from app import crypto

    expected = {"email": "real@x", "password": "rp"}
    submission = {"creds_enc": crypto.encrypt_creds({"email": "real@x", "password": "rp"})}

    resolved = w._creds_for(submission, w.get_settings())
    assert resolved == expected