diff --git a/README.md b/README.md index 63de472..2190508 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,7 @@ atm report --week 2026-16 # weekly win rate + R PnL + slippage | `WARN: no window contains 'xxx'` at startup | `cfg.window_title` substring matches nothing | Edit `window_title` in TOML to a substring that's unique to TradeStation. | | No alerts even after trigger ought to fire | Check `logs/YYYY-MM-DD.jsonl` for `event=tick` entries — are colours accepted? Is `trigger` ever set? | If always UNKNOWN → tolerances too tight. If `trigger` but `locked=true` → lockout from prior fire, normal. | | Discord OK, Telegram silent (or vice versa) | `logs/dead_letter.jsonl` contains failed alerts with error | Fix credentials in TOML, restart. | +| Heartbeat shows `telegram: failed > 0` | Telegram returned `ok:false` (bot blocked, invalid chat_id, parse error) | Check `logs/dead_letter.jsonl` for the `error_str` / `description` field. Common: bot never started by user in Telegram, or wrong `chat_id` flavor (channel vs group vs DM). | | Debug circle on mid-strip instead of right edge | Anti-aliasing bridges dots in the mask | Already fixed via erosion+connected-components — ensure `git pull` is current. | | Wizard window is tiny / image not visible | Tk geometry default on Windows | Already fixed — `git pull`. Image is scaled to fit screen. | diff --git a/configs/example.toml b/configs/example.toml index bce326a..cdcb3c9 100644 --- a/configs/example.toml +++ b/configs/example.toml @@ -80,3 +80,12 @@ low_conf_threshold = 0.2 low_conf_run = 3 phaseb_timeout_s = 600 dead_letter_path = "logs/dead_letter.jsonl" + +# Per-kind screenshot-attach toggles. All default to true on upgrade. +# Accepts either a bare bool (legacy: attach_screenshots = true) or this table. +[options.attach_screenshots] +late_start = true # screenshot on startup-late alerts +catchup = true # screenshot on mid-session catchup arm + prime +arm = true # screenshot on normal arm (turquoise/yellow) — noisiest +prime = true # screenshot on normal prime (dark_green/dark_red) +trigger = true # screenshot on FIRE diff --git a/src/atm/config.py b/src/atm/config.py index 3c1856b..4ff6ba2 100644 --- a/src/atm/config.py +++ b/src/atm/config.py @@ -2,7 +2,7 @@ from __future__ import annotations import tomllib -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Literal @@ -84,6 +84,16 @@ class TelegramCfg: raise ValueError("telegram bot_token + chat_id required") +@dataclass(frozen=True) +class AlertsCfg: + """Per-kind screenshot-attach toggles. Default all True — zero behavior change.""" + late_start: bool = True + catchup: bool = True + arm: bool = True + prime: bool = True + trigger: bool = True + + @dataclass(frozen=True) class Config: window_title: str @@ -103,6 +113,7 @@ class Config: low_conf_run: int = 3 phaseb_timeout_s: int = 600 dead_letter_path: str = "logs/dead_letter.jsonl" + attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg) config_version: str = "unknown" def __post_init__(self) -> None: @@ -153,6 +164,18 @@ class Config: region = None if "chart_window_region" in data: region = ROI(**data["chart_window_region"]) + scr = opts.get("attach_screenshots") + if isinstance(scr, bool): + attach = AlertsCfg( + late_start=scr, catchup=scr, arm=scr, prime=scr, trigger=scr, + ) + elif isinstance(scr, dict): + fields_set = set(AlertsCfg.__dataclass_fields__) + attach = AlertsCfg( + **{k: bool(v) for k, v in scr.items() if k in fields_set} + ) + else: + attach = AlertsCfg() return cls( window_title=data["window_title"], dot_roi=roi, @@ -171,5 +194,6 @@ class Config: low_conf_run=int(opts.get("low_conf_run", 3)), phaseb_timeout_s=int(opts.get("phaseb_timeout_s", 600)), dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"), + attach_screenshots=attach, config_version=version, ) diff --git a/src/atm/main.py b/src/atm/main.py index 3eb72ec..69bdd9e 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -7,7 +7,7 @@ import sys import time from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Protocol, cast +from typing import TYPE_CHECKING, Callable, Protocol, cast from atm.config import Config # stdlib-only (tomllib); safe at module level from atm.notifier import Alert @@ -17,6 +17,12 @@ if TYPE_CHECKING: from atm.state_machine import DotColor +# Snapshot closure: called by _handle_tick when an alert is about to fire. +# Takes (kind, label); returns path to annotated screenshot or None (gated off, +# cv2 missing, or write failure — in which case the alert still sends text-only). +Snapshot = Callable[[str, str], "Path | None"] + + class _NotifierLike(Protocol): def send(self, alert: Alert) -> None: ... @@ -335,6 +341,47 @@ def _cmd_report(args) -> None: # Live loop # --------------------------------------------------------------------------- +def _save_annotated_frame( + frame, + cfg, + fires_dir: Path, + label: str, + now: float, + audit: _AuditLike | None = None, +) -> "Path | None": + """Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``. + + Returns the path on success, ``None`` on any error. Failures are logged to + audit (when provided) so disk-full / permission issues don't become silent + regressions. Never raises — snapshot is a best-effort enhancement, the + text alert must still go out. + """ + try: + import cv2 # type: ignore[import-untyped] + except ImportError as exc: + if audit is not None: + try: + audit.log({"ts": now, "event": "snapshot_fail", "label": label, "error": f"cv2 missing: {exc}"}) + except Exception: + pass + return None + try: + ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") + fpath = fires_dir / f"{ts_str}_{label}.png" + annotated = frame.copy() + x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h + cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) + cv2.imwrite(str(fpath), annotated) + return fpath + except Exception as exc: + if audit is not None: + try: + audit.log({"ts": now, "event": "snapshot_fail", "label": label, "error": str(exc)}) + except Exception: + pass + return None + + def _handle_tick( fsm: StateMachine, color: str, @@ -342,14 +389,21 @@ def _handle_tick( notifier: _NotifierLike, audit: _AuditLike, first_accepted: bool, + snapshot: Snapshot | None = None, ) -> Transition | None: """Feed FSM for a single accepted color and dispatch arm/prime/late_start alerts. Returns the final Transition, or None when the color triggered a late-start short-circuit (FSM untouched; caller should skip FIRE handling). + When ``snapshot`` is provided it is called with ``(kind, label)`` for each + alert; it returns the PNG path to attach or None (gated off / write failed). + Default None keeps test call sites numpy-free. + Pure in the sense that all state lives in the arguments — safe to unit-test with a FakeNotifier and FakeAudit. """ + snap: Snapshot = snapshot or (lambda _k, _l: None) + # Late start: the very first accepted color is already at FIRE phase. # User came online after the trade signal fired — warn and skip FSM feed. if first_accepted and color in ("light_green", "light_red") and fsm.state == State.IDLE: @@ -363,6 +417,7 @@ def _handle_tick( kind="late_start", title=f"ATM started late — {direction} already fired", body=f"Observed {color} at startup. Check chart manually.", + image_path=snap("late_start", f"late_start_{direction.lower()}"), direction=direction, )) return None @@ -370,29 +425,38 @@ def _handle_tick( # Catchup synth-arm: first accepted color is already at PRIME phase. # Drive FSM through a synthetic arm so the real PRIME transition fires a # normal prime alert below. Audit entry is tagged catchup:true. + # + # Guard against post-FIRE residual dark_* dots: after a light_green fires + # the cycle, FSM returns to IDLE but dark_green dots continue for the rest + # of the 15m window. Those are tail noise, NOT a new prime signal. The + # direction-scoped fired_in_session() check suppresses synth-arm in that + # case — the tick falls through to the normal _from_idle feed below which + # classifies it as noise. (A genuine new cycle always comes with fresh + # turquoise/yellow first and drives _from_idle → ARMED normally.) catchup = False if color in ("dark_green", "dark_red") and fsm.state == State.IDLE: - assert fsm.state == State.IDLE, "synth-arm invariant: FSM must be IDLE" - arm_color = "turquoise" if color == "dark_green" else "yellow" direction = "BUY" if color == "dark_green" else "SELL" - tr_synth = fsm.feed(cast("DotColor", arm_color), now) - audit.log({ - "ts": now, - "event": "tick", - "color": arm_color, - "state": tr_synth.next.value, - "reason": tr_synth.reason, - "catchup": True, - "synthesized_from": color, - }) - notifier.send(Alert( - kind="arm", - title=f"{direction} armed ({arm_color}) — catchup", - body=f"catchup — session already armed at startup " - f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", - direction=direction, - )) - catchup = True + if not fsm.fired_in_session(direction): + arm_color = "turquoise" if color == "dark_green" else "yellow" + tr_synth = fsm.feed(cast("DotColor", arm_color), now) + audit.log({ + "ts": now, + "event": "tick", + "color": arm_color, + "state": tr_synth.next.value, + "reason": tr_synth.reason, + "catchup": True, + "synthesized_from": color, + }) + notifier.send(Alert( + kind="arm", + title=f"{direction} armed ({arm_color}) — catchup", + body=f"catchup — session already armed at startup " + f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", + image_path=snap("catchup", f"catchup_arm_{direction.lower()}"), + direction=direction, + )) + catchup = True # Normal FSM feed tr = fsm.feed(cast("DotColor", color), now) @@ -414,16 +478,20 @@ def _handle_tick( kind="arm", title=f"{direction} armed ({color})", body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", + image_path=snap("arm", f"arm_{direction.lower()}"), direction=direction, )) # PRIME: dark_green (BUY) / dark_red (SELL) — only on ARMED→PRIMED elif tr.reason == "prime": direction = "BUY" if tr.next == State.PRIMED_BUY else "SELL" suffix = " — catchup" if catchup else "" + prime_kind = "catchup" if catchup else "prime" + prime_label = f"prime_{direction.lower()}_catchup" if catchup else f"prime_{direction.lower()}" notifier.send(Alert( kind="prime", title=f"{direction} primed ({color}){suffix}", body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", + image_path=snap(prime_kind, prime_label), direction=direction, )) return tr @@ -452,7 +520,18 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: DiscordNotifier(cfg.discord.webhook_url), TelegramNotifier(cfg.telegram.bot_token, cfg.telegram.chat_id), ] - notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path)) + + def _on_drop(backend_name: str, dropped: Alert) -> None: + """Audit queue-overflow drops so the silent failure becomes loud.""" + audit.log({ + "ts": time.time(), + "event": "queue_overflow_drop", + "backend": backend_name, + "dropped_title": dropped.title, + "dropped_kind": dropped.kind, + }) + + notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop) # Sanity check: capture one frame, confirm canary matches calibration. first_frame = capture() @@ -519,7 +598,19 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: if res.accepted and res.color: is_first = first_accepted first_accepted = False - tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first) + + # Per-iteration closure — binds current frame/now, gates on config. + def _snapshot(kind: str, label: str) -> "Path | None": + if not getattr(cfg.attach_screenshots, kind, True): + return None + return _save_annotated_frame( + frame, cfg, fires_dir, label, now, audit=audit, + ) + + tr = _handle_tick( + fsm, res.color, now, notifier, audit, is_first, + snapshot=_snapshot, + ) if tr is None: # late_start short-circuit: FSM untouched, skip FIRE + corpus save time.sleep(cfg.loop_interval_s) @@ -535,15 +626,11 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: last_saved_color = res.color # FIRE: annotate frame + save, attach to alert if tr.trigger and not tr.locked: - ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") - fire_path = fires_dir / f"{ts_str}_{tr.trigger}.png" - try: - annotated = frame.copy() - x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h - cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) - cv2.imwrite(str(fire_path), annotated) - except Exception: - fire_path = None + fire_path: "Path | None" = None + if cfg.attach_screenshots.trigger: + fire_path = _save_annotated_frame( + frame, cfg, fires_dir, tr.trigger, now, audit=audit, + ) notifier.send(Alert( kind="trigger", title=f"{tr.trigger} signal", @@ -567,16 +654,35 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: ), )) levels_extractor = None - # heartbeat + # heartbeat — include per-backend dispatch stats so silent failures + # surface every 30 min without waiting for shutdown. if time.time() > heartbeat_due: - notifier.send(Alert(kind="heartbeat", title="alive", body="confidence ok")) + try: + stats = notifier.stats() + audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats}) + body_lines = ["confidence ok"] + for name, s in stats.items(): + body_lines.append( + f"{name}: sent={s['sent']} failed={s['failed']} " + f"dropped={s['dropped']} retries={s['retries']}" + ) + notifier.send(Alert(kind="heartbeat", title="alive", body="\n".join(body_lines))) + except Exception: + notifier.send(Alert(kind="heartbeat", title="alive", body="confidence ok")) heartbeat_due = time.time() + cfg.heartbeat_min * 60 time.sleep(cfg.loop_interval_s) finally: try: + stats = notifier.stats() + lines = [f"after {time.monotonic() - start:.0f}s"] + for name, s in stats.items(): + lines.append( + f"{name}: sent={s['sent']} failed={s['failed']} " + f"dropped={s['dropped']} retries={s['retries']}" + ) notifier.send(Alert( kind="heartbeat", title="ATM stopped", - body=f"after {time.monotonic() - start:.0f}s", + body="\n".join(lines), )) except Exception: pass diff --git a/src/atm/notifier/fanout.py b/src/atm/notifier/fanout.py index db8c113..298b6f1 100644 --- a/src/atm/notifier/fanout.py +++ b/src/atm/notifier/fanout.py @@ -7,7 +7,7 @@ import time from copy import copy from dataclasses import dataclass from pathlib import Path -from typing import Any +from typing import Any, Callable from . import Alert, Notifier @@ -30,12 +30,14 @@ class FanoutNotifier: queue_size: int = 50, max_retries: int = 3, backoff_base: float = 0.5, + on_drop: Callable[[str, Alert], None] | None = None, ) -> None: self._backends = backends self._dead_letter_path = Path(dead_letter_path) self._queue_size = queue_size self._max_retries = max_retries self._backoff_base = backoff_base + self._on_drop = on_drop self._dl_lock = threading.Lock() self._queues: dict[str, queue.Queue[Any]] = {} @@ -62,8 +64,13 @@ class FanoutNotifier: stats = self._stats[backend.name] if q.full(): try: - q.get_nowait() + dropped = q.get_nowait() stats.dropped += 1 + if self._on_drop is not None and isinstance(dropped, Alert): + try: + self._on_drop(backend.name, dropped) + except Exception: + pass # audit failure must never break dispatch except queue.Empty: pass q.put(alert_copy) diff --git a/src/atm/notifier/telegram.py b/src/atm/notifier/telegram.py index fe76d26..b54f704 100644 --- a/src/atm/notifier/telegram.py +++ b/src/atm/notifier/telegram.py @@ -53,3 +53,15 @@ class TelegramNotifier: raise RuntimeError(f"Telegram server error ({resp.status_code}): {resp.text}") if resp.status_code != 200: raise RuntimeError(f"Telegram unexpected status ({resp.status_code}): {resp.text}") + + # Telegram Bot API returns 200 OK with {"ok": false, ...} for several + # logical failures (bot blocked, invalid chat_id, parse_mode errors). + # Parse body and raise on ok:false so Fanout retries + DLQs + stats count it. + try: + body = resp.json() + except ValueError: + return # non-JSON body — treat as success (shouldn't happen in practice) + if isinstance(body, dict) and body.get("ok") is False: + code = body.get("error_code", "?") + desc = body.get("description", "unknown") + raise RuntimeError(f"Telegram logical failure ({code}): {desc}") diff --git a/src/atm/state_machine.py b/src/atm/state_machine.py index 031c5e3..f4e92d2 100644 --- a/src/atm/state_machine.py +++ b/src/atm/state_machine.py @@ -59,6 +59,16 @@ class StateMachine: def state(self) -> State: return self._state + def fired_in_session(self, direction: str) -> bool: + """True if this FSM instance has recorded any FIRE in the given direction. + + Used by catchup logic to suppress synth-arm after a cycle already fired. + Residual dark_green / dark_red dots for the remainder of the 15m window + look identical to startup-catchup from (color, state) alone, but must + be classified as noise, not a new prime signal. + """ + return direction in self._last_fire + # ------------------------------------------------------------------ # Core feed # ------------------------------------------------------------------ diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..f864366 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,99 @@ +"""Tests for atm.config — focused on attach_screenshots parsing (legacy bool vs new dict).""" +from __future__ import annotations + +from atm.config import AlertsCfg, Config + + +_BASE = { + "window_title": "X", + "dot_roi": {"x": 0, "y": 0, "w": 10, "h": 10}, + "chart_roi": {"x": 0, "y": 0, "w": 100, "h": 100}, + "colors": { + "turquoise": {"rgb": [64, 224, 208], "tolerance": 30.0}, + "yellow": {"rgb": [255, 215, 0], "tolerance": 30.0}, + "dark_green": {"rgb": [0, 100, 0], "tolerance": 30.0}, + "dark_red": {"rgb": [139, 0, 0], "tolerance": 30.0}, + "light_green": {"rgb": [0, 230, 118], "tolerance": 30.0}, + "light_red": {"rgb": [255, 82, 82], "tolerance": 30.0}, + "gray": {"rgb": [128, 128, 128], "tolerance": 25.0}, + }, + "y_axis": {"p1_y": 100, "p1_price": 485.0, "p2_y": 200, "p2_price": 484.0}, + "canary": { + "roi": {"x": 0, "y": 0, "w": 10, "h": 10}, + "baseline_phash": "0" * 16, + "drift_threshold": 8, + }, + "discord": {"webhook_url": "https://example.com/hook"}, + "telegram": {"bot_token": "tok", "chat_id": "123"}, +} + + +def _with_opts(opts: dict) -> dict: + d = {k: v for k, v in _BASE.items()} + d["options"] = opts + return d + + +def test_attach_screenshots_default_all_true() -> None: + """Missing attach_screenshots → all fields True.""" + cfg = Config._from_dict(_with_opts({})) + assert cfg.attach_screenshots == AlertsCfg( + late_start=True, catchup=True, arm=True, prime=True, trigger=True, + ) + + +def test_attach_screenshots_legacy_bool_true() -> None: + """Legacy: attach_screenshots = true → all fields True.""" + cfg = Config._from_dict(_with_opts({"attach_screenshots": True})) + assert cfg.attach_screenshots.arm is True + assert cfg.attach_screenshots.catchup is True + assert cfg.attach_screenshots.trigger is True + + +def test_attach_screenshots_legacy_bool_false() -> None: + """Legacy: attach_screenshots = false → all fields False.""" + cfg = Config._from_dict(_with_opts({"attach_screenshots": False})) + assert cfg.attach_screenshots.arm is False + assert cfg.attach_screenshots.catchup is False + assert cfg.attach_screenshots.trigger is False + assert cfg.attach_screenshots.late_start is False + + +def test_attach_screenshots_partial_dict() -> None: + """Dict form with only some keys → specified False, others default True.""" + cfg = Config._from_dict(_with_opts({ + "attach_screenshots": {"arm": False, "prime": False}, + })) + assert cfg.attach_screenshots.arm is False + assert cfg.attach_screenshots.prime is False + # Unspecified → dataclass default True + assert cfg.attach_screenshots.trigger is True + assert cfg.attach_screenshots.catchup is True + assert cfg.attach_screenshots.late_start is True + + +def test_attach_screenshots_full_dict() -> None: + """Dict form with all keys specified.""" + cfg = Config._from_dict(_with_opts({ + "attach_screenshots": { + "late_start": False, + "catchup": True, + "arm": False, + "prime": True, + "trigger": True, + }, + })) + assert cfg.attach_screenshots.late_start is False + assert cfg.attach_screenshots.catchup is True + assert cfg.attach_screenshots.arm is False + assert cfg.attach_screenshots.prime is True + assert cfg.attach_screenshots.trigger is True + + +def test_attach_screenshots_unknown_keys_ignored() -> None: + """Unknown keys are silently dropped (dataclass won't accept them).""" + cfg = Config._from_dict(_with_opts({ + "attach_screenshots": {"arm": False, "nonexistent_knob": True}, + })) + assert cfg.attach_screenshots.arm is False + # Should not raise even with unknown key diff --git a/tests/test_handle_tick.py b/tests/test_handle_tick.py index 20c40e2..6ecbab3 100644 --- a/tests/test_handle_tick.py +++ b/tests/test_handle_tick.py @@ -256,3 +256,234 @@ def test_handle_tick_catchup_dark_green_when_not_first_accepted(): assert len(notif.alerts) == 2 assert notif.alerts[0].direction == "BUY" assert notif.alerts[1].direction == "BUY" + + +# --------------------------------------------------------------------------- +# Regression: user bug 2026-04-16 (image.png). After a BUY FIRE, the chart +# shows residual dark_green dots for the rest of the 15m window. Those are +# noise, NOT a new prime signal. Previously triggered false catchup arm+prime +# because FSM returns to IDLE after fire and the catchup branch only checked +# (color, state). Now gated on fsm.fired_in_session(direction). +# --------------------------------------------------------------------------- + +def test_handle_tick_dark_green_after_light_green_fire_no_catchup(): + """REGRESSION: post-FIRE dark_green must NOT re-arm catchup.""" + fsm = StateMachine(lockout_s=240) + notif = FakeNotifier() + audit = FakeAudit() + + # Full BUY cycle: arm → prime → fire + _handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False) + _handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False) + _handle_tick(fsm, "light_green", 3.0, notif, audit, first_accepted=False) + # After fire: FSM is IDLE, _last_fire[BUY]=3.0 + assert fsm.state == State.IDLE + assert fsm.fired_in_session("BUY") is True + baseline_alerts = len(notif.alerts) # arm + prime (fire alert is handled in main, not here) + + # Residual dark_green post-FIRE — must stay IDLE, must not fire any alert + tr = _handle_tick(fsm, "dark_green", 10.0, notif, audit, first_accepted=False) + + assert fsm.state == State.IDLE + assert tr is not None + assert tr.reason == "noise" + assert len(notif.alerts) == baseline_alerts, ( + f"post-FIRE dark_green falsely fired: " + f"new alerts {notif.alerts[baseline_alerts:]}" + ) + + +def test_handle_tick_dark_red_after_light_red_fire_no_catchup(): + """REGRESSION mirror — SELL side.""" + fsm = StateMachine(lockout_s=240) + notif = FakeNotifier() + audit = FakeAudit() + + _handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False) + _handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False) + _handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False) + assert fsm.state == State.IDLE + assert fsm.fired_in_session("SELL") is True + baseline_alerts = len(notif.alerts) + + tr = _handle_tick(fsm, "dark_red", 10.0, notif, audit, first_accepted=False) + + assert fsm.state == State.IDLE + assert tr is not None + assert tr.reason == "noise" + assert len(notif.alerts) == baseline_alerts + + +def test_handle_tick_opposite_direction_catchup_after_fire(): + """After BUY fires, seeing dark_red at IDLE should STILL trigger SELL + catchup (direction-scoped gate, not session-wide). Proves the gate only + suppresses same-direction residual, not a genuine opposite-direction cycle + the user joined mid-way.""" + fsm = StateMachine(lockout_s=240) + notif = FakeNotifier() + audit = FakeAudit() + + # Fire BUY cycle + _handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False) + _handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False) + _handle_tick(fsm, "light_green", 3.0, notif, audit, first_accepted=False) + assert fsm.fired_in_session("BUY") is True + assert fsm.fired_in_session("SELL") is False + baseline_alerts = len(notif.alerts) + + # Now dark_red at IDLE — SELL hasn't fired, so catchup must still work + tr = _handle_tick(fsm, "dark_red", 10.0, notif, audit, first_accepted=False) + + assert tr is not None + assert tr.next == State.PRIMED_SELL + assert tr.reason == "prime" + # synth-arm alert + real prime alert = 2 new + assert len(notif.alerts) == baseline_alerts + 2 + assert notif.alerts[baseline_alerts].kind == "arm" + assert notif.alerts[baseline_alerts].direction == "SELL" + assert "catchup" in (notif.alerts[baseline_alerts].title + notif.alerts[baseline_alerts].body).lower() + assert notif.alerts[baseline_alerts + 1].kind == "prime" + assert notif.alerts[baseline_alerts + 1].direction == "SELL" + + +# --------------------------------------------------------------------------- +# Snapshot callable: _handle_tick should call snapshot(kind, label) for each +# alert and attach the returned path to Alert.image_path. None default keeps +# existing tests oblivious. +# --------------------------------------------------------------------------- + +def test_handle_tick_snapshot_called_for_each_alert(): + """snapshot callable invoked with (kind, label); returned path attached.""" + from pathlib import Path + + fsm = StateMachine(lockout_s=60) + notif = FakeNotifier() + audit = FakeAudit() + calls: list[tuple[str, str]] = [] + + def snap(kind: str, label: str): + calls.append((kind, label)) + return Path(f"/tmp/fake_{label}.png") + + # BUY cycle arm + prime (2 alerts, 2 snapshot calls) + _handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False, snapshot=snap) + _handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False, snapshot=snap) + + assert len(notif.alerts) == 2 + assert calls == [("arm", "arm_buy"), ("prime", "prime_buy")] + assert notif.alerts[0].image_path == Path("/tmp/fake_arm_buy.png") + assert notif.alerts[1].image_path == Path("/tmp/fake_prime_buy.png") + + +def test_handle_tick_snapshot_none_for_gated_kind(): + """snapshot() returning None (config-gated off) → Alert.image_path=None, + alert still sends text-only.""" + fsm = StateMachine(lockout_s=60) + notif = FakeNotifier() + audit = FakeAudit() + + def snap(kind: str, label: str): + # Simulate cfg.attach_screenshots.arm = False + return None if kind == "arm" else __import__("pathlib").Path(f"/tmp/{label}.png") + + _handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False, snapshot=snap) + _handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False, snapshot=snap) + + assert notif.alerts[0].image_path is None # arm gated off + assert notif.alerts[1].image_path is not None # prime went through + + +def test_handle_tick_snapshot_called_for_late_start(): + """late_start path also invokes snapshot.""" + fsm = StateMachine(lockout_s=60) + notif = FakeNotifier() + audit = FakeAudit() + calls: list[tuple[str, str]] = [] + + def snap(kind: str, label: str): + calls.append((kind, label)) + return None + + _handle_tick(fsm, "light_green", 1.0, notif, audit, first_accepted=True, snapshot=snap) + + assert len(notif.alerts) == 1 + assert notif.alerts[0].kind == "late_start" + assert calls == [("late_start", "late_start_buy")] + + +def test_handle_tick_snapshot_called_for_catchup_prime(): + """catchup path: arm snapshot uses kind=catchup, prime snapshot uses + kind=catchup (so user's catchup toggle also gates the catchup prime).""" + fsm = StateMachine(lockout_s=60) + notif = FakeNotifier() + audit = FakeAudit() + calls: list[tuple[str, str]] = [] + + def snap(kind: str, label: str): + calls.append((kind, label)) + return None + + _handle_tick(fsm, "dark_red", 1.0, notif, audit, first_accepted=False, snapshot=snap) + + # Synth-arm catchup alert + real prime alert (post-synth) — both tagged catchup + assert len(notif.alerts) == 2 + assert calls == [("catchup", "catchup_arm_sell"), ("catchup", "prime_sell_catchup")] + + +# --------------------------------------------------------------------------- +# _save_annotated_frame — audit-log failures instead of swallowing silently. +# --------------------------------------------------------------------------- + +def test_save_annotated_frame_logs_audit_on_failure(tmp_path, monkeypatch): + """cv2.imwrite raising → return None AND audit.log event=snapshot_fail.""" + import atm.main as main_mod + + # Force the lazy cv2 import to succeed but fail on imwrite + class _FakeCv2: + @staticmethod + def rectangle(*a, **kw): pass + @staticmethod + def imwrite(*a, **kw): + raise OSError("disk full") + + import sys + monkeypatch.setitem(sys.modules, "cv2", _FakeCv2) + + cfg = type("C", (), {"dot_roi": type("R", (), {"x": 0, "y": 0, "w": 10, "h": 10})()})() + frame = type("F", (), {"copy": lambda self: self})() # dummy with .copy() + audit = FakeAudit() + + result = main_mod._save_annotated_frame(frame, cfg, tmp_path, "test_label", 123.0, audit=audit) + + assert result is None + assert any(e.get("event") == "snapshot_fail" and e.get("label") == "test_label" + for e in audit.events) + + +def test_save_annotated_frame_succeeds(tmp_path, monkeypatch): + """Happy path: cv2 present + imwrite succeeds → returns path.""" + import atm.main as main_mod + + written: list[str] = [] + + class _FakeCv2: + @staticmethod + def rectangle(*a, **kw): pass + @staticmethod + def imwrite(path, _img): + written.append(path) + + import sys + monkeypatch.setitem(sys.modules, "cv2", _FakeCv2) + + cfg = type("C", (), {"dot_roi": type("R", (), {"x": 0, "y": 0, "w": 10, "h": 10})()})() + frame = type("F", (), {"copy": lambda self: self})() + audit = FakeAudit() + + result = main_mod._save_annotated_frame(frame, cfg, tmp_path, "BUY", 1700000000.0, audit=audit) + + assert result is not None + assert result.parent == tmp_path + assert "BUY" in result.name + assert len(written) == 1 + assert not any(e.get("event") == "snapshot_fail" for e in audit.events) diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 1cba552..3c1575e 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -156,19 +156,43 @@ def test_stop_drains(tmp_path: Path) -> None: # --------------------------------------------------------------------------- class _MockResponse: - def __init__(self, status_code: int, text: str = "") -> None: + def __init__( + self, + status_code: int, + text: str = "", + json_body: dict | None = None, + raise_on_json: bool = False, + ) -> None: self.status_code = status_code self.text = text + self._json_body = json_body if json_body is not None else {"ok": True, "result": {}} + self._raise_on_json = raise_on_json + + def json(self): + if self._raise_on_json: + raise ValueError("no JSON body") + return self._json_body class _MockSession: - def __init__(self, status_code: int = 204) -> None: + def __init__( + self, + status_code: int = 204, + json_body: dict | None = None, + raise_on_json: bool = False, + ) -> None: self.status_code = status_code + self._json_body = json_body + self._raise_on_json = raise_on_json self.calls: list[dict] = [] def post(self, url: str, **kwargs): self.calls.append({"url": url, **kwargs}) - return _MockResponse(self.status_code) + return _MockResponse( + self.status_code, + json_body=self._json_body, + raise_on_json=self._raise_on_json, + ) def test_discord_send_ok() -> None: @@ -219,3 +243,118 @@ def test_telegram_5xx_raises() -> None: n = TelegramNotifier("token", "chat123", session=_MockSession(500)) with pytest.raises(RuntimeError, match="500"): n.send(_alert("x")) + + +# Telegram returns 200 OK with {"ok": false, ...} for logical failures (bot +# blocked, invalid chat_id, parse_mode errors). Previously silent — now raises +# so FanoutNotifier retries + DLQs + stats count the failure. + +def test_telegram_ok_true_passes() -> None: + """200 + {ok:true} → success, no raise.""" + from atm.notifier.telegram import TelegramNotifier + session = _MockSession(200, json_body={"ok": True, "result": {"message_id": 42}}) + n = TelegramNotifier("token", "chat123", session=session) + n.send(_alert("ok body")) # must not raise + assert len(session.calls) == 1 + + +def test_telegram_ok_false_raises() -> None: + """200 + {ok:false, ...} → RuntimeError with code + description.""" + from atm.notifier.telegram import TelegramNotifier + session = _MockSession( + 200, + json_body={ + "ok": False, + "error_code": 403, + "description": "Forbidden: bot was blocked by the user", + }, + ) + n = TelegramNotifier("token", "chat123", session=session) + with pytest.raises(RuntimeError, match="logical failure.*403.*blocked"): + n.send(_alert("x")) + + +def test_telegram_malformed_json_treated_as_success() -> None: + """200 with non-JSON body → no raise (edge case, shouldn't happen in practice).""" + from atm.notifier.telegram import TelegramNotifier + session = _MockSession(200, raise_on_json=True) + n = TelegramNotifier("token", "chat123", session=session) + n.send(_alert("x")) # must not raise + + +def test_telegram_ok_false_goes_to_dlq(tmp_path: Path) -> None: + """Integration: ok:false → 3 retries → DLQ entry written with description.""" + from atm.notifier.telegram import TelegramNotifier + session = _MockSession( + 200, + json_body={"ok": False, "error_code": 400, "description": "chat not found"}, + ) + backend = TelegramNotifier("token", "chat123", session=session) + + dl = tmp_path / "dead.jsonl" + fan = FanoutNotifier([backend], dl, max_retries=3, backoff_base=0.01) + fan.send(_alert("will-fail")) + fan.stop(timeout=5.0) + + # 4 HTTP calls (1 initial + 3 retries) + assert len(session.calls) == 4 + s = fan.stats() + assert s["telegram"]["failed"] == 1 + assert s["telegram"]["retries"] == 3 + assert s["telegram"]["sent"] == 0 + + assert dl.exists() + lines = [json.loads(l) for l in dl.read_text().splitlines()] + assert len(lines) == 1 + entry = lines[0] + assert entry["backend"] == "telegram" + assert entry["alert_title"] == "will-fail" + assert "chat not found" in entry["error_str"] + assert "400" in entry["error_str"] + + +# --------------------------------------------------------------------------- +# on_drop callback — queue overflow audit trail +# --------------------------------------------------------------------------- + +def test_fanout_on_drop_callback_invoked(tmp_path: Path) -> None: + """Queue-overflow drop calls on_drop(backend_name, dropped_alert).""" + drops: list[tuple[str, Alert]] = [] + + def on_drop(name: str, alert: Alert) -> None: + drops.append((name, alert)) + + dl = tmp_path / "dead.jsonl" + slow = FakeBackend("slow", sleep_s=0.2) + fan = FanoutNotifier( + [slow], dl, queue_size=2, backoff_base=0.01, on_drop=on_drop, + ) + for i in range(10): + fan.send(_alert(f"a{i}")) + fan.stop(timeout=10.0) + + assert len(drops) > 0 + assert all(name == "slow" for name, _ in drops) + # Oldest alerts are the ones dropped + dropped_titles = {a.title for _, a in drops} + assert "a0" in dropped_titles or "a1" in dropped_titles + + +def test_fanout_on_drop_exception_swallowed(tmp_path: Path) -> None: + """on_drop raising must not break dispatch — audit failure must not silence alerts.""" + def bad_on_drop(_name: str, _alert: Alert) -> None: + raise RuntimeError("audit broken") + + dl = tmp_path / "dead.jsonl" + slow = FakeBackend("slow", sleep_s=0.2) + fan = FanoutNotifier( + [slow], dl, queue_size=2, backoff_base=0.01, on_drop=bad_on_drop, + ) + # Must not raise despite every drop invoking bad_on_drop + for i in range(10): + fan.send(_alert(f"a{i}")) + fan.stop(timeout=10.0) + + s = fan.stats() + # Some alerts still went through + assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0 diff --git a/tests/test_state_machine.py b/tests/test_state_machine.py index 513c1d5..75ad096 100644 --- a/tests/test_state_machine.py +++ b/tests/test_state_machine.py @@ -269,6 +269,58 @@ def test_refresh_arm_ts() -> None: assert t2.arm_ts == 9.0 +# --------------------------------------------------------------------------- +# 11. fired_in_session — public API for catchup suppression after fire +# --------------------------------------------------------------------------- + + +def test_fired_in_session_fresh_fsm() -> None: + """Fresh FSM — neither direction has fired.""" + sm = StateMachine() + assert sm.fired_in_session("BUY") is False + assert sm.fired_in_session("SELL") is False + + +def test_fired_in_session_after_buy_fire() -> None: + """After a BUY fire: BUY=True, SELL=False (direction-scoped).""" + sm = StateMachine(lockout_s=240) + sm.feed("turquoise", 1.0) + sm.feed("dark_green", 2.0) + t = sm.feed("light_green", 3.0) + assert t.reason == "fire" + + assert sm.fired_in_session("BUY") is True + assert sm.fired_in_session("SELL") is False + + +def test_fired_in_session_after_sell_fire() -> None: + """Mirror — after SELL fire: SELL=True, BUY=False.""" + sm = StateMachine(lockout_s=240) + sm.feed("yellow", 1.0) + sm.feed("dark_red", 2.0) + t = sm.feed("light_red", 3.0) + assert t.reason == "fire" + + assert sm.fired_in_session("SELL") is True + assert sm.fired_in_session("BUY") is False + + +def test_fired_in_session_both_directions() -> None: + """Fire both directions — both True.""" + sm = StateMachine(lockout_s=240) + # BUY cycle + sm.feed("turquoise", 1.0) + sm.feed("dark_green", 2.0) + sm.feed("light_green", 3.0) + # SELL cycle + sm.feed("yellow", 100.0) + sm.feed("dark_red", 101.0) + sm.feed("light_red", 102.0) + + assert sm.fired_in_session("BUY") is True + assert sm.fired_in_session("SELL") is True + + # --------------------------------------------------------------------------- # 11. exhaustive — parameterize over every (state, color) pair # ---------------------------------------------------------------------------