An AUTOPASS_CREDS_KEY that is set but invalid (malformed Fernet key) used to raise ValueError only at the first encrypt_creds call -> a raw 500 on POST /v1/prezentari with no useful message (the case reproduced from the VFP client). crypto.validate_creds_key() validates the key and is called from main.lifespan: an invalid key now stops startup with a clear message plus the generation command, instead of blowing up on the first request. An unset key is OK (ephemeral model).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
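The fail-fast idea in the commit message can be sketched with the standard library only. This is a minimal illustration, not the project's code: `check_key_format` is a hypothetical stand-in for `crypto.validate_creds_key()` that approximates the two checks a Fernet key must pass (url-safe base64 decoding, 32 decoded bytes), and `lifespan` here takes the key as an extra parameter purely so the sketch can be exercised without a real app or environment.

```python
import asyncio
import base64
import binascii
import os
from contextlib import asynccontextmanager


def check_key_format(key: str) -> None:
    """Hypothetical stand-in for crypto.validate_creds_key()."""
    if not key:
        return  # unset key is accepted: the ephemeral model applies
    try:
        raw = base64.urlsafe_b64decode(key.encode("ascii"))
    except (binascii.Error, ValueError) as exc:
        raise RuntimeError("AUTOPASS_CREDS_KEY is not valid url-safe base64") from exc
    if len(raw) != 32:
        raise RuntimeError(
            f"AUTOPASS_CREDS_KEY decodes to {len(raw)} bytes, expected 32"
        )


@asynccontextmanager
async def lifespan(app, key: str):
    # Validation runs before the yield, so a bad key aborts startup
    # instead of surfacing on the first request.
    check_key_format(key)
    yield


async def try_start(key: str) -> str:
    """Report whether startup would proceed with the given key."""
    try:
        async with lifespan(None, key):
            return "started"
    except RuntimeError as exc:
        return f"refused: {exc}"


if __name__ == "__main__":
    good = base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")
    print(asyncio.run(try_start(good)))
    print(asyncio.run(try_start("abc")))
```

Running the check before `yield` is what turns a late per-request failure into a startup failure; the operator sees the message once, at deploy time.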
80 lines
2.9 KiB
Python
"""Symmetric encryption for ephemeral RAR credentials (zero-storage at rest).

Plan sect. 5: the RAR password arrives per request and is stored ENCRYPTED in
the submission until the first successful login for the account, then deleted.
The JWT (30h) covers the remaining submissions. The key lives only in
`AUTOPASS_CREDS_KEY` (env), never in code or in the DB.

If `AUTOPASS_CREDS_KEY` is not set, an EPHEMERAL key is generated on first use:
encrypted creds do NOT survive a restart (acceptable: the model is ephemeral,
ROAAUTO re-sends). For production set a persistent key (see README/deploy).
"""

from __future__ import annotations

import json
from functools import lru_cache

from cryptography.fernet import Fernet, InvalidToken

from .config import get_settings


@lru_cache
def _fernet() -> Fernet:
    key = get_settings().creds_key
    if key:
        # Configured key: Fernet expects bytes, settings may hand back a str.
        return Fernet(key.encode() if isinstance(key, str) else key)
    # No key configured: fall back to a per-process ephemeral key.
    generated = Fernet.generate_key()
    print(
        "[crypto] AUTOPASS_CREDS_KEY not set: ephemeral key generated; encrypted "
        "creds do NOT survive a worker/API restart",
        flush=True,
    )
    return Fernet(generated)


def reset_cache() -> None:
    """Reset the cached key (for tests that change the environment)."""
    _fernet.cache_clear()


def validate_creds_key() -> None:
    """Fail fast at startup: a `creds_key` that is set BUT invalid must stop
    startup instead of blowing up only at the first POST /v1/prezentari (raw 500
    with no useful message for the client; the real case reproduced from
    ROAAUTO/VFP).

    An unset key is OK (ephemeral model, see _fernet). A key that is set and
    invalid (wrong length/padding) -> RuntimeError with the generation instruction.
    """
    key = get_settings().creds_key
    if not key:
        return
    try:
        # Constructing Fernet is enough: it rejects bad base64 and wrong lengths.
        Fernet(key.encode() if isinstance(key, str) else key)
    except (ValueError, TypeError) as exc:
        raise RuntimeError(
            "AUTOPASS_CREDS_KEY is set but invalid (Fernet requires 32 url-safe "
            "base64-encoded bytes: 44 characters ending in '='). Generate one with:\n"
            " python3 -c \"from cryptography.fernet import Fernet; "
            "print(Fernet.generate_key().decode())\""
        ) from exc


def encrypt_creds(creds: dict) -> str:
    """Encrypt a creds dict -> Fernet token (str). Compact JSON, no spaces."""
    blob = json.dumps(creds, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    return _fernet().encrypt(blob).decode("ascii")


def decrypt_creds(token: str | None) -> dict | None:
    """Decrypt a Fernet token -> dict, or None if the token is missing, was
    encrypted with a different key, or is corrupt."""
    if not token:
        return None
    try:
        plain = _fernet().decrypt(token.encode("ascii"))
        data = json.loads(plain.decode("utf-8"))
        # Only a JSON object counts as valid creds; anything else is rejected.
        return data if isinstance(data, dict) else None
    except (InvalidToken, ValueError, TypeError):
        return None
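The round trip performed by `encrypt_creds` and `decrypt_creds` can be illustrated in isolation. The sketch below assumes the `cryptography` package is installed and uses `Fernet` directly with two freshly generated keys; the creds dict and the `roundtrip_demo` helper are hypothetical. It also shows why creds encrypted under an ephemeral key are lost after a restart: a different key cannot decrypt the old token.

```python
import json

from cryptography.fernet import Fernet, InvalidToken


def roundtrip_demo() -> dict:
    """Encrypt with one key, then decrypt with the same and with another key."""
    key_a = Fernet.generate_key()  # plays the role of the key before a restart
    key_b = Fernet.generate_key()  # plays the role of a new ephemeral key

    creds = {"user": "demo", "password": "s3cret"}  # hypothetical payload
    blob = json.dumps(creds, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    token = Fernet(key_a).encrypt(blob).decode("ascii")

    # Same key: the original dict comes back.
    same_key = json.loads(Fernet(key_a).decrypt(token.encode("ascii")).decode("utf-8"))

    # Different key: Fernet raises InvalidToken, which maps to None here.
    try:
        Fernet(key_b).decrypt(token.encode("ascii"))
        other_key = "decrypted"
    except InvalidToken:
        other_key = None

    return {"token": token, "same_key": same_key, "other_key": other_key}


if __name__ == "__main__":
    result = roundtrip_demo()
    print(result["same_key"])
    print(result["other_key"])
```

Mapping `InvalidToken` to `None` mirrors the module's choice to treat an undecryptable token as missing creds rather than as an error.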