feat(creds): livrare creds per-cerere la worker (criptat efemer + sesiuni per-cont)
Plan sect.5: parola RAR vine per-cerere, stocata CRIPTATA in submission pana la primul login reusit pe cont, apoi stearsa; JWT 30h acopera restul. - app/crypto.py: Fernet, cheie din AUTOPASS_creds_key (nesetata -> efemera la runtime, creds nu supravietuiesc restartului). encrypt/decrypt_creds. - schema + migrare: submissions.rar_creds_enc (creds criptate). - ingestie: cripteaza rar_credentials, le lipeste de fiecare submission nou. Niciodata in clar in DB. - worker: AccountSessions (login per-cont cu creds decriptate, cache JWT in memorie, sterge creds-urile contului dupa primul login + refresh nomenclator). 401 creds gresite -> error fara retry; token expirat -> invalidare + requeue; fara creds (restart) -> requeue "indisponibile" (ROAAUTO re-trimite). claim_one intoarce account_id + creds_enc; recover_orphans filtrabil pe cont. - requirements: cryptography==46.0.5. Nota: refresh nomenclator e acum lazy la primul login per-cont (nu la pornire); seed-ul fallback acopera editorul offline. 10 teste noi (tests/test_creds_delivery.py). 95 pass total. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
252
tests/test_creds_delivery.py
Normal file
252
tests/test_creds_delivery.py
Normal file
@@ -0,0 +1,252 @@
|
||||
"""Teste livrare creds per-cerere: criptare efemera + sesiuni worker per-cont.
|
||||
|
||||
Acopera: round-trip crypto, stocarea creds criptate la ingestie (niciodata in
|
||||
clar), login per-cont cu stergere creds dupa primul login, fallback creds <test>,
|
||||
401 creds gresite -> error fara retry.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.rar_client import RarAuthError
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def env(monkeypatch):
|
||||
tmp = tempfile.mkdtemp()
|
||||
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
|
||||
monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
|
||||
from app.config import get_settings
|
||||
from app import crypto
|
||||
|
||||
get_settings.cache_clear()
|
||||
crypto.reset_cache()
|
||||
from app.db import get_connection, init_db
|
||||
|
||||
init_db()
|
||||
yield monkeypatch
|
||||
get_settings.cache_clear()
|
||||
crypto.reset_cache()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Crypto round-trip #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_crypto_roundtrip(env):
|
||||
from app.crypto import decrypt_creds, encrypt_creds
|
||||
|
||||
creds = {"email": "x@y.ro", "password": "HUNTER2"}
|
||||
tok = encrypt_creds(creds)
|
||||
assert "HUNTER2" not in tok # criptat, nu in clar
|
||||
assert decrypt_creds(tok) == creds
|
||||
|
||||
|
||||
def test_crypto_bad_token_returns_none(env):
|
||||
from app.crypto import decrypt_creds
|
||||
|
||||
assert decrypt_creds(None) is None
|
||||
assert decrypt_creds("") is None
|
||||
assert decrypt_creds("garbage-not-a-token") is None
|
||||
|
||||
|
||||
def test_crypto_wrong_key_returns_none(env, monkeypatch):
|
||||
from app import crypto
|
||||
from app.crypto import encrypt_creds
|
||||
|
||||
tok = encrypt_creds({"email": "a", "password": "b"})
|
||||
# Rotim cheia -> token-ul vechi nu se mai decripteaza (degradare acceptata).
|
||||
monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
|
||||
from app.config import get_settings
|
||||
|
||||
get_settings.cache_clear()
|
||||
crypto.reset_cache()
|
||||
assert crypto.decrypt_creds(tok) is None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Ingestie: creds stocate criptat, niciodata in clar #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def _body(**over):
|
||||
prez = {
|
||||
"vin": "WVWZZZ1KZAW000123",
|
||||
"nr_inmatriculare": "B999TST",
|
||||
"data_prestatie": "2026-06-15",
|
||||
"odometru_final": "123456",
|
||||
"prestatii": [{"cod_prestatie": "OE-1"}],
|
||||
}
|
||||
prez.update(over)
|
||||
return {"rar_credentials": {"email": "x@y.ro", "password": "SECRETPW"}, "prezentari": [prez]}
|
||||
|
||||
|
||||
def test_ingestie_stocheaza_creds_criptate(env):
|
||||
from app.crypto import decrypt_creds
|
||||
from app.db import get_connection
|
||||
from app.main import app
|
||||
|
||||
with TestClient(app) as c:
|
||||
r = c.post("/v1/prezentari", json=_body())
|
||||
assert r.status_code == 200
|
||||
sid = r.json()["results"][0]["submission_id"]
|
||||
|
||||
conn = get_connection()
|
||||
try:
|
||||
row = conn.execute("SELECT rar_creds_enc, payload_json FROM submissions WHERE id=?", (sid,)).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
# Creds criptate prezente, dar parola NU apare in clar nicaieri in rand.
|
||||
assert row["rar_creds_enc"]
|
||||
assert "SECRETPW" not in row["rar_creds_enc"]
|
||||
assert "SECRETPW" not in row["payload_json"]
|
||||
assert decrypt_creds(row["rar_creds_enc"]) == {"email": "x@y.ro", "password": "SECRETPW"}
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Worker: sesiuni per-cont #
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
class FakeRarClient:
|
||||
"""Stub RarClient: login intoarce token determinist, get_nomenclator gol."""
|
||||
|
||||
made: list = []
|
||||
|
||||
def __init__(self, settings=None, login_exc=None):
|
||||
self.closed = False
|
||||
self.login_calls = 0
|
||||
self._login_exc = login_exc
|
||||
FakeRarClient.made.append(self)
|
||||
|
||||
def login(self, email, password):
|
||||
self.login_calls += 1
|
||||
if self._login_exc is not None:
|
||||
raise self._login_exc
|
||||
return f"TOK-{email}"
|
||||
|
||||
def get_nomenclator(self, token):
|
||||
return []
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
def _insert(conn, account_id=1, creds_enc=None, status="queued"):
|
||||
content = {
|
||||
"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1",
|
||||
"data_prestatie": "2026-06-15", "odometru_final": "1",
|
||||
"prestatii": [{"cod_prestatie": "OE-1"}],
|
||||
}
|
||||
cur = conn.execute(
|
||||
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_creds_enc) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
(f"k-{os.urandom(4).hex()}", account_id, status, json.dumps(content), creds_enc),
|
||||
)
|
||||
return int(cur.lastrowid)
|
||||
|
||||
|
||||
def test_get_token_login_clears_creds(env, monkeypatch):
|
||||
import app.worker.__main__ as w
|
||||
from app.crypto import encrypt_creds
|
||||
from app.db import get_connection
|
||||
|
||||
FakeRarClient.made = []
|
||||
monkeypatch.setattr(w, "RarClient", FakeRarClient)
|
||||
|
||||
conn = get_connection()
|
||||
try:
|
||||
enc = encrypt_creds({"email": "a@b.ro", "password": "p"})
|
||||
s1 = _insert(conn, account_id=1, creds_enc=enc)
|
||||
s2 = _insert(conn, account_id=1, creds_enc=enc)
|
||||
sessions = w.AccountSessions(w.get_settings())
|
||||
|
||||
token = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
|
||||
assert token == "TOK-a@b.ro"
|
||||
# Creds sterse pentru tot contul dupa primul login.
|
||||
for sid in (s1, s2):
|
||||
assert conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (sid,)).fetchone()["rar_creds_enc"] is None
|
||||
# Token cache-uit: al doilea apel NU re-loghează.
|
||||
assert sessions.get_token(conn, 1, None) == "TOK-a@b.ro"
|
||||
assert FakeRarClient.made[0].login_calls == 1
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_get_token_no_creds_returns_none(env, monkeypatch):
|
||||
import app.worker.__main__ as w
|
||||
from app.db import get_connection
|
||||
|
||||
monkeypatch.setattr(w, "RarClient", FakeRarClient)
|
||||
conn = get_connection()
|
||||
try:
|
||||
sessions = w.AccountSessions(w.get_settings())
|
||||
assert sessions.get_token(conn, 1, None) is None
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_get_token_bad_creds_raises(env, monkeypatch):
|
||||
import app.worker.__main__ as w
|
||||
from app.db import get_connection
|
||||
|
||||
def _factory(settings=None):
|
||||
return FakeRarClient(settings, login_exc=RarAuthError("Credentiale RAR invalide", status_code=401))
|
||||
|
||||
monkeypatch.setattr(w, "RarClient", _factory)
|
||||
conn = get_connection()
|
||||
try:
|
||||
sessions = w.AccountSessions(w.get_settings())
|
||||
with pytest.raises(RarAuthError):
|
||||
sessions.get_token(conn, 1, {"email": "a", "password": "x"})
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_per_account_isolation(env, monkeypatch):
|
||||
import app.worker.__main__ as w
|
||||
from app.db import get_connection
|
||||
|
||||
FakeRarClient.made = []
|
||||
monkeypatch.setattr(w, "RarClient", FakeRarClient)
|
||||
conn = get_connection()
|
||||
try:
|
||||
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'doi')")
|
||||
sessions = w.AccountSessions(w.get_settings())
|
||||
t1 = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
|
||||
t2 = sessions.get_token(conn, 2, {"email": "c@d.ro", "password": "q"})
|
||||
assert t1 == "TOK-a@b.ro" and t2 == "TOK-c@d.ro"
|
||||
assert len(sessions.active()) == 2
|
||||
sessions.invalidate(1)
|
||||
assert len(sessions.active()) == 1
|
||||
assert FakeRarClient.made[0].closed is True
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_creds_for_fallback_test_creds(env, monkeypatch):
|
||||
import app.worker.__main__ as w
|
||||
|
||||
# Fara enc + flag test on -> creds <test>.
|
||||
monkeypatch.setattr(w, "load_test_credentials", lambda: {"email": "t@test", "password": "tp"})
|
||||
settings = w.get_settings()
|
||||
object.__setattr__(settings, "worker_use_test_creds", True)
|
||||
assert w._creds_for({"creds_enc": None}, settings) == {"email": "t@test", "password": "tp"}
|
||||
|
||||
# Flag off -> None.
|
||||
object.__setattr__(settings, "worker_use_test_creds", False)
|
||||
assert w._creds_for({"creds_enc": None}, settings) is None
|
||||
|
||||
|
||||
def test_creds_for_prefers_enc(env):
|
||||
import app.worker.__main__ as w
|
||||
from app.crypto import encrypt_creds
|
||||
|
||||
enc = encrypt_creds({"email": "real@x", "password": "rp"})
|
||||
settings = w.get_settings()
|
||||
assert w._creds_for({"creds_enc": enc}, settings) == {"email": "real@x", "password": "rp"}
|
||||
Reference in New Issue
Block a user