Files
roa2web-service-auto/reports-app/telegram-bot/app/internal_api.py
Marius Mutu 6b13ffa183 Initial commit: ROA2WEB - FastAPI + Vue.js + Telegram Bot
Modern ERP Reports Application with microservices architecture

Tech Stack:
- Backend: FastAPI + python-oracledb (Oracle DB integration)
- Frontend: Vue.js 3 + PrimeVue + Vite
- Telegram Bot: python-telegram-bot + SQLite
- Infrastructure: Shared database pool, JWT authentication, SSH tunnel

Features:
- FastAPI backend with async Oracle connection pool
- Vue.js 3 responsive frontend with PrimeVue components
- Telegram bot alternative interface
- Microservices architecture with shared components
- Complete deployment support (Linux Docker + Windows IIS)
- Comprehensive testing (Playwright E2E + pytest)

Repository Structure:
- reports-app/ - Main application (backend, frontend, telegram-bot)
- shared/ - Shared components (database pool, auth, utils)
- deployment/ - Deployment scripts (Linux & Windows)
- docs/ - Project documentation
- security/ - Security scanning and git hooks
2025-10-25 14:55:08 +03:00

376 lines
11 KiB
Python

"""
Internal API for Backend Communication
This FastAPI application provides internal endpoints for the ROA2WEB backend
to communicate with the Telegram bot service. Main purpose is to save
authentication codes generated in the web frontend.
This API runs alongside the Telegram bot and is accessible only internally
(not exposed to public internet).
"""
import logging
import os
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from app.db.operations import create_auth_code, get_auth_code
from app.db.database import get_database_stats
logger = logging.getLogger(__name__)
# Initialize FastAPI app
internal_api = FastAPI(
title="ROA2WEB Telegram Bot - Internal API",
description="Internal API for backend communication (auth code management)",
version="1.0.0",
docs_url="/internal/docs" if os.getenv("ENABLE_DOCS", "false") == "true" else None,
redoc_url=None
)
# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================
class SaveAuthCodeRequest(BaseModel):
"""
Request model for saving an authentication code.
"""
code: str = Field(
...,
description="8-character authentication code",
min_length=8,
max_length=8
)
telegram_user_id: int = Field(
...,
description="Telegram user ID (if known, otherwise 0)",
ge=0
)
oracle_username: str = Field(
...,
description="Oracle username to link"
)
expires_in_minutes: int = Field(
default=5,
description="Code expiration time in minutes",
ge=1,
le=60
)
class SaveAuthCodeResponse(BaseModel):
"""
Response model for save auth code endpoint.
"""
success: bool = Field(..., description="Whether the operation succeeded")
code: str = Field(..., description="The saved authentication code")
expires_at: Optional[str] = Field(None, description="Expiration timestamp (ISO format)")
message: Optional[str] = Field(None, description="Additional message")
class VerifyAuthCodeRequest(BaseModel):
"""
Request model for verifying an authentication code.
"""
code: str = Field(..., description="Authentication code to verify")
class VerifyAuthCodeResponse(BaseModel):
"""
Response model for verify auth code endpoint.
"""
valid: bool = Field(..., description="Whether the code is valid")
oracle_username: Optional[str] = Field(None, description="Oracle username if valid")
telegram_user_id: Optional[int] = Field(None, description="Telegram user ID if set")
message: Optional[str] = Field(None, description="Additional message")
class HealthResponse(BaseModel):
"""
Response model for health check endpoint.
"""
status: str = Field(..., description="Service status")
timestamp: str = Field(..., description="Current timestamp")
database_stats: Optional[dict] = Field(None, description="Database statistics")
# ============================================================================
# ENDPOINTS
# ============================================================================
@internal_api.post(
"/internal/save-code",
response_model=SaveAuthCodeResponse,
status_code=status.HTTP_201_CREATED,
summary="Save Authentication Code",
description="Save an authentication code for Telegram linking (called by backend)"
)
async def save_auth_code(request: SaveAuthCodeRequest):
"""
Save an authentication code to SQLite database.
This endpoint is called by the FastAPI backend when a user generates
a linking code in the web frontend.
**Flow:**
1. User logs in to web frontend
2. User clicks "Link Telegram Account"
3. Backend generates 8-character code
4. Backend calls this endpoint to save code
5. Backend returns code to user for display
6. User sends code to Telegram bot via /start command
Args:
request: SaveAuthCodeRequest with code, oracle_username, etc.
Returns:
SaveAuthCodeResponse with success status and code details
Raises:
HTTPException 400: If code already exists or invalid data
HTTPException 500: If database operation fails
"""
try:
logger.info(
f"Saving auth code for Oracle user: {request.oracle_username}, "
f"code: {request.code}"
)
# Check if code already exists
existing_code = await get_auth_code(request.code)
if existing_code:
logger.warning(f"Code {request.code} already exists")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Code {request.code} already exists. Generate a new unique code."
)
# Create auth code in database
success = await create_auth_code(
code=request.code,
telegram_user_id=request.telegram_user_id,
oracle_username=request.oracle_username,
expires_in_minutes=request.expires_in_minutes
)
if not success:
logger.error(f"Failed to save auth code {request.code}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to save authentication code to database"
)
# Calculate expiration time
from datetime import timedelta
expires_at = (datetime.now() + timedelta(minutes=request.expires_in_minutes)).isoformat()
logger.info(f"Auth code {request.code} saved successfully")
return SaveAuthCodeResponse(
success=True,
code=request.code,
expires_at=expires_at,
message=f"Code saved successfully, expires in {request.expires_in_minutes} minutes"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error saving auth code: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error: {str(e)}"
)
@internal_api.post(
"/internal/verify-code",
response_model=VerifyAuthCodeResponse,
summary="Verify Authentication Code",
description="Verify if an authentication code is valid (without using it)"
)
async def verify_auth_code(request: VerifyAuthCodeRequest):
"""
Verify if an authentication code exists and is valid.
This is a read-only check that does NOT mark the code as used.
Useful for backend to verify codes before user links Telegram account.
Args:
request: VerifyAuthCodeRequest with code to verify
Returns:
VerifyAuthCodeResponse with validation status
Raises:
HTTPException 404: If code not found
"""
try:
logger.info(f"Verifying auth code: {request.code}")
code_data = await get_auth_code(request.code)
if not code_data:
return VerifyAuthCodeResponse(
valid=False,
message="Code not found"
)
# Check if code is expired
expires_at_str = code_data.get('expires_at')
expires_at = datetime.fromisoformat(expires_at_str) if expires_at_str else None
is_expired = expires_at and datetime.now() >= expires_at
is_used = code_data.get('used', 0) == 1
if is_expired:
return VerifyAuthCodeResponse(
valid=False,
oracle_username=code_data.get('oracle_username'),
message="Code expired"
)
if is_used:
return VerifyAuthCodeResponse(
valid=False,
oracle_username=code_data.get('oracle_username'),
message="Code already used"
)
# Code is valid
return VerifyAuthCodeResponse(
valid=True,
oracle_username=code_data.get('oracle_username'),
telegram_user_id=code_data.get('telegram_user_id'),
message="Code is valid"
)
except Exception as e:
logger.error(f"Error verifying auth code: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error: {str(e)}"
)
@internal_api.get(
"/internal/health",
response_model=HealthResponse,
summary="Health Check",
description="Check if the internal API and database are healthy"
)
async def health_check():
"""
Health check endpoint.
Returns service status and database statistics.
Returns:
HealthResponse with status and stats
"""
try:
# Get database stats
stats = await get_database_stats()
return HealthResponse(
status="healthy",
timestamp=datetime.now().isoformat(),
database_stats=stats
)
except Exception as e:
logger.error(f"Health check failed: {e}", exc_info=True)
return HealthResponse(
status="unhealthy",
timestamp=datetime.now().isoformat(),
database_stats={"error": str(e)}
)
@internal_api.get(
"/internal/stats",
summary="Database Statistics",
description="Get detailed database statistics"
)
async def get_stats():
"""
Get detailed database statistics.
Returns:
JSON with database statistics
"""
try:
stats = await get_database_stats()
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"success": True,
"timestamp": datetime.now().isoformat(),
"stats": stats
}
)
except Exception as e:
logger.error(f"Error getting stats: {e}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"error": str(e)
}
)
# ============================================================================
# EXCEPTION HANDLERS
# ============================================================================
@internal_api.exception_handler(Exception)
async def global_exception_handler(request, exc):
"""
Global exception handler for uncaught exceptions.
"""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"error": "Internal server error",
"detail": str(exc) if os.getenv("DEBUG", "false") == "true" else "An error occurred"
}
)
# ============================================================================
# STARTUP/SHUTDOWN EVENTS
# ============================================================================
@internal_api.on_event("startup")
async def startup_event():
"""
Startup event handler.
"""
logger.info("Internal API starting up...")
logger.info(f"Internal API ready on port {os.getenv('INTERNAL_API_PORT', '8002')}")
@internal_api.on_event("shutdown")
async def shutdown_event():
"""
Shutdown event handler.
"""
logger.info("Internal API shutting down...")
# Export the FastAPI app
__all__ = ['internal_api']