feat(api): scope pe cont la GET-urile de listare /v1/* (PRD 3.2)

Inchide scurgerea cross-account pe GET /v1/prezentari(/{id}),
/v1/mapari(/pending) si /v1/audit/export. Toate primesc
Depends(resolve_account_id) + account_scope_clause (regula NULL->cont 1,
OV-2). Nomenclatorul ramane global (referinta partajata, fara PII).

- B3: 404 cross-account byte-identic cu 404 inexistent (fara oracol enumerare)
- B4: get_prezentare cu allowlist de campuri (nu mai expune rar_creds_enc/
  payload_json/idempotency_key/rar_error)
- B1: pending_unmapped filtreaza in SQL; default None = global doar pentru web
- B2: helper account_scope_clause (DRY, doar pe submissions nullable)
- B5: index idx_submissions_account_status
- B8: regula de scope documentata in api-rar-contract.md
- TD-3.2: ?account_id != contul cheii -> 400

14 teste noi (cross-account, legacy NULL, B3, B4); suita 313 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-06-17 17:35:50 +00:00
parent 1c5b0cbc18
commit 748ab8b289
10 changed files with 655 additions and 61 deletions

View File

@@ -0,0 +1,128 @@
"""Teste scope cont pe GET /v1/audit/export (US-003, PRD 3.2)."""
from __future__ import annotations
import csv
import io
import json
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield monkeypatch
get_settings.cache_clear()
def _client():
from app.main import app
return TestClient(app)
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "s"}, "prezentari": [prez]}
def _csv_vins(content: bytes) -> list[str]:
reader = csv.DictReader(io.StringIO(content.decode("utf-8")))
return [r["vin"] for r in reader if r.get("vin")]
def test_export_doar_contul_cheii(env):
"""Exportul CSV contine doar randurile contului asociat cheii."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
c.post("/v1/prezentari", json=_body(vin="WVWZZZ1KZAW000456"), headers={"X-API-Key": k2})
# Marcheaza ca sent pentru ca audit/export default e status=sent
conn2 = get_connection()
try:
conn2.execute("UPDATE submissions SET status='sent'")
finally:
conn2.close()
resp1 = c.get("/v1/audit/export", headers={"X-API-Key": k1})
assert resp1.status_code == 200
vins1 = _csv_vins(resp1.content)
assert "WVWZZZ1KZAW000123" in vins1
assert "WVWZZZ1KZAW000456" not in vins1
resp2 = c.get("/v1/audit/export", headers={"X-API-Key": k2})
vins2 = _csv_vins(resp2.content)
assert "WVWZZZ1KZAW000456" in vins2
assert "WVWZZZ1KZAW000123" not in vins2
def test_export_legacy_null_pentru_cont_1(env):
"""Randuri cu account_id=NULL apartin contului 1 in exportul de audit; contul 2 nu le vede."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
payload = json.dumps({"vin": "LEGACYVIN12345678", "prestatii": []})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('legacy_audit_key', NULL, 'sent', ?)", (payload,)
)
finally:
conn.close()
resp1 = c.get("/v1/audit/export", headers={"X-API-Key": k1})
vins1 = _csv_vins(resp1.content)
assert "LEGACYVIN12345678" in vins1
resp2 = c.get("/v1/audit/export", headers={"X-API-Key": k2})
vins2 = _csv_vins(resp2.content)
assert "LEGACYVIN12345678" not in vins2
def test_export_status_all_tot_scoped(env):
"""status=all ramane scoped pe cont (nu exporta global)."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
c.post("/v1/prezentari", json=_body(vin="WVWZZZ1KZAW000456"), headers={"X-API-Key": k2})
resp1 = c.get("/v1/audit/export?status=all", headers={"X-API-Key": k1})
vins1 = _csv_vins(resp1.content)
assert "WVWZZZ1KZAW000123" in vins1
assert "WVWZZZ1KZAW000456" not in vins1

View File

@@ -0,0 +1,194 @@
"""Teste scope cont pe GET /v1/mapari + /pending (US-002, PRD 3.2).
TD-3.2 (decis la poarta): parametrul ?account_id= din query se pastreaza DAR:
- daca e prezent SI difera de contul cheii -> 400 explicit
- daca e prezent si egal -> ok
- daca lipseste -> contul cheii
Contul efectiv vine MEREU din cheie (nespoofabil).
"""
from __future__ import annotations
import json
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield monkeypatch
get_settings.cache_clear()
def _client():
from app.main import app
return TestClient(app)
def test_mapari_doar_contul_cheii(env):
"""Cheia A vede doar maparile contului A; cheia B nu vede maparile lui A."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
conn.execute(
"INSERT OR IGNORE INTO nomenclator_rar (cod_prestatie, nume_prestatie) "
"VALUES ('OE-1', 'test')"
)
conn.execute(
"INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
"VALUES (1, 'OP_A', 'OE-1', 1)"
)
conn.execute(
"INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
"VALUES (2, 'OP_B', 'OE-1', 1)"
)
finally:
conn.close()
mapari1 = c.get("/v1/mapari", headers={"X-API-Key": k1}).json()["mapari"]
ops1 = [m["cod_op_service"] for m in mapari1]
assert "OP_A" in ops1
assert "OP_B" not in ops1
mapari2 = c.get("/v1/mapari", headers={"X-API-Key": k2}).json()["mapari"]
ops2 = [m["cod_op_service"] for m in mapari2]
assert "OP_B" in ops2
assert "OP_A" not in ops2
def test_mapari_query_account_id_diferit_400(env):
"""Daca ?account_id difera de contul cheii -> 400 explicit (TD-3.2)."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
finally:
conn.close()
resp = c.get("/v1/mapari?account_id=2", headers={"X-API-Key": k1})
assert resp.status_code == 400
def test_mapari_query_account_id_egal_ok(env):
"""Daca ?account_id egal cu contul cheii -> 200 (TD-3.2)."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
k1 = create_api_key(conn, 1)
finally:
conn.close()
resp = c.get("/v1/mapari?account_id=1", headers={"X-API-Key": k1})
assert resp.status_code == 200
def test_pending_doar_contul_cheii(env):
"""GET /v1/mapari/pending cu cheia A returneaza doar operatiile contului A."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
payload1 = json.dumps({"prestatii": [{"cod_op_service": "OP_A", "denumire": "Reparatie"}]})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('pm_key1', 1, 'needs_mapping', ?)", (payload1,)
)
payload2 = json.dumps({"prestatii": [{"cod_op_service": "OP_B", "denumire": "Vopsire"}]})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('pm_key2', 2, 'needs_mapping', ?)", (payload2,)
)
finally:
conn.close()
pending1 = c.get("/v1/mapari/pending", headers={"X-API-Key": k1}).json()["pending"]
ops1 = [p["cod_op_service"] for p in pending1]
assert "OP_A" in ops1
assert "OP_B" not in ops1
pending2 = c.get("/v1/mapari/pending", headers={"X-API-Key": k2}).json()["pending"]
ops2 = [p["cod_op_service"] for p in pending2]
assert "OP_B" in ops2
assert "OP_A" not in ops2
def test_pending_web_global_neschimbat(env):
"""pending_unmapped(conn) fara argument returneaza global (back-compat pentru web/routes.py)."""
with _client() as c:
from app.db import get_connection
from app.mapping import pending_unmapped
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
payload1 = json.dumps({"prestatii": [{"cod_op_service": "OP_A", "denumire": "Reparatie"}]})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('pgn_key1', 1, 'needs_mapping', ?)", (payload1,)
)
payload2 = json.dumps({"prestatii": [{"cod_op_service": "OP_B", "denumire": "Vopsire"}]})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('pgn_key2', 2, 'needs_mapping', ?)", (payload2,)
)
# Apel fara argument -> global (ambele conturi)
result = pending_unmapped(conn)
ops = [p["cod_op_service"] for p in result]
assert "OP_A" in ops
assert "OP_B" in ops
finally:
conn.close()
def test_pending_filtreaza_in_sql_cu_regula_null(env):
"""B1: pending_unmapped(conn, account_id=1) include si randuri cu account_id=NULL (legacy).
Filtrarea trebuie sa se faca IN SQL cu:
WHERE status='needs_mapping' AND (account_id=? OR (account_id IS NULL AND ?=1))
Nu post-hoc in Python.
"""
with _client() as c:
from app.db import get_connection
from app.mapping import pending_unmapped
conn = get_connection()
try:
payload1 = json.dumps({"prestatii": [{"cod_op_service": "OP_EXPLICIT", "denumire": "X"}]})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('pfn_key1', 1, 'needs_mapping', ?)", (payload1,)
)
payload2 = json.dumps({"prestatii": [{"cod_op_service": "OP_NULL", "denumire": "Y"}]})
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('pfn_key2', NULL, 'needs_mapping', ?)", (payload2,)
)
# Cu account_id=1 -> vede si randul legacy (NULL -> cont 1)
result = pending_unmapped(conn, account_id=1)
ops = [p["cod_op_service"] for p in result]
assert "OP_EXPLICIT" in ops
assert "OP_NULL" in ops
# Cu account_id=2 -> nu vede nimic (nu are pending submissions)
result2 = pending_unmapped(conn, account_id=2)
assert result2 == []
finally:
conn.close()

View File

@@ -0,0 +1,169 @@
"""Teste scope cont pe GET /v1/prezentari + /{id} (US-001, PRD 3.2).
Metoda TDD: testele se scriu inainte de implementare (RED) si trebuie sa ramana
verzi dupa implementare (GREEN).
"""
from __future__ import annotations
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield monkeypatch
get_settings.cache_clear()
def _client():
from app.main import app
return TestClient(app)
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "s"}, "prezentari": [prez]}
def test_lista_doar_contul_cheii(env):
"""Cheia A vede doar submission-urile contului A; cheia B nu vede submission-urile lui A."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
assert r1.status_code == 200
sid1 = r1.json()["results"][0]["submission_id"]
r2 = c.post("/v1/prezentari", json=_body(vin="WVWZZZ1KZAW000456"), headers={"X-API-Key": k2})
assert r2.status_code == 200
sid2 = r2.json()["results"][0]["submission_id"]
lista1 = c.get("/v1/prezentari", headers={"X-API-Key": k1}).json()["submissions"]
ids1 = [s["id"] for s in lista1]
assert sid1 in ids1
assert sid2 not in ids1
lista2 = c.get("/v1/prezentari", headers={"X-API-Key": k2}).json()["submissions"]
ids2 = [s["id"] for s in lista2]
assert sid2 in ids2
assert sid1 not in ids2
def test_detaliu_cross_account_404(env):
"""GET /{id} cu cheia contului B pentru submission-ul contului A -> 404.
B3: detail-ul 404 cross-account trebuie byte-identic cu cel al unui id inexistent
(acelasi status + acelasi mesaj) — nu dam indicii ca randul exista.
"""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
sid1 = r.json()["results"][0]["submission_id"]
cross = c.get(f"/v1/prezentari/{sid1}", headers={"X-API-Key": k2})
nonexist = c.get("/v1/prezentari/99999", headers={"X-API-Key": k2})
assert cross.status_code == 404
assert nonexist.status_code == 404
assert cross.json()["detail"] == nonexist.json()["detail"] == "submission inexistent"
def test_legacy_null_vizibil_pentru_cont_1(env):
"""Randuri cu account_id=NULL apartin contului 1 (legacy OV-2); contul 2 nu le vede."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) "
"VALUES ('legacy_null_key', NULL, 'queued', '{}')"
)
finally:
conn.close()
lista1 = c.get("/v1/prezentari", headers={"X-API-Key": k1}).json()["submissions"]
lista2 = c.get("/v1/prezentari", headers={"X-API-Key": k2}).json()["submissions"]
assert len(lista1) >= 1
assert len(lista2) == 0
def test_fara_cheie_flag_off_vede_contul_1(env):
"""Fara cheie cu AUTOPASS_REQUIRE_API_KEY=false -> cont implicit (id=1, back-compat dev)."""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'alt')")
k2 = create_api_key(conn, 2)
finally:
conn.close()
# Submission pentru contul 1 (fara cheie, flag off -> cont implicit)
c.post("/v1/prezentari", json=_body())
# Submission pentru contul 2
c.post("/v1/prezentari", json=_body(vin="WVWZZZ1KZAW000456"), headers={"X-API-Key": k2})
# Fara cheie -> vede DOAR contul 1 (1 submission)
lista = c.get("/v1/prezentari").json()["submissions"]
assert len(lista) == 1
def test_detaliu_nu_expune_creds(env):
"""B4: GET /v1/prezentari/{id} nu expune campuri sensibile (rar_creds_enc, payload_json,
idempotency_key, rar_error).
"""
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
k1 = create_api_key(conn, 1)
finally:
conn.close()
r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
sid = r.json()["results"][0]["submission_id"]
resp = c.get(f"/v1/prezentari/{sid}", headers={"X-API-Key": k1})
assert resp.status_code == 200
data = resp.json()
for field in ("rar_creds_enc", "payload_json", "idempotency_key", "rar_error"):
assert field not in data, f"camp sensibil expus: {field}"