Files
roa2web-service-auto/shared/auth/trusted_device_service.py
2026-02-24 17:25:00 +00:00

196 lines
6.3 KiB
Python

"""
Trusted Device Service pentru Web 2FA — async SQLite version
Permite utilizatorilor să „țină minte" un dispozitiv timp de 30 de zile.
Dacă tokenul de dispozitiv de încredere este valid, pasul OTP este sărit.
Securitate critică:
- Tokenul brut (64 hex chars) este returnat clientului și stocat în localStorage
- Pe server se stochează DOAR sha256(token), niciodată tokenul brut
- Skip-uiește DOAR OTP-ul, nu verificarea Oracle (user+parolă rămân obligatorii)
- Expiră automat după TRUSTED_DEVICE_EXPIRE_DAYS zile
Storage: SQLite (shared/database/app_db.py) — tabelul trusted_devices.
"""
import hashlib
import logging
import os
import secrets
from datetime import datetime, timedelta
from typing import Optional
logger = logging.getLogger(__name__)
# ============================================================================
# CONSTANTE
# ============================================================================
TRUSTED_DEVICE_EXPIRE_DAYS = int(os.environ.get("TRUSTED_DEVICE_EXPIRE_DAYS", "30"))
def _hash_token(raw_token: str) -> str:
"""Calculează sha256 al tokenului brut."""
return hashlib.sha256(raw_token.encode("utf-8")).hexdigest()
# ============================================================================
# API PUBLICĂ (async)
# ============================================================================
async def create_trusted_device_token(username: str, server_id: Optional[str]) -> str:
"""
Generează un token de dispozitiv de încredere și îl stochează (ca hash).
Args:
username: Username Oracle (uppercase)
server_id: ID-ul serverului Oracle (pentru multi-server mode)
Returns:
Tokenul brut (64 hex chars) — de returnat clientului pentru localStorage
"""
from shared.database.app_db import get_db
raw_token = secrets.token_hex(64)
token_hash = _hash_token(raw_token)
now = datetime.now()
expires_at = now + timedelta(days=TRUSTED_DEVICE_EXPIRE_DAYS)
db = await get_db()
try:
# Cleanup expired tokens for this user
await db.execute(
"DELETE FROM trusted_devices WHERE expires_at < ?",
(now.isoformat(),)
)
# Insert new token
await db.execute(
"""INSERT INTO trusted_devices (token_hash, username, server_id, expires_at, created_at)
VALUES (?, ?, ?, ?, ?)""",
(token_hash, username.upper(), server_id, expires_at.isoformat(), now.isoformat())
)
await db.commit()
finally:
await db.close()
logger.info(
f"[TRUSTED_DEVICE] Token created for '{username}' "
f"(server={server_id}, expires in {TRUSTED_DEVICE_EXPIRE_DAYS}d)"
)
return raw_token
async def verify_trusted_device_token(
raw_token: str,
username: str,
server_id: Optional[str]
) -> bool:
"""
Verifică dacă tokenul de dispozitiv de încredere este valid.
Verifică: hash există, username corespunde, nu a expirat.
Fail silently — returnează False fără a ridica excepție.
Args:
raw_token: Tokenul brut din localStorage
username: Username Oracle (uppercase)
server_id: ID-ul serverului Oracle
Returns:
True dacă dispozitivul este de încredere și autentificarea poate sări OTP-ul
"""
from shared.database.app_db import get_db
try:
token_hash = _hash_token(raw_token)
db = await get_db()
try:
cursor = await db.execute(
"SELECT username, server_id, expires_at FROM trusted_devices WHERE token_hash = ?",
(token_hash,)
)
row = await cursor.fetchone()
if not row:
logger.debug(f"[TRUSTED_DEVICE] Token not found for '{username}'")
return False
stored_username = row["username"]
stored_server = row["server_id"]
expires_at_str = row["expires_at"]
# Verifică username (case-insensitive)
if stored_username.upper() != username.upper():
logger.warning(
f"[TRUSTED_DEVICE] Username mismatch: "
f"expected '{username}', got '{stored_username}'"
)
return False
# Cross-server trust — log but allow
if stored_server != server_id:
logger.info(
f"[TRUSTED_DEVICE] Cross-server trust for '{username}': "
f"token from '{stored_server}', logging into '{server_id}' — allowing"
)
# Verifică expirarea
if expires_at_str < datetime.now().isoformat():
logger.info(f"[TRUSTED_DEVICE] Token expired for '{username}'")
await db.execute("DELETE FROM trusted_devices WHERE token_hash = ?", (token_hash,))
await db.commit()
return False
logger.info(f"[TRUSTED_DEVICE] Token valid for '{username}' (server={server_id})")
return True
finally:
await db.close()
except Exception as e:
logger.error(f"[TRUSTED_DEVICE] Unexpected error during verification: {e}")
return False
async def revoke_all_for_user(username: str, server_id: Optional[str] = None) -> int:
"""
Revocă toate token-urile de dispozitiv de încredere ale unui utilizator.
Args:
username: Username Oracle (uppercase)
server_id: Dacă specificat, revocă doar pentru serverul respectiv
Returns:
Numărul de token-uri revocate
"""
from shared.database.app_db import get_db
username_upper = username.upper()
db = await get_db()
try:
if server_id is not None:
cursor = await db.execute(
"DELETE FROM trusted_devices WHERE UPPER(username) = ? AND server_id = ?",
(username_upper, server_id)
)
else:
cursor = await db.execute(
"DELETE FROM trusted_devices WHERE UPPER(username) = ?",
(username_upper,)
)
await db.commit()
deleted = cursor.rowcount
finally:
await db.close()
if deleted:
logger.info(
f"[TRUSTED_DEVICE] Revoked {deleted} tokens for '{username}'"
+ (f" (server={server_id})" if server_id else "")
)
return deleted