Files
roa2web-service-auto/reports-app/telegram-bot/app/bot/helpers.py
Marius Mutu 6b13ffa183 Initial commit: ROA2WEB - FastAPI + Vue.js + Telegram Bot
Modern ERP Reports Application with microservices architecture

Tech Stack:
- Backend: FastAPI + python-oracledb (Oracle DB integration)
- Frontend: Vue.js 3 + PrimeVue + Vite
- Telegram Bot: python-telegram-bot + SQLite
- Infrastructure: Shared database pool, JWT authentication, SSH tunnel

Features:
- FastAPI backend with async Oracle connection pool
- Vue.js 3 responsive frontend with PrimeVue components
- Telegram bot alternative interface
- Microservices architecture with shared components
- Complete deployment support (Linux Docker + Windows IIS)
- Comprehensive testing (Playwright E2E + pytest)

Repository Structure:
- reports-app/ - Main application (backend, frontend, telegram-bot)
- shared/ - Shared components (database pool, auth, utils)
- deployment/ - Deployment scripts (Linux & Windows)
- docs/ - Project documentation
- security/ - Security scanning and git hooks
2025-10-25 14:55:08 +03:00

706 lines
22 KiB
Python

"""
Helper functions for Telegram bot command handlers.
Provides utilities for company selection, API calls, and response formatting.
"""
import logging
from typing import Optional, Dict, List, Any
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from app.api.client import get_backend_client
from app.agent.session import SessionManager
from app.bot.menus import pad_message_for_wide_buttons
logger = logging.getLogger(__name__)
async def get_active_company_or_prompt(
update: Update,
session_manager: SessionManager,
telegram_user_id: int
) -> Optional[Dict[str, Any]]:
"""
Get active company from session or prompt user to select one with buttons.
This function checks if the user has an active company set in their session.
If not, it fetches companies and displays selection buttons directly.
Args:
update: Telegram Update object (for sending messages)
session_manager: SessionManager instance
telegram_user_id: Telegram user ID
Returns:
Dict with company info (id, name, cui) if set, None if user needs to select
Example:
company = await get_active_company_or_prompt(update, session_manager, user_id)
if not company:
return # User was shown company selection buttons
# Continue with company operations...
"""
session = await session_manager.get_or_create_session(telegram_user_id)
company = session.get_active_company()
if not company:
# Get auth data and companies
from app.auth.linking import get_user_auth_data
auth_data = await get_user_auth_data(telegram_user_id)
jwt_token = auth_data['jwt_token']
client = get_backend_client()
async with client:
companies = await client.get_user_companies(jwt_token=jwt_token)
if companies:
keyboard = create_company_selection_keyboard_paginated(companies, page=0)
message = (
f"**Selecteaza mai intai o companie**\n\n"
f"Companiile tale ({len(companies)}):"
)
# Apply padding to make inline keyboard buttons wider
message = pad_message_for_wide_buttons(message)
await update.message.reply_text(
message,
reply_markup=keyboard,
parse_mode="Markdown"
)
else:
await update.message.reply_text(
"Nu ai acces la nicio companie.\n"
"Contacteaza administratorul.",
parse_mode="Markdown"
)
return None
return company
async def search_companies_by_name(
name_query: str,
jwt_token: str
) -> List[Dict[str, Any]]:
"""
Search companies by partial name match (case-insensitive).
Fetches all companies from backend and filters them by name.
Uses case-insensitive partial matching for flexible search.
Args:
name_query: Search term (partial match, e.g., "ACME")
jwt_token: JWT authentication token
Returns:
List of matching company dicts (each with id, nume_firma, cui, etc.)
Example:
companies = await search_companies_by_name("acme", token)
# Returns all companies with "acme" in their name (case-insensitive)
"""
client = get_backend_client()
async with client:
all_companies = await client.get_user_companies(jwt_token=jwt_token)
# Filter by name (case-insensitive partial match)
query_lower = name_query.lower()
matches = [
comp for comp in all_companies
if query_lower in comp.get('name', comp.get('nume_firma', '')).lower()
]
logger.info(
f"Search '{name_query}': {len(matches)} matches out of {len(all_companies)} total"
)
return matches
def create_company_selection_keyboard(
companies: List[Dict[str, Any]],
max_buttons: int = 10
) -> InlineKeyboardMarkup:
"""
Create inline keyboard for company selection (legacy - without pagination).
Generates a vertical list of buttons, one per company.
Each button shows company name and CUI, and triggers a callback.
NOTE: This function is deprecated in favor of create_company_selection_keyboard_paginated.
It's kept for backwards compatibility only.
Args:
companies: List of company dicts (with id, nume_firma, cui)
max_buttons: Maximum number of buttons to show (default: 10)
Returns:
InlineKeyboardMarkup with company selection buttons
Example:
keyboard = create_company_selection_keyboard(companies)
await update.message.reply_text("Select company:", reply_markup=keyboard)
"""
keyboard = []
for company in companies[:max_buttons]:
company_id = company.get('id_firma', company.get('id'))
company_name = company.get('name', company.get('nume_firma', 'N/A'))
company_cui = company.get('fiscal_code', company.get('cui', ''))
# Button text: "ACME SRL (CUI: 12345)"
button_text = f"{company_name}"
if company_cui:
button_text += f" ({company_cui})"
# Callback data: "select_company:123"
callback_data = f"select_company:{company_id}"
keyboard.append([InlineKeyboardButton(button_text, callback_data=callback_data)])
# Add overflow indicator if there are more companies
if len(companies) > max_buttons:
keyboard.append([InlineKeyboardButton(
f"... și încă {len(companies) - max_buttons} companii",
callback_data="noop"
)])
return InlineKeyboardMarkup(keyboard)
def create_company_selection_keyboard_paginated(
companies: List[Dict[str, Any]],
page: int = 0,
per_page: int = 10
) -> InlineKeyboardMarkup:
"""
Create paginated inline keyboard for company selection.
Generates a vertical list of buttons for one page of companies,
with navigation buttons for previous/next pages.
Args:
companies: Full list of company dicts (with id, nume_firma, cui)
page: Current page number (0-indexed)
per_page: Number of companies per page (default: 10)
Returns:
InlineKeyboardMarkup with company buttons and pagination controls
Example:
keyboard = create_company_selection_keyboard_paginated(companies, page=0)
await update.message.reply_text("Select company:", reply_markup=keyboard)
"""
keyboard = []
# Calculate pagination
total_companies = len(companies)
total_pages = (total_companies + per_page - 1) // per_page # Ceiling division
start_idx = page * per_page
end_idx = min(start_idx + per_page, total_companies)
# Display companies for current page
page_companies = companies[start_idx:end_idx]
for company in page_companies:
company_id = company.get('id_firma', company.get('id'))
company_name = company.get('name', company.get('nume_firma', 'N/A'))
company_cui = company.get('fiscal_code', company.get('cui', ''))
# Button text: "ACME SRL (CUI: 12345)"
button_text = f"{company_name}"
if company_cui:
button_text += f" ({company_cui})"
# Callback data: "select_company:123"
callback_data = f"select_company:{company_id}"
keyboard.append([InlineKeyboardButton(button_text, callback_data=callback_data)])
# Pagination controls (only if more than one page)
if total_pages > 1:
nav_buttons = []
# Previous button
if page > 0:
nav_buttons.append(
InlineKeyboardButton("< Anterior", callback_data=f"select_company_page:{page-1}")
)
# Page indicator (non-clickable)
nav_buttons.append(
InlineKeyboardButton(f"Pagina {page+1}/{total_pages}", callback_data="noop")
)
# Next button
if page < total_pages - 1:
nav_buttons.append(
InlineKeyboardButton("Urmator >", callback_data=f"select_company_page:{page+1}")
)
keyboard.append(nav_buttons)
# Back to menu button
keyboard.append([
InlineKeyboardButton("< Inapoi la Meniu", callback_data="action:menu")
])
return InlineKeyboardMarkup(keyboard)
def format_company_context_footer(company_name: str) -> str:
"""
Format discrete footer with company context.
Adds a subtle footer to command responses showing the active company
and a quick link to change it.
Args:
company_name: Active company name
Returns:
Formatted footer string with separator and company name
Example:
footer = format_company_context_footer("ACME SRL")
message = f"Dashboard data...\n{footer}"
# Output: "Dashboard data...\n\n━━━━━━━━━━━━━━\nCompanie: ACME SRL"
"""
return f"\n\n━━━━━━━━━━━━━━\nCompanie: {company_name}"
# =========================================================================
# FAZA 2: New Helper Functions for Button Interface
# =========================================================================
async def get_treasury_breakdown_split(
company_id: int,
jwt_token: str
) -> Optional[Dict[str, Any]]:
"""
Get treasury breakdown split into casa and banca.
Fetches treasury breakdown from backend and transforms it
to the format expected by formatters.
Backend returns:
{
"total": float,
"breakdown": {
"casa": {"total": float, "items": [{"nume": str, "cont": str, "sold": float}]},
"banca": {"total": float, "items": [{"nume": str, "cont": str, "sold": float}]}
},
"currency": "RON"
}
Args:
company_id: Company ID
jwt_token: JWT authentication token
Returns:
Dict with two keys:
- 'casa': Dict with 'accounts' (list) and 'total' (float)
- 'banca': Dict with 'accounts' (list) and 'total' (float)
None if request fails
Example:
data = await get_treasury_breakdown_split(1, token)
casa_total = data['casa']['total'] # Total cash balance
bank_accounts = data['banca']['accounts'] # List of bank accounts
"""
try:
client = get_backend_client()
async with client:
breakdown = await client.get_treasury_breakdown(
company_id=company_id,
jwt_token=jwt_token
)
if not breakdown:
return None
# Backend already splits data into casa and banca
# Transform backend structure to match formatter expectations
breakdown_data = breakdown.get('breakdown', {})
casa_data = breakdown_data.get('casa', {})
banca_data = breakdown_data.get('banca', {})
# Transform items to accounts format (nume->name, sold->balance)
casa_accounts = [
{
'name': item.get('nume', f"Cont {item.get('cont', 'N/A')}"),
'balance': float(item.get('sold', 0)),
'cont': item.get('cont', '')
}
for item in casa_data.get('items', [])
]
banca_accounts = [
{
'name': item.get('nume', f"Cont {item.get('cont', 'N/A')}"),
'balance': float(item.get('sold', 0)),
'cont': item.get('cont', '')
}
for item in banca_data.get('items', [])
]
return {
'casa': {
'accounts': casa_accounts,
'total': float(casa_data.get('total', 0))
},
'banca': {
'accounts': banca_accounts,
'total': float(banca_data.get('total', 0))
}
}
except Exception as e:
logger.error(f"Error getting treasury breakdown split: {e}", exc_info=True)
return None
async def get_clients_with_maturity(
company_id: int,
jwt_token: str
) -> Optional[Dict[str, Any]]:
"""
Get clients list with maturity breakdown.
Uses maturity analysis endpoint which returns client summaries
with amounts and overdue status.
Backend returns:
{
"clients": [{"name": str, "amount": float, "dueDate": str, "daysOverdue": int}],
"suppliers": [...],
"balance": float,
"metadata": {...}
}
Args:
company_id: Company ID
jwt_token: JWT authentication token
Returns:
Dict with:
- 'clients': List of client dicts (id, name, balance)
- 'maturity': Dict with 'in_term', 'overdue', 'total' amounts
None if request fails
Example:
data = await get_clients_with_maturity(1, token)
clients = data['clients'] # List of all clients
overdue = data['maturity']['overdue'] # Overdue amount
"""
try:
client = get_backend_client()
async with client:
# Get maturity analysis (contains client summaries)
maturity_response = await client.get_maturity_data(
company_id=company_id,
jwt_token=jwt_token,
period='all'
)
if not maturity_response:
return None
# Extract clients from maturity response
clients_raw = maturity_response.get('clients', [])
# Transform to expected format: amount → balance
clients = [
{
'name': c.get('name', 'N/A'),
'balance': float(c.get('amount', 0)),
'daysOverdue': c.get('daysOverdue', 0)
}
for c in clients_raw
]
# Calculate maturity breakdown from clients data
total = sum(c['balance'] for c in clients)
overdue = sum(c['balance'] for c in clients if c.get('daysOverdue', 0) > 0)
in_term = total - overdue
return {
'clients': clients,
'maturity': {
'in_term': in_term,
'overdue': overdue,
'total': total
}
}
except Exception as e:
logger.error(f"Error getting clients with maturity: {e}", exc_info=True)
return None
async def get_suppliers_with_maturity(
company_id: int,
jwt_token: str
) -> Optional[Dict[str, Any]]:
"""
Get suppliers list with maturity breakdown.
Uses maturity analysis endpoint which returns supplier summaries
with amounts and overdue status.
Backend returns:
{
"clients": [...],
"suppliers": [{"name": str, "amount": float, "dueDate": str, "daysOverdue": int}],
"balance": float,
"metadata": {...}
}
Args:
company_id: Company ID
jwt_token: JWT authentication token
Returns:
Dict with:
- 'suppliers': List of supplier dicts (id, name, balance)
- 'maturity': Dict with 'in_term', 'overdue', 'total' amounts
None if request fails
Example:
data = await get_suppliers_with_maturity(1, token)
suppliers = data['suppliers'] # List of all suppliers
in_term = data['maturity']['in_term'] # In-term amount
"""
try:
client = get_backend_client()
async with client:
# Get maturity analysis (contains supplier summaries)
maturity_response = await client.get_maturity_data(
company_id=company_id,
jwt_token=jwt_token,
period='all'
)
if not maturity_response:
return None
# Extract suppliers from maturity response
suppliers_raw = maturity_response.get('suppliers', [])
# Transform to expected format: amount → balance
suppliers = [
{
'name': s.get('name', 'N/A'),
'balance': float(s.get('amount', 0)),
'daysOverdue': s.get('daysOverdue', 0)
}
for s in suppliers_raw
]
# Calculate maturity breakdown from suppliers data
total = sum(s['balance'] for s in suppliers)
overdue = sum(s['balance'] for s in suppliers if s.get('daysOverdue', 0) > 0)
in_term = total - overdue
return {
'suppliers': suppliers,
'maturity': {
'in_term': in_term,
'overdue': overdue,
'total': total
}
}
except Exception as e:
logger.error(f"Error getting suppliers with maturity: {e}", exc_info=True)
return None
async def get_cashflow_evolution_data(
company_id: int,
jwt_token: str,
period: str = "12m"
) -> Optional[Dict[str, Any]]:
"""
Get cash flow evolution data.
Uses monthly flows endpoint which returns current month data.
Backend returns: {'inflows': float, 'outflows': float, 'period': str, 'currency': str}
Args:
company_id: Company ID
jwt_token: JWT authentication token
period: Period for monthly data (default: "12m")
Returns:
Dict with:
- 'performance': Dict with incasari_total, plati_total, net
- 'monthly': Dict with months, incasari, plati arrays
None if request fails
Example:
data = await get_cashflow_evolution_data(1, token)
net = data['performance']['net'] # Net cash flow
months = data['monthly']['months'] # List of month names
"""
try:
client = get_backend_client()
async with client:
# Get monthly flows (current month only from backend)
monthly_flows = await client.get_monthly_flows(
company_id=company_id,
jwt_token=jwt_token,
months=12 # Note: backend ignores this and returns only current month
)
if not monthly_flows:
return None
# Transform backend response to expected format
inflows = float(monthly_flows.get('inflows', 0))
outflows = float(monthly_flows.get('outflows', 0))
period_name = monthly_flows.get('period', 'Luna curentă')
# Calculate net
net = inflows - outflows
# Build performance summary
performance = {
'incasari_total': inflows,
'plati_total': outflows,
'net': net
}
# Build monthly breakdown (single month from backend)
monthly = {
'months': [period_name],
'incasari': [inflows],
'plati': [outflows]
}
return {
'performance': performance,
'monthly': monthly
}
except Exception as e:
logger.error(f"Error getting cashflow evolution data: {e}", exc_info=True)
return None
async def get_client_invoices(
company_id: int,
client_name: str,
jwt_token: str
) -> List[Dict[str, Any]]:
"""
Get invoices for a specific client.
Args:
company_id: Company ID
client_name: Client name to filter by
jwt_token: JWT authentication token
Returns:
List of invoice dicts for the specified client
Example:
invoices = await get_client_invoices(1, "ACME Corp", token)
for inv in invoices:
print(inv['number'], inv['amount'])
"""
try:
logger.info(f"Fetching invoices for client '{client_name}' (company_id={company_id})")
client = get_backend_client()
async with client:
# Filter only by unpaid invoices (with balance > 0)
invoices = await client.search_invoices(
company_id=company_id,
jwt_token=jwt_token,
filters={
'partner_type': 'CLIENTI',
'partner_name': client_name,
'only_unpaid': True # Only show unpaid invoices (matching balance > 0)
}
)
logger.info(f"Found {len(invoices) if invoices else 0} invoices for client '{client_name}'")
if invoices:
logger.debug(f"First invoice sample: {invoices[0]}")
return invoices or []
except Exception as e:
logger.error(f"Error getting client invoices for '{client_name}': {e}", exc_info=True)
return []
async def get_supplier_invoices(
company_id: int,
supplier_name: str,
jwt_token: str
) -> List[Dict[str, Any]]:
"""
Get invoices for a specific supplier.
Args:
company_id: Company ID
supplier_name: Supplier name to filter by
jwt_token: JWT authentication token
Returns:
List of invoice dicts for the specified supplier
Example:
invoices = await get_supplier_invoices(1, "Supplier Inc", token)
for inv in invoices:
print(inv['number'], inv['amount'])
"""
try:
logger.info(f"Fetching invoices for supplier '{supplier_name}' (company_id={company_id})")
client = get_backend_client()
async with client:
# Filter only by unpaid invoices (with balance > 0)
invoices = await client.search_invoices(
company_id=company_id,
jwt_token=jwt_token,
filters={
'partner_type': 'FURNIZORI',
'partner_name': supplier_name,
'only_unpaid': True # Only show unpaid invoices (matching balance > 0)
}
)
logger.info(f"Found {len(invoices) if invoices else 0} invoices for supplier '{supplier_name}'")
if invoices:
logger.debug(f"First invoice sample: {invoices[0]}")
return invoices or []
except Exception as e:
logger.error(f"Error getting supplier invoices for '{supplier_name}': {e}", exc_info=True)
return []
# Export all helper functions
__all__ = [
'get_active_company_or_prompt',
'search_companies_by_name',
'create_company_selection_keyboard',
'create_company_selection_keyboard_paginated',
'format_company_context_footer',
'get_treasury_breakdown_split',
'get_clients_with_maturity',
'get_suppliers_with_maturity',
'get_cashflow_evolution_data',
'get_client_invoices',
'get_supplier_invoices'
]