"""Autentificare API-key per cont (CORE securitate). Cheile API sunt separate de credentialele RAR: identifica CONTUL ROAAUTO care foloseste gateway-ul, nu utilizatorul RAR. Stocam doar SHA-256 al cheii (tabela `api_keys`), niciodata cheia in clar — emisa o singura data la creare. Enforcement controlat de `AUTOPASS_require_api_key`: - True (prod): orice /v1/* protejat cere cheie valida -> 401 fara/cu cheie gresita. - False (dev/test): fara cheie -> cont implicit id=1 (back-compat); cheie prezenta dar invalida -> tot 401 (o cheie gresita nu trebuie sa treaca tacit). Lifecycle (emitere/rotire/revocare) se face din CLI `python -m tools.apikey` (adminul ruleaza pe masina) — nicio suprafata HTTP de admin de securizat. """ from __future__ import annotations import hashlib import secrets import sqlite3 from datetime import datetime, timezone from fastapi import Depends, Header, HTTPException, Request from .config import get_settings from .db import get_connection from .mapping import DEFAULT_ACCOUNT_ID KEY_PREFIX = "rfak_" # romfast autopass key def generate_key() -> str: """Cheie noua in clar (emisa o singura data). `rfak_` + 43 caractere urlsafe.""" return KEY_PREFIX + secrets.token_urlsafe(32) def hash_key(plaintext: str) -> str: """SHA-256 hex al cheii. Comparam mereu pe hash, niciodata pe clar.""" return hashlib.sha256(plaintext.strip().encode("utf-8")).hexdigest() def create_api_key(conn: sqlite3.Connection, account_id: int) -> str: """Emite o cheie pentru cont, stocheaza hash-ul, intoarce cheia in clar. Verifica intai ca exista contul (FK ON DELETE CASCADE nu raporteaza un cont inexistent la INSERT decat ca eroare obscura). Intoarce cheia in clar — NU se mai poate recupera ulterior. """ acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone() if not acct: raise ValueError(f"cont inexistent: {account_id}") plaintext = generate_key() conn.execute( "INSERT INTO api_keys (account_id, key_hash, active) VALUES (?, ?, 1)", (account_id, hash_key(plaintext)), ) return plaintext def revoke_api_key(conn: sqlite3.Connection, key_id: int) -> bool: """Revoca o cheie (active=0 + revoked_at). Intoarce True daca a fost activa.""" cur = conn.execute( "UPDATE api_keys SET active=0, revoked_at=datetime('now') WHERE id=? AND active=1", (key_id,), ) return cur.rowcount > 0 def rotate_api_key(conn: sqlite3.Connection, account_id: int) -> str: """Revoca toate cheile active ale contului si emite una noua. Intoarce cheia noua.""" conn.execute( "UPDATE api_keys SET active=0, revoked_at=datetime('now') WHERE account_id=? AND active=1", (account_id,), ) return create_api_key(conn, account_id) def list_keys(conn: sqlite3.Connection, account_id: int | None = None) -> list[dict]: """Metadate chei (FARA hash). Pentru CLI list.""" if account_id is not None: rows = conn.execute( "SELECT id, account_id, active, created_at, revoked_at FROM api_keys " "WHERE account_id=? ORDER BY id", (account_id,), ).fetchall() else: rows = conn.execute( "SELECT id, account_id, active, created_at, revoked_at FROM api_keys ORDER BY id" ).fetchall() return [dict(r) for r in rows] def account_for_key(conn: sqlite3.Connection, plaintext: str) -> int | None: """account_id pentru o cheie activa, sau None daca lipseste/revocata.""" if not plaintext: return None row = conn.execute( "SELECT account_id FROM api_keys WHERE key_hash=? AND active=1", (hash_key(plaintext),), ).fetchone() return int(row["account_id"]) if row else None def _extract_key(x_api_key: str | None, authorization: str | None) -> str | None: """Cheia din `X-API-Key` sau `Authorization: Bearer ` (prima are prioritate).""" if x_api_key and x_api_key.strip(): return x_api_key.strip() if authorization and authorization.strip(): parts = authorization.strip().split(None, 1) if len(parts) == 2 and parts[0].lower() == "bearer": return parts[1].strip() return None def _log_auth_esuat(request: Request | None, plaintext: str | None, motiv: str) -> None: """Eveniment de jurnal pentru un esec de auth: IP + prefix cheie, NU cheia. Best-effort (log_event inghite erorile). Import local: evita cuplarea la import-time (observ -> db; auth -> db) si pastreaza auth.py importabil din CLI fara efecte. """ from .observ import log_event ip = None if request is not None and request.client is not None: ip = request.client.host prefix = (plaintext[:8] + "…") if plaintext else None log_event( "api_auth_esuat", nivel="WARNING", cod="RAR_CREDS_INVALIDE" if plaintext else None, mesaj=motiv, context={"ip": ip, "key_prefix": prefix}, ) def resolve_account_id( request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), authorization: str | None = Header(default=None), ) -> int: """Dependency FastAPI: contul cererii din cheia API. - cheie valida -> account_id al cheii - cheie invalida (prezenta) -> 401 (mereu, indiferent de flag) - fara cheie + flag off -> cont implicit (id=1), back-compat - fara cheie + flag on -> 401 Esecurile de auth (401) emit `api_auth_esuat` cu IP + prefix cheie. """ settings = get_settings() plaintext = _extract_key(x_api_key, authorization) if plaintext is None: if settings.require_api_key: _log_auth_esuat(request, None, "cheie API lipsa (prod)") raise HTTPException(status_code=401, detail="cheie API lipsa") return DEFAULT_ACCOUNT_ID conn = get_connection() try: account_id = account_for_key(conn, plaintext) finally: conn.close() if account_id is None: _log_auth_esuat(request, plaintext, "cheie API invalida sau revocata") raise HTTPException(status_code=401, detail="cheie API invalida sau revocata") return account_id def require_api_access( account_id: int = Depends(resolve_account_id), ) -> int: """Dependency FastAPI (T4, PRD 5.17): verifica ca tier-ul efectiv permite accesul la API. Reguli: - enforce_plans=False (kill-switch): sare verificarea. - dev id=1 cu require_api_key=False: bypass (dogfooding, testele existente nu pica). - Pro/Premium sau trial Pro activ: permit. - Free/Standard fara trial: 403 PLAN_FARA_API cu eroare 3 niveluri. Refoloseste resolve_account_id (account_id deja rezolvat din cheie API). Se ataseaza ca Depends() pe rutele de ingestie API (POST /v1/prezentari, POST /v1/import, POST /v1/import/{id}/commit). valideaza + nomenclator raman libere. """ from .plans import PLANS, effective_tier from .errors import eroare as _eroare settings = get_settings() # Kill-switch operare: sare toate gate-urile de plan. if not settings.enforce_plans: return account_id # Bypass pentru contul implicit dev (id=1) in modul fara cheie API obligatorie. # In prod (require_api_key=True), id=1 nu are bypass implicit (cheie = obligatorie). if not settings.require_api_key and account_id == DEFAULT_ACCOUNT_ID: return account_id conn = get_connection() try: row = conn.execute( "SELECT tier, trial_until FROM accounts WHERE id=?", (account_id,) ).fetchone() finally: conn.close() now = datetime.now(timezone.utc) et = effective_tier(row, now) if not PLANS[et].get("api_access"): from .observ import log_event log_event( "plan_api_refuzat", account_id=account_id, nivel="WARNING", mesaj=f"Acces API refuzat: tier efectiv={et}", context={"tier_efectiv": et}, ) raise HTTPException( status_code=403, detail=_eroare( "PLAN_FARA_API", cauza=f"Tier efectiv: {et}. API disponibil pe Pro/Premium.", ), ) return account_id