feat(foundation): schema Treapta 2 + migrari aditive + openpyxl pinned (#1)
- accounts.rar_creds_enc TEXT (creds RAR durabile per-cont, D4) - submissions.batch_id, row_index (T7 scoping R1) - submissions.purge_after (T16 GDPR) - Tabele noi: column_mappings, import_batches, import_rows, import_attestations - _migrate idempotent pe DB veche (ALTER aditiv, pattern existent) - openpyxl==3.1.5 adaugat in requirements.txt (Issue 4, PINNED) - 15 teste noi: coloane, tabele, idempotenta, migrare DB veche, openpyxl Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
173
tests/test_foundation.py
Normal file
173
tests/test_foundation.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""Teste FOUNDATION (Task #1): schema + migrari idempotente + openpyxl disponibil."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def db_conn(monkeypatch):
|
||||
tmp = tempfile.mkdtemp()
|
||||
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "test_foundation.db"))
|
||||
from app.config import get_settings
|
||||
get_settings.cache_clear()
|
||||
from app.db import get_connection, init_db
|
||||
init_db()
|
||||
conn = get_connection()
|
||||
yield conn
|
||||
conn.close()
|
||||
get_settings.cache_clear()
|
||||
|
||||
|
||||
def _table_cols(conn, table: str) -> set[str]:
|
||||
return {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
||||
|
||||
|
||||
def _tables(conn) -> set[str]:
|
||||
return {r["name"] for r in conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table'"
|
||||
).fetchall()}
|
||||
|
||||
|
||||
# --- Coloane noi ---
|
||||
|
||||
def test_accounts_rar_creds_enc(db_conn):
|
||||
cols = _table_cols(db_conn, "accounts")
|
||||
assert "rar_creds_enc" in cols
|
||||
|
||||
|
||||
def test_submissions_batch_id(db_conn):
|
||||
cols = _table_cols(db_conn, "submissions")
|
||||
assert "batch_id" in cols
|
||||
|
||||
|
||||
def test_submissions_row_index(db_conn):
|
||||
cols = _table_cols(db_conn, "submissions")
|
||||
assert "row_index" in cols
|
||||
|
||||
|
||||
def test_submissions_purge_after(db_conn):
|
||||
cols = _table_cols(db_conn, "submissions")
|
||||
assert "purge_after" in cols
|
||||
|
||||
|
||||
# --- Tabele noi ---
|
||||
|
||||
def test_column_mappings_table(db_conn):
|
||||
assert "column_mappings" in _tables(db_conn)
|
||||
|
||||
|
||||
def test_import_batches_table(db_conn):
|
||||
assert "import_batches" in _tables(db_conn)
|
||||
|
||||
|
||||
def test_import_rows_table(db_conn):
|
||||
assert "import_rows" in _tables(db_conn)
|
||||
|
||||
|
||||
def test_import_attestations_table(db_conn):
|
||||
assert "import_attestations" in _tables(db_conn)
|
||||
|
||||
|
||||
def test_import_rows_cols(db_conn):
|
||||
cols = _table_cols(db_conn, "import_rows")
|
||||
for c in ("id", "batch_id", "row_index", "raw_json", "resolved_status", "error"):
|
||||
assert c in cols, f"coloana lipsa: {c}"
|
||||
|
||||
|
||||
def test_import_batches_cols(db_conn):
|
||||
cols = _table_cols(db_conn, "import_batches")
|
||||
for c in ("id", "account_id", "filename", "status", "total", "ok",
|
||||
"needs_mapping", "needs_data", "needs_review", "already_sent",
|
||||
"duplicate_in_file", "created_at", "purge_after"):
|
||||
assert c in cols, f"coloana lipsa: {c}"
|
||||
|
||||
|
||||
def test_import_attestations_cols(db_conn):
|
||||
cols = _table_cols(db_conn, "import_attestations")
|
||||
for c in ("id", "batch_id", "account_id", "confirmed_by", "ts", "rows_hash", "n_confirmed"):
|
||||
assert c in cols, f"coloana lipsa: {c}"
|
||||
|
||||
|
||||
# --- Idempotenta init_db ---
|
||||
|
||||
def test_init_db_idempotent(monkeypatch):
|
||||
"""init_db() ruleaza de doua ori fara eroare."""
|
||||
tmp = tempfile.mkdtemp()
|
||||
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "idem.db"))
|
||||
from app.config import get_settings
|
||||
get_settings.cache_clear()
|
||||
from app.db import init_db
|
||||
init_db()
|
||||
init_db() # a doua oara trebuie sa fie silentioasa
|
||||
get_settings.cache_clear()
|
||||
|
||||
|
||||
def test_migrate_on_existing_db(monkeypatch):
|
||||
"""Migrarea functioneaza pe o DB veche (fara coloane noi)."""
|
||||
import sqlite3
|
||||
tmp = tempfile.mkdtemp()
|
||||
db_path = os.path.join(tmp, "old.db")
|
||||
# Creeaza schema minima fara coloanele noi
|
||||
old_conn = sqlite3.connect(db_path)
|
||||
old_conn.executescript("""
|
||||
PRAGMA journal_mode = WAL;
|
||||
CREATE TABLE IF NOT EXISTS accounts (id INTEGER PRIMARY KEY, name TEXT NOT NULL, cui TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')));
|
||||
INSERT OR IGNORE INTO accounts (id, name) VALUES (1, 'default');
|
||||
CREATE TABLE IF NOT EXISTS submissions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
idempotency_key TEXT NOT NULL UNIQUE,
|
||||
account_id INTEGER,
|
||||
status TEXT NOT NULL DEFAULT 'queued',
|
||||
payload_json TEXT NOT NULL,
|
||||
rar_status_code INTEGER,
|
||||
rar_error TEXT,
|
||||
id_prezentare INTEGER,
|
||||
retry_count INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS worker_heartbeat (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
last_beat TEXT, last_rar_login_ok TEXT, detail TEXT
|
||||
);
|
||||
INSERT OR IGNORE INTO worker_heartbeat (id, detail) VALUES (1, 'never started');
|
||||
CREATE TABLE IF NOT EXISTS api_keys (id INTEGER PRIMARY KEY, account_id INTEGER, key_hash TEXT UNIQUE, active INTEGER DEFAULT 1, created_at TEXT);
|
||||
CREATE TABLE IF NOT EXISTS operations_mapping (id INTEGER PRIMARY KEY, account_id INTEGER, cod_op_service TEXT, cod_prestatie TEXT, auto_send INTEGER DEFAULT 1, created_at TEXT, UNIQUE(account_id, cod_op_service));
|
||||
CREATE TABLE IF NOT EXISTS nomenclator_rar (cod_prestatie TEXT PRIMARY KEY, nume_prestatie TEXT, updated_at TEXT);
|
||||
""")
|
||||
old_conn.close()
|
||||
|
||||
monkeypatch.setenv("AUTOPASS_DB_PATH", db_path)
|
||||
from app.config import get_settings
|
||||
get_settings.cache_clear()
|
||||
from app.db import init_db, get_connection
|
||||
init_db()
|
||||
|
||||
conn = get_connection()
|
||||
sub_cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()}
|
||||
acc_cols = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()}
|
||||
assert "batch_id" in sub_cols
|
||||
assert "row_index" in sub_cols
|
||||
assert "purge_after" in sub_cols
|
||||
assert "rar_creds_enc" in acc_cols
|
||||
conn.close()
|
||||
get_settings.cache_clear()
|
||||
|
||||
|
||||
# --- openpyxl disponibil ---
|
||||
|
||||
def test_openpyxl_importabil():
|
||||
import openpyxl
|
||||
assert openpyxl.__version__.startswith("3.1")
|
||||
|
||||
|
||||
def test_openpyxl_create_workbook():
|
||||
from openpyxl import Workbook
|
||||
wb = Workbook()
|
||||
ws = wb.active
|
||||
ws["A1"] = "test"
|
||||
assert ws["A1"].value == "test"
|
||||
Reference in New Issue
Block a user