Files
game-library/app/services/search.py
Claude Agent bcfb6841eb Faza 1 complete: bilingual+enrichment plumbing, UI/filters, frozen DB
Extraction finished (575/588 chunks; 6 content-filter-blocked, 7 await
re-extraction). DB rebuilt and frozen at 9418 activities — content_keys
are now stable for the enrichment overlay.

Part A (plumbing + UI):
- database.py: name_ro/description_ro/rules_ro/variations_ro, indoor_outdoor,
  space_needed, estimated_fields, source_id/source_ids/chunk_key columns;
  FTS5 indexes the 4 *_ro columns across CREATE + all 3 triggers; new equality
  filters + category counts for both axes.
- activity.py: new fields + bilingual display helpers (get_display_*,
  is_estimated, axis displays).
- config_taxonomy.py: INDOOR_OUTDOOR/SPACE_NEEDED enums + normalizers
  (None on unrecognised, no fabrication).
- search.py / routes.py / config.py / templates / css: new dropdowns,
  RO-primary rendering with "(estimat)" markers and collapsible original
  text, and a /source/<id> download route shipped DARK behind
  SOURCE_DOWNLOAD_ENABLED (copyright opt-in).
- build_database.py: source_id/chunk_key in dict_to_activity; merge_cluster
  unions source_ids without touching enrichment fields.

Part B (enrichment pipeline, built not yet run):
- build_database.py: load_enrichment + apply_enrichment (post-dedup, keyed on
  content_key) + --enrichment CLI + stated-vs-estimated QA.
- run_enrichment.py (resumable, --source/--limit pilot scoping, --collect),
  ENRICHMENT_PROMPT.md.

Repair: scripts/repair_extractions.py fixes the subagents' systematic
unescaped-ASCII-quote bug with a faithful char-scanner (escapes, never
truncates) + schema validation + a strictly-more-text guard. json_repair was
tried first, truncated silently, and is NOT used. build_database has no repair
dependency.

Tests: tests/test_enrichment.py added; 99 pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 18:10:13 +00:00

397 lines
17 KiB
Python

"""
Search service for INDEX-SISTEM-JOCURI v2.0
Enhanced search with FTS5 and intelligent filtering
"""
from typing import List, Dict, Any, Optional
from app.models.database import DatabaseManager
from app.config_taxonomy import NON_GAME_CONTENT_TYPES
import re
# Categories that hold non-game material. Filtering on one of these slugs
# switches off the default exclusion of non-game content types.
NON_GAME_CATEGORIES = {"cantece-ceremonii", "retete"}

# Post-filters run in Python *after* the database has applied its LIMIT, so
# the query over-fetches by this factor (bounded by the cap) to leave enough
# rows for the caller's `limit` once filtering is done.
_OVERSCAN_FACTOR = 5
_OVERSCAN_CAP = 2_000
class SearchService:
    """Search facade over ``DatabaseManager``.

    Responsibilities:
    - turn free text into an FTS5 query expression;
    - translate web-form filters into database keyword arguments;
    - apply the content_type / language filters in Python (the database
      layer does not take them as parameters);
    - re-rank the rows returned by the database.
    """

    # Romanian diacritics folded to their ASCII base letter. Both the
    # comma-below forms (ș/ț) and the legacy cedilla forms (U+015F/U+0163
    # and their capitals) are covered.
    _DIACRITICS = str.maketrans({
        'ă': 'a', 'â': 'a', 'î': 'i', 'ș': 's', 'ț': 't',
        'Ă': 'A', 'Â': 'A', 'Î': 'I', 'Ș': 'S', 'Ț': 'T',
        '\u015f': 's', '\u0163': 't', '\u015e': 'S', '\u0162': 'T',
    })

    # "<min>", "<min>-<max>" or "<min>+" followed by a unit word. The optional
    # "+" is what makes open-ended values such as "12+ ani" parse.
    _RANGE_HEAD = r'(\d+)(?:\s*[-\u2013]\s*(\d+))?\s*\+?\s*'
    _RANGE_PATTERNS = {
        'age_group': re.compile(_RANGE_HEAD + r'ani?'),
        'participants': re.compile(_RANGE_HEAD + r'persoan[eă]?'),
        'duration': re.compile(_RANGE_HEAD + r'minut[eă]?'),
    }

    # Web filter key -> database keyword argument, for plain equality filters.
    _EQUALITY_FILTERS = {
        'category': 'category',
        'materials': 'materials_category',
        'difficulty': 'difficulty_level',
        'indoor_outdoor': 'indoor_outdoor',
        'space_needed': 'space_needed',
    }

    # Filters applied in Python by search_activities(); they must never be
    # forwarded to DatabaseManager.search_activities().
    _POST_FILTER_KEYS = ('content_type', 'language')

    # A double-quoted FTS5 string; an embedded quote is written as "".
    _FTS_PHRASE_RE = re.compile(r'"((?:[^"]|"")*)"')

    def __init__(self, db_manager: "DatabaseManager"):
        """Store the database manager used for every query."""
        self.db = db_manager

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fts_quote(text: str) -> str:
        """Return *text* as an FTS5 string literal (embedded quotes doubled)."""
        return '"' + text.replace('"', '""') + '"'

    @staticmethod
    def _has_filter(filters: Dict[str, str], key: str) -> bool:
        """True when *key* is present in *filters* with a non-blank value."""
        value = filters.get(key)
        return bool(value and str(value).strip())

    @staticmethod
    def _parse_range(pattern, value: str):
        """Return ``(minimum, maximum_or_None)`` parsed from *value*, or None."""
        match = pattern.search(value)
        if not match:
            return None
        lower = int(match.group(1))
        upper = int(match.group(2)) if match.group(2) else None
        return lower, upper

    @classmethod
    def _extract_terms(cls, search_text: str) -> List[str]:
        """Return the distinct lower-cased words of a query, in order.

        Accepts either raw user text or the FTS5 expression produced by
        ``_process_search_text``. For the latter only the quoted phrases are
        read, so the ``OR`` operators and parentheses are not mistaken for
        search terms.
        """
        lowered = search_text.lower()
        phrases = [p.replace('""', '"')
                   for p in cls._FTS_PHRASE_RE.findall(lowered)]
        if not phrases:
            phrases = [lowered]
        return list(dict.fromkeys(
            word for phrase in phrases for word in phrase.split()))

    @staticmethod
    def _relevance_score(result: Dict[str, Any], terms: List[str]) -> int:
        """Score one row: name hits weigh most, then keywords, then description."""
        # Nullable columns come back as None, not '' — hence the `or ''`.
        name = (result.get('name') or '').lower()
        description = (result.get('description') or '').lower()
        keywords = (result.get('keywords') or '').lower()
        score = 0
        for term in terms:
            if term in name:
                score += 10
                if name.startswith(term):
                    score += 5  # extra weight when the name starts with the term
            if term in description:
                score += 3
            if term in keywords:
                score += 5
        return score

    # ------------------------------------------------------------------
    # Public search entry point
    # ------------------------------------------------------------------
    def search_activities(self,
                          search_text: Optional[str] = None,
                          filters: Optional[Dict[str, str]] = None,
                          limit: int = 100) -> List[Dict[str, Any]]:
        """Search activities by free text and web filters.

        Args:
            search_text: raw user query; blank or None means "no text search".
            filters: web-form filters keyed by filter name; blank values are
                ignored.
            limit: maximum number of rows returned.

        Returns:
            At most ``limit`` row dicts, re-ranked by ``_post_process_results``.
        """
        if filters is None:
            filters = {}

        processed_search = self._process_search_text(search_text)
        db_filters = self._map_filters_to_db_fields(filters)

        # content_type and language are filtered in Python: the DB layer does
        # not expose them as query parameters. The default search excludes
        # the non-game content types; they surface only when the user filters
        # on that content_type explicitly or picks a non-game category.
        content_type, exclude_non_game = self._resolve_content_type_filter(filters)
        language = (filters.get('language') or '').strip().lower() or None
        post_filtering = bool(content_type or exclude_non_game or language)

        # Over-fetch when post-filtering so the final list can still reach
        # `limit`. The cap bounds the over-fetch only: it must never push the
        # fetch size below the requested limit.
        if post_filtering:
            fetch_limit = max(limit, min(limit * _OVERSCAN_FACTOR, _OVERSCAN_CAP))
        else:
            fetch_limit = limit

        results = self.db.search_activities(
            search_text=processed_search,
            **db_filters,
            limit=fetch_limit
        )

        results = self._apply_content_type_filter(results, content_type, exclude_non_game)
        if language:
            results = [r for r in results
                       if (r.get('language') or '').strip().lower() == language]

        results = self._post_process_results(results, processed_search, filters)
        return results[:limit]

    # ------------------------------------------------------------------
    # content_type post-filter
    # ------------------------------------------------------------------
    def _resolve_content_type_filter(self, filters: Dict[str, str]):
        """Determine the content_type post-filter.

        Returns ``(explicit_content_type | None, exclude_non_game)``:
        - an explicit ``content_type`` filter -> that value, no exclusion;
        - a ``category`` filter on a non-game category -> no exclusion;
        - otherwise -> default search, exclude non-game content types.
        """
        content_type = (filters.get('content_type') or '').strip()
        if content_type:
            return content_type, False
        category = (filters.get('category') or '').strip()
        if category in NON_GAME_CATEGORIES:
            return None, False
        return None, True

    def _apply_content_type_filter(self,
                                   results: List[Dict[str, Any]],
                                   content_type: Optional[str],
                                   exclude_non_game: bool) -> List[Dict[str, Any]]:
        """Filter rows by content_type (explicit include vs default exclude)."""
        if content_type:
            return [r for r in results
                    if (r.get('content_type') or '') == content_type]
        if exclude_non_game:
            # Rows with a NULL/unknown content_type are kept — only the known
            # non-game types are dropped from the default search.
            return [r for r in results
                    if (r.get('content_type') or '') not in NON_GAME_CONTENT_TYPES]
        return results

    # ------------------------------------------------------------------
    # Query and filter translation
    # ------------------------------------------------------------------
    def _process_search_text(self, search_text: Optional[str]) -> Optional[str]:
        """Build an FTS5 query expression from raw user text.

        Returns None for blank input. Otherwise:
        - one word -> ``"word"``, plus ``OR "folded"`` when folding the
          diacritics changes it;
        - several words -> the exact phrase OR any of the individual words,
          each also in its diacritic-folded form when that differs.

        Every piece is emitted as a quoted FTS5 string with embedded double
        quotes doubled, so user-typed quotes cannot break the query syntax.
        """
        if not search_text or not search_text.strip():
            return None

        cleaned = search_text.strip()
        folded = cleaned.translate(self._DIACRITICS)
        phrases = [cleaned] if folded == cleaned else [cleaned, folded]
        phrase_expr = ' OR '.join(self._fts_quote(p) for p in phrases)

        words = cleaned.split()
        if len(words) <= 1:
            return phrase_expr

        # Exact phrase first, then fall back to any individual word.
        distinct_words = dict.fromkeys(words + folded.split())
        word_expr = ' OR '.join(self._fts_quote(w) for w in distinct_words)
        return f'{phrase_expr} OR ({word_expr})'

    def _map_filters_to_db_fields(self, filters: Dict[str, str]) -> Dict[str, Any]:
        """Map web interface filters to database query parameters.

        - blank values are skipped;
        - ``content_type`` / ``language`` are skipped (Python post-filters);
        - equality filters are renamed to their database argument;
        - ``age_group`` / ``participants`` / ``duration`` are parsed into
          ``<key>_min`` and, for closed ranges, ``<key>_max``; a value that
          does not parse contributes nothing;
        - any other key has its value appended to the ``keywords`` list.
        """
        db_filters: Dict[str, Any] = {}
        for filter_key, filter_value in filters.items():
            if not filter_value or not filter_value.strip():
                continue
            if filter_key in self._POST_FILTER_KEYS:
                continue

            if filter_key in self._EQUALITY_FILTERS:
                db_filters[self._EQUALITY_FILTERS[filter_key]] = filter_value
            elif filter_key in self._RANGE_PATTERNS:
                bounds = self._parse_range(self._RANGE_PATTERNS[filter_key], filter_value)
                if bounds is None:
                    continue
                lower, upper = bounds
                db_filters[f'{filter_key}_min'] = lower
                if upper:
                    # Closed range such as "5-8 ani"; "12+ ani" sets only the min.
                    db_filters[f'{filter_key}_max'] = upper
            else:
                # Generic filter handling — matched against keywords or tags.
                db_filters.setdefault('keywords', []).append(filter_value)
        return db_filters

    # ------------------------------------------------------------------
    # Ranking
    # ------------------------------------------------------------------
    def _post_process_results(self,
                              results: List[Dict[str, Any]],
                              search_text: Optional[str],
                              filters: Dict[str, str]) -> List[Dict[str, Any]]:
        """Re-rank rows: text relevance, filter boosts, then category variety."""
        if not results:
            return results
        if search_text:
            results = self._boost_search_relevance(results, search_text)
        if filters:
            results = self._apply_filter_boost(results, filters)
        # A blank category value counts as "no category filter", matching how
        # _map_filters_to_db_fields treats blank values.
        if not self._has_filter(filters, 'category'):
            results = self._ensure_category_variety(results)
        return results

    def _boost_search_relevance(self,
                                results: List[Dict[str, Any]],
                                search_text: str) -> List[Dict[str, Any]]:
        """Sort *results* in place by text relevance and return the list.

        Ordering is descending on (relevance score, ``search_rank``,
        ``popularity_score``); rows with equal keys keep their incoming order.
        Missing or NULL rank/popularity values count as 0.
        """
        terms = self._extract_terms(search_text)
        scored = [(self._relevance_score(row, terms), row) for row in results]
        # NOTE(review): descending order assumes a larger search_rank is
        # better. SQLite FTS5's built-in rank is lower-is-better — confirm
        # what the database layer puts in this column.
        scored.sort(key=lambda pair: (
            pair[0],
            pair[1].get('search_rank') or 0,
            pair[1].get('popularity_score') or 0
        ), reverse=True)
        results[:] = [row for _, row in scored]
        return results

    def _apply_filter_boost(self,
                            results: List[Dict[str, Any]],
                            filters: Dict[str, str]) -> List[Dict[str, Any]]:
        """Bump ``popularity_score`` on rows that answer the active filters well.

        The rows are mutated in place. The order is not changed here; the
        bumped score only matters to later sorts that read it.
        """
        # Materials filter: favour rows with a detailed materials list.
        if self._has_filter(filters, 'materials'):
            for result in results:
                materials = result.get('materials_list')
                if materials and len(materials) > 50:
                    result['popularity_score'] = (result.get('popularity_score') or 0) + 1
        # Age filter: favour rows that state both ends of their age range.
        if self._has_filter(filters, 'age_group'):
            for result in results:
                if (result.get('age_group_min') is not None
                        and result.get('age_group_max') is not None):
                    result['popularity_score'] = (result.get('popularity_score') or 0) + 1
        return results

    def _ensure_category_variety(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Regroup rows so every category gets a share near the top.

        Lists of 10 rows or fewer, or with a single category, are returned
        unchanged. Otherwise each category (in first-appearance order)
        contributes up to ``max(3, total // categories)`` rows, followed by
        the left-over rows sorted by rank and popularity.

        NOTE(review): no row is ever dropped, so this only reorders the list
        into per-category runs; it does not interleave categories.
        """
        if len(results) <= 10:
            return results

        groups: Dict[Any, List[Dict[str, Any]]] = {}
        for result in results:
            groups.setdefault(result.get('category', 'Unknown'), []).append(result)
        if len(groups) <= 1:
            return results

        quota = max(3, len(results) // len(groups))
        balanced: List[Dict[str, Any]] = []
        leftovers: List[Dict[str, Any]] = []
        for members in groups.values():
            balanced.extend(members[:quota])
            leftovers.extend(members[quota:])

        remaining_slots = len(results) - len(balanced)
        if remaining_slots > 0:
            # NULL rank/popularity count as 0 so the sort cannot raise.
            leftovers.sort(key=lambda row: (
                row.get('search_rank') or 0,
                row.get('popularity_score') or 0
            ), reverse=True)
            balanced.extend(leftovers[:remaining_slots])
        return balanced

    # ------------------------------------------------------------------
    # Autocomplete
    # ------------------------------------------------------------------
    def get_search_suggestions(self, partial_query: str, limit: int = 5) -> List[str]:
        """Return up to *limit* autocomplete suggestions for *partial_query*.

        Suggestions are activity names containing the partial text and
        individual comma-separated keywords that start with it. Queries
        shorter than two characters (after trimming) return ``[]``. This is
        best-effort: any error is reported on stdout and yields ``[]``.
        """
        partial = (partial_query or '').strip()
        if len(partial) < 2:
            return []
        needle = partial.lower()

        try:
            # Over-fetch, then drop non-game content types so autocomplete
            # mirrors the default search.
            results = self.db.search_activities(
                search_text=self._fts_quote(partial),
                limit=limit * 6
            )
            results = self._apply_content_type_filter(results, None, True)

            suggestions: List[str] = []
            seen = set()
            for result in results:
                # Nullable columns come back as None, not ''.
                name = result.get('name') or ''
                keywords = result.get('keywords') or ''

                if needle in name.lower() and name not in seen:
                    suggestions.append(name)
                    seen.add(name)

                for keyword in keywords.split(','):
                    keyword = keyword.strip()
                    if (keyword.lower().startswith(needle)
                            and len(keyword) > len(partial)
                            and keyword not in seen):
                        suggestions.append(keyword)
                        seen.add(keyword)

                if len(suggestions) >= limit:
                    break
            return suggestions[:limit]
        except Exception as e:
            print(f"Error getting search suggestions: {e}")
            return []