Toate citirile pe coloana legacy accounts.rar_creds_enc mutate pe sloturile per-env (rar_creds_test_enc/rar_creds_prod_enc): worker fallback+keepalive, are_creds (web) si are_creds_rar (integrare, +are_creds_test/_prod), write-back API la reactivare, purjare la stergere cont, _get_acasa_context/_fetch_cont_env_state. Contract API (aditiv): POST /v1/conturi/rar-creds primeste rar_target optional (test/prod), scrie in slotul corect + activeaza mediul; DELETE primeste ?env (sterge un slot sau ambele). Documentat in docs/api-rar-contract.md. DROP cu garda in db.py (schema.sql fara coloana pe DB fresh): - 6a: eliminat ADD COLUMN rar_creds_enc (fara ping-pong re-ADD dupa DROP) - 6b: try/except fail-safe (nu crapa boot-ul) + garda sqlite_version >= 3.35 - 6c: re-backfill old->new imediat inainte de assert (ancora globala) - garda orfane: DROP anulat daca vreun creds legacy nu a aterizat in slot per-env - backup criptat accounts_rar_creds_enc_backup inainte de DROP - 6d: verificare prin PRAGMA table_info (NU grep — submissions are aceeasi coloana) Garda one-way, idempotenta la boot repetat (verificat). submissions.rar_creds_enc ramane neatinsa. tests/test_retragere_creds_enc.py: niciun read pe coloana veche, conturi rar-creds env-aware, are_creds per-env, DROP blocat de garda la lipsa copiere. 9 teste existente actualizate pe sloturi per-env. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
461 lines
21 KiB
Python
461 lines
21 KiB
Python
"""Acces SQLite (WAL). Conexiune per-thread, schema idempotenta, heartbeat worker."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from .config import get_settings
|
|
|
|
_SCHEMA = Path(__file__).resolve().parent / "schema.sql"
|
|
|
|
|
|
def _connect(db_path: Path) -> sqlite3.Connection:
|
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
conn = sqlite3.connect(db_path, timeout=15.0, isolation_level=None) # autocommit; tranzactii explicite
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute("PRAGMA journal_mode = WAL")
|
|
conn.execute("PRAGMA foreign_keys = ON")
|
|
conn.execute("PRAGMA busy_timeout = 15000")
|
|
return conn
|
|
|
|
|
|
def get_connection() -> sqlite3.Connection:
|
|
"""Conexiune noua catre baza configurata. Apelantul o inchide."""
|
|
return _connect(get_settings().db_path)
|
|
|
|
|
|
def init_db() -> None:
|
|
"""Creeaza schema daca lipseste + migrari aditive. Idempotent — sigur la fiecare boot."""
|
|
conn = get_connection()
|
|
try:
|
|
conn.executescript(_SCHEMA.read_text(encoding="utf-8"))
|
|
_migrate(conn)
|
|
# Seed fallback nomenclator (doar daca e gol) ca editorul de mapari + fuzzy
|
|
# sa mearga inainte ca worker-ul sa fi luat lista live din RAR.
|
|
from .mapping import seed_nomenclator_if_empty
|
|
|
|
seed_nomenclator_if_empty(conn)
|
|
# Seed corpus operatii etichetate -> mapping_suggestions (SILVER, PRD 5.18 US-004).
|
|
# Gated: OFF in teste (conftest), ON in productie. INSERT OR IGNORE -> idempotent.
|
|
# DOAR daca mapping_suggestions e gol: seedul are ~17k randuri; re-rularea lui pe
|
|
# FIECARE boot (API + worker concurent) tinea write-lock-ul indelung -> al doilea
|
|
# proces primea "database is locked" la pornire. Guard "_if_empty" (ca nomenclatorul)
|
|
# -> boot rapid cand e deja seeded. Re-seed dupa actualizarea fisierului = manual
|
|
# (goleste tabela), consistent cu semantica v1 ignore-not-upsert a seederului.
|
|
if get_settings().seed_operatii_enabled:
|
|
already = conn.execute(
|
|
"SELECT 1 FROM mapping_suggestions LIMIT 1"
|
|
).fetchone()
|
|
if not already:
|
|
from .operatii_seed import seed_operatii_etichetate
|
|
|
|
seed_operatii_etichetate(conn)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _migrate(conn: sqlite3.Connection) -> None:
|
|
"""Migrari aditive pentru DB create inainte de o coloana noua (CREATE IF NOT EXISTS nu altereaza)."""
|
|
# Coloane submissions
|
|
sub_cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()}
|
|
if "next_attempt_at" not in sub_cols:
|
|
conn.execute("ALTER TABLE submissions ADD COLUMN next_attempt_at TEXT")
|
|
if "rar_creds_enc" not in sub_cols:
|
|
conn.execute("ALTER TABLE submissions ADD COLUMN rar_creds_enc TEXT")
|
|
if "purge_after" not in sub_cols:
|
|
conn.execute("ALTER TABLE submissions ADD COLUMN purge_after TEXT")
|
|
if "batch_id" not in sub_cols:
|
|
conn.execute("ALTER TABLE submissions ADD COLUMN batch_id INTEGER")
|
|
if "row_index" not in sub_cols:
|
|
conn.execute("ALTER TABLE submissions ADD COLUMN row_index INTEGER")
|
|
if "rar_env" not in sub_cols:
|
|
# PRD 5.20 US-001. Mediul RAR tinta pe submission. Pe DB existent NU lasam
|
|
# randurile pe DEFAULT 'test': un rand prod pre-migrare etichetat 'test' ar fi
|
|
# reconciliat de worker (US-006) contra endpoint TEST -> no-match -> re-send prod
|
|
# = DUPLICAT REAL IREVERSIBIL. Backfill din AUTOPASS_RAR_ENV global (ancora de
|
|
# migrare) + recompute idempotency_key env-aware. Ruleaza O SINGURA DATA (in
|
|
# blocul de adaugare a coloanei); pe DB fresh coloana vine din schema.sql (fara rows).
|
|
conn.execute(
|
|
"ALTER TABLE submissions ADD COLUMN rar_env TEXT NOT NULL DEFAULT 'test' "
|
|
"CHECK (rar_env IN ('test', 'prod'))"
|
|
)
|
|
_backfill_submissions_rar_env(conn)
|
|
|
|
# Coloane accounts
|
|
acc_cols = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()}
|
|
# AUTO-FIX 6a ELIMINAT (US-013): NU mai adaugam accounts.rar_creds_enc — coloana e dropata.
|
|
# _migrate_accounts_medii gestioneaza absenta coloana (guard la ~219: if "rar_creds_enc" not in acc_cols: return).
|
|
# Medii RAR per cont (PRD 5.20 US-001): activare + slot creds + default, per mediu.
|
|
_migrate_accounts_medii(conn, acc_cols)
|
|
# US-013: DROP coloana legacy accounts.rar_creds_enc dupa backfill complet.
|
|
_drop_legacy_accounts_rar_creds(conn, acc_cols)
|
|
if "active" not in acc_cols:
|
|
# Conturi existente raman active (default 1).
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN active INTEGER NOT NULL DEFAULT 1")
|
|
acc_cols.add("active")
|
|
if "status" not in acc_cols:
|
|
# Stare de ciclu de viata. Default 'active' (trece CHECK pe randurile existente),
|
|
# apoi derivam din `active`: active=0 -> 'pending'.
|
|
# Invariant: active=1 <=> status='active'.
|
|
conn.execute(
|
|
"ALTER TABLE accounts ADD COLUMN status TEXT NOT NULL DEFAULT 'active' "
|
|
"CHECK (status IN ('pending','active','blocked','archived','deleted'))"
|
|
)
|
|
conn.execute(
|
|
"UPDATE accounts SET status='pending' WHERE active=0 AND status='active'"
|
|
)
|
|
if "on_unmapped_error_default" not in acc_cols:
|
|
# Comportament la cod necunoscut/nemapat pe canalul API (default non-distructiv = 0).
|
|
conn.execute(
|
|
"ALTER TABLE accounts ADD COLUMN on_unmapped_error_default INTEGER NOT NULL DEFAULT 0 "
|
|
"CHECK (on_unmapped_error_default IN (0, 1))"
|
|
)
|
|
if "email" not in acc_cols:
|
|
# Email canonic de contact al firmei (US-001, PRD 5.12). Nullable pt. conturi legacy.
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN email TEXT")
|
|
if "tier" not in acc_cols:
|
|
# Plan de cont (US-001, PRD 5.17). Legacy -> 'free' fara trial (enforcement DUR la deploy).
|
|
conn.execute(
|
|
"ALTER TABLE accounts ADD COLUMN tier TEXT NOT NULL DEFAULT 'free' "
|
|
"CHECK (tier IN ('free','standard','pro','premium'))"
|
|
)
|
|
if "trial_until" not in acc_cols:
|
|
# Trial Pro activ daca != NULL si > now. Nullable (NULL = fara trial).
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN trial_until TEXT")
|
|
if "requested_plan" not in acc_cols:
|
|
# Planul cerut la signup (integrare plati). NU acorda drepturi; `tier` ramane sursa
|
|
# de adevar pt API/volum. Nullable. ALTER nu poate adauga CHECK pe coloana noua in
|
|
# SQLite -> validarea valorilor se face in cod (signup, fata de VALID_TIERS).
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN requested_plan TEXT")
|
|
if "consent_at" not in acc_cols:
|
|
# Marca temporala consimtamant Termeni+GDPR (proba). Nullable (NULL = CLI/legacy).
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN consent_at TEXT")
|
|
# Unicitate CUI (un CUI = un cont); NULL distinct nativ -> conturi fara CUI multiplu.
|
|
conn.execute(
|
|
"CREATE UNIQUE INDEX IF NOT EXISTS ux_accounts_cui ON accounts(cui) WHERE cui IS NOT NULL"
|
|
)
|
|
|
|
# Coloane users (DB cu users creata inaintea acestor coloane)
|
|
user_tbl = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
|
|
).fetchone()
|
|
if user_tbl:
|
|
user_cols = {r["name"] for r in conn.execute("PRAGMA table_info(users)").fetchall()}
|
|
if "is_admin" not in user_cols:
|
|
conn.execute("ALTER TABLE users ADD COLUMN is_admin INTEGER NOT NULL DEFAULT 0")
|
|
if "email_verified" not in user_cols:
|
|
conn.execute("ALTER TABLE users ADD COLUMN email_verified INTEGER NOT NULL DEFAULT 0")
|
|
|
|
# Coloana import_rows.override_json: patch canonic editat in preview, criptat Fernet.
|
|
irows_tbl = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='import_rows'"
|
|
).fetchone()
|
|
if irows_tbl:
|
|
irows_cols = {r["name"] for r in conn.execute("PRAGMA table_info(import_rows)").fetchall()}
|
|
if "override_json" not in irows_cols:
|
|
conn.execute("ALTER TABLE import_rows ADD COLUMN override_json TEXT")
|
|
if "reviewed" not in irows_cols:
|
|
# Marcaj confirmare umana (US-007, PRD 5.12). NU intra in payload/idempotenta.
|
|
# NOT NULL DEFAULT 0: valoare clara (0=neconfirmat), fara ambiguitate NULL vs 0.
|
|
conn.execute(
|
|
"ALTER TABLE import_rows ADD COLUMN reviewed INTEGER NOT NULL DEFAULT 0"
|
|
)
|
|
|
|
# Index batch_id pe submissions (poate lipsi pe DB veche)
|
|
existing_idx = {r["name"] for r in conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='submissions'"
|
|
).fetchall()}
|
|
if "idx_submissions_batch" not in existing_idx:
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_submissions_batch ON submissions(batch_id) "
|
|
"WHERE batch_id IS NOT NULL"
|
|
)
|
|
if "idx_submissions_account_status" not in existing_idx:
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_submissions_account_status "
|
|
"ON submissions(account_id, status)"
|
|
)
|
|
|
|
|
|
def _migrate_accounts_medii(conn: sqlite3.Connection, acc_cols: set[str]) -> None:
|
|
"""PRD 5.20 US-001: coloane medii RAR per cont + backfill din ancora globala.
|
|
|
|
Adauga (idempotent): rar_test_enabled/rar_prod_enabled (bife activare),
|
|
rar_creds_test_enc/rar_creds_prod_enc (sloturi creds), rar_env_default.
|
|
|
|
Backfill (O SINGURA DATA, cand coloanele tocmai au fost adaugate pe DB existent):
|
|
creds-ul legacy `rar_creds_enc` apartine mediului `AUTOPASS_RAR_ENV` global de la
|
|
momentul migrarii (ancora) — il copiem in slotul acelui mediu, activam DOAR acel
|
|
mediu (celalalt dezactivat) si fixam default-ul pe el. Conturile fara creds raman
|
|
pe default-urile coloanei (prod on / test off). Migrarea NU presupune env-ul; se
|
|
bazeaza pe ancora globala, exact cum opera contul inainte de 5.20.
|
|
"""
|
|
newly_added = "rar_env_default" not in acc_cols
|
|
if "rar_test_enabled" not in acc_cols:
|
|
conn.execute(
|
|
"ALTER TABLE accounts ADD COLUMN rar_test_enabled INTEGER NOT NULL DEFAULT 0 "
|
|
"CHECK (rar_test_enabled IN (0, 1))"
|
|
)
|
|
if "rar_prod_enabled" not in acc_cols:
|
|
conn.execute(
|
|
"ALTER TABLE accounts ADD COLUMN rar_prod_enabled INTEGER NOT NULL DEFAULT 1 "
|
|
"CHECK (rar_prod_enabled IN (0, 1))"
|
|
)
|
|
if "rar_creds_test_enc" not in acc_cols:
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN rar_creds_test_enc TEXT")
|
|
if "rar_creds_prod_enc" not in acc_cols:
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN rar_creds_prod_enc TEXT")
|
|
if "rar_env_default" not in acc_cols:
|
|
# ALTER nu poate adauga CHECK pe coloana noua in SQLite -> validarea ('test'/'prod')
|
|
# se face in cod (rar_env.py / rutele de cont). DEFAULT 'prod' (cont client nou).
|
|
conn.execute("ALTER TABLE accounts ADD COLUMN rar_env_default TEXT NOT NULL DEFAULT 'prod'")
|
|
|
|
if not newly_added:
|
|
return # coloanele existau deja -> backfill-ul a rulat la o pornire anterioara
|
|
|
|
# Are coloana legacy rar_creds_enc randuri de migrat? (Pe DB foarte nou, e absenta.)
|
|
if "rar_creds_enc" not in acc_cols:
|
|
return
|
|
env = get_settings().rar_env if get_settings().rar_env in ("test", "prod") else "test"
|
|
other = "prod" if env == "test" else "test"
|
|
slot = f"rar_creds_{env}_enc"
|
|
conn.execute(
|
|
f"UPDATE accounts SET {slot} = rar_creds_enc, "
|
|
f"rar_{env}_enabled = 1, rar_{other}_enabled = 0, rar_env_default = ? "
|
|
f"WHERE rar_creds_enc IS NOT NULL AND TRIM(rar_creds_enc) <> '' AND {slot} IS NULL",
|
|
(env,),
|
|
)
|
|
|
|
|
|
def _drop_legacy_accounts_rar_creds(conn: sqlite3.Connection, acc_cols: set[str]) -> None:
|
|
"""PRD 5.20 US-013: DROP coloana legacy `accounts.rar_creds_enc` dupa backfill complet.
|
|
|
|
Idempotent si sigur la fiecare boot (garda one-way: coloana absenta = nimic de facut).
|
|
La eroare LOGHEAZA si lasa coloana pe loc (fail-safe — nu crapa boot-ul ambelor procese).
|
|
Structura separata pentru testabilitate: `_garda_si_drop(conn)` expune pasul de
|
|
assert+backup+DROP izolat (fara re-backfill 6c), apelabil direct din teste.
|
|
"""
|
|
if "rar_creds_enc" not in acc_cols:
|
|
return # garda one-way: coloana deja dropata sau DB fresh
|
|
try:
|
|
_drop_legacy_rar_creds_impl(conn)
|
|
except Exception as exc:
|
|
print(
|
|
f"[db] AVERTISMENT: DROP coloana legacy accounts.rar_creds_enc esuat: {exc}. "
|
|
"Coloana ramane pe loc (fail-safe).",
|
|
flush=True,
|
|
)
|
|
|
|
|
|
def _drop_legacy_rar_creds_impl(conn: sqlite3.Connection) -> None:
|
|
"""Re-backfill (AUTO-FIX 6c) + delegate la _garda_si_drop.
|
|
|
|
Re-backfill-ul 6c acopera creds setate via POST /v1/conturi/rar-creds intre US-001
|
|
si US-013 (pot fi DOAR in coloana veche). Ancora globala: AUTOPASS_RAR_ENV.
|
|
"""
|
|
if sqlite3.sqlite_version_info < (3, 35, 0):
|
|
print(
|
|
f"[db] SQLite {sqlite3.sqlite_version} < 3.35.0 — DROP COLUMN nesuportat; "
|
|
"coloana legacy accounts.rar_creds_enc ramane.",
|
|
flush=True,
|
|
)
|
|
return
|
|
|
|
# AUTO-FIX 6c: re-backfill creds din coloana veche in slotul per-env (ancora globala).
|
|
# Independent de _migrate_accounts_medii (care sare pe DB deja migrat cu guard newly_added).
|
|
env = get_settings().rar_env if get_settings().rar_env in ("test", "prod") else "test"
|
|
slot = f"rar_creds_{env}_enc"
|
|
conn.execute(
|
|
f"UPDATE accounts SET {slot}=rar_creds_enc, rar_{env}_enabled=1 "
|
|
f"WHERE rar_creds_enc IS NOT NULL AND TRIM(rar_creds_enc)<>'' "
|
|
f"AND ({slot} IS NULL OR TRIM({slot})='')"
|
|
)
|
|
|
|
_garda_si_drop(conn)
|
|
|
|
|
|
def _garda_si_drop(conn: sqlite3.Connection) -> None:
|
|
"""Garda de siguranta + backup + DROP accounts.rar_creds_enc. Testabila izolat.
|
|
|
|
Verifica ca niciun cont nu are creds DOAR in coloana veche (ambele sloturi per-env goale).
|
|
Daca exista orfane -> NU dropa (fail-safe: fara pierdere de date).
|
|
Altfel: backup criptat, DROP, verificare PRAGMA (AUTO-FIX 6d).
|
|
"""
|
|
# Garda: orfane = cont cu creds in coloana veche DAR ambele sloturi per-env goale.
|
|
orphan_count = conn.execute(
|
|
"SELECT COUNT(*) FROM accounts "
|
|
"WHERE rar_creds_enc IS NOT NULL AND TRIM(rar_creds_enc)<>'' "
|
|
"AND (rar_creds_test_enc IS NULL OR TRIM(rar_creds_test_enc)='') "
|
|
"AND (rar_creds_prod_enc IS NULL OR TRIM(rar_creds_prod_enc)='')"
|
|
).fetchone()[0]
|
|
|
|
if orphan_count > 0:
|
|
print(
|
|
f"[db] AVERTISMENT: {orphan_count} cont(uri) cu rar_creds_enc ne-copiat in niciun slot "
|
|
"per-env. DROP anulat (fail-safe: fara pierdere de date).",
|
|
flush=True,
|
|
)
|
|
return
|
|
|
|
# Backup criptat inainte de DROP (blob-urile sunt deja criptate Fernet).
|
|
conn.execute(
|
|
"CREATE TABLE IF NOT EXISTS accounts_rar_creds_enc_backup "
|
|
"(account_id INTEGER, rar_creds_enc TEXT, backed_up_at TEXT)"
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO accounts_rar_creds_enc_backup "
|
|
"SELECT id, rar_creds_enc, datetime('now') FROM accounts "
|
|
"WHERE rar_creds_enc IS NOT NULL"
|
|
)
|
|
|
|
# DROP coloana legacy.
|
|
conn.execute("ALTER TABLE accounts DROP COLUMN rar_creds_enc")
|
|
|
|
# AUTO-FIX 6d: verifica prin PRAGMA (pe tabela accounts, NU grep — submissions are aceeasi coloana).
|
|
cols_after = {r["name"] for r in conn.execute("PRAGMA table_info(accounts)").fetchall()}
|
|
if "rar_creds_enc" in cols_after:
|
|
raise RuntimeError("DROP COLUMN rar_creds_enc esuat: coloana inca prezenta dupa ALTER TABLE")
|
|
|
|
print("[db] DROP coloana legacy accounts.rar_creds_enc: OK", flush=True)
|
|
|
|
|
|
def _backfill_submissions_rar_env(conn: sqlite3.Connection) -> None:
|
|
"""PRD 5.20 US-001 (AUTO-FIX G + E4/3): backfill rar_env + recompute idempotency_key.
|
|
|
|
Ruleaza O SINGURA DATA, imediat dupa ce coloana `submissions.rar_env` a fost adaugata
|
|
pe un DB existent. Toate randurile pre-migrare au fost trimise (sau urmeaza) catre
|
|
mediul `AUTOPASS_RAR_ENV` global — le etichetam cu acel env (NU DEFAULT 'test'), altfel
|
|
reconcilierea worker-ului ar lovi endpoint-ul gresit -> duplicat ireversibil.
|
|
|
|
Recompute `idempotency_key` la forma env-aware (`build_key(account_id, canon, rar_env)`):
|
|
altfel un re-POST al unui rand legacy (cheie env-less) ar rata randul existent ->
|
|
duplicat. Recompute-ul e consistent (acelasi env pe toate randurile pre-migrare) deci
|
|
nu poate crea coliziuni intre randuri care erau deja distincte.
|
|
"""
|
|
import json as _json
|
|
|
|
from .idempotency import build_key, canonicalize_row
|
|
|
|
env = get_settings().rar_env if get_settings().rar_env in ("test", "prod") else "test"
|
|
conn.execute("UPDATE submissions SET rar_env = ?", (env,))
|
|
|
|
rows = conn.execute(
|
|
"SELECT id, account_id, idempotency_key, payload_json FROM submissions"
|
|
).fetchall()
|
|
for r in rows:
|
|
try:
|
|
content = _json.loads(r["payload_json"])
|
|
except (ValueError, TypeError):
|
|
continue
|
|
canon = canonicalize_row(content)
|
|
# Pastreaza prestatiile rezolvate (cod_prestatie/cod_op_service) pentru _op_identity.
|
|
canon["prestatii"] = content.get("prestatii") or []
|
|
new_key = build_key(r["account_id"], canon, env)
|
|
if new_key == r["idempotency_key"]:
|
|
continue
|
|
try:
|
|
conn.execute(
|
|
"UPDATE submissions SET idempotency_key = ? WHERE id = ?",
|
|
(new_key, r["id"]),
|
|
)
|
|
except sqlite3.IntegrityError:
|
|
# Coliziune improbabila pe UNIQUE(idempotency_key): lasa cheia veche (no-op),
|
|
# randul ramane gasibil prin dual-lookup legacy.
|
|
continue
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
|
|
|
|
def write_heartbeat(conn: sqlite3.Connection, *, rar_login_ok: bool = False, detail: str = "") -> None:
|
|
"""Worker bate la fiecare iteratie. last_rar_login_ok se actualizeaza doar la login reusit."""
|
|
if rar_login_ok:
|
|
conn.execute(
|
|
"UPDATE worker_heartbeat SET last_beat=?, last_rar_login_ok=?, detail=? WHERE id=1",
|
|
(_now_iso(), _now_iso(), detail),
|
|
)
|
|
else:
|
|
conn.execute(
|
|
"UPDATE worker_heartbeat SET last_beat=?, detail=? WHERE id=1",
|
|
(_now_iso(), detail),
|
|
)
|
|
|
|
|
|
def read_heartbeat(conn: sqlite3.Connection) -> sqlite3.Row | None:
|
|
return conn.execute("SELECT * FROM worker_heartbeat WHERE id=1").fetchone()
|
|
|
|
|
|
def queue_depth(conn: sqlite3.Connection) -> int:
|
|
row = conn.execute("SELECT COUNT(*) AS n FROM submissions WHERE status='queued'").fetchone()
|
|
return int(row["n"]) if row else 0
|
|
|
|
|
|
# --- Jurnal de aplicatie (app_events) ---
|
|
|
|
def insert_app_event(
|
|
conn: sqlite3.Connection,
|
|
*,
|
|
request_id: str | None,
|
|
account_id: int | None,
|
|
sursa: str,
|
|
tip: str,
|
|
nivel: str,
|
|
cod: str | None,
|
|
mesaj: str | None,
|
|
context_json: str | None,
|
|
purge_after: str | None,
|
|
) -> None:
|
|
"""Insert minimal intr-un rand app_events. Apelat DOAR prin observ.log_event
|
|
(care a redactat deja toate valorile). Nu redacteaza aici — separarea de
|
|
responsabilitati: db.py persista, observ.py/security.py curata."""
|
|
conn.execute(
|
|
"INSERT INTO app_events (request_id, account_id, sursa, tip, nivel, cod, mesaj, "
|
|
"context_json, purge_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(request_id, account_id, sursa, tip, nivel, cod, mesaj, context_json, purge_after),
|
|
)
|
|
|
|
|
|
def read_app_events(
|
|
conn: sqlite3.Connection,
|
|
*,
|
|
account_id: int | None = None,
|
|
tip: str | None = None,
|
|
nivel: str | None = None,
|
|
date_from: str | None = None,
|
|
date_to: str | None = None,
|
|
limit: int = 100,
|
|
offset: int = 0,
|
|
) -> list[sqlite3.Row]:
|
|
"""Citire paginata din app_events, ordine descrescatoare dupa id (cele mai noi intai).
|
|
|
|
account_id=None -> toate conturile (admin). account_id=int -> scoped pe cont
|
|
(NULL apartine contului 1, ca restul UI-ului). Filtrele tip/nivel/data sunt optionale.
|
|
"""
|
|
where: list[str] = []
|
|
params: list = []
|
|
if account_id is not None:
|
|
where.append("(account_id = ? OR (account_id IS NULL AND ? = 1))")
|
|
params.extend([account_id, account_id])
|
|
if tip:
|
|
where.append("tip = ?")
|
|
params.append(tip)
|
|
if nivel:
|
|
where.append("nivel = ?")
|
|
params.append(nivel)
|
|
if date_from:
|
|
where.append("date(ts) >= date(?)")
|
|
params.append(date_from)
|
|
if date_to:
|
|
where.append("date(ts) <= date(?)")
|
|
params.append(date_to)
|
|
sql = "SELECT id, ts, request_id, account_id, sursa, tip, nivel, cod, mesaj, context_json FROM app_events"
|
|
if where:
|
|
sql += " WHERE " + " AND ".join(where)
|
|
sql += " ORDER BY id DESC LIMIT ? OFFSET ?"
|
|
params.extend([limit, offset])
|
|
return conn.execute(sql, params).fetchall()
|