"""CRUD operations for OCR settings and metrics."""

from datetime import datetime, timedelta, timezone
from typing import List, Optional

import sqlalchemy as sa
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from backend.modules.data_entry.db.models.ocr_settings import (
    OCREngine,
    OCRJobMetrics,
    OCRMetricsSummary,
    UserOCRPreference,
)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Yields the same value as ``datetime.utcnow()`` (naive, UTC wall clock)
    without calling that deprecated function.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _recent_job_filters(days: int, username: Optional[str]) -> list:
    """Build the WHERE conditions shared by the aggregate metric queries.

    Args:
        days: Size of the look-back window, counted back from now (UTC).
        username: When truthy, restrict the query to this user's jobs.

    Returns:
        A list of SQLAlchemy boolean expressions to be combined with ``and_``.
    """
    cutoff = _utcnow() - timedelta(days=days)
    filters = [OCRJobMetrics.created_at >= cutoff]
    # A falsy username (None or "") means "all users".
    if username:
        filters.append(OCRJobMetrics.username == username)
    return filters


class OCRPreferenceCRUD:
    """CRUD operations for user OCR preferences."""

    @staticmethod
    async def get_by_username(
        session: AsyncSession, username: str
    ) -> Optional[UserOCRPreference]:
        """Get a user's OCR preference by username.

        Returns:
            The matching ``UserOCRPreference``, or ``None`` if there is none.

        Raises:
            sqlalchemy.exc.MultipleResultsFound: If more than one row matches
                (raised by ``scalar_one_or_none``).
        """
        result = await session.execute(
            select(UserOCRPreference).where(UserOCRPreference.username == username)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def create_or_update(
        session: AsyncSession,
        username: str,
        preferred_engine: OCREngine,
    ) -> UserOCRPreference:
        """Create or update a user's OCR preference.

        Looks the preference up by username; updates it in place when found,
        otherwise inserts a new row. The session is committed and the returned
        object refreshed in both cases.

        Returns:
            The persisted ``UserOCRPreference``.
        """
        preference = await OCRPreferenceCRUD.get_by_username(session, username)
        if preference is None:
            preference = UserOCRPreference(
                username=username,
                preferred_engine=preferred_engine,
            )
            session.add(preference)
        else:
            preference.preferred_engine = preferred_engine
            preference.updated_at = _utcnow()

        await session.commit()
        await session.refresh(preference)
        return preference

    @staticmethod
    async def delete_by_username(session: AsyncSession, username: str) -> bool:
        """Delete a user's OCR preference.

        Returns:
            ``True`` if a preference existed and was deleted (and committed),
            ``False`` if there was nothing to delete.
        """
        preference = await OCRPreferenceCRUD.get_by_username(session, username)
        if preference is None:
            return False

        await session.delete(preference)
        await session.commit()
        return True


class OCRMetricsCRUD:
    """CRUD operations for OCR job metrics."""

    @staticmethod
    async def create(
        session: AsyncSession,
        job_id: str,
        username: str,
        engine_requested: str,
        engine_used: str,
        processing_time_ms: int = 0,
        file_size_bytes: int = 0,
        file_type: str = "image/jpeg",
        original_filename: Optional[str] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        overall_confidence: float = 0.0,
        fields_extracted: int = 0,
        needs_manual_review: Optional[bool] = None,
        validation_warnings_count: int = 0,
        validation_errors_count: int = 0,
        company_id: Optional[int] = None,
    ) -> OCRJobMetrics:
        """Create a new OCR job metrics record.

        Every argument after ``session`` is passed unchanged to the
        ``OCRJobMetrics`` constructor under the same name. The record is
        added, committed and refreshed before being returned.

        Returns:
            The persisted ``OCRJobMetrics`` row.
        """
        metrics = OCRJobMetrics(
            job_id=job_id,
            username=username,
            company_id=company_id,
            engine_requested=engine_requested,
            engine_used=engine_used,
            processing_time_ms=processing_time_ms,
            file_size_bytes=file_size_bytes,
            file_type=file_type,
            original_filename=original_filename,
            success=success,
            error_message=error_message,
            overall_confidence=overall_confidence,
            fields_extracted=fields_extracted,
            needs_manual_review=needs_manual_review,
            validation_warnings_count=validation_warnings_count,
            validation_errors_count=validation_errors_count,
        )
        session.add(metrics)
        await session.commit()
        await session.refresh(metrics)
        return metrics

    @staticmethod
    async def get_by_job_id(
        session: AsyncSession, job_id: str
    ) -> Optional[OCRJobMetrics]:
        """Get metrics by job ID.

        Returns:
            The matching ``OCRJobMetrics``, or ``None`` if there is none.

        Raises:
            sqlalchemy.exc.MultipleResultsFound: If more than one row matches
                (raised by ``scalar_one_or_none``).
        """
        result = await session.execute(
            select(OCRJobMetrics).where(OCRJobMetrics.job_id == job_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_user_history(
        session: AsyncSession,
        username: str,
        limit: int = 50,
        offset: int = 0,
    ) -> List[OCRJobMetrics]:
        """Get a user's OCR job history, newest first.

        Args:
            session: Active async database session.
            username: User whose jobs are listed.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip (for pagination).

        Returns:
            The matching rows ordered by ``created_at`` descending.
        """
        stmt = (
            select(OCRJobMetrics)
            .where(OCRJobMetrics.username == username)
            .order_by(OCRJobMetrics.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        result = await session.execute(stmt)
        return list(result.scalars().all())

    @staticmethod
    async def get_summary_by_engine(
        session: AsyncSession,
        days: int = 30,
        username: Optional[str] = None,
    ) -> List[OCRMetricsSummary]:
        """Get summary metrics grouped by engine.

        Args:
            session: Active async database session.
            days: Look-back window in days, counted back from now (UTC).
            username: When truthy, only this user's jobs are aggregated.

        Returns:
            One ``OCRMetricsSummary`` per ``engine_used`` value, ordered by
            job count descending. ``success_rate`` here is a fraction in the
            range 0.0-1.0.

        NOTE(review): ``get_overall_stats`` reports ``success_rate`` as a
        percentage (0-100) while this method reports a fraction; confirm which
        one consumers expect before unifying.
        """
        stmt = (
            select(
                OCRJobMetrics.engine_used,
                func.count(OCRJobMetrics.id).label("total_jobs"),
                # Cast the boolean flag to an integer so SUM counts successes.
                func.sum(sa.cast(OCRJobMetrics.success, sa.Integer)).label(
                    "successful_jobs"
                ),
                func.avg(OCRJobMetrics.processing_time_ms).label(
                    "avg_processing_time_ms"
                ),
                func.avg(OCRJobMetrics.overall_confidence).label("avg_confidence"),
                func.avg(OCRJobMetrics.fields_extracted).label(
                    "avg_fields_extracted"
                ),
            )
            .where(and_(*_recent_job_filters(days, username)))
            .group_by(OCRJobMetrics.engine_used)
            .order_by(func.count(OCRJobMetrics.id).desc())
        )
        result = await session.execute(stmt)

        summaries = []
        for row in result.all():
            total = row.total_jobs or 0
            # SUM can be NULL, and some drivers return Decimal; normalise to int.
            successful = int(row.successful_jobs or 0)
            summaries.append(
                OCRMetricsSummary(
                    engine=row.engine_used,
                    total_jobs=total,
                    successful_jobs=successful,
                    failed_jobs=total - successful,
                    success_rate=successful / total if total > 0 else 0.0,
                    avg_processing_time_ms=float(row.avg_processing_time_ms or 0),
                    avg_confidence=float(row.avg_confidence or 0),
                    avg_fields_extracted=float(row.avg_fields_extracted or 0),
                )
            )
        return summaries

    @staticmethod
    async def get_overall_stats(
        session: AsyncSession,
        days: int = 30,
        username: Optional[str] = None,
    ) -> dict:
        """Get overall OCR statistics.

        Args:
            session: Active async database session.
            days: Look-back window in days, counted back from now (UTC).
            username: When truthy, only this user's jobs are aggregated.

        Returns:
            A dict with the keys ``total_jobs``, ``successful_jobs``,
            ``failed_jobs``, ``success_rate``, ``avg_processing_time_ms``,
            ``avg_confidence`` and ``period_days``. ``success_rate`` here is a
            percentage in the range 0.0-100.0.

        NOTE(review): ``get_summary_by_engine`` reports ``success_rate`` as a
        fraction (0-1) while this method reports a percentage; confirm which
        one consumers expect before unifying.
        """
        stmt = select(
            func.count(OCRJobMetrics.id).label("total_jobs"),
            # Cast the boolean flag to an integer so SUM counts successes.
            func.sum(sa.cast(OCRJobMetrics.success, sa.Integer)).label(
                "successful_jobs"
            ),
            func.avg(OCRJobMetrics.processing_time_ms).label("avg_processing_time_ms"),
            func.avg(OCRJobMetrics.overall_confidence).label("avg_confidence"),
        ).where(and_(*_recent_job_filters(days, username)))
        result = await session.execute(stmt)

        # An aggregate query without GROUP BY yields exactly one row.
        row = result.one()
        total = row.total_jobs or 0
        # SUM is NULL when no rows match, and some drivers return Decimal.
        successful = int(row.successful_jobs or 0)

        return {
            "total_jobs": total,
            "successful_jobs": successful,
            "failed_jobs": total - successful,
            "success_rate": (successful / total * 100) if total > 0 else 0.0,
            "avg_processing_time_ms": float(row.avg_processing_time_ms or 0),
            "avg_confidence": float(row.avg_confidence or 0),
            "period_days": days,
        }