Compare commits

...

3 Commits

Author SHA1 Message Date
Claude Agent
6fb92466cb feat(T6/T7): supervizare worker (healthcheck+autoheal) + backup online + cheie partajata
T6 — worker supravegheat:
- app/worker/healthcheck.py: probe pe heartbeat-ul din DB (beat invechit -> exit 1).
  Prinde worker-ul agatat (proces viu, beat inghetat) pe care restart:always nu-l
  vede. Cablat ca healthcheck pe serviciul worker in compose.
- sidecar autoheal: restarteaza efectiv containerul unhealthy (compose simplu doar
  marcheaza, nu restarteaza la unhealthy).

T7 — deploy:
- tools/backup.py: backup ONLINE via Connection.backup (WAL nu se copiaza sigur cu
  cp); --keep N roteste snapshot-urile.
- .env.example documenteaza env-urile; volum persistent numit deja in compose.

Fix critic (split api/worker in 2 containere): AUTOPASS_CREDS_KEY trebuie PARTAJATA
api<->worker, altfel worker nu decripteaza creds-urile criptate de API -> submission
blocate. Acum impusa in compose (${...:?} -> fail explicit daca lipseste).
.gitignore: exceptie !.env.example.

5 teste noi (tests/test_deploy.py). 100 pass total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 20:20:18 +00:00
Claude Agent
fbb2695336 feat(creds): livrare creds per-cerere la worker (criptat efemer + sesiuni per-cont)
Plan sect.5: parola RAR vine per-cerere, stocata CRIPTATA in submission pana la
primul login reusit pe cont, apoi stearsa; JWT 30h acopera restul.

- app/crypto.py: Fernet, cheie din AUTOPASS_creds_key (nesetata -> efemera la
  runtime, creds nu supravietuiesc restartului). encrypt/decrypt_creds.
- schema + migrare: submissions.rar_creds_enc (creds criptate).
- ingestie: cripteaza rar_credentials, le lipeste de fiecare submission nou.
  Niciodata in clar in DB.
- worker: AccountSessions (login per-cont cu creds decriptate, cache JWT in
  memorie, sterge creds-urile contului dupa primul login + refresh nomenclator).
  401 creds gresite -> error fara retry; token expirat -> invalidare + requeue;
  fara creds (restart) -> requeue "indisponibile" (ROAAUTO re-trimite).
  claim_one intoarce account_id + creds_enc; recover_orphans filtrabil pe cont.
- requirements: cryptography==46.0.5.

Nota: refresh nomenclator e acum lazy la primul login per-cont (nu la pornire);
seed-ul fallback acopera editorul offline.

10 teste noi (tests/test_creds_delivery.py). 95 pass total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 20:16:16 +00:00
Claude Agent
c17c1aa4f4 feat(securitate-CORE): redactare creds + auth API-key per cont
Redactare:
- handler RequestValidationError dropeaza input/ctx din 422 (vectorul de
  scurgere a rar_credentials.password pe /v1/prezentari); pastreaza type/loc/msg
- app/security.py: scrub/scrub_text + CredentialRedactingFilter pe root+uvicorn
- models.py: password cu repr=False

Auth API-key:
- app/auth.py: hash SHA-256 in api_keys (cheia in clar emisa o singura data),
  header X-API-Key / Authorization: Bearer, dependency resolve_account_id
- enforcement pe flag AUTOPASS_require_api_key (prod on->401, dev off->cont
  default id=1; cheie prezenta invalida->401 mereu)
- account_id real curge din cheie in ingestie + mapare
- tools/apikey.py: CLI create/rotate/revoke/list (fara endpoint HTTP admin)

16 teste noi (tests/test_security.py). 85 pass total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 20:02:07 +00:00
21 changed files with 1424 additions and 56 deletions

21
.env.example Normal file
View File

@@ -0,0 +1,21 @@
# Gateway RAR AUTOPASS — variabile de mediu (copiaza in .env; .env NU se comite).
# Compose citeste .env automat. Prefix AUTOPASS_ pentru toate.
# --- CRITIC: cheie criptare creds RAR (Fernet) ---
# PARTAJATA intre api si worker (API cripteaza, worker decripteaza). Genereaza:
# python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
AUTOPASS_CREDS_KEY=
# --- Auth API-key ---
# true = orice /v1/* cere cheie valida (prod). false = dev (fara cheie -> cont id=1).
AUTOPASS_REQUIRE_API_KEY=false
# --- Worker ---
# Send catre RAR. false = nu trimite (default, sigur pentru probe). true = end-to-end.
AUTOPASS_WORKER_SEND_ENABLED=false
# Dev: foloseste creds <test> din settings.xml cand submission-ul nu are creds criptate.
AUTOPASS_WORKER_USE_TEST_CREDS=false
# --- RAR ---
# test | prod
AUTOPASS_RAR_ENV=test

1
.gitignore vendored
View File

@@ -13,6 +13,7 @@ settings.xml
*.key *.key
.env .env
.env.* .env.*
!.env.example
# --- VFP: programe compilate (se regenerează din .prg) --- # --- VFP: programe compilate (se regenerează din .prg) ---
*.fxp *.fxp

View File

@@ -13,9 +13,11 @@ from __future__ import annotations
import json import json
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from ...auth import resolve_account_id
from ...crypto import encrypt_creds
from ...db import get_connection from ...db import get_connection
from ...idempotency import idempotency_key from ...idempotency import idempotency_key
from ...mapping import ( from ...mapping import (
@@ -33,18 +35,25 @@ router = APIRouter(prefix="/v1", tags=["v1"])
@router.post("/prezentari", response_model=PrezentariResponse) @router.post("/prezentari", response_model=PrezentariResponse)
def create_prezentari(req: PrezentareRequest) -> PrezentariResponse: def create_prezentari(
req: PrezentareRequest,
account_id: int = Depends(resolve_account_id),
) -> PrezentariResponse:
"""Enqueue una/mai multe prezentari. Idempotent: continut identic -> acelasi submission. """Enqueue una/mai multe prezentari. Idempotent: continut identic -> acelasi submission.
Validarea de continut (T3, app.validation) ruleaza inainte de enqueue: Validarea de continut (T3, app.validation) ruleaza inainte de enqueue:
esecurile NU resping cererea, ci enqueue-aza cu status `needs_data` + motiv esecurile NU resping cererea, ci enqueue-aza cu status `needs_data` + motiv
(plan.md sect. 3). JSON malformat -> 422 din Pydantic (validare de shape). (plan.md sect. 3). JSON malformat -> 422 din Pydantic (validare de shape).
TODO(auth): rezolva account_id din API key (acum None). account_id vine din cheia API (resolve_account_id): cont real cu cheie,
implicit id=1 in dev fara cheie, 401 fara cheie valida in prod.
Nota: rar_credentials NU se persista (zero-storage) — worker-ul le va primi Nota: rar_credentials NU se persista (zero-storage) — worker-ul le va primi
pe alt canal (T2); in schelet enqueue-ul doar stocheaza prezentarea. pe alt canal (T2); in schelet enqueue-ul doar stocheaza prezentarea.
""" """
account_id = None # TODO(auth): din API key
acct = account_or_default(account_id) acct = account_or_default(account_id)
# Creds RAR efemere: criptate si lipite de fiecare submission nou pana la
# primul login reusit pentru cont (worker le sterge atunci). Zero-storage at
# rest — niciodata in clar in DB/loguri (plan sect. 5).
creds_enc = encrypt_creds(req.rar_credentials.model_dump())
conn = get_connection() conn = get_connection()
results: list[SubmissionResult] = [] results: list[SubmissionResult] = []
try: try:
@@ -86,9 +95,9 @@ def create_prezentari(req: PrezentareRequest) -> PrezentariResponse:
status, rar_error = "queued", None status, rar_error = "queued", None
cur = conn.execute( cur = conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_error) " "INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_error, rar_creds_enc) "
"VALUES (?, ?, ?, ?, ?)", "VALUES (?, ?, ?, ?, ?, ?)",
(key, acct, status, json.dumps(content, ensure_ascii=False), rar_error), (key, acct, status, json.dumps(content, ensure_ascii=False), rar_error, creds_enc),
) )
results.append(SubmissionResult(submission_id=int(cur.lastrowid), status=status)) results.append(SubmissionResult(submission_id=int(cur.lastrowid), status=status))
finally: finally:
@@ -174,18 +183,21 @@ def get_mapari_pending() -> dict:
class MapareIn(BaseModel): class MapareIn(BaseModel):
account_id: int | None = None
cod_op_service: str = Field(..., min_length=1) cod_op_service: str = Field(..., min_length=1)
cod_prestatie: str = Field(..., min_length=1) cod_prestatie: str = Field(..., min_length=1)
auto_send: bool = True auto_send: bool = True
@router.post("/mapari") @router.post("/mapari")
def create_mapare(req: MapareIn) -> dict: def create_mapare(
req: MapareIn,
account_id: int = Depends(resolve_account_id),
) -> dict:
"""Salveaza/actualizeaza o mapare op->cod si re-rezolva submission-urile blocate. """Salveaza/actualizeaza o mapare op->cod si re-rezolva submission-urile blocate.
Verifica intai ca `cod_prestatie` exista in nomenclator (nu lasam mapari catre Contul vine din cheia API (NU din body) — un cont nu poate edita maparile
coduri inexistente). Apoi upsert + re-rezolvare automata a `needs_mapping`. altuia. Verifica intai ca `cod_prestatie` exista in nomenclator (nu lasam
mapari catre coduri inexistente). Apoi upsert + re-rezolvare `needs_mapping`.
""" """
conn = get_connection() conn = get_connection()
try: try:
@@ -195,8 +207,8 @@ def create_mapare(req: MapareIn) -> dict:
).fetchone() ).fetchone()
if not exists: if not exists:
raise HTTPException(status_code=422, detail=f"cod_prestatie '{cod}' nu exista in nomenclator") raise HTTPException(status_code=422, detail=f"cod_prestatie '{cod}' nu exista in nomenclator")
save_mapping(conn, req.account_id, req.cod_op_service, cod, req.auto_send) save_mapping(conn, account_id, req.cod_op_service, cod, req.auto_send)
stats = reresolve_account(conn, req.account_id) stats = reresolve_account(conn, account_id)
return {"saved": {"cod_op_service": req.cod_op_service.strip(), "cod_prestatie": cod}, "reresolve": stats} return {"saved": {"cod_op_service": req.cod_op_service.strip(), "cod_prestatie": cod}, "reresolve": stats}
finally: finally:
conn.close() conn.close()

140
app/auth.py Normal file
View File

@@ -0,0 +1,140 @@
"""Autentificare API-key per cont (CORE securitate).
Cheile API sunt separate de credentialele RAR: identifica CONTUL ROAAUTO care
foloseste gateway-ul, nu utilizatorul RAR. Stocam doar SHA-256 al cheii (tabela
`api_keys`), niciodata cheia in clar — emisa o singura data la creare.
Enforcement controlat de `AUTOPASS_require_api_key`:
- True (prod): orice /v1/* protejat cere cheie valida -> 401 fara/cu cheie gresita.
- False (dev/test): fara cheie -> cont implicit id=1 (back-compat); cheie prezenta
dar invalida -> tot 401 (o cheie gresita nu trebuie sa treaca tacit).
Lifecycle (emitere/rotire/revocare) se face din CLI `python -m tools.apikey`
(adminul ruleaza pe masina) — nicio suprafata HTTP de admin de securizat.
"""
from __future__ import annotations
import hashlib
import secrets
import sqlite3
from fastapi import Header, HTTPException
from .config import get_settings
from .db import get_connection
from .mapping import DEFAULT_ACCOUNT_ID
KEY_PREFIX = "rfak_" # romfast autopass key
def generate_key() -> str:
"""Cheie noua in clar (emisa o singura data). `rfak_` + 43 caractere urlsafe."""
return KEY_PREFIX + secrets.token_urlsafe(32)
def hash_key(plaintext: str) -> str:
"""SHA-256 hex al cheii. Comparam mereu pe hash, niciodata pe clar."""
return hashlib.sha256(plaintext.strip().encode("utf-8")).hexdigest()
def create_api_key(conn: sqlite3.Connection, account_id: int) -> str:
"""Emite o cheie pentru cont, stocheaza hash-ul, intoarce cheia in clar.
Verifica intai ca exista contul (FK ON DELETE CASCADE nu raporteaza un cont
inexistent la INSERT decat ca eroare obscura). Intoarce cheia in clar — NU se
mai poate recupera ulterior.
"""
acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone()
if not acct:
raise ValueError(f"cont inexistent: {account_id}")
plaintext = generate_key()
conn.execute(
"INSERT INTO api_keys (account_id, key_hash, active) VALUES (?, ?, 1)",
(account_id, hash_key(plaintext)),
)
return plaintext
def revoke_api_key(conn: sqlite3.Connection, key_id: int) -> bool:
"""Revoca o cheie (active=0 + revoked_at). Intoarce True daca a fost activa."""
cur = conn.execute(
"UPDATE api_keys SET active=0, revoked_at=datetime('now') WHERE id=? AND active=1",
(key_id,),
)
return cur.rowcount > 0
def rotate_api_key(conn: sqlite3.Connection, account_id: int) -> str:
"""Revoca toate cheile active ale contului si emite una noua. Intoarce cheia noua."""
conn.execute(
"UPDATE api_keys SET active=0, revoked_at=datetime('now') WHERE account_id=? AND active=1",
(account_id,),
)
return create_api_key(conn, account_id)
def list_keys(conn: sqlite3.Connection, account_id: int | None = None) -> list[dict]:
"""Metadate chei (FARA hash). Pentru CLI list."""
if account_id is not None:
rows = conn.execute(
"SELECT id, account_id, active, created_at, revoked_at FROM api_keys "
"WHERE account_id=? ORDER BY id",
(account_id,),
).fetchall()
else:
rows = conn.execute(
"SELECT id, account_id, active, created_at, revoked_at FROM api_keys ORDER BY id"
).fetchall()
return [dict(r) for r in rows]
def account_for_key(conn: sqlite3.Connection, plaintext: str) -> int | None:
"""account_id pentru o cheie activa, sau None daca lipseste/revocata."""
if not plaintext:
return None
row = conn.execute(
"SELECT account_id FROM api_keys WHERE key_hash=? AND active=1",
(hash_key(plaintext),),
).fetchone()
return int(row["account_id"]) if row else None
def _extract_key(x_api_key: str | None, authorization: str | None) -> str | None:
"""Cheia din `X-API-Key` sau `Authorization: Bearer <key>` (prima are prioritate)."""
if x_api_key and x_api_key.strip():
return x_api_key.strip()
if authorization and authorization.strip():
parts = authorization.strip().split(None, 1)
if len(parts) == 2 and parts[0].lower() == "bearer":
return parts[1].strip()
return None
def resolve_account_id(
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
authorization: str | None = Header(default=None),
) -> int:
"""Dependency FastAPI: contul cererii din cheia API.
- cheie valida -> account_id al cheii
- cheie invalida (prezenta) -> 401 (mereu, indiferent de flag)
- fara cheie + flag off -> cont implicit (id=1), back-compat
- fara cheie + flag on -> 401
"""
settings = get_settings()
plaintext = _extract_key(x_api_key, authorization)
if plaintext is None:
if settings.require_api_key:
raise HTTPException(status_code=401, detail="cheie API lipsa")
return DEFAULT_ACCOUNT_ID
conn = get_connection()
try:
account_id = account_for_key(conn, plaintext)
finally:
conn.close()
if account_id is None:
raise HTTPException(status_code=401, detail="cheie API invalida sau revocata")
return account_id

View File

@@ -22,6 +22,18 @@ class Settings(BaseSettings):
# --- Bază de date --- # --- Bază de date ---
db_path: Path = ROOT / "data" / "autopass.db" db_path: Path = ROOT / "data" / "autopass.db"
# --- Securitate (CORE) ---
# Enforcement auth API-key pe /v1/* protejat. False (dev/test): fara cheie ->
# cont implicit id=1. True (prod): fara cheie valida -> 401. O cheie PREZENTA
# dar invalida da 401 indiferent de flag.
require_api_key: bool = False
# Cheie Fernet pentru criptarea creds RAR efemere in submissions (zero-storage
# at rest). Nesetata -> cheie efemera la runtime (creds nu supravietuiesc
# restartului). In productie seteaz-o persistent. Genereaza:
# python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
creds_key: str | None = None
# --- RAR --- # --- RAR ---
rar_env: str = "test" # "test" | "prod" rar_env: str = "test" # "test" | "prod"
rar_base_url_test: str = "https://apps.rarom.ro/test-rar-autopass" rar_base_url_test: str = "https://apps.rarom.ro/test-rar-autopass"

57
app/crypto.py Normal file
View File

@@ -0,0 +1,57 @@
"""Criptare simetrica pentru credentialele RAR efemere (zero-storage at rest).
Plan sect. 5: parola RAR vine per-cerere, se stocheaza CRIPTATA in submission
pana la primul login reusit pentru cont, apoi se sterge. JWT (30h) acopera
restul trimiterilor. Cheia traieste doar in `AUTOPASS_creds_key` (env), niciodata
in cod sau in DB.
Daca `AUTOPASS_creds_key` nu e setat, generam o cheie EFEMERA la prima folosire:
creds criptate NU supravietuiesc unui restart (acceptabil — modelul e efemer,
ROAAUTO re-trimite). Pentru productie seteaza o cheie persistenta (vezi README/deploy).
"""
from __future__ import annotations
import json
from functools import lru_cache
from cryptography.fernet import Fernet, InvalidToken
from .config import get_settings
@lru_cache
def _fernet() -> Fernet:
key = get_settings().creds_key
if key:
return Fernet(key.encode() if isinstance(key, str) else key)
generated = Fernet.generate_key()
print(
"[crypto] AUTOPASS_creds_key nesetat — cheie efemera generata; creds "
"criptate NU supravietuiesc restartului worker-ului/API-ului",
flush=True,
)
return Fernet(generated)
def reset_cache() -> None:
"""Reseteaza cheia memorata (pentru teste care schimba env-ul)."""
_fernet.cache_clear()
def encrypt_creds(creds: dict) -> str:
"""Cripteaza un dict de creds -> token Fernet (str). Compact, fara spatii."""
blob = json.dumps(creds, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
return _fernet().encrypt(blob).decode("ascii")
def decrypt_creds(token: str | None) -> dict | None:
"""Decripteaza un token Fernet -> dict, sau None daca lipseste/cheie gresita/corupt."""
if not token:
return None
try:
plain = _fernet().decrypt(token.encode("ascii"))
data = json.loads(plain.decode("utf-8"))
return data if isinstance(data, dict) else None
except (InvalidToken, ValueError, TypeError):
return None

View File

@@ -46,6 +46,8 @@ def _migrate(conn: sqlite3.Connection) -> None:
cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()} cols = {r["name"] for r in conn.execute("PRAGMA table_info(submissions)").fetchall()}
if "next_attempt_at" not in cols: if "next_attempt_at" not in cols:
conn.execute("ALTER TABLE submissions ADD COLUMN next_attempt_at TEXT") conn.execute("ALTER TABLE submissions ADD COLUMN next_attempt_at TEXT")
if "rar_creds_enc" not in cols:
conn.execute("ALTER TABLE submissions ADD COLUMN rar_creds_enc TEXT")
def _now_iso() -> str: def _now_iso() -> str:

View File

@@ -11,24 +11,41 @@ from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import datetime, timezone from datetime import datetime, timezone
from fastapi import FastAPI from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, PlainTextResponse
from . import __version__ from . import __version__
from .api.v1.router import router as api_v1_router from .api.v1.router import router as api_v1_router
from .config import get_settings from .config import get_settings
from .db import get_connection, init_db, queue_depth, read_heartbeat from .db import get_connection, init_db, queue_depth, read_heartbeat
from .security import install_log_redaction
from .web.routes import router as web_router from .web.routes import router as web_router
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
install_log_redaction()
init_db() init_db()
yield yield
app = FastAPI(title="Gateway RAR AUTOPASS", version=__version__, lifespan=lifespan) app = FastAPI(title="Gateway RAR AUTOPASS", version=__version__, lifespan=lifespan)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
"""422 fara echo de credentiale.
Pydantic include implicit `input` (+ uneori `ctx`) in fiecare eroare — pe
/v1/prezentari asta ar reflecta inapoi `rar_credentials.password`. Pastram
type/loc/msg (clientul stie ce camp e gresit) si DROP-am input/ctx. Defense
in depth pe TOATE rutele, nu doar prezentari.
"""
cleaned = [{"type": e.get("type"), "loc": e.get("loc"), "msg": e.get("msg")} for e in exc.errors()]
return JSONResponse(status_code=422, content={"detail": cleaned})
app.include_router(api_v1_router) app.include_router(api_v1_router)
app.include_router(web_router) app.include_router(web_router)

View File

@@ -15,7 +15,8 @@ class RarCredentials(BaseModel):
"""Credentiale RAR per-cerere (vin de la ROAAUTO din Oracle). NU se stocheaza.""" """Credentiale RAR per-cerere (vin de la ROAAUTO din Oracle). NU se stocheaza."""
email: str email: str
password: str # repr=False: str(creds) / loguri care fac repr pe model NU expun parola.
password: str = Field(..., repr=False)
class PrestatieItem(BaseModel): class PrestatieItem(BaseModel):

View File

@@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS submissions (
status TEXT NOT NULL DEFAULT 'queued' status TEXT NOT NULL DEFAULT 'queued'
CHECK (status IN ('queued','sending','sent','needs_mapping','needs_data','error')), CHECK (status IN ('queued','sending','sent','needs_mapping','needs_data','error')),
payload_json TEXT NOT NULL, -- TODO(P2): inlocuit cu BLOB criptat payload_json TEXT NOT NULL, -- TODO(P2): inlocuit cu BLOB criptat
rar_creds_enc TEXT, -- creds RAR criptate (Fernet), sterse dupa primul login reusit (plan sect.5)
rar_status_code INTEGER, rar_status_code INTEGER,
rar_error TEXT, rar_error TEXT,
id_prezentare INTEGER, -- data.id intors de RAR la succes id_prezentare INTEGER, -- data.id intors de RAR la succes

124
app/security.py Normal file
View File

@@ -0,0 +1,124 @@
"""Redactare credentiale (CORE securitate).
Corpul lui POST /v1/prezentari contine `rar_credentials.password` (creds RAR
per-cerere, zero-storage). Aceste valori NU trebuie sa apara NICIODATA in:
- raspunsuri de eroare (422 Pydantic echo-eaza `input` => parola) — vezi
`app.main.validation_exception_handler`;
- loguri / traceback-uri uvicorn — vezi `CredentialRedactingFilter`;
- repr-ul modelelor (str(creds)) — vezi `RarCredentials` (Field repr=False).
Modulul e pur (fara DB/HTTP), unit-testabil direct.
"""
from __future__ import annotations
import logging
import re
from typing import Any
MASK = "***REDACTED***"
# Chei al caror continut e secret oriunde apar (case-insensitive). `denumire`
# etc. raman in clar — doar credentialele si token-urile se mascheaza.
SENSITIVE_KEYS = frozenset(
{
"password",
"parola",
"pwd",
"pass",
"rar_credentials",
"credentials",
"token",
"jwt",
"authorization",
"api_key",
"apikey",
"x-api-key",
"secret",
}
)
def scrub(obj: Any) -> Any:
"""Copie a structurii cu valorile cheilor sensibile mascate, recursiv.
Pentru `rar_credentials`/`credentials` masheaza intregul subarbore (nu doar
`password`) — un dict de creds e secret integral. Listele si dict-urile se
parcurg in adancime; scalarii trec neatinsi.
"""
if isinstance(obj, dict):
out: dict = {}
for k, v in obj.items():
if isinstance(k, str) and k.lower() in SENSITIVE_KEYS:
out[k] = MASK
else:
out[k] = scrub(v)
return out
if isinstance(obj, (list, tuple)):
return [scrub(v) for v in obj]
return obj
# Mascare in text liber (mesaje de log, traceback-uri formatate). Acopera formele
# uzuale: JSON ("password": "x"), kwargs (password='x'), Bearer <token>.
_TEXT_PATTERNS = [
# "password": "secret" / 'password' : 'secret' (JSON / dict repr)
re.compile(
r"""(?P<key>["']?(?:password|parola|pwd|pass|token|jwt|secret|api[_-]?key)["']?\s*[:=]\s*)"""
r"""(?P<q>["'])(?P<val>(?:\\.|[^"'\\])*)(?P=q)""",
re.IGNORECASE,
),
# password=secret (fara ghilimele, pana la separator)
re.compile(
r"""(?P<key>\b(?:password|parola|pwd|token|jwt|secret|api[_-]?key)\s*=\s*)(?P<val>[^\s,;&)}\]]+)""",
re.IGNORECASE,
),
# Authorization: Bearer <token> / Bearer eyJ...
re.compile(r"""(?P<key>Bearer\s+)(?P<val>[A-Za-z0-9._\-]+)""", re.IGNORECASE),
]
def scrub_text(text: str) -> str:
"""Masheaza credentiale dintr-un sir liber (loguri, traceback)."""
if not text:
return text
out = text
for pat in _TEXT_PATTERNS:
out = pat.sub(lambda m: m.group("key") + MASK, out)
return out
class CredentialRedactingFilter(logging.Filter):
"""Filtru de logging care masheaza credentiale din orice record emis.
Atasat pe root + logger-ele uvicorn (vezi `install_log_redaction`). Lucreaza
pe mesajul DEJA formatat (cu args interpolate), apoi goleste args ca
formatter-ul sa nu reinterpoleze. Tot ce trece prin logging e curatat;
parolele in variabile locale de traceback nu ajung in mesaj (Python nu
formateaza locals implicit), deci raman protejate.
"""
def filter(self, record: logging.LogRecord) -> bool:
try:
msg = record.getMessage()
except Exception:
return True
scrubbed = scrub_text(msg)
if scrubbed != msg:
record.msg = scrubbed
record.args = ()
return True
def install_log_redaction() -> None:
"""Instaleaza filtrul de redactare pe root + logger-ele uvicorn (idempotent)."""
filt = CredentialRedactingFilter()
targets = [
logging.getLogger(), # root
logging.getLogger("uvicorn"),
logging.getLogger("uvicorn.error"),
logging.getLogger("uvicorn.access"),
]
for lg in targets:
if not any(isinstance(f, CredentialRedactingFilter) for f in lg.filters):
lg.addFilter(filt)

View File

@@ -12,8 +12,14 @@ T2 implementat:
- lease/timeout pe randuri 'sending' orfane. - lease/timeout pe randuri 'sending' orfane.
- re-login la token expirat (401 mid-sesiune) — JWT 30h, retry NU plafonat la 30h. - re-login la token expirat (401 mid-sesiune) — JWT 30h, retry NU plafonat la 30h.
Ce NU e inca: livrarea creds per-cerere de la ROAAUTO (in schelet folosim creds <test>), Creds per-cerere (plan sect. 5): fiecare submission poarta creds RAR CRIPTATE
criptare PII at-rest (P2), b64Image mare pe disc (P2). (rar_creds_enc). Worker-ul face login per CONT cu acele creds, cache-uieste JWT
(30h) in memorie si STERGE creds-urile contului dupa primul login reusit. Token-ul
in memorie acopera restul trimiterilor; la restart token-ul se pierde si contul
re-loghează la urmatorul submission care aduce creds proaspete (degradare acceptata).
Dev: `worker_use_test_creds` foloseste creds <test> cand submission-ul nu are enc.
Ce NU e inca: criptare PII payload at-rest (P2), b64Image mare pe disc (P2).
Pornire: python -m app.worker Pornire: python -m app.worker
""" """
@@ -29,8 +35,9 @@ from datetime import datetime, timedelta, timezone
import httpx import httpx
from ..config import Settings, get_settings, load_test_credentials from ..config import Settings, get_settings, load_test_credentials
from ..crypto import decrypt_creds
from ..db import get_connection, init_db, write_heartbeat from ..db import get_connection, init_db, write_heartbeat
from ..mapping import upsert_nomenclator from ..mapping import DEFAULT_ACCOUNT_ID, upsert_nomenclator
from ..payload import build_rar_payload from ..payload import build_rar_payload
from ..reconcile import match_finalizata from ..reconcile import match_finalizata
from ..rar_client import RarAuthError, RarClient, RarError from ..rar_client import RarAuthError, RarClient, RarError
@@ -93,11 +100,15 @@ def requeue_with_backoff(conn, settings: Settings, submission_id: int, *, reason
def claim_one(conn) -> dict | None: def claim_one(conn) -> dict | None:
"""Claim atomic 'queued' -> 'sending', respectand next_attempt_at. Intoarce randul sau None.""" """Claim atomic 'queued' -> 'sending', respectand next_attempt_at. Intoarce randul sau None.
Randul include `account_id` si `rar_creds_enc` (creds RAR criptate) pentru
login-ul per-cont din `run`.
"""
conn.execute("BEGIN IMMEDIATE") conn.execute("BEGIN IMMEDIATE")
try: try:
row = conn.execute( row = conn.execute(
"SELECT id, payload_json FROM submissions WHERE status='queued' " "SELECT id, account_id, payload_json, rar_creds_enc FROM submissions WHERE status='queued' "
"AND (next_attempt_at IS NULL OR next_attempt_at <= ?) ORDER BY id LIMIT 1", "AND (next_attempt_at IS NULL OR next_attempt_at <= ?) ORDER BY id LIMIT 1",
(_iso(_now()),), (_iso(_now()),),
).fetchone() ).fetchone()
@@ -110,7 +121,12 @@ def claim_one(conn) -> dict | None:
(row["id"],), (row["id"],),
) )
conn.execute("COMMIT") conn.execute("COMMIT")
return {"id": row["id"], "content": json.loads(row["payload_json"])} return {
"id": row["id"],
"account_id": row["account_id"] if row["account_id"] is not None else DEFAULT_ACCOUNT_ID,
"creds_enc": row["rar_creds_enc"],
"content": json.loads(row["payload_json"]),
}
except Exception: except Exception:
conn.execute("ROLLBACK") conn.execute("ROLLBACK")
raise raise
@@ -174,14 +190,25 @@ def _handle_transient(conn, settings: Settings, rar: RarClient, token: str, sid:
return "requeued" return "requeued"
def recover_orphans(conn, settings: Settings, rar: RarClient, token: str) -> int: def recover_orphans(conn, settings: Settings, rar: RarClient, token: str, account_id: int | None = None) -> int:
"""Randuri 'sending' mai vechi de lease (worker mort mid-POST). Reconciliaza; altfel requeue.""" """Randuri 'sending' mai vechi de lease (worker mort mid-POST). Reconciliaza; altfel requeue.
`account_id` filtreaza la orfanii unui cont (login-ul e per-cont); None = toti
(compat teste / single-account).
"""
cutoff = _iso(_now() - timedelta(seconds=settings.worker_sending_lease_s)) cutoff = _iso(_now() - timedelta(seconds=settings.worker_sending_lease_s))
orphans = conn.execute( if account_id is not None:
"SELECT id, payload_json FROM submissions WHERE status='sending' " orphans = conn.execute(
"AND (sending_since IS NULL OR sending_since <= ?)", "SELECT id, payload_json FROM submissions WHERE status='sending' "
(cutoff,), "AND (sending_since IS NULL OR sending_since <= ?) AND account_id=?",
).fetchall() (cutoff, account_id),
).fetchall()
else:
orphans = conn.execute(
"SELECT id, payload_json FROM submissions WHERE status='sending' "
"AND (sending_since IS NULL OR sending_since <= ?)",
(cutoff,),
).fetchall()
recovered = 0 recovered = 0
for row in orphans: for row in orphans:
sid = row["id"] sid = row["id"]
@@ -213,6 +240,69 @@ def _refresh_nomenclator(conn, rar: RarClient, token: str) -> None:
print(f"[worker] nomenclator refresh esuat (continui): {exc}", flush=True) print(f"[worker] nomenclator refresh esuat (continui): {exc}", flush=True)
class AccountSessions:
"""Sesiuni RAR per cont: login lazy cu creds din submission + cache JWT (30h).
La primul login reusit pentru un cont sterge creds-urile criptate ale contului
(token-ul in memorie acopera restul). Pe 401 mid-sesiune se invalideaza sesiunea
-> re-login la urmatorul submission cu creds.
"""
def __init__(self, settings: Settings):
self.settings = settings
self._sessions: dict[int, tuple[RarClient, str]] = {}
def get_token(self, conn, account_id: int, creds: dict | None) -> str | None:
"""Token valid pentru cont. Login daca lipseste din cache si avem creds; altfel None."""
sess = self._sessions.get(account_id)
if sess is not None:
return sess[1]
if not creds or not creds.get("email") or not creds.get("password"):
return None
rar = RarClient(self.settings)
try:
token = rar.login(creds["email"], creds["password"])
except Exception:
rar.close()
raise
self._sessions[account_id] = (rar, token)
write_heartbeat(conn, rar_login_ok=True, detail=f"login RAR ok (cont {account_id})")
# Creds nu mai sunt necesare: JWT acopera retry-urile -> sterge la rest.
conn.execute(
"UPDATE submissions SET rar_creds_enc=NULL WHERE account_id=? AND rar_creds_enc IS NOT NULL",
(account_id,),
)
# Nomenclator live (autoritativ) la fiecare login proaspat.
_refresh_nomenclator(conn, rar, token)
return token
def rar(self, account_id: int) -> RarClient:
return self._sessions[account_id][0]
def active(self) -> list[tuple[int, RarClient, str]]:
return [(acct, rar, tok) for acct, (rar, tok) in self._sessions.items()]
def invalidate(self, account_id: int) -> None:
sess = self._sessions.pop(account_id, None)
if sess is not None:
sess[0].close()
def close_all(self) -> None:
for rar, _tok in self._sessions.values():
rar.close()
self._sessions.clear()
def _creds_for(claimed: dict, settings: Settings) -> dict | None:
"""Creds pentru un submission: decripteaza enc-ul lipit; altfel creds <test> (dev)."""
creds = decrypt_creds(claimed.get("creds_enc"))
if creds:
return creds
if settings.worker_use_test_creds:
return load_test_credentials()
return None
def run() -> int: def run() -> int:
signal.signal(signal.SIGTERM, _stop) signal.signal(signal.SIGTERM, _stop)
signal.signal(signal.SIGINT, _stop) signal.signal(signal.SIGINT, _stop)
@@ -222,9 +312,7 @@ def run() -> int:
conn = get_connection() conn = get_connection()
print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True) print(f"[worker] pornit (send_enabled={settings.worker_send_enabled}, env={settings.rar_env})", flush=True)
creds = load_test_credentials() if settings.worker_use_test_creds else None sessions = AccountSessions(settings)
rar: RarClient | None = None
token: str | None = None
while _running: while _running:
try: try:
@@ -233,38 +321,49 @@ def run() -> int:
if not settings.worker_send_enabled: if not settings.worker_send_enabled:
time.sleep(settings.worker_poll_interval_s) time.sleep(settings.worker_poll_interval_s)
continue continue
if not creds:
time.sleep(settings.worker_poll_interval_s)
continue
# Login lazy + token cache (JWT 30h).
if rar is None or token is None:
rar = RarClient(settings)
token = rar.login(creds["email"], creds["password"])
write_heartbeat(conn, rar_login_ok=True, detail="login RAR ok")
# Refresh nomenclator live (autoritativ) la fiecare login proaspat —
# alimenteaza fuzzy lookup-ul din editorul de mapari.
_refresh_nomenclator(conn, rar, token)
recover_orphans(conn, settings, rar, token)
claimed = claim_one(conn) claimed = claim_one(conn)
if claimed is None: if claimed is None:
# Nimic de trimis: recupereaza orfanii conturilor deja logate.
for acct, rar, tok in sessions.active():
recover_orphans(conn, settings, rar, tok, account_id=acct)
time.sleep(settings.worker_poll_interval_s) time.sleep(settings.worker_poll_interval_s)
continue continue
process_one(conn, settings, rar, token, claimed) sid = claimed["id"]
account_id = claimed["account_id"]
creds = _creds_for(claimed, settings)
try:
token = sessions.get_token(conn, account_id, creds)
except RarAuthError as exc:
# Creds gresite (login 401): NU se face retry (plan, failure registry).
mark(conn, sid, "error", rar_status_code=401, rar_error="credentiale RAR invalide")
print(f"[worker] submission {sid} (cont {account_id}) -> error: {exc}", flush=True)
continue
if token is None:
# Fara creds disponibile (token pierdut la restart + creds sterse).
# Re-pune in coada cu backoff; ROAAUTO re-trimite creds proaspete.
requeue_with_backoff(conn, settings, sid, reason="creds RAR indisponibile (astept re-trimitere)")
continue
rar = sessions.rar(account_id)
# Recupereaza orfanii contului inainte de trimitere (acelasi token).
recover_orphans(conn, settings, rar, token, account_id=account_id)
try:
process_one(conn, settings, rar, token, claimed)
except RarAuthError as exc:
# Token expirat mid-sesiune: invalideaza sesiunea, re-pune randul.
print(f"[worker] cont {account_id} token expirat: {exc}; re-login data viitoare", flush=True)
sessions.invalidate(account_id)
requeue_with_backoff(conn, settings, sid, reason="token RAR expirat")
except RarAuthError as exc:
print(f"[worker] login esuat / token expirat: {exc}", flush=True)
token = None # forteaza re-login (acopera si expirarea JWT la 30h)
time.sleep(settings.worker_poll_interval_s)
except Exception as exc: # noqa: BLE001 — loop top-level: o eroare punctuala nu opreste worker-ul except Exception as exc: # noqa: BLE001 — loop top-level: o eroare punctuala nu opreste worker-ul
print(f"[worker] eroare neasteptata: {exc}", flush=True) print(f"[worker] eroare neasteptata: {exc}", flush=True)
time.sleep(settings.worker_poll_interval_s) time.sleep(settings.worker_poll_interval_s)
if rar is not None: sessions.close_all()
rar.close()
conn.close() conn.close()
print("[worker] oprit curat", flush=True) print("[worker] oprit curat", flush=True)
return 0 return 0

48
app/worker/healthcheck.py Normal file
View File

@@ -0,0 +1,48 @@
"""Liveness probe pentru worker (T6) — folosit de healthcheck-ul Docker.
Worker-ul nu e server HTTP, deci `restart: always` prinde doar procesul MORT,
nu si worker-ul AGATAT (proces viu care nu mai bate heartbeat). Acest probe
citeste `worker_heartbeat` din DB si pica daca ultimul beat e mai vechi decat
`worker_heartbeat_stale_s` -> Docker restarteaza containerul worker.
Utilizare (compose healthcheck): python -m app.worker.healthcheck
Exit 0 = sanatos, 1 = invechit/lipsa.
"""
from __future__ import annotations
import sys
from datetime import datetime, timezone
from ..config import Settings, get_settings
from ..db import get_connection, read_heartbeat
def worker_healthy(conn, settings: Settings, *, now: datetime | None = None) -> bool:
"""True daca ultimul heartbeat e mai proaspat decat pragul de invechire."""
hb = read_heartbeat(conn)
if hb is None or not hb["last_beat"]:
return False
try:
last = datetime.fromisoformat(hb["last_beat"])
except (ValueError, TypeError):
return False
now = now or datetime.now(timezone.utc)
return (now - last).total_seconds() <= settings.worker_heartbeat_stale_s
def main() -> int:
settings = get_settings()
conn = get_connection()
try:
ok = worker_healthy(conn, settings)
finally:
conn.close()
if not ok:
print("[healthcheck] worker invechit sau nepornit", flush=True)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,5 +1,10 @@
# Gateway RAR AUTOPASS — un container API + un container worker, acelasi image, # Gateway RAR AUTOPASS — un container API + un container worker, acelasi image,
# acelasi volum SQLite persistent (plan.md sect. 4 + 9). restart: always pe ambele. # acelasi volum SQLite persistent (plan.md sect. 4 + 9). restart: always pe ambele.
#
# CRITIC: AUTOPASS_CREDS_KEY trebuie PARTAJATA intre api si worker — API cripteaza
# creds-urile RAR, worker-ul le decripteaza. Chei diferite -> worker nu poate
# decripta -> submission-uri blocate "creds indisponibile". Seteaz-o in .env
# (vezi .env.example): compose o citeste automat. Lipsa -> compose pica explicit.
services: services:
api: api:
build: . build: .
@@ -11,6 +16,8 @@ services:
environment: environment:
AUTOPASS_DB_PATH: /data/autopass.db AUTOPASS_DB_PATH: /data/autopass.db
AUTOPASS_RAR_ENV: test AUTOPASS_RAR_ENV: test
AUTOPASS_CREDS_KEY: ${AUTOPASS_CREDS_KEY:?seteaza AUTOPASS_CREDS_KEY in .env (vezi .env.example)}
AUTOPASS_REQUIRE_API_KEY: ${AUTOPASS_REQUIRE_API_KEY:-false}
restart: always restart: always
healthcheck: healthcheck:
test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8000/healthz').status==200 else 1)"] test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8000/healthz').status==200 else 1)"]
@@ -26,11 +33,35 @@ services:
environment: environment:
AUTOPASS_DB_PATH: /data/autopass.db AUTOPASS_DB_PATH: /data/autopass.db
AUTOPASS_RAR_ENV: test AUTOPASS_RAR_ENV: test
AUTOPASS_CREDS_KEY: ${AUTOPASS_CREDS_KEY:?seteaza AUTOPASS_CREDS_KEY in .env (vezi .env.example)}
# Send dezactivat by default; activeaza pentru proba end-to-end. # Send dezactivat by default; activeaza pentru proba end-to-end.
AUTOPASS_WORKER_SEND_ENABLED: "false" AUTOPASS_WORKER_SEND_ENABLED: "false"
restart: always restart: always
depends_on: depends_on:
- api - api
# T6: probe pe heartbeat-ul din DB — prinde worker-ul AGATAT (proces viu, beat
# invechit), pe care restart:always singur nu-l vede. start_period acopera bootul.
# ATENTIE: in compose simplu, "unhealthy" doar marcheaza containerul — NU il
# restarteaza (restart:always reactioneaza la EXIT). Sidecar-ul `autoheal` de
# mai jos vede label-ul si chiar restarteaza worker-ul cand pica probe-ul.
labels:
autoheal: "true"
healthcheck:
test: ["CMD", "python", "-m", "app.worker.healthcheck"]
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
# Restarteaza orice container marcat unhealthy cu label autoheal=true (worker-ul
# agatat). Alternativa: Docker Swarm (restart on unhealthy nativ).
autoheal:
image: willfarrell/autoheal:latest
restart: always
environment:
AUTOHEAL_CONTAINER_LABEL: autoheal
volumes:
- /var/run/docker.sock:/var/run/docker.sock
volumes: volumes:
autopass-data: autopass-data:

View File

@@ -207,9 +207,29 @@ Nimic din cod nu e scris încă (`app/`, `tools/` nu există). Ordine recomandat
retry/backoff exponential (peste `worker_max_retries` → error+banner), re-login la token expirat. Rută monitorizare descoperită retry/backoff exponential (peste `worker_max_retries` → error+banner), re-login la token expirat. Rută monitorizare descoperită
live: `GET /prezentari/getAllPrezentariFinalizate``data.content` (filtrele nu merg pe test → fetch tot, match client-side). live: `GET /prezentari/getAllPrezentariFinalizate``data.content` (filtrele nu merg pe test → fetch tot, match client-side).
Verify: 15 teste (`tests/test_worker_reconcile.py`) + validare LIVE (reconciliere record 68514 din finalizate reale). Verify: 15 teste (`tests/test_worker_reconcile.py`) + validare LIVE (reconciliere record 68514 din finalizate reale).
- [ ] **T6 (P2) — worker proces/container propriu supravegheat;** `/healthz` pică → restart. Verify: worker omorât → restart automat. - [x] **Securitate CORE (P1)** ✅ 2026-06-15. `app/security.py` (redactare creds) + `app/auth.py` (API-key per cont) +
- [ ] **T7 (P2) — deploy:** SQLite pe volum persistent numit + backup (singura copie durabilă, re-push scos). `tools/apikey.py` (CLI emitere/rotire/revocare). Redactare: handler `RequestValidationError` care DROP-ează
Verify: recreare container → coada supraviețuiește. `input`/`ctx` din 422 (vectorul de scurgere a parolei pe `/v1/prezentari`), filtru logging `scrub_text` pe root+uvicorn,
`password` cu `repr=False` în model. Auth: hash SHA-256 în `api_keys` (cheia în clar emisă o singură dată), header
`X-API-Key` / `Authorization: Bearer`, enforcement pe flag `AUTOPASS_require_api_key` (prod on→401, dev off→cont default
id=1; cheie prezentă invalidă→401 mereu). `account_id` real curge din cheie în ingestie + mapare. Verify: 16 teste
(`tests/test_security.py`).
- [x] **Livrare creds per-cerere (P1)** ✅ 2026-06-15. `app/crypto.py` (Fernet, cheie din `AUTOPASS_creds_key`; nesetată →
cheie efemeră la runtime). Creds RAR criptate per submission (`submissions.rar_creds_enc`) la ingestie — niciodată în
clar în DB. Worker: `AccountSessions` face login PER CONT cu creds decriptate, cache JWT 30h în memorie, ȘTERGE creds-urile
contului după primul login reușit (token-ul acoperă restul). Fallback creds `<test>` în dev. 401 creds greșite → error fără
retry; token expirat → invalidare sesiune + requeue; fără creds (restart) → requeue „indisponibile" (ROAAUTO re-trimite).
Verify: 10 teste (`tests/test_creds_delivery.py`). **Risc acceptat:** la restart token+creds se pierd → contul re-loghează
la următorul submission cu creds (degradare per modelul efemer).
- [x] **T6 (P2) — worker supravegheat** ✅ 2026-06-15. `app/worker/healthcheck.py` (probe pe heartbeat-ul din DB: beat
mai vechi de `worker_heartbeat_stale_s` → exit 1) cablat în compose ca healthcheck pe serviciul worker. Prinde worker-ul
AGĂȚAT (proces viu, beat înghețat), pe care `restart:always` (doar la EXIT) nu-l vede. Sidecar `autoheal` restartează
efectiv containerul marcat unhealthy (compose simplu doar marchează, nu restartează). Verify: 3 teste (`tests/test_deploy.py`).
- [x] **T7 (P2) — deploy** ✅ 2026-06-15. `tools/backup.py` (backup ONLINE via `Connection.backup` — WAL nu se copiază sigur
cu `cp`; `--keep N` rotește snapshot-urile) + volum SQLite persistent numit (`autopass-data`, deja în compose). `.env.example`
documentează env-urile. **Fix critic descoperit la split-ul în 2 containere:** `AUTOPASS_CREDS_KEY` trebuie PARTAJATĂ
api↔worker (altfel worker nu decriptează creds) — acum impusă în compose (`${...:?}` → fail explicit dacă lipsește).
Verify: 2 teste (`tests/test_deploy.py`).
- [ ] **Dashboard** (Jinja2+HTMX) cu stările empty/error/RAR-indisponibil + banner alertă. Apoi `/design-review` pe UI-ul live. - [ ] **Dashboard** (Jinja2+HTMX) cu stările empty/error/RAR-indisponibil + banner alertă. Apoi `/design-review` pe UI-ul live.
### De decis ulterior (urmărit, nu blocant) ### De decis ulterior (urmărit, nu blocant)

View File

@@ -9,6 +9,8 @@ pydantic-settings==2.*
python-multipart==0.0.* python-multipart==0.0.*
# Fuzzy lookup pentru editorul de mapari operatii (app/mapping.py). Pur Python/C, fara build extern. # Fuzzy lookup pentru editorul de mapari operatii (app/mapping.py). Pur Python/C, fara build extern.
rapidfuzz==3.14.5 rapidfuzz==3.14.5
# Criptare creds RAR efemere in submissions (app/crypto.py, Fernet). Zero-storage at rest.
cryptography==46.0.5
# Migrare DBF (tools/import_dbf.py). Necesar doar pentru import optional, nu pentru runtime. # Migrare DBF (tools/import_dbf.py). Necesar doar pentru import optional, nu pentru runtime.
dbfread==2.0.7 dbfread==2.0.7

View File

@@ -0,0 +1,252 @@
"""Teste livrare creds per-cerere: criptare efemera + sesiuni worker per-cont.
Acopera: round-trip crypto, stocarea creds criptate la ingestie (niciodata in
clar), login per-cont cu stergere creds dupa primul login, fallback creds <test>,
401 creds gresite -> error fara retry.
"""
from __future__ import annotations
import json
import os
import tempfile
import pytest
from cryptography.fernet import Fernet
from fastapi.testclient import TestClient
from app.rar_client import RarAuthError
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
from app.config import get_settings
from app import crypto
get_settings.cache_clear()
crypto.reset_cache()
from app.db import get_connection, init_db
init_db()
yield monkeypatch
get_settings.cache_clear()
crypto.reset_cache()
# --------------------------------------------------------------------------- #
# Crypto round-trip #
# --------------------------------------------------------------------------- #
def test_crypto_roundtrip(env):
from app.crypto import decrypt_creds, encrypt_creds
creds = {"email": "x@y.ro", "password": "HUNTER2"}
tok = encrypt_creds(creds)
assert "HUNTER2" not in tok # criptat, nu in clar
assert decrypt_creds(tok) == creds
def test_crypto_bad_token_returns_none(env):
from app.crypto import decrypt_creds
assert decrypt_creds(None) is None
assert decrypt_creds("") is None
assert decrypt_creds("garbage-not-a-token") is None
def test_crypto_wrong_key_returns_none(env, monkeypatch):
from app import crypto
from app.crypto import encrypt_creds
tok = encrypt_creds({"email": "a", "password": "b"})
# Rotim cheia -> token-ul vechi nu se mai decripteaza (degradare acceptata).
monkeypatch.setenv("AUTOPASS_CREDS_KEY", Fernet.generate_key().decode())
from app.config import get_settings
get_settings.cache_clear()
crypto.reset_cache()
assert crypto.decrypt_creds(tok) is None
# --------------------------------------------------------------------------- #
# Ingestie: creds stocate criptat, niciodata in clar #
# --------------------------------------------------------------------------- #
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "SECRETPW"}, "prezentari": [prez]}
def test_ingestie_stocheaza_creds_criptate(env):
from app.crypto import decrypt_creds
from app.db import get_connection
from app.main import app
with TestClient(app) as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 200
sid = r.json()["results"][0]["submission_id"]
conn = get_connection()
try:
row = conn.execute("SELECT rar_creds_enc, payload_json FROM submissions WHERE id=?", (sid,)).fetchone()
finally:
conn.close()
# Creds criptate prezente, dar parola NU apare in clar nicaieri in rand.
assert row["rar_creds_enc"]
assert "SECRETPW" not in row["rar_creds_enc"]
assert "SECRETPW" not in row["payload_json"]
assert decrypt_creds(row["rar_creds_enc"]) == {"email": "x@y.ro", "password": "SECRETPW"}
# --------------------------------------------------------------------------- #
# Worker: sesiuni per-cont #
# --------------------------------------------------------------------------- #
class FakeRarClient:
"""Stub RarClient: login intoarce token determinist, get_nomenclator gol."""
made: list = []
def __init__(self, settings=None, login_exc=None):
self.closed = False
self.login_calls = 0
self._login_exc = login_exc
FakeRarClient.made.append(self)
def login(self, email, password):
self.login_calls += 1
if self._login_exc is not None:
raise self._login_exc
return f"TOK-{email}"
def get_nomenclator(self, token):
return []
def close(self):
self.closed = True
def _insert(conn, account_id=1, creds_enc=None, status="queued"):
content = {
"vin": "WVWZZZ1KZAW000123", "nr_inmatriculare": "B1",
"data_prestatie": "2026-06-15", "odometru_final": "1",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
cur = conn.execute(
"INSERT INTO submissions (idempotency_key, account_id, status, payload_json, rar_creds_enc) "
"VALUES (?, ?, ?, ?, ?)",
(f"k-{os.urandom(4).hex()}", account_id, status, json.dumps(content), creds_enc),
)
return int(cur.lastrowid)
def test_get_token_login_clears_creds(env, monkeypatch):
import app.worker.__main__ as w
from app.crypto import encrypt_creds
from app.db import get_connection
FakeRarClient.made = []
monkeypatch.setattr(w, "RarClient", FakeRarClient)
conn = get_connection()
try:
enc = encrypt_creds({"email": "a@b.ro", "password": "p"})
s1 = _insert(conn, account_id=1, creds_enc=enc)
s2 = _insert(conn, account_id=1, creds_enc=enc)
sessions = w.AccountSessions(w.get_settings())
token = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
assert token == "TOK-a@b.ro"
# Creds sterse pentru tot contul dupa primul login.
for sid in (s1, s2):
assert conn.execute("SELECT rar_creds_enc FROM submissions WHERE id=?", (sid,)).fetchone()["rar_creds_enc"] is None
# Token cache-uit: al doilea apel NU re-loghează.
assert sessions.get_token(conn, 1, None) == "TOK-a@b.ro"
assert FakeRarClient.made[0].login_calls == 1
finally:
conn.close()
def test_get_token_no_creds_returns_none(env, monkeypatch):
import app.worker.__main__ as w
from app.db import get_connection
monkeypatch.setattr(w, "RarClient", FakeRarClient)
conn = get_connection()
try:
sessions = w.AccountSessions(w.get_settings())
assert sessions.get_token(conn, 1, None) is None
finally:
conn.close()
def test_get_token_bad_creds_raises(env, monkeypatch):
import app.worker.__main__ as w
from app.db import get_connection
def _factory(settings=None):
return FakeRarClient(settings, login_exc=RarAuthError("Credentiale RAR invalide", status_code=401))
monkeypatch.setattr(w, "RarClient", _factory)
conn = get_connection()
try:
sessions = w.AccountSessions(w.get_settings())
with pytest.raises(RarAuthError):
sessions.get_token(conn, 1, {"email": "a", "password": "x"})
finally:
conn.close()
def test_per_account_isolation(env, monkeypatch):
import app.worker.__main__ as w
from app.db import get_connection
FakeRarClient.made = []
monkeypatch.setattr(w, "RarClient", FakeRarClient)
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'doi')")
sessions = w.AccountSessions(w.get_settings())
t1 = sessions.get_token(conn, 1, {"email": "a@b.ro", "password": "p"})
t2 = sessions.get_token(conn, 2, {"email": "c@d.ro", "password": "q"})
assert t1 == "TOK-a@b.ro" and t2 == "TOK-c@d.ro"
assert len(sessions.active()) == 2
sessions.invalidate(1)
assert len(sessions.active()) == 1
assert FakeRarClient.made[0].closed is True
finally:
conn.close()
def test_creds_for_fallback_test_creds(env, monkeypatch):
import app.worker.__main__ as w
# Fara enc + flag test on -> creds <test>.
monkeypatch.setattr(w, "load_test_credentials", lambda: {"email": "t@test", "password": "tp"})
settings = w.get_settings()
object.__setattr__(settings, "worker_use_test_creds", True)
assert w._creds_for({"creds_enc": None}, settings) == {"email": "t@test", "password": "tp"}
# Flag off -> None.
object.__setattr__(settings, "worker_use_test_creds", False)
assert w._creds_for({"creds_enc": None}, settings) is None
def test_creds_for_prefers_enc(env):
import app.worker.__main__ as w
from app.crypto import encrypt_creds
enc = encrypt_creds({"email": "real@x", "password": "rp"})
settings = w.get_settings()
assert w._creds_for({"creds_enc": enc}, settings) == {"email": "real@x", "password": "rp"}

117
tests/test_deploy.py Normal file
View File

@@ -0,0 +1,117 @@
"""Teste T6/T7: liveness probe worker + backup online SQLite."""
from __future__ import annotations
import os
import sqlite3
import tempfile
from datetime import datetime, timedelta, timezone
import pytest
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
from app.db import get_connection, init_db
init_db()
yield get_connection, get_settings()
get_settings.cache_clear()
# --------------------------------------------------------------------------- #
# T6 — worker liveness probe #
# --------------------------------------------------------------------------- #
def _set_beat(conn, dt: datetime | None):
conn.execute(
"UPDATE worker_heartbeat SET last_beat=? WHERE id=1",
(dt.isoformat(timespec="seconds") if dt else None,),
)
def test_healthy_when_beat_fresh(env):
from app.worker.healthcheck import worker_healthy
get_connection, settings = env
conn = get_connection()
try:
_set_beat(conn, datetime.now(timezone.utc))
assert worker_healthy(conn, settings) is True
finally:
conn.close()
def test_unhealthy_when_beat_stale(env):
from app.worker.healthcheck import worker_healthy
get_connection, settings = env
conn = get_connection()
try:
stale = datetime.now(timezone.utc) - timedelta(seconds=settings.worker_heartbeat_stale_s + 60)
_set_beat(conn, stale)
assert worker_healthy(conn, settings) is False
finally:
conn.close()
def test_unhealthy_when_never_started(env):
from app.worker.healthcheck import worker_healthy
get_connection, settings = env
conn = get_connection()
try:
_set_beat(conn, None) # never started
assert worker_healthy(conn, settings) is False
finally:
conn.close()
# --------------------------------------------------------------------------- #
# T7 — backup online #
# --------------------------------------------------------------------------- #
def test_backup_db_roundtrip(env):
from tools.backup import backup_db
get_connection, settings = env
conn = get_connection()
try:
conn.execute(
"INSERT INTO submissions (idempotency_key, status, payload_json) VALUES ('bk1','queued','{}')"
)
finally:
conn.close()
dest = settings.db_path.parent / "backups" / "snap.db"
out = backup_db(settings.db_path, dest)
assert out.exists()
# Copia contine randul scris (backup consistent, nu fisier gol).
bk = sqlite3.connect(out)
try:
n = bk.execute("SELECT COUNT(*) FROM submissions WHERE idempotency_key='bk1'").fetchone()[0]
finally:
bk.close()
assert n == 1
def test_backup_prune_keeps_newest(env):
from tools.backup import prune
get_connection, settings = env
bdir = settings.db_path.parent / "backups"
bdir.mkdir(parents=True, exist_ok=True)
# Creeaza 5 snapshot-uri cu nume-timestamp crescator.
names = [f"autopass-2026010{i}-000000.db" for i in range(1, 6)]
for n in names:
(bdir / n).write_bytes(b"x")
removed = prune(bdir, keep=2)
remaining = sorted(p.name for p in bdir.glob("autopass-*.db"))
assert len(remaining) == 2
assert remaining == names[-2:] # cele mai noi doua
assert len(removed) == 3

241
tests/test_security.py Normal file
View File

@@ -0,0 +1,241 @@
"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key."""
from __future__ import annotations
import logging
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
from app.security import (
MASK,
CredentialRedactingFilter,
scrub,
scrub_text,
)
# --------------------------------------------------------------------------- #
# Redactare — pur (scrub / scrub_text / repr) #
# --------------------------------------------------------------------------- #
def test_scrub_masks_password_keys():
data = {"email": "x@y.ro", "rar_credentials": {"email": "a", "password": "SECRET"}, "vin": "ABC"}
out = scrub(data)
assert out["rar_credentials"] == MASK # subarbore creds mascat integral
assert out["vin"] == "ABC" # date non-sensibile neatinse
assert "SECRET" not in repr(out)
def test_scrub_nested_password_in_list():
data = {"items": [{"password": "p1"}, {"denumire": "ok"}]}
out = scrub(data)
assert out["items"][0]["password"] == MASK
assert out["items"][1]["denumire"] == "ok"
def test_scrub_text_json_form():
s = '{"email": "x@y.ro", "password": "hunter2"}'
assert "hunter2" not in scrub_text(s)
assert MASK in scrub_text(s)
def test_scrub_text_kwargs_and_bearer():
assert "topsecret" not in scrub_text("login password=topsecret done")
assert "eyJabc.def" not in scrub_text("Authorization: Bearer eyJabc.def")
def test_scrub_text_keeps_innocent():
s = "submission 5 -> queued (vin=ABC123)"
assert scrub_text(s) == s
def test_credentials_repr_hides_password():
from app.models import RarCredentials
c = RarCredentials(email="x@y.ro", password="hunter2")
assert "hunter2" not in repr(c)
assert c.password == "hunter2" # valoarea ramane accesibila in cod
def test_logging_filter_redacts(caplog):
logger = logging.getLogger("test.redact")
logger.addFilter(CredentialRedactingFilter())
with caplog.at_level(logging.INFO, logger="test.redact"):
logger.info('payload {"password": "hunter2"}')
assert "hunter2" not in caplog.text
assert MASK in caplog.text
# --------------------------------------------------------------------------- #
# Integrare API — 422 fara echo creds + auth API-key #
# --------------------------------------------------------------------------- #
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield monkeypatch
get_settings.cache_clear()
def _client():
from app.main import app
return TestClient(app)
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "HUNTER2SECRET"}, "prezentari": [prez]}
def test_422_does_not_echo_password(env):
with _client() as c:
# Lipseste odometru_final -> 422 de shape; parola NU trebuie sa apara in raspuns.
bad = _body()
del bad["prezentari"][0]["odometru_final"]
r = c.post("/v1/prezentari", json=bad)
assert r.status_code == 422
assert "HUNTER2SECRET" not in r.text
# Pastram totusi info utila clientului (ce camp lipseste).
assert any("odometru_final" in str(e.get("loc")) for e in r.json()["detail"])
# input/ctx eliminate (vectorul de scurgere).
assert all("input" not in e for e in r.json()["detail"])
def test_422_missing_credentials_no_leak(env):
with _client() as c:
bad = _body()
bad["rar_credentials"]["password"] = 12345 # tip gresit -> 422 cu input=parola
r = c.post("/v1/prezentari", json=bad)
assert r.status_code == 422
assert "12345" not in r.text
def test_dev_no_key_uses_default_account(env):
# Flag off (default): fara cheie -> cont implicit, merge ca inainte.
with _client() as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 200
assert r.json()["results"][0]["status"] == "queued"
def test_prod_requires_key(env):
env.setenv("AUTOPASS_REQUIRE_API_KEY", "true")
from app.config import get_settings
get_settings.cache_clear()
with _client() as c:
r = c.post("/v1/prezentari", json=_body())
assert r.status_code == 401
# Cu cheie valida emisa via lifecycle -> trece.
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
key = create_api_key(conn, 1)
finally:
conn.close()
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key})
assert r2.status_code == 200
assert r2.json()["results"][0]["status"] == "queued"
def test_invalid_key_rejected_even_in_dev(env):
# Flag off, dar o cheie PREZENTA si gresita nu trebuie sa treaca tacit.
with _client() as c:
r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": "rfak_gresita"})
assert r.status_code == 401
def test_bearer_header_accepted(env):
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
key = create_api_key(conn, 1)
finally:
conn.close()
r = c.post("/v1/prezentari", json=_body(), headers={"Authorization": f"Bearer {key}"})
assert r.status_code == 200
def test_key_account_routes_idempotency(env):
# Doua conturi, aceeasi prezentare -> submission-uri distincte (account_id in cheie).
with _client() as c:
from app.auth import create_api_key
from app.db import get_connection
conn = get_connection()
try:
conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')")
k1 = create_api_key(conn, 1)
k2 = create_api_key(conn, 2)
finally:
conn.close()
r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1})
r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k2})
id1 = r1.json()["results"][0]["submission_id"]
res2 = r2.json()["results"][0]
assert res2["submission_id"] != id1
assert res2["deduped"] is False
# --------------------------------------------------------------------------- #
# Lifecycle chei — create / rotate / revoke #
# --------------------------------------------------------------------------- #
def test_key_lifecycle(env):
from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key
from app.db import get_connection, init_db
init_db()
conn = get_connection()
try:
key = create_api_key(conn, 1)
assert key.startswith("rfak_")
assert account_for_key(conn, key) == 1
# Rotire: cheia veche moare, una noua o inlocuieste.
new_key = rotate_api_key(conn, 1)
assert account_for_key(conn, key) is None
assert account_for_key(conn, new_key) == 1
# Revocare directa dupa id.
kid = conn.execute(
"SELECT id FROM api_keys WHERE key_hash=?", (hash_key(new_key),)
).fetchone()["id"]
assert revoke_api_key(conn, kid) is True
assert account_for_key(conn, new_key) is None
assert revoke_api_key(conn, kid) is False # deja revocata
finally:
conn.close()
def test_create_key_unknown_account(env):
from app.auth import create_api_key
from app.db import get_connection, init_db
init_db()
conn = get_connection()
try:
with pytest.raises(ValueError):
create_api_key(conn, 999)
finally:
conn.close()

91
tools/apikey.py Normal file
View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""CLI lifecycle chei API (CORE securitate).
Emitere/rotire/revocare/listare chei per cont. Adminul ruleaza pe masina gateway
— nicio suprafata HTTP de admin. Cheia in clar se afiseaza O SINGURA DATA la
creare/rotire; in DB traieste doar hash-ul SHA-256.
Utilizare:
python -m tools.apikey create --account 1
python -m tools.apikey rotate --account 1
python -m tools.apikey revoke --key-id 3
python -m tools.apikey list [--account 1]
"""
from __future__ import annotations
import argparse
import sys
from app.auth import create_api_key, list_keys, revoke_api_key, rotate_api_key
from app.db import get_connection, init_db
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Lifecycle chei API gateway RAR AUTOPASS")
sub = parser.add_subparsers(dest="cmd", required=True)
p_create = sub.add_parser("create", help="emite o cheie noua pentru cont")
p_create.add_argument("--account", type=int, required=True, help="account_id")
p_rotate = sub.add_parser("rotate", help="revoca cheile active ale contului + emite una noua")
p_rotate.add_argument("--account", type=int, required=True, help="account_id")
p_revoke = sub.add_parser("revoke", help="revoca o cheie dupa id")
p_revoke.add_argument("--key-id", type=int, required=True, help="api_keys.id")
p_list = sub.add_parser("list", help="listeaza chei (fara hash)")
p_list.add_argument("--account", type=int, default=None, help="filtreaza pe cont")
args = parser.parse_args(argv)
init_db() # asigura schema (api_keys) + cont default
conn = get_connection()
try:
if args.cmd == "create":
try:
key = create_api_key(conn, args.account)
except ValueError as exc:
print(f"eroare: {exc}", file=sys.stderr)
return 2
print(f"Cheie creata pentru cont {args.account} (pastreaz-o, nu se mai afiseaza):")
print(key)
return 0
if args.cmd == "rotate":
try:
key = rotate_api_key(conn, args.account)
except ValueError as exc:
print(f"eroare: {exc}", file=sys.stderr)
return 2
print(f"Chei vechi revocate. Cheie noua pentru cont {args.account}:")
print(key)
return 0
if args.cmd == "revoke":
ok = revoke_api_key(conn, args.key_id)
if ok:
print(f"Cheie {args.key_id} revocata.")
return 0
print(f"Cheie {args.key_id} inexistenta sau deja revocata.", file=sys.stderr)
return 1
if args.cmd == "list":
rows = list_keys(conn, args.account)
if not rows:
print("(nicio cheie)")
return 0
print(f"{'id':>4} {'cont':>4} {'activa':>6} {'creata':<20} revocata")
for r in rows:
print(
f"{r['id']:>4} {r['account_id']:>4} {('da' if r['active'] else 'nu'):>6} "
f"{(r['created_at'] or ''):<20} {r['revoked_at'] or ''}"
)
return 0
finally:
conn.close()
return 0
if __name__ == "__main__":
sys.exit(main())

79
tools/backup.py Normal file
View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""Backup online SQLite (T7) — copie consistenta a bazei pe volum persistent.
SQLite in WAL NU se copiaza sigur cu `cp` (WAL-ul poate avea tranzactii necheckpoint-ate).
Folosim API-ul `Connection.backup` (online, consistent, fara oprirea worker-ului).
Utilizare:
python -m tools.backup # -> <db_dir>/backups/autopass-YYYYMMDD-HHMMSS.db
python -m tools.backup --out /path/snap.db # destinatie explicita
python -m tools.backup --keep 14 # pastreaza ultimele 14, sterge restul
Recomandat: rulat dintr-un cron pe gazda (ex. zilnic), tinta pe volum/montaj separat.
"""
from __future__ import annotations
import argparse
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from app.config import get_settings
def backup_db(src: Path, dest: Path) -> Path:
"""Backup online (consistent) din `src` in `dest`. Intoarce `dest`."""
dest.parent.mkdir(parents=True, exist_ok=True)
src_conn = sqlite3.connect(src)
try:
dst_conn = sqlite3.connect(dest)
try:
src_conn.backup(dst_conn)
finally:
dst_conn.close()
finally:
src_conn.close()
return dest
def prune(backup_dir: Path, keep: int) -> list[Path]:
"""Pastreaza cele mai noi `keep` snapshot-uri (dupa nume = timestamp), sterge restul."""
snaps = sorted(backup_dir.glob("autopass-*.db"), reverse=True)
removed = []
for old in snaps[keep:]:
old.unlink(missing_ok=True)
removed.append(old)
return removed
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Backup online SQLite gateway RAR AUTOPASS")
parser.add_argument("--out", type=Path, default=None, help="destinatie explicita (default: <db_dir>/backups/)")
parser.add_argument("--keep", type=int, default=0, help="pastreaza ultimele N snapshot-uri (0 = nu sterge)")
args = parser.parse_args(argv)
src = get_settings().db_path
if not src.exists():
print(f"eroare: baza {src} nu exista", file=sys.stderr)
return 1
if args.out is not None:
dest = args.out
else:
stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
dest = src.parent / "backups" / f"autopass-{stamp}.db"
backup_db(src, dest)
print(f"backup -> {dest} ({dest.stat().st_size} bytes)")
if args.keep > 0:
removed = prune(dest.parent, args.keep)
if removed:
print(f"sterse {len(removed)} snapshot-uri vechi (pastrez {args.keep})")
return 0
if __name__ == "__main__":
sys.exit(main())