"""
Event-based cache invalidation monitor

Monitors {schema}.act tables for changes and invalidates cache automatically
"""

import asyncio
import logging
import sys
import os
import re
from typing import Optional

# Path setup handled by main.py - this is redundant but kept for module isolation
# sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..')))

logger = logging.getLogger(__name__)

# Unquoted Oracle identifier: starts with a letter, then letters/digits/_/$/#.
# Used to vet schema names before they are interpolated into SQL text.
_SCHEMA_NAME_RE = re.compile(r"[^\W\d_][\w$#]*")
_MAX_IDENTIFIER_LENGTH = 128


def _is_safe_schema_name(schema) -> bool:
    """Return True if *schema* is a plain identifier safe to splice into SQL."""
    return (
        isinstance(schema, str)
        and len(schema) <= _MAX_IDENTIFIER_LENGTH
        and _SCHEMA_NAME_RE.fullmatch(schema) is not None
    )


class EventMonitor:
    """
    Monitors schema.act tables for changes to trigger cache invalidation

    Runs as background task, checking max(id_act) at configured intervals.
    Uses permanent schema_mappings cache to avoid repeated schema lookups.
    """

    # Seconds to wait before retrying after an unexpected error in the loop
    ERROR_RETRY_DELAY = 60

    def __init__(self, cache_manager, config):
        """
        Initialize event monitor

        Args:
            cache_manager: CacheManager instance (must expose ``sqlite`` and
                ``invalidate(company_id=...)``)
            config: CacheConfig instance (``check_interval`` is read here)
        """
        self.cache = cache_manager
        self.config = config
        self.running = False
        self.task: Optional[asyncio.Task] = None

    async def start(self):
        """Start monitoring task (no-op with a warning if already running)"""
        if self.running:
            logger.warning("Event monitor already running")
            return

        self.running = True
        self.task = asyncio.create_task(self._monitor_loop())
        logger.info(
            f"Event monitor started (interval: {self.config.check_interval}s)"
        )

    async def stop(self):
        """Stop monitoring task and wait for it to finish"""
        if not self.running:
            return

        self.running = False

        # Detach the task first so a stopped monitor never holds a stale handle
        task, self.task = self.task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        logger.info("Event monitor stopped")

    async def _monitor_loop(self):
        """Main monitoring loop: check, sleep, repeat until stopped/cancelled"""
        while self.running:
            try:
                await self._check_all_companies()
                await asyncio.sleep(self.config.check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Event monitor error: {e}", exc_info=True)
                # Wait 1 minute on error before retrying
                await asyncio.sleep(self.ERROR_RETRY_DELAY)

    async def _check_all_companies(self):
        """
        Check all companies with active cache for changes

        Queries max(id_act) from {schema}.act for each cached company and
        invalidates cache if changes detected. Never raises: every failure
        is logged so the monitor loop keeps running.
        """
        try:
            # Get list of companies with active cache entries
            cached_companies = await self.cache.sqlite.get_cached_company_ids()

            if not cached_companies:
                logger.debug("No cached companies to monitor")
                return

            logger.info(f"Checking {len(cached_companies)} companies for changes...")

            invalidated_count = 0
            for company_id in cached_companies:
                try:
                    if not await self._check_company_changes(company_id):
                        continue

                    # NOTE(review): _check_company_changes has already advanced
                    # the watermark at this point, so if invalidate() raises the
                    # change is not retried on the next cycle.
                    await self.cache.invalidate(company_id=company_id)
                    invalidated_count += 1
                    logger.info(
                        f"Cache invalidated for company {company_id} due to act changes"
                    )
                except Exception as e:
                    # Error for one company shouldn't stop checking others
                    logger.error(f"Error checking company {company_id}: {e}")
                    continue

            if invalidated_count > 0:
                logger.info(
                    f"Auto-invalidation complete: {invalidated_count} companies affected"
                )

        except Exception as e:
            logger.error(f"Check all companies error: {e}", exc_info=True)

    async def _check_company_changes(self, company_id: int) -> bool:
        """
        Check if company data changed (monitor max(id_act) in schema.act)

        Args:
            company_id: Company ID to check

        Returns:
            True if cache should be invalidated, False otherwise (including
            when the schema or the current max(id_act) cannot be determined)
        """
        try:
            # 1. Get schema (from permanent cache)
            schema = await self._get_schema_for_company(company_id)
            if not schema:
                logger.warning(f"Schema not found for company {company_id}")
                return False

            # 2. Get current max(id_act) from Oracle
            current_max = await self._get_max_id_act(schema)
            if current_max is None:
                # Query failed: do not touch the watermark, otherwise a bogus
                # value would cause a spurious invalidation on the next poll.
                logger.warning(
                    f"Could not read max(id_act) for company {company_id}; skipping"
                )
                return False

            # 3. Get cached watermark
            cached_watermark = await self.cache.sqlite.get_watermark(company_id)

            # 4. Compare
            if cached_watermark is None:
                # First time checking - store watermark, no invalidation
                await self.cache.sqlite.set_watermark(company_id, schema, current_max)
                logger.debug(
                    f"Watermark initialized for company {company_id}: {current_max}"
                )
                return False

            if current_max == cached_watermark:
                # No changes
                return False

            # Any difference counts: a lower max means rows were removed,
            # which makes cached data stale just like an insert does.
            logger.info(
                f"Schema {schema} (company {company_id}): "
                f"id_act changed {cached_watermark} -> {current_max}"
            )

            # Update watermark
            await self.cache.sqlite.set_watermark(company_id, schema, current_max)
            return True  # Invalidate cache

        except Exception as e:
            logger.error(f"Check company {company_id} changes error: {e}")
            return False  # Don't invalidate on error

    async def _get_schema_for_company(self, company_id: int) -> Optional[str]:
        """
        Get schema for company (with permanent caching)

        First checks permanent schema_mappings cache, falls back to Oracle
        query if not cached.

        Args:
            company_id: Company ID

        Returns:
            Schema name or None (company unknown, empty schema, or query error)
        """
        # Check permanent cache first
        cached_schema = await self.cache.sqlite.get_schema_mapping(company_id)
        if cached_schema:
            logger.debug(f"Schema mapping HIT for company {company_id}: {cached_schema}")
            return cached_schema

        # Cache MISS - query Oracle
        logger.info(f"Schema mapping MISS for company {company_id}, querying Oracle...")

        try:
            from shared.database.oracle_pool import oracle_pool

            # NOTE(review): the cursor calls below look synchronous; confirm the
            # pool's driver does not block the event loop for long queries.
            async with oracle_pool.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute("""
                        SELECT schema
                        FROM CONTAFIN_ORACLE.v_nom_firme
                        WHERE id_firma = :id
                    """, {'id': company_id})

                    result = cursor.fetchone()

            # An empty/NULL schema must not be persisted as a permanent mapping
            if not result or not result[0]:
                logger.warning(f"Company {company_id} not found in v_nom_firme")
                return None

            schema = result[0]

            # Store PERMANENT in schema_mappings (never expires)
            await self.cache.sqlite.set_schema_mapping(company_id, schema)
            logger.info(f"Schema mapping stored for company {company_id}: {schema}")

            return schema

        except Exception as e:
            logger.error(f"Get schema for company {company_id} error: {e}")
            return None

    async def _get_max_id_act(self, schema: str) -> Optional[int]:
        """
        Query max(id_act) from {schema}.act

        Args:
            schema: Schema name (must be a plain unquoted identifier)

        Returns:
            Max id_act value (0 if table empty), or None when the value could
            not be read (invalid schema name or database error)
        """
        # The schema name is spliced into the SQL text (identifiers cannot be
        # bound), and it reaches us via the SQLite mapping cache, so vet it.
        if not _is_safe_schema_name(schema):
            logger.error(f"Refusing to query act table: invalid schema name {schema!r}")
            return None

        try:
            from shared.database.oracle_pool import oracle_pool

            async with oracle_pool.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute(f"SELECT MAX(id_act) FROM {schema}.act")
                    result = cursor.fetchone()

            # MAX() over an empty table yields a single NULL row
            if result and result[0] is not None:
                return result[0]
            return 0

        except Exception as e:
            logger.error(f"Get max_id_act for schema {schema} error: {e}")
            return None


# Optional: Preload all schema mappings at startup
async def preload_all_schema_mappings():
    """
    Preload all schema mappings at startup (optional)

    Prevents cache misses on first requests by populating schema_mappings
    table with all companies. Errors are logged, not raised.
    """
    from .cache_manager import get_cache

    cache = get_cache()
    if not cache:
        logger.warning("Cache not initialized - skipping schema preload")
        return

    logger.info("Preloading all schema mappings...")

    try:
        from shared.database.oracle_pool import oracle_pool

        async with oracle_pool.get_connection() as connection:
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT id_firma, schema
                    FROM CONTAFIN_ORACLE.v_nom_firme
                """)

                results = cursor.fetchall()

        for id_firma, schema in results:
            await cache.sqlite.set_schema_mapping(id_firma, schema)

        logger.info(f"Preloaded {len(results)} schema mappings")

    except Exception as e:
        logger.error(f"Schema preload error: {e}")


# Global event monitor instance
_event_monitor: Optional[EventMonitor] = None


async def init_event_monitor(cache_manager, config):
    """
    Initialize global event monitor

    If a previous monitor is still running it is stopped first, so that
    re-initialization never leaves an orphaned background task behind.

    Args:
        cache_manager: CacheManager instance
        config: CacheConfig instance
    """
    global _event_monitor

    previous = _event_monitor
    if previous is not None and previous.running:
        await previous.stop()

    _event_monitor = EventMonitor(cache_manager, config)

    # Start if auto-invalidate enabled
    if config.auto_invalidate_enabled:
        await _event_monitor.start()


def get_event_monitor() -> Optional[EventMonitor]:
    """Get global event monitor instance (None until init_event_monitor ran)"""
    return _event_monitor


async def toggle_event_monitor(enabled: bool):
    """
    Toggle event monitor on/off

    Args:
        enabled: True to start monitoring, False to stop
    """
    monitor = get_event_monitor()
    if not monitor:
        logger.warning("Event monitor not initialized")
        return

    if enabled and not monitor.running:
        await monitor.start()
    elif not enabled and monitor.running:
        await monitor.stop()