From ef52dc2823c7870aabddff194133b102d084595c Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Tue, 16 Jun 2026 20:24:59 +0000 Subject: [PATCH] feat(import): T16 job purjare + purge_after SET la sent (OV-5) - mark(sent): seteaza purge_after = now + 90 zile (GDPR/L.142) - purge_expired(conn): sterge submissions sent expirate + import_batches expirate (import_rows via ON DELETE CASCADE). NULL purge_after = nu expira. - run(): tick de purjare odata pe ora (guard _last_purge_time + _PURGE_INTERVAL_S) NU mai agresiv, nu blocheaza trimiterea - 8 teste: purge_after la sent, alte stari fara purge, expirati vs neexpirat, queued neatins, cascade import_rows, null purge_after pastrat Co-Authored-By: Claude Sonnet 4.6 --- app/worker/__main__.py | 53 +++++++++- tests/test_t16_purjare.py | 197 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 tests/test_t16_purjare.py diff --git a/app/worker/__main__.py b/app/worker/__main__.py index 97124b9..a6a5d1e 100644 --- a/app/worker/__main__.py +++ b/app/worker/__main__.py @@ -75,11 +75,43 @@ def _is_transient(exc: Exception) -> bool: # --- Operatii pe submissions --- def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None: - conn.execute( - "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, " - "sending_since=NULL, updated_at=datetime('now') WHERE id=?", - (status, rar_status_code, rar_error, id_prezentare, submission_id), + if status == "sent": + # T16: purge_after = sent + 90 zile (GDPR/L.142 retentie maxima). + conn.execute( + "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, " + "sending_since=NULL, updated_at=datetime('now'), " + "purge_after=datetime('now', '+90 days') WHERE id=?", + (status, rar_status_code, rar_error, id_prezentare, submission_id), + ) + else: + conn.execute( + "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, " + "sending_since=NULL, updated_at=datetime('now') WHERE id=?", + (status, rar_status_code, rar_error, id_prezentare, submission_id), + ) + + +# T16: purge interval in secunde (odata pe ora, nu prea agresiv) +_PURGE_INTERVAL_S = 3600 + + +def purge_expired(conn) -> dict[str, int]: + """Sterge randurile expirate (purge_after < now). + + T16/OV-5: purge_after era exportat dar setat de nimeni si niciun job nu exista. + Acum: submissions sent + expirate, import_batches expirate (import_rows via CASCADE). + Intoarce {submissions_purged, batches_purged}. + """ + cur_sub = conn.execute( + "DELETE FROM submissions WHERE purge_after IS NOT NULL AND purge_after < datetime('now') AND status='sent'" ) + cur_batch = conn.execute( + "DELETE FROM import_batches WHERE purge_after IS NOT NULL AND purge_after < datetime('now')" + ) + return { + "submissions_purged": cur_sub.rowcount, + "batches_purged": cur_batch.rowcount, + } def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None: @@ -330,11 +362,24 @@ def run() -> int: print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True) sessions = AccountSessions(settings) + _last_purge_time: float = 0.0 while _running: try: write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})") + # T16: purjare periodica (odata pe ora) — NU mai frecvent. + now_ts = time.time() + if now_ts - _last_purge_time >= _PURGE_INTERVAL_S: + stats = purge_expired(conn) + if stats["submissions_purged"] or stats["batches_purged"]: + print( + f"[worker] purjare: {stats['submissions_purged']} submissions, " + f"{stats['batches_purged']} batches sterse", + flush=True, + ) + _last_purge_time = now_ts + if not settings.worker_send_enabled: time.sleep(settings.worker_poll_interval_s) continue diff --git a/tests/test_t16_purjare.py b/tests/test_t16_purjare.py new file mode 100644 index 0000000..a08d13c --- /dev/null +++ b/tests/test_t16_purjare.py @@ -0,0 +1,197 @@ +"""Teste T16: job purjare + purge_after SET la insert (OV-5). + +Verify: +(a) insert/sent -> purge_after populat (sent+90z). +(b) rand expirat -> sters de tick. +(c) import_rows purjate cu batch-ul (CASCADE). +""" + +from __future__ import annotations + +import json +import os +import tempfile + +import pytest + + +@pytest.fixture() +def env(monkeypatch): + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t16.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.db import init_db + init_db() + yield monkeypatch + get_settings.cache_clear() + + +@pytest.fixture() +def conn(env): + from app.db import get_connection + c = get_connection() + yield c + c.close() + + +def _insert_submission(conn, account_id=1, status="queued", key_sfx=None): + content = {"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1", + "data_prestatie": "2026-06-15", "odometru_final": "1", + "prestatii": [{"cod_prestatie": "OE-1"}]} + sfx = key_sfx or os.urandom(4).hex() + cur = conn.execute( + "INSERT INTO submissions (idempotency_key, account_id, status, payload_json) VALUES (?, ?, ?, ?)", + (f"k-{sfx}", account_id, status, json.dumps(content)), + ) + return int(cur.lastrowid) + + +# --- (a) purge_after populat la 'sent' --- + +def test_mark_sent_seteaza_purge_after(conn): + """(a) mark(sent) seteaza purge_after = now + 90 zile.""" + import app.worker.__main__ as w + sid = _insert_submission(conn) + w.mark(conn, sid, "sent", rar_status_code=200, id_prezentare=12345) + + row = conn.execute("SELECT status, purge_after FROM submissions WHERE id=?", (sid,)).fetchone() + assert row["status"] == "sent" + assert row["purge_after"] is not None, "purge_after trebuie setat la 'sent'" + # purge_after trebuie sa fie in viitor (>= now) + is_future = conn.execute( + "SELECT purge_after > datetime('now') AS ok FROM submissions WHERE id=?", (sid,) + ).fetchone()["ok"] + assert is_future, "purge_after trebuie sa fie in viitor" + + +def test_mark_other_status_nu_seteaza_purge_after(conn): + """Alte statusuri (error, needs_data) nu seteaza purge_after.""" + import app.worker.__main__ as w + sid = _insert_submission(conn) + w.mark(conn, sid, "error", rar_error="test") + + row = conn.execute("SELECT purge_after FROM submissions WHERE id=?", (sid,)).fetchone() + assert row["purge_after"] is None, "purge_after nu trebuie setat la 'error'" + + +# --- (b) purge_expired sterge randurile expirate --- + +def test_purge_expired_sterge_sent_expirat(conn): + """(b) purge_expired sterge submissions cu purge_after < now si status=sent.""" + import app.worker.__main__ as w + sid = _insert_submission(conn) + # Simuleaza un rand deja expirat (purge_after in trecut) + conn.execute( + "UPDATE submissions SET status='sent', purge_after=datetime('now', '-1 day') WHERE id=?", + (sid,), + ) + + stats = w.purge_expired(conn) + assert stats["submissions_purged"] == 1 + + row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() + assert row is None, "randul expirat trebuie sters" + + +def test_purge_expired_pastreaza_neexpirat(conn): + """Randul cu purge_after in viitor nu e sters.""" + import app.worker.__main__ as w + sid = _insert_submission(conn) + conn.execute( + "UPDATE submissions SET status='sent', purge_after=datetime('now', '+90 days') WHERE id=?", + (sid,), + ) + + stats = w.purge_expired(conn) + assert stats["submissions_purged"] == 0 + + row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() + assert row is not None, "randul ne-expirat trebuie pastrat" + + +def test_purge_expired_nu_sterge_queued(conn): + """submissions in stare queued cu purge_after NULL nu sunt sterse.""" + import app.worker.__main__ as w + sid = _insert_submission(conn, status="queued") + + stats = w.purge_expired(conn) + assert stats["submissions_purged"] == 0 + + row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone() + assert row is not None + + +# --- (c) import_rows purjate cu batch-ul (CASCADE) --- + +def test_import_rows_cascade_cu_batch(conn): + """(c) Stergerea import_batches sterge import_rows via CASCADE.""" + import app.worker.__main__ as w + + # Creeaza batch cu purge_after in trecut + cur_batch = conn.execute( + "INSERT INTO import_batches (account_id, filename, status, purge_after) " + "VALUES (1, 'test.xlsx', 'committed', datetime('now', '-1 day'))" + ) + batch_id = cur_batch.lastrowid + + # Adauga import_rows + conn.execute( + "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) " + "VALUES (?, 0, 'test', 'ok')", + (batch_id,), + ) + conn.execute( + "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) " + "VALUES (?, 1, 'test2', 'ok')", + (batch_id,), + ) + + # Verifica ca avem randuri + n_rows = conn.execute("SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)).fetchone()["n"] + assert n_rows == 2 + + stats = w.purge_expired(conn) + assert stats["batches_purged"] == 1 + + # Batch sters + batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone() + assert batch is None + + # import_rows sterse via CASCADE + n_rows_after = conn.execute("SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)).fetchone()["n"] + assert n_rows_after == 0, "import_rows trebuie sterse odata cu batch-ul (CASCADE)" + + +def test_import_batch_neexpirat_pastrat(conn): + """import_batch cu purge_after in viitor nu e sters.""" + import app.worker.__main__ as w + + cur = conn.execute( + "INSERT INTO import_batches (account_id, filename, status, purge_after) " + "VALUES (1, 'future.xlsx', 'committed', datetime('now', '+90 days'))" + ) + batch_id = cur.lastrowid + + stats = w.purge_expired(conn) + assert stats["batches_purged"] == 0 + + batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone() + assert batch is not None + + +def test_import_batch_fara_purge_after_pastrat(conn): + """import_batch fara purge_after nu e sters (NULL = nu expira).""" + import app.worker.__main__ as w + + cur = conn.execute( + "INSERT INTO import_batches (account_id, filename, status) " + "VALUES (1, 'no_purge.xlsx', 'staging')" + ) + batch_id = cur.lastrowid + + stats = w.purge_expired(conn) + assert stats["batches_purged"] == 0 + + batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone() + assert batch is not None