21 teste noi in tests/test_import_e2e.py: - Scenariul 1: upload xlsx/csv -> column-mapping -> preview -> commit -> worker -> FINALIZATA - Scenariul 2: re-upload acelasi continut -> already_sent (nu al doilea POST la RAR) - Scenariul 3: coada MIXTA API(creds efemere)+web(accounts.rar_creds_enc durabil) — T1/Voce#5 - Masina de stari: queued->sending->sent/requeued/error + double-commit 409 + batch committed - Failure registry: RAR 400/403 + reconciliere raspuns pierdut (503) + max_retries->error - T16: purge_expired + purge_after setat la commit si la trimitere Suita completa: 264 passed, 0 failed (fara regresii Treapta 1 sau Treapta 2). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1025 lines
40 KiB
Python
1025 lines
40 KiB
Python
"""Teste E2E T15 — integrare import->commit->worker (RAR mock) + masina de stari + failure registry.
|
|
|
|
Acopera (plan T15, sect.12):
|
|
- Scenariul 1: upload xlsx/csv -> column-mapping -> preview -> commit N
|
|
-> worker process_one (MockRar) -> submission FINALIZATA cu id_prezentare.
|
|
- Scenariul 2: re-upload acelasi continut -> preview marcheaza already_sent
|
|
(NU al doilea FINALIZATA dupa commit).
|
|
- Scenariul 3: coada MIXTA API(creds efemere)+web(creds durabile accounts.rar_creds_enc):
|
|
dupa login + purjare creds efemere, submission-urile web tot se trimit
|
|
prin fallback accounts.rar_creds_enc (T1/Voce#5).
|
|
- Masina de stari (sect. 6): tranzitii queued->sending->sent/requeued/error.
|
|
- Failure registry (sect. 8): 400/403/503+reconciliere.
|
|
- T16: purge_expired + purge_after setat la commit.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv as csv_mod
|
|
import io
|
|
import os
|
|
import tempfile
|
|
|
|
import openpyxl
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# MockRar — stub complet compatibil cu interfata RarClient #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
class MockRar:
|
|
"""Stub RAR complet: login + post_prezentare + get_finalizate + nomenclator + close.
|
|
|
|
Compatibil cu interfata asteptata de worker (process_one, reconcile_and_mark,
|
|
AccountSessions.get_token, _refresh_nomenclator).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
id_prezentare: int = 99901,
|
|
login_token: str = "mock-jwt",
|
|
finalizate: list | None = None,
|
|
):
|
|
self._id_prezentare = id_prezentare
|
|
self._login_token = login_token
|
|
self._finalizate: list = finalizate or []
|
|
self.post_calls = 0
|
|
self.login_calls = 0
|
|
self.finalizate_calls = 0
|
|
self.closed = False
|
|
|
|
def login(self, email, password):
|
|
self.login_calls += 1
|
|
return self._login_token
|
|
|
|
def get_nomenclator(self, token):
|
|
return []
|
|
|
|
def get_finalizate(self, token):
|
|
self.finalizate_calls += 1
|
|
return self._finalizate
|
|
|
|
def post_prezentare(self, token, payload):
|
|
self.post_calls += 1
|
|
return {"id": self._id_prezentare}
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Fixture client + DB izolata #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
@pytest.fixture()
|
|
def env(monkeypatch):
|
|
"""DB temporara izolata per test, client FastAPI, si functia get_connection."""
|
|
tmp = tempfile.mkdtemp()
|
|
db_path = os.path.join(tmp, "e2e.db")
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", db_path)
|
|
|
|
from app.config import get_settings
|
|
from app.crypto import reset_cache
|
|
|
|
get_settings.cache_clear()
|
|
reset_cache()
|
|
|
|
from app.db import get_connection
|
|
from app.main import app
|
|
|
|
with TestClient(app) as client:
|
|
yield client, get_connection, get_settings
|
|
|
|
get_settings.cache_clear()
|
|
reset_cache()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Helpere pentru fisiere test #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
_HEADER = ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"]
|
|
_ROW_1 = ["WVWZZZ1KZAW001111", "B100TST", "2026-06-15", "123456", "Revizie"]
|
|
_ROW_2 = ["WVWZZZ1KZAW002222", "CJ200AB", "2026-05-20", "98765", "Revizie"]
|
|
|
|
_DEFAULT_COLUMN_MAPPING = {
|
|
"VIN": "vin",
|
|
"Nr inmatriculare": "nr_inmatriculare",
|
|
"Data prestatie": "data_prestatie",
|
|
"Odometru final": "odometru_final",
|
|
"Operatie": "operatie",
|
|
}
|
|
|
|
|
|
def _make_xlsx(rows: list[list]) -> bytes:
|
|
"""Creeaza un xlsx in-memory cu openpyxl."""
|
|
wb = openpyxl.Workbook()
|
|
ws = wb.active
|
|
assert ws is not None # workbook nou are intotdeauna un sheet activ
|
|
ws.title = "Sheet1"
|
|
for row in rows:
|
|
ws.append(row)
|
|
buf = io.BytesIO()
|
|
wb.save(buf)
|
|
return buf.getvalue()
|
|
|
|
|
|
def _make_csv(rows: list[list], delimiter: str = ";") -> bytes:
|
|
"""Creeaza un CSV in-memory cu delimiter RO (';' implicit)."""
|
|
buf = io.StringIO()
|
|
writer = csv_mod.writer(buf, delimiter=delimiter)
|
|
for row in rows:
|
|
writer.writerow(row)
|
|
return buf.getvalue().encode("utf-8")
|
|
|
|
|
|
def _seed_operation_mapping(client: TestClient, cod_op: str = "Revizie", cod_prest: str = "OE-1") -> None:
|
|
"""Seeda o mapare de operatii (cod_op -> cod_prestatie) cu auto_send=True."""
|
|
client.post("/v1/mapari", json={
|
|
"cod_op_service": cod_op,
|
|
"cod_prestatie": cod_prest,
|
|
"auto_send": True,
|
|
})
|
|
|
|
|
|
def _upload_and_map(client: TestClient, rows: list[list] | None = None) -> int:
|
|
"""Upload xlsx + salveaza column-mapping + seeda operatii. Intoarce import_id."""
|
|
if rows is None:
|
|
rows = [_HEADER, _ROW_1]
|
|
data = _make_xlsx(rows)
|
|
r = client.post(
|
|
"/v1/import",
|
|
files={"file": ("test.xlsx", io.BytesIO(data), "application/octet-stream")},
|
|
)
|
|
assert r.status_code == 200, f"Upload esuat: {r.text}"
|
|
import_id = r.json()["import_id"]
|
|
|
|
rc = client.post(
|
|
f"/v1/import/{import_id}/column-mapping",
|
|
json={"json_mapare": _DEFAULT_COLUMN_MAPPING},
|
|
)
|
|
assert rc.status_code == 200, f"Salvare column-mapping esuata: {rc.text}"
|
|
|
|
_seed_operation_mapping(client)
|
|
return import_id
|
|
|
|
|
|
def _drain_queue(conn, settings, mock_rar: MockRar, token: str = "mock-tok") -> list[str]:
|
|
"""Dreneaza coada cu MockRar (claim_one + process_one per rand). Intoarce rezultatele."""
|
|
import app.worker.__main__ as w
|
|
|
|
results = []
|
|
while True:
|
|
claimed = w.claim_one(conn)
|
|
if claimed is None:
|
|
break
|
|
result = w.process_one(conn, settings, mock_rar, token, claimed)
|
|
results.append(result)
|
|
return results
|
|
|
|
|
|
# =========================================================================== #
|
|
# Scenariul 1 — E2E complet xlsx: upload->map->preview->commit->worker #
|
|
# =========================================================================== #
|
|
|
|
class TestE2EBasicXlsx:
|
|
def test_e2e_complet_un_rand(self, env):
|
|
"""E2E xlsx (1 rand): upload->column-mapping->preview->commit->worker -> FINALIZATA."""
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
# 1. Upload + mapare
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
|
|
# 2. Preview: rand trebuie sa fie 'ok'
|
|
rp = client.get(f"/v1/import/{import_id}/preview")
|
|
assert rp.status_code == 200, rp.text
|
|
preview = rp.json()
|
|
assert preview["summary"].get("ok", 0) == 1, \
|
|
f"Asteptat 1 rand ok, primit: {preview['summary']}"
|
|
|
|
# 3. Commit
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1,
|
|
"reviewed_rows": [],
|
|
"confirmed_by": "test@e2e.ro",
|
|
})
|
|
assert rc.status_code == 200, rc.text
|
|
commit_body = rc.json()
|
|
assert commit_body["enqueued"] == 1
|
|
assert len(commit_body["submissions"]) == 1
|
|
sub_id = commit_body["submissions"][0]["submission_id"]
|
|
|
|
# 4. Verifica status initial 'queued'
|
|
conn = get_connection()
|
|
try:
|
|
row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone()
|
|
assert row["status"] == "queued", f"Status initial asteptat 'queued', primit: {row['status']}"
|
|
finally:
|
|
conn.close()
|
|
|
|
# 5. Worker cu MockRar -> trimite la RAR
|
|
mock_rar = MockRar(id_prezentare=77001)
|
|
conn = get_connection()
|
|
try:
|
|
results = _drain_queue(conn, settings, mock_rar)
|
|
assert results == ["sent"], f"Asteptat ['sent'], primit: {results}"
|
|
|
|
# 6. Verifica FINALIZATA: status='sent', id_prezentare=77001
|
|
row = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "sent", f"Status final asteptat 'sent', primit: {row['status']}"
|
|
assert row["id_prezentare"] == 77001, \
|
|
f"id_prezentare asteptat 77001, primit: {row['id_prezentare']}"
|
|
finally:
|
|
conn.close()
|
|
|
|
assert mock_rar.post_calls == 1
|
|
assert mock_rar.finalizate_calls == 0 # happy path fara reconciliere
|
|
|
|
def test_e2e_complet_doua_randuri(self, env):
|
|
"""E2E xlsx (2 randuri): commit N=2 -> worker dreneaza 2 submission-uri -> ambele FINALIZATA."""
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1, _ROW_2])
|
|
|
|
rp = client.get(f"/v1/import/{import_id}/preview")
|
|
assert rp.json()["summary"].get("ok", 0) == 2, \
|
|
f"Asteptat 2 randuri ok: {rp.json()['summary']}"
|
|
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 2, "reviewed_rows": []
|
|
})
|
|
assert rc.status_code == 200, rc.text
|
|
assert rc.json()["enqueued"] == 2
|
|
|
|
mock_rar = MockRar(id_prezentare=88001)
|
|
conn = get_connection()
|
|
try:
|
|
results = _drain_queue(conn, settings, mock_rar)
|
|
assert len(results) == 2
|
|
assert all(r == "sent" for r in results), f"Rezultate: {results}"
|
|
|
|
rows = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions ORDER BY id"
|
|
).fetchall()
|
|
assert all(r["status"] == "sent" for r in rows), \
|
|
f"Nu toate sent: {[(r['status']) for r in rows]}"
|
|
assert all(r["id_prezentare"] == 88001 for r in rows)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert mock_rar.post_calls == 2
|
|
|
|
def test_atestare_scrisa_la_commit(self, env):
|
|
"""Commit scrie import_attestations (rows_hash + n_confirmed + confirmed_by)."""
|
|
client, get_connection, get_settings = env
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1,
|
|
"reviewed_rows": [],
|
|
"confirmed_by": "declarant@e2e.ro",
|
|
})
|
|
assert rc.status_code == 200
|
|
rows_hash = rc.json()["rows_hash"]
|
|
assert rows_hash, "rows_hash trebuie non-gol"
|
|
|
|
conn = get_connection()
|
|
try:
|
|
att = conn.execute(
|
|
"SELECT n_confirmed, rows_hash, confirmed_by "
|
|
"FROM import_attestations WHERE batch_id=?",
|
|
(import_id,),
|
|
).fetchone()
|
|
assert att is not None, "import_attestations trebuie sa existe dupa commit"
|
|
assert att["n_confirmed"] == 1
|
|
assert att["rows_hash"] == rows_hash
|
|
assert att["confirmed_by"] == "declarant@e2e.ro"
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =========================================================================== #
|
|
# Scenariul 1b — E2E pe CSV (delimiter ';', encoding RO) #
|
|
# =========================================================================== #
|
|
|
|
class TestE2EBasicCsv:
|
|
def test_e2e_complet_csv_semicolon(self, env):
|
|
"""E2E CSV cu ';' (export RO): upload->map->preview->commit->worker -> FINALIZATA."""
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
# Upload CSV cu ';'
|
|
csv_data = _make_csv([_HEADER, _ROW_1], delimiter=";")
|
|
r = client.post(
|
|
"/v1/import",
|
|
files={"file": ("test.csv", io.BytesIO(csv_data), "application/octet-stream")},
|
|
)
|
|
assert r.status_code == 200, f"Upload CSV esuat: {r.text}"
|
|
import_id = r.json()["import_id"]
|
|
assert r.json()["total_rows"] == 1
|
|
|
|
# Salveaza column-mapping
|
|
rc = client.post(
|
|
f"/v1/import/{import_id}/column-mapping",
|
|
json={"json_mapare": _DEFAULT_COLUMN_MAPPING},
|
|
)
|
|
assert rc.status_code == 200
|
|
|
|
_seed_operation_mapping(client)
|
|
|
|
# Preview
|
|
rp = client.get(f"/v1/import/{import_id}/preview")
|
|
assert rp.status_code == 200
|
|
assert rp.json()["summary"].get("ok", 0) == 1, \
|
|
f"Asteptat 1 rand ok la CSV: {rp.json()['summary']}"
|
|
|
|
# Commit + worker
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert rc.status_code == 200
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
mock_rar = MockRar(id_prezentare=44001)
|
|
conn = get_connection()
|
|
try:
|
|
results = _drain_queue(conn, settings, mock_rar)
|
|
assert results == ["sent"]
|
|
row = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "sent"
|
|
assert row["id_prezentare"] == 44001
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =========================================================================== #
|
|
# Scenariul 2 — Re-upload: preview marcheaza already_sent (NU al 2-lea POST) #
|
|
# =========================================================================== #
|
|
|
|
class TestE2EAlreadySent:
|
|
def _commit_and_send(self, client, get_connection, get_settings, rows=None) -> int:
|
|
"""Upload+commit+worker. Intoarce sub_id."""
|
|
import_id = _upload_and_map(client, rows=rows)
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert rc.status_code == 200
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
_drain_queue(conn, get_settings(), MockRar(id_prezentare=55001))
|
|
finally:
|
|
conn.close()
|
|
return sub_id
|
|
|
|
def test_reupload_acelasi_continut_already_sent(self, env):
|
|
"""Scenariul 2: dupa commit+worker, re-upload acelasi rand -> already_sent la preview.
|
|
|
|
Verificam ca sistemul NU creeaza un al doilea FINALIZATA.
|
|
"""
|
|
client, get_connection, get_settings = env
|
|
|
|
# Upload 1 -> commit -> worker
|
|
self._commit_and_send(client, get_connection, get_settings, rows=[_HEADER, _ROW_1])
|
|
|
|
# Re-upload acelasi continut
|
|
import_id2 = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
rp2 = client.get(f"/v1/import/{import_id2}/preview")
|
|
assert rp2.status_code == 200
|
|
|
|
body = rp2.json()
|
|
statuses = [r["resolved_status"] for r in body["rows"]]
|
|
assert "already_sent" in statuses, \
|
|
f"Asteptat 'already_sent' la re-upload, primit: {statuses}"
|
|
assert "ok" not in statuses, \
|
|
"Randul deja trimis NU trebuie sa fie 'ok' la re-upload"
|
|
|
|
def test_reupload_cu_odometru_diferit_nu_already_sent(self, env):
|
|
"""Re-upload cu odometru diferit (typo corectat) -> rand NOU (alta cheie idempotency)."""
|
|
client, get_connection, get_settings = env
|
|
|
|
self._commit_and_send(client, get_connection, get_settings, rows=[_HEADER, _ROW_1])
|
|
|
|
# Acelasi VIN, data, nr; dar odometru diferit -> cheie diferita
|
|
row_corectat = [_ROW_1[0], _ROW_1[1], _ROW_1[2], "123457", _ROW_1[4]]
|
|
import_id2 = _upload_and_map(client, rows=[_HEADER, row_corectat])
|
|
rp2 = client.get(f"/v1/import/{import_id2}/preview")
|
|
assert rp2.status_code == 200
|
|
|
|
statuses = [r["resolved_status"] for r in rp2.json()["rows"]]
|
|
assert "already_sent" not in statuses, \
|
|
f"Rand cu odometru diferit NU trebuie sa fie already_sent, primit: {statuses}"
|
|
assert "ok" in statuses, \
|
|
f"Rand nou (odometru diferit) trebuie sa fie 'ok', primit: {statuses}"
|
|
|
|
def test_already_sent_info_contine_id_prezentare(self, env):
|
|
"""La already_sent, preview intoarce already_sent_info cu submission_id."""
|
|
client, get_connection, get_settings = env
|
|
|
|
self._commit_and_send(client, get_connection, get_settings, rows=[_HEADER, _ROW_1])
|
|
|
|
import_id2 = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
rp2 = client.get(f"/v1/import/{import_id2}/preview")
|
|
body = rp2.json()
|
|
|
|
already_sent_row = next(
|
|
(r for r in body["rows"] if r["resolved_status"] == "already_sent"), None
|
|
)
|
|
assert already_sent_row is not None
|
|
assert "already_sent_info" in already_sent_row, \
|
|
"already_sent trebuie sa includa already_sent_info"
|
|
info = already_sent_row["already_sent_info"]
|
|
assert "submission_id" in info, f"already_sent_info trebuie sa aiba submission_id: {info}"
|
|
|
|
|
|
# =========================================================================== #
|
|
# Scenariul 3 — Coada MIXTA: API(creds efemere) + web(creds durabile) #
|
|
# =========================================================================== #
|
|
|
|
class TestE2EMixedQueue:
|
|
def test_coada_mixta_ambele_trimise(self, env, monkeypatch):
|
|
"""Scenariul 3 (T1/Voce#5): API (creds efemere) + web (canal import, fara creds pe submission).
|
|
|
|
Dupa purjarea creds efemere ale canalului API (la primul login), submission-urile
|
|
import (web) tot se trimit prin fallback accounts.rar_creds_enc.
|
|
Verificam ca ambele submission-uri ajung la status='sent'.
|
|
"""
|
|
import app.worker.__main__ as w
|
|
from app.crypto import encrypt_creds
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
# 1. Submission web: via import (NU are rar_creds_enc pe submission)
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert rc.status_code == 200
|
|
sub_web_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
# 2. Submission API: via POST /v1/prezentari cu creds efemere
|
|
r_api = client.post("/v1/prezentari", json={
|
|
"rar_credentials": {"email": "api@test.ro", "password": "pass_api"},
|
|
"prezentari": [{
|
|
"vin": _ROW_2[0],
|
|
"nr_inmatriculare": _ROW_2[1],
|
|
"data_prestatie": _ROW_2[2],
|
|
"odometru_final": _ROW_2[3],
|
|
"prestatii": [{"cod_prestatie": "OE-1"}],
|
|
}],
|
|
})
|
|
assert r_api.status_code == 200, r_api.text
|
|
sub_api_id = r_api.json()["results"][0]["submission_id"]
|
|
|
|
# 3. Seteaza creds durabile in accounts (canal web fallback)
|
|
conn = get_connection()
|
|
try:
|
|
web_creds_enc = encrypt_creds({"email": "web@test.ro", "password": "pass_web"})
|
|
conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (web_creds_enc,))
|
|
conn.commit()
|
|
|
|
# Verifica precondita: sub_web (import) NU are creds pe submission
|
|
row_web = conn.execute(
|
|
"SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_web_id,)
|
|
).fetchone()
|
|
assert row_web["rar_creds_enc"] is None, \
|
|
"Submission import NU trebuie sa aiba rar_creds_enc"
|
|
|
|
# Verifica precondita: sub_api ARE creds pe submission (efemere)
|
|
row_api_sub = conn.execute(
|
|
"SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_api_id,)
|
|
).fetchone()
|
|
assert row_api_sub["rar_creds_enc"] is not None, \
|
|
"Submission API trebuie sa aiba rar_creds_enc (efemere)"
|
|
finally:
|
|
conn.close()
|
|
|
|
# 4. Worker cu MockRar injectat prin AccountSessions (simulam bucla worker)
|
|
mock_rar = MockRar(id_prezentare=66001, login_token="tok-mock")
|
|
monkeypatch.setattr(w, "RarClient", lambda settings=None: mock_rar)
|
|
|
|
sessions = w.AccountSessions(settings)
|
|
conn = get_connection()
|
|
try:
|
|
processed = 0
|
|
|
|
# Drena coada simuland bucla worker completa (T1/D4: creds efemere + fallback durabil)
|
|
for _ in range(10): # limita de siguranta
|
|
claimed = w.claim_one(conn)
|
|
if claimed is None:
|
|
break
|
|
|
|
sid = claimed["id"]
|
|
account_id = claimed["account_id"]
|
|
|
|
# T1/D4: creds din submission (API efemer) OR fallback accounts.rar_creds_enc (web)
|
|
creds = w._creds_for(claimed, settings) or w._creds_from_account(conn, account_id)
|
|
assert creds is not None, \
|
|
f"Creds None pentru submission {sid} — fallback durabil trebuie sa existe"
|
|
|
|
token = sessions.get_token(conn, account_id, creds)
|
|
assert token is not None, f"Token None pentru submission {sid}"
|
|
|
|
rar = sessions.rar(account_id)
|
|
result = w.process_one(conn, settings, rar, token, claimed)
|
|
assert result == "sent", f"Submission {sid} -> {result} (asteptat 'sent')"
|
|
processed += 1
|
|
|
|
assert processed == 2, f"Asteptat 2 submissions procesate, primit: {processed}"
|
|
|
|
# 5. Ambele submission-uri FINALIZATE
|
|
row_web = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_web_id,)
|
|
).fetchone()
|
|
row_api = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_api_id,)
|
|
).fetchone()
|
|
|
|
assert row_web["status"] == "sent", f"Submission web: {row_web['status']}"
|
|
assert row_api["status"] == "sent", f"Submission API: {row_api['status']}"
|
|
assert row_web["id_prezentare"] == 66001
|
|
assert row_api["id_prezentare"] == 66001
|
|
finally:
|
|
conn.close()
|
|
sessions.close_all()
|
|
|
|
assert mock_rar.post_calls == 2
|
|
|
|
def test_purjare_creds_efemere_nu_sterge_durabile(self, env, monkeypatch):
|
|
"""T1/Gate purjare (OV-5): dupa login, submissions.rar_creds_enc sterse DAR accounts.rar_creds_enc INTACT.
|
|
|
|
Worker sterge DOAR submissions.rar_creds_enc (efemere), NU accounts.rar_creds_enc (durabile).
|
|
"""
|
|
import app.worker.__main__ as w
|
|
from app.crypto import encrypt_creds
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
# Submission API cu creds efemere
|
|
r_api = client.post("/v1/prezentari", json={
|
|
"rar_credentials": {"email": "api@test.ro", "password": "pass_api"},
|
|
"prezentari": [{
|
|
"vin": _ROW_2[0],
|
|
"nr_inmatriculare": _ROW_2[1],
|
|
"data_prestatie": _ROW_2[2],
|
|
"odometru_final": _ROW_2[3],
|
|
"prestatii": [{"cod_prestatie": "OE-1"}],
|
|
}],
|
|
})
|
|
assert r_api.status_code == 200
|
|
sub_id = r_api.json()["results"][0]["submission_id"]
|
|
|
|
# Seteaza creds durabile pe cont
|
|
conn = get_connection()
|
|
try:
|
|
web_creds_enc = encrypt_creds({"email": "web@test.ro", "password": "pass_web"})
|
|
conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (web_creds_enc,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
mock_rar = MockRar(id_prezentare=77777)
|
|
monkeypatch.setattr(w, "RarClient", lambda settings=None: mock_rar)
|
|
|
|
sessions = w.AccountSessions(settings)
|
|
conn = get_connection()
|
|
try:
|
|
claimed = w.claim_one(conn)
|
|
assert claimed is not None
|
|
|
|
creds = w._creds_for(claimed, settings) or w._creds_from_account(conn, claimed["account_id"])
|
|
sessions.get_token(conn, claimed["account_id"], creds)
|
|
|
|
# Dupa login: submissions.rar_creds_enc sterse (creds efemere purjate)
|
|
row = conn.execute(
|
|
"SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["rar_creds_enc"] is None, \
|
|
"submissions.rar_creds_enc trebuie sterse dupa login (efemere)"
|
|
|
|
# accounts.rar_creds_enc TREBUIE sa ramana (durabile, nu se sterg)
|
|
acc_row = conn.execute(
|
|
"SELECT rar_creds_enc FROM accounts WHERE id=1"
|
|
).fetchone()
|
|
assert acc_row["rar_creds_enc"] is not None, \
|
|
"accounts.rar_creds_enc trebuie sa RAMANA intact dupa purjarea creds efemere"
|
|
finally:
|
|
conn.close()
|
|
sessions.close_all()
|
|
|
|
|
|
# =========================================================================== #
|
|
# Masina de stari (sect. 6 din plan): tranzitii explicite per submission #
|
|
# =========================================================================== #
|
|
|
|
class TestStateMachine:
|
|
def test_tranzitii_queued_sending_sent(self, env):
|
|
"""Masina de stari: queued -> sending (claim) -> sent (worker + MockRar)."""
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
# Stare 1: queued
|
|
row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone()
|
|
assert row["status"] == "queued"
|
|
|
|
# Tranzitia claim: queued -> sending
|
|
claimed = w.claim_one(conn)
|
|
assert claimed is not None, "claim_one trebuie sa returneze randul queued"
|
|
row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone()
|
|
assert row["status"] == "sending", f"Dupa claim: asteptat 'sending', primit: {row['status']}"
|
|
|
|
# Tranzitia process: sending -> sent
|
|
mock_rar = MockRar(id_prezentare=12345)
|
|
result = w.process_one(conn, settings, mock_rar, "tok", claimed)
|
|
assert result == "sent"
|
|
|
|
row = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "sent", f"Dupa process: {row['status']}"
|
|
assert row["id_prezentare"] == 12345
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_tranzitie_sending_requeued_pe_eroare_tranzitorie(self, env):
|
|
"""Masina de stari: sending -> queued (retry) pe eroare de retea (ConnectError)."""
|
|
import httpx
|
|
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
claimed = w.claim_one(conn)
|
|
assert claimed is not None
|
|
|
|
# RAR: POST esueaza cu ConnectError (tranzitoriu, nu reconciliaza nimic)
|
|
class RarConnErr(MockRar):
|
|
def get_finalizate(self, token):
|
|
self.finalizate_calls += 1
|
|
return [] # nimic la RAR -> nu reconciliaza
|
|
|
|
def post_prezentare(self, token, payload):
|
|
self.post_calls += 1
|
|
raise httpx.ConnectError("conn refused")
|
|
|
|
rar_err = RarConnErr()
|
|
result = w.process_one(conn, settings, rar_err, "tok", claimed)
|
|
assert result == "requeued", f"Asteptat 'requeued', primit: {result}"
|
|
|
|
row = conn.execute(
|
|
"SELECT status, retry_count, next_attempt_at FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "queued", f"Dupa requeue: {row['status']}"
|
|
assert row["retry_count"] == 1
|
|
assert row["next_attempt_at"] is not None, "next_attempt_at trebuie setat la backoff"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_tranzitie_sending_error_la_4xx_nerecuperabil(self, env):
|
|
"""Masina de stari: sending -> error pe RAR 403 (nerecuperabil, fara retry)."""
|
|
from app.rar_client import RarError
|
|
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
claimed = w.claim_one(conn)
|
|
assert claimed is not None
|
|
|
|
class Rar403(MockRar):
|
|
def post_prezentare(self, token, payload):
|
|
self.post_calls += 1
|
|
raise RarError("forbidden", status_code=403)
|
|
|
|
result = w.process_one(conn, settings, Rar403(), "tok", claimed)
|
|
assert result == "error"
|
|
|
|
row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone()
|
|
assert row["status"] == "error"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_batch_status_staging_la_committed(self, env):
|
|
"""Masina de stari batch: staging -> committed dupa commit reusit (cel putin 1 enqueued)."""
|
|
client, get_connection, get_settings = env
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert rc.status_code == 200
|
|
|
|
conn = get_connection()
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT status FROM import_batches WHERE id=?", (import_id,)
|
|
).fetchone()
|
|
assert row["status"] == "committed", \
|
|
f"Batch status asteptat 'committed', primit: {row['status']}"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_double_commit_returneaza_409(self, env):
|
|
"""Masina de stari batch: al doilea commit pe acelasi batch -> 409 Conflict."""
|
|
client, get_connection, get_settings = env
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
|
|
r1 = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert r1.status_code == 200
|
|
|
|
r2 = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert r2.status_code == 409, \
|
|
f"Al doilea commit trebuie sa returneze 409, primit: {r2.status_code}"
|
|
|
|
|
|
# =========================================================================== #
|
|
# Registrul de esecuri (sect. 8): tipuri de eroare RAR si recuperare #
|
|
# =========================================================================== #
|
|
|
|
class TestFailureRegistry:
|
|
def test_rar_400_becomes_needs_data(self, env):
|
|
"""Failure registry: RAR 400 (VIN invalid) -> submission 'needs_data' + rar_error populat."""
|
|
from app.rar_client import RarError
|
|
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
claimed = w.claim_one(conn)
|
|
assert claimed is not None
|
|
|
|
class Rar400(MockRar):
|
|
def post_prezentare(self, token, payload):
|
|
self.post_calls += 1
|
|
raise RarError("VIN invalid", status_code=400,
|
|
field_errors=[{"field": "vin", "message": "invalid"}])
|
|
|
|
result = w.process_one(conn, settings, Rar400(), "tok", claimed)
|
|
assert result == "needs_data"
|
|
|
|
row = conn.execute(
|
|
"SELECT status, rar_error FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "needs_data"
|
|
assert row["rar_error"], "rar_error trebuie populat pentru 400"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_raspuns_pierdut_reconciliere_sent(self, env):
|
|
"""Failure registry: POST esueaza (503) dar recordul exista la RAR -> reconciliere -> sent.
|
|
|
|
Simuleaza cazul in care POST-ul a ajuns la RAR dar raspunsul s-a pierdut in retea.
|
|
Worker-ul reconciliaza cu get_finalizate si marcheaza 'sent' fara re-POST.
|
|
"""
|
|
from app.rar_client import RarError
|
|
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
claimed = w.claim_one(conn)
|
|
assert claimed is not None
|
|
content = claimed["content"]
|
|
|
|
# RAR: POST esueaza cu 503 DAR recordul exista deja in finalizate
|
|
class RarRaspunsPierdut(MockRar):
|
|
def get_finalizate(self, token):
|
|
self.finalizate_calls += 1
|
|
return [{
|
|
"id": 99999,
|
|
"vin": content.get("vin", ""),
|
|
"dataPrestatie": content.get("data_prestatie", ""),
|
|
"odometruFinal": int(content.get("odometru_final", 0)),
|
|
}]
|
|
|
|
def post_prezentare(self, token, payload):
|
|
self.post_calls += 1
|
|
raise RarError("503 bad gateway", status_code=503)
|
|
|
|
rar = RarRaspunsPierdut()
|
|
result = w.process_one(conn, settings, rar, "tok", claimed)
|
|
assert result == "sent", f"Asteptat 'sent' prin reconciliere, primit: {result}"
|
|
|
|
row = conn.execute(
|
|
"SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "sent"
|
|
assert row["id_prezentare"] == 99999
|
|
assert rar.post_calls == 1 # s-a trimis o data (a esuat)
|
|
assert rar.finalizate_calls == 1 # reconciliere o data
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_max_retries_devine_error(self, env):
|
|
"""Failure registry: requeue_with_backoff dupa worker_max_retries -> error."""
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
conn = get_connection()
|
|
try:
|
|
# Seteaza retry_count la maximul permis
|
|
conn.execute(
|
|
"UPDATE submissions SET retry_count=? WHERE id=?",
|
|
(settings.worker_max_retries, sub_id),
|
|
)
|
|
conn.commit()
|
|
|
|
# Urmatorul requeue depaseste maximul -> error
|
|
w.requeue_with_backoff(conn, settings, sub_id, reason="test: max retries")
|
|
conn.commit()
|
|
|
|
row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone()
|
|
assert row["status"] == "error", \
|
|
f"Dupa max retries: asteptat 'error', primit: {row['status']}"
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =========================================================================== #
|
|
# T16 — Purjare (purge_expired + purge_after setat la commit) #
|
|
# =========================================================================== #
|
|
|
|
class TestPurjare:
|
|
def test_purge_expired_sterge_sent_expirate(self, env):
|
|
"""T16: purge_expired sterge submissions cu status='sent' si purge_after in trecut."""
|
|
from app.worker.__main__ import purge_expired
|
|
|
|
client, get_connection, get_settings = env
|
|
|
|
conn = get_connection()
|
|
try:
|
|
cur = conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json, purge_after) "
|
|
"VALUES ('p-key-sent', 1, 'sent', '{}', '2000-01-01 00:00:00')"
|
|
)
|
|
sid = cur.lastrowid
|
|
conn.commit()
|
|
|
|
stats = purge_expired(conn)
|
|
conn.commit()
|
|
|
|
assert stats["submissions_purged"] >= 1, f"Trebuia cel putin 1 purjat: {stats}"
|
|
row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone()
|
|
assert row is None, "Submission expirata ('sent') trebuie stearsa"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_purge_expired_nu_sterge_queued(self, env):
|
|
"""T16: purge_expired NU sterge non-sent (queued/error) chiar daca purge_after < now."""
|
|
from app.worker.__main__ import purge_expired
|
|
|
|
client, get_connection, get_settings = env
|
|
|
|
conn = get_connection()
|
|
try:
|
|
cur = conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json, purge_after) "
|
|
"VALUES ('p-key-q', 1, 'queued', '{}', '2000-01-01 00:00:00')"
|
|
)
|
|
sid = cur.lastrowid
|
|
conn.commit()
|
|
|
|
purge_expired(conn)
|
|
conn.commit()
|
|
|
|
row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone()
|
|
assert row is not None, "Submission 'queued' NU trebuie stearsa de purge (doar 'sent')"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_purge_after_setat_la_commit_import(self, env):
|
|
"""T16: submissions create la commit import au purge_after setat (!=NULL)."""
|
|
client, get_connection, get_settings = env
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
assert rc.status_code == 200
|
|
submissions = rc.json()["submissions"]
|
|
assert submissions, "Commit trebuie sa intoarca cel putin o submission"
|
|
|
|
conn = get_connection()
|
|
try:
|
|
for sub in submissions:
|
|
row = conn.execute(
|
|
"SELECT purge_after FROM submissions WHERE id=?", (sub["submission_id"],)
|
|
).fetchone()
|
|
assert row["purge_after"] is not None, \
|
|
f"purge_after trebuie setat pe submission {sub['submission_id']}"
|
|
finally:
|
|
conn.close()
|
|
|
|
def test_purge_after_setat_la_trimitere_worker(self, env):
|
|
"""T16: dupa trimitere (status='sent'), purge_after se (re)seteaza la now+90 zile."""
|
|
import app.worker.__main__ as w
|
|
|
|
client, get_connection, get_settings = env
|
|
settings = get_settings()
|
|
|
|
import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1])
|
|
client.get(f"/v1/import/{import_id}/preview")
|
|
rc = client.post(f"/v1/import/{import_id}/commit", json={
|
|
"n_confirmat": 1, "reviewed_rows": []
|
|
})
|
|
sub_id = rc.json()["submissions"][0]["submission_id"]
|
|
|
|
mock_rar = MockRar(id_prezentare=11111)
|
|
conn = get_connection()
|
|
try:
|
|
_drain_queue(conn, settings, mock_rar)
|
|
|
|
row = conn.execute(
|
|
"SELECT status, purge_after FROM submissions WHERE id=?", (sub_id,)
|
|
).fetchone()
|
|
assert row["status"] == "sent"
|
|
assert row["purge_after"] is not None, \
|
|
"purge_after trebuie setat/actualizat dupa trimitere (mark sent)"
|
|
finally:
|
|
conn.close()
|