Node.js bridge (bridge/whatsapp/): Baileys client with Express HTTP API on localhost:8098 — QR code linking, message queue, reconnection logic. Python adapter (src/adapters/whatsapp.py): polls bridge every 2s, routes through router.py, separate whatsapp.owner/admins auth, security logging. Integrated in main.py alongside Discord + Telegram via asyncio.gather. CLI: echo whatsapp status/qr. 442 tests pass (32 new, zero failures). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
430 lines
13 KiB
Python
430 lines
13 KiB
Python
"""Tests for src/adapters/whatsapp.py — WhatsApp adapter."""
|
|
|
|
import json
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import httpx
|
|
|
|
from src.config import Config
|
|
from src.adapters import whatsapp
|
|
from src.adapters.whatsapp import (
|
|
is_owner,
|
|
is_admin,
|
|
is_registered_chat,
|
|
split_message,
|
|
poll_messages,
|
|
send_whatsapp,
|
|
get_bridge_status,
|
|
handle_incoming,
|
|
run_whatsapp,
|
|
stop_whatsapp,
|
|
)
|
|
|
|
|
|
# --- Fixtures ---
|
|
|
|
|
|
@pytest.fixture
def tmp_config(tmp_path) -> Config:
    """Create a Config backed by a temp file with default data.

    The written configuration has no WhatsApp owner (``None``), an empty
    admin list and no registered WhatsApp channels.

    Args:
        tmp_path: pytest's per-test temporary directory.

    Returns:
        A ``Config`` constructed from ``tmp_path / "config.json"``.
    """
    data = {
        "bot": {
            "name": "Echo",
            "default_model": "sonnet",
        },
        "whatsapp": {
            "owner": None,
            "admins": [],
        },
        "whatsapp_channels": {},
    }
    config_file = tmp_path / "config.json"
    config_file.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return Config(config_file)
|
|
|
|
|
|
@pytest.fixture
def owned_config(tmp_path) -> Config:
    """Config with owner and whatsapp channels set.

    The written configuration has owner ``5511999990000``, one admin
    (``5511888880000``) and a single registered channel ``general`` whose
    id is ``group123@g.us``.

    Args:
        tmp_path: pytest's per-test temporary directory.

    Returns:
        A ``Config`` constructed from ``tmp_path / "config.json"``.
    """
    data = {
        "bot": {
            "name": "Echo",
            "default_model": "sonnet",
        },
        "whatsapp": {
            "owner": "5511999990000",
            "admins": ["5511888880000"],
        },
        "whatsapp_channels": {
            "general": {"id": "group123@g.us"},
        },
    }
    config_file = tmp_path / "config.json"
    config_file.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return Config(config_file)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _set_config(tmp_config):
    """Ensure module config is set for each test.

    Installs ``tmp_config`` as ``whatsapp._config`` for the duration of the
    test, then puts back whatever value was there before so state does not
    leak between tests.
    """
    # getattr guards against the attribute not existing yet on the module.
    previous = getattr(whatsapp, "_config", None)
    whatsapp._config = tmp_config
    yield
    whatsapp._config = previous
|
|
|
|
|
|
@pytest.fixture
def _set_owned(owned_config):
    """Set owned config for specific tests.

    Installs ``owned_config`` as ``whatsapp._config`` while the test runs and
    restores the previously installed value on teardown.
    """
    # getattr guards against the attribute not existing yet on the module.
    previous = getattr(whatsapp, "_config", None)
    whatsapp._config = owned_config
    yield
    whatsapp._config = previous
|
|
|
|
|
|
def _mock_httpx_response(
    status_code: int = 200, json_data=None, text: str = ""
) -> MagicMock:
    """Create a mock httpx.Response.

    Args:
        status_code: Value exposed as ``resp.status_code``.
        json_data: Payload returned by ``resp.json()``. When ``None`` the
            ``json`` attribute is left as the auto-created mock.
        text: Value exposed as ``resp.text``.

    Returns:
        A ``MagicMock`` spec'd against ``httpx.Response``.
    """
    resp = MagicMock(spec=httpx.Response)
    resp.status_code = status_code
    resp.text = text
    if json_data is not None:
        resp.json.return_value = json_data
    return resp
|
|
|
|
|
|
def _mock_client() -> AsyncMock:
    """Create a mock httpx.AsyncClient.

    Returns:
        An ``AsyncMock`` spec'd against ``httpx.AsyncClient``.
    """
    return AsyncMock(spec=httpx.AsyncClient)
|
|
|
|
|
|
# --- Authorization helpers ---
|
|
|
|
|
|
class TestIsOwner:
    """Tests for ``is_owner``."""

    def test_is_owner_true(self, _set_owned):
        """The configured owner number is recognised as owner."""
        assert is_owner("5511999990000") is True

    def test_is_owner_false(self, _set_owned):
        """A number other than the configured owner is rejected."""
        assert is_owner("9999999999") is False

    def test_is_owner_none_owner(self):
        """With the default config (owner is ``None``) nobody is owner."""
        assert is_owner("5511999990000") is False
|
|
|
|
|
|
class TestIsAdmin:
    """Tests for ``is_admin``."""

    def test_is_admin_owner_is_admin(self, _set_owned):
        """The owner counts as an admin even though not in the admin list."""
        assert is_admin("5511999990000") is True

    def test_is_admin_listed(self, _set_owned):
        """A number present in the configured admin list is an admin."""
        assert is_admin("5511888880000") is True

    def test_is_admin_not_listed(self, _set_owned):
        """A number that is neither owner nor listed admin is rejected."""
        assert is_admin("9999999999") is False
|
|
|
|
|
|
class TestIsRegisteredChat:
    """Tests for ``is_registered_chat``."""

    def test_is_registered_true(self, _set_owned):
        """A chat id listed under ``whatsapp_channels`` is registered."""
        assert is_registered_chat("group123@g.us") is True

    def test_is_registered_false(self, _set_owned):
        """A chat id absent from ``whatsapp_channels`` is not registered."""
        assert is_registered_chat("unknown@g.us") is False

    def test_is_registered_empty(self):
        """With the default config (no channels) no chat is registered."""
        assert is_registered_chat("group123@g.us") is False
|
|
|
|
|
|
# --- split_message ---
|
|
|
|
|
|
class TestSplitMessage:
    """Tests for ``split_message``."""

    def test_short_message_not_split(self):
        """Short text comes back as a single chunk."""
        assert split_message("hello") == ["hello"]

    def test_long_message_split(self):
        """Text of exactly twice the limit yields two lossless chunks."""
        text = "a" * 8192
        chunks = split_message(text, limit=4096)
        assert len(chunks) == 2
        assert all(len(c) <= 4096 for c in chunks)
        assert "".join(chunks) == text

    def test_split_at_newline(self):
        """Newline-heavy text never produces a chunk over the limit."""
        text = "line1\n" * 1000
        chunks = split_message(text, limit=100)
        assert all(len(c) <= 100 for c in chunks)

    def test_empty_string(self):
        """An empty string yields one empty chunk rather than no chunks."""
        assert split_message("") == [""]
|
|
|
|
|
|
# --- Bridge communication ---
|
|
|
|
|
|
class TestPollMessages:
    """Tests for ``poll_messages``."""

    @pytest.mark.asyncio
    async def test_successful_poll(self):
        """The ``messages`` list from the bridge response is returned as-is."""
        client = _mock_client()
        messages = [{"from": "123@s.whatsapp.net", "text": "hi"}]
        client.get.return_value = _mock_httpx_response(
            json_data={"messages": messages}
        )
        result = await poll_messages(client)
        assert result == messages
        client.get.assert_called_once()

    @pytest.mark.asyncio
    async def test_poll_error_returns_empty(self):
        """A connection error is swallowed and reported as no messages."""
        client = _mock_client()
        client.get.side_effect = httpx.ConnectError("bridge down")
        result = await poll_messages(client)
        assert result == []
|
|
|
|
|
|
class TestSendWhatsapp:
    """Tests for ``send_whatsapp``."""

    @pytest.mark.asyncio
    async def test_successful_send(self):
        """A 200 response with ``ok: True`` reports success after one POST."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(
            json_data={"ok": True}
        )
        result = await send_whatsapp(client, "123@s.whatsapp.net", "hello")
        assert result is True
        client.post.assert_called_once()

    @pytest.mark.asyncio
    async def test_failed_send(self):
        """A 500 response with ``ok: False`` reports failure."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(
            status_code=500, json_data={"ok": False}, text="Server Error"
        )
        result = await send_whatsapp(client, "123@s.whatsapp.net", "hello")
        assert result is False

    @pytest.mark.asyncio
    async def test_long_message_split_send(self):
        """An 8192-character message is delivered as two POSTs."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(
            json_data={"ok": True}
        )
        long_text = "a" * 8192
        result = await send_whatsapp(client, "123@s.whatsapp.net", long_text)
        assert result is True
        assert client.post.call_count == 2
|
|
|
|
|
|
class TestGetBridgeStatus:
    """Tests for ``get_bridge_status``."""

    @pytest.mark.asyncio
    async def test_connected(self):
        """The bridge's JSON status payload is returned unchanged."""
        client = _mock_client()
        client.get.return_value = _mock_httpx_response(
            json_data={"connected": True, "phone": "5511999990000"}
        )
        result = await get_bridge_status(client)
        assert result == {"connected": True, "phone": "5511999990000"}

    @pytest.mark.asyncio
    async def test_unreachable(self):
        """A connection error yields ``None`` instead of raising."""
        client = _mock_client()
        client.get.side_effect = httpx.ConnectError("unreachable")
        result = await get_bridge_status(client)
        assert result is None
|
|
|
|
|
|
# --- Message handler ---
|
|
|
|
|
|
class TestHandleIncoming:
    """Tests for ``handle_incoming``.

    All tests run against ``owned_config`` (via ``_set_owned``), so the owner
    is ``5511999990000`` and the only registered group is ``group123@g.us``.
    Collaborators inside ``src.adapters.whatsapp`` are patched by name.
    """

    @pytest.mark.asyncio
    async def test_private_admin_message(self, _set_owned):
        """A DM from the owner is routed and a single reply is posted."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(json_data={"ok": True})
        msg = {
            "from": "5511999990000@s.whatsapp.net",
            "text": "Hello Claude",
            "pushName": "Owner",
            "isGroup": False,
        }
        with patch("src.adapters.whatsapp.route_message", return_value=("Hi!", False)) as mock_route:
            await handle_incoming(msg, client)
            mock_route.assert_called_once()
            client.post.assert_called_once()

    @pytest.mark.asyncio
    async def test_private_unauthorized(self, _set_owned):
        """A DM from an unknown number is neither routed nor answered."""
        client = _mock_client()
        msg = {
            "from": "9999999999@s.whatsapp.net",
            "text": "Hello",
            "pushName": "Stranger",
            "isGroup": False,
        }
        with patch("src.adapters.whatsapp.route_message") as mock_route:
            await handle_incoming(msg, client)
            mock_route.assert_not_called()
            client.post.assert_not_called()

    @pytest.mark.asyncio
    async def test_group_unregistered(self, _set_owned):
        """A message from a group that is not registered is not routed."""
        client = _mock_client()
        msg = {
            "from": "unknown@g.us",
            "text": "Hello",
            "pushName": "User",
            "isGroup": True,
        }
        with patch("src.adapters.whatsapp.route_message") as mock_route:
            await handle_incoming(msg, client)
            mock_route.assert_not_called()

    @pytest.mark.asyncio
    async def test_group_registered_skip(self, _set_owned):
        """A message from the registered group is still not routed.

        NOTE(review): the reason the adapter skips this message is not
        visible from the test itself — confirm against ``handle_incoming``.
        """
        client = _mock_client()
        msg = {
            "from": "group123@g.us",
            "text": "Hello",
            "pushName": "User",
            "isGroup": True,
        }
        with patch("src.adapters.whatsapp.route_message") as mock_route:
            await handle_incoming(msg, client)
            mock_route.assert_not_called()

    @pytest.mark.asyncio
    async def test_clear_command(self, _set_owned):
        """``/clear`` clears session ``wa-<number>`` and confirms to the user."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(json_data={"ok": True})
        msg = {
            "from": "5511999990000@s.whatsapp.net",
            "text": "/clear",
            "pushName": "Owner",
            "isGroup": False,
        }
        with patch("src.adapters.whatsapp.clear_session", return_value=True) as mock_clear:
            await handle_incoming(msg, client)
            mock_clear.assert_called_once_with("wa-5511999990000")
            client.post.assert_called_once()
            # The outgoing payload is passed to client.post as the json= kwarg.
            sent_json = client.post.call_args[1]["json"]
            assert "cleared" in sent_json["text"].lower()

    @pytest.mark.asyncio
    async def test_status_command_with_session(self, _set_owned):
        """``/status`` with an active session reports model and message count."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(json_data={"ok": True})
        msg = {
            "from": "5511999990000@s.whatsapp.net",
            "text": "/status",
            "pushName": "Owner",
            "isGroup": False,
        }
        session = {
            "model": "opus",
            "session_id": "sess-abc-12345678",
            "message_count": 5,
            "total_input_tokens": 1000,
            "total_output_tokens": 500,
        }
        with patch("src.adapters.whatsapp.get_active_session", return_value=session):
            await handle_incoming(msg, client)
            client.post.assert_called_once()
            sent_json = client.post.call_args[1]["json"]
            assert "opus" in sent_json["text"]
            # NOTE(review): weak check — "5" also occurs in the session id
            # and in the output token count (500), so this may pass even if
            # message_count is not rendered.
            assert "5" in sent_json["text"]

    @pytest.mark.asyncio
    async def test_status_command_no_session(self, _set_owned):
        """``/status`` without a session replies "No active session"."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(json_data={"ok": True})
        msg = {
            "from": "5511999990000@s.whatsapp.net",
            "text": "/status",
            "pushName": "Owner",
            "isGroup": False,
        }
        with patch("src.adapters.whatsapp.get_active_session", return_value=None):
            await handle_incoming(msg, client)
            client.post.assert_called_once()
            sent_json = client.post.call_args[1]["json"]
            assert "No active session" in sent_json["text"]

    @pytest.mark.asyncio
    async def test_error_handling(self, _set_owned):
        """A routing exception is reported to the user with a "Sorry" reply."""
        client = _mock_client()
        client.post.return_value = _mock_httpx_response(json_data={"ok": True})
        msg = {
            "from": "5511999990000@s.whatsapp.net",
            "text": "Hello",
            "pushName": "Owner",
            "isGroup": False,
        }
        with patch("src.adapters.whatsapp.route_message", side_effect=Exception("boom")):
            await handle_incoming(msg, client)
            client.post.assert_called_once()
            sent_json = client.post.call_args[1]["json"]
            assert "Sorry" in sent_json["text"]

    @pytest.mark.asyncio
    async def test_empty_text_ignored(self, _set_owned):
        """A message with empty text is dropped without routing or reply."""
        client = _mock_client()
        msg = {
            "from": "5511999990000@s.whatsapp.net",
            "text": "",
            "pushName": "Owner",
            "isGroup": False,
        }
        with patch("src.adapters.whatsapp.route_message") as mock_route:
            await handle_incoming(msg, client)
            mock_route.assert_not_called()
            client.post.assert_not_called()
|
|
|
|
|
|
# --- Security logging ---
|
|
|
|
|
|
class TestSecurityLogging:
    """Tests for the adapter's security log output."""

    @pytest.mark.asyncio
    async def test_unauthorized_dm_logged(self, _set_owned):
        """An unauthorized DM produces exactly one "Unauthorized" warning."""
        client = _mock_client()
        msg = {
            "from": "9999999999@s.whatsapp.net",
            "text": "hack attempt",
            "pushName": "Stranger",
            "isGroup": False,
        }
        with patch.object(whatsapp._security_log, "warning") as mock_log:
            await handle_incoming(msg, client)
            mock_log.assert_called_once()
            # First positional argument of the warning call is the message.
            assert "Unauthorized" in mock_log.call_args[0][0]
|
|
|
|
|
|
# --- Lifecycle ---
|
|
|
|
|
|
class TestRunWhatsapp:
    """Tests for the ``run_whatsapp`` lifecycle entry point."""

    @pytest.mark.asyncio
    async def test_basic_start_stop(self, tmp_config):
        """Test that run_whatsapp sets state and exits when stopped."""
        mock_client = _mock_client()
        # Make the mock usable as an async context manager that yields itself.
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)

        # get_bridge_status returns None → retries, but we stop after first sleep
        mock_client.get.return_value = _mock_httpx_response(status_code=500)

        async def stop_on_sleep(*args, **kwargs):
            # Stand-in for asyncio.sleep: flips the run flag instead of waiting.
            whatsapp._running = False

        with (
            patch("src.adapters.whatsapp.httpx.AsyncClient", return_value=mock_client),
            patch("src.adapters.whatsapp.asyncio.sleep", side_effect=stop_on_sleep),
        ):
            await run_whatsapp(tmp_config, bridge_url="http://127.0.0.1:9999")

        # NOTE(review): the autouse _set_config fixture already installs this
        # same tmp_config instance, so this check alone cannot show that
        # run_whatsapp assigned it — the _bridge_url check below can.
        assert whatsapp._config is tmp_config
        assert whatsapp._bridge_url == "http://127.0.0.1:9999"
|
|
|
|
|
|
class TestStopWhatsapp:
    """Tests for ``stop_whatsapp``."""

    def test_sets_running_false(self):
        """Calling ``stop_whatsapp`` clears the module's ``_running`` flag."""
        whatsapp._running = True
        stop_whatsapp()
        assert whatsapp._running is False
|