Files
roa2web-service-auto/backend/main.py
Claude Agent 752858182d feat: [US-002] Fix UTF-8 encoding for Windows console
Add UTF-8 encoding setup at the top of backend/main.py to fix
'charmap' codec encoding errors when logging Romanian diacritics
(ă, î, ș, ț) on Windows console.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 23:36:47 +00:00

629 lines
24 KiB
Python

"""
ROA2WEB Unified Backend - Single FastAPI Application
Consolidates Reports, Data Entry, and Telegram modules into one process
"""
# =============================================================================
# UTF-8 ENCODING FIX FOR WINDOWS CONSOLE
# Must be at the TOP, before any logging or print statements
# Fixes: 'charmap' codec can't encode character errors with Romanian diacritics
# =============================================================================
import sys


def configure_utf8_streams(streams=None):
    """Switch text streams to UTF-8 (with ``errors='replace'``) in place.

    Uses ``TextIOWrapper.reconfigure`` instead of wrapping ``stream.buffer`` in a
    new ``TextIOWrapper``. This has three effects:

    * the stream keeps its line-buffering mode, so console log lines still show
      up promptly;
    * ``sys.__stdout__`` / ``sys.__stderr__`` remain the same objects, so no two
      text wrappers end up sharing one binary buffer;
    * streams that are ``None`` (no console, e.g. ``pythonw`` or a Windows
      service) or that have no ``reconfigure`` method (already replaced by
      some other object) are skipped instead of raising at import time.

    Args:
        streams: iterable of text streams to reconfigure. Defaults to
            ``(sys.stdout, sys.stderr)``.
    """
    if streams is None:
        streams = (sys.stdout, sys.stderr)
    for stream in streams:
        reconfigure = getattr(stream, "reconfigure", None)
        if reconfigure is not None:
            reconfigure(encoding='utf-8', errors='replace')


if sys.platform == 'win32':
    # Prevents encoding errors when logging Romanian characters (ă, î, ș, ț)
    configure_utf8_streams()
import asyncio
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Pull variables from a .env file into os.environ before any configuration is read.
load_dotenv()
# Put the project on the import path, independent of the current working directory.
# Entries are pushed to the front in this order, so "shared/" ends up first:
#   - project_root          -> 'from backend.xxx import ...' and 'from shared.xxx import ...'
#   - project_root/"shared" -> top-level imports of modules that live inside shared/
project_root = Path(__file__).parent.parent
for _path_entry in (project_root, project_root / "shared"):
    sys.path.insert(0, str(_path_entry))
# Import configuration
from backend.config import settings
# Import shared infrastructure
from shared.database.oracle_pool import oracle_pool
from shared.auth.middleware import AuthenticationMiddleware
from shared.auth.routes import create_auth_router
from shared.routes.companies import create_companies_router
from shared.routes.calendar import create_calendar_router
from shared.routes.system import create_system_router
# Import module router factories
from backend.modules.reports.routers import create_reports_router
from backend.modules.data_entry.routers import create_data_entry_router
from backend.modules.telegram.routers import create_telegram_router
# Configure logging (level from env: DEBUG, INFO, WARNING, ERROR, CRITICAL)
def resolve_log_level(name, default=logging.INFO):
    """Translate a logging level name (case-insensitive) into its numeric value.

    Only registered level names are accepted. Anything else falls back to
    *default* instead of reaching ``logging.basicConfig``. That covers typos and
    also attribute names of the ``logging`` module that are not levels, such as
    ``"BASIC_FORMAT"``, which the previous ``getattr(logging, ...)`` lookup
    returned.

    Args:
        name: level name such as ``"debug"`` or ``" WARNING "``.
        default: level returned when *name* is not a known level.

    Returns:
        The numeric logging level.
    """
    # For a registered name, getLevelName() returns the int level; otherwise it
    # returns the string "Level <name>".
    level = logging.getLevelName(str(name).strip().upper())
    return level if isinstance(level, int) else default


log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=resolve_log_level(log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)
# Reduce noise from third-party libraries
for _noisy_logger in ('httpcore', 'httpx', 'multipart', 'doctr', 'tensorflow', 'PIL'):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# Background-task state written by the startup/init functions, read by the
# shutdown handler (to know what to stop) and by /health.
telegram_bot_task: "asyncio.Task | None" = None  # created at startup only if a bot token is set
ocr_job_worker_running: bool = False
cleanup_task_running: bool = False
email_cache_running: bool = False
# ============================================================================
# INITIALIZATION FUNCTIONS
# ============================================================================
async def init_oracle_pool():
    """Set up the Oracle pool manager shared by every module.

    When ``settings.get_oracle_servers()`` returns servers, each one is
    registered with ``oracle_pool`` for lazy pool creation. Otherwise the legacy
    single-server configuration (env vars, per the original design) is used.
    In both cases ``oracle_pool.initialize()`` is awaited exactly once.
    """
    logger.info("[ORACLE] Initializing connection pool...")
    configured = settings.get_oracle_servers()

    if not configured:
        logger.info("[ORACLE] Using legacy single-server configuration")
    else:
        logger.info(f"[ORACLE] Registering {len(configured)} servers for lazy pool creation:")
        for server in configured:
            oracle_pool.register_server(
                server_id=server.id,
                host=server.host,
                port=server.port,
                user=server.user,
                password=server.password,
                sid=server.sid,
                service_name=server.service_name,
            )
            logger.info(f"[ORACLE] - {server.id}: {server.name} @ {server.host}:{server.port}")

    # In multi-server mode this only marks the manager ready; pools are created
    # lazily on first connection.
    await oracle_pool.initialize()
    logger.info("[ORACLE] ✅ Pool manager initialized successfully")
async def init_reports_cache():
    """Start the Reports cache and its event monitor; never fails startup.

    The cache config is read from the environment (``CacheConfig.from_env``).
    The cache is initialized first, then the event monitor is attached to it.
    Any exception is logged with its traceback and the app continues without
    the cache.
    """
    logger.info("[REPORTS] Initializing cache system...")
    try:
        from backend.modules.reports.cache import init_cache, init_event_monitor, get_cache
        from backend.modules.reports.cache.config import CacheConfig

        config = CacheConfig.from_env()
        await init_cache(config)
        logger.info(f"[REPORTS] ✅ Cache initialized: type={config.cache_type}, enabled={config.enabled}")

        await init_event_monitor(get_cache(), config)
        logger.info(
            "[REPORTS] Event-based auto-invalidation ENABLED"
            if config.auto_invalidate_enabled
            else "[REPORTS] Event-based auto-invalidation DISABLED"
        )
    except Exception as exc:
        logger.exception(f"[REPORTS] ⚠️ Cache initialization error: {exc}")
        logger.warning("[REPORTS] Continuing without cache")
async def init_data_entry_db():
    """Initialize the Data Entry SQLite database and resolve the upload directory.

    Raises:
        Exception: any error from ``init_db`` or from resolving the upload path
            is logged with its traceback and re-raised, which aborts startup.
    """
    logger.info("[DATA-ENTRY] Initializing SQLite database...")
    try:
        from backend.modules.data_entry.db.database import init_db

        await init_db()
        logger.info(f"[DATA-ENTRY] ✅ Database initialized: {settings.data_entry_sqlite_database_path}")

        # Reading this settings property is what ensures the upload directory
        # exists (per the original intent; presumably the property creates it —
        # confirm in backend.config). Bind it once instead of relying on a bare
        # expression statement that linters flag as useless and may strip.
        upload_path = settings.data_entry_upload_path_resolved
        logger.info(f"[DATA-ENTRY] Upload path: {upload_path}")
    except Exception as e:
        logger.error(f"[DATA-ENTRY] ❌ Database initialization error: {e}", exc_info=True)
        raise
async def init_telegram_db():
    """Create the Telegram SQLite database, then purge expired records.

    Expired codes, sessions and email codes are cleaned up one after another,
    and the cleanup functions' return values are logged as removal counts.
    Failures are logged with their traceback and re-raised, aborting startup.
    """
    logger.info("[TELEGRAM] Initializing SQLite database...")
    try:
        from backend.modules.telegram.db import (
            init_database,
            cleanup_expired_codes,
            cleanup_expired_sessions,
            cleanup_expired_email_codes,
        )

        await init_database()
        logger.info(f"[TELEGRAM] ✅ Database initialized: {settings.telegram_sqlite_database_path}")

        removed_codes = await cleanup_expired_codes()
        removed_sessions = await cleanup_expired_sessions()
        removed_email_codes = await cleanup_expired_email_codes()
        logger.info(
            f"[TELEGRAM] Cleanup: {removed_codes} codes, {removed_sessions} sessions, "
            f"{removed_email_codes} email codes removed"
        )
    except Exception as exc:
        logger.exception(f"[TELEGRAM] ❌ Database initialization error: {exc}")
        raise
async def init_ocr_job_worker():
    """Start the OCR job worker (persistent PaddleOCR); degrade to sync OCR on failure.

    Per the job_worker module design, this starts a ProcessPoolExecutor-backed
    worker that loads PaddleOCR once and reuses it, plus the job-queue
    background task. It replaces the old background-thread approach.

    Sets the module-level ``ocr_job_worker_running`` flag from the worker's own
    ``is_running()`` state. The flag is used at shutdown to decide whether to
    stop the worker. Exceptions are logged (with traceback) and never raised.
    """
    global ocr_job_worker_running
    logger.info("[OCR] Initializing OCR job worker...")
    try:
        from backend.modules.data_entry.services.ocr.job_worker import start_job_worker, is_running

        success = await start_job_worker()
        ocr_job_worker_running = is_running()
        if success:
            logger.info("[OCR] ✅ Job worker started (PaddleOCR persistent)")
        else:
            logger.warning("[OCR] ⚠️ Job worker failed to start, falling back to sync mode")
    except Exception as e:
        # Best-effort: the app keeps running in sync OCR mode. Keep the traceback
        # (as init_reports_cache does) so the root cause is diagnosable.
        logger.warning(f"[OCR] ⚠️ OCR job worker init failed: {e}", exc_info=True)
        logger.warning("[OCR] Continuing with sync OCR mode")
        ocr_job_worker_running = False
async def init_cleanup_task():
    """Start the background cleanup of expired failed receipts (US-008).

    Per the cleanup_service design, cleanup runs at startup and then every
    24 hours. It deletes receipts with ``processing_status='failed'`` older than
    7 days, together with their attachment files.

    ``cleanup_task_running`` is set from ``start_cleanup_task``'s return value
    and is used at shutdown to decide whether to stop the task. Exceptions are
    logged (with traceback) and never raised; the app runs without cleanup.
    """
    global cleanup_task_running
    logger.info("[CLEANUP] Initializing cleanup background task...")
    try:
        from backend.modules.data_entry.services.cleanup_service import start_cleanup_task
        from backend.modules.data_entry.db.database import get_session

        success = await start_cleanup_task(get_session)
        cleanup_task_running = success
        if success:
            logger.info("[CLEANUP] ✅ Cleanup task started (runs daily)")
        else:
            logger.warning("[CLEANUP] ⚠️ Cleanup task failed to start")
    except Exception as e:
        # Best-effort feature: log the traceback so the failure is diagnosable.
        logger.warning(f"[CLEANUP] ⚠️ Cleanup task init failed: {e}", exc_info=True)
        cleanup_task_running = False
async def init_email_server_cache():
    """Build the email -> Oracle-server cache for multi-server login (US-003).

    Per the email_server_cache design, this queries CONTAFIN_ORACLE.UTILIZATORI
    on each configured server to map emails to server IDs, then starts the
    periodic auto-refresh (15 minutes per the original description; the
    interval lives in shared.auth.email_server_cache). It is skipped unless
    more than one Oracle server is configured.

    ``email_cache_running`` is set to True as soon as the refresh task has been
    started. Shutdown relies on that flag to stop the task, so a later failure
    (e.g. reading stats) must not reset it. Exceptions are logged, never raised.
    """
    global email_cache_running
    servers = settings.get_oracle_servers()
    if not servers or len(servers) <= 1:
        logger.info("[EMAIL-CACHE] Single-server mode, skipping email cache initialization")
        return

    logger.info("[EMAIL-CACHE] Initializing email-server cache...")
    try:
        from shared.auth.email_server_cache import (
            email_server_cache,
            build_email_cache,
            start_email_cache_refresh,
        )

        await build_email_cache()
        await start_email_cache_refresh()
    except Exception as e:
        logger.warning(f"[EMAIL-CACHE] ⚠️ Cache init failed: {e}", exc_info=True)
        logger.warning("[EMAIL-CACHE] Multi-server email lookup will not be available")
        email_cache_running = False
        return

    # The refresh task is running from here on; record that before anything
    # else can fail, otherwise shutdown would never stop it.
    email_cache_running = True
    try:
        stats = email_server_cache.get_cache_stats()
        logger.info(f"[EMAIL-CACHE] ✅ Cache initialized: {stats['total_emails']} emails")
    except Exception as e:
        logger.warning(f"[EMAIL-CACHE] ⚠️ Cache running, but stats unavailable: {e}")
async def _stop_telegram_application(application):
    """Best-effort teardown of a python-telegram-bot Application.

    Safe at any startup stage:
    * polling is stopped only if the updater is running;
    * the application is stopped only if it is running;
    * ``shutdown()`` is always attempted last.

    Errors are logged, not raised, so teardown never replaces the exception or
    cancellation that triggered it.
    """
    try:
        if application.updater is not None and application.updater.running:
            await application.updater.stop()
        if application.running:
            await application.stop()
        await application.shutdown()
        logger.info("[TELEGRAM] ✅ Bot stopped")
    except Exception as e:
        logger.error(f"[TELEGRAM] ❌ Error while stopping bot: {e}", exc_info=True)


async def run_telegram_bot():
    """Run the Telegram bot with long polling until this task is cancelled.

    Builds the Application from ``settings.telegram_bot_token`` and registers
    all handlers. It then initializes, starts and begins polling, and parks
    until cancelled. On cancellation or on any error, the application is torn
    down in ``finally``, so polling never outlives this task. The original
    exception or CancelledError is then re-raised.
    """
    logger.info("[TELEGRAM] Starting bot...")
    application = None
    try:
        from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters
        from backend.modules.telegram.bot.handlers import (
            start_command, help_command, clear_command, companies_command,
            unlink_command, selectcompany_command, dashboard_command, sold_command,
            facturi_command, trezorerie_command, menu_command, trezorerie_casa_command,
            trezorerie_banca_command, clienti_command, furnizori_command, evolutie_command,
            clearcache_command, togglecache_command, handle_text_message, button_callback,
            error_handler
        )
        from backend.modules.telegram.bot.email_handlers import email_login_handler

        application = Application.builder().token(settings.telegram_bot_token).build()

        # Registration order matters: within a handler group the first matching
        # handler wins. So the email-login conversation is registered first and
        # the catch-all text handler after all commands.
        application.add_handler(email_login_handler)
        commands = (
            ("start", start_command),
            ("menu", menu_command),
            ("help", help_command),
            ("unlink", unlink_command),
            ("clear", clear_command),
            ("companies", companies_command),
            ("selectcompany", selectcompany_command),
            ("dashboard", dashboard_command),
            ("sold", sold_command),
            ("facturi", facturi_command),
            ("trezorerie", trezorerie_command),
            ("trezorerie_casa", trezorerie_casa_command),
            ("trezorerie_banca", trezorerie_banca_command),
            ("clienti", clienti_command),
            ("furnizori", furnizori_command),
            ("evolutie", evolutie_command),
            ("clearcache", clearcache_command),
            ("togglecache", togglecache_command),
        )
        for command_name, callback in commands:
            application.add_handler(CommandHandler(command_name, callback))
        application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
        application.add_handler(CallbackQueryHandler(button_callback))
        application.add_error_handler(error_handler)

        await application.initialize()
        await application.start()
        await application.updater.start_polling(
            drop_pending_updates=True,
            poll_interval=0,  # No delay between polls
            timeout=30  # Long poll timeout 30 seconds (reduces requests from ~6/min to ~2/min)
        )

        bot_info = await application.bot.get_me()
        logger.info(f"[TELEGRAM] ✅ Bot running: @{bot_info.username}")

        # Polling runs in PTB's own background task; park here until cancelled.
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        logger.info("[TELEGRAM] Bot task cancelled, stopping...")
        raise
    except Exception as e:
        logger.error(f"[TELEGRAM] ❌ Bot error: {e}", exc_info=True)
        raise
    finally:
        if application is not None:
            await _stop_telegram_application(application)
# ============================================================================
# FASTAPI APPLICATION
# ============================================================================
# The single ASGI application that hosts the routes of every module.
app = FastAPI(
    version="1.0.0",
    title="ROA2WEB Unified Backend",
    description="Unified FastAPI backend for Reports, Data Entry, and Telegram modules",
)
# ============================================================================
# STARTUP/SHUTDOWN EVENT HANDLERS
# ============================================================================
@app.on_event("startup")
async def startup_event():
    """Bring up shared and per-module resources in dependency order.

    Order:
    1. The Oracle pool, shared by all modules.
    2. The Reports cache, Data Entry DB and Telegram DB, concurrently.
    3. The OCR job worker (persistent PaddleOCR).
    4. The failed-receipt cleanup task (US-008).
    5. The multi-Oracle email cache (US-003).
    6. The Telegram bot as a background task, only when a bot token is set.

    Any exception is logged with its traceback and re-raised, so startup fails
    instead of continuing half-initialized.
    """
    global telegram_bot_task
    banner = "=" * 80
    logger.info(banner)
    logger.info("[STARTUP] ROA2WEB Unified Backend")
    logger.info(banner)

    try:
        await init_oracle_pool()

        logger.info("[STARTUP] Initializing module resources in parallel...")
        await asyncio.gather(init_reports_cache(), init_data_entry_db(), init_telegram_db())

        # These run one after another; each handles its own failures.
        for init_step in (init_ocr_job_worker, init_cleanup_task, init_email_server_cache):
            await init_step()

        if not settings.telegram_bot_token:
            logger.warning("[STARTUP] ⚠️ TELEGRAM_BOT_TOKEN not set, bot disabled")
        else:
            telegram_bot_task = asyncio.create_task(run_telegram_bot())
            logger.info("[STARTUP] ✅ Telegram bot task created")

        logger.info(banner)
        logger.info("[STARTUP] ✅ All modules initialized successfully")
        logger.info(f"[STARTUP] ✅ Server running on http://{settings.api_host}:{settings.api_port}")
        logger.info(banner)
    except Exception as exc:
        logger.exception(f"[STARTUP] ❌ Initialization failed: {exc}")
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown - release resources roughly in reverse startup order.

    Order:
    1. Email-cache refresh.
    2. Cleanup task.
    3. OCR worker.
    4. Telegram bot.
    5. Reports cache (event monitor, then cache).
    6. The shared Oracle pool.

    Background components are stopped only if their flag or task says they are
    running. Every component step handles its own errors, so one failing
    component cannot keep the Oracle pool from being closed. Exceptions are
    logged rather than raised.
    """
    global telegram_bot_task, ocr_job_worker_running, cleanup_task_running, email_cache_running
    logger.info("=" * 80)
    logger.info("[SHUTDOWN] Stopping ROA2WEB Unified Backend...")
    logger.info("=" * 80)
    try:
        # Stop email cache auto-refresh (US-003)
        if email_cache_running:
            logger.info("[SHUTDOWN] Stopping email cache auto-refresh...")
            try:
                from shared.auth.email_server_cache import stop_email_cache_refresh
                await stop_email_cache_refresh()
                email_cache_running = False
                logger.info("[SHUTDOWN] Email cache stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] Email cache error: {e}")

        # Stop cleanup task (US-008)
        if cleanup_task_running:
            logger.info("[SHUTDOWN] Stopping cleanup task...")
            try:
                from backend.modules.data_entry.services.cleanup_service import stop_cleanup_task
                await stop_cleanup_task()
                cleanup_task_running = False
                logger.info("[SHUTDOWN] Cleanup task stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] Cleanup task error: {e}")

        # Stop OCR job worker
        if ocr_job_worker_running:
            logger.info("[SHUTDOWN] Stopping OCR job worker...")
            try:
                from backend.modules.data_entry.services.ocr.job_worker import stop_job_worker
                await stop_job_worker()
                ocr_job_worker_running = False
                logger.info("[SHUTDOWN] OCR job worker stopped")
            except Exception as e:
                logger.error(f"[SHUTDOWN] OCR worker error: {e}")

        # Stop Telegram bot
        if telegram_bot_task and not telegram_bot_task.done():
            logger.info("[SHUTDOWN] Stopping Telegram bot...")
            telegram_bot_task.cancel()
            try:
                await telegram_bot_task
            except asyncio.CancelledError:
                pass  # expected: the task was just cancelled
            except Exception as e:
                # The bot's own teardown failed. Previously this escaped to the
                # outer handler and skipped closing the Reports cache and Oracle pool.
                logger.error(f"[SHUTDOWN] Telegram bot error: {e}")

        # Stop Reports cache event monitor
        try:
            from backend.modules.reports.cache import close_cache, get_event_monitor
            monitor = get_event_monitor()
            if monitor:
                await monitor.stop()
                logger.info("[SHUTDOWN] Reports cache monitor stopped")
            await close_cache()
            logger.info("[SHUTDOWN] Reports cache closed")
        except Exception as e:
            logger.error(f"[SHUTDOWN] Cache error: {e}")

        # Close Oracle pool
        await oracle_pool.close_pool()
        logger.info("[SHUTDOWN] Oracle pool closed")

        logger.info("=" * 80)
        logger.info("[SHUTDOWN] ✅ Shutdown complete")
        logger.info("=" * 80)
    except Exception as e:
        logger.error(f"[SHUTDOWN] Error during shutdown: {e}", exc_info=True)
# ============================================================================
# MIDDLEWARE
# ============================================================================
# CORS middleware.
# Allowed origins come from the optional CORS_ALLOW_ORIGINS env var
# (comma-separated, e.g. "https://app.example.com,https://admin.example.com").
# When it is unset or empty, every origin is allowed, as before.
# NOTE(review): with allow_credentials=True and a "*" wildcard, Starlette echoes
# the caller's Origin back (on preflights and cookie-bearing requests) instead
# of "*". If auth relies on cookies, any website can then make credentialed
# requests. Set CORS_ALLOW_ORIGINS to the real frontend origin(s) in production.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Authentication middleware: every path NOT listed below requires authentication.
app.add_middleware(
    AuthenticationMiddleware,
    excluded_paths=[
        "/", "/docs", "/health", "/redoc", "/openapi.json",
        "/api/auth/login", "/api/auth/refresh", "/api/auth/check-email",
        "/api/auth/check-identity",  # US-013: Dual login support (email + username)
        "/api/system/auth-mode",  # Public endpoint for login mode detection
        "/api/telegram/auth/verify-user",
        "/api/telegram/auth/verify-email",
        "/api/telegram/auth/login-with-email",
        "/api/telegram/auth/refresh-token",
        "/api/telegram/health",
        # NOTE(review): an "internal" endpoint exempt from authentication is
        # reachable by anyone who can reach the API. Confirm it is protected
        # some other way (shared secret, network restriction).
        "/api/telegram/internal/save-code"
    ]
)
# ============================================================================
# ROUTER REGISTRATION
# ============================================================================
# Module routers: each factory builds its router; prefix and tag are applied on inclusion.
for _router_factory, _router_prefix, _router_tag in (
    (create_reports_router, "/api/reports", "reports"),
    (create_data_entry_router, "/api/data-entry", "data-entry"),
    (create_telegram_router, "/api/telegram", "telegram"),
):
    app.include_router(_router_factory(), prefix=_router_prefix, tags=[_router_tag])

# Shared routers: tags are given to the factories, and the router objects are kept as
# module-level names. Companies and calendar query the shared Oracle pool.
auth_router = create_auth_router(prefix="", tags=["authentication"])
app.include_router(auth_router, prefix="/api/auth")

companies_router = create_companies_router(oracle_pool, tags=["companies"])
app.include_router(companies_router, prefix="/api/companies")

calendar_router = create_calendar_router(oracle_pool, tags=["calendar"])
app.include_router(calendar_router, prefix="/api/calendar")

system_router = create_system_router()
app.include_router(system_router, prefix="/api/system", tags=["system"])
# ============================================================================
# ROOT & HEALTH ENDPOINTS
# ============================================================================
@app.get("/")
async def root():
    """Describe the service: name and version from settings, status, modules, doc links."""
    info = dict(
        name=settings.app_name,
        version=settings.app_version,
        status="running",
        modules=["reports", "data-entry", "telegram"],
        docs="/docs",
        health="/health",
    )
    return info
@app.get("/health")
async def health_check():
    """Health check endpoint with per-module status (excluded from auth).

    Returns a dict with ``api``, a timezone-aware ISO-8601 UTC ``timestamp``
    and a ``modules`` map covering oracle, reports_cache, data_entry_db,
    telegram_bot and ocr_worker.

    A failed probe is reported as ``"error: <ExceptionType>"``. The full
    message goes to the server log only, because this endpoint is public and
    driver errors can reveal hosts, ports or service names.
    """
    from datetime import timezone

    modules = {}
    health_status = {
        # Aware timestamp ("+00:00" suffix): naive utcnow() output is parsed as
        # local time by e.g. JavaScript's Date, and utcnow() is deprecated in 3.12.
        "api": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "modules": modules,
    }

    def probe_error(component, exc):
        # Keep details server-side; expose only the exception type.
        logger.warning(f"[HEALTH] {component} check failed: {exc!r}")
        return f"error: {type(exc).__name__}"

    # Oracle: run a trivial query through the shared pool
    try:
        async with oracle_pool.get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1 FROM DUAL")
            modules["oracle"] = "connected"
    except Exception as e:
        modules["oracle"] = probe_error("oracle", e)

    # Reports cache
    try:
        from backend.modules.reports.cache import get_cache
        modules["reports_cache"] = "initialized" if get_cache() else "disabled"
    except Exception as e:
        modules["reports_cache"] = probe_error("reports_cache", e)

    # Data Entry DB: only checks that the SQLite file exists
    try:
        db_path = Path(settings.data_entry_sqlite_database_path)
        modules["data_entry_db"] = "exists" if db_path.exists() else "missing"
    except Exception as e:
        modules["data_entry_db"] = probe_error("data_entry_db", e)

    # Telegram bot (module-level task; read-only here, so no `global` needed)
    if not telegram_bot_task:
        modules["telegram_bot"] = "disabled"
    elif telegram_bot_task.done():
        modules["telegram_bot"] = "stopped"
    else:
        modules["telegram_bot"] = "running"

    # OCR job worker, with queue stats when running
    try:
        from backend.modules.data_entry.services.ocr.job_worker import is_running
        from backend.modules.data_entry.services.ocr.job_queue import job_queue
        if is_running():
            stats = await job_queue.get_queue_stats()
            modules["ocr_worker"] = {
                "status": "running",
                "pending_jobs": stats.get("pending", 0),
                "processing_jobs": stats.get("processing", 0),
                "avg_time_seconds": stats.get("average_time_seconds", 0)
            }
        else:
            modules["ocr_worker"] = "stopped"
    except Exception as e:
        modules["ocr_worker"] = probe_error("ocr_worker", e)

    return health_status
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
if __name__ == "__main__":
    # uvicorn is imported only when this file is executed directly.
    import uvicorn

    # The app is passed as an import string, so uvicorn imports this file again
    # as "backend.main". Module-level setup therefore runs a second time in
    # that import.
    server_options = dict(
        host=settings.api_host,
        port=settings.api_port,
        reload=False,
        log_level="info",
    )
    uvicorn.run("backend.main:app", **server_options)