""" REAL Integration Tests for Phase 2: Helper Functions ⚠️ MANUAL INTEGRATION TEST - Not run by default in CI/CD This test suite uses REAL DATA from: - SQLite database (telegram_bot.db) - Backend API (localhost:8001) - Actual JWT tokens - Real user sessions REQUIREMENTS: - Backend API running on localhost:8001 - SQLite database with at least one linked user - NO MOCKS - Only real integration testing! USAGE: # Run as script python tests/test_helpers_real.py # Run via pytest (requires -m integration) pytest tests/test_helpers_real.py -m integration NOTE: This test is marked as @pytest.mark.integration and skipped by default. """ import pytest import asyncio import logging import aiosqlite from typing import Optional, Dict, Any from app.bot.helpers import ( search_companies_by_name, create_company_selection_keyboard, format_company_context_footer ) from app.api.client import get_backend_client from app.db.database import DB_PATH logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) async def get_real_user_data() -> Optional[Dict[str, Any]]: """ Get REAL linked user data from database. Returns: Dict with telegram_user_id, oracle_username, jwt_token, etc. None if no linked user found """ try: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT telegram_user_id, oracle_username, jwt_token, jwt_refresh_token, linked_at FROM telegram_users WHERE oracle_username IS NOT NULL AND jwt_token IS NOT NULL ORDER BY linked_at DESC LIMIT 1 """) row = await cursor.fetchone() if row: return dict(row) logger.warning("No linked user found in database!") return None except Exception as e: logger.error(f"Failed to get real user data: {e}") return None @pytest.mark.integration async def test_real_search_companies(): """ TEST 1: Search companies with REAL backend API Uses actual JWT token and backend at localhost:8001 """ print("\n" + "="*80) print("TEST 1: REAL search_companies_by_name()") print("="*80) # Get real user user_data = await get_real_user_data() if not user_data: print("❌ SKIP: No linked user found in database") return False jwt_token = user_data['jwt_token'] username = user_data['oracle_username'] print(f"✅ Using REAL user: {username}") print(f"✅ JWT token: {jwt_token[:20]}...") # Test 1: Get ALL companies print("\n--- Test 1a: Get all companies ---") try: client = get_backend_client() async with client: all_companies = await client.get_user_companies(jwt_token=jwt_token) print(f"✅ Found {len(all_companies)} companies total") if all_companies: print("\nFirst 3 companies:") for i, comp in enumerate(all_companies[:3], 1): print(f" {i}. {comp.get('nume_firma')} (ID: {comp.get('id')}, CUI: {comp.get('cui', 'N/A')})") except Exception as e: print(f"❌ FAILED to get companies: {e}") return False # Test 2: Search with partial name (use first company name) if all_companies: print("\n--- Test 1b: Search companies by partial name ---") first_company_name = all_companies[0].get('nume_firma', '') # Take first 4 characters as search term search_term = first_company_name[:4].upper() if len(first_company_name) >= 4 else first_company_name print(f"Search term: '{search_term}' (from '{first_company_name}')") try: results = await search_companies_by_name(search_term, jwt_token) print(f"✅ Found {len(results)} matching companies") # Verify original company is in results found_original = any( comp.get('id') == all_companies[0].get('id') for comp in results ) if found_original: print("✅ Original company found in search results") else: print("⚠️ WARNING: Original company NOT in search results") # Test case-insensitive search print("\n--- Test 1c: Case-insensitive search ---") search_lower = search_term.lower() print(f"Search term (lowercase): '{search_lower}'") results_lower = await search_companies_by_name(search_lower, jwt_token) print(f"✅ Found {len(results_lower)} matching companies (lowercase)") if len(results) == len(results_lower): print("✅ Case-insensitive search works correctly") else: print(f"❌ FAILED: Different results for uppercase ({len(results)}) vs lowercase ({len(results_lower)})") return False except Exception as e: print(f"❌ FAILED search: {e}") return False print("\n✅ TEST 1 PASSED: Real search works correctly!\n") return True @pytest.mark.integration async def test_real_keyboard_creation(): """ TEST 2: Create company selection keyboard with REAL data """ print("\n" + "="*80) print("TEST 2: REAL create_company_selection_keyboard()") print("="*80) # Get real user user_data = await get_real_user_data() if not user_data: print("❌ SKIP: No linked user found in database") return False jwt_token = user_data['jwt_token'] # Get real companies try: client = get_backend_client() async with client: companies = await client.get_user_companies(jwt_token=jwt_token) print(f"✅ Got {len(companies)} real companies") # Create keyboard keyboard = create_company_selection_keyboard(companies, max_buttons=5) print(f"✅ Created keyboard with {len(keyboard.inline_keyboard)} buttons") # Verify structure expected_buttons = min(len(companies), 5) has_overflow = len(companies) > 5 total_buttons = expected_buttons + (1 if has_overflow else 0) if len(keyboard.inline_keyboard) == total_buttons: print(f"✅ Keyboard structure correct (expected {total_buttons} rows)") else: print(f"❌ FAILED: Expected {total_buttons} rows, got {len(keyboard.inline_keyboard)}") return False # Verify button content print("\nFirst 3 buttons:") for i, row in enumerate(keyboard.inline_keyboard[:3], 1): button = row[0] print(f" {i}. Text: {button.text}") print(f" Callback: {button.callback_data}") # Verify callback data format first_button = keyboard.inline_keyboard[0][0] if first_button.callback_data.startswith("select_company:"): print("\n✅ Callback data format correct") else: print(f"\n❌ FAILED: Invalid callback format: {first_button.callback_data}") return False print("\n✅ TEST 2 PASSED: Real keyboard creation works!\n") return True except Exception as e: print(f"❌ FAILED: {e}") return False @pytest.mark.integration async def test_real_footer_format(): """ TEST 3: Format company context footer with REAL company name """ print("\n" + "="*80) print("TEST 3: REAL format_company_context_footer()") print("="*80) # Get real user user_data = await get_real_user_data() if not user_data: print("❌ SKIP: No linked user found in database") return False jwt_token = user_data['jwt_token'] # Get real company try: client = get_backend_client() async with client: companies = await client.get_user_companies(jwt_token=jwt_token) if not companies: print("❌ SKIP: No companies found") return False company_name = companies[0].get('nume_firma') print(f"✅ Using REAL company: {company_name}") # Format footer footer = format_company_context_footer(company_name) print(f"\nFormatted footer:") print(repr(footer)) print("\nRendered:") print(footer) # Verify structure checks = { "Has separator": "━━━━━━━━━━━━━━" in footer, "Has emoji": "📊" in footer, "Has company name": company_name in footer, "Has command link": "/selectcompany" in footer, "Starts with newlines": footer.startswith("\n\n"), "Is discrete (< 150 chars)": len(footer) < 150 } print("\nVerification checks:") all_passed = True for check_name, passed in checks.items(): status = "✅" if passed else "❌" print(f" {status} {check_name}") if not passed: all_passed = False if all_passed: print("\n✅ TEST 3 PASSED: Real footer format correct!\n") return True else: print("\n❌ TEST 3 FAILED: Some checks did not pass\n") return False except Exception as e: print(f"❌ FAILED: {e}") return False @pytest.mark.integration async def test_real_dashboard_data(): """ TEST 4: BONUS - Test that we can get real dashboard data This validates that the backend API is working correctly and we can use it for the formatters in PHASE 3 """ print("\n" + "="*80) print("TEST 4: BONUS - Get REAL dashboard data from backend") print("="*80) # Get real user user_data = await get_real_user_data() if not user_data: print("❌ SKIP: No linked user found in database") return False jwt_token = user_data['jwt_token'] try: # Get companies client = get_backend_client() async with client: companies = await client.get_user_companies(jwt_token=jwt_token) if not companies: print("❌ SKIP: No companies found") return False company_id = companies[0].get('id') company_name = companies[0].get('nume_firma') print(f"✅ Testing with company: {company_name} (ID: {company_id})") # Get dashboard data async with get_backend_client() as client: dashboard_data = await client.get_dashboard_data( company_id=company_id, jwt_token=jwt_token ) if dashboard_data: print(f"✅ Got REAL dashboard data!") print(f"\nDashboard keys: {list(dashboard_data.keys())}") # Show some data (without exposing sensitive info) sample_keys = ['sold_total', 'facturi_emise', 'facturi_platite', 'facturi_neplatite'] print("\nSample data:") for key in sample_keys: if key in dashboard_data: value = dashboard_data[key] print(f" {key}: {value}") print("\n✅ TEST 4 PASSED: Real dashboard data accessible!\n") return True else: print("❌ FAILED: No dashboard data returned") return False except Exception as e: print(f"❌ FAILED: {e}") import traceback traceback.print_exc() return False async def main(): """ Run all REAL integration tests """ print("\n" + "="*80) print(" FAZA 2 - REAL INTEGRATION TESTS") print(" Testing with ACTUAL data from database and backend API") print("="*80) # Check prerequisites print("\n📋 Prerequisites:") user_data = await get_real_user_data() if not user_data: print("❌ No linked user found in database") print(" Please link a user first using /start in Telegram") return print(f"✅ Linked user: {user_data['oracle_username']}") print(f"✅ Telegram ID: {user_data['telegram_user_id']}") print(f"✅ Database: {DB_PATH}") print(f"✅ Backend API: http://localhost:8001") # Run tests results = [] results.append(("Search Companies", await test_real_search_companies())) results.append(("Keyboard Creation", await test_real_keyboard_creation())) results.append(("Footer Format", await test_real_footer_format())) results.append(("Dashboard Data (Bonus)", await test_real_dashboard_data())) # Summary print("\n" + "="*80) print(" TEST SUMMARY") print("="*80) for test_name, passed in results: status = "✅ PASSED" if passed else "❌ FAILED" print(f"{status}: {test_name}") total = len(results) passed = sum(1 for _, p in results if p) print(f"\nTotal: {passed}/{total} tests passed") if passed == total: print("\n🎉 ALL TESTS PASSED! FAZA 2 Implementation is CORRECT!") else: print(f"\n⚠️ {total - passed} test(s) failed. Please review.") if __name__ == "__main__": asyncio.run(main())