- worker: _creds_from_account(conn, account_id) — fallback la accounts.rar_creds_enc cand submission n-are creds (canal web fara re-pusher, restart worker) - run(): creds = _creds_for(claimed, settings) OR _creds_from_account(conn, account_id) - gate purjare (Voce#5): comentariu explicit — sterge DOAR submissions.rar_creds_enc, NU accounts.rar_creds_enc (inofensiv pt canal web, neatins pt canal API) - POST /v1/conturi/rar-creds: seteaza creds durabile criptate Fernet per cont - DELETE /v1/conturi/rar-creds: revenire la modelul efemer Treapta 1 - 7 teste: fallback, restart, coada mixta, endpoint set/delete, gate purjare Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
257 lines
8.9 KiB
Python
257 lines
8.9 KiB
Python
"""T1 tests: durable accounts.rar_creds_enc + worker re-login fallback + purge gate.

Verifies:
  (a) Web channel, worker restart (empty session), expired token -> re-login
      from accounts -> submit.
  (b) MIXED queue, API (ephemeral) + web (durable): after the web login, the
      API submissions are still sent (the purge did not break them prematurely).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
from cryptography.fernet import Fernet
|
|
|
|
|
|
@pytest.fixture()
def env(monkeypatch):
    """Provide an isolated app environment backed by a throwaway database file.

    Setup, in order:
      1. create a fresh temporary directory;
      2. point ``AUTOPASS_DB_PATH`` at ``t1.db`` inside it and set
         ``AUTOPASS_CREDS_KEY`` to a newly generated Fernet key;
      3. clear the cached settings and the crypto cache (presumably so the
         new environment variables are picked up);
      4. run ``init_db()``.

    Yields:
        The ``monkeypatch`` fixture, so tests can apply further patches.

    Teardown clears both caches again and removes the temporary directory,
    which the previous version left behind on disk after every test.
    """
    import shutil  # function-scope import, like the app imports below

    tmp = tempfile.mkdtemp()
    try:
        monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t1.db"))
        monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
        # The app modules are imported only after the environment is patched.
        from app.config import get_settings
        from app import crypto
        get_settings.cache_clear()
        crypto.reset_cache()
        from app.db import init_db
        init_db()
        yield monkeypatch
        get_settings.cache_clear()
        crypto.reset_cache()
    finally:
        # Best-effort cleanup: a leftover file handle must not turn a passing
        # test into a teardown error.
        shutil.rmtree(tmp, ignore_errors=True)
|
|
|
|
|
|
class FakeRar:
    """In-memory stand-in for ``RarClient`` used by these tests.

    Tracks how many times ``login`` ran and whether ``close`` was called.
    """

    def __init__(self, settings=None):
        # ``settings`` is accepted and ignored (presumably mirrors the real
        # client's constructor -- confirm against RarClient).
        self.closed = False
        self.login_calls = 0

    def login(self, email, password):
        """Record one login attempt and return a token derived from ``email``."""
        self.login_calls = self.login_calls + 1
        token = f"TOK-{email}"
        return token

    def get_nomenclator(self, token):
        """Return an empty list, whatever ``token`` is."""
        nomenclator = []
        return nomenclator

    def close(self):
        """Flag the stub as closed."""
        self.closed = True
|
|
|
|
|
|
def _insert(conn, account_id=1, creds_enc=None, status="queued", key_suffix=""):
    """Insert one row into ``submissions`` and return its row id.

    Args:
        conn: Database connection exposing ``execute``.
        account_id: Value for the ``account_id`` column.
        creds_enc: Value for ``rar_creds_enc``; ``None`` stores NULL.
        status: Value for the ``status`` column.
        key_suffix: Tail of the idempotency key; when empty, eight random
            hex characters are used instead.

    Returns:
        The ``lastrowid`` of the inserted row, as an ``int``.
    """
    # Fixed payload shared by every test submission.
    payload_json = json.dumps(
        {
            "vin": "WVWZZZ1KZAW000123",
            "nr_inmatriculare": "B1",
            "data_prestatie": "2026-06-15",
            "odometru_final": "1",
            "prestatii": [{"cod_prestatie": "OE-1"}],
        }
    )
    tail = key_suffix if key_suffix else os.urandom(4).hex()
    sql = (
        "INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_creds_enc) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    params = (f"k-{tail}", account_id, status, payload_json, creds_enc)
    cursor = conn.execute(sql, params)
    return int(cursor.lastrowid)
|
|
|
|
|
|
# --- (a) re-login din accounts dupa restart ---
|
|
|
|
def test_creds_from_account_fallback(env, monkeypatch):
    """``_creds_from_account`` returns the account creds when the submission has none."""
    import app.worker.__main__ as w
    from app.crypto import encrypt_creds
    from app.db import get_connection

    monkeypatch.setattr(w, "RarClient", FakeRar)
    db = get_connection()
    try:
        # Store durable creds on account 1.
        stored = encrypt_creds({"email": "web@test.ro", "password": "webpass"})
        db.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (stored,))

        # Web submission with no creds on its own row (e.g. after a purge).
        _insert(db, account_id=1, creds_enc=None)

        # The helper must hand back the decrypted account creds.
        expected = {"email": "web@test.ro", "password": "webpass"}
        actual = w._creds_from_account(db, 1)
        assert actual == expected
    finally:
        db.close()
|
|
|
|
|
|
def test_creds_from_account_no_creds(env):
    """An account without ``rar_creds_enc`` yields ``None`` (pure API channel)."""
    import app.worker.__main__ as w
    from app.db import get_connection

    db = get_connection()
    try:
        # Nothing was stored on account 1, so there is nothing to fall back to.
        found = w._creds_from_account(db, 1)
        assert found is None
    finally:
        db.close()
|
|
|
|
|
|
def test_worker_relogin_dupa_restart(env, monkeypatch):
    """(a) Worker restart: empty session, submission without creds -> re-login from accounts.

    A fresh ``AccountSessions`` stands in for a restarted worker. Without
    creds it must return no token; with the creds read from the account it
    must return the token produced by the stubbed client.
    """
    import app.worker.__main__ as w
    from app.crypto import encrypt_creds
    from app.db import get_connection

    # Previously this was a plain class-attribute assignment, which leaked
    # onto the shared stub for the rest of the session. Nothing in this file
    # reads the attribute; monkeypatch keeps it set for this test only and
    # removes it afterwards.
    monkeypatch.setattr(FakeRar, "login_calls_total", 0, raising=False)
    monkeypatch.setattr(w, "RarClient", FakeRar)

    conn = get_connection()
    try:
        enc = encrypt_creds({"email": "web@test.ro", "password": "webpass"})
        conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (enc,))

        # Web submission without creds (already purged by the first login).
        _insert(conn, account_id=1, creds_enc=None)

        # New session object (simulates a restart) -- nothing cached yet.
        sessions = w.AccountSessions(w.get_settings())
        assert sessions.get_token(conn, 1, None) is None  # no direct creds

        # Creds from the account make a login possible again.
        creds = w._creds_from_account(conn, 1)
        assert creds is not None
        token = sessions.get_token(conn, 1, creds)
        assert token == "TOK-web@test.ro"
    finally:
        conn.close()
|
|
|
|
|
|
# --- (b) coada MIXTA API+web ---
|
|
|
|
def test_coada_mixta_api_web(env, monkeypatch):
    """(b) Mixed queue: after a login, a web submission still resolves creds from the account.

    Scenario:
      1. S1 = API submission with ephemeral creds in submissions.rar_creds_enc.
      2. S2 = web submission without creds (relies on accounts.rar_creds_enc).
      3. Logging in with S1's creds is expected to purge S1.rar_creds_enc.
      4. S2 can still be processed with the creds stored on the account.
    """
    import app.worker.__main__ as w
    from app.crypto import encrypt_creds
    from app.db import get_connection

    monkeypatch.setattr(w, "RarClient", FakeRar)

    conn = get_connection()
    try:
        # Durable creds stored on the account (web channel).
        enc_web = encrypt_creds({"email": "web@test.ro", "password": "webpass"})
        conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (enc_web,))

        # S1: API channel, ephemeral creds on the submission row.
        enc_api = encrypt_creds({"email": "api@test.ro", "password": "apipass"})
        s1 = _insert(conn, account_id=1, creds_enc=enc_api, key_suffix="api1")
        # S2: web channel, no creds on the submission row.
        s2 = _insert(conn, account_id=1, creds_enc=None, key_suffix="web1")

        sessions = w.AccountSessions(w.get_settings())

        # Process S1: log in with the API creds, which triggers the purge.
        creds_s1 = w._creds_for({"creds_enc": enc_api}, w.get_settings())
        assert creds_s1 is not None
        sessions.get_token(conn, 1, creds_s1)  # login + purge

        # Purge check: S1.rar_creds_enc is NULL now.
        row_s1 = conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (s1,)).fetchone()
        assert row_s1["rar_creds_enc"] is None, "creds efemere trebuie sterse dupa login"

        # S2 never had creds on its row. Read the stored value back (this is
        # what ``s2`` is for -- it used to be assigned and never used) and
        # feed it to the same lookup chain used above.
        row_s2 = conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (s2,)).fetchone()
        assert row_s2["rar_creds_enc"] is None, "S2 must not carry creds on its submission row"
        creds_s2 = (
            w._creds_for({"creds_enc": row_s2["rar_creds_enc"]}, w.get_settings())
            or w._creds_from_account(conn, 1)
        )
        assert creds_s2 == {"email": "web@test.ro", "password": "webpass"}, \
            "S2 trebuie sa ia creds din accounts.rar_creds_enc"

        # The purge must not have removed accounts.rar_creds_enc.
        row_acc = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=1").fetchone()
        assert row_acc["rar_creds_enc"] is not None, \
            "accounts.rar_creds_enc trebuie sa ramana dupa purjare submissions"
    finally:
        conn.close()
|
|
|
|
|
|
# --- Endpoint API set/delete rar-creds ---
|
|
|
|
@pytest.fixture()
def client(env):
    """Yield a ``TestClient`` bound to the app, set up on top of the ``env`` fixture."""
    from app.main import app
    from fastapi.testclient import TestClient

    # Used as a context manager so the client is closed once the test ends.
    test_client = TestClient(app)
    with test_client as http:
        yield http
|
|
|
|
|
|
def test_endpoint_set_rar_creds(client, env):
    """POST /v1/conturi/rar-creds stores encrypted creds on the account."""
    from app.crypto import decrypt_creds
    from app.db import get_connection

    response = client.post(
        "/v1/conturi/rar-creds",
        json={"email": "u@test.ro", "password": "pass123"},
    )
    assert response.status_code == 200
    assert response.json()["ok"] is True

    db = get_connection()
    try:
        # The stored value must exist and decrypt to what was posted.
        account = db.execute("SELECT rar_creds_enc FROM accounts WHERE id=1").fetchone()
        stored = account["rar_creds_enc"]
        assert stored is not None
        decrypted = decrypt_creds(stored)
        assert decrypted == {"email": "u@test.ro", "password": "pass123"}
    finally:
        db.close()
|
|
|
|
|
|
def test_endpoint_delete_rar_creds(client, env):
    """DELETE /v1/conturi/rar-creds removes the durable creds from the account."""
    from app.db import get_connection

    # Set creds first, and make sure that actually worked. If the setup POST
    # failed silently, the column would already be NULL and the final
    # assertion would pass without the DELETE having done anything.
    setup = client.post("/v1/conturi/rar-creds", json={"email": "u@test.ro", "password": "pass123"})
    assert setup.status_code == 200

    # Delete them.
    r = client.delete("/v1/conturi/rar-creds")
    assert r.status_code == 200
    assert r.json()["ok"] is True

    conn = get_connection()
    try:
        row = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=1").fetchone()
        assert row["rar_creds_enc"] is None
    finally:
        conn.close()
|
|
|
|
|
|
def test_gate_purjare_nu_sterge_accounts(env, monkeypatch):
    """T1 purge gate: clearing submissions.rar_creds_enc leaves accounts.rar_creds_enc alone.

    The same ciphertext is stored on the account and on one submission. After
    a login, the submission copy is expected to be purged while the account
    copy must be unchanged.
    """
    import app.worker.__main__ as w
    from app.crypto import encrypt_creds
    from app.db import get_connection

    monkeypatch.setattr(w, "RarClient", FakeRar)

    conn = get_connection()
    try:
        enc = encrypt_creds({"email": "u@test.ro", "password": "p"})
        conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (enc,))
        sub_id = _insert(conn, account_id=1, creds_enc=enc)

        sessions = w.AccountSessions(w.get_settings())
        sessions.get_token(conn, 1, {"email": "u@test.ro", "password": "p"})

        # Confirm the purge really ran; otherwise the account check below
        # would pass without exercising the gate at all.
        row_sub = conn.execute(
            "SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_id,)
        ).fetchone()
        assert row_sub["rar_creds_enc"] is None, \
            "purge gate: submissions.rar_creds_enc should be cleared after login"

        # accounts.rar_creds_enc must be intact: present and unchanged.
        row = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=1").fetchone()
        assert row["rar_creds_enc"] is not None, \
            "gate purjare: accounts.rar_creds_enc trebuie sa ramana intact"
        assert row["rar_creds_enc"] == enc, \
            "purge gate: accounts.rar_creds_enc must keep its original value"
    finally:
        conn.close()
|