Files
roa2web-service-auto/shared/auth/backup_codes_service.py
2026-02-24 17:25:00 +00:00

115 lines
3.9 KiB
Python

"""Backup Codes Service - fallback 2FA când emailul nu sosește."""
import hashlib
import logging
import secrets
import string
from datetime import datetime, timedelta
from typing import Optional
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Number of codes issued by each generate_backup_codes() call.
BACKUP_CODE_COUNT = 10
BACKUP_CODE_EXPIRE_DAYS = 365 # 1 year
# Alphabet for backup codes: uppercase letters + digits (no ambiguous chars like 0/O, 1/I/L)
_ALPHABET = "ABCDEFGHJKMNPQRSTUVWXYZ23456789"
def _hash_code(code: str) -> str:
    """Return the SHA-256 hex digest of a backup code in canonical form.

    The code is upper-cased and stripped of surrounding whitespace before it
    is UTF-8 encoded and hashed, so the digest does not depend on letter case
    or on stray leading/trailing blanks in the input.
    """
    canonical = code.upper().strip()
    digest = hashlib.sha256()
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()
async def generate_backup_codes(username: str, server_id: Optional[str]) -> list[str]:
    """Generate a fresh set of backup codes and return them in plain text.

    Codes already stored for the same (username, server_id) pair are deleted
    first, then the new ones are inserted and committed. Only the hash of each
    code is written to the database; the plain codes exist solely in the
    returned list.

    Args:
        username: Account name; stored and matched upper-cased.
        server_id: Server the codes belong to. ``None`` targets rows whose
            ``server_id`` is NULL.

    Returns:
        ``BACKUP_CODE_COUNT`` codes of 8 characters each, drawn from
        ``_ALPHABET``.
    """
    from shared.database.app_db import get_db

    username_upper = username.upper()
    now = datetime.now().isoformat()
    # NOTE(review): no expiry is persisted. The INSERT below has no expiry
    # column and nothing here reads BACKUP_CODE_EXPIRE_DAYS, so as far as this
    # function shows a code stays valid until it is used or regenerated.
    # Confirm whether expiry is meant to be stored and enforced.

    # Build all codes up front so the database section only does persistence.
    codes: list[str] = [
        "".join(secrets.choice(_ALPHABET) for _ in range(8))
        for _ in range(BACKUP_CODE_COUNT)
    ]

    insert_sql = (
        "INSERT INTO backup_codes (username, server_id, code_hash, used, created_at)\n"
        "VALUES (?, ?, ?, 0, ?)"
    )

    db = await get_db()
    try:
        # Drop the user's previous codes; the NULL branch is needed because
        # "server_id = NULL" never matches in SQL.
        await db.execute(
            "DELETE FROM backup_codes WHERE UPPER(username) = ? AND (server_id = ? OR (server_id IS NULL AND ? IS NULL))",
            (username_upper, server_id, server_id)
        )
        for code in codes:
            await db.execute(
                insert_sql,
                (username_upper, server_id, _hash_code(code), now)
            )
        # Delete and inserts are committed together.
        await db.commit()
    finally:
        await db.close()

    logger.info(f"[BACKUP_CODES] Generated {len(codes)} codes for '{username}' (server={server_id})")
    return codes
async def verify_backup_code(username: str, server_id: Optional[str], code: str) -> bool:
    """Check a backup code and consume it so it cannot be accepted again.

    Args:
        username: Account name; matched case-insensitively.
        server_id: Server the code belongs to. ``None`` matches rows whose
            ``server_id`` is NULL.
        code: Code as typed by the user; it is hashed via ``_hash_code``
            before the lookup.

    Returns:
        True if an unused matching code was found and this call marked it as
        used; False if no unused match exists or another request consumed the
        same code first.
    """
    from shared.database.app_db import get_db

    username_upper = username.upper()
    code_hash = _hash_code(code)

    select_sql = (
        "SELECT id FROM backup_codes\n"
        "WHERE UPPER(username) = ? AND code_hash = ? AND used = 0\n"
        "AND (server_id = ? OR (server_id IS NULL AND ? IS NULL))"
    )

    db = await get_db()
    try:
        cursor = await db.execute(
            select_sql,
            (username_upper, code_hash, server_id, server_id)
        )
        row = await cursor.fetchone()
        if not row:
            logger.warning(f"[BACKUP_CODES] Invalid or used code for '{username}'")
            return False

        # Repeat the "used = 0" condition in the UPDATE itself: a concurrent
        # request may have passed the SELECT above with the same code, and
        # only the request whose UPDATE actually changes the row may succeed.
        # NOTE(review): relies on the cursor exposing DB-API ``rowcount``
        # (as sqlite3/aiosqlite cursors do) - confirm for get_db()'s backend.
        update_cursor = await db.execute(
            "UPDATE backup_codes SET used = 1, used_at = ? WHERE id = ? AND used = 0",
            (datetime.now().isoformat(), row["id"])
        )
        consumed = update_cursor.rowcount == 1
        await db.commit()
    finally:
        await db.close()

    if not consumed:
        # Lost the race: the code was consumed between our SELECT and UPDATE.
        logger.warning(f"[BACKUP_CODES] Invalid or used code for '{username}'")
        return False

    logger.info(f"[BACKUP_CODES] Code used for '{username}' (server={server_id})")
    return True
async def has_backup_codes(username: str, server_id: Optional[str]) -> bool:
    """Report whether the user still has at least one unused backup code."""
    return (await get_remaining_count(username, server_id)) > 0
async def get_remaining_count(username: str, server_id: Optional[str]) -> int:
    """Return how many unused backup codes are stored for the user.

    The username is matched case-insensitively; a ``server_id`` of ``None``
    matches rows whose ``server_id`` is NULL.
    """
    from shared.database.app_db import get_db

    count_sql = (
        "SELECT COUNT(*) as cnt FROM backup_codes\n"
        "WHERE UPPER(username) = ? AND used = 0\n"
        "AND (server_id = ? OR (server_id IS NULL AND ? IS NULL))"
    )
    params = (username.upper(), server_id, server_id)

    conn = await get_db()
    try:
        result = await conn.execute(count_sql, params)
        record = await result.fetchone()
        # Read the value while the connection is still open.
        if record:
            remaining = record["cnt"]
        else:
            remaining = 0
    finally:
        await conn.close()
    return remaining