""" Cache Manager - Orchestrator for hybrid L1 + L2 cache """ import logging import asyncio from typing import Any, Optional from .config import CacheConfig from .memory_cache import MemoryCache from .sqlite_cache import SQLiteCache logger = logging.getLogger(__name__) class CacheManager: """ Hybrid cache manager (Memory L1 + SQLite L2) Features: - Two-tier caching: fast memory + persistent SQLite - Automatic TTL management per cache type - Performance tracking and benchmarking - Per-user cache enable/disable - Global cache toggle """ def __init__(self, config: CacheConfig): """ Initialize cache manager Args: config: Cache configuration """ self.config = config self.memory = MemoryCache(max_size=config.memory_max_size) self.sqlite = SQLiteCache(db_path=config.sqlite_path) self._cleanup_task: Optional[asyncio.Task] = None self._initialized = False self._last_cache_source: Optional[str] = None # Track last cache source (L1/L2) async def init(self): """Initialize cache system""" if self._initialized: logger.warning("Cache already initialized") return # Initialize SQLite database schema await self.sqlite.init_db() # Start cleanup task if self.config.enabled: self._cleanup_task = asyncio.create_task(self._cleanup_loop()) self._initialized = True logger.info(f"Cache initialized: type={self.config.cache_type}, enabled={self.config.enabled}") async def close(self): """Close cache and cleanup""" if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass # Close SQLite connection manager if hasattr(self.sqlite, 'close'): await self.sqlite.close() logger.info("Cache closed") async def get(self, key: str, cache_type: str) -> Optional[Any]: """ Get value from cache (L1 → L2) Args: key: Cache key cache_type: Type of cache entry Returns: Cached value or None if not found """ if not self.config.enabled: self._last_cache_source = None return None # Try L1 (Memory) first value = await self.memory.get(key) if value is not None: self._last_cache_source = "L1" logger.debug(f"Cache HIT L1 (memory): {key}") return value # Try L2 (SQLite) value = await self.sqlite.get(key) if value is not None: self._last_cache_source = "L2" logger.debug(f"Cache HIT L2 (sqlite): {key}") # Populate L1 for next time ttl = self.config.get_ttl_for_type(cache_type) await self.memory.set(key, value, ttl) return value # Cache MISS self._last_cache_source = None logger.debug(f"Cache MISS: {key}") return None def get_last_cache_source(self) -> Optional[str]: """ Get source of last cache hit (L1/L2/None) Returns: "L1" if last hit was from memory cache "L2" if last hit was from SQLite cache None if last call was a cache miss or cache disabled """ return self._last_cache_source async def set(self, key: str, value: Any, cache_type: str, company_id: Optional[int] = None, ttl: Optional[int] = None): """ Set value in cache (both L1 and L2) Args: key: Cache key value: Value to cache cache_type: Type of cache entry company_id: Company ID (for company-specific caches) ttl: Time to live (uses default for cache_type if not provided) """ if not self.config.enabled: return if ttl is None: ttl = self.config.get_ttl_for_type(cache_type) # Store in both L1 and L2 await self.memory.set(key, value, ttl) await self.sqlite.set(key, value, cache_type, company_id, ttl) logger.debug(f"Cache SET (L1 + L2): {key} (TTL: {ttl}s)") async def delete(self, key: str): """Delete entry from both L1 and L2""" await self.memory.delete(key) await self.sqlite.delete(key) logger.debug(f"Cache deleted: {key}") async def invalidate(self, company_id: Optional[int] = None, cache_type: Optional[str] = None): """ Invalidate cache entries Args: company_id: If provided, clear only this company's cache cache_type: If provided, clear only this cache type """ if company_id is not None and cache_type is not None: # Clear specific company + type from .keys import generate_key_pattern pattern = generate_key_pattern(cache_type, company_id) await self.memory.clear_by_pattern(pattern) # SQLite: clear by company + type (needs query) # For now, just clear by company await self.sqlite.clear_by_company(company_id) logger.info(f"Cache invalidated: company={company_id}, type={cache_type}") elif company_id is not None: # Clear all for company from .keys import generate_key_pattern # Clear all types for this company (pattern match all) # Memory: need to iterate and match company_id in key # For simplicity, clear by pattern prefix await self.memory.clear() # TODO: improve pattern matching await self.sqlite.clear_by_company(company_id) logger.info(f"Cache invalidated: company={company_id}") elif cache_type is not None: # Clear all for type from .keys import generate_key_pattern pattern = generate_key_pattern(cache_type) await self.memory.clear_by_pattern(pattern) await self.sqlite.clear_by_type(cache_type) logger.info(f"Cache invalidated: type={cache_type}") else: # Clear everything await self.memory.clear() await self.sqlite.clear() logger.info("Cache invalidated: ALL") async def is_enabled_for_user(self, username: Optional[str]) -> bool: """ Check if cache is enabled for specific user Args: username: Username to check Returns: True if cache enabled for user, False otherwise """ if not self.config.enabled: return False if username is None: return True # Check per-user setting return await self.sqlite.get_user_cache_enabled(username) async def set_user_cache_enabled(self, username: str, enabled: bool): """Set user cache enabled/disabled""" await self.sqlite.set_user_cache_enabled(username, enabled) logger.info(f"User cache setting: {username} -> {enabled}") # Benchmarking async def get_benchmark(self, cache_type: str) -> Optional[float]: """Get average benchmark time for cache type""" return await self.sqlite.get_benchmark(cache_type) async def update_benchmark(self, cache_type: str, new_time_ms: float): """ Update benchmark with new measurement (exponential moving average) Args: cache_type: Type of cache new_time_ms: New measured time in milliseconds """ current_avg = await self.sqlite.get_benchmark(cache_type) if current_avg is None: # First measurement new_avg = new_time_ms sample_count = 1 else: # Exponential moving average (alpha = 0.1) new_avg = 0.9 * current_avg + 0.1 * new_time_ms # Get current sample count (TODO: retrieve from DB) sample_count = 1 # Simplified for now await self.sqlite.set_benchmark(cache_type, new_avg, sample_count) logger.debug(f"Benchmark updated: {cache_type} -> {new_avg:.2f}ms") # Performance Tracking async def track_performance(self, cache_type: str, is_hit: bool, actual_time_ms: float, time_saved_ms: Optional[float] = None, estimated_oracle_time_ms: Optional[float] = None, company_id: Optional[int] = None, username: Optional[str] = None): """ Track performance metric Args: cache_type: Type of cache is_hit: True if cache hit, False if cache miss actual_time_ms: Actual response time time_saved_ms: Time saved by cache (for hits) estimated_oracle_time_ms: Estimated Oracle time (for hits) company_id: Company ID username: Username """ if not self.config.track_performance: return await self.sqlite.log_performance( cache_type=cache_type, company_id=company_id, cache_hit=is_hit, response_time_ms=actual_time_ms, estimated_oracle_time_ms=estimated_oracle_time_ms, time_saved_ms=time_saved_ms, username=username ) # Statistics async def get_stats(self) -> dict: """Get comprehensive cache statistics""" memory_stats = self.memory.get_stats() sqlite_stats = await self.sqlite.get_stats() return { 'enabled': self.config.enabled, 'cache_type': self.config.cache_type, 'memory': memory_stats, 'sqlite': sqlite_stats, } # Cleanup async def _cleanup_loop(self): """Background task to cleanup expired entries""" while True: try: await asyncio.sleep(self.config.cleanup_interval) await self._cleanup_expired() except asyncio.CancelledError: break except Exception as e: logger.error(f"Cleanup error: {e}", exc_info=True) async def _cleanup_expired(self): """Remove expired entries from both caches""" logger.info("Running cache cleanup...") await self.memory.cleanup_expired() await self.sqlite.cleanup_expired() logger.info("Cache cleanup completed") # Global cache manager instance _cache_manager: Optional[CacheManager] = None async def init_cache(config: CacheConfig): """Initialize global cache manager""" global _cache_manager if _cache_manager is not None: logger.warning("Cache already initialized") return _cache_manager = CacheManager(config) await _cache_manager.init() logger.info("Global cache manager initialized") def get_cache() -> Optional[CacheManager]: """Get global cache manager instance""" return _cache_manager async def close_cache(): """Close global cache manager""" global _cache_manager if _cache_manager is not None: await _cache_manager.close() _cache_manager = None