""" Database Operations for Telegram Bot This module provides CRUD operations for: - telegram_users: Telegram user management and Oracle account linking - telegram_auth_codes: Authentication code management - telegram_sessions: Conversation session management """ import logging import uuid from datetime import datetime, timedelta from typing import Optional, Dict, Any, List import aiosqlite from .database import DB_PATH logger = logging.getLogger(__name__) # ============================================================================ # TELEGRAM USERS OPERATIONS # ============================================================================ async def create_or_update_user( telegram_user_id: int, username: Optional[str], first_name: str, last_name: Optional[str] ) -> bool: """ Create or update a Telegram user record. Args: telegram_user_id: Telegram user ID username: Telegram username (without @) first_name: User's first name last_name: User's last name Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO telegram_users ( telegram_user_id, username, first_name, last_name, last_active_at ) VALUES (?, ?, ?, ?, ?) ON CONFLICT(telegram_user_id) DO UPDATE SET username = excluded.username, first_name = excluded.first_name, last_name = excluded.last_name, last_active_at = excluded.last_active_at """, (telegram_user_id, username, first_name, last_name, datetime.now())) await db.commit() logger.info(f"User {telegram_user_id} created/updated") return True except Exception as e: logger.error(f"Failed to create/update user {telegram_user_id}: {e}") return False async def get_user(telegram_user_id: int) -> Optional[Dict[str, Any]]: """ Get user information by Telegram user ID. Args: telegram_user_id: Telegram user ID Returns: Optional[Dict]: User data or None if not found """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_users WHERE telegram_user_id = ? """, (telegram_user_id,)) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get user {telegram_user_id}: {e}") return None async def link_user_to_oracle( telegram_user_id: int, oracle_username: str, jwt_token: str, jwt_refresh_token: str, token_expires_at: datetime ) -> bool: """ Link a Telegram user to an Oracle account and save JWT tokens. Args: telegram_user_id: Telegram user ID oracle_username: Oracle username jwt_token: JWT access token jwt_refresh_token: JWT refresh token token_expires_at: Token expiration timestamp Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_users SET oracle_username = ?, jwt_token = ?, jwt_refresh_token = ?, token_expires_at = ?, linked_at = ?, is_active = 1 WHERE telegram_user_id = ? """, ( oracle_username, jwt_token, jwt_refresh_token, token_expires_at, datetime.now(), telegram_user_id )) await db.commit() logger.info(f"User {telegram_user_id} linked to Oracle user {oracle_username}") return True except Exception as e: logger.error(f"Failed to link user {telegram_user_id}: {e}") return False async def update_user_tokens( telegram_user_id: int, jwt_token: str, jwt_refresh_token: str, token_expires_at: datetime ) -> bool: """ Update JWT tokens for a user. Args: telegram_user_id: Telegram user ID jwt_token: New JWT access token jwt_refresh_token: New JWT refresh token token_expires_at: New token expiration timestamp Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_users SET jwt_token = ?, jwt_refresh_token = ?, token_expires_at = ? WHERE telegram_user_id = ? """, (jwt_token, jwt_refresh_token, token_expires_at, telegram_user_id)) await db.commit() logger.info(f"Tokens updated for user {telegram_user_id}") return True except Exception as e: logger.error(f"Failed to update tokens for user {telegram_user_id}: {e}") return False async def update_user_last_active(telegram_user_id: int) -> bool: """ Update the last active timestamp for a user. Args: telegram_user_id: Telegram user ID Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_users SET last_active_at = ? WHERE telegram_user_id = ? """, (datetime.now(), telegram_user_id)) await db.commit() return True except Exception as e: logger.error(f"Failed to update last active for user {telegram_user_id}: {e}") return False async def is_user_linked(telegram_user_id: int) -> bool: """ Check if a user is linked to an Oracle account. Args: telegram_user_id: Telegram user ID Returns: bool: True if user is linked """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT oracle_username FROM telegram_users WHERE telegram_user_id = ? AND oracle_username IS NOT NULL """, (telegram_user_id,)) row = await cursor.fetchone() return row is not None except Exception as e: logger.error(f"Failed to check if user {telegram_user_id} is linked: {e}") return False # ============================================================================ # AUTHENTICATION CODES OPERATIONS # ============================================================================ async def create_auth_code( code: str, telegram_user_id: int, oracle_username: str, expires_in_minutes: int = 5 ) -> bool: """ Create a new authentication code for linking. Args: code: 8-character authentication code telegram_user_id: Telegram user ID oracle_username: Oracle username to link expires_in_minutes: Code expiration time in minutes (default: 5) Returns: bool: True if successful """ try: expires_at = datetime.now() + timedelta(minutes=expires_in_minutes) async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO telegram_auth_codes ( code, telegram_user_id, oracle_username, expires_at ) VALUES (?, ?, ?, ?) """, (code, telegram_user_id, oracle_username, expires_at)) await db.commit() logger.info(f"Auth code created for user {telegram_user_id}") return True except Exception as e: logger.error(f"Failed to create auth code: {e}") return False async def get_auth_code(code: str) -> Optional[Dict[str, Any]]: """ Get authentication code information. Args: code: 8-character authentication code Returns: Optional[Dict]: Code data or None if not found """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_auth_codes WHERE code = ? """, (code,)) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get auth code: {e}") return None async def verify_and_use_auth_code(code: str) -> Optional[Dict[str, Any]]: """ Verify an authentication code and mark it as used. Args: code: 8-character authentication code Returns: Optional[Dict]: Code data if valid, None if invalid/expired """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row # Check if code exists, is not used, and not expired cursor = await db.execute(""" SELECT * FROM telegram_auth_codes WHERE code = ? AND used = 0 AND expires_at > ? """, (code, datetime.now())) row = await cursor.fetchone() if not row: logger.warning(f"Invalid or expired code: {code}") return None # Mark code as used await db.execute(""" UPDATE telegram_auth_codes SET used = 1, used_at = ? WHERE code = ? """, (datetime.now(), code)) await db.commit() logger.info(f"Auth code {code} verified and used") return dict(row) except Exception as e: logger.error(f"Failed to verify auth code: {e}") return None async def get_pending_codes_for_user(telegram_user_id: int) -> List[Dict[str, Any]]: """ Get all pending (unused, non-expired) codes for a user. Args: telegram_user_id: Telegram user ID Returns: List[Dict]: List of pending codes """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_auth_codes WHERE telegram_user_id = ? AND used = 0 AND expires_at > ? ORDER BY created_at DESC """, (telegram_user_id, datetime.now())) rows = await cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f"Failed to get pending codes for user {telegram_user_id}: {e}") return [] # ============================================================================ # SESSION OPERATIONS # ============================================================================ async def create_session( telegram_user_id: int, conversation_state: Optional[str] = None, expires_in_hours: int = 24 ) -> Optional[str]: """ Create a new conversation session. Args: telegram_user_id: Telegram user ID conversation_state: JSON string of conversation state expires_in_hours: Session expiration time in hours (default: 24) Returns: Optional[str]: Session ID if successful, None otherwise """ try: session_id = str(uuid.uuid4()) expires_at = datetime.now() + timedelta(hours=expires_in_hours) async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" INSERT INTO telegram_sessions ( session_id, telegram_user_id, conversation_state, expires_at ) VALUES (?, ?, ?, ?) """, (session_id, telegram_user_id, conversation_state, expires_at)) await db.commit() logger.info(f"Session {session_id} created for user {telegram_user_id}") return session_id except Exception as e: logger.error(f"Failed to create session: {e}") return None async def get_session(session_id: str) -> Optional[Dict[str, Any]]: """ Get session information. Args: session_id: Session UUID Returns: Optional[Dict]: Session data or None if not found/expired """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_sessions WHERE session_id = ? AND expires_at > ? """, (session_id, datetime.now())) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get session {session_id}: {e}") return None async def get_user_active_session(telegram_user_id: int) -> Optional[Dict[str, Any]]: """ Get the most recent active session for a user. Args: telegram_user_id: Telegram user ID Returns: Optional[Dict]: Session data or None if no active session """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM telegram_sessions WHERE telegram_user_id = ? AND expires_at > ? ORDER BY updated_at DESC LIMIT 1 """, (telegram_user_id, datetime.now())) row = await cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"Failed to get active session for user {telegram_user_id}: {e}") return None async def update_session_state( session_id: str, conversation_state: str ) -> bool: """ Update the conversation state for a session. Args: session_id: Session UUID conversation_state: JSON string of conversation state Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" UPDATE telegram_sessions SET conversation_state = ?, updated_at = ? WHERE session_id = ? """, (conversation_state, datetime.now(), session_id)) await db.commit() logger.info(f"Session {session_id} state updated") return True except Exception as e: logger.error(f"Failed to update session {session_id}: {e}") return False async def delete_session(session_id: str) -> bool: """ Delete a session. Args: session_id: Session UUID Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row await db.execute(""" DELETE FROM telegram_sessions WHERE session_id = ? """, (session_id,)) await db.commit() logger.info(f"Session {session_id} deleted") return True except Exception as e: logger.error(f"Failed to delete session {session_id}: {e}") return False async def delete_user_sessions(telegram_user_id: int) -> bool: """ Delete all sessions for a user. Args: telegram_user_id: Telegram user ID Returns: bool: True if successful """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" DELETE FROM telegram_sessions WHERE telegram_user_id = ? """, (telegram_user_id,)) await db.commit() deleted = cursor.rowcount logger.info(f"Deleted {deleted} sessions for user {telegram_user_id}") return True except Exception as e: logger.error(f"Failed to delete sessions for user {telegram_user_id}: {e}") return False # Export all functions __all__ = [ # User operations 'create_or_update_user', 'get_user', 'link_user_to_oracle', 'update_user_tokens', 'update_user_last_active', 'is_user_linked', # Auth code operations 'create_auth_code', 'get_auth_code', 'verify_and_use_auth_code', 'get_pending_codes_for_user', # Session operations 'create_session', 'get_session', 'get_user_active_session', 'update_session_state', 'delete_session', 'delete_user_sessions', ]