Când canary drift coincide cu schimbare strip-count pe același frame (ex: TS comută 2→1 chart-uri și mută menu-ul peste care e ancorat ROI), sistemul rescrie automat baseline_phash în TOML, commit layout change și trimite o singură alertă combinată — fără pauză, fără /rebase manual. Drift fără strip-count change rămâne pauză ca azi (drift real). Gate pe două semnale independente previne fals-pozitive. Canary.check() despărțit în măsurare pură + commit_pause/rebase explicit; tick-loop-ul orchestrează decizia. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
254 lines
8.1 KiB
Python
254 lines
8.1 KiB
Python
"""Tests for src/atm/canary.py."""
|
||
from __future__ import annotations
|
||
|
||
import dataclasses
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import pytest
|
||
|
||
from atm.canary import Canary, CanaryResult
|
||
from atm.config import (
|
||
CanaryRegion,
|
||
ColorSpec,
|
||
Config,
|
||
DiscordCfg,
|
||
ROI,
|
||
TelegramCfg,
|
||
YAxisCalib,
|
||
)
|
||
from atm.vision import crop_roi, phash
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fixtures / helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Canary region shared by every test in this module: the 50×50 px square at
# the frame origin (x=0, y=0). The helpers below crop it for hashing and
# embed it in the CanaryRegion they build.
CANARY_ROI = ROI(x=0, y=0, w=50, h=50)
|
||
|
||
|
||
def _make_base_cfg() -> Config:
    """Return a minimal Config for the canary tests.

    The canary's ``baseline_phash`` is a dummy string of 64 zeros; tests that
    need a meaningful baseline swap in their own canary region with
    ``dataclasses.replace``.
    """
    # Every colour uses the same tolerance, so only the RGB triplets vary.
    rgb_by_name = {
        "turquoise": (0, 255, 255),
        "yellow": (255, 255, 0),
        "dark_green": (0, 100, 0),
        "dark_red": (100, 0, 0),
        "light_green": (0, 255, 0),
        "light_red": (255, 0, 0),
        "gray": (128, 128, 128),
    }
    colors = {
        name: ColorSpec(rgb=rgb, tolerance=30.0)
        for name, rgb in rgb_by_name.items()
    }

    return Config(
        window_title="test",
        dot_roi=ROI(x=0, y=0, w=100, h=50),
        chart_roi=ROI(x=0, y=0, w=600, h=400),
        colors=colors,
        y_axis=YAxisCalib(p1_y=0, p1_price=100.0, p2_y=400, p2_price=80.0),
        # Placeholder hash only; see the docstring.
        canary=CanaryRegion(
            roi=CANARY_ROI,
            baseline_phash="0" * 64,
            drift_threshold=8,
        ),
        discord=DiscordCfg(webhook_url="http://example.com/hook"),
        telegram=TelegramCfg(bot_token="tok", chat_id="123"),
    )
|
||
|
||
|
||
def _cfg_with_baseline(baseline_frame: np.ndarray) -> Config:
    """Return the base Config with a canary baseline hashed from *baseline_frame*.

    The hash is the phash of the CANARY_ROI crop of the given frame.
    """
    baseline_hash = phash(crop_roi(baseline_frame, CANARY_ROI))
    canary_override = CanaryRegion(
        roi=CANARY_ROI,
        baseline_phash=baseline_hash,
        drift_threshold=8,
    )
    base_cfg = _make_base_cfg()
    return dataclasses.replace(base_cfg, canary=canary_override)
|
||
|
||
|
||
def _checkerboard(h: int, w: int, block: int = 8) -> np.ndarray:
    """Build an ``h``×``w`` 3-channel uint8 checkerboard of 0/255 tiles.

    Tiles are ``block`` pixels on a side and the tile at the origin is white.
    Tiles that overhang the right or bottom edge are clipped by the slice
    assignment.
    """
    board = np.zeros((h, w, 3), dtype=np.uint8)
    for row, top in enumerate(range(0, h, block)):
        # Even tile-rows start white at column 0, odd tile-rows one tile to
        # the right; white tiles then repeat every second tile.
        first_white = (row % 2) * block
        for left in range(first_white, w, 2 * block):
            board[top : top + block, left : left + block] = 255
    return board
|
||
|
||
|
||
# Reference frame: 100×100, three channels, every pixel black.
BASELINE_FRAME = np.zeros(shape=(100, 100, 3), dtype=np.uint8)

# Same frame, except the canary ROI (top-left 50×50) is replaced by a
# checkerboard so that it looks very different from the baseline.
DRIFTED_FRAME = BASELINE_FRAME.copy()
DRIFTED_FRAME[0:50, 0:50] = _checkerboard(50, 50)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_no_drift() -> None:
    """Checking the baseline frame itself reports neither drift nor pause."""
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME))

    outcome = monitor.check(BASELINE_FRAME)

    # Both the returned result and the canary's own state must stay clean.
    assert outcome.drifted is False
    assert outcome.paused is False
    assert monitor.is_paused is False
|
||
|
||
|
||
def test_check_does_not_auto_pause() -> None:
    """check() only measures: a drifted frame must not flip the paused state."""
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME))

    outcome = monitor.check(DRIFTED_FRAME)

    assert outcome.drifted is True
    # Drift was seen, but nothing has been committed yet.
    assert outcome.paused is False
    assert monitor.is_paused is False
|
||
|
||
|
||
def test_drift_triggers_pause() -> None:
    """Drift seen by check() becomes a pause once commit_pause() is called."""
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME))

    measurement = monitor.check(DRIFTED_FRAME)
    assert measurement.drifted is True

    monitor.commit_pause(measurement.distance)
    assert monitor.is_paused is True
|
||
|
||
|
||
def test_persists_paused() -> None:
    """Once paused, a subsequent clean frame does not lift the pause."""
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME))

    drifted = monitor.check(DRIFTED_FRAME)
    monitor.commit_pause(drifted.distance)

    # The frame matches the baseline, yet the pause must still be reported.
    after_clean = monitor.check(BASELINE_FRAME)

    assert after_clean.paused is True
    assert monitor.is_paused is True
|
||
|
||
|
||
def test_resume_clears() -> None:
    """resume() drops the paused flag, and a clean frame keeps it dropped."""
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME))

    # Enter the paused state first.
    drifted = monitor.check(DRIFTED_FRAME)
    monitor.commit_pause(drifted.distance)

    monitor.resume()
    assert monitor.is_paused is False

    after_resume = monitor.check(BASELINE_FRAME)
    assert after_resume.paused is False
|
||
|
||
|
||
def test_pause_file_written(tmp_path: Path) -> None:
    """With a pause_flag_path configured, commit_pause() creates the flag file."""
    flag_file = tmp_path / "paused.flag"
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME), pause_flag_path=flag_file)

    assert not flag_file.exists()

    # Measuring drift alone must leave the filesystem untouched.
    drifted = monitor.check(DRIFTED_FRAME)
    assert not flag_file.exists()

    monitor.commit_pause(drifted.distance)
    assert flag_file.exists()
|
||
|
||
|
||
def test_canary_pause_callback_fires_once() -> None:
    """The pause callback fires exactly once per not-paused → paused transition."""
    cfg = _cfg_with_baseline(BASELINE_FRAME)
    calls: list[int] = []

    def _record(d: int) -> None:
        calls.append(d)

    monitor = Canary(cfg, on_pause_callback=_record)

    first = monitor.check(DRIFTED_FRAME)
    # The first commit is the transition and the only one that may notify.
    monitor.commit_pause(first.distance)
    # Repeating the commit while already paused must stay silent.
    monitor.commit_pause(first.distance)

    second = monitor.check(DRIFTED_FRAME)
    # A fresh measurement while paused must not notify either.
    monitor.commit_pause(second.distance)
    # Nor must a clean frame seen while still paused.
    monitor.check(BASELINE_FRAME)

    assert len(calls) == 1
    # The reported distance is expected to be positive.
    assert calls[0] > 0
|
||
|
||
|
||
def test_commit_pause_idempotent() -> None:
    """Repeated commit_pause() calls while paused do not re-fire the callback."""
    cfg = _cfg_with_baseline(BASELINE_FRAME)
    calls: list[int] = []

    def _record(d: int) -> None:
        calls.append(d)

    monitor = Canary(cfg, on_pause_callback=_record)

    # Only the first call performs the transition; the other two are no-ops.
    monitor.commit_pause(100)
    monitor.commit_pause(200)
    monitor.commit_pause(300)

    assert len(calls) == 1
    assert calls[0] == 100
|
||
|
||
|
||
def test_canary_resume_allows_new_pause_notification() -> None:
    """A new drift after resume() must notify again."""
    cfg = _cfg_with_baseline(BASELINE_FRAME)
    calls: list[int] = []

    def _record(d: int) -> None:
        calls.append(d)

    monitor = Canary(cfg, on_pause_callback=_record)

    first = monitor.check(DRIFTED_FRAME)
    monitor.commit_pause(first.distance)
    assert len(calls) == 1

    monitor.resume()

    # Back in the not-paused state, so this commit is a new transition.
    second = monitor.check(DRIFTED_FRAME)
    monitor.commit_pause(second.distance)

    assert len(calls) == 2
|
||
|
||
|
||
def test_canary_pause_callback_exception_does_not_crash_commit_pause() -> None:
    """An exception raised by the callback must not propagate out of commit_pause()."""
    cfg = _cfg_with_baseline(BASELINE_FRAME)

    def _failing_notifier(_d: int) -> None:
        raise RuntimeError("notifier down")

    monitor = Canary(cfg, on_pause_callback=_failing_notifier)

    drifted = monitor.check(DRIFTED_FRAME)
    # This call must complete without raising even though the callback fails.
    monitor.commit_pause(drifted.distance)

    assert monitor.is_paused is True
|
||
|
||
|
||
def test_resume_deletes_pause_file(tmp_path: Path) -> None:
    """The flag file created by commit_pause() is removed again by resume()."""
    flag_file = tmp_path / "paused.flag"
    monitor = Canary(_cfg_with_baseline(BASELINE_FRAME), pause_flag_path=flag_file)

    drifted = monitor.check(DRIFTED_FRAME)
    monitor.commit_pause(drifted.distance)
    assert flag_file.exists()

    monitor.resume()
    assert not flag_file.exists()
|
||
|
||
|
||
def test_rebase_updates_baseline_in_memory() -> None:
    """rebase(new_hash) is reflected in cfg.canary and used by the next check()."""
    cfg = _cfg_with_baseline(BASELINE_FRAME)
    monitor = Canary(cfg)

    # Hash of the drifted frame's canary ROI; it must differ from the baseline.
    new_hash = phash(crop_roi(DRIFTED_FRAME, CANARY_ROI))
    assert cfg.canary.baseline_phash != new_hash

    monitor.rebase(new_hash)
    assert cfg.canary.baseline_phash == new_hash

    # Against the new baseline, the formerly drifted frame reads as clean.
    outcome = monitor.check(DRIFTED_FRAME)
    assert outcome.drifted is False
    assert outcome.paused is False
|