"""Backup Codes Service - fallback 2FA când emailul nu sosește.""" import hashlib import logging import secrets import string from datetime import datetime, timedelta from typing import Optional logger = logging.getLogger(__name__) BACKUP_CODE_COUNT = 10 BACKUP_CODE_EXPIRE_DAYS = 365 # 1 an # Alphabet for backup codes: uppercase letters + digits (no ambiguous chars like 0/O, 1/I/L) _ALPHABET = "ABCDEFGHJKMNPQRSTUVWXYZ23456789" def _hash_code(code: str) -> str: return hashlib.sha256(code.upper().strip().encode("utf-8")).hexdigest() async def generate_backup_codes(username: str, server_id: Optional[str]) -> list[str]: """Generează 10 coduri de backup, stochează hash-uri în DB, returnează codurile plain.""" from shared.database.app_db import get_db username_upper = username.upper() expires_at = (datetime.now() + timedelta(days=BACKUP_CODE_EXPIRE_DAYS)).isoformat() now = datetime.now().isoformat() db = await get_db() try: # Șterge codurile vechi ale userului await db.execute( "DELETE FROM backup_codes WHERE UPPER(username) = ? AND (server_id = ? OR (server_id IS NULL AND ? IS NULL))", (username_upper, server_id, server_id) ) # Generează 10 coduri noi (8 chars uppercase alfanumeric) codes: list[str] = [] for _ in range(BACKUP_CODE_COUNT): code = "".join(secrets.choice(_ALPHABET) for _ in range(8)) codes.append(code) code_hash = _hash_code(code) await db.execute( """INSERT INTO backup_codes (username, server_id, code_hash, used, created_at) VALUES (?, ?, ?, 0, ?)""", (username_upper, server_id, code_hash, now) ) await db.commit() finally: await db.close() logger.info(f"[BACKUP_CODES] Generated {len(codes)} codes for '{username}' (server={server_id})") return codes async def verify_backup_code(username: str, server_id: Optional[str], code: str) -> bool: """Verifică și marchează codul ca folosit. False dacă invalid/deja folosit.""" from shared.database.app_db import get_db username_upper = username.upper() code_hash = _hash_code(code) db = await get_db() try: cursor = await db.execute( """SELECT id FROM backup_codes WHERE UPPER(username) = ? AND code_hash = ? AND used = 0 AND (server_id = ? OR (server_id IS NULL AND ? IS NULL))""", (username_upper, code_hash, server_id, server_id) ) row = await cursor.fetchone() if not row: logger.warning(f"[BACKUP_CODES] Invalid or used code for '{username}'") return False # Marchează ca folosit await db.execute( "UPDATE backup_codes SET used = 1, used_at = ? WHERE id = ?", (datetime.now().isoformat(), row["id"]) ) await db.commit() finally: await db.close() logger.info(f"[BACKUP_CODES] Code used for '{username}' (server={server_id})") return True async def has_backup_codes(username: str, server_id: Optional[str]) -> bool: """Verifică dacă userul are coduri de backup active (nefolosite).""" count = await get_remaining_count(username, server_id) return count > 0 async def get_remaining_count(username: str, server_id: Optional[str]) -> int: """Numără codurile nefolosite.""" from shared.database.app_db import get_db username_upper = username.upper() db = await get_db() try: cursor = await db.execute( """SELECT COUNT(*) as cnt FROM backup_codes WHERE UPPER(username) = ? AND used = 0 AND (server_id = ? OR (server_id IS NULL AND ? IS NULL))""", (username_upper, server_id, server_id) ) row = await cursor.fetchone() return row["cnt"] if row else 0 finally: await db.close()