Acoperire 49 tests offline (fără Oracle real): test_comanda_helpers (16): _build_pc_nr toate prefixele VFP + fallback, _build_sir_id_operatii csv + limit 4000 chars, _PREFIX_MAP regression. test_router_authorization (9): _company_id fallback JWT companies[0], 403 firmă neautorizată, 400 companies[] gol, string→int coercion; _server_id extragere din request.state. test_lookup_endpoints (15): cache hit/miss per schema pentru tip_deviz, masini, asiguratori, inspectori (per-asig), operatii; LIKE escape %/_/\; min 2 chars short-circuit; server_id propagat la get_connection. test_partener_create (9): 5 Pydantic validation (denumire min 2, id_firma ge 1, cui opțional), 4 service mocked (happy path, 409 duplicat CUI, fără CUI, lipsă GRANT → 500 log.critical). Pattern mock Oracle: fake context managers (async get_connection + sync cursor), monkeypatch pe lookup_service.get_schema (not _context, din cauza binding copy la import). Rulare: pytest backend/modules/service_auto/tests/ -q → 62 passed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
366 lines
13 KiB
Python
366 lines
13 KiB
Python
"""
|
|
Unit tests for `LookupService` (Oracle mocked).

Coverage:
- Cache hit/miss per schema (tip_deviz, masini, asiguratori, inspectori, operatii)
- LIKE escaping in search_parteneri (%, _, \\ neutralized)
- min 2 chars validation for search_parteneri
- get_masina_details: missing row → None
- Cache reset between tests (autouse) — both `_cache` in lookup_service
  and `_schema_cache` in _context.

No test touches a real Oracle: `oracle_pool.get_connection` and `get_schema`
are monkeypatched. The style follows `test_comanda_helpers.py` (pytest-asyncio
auto-mode from pyproject.toml — no decorators).
|
|
"""
|
|
from typing import List, Optional, Sequence
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from backend.modules.service_auto.services import lookup_service
|
|
from backend.modules.service_auto.services._context import reset_schema_cache
|
|
|
|
|
|
# ============================================================
|
|
# Fakes pentru oracle_pool.get_connection
|
|
# ============================================================
|
|
|
|
class _FakeCursor:
    """Synchronous cursor stand-in: context manager plus execute/fetchall/fetchone.

    ``execute`` is a ``MagicMock`` so tests can inspect the arguments it was
    called with; the fetch methods simply return the canned data supplied at
    construction time.
    """

    def __init__(self, fetchall_rows=None, fetchone_row=None):
        # ``None`` means "no rows"; an explicitly passed list is kept as-is.
        self._fetchall_rows = fetchall_rows if fetchall_rows is not None else []
        self._fetchone_row = fetchone_row
        self.execute = MagicMock()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # False => exceptions raised inside the ``with`` body propagate.
        return False

    def fetchall(self):
        """Return the canned row list."""
        return self._fetchall_rows

    def fetchone(self):
        """Return the canned single row (``None`` when not provided)."""
        return self._fetchone_row
|
|
|
|
|
|
class _FakeConn:
    """Connection stand-in whose ``cursor()`` always returns the injected cursor."""

    def __init__(self, cursor: "_FakeCursor"):
        self._cursor = cursor

    def cursor(self):
        """Return the cursor supplied at construction time."""
        return self._cursor
|
|
|
|
|
|
class _FakeConnCM:
    """Async context manager imitating the `@asynccontextmanager` from oracle_pool.

    Entering the context yields the wrapped connection; leaving it does
    nothing and does not suppress exceptions.
    """

    def __init__(self, conn: "_FakeConn"):
        self._conn = conn

    async def __aenter__(self):
        return self._conn

    async def __aexit__(self, exc_type, exc, tb):
        # False => exceptions raised inside ``async with`` propagate.
        return False
|
|
|
|
|
|
class _PoolStub:
    """
    Replacement for `oracle_pool.get_connection` — hands out cursors in the given order.

    Counts the calls and records the ``server_id`` of each one (useful when a
    test wants to verify propagation).
    """

    def __init__(self, cursors: Sequence["_FakeCursor"]):
        # Copy into a list so the caller's sequence is never mutated by pop().
        self._cursors = list(cursors)
        self.call_count = 0
        self.server_ids: List[Optional[str]] = []

    def get_connection(self, server_id=None):
        """Return an async context manager wrapping the next queued cursor.

        The call is counted and its ``server_id`` recorded *before* the
        exhaustion check, so even a failing call is visible in the counters.

        Raises:
            AssertionError: when more connections are requested than cursors
                were supplied.
        """
        self.call_count += 1
        self.server_ids.append(server_id)
        if not self._cursors:
            raise AssertionError("PoolStub epuizat: get_connection apelat de mai multe ori decât cursori furnizați")
        cursor = self._cursors.pop(0)
        return _FakeConnCM(_FakeConn(cursor))
|
|
|
|
|
|
def _install_pool(monkeypatch, cursors: Sequence["_FakeCursor"]) -> "_PoolStub":
    """Patch `oracle_pool.get_connection` as seen from lookup_service.

    Returns the installed stub so the test can inspect ``call_count`` and
    ``server_ids`` afterwards.
    """
    stub = _PoolStub(cursors)
    monkeypatch.setattr(lookup_service.oracle_pool, "get_connection", stub.get_connection)
    return stub
|
|
|
|
|
|
def _install_schema(monkeypatch, schema: str = "MARIUSM_AUTO"):
    """Patch `get_schema` so schema resolution never hits the DB.

    The replacement ignores its arguments and always returns ``schema``.
    """
    async def _fake_get_schema(company_id, server_id):  # noqa: ARG001
        return schema

    # Patched on lookup_service itself: that is the name this module rebinds.
    monkeypatch.setattr(lookup_service, "get_schema", _fake_get_schema)
|
|
|
|
|
|
# ============================================================
|
|
# Reset cache între teste — OBLIGATORIU
|
|
# ============================================================
|
|
|
|
@pytest.fixture(autouse=True)
def _reset_caches():
    """Clear the lookup cache and the schema cache before and after every test."""
    lookup_service.reset_cache()
    reset_schema_cache()
    yield
    # Teardown: leave nothing cached behind for whatever test runs next.
    lookup_service.reset_cache()
    reset_schema_cache()
|
|
|
|
|
|
# ============================================================
|
|
# get_tip_deviz
|
|
# ============================================================
|
|
|
|
async def test_tip_deviz_cache_miss_then_hit(monkeypatch):
    """Second call for the same schema → no new query."""
    cursor = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)])
    pool = _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch, "MARIUSM_AUTO")

    res1 = await lookup_service.LookupService.get_tip_deviz(167)
    res2 = await lookup_service.LookupService.get_tip_deviz(167)

    assert pool.call_count == 1, "A doua chemare trebuia să vină din cache"
    assert res1 == res2
    assert res1[0].id_tip == 1
    assert res1[0].denumire == "POST GARANTIE"
    assert res1[0].inch_validare == 1
|
|
|
|
|
|
async def test_tip_deviz_inch_validare_null_defaults_to_zero(monkeypatch):
    """`inch_validare` NULL in the DB → 0 (Pydantic int)."""
    cursor = _FakeCursor(fetchall_rows=[(2, "GARANTIE", None)])
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res = await lookup_service.LookupService.get_tip_deviz(167)
    assert res[0].inch_validare == 0
|
|
|
|
|
|
async def test_tip_deviz_different_schema_triggers_new_query(monkeypatch):
    """Different schema (other id_firma) → new query (cache key is per schema)."""
    cur1 = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)])
    cur2 = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)])
    pool = _install_pool(monkeypatch, [cur1, cur2])

    # Each get_schema call yields the next schema name, regardless of arguments.
    schemas = iter(["MARIUSM_AUTO", "ALTA_FIRMA_AUTO"])

    async def _switching_schema(_company_id, _server_id):
        return next(schemas)

    monkeypatch.setattr(lookup_service, "get_schema", _switching_schema)

    await lookup_service.LookupService.get_tip_deviz(167)
    await lookup_service.LookupService.get_tip_deviz(110)

    assert pool.call_count == 2, "Schemă diferită ⇒ cache miss ⇒ query nou"
|
|
|
|
|
|
# ============================================================
|
|
# get_masini
|
|
# ============================================================
|
|
|
|
async def test_masini_cache_hit_avoids_second_query(monkeypatch):
    """Second get_masini call is served without a new connection; label carries client, plate and year."""
    cursor = _FakeCursor(fetchall_rows=[
        (101, "B-32-CTL", "DACIA", "LOGAN", 2018, "ION ION SRL"),
    ])
    pool = _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res1 = await lookup_service.LookupService.get_masini(167)
    res2 = await lookup_service.LookupService.get_masini(167)

    assert pool.call_count == 1
    assert res1[0].id_masiniclient == 101
    assert "ION ION SRL" in res1[0].label
    assert "B-32-CTL" in res1[0].label
    assert "(2018)" in res1[0].label
    assert res1 == res2
|
|
|
|
|
|
async def test_masini_label_handles_missing_marca_and_year(monkeypatch):
    """Vehicle without make/year: '?' fallback labels, no parentheses."""
    cursor = _FakeCursor(fetchall_rows=[
        (102, "CT-10-EEE", None, None, None, None),
    ])
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res = await lookup_service.LookupService.get_masini(167)
    assert res[0].label == "? — ?, CT-10-EEE"
|
|
|
|
|
|
# ============================================================
|
|
# get_asiguratori
|
|
# ============================================================
|
|
|
|
async def test_asiguratori_cache_miss_then_hit(monkeypatch):
    """Second get_asiguratori call needs no new connection and still returns the mapped row."""
    cursor = _FakeCursor(fetchall_rows=[(7, "ALLIANZ ȚIRIAC")])
    pool = _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    await lookup_service.LookupService.get_asiguratori(167)
    res2 = await lookup_service.LookupService.get_asiguratori(167)

    assert pool.call_count == 1
    assert res2[0].id_asigurator == 7
    assert res2[0].denumire == "ALLIANZ ȚIRIAC"
|
|
|
|
|
|
# ============================================================
|
|
# get_inspectori (cache key e per (schema, id_asigurator))
|
|
# ============================================================
|
|
|
|
async def test_inspectori_cache_per_asigurator(monkeypatch):
    """Cache key includes id_asigurator → changing the insurer ⇒ new query."""
    cur1 = _FakeCursor(fetchall_rows=[(11, "POPESCU ION", 7)])
    cur2 = _FakeCursor(fetchall_rows=[(22, "IONESCU MARIA", 8)])
    pool = _install_pool(monkeypatch, [cur1, cur2])
    _install_schema(monkeypatch)

    await lookup_service.LookupService.get_inspectori(7, 167)
    await lookup_service.LookupService.get_inspectori(7, 167)  # hit
    await lookup_service.LookupService.get_inspectori(8, 167)  # miss (different insurer)

    assert pool.call_count == 2
|
|
|
|
|
|
# ============================================================
|
|
# get_operatii
|
|
# ============================================================
|
|
|
|
async def test_operatii_cache_and_timpn_null_handling(monkeypatch):
    """Cache hit + NULL timpn stays None (not 0.0)."""
    cursor = _FakeCursor(fetchall_rows=[
        (501, "OP-001", "Schimb ulei", 1.5),
        (502, "OP-002", "Aliniere", None),
    ])
    pool = _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res1 = await lookup_service.LookupService.get_operatii(167)
    res2 = await lookup_service.LookupService.get_operatii(167)

    assert pool.call_count == 1
    assert res1 == res2
    assert res1[0].timpn == 1.5
    assert res1[1].timpn is None
|
|
|
|
|
|
# ============================================================
|
|
# search_parteneri — LIKE escape + min 2 chars
|
|
# ============================================================
|
|
|
|
async def test_search_parteneri_min_2_chars_returns_empty_without_query(monkeypatch):
    """q='a' (1 char) → [] WITHOUT touching the DB."""
    pool = _install_pool(monkeypatch, [])  # zero cursors — any call ⇒ AssertionError
    _install_schema(monkeypatch)

    res = await lookup_service.LookupService.search_parteneri("a", 167)
    assert res == []
    assert pool.call_count == 0
|
|
|
|
|
|
async def test_search_parteneri_escapes_like_wildcards(monkeypatch):
    """%, _ and \\ must be escaped before being sent into LIKE."""
    cursor = _FakeCursor(fetchall_rows=[])
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    await lookup_service.LookupService.search_parteneri("foo%bar_baz", 167)

    # cursor.execute(query, {"q": ...}) — check the second positional argument
    args, _kwargs = cursor.execute.call_args
    assert args[1] == {"q": "foo\\%bar\\_baz%"}, (
        f"Expected escaped LIKE arg; got {args[1]!r}"
    )
|
|
|
|
|
|
async def test_search_parteneri_escapes_backslash_first(monkeypatch):
    """Escape order: \\ goes first, so the later %/_ escapes are not doubled."""
    cursor = _FakeCursor(fetchall_rows=[])
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    await lookup_service.LookupService.search_parteneri("a\\b", 167)

    args, _ = cursor.execute.call_args
    # 'a\\b' (3 chars: a, \, b) → 'a\\\\b' (a, \, \, b) + '%'
    assert args[1] == {"q": "a\\\\b%"}
|
|
|
|
|
|
async def test_search_parteneri_returns_results(monkeypatch):
    """Happy path: query sent with a '%' suffix, results mapped to PartenerItem."""
    cursor = _FakeCursor(fetchall_rows=[
        (4321, "POPESCU IMPEX SRL"),
        (4322, "POPESCU SERVICE"),
    ])
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res = await lookup_service.LookupService.search_parteneri("pop", 167)

    args, _ = cursor.execute.call_args
    assert args[1] == {"q": "pop%"}
    assert len(res) == 2
    assert res[0].id_part == 4321
    assert res[0].denumire == "POPESCU IMPEX SRL"
|
|
|
|
|
|
# ============================================================
|
|
# get_masina_details — None pentru row lipsă
|
|
# ============================================================
|
|
|
|
async def test_masina_details_returns_none_when_row_missing(monkeypatch):
    """Non-existent row → None (no raise)."""
    cursor = _FakeCursor(fetchone_row=None)
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res = await lookup_service.LookupService.get_masina_details(99999, 167)
    assert res is None
|
|
|
|
|
|
async def test_masina_details_maps_row_to_pydantic(monkeypatch):
    """Complete row → MasinaDetails with every field populated."""
    cursor = _FakeCursor(fetchone_row=(
        101, "B-32-CTL", "DACIA", "LOGAN", 2018, "ION ION SRL",
        "UU1LSDA8N12345678", 1461, 90, 66,
    ))
    _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    res = await lookup_service.LookupService.get_masina_details(101, 167)
    assert res is not None
    assert res.id_masiniclient == 101
    assert res.nr_inmatriculare == "B-32-CTL"
    assert res.marca == "DACIA"
    assert res.model == "LOGAN"
    assert res.serie_sasiu == "UU1LSDA8N12345678"
    assert res.cilindree == 1461
    assert res.putere_cp == 90
    assert res.putere_kw == 66
    assert res.client_nume == "ION ION SRL"
    assert "DACIA LOGAN" in res.label
|
|
|
|
|
|
# ============================================================
|
|
# server_id propagation
|
|
# ============================================================
|
|
|
|
async def test_server_id_propagated_to_pool(monkeypatch):
    """server_id from the JWT must reach oracle_pool.get_connection."""
    cursor = _FakeCursor(fetchall_rows=[])
    pool = _install_pool(monkeypatch, [cursor])
    _install_schema(monkeypatch)

    await lookup_service.LookupService.get_tip_deviz(167, server_id="mariusm_test")

    assert pool.server_ids == ["mariusm_test"]
|