feat(5.20): US-004/005/006/009 ingestie+API+worker+import pe mediu RAR
US-004: rezolva_rar_env (cerere>default cont>ancora globala) + MediuIndisponibil + cod RAR_MEDIU_INDISPONIBIL. US-005: camp rar_env pe POST /v1/prezentari + /valideaza (Literal), echo in SubmissionResult/ValidareResult/GET, build_key + INSERT env-aware. US-006: AccountSessions re-cheiat (account_id, rar_env); RarClient base_url per env; creds din slotul env; purge + recover_orphans scoped pe env (E1/1a, 1b/E6); claim_one propaga rar_env (1c/E8); keepalive pe ancora globala (M2). US-009: selector mediu la import (>=2 medii), eticheta la 1, banner la 0; commit seteaza rar_env pe submissions. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -39,7 +39,7 @@ from ..observ import log_event, set_source
|
||||
from ..mapping import DEFAULT_ACCOUNT_ID, upsert_nomenclator
|
||||
from ..payload import build_rar_payload
|
||||
from ..reconcile import match_finalizata
|
||||
from ..rar_client import RarAuthError, RarClient, RarError
|
||||
from ..rar_client import RarAuthError, RarClient, RarError, base_url_pentru_env
|
||||
|
||||
_running = True
|
||||
|
||||
@@ -167,7 +167,7 @@ def claim_one(conn) -> dict | None:
|
||||
conn.execute("BEGIN IMMEDIATE")
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT s.id, s.account_id, s.payload_json, s.rar_creds_enc "
|
||||
"SELECT s.id, s.account_id, s.payload_json, s.rar_creds_enc, s.rar_env "
|
||||
"FROM submissions s LEFT JOIN accounts a ON a.id = s.account_id "
|
||||
"WHERE s.status='queued' "
|
||||
"AND (s.next_attempt_at IS NULL OR s.next_attempt_at <= ?) "
|
||||
@@ -189,6 +189,7 @@ def claim_one(conn) -> dict | None:
|
||||
return {
|
||||
"id": row["id"],
|
||||
"account_id": row["account_id"] if row["account_id"] is not None else DEFAULT_ACCOUNT_ID,
|
||||
"rar_env": row["rar_env"],
|
||||
"creds_enc": row["rar_creds_enc"],
|
||||
"content": json.loads(row["payload_json"]),
|
||||
}
|
||||
@@ -281,11 +282,13 @@ def _handle_transient(conn, settings: Settings, rar: RarClient, token: str, sid:
|
||||
return "requeued"
|
||||
|
||||
|
||||
def recover_orphans(conn, settings: Settings, rar: RarClient, token: str, account_id: int | None = None) -> int:
|
||||
def recover_orphans(conn, settings: Settings, rar: RarClient, token: str,
|
||||
account_id: int | None = None, rar_env: str | None = None) -> int:
|
||||
"""Randuri 'sending' mai vechi de lease (worker mort mid-POST). Reconciliaza; altfel requeue.
|
||||
|
||||
`account_id` filtreaza la orfanii unui cont (login-ul e per-cont); None = toti
|
||||
(compat teste / single-account).
|
||||
`account_id` filtreaza la orfanii unui cont (login-ul e per-cont); None = toti.
|
||||
`rar_env` filtreaza la orfanii unui mediu (1b/E6): orfanii prod contra endpoint prod,
|
||||
NU contra test — altfel no-match -> re-POST prod = DUPLICAT real ireversibil.
|
||||
"""
|
||||
# Cutoff calculat SQLite-side, in ACELASI format ca sending_since (scris cu
|
||||
# datetime('now') in claim_one -> 'YYYY-MM-DD HH:MM:SS', cu spatiu). Daca am
|
||||
@@ -293,18 +296,18 @@ def recover_orphans(conn, settings: Settings, rar: RarClient, token: str, accoun
|
||||
# orice rand 'sending' sa para mereu <= cutoff -> lease-ul de 120s ignorat,
|
||||
# iar fiecare rand proaspat revendicat ar fi tratat instant ca orfan.
|
||||
lease = f"-{int(settings.worker_sending_lease_s)} seconds"
|
||||
base_sql = (
|
||||
"SELECT id, payload_json FROM submissions WHERE status='sending' "
|
||||
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?))"
|
||||
)
|
||||
params: list = [lease]
|
||||
if account_id is not None:
|
||||
orphans = conn.execute(
|
||||
"SELECT id, payload_json FROM submissions WHERE status='sending' "
|
||||
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?)) AND account_id=?",
|
||||
(lease, account_id),
|
||||
).fetchall()
|
||||
else:
|
||||
orphans = conn.execute(
|
||||
"SELECT id, payload_json FROM submissions WHERE status='sending' "
|
||||
"AND (sending_since IS NULL OR sending_since <= datetime('now', ?))",
|
||||
(lease,),
|
||||
).fetchall()
|
||||
base_sql += " AND account_id=?"
|
||||
params.append(account_id)
|
||||
if rar_env is not None:
|
||||
base_sql += " AND rar_env=?"
|
||||
params.append(rar_env)
|
||||
orphans = conn.execute(base_sql, params).fetchall()
|
||||
recovered = 0
|
||||
for row in orphans:
|
||||
sid = row["id"]
|
||||
@@ -337,25 +340,26 @@ def _refresh_nomenclator(conn, rar: RarClient, token: str) -> None:
|
||||
|
||||
|
||||
class AccountSessions:
|
||||
"""Sesiuni RAR per cont: login lazy cu creds din submission + cache JWT (30h).
|
||||
"""Sesiuni RAR per (cont, env): login lazy cu creds din submission + cache JWT (30h).
|
||||
|
||||
La primul login reusit pentru un cont sterge creds-urile criptate ale contului
|
||||
(token-ul in memorie acopera restul). Pe 401 mid-sesiune se invalideaza sesiunea
|
||||
-> re-login la urmatorul submission cu creds.
|
||||
Cheia = (account_id, rar_env): test si prod sunt sisteme RAR separate cu JWT separate.
|
||||
La primul login reusit pentru (cont, env) sterge creds-urile efemere scoped pe acel env.
|
||||
Pe 401 mid-sesiune se invalideaza sesiunea -> re-login la urmatorul submission cu creds.
|
||||
"""
|
||||
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings = settings
|
||||
self._sessions: dict[int, tuple[RarClient, str]] = {}
|
||||
self._sessions: dict[tuple[int, str], tuple[RarClient, str]] = {}
|
||||
|
||||
def get_token(self, conn, account_id: int, creds: dict | None) -> str | None:
|
||||
"""Token valid pentru cont. Login daca lipseste din cache si avem creds; altfel None."""
|
||||
sess = self._sessions.get(account_id)
|
||||
def get_token(self, conn, account_id: int, creds: dict | None, rar_env: str = "test") -> str | None:
|
||||
"""Token valid pentru (cont, env). Login daca lipseste din cache si avem creds; altfel None."""
|
||||
key = (account_id, rar_env)
|
||||
sess = self._sessions.get(key)
|
||||
if sess is not None:
|
||||
return sess[1]
|
||||
if not creds or not creds.get("email") or not creds.get("password"):
|
||||
return None
|
||||
rar = RarClient(self.settings)
|
||||
rar = RarClient(self.settings, base_url=base_url_pentru_env(self.settings, rar_env))
|
||||
try:
|
||||
token = rar.login(creds["email"], creds["password"])
|
||||
except RarAuthError as exc:
|
||||
@@ -363,40 +367,52 @@ class AccountSessions:
|
||||
# Login esuat (401) — FARA email/parola (doar codul HTTP + contul).
|
||||
log_event("rar_login", nivel="WARNING", account_id=account_id,
|
||||
cod="RAR_CREDS_INVALIDE",
|
||||
mesaj=f"login RAR esuat (cont {account_id}): {exc.status_code or 401}",
|
||||
mesaj=f"login RAR esuat (cont {account_id}, env={rar_env}): {exc.status_code or 401}",
|
||||
context={"rezultat": "esuat", "http": exc.status_code or 401},
|
||||
conn=conn, sursa="worker")
|
||||
raise
|
||||
except Exception:
|
||||
rar.close()
|
||||
raise
|
||||
self._sessions[account_id] = (rar, token)
|
||||
write_heartbeat(conn, rar_login_ok=True, detail=f"login RAR ok (cont {account_id})")
|
||||
self._sessions[key] = (rar, token)
|
||||
write_heartbeat(conn, rar_login_ok=True, detail=f"login RAR ok (cont {account_id}, env={rar_env})")
|
||||
# Login reusit (fara email/parola in clar — context curat).
|
||||
log_event("rar_login", account_id=account_id, mesaj=f"login RAR ok (cont {account_id})",
|
||||
log_event("rar_login", account_id=account_id,
|
||||
mesaj=f"login RAR ok (cont {account_id}, env={rar_env})",
|
||||
context={"rezultat": "ok", "http": 200}, conn=conn, sursa="worker")
|
||||
# Creds efemere pe submissions nu mai sunt necesare: JWT acopera retry-urile -> sterge.
|
||||
# GATE PURJARE: sterge DOAR submissions.rar_creds_enc, NU accounts.rar_creds_enc.
|
||||
# Canal web: fallback exista in accounts -> purjarea e inofensiva (re-login dupa restart).
|
||||
# Canal API pur: purjarea e identica cu Treapta 1 (neatinsa).
|
||||
# GATE PURJARE: sterge DOAR submissions.rar_creds_enc (WHERE account_id=? AND rar_env=?),
|
||||
# NU accounts.rar_creds_{env}_enc. Scoped pe env: login test NU sterge creds prod (E1/1a).
|
||||
conn.execute(
|
||||
"UPDATE submissions SET rar_creds_enc=NULL WHERE account_id=? AND rar_creds_enc IS NOT NULL",
|
||||
(account_id,),
|
||||
"UPDATE submissions SET rar_creds_enc=NULL "
|
||||
"WHERE account_id=? AND rar_env=? AND rar_creds_enc IS NOT NULL",
|
||||
(account_id, rar_env),
|
||||
)
|
||||
# Nomenclator live (autoritativ) la fiecare login proaspat.
|
||||
# Nota (1e): nomenclatorul e presupus identic intre medii (aceleasi 18 coduri RAR);
|
||||
# daca diverge in viitor, scoparea per-env a tabelei nomenclator_rar e out of scope acum.
|
||||
_refresh_nomenclator(conn, rar, token)
|
||||
return token
|
||||
|
||||
def rar(self, account_id: int) -> RarClient:
|
||||
return self._sessions[account_id][0]
|
||||
def rar(self, account_id: int, rar_env: str = "test") -> RarClient:
|
||||
return self._sessions[(account_id, rar_env)][0]
|
||||
|
||||
def active(self) -> list[tuple[int, RarClient, str]]:
|
||||
return [(acct, rar, tok) for acct, (rar, tok) in self._sessions.items()]
|
||||
def active(self) -> list[tuple[int, str, RarClient, str]]:
|
||||
"""Sesiunile active: lista de (account_id, rar_env, RarClient, token)."""
|
||||
return [(acct, env, rar, tok) for (acct, env), (rar, tok) in self._sessions.items()]
|
||||
|
||||
def invalidate(self, account_id: int) -> None:
|
||||
sess = self._sessions.pop(account_id, None)
|
||||
if sess is not None:
|
||||
sess[0].close()
|
||||
def invalidate(self, account_id: int, rar_env: str | None = None) -> None:
|
||||
"""Invalideaza sesiunea (cont, env). rar_env=None invalideaza TOATE sesiunile contului."""
|
||||
if rar_env is not None:
|
||||
sess = self._sessions.pop((account_id, rar_env), None)
|
||||
if sess is not None:
|
||||
sess[0].close()
|
||||
else:
|
||||
# Invalideaza toate mediile pentru acest cont.
|
||||
keys_to_remove = [k for k in self._sessions if k[0] == account_id]
|
||||
for k in keys_to_remove:
|
||||
sess = self._sessions.pop(k)
|
||||
sess[0].close()
|
||||
|
||||
def close_all(self) -> None:
|
||||
for rar, _tok in self._sessions.values():
|
||||
@@ -414,33 +430,39 @@ def _creds_for(claimed: dict, settings: Settings) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
def _creds_from_account(conn, account_id: int) -> dict | None:
|
||||
"""Fallback: crede RAR durabile per-cont din accounts.rar_creds_enc.
|
||||
def _creds_from_account(conn, account_id: int, rar_env: str = "test") -> dict | None:
|
||||
"""Creds RAR durabile per-cont din slotul per-env, cu fallback la coloana legacy.
|
||||
|
||||
Canal web nu are re-pusher. Cand submission n-are creds (sterse dupa primul login
|
||||
sau upload web fara creds), worker-ul re-citeste din cont si poate re-login oricand.
|
||||
Canal web: creds in accounts.rar_creds_{rar_env}_enc (per-env). Fallback la
|
||||
accounts.rar_creds_enc (legacy, back-compat inainte de US-013 care dropa coloana veche).
|
||||
"""
|
||||
env_slot = f"rar_creds_{rar_env}_enc"
|
||||
row = conn.execute(
|
||||
"SELECT rar_creds_enc FROM accounts WHERE id=?", (account_id,)
|
||||
f"SELECT {env_slot}, rar_creds_enc FROM accounts WHERE id=?", (account_id,)
|
||||
).fetchone()
|
||||
if row and row["rar_creds_enc"]:
|
||||
return decrypt_creds(row["rar_creds_enc"])
|
||||
return None
|
||||
if not row:
|
||||
return None
|
||||
enc = row[env_slot] or row["rar_creds_enc"] # per-env intai, legacy fallback
|
||||
return decrypt_creds(enc) if enc else None
|
||||
|
||||
|
||||
def _keepalive_target(conn, settings: Settings) -> tuple[int | None, dict | None]:
|
||||
"""Un cont cu creds durabile pentru login-ul de proba (sau creds <test> in dev).
|
||||
|
||||
Ancora M2: cauta in slotul per-env al mediului `settings.rar_env` (ancora globala).
|
||||
Fallback la coloana legacy `rar_creds_enc` (back-compat inainte de US-013).
|
||||
Sare conturile ale caror creds NU se decripteaza sub cheia curenta — in dev
|
||||
`start.sh both` genereaza o cheie efemera noua la fiecare pornire, deci creds-urile
|
||||
durabile criptate sub cheia veche dau decrypt -> None. Fallback la creds <test>.
|
||||
`start.sh both` genereaza o cheie efemera noua la fiecare pornire.
|
||||
"""
|
||||
env_slot = f"rar_creds_{settings.rar_env}_enc"
|
||||
rows = conn.execute(
|
||||
"SELECT id, rar_creds_enc FROM accounts "
|
||||
"WHERE rar_creds_enc IS NOT NULL ORDER BY id"
|
||||
f"SELECT id, {env_slot}, rar_creds_enc FROM accounts ORDER BY id"
|
||||
).fetchall()
|
||||
for row in rows:
|
||||
creds = decrypt_creds(row["rar_creds_enc"])
|
||||
enc = row[env_slot] or row["rar_creds_enc"] # per-env intai, legacy fallback
|
||||
if not enc:
|
||||
continue
|
||||
creds = decrypt_creds(enc)
|
||||
if creds and creds.get("email") and creds.get("password"):
|
||||
return row["id"], creds
|
||||
if settings.worker_use_test_creds:
|
||||
@@ -478,9 +500,9 @@ def _maybe_keepalive(conn, settings: Settings, sessions: "AccountSessions", stat
|
||||
account_id, creds = _keepalive_target(conn, settings)
|
||||
if account_id is None or not creds:
|
||||
return # niciun cont cu creds durabile — nimic de sondat
|
||||
sessions.invalidate(account_id) # forteaza login real, nu token din cache
|
||||
sessions.invalidate(account_id, settings.rar_env) # forteaza login real pt env-ul global
|
||||
try:
|
||||
sessions.get_token(conn, account_id, creds) # reimprospateaza last_rar_login_ok la succes
|
||||
sessions.get_token(conn, account_id, creds, settings.rar_env) # reimprospateaza last_rar_login_ok la succes
|
||||
except RarAuthError:
|
||||
pass # creds invalide — deja logat in get_token (WARNING)
|
||||
except Exception as exc:
|
||||
@@ -527,8 +549,8 @@ def run() -> int:
|
||||
claimed = claim_one(conn)
|
||||
if claimed is None:
|
||||
# Nimic de trimis: recupereaza orfanii conturilor deja logate.
|
||||
for acct, rar, tok in sessions.active():
|
||||
recover_orphans(conn, settings, rar, tok, account_id=acct)
|
||||
for acct, env, rar, tok in sessions.active():
|
||||
recover_orphans(conn, settings, rar, tok, account_id=acct, rar_env=env)
|
||||
# Login de proba periodic ca dashboard-ul sa nu afiseze fals
|
||||
# "RAR inaccesibil" din lipsa de trafic (vezi _maybe_keepalive).
|
||||
_maybe_keepalive(conn, settings, sessions, _keepalive_state)
|
||||
@@ -537,18 +559,20 @@ def run() -> int:
|
||||
|
||||
sid = claimed["id"]
|
||||
account_id = claimed["account_id"]
|
||||
# Mediul tinta al trimiterii (per-submission, cu fallback la ancora globala).
|
||||
rar_env = claimed.get("rar_env") or settings.rar_env
|
||||
# Randul poarta creds proaspete (rar_creds_enc != NULL) — fie prima trimitere
|
||||
# a contului, fie o REACTIVARE dupa creds gresite. Invalidam sesiunea RAR
|
||||
# cache-uita ca un JWT vechi (30h) din parola GRESITA sa nu trimita cu ea,
|
||||
# ignorand corectia. Re-login imediat cu creds-urile noi.
|
||||
# cache-uita (per env) ca un JWT vechi (30h) din parola GRESITA sa nu trimita
|
||||
# cu ea, ignorand corectia. Re-login imediat cu creds-urile noi.
|
||||
if claimed.get("creds_enc"):
|
||||
sessions.invalidate(account_id)
|
||||
sessions.invalidate(account_id, rar_env)
|
||||
# Incearca creds din submission (canal API efemer), cu fallback la
|
||||
# accounts.rar_creds_enc (canal web durabil). Canal web n-are re-pusher.
|
||||
creds = _creds_for(claimed, settings) or _creds_from_account(conn, account_id)
|
||||
# accounts.rar_creds_{rar_env}_enc (canal web durabil, per-env).
|
||||
creds = _creds_for(claimed, settings) or _creds_from_account(conn, account_id, rar_env)
|
||||
|
||||
try:
|
||||
token = sessions.get_token(conn, account_id, creds)
|
||||
token = sessions.get_token(conn, account_id, creds, rar_env)
|
||||
except RarAuthError as exc:
|
||||
# Creds gresite (login 401): NU se face retry.
|
||||
mark(conn, sid, "error", rar_status_code=401,
|
||||
@@ -565,9 +589,9 @@ def run() -> int:
|
||||
requeue_with_backoff(conn, settings, sid, reason="creds RAR indisponibile (astept re-trimitere)")
|
||||
continue
|
||||
|
||||
rar = sessions.rar(account_id)
|
||||
# Recupereaza orfanii contului inainte de trimitere (acelasi token).
|
||||
recover_orphans(conn, settings, rar, token, account_id=account_id)
|
||||
rar = sessions.rar(account_id, rar_env)
|
||||
# Recupereaza orfanii contului + env-ului inainte de trimitere (acelasi token).
|
||||
recover_orphans(conn, settings, rar, token, account_id=account_id, rar_env=rar_env)
|
||||
# Guard: recover_orphans putea atinge chiar randul tocmai revendicat
|
||||
# (reconciliat 'sent' sau requeue 'queued'). Daca nu mai e 'sending',
|
||||
# NU mai face POST — altfel s-ar crea un duplicat la RAR.
|
||||
@@ -579,9 +603,9 @@ def run() -> int:
|
||||
try:
|
||||
process_one(conn, settings, rar, token, claimed)
|
||||
except RarAuthError as exc:
|
||||
# Token expirat mid-sesiune: invalideaza sesiunea, re-pune randul.
|
||||
print(f"[worker] cont {account_id} token expirat: {exc}; re-login data viitoare", flush=True)
|
||||
sessions.invalidate(account_id)
|
||||
# Token expirat mid-sesiune: invalideaza sesiunea (per env), re-pune randul.
|
||||
print(f"[worker] cont {account_id} env={rar_env} token expirat: {exc}; re-login data viitoare", flush=True)
|
||||
sessions.invalidate(account_id, rar_env)
|
||||
requeue_with_backoff(conn, settings, sid, reason="token RAR expirat")
|
||||
|
||||
except Exception as exc: # noqa: BLE001 — loop top-level: o eroare punctuala nu opreste worker-ul
|
||||
|
||||
Reference in New Issue
Block a user