"""Redactare credentiale (CORE securitate). Corpul lui POST /v1/prezentari contine `rar_credentials.password` (creds RAR per-cerere, zero-storage). Aceste valori NU trebuie sa apara NICIODATA in: - raspunsuri de eroare (422 Pydantic echo-eaza `input` => parola) — vezi `app.main.validation_exception_handler`; - loguri / traceback-uri uvicorn — vezi `CredentialRedactingFilter`; - repr-ul modelelor (str(creds)) — vezi `RarCredentials` (Field repr=False). Modulul e pur (fara DB/HTTP), unit-testabil direct. """ from __future__ import annotations import logging import re from typing import Any MASK = "***REDACTED***" # Chei al caror continut e secret oriunde apar (case-insensitive). `denumire` # etc. raman in clar — doar credentialele si token-urile se mascheaza. SENSITIVE_KEYS = frozenset( { "password", "parola", "pwd", "pass", "rar_credentials", "credentials", "token", "jwt", "authorization", "api_key", "apikey", "x-api-key", "secret", } ) # Chei al caror continut e PII de identificare vehicul/proprietar: se logheaza DOAR # partial (ultimele 4), niciodata integral (L.142/GDPR). PII_PARTIAL_KEYS = frozenset({"vin", "nr_inmatriculare", "nr", "numar"}) def vin_partial(value: Any) -> str: """VIN/numar mascat partial: pastreaza ultimele 4 caractere, restul `…`. 'WVWZZZ1KZAW000123' -> 'WVW…0123'. Sub 4 caractere -> doar masca. Suficient pentru a corela un rand fara a expune identificatorul integral in jurnal. """ s = str(value if value is not None else "").strip() if not s: return "" if len(s) <= 4: return "…" return f"{s[:3]}…{s[-4:]}" if len(s) > 7 else f"…{s[-4:]}" def redact_pii(obj: Any) -> Any: """Ca `scrub`, plus mascare partiala a VIN/numar (PII_PARTIAL_KEYS). Folosit la scrierea jurnalului (observ.log_event): mai intai mascam credentialele integral (scrub), apoi reducem VIN/nr la forma partiala. Recursiv pe dict/list. """ if isinstance(obj, dict): out: dict = {} for k, v in obj.items(): if isinstance(k, str) and k.lower() in SENSITIVE_KEYS: out[k] = MASK elif isinstance(k, str) and k.lower() in PII_PARTIAL_KEYS and not isinstance(v, (dict, list)): out[k] = vin_partial(v) else: out[k] = redact_pii(v) return out if isinstance(obj, (list, tuple)): return [redact_pii(v) for v in obj] return obj def scrub(obj: Any) -> Any: """Copie a structurii cu valorile cheilor sensibile mascate, recursiv. Pentru `rar_credentials`/`credentials` masheaza intregul subarbore (nu doar `password`) — un dict de creds e secret integral. Listele si dict-urile se parcurg in adancime; scalarii trec neatinsi. """ if isinstance(obj, dict): out: dict = {} for k, v in obj.items(): if isinstance(k, str) and k.lower() in SENSITIVE_KEYS: out[k] = MASK else: out[k] = scrub(v) return out if isinstance(obj, (list, tuple)): return [scrub(v) for v in obj] return obj # Mascare in text liber (mesaje de log, traceback-uri formatate). Acopera formele # uzuale: JSON ("password": "x"), kwargs (password='x'), Bearer . _TEXT_PATTERNS = [ # "password": "secret" / 'password' : 'secret' (JSON / dict repr) re.compile( r"""(?P["']?(?:password|parola|pwd|pass|token|jwt|secret|api[_-]?key)["']?\s*[:=]\s*)""" r"""(?P["'])(?P(?:\\.|[^"'\\])*)(?P=q)""", re.IGNORECASE, ), # password=secret (fara ghilimele, pana la separator) re.compile( r"""(?P\b(?:password|parola|pwd|token|jwt|secret|api[_-]?key)\s*=\s*)(?P[^\s,;&)}\]]+)""", re.IGNORECASE, ), # Authorization: Bearer / Bearer eyJ... re.compile(r"""(?PBearer\s+)(?P[A-Za-z0-9._\-]+)""", re.IGNORECASE), ] def scrub_text(text: str) -> str: """Masheaza credentiale dintr-un sir liber (loguri, traceback).""" if not text: return text out = text for pat in _TEXT_PATTERNS: out = pat.sub(lambda m: m.group("key") + MASK, out) return out class CredentialRedactingFilter(logging.Filter): """Filtru de logging care masheaza credentiale din orice record emis. Atasat pe root + logger-ele uvicorn (vezi `install_log_redaction`). Lucreaza pe mesajul DEJA formatat (cu args interpolate), apoi goleste args ca formatter-ul sa nu reinterpoleze. Tot ce trece prin logging e curatat; parolele in variabile locale de traceback nu ajung in mesaj (Python nu formateaza locals implicit), deci raman protejate. """ def filter(self, record: logging.LogRecord) -> bool: try: msg = record.getMessage() except Exception: return True scrubbed = scrub_text(msg) if scrubbed != msg: record.msg = scrubbed record.args = () return True def install_log_redaction() -> None: """Instaleaza filtrul de redactare pe root + logger-ele uvicorn (idempotent).""" filt = CredentialRedactingFilter() targets = [ logging.getLogger(), # root logging.getLogger("uvicorn"), logging.getLogger("uvicorn.error"), logging.getLogger("uvicorn.access"), ] for lg in targets: if not any(isinstance(f, CredentialRedactingFilter) for f in lg.filters): lg.addFilter(filt)