Files
rar-autopass/app/worker/__main__.py
Claude Agent c05fa00007 fix(worker): keepalive RAR ca dashboard-ul sa nu afiseze fals "RAR inaccesibil"
Dashboard-ul deduce starea RAR din vechimea ultimului login reusit (>30h ->
"indisponibil?"). Cand coada e goala, worker-ul nu are de ce sa se logheze,
deci timestamp-ul devine stale si banner-ul "Blocat: RAR inaccesibil —
declaratiile NU pleaca" apare fals, desi RAR raspunde.

Worker-ul face acum un login de proba o data pe zi (interval configurabil,
24h < pragul de 30h) cand coada e goala: pe succes reimprospateaza
last_rar_login_ok; pe esec real last_rar_login_ok ramane vechi -> dashboard
degradeaza corect. Forteaza login real (invalideaza sesiunea) ca proba sa fie
autentica. Gating: cel mult o sondare pe interval, sa nu hartuiasca RAR jos.

_keepalive_target sare conturile ale caror creds NU se decripteaza sub cheia
curenta (start.sh both genereaza cheie efemera noua la fiecare pornire ->
creds durabile vechi dau decrypt None) si cade pe creds <test> in dev.

Teste: tests/test_worker_keepalive_rar.py (6).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 06:48:32 +00:00

599 lines
27 KiB
Python

"""Worker RAR — proces propriu (NU task asyncio in uvicorn).
Bucla: heartbeat -> recupereaza orfane -> claim atomic -> login -> postPrezentare -> update.
Ruleaza ca proces separat sub `restart: always` (docker compose).
- claim atomic anti-race (BEGIN IMMEDIATE), respecta next_attempt_at (backoff).
- reconciliere anti-duplicat pe raspuns pierdut: pe eroare tranzitorie/timeout SAU pe
rand 'sending' orfan (worker mort mid-POST), interogheaza finalizate si match pe
vin+dataPrestatie+odometruFinal; daca exista -> 'sent' (NU re-trimite).
- retry/backoff exponential pe erori tranzitorii; peste worker_max_retries -> 'error'.
- lease/timeout pe randuri 'sending' orfane.
- re-login la token expirat (401 mid-sesiune) — JWT 30h, retry NU plafonat la 30h.
Creds per-cerere: fiecare submission poarta creds RAR CRIPTATE (rar_creds_enc).
Worker-ul face login per CONT cu acele creds, cache-uieste JWT (30h) in memorie si
STERGE creds-urile contului dupa primul login reusit. Token-ul in memorie acopera
restul trimiterilor; la restart token-ul se pierde si contul re-logheaza la urmatorul
submission care aduce creds proaspete (degradare acceptata).
Dev: `worker_use_test_creds` foloseste creds <test> cand submission-ul nu are enc.
Pornire: python -m app.worker
"""
from __future__ import annotations
import json
import signal
import sys
import time
from datetime import datetime, timedelta, timezone
import httpx
from .. import errors
from ..config import Settings, get_settings, load_test_credentials
from ..crypto import decrypt_creds
from ..db import get_connection, init_db, read_heartbeat, write_heartbeat
from ..observ import log_event, set_source
from ..mapping import DEFAULT_ACCOUNT_ID, upsert_nomenclator
from ..payload import build_rar_payload
from ..reconcile import match_finalizata
from ..rar_client import RarAuthError, RarClient, RarError
_running = True
def _stop(signum: int, frame: object) -> None:
global _running
_running = False
def _now() -> datetime:
return datetime.now(timezone.utc)
def _iso(dt: datetime) -> str:
return dt.isoformat(timespec="seconds")
def _wlog(conn, tip: str, mesaj: str, *, nivel: str = "INFO", account_id=None, cod=None, context=None) -> None:
"""Emite evenimentul (sursa=worker, dublu canal DB+fisier) SI pastreaza linia in
stdout (operatorul tailuieste .run/worker.log)."""
print(f"[worker] {mesaj}", flush=True)
log_event(tip, nivel=nivel, account_id=account_id, cod=cod, mesaj=mesaj, context=context,
conn=conn, sursa="worker")
def _backoff_seconds(settings: Settings, retry_count: int) -> int:
return min(settings.worker_retry_base_s * (2 ** max(0, retry_count - 1)), settings.worker_retry_max_s)
def _is_transient(exc: Exception) -> bool:
"""Eroare in care POST-ul poate sa fi ajuns/sa nu fi ajuns la RAR -> reconciliere + retry."""
if isinstance(exc, (httpx.TimeoutException, httpx.TransportError)):
return True
if isinstance(exc, RarError) and not isinstance(exc, RarAuthError):
code = exc.status_code
return code is None or code >= 500 or code in (408, 429)
return False
# --- Operatii pe submissions ---
# Stari blocate ne-sent care primesc retentie proprie. Mai scurta decat cele 90z
# ale `sent`: un blocat n-are valoare de audit ca o trimitere reusita.
_BLOCKED_STATES = ("error", "needs_data", "needs_mapping")
def mark(conn, submission_id: int, status: str, *, rar_status_code=None, rar_error=None, id_prezentare=None) -> None:
if status == "sent":
# purge_after = sent + 90 zile (GDPR/L.142 retentie maxima).
purge_expr = "datetime('now', '+90 days')"
elif status in _BLOCKED_STATES:
# Randurile blocate primesc si ele purge_after (altfel raman permanent).
days = int(get_settings().blocked_retention_days)
purge_expr = f"datetime('now', '+{days} days')"
else:
purge_expr = None
if purge_expr is not None:
conn.execute(
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
f"sending_since=NULL, updated_at=datetime('now'), purge_after={purge_expr} WHERE id=?",
(status, rar_status_code, rar_error, id_prezentare, submission_id),
)
else:
conn.execute(
"UPDATE submissions SET status=?, rar_status_code=?, rar_error=?, id_prezentare=?, "
"sending_since=NULL, updated_at=datetime('now') WHERE id=?",
(status, rar_status_code, rar_error, id_prezentare, submission_id),
)
# Purge interval in secunde (odata pe ora, nu prea agresiv)
_PURGE_INTERVAL_S = 3600
def purge_expired(conn) -> dict[str, int]:
"""Sterge randurile expirate (purge_after < now).
Submissions `sent` SI blocate (error/needs_data/needs_mapping) expirate;
import_batches expirate (import_rows via CASCADE); app_events expirate (jurnal).
EXCLUDE explicit `queued`/`sending` (randuri active — nu se purjeaza niciodata, chiar
daca ar avea un purge_after rezidual; reactivarea il curata oricum).
Intoarce {submissions_purged, batches_purged, events_purged}.
"""
cur_sub = conn.execute(
"DELETE FROM submissions WHERE purge_after IS NOT NULL AND purge_after < datetime('now') "
"AND status IN ('sent','error','needs_data','needs_mapping')"
)
cur_batch = conn.execute(
"DELETE FROM import_batches WHERE purge_after IS NOT NULL AND purge_after < datetime('now')"
)
cur_events = conn.execute(
"DELETE FROM app_events WHERE purge_after IS NOT NULL AND purge_after < datetime('now')"
)
return {
"submissions_purged": cur_sub.rowcount,
"batches_purged": cur_batch.rowcount,
"events_purged": cur_events.rowcount,
}
def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason: str) -> None:
"""Re-pune randul in coada cu retry++ si next_attempt_at = now + backoff."""
row = conn.execute("SELECT retry_count FROM submissions WHERE id=?", (submission_id,)).fetchone()
new_retry = int(row["retry_count"]) + 1 if row else 1
if new_retry > settings.worker_max_retries:
mark(conn, submission_id, "error", rar_error=f"esuat dupa {new_retry-1} reincercari: {reason}")
print(f"[worker] submission {submission_id} -> error (max retries): {reason}", flush=True)
return
next_at = _iso(_now() + timedelta(seconds=_backoff_seconds(settings, new_retry)))
conn.execute(
"UPDATE submissions SET status='queued', retry_count=?, next_attempt_at=?, "
"rar_error=?, sending_since=NULL, updated_at=datetime('now') WHERE id=?",
(new_retry, next_at, reason, submission_id),
)
print(f"[worker] submission {submission_id} -> requeue (retry {new_retry}, next {next_at}): {reason}", flush=True)
def claim_one(conn) -> dict | None:
"""Claim atomic 'queued' -> 'sending', respectand next_attempt_at. Intoarce randul sau None.
Randul include `account_id` si `rar_creds_enc` (creds RAR criptate) pentru
login-ul per-cont din `run`.
"""
conn.execute("BEGIN IMMEDIATE")
try:
row = conn.execute(
"SELECT s.id, s.account_id, s.payload_json, s.rar_creds_enc "
"FROM submissions s LEFT JOIN accounts a ON a.id = s.account_id "
"WHERE s.status='queued' "
"AND (s.next_attempt_at IS NULL OR s.next_attempt_at <= ?) "
# Gate pe stare de cont: doar 'active' trimite. Derivam defensiv din `active`
# cand `status` lipseste (DB veche pre-migrare), pastrand active=1 <=> 'active'.
"AND COALESCE(a.status, CASE WHEN COALESCE(a.active,1)=1 THEN 'active' ELSE 'pending' END) = 'active' "
"ORDER BY s.id LIMIT 1",
(_iso(_now()),),
).fetchone()
if not row:
conn.execute("COMMIT")
return None
conn.execute(
"UPDATE submissions SET status='sending', sending_since=datetime('now'), "
"updated_at=datetime('now') WHERE id=?",
(row["id"],),
)
conn.execute("COMMIT")
return {
"id": row["id"],
"account_id": row["account_id"] if row["account_id"] is not None else DEFAULT_ACCOUNT_ID,
"creds_enc": row["rar_creds_enc"],
"content": json.loads(row["payload_json"]),
}
except Exception:
conn.execute("ROLLBACK")
raise
def reconcile_and_mark(conn, rar: RarClient, token: str, submission_id: int, content: dict) -> bool:
"""Cauta la RAR o prezentare deja inregistrata pentru acest continut. Marcheaza 'sent' daca exista.
Intoarce True daca a gasit (si a marcat sent), False altfel.
"""
finalizate = rar.get_finalizate(token)
found_id = match_finalizata(
finalizate,
vin=content.get("vin", ""),
data_prestatie=content.get("data_prestatie", ""),
odometru_final=content.get("odometru_final"),
)
if found_id is not None:
mark(conn, submission_id, "sent", rar_status_code=200, id_prezentare=found_id,
rar_error="reconciliat (raspuns pierdut)")
_wlog(conn, "submission_reconciliat",
f"submission {submission_id} -> sent prin reconciliere (idPrezentare={found_id})",
context={"submission_id": submission_id, "id_prezentare": found_id})
return True
return False
def process_one(conn, settings: Settings, rar: RarClient, token: str, claimed: dict) -> str:
"""Trimite o prezentare claimed. Intoarce starea finala (pentru teste/loguri)."""
sid = claimed["id"]
account_id = claimed.get("account_id")
content = claimed["content"]
payload = build_rar_payload(content)
try:
data = rar.post_prezentare(token, payload)
mark(conn, sid, "sent", rar_status_code=200, id_prezentare=data.get("id"))
_wlog(conn, "submission_sent", f"submission {sid} -> sent (idPrezentare={data.get('id')})",
account_id=account_id, context={"submission_id": sid, "id_prezentare": data.get("id")})
return "sent"
except RarError as exc:
if exc.status_code == 400:
if exc.field_errors:
enriched = [
errors.eroare("RAR_VALIDARE", field=fe.get("field"), cauza=fe.get("message"))
for fe in exc.field_errors
]
else:
enriched = [errors.eroare("RAR_VALIDARE", cauza=str(exc))]
detail = json.dumps(enriched, ensure_ascii=False)
mark(conn, sid, "needs_data", rar_status_code=400, rar_error=detail)
_wlog(conn, "submission_needs_data", f"submission {sid} -> needs_data (RAR 400)",
nivel="WARNING", account_id=account_id, cod="RAR_VALIDARE",
context={"submission_id": sid})
return "needs_data"
if exc.status_code == 500 and exc.rar_message:
# RAR a raspuns DEFINITIV cu o eroare de procesare (ex. ORA-12899). NU e o
# pierdere de raspuns ambigua -> NU reconcilia (recordul, daca exista la RAR,
# e PARTIAL/rupt si nu trebuie marcat fals 'sent') si NU reincerca (acelasi
# input va esua iar). Marcam 'error' cu mesajul real RAR.
detail = json.dumps(errors.eroare("RAR_EROARE_SERVER", cauza=exc.rar_message), ensure_ascii=False)
mark(conn, sid, "error", rar_status_code=500, rar_error=detail)
_wlog(conn, "submission_error", f"submission {sid} -> error (RAR 500): {exc.rar_message}",
nivel="ERROR", account_id=account_id, cod="RAR_EROARE_SERVER",
context={"submission_id": sid, "http": 500})
return "error"
if _is_transient(exc):
return _handle_transient(conn, settings, rar, token, sid, content, str(exc))
# 4xx nerecuperabil (nu 400/401/408/429) -> error.
mark(conn, sid, "error", rar_status_code=exc.status_code, rar_error=str(exc))
_wlog(conn, "submission_error", f"submission {sid} -> error: {exc}",
nivel="ERROR", account_id=account_id,
context={"submission_id": sid, "http": exc.status_code})
return "error"
except (httpx.TimeoutException, httpx.TransportError) as exc:
return _handle_transient(conn, settings, rar, token, sid, content, f"retea: {exc}")
def _handle_transient(conn, settings: Settings, rar: RarClient, token: str, sid: int, content: dict, reason: str) -> str:
"""Eroare ambigua: POST-ul poate sa fi ajuns la RAR. Reconciliaza intai; altfel retry/backoff."""
try:
if reconcile_and_mark(conn, rar, token, sid, content):
return "sent"
except RarError as exc:
# Reconcilierea insasi a esuat -> nu putem confirma; tratam ca tranzitoriu si retry.
reason = f"{reason}; reconciliere esuata: {exc}"
requeue_with_backoff(conn, settings, sid, reason=reason)
return "requeued"
def recover_orphans(conn, settings: Settings, rar: RarClient, token: str, account_id: int | None = None) -> int:
"""Randuri 'sending' mai vechi de lease (worker mort mid-POST). Reconciliaza; altfel requeue.
`account_id` filtreaza la orfanii unui cont (login-ul e per-cont); None = toti
(compat teste / single-account).
"""
# Cutoff calculat SQLite-side, in ACELASI format ca sending_since (scris cu
# datetime('now') in claim_one -> 'YYYY-MM-DD HH:MM:SS', cu spatiu). Daca am
# compara cu _iso() (format ISO cu 'T'), spatiul (0x20) < 'T' (0x54) ar face
# orice rand 'sending' sa para mereu <= cutoff -> lease-ul de 120s ignorat,
# iar fiecare rand proaspat revendicat ar fi tratat instant ca orfan.
lease = f"-{int(settings.worker_sending_lease_s)} seconds"
if account_id is not None:
orphans = conn.execute(
"SELECT id, payload_json FROM submissions WHERE status='sending' "
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?)) AND account_id=?",
(lease, account_id),
).fetchall()
else:
orphans = conn.execute(
"SELECT id, payload_json FROM submissions WHERE status='sending' "
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?))",
(lease,),
).fetchall()
recovered = 0
for row in orphans:
sid = row["id"]
content = json.loads(row["payload_json"])
try:
if reconcile_and_mark(conn, rar, token, sid, content):
recovered += 1
else:
# Nu e la RAR -> sigur de re-trimis.
requeue_with_backoff(conn, settings, sid, reason="orfan recuperat (nu exista la RAR)")
recovered += 1
except RarError as exc:
print(f"[worker] orfan {sid}: reconciliere esuata ({exc}); il las pentru data viitoare", flush=True)
return recovered
def _queue_depth(conn) -> int:
row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone()
return int(row["n"]) if row else 0
def _refresh_nomenclator(conn, rar: RarClient, token: str) -> None:
"""Ia nomenclatorul live din RAR si il upsert-eaza local. Erorile NU opresc worker-ul."""
try:
items = rar.get_nomenclator(token)
n = upsert_nomenclator(conn, items)
print(f"[worker] nomenclator refresh: {n} coduri", flush=True)
except Exception as exc: # noqa: BLE001 — refresh best-effort, nu blocheaza trimiterea
print(f"[worker] nomenclator refresh esuat (continui): {exc}", flush=True)
class AccountSessions:
"""Sesiuni RAR per cont: login lazy cu creds din submission + cache JWT (30h).
La primul login reusit pentru un cont sterge creds-urile criptate ale contului
(token-ul in memorie acopera restul). Pe 401 mid-sesiune se invalideaza sesiunea
-> re-login la urmatorul submission cu creds.
"""
def __init__(self, settings: Settings):
self.settings = settings
self._sessions: dict[int, tuple[RarClient, str]] = {}
def get_token(self, conn, account_id: int, creds: dict | None) -> str | None:
"""Token valid pentru cont. Login daca lipseste din cache si avem creds; altfel None."""
sess = self._sessions.get(account_id)
if sess is not None:
return sess[1]
if not creds or not creds.get("email") or not creds.get("password"):
return None
rar = RarClient(self.settings)
try:
token = rar.login(creds["email"], creds["password"])
except RarAuthError as exc:
rar.close()
# Login esuat (401) — FARA email/parola (doar codul HTTP + contul).
log_event("rar_login", nivel="WARNING", account_id=account_id,
cod="RAR_CREDS_INVALIDE",
mesaj=f"login RAR esuat (cont {account_id}): {exc.status_code or 401}",
context={"rezultat": "esuat", "http": exc.status_code or 401},
conn=conn, sursa="worker")
raise
except Exception:
rar.close()
raise
self._sessions[account_id] = (rar, token)
write_heartbeat(conn, rar_login_ok=True, detail=f"login RAR ok (cont {account_id})")
# Login reusit (fara email/parola in clar — context curat).
log_event("rar_login", account_id=account_id, mesaj=f"login RAR ok (cont {account_id})",
context={"rezultat": "ok", "http": 200}, conn=conn, sursa="worker")
# Creds efemere pe submissions nu mai sunt necesare: JWT acopera retry-urile -> sterge.
# GATE PURJARE: sterge DOAR submissions.rar_creds_enc, NU accounts.rar_creds_enc.
# Canal web: fallback exista in accounts -> purjarea e inofensiva (re-login dupa restart).
# Canal API pur: purjarea e identica cu Treapta 1 (neatinsa).
conn.execute(
"UPDATE submissions SET rar_creds_enc=NULL WHERE account_id=? AND rar_creds_enc IS NOT NULL",
(account_id,),
)
# Nomenclator live (autoritativ) la fiecare login proaspat.
_refresh_nomenclator(conn, rar, token)
return token
def rar(self, account_id: int) -> RarClient:
return self._sessions[account_id][0]
def active(self) -> list[tuple[int, RarClient, str]]:
return [(acct, rar, tok) for acct, (rar, tok) in self._sessions.items()]
def invalidate(self, account_id: int) -> None:
sess = self._sessions.pop(account_id, None)
if sess is not None:
sess[0].close()
def close_all(self) -> None:
for rar, _tok in self._sessions.values():
rar.close()
self._sessions.clear()
def _creds_for(claimed: dict, settings: Settings) -> dict | None:
"""Creds pentru un submission: decripteaza enc-ul lipit; altfel creds <test> (dev)."""
creds = decrypt_creds(claimed.get("creds_enc"))
if creds:
return creds
if settings.worker_use_test_creds:
return load_test_credentials()
return None
def _creds_from_account(conn, account_id: int) -> dict | None:
"""Fallback: crede RAR durabile per-cont din accounts.rar_creds_enc.
Canal web nu are re-pusher. Cand submission n-are creds (sterse dupa primul login
sau upload web fara creds), worker-ul re-citeste din cont si poate re-login oricand.
"""
row = conn.execute(
"SELECT rar_creds_enc FROM accounts WHERE id=?", (account_id,)
).fetchone()
if row and row["rar_creds_enc"]:
return decrypt_creds(row["rar_creds_enc"])
return None
def _keepalive_target(conn, settings: Settings) -> tuple[int | None, dict | None]:
"""Un cont cu creds durabile pentru login-ul de proba (sau creds <test> in dev).
Sare conturile ale caror creds NU se decripteaza sub cheia curenta — in dev
`start.sh both` genereaza o cheie efemera noua la fiecare pornire, deci creds-urile
durabile criptate sub cheia veche dau decrypt -> None. Fallback la creds <test>.
"""
rows = conn.execute(
"SELECT id, rar_creds_enc FROM accounts "
"WHERE rar_creds_enc IS NOT NULL ORDER BY id"
).fetchall()
for row in rows:
creds = decrypt_creds(row["rar_creds_enc"])
if creds and creds.get("email") and creds.get("password"):
return row["id"], creds
if settings.worker_use_test_creds:
return DEFAULT_ACCOUNT_ID, load_test_credentials()
return None, None
def _maybe_keepalive(conn, settings: Settings, sessions: "AccountSessions", state: dict) -> None:
"""Login de proba periodic cand coada e goala — verifica reachability RAR si
pastreaza last_rar_login_ok proaspat ca dashboard-ul sa nu afiseze fals
'RAR inaccesibil' doar din lipsa de trafic.
Sondeaza la cel mult o data pe interval (si pe succes, si pe esec): pe succes
heartbeat-ul se reimprospateaza singur; pe esec real (RAR jos) last_rar_login_ok
ramane vechi -> dashboard-ul degradeaza corect. Forteaza login real (invalideaza
sesiunea cache-uita) ca proba sa fie autentica, nu un token vechi din cache.
"""
interval = settings.worker_rar_keepalive_interval_s
if interval <= 0:
return
hb = read_heartbeat(conn)
last = hb["last_rar_login_ok"] if hb else None
if last:
try:
age = (datetime.now(timezone.utc) - datetime.fromisoformat(last)).total_seconds()
if age < interval:
return # login inca proaspat — nimic de facut
except (ValueError, TypeError):
pass
now_ts = time.time()
if now_ts - state["last_attempt"] < interval:
return # deja am incercat recent (nu hartui RAR daca e jos)
state["last_attempt"] = now_ts
account_id, creds = _keepalive_target(conn, settings)
if account_id is None or not creds:
return # niciun cont cu creds durabile — nimic de sondat
sessions.invalidate(account_id) # forteaza login real, nu token din cache
try:
sessions.get_token(conn, account_id, creds) # reimprospateaza last_rar_login_ok la succes
except RarAuthError:
pass # creds invalide — deja logat in get_token (WARNING)
except Exception as exc:
# RAR indisponibil: last_rar_login_ok ramane vechi (corect). Nu propaga.
log_event("rar_keepalive", nivel="WARNING", account_id=account_id,
mesaj=f"keepalive RAR esuat (cont {account_id}): {type(exc).__name__}",
context={"rezultat": "esuat"}, conn=conn, sursa="worker")
def run() -> int:
signal.signal(signal.SIGTERM, _stop)
signal.signal(signal.SIGINT, _stop)
settings = get_settings()
set_source("worker") # evenimentele worker-ului au sursa=worker (fisier app-worker.log)
init_db()
conn = get_connection()
print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
sessions = AccountSessions(settings)
_last_purge_time: float = 0.0
_keepalive_state = {"last_attempt": 0.0}
while _running:
try:
write_heartbeat(conn, detail=f"poll (queue={_queue_depth(conn)})")
# Purjare periodica (odata pe ora) — NU mai frecvent.
now_ts = time.time()
if now_ts - _last_purge_time >= _PURGE_INTERVAL_S:
stats = purge_expired(conn)
if stats["submissions_purged"] or stats["batches_purged"] or stats["events_purged"]:
print(
f"[worker] purjare: {stats['submissions_purged']} submissions, "
f"{stats['batches_purged']} batches, {stats['events_purged']} evenimente sterse",
flush=True,
)
_last_purge_time = now_ts
if not settings.worker_send_enabled:
time.sleep(settings.worker_poll_interval_s)
continue
claimed = claim_one(conn)
if claimed is None:
# Nimic de trimis: recupereaza orfanii conturilor deja logate.
for acct, rar, tok in sessions.active():
recover_orphans(conn, settings, rar, tok, account_id=acct)
# Login de proba periodic ca dashboard-ul sa nu afiseze fals
# "RAR inaccesibil" din lipsa de trafic (vezi _maybe_keepalive).
_maybe_keepalive(conn, settings, sessions, _keepalive_state)
time.sleep(settings.worker_poll_interval_s)
continue
sid = claimed["id"]
account_id = claimed["account_id"]
# Randul poarta creds proaspete (rar_creds_enc != NULL) — fie prima trimitere
# a contului, fie o REACTIVARE dupa creds gresite. Invalidam sesiunea RAR
# cache-uita ca un JWT vechi (30h) din parola GRESITA sa nu trimita cu ea,
# ignorand corectia. Re-login imediat cu creds-urile noi.
if claimed.get("creds_enc"):
sessions.invalidate(account_id)
# Incearca creds din submission (canal API efemer), cu fallback la
# accounts.rar_creds_enc (canal web durabil). Canal web n-are re-pusher.
creds = _creds_for(claimed, settings) or _creds_from_account(conn, account_id)
try:
token = sessions.get_token(conn, account_id, creds)
except RarAuthError as exc:
# Creds gresite (login 401): NU se face retry.
mark(conn, sid, "error", rar_status_code=401,
rar_error=json.dumps(errors.eroare("RAR_CREDS_INVALIDE", cauza="credentiale RAR invalide"), ensure_ascii=False))
# rar_login esuat e deja logat in get_token; aici doar tranzitia submission-ului.
_wlog(conn, "submission_error", f"submission {sid} (cont {account_id}) -> error: creds RAR invalide",
nivel="ERROR", account_id=account_id, cod="RAR_CREDS_INVALIDE",
context={"submission_id": sid, "http": 401})
continue
if token is None:
# Fara creds disponibile (token pierdut la restart + creds sterse).
# Re-pune in coada cu backoff; ROAAUTO re-trimite creds proaspete.
requeue_with_backoff(conn, settings, sid, reason="creds RAR indisponibile (astept re-trimitere)")
continue
rar = sessions.rar(account_id)
# Recupereaza orfanii contului inainte de trimitere (acelasi token).
recover_orphans(conn, settings, rar, token, account_id=account_id)
# Guard: recover_orphans putea atinge chiar randul tocmai revendicat
# (reconciliat 'sent' sau requeue 'queued'). Daca nu mai e 'sending',
# NU mai face POST — altfel s-ar crea un duplicat la RAR.
still_sending = conn.execute(
"SELECT 1 FROM submissions WHERE id=? AND status='sending'", (sid,)
).fetchone()
if still_sending is None:
continue
try:
process_one(conn, settings, rar, token, claimed)
except RarAuthError as exc:
# Token expirat mid-sesiune: invalideaza sesiunea, re-pune randul.
print(f"[worker] cont {account_id} token expirat: {exc}; re-login data viitoare", flush=True)
sessions.invalidate(account_id)
requeue_with_backoff(conn, settings, sid, reason="token RAR expirat")
except Exception as exc: # noqa: BLE001 — loop top-level: o eroare punctuala nu opreste worker-ul
print(f"[worker] eroare neasteptata: {exc}", flush=True)
time.sleep(settings.worker_poll_interval_s)
sessions.close_all()
conn.close()
print("[worker] oprit curat", flush=True)
return 0
if __name__ == "__main__":
sys.exit(run())