"""Tests for atm.main unified CLI.""" from __future__ import annotations import asyncio import os import subprocess import sys import types from dataclasses import dataclass from pathlib import Path from unittest.mock import MagicMock import cv2 import numpy as np import pytest SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"] # Ensure subprocess invocations find the atm package even without pip install _SRC = str(Path(__file__).resolve().parent.parent / "src") _SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC} # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _mock_config_class(cfg=None): """Return a Config-like class whose load_current() returns *cfg*.""" if cfg is None: cfg = MagicMock() # window_title must be a real falsy value, not a MagicMock auto-attribute; # otherwise _cmd_run enters _focus_window_by_title and TypeErrors. cfg.window_title = None mock_cls = MagicMock() mock_cls.load_current.return_value = cfg return mock_cls # --------------------------------------------------------------------------- # test_help_works # --------------------------------------------------------------------------- def test_help_works(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) assert result.returncode == 0, result.stderr # --------------------------------------------------------------------------- # test_subcommands_listed # --------------------------------------------------------------------------- def test_subcommands_listed(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) output = result.stdout for cmd in SUBCOMMANDS: assert cmd in output, f"Expected subcommand '{cmd}' in --help output" # --------------------------------------------------------------------------- # 
# test_dryrun_wiring
# ---------------------------------------------------------------------------

@dataclass
class _DryrunResult:
    """Stand-in for the dryrun result; only ``acceptance_pass`` is populated."""

    acceptance_pass: bool


def _make_dryrun_module(acceptance_pass: bool):
    """Build a fake ``atm.dryrun`` module.

    Its ``dryrun()`` accepts any arguments and returns a ``_DryrunResult``
    carrying *acceptance_pass*; its ``print_report()`` is a no-op.
    """
    mod = types.ModuleType("atm.dryrun")
    mod.dryrun = lambda *a, **kw: _DryrunResult(acceptance_pass=acceptance_pass)
    mod.print_report = lambda r: None
    return mod


def test_dryrun_wiring_pass(monkeypatch, tmp_path):
    """``dryrun`` exits with code 0 when the result has acceptance_pass=True."""
    import atm.main as _main

    monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=True))
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    with pytest.raises(SystemExit) as exc_info:
        _main.main(["dryrun", str(tmp_path)])

    assert exc_info.value.code == 0


def test_dryrun_wiring_fail(monkeypatch, tmp_path):
    """``dryrun`` exits with code 1 when the result has acceptance_pass=False."""
    import atm.main as _main

    monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=False))
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    with pytest.raises(SystemExit) as exc_info:
        _main.main(["dryrun", str(tmp_path)])

    assert exc_info.value.code == 1


# ---------------------------------------------------------------------------
# test_report_current_week_default
# ---------------------------------------------------------------------------

def test_report_current_week_default(monkeypatch, tmp_path):
    """``report`` runs to completion against an empty journal."""
    import atm.main as _main

    # Journal.all returns no entries — report should print a zero-trade week
    monkeypatch.setattr("atm.journal.Journal.all", lambda self: [])

    # Should not raise; no sys.exit expected
    _main.main(["report", "--file", str(tmp_path / "trades.jsonl")])


# ---------------------------------------------------------------------------
# test_run_live_dry
# ---------------------------------------------------------------------------

def test_run_live_dry(monkeypatch):
    """``run --duration 0`` calls run_live exactly once with duration_s == 0."""
    import atm.main as _main

    calls: list[dict] = []

    def _mock_run_live(cfg, duration_s=None, capture_stub=False):
        calls.append({"cfg": cfg, "duration_s": duration_s, "capture_stub": capture_stub})

    monkeypatch.setattr("atm.main.run_live", _mock_run_live)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    _main.main(["run", "--duration", "0"])

    assert len(calls) == 1
    assert calls[0]["duration_s"] == pytest.approx(0.0)


# ---------------------------------------------------------------------------
# Regression integration test — user bug 2026-04-16.
# Session starts with an accepted gray tick followed by dark_red. Catchup
# synth-arm must fire on dark_red (the previous gray consumed first_accepted),
# then light_red triggers SELL. Proves run_live wiring dispatches alerts for
# the user's exact scenario.
# ---------------------------------------------------------------------------

def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    """Scripted gray → dark_red → light_red yields one arm, prime and trigger SELL alert.

    All collaborators of ``run_live`` are replaced by in-test fakes; the loop
    is terminated by the scripted detector raising ``_StopLoop`` once its
    script is exhausted.
    """
    import atm.main as _main
    from atm.detector import DetectionResult

    captured = []

    class FakeFanout:
        """Notifier double that records every alert passed to send()."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured.append(alert)

        def stop(self):
            pass

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary double that is never paused and never reports drift."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class _StopLoop(Exception):
        pass

    class ScriptedDetector:
        """Detector double that replays ``_script`` one entry per step()."""

        _script = [
            ("gray", True),
            ("gray", True),
            ("dark_red", True),
            ("dark_red", True),
            ("light_red", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Script exhausted → break out of the otherwise endless live loop.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

    def fake_build_capture(cfg, capture_stub=False):
        # Uses the module-level numpy import; the capture returns a black frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None

    monkeypatch.chdir(tmp_path)

    class _Stub:
        """Inert replacement for notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    class _StubPoller:
        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StubScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False

        def start(self, interval_s):
            self.is_running = True

        def stop(self):
            self.is_running = False

        async def run(self):
            await asyncio.sleep(9999)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
    monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)

    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)

    arm = [a for a in captured if a.kind == "arm"]
    prime = [a for a in captured if a.kind == "prime"]
    trigger = [a for a in captured if a.kind == "trigger"]
    titles = [a.title for a in captured]

    assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({titles})"
    assert arm[0].direction == "SELL"
    assert "recuperare" in (arm[0].title + arm[0].body).lower()

    assert len(prime) == 1, f"expected 1 prime alert, got {len(prime)} ({titles})"
    assert prime[0].direction == "SELL"
    assert "recuperare" in (prime[0].title + prime[0].body).lower()

    assert len(trigger) == 1, f"expected 1 trigger alert, got {len(trigger)} ({titles})"
    assert trigger[0].direction == "SELL"


# ---------------------------------------------------------------------------
# MUST-HAVE: async lifecycle integration test
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Scripted turquoise → dark_green → light_green run through run_live_async.

    Expects one arm, one prime and one trigger BUY alert, and that the fake
    scheduler records ``start:180`` before ``stop``.
    """
    import atm.main as _main
    from atm.detector import DetectionResult

    captured_alerts: list = []
    scheduler_events: list[str] = []

    class FakeFanout:
        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured_alerts.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    # Scheduler tracks start/stop calls
    class FakeScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            scheduler_events.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            scheduler_events.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StopLoop(Exception):
        pass

    class ScriptedDetector:
        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Script exhausted → break out of the otherwise endless live loop.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    def fake_build_capture(cfg, capture_stub=False):
        # Uses the module-level numpy import; the capture returns a black frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    fake_sched = FakeScheduler()

    monkeypatch.chdir(tmp_path)

    class _Stub:
        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
    # The factory always hands back the same instance so its events are observable.
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)

    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
    prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
    trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]

    assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
    assert arm_alerts[0].direction == "BUY"
    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"
    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
    assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
    start_idx = scheduler_events.index("start:180")
    stop_idx = scheduler_events.index("stop")
    assert start_idx < stop_idx, "scheduler started after it stopped"


# ---------------------------------------------------------------------------
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
# even when canary is paused or when detection is otherwise skipped.
# Prior bug: `continue` past the drain loop caused commands to pile up.
# ---------------------------------------------------------------------------

def _make_ctx_for_drain(cmd_queue, dispatched: list):
    """Build a minimal RunContext where _dispatch_command just records calls.

    *cmd_queue* becomes ``ctx.cmd_queue``. *dispatched* is accepted for
    call-site symmetry but is not read here; the recording itself is done by
    the fake dispatcher each test installs.
    """
    import atm.main as _main

    class _FakeAudit:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _FakeNotifier:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _FakeCanary:
        def __init__(self, paused=True):
            self.is_paused = paused

    class _FakeScheduler:
        is_running = False
        interval_s = None

        def start(self, s):
            pass

        def stop(self):
            pass

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=MagicMock(),
        capture=lambda: None,
        canary=_FakeCanary(paused=True),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_FakeNotifier(),
        audit=_FakeAudit(),
        detection_log=_FakeAudit(),
        scheduler=_FakeScheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=cmd_queue,
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    return ctx


@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
    """Regression: when canary.is_paused, _drain_cmd_queue still dispatches.

    Prior bug: detection loop `continue`'d past the drain block whenever the
    tick returned res=None (canary paused). Commands accumulated forever.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))

    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)

    ctx = _make_ctx_for_drain(q, dispatched)
    await _main._drain_cmd_queue(ctx)

    assert dispatched == ["status", "ss"]
    assert q.empty()


@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
    """Drain must still fire when the tick skipped (e.g. out of operating hours).

    The refactored loop runs _drain_cmd_queue unconditionally after every tick,
    regardless of `_TickSyncResult` content.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="stop"))

    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)

    ctx = _make_ctx_for_drain(q, dispatched)
    # Simulate out-of-window tick (empty _TickSyncResult, no res)
    await _main._handle_fsm_result(ctx, _main._TickSyncResult())
    await _main._drain_cmd_queue(ctx)

    assert dispatched == ["stop"]


@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
    """If one command raises, remaining commands still drain + warn alert sent."""
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))

    attempts: list = []

    async def _fake_dispatch(ctx, cmd):
        attempts.append(cmd.action)
        if cmd.action == "status":
            raise RuntimeError("boom")

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)

    ctx = _make_ctx_for_drain(q, attempts)
    await _main._drain_cmd_queue(ctx)

    assert attempts == ["status", "ss"]
    # warn alert for the failed command
    warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
    assert any("status" in t for t in warn_titles)
    # command_error audit event
    errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
    assert len(errs) == 1 and errs[0]["action"] == "status"


# ---------------------------------------------------------------------------
# Commit 4: operating hours + LifecycleState transitions
# ---------------------------------------------------------------------------

from zoneinfo import ZoneInfo as _ZI  # noqa: E402
import datetime as _dt  # noqa: E402


def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
            start="09:30", stop="16:00", tz="America/New_York"):
    """Build a lightweight cfg-like object with operating_hours populated."""
    oh = types.SimpleNamespace(
        enabled=enabled,
        timezone=tz,
        weekdays=weekdays,
        start_hhmm=start,
        stop_hhmm=stop,
        # Only resolve the zone when operating hours are enabled.
        _tz_cache=_ZI(tz) if enabled else None,
    )
    return types.SimpleNamespace(operating_hours=oh, window_title=None)


def _fake_canary(paused=False):
    """Return a canary stand-in exposing only ``is_paused``."""
    return types.SimpleNamespace(is_paused=paused)


class _ResumableCanary:
    """Canary double whose ``resume()`` visibly clears ``is_paused``.

    ``is_paused`` is a read-only property backed by ``_p`` so the only way
    to flip it is through ``resume()``. Shared by the resume/rebase tests.
    """

    def __init__(self, paused=True):
        self._p = paused

    @property
    def is_paused(self):
        return self._p

    def resume(self):
        self._p = False


@pytest.mark.parametrize(
    "local_dt,expected",
    [
        # Monday 09:30 NY — exact open → active (None)
        (_dt.datetime(2026, 4, 20, 9, 30), None),
        # Monday 16:00 NY — exact close → inactive (>= stop)
        (_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
        # Monday 08:00 NY — before open
        (_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
        # Monday 12:00 NY — active
        (_dt.datetime(2026, 4, 20, 12, 0), None),
        # Saturday 12:00 NY — weekend
        (_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
        # Sunday 23:00 NY — weekend
        (_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
    ],
)
def test_operating_hours_skip_matrix(local_dt, expected):
    """Timezone-aware start/stop + weekday checks."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    now_ts = local_dt.replace(tzinfo=tz).timestamp()
    lifecycle = _main.LifecycleState()

    result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())

    assert result == expected


def test_market_open_close_transitions_logged_once():
    """Crossing a boundary emits exactly one market_open / market_closed event."""
    import atm.main as _main

    audit_events = []
    alerts = []

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()
    canary = _fake_canary()

    # Prime as closed (before open, Monday 08:00)
    pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
    skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
    # First evaluation seeds state, no alert yet.
    assert lifecycle.last_window_state == "closed"
    assert alerts == []
    assert audit_events == []

    # Transition to open
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
    assert lifecycle.last_window_state == "open"
    assert len(alerts) == 1
    assert any(e.get("event") == "market_open" for e in audit_events)

    # Repeated open tick — no duplicate log
    alerts.clear()
    audit_events.clear()
    skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
    assert alerts == []
    assert audit_events == []

    # Transition to close
    close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
    skip_close = _main._should_skip(close, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
    assert lifecycle.last_window_state == "closed"
    assert any(e.get("event") == "market_closed" for e in audit_events)


def test_market_transition_sends_notification():
    """market_open / market_closed transitions produce kind=status alerts."""
    import atm.main as _main

    alerts = []

    class _A:
        def log(self, e):
            pass

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(last_window_state="closed")
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()

    _main._maybe_log_transition(None, lifecycle, mid, _A(), _N())

    assert len(alerts) == 1
    assert alerts[0].kind == "status"
    assert "piața" in alerts[0].title.lower() or "monitorizare" in alerts[0].body.lower()


def test_startup_in_window_suppresses_market_open():
    """R2 #20: first evaluation in-window just seeds state; no alert fires."""
    import atm.main as _main

    alerts = []
    events = []

    class _A:
        def log(self, e):
            events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()  # last_window_state is None
    in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()

    skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
    assert skip is None
    _main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())

    # Seeded silently
    assert lifecycle.last_window_state == "open"
    assert alerts == []
    assert not any(e.get("event") == "market_open" for e in events)

    # Two more ticks, still in-window → no spurious alert
    for _ in range(2):
        skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
        _main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
    assert alerts == []


def test_operating_hours_weekday_locale_independent():
    """R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
    import locale as _locale
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Saturday 12:00 NY
    sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    original = _locale.setlocale(_locale.LC_TIME)
    try:
        for loc in ("C", "de_DE.UTF-8"):
            try:
                _locale.setlocale(_locale.LC_TIME, loc)
            except _locale.Error:
                continue  # locale not installed → skip gracefully
            lifecycle = _main.LifecycleState()
            result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
            assert result == "out_of_window_weekend", (
                f"locale={loc} returned {result!r}"
            )
    finally:
        # Always put the process locale back; fall back to "C" if the saved
        # value cannot be re-applied.
        try:
            _locale.setlocale(_locale.LC_TIME, original)
        except _locale.Error:
            _locale.setlocale(_locale.LC_TIME, "C")


def test_should_skip_user_paused_wins():
    """An in-window timestamp still yields "user_paused" when the user paused."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(user_paused=True)
    # Mid-Monday (in-window) — should still skip because user_paused
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()

    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"


def test_should_skip_canary_drift_wins_over_window():
    """An in-window timestamp yields "drift_paused" when the canary is paused."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState()
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()

    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"


# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------

def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
    """Minimal RunContext for _dispatch_command unit tests.

    Any argument left as ``None`` is replaced by a default double: an unpaused
    canary with a no-op ``resume()``, a fresh ``LifecycleState`` and a
    ``MagicMock`` cfg with operating hours disabled.
    """
    import atm.main as _main

    class _A:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _N:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _S:
        is_running = False
        interval_s = None

        def start(self, s):
            self.is_running = True

        def stop(self):
            self.is_running = False

    if canary is None:
        canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    if lifecycle is None:
        lifecycle = _main.LifecycleState()
    if cfg is None:
        cfg = MagicMock()
        cfg.telegram.auto_poll_interval_s = 180
        cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
        # Skip window focus in dispatch tests — MagicMock window_title would
        # propagate into _focus_window_by_title (real Win32 call).
        cfg.window_title = None

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: None,
        canary=canary,
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_N(),
        audit=_A(),
        detection_log=_A(),
        scheduler=_S(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=lifecycle,
    )
    return ctx


@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
    """/pause sets user_paused, logs a user_paused event and sends a status alert."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    await _main._dispatch_command(ctx, Command(action="pause"))

    assert ctx.lifecycle.user_paused is True
    # When combined with _should_skip, we get user_paused
    assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused"
    # Audit + notif
    assert any(e.get("event") == "user_paused" for e in ctx.audit.events)
    assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts)


@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
    """/resume with value=1 clears user_paused and canary pause; audit has force=True."""
    import atm.main as _main
    from atm.commands import Command

    # is_paused is a property so the effect of resume() is visible afterwards.
    canary = _ResumableCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    await _main._dispatch_command(ctx, Command(action="resume", value=1))

    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert force_events and force_events[0]["force"] is True


@pytest.mark.asyncio
async def test_resume_plain_also_clears_canary_drift():
    """2026-04-21 decision: /resume (no arg) now clears BOTH user_paused and canary drift.

    /resume force remains accepted as legacy alias. Previous behavior (force
    required for drift) was a UX trap — see plan doc."""
    import atm.main as _main
    from atm.commands import Command

    canary = _ResumableCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    await _main._dispatch_command(ctx, Command(action="resume"))  # no force

    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False  # drift cleared without force
    # Audit event still records was_drift + force=False for traceability
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["was_drift"] is True and resumed[0]["force"] is False
    # Message mentions drift-pause was cleared (kind is "screenshot" now since /resume attaches image)
    alerts = ctx.notifier.alerts
    assert alerts and ("drift" in (alerts[0].title + alerts[0].body).lower())


@pytest.mark.asyncio
async def test_resume_force_alias_still_works():
    """/resume force (value=1) remains accepted — legacy muscle memory."""
    import atm.main as _main
    from atm.commands import Command

    canary = _ResumableCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    await _main._dispatch_command(ctx, Command(action="resume", value=1))

    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["force"] is True


# ---------------------------------------------------------------------------
# /rebase: propose + confirm flow
# ---------------------------------------------------------------------------

def _rebase_cfg_and_path(tmp_path):
    """Write a minimal TOML with baseline_phash, return (cfg_live, path, old_phash).

    The file is created at ``tmp_path/configs/rebase_test.toml``; *old_phash*
    is the 64-character baseline written into both the file and the cfg.
    """
    name = "rebase_test"
    path = tmp_path / "configs" / f"{name}.toml"
    path.parent.mkdir(parents=True, exist_ok=True)
    old = "a" * 64
    path.write_text(
        f'[canary]\n'
        f'baseline_phash = "{old}" # comment stays\n'
        f'drift_threshold = 8\n'
        f'[canary.roi]\nx = 0\ny = 0\nw = 4\nh = 4\n',
        encoding="utf-8",
    )
    cfg = types.SimpleNamespace(
        window_title=None,
        config_version=name,
        canary=types.SimpleNamespace(
            roi=types.SimpleNamespace(x=0, y=0, w=4, h=4),
            baseline_phash=old,
            drift_threshold=8,
        ),
        telegram=types.SimpleNamespace(auto_poll_interval_s=180),
        operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
    )
    return cfg, path, old


def _blue_frame(h=200, w=200):
    """Return an ``h × w × 3`` uint8 frame filled with the colour (50, 80, 20)."""
    return np.full((h, w, 3), (50, 80, 20), dtype=np.uint8)


def test_rewrite_baseline_phash_updates_only_target_line(tmp_path):
    """The rewrite returns the old hash and keeps the inline comment and other keys."""
    import atm.main as _main

    p = tmp_path / "cfg.toml"
    p.write_text(
        '[canary]\n'
        'baseline_phash = "deadbeef" # keep this comment\n'
        'drift_threshold = 8\n',
        encoding="utf-8",
    )

    old = _main._rewrite_baseline_phash(p, "cafef00d")

    assert old == "deadbeef"
    txt = p.read_text(encoding="utf-8")
    assert 'baseline_phash = "cafef00d" # keep this comment' in txt
    assert "drift_threshold = 8" in txt


def test_rewrite_baseline_phash_raises_when_missing(tmp_path):
    """A file without a baseline_phash line makes the rewrite raise ValueError."""
    import atm.main as _main

    p = tmp_path / "cfg.toml"
    p.write_text("[canary]\ndrift_threshold = 8\n", encoding="utf-8")

    with pytest.raises(ValueError):
        _main._rewrite_baseline_phash(p, "ff")


@pytest.mark.asyncio
async def test_rebase_propose_sets_pending_and_does_not_touch_file(tmp_path, monkeypatch):
    """/rebase (no value) stores a pending proposal and leaves the TOML unchanged."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))

    assert ctx.pending_rebase is not None
    proposed_ts, new_phash, pending_path = ctx.pending_rebase
    assert new_phash != old
    assert pending_path == Path("configs") / f"{cfg.config_version}.toml"
    # File unchanged at propose time
    assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
    events = [e["event"] for e in ctx.audit.events]
    assert "rebase_proposed" in events


@pytest.mark.asyncio
async def test_rebase_confirm_applies_and_clears_pauses(tmp_path, monkeypatch):
    """Propose then confirm writes the new hash and clears both pause flags."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    canary = _ResumableCanary()
    ctx = _dispatch_ctx(cfg=cfg, canary=canary)
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    ctx.lifecycle.user_paused = True
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    assert ctx.pending_rebase is not None
    _, new_phash, _ = ctx.pending_rebase

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    assert ctx.pending_rebase is None
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    assert cfg.canary.baseline_phash == new_phash
    txt = cfg_path.read_text(encoding="utf-8")
    assert f'"{new_phash}"' in txt
    assert f'"{old}"' not in txt
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert applied and applied[0]["old_phash"] == old and applied[0]["new_phash"] == new_phash


@pytest.mark.asyncio
async def test_rebase_confirm_without_pending_warns(tmp_path):
    """Confirming with nothing pending sends a warning and applies nothing."""
    import atm.main as _main
    from atm.commands import Command

    cfg, _, _ = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    assert ctx.pending_rebase is None
    alerts = ctx.notifier.alerts
    assert alerts and "nimic" in alerts[0].title.lower()
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert not applied


@pytest.mark.asyncio
async def test_rebase_confirm_expired_pending_warns(tmp_path, monkeypatch):
    """Confirming a one-hour-old proposal is rejected and the file is untouched."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    # Pretend propose happened 1h ago
    import time as _t
    ctx.pending_rebase = (_t.time() - 3600, "b" * 64, cfg_path)

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    assert ctx.pending_rebase is None
    # File untouched
    assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
    alerts = ctx.notifier.alerts
    assert alerts and "expirat" in alerts[0].title.lower()


@pytest.mark.asyncio
async def test_rebase_confirm_mutates_frozen_canary_region(tmp_path, monkeypatch):
    """Regression: CanaryRegion is @dataclass(frozen=True). Plain assignment raises
    FrozenInstanceError. /rebase confirm must mirror the new phash into the live
    cfg anyway — otherwise canary.check() keeps the stale hash and drift re-pauses
    within one tick after the user confirms."""
    import atm.main as _main
    from atm.commands import Command
    from atm.config import CanaryRegion, ROI as _ROI

    # Build cfg with the REAL frozen dataclass, not a SimpleNamespace.
    _, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    cfg = types.SimpleNamespace(
        window_title=None,
        config_version="rebase_test",
        canary=CanaryRegion(
            roi=_ROI(x=0, y=0, w=4, h=4),
            baseline_phash=old,
            drift_threshold=8,
        ),
        telegram=types.SimpleNamespace(auto_poll_interval_s=180),
        operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
    )
    # This scenario starts with the canary NOT paused.
    ctx = _dispatch_ctx(cfg=cfg, canary=_ResumableCanary(paused=False))
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    _, new_phash, _ = ctx.pending_rebase
    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    # Live cfg mirrors the new hash (in-memory canary.check() sees it)
    assert cfg.canary.baseline_phash == new_phash
    # TOML on disk also updated
    assert f'"{new_phash}"' in cfg_path.read_text(encoding="utf-8")
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert applied


@pytest.mark.asyncio
async def test_rebase_propose_capture_failed_warns(tmp_path, monkeypatch):
    """/rebase with a capture that returns None leaves nothing pending and alerts."""
    import atm.main as _main
    from atm.commands import Command

    cfg, _, _ = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.capture = lambda: None
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))

    assert ctx.pending_rebase is None
    alerts = ctx.notifier.alerts
    # NOTE(review): matching a single "e" is satisfied by almost any title;
    # tighten to a distinctive substring once the exact wording is confirmed.
    assert alerts and "e" in alerts[0].title.lower()  # "Captură eșuată"


@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message(monkeypatch):
    """/resume while operating-hours window is closed → special body."""
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
    canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)

    # Pin time to Saturday
    fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    class _FakeTime:
        def time(self):
            return fake_ts

        def monotonic(self):
            return 0.0

    # monkeypatch restores atm.main.time at teardown, even if the dispatch raises.
    monkeypatch.setattr(_main, "time", _FakeTime())

    await _main._dispatch_command(ctx, Command(action="resume"))

    assert ctx.lifecycle.user_paused is False
    alerts = ctx.notifier.alerts
    assert alerts
    combined = (alerts[0].title + alerts[0].body).lower()
    assert "închis" in combined or "piața" in combined or "ferestr" in combined


@pytest.mark.asyncio
async def test_dispatch_resume_sends_inline_screenshot(monkeypatch, tmp_path):
    """/resume produces a single Alert with image_path + FSM pick caption when capture succeeds."""
    import atm.main as _main
    from atm.commands import Command

    class _Canary:
        def __init__(self):
            self._p = True

        @property
        def is_paused(self):
            return self._p

        def resume(self):
            self._p = False

    canary = _Canary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    ctx.capture = lambda: object()  # non-None frame
    ctx.fires_dir = tmp_path
    ctx.cfg.window_title = None

    _fake_detections = [{
        "idx": 0,
        "name": "light_green",
        "rgb": (0, 255, 0),
        "distance": 5.0,
        "confidence": 0.9,
        "pos_abs": (100, 200),
    }]
    monkeypatch.setattr(_main, "_save_inspect_frame",
                        lambda *a, **kw: (tmp_path / "fake_resume.png", _fake_detections))

    await _main._dispatch_command(ctx, Command(action="resume"))

    # Exactly one alert, with image attached + caption in body.
alerts = ctx.notifier.alerts assert len(alerts) == 1 alert = alerts[0] assert alert.image_path == tmp_path / "fake_resume.png" assert "Monitorizare reluată" in alert.title assert "← pick" in alert.body assert "captură eșuată" not in alert.title @pytest.mark.asyncio async def test_dispatch_resume_capture_failed_still_resumes(monkeypatch, tmp_path): """/resume with capture=None → Alert title contains capture-failed, no image, resume still executes.""" import atm.main as _main from atm.commands import Command class _Canary: def __init__(self): self._p = True @property def is_paused(self): return self._p def resume(self): self._p = False canary = _Canary() ctx = _dispatch_ctx(canary=canary) ctx.lifecycle.user_paused = True ctx.capture = lambda: None # capture fails ctx.fires_dir = tmp_path ctx.cfg.window_title = None await _main._dispatch_command(ctx, Command(action="resume")) # State still cleared despite capture failure. assert ctx.lifecycle.user_paused is False assert canary.is_paused is False alerts = ctx.notifier.alerts assert len(alerts) == 1 assert alerts[0].image_path is None assert "captură eșuată" in alerts[0].title assert "Monitorizare reluată" in alerts[0].title @pytest.mark.asyncio async def test_dispatch_resume_captures_before_state_clear(monkeypatch, tmp_path): """Capture must run BEFORE clearing user_paused / canary.resume() to avoid race with FSM tick.""" import atm.main as _main from atm.commands import Command class _Canary: def __init__(self): self._p = True self.resumed_at: float | None = None @property def is_paused(self): return self._p def resume(self): self._p = False self.resumed_at = _capture_sequence[0] if _capture_sequence else 0 canary = _Canary() ctx = _dispatch_ctx(canary=canary) ctx.lifecycle.user_paused = True ctx.fires_dir = tmp_path ctx.cfg.window_title = None _capture_sequence: list[int] = [] _capture_called = [0] def _capture(): _capture_called[0] += 1 # State must still be paused at capture time. 
assert ctx.lifecycle.user_paused is True, "capture ran AFTER user_paused was cleared" assert canary.is_paused is True, "capture ran AFTER canary.resume()" _capture_sequence.append(_capture_called[0]) return object() ctx.capture = _capture monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "ok.png", [])) await _main._dispatch_command(ctx, Command(action="resume")) assert _capture_called[0] == 1 assert ctx.lifecycle.user_paused is False # cleared after capture @pytest.mark.asyncio async def test_ss_and_fire_agree_on_rightmost_dot(tmp_path): """Parity: _save_inspect_frame's detections[0].pos_abs must match find_rightmost_dot output on the same frame + ROI. Prevents silent drift between /ss verify and fire path.""" import atm.main as _main from atm.vision import find_rightmost_dot, crop_roi from atm.config import ROI, ColorSpec, YAxisCalib # Synthetic frame with one bright green dot. frame = np.zeros((100, 200, 3), dtype=np.uint8) frame[:, :] = (18, 18, 18) # BGR background matching the palette entry below cv2.circle(frame, (150, 50), 5, (0, 255, 0), -1) cfg = types.SimpleNamespace( dot_roi=ROI(x=10, y=10, w=180, h=80), colors={ "background": ColorSpec(rgb=(18, 18, 18), tolerance=15.0), "light_green": ColorSpec(rgb=(0, 255, 0), tolerance=60.0), }, y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=90, p2_price=50.0), version="parity-test", ) dot_crop = crop_roi(frame, cfg.dot_roi) fire_pos = find_rightmost_dot(dot_crop, cfg.colors["background"].rgb) assert fire_pos is not None fire_abs = (cfg.dot_roi.x + fire_pos[0], cfg.dot_roi.y + fire_pos[1]) path, detections = _main._save_inspect_frame(frame, cfg, tmp_path, now=123.0) assert path is not None assert detections, "inspect should detect the green dot" inspect_abs = detections[0]["pos_abs"] assert inspect_abs == fire_abs, ( f"Parity break: fire={fire_abs} inspect={inspect_abs} — " "fire path and /ss would show different rightmost positions." 
) @pytest.mark.asyncio async def test_status_command_reports_pause_reason(): """/status body must mention pause reason + window state.""" import atm.main as _main from atm.commands import Command ctx = _dispatch_ctx() ctx.lifecycle.user_paused = True # Stub detector.rolling for status ctx.detector.rolling = [] ctx.fsm.state = types.SimpleNamespace(value="IDLE") await _main._dispatch_command(ctx, Command(action="status")) status = [a for a in ctx.notifier.alerts if a.kind == "status"] assert status body = status[0].body assert "pauză manuală" in body or "pauza" in body.lower() @pytest.mark.asyncio async def test_set_interval_refused_while_canary_paused(): """2026-04-21: /set_interval must not start scheduler while canary is drift-paused. Previously started scheduler silently, misleading the user into thinking detection was live. Now emits a warn and refuses.""" import atm.main as _main from atm.commands import Command canary = types.SimpleNamespace(is_paused=True, resume=lambda: None) ctx = _dispatch_ctx(canary=canary) await _main._dispatch_command(ctx, Command(action="set_interval", value=60)) # Scheduler must NOT have started assert ctx.scheduler.is_running is False # No scheduler_started audit event assert not any(e.get("event") == "scheduler_started" for e in ctx.audit.events) # A warn alert must have been sent referencing /resume warns = [a for a in ctx.notifier.alerts if a.kind == "warn"] assert warns combined = (warns[0].title + warns[0].body).lower() assert "drift" in combined and "/resume" in combined @pytest.mark.asyncio async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path): """2026-04-21: /ss still captures while canary is paused, but the alert body must warn that detection is off.""" import atm.main as _main from atm.commands import Command canary = types.SimpleNamespace(is_paused=True, resume=lambda: None) ctx = _dispatch_ctx(canary=canary) # Bypass window focus + use a simple non-None capture result fake_frame = object() ctx = 
types.SimpleNamespace(**{**ctx.__dict__}) # shallow copy RunContext fields # Simpler: just override the capture and save functions used async_capture_called = {"n": 0} def _capture(): async_capture_called["n"] += 1 return fake_frame ctx.capture = _capture ctx.fires_dir = tmp_path # window_title off so we skip focus branch ctx.cfg.window_title = None # stub _save_inspect_frame to return (path, detections) _fake_detections = [{ "idx": 0, "name": "light_green", "rgb": (0, 255, 0), "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), }] monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="ss")) screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"] assert screenshots assert "DETECȚIE OPRITĂ" in screenshots[0].body or "drift" in screenshots[0].body.lower() assert "/resume" in screenshots[0].body # Caption with FSM pick must appear alongside the warn. assert "← pick" in screenshots[0].body @pytest.mark.asyncio async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path): """/ss body contains caption only when canary is not paused (no warn prefix).""" import atm.main as _main from atm.commands import Command canary = types.SimpleNamespace(is_paused=False, resume=lambda: None) ctx = _dispatch_ctx(canary=canary) ctx.capture = lambda: object() ctx.fires_dir = tmp_path ctx.cfg.window_title = None _fake_detections = [{ "idx": 0, "name": "light_green", "rgb": (0, 255, 0), "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), }] monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="ss")) screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"] assert screenshots # Body should contain the caption (no warn), not be empty. 
assert "← pick" in screenshots[0].body assert "DETECȚIE OPRITĂ" not in screenshots[0].body @pytest.mark.asyncio async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path): """E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert. This test verifies the full command-driven lifecycle in isolation: - canary starts drift-paused, _should_skip returns drift_paused - /resume force clears canary + user_paused - subsequent detection produces SELL fire through normal FSM path """ import atm.main as _main from atm.commands import Command # Canary with mutable pause state class _Canary: def __init__(self): self._p = True @property def is_paused(self): return self._p def resume(self): self._p = False canary = _Canary() cfg = MagicMock() cfg.telegram.auto_poll_interval_s = 180 cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None) cfg.window_title = None # skip Win32 focus path in unit test ctx = _dispatch_ctx(canary=canary, cfg=cfg) # 1. While drift-paused, _should_skip returns drift_paused assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused" # 2. User issues /resume force await _main._dispatch_command(ctx, Command(action="resume", value=1)) assert canary.is_paused is False assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None # 3. 
Feed a yellow→light_red sequence through _handle_tick (FSM path) from atm.state_machine import StateMachine, State fsm = StateMachine(lockout_s=60) class _N: def __init__(self): self.alerts = [] def send(self, a): self.alerts.append(a) class _A: def log(self, _e): pass notif = _N() audit = _A() cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True)) _main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock) _main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock) tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock) # FSM reached fire via normal path assert tr is not None and tr.trigger == "SELL" assert fsm.state == State.IDLE # --------------------------------------------------------------------------- # _in_trading_window tests # --------------------------------------------------------------------------- import datetime as _dt def test_in_trading_window_disabled(): """Returns True when operating_hours disabled.""" import atm.main as _main cfg = types.SimpleNamespace(operating_hours=types.SimpleNamespace(enabled=False)) assert _main._in_trading_window(0.0, cfg) is True def test_in_trading_window_no_oh(): """Returns True when cfg has no operating_hours attr.""" import atm.main as _main cfg = types.SimpleNamespace() assert _main._in_trading_window(0.0, cfg) is True def test_in_trading_window_in_window(): """Returns True during configured hours (Monday 12:00 NY).""" import atm.main as _main cfg = _oh_cfg() tz = cfg.operating_hours._tz_cache ts = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() assert _main._in_trading_window(ts, cfg) is True def test_in_trading_window_out_of_hours(): """Returns False before market open (Monday 08:00 NY).""" import atm.main as _main cfg = _oh_cfg() tz = cfg.operating_hours._tz_cache ts = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp() assert _main._in_trading_window(ts, cfg) is False 
def test_in_trading_window_weekend():
    """Returns False on weekend (Sunday 12:00 NY)."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    ts = _dt.datetime(2026, 4, 19, 12, 0, tzinfo=tz).timestamp()
    assert _main._in_trading_window(ts, cfg) is False


def test_heartbeat_suppressed_outside_hours(monkeypatch):
    """_in_trading_window=False prevents heartbeat from sending.

    NOTE(review): this simulates the gate rather than driving the real
    heartbeat loop; it only proves the patched predicate blocks the send.
    """
    import atm.main as _main

    monkeypatch.setattr(_main, "_in_trading_window", lambda ts, cfg: False)
    sent = []
    # Simulate the check: a send happens only inside the window, so the
    # assertion below actually depends on the patched predicate.
    if _main._in_trading_window(0.0, None):
        sent.append("heartbeat")
    assert sent == []


def test_build_heartbeat_alert_active_when_not_paused():
    """Healthy state → title=activ, body shows fsm.state plainly."""
    import atm.main as _main

    a = _main._build_heartbeat_alert(
        fsm_state="ARMED_BUY",
        fire_count=2,
        uptime_h=1.5,
        canary_paused=False,
    )
    assert a.kind == "heartbeat"
    assert a.title == "activ"
    assert "ARMED_BUY" in a.body
    assert "[drift-pause]" not in a.body
    assert "semnale: 2" in a.body


def test_build_heartbeat_alert_paused_when_canary_drift():
    """2026-04-21: heartbeat must reflect canary drift instead of lying with 'activ'."""
    import atm.main as _main

    a = _main._build_heartbeat_alert(
        fsm_state="ARMED_SELL",
        fire_count=0,
        uptime_h=3.2,
        canary_paused=True,
    )
    assert a.kind == "heartbeat"
    assert "pauzat" in a.title
    assert "drift" in a.title.lower()
    assert "ARMED_SELL" in a.body
    assert "[drift-pause]" in a.body


@pytest.mark.asyncio
async def test_status_compact_active():
    """/status produces compact 2-line format; 'Canary' absent."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="ARMED_BUY")
    ctx.state.fire_count = 3

    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status
    body = status[0].body
    lines = body.splitlines()
    assert len(lines) == 2  # no fereastră line (oh.enabled=False)
    assert "ARMED_BUY" in lines[0]
    assert "semnale: 3" in lines[0]
    assert "Canary" not in body
    assert "canary" not in body


@pytest.mark.asyncio
async def test_status_compact_paused_manual():
    """/status shows 'pauză manuală' on line 1 when user_paused."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")

    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    # Report a missing status alert as an assertion failure, not an IndexError.
    assert status
    body = status[0].body
    assert body.startswith("pauză manuală")


@pytest.mark.asyncio
async def test_status_window_line_when_oh_enabled():
    """/status adds fereastră line when operating_hours enabled."""
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(last_window_state="open")
    ctx = _dispatch_ctx(lifecycle=lifecycle, cfg=cfg)
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")

    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    # Report a missing status alert as an assertion failure, not an IndexError.
    assert status
    body = status[0].body
    assert "fereastră: deschisă" in body