Toate citirile pe coloana legacy accounts.rar_creds_enc mutate pe sloturile per-env (rar_creds_test_enc/rar_creds_prod_enc): worker fallback+keepalive, are_creds (web) si are_creds_rar (integrare, +are_creds_test/_prod), write-back API la reactivare, purjare la stergere cont, _get_acasa_context/_fetch_cont_env_state. Contract API (aditiv): POST /v1/conturi/rar-creds primeste rar_target optional (test/prod), scrie in slotul corect + activeaza mediul; DELETE primeste ?env (sterge un slot sau ambele). Documentat in docs/api-rar-contract.md. DROP cu garda in db.py (schema.sql fara coloana pe DB fresh): - 6a: eliminat ADD COLUMN rar_creds_enc (fara ping-pong re-ADD dupa DROP) - 6b: try/except fail-safe (nu crapa boot-ul) + garda sqlite_version >= 3.35 - 6c: re-backfill old->new imediat inainte de assert (ancora globala) - garda orfane: DROP anulat daca vreun creds legacy nu a aterizat in slot per-env - backup criptat accounts_rar_creds_enc_backup inainte de DROP - 6d: verificare prin PRAGMA table_info (NU grep — submissions are aceeasi coloana) Garda one-way, idempotenta la boot repetat (verificat). submissions.rar_creds_enc ramane neatinsa. tests/test_retragere_creds_enc.py: niciun read pe coloana veche, conturi rar-creds env-aware, are_creds per-env, DROP blocat de garda la lipsa copiere. 9 teste existente actualizate pe sloturi per-env. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
167 lines
6.3 KiB
Python
167 lines
6.3 KiB
Python
"""Teste US-004 (PRD 5.5): model de stare a contului `accounts.status` + helperi.
|
|
|
|
Acopera: derivarea status din active la migrare, invariantul active=1 <=> status='active',
|
|
helperii set_status/delete_account, protectia contului de sistem id=1, excluderea 'deleted'
|
|
din listare.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture()
|
|
def conn(monkeypatch):
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_status.db"))
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.db import get_connection, init_db
|
|
init_db()
|
|
c = get_connection()
|
|
yield c
|
|
c.close()
|
|
get_settings.cache_clear()
|
|
|
|
|
|
def _status(conn, acct_id):
|
|
return conn.execute("SELECT status, active FROM accounts WHERE id=?", (acct_id,)).fetchone()
|
|
|
|
|
|
def test_create_account_activ_status_active(conn):
|
|
from app.accounts import create_account
|
|
acct_id = create_account(conn, "Service X", active=True)
|
|
row = _status(conn, acct_id)
|
|
assert row["status"] == "active" and row["active"] == 1
|
|
|
|
|
|
def test_create_account_inactiv_status_pending(conn):
|
|
from app.accounts import create_account
|
|
acct_id = create_account(conn, "Service Y", active=False)
|
|
row = _status(conn, acct_id)
|
|
assert row["status"] == "pending" and row["active"] == 0
|
|
|
|
|
|
def test_default_account_id1_active(conn):
|
|
row = _status(conn, 1)
|
|
assert row["status"] == "active" and row["active"] == 1
|
|
|
|
|
|
def test_set_status_mentine_invariant_active(conn):
|
|
from app.accounts import create_account, set_status
|
|
acct_id = create_account(conn, "Service Z", active=True)
|
|
for st, exp_active in [("blocked", 0), ("archived", 0), ("active", 1), ("pending", 0)]:
|
|
set_status(conn, acct_id, st)
|
|
row = _status(conn, acct_id)
|
|
assert row["status"] == st and row["active"] == exp_active
|
|
|
|
|
|
def test_set_status_invalid_ridica(conn):
|
|
from app.accounts import create_account, set_status
|
|
acct_id = create_account(conn, "Service W", active=True)
|
|
with pytest.raises(ValueError):
|
|
set_status(conn, acct_id, "inexistent")
|
|
|
|
|
|
def test_set_status_cont_inexistent_ridica(conn):
|
|
from app.accounts import set_status
|
|
with pytest.raises(ValueError):
|
|
set_status(conn, 9999, "blocked")
|
|
|
|
|
|
def test_set_active_mirror_status(conn):
|
|
from app.accounts import create_account, set_active
|
|
acct_id = create_account(conn, "Service M", active=True)
|
|
set_active(conn, acct_id, False)
|
|
assert _status(conn, acct_id)["status"] == "pending"
|
|
set_active(conn, acct_id, True)
|
|
assert _status(conn, acct_id)["status"] == "active"
|
|
|
|
|
|
def test_delete_account_soft(conn):
|
|
from app.accounts import create_account, delete_account, list_accounts
|
|
acct_id = create_account(conn, "Service D", active=True)
|
|
delete_account(conn, acct_id)
|
|
assert _status(conn, acct_id)["status"] == "deleted"
|
|
# exclus din listare
|
|
assert all(a["id"] != acct_id for a in list_accounts(conn))
|
|
|
|
|
|
def test_delete_purjeaza_pii_si_elibereaza_cui(conn):
|
|
"""Stergerea soft purjeaza creds RAR + revoca cheile API + elibereaza CUI (re-inregistrabil)."""
|
|
from app.accounts import create_account, delete_account, list_accounts
|
|
acct_id = create_account(conn, "Service GDPR", cui="RO12345", active=True)
|
|
# US-013: creds in sloturi per-env (rar_creds_enc legacy dropata)
|
|
conn.execute(
|
|
"UPDATE accounts SET rar_creds_test_enc='secret_enc_test', rar_creds_prod_enc='secret_enc_prod' WHERE id=?",
|
|
(acct_id,),
|
|
)
|
|
conn.execute("INSERT INTO api_keys (account_id, key_hash, active) VALUES (?, 'h', 1)", (acct_id,))
|
|
conn.commit()
|
|
|
|
delete_account(conn, acct_id)
|
|
|
|
row = conn.execute("SELECT status, rar_creds_test_enc, rar_creds_prod_enc, cui FROM accounts WHERE id=?",
|
|
(acct_id,)).fetchone()
|
|
assert row["status"] == "deleted"
|
|
assert row["rar_creds_test_enc"] is None, "creds RAR test trebuie purjate la stergere"
|
|
assert row["rar_creds_prod_enc"] is None, "creds RAR prod trebuie purjate la stergere"
|
|
assert row["cui"] is None, "CUI trebuie eliberat la stergere"
|
|
key_active = conn.execute("SELECT active FROM api_keys WHERE account_id=?", (acct_id,)).fetchone()
|
|
assert key_active["active"] == 0, "cheile API trebuie revocate"
|
|
# CUI eliberat -> se poate re-inregistra acelasi CUI
|
|
new_id = create_account(conn, "Service Nou", cui="RO12345", active=True)
|
|
assert new_id != acct_id
|
|
assert all(a["id"] != acct_id for a in list_accounts(conn))
|
|
|
|
|
|
def test_dev_id1_protejat_de_status_negativ(conn):
|
|
from app.accounts import set_status, delete_account
|
|
for verb in ("blocked", "archived", "deleted"):
|
|
with pytest.raises(ValueError):
|
|
set_status(conn, 1, verb)
|
|
with pytest.raises(ValueError):
|
|
delete_account(conn, 1)
|
|
# ramane activ
|
|
assert _status(conn, 1)["status"] == "active"
|
|
|
|
|
|
def test_migrare_deriva_status_din_active(conn):
|
|
"""DB veche fara coloana status -> _migrate o adauga si o deriva din active.
|
|
|
|
Pornim de la schema reala (fixtura `conn` a rulat init_db), reconstruim tabela accounts
|
|
FARA coloana status (simuleaza DB pre-5.5), apoi rulam _migrate.
|
|
"""
|
|
from app.db import _migrate
|
|
|
|
# Reconstruim accounts fara `status` (rebuild de tabela — singura cale in SQLite vechi).
|
|
# US-013: rar_creds_enc nu mai exista in accounts (dropata); nu e in legacy DDL.
|
|
conn.executescript(
|
|
"""
|
|
PRAGMA foreign_keys=OFF;
|
|
CREATE TABLE accounts_legacy (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, cui TEXT,
|
|
active INTEGER NOT NULL DEFAULT 1, created_at TEXT
|
|
);
|
|
INSERT INTO accounts_legacy (id, name, cui, active, created_at)
|
|
SELECT id, name, cui, active, created_at FROM accounts;
|
|
DROP TABLE accounts;
|
|
ALTER TABLE accounts_legacy RENAME TO accounts;
|
|
"""
|
|
)
|
|
conn.execute("INSERT INTO accounts (name, active) VALUES ('Activ', 1)")
|
|
conn.execute("INSERT INTO accounts (name, active) VALUES ('Inactiv', 0)")
|
|
conn.commit()
|
|
assert "status" not in {r["name"] for r in conn.execute("PRAGMA table_info(accounts)")}
|
|
|
|
_migrate(conn)
|
|
conn.commit()
|
|
|
|
rows = {r["name"]: r["status"] for r in conn.execute("SELECT name, status FROM accounts")}
|
|
assert rows["default"] == "active" # id=1
|
|
assert rows["Activ"] == "active"
|
|
assert rows["Inactiv"] == "pending"
|