Files
roa2web-service-auto/reports-app/telegram-bot/app/api/client.py
Marius Mutu 706062dc0f Implement email-based 2FA authentication for Telegram bot with Oracle integration fixes
This commit adds a complete email authentication flow for the Telegram bot, allowing users to login with email + password instead of web app linking codes. Includes critical bug fixes for Oracle integration.

**New Features:**
- Email-based 2FA authentication with 6-digit codes sent via SMTP
- Backend endpoints: verify-email and login-with-email
- ConversationHandler for email authentication flow in Telegram bot
- Session token verification to prevent user ID spoofing
- Rate limiting (5 attempts per 5 minutes)
- Email code expiry (5 minutes) with automatic cleanup

**Bug Fixes:**
- Fixed Oracle column name: ACTIV → INACTIV (with inverted logic)
- Fixed Oracle password verification: verificautilizator returns checksum, not user_id
- Fixed username case sensitivity: Oracle usernames must be uppercase
- Fixed SMTP connection: use start_tls parameter instead of manual STARTTLS
- Added middleware exclusions for public email auth endpoints

**Backend Changes:**
- Added verify-email endpoint (public) in telegram.py
- Added login-with-email endpoint (public) with rate limiting and session verification
- Updated middleware exclusions in main.py and auth_middleware_wrapper.py
- Added AUTH_SESSION_SECRET configuration for session token signing

**Telegram Bot Changes:**
- New modules: app/auth/email_auth.py, app/bot/email_handlers.py
- New utilities: app/utils/email_service.py (SMTP email sending)
- Updated handlers.py: ignore callbacks handled by ConversationHandler
- Updated menus.py: show Login button for unauthenticated users
- Updated API client: verify_email() and login_with_email() methods
- Database: email_auth_codes table with cleanup task

**Configuration:**
- Added SMTP configuration to telegram-bot .env.example
- Added AUTH_SESSION_SECRET to backend .env.example
- Updated .gitignore: exclude temporary files (*.pid, *.checksum, test scripts)

**Dependencies:**
- Added aiosmtplib for async SMTP email sending

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 12:00:46 +02:00

918 lines
28 KiB
Python

"""
API Client for ROA2WEB Backend Communication
This module provides an async HTTP client for communicating with the FastAPI backend.
Handles authentication, requests, error handling, and response parsing.
"""
import logging
import os
from typing import Optional, Dict, Any, List
from datetime import datetime
import httpx
from httpx import AsyncClient, Response, HTTPError, ConnectError
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Backend connection settings, overridable through environment variables.
# BACKEND_URL falls back to port 8000 (production) rather than 8001 (development).
BACKEND_URL = os.environ.get("BACKEND_URL", "http://localhost:8000")
# API_TIMEOUT is expressed in seconds and falls back to 30 seconds.
REQUEST_TIMEOUT = float(os.environ.get("API_TIMEOUT", "30.0"))
class BackendAPIClient:
    """
    Async HTTP client for the ROA2WEB FastAPI backend.

    Provides methods for the API endpoints used by the Telegram bot:
    - Dashboard data
    - Invoices search and retrieval
    - Treasury/payment data
    - Report exports
    - Company listings
    - User authentication and token management

    The underlying ``httpx.AsyncClient`` is created lazily on first use (or on
    entering the async context manager) and is stored in ``self.client``.
    Apart from ``_handle_response``, the request methods do not raise: they
    log the failure and return a neutral value (``None``, ``[]``, ``False``
    or an error dict, depending on the method).
    """

    def __init__(self, base_url: str = BACKEND_URL):
        """
        Initialize the API client.

        No network connection is opened here; the HTTP client is created on
        first use.

        Args:
            base_url: Base URL of the FastAPI backend (a trailing '/' is stripped)
        """
        self.base_url = base_url.rstrip('/')
        self.client: Optional[AsyncClient] = None
        logger.info(f"Backend API client initialized with base URL: {self.base_url}")

    def _ensure_client(self) -> AsyncClient:
        """
        Return the HTTP client, creating it on first use.

        Every code path goes through this helper so that the client is always
        built with the same settings (base URL, timeout, redirect handling),
        whether it is created lazily or via ``async with``.

        Returns:
            The AsyncClient stored in ``self.client``
        """
        if self.client is None:
            self.client = AsyncClient(
                base_url=self.base_url,
                timeout=REQUEST_TIMEOUT,
                follow_redirects=True
            )
        return self.client

    async def __aenter__(self):
        """Async context manager entry: make sure an HTTP client exists."""
        # Reuse an already-created client instead of overwriting (and leaking) it.
        self._ensure_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the HTTP client, if any."""
        if self.client is not None:
            try:
                await self.client.aclose()
            finally:
                # Drop the reference so that a later call creates a fresh
                # client instead of reusing the closed one.
                self.client = None

    def _get_auth_headers(self, jwt_token: str) -> Dict[str, str]:
        """
        Generate authentication headers with JWT token.

        Args:
            jwt_token: JWT access token

        Returns:
            Dict with Authorization and Content-Type headers
        """
        return {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json"
        }

    def _cache_metadata_headers(self, jwt_token: str) -> Dict[str, str]:
        """
        Build the auth headers plus the cache-metadata request header.

        Args:
            jwt_token: JWT access token

        Returns:
            Auth headers extended with 'X-Include-Cache-Metadata: true'
        """
        headers = self._get_auth_headers(jwt_token)
        headers['X-Include-Cache-Metadata'] = 'true'
        return headers

    async def _handle_response(self, response: Response) -> Dict[str, Any]:
        """
        Handle API response and extract data.

        Args:
            response: HTTP response object

        Returns:
            Dict: Response JSON data

        Raises:
            HTTPError: If response status is not successful (logged, then re-raised)
            Exception: Any other error (e.g. invalid JSON) is logged and re-raised
        """
        try:
            response.raise_for_status()
            return response.json()
        except HTTPError as e:
            logger.error(f"API request failed: {e}")
            logger.error(f"Response body: {response.text}")
            raise
        except Exception as e:
            logger.error(f"Failed to parse response: {e}")
            raise

    # =========================================================================
    # AUTHENTICATION & USER ENDPOINTS
    # =========================================================================

    async def verify_user(
        self,
        oracle_username: str,
        linking_code: str
    ) -> Optional[Dict[str, Any]]:
        """
        Verify user exists in Oracle and get JWT token.

        Called during Telegram linking process (auto-linking flow).

        Args:
            oracle_username: Oracle username extracted from linking code
            linking_code: The 8-character linking code for validation

        Returns:
            Dict with:
            - success: True if verification succeeded
            - access_token: JWT access token
            - refresh_token: JWT refresh token
            - user: Dict with user_id, username, companies, permissions
            - message: Status message
            None if user not found or error

        Example:
            result = await client.verify_user("JOHN.DOE", "ABC12345")
            if result and result['success']:
                jwt_token = result['access_token']
        """
        try:
            client = self._ensure_client()
            # Flow A: Auto-linking (no password required)
            response = await client.post(
                "/api/telegram/auth/verify-user",
                json={
                    "linking_code": linking_code,
                    "oracle_username": oracle_username
                }
            )
            return await self._handle_response(response)
        except ConnectError as e:
            logger.error(f"Cannot connect to backend at {self.base_url}: {e}")
            logger.error("Verify that backend service is running and BACKEND_URL is correct")
            return None
        except httpx.HTTPStatusError as e:
            # Only status errors carry a response; inspect the code here.
            if e.response.status_code == 404:
                logger.warning(f"User {oracle_username} not found in Oracle")
                return None
            logger.error(f"Failed to verify user {oracle_username}: {e}")
            return None
        except HTTPError as e:
            # Transport-level failures (timeouts, protocol errors, ...) have
            # no response attached, so they must not be probed for a status.
            logger.error(f"Failed to verify user {oracle_username}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error verifying user: {e}")
            return None

    async def refresh_token(self, refresh_token: str) -> Optional[str]:
        """
        Refresh JWT token for a user.

        Args:
            refresh_token: JWT refresh token

        Returns:
            str: New JWT access token, None if failed
        """
        try:
            client = self._ensure_client()
            response = await client.post(
                "/api/telegram/auth/refresh-token",
                json={"refresh_token": refresh_token}
            )
            data = await self._handle_response(response)
            return data.get('access_token')
        except Exception as e:
            logger.error(f"Failed to refresh token: {e}")
            return None

    async def verify_email(self, email: str) -> dict:
        """
        Verify if email exists in Oracle database.

        This method does not raise: any failure is logged and reported through
        the returned dict.

        Args:
            email: Email address to verify

        Returns:
            The backend JSON response on success; on any error a dict with
            'success' = False, 'username' = None and a user-facing 'message'
        """
        try:
            client = self._ensure_client()
            response = await client.post(
                "/api/telegram/auth/verify-email",
                json={"email": email}
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error verifying email {email}: {e.response.status_code}")
            return {
                "success": False,
                "username": None,
                "message": "Eroare la verificarea email-ului"
            }
        except Exception as e:
            logger.error(f"Failed to verify email {email}: {e}")
            return {
                "success": False,
                "username": None,
                "message": "Eroare la verificarea email-ului"
            }

    async def login_with_email(
        self,
        email: str,
        password: str,
        telegram_user_id: int,
        session_token: str
    ) -> dict:
        """
        Login via email + password with session token.

        This method does not raise: any failure is logged and reported through
        the returned dict.

        Args:
            email: User email address
            password: Oracle password
            telegram_user_id: Telegram user ID
            session_token: Signed token from code validation

        Returns:
            The backend JSON response on success; on any error a dict with
            'success' = False and a user-facing string 'message'
        """
        try:
            client = self._ensure_client()
            response = await client.post(
                "/api/telegram/auth/login-with-email",
                json={
                    "email": email,
                    "password": password,
                    "telegram_user_id": telegram_user_id,
                    "session_token": session_token
                },
                # Per-request timeout; overrides the client-level default
                timeout=30.0
            )
            response.raise_for_status()
            data = response.json()
            logger.info(f"Email login successful for user {telegram_user_id}")
            return data
        except httpx.HTTPStatusError as e:
            logger.error(f"Email login HTTP error: {e.response.status_code} - {e.response.text}")
            # Use the backend's error detail when it is a plain string;
            # otherwise (non-JSON body, structured detail) use the generic text.
            message = "Autentificare eșuată"
            try:
                error_data = e.response.json()
            except ValueError:
                # Body is not valid JSON (JSONDecodeError / UnicodeDecodeError)
                error_data = None
            if isinstance(error_data, dict):
                detail = error_data.get("detail")
                if isinstance(detail, str):
                    message = detail
            return {
                "success": False,
                "message": message
            }
        except httpx.TimeoutException:
            logger.error("Email login timeout")
            return {
                "success": False,
                "message": "Timeout. Te rugăm să încerci din nou."
            }
        except Exception as e:
            logger.error(f"Email login error: {e}", exc_info=True)
            return {
                "success": False,
                "message": "Eroare de conexiune"
            }

    async def get_user_companies(self, jwt_token: str) -> List[Dict[str, Any]]:
        """
        Get list of companies the user has access to.

        Args:
            jwt_token: JWT access token

        Returns:
            List of company dicts with id, nume_firma, cui, etc.
            Empty list on any error or unexpected response shape
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/companies",
                headers=self._get_auth_headers(jwt_token)
            )
            data = await self._handle_response(response)
            # Backend returns {"companies": [...], "total_count": N}
            if isinstance(data, dict) and "companies" in data:
                return data["companies"]
            return data if isinstance(data, list) else []
        except Exception as e:
            logger.error(f"Failed to get companies: {e}")
            return []

    # =========================================================================
    # DASHBOARD ENDPOINTS
    # =========================================================================

    async def get_dashboard_data(
        self,
        company_id: int,
        jwt_token: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get dashboard statistics for a company.

        Args:
            company_id: Company ID
            jwt_token: JWT access token

        Returns:
            Dict with dashboard data (sold_total, facturi, plati, etc.)
            Includes _cache_hit and _response_time_ms metadata
            None on any error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/summary",
                params={"company": str(company_id)},
                # Ask the backend to include cache metadata for the Telegram bot
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get dashboard data for company {company_id}: {e}")
            return None

    async def get_treasury_breakdown(
        self,
        company_id: int,
        jwt_token: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get detailed treasury breakdown (casa + banca accounts).

        Args:
            company_id: Company ID
            jwt_token: JWT access token

        Returns:
            Dict with treasury breakdown data (accounts by type), None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/treasury-breakdown",
                params={"company": company_id},
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get treasury breakdown for company {company_id}: {e}")
            return None

    async def get_detailed_data(
        self,
        company_id: int,
        jwt_token: str,
        data_type: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get detailed data for clients or suppliers.

        Args:
            company_id: Company ID
            jwt_token: JWT access token
            data_type: Type of data ('clients' or 'suppliers')

        Returns:
            Dict with detailed data (list of clients/suppliers with balances),
            None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/detailed-data",
                # Passed as params so that data_type is URL-encoded
                params={"company": company_id, "data_type": data_type},
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get detailed data ({data_type}) for company {company_id}: {e}")
            return None

    async def get_maturity_data(
        self,
        company_id: int,
        jwt_token: str,
        period: str = "all"
    ) -> Optional[Dict[str, Any]]:
        """
        Get maturity data (in term/overdue breakdown).

        Args:
            company_id: Company ID
            jwt_token: JWT access token
            period: Period filter ('all', '30', '60', '90')

        Returns:
            Dict with maturity data (in_term, overdue, total), None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/maturity",
                # Passed as params so that period is URL-encoded
                params={"company": company_id, "period": period},
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get maturity data for company {company_id}: {e}")
            return None

    async def get_performance_data(
        self,
        company_id: int,
        jwt_token: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get performance data (incasari/plati totals).

        Args:
            company_id: Company ID
            jwt_token: JWT access token

        Returns:
            Dict with performance data (incasari_total, plati_total, net),
            None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/performance",
                params={"company": company_id},
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get performance data for company {company_id}: {e}")
            return None

    async def get_monthly_flows(
        self,
        company_id: int,
        jwt_token: str,
        months: int = 12
    ) -> Optional[Dict[str, Any]]:
        """
        Get monthly cash flows data.

        Args:
            company_id: Company ID
            jwt_token: JWT access token
            months: Number of months to retrieve

        Returns:
            Dict with monthly flows (months, incasari, plati arrays),
            None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/monthly-flows",
                params={"company": company_id, "months": months},
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get monthly flows for company {company_id}: {e}")
            return None

    async def get_trends(
        self,
        company_id: int,
        jwt_token: str,
        period: str = "12m"
    ) -> Optional[Dict[str, Any]]:
        """
        Get trends data (historical data for collections/payments).

        Args:
            company_id: Company ID
            jwt_token: JWT access token
            period: Period for trends (e.g., "12m", "6m", "ytd")

        Returns:
            Dict with trends data including periods, clienti_incasat,
            furnizori_achitat arrays; None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/dashboard/trends",
                # Passed as params so that period is URL-encoded
                params={"company": company_id, "period": period},
                headers=self._cache_metadata_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get trends for company {company_id}: {e}")
            return None

    # =========================================================================
    # INVOICES ENDPOINTS
    # =========================================================================

    async def search_invoices(
        self,
        company_id: int,
        jwt_token: str,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Search invoices with optional filters.

        Args:
            company_id: Company ID
            jwt_token: JWT access token
            filters: Optional filters dict, merged into the query parameters:
                - date_from: str (YYYY-MM-DD)
                - date_to: str (YYYY-MM-DD)
                - status: str (paid, unpaid, overdue)
                - client_name: str
                - partner_type: str (CLIENTI, FURNIZORI)
                - partner_name: str
                - series: str
                - number: str

        Returns:
            List of invoice dicts; empty list on error or unexpected response
        """
        try:
            client = self._ensure_client()
            # Build a fresh dict so the caller's filters are never mutated
            params = {"company": company_id}
            if filters:
                params.update(filters)
            response = await client.get(
                "/api/invoices/",
                params=params,
                headers=self._get_auth_headers(jwt_token)
            )
            data = await self._handle_response(response)
            # The payload is either {"invoices": [...]} or a bare list
            if isinstance(data, dict) and 'invoices' in data:
                return data['invoices']
            if isinstance(data, list):
                return data
            logger.warning(f"📥 Unexpected response format: {type(data)}")
            return []
        except Exception as e:
            logger.error(f"Failed to search invoices for company {company_id}: {e}")
            return []

    async def get_invoice_summary(
        self,
        company_id: int,
        jwt_token: str,
        partner_type: str = "CLIENTI"
    ) -> Optional[Dict[str, Any]]:
        """
        Get invoice summary statistics.

        Args:
            company_id: Company ID
            jwt_token: JWT access token
            partner_type: Partner type sent as the 'partner_type' query
                parameter (defaults to "CLIENTI")

        Returns:
            Dict with summary (total_count, total_amount, paid, unpaid, etc.),
            None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/invoices/summary",
                params={
                    "company": str(company_id),
                    "partner_type": partner_type
                },
                headers=self._get_auth_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get invoice summary for company {company_id}: {e}")
            return None

    # =========================================================================
    # TREASURY ENDPOINTS
    # =========================================================================

    async def get_treasury_data(
        self,
        company_id: int,
        jwt_token: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get treasury/cash flow data for a company.

        Requests the first page with a page size of 1000.

        Args:
            company_id: Company ID
            jwt_token: JWT access token

        Returns:
            Dict with treasury data (cash_balance, incoming, outgoing, etc.),
            None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/treasury/bank-cash-register",
                params={
                    "company": str(company_id),
                    "page": 1,
                    "page_size": 1000
                },
                headers=self._get_auth_headers(jwt_token)
            )
            return await self._handle_response(response)
        except Exception as e:
            logger.error(f"Failed to get treasury data for company {company_id}: {e}")
            return None

    # =========================================================================
    # EXPORT ENDPOINTS
    # =========================================================================

    async def export_report(
        self,
        jwt_token: str,
        report_type: str,
        company_id: int,
        format: str = "xlsx",
        filters: Optional[Dict[str, Any]] = None
    ) -> Optional[bytes]:
        """
        Generate and export a report.

        Args:
            jwt_token: JWT access token
            report_type: Type of report ('dashboard', 'invoices', 'treasury')
            company_id: Company ID
            format: Export format ('xlsx', 'csv', 'pdf')
            filters: Optional filters for data

        Returns:
            bytes: File content, None if failed
        """
        try:
            client = self._ensure_client()
            request_data = {
                "type": report_type,
                "company_id": company_id,
                "format": format,
                "filters": filters or {}
            }
            response = await client.post(
                "/api/telegram/export",
                json=request_data,
                headers=self._get_auth_headers(jwt_token)
            )
            response.raise_for_status()
            # Raw body: the report is a binary file, not JSON
            return response.content
        except Exception as e:
            logger.error(f"Failed to export report: {e}")
            return None

    # =========================================================================
    # CACHE MANAGEMENT
    # =========================================================================

    async def invalidate_cache(
        self,
        jwt_token: str,
        company_id: Optional[int] = None,
        cache_type: Optional[str] = None
    ) -> bool:
        """
        Invalidate cache entries.

        Args:
            jwt_token: JWT access token
            company_id: Optional company ID (None = all companies)
            cache_type: Optional cache type (None = all types)

        Returns:
            bool: True if successful
        """
        try:
            client = self._ensure_client()
            # Only send the selectors that were actually provided
            request_data = {}
            if company_id is not None:
                request_data['company_id'] = company_id
            if cache_type is not None:
                request_data['cache_type'] = cache_type
            response = await client.post(
                "/api/cache/invalidate",
                json=request_data,
                headers=self._get_auth_headers(jwt_token)
            )
            response.raise_for_status()
            logger.info(f"Cache invalidated: company_id={company_id}, cache_type={cache_type}")
            return True
        except Exception as e:
            logger.error(f"Failed to invalidate cache: {e}")
            return False

    async def toggle_user_cache(
        self,
        jwt_token: str,
        enabled: bool
    ) -> bool:
        """
        Toggle cache for current user.

        Args:
            jwt_token: JWT access token
            enabled: True to enable cache, False to disable

        Returns:
            bool: True if successful
        """
        try:
            client = self._ensure_client()
            response = await client.post(
                "/api/cache/toggle-user",
                json={"enabled": enabled},
                headers=self._get_auth_headers(jwt_token)
            )
            response.raise_for_status()
            logger.info(f"User cache toggled: enabled={enabled}")
            return True
        except Exception as e:
            logger.error(f"Failed to toggle user cache: {e}")
            return False

    async def get_cache_stats(
        self,
        jwt_token: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get cache statistics including user-specific settings.

        Args:
            jwt_token: JWT access token

        Returns:
            Dict with cache stats including 'user_enabled' field, None on error
        """
        try:
            client = self._ensure_client()
            response = await client.get(
                "/api/cache/stats",
                headers=self._get_auth_headers(jwt_token)
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Failed to get cache stats: {e}")
            return None

    # =========================================================================
    # HEALTH CHECK
    # =========================================================================

    async def health_check(self) -> bool:
        """
        Check if backend is healthy and reachable.

        Returns:
            bool: True if the health endpoint answered with HTTP 200
        """
        try:
            client = self._ensure_client()
            response = await client.get("/api/telegram/health")
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Backend health check failed: {e}")
            return False
# Module-wide shared client; populated on the first get_backend_client() call.
_backend_client_instance: Optional[BackendAPIClient] = None


def get_backend_client() -> BackendAPIClient:
    """
    Return the shared BackendAPIClient, creating it on the first call.

    Returns:
        BackendAPIClient: The module-level singleton instance
    """
    global _backend_client_instance
    if _backend_client_instance is not None:
        return _backend_client_instance
    _backend_client_instance = BackendAPIClient()
    return _backend_client_instance
# Names exported by `from <module> import *`.
__all__ = ["BackendAPIClient", "get_backend_client", "BACKEND_URL"]