""" Unit tests pentru `LookupService` (mock Oracle). Acoperire: - Cache hit/miss per schema (tip_deviz, masini, asiguratori, inspectori, operatii) - LIKE escape în search_parteneri (%, _, \\ neutralizate) - min 2 chars validation pentru search_parteneri - get_masina_details: row absent → None - Reset cache între teste (autouse) — atât `_cache` din lookup_service cât și `_schema_cache` din _context. Niciun test nu atinge Oracle real: `oracle_pool.get_connection` și `get_schema` sunt monkeypatched. Stilul urmează `test_comanda_helpers.py` (pytest-asyncio auto-mode din pyproject.toml — fără decoratori). """ from typing import List, Optional, Sequence from unittest.mock import MagicMock import pytest from backend.modules.service_auto.services import lookup_service from backend.modules.service_auto.services._context import reset_schema_cache # ============================================================ # Fakes pentru oracle_pool.get_connection # ============================================================ class _FakeCursor: """Cursor sincron: __enter__/__exit__ + execute/fetchall/fetchone.""" def __init__(self, fetchall_rows=None, fetchone_row=None): self._fetchall_rows = fetchall_rows if fetchall_rows is not None else [] self._fetchone_row = fetchone_row self.execute = MagicMock() def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False def fetchall(self): return self._fetchall_rows def fetchone(self): return self._fetchone_row class _FakeConn: def __init__(self, cursor: _FakeCursor): self._cursor = cursor def cursor(self): return self._cursor class _FakeConnCM: """Async context manager imitând `@asynccontextmanager` din oracle_pool.""" def __init__(self, conn: _FakeConn): self._conn = conn async def __aenter__(self): return self._conn async def __aexit__(self, exc_type, exc, tb): return False class _PoolStub: """ Înlocuiește `oracle_pool.get_connection` — întoarce cursori în ordinea dată. Numără apelurile (per server_id, dacă vrem să verificăm propagarea). """ def __init__(self, cursors: Sequence[_FakeCursor]): self._cursors = list(cursors) self.call_count = 0 self.server_ids: List[Optional[str]] = [] def get_connection(self, server_id=None): self.call_count += 1 self.server_ids.append(server_id) if not self._cursors: raise AssertionError("PoolStub epuizat: get_connection apelat de mai multe ori decât cursori furnizați") cursor = self._cursors.pop(0) return _FakeConnCM(_FakeConn(cursor)) def _install_pool(monkeypatch, cursors: Sequence[_FakeCursor]) -> _PoolStub: """Patchează `oracle_pool.get_connection` în lookup_service.""" stub = _PoolStub(cursors) monkeypatch.setattr(lookup_service.oracle_pool, "get_connection", stub.get_connection) return stub def _install_schema(monkeypatch, schema: str = "MARIUSM_AUTO"): """Patchează `get_schema` ca să nu lovim DB pentru rezolvarea schemei.""" async def _fake_get_schema(company_id, server_id): # noqa: ARG001 return schema monkeypatch.setattr(lookup_service, "get_schema", _fake_get_schema) # ============================================================ # Reset cache între teste — OBLIGATORIU # ============================================================ @pytest.fixture(autouse=True) def _reset_caches(): lookup_service.reset_cache() reset_schema_cache() yield lookup_service.reset_cache() reset_schema_cache() # ============================================================ # get_tip_deviz # ============================================================ async def test_tip_deviz_cache_miss_then_hit(monkeypatch): """A doua chemare pentru aceeași schemă → fără query nou.""" cursor = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)]) pool = _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch, "MARIUSM_AUTO") res1 = await lookup_service.LookupService.get_tip_deviz(167) res2 = await lookup_service.LookupService.get_tip_deviz(167) assert pool.call_count == 1, "A doua chemare trebuia să vină din cache" assert res1 == res2 assert res1[0].id_tip == 1 assert res1[0].denumire == "POST GARANTIE" assert res1[0].inch_validare == 1 async def test_tip_deviz_inch_validare_null_defaults_to_zero(monkeypatch): """`inch_validare` NULL în DB → 0 (Pydantic int).""" cursor = _FakeCursor(fetchall_rows=[(2, "GARANTIE", None)]) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res = await lookup_service.LookupService.get_tip_deviz(167) assert res[0].inch_validare == 0 async def test_tip_deviz_different_schema_triggers_new_query(monkeypatch): """Schemă diferită (alt id_firma) → query nou (cache key e per schema).""" cur1 = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)]) cur2 = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)]) pool = _install_pool(monkeypatch, [cur1, cur2]) schemas = iter(["MARIUSM_AUTO", "ALTA_FIRMA_AUTO"]) async def _switching_schema(_company_id, _server_id): return next(schemas) monkeypatch.setattr(lookup_service, "get_schema", _switching_schema) await lookup_service.LookupService.get_tip_deviz(167) await lookup_service.LookupService.get_tip_deviz(110) assert pool.call_count == 2, "Schemă diferită ⇒ cache miss ⇒ query nou" # ============================================================ # get_masini # ============================================================ async def test_masini_cache_hit_avoids_second_query(monkeypatch): cursor = _FakeCursor(fetchall_rows=[ (101, "B-32-CTL", "DACIA", "LOGAN", 2018, "ION ION SRL"), ]) pool = _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res1 = await lookup_service.LookupService.get_masini(167) res2 = await lookup_service.LookupService.get_masini(167) assert pool.call_count == 1 assert res1[0].id_masiniclient == 101 assert "ION ION SRL" in res1[0].label assert "B-32-CTL" in res1[0].label assert "(2018)" in res1[0].label assert res1 == res2 async def test_masini_label_handles_missing_marca_and_year(monkeypatch): """Vehicul fără marca/an: fallback labels '?' fără paranteze.""" cursor = _FakeCursor(fetchall_rows=[ (102, "CT-10-EEE", None, None, None, None), ]) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res = await lookup_service.LookupService.get_masini(167) assert res[0].label == "? — ?, CT-10-EEE" # ============================================================ # get_asiguratori # ============================================================ async def test_asiguratori_cache_miss_then_hit(monkeypatch): cursor = _FakeCursor(fetchall_rows=[(7, "ALLIANZ ȚIRIAC")]) pool = _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) await lookup_service.LookupService.get_asiguratori(167) res2 = await lookup_service.LookupService.get_asiguratori(167) assert pool.call_count == 1 assert res2[0].id_asigurator == 7 assert res2[0].denumire == "ALLIANZ ȚIRIAC" # ============================================================ # get_inspectori (cache key e per (schema, id_asigurator)) # ============================================================ async def test_inspectori_cache_per_asigurator(monkeypatch): """Cache cheie include id_asigurator → schimbare asigurator ⇒ query nou.""" cur1 = _FakeCursor(fetchall_rows=[(11, "POPESCU ION", 7)]) cur2 = _FakeCursor(fetchall_rows=[(22, "IONESCU MARIA", 8)]) pool = _install_pool(monkeypatch, [cur1, cur2]) _install_schema(monkeypatch) await lookup_service.LookupService.get_inspectori(7, 167) await lookup_service.LookupService.get_inspectori(7, 167) # hit await lookup_service.LookupService.get_inspectori(8, 167) # miss (alt asigurator) assert pool.call_count == 2 # ============================================================ # get_operatii # ============================================================ async def test_operatii_cache_and_timpn_null_handling(monkeypatch): """Cache hit + timpn NULL rămâne None (nu 0.0).""" cursor = _FakeCursor(fetchall_rows=[ (501, "OP-001", "Schimb ulei", 1.5), (502, "OP-002", "Aliniere", None), ]) pool = _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res1 = await lookup_service.LookupService.get_operatii(167) res2 = await lookup_service.LookupService.get_operatii(167) assert pool.call_count == 1 assert res1 == res2 assert res1[0].timpn == 1.5 assert res1[1].timpn is None # ============================================================ # search_parteneri — LIKE escape + min 2 chars # ============================================================ async def test_search_parteneri_min_2_chars_returns_empty_without_query(monkeypatch): """q='a' (1 char) → [] FĂRĂ să atingă DB.""" pool = _install_pool(monkeypatch, []) # zero cursori — orice apel ⇒ AssertionError _install_schema(monkeypatch) res = await lookup_service.LookupService.search_parteneri("a", 167) assert res == [] assert pool.call_count == 0 async def test_search_parteneri_escapes_like_wildcards(monkeypatch): """%, _, \\ trebuie escape-uite înainte de a fi trimise în LIKE.""" cursor = _FakeCursor(fetchall_rows=[]) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) await lookup_service.LookupService.search_parteneri("foo%bar_baz", 167) # cursor.execute(query, {"q": ...}) — verificăm al doilea pozițional args, _kwargs = cursor.execute.call_args assert args[1] == {"q": "foo\\%bar\\_baz%"}, ( f"Expected escaped LIKE arg; got {args[1]!r}" ) async def test_search_parteneri_escapes_backslash_first(monkeypatch): """Ordinea escape-ului: \\ se face prima, ca să nu dublezi escape-urile %/_ ulterioare.""" cursor = _FakeCursor(fetchall_rows=[]) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) await lookup_service.LookupService.search_parteneri("a\\b", 167) args, _ = cursor.execute.call_args # 'a\\b' (3 chars: a, \, b) → 'a\\\\b' (a, \, \, b) + '%' assert args[1] == {"q": "a\\\\b%"} async def test_search_parteneri_returns_results(monkeypatch): """Happy path: query trimis cu suffix '%', rezultate mapate la PartenerItem.""" cursor = _FakeCursor(fetchall_rows=[ (4321, "POPESCU IMPEX SRL"), (4322, "POPESCU SERVICE"), ]) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res = await lookup_service.LookupService.search_parteneri("pop", 167) args, _ = cursor.execute.call_args assert args[1] == {"q": "pop%"} assert len(res) == 2 assert res[0].id_part == 4321 assert res[0].denumire == "POPESCU IMPEX SRL" # ============================================================ # get_masina_details — None pentru row lipsă # ============================================================ async def test_masina_details_returns_none_when_row_missing(monkeypatch): """Row inexistent → None (nu raise).""" cursor = _FakeCursor(fetchone_row=None) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res = await lookup_service.LookupService.get_masina_details(99999, 167) assert res is None async def test_masina_details_maps_row_to_pydantic(monkeypatch): """Row complet → MasinaDetails cu toate câmpurile populate.""" cursor = _FakeCursor(fetchone_row=( 101, "B-32-CTL", "DACIA", "LOGAN", 2018, "ION ION SRL", "UU1LSDA8N12345678", 1461, 90, 66, )) _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) res = await lookup_service.LookupService.get_masina_details(101, 167) assert res is not None assert res.id_masiniclient == 101 assert res.nr_inmatriculare == "B-32-CTL" assert res.marca == "DACIA" assert res.model == "LOGAN" assert res.serie_sasiu == "UU1LSDA8N12345678" assert res.cilindree == 1461 assert res.putere_cp == 90 assert res.putere_kw == 66 assert res.client_nume == "ION ION SRL" assert "DACIA LOGAN" in res.label # ============================================================ # server_id propagation # ============================================================ async def test_server_id_propagated_to_pool(monkeypatch): """server_id din JWT trebuie să ajungă la oracle_pool.get_connection.""" cursor = _FakeCursor(fetchall_rows=[]) pool = _install_pool(monkeypatch, [cursor]) _install_schema(monkeypatch) await lookup_service.LookupService.get_tip_deviz(167, server_id="mariusm_test") assert pool.server_ids == ["mariusm_test"]