Files
atm/tests/test_canary.py
Marius Mutu 9cf49caf8a feat(canary): single-shot on_pause_callback + wire Telegram drift alert
Canary auto-pause was silent: when drift > threshold the module flipped
to paused without any user-facing notification, leaving the user to
wonder why detection went dark. Add an optional on_pause_callback
invoked exactly once per not_paused→paused transition. Wrap the call
in try/except so a notifier failure can never break the detection
cycle.

main.py wires the callback to emit canary_drift_paused audit event
plus a warn Alert guiding the user toward /resume or recalibration.

Tests: test_canary_pause_callback_fires_once (idempotent),
test_canary_resume_allows_new_pause_notification (re-arms after
resume), test_canary_pause_callback_exception_does_not_crash_check
(safety).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 11:53:22 +03:00

199 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for src/atm/canary.py."""
from __future__ import annotations
import dataclasses
from pathlib import Path
import numpy as np
import pytest
from atm.canary import Canary, CanaryResult
from atm.config import (
CanaryRegion,
ColorSpec,
Config,
DiscordCfg,
ROI,
TelegramCfg,
YAxisCalib,
)
from atm.vision import crop_roi, phash
# ---------------------------------------------------------------------------
# Fixtures / helpers
# ---------------------------------------------------------------------------
CANARY_ROI = ROI(x=0, y=0, w=50, h=50)
def _make_base_cfg() -> Config:
colors = {
"turquoise": ColorSpec(rgb=(0, 255, 255), tolerance=30.0),
"yellow": ColorSpec(rgb=(255, 255, 0), tolerance=30.0),
"dark_green": ColorSpec(rgb=(0, 100, 0), tolerance=30.0),
"dark_red": ColorSpec(rgb=(100, 0, 0), tolerance=30.0),
"light_green": ColorSpec(rgb=(0, 255, 0), tolerance=30.0),
"light_red": ColorSpec(rgb=(255, 0, 0), tolerance=30.0),
"gray": ColorSpec(rgb=(128, 128, 128), tolerance=30.0),
}
# placeholder baseline_phash; tests replace canary via dataclasses.replace
return Config(
window_title="test",
dot_roi=ROI(x=0, y=0, w=100, h=50),
chart_roi=ROI(x=0, y=0, w=600, h=400),
colors=colors,
y_axis=YAxisCalib(p1_y=0, p1_price=100.0, p2_y=400, p2_price=80.0),
canary=CanaryRegion(roi=CANARY_ROI, baseline_phash="0" * 64, drift_threshold=8),
discord=DiscordCfg(webhook_url="http://example.com/hook"),
telegram=TelegramCfg(bot_token="tok", chat_id="123"),
)
def _cfg_with_baseline(baseline_frame: np.ndarray) -> Config:
"""Build a Config whose baseline_phash matches the given frame's canary ROI."""
roi_img = crop_roi(baseline_frame, CANARY_ROI)
h = phash(roi_img)
canary_region = CanaryRegion(roi=CANARY_ROI, baseline_phash=h, drift_threshold=8)
return dataclasses.replace(_make_base_cfg(), canary=canary_region)
def _checkerboard(h: int, w: int, block: int = 8) -> np.ndarray:
"""Return a checkerboard BGR image (high-frequency, distinct phash)."""
img = np.zeros((h, w, 3), dtype=np.uint8)
for y in range(0, h, block):
for x in range(0, w, block):
if (y // block + x // block) % 2 == 0:
img[y : y + block, x : x + block] = 255
return img
# A purely black 100×100 frame as baseline
BASELINE_FRAME = np.zeros((100, 100, 3), dtype=np.uint8)
# A frame where the canary ROI is a checkerboard (visually very different)
DRIFTED_FRAME = BASELINE_FRAME.copy()
DRIFTED_FRAME[:50, :50] = _checkerboard(50, 50)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_no_drift() -> None:
"""Same image as baseline → distance ≤ threshold, not paused."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
canary = Canary(cfg)
result = canary.check(BASELINE_FRAME)
assert result.drifted is False
assert result.paused is False
assert canary.is_paused is False
def test_drift_triggers_pause() -> None:
"""Drastically different canary ROI → drifted=True, paused=True."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
canary = Canary(cfg)
result = canary.check(DRIFTED_FRAME)
assert result.drifted is True
assert result.paused is True
assert canary.is_paused is True
def test_persists_paused() -> None:
"""After drift, feeding back a clean frame keeps paused=True."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
canary = Canary(cfg)
canary.check(DRIFTED_FRAME) # trigger pause
result = canary.check(BASELINE_FRAME) # clean frame, but still paused
assert result.paused is True
assert canary.is_paused is True
def test_resume_clears() -> None:
"""resume() clears the paused flag; subsequent clean frame stays unpaused."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
canary = Canary(cfg)
canary.check(DRIFTED_FRAME) # pause
canary.resume()
assert canary.is_paused is False
result = canary.check(BASELINE_FRAME)
assert result.paused is False
def test_pause_file_written(tmp_path: Path) -> None:
"""When pause_flag_path is provided, the file is created on drift."""
flag = tmp_path / "paused.flag"
cfg = _cfg_with_baseline(BASELINE_FRAME)
canary = Canary(cfg, pause_flag_path=flag)
assert not flag.exists()
canary.check(DRIFTED_FRAME)
assert flag.exists()
def test_canary_pause_callback_fires_once() -> None:
"""Single-shot: callback invoked exactly once per not_paused→paused edge."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
calls: list[int] = []
canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d))
canary.check(DRIFTED_FRAME) # transition → callback fires
canary.check(DRIFTED_FRAME) # still paused → no new callback
canary.check(BASELINE_FRAME) # clean but still paused → no new callback
assert len(calls) == 1
assert calls[0] > 0 # distance should be positive
def test_canary_resume_allows_new_pause_notification() -> None:
"""After resume, a fresh drift must re-fire the callback."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
calls: list[int] = []
canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d))
canary.check(DRIFTED_FRAME)
assert len(calls) == 1
canary.resume()
canary.check(DRIFTED_FRAME) # new pause transition
assert len(calls) == 2
def test_canary_pause_callback_exception_does_not_crash_check() -> None:
"""A failing callback must not break canary.check (detection cycle safety)."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
def _boom(_d: int) -> None:
raise RuntimeError("notifier down")
canary = Canary(cfg, on_pause_callback=_boom)
# Must not raise — exception is swallowed + logged.
result = canary.check(DRIFTED_FRAME)
assert result.paused is True
assert canary.is_paused is True
def test_resume_deletes_pause_file(tmp_path: Path) -> None:
"""resume() deletes the pause flag file."""
flag = tmp_path / "paused.flag"
cfg = _cfg_with_baseline(BASELINE_FRAME)
canary = Canary(cfg, pause_flag_path=flag)
canary.check(DRIFTED_FRAME)
assert flag.exists()
canary.resume()
assert not flag.exists()