Files
rar-autopass/tests/test_t6_auto_send.py
Claude Agent 61a7b4ea1c feat(import): T6 gate auto_send pe coduri nou-mapate (OV-1)
- load_mapping_meta: {cod_op_service -> {cod_prestatie, auto_send}}
- has_no_auto_send: verifica daca vreun item rezolvat via mapping are auto_send=0
- reresolve_account: auto_send=0 -> ramane needs_mapping (review_manual stat),
  NU trece pe queued; previne FINALIZATA eronat permanent
- reresolve_account primeste batch_id optional (pregatire T7, urmeaza)
- POST /v1/prezentari: auto_send=0 -> needs_mapping + motiv explicit
- 9 teste: load_mapping_meta, has_no_auto_send, reresolve (zero/unu), POST API

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 20:21:32 +00:00

203 lines
6.7 KiB
Python

"""Teste T6: gate auto_send pe coduri nou-mapate (OV-1).
Verify:
(a) cod nou-mapat cu auto_send=0 -> nu auto-send, review manual.
(b) REGRESIE: mapare existenta cu auto_send=1 tot se requeue ca azi.
"""
from __future__ import annotations
import json
import os
import tempfile
import pytest
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t6.db"))
from app.config import get_settings
get_settings.cache_clear()
from app.db import init_db
init_db()
yield monkeypatch
get_settings.cache_clear()
@pytest.fixture()
def conn(env):
from app.db import get_connection
c = get_connection()
yield c
c.close()
@pytest.fixture()
def client(env):
from app.main import app
from fastapi.testclient import TestClient
with TestClient(app) as c:
yield c
def _insert_needs_mapping(conn, account_id=1, cod_op="ITP-CHECK"):
content = {
"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1",
"data_prestatie": "2026-06-15", "odometru_final": "123456",
"prestatii": [{"cod_op_service": cod_op, "denumire": "Inspectie tehnica"}],
}
cur = conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) VALUES (?, ?, ?, ?)",
(f"k-{os.urandom(4).hex()}", account_id, "needs_mapping", json.dumps(content)),
)
return int(cur.lastrowid)
def _add_mapping(conn, account_id=1, cod_op="ITP-CHECK", cod_prestatie="OE-1", auto_send=True):
conn.execute(
"INSERT OR REPLACE INTO nomenclator_rar (cod_prestatie, nume_prestatie) VALUES (?, ?)",
(cod_prestatie, "Operatie test"),
)
conn.execute(
"INSERT OR REPLACE INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
"VALUES (?, ?, ?, ?)",
(account_id, cod_op, cod_prestatie, 1 if auto_send else 0),
)
# --- load_mapping_meta ---
def test_load_mapping_meta_returns_auto_send(conn):
from app.mapping import load_mapping_meta
_add_mapping(conn, cod_op="ITP-1", cod_prestatie="OE-1", auto_send=True)
_add_mapping(conn, cod_op="ITP-2", cod_prestatie="OE-2", auto_send=False)
conn.execute("INSERT OR IGNORE INTO nomenclator_rar (cod_prestatie, nume_prestatie) VALUES ('OE-2', 'Test2')")
meta = load_mapping_meta(conn, 1)
assert meta["ITP-1"]["auto_send"] is True
assert meta["ITP-2"]["auto_send"] is False
# --- has_no_auto_send ---
def test_has_no_auto_send_detecteaza_false(conn):
from app.mapping import has_no_auto_send
mapping_meta = {
"ITP-1": {"cod_prestatie": "OE-1", "auto_send": False},
}
resolved = [{"cod_op_service": "ITP-1", "cod_prestatie": "OE-1"}]
assert has_no_auto_send(resolved, mapping_meta) is True
def test_has_no_auto_send_trece_cu_true(conn):
from app.mapping import has_no_auto_send
mapping_meta = {
"ITP-1": {"cod_prestatie": "OE-1", "auto_send": True},
}
resolved = [{"cod_op_service": "ITP-1", "cod_prestatie": "OE-1"}]
assert has_no_auto_send(resolved, mapping_meta) is False
def test_has_no_auto_send_direct_cod_prestatie(conn):
"""Item cu cod_prestatie direct (fara cod_op_service) nu e afectat de auto_send."""
from app.mapping import has_no_auto_send
mapping_meta = {}
resolved = [{"cod_prestatie": "OE-1"}]
assert has_no_auto_send(resolved, mapping_meta) is False
# --- reresolve_account cu auto_send=0 ---
def test_reresolve_auto_send_zero_nu_requeue(conn):
"""(a) cod nou-mapat cu auto_send=0 -> ramane needs_mapping (nu trece pe queued)."""
from app.mapping import reresolve_account
sid = _insert_needs_mapping(conn, cod_op="ITP-CHECK")
_add_mapping(conn, cod_op="ITP-CHECK", cod_prestatie="OE-1", auto_send=False)
stats = reresolve_account(conn, 1)
assert stats["review_manual"] == 1
assert stats["requeued"] == 0
row = conn.execute("SELECT status, rar_error FROM submissions WHERE id=?", (sid,)).fetchone()
assert row["status"] == "needs_mapping"
err = json.loads(row["rar_error"])
assert "auto_send" in err
def test_reresolve_auto_send_unu_requeue(conn):
"""(b) REGRESIE: mapare cu auto_send=1 tot se requeue ca azi."""
from app.mapping import reresolve_account
sid = _insert_needs_mapping(conn, cod_op="ITP-CHECK")
_add_mapping(conn, cod_op="ITP-CHECK", cod_prestatie="OE-1", auto_send=True)
stats = reresolve_account(conn, 1)
assert stats["requeued"] == 1
assert stats["review_manual"] == 0
row = conn.execute("SELECT status FROM submissions WHERE id=?", (sid,)).fetchone()
assert row["status"] == "queued"
# --- POST /v1/prezentari cu auto_send=0 ---
def _body_with_op(cod_op="ITP-CHECK"):
return {
"rar_credentials": {"email": "x@y.ro", "password": "s"},
"prezentari": [{
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_op_service": cod_op, "denumire": "Test"}],
}],
}
def test_post_auto_send_zero_nu_queued(client, env):
"""(a) Via API: cod nou-mapat cu auto_send=0 -> nu 'queued', review manual."""
from app.db import get_connection
conn2 = get_connection()
try:
_add_mapping(conn2, cod_op="ITP-X", cod_prestatie="OE-1", auto_send=False)
finally:
conn2.close()
r = client.post("/v1/prezentari", json=_body_with_op("ITP-X"))
assert r.status_code == 200
status = r.json()["results"][0]["status"]
assert status != "queued", f"auto_send=0 nu trebuie sa fie queued, e: {status}"
assert status == "needs_mapping"
def test_post_auto_send_unu_queued(client, env):
"""(b) REGRESIE: mapare existenta cu auto_send=1 -> queued ca azi."""
from app.db import get_connection
conn2 = get_connection()
try:
_add_mapping(conn2, cod_op="ITP-Y", cod_prestatie="OE-1", auto_send=True)
finally:
conn2.close()
r = client.post("/v1/prezentari", json=_body_with_op("ITP-Y"))
assert r.status_code == 200
status = r.json()["results"][0]["status"]
assert status == "queued", f"auto_send=1 trebuie queued, e: {status}"
def test_post_cod_prestatie_direct_queued(client):
"""Cod RAR direct (fara cod_op_service) -> queued indiferent de mapping."""
body = {
"rar_credentials": {"email": "x@y.ro", "password": "s"},
"prezentari": [{
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}],
}
r = client.post("/v1/prezentari", json=body)
assert r.status_code == 200
assert r.json()["results"][0]["status"] == "queued"