"""Teste T16: job purjare + purge_after SET la insert (OV-5). Verify: (a) insert/sent -> purge_after populat (sent+90z). (b) rand expirat -> sters de tick. (c) import_rows purjate cu batch-ul (CASCADE). """ from __future__ import annotations import json import os import tempfile import pytest @pytest.fixture() def env(monkeypatch): tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t16.db")) from app.config import get_settings get_settings.cache_clear() from app.db import init_db init_db() yield monkeypatch get_settings.cache_clear() @pytest.fixture() def conn(env): from app.db import get_connection c = get_connection() yield c c.close() def _insert_submission(conn, account_id=1, status="queued", key_sfx=None): content = {"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1", "data_prestatie": "2026-06-15", "odometru_final": "1", "prestatii": [{"cod_prestatie": "OE-1"}]} sfx = key_sfx or os.urandom(4).hex() cur = conn.execute( "INSERT INTO submissions (idempotency_key, account_id, status, payload_json) VALUES (?, ?, ?, ?)", (f"k-{sfx}", account_id, status, json.dumps(content)), ) return int(cur.lastrowid) # --- (a) purge_after populat la 'sent' --- def test_mark_sent_seteaza_purge_after(conn): """(a) mark(sent) seteaza purge_after = now + 90 zile.""" import app.worker.__main__ as w sid = _insert_submission(conn) w.mark(conn, sid, "sent", rar_status_code=200, id_prezentare=12345) row = conn.execute("SELECT status, purge_after FROM submissions WHERE id=?", (sid,)).fetchone() assert row["status"] == "sent" assert row["purge_after"] is not None, "purge_after trebuie setat la 'sent'" # purge_after trebuie sa fie in viitor (>= now) is_future = conn.execute( "SELECT purge_after > datetime('now') AS ok FROM submissions WHERE id=?", (sid,) ).fetchone()["ok"] assert is_future, "purge_after trebuie sa fie in viitor" def test_mark_queued_nu_seteaza_purge_after(conn): """Starile active (queued) nu primesc purge_after. NOTA US-013: error/needs_data/needs_mapping primesc ACUM purge_after (retentie blocate, 30z) — vezi tests/test_purge_blocate.py. Doar queued/sending raman fara. """ import app.worker.__main__ as w sid = _insert_submission(conn) w.mark(conn, sid, "queued") row = conn.execute("SELECT purge_after FROM submissions WHERE id=?", (sid,)).fetchone() assert row["purge_after"] is None, "purge_after nu trebuie setat la 'queued'" # --- (b) purge_expired sterge randurile expirate --- def test_purge_expired_sterge_sent_expirat(conn): """(b) purge_expired sterge submissions cu purge_after < now si status=sent.""" import app.worker.__main__ as w sid = _insert_submission(conn) # Simuleaza un rand deja expirat (purge_after in trecut) conn.execute( "UPDATE submissions SET status='sent', purge_after=datetime('now', '-1 day') WHERE id=?", (sid,), ) stats = w.purge_expired(conn) assert stats["submissions_purged"] == 1 row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() assert row is None, "randul expirat trebuie sters" def test_purge_expired_pastreaza_neexpirat(conn): """Randul cu purge_after in viitor nu e sters.""" import app.worker.__main__ as w sid = _insert_submission(conn) conn.execute( "UPDATE submissions SET status='sent', purge_after=datetime('now', '+90 days') WHERE id=?", (sid,), ) stats = w.purge_expired(conn) assert stats["submissions_purged"] == 0 row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() assert row is not None, "randul ne-expirat trebuie pastrat" def test_purge_expired_nu_sterge_queued(conn): """submissions in stare queued cu purge_after NULL nu sunt sterse.""" import app.worker.__main__ as w sid = _insert_submission(conn, status="queued") stats = w.purge_expired(conn) assert stats["submissions_purged"] == 0 row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() assert row is not None # --- (c) import_rows purjate cu batch-ul (CASCADE) --- def test_import_rows_cascade_cu_batch(conn): """(c) Stergerea import_batches sterge import_rows via CASCADE.""" import app.worker.__main__ as w # Creeaza batch cu purge_after in trecut cur_batch = conn.execute( "INSERT INTO import_batches (account_id, filename, status, purge_after) " "VALUES (1, 'test.xlsx', 'committed', datetime('now', '-1 day'))" ) batch_id = cur_batch.lastrowid # Adauga import_rows conn.execute( "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) " "VALUES (?, 0, 'test', 'ok')", (batch_id,), ) conn.execute( "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) " "VALUES (?, 1, 'test2', 'ok')", (batch_id,), ) # Verifica ca avem randuri n_rows = conn.execute("SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)).fetchone()["n"] assert n_rows == 2 stats = w.purge_expired(conn) assert stats["batches_purged"] == 1 # Batch sters batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone() assert batch is None # import_rows sterse via CASCADE n_rows_after = conn.execute("SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)).fetchone()["n"] assert n_rows_after == 0, "import_rows trebuie sterse odata cu batch-ul (CASCADE)" def test_import_batch_neexpirat_pastrat(conn): """import_batch cu purge_after in viitor nu e sters.""" import app.worker.__main__ as w cur = conn.execute( "INSERT INTO import_batches (account_id, filename, status, purge_after) " "VALUES (1, 'future.xlsx', 'committed', datetime('now', '+90 days'))" ) batch_id = cur.lastrowid stats = w.purge_expired(conn) assert stats["batches_purged"] == 0 batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone() assert batch is not None def test_import_batch_fara_purge_after_pastrat(conn): """import_batch fara purge_after nu e sters (NULL = nu expira).""" import app.worker.__main__ as w cur = conn.execute( "INSERT INTO import_batches (account_id, filename, status) " "VALUES (1, 'no_purge.xlsx', 'staging')" ) batch_id = cur.lastrowid stats = w.purge_expired(conn) assert stats["batches_purged"] == 0 batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone() assert batch is not None