#!/usr/bin/env python3 """ Process emails from echo@romfast.ro inbox. Saves emails as notes in kb/emails/ for further insight extraction. Usage: python3 email_process.py # List unread emails python3 email_process.py --save # Save unread emails as notes python3 email_process.py --all # List all emails """ import imaplib import email import os import sys import re import json from email.header import decode_header from datetime import datetime from pathlib import Path # Load .env env_path = Path(__file__).parent.parent / '.env' if env_path.exists(): with open(env_path) as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ.setdefault(key, value) # Config IMAP_SERVER = os.environ.get('EMAIL_SERVER', 'mail.romfast.ro') IMAP_PORT = 993 IMAP_USER = os.environ.get('EMAIL_USER', 'echo@romfast.ro') IMAP_PASS = os.environ.get('EMAIL_PASSWORD', '') # Whitelist - only process emails from these addresses WHITELIST = [ 'mmarius28@gmail.com', 'marius.mutu@romfast.ro', ] KB_PATH = Path(__file__).parent.parent / 'kb' / 'emails' def slugify(text: str, max_len: int = 50) -> str: """Convert text to URL-friendly slug""" text = text.lower() text = re.sub(r'[^\w\s-]', '', text) text = re.sub(r'[\s_]+', '-', text) text = re.sub(r'-+', '-', text).strip('-') return text[:max_len] def decode_mime_header(header): """Decode MIME encoded header""" if not header: return "" decoded_parts = [] for part, encoding in decode_header(header): if isinstance(part, bytes): decoded_parts.append(part.decode(encoding or 'utf-8', errors='replace')) else: decoded_parts.append(part) return ' '.join(decoded_parts) def get_email_body(msg): """Extract plain text body from email""" body = "" if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() if content_type == "text/plain": payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or 'utf-8' body = payload.decode(charset, errors='replace') break elif content_type == "text/html" and not body: # Fallback to HTML if no plain text payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or 'utf-8' body = payload.decode(charset, errors='replace') else: payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or 'utf-8' body = payload.decode(charset, errors='replace') return body.strip() def extract_sender_email(from_header: str) -> str: """Extract just the email address from From header""" match = re.search(r'<([^>]+)>', from_header) if match: return match.group(1).lower() return from_header.lower().strip() def list_emails(show_all=False): """List emails in inbox""" mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) mail.login(IMAP_USER, IMAP_PASS) mail.select('INBOX') search_criteria = 'ALL' if show_all else 'UNSEEN' status, messages = mail.search(None, search_criteria) email_ids = messages[0].split() if messages[0] else [] emails = [] for eid in email_ids: status, data = mail.fetch(eid, '(RFC822)') msg = email.message_from_bytes(data[0][1]) from_addr = decode_mime_header(msg['From']) sender_email = extract_sender_email(from_addr) subject = decode_mime_header(msg['Subject']) date = msg['Date'] emails.append({ 'id': eid.decode(), 'from': from_addr, 'sender_email': sender_email, 'subject': subject, 'date': date, 'whitelisted': sender_email in WHITELIST }) mail.logout() return emails def save_email_as_note(eid: str) -> dict: """Save a single email as a markdown note""" mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) mail.login(IMAP_USER, IMAP_PASS) mail.select('INBOX') status, data = mail.fetch(eid.encode(), '(RFC822)') msg = email.message_from_bytes(data[0][1]) from_addr = decode_mime_header(msg['From']) sender_email = extract_sender_email(from_addr) subject = decode_mime_header(msg['Subject']) date_str = msg['Date'] body = get_email_body(msg) # Check whitelist if sender_email not in WHITELIST: mail.logout() return {'ok': False, 'error': f'Sender {sender_email} not in whitelist'} # Parse date try: # Try common date formats for fmt in ['%a, %d %b %Y %H:%M:%S %z', '%d %b %Y %H:%M:%S %z']: try: parsed_date = datetime.strptime(date_str.split(' (')[0].strip(), fmt) break except: continue else: parsed_date = datetime.now() except: parsed_date = datetime.now() date_prefix = parsed_date.strftime('%Y-%m-%d') slug = slugify(subject) or 'email' filename = f"{date_prefix}_{slug}.md" filepath = KB_PATH / filename # Create markdown note content = f"""# {subject} **De la:** {from_addr} **Data:** {date_str} **Salvat:** {datetime.now().strftime('%Y-%m-%d %H:%M')} --- {body} --- ## TL;DR ## Insights """ KB_PATH.mkdir(parents=True, exist_ok=True) filepath.write_text(content, encoding='utf-8') # Mark as seen mail.store(eid.encode(), '+FLAGS', '\\Seen') mail.logout() return { 'ok': True, 'file': str(filepath), 'subject': subject, 'from': sender_email } def save_unread_emails(): """Save all unread whitelisted emails as notes""" emails = list_emails(show_all=False) results = [] for em in emails: if em['whitelisted']: result = save_email_as_note(em['id']) results.append(result) return results if __name__ == "__main__": if '--save' in sys.argv: results = save_unread_emails() for r in results: if r['ok']: print(f"✅ Salvat: {r['file']}") else: print(f"❌ Eroare: {r['error']}") if not results: print("Niciun email nou de la adrese whitelisted.") else: show_all = '--all' in sys.argv emails = list_emails(show_all=show_all) if not emails: print("Inbox gol." if show_all else "Niciun email necitit.") else: for em in emails: wl = "✅" if em['whitelisted'] else "⚠️" print(f"{wl} [{em['id']}] {em['subject']}") print(f" De la: {em['from']}") print(f" Data: {em['date']}") print()