Toate citirile pe coloana legacy accounts.rar_creds_enc mutate pe sloturile per-env (rar_creds_test_enc/rar_creds_prod_enc): worker fallback+keepalive, are_creds (web) si are_creds_rar (integrare, +are_creds_test/_prod), write-back API la reactivare, purjare la stergere cont, _get_acasa_context/_fetch_cont_env_state. Contract API (aditiv): POST /v1/conturi/rar-creds primeste rar_target optional (test/prod), scrie in slotul corect + activeaza mediul; DELETE primeste ?env (sterge un slot sau ambele). Documentat in docs/api-rar-contract.md. DROP cu garda in db.py (schema.sql fara coloana pe DB fresh): - 6a: eliminat ADD COLUMN rar_creds_enc (fara ping-pong re-ADD dupa DROP) - 6b: try/except fail-safe (nu crapa boot-ul) + garda sqlite_version >= 3.35 - 6c: re-backfill old->new imediat inainte de assert (ancora globala) - garda orfane: DROP anulat daca vreun creds legacy nu a aterizat in slot per-env - backup criptat accounts_rar_creds_enc_backup inainte de DROP - 6d: verificare prin PRAGMA table_info (NU grep — submissions are aceeasi coloana) Garda one-way, idempotenta la boot repetat (verificat). submissions.rar_creds_enc ramane neatinsa. tests/test_retragere_creds_enc.py: niciun read pe coloana veche, conturi rar-creds env-aware, are_creds per-env, DROP blocat de garda la lipsa copiere. 9 teste existente actualizate pe sloturi per-env. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
180 lines
6.1 KiB
Python
180 lines
6.1 KiB
Python
"""Teste FOUNDATION (Task #1): schema + migrari idempotente + openpyxl disponibil."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture()
|
|
def db_conn(monkeypatch):
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_foundation.db"))
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.db import get_connection, init_db
|
|
init_db()
|
|
conn = get_connection()
|
|
yield conn
|
|
conn.close()
|
|
get_settings.cache_clear()
|
|
|
|
|
|
def _table_cols(conn, table: str) -> set[str]:
|
|
return {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
|
|
|
|
|
def _tables(conn) -> set[str]:
|
|
return {r["name"] for r in conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
|
).fetchall()}
|
|
|
|
|
|
# --- Coloane noi ---
|
|
|
|
def test_accounts_rar_creds_enc_dropata(db_conn):
|
|
"""US-013: accounts.rar_creds_enc a fost dropata; submissions.rar_creds_enc ramane."""
|
|
acc_cols = _table_cols(db_conn, "accounts")
|
|
sub_cols = _table_cols(db_conn, "submissions")
|
|
assert "rar_creds_enc" not in acc_cols, \
|
|
"accounts.rar_creds_enc trebuie sa fie ABSENTA dupa US-013 DROP"
|
|
assert "rar_creds_enc" in sub_cols, \
|
|
"submissions.rar_creds_enc trebuie sa RAMANA (creds efemere per-cerere)"
|
|
|
|
|
|
def test_submissions_batch_id(db_conn):
|
|
cols = _table_cols(db_conn, "submissions")
|
|
assert "batch_id" in cols
|
|
|
|
|
|
def test_submissions_row_index(db_conn):
|
|
cols = _table_cols(db_conn, "submissions")
|
|
assert "row_index" in cols
|
|
|
|
|
|
def test_submissions_purge_after(db_conn):
|
|
cols = _table_cols(db_conn, "submissions")
|
|
assert "purge_after" in cols
|
|
|
|
|
|
# --- Tabele noi ---
|
|
|
|
def test_column_mappings_table(db_conn):
|
|
assert "column_mappings" in _tables(db_conn)
|
|
|
|
|
|
def test_import_batches_table(db_conn):
|
|
assert "import_batches" in _tables(db_conn)
|
|
|
|
|
|
def test_import_rows_table(db_conn):
|
|
assert "import_rows" in _tables(db_conn)
|
|
|
|
|
|
def test_import_attestations_table(db_conn):
|
|
assert "import_attestations" in _tables(db_conn)
|
|
|
|
|
|
def test_import_rows_cols(db_conn):
|
|
cols = _table_cols(db_conn, "import_rows")
|
|
for c in ("id", "batch_id", "row_index", "raw_json", "resolved_status", "error"):
|
|
assert c in cols, f"coloana lipsa: {c}"
|
|
|
|
|
|
def test_import_batches_cols(db_conn):
|
|
cols = _table_cols(db_conn, "import_batches")
|
|
for c in ("id", "account_id", "filename", "status", "total", "ok",
|
|
"needs_mapping", "needs_data", "needs_review", "already_sent",
|
|
"duplicate_in_file", "created_at", "purge_after"):
|
|
assert c in cols, f"coloana lipsa: {c}"
|
|
|
|
|
|
def test_import_attestations_cols(db_conn):
|
|
cols = _table_cols(db_conn, "import_attestations")
|
|
for c in ("id", "batch_id", "account_id", "confirmed_by", "ts", "rows_hash", "n_confirmed"):
|
|
assert c in cols, f"coloana lipsa: {c}"
|
|
|
|
|
|
# --- Idempotenta init_db ---
|
|
|
|
def test_init_db_idempotent(monkeypatch):
|
|
"""init_db() ruleaza de doua ori fara eroare."""
|
|
tmp = tempfile.mkdtemp()
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "idem.db"))
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.db import init_db
|
|
init_db()
|
|
init_db() # a doua oara trebuie sa fie silentioasa
|
|
get_settings.cache_clear()
|
|
|
|
|
|
def test_migrate_on_existing_db(monkeypatch):
|
|
"""Migrarea functioneaza pe o DB veche (fara coloane noi)."""
|
|
import sqlite3
|
|
tmp = tempfile.mkdtemp()
|
|
db_path = os.path.join(tmp, "old.db")
|
|
# Creeaza schema minima fara coloanele noi
|
|
old_conn = sqlite3.connect(db_path)
|
|
old_conn.executescript("""
|
|
PRAGMA journal_mode = WAL;
|
|
CREATE TABLE IF NOT EXISTS accounts (id INTEGER PRIMARY KEY, name TEXT NOT NULL, cui TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')));
|
|
INSERT OR IGNORE INTO accounts (id, name) VALUES (1, 'default');
|
|
CREATE TABLE IF NOT EXISTS submissions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
idempotency_key TEXT NOT NULL UNIQUE,
|
|
account_id INTEGER,
|
|
status TEXT NOT NULL DEFAULT 'queued',
|
|
payload_json TEXT NOT NULL,
|
|
rar_status_code INTEGER,
|
|
rar_error TEXT,
|
|
id_prezentare INTEGER,
|
|
retry_count INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
CREATE TABLE IF NOT EXISTS worker_heartbeat (
|
|
id INTEGER PRIMARY KEY CHECK (id = 1),
|
|
last_beat TEXT, last_rar_login_ok TEXT, detail TEXT
|
|
);
|
|
INSERT OR IGNORE INTO worker_heartbeat (id, detail) VALUES (1, 'never started');
|
|
CREATE TABLE IF NOT EXISTS api_keys (id INTEGER PRIMARY KEY, account_id INTEGER, key_hash TEXT UNIQUE, active INTEGER DEFAULT 1, created_at TEXT);
|
|
CREATE TABLE IF NOT EXISTS operations_mapping (id INTEGER PRIMARY KEY, account_id INTEGER, cod_op_service TEXT, cod_prestatie TEXT, auto_send INTEGER DEFAULT 1, created_at TEXT, UNIQUE(account_id, cod_op_service));
|
|
CREATE TABLE IF NOT EXISTS nomenclator_rar (cod_prestatie TEXT PRIMARY KEY, nume_prestatie TEXT, updated_at TEXT);
|
|
""")
|
|
old_conn.close()
|
|
|
|
monkeypatch.setenv("AUTOPASS_DB_PATH", db_path)
|
|
from app.config import get_settings
|
|
get_settings.cache_clear()
|
|
from app.db import init_db, get_connection
|
|
init_db()
|
|
|
|
conn = get_connection()
|
|
sub_cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()}
|
|
acc_cols = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()}
|
|
assert "batch_id" in sub_cols
|
|
assert "row_index" in sub_cols
|
|
assert "purge_after" in sub_cols
|
|
# US-013: accounts.rar_creds_enc a fost dropata.
|
|
assert "rar_creds_enc" not in acc_cols
|
|
conn.close()
|
|
get_settings.cache_clear()
|
|
|
|
|
|
# --- openpyxl disponibil ---
|
|
|
|
def test_openpyxl_importabil():
|
|
import openpyxl
|
|
assert openpyxl.__version__.startswith("3.1")
|
|
|
|
|
|
def test_openpyxl_create_workbook():
|
|
from openpyxl import Workbook
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws["A1"] = "test"
|
|
assert ws["A1"].value == "test"
|