""" Oracle Database Connection Pool - Multi-Server Support for ROA2WEB Uses ORACLE_SERVERS from .env for server configuration. Pool-uri sunt create lazy (la prima conexiune pe fiecare server) pentru optimizare. """ import asyncio import oracledb from contextlib import asynccontextmanager from typing import Optional, Dict, Any import logging logger = logging.getLogger(__name__) class OracleMultiPool: """ Multi-tenant Oracle connection pool manager. Supports: - Multiple Oracle servers with separate pools: {server_id: pool} - Lazy pool creation (created on first connection) - First registered server used when no server_id specified - Graceful shutdown of all pools """ _instance: Optional['OracleMultiPool'] = None _pools: Dict[str, oracledb.ConnectionPool] _pool_configs: Dict[str, Dict[str, Any]] _pool_lock: asyncio.Lock _initialized: bool def __new__(cls): if cls._instance is None: cls._instance = super(OracleMultiPool, cls).__new__(cls) cls._instance._pools = {} cls._instance._pool_configs = {} cls._instance._pool_lock = asyncio.Lock() cls._instance._initialized = False return cls._instance async def initialize(self): """ Initialize pool manager. Call this after registering servers with register_server(). Pools are created lazily on first connection. """ if self._initialized: logger.debug("Pool manager already initialized") return self._initialized = True logger.info("Oracle pool manager initialized") def register_server( self, server_id: str, host: str, port: int, user: str, password: str, sid: Optional[str] = None, service_name: Optional[str] = None, min_connections: int = 2, max_connections: int = 10, session_callback=None, **kwargs ) -> None: """ Register a server configuration for lazy pool creation. Pool will be created on first get_connection(server_id) call. Args: session_callback: Optional callable invoked on each new connection acquired from the pool. Useful for ALTER SESSION SET CURRENT_SCHEMA. Signature: callback(connection, requested_tag) """ self._pool_configs[server_id] = { 'host': host, 'port': port, 'user': user, 'password': password, 'sid': sid, 'service_name': service_name, 'min_connections': min_connections, 'max_connections': max_connections, 'session_callback': session_callback, } logger.info(f"Registered server '{server_id}' ({host}:{port}) for lazy pool creation") async def _get_or_create_pool(self, server_id: str) -> oracledb.ConnectionPool: """ Get existing pool or create new one (lazy loading). Thread-safe: uses asyncio.Lock to prevent duplicate pool creation. """ # Fast path: pool already exists if server_id in self._pools: return self._pools[server_id] # Slow path: need to create pool async with self._pool_lock: # Double-check after acquiring lock if server_id in self._pools: return self._pools[server_id] # Check if server is registered if server_id not in self._pool_configs: raise ValueError(f"Server '{server_id}' not registered. Call register_server() first.") config = self._pool_configs[server_id] logger.info(f"Creating pool for server '{server_id}' (lazy initialization)...") pool_params = { 'user': config['user'], 'password': config['password'], 'host': config['host'], 'port': config['port'], 'min': config['min_connections'], 'max': config['max_connections'], 'increment': 1, 'getmode': oracledb.POOL_GETMODE_WAIT } if config.get('service_name'): pool_params['service_name'] = config['service_name'] elif config.get('sid'): pool_params['sid'] = config['sid'] else: pool_params['service_name'] = 'ROA' if config.get('session_callback'): pool_params['session_callback'] = config['session_callback'] pool = oracledb.create_pool(**pool_params) self._pools[server_id] = pool logger.info(f"Pool created for server '{server_id}' with {pool.opened} connections") return pool def _get_first_server_id(self) -> str: """Get the first registered server ID.""" if not self._pool_configs: raise RuntimeError("No servers registered. Call register_server() first.") return next(iter(self._pool_configs)) @asynccontextmanager async def get_connection(self, server_id: Optional[str] = None): """ Context manager pentru obținerea unei conexiuni din pool. Args: server_id: ID-ul serverului. Dacă None, folosește primul server înregistrat. Usage: # Explicit server async with oracle_pool.get_connection('romfast') as conn: ... # First registered server (when only one server configured) async with oracle_pool.get_connection() as conn: ... """ connection = None pool = None try: if server_id is None: # Use first registered server server_id = self._get_first_server_id() logger.debug(f"No server_id specified, using first registered: '{server_id}'") pool = await self._get_or_create_pool(server_id) connection = pool.acquire() logger.debug(f"Connection acquired from pool (server_id={server_id})") yield connection finally: if connection is not None: connection.close() logger.debug(f"Connection returned to pool (server_id={server_id})") async def execute_query(self, query: str, parameters=None, server_id: Optional[str] = None): """ Execute a SQL query and return all results. Args: query: SQL query string parameters: Query parameters (dict or tuple) server_id: Server ID (optional, uses first server if not specified) """ async with self.get_connection(server_id) as connection: logger.debug(f"Executing query on server '{server_id}': {query[:100]}...") with connection.cursor() as cursor: if parameters: cursor.execute(query, parameters) else: cursor.execute(query) # Check if this is a SELECT statement if query.strip().upper().startswith('SELECT') or query.strip().upper().startswith('WITH'): return cursor.fetchall() else: # For DML statements, return affected row count connection.commit() return cursor.rowcount async def close_pool(self, server_id: Optional[str] = None): """ Close a specific pool or all pools. Args: server_id: Close specific pool. If None, close all pools. """ if server_id is not None: # Close specific pool if server_id in self._pools: self._pools[server_id].close() del self._pools[server_id] logger.info(f"Closed pool for server '{server_id}'") else: # Close all pools (graceful shutdown) for srv_id, pool in list(self._pools.items()): pool.close() logger.info(f"Closed pool for server '{srv_id}'") self._pools.clear() self._initialized = False logger.info("All Oracle pools closed") def get_pool_stats(self, server_id: Optional[str] = None) -> Dict[str, Any]: """ Get statistics for pool(s). Args: server_id: Get stats for specific server. If None, get all stats. Returns: Dict with pool statistics (opened, busy, min, max connections) """ stats = {} if server_id is not None: pool = self._pools.get(server_id) if pool: stats[server_id] = { 'opened': pool.opened, 'busy': pool.busy, 'min': pool.min, 'max': pool.max, } else: for srv_id, pool in self._pools.items(): stats[srv_id] = { 'opened': pool.opened, 'busy': pool.busy, 'min': pool.min, 'max': pool.max, } return stats async def get_pool(self, server_id: Optional[str] = None) -> oracledb.ConnectionPool: """Return the underlying pool for server_id, creating it lazily if needed.""" if server_id is None: server_id = self._get_first_server_id() return await self._get_or_create_pool(server_id) def is_server_registered(self, server_id: str) -> bool: """Check if a server is registered (config exists).""" return server_id in self._pool_configs def is_pool_active(self, server_id: str) -> bool: """Check if a pool is active (created) for a server.""" return server_id in self._pools def get_registered_servers(self) -> list: """Get list of registered server IDs.""" return list(self._pool_configs.keys()) def get_active_pools(self) -> list: """Get list of server IDs with active pools.""" return list(self._pools.keys()) # Backward compatibility alias OraclePool = OracleMultiPool # Global instance oracle_pool = OracleMultiPool()