"""Teste livrare creds per-cerere: criptare efemera + sesiuni worker per-cont. Acopera: round-trip crypto, stocarea creds criptate la ingestie (niciodata in clar), login per-cont cu stergere creds dupa primul login, fallback creds , 401 creds gresite -> error fara retry. """ from __future__ import annotations import json import os import tempfile import pytest from cryptography.fernet import Fernet from fastapi.testclient import TestClient from app.rar_client import RarAuthError @pytest.fixture() def env(monkeypatch): tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode()) from app.config import get_settings from app import crypto get_settings.cache_clear() crypto.reset_cache() from app.db import get_connection, init_db init_db() yield monkeypatch get_settings.cache_clear() crypto.reset_cache() # --------------------------------------------------------------------------- # # Crypto round-trip # # --------------------------------------------------------------------------- # def test_crypto_roundtrip(env): from app.crypto import decrypt_creds, encrypt_creds creds = {"email": "x@y.ro", "password": "HUNTER2"} tok = encrypt_creds(creds) assert "HUNTER2" not in tok # criptat, nu in clar assert decrypt_creds(tok) == creds def test_crypto_bad_token_returns_none(env): from app.crypto import decrypt_creds assert decrypt_creds(None) is None assert decrypt_creds("") is None assert decrypt_creds("garbage-not-a-token") is None def test_crypto_wrong_key_returns_none(env, monkeypatch): from app import crypto from app.crypto import encrypt_creds tok = encrypt_creds({"email": "a", "password": "b"}) # Rotim cheia -> token-ul vechi nu se mai decripteaza (degradare acceptata). monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode()) from app.config import get_settings get_settings.cache_clear() crypto.reset_cache() assert crypto.decrypt_creds(tok) is None # --------------------------------------------------------------------------- # # Ingestie: creds stocate criptat, niciodata in clar # # --------------------------------------------------------------------------- # def _body(**over): prez = { "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST", "data_prestatie": "2026-06-15", "odometru_final": "123456", "prestatii": [{"cod_prestatie": "OE-1"}], } prez.update(over) return {"rar_credentials": {"email": "x@y.ro", "password": "SECRETPW"}, "prezentari": [prez]} def test_ingestie_stocheaza_creds_criptate(env): from app.crypto import decrypt_creds from app.db import get_connection from app.main import app with TestClient(app) as c: r = c.post("/v1/prezentari", json=_body()) assert r.status_code == 200 sid = r.json()["results"][0]["submission_id"] conn = get_connection() try: row = conn.execute("SELECT rar_creds_enc, payload_json FROM submissions WHERE id=?", (sid,)).fetchone() finally: conn.close() # Creds criptate prezente, dar parola NU apare in clar nicaieri in rand. assert row["rar_creds_enc"] assert "SECRETPW" not in row["rar_creds_enc"] assert "SECRETPW" not in row["payload_json"] assert decrypt_creds(row["rar_creds_enc"]) == {"email": "x@y.ro", "password": "SECRETPW"} def test_ingestie_fara_creds_foloseste_contul(env): """POST /v1/prezentari fara rar_credentials -> submission fara creds efemere; worker-ul cade pe creds-urile durabile ale contului (accounts.rar_creds_enc).""" import app.worker.__main__ as w from app.db import get_connection from app.main import app with TestClient(app) as c: # Contul (id=1 in dev) isi salveaza creds RAR durabile o data. r0 = c.post("/v1/conturi/rar-creds", json={"email": "web@y.ro", "password": "WEBPW"}) assert r0.status_code == 200 # Trimitere FARA rar_credentials (doar payload). Identificarea contului # ramane pe API key / sesiune; creds RAR nu mai sunt necesare in cerere. body = {"prezentari": [{ "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST", "data_prestatie": "2026-06-15", "odometru_final": "123456", "prestatii": [{"cod_prestatie": "OE-1"}], }]} r = c.post("/v1/prezentari", json=body) assert r.status_code == 200, r.text sid = r.json()["results"][0]["submission_id"] conn = get_connection() try: row = conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (sid,)).fetchone() # Submission-ul nu poarta creds efemere... assert row["rar_creds_enc"] is None # ...iar lantul de rezolvare al worker-ului ia creds din cont. creds = w._creds_for({"creds_enc": None}, w.get_settings()) or w._creds_from_account(conn, 1) assert creds == {"email": "web@y.ro", "password": "WEBPW"} finally: conn.close() # --------------------------------------------------------------------------- # # Worker: sesiuni per-cont # # --------------------------------------------------------------------------- # class FakeRarClient: """Stub RarClient: login intoarce token determinist, get_nomenclator gol.""" made: list = [] def __init__(self, settings=None, login_exc=None): self.closed = False self.login_calls = 0 self._login_exc = login_exc FakeRarClient.made.append(self) def login(self, email, password): self.login_calls += 1 if self._login_exc is not None: raise self._login_exc return f"TOK-{email}" def get_nomenclator(self, token): return [] def close(self): self.closed = True def _insert(conn, account_id=1, creds_enc=None, status="queued"): content = { "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1", "data_prestatie": "2026-06-15", "odometru_final": "1", "prestatii": [{"cod_prestatie": "OE-1"}], } cur = conn.execute( "INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_creds_enc) " "VALUES (?, ?, ?, ?, ?)", (f"k-{os.urandom(4).hex()}", account_id, status, json.dumps(content), creds_enc), ) return int(cur.lastrowid) def test_get_token_login_clears_creds(env, monkeypatch): import app.worker.__main__ as w from app.crypto import encrypt_creds from app.db import get_connection FakeRarClient.made = [] monkeypatch.setattr(w, "RarClient", FakeRarClient) conn = get_connection() try: enc = encrypt_creds({"email": "a@b.ro", "password": "p"}) s1 = _insert(conn, account_id=1, creds_enc=enc) s2 = _insert(conn, account_id=1, creds_enc=enc) sessions = w.AccountSessions(w.get_settings()) token = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"}) assert token == "TOK-a@b.ro" # Creds sterse pentru tot contul dupa primul login. for sid in (s1, s2): assert conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (sid,)).fetchone()["rar_creds_enc"] is None # Token cache-uit: al doilea apel NU re-loghează. assert sessions.get_token(conn, 1, None) == "TOK-a@b.ro" assert FakeRarClient.made[0].login_calls == 1 finally: conn.close() def test_get_token_no_creds_returns_none(env, monkeypatch): import app.worker.__main__ as w from app.db import get_connection monkeypatch.setattr(w, "RarClient", FakeRarClient) conn = get_connection() try: sessions = w.AccountSessions(w.get_settings()) assert sessions.get_token(conn, 1, None) is None finally: conn.close() def test_get_token_bad_creds_raises(env, monkeypatch): import app.worker.__main__ as w from app.db import get_connection def _factory(settings=None): return FakeRarClient(settings, login_exc=RarAuthError("Credentiale RAR invalide", status_code=401)) monkeypatch.setattr(w, "RarClient", _factory) conn = get_connection() try: sessions = w.AccountSessions(w.get_settings()) with pytest.raises(RarAuthError): sessions.get_token(conn, 1, {"email": "a", "password": "x"}) finally: conn.close() def test_per_account_isolation(env, monkeypatch): import app.worker.__main__ as w from app.db import get_connection FakeRarClient.made = [] monkeypatch.setattr(w, "RarClient", FakeRarClient) conn = get_connection() try: conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'doi')") sessions = w.AccountSessions(w.get_settings()) t1 = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"}) t2 = sessions.get_token(conn, 2, {"email": "c@d.ro", "password": "q"}) assert t1 == "TOK-a@b.ro" and t2 == "TOK-c@d.ro" assert len(sessions.active()) == 2 sessions.invalidate(1) assert len(sessions.active()) == 1 assert FakeRarClient.made[0].closed is True finally: conn.close() def test_creds_for_fallback_test_creds(env, monkeypatch): import app.worker.__main__ as w # Fara enc + flag test on -> creds . monkeypatch.setattr(w, "load_test_credentials", lambda: {"email": "t@test", "password": "tp"}) settings = w.get_settings() object.__setattr__(settings, "worker_use_test_creds", True) assert w._creds_for({"creds_enc": None}, settings) == {"email": "t@test", "password": "tp"} # Flag off -> None. object.__setattr__(settings, "worker_use_test_creds", False) assert w._creds_for({"creds_enc": None}, settings) is None def test_creds_for_prefers_enc(env): import app.worker.__main__ as w from app.crypto import encrypt_creds enc = encrypt_creds({"email": "real@x", "password": "rp"}) settings = w.get_settings() assert w._creds_for({"creds_enc": enc}, settings) == {"email": "real@x", "password": "rp"}