- Email: process + send tools - Security: documentație securizare Clawdbot - KB: coaching, youtube notes (Monica Ion, ClawdBot 10x) - Reflecții: audit relații/bani, pattern 'nu merit', dizolvare vină - Insights: 2026-02-01 + backlog + content recomandat - Memory: heartbeat state, reguli comunicare
239 lines
7.1 KiB
Python
Executable File
239 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Process emails from echo@romfast.ro inbox.
|
|
Saves emails as notes in kb/emails/ for further insight extraction.
|
|
|
|
Usage:
|
|
python3 email_process.py # List unread emails
|
|
python3 email_process.py --save # Save unread emails as notes
|
|
python3 email_process.py --all # List all emails
|
|
"""
|
|
|
|
import imaplib
|
|
import email
|
|
import os
|
|
import sys
|
|
import re
|
|
import json
|
|
from email.header import decode_header
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Load .env
|
|
env_path = Path(__file__).parent.parent / '.env'
|
|
if env_path.exists():
|
|
with open(env_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#') and '=' in line:
|
|
key, value = line.split('=', 1)
|
|
os.environ.setdefault(key, value)
|
|
|
|
# Config
|
|
IMAP_SERVER = os.environ.get('EMAIL_SERVER', 'mail.romfast.ro')
|
|
IMAP_PORT = 993
|
|
IMAP_USER = os.environ.get('EMAIL_USER', 'echo@romfast.ro')
|
|
IMAP_PASS = os.environ.get('EMAIL_PASSWORD', '')
|
|
|
|
# Whitelist - only process emails from these addresses
|
|
WHITELIST = [
|
|
'mmarius28@gmail.com',
|
|
'marius.mutu@romfast.ro',
|
|
]
|
|
|
|
KB_PATH = Path(__file__).parent.parent / 'kb' / 'emails'
|
|
|
|
def slugify(text: str, max_len: int = 50) -> str:
|
|
"""Convert text to URL-friendly slug"""
|
|
text = text.lower()
|
|
text = re.sub(r'[^\w\s-]', '', text)
|
|
text = re.sub(r'[\s_]+', '-', text)
|
|
text = re.sub(r'-+', '-', text).strip('-')
|
|
return text[:max_len]
|
|
|
|
def decode_mime_header(header):
|
|
"""Decode MIME encoded header"""
|
|
if not header:
|
|
return ""
|
|
decoded_parts = []
|
|
for part, encoding in decode_header(header):
|
|
if isinstance(part, bytes):
|
|
decoded_parts.append(part.decode(encoding or 'utf-8', errors='replace'))
|
|
else:
|
|
decoded_parts.append(part)
|
|
return ' '.join(decoded_parts)
|
|
|
|
def get_email_body(msg):
|
|
"""Extract plain text body from email"""
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
content_type = part.get_content_type()
|
|
if content_type == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or 'utf-8'
|
|
body = payload.decode(charset, errors='replace')
|
|
break
|
|
elif content_type == "text/html" and not body:
|
|
# Fallback to HTML if no plain text
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or 'utf-8'
|
|
body = payload.decode(charset, errors='replace')
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or 'utf-8'
|
|
body = payload.decode(charset, errors='replace')
|
|
return body.strip()
|
|
|
|
def extract_sender_email(from_header: str) -> str:
|
|
"""Extract just the email address from From header"""
|
|
match = re.search(r'<([^>]+)>', from_header)
|
|
if match:
|
|
return match.group(1).lower()
|
|
return from_header.lower().strip()
|
|
|
|
def list_emails(show_all=False):
|
|
"""List emails in inbox"""
|
|
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
|
|
mail.login(IMAP_USER, IMAP_PASS)
|
|
mail.select('INBOX')
|
|
|
|
search_criteria = 'ALL' if show_all else 'UNSEEN'
|
|
status, messages = mail.search(None, search_criteria)
|
|
|
|
email_ids = messages[0].split() if messages[0] else []
|
|
emails = []
|
|
|
|
for eid in email_ids:
|
|
status, data = mail.fetch(eid, '(RFC822)')
|
|
msg = email.message_from_bytes(data[0][1])
|
|
|
|
from_addr = decode_mime_header(msg['From'])
|
|
sender_email = extract_sender_email(from_addr)
|
|
subject = decode_mime_header(msg['Subject'])
|
|
date = msg['Date']
|
|
|
|
emails.append({
|
|
'id': eid.decode(),
|
|
'from': from_addr,
|
|
'sender_email': sender_email,
|
|
'subject': subject,
|
|
'date': date,
|
|
'whitelisted': sender_email in WHITELIST
|
|
})
|
|
|
|
mail.logout()
|
|
return emails
|
|
|
|
def save_email_as_note(eid: str) -> dict:
|
|
"""Save a single email as a markdown note"""
|
|
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
|
|
mail.login(IMAP_USER, IMAP_PASS)
|
|
mail.select('INBOX')
|
|
|
|
status, data = mail.fetch(eid.encode(), '(RFC822)')
|
|
msg = email.message_from_bytes(data[0][1])
|
|
|
|
from_addr = decode_mime_header(msg['From'])
|
|
sender_email = extract_sender_email(from_addr)
|
|
subject = decode_mime_header(msg['Subject'])
|
|
date_str = msg['Date']
|
|
body = get_email_body(msg)
|
|
|
|
# Check whitelist
|
|
if sender_email not in WHITELIST:
|
|
mail.logout()
|
|
return {'ok': False, 'error': f'Sender {sender_email} not in whitelist'}
|
|
|
|
# Parse date
|
|
try:
|
|
# Try common date formats
|
|
for fmt in ['%a, %d %b %Y %H:%M:%S %z', '%d %b %Y %H:%M:%S %z']:
|
|
try:
|
|
parsed_date = datetime.strptime(date_str.split(' (')[0].strip(), fmt)
|
|
break
|
|
except:
|
|
continue
|
|
else:
|
|
parsed_date = datetime.now()
|
|
except:
|
|
parsed_date = datetime.now()
|
|
|
|
date_prefix = parsed_date.strftime('%Y-%m-%d')
|
|
slug = slugify(subject) or 'email'
|
|
filename = f"{date_prefix}_{slug}.md"
|
|
filepath = KB_PATH / filename
|
|
|
|
# Create markdown note
|
|
content = f"""# {subject}
|
|
|
|
**De la:** {from_addr}
|
|
**Data:** {date_str}
|
|
**Salvat:** {datetime.now().strftime('%Y-%m-%d %H:%M')}
|
|
|
|
---
|
|
|
|
{body}
|
|
|
|
---
|
|
|
|
## TL;DR
|
|
<!-- Echo: completează cu rezumat -->
|
|
|
|
## Insights
|
|
<!-- Echo: extrage idei acționabile cu tag-uri @work @health @growth etc -->
|
|
"""
|
|
|
|
KB_PATH.mkdir(parents=True, exist_ok=True)
|
|
filepath.write_text(content, encoding='utf-8')
|
|
|
|
# Mark as seen
|
|
mail.store(eid.encode(), '+FLAGS', '\\Seen')
|
|
mail.logout()
|
|
|
|
return {
|
|
'ok': True,
|
|
'file': str(filepath),
|
|
'subject': subject,
|
|
'from': sender_email
|
|
}
|
|
|
|
def save_unread_emails():
|
|
"""Save all unread whitelisted emails as notes"""
|
|
emails = list_emails(show_all=False)
|
|
results = []
|
|
|
|
for em in emails:
|
|
if em['whitelisted']:
|
|
result = save_email_as_note(em['id'])
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
if __name__ == "__main__":
|
|
if '--save' in sys.argv:
|
|
results = save_unread_emails()
|
|
for r in results:
|
|
if r['ok']:
|
|
print(f"✅ Salvat: {r['file']}")
|
|
else:
|
|
print(f"❌ Eroare: {r['error']}")
|
|
if not results:
|
|
print("Niciun email nou de la adrese whitelisted.")
|
|
else:
|
|
show_all = '--all' in sys.argv
|
|
emails = list_emails(show_all=show_all)
|
|
|
|
if not emails:
|
|
print("Inbox gol." if show_all else "Niciun email necitit.")
|
|
else:
|
|
for em in emails:
|
|
wl = "✅" if em['whitelisted'] else "⚠️"
|
|
print(f"{wl} [{em['id']}] {em['subject']}")
|
|
print(f" De la: {em['from']}")
|
|
print(f" Data: {em['date']}")
|
|
print()
|