From 55adfa214f98949f5159e1d75e1c0d003f21e8a7 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Tue, 16 Jun 2026 20:51:57 +0000 Subject: [PATCH] =?UTF-8?q?test(e2e):=20T15=20=E2=80=94=20E2E=20integrare?= =?UTF-8?q?=20import->commit->worker=20(MockRar)=20+=20masina=20de=20stari?= =?UTF-8?q?=20+=20failure=20registry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 21 teste noi in tests/test_import_e2e.py: - Scenariul 1: upload xlsx/csv -> column-mapping -> preview -> commit -> worker -> FINALIZATA - Scenariul 2: re-upload acelasi continut -> already_sent (nu al doilea POST la RAR) - Scenariul 3: coada MIXTA API(creds efemere)+web(accounts.rar_creds_enc durabil) — T1/Voce#5 - Masina de stari: queued->sending->sent/requeued/error + double-commit 409 + batch committed - Failure registry: RAR 400/403 + reconciliere raspuns pierdut (503) + max_retries->error - T16: purge_expired + purge_after setat la commit si la trimitere Suita completa: 264 passed, 0 failed (fara regresii Treapta 1 sau Treapta 2). Co-Authored-By: Claude Sonnet 4.6 --- tests/test_import_e2e.py | 1024 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 1024 insertions(+) create mode 100644 tests/test_import_e2e.py diff --git a/tests/test_import_e2e.py b/tests/test_import_e2e.py new file mode 100644 index 0000000..66c460e --- /dev/null +++ b/tests/test_import_e2e.py @@ -0,0 +1,1024 @@ +"""Teste E2E T15 — integrare import->commit->worker (RAR mock) + masina de stari + failure registry. + +Acopera (plan T15, sect.12): + - Scenariul 1: upload xlsx/csv -> column-mapping -> preview -> commit N + -> worker process_one (MockRar) -> submission FINALIZATA cu id_prezentare. + - Scenariul 2: re-upload acelasi continut -> preview marcheaza already_sent + (NU al doilea FINALIZATA dupa commit). + - Scenariul 3: coada MIXTA API(creds efemere)+web(creds durabile accounts.rar_creds_enc): + dupa login + purjare creds efemere, submission-urile web tot se trimit + prin fallback accounts.rar_creds_enc (T1/Voce#5). + - Masina de stari (sect. 6): tranzitii queued->sending->sent/requeued/error. + - Failure registry (sect. 8): 400/403/503+reconciliere. + - T16: purge_expired + purge_after setat la commit. +""" + +from __future__ import annotations + +import csv as csv_mod +import io +import os +import tempfile + +import openpyxl +import pytest +from fastapi.testclient import TestClient + + +# --------------------------------------------------------------------------- # +# MockRar — stub complet compatibil cu interfata RarClient # +# --------------------------------------------------------------------------- # + +class MockRar: + """Stub RAR complet: login + post_prezentare + get_finalizate + nomenclator + close. + + Compatibil cu interfata asteptata de worker (process_one, reconcile_and_mark, + AccountSessions.get_token, _refresh_nomenclator). + """ + + def __init__( + self, + *, + id_prezentare: int = 99901, + login_token: str = "mock-jwt", + finalizate: list | None = None, + ): + self._id_prezentare = id_prezentare + self._login_token = login_token + self._finalizate: list = finalizate or [] + self.post_calls = 0 + self.login_calls = 0 + self.finalizate_calls = 0 + self.closed = False + + def login(self, email, password): + self.login_calls += 1 + return self._login_token + + def get_nomenclator(self, token): + return [] + + def get_finalizate(self, token): + self.finalizate_calls += 1 + return self._finalizate + + def post_prezentare(self, token, payload): + self.post_calls += 1 + return {"id": self._id_prezentare} + + def close(self): + self.closed = True + + +# --------------------------------------------------------------------------- # +# Fixture client + DB izolata # +# --------------------------------------------------------------------------- # + +@pytest.fixture() +def env(monkeypatch): + """DB temporara izolata per test, client FastAPI, si functia get_connection.""" + tmp = tempfile.mkdtemp() + db_path = os.path.join(tmp, "e2e.db") + monkeypatch.setenv("AUTOPASS_DB_PATH", db_path) + + from app.config import get_settings + from app.crypto import reset_cache + + get_settings.cache_clear() + reset_cache() + + from app.db import get_connection + from app.main import app + + with TestClient(app) as client: + yield client, get_connection, get_settings + + get_settings.cache_clear() + reset_cache() + + +# --------------------------------------------------------------------------- # +# Helpere pentru fisiere test # +# --------------------------------------------------------------------------- # + +_HEADER = ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"] +_ROW_1 = ["WVWZZZ1KZAW001111", "B100TST", "2026-06-15", "123456", "Revizie"] +_ROW_2 = ["WVWZZZ1KZAW002222", "CJ200AB", "2026-05-20", "98765", "Revizie"] + +_DEFAULT_COLUMN_MAPPING = { + "VIN": "vin", + "Nr inmatriculare": "nr_inmatriculare", + "Data prestatie": "data_prestatie", + "Odometru final": "odometru_final", + "Operatie": "operatie", +} + + +def _make_xlsx(rows: list[list]) -> bytes: + """Creeaza un xlsx in-memory cu openpyxl.""" + wb = openpyxl.Workbook() + ws = wb.active + assert ws is not None # workbook nou are intotdeauna un sheet activ + ws.title = "Sheet1" + for row in rows: + ws.append(row) + buf = io.BytesIO() + wb.save(buf) + return buf.getvalue() + + +def _make_csv(rows: list[list], delimiter: str = ";") -> bytes: + """Creeaza un CSV in-memory cu delimiter RO (';' implicit).""" + buf = io.StringIO() + writer = csv_mod.writer(buf, delimiter=delimiter) + for row in rows: + writer.writerow(row) + return buf.getvalue().encode("utf-8") + + +def _seed_operation_mapping(client: TestClient, cod_op: str = "Revizie", cod_prest: str = "OE-1") -> None: + """Seeda o mapare de operatii (cod_op -> cod_prestatie) cu auto_send=True.""" + client.post("/v1/mapari", json={ + "cod_op_service": cod_op, + "cod_prestatie": cod_prest, + "auto_send": True, + }) + + +def _upload_and_map(client: TestClient, rows: list[list] | None = None) -> int: + """Upload xlsx + salveaza column-mapping + seeda operatii. Intoarce import_id.""" + if rows is None: + rows = [_HEADER, _ROW_1] + data = _make_xlsx(rows) + r = client.post( + "/v1/import", + files={"file": ("test.xlsx", io.BytesIO(data), "application/octet-stream")}, + ) + assert r.status_code == 200, f"Upload esuat: {r.text}" + import_id = r.json()["import_id"] + + rc = client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _DEFAULT_COLUMN_MAPPING}, + ) + assert rc.status_code == 200, f"Salvare column-mapping esuata: {rc.text}" + + _seed_operation_mapping(client) + return import_id + + +def _drain_queue(conn, settings, mock_rar: MockRar, token: str = "mock-tok") -> list[str]: + """Dreneaza coada cu MockRar (claim_one + process_one per rand). Intoarce rezultatele.""" + import app.worker.__main__ as w + + results = [] + while True: + claimed = w.claim_one(conn) + if claimed is None: + break + result = w.process_one(conn, settings, mock_rar, token, claimed) + results.append(result) + return results + + +# =========================================================================== # +# Scenariul 1 — E2E complet xlsx: upload->map->preview->commit->worker # +# =========================================================================== # + +class TestE2EBasicXlsx: + def test_e2e_complet_un_rand(self, env): + """E2E xlsx (1 rand): upload->column-mapping->preview->commit->worker -> FINALIZATA.""" + client, get_connection, get_settings = env + settings = get_settings() + + # 1. Upload + mapare + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + + # 2. Preview: rand trebuie sa fie 'ok' + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200, rp.text + preview = rp.json() + assert preview["summary"].get("ok", 0) == 1, \ + f"Asteptat 1 rand ok, primit: {preview['summary']}" + + # 3. Commit + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, + "reviewed_rows": [], + "confirmed_by": "test@e2e.ro", + }) + assert rc.status_code == 200, rc.text + commit_body = rc.json() + assert commit_body["enqueued"] == 1 + assert len(commit_body["submissions"]) == 1 + sub_id = commit_body["submissions"][0]["submission_id"] + + # 4. Verifica status initial 'queued' + conn = get_connection() + try: + row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone() + assert row["status"] == "queued", f"Status initial asteptat 'queued', primit: {row['status']}" + finally: + conn.close() + + # 5. Worker cu MockRar -> trimite la RAR + mock_rar = MockRar(id_prezentare=77001) + conn = get_connection() + try: + results = _drain_queue(conn, settings, mock_rar) + assert results == ["sent"], f"Asteptat ['sent'], primit: {results}" + + # 6. Verifica FINALIZATA: status='sent', id_prezentare=77001 + row = conn.execute( + "SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "sent", f"Status final asteptat 'sent', primit: {row['status']}" + assert row["id_prezentare"] == 77001, \ + f"id_prezentare asteptat 77001, primit: {row['id_prezentare']}" + finally: + conn.close() + + assert mock_rar.post_calls == 1 + assert mock_rar.finalizate_calls == 0 # happy path fara reconciliere + + def test_e2e_complet_doua_randuri(self, env): + """E2E xlsx (2 randuri): commit N=2 -> worker dreneaza 2 submission-uri -> ambele FINALIZATA.""" + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1, _ROW_2]) + + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.json()["summary"].get("ok", 0) == 2, \ + f"Asteptat 2 randuri ok: {rp.json()['summary']}" + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 2, "reviewed_rows": [] + }) + assert rc.status_code == 200, rc.text + assert rc.json()["enqueued"] == 2 + + mock_rar = MockRar(id_prezentare=88001) + conn = get_connection() + try: + results = _drain_queue(conn, settings, mock_rar) + assert len(results) == 2 + assert all(r == "sent" for r in results), f"Rezultate: {results}" + + rows = conn.execute( + "SELECT status, id_prezentare FROM submissions ORDER BY id" + ).fetchall() + assert all(r["status"] == "sent" for r in rows), \ + f"Nu toate sent: {[(r['status']) for r in rows]}" + assert all(r["id_prezentare"] == 88001 for r in rows) + finally: + conn.close() + + assert mock_rar.post_calls == 2 + + def test_atestare_scrisa_la_commit(self, env): + """Commit scrie import_attestations (rows_hash + n_confirmed + confirmed_by).""" + client, get_connection, get_settings = env + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, + "reviewed_rows": [], + "confirmed_by": "declarant@e2e.ro", + }) + assert rc.status_code == 200 + rows_hash = rc.json()["rows_hash"] + assert rows_hash, "rows_hash trebuie non-gol" + + conn = get_connection() + try: + att = conn.execute( + "SELECT n_confirmed, rows_hash, confirmed_by " + "FROM import_attestations WHERE batch_id=?", + (import_id,), + ).fetchone() + assert att is not None, "import_attestations trebuie sa existe dupa commit" + assert att["n_confirmed"] == 1 + assert att["rows_hash"] == rows_hash + assert att["confirmed_by"] == "declarant@e2e.ro" + finally: + conn.close() + + +# =========================================================================== # +# Scenariul 1b — E2E pe CSV (delimiter ';', encoding RO) # +# =========================================================================== # + +class TestE2EBasicCsv: + def test_e2e_complet_csv_semicolon(self, env): + """E2E CSV cu ';' (export RO): upload->map->preview->commit->worker -> FINALIZATA.""" + client, get_connection, get_settings = env + settings = get_settings() + + # Upload CSV cu ';' + csv_data = _make_csv([_HEADER, _ROW_1], delimiter=";") + r = client.post( + "/v1/import", + files={"file": ("test.csv", io.BytesIO(csv_data), "application/octet-stream")}, + ) + assert r.status_code == 200, f"Upload CSV esuat: {r.text}" + import_id = r.json()["import_id"] + assert r.json()["total_rows"] == 1 + + # Salveaza column-mapping + rc = client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _DEFAULT_COLUMN_MAPPING}, + ) + assert rc.status_code == 200 + + _seed_operation_mapping(client) + + # Preview + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + assert rp.json()["summary"].get("ok", 0) == 1, \ + f"Asteptat 1 rand ok la CSV: {rp.json()['summary']}" + + # Commit + worker + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert rc.status_code == 200 + sub_id = rc.json()["submissions"][0]["submission_id"] + + mock_rar = MockRar(id_prezentare=44001) + conn = get_connection() + try: + results = _drain_queue(conn, settings, mock_rar) + assert results == ["sent"] + row = conn.execute( + "SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "sent" + assert row["id_prezentare"] == 44001 + finally: + conn.close() + + +# =========================================================================== # +# Scenariul 2 — Re-upload: preview marcheaza already_sent (NU al 2-lea POST) # +# =========================================================================== # + +class TestE2EAlreadySent: + def _commit_and_send(self, client, get_connection, get_settings, rows=None) -> int: + """Upload+commit+worker. Intoarce sub_id.""" + import_id = _upload_and_map(client, rows=rows) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert rc.status_code == 200 + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + _drain_queue(conn, get_settings(), MockRar(id_prezentare=55001)) + finally: + conn.close() + return sub_id + + def test_reupload_acelasi_continut_already_sent(self, env): + """Scenariul 2: dupa commit+worker, re-upload acelasi rand -> already_sent la preview. + + Verificam ca sistemul NU creeaza un al doilea FINALIZATA. + """ + client, get_connection, get_settings = env + + # Upload 1 -> commit -> worker + self._commit_and_send(client, get_connection, get_settings, rows=[_HEADER, _ROW_1]) + + # Re-upload acelasi continut + import_id2 = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + rp2 = client.get(f"/v1/import/{import_id2}/preview") + assert rp2.status_code == 200 + + body = rp2.json() + statuses = [r["resolved_status"] for r in body["rows"]] + assert "already_sent" in statuses, \ + f"Asteptat 'already_sent' la re-upload, primit: {statuses}" + assert "ok" not in statuses, \ + "Randul deja trimis NU trebuie sa fie 'ok' la re-upload" + + def test_reupload_cu_odometru_diferit_nu_already_sent(self, env): + """Re-upload cu odometru diferit (typo corectat) -> rand NOU (alta cheie idempotency).""" + client, get_connection, get_settings = env + + self._commit_and_send(client, get_connection, get_settings, rows=[_HEADER, _ROW_1]) + + # Acelasi VIN, data, nr; dar odometru diferit -> cheie diferita + row_corectat = [_ROW_1[0], _ROW_1[1], _ROW_1[2], "123457", _ROW_1[4]] + import_id2 = _upload_and_map(client, rows=[_HEADER, row_corectat]) + rp2 = client.get(f"/v1/import/{import_id2}/preview") + assert rp2.status_code == 200 + + statuses = [r["resolved_status"] for r in rp2.json()["rows"]] + assert "already_sent" not in statuses, \ + f"Rand cu odometru diferit NU trebuie sa fie already_sent, primit: {statuses}" + assert "ok" in statuses, \ + f"Rand nou (odometru diferit) trebuie sa fie 'ok', primit: {statuses}" + + def test_already_sent_info_contine_id_prezentare(self, env): + """La already_sent, preview intoarce already_sent_info cu submission_id.""" + client, get_connection, get_settings = env + + self._commit_and_send(client, get_connection, get_settings, rows=[_HEADER, _ROW_1]) + + import_id2 = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + rp2 = client.get(f"/v1/import/{import_id2}/preview") + body = rp2.json() + + already_sent_row = next( + (r for r in body["rows"] if r["resolved_status"] == "already_sent"), None + ) + assert already_sent_row is not None + assert "already_sent_info" in already_sent_row, \ + "already_sent trebuie sa includa already_sent_info" + info = already_sent_row["already_sent_info"] + assert "submission_id" in info, f"already_sent_info trebuie sa aiba submission_id: {info}" + + +# =========================================================================== # +# Scenariul 3 — Coada MIXTA: API(creds efemere) + web(creds durabile) # +# =========================================================================== # + +class TestE2EMixedQueue: + def test_coada_mixta_ambele_trimise(self, env, monkeypatch): + """Scenariul 3 (T1/Voce#5): API (creds efemere) + web (canal import, fara creds pe submission). + + Dupa purjarea creds efemere ale canalului API (la primul login), submission-urile + import (web) tot se trimit prin fallback accounts.rar_creds_enc. + Verificam ca ambele submission-uri ajung la status='sent'. + """ + import app.worker.__main__ as w + from app.crypto import encrypt_creds + + client, get_connection, get_settings = env + settings = get_settings() + + # 1. Submission web: via import (NU are rar_creds_enc pe submission) + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert rc.status_code == 200 + sub_web_id = rc.json()["submissions"][0]["submission_id"] + + # 2. Submission API: via POST /v1/prezentari cu creds efemere + r_api = client.post("/v1/prezentari", json={ + "rar_credentials": {"email": "api@test.ro", "password": "pass_api"}, + "prezentari": [{ + "vin": _ROW_2[0], + "nr_inmatriculare": _ROW_2[1], + "data_prestatie": _ROW_2[2], + "odometru_final": _ROW_2[3], + "prestatii": [{"cod_prestatie": "OE-1"}], + }], + }) + assert r_api.status_code == 200, r_api.text + sub_api_id = r_api.json()["results"][0]["submission_id"] + + # 3. Seteaza creds durabile in accounts (canal web fallback) + conn = get_connection() + try: + web_creds_enc = encrypt_creds({"email": "web@test.ro", "password": "pass_web"}) + conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (web_creds_enc,)) + conn.commit() + + # Verifica precondita: sub_web (import) NU are creds pe submission + row_web = conn.execute( + "SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_web_id,) + ).fetchone() + assert row_web["rar_creds_enc"] is None, \ + "Submission import NU trebuie sa aiba rar_creds_enc" + + # Verifica precondita: sub_api ARE creds pe submission (efemere) + row_api_sub = conn.execute( + "SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_api_id,) + ).fetchone() + assert row_api_sub["rar_creds_enc"] is not None, \ + "Submission API trebuie sa aiba rar_creds_enc (efemere)" + finally: + conn.close() + + # 4. Worker cu MockRar injectat prin AccountSessions (simulam bucla worker) + mock_rar = MockRar(id_prezentare=66001, login_token="tok-mock") + monkeypatch.setattr(w, "RarClient", lambda settings=None: mock_rar) + + sessions = w.AccountSessions(settings) + conn = get_connection() + try: + processed = 0 + + # Drena coada simuland bucla worker completa (T1/D4: creds efemere + fallback durabil) + for _ in range(10): # limita de siguranta + claimed = w.claim_one(conn) + if claimed is None: + break + + sid = claimed["id"] + account_id = claimed["account_id"] + + # T1/D4: creds din submission (API efemer) OR fallback accounts.rar_creds_enc (web) + creds = w._creds_for(claimed, settings) or w._creds_from_account(conn, account_id) + assert creds is not None, \ + f"Creds None pentru submission {sid} — fallback durabil trebuie sa existe" + + token = sessions.get_token(conn, account_id, creds) + assert token is not None, f"Token None pentru submission {sid}" + + rar = sessions.rar(account_id) + result = w.process_one(conn, settings, rar, token, claimed) + assert result == "sent", f"Submission {sid} -> {result} (asteptat 'sent')" + processed += 1 + + assert processed == 2, f"Asteptat 2 submissions procesate, primit: {processed}" + + # 5. Ambele submission-uri FINALIZATE + row_web = conn.execute( + "SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_web_id,) + ).fetchone() + row_api = conn.execute( + "SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_api_id,) + ).fetchone() + + assert row_web["status"] == "sent", f"Submission web: {row_web['status']}" + assert row_api["status"] == "sent", f"Submission API: {row_api['status']}" + assert row_web["id_prezentare"] == 66001 + assert row_api["id_prezentare"] == 66001 + finally: + conn.close() + sessions.close_all() + + assert mock_rar.post_calls == 2 + + def test_purjare_creds_efemere_nu_sterge_durabile(self, env, monkeypatch): + """T1/Gate purjare (OV-5): dupa login, submissions.rar_creds_enc sterse DAR accounts.rar_creds_enc INTACT. + + Worker sterge DOAR submissions.rar_creds_enc (efemere), NU accounts.rar_creds_enc (durabile). + """ + import app.worker.__main__ as w + from app.crypto import encrypt_creds + + client, get_connection, get_settings = env + settings = get_settings() + + # Submission API cu creds efemere + r_api = client.post("/v1/prezentari", json={ + "rar_credentials": {"email": "api@test.ro", "password": "pass_api"}, + "prezentari": [{ + "vin": _ROW_2[0], + "nr_inmatriculare": _ROW_2[1], + "data_prestatie": _ROW_2[2], + "odometru_final": _ROW_2[3], + "prestatii": [{"cod_prestatie": "OE-1"}], + }], + }) + assert r_api.status_code == 200 + sub_id = r_api.json()["results"][0]["submission_id"] + + # Seteaza creds durabile pe cont + conn = get_connection() + try: + web_creds_enc = encrypt_creds({"email": "web@test.ro", "password": "pass_web"}) + conn.execute("UPDATE accounts SET rar_creds_enc=? WHERE id=1", (web_creds_enc,)) + conn.commit() + finally: + conn.close() + + mock_rar = MockRar(id_prezentare=77777) + monkeypatch.setattr(w, "RarClient", lambda settings=None: mock_rar) + + sessions = w.AccountSessions(settings) + conn = get_connection() + try: + claimed = w.claim_one(conn) + assert claimed is not None + + creds = w._creds_for(claimed, settings) or w._creds_from_account(conn, claimed["account_id"]) + sessions.get_token(conn, claimed["account_id"], creds) + + # Dupa login: submissions.rar_creds_enc sterse (creds efemere purjate) + row = conn.execute( + "SELECT rar_creds_enc FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["rar_creds_enc"] is None, \ + "submissions.rar_creds_enc trebuie sterse dupa login (efemere)" + + # accounts.rar_creds_enc TREBUIE sa ramana (durabile, nu se sterg) + acc_row = conn.execute( + "SELECT rar_creds_enc FROM accounts WHERE id=1" + ).fetchone() + assert acc_row["rar_creds_enc"] is not None, \ + "accounts.rar_creds_enc trebuie sa RAMANA intact dupa purjarea creds efemere" + finally: + conn.close() + sessions.close_all() + + +# =========================================================================== # +# Masina de stari (sect. 6 din plan): tranzitii explicite per submission # +# =========================================================================== # + +class TestStateMachine: + def test_tranzitii_queued_sending_sent(self, env): + """Masina de stari: queued -> sending (claim) -> sent (worker + MockRar).""" + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + # Stare 1: queued + row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone() + assert row["status"] == "queued" + + # Tranzitia claim: queued -> sending + claimed = w.claim_one(conn) + assert claimed is not None, "claim_one trebuie sa returneze randul queued" + row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone() + assert row["status"] == "sending", f"Dupa claim: asteptat 'sending', primit: {row['status']}" + + # Tranzitia process: sending -> sent + mock_rar = MockRar(id_prezentare=12345) + result = w.process_one(conn, settings, mock_rar, "tok", claimed) + assert result == "sent" + + row = conn.execute( + "SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "sent", f"Dupa process: {row['status']}" + assert row["id_prezentare"] == 12345 + finally: + conn.close() + + def test_tranzitie_sending_requeued_pe_eroare_tranzitorie(self, env): + """Masina de stari: sending -> queued (retry) pe eroare de retea (ConnectError).""" + import httpx + + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + claimed = w.claim_one(conn) + assert claimed is not None + + # RAR: POST esueaza cu ConnectError (tranzitoriu, nu reconciliaza nimic) + class RarConnErr(MockRar): + def get_finalizate(self, token): + self.finalizate_calls += 1 + return [] # nimic la RAR -> nu reconciliaza + + def post_prezentare(self, token, payload): + self.post_calls += 1 + raise httpx.ConnectError("conn refused") + + rar_err = RarConnErr() + result = w.process_one(conn, settings, rar_err, "tok", claimed) + assert result == "requeued", f"Asteptat 'requeued', primit: {result}" + + row = conn.execute( + "SELECT status, retry_count, next_attempt_at FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "queued", f"Dupa requeue: {row['status']}" + assert row["retry_count"] == 1 + assert row["next_attempt_at"] is not None, "next_attempt_at trebuie setat la backoff" + finally: + conn.close() + + def test_tranzitie_sending_error_la_4xx_nerecuperabil(self, env): + """Masina de stari: sending -> error pe RAR 403 (nerecuperabil, fara retry).""" + from app.rar_client import RarError + + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + claimed = w.claim_one(conn) + assert claimed is not None + + class Rar403(MockRar): + def post_prezentare(self, token, payload): + self.post_calls += 1 + raise RarError("forbidden", status_code=403) + + result = w.process_one(conn, settings, Rar403(), "tok", claimed) + assert result == "error" + + row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone() + assert row["status"] == "error" + finally: + conn.close() + + def test_batch_status_staging_la_committed(self, env): + """Masina de stari batch: staging -> committed dupa commit reusit (cel putin 1 enqueued).""" + client, get_connection, get_settings = env + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert rc.status_code == 200 + + conn = get_connection() + try: + row = conn.execute( + "SELECT status FROM import_batches WHERE id=?", (import_id,) + ).fetchone() + assert row["status"] == "committed", \ + f"Batch status asteptat 'committed', primit: {row['status']}" + finally: + conn.close() + + def test_double_commit_returneaza_409(self, env): + """Masina de stari batch: al doilea commit pe acelasi batch -> 409 Conflict.""" + client, get_connection, get_settings = env + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + + r1 = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert r1.status_code == 200 + + r2 = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert r2.status_code == 409, \ + f"Al doilea commit trebuie sa returneze 409, primit: {r2.status_code}" + + +# =========================================================================== # +# Registrul de esecuri (sect. 8): tipuri de eroare RAR si recuperare # +# =========================================================================== # + +class TestFailureRegistry: + def test_rar_400_becomes_needs_data(self, env): + """Failure registry: RAR 400 (VIN invalid) -> submission 'needs_data' + rar_error populat.""" + from app.rar_client import RarError + + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + claimed = w.claim_one(conn) + assert claimed is not None + + class Rar400(MockRar): + def post_prezentare(self, token, payload): + self.post_calls += 1 + raise RarError("VIN invalid", status_code=400, + field_errors=[{"field": "vin", "message": "invalid"}]) + + result = w.process_one(conn, settings, Rar400(), "tok", claimed) + assert result == "needs_data" + + row = conn.execute( + "SELECT status, rar_error FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "needs_data" + assert row["rar_error"], "rar_error trebuie populat pentru 400" + finally: + conn.close() + + def test_raspuns_pierdut_reconciliere_sent(self, env): + """Failure registry: POST esueaza (503) dar recordul exista la RAR -> reconciliere -> sent. + + Simuleaza cazul in care POST-ul a ajuns la RAR dar raspunsul s-a pierdut in retea. + Worker-ul reconciliaza cu get_finalizate si marcheaza 'sent' fara re-POST. + """ + from app.rar_client import RarError + + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + claimed = w.claim_one(conn) + assert claimed is not None + content = claimed["content"] + + # RAR: POST esueaza cu 503 DAR recordul exista deja in finalizate + class RarRaspunsPierdut(MockRar): + def get_finalizate(self, token): + self.finalizate_calls += 1 + return [{ + "id": 99999, + "vin": content.get("vin", ""), + "dataPrestatie": content.get("data_prestatie", ""), + "odometruFinal": int(content.get("odometru_final", 0)), + }] + + def post_prezentare(self, token, payload): + self.post_calls += 1 + raise RarError("503 bad gateway", status_code=503) + + rar = RarRaspunsPierdut() + result = w.process_one(conn, settings, rar, "tok", claimed) + assert result == "sent", f"Asteptat 'sent' prin reconciliere, primit: {result}" + + row = conn.execute( + "SELECT status, id_prezentare FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "sent" + assert row["id_prezentare"] == 99999 + assert rar.post_calls == 1 # s-a trimis o data (a esuat) + assert rar.finalizate_calls == 1 # reconciliere o data + finally: + conn.close() + + def test_max_retries_devine_error(self, env): + """Failure registry: requeue_with_backoff dupa worker_max_retries -> error.""" + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + conn = get_connection() + try: + # Seteaza retry_count la maximul permis + conn.execute( + "UPDATE submissions SET retry_count=? WHERE id=?", + (settings.worker_max_retries, sub_id), + ) + conn.commit() + + # Urmatorul requeue depaseste maximul -> error + w.requeue_with_backoff(conn, settings, sub_id, reason="test: max retries") + conn.commit() + + row = conn.execute("SELECT status FROM submissions WHERE id=?", (sub_id,)).fetchone() + assert row["status"] == "error", \ + f"Dupa max retries: asteptat 'error', primit: {row['status']}" + finally: + conn.close() + + +# =========================================================================== # +# T16 — Purjare (purge_expired + purge_after setat la commit) # +# =========================================================================== # + +class TestPurjare: + def test_purge_expired_sterge_sent_expirate(self, env): + """T16: purge_expired sterge submissions cu status='sent' si purge_after in trecut.""" + from app.worker.__main__ import purge_expired + + client, get_connection, get_settings = env + + conn = get_connection() + try: + cur = conn.execute( + "INSERT INTO submissions (idempotency_key, account_id, status, payload_json, purge_after) " + "VALUES ('p-key-sent', 1, 'sent', '{}', '2000-01-01 00:00:00')" + ) + sid = cur.lastrowid + conn.commit() + + stats = purge_expired(conn) + conn.commit() + + assert stats["submissions_purged"] >= 1, f"Trebuia cel putin 1 purjat: {stats}" + row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() + assert row is None, "Submission expirata ('sent') trebuie stearsa" + finally: + conn.close() + + def test_purge_expired_nu_sterge_queued(self, env): + """T16: purge_expired NU sterge non-sent (queued/error) chiar daca purge_after < now.""" + from app.worker.__main__ import purge_expired + + client, get_connection, get_settings = env + + conn = get_connection() + try: + cur = conn.execute( + "INSERT INTO submissions (idempotency_key, account_id, status, payload_json, purge_after) " + "VALUES ('p-key-q', 1, 'queued', '{}', '2000-01-01 00:00:00')" + ) + sid = cur.lastrowid + conn.commit() + + purge_expired(conn) + conn.commit() + + row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() + assert row is not None, "Submission 'queued' NU trebuie stearsa de purge (doar 'sent')" + finally: + conn.close() + + def test_purge_after_setat_la_commit_import(self, env): + """T16: submissions create la commit import au purge_after setat (!=NULL).""" + client, get_connection, get_settings = env + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + assert rc.status_code == 200 + submissions = rc.json()["submissions"] + assert submissions, "Commit trebuie sa intoarca cel putin o submission" + + conn = get_connection() + try: + for sub in submissions: + row = conn.execute( + "SELECT purge_after FROM submissions WHERE id=?", (sub["submission_id"],) + ).fetchone() + assert row["purge_after"] is not None, \ + f"purge_after trebuie setat pe submission {sub['submission_id']}" + finally: + conn.close() + + def test_purge_after_setat_la_trimitere_worker(self, env): + """T16: dupa trimitere (status='sent'), purge_after se (re)seteaza la now+90 zile.""" + import app.worker.__main__ as w + + client, get_connection, get_settings = env + settings = get_settings() + + import_id = _upload_and_map(client, rows=[_HEADER, _ROW_1]) + client.get(f"/v1/import/{import_id}/preview") + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": 1, "reviewed_rows": [] + }) + sub_id = rc.json()["submissions"][0]["submission_id"] + + mock_rar = MockRar(id_prezentare=11111) + conn = get_connection() + try: + _drain_queue(conn, settings, mock_rar) + + row = conn.execute( + "SELECT status, purge_after FROM submissions WHERE id=?", (sub_id,) + ).fetchone() + assert row["status"] == "sent" + assert row["purge_after"] is not None, \ + "purge_after trebuie setat/actualizat dupa trimitere (mark sent)" + finally: + conn.close()