feat(5.6): observabilitate + jurnal aplicatie + lifecycle trimiteri blocate

Implementeaza PRD 5.6 complet (14 stories, TDD). Doua axe:

Lifecycle trimiteri blocate (Val A):
- submissions_admin.py: sterge/repune scoped (404 cross-account inaintea lui 409 stare)
- reactivare dedup peste `error` cu CAS (WHERE id=? AND status='error'), creds noi in
  submissions + accounts.rar_creds_enc; worker invalideaza sesiunea RAR la creds proaspete
  (JWT 30h vechi nu mai trimite cu parola gresita); camp aditiv `reactivated:true`
- retentie randuri blocate 30z; purge_expired exclude queued/sending; purge_after curatat
  la reactivare/requeue
- API DELETE /v1/prezentari/{id} + /repune (200+JSON); UI butoane + bulk + banner actionabil

Observabilitate:
- app/observ.py log_event: dublu canal app_events (DB) + RotatingFileHandler per-proces,
  redactare creds/PII la scriere (redact_pii/vin_partial)
- request_id middleware + X-Request-ID pe toate raspunsurile
- handler global excepții -> 500 envelope 6-chei + request_id (traceback doar in jurnal)
- audit cerere API (api_prezentari/api_auth_esuat) + audit worker (rar_login/tranzitii)
- tab "Jurnal" filtrabil scoped (non-admin doar contul sau); retentie jurnal 90z
- rar_error expus in GET /v1/prezentari/{id} (recovery observabil)

pytest -q: 741 passed, 0 failed. Docs: PRD raport VERIFY, contract endpointuri noi, ROADMAP.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-06-23 18:45:39 +00:00
parent f48346de5c
commit c842e3352a
40 changed files with 2851 additions and 64 deletions

View File

@@ -38,6 +38,7 @@ from .. import errors
from ..config import Settings, get_settings, load_test_credentials
from ..crypto import decrypt_creds
from ..db import get_connection, init_db, write_heartbeat
from ..observ import log_event, set_source
from ..mapping import DEFAULT_ACCOUNT_ID, upsert_nomenclator
from ..payload import build_rar_payload
from ..reconcile import match_finalizata
@@ -59,6 +60,14 @@ def _iso(dt: datetime) -> str:
return dt.isoformat(timespec="seconds")
def _wlog(conn, tip: str, mesaj: str, *, nivel: str = "INFO", account_id=None, cod=None, context=None) -> None:
    """Record one worker event on stdout and in the structured journal (US-005).

    Part of the print -> structured-journal migration: the message is still
    echoed to stdout with a ``[worker]`` prefix (operators tail
    ``.run/worker.log``), and the same message is then handed to ``log_event``
    tagged with ``sursa="worker"``.

    Args:
        conn: DB connection forwarded unchanged to ``log_event``.
        tip: Event type identifier.
        mesaj: Human-readable message (printed and journaled verbatim).
        nivel: Severity label; defaults to ``"INFO"``.
        account_id: Optional account the event belongs to.
        cod: Optional error/event code.
        context: Optional structured payload forwarded to ``log_event``.
    """
    # Stdout first, flushed immediately, so the tailed log shows the line
    # before the journal write is attempted.
    stdout_line = f"[worker] {mesaj}"
    print(stdout_line, flush=True)

    log_event(
        tip,
        nivel=nivel,
        account_id=account_id,
        cod=cod,
        mesaj=mesaj,
        context=context,
        conn=conn,
        sursa="worker",
    )
def _backoff_seconds(settings: Settings, retry_count: int) -> int:
    """Return the retry delay for ``retry_count``: base doubled per retry, capped.

    The first retry (``retry_count`` of 1, or anything lower) waits
    ``settings.worker_retry_base_s``; each further retry doubles the wait,
    and the result never exceeds ``settings.worker_retry_max_s``.
    """
    # Clamp the exponent so a zero/negative count behaves like the first retry.
    doublings = max(0, retry_count - 1)
    uncapped = settings.worker_retry_base_s * (2 ** doublings)
    ceiling = settings.worker_retry_max_s
    return min(uncapped, ceiling)
@@ -75,13 +84,26 @@ def _is_transient(exc: Exception) -> bool:
# --- Operatii pe submissions ---
# Stari blocate ne-sent care primesc retentie proprie (US-013). Mai scurta decat
# cele 90z ale `sent`: un blocat n-are valoare de audit ca o trimitere reusita.
_BLOCKED_STATES = ("error", "needs_data", "needs_mapping")
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
if status == "sent":
# T16: purge_after = sent + 90 zile (GDPR/L.142 retentie maxima).
purge_expr = "datetime('now', '+90 days')"
elif status in _BLOCKED_STATES:
# US-013: randurile blocate primesc si ele purge_after (altfel raman permanent).
days = int(get_settings().blocked_retention_days)
purge_expr = f"datetime('now', '+{days} days')"
else:
purge_expr = None
if purge_expr is not None:
conn.execute(
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
"sending_since=NULL, updated_at=datetime('now'), "
"purge_after=datetime('now', '+90 days') WHERE id=?",
f"sending_since=NULL, updated_at=datetime('now'), purge_after={purge_expr} WHERE id=?",
(status, rar_status_code, rar_error, id_prezentare, submission_id),
)
else:
@@ -99,19 +121,26 @@ _PURGE_INTERVAL_S = 3600
def purge_expired(conn) -> dict[str, int]:
    """Delete rows whose ``purge_after`` timestamp lies in the past.

    T16/OV-5 + US-013/US-008. Three tables are swept:

    * ``submissions`` — expired rows in state ``sent`` or in one of the
      blocked states (``error`` / ``needs_data`` / ``needs_mapping``).
      ``queued`` and ``sending`` rows are active and are never purged, even
      if they still carry a leftover ``purge_after``.
    * ``import_batches`` — expired batches (per the original note, their
      ``import_rows`` are removed through ON DELETE CASCADE; the schema is
      not visible here).
    * ``app_events`` — expired journal entries.

    Rows with ``purge_after`` NULL are never touched.

    Args:
        conn: DB connection exposing ``execute``; committing is left to the
            caller / connection configuration.

    Returns:
        ``{"submissions_purged": n, "batches_purged": n, "events_purged": n}``
        with the number of rows deleted from each table.
    """
    # Explicit allow-list of states: anything not listed (queued/sending) is
    # excluded by construction rather than by a negative filter.
    cur_sub = conn.execute(
        "DELETE FROM submissions WHERE purge_after IS NOT NULL AND purge_after < datetime('now') "
        "AND status IN ('sent','error','needs_data','needs_mapping')"
    )
    cur_batch = conn.execute(
        "DELETE FROM import_batches WHERE purge_after IS NOT NULL AND purge_after < datetime('now')"
    )
    cur_events = conn.execute(
        "DELETE FROM app_events WHERE purge_after IS NOT NULL AND purge_after < datetime('now')"
    )
    return {
        "submissions_purged": cur_sub.rowcount,
        "batches_purged": cur_batch.rowcount,
        "events_purged": cur_events.rowcount,
    }
@@ -186,7 +215,9 @@ def reconcile_and_mark(conn, rar: RarClient, token: str, submission_id: int, con
if found_id is not None:
mark(conn, submission_id, "sent", rar_status_code=200, id_prezentare=found_id,
rar_error="reconciliat (raspuns pierdut)")
print(f"[worker] submission {submission_id} -> sent prin reconciliere (idPrezentare={found_id})", flush=True)
_wlog(conn, "submission_reconciliat",
f"submission {submission_id} -> sent prin reconciliere (idPrezentare={found_id})",
context={"submission_id": submission_id, "id_prezentare": found_id})
return True
return False
@@ -194,12 +225,14 @@ def reconcile_and_mark(conn, rar: RarClient, token: str, submission_id: int, con
def process_one(conn, settings: Settings, rar: RarClient, token: str, claimed: dict) -> str:
"""Trimite o prezentare claimed. Intoarce starea finala (pentru teste/loguri)."""
sid = claimed["id"]
account_id = claimed.get("account_id")
content = claimed["content"]
payload = build_rar_payload(content)
try:
data = rar.post_prezentare(token, payload)
mark(conn, sid, "sent", rar_status_code=200, id_prezentare=data.get("id"))
print(f"[worker] submission {sid} -> sent (idPrezentare={data.get('id')})", flush=True)
_wlog(conn, "submission_sent", f"submission {sid} -> sent (idPrezentare={data.get('id')})",
account_id=account_id, context={"submission_id": sid, "id_prezentare": data.get("id")})
return "sent"
except RarError as exc:
if exc.status_code == 400:
@@ -212,13 +245,17 @@ def process_one(conn, settings: Settings, rar: RarClient, token: str, claimed: d
enriched = [errors.eroare("RAR_VALIDARE", cauza=str(exc))]
detail = json.dumps(enriched, ensure_ascii=False)
mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail)
print(f"[worker] submission {sid} -> needs_data: {detail}", flush=True)
_wlog(conn, "submission_needs_data", f"submission {sid} -> needs_data (RAR 400)",
nivel="WARNING", account_id=account_id, cod="RAR_VALIDARE",
context={"submission_id": sid})
return "needs_data"
if _is_transient(exc):
return _handle_transient(conn, settings, rar, token, sid, content, str(exc))
# 4xx nerecuperabil (nu 400/401/408/429) -> error.
mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc))
print(f"[worker] submission {sid} -> error: {exc}", flush=True)
_wlog(conn, "submission_error", f"submission {sid} -> error: {exc}",
nivel="ERROR", account_id=account_id,
context={"submission_id": sid, "http": exc.status_code})
return "error"
except (httpx.TimeoutException, httpx.TransportError) as exc:
return _handle_transient(conn, settings, rar, token, sid, content, f"retea: {exc}")
@@ -242,18 +279,23 @@ def recover_orphans(conn, settings: Settings, rar: RarClient, token: str, accoun
`account_id` filtreaza la orfanii unui cont (login-ul e per-cont); None = toti
(compat teste / single-account).
"""
cutoff = _iso(_now() - timedelta(seconds=settings.worker_sending_lease_s))
# Cutoff calculat SQLite-side, in ACELASI format ca sending_since (scris cu
# datetime('now') in claim_one -> 'YYYY-MM-DD HH:MM:SS', cu spatiu). Daca am
# compara cu _iso() (format ISO cu 'T'), spatiul (0x20) < 'T' (0x54) ar face
# orice rand 'sending' sa para mereu <= cutoff -> lease-ul de 120s ignorat,
# iar fiecare rand proaspat revendicat ar fi tratat instant ca orfan.
lease = f"-{int(settings.worker_sending_lease_s)} seconds"
if account_id is not None:
orphans = conn.execute(
"SELECT id, payload_json FROM submissions WHERE status='sending' "
"AND (sending_since IS NULL OR sending_since <= ?) AND account_id=?",
(cutoff, account_id),
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?)) AND account_id=?",
(lease, account_id),
).fetchall()
else:
orphans = conn.execute(
"SELECT id, payload_json FROM submissions WHERE status='sending' "
"AND (sending_since IS NULL OR sending_since <= ?)",
(cutoff,),
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?))",
(lease,),
).fetchall()
recovered = 0
for row in orphans:
@@ -308,11 +350,23 @@ class AccountSessions:
rar = RarClient(self.settings)
try:
token = rar.login(creds["email"], creds["password"])
except RarAuthError as exc:
rar.close()
# US-005: login esuat (401) — FARA email/parola (doar codul HTTP + contul).
log_event("rar_login", nivel="WARNING", account_id=account_id,
cod="RAR_CREDS_INVALIDE",
mesaj=f"login RAR esuat (cont {account_id}): {exc.status_code or 401}",
context={"rezultat": "esuat", "http": exc.status_code or 401},
conn=conn, sursa="worker")
raise
except Exception:
rar.close()
raise
self._sessions[account_id] = (rar, token)
write_heartbeat(conn, rar_login_ok=True, detail=f"login RAR ok (cont {account_id})")
# US-005: login reusit (fara email/parola in clar — context curat).
log_event("rar_login", account_id=account_id, mesaj=f"login RAR ok (cont {account_id})",
context={"rezultat": "ok", "http": 200}, conn=conn, sursa="worker")
# Creds efemere pe submissions nu mai sunt necesare: JWT acopera retry-urile -> sterge.
# GATE PURJARE (T1/Voce#5): sterge DOAR submissions.rar_creds_enc, NU accounts.rar_creds_enc.
# Canal web: fallback exista in accounts -> purjarea e inofensiva (re-login dupa restart).
@@ -371,6 +425,7 @@ def run() -> int:
signal.signal(signal.SIGINT, _stop)
settings = get_settings()
set_source("worker") # US-005: evenimentele worker-ului au sursa=worker (fisier app-worker.log)
init_db()
conn = get_connection()
print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
@@ -386,10 +441,10 @@ def run() -> int:
now_ts = time.time()
if now_ts - _last_purge_time >= _PURGE_INTERVAL_S:
stats = purge_expired(conn)
if stats["submissions_purged"] or stats["batches_purged"]:
if stats["submissions_purged"] or stats["batches_purged"] or stats["events_purged"]:
print(
f"[worker] purjare: {stats['submissions_purged']} submissions, "
f"{stats['batches_purged']} batches sterse",
f"{stats['batches_purged']} batches, {stats['events_purged']} evenimente sterse",
flush=True,
)
_last_purge_time = now_ts
@@ -408,6 +463,12 @@ def run() -> int:
sid = claimed["id"]
account_id = claimed["account_id"]
# T1/US-012: randul poarta creds proaspete (rar_creds_enc != NULL) — fie prima
# trimitere a contului, fie o REACTIVARE dupa creds gresite. Invalidam sesiunea
# RAR cache-uita ca un JWT vechi (30h) din parola GRESITA sa nu trimita cu ea,
# ignorand corectia. Re-login imediat cu creds-urile noi.
if claimed.get("creds_enc"):
sessions.invalidate(account_id)
# T1/D4: incearca creds din submission (canal API efemer), cu fallback la
# accounts.rar_creds_enc (canal web durabil). Canal web n-are re-pusher.
creds = _creds_for(claimed, settings) or _creds_from_account(conn, account_id)
@@ -418,7 +479,10 @@ def run() -> int:
# Creds gresite (login 401): NU se face retry (plan, failure registry).
mark(conn, sid, "error", rar_status_code=401,
rar_error=json.dumps(errors.eroare("RAR_CREDS_INVALIDE", cauza="credentiale RAR invalide"), ensure_ascii=False))
print(f"[worker] submission {sid} (cont {account_id}) -> error: {exc}", flush=True)
# rar_login esuat e deja logat in get_token; aici doar tranzitia submission-ului.
_wlog(conn, "submission_error", f"submission {sid} (cont {account_id}) -> error: creds RAR invalide",
nivel="ERROR", account_id=account_id, cod="RAR_CREDS_INVALIDE",
context={"submission_id": sid, "http": 401})
continue
if token is None:
@@ -430,6 +494,14 @@ def run() -> int:
rar = sessions.rar(account_id)
# Recupereaza orfanii contului inainte de trimitere (acelasi token).
recover_orphans(conn, settings, rar, token, account_id=account_id)
# Guard: recover_orphans putea atinge chiar randul tocmai revendicat
# (reconciliat 'sent' sau requeue 'queued'). Daca nu mai e 'sending',
# NU mai face POST — altfel s-ar crea un duplicat la RAR.
still_sending = conn.execute(
"SELECT 1 FROM submissions WHERE id=? AND status='sending'", (sid,)
).fetchone()
if still_sending is None:
continue
try:
process_one(conn, settings, rar, token, claimed)
except RarAuthError as exc: