Files
roa2web-service-auto/shared/database/oracle_pool.py
Marius Mutu 4a886f0b64 feat(telegram): Unify Trezorerie button (Casa + Banca combined)
- Replace separate [Trezorerie Casa] and [Trezorerie Banca] buttons
  with single unified [Trezorerie] button in main menu
- Add format_treasury_combined_response() formatter showing:
  - Grand total (Sold Trezorerie)
  - Casa section with total + all accounts
  - Banca section with total + all accounts
- Compact menu layout: Row 2 [Sold Companie][Trezorerie],
  Row 3 [Sold Clienti][Sold Furnizori], Row 4 [Evolutie Incasari]
- Use Romanian number format (period as thousands separator)

Also includes:
- Oracle pool: Support both SERVICE_NAME and SID connections
  (ORACLE_SERVICE_NAME takes priority over ORACLE_SID)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 19:15:05 +02:00

136 lines
5.3 KiB
Python

"""
Oracle Database Connection Pool - Shared între toate aplicațiile ROA2WEB
Folosește oracledb cu connection pooling pentru performance optimă
"""
import oracledb
import os
from contextlib import asynccontextmanager
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class OraclePool:
"""
Singleton class pentru Oracle connection pool
Partajat între toate microservicele ROA2WEB
"""
_instance: Optional['OraclePool'] = None
_pool: Optional[oracledb.ConnectionPool] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(OraclePool, cls).__new__(cls)
return cls._instance
async def initialize(self, **config):
"""Inițializează pool-ul de conexiuni"""
if self._pool is None:
# Check if we have DSN or individual parameters
dsn = config.get('dsn', os.getenv('ORACLE_DSN'))
if dsn:
# Use DSN connection
self._pool = oracledb.create_pool(
user=config.get('user', os.getenv('ORACLE_USER')),
password=config.get('password', os.getenv('ORACLE_PASSWORD')),
dsn=dsn,
min=config.get('min_connections', 2),
max=config.get('max_connections', 10),
increment=config.get('increment', 1),
getmode=oracledb.POOL_GETMODE_WAIT
)
else:
# Use individual parameters (host, port, service_name or sid)
# Prefer SERVICE_NAME over SID (more modern Oracle approach)
service_name = config.get('service_name', os.getenv('ORACLE_SERVICE_NAME'))
sid = config.get('sid', os.getenv('ORACLE_SID'))
pool_params = {
'user': config.get('user', os.getenv('ORACLE_USER')),
'password': config.get('password', os.getenv('ORACLE_PASSWORD')),
'host': config.get('host', os.getenv('ORACLE_HOST', 'localhost')),
'port': config.get('port', int(os.getenv('ORACLE_PORT', '1526'))),
'min': config.get('min_connections', 2),
'max': config.get('max_connections', 10),
'increment': config.get('increment', 1),
'getmode': oracledb.POOL_GETMODE_WAIT
}
# Use service_name if available, otherwise fall back to sid
if service_name:
pool_params['service_name'] = service_name
logger.info(f"Using SERVICE_NAME: {service_name}")
elif sid:
pool_params['sid'] = sid
logger.info(f"Using SID: {sid}")
else:
# Default fallback
pool_params['service_name'] = 'ROA'
logger.info("Using default SERVICE_NAME: ROA")
self._pool = oracledb.create_pool(**pool_params)
logger.info(f"Oracle pool created with {self._pool.opened} connections")
@asynccontextmanager
async def get_connection(self):
"""Context manager pentru obținerea unei conexiuni din pool"""
if self._pool is None:
raise RuntimeError("Pool not initialized. Call initialize() first.")
connection = None
try:
connection = self._pool.acquire()
logger.debug("Connection acquired from pool")
yield connection
finally:
if connection is not None:
connection.close()
logger.debug("Connection returned to pool")
async def execute_query(self, query: str, parameters=None):
"""
Execute a SQL query and return all results
Based on official Oracle python-oracledb patterns
"""
if self._pool is None:
raise RuntimeError("Pool not initialized. Call initialize() first.")
connection = None
try:
connection = self._pool.acquire()
logger.debug(f"Executing query: {query[:100]}...")
with connection.cursor() as cursor:
if parameters:
cursor.execute(query, parameters)
else:
cursor.execute(query)
# Check if this is a SELECT statement
if query.strip().upper().startswith('SELECT') or query.strip().upper().startswith('WITH'):
return cursor.fetchall()
else:
# For DML statements, return affected row count
connection.commit()
return cursor.rowcount
except Exception as e:
if connection:
connection.rollback()
logger.error(f"Query execution failed: {str(e)}")
raise
finally:
if connection is not None:
connection.close()
logger.debug("Connection returned to pool")
async def close_pool(self):
"""Închide pool-ul de conexiuni"""
if self._pool is not None:
self._pool.close()
self._pool = None
logger.info("Oracle pool closed")
# Instance globală pentru folosire în toate aplicațiile
oracle_pool = OraclePool()