"""Tests for US-009 (PRD 3.3): per-session CSRF token + signup rate limiting."""

from __future__ import annotations

import pytest
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from starlette.middleware.sessions import SessionMiddleware
from starlette.testclient import TestClient


# ---- Minimal app for the CSRF tests ----


def _make_csrf_app() -> FastAPI:
    """Build a minimal FastAPI app that exercises ``app.web.csrf``.

    Routes:
        GET  /login-sim    -- stores ``account_id`` in the session and returns
                              the CSRF token (simulated login).
        GET  /csrf-token   -- returns the CSRF token for the current session.
        POST /verify-csrf  -- runs ``verify_csrf`` on the ``csrf_token`` form
                              field and returns ``{"ok": True}`` on success.

    ``CsrfError`` is mapped to an HTTP 403 response.
    """
    # Kept function-local, as in the original layout.
    # NOTE(review): presumably so project modules are imported only once the
    # test configuration is in place -- confirm.
    from app.web.csrf import CsrfError, get_csrf_token, verify_csrf

    mini = FastAPI()
    mini.add_middleware(
        SessionMiddleware,
        secret_key="csrf-test-secret",
        session_cookie="autopass_session",
        https_only=False,
        same_site="strict",
    )

    @mini.get("/login-sim")
    def login_sim(request: Request):
        """Simulate login: store account_id in the session (like set_session in US-003)."""
        request.session["account_id"] = 1
        return {"token": get_csrf_token(request)}

    @mini.get("/csrf-token")
    def get_token(request: Request):
        """Return the CSRF token bound to the current session."""
        return {"token": get_csrf_token(request)}

    @mini.post("/verify-csrf")
    async def verify(request: Request):
        """Validate the submitted ``csrf_token`` form field."""
        form = await request.form()
        submitted = form.get("csrf_token")
        verify_csrf(request, submitted)
        return {"ok": True}

    @mini.exception_handler(CsrfError)
    async def csrf_handler(request: Request, exc: CsrfError):
        """Translate a CSRF failure into an HTTP 403 response."""
        return HTMLResponse("CSRF invalid", status_code=403)

    return mini


@pytest.fixture()
def csrf_client():
    """Yield a TestClient (redirects not followed) bound to the minimal CSRF app."""
    app = _make_csrf_app()
    with TestClient(app, follow_redirects=False) as c:
        yield c


def test_get_csrf_token_stabil_per_sesiune(csrf_client):
    """get_csrf_token returns the same token within the same session."""
    r1 = csrf_client.get("/csrf-token")
    token1 = r1.json()["token"]
    r2 = csrf_client.get("/csrf-token")
    token2 = r2.json()["token"]
    assert token1 == token2
    assert len(token1) > 16


def test_verify_csrf_corect(csrf_client):
    """Correct token -> verify_csrf passes, response is 200."""
    token = csrf_client.get("/csrf-token").json()["token"]
    resp = csrf_client.post("/verify-csrf", data={"csrf_token": token})
    assert resp.status_code == 200


def test_verify_csrf_gresit_ridica(csrf_client):
    """Wrong token -> CsrfError -> 403.

    CSRF is enforced only when an authenticated session exists (account_id in
    the session). /login-sim sets account_id + csrf_token, like the real login
    from US-003.
    """
    # Initialise the authenticated session (account_id + csrf_token).
    csrf_client.get("/login-sim")
    resp = csrf_client.post("/verify-csrf", data={"csrf_token": "token-fals-xxxx"})
    assert resp.status_code == 403


def test_verify_csrf_lipsa_ridica(csrf_client):
    """Missing token (None) -> CsrfError -> 403."""
    # Initialise the authenticated session.
    csrf_client.get("/login-sim")
    resp = csrf_client.post("/verify-csrf", data={})
    assert resp.status_code == 403


def test_verify_csrf_enforce_in_prod_fara_login(monkeypatch, tmp_path):
    """web_auth_required=True -> CSRF is enforced even without account_id (login CSRF fix).

    In production, POST /login and /signup have no account_id in the session,
    but CSRF must still be enforced (login CSRF: the attacker forces the victim
    to log into the attacker's account). Verifies that verify_csrf raises
    CsrfError on a POST without a token and passes with a valid token.
    """
    # tmp_path is cleaned up by pytest, so no temporary directory is leaked.
    monkeypatch.setenv("AUTOPASS_DB_PATH", str(tmp_path / "csrf_prod.db"))
    monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true")

    from app.config import get_settings

    # Drop any cached settings so the patched environment is picked up.
    get_settings.cache_clear()
    try:
        from app.web.csrf import CsrfError, get_csrf_token, verify_csrf

        mini = FastAPI()
        mini.add_middleware(
            SessionMiddleware,
            secret_key="prod-test-key",
            session_cookie="autopass_session",
            https_only=False,
            same_site="strict",
        )

        @mini.get("/form")
        def form(request: Request):
            return {"token": get_csrf_token(request)}

        @mini.post("/form")
        async def form_post(request: Request):
            submitted_form = await request.form()
            verify_csrf(request, submitted_form.get("csrf_token"))
            return {"ok": True}

        @mini.exception_handler(CsrfError)
        async def csrf_handler(request: Request, exc: CsrfError):
            return HTMLResponse("CSRF invalid", status_code=403)

        with TestClient(mini, follow_redirects=False) as c:
            # GET initialises the token in the session (no account_id, like GET /login).
            tok = c.get("/form").json()["token"]

            # POST without a token -> 403 in prod (login CSRF protection).
            r_bad = c.post("/form", data={})
            assert r_bad.status_code == 403, f"asteptat 403, primit {r_bad.status_code}"

            # POST with a valid token -> 200.
            r_ok = c.post("/form", data={"csrf_token": tok})
            assert r_ok.status_code == 200, f"asteptat 200, primit {r_ok.status_code}"
    finally:
        # Always reset the settings cache, even when an assertion fails, so the
        # settings cached under the patched environment do not leak into other tests.
        get_settings.cache_clear()


# ---- Rate-limit tests ----


def test_rate_limit_permite_sub_prag():
    """check_rate_limit allows N requests below the threshold."""
    from app.web.ratelimit import check_rate_limit

    key = "ip_test_sub_prag_unic"
    for _ in range(3):
        assert check_rate_limit(key, max_hits=5, window_s=3600) is True


def test_rate_limit_blocheaza_peste_prag():
    """check_rate_limit blocks the (max_hits+1)-th request."""
    from app.web.ratelimit import check_rate_limit

    key = "ip_test_bloc_unic"
    for _ in range(3):
        check_rate_limit(key, max_hits=3, window_s=3600)
    # The 4th request -> False.
    assert check_rate_limit(key, max_hits=3, window_s=3600) is False


def test_rate_limit_fereastra_glisanta():
    """After the window expires, the limiter allows requests again."""
    from app.web.ratelimit import check_rate_limit

    key = "ip_test_fereastra_unic"
    # Fill the window with window_s=0 (all timestamps expire immediately).
    check_rate_limit(key, max_hits=1, window_s=0)
    check_rate_limit(key, max_hits=1, window_s=0)
    # With window_s=0 everything has expired -> the next request is allowed.
    assert check_rate_limit(key, max_hits=1, window_s=0) is True