Canalul web trece de la 100% deschis (hardcodat cont 1) la autentificat si multi-tenant. Un service nou se inregistreaza din browser, primeste o cheie API (o singura data) si o sesiune; contul se creeaza "in asteptare" (active=0) si nu trimite la RAR pana la activarea de catre admin (tools/account.py activate). - users + app/users.py: parole scrypt (salt per-user, eticheta parametri onorata la verify pentru migrare cost), email unic case-insensitive - sesiune: SessionMiddleware (same_site=strict, https_only config) + app/web/session.py (current_account/web_account/require_login->LoginRequired, set_session clear-inainte) - CSRF (app/web/csrf.py) enforce in prod inclusiv pe login/signup + rate-limit in-proces (app/web/ratelimit.py) pe signup si login - signup/login/logout (app/web/auth_routes.py): signup tranzactie atomica, cheie-o-data, log SIGNUP pentru descoperire admin - dashboard + import scoped pe contul sesiunii (regula NULL->cont 1); toate rutele web care ating date sensibile sub require_login; nomenclator ramane global - banner "cont in asteptare" pentru conturi active=0 - gate worker: claim_one LEFT JOIN accounts COALESCE(active,1)=1 (account_id NULL=activ) VERIFY context curat (2 runde): leak cross-account /_fragments/mapari prins+reparat. /code-review high: csrf_token lipsa pe re-randari de eroare, scrypt_params ignorat, login fara rate-limit -- toate reparate. 361 teste pass (de la 313). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
178 lines
6.2 KiB
Python
178 lines
6.2 KiB
Python
"""Teste US-009 (PRD 3.3): CSRF token per-sesiune + rate-limit signup."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import HTMLResponse
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from starlette.testclient import TestClient
|
|
|
|
|
|
# ---- App minimal pentru teste CSRF ----
|
|
|
|
def _make_csrf_app() -> FastAPI:
    """Build a minimal FastAPI app that exposes the CSRF helpers over HTTP.

    The app has a cookie-backed session (``SessionMiddleware``) and three routes:

    * ``GET /login-sim``    -- stores ``account_id = 1`` in the session, then
      returns the session's CSRF token.
    * ``GET /csrf-token``   -- returns the session's CSRF token without
      touching ``account_id``.
    * ``POST /verify-csrf`` -- passes the submitted ``csrf_token`` form field
      to ``verify_csrf`` and answers ``{"ok": True}`` when it does not raise.

    ``CsrfError`` is rendered as a plain ``403`` response.
    """
    # Single function-scope import (kept out of module level, as in the rest
    # of this file) instead of one import per handler.
    from app.web.csrf import CsrfError, get_csrf_token, verify_csrf

    application = FastAPI()
    session_options = {
        "secret_key": "csrf-test-secret",
        "session_cookie": "autopass_session",
        "https_only": False,
        "same_site": "strict",
    }
    application.add_middleware(SessionMiddleware, **session_options)

    @application.get("/login-sim")
    def login_sim(request: Request):
        """Simulate a login: put account_id in the session (like set_session from US-003)."""
        request.session["account_id"] = 1
        csrf_value = get_csrf_token(request)
        return {"token": csrf_value}

    @application.get("/csrf-token")
    def get_token(request: Request):
        csrf_value = get_csrf_token(request)
        return {"token": csrf_value}

    @application.post("/verify-csrf")
    async def verify(request: Request):
        posted_fields = await request.form()
        # A missing field yields None, which verify_csrf receives as-is.
        verify_csrf(request, posted_fields.get("csrf_token"))
        return {"ok": True}

    @application.exception_handler(CsrfError)
    async def csrf_handler(request: Request, exc: CsrfError):
        return HTMLResponse("CSRF invalid", status_code=403)

    return application
|
|
|
|
|
|
@pytest.fixture()
def csrf_client():
    """Yield a ``TestClient`` for a fresh CSRF mini-app; redirects are not followed."""
    client = TestClient(_make_csrf_app(), follow_redirects=False)
    # Entering the client as a context manager runs app startup/shutdown.
    with client as active_client:
        yield active_client
|
|
|
|
|
|
def test_get_csrf_token_stabil_per_sesiune(csrf_client):
    """get_csrf_token returns the same token for repeated reads in one session."""
    # Two consecutive reads through the same client (same cookie jar).
    seen_tokens = [
        csrf_client.get("/csrf-token").json()["token"] for _ in range(2)
    ]
    first, second = seen_tokens
    assert first == second
    assert len(first) > 16
|
|
|
|
|
|
def test_verify_csrf_corect(csrf_client):
    """Correct token on an authenticated session -> verify_csrf passes, HTTP 200.

    The token is obtained through ``/login-sim`` so that the session carries
    ``account_id``. Outside ``web_auth_required`` mode CSRF is only enforced
    for authenticated sessions; without the login step this test could return
    200 without the submitted token ever being compared.
    """
    token = csrf_client.get("/login-sim").json()["token"]
    resp = csrf_client.post("/verify-csrf", data={"csrf_token": token})
    assert resp.status_code == 200
|
|
|
|
|
|
def test_verify_csrf_gresit_ridica(csrf_client):
    """Wrong token -> CsrfError -> 403.

    CSRF is enforced only when an authenticated session exists (account_id in
    the session). login-sim sets account_id + csrf_token, like the real login
    from US-003.
    """
    # Establish the authenticated session (account_id + csrf_token) first.
    csrf_client.get("/login-sim")
    forged_form = {"csrf_token": "token-fals-xxxx"}
    outcome = csrf_client.post("/verify-csrf", data=forged_form)
    assert outcome.status_code == 403
|
|
|
|
|
|
def test_verify_csrf_lipsa_ridica(csrf_client):
    """Missing token (None) -> CsrfError -> 403."""
    # Establish the authenticated session first.
    csrf_client.get("/login-sim")
    empty_form = {}
    status = csrf_client.post("/verify-csrf", data=empty_form).status_code
    assert status == 403
|
|
|
|
|
|
def _make_prod_csrf_app() -> FastAPI:
    """Build the mini-app for the production-mode CSRF test: GET/POST /form + 403 handler."""
    # Imported here so the import happens after the caller has set the
    # environment and cleared the settings cache.
    from app.web.csrf import CsrfError, get_csrf_token, verify_csrf

    mini = FastAPI()
    mini.add_middleware(
        SessionMiddleware,
        secret_key="prod-test-key",
        session_cookie="autopass_session",
        https_only=False,
        same_site="strict",
    )

    @mini.get("/form")
    def form(request: Request):
        return {"token": get_csrf_token(request)}

    @mini.post("/form")
    async def form_post(request: Request):
        submitted = await request.form()
        verify_csrf(request, submitted.get("csrf_token"))
        return {"ok": True}

    @mini.exception_handler(CsrfError)
    async def csrf_handler(request: Request, exc: CsrfError):
        return HTMLResponse("CSRF invalid", status_code=403)

    return mini


def test_verify_csrf_enforce_in_prod_fara_login(monkeypatch):
    """web_auth_required=True -> CSRF is enforced even without account_id (login CSRF fix).

    In production, POST /login and /signup have no account_id in the session,
    but CSRF must still be enforced (login CSRF: the attacker forces the
    victim to log into the attacker's account). Verifies that a POST without
    a token is answered with 403 and a POST with the valid token with 200.

    Cleanup (settings cache + temp directory) runs in ``finally`` blocks so a
    failing assertion cannot leave production-mode settings cached for the
    tests that run afterwards.
    """
    import os
    import shutil
    import tempfile

    tmp = tempfile.mkdtemp()
    try:
        monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "csrf_prod.db"))
        monkeypatch.setenv("AUTOPASS_WEB_AUTH_REQUIRED", "true")
        from app.config import get_settings

        # Drop any cached settings so the patched environment takes effect.
        get_settings.cache_clear()
        try:
            with TestClient(_make_prod_csrf_app(), follow_redirects=False) as c:
                # GET initialises the token in the session (no account_id, like GET /login).
                tok = c.get("/form").json()["token"]

                # POST without token -> 403 in prod (login CSRF protection).
                r_bad = c.post("/form", data={})
                assert r_bad.status_code == 403, f"asteptat 403, primit {r_bad.status_code}"

                # POST with the valid token -> 200.
                r_ok = c.post("/form", data={"csrf_token": tok})
                assert r_ok.status_code == 200, f"asteptat 200, primit {r_ok.status_code}"
        finally:
            # Always reset, even on assertion failure.
            get_settings.cache_clear()
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
|
|
|
|
|
|
# ---- Teste rate-limit ----
|
|
|
|
def test_rate_limit_permite_sub_prag():
    """check_rate_limit allows requests while the key stays under max_hits."""
    import uuid

    from app.web.ratelimit import check_rate_limit

    # Unique key per invocation. NOTE(review): the limiter is presumably
    # process-wide state keyed by this string; with a fixed key and a 3600 s
    # window, a second run in the same process would reach 6 hits against
    # max_hits=5 and fail.
    key = f"ip_test_sub_prag_unic_{uuid.uuid4().hex}"
    for _ in range(3):
        assert check_rate_limit(key, max_hits=5, window_s=3600) is True
|
|
|
|
|
|
def test_rate_limit_blocheaza_peste_prag():
    """check_rate_limit blocks the (max_hits+1)-th request, and only that one."""
    import uuid

    from app.web.ratelimit import check_rate_limit

    # Unique key per invocation so earlier tests or reruns in the same process
    # cannot pre-fill the window (NOTE(review): assumes limiter state is
    # process-wide and keyed by this string).
    key = f"ip_test_bloc_unic_{uuid.uuid4().hex}"
    # The first max_hits requests must be allowed; otherwise a limiter that
    # rejects everything would satisfy the final assertion.
    for _ in range(3):
        assert check_rate_limit(key, max_hits=3, window_s=3600) is True
    # The 4th request -> False.
    assert check_rate_limit(key, max_hits=3, window_s=3600) is False
|
|
|
|
|
|
def test_rate_limit_fereastra_glisanta():
    """Once the window has expired, the limiter allows requests again."""
    from app.web.ratelimit import check_rate_limit

    key = "ip_test_fereastra_unic"
    # Fill the window using window_s=0 (the test relies on every recorded
    # timestamp expiring immediately).
    check_rate_limit(key, max_hits=1, window_s=0)
    check_rate_limit(key, max_hits=1, window_s=0)
    # With window_s=0 all earlier hits are expired -> the next one is allowed.
    assert check_rate_limit(key, max_hits=1, window_s=0) is True
|