Cod_prestatie necunoscut in nomenclator nu se mai trimite raw la RAR (HTTP 500 ORA-12899 + record partial FINALIZATA pe care reconcilierea il marca fals sent): e promovat la cod_op_service si tratat ca operatie de mapat. Optiune top-level boolean on_unmapped_error pe POST /v1/prezentari + /valideaza: - false (default) -> submission needs_mapping (intra in editor) - true -> respinge fara enqueue (status error, submission_id=null, erori) - None -> default per-cont accounts.on_unmapped_error_default (implicit 0) Inlocuieste enum-ul anterior on_unmapped (needs_mapping/error) cu un boolean mai simplu; coloana de cont migrata aditiv la INTEGER on_unmapped_error_default. Izolare teste de .env-ul de dezvoltare: tests/conftest.py fixeaza default sigur pe AUTOPASS_REQUIRE_API_KEY / AUTOPASS_WORKER_USE_TEST_CREDS (precedenta peste .env in pydantic-settings) + fixturile env din test_creds_delivery/test_t1 pineaza explicit aceste flag-uri, ca fallback-ul creds pe cont sa fie atins. Teste: 752 passed (fara flag pe CLI). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
292 lines
10 KiB
Python
292 lines
10 KiB
Python
"""Teste livrare creds per-cerere: criptare efemera + sesiuni worker per-cont.
|
|
|
|
Acopera: round-trip crypto, stocarea creds criptate la ingestie (niciodata in
|
|
clar), login per-cont cu stergere creds dupa primul login, fallback creds <test>,
|
|
401 creds gresite -> error fara retry.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
from cryptography.fernet import Fernet
|
|
from fastapi.testclient import TestClient
|
|
|
|
from app.rar_client import RarAuthError
|
|
|
|
|
|
@pytest.fixture()
|
|
def env(monkeypatch):
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
|
|
monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
|
|
# Izolare de .env-ul de dezvoltare (creds <test> + cheie API): testele isi
|
|
# controleaza explicit aceste flag-uri, altfel fallback-ul pe cont nu se atinge.
|
|
monkeypatch.setenv("AUTOPASS_WORKER_USE_TEST_CREDS", "false")
|
|
monkeypatch.setenv("AUTOPASS_REQUIRE_API_KEY", "false")
|
|
from app.config import get_settings
|
|
from app import crypto
|
|
|
|
get_settings.cache_clear()
|
|
crypto.reset_cache()
|
|
from app.db import get_connection, init_db
|
|
|
|
init_db()
|
|
yield monkeypatch
|
|
get_settings.cache_clear()
|
|
crypto.reset_cache()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Crypto round-trip #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def test_crypto_roundtrip(env):
|
|
from app.crypto import decrypt_creds, encrypt_creds
|
|
|
|
creds = {"email": "x@y.ro", "password": "HUNTER2"}
|
|
tok = encrypt_creds(creds)
|
|
assert "HUNTER2" not in tok # criptat, nu in clar
|
|
assert decrypt_creds(tok) == creds
|
|
|
|
|
|
def test_crypto_bad_token_returns_none(env):
|
|
from app.crypto import decrypt_creds
|
|
|
|
assert decrypt_creds(None) is None
|
|
assert decrypt_creds("") is None
|
|
assert decrypt_creds("garbage-not-a-token") is None
|
|
|
|
|
|
def test_crypto_wrong_key_returns_none(env, monkeypatch):
|
|
from app import crypto
|
|
from app.crypto import encrypt_creds
|
|
|
|
tok = encrypt_creds({"email": "a", "password": "b"})
|
|
# Rotim cheia -> token-ul vechi nu se mai decripteaza (degradare acceptata).
|
|
monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
|
|
from app.config import get_settings
|
|
|
|
get_settings.cache_clear()
|
|
crypto.reset_cache()
|
|
assert crypto.decrypt_creds(tok) is None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Ingestie: creds stocate criptat, niciodata in clar #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def _body(**over):
|
|
prez = {
|
|
"vin": "WVWZZZ1KZAW000123",
|
|
"nr_inmatriculare": "B999TST",
|
|
"data_prestatie": "2026-06-15",
|
|
"odometru_final": "123456",
|
|
"prestatii": [{"cod_prestatie": "OE-1"}],
|
|
}
|
|
prez.update(over)
|
|
return {"rar_credentials": {"email": "x@y.ro", "password": "SECRETPW"}, "prezentari": [prez]}
|
|
|
|
|
|
def test_ingestie_stocheaza_creds_criptate(env):
|
|
from app.crypto import decrypt_creds
|
|
from app.db import get_connection
|
|
from app.main import app
|
|
|
|
with TestClient(app) as c:
|
|
r = c.post("/v1/prezentari", json=_body())
|
|
assert r.status_code == 200
|
|
sid = r.json()["results"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
row = conn.execute("SELECT rar_creds_enc, payload_json FROM submissions WHERE id=?", (sid,)).fetchone()
|
|
finally:
|
|
conn.close()
|
|
# Creds criptate prezente, dar parola NU apare in clar nicaieri in rand.
|
|
assert row["rar_creds_enc"]
|
|
assert "SECRETPW" not in row["rar_creds_enc"]
|
|
assert "SECRETPW" not in row["payload_json"]
|
|
assert decrypt_creds(row["rar_creds_enc"]) == {"email": "x@y.ro", "password": "SECRETPW"}
|
|
|
|
|
|
def test_ingestie_fara_creds_foloseste_contul(env):
|
|
"""POST /v1/prezentari fara rar_credentials -> submission fara creds efemere;
|
|
worker-ul cade pe creds-urile durabile ale contului (accounts.rar_creds_enc)."""
|
|
import app.worker.__main__ as w
|
|
from app.db import get_connection
|
|
from app.main import app
|
|
|
|
with TestClient(app) as c:
|
|
# Contul (id=1 in dev) isi salveaza creds RAR durabile o data.
|
|
r0 = c.post("/v1/conturi/rar-creds", json={"email": "web@y.ro", "password": "WEBPW"})
|
|
assert r0.status_code == 200
|
|
|
|
# Trimitere FARA rar_credentials (doar payload). Identificarea contului
|
|
# ramane pe API key / sesiune; creds RAR nu mai sunt necesare in cerere.
|
|
body = {"prezentari": [{
|
|
"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST",
|
|
"data_prestatie": "2026-06-15", "odometru_final": "123456",
|
|
"prestatii": [{"cod_prestatie": "OE-1"}],
|
|
}]}
|
|
r = c.post("/v1/prezentari", json=body)
|
|
assert r.status_code == 200, r.text
|
|
sid = r.json()["results"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
row = conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (sid,)).fetchone()
|
|
# Submission-ul nu poarta creds efemere...
|
|
assert row["rar_creds_enc"] is None
|
|
# ...iar lantul de rezolvare al worker-ului ia creds din cont.
|
|
creds = w._creds_for({"creds_enc": None}, w.get_settings()) or w._creds_from_account(conn, 1)
|
|
assert creds == {"email": "web@y.ro", "password": "WEBPW"}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Worker: sesiuni per-cont #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
class FakeRarClient:
|
|
"""Stub RarClient: login intoarce token determinist, get_nomenclator gol."""
|
|
|
|
made: list = []
|
|
|
|
def __init__(self, settings=None, login_exc=None):
|
|
self.closed = False
|
|
self.login_calls = 0
|
|
self._login_exc = login_exc
|
|
FakeRarClient.made.append(self)
|
|
|
|
def login(self, email, password):
|
|
self.login_calls += 1
|
|
if self._login_exc is not None:
|
|
raise self._login_exc
|
|
return f"TOK-{email}"
|
|
|
|
def get_nomenclator(self, token):
|
|
return []
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
|
|
def _insert(conn, account_id=1, creds_enc=None, status="queued"):
|
|
content = {
|
|
"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1",
|
|
"data_prestatie": "2026-06-15", "odometru_final": "1",
|
|
"prestatii": [{"cod_prestatie": "OE-1"}],
|
|
}
|
|
cur = conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_creds_enc) "
|
|
"VALUES (?, ?, ?, ?, ?)",
|
|
(f"k-{os.urandom(4).hex()}", account_id, status, json.dumps(content), creds_enc),
|
|
)
|
|
return int(cur.lastrowid)
|
|
|
|
|
|
def test_get_token_login_clears_creds(env, monkeypatch):
|
|
import app.worker.__main__ as w
|
|
from app.crypto import encrypt_creds
|
|
from app.db import get_connection
|
|
|
|
FakeRarClient.made = []
|
|
monkeypatch.setattr(w, "RarClient", FakeRarClient)
|
|
|
|
conn = get_connection()
|
|
try:
|
|
enc = encrypt_creds({"email": "a@b.ro", "password": "p"})
|
|
s1 = _insert(conn, account_id=1, creds_enc=enc)
|
|
s2 = _insert(conn, account_id=1, creds_enc=enc)
|
|
sessions = w.AccountSessions(w.get_settings())
|
|
|
|
token = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
|
|
assert token == "TOK-a@b.ro"
|
|
# Creds sterse pentru tot contul dupa primul login.
|
|
for sid in (s1, s2):
|
|
assert conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (sid,)).fetchone()["rar_creds_enc"] is None
|
|
# Token cache-uit: al doilea apel NU re-loghează.
|
|
assert sessions.get_token(conn, 1, None) == "TOK-a@b.ro"
|
|
assert FakeRarClient.made[0].login_calls == 1
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_get_token_no_creds_returns_none(env, monkeypatch):
|
|
import app.worker.__main__ as w
|
|
from app.db import get_connection
|
|
|
|
monkeypatch.setattr(w, "RarClient", FakeRarClient)
|
|
conn = get_connection()
|
|
try:
|
|
sessions = w.AccountSessions(w.get_settings())
|
|
assert sessions.get_token(conn, 1, None) is None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_get_token_bad_creds_raises(env, monkeypatch):
|
|
import app.worker.__main__ as w
|
|
from app.db import get_connection
|
|
|
|
def _factory(settings=None):
|
|
return FakeRarClient(settings, login_exc=RarAuthError("Credentiale RAR invalide", status_code=401))
|
|
|
|
monkeypatch.setattr(w, "RarClient", _factory)
|
|
conn = get_connection()
|
|
try:
|
|
sessions = w.AccountSessions(w.get_settings())
|
|
with pytest.raises(RarAuthError):
|
|
sessions.get_token(conn, 1, {"email": "a", "password": "x"})
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_per_account_isolation(env, monkeypatch):
|
|
import app.worker.__main__ as w
|
|
from app.db import get_connection
|
|
|
|
FakeRarClient.made = []
|
|
monkeypatch.setattr(w, "RarClient", FakeRarClient)
|
|
conn = get_connection()
|
|
try:
|
|
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'doi')")
|
|
sessions = w.AccountSessions(w.get_settings())
|
|
t1 = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
|
|
t2 = sessions.get_token(conn, 2, {"email": "c@d.ro", "password": "q"})
|
|
assert t1 == "TOK-a@b.ro" and t2 == "TOK-c@d.ro"
|
|
assert len(sessions.active()) == 2
|
|
sessions.invalidate(1)
|
|
assert len(sessions.active()) == 1
|
|
assert FakeRarClient.made[0].closed is True
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_creds_for_fallback_test_creds(env, monkeypatch):
|
|
import app.worker.__main__ as w
|
|
|
|
# Fara enc + flag test on -> creds <test>.
|
|
monkeypatch.setattr(w, "load_test_credentials", lambda: {"email": "t@test", "password": "tp"})
|
|
settings = w.get_settings()
|
|
object.__setattr__(settings, "worker_use_test_creds", True)
|
|
assert w._creds_for({"creds_enc": None}, settings) == {"email": "t@test", "password": "tp"}
|
|
|
|
# Flag off -> None.
|
|
object.__setattr__(settings, "worker_use_test_creds", False)
|
|
assert w._creds_for({"creds_enc": None}, settings) is None
|
|
|
|
|
|
def test_creds_for_prefers_enc(env):
|
|
import app.worker.__main__ as w
|
|
from app.crypto import encrypt_creds
|
|
|
|
enc = encrypt_creds({"email": "real@x", "password": "rp"})
|
|
settings = w.get_settings()
|
|
assert w._creds_for({"creds_enc": enc}, settings) == {"email": "real@x", "password": "rp"}
|