"""
OCR Job Worker - Background Task for Queue Processing

Runs as an asyncio background task in FastAPI.
Continuously polls the job queue and processes OCR requests.

Architecture:
    FastAPI startup
        ↓
    start_job_worker()
        ↓
    asyncio.create_task(_job_worker_loop())
        ↓
    while True:
        job = job_queue.get_next_pending()
        if job:
            result = ocr_worker_pool.submit_task(...)
            job_queue.update_status(...)
        await asyncio.sleep(0.5)
"""

import asyncio
import logging
import time
from pathlib import Path
from typing import Optional

from .job_queue import job_queue, OCRJobStatus, OCRJob
from .ocr_worker_pool import ocr_worker_pool

logger = logging.getLogger(__name__)

# Global task references. asyncio only keeps weak references to tasks, so
# every background task started here must be held by one of these names.
_job_worker_task: Optional[asyncio.Task] = None
_cleanup_task: Optional[asyncio.Task] = None
_prewarm_task: Optional[asyncio.Task] = None
_shutdown_event: Optional[asyncio.Event] = None

# Configuration
POLL_INTERVAL_SECONDS = 0.5  # How often to check for new jobs
CLEANUP_INTERVAL_SECONDS = 3600  # Clean expired jobs every hour
CLEANUP_RETRY_SECONDS = 60  # Delay before retrying a failed cleanup
OCR_TIMEOUT_SECONDS = 120  # Max time for OCR processing
PREWARM_TIMEOUT_SECONDS = 90.0  # Max time for the background pool pre-warm
DEFAULT_JOB_SECONDS = 7.0  # Per-job estimate used when no statistics exist


def _get_shutdown_event() -> asyncio.Event:
    """Return the shared shutdown event, creating it only if none exists.

    Both loops and ``stop_job_worker`` must observe the *same* event object;
    creating a fresh one inside a loop would detach it from the others.
    """
    global _shutdown_event

    if _shutdown_event is None:
        _shutdown_event = asyncio.Event()
    return _shutdown_event


async def _wait_for_shutdown(shutdown_event: asyncio.Event, timeout: float) -> bool:
    """Wait up to ``timeout`` seconds for shutdown to be signalled.

    Returns:
        True if shutdown was requested, False if the timeout elapsed first.
    """
    try:
        await asyncio.wait_for(shutdown_event.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        pass  # Normal timeout, caller continues its loop
    return shutdown_event.is_set()


async def _cancel_task(task: Optional[asyncio.Task]) -> None:
    """Cancel ``task`` (if it is still running) and wait for it to finish."""
    if task is None or task.done():
        return

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def _job_worker_loop() -> None:
    """
    Main worker loop - processes jobs from queue.

    Runs until the shared shutdown event is set, the task is cancelled, or
    too many consecutive errors occur. When the queue is empty it waits
    POLL_INTERVAL_SECONDS before polling again.
    """
    # Reuse the event created by start_job_worker() instead of replacing it.
    shutdown_event = _get_shutdown_event()

    logger.info("[JobWorker] Starting worker loop...")

    consecutive_errors = 0
    max_consecutive_errors = 5

    while not shutdown_event.is_set():
        try:
            # Get next pending job
            job = await job_queue.get_next_pending()

            # Any successful poll (with or without a job) ends an error streak.
            consecutive_errors = 0

            if job:
                await _process_job(job)
            elif await _wait_for_shutdown(shutdown_event, POLL_INTERVAL_SECONDS):
                # No jobs, and shutdown was requested while idle
                break

        except asyncio.CancelledError:
            logger.info("[JobWorker] Worker loop cancelled")
            break
        except Exception as e:
            consecutive_errors += 1
            logger.error(f"[JobWorker] Error in worker loop ({consecutive_errors}/{max_consecutive_errors}): {e}")

            if consecutive_errors >= max_consecutive_errors:
                logger.error("[JobWorker] Too many consecutive errors, stopping worker")
                break

            # Backoff on errors, but wake up immediately on shutdown
            backoff_seconds = min(consecutive_errors * 2, 30)
            if await _wait_for_shutdown(shutdown_event, backoff_seconds):
                break

    logger.info("[JobWorker] Worker loop stopped")


async def _process_job(job: OCRJob) -> None:
    """
    Process a single OCR job.

    Marks the job as processing, reads its file, submits the bytes to the
    worker pool, and records the outcome (completed or failed) together
    with the elapsed time. The job file is cleaned up afterwards in all
    cases.

    Args:
        job: OCRJob to process
    """
    logger.info(f"[JobWorker] Processing job {job.id}: engine={job.engine}, file={Path(job.file_path).name}")

    # Monotonic clock: elapsed time must not jump with wall-clock adjustments.
    start_time = time.monotonic()

    try:
        # Mark as processing
        await job_queue.update_status(job.id, OCRJobStatus.processing)

        # Read file bytes
        file_path = Path(job.file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Disk I/O runs in the default executor so it does not stall the
        # event loop.
        loop = asyncio.get_running_loop()
        file_bytes = await loop.run_in_executor(None, file_path.read_bytes)

        # Submit to worker pool
        result = await ocr_worker_pool.submit_task(
            image_bytes=file_bytes,
            engine=job.engine,
            preprocessing="auto",
            timeout=OCR_TIMEOUT_SECONDS
        )

        elapsed_ms = int((time.monotonic() - start_time) * 1000)

        if result.get("success"):
            # Job completed successfully
            extraction = result.get("extraction", {})
            await job_queue.update_status(
                job_id=job.id,
                status=OCRJobStatus.completed,
                result=extraction,
                processing_time_ms=elapsed_ms
            )
            logger.info(f"[JobWorker] Job {job.id} completed in {elapsed_ms}ms")
        else:
            # Job failed
            error_msg = result.get("error", "Unknown error")
            await job_queue.update_status(
                job_id=job.id,
                status=OCRJobStatus.failed,
                error=error_msg,
                processing_time_ms=elapsed_ms
            )
            logger.warning(f"[JobWorker] Job {job.id} failed after {elapsed_ms}ms: {error_msg}")

    except Exception as e:
        # NOTE(review): task cancellation is not an Exception on Python 3.8+,
        # so a job interrupted by shutdown keeps its "processing" status.
        elapsed_ms = int((time.monotonic() - start_time) * 1000)
        logger.error(f"[JobWorker] Job {job.id} error after {elapsed_ms}ms: {e}")

        await job_queue.update_status(
            job_id=job.id,
            status=OCRJobStatus.failed,
            error=str(e),
            processing_time_ms=elapsed_ms
        )

    finally:
        # Cleanup file after processing (best effort)
        try:
            await job_queue.cleanup_job_file(job.id)
        except Exception as e:
            logger.warning(f"[JobWorker] Failed to cleanup file for job {job.id}: {e}")


async def _cleanup_loop() -> None:
    """
    Periodic cleanup of expired jobs.

    Calls job_queue.cleanup_expired() every CLEANUP_INTERVAL_SECONDS. After
    a failed cleanup the next attempt happens after CLEANUP_RETRY_SECONDS
    instead of a full interval. Stops when the shared shutdown event is set
    or the task is cancelled.
    """
    shutdown_event = _get_shutdown_event()

    logger.info("[JobWorker] Starting cleanup loop...")

    delay = CLEANUP_INTERVAL_SECONDS

    while not shutdown_event.is_set():
        try:
            # Wait for interval or shutdown
            if await _wait_for_shutdown(shutdown_event, delay):
                break

            # Run cleanup
            deleted = await job_queue.cleanup_expired()
            delay = CLEANUP_INTERVAL_SECONDS

            if deleted > 0:
                logger.info(f"[JobWorker] Cleanup: deleted {deleted} expired jobs")

        except asyncio.CancelledError:
            logger.info("[JobWorker] Cleanup loop cancelled")
            break
        except Exception as e:
            logger.error(f"[JobWorker] Cleanup error: {e}")
            delay = CLEANUP_RETRY_SECONDS  # Retry after 1 minute

    logger.info("[JobWorker] Cleanup loop stopped")


async def _prewarm_worker_pool() -> None:
    """Pre-warm the OCR worker pool and log the outcome; never raises errors."""
    logger.info("[JobWorker] Pre-warming OCR worker pool (background)...")

    try:
        warmup_success = await ocr_worker_pool.prewarm(timeout=PREWARM_TIMEOUT_SECONDS)
    except asyncio.CancelledError:
        raise
    except Exception as e:
        # Without this the failure would only surface as an unretrieved
        # task exception.
        logger.error(f"[JobWorker] Worker pool pre-warm raised an error: {e}")
        warmup_success = False

    if warmup_success:
        logger.info("[JobWorker] ✅ OCR worker pool pre-warmed successfully")
    else:
        logger.warning("[JobWorker] ⚠️ Worker pool pre-warm failed, first request will be slower")


async def start_job_worker() -> bool:
    """
    Start the job worker background task.

    Called at FastAPI startup to begin processing queue. Initializes the
    job queue and worker pool, kicks off a background pre-warm, then starts
    the worker and cleanup loops. Calling it while the worker is already
    running is a no-op.

    Returns:
        True if started successfully (or already running), False otherwise
    """
    global _job_worker_task, _cleanup_task, _prewarm_task, _shutdown_event

    if _job_worker_task is not None and not _job_worker_task.done():
        logger.warning("[JobWorker] Already running")
        return True

    try:
        # Initialize job queue
        await job_queue.initialize()

        # Initialize worker pool
        if not ocr_worker_pool.initialize():
            logger.error("[JobWorker] Failed to initialize worker pool")
            return False

        # Pre-warm worker pool in BACKGROUND (don't block startup).
        # First OCR request may be slower if prewarm isn't done yet.
        if _prewarm_task is None or _prewarm_task.done():
            _prewarm_task = asyncio.create_task(_prewarm_worker_pool())

        # A fresh event is only needed when none exists or the previous one
        # was already triggered; otherwise a still-running cleanup loop
        # keeps sharing the existing one.
        if _shutdown_event is None or _shutdown_event.is_set():
            _shutdown_event = asyncio.Event()

        # Start worker loop
        _job_worker_task = asyncio.create_task(_job_worker_loop())

        # Start cleanup loop (skip if one survived an earlier worker crash)
        if _cleanup_task is None or _cleanup_task.done():
            _cleanup_task = asyncio.create_task(_cleanup_loop())

        logger.info("[JobWorker] Started successfully")
        return True

    except Exception as e:
        logger.error(f"[JobWorker] Failed to start: {e}")
        return False


async def stop_job_worker() -> None:
    """
    Stop the job worker background task.

    Called at FastAPI shutdown. Signals shutdown, cancels the worker,
    cleanup and pre-warm tasks, shuts down the worker pool, and resets the
    module state so the worker can be started again.
    """
    global _job_worker_task, _cleanup_task, _prewarm_task, _shutdown_event

    logger.info("[JobWorker] Stopping...")

    # Signal shutdown
    if _shutdown_event:
        _shutdown_event.set()

    try:
        # Cancel background tasks
        await _cancel_task(_job_worker_task)
        await _cancel_task(_cleanup_task)
        await _cancel_task(_prewarm_task)

        # Shutdown worker pool
        ocr_worker_pool.shutdown(wait=True)
    finally:
        # Reset state even if shutdown raised, so a later start is possible.
        _job_worker_task = None
        _cleanup_task = None
        _prewarm_task = None
        _shutdown_event = None

    logger.info("[JobWorker] Stopped")


def is_running() -> bool:
    """Check if job worker is running."""
    return _job_worker_task is not None and not _job_worker_task.done()


def estimate_wait_time(queue_position: int) -> int:
    """
    Estimate wait time for a job in queue.

    Uses the queue's average processing time when it can be fetched
    synchronously (no event loop running in this thread); otherwise falls
    back to DEFAULT_JOB_SECONDS per job.

    Args:
        queue_position: Position in queue (1 = next)

    Returns:
        Estimated wait time in seconds (0 for non-positive positions)
    """
    if queue_position <= 0:
        return 0

    # Default ~7 seconds per job if no data
    avg_time = DEFAULT_JOB_SECONDS

    try:
        # NOTE: asyncio.get_event_loop() without a running loop is
        # deprecated on newer Python versions; failures land in the except
        # below and the default is used.
        loop = asyncio.get_event_loop()
        if not loop.is_running():
            fetched = loop.run_until_complete(job_queue.get_average_processing_time())
            # Only trust a real, positive number; None/0/NaN mean "no data".
            if isinstance(fetched, (int, float)) and fetched > 0:
                avg_time = float(fetched)
        # else: can't use a sync call inside a running loop, keep the default
    except Exception as e:
        logger.debug(f"[JobWorker] Could not fetch average processing time, using default: {e}")

    # Estimate: position * average_time
    return int(queue_position * avg_time)