From fd64cf3f1ec28ffb2c5c1e5ace61b6daefa104ad Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Mon, 13 Apr 2026 20:09:55 +0000 Subject: [PATCH] test(service-auto): unit tests multi-tenant + lookup + partener + pc_nr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Acoperire 49 tests offline (fără Oracle real): test_comanda_helpers (16): _build_pc_nr toate prefixele VFP + fallback, _build_sir_id_operatii csv + limit 4000 chars, _PREFIX_MAP regression. test_router_authorization (9): _company_id fallback JWT companies[0], 403 firmă neautorizată, 400 companies[] gol, string→int coercion; _server_id extragere din request.state. test_lookup_endpoints (15): cache hit/miss per schema pentru tip_deviz, masini, asiguratori, inspectori (per-asig), operatii; LIKE escape %/_/\; min 2 chars short-circuit; server_id propagat la get_connection. test_partener_create (9): 5 Pydantic validation (denumire min 2, id_firma ge 1, cui opțional), 4 service mocked (happy path, 409 duplicat CUI, fără CUI, lipsă GRANT → 500 log.critical). Pattern mock Oracle: fake context managers (async get_connection + sync cursor), monkeypatch pe lookup_service.get_schema (not _context, din cauza binding copy la import). Rulare: pytest backend/modules/service_auto/tests/ -q → 62 passed. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../tests/test_comanda_helpers.py | 92 +++++ .../tests/test_lookup_endpoints.py | 365 ++++++++++++++++++ .../tests/test_partener_create.py | 197 ++++++++++ .../tests/test_router_authorization.py | 89 +++++ 4 files changed, 743 insertions(+) create mode 100644 backend/modules/service_auto/tests/test_comanda_helpers.py create mode 100644 backend/modules/service_auto/tests/test_lookup_endpoints.py create mode 100644 backend/modules/service_auto/tests/test_partener_create.py create mode 100644 backend/modules/service_auto/tests/test_router_authorization.py diff --git a/backend/modules/service_auto/tests/test_comanda_helpers.py b/backend/modules/service_auto/tests/test_comanda_helpers.py new file mode 100644 index 0000000..6bca1c4 --- /dev/null +++ b/backend/modules/service_auto/tests/test_comanda_helpers.py @@ -0,0 +1,92 @@ +""" +Unit tests pentru helperii din comanda_service (fără DB, fără mocks). + +Acoperire: + - _build_pc_nr: toate prefixele VFP (tip_id=1..7) + fallback pe tip_id necunoscut + - _build_sir_id_operatii: None, empty, CSV, limit 4000 chars +""" +import pytest +from fastapi import HTTPException + +from backend.modules.service_auto.services.comanda_service import ( + _build_pc_nr, + _build_sir_id_operatii, + _MAX_OPERATII_CSV, + _PREFIX_MAP, +) + + +# ---- _build_pc_nr ---- + +@pytest.mark.parametrize("tip_id,expected_prefix", [ + (1, ""), # POST GARANTIE — fără prefix (VFP default) + (2, "G"), # GARANTIE + (3, "R"), # REGIE + (4, "P"), # PREGATIRE + (6, "PR"), # PRODUCTIE + (7, "C"), # CONSTATARE +]) +def test_pc_nr_known_tip_ids_use_vfp_prefix(tip_id, expected_prefix): + """Toate cele 6 tip_id-uri cu prefix VFP verificat (oproceduri_devize.prg).""" + nrord = _build_pc_nr(tip_id, 123, "B-32-CTL") + assert nrord == f"{expected_prefix}123/B-32-CTL" + + +def test_pc_nr_tip_5_regie_2_no_vfp_mapping_uses_empty_prefix(): + """tip_id=5 (REGIE 2) nu are mapare VFP → fallback prefix ''.""" + assert _build_pc_nr(5, 42, "B-10-ABC") == "42/B-10-ABC" + + +def test_pc_nr_unknown_tip_id_uses_empty_prefix(): + """tip_id necunoscut (ex: 99) → fallback prefix '' + warning logat.""" + assert _build_pc_nr(99, 1, "XYZ") == "1/XYZ" + + +def test_pc_nr_format_matches_vfp_structure(): + """Format final: / — nu '//'.""" + nrord = _build_pc_nr(2, 777, "CT-10-EEE") + assert nrord == "G777/CT-10-EEE" + assert "/" in nrord + assert nrord.count("/") == 1 # o singură bară + + +def test_prefix_map_covers_all_vfp_mappings(): + """Regression guard: _PREFIX_MAP nu trebuie scăpat la refactor.""" + assert _PREFIX_MAP == {1: "", 2: "G", 3: "R", 4: "P", 6: "PR", 7: "C"} + + +# ---- _build_sir_id_operatii ---- + +def test_sir_operatii_none_returns_none(): + """None → None (nu trimite param la SP).""" + assert _build_sir_id_operatii(None) is None + + +def test_sir_operatii_empty_list_returns_none(): + """Listă goală → None (echivalent cu 'fără operații').""" + assert _build_sir_id_operatii([]) is None + + +def test_sir_operatii_single_id(): + assert _build_sir_id_operatii([42]) == "42" + + +def test_sir_operatii_multiple_ids_csv(): + assert _build_sir_id_operatii([1, 2, 3]) == "1,2,3" + + +def test_sir_operatii_below_limit_passes(): + """600 ID-uri cu 2 cifre + virgulă = ~1800 chars, sub limita 4000.""" + ids = list(range(10, 110)) # 100 IDs, 3 cifre → ~400 chars + result = _build_sir_id_operatii(ids) + assert result is not None + assert len(result) < _MAX_OPERATII_CSV + + +def test_sir_operatii_over_limit_raises_422(): + """~1000 IDs cu 6 cifre → peste 4000 chars → HTTPException 422.""" + big_ids = list(range(100000, 101000)) # 1000 IDs × 7 chars (6 cifre + virgulă) = 7000 chars + with pytest.raises(HTTPException) as exc_info: + _build_sir_id_operatii(big_ids) + assert exc_info.value.status_code == 422 + assert "Prea multe" in exc_info.value.detail diff --git a/backend/modules/service_auto/tests/test_lookup_endpoints.py b/backend/modules/service_auto/tests/test_lookup_endpoints.py new file mode 100644 index 0000000..a3446c5 --- /dev/null +++ b/backend/modules/service_auto/tests/test_lookup_endpoints.py @@ -0,0 +1,365 @@ +""" +Unit tests pentru `LookupService` (mock Oracle). + +Acoperire: + - Cache hit/miss per schema (tip_deviz, masini, asiguratori, inspectori, operatii) + - LIKE escape în search_parteneri (%, _, \\ neutralizate) + - min 2 chars validation pentru search_parteneri + - get_masina_details: row absent → None + - Reset cache între teste (autouse) — atât `_cache` din lookup_service + cât și `_schema_cache` din _context. + +Niciun test nu atinge Oracle real: `oracle_pool.get_connection` și `get_schema` +sunt monkeypatched. Stilul urmează `test_comanda_helpers.py` (pytest-asyncio +auto-mode din pyproject.toml — fără decoratori). +""" +from typing import List, Optional, Sequence +from unittest.mock import MagicMock + +import pytest + +from backend.modules.service_auto.services import lookup_service +from backend.modules.service_auto.services._context import reset_schema_cache + + +# ============================================================ +# Fakes pentru oracle_pool.get_connection +# ============================================================ + +class _FakeCursor: + """Cursor sincron: __enter__/__exit__ + execute/fetchall/fetchone.""" + + def __init__(self, fetchall_rows=None, fetchone_row=None): + self._fetchall_rows = fetchall_rows if fetchall_rows is not None else [] + self._fetchone_row = fetchone_row + self.execute = MagicMock() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def fetchall(self): + return self._fetchall_rows + + def fetchone(self): + return self._fetchone_row + + +class _FakeConn: + def __init__(self, cursor: _FakeCursor): + self._cursor = cursor + + def cursor(self): + return self._cursor + + +class _FakeConnCM: + """Async context manager imitând `@asynccontextmanager` din oracle_pool.""" + + def __init__(self, conn: _FakeConn): + self._conn = conn + + async def __aenter__(self): + return self._conn + + async def __aexit__(self, exc_type, exc, tb): + return False + + +class _PoolStub: + """ + Înlocuiește `oracle_pool.get_connection` — întoarce cursori în ordinea dată. + Numără apelurile (per server_id, dacă vrem să verificăm propagarea). + """ + + def __init__(self, cursors: Sequence[_FakeCursor]): + self._cursors = list(cursors) + self.call_count = 0 + self.server_ids: List[Optional[str]] = [] + + def get_connection(self, server_id=None): + self.call_count += 1 + self.server_ids.append(server_id) + if not self._cursors: + raise AssertionError("PoolStub epuizat: get_connection apelat de mai multe ori decât cursori furnizați") + cursor = self._cursors.pop(0) + return _FakeConnCM(_FakeConn(cursor)) + + +def _install_pool(monkeypatch, cursors: Sequence[_FakeCursor]) -> _PoolStub: + """Patchează `oracle_pool.get_connection` în lookup_service.""" + stub = _PoolStub(cursors) + monkeypatch.setattr(lookup_service.oracle_pool, "get_connection", stub.get_connection) + return stub + + +def _install_schema(monkeypatch, schema: str = "MARIUSM_AUTO"): + """Patchează `get_schema` ca să nu lovim DB pentru rezolvarea schemei.""" + async def _fake_get_schema(company_id, server_id): # noqa: ARG001 + return schema + monkeypatch.setattr(lookup_service, "get_schema", _fake_get_schema) + + +# ============================================================ +# Reset cache între teste — OBLIGATORIU +# ============================================================ + +@pytest.fixture(autouse=True) +def _reset_caches(): + lookup_service.reset_cache() + reset_schema_cache() + yield + lookup_service.reset_cache() + reset_schema_cache() + + +# ============================================================ +# get_tip_deviz +# ============================================================ + +async def test_tip_deviz_cache_miss_then_hit(monkeypatch): + """A doua chemare pentru aceeași schemă → fără query nou.""" + cursor = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)]) + pool = _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch, "MARIUSM_AUTO") + + res1 = await lookup_service.LookupService.get_tip_deviz(167) + res2 = await lookup_service.LookupService.get_tip_deviz(167) + + assert pool.call_count == 1, "A doua chemare trebuia să vină din cache" + assert res1 == res2 + assert res1[0].id_tip == 1 + assert res1[0].denumire == "POST GARANTIE" + assert res1[0].inch_validare == 1 + + +async def test_tip_deviz_inch_validare_null_defaults_to_zero(monkeypatch): + """`inch_validare` NULL în DB → 0 (Pydantic int).""" + cursor = _FakeCursor(fetchall_rows=[(2, "GARANTIE", None)]) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res = await lookup_service.LookupService.get_tip_deviz(167) + assert res[0].inch_validare == 0 + + +async def test_tip_deviz_different_schema_triggers_new_query(monkeypatch): + """Schemă diferită (alt id_firma) → query nou (cache key e per schema).""" + cur1 = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)]) + cur2 = _FakeCursor(fetchall_rows=[(1, "POST GARANTIE", 1)]) + pool = _install_pool(monkeypatch, [cur1, cur2]) + + schemas = iter(["MARIUSM_AUTO", "ALTA_FIRMA_AUTO"]) + async def _switching_schema(_company_id, _server_id): + return next(schemas) + monkeypatch.setattr(lookup_service, "get_schema", _switching_schema) + + await lookup_service.LookupService.get_tip_deviz(167) + await lookup_service.LookupService.get_tip_deviz(110) + + assert pool.call_count == 2, "Schemă diferită ⇒ cache miss ⇒ query nou" + + +# ============================================================ +# get_masini +# ============================================================ + +async def test_masini_cache_hit_avoids_second_query(monkeypatch): + cursor = _FakeCursor(fetchall_rows=[ + (101, "B-32-CTL", "DACIA", "LOGAN", 2018, "ION ION SRL"), + ]) + pool = _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res1 = await lookup_service.LookupService.get_masini(167) + res2 = await lookup_service.LookupService.get_masini(167) + + assert pool.call_count == 1 + assert res1[0].id_masiniclient == 101 + assert "ION ION SRL" in res1[0].label + assert "B-32-CTL" in res1[0].label + assert "(2018)" in res1[0].label + assert res1 == res2 + + +async def test_masini_label_handles_missing_marca_and_year(monkeypatch): + """Vehicul fără marca/an: fallback labels '?' fără paranteze.""" + cursor = _FakeCursor(fetchall_rows=[ + (102, "CT-10-EEE", None, None, None, None), + ]) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res = await lookup_service.LookupService.get_masini(167) + assert res[0].label == "? — ?, CT-10-EEE" + + +# ============================================================ +# get_asiguratori +# ============================================================ + +async def test_asiguratori_cache_miss_then_hit(monkeypatch): + cursor = _FakeCursor(fetchall_rows=[(7, "ALLIANZ ȚIRIAC")]) + pool = _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + await lookup_service.LookupService.get_asiguratori(167) + res2 = await lookup_service.LookupService.get_asiguratori(167) + + assert pool.call_count == 1 + assert res2[0].id_asigurator == 7 + assert res2[0].denumire == "ALLIANZ ȚIRIAC" + + +# ============================================================ +# get_inspectori (cache key e per (schema, id_asigurator)) +# ============================================================ + +async def test_inspectori_cache_per_asigurator(monkeypatch): + """Cache cheie include id_asigurator → schimbare asigurator ⇒ query nou.""" + cur1 = _FakeCursor(fetchall_rows=[(11, "POPESCU ION", 7)]) + cur2 = _FakeCursor(fetchall_rows=[(22, "IONESCU MARIA", 8)]) + pool = _install_pool(monkeypatch, [cur1, cur2]) + _install_schema(monkeypatch) + + await lookup_service.LookupService.get_inspectori(7, 167) + await lookup_service.LookupService.get_inspectori(7, 167) # hit + await lookup_service.LookupService.get_inspectori(8, 167) # miss (alt asigurator) + + assert pool.call_count == 2 + + +# ============================================================ +# get_operatii +# ============================================================ + +async def test_operatii_cache_and_timpn_null_handling(monkeypatch): + """Cache hit + timpn NULL rămâne None (nu 0.0).""" + cursor = _FakeCursor(fetchall_rows=[ + (501, "OP-001", "Schimb ulei", 1.5), + (502, "OP-002", "Aliniere", None), + ]) + pool = _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res1 = await lookup_service.LookupService.get_operatii(167) + res2 = await lookup_service.LookupService.get_operatii(167) + + assert pool.call_count == 1 + assert res1 == res2 + assert res1[0].timpn == 1.5 + assert res1[1].timpn is None + + +# ============================================================ +# search_parteneri — LIKE escape + min 2 chars +# ============================================================ + +async def test_search_parteneri_min_2_chars_returns_empty_without_query(monkeypatch): + """q='a' (1 char) → [] FĂRĂ să atingă DB.""" + pool = _install_pool(monkeypatch, []) # zero cursori — orice apel ⇒ AssertionError + _install_schema(monkeypatch) + + res = await lookup_service.LookupService.search_parteneri("a", 167) + assert res == [] + assert pool.call_count == 0 + + +async def test_search_parteneri_escapes_like_wildcards(monkeypatch): + """%, _, \\ trebuie escape-uite înainte de a fi trimise în LIKE.""" + cursor = _FakeCursor(fetchall_rows=[]) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + await lookup_service.LookupService.search_parteneri("foo%bar_baz", 167) + + # cursor.execute(query, {"q": ...}) — verificăm al doilea pozițional + args, _kwargs = cursor.execute.call_args + assert args[1] == {"q": "foo\\%bar\\_baz%"}, ( + f"Expected escaped LIKE arg; got {args[1]!r}" + ) + + +async def test_search_parteneri_escapes_backslash_first(monkeypatch): + """Ordinea escape-ului: \\ se face prima, ca să nu dublezi escape-urile %/_ ulterioare.""" + cursor = _FakeCursor(fetchall_rows=[]) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + await lookup_service.LookupService.search_parteneri("a\\b", 167) + + args, _ = cursor.execute.call_args + # 'a\\b' (3 chars: a, \, b) → 'a\\\\b' (a, \, \, b) + '%' + assert args[1] == {"q": "a\\\\b%"} + + +async def test_search_parteneri_returns_results(monkeypatch): + """Happy path: query trimis cu suffix '%', rezultate mapate la PartenerItem.""" + cursor = _FakeCursor(fetchall_rows=[ + (4321, "POPESCU IMPEX SRL"), + (4322, "POPESCU SERVICE"), + ]) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res = await lookup_service.LookupService.search_parteneri("pop", 167) + + args, _ = cursor.execute.call_args + assert args[1] == {"q": "pop%"} + assert len(res) == 2 + assert res[0].id_part == 4321 + assert res[0].denumire == "POPESCU IMPEX SRL" + + +# ============================================================ +# get_masina_details — None pentru row lipsă +# ============================================================ + +async def test_masina_details_returns_none_when_row_missing(monkeypatch): + """Row inexistent → None (nu raise).""" + cursor = _FakeCursor(fetchone_row=None) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res = await lookup_service.LookupService.get_masina_details(99999, 167) + assert res is None + + +async def test_masina_details_maps_row_to_pydantic(monkeypatch): + """Row complet → MasinaDetails cu toate câmpurile populate.""" + cursor = _FakeCursor(fetchone_row=( + 101, "B-32-CTL", "DACIA", "LOGAN", 2018, "ION ION SRL", + "UU1LSDA8N12345678", 1461, 90, 66, + )) + _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + res = await lookup_service.LookupService.get_masina_details(101, 167) + assert res is not None + assert res.id_masiniclient == 101 + assert res.nr_inmatriculare == "B-32-CTL" + assert res.marca == "DACIA" + assert res.model == "LOGAN" + assert res.serie_sasiu == "UU1LSDA8N12345678" + assert res.cilindree == 1461 + assert res.putere_cp == 90 + assert res.putere_kw == 66 + assert res.client_nume == "ION ION SRL" + assert "DACIA LOGAN" in res.label + + +# ============================================================ +# server_id propagation +# ============================================================ + +async def test_server_id_propagated_to_pool(monkeypatch): + """server_id din JWT trebuie să ajungă la oracle_pool.get_connection.""" + cursor = _FakeCursor(fetchall_rows=[]) + pool = _install_pool(monkeypatch, [cursor]) + _install_schema(monkeypatch) + + await lookup_service.LookupService.get_tip_deviz(167, server_id="mariusm_test") + + assert pool.server_ids == ["mariusm_test"] diff --git a/backend/modules/service_auto/tests/test_partener_create.py b/backend/modules/service_auto/tests/test_partener_create.py new file mode 100644 index 0000000..38fd87f --- /dev/null +++ b/backend/modules/service_auto/tests/test_partener_create.py @@ -0,0 +1,197 @@ +""" +Unit tests pentru creare partener nou: + - Validare PartnerCreateRequest (denumire min_length=2, id_firma ge=1) + - LookupService.create_partener — happy path + duplicat CUI (409) + lipsă GRANT (500) + +Folosește mock pentru oracle_pool și _context.get_schema (fără DB). +""" +from unittest.mock import AsyncMock, MagicMock, patch + +import oracledb +import pytest +from fastapi import HTTPException +from pydantic import ValidationError + +from backend.modules.service_auto.schemas.comanda import PartnerCreateRequest +from backend.modules.service_auto.services.lookup_service import LookupService + + +# ---- PartnerCreateRequest validation ---- + +def test_partner_request_denumire_too_short_raises(): + """denumire cu 1 caracter → ValidationError (min_length=2).""" + with pytest.raises(ValidationError) as exc: + PartnerCreateRequest(denumire="X", id_firma=167) + assert "denumire" in str(exc.value).lower() + + +def test_partner_request_denumire_empty_raises(): + """denumire goală → ValidationError.""" + with pytest.raises(ValidationError): + PartnerCreateRequest(denumire="", id_firma=167) + + +def test_partner_request_minimal_valid(): + """Doar denumire + id_firma → CUI și adresa optionale = None.""" + req = PartnerCreateRequest(denumire="ACME SRL", id_firma=167) + assert req.denumire == "ACME SRL" + assert req.cui is None + assert req.adresa is None + assert req.id_firma == 167 + + +def test_partner_request_full(): + req = PartnerCreateRequest( + denumire="ACME SRL", + cui="RO12345678", + adresa="Str. Exemplu nr. 1, București", + id_firma=167, + ) + assert req.cui == "RO12345678" + assert req.adresa is not None and req.adresa.startswith("Str.") + + +def test_partner_request_id_firma_zero_raises(): + """id_firma=0 → ValidationError (ge=1).""" + with pytest.raises(ValidationError): + PartnerCreateRequest(denumire="ACME", id_firma=0) + + +# ---- LookupService.create_partener (mocked) ---- + +def _make_pool_ctx(cursor_mock): + """ + Construiește un context manager async pentru oracle_pool.get_connection. + Returnează: pool_mock cu .get_connection() → async ctx → conn cu .cursor() + sync ctx care returnează cursor_mock. + """ + conn_mock = MagicMock() + conn_mock.cursor.return_value.__enter__.return_value = cursor_mock + conn_mock.cursor.return_value.__exit__.return_value = None + conn_mock.commit = MagicMock() + + async_ctx = MagicMock() + async_ctx.__aenter__ = AsyncMock(return_value=conn_mock) + async_ctx.__aexit__ = AsyncMock(return_value=None) + + pool_mock = MagicMock() + pool_mock.get_connection = MagicMock(return_value=async_ctx) + return pool_mock, conn_mock + + +@pytest.mark.asyncio +async def test_create_partener_happy_path(): + """ + Cazul nominal: + - Pre-check CUI: nicio coliziune (fetchone() → None) + - SELECT MAX(id_part)+1 → 4242 + - INSERT reușește; conn.commit() apelat; întoarce PartenerItem. + """ + cursor = MagicMock() + # fetchone secvență: pre-check CUI (None), SELECT MAX (4242,) + cursor.fetchone.side_effect = [None, (4242,)] + cursor.execute = MagicMock() + + pool_mock, conn_mock = _make_pool_ctx(cursor) + + with patch( + "backend.modules.service_auto.services.lookup_service.oracle_pool", + pool_mock, + ), patch( + "backend.modules.service_auto.services.lookup_service.get_schema", + new=AsyncMock(return_value="MARIUSM_AUTO"), + ): + req = PartnerCreateRequest( + denumire="ACME SRL", cui="RO12345678", adresa="Str. X", id_firma=167, + ) + result = await LookupService.create_partener(req, server_id="mariusm_test") + + assert result.id_part == 4242 + assert result.denumire == "ACME SRL" + conn_mock.commit.assert_called_once() + + +@pytest.mark.asyncio +async def test_create_partener_duplicate_cui_raises_409(): + """Pre-check CUI găsește rând existent → HTTPException 409, NU INSERT.""" + cursor = MagicMock() + cursor.fetchone.return_value = (1,) # CUI deja există + cursor.execute = MagicMock() + + pool_mock, conn_mock = _make_pool_ctx(cursor) + + with patch( + "backend.modules.service_auto.services.lookup_service.oracle_pool", + pool_mock, + ), patch( + "backend.modules.service_auto.services.lookup_service.get_schema", + new=AsyncMock(return_value="MARIUSM_AUTO"), + ): + req = PartnerCreateRequest( + denumire="ACME SRL", cui="RO12345678", id_firma=167, + ) + with pytest.raises(HTTPException) as exc: + await LookupService.create_partener(req, server_id="mariusm_test") + + assert exc.value.status_code == 409 + assert "CUI" in exc.value.detail + conn_mock.commit.assert_not_called() + + +@pytest.mark.asyncio +async def test_create_partener_no_cui_skips_precheck(): + """Fără CUI → pre-check sărit, doar SELECT MAX + INSERT.""" + cursor = MagicMock() + cursor.fetchone.side_effect = [(99,)] # doar SELECT MAX + cursor.execute = MagicMock() + + pool_mock, conn_mock = _make_pool_ctx(cursor) + + with patch( + "backend.modules.service_auto.services.lookup_service.oracle_pool", + pool_mock, + ), patch( + "backend.modules.service_auto.services.lookup_service.get_schema", + new=AsyncMock(return_value="MARIUSM_AUTO"), + ): + req = PartnerCreateRequest(denumire="Persoană fizică", id_firma=167) + result = await LookupService.create_partener(req, server_id=None) + + assert result.id_part == 99 + conn_mock.commit.assert_called_once() + + +@pytest.mark.asyncio +async def test_create_partener_missing_grant_raises_500(): + """ORA-01031 (lipsă INSERT privilege) → HTTPException 500 cu mesaj clar.""" + cursor = MagicMock() + # CUI furnizat → fetchone secvență: pre-check (None=fără duplicat), SELECT MAX (1,) + cursor.fetchone.side_effect = [None, (1,)] + # INSERT primește ORA-01031 + err = oracledb.DatabaseError() + err.args = (MagicMock(code=1031, message="ORA-01031: insufficient privileges"),) + + def execute_side_effect(sql, *args, **kw): + del args, kw + if "INSERT" in sql.upper(): + raise err + cursor.execute.side_effect = execute_side_effect + + pool_mock, conn_mock = _make_pool_ctx(cursor) + + with patch( + "backend.modules.service_auto.services.lookup_service.oracle_pool", + pool_mock, + ), patch( + "backend.modules.service_auto.services.lookup_service.get_schema", + new=AsyncMock(return_value="MARIUSM_AUTO"), + ): + req = PartnerCreateRequest( + denumire="ACME SRL", cui="RO99999999", id_firma=167, + ) + with pytest.raises(HTTPException) as exc: + await LookupService.create_partener(req, server_id="mariusm_test") + + assert exc.value.status_code == 500 + assert "privilegii" in exc.value.detail.lower() + conn_mock.commit.assert_not_called() diff --git a/backend/modules/service_auto/tests/test_router_authorization.py b/backend/modules/service_auto/tests/test_router_authorization.py new file mode 100644 index 0000000..587a369 --- /dev/null +++ b/backend/modules/service_auto/tests/test_router_authorization.py @@ -0,0 +1,89 @@ +""" +Unit tests pentru _company_id() și _server_id() din routers/comanda.py. + +Acoperă izolarea multi-tenant: + - fallback la JWT companies[0] când nu e specificat id_firma + - 403 dacă id_firma nu e în JWT companies[] + - 400 dacă JWT nu are nicio firmă + - extragere server_id din request.state +""" +from types import SimpleNamespace + +import pytest +from fastapi import HTTPException + +from backend.modules.service_auto.routers.comanda import _company_id, _server_id + + +def _user(companies, username="MARIUS M", user_id=1): + """Construiește un CurrentUser minimal pentru teste (duck typing).""" + return SimpleNamespace( + username=username, + user_id=user_id, + companies=companies, + permissions=["read", "write"], + ) + + +# ---- _company_id ---- + +def test_company_id_explicit_in_allowed_list_passes(): + """id_firma explicit + în JWT → OK.""" + user = _user(["110", "167", "169"]) + assert _company_id(user, 167) == 167 + + +def test_company_id_explicit_not_in_allowed_raises_403(): + """id_firma explicit NU în JWT → 403.""" + user = _user(["110", "167"]) + with pytest.raises(HTTPException) as exc: + _company_id(user, 999) + assert exc.value.status_code == 403 + assert "neautorizat" in exc.value.detail.lower() + + +def test_company_id_none_falls_back_to_first_company(): + """Fără id_firma → prima firmă din JWT companies[].""" + user = _user(["167", "110", "169"]) + assert _company_id(user, None) == 167 + + +def test_company_id_empty_companies_raises_400(): + """JWT fără companies[] → 400 (nu putem alege firmă implicită).""" + user = _user([]) + with pytest.raises(HTTPException) as exc: + _company_id(user, None) + assert exc.value.status_code == 400 + + +def test_company_id_string_companies_converted_to_int(): + """JWT stochează companies[] ca list[str]; comparația se face pe int.""" + user = _user(["110", "167", "169"]) + # comparație cu int funcționează + assert _company_id(user, 110) == 110 + + +def test_company_id_accepts_string_id_from_first_company(): + """Prima firmă e string în JWT → e convertită corect la int.""" + user = _user(["42"]) + assert _company_id(user, None) == 42 + + +# ---- _server_id ---- + +def test_server_id_from_request_state(): + """Extragere server_id injectat de AuthenticationMiddleware.""" + request = SimpleNamespace(state=SimpleNamespace(server_id="mariusm_test")) + assert _server_id(request) == "mariusm_test" + + +def test_server_id_none_when_missing(): + """request.state fără server_id → None (pool folosește primul server).""" + request = SimpleNamespace(state=SimpleNamespace()) + assert _server_id(request) is None + + +def test_server_id_none_when_explicit_none(): + """server_id explicit None în state → None.""" + request = SimpleNamespace(state=SimpleNamespace(server_id=None)) + assert _server_id(request) is None