Files
rar-autopass/app/worker/__main__.py
Claude Agent a6df3b636f feat(T5): editor web mapare operatii (hibrid + fuzzy + on-demand needs_mapping)
T5 reinterpretat: nu import DBF, ci editor web al maparii operatie ROAAUTO ->
cod RAR, cu fuzzy lookup si validare de catre utilizator.

- Contract hibrid: item prestatie accepta cod_prestatie (RAR direct, back-compat)
  SAU cod_op_service+denumire (mapat de gateway prin operations_mapping).
- Ingestie: op intern necunoscut -> submission needs_mapping (nu pleaca la RAR);
  codul rezolvat se scrie inapoi in payload_json -> payload builder + worker neatinse.
- Editor HTMX (_mapari.html + GET /_fragments/mapari, POST /mapari): listeaza
  op-urile nemapate, fuzzy preselecteaza codul RAR, save -> re-rezolvare automata
  (queued / needs_data).
- Fuzzy: rapidfuzz.token_sort_ratio pe denumire normalizata (fara diacritice).
- Nomenclator: seed fallback 18 coduri la boot (offline) + refresh live din worker.
- Cont default id=1 cat timp auth API-key (CORE) nu exista (account_id NULL).
- Endpointuri API: GET /v1/mapari/pending, POST /v1/mapari (respinge cod inexistent).
- 15 teste noi (tests/test_mapping.py); 69 pass total.
- Contract actualizat (docs/api-rar-contract.md), rapidfuzz==3.14.5 in requirements.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 19:25:21 +00:00

275 lines
11 KiB
Python

"""Worker RAR — proces propriu (NU task asyncio in uvicorn; plan.md sect. 4).
Bucla: heartbeat -> recupereaza orfane -> claim atomic -> login -> postPrezentare -> update.
Ruleaza ca proces separat sub `restart: always` (docker compose).
T2 implementat:
- claim atomic anti-race (BEGIN IMMEDIATE), respecta next_attempt_at (backoff).
- reconciliere anti-duplicat pe raspuns pierdut: pe eroare tranzitorie/timeout SAU pe
rand 'sending' orfan (worker mort mid-POST), interogheaza finalizate si match pe
vin+dataPrestatie+odometruFinal; daca exista -> 'sent' (NU re-trimite).
- retry/backoff exponential pe erori tranzitorii; peste worker_max_retries -> 'error' (banner).
- lease/timeout pe randuri 'sending' orfane.
- re-login la token expirat (401 mid-sesiune) — JWT 30h, retry NU plafonat la 30h.
Ce NU e inca: livrarea creds per-cerere de la ROAAUTO (in schelet folosim creds <test>),
criptare PII at-rest (P2), b64Image mare pe disc (P2).
Pornire: python -m app.worker
"""
from __future__ import annotations
import json
import signal
import sys
import time
from datetime import datetime, timedelta, timezone
import httpx
from ..config import Settings, get_settings, load_test_credentials
from ..db import get_connection, init_db, write_heartbeat
from ..mapping import upsert_nomenclator
from ..payload import build_rar_payload
from ..reconcile import match_finalizata
from ..rar_client import RarAuthError, RarClient, RarError
# Polled by the main loop in run(); cleared by the SIGTERM/SIGINT handler below.
_running = True


def _stop(signum: int, frame: object) -> None:
    """Signal handler that asks the worker loop to exit.

    Both arguments are required by the ``signal.signal`` handler signature
    and are intentionally ignored.
    """
    global _running
    del signum, frame
    _running = False
def _now() -> datetime:
return datetime.now(timezone.utc)
def _iso(dt: datetime) -> str:
return dt.isoformat(timespec="seconds")
def _backoff_seconds(settings: Settings, retry_count: int) -> int:
return min(settings.worker_retry_base_s * (2 ** max(0, retry_count - 1)), settings.worker_retry_max_s)
def _is_transient(exc: Exception) -> bool:
    """Tell whether *exc* is an ambiguous failure worth reconciling and retrying.

    "Ambiguous" means the POST may or may not have reached RAR: any httpx
    timeout/transport error, or a non-auth ``RarError`` that carries no HTTP
    status, a 5xx, a 408 or a 429. Auth errors and every other exception are
    not transient.
    """
    if isinstance(exc, (httpx.TimeoutException, httpx.TransportError)):
        return True
    if not isinstance(exc, RarError) or isinstance(exc, RarAuthError):
        return False
    status = exc.status_code
    if status is None:
        return True
    return status >= 500 or status in (408, 429)
# --- Operations on submissions ---
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
    """Set a submission's status and record the RAR outcome fields.

    Always clears ``sending_since`` (the row is no longer in flight) and
    refreshes ``updated_at``. Outcome fields that are not passed are written
    as NULL, overwriting previous values.
    """
    sql = (
        "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
        "sending_since=NULL, updated_at=datetime('now') WHERE id=?"
    )
    params = (status, rar_status_code, rar_error, id_prezentare, submission_id)
    conn.execute(sql, params)
def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None:
    """Return a submission to the queue with one more retry and a backoff delay.

    Reads the row's ``retry_count`` (a missing row counts as 0) and increments
    it. While still within ``settings.worker_max_retries``, the row goes back
    to 'queued' with ``next_attempt_at = now + backoff`` and *reason* stored in
    ``rar_error``. Beyond the limit the row is marked 'error' (terminal)
    instead, and ``retry_count`` is left untouched.
    """
    current = conn.execute("SELECT retry_count FROM submissions WHERE id=?", (submission_id,)).fetchone()
    attempt = 1 if not current else int(current["retry_count"]) + 1

    if attempt <= settings.worker_max_retries:
        delay = timedelta(seconds=_backoff_seconds(settings, attempt))
        next_at = _iso(_now() + delay)
        conn.execute(
            "UPDATE submissions SET status='queued', retry_count=?, next_attempt_at=?, "
            "rar_error=?, sending_since=NULL, updated_at=datetime('now') WHERE id=?",
            (attempt, next_at, reason, submission_id),
        )
        print(f"[worker] submission {submission_id} -> requeue (retry {attempt}, next {next_at}): {reason}", flush=True)
    else:
        mark(conn, submission_id, "error", rar_error=f"esuat dupa {attempt-1} reincercari: {reason}")
        print(f"[worker] submission {submission_id} -> error (max retries): {reason}", flush=True)
def claim_one(conn) -> dict | None:
    """Atomically move the oldest eligible 'queued' row to 'sending'.

    A row is eligible when ``next_attempt_at`` is NULL or not in the future.
    Both sides are compared as ``_iso`` strings, the format written by
    ``requeue_with_backoff``. ``BEGIN IMMEDIATE`` takes the SQLite write lock
    before the SELECT, so the select-then-update claim is atomic.

    Returns ``{"id": ..., "content": <decoded payload_json>}``, or None when
    nothing is eligible.

    Raises:
        ValueError (json.JSONDecodeError): the claimed row's payload_json is
            not valid JSON. The claim is already committed, so the row stays
            in 'sending' and is left to orphan recovery.
    """
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, payload_json FROM submissions WHERE status='queued' "
            "AND (next_attempt_at IS NULL OR next_attempt_at <= ?) ORDER BY id LIMIT 1",
            (_iso(_now()),),
        ).fetchone()
        if row:
            conn.execute(
                "UPDATE submissions SET status='sending', sending_since=datetime('now'), "
                "updated_at=datetime('now') WHERE id=?",
                (row["id"],),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    if not row:
        return None
    # Decode outside the transaction. Inside the try, a decoding error raised
    # after COMMIT would trigger a ROLLBACK with no active transaction, and that
    # error would mask the real one.
    return {"id": row["id"], "content": json.loads(row["payload_json"])}
def reconcile_and_mark(conn, rar: RarClient, token: str, submission_id: int, content: dict) -> bool:
    """Check whether RAR already holds a presentation matching *content*.

    Fetches RAR's finalised presentations (``rar.get_finalizate``) and matches
    them on vin + data_prestatie + odometru_final via ``match_finalizata``.
    On a match the submission is marked 'sent' with the matched RAR id, so it
    is not re-posted.

    Returns True when a match was found and the row was marked, False
    otherwise. Errors from the RAR lookup propagate to the caller.
    """
    match_id = match_finalizata(
        rar.get_finalizate(token),
        vin=content.get("vin", ""),
        data_prestatie=content.get("data_prestatie", ""),
        odometru_final=content.get("odometru_final"),
    )
    if match_id is None:
        return False
    mark(conn, submission_id, "sent", rar_status_code=200, id_prezentare=match_id,
         rar_error="reconciliat (raspuns pierdut)")
    print(f"[worker] submission {submission_id} -> sent prin reconciliere (idPrezentare={match_id})", flush=True)
    return True
def process_one(conn, settings: Settings, rar: RarClient, token: str, claimed: dict) -> str:
    """Send one claimed submission to RAR and record the outcome.

    *claimed* is the dict returned by ``claim_one`` (``id`` plus decoded
    ``content``). Returns the resulting state, for tests and logs:

    - ``"sent"``: RAR accepted it; its ``id`` becomes ``id_prezentare``.
    - ``"needs_data"``: HTTP 400; RAR's field errors are stored in ``rar_error``.
    - ``"sent"`` / ``"requeued"``: a transient failure, handled by ``_handle_transient``.
    - ``"error"``: any other ``RarError`` (non-recoverable 4xx).

    Raises:
        RarAuthError: RAR rejected the token (e.g. expired JWT). The row is
            first requeued with backoff, or marked 'error' once retries are
            exhausted. The error is then re-raised so that ``run()`` drops the
            cached token and logs in again.

    Exceptions from ``build_rar_payload`` are not handled here.
    """
    sid = claimed["id"]
    content = claimed["content"]
    payload = build_rar_payload(content)
    try:
        data = rar.post_prezentare(token, payload)
    except RarAuthError as exc:
        # Must come before `except RarError`. If RarAuthError subclasses
        # RarError (as the check in _is_transient implies), the generic branch
        # would mark the row 'error' with 401. run() would then never see the
        # auth error and would keep reusing the expired token.
        # An auth rejection presumably means RAR registered nothing, so a
        # requeue without reconciliation is safe; retries stay bounded.
        requeue_with_backoff(conn, settings, sid, reason=f"token RAR respins: {exc}")
        raise
    except RarError as exc:
        if exc.status_code == 400:
            detail = json.dumps(exc.field_errors, ensure_ascii=False) if exc.field_errors else str(exc)
            mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail)
            print(f"[worker] submission {sid} -> needs_data: {detail}", flush=True)
            return "needs_data"
        if _is_transient(exc):
            return _handle_transient(conn, settings, rar, token, sid, content, str(exc))
        # Non-recoverable 4xx (anything but 400/401/408/429) -> error.
        mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc))
        print(f"[worker] submission {sid} -> error: {exc}", flush=True)
        return "error"
    except (httpx.TimeoutException, httpx.TransportError) as exc:
        return _handle_transient(conn, settings, rar, token, sid, content, f"retea: {exc}")
    mark(conn, sid, "sent", rar_status_code=200, id_prezentare=data.get("id"))
    print(f"[worker] submission {sid} -> sent (idPrezentare={data.get('id')})", flush=True)
    return "sent"
def _handle_transient(conn, settings: Settings, rar: RarClient, token: str, sid: int, content: dict, reason: str) -> str:
    """Handle an ambiguous send failure, where the POST may have reached RAR.

    Reconciles with RAR first. If the presentation is found, the row is
    marked 'sent' and ``"sent"`` is returned. Otherwise the row is requeued
    with backoff and ``"requeued"`` is returned; ``requeue_with_backoff``
    marks it 'error' once retries are exhausted.

    A failed reconciliation lookup (a ``RarError`` or an httpx
    timeout/transport error) means delivery cannot be confirmed. That case is
    treated the same way, with the lookup failure appended to *reason*.
    """
    try:
        if reconcile_and_mark(conn, rar, token, sid, content):
            return "sent"
    except (RarError, httpx.TimeoutException, httpx.TransportError) as exc:
        # Network errors are the typical failure here, since the send itself
        # just failed. Letting them escape would leave the row stuck in
        # 'sending' without a retry-count/backoff update.
        reason = f"{reason}; reconciliere esuata: {exc}"
    requeue_with_backoff(conn, settings, sid, reason=reason)
    return "requeued"
def recover_orphans(conn, settings: Settings, rar: RarClient, token: str) -> int:
    """Resolve 'sending' rows whose lease has expired (e.g. the worker died mid-POST).

    A row is orphaned when ``sending_since`` is NULL or older than
    ``settings.worker_sending_lease_s``. Each orphan is handled as follows:

    - RAR is queried first (``reconcile_and_mark``). If the presentation
      already exists, the row is marked 'sent'; otherwise it is requeued with
      backoff.
    - If the reconciliation fails with a ``RarError``, the row is left as is
      for the next pass.
    - Rows whose ``payload_json`` cannot be decoded are marked 'error', since
      they can never be sent.

    Returns the number of orphans marked sent or requeued.
    """
    # claim_one writes sending_since with SQLite's datetime('now'), i.e.
    # 'YYYY-MM-DD HH:MM:SS' in UTC, and the comparison below is textual, so
    # the cutoff must use the same format. An isoformat() cutoff
    # ('YYYY-MM-DDTHH:MM:SS+00:00') sorts after every same-day value
    # (' ' < 'T'), which made in-flight rows look orphaned before their
    # lease expired.
    cutoff = (_now() - timedelta(seconds=settings.worker_sending_lease_s)).strftime("%Y-%m-%d %H:%M:%S")
    orphans = conn.execute(
        "SELECT id, payload_json FROM submissions WHERE status='sending' "
        "AND (sending_since IS NULL OR sending_since <= ?)",
        (cutoff,),
    ).fetchall()
    recovered = 0
    for row in orphans:
        sid = row["id"]
        try:
            content = json.loads(row["payload_json"])
        except (TypeError, ValueError) as exc:
            # Undecodable (or NULL) payload: retrying can never succeed. If the
            # exception escaped, it would abort every poll in run() before
            # claim_one is reached, blocking the whole queue.
            mark(conn, sid, "error", rar_error=f"payload_json invalid: {exc}")
            print(f"[worker] orfan {sid}: payload_json invalid -> error ({exc})", flush=True)
            continue
        try:
            if reconcile_and_mark(conn, rar, token, sid, content):
                recovered += 1
            else:
                # Not found at RAR -> safe to send again.
                requeue_with_backoff(conn, settings, sid, reason="orfan recuperat (nu exista la RAR)")
                recovered += 1
        except RarError as exc:
            print(f"[worker] orfan {sid}: reconciliere esuata ({exc}); il las pentru data viitoare", flush=True)
    return recovered
def _queue_depth(conn) -> int:
row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone()
return int(row["n"]) if row else 0
def _refresh_nomenclator(conn, rar: RarClient, token: str) -> None:
    """Fetch RAR's live nomenclator and upsert it into the local table.

    Best-effort: any failure is logged and swallowed, so sending is never
    blocked by a failed refresh.
    """
    try:
        count = upsert_nomenclator(conn, rar.get_nomenclator(token))
        print(f"[worker] nomenclator refresh: {count} coduri", flush=True)
    except Exception as exc:  # noqa: BLE001 — best-effort refresh; must not block sending
        print(f"[worker] nomenclator refresh esuat (continui): {exc}", flush=True)
def run() -> int:
    """Worker entry point: poll the queue and push submissions to RAR until stopped.

    Each iteration writes a heartbeat. When sending is enabled and credentials
    are available, it then:

    - logs in lazily, caching the token, and refreshes the nomenclator after
      each fresh login;
    - recovers orphaned 'sending' rows;
    - claims one queued row and processes it.

    A ``RarAuthError`` drops the cached token to force a new login on the next
    iteration; any other exception is logged and the loop continues.
    SIGTERM/SIGINT clear ``_running``, so the loop exits at its next check.
    The RAR client and the DB connection are closed on the way out.
    Returns 0.
    """
    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)
    settings = get_settings()
    init_db()
    conn = get_connection()
    print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
    creds = load_test_credentials() if settings.worker_use_test_creds else None
    rar: RarClient | None = None
    token: str | None = None
    try:
        while _running:
            try:
                write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})")
                if not settings.worker_send_enabled:
                    time.sleep(settings.worker_poll_interval_s)
                    continue
                if not creds:
                    time.sleep(settings.worker_poll_interval_s)
                    continue
                # Lazy login + token cache (JWT valid 30h).
                if rar is None or token is None:
                    if rar is not None:
                        # Re-login (token dropped after RarAuthError, or an
                        # earlier login attempt failed). Close the previous
                        # client before replacing it; otherwise every re-login
                        # leaves one unclosed. Detach it first, so a failing
                        # close() cannot block the next attempt.
                        stale, rar = rar, None
                        stale.close()
                    rar = RarClient(settings)
                    token = rar.login(creds["email"], creds["password"])
                    write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok")
                    # Refresh the live (authoritative) nomenclator on every
                    # fresh login; it feeds the mapping editor's fuzzy lookup.
                    _refresh_nomenclator(conn, rar, token)
                recover_orphans(conn, settings, rar, token)
                claimed = claim_one(conn)
                if claimed is None:
                    time.sleep(settings.worker_poll_interval_s)
                    continue
                process_one(conn, settings, rar, token, claimed)
            except RarAuthError as exc:
                print(f"[worker] login esuat / token expirat: {exc}", flush=True)
                token = None  # force a re-login (also covers JWT expiry at 30h)
                time.sleep(settings.worker_poll_interval_s)
            except Exception as exc:  # noqa: BLE001 — top-level loop: a one-off error must not stop the worker
                print(f"[worker] eroare neasteptata: {exc}", flush=True)
                time.sleep(settings.worker_poll_interval_s)
    finally:
        if rar is not None:
            rar.close()
        conn.close()
    print("[worker] oprit curat", flush=True)
    return 0
if __name__ == "__main__":
    # Entry point for `python -m app.worker`; run()'s return value becomes the exit status.
    raise SystemExit(run())