"""Worker RAR — proces propriu (NU task asyncio in uvicorn; plan.md sect. 4). Bucla: heartbeat -> claim atomic (BEGIN IMMEDIATE) -> login -> postPrezentare -> update. Ruleaza ca proces separat sub `restart: always` (docker compose). Schelet — ce E implementat: heartbeat, claim atomic anti-race, login cu token cache, postPrezentare cu maparea erorilor de validare (400 -> needs_data). Ce NU e inca (marcat TODO): reconcilierea anti-duplicat pe raspuns pierdut (T2), retry/backoff exponential (T2), lease/timeout pe randuri 'sending' orfane (T2), livrarea creds per-cerere de la ROAAUTO (T2 — in schelet folosim creds local). Pornire: python -m app.worker """ from __future__ import annotations import json import signal import sys import time from ..config import get_settings, load_test_credentials from ..db import get_connection, init_db, write_heartbeat from ..payload import build_rar_payload from ..rar_client import RarAuthError, RarClient, RarError _running = True def _stop(signum: int, frame: object) -> None: global _running _running = False def claim_one(conn) -> dict | None: """Claim atomic al unui rand 'queued' -> 'sending'. Intoarce randul sau None. BEGIN IMMEDIATE ia lock de scriere imediat, deci doi workeri nu pot lua acelasi rand. (Un singur worker in v1, dar claim-ul ramane corect la scalare.) """ conn.execute("BEGIN IMMEDIATE") try: row = conn.execute( "SELECT id, payload_json FROM submissions WHERE status='queued' ORDER BY id LIMIT 1" ).fetchone() if not row: conn.execute("COMMIT") return None conn.execute( "UPDATE submissions SET status='sending', sending_since=datetime('now'), " "updated_at=datetime('now') WHERE id=?", (row["id"],), ) conn.execute("COMMIT") return {"id": row["id"], "content": json.loads(row["payload_json"])} except Exception: conn.execute("ROLLBACK") raise def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None: conn.execute( "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, " "updated_at=datetime('now') WHERE id=?", (status, rar_status_code, rar_error, id_prezentare, submission_id), ) def process_one(conn, rar: RarClient, token: str, claimed: dict) -> None: """Trimite o prezentare claimed. Mapeaza rezultatul pe masina de stari. TODO(T2): inainte de re-send pe un rand ramas 'sending' (raspuns pierdut), interogheaza get_finalizate pe VIN+dataPrestatie+odometruFinal si marcheaza 'sent' daca exista deja (anti-duplicat). UNIQUE NU acopera acest caz. """ sid = claimed["id"] payload = build_rar_payload(claimed["content"]) try: data = rar.post_prezentare(token, payload) mark(conn, sid, "sent", rar_status_code=200, id_prezentare=data.get("id")) print(f"[worker] submission {sid} -> sent (idPrezentare={data.get('id')})", flush=True) except RarError as exc: if exc.status_code == 400: # Validare esuata la RAR -> needs_data (nu re-trimite orb). detail = json.dumps(exc.field_errors, ensure_ascii=False) if exc.field_errors else str(exc) mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail) print(f"[worker] submission {sid} -> needs_data: {detail}", flush=True) else: # TODO(T2): retry/backoff in loc de error direct pe 5xx/tranzitoriu. mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc)) print(f"[worker] submission {sid} -> error: {exc}", flush=True) def run() -> int: signal.signal(signal.SIGTERM, _stop) signal.signal(signal.SIGINT, _stop) settings = get_settings() init_db() conn = get_connection() print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True) creds = load_test_credentials() if settings.worker_use_test_creds else None rar: RarClient | None = None token: str | None = None while _running: try: depth_detail = f"poll (queue={_queue_depth(conn)})" write_heartbeat(conn, detail=depth_detail) if not settings.worker_send_enabled: time.sleep(settings.worker_poll_interval_s) continue claimed = claim_one(conn) if claimed is None: time.sleep(settings.worker_poll_interval_s) continue if not creds: # TODO(T2): canalul real de creds per-cerere de la ROAAUTO. mark(conn, claimed["id"], "error", rar_error="creds RAR indisponibile (T2)") continue # Login lazy + token cache (JWT 30h). Re-login la expirare = T2. if rar is None or token is None: rar = RarClient(settings) token = rar.login(creds["email"], creds["password"]) write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok") process_one(conn, rar, token, claimed) except RarAuthError as exc: print(f"[worker] login esuat: {exc}", flush=True) token = None # forteaza re-login data viitoare time.sleep(settings.worker_poll_interval_s) except Exception as exc: # noqa: BLE001 — loop top-level, nu cadem la o eroare punctuala print(f"[worker] eroare neasteptata: {exc}", flush=True) time.sleep(settings.worker_poll_interval_s) if rar is not None: rar.close() conn.close() print("[worker] oprit curat", flush=True) return 0 def _queue_depth(conn) -> int: row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone() return int(row["n"]) if row else 0 if __name__ == "__main__": sys.exit(run())