""" Search service for INDEX-SISTEM-JOCURI v2.0 Enhanced search with FTS5 and intelligent filtering """ from typing import List, Dict, Any, Optional from app.models.database import DatabaseManager import re class SearchService: """Enhanced search service with intelligent query processing""" def __init__(self, db_manager: DatabaseManager): """Initialize search service with database manager""" self.db = db_manager def search_activities(self, search_text: Optional[str] = None, filters: Optional[Dict[str, str]] = None, limit: int = 100) -> List[Dict[str, Any]]: """ Enhanced search with intelligent filter mapping and query processing """ if filters is None: filters = {} # Process and normalize search text processed_search = self._process_search_text(search_text) # Map web filters to database fields db_filters = self._map_filters_to_db_fields(filters) # Perform database search results = self.db.search_activities( search_text=processed_search, **db_filters, limit=limit ) # Post-process results for relevance and ranking return self._post_process_results(results, processed_search, filters) def _process_search_text(self, search_text: Optional[str]) -> Optional[str]: """Process and enhance search text for better FTS5 results""" if not search_text or not search_text.strip(): return None # Clean the search text cleaned = search_text.strip() # Handle Romanian diacritics and common variations replacements = { 'ă': 'a', 'â': 'a', 'î': 'i', 'ș': 's', 'ț': 't', 'Ă': 'A', 'Â': 'A', 'Î': 'I', 'Ș': 'S', 'Ț': 'T' } # Create both original and normalized versions for search normalized = cleaned for old, new in replacements.items(): normalized = normalized.replace(old, new) # If different, search for both versions if normalized != cleaned and len(cleaned.split()) == 1: return f'"{cleaned}" OR "{normalized}"' # For multi-word queries, use phrase search with fallback if len(cleaned.split()) > 1: # Try exact phrase first, then individual words words = cleaned.split() individual_terms = ' OR '.join(f'"{word}"' for word in words) return f'"{cleaned}" OR ({individual_terms})' return f'"{cleaned}"' def _map_filters_to_db_fields(self, filters: Dict[str, str]) -> Dict[str, Any]: """Map web interface filters to database query parameters""" db_filters = {} for filter_key, filter_value in filters.items(): if not filter_value or not filter_value.strip(): continue # Map filter types to database fields if filter_key == 'category': db_filters['category'] = filter_value elif filter_key == 'age_group': # Parse age range (e.g., "5-8 ani", "12+ ani") age_match = re.search(r'(\d+)(?:-(\d+))?\s*ani?', filter_value) if age_match: min_age = int(age_match.group(1)) max_age = int(age_match.group(2)) if age_match.group(2) else None if max_age: # Range like "5-8 ani" db_filters['age_group_min'] = min_age db_filters['age_group_max'] = max_age else: # Open range like "12+ ani" db_filters['age_group_min'] = min_age elif filter_key == 'participants': # Parse participant range (e.g., "5-10 persoane", "30+ persoane") part_match = re.search(r'(\d+)(?:-(\d+))?\s*persoan[eă]?', filter_value) if part_match: min_part = int(part_match.group(1)) max_part = int(part_match.group(2)) if part_match.group(2) else None if max_part: db_filters['participants_min'] = min_part db_filters['participants_max'] = max_part else: db_filters['participants_min'] = min_part elif filter_key == 'duration': # Parse duration (e.g., "15-30 minute", "60+ minute") dur_match = re.search(r'(\d+)(?:-(\d+))?\s*minut[eă]?', filter_value) if dur_match: min_dur = int(dur_match.group(1)) max_dur = int(dur_match.group(2)) if dur_match.group(2) else None if max_dur: db_filters['duration_min'] = min_dur db_filters['duration_max'] = max_dur else: db_filters['duration_min'] = min_dur elif filter_key == 'materials': db_filters['materials_category'] = filter_value elif filter_key == 'difficulty': db_filters['difficulty_level'] = filter_value # Handle any other custom filters else: # Generic filter handling - try to match against keywords or tags if 'keywords' not in db_filters: db_filters['keywords'] = [] db_filters['keywords'].append(filter_value) return db_filters def _post_process_results(self, results: List[Dict[str, Any]], search_text: Optional[str], filters: Dict[str, str]) -> List[Dict[str, Any]]: """Post-process results for better ranking and relevance""" if not results: return results # If we have search text, boost results based on relevance if search_text: results = self._boost_search_relevance(results, search_text) # Apply secondary ranking based on filters if filters: results = self._apply_filter_boost(results, filters) # Ensure variety in categories if no specific category filter if 'category' not in filters: results = self._ensure_category_variety(results) return results def _boost_search_relevance(self, results: List[Dict[str, Any]], search_text: str) -> List[Dict[str, Any]]: """Boost results based on search text relevance""" search_terms = search_text.lower().replace('"', '').split() for result in results: boost_score = 0 # Check name matches (highest priority) name_lower = result.get('name', '').lower() for term in search_terms: if term in name_lower: boost_score += 10 if name_lower.startswith(term): boost_score += 5 # Extra boost for name starts with term # Check description matches desc_lower = result.get('description', '').lower() for term in search_terms: if term in desc_lower: boost_score += 3 # Check keywords matches keywords_lower = result.get('keywords', '').lower() for term in search_terms: if term in keywords_lower: boost_score += 5 # Store boost score for sorting result['_boost_score'] = boost_score # Sort by boost score, then by existing search rank results.sort(key=lambda x: ( x.get('_boost_score', 0), x.get('search_rank', 0), x.get('popularity_score', 0) ), reverse=True) # Remove boost score from final results for result in results: result.pop('_boost_score', None) return results def _apply_filter_boost(self, results: List[Dict[str, Any]], filters: Dict[str, str]) -> List[Dict[str, Any]]: """Apply additional ranking based on filter preferences""" # If user filtered by materials, boost activities with detailed material lists if 'materials' in filters: for result in results: if result.get('materials_list') and len(result['materials_list']) > 50: result['popularity_score'] = result.get('popularity_score', 0) + 1 # If user filtered by age, boost activities with specific age ranges if 'age_group' in filters: for result in results: if result.get('age_group_min') and result.get('age_group_max'): result['popularity_score'] = result.get('popularity_score', 0) + 1 return results def _ensure_category_variety(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Ensure variety in categories when no specific category is filtered""" if len(results) <= 10: return results # Group results by category category_groups = {} for result in results: category = result.get('category', 'Unknown') if category not in category_groups: category_groups[category] = [] category_groups[category].append(result) # If we have multiple categories, ensure balanced representation if len(category_groups) > 1: balanced_results = [] max_per_category = max(3, len(results) // len(category_groups)) # Take up to max_per_category from each category for category, category_results in category_groups.items(): balanced_results.extend(category_results[:max_per_category]) # Add remaining results to reach original count remaining_slots = len(results) - len(balanced_results) if remaining_slots > 0: remaining_results = [] for category_results in category_groups.values(): remaining_results.extend(category_results[max_per_category:]) # Sort remaining by relevance and add top ones remaining_results.sort(key=lambda x: ( x.get('search_rank', 0), x.get('popularity_score', 0) ), reverse=True) balanced_results.extend(remaining_results[:remaining_slots]) return balanced_results return results def get_search_suggestions(self, partial_query: str, limit: int = 5) -> List[str]: """Get search suggestions based on partial query""" if not partial_query or len(partial_query) < 2: return [] try: # Search for activities that match the partial query results = self.db.search_activities( search_text=f'"{partial_query}"', limit=limit * 2 ) suggestions = [] seen = set() for result in results: # Extract potential suggestions from name and keywords name = result.get('name', '') keywords = result.get('keywords', '') # Add name if it contains the partial query if partial_query.lower() in name.lower() and name not in seen: suggestions.append(name) seen.add(name) # Add individual keywords that start with partial query if keywords: for keyword in keywords.split(','): keyword = keyword.strip() if (keyword.lower().startswith(partial_query.lower()) and len(keyword) > len(partial_query) and keyword not in seen): suggestions.append(keyword) seen.add(keyword) if len(suggestions) >= limit: break return suggestions[:limit] except Exception as e: print(f"Error getting search suggestions: {e}") return []