feat(import): T16 job purjare + purge_after SET la sent (OV-5)
- mark(sent): seteaza purge_after = now + 90 zile (GDPR/L.142) - purge_expired(conn): sterge submissions sent expirate + import_batches expirate (import_rows via ON DELETE CASCADE). NULL purge_after = nu expira. - run(): tick de purjare odata pe ora (guard _last_purge_time + _PURGE_INTERVAL_S) NU mai agresiv, nu blocheaza trimiterea - 8 teste: purge_after la sent, alte stari fara purge, expirati vs neexpirat, queued neatins, cascade import_rows, null purge_after pastrat Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -75,6 +75,15 @@ def _is_transient(exc: Exception) -> bool:
|
|||||||
# --- Operatii pe submissions ---
|
# --- Operatii pe submissions ---
|
||||||
|
|
||||||
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
|
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
|
||||||
|
if status == "sent":
|
||||||
|
# T16: purge_after = sent + 90 zile (GDPR/L.142 retentie maxima).
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
|
||||||
|
"sending_since=NULL, updated_at=datetime('now'), "
|
||||||
|
"purge_after=datetime('now', '+90 days') WHERE id=?",
|
||||||
|
(status, rar_status_code, rar_error, id_prezentare, submission_id),
|
||||||
|
)
|
||||||
|
else:
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
|
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
|
||||||
"sending_since=NULL, updated_at=datetime('now') WHERE id=?",
|
"sending_since=NULL, updated_at=datetime('now') WHERE id=?",
|
||||||
@@ -82,6 +91,29 @@ def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_err
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# T16: purge interval in secunde (odata pe ora, nu prea agresiv)
|
||||||
|
_PURGE_INTERVAL_S = 3600
|
||||||
|
|
||||||
|
|
||||||
|
def purge_expired(conn) -> dict[str, int]:
|
||||||
|
"""Sterge randurile expirate (purge_after < now).
|
||||||
|
|
||||||
|
T16/OV-5: purge_after era exportat dar setat de nimeni si niciun job nu exista.
|
||||||
|
Acum: submissions sent + expirate, import_batches expirate (import_rows via CASCADE).
|
||||||
|
Intoarce {submissions_purged, batches_purged}.
|
||||||
|
"""
|
||||||
|
cur_sub = conn.execute(
|
||||||
|
"DELETE FROM submissions WHERE purge_after IS NOT NULL AND purge_after < datetime('now') AND status='sent'"
|
||||||
|
)
|
||||||
|
cur_batch = conn.execute(
|
||||||
|
"DELETE FROM import_batches WHERE purge_after IS NOT NULL AND purge_after < datetime('now')"
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"submissions_purged": cur_sub.rowcount,
|
||||||
|
"batches_purged": cur_batch.rowcount,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None:
|
def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None:
|
||||||
"""Re-pune randul in coada cu retry++ si next_attempt_at = now + backoff."""
|
"""Re-pune randul in coada cu retry++ si next_attempt_at = now + backoff."""
|
||||||
row = conn.execute("SELECT retry_count FROM submissions WHERE id=?", (submission_id,)).fetchone()
|
row = conn.execute("SELECT retry_count FROM submissions WHERE id=?", (submission_id,)).fetchone()
|
||||||
@@ -330,11 +362,24 @@ def run() -> int:
|
|||||||
print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
|
print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
|
||||||
|
|
||||||
sessions = AccountSessions(settings)
|
sessions = AccountSessions(settings)
|
||||||
|
_last_purge_time: float = 0.0
|
||||||
|
|
||||||
while _running:
|
while _running:
|
||||||
try:
|
try:
|
||||||
write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})")
|
write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})")
|
||||||
|
|
||||||
|
# T16: purjare periodica (odata pe ora) — NU mai frecvent.
|
||||||
|
now_ts = time.time()
|
||||||
|
if now_ts - _last_purge_time >= _PURGE_INTERVAL_S:
|
||||||
|
stats = purge_expired(conn)
|
||||||
|
if stats["submissions_purged"] or stats["batches_purged"]:
|
||||||
|
print(
|
||||||
|
f"[worker] purjare: {stats['submissions_purged']} submissions, "
|
||||||
|
f"{stats['batches_purged']} batches sterse",
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
_last_purge_time = now_ts
|
||||||
|
|
||||||
if not settings.worker_send_enabled:
|
if not settings.worker_send_enabled:
|
||||||
time.sleep(settings.worker_poll_interval_s)
|
time.sleep(settings.worker_poll_interval_s)
|
||||||
continue
|
continue
|
||||||
|
|||||||
197
tests/test_t16_purjare.py
Normal file
197
tests/test_t16_purjare.py
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
"""Teste T16: job purjare + purge_after SET la insert (OV-5).
|
||||||
|
|
||||||
|
Verify:
|
||||||
|
(a) insert/sent -> purge_after populat (sent+90z).
|
||||||
|
(b) rand expirat -> sters de tick.
|
||||||
|
(c) import_rows purjate cu batch-ul (CASCADE).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def env(monkeypatch):
|
||||||
|
tmp = tempfile.mkdtemp()
|
||||||
|
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t16.db"))
|
||||||
|
from app.config import get_settings
|
||||||
|
get_settings.cache_clear()
|
||||||
|
from app.db import init_db
|
||||||
|
init_db()
|
||||||
|
yield monkeypatch
|
||||||
|
get_settings.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def conn(env):
|
||||||
|
from app.db import get_connection
|
||||||
|
c = get_connection()
|
||||||
|
yield c
|
||||||
|
c.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _insert_submission(conn, account_id=1, status="queued", key_sfx=None):
|
||||||
|
content = {"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1",
|
||||||
|
"data_prestatie": "2026-06-15", "odometru_final": "1",
|
||||||
|
"prestatii": [{"cod_prestatie": "OE-1"}]}
|
||||||
|
sfx = key_sfx or os.urandom(4).hex()
|
||||||
|
cur = conn.execute(
|
||||||
|
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json) VALUES (?, ?, ?, ?)",
|
||||||
|
(f"k-{sfx}", account_id, status, json.dumps(content)),
|
||||||
|
)
|
||||||
|
return int(cur.lastrowid)
|
||||||
|
|
||||||
|
|
||||||
|
# --- (a) purge_after populat la 'sent' ---
|
||||||
|
|
||||||
|
def test_mark_sent_seteaza_purge_after(conn):
|
||||||
|
"""(a) mark(sent) seteaza purge_after = now + 90 zile."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
sid = _insert_submission(conn)
|
||||||
|
w.mark(conn, sid, "sent", rar_status_code=200, id_prezentare=12345)
|
||||||
|
|
||||||
|
row = conn.execute("SELECT status, purge_after FROM submissions WHERE id=?", (sid,)).fetchone()
|
||||||
|
assert row["status"] == "sent"
|
||||||
|
assert row["purge_after"] is not None, "purge_after trebuie setat la 'sent'"
|
||||||
|
# purge_after trebuie sa fie in viitor (>= now)
|
||||||
|
is_future = conn.execute(
|
||||||
|
"SELECT purge_after > datetime('now') AS ok FROM submissions WHERE id=?", (sid,)
|
||||||
|
).fetchone()["ok"]
|
||||||
|
assert is_future, "purge_after trebuie sa fie in viitor"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mark_other_status_nu_seteaza_purge_after(conn):
|
||||||
|
"""Alte statusuri (error, needs_data) nu seteaza purge_after."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
sid = _insert_submission(conn)
|
||||||
|
w.mark(conn, sid, "error", rar_error="test")
|
||||||
|
|
||||||
|
row = conn.execute("SELECT purge_after FROM submissions WHERE id=?", (sid,)).fetchone()
|
||||||
|
assert row["purge_after"] is None, "purge_after nu trebuie setat la 'error'"
|
||||||
|
|
||||||
|
|
||||||
|
# --- (b) purge_expired sterge randurile expirate ---
|
||||||
|
|
||||||
|
def test_purge_expired_sterge_sent_expirat(conn):
|
||||||
|
"""(b) purge_expired sterge submissions cu purge_after < now si status=sent."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
sid = _insert_submission(conn)
|
||||||
|
# Simuleaza un rand deja expirat (purge_after in trecut)
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE submissions SET status='sent', purge_after=datetime('now', '-1 day') WHERE id=?",
|
||||||
|
(sid,),
|
||||||
|
)
|
||||||
|
|
||||||
|
stats = w.purge_expired(conn)
|
||||||
|
assert stats["submissions_purged"] == 1
|
||||||
|
|
||||||
|
row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone()
|
||||||
|
assert row is None, "randul expirat trebuie sters"
|
||||||
|
|
||||||
|
|
||||||
|
def test_purge_expired_pastreaza_neexpirat(conn):
|
||||||
|
"""Randul cu purge_after in viitor nu e sters."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
sid = _insert_submission(conn)
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE submissions SET status='sent', purge_after=datetime('now', '+90 days') WHERE id=?",
|
||||||
|
(sid,),
|
||||||
|
)
|
||||||
|
|
||||||
|
stats = w.purge_expired(conn)
|
||||||
|
assert stats["submissions_purged"] == 0
|
||||||
|
|
||||||
|
row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone()
|
||||||
|
assert row is not None, "randul ne-expirat trebuie pastrat"
|
||||||
|
|
||||||
|
|
||||||
|
def test_purge_expired_nu_sterge_queued(conn):
|
||||||
|
"""submissions in stare queued cu purge_after NULL nu sunt sterse."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
sid = _insert_submission(conn, status="queued")
|
||||||
|
|
||||||
|
stats = w.purge_expired(conn)
|
||||||
|
assert stats["submissions_purged"] == 0
|
||||||
|
|
||||||
|
row = conn.execute("SELECT id FROM submissions WHERE id=?", (sid,)).fetchone()
|
||||||
|
assert row is not None
|
||||||
|
|
||||||
|
|
||||||
|
# --- (c) import_rows purjate cu batch-ul (CASCADE) ---
|
||||||
|
|
||||||
|
def test_import_rows_cascade_cu_batch(conn):
|
||||||
|
"""(c) Stergerea import_batches sterge import_rows via CASCADE."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
|
||||||
|
# Creeaza batch cu purge_after in trecut
|
||||||
|
cur_batch = conn.execute(
|
||||||
|
"INSERT INTO import_batches (account_id, filename, status, purge_after) "
|
||||||
|
"VALUES (1, 'test.xlsx', 'committed', datetime('now', '-1 day'))"
|
||||||
|
)
|
||||||
|
batch_id = cur_batch.lastrowid
|
||||||
|
|
||||||
|
# Adauga import_rows
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) "
|
||||||
|
"VALUES (?, 0, 'test', 'ok')",
|
||||||
|
(batch_id,),
|
||||||
|
)
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status) "
|
||||||
|
"VALUES (?, 1, 'test2', 'ok')",
|
||||||
|
(batch_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verifica ca avem randuri
|
||||||
|
n_rows = conn.execute("SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)).fetchone()["n"]
|
||||||
|
assert n_rows == 2
|
||||||
|
|
||||||
|
stats = w.purge_expired(conn)
|
||||||
|
assert stats["batches_purged"] == 1
|
||||||
|
|
||||||
|
# Batch sters
|
||||||
|
batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone()
|
||||||
|
assert batch is None
|
||||||
|
|
||||||
|
# import_rows sterse via CASCADE
|
||||||
|
n_rows_after = conn.execute("SELECT COUNT(*) AS n FROM import_rows WHERE batch_id=?", (batch_id,)).fetchone()["n"]
|
||||||
|
assert n_rows_after == 0, "import_rows trebuie sterse odata cu batch-ul (CASCADE)"
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_batch_neexpirat_pastrat(conn):
|
||||||
|
"""import_batch cu purge_after in viitor nu e sters."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
|
||||||
|
cur = conn.execute(
|
||||||
|
"INSERT INTO import_batches (account_id, filename, status, purge_after) "
|
||||||
|
"VALUES (1, 'future.xlsx', 'committed', datetime('now', '+90 days'))"
|
||||||
|
)
|
||||||
|
batch_id = cur.lastrowid
|
||||||
|
|
||||||
|
stats = w.purge_expired(conn)
|
||||||
|
assert stats["batches_purged"] == 0
|
||||||
|
|
||||||
|
batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone()
|
||||||
|
assert batch is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_batch_fara_purge_after_pastrat(conn):
|
||||||
|
"""import_batch fara purge_after nu e sters (NULL = nu expira)."""
|
||||||
|
import app.worker.__main__ as w
|
||||||
|
|
||||||
|
cur = conn.execute(
|
||||||
|
"INSERT INTO import_batches (account_id, filename, status) "
|
||||||
|
"VALUES (1, 'no_purge.xlsx', 'staging')"
|
||||||
|
)
|
||||||
|
batch_id = cur.lastrowid
|
||||||
|
|
||||||
|
stats = w.purge_expired(conn)
|
||||||
|
assert stats["batches_purged"] == 0
|
||||||
|
|
||||||
|
batch = conn.execute("SELECT id FROM import_batches WHERE id=?", (batch_id,)).fetchone()
|
||||||
|
assert batch is not None
|
||||||
Reference in New Issue
Block a user