"""
Email-Server Cache for Multi-Oracle Auto-Discovery

Builds and maintains a cache mapping emails to server IDs:
- At startup, connects to each Oracle server and extracts emails from CONTAFIN_ORACLE.UTILIZATORI
- Cache structure: {email: [server_ids]}
- Auto-refresh every 15 minutes (configurable)
- Rebuilds are serialized with an asyncio.Lock (coroutine-level safety on one event loop,
  not OS-thread safety)

US-003: Auto-Discovery Email-Server Cache
US-013: Added username lookup support (direct query, no caching)
"""

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set

logger = logging.getLogger(__name__)


class EmailServerCache:
    """
    Cache for email-to-server mapping.

    Builds a dictionary {email: [server_ids]} by querying CONTAFIN_ORACLE.UTILIZATORI
    on each configured Oracle server.

    Features:
    - Lazy initialization (build on first access or explicit call)
    - Auto-refresh at configurable intervals
    - Rebuilds serialized by an asyncio.Lock; concurrent refresh_if_needed() callers
      share a single rebuild instead of each rebuilding in turn
    - Graceful handling of server connection failures: a server that cannot be
      queried keeps its previously cached emails rather than vanishing from the cache

    The class is a singleton: every ``EmailServerCache()`` call returns the same instance.
    """

    _instance: Optional['EmailServerCache'] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(EmailServerCache, cls).__new__(cls)
            cls._instance._cache: Dict[str, List[str]] = {}
            cls._instance._last_refresh: Optional[datetime] = None
            cls._instance._refresh_interval = timedelta(minutes=15)
            cls._instance._lock = asyncio.Lock()
            cls._instance._initialized = False
            cls._instance._refresh_task: Optional[asyncio.Task] = None
        return cls._instance

    def set_refresh_interval(self, minutes: int) -> None:
        """
        Set the cache refresh interval.

        Staleness checks use the new value immediately. A running auto-refresh
        loop picks it up after its current sleep ends.

        Args:
            minutes: Refresh interval in minutes (default: 15). Must be positive.

        Raises:
            ValueError: If minutes is zero or negative. Such a value would make the
                auto-refresh loop rebuild the cache back-to-back without pausing.
        """
        if minutes <= 0:
            raise ValueError(f"Refresh interval must be positive, got {minutes} minutes")
        self._refresh_interval = timedelta(minutes=minutes)
        logger.info(f"Email cache refresh interval set to {minutes} minutes")

    async def build_cache(self) -> None:
        """
        Build the email-server cache by querying all configured Oracle servers.

        Connects to each server and extracts active user emails from the
        CONTAFIN_ORACLE.UTILIZATORI table. The cache lock is held for the whole
        rebuild, so concurrent rebuilds run one after another.
        """
        async with self._lock:
            await self._build_cache_locked()

    async def _build_cache_locked(self) -> None:
        """Rebuild the cache. The caller must already hold ``self._lock``."""
        from shared.database.oracle_pool import oracle_pool
        from backend.config import settings

        logger.info("[EMAIL-CACHE] Building email-server cache...")
        fresh: Dict[str, Set[str]] = {}  # Use set to avoid duplicates
        failed_servers: Set[str] = set()

        servers = settings.get_oracle_servers()
        if not servers:
            logger.warning("[EMAIL-CACHE] No Oracle servers configured")
            self._cache = {}
            self._last_refresh = datetime.now()
            self._initialized = True
            return

        for server in servers:
            try:
                logger.info(f"[EMAIL-CACHE] Querying server '{server.id}' ({server.name})...")

                # Get connection from the multi-pool.
                # NOTE(review): cursor.execute/fetchall are not awaited, so they presumably
                # block the event loop while the query runs. Confirm the driver mode.
                async with oracle_pool.get_connection(server.id) as connection:
                    with connection.cursor() as cursor:
                        # Only active users (INACTIV=0, STERS=0) with non-blank emails
                        cursor.execute("""
                            SELECT LOWER(EMAIL) as email
                            FROM CONTAFIN_ORACLE.UTILIZATORI
                            WHERE EMAIL IS NOT NULL
                              AND TRIM(EMAIL) IS NOT NULL
                              AND INACTIV = 0
                              AND STERS = 0
                        """)
                        rows = cursor.fetchall()

                        email_count = 0
                        for row in rows:
                            email = row[0].strip().lower() if row[0] else None
                            if email and '@' in email:  # Basic email validation
                                fresh.setdefault(email, set()).add(server.id)
                                email_count += 1

                logger.info(f"[EMAIL-CACHE] Found {email_count} valid emails on server '{server.id}'")

            except Exception as e:
                # Log error but continue with other servers. Remember the failure so this
                # server's previously cached emails are retained below.
                logger.error(f"[EMAIL-CACHE] Failed to query server '{server.id}': {e}")
                failed_servers.add(server.id)
                continue

        previous = self._cache
        if failed_servers and previous:
            logger.warning(
                f"[EMAIL-CACHE] Keeping previously cached emails for unreachable server(s): "
                f"{sorted(failed_servers)}"
            )
        self._cache = self._merge_with_previous(fresh, previous, failed_servers)
        self._last_refresh = datetime.now()
        self._initialized = True

        total_emails = len(self._cache)
        multi_server_emails = sum(1 for ids in self._cache.values() if len(ids) > 1)
        logger.info(f"[EMAIL-CACHE] ✅ Cache built: {total_emails} unique emails")
        logger.info(f"[EMAIL-CACHE] {multi_server_emails} emails exist on multiple servers")

    @staticmethod
    def _merge_with_previous(
        fresh: Dict[str, Set[str]],
        previous: Dict[str, List[str]],
        failed_servers: Set[str],
    ) -> Dict[str, List[str]]:
        """
        Combine freshly queried emails with previous entries for servers that failed.

        Servers in ``failed_servers`` keep their last known emails from ``previous``.
        Servers that are no longer queried and did not fail are dropped.

        Args:
            fresh: email -> set of server_ids collected in this rebuild.
            previous: The cache contents before this rebuild.
            failed_servers: IDs of servers whose query raised during this rebuild.

        Returns:
            email -> sorted list of server_ids (sorted for consistent ordering).
        """
        merged: Dict[str, Set[str]] = {email: set(ids) for email, ids in fresh.items()}
        if failed_servers:
            for email, server_ids in previous.items():
                kept = failed_servers.intersection(server_ids)
                if kept:
                    merged.setdefault(email, set()).update(kept)
        return {email: sorted(ids) for email, ids in merged.items()}

    def _is_stale(self) -> bool:
        """Return True if the cache was never built or its refresh interval has elapsed."""
        if not self._initialized or self._last_refresh is None:
            return True
        return datetime.now() - self._last_refresh >= self._refresh_interval

    async def refresh_if_needed(self) -> bool:
        """
        Refresh cache if the refresh interval has passed.

        Staleness is re-checked after the lock is acquired. When several coroutines
        find the cache stale at the same moment, only the first one rebuilds it;
        the others see the fresh cache and return False.

        Returns:
            True if this call refreshed the cache, False otherwise
        """
        if not self._is_stale():
            return False

        async with self._lock:
            if not self._is_stale():
                # Another coroutine rebuilt the cache while we waited for the lock.
                return False
            await self._build_cache_locked()
        return True

    def get_servers_for_email(self, email: str) -> List[str]:
        """
        Get list of server IDs where the email exists.

        Args:
            email: User email address (matched case-insensitively, surrounding whitespace ignored)

        Returns:
            List of server_ids where this email exists.
            Empty list if email not found (NOT an error).
        """
        if not email:
            return []

        normalized_email = email.strip().lower()
        servers = self._cache.get(normalized_email, [])

        if servers:
            logger.debug(f"[EMAIL-CACHE] Email '{normalized_email}' found on servers: {servers}")
        else:
            logger.debug(f"[EMAIL-CACHE] Email '{normalized_email}' not found in cache")

        return servers.copy()  # Return a copy to prevent external modification

    def is_initialized(self) -> bool:
        """Check if cache has been built at least once."""
        return self._initialized

    def get_cache_stats(self) -> Dict:
        """
        Get cache statistics.

        Returns:
            Dict with cache stats (total_emails, multi_server_count, server_distribution,
            last_refresh, refresh_interval_minutes). ``server_distribution`` maps
            "number of servers" -> "number of emails present on that many servers".
        """
        if not self._initialized:
            return {
                'initialized': False,
                'total_emails': 0,
                'last_refresh': None,
                'refresh_interval_minutes': self._refresh_interval.total_seconds() / 60
            }

        multi_server = sum(1 for servers in self._cache.values() if len(servers) > 1)
        server_distribution = {}
        for servers in self._cache.values():
            count = len(servers)
            server_distribution[count] = server_distribution.get(count, 0) + 1

        return {
            'initialized': True,
            'total_emails': len(self._cache),
            'multi_server_count': multi_server,
            'server_distribution': server_distribution,
            'last_refresh': self._last_refresh.isoformat() if self._last_refresh else None,
            'refresh_interval_minutes': self._refresh_interval.total_seconds() / 60
        }

    async def start_auto_refresh(self) -> None:
        """
        Start background task for automatic cache refresh.

        Runs refresh at the configured interval (default: 15 minutes). The interval
        is re-read before every sleep. Does nothing if the task is already running.
        """
        if self._refresh_task and not self._refresh_task.done():
            logger.warning("[EMAIL-CACHE] Auto-refresh task already running")
            return

        async def refresh_loop():
            while True:
                try:
                    await asyncio.sleep(self._refresh_interval.total_seconds())
                    logger.info("[EMAIL-CACHE] Auto-refresh triggered")
                    await self.build_cache()
                except asyncio.CancelledError:
                    logger.info("[EMAIL-CACHE] Auto-refresh task cancelled")
                    break
                except Exception as e:
                    logger.error(f"[EMAIL-CACHE] Auto-refresh error: {e}")
                    # Continue running, will retry on next interval

        self._refresh_task = asyncio.create_task(refresh_loop())
        logger.info(f"[EMAIL-CACHE] Auto-refresh started (every {self._refresh_interval.total_seconds() / 60:.0f} minutes)")

    async def stop_auto_refresh(self) -> None:
        """Stop the auto-refresh background task (no-op if it is not running)."""
        if self._refresh_task and not self._refresh_task.done():
            self._refresh_task.cancel()
            try:
                await self._refresh_task
            except asyncio.CancelledError:
                pass
            self._refresh_task = None
            logger.info("[EMAIL-CACHE] Auto-refresh stopped")

    def clear_cache(self) -> None:
        """Clear the cache (useful for testing)."""
        self._cache = {}
        self._initialized = False
        self._last_refresh = None
        logger.info("[EMAIL-CACHE] Cache cleared")

    async def get_servers_for_username(self, username: str) -> List[str]:
        """
        Get list of server IDs where the username exists (US-013).

        Unlike email lookup which uses the cache, username lookup
        queries Oracle directly on each server. This is because:
        - Usernames are less commonly used for login
        - Direct query ensures fresh data
        - Avoids bloating the cache with both email and username mappings

        Args:
            username: Username to look up (case-insensitive, converted to uppercase)

        Returns:
            Sorted list of server_ids where this username exists.
            Empty list if username not found (NOT an error).
        """
        if not username:
            return []

        from shared.database.oracle_pool import oracle_pool
        from backend.config import settings

        normalized_username = username.strip().upper()
        found_servers: List[str] = []

        servers = settings.get_oracle_servers()
        if not servers:
            logger.warning("[EMAIL-CACHE] No Oracle servers configured for username lookup")
            return []

        for server in servers:
            try:
                async with oracle_pool.get_connection(server.id) as connection:
                    with connection.cursor() as cursor:
                        # Only active users (INACTIV=0, STERS=0); bind variable keeps the
                        # user-supplied username out of the SQL text.
                        cursor.execute("""
                            SELECT 1
                            FROM CONTAFIN_ORACLE.UTILIZATORI
                            WHERE UPPER(UTILIZATOR) = :username
                              AND INACTIV = 0
                              AND STERS = 0
                              AND ROWNUM = 1
                        """, {"username": normalized_username})

                        row = cursor.fetchone()
                        if row:
                            found_servers.append(server.id)
                            logger.debug(f"[EMAIL-CACHE] Username '{normalized_username}' found on server '{server.id}'")

            except Exception as e:
                logger.error(f"[EMAIL-CACHE] Failed to query username on server '{server.id}': {e}")
                continue

        if found_servers:
            logger.info(f"[EMAIL-CACHE] Username '{normalized_username}' found on {len(found_servers)} server(s): {found_servers}")
        else:
            logger.debug(f"[EMAIL-CACHE] Username '{normalized_username}' not found on any server")

        return sorted(found_servers)


# Global singleton instance
email_server_cache = EmailServerCache()
# Convenience functions for external use


def get_servers_for_email(email: str) -> List[str]:
    """
    Look up the server IDs on which an email address exists.

    Module-level shortcut that delegates to the shared ``email_server_cache`` singleton.

    Args:
        email: User email address

    Returns:
        List of server_ids. Empty list if email not found (NOT an error).
    """
    lookup = email_server_cache.get_servers_for_email
    return lookup(email)


async def build_email_cache() -> None:
    """Build (or rebuild) the shared email-server cache."""
    rebuild = email_server_cache.build_cache
    await rebuild()


async def start_email_cache_refresh() -> None:
    """Start the shared cache's background auto-refresh task."""
    start = email_server_cache.start_auto_refresh
    await start()


async def stop_email_cache_refresh() -> None:
    """Stop the shared cache's background auto-refresh task."""
    stop = email_server_cache.stop_auto_refresh
    await stop()


async def get_servers_for_username(username: str) -> List[str]:
    """
    Look up the server IDs on which a username exists (US-013).

    Module-level shortcut that delegates to the shared ``email_server_cache`` singleton.

    Args:
        username: Username to look up (case-insensitive)

    Returns:
        List of server_ids. Empty list if username not found (NOT an error).
    """
    lookup = email_server_cache.get_servers_for_username
    found = await lookup(username)
    return found