Files
roa2web-service-auto/reports-app/telegram-bot/app/bot/helpers.py
Marius Mutu 2a37959d80 Add cache source tracking (L1/L2) for Telegram bot responses
Implements cache tier identification in Telegram bot to display data source:
- "db" for database queries
- "cached L1" for in-memory cache hits
- "cached L2" for SQLite cache hits

Backend changes:
- Added cache metadata fields to TrendsResponse and DashboardSummary models
  (cache_hit, response_time_ms, cache_source)
- Updated /api/dashboard/summary and /api/dashboard/trends endpoints to
  include cache metadata when X-Include-Cache-Metadata header is present
- Cache metadata is extracted from request.state (set by @cached decorator)

Telegram bot changes:
- Updated API client to send X-Include-Cache-Metadata header
- Modified helpers to extract cache_source from backend responses
- Updated handlers to pass cache metadata to formatters
- Performance footer now displays specific cache tier (L1 vs L2)

Fixed Pydantic serialization issue:
- Changed field names from _cache_hit to cache_hit (without underscore)
- Pydantic excludes underscore-prefixed fields from JSON by default

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 22:39:09 +02:00

815 lines
26 KiB
Python

"""
Helper functions for Telegram bot command handlers.
Provides utilities for company selection, API calls, and response formatting.
"""
import logging
from typing import Optional, Dict, List, Any
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from app.api.client import get_backend_client
from app.agent.session import SessionManager
from app.bot.menus import pad_message_for_wide_buttons
logger = logging.getLogger(__name__)
async def get_active_company_or_prompt(
    update: Update,
    session_manager: SessionManager,
    telegram_user_id: int
) -> Optional[Dict[str, Any]]:
    """
    Return the user's active company, or prompt them to pick one.

    Looks up the session for ``telegram_user_id``. When the session already
    has an active company it is returned as-is. Otherwise the user's
    companies are fetched from the backend and a paginated selection
    keyboard (or a "no access" notice) is sent as a reply, and None is
    returned so the caller can stop processing the current command.

    Args:
        update: Telegram Update object (used to send the reply)
        session_manager: SessionManager instance
        telegram_user_id: Telegram user ID

    Returns:
        The active company dict if one is set, None if the user was prompted
        (or no message was available to reply to).

    Example:
        company = await get_active_company_or_prompt(update, session_manager, user_id)
        if not company:
            return  # User was shown company selection buttons
        # Continue with company operations...
    """
    session = await session_manager.get_or_create_session(telegram_user_id)
    company = session.get_active_company()
    if company:
        return company

    # Function-scope import, kept from the original code
    # (presumably avoids a circular import - TODO confirm).
    from app.auth.linking import get_user_auth_data

    auth_data = await get_user_auth_data(telegram_user_id)
    jwt_token = auth_data['jwt_token']

    client = get_backend_client()
    async with client:
        companies = await client.get_user_companies(jwt_token=jwt_token)

    # update.message is None for callback-query updates (button presses);
    # effective_message resolves to the message attached to whatever kind of
    # update this is, so the prompt also works when triggered from a button.
    target_message = update.message or update.effective_message
    if target_message is None:
        logger.warning(
            "Cannot prompt user %s for company selection: update has no message",
            telegram_user_id
        )
        return None

    if companies:
        keyboard = create_company_selection_keyboard_paginated(companies, page=0)
        message = (
            f"**Selecteaza mai intai o companie**\n\n"
            f"Companiile tale ({len(companies)}):"
        )
        # Apply padding to make inline keyboard buttons wider
        message = pad_message_for_wide_buttons(message)
        await target_message.reply_text(
            message,
            reply_markup=keyboard,
            parse_mode="Markdown"
        )
    else:
        await target_message.reply_text(
            "Nu ai acces la nicio companie.\n"
            "Contacteaza administratorul.",
            parse_mode="Markdown"
        )
    return None
async def search_companies_by_name(
    name_query: str,
    jwt_token: str
) -> List[Dict[str, Any]]:
    """
    Search the user's companies by partial, case-insensitive name match.

    Fetches all companies from the backend and filters them locally.
    The company name is read from the 'name' key, falling back to
    'nume_firma' when 'name' is missing or empty.

    Args:
        name_query: Search term (partial match, e.g., "ACME")
        jwt_token: JWT authentication token

    Returns:
        List of matching company dicts, in backend order. Empty when the
        backend returns no companies.

    Example:
        companies = await search_companies_by_name("acme", token)
        # Returns all companies with "acme" in their name (case-insensitive)
    """
    client = get_backend_client()
    async with client:
        all_companies = await client.get_user_companies(jwt_token=jwt_token)

    # Treat a missing/None response as "no companies" instead of crashing
    # on iteration and len() below.
    all_companies = all_companies or []

    query_lower = name_query.lower()
    matches = [
        comp for comp in all_companies
        # `or` chaining (rather than dict.get defaults) also covers keys that
        # are present but hold None/"" - .lower() on None would raise.
        if query_lower in str(comp.get('name') or comp.get('nume_firma') or '').lower()
    ]
    logger.info(
        "Search '%s': %d matches out of %d total",
        name_query, len(matches), len(all_companies)
    )
    return matches
def create_company_selection_keyboard(
    companies: List[Dict[str, Any]],
    max_buttons: int = 10
) -> InlineKeyboardMarkup:
    """
    Build a single-page inline keyboard for choosing a company (legacy).

    One button per row, one row per company, limited to the first
    ``max_buttons`` companies. When companies are left out, a final
    non-actionable row reports how many were not shown.

    NOTE: Deprecated in favor of create_company_selection_keyboard_paginated;
    kept for backwards compatibility only.

    Args:
        companies: List of company dicts (id/id_firma, name/nume_firma,
            fiscal_code/cui)
        max_buttons: Maximum number of company buttons to show (default: 10)

    Returns:
        InlineKeyboardMarkup with company selection buttons

    Example:
        keyboard = create_company_selection_keyboard(companies)
        await update.message.reply_text("Select company:", reply_markup=keyboard)
    """
    def _company_row(entry: Dict[str, Any]) -> List[InlineKeyboardButton]:
        # One keyboard row holding a single button for this company.
        ident = entry.get('id_firma', entry.get('id'))
        title = entry.get('name', entry.get('nume_firma', 'N/A'))
        fiscal_code = entry.get('fiscal_code', entry.get('cui', ''))
        # Label: "ACME SRL (12345)", or just "ACME SRL" without a fiscal code
        label = f"{title} ({fiscal_code})" if fiscal_code else f"{title}"
        # Callback data: "select_company:123"
        return [InlineKeyboardButton(label, callback_data=f"select_company:{ident}")]

    rows = [_company_row(entry) for entry in companies[:max_buttons]]

    # Overflow indicator when some companies did not fit
    hidden_count = len(companies) - max_buttons
    if hidden_count > 0:
        rows.append([
            InlineKeyboardButton(
                f"... și încă {hidden_count} companii",
                callback_data="noop"
            )
        ])
    return InlineKeyboardMarkup(rows)
def create_company_selection_keyboard_paginated(
    companies: List[Dict[str, Any]],
    page: int = 0,
    per_page: int = 10
) -> InlineKeyboardMarkup:
    """
    Create paginated inline keyboard for company selection.

    Generates a vertical list of buttons for one page of companies,
    followed by a navigation row (previous / page indicator / next) when
    there is more than one page, and a final "back to menu" row.

    An out-of-range ``page`` is clamped to the nearest valid page and a
    ``per_page`` below 1 is treated as 1, so a stale or malformed callback
    never yields an empty page or a ZeroDivisionError.

    Args:
        companies: Full list of company dicts (id/id_firma, name/nume_firma,
            fiscal_code/cui)
        page: Current page number (0-indexed)
        per_page: Number of companies per page (default: 10)

    Returns:
        InlineKeyboardMarkup with company buttons and pagination controls

    Example:
        keyboard = create_company_selection_keyboard_paginated(companies, page=0)
        await update.message.reply_text("Select company:", reply_markup=keyboard)
    """
    keyboard = []

    # A page size below 1 would divide by zero in the ceiling division.
    per_page = max(1, per_page)

    # Calculate pagination
    total_companies = len(companies)
    total_pages = (total_companies + per_page - 1) // per_page  # Ceiling division

    # Clamp to [0, total_pages - 1]; with no companies the only page is 0.
    page = min(max(page, 0), max(total_pages - 1, 0))

    start_idx = page * per_page
    end_idx = min(start_idx + per_page, total_companies)

    # Display companies for current page
    for company in companies[start_idx:end_idx]:
        company_id = company.get('id_firma', company.get('id'))
        # `or` chaining also covers keys present with None/"" values, which
        # would otherwise produce a "None" or empty button label.
        company_name = company.get('name') or company.get('nume_firma') or 'N/A'
        company_cui = company.get('fiscal_code') or company.get('cui') or ''

        # Button text: "ACME SRL (12345)"
        button_text = f"{company_name}"
        if company_cui:
            button_text += f" ({company_cui})"

        # Callback data: "select_company:123"
        callback_data = f"select_company:{company_id}"
        keyboard.append([InlineKeyboardButton(button_text, callback_data=callback_data)])

    # Pagination controls (only if more than one page)
    if total_pages > 1:
        nav_buttons = []
        # Previous button
        if page > 0:
            nav_buttons.append(
                InlineKeyboardButton("< Anterior", callback_data=f"select_company_page:{page-1}")
            )
        # Page indicator (callback "noop": not meant to trigger an action)
        nav_buttons.append(
            InlineKeyboardButton(f"Pagina {page+1}/{total_pages}", callback_data="noop")
        )
        # Next button
        if page < total_pages - 1:
            nav_buttons.append(
                InlineKeyboardButton("Urmator >", callback_data=f"select_company_page:{page+1}")
            )
        keyboard.append(nav_buttons)

    # Back to menu button
    keyboard.append([
        InlineKeyboardButton("< Inapoi la Meniu", callback_data="action:menu")
    ])
    return InlineKeyboardMarkup(keyboard)
def format_company_context_footer(company_name: str) -> str:
    """
    Build the footer that shows which company a response refers to.

    The footer is meant to be appended to a command response: a blank
    line, a horizontal rule, then the active company's name.

    Args:
        company_name: Active company name

    Returns:
        Footer string starting with two newlines, followed by the separator
        line and a "Companie: <name>" line

    Example:
        footer = format_company_context_footer("ACME SRL")
        message = f"Dashboard data...\n{footer}"
    """
    separator = "━━━━━━━━━━━━━━"
    # Two leading empty segments produce the two newlines before the rule.
    return "\n".join(("", "", separator, f"Companie: {company_name}"))
# =========================================================================
# PHASE 2: New Helper Functions for Button Interface
# =========================================================================
async def get_treasury_breakdown_split(
    company_id: int,
    jwt_token: str
) -> Optional[Dict[str, Any]]:
    """
    Get treasury breakdown split into casa (cash) and banca (bank).

    Fetches the treasury breakdown from the backend and reshapes it into
    the structure built below (items -> accounts, nume -> name,
    sold -> balance).

    Backend returns:
        {
            "total": float,
            "breakdown": {
                "casa": {"total": float, "items": [{"nume": str, "cont": str, "sold": float}]},
                "banca": {"total": float, "items": [{"nume": str, "cont": str, "sold": float}]}
            },
            "currency": "RON"
        }

    Missing or null sections, item lists, names and amounts are tolerated:
    they become empty lists, a "Cont <cont>" placeholder name, or 0.0.

    Args:
        company_id: Company ID
        jwt_token: JWT authentication token

    Returns:
        Dict with two keys:
        - 'casa': Dict with 'accounts' (list) and 'total' (float)
        - 'banca': Dict with 'accounts' (list) and 'total' (float)
        plus 'cache_hit', 'response_time_ms' and 'cache_source' when the
        backend response carries them.
        None if the response is empty or the request fails.

    Example:
        data = await get_treasury_breakdown_split(1, token)
        casa_total = data['casa']['total']  # Total cash balance
        bank_accounts = data['banca']['accounts']  # List of bank accounts
    """
    def _split_section(section: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        # Convert one backend section (casa or banca) to {'accounts', 'total'}.
        section = section or {}
        accounts = [
            {
                'name': item.get('nume') or f"Cont {item.get('cont') or 'N/A'}",
                # `or 0` guards against JSON nulls, which float() rejects.
                'balance': float(item.get('sold') or 0),
                'cont': item.get('cont') or ''
            }
            for item in (section.get('items') or [])
        ]
        return {
            'accounts': accounts,
            'total': float(section.get('total') or 0)
        }

    try:
        client = get_backend_client()
        async with client:
            breakdown = await client.get_treasury_breakdown(
                company_id=company_id,
                jwt_token=jwt_token
            )
        if not breakdown:
            return None

        # Backend already splits data into casa and banca
        breakdown_data = breakdown.get('breakdown') or {}
        result = {
            'casa': _split_section(breakdown_data.get('casa')),
            'banca': _split_section(breakdown_data.get('banca'))
        }

        # Pass through cache metadata if present
        for meta_key in ('cache_hit', 'response_time_ms', 'cache_source'):
            if meta_key in breakdown:
                result[meta_key] = breakdown[meta_key]
        return result
    except Exception as e:
        # Boundary handler: callers rely on None (not an exception) on failure.
        logger.error(f"Error getting treasury breakdown split: {e}", exc_info=True)
        return None
async def get_clients_with_maturity(
    company_id: int,
    jwt_token: str
) -> Optional[Dict[str, Any]]:
    """
    Get clients list with maturity breakdown.

    Uses the maturity analysis endpoint (period 'all'), which returns
    client summaries with amounts and overdue status, and derives the
    in-term / overdue / total amounts from those summaries.

    Backend returns:
        {
            "clients": [{"name": str, "amount": float, "dueDate": str, "daysOverdue": int}],
            "suppliers": [...],
            "balance": float,
            "metadata": {...}
        }

    Null names, amounts and overdue-day counts are tolerated and treated
    as 'N/A', 0.0 and 0 respectively.

    Args:
        company_id: Company ID
        jwt_token: JWT authentication token

    Returns:
        Dict with:
        - 'clients': List of client dicts (name, balance, daysOverdue)
        - 'maturity': Dict with 'in_term', 'overdue', 'total' amounts
        plus 'cache_hit', 'response_time_ms' and 'cache_source' when the
        backend response carries them.
        None if the response is empty or the request fails.

    Example:
        data = await get_clients_with_maturity(1, token)
        clients = data['clients']  # List of all clients
        overdue = data['maturity']['overdue']  # Overdue amount
    """
    try:
        client = get_backend_client()
        async with client:
            # Get maturity analysis (contains client summaries)
            maturity_response = await client.get_maturity_data(
                company_id=company_id,
                jwt_token=jwt_token,
                period='all'
            )
        if not maturity_response:
            return None

        # Transform to expected format: amount -> balance.
        # `or` fallbacks also cover JSON nulls, which would otherwise break
        # float() and the `> 0` comparison below.
        clients = [
            {
                'name': c.get('name') or 'N/A',
                'balance': float(c.get('amount') or 0),
                'daysOverdue': c.get('daysOverdue') or 0
            }
            for c in (maturity_response.get('clients') or [])
        ]

        # Calculate maturity breakdown from clients data
        total = sum(c['balance'] for c in clients)
        overdue = sum(c['balance'] for c in clients if c['daysOverdue'] > 0)
        result = {
            'clients': clients,
            'maturity': {
                'in_term': total - overdue,
                'overdue': overdue,
                'total': total
            }
        }

        # Pass through cache metadata if present
        for meta_key in ('cache_hit', 'response_time_ms', 'cache_source'):
            if meta_key in maturity_response:
                result[meta_key] = maturity_response[meta_key]
        return result
    except Exception as e:
        # Boundary handler: callers rely on None (not an exception) on failure.
        logger.error(f"Error getting clients with maturity: {e}", exc_info=True)
        return None
async def get_suppliers_with_maturity(
    company_id: int,
    jwt_token: str
) -> Optional[Dict[str, Any]]:
    """
    Get suppliers list with maturity breakdown.

    Uses the maturity analysis endpoint (period 'all'), which returns
    supplier summaries with amounts and overdue status, and derives the
    in-term / overdue / total amounts from those summaries.

    Backend returns:
        {
            "clients": [...],
            "suppliers": [{"name": str, "amount": float, "dueDate": str, "daysOverdue": int}],
            "balance": float,
            "metadata": {...}
        }

    Null names, amounts and overdue-day counts are tolerated and treated
    as 'N/A', 0.0 and 0 respectively.

    Args:
        company_id: Company ID
        jwt_token: JWT authentication token

    Returns:
        Dict with:
        - 'suppliers': List of supplier dicts (name, balance, daysOverdue)
        - 'maturity': Dict with 'in_term', 'overdue', 'total' amounts
        plus 'cache_hit', 'response_time_ms' and 'cache_source' when the
        backend response carries them.
        None if the response is empty or the request fails.

    Example:
        data = await get_suppliers_with_maturity(1, token)
        suppliers = data['suppliers']  # List of all suppliers
        in_term = data['maturity']['in_term']  # In-term amount
    """
    try:
        client = get_backend_client()
        async with client:
            # Get maturity analysis (contains supplier summaries)
            maturity_response = await client.get_maturity_data(
                company_id=company_id,
                jwt_token=jwt_token,
                period='all'
            )
        if not maturity_response:
            return None

        # Transform to expected format: amount -> balance.
        # `or` fallbacks also cover JSON nulls, which would otherwise break
        # float() and the `> 0` comparison below.
        suppliers = [
            {
                'name': s.get('name') or 'N/A',
                'balance': float(s.get('amount') or 0),
                'daysOverdue': s.get('daysOverdue') or 0
            }
            for s in (maturity_response.get('suppliers') or [])
        ]

        # Calculate maturity breakdown from suppliers data
        total = sum(s['balance'] for s in suppliers)
        overdue = sum(s['balance'] for s in suppliers if s['daysOverdue'] > 0)
        result = {
            'suppliers': suppliers,
            'maturity': {
                'in_term': total - overdue,
                'overdue': overdue,
                'total': total
            }
        }

        # Pass through cache metadata if present
        for meta_key in ('cache_hit', 'response_time_ms', 'cache_source'):
            if meta_key in maturity_response:
                result[meta_key] = maturity_response[meta_key]
        return result
    except Exception as e:
        # Boundary handler: callers rely on None (not an exception) on failure.
        logger.error(f"Error getting suppliers with maturity: {e}", exc_info=True)
        return None
async def get_cashflow_evolution_data(
    company_id: int,
    jwt_token: str,
    period: str = "12m"
) -> Optional[Dict[str, Any]]:
    """
    Get cash flow evolution data with a previous-year comparison.

    Calls the trends endpoint and builds two views from its response:
    totals over all returned months (reported as YTD) for the current and
    previous series, and the last (up to) 12 months, newest first, with the
    matching previous-year values.

    Reads these keys from the response: 'periods', 'clienti_incasat',
    'furnizori_achitat', 'previous_periods', 'clienti_incasat_prev',
    'furnizori_achitat_prev'. Periods are expected as "YYYY-MM" strings;
    a period without '-' is used verbatim as its own label.

    Args:
        company_id: Company ID
        jwt_token: JWT authentication token
        period: Period forwarded to the trends endpoint (default: "12m")

    Returns:
        Dict with:
        - 'performance': Dict with 'current_year' and 'previous_year', each
          holding 'year', 'incasari', 'plati', 'net'
        - 'monthly': Dict with 'months' (labels such as "Noi'25/'24"),
          'incasari', 'plati', 'incasari_prev', 'plati_prev', all newest
          first and all of the same length when the backend series are
          aligned
        plus 'cache_hit', 'response_time_ms' and 'cache_source' when the
        backend response carries them.
        None if required data is missing or the request fails.

    Example:
        data = await get_cashflow_evolution_data(1, token)
        ytd_current = data['performance']['current_year']
        ytd_previous = data['performance']['previous_year']
    """
    # Month abbreviations (Romanian), keyed by zero-padded month number
    month_abbr = {
        '01': 'Ian', '02': 'Feb', '03': 'Mar', '04': 'Apr',
        '05': 'Mai', '06': 'Iun', '07': 'Iul', '08': 'Aug',
        '09': 'Sep', '10': 'Oct', '11': 'Noi', '12': 'Dec'
    }

    def _month_label(period_str: str, prev_period: Optional[str], fallback_prev_year: str) -> str:
        # Format one period as "<Mon>'<yy>/'<previous yy>", e.g. "Noi'25/'24".
        if '-' not in period_str:
            return period_str
        parts = period_str.split('-')
        year_full, month_num = parts[0], parts[1]
        month_name = month_abbr.get(month_num, month_num)
        # The comparison year is resolved per month: a 12-month window can
        # span two calendar years, so one global previous year would
        # mislabel the older months.
        if prev_period and '-' in prev_period:
            prev_year_full = prev_period.split('-')[0]
        elif year_full.isdecimal():
            prev_year_full = str(int(year_full) - 1)
        else:
            prev_year_full = fallback_prev_year
        return f"{month_name}'{year_full[-2:]}/'{prev_year_full[-2:]}"

    try:
        client = get_backend_client()
        async with client:
            # Forward the caller's period instead of a hard-coded "12m"
            trends_data = await client.get_trends(
                company_id=company_id,
                jwt_token=jwt_token,
                period=period
            )
        if not trends_data:
            return None

        def _series(key: str) -> List[Any]:
            # Numeric series for `key`; null months count as 0 so sum() works.
            return [value or 0 for value in (trends_data.get(key) or [])]

        # Current year data
        periods = trends_data.get('periods') or []  # ["2024-01", "2024-02", ...]
        clienti_incasat = _series('clienti_incasat')
        furnizori_achitat = _series('furnizori_achitat')
        # Previous year data
        previous_periods = trends_data.get('previous_periods') or []
        clienti_incasat_prev = _series('clienti_incasat_prev')
        furnizori_achitat_prev = _series('furnizori_achitat_prev')

        if not periods or not clienti_incasat or not furnizori_achitat:
            logger.warning("Trends data missing required fields")
            return None

        # YTD (Year-To-Date) = sum of all available months
        incasari_ytd = sum(clienti_incasat)
        plati_ytd = sum(furnizori_achitat)
        incasari_ytd_prev = sum(clienti_incasat_prev)
        plati_ytd_prev = sum(furnizori_achitat_prev)

        # Derive the years from the data rather than from fixed literals
        current_year = periods[-1].split('-')[0]
        if previous_periods:
            previous_year = previous_periods[-1].split('-')[0]
        elif current_year.isdecimal():
            previous_year = str(int(current_year) - 1)
        else:
            # Unparseable period format: fall back to the calendar.
            from datetime import date
            previous_year = str(date.today().year - 1)

        # Last 12 months of the current series
        last_12_periods = periods[-12:]
        last_12_incasari = clienti_incasat[-12:]
        last_12_plati = furnizori_achitat[-12:]

        # Matching previous-year months; zero-filled to the same window
        # length (not a fixed 12) when the backend sent no comparison data.
        window = len(last_12_periods)
        last_12_periods_prev = previous_periods[-12:]
        last_12_incasari_prev = clienti_incasat_prev[-12:] if clienti_incasat_prev else [0] * window
        last_12_plati_prev = furnizori_achitat_prev[-12:] if furnizori_achitat_prev else [0] * window

        # Pair each month with its previous-year period only when both
        # windows line up; otherwise each label falls back to "year - 1".
        if len(last_12_periods_prev) == window:
            paired_prev = last_12_periods_prev
        else:
            paired_prev = [None] * window

        formatted_months = [
            _month_label(period_str, prev_period, previous_year)
            for period_str, prev_period in zip(last_12_periods, paired_prev)
        ]

        # Build performance summary (YTD)
        performance = {
            'current_year': {
                'year': current_year,
                'incasari': incasari_ytd,
                'plati': plati_ytd,
                'net': incasari_ytd - plati_ytd
            },
            'previous_year': {
                'year': previous_year,
                'incasari': incasari_ytd_prev,
                'plati': plati_ytd_prev,
                'net': incasari_ytd_prev - plati_ytd_prev
            }
        }

        # Monthly breakdown in reverse chronological order (newest first)
        monthly = {
            'months': formatted_months[::-1],
            'incasari': last_12_incasari[::-1],
            'plati': last_12_plati[::-1],
            'incasari_prev': last_12_incasari_prev[::-1],
            'plati_prev': last_12_plati_prev[::-1]
        }

        result = {
            'performance': performance,
            'monthly': monthly
        }

        # Pass through cache metadata if present
        for meta_key in ('cache_hit', 'response_time_ms', 'cache_source'):
            if meta_key in trends_data:
                result[meta_key] = trends_data[meta_key]
        return result
    except Exception as e:
        # Boundary handler: callers rely on None (not an exception) on failure.
        logger.error(f"Error getting cashflow evolution data: {e}", exc_info=True)
        return None
async def get_client_invoices(
    company_id: int,
    client_name: str,
    jwt_token: str
) -> List[Dict[str, Any]]:
    """
    Fetch the unpaid invoices of one client.

    Runs an invoice search on the backend restricted to partner type
    'CLIENTI', the given partner name, and unpaid invoices only.

    Args:
        company_id: Company ID
        client_name: Client name to filter by
        jwt_token: JWT authentication token

    Returns:
        The invoices returned by the backend search, or an empty list when
        nothing is found or the request fails

    Example:
        invoices = await get_client_invoices(1, "ACME Corp", token)
        for inv in invoices:
            print(inv['number'], inv['amount'])
    """
    search_filters = {
        'partner_type': 'CLIENTI',
        'partner_name': client_name,
        'only_unpaid': True  # Unpaid invoices only
    }
    try:
        logger.info(f"Fetching invoices for client '{client_name}' (company_id={company_id})")
        backend = get_backend_client()
        async with backend:
            found = await backend.search_invoices(
                company_id=company_id,
                jwt_token=jwt_token,
                filters=search_filters
            )
        found_count = len(found) if found else 0
        logger.info(f"Found {found_count} invoices for client '{client_name}'")
        if not found:
            return []
        logger.debug(f"First invoice sample: {found[0]}")
        return found
    except Exception as e:
        # Failures are logged and reported to the caller as "no invoices".
        logger.error(f"Error getting client invoices for '{client_name}': {e}", exc_info=True)
        return []
async def get_supplier_invoices(
    company_id: int,
    supplier_name: str,
    jwt_token: str
) -> List[Dict[str, Any]]:
    """
    Fetch the unpaid invoices of one supplier.

    Runs an invoice search on the backend restricted to partner type
    'FURNIZORI', the given partner name, and unpaid invoices only.

    Args:
        company_id: Company ID
        supplier_name: Supplier name to filter by
        jwt_token: JWT authentication token

    Returns:
        The invoices returned by the backend search, or an empty list when
        nothing is found or the request fails

    Example:
        invoices = await get_supplier_invoices(1, "Supplier Inc", token)
        for inv in invoices:
            print(inv['number'], inv['amount'])
    """
    search_filters = {
        'partner_type': 'FURNIZORI',
        'partner_name': supplier_name,
        'only_unpaid': True  # Unpaid invoices only
    }
    try:
        logger.info(f"Fetching invoices for supplier '{supplier_name}' (company_id={company_id})")
        backend = get_backend_client()
        async with backend:
            found = await backend.search_invoices(
                company_id=company_id,
                jwt_token=jwt_token,
                filters=search_filters
            )
        found_count = len(found) if found else 0
        logger.info(f"Found {found_count} invoices for supplier '{supplier_name}'")
        if not found:
            return []
        logger.debug(f"First invoice sample: {found[0]}")
        return found
    except Exception as e:
        # Failures are logged and reported to the caller as "no invoices".
        logger.error(f"Error getting supplier invoices for '{supplier_name}': {e}", exc_info=True)
        return []
# Public API of this module: the helper names exported by a star-import.
# Every function defined in this file is listed here.
__all__ = [
'get_active_company_or_prompt',
'search_companies_by_name',
'create_company_selection_keyboard',
'create_company_selection_keyboard_paginated',
'format_company_context_footer',
'get_treasury_breakdown_split',
'get_clients_with_maturity',
'get_suppliers_with_maturity',
'get_cashflow_evolution_data',
'get_client_invoices',
'get_supplier_invoices'
]