Inchide scurgerea cross-account pe GET /v1/prezentari(/{id}),
/v1/mapari(/pending) si /v1/audit/export. Toate primesc
Depends(resolve_account_id) + account_scope_clause (regula NULL->cont 1,
OV-2). Nomenclatorul ramane global (referinta partajata, fara PII).
- B3: 404 cross-account byte-identic cu 404 inexistent (fara oracol enumerare)
- B4: get_prezentare cu allowlist de campuri (nu mai expune rar_creds_enc/
payload_json/idempotency_key/rar_error)
- B1: pending_unmapped filtreaza in SQL; default None = global doar pentru web
- B2: helper account_scope_clause (DRY, doar pe submissions nullable)
- B5: index idx_submissions_account_status
- B8: regula de scope documentata in api-rar-contract.md
- TD-3.2: ?account_id != contul cheii -> 400
14 teste noi (cross-account, legacy NULL, B3, B4); suita 313 passed.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
195 lines
7.5 KiB
Python
195 lines
7.5 KiB
Python
"""Teste scope cont pe GET /v1/mapari + /pending (US-002, PRD 3.2).
|
|
|
|
TD-3.2 (decis la poarta): parametrul ?account_id= din query se pastreaza DAR:
|
|
- daca e prezent SI difera de contul cheii -> 400 explicit
|
|
- daca e prezent si egal -> ok
|
|
- daca lipseste -> contul cheii
|
|
Contul efectiv vine MEREU din cheie (nespoofabil).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
@pytest.fixture()
|
|
def env(monkeypatch):
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
yield monkeypatch
|
|
get_settings.cache_clear()
|
|
|
|
|
|
def _client():
|
|
from app.main import app
|
|
return TestClient(app)
|
|
|
|
|
|
def test_mapari_doar_contul_cheii(env):
|
|
"""Cheia A vede doar maparile contului A; cheia B nu vede maparile lui A."""
|
|
with _client() as c:
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
conn = get_connection()
|
|
try:
|
|
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
|
|
k1 = create_api_key(conn, 1)
|
|
k2 = create_api_key(conn, 2)
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO nomenclator_rar (cod_prestatie, nume_prestatie) "
|
|
"VALUES ('OE-1', 'test')"
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
|
|
"VALUES (1, 'OP_A', 'OE-1', 1)"
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
|
|
"VALUES (2, 'OP_B', 'OE-1', 1)"
|
|
)
|
|
finally:
|
|
conn.close()
|
|
|
|
mapari1 = c.get("/v1/mapari", headers={"X-API-Key": k1}).json()["mapari"]
|
|
ops1 = [m["cod_op_service"] for m in mapari1]
|
|
assert "OP_A" in ops1
|
|
assert "OP_B" not in ops1
|
|
|
|
mapari2 = c.get("/v1/mapari", headers={"X-API-Key": k2}).json()["mapari"]
|
|
ops2 = [m["cod_op_service"] for m in mapari2]
|
|
assert "OP_B" in ops2
|
|
assert "OP_A" not in ops2
|
|
|
|
|
|
def test_mapari_query_account_id_diferit_400(env):
|
|
"""Daca ?account_id difera de contul cheii -> 400 explicit (TD-3.2)."""
|
|
with _client() as c:
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
conn = get_connection()
|
|
try:
|
|
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
|
|
k1 = create_api_key(conn, 1)
|
|
finally:
|
|
conn.close()
|
|
|
|
resp = c.get("/v1/mapari?account_id=2", headers={"X-API-Key": k1})
|
|
assert resp.status_code == 400
|
|
|
|
|
|
def test_mapari_query_account_id_egal_ok(env):
|
|
"""Daca ?account_id egal cu contul cheii -> 200 (TD-3.2)."""
|
|
with _client() as c:
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
conn = get_connection()
|
|
try:
|
|
k1 = create_api_key(conn, 1)
|
|
finally:
|
|
conn.close()
|
|
|
|
resp = c.get("/v1/mapari?account_id=1", headers={"X-API-Key": k1})
|
|
assert resp.status_code == 200
|
|
|
|
|
|
def test_pending_doar_contul_cheii(env):
|
|
"""GET /v1/mapari/pending cu cheia A returneaza doar operatiile contului A."""
|
|
with _client() as c:
|
|
from app.auth import create_api_key
|
|
from app.db import get_connection
|
|
conn = get_connection()
|
|
try:
|
|
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
|
|
k1 = create_api_key(conn, 1)
|
|
k2 = create_api_key(conn, 2)
|
|
payload1 = json.dumps({"prestatii": [{"cod_op_service": "OP_A", "denumire": "Reparatie"}]})
|
|
conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
|
|
"VALUES ('pm_key1', 1, 'needs_mapping', ?)", (payload1,)
|
|
)
|
|
payload2 = json.dumps({"prestatii": [{"cod_op_service": "OP_B", "denumire": "Vopsire"}]})
|
|
conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
|
|
"VALUES ('pm_key2', 2, 'needs_mapping', ?)", (payload2,)
|
|
)
|
|
finally:
|
|
conn.close()
|
|
|
|
pending1 = c.get("/v1/mapari/pending", headers={"X-API-Key": k1}).json()["pending"]
|
|
ops1 = [p["cod_op_service"] for p in pending1]
|
|
assert "OP_A" in ops1
|
|
assert "OP_B" not in ops1
|
|
|
|
pending2 = c.get("/v1/mapari/pending", headers={"X-API-Key": k2}).json()["pending"]
|
|
ops2 = [p["cod_op_service"] for p in pending2]
|
|
assert "OP_B" in ops2
|
|
assert "OP_A" not in ops2
|
|
|
|
|
|
def test_pending_web_global_neschimbat(env):
|
|
"""pending_unmapped(conn) fara argument returneaza global (back-compat pentru web/routes.py)."""
|
|
with _client() as c:
|
|
from app.db import get_connection
|
|
from app.mapping import pending_unmapped
|
|
conn = get_connection()
|
|
try:
|
|
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
|
|
payload1 = json.dumps({"prestatii": [{"cod_op_service": "OP_A", "denumire": "Reparatie"}]})
|
|
conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
|
|
"VALUES ('pgn_key1', 1, 'needs_mapping', ?)", (payload1,)
|
|
)
|
|
payload2 = json.dumps({"prestatii": [{"cod_op_service": "OP_B", "denumire": "Vopsire"}]})
|
|
conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
|
|
"VALUES ('pgn_key2', 2, 'needs_mapping', ?)", (payload2,)
|
|
)
|
|
# Apel fara argument -> global (ambele conturi)
|
|
result = pending_unmapped(conn)
|
|
ops = [p["cod_op_service"] for p in result]
|
|
assert "OP_A" in ops
|
|
assert "OP_B" in ops
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_pending_filtreaza_in_sql_cu_regula_null(env):
|
|
"""B1: pending_unmapped(conn, account_id=1) include si randuri cu account_id=NULL (legacy).
|
|
|
|
Filtrarea trebuie sa se faca IN SQL cu:
|
|
WHERE status='needs_mapping' AND (account_id=? OR (account_id IS NULL AND ?=1))
|
|
Nu post-hoc in Python.
|
|
"""
|
|
with _client() as c:
|
|
from app.db import get_connection
|
|
from app.mapping import pending_unmapped
|
|
conn = get_connection()
|
|
try:
|
|
payload1 = json.dumps({"prestatii": [{"cod_op_service": "OP_EXPLICIT", "denumire": "X"}]})
|
|
conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
|
|
"VALUES ('pfn_key1', 1, 'needs_mapping', ?)", (payload1,)
|
|
)
|
|
payload2 = json.dumps({"prestatii": [{"cod_op_service": "OP_NULL", "denumire": "Y"}]})
|
|
conn.execute(
|
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
|
|
"VALUES ('pfn_key2', NULL, 'needs_mapping', ?)", (payload2,)
|
|
)
|
|
# Cu account_id=1 -> vede si randul legacy (NULL -> cont 1)
|
|
result = pending_unmapped(conn, account_id=1)
|
|
ops = [p["cod_op_service"] for p in result]
|
|
assert "OP_EXPLICIT" in ops
|
|
assert "OP_NULL" in ops
|
|
# Cu account_id=2 -> nu vede nimic (nu are pending submissions)
|
|
result2 = pending_unmapped(conn, account_id=2)
|
|
assert result2 == []
|
|
finally:
|
|
conn.close()
|