Files
game-library/app/services/search.py
Claude Agent 66ae831c36 Rebuild extraction pipeline infrastructure (Faza 0 prep)
Implements the approved plan to replace the broken regex/index-master
extraction with an LLM-subagent pipeline. Four parallel lanes:

Lane A — scripts/extract_common.py (PDF/docx/doc/pptx/html/zip, no
  max_pages truncation), normalize_sources.py, chunk_sources.py
  (~20pg chunks + overlap, manifest registry), activity_schema.json.
Lane B — app/config_taxonomy.py (16 fixed category slugs), schema
  rebuilt from scratch in app/models/ with content_type, language,
  source_files, source_excerpt, normalized_name, extraction_confidence,
  needs_review; FTS5 + 3 triggers extended with materials_list and
  skills_developed.
Lane C — build_database.py (--rebuild, atomic swap, schema + fuzzy
  source_excerpt validation, dedup with needs_review band),
  validate_extractions.py, review_queue.py, new run_extraction.py
  orchestrator, SUBAGENT_PROMPT.md.
Lane D — search.py content_type/language filters (default search
  excludes non-game content), E7 schema-compat audit; fixed a NULL
  keywords AttributeError in _boost_search_relevance.

Removes 8 orphaned/dead scripts and app/services/parser.py +
indexer.py. Adds tests/ (70 passing, 1 skipped — libreoffice absent).

Note: Lane D made one additive edit to app/models/database.py
(_update_category_counts) to surface content_type/language in
get_filter_options, outside its nominal lane boundary but after
Lane B completed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 17:43:38 +00:00

390 lines
16 KiB
Python

"""
Search service for INDEX-SISTEM-JOCURI v2.0
Enhanced search with FTS5 and intelligent filtering
"""
from typing import List, Dict, Any, Optional
from app.models.database import DatabaseManager
from app.config_taxonomy import NON_GAME_CONTENT_TYPES
import re
# Category slugs whose content is itself "non-game". Picking one of them as
# the `category` filter lifts the default non-game content_type exclusion
# (see SearchService._resolve_content_type_filter).
NON_GAME_CATEGORIES = {"retete", "cantece-ceremonii"}
# content_type / language are filtered in Python, i.e. *after* the database
# has already applied its LIMIT. A post-filtered search therefore over-fetches
# by this factor so the final list can still reach the caller's `limit`.
_OVERSCAN_FACTOR = 5
# Upper bound applied to the over-fetched row count in search_activities().
_OVERSCAN_CAP = 2000
class SearchService:
"""Enhanced search service with intelligent query processing"""
def __init__(self, db_manager: DatabaseManager):
    """Bind the service to the database manager that executes its queries.

    Args:
        db_manager: Object stored as ``self.db``; the other methods call
            its ``search_activities`` method.
    """
    self.db = db_manager
def search_activities(self,
                      search_text: Optional[str] = None,
                      filters: Optional[Dict[str, str]] = None,
                      limit: int = 100) -> List[Dict[str, Any]]:
    """Search activities, applying web filters and relevance post-processing.

    The database layer receives the processed search text and the mapped
    filters. ``content_type`` and ``language`` are not database query
    parameters, so they are applied afterwards in Python. By default the
    non-game content types are excluded; they surface only when the caller
    filters on an explicit ``content_type`` or picks a non-game category.

    Args:
        search_text: Raw user query, or ``None`` for a filter-only search.
        filters: Web-interface filters keyed by filter name.
        limit: Maximum number of rows to return.

    Returns:
        At most ``limit`` result rows, post-processed for ranking.
    """
    if filters is None:
        filters = {}

    processed_search = self._process_search_text(search_text)
    db_filters = self._map_filters_to_db_fields(filters)

    content_type, exclude_non_game = self._resolve_content_type_filter(filters)
    language = (filters.get('language') or '').strip().lower() or None
    post_filtering = bool(content_type or exclude_non_game or language)

    # The DB LIMIT is applied before the Python post-filters, so over-fetch
    # when any of them is active. The cap bounds the over-fetch only: it
    # must never push the fetch size below what the caller asked for.
    if post_filtering:
        fetch_limit = max(limit, min(limit * _OVERSCAN_FACTOR, _OVERSCAN_CAP))
    else:
        fetch_limit = limit

    results = self.db.search_activities(
        search_text=processed_search,
        **db_filters,
        limit=fetch_limit
    )

    # Python-side content_type / language filtering.
    results = self._apply_content_type_filter(results, content_type, exclude_non_game)
    if language:
        results = [r for r in results
                   if (r.get('language') or '').strip().lower() == language]

    # Rank, then honour the caller's `limit`.
    results = self._post_process_results(results, processed_search, filters)
    return results[:limit]
def _resolve_content_type_filter(self, filters: Dict[str, str]):
"""Determine the content_type post-filter.
Returns (explicit_content_type | None, exclude_non_game: bool):
- an explicit `content_type` filter → that value, no exclusion;
- a `category` filter on a non-game category → no exclusion;
- otherwise → default search, exclude non-game content types.
"""
content_type = (filters.get('content_type') or '').strip()
if content_type:
return content_type, False
category = (filters.get('category') or '').strip()
if category in NON_GAME_CATEGORIES:
return None, False
return None, True
def _apply_content_type_filter(self,
results: List[Dict[str, Any]],
content_type: Optional[str],
exclude_non_game: bool) -> List[Dict[str, Any]]:
"""Filter results by content_type (explicit include vs default exclude)."""
if content_type:
return [r for r in results
if (r.get('content_type') or '') == content_type]
if exclude_non_game:
# Rows with NULL/unknown content_type are kept — only the known
# non-game types are dropped from the default search.
return [r for r in results
if (r.get('content_type') or '') not in NON_GAME_CONTENT_TYPES]
return results
def _process_search_text(self, search_text: Optional[str]) -> Optional[str]:
"""Process and enhance search text for better FTS5 results"""
if not search_text or not search_text.strip():
return None
# Clean the search text
cleaned = search_text.strip()
# Handle Romanian diacritics and common variations
replacements = {
'ă': 'a', 'â': 'a', 'î': 'i', 'ș': 's', 'ț': 't',
'Ă': 'A', 'Â': 'A', 'Î': 'I', 'Ș': 'S', 'Ț': 'T'
}
# Create both original and normalized versions for search
normalized = cleaned
for old, new in replacements.items():
normalized = normalized.replace(old, new)
# If different, search for both versions
if normalized != cleaned and len(cleaned.split()) == 1:
return f'"{cleaned}" OR "{normalized}"'
# For multi-word queries, use phrase search with fallback
if len(cleaned.split()) > 1:
# Try exact phrase first, then individual words
words = cleaned.split()
individual_terms = ' OR '.join(f'"{word}"' for word in words)
return f'"{cleaned}" OR ({individual_terms})'
return f'"{cleaned}"'
def _map_filters_to_db_fields(self, filters: Dict[str, str]) -> Dict[str, Any]:
"""Map web interface filters to database query parameters"""
db_filters = {}
for filter_key, filter_value in filters.items():
if not filter_value or not filter_value.strip():
continue
# content_type / language are NOT database query params — they are
# applied as Python post-filters in search_activities(). Skip them
# here so they never reach DatabaseManager.search_activities().
if filter_key in ('content_type', 'language'):
continue
# Map filter types to database fields
if filter_key == 'category':
db_filters['category'] = filter_value
elif filter_key == 'age_group':
# Parse age range (e.g., "5-8 ani", "12+ ani")
age_match = re.search(r'(\d+)(?:-(\d+))?\s*ani?', filter_value)
if age_match:
min_age = int(age_match.group(1))
max_age = int(age_match.group(2)) if age_match.group(2) else None
if max_age:
# Range like "5-8 ani"
db_filters['age_group_min'] = min_age
db_filters['age_group_max'] = max_age
else:
# Open range like "12+ ani"
db_filters['age_group_min'] = min_age
elif filter_key == 'participants':
# Parse participant range (e.g., "5-10 persoane", "30+ persoane")
part_match = re.search(r'(\d+)(?:-(\d+))?\s*persoan[eă]?', filter_value)
if part_match:
min_part = int(part_match.group(1))
max_part = int(part_match.group(2)) if part_match.group(2) else None
if max_part:
db_filters['participants_min'] = min_part
db_filters['participants_max'] = max_part
else:
db_filters['participants_min'] = min_part
elif filter_key == 'duration':
# Parse duration (e.g., "15-30 minute", "60+ minute")
dur_match = re.search(r'(\d+)(?:-(\d+))?\s*minut[eă]?', filter_value)
if dur_match:
min_dur = int(dur_match.group(1))
max_dur = int(dur_match.group(2)) if dur_match.group(2) else None
if max_dur:
db_filters['duration_min'] = min_dur
db_filters['duration_max'] = max_dur
else:
db_filters['duration_min'] = min_dur
elif filter_key == 'materials':
db_filters['materials_category'] = filter_value
elif filter_key == 'difficulty':
db_filters['difficulty_level'] = filter_value
# Handle any other custom filters
else:
# Generic filter handling - try to match against keywords or tags
if 'keywords' not in db_filters:
db_filters['keywords'] = []
db_filters['keywords'].append(filter_value)
return db_filters
def _post_process_results(self,
results: List[Dict[str, Any]],
search_text: Optional[str],
filters: Dict[str, str]) -> List[Dict[str, Any]]:
"""Post-process results for better ranking and relevance"""
if not results:
return results
# If we have search text, boost results based on relevance
if search_text:
results = self._boost_search_relevance(results, search_text)
# Apply secondary ranking based on filters
if filters:
results = self._apply_filter_boost(results, filters)
# Ensure variety in categories if no specific category filter
if 'category' not in filters:
results = self._ensure_category_variety(results)
return results
def _boost_search_relevance(self,
results: List[Dict[str, Any]],
search_text: str) -> List[Dict[str, Any]]:
"""Boost results based on search text relevance"""
search_terms = search_text.lower().replace('"', '').split()
for result in results:
boost_score = 0
# Check name matches (highest priority)
# NB: use `or ''` — nullable columns come back as None, not ''.
name_lower = (result.get('name') or '').lower()
for term in search_terms:
if term in name_lower:
boost_score += 10
if name_lower.startswith(term):
boost_score += 5 # Extra boost for name starts with term
# Check description matches
desc_lower = (result.get('description') or '').lower()
for term in search_terms:
if term in desc_lower:
boost_score += 3
# Check keywords matches
keywords_lower = (result.get('keywords') or '').lower()
for term in search_terms:
if term in keywords_lower:
boost_score += 5
# Store boost score for sorting
result['_boost_score'] = boost_score
# Sort by boost score, then by existing search rank
results.sort(key=lambda x: (
x.get('_boost_score', 0),
x.get('search_rank', 0),
x.get('popularity_score', 0)
), reverse=True)
# Remove boost score from final results
for result in results:
result.pop('_boost_score', None)
return results
def _apply_filter_boost(self,
results: List[Dict[str, Any]],
filters: Dict[str, str]) -> List[Dict[str, Any]]:
"""Apply additional ranking based on filter preferences"""
# If user filtered by materials, boost activities with detailed material lists
if 'materials' in filters:
for result in results:
if result.get('materials_list') and len(result['materials_list']) > 50:
result['popularity_score'] = result.get('popularity_score', 0) + 1
# If user filtered by age, boost activities with specific age ranges
if 'age_group' in filters:
for result in results:
if result.get('age_group_min') and result.get('age_group_max'):
result['popularity_score'] = result.get('popularity_score', 0) + 1
return results
def _ensure_category_variety(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Ensure variety in categories when no specific category is filtered"""
if len(results) <= 10:
return results
# Group results by category
category_groups = {}
for result in results:
category = result.get('category', 'Unknown')
if category not in category_groups:
category_groups[category] = []
category_groups[category].append(result)
# If we have multiple categories, ensure balanced representation
if len(category_groups) > 1:
balanced_results = []
max_per_category = max(3, len(results) // len(category_groups))
# Take up to max_per_category from each category
for category, category_results in category_groups.items():
balanced_results.extend(category_results[:max_per_category])
# Add remaining results to reach original count
remaining_slots = len(results) - len(balanced_results)
if remaining_slots > 0:
remaining_results = []
for category_results in category_groups.values():
remaining_results.extend(category_results[max_per_category:])
# Sort remaining by relevance and add top ones
remaining_results.sort(key=lambda x: (
x.get('search_rank', 0),
x.get('popularity_score', 0)
), reverse=True)
balanced_results.extend(remaining_results[:remaining_slots])
return balanced_results
return results
def get_search_suggestions(self, partial_query: str, limit: int = 5) -> List[str]:
    """Return up to ``limit`` autocomplete suggestions for a partial query.

    Suggestions are activity names that contain the query and individual
    (comma-separated) keywords that start with it and are longer than it.
    Queries shorter than two characters after trimming yield no
    suggestions. Non-game content types are dropped so autocomplete
    mirrors the default search.

    Any error raised while querying is reported with ``print`` and results
    in an empty list (best-effort behaviour).
    """
    partial_query = (partial_query or '').strip()
    if len(partial_query) < 2:
        return []

    needle = partial_query.lower()
    try:
        # Quote the query as an FTS5 string; an embedded double quote is
        # escaped by doubling so it cannot break the query syntax.
        fts_phrase = '"' + partial_query.replace('"', '""') + '"'
        # Over-fetch, since the non-game rows are removed afterwards.
        results = self.db.search_activities(
            search_text=fts_phrase,
            limit=limit * 6
        )
        results = self._apply_content_type_filter(results, None, True)

        suggestions = []
        seen = set()
        for result in results:
            # `or ''`: nullable columns come back as None, not ''.
            name = result.get('name') or ''
            keywords = result.get('keywords') or ''

            if needle in name.lower() and name not in seen:
                suggestions.append(name)
                seen.add(name)

            for keyword in keywords.split(','):
                keyword = keyword.strip()
                if (keyword.lower().startswith(needle) and
                        len(keyword) > len(partial_query) and
                        keyword not in seen):
                    suggestions.append(keyword)
                    seen.add(keyword)

            if len(suggestions) >= limit:
                break

        return suggestions[:limit]
    except Exception as e:
        print(f"Error getting search suggestions: {e}")
        return []