"""US-001 (PRD 5.20): schema medii per cont + env pe submission + migrare/backfill.""" from __future__ import annotations import json import os import sqlite3 import tempfile import pytest @pytest.fixture() def fresh_conn(monkeypatch): """DB nou cu schema curenta (init_db).""" tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) from app.config import get_settings get_settings.cache_clear() from app.db import get_connection, init_db init_db() c = get_connection() yield c c.close() get_settings.cache_clear() def _old_db(path: str) -> sqlite3.Connection: """Construieste un DB in forma PRE-5.20 (fara coloanele de mediu).""" conn = sqlite3.connect(path, isolation_level=None) conn.row_factory = sqlite3.Row conn.execute( "CREATE TABLE accounts (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, " "cui TEXT, rar_creds_enc TEXT)" ) conn.execute( "CREATE TABLE submissions (id INTEGER PRIMARY KEY AUTOINCREMENT, " "idempotency_key TEXT NOT NULL UNIQUE, account_id INTEGER, status TEXT, " "payload_json TEXT NOT NULL)" ) return conn def _migrate_old(path: str, env: str, monkeypatch) -> sqlite3.Connection: monkeypatch.setenv("AUTOPASS_RAR_ENV", env) from app.config import get_settings get_settings.cache_clear() conn = sqlite3.connect(path, isolation_level=None) conn.row_factory = sqlite3.Row from app.db import _migrate _migrate(conn) return conn def test_coloane_medii_pe_cont(fresh_conn): acc = {r["name"] for r in fresh_conn.execute("PRAGMA table_info(accounts)").fetchall()} assert { "rar_test_enabled", "rar_prod_enabled", "rar_creds_test_enc", "rar_creds_prod_enc", "rar_env_default", } <= acc sub = {r["name"] for r in fresh_conn.execute("PRAGMA table_info(submissions)").fetchall()} assert "rar_env" in sub def test_default_client_prod_on_test_off(fresh_conn): from app.accounts import create_account aid = create_account(fresh_conn, "Service X") row = fresh_conn.execute( "SELECT rar_test_enabled, rar_prod_enabled, rar_env_default FROM accounts WHERE id=?", (aid,), ).fetchone() assert row["rar_prod_enabled"] == 1 assert row["rar_test_enabled"] == 0 assert row["rar_env_default"] == "prod" @pytest.mark.parametrize("env,slot,other", [ ("test", "rar_creds_test_enc", "rar_creds_prod_enc"), ("prod", "rar_creds_prod_enc", "rar_creds_test_enc"), ]) def test_migrare_creds_in_slotul_env_global(tmp_path, monkeypatch, env, slot, other): path = str(tmp_path / "old.db") old = _old_db(path) old.execute( "INSERT INTO accounts (id, name, rar_creds_enc) VALUES (5, 'Legacy', 'TOKEN_CREDS')" ) old.close() conn = _migrate_old(path, env, monkeypatch) row = conn.execute("SELECT * FROM accounts WHERE id=5").fetchone() assert row[slot] == "TOKEN_CREDS" assert row[other] is None assert row[f"rar_{env}_enabled"] == 1 assert row[f"rar_{'prod' if env == 'test' else 'test'}_enabled"] == 0 assert row["rar_env_default"] == env conn.close() def test_migrare_cont_fara_creds_ramane_pe_default(tmp_path, monkeypatch): path = str(tmp_path / "old.db") old = _old_db(path) old.execute("INSERT INTO accounts (id, name, rar_creds_enc) VALUES (6, 'NoCreds', NULL)") old.close() conn = _migrate_old(path, "test", monkeypatch) row = conn.execute("SELECT * FROM accounts WHERE id=6").fetchone() assert row["rar_prod_enabled"] == 1 assert row["rar_test_enabled"] == 0 assert row["rar_env_default"] == "prod" conn.close() def test_submissions_rar_env(tmp_path, monkeypatch): """Un rand PRE-migrare ajunge cu env-ul global (NU 'test') + cheie recalculata env-aware.""" path = str(tmp_path / "old.db") old = _old_db(path) payload = { "vin": "WVWZZZ1JZXW000001", "nr_inmatriculare": "B123ABC", "data_prestatie": "2026-01-10", "odometru_final": "123456", "prestatii": [{"cod_prestatie": "OE-1"}], } old.execute( "INSERT INTO submissions (idempotency_key, account_id, status, payload_json) " "VALUES ('LEGACY_KEY', 7, 'sent', ?)", (json.dumps(payload),), ) old.close() conn = _migrate_old(path, "prod", monkeypatch) row = conn.execute("SELECT rar_env, idempotency_key FROM submissions").fetchone() assert row["rar_env"] == "prod" # ancora globala, NU DEFAULT 'test' from app.idempotency import build_key, canonicalize_row canon = canonicalize_row(payload) canon["prestatii"] = payload["prestatii"] assert row["idempotency_key"] == build_key(7, canon, "prod") # si difera de varianta env-aware pe test (reconciliere pe endpoint corect) assert row["idempotency_key"] != build_key(7, canon, "test") conn.close() def test_migrare_idempotenta(fresh_conn): """A doua rulare _migrate pe DB deja migrat nu strica nimic.""" from app.db import _migrate _migrate(fresh_conn) # nu arunca, nu dubleaza coloane acc = {r["name"] for r in fresh_conn.execute("PRAGMA table_info(accounts)").fetchall()} assert "rar_env_default" in acc