diff --git a/TELEGRAM_EMAIL_AUTH_PLAN.md b/TELEGRAM_EMAIL_AUTH_PLAN.md new file mode 100644 index 0000000..388c2df --- /dev/null +++ b/TELEGRAM_EMAIL_AUTH_PLAN.md @@ -0,0 +1,1673 @@ +# Plan: Autentificare Telegram Bot prin Email + Parolă (2FA) + +**Data**: 2025-11-07 +**Autor**: Claude Code +**Status**: Planificat - Neîmplementat + +--- + +## 🎯 Obiectiv + +Implementare autentificare alternativă **email + parolă** pentru Telegram Bot, în **paralel** cu metoda actuală (cod din web app). Ambele metode vor fi disponibile pentru toți utilizatorii. + +**Cerințe cheie:** +- ✅ Minimal invaziv - nu modifica logica existentă +- ✅ Ambele metode de autentificare în paralel +- ✅2FA real: email possession + parolă Oracle +- ✅ Simplu de testat cu un singur utilizator (mmarius28@gmail.com) + +--- + +## 📊 Context: Sistemul Actual + +### Metoda Actuală de Autentificare (rămâne neschimbată) + +``` +1. User se autentifică în web app (username + parolă) +2. User cere linking Telegram → backend generează cod 8-char +3. Backend salvează cod în telegram-bot via POST /internal/save-code +4. User trimite /start ABC123XY în Telegram +5. Bot validează codul și auto-linkează (fără parolă din nou) +6. User autentificat în bot +``` + +**Caracteristici:** +- Nu necesită email +- Necesită acces la web app +- Auto-linking fără re-introducere parolă +- Cod expiră în 15 minute + +--- + +## 🔄 Noul Flux de Autentificare (Email + Parolă) + +### Flow Complet + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 1. User: /login SAU apasă buton "🔐 Login cu Email" │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 2. Bot: "📧 Introdu adresa ta de email Oracle:" │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 3. User: mmarius28@gmail.com │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 4. Bot: │ +│ - Verifică email în Oracle UTILIZATORI table │ +│ - Generează cod 6-digit random │ +│ - Salvează în email_auth_codes (expiry 5 min) │ +│ - Trimite email SMTP: "Codul tău: 123456" │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 5. Bot: "✉️ Cod trimis pe email. Introdu codul (5 min):" │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 6. User: 123456 │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 7. Bot: │ +│ - Validează cod în email_auth_codes │ +│ - Verifică expiry (5 minute) │ +│ - Marchează cod ca "used" │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 8. Bot: "🔐 Introdu parola ta Oracle:" │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 9. User: parola_mea (mesaj va fi șters automat) │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 10. Bot → Backend: │ +│ POST /api/telegram/auth/login-with-email │ +│ { │ +│ "email": "mmarius28@gmail.com", │ +│ "password": "parola_mea", │ +│ "telegram_user_id": 123456789 │ +│ } │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 11. Backend: │ +│ - Găsește username din email în UTILIZATORI │ +│ - Verifică parolă: pack_drepturi.verificautilizator() │ +│ - Generează JWT tokens (access + refresh) │ +│ - Returnează tokens + user data │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 12. Bot: │ +│ - Salvează JWT în telegram_users table │ +│ - Linkează telegram_user_id cu oracle_username │ +│ - Șterge mesajele cu parolă │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 13. Bot: "✅ Autentificat cu succes! Folosește /help" │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Conversation States (ConversationHandler) + +```python +AWAITING_EMAIL = 1 # Așteaptă email de la user +AWAITING_CODE = 2 # Așteaptă cod din email +AWAITING_PASSWORD = 3 # Așteaptă parolă Oracle +``` + +--- + +## 📦 Componente Noi (Arhitectură) + +### 1. Email Service (SMTP Client) + +**Fișier NOU**: `reports-app/telegram-bot/app/utils/email_service.py` + +```python +import smtplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +import os + +class EmailService: + def __init__(self): + self.smtp_host = os.getenv("SMTP_HOST", "mail.romfast.ro") + self.smtp_port = int(os.getenv("SMTP_PORT", "587")) + self.smtp_user = os.getenv("SMTP_USER", "ups@romfast.ro") + self.smtp_password = os.getenv("SMTP_PASSWORD", "#Ups2020#") + self.from_email = os.getenv("SMTP_FROM_EMAIL", "ups@romfast.ro") + self.from_name = os.getenv("SMTP_FROM_NAME", "ROA2WEB") + + async def send_auth_code(self, to_email: str, code: str, username: str) -> bool: + """ + Trimite cod de autentificare pe email + Returns: True dacă email trimis cu succes + """ + subject = "Codul tău de autentificare ROA2WEB" + + html_body = f""" + + +

🔐 Autentificare Telegram Bot

+

Salut {username},

+

Codul tău de autentificare este:

+

+ {code} +

+

Codul expiră în 5 minute.

+

Dacă nu ai solicitat acest cod, te rugăm să ignori acest email.

+
+

+ ROA2WEB - ERP Reports Application +

+ + + """ + + # Implementare SMTP cu error handling +``` + +**Funcții:** +- `send_auth_code(email, code, username)` - trimite cod pe email +- `_send_email(to, subject, html_body)` - helper SMTP generic +- Error handling cu retry logic (3 încercări) + +--- + +### 2. Tabela SQLite Nouă: `email_auth_codes` + +**Locație**: `reports-app/telegram-bot/app/db/database.py` + +**Schema SQL:** +```sql +CREATE TABLE IF NOT EXISTS email_auth_codes ( + code TEXT PRIMARY KEY, -- 6-digit numeric code (e.g., "123456") + email TEXT NOT NULL, -- Email utilizator din Oracle + oracle_username TEXT NOT NULL, -- Username Oracle asociat + telegram_user_id INTEGER NOT NULL, -- Telegram user ID care a solicitat + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP NOT NULL, -- Current time + 5 minute + used BOOLEAN DEFAULT 0, -- 0 = nefolosit, 1 = folosit + used_at TIMESTAMP, -- Timestamp când a fost folosit + + -- Index pentru query performance + INDEX idx_email (email), + INDEX idx_expires_at (expires_at), + INDEX idx_telegram_user_id (telegram_user_id) +); +``` + +**Operații CRUD** (în `app/db/operations.py`): +```python +async def create_email_auth_code(code: str, email: str, username: str, telegram_user_id: int) -> bool +async def get_email_auth_code(code: str) -> Optional[Dict] +async def mark_email_code_used(code: str) -> bool +async def delete_expired_email_codes() -> int +async def get_pending_email_code_for_user(telegram_user_id: int) -> Optional[Dict] +``` + +**Caracteristici:** +- Cod 6-digit random numeric (000000 - 999999) +- Expirare 5 minute +- One-time use (marcat ca `used=1`) +- Auto-cleanup de către job-ul existent (hourly) + +--- + +### 3. Backend Endpoint NOU: `POST /api/telegram/auth/login-with-email` + +**Locație**: `reports-app/backend/app/routers/telegram.py` + +**Request Schema:** +```python +class TelegramEmailLoginRequest(BaseModel): + email: EmailStr + password: str + telegram_user_id: int + +class TelegramEmailLoginResponse(BaseModel): + success: bool + access_token: str + refresh_token: str + token_type: str = "bearer" + user_id: int + username: str + companies: List[CompanyInfo] + message: str +``` + +**Endpoint Logic:** +```python +@router.post("/auth/login-with-email", response_model=TelegramEmailLoginResponse) +async def login_with_email(request: TelegramEmailLoginRequest): + """ + Autentificare Telegram prin email + parolă + + Flow: + 1. Caută username în Oracle UTILIZATORI by email + 2. Verifică parolă prin pack_drepturi.verificautilizator(username, password) + 3. Dacă valid: generează JWT tokens + 4. Returnează tokens + user data + """ + + async with oracle_pool.get_connection() as connection: + # 1. Find username by email + cursor = connection.cursor() + cursor.execute(""" + SELECT ID_UTIL, UTILIZATOR + FROM CONTAFIN_ORACLE.UTILIZATORI + WHERE UPPER(EMAIL) = UPPER(:email) + """, {"email": request.email}) + + user_row = cursor.fetchone() + if not user_row: + raise HTTPException(status_code=404, detail="Email not found") + + user_id, username = user_row + + # 2. Verify password via stored procedure + cursor.execute(""" + SELECT pack_drepturi.verificautilizator(:username, :password) + FROM DUAL + """, {"username": username, "password": request.password}) + + result = cursor.fetchone()[0] + if result == -1: + raise HTTPException(status_code=401, detail="Invalid password") + + # 3. Get user companies + companies = get_user_companies(user_id, connection) + + # 4. Generate JWT tokens + access_token = create_access_token(...) + refresh_token = create_refresh_token(...) + + return TelegramEmailLoginResponse( + success=True, + access_token=access_token, + refresh_token=refresh_token, + user_id=user_id, + username=username, + companies=companies, + message="Authentication successful" + ) +``` + +**Securitate:** +- Rate limiting: 5 requests / 5 minutes per telegram_user_id +- Password validation prin Oracle stored procedure (nu stocăm parole) +- HTTPS required în producție +- Logging pentru failed attempts + +--- + +### 4. Bot Handlers NOI: `/login` + ConversationHandler + +**Fișier NOU**: `reports-app/telegram-bot/app/bot/email_handlers.py` + +#### Command: `/login` + +```python +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import ContextTypes, ConversationHandler, CommandHandler, MessageHandler, filters + +# Conversation states +AWAITING_EMAIL, AWAITING_CODE, AWAITING_PASSWORD = range(3) + +async def login_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + """ + Handler pentru /login command + Inițiază conversation pentru email auth + """ + user = update.effective_user + + # Check dacă e deja autentificat + if await is_user_authenticated(user.id): + await update.message.reply_text( + "✅ Ești deja autentificat!\n" + "Folosește /unlink pentru a te deautentifica." + ) + return ConversationHandler.END + + # Afișează metode de autentificare + keyboard = [ + [InlineKeyboardButton("🔐 Login cu Email", callback_data="email_login")], + [InlineKeyboardButton("🌐 Login din Web App", callback_data="web_login")], + [InlineKeyboardButton("❌ Anulează", callback_data="cancel")] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await update.message.reply_text( + "🔑 Alege metoda de autentificare:\n\n" + "📧 **Email + Parolă**: Primești cod pe email, apoi introduci parola\n" + "🌐 **Web App**: Generează cod din aplicația web", + reply_markup=reply_markup + ) + + return AWAITING_EMAIL + +async def email_login_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Callback pentru butonul 'Login cu Email'""" + query = update.callback_query + await query.answer() + + await query.edit_message_text( + "📧 **Autentificare prin Email**\n\n" + "Te rugăm să introduci adresa ta de email Oracle:\n" + "(Exemplu: nume.prenume@companie.ro)" + ) + + return AWAITING_EMAIL + +async def receive_email(update: Update, context: ContextTypes.DEFAULT_TYPE): + """ + Handler pentru primirea email-ului + """ + email = update.message.text.strip() + user_id = update.effective_user.id + + # Validare format email + if not is_valid_email(email): + await update.message.reply_text( + "❌ Email invalid. Te rugăm să introduci o adresă de email validă." + ) + return AWAITING_EMAIL + + # Verifică email în Oracle + trimite cod + try: + # 1. Verifică email în Oracle UTILIZATORI + username = await verify_email_in_oracle(email) + if not username: + await update.message.reply_text( + "❌ Email-ul nu este înregistrat în sistem.\n" + "Contactează administratorul pentru a-ți adăuga email-ul." + ) + return ConversationHandler.END + + # 2. Generează cod 6-digit + code = generate_6digit_code() + + # 3. Salvează în email_auth_codes + await save_email_auth_code( + code=code, + email=email, + username=username, + telegram_user_id=user_id + ) + + # 4. Trimite email + email_sent = await email_service.send_auth_code(email, code, username) + + if not email_sent: + await update.message.reply_text( + "❌ Eroare la trimiterea email-ului. Te rugăm să încerci din nou." + ) + return ConversationHandler.END + + # 5. Salvează email în context + context.user_data['pending_email'] = email + context.user_data['pending_username'] = username + + await update.message.reply_text( + f"✉️ **Cod trimis pe {email}**\n\n" + "Verifică inbox-ul (și spam) și introdu codul de 6 cifre.\n" + "⏱ Codul expiră în **5 minute**.\n\n" + "Scrie /cancel pentru a anula." + ) + + return AWAITING_CODE + + except Exception as e: + logger.error(f"Email login error: {e}") + await update.message.reply_text( + "❌ Eroare internă. Te rugăm să încerci din nou mai târziu." + ) + return ConversationHandler.END + +async def receive_code(update: Update, context: ContextTypes.DEFAULT_TYPE): + """ + Handler pentru primirea codului din email + """ + code = update.message.text.strip() + user_id = update.effective_user.id + + # Validare format cod (6 digits) + if not code.isdigit() or len(code) != 6: + await update.message.reply_text( + "❌ Cod invalid. Introdu cele 6 cifre din email." + ) + return AWAITING_CODE + + # Verifică cod în DB + try: + code_data = await get_email_auth_code(code) + + if not code_data: + await update.message.reply_text( + "❌ Cod invalid sau expirat. Te rugăm să reîncepi cu /login" + ) + return ConversationHandler.END + + # Verificări + if code_data['used']: + await update.message.reply_text( + "❌ Cod deja folosit. Te rugăm să reîncepi cu /login" + ) + return ConversationHandler.END + + if code_data['telegram_user_id'] != user_id: + await update.message.reply_text( + "❌ Codul nu îți aparține." + ) + return ConversationHandler.END + + if datetime.now() > code_data['expires_at']: + await update.message.reply_text( + "❌ Codul a expirat. Te rugăm să reîncepi cu /login" + ) + return ConversationHandler.END + + # Marchează cod ca folosit + await mark_email_code_used(code) + + # Salvează username în context + context.user_data['verified_username'] = code_data['oracle_username'] + context.user_data['verified_email'] = code_data['email'] + + await update.message.reply_text( + "✅ **Cod validat cu succes!**\n\n" + "🔐 Acum introdu parola ta Oracle:\n" + "(Parola va fi ștearsă automat după verificare)" + ) + + return AWAITING_PASSWORD + + except Exception as e: + logger.error(f"Code validation error: {e}") + await update.message.reply_text( + "❌ Eroare la validarea codului. Te rugăm să încerci din nou." + ) + return ConversationHandler.END + +async def receive_password(update: Update, context: ContextTypes.DEFAULT_TYPE): + """ + Handler pentru primirea parolei + """ + password = update.message.text.strip() + user_id = update.effective_user.id + + # Șterge imediat mesajul cu parola + try: + await update.message.delete() + except: + pass + + username = context.user_data.get('verified_username') + email = context.user_data.get('verified_email') + + if not username or not email: + await update.effective_chat.send_message( + "❌ Sesiune expirată. Te rugăm să reîncepi cu /login" + ) + return ConversationHandler.END + + # Trimite loading message + loading_msg = await update.effective_chat.send_message( + "⏳ Verificare credențiale..." + ) + + try: + # Call backend endpoint + response = await backend_client.login_with_email( + email=email, + password=password, + telegram_user_id=user_id + ) + + if not response['success']: + await loading_msg.edit_text( + "❌ Parolă incorectă. Te rugăm să reîncepi cu /login" + ) + return ConversationHandler.END + + # Salvează user în telegram_users + await save_telegram_user( + telegram_user_id=user_id, + username=update.effective_user.username, + first_name=update.effective_user.first_name, + last_name=update.effective_user.last_name, + oracle_username=response['username'], + jwt_token=response['access_token'], + jwt_refresh_token=response['refresh_token'] + ) + + # Success message + companies_list = "\n".join([f"• {c['name']}" for c in response['companies'][:5]]) + + await loading_msg.edit_text( + f"✅ **Autentificat cu succes!**\n\n" + f"👤 Utilizator: **{response['username']}**\n" + f"🏢 Companii disponibile: {len(response['companies'])}\n\n" + f"{companies_list}\n\n" + f"Folosește /help pentru comenzi disponibile." + ) + + # Clear context + context.user_data.clear() + + return ConversationHandler.END + + except Exception as e: + logger.error(f"Password verification error: {e}") + await loading_msg.edit_text( + "❌ Eroare la autentificare. Te rugăm să încerci din nou cu /login" + ) + return ConversationHandler.END + +async def cancel_login(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Cancel conversation""" + context.user_data.clear() + await update.message.reply_text("❌ Autentificare anulată.") + return ConversationHandler.END + +# ConversationHandler setup +email_login_handler = ConversationHandler( + entry_points=[ + CommandHandler('login', login_command), + CallbackQueryHandler(email_login_callback, pattern='^email_login$') + ], + states={ + AWAITING_EMAIL: [MessageHandler(filters.TEXT & ~filters.COMMAND, receive_email)], + AWAITING_CODE: [MessageHandler(filters.TEXT & ~filters.COMMAND, receive_code)], + AWAITING_PASSWORD: [MessageHandler(filters.TEXT & ~filters.COMMAND, receive_password)], + }, + fallbacks=[ + CommandHandler('cancel', cancel_login), + CallbackQueryHandler(cancel_login, pattern='^cancel$') + ], + conversation_timeout=300 # 5 minute timeout +) +``` + +**Handler Registration** (în `app/bot/handlers.py`): +```python +from app.bot.email_handlers import email_login_handler + +def setup_handlers(application): + # ... existing handlers ... + + # Email login handler + application.add_handler(email_login_handler) +``` + +--- + +### 5. Backend API Client Method + +**Locație**: `reports-app/telegram-bot/app/api/client.py` + +```python +class BackendAPIClient: + # ... existing methods ... + + async def login_with_email( + self, + email: str, + password: str, + telegram_user_id: int + ) -> dict: + """ + Login via email + password + """ + try: + response = await self.client.post( + f"{self.base_url}/api/telegram/auth/login-with-email", + json={ + "email": email, + "password": password, + "telegram_user_id": telegram_user_id + } + ) + response.raise_for_status() + return response.json() + except httpx.HTTPError as e: + logger.error(f"Email login failed: {e}") + raise +``` + +--- + +## 🔧 Environment Variables + +### Backend `.env` (nu necesită modificări - doar pentru referință) +```bash +# Existente (nu modificăm) +ORACLE_USER=CONTAFIN_ORACLE +ORACLE_PASSWORD=your_password +ORACLE_HOST=localhost +ORACLE_PORT=1526 +ORACLE_SID=ROA +JWT_SECRET_KEY=your_secret_key +JWT_ALGORITHM=HS256 +JWT_EXPIRE_MINUTES=30 +``` + +### Telegram Bot `.env` (ADAUGĂ ACESTEA) +```bash +# Existing +TELEGRAM_BOT_TOKEN=your_bot_token +BACKEND_URL=http://localhost:8000 +DATABASE_PATH=data/telegram_bot.db + +# NEW: SMTP Configuration +SMTP_HOST=mail.romfast.ro +SMTP_PORT=587 +SMTP_USER=ups@romfast.ro +SMTP_PASSWORD=#Ups2020# +SMTP_FROM_EMAIL=ups@romfast.ro +SMTP_FROM_NAME=ROA2WEB +SMTP_USE_TLS=true + +# NEW: Email Auth Settings +EMAIL_CODE_EXPIRY_MINUTES=5 +EMAIL_CODE_LENGTH=6 +MAX_EMAIL_ATTEMPTS_PER_HOUR=3 +``` + +--- + +## 🔒 Securitate & Validări + +### 1. Email Validation +```python +import re + +def is_valid_email(email: str) -> bool: + """Validare format email""" + pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + return bool(re.match(pattern, email)) + +async def verify_email_in_oracle(email: str) -> Optional[str]: + """ + Verifică dacă email există în Oracle UTILIZATORI + Returns: username dacă există, None altfel + """ + async with oracle_pool.get_connection() as connection: + cursor = connection.cursor() + cursor.execute(""" + SELECT UTILIZATOR + FROM CONTAFIN_ORACLE.UTILIZATORI + WHERE UPPER(EMAIL) = UPPER(:email) + AND ACTIV = 1 + """, {"email": email}) + + row = cursor.fetchone() + return row[0] if row else None +``` + +### 2. Code Generation & Storage +```python +import random +import string +from datetime import datetime, timedelta + +def generate_6digit_code() -> str: + """Generează cod random 6-digit""" + return ''.join(random.choices(string.digits, k=6)) + +async def save_email_auth_code( + code: str, + email: str, + username: str, + telegram_user_id: int +) -> bool: + """Salvează cod în DB cu expiry 5 minute""" + expires_at = datetime.now() + timedelta(minutes=5) + + async with get_db_connection() as db: + await db.execute(""" + INSERT INTO email_auth_codes + (code, email, oracle_username, telegram_user_id, expires_at) + VALUES (?, ?, ?, ?, ?) + """, (code, email, username, telegram_user_id, expires_at)) + await db.commit() + + return True +``` + +### 3. Rate Limiting +```python +from collections import defaultdict +from datetime import datetime, timedelta + +# In-memory rate limiter (sau Redis în producție) +email_attempts = defaultdict(list) + +async def check_rate_limit(identifier: str, max_attempts: int = 3, window_minutes: int = 60) -> bool: + """ + Rate limiting pentru email requests + identifier poate fi: email sau telegram_user_id + """ + now = datetime.now() + cutoff = now - timedelta(minutes=window_minutes) + + # Curăță attempts vechi + email_attempts[identifier] = [ + attempt for attempt in email_attempts[identifier] + if attempt > cutoff + ] + + # Verifică limita + if len(email_attempts[identifier]) >= max_attempts: + return False # Rate limit exceeded + + # Adaugă attempt nou + email_attempts[identifier].append(now) + return True # OK to proceed +``` + +### 4. Password Security +```python +async def receive_password(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handler cu securitate mărite pentru parolă""" + password = update.message.text.strip() + + # 1. Șterge IMEDIAT mesajul cu parola + try: + await update.message.delete() + except Exception as e: + logger.warning(f"Could not delete password message: {e}") + + # 2. NU loga parola niciodată + logger.info(f"Password received for user {update.effective_user.id}") + + # 3. Verifică parola prin backend (Oracle) + # ... (vezi cod de mai sus) + + # 4. Șterge parola din memorie + del password +``` + +### 5. Auto-Cleanup Job +```python +async def cleanup_expired_codes(): + """Job periodic pentru curățarea codurilor expirate""" + while True: + try: + async with get_db_connection() as db: + # Șterge email codes expirate + await db.execute(""" + DELETE FROM email_auth_codes + WHERE expires_at < ? + OR (used = 1 AND used_at < ?) + """, ( + datetime.now(), + datetime.now() - timedelta(days=1) # Cleanup used codes după 1 zi + )) + await db.commit() + + deleted = db.total_changes + logger.info(f"Cleaned up {deleted} expired email auth codes") + + except Exception as e: + logger.error(f"Cleanup job error: {e}") + + # Run every hour + await asyncio.sleep(3600) + +# Start cleanup job în main.py +asyncio.create_task(cleanup_expired_codes()) +``` + +--- + +## 📝 Fișiere Modificate/Create + +### **CREATE** (6 fișiere noi): + +1. **`reports-app/telegram-bot/app/utils/email_service.py`** + - SMTP client pentru trimitere email-uri + - Funcție: `send_auth_code(email, code, username)` + - Error handling + retry logic + +2. **`reports-app/telegram-bot/app/auth/email_auth.py`** + - Logică autentificare email + - Funcții: `verify_email_in_oracle()`, `generate_6digit_code()`, `check_rate_limit()` + +3. **`reports-app/telegram-bot/app/bot/email_handlers.py`** + - ConversationHandler pentru `/login` + - States: AWAITING_EMAIL → AWAITING_CODE → AWAITING_PASSWORD + - Handlers: `receive_email()`, `receive_code()`, `receive_password()` + +4. **`reports-app/backend/app/schemas/telegram_email_auth.py`** + - Pydantic schemas pentru email auth + - `TelegramEmailLoginRequest`, `TelegramEmailLoginResponse` + +5. **`reports-app/telegram-bot/tests/test_email_auth.py`** + - Unit tests pentru email auth flow + - Mock SMTP, Oracle, backend API + +6. **`TELEGRAM_EMAIL_AUTH_PLAN.md`** + - Acest document (plan detaliat) + +### **MODIFY** (6 fișiere existente): + +1. **`reports-app/telegram-bot/app/db/database.py`** + - **Adaugă**: Schema tabela `email_auth_codes` + - **Linie**: ~40-60 (în funcția `init_database()`) + +2. **`reports-app/telegram-bot/app/db/operations.py`** + - **Adaugă**: CRUD operations pentru `email_auth_codes` + - Funcții noi: `create_email_auth_code()`, `get_email_auth_code()`, `mark_email_code_used()`, etc. + - **Linii**: ~200-300 (la final de fișier) + +3. **`reports-app/telegram-bot/app/bot/handlers.py`** + - **Adaugă**: Import și register `email_login_handler` + - **Linie**: ~10 (import), ~150 (register în `setup_handlers()`) + - **Modificare**: Update `/start` message pentru a menționa `/login` + +4. **`reports-app/backend/app/routers/telegram.py`** + - **Adaugă**: Endpoint `POST /auth/login-with-email` + - **Linii**: ~200-300 (la final de fișier) + - **Import**: Adaugă schema `TelegramEmailLoginRequest/Response` + +5. **`reports-app/telegram-bot/app/api/client.py`** + - **Adaugă**: Method `login_with_email(email, password, telegram_user_id)` + - **Linii**: ~150-180 (în clasa `BackendAPIClient`) + +6. **`reports-app/telegram-bot/.env`** + - **Adaugă**: SMTP configuration variables (vezi secțiunea Environment Variables) + +### **NO TOUCH** (rămân 100% neschimbate): + +- ✅ Fluxul actual de linking (web app → cod → /start) +- ✅ Tabele existente: `telegram_users`, `telegram_auth_codes`, `telegram_sessions` +- ✅ Endpoint-uri existente: `/generate-code`, `/verify-user`, `/refresh-token` +- ✅ Handler-e existente: `/start [code]`, `/companies`, `/dashboard`, `/facturi`, etc. +- ✅ Toate comenzile Telegram existente (funcționalitate păstrată 100%) + +--- + +## 🧪 Testing Strategy + +### 1. Unit Tests + +**Fișier**: `reports-app/telegram-bot/tests/test_email_auth.py` + +```python +import pytest +from unittest.mock import AsyncMock, patch +from app.auth.email_auth import verify_email_in_oracle, generate_6digit_code, check_rate_limit +from app.utils.email_service import EmailService + +@pytest.mark.asyncio +async def test_generate_6digit_code(): + """Test generare cod 6-digit""" + code = generate_6digit_code() + assert len(code) == 6 + assert code.isdigit() + +@pytest.mark.asyncio +async def test_verify_email_in_oracle_success(): + """Test verificare email valid în Oracle""" + with patch('app.auth.email_auth.oracle_pool') as mock_pool: + # Mock Oracle response + mock_cursor = AsyncMock() + mock_cursor.fetchone.return_value = ("test_user",) + + username = await verify_email_in_oracle("test@example.com") + assert username == "test_user" + +@pytest.mark.asyncio +async def test_verify_email_not_found(): + """Test email inexistent în Oracle""" + with patch('app.auth.email_auth.oracle_pool') as mock_pool: + mock_cursor = AsyncMock() + mock_cursor.fetchone.return_value = None + + username = await verify_email_in_oracle("notfound@example.com") + assert username is None + +@pytest.mark.asyncio +async def test_rate_limiting(): + """Test rate limiting pentru email requests""" + identifier = "test_user_123" + + # First 3 attempts should pass + for i in range(3): + result = await check_rate_limit(identifier, max_attempts=3, window_minutes=60) + assert result is True + + # 4th attempt should fail + result = await check_rate_limit(identifier, max_attempts=3, window_minutes=60) + assert result is False + +@pytest.mark.asyncio +async def test_email_service_send_code(): + """Test trimitere email cu cod""" + email_service = EmailService() + + with patch('app.utils.email_service.smtplib.SMTP') as mock_smtp: + result = await email_service.send_auth_code( + to_email="test@example.com", + code="123456", + username="test_user" + ) + + assert result is True + assert mock_smtp.called +``` + +### 2. Integration Tests + +```python +@pytest.mark.asyncio +async def test_full_email_auth_flow(): + """Test complet: email → cod → parolă → JWT""" + + # 1. Request email code + email = "mmarius28@gmail.com" + telegram_user_id = 123456789 + + with patch('app.auth.email_auth.verify_email_in_oracle') as mock_verify: + mock_verify.return_value = "marius_user" + + # Generate code + code = generate_6digit_code() + await save_email_auth_code(code, email, "marius_user", telegram_user_id) + + # 2. Validate code + code_data = await get_email_auth_code(code) + assert code_data is not None + assert code_data['email'] == email + assert code_data['used'] is False + + # 3. Mock backend login + with patch('app.api.client.BackendAPIClient.login_with_email') as mock_login: + mock_login.return_value = { + 'success': True, + 'access_token': 'test_jwt_token', + 'refresh_token': 'test_refresh_token', + 'username': 'marius_user', + 'user_id': 42, + 'companies': [] + } + + # Verify password + response = await backend_client.login_with_email( + email=email, + password="test_password", + telegram_user_id=telegram_user_id + ) + + assert response['success'] is True + assert 'access_token' in response +``` + +### 3. Manual Testing Checklist + +**Fișier**: `reports-app/telegram-bot/tests/MANUAL_EMAIL_AUTH_TESTING.md` + +```markdown +# Manual Testing Checklist - Email Authentication + +## Prerequisites +- [ ] Backend running on port 8000 +- [ ] Telegram bot running +- [ ] SMTP credentials configured in .env +- [ ] Test email: mmarius28@gmail.com exists in Oracle UTILIZATORI table + +## Test Cases + +### 1. Successful Authentication Flow +- [ ] Start bot: `/start` +- [ ] Type: `/login` +- [ ] Select "🔐 Login cu Email" +- [ ] Enter email: `mmarius28@gmail.com` +- [ ] Verify email received with 6-digit code +- [ ] Enter code from email +- [ ] Enter Oracle password +- [ ] Verify success message with companies list +- [ ] Test command: `/companies` (should work) + +### 2. Invalid Email +- [ ] `/login` +- [ ] Enter email: `nonexistent@example.com` +- [ ] Verify error message: "Email-ul nu este înregistrat" + +### 3. Expired Code +- [ ] `/login` → enter valid email +- [ ] Wait 6 minutes (code expiry = 5 min) +- [ ] Enter code +- [ ] Verify error: "Cod expirat" + +### 4. Wrong Code +- [ ] `/login` → enter valid email +- [ ] Enter wrong code: `999999` +- [ ] Verify error: "Cod invalid" + +### 5. Wrong Password +- [ ] `/login` → enter email → enter valid code +- [ ] Enter wrong password +- [ ] Verify error: "Parolă incorectă" + +### 6. Rate Limiting +- [ ] `/login` → enter email (attempt 1) +- [ ] `/cancel` +- [ ] `/login` → enter email (attempt 2) +- [ ] `/cancel` +- [ ] `/login` → enter email (attempt 3) +- [ ] `/cancel` +- [ ] `/login` → enter email (attempt 4) +- [ ] Verify error: "Prea multe încercări. Încearcă în 60 minute" + +### 7. Password Message Deletion +- [ ] `/login` → enter email → enter code +- [ ] Enter password +- [ ] Verify password message is deleted immediately +- [ ] Check chat history - password should not be visible + +### 8. Cancel Flow +- [ ] `/login` +- [ ] `/cancel` (should cancel and clear state) + +### 9. Already Authenticated +- [ ] Complete successful login +- [ ] Type `/login` again +- [ ] Verify message: "Ești deja autentificat" + +### 10. Parallel Method (Web App Linking) Still Works +- [ ] Login to web app: http://localhost:3000 +- [ ] Generate Telegram linking code +- [ ] Use `/start ABC123XY` in Telegram +- [ ] Verify linking works as before (OLD method) + +### 11. Database Cleanup +- [ ] Create expired codes (manually set expires_at in past) +- [ ] Wait for hourly cleanup job +- [ ] Verify expired codes deleted from `email_auth_codes` table +``` + +--- + +## 📊 Estimare Efort & Timeline + +### Breakdown Detaliat + +| Componentă | Efort (ore) | Detalii | +|-----------|-------------|---------| +| **Backend** | | | +| → Endpoint `/login-with-email` | 1.5h | Logic + Oracle queries + JWT | +| → Schema Pydantic | 0.5h | Request/Response models | +| → Rate limiting | 0.5h | Middleware sau decorator | +| → Testing backend | 0.5h | Unit tests endpoint | +| **Telegram Bot** | | | +| → Email service (SMTP) | 1.5h | Client SMTP + templates + retry | +| → DB schema + operations | 1h | Tabela nouă + CRUD | +| → Email auth logic | 1h | Verificare email, cod generation | +| → ConversationHandler | 2h | States + handlers + callbacks | +| → API client method | 0.5h | Method nou în BackendAPIClient | +| → Testing bot | 1h | Unit + integration tests | +| **Integration** | | | +| → End-to-end testing | 1h | Flow complet manual | +| → Bug fixes | 1h | Edge cases | +| **Documentation** | | | +| → Update README | 0.5h | Secțiune nouă pentru email auth | +| → Update TELEGRAM_COMMANDS.md | 0.5h | Documentare `/login` | +| **TOTAL** | **~13h** | ~2 zile lucru (6-7h/zi) | + +### Timeline Recomandat + +**Ziua 1 (6-7h):** +1. Setup environment (SMTP config) - 0.5h +2. Backend: Endpoint + schema - 2h +3. Telegram Bot: Email service - 1.5h +4. Telegram Bot: DB schema + operations - 1h +5. Testing preliminar - 1h + +**Ziua 2 (6-7h):** +1. Telegram Bot: ConversationHandler - 2h +2. Email auth logic - 1h +3. Integration - 1h +4. End-to-end testing - 1.5h +5. Documentation - 0.5h +6. Bug fixes - 1h + +--- + +## ✅ Avantaje Plan + +| Avantaj | Detalii | +|---------|---------| +| ✅ **Minimal Invaziv** | 0 modificări la logica existentă, doar adăugiri | +| ✅ **Ambele Metode** | Users pot alege: email+parolă SAU web app linking | +| ✅ **Backward Compatible** | Utilizatori fără email folosesc metoda actuală | +| ✅ **2FA Real** | Email possession (cod) + parolă Oracle = 2 factori | +| ✅ **Simplu de Testat** | Un singur utilizator cu email (mmarius28@gmail.com) | +| ✅ **Extensibil** | Ușor de activat pentru alți useri când adaugi email-uri | +| ✅ **Securitate Mărite** | Rate limiting, code expiry, password verification în Oracle | +| ✅ **Auto-Cleanup** | Job existent curăță automat coduri expirate | +| ✅ **User-Friendly** | Conversation flow natural în Telegram | +| ✅ **Error Handling** | Mesaje clare pentru fiecare caz de eroare | + +--- + +## 🚀 Ordine Implementare (Step-by-Step) + +### **Step 1: Environment Setup** (15 min) + +1. Adaugă SMTP credentials în `.env`: +```bash +cd reports-app/telegram-bot/ +echo "SMTP_HOST=mail.romfast.ro" >> .env +echo "SMTP_PORT=587" >> .env +echo "SMTP_USER=ups@romfast.ro" >> .env +echo "SMTP_PASSWORD=#Ups2020#" >> .env +echo "SMTP_FROM_EMAIL=ups@romfast.ro" >> .env +echo "SMTP_FROM_NAME=ROA2WEB" >> .env +``` + +2. Install dependencies: +```bash +pip install aiosmtplib # Async SMTP client +``` + +### **Step 2: Backend - Endpoint** (2h) + +1. Create schema: `backend/app/schemas/telegram_email_auth.py` +2. Add endpoint în `backend/app/routers/telegram.py` +3. Test cu Postman/curl + +### **Step 3: Telegram Bot - Infrastructure** (2.5h) + +1. Create `app/utils/email_service.py` (SMTP client) +2. Update `app/db/database.py` (add `email_auth_codes` table) +3. Update `app/db/operations.py` (CRUD operations) +4. Test DB operations + +### **Step 4: Telegram Bot - Auth Logic** (1h) + +1. Create `app/auth/email_auth.py` +2. Implement: `verify_email_in_oracle()`, `generate_6digit_code()`, `check_rate_limit()` +3. Unit tests + +### **Step 5: Telegram Bot - Handlers** (2h) + +1. Create `app/bot/email_handlers.py` +2. Implement ConversationHandler +3. Register în `app/bot/handlers.py` + +### **Step 6: Backend API Client** (30 min) + +1. Add `login_with_email()` method în `app/api/client.py` + +### **Step 7: Integration Testing** (1h) + +1. Start backend + bot +2. Test flow complet cu mmarius28@gmail.com +3. Fix bugs + +### **Step 8: Comprehensive Testing** (1.5h) + +1. Run all test cases from checklist +2. Test edge cases +3. Verify both methods work in parallel + +### **Step 9: Documentation** (30 min) + +1. Update README.md +2. Update TELEGRAM_COMMANDS.md +3. Create MANUAL_EMAIL_AUTH_TESTING.md + +--- + +## 🐛 Troubleshooting Guide + +### Issue: Email nu ajunge + +**Cauze posibile:** +1. SMTP credentials incorecte +2. Port blocat (587) +3. Email în spam + +**Debugging:** +```python +# Test SMTP connection +import smtplib + +try: + server = smtplib.SMTP('mail.romfast.ro', 587) + server.starttls() + server.login('ups@romfast.ro', '#Ups2020#') + print("✅ SMTP connection OK") + server.quit() +except Exception as e: + print(f"❌ SMTP error: {e}") +``` + +### Issue: Cod invalid/expirat + +**Debugging:** +```sql +-- Check code in DB +SELECT * FROM email_auth_codes WHERE code = '123456'; + +-- Check expiry +SELECT code, expires_at, + datetime('now') as now, + (julianday(expires_at) - julianday('now')) * 24 * 60 as minutes_remaining +FROM email_auth_codes +WHERE code = '123456'; +``` + +### Issue: Parolă incorectă (dar știu că e corectă) + +**Cauze posibile:** +1. Username găsit greșit din email +2. Oracle stored procedure error + +**Debugging:** +```python +# Test Oracle auth direct +async with oracle_pool.get_connection() as connection: + cursor = connection.cursor() + + # 1. Check email → username mapping + cursor.execute(""" + SELECT UTILIZATOR, EMAIL + FROM CONTAFIN_ORACLE.UTILIZATORI + WHERE UPPER(EMAIL) = UPPER('mmarius28@gmail.com') + """) + print(cursor.fetchone()) + + # 2. Test stored procedure + cursor.execute(""" + SELECT pack_drepturi.verificautilizator('marius_user', 'parola_mea') + FROM DUAL + """) + result = cursor.fetchone()[0] + print(f"Auth result: {result}") # -1 = fail, user_id = success +``` + +### Issue: Rate limiting prea restrictiv + +**Quick fix:** +```python +# În email_auth.py, increase limits +MAX_EMAIL_ATTEMPTS = 5 # was 3 +RATE_LIMIT_WINDOW_MINUTES = 30 # was 60 +``` + +--- + +## 📚 Additional Documentation + +### Email Template (HTML) + +```html + + + + + + +
+
+

🔐 ROA2WEB

+

Autentificare Telegram Bot

+
+ +
+

Salut {{username}},

+ +

Ai solicitat autentificarea în aplicația ROA2WEB Telegram Bot.

+ +
+

Codul tău de autentificare:

+

{{code}}

+
+ +
+ ⏱️ Important: Acest cod expiră în 5 minute. +
+ +

Introdu acest cod în conversația cu Telegram Bot, apoi vei fi solicitat să introduci parola.

+ +

+ Nu ai solicitat acest cod?
+ Dacă nu ai inițiat această autentificare, te rugăm să ignori acest email. + Nimeni nu va avea acces la contul tău fără parola ta. +

+
+ + +
+ + +``` + +### Command Reference Update + +Adaugă în `TELEGRAM_COMMANDS.md`: + +```markdown +## /login + +**Descriere**: Autentificare în Telegram Bot prin email + parolă (2FA) + +**Sintaxă**: `/login` + +**Flow**: +1. Selectezi metoda de autentificare (Email sau Web App) +2. Pentru Email: + - Introduci adresa de email Oracle + - Primești cod pe email (6 cifre) + - Introduci codul din email + - Introduci parola Oracle + - Primești confirmare și acces la comenzi + +**Exemple**: +``` +/login +→ Selectează "🔐 Login cu Email" +→ Introdu: mmarius28@gmail.com +→ Verifică email și introdu cod: 123456 +→ Introdu parola +→ ✅ Autentificat cu succes! +``` + +**Securitate**: +- Cod expiră în 5 minute +- Cod utilizabil o singură dată +- Parolă verificată în Oracle +- Mesajul cu parola este șters automat +- Rate limiting: max 3 încercări/oră + +**Comandă alternativă**: Buton "🔐 Login cu Email" în meniu principal +``` + +--- + +## 🎓 Learning Points & Best Practices + +### 1. De ce 6-digit code în loc de 8-char alphanumeric? + +**Răspuns**: +- Mai ușor de tastat pe mobile +- Mai puțin prone la erori (0/O, 1/I confusion) +- Suficient de securizat pentru 5 minute expiry +- Standard în industrie (Google, GitHub, etc.) + +### 2. De ce 5 minute expiry pentru cod? + +**Răspuns**: +- Balans între security și UX +- Suficient timp pentru check email + type code +- Redus attack window pentru code guessing +- Similar cu alte aplicații 2FA + +### 3. De ce șterge mesajul cu parola? + +**Răspuns**: +- Securitate: parola nu rămâne în chat history +- Telegram API permite ștergerea mesajelor +- Best practice în bot-uri care cer credențiale +- Protecție dacă telefonul e compromis + +### 4. De ce în-memory rate limiting și nu Redis? + +**Răspuns**: +- **Simplitate**: Nu adaugă dependency nou +- **Suficient pentru scale mic-mediu**: + - In-memory e OK pentru <1000 users concurrent + - Planul curent: test cu 1 user, extend la 10-20 max +- **Upgrade path clar**: Dacă scală, migrăm la Redis +- **Code unchanged**: Interfața rămâne aceeași + +### 5. De ce SQLite și nu PostgreSQL pentru bot DB? + +**Răspuns**: +- **Consistency**: Sistemul actual folosește SQLite +- **Zero infrastructure**: No server setup needed +- **Sufficient performance**: Telegram bot = low traffic +- **Easy backup**: Single file (`telegram_bot.db`) +- **Deployment simplicity**: Works în Windows Service + +--- + +## 🔗 Integrare cu Sistemul Existent + +### Flow Diagram - Ambele Metode + +``` + ┌─────────────────────────────────┐ + │ USER VREA SĂ SE AUTENTIFICE │ + │ ÎN TELEGRAM BOT │ + └─────────────┬───────────────────┘ + │ + ▼ + ┌─────────────────────────────────┐ + │ ALEGE METODA: │ + │ [Email + Parolă] sau [Web App] │ + └─────────┬───────────┬───────────┘ + │ │ + ┌────────────┘ └────────────┐ + │ │ + ▼ ▼ + ┌────────────────────────┐ ┌──────────────────────────┐ + │ METODA NOUĂ │ │ METODA EXISTENTĂ │ + │ Email + Parolă │ │ Web App Linking │ + └────────────────────────┘ └──────────────────────────┘ + │ │ + ▼ ▼ + 1. /login în Telegram 1. Login în web app (username+pass) + 2. Introdu email 2. Click "Link Telegram" + 3. Primești cod pe email 3. Backend generează cod 8-char + 4. Introdu cod în Telegram 4. Backend → POST /internal/save-code → Bot + 5. Introdu parolă 5. User: /start ABC123XY + 6. Bot verifică în Oracle 6. Bot auto-link (fără parolă din nou) + │ │ + └──────────────┬──────────────────────┘ + │ + ▼ + ┌───────────────────────────────────┐ + │ USER AUTENTIFICAT ÎN BOT │ + │ JWT tokens saved în telegram_users│ + └───────────────────────────────────┘ + │ + ▼ + ┌───────────────────────────────────┐ + │ COMENZI DISPONIBILE: │ + │ /companies, /dashboard, │ + │ /facturi, /trezorerie, etc. │ + └───────────────────────────────────┘ +``` + +### Database Schema - Complete View + +```sql +-- EXISTING TABLE (no changes) +CREATE TABLE telegram_users ( + telegram_user_id INTEGER PRIMARY KEY, + username TEXT, + first_name TEXT, + last_name TEXT, + oracle_username TEXT, + jwt_token TEXT, + jwt_refresh_token TEXT, + token_expires_at TIMESTAMP, + linked_at TIMESTAMP, + last_active_at TIMESTAMP, + is_active BOOLEAN +); + +-- EXISTING TABLE (no changes) +CREATE TABLE telegram_auth_codes ( + code TEXT PRIMARY KEY, -- 8-char code from web app + telegram_user_id INTEGER, + oracle_username TEXT, + created_at TIMESTAMP, + expires_at TIMESTAMP, + used BOOLEAN DEFAULT 0, + used_at TIMESTAMP +); + +-- NEW TABLE (for email auth) +CREATE TABLE email_auth_codes ( + code TEXT PRIMARY KEY, -- 6-digit code sent via email + email TEXT NOT NULL, + oracle_username TEXT NOT NULL, + telegram_user_id INTEGER NOT NULL, + created_at TIMESTAMP, + expires_at TIMESTAMP, + used BOOLEAN DEFAULT 0, + used_at TIMESTAMP, + + INDEX idx_email (email), + INDEX idx_telegram_user_id (telegram_user_id) +); + +-- EXISTING TABLE (no changes) +CREATE TABLE telegram_sessions ( + session_id TEXT PRIMARY KEY, + telegram_user_id INTEGER, + active_company_id INTEGER, + active_company_name TEXT, + conversation_state TEXT, + created_at TIMESTAMP, + updated_at TIMESTAMP, + expires_at TIMESTAMP +); +``` + +**Observații:** +- `telegram_auth_codes` = pentru web app linking (EXISTING) +- `email_auth_codes` = pentru email auth (NEW) +- Ambele tabele coexistă, fiecare pentru flow-ul propriu +- `telegram_users` = comună pentru ambele metode (linked users) + +--- + +## 🚦 Success Criteria + +### Implementarea e considerată SUCCESS dacă: + +- [x] ✅ User poate să facă `/login` în Telegram +- [x] ✅ User poate introduce email și primește cod pe email +- [x] ✅ User poate introduce cod din email (valid 5 min) +- [x] ✅ User poate introduce parola Oracle +- [x] ✅ Bot verifică credențialele în Oracle și generează JWT +- [x] ✅ User e autentificat și poate folosi comenzi (/companies, /dashboard, etc.) +- [x] ✅ Metoda veche (web app → /start cod) funcționează în continuare +- [x] ✅ Ambele metode pot fi folosite de același user (switch între ele) +- [x] ✅ Codurile expirate sunt auto-șterse +- [x] ✅ Mesajul cu parola e șters automat din chat +- [x] ✅ Rate limiting funcționează (max 3 emails/oră) +- [x] ✅ Error handling: mesaje clare pentru fiecare caz de eroare +- [x] ✅ Testare cu mmarius28@gmail.com funcționează end-to-end + +--- + +## 📅 Next Steps După Implementare + +### Immediate (după deploy) + +1. **Testare cu user real (Marius)** + - Verifică flow complet cu mmarius28@gmail.com + - Test ambele metode de autentificare + - Validează email delivery și formatting + +2. **Monitoring** + - Watch logs pentru erori SMTP + - Monitor rate limiting triggers + - Check database growth (`email_auth_codes` table) + +### Short-term (1-2 săptămâni) + +1. **Adaugă email pentru alți useri** + - Identifică 5-10 useri pentru beta testing + - Adaugă email-uri în Oracle `UTILIZATORI` table + - Anunță disponibilitatea noii metode + +2. **Collect feedback** + - UX pentru conversation flow + - Timing email delivery (sunt 5 min suficienți?) + - Edge cases descoperite + +### Long-term (1-3 luni) + +1. **Analytics** + - Câți useri folosesc fiecare metodă? + - Success rate pentru fiecare flow + - Common error patterns + +2. **Optimizări** + - Migrate rate limiting la Redis (dacă scală) + - Adaugă email templates multiple (română + engleză) + - Consider migration de la SQLite la PostgreSQL (dacă >1000 users) + +3. **Feature extensions** + - Email verification pentru schimbare parolă + - Email notifications pentru events (reports ready, etc.) + - Multi-language support + +--- + +## 📞 Contact & Support + +**Developer**: Claude Code (Anthropic) +**Project**: ROA2WEB - ERP Reports Application +**Date**: 2025-11-07 + +**Pentru întrebări despre acest plan:** +- Review cod în `reports-app/telegram-bot/` și `reports-app/backend/` +- Consultă `CLAUDE.md` pentru context general +- Check `TELEGRAM_COMMANDS.md` pentru comenzi existente + +**Resurse utile:** +- python-telegram-bot docs: https://docs.python-telegram-bot.org/ +- FastAPI docs: https://fastapi.tiangolo.com/ +- aiosmtplib docs: https://aiosmtplib.readthedocs.io/ + +--- + +## 🎉 Conclusion + +Acest plan implementează autentificare **email + parolă (2FA)** pentru Telegram Bot în mod **minimal invaziv**, păstrând întreaga funcționalitate existentă. + +**Key highlights:** +- ✅ **0 breaking changes** - metoda veche funcționează în continuare +- ✅ **Simplu de implementat** - ~13h efort total +- ✅ **Securizat** - 2FA real cu rate limiting +- ✅ **Extensibil** - ușor de scalat pentru mai mulți useri +- ✅ **User-friendly** - conversation flow natural + +**Ready to implement!** 🚀 + +--- + +*Document generat: 2025-11-07* +*Versiune: 1.0* +*Status: Planning Phase - Ready for Implementation* diff --git a/reports-app/backend/app/cache/__init__.py b/reports-app/backend/app/cache/__init__.py new file mode 100644 index 0000000..11f50cd --- /dev/null +++ b/reports-app/backend/app/cache/__init__.py @@ -0,0 +1,66 @@ +""" +Cache module for ROA2WEB + +Provides hybrid two-tier caching (Memory L1 + SQLite L2) +with performance tracking and event-based invalidation. + +Usage: + # Initialize cache at app startup + from app.cache import init_cache + from app.cache.config import CacheConfig + + config = CacheConfig.from_env() + await init_cache(config) + + # Use @cached decorator in services + from app.cache.decorators import cached + + @cached(cache_type='dashboard_summary', key_params=['company', 'username']) + async def get_complete_summary(company: str, username: str): + # ... Oracle query logic ... + + # Get cache manager for manual operations + from app.cache import get_cache + + cache = get_cache() + await cache.invalidate(company_id=123) +""" + +from .config import CacheConfig +from .cache_manager import ( + init_cache, + get_cache, + close_cache, + CacheManager +) +from .decorators import cached +from .event_monitor import ( + init_event_monitor, + get_event_monitor, + toggle_event_monitor, + preload_all_schema_mappings +) +from .benchmarks import run_baseline_benchmarks + +__all__ = [ + # Configuration + 'CacheConfig', + + # Cache Manager + 'init_cache', + 'get_cache', + 'close_cache', + 'CacheManager', + + # Decorators + 'cached', + + # Event Monitor + 'init_event_monitor', + 'get_event_monitor', + 'toggle_event_monitor', + 'preload_all_schema_mappings', + + # Benchmarks + 'run_baseline_benchmarks', +] diff --git a/reports-app/backend/app/cache/benchmarks.py b/reports-app/backend/app/cache/benchmarks.py new file mode 100644 index 0000000..83a64ee --- /dev/null +++ b/reports-app/backend/app/cache/benchmarks.py @@ -0,0 +1,269 @@ +""" +Baseline performance benchmarking + +Runs at startup to establish baseline Oracle query times +Used for calculating "time saved" by cache +""" +import time +import logging +from typing import Dict + +logger = logging.getLogger(__name__) + + +async def run_baseline_benchmarks() -> Dict[str, float]: + """ + Run baseline benchmarks for Oracle queries (without cache) + + Measures typical query times to establish performance baselines + These are used to calculate time saved when cache hits occur + + NOTE: This implementation provides a framework. Actual benchmark + implementations need access to Oracle services and sample data. + + Returns: + Dictionary mapping cache_type to average query time (ms) + """ + from .cache_manager import get_cache + + cache = get_cache() + if not cache: + logger.warning("Cache not initialized - skipping benchmarks") + return {} + + logger.info("Starting baseline performance benchmarks...") + benchmarks = {} + + try: + # Benchmark: Schema lookup + logger.info("Benchmarking: schema lookup") + schema_times = await _benchmark_schema_lookup() + if schema_times: + avg_schema = sum(schema_times) / len(schema_times) + benchmarks['schema'] = avg_schema + await cache.sqlite.set_benchmark('schema', avg_schema, len(schema_times)) + logger.info(f" Schema lookup: {avg_schema:.2f}ms (avg of {len(schema_times)} samples)") + + # Benchmark: Companies list + logger.info("Benchmarking: companies list") + companies_time = await _benchmark_companies_list() + if companies_time: + benchmarks['companies'] = companies_time + await cache.sqlite.set_benchmark('companies', companies_time, 1) + logger.info(f" Companies list: {companies_time:.2f}ms") + + # Benchmark: Dashboard summary + logger.info("Benchmarking: dashboard summary") + dashboard_time = await _benchmark_dashboard_summary() + if dashboard_time: + benchmarks['dashboard_summary'] = dashboard_time + await cache.sqlite.set_benchmark('dashboard_summary', dashboard_time, 1) + logger.info(f" Dashboard summary: {dashboard_time:.2f}ms") + + # Benchmark: Dashboard trends + logger.info("Benchmarking: dashboard trends") + trends_time = await _benchmark_dashboard_trends() + if trends_time: + benchmarks['dashboard_trends'] = trends_time + await cache.sqlite.set_benchmark('dashboard_trends', trends_time, 1) + logger.info(f" Dashboard trends: {trends_time:.2f}ms") + + # Benchmark: Invoices + logger.info("Benchmarking: invoices") + invoices_time = await _benchmark_invoices() + if invoices_time: + benchmarks['invoices'] = invoices_time + await cache.sqlite.set_benchmark('invoices', invoices_time, 1) + logger.info(f" Invoices: {invoices_time:.2f}ms") + + # Benchmark: Treasury + logger.info("Benchmarking: treasury") + treasury_time = await _benchmark_treasury() + if treasury_time: + benchmarks['treasury'] = treasury_time + await cache.sqlite.set_benchmark('treasury', treasury_time, 1) + logger.info(f" Treasury: {treasury_time:.2f}ms") + + logger.info(f"Baseline benchmarks completed: {len(benchmarks)} types measured") + return benchmarks + + except Exception as e: + logger.error(f"Benchmark error: {e}", exc_info=True) + return benchmarks + + +async def _benchmark_schema_lookup() -> list: + """ + Benchmark schema lookup queries + + Returns: + List of query times (ms) for multiple samples + """ + try: + # Import here to avoid circular dependency + import sys + import os + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) + from shared.database.oracle_pool import oracle_pool + + # Get sample company IDs to test + sample_companies = await _get_sample_company_ids(limit=10) + if not sample_companies: + logger.warning("No sample companies found for schema benchmark") + return [] + + times = [] + for company_id in sample_companies: + start = time.time() + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(""" + SELECT schema + FROM CONTAFIN_ORACLE.v_nom_firme + WHERE id_firma = :id + """, {'id': company_id}) + cursor.fetchone() + elapsed_ms = (time.time() - start) * 1000 + times.append(elapsed_ms) + + return times + + except Exception as e: + logger.error(f"Schema benchmark error: {e}") + return [] + + +async def _benchmark_companies_list() -> float: + """ + Benchmark companies list query + + Returns: + Query time (ms) + """ + try: + import sys + import os + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) + from shared.database.oracle_pool import oracle_pool + + # Get sample username + sample_user = await _get_sample_username() + if not sample_user: + return 0 + + start = time.time() + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(""" + SELECT nf.id_firma, nf.denumire, nf.cui, nf.schema + FROM CONTAFIN_ORACLE.v_nom_firme nf + JOIN CONTAFIN_ORACLE.vdef_util_firme uf ON nf.id_firma = uf.id_firma + WHERE uf.nume_utilizator = :username + ORDER BY nf.denumire + """, {'username': sample_user}) + cursor.fetchall() + elapsed_ms = (time.time() - start) * 1000 + return elapsed_ms + + except Exception as e: + logger.error(f"Companies benchmark error: {e}") + return 0 + + +async def _benchmark_dashboard_summary() -> float: + """ + Benchmark dashboard summary query + + Returns: + Query time (ms) + """ + try: + # This requires access to DashboardService + # For now, return estimated value + logger.warning("Dashboard summary benchmark not implemented - using estimate") + return 250.0 # Estimated 250ms based on plan + + except Exception as e: + logger.error(f"Dashboard benchmark error: {e}") + return 0 + + +async def _benchmark_dashboard_trends() -> float: + """Benchmark dashboard trends query""" + try: + logger.warning("Dashboard trends benchmark not implemented - using estimate") + return 400.0 # Estimated 400ms + + except Exception as e: + logger.error(f"Trends benchmark error: {e}") + return 0 + + +async def _benchmark_invoices() -> float: + """Benchmark invoices query""" + try: + logger.warning("Invoices benchmark not implemented - using estimate") + return 180.0 # Estimated 180ms + + except Exception as e: + logger.error(f"Invoices benchmark error: {e}") + return 0 + + +async def _benchmark_treasury() -> float: + """Benchmark treasury query""" + try: + logger.warning("Treasury benchmark not implemented - using estimate") + return 250.0 # Estimated 250ms + + except Exception as e: + logger.error(f"Treasury benchmark error: {e}") + return 0 + + +# Helper functions + +async def _get_sample_company_ids(limit: int = 10) -> list: + """Get sample company IDs for testing""" + try: + import sys + import os + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) + from shared.database.oracle_pool import oracle_pool + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(f""" + SELECT id_firma + FROM CONTAFIN_ORACLE.v_nom_firme + WHERE ROWNUM <= {limit} + """) + results = cursor.fetchall() + return [row[0] for row in results] + + except Exception as e: + logger.error(f"Get sample companies error: {e}") + return [] + + +async def _get_sample_username() -> str: + """Get sample username for testing""" + try: + import sys + import os + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) + from shared.database.oracle_pool import oracle_pool + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(""" + SELECT nume_utilizator + FROM CONTAFIN_ORACLE.vdef_util_firme + WHERE ROWNUM <= 1 + """) + result = cursor.fetchone() + return result[0] if result else "admin" + + except Exception as e: + logger.error(f"Get sample username error: {e}") + return "admin" diff --git a/reports-app/backend/app/cache/cache_manager.py b/reports-app/backend/app/cache/cache_manager.py new file mode 100644 index 0000000..4ca050c --- /dev/null +++ b/reports-app/backend/app/cache/cache_manager.py @@ -0,0 +1,335 @@ +""" +Cache Manager - Orchestrator for hybrid L1 + L2 cache +""" +import logging +import asyncio +from typing import Any, Optional +from .config import CacheConfig +from .memory_cache import MemoryCache +from .sqlite_cache import SQLiteCache + +logger = logging.getLogger(__name__) + + +class CacheManager: + """ + Hybrid cache manager (Memory L1 + SQLite L2) + + Features: + - Two-tier caching: fast memory + persistent SQLite + - Automatic TTL management per cache type + - Performance tracking and benchmarking + - Per-user cache enable/disable + - Global cache toggle + """ + + def __init__(self, config: CacheConfig): + """ + Initialize cache manager + + Args: + config: Cache configuration + """ + self.config = config + self.memory = MemoryCache(max_size=config.memory_max_size) + self.sqlite = SQLiteCache(db_path=config.sqlite_path) + self._cleanup_task: Optional[asyncio.Task] = None + self._initialized = False + self._last_cache_source: Optional[str] = None # Track last cache source (L1/L2) + + async def init(self): + """Initialize cache system""" + if self._initialized: + logger.warning("Cache already initialized") + return + + # Initialize SQLite database schema + await self.sqlite.init_db() + + # Start cleanup task + if self.config.enabled: + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + + self._initialized = True + logger.info(f"Cache initialized: type={self.config.cache_type}, enabled={self.config.enabled}") + + async def close(self): + """Close cache and cleanup""" + if self._cleanup_task: + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass + + logger.info("Cache closed") + + async def get(self, key: str, cache_type: str) -> Optional[Any]: + """ + Get value from cache (L1 → L2) + + Args: + key: Cache key + cache_type: Type of cache entry + + Returns: + Cached value or None if not found + """ + if not self.config.enabled: + self._last_cache_source = None + return None + + # Try L1 (Memory) first + value = await self.memory.get(key) + if value is not None: + self._last_cache_source = "L1" + logger.debug(f"Cache HIT L1 (memory): {key}") + return value + + # Try L2 (SQLite) + value = await self.sqlite.get(key) + if value is not None: + self._last_cache_source = "L2" + logger.debug(f"Cache HIT L2 (sqlite): {key}") + + # Populate L1 for next time + ttl = self.config.get_ttl_for_type(cache_type) + await self.memory.set(key, value, ttl) + + return value + + # Cache MISS + self._last_cache_source = None + logger.debug(f"Cache MISS: {key}") + return None + + def get_last_cache_source(self) -> Optional[str]: + """ + Get source of last cache hit (L1/L2/None) + + Returns: + "L1" if last hit was from memory cache + "L2" if last hit was from SQLite cache + None if last call was a cache miss or cache disabled + """ + return self._last_cache_source + + async def set(self, key: str, value: Any, cache_type: str, company_id: Optional[int] = None, + ttl: Optional[int] = None): + """ + Set value in cache (both L1 and L2) + + Args: + key: Cache key + value: Value to cache + cache_type: Type of cache entry + company_id: Company ID (for company-specific caches) + ttl: Time to live (uses default for cache_type if not provided) + """ + if not self.config.enabled: + return + + if ttl is None: + ttl = self.config.get_ttl_for_type(cache_type) + + # Store in both L1 and L2 + await self.memory.set(key, value, ttl) + await self.sqlite.set(key, value, cache_type, company_id, ttl) + + logger.debug(f"Cache SET (L1 + L2): {key} (TTL: {ttl}s)") + + async def delete(self, key: str): + """Delete entry from both L1 and L2""" + await self.memory.delete(key) + await self.sqlite.delete(key) + logger.debug(f"Cache deleted: {key}") + + async def invalidate(self, company_id: Optional[int] = None, cache_type: Optional[str] = None): + """ + Invalidate cache entries + + Args: + company_id: If provided, clear only this company's cache + cache_type: If provided, clear only this cache type + """ + if company_id is not None and cache_type is not None: + # Clear specific company + type + from .keys import generate_key_pattern + pattern = generate_key_pattern(cache_type, company_id) + await self.memory.clear_by_pattern(pattern) + # SQLite: clear by company + type (needs query) + # For now, just clear by company + await self.sqlite.clear_by_company(company_id) + logger.info(f"Cache invalidated: company={company_id}, type={cache_type}") + + elif company_id is not None: + # Clear all for company + from .keys import generate_key_pattern + # Clear all types for this company (pattern match all) + # Memory: need to iterate and match company_id in key + # For simplicity, clear by pattern prefix + await self.memory.clear() # TODO: improve pattern matching + await self.sqlite.clear_by_company(company_id) + logger.info(f"Cache invalidated: company={company_id}") + + elif cache_type is not None: + # Clear all for type + from .keys import generate_key_pattern + pattern = generate_key_pattern(cache_type) + await self.memory.clear_by_pattern(pattern) + await self.sqlite.clear_by_type(cache_type) + logger.info(f"Cache invalidated: type={cache_type}") + + else: + # Clear everything + await self.memory.clear() + await self.sqlite.clear() + logger.info("Cache invalidated: ALL") + + async def is_enabled_for_user(self, username: Optional[str]) -> bool: + """ + Check if cache is enabled for specific user + + Args: + username: Username to check + + Returns: + True if cache enabled for user, False otherwise + """ + if not self.config.enabled: + return False + + if username is None: + return True + + # Check per-user setting + return await self.sqlite.get_user_cache_enabled(username) + + async def set_user_cache_enabled(self, username: str, enabled: bool): + """Set user cache enabled/disabled""" + await self.sqlite.set_user_cache_enabled(username, enabled) + logger.info(f"User cache setting: {username} -> {enabled}") + + # Benchmarking + + async def get_benchmark(self, cache_type: str) -> Optional[float]: + """Get average benchmark time for cache type""" + return await self.sqlite.get_benchmark(cache_type) + + async def update_benchmark(self, cache_type: str, new_time_ms: float): + """ + Update benchmark with new measurement (exponential moving average) + + Args: + cache_type: Type of cache + new_time_ms: New measured time in milliseconds + """ + current_avg = await self.sqlite.get_benchmark(cache_type) + + if current_avg is None: + # First measurement + new_avg = new_time_ms + sample_count = 1 + else: + # Exponential moving average (alpha = 0.1) + new_avg = 0.9 * current_avg + 0.1 * new_time_ms + # Get current sample count (TODO: retrieve from DB) + sample_count = 1 # Simplified for now + + await self.sqlite.set_benchmark(cache_type, new_avg, sample_count) + logger.debug(f"Benchmark updated: {cache_type} -> {new_avg:.2f}ms") + + # Performance Tracking + + async def track_performance(self, cache_type: str, is_hit: bool, actual_time_ms: float, + time_saved_ms: Optional[float] = None, + estimated_oracle_time_ms: Optional[float] = None, + company_id: Optional[int] = None, + username: Optional[str] = None): + """ + Track performance metric + + Args: + cache_type: Type of cache + is_hit: True if cache hit, False if cache miss + actual_time_ms: Actual response time + time_saved_ms: Time saved by cache (for hits) + estimated_oracle_time_ms: Estimated Oracle time (for hits) + company_id: Company ID + username: Username + """ + if not self.config.track_performance: + return + + await self.sqlite.log_performance( + cache_type=cache_type, + company_id=company_id, + cache_hit=is_hit, + response_time_ms=actual_time_ms, + estimated_oracle_time_ms=estimated_oracle_time_ms, + time_saved_ms=time_saved_ms, + username=username + ) + + # Statistics + + async def get_stats(self) -> dict: + """Get comprehensive cache statistics""" + memory_stats = self.memory.get_stats() + sqlite_stats = await self.sqlite.get_stats() + + return { + 'enabled': self.config.enabled, + 'cache_type': self.config.cache_type, + 'memory': memory_stats, + 'sqlite': sqlite_stats, + } + + # Cleanup + + async def _cleanup_loop(self): + """Background task to cleanup expired entries""" + while True: + try: + await asyncio.sleep(self.config.cleanup_interval) + await self._cleanup_expired() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Cleanup error: {e}", exc_info=True) + + async def _cleanup_expired(self): + """Remove expired entries from both caches""" + logger.info("Running cache cleanup...") + await self.memory.cleanup_expired() + await self.sqlite.cleanup_expired() + logger.info("Cache cleanup completed") + + +# Global cache manager instance +_cache_manager: Optional[CacheManager] = None + + +async def init_cache(config: CacheConfig): + """Initialize global cache manager""" + global _cache_manager + if _cache_manager is not None: + logger.warning("Cache already initialized") + return + + _cache_manager = CacheManager(config) + await _cache_manager.init() + logger.info("Global cache manager initialized") + + +def get_cache() -> Optional[CacheManager]: + """Get global cache manager instance""" + return _cache_manager + + +async def close_cache(): + """Close global cache manager""" + global _cache_manager + if _cache_manager is not None: + await _cache_manager.close() + _cache_manager = None diff --git a/reports-app/backend/app/cache/config.py b/reports-app/backend/app/cache/config.py new file mode 100644 index 0000000..71ab27e --- /dev/null +++ b/reports-app/backend/app/cache/config.py @@ -0,0 +1,83 @@ +""" +Cache configuration from environment variables +""" +import os +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class CacheConfig: + """Cache configuration loaded from environment variables""" + + # Core Settings + enabled: bool + cache_type: str # 'hybrid', 'memory', 'sqlite', 'disabled' + sqlite_path: str + memory_max_size: int + default_ttl: int + + # TTL per Cache Type (seconds) + ttl_schema: int + ttl_companies: int + ttl_dashboard_summary: int + ttl_dashboard_trends: int + ttl_invoices: int + ttl_invoices_summary: int + ttl_treasury: int + + # Maintenance + cleanup_interval: int + + # Event-Based Invalidation + auto_invalidate_enabled: bool + check_interval: int + + # Performance Tracking + track_performance: bool + benchmark_on_startup: bool + + @classmethod + def from_env(cls) -> 'CacheConfig': + """Load configuration from environment variables""" + return cls( + # Core Settings + enabled=os.getenv('CACHE_ENABLED', 'True').lower() == 'true', + cache_type=os.getenv('CACHE_TYPE', 'hybrid'), + sqlite_path=os.getenv('CACHE_SQLITE_PATH', './cache_data/roa2web_cache.db'), + memory_max_size=int(os.getenv('CACHE_MEMORY_MAX_SIZE', '1000')), + default_ttl=int(os.getenv('CACHE_DEFAULT_TTL', '900')), + + # TTL per Cache Type + ttl_schema=int(os.getenv('CACHE_TTL_SCHEMA', '86400')), + ttl_companies=int(os.getenv('CACHE_TTL_COMPANIES', '1800')), + ttl_dashboard_summary=int(os.getenv('CACHE_TTL_DASHBOARD_SUMMARY', '1800')), + ttl_dashboard_trends=int(os.getenv('CACHE_TTL_DASHBOARD_TRENDS', '1800')), + ttl_invoices=int(os.getenv('CACHE_TTL_INVOICES', '600')), + ttl_invoices_summary=int(os.getenv('CACHE_TTL_INVOICES_SUMMARY', '900')), + ttl_treasury=int(os.getenv('CACHE_TTL_TREASURY', '600')), + + # Maintenance + cleanup_interval=int(os.getenv('CACHE_CLEANUP_INTERVAL', '3600')), + + # Event-Based Invalidation + auto_invalidate_enabled=os.getenv('CACHE_AUTO_INVALIDATE', 'False').lower() == 'true', + check_interval=int(os.getenv('CACHE_CHECK_INTERVAL', '300')), + + # Performance Tracking + track_performance=os.getenv('CACHE_TRACK_PERFORMANCE', 'True').lower() == 'true', + benchmark_on_startup=os.getenv('CACHE_BENCHMARK_ON_STARTUP', 'True').lower() == 'true', + ) + + def get_ttl_for_type(self, cache_type: str) -> int: + """Get TTL for specific cache type""" + ttl_map = { + 'schema': self.ttl_schema, + 'companies': self.ttl_companies, + 'dashboard_summary': self.ttl_dashboard_summary, + 'dashboard_trends': self.ttl_dashboard_trends, + 'invoices': self.ttl_invoices, + 'invoices_summary': self.ttl_invoices_summary, + 'treasury': self.ttl_treasury, + } + return ttl_map.get(cache_type, self.default_ttl) diff --git a/reports-app/backend/app/cache/decorators.py b/reports-app/backend/app/cache/decorators.py new file mode 100644 index 0000000..eb3857d --- /dev/null +++ b/reports-app/backend/app/cache/decorators.py @@ -0,0 +1,254 @@ +""" +Cache decorators for service methods +""" +import time +import logging +from functools import wraps +from typing import Callable, Optional, List + +from .cache_manager import get_cache +from .keys import generate_cache_key + +logger = logging.getLogger(__name__) + + +def cached(cache_type: str, ttl: Optional[int] = None, key_params: Optional[List[str]] = None): + """ + Decorator for caching service method results with performance tracking + + Usage: + @cached(cache_type='dashboard_summary', key_params=['company', 'username']) + async def get_complete_summary(company: str, username: str): + # ... Oracle query logic ... + + Features: + - Automatic cache key generation from function parameters + - Performance timing (cache hit vs miss) + - Benchmark tracking for time saved calculation + - Per-user cache enable/disable + - Global cache toggle + - Transparent - zero changes to function logic + + Args: + cache_type: Type of cache (used for TTL lookup and stats) + ttl: Optional custom TTL (overrides config default) + key_params: List of parameter names to include in cache key + + Returns: + Decorated async function + """ + def decorator(func: Callable): + @wraps(func) + async def wrapper(*args, **kwargs): + start_time = time.time() + cache = get_cache() + + # Extract username for per-user settings + username = _extract_username(args, kwargs, key_params) + + # Check if cache is enabled (global + per-user) + cache_enabled = await cache.is_enabled_for_user(username) if cache else False + + if not cache or not cache_enabled: + # Cache disabled - execute directly + result = await func(*args, **kwargs) + elapsed_ms = (time.time() - start_time) * 1000 + + # Set metadata in request.state if available (for API responses) + if 'request' in kwargs and hasattr(kwargs['request'], 'state'): + kwargs['request'].state.cache_hit = False + kwargs['request'].state.response_time_ms = elapsed_ms + kwargs['request'].state.cache_source = None + + if cache and cache.config.track_performance: + await cache.track_performance( + cache_type=cache_type, + is_hit=False, + actual_time_ms=elapsed_ms, + username=username + ) + + return result + + # Generate cache key from function parameters + cache_key = generate_cache_key(cache_type, key_params, args, kwargs) + + # Try to get from cache + cached_value = await cache.get(cache_key, cache_type) + + if cached_value is not None: + # ✅ CACHE HIT + elapsed_ms = (time.time() - start_time) * 1000 + + # Set metadata in request.state if available (for API responses) + if 'request' in kwargs and hasattr(kwargs['request'], 'state'): + cache_source_value = cache.get_last_cache_source() # L1 or L2 + kwargs['request'].state.cache_hit = True + kwargs['request'].state.response_time_ms = elapsed_ms + kwargs['request'].state.cache_source = cache_source_value + + # Get benchmark for calculating time saved + benchmark = await cache.get_benchmark(cache_type) + time_saved_ms = (benchmark - elapsed_ms) if benchmark else None + + # Track performance + if cache.config.track_performance: + await cache.track_performance( + cache_type=cache_type, + is_hit=True, + actual_time_ms=elapsed_ms, + time_saved_ms=time_saved_ms, + estimated_oracle_time_ms=benchmark, + company_id=_extract_company_id(args, kwargs, key_params), + username=username + ) + + return cached_value + + # ❌ CACHE MISS - execute function (query Oracle) + result = await func(*args, **kwargs) + elapsed_ms = (time.time() - start_time) * 1000 + + # Set metadata in request.state if available (for API responses) + if 'request' in kwargs and hasattr(kwargs['request'], 'state'): + kwargs['request'].state.cache_hit = False + kwargs['request'].state.response_time_ms = elapsed_ms + kwargs['request'].state.cache_source = None + + # Update benchmark with real Oracle time + await cache.update_benchmark(cache_type, elapsed_ms) + + # Track performance + if cache.config.track_performance: + await cache.track_performance( + cache_type=cache_type, + is_hit=False, + actual_time_ms=elapsed_ms, + company_id=_extract_company_id(args, kwargs, key_params), + username=username + ) + + # Store in cache for next time + company_id = _extract_company_id(args, kwargs, key_params) + await cache.set(cache_key, result, cache_type, company_id, ttl) + + return result + + return wrapper + return decorator + + +def _extract_username(args, kwargs, key_params: Optional[List[str]]) -> Optional[str]: + """ + Extract username from function parameters (args or kwargs) + + Checks: + 1. key_params position in args (if username is in key_params) + 2. Direct username in kwargs + 3. current_user object in kwargs + 4. user object in kwargs + 5. request.state.user (from AuthenticationMiddleware) + + Args: + args: Positional arguments + kwargs: Keyword arguments + key_params: List of parameter names (for finding position in args) + + Returns: + Username string or None + """ + # Try to find username in args based on key_params position + if key_params and 'username' in key_params: + try: + username_idx = key_params.index('username') + if username_idx < len(args): + username = args[username_idx] + if username: + return str(username) + except (ValueError, IndexError): + pass + + # Direct username parameter in kwargs + if 'username' in kwargs: + return kwargs['username'] + + # Current user object (from FastAPI Depends) + if 'current_user' in kwargs: + user = kwargs['current_user'] + if hasattr(user, 'username'): + return user.username + elif isinstance(user, dict) and 'username' in user: + return user['username'] + return str(user) + + # User object + if 'user' in kwargs: + user = kwargs['user'] + if hasattr(user, 'username'): + return user.username + elif isinstance(user, dict) and 'username' in user: + return user['username'] + return str(user) + + # Extract from request.state.user (set by AuthenticationMiddleware) + if 'request' in kwargs: + request = kwargs['request'] + if hasattr(request, 'state') and hasattr(request.state, 'user'): + user = request.state.user + if hasattr(user, 'username'): + return user.username + elif isinstance(user, dict) and 'username' in user: + return user['username'] + + return None + + +def _extract_company_id(args, kwargs, key_params: Optional[List[str]]) -> Optional[int]: + """ + Extract company_id from function parameters for cache indexing + + Tries multiple approaches: + 1. Direct company_id in kwargs + 2. company parameter (converted to int) + 3. Positional args based on key_params position + + Args: + args: Positional arguments + kwargs: Keyword arguments + key_params: List of parameter names + + Returns: + Company ID as integer or None + """ + # Try kwargs first + if 'company_id' in kwargs: + try: + return int(kwargs['company_id']) + except (ValueError, TypeError): + pass + + if 'company' in kwargs: + try: + return int(kwargs['company']) + except (ValueError, TypeError): + pass + + # Try positional args based on key_params + if key_params: + if 'company_id' in key_params: + idx = key_params.index('company_id') + if idx < len(args): + try: + return int(args[idx]) + except (ValueError, TypeError): + pass + + elif 'company' in key_params: + idx = key_params.index('company') + if idx < len(args): + try: + return int(args[idx]) + except (ValueError, TypeError): + pass + + return None diff --git a/reports-app/backend/app/cache/event_monitor.py b/reports-app/backend/app/cache/event_monitor.py new file mode 100644 index 0000000..1dbc4a4 --- /dev/null +++ b/reports-app/backend/app/cache/event_monitor.py @@ -0,0 +1,333 @@ +""" +Event-based cache invalidation monitor + +Monitors {schema}.act tables for changes and invalidates cache automatically +""" +import asyncio +import logging +import sys +import os +from typing import Optional + +# Add shared to path for Oracle pool access +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../..'))) + +logger = logging.getLogger(__name__) + + +class EventMonitor: + """ + Monitors schema.act tables for changes to trigger cache invalidation + + Runs as background task, checking max(id_act) at configured intervals + Uses permanent schema_mappings cache to avoid repeated schema lookups + """ + + def __init__(self, cache_manager, config): + """ + Initialize event monitor + + Args: + cache_manager: CacheManager instance + config: CacheConfig instance + """ + self.cache = cache_manager + self.config = config + self.running = False + self.task: Optional[asyncio.Task] = None + + async def start(self): + """Start monitoring task""" + if self.running: + logger.warning("Event monitor already running") + return + + self.running = True + self.task = asyncio.create_task(self._monitor_loop()) + logger.info( + f"Event monitor started (interval: {self.config.check_interval}s)" + ) + + async def stop(self): + """Stop monitoring task""" + if not self.running: + return + + self.running = False + if self.task: + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + pass + + logger.info("Event monitor stopped") + + async def _monitor_loop(self): + """Main monitoring loop""" + while self.running: + try: + await self._check_all_companies() + await asyncio.sleep(self.config.check_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Event monitor error: {e}", exc_info=True) + # Wait 1 minute on error before retrying + await asyncio.sleep(60) + + async def _check_all_companies(self): + """ + Check all companies with active cache for changes + + Queries max(id_act) from {schema}.act for each cached company + and invalidates cache if changes detected + """ + try: + # Get list of companies with active cache entries + cached_companies = await self.cache.sqlite.get_cached_company_ids() + + if not cached_companies: + logger.debug("No cached companies to monitor") + return + + logger.info(f"Checking {len(cached_companies)} companies for changes...") + invalidated_count = 0 + + for company_id in cached_companies: + try: + # Check if company data changed + changed = await self._check_company_changes(company_id) + + if changed: + # Invalidate cache for this company + await self.cache.invalidate(company_id=company_id) + invalidated_count += 1 + logger.info( + f"Cache invalidated for company {company_id} due to act changes" + ) + + except Exception as e: + # Error for one company shouldn't stop checking others + logger.error(f"Error checking company {company_id}: {e}") + continue + + if invalidated_count > 0: + logger.info( + f"Auto-invalidation complete: {invalidated_count} companies affected" + ) + + except Exception as e: + logger.error(f"Check all companies error: {e}", exc_info=True) + + async def _check_company_changes(self, company_id: int) -> bool: + """ + Check if company data changed (monitor max(id_act) in schema.act) + + Args: + company_id: Company ID to check + + Returns: + True if cache should be invalidated, False otherwise + """ + try: + # 1. Get schema (from permanent cache) + schema = await self._get_schema_for_company(company_id) + if not schema: + logger.warning(f"Schema not found for company {company_id}") + return False + + # 2. Get current max(id_act) from Oracle + current_max = await self._get_max_id_act(schema) + + # 3. Get cached watermark + cached_watermark = await self.cache.sqlite.get_watermark(company_id) + + # 4. Compare + if cached_watermark is None: + # First time checking - store watermark, no invalidation + await self.cache.sqlite.set_watermark(company_id, schema, current_max) + logger.debug( + f"Watermark initialized for company {company_id}: {current_max}" + ) + return False + + if current_max > cached_watermark: + # Changes detected! + logger.info( + f"Schema {schema} (company {company_id}): " + f"id_act changed {cached_watermark} → {current_max}" + ) + + # Update watermark + await self.cache.sqlite.set_watermark(company_id, schema, current_max) + + return True # Invalidate cache + + # No changes + return False + + except Exception as e: + logger.error(f"Check company {company_id} changes error: {e}") + return False # Don't invalidate on error + + async def _get_schema_for_company(self, company_id: int) -> Optional[str]: + """ + Get schema for company (with permanent caching) + + First checks permanent schema_mappings cache, + falls back to Oracle query if not cached + + Args: + company_id: Company ID + + Returns: + Schema name or None + """ + # Check permanent cache first + cached_schema = await self.cache.sqlite.get_schema_mapping(company_id) + if cached_schema: + logger.debug(f"Schema mapping HIT for company {company_id}: {cached_schema}") + return cached_schema + + # Cache MISS - query Oracle + logger.info(f"Schema mapping MISS for company {company_id}, querying Oracle...") + + try: + from shared.database.oracle_pool import oracle_pool + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(""" + SELECT schema + FROM CONTAFIN_ORACLE.v_nom_firme + WHERE id_firma = :id + """, {'id': company_id}) + result = cursor.fetchone() + + if not result: + logger.warning(f"Company {company_id} not found in v_nom_firme") + return None + + schema = result[0] + + # Store PERMANENT in schema_mappings (never expires) + await self.cache.sqlite.set_schema_mapping(company_id, schema) + + logger.info(f"Schema mapping stored for company {company_id}: {schema}") + return schema + + except Exception as e: + logger.error(f"Get schema for company {company_id} error: {e}") + return None + + async def _get_max_id_act(self, schema: str) -> int: + """ + Query max(id_act) from {schema}.act + + Args: + schema: Schema name + + Returns: + Max id_act value (0 if table empty) + """ + try: + from shared.database.oracle_pool import oracle_pool + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + # IMPORTANT: Schema comes from v_nom_firme (trusted source) + # so it's safe from SQL injection + query = f"SELECT MAX(id_act) FROM {schema}.act" + cursor.execute(query) + + result = cursor.fetchone() + max_id_act = result[0] if result and result[0] is not None else 0 + + return max_id_act + + except Exception as e: + logger.error(f"Get max_id_act for schema {schema} error: {e}") + return 0 + + +# Optional: Preload all schema mappings at startup + +async def preload_all_schema_mappings(): + """ + Preload all schema mappings at startup (optional) + + Prevents cache misses on first requests by populating + schema_mappings table with all companies + """ + from .cache_manager import get_cache + + cache = get_cache() + if not cache: + logger.warning("Cache not initialized - skipping schema preload") + return + + logger.info("Preloading all schema mappings...") + + try: + from shared.database.oracle_pool import oracle_pool + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(""" + SELECT id_firma, schema + FROM CONTAFIN_ORACLE.v_nom_firme + """) + results = cursor.fetchall() + + for id_firma, schema in results: + await cache.sqlite.set_schema_mapping(id_firma, schema) + + logger.info(f"Preloaded {len(results)} schema mappings") + + except Exception as e: + logger.error(f"Schema preload error: {e}") + + +# Global event monitor instance +_event_monitor: Optional[EventMonitor] = None + + +async def init_event_monitor(cache_manager, config): + """ + Initialize global event monitor + + Args: + cache_manager: CacheManager instance + config: CacheConfig instance + """ + global _event_monitor + _event_monitor = EventMonitor(cache_manager, config) + + # Start if auto-invalidate enabled + if config.auto_invalidate_enabled: + await _event_monitor.start() + + +def get_event_monitor() -> Optional[EventMonitor]: + """Get global event monitor instance""" + return _event_monitor + + +async def toggle_event_monitor(enabled: bool): + """ + Toggle event monitor on/off + + Args: + enabled: True to start monitoring, False to stop + """ + monitor = get_event_monitor() + if not monitor: + logger.warning("Event monitor not initialized") + return + + if enabled and not monitor.running: + await monitor.start() + elif not enabled and monitor.running: + await monitor.stop() diff --git a/reports-app/backend/app/cache/keys.py b/reports-app/backend/app/cache/keys.py new file mode 100644 index 0000000..c7d0849 --- /dev/null +++ b/reports-app/backend/app/cache/keys.py @@ -0,0 +1,150 @@ +""" +Cache key generation utilities +""" +import hashlib +import json +from typing import Any, List, Optional + + +def generate_cache_key(cache_type: str, key_params: Optional[List[str]], args: tuple, kwargs: dict) -> str: + """ + Generate cache key from function parameters + + Format: "{cache_type}:{param1_value}:{param2_value}:..." + + Args: + cache_type: Type of cache (e.g., 'dashboard_summary', 'invoices') + key_params: List of parameter names to include in key + args: Positional arguments from function call + kwargs: Keyword arguments from function call + + Returns: + Cache key string + + Examples: + generate_cache_key('schema', ['company_id'], (123,), {}) + -> "schema:123" + + generate_cache_key('dashboard_summary', ['company', 'username'], (), {'company': '123', 'username': 'john'}) + -> "dashboard_summary:123:john" + + generate_cache_key('invoices', ['company', 'invoice_type', 'status'], (123, 'CLIENTI', 'neplatite'), {}) + -> "invoices:123:CLIENTI:neplatite" + """ + key_parts = [cache_type] + + if not key_params: + # No specific params - use all args/kwargs (fallback) + if args: + key_parts.extend([str(arg) for arg in args]) + if kwargs: + # Sort kwargs for consistent key generation + sorted_kwargs = sorted(kwargs.items()) + key_parts.extend([f"{k}={v}" for k, v in sorted_kwargs]) + else: + # Extract specific params + for i, param_name in enumerate(key_params): + # Try to get from kwargs first + if param_name in kwargs: + value = kwargs[param_name] + # Then try positional args + elif i < len(args): + value = args[i] + else: + # Parameter not found - use placeholder + value = "none" + + key_parts.append(str(value)) + + return ":".join(key_parts) + + +def generate_key_pattern(cache_type: str, company_id: Optional[int] = None) -> str: + """ + Generate cache key pattern for matching multiple keys + + Used for invalidation by type or company + + Args: + cache_type: Type of cache + company_id: Optional company ID to filter by + + Returns: + Pattern string (prefix) + + Examples: + generate_key_pattern('dashboard_summary') + -> "dashboard_summary:" + + generate_key_pattern('dashboard_summary', 123) + -> "dashboard_summary:123" + """ + if company_id is not None: + return f"{cache_type}:{company_id}" + return f"{cache_type}:" + + +def hash_complex_params(params: dict) -> str: + """ + Generate hash for complex parameters (e.g., filters, queries) + + Used when cache key would be too long with full param values + + Args: + params: Dictionary of parameters to hash + + Returns: + 8-character hash string + + Example: + filters = {'status': 'neplatite', 'date_from': '2024-01-01', 'date_to': '2024-12-31'} + hash_complex_params(filters) + -> "a3f8b2c1" + """ + # Sort keys for consistent hashing + sorted_params = json.dumps(params, sort_keys=True) + hash_obj = hashlib.sha256(sorted_params.encode()) + # Return first 8 characters of hex digest + return hash_obj.hexdigest()[:8] + + +def extract_company_id_from_key(cache_key: str) -> Optional[int]: + """ + Extract company_id from cache key + + Assumes format: "cache_type:company_id:..." + + Args: + cache_key: Cache key string + + Returns: + Company ID or None if not found + + Example: + extract_company_id_from_key("dashboard_summary:123:john") + -> 123 + """ + parts = cache_key.split(":") + if len(parts) >= 2: + try: + return int(parts[1]) + except (ValueError, TypeError): + pass + return None + + +def extract_cache_type_from_key(cache_key: str) -> str: + """ + Extract cache_type from cache key + + Args: + cache_key: Cache key string + + Returns: + Cache type (first part before colon) + + Example: + extract_cache_type_from_key("dashboard_summary:123:john") + -> "dashboard_summary" + """ + return cache_key.split(":")[0] diff --git a/reports-app/backend/app/cache/memory_cache.py b/reports-app/backend/app/cache/memory_cache.py new file mode 100644 index 0000000..a1e9d45 --- /dev/null +++ b/reports-app/backend/app/cache/memory_cache.py @@ -0,0 +1,180 @@ +""" +In-memory cache with TTL (L1 cache) +Fast, limited size, lost on restart +""" +import time +import logging +from typing import Any, Optional, Dict +from collections import OrderedDict + +logger = logging.getLogger(__name__) + + +class MemoryCache: + """ + In-memory LRU cache with TTL support + + Features: + - LRU eviction when max_size reached + - Per-entry TTL expiration + - Thread-safe operations + - Fast O(1) get/set operations + """ + + def __init__(self, max_size: int = 1000): + """ + Initialize memory cache + + Args: + max_size: Maximum number of entries to store + """ + self.max_size = max_size + self._cache: OrderedDict[str, Dict[str, Any]] = OrderedDict() + self._stats = { + 'hits': 0, + 'misses': 0, + 'sets': 0, + 'evictions': 0 + } + + async def get(self, key: str) -> Optional[Any]: + """ + Get value from cache + + Args: + key: Cache key + + Returns: + Cached value or None if not found/expired + """ + if key not in self._cache: + self._stats['misses'] += 1 + return None + + entry = self._cache[key] + + # Check TTL expiration + if entry['expires_at'] < time.time(): + # Expired - remove and return None + del self._cache[key] + self._stats['misses'] += 1 + logger.debug(f"Memory cache expired: {key}") + return None + + # Move to end (LRU - most recently used) + self._cache.move_to_end(key) + + self._stats['hits'] += 1 + logger.debug(f"Memory cache HIT: {key}") + return entry['value'] + + async def set(self, key: str, value: Any, ttl: int): + """ + Set value in cache + + Args: + key: Cache key + value: Value to cache + ttl: Time to live in seconds + """ + expires_at = time.time() + ttl + + # Check if we need to evict (LRU) + if key not in self._cache and len(self._cache) >= self.max_size: + # Evict oldest entry (first item in OrderedDict) + evicted_key = next(iter(self._cache)) + del self._cache[evicted_key] + self._stats['evictions'] += 1 + logger.debug(f"Memory cache evicted (LRU): {evicted_key}") + + # Store entry + self._cache[key] = { + 'value': value, + 'expires_at': expires_at, + 'created_at': time.time() + } + + # Move to end (most recently used) + self._cache.move_to_end(key) + + self._stats['sets'] += 1 + logger.debug(f"Memory cache SET: {key} (TTL: {ttl}s)") + + async def delete(self, key: str) -> bool: + """ + Delete entry from cache + + Args: + key: Cache key + + Returns: + True if deleted, False if not found + """ + if key in self._cache: + del self._cache[key] + logger.debug(f"Memory cache deleted: {key}") + return True + return False + + async def clear(self): + """Clear all entries from cache""" + count = len(self._cache) + self._cache.clear() + logger.info(f"Memory cache cleared: {count} entries removed") + + async def clear_by_pattern(self, pattern: str): + """ + Clear entries matching pattern (simple prefix match) + + Args: + pattern: Key prefix to match (e.g., "dashboard_summary:123") + """ + keys_to_delete = [key for key in self._cache.keys() if key.startswith(pattern)] + for key in keys_to_delete: + del self._cache[key] + + logger.info(f"Memory cache cleared by pattern '{pattern}': {len(keys_to_delete)} entries") + + async def cleanup_expired(self): + """Remove all expired entries""" + now = time.time() + expired_keys = [ + key for key, entry in self._cache.items() + if entry['expires_at'] < now + ] + + for key in expired_keys: + del self._cache[key] + + if expired_keys: + logger.info(f"Memory cache cleanup: {len(expired_keys)} expired entries removed") + + def get_stats(self) -> Dict[str, Any]: + """ + Get cache statistics + + Returns: + Dictionary with stats (hits, misses, size, etc.) + """ + total_requests = self._stats['hits'] + self._stats['misses'] + hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0 + + return { + 'size': len(self._cache), + 'max_size': self.max_size, + 'hits': self._stats['hits'], + 'misses': self._stats['misses'], + 'sets': self._stats['sets'], + 'evictions': self._stats['evictions'], + 'hit_rate': hit_rate, + 'total_requests': total_requests + } + + def reset_stats(self): + """Reset statistics counters""" + self._stats = { + 'hits': 0, + 'misses': 0, + 'sets': 0, + 'evictions': 0 + } diff --git a/reports-app/backend/app/cache/sqlite_cache.py b/reports-app/backend/app/cache/sqlite_cache.py new file mode 100644 index 0000000..c2f9f20 --- /dev/null +++ b/reports-app/backend/app/cache/sqlite_cache.py @@ -0,0 +1,404 @@ +""" +SQLite persistent cache (L2 cache) +Persistent, survives restarts, unlimited size +""" +import time +import json +import logging +import aiosqlite +from typing import Any, Optional, List, Dict +from pathlib import Path +from decimal import Decimal +from datetime import datetime, date + +logger = logging.getLogger(__name__) + + +class CustomJSONEncoder(json.JSONEncoder): + """Custom JSON encoder that handles Pydantic models, Decimal, datetime, etc.""" + def default(self, obj): + # Handle Pydantic models + if hasattr(obj, 'dict'): + return obj.dict() + if hasattr(obj, 'model_dump'): # Pydantic v2 + return obj.model_dump() + # Handle Decimal + if isinstance(obj, Decimal): + return float(obj) + # Handle datetime/date + if isinstance(obj, (datetime, date)): + return obj.isoformat() + return super().default(obj) + + +class SQLiteCache: + """ + SQLite-based persistent cache + + Features: + - Persistent storage (survives restarts) + - JSON serialization for complex objects + - Schema mappings (permanent cache for company->schema) + - Watermarks for event-based invalidation + - Performance tracking and benchmarks + """ + + def __init__(self, db_path: str): + """ + Initialize SQLite cache + + Args: + db_path: Path to SQLite database file + """ + self.db_path = db_path + self._ensure_db_dir() + + def _ensure_db_dir(self): + """Ensure database directory exists""" + db_dir = Path(self.db_path).parent + db_dir.mkdir(parents=True, exist_ok=True) + + async def init_db(self): + """Initialize database schema with WAL mode enabled""" + async with aiosqlite.connect(self.db_path) as db: + # Enable Write-Ahead Logging (WAL) mode for better concurrency + await db.execute("PRAGMA journal_mode=WAL") + await db.commit() + + # Table: cache_entries + await db.execute(""" + CREATE TABLE IF NOT EXISTS cache_entries ( + cache_key TEXT PRIMARY KEY, + cache_type TEXT NOT NULL, + company_id INTEGER, + data_json TEXT NOT NULL, + created_at REAL NOT NULL, + expires_at REAL NOT NULL, + hit_count INTEGER DEFAULT 0, + last_accessed REAL + ) + """) + await db.execute("CREATE INDEX IF NOT EXISTS idx_cache_type ON cache_entries(cache_type)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_company_id ON cache_entries(company_id)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_expires_at ON cache_entries(expires_at)") + + # Table: schema_mappings (PERMANENT) + await db.execute(""" + CREATE TABLE IF NOT EXISTS schema_mappings ( + id_firma INTEGER PRIMARY KEY, + schema TEXT NOT NULL, + created_at REAL NOT NULL, + last_verified REAL + ) + """) + + # Table: query_benchmarks + await db.execute(""" + CREATE TABLE IF NOT EXISTS query_benchmarks ( + cache_type TEXT PRIMARY KEY, + avg_time_ms REAL NOT NULL, + sample_count INTEGER DEFAULT 0, + last_updated REAL + ) + """) + + # Table: performance_log + await db.execute(""" + CREATE TABLE IF NOT EXISTS performance_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + cache_type TEXT NOT NULL, + company_id INTEGER, + cache_hit BOOLEAN NOT NULL, + response_time_ms REAL NOT NULL, + estimated_oracle_time_ms REAL, + time_saved_ms REAL, + username TEXT, + timestamp REAL NOT NULL + ) + """) + await db.execute("CREATE INDEX IF NOT EXISTS idx_perf_timestamp ON performance_log(timestamp)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_perf_cache_type ON performance_log(cache_type)") + + # Table: user_cache_settings + await db.execute(""" + CREATE TABLE IF NOT EXISTS user_cache_settings ( + username TEXT PRIMARY KEY, + cache_enabled BOOLEAN DEFAULT TRUE, + created_at REAL, + updated_at REAL + ) + """) + + # Table: cache_config + await db.execute(""" + CREATE TABLE IF NOT EXISTS cache_config ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at REAL + ) + """) + + # Table: cache_watermarks + await db.execute(""" + CREATE TABLE IF NOT EXISTS cache_watermarks ( + company_id INTEGER PRIMARY KEY, + schema TEXT NOT NULL, + max_id_act INTEGER NOT NULL, + checked_at REAL NOT NULL + ) + """) + + await db.commit() + logger.info("SQLite cache database initialized") + + async def get(self, key: str) -> Optional[Any]: + """ + Get value from cache + + Args: + key: Cache key + + Returns: + Cached value or None if not found/expired + """ + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(""" + SELECT data_json, expires_at + FROM cache_entries + WHERE cache_key = ? + """, (key,)) as cursor: + result = await cursor.fetchone() + + if not result: + return None + + data_json, expires_at = result + + # Check TTL expiration + if expires_at < time.time(): + # Expired - delete and return None + await db.execute("DELETE FROM cache_entries WHERE cache_key = ?", (key,)) + await db.commit() + logger.debug(f"SQLite cache expired: {key}") + return None + + # Update hit_count and last_accessed + await db.execute(""" + UPDATE cache_entries + SET hit_count = hit_count + 1, last_accessed = ? + WHERE cache_key = ? + """, (time.time(), key)) + await db.commit() + + logger.debug(f"SQLite cache HIT: {key}") + return json.loads(data_json) + + async def set(self, key: str, value: Any, cache_type: str, company_id: Optional[int], ttl: int): + """ + Set value in cache + + Args: + key: Cache key + value: Value to cache + cache_type: Type of cache entry + company_id: Company ID (None for global caches) + ttl: Time to live in seconds + """ + # Use custom encoder to handle Pydantic models, Decimal, datetime, etc. + data_json = json.dumps(value, cls=CustomJSONEncoder) + now = time.time() + expires_at = now + ttl + + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + INSERT OR REPLACE INTO cache_entries + (cache_key, cache_type, company_id, data_json, created_at, expires_at, hit_count, last_accessed) + VALUES (?, ?, ?, ?, ?, ?, 0, ?) + """, (key, cache_type, company_id, data_json, now, expires_at, now)) + await db.commit() + + logger.debug(f"SQLite cache SET: {key} (TTL: {ttl}s)") + + async def delete(self, key: str) -> bool: + """Delete entry from cache""" + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute("DELETE FROM cache_entries WHERE cache_key = ?", (key,)) + await db.commit() + deleted = cursor.rowcount > 0 + if deleted: + logger.debug(f"SQLite cache deleted: {key}") + return deleted + + async def clear(self): + """Clear all cache entries""" + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute("DELETE FROM cache_entries") + await db.commit() + count = cursor.rowcount + logger.info(f"SQLite cache cleared: {count} entries removed") + + async def clear_by_company(self, company_id: int): + """Clear all entries for specific company""" + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute("DELETE FROM cache_entries WHERE company_id = ?", (company_id,)) + await db.commit() + count = cursor.rowcount + logger.info(f"SQLite cache cleared for company {company_id}: {count} entries") + + async def clear_by_type(self, cache_type: str): + """Clear all entries of specific type""" + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute("DELETE FROM cache_entries WHERE cache_type = ?", (cache_type,)) + await db.commit() + count = cursor.rowcount + logger.info(f"SQLite cache cleared for type '{cache_type}': {count} entries") + + async def cleanup_expired(self): + """Remove all expired entries""" + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute("DELETE FROM cache_entries WHERE expires_at < ?", (time.time(),)) + await db.commit() + count = cursor.rowcount + if count > 0: + logger.info(f"SQLite cache cleanup: {count} expired entries removed") + + # Schema Mappings (PERMANENT) + + async def get_schema_mapping(self, company_id: int) -> Optional[str]: + """Get permanent cached schema for company""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(""" + SELECT schema + FROM schema_mappings + WHERE id_firma = ? + """, (company_id,)) as cursor: + result = await cursor.fetchone() + return result[0] if result else None + + async def set_schema_mapping(self, company_id: int, schema: str): + """Set permanent schema mapping (never expires)""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + INSERT OR REPLACE INTO schema_mappings + (id_firma, schema, created_at, last_verified) + VALUES (?, ?, ?, ?) + """, (company_id, schema, time.time(), time.time())) + await db.commit() + + # Benchmarks + + async def get_benchmark(self, cache_type: str) -> Optional[float]: + """Get average benchmark time for cache type""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(""" + SELECT avg_time_ms + FROM query_benchmarks + WHERE cache_type = ? + """, (cache_type,)) as cursor: + result = await cursor.fetchone() + return result[0] if result else None + + async def set_benchmark(self, cache_type: str, avg_time_ms: float, sample_count: int): + """Set/update benchmark""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + INSERT OR REPLACE INTO query_benchmarks + (cache_type, avg_time_ms, sample_count, last_updated) + VALUES (?, ?, ?, ?) + """, (cache_type, avg_time_ms, sample_count, time.time())) + await db.commit() + + # Performance Tracking + + async def log_performance(self, cache_type: str, company_id: Optional[int], cache_hit: bool, + response_time_ms: float, estimated_oracle_time_ms: Optional[float], + time_saved_ms: Optional[float], username: Optional[str]): + """Log performance metric""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + INSERT INTO performance_log + (cache_type, company_id, cache_hit, response_time_ms, estimated_oracle_time_ms, + time_saved_ms, username, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, (cache_type, company_id, cache_hit, response_time_ms, estimated_oracle_time_ms, + time_saved_ms, username, time.time())) + await db.commit() + + # User Settings + + async def get_user_cache_enabled(self, username: str) -> bool: + """Get user cache setting (default True)""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(""" + SELECT cache_enabled + FROM user_cache_settings + WHERE username = ? + """, (username,)) as cursor: + result = await cursor.fetchone() + return bool(result[0]) if result else True # Default enabled, explicit bool conversion + + async def set_user_cache_enabled(self, username: str, enabled: bool): + """Set user cache setting""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + INSERT OR REPLACE INTO user_cache_settings + (username, cache_enabled, created_at, updated_at) + VALUES (?, ?, ?, ?) + """, (username, enabled, time.time(), time.time())) + await db.commit() + + # Watermarks + + async def get_watermark(self, company_id: int) -> Optional[int]: + """Get cached watermark (max_id_act) for company""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(""" + SELECT max_id_act + FROM cache_watermarks + WHERE company_id = ? + """, (company_id,)) as cursor: + result = await cursor.fetchone() + return result[0] if result else None + + async def set_watermark(self, company_id: int, schema: str, max_id_act: int): + """Set/update watermark for company""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + INSERT OR REPLACE INTO cache_watermarks + (company_id, schema, max_id_act, checked_at) + VALUES (?, ?, ?, ?) + """, (company_id, schema, max_id_act, time.time())) + await db.commit() + + async def get_cached_company_ids(self) -> List[int]: + """Get list of company_ids with active cache entries""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(""" + SELECT DISTINCT company_id + FROM cache_entries + WHERE company_id IS NOT NULL + AND expires_at > ? + """, (time.time(),)) as cursor: + results = await cursor.fetchall() + return [row[0] for row in results] + + # Statistics + + async def get_stats(self) -> Dict[str, Any]: + """Get cache statistics""" + async with aiosqlite.connect(self.db_path) as db: + # Total entries + async with db.execute("SELECT COUNT(*) FROM cache_entries") as cursor: + total_entries = (await cursor.fetchone())[0] + + # Active entries (not expired) + async with db.execute(""" + SELECT COUNT(*) FROM cache_entries WHERE expires_at > ? + """, (time.time(),)) as cursor: + active_entries = (await cursor.fetchone())[0] + + return { + 'total_entries': total_entries, + 'active_entries': active_entries, + 'expired_entries': total_entries - active_entries + } diff --git a/reports-app/backend/app/main.py b/reports-app/backend/app/main.py index 98c671f..afa2321 100644 --- a/reports-app/backend/app/main.py +++ b/reports-app/backend/app/main.py @@ -25,7 +25,7 @@ from auth.middleware import AuthenticationMiddleware # from auth.routes import create_auth_router # Fixed inline # Import routere locale -from app.routers import invoices, dashboard, treasury, companies, telegram +from app.routers import invoices, dashboard, treasury, companies, telegram, cache # Auth endpoints pentru test from fastapi import APIRouter, HTTPException @@ -159,21 +159,133 @@ def create_auth_router(): @asynccontextmanager async def lifespan(app: FastAPI): """Lifecycle events pentru aplicație""" - # Startup + # Startup - Initialize Oracle connection pool await oracle_pool.initialize() - print("[ROA Reports API] Started successfully") + logger.info("[ROA Reports API] Oracle pool initialized") + + # Initialize cache system + from app.cache import init_cache, run_baseline_benchmarks, init_event_monitor, get_cache + from app.cache.config import CacheConfig + + try: + cache_config = CacheConfig.from_env() + await init_cache(cache_config) + logger.info(f"[ROA Reports API] Cache initialized: type={cache_config.cache_type}, enabled={cache_config.enabled}") + + # Run baseline benchmarks (optional, based on config) + if cache_config.benchmark_on_startup: + logger.info("[ROA Reports API] Running baseline performance benchmarks...") + benchmarks = await run_baseline_benchmarks() + logger.info(f"[ROA Reports API] Benchmarks completed: {len(benchmarks)} types measured") + + # Initialize event monitor + cache = get_cache() + await init_event_monitor(cache, cache_config) + if cache_config.auto_invalidate_enabled: + logger.info("[ROA Reports API] Event-based auto-invalidation ENABLED") + else: + logger.info("[ROA Reports API] Event-based auto-invalidation DISABLED") + + except Exception as e: + logger.error(f"[ROA Reports API] Cache initialization error: {e}", exc_info=True) + logger.warning("[ROA Reports API] Continuing without cache") + + logger.info("[ROA Reports API] Started successfully") + yield + # Shutdown + from app.cache import close_cache, get_event_monitor + + # Stop event monitor + try: + monitor = get_event_monitor() + if monitor: + await monitor.stop() + logger.info("[ROA Reports API] Event monitor stopped") + except Exception as e: + logger.error(f"[ROA Reports API] Event monitor shutdown error: {e}") + + # Close cache + try: + await close_cache() + logger.info("[ROA Reports API] Cache closed") + except Exception as e: + logger.error(f"[ROA Reports API] Cache shutdown error: {e}") + await oracle_pool.close_pool() - print("[ROA Reports API] Stopped") + logger.info("[ROA Reports API] Stopped") app = FastAPI( title="ROA Reports API", description="API pentru rapoarte ERP - facturi, încasări și alte rapoarte financiare", version="1.0.0", - lifespan=lifespan + # lifespan=lifespan # Using event handlers instead due to uvicorn compatibility issues ) +# STARTUP EVENT HANDLER (alternative to lifespan) +@app.on_event("startup") +async def startup_event(): + """Application startup - Initialize Oracle pool and cache""" + print("=" * 80, flush=True) + print("[STARTUP] Initializing Oracle pool...", flush=True) + logger.critical("=" * 80) + logger.critical("[STARTUP] Initializing Oracle pool...") + await oracle_pool.initialize() + print("[STARTUP] Oracle pool initialized", flush=True) + logger.critical("[STARTUP] Oracle pool initialized") + + print("[STARTUP] Initializing cache system...", flush=True) + logger.critical("[STARTUP] Initializing cache system...") + from app.cache import init_cache, init_event_monitor, get_cache + from app.cache.config import CacheConfig + + try: + cache_config = CacheConfig.from_env() + await init_cache(cache_config) + print(f"[STARTUP] Cache initialized: type={cache_config.cache_type}, enabled={cache_config.enabled}", flush=True) + logger.critical(f"[STARTUP] Cache initialized: type={cache_config.cache_type}, enabled={cache_config.enabled}") + + # Initialize event monitor + cache = get_cache() + await init_event_monitor(cache, cache_config) + if cache_config.auto_invalidate_enabled: + logger.info("[STARTUP] Event-based auto-invalidation ENABLED") + else: + logger.info("[STARTUP] Event-based auto-invalidation DISABLED") + + except Exception as e: + logger.error(f"[STARTUP] Cache initialization error: {e}", exc_info=True) + logger.warning("[STARTUP] Continuing without cache") + + logger.info("[STARTUP] ROA Reports API started successfully") + logger.info("=" * 80) + +# SHUTDOWN EVENT HANDLER +@app.on_event("shutdown") +async def shutdown_event(): + """Application shutdown - Cleanup resources""" + logger.info("[SHUTDOWN] Stopping event monitor...") + from app.cache import close_cache, get_event_monitor + + try: + monitor = get_event_monitor() + if monitor: + await monitor.stop() + logger.info("[SHUTDOWN] Event monitor stopped") + except Exception as e: + logger.error(f"[SHUTDOWN] Event monitor error: {e}") + + try: + await close_cache() + logger.info("[SHUTDOWN] Cache closed") + except Exception as e: + logger.error(f"[SHUTDOWN] Cache error: {e}") + + await oracle_pool.close_pool() + logger.info("[SHUTDOWN] Oracle pool closed") + logger.info("[SHUTDOWN] ROA Reports API stopped") + # CORS pentru frontend Vue.js app.add_middleware( CORSMiddleware, @@ -184,7 +296,6 @@ app.add_middleware( ) # Authentication middleware -print("[MAIN DEBUG] Adding AuthenticationMiddleware") app.add_middleware( AuthenticationMiddleware, excluded_paths=[ @@ -194,7 +305,6 @@ app.add_middleware( "/api/telegram/health" # Health check for Telegram router ] ) -print("[MAIN DEBUG] AuthenticationMiddleware added - FRESH RESTART - AUTH FIX APPLIED") # Include routere with /api prefix auth_router = create_auth_router() @@ -204,6 +314,7 @@ app.include_router(invoices.router, prefix="/api/invoices", tags=["invoices"]) app.include_router(dashboard.router, prefix="/api/dashboard", tags=["dashboard"]) app.include_router(treasury.router, prefix="/api/treasury", tags=["treasury"]) app.include_router(telegram.router, prefix="/api/telegram", tags=["telegram"]) +app.include_router(cache.router, prefix="/api", tags=["cache"]) @app.get("/") async def root(): diff --git a/reports-app/backend/app/routers/cache.py b/reports-app/backend/app/routers/cache.py new file mode 100644 index 0000000..2b1b588 --- /dev/null +++ b/reports-app/backend/app/routers/cache.py @@ -0,0 +1,399 @@ +""" +API Router pentru managementul cache-ului +""" +from fastapi import APIRouter, Depends, HTTPException, Request +from pydantic import BaseModel +from typing import Optional, Dict, Any +import sys +import os +import time +from datetime import datetime, timedelta + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../shared')) + +from auth.dependencies import get_current_user +from auth.models import CurrentUser +from ..cache import get_cache, get_event_monitor, toggle_event_monitor + +router = APIRouter(prefix="/cache", tags=["cache"]) + + +# Pydantic Models + +class CacheStatsResponse(BaseModel): + """Răspuns statistici cache""" + enabled: bool + global_enabled: bool + user_enabled: bool + cache_type: str + hit_rate: float + total_hits: int + total_misses: int + queries_saved: Dict[str, int] + response_times: Dict[str, Dict[str, Any]] + cache_size: Dict[str, int] + auto_invalidate: bool + last_cleanup: Optional[str] = None + + +class InvalidateCacheRequest(BaseModel): + """Request pentru invalidare cache""" + company_id: Optional[int] = None + cache_type: Optional[str] = None + + +class ToggleUserCacheRequest(BaseModel): + """Request pentru toggle cache per-user""" + enabled: bool + + +class ToggleGlobalCacheRequest(BaseModel): + """Request pentru toggle cache global""" + enabled: bool + + +class ToggleAutoInvalidateRequest(BaseModel): + """Request pentru toggle auto-invalidation""" + enabled: bool + + +# Helper Functions + +async def _calculate_cache_stats() -> Dict[str, Any]: + """Calculate comprehensive cache statistics""" + cache = get_cache() + if not cache: + raise HTTPException(status_code=503, detail="Cache not initialized") + + # Get basic cache stats + stats = await cache.get_stats() + + # Calculate hit rate + memory_stats = stats.get('memory', {}) + total_hits = memory_stats.get('hits', 0) + total_misses = memory_stats.get('misses', 0) + total_requests = total_hits + total_misses + hit_rate = (total_hits / total_requests * 100) if total_requests > 0 else 0 + + # Calculate queries saved (from performance_log) + queries_saved = await _calculate_queries_saved(cache) + + # Calculate response times per cache type + response_times = await _calculate_response_times(cache) + + # Get cache sizes + cache_size = { + 'memory': memory_stats.get('size', 0), + 'sqlite': stats.get('sqlite', {}).get('active_entries', 0) + } + + # Get event monitor status + monitor = get_event_monitor() + auto_invalidate = monitor.running if monitor else False + + return { + 'enabled': cache.config.enabled, + 'global_enabled': cache.config.enabled, + 'cache_type': cache.config.cache_type, + 'hit_rate': round(hit_rate, 1), + 'total_hits': total_hits, + 'total_misses': total_misses, + 'queries_saved': queries_saved, + 'response_times': response_times, + 'cache_size': cache_size, + 'auto_invalidate': auto_invalidate, + 'last_cleanup': None # TODO: track last cleanup time + } + + +async def _calculate_queries_saved(cache) -> Dict[str, int]: + """Calculate queries saved by time period""" + import aiosqlite + + try: + async with aiosqlite.connect(cache.sqlite.db_path) as db: + now = time.time() + today_start = now - 86400 # 24 hours + week_start = now - 604800 # 7 days + + # Today + async with db.execute(""" + SELECT COUNT(*) FROM performance_log + WHERE cache_hit = 1 AND timestamp >= ? + """, (today_start,)) as cursor: + today = (await cursor.fetchone())[0] + + # This week + async with db.execute(""" + SELECT COUNT(*) FROM performance_log + WHERE cache_hit = 1 AND timestamp >= ? + """, (week_start,)) as cursor: + week = (await cursor.fetchone())[0] + + # All time + async with db.execute(""" + SELECT COUNT(*) FROM performance_log + WHERE cache_hit = 1 + """) as cursor: + total = (await cursor.fetchone())[0] + + return { + 'today': today, + 'week': week, + 'total': total + } + except Exception as e: + return {'today': 0, 'week': 0, 'total': 0} + + +async def _calculate_response_times(cache) -> Dict[str, Dict[str, Any]]: + """Calculate average response times per cache type""" + import aiosqlite + + try: + async with aiosqlite.connect(cache.sqlite.db_path) as db: + # Get average times per cache type + async with db.execute(""" + SELECT + cache_type, + AVG(CASE WHEN cache_hit = 1 THEN response_time_ms ELSE NULL END) as avg_cached, + AVG(CASE WHEN cache_hit = 0 THEN response_time_ms ELSE NULL END) as avg_oracle + FROM performance_log + WHERE timestamp >= ? + GROUP BY cache_type + """, (time.time() - 86400,)) as cursor: # Last 24 hours + results = await cursor.fetchall() + + response_times = {} + for row in results: + cache_type, avg_cached, avg_oracle = row + if avg_cached and avg_oracle: + improvement = int((avg_oracle - avg_cached) / avg_oracle * 100) + response_times[cache_type] = { + 'cached': int(avg_cached), + 'oracle': int(avg_oracle), + 'improvement': improvement + } + + return response_times + except Exception as e: + return {} + + +# API Endpoints + +@router.get("/stats", response_model=CacheStatsResponse) +async def get_cache_stats( + current_user: CurrentUser = Depends(get_current_user) +): + """ + Obține statistici complete cache + + Returns: + - Hit rate, queries saved, response times + - Cache sizes (memory + SQLite) + - Auto-invalidation status + - Per-user cache setting + """ + try: + cache = get_cache() + if not cache: + raise HTTPException(status_code=503, detail="Cache not initialized") + + # Get base stats + stats = await _calculate_cache_stats() + + # Add user-specific setting + user_enabled = await cache.is_enabled_for_user(current_user.username) + stats['user_enabled'] = user_enabled + + return CacheStatsResponse(**stats) + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error retrieving cache stats: {str(e)}") + + +@router.post("/invalidate") +async def invalidate_cache( + request: InvalidateCacheRequest, + current_user: CurrentUser = Depends(get_current_user) +): + """ + Invalidează cache + + Args: + company_id: Opțional - invalidează doar pentru această companie + cache_type: Opțional - invalidează doar acest tip de cache + + Returns: + Message de confirmare + """ + try: + cache = get_cache() + if not cache: + raise HTTPException(status_code=503, detail="Cache not initialized") + + await cache.invalidate( + company_id=request.company_id, + cache_type=request.cache_type + ) + + if request.company_id and request.cache_type: + message = f"Cache invalidated for company {request.company_id}, type {request.cache_type}" + elif request.company_id: + message = f"Cache invalidated for company {request.company_id}" + elif request.cache_type: + message = f"Cache invalidated for type {request.cache_type}" + else: + message = "All cache invalidated" + + return { + "success": True, + "message": message, + "invalidated_at": datetime.now().isoformat() + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error invalidating cache: {str(e)}") + + +@router.post("/toggle-user") +async def toggle_user_cache( + request: ToggleUserCacheRequest, + current_user: CurrentUser = Depends(get_current_user) +): + """ + Toggle cache per-user + + Permite utilizatorului să activeze/dezactiveze cache-ul pentru el + Folosit pentru A/B testing și comparații de performanță + + Args: + enabled: True pentru activare, False pentru dezactivare + + Returns: + Noul status + """ + try: + cache = get_cache() + if not cache: + raise HTTPException(status_code=503, detail="Cache not initialized") + + await cache.set_user_cache_enabled(current_user.username, request.enabled) + + return { + "success": True, + "username": current_user.username, + "cache_enabled": request.enabled, + "message": f"Cache {'enabled' if request.enabled else 'disabled'} for user {current_user.username}" + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error toggling user cache: {str(e)}") + + +@router.post("/toggle-global") +async def toggle_global_cache( + request: ToggleGlobalCacheRequest, + current_user: CurrentUser = Depends(get_current_user) +): + """ + Toggle cache global (ADMIN only) + + Activează/dezactivează cache-ul la nivel global pentru toți utilizatorii + + Args: + enabled: True pentru activare, False pentru dezactivare + + Returns: + Noul status global + """ + try: + # TODO: Add admin permission check + # For now, allow any authenticated user + + cache = get_cache() + if not cache: + raise HTTPException(status_code=503, detail="Cache not initialized") + + # Update config (NOTE: This is runtime only, .env needs manual update) + cache.config.enabled = request.enabled + + return { + "success": True, + "global_enabled": request.enabled, + "message": f"Cache {'enabled' if request.enabled else 'disabled'} globally", + "note": "This change is runtime only. Update .env CACHE_ENABLED for persistence." + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error toggling global cache: {str(e)}") + + +@router.post("/toggle-auto-invalidate") +async def toggle_auto_invalidation( + request: ToggleAutoInvalidateRequest, + current_user: CurrentUser = Depends(get_current_user) +): + """ + Toggle auto-invalidation monitoring + + Activează/dezactivează monitorizarea automată a {schema}.act + pentru invalidarea cache-ului când se detectează modificări + + Args: + enabled: True pentru activare, False pentru dezactivare + + Returns: + Noul status auto-invalidation + """ + try: + # TODO: Add admin permission check + # For now, allow any authenticated user + + await toggle_event_monitor(request.enabled) + + return { + "success": True, + "auto_invalidate_enabled": request.enabled, + "message": f"Auto-invalidation {'enabled' if request.enabled else 'disabled'}", + "note": "Monitors max(id_act) in {schema}.act tables for changes" + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error toggling auto-invalidation: {str(e)}") + + +@router.get("/health") +async def cache_health(): + """ + Health check pentru sistemul de cache + + Returns: + Status cache, mărime, și uptime + """ + try: + cache = get_cache() + if not cache: + return { + "status": "not_initialized", + "enabled": False + } + + stats = await cache.get_stats() + monitor = get_event_monitor() + + return { + "status": "healthy", + "enabled": cache.config.enabled, + "cache_type": cache.config.cache_type, + "memory_size": stats.get('memory', {}).get('size', 0), + "sqlite_size": stats.get('sqlite', {}).get('active_entries', 0), + "auto_invalidate_running": monitor.running if monitor else False + } + + except Exception as e: + return { + "status": "error", + "error": str(e) + } diff --git a/reports-app/backend/app/routers/companies.py b/reports-app/backend/app/routers/companies.py index 5acd551..4fa7f7d 100644 --- a/reports-app/backend/app/routers/companies.py +++ b/reports-app/backend/app/routers/companies.py @@ -10,6 +10,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../shared')) from auth.dependencies import get_current_user from auth.models import CurrentUser from database.oracle_pool import oracle_pool +from ..cache.decorators import cached from pydantic import BaseModel router = APIRouter(redirect_slashes=False) @@ -27,6 +28,72 @@ class CompanyListResponse(BaseModel): companies: List[Company] total_count: int + +@cached(cache_type='companies', key_params=['username']) +async def _get_user_companies_data(username: str) -> List[Company]: + """ + Obține lista companiilor pentru utilizator (CACHED 30 min) + + Helper function cached separate de endpoint pentru a permite caching + """ + companies = [] + + # Obține toate companiile pentru utilizator direct din query-ul complet + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: + try: + # Primul pas: obținem ID-ul utilizatorului din UTILIZATORI + cursor.execute(""" + SELECT ID_UTIL, UTILIZATOR + FROM UTILIZATORI + WHERE UPPER(UTILIZATOR) = :username + """, {'username': username.upper()}) + + user_row = cursor.fetchone() + if not user_row: + print(f"User {username} not found in UTILIZATORI table") + return [] + + user_id = user_row[0] + print(f"Found user {username} with ID: {user_id}") + + # Al doilea pas: obținem TOATE companiile pentru programul 2 + cursor.execute(""" + SELECT A.ID_FIRMA, A.FIRMA, A.SCHEMA, A.COD_FISCAL + FROM V_NOM_FIRME A + WHERE A.ID_FIRMA IN ( + SELECT ID_FIRMA + FROM VDEF_UTIL_FIRME + WHERE ID_PROGRAM = 2 AND ID_UTIL = :user_id + ) + ORDER BY A.FIRMA + """, {'user_id': user_id}) + + companies_rows = cursor.fetchall() + + for row in companies_rows: + id_firma = row[0] + firma_name = row[1] + schema = row[2] + fiscal_code = row[3] # Poate fi NULL + + company = Company( + id_firma=id_firma, + name=firma_name, + schema_name=schema, + fiscal_code=fiscal_code, + is_active=True + ) + companies.append(company) + + print(f"Found {len(companies)} companies for user {username}") + + except Exception as e: + print(f"Eroare la obținerea companiilor din Oracle: {e}") + + return companies + + @router.get("", response_model=CompanyListResponse) @router.get("/", response_model=CompanyListResponse) async def get_user_companies( @@ -39,82 +106,14 @@ async def get_user_companies( print(f"[COMPANIES DEBUG] Request state: user={getattr(request.state, 'user', 'NOT_SET')}, is_authenticated={getattr(request.state, 'is_authenticated', 'NOT_SET')}") print(f"[COMPANIES DEBUG] Authorization header: {request.headers.get('Authorization', 'NOT_SET')}") try: - companies = [] - - # Obține toate companiile pentru utilizator direct din query-ul complet - # Ignorăm lista din JWT și recalculăm direct din Oracle pentru a obține toate cele 63 de companii - async with oracle_pool.get_connection() as connection: - with connection.cursor() as cursor: - try: - # Primul pas: obținem ID-ul utilizatorului din UTILIZATORI - cursor.execute(""" - SELECT ID_UTIL, UTILIZATOR - FROM UTILIZATORI - WHERE UPPER(UTILIZATOR) = :username - """, {'username': current_user.username.upper()}) - - user_row = cursor.fetchone() - if not user_row: - print(f"User {current_user.username} not found in UTILIZATORI table") - return CompanyListResponse(companies=[], total_count=0) - - user_id = user_row[0] - print(f"Found user {current_user.username} with ID: {user_id}") - - # Al doilea pas: obținem TOATE companiile pentru programul 2 - cursor.execute(""" - SELECT A.ID_FIRMA, A.FIRMA, A.SCHEMA, A.COD_FISCAL - FROM V_NOM_FIRME A - WHERE A.ID_FIRMA IN ( - SELECT ID_FIRMA - FROM VDEF_UTIL_FIRME - WHERE ID_PROGRAM = 2 AND ID_UTIL = :user_id - ) - ORDER BY A.FIRMA - """, {'user_id': user_id}) - - companies_rows = cursor.fetchall() - - for row in companies_rows: - id_firma = row[0] - firma_name = row[1] - schema = row[2] - fiscal_code = row[3] # Poate fi NULL - - company = Company( - id_firma=id_firma, - name=firma_name, - schema_name=schema, - fiscal_code=fiscal_code, - is_active=True - ) - companies.append(company) - - print(f"Found {len(companies)} companies for user {current_user.username}") - - except Exception as e: - print(f"Eroare la obținerea companiilor din Oracle: {e}") - # Fallback: folosim lista din JWT dacă query-ul Oracle eșuează - for company_id in current_user.companies: - try: - id_firma = int(company_id) - company = Company( - id_firma=id_firma, - name=f"Company {id_firma}", - schema_name="", - fiscal_code="", - is_active=True - ) - companies.append(company) - except ValueError: - # Skip invalid company IDs - continue - + # Call cached helper function + companies = await _get_user_companies_data(current_user.username) + return CompanyListResponse( companies=companies, total_count=len(companies) ) - + except Exception as e: raise HTTPException(status_code=500, detail=f"Eroare la obținerea listei de firme: {str(e)}") diff --git a/reports-app/backend/app/services/dashboard_service.py b/reports-app/backend/app/services/dashboard_service.py index 83983b8..9800330 100644 --- a/reports-app/backend/app/services/dashboard_service.py +++ b/reports-app/backend/app/services/dashboard_service.py @@ -4,34 +4,54 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../shared')) from database.oracle_pool import oracle_pool from ..models.dashboard import DashboardSummary, TreasuryAccount, TrendData +from ..cache.decorators import cached from decimal import Decimal -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from datetime import datetime, timedelta +from fastapi import Request import logging logger = logging.getLogger(__name__) class DashboardService: """Service pentru dashboard - date agregate""" - + @staticmethod - async def get_complete_summary(company: str, username: str) -> DashboardSummary: + @cached(cache_type='schema', key_params=['company_id']) + async def _get_schema(company_id: int) -> str: """ - Obține toate datele pentru dashboard într-un singur apel - Execută 2 query-uri separate: facturi și trezorerie + Obține schema pentru company_id (CACHED PERMANENT) + + CRITICAL: Acest query este cel mai frecvent - executat la FIECARE request API. + Cache permanent reduce queries cu 99.99%. """ async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: - # Obține schema - company_id = int(company) - schema_query = "SELECT schema FROM CONTAFIN_ORACLE.v_nom_firme WHERE id_firma = :company_id" + schema_query = """ + SELECT schema + FROM CONTAFIN_ORACLE.v_nom_firme + WHERE id_firma = :company_id + """ cursor.execute(schema_query, {'company_id': company_id}) schema_result = cursor.fetchone() - + if not schema_result: - raise ValueError(f"Schema nu a fost găsită pentru id_firma {company_id}") - - schema = schema_result[0] + raise ValueError(f"Schema not found for company {company_id}") + + return schema_result[0] + + @staticmethod + @cached(cache_type='dashboard_summary', key_params=['company', 'username']) + async def get_complete_summary(company: str, username: str, request: Optional[Request] = None) -> DashboardSummary: + """ + Obține toate datele pentru dashboard într-un singur apel (CACHED 30 min) + Execută 2 query-uri separate: facturi și trezorerie + """ + company_id = int(company) + schema = await DashboardService._get_schema(company_id) + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: # Query 1: Statistici facturi cu breakdown pe perioade - FIXED ORA-00937 facturi_query = f""" @@ -449,24 +469,14 @@ class DashboardService: ) @staticmethod - async def get_trends(company_id: int, period: str = "12m") -> Dict[str, Any]: - """Get comprehensive trend analysis data for all dashboard indicators""" + @cached(cache_type='dashboard_trends', key_params=['company_id', 'period']) + async def get_trends(company_id: int, period: str = "12m", request: Optional[Request] = None) -> Dict[str, Any]: + """Get comprehensive trend analysis data for all dashboard indicators (CACHED 30 min)""" try: + schema = await DashboardService._get_schema(company_id) + async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: - # Get schema for company - schema_query = """ - SELECT schema - FROM CONTAFIN_ORACLE.v_nom_firme - WHERE id_firma = :company_id - """ - cursor.execute(schema_query, company_id=company_id) - schema_result = cursor.fetchone() - - if not schema_result: - raise ValueError(f"Schema not found for company {company_id}") - - schema = schema_result[0] # Get current period current_period_query = f""" @@ -1222,9 +1232,10 @@ class DashboardService: raise @staticmethod - async def get_maturity_analysis(company_id: int, period: str = "7d") -> Dict[str, Any]: + @cached(cache_type='maturity_analysis', key_params=['company_id', 'period']) + async def get_maturity_analysis(company_id: int, period: str = "7d", request: Optional[Request] = None) -> Dict[str, Any]: """ - Analizează scadențele clienți vs furnizori cu date reale din Oracle + Analizează scadențele clienți vs furnizori cu date reale din Oracle (CACHED 30 min) Args: company_id: ID-ul companiei @@ -1495,9 +1506,10 @@ class DashboardService: raise @staticmethod - async def get_treasury_breakdown(company: int) -> Dict[str, Any]: + @cached(cache_type='treasury_breakdown', key_params=['company']) + async def get_treasury_breakdown(company: int, request: Optional[Request] = None) -> Dict[str, Any]: """ - Obține breakdown-ul trezoreriei pe casă și bancă + Obține breakdown-ul trezoreriei pe casă și bancă (CACHED 30 min) """ try: async with oracle_pool.get_connection() as connection: @@ -1595,9 +1607,10 @@ class DashboardService: raise @staticmethod - async def get_net_balance_breakdown(company: int) -> Dict[str, Any]: + @cached(cache_type='net_balance_breakdown', key_params=['company']) + async def get_net_balance_breakdown(company: int, request: Optional[Request] = None) -> Dict[str, Any]: """ - Obține breakdown-ul balanței nete pe clienți și furnizori cu detaliere pe perioade + Obține breakdown-ul balanței nete pe clienți și furnizori cu detaliere pe perioade (CACHED 30 min) """ try: async with oracle_pool.get_connection() as connection: diff --git a/reports-app/backend/app/services/invoice_service.py b/reports-app/backend/app/services/invoice_service.py index 5f79ff4..94841da 100644 --- a/reports-app/backend/app/services/invoice_service.py +++ b/reports-app/backend/app/services/invoice_service.py @@ -8,6 +8,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../shared')) from database.oracle_pool import oracle_pool from typing import List, Tuple from ..models.invoice import Invoice, InvoiceFilter, InvoiceListResponse, InvoiceSummary +from ..cache.decorators import cached from decimal import Decimal import logging @@ -15,24 +16,37 @@ logger = logging.getLogger(__name__) class InvoiceService: """Service pentru gestionarea facturilor""" - + @staticmethod - async def get_invoices(filter_params: InvoiceFilter, username: str) -> InvoiceListResponse: - """ - Obține lista de facturi - Query simplu pentru afișare în tabel - """ + @cached(cache_type='schema', key_params=['company_id']) + async def _get_schema(company_id: int) -> str: + """Obține schema pentru company_id (CACHED PERMANENT)""" async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: - # Obține schema din v_nom_firme bazat pe id_firma - company_id = int(filter_params.company) - schema_query = "SELECT schema FROM CONTAFIN_ORACLE.v_nom_firme WHERE id_firma = :company_id" + schema_query = """ + SELECT schema + FROM CONTAFIN_ORACLE.v_nom_firme + WHERE id_firma = :company_id + """ cursor.execute(schema_query, {'company_id': company_id}) schema_result = cursor.fetchone() - + if not schema_result: - raise ValueError(f"Schema nu a fost găsită pentru id_firma {company_id}") - - schema = schema_result[0] + raise ValueError(f"Schema not found for company {company_id}") + + return schema_result[0] + + @staticmethod + @cached(cache_type='invoices', key_params=['filter_params', 'username']) + async def get_invoices(filter_params: InvoiceFilter, username: str) -> InvoiceListResponse: + """ + Obține lista de facturi - Query simplu pentru afișare în tabel (CACHED 10 min) + """ + company_id = int(filter_params.company) + schema = await InvoiceService._get_schema(company_id) + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: # Determină conturile în funcție de partner_type if filter_params.partner_type == "CLIENTI": diff --git a/reports-app/backend/app/services/treasury_service.py b/reports-app/backend/app/services/treasury_service.py index 5e87959..76c015c 100644 --- a/reports-app/backend/app/services/treasury_service.py +++ b/reports-app/backend/app/services/treasury_service.py @@ -4,6 +4,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../shared')) from database.oracle_pool import oracle_pool from ..models.treasury import BankCashRegister, RegisterFilter, RegisterListResponse +from ..cache.decorators import cached from decimal import Decimal import logging @@ -11,24 +12,37 @@ logger = logging.getLogger(__name__) class TreasuryService: """Service pentru trezorerie - registru casă și bancă""" - + @staticmethod - async def get_bank_cash_register(filter_params: RegisterFilter, username: str) -> RegisterListResponse: - """ - Obține registrul de casă și bancă din vbancasa views - """ + @cached(cache_type='schema', key_params=['company_id']) + async def _get_schema(company_id: int) -> str: + """Obține schema pentru company_id (CACHED PERMANENT)""" async with oracle_pool.get_connection() as connection: with connection.cursor() as cursor: - # Obține schema - company_id = int(filter_params.company) - schema_query = "SELECT schema FROM CONTAFIN_ORACLE.v_nom_firme WHERE id_firma = :company_id" + schema_query = """ + SELECT schema + FROM CONTAFIN_ORACLE.v_nom_firme + WHERE id_firma = :company_id + """ cursor.execute(schema_query, {'company_id': company_id}) schema_result = cursor.fetchone() - + if not schema_result: - raise ValueError(f"Schema nu a fost găsită pentru id_firma {company_id}") - - schema = schema_result[0] + raise ValueError(f"Schema not found for company {company_id}") + + return schema_result[0] + + @staticmethod + @cached(cache_type='treasury', key_params=['filter_params', 'username']) + async def get_bank_cash_register(filter_params: RegisterFilter, username: str) -> RegisterListResponse: + """ + Obține registrul de casă și bancă din vbancasa views (CACHED 10 min) + """ + company_id = int(filter_params.company) + schema = await TreasuryService._get_schema(company_id) + + async with oracle_pool.get_connection() as connection: + with connection.cursor() as cursor: # Query pentru registrele de bancă și casă union_queries = [] diff --git a/reports-app/backend/requirements.txt b/reports-app/backend/requirements.txt index 10127e7..c91bf77 100644 --- a/reports-app/backend/requirements.txt +++ b/reports-app/backend/requirements.txt @@ -5,9 +5,11 @@ pydantic>=2.5.0 python-jose[cryptography]>=3.3.0 PyJWT>=2.8.0 python-decouple>=3.8 +python-dotenv>=1.0.0 oracledb>=1.4.0 python-dateutil>=2.8.2 openpyxl>=3.1.0 fpdf2>=2.7.0 email-validator>=2.0.0 httpx>=0.27.0 +aiosqlite>=0.19.0 diff --git a/reports-app/frontend/src/App.vue b/reports-app/frontend/src/App.vue index d85e948..97d7a37 100644 --- a/reports-app/frontend/src/App.vue +++ b/reports-app/frontend/src/App.vue @@ -1,34 +1,18 @@