Files
rar-autopass/tests/test_t16_purjare.py
Claude Agent c842e3352a feat(5.6): observabilitate + jurnal aplicatie + lifecycle trimiteri blocate
Implementeaza PRD 5.6 complet (14 stories, TDD). Doua axe:

Lifecycle trimiteri blocate (Val A):
- submissions_admin.py: sterge/repune scoped (404 cross-account inaintea lui 409 stare)
- reactivare dedup peste `error` cu CAS (WHERE id=? AND status='error'), creds noi in
  submissions + accounts.rar_creds_enc; worker invalideaza sesiunea RAR la creds proaspete
  (JWT 30h vechi nu mai trimite cu parola gresita); camp aditiv `reactivated:true`
- retentie randuri blocate 30z; purge_expired exclude queued/sending; purge_after curatat
  la reactivare/requeue
- API DELETE /v1/prezentari/{id} + /repune (200+JSON); UI butoane + bulk + banner actionabil

Observabilitate:
- app/observ.py log_event: dublu canal app_events (DB) + RotatingFileHandler per-proces,
  redactare creds/PII la scriere (redact_pii/vin_partial)
- request_id middleware + X-Request-ID pe toate raspunsurile
- handler global excepții -> 500 envelope 6-chei + request_id (traceback doar in jurnal)
- audit cerere API (api_prezentari/api_auth_esuat) + audit worker (rar_login/tranzitii)
- tab "Jurnal" filtrabil scoped (non-admin doar contul sau); retentie jurnal 90z
- rar_error expus in GET /v1/prezentari/{id} (recovery observabil)

pytest -q: 741 passed, 0 failed. Docs: PRD raport VERIFY, contract endpointuri noi, ROADMAP.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 18:45:39 +00:00

202 lines
6.7 KiB
Python

"""Teste T16: job purjare + purge_after SET la insert (OV-5).
Verify:
(a) insert/sent -> purge_after populat (sent+90z).
(b) rand expirat -> sters de tick.
(c) import_rows purjate cu batch-ul (CASCADE).
"""
from __future__ import annotations
import json
import os
import tempfile
import pytest
@pytest.fixture()
def env(monkeypatch):
    """Point the application at a throwaway database for a single test.

    Setup creates a fresh temporary directory, sets ``AUTOPASS_DB_PATH`` to
    ``t16.db`` inside it, clears the ``get_settings`` cache so the override
    is re-read, and calls ``init_db()``.

    Yields:
        The ``monkeypatch`` fixture, so tests can apply further overrides.

    Teardown clears the settings cache again and removes the temporary
    directory. ``tempfile.mkdtemp`` never deletes what it creates, so without
    the explicit removal every test would leave a directory behind.
    """
    import shutil

    tmp = tempfile.mkdtemp()
    try:
        monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t16.db"))
        # Imports stay after setenv, exactly as before, in case the app
        # modules read the environment while being imported.
        from app.config import get_settings
        get_settings.cache_clear()
        from app.db import init_db
        init_db()
        yield monkeypatch
        get_settings.cache_clear()
    finally:
        # Best effort: a file that is still held open (e.g. on Windows)
        # must not turn cleanup into a test error.
        shutil.rmtree(tmp, ignore_errors=True)
@pytest.fixture()
def conn(env):
    """Yield a connection from ``get_connection()`` and close it afterwards.

    Requesting ``env`` guarantees the database environment is prepared
    before the connection is opened.
    """
    from app.db import get_connection

    connection = get_connection()
    yield connection
    connection.close()
def _insert_submission(conn, account_id=1, status="queued", key_sfx=None):
content = {"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1",
"data_prestatie": "2026-06-15", "odometru_final": "1",
"prestatii": [{"cod_prestatie": "OE-1"}]}
sfx = key_sfx or os.urandom(4).hex()
cur = conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) VALUES (?, ?, ?, ?)",
(f"k-{sfx}", account_id, status, json.dumps(content)),
)
return int(cur.lastrowid)
# --- (a) purge_after populat la 'sent' ---
def test_mark_sent_seteaza_purge_after(conn):
    """(a) Marking a submission ``sent`` stores a ``purge_after`` in the future.

    Only "is set" and "is later than now" are asserted; the exact retention
    offset is not checked here.
    """
    import app.worker.__main__ as worker

    submission_id = _insert_submission(conn)
    worker.mark(conn, submission_id, "sent", rar_status_code=200, id_prezentare=12345)

    stored = conn.execute(
        "SELECT status, purge_after FROM submissions WHERE id=?", (submission_id,)
    ).fetchone()
    assert stored["status"] == "sent"
    assert stored["purge_after"] is not None, "purge_after trebuie setat la 'sent'"

    # Compare inside SQL so the stored value is checked against the
    # database's own notion of "now".
    comparison = conn.execute(
        "SELECT purge_after > datetime('now') AS ok FROM submissions WHERE id=?", (submission_id,)
    ).fetchone()
    assert comparison["ok"], "purge_after trebuie sa fie in viitor"
def test_mark_queued_nu_seteaza_purge_after(conn):
    """Active states (``queued``) must not receive a ``purge_after``.

    NOTE US-013: error/needs_data/needs_mapping NOW do receive purge_after
    (blocked-row retention, 30 days) — see tests/test_purge_blocate.py.
    Only queued/sending remain without one.
    """
    import app.worker.__main__ as worker

    submission_id = _insert_submission(conn)
    worker.mark(conn, submission_id, "queued")

    stored = conn.execute(
        "SELECT purge_after FROM submissions WHERE id=?", (submission_id,)
    ).fetchone()
    assert stored["purge_after"] is None, "purge_after nu trebuie setat la 'queued'"
# --- (b) purge_expired sterge randurile expirate ---
def test_purge_expired_sterge_sent_expirat(conn):
    """(b) ``purge_expired`` deletes a ``sent`` submission whose ``purge_after`` has passed."""
    import app.worker.__main__ as worker

    submission_id = _insert_submission(conn)
    # Force the row into an already-expired state: sent, purge_after yesterday.
    conn.execute(
        "UPDATE submissions SET status='sent', purge_after=datetime('now', '-1 day') WHERE id=?",
        (submission_id,),
    )

    result = worker.purge_expired(conn)
    assert result["submissions_purged"] == 1

    remaining = conn.execute(
        "SELECT id FROM submissions WHERE id=?", (submission_id,)
    ).fetchone()
    assert remaining is None, "randul expirat trebuie sters"
def test_purge_expired_pastreaza_neexpirat(conn):
    """A row whose ``purge_after`` lies in the future survives ``purge_expired``."""
    import app.worker.__main__ as worker

    submission_id = _insert_submission(conn)
    # Sent, but not due for purging for another 90 days.
    conn.execute(
        "UPDATE submissions SET status='sent', purge_after=datetime('now', '+90 days') WHERE id=?",
        (submission_id,),
    )

    result = worker.purge_expired(conn)
    assert result["submissions_purged"] == 0

    remaining = conn.execute(
        "SELECT id FROM submissions WHERE id=?", (submission_id,)
    ).fetchone()
    assert remaining is not None, "randul ne-expirat trebuie pastrat"
def test_purge_expired_nu_sterge_queued(conn):
    """A ``queued`` submission left without a ``purge_after`` value is not purged."""
    import app.worker.__main__ as worker

    submission_id = _insert_submission(conn, status="queued")

    result = worker.purge_expired(conn)
    assert result["submissions_purged"] == 0

    remaining = conn.execute(
        "SELECT id FROM submissions WHERE id=?", (submission_id,)
    ).fetchone()
    assert remaining is not None
# --- (c) import_rows purjate cu batch-ul (CASCADE) ---
def test_import_rows_cascade_cu_batch(conn):
    """(c) Purging an expired import batch also removes its ``import_rows``.

    The rows are never deleted directly by this test; they are expected to
    disappear together with their batch (CASCADE).
    """
    import app.worker.__main__ as worker

    # Batch whose purge_after is already in the past.
    batch_id = conn.execute(
        "INSERT INTO import_batches (account_id, filename, status, purge_after) "
        "VALUES (1, 'test.xlsx', 'committed', datetime('now', '-1 day'))"
    ).lastrowid

    # Two child rows attached to that batch.
    row_inserts = (
        (
            "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) "
            "VALUES (?, 0, 'test', 'ok')"
        ),
        (
            "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) "
            "VALUES (?, 1, 'test2', 'ok')"
        ),
    )
    for statement in row_inserts:
        conn.execute(statement, (batch_id,))

    # Sanity check: both child rows exist before the purge.
    before = conn.execute(
        "SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)
    ).fetchone()
    assert before["n"] == 2

    result = worker.purge_expired(conn)
    assert result["batches_purged"] == 1

    # The batch itself is gone...
    leftover_batch = conn.execute(
        "SELECT id FROM import_batches WHERE id=?", (batch_id,)
    ).fetchone()
    assert leftover_batch is None

    # ...and so are its rows.
    after = conn.execute(
        "SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)
    ).fetchone()
    assert after["n"] == 0, "import_rows trebuie sterse odata cu batch-ul (CASCADE)"
def test_import_batch_neexpirat_pastrat(conn):
    """An import batch whose ``purge_after`` is in the future is kept."""
    import app.worker.__main__ as worker

    batch_id = conn.execute(
        "INSERT INTO import_batches (account_id, filename, status, purge_after) "
        "VALUES (1, 'future.xlsx', 'committed', datetime('now', '+90 days'))"
    ).lastrowid

    result = worker.purge_expired(conn)
    assert result["batches_purged"] == 0

    surviving = conn.execute(
        "SELECT id FROM import_batches WHERE id=?", (batch_id,)
    ).fetchone()
    assert surviving is not None
def test_import_batch_fara_purge_after_pastrat(conn):
    """An import batch inserted without ``purge_after`` is kept (NULL = never expires)."""
    import app.worker.__main__ as worker

    batch_id = conn.execute(
        "INSERT INTO import_batches (account_id, filename, status) "
        "VALUES (1, 'no_purge.xlsx', 'staging')"
    ).lastrowid

    result = worker.purge_expired(conn)
    assert result["batches_purged"] == 0

    surviving = conn.execute(
        "SELECT id FROM import_batches WHERE id=?", (batch_id,)
    ).fetchone()
    assert surviving is not None