feat(canary): single-shot on_pause_callback + wire Telegram drift alert

Canary auto-pause was silent: when drift > threshold the module flipped
to paused without any user-facing notification, leaving the user to
wonder why detection went dark. Add an optional on_pause_callback
invoked exactly once per not_paused→paused transition. Wrap the call
in try/except so a notifier failure can never break the detection
cycle.

main.py wires the callback to emit canary_drift_paused audit event
plus a warn Alert guiding the user toward /resume or recalibration.

Tests: test_canary_pause_callback_fires_once (idempotent),
test_canary_resume_allows_new_pause_notification (re-arms after
resume), test_canary_pause_callback_exception_does_not_crash_check
(safety).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 11:53:22 +03:00
parent c5024ce600
commit 9cf49caf8a
3 changed files with 85 additions and 1 deletions

View File

@@ -1,14 +1,18 @@
"""Layout drift detector via perceptual hash comparison."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
import numpy as np
from .config import Config
from .vision import crop_roi, hamming_hex, phash
logger = logging.getLogger(__name__)
@dataclass
class CanaryResult:
@@ -28,10 +32,15 @@ class Canary:
self,
cfg: Config,
pause_flag_path: Path | None = None,
on_pause_callback: Callable[[int], None] | None = None,
) -> None:
self._cfg = cfg
self._pause_flag_path = pause_flag_path
self._paused = False
# Single-shot callback invoked exactly once per not_paused→paused transition.
# Wrapped in try/except at call site so a faulty notifier never breaks
# the detection cycle.
self._on_pause = on_pause_callback
def check(self, frame_bgr: np.ndarray) -> CanaryResult:
roi_img = crop_roi(frame_bgr, self._cfg.canary.roi)
@@ -43,6 +52,12 @@ class Canary:
self._paused = True
if self._pause_flag_path is not None:
self._pause_flag_path.write_text("paused", encoding="utf-8")
if self._on_pause is not None:
try:
self._on_pause(distance)
except Exception as exc:
# Never let a notifier hiccup abort the detection cycle.
logger.warning("canary on_pause_callback raised: %s", exc)
return CanaryResult(distance=distance, drifted=drifted, paused=self._paused)

View File

@@ -809,8 +809,30 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
capture = _build_capture(cfg, capture_stub=capture_stub)
detector = Detector(cfg, capture)
fsm = StateMachine(lockout_s=cfg.lockout_s)
canary = Canary(cfg, pause_flag_path=Path("logs/pause.flag"))
audit = AuditLog(Path("logs"))
# Forward-declare notifier so the canary pause callback can close over it.
# The notifier is constructed a few lines below once backends exist.
_notifier_ref: dict = {}
def _on_canary_pause(distance: int) -> None:
audit.log({"ts": time.time(), "event": "canary_drift_paused", "distance": distance})
n = _notifier_ref.get("n")
if n is not None:
n.send(Alert(
kind="warn",
title=f"Canary drift={distance} — monitorizare pauzată",
body=(
"Fereastra/paleta s-a schimbat. Trimite /resume pentru a relua "
"sau recalibrează."
),
))
canary = Canary(
cfg,
pause_flag_path=Path("logs/pause.flag"),
on_pause_callback=_on_canary_pause,
)
detection_log = AuditLog(Path("logs/detections"))
backends = [
DiscordNotifier(cfg.discord.webhook_url),
@@ -827,6 +849,7 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
})
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
_notifier_ref["n"] = notifier
# Initial frame + canary check
first_frame = capture()

View File

@@ -140,6 +140,52 @@ def test_pause_file_written(tmp_path: Path) -> None:
assert flag.exists()
def test_canary_pause_callback_fires_once() -> None:
"""Single-shot: callback invoked exactly once per not_paused→paused edge."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
calls: list[int] = []
canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d))
canary.check(DRIFTED_FRAME) # transition → callback fires
canary.check(DRIFTED_FRAME) # still paused → no new callback
canary.check(BASELINE_FRAME) # clean but still paused → no new callback
assert len(calls) == 1
assert calls[0] > 0 # distance should be positive
def test_canary_resume_allows_new_pause_notification() -> None:
"""After resume, a fresh drift must re-fire the callback."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
calls: list[int] = []
canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d))
canary.check(DRIFTED_FRAME)
assert len(calls) == 1
canary.resume()
canary.check(DRIFTED_FRAME) # new pause transition
assert len(calls) == 2
def test_canary_pause_callback_exception_does_not_crash_check() -> None:
"""A failing callback must not break canary.check (detection cycle safety)."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
def _boom(_d: int) -> None:
raise RuntimeError("notifier down")
canary = Canary(cfg, on_pause_callback=_boom)
# Must not raise — exception is swallowed + logged.
result = canary.check(DRIFTED_FRAME)
assert result.paused is True
assert canary.is_paused is True
def test_resume_deletes_pause_file(tmp_path: Path) -> None:
"""resume() deletes the pause flag file."""
flag = tmp_path / "paused.flag"