"""Teste T6: comportament dupa US-001 (PRD 5.11) — auto_send nu mai tine randuri. US-001: has_no_auto_send neutralizat (return False); un cod rezolvat (mapare exacta sau regula text) -> queued direct, indiferent de auto_send=0/1 in mapping_meta. Coloanele DB operations_mapping.auto_send si operation_text_rules.auto_send RAMAN (default 1, ne-citite pentru hold). Functia has_no_auto_send RAMANE DEFINITA (importata in routes.py + import_router.py) dar intoarce mereu False. Inainte de US-001: (a) cod nou-mapat cu auto_send=0 -> nu auto-send, review manual (needs_mapping). (b) mapare existenta cu auto_send=1 -> queued. Dupa US-001: (a) cod nou-mapat cu auto_send=0 -> queued (ca si (b)). (b) mapare existenta cu auto_send=1 -> queued (neschimbat). """ from __future__ import annotations import json import os import tempfile import pytest @pytest.fixture() def env(monkeypatch): tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t6.db")) from app.config import get_settings get_settings.cache_clear() from app.db import init_db init_db() yield monkeypatch get_settings.cache_clear() @pytest.fixture() def conn(env): from app.db import get_connection c = get_connection() yield c c.close() @pytest.fixture() def client(env): from app.main import app from fastapi.testclient import TestClient with TestClient(app) as c: yield c def _insert_needs_mapping(conn, account_id=1, cod_op="ITP-CHECK"): content = { "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1", "data_prestatie": "2026-06-15", "odometru_final": "123456", "prestatii": [{"cod_op_service": cod_op, "denumire": "Inspectie tehnica"}], } cur = conn.execute( "INSERT INTO submissions (idempotency_key, account_id, status, payload_json) VALUES (?, ?, ?, ?)", (f"k-{os.urandom(4).hex()}", account_id, "needs_mapping", json.dumps(content)), ) return int(cur.lastrowid) def _add_mapping(conn, account_id=1, cod_op="ITP-CHECK", cod_prestatie="OE-1", auto_send=True): conn.execute( "INSERT OR REPLACE INTO nomenclator_rar (cod_prestatie, nume_prestatie) VALUES (?, ?)", (cod_prestatie, "Operatie test"), ) conn.execute( "INSERT OR REPLACE INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) " "VALUES (?, ?, ?, ?)", (account_id, cod_op, cod_prestatie, 1 if auto_send else 0), ) # --- load_mapping_meta --- def test_load_mapping_meta_returns_auto_send(conn): """load_mapping_meta expune coloana auto_send din DB (ramane, default 1).""" from app.mapping import load_mapping_meta _add_mapping(conn, cod_op="ITP-1", cod_prestatie="OE-1", auto_send=True) _add_mapping(conn, cod_op="ITP-2", cod_prestatie="OE-2", auto_send=False) conn.execute("INSERT OR IGNORE INTO nomenclator_rar (cod_prestatie, nume_prestatie) VALUES ('OE-2', 'Test2')") meta = load_mapping_meta(conn, 1) assert meta["ITP-1"]["auto_send"] is True assert meta["ITP-2"]["auto_send"] is False # --- has_no_auto_send (neutralizat, mereu False) --- def test_has_no_auto_send_mereu_false_cu_auto_send_fals(conn): """has_no_auto_send intoarce False chiar si cand auto_send=False in mapping_meta (US-001).""" from app.mapping import has_no_auto_send mapping_meta = { "ITP-1": {"cod_prestatie": "OE-1", "auto_send": False}, } resolved = [{"cod_op_service": "ITP-1", "cod_prestatie": "OE-1"}] assert has_no_auto_send(resolved, mapping_meta) is False def test_has_no_auto_send_mereu_false_cu_auto_send_true(conn): """has_no_auto_send intoarce False si cand auto_send=True (neschimbat, mereu False).""" from app.mapping import has_no_auto_send mapping_meta = { "ITP-1": {"cod_prestatie": "OE-1", "auto_send": True}, } resolved = [{"cod_op_service": "ITP-1", "cod_prestatie": "OE-1"}] assert has_no_auto_send(resolved, mapping_meta) is False def test_has_no_auto_send_direct_cod_prestatie(conn): """Item cu cod_prestatie direct (fara cod_op_service) -> False (neschimbat).""" from app.mapping import has_no_auto_send mapping_meta = {} resolved = [{"cod_prestatie": "OE-1"}] assert has_no_auto_send(resolved, mapping_meta) is False # --- reresolve_account cu auto_send=0 (US-001: acum requeue) --- def test_reresolve_auto_send_zero_acum_requeue(conn): """(US-001) cod nou-mapat cu auto_send=0 -> queued (nu mai review_manual).""" from app.mapping import reresolve_account sid = _insert_needs_mapping(conn, cod_op="ITP-CHECK") _add_mapping(conn, cod_op="ITP-CHECK", cod_prestatie="OE-1", auto_send=False) stats = reresolve_account(conn, 1) assert stats["requeued"] == 1, f"asteptat requeued=1, got {stats}" assert stats.get("review_manual", 0) == 0 row = conn.execute("SELECT status FROM submissions WHERE id=?", (sid,)).fetchone() assert row["status"] == "queued" def test_reresolve_auto_send_unu_requeue(conn): """REGRESIE: mapare cu auto_send=1 -> queued (neschimbat).""" from app.mapping import reresolve_account sid = _insert_needs_mapping(conn, cod_op="ITP-CHECK") _add_mapping(conn, cod_op="ITP-CHECK", cod_prestatie="OE-1", auto_send=True) stats = reresolve_account(conn, 1) assert stats["requeued"] == 1 assert stats.get("review_manual", 0) == 0 row = conn.execute("SELECT status FROM submissions WHERE id=?", (sid,)).fetchone() assert row["status"] == "queued" # --- POST /v1/prezentari cu auto_send=0 (US-001: acum queued) --- def _body_with_op(cod_op="ITP-CHECK"): return { "rar_credentials": {"email": "x@y.ro", "password": "s"}, "prezentari": [{ "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST", "data_prestatie": "2026-06-15", "odometru_final": "123456", "prestatii": [{"cod_op_service": cod_op, "denumire": "Test"}], }], } def test_post_auto_send_zero_acum_queued(client, env): """(US-001) Via API: cod nou-mapat cu auto_send=0 -> queued (nu mai needs_mapping).""" from app.db import get_connection conn2 = get_connection() try: _add_mapping(conn2, cod_op="ITP-X", cod_prestatie="OE-1", auto_send=False) finally: conn2.close() r = client.post("/v1/prezentari", json=_body_with_op("ITP-X")) assert r.status_code == 200 status = r.json()["results"][0]["status"] assert status == "queued", f"auto_send=0 dupa US-001 -> queued, e: {status}" def test_post_auto_send_unu_queued(client, env): """REGRESIE: mapare existenta cu auto_send=1 -> queued ca inainte.""" from app.db import get_connection conn2 = get_connection() try: _add_mapping(conn2, cod_op="ITP-Y", cod_prestatie="OE-1", auto_send=True) finally: conn2.close() r = client.post("/v1/prezentari", json=_body_with_op("ITP-Y")) assert r.status_code == 200 status = r.json()["results"][0]["status"] assert status == "queued", f"auto_send=1 trebuie queued, e: {status}" def test_post_cod_prestatie_direct_queued(client): """Cod RAR direct (fara cod_op_service) -> queued indiferent de mapping.""" body = { "rar_credentials": {"email": "x@y.ro", "password": "s"}, "prezentari": [{ "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST", "data_prestatie": "2026-06-15", "odometru_final": "123456", "prestatii": [{"cod_prestatie": "OE-1"}], }], } r = client.post("/v1/prezentari", json=body) assert r.status_code == 200 assert r.json()["results"][0]["status"] == "queued"