Files
clawd/tools/email_process.py
Echo 10fb3d6fb5 refactor: mutat kb/ -> memory/kb/ pentru memory search
- Mutat toate fișierele din kb/ în memory/kb/
- Actualizat toate referințele în fișiere (.md, .py, .html)
- Actualizat 10 joburi cron cu noi căi
- Memory search indexează acum 58 fișiere din memory/
- TOOLS.md actualizat cu documentație completă
2026-02-01 21:18:45 +00:00

239 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Process emails from echo@romfast.ro inbox.
Saves emails as notes in memory/kb/emails/ for further insight extraction.
Usage:
python3 email_process.py # List unread emails
python3 email_process.py --save # Save unread emails as notes
python3 email_process.py --all # List all emails
"""
import imaplib
import email
import os
import sys
import re
import json
from email.header import decode_header
from datetime import datetime
from pathlib import Path
# Load .env
env_path = Path(__file__).parent.parent / '.env'
if env_path.exists():
with open(env_path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ.setdefault(key, value)
# Config
IMAP_SERVER = os.environ.get('EMAIL_SERVER', 'mail.romfast.ro')
IMAP_PORT = 993
IMAP_USER = os.environ.get('EMAIL_USER', 'echo@romfast.ro')
IMAP_PASS = os.environ.get('EMAIL_PASSWORD', '')
# Whitelist - only process emails from these addresses
WHITELIST = [
'mmarius28@gmail.com',
'marius.mutu@romfast.ro',
]
KB_PATH = Path(__file__).parent.parent / 'kb' / 'emails'
def slugify(text: str, max_len: int = 50) -> str:
"""Convert text to URL-friendly slug"""
text = text.lower()
text = re.sub(r'[^\w\s-]', '', text)
text = re.sub(r'[\s_]+', '-', text)
text = re.sub(r'-+', '-', text).strip('-')
return text[:max_len]
def decode_mime_header(header):
"""Decode MIME encoded header"""
if not header:
return ""
decoded_parts = []
for part, encoding in decode_header(header):
if isinstance(part, bytes):
decoded_parts.append(part.decode(encoding or 'utf-8', errors='replace'))
else:
decoded_parts.append(part)
return ' '.join(decoded_parts)
def get_email_body(msg):
"""Extract plain text body from email"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
body = payload.decode(charset, errors='replace')
break
elif content_type == "text/html" and not body:
# Fallback to HTML if no plain text
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or 'utf-8'
body = payload.decode(charset, errors='replace')
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or 'utf-8'
body = payload.decode(charset, errors='replace')
return body.strip()
def extract_sender_email(from_header: str) -> str:
"""Extract just the email address from From header"""
match = re.search(r'<([^>]+)>', from_header)
if match:
return match.group(1).lower()
return from_header.lower().strip()
def list_emails(show_all=False):
"""List emails in inbox"""
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
mail.login(IMAP_USER, IMAP_PASS)
mail.select('INBOX')
search_criteria = 'ALL' if show_all else 'UNSEEN'
status, messages = mail.search(None, search_criteria)
email_ids = messages[0].split() if messages[0] else []
emails = []
for eid in email_ids:
status, data = mail.fetch(eid, '(RFC822)')
msg = email.message_from_bytes(data[0][1])
from_addr = decode_mime_header(msg['From'])
sender_email = extract_sender_email(from_addr)
subject = decode_mime_header(msg['Subject'])
date = msg['Date']
emails.append({
'id': eid.decode(),
'from': from_addr,
'sender_email': sender_email,
'subject': subject,
'date': date,
'whitelisted': sender_email in WHITELIST
})
mail.logout()
return emails
def save_email_as_note(eid: str) -> dict:
"""Save a single email as a markdown note"""
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
mail.login(IMAP_USER, IMAP_PASS)
mail.select('INBOX')
status, data = mail.fetch(eid.encode(), '(RFC822)')
msg = email.message_from_bytes(data[0][1])
from_addr = decode_mime_header(msg['From'])
sender_email = extract_sender_email(from_addr)
subject = decode_mime_header(msg['Subject'])
date_str = msg['Date']
body = get_email_body(msg)
# Check whitelist
if sender_email not in WHITELIST:
mail.logout()
return {'ok': False, 'error': f'Sender {sender_email} not in whitelist'}
# Parse date
try:
# Try common date formats
for fmt in ['%a, %d %b %Y %H:%M:%S %z', '%d %b %Y %H:%M:%S %z']:
try:
parsed_date = datetime.strptime(date_str.split(' (')[0].strip(), fmt)
break
except:
continue
else:
parsed_date = datetime.now()
except:
parsed_date = datetime.now()
date_prefix = parsed_date.strftime('%Y-%m-%d')
slug = slugify(subject) or 'email'
filename = f"{date_prefix}_{slug}.md"
filepath = KB_PATH / filename
# Create markdown note
content = f"""# {subject}
**De la:** {from_addr}
**Data:** {date_str}
**Salvat:** {datetime.now().strftime('%Y-%m-%d %H:%M')}
---
{body}
---
## TL;DR
<!-- Echo: completează cu rezumat -->
## Insights
<!-- Echo: extrage idei acționabile cu tag-uri @work @health @growth etc -->
"""
KB_PATH.mkdir(parents=True, exist_ok=True)
filepath.write_text(content, encoding='utf-8')
# Mark as seen
mail.store(eid.encode(), '+FLAGS', '\\Seen')
mail.logout()
return {
'ok': True,
'file': str(filepath),
'subject': subject,
'from': sender_email
}
def save_unread_emails():
"""Save all unread whitelisted emails as notes"""
emails = list_emails(show_all=False)
results = []
for em in emails:
if em['whitelisted']:
result = save_email_as_note(em['id'])
results.append(result)
return results
if __name__ == "__main__":
if '--save' in sys.argv:
results = save_unread_emails()
for r in results:
if r['ok']:
print(f"✅ Salvat: {r['file']}")
else:
print(f"❌ Eroare: {r['error']}")
if not results:
print("Niciun email nou de la adrese whitelisted.")
else:
show_all = '--all' in sys.argv
emails = list_emails(show_all=show_all)
if not emails:
print("Inbox gol." if show_all else "Niciun email necitit.")
else:
for em in emails:
wl = "" if em['whitelisted'] else "⚠️"
print(f"{wl} [{em['id']}] {em['subject']}")
print(f" De la: {em['from']}")
print(f" Data: {em['date']}")
print()