feat(run): screenshot attach, Telegram ok:false fix, post-FIRE catchup guard

Three bundled fixes on the dispatch + FSM + notifier triangle:

1. Telegram silent-success bug: parse JSON body after 200 OK, raise on
   ok:false so FanoutNotifier retries + DLQs + stats surface the failure.
   Previously Discord succeeded while Telegram silently dropped.

2. Per-kind screenshot attach: new AlertsCfg dataclass with per-kind toggle
   (late_start, catchup, arm, prime, trigger). _save_annotated_frame helper
   extracted from inline FIRE block, threaded via Snapshot closure into
   _handle_tick. Failures audit-logged, never silent.

3. Post-FIRE catchup regression (d7305fb): residual dark_green/dark_red dots
   after a FIRE cycle look like startup-catchup from (color, state) alone.
   New fsm.fired_in_session(direction) gate suppresses synth-arm after a
   cycle already fired in that direction. Opposite direction unaffected.

Also: queue-overflow on_drop audit callback, periodic + shutdown heartbeat
stats per-backend, config back-compat (bool or dict for attach_screenshots).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-04-16 22:40:17 +00:00
parent d7305fbbfc
commit 840c23f74c
11 changed files with 731 additions and 41 deletions

View File

@@ -181,6 +181,7 @@ atm report --week 2026-16 # weekly win rate + R PnL + slippage
| `WARN: no window contains 'xxx'` at startup | `cfg.window_title` substring matches nothing | Edit `window_title` in TOML to a substring that's unique to TradeStation. |
| No alerts even after trigger ought to fire | Check `logs/YYYY-MM-DD.jsonl` for `event=tick` entries — are colours accepted? Is `trigger` ever set? | If always UNKNOWN → tolerances too tight. If `trigger` but `locked=true` → lockout from prior fire, normal. |
| Discord OK, Telegram silent (or vice versa) | `logs/dead_letter.jsonl` contains failed alerts with error | Fix credentials in TOML, restart. |
| Heartbeat shows `telegram: failed > 0` | Telegram returned `ok:false` (bot blocked, invalid chat_id, parse error) | Check `logs/dead_letter.jsonl` for the `error_str` / `description` field. Common: bot never started by user in Telegram, or wrong `chat_id` flavor (channel vs group vs DM). |
| Debug circle on mid-strip instead of right edge | Anti-aliasing bridges dots in the mask | Already fixed via erosion+connected-components — ensure `git pull` is current. |
| Wizard window is tiny / image not visible | Tk geometry default on Windows | Already fixed — `git pull`. Image is scaled to fit screen. |

View File

@@ -80,3 +80,12 @@ low_conf_threshold = 0.2
low_conf_run = 3
phaseb_timeout_s = 600
dead_letter_path = "logs/dead_letter.jsonl"
# Per-kind screenshot-attach toggles. All default to true on upgrade.
# Accepts either a bare bool (legacy: attach_screenshots = true) or this table.
[options.attach_screenshots]
late_start = true # screenshot on startup-late alerts
catchup = true # screenshot on mid-session catchup arm + prime
arm = true # screenshot on normal arm (turquoise/yellow) — noisiest
prime = true # screenshot on normal prime (dark_green/dark_red)
trigger = true # screenshot on FIRE

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
import tomllib
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
@@ -84,6 +84,16 @@ class TelegramCfg:
raise ValueError("telegram bot_token + chat_id required")
@dataclass(frozen=True)
class AlertsCfg:
"""Per-kind screenshot-attach toggles. Default all True — zero behavior change."""
late_start: bool = True
catchup: bool = True
arm: bool = True
prime: bool = True
trigger: bool = True
@dataclass(frozen=True)
class Config:
window_title: str
@@ -103,6 +113,7 @@ class Config:
low_conf_run: int = 3
phaseb_timeout_s: int = 600
dead_letter_path: str = "logs/dead_letter.jsonl"
attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg)
config_version: str = "unknown"
def __post_init__(self) -> None:
@@ -153,6 +164,18 @@ class Config:
region = None
if "chart_window_region" in data:
region = ROI(**data["chart_window_region"])
scr = opts.get("attach_screenshots")
if isinstance(scr, bool):
attach = AlertsCfg(
late_start=scr, catchup=scr, arm=scr, prime=scr, trigger=scr,
)
elif isinstance(scr, dict):
fields_set = set(AlertsCfg.__dataclass_fields__)
attach = AlertsCfg(
**{k: bool(v) for k, v in scr.items() if k in fields_set}
)
else:
attach = AlertsCfg()
return cls(
window_title=data["window_title"],
dot_roi=roi,
@@ -171,5 +194,6 @@ class Config:
low_conf_run=int(opts.get("low_conf_run", 3)),
phaseb_timeout_s=int(opts.get("phaseb_timeout_s", 600)),
dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"),
attach_screenshots=attach,
config_version=version,
)

View File

@@ -7,7 +7,7 @@ import sys
import time
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, cast
from typing import TYPE_CHECKING, Callable, Protocol, cast
from atm.config import Config # stdlib-only (tomllib); safe at module level
from atm.notifier import Alert
@@ -17,6 +17,12 @@ if TYPE_CHECKING:
from atm.state_machine import DotColor
# Snapshot closure: called by _handle_tick when an alert is about to fire.
# Takes (kind, label); returns path to annotated screenshot or None (gated off,
# cv2 missing, or write failure — in which case the alert still sends text-only).
Snapshot = Callable[[str, str], "Path | None"]
class _NotifierLike(Protocol):
def send(self, alert: Alert) -> None: ...
@@ -335,6 +341,47 @@ def _cmd_report(args) -> None:
# Live loop
# ---------------------------------------------------------------------------
def _save_annotated_frame(
frame,
cfg,
fires_dir: Path,
label: str,
now: float,
audit: _AuditLike | None = None,
) -> "Path | None":
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
Returns the path on success, ``None`` on any error. Failures are logged to
audit (when provided) so disk-full / permission issues don't become silent
regressions. Never raises — snapshot is a best-effort enhancement, the
text alert must still go out.
"""
try:
import cv2 # type: ignore[import-untyped]
except ImportError as exc:
if audit is not None:
try:
audit.log({"ts": now, "event": "snapshot_fail", "label": label, "error": f"cv2 missing: {exc}"})
except Exception:
pass
return None
try:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_{label}.png"
annotated = frame.copy()
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
cv2.imwrite(str(fpath), annotated)
return fpath
except Exception as exc:
if audit is not None:
try:
audit.log({"ts": now, "event": "snapshot_fail", "label": label, "error": str(exc)})
except Exception:
pass
return None
def _handle_tick(
fsm: StateMachine,
color: str,
@@ -342,14 +389,21 @@ def _handle_tick(
notifier: _NotifierLike,
audit: _AuditLike,
first_accepted: bool,
snapshot: Snapshot | None = None,
) -> Transition | None:
"""Feed FSM for a single accepted color and dispatch arm/prime/late_start
alerts. Returns the final Transition, or None when the color triggered a
late-start short-circuit (FSM untouched; caller should skip FIRE handling).
When ``snapshot`` is provided it is called with ``(kind, label)`` for each
alert; it returns the PNG path to attach or None (gated off / write failed).
Default None keeps test call sites numpy-free.
Pure in the sense that all state lives in the arguments — safe to unit-test
with a FakeNotifier and FakeAudit.
"""
snap: Snapshot = snapshot or (lambda _k, _l: None)
# Late start: the very first accepted color is already at FIRE phase.
# User came online after the trade signal fired — warn and skip FSM feed.
if first_accepted and color in ("light_green", "light_red") and fsm.state == State.IDLE:
@@ -363,6 +417,7 @@ def _handle_tick(
kind="late_start",
title=f"ATM started late — {direction} already fired",
body=f"Observed {color} at startup. Check chart manually.",
image_path=snap("late_start", f"late_start_{direction.lower()}"),
direction=direction,
))
return None
@@ -370,29 +425,38 @@ def _handle_tick(
# Catchup synth-arm: first accepted color is already at PRIME phase.
# Drive FSM through a synthetic arm so the real PRIME transition fires a
# normal prime alert below. Audit entry is tagged catchup:true.
#
# Guard against post-FIRE residual dark_* dots: after a light_green fires
# the cycle, FSM returns to IDLE but dark_green dots continue for the rest
# of the 15m window. Those are tail noise, NOT a new prime signal. The
# direction-scoped fired_in_session() check suppresses synth-arm in that
# case — the tick falls through to the normal _from_idle feed below which
# classifies it as noise. (A genuine new cycle always comes with fresh
# turquoise/yellow first and drives _from_idle → ARMED normally.)
catchup = False
if color in ("dark_green", "dark_red") and fsm.state == State.IDLE:
assert fsm.state == State.IDLE, "synth-arm invariant: FSM must be IDLE"
arm_color = "turquoise" if color == "dark_green" else "yellow"
direction = "BUY" if color == "dark_green" else "SELL"
tr_synth = fsm.feed(cast("DotColor", arm_color), now)
audit.log({
"ts": now,
"event": "tick",
"color": arm_color,
"state": tr_synth.next.value,
"reason": tr_synth.reason,
"catchup": True,
"synthesized_from": color,
})
notifier.send(Alert(
kind="arm",
title=f"{direction} armed ({arm_color}) — catchup",
body=f"catchup — session already armed at startup "
f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
direction=direction,
))
catchup = True
if not fsm.fired_in_session(direction):
arm_color = "turquoise" if color == "dark_green" else "yellow"
tr_synth = fsm.feed(cast("DotColor", arm_color), now)
audit.log({
"ts": now,
"event": "tick",
"color": arm_color,
"state": tr_synth.next.value,
"reason": tr_synth.reason,
"catchup": True,
"synthesized_from": color,
})
notifier.send(Alert(
kind="arm",
title=f"{direction} armed ({arm_color}) — catchup",
body=f"catchup — session already armed at startup "
f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=snap("catchup", f"catchup_arm_{direction.lower()}"),
direction=direction,
))
catchup = True
# Normal FSM feed
tr = fsm.feed(cast("DotColor", color), now)
@@ -414,16 +478,20 @@ def _handle_tick(
kind="arm",
title=f"{direction} armed ({color})",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=snap("arm", f"arm_{direction.lower()}"),
direction=direction,
))
# PRIME: dark_green (BUY) / dark_red (SELL) — only on ARMED→PRIMED
elif tr.reason == "prime":
direction = "BUY" if tr.next == State.PRIMED_BUY else "SELL"
suffix = " — catchup" if catchup else ""
prime_kind = "catchup" if catchup else "prime"
prime_label = f"prime_{direction.lower()}_catchup" if catchup else f"prime_{direction.lower()}"
notifier.send(Alert(
kind="prime",
title=f"{direction} primed ({color}){suffix}",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=snap(prime_kind, prime_label),
direction=direction,
))
return tr
@@ -452,7 +520,18 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
DiscordNotifier(cfg.discord.webhook_url),
TelegramNotifier(cfg.telegram.bot_token, cfg.telegram.chat_id),
]
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path))
def _on_drop(backend_name: str, dropped: Alert) -> None:
"""Audit queue-overflow drops so the silent failure becomes loud."""
audit.log({
"ts": time.time(),
"event": "queue_overflow_drop",
"backend": backend_name,
"dropped_title": dropped.title,
"dropped_kind": dropped.kind,
})
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
# Sanity check: capture one frame, confirm canary matches calibration.
first_frame = capture()
@@ -519,7 +598,19 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
if res.accepted and res.color:
is_first = first_accepted
first_accepted = False
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first)
# Per-iteration closure — binds current frame/now, gates on config.
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
return None
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
)
tr = _handle_tick(
fsm, res.color, now, notifier, audit, is_first,
snapshot=_snapshot,
)
if tr is None:
# late_start short-circuit: FSM untouched, skip FIRE + corpus save
time.sleep(cfg.loop_interval_s)
@@ -535,15 +626,11 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
last_saved_color = res.color
# FIRE: annotate frame + save, attach to alert
if tr.trigger and not tr.locked:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fire_path = fires_dir / f"{ts_str}_{tr.trigger}.png"
try:
annotated = frame.copy()
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
cv2.imwrite(str(fire_path), annotated)
except Exception:
fire_path = None
fire_path: "Path | None" = None
if cfg.attach_screenshots.trigger:
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
)
notifier.send(Alert(
kind="trigger",
title=f"{tr.trigger} signal",
@@ -567,16 +654,35 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
),
))
levels_extractor = None
# heartbeat
# heartbeat — include per-backend dispatch stats so silent failures
# surface every 30 min without waiting for shutdown.
if time.time() > heartbeat_due:
notifier.send(Alert(kind="heartbeat", title="alive", body="confidence ok"))
try:
stats = notifier.stats()
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
body_lines = ["confidence ok"]
for name, s in stats.items():
body_lines.append(
f"{name}: sent={s['sent']} failed={s['failed']} "
f"dropped={s['dropped']} retries={s['retries']}"
)
notifier.send(Alert(kind="heartbeat", title="alive", body="\n".join(body_lines)))
except Exception:
notifier.send(Alert(kind="heartbeat", title="alive", body="confidence ok"))
heartbeat_due = time.time() + cfg.heartbeat_min * 60
time.sleep(cfg.loop_interval_s)
finally:
try:
stats = notifier.stats()
lines = [f"after {time.monotonic() - start:.0f}s"]
for name, s in stats.items():
lines.append(
f"{name}: sent={s['sent']} failed={s['failed']} "
f"dropped={s['dropped']} retries={s['retries']}"
)
notifier.send(Alert(
kind="heartbeat", title="ATM stopped",
body=f"after {time.monotonic() - start:.0f}s",
body="\n".join(lines),
))
except Exception:
pass

View File

@@ -7,7 +7,7 @@ import time
from copy import copy
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Any, Callable
from . import Alert, Notifier
@@ -30,12 +30,14 @@ class FanoutNotifier:
queue_size: int = 50,
max_retries: int = 3,
backoff_base: float = 0.5,
on_drop: Callable[[str, Alert], None] | None = None,
) -> None:
self._backends = backends
self._dead_letter_path = Path(dead_letter_path)
self._queue_size = queue_size
self._max_retries = max_retries
self._backoff_base = backoff_base
self._on_drop = on_drop
self._dl_lock = threading.Lock()
self._queues: dict[str, queue.Queue[Any]] = {}
@@ -62,8 +64,13 @@ class FanoutNotifier:
stats = self._stats[backend.name]
if q.full():
try:
q.get_nowait()
dropped = q.get_nowait()
stats.dropped += 1
if self._on_drop is not None and isinstance(dropped, Alert):
try:
self._on_drop(backend.name, dropped)
except Exception:
pass # audit failure must never break dispatch
except queue.Empty:
pass
q.put(alert_copy)

View File

@@ -53,3 +53,15 @@ class TelegramNotifier:
raise RuntimeError(f"Telegram server error ({resp.status_code}): {resp.text}")
if resp.status_code != 200:
raise RuntimeError(f"Telegram unexpected status ({resp.status_code}): {resp.text}")
# Telegram Bot API returns 200 OK with {"ok": false, ...} for several
# logical failures (bot blocked, invalid chat_id, parse_mode errors).
# Parse body and raise on ok:false so Fanout retries + DLQs + stats count it.
try:
body = resp.json()
except ValueError:
return # non-JSON body — treat as success (shouldn't happen in practice)
if isinstance(body, dict) and body.get("ok") is False:
code = body.get("error_code", "?")
desc = body.get("description", "unknown")
raise RuntimeError(f"Telegram logical failure ({code}): {desc}")

View File

@@ -59,6 +59,16 @@ class StateMachine:
def state(self) -> State:
return self._state
def fired_in_session(self, direction: str) -> bool:
"""True if this FSM instance has recorded any FIRE in the given direction.
Used by catchup logic to suppress synth-arm after a cycle already fired.
Residual dark_green / dark_red dots for the remainder of the 15m window
look identical to startup-catchup from (color, state) alone, but must
be classified as noise, not a new prime signal.
"""
return direction in self._last_fire
# ------------------------------------------------------------------
# Core feed
# ------------------------------------------------------------------

99
tests/test_config.py Normal file
View File

@@ -0,0 +1,99 @@
"""Tests for atm.config — focused on attach_screenshots parsing (legacy bool vs new dict)."""
from __future__ import annotations
from atm.config import AlertsCfg, Config
_BASE = {
"window_title": "X",
"dot_roi": {"x": 0, "y": 0, "w": 10, "h": 10},
"chart_roi": {"x": 0, "y": 0, "w": 100, "h": 100},
"colors": {
"turquoise": {"rgb": [64, 224, 208], "tolerance": 30.0},
"yellow": {"rgb": [255, 215, 0], "tolerance": 30.0},
"dark_green": {"rgb": [0, 100, 0], "tolerance": 30.0},
"dark_red": {"rgb": [139, 0, 0], "tolerance": 30.0},
"light_green": {"rgb": [0, 230, 118], "tolerance": 30.0},
"light_red": {"rgb": [255, 82, 82], "tolerance": 30.0},
"gray": {"rgb": [128, 128, 128], "tolerance": 25.0},
},
"y_axis": {"p1_y": 100, "p1_price": 485.0, "p2_y": 200, "p2_price": 484.0},
"canary": {
"roi": {"x": 0, "y": 0, "w": 10, "h": 10},
"baseline_phash": "0" * 16,
"drift_threshold": 8,
},
"discord": {"webhook_url": "https://example.com/hook"},
"telegram": {"bot_token": "tok", "chat_id": "123"},
}
def _with_opts(opts: dict) -> dict:
d = {k: v for k, v in _BASE.items()}
d["options"] = opts
return d
def test_attach_screenshots_default_all_true() -> None:
"""Missing attach_screenshots → all fields True."""
cfg = Config._from_dict(_with_opts({}))
assert cfg.attach_screenshots == AlertsCfg(
late_start=True, catchup=True, arm=True, prime=True, trigger=True,
)
def test_attach_screenshots_legacy_bool_true() -> None:
"""Legacy: attach_screenshots = true → all fields True."""
cfg = Config._from_dict(_with_opts({"attach_screenshots": True}))
assert cfg.attach_screenshots.arm is True
assert cfg.attach_screenshots.catchup is True
assert cfg.attach_screenshots.trigger is True
def test_attach_screenshots_legacy_bool_false() -> None:
"""Legacy: attach_screenshots = false → all fields False."""
cfg = Config._from_dict(_with_opts({"attach_screenshots": False}))
assert cfg.attach_screenshots.arm is False
assert cfg.attach_screenshots.catchup is False
assert cfg.attach_screenshots.trigger is False
assert cfg.attach_screenshots.late_start is False
def test_attach_screenshots_partial_dict() -> None:
"""Dict form with only some keys → specified False, others default True."""
cfg = Config._from_dict(_with_opts({
"attach_screenshots": {"arm": False, "prime": False},
}))
assert cfg.attach_screenshots.arm is False
assert cfg.attach_screenshots.prime is False
# Unspecified → dataclass default True
assert cfg.attach_screenshots.trigger is True
assert cfg.attach_screenshots.catchup is True
assert cfg.attach_screenshots.late_start is True
def test_attach_screenshots_full_dict() -> None:
"""Dict form with all keys specified."""
cfg = Config._from_dict(_with_opts({
"attach_screenshots": {
"late_start": False,
"catchup": True,
"arm": False,
"prime": True,
"trigger": True,
},
}))
assert cfg.attach_screenshots.late_start is False
assert cfg.attach_screenshots.catchup is True
assert cfg.attach_screenshots.arm is False
assert cfg.attach_screenshots.prime is True
assert cfg.attach_screenshots.trigger is True
def test_attach_screenshots_unknown_keys_ignored() -> None:
"""Unknown keys are silently dropped (dataclass won't accept them)."""
cfg = Config._from_dict(_with_opts({
"attach_screenshots": {"arm": False, "nonexistent_knob": True},
}))
assert cfg.attach_screenshots.arm is False
# Should not raise even with unknown key

View File

@@ -256,3 +256,234 @@ def test_handle_tick_catchup_dark_green_when_not_first_accepted():
assert len(notif.alerts) == 2
assert notif.alerts[0].direction == "BUY"
assert notif.alerts[1].direction == "BUY"
# ---------------------------------------------------------------------------
# Regression: user bug 2026-04-16 (image.png). After a BUY FIRE, the chart
# shows residual dark_green dots for the rest of the 15m window. Those are
# noise, NOT a new prime signal. Previously triggered false catchup arm+prime
# because FSM returns to IDLE after fire and the catchup branch only checked
# (color, state). Now gated on fsm.fired_in_session(direction).
# ---------------------------------------------------------------------------
def test_handle_tick_dark_green_after_light_green_fire_no_catchup():
"""REGRESSION: post-FIRE dark_green must NOT re-arm catchup."""
fsm = StateMachine(lockout_s=240)
notif = FakeNotifier()
audit = FakeAudit()
# Full BUY cycle: arm → prime → fire
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False)
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False)
_handle_tick(fsm, "light_green", 3.0, notif, audit, first_accepted=False)
# After fire: FSM is IDLE, _last_fire[BUY]=3.0
assert fsm.state == State.IDLE
assert fsm.fired_in_session("BUY") is True
baseline_alerts = len(notif.alerts) # arm + prime (fire alert is handled in main, not here)
# Residual dark_green post-FIRE — must stay IDLE, must not fire any alert
tr = _handle_tick(fsm, "dark_green", 10.0, notif, audit, first_accepted=False)
assert fsm.state == State.IDLE
assert tr is not None
assert tr.reason == "noise"
assert len(notif.alerts) == baseline_alerts, (
f"post-FIRE dark_green falsely fired: "
f"new alerts {notif.alerts[baseline_alerts:]}"
)
def test_handle_tick_dark_red_after_light_red_fire_no_catchup():
"""REGRESSION mirror — SELL side."""
fsm = StateMachine(lockout_s=240)
notif = FakeNotifier()
audit = FakeAudit()
_handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False)
_handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False)
_handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False)
assert fsm.state == State.IDLE
assert fsm.fired_in_session("SELL") is True
baseline_alerts = len(notif.alerts)
tr = _handle_tick(fsm, "dark_red", 10.0, notif, audit, first_accepted=False)
assert fsm.state == State.IDLE
assert tr is not None
assert tr.reason == "noise"
assert len(notif.alerts) == baseline_alerts
def test_handle_tick_opposite_direction_catchup_after_fire():
"""After BUY fires, seeing dark_red at IDLE should STILL trigger SELL
catchup (direction-scoped gate, not session-wide). Proves the gate only
suppresses same-direction residual, not a genuine opposite-direction cycle
the user joined mid-way."""
fsm = StateMachine(lockout_s=240)
notif = FakeNotifier()
audit = FakeAudit()
# Fire BUY cycle
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False)
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False)
_handle_tick(fsm, "light_green", 3.0, notif, audit, first_accepted=False)
assert fsm.fired_in_session("BUY") is True
assert fsm.fired_in_session("SELL") is False
baseline_alerts = len(notif.alerts)
# Now dark_red at IDLE — SELL hasn't fired, so catchup must still work
tr = _handle_tick(fsm, "dark_red", 10.0, notif, audit, first_accepted=False)
assert tr is not None
assert tr.next == State.PRIMED_SELL
assert tr.reason == "prime"
# synth-arm alert + real prime alert = 2 new
assert len(notif.alerts) == baseline_alerts + 2
assert notif.alerts[baseline_alerts].kind == "arm"
assert notif.alerts[baseline_alerts].direction == "SELL"
assert "catchup" in (notif.alerts[baseline_alerts].title + notif.alerts[baseline_alerts].body).lower()
assert notif.alerts[baseline_alerts + 1].kind == "prime"
assert notif.alerts[baseline_alerts + 1].direction == "SELL"
# ---------------------------------------------------------------------------
# Snapshot callable: _handle_tick should call snapshot(kind, label) for each
# alert and attach the returned path to Alert.image_path. None default keeps
# existing tests oblivious.
# ---------------------------------------------------------------------------
def test_handle_tick_snapshot_called_for_each_alert():
"""snapshot callable invoked with (kind, label); returned path attached."""
from pathlib import Path
fsm = StateMachine(lockout_s=60)
notif = FakeNotifier()
audit = FakeAudit()
calls: list[tuple[str, str]] = []
def snap(kind: str, label: str):
calls.append((kind, label))
return Path(f"/tmp/fake_{label}.png")
# BUY cycle arm + prime (2 alerts, 2 snapshot calls)
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False, snapshot=snap)
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False, snapshot=snap)
assert len(notif.alerts) == 2
assert calls == [("arm", "arm_buy"), ("prime", "prime_buy")]
assert notif.alerts[0].image_path == Path("/tmp/fake_arm_buy.png")
assert notif.alerts[1].image_path == Path("/tmp/fake_prime_buy.png")
def test_handle_tick_snapshot_none_for_gated_kind():
"""snapshot() returning None (config-gated off) → Alert.image_path=None,
alert still sends text-only."""
fsm = StateMachine(lockout_s=60)
notif = FakeNotifier()
audit = FakeAudit()
def snap(kind: str, label: str):
# Simulate cfg.attach_screenshots.arm = False
return None if kind == "arm" else __import__("pathlib").Path(f"/tmp/{label}.png")
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False, snapshot=snap)
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False, snapshot=snap)
assert notif.alerts[0].image_path is None # arm gated off
assert notif.alerts[1].image_path is not None # prime went through
def test_handle_tick_snapshot_called_for_late_start():
"""late_start path also invokes snapshot."""
fsm = StateMachine(lockout_s=60)
notif = FakeNotifier()
audit = FakeAudit()
calls: list[tuple[str, str]] = []
def snap(kind: str, label: str):
calls.append((kind, label))
return None
_handle_tick(fsm, "light_green", 1.0, notif, audit, first_accepted=True, snapshot=snap)
assert len(notif.alerts) == 1
assert notif.alerts[0].kind == "late_start"
assert calls == [("late_start", "late_start_buy")]
def test_handle_tick_snapshot_called_for_catchup_prime():
"""catchup path: arm snapshot uses kind=catchup, prime snapshot uses
kind=catchup (so user's catchup toggle also gates the catchup prime)."""
fsm = StateMachine(lockout_s=60)
notif = FakeNotifier()
audit = FakeAudit()
calls: list[tuple[str, str]] = []
def snap(kind: str, label: str):
calls.append((kind, label))
return None
_handle_tick(fsm, "dark_red", 1.0, notif, audit, first_accepted=False, snapshot=snap)
# Synth-arm catchup alert + real prime alert (post-synth) — both tagged catchup
assert len(notif.alerts) == 2
assert calls == [("catchup", "catchup_arm_sell"), ("catchup", "prime_sell_catchup")]
# ---------------------------------------------------------------------------
# _save_annotated_frame — audit-log failures instead of swallowing silently.
# ---------------------------------------------------------------------------
def test_save_annotated_frame_logs_audit_on_failure(tmp_path, monkeypatch):
"""cv2.imwrite raising → return None AND audit.log event=snapshot_fail."""
import atm.main as main_mod
# Force the lazy cv2 import to succeed but fail on imwrite
class _FakeCv2:
@staticmethod
def rectangle(*a, **kw): pass
@staticmethod
def imwrite(*a, **kw):
raise OSError("disk full")
import sys
monkeypatch.setitem(sys.modules, "cv2", _FakeCv2)
cfg = type("C", (), {"dot_roi": type("R", (), {"x": 0, "y": 0, "w": 10, "h": 10})()})()
frame = type("F", (), {"copy": lambda self: self})() # dummy with .copy()
audit = FakeAudit()
result = main_mod._save_annotated_frame(frame, cfg, tmp_path, "test_label", 123.0, audit=audit)
assert result is None
assert any(e.get("event") == "snapshot_fail" and e.get("label") == "test_label"
for e in audit.events)
def test_save_annotated_frame_succeeds(tmp_path, monkeypatch):
"""Happy path: cv2 present + imwrite succeeds → returns path."""
import atm.main as main_mod
written: list[str] = []
class _FakeCv2:
@staticmethod
def rectangle(*a, **kw): pass
@staticmethod
def imwrite(path, _img):
written.append(path)
import sys
monkeypatch.setitem(sys.modules, "cv2", _FakeCv2)
cfg = type("C", (), {"dot_roi": type("R", (), {"x": 0, "y": 0, "w": 10, "h": 10})()})()
frame = type("F", (), {"copy": lambda self: self})()
audit = FakeAudit()
result = main_mod._save_annotated_frame(frame, cfg, tmp_path, "BUY", 1700000000.0, audit=audit)
assert result is not None
assert result.parent == tmp_path
assert "BUY" in result.name
assert len(written) == 1
assert not any(e.get("event") == "snapshot_fail" for e in audit.events)

View File

@@ -156,19 +156,43 @@ def test_stop_drains(tmp_path: Path) -> None:
# ---------------------------------------------------------------------------
class _MockResponse:
def __init__(self, status_code: int, text: str = "") -> None:
def __init__(
self,
status_code: int,
text: str = "",
json_body: dict | None = None,
raise_on_json: bool = False,
) -> None:
self.status_code = status_code
self.text = text
self._json_body = json_body if json_body is not None else {"ok": True, "result": {}}
self._raise_on_json = raise_on_json
def json(self):
if self._raise_on_json:
raise ValueError("no JSON body")
return self._json_body
class _MockSession:
def __init__(self, status_code: int = 204) -> None:
def __init__(
self,
status_code: int = 204,
json_body: dict | None = None,
raise_on_json: bool = False,
) -> None:
self.status_code = status_code
self._json_body = json_body
self._raise_on_json = raise_on_json
self.calls: list[dict] = []
def post(self, url: str, **kwargs):
self.calls.append({"url": url, **kwargs})
return _MockResponse(self.status_code)
return _MockResponse(
self.status_code,
json_body=self._json_body,
raise_on_json=self._raise_on_json,
)
def test_discord_send_ok() -> None:
@@ -219,3 +243,118 @@ def test_telegram_5xx_raises() -> None:
n = TelegramNotifier("token", "chat123", session=_MockSession(500))
with pytest.raises(RuntimeError, match="500"):
n.send(_alert("x"))
# Telegram returns 200 OK with {"ok": false, ...} for logical failures (bot
# blocked, invalid chat_id, parse_mode errors). Previously silent — now raises
# so FanoutNotifier retries + DLQs + stats count the failure.
def test_telegram_ok_true_passes() -> None:
"""200 + {ok:true} → success, no raise."""
from atm.notifier.telegram import TelegramNotifier
session = _MockSession(200, json_body={"ok": True, "result": {"message_id": 42}})
n = TelegramNotifier("token", "chat123", session=session)
n.send(_alert("ok body")) # must not raise
assert len(session.calls) == 1
def test_telegram_ok_false_raises() -> None:
"""200 + {ok:false, ...} → RuntimeError with code + description."""
from atm.notifier.telegram import TelegramNotifier
session = _MockSession(
200,
json_body={
"ok": False,
"error_code": 403,
"description": "Forbidden: bot was blocked by the user",
},
)
n = TelegramNotifier("token", "chat123", session=session)
with pytest.raises(RuntimeError, match="logical failure.*403.*blocked"):
n.send(_alert("x"))
def test_telegram_malformed_json_treated_as_success() -> None:
"""200 with non-JSON body → no raise (edge case, shouldn't happen in practice)."""
from atm.notifier.telegram import TelegramNotifier
session = _MockSession(200, raise_on_json=True)
n = TelegramNotifier("token", "chat123", session=session)
n.send(_alert("x")) # must not raise
def test_telegram_ok_false_goes_to_dlq(tmp_path: Path) -> None:
"""Integration: ok:false → 3 retries → DLQ entry written with description."""
from atm.notifier.telegram import TelegramNotifier
session = _MockSession(
200,
json_body={"ok": False, "error_code": 400, "description": "chat not found"},
)
backend = TelegramNotifier("token", "chat123", session=session)
dl = tmp_path / "dead.jsonl"
fan = FanoutNotifier([backend], dl, max_retries=3, backoff_base=0.01)
fan.send(_alert("will-fail"))
fan.stop(timeout=5.0)
# 4 HTTP calls (1 initial + 3 retries)
assert len(session.calls) == 4
s = fan.stats()
assert s["telegram"]["failed"] == 1
assert s["telegram"]["retries"] == 3
assert s["telegram"]["sent"] == 0
assert dl.exists()
lines = [json.loads(l) for l in dl.read_text().splitlines()]
assert len(lines) == 1
entry = lines[0]
assert entry["backend"] == "telegram"
assert entry["alert_title"] == "will-fail"
assert "chat not found" in entry["error_str"]
assert "400" in entry["error_str"]
# ---------------------------------------------------------------------------
# on_drop callback — queue overflow audit trail
# ---------------------------------------------------------------------------
def test_fanout_on_drop_callback_invoked(tmp_path: Path) -> None:
"""Queue-overflow drop calls on_drop(backend_name, dropped_alert)."""
drops: list[tuple[str, Alert]] = []
def on_drop(name: str, alert: Alert) -> None:
drops.append((name, alert))
dl = tmp_path / "dead.jsonl"
slow = FakeBackend("slow", sleep_s=0.2)
fan = FanoutNotifier(
[slow], dl, queue_size=2, backoff_base=0.01, on_drop=on_drop,
)
for i in range(10):
fan.send(_alert(f"a{i}"))
fan.stop(timeout=10.0)
assert len(drops) > 0
assert all(name == "slow" for name, _ in drops)
# Oldest alerts are the ones dropped
dropped_titles = {a.title for _, a in drops}
assert "a0" in dropped_titles or "a1" in dropped_titles
def test_fanout_on_drop_exception_swallowed(tmp_path: Path) -> None:
"""on_drop raising must not break dispatch — audit failure must not silence alerts."""
def bad_on_drop(_name: str, _alert: Alert) -> None:
raise RuntimeError("audit broken")
dl = tmp_path / "dead.jsonl"
slow = FakeBackend("slow", sleep_s=0.2)
fan = FanoutNotifier(
[slow], dl, queue_size=2, backoff_base=0.01, on_drop=bad_on_drop,
)
# Must not raise despite every drop invoking bad_on_drop
for i in range(10):
fan.send(_alert(f"a{i}"))
fan.stop(timeout=10.0)
s = fan.stats()
# Some alerts still went through
assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0

View File

@@ -269,6 +269,58 @@ def test_refresh_arm_ts() -> None:
assert t2.arm_ts == 9.0
# ---------------------------------------------------------------------------
# 11. fired_in_session — public API for catchup suppression after fire
# ---------------------------------------------------------------------------
def test_fired_in_session_fresh_fsm() -> None:
"""Fresh FSM — neither direction has fired."""
sm = StateMachine()
assert sm.fired_in_session("BUY") is False
assert sm.fired_in_session("SELL") is False
def test_fired_in_session_after_buy_fire() -> None:
"""After a BUY fire: BUY=True, SELL=False (direction-scoped)."""
sm = StateMachine(lockout_s=240)
sm.feed("turquoise", 1.0)
sm.feed("dark_green", 2.0)
t = sm.feed("light_green", 3.0)
assert t.reason == "fire"
assert sm.fired_in_session("BUY") is True
assert sm.fired_in_session("SELL") is False
def test_fired_in_session_after_sell_fire() -> None:
"""Mirror — after SELL fire: SELL=True, BUY=False."""
sm = StateMachine(lockout_s=240)
sm.feed("yellow", 1.0)
sm.feed("dark_red", 2.0)
t = sm.feed("light_red", 3.0)
assert t.reason == "fire"
assert sm.fired_in_session("SELL") is True
assert sm.fired_in_session("BUY") is False
def test_fired_in_session_both_directions() -> None:
"""Fire both directions — both True."""
sm = StateMachine(lockout_s=240)
# BUY cycle
sm.feed("turquoise", 1.0)
sm.feed("dark_green", 2.0)
sm.feed("light_green", 3.0)
# SELL cycle
sm.feed("yellow", 100.0)
sm.feed("dark_red", 101.0)
sm.feed("light_red", 102.0)
assert sm.fired_in_session("BUY") is True
assert sm.fired_in_session("SELL") is True
# ---------------------------------------------------------------------------
# 11. exhaustive — parameterize over every (state, color) pair
# ---------------------------------------------------------------------------