"""
ROA2WEB Unified Backend - Single FastAPI Application.

Consolidates the Reports, Data Entry, and Telegram modules into one process.
"""

# =============================================================================
# UTF-8 ENCODING FIX FOR WINDOWS CONSOLE
# Must be at the TOP, before any logging or print statements
# Fixes: 'charmap' codec can't encode character errors with Romanian diacritics
# =============================================================================
import sys

if sys.platform == 'win32':
    # Force UTF-8 encoding on the Windows console.
    # This prevents encoding errors when logging Romanian characters (ă, î, ș, ț).
    import io

    def _force_utf8(stream):
        """Return *stream* switched to UTF-8 with errors='replace'.

        A stream may be None or lack a ``.buffer`` attribute (for example
        when the process has no attached console), so every access is
        guarded instead of assuming a regular console stream.
        """
        if stream is None:
            return stream
        if hasattr(stream, "reconfigure"):
            # Re-encode in place: keeps the existing object (and its
            # buffering mode) instead of stacking a second wrapper on top
            # of the same underlying buffer.
            stream.reconfigure(encoding='utf-8', errors='replace')
            return stream
        if hasattr(stream, "buffer"):
            return io.TextIOWrapper(stream.buffer, encoding='utf-8', errors='replace')
        return stream

    sys.stdout = _force_utf8(sys.stdout)
    sys.stderr = _force_utf8(sys.stderr)

import asyncio
import logging
import os
from datetime import datetime
from pathlib import Path
from contextlib import asynccontextmanager

from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# Load environment variables
load_dotenv()

# Add project root and shared modules to path.
# This must run BEFORE the project imports below.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))  # Enable 'from backend.xxx import yyy'
sys.path.insert(0, str(project_root / "shared"))  # Enable 'from shared.xxx import yyy'

# Import configuration
from backend.config import settings

# Import shared infrastructure
from shared.database.oracle_pool import oracle_pool
from shared.auth.middleware import AuthenticationMiddleware
from shared.auth.routes import create_auth_router
from shared.routes.companies import create_companies_router
from shared.routes.calendar import create_calendar_router
from shared.routes.system import create_system_router

# Import module router factories
from backend.modules.reports.routers import create_reports_router
from backend.modules.data_entry.routers import create_data_entry_router
from backend.modules.telegram.routers import create_telegram_router

# Configure logging (level from env: DEBUG, INFO, WARNING, ERROR).
# An unrecognised LOG_LEVEL value falls back to INFO.
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=getattr(logging, log_level, logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

# Reduce noise from third-party libraries
logging.getLogger('httpcore').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('multipart').setLevel(logging.WARNING)
logging.getLogger('doctr').setLevel(logging.WARNING)
logging.getLogger('tensorflow').setLevel(logging.WARNING)
logging.getLogger('PIL').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

# Global variables for background tasks.
# Each flag records whether the matching service was started, so that
# shutdown only stops what is actually running.
telegram_bot_task = None
ocr_job_worker_running = False
cleanup_task_running = False
email_cache_running = False
ssh_tunnel_monitoring = False


# ============================================================================
# INITIALIZATION FUNCTIONS
# ============================================================================

async def init_oracle_pool() -> None:
    """Initialize the Oracle connection pool manager (shared by all modules).

    When ``settings.get_oracle_servers()`` returns servers, each one is
    registered with ``oracle_pool`` before the manager is initialized;
    otherwise the manager is initialized directly (legacy single-server
    configuration).
    """
    logger.info("[ORACLE] Initializing connection pool...")

    # Get configured servers
    servers = settings.get_oracle_servers()

    if servers:
        # Multi-server mode: register all servers for lazy pool creation
        logger.info(f"[ORACLE] Registering {len(servers)} servers for lazy pool creation:")
        for srv in servers:
            oracle_pool.register_server(
                server_id=srv.id,
                host=srv.host,
                port=srv.port,
                user=srv.user,
                password=srv.password,
                sid=srv.sid,
                service_name=srv.service_name,
            )
            logger.info(f"[ORACLE] - {srv.id}: {srv.name} @ {srv.host}:{srv.port}")
    else:
        # Legacy single-server mode: initialize with env vars
        logger.info("[ORACLE] Using legacy single-server configuration")

    # Mark as initialized (in multi-server mode pools are created lazily
    # on first connection).
    await oracle_pool.initialize()

    logger.info("[ORACLE] ✅ Pool manager initialized successfully")


async def init_reports_cache() -> None:
    """Initialize the Reports cache and its event monitor.

    Best-effort: any failure is logged and swallowed so the application
    keeps starting without a cache.
    """
    logger.info("[REPORTS] Initializing cache system...")

    try:
        from backend.modules.reports.cache import init_cache, init_event_monitor, get_cache
        from backend.modules.reports.cache.config import CacheConfig

        cache_config = CacheConfig.from_env()
        await init_cache(cache_config)
        logger.info(f"[REPORTS] ✅ Cache initialized: type={cache_config.cache_type}, enabled={cache_config.enabled}")

        # Initialize event monitor
        cache = get_cache()
        await init_event_monitor(cache, cache_config)

        if cache_config.auto_invalidate_enabled:
            logger.info("[REPORTS] Event-based auto-invalidation ENABLED")
        else:
            logger.info("[REPORTS] Event-based auto-invalidation DISABLED")

    except Exception as e:
        logger.error(f"[REPORTS] ⚠️ Cache initialization error: {e}", exc_info=True)
        logger.warning("[REPORTS] Continuing without cache")


async def init_data_entry_db() -> None:
    """Initialize the Data Entry SQLite database.

    Raises:
        Exception: any initialization error is logged and re-raised, which
            aborts application startup.
    """
    logger.info("[DATA-ENTRY] Initializing SQLite database...")

    try:
        from backend.modules.data_entry.db.database import init_db

        await init_db()
        logger.info(f"[DATA-ENTRY] ✅ Database initialized: {settings.data_entry_sqlite_database_path}")

        # Ensure upload directory exists.
        # NOTE(review): the bare attribute access is kept on purpose; it
        # presumably creates the directory as a side effect of resolving
        # the path - confirm in backend.config.
        settings.data_entry_upload_path_resolved
        logger.info(f"[DATA-ENTRY] Upload path: {settings.data_entry_upload_path_resolved}")

    except Exception as e:
        logger.error(f"[DATA-ENTRY] ❌ Database initialization error: {e}", exc_info=True)
        raise


async def init_telegram_db() -> None:
    """Initialize the Telegram SQLite database and purge expired records.

    Raises:
        Exception: any initialization error is logged and re-raised, which
            aborts application startup.
    """
    logger.info("[TELEGRAM] Initializing SQLite database...")

    try:
        from backend.modules.telegram.db import (
            init_database,
            cleanup_expired_codes,
            cleanup_expired_sessions,
            cleanup_expired_email_codes,
        )

        await init_database()
        logger.info(f"[TELEGRAM] ✅ Database initialized: {settings.telegram_sqlite_database_path}")

        # Cleanup expired data
        expired_codes = await cleanup_expired_codes()
        expired_sessions = await cleanup_expired_sessions()
        expired_email_codes = await cleanup_expired_email_codes()
        logger.info(f"[TELEGRAM] Cleanup: {expired_codes} codes, {expired_sessions} sessions, {expired_email_codes} email codes removed")

    except Exception as e:
        logger.error(f"[TELEGRAM] ❌ Database initialization error: {e}", exc_info=True)
        raise


async def init_ocr_job_worker() -> None:
    """Initialize OCR job worker with persistent PaddleOCR.

    This replaces the old background thread approach:
    - Starts ProcessPoolExecutor with persistent worker
    - Pre-warms PaddleOCR (loads once, reuses for all requests)
    - Starts job queue background task

    Best-effort: a failure only logs a warning and leaves
    ``ocr_job_worker_running`` False.
    """
    global ocr_job_worker_running

    logger.info("[OCR] Initializing OCR job worker...")

    try:
        from backend.modules.data_entry.services.ocr.job_worker import start_job_worker, is_running

        success = await start_job_worker()
        # The flag mirrors the worker's own view of its state, not the
        # return value of start_job_worker().
        ocr_job_worker_running = is_running()

        if success:
            logger.info("[OCR] ✅ Job worker started (PaddleOCR persistent)")
        else:
            logger.warning("[OCR] ⚠️ Job worker failed to start, falling back to sync mode")

    except Exception as e:
        logger.warning(f"[OCR] ⚠️ OCR job worker init failed: {e}")
        logger.warning("[OCR] Continuing with sync OCR mode")
        ocr_job_worker_running = False


async def init_cleanup_task() -> None:
    """Initialize the cleanup background task for expired failed receipts (US-008).

    Runs cleanup at startup and then every 24 hours:
    - Finds receipts with processing_status='failed' older than 7 days
    - Deletes the receipts and their attachment files from storage

    Best-effort: a failure only logs a warning and leaves
    ``cleanup_task_running`` False.
    """
    global cleanup_task_running

    logger.info("[CLEANUP] Initializing cleanup background task...")

    try:
        from backend.modules.data_entry.services.cleanup_service import start_cleanup_task
        from backend.modules.data_entry.db.database import get_session

        success = await start_cleanup_task(get_session)
        cleanup_task_running = success

        if success:
            logger.info("[CLEANUP] ✅ Cleanup task started (runs daily)")
        else:
            logger.warning("[CLEANUP] ⚠️ Cleanup task failed to start")

    except Exception as e:
        logger.warning(f"[CLEANUP] ⚠️ Cleanup task init failed: {e}")
        cleanup_task_running = False


async def init_email_server_cache() -> None:
    """Initialize the email-server cache for multi-Oracle auto-discovery (US-003).

    Builds a cache mapping emails to server IDs by querying
    CONTAFIN_ORACLE.UTILIZATORI on each configured Oracle server.
    Starts auto-refresh every 15 minutes.

    Skipped entirely when fewer than two Oracle servers are configured.
    Best-effort: a failure only logs a warning and leaves
    ``email_cache_running`` False.
    """
    global email_cache_running

    # Only initialize if multi-server mode is configured
    servers = settings.get_oracle_servers()
    if not servers or len(servers) <= 1:
        logger.info("[EMAIL-CACHE] Single-server mode, skipping email cache initialization")
        return

    logger.info("[EMAIL-CACHE] Initializing email-server cache...")

    try:
        from shared.auth.email_server_cache import (
            email_server_cache,
            build_email_cache,
            start_email_cache_refresh
        )

        # Build initial cache
        await build_email_cache()

        # Start auto-refresh
        await start_email_cache_refresh()
        email_cache_running = True

        stats = email_server_cache.get_cache_stats()
        logger.info(f"[EMAIL-CACHE] ✅ Cache initialized: {stats['total_emails']} emails")

    except Exception as e:
        logger.warning(f"[EMAIL-CACHE] ⚠️ Cache init failed: {e}")
        logger.warning("[EMAIL-CACHE] Multi-server email lookup will not be available")
        email_cache_running = False


async def init_ssh_tunnel_monitoring() -> None:
    """Initialize SSH tunnel monitoring with auto-reconnect.

    This does NOT start tunnels - they should already be running
    (started by start.sh / start.ps1 / start-backend-service.ps1).

    Responsibilities:
    - Monitor tunnel health via port checks (every 30s)
    - Auto-restart tunnels if they go down
    - Expose status for /health endpoint

    Best-effort: a failure only logs a warning and leaves
    ``ssh_tunnel_monitoring`` False.
    """
    global ssh_tunnel_monitoring

    logger.info("[SSH-MONITOR] Initializing tunnel monitoring...")

    try:
        from backend.shared.ssh_tunnel_manager import ssh_tunnel_manager

        success = await ssh_tunnel_manager.start_monitoring()
        ssh_tunnel_monitoring = success

        if success:
            status = ssh_tunnel_manager.get_status()
            if status["status"] == "not_configured":
                logger.info("[SSH-MONITOR] No tunnels configured (direct connection mode)")
            else:
                logger.info(f"[SSH-MONITOR] ✅ Monitoring active: {status['status']}")
        else:
            logger.warning("[SSH-MONITOR] ⚠️ Failed to start monitoring")

    except Exception as e:
        logger.warning(f"[SSH-MONITOR] ⚠️ Init failed: {e}")
        ssh_tunnel_monitoring = False


async def _stop_telegram_application(application) -> None:
    """Tear down a Telegram application, stopping only what is running.

    ``Updater.stop()`` and ``Application.stop()`` raise RuntimeError when
    the component is not running, which happens when the bot task is
    cancelled before start-up has completed. Checking ``running`` first
    makes teardown safe at any point of the start-up sequence.
    """
    updater = application.updater
    if updater is not None and updater.running:
        await updater.stop()
    if application.running:
        await application.stop()
    await application.shutdown()


async def run_telegram_bot() -> None:
    """Run the Telegram bot as a long-lived background task.

    Builds the application, registers all handlers, starts long polling
    and then idles until the task is cancelled.

    Raises:
        asyncio.CancelledError: always re-raised after the bot has been
            torn down, so the task ends in the cancelled state.
        Exception: any other error is logged and re-raised.
    """
    logger.info("[TELEGRAM] Starting bot...")

    # Explicit sentinel so the cancellation path knows whether there is
    # anything to tear down.
    application = None

    try:
        from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters
        from backend.modules.telegram.bot.handlers import (
            start_command, help_command, clear_command, companies_command,
            unlink_command, selectcompany_command, dashboard_command,
            sold_command, facturi_command, trezorerie_command, menu_command,
            trezorerie_casa_command, trezorerie_banca_command,
            clienti_command, furnizori_command, evolutie_command,
            clearcache_command, togglecache_command,
            handle_text_message, button_callback, error_handler
        )
        from backend.modules.telegram.bot.email_handlers import email_login_handler

        # Create Telegram application
        application = Application.builder().token(settings.telegram_bot_token).build()

        # Register handlers. Registration order is preserved exactly:
        # the email login handler first, then the commands, then the
        # generic text and callback handlers.
        application.add_handler(email_login_handler)

        command_handlers = (
            ("start", start_command),
            ("menu", menu_command),
            ("help", help_command),
            ("unlink", unlink_command),
            ("clear", clear_command),
            ("companies", companies_command),
            ("selectcompany", selectcompany_command),
            ("dashboard", dashboard_command),
            ("sold", sold_command),
            ("facturi", facturi_command),
            ("trezorerie", trezorerie_command),
            ("trezorerie_casa", trezorerie_casa_command),
            ("trezorerie_banca", trezorerie_banca_command),
            ("clienti", clienti_command),
            ("furnizori", furnizori_command),
            ("evolutie", evolutie_command),
            ("clearcache", clearcache_command),
            ("togglecache", togglecache_command),
        )
        for command, callback in command_handlers:
            application.add_handler(CommandHandler(command, callback))

        application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
        application.add_handler(CallbackQueryHandler(button_callback))
        application.add_error_handler(error_handler)

        # Initialize and start
        await application.initialize()
        await application.start()
        await application.updater.start_polling(
            drop_pending_updates=True,
            poll_interval=0,  # No delay between polls
            timeout=30  # Long poll timeout 30 seconds (reduces requests from ~6/min to ~2/min)
        )

        bot_info = await application.bot.get_me()
        logger.info(f"[TELEGRAM] ✅ Bot running: @{bot_info.username}")

        # Keep bot running
        while True:
            await asyncio.sleep(1)

    except asyncio.CancelledError:
        logger.info("[TELEGRAM] Bot task cancelled, stopping...")
        try:
            if application is not None:
                await _stop_telegram_application(application)
            logger.info("[TELEGRAM] ✅ Bot stopped")
        except Exception as e:
            # A teardown failure must not replace the CancelledError,
            # otherwise whoever awaits this task sees an unexpected error.
            logger.error(f"[TELEGRAM] ❌ Error while stopping bot: {e}", exc_info=True)
        raise
    except Exception as e:
        logger.error(f"[TELEGRAM] ❌ Bot error: {e}", exc_info=True)
        raise
# ============================================================================
# FASTAPI APPLICATION
# ============================================================================

app = FastAPI(
    title="ROA2WEB Unified Backend",
    description="Unified FastAPI backend for Reports, Data Entry, and Telegram modules",
    version="1.0.0"
)


# ============================================================================
# STARTUP/SHUTDOWN EVENT HANDLERS
# ============================================================================

@app.on_event("startup")
async def startup_event():
    """Application startup - Initialize all resources.

    Any exception raised by an initialization step is logged and re-raised,
    which aborts startup.
    """
    global telegram_bot_task

    logger.info("=" * 80)
    logger.info("[STARTUP] ROA2WEB Unified Backend")
    logger.info("=" * 80)

    try:
        # Step 1: Initialize Oracle pool (shared by all modules)
        await init_oracle_pool()

        # Step 2: Parallel initialization of module-specific resources
        logger.info("[STARTUP] Initializing module resources in parallel...")
        await asyncio.gather(
            init_reports_cache(),
            init_data_entry_db(),
            init_telegram_db(),
        )

        # Step 3: Initialize OCR job worker (with persistent PaddleOCR)
        await init_ocr_job_worker()

        # Step 4: Initialize cleanup task for expired failed receipts (US-008)
        await init_cleanup_task()

        # Step 5: Initialize email-server cache for multi-Oracle (US-003)
        await init_email_server_cache()

        # Step 6: Initialize SSH tunnel monitoring (auto-reconnect)
        await init_ssh_tunnel_monitoring()

        # Step 7: Start Telegram bot as background task
        if settings.telegram_bot_token:
            telegram_bot_task = asyncio.create_task(run_telegram_bot())
            logger.info("[STARTUP] ✅ Telegram bot task created")
        else:
            logger.warning("[STARTUP] ⚠️ TELEGRAM_BOT_TOKEN not set, bot disabled")

        logger.info("=" * 80)
        logger.info("[STARTUP] ✅ All modules initialized successfully")
        logger.info(f"[STARTUP] ✅ Server running on http://{settings.api_host}:{settings.api_port}")
        logger.info("=" * 80)

    except Exception as e:
        logger.error(f"[STARTUP] ❌ Initialization failed: {e}", exc_info=True)
        raise


@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown - Cleanup resources.

    Services are stopped in reverse start-up order. Every step is isolated
    in its own error handler so that a failure in one step (for example the
    Telegram task ending with an error) cannot prevent the later steps -
    in particular closing the reports cache and the Oracle pool - from
    running. This handler never raises.
    """
    global telegram_bot_task, ocr_job_worker_running, cleanup_task_running, email_cache_running, ssh_tunnel_monitoring

    logger.info("=" * 80)
    logger.info("[SHUTDOWN] Stopping ROA2WEB Unified Backend...")
    logger.info("=" * 80)

    try:
        # Stop SSH tunnel monitoring
        if ssh_tunnel_monitoring:
            logger.info("[SHUTDOWN] Stopping SSH tunnel monitoring...")
            try:
                from backend.shared.ssh_tunnel_manager import ssh_tunnel_manager
                await ssh_tunnel_manager.stop_monitoring()
                ssh_tunnel_monitoring = False
                logger.info("[SHUTDOWN] SSH tunnel monitoring stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] SSH tunnel monitoring error: {e}")

        # Stop email cache auto-refresh (US-003)
        if email_cache_running:
            logger.info("[SHUTDOWN] Stopping email cache auto-refresh...")
            try:
                from shared.auth.email_server_cache import stop_email_cache_refresh
                await stop_email_cache_refresh()
                email_cache_running = False
                logger.info("[SHUTDOWN] Email cache stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] Email cache error: {e}")

        # Stop cleanup task (US-008)
        if cleanup_task_running:
            logger.info("[SHUTDOWN] Stopping cleanup task...")
            try:
                from backend.modules.data_entry.services.cleanup_service import stop_cleanup_task
                await stop_cleanup_task()
                cleanup_task_running = False
                logger.info("[SHUTDOWN] Cleanup task stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] Cleanup task error: {e}")

        # Stop OCR job worker
        if ocr_job_worker_running:
            logger.info("[SHUTDOWN] Stopping OCR job worker...")
            try:
                from backend.modules.data_entry.services.ocr.job_worker import stop_job_worker
                await stop_job_worker()
                ocr_job_worker_running = False
                logger.info("[SHUTDOWN] OCR job worker stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] OCR worker error: {e}")

        # Stop Telegram bot
        if telegram_bot_task and not telegram_bot_task.done():
            logger.info("[SHUTDOWN] Stopping Telegram bot...")
            telegram_bot_task.cancel()
            try:
                await telegram_bot_task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                # The task can end with an error other than CancelledError;
                # log it and carry on so the cache and the Oracle pool
                # below are still closed.
                logger.error(f"[SHUTDOWN] Telegram bot error: {e}")

        # Stop Reports cache event monitor
        try:
            from backend.modules.reports.cache import close_cache, get_event_monitor
            monitor = get_event_monitor()
            if monitor:
                await monitor.stop()
                logger.info("[SHUTDOWN] Reports cache monitor stopped")

            await close_cache()
            logger.info("[SHUTDOWN] Reports cache closed")
        except Exception as e:
            logger.error(f"[SHUTDOWN] Cache error: {e}")

        # Close Oracle pool
        try:
            await oracle_pool.close_pool()
            logger.info("[SHUTDOWN] Oracle pool closed")
        except Exception as e:
            logger.error(f"[SHUTDOWN] Oracle pool error: {e}")

        logger.info("=" * 80)
        logger.info("[SHUTDOWN] ✅ Shutdown complete")
        logger.info("=" * 80)

    except Exception as e:
        logger.error(f"[SHUTDOWN] Error during shutdown: {e}", exc_info=True)


# ============================================================================
# MIDDLEWARE
# ============================================================================

# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended for the production deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for production deployment
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Authentication middleware; the paths below are reachable without a token.
app.add_middleware(
    AuthenticationMiddleware,
    excluded_paths=[
        "/",
        "/docs",
        "/health",
        "/redoc",
        "/openapi.json",
        "/api/auth/login",
        "/api/auth/refresh",
        "/api/auth/check-email",
        "/api/auth/check-identity",  # US-013: Dual login support (email + username)
        "/api/system/auth-mode",  # Public endpoint for login mode detection
        "/api/telegram/auth/verify-user",
        "/api/telegram/auth/verify-email",
        "/api/telegram/auth/login-with-email",
        "/api/telegram/auth/refresh-token",
        "/api/telegram/health",
        "/api/telegram/internal/save-code"
    ]
)


# ============================================================================
# ROUTER REGISTRATION
# ============================================================================

# Module routers with prefixes
app.include_router(create_reports_router(), prefix="/api/reports", tags=["reports"])
app.include_router(create_data_entry_router(), prefix="/api/data-entry", tags=["data-entry"])
app.include_router(create_telegram_router(), prefix="/api/telegram", tags=["telegram"])

# Shared routers
auth_router = create_auth_router(prefix="", tags=["authentication"])
app.include_router(auth_router, prefix="/api/auth")

companies_router = create_companies_router(oracle_pool, tags=["companies"])
app.include_router(companies_router, prefix="/api/companies")

calendar_router = create_calendar_router(oracle_pool, tags=["calendar"])
app.include_router(calendar_router, prefix="/api/calendar")

system_router = create_system_router()
app.include_router(system_router, prefix="/api/system", tags=["system"])


# ============================================================================
# ROOT & HEALTH ENDPOINTS
# ============================================================================

@app.get("/")
async def root():
    """Root endpoint - API information."""
    return {
        "name": settings.app_name,
        "version": settings.app_version,
        "status": "running",
        "modules": ["reports", "data-entry", "telegram"],
        "docs": "/docs",
        "health": "/health"
    }


@app.get("/health")
async def health_check():
    """Health check endpoint with module status.

    Each module is probed independently; a failing probe is reported as an
    ``"error: ..."`` string under its key instead of failing the request.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Taking an aware
    # "now" and dropping tzinfo yields the same naive-UTC ISO string, so the
    # response format is unchanged.
    from datetime import timezone

    health_status = {
        "api": "healthy",
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        "modules": {}
    }

    # Check Oracle connection
    try:
        async with oracle_pool.get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1 FROM DUAL")
                health_status["modules"]["oracle"] = "connected"
    except Exception as e:
        health_status["modules"]["oracle"] = f"error: {str(e)}"

    # Check Reports cache
    try:
        from backend.modules.reports.cache import get_cache
        cache = get_cache()
        health_status["modules"]["reports_cache"] = "initialized" if cache else "disabled"
    except Exception as e:
        health_status["modules"]["reports_cache"] = f"error: {str(e)}"

    # Check Data Entry DB
    try:
        db_path = Path(settings.data_entry_sqlite_database_path)
        health_status["modules"]["data_entry_db"] = "exists" if db_path.exists() else "missing"
    except Exception as e:
        health_status["modules"]["data_entry_db"] = f"error: {str(e)}"

    # Check Telegram bot (the module-level task is only read here)
    if telegram_bot_task:
        if telegram_bot_task.done():
            health_status["modules"]["telegram_bot"] = "stopped"
        else:
            health_status["modules"]["telegram_bot"] = "running"
    else:
        health_status["modules"]["telegram_bot"] = "disabled"

    # Check OCR job worker
    try:
        from backend.modules.data_entry.services.ocr.job_worker import is_running
        from backend.modules.data_entry.services.ocr.job_queue import job_queue

        if is_running():
            # Get queue stats
            stats = await job_queue.get_queue_stats()
            health_status["modules"]["ocr_worker"] = {
                "status": "running",
                "pending_jobs": stats.get("pending", 0),
                "processing_jobs": stats.get("processing", 0),
                "avg_time_seconds": stats.get("average_time_seconds", 0)
            }
        else:
            health_status["modules"]["ocr_worker"] = "stopped"
    except Exception as e:
        health_status["modules"]["ocr_worker"] = f"error: {str(e)}"

    # Check SSH tunnels
    try:
        from backend.shared.ssh_tunnel_manager import ssh_tunnel_manager
        health_status["modules"]["ssh_tunnels"] = ssh_tunnel_manager.get_status()
    except Exception as e:
        health_status["modules"]["ssh_tunnels"] = f"error: {str(e)}"

    return health_status


# ============================================================================
# MAIN ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "backend.main:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=False,
        log_level="info"
    )