"""WhatsApp adapter for Echo Core — connects to Node.js bridge.""" import asyncio import logging import httpx from src.config import Config from src.router import route_message from src.claude_session import clear_session, get_active_session log = logging.getLogger("echo-core.whatsapp") _security_log = logging.getLogger("echo-core.security") # Module-level config reference, set by run_whatsapp() _config: Config | None = None _bridge_url: str = "http://127.0.0.1:8098" _running: bool = False VALID_MODELS = {"opus", "sonnet", "haiku"} def _get_config() -> Config: """Return the module-level config, raising if not initialized.""" if _config is None: raise RuntimeError("WhatsApp adapter not initialized — call run_whatsapp() first") return _config # --- Authorization helpers --- def is_owner(phone: str) -> bool: """Check if phone number matches config whatsapp.owner.""" owner = _get_config().get("whatsapp.owner") return phone == str(owner) if owner else False def is_admin(phone: str) -> bool: """Check if phone number is owner or in whatsapp admins list.""" if is_owner(phone): return True admins = _get_config().get("whatsapp.admins", []) return phone in admins def is_registered_chat(chat_id: str) -> bool: """Check if a WhatsApp chat is in any registered channel entry.""" channels = _get_config().get("whatsapp_channels", {}) return any(ch.get("id") == chat_id for ch in channels.values()) # --- Message splitting helper --- def split_message(text: str, limit: int = 4096) -> list[str]: """Split text into chunks that fit WhatsApp's message limit.""" if len(text) <= limit: return [text] chunks = [] while text: if len(text) <= limit: chunks.append(text) break split_at = text.rfind("\n", 0, limit) if split_at == -1: split_at = limit chunks.append(text[:split_at]) text = text[split_at:].lstrip("\n") return chunks # --- Bridge communication --- async def poll_messages(client: httpx.AsyncClient) -> list[dict]: """Poll bridge for new messages.""" try: resp = await client.get(f"{_bridge_url}/messages", timeout=10) if resp.status_code == 200: data = resp.json() return data.get("messages", []) except Exception as e: log.debug("Bridge poll error: %s", e) return [] async def send_whatsapp(client: httpx.AsyncClient, to: str, text: str) -> bool: """Send a message via the bridge.""" try: for chunk in split_message(text): resp = await client.post( f"{_bridge_url}/send", json={"to": to, "text": chunk}, timeout=30, ) if resp.status_code != 200 or not resp.json().get("ok"): log.error("Failed to send to %s: %s", to, resp.text) return False return True except Exception as e: log.error("Send error: %s", e) return False async def get_bridge_status(client: httpx.AsyncClient) -> dict | None: """Get bridge connection status.""" try: resp = await client.get(f"{_bridge_url}/status", timeout=5) if resp.status_code == 200: return resp.json() except Exception: pass return None # --- Message handler --- async def handle_incoming(msg: dict, client: httpx.AsyncClient) -> None: """Process a single incoming WhatsApp message.""" sender = msg.get("from", "") text = msg.get("text", "").strip() push_name = msg.get("pushName", "unknown") is_group = msg.get("isGroup", False) if not text: return # Group chat: only registered chats if is_group: group_jid = sender # group JID like 123456@g.us if not is_registered_chat(group_jid): return # Use group JID as channel ID channel_id = f"wa-{group_jid.split('@')[0]}" else: # Private chat: check admin phone = sender.split("@")[0] if not is_admin(phone): _security_log.warning( "Unauthorized WhatsApp DM from %s (%s): %.100s", phone, push_name, text, ) return channel_id = f"wa-{phone}" # Handle slash commands locally for immediate response if text.startswith("/"): cmd = text.split()[0].lower() if cmd == "/clear": cleared = clear_session(channel_id) reply = "Session cleared." if cleared else "No active session." await send_whatsapp(client, sender, reply) return if cmd == "/status": session = get_active_session(channel_id) if session: model = session.get("model", "?") sid = session.get("session_id", "?")[:8] count = session.get("message_count", 0) in_tok = session.get("total_input_tokens", 0) out_tok = session.get("total_output_tokens", 0) reply = ( f"Model: {model}\n" f"Session: {sid}\n" f"Messages: {count}\n" f"Tokens: {in_tok} in / {out_tok} out" ) else: reply = "No active session." await send_whatsapp(client, sender, reply) return # Identify sender for logging/routing participant = msg.get("participant") or sender user_id = participant.split("@")[0] # Route to Claude via router (handles /model and regular messages) log.info("Message from %s (%s): %.50s", user_id, push_name, text) try: response, _is_cmd = await asyncio.to_thread( route_message, channel_id, user_id, text ) await send_whatsapp(client, sender, response) except Exception as e: log.error("Error handling message from %s: %s", user_id, e) await send_whatsapp(client, sender, "Sorry, an error occurred.") # --- Main loop --- async def run_whatsapp(config: Config, bridge_url: str = "http://127.0.0.1:8098"): """Main WhatsApp polling loop.""" global _config, _bridge_url, _running _config = config _bridge_url = bridge_url _running = True log.info("WhatsApp adapter starting (bridge: %s)", bridge_url) async with httpx.AsyncClient() as client: # Wait for bridge to be ready retries = 0 while _running and retries < 30: status = await get_bridge_status(client) if status: if status.get("connected"): log.info("WhatsApp bridge connected (phone: %s)", status.get("phone")) break else: qr = "QR available" if status.get("qr") else "waiting" log.info("WhatsApp bridge not connected yet (%s)", qr) else: log.info("WhatsApp bridge not reachable, retrying...") retries += 1 await asyncio.sleep(5) if not _running: return log.info("WhatsApp adapter polling started") # Polling loop while _running: try: messages = await poll_messages(client) for msg in messages: await handle_incoming(msg, client) except asyncio.CancelledError: break except Exception as e: log.error("Polling error: %s", e) await asyncio.sleep(2) log.info("WhatsApp adapter stopped") def stop_whatsapp(): """Signal the polling loop to stop.""" global _running _running = False