feat(T2): reconciliere anti-duplicat + retry/backoff + recuperare orfane
Inchide bucla de trimitere (plan.md sect. 4 worker, failure registry).
- app/reconcile.py: match_finalizata pe vin+dataPrestatie+odometruFinal (int),
alege id maxim la duplicate (RAR accepta duplicate, confirmat live)
- app/rar_client.get_finalizate: parseaza data.content (descoperit live ca
ruta = GET /prezentari/getAllPrezentariFinalizate; filtrele nu merg pe test)
- app/worker rescris:
- recuperare orfane (rand 'sending' peste lease = worker mort mid-POST)
- pe eroare tranzitorie/timeout: reconciliere INAINTE de re-send (anti-duplicat);
daca recordul exista la RAR -> sent fara re-POST
- retry/backoff exponential; peste worker_max_retries -> error + banner
- re-login la token expirat (JWT 30h)
- schema: coloana next_attempt_at (backoff) + migrare aditiva in init_db
- config: worker_sending_lease_s, worker_retry_base_s/max_s, worker_max_retries
- contract: documentata ruta+forma getAllPrezentariFinalizate (verificat live)
Verify: pytest 54 passed (15 noi T2) + validare live (reconciliere record 68514).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,13 +1,19 @@
|
||||
"""Worker RAR — proces propriu (NU task asyncio in uvicorn; plan.md sect. 4).
|
||||
|
||||
Bucla: heartbeat -> claim atomic (BEGIN IMMEDIATE) -> login -> postPrezentare -> update.
|
||||
Bucla: heartbeat -> recupereaza orfane -> claim atomic -> login -> postPrezentare -> update.
|
||||
Ruleaza ca proces separat sub `restart: always` (docker compose).
|
||||
|
||||
Schelet — ce E implementat: heartbeat, claim atomic anti-race, login cu token
|
||||
cache, postPrezentare cu maparea erorilor de validare (400 -> needs_data).
|
||||
Ce NU e inca (marcat TODO): reconcilierea anti-duplicat pe raspuns pierdut (T2),
|
||||
retry/backoff exponential (T2), lease/timeout pe randuri 'sending' orfane (T2),
|
||||
livrarea creds per-cerere de la ROAAUTO (T2 — in schelet folosim creds <test> local).
|
||||
T2 implementat:
|
||||
- claim atomic anti-race (BEGIN IMMEDIATE), respecta next_attempt_at (backoff).
|
||||
- reconciliere anti-duplicat pe raspuns pierdut: pe eroare tranzitorie/timeout SAU pe
|
||||
rand 'sending' orfan (worker mort mid-POST), interogheaza finalizate si match pe
|
||||
vin+dataPrestatie+odometruFinal; daca exista -> 'sent' (NU re-trimite).
|
||||
- retry/backoff exponential pe erori tranzitorii; peste worker_max_retries -> 'error' (banner).
|
||||
- lease/timeout pe randuri 'sending' orfane.
|
||||
- re-login la token expirat (401 mid-sesiune) — JWT 30h, retry NU plafonat la 30h.
|
||||
|
||||
Ce NU e inca: livrarea creds per-cerere de la ROAAUTO (in schelet folosim creds <test>),
|
||||
criptare PII at-rest (P2), b64Image mare pe disc (P2).
|
||||
|
||||
Pornire: python -m app.worker
|
||||
"""
|
||||
@@ -18,10 +24,14 @@ import json
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from ..config import get_settings, load_test_credentials
|
||||
import httpx
|
||||
|
||||
from ..config import Settings, get_settings, load_test_credentials
|
||||
from ..db import get_connection, init_db, write_heartbeat
|
||||
from ..payload import build_rar_payload
|
||||
from ..reconcile import match_finalizata
|
||||
from ..rar_client import RarAuthError, RarClient, RarError
|
||||
|
||||
_running = True
|
||||
@@ -32,16 +42,63 @@ def _stop(signum: int, frame: object) -> None:
|
||||
_running = False
|
||||
|
||||
|
||||
def claim_one(conn) -> dict | None:
|
||||
"""Claim atomic al unui rand 'queued' -> 'sending'. Intoarce randul sau None.
|
||||
def _now() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
BEGIN IMMEDIATE ia lock de scriere imediat, deci doi workeri nu pot lua
|
||||
acelasi rand. (Un singur worker in v1, dar claim-ul ramane corect la scalare.)
|
||||
"""
|
||||
|
||||
def _iso(dt: datetime) -> str:
|
||||
return dt.isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def _backoff_seconds(settings: Settings, retry_count: int) -> int:
|
||||
return min(settings.worker_retry_base_s * (2 ** max(0, retry_count - 1)), settings.worker_retry_max_s)
|
||||
|
||||
|
||||
def _is_transient(exc: Exception) -> bool:
|
||||
"""Eroare in care POST-ul poate sa fi ajuns/sa nu fi ajuns la RAR -> reconciliere + retry."""
|
||||
if isinstance(exc, (httpx.TimeoutException, httpx.TransportError)):
|
||||
return True
|
||||
if isinstance(exc, RarError) and not isinstance(exc, RarAuthError):
|
||||
code = exc.status_code
|
||||
return code is None or code >= 500 or code in (408, 429)
|
||||
return False
|
||||
|
||||
|
||||
# --- Operatii pe submissions ---
|
||||
|
||||
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
|
||||
conn.execute(
|
||||
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
|
||||
"sending_since=NULL, updated_at=datetime('now') WHERE id=?",
|
||||
(status, rar_status_code, rar_error, id_prezentare, submission_id),
|
||||
)
|
||||
|
||||
|
||||
def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None:
|
||||
"""Re-pune randul in coada cu retry++ si next_attempt_at = now + backoff."""
|
||||
row = conn.execute("SELECT retry_count FROM submissions WHERE id=?", (submission_id,)).fetchone()
|
||||
new_retry = int(row["retry_count"]) + 1 if row else 1
|
||||
if new_retry > settings.worker_max_retries:
|
||||
mark(conn, submission_id, "error", rar_error=f"esuat dupa {new_retry-1} reincercari: {reason}")
|
||||
print(f"[worker] submission {submission_id} -> error (max retries): {reason}", flush=True)
|
||||
return
|
||||
next_at = _iso(_now() + timedelta(seconds=_backoff_seconds(settings, new_retry)))
|
||||
conn.execute(
|
||||
"UPDATE submissions SET status='queued', retry_count=?, next_attempt_at=?, "
|
||||
"rar_error=?, sending_since=NULL, updated_at=datetime('now') WHERE id=?",
|
||||
(new_retry, next_at, reason, submission_id),
|
||||
)
|
||||
print(f"[worker] submission {submission_id} -> requeue (retry {new_retry}, next {next_at}): {reason}", flush=True)
|
||||
|
||||
|
||||
def claim_one(conn) -> dict | None:
|
||||
"""Claim atomic 'queued' -> 'sending', respectand next_attempt_at. Intoarce randul sau None."""
|
||||
conn.execute("BEGIN IMMEDIATE")
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT id, payload_json FROM submissions WHERE status='queued' ORDER BY id LIMIT 1"
|
||||
"SELECT id, payload_json FROM submissions WHERE status='queued' "
|
||||
"AND (next_attempt_at IS NULL OR next_attempt_at <= ?) ORDER BY id LIMIT 1",
|
||||
(_iso(_now()),),
|
||||
).fetchone()
|
||||
if not row:
|
||||
conn.execute("COMMIT")
|
||||
@@ -58,37 +115,91 @@ def claim_one(conn) -> dict | None:
|
||||
raise
|
||||
|
||||
|
||||
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
|
||||
conn.execute(
|
||||
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
|
||||
"updated_at=datetime('now') WHERE id=?",
|
||||
(status, rar_status_code, rar_error, id_prezentare, submission_id),
|
||||
)
|
||||
def reconcile_and_mark(conn, rar: RarClient, token: str, submission_id: int, content: dict) -> bool:
|
||||
"""Cauta la RAR o prezentare deja inregistrata pentru acest continut. Marcheaza 'sent' daca exista.
|
||||
|
||||
|
||||
def process_one(conn, rar: RarClient, token: str, claimed: dict) -> None:
|
||||
"""Trimite o prezentare claimed. Mapeaza rezultatul pe masina de stari.
|
||||
|
||||
TODO(T2): inainte de re-send pe un rand ramas 'sending' (raspuns pierdut),
|
||||
interogheaza get_finalizate pe VIN+dataPrestatie+odometruFinal si marcheaza
|
||||
'sent' daca exista deja (anti-duplicat). UNIQUE NU acopera acest caz.
|
||||
Intoarce True daca a gasit (si a marcat sent), False altfel.
|
||||
"""
|
||||
finalizate = rar.get_finalizate(token)
|
||||
found_id = match_finalizata(
|
||||
finalizate,
|
||||
vin=content.get("vin", ""),
|
||||
data_prestatie=content.get("data_prestatie", ""),
|
||||
odometru_final=content.get("odometru_final"),
|
||||
)
|
||||
if found_id is not None:
|
||||
mark(conn, submission_id, "sent", rar_status_code=200, id_prezentare=found_id,
|
||||
rar_error="reconciliat (raspuns pierdut)")
|
||||
print(f"[worker] submission {submission_id} -> sent prin reconciliere (idPrezentare={found_id})", flush=True)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def process_one(conn, settings: Settings, rar: RarClient, token: str, claimed: dict) -> str:
|
||||
"""Trimite o prezentare claimed. Intoarce starea finala (pentru teste/loguri)."""
|
||||
sid = claimed["id"]
|
||||
payload = build_rar_payload(claimed["content"])
|
||||
content = claimed["content"]
|
||||
payload = build_rar_payload(content)
|
||||
try:
|
||||
data = rar.post_prezentare(token, payload)
|
||||
mark(conn, sid, "sent", rar_status_code=200, id_prezentare=data.get("id"))
|
||||
print(f"[worker] submission {sid} -> sent (idPrezentare={data.get('id')})", flush=True)
|
||||
return "sent"
|
||||
except RarError as exc:
|
||||
if exc.status_code == 400:
|
||||
# Validare esuata la RAR -> needs_data (nu re-trimite orb).
|
||||
detail = json.dumps(exc.field_errors, ensure_ascii=False) if exc.field_errors else str(exc)
|
||||
mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail)
|
||||
print(f"[worker] submission {sid} -> needs_data: {detail}", flush=True)
|
||||
else:
|
||||
# TODO(T2): retry/backoff in loc de error direct pe 5xx/tranzitoriu.
|
||||
mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc))
|
||||
print(f"[worker] submission {sid} -> error: {exc}", flush=True)
|
||||
return "needs_data"
|
||||
if _is_transient(exc):
|
||||
return _handle_transient(conn, settings, rar, token, sid, content, str(exc))
|
||||
# 4xx nerecuperabil (nu 400/401/408/429) -> error.
|
||||
mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc))
|
||||
print(f"[worker] submission {sid} -> error: {exc}", flush=True)
|
||||
return "error"
|
||||
except (httpx.TimeoutException, httpx.TransportError) as exc:
|
||||
return _handle_transient(conn, settings, rar, token, sid, content, f"retea: {exc}")
|
||||
|
||||
|
||||
def _handle_transient(conn, settings: Settings, rar: RarClient, token: str, sid: int, content: dict, reason: str) -> str:
|
||||
"""Eroare ambigua: POST-ul poate sa fi ajuns la RAR. Reconciliaza intai; altfel retry/backoff."""
|
||||
try:
|
||||
if reconcile_and_mark(conn, rar, token, sid, content):
|
||||
return "sent"
|
||||
except RarError as exc:
|
||||
# Reconcilierea insasi a esuat -> nu putem confirma; tratam ca tranzitoriu si retry.
|
||||
reason = f"{reason}; reconciliere esuata: {exc}"
|
||||
requeue_with_backoff(conn, settings, sid, reason=reason)
|
||||
return "requeued"
|
||||
|
||||
|
||||
def recover_orphans(conn, settings: Settings, rar: RarClient, token: str) -> int:
|
||||
"""Randuri 'sending' mai vechi de lease (worker mort mid-POST). Reconciliaza; altfel requeue."""
|
||||
cutoff = _iso(_now() - timedelta(seconds=settings.worker_sending_lease_s))
|
||||
orphans = conn.execute(
|
||||
"SELECT id, payload_json FROM submissions WHERE status='sending' "
|
||||
"AND (sending_since IS NULL OR sending_since <= ?)",
|
||||
(cutoff,),
|
||||
).fetchall()
|
||||
recovered = 0
|
||||
for row in orphans:
|
||||
sid = row["id"]
|
||||
content = json.loads(row["payload_json"])
|
||||
try:
|
||||
if reconcile_and_mark(conn, rar, token, sid, content):
|
||||
recovered += 1
|
||||
else:
|
||||
# Nu e la RAR -> sigur de re-trimis.
|
||||
requeue_with_backoff(conn, settings, sid, reason="orfan recuperat (nu exista la RAR)")
|
||||
recovered += 1
|
||||
except RarError as exc:
|
||||
print(f"[worker] orfan {sid}: reconciliere esuata ({exc}); il las pentru data viitoare", flush=True)
|
||||
return recovered
|
||||
|
||||
|
||||
def _queue_depth(conn) -> int:
|
||||
row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone()
|
||||
return int(row["n"]) if row else 0
|
||||
|
||||
|
||||
def run() -> int:
|
||||
@@ -98,7 +209,6 @@ def run() -> int:
|
||||
settings = get_settings()
|
||||
init_db()
|
||||
conn = get_connection()
|
||||
|
||||
print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
|
||||
|
||||
creds = load_test_credentials() if settings.worker_use_test_creds else None
|
||||
@@ -107,36 +217,35 @@ def run() -> int:
|
||||
|
||||
while _running:
|
||||
try:
|
||||
depth_detail = f"poll (queue={_queue_depth(conn)})"
|
||||
write_heartbeat(conn, detail=depth_detail)
|
||||
write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})")
|
||||
|
||||
if not settings.worker_send_enabled:
|
||||
time.sleep(settings.worker_poll_interval_s)
|
||||
continue
|
||||
if not creds:
|
||||
time.sleep(settings.worker_poll_interval_s)
|
||||
continue
|
||||
|
||||
# Login lazy + token cache (JWT 30h).
|
||||
if rar is None or token is None:
|
||||
rar = RarClient(settings)
|
||||
token = rar.login(creds["email"], creds["password"])
|
||||
write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok")
|
||||
|
||||
recover_orphans(conn, settings, rar, token)
|
||||
|
||||
claimed = claim_one(conn)
|
||||
if claimed is None:
|
||||
time.sleep(settings.worker_poll_interval_s)
|
||||
continue
|
||||
|
||||
if not creds:
|
||||
# TODO(T2): canalul real de creds per-cerere de la ROAAUTO.
|
||||
mark(conn, claimed["id"], "error", rar_error="creds RAR indisponibile (T2)")
|
||||
continue
|
||||
|
||||
# Login lazy + token cache (JWT 30h). Re-login la expirare = T2.
|
||||
if rar is None or token is None:
|
||||
rar = RarClient(settings)
|
||||
token = rar.login(creds["email"], creds["password"])
|
||||
write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok")
|
||||
|
||||
process_one(conn, rar, token, claimed)
|
||||
process_one(conn, settings, rar, token, claimed)
|
||||
|
||||
except RarAuthError as exc:
|
||||
print(f"[worker] login esuat: {exc}", flush=True)
|
||||
token = None # forteaza re-login data viitoare
|
||||
print(f"[worker] login esuat / token expirat: {exc}", flush=True)
|
||||
token = None # forteaza re-login (acopera si expirarea JWT la 30h)
|
||||
time.sleep(settings.worker_poll_interval_s)
|
||||
except Exception as exc: # noqa: BLE001 — loop top-level, nu cadem la o eroare punctuala
|
||||
except Exception as exc: # noqa: BLE001 — loop top-level: o eroare punctuala nu opreste worker-ul
|
||||
print(f"[worker] eroare neasteptata: {exc}", flush=True)
|
||||
time.sleep(settings.worker_poll_interval_s)
|
||||
|
||||
@@ -147,10 +256,5 @@ def run() -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def _queue_depth(conn) -> int:
|
||||
row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone()
|
||||
return int(row["n"]) if row else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(run())
|
||||
|
||||
Reference in New Issue
Block a user