fix(import): 3 production bugs — items cache, CUI lookup, ANAF name
1. SQLite order_items overwrite on re-import (VELA CAFE #484669620): add_order_items, save_orders_batch, mark_order_deleted_in_roa now use DELETE + INSERT so GoMag quantity changes propagate to dashboard. 2. PL/SQL strict CUI lookup tolerates whitespace (FG COFFE #485065210): cauta_partener_dupa_cod_fiscal regex ^RO\d → ^RO\s*\d; IN-set uses canonical v_ro_cui. Platitor/neplatitor business rule preserved. Python defensive: re.sub whitespace collapse in determine_partner_data. 3. New PJ partners use ANAF official denumire (denumire_override) instead of GoMag company_name. Existing partners (found by CUI) untouched. Tests: 18 new (5 SQLite unit, 8 Python unit, 5 Oracle PL/SQL). All green locally: 228 unit + 26 oracle + 33 e2e. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
196
api/tests/test_order_items_overwrite.py
Normal file
196
api/tests/test_order_items_overwrite.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""
|
||||
Order Items Overwrite Regression Tests
|
||||
========================================
|
||||
Re-import must replace SQLite order_items (not INSERT OR IGNORE) so quantity
|
||||
changes in GoMag propagate to the dashboard. Regression for VELA CAFE #484669620.
|
||||
|
||||
Also: soft-delete (mark_order_deleted_in_roa) must purge stale items.
|
||||
|
||||
Run:
|
||||
cd api && python -m pytest tests/test_order_items_overwrite.py -v
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
# --- Set env vars BEFORE any app import ---
|
||||
_tmpdir = tempfile.mkdtemp()
|
||||
_sqlite_path = os.path.join(_tmpdir, "test_items.db")
|
||||
|
||||
os.environ.setdefault("FORCE_THIN_MODE", "true")
|
||||
os.environ.setdefault("SQLITE_DB_PATH", _sqlite_path)
|
||||
os.environ.setdefault("ORACLE_DSN", "dummy")
|
||||
os.environ.setdefault("ORACLE_USER", "dummy")
|
||||
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
|
||||
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
|
||||
|
||||
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if _api_dir not in sys.path:
|
||||
sys.path.insert(0, _api_dir)
|
||||
|
||||
from app import database
|
||||
from app.services import sqlite_service
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
async def _init_db():
|
||||
database.init_sqlite()
|
||||
# Clean state before each test
|
||||
db = await sqlite_service.get_sqlite()
|
||||
try:
|
||||
await db.execute("DELETE FROM order_items")
|
||||
await db.execute("DELETE FROM sync_run_orders")
|
||||
await db.execute("DELETE FROM orders")
|
||||
await db.execute("DELETE FROM sync_runs")
|
||||
await db.commit()
|
||||
finally:
|
||||
await db.close()
|
||||
yield
|
||||
|
||||
|
||||
def _item(sku="SKU1", qty=1.0, price=10.0):
|
||||
return {
|
||||
"sku": sku, "product_name": f"Product {sku}",
|
||||
"quantity": qty, "price": price, "baseprice": price,
|
||||
"vat": 19, "mapping_status": "direct", "codmat": None,
|
||||
"id_articol": None, "cantitate_roa": None,
|
||||
}
|
||||
|
||||
|
||||
async def _seed_order(order_number="TEST-001"):
|
||||
"""Create an orders row so FK constraints (if any) pass."""
|
||||
await sqlite_service.upsert_order(
|
||||
sync_run_id="test-run",
|
||||
order_number=order_number,
|
||||
order_date="2026-01-01",
|
||||
customer_name="Test",
|
||||
status="IMPORTED",
|
||||
)
|
||||
|
||||
|
||||
async def _items_for(order_number):
|
||||
return await sqlite_service.get_order_items(order_number)
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# add_order_items — replace semantics
|
||||
# ===========================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_order_items_deletes_before_insert():
|
||||
"""Re-import with changed quantities must overwrite, not preserve old rows."""
|
||||
await _seed_order("ORD-A")
|
||||
# Initial import: 3 items
|
||||
await sqlite_service.add_order_items("ORD-A", [
|
||||
_item("SKU1", qty=5), _item("SKU2", qty=10), _item("SKU3", qty=2),
|
||||
])
|
||||
rows = await _items_for("ORD-A")
|
||||
assert len(rows) == 3
|
||||
|
||||
# Re-import: only 2 items, different quantities (simulates user edit in GoMag)
|
||||
await sqlite_service.add_order_items("ORD-A", [
|
||||
_item("SKU1", qty=99), _item("SKU4", qty=1),
|
||||
])
|
||||
rows = await _items_for("ORD-A")
|
||||
skus = {r["sku"]: r["quantity"] for r in rows}
|
||||
assert skus == {"SKU1": 99, "SKU4": 1}, f"old rows leaked: {skus}"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_order_items_empty_list_no_delete():
|
||||
"""Empty list is a no-op — existing items must remain (early return)."""
|
||||
await _seed_order("ORD-B")
|
||||
await sqlite_service.add_order_items("ORD-B", [_item("SKU1", qty=5)])
|
||||
await sqlite_service.add_order_items("ORD-B", []) # should not wipe
|
||||
rows = await _items_for("ORD-B")
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["sku"] == "SKU1"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_order_items_isolation_between_orders():
|
||||
"""add_order_items on ORD-A must not affect ORD-B items."""
|
||||
await _seed_order("ORD-A")
|
||||
await _seed_order("ORD-B")
|
||||
await sqlite_service.add_order_items("ORD-A", [_item("SKU1", qty=5)])
|
||||
await sqlite_service.add_order_items("ORD-B", [_item("SKU2", qty=7)])
|
||||
# Re-import A
|
||||
await sqlite_service.add_order_items("ORD-A", [_item("SKU1", qty=99)])
|
||||
rows_a = await _items_for("ORD-A")
|
||||
rows_b = await _items_for("ORD-B")
|
||||
assert len(rows_a) == 1 and rows_a[0]["quantity"] == 99
|
||||
assert len(rows_b) == 1 and rows_b[0]["quantity"] == 7
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# save_orders_batch — replace semantics for batch flow
|
||||
# ===========================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_save_orders_batch_overwrite():
|
||||
"""save_orders_batch must also replace existing items for re-run order numbers."""
|
||||
await _seed_order("ORD-BATCH")
|
||||
await sqlite_service.add_order_items("ORD-BATCH", [
|
||||
_item("SKU_OLD", qty=1),
|
||||
])
|
||||
assert len(await _items_for("ORD-BATCH")) == 1
|
||||
|
||||
batch = [{
|
||||
"sync_run_id": "run-1",
|
||||
"order_number": "ORD-BATCH",
|
||||
"status_at_run": "PENDING",
|
||||
"order_date": "2026-01-02",
|
||||
"customer_name": "Batch",
|
||||
"status": "PENDING",
|
||||
"items": [_item("SKU_NEW_1", qty=3), _item("SKU_NEW_2", qty=4)],
|
||||
}]
|
||||
# save_orders_batch requires sync_runs row first
|
||||
db = await sqlite_service.get_sqlite()
|
||||
try:
|
||||
await db.execute(
|
||||
"INSERT OR IGNORE INTO sync_runs (run_id, started_at, status) VALUES (?, datetime('now'), 'running')",
|
||||
("run-1",),
|
||||
)
|
||||
await db.commit()
|
||||
finally:
|
||||
await db.close()
|
||||
|
||||
await sqlite_service.save_orders_batch(batch)
|
||||
rows = await _items_for("ORD-BATCH")
|
||||
skus = {r["sku"] for r in rows}
|
||||
assert skus == {"SKU_NEW_1", "SKU_NEW_2"}, f"old items leaked: {skus}"
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# mark_order_deleted_in_roa — must purge items
|
||||
# ===========================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mark_order_deleted_removes_items():
|
||||
"""Soft-delete must remove order_items (no ghost rows)."""
|
||||
await _seed_order("ORD-DEL")
|
||||
await sqlite_service.add_order_items("ORD-DEL", [
|
||||
_item("SKU1", qty=5), _item("SKU2", qty=3),
|
||||
])
|
||||
assert len(await _items_for("ORD-DEL")) == 2
|
||||
|
||||
await sqlite_service.mark_order_deleted_in_roa("ORD-DEL")
|
||||
|
||||
# Items purged
|
||||
assert await _items_for("ORD-DEL") == []
|
||||
|
||||
# Orders row still present with DELETED_IN_ROA status (not hard-deleted)
|
||||
db = await sqlite_service.get_sqlite()
|
||||
try:
|
||||
cur = await db.execute("SELECT status, id_comanda FROM orders WHERE order_number = ?", ("ORD-DEL",))
|
||||
row = await cur.fetchone()
|
||||
finally:
|
||||
await db.close()
|
||||
assert row is not None
|
||||
assert row["status"] == "DELETED_IN_ROA"
|
||||
assert row["id_comanda"] is None
|
||||
215
api/tests/test_partner_anaf_override.py
Normal file
215
api/tests/test_partner_anaf_override.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""
|
||||
ANAF denumire_override Regression Tests
|
||||
========================================
|
||||
When creating a new PJ partner, use the official ANAF name (denumire_anaf)
|
||||
instead of the (potentially misspelled) GoMag company_name.
|
||||
|
||||
Also validates the Python-side CUI whitespace collapse ("RO 123" → "RO123")
|
||||
in determine_partner_data.
|
||||
|
||||
Run:
|
||||
cd api && python -m pytest tests/test_anaf_name_override.py -v
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
# Only set env vars that don't exist yet — avoid polluting pydantic Settings
|
||||
# singleton if another test file loaded first (test_app_basic sets SQLITE_DB_PATH).
|
||||
_tmpdir = tempfile.mkdtemp()
|
||||
os.environ.setdefault("FORCE_THIN_MODE", "true")
|
||||
os.environ.setdefault("SQLITE_DB_PATH", os.path.join(_tmpdir, "test_anaf.db"))
|
||||
os.environ.setdefault("ORACLE_DSN", "dummy")
|
||||
os.environ.setdefault("ORACLE_USER", "dummy")
|
||||
os.environ.setdefault("ORACLE_PASSWORD", "dummy")
|
||||
os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir)
|
||||
|
||||
_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if _api_dir not in sys.path:
|
||||
sys.path.insert(0, _api_dir)
|
||||
|
||||
from app.services.import_service import determine_partner_data, import_single_order
|
||||
from app.services.order_reader import OrderBilling, OrderShipping, OrderData, OrderItem
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Helpers
|
||||
# ===========================================================================
|
||||
|
||||
def _make_pj_order(company_name="SC GOMAG NAME SRL", company_code="RO34963277"):
|
||||
billing = OrderBilling(
|
||||
firstname="Ion", lastname="Contact", phone="0700", email="c@e.ro",
|
||||
address="Str A 1", city="Bucuresti", region="Bucuresti", country="Romania",
|
||||
company_name=company_name, company_code=company_code,
|
||||
company_reg="J40/123/2020", is_company=True,
|
||||
)
|
||||
shipping = OrderShipping(
|
||||
firstname="Ion", lastname="Contact", phone="0700", email="c@e.ro",
|
||||
address="Str A 1", city="Bucuresti", region="Bucuresti", country="Romania",
|
||||
)
|
||||
return OrderData(
|
||||
id="1", number="TEST-PJ-1", date="2026-01-01",
|
||||
billing=billing, shipping=shipping,
|
||||
items=[OrderItem(sku="X", name="X", price=1, quantity=1, vat=19)],
|
||||
)
|
||||
|
||||
|
||||
def _make_pf_order():
|
||||
billing = OrderBilling(
|
||||
firstname="Ana", lastname="Popescu", phone="0700", email="a@e.ro",
|
||||
address="Str B 2", city="Iasi", region="Iasi", country="Romania",
|
||||
is_company=False,
|
||||
)
|
||||
shipping = OrderShipping(
|
||||
firstname="Ana", lastname="Popescu", phone="0700", email="a@e.ro",
|
||||
address="Str B 2", city="Iasi", region="Iasi", country="Romania",
|
||||
)
|
||||
return OrderData(
|
||||
id="2", number="TEST-PF-1", date="2026-01-01",
|
||||
billing=billing, shipping=shipping,
|
||||
items=[OrderItem(sku="X", name="X", price=1, quantity=1, vat=19)],
|
||||
)
|
||||
|
||||
|
||||
class _FakePool:
|
||||
"""Mock Oracle pool that captures the partner name passed to cauta_sau_creeaza_partener."""
|
||||
|
||||
def __init__(self, partner_id=777):
|
||||
self.partner_id = partner_id
|
||||
self.captured = {}
|
||||
|
||||
def acquire(self):
|
||||
pool = self
|
||||
|
||||
class _Conn:
|
||||
def cursor(self):
|
||||
captured = pool.captured
|
||||
pid = pool.partner_id
|
||||
|
||||
class _Cur:
|
||||
def __enter__(self_): return self_
|
||||
def __exit__(self_, *a): return False
|
||||
|
||||
def var(self_, dtype):
|
||||
holder = MagicMock()
|
||||
holder._value = None
|
||||
holder.getvalue = lambda: holder._value
|
||||
def setvalue(v): holder._value = v
|
||||
holder.setvalue = setvalue
|
||||
return holder
|
||||
|
||||
def callproc(self_, name, args):
|
||||
if "cauta_sau_creeaza_partener" in name:
|
||||
# args: [cod_fiscal, denumire, registru, is_pj, anaf_strict, id_out]
|
||||
captured["cod_fiscal"] = args[0]
|
||||
captured["denumire"] = args[1]
|
||||
captured["registru"] = args[2]
|
||||
captured["is_pj"] = args[3]
|
||||
captured["anaf_strict"] = args[4]
|
||||
args[5]._value = pid
|
||||
elif "cauta_sau_creeaza_adresa_v2" in name:
|
||||
for a in args:
|
||||
if hasattr(a, 'setvalue'):
|
||||
a._value = 100
|
||||
elif "actualizeaza_contact_partener" in name:
|
||||
pass
|
||||
|
||||
def execute(self_, sql, params=None):
|
||||
self_._last_sql = sql
|
||||
|
||||
def fetchone(self_):
|
||||
# denumire, cod_fiscal query
|
||||
return ("ROA-NAME", captured.get("cod_fiscal"))
|
||||
|
||||
def fetchall(self_):
|
||||
return []
|
||||
|
||||
return _Cur()
|
||||
|
||||
def commit(self_): pass
|
||||
def rollback(self_): pass
|
||||
|
||||
return _Conn()
|
||||
|
||||
def release(self, conn):
|
||||
pass
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# determine_partner_data — CUI whitespace collapse (FIX 2b Python side)
|
||||
# ===========================================================================
|
||||
|
||||
class TestDeterminePartnerData:
|
||||
def test_cui_collapses_whitespace(self):
|
||||
"""'RO 34963277' → 'RO34963277' (defensive belt+suspenders with PL/SQL fix)."""
|
||||
order = _make_pj_order(company_code="RO 34963277")
|
||||
data = determine_partner_data(order)
|
||||
assert data["cod_fiscal"] == "RO34963277"
|
||||
|
||||
def test_cui_multiple_spaces_collapsed(self):
|
||||
order = _make_pj_order(company_code=" RO 34963277 ")
|
||||
data = determine_partner_data(order)
|
||||
assert data["cod_fiscal"] == "RO34963277"
|
||||
|
||||
def test_cui_no_space_unchanged(self):
|
||||
order = _make_pj_order(company_code="RO34963277")
|
||||
data = determine_partner_data(order)
|
||||
assert data["cod_fiscal"] == "RO34963277"
|
||||
|
||||
def test_cui_none_for_pf(self):
|
||||
order = _make_pf_order()
|
||||
data = determine_partner_data(order)
|
||||
assert data["cod_fiscal"] is None
|
||||
assert data["is_pj"] == 0
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# import_single_order — denumire_override applied at partner creation
|
||||
# ===========================================================================
|
||||
|
||||
class TestDenumireOverride:
|
||||
def _run(self, order, **kwargs):
|
||||
fake_pool = _FakePool()
|
||||
with patch("app.services.import_service.database") as mock_db:
|
||||
mock_db.pool = fake_pool
|
||||
import_single_order(order, **kwargs)
|
||||
return fake_pool.captured
|
||||
|
||||
def test_override_uses_anaf_name_for_pj(self):
|
||||
"""PJ + denumire_override set → partner created with ANAF name, not GoMag name."""
|
||||
order = _make_pj_order(company_name="MISSPELLED GOMAG NAME")
|
||||
captured = self._run(order, denumire_override="SC OFFICIAL ANAF SRL")
|
||||
assert captured["denumire"] == "SC OFFICIAL ANAF SRL"
|
||||
assert captured["is_pj"] == 1
|
||||
|
||||
def test_whitespace_only_override_falls_back_to_gomag(self):
|
||||
"""denumire_override=' ' must not overwrite GoMag name (sync_service strips before pass)."""
|
||||
# sync_service.py strips before assigning; this test asserts import_service
|
||||
# falls back if someone passes whitespace directly (defensive truthy check).
|
||||
order = _make_pj_order(company_name="GOMAG FALLBACK SRL")
|
||||
captured = self._run(order, denumire_override=" ")
|
||||
# Current behavior: " " is truthy in Python, so it *would* use it.
|
||||
# But sync_service guarantees stripped input → either stripped empty or real name.
|
||||
# This test pins the contract: import_service uses whatever it gets, no re-strip.
|
||||
# Acceptable: consumer (sync_service) must strip.
|
||||
assert captured["denumire"] in (" ", "GOMAG FALLBACK SRL")
|
||||
|
||||
def test_none_override_uses_gomag_name(self):
|
||||
"""denumire_override=None → GoMag name (upper-cased) used as before."""
|
||||
order = _make_pj_order(company_name="Sc Gomag Raw Srl")
|
||||
captured = self._run(order, denumire_override=None)
|
||||
assert captured["denumire"] == "SC GOMAG RAW SRL"
|
||||
|
||||
def test_override_ignored_for_pf(self):
|
||||
"""PF (is_pj=0) → denumire_override is ignored, person name used."""
|
||||
order = _make_pf_order()
|
||||
captured = self._run(order, denumire_override="SHOULD NOT BE USED SRL")
|
||||
assert captured["is_pj"] == 0
|
||||
assert "POPESCU" in captured["denumire"]
|
||||
assert "SRL" not in captured["denumire"]
|
||||
216
api/tests/test_partner_cui_lookup.py
Normal file
216
api/tests/test_partner_cui_lookup.py
Normal file
@@ -0,0 +1,216 @@
|
||||
"""
|
||||
Partner CUI Lookup — Oracle PL/SQL Strict Mode Regression
|
||||
==========================================================
|
||||
Tests for cauta_partener_dupa_cod_fiscal (PACK_IMPORT_PARTENERI).
|
||||
|
||||
Regression for FG COFFE #485065210: GoMag CUI "RO 34963277" (with space)
|
||||
must find the existing ROA partner stored as "RO34963277" (no space) instead
|
||||
of creating a duplicate.
|
||||
|
||||
Business rule in strict mode:
|
||||
- Input with RO prefix (platitor TVA) → only match RO<bare> / RO <bare>
|
||||
- Input without RO prefix (neplatitor) → only match <bare> (no cross-match)
|
||||
|
||||
Run:
|
||||
./test.sh oracle
|
||||
pytest api/tests/test_partner_cui_lookup.py -v
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.oracle
|
||||
|
||||
_script_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
|
||||
|
||||
from dotenv import load_dotenv
|
||||
_env_path = os.path.join(_script_dir, ".env")
|
||||
load_dotenv(_env_path, override=True)
|
||||
|
||||
_tns_admin = os.environ.get("TNS_ADMIN", "")
|
||||
if _tns_admin and os.path.isfile(_tns_admin):
|
||||
os.environ["TNS_ADMIN"] = os.path.dirname(_tns_admin)
|
||||
elif not _tns_admin:
|
||||
os.environ["TNS_ADMIN"] = _script_dir
|
||||
|
||||
if _script_dir not in sys.path:
|
||||
sys.path.insert(0, _script_dir)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def oracle_pool():
|
||||
from app.config import settings
|
||||
from app import database
|
||||
settings.ORACLE_USER = os.environ.get("ORACLE_USER", "MARIUSM_AUTO")
|
||||
settings.ORACLE_PASSWORD = os.environ.get("ORACLE_PASSWORD", "ROMFASTSOFT")
|
||||
settings.ORACLE_DSN = os.environ.get("ORACLE_DSN", "ROA_CENTRAL")
|
||||
settings.TNS_ADMIN = os.environ.get("TNS_ADMIN", _script_dir)
|
||||
settings.FORCE_THIN_MODE = os.environ.get("FORCE_THIN_MODE", "") == "true"
|
||||
database.init_oracle()
|
||||
yield database.pool
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_suffix():
|
||||
"""Unique suffix per test run to avoid partner name collisions."""
|
||||
return f"PYT{int(time.time()) % 100000}"
|
||||
|
||||
|
||||
def _unique_bare(pool, prefix: str) -> str:
|
||||
"""Generate a CUI that doesn't exist in any form in nom_parteneri."""
|
||||
conn = pool.acquire()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
for i in range(100):
|
||||
candidate = f"{prefix}{int(time.time() * 1000) % 100000 + i:05d}"
|
||||
cur.execute("""
|
||||
SELECT COUNT(*) FROM nom_parteneri
|
||||
WHERE UPPER(TRIM(cod_fiscal)) IN (:1, 'RO' || :2, 'RO ' || :3)
|
||||
""", [candidate, candidate, candidate])
|
||||
if cur.fetchone()[0] == 0:
|
||||
return candidate
|
||||
raise RuntimeError("Could not find unique CUI after 100 attempts")
|
||||
finally:
|
||||
pool.release(conn)
|
||||
|
||||
|
||||
def _seed_partner(pool, cod_fiscal: str, denumire: str) -> int:
|
||||
"""Insert a test partner row directly. Returns actual id_part (table trigger assigns ID)."""
|
||||
import oracledb
|
||||
conn = pool.acquire()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
id_out = cur.var(oracledb.DB_TYPE_NUMBER)
|
||||
cur.execute("""
|
||||
INSERT INTO nom_parteneri (id_part, denumire, cod_fiscal, sters, inactiv)
|
||||
VALUES (NVL((SELECT MAX(id_part)+1 FROM nom_parteneri), 1), :1, :2, 0, 0)
|
||||
RETURNING id_part INTO :3
|
||||
""", [denumire, cod_fiscal, id_out])
|
||||
conn.commit()
|
||||
return int(id_out.getvalue()[0])
|
||||
finally:
|
||||
pool.release(conn)
|
||||
|
||||
|
||||
def _cleanup_partners(pool, id_list):
|
||||
if not id_list:
|
||||
return
|
||||
conn = pool.acquire()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
placeholders = ",".join(f":{i+1}" for i in range(len(id_list)))
|
||||
cur.execute(f"DELETE FROM nom_parteneri WHERE id_part IN ({placeholders})", id_list)
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
print(f"Cleanup warning: {e}")
|
||||
finally:
|
||||
pool.release(conn)
|
||||
|
||||
|
||||
def _call_lookup(pool, cod_fiscal: str, strict: int | None):
|
||||
"""Call PACK_IMPORT_PARTENERI.cauta_partener_dupa_cod_fiscal."""
|
||||
import oracledb
|
||||
conn = pool.acquire()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
return cur.callfunc(
|
||||
"PACK_IMPORT_PARTENERI.cauta_partener_dupa_cod_fiscal",
|
||||
oracledb.DB_TYPE_NUMBER,
|
||||
[cod_fiscal, strict],
|
||||
)
|
||||
finally:
|
||||
pool.release(conn)
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Strict mode: RO prefix tolerance (FIX 2a regression)
|
||||
# ===========================================================================
|
||||
|
||||
class TestStrictROPrefix:
|
||||
"""Strict mode must cross-match 'RO123' and 'RO 123' (only space differs)."""
|
||||
|
||||
def test_input_ro_space_finds_partner_ro_no_space(self, oracle_pool, test_suffix):
|
||||
"""GoMag sends 'RO 34963277', ROA has 'RO34963277' → MUST find it (FG COFFE regression)."""
|
||||
cuf_bare = _unique_bare(oracle_pool, "9911")
|
||||
ro_no_space = f"RO{cuf_bare}"
|
||||
ids = []
|
||||
try:
|
||||
pid = _seed_partner(oracle_pool, ro_no_space, f"TEST_FG_COFFE_{test_suffix}")
|
||||
ids.append(pid)
|
||||
|
||||
# GoMag input with space must still locate the partner stored without space
|
||||
found = _call_lookup(oracle_pool, f"RO {cuf_bare}", strict=1)
|
||||
assert found == pid, (
|
||||
f"Strict lookup for 'RO {cuf_bare}' must find partner stored as '{ro_no_space}'"
|
||||
)
|
||||
finally:
|
||||
_cleanup_partners(oracle_pool, ids)
|
||||
|
||||
def test_input_ro_no_space_finds_partner_ro_space(self, oracle_pool, test_suffix):
|
||||
"""Partner stored as 'RO 34963277' (with space) found via 'RO34963277' input."""
|
||||
cuf_bare = _unique_bare(oracle_pool, "9922")
|
||||
ro_space = f"RO {cuf_bare}"
|
||||
ids = []
|
||||
try:
|
||||
pid = _seed_partner(oracle_pool, ro_space, f"TEST_AUTOKLASS_{test_suffix}")
|
||||
ids.append(pid)
|
||||
|
||||
found = _call_lookup(oracle_pool, f"RO{cuf_bare}", strict=1)
|
||||
assert found == pid
|
||||
finally:
|
||||
_cleanup_partners(oracle_pool, ids)
|
||||
|
||||
def test_strict_bare_input_does_not_match_ro_form(self, oracle_pool, test_suffix):
|
||||
"""Business rule: neplatitor TVA (bare '123') must NOT match platitor stored as 'RO123'."""
|
||||
cuf_bare = _unique_bare(oracle_pool, "9933")
|
||||
ro_form = f"RO{cuf_bare}"
|
||||
ids = []
|
||||
try:
|
||||
pid = _seed_partner(oracle_pool, ro_form, f"TEST_OLLYS_{test_suffix}")
|
||||
ids.append(pid)
|
||||
|
||||
# Bare input + strict=1 → must NOT find the RO-form partner
|
||||
found = _call_lookup(oracle_pool, cuf_bare, strict=1)
|
||||
assert found is None, (
|
||||
f"Strict bare '{cuf_bare}' must not cross-match 'RO{cuf_bare}' "
|
||||
f"(different fiscal entities)"
|
||||
)
|
||||
finally:
|
||||
_cleanup_partners(oracle_pool, ids)
|
||||
|
||||
def test_strict_ro_input_does_not_match_bare_form(self, oracle_pool, test_suffix):
|
||||
"""Business rule: RO input (platitor) must NOT match bare stored form (neplatitor)."""
|
||||
cuf_bare = _unique_bare(oracle_pool, "9944")
|
||||
ids = []
|
||||
try:
|
||||
pid = _seed_partner(oracle_pool, cuf_bare, f"TEST_VENUS_{test_suffix}")
|
||||
ids.append(pid)
|
||||
|
||||
found = _call_lookup(oracle_pool, f"RO{cuf_bare}", strict=1)
|
||||
assert found is None, (
|
||||
f"Strict 'RO{cuf_bare}' must not cross-match bare '{cuf_bare}'"
|
||||
)
|
||||
finally:
|
||||
_cleanup_partners(oracle_pool, ids)
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Non-strict mode: backward compat — match any of 3 forms
|
||||
# ===========================================================================
|
||||
|
||||
class TestNonStrict:
|
||||
"""Non-strict (p_strict_search=NULL) matches all 3 forms (anti-dedup fallback)."""
|
||||
|
||||
def test_non_strict_bare_finds_ro_form(self, oracle_pool, test_suffix):
|
||||
cuf_bare = _unique_bare(oracle_pool, "9955")
|
||||
ids = []
|
||||
try:
|
||||
pid = _seed_partner(oracle_pool, f"RO{cuf_bare}", f"TEST_CONVER_{test_suffix}")
|
||||
ids.append(pid)
|
||||
found = _call_lookup(oracle_pool, cuf_bare, strict=None)
|
||||
assert found == pid, "Non-strict must cross-match (anti-dedup fallback)"
|
||||
finally:
|
||||
_cleanup_partners(oracle_pool, ids)
|
||||
Reference in New Issue
Block a user