""" Database Operations for Telegram Bot This module provides CRUD operations for: - telegram_users: Telegram user management and Oracle account linking - telegram_auth_codes: Authentication code management - telegram_sessions: Conversation session management """ import logging import uuid from datetime import datetime, timedelta from typing import Optional, Dict, Any, List import aiosqlite from .database import DB_PATH logger = logging.getLogger(__name__) # ============================================================================ # TELEGRAM USERS OPERATIONS # ============================================================================ async def create_or_update_user( telegram_user_id: int, username: Optional[str], first_name: str, last_name: Optional[str] ) -> bool: """ Create or update a Telegram user record. Args: telegram_user_id: Telegram user ID username: Telegram username (without @) first_name: User's first name last_name: User's last name Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO telegram_users ( telegram_user_id, username, first_name, last_name, last_active_at ) VALUES (?, ?, ?, ?, ?) ON CONFLICT(telegram_user_id) DO UPDATE SET username = excluded.username, first_name = excluded.first_name, last_name = excluded.last_name, last_active_at = excluded.last_active_at """, (telegram_user_id, username, first_name, last_name, datetime.now())) await db.commit() logger.info(f"User {telegram_user_id} created/updated") return True except Exception as e: logger.error(f"Failed to create/update user {telegram_user_id}: {e}") return False async def get_user(telegram_user_id: int) -> Optional[Dict[str, Any]]: """ Get user information by Telegram user ID. Args: telegram_user_id: Telegram user ID Returns: Optional[Dict]: User data or None if not found """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_users WHERE telegram_user_id = ? """, (telegram_user_id,)) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get user {telegram_user_id}: {e}") return None async def link_user_to_oracle( telegram_user_id: int, oracle_username: str, jwt_token: str, jwt_refresh_token: str, token_expires_at: datetime, server_id: Optional[str] = None ) -> bool: """ Link a Telegram user to an Oracle account and save JWT tokens. Args: telegram_user_id: Telegram user ID oracle_username: Oracle username jwt_token: JWT access token jwt_refresh_token: JWT refresh token token_expires_at: Token expiration timestamp server_id: Oracle server ID for multi-server mode Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_users SET oracle_username = ?, oracle_server_id = ?, jwt_token = ?, jwt_refresh_token = ?, token_expires_at = ?, linked_at = ?, is_active = 1 WHERE telegram_user_id = ? """, ( oracle_username, server_id, jwt_token, jwt_refresh_token, token_expires_at, datetime.now(), telegram_user_id )) await db.commit() logger.info(f"User {telegram_user_id} linked to Oracle user {oracle_username} (server_id={server_id})") return True except Exception as e: logger.error(f"Failed to link user {telegram_user_id}: {e}") return False async def update_user_tokens( telegram_user_id: int, jwt_token: str, jwt_refresh_token: str, token_expires_at: datetime ) -> bool: """ Update JWT tokens for a user. Args: telegram_user_id: Telegram user ID jwt_token: New JWT access token jwt_refresh_token: New JWT refresh token token_expires_at: New token expiration timestamp Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_users SET jwt_token = ?, jwt_refresh_token = ?, token_expires_at = ? WHERE telegram_user_id = ? """, (jwt_token, jwt_refresh_token, token_expires_at, telegram_user_id)) await db.commit() logger.info(f"Tokens updated for user {telegram_user_id}") return True except Exception as e: logger.error(f"Failed to update tokens for user {telegram_user_id}: {e}") return False async def update_user_last_active(telegram_user_id: int) -> bool: """ Update the last active timestamp for a user. Args: telegram_user_id: Telegram user ID Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_users SET last_active_at = ? WHERE telegram_user_id = ? """, (datetime.now(), telegram_user_id)) await db.commit() return True except Exception as e: logger.error(f"Failed to update last active for user {telegram_user_id}: {e}") return False async def is_user_linked(telegram_user_id: int) -> bool: """ Check if a user is linked to an Oracle account. Args: telegram_user_id: Telegram user ID Returns: bool: True if user is linked """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT oracle_username FROM telegram_users WHERE telegram_user_id = ? AND oracle_username IS NOT NULL """, (telegram_user_id,)) row = await cursor.fetchone() return row is not None except Exception as e: logger.error(f"Failed to check if user {telegram_user_id} is linked: {e}") return False async def is_user_authenticated(telegram_user_id: int) -> bool: """ Check if a user is authenticated (linked and has valid token). Args: telegram_user_id: Telegram user ID Returns: bool: True if user is authenticated """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT oracle_username, jwt_token, token_expires_at FROM telegram_users WHERE telegram_user_id = ? AND oracle_username IS NOT NULL AND jwt_token IS NOT NULL """, (telegram_user_id,)) row = await cursor.fetchone() if not row: return False # Check if token is expired (with some buffer) if row[2]: # token_expires_at expires_at = datetime.fromisoformat(row[2]) # Token should have at least 5 minutes remaining if expires_at < datetime.now() + timedelta(minutes=5): return False return True except Exception as e: logger.error(f"Failed to check if user {telegram_user_id} is authenticated: {e}") return False # ============================================================================ # AUTHENTICATION CODES OPERATIONS # ============================================================================ async def create_auth_code( code: str, telegram_user_id: int, oracle_username: str, expires_in_minutes: int = 5, server_id: Optional[str] = None ) -> bool: """ Create a new authentication code for linking. Args: code: 8-character authentication code telegram_user_id: Telegram user ID oracle_username: Oracle username to link expires_in_minutes: Code expiration time in minutes (default: 5) server_id: Oracle server ID (for multi-server mode) Returns: bool: True if successful """ try: expires_at = datetime.now() + timedelta(minutes=expires_in_minutes) async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO telegram_auth_codes ( code, telegram_user_id, oracle_username, expires_at, server_id ) VALUES (?, ?, ?, ?, ?) """, (code, telegram_user_id, oracle_username, expires_at, server_id)) await db.commit() logger.info(f"Auth code created for user {telegram_user_id} (server_id={server_id})") return True except Exception as e: logger.error(f"Failed to create auth code: {e}") return False async def get_auth_code(code: str) -> Optional[Dict[str, Any]]: """ Get authentication code information. Args: code: 8-character authentication code Returns: Optional[Dict]: Code data or None if not found """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_auth_codes WHERE code = ? """, (code,)) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get auth code: {e}") return None async def verify_and_use_auth_code(code: str) -> Optional[Dict[str, Any]]: """ Verify an authentication code and mark it as used. Args: code: 8-character authentication code Returns: Optional[Dict]: Code data if valid, None if invalid/expired """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row # Check if code exists, is not used, and not expired cursor = await db.execute(""" SELECT * FROM telegram_auth_codes WHERE code = ? AND used = 0 AND expires_at > ? """, (code, datetime.now())) row = await cursor.fetchone() if not row: logger.warning(f"Invalid or expired code: {code}") return None # Mark code as used await db.execute(""" UPDATE telegram_auth_codes SET used = 1, used_at = ? WHERE code = ? """, (datetime.now(), code)) await db.commit() logger.info(f"Auth code {code} verified and used") return dict(row) except Exception as e: logger.error(f"Failed to verify auth code: {e}") return None async def get_pending_codes_for_user(telegram_user_id: int) -> List[Dict[str, Any]]: """ Get all pending (unused, non-expired) codes for a user. Args: telegram_user_id: Telegram user ID Returns: List[Dict]: List of pending codes """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_auth_codes WHERE telegram_user_id = ? AND used = 0 AND expires_at > ? ORDER BY created_at DESC """, (telegram_user_id, datetime.now())) rows = await cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f"Failed to get pending codes for user {telegram_user_id}: {e}") return [] # ============================================================================ # EMAIL AUTHENTICATION CODES OPERATIONS # ============================================================================ async def get_pending_email_code( telegram_user_id: int ) -> Optional[Dict]: """ Get pending (non-expired, non-used) email code for user Returns: Code data dict or None if no pending code """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT code, email, oracle_username, expires_at, failed_attempts FROM email_auth_codes WHERE telegram_user_id = ? AND used = 0 AND expires_at > ? ORDER BY created_at DESC LIMIT 1 """, (telegram_user_id, datetime.now())) row = await cursor.fetchone() if row: return { 'code': row[0], 'email': row[1], 'oracle_username': row[2], 'expires_at': datetime.fromisoformat(row[3]), 'failed_attempts': row[4] } return None except Exception as e: logger.error(f"Failed to get pending email code: {e}") return None async def create_email_auth_code( code: str, email: str, username: str, telegram_user_id: int, expiry_minutes: int = 5 ) -> bool: """ Create new email authentication code NOTE: Caller should check for existing pending codes first """ expires_at = datetime.now() + timedelta(minutes=expiry_minutes) try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO email_auth_codes (code, email, oracle_username, telegram_user_id, expires_at) VALUES (?, ?, ?, ?, ?) """, (code, email, username, telegram_user_id, expires_at)) await db.commit() logger.info( f"Email auth code created for user {telegram_user_id}, " f"expires at {expires_at.isoformat()}" ) return True except Exception as e: logger.error(f"Error creating email auth code: {e}", exc_info=True) return False async def get_email_auth_code(code: str) -> Optional[Dict]: """Get email auth code details""" try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT code, email, oracle_username, telegram_user_id, created_at, expires_at, used, used_at, failed_attempts FROM email_auth_codes WHERE code = ? """, (code,)) row = await cursor.fetchone() if not row: return None return { 'code': row[0], 'email': row[1], 'oracle_username': row[2], 'telegram_user_id': row[3], 'created_at': datetime.fromisoformat(row[4]), 'expires_at': datetime.fromisoformat(row[5]), 'used': bool(row[6]), 'used_at': datetime.fromisoformat(row[7]) if row[7] else None, 'failed_attempts': row[8] } except Exception as e: logger.error(f"Failed to get email auth code: {e}") return None async def increment_failed_attempts(code: str) -> bool: """Increment failed validation attempts for code""" try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" UPDATE email_auth_codes SET failed_attempts = failed_attempts + 1 WHERE code = ? """, (code,)) await db.commit() return True except Exception as e: logger.error(f"Error incrementing failed attempts: {e}") return False async def mark_email_code_used(code: str) -> bool: """Mark email code as used""" try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" UPDATE email_auth_codes SET used = 1, used_at = ? WHERE code = ? """, (datetime.now(), code)) await db.commit() logger.info(f"Email auth code marked as used: {code}") return True except Exception as e: logger.error(f"Error marking email code as used: {e}") return False async def delete_user_email_codes(telegram_user_id: int) -> int: """Delete all email codes for user (cleanup)""" try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" DELETE FROM email_auth_codes WHERE telegram_user_id = ? """, (telegram_user_id,)) await db.commit() deleted = cursor.rowcount logger.info(f"Deleted {deleted} email codes for user {telegram_user_id}") return deleted except Exception as e: logger.error(f"Error deleting user email codes: {e}") return 0 # ============================================================================ # SESSION OPERATIONS # ============================================================================ async def create_session( telegram_user_id: int, conversation_state: Optional[str] = None, expires_in_hours: int = 24 ) -> Optional[str]: """ Create a new conversation session. Args: telegram_user_id: Telegram user ID conversation_state: JSON string of conversation state expires_in_hours: Session expiration time in hours (default: 24) Returns: Optional[str]: Session ID if successful, None otherwise """ try: session_id = str(uuid.uuid4()) expires_at = datetime.now() + timedelta(hours=expires_in_hours) async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO telegram_sessions ( session_id, telegram_user_id, conversation_state, expires_at ) VALUES (?, ?, ?, ?) """, (session_id, telegram_user_id, conversation_state, expires_at)) await db.commit() logger.info(f"Session {session_id} created for user {telegram_user_id}") return session_id except Exception as e: logger.error(f"Failed to create session: {e}") return None async def get_session(session_id: str) -> Optional[Dict[str, Any]]: """ Get session information. Args: session_id: Session UUID Returns: Optional[Dict]: Session data or None if not found/expired """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_sessions WHERE session_id = ? AND expires_at > ? """, (session_id, datetime.now())) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get session {session_id}: {e}") return None async def get_user_active_session(telegram_user_id: int) -> Optional[Dict[str, Any]]: """ Get the most recent active session for a user. Args: telegram_user_id: Telegram user ID Returns: Optional[Dict]: Session data or None if no active session """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_sessions WHERE telegram_user_id = ? AND expires_at > ? ORDER BY updated_at DESC LIMIT 1 """, (telegram_user_id, datetime.now())) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get active session for user {telegram_user_id}: {e}") return None async def update_session_state( session_id: str, conversation_state: str ) -> bool: """ Update the conversation state for a session. Args: session_id: Session UUID conversation_state: JSON string of conversation state Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_sessions SET conversation_state = ?, updated_at = ? WHERE session_id = ? """, (conversation_state, datetime.now(), session_id)) await db.commit() logger.info(f"Session {session_id} state updated") return True except Exception as e: logger.error(f"Failed to update session {session_id}: {e}") return False async def delete_session(session_id: str) -> bool: """ Delete a session. Args: session_id: Session UUID Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row await db.execute(""" DELETE FROM telegram_sessions WHERE session_id = ? """, (session_id,)) await db.commit() logger.info(f"Session {session_id} deleted") return True except Exception as e: logger.error(f"Failed to delete session {session_id}: {e}") return False async def delete_user_sessions(telegram_user_id: int) -> bool: """ Delete all sessions for a user. Args: telegram_user_id: Telegram user ID Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA busy_timeout=5000") db.row_factory = aiosqlite.Row cursor = await db.execute(""" DELETE FROM telegram_sessions WHERE telegram_user_id = ? """, (telegram_user_id,)) await db.commit() deleted = cursor.rowcount logger.info(f"Deleted {deleted} sessions for user {telegram_user_id}") return True except Exception as e: logger.error(f"Failed to delete sessions for user {telegram_user_id}: {e}") return False # Export all functions __all__ = [ # User operations 'create_or_update_user', 'get_user', 'link_user_to_oracle', 'update_user_tokens', 'update_user_last_active', 'is_user_linked', 'is_user_authenticated', # Auth code operations 'create_auth_code', 'get_auth_code', 'verify_and_use_auth_code', 'get_pending_codes_for_user', # Email auth code operations 'get_pending_email_code', 'create_email_auth_code', 'get_email_auth_code', 'increment_failed_attempts', 'mark_email_code_used', 'delete_user_email_codes', # Session operations 'create_session', 'get_session', 'get_user_active_session', 'update_session_state', 'delete_session', 'delete_user_sessions', ]