"""Tests for notifier module: FanoutNotifier, DiscordNotifier, TelegramNotifier.""" from __future__ import annotations import json import time from pathlib import Path import pytest from atm.notifier import Alert from atm.notifier.fanout import FanoutNotifier # --------------------------------------------------------------------------- # Fake backends # --------------------------------------------------------------------------- class FakeBackend: """Configurable fake backend for testing.""" def __init__( self, name: str = "fake", always_fail: bool = False, fail_first_n: int = 0, sleep_s: float = 0.0, ) -> None: self.name = name self._always_fail = always_fail self._fail_first_n = fail_first_n self._sleep_s = sleep_s self._call_count = 0 def send(self, alert: Alert) -> None: self._call_count += 1 if self._sleep_s: time.sleep(self._sleep_s) if self._always_fail: raise RuntimeError(f"{self.name}: simulated failure") if self._call_count <= self._fail_first_n: raise RuntimeError(f"{self.name}: simulated failure #{self._call_count}") def _alert(title: str = "test", kind: str = "trigger") -> Alert: return Alert(kind=kind, title=title, body="body text") # --------------------------------------------------------------------------- # FanoutNotifier tests # --------------------------------------------------------------------------- def test_fanout_both_delivered(tmp_path: Path) -> None: dl = tmp_path / "dead.jsonl" b1 = FakeBackend("b1") b2 = FakeBackend("b2") fan = FanoutNotifier([b1, b2], dl, backoff_base=0.01) for i in range(3): fan.send(_alert(f"alert-{i}")) fan.stop(timeout=5.0) s = fan.stats() assert s["b1"]["sent"] == 3 assert s["b2"]["sent"] == 3 assert s["b1"]["failed"] == 0 assert s["b2"]["failed"] == 0 def test_one_backend_down_other_delivers(tmp_path: Path) -> None: dl = tmp_path / "dead.jsonl" ok_backend = FakeBackend("ok") bad_backend = FakeBackend("bad", always_fail=True) fan = FanoutNotifier( [ok_backend, bad_backend], dl, max_retries=1, backoff_base=0.01 ) for i in range(2): fan.send(_alert(f"a{i}")) fan.stop(timeout=5.0) s = fan.stats() assert s["ok"]["sent"] == 2 assert s["bad"]["failed"] == 2 # dead letter file should have entries for the bad backend assert dl.exists() lines = [json.loads(l) for l in dl.read_text().splitlines()] assert all(e["backend"] == "bad" for e in lines) assert len(lines) == 2 def test_dead_letter_on_exhausted_retries(tmp_path: Path) -> None: dl = tmp_path / "dead.jsonl" bad = FakeBackend("bad", always_fail=True) fan = FanoutNotifier([bad], dl, max_retries=3, backoff_base=0.01) fan.send(_alert("my-alert")) fan.stop(timeout=5.0) s = fan.stats() assert s["bad"]["failed"] == 1 # retries = max_retries (3 extra attempts after first) assert s["bad"]["retries"] == 3 assert dl.exists() lines = [json.loads(l) for l in dl.read_text().splitlines()] assert len(lines) == 1 entry = lines[0] assert entry["backend"] == "bad" assert entry["alert_title"] == "my-alert" assert "error_str" in entry assert "timestamp" in entry def test_queue_drop_oldest(tmp_path: Path) -> None: dl = tmp_path / "dead.jsonl" # slow backend: each send takes 0.5s so queue fills fast slow = FakeBackend("slow", sleep_s=0.5) fan = FanoutNotifier([slow], dl, queue_size=2, backoff_base=0.01) # Pump 10 alerts rapidly; worker can't keep up for i in range(10): fan.send(_alert(f"a{i}")) fan.stop(timeout=10.0) s = fan.stats() assert s["slow"]["dropped"] > 0 assert s["slow"]["sent"] <= 2 + 1 # queue_size + possibly 1 in-flight def test_retry_backoff_recovers(tmp_path: Path) -> None: dl = tmp_path / "dead.jsonl" # Fails only the very first call, succeeds after b = FakeBackend("b", fail_first_n=1) fan = FanoutNotifier([b], dl, max_retries=3, backoff_base=0.01) fan.send(_alert("recover")) fan.stop(timeout=5.0) s = fan.stats() assert s["b"]["sent"] == 1 assert s["b"]["retries"] == 1 assert s["b"]["failed"] == 0 assert not dl.exists() def test_stop_drains(tmp_path: Path) -> None: dl = tmp_path / "dead.jsonl" b = FakeBackend("b") fan = FanoutNotifier([b], dl, backoff_base=0.01) for i in range(5): fan.send(_alert(f"a{i}")) fan.stop(timeout=5.0) # All items should have been processed before stop returned assert fan.stats()["b"]["sent"] == 5 # --------------------------------------------------------------------------- # DiscordNotifier unit tests (no real HTTP) # --------------------------------------------------------------------------- class _MockResponse: def __init__(self, status_code: int, text: str = "") -> None: self.status_code = status_code self.text = text class _MockSession: def __init__(self, status_code: int = 204) -> None: self.status_code = status_code self.calls: list[dict] = [] def post(self, url: str, **kwargs): self.calls.append({"url": url, **kwargs}) return _MockResponse(self.status_code) def test_discord_send_ok() -> None: from atm.notifier.discord import DiscordNotifier session = _MockSession(204) n = DiscordNotifier("https://discord.example/hook", session=session) n.send(_alert("Hello")) assert len(session.calls) == 1 assert "**Hello**" in session.calls[0]["json"]["content"] def test_discord_429_raises() -> None: from atm.notifier.discord import DiscordNotifier n = DiscordNotifier("https://discord.example/hook", session=_MockSession(429)) with pytest.raises(RuntimeError, match="429"): n.send(_alert("x")) def test_discord_5xx_raises() -> None: from atm.notifier.discord import DiscordNotifier n = DiscordNotifier("https://discord.example/hook", session=_MockSession(500)) with pytest.raises(RuntimeError, match="500"): n.send(_alert("x")) # --------------------------------------------------------------------------- # TelegramNotifier unit tests (no real HTTP) # --------------------------------------------------------------------------- def test_telegram_send_ok() -> None: from atm.notifier.telegram import TelegramNotifier session = _MockSession(200) n = TelegramNotifier("token", "chat123", session=session) n.send(_alert("Hi")) assert len(session.calls) == 1 assert "*Hi*" in session.calls[0]["json"]["text"] def test_telegram_429_raises() -> None: from atm.notifier.telegram import TelegramNotifier n = TelegramNotifier("token", "chat123", session=_MockSession(429)) with pytest.raises(RuntimeError, match="429"): n.send(_alert("x")) def test_telegram_5xx_raises() -> None: from atm.notifier.telegram import TelegramNotifier n = TelegramNotifier("token", "chat123", session=_MockSession(500)) with pytest.raises(RuntimeError, match="500"): n.send(_alert("x"))