Files
echo-core/tools/email_check.py
Marius Mutu 19e253ec43 feat(heartbeat): save emails to KB + fix memory symlink access
- heartbeat saves unread whitelisted emails via email_process --save --json
- fix: add --add-dir so Claude CLI subprocess can access memory/ symlink
- email_check/process: use BODY.PEEK[] to avoid marking emails as read
- email_process: simplify credential loading via credential_store only
- config: heartbeat interval 30→120min, quiet hours end 08→07

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 12:10:53 +00:00

110 lines
3.2 KiB
Python

#!/usr/bin/env python3
"""
IMAP inbox checker for moltbot@romfast.ro
Returns unread emails as JSON
"""
import imaplib
import email
from email.header import decode_header
import json
import sys
from datetime import datetime
from pathlib import Path
# Try keyring first
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from src.credential_store import get_secret
except ImportError:
get_secret = lambda name: None
# IMAP Configuration
IMAP_SERVER = get_secret("email_server") or "mail.romfast.ro"
IMAP_PORT = 993
IMAP_USER = get_secret("email_user") or "echo@romfast.ro"
IMAP_PASS = get_secret("email_password") or ""
def decode_mime_header(header):
"""Decode MIME encoded header"""
if not header:
return ""
decoded = decode_header(header)
result = []
for part, encoding in decoded:
if isinstance(part, bytes):
result.append(part.decode(encoding or 'utf-8', errors='replace'))
else:
result.append(part)
return ''.join(result)
def get_email_body(msg):
"""Extract email body text"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
try:
body = part.get_payload(decode=True).decode('utf-8', errors='replace')
break
except:
pass
else:
try:
body = msg.get_payload(decode=True).decode('utf-8', errors='replace')
except:
pass
return body[:2000] # Limit body length
def check_inbox(unread_only=True, limit=10):
"""Check inbox and return emails"""
try:
# Connect to IMAP
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
mail.login(IMAP_USER, IMAP_PASS)
mail.select("INBOX")
# Search for emails
criteria = "UNSEEN" if unread_only else "ALL"
status, messages = mail.search(None, criteria)
if status != "OK":
return {"ok": False, "error": "Search failed"}
email_ids = messages[0].split()
email_ids = email_ids[-limit:] # Get last N
emails = []
for eid in reversed(email_ids): # Newest first
status, msg_data = mail.fetch(eid, "(BODY.PEEK[])")
if status != "OK":
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
emails.append({
"id": eid.decode(),
"from": decode_mime_header(msg["From"]),
"subject": decode_mime_header(msg["Subject"]),
"date": msg["Date"],
"body_preview": get_email_body(msg)[:500]
})
mail.logout()
return {
"ok": True,
"unread_count": len(emails),
"emails": emails
}
except Exception as e:
return {"ok": False, "error": str(e)}
if __name__ == "__main__":
unread = "--all" not in sys.argv
result = check_inbox(unread_only=unread)
print(json.dumps(result, indent=2, ensure_ascii=False))