- Refactor telegram bot tests (remove old, add new real flow tests) - Add conftest.py for telegram bot test fixtures - Update validate.md command 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
277 lines
9.8 KiB
Python
277 lines
9.8 KiB
Python
# reports-app/telegram-bot/tests/test_flows_real.py
"""
REAL Integration Tests for complete user flows.

Tests end-to-end flows with REAL backend API and database.

Requires:
- Backend API running on localhost:8001
- SQLite database with at least one linked user
- SSH tunnel for Oracle (if backend needs it)

USAGE:
    pytest tests/test_flows_real.py -m integration -v
"""
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestAuthFlowReal:
    """Authentication checks exercised against the real backend.

    All arguments of the test methods are pytest fixtures that are defined
    outside this section of the file (presumably in a conftest.py -- confirm).
    """

    async def test_user_linking_check(self, backend_available, auth_data):
        """Check that the auth record exists and carries a usable JWT token.

        ``backend_available`` is requested only as a fixture dependency; its
        value is not read here.
        """
        assert auth_data is not None

        # Required keys are checked in the same order as before so the first
        # failure reported is unchanged.
        for required_key in ("telegram_user_id", "jwt_token"):
            assert required_key in auth_data

        token = auth_data["jwt_token"]
        assert token is not None

    async def test_jwt_token_is_valid(self, backend_client, jwt_token):
        """Check that the JWT token is accepted when listing user companies."""
        user_companies = await backend_client.get_user_companies(
            jwt_token=jwt_token,
        )

        # A working token must yield a (possibly empty) list of companies.
        assert user_companies is not None
        assert isinstance(user_companies, list)

    async def test_auth_data_has_username(self, auth_data):
        """Check that the auth record includes a non-null Oracle username."""
        username_key = "oracle_username"

        assert username_key in auth_data
        assert auth_data[username_key] is not None
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestDashboardFlowReal:
    """Dashboard data retrieval and formatting against the real backend.

    All arguments of the test methods are pytest fixtures defined outside
    this section of the file.
    """

    async def test_get_dashboard_data(self, backend_client, jwt_token, test_company_id):
        """Check that the backend returns dashboard data as a dict."""
        request_kwargs = {"company_id": test_company_id, "jwt_token": jwt_token}
        payload = await backend_client.get_dashboard_data(**request_kwargs)

        assert payload is not None
        assert isinstance(payload, dict)

    async def test_dashboard_to_formatted_output(self, backend_client, jwt_token, test_company_id, test_company_name):
        """Check the whole path: fetch dashboard data, then render it as text."""
        from app.bot.formatters import format_dashboard_response

        # Step 1: fetch live data from the backend.
        payload = await backend_client.get_dashboard_data(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )

        # Step 2: turn the payload into the message text.
        rendered = format_dashboard_response(payload, test_company_name)

        # Step 3: the result must be a non-empty string...
        assert isinstance(rendered, str)
        assert len(rendered) > 0

        # ...that mentions the company name or at least a currency marker.
        has_expected_content = (
            test_company_name in rendered
            or "RON" in rendered
            or "lei" in rendered.lower()
        )
        assert has_expected_content
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestTreasuryFlowReal:
    """Treasury retrieval, splitting and formatting against the real backend.

    All arguments of the test methods are pytest fixtures defined outside
    this section of the file.
    """

    async def test_get_treasury_data(self, backend_client, jwt_token, test_company_id):
        """Check that the backend returns a treasury breakdown."""
        breakdown = await backend_client.get_treasury_breakdown(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )

        assert breakdown is not None

    async def test_treasury_breakdown_split(self, jwt_token, test_company_id):
        """Check that the breakdown helper yields a casa/banca pair."""
        from app.bot.helpers import get_treasury_breakdown_split

        split_result = await get_treasury_breakdown_split(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )
        casa_part, banca_part = split_result

        # The test only fails when BOTH halves are missing; a single None
        # half is tolerated.
        assert not (casa_part is None and banca_part is None)

    async def test_treasury_casa_formatted(self, jwt_token, test_company_id, test_company_name):
        """Check that the casa formatter produces a string from real data."""
        from app.bot.formatters import format_treasury_casa_response
        from app.bot.helpers import get_treasury_breakdown_split

        # Fetch the split and keep only the casa half.
        casa_part, _ = await get_treasury_breakdown_split(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )

        # A falsy casa half (None or empty) is formatted as an empty list.
        casa_rows = casa_part or []
        rendered = format_treasury_casa_response(casa_rows, test_company_name)

        assert isinstance(rendered, str)

    async def test_treasury_banca_formatted(self, jwt_token, test_company_id, test_company_name):
        """Check that the banca formatter produces a string from real data."""
        from app.bot.formatters import format_treasury_banca_response
        from app.bot.helpers import get_treasury_breakdown_split

        # Fetch the split and keep only the banca half.
        _, banca_part = await get_treasury_breakdown_split(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )

        # A falsy banca half (None or empty) is formatted as an empty list.
        banca_rows = banca_part or []
        rendered = format_treasury_banca_response(banca_rows, test_company_name)

        assert isinstance(rendered, str)
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestInvoicesFlowReal:
    """Invoice search and formatting against the real backend.

    All arguments of the test methods are pytest fixtures defined outside
    this section of the file.
    """

    async def test_get_invoices_clienti(self, backend_client, jwt_token, test_company_id):
        """Check that unpaid client invoices can be searched."""
        client_filters = {"partner_type": "CLIENTI", "only_unpaid": True}

        found = await backend_client.search_invoices(
            company_id=test_company_id,
            jwt_token=jwt_token,
            filters=client_filters,
        )

        assert found is not None

    async def test_get_invoices_furnizori(self, backend_client, jwt_token, test_company_id):
        """Check that unpaid supplier invoices can be searched."""
        supplier_filters = {"partner_type": "FURNIZORI", "only_unpaid": True}

        found = await backend_client.search_invoices(
            company_id=test_company_id,
            jwt_token=jwt_token,
            filters=supplier_filters,
        )

        assert found is not None

    async def test_invoices_formatted(self, backend_client, jwt_token, test_company_id, test_company_name):
        """Check that the invoice formatter produces a string from real data."""
        from app.bot.formatters import format_invoices_response

        # Fetch a small page of unpaid client invoices.
        search_result = await backend_client.search_invoices(
            company_id=test_company_id,
            jwt_token=jwt_token,
            filters={"partner_type": "CLIENTI", "only_unpaid": True, "page_size": 10},
        )

        # A dict result is unwrapped via its "items" key (empty list when the
        # key is absent); any other result is passed to the formatter as-is.
        if isinstance(search_result, dict):
            invoice_rows = search_result.get("items", [])
        else:
            invoice_rows = search_result

        rendered = format_invoices_response(
            invoices=invoice_rows,
            company_name=test_company_name,
            partner_type="CLIENTI",
        )

        assert isinstance(rendered, str)
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestCompanySelectionFlowReal:
    """Company listing, search and keyboard building against the real backend.

    All arguments of the test methods are pytest fixtures defined outside
    this section of the file.
    """

    async def test_get_all_companies(self, backend_client, jwt_token):
        """Check that the user has a non-empty list of companies."""
        user_companies = await backend_client.get_user_companies(
            jwt_token=jwt_token,
        )

        assert user_companies is not None
        assert isinstance(user_companies, list)
        assert len(user_companies) > 0

    async def test_search_companies_by_name(self, jwt_token, test_companies):
        """Check that searching by a company-name prefix returns a list."""
        from app.bot.helpers import search_companies_by_name

        # The search term is the first (up to) four characters of the first
        # company's name; slicing already copes with shorter names.
        reference_name = test_companies[0].get("nume_firma", "")
        prefix = reference_name[:4]

        matches = await search_companies_by_name(prefix, jwt_token)

        assert matches is not None
        assert isinstance(matches, list)

    async def test_create_company_keyboard(self, test_companies):
        """Check that a selection keyboard with at least one row is built."""
        from app.bot.helpers import create_company_selection_keyboard

        markup = create_company_selection_keyboard(test_companies, max_buttons=5)

        assert markup is not None
        assert hasattr(markup, "inline_keyboard")

        button_rows = markup.inline_keyboard
        assert len(button_rows) > 0
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestCacheFlowReal:
    """Cache-related checks against the real backend.

    All arguments of the test methods are pytest fixtures defined outside
    this section of the file.
    """

    async def test_cache_stats_available(self, backend_client, jwt_token):
        """Check that cache stats are returned, skipping if the call fails.

        Any exception raised by the stats call itself is treated as "endpoint
        not available" and skips the test. The assertion is deliberately kept
        outside the ``try`` block: ``AssertionError`` is an ``Exception``, so
        asserting inside it would turn a real failure into a skip.
        """
        try:
            stats = await backend_client.get_cache_stats(jwt_token=jwt_token)
        except Exception:
            # Cache stats may not be implemented
            pytest.skip("Cache stats endpoint not available")

        # Only reached when the call succeeded (pytest.skip raises).
        assert stats is not None

    async def test_repeated_dashboard_requests(self, backend_client, jwt_token, test_company_id):
        """Check that two identical dashboard requests in a row both succeed.

        The elapsed time of each request is printed for information only;
        no timing assertion is made.
        """
        import time

        # perf_counter is monotonic, so the measured durations cannot be
        # distorted by system clock adjustments.
        first_started = time.perf_counter()
        data1 = await backend_client.get_dashboard_data(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )
        time1 = time.perf_counter() - first_started

        second_started = time.perf_counter()
        data2 = await backend_client.get_dashboard_data(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )
        time2 = time.perf_counter() - second_started

        # Both requests must return data.
        assert data1 is not None
        assert data2 is not None

        print(f"First request: {time1:.3f}s, Second request: {time2:.3f}s")
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.asyncio
class TestTrialBalanceFlowReal:
    """Trial balance retrieval against the real backend.

    All arguments of the test method are pytest fixtures defined outside
    this section of the file.
    """

    async def test_get_trial_balance(self, backend_client, jwt_token, test_company_id):
        """Check that trial balance data is returned, skipping if unsupported.

        The test is skipped only when the client object has no
        ``get_trial_balance`` attribute. The ``try`` block covers just the
        attribute lookup, so an ``AttributeError`` raised inside the call
        itself surfaces as a failure instead of being reported as a skip.
        """
        try:
            fetch_trial_balance = backend_client.get_trial_balance
        except AttributeError:
            # Method may not exist on client
            pytest.skip("Trial balance method not available on client")

        data = await fetch_trial_balance(
            company_id=test_company_id,
            jwt_token=jwt_token,
        )

        assert data is not None
|