"""Symmetric encryption for ephemeral RAR credentials (zero-storage at rest).

Plan section 5: the RAR password arrives per request, is stored ENCRYPTED in
the submission until the first successful login for the account, and is then
deleted. The JWT (30h) covers the remaining submissions.

The key lives only in `AUTOPASS_creds_key` (env), never in code or in the DB.
If `AUTOPASS_creds_key` is unset, an EPHEMERAL key is generated on first use:
encrypted creds do NOT survive a restart (acceptable — the model is ephemeral,
ROAAUTO re-sends). For production, set a persistent key (see README/deploy).
"""

from __future__ import annotations

import json
from functools import lru_cache

from cryptography.fernet import Fernet, InvalidToken

from .config import get_settings


def _key_material(raw: str | bytes) -> bytes:
    """Return the configured key as handed to Fernet: str is encoded, anything else passes through."""
    if isinstance(raw, str):
        return raw.encode()
    return raw


@lru_cache
def _fernet() -> Fernet:
    """Return the memoised Fernet instance used for all encrypt/decrypt calls.

    Uses `creds_key` from the settings when it is set. Otherwise a fresh key is
    generated and a warning is printed to stdout; the result is cached by
    `lru_cache`, so the same key is reused until `reset_cache()` is called.

    NOTE(review): the cache is per-process, so separate API/worker processes
    would each generate a different ephemeral key — confirm they never need to
    exchange tokens while running without a configured key.
    """
    configured = get_settings().creds_key
    if not configured:
        # Ephemeral mode: generate first, then announce, then build the cipher.
        ephemeral_key = Fernet.generate_key()
        print(
            "[crypto] AUTOPASS_creds_key nesetat — cheie efemera generata; creds "
            "criptate NU supravietuiesc restartului worker-ului/API-ului",
            flush=True,
        )
        return Fernet(ephemeral_key)
    return Fernet(_key_material(configured))


def reset_cache() -> None:
    """Drop the memoised Fernet instance (for tests that change the environment)."""
    _fernet.cache_clear()


def validate_creds_key() -> None:
    """Fail fast at startup when `creds_key` is set but unusable.

    A key that is set BUT invalid must stop startup instead of blowing up only
    at the first POST /v1/prezentari (raw 500 with no useful message for the
    client — the real case reproduced from ROAAUTO/VFP).

    An unset key is fine (ephemeral model, see `_fernet`).

    Raises:
        RuntimeError: the key is set and Fernet rejects it (wrong
            length/padding); the message includes the command to generate one.
    """
    configured = get_settings().creds_key
    if configured:
        try:
            # Construct and discard: only the constructor's validation matters.
            Fernet(_key_material(configured))
        except (ValueError, TypeError) as exc:
            message = (
                "AUTOPASS_CREDS_KEY este setata dar invalida (Fernet cere 32 bytes "
                "url-safe base64, 44 caractere terminate in '='). Genereaza una cu:\n"
                " python3 -c \"from cryptography.fernet import Fernet; "
                "print(Fernet.generate_key().decode())\""
            )
            raise RuntimeError(message) from exc


def encrypt_creds(creds: dict) -> str:
    """Encrypt a creds dict into a Fernet token (str).

    The dict is serialised as compact JSON (no spaces, non-ASCII kept as-is)
    and UTF-8 encoded before encryption.
    """
    payload = json.dumps(creds, ensure_ascii=False, separators=(",", ":"))
    plaintext = payload.encode("utf-8")
    sealed = _fernet().encrypt(plaintext)
    return sealed.decode("ascii")


def decrypt_creds(token: str | None) -> dict | None:
    """Decrypt a Fernet token back into a dict.

    Returns None when the token is missing/empty, was produced with a
    different key, is corrupt, or decodes to JSON that is not an object.

    NOTE(review): `_fernet()` runs inside the `try`, so a ValueError/TypeError
    raised while building the cipher is also reported as None — confirm that
    is intended alongside `validate_creds_key()`.
    """
    if token:
        try:
            # Obtain the cipher before encoding the token, as the original did.
            cipher = _fernet()
            plaintext = cipher.decrypt(token.encode("ascii"))
            decoded = json.loads(plaintext.decode("utf-8"))
        except (InvalidToken, ValueError, TypeError):
            return None
        if isinstance(decoded, dict):
            return decoded
    return None