""" SQLite Database Setup for Telegram Bot This module handles database connection, initialization, and schema creation. Uses aiosqlite for async SQLite operations. """ import aiosqlite import logging from pathlib import Path from datetime import datetime, timedelta from typing import Optional logger = logging.getLogger(__name__) # Database file location DB_DIR = Path(__file__).parent.parent.parent / "data" DB_PATH = DB_DIR / "telegram_bot.db" async def get_db_connection() -> aiosqlite.Connection: """ Get a database connection. Returns: aiosqlite.Connection: Database connection """ conn = await aiosqlite.connect(DB_PATH) conn.row_factory = aiosqlite.Row # Enable column access by name return conn async def init_database() -> None: """ Initialize the database and create all tables. Safe to call multiple times - only creates tables if they don't exist. """ try: # Ensure data directory exists DB_DIR.mkdir(parents=True, exist_ok=True) logger.info(f"Database directory: {DB_DIR}") async with aiosqlite.connect(DB_PATH) as db: # Enable foreign keys await db.execute("PRAGMA foreign_keys = ON") # Create telegram_users table await db.execute(""" CREATE TABLE IF NOT EXISTS telegram_users ( telegram_user_id INTEGER PRIMARY KEY, username TEXT, first_name TEXT NOT NULL, last_name TEXT, oracle_username TEXT, jwt_token TEXT, jwt_refresh_token TEXT, token_expires_at TIMESTAMP, linked_at TIMESTAMP, last_active_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT 1 ) """) # Create telegram_auth_codes table await db.execute(""" CREATE TABLE IF NOT EXISTS telegram_auth_codes ( code TEXT PRIMARY KEY, telegram_user_id INTEGER, oracle_username TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP NOT NULL, used BOOLEAN DEFAULT 0, used_at TIMESTAMP, FOREIGN KEY (telegram_user_id) REFERENCES telegram_users(telegram_user_id) ) """) # Create telegram_sessions table await db.execute(""" CREATE TABLE IF NOT EXISTS telegram_sessions ( session_id TEXT PRIMARY KEY, telegram_user_id INTEGER NOT NULL, conversation_state TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP NOT NULL, FOREIGN KEY (telegram_user_id) REFERENCES telegram_users(telegram_user_id) ) """) # Create indexes for better query performance await db.execute(""" CREATE INDEX IF NOT EXISTS idx_auth_codes_telegram_user ON telegram_auth_codes(telegram_user_id) """) await db.execute(""" CREATE INDEX IF NOT EXISTS idx_auth_codes_expires ON telegram_auth_codes(expires_at) """) await db.execute(""" CREATE INDEX IF NOT EXISTS idx_sessions_telegram_user ON telegram_sessions(telegram_user_id) """) await db.execute(""" CREATE INDEX IF NOT EXISTS idx_sessions_expires ON telegram_sessions(expires_at) """) await db.commit() logger.info("Database initialized successfully") # Log table info cursor = await db.execute(""" SELECT name FROM sqlite_master WHERE type='table' ORDER BY name """) tables = await cursor.fetchall() logger.info(f"Existing tables: {[t[0] for t in tables]}") except Exception as e: logger.error(f"Failed to initialize database: {e}") raise async def cleanup_expired_codes() -> int: """ Delete expired authentication codes from the database. This should be called periodically (e.g., every hour). Returns: int: Number of expired codes deleted """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" DELETE FROM telegram_auth_codes WHERE expires_at < ? """, (datetime.now(),)) await db.commit() deleted = cursor.rowcount if deleted > 0: logger.info(f"Cleaned up {deleted} expired auth codes") return deleted except Exception as e: logger.error(f"Failed to cleanup expired codes: {e}") return 0 async def cleanup_expired_sessions() -> int: """ Delete expired sessions from the database. This should be called periodically (e.g., daily). Returns: int: Number of expired sessions deleted """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" DELETE FROM telegram_sessions WHERE expires_at < ? """, (datetime.now(),)) await db.commit() deleted = cursor.rowcount if deleted > 0: logger.info(f"Cleaned up {deleted} expired sessions") return deleted except Exception as e: logger.error(f"Failed to cleanup expired sessions: {e}") return 0 async def get_database_stats() -> dict: """ Get database statistics for monitoring. Returns: dict: Database statistics """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row stats = {} # Count users cursor = await db.execute("SELECT COUNT(*) FROM telegram_users") stats['total_users'] = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(*) FROM telegram_users WHERE is_active = 1" ) stats['active_users'] = (await cursor.fetchone())[0] # Count pending codes cursor = await db.execute(""" SELECT COUNT(*) FROM telegram_auth_codes WHERE used = 0 AND expires_at > ? """, (datetime.now(),)) stats['pending_codes'] = (await cursor.fetchone())[0] # Count active sessions cursor = await db.execute(""" SELECT COUNT(*) FROM telegram_sessions WHERE expires_at > ? """, (datetime.now(),)) stats['active_sessions'] = (await cursor.fetchone())[0] # Database file size if DB_PATH.exists(): stats['db_size_mb'] = DB_PATH.stat().st_size / (1024 * 1024) else: stats['db_size_mb'] = 0 return stats except Exception as e: logger.error(f"Failed to get database stats: {e}") return {} # Export main functions __all__ = [ 'get_db_connection', 'init_database', 'cleanup_expired_codes', 'cleanup_expired_sessions', 'get_database_stats', 'DB_PATH', ]