""" Baseline performance benchmarking Runs at startup to establish baseline Oracle query times Used for calculating "time saved" by cache """ import time import logging from typing import Dict logger = logging.getLogger(__name__) async def run_baseline_benchmarks() -> Dict[str, float]: """ Run baseline benchmarks for Oracle queries (without cache) Measures typical query times to establish performance baselines These are used to calculate time saved when cache hits occur NOTE: This implementation provides a framework. Actual benchmark implementations need access to Oracle services and sample data. Returns: Dictionary mapping cache_type to average query time (ms) """ from .cache_manager import get_cache cache = get_cache() if not cache: logger.warning("Cache not initialized - skipping benchmarks") return {} logger.info("Starting baseline performance benchmarks...") benchmarks = {} try: # Benchmark: Schema lookup logger.info("Benchmarking: schema lookup") schema_times = await _benchmark_schema_lookup() if schema_times: avg_schema = sum(schema_times) / len(schema_times) benchmarks['schema'] = avg_schema await cache.sqlite.set_benchmark('schema', avg_schema, len(schema_times)) logger.info(f" Schema lookup: {avg_schema:.2f}ms (avg of {len(schema_times)} samples)") # Benchmark: Companies list logger.info("Benchmarking: companies list") companies_time = await _benchmark_companies_list() if companies_time: benchmarks['companies'] = companies_time await cache.sqlite.set_benchmark('companies', companies_time, 1) logger.info(f" Companies list: {companies_time:.2f}ms") # Benchmark: Dashboard summary logger.info("Benchmarking: dashboard summary") dashboard_time = await _benchmark_dashboard_summary() if dashboard_time: benchmarks['dashboard_summary'] = dashboard_time await cache.sqlite.set_benchmark('dashboard_summary', dashboard_time, 1) logger.info(f" Dashboard summary: {dashboard_time:.2f}ms") # Benchmark: Dashboard trends logger.info("Benchmarking: dashboard trends") trends_time = await _benchmark_dashboard_trends() if trends_time: benchmarks['dashboard_trends'] = trends_time await cache.sqlite.set_benchmark('dashboard_trends', trends_time, 1) logger.info(f" Dashboard trends: {trends_time:.2f}ms") # Benchmark: Invoices logger.info("Benchmarking: invoices") invoices_time = await _benchmark_invoices() if invoices_time: benchmarks['invoices'] = invoices_time await cache.sqlite.set_benchmark('invoices', invoices_time, 1) logger.info(f" Invoices: {invoices_time:.2f}ms") # Benchmark: Treasury logger.info("Benchmarking: treasury") treasury_time = await _benchmark_treasury() if treasury_time: benchmarks['treasury'] = treasury_time await cache.sqlite.set_benchmark('treasury', treasury_time, 1) logger.info(f" Treasury: {treasury_time:.2f}ms") logger.info(f"Baseline benchmarks completed: {len(benchmarks)} types measured") return benchmarks except Exception as e: logger.error(f"Benchmark error: {e}", exc_info=True) return benchmarks async def _benchmark_schema_lookup() -> list: """ Benchmark schema lookup queries Returns: List of query times (ms) for multiple samples """ try: # Import here to avoid circular dependency import sys import os sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) from shared.database.oracle_pool import oracle_pool # Get sample company IDs to test sample_companies = await _get_sample_company_ids(limit=10) if not sample_companies: logger.warning("No sample companies found for schema benchmark") return [] times = [] for company_id in sample_companies: start = time.time() async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: cursor.execute(""" SELECT schema FROM CONTAFIN_ORACLE.v_nom_firme WHERE id_firma = :id """, {'id': company_id}) cursor.fetchone() elapsed_ms = (time.time() - start) * 1000 times.append(elapsed_ms) return times except Exception as e: logger.error(f"Schema benchmark error: {e}") return [] async def _benchmark_companies_list() -> float: """ Benchmark companies list query Returns: Query time (ms) """ try: import sys import os sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) from shared.database.oracle_pool import oracle_pool # Get sample username sample_user = await _get_sample_username() if not sample_user: return 0 start = time.time() async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: cursor.execute(""" SELECT nf.id_firma, nf.denumire, nf.cui, nf.schema FROM CONTAFIN_ORACLE.v_nom_firme nf JOIN CONTAFIN_ORACLE.vdef_util_firme uf ON nf.id_firma = uf.id_firma WHERE uf.nume_utilizator = :username ORDER BY nf.denumire """, {'username': sample_user}) cursor.fetchall() elapsed_ms = (time.time() - start) * 1000 return elapsed_ms except Exception as e: logger.error(f"Companies benchmark error: {e}") return 0 async def _benchmark_dashboard_summary() -> float: """ Benchmark dashboard summary query Returns: Query time (ms) """ try: # This requires access to DashboardService # For now, return estimated value logger.warning("Dashboard summary benchmark not implemented - using estimate") return 250.0 # Estimated 250ms based on plan except Exception as e: logger.error(f"Dashboard benchmark error: {e}") return 0 async def _benchmark_dashboard_trends() -> float: """Benchmark dashboard trends query""" try: logger.warning("Dashboard trends benchmark not implemented - using estimate") return 400.0 # Estimated 400ms except Exception as e: logger.error(f"Trends benchmark error: {e}") return 0 async def _benchmark_invoices() -> float: """Benchmark invoices query""" try: logger.warning("Invoices benchmark not implemented - using estimate") return 180.0 # Estimated 180ms except Exception as e: logger.error(f"Invoices benchmark error: {e}") return 0 async def _benchmark_treasury() -> float: """Benchmark treasury query""" try: logger.warning("Treasury benchmark not implemented - using estimate") return 250.0 # Estimated 250ms except Exception as e: logger.error(f"Treasury benchmark error: {e}") return 0 # Helper functions async def _get_sample_company_ids(limit: int = 10) -> list: """Get sample company IDs for testing""" try: import sys import os sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) from shared.database.oracle_pool import oracle_pool async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: cursor.execute(f""" SELECT id_firma FROM CONTAFIN_ORACLE.v_nom_firme WHERE ROWNUM <= {limit} """) results = cursor.fetchall() return [row[0] for row in results] except Exception as e: logger.error(f"Get sample companies error: {e}") return [] async def _get_sample_username() -> str: """Get sample username for testing""" try: import sys import os sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) from shared.database.oracle_pool import oracle_pool async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: cursor.execute(""" SELECT nume_utilizator FROM CONTAFIN_ORACLE.vdef_util_firme WHERE ROWNUM <= 1 """) result = cursor.fetchone() return result[0] if result else "admin" except Exception as e: logger.error(f"Get sample username error: {e}") return "admin"