diff --git a/app/api/v1/router.py b/app/api/v1/router.py index aa97ebe..cbc943b 100644 --- a/app/api/v1/router.py +++ b/app/api/v1/router.py @@ -13,9 +13,10 @@ from __future__ import annotations import json -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field +from ...auth import resolve_account_id from ...db import get_connection from ...idempotency import idempotency_key from ...mapping import ( @@ -33,17 +34,20 @@ router = APIRouter(prefix="/v1", tags=["v1"]) @router.post("/prezentari", response_model=PrezentariResponse) -def create_prezentari(req: PrezentareRequest) -> PrezentariResponse: +def create_prezentari( + req: PrezentareRequest, + account_id: int = Depends(resolve_account_id), +) -> PrezentariResponse: """Enqueue una/mai multe prezentari. Idempotent: continut identic -> acelasi submission. Validarea de continut (T3, app.validation) ruleaza inainte de enqueue: esecurile NU resping cererea, ci enqueue-aza cu status `needs_data` + motiv (plan.md sect. 3). JSON malformat -> 422 din Pydantic (validare de shape). - TODO(auth): rezolva account_id din API key (acum None). + account_id vine din cheia API (resolve_account_id): cont real cu cheie, + implicit id=1 in dev fara cheie, 401 fara cheie valida in prod. Nota: rar_credentials NU se persista (zero-storage) — worker-ul le va primi pe alt canal (T2); in schelet enqueue-ul doar stocheaza prezentarea. """ - account_id = None # TODO(auth): din API key acct = account_or_default(account_id) conn = get_connection() results: list[SubmissionResult] = [] @@ -174,18 +178,21 @@ def get_mapari_pending() -> dict: class MapareIn(BaseModel): - account_id: int | None = None cod_op_service: str = Field(..., min_length=1) cod_prestatie: str = Field(..., min_length=1) auto_send: bool = True @router.post("/mapari") -def create_mapare(req: MapareIn) -> dict: +def create_mapare( + req: MapareIn, + account_id: int = Depends(resolve_account_id), +) -> dict: """Salveaza/actualizeaza o mapare op->cod si re-rezolva submission-urile blocate. - Verifica intai ca `cod_prestatie` exista in nomenclator (nu lasam mapari catre - coduri inexistente). Apoi upsert + re-rezolvare automata a `needs_mapping`. + Contul vine din cheia API (NU din body) — un cont nu poate edita maparile + altuia. Verifica intai ca `cod_prestatie` exista in nomenclator (nu lasam + mapari catre coduri inexistente). Apoi upsert + re-rezolvare `needs_mapping`. """ conn = get_connection() try: @@ -195,8 +202,8 @@ def create_mapare(req: MapareIn) -> dict: ).fetchone() if not exists: raise HTTPException(status_code=422, detail=f"cod_prestatie '{cod}' nu exista in nomenclator") - save_mapping(conn, req.account_id, req.cod_op_service, cod, req.auto_send) - stats = reresolve_account(conn, req.account_id) + save_mapping(conn, account_id, req.cod_op_service, cod, req.auto_send) + stats = reresolve_account(conn, account_id) return {"saved": {"cod_op_service": req.cod_op_service.strip(), "cod_prestatie": cod}, "reresolve": stats} finally: conn.close() diff --git a/app/auth.py b/app/auth.py new file mode 100644 index 0000000..748da15 --- /dev/null +++ b/app/auth.py @@ -0,0 +1,140 @@ +"""Autentificare API-key per cont (CORE securitate). + +Cheile API sunt separate de credentialele RAR: identifica CONTUL ROAAUTO care +foloseste gateway-ul, nu utilizatorul RAR. Stocam doar SHA-256 al cheii (tabela +`api_keys`), niciodata cheia in clar — emisa o singura data la creare. + +Enforcement controlat de `AUTOPASS_require_api_key`: + - True (prod): orice /v1/* protejat cere cheie valida -> 401 fara/cu cheie gresita. + - False (dev/test): fara cheie -> cont implicit id=1 (back-compat); cheie prezenta + dar invalida -> tot 401 (o cheie gresita nu trebuie sa treaca tacit). + +Lifecycle (emitere/rotire/revocare) se face din CLI `python -m tools.apikey` +(adminul ruleaza pe masina) — nicio suprafata HTTP de admin de securizat. +""" + +from __future__ import annotations + +import hashlib +import secrets +import sqlite3 + +from fastapi import Header, HTTPException + +from .config import get_settings +from .db import get_connection +from .mapping import DEFAULT_ACCOUNT_ID + +KEY_PREFIX = "rfak_" # romfast autopass key + + +def generate_key() -> str: + """Cheie noua in clar (emisa o singura data). `rfak_` + 43 caractere urlsafe.""" + return KEY_PREFIX + secrets.token_urlsafe(32) + + +def hash_key(plaintext: str) -> str: + """SHA-256 hex al cheii. Comparam mereu pe hash, niciodata pe clar.""" + return hashlib.sha256(plaintext.strip().encode("utf-8")).hexdigest() + + +def create_api_key(conn: sqlite3.Connection, account_id: int) -> str: + """Emite o cheie pentru cont, stocheaza hash-ul, intoarce cheia in clar. + + Verifica intai ca exista contul (FK ON DELETE CASCADE nu raporteaza un cont + inexistent la INSERT decat ca eroare obscura). Intoarce cheia in clar — NU se + mai poate recupera ulterior. + """ + acct = conn.execute("SELECT 1 FROM accounts WHERE id=?", (account_id,)).fetchone() + if not acct: + raise ValueError(f"cont inexistent: {account_id}") + plaintext = generate_key() + conn.execute( + "INSERT INTO api_keys (account_id, key_hash, active) VALUES (?, ?, 1)", + (account_id, hash_key(plaintext)), + ) + return plaintext + + +def revoke_api_key(conn: sqlite3.Connection, key_id: int) -> bool: + """Revoca o cheie (active=0 + revoked_at). Intoarce True daca a fost activa.""" + cur = conn.execute( + "UPDATE api_keys SET active=0, revoked_at=datetime('now') WHERE id=? AND active=1", + (key_id,), + ) + return cur.rowcount > 0 + + +def rotate_api_key(conn: sqlite3.Connection, account_id: int) -> str: + """Revoca toate cheile active ale contului si emite una noua. Intoarce cheia noua.""" + conn.execute( + "UPDATE api_keys SET active=0, revoked_at=datetime('now') WHERE account_id=? AND active=1", + (account_id,), + ) + return create_api_key(conn, account_id) + + +def list_keys(conn: sqlite3.Connection, account_id: int | None = None) -> list[dict]: + """Metadate chei (FARA hash). Pentru CLI list.""" + if account_id is not None: + rows = conn.execute( + "SELECT id, account_id, active, created_at, revoked_at FROM api_keys " + "WHERE account_id=? ORDER BY id", + (account_id,), + ).fetchall() + else: + rows = conn.execute( + "SELECT id, account_id, active, created_at, revoked_at FROM api_keys ORDER BY id" + ).fetchall() + return [dict(r) for r in rows] + + +def account_for_key(conn: sqlite3.Connection, plaintext: str) -> int | None: + """account_id pentru o cheie activa, sau None daca lipseste/revocata.""" + if not plaintext: + return None + row = conn.execute( + "SELECT account_id FROM api_keys WHERE key_hash=? AND active=1", + (hash_key(plaintext),), + ).fetchone() + return int(row["account_id"]) if row else None + + +def _extract_key(x_api_key: str | None, authorization: str | None) -> str | None: + """Cheia din `X-API-Key` sau `Authorization: Bearer ` (prima are prioritate).""" + if x_api_key and x_api_key.strip(): + return x_api_key.strip() + if authorization and authorization.strip(): + parts = authorization.strip().split(None, 1) + if len(parts) == 2 and parts[0].lower() == "bearer": + return parts[1].strip() + return None + + +def resolve_account_id( + x_api_key: str | None = Header(default=None, alias="X-API-Key"), + authorization: str | None = Header(default=None), +) -> int: + """Dependency FastAPI: contul cererii din cheia API. + + - cheie valida -> account_id al cheii + - cheie invalida (prezenta) -> 401 (mereu, indiferent de flag) + - fara cheie + flag off -> cont implicit (id=1), back-compat + - fara cheie + flag on -> 401 + """ + settings = get_settings() + plaintext = _extract_key(x_api_key, authorization) + + if plaintext is None: + if settings.require_api_key: + raise HTTPException(status_code=401, detail="cheie API lipsa") + return DEFAULT_ACCOUNT_ID + + conn = get_connection() + try: + account_id = account_for_key(conn, plaintext) + finally: + conn.close() + if account_id is None: + raise HTTPException(status_code=401, detail="cheie API invalida sau revocata") + return account_id diff --git a/app/config.py b/app/config.py index c74fbd4..a575065 100644 --- a/app/config.py +++ b/app/config.py @@ -22,6 +22,12 @@ class Settings(BaseSettings): # --- Bază de date --- db_path: Path = ROOT / "data" / "autopass.db" + # --- Securitate (CORE) --- + # Enforcement auth API-key pe /v1/* protejat. False (dev/test): fara cheie -> + # cont implicit id=1. True (prod): fara cheie valida -> 401. O cheie PREZENTA + # dar invalida da 401 indiferent de flag. + require_api_key: bool = False + # --- RAR --- rar_env: str = "test" # "test" | "prod" rar_base_url_test: str = "https://apps.rarom.ro/test-rar-autopass" diff --git a/app/main.py b/app/main.py index 9b7e654..81ed1f9 100644 --- a/app/main.py +++ b/app/main.py @@ -11,24 +11,41 @@ from __future__ import annotations from contextlib import asynccontextmanager from datetime import datetime, timezone -from fastapi import FastAPI -from fastapi.responses import PlainTextResponse +from fastapi import FastAPI, Request +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse, PlainTextResponse from . import __version__ from .api.v1.router import router as api_v1_router from .config import get_settings from .db import get_connection, init_db, queue_depth, read_heartbeat +from .security import install_log_redaction from .web.routes import router as web_router @asynccontextmanager async def lifespan(app: FastAPI): + install_log_redaction() init_db() yield app = FastAPI(title="Gateway RAR AUTOPASS", version=__version__, lifespan=lifespan) + +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse: + """422 fara echo de credentiale. + + Pydantic include implicit `input` (+ uneori `ctx`) in fiecare eroare — pe + /v1/prezentari asta ar reflecta inapoi `rar_credentials.password`. Pastram + type/loc/msg (clientul stie ce camp e gresit) si DROP-am input/ctx. Defense + in depth pe TOATE rutele, nu doar prezentari. + """ + cleaned = [{"type": e.get("type"), "loc": e.get("loc"), "msg": e.get("msg")} for e in exc.errors()] + return JSONResponse(status_code=422, content={"detail": cleaned}) + + app.include_router(api_v1_router) app.include_router(web_router) diff --git a/app/models.py b/app/models.py index 5f874b9..d7b92cd 100644 --- a/app/models.py +++ b/app/models.py @@ -15,7 +15,8 @@ class RarCredentials(BaseModel): """Credentiale RAR per-cerere (vin de la ROAAUTO din Oracle). NU se stocheaza.""" email: str - password: str + # repr=False: str(creds) / loguri care fac repr pe model NU expun parola. + password: str = Field(..., repr=False) class PrestatieItem(BaseModel): diff --git a/app/security.py b/app/security.py new file mode 100644 index 0000000..02d2a03 --- /dev/null +++ b/app/security.py @@ -0,0 +1,124 @@ +"""Redactare credentiale (CORE securitate). + +Corpul lui POST /v1/prezentari contine `rar_credentials.password` (creds RAR +per-cerere, zero-storage). Aceste valori NU trebuie sa apara NICIODATA in: + - raspunsuri de eroare (422 Pydantic echo-eaza `input` => parola) — vezi + `app.main.validation_exception_handler`; + - loguri / traceback-uri uvicorn — vezi `CredentialRedactingFilter`; + - repr-ul modelelor (str(creds)) — vezi `RarCredentials` (Field repr=False). + +Modulul e pur (fara DB/HTTP), unit-testabil direct. +""" + +from __future__ import annotations + +import logging +import re +from typing import Any + +MASK = "***REDACTED***" + +# Chei al caror continut e secret oriunde apar (case-insensitive). `denumire` +# etc. raman in clar — doar credentialele si token-urile se mascheaza. +SENSITIVE_KEYS = frozenset( + { + "password", + "parola", + "pwd", + "pass", + "rar_credentials", + "credentials", + "token", + "jwt", + "authorization", + "api_key", + "apikey", + "x-api-key", + "secret", + } +) + + +def scrub(obj: Any) -> Any: + """Copie a structurii cu valorile cheilor sensibile mascate, recursiv. + + Pentru `rar_credentials`/`credentials` masheaza intregul subarbore (nu doar + `password`) — un dict de creds e secret integral. Listele si dict-urile se + parcurg in adancime; scalarii trec neatinsi. + """ + if isinstance(obj, dict): + out: dict = {} + for k, v in obj.items(): + if isinstance(k, str) and k.lower() in SENSITIVE_KEYS: + out[k] = MASK + else: + out[k] = scrub(v) + return out + if isinstance(obj, (list, tuple)): + return [scrub(v) for v in obj] + return obj + + +# Mascare in text liber (mesaje de log, traceback-uri formatate). Acopera formele +# uzuale: JSON ("password": "x"), kwargs (password='x'), Bearer . +_TEXT_PATTERNS = [ + # "password": "secret" / 'password' : 'secret' (JSON / dict repr) + re.compile( + r"""(?P["']?(?:password|parola|pwd|pass|token|jwt|secret|api[_-]?key)["']?\s*[:=]\s*)""" + r"""(?P["'])(?P(?:\\.|[^"'\\])*)(?P=q)""", + re.IGNORECASE, + ), + # password=secret (fara ghilimele, pana la separator) + re.compile( + r"""(?P\b(?:password|parola|pwd|token|jwt|secret|api[_-]?key)\s*=\s*)(?P[^\s,;&)}\]]+)""", + re.IGNORECASE, + ), + # Authorization: Bearer / Bearer eyJ... + re.compile(r"""(?PBearer\s+)(?P[A-Za-z0-9._\-]+)""", re.IGNORECASE), +] + + +def scrub_text(text: str) -> str: + """Masheaza credentiale dintr-un sir liber (loguri, traceback).""" + if not text: + return text + out = text + for pat in _TEXT_PATTERNS: + out = pat.sub(lambda m: m.group("key") + MASK, out) + return out + + +class CredentialRedactingFilter(logging.Filter): + """Filtru de logging care masheaza credentiale din orice record emis. + + Atasat pe root + logger-ele uvicorn (vezi `install_log_redaction`). Lucreaza + pe mesajul DEJA formatat (cu args interpolate), apoi goleste args ca + formatter-ul sa nu reinterpoleze. Tot ce trece prin logging e curatat; + parolele in variabile locale de traceback nu ajung in mesaj (Python nu + formateaza locals implicit), deci raman protejate. + """ + + def filter(self, record: logging.LogRecord) -> bool: + try: + msg = record.getMessage() + except Exception: + return True + scrubbed = scrub_text(msg) + if scrubbed != msg: + record.msg = scrubbed + record.args = () + return True + + +def install_log_redaction() -> None: + """Instaleaza filtrul de redactare pe root + logger-ele uvicorn (idempotent).""" + filt = CredentialRedactingFilter() + targets = [ + logging.getLogger(), # root + logging.getLogger("uvicorn"), + logging.getLogger("uvicorn.error"), + logging.getLogger("uvicorn.access"), + ] + for lg in targets: + if not any(isinstance(f, CredentialRedactingFilter) for f in lg.filters): + lg.addFilter(filt) diff --git a/docs/plans/plan.md b/docs/plans/plan.md index ba32c4a..f9d6948 100644 --- a/docs/plans/plan.md +++ b/docs/plans/plan.md @@ -207,6 +207,13 @@ Nimic din cod nu e scris încă (`app/`, `tools/` nu există). Ordine recomandat retry/backoff exponential (peste `worker_max_retries` → error+banner), re-login la token expirat. Rută monitorizare descoperită live: `GET /prezentari/getAllPrezentariFinalizate` → `data.content` (filtrele nu merg pe test → fetch tot, match client-side). Verify: 15 teste (`tests/test_worker_reconcile.py`) + validare LIVE (reconciliere record 68514 din finalizate reale). +- [x] **Securitate CORE (P1)** ✅ 2026-06-15. `app/security.py` (redactare creds) + `app/auth.py` (API-key per cont) + + `tools/apikey.py` (CLI emitere/rotire/revocare). Redactare: handler `RequestValidationError` care DROP-ează + `input`/`ctx` din 422 (vectorul de scurgere a parolei pe `/v1/prezentari`), filtru logging `scrub_text` pe root+uvicorn, + `password` cu `repr=False` în model. Auth: hash SHA-256 în `api_keys` (cheia în clar emisă o singură dată), header + `X-API-Key` / `Authorization: Bearer`, enforcement pe flag `AUTOPASS_require_api_key` (prod on→401, dev off→cont default + id=1; cheie prezentă invalidă→401 mereu). `account_id` real curge din cheie în ingestie + mapare. Verify: 16 teste + (`tests/test_security.py`). **Rămas:** livrare creds per-cerere ROAAUTO→worker (zero-storage, ramășiță T2). - [ ] **T6 (P2) — worker proces/container propriu supravegheat;** `/healthz` pică → restart. Verify: worker omorât → restart automat. - [ ] **T7 (P2) — deploy:** SQLite pe volum persistent numit + backup (singura copie durabilă, re-push scos). Verify: recreare container → coada supraviețuiește. diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..ba4f911 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,241 @@ +"""Teste CORE securitate: redactare creds (422/log/repr) + auth API-key.""" + +from __future__ import annotations + +import logging +import os +import tempfile + +import pytest +from fastapi.testclient import TestClient + +from app.security import ( + MASK, + CredentialRedactingFilter, + scrub, + scrub_text, +) + + +# --------------------------------------------------------------------------- # +# Redactare — pur (scrub / scrub_text / repr) # +# --------------------------------------------------------------------------- # + +def test_scrub_masks_password_keys(): + data = {"email": "x@y.ro", "rar_credentials": {"email": "a", "password": "SECRET"}, "vin": "ABC"} + out = scrub(data) + assert out["rar_credentials"] == MASK # subarbore creds mascat integral + assert out["vin"] == "ABC" # date non-sensibile neatinse + assert "SECRET" not in repr(out) + + +def test_scrub_nested_password_in_list(): + data = {"items": [{"password": "p1"}, {"denumire": "ok"}]} + out = scrub(data) + assert out["items"][0]["password"] == MASK + assert out["items"][1]["denumire"] == "ok" + + +def test_scrub_text_json_form(): + s = '{"email": "x@y.ro", "password": "hunter2"}' + assert "hunter2" not in scrub_text(s) + assert MASK in scrub_text(s) + + +def test_scrub_text_kwargs_and_bearer(): + assert "topsecret" not in scrub_text("login password=topsecret done") + assert "eyJabc.def" not in scrub_text("Authorization: Bearer eyJabc.def") + + +def test_scrub_text_keeps_innocent(): + s = "submission 5 -> queued (vin=ABC123)" + assert scrub_text(s) == s + + +def test_credentials_repr_hides_password(): + from app.models import RarCredentials + + c = RarCredentials(email="x@y.ro", password="hunter2") + assert "hunter2" not in repr(c) + assert c.password == "hunter2" # valoarea ramane accesibila in cod + + +def test_logging_filter_redacts(caplog): + logger = logging.getLogger("test.redact") + logger.addFilter(CredentialRedactingFilter()) + with caplog.at_level(logging.INFO, logger="test.redact"): + logger.info('payload {"password": "hunter2"}') + assert "hunter2" not in caplog.text + assert MASK in caplog.text + + +# --------------------------------------------------------------------------- # +# Integrare API — 422 fara echo creds + auth API-key # +# --------------------------------------------------------------------------- # + +@pytest.fixture() +def env(monkeypatch): + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + from app.config import get_settings + + get_settings.cache_clear() + yield monkeypatch + get_settings.cache_clear() + + +def _client(): + from app.main import app + + return TestClient(app) + + +def _body(**over): + prez = { + "vin": "WVWZZZ1KZAW000123", + "nr_inmatriculare": "B999TST", + "data_prestatie": "2026-06-15", + "odometru_final": "123456", + "prestatii": [{"cod_prestatie": "OE-1"}], + } + prez.update(over) + return {"rar_credentials": {"email": "x@y.ro", "password": "HUNTER2SECRET"}, "prezentari": [prez]} + + +def test_422_does_not_echo_password(env): + with _client() as c: + # Lipseste odometru_final -> 422 de shape; parola NU trebuie sa apara in raspuns. + bad = _body() + del bad["prezentari"][0]["odometru_final"] + r = c.post("/v1/prezentari", json=bad) + assert r.status_code == 422 + assert "HUNTER2SECRET" not in r.text + # Pastram totusi info utila clientului (ce camp lipseste). + assert any("odometru_final" in str(e.get("loc")) for e in r.json()["detail"]) + # input/ctx eliminate (vectorul de scurgere). + assert all("input" not in e for e in r.json()["detail"]) + + +def test_422_missing_credentials_no_leak(env): + with _client() as c: + bad = _body() + bad["rar_credentials"]["password"] = 12345 # tip gresit -> 422 cu input=parola + r = c.post("/v1/prezentari", json=bad) + assert r.status_code == 422 + assert "12345" not in r.text + + +def test_dev_no_key_uses_default_account(env): + # Flag off (default): fara cheie -> cont implicit, merge ca inainte. + with _client() as c: + r = c.post("/v1/prezentari", json=_body()) + assert r.status_code == 200 + assert r.json()["results"][0]["status"] == "queued" + + +def test_prod_requires_key(env): + env.setenv("AUTOPASS_REQUIRE_API_KEY", "true") + from app.config import get_settings + + get_settings.cache_clear() + with _client() as c: + r = c.post("/v1/prezentari", json=_body()) + assert r.status_code == 401 + # Cu cheie valida emisa via lifecycle -> trece. + from app.auth import create_api_key + from app.db import get_connection + + conn = get_connection() + try: + key = create_api_key(conn, 1) + finally: + conn.close() + r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": key}) + assert r2.status_code == 200 + assert r2.json()["results"][0]["status"] == "queued" + + +def test_invalid_key_rejected_even_in_dev(env): + # Flag off, dar o cheie PREZENTA si gresita nu trebuie sa treaca tacit. + with _client() as c: + r = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": "rfak_gresita"}) + assert r.status_code == 401 + + +def test_bearer_header_accepted(env): + with _client() as c: + from app.auth import create_api_key + from app.db import get_connection + + conn = get_connection() + try: + key = create_api_key(conn, 1) + finally: + conn.close() + r = c.post("/v1/prezentari", json=_body(), headers={"Authorization": f"Bearer {key}"}) + assert r.status_code == 200 + + +def test_key_account_routes_idempotency(env): + # Doua conturi, aceeasi prezentare -> submission-uri distincte (account_id in cheie). + with _client() as c: + from app.auth import create_api_key + from app.db import get_connection + + conn = get_connection() + try: + conn.execute("INSERT INTO accounts (id, name) VALUES (2, 'al-doilea')") + k1 = create_api_key(conn, 1) + k2 = create_api_key(conn, 2) + finally: + conn.close() + r1 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k1}) + r2 = c.post("/v1/prezentari", json=_body(), headers={"X-API-Key": k2}) + id1 = r1.json()["results"][0]["submission_id"] + res2 = r2.json()["results"][0] + assert res2["submission_id"] != id1 + assert res2["deduped"] is False + + +# --------------------------------------------------------------------------- # +# Lifecycle chei — create / rotate / revoke # +# --------------------------------------------------------------------------- # + +def test_key_lifecycle(env): + from app.auth import account_for_key, create_api_key, hash_key, revoke_api_key, rotate_api_key + from app.db import get_connection, init_db + + init_db() + conn = get_connection() + try: + key = create_api_key(conn, 1) + assert key.startswith("rfak_") + assert account_for_key(conn, key) == 1 + + # Rotire: cheia veche moare, una noua o inlocuieste. + new_key = rotate_api_key(conn, 1) + assert account_for_key(conn, key) is None + assert account_for_key(conn, new_key) == 1 + + # Revocare directa dupa id. + kid = conn.execute( + "SELECT id FROM api_keys WHERE key_hash=?", (hash_key(new_key),) + ).fetchone()["id"] + assert revoke_api_key(conn, kid) is True + assert account_for_key(conn, new_key) is None + assert revoke_api_key(conn, kid) is False # deja revocata + finally: + conn.close() + + +def test_create_key_unknown_account(env): + from app.auth import create_api_key + from app.db import get_connection, init_db + + init_db() + conn = get_connection() + try: + with pytest.raises(ValueError): + create_api_key(conn, 999) + finally: + conn.close() diff --git a/tools/apikey.py b/tools/apikey.py new file mode 100644 index 0000000..0b770e5 --- /dev/null +++ b/tools/apikey.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +"""CLI lifecycle chei API (CORE securitate). + +Emitere/rotire/revocare/listare chei per cont. Adminul ruleaza pe masina gateway +— nicio suprafata HTTP de admin. Cheia in clar se afiseaza O SINGURA DATA la +creare/rotire; in DB traieste doar hash-ul SHA-256. + +Utilizare: + python -m tools.apikey create --account 1 + python -m tools.apikey rotate --account 1 + python -m tools.apikey revoke --key-id 3 + python -m tools.apikey list [--account 1] +""" + +from __future__ import annotations + +import argparse +import sys + +from app.auth import create_api_key, list_keys, revoke_api_key, rotate_api_key +from app.db import get_connection, init_db + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Lifecycle chei API gateway RAR AUTOPASS") + sub = parser.add_subparsers(dest="cmd", required=True) + + p_create = sub.add_parser("create", help="emite o cheie noua pentru cont") + p_create.add_argument("--account", type=int, required=True, help="account_id") + + p_rotate = sub.add_parser("rotate", help="revoca cheile active ale contului + emite una noua") + p_rotate.add_argument("--account", type=int, required=True, help="account_id") + + p_revoke = sub.add_parser("revoke", help="revoca o cheie dupa id") + p_revoke.add_argument("--key-id", type=int, required=True, help="api_keys.id") + + p_list = sub.add_parser("list", help="listeaza chei (fara hash)") + p_list.add_argument("--account", type=int, default=None, help="filtreaza pe cont") + + args = parser.parse_args(argv) + + init_db() # asigura schema (api_keys) + cont default + conn = get_connection() + try: + if args.cmd == "create": + try: + key = create_api_key(conn, args.account) + except ValueError as exc: + print(f"eroare: {exc}", file=sys.stderr) + return 2 + print(f"Cheie creata pentru cont {args.account} (pastreaz-o, nu se mai afiseaza):") + print(key) + return 0 + + if args.cmd == "rotate": + try: + key = rotate_api_key(conn, args.account) + except ValueError as exc: + print(f"eroare: {exc}", file=sys.stderr) + return 2 + print(f"Chei vechi revocate. Cheie noua pentru cont {args.account}:") + print(key) + return 0 + + if args.cmd == "revoke": + ok = revoke_api_key(conn, args.key_id) + if ok: + print(f"Cheie {args.key_id} revocata.") + return 0 + print(f"Cheie {args.key_id} inexistenta sau deja revocata.", file=sys.stderr) + return 1 + + if args.cmd == "list": + rows = list_keys(conn, args.account) + if not rows: + print("(nicio cheie)") + return 0 + print(f"{'id':>4} {'cont':>4} {'activa':>6} {'creata':<20} revocata") + for r in rows: + print( + f"{r['id']:>4} {r['account_id']:>4} {('da' if r['active'] else 'nu'):>6} " + f"{(r['created_at'] or ''):<20} {r['revoked_at'] or ''}" + ) + return 0 + finally: + conn.close() + return 0 + + +if __name__ == "__main__": + sys.exit(main())