diff --git a/app/config.py b/app/config.py index 9772be4..c74fbd4 100644 --- a/app/config.py +++ b/app/config.py @@ -41,6 +41,11 @@ class Settings(BaseSettings): # Dev: foloseste creds din settings.xml pt login worker. In productie # creds vin per-cerere de la ROAAUTO (T2) — lasa False. worker_use_test_creds: bool = False + # T2 — recuperare orfane + retry/backoff: + worker_sending_lease_s: int = 120 # rand 'sending' mai vechi de atat = orfan (worker mort mid-POST) + worker_retry_base_s: int = 5 # backoff = base * 2^retry (plafonat la max) + worker_retry_max_s: int = 300 + worker_max_retries: int = 8 # peste atat -> error + banner (pana persistenta) @property def rar_base_url(self) -> str: diff --git a/app/db.py b/app/db.py index 6fedf8b..69a9286 100644 --- a/app/db.py +++ b/app/db.py @@ -27,14 +27,22 @@ def get_connection() -> sqlite3.Connection: def init_db() -> None: - """Creeaza schema daca lipseste. Idempotent — sigur la fiecare boot.""" + """Creeaza schema daca lipseste + migrari aditive. Idempotent — sigur la fiecare boot.""" conn = get_connection() try: conn.executescript(_SCHEMA.read_text(encoding="utf-8")) + _migrate(conn) finally: conn.close() +def _migrate(conn: sqlite3.Connection) -> None: + """Migrari aditive pentru DB create inainte de o coloana noua (CREATE IF NOT EXISTS nu altereaza).""" + cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()} + if "next_attempt_at" not in cols: + conn.execute("ALTER TABLE submissions ADD COLUMN next_attempt_at TEXT") + + def _now_iso() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") diff --git a/app/rar_client.py b/app/rar_client.py index 0c5fdd9..10112ba 100644 --- a/app/rar_client.py +++ b/app/rar_client.py @@ -110,8 +110,10 @@ class RarClient: def get_finalizate(self, token: str) -> list[dict]: """Lista prezentarilor finalizate (pentru reconciliere — T2). - Atentie: pe mediul TEST raspunsul NU contine `prestatii` (vezi contract). - Portare din rar-forms.prg:720 / getAllPrezentariFinalizate. + GET /prezentari/getAllPrezentariFinalizate -> data.content (listă). + Verificat live: filtrele/paginarea NU functioneaza pe test (vezi contract), + deci interogam fara parametri si filtram client-side. Pe test `prestatii` + vine null in fiecare item — match-ul se face pe vin+dataPrestatie+odometruFinal. """ resp = self._client.get( "/prezentari/getAllPrezentariFinalizate", @@ -119,8 +121,14 @@ class RarClient: ) if resp.status_code != 200: raise RarError(f"getFinalizate esuat (HTTP {resp.status_code})", status_code=resp.status_code) - data = _safe_json(resp) - return data.get("data", data) if isinstance(data, dict) else data + body = _safe_json(resp) + if isinstance(body, dict): + data = body.get("data") + if isinstance(data, dict) and isinstance(data.get("content"), list): + return data["content"] + if isinstance(data, list): + return data + return [] def _safe_json(resp: httpx.Response) -> Any: diff --git a/app/reconcile.py b/app/reconcile.py new file mode 100644 index 0000000..94836fc --- /dev/null +++ b/app/reconcile.py @@ -0,0 +1,54 @@ +"""Reconciliere anti-duplicat pe raspuns pierdut (T2 — P1). + +Daca un POST postPrezentare ajunge la RAR si RAR insereaza, dar raspunsul se +pierde (timeout/retea), randul ramane `sending`. Un re-send orb ar crea duplicat +(RAR ACCEPTA duplicate — confirmat live, vezi contract). Inainte de re-send, +interogam lista finalizate si cautam o potrivire pe `vin + dataPrestatie + +odometruFinal`. UNIQUE pe `submissions` NU acopera acest caz (e despre starea la RAR). + +Functie pura, unit-testabila. odometruFinal e NUMAR in listarea RAR, string in +payload-ul nostru -> comparam ca int. +""" + +from __future__ import annotations + +from typing import Any + + +def _as_int(value: object) -> int | None: + s = str(value if value is not None else "").strip() + try: + return int(s) + except ValueError: + return None + + +def match_finalizata( + finalizate: list[dict[str, Any]], + *, + vin: str, + data_prestatie: str, + odometru_final: object, +) -> int | None: + """Intoarce id-ul (data.id) unei prezentari finalizate care se potriveste, altfel None. + + Match: vin (case-insensitive) + dataPrestatie (egal) + odometruFinal (egal ca int). + Daca exista mai multe potriviri (RAR accepta duplicate), intoarce id-ul MAXIM + (cel mai recent) — orice match dovedeste ca RAR are deja inregistrarea. + """ + want_vin = (vin or "").strip().upper() + want_odo = _as_int(odometru_final) + want_data = (data_prestatie or "").strip() + + matches: list[int] = [] + for item in finalizate: + if (item.get("vin") or "").strip().upper() != want_vin: + continue + if (str(item.get("dataPrestatie") or "").strip()) != want_data: + continue + if _as_int(item.get("odometruFinal")) != want_odo: + continue + item_id = _as_int(item.get("id")) + if item_id is not None: + matches.append(item_id) + return max(matches) if matches else None diff --git a/app/schema.sql b/app/schema.sql index 61e379c..26b116f 100644 --- a/app/schema.sql +++ b/app/schema.sql @@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS submissions ( rar_error TEXT, id_prezentare INTEGER, -- data.id intors de RAR la succes retry_count INTEGER NOT NULL DEFAULT 0, + next_attempt_at TEXT, -- backoff: randul nu se ia inainte de acest moment (T2) sending_since TEXT, -- pentru lease/timeout pe randuri 'sending' orfane (T2) purge_after TEXT, -- sent + 90z (P2) created_at TEXT NOT NULL DEFAULT (datetime('now')), diff --git a/app/worker/__main__.py b/app/worker/__main__.py index aee87b3..1853457 100644 --- a/app/worker/__main__.py +++ b/app/worker/__main__.py @@ -1,13 +1,19 @@ """Worker RAR — proces propriu (NU task asyncio in uvicorn; plan.md sect. 4). -Bucla: heartbeat -> claim atomic (BEGIN IMMEDIATE) -> login -> postPrezentare -> update. +Bucla: heartbeat -> recupereaza orfane -> claim atomic -> login -> postPrezentare -> update. Ruleaza ca proces separat sub `restart: always` (docker compose). -Schelet — ce E implementat: heartbeat, claim atomic anti-race, login cu token -cache, postPrezentare cu maparea erorilor de validare (400 -> needs_data). -Ce NU e inca (marcat TODO): reconcilierea anti-duplicat pe raspuns pierdut (T2), -retry/backoff exponential (T2), lease/timeout pe randuri 'sending' orfane (T2), -livrarea creds per-cerere de la ROAAUTO (T2 — in schelet folosim creds local). +T2 implementat: + - claim atomic anti-race (BEGIN IMMEDIATE), respecta next_attempt_at (backoff). + - reconciliere anti-duplicat pe raspuns pierdut: pe eroare tranzitorie/timeout SAU pe + rand 'sending' orfan (worker mort mid-POST), interogheaza finalizate si match pe + vin+dataPrestatie+odometruFinal; daca exista -> 'sent' (NU re-trimite). + - retry/backoff exponential pe erori tranzitorii; peste worker_max_retries -> 'error' (banner). + - lease/timeout pe randuri 'sending' orfane. + - re-login la token expirat (401 mid-sesiune) — JWT 30h, retry NU plafonat la 30h. + +Ce NU e inca: livrarea creds per-cerere de la ROAAUTO (in schelet folosim creds ), +criptare PII at-rest (P2), b64Image mare pe disc (P2). Pornire: python -m app.worker """ @@ -18,10 +24,14 @@ import json import signal import sys import time +from datetime import datetime, timedelta, timezone -from ..config import get_settings, load_test_credentials +import httpx + +from ..config import Settings, get_settings, load_test_credentials from ..db import get_connection, init_db, write_heartbeat from ..payload import build_rar_payload +from ..reconcile import match_finalizata from ..rar_client import RarAuthError, RarClient, RarError _running = True @@ -32,16 +42,63 @@ def _stop(signum: int, frame: object) -> None: _running = False -def claim_one(conn) -> dict | None: - """Claim atomic al unui rand 'queued' -> 'sending'. Intoarce randul sau None. +def _now() -> datetime: + return datetime.now(timezone.utc) - BEGIN IMMEDIATE ia lock de scriere imediat, deci doi workeri nu pot lua - acelasi rand. (Un singur worker in v1, dar claim-ul ramane corect la scalare.) - """ + +def _iso(dt: datetime) -> str: + return dt.isoformat(timespec="seconds") + + +def _backoff_seconds(settings: Settings, retry_count: int) -> int: + return min(settings.worker_retry_base_s * (2 ** max(0, retry_count - 1)), settings.worker_retry_max_s) + + +def _is_transient(exc: Exception) -> bool: + """Eroare in care POST-ul poate sa fi ajuns/sa nu fi ajuns la RAR -> reconciliere + retry.""" + if isinstance(exc, (httpx.TimeoutException, httpx.TransportError)): + return True + if isinstance(exc, RarError) and not isinstance(exc, RarAuthError): + code = exc.status_code + return code is None or code >= 500 or code in (408, 429) + return False + + +# --- Operatii pe submissions --- + +def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None: + conn.execute( + "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, " + "sending_since=NULL, updated_at=datetime('now') WHERE id=?", + (status, rar_status_code, rar_error, id_prezentare, submission_id), + ) + + +def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None: + """Re-pune randul in coada cu retry++ si next_attempt_at = now + backoff.""" + row = conn.execute("SELECT retry_count FROM submissions WHERE id=?", (submission_id,)).fetchone() + new_retry = int(row["retry_count"]) + 1 if row else 1 + if new_retry > settings.worker_max_retries: + mark(conn, submission_id, "error", rar_error=f"esuat dupa {new_retry-1} reincercari: {reason}") + print(f"[worker] submission {submission_id} -> error (max retries): {reason}", flush=True) + return + next_at = _iso(_now() + timedelta(seconds=_backoff_seconds(settings, new_retry))) + conn.execute( + "UPDATE submissions SET status='queued', retry_count=?, next_attempt_at=?, " + "rar_error=?, sending_since=NULL, updated_at=datetime('now') WHERE id=?", + (new_retry, next_at, reason, submission_id), + ) + print(f"[worker] submission {submission_id} -> requeue (retry {new_retry}, next {next_at}): {reason}", flush=True) + + +def claim_one(conn) -> dict | None: + """Claim atomic 'queued' -> 'sending', respectand next_attempt_at. Intoarce randul sau None.""" conn.execute("BEGIN IMMEDIATE") try: row = conn.execute( - "SELECT id, payload_json FROM submissions WHERE status='queued' ORDER BY id LIMIT 1" + "SELECT id, payload_json FROM submissions WHERE status='queued' " + "AND (next_attempt_at IS NULL OR next_attempt_at <= ?) ORDER BY id LIMIT 1", + (_iso(_now()),), ).fetchone() if not row: conn.execute("COMMIT") @@ -58,37 +115,91 @@ def claim_one(conn) -> dict | None: raise -def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None: - conn.execute( - "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, " - "updated_at=datetime('now') WHERE id=?", - (status, rar_status_code, rar_error, id_prezentare, submission_id), - ) +def reconcile_and_mark(conn, rar: RarClient, token: str, submission_id: int, content: dict) -> bool: + """Cauta la RAR o prezentare deja inregistrata pentru acest continut. Marcheaza 'sent' daca exista. - -def process_one(conn, rar: RarClient, token: str, claimed: dict) -> None: - """Trimite o prezentare claimed. Mapeaza rezultatul pe masina de stari. - - TODO(T2): inainte de re-send pe un rand ramas 'sending' (raspuns pierdut), - interogheaza get_finalizate pe VIN+dataPrestatie+odometruFinal si marcheaza - 'sent' daca exista deja (anti-duplicat). UNIQUE NU acopera acest caz. + Intoarce True daca a gasit (si a marcat sent), False altfel. """ + finalizate = rar.get_finalizate(token) + found_id = match_finalizata( + finalizate, + vin=content.get("vin", ""), + data_prestatie=content.get("data_prestatie", ""), + odometru_final=content.get("odometru_final"), + ) + if found_id is not None: + mark(conn, submission_id, "sent", rar_status_code=200, id_prezentare=found_id, + rar_error="reconciliat (raspuns pierdut)") + print(f"[worker] submission {submission_id} -> sent prin reconciliere (idPrezentare={found_id})", flush=True) + return True + return False + + +def process_one(conn, settings: Settings, rar: RarClient, token: str, claimed: dict) -> str: + """Trimite o prezentare claimed. Intoarce starea finala (pentru teste/loguri).""" sid = claimed["id"] - payload = build_rar_payload(claimed["content"]) + content = claimed["content"] + payload = build_rar_payload(content) try: data = rar.post_prezentare(token, payload) mark(conn, sid, "sent", rar_status_code=200, id_prezentare=data.get("id")) print(f"[worker] submission {sid} -> sent (idPrezentare={data.get('id')})", flush=True) + return "sent" except RarError as exc: if exc.status_code == 400: - # Validare esuata la RAR -> needs_data (nu re-trimite orb). detail = json.dumps(exc.field_errors, ensure_ascii=False) if exc.field_errors else str(exc) mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail) print(f"[worker] submission {sid} -> needs_data: {detail}", flush=True) - else: - # TODO(T2): retry/backoff in loc de error direct pe 5xx/tranzitoriu. - mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc)) - print(f"[worker] submission {sid} -> error: {exc}", flush=True) + return "needs_data" + if _is_transient(exc): + return _handle_transient(conn, settings, rar, token, sid, content, str(exc)) + # 4xx nerecuperabil (nu 400/401/408/429) -> error. + mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc)) + print(f"[worker] submission {sid} -> error: {exc}", flush=True) + return "error" + except (httpx.TimeoutException, httpx.TransportError) as exc: + return _handle_transient(conn, settings, rar, token, sid, content, f"retea: {exc}") + + +def _handle_transient(conn, settings: Settings, rar: RarClient, token: str, sid: int, content: dict, reason: str) -> str: + """Eroare ambigua: POST-ul poate sa fi ajuns la RAR. Reconciliaza intai; altfel retry/backoff.""" + try: + if reconcile_and_mark(conn, rar, token, sid, content): + return "sent" + except RarError as exc: + # Reconcilierea insasi a esuat -> nu putem confirma; tratam ca tranzitoriu si retry. + reason = f"{reason}; reconciliere esuata: {exc}" + requeue_with_backoff(conn, settings, sid, reason=reason) + return "requeued" + + +def recover_orphans(conn, settings: Settings, rar: RarClient, token: str) -> int: + """Randuri 'sending' mai vechi de lease (worker mort mid-POST). Reconciliaza; altfel requeue.""" + cutoff = _iso(_now() - timedelta(seconds=settings.worker_sending_lease_s)) + orphans = conn.execute( + "SELECT id, payload_json FROM submissions WHERE status='sending' " + "AND (sending_since IS NULL OR sending_since <= ?)", + (cutoff,), + ).fetchall() + recovered = 0 + for row in orphans: + sid = row["id"] + content = json.loads(row["payload_json"]) + try: + if reconcile_and_mark(conn, rar, token, sid, content): + recovered += 1 + else: + # Nu e la RAR -> sigur de re-trimis. + requeue_with_backoff(conn, settings, sid, reason="orfan recuperat (nu exista la RAR)") + recovered += 1 + except RarError as exc: + print(f"[worker] orfan {sid}: reconciliere esuata ({exc}); il las pentru data viitoare", flush=True) + return recovered + + +def _queue_depth(conn) -> int: + row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone() + return int(row["n"]) if row else 0 def run() -> int: @@ -98,7 +209,6 @@ def run() -> int: settings = get_settings() init_db() conn = get_connection() - print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True) creds = load_test_credentials() if settings.worker_use_test_creds else None @@ -107,36 +217,35 @@ def run() -> int: while _running: try: - depth_detail = f"poll (queue={_queue_depth(conn)})" - write_heartbeat(conn, detail=depth_detail) + write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})") if not settings.worker_send_enabled: time.sleep(settings.worker_poll_interval_s) continue + if not creds: + time.sleep(settings.worker_poll_interval_s) + continue + + # Login lazy + token cache (JWT 30h). + if rar is None or token is None: + rar = RarClient(settings) + token = rar.login(creds["email"], creds["password"]) + write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok") + + recover_orphans(conn, settings, rar, token) claimed = claim_one(conn) if claimed is None: time.sleep(settings.worker_poll_interval_s) continue - if not creds: - # TODO(T2): canalul real de creds per-cerere de la ROAAUTO. - mark(conn, claimed["id"], "error", rar_error="creds RAR indisponibile (T2)") - continue - - # Login lazy + token cache (JWT 30h). Re-login la expirare = T2. - if rar is None or token is None: - rar = RarClient(settings) - token = rar.login(creds["email"], creds["password"]) - write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok") - - process_one(conn, rar, token, claimed) + process_one(conn, settings, rar, token, claimed) except RarAuthError as exc: - print(f"[worker] login esuat: {exc}", flush=True) - token = None # forteaza re-login data viitoare + print(f"[worker] login esuat / token expirat: {exc}", flush=True) + token = None # forteaza re-login (acopera si expirarea JWT la 30h) time.sleep(settings.worker_poll_interval_s) - except Exception as exc: # noqa: BLE001 — loop top-level, nu cadem la o eroare punctuala + except Exception as exc: # noqa: BLE001 — loop top-level: o eroare punctuala nu opreste worker-ul print(f"[worker] eroare neasteptata: {exc}", flush=True) time.sleep(settings.worker_poll_interval_s) @@ -147,10 +256,5 @@ def run() -> int: return 0 -def _queue_depth(conn) -> int: - row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone() - return int(row["n"]) if row else 0 - - if __name__ == "__main__": sys.exit(run()) diff --git a/docs/api-rar-contract.md b/docs/api-rar-contract.md index 0ca1576..3cebe4b 100644 --- a/docs/api-rar-contract.md +++ b/docs/api-rar-contract.md @@ -211,12 +211,34 @@ Aplicate deja pe ambele medii (test + producție): - Corecția datelor eronate (după FINALIZATA) = solicitare la **suport.autopass@rarom.ro** (pe test nu e cazul). **Nu există flux API de corecție/anulare pentru records-urile noastre.** -## Monitorizare (citire prezentări) +## Monitorizare (citire prezentări) — VERIFICAT LIVE 2026-06-15 -- Pe **mediul de test**: la interogarea listei de prezentări finalizate **NU primești și `prestatii`** în răspuns. -- Pe **producție**: prestațiile sunt disponibile; lista poate fi filtrată după keyword / interval - de date și exportată în Excel. -- Implicație dashboard: nu te baza pe `prestatii` din listă pe test; le ai în `submissions` local. +**Rută:** `GET /prezentari/getAllPrezentariFinalizate` (Bearer). Confirmat live. +**Răspuns:** `{statusCode, message, data: {totalCount, content: [...]}}` — listă în `data.content`. + +Fiecare item din `content` (live): +```json +{ + "id": 68514, "dataPrestatie": "2026-06-15", "vin": "WVWZZZ1KZAW000123", + "odometruFinal": 123456, "idAgent": 40, "tipPrestatie": null, + "odometruInitial": null, "idUser": 6766, "sistemReparat": null, "obs": "...", + "nrInmatriculare": "B999TST", "listaPrestatii": null, "status": "FINALIZATA", + "prestatii": null, "b64Image": null +} +``` + +- **`odometruFinal` e NUMĂR** (int) în listare (deși la `postPrezentare` se trimite string). Reconcilierea + compară ca int. +- Pe **test**: `prestatii` vine `null` (confirmă: nu te baza pe `prestatii` din listă — le ai local în `submissions`). +- **Filtrele NU funcționează pe test**: `?vin=`, `?search=`, `?keyword=`, `?dataPrestatie=` sunt IGNORATE + (întorc tot setul). `?page=&size=` rup răspunsul (non-JSON). → **fetch tot setul, filtrează client-side.** + Pe prod doc-ul promite filtrare keyword/dată + export Excel (de re-verificat pe prod). +- **RAR acceptă DUPLICATE**: live există 2 perechi de records identice pe `vin+dataPrestatie+odometruFinal` + (id 63622≡63625, 63623≡63626). De aceea reconcilierea pe răspuns pierdut e necesară, iar matcher-ul + alege **id-ul maxim** când există mai multe potriviri. + +> Reconciliere (T2): înainte de re-send pe un rând `sending`, GET finalizate, match pe +> `vin + dataPrestatie + odometruFinal(int)`; dacă există → marchează `sent` cu id-ul găsit, NU re-trimite. ## Corecții față de `docs/plans/*` (citește înainte de a refolosi planurile) diff --git a/docs/plans/plan.md b/docs/plans/plan.md index c89742c..ba32c4a 100644 --- a/docs/plans/plan.md +++ b/docs/plans/plan.md @@ -202,8 +202,11 @@ Nimic din cod nu e scris încă (`app/`, `tools/` nu există). Ordine recomandat - [x] **T4 (P1) — payload builder** ✅ 2026-06-15. `app/payload.py`: `status:"FINALIZATA"`, `sistemReparat:"null"`, fără `tipPrestatie`, `odometruFinal`/`odometruInitial` string (initial gol → null), `prestatii:[{codPrestatie,idPrezentare:null}]`, obs/b64Image omise când lipsesc. Verify: 10 teste (`tests/test_payload.py`), inclusiv snapshot vs exemplul oficial din contract. -- [ ] **T2 (P1) — `app/worker` reconciliere** VIN+dată+odometru înainte de re-send pe `sending` + lease/timeout orfane. - Verify: test integration — răspuns pierdut simulat → fără duplicat la RAR. +- [x] **T2 (P1) — `app/worker` reconciliere** ✅ 2026-06-15. `app/reconcile.py` match pe vin+dataPrestatie+odometruFinal(int, + id maxim la duplicate) + worker: recuperare orfane (lease), reconciliere pe eroare tranzitorie/timeout înainte de re-send, + retry/backoff exponential (peste `worker_max_retries` → error+banner), re-login la token expirat. Rută monitorizare descoperită + live: `GET /prezentari/getAllPrezentariFinalizate` → `data.content` (filtrele nu merg pe test → fetch tot, match client-side). + Verify: 15 teste (`tests/test_worker_reconcile.py`) + validare LIVE (reconciliere record 68514 din finalizate reale). - [ ] **T6 (P2) — worker proces/container propriu supravegheat;** `/healthz` pică → restart. Verify: worker omorât → restart automat. - [ ] **T7 (P2) — deploy:** SQLite pe volum persistent numit + backup (singura copie durabilă, re-push scos). Verify: recreare container → coada supraviețuiește. diff --git a/tests/test_worker_reconcile.py b/tests/test_worker_reconcile.py new file mode 100644 index 0000000..8041b19 --- /dev/null +++ b/tests/test_worker_reconcile.py @@ -0,0 +1,223 @@ +"""Teste T2 — reconciliere anti-duplicat + retry/backoff + recuperare orfane. + +Matcher pur (app.reconcile) + functiile worker-ului cu un RAR fake si DB temporara. +Testul-cheie: raspuns pierdut -> reconciliere gaseste recordul -> 'sent' fara re-POST. +""" + +from __future__ import annotations + +import json +import os +import tempfile + +import httpx +import pytest + +from app.rar_client import RarError +from app.reconcile import match_finalizata + +# --- Matcher (unit) --- + +_FIN = [ + {"id": 63622, "vin": "WBA1234567890", "dataPrestatie": "2024-02-05", "odometruFinal": 125000}, + {"id": 63625, "vin": "WBA1234567890", "dataPrestatie": "2024-02-05", "odometruFinal": 125000}, + {"id": 68514, "vin": "WVWZZZ1KZAW000123", "dataPrestatie": "2026-06-15", "odometruFinal": 123456}, +] + + +def test_match_gaseste_pe_vin_data_odo(): + assert match_finalizata(_FIN, vin="WVWZZZ1KZAW000123", data_prestatie="2026-06-15", odometru_final="123456") == 68514 + + +def test_match_vin_case_insensitive(): + assert match_finalizata(_FIN, vin="wvwzzz1kzaw000123", data_prestatie="2026-06-15", odometru_final="123456") == 68514 + + +def test_match_odo_string_vs_int(): + # payload-ul nostru are odometruFinal string; RAR il intoarce int. + assert match_finalizata(_FIN, vin="WBA1234567890", data_prestatie="2024-02-05", odometru_final="125000") == 63625 + + +def test_match_duplicate_alege_id_maxim(): + # 63622 si 63625 sunt identice -> alege maximul. + assert match_finalizata(_FIN, vin="WBA1234567890", data_prestatie="2024-02-05", odometru_final=125000) == 63625 + + +def test_match_fara_potrivire(): + assert match_finalizata(_FIN, vin="WVWZZZ1KZAW000123", data_prestatie="2026-06-15", odometru_final="999") is None + + +def test_match_data_diferita(): + assert match_finalizata(_FIN, vin="WBA1234567890", data_prestatie="2025-01-01", odometru_final="125000") is None + + +# --- Worker (integration cu FakeRar + DB temporara) --- + +class FakeRar: + def __init__(self, *, finalizate=None, post_result=None, post_exc=None): + self.finalizate = finalizate or [] + self.post_result = post_result if post_result is not None else {"id": 1000} + self.post_exc = post_exc + self.post_calls = 0 + self.finalizate_calls = 0 + + def get_finalizate(self, token): + self.finalizate_calls += 1 + return self.finalizate + + def post_prezentare(self, token, payload): + self.post_calls += 1 + if self.post_exc is not None: + raise self.post_exc + return self.post_result + + +@pytest.fixture() +def env(monkeypatch): + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.db import get_connection, init_db + init_db() + conn = get_connection() + settings = get_settings() + yield conn, settings + conn.close() + get_settings.cache_clear() + + +_CONTENT = { + "vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B999TST", + "data_prestatie": "2026-06-15", "odometru_final": "123456", + "prestatii": [{"cod_prestatie": "OE-1"}], "sistem_reparat": "null", +} + + +def _insert(conn, status="queued", content=None, sending_since=None, retry_count=0): + content = content or _CONTENT + cur = conn.execute( + "INSERT INTO submissions (idempotency_key, status, payload_json, sending_since, retry_count) " + "VALUES (?, ?, ?, ?, ?)", + (f"key-{os.urandom(4).hex()}", status, json.dumps(content), sending_since, retry_count), + ) + return int(cur.lastrowid) + + +def _status(conn, sid): + return conn.execute("SELECT * FROM submissions WHERE id=?", (sid,)).fetchone() + + +def test_happy_path_sent(env): + from app.worker.__main__ import process_one + conn, settings = env + sid = _insert(conn) + rar = FakeRar(post_result={"id": 555}) + out = process_one(conn, settings, rar, "tok", {"id": sid, "content": _CONTENT}) + assert out == "sent" + row = _status(conn, sid) + assert row["status"] == "sent" and row["id_prezentare"] == 555 + assert rar.post_calls == 1 + + +def test_raspuns_pierdut_reconciliere_fara_duplicat(env): + """POST esueaza tranzitoriu DAR RAR a inregistrat -> reconciliere -> sent, fara re-POST.""" + from app.worker.__main__ import process_one + conn, settings = env + sid = _insert(conn) + # 503 = tranzitoriu; recordul EXISTA deja la RAR (raspunsul s-a pierdut). + rar = FakeRar( + finalizate=[{"id": 68514, "vin": "WVWZZZ1KZAW000123", "dataPrestatie": "2026-06-15", "odometruFinal": 123456}], + post_exc=RarError("502 bad gateway", status_code=503), + ) + out = process_one(conn, settings, rar, "tok", {"id": sid, "content": _CONTENT}) + assert out == "sent" + row = _status(conn, sid) + assert row["status"] == "sent" and row["id_prezentare"] == 68514 + assert rar.post_calls == 1 # NU s-a re-trimis + assert rar.finalizate_calls == 1 # a reconciliat + + +def test_tranzitoriu_neinregistrat_requeue(env): + from app.worker.__main__ import process_one + conn, settings = env + sid = _insert(conn) + rar = FakeRar(finalizate=[], post_exc=httpx.ConnectError("conn refused")) + out = process_one(conn, settings, rar, "tok", {"id": sid, "content": _CONTENT}) + assert out == "requeued" + row = _status(conn, sid) + assert row["status"] == "queued" + assert row["retry_count"] == 1 + assert row["next_attempt_at"] is not None + + +def test_validare_400_needs_data(env): + from app.worker.__main__ import process_one + conn, settings = env + sid = _insert(conn) + rar = FakeRar(post_exc=RarError("Validare esuata", status_code=400, + field_errors=[{"field": "vin", "message": "rau"}])) + out = process_one(conn, settings, rar, "tok", {"id": sid, "content": _CONTENT}) + assert out == "needs_data" + assert _status(conn, sid)["status"] == "needs_data" + assert rar.post_calls == 1 + + +def test_4xx_nerecuperabil_error(env): + from app.worker.__main__ import process_one + conn, settings = env + sid = _insert(conn) + rar = FakeRar(post_exc=RarError("forbidden", status_code=403)) + out = process_one(conn, settings, rar, "tok", {"id": sid, "content": _CONTENT}) + assert out == "error" + assert _status(conn, sid)["status"] == "error" + + +def test_retry_peste_maxim_devine_error(env): + from app.worker.__main__ import requeue_with_backoff + conn, settings = env + sid = _insert(conn, retry_count=settings.worker_max_retries) # urmatorul requeue depaseste maximul + requeue_with_backoff(conn, settings, sid, reason="tranzitoriu persistent") + assert _status(conn, sid)["status"] == "error" + + +def test_recover_orphan_reconciliat(env): + from app.worker.__main__ import recover_orphans + conn, settings = env + # rand 'sending' vechi (lease depasit) -> orfan; recordul exista la RAR. + sid = _insert(conn, status="sending", sending_since="2000-01-01T00:00:00+00:00") + rar = FakeRar(finalizate=[{"id": 77777, "vin": "WVWZZZ1KZAW000123", + "dataPrestatie": "2026-06-15", "odometruFinal": 123456}]) + n = recover_orphans(conn, settings, rar, "tok") + assert n == 1 + row = _status(conn, sid) + assert row["status"] == "sent" and row["id_prezentare"] == 77777 + + +def test_recover_orphan_neinregistrat_requeue(env): + from app.worker.__main__ import recover_orphans + conn, settings = env + sid = _insert(conn, status="sending", sending_since="2000-01-01T00:00:00+00:00") + rar = FakeRar(finalizate=[]) + recover_orphans(conn, settings, rar, "tok") + assert _status(conn, sid)["status"] == "queued" + + +def test_claim_respecta_next_attempt_at(env): + from app.worker.__main__ import claim_one + conn, _ = env + # next_attempt_at in viitor -> nu se ia. + conn.execute( + "INSERT INTO submissions (idempotency_key, status, payload_json, next_attempt_at) " + "VALUES ('k1','queued',?, '2999-01-01T00:00:00+00:00')", + (json.dumps(_CONTENT),), + ) + assert claim_one(conn) is None + # next_attempt_at in trecut -> se ia. + conn.execute( + "INSERT INTO submissions (idempotency_key, status, payload_json, next_attempt_at) " + "VALUES ('k2','queued',?, '2000-01-01T00:00:00+00:00')", + (json.dumps(_CONTENT),), + ) + claimed = claim_one(conn) + assert claimed is not None