Structura repo conform plan.md sect. 4, booteaza cu /healthz verde:
- app/main.py: FastAPI (lifespan init_db), /healthz (worker viu + last login + queue), /metrics
- app/api/v1: POST /v1/prezentari (enqueue + dedup idempotency UNIQUE), GET prezentari/{id}, nomenclator, mapari
- app/rar_client.py: client RAR real (login/JWT, nomenclator, postPrezentare, getFinalizate) cu User-Agent obligatoriu (fix WAF 403)
- app/worker: proces separat, claim atomic BEGIN IMMEDIATE, heartbeat, login+send (send dezactivat by default)
- app/web: dashboard Jinja2+HTMX (coada, banner alerta blocate, worker viu/mort, stari empty)
- app/db.py + schema.sql: SQLite WAL, tabele accounts/api_keys/operations_mapping/nomenclator_rar/submissions/worker_heartbeat
- app/idempotency.py + payload.py: hash continut canonic + builder payload (status FINALIZATA, fara tipPrestatie)
- Dockerfile + docker-compose.yml (api+worker, volum SQLite persistent, restart:always)
- tools/import_dbf.py: stub T5
Verificat live: login prin rar_client OK (token 259), nomenclator 18 coduri, worker heartbeat -> /healthz worker_alive=True.
Ramas: T3 validare Pydantic, T4 snapshot payload, T2 reconciliere/retry worker, T5 import DBF, auth API-key, middleware redactare creds, criptare PII.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
157 lines
5.9 KiB
Python
157 lines
5.9 KiB
Python
"""RAR worker — runs as its own process (NOT an asyncio task inside uvicorn; plan.md sect. 4).

Loop: heartbeat -> atomic claim (BEGIN IMMEDIATE) -> login -> postPrezentare -> update.
Runs as a separate process under `restart: always` (docker compose).

Skeleton — what IS implemented: heartbeat, race-free atomic claim, login with
token cache, postPrezentare with validation-error mapping (400 -> needs_data).
What is NOT there yet (marked TODO): anti-duplicate reconciliation after a lost
response (T2), retry/exponential backoff (T2), lease/timeout for orphaned
'sending' rows (T2), per-request credential delivery from ROAAUTO (T2 — the
skeleton uses local <test> creds).

Start with: python -m app.worker
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
from ..config import get_settings, load_test_credentials
|
|
from ..db import get_connection, init_db, write_heartbeat
|
|
from ..payload import build_rar_payload
|
|
from ..rar_client import RarAuthError, RarClient, RarError
|
|
|
|
# Main-loop flag: run() keeps polling while this is True; _stop() clears it
# when SIGTERM/SIGINT arrives so the loop can exit on its next check.
_running = True
|
|
|
|
|
|
def _stop(signum: int, frame: object) -> None:
    """Signal handler that asks the worker loop to stop.

    It only clears the module-level ``_running`` flag. Both handler arguments
    (signal number and interrupted frame) are ignored.
    """
    global _running
    del signum, frame  # part of the signal-handler signature, not needed here
    _running = False
|
|
|
|
|
|
def claim_one(conn) -> dict | None:
    """Atomically claim the oldest 'queued' submission by moving it to 'sending'.

    BEGIN IMMEDIATE takes the write lock up front, so two workers cannot claim
    the same row. (v1 runs a single worker, but the claim stays correct when
    scaling out.)

    Args:
        conn: SQLite connection whose rows support access by column name.
            NOTE(review): the explicit BEGIN/COMMIT assumes the connection
            does not open implicit transactions — confirm in get_connection().

    Returns:
        ``{"id": <submission id>, "content": <decoded payload_json>}`` for the
        claimed row, or ``None`` when no row has status 'queued'.

    Raises:
        json.JSONDecodeError: the claimed row's ``payload_json`` is not valid
            JSON. The claim is already committed at that point, so the row
            stays in 'sending'.
        Exception: any database error, re-raised after the rollback.
    """
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, payload_json FROM submissions WHERE status='queued' ORDER BY id LIMIT 1"
        ).fetchone()
        if not row:
            conn.execute("COMMIT")
            return None
        conn.execute(
            "UPDATE submissions SET status='sending', sending_since=datetime('now'), "
            "updated_at=datetime('now') WHERE id=?",
            (row["id"],),
        )
        conn.execute("COMMIT")
    except Exception:
        # Roll back only while a transaction is still open; a ROLLBACK without
        # one raises its own error and would hide the original exception.
        if getattr(conn, "in_transaction", True):
            conn.execute("ROLLBACK")
        raise
    # Decode outside the transaction block: the claim is already committed, so
    # a malformed payload must surface as a decode error, not a failed ROLLBACK.
    return {"id": row["id"], "content": json.loads(row["payload_json"])}
|
|
|
|
|
|
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
    """Store the outcome of a submission with a single UPDATE.

    Writes ``status`` together with the three RAR result columns and refreshes
    ``updated_at``. Optional fields that are not passed are written as NULL,
    replacing whatever the row held before.

    NOTE(review): no COMMIT is issued here — presumably the connection runs in
    autocommit mode; confirm against get_connection().
    """
    sql = (
        "UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
        "updated_at=datetime('now') WHERE id=?"
    )
    params = (status, rar_status_code, rar_error, id_prezentare, submission_id)
    conn.execute(sql, params)
|
|
|
|
|
|
def process_one(conn, rar: RarClient, token: str, claimed: dict) -> None:
    """Send one claimed submission to RAR and record the outcome via mark().

    Outcomes:
      * payload cannot be built   -> 'error' (nothing was sent to RAR)
      * post succeeds             -> 'sent' (status code 200 + id from the response)
      * RarError with status 400  -> 'needs_data' (validation failed at RAR;
                                     the row is not re-sent blindly)
      * any other RarError        -> 'error'

    Args:
        conn: database connection handed to mark().
        rar: client used for post_prezentare().
        token: token passed to post_prezentare().
        claimed: dict with the submission ``id`` and its decoded ``content``,
            as returned by claim_one().

    Exceptions other than RarError raised by the post itself still propagate
    to the caller and leave the row in 'sending'.

    TODO(T2): before re-sending a row left in 'sending' (lost response), query
    get_finalizate by VIN+dataPrestatie+odometruFinal and mark it 'sent' if it
    already exists (anti-duplicate). UNIQUE does NOT cover this case.
    """
    sid = claimed["id"]

    # A payload that cannot be built is a deterministic data problem and
    # nothing has reached RAR yet, so record it instead of orphaning the row
    # in 'sending'.
    try:
        payload = build_rar_payload(claimed["content"])
    except (KeyError, TypeError, ValueError) as exc:
        detail = f"payload invalid: {exc!r}"
        mark(conn, sid, "error", rar_error=detail)
        print(f"[worker] submission {sid} -> error: {detail}", flush=True)
        return

    try:
        data = rar.post_prezentare(token, payload)
    except RarError as exc:
        if exc.status_code == 400:
            # Validation failed at RAR -> needs_data (do not re-send blindly).
            detail = json.dumps(exc.field_errors, ensure_ascii=False) if exc.field_errors else str(exc)
            mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail)
            print(f"[worker] submission {sid} -> needs_data: {detail}", flush=True)
        else:
            # TODO(T2): retry/backoff instead of going straight to error on 5xx/transient failures.
            mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc))
            print(f"[worker] submission {sid} -> error: {exc}", flush=True)
        return

    # RAR accepted the request. An unexpected response shape must not stop the
    # row from being marked 'sent', otherwise it would look unsent.
    id_prezentare = data.get("id") if isinstance(data, dict) else None
    mark(conn, sid, "sent", rar_status_code=200, id_prezentare=id_prezentare)
    print(f"[worker] submission {sid} -> sent (idPrezentare={id_prezentare})", flush=True)
|
|
|
|
|
|
def run() -> int:
    """Run the worker loop until SIGTERM/SIGINT and return the exit code (0).

    Each iteration writes a heartbeat carrying the queue depth, claims one
    queued row, logs in lazily (the token is cached for later iterations) and
    hands the row to process_one(). With sending disabled the loop only writes
    heartbeats.

    A failed login returns the just-claimed row to 'queued': nothing was sent,
    so it is safe to retry, and a login outage no longer strands one row in
    'sending' per poll. The RAR client is created once and reused across
    re-logins, and both the client and the connection are closed in a
    ``finally`` block.
    """
    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)

    settings = get_settings()
    init_db()
    conn = get_connection()

    print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)

    creds = load_test_credentials() if settings.worker_use_test_creds else None
    rar: RarClient | None = None
    token: str | None = None

    try:
        while _running:
            try:
                depth_detail = f"poll (queue={_queue_depth(conn)})"
                write_heartbeat(conn, detail=depth_detail)

                if not settings.worker_send_enabled:
                    time.sleep(settings.worker_poll_interval_s)
                    continue

                claimed = claim_one(conn)
                if claimed is None:
                    time.sleep(settings.worker_poll_interval_s)
                    continue

                if not creds:
                    # TODO(T2): real per-request credential channel from ROAAUTO.
                    mark(conn, claimed["id"], "error", rar_error="creds RAR indisponibile (T2)")
                    continue

                # Lazy login + token cache (JWT 30h). Re-login on expiry = T2.
                if rar is None:
                    # One client for the whole run; recreating it on every
                    # re-login would drop the previous one without close().
                    rar = RarClient(settings)
                if token is None:
                    try:
                        token = rar.login(creds["email"], creds["password"])
                    except RarAuthError:
                        # Nothing was sent yet: give the row back to the queue
                        # rather than leaving it stuck in 'sending'.
                        mark(conn, claimed["id"], "queued")
                        raise
                    write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok")

                process_one(conn, rar, token, claimed)

            except RarAuthError as exc:
                print(f"[worker] login esuat: {exc}", flush=True)
                token = None  # force a fresh login next time
                time.sleep(settings.worker_poll_interval_s)
            except Exception as exc:  # noqa: BLE001 — top-level loop, one bad item must not kill the worker
                print(f"[worker] eroare neasteptata: {exc}", flush=True)
                time.sleep(settings.worker_poll_interval_s)
    finally:
        if rar is not None:
            rar.close()
        conn.close()

    print("[worker] oprit curat", flush=True)
    return 0
|
|
|
|
|
|
def _queue_depth(conn) -> int:
    """Return how many submissions currently have status 'queued'.

    Falls back to 0 if the count query yields no row.
    """
    cursor = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'")
    result = cursor.fetchone()
    if not result:
        return 0
    return int(result["n"])
|
|
|
|
|
|
# Script entry point: run the worker loop and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(run())
|