"""Tests for atm.main unified CLI.""" from __future__ import annotations import asyncio import os import subprocess import sys import types from dataclasses import dataclass from pathlib import Path from unittest.mock import MagicMock import pytest SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"] # Ensure subprocess invocations find the atm package even without pip install _SRC = str(Path(__file__).resolve().parent.parent / "src") _SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC} # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _mock_config_class(cfg=None): """Return a Config-like class whose load_current() returns *cfg*.""" if cfg is None: cfg = MagicMock() mock_cls = MagicMock() mock_cls.load_current.return_value = cfg return mock_cls # --------------------------------------------------------------------------- # test_help_works # --------------------------------------------------------------------------- def test_help_works(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) assert result.returncode == 0, result.stderr # --------------------------------------------------------------------------- # test_subcommands_listed # --------------------------------------------------------------------------- def test_subcommands_listed(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) output = result.stdout for cmd in SUBCOMMANDS: assert cmd in output, f"Expected subcommand '{cmd}' in --help output" # --------------------------------------------------------------------------- # test_dryrun_wiring # --------------------------------------------------------------------------- @dataclass class _DryrunResult: acceptance_pass: bool def _make_dryrun_module(acceptance_pass: bool): mod = 
types.ModuleType("atm.dryrun") mod.dryrun = lambda *a, **kw: _DryrunResult(acceptance_pass=acceptance_pass) mod.print_report = lambda r: None return mod def test_dryrun_wiring_pass(monkeypatch, tmp_path): import atm.main as _main monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=True)) monkeypatch.setattr("atm.main.Config", _mock_config_class()) with pytest.raises(SystemExit) as exc_info: _main.main(["dryrun", str(tmp_path)]) assert exc_info.value.code == 0 def test_dryrun_wiring_fail(monkeypatch, tmp_path): import atm.main as _main monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=False)) monkeypatch.setattr("atm.main.Config", _mock_config_class()) with pytest.raises(SystemExit) as exc_info: _main.main(["dryrun", str(tmp_path)]) assert exc_info.value.code == 1 # --------------------------------------------------------------------------- # test_report_current_week_default # --------------------------------------------------------------------------- def test_report_current_week_default(monkeypatch, tmp_path): import atm.main as _main # Journal.all returns no entries — report should print a zero-trade week monkeypatch.setattr("atm.journal.Journal.all", lambda self: []) # Should not raise; no sys.exit expected _main.main(["report", "--file", str(tmp_path / "trades.jsonl")]) # --------------------------------------------------------------------------- # test_run_live_dry # --------------------------------------------------------------------------- def test_run_live_dry(monkeypatch): import atm.main as _main calls: list[dict] = [] def _mock_run_live(cfg, duration_s=None, capture_stub=False): calls.append({"cfg": cfg, "duration_s": duration_s, "capture_stub": capture_stub}) monkeypatch.setattr("atm.main.run_live", _mock_run_live) monkeypatch.setattr("atm.main.Config", _mock_config_class()) _main.main(["run", "--duration", "0"]) assert len(calls) == 1 assert calls[0]["duration_s"] == pytest.approx(0.0) # 
# ---------------------------------------------------------------------------
# Regression integration test — user bug 2026-04-16.
# Session starts with an accepted gray tick followed by dark_red. Catchup
# synth-arm must fire on dark_red (the previous gray consumed first_accepted),
# then light_red triggers SELL. Proves run_live wiring dispatches alerts for
# the user's exact scenario.
# ---------------------------------------------------------------------------

def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    """Drive ``run_live`` through a scripted gray → dark_red → light_red session.

    All collaborators that the test patches are replaced with in-test fakes.
    The scripted detector raises ``_StopLoop`` once its script is exhausted,
    which is how the otherwise unbounded loop terminates. The test expects
    exactly one arm, one prime and one trigger alert, each with direction SELL.
    """
    import numpy as np

    import atm.main as _main
    from atm.detector import DetectionResult

    sent_alerts = []

    class _StopLoop(Exception):
        """Signals that the scripted detector has run out of ticks."""

    class FakeFanout:
        """Notifier fake that records every alert it is asked to send."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            sent_alerts.append(alert)

        def stop(self):
            pass

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary fake that is never paused and always reports no drift."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class ScriptedDetector:
        """Detector fake replaying a fixed list of (color, accepted) ticks."""

        _script = [
            ("gray", True),
            ("gray", True),
            ("dark_red", True),
            ("dark_red", True),
            ("light_red", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            position = self._i
            if position >= len(self._script):
                raise _StopLoop
            self._i = position + 1
            color, accepted = self._script[position]
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

    class _Stub:
        """Do-nothing replacement for notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    class _StubPoller:
        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StubScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False

        def start(self, interval_s):
            self.is_running = True

        def stop(self):
            self.is_running = False

        async def run(self):
            await asyncio.sleep(9999)

    def fake_build_capture(cfg, capture_stub=False):
        # Capture callable that always yields a blank 50x50 RGB frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    flat_settings = {
        "lockout_s": 60,
        "heartbeat_min": 999,
        "loop_interval_s": 0,
        "config_version": "test",
        "dead_letter_path": str(tmp_path / "dl.jsonl"),
    }
    for attr_name, attr_value in flat_settings.items():
        setattr(cfg, attr_name, attr_value)
    cfg.canary.drift_threshold = 10
    roi = cfg.dot_roi
    roi.x, roi.y, roi.w, roi.h = 0, 0, 10, 10
    cfg.chart_window_region = None

    monkeypatch.chdir(tmp_path)

    # Patch targets are applied in this exact order.
    replacements = [
        ("atm.detector.Detector", ScriptedDetector),
        ("atm.canary.Canary", FakeCanary),
        ("atm.notifier.fanout.FanoutNotifier", FakeFanout),
        ("atm.notifier.discord.DiscordNotifier", _Stub),
        ("atm.notifier.telegram.TelegramNotifier", _Stub),
        ("atm.audit.AuditLog", _Stub),
        ("atm.levels.LevelsExtractor", _Stub),
        ("atm.main._build_capture", fake_build_capture),
        ("atm.main.time.sleep", lambda s: None),
        ("atm.commands.TelegramPoller", _StubPoller),
        ("atm.scheduler.ScreenshotScheduler", _StubScheduler),
    ]
    for target, replacement in replacements:
        monkeypatch.setattr(target, replacement)

    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)

    def _alerts_of(kind):
        return [a for a in sent_alerts if a.kind == kind]

    arm = _alerts_of("arm")
    prime = _alerts_of("prime")
    trigger = _alerts_of("trigger")

    assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in sent_alerts]})"
    first_arm = arm[0]
    assert first_arm.direction == "SELL"
    assert "recuperare" in (first_arm.title + first_arm.body).lower()

    assert len(prime) == 1
    first_prime = prime[0]
    assert first_prime.direction == "SELL"
    assert "recuperare" in (first_prime.title + first_prime.body).lower()

    assert len(trigger) == 1
    assert trigger[0].direction == "SELL"


# ---------------------------------------------------------------------------
# MUST-HAVE: async lifecycle integration test
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Drive ``run_live_async`` through turquoise → dark_green → light_green.

    Expects one arm, one prime and one trigger alert (all BUY), and that the
    fake scheduler records ``start:180`` before ``stop``.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    captured_alerts: list = []
    scheduler_events: list[str] = []

    class FakeFanout:
        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured_alerts.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    # Scheduler tracks start/stop calls
    class FakeScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            scheduler_events.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            scheduler_events.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StopLoop(Exception):
        pass

    class ScriptedDetector:
        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Raising once the script is exhausted is what ends the loop.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    def fake_build_capture(cfg, capture_stub=False):
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    fake_sched = FakeScheduler()

    monkeypatch.chdir(tmp_path)

    class _Stub:
        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
    # Always hand back the same instance so the recorded events can be inspected.
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)

    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
    prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
    trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]

    assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
    assert arm_alerts[0].direction == "BUY"
    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"
    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
    assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
    start_idx = scheduler_events.index("start:180")
    stop_idx = scheduler_events.index("stop")
    assert start_idx < stop_idx, "scheduler started after it stopped"


# ---------------------------------------------------------------------------
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
# even when canary is paused or when detection is otherwise skipped.
# Prior bug: `continue` past the drain loop caused commands to pile up.
# ---------------------------------------------------------------------------

def _make_ctx_for_drain(cmd_queue, dispatched: list):
    """Build a minimal RunContext where _dispatch_command just records calls.

    *cmd_queue* becomes the context's command queue. *dispatched* is accepted
    for call-site symmetry but is not read here; the recording itself is done
    by whatever the caller patches in for ``_dispatch_command``.
    """
    import atm.main as _main

    class _FakeAudit:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _FakeNotifier:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _FakeCanary:
        def __init__(self, paused=True):
            self.is_paused = paused

    class _FakeScheduler:
        is_running = False
        interval_s = None

        def start(self, s):
            pass

        def stop(self):
            pass

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=MagicMock(),
        capture=lambda: None,
        canary=_FakeCanary(paused=True),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_FakeNotifier(),
        audit=_FakeAudit(),
        detection_log=_FakeAudit(),
        scheduler=_FakeScheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=cmd_queue,
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    return ctx


@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
    """Regression: when canary.is_paused, _drain_cmd_queue still dispatches.

    Prior bug: detection loop `continue`'d past the drain block whenever the
    tick returned res=None (canary paused). Commands accumulated forever.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))

    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)

    ctx = _make_ctx_for_drain(q, dispatched)
    await _main._drain_cmd_queue(ctx)

    assert dispatched == ["status", "ss"]
    assert q.empty()


@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
    """Drain must still fire when the tick skipped (e.g. out of operating hours).

    The refactored loop runs _drain_cmd_queue unconditionally after every tick,
    regardless of `_TickSyncResult` content.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="stop"))

    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)

    ctx = _make_ctx_for_drain(q, dispatched)
    # Simulate out-of-window tick (empty _TickSyncResult, no res)
    await _main._handle_fsm_result(ctx, _main._TickSyncResult())
    await _main._drain_cmd_queue(ctx)

    assert dispatched == ["stop"]


@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
    """If one command raises, remaining commands still drain + warn alert sent."""
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))

    attempts: list = []

    async def _fake_dispatch(ctx, cmd):
        attempts.append(cmd.action)
        if cmd.action == "status":
            raise RuntimeError("boom")

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)

    ctx = _make_ctx_for_drain(q, attempts)
    await _main._drain_cmd_queue(ctx)

    assert attempts == ["status", "ss"]
    # warn alert for the failed command
    warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
    assert any("status" in t for t in warn_titles)
    # command_error audit event
    errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
    assert len(errs) == 1 and errs[0]["action"] == "status"


# ---------------------------------------------------------------------------
# Commit 4: operating hours + LifecycleState transitions
# ---------------------------------------------------------------------------

from zoneinfo import ZoneInfo as _ZI  # noqa: E402
import datetime as _dt  # noqa: E402


def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
            start="09:30", stop="16:00", tz="America/New_York"):
    """Build a lightweight cfg-like object with operating_hours populated.

    The returned namespace exposes ``operating_hours`` and ``window_title``
    only. ``_tz_cache`` holds a ``ZoneInfo`` for *tz* when *enabled* is true,
    otherwise ``None``.
    """
    oh = types.SimpleNamespace(
        enabled=enabled,
        timezone=tz,
        weekdays=weekdays,
        start_hhmm=start,
        stop_hhmm=stop,
        _tz_cache=_ZI(tz) if enabled else None,
    )
    return types.SimpleNamespace(operating_hours=oh, window_title=None)


def _fake_canary(paused=False):
    """Return a canary stand-in exposing only ``is_paused``."""
    return types.SimpleNamespace(is_paused=paused)


@pytest.mark.parametrize(
    "local_dt,expected",
    [
        # Monday 09:30 NY — exact open → active (None)
        (_dt.datetime(2026, 4, 20, 9, 30), None),
        # Monday 16:00 NY — exact close → inactive (>= stop)
        (_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
        # Monday 08:00 NY — before open
        (_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
        # Monday 12:00 NY — active
        (_dt.datetime(2026, 4, 20, 12, 0), None),
        # Saturday 12:00 NY — weekend
        (_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
        # Sunday 23:00 NY — weekend
        (_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
    ],
)
def test_operating_hours_skip_matrix(local_dt, expected):
    """Timezone-aware start/stop + weekday checks."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # The parametrized datetimes are naive; attach the configured zone here.
    now_ts = local_dt.replace(tzinfo=tz).timestamp()
    lifecycle = _main.LifecycleState()
    result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())
    assert result == expected


def test_market_open_close_transitions_logged_once():
    """Crossing a boundary emits exactly one market_open / market_closed event."""
    import atm.main as _main

    audit_events = []
    alerts = []

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()
    canary = _fake_canary()

    # Prime as closed (before open, Monday 08:00)
    pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
    skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
    # First evaluation seeds state, no alert yet.
    assert lifecycle.last_window_state == "closed"
    assert alerts == []
    assert audit_events == []

    # Transition to open
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
    assert lifecycle.last_window_state == "open"
    assert len(alerts) == 1
    assert any(e.get("event") == "market_open" for e in audit_events)

    # Repeated open tick — no duplicate log
    alerts.clear()
    audit_events.clear()
    skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
    assert alerts == []
    assert audit_events == []

    # Transition to close
    close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
    skip_close = _main._should_skip(close, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
    assert lifecycle.last_window_state == "closed"
    assert any(e.get("event") == "market_closed" for e in audit_events)


def test_market_transition_sends_notification():
    """market_open / market_closed transitions produce kind=status alerts."""
    import atm.main as _main

    alerts = []

    class _A:
        def log(self, e):
            pass

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(last_window_state="closed")
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    _main._maybe_log_transition(None, lifecycle, mid, _A(), _N())
    assert len(alerts) == 1
    assert alerts[0].kind == "status"
    assert "piața" in alerts[0].title.lower() or "monitorizare" in alerts[0].body.lower()


def test_startup_in_window_suppresses_market_open():
    """R2 #20: first evaluation in-window just seeds state; no alert fires."""
    import atm.main as _main

    alerts = []
    events = []

    class _A:
        def log(self, e):
            events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()  # last_window_state is None
    in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
    assert skip is None
    _main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())
    # Seeded silently
    assert lifecycle.last_window_state == "open"
    assert alerts == []
    assert not any(e.get("event") == "market_open" for e in events)

    # Two more ticks, still in-window → no spurious alert
    for _ in range(2):
        skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
        _main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
    assert alerts == []


def test_operating_hours_weekday_locale_independent():
    """R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
    import locale as _locale
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Saturday 12:00 NY
    sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    original = _locale.setlocale(_locale.LC_TIME)
    try:
        for loc in ("C", "de_DE.UTF-8"):
            try:
                _locale.setlocale(_locale.LC_TIME, loc)
            except _locale.Error:
                continue  # locale not installed → skip gracefully
            lifecycle = _main.LifecycleState()
            result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
            assert result == "out_of_window_weekend", (
                f"locale={loc} returned {result!r}"
            )
    finally:
        # Restore the process locale; fall back to "C" if that fails.
        try:
            _locale.setlocale(_locale.LC_TIME, original)
        except _locale.Error:
            _locale.setlocale(_locale.LC_TIME, "C")


def test_should_skip_user_paused_wins():
    """With user_paused set, an in-window timestamp still yields "user_paused"."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(user_paused=True)
    # Mid-Monday (in-window) — should still skip because user_paused
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"


def test_should_skip_canary_drift_wins_over_window():
    """A paused canary yields "drift_paused" even for an in-window timestamp."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState()
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"


# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------

class _DriftPausedCanary:
    """Canary fake that starts paused; ``resume()`` clears the pause.

    ``is_paused`` is a read-only property backed by ``_p`` so the effect of
    ``resume()`` is visible to later reads.
    """

    def __init__(self):
        self._p = True

    @property
    def is_paused(self):
        return self._p

    def resume(self):
        self._p = False


def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
    """Minimal RunContext for _dispatch_command unit tests.

    Defaults: a non-paused canary namespace, a fresh ``LifecycleState`` and a
    ``MagicMock`` cfg with ``telegram.auto_poll_interval_s`` set to 180 and
    operating hours disabled. A caller-supplied cfg is used untouched.
    """
    import atm.main as _main

    class _A:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _N:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _S:
        is_running = False
        interval_s = None

        def start(self, s):
            self.is_running = True

        def stop(self):
            self.is_running = False

    if canary is None:
        canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    if lifecycle is None:
        lifecycle = _main.LifecycleState()
    if cfg is None:
        cfg = MagicMock()
        cfg.telegram.auto_poll_interval_s = 180
        cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: None,
        canary=canary,
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_N(),
        audit=_A(),
        detection_log=_A(),
        scheduler=_S(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=lifecycle,
    )
    return ctx


@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
    """The pause command sets user_paused, logs an audit event and sends a status alert."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    await _main._dispatch_command(ctx, Command(action="pause"))
    assert ctx.lifecycle.user_paused is True
    # When combined with _should_skip, we get user_paused
    assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused"
    # Audit + notif
    assert any(e.get("event") == "user_paused" for e in ctx.audit.events)
    assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts)


@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
    """resume with value=1 clears user_paused and the canary pause; audit records force=True."""
    import atm.main as _main
    from atm.commands import Command

    # is_paused is a property here so the effect of resume() is observable.
    canary = _DriftPausedCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert force_events and force_events[0]["force"] is True


@pytest.mark.asyncio
async def test_resume_plain_also_clears_canary_drift():
    """2026-04-21 decision: /resume (no arg) now clears BOTH user_paused and
    canary drift. /resume force remains accepted as legacy alias.
    Previous behavior (force required for drift) was a UX trap — see plan doc."""
    import atm.main as _main
    from atm.commands import Command

    canary = _DriftPausedCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    await _main._dispatch_command(ctx, Command(action="resume"))  # no force
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False  # drift cleared without force
    # Audit event still records was_drift + force=False for traceability
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["was_drift"] is True and resumed[0]["force"] is False
    # Message mentions drift-pause was cleared
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status and ("drift" in (status[0].title + status[0].body).lower())


@pytest.mark.asyncio
async def test_resume_force_alias_still_works():
    """/resume force (value=1) remains accepted — legacy muscle memory."""
    import atm.main as _main
    from atm.commands import Command

    canary = _DriftPausedCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["force"] is True


@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
    """/resume while operating-hours window is closed → special body."""
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
    canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)

    # Pin time to Saturday
    real_time = _main.time
    fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    class _FakeTime:
        def time(self):
            return fake_ts

        def monotonic(self):
            return 0.0

    # Swap the module's ``time`` reference for the duration of the dispatch
    # only; the finally clause restores it even if the dispatch raises.
    _main.time = _FakeTime()
    try:
        await _main._dispatch_command(ctx, Command(action="resume"))
    finally:
        _main.time = real_time

    assert ctx.lifecycle.user_paused is False
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status
    combined = (status[0].title + status[0].body).lower()
    assert "închis" in combined or "piața" in combined or "ferestr" in combined


@pytest.mark.asyncio
async def test_status_command_reports_pause_reason():
    """/status body must mention pause reason + window state."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    # Stub detector.rolling for status
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")
    await _main._dispatch_command(ctx, Command(action="status"))
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status
    body = status[0].body
    assert "pauză manuală" in body or "pauza" in body.lower()


@pytest.mark.asyncio
async def test_set_interval_refused_while_canary_paused():
    """2026-04-21: /set_interval must not start scheduler while canary is drift-paused.

    Previously started scheduler silently, misleading the user into thinking
    detection was live. Now emits a warn and refuses."""
    import atm.main as _main
    from atm.commands import Command

    canary = types.SimpleNamespace(is_paused=True, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary)

    await _main._dispatch_command(ctx, Command(action="set_interval", value=60))

    # Scheduler must NOT have started
    assert ctx.scheduler.is_running is False
    # No scheduler_started audit event
    assert not any(e.get("event") == "scheduler_started" for e in ctx.audit.events)
    # A warn alert must have been sent referencing /resume
    warns = [a for a in ctx.notifier.alerts if a.kind == "warn"]
    assert warns
    combined = (warns[0].title + warns[0].body).lower()
    assert "drift" in combined and "/resume" in combined


@pytest.mark.asyncio
async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path):
    """2026-04-21: /ss still captures while canary is paused, but the alert
    body must warn that detection is off."""
    import atm.main as _main
    from atm.commands import Command

    canary = types.SimpleNamespace(is_paused=True, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary)

    # Bypass window focus + use a simple non-None capture result
    fake_frame = object()
    ctx = types.SimpleNamespace(**{**ctx.__dict__})  # shallow copy RunContext fields
    # Simpler: just override the capture and save functions used
    async_capture_called = {"n": 0}

    def _capture():
        async_capture_called["n"] += 1
        return fake_frame

    ctx.capture = _capture
    ctx.fires_dir = tmp_path
    # window_title off so we skip focus branch
    ctx.cfg.window_title = None
    # stub _save_annotated_frame to return a dummy path
    monkeypatch.setattr(_main, "_save_annotated_frame",
                        lambda *a, **kw: tmp_path / "fake_ss.png")

    await _main._dispatch_command(ctx, Command(action="ss"))

    screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"]
    assert screenshots
    assert "DETECȚIE OPRITĂ" in screenshots[0].body or "drift" in screenshots[0].body.lower()
    assert "/resume" in screenshots[0].body


@pytest.mark.asyncio
async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path):
    """/ss body must be empty when canary is not paused (no warn noise)."""
    import atm.main as _main
    from atm.commands import Command

    canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary)
    ctx.capture = lambda: object()
    ctx.fires_dir = tmp_path
    ctx.cfg.window_title = None
    monkeypatch.setattr(_main, "_save_annotated_frame",
                        lambda *a, **kw: tmp_path / "fake_ss.png")

    await _main._dispatch_command(ctx, Command(action="ss"))

    screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"]
    assert screenshots
    assert screenshots[0].body == ""


@pytest.mark.asyncio
async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path):
    """E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert.

    This test verifies the full command-driven lifecycle in isolation:
    - canary starts drift-paused, _should_skip returns drift_paused
    - /resume force clears canary + user_paused
    - subsequent detection produces SELL fire through normal FSM path
    """
    import atm.main as _main
    from atm.commands import Command

    # Canary with mutable pause state
    canary = _DriftPausedCanary()
    cfg = MagicMock()
    cfg.telegram.auto_poll_interval_s = 180
    cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
    ctx = _dispatch_ctx(canary=canary, cfg=cfg)

    # 1. While drift-paused, _should_skip returns drift_paused
    assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused"

    # 2. User issues /resume force
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert canary.is_paused is False
    assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None

    # 3. Feed a yellow→light_red sequence through _handle_tick (FSM path)
    from atm.state_machine import StateMachine, State
    fsm = StateMachine(lockout_s=60)

    class _N:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _A:
        def log(self, _e):
            pass

    notif = _N()
    audit = _A()
    cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True))

    _main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock)
    _main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock)
    tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock)

    # FSM reached fire via normal path
    assert tr is not None and tr.trigger == "SELL"
    assert fsm.state == State.IDLE


# ---------------------------------------------------------------------------
# _in_trading_window tests
# ---------------------------------------------------------------------------

def test_in_trading_window_disabled():
    """Returns True when operating_hours disabled."""
    import atm.main as _main

    cfg = types.SimpleNamespace(operating_hours=types.SimpleNamespace(enabled=False))
    assert _main._in_trading_window(0.0, cfg) is True


def test_in_trading_window_no_oh():
    """Returns True when cfg has no operating_hours attr."""
    import atm.main as _main

    cfg = types.SimpleNamespace()
    assert _main._in_trading_window(0.0, cfg) is True


def test_in_trading_window_in_window():
    """Returns True during configured hours (Monday 12:00 NY)."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    ts = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._in_trading_window(ts, cfg) is True


def test_in_trading_window_out_of_hours():
    """Returns False before market open (Monday 08:00 NY)."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    ts = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
    assert _main._in_trading_window(ts, cfg) is False
def test_in_trading_window_weekend():
    """Window check is False on a weekend (Sunday 2026-04-19, 12:00 NY)."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    ts = _dt.datetime(2026, 4, 19, 12, 0, tzinfo=tz).timestamp()
    assert _main._in_trading_window(ts, cfg) is False


def test_heartbeat_suppressed_outside_hours(monkeypatch):
    """A closed trading window must leave the heartbeat outbox empty.

    NOTE(review): this only simulates the gate around the heartbeat send; it
    does not drive the real run loop. Confirm against the loop in atm.main.
    """
    import atm.main as _main

    monkeypatch.setattr(_main, "_in_trading_window", lambda ts, cfg: False)
    sent = []
    # Simulated gate: a heartbeat is recorded only while the window is open.
    # Recording on the open branch lets the final assert fail if the patched
    # check ever reports an open window, instead of passing unconditionally.
    if _main._in_trading_window(0.0, None):
        sent.append("heartbeat")
    assert sent == []


def test_build_heartbeat_alert_active_when_not_paused():
    """Not paused: title is 'activ', body shows the FSM state, no drift tag."""
    import atm.main as _main

    a = _main._build_heartbeat_alert(
        fsm_state="ARMED_BUY",
        fire_count=2,
        uptime_h=1.5,
        canary_paused=False,
    )
    assert a.kind == "heartbeat"
    assert a.title == "activ"
    assert "ARMED_BUY" in a.body
    assert "[drift-pause]" not in a.body
    assert "semnale: 2" in a.body


def test_build_heartbeat_alert_paused_when_canary_drift():
    """Canary pause must show in both the title and the body.

    2026-04-21: heartbeat must reflect canary drift instead of lying with
    'activ'.
    """
    import atm.main as _main

    a = _main._build_heartbeat_alert(
        fsm_state="ARMED_SELL",
        fire_count=0,
        uptime_h=3.2,
        canary_paused=True,
    )
    assert a.kind == "heartbeat"
    assert "pauzat" in a.title
    assert "drift" in a.title.lower()
    assert "ARMED_SELL" in a.body
    assert "[drift-pause]" in a.body


@pytest.mark.asyncio
async def test_status_compact_active():
    """/status produces the compact 2-line format; 'Canary' is absent."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="ARMED_BUY")
    ctx.state.fire_count = 3
    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status
    body = status[0].body
    lines = body.splitlines()
    # Two lines only: no 'fereastră' (window) line (oh.enabled=False).
    assert len(lines) == 2
    assert "ARMED_BUY" in lines[0]
    assert "semnale: 3" in lines[0]
    assert "Canary" not in body
    assert "canary" not in body


@pytest.mark.asyncio
async def test_status_compact_paused_manual():
    """/status body starts with 'pauză manuală' when user_paused is set."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")
    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    # Guard first so a missing alert fails as an assertion, not an IndexError.
    assert status
    body = status[0].body
    assert body.startswith("pauză manuală")


@pytest.mark.asyncio
async def test_status_window_line_when_oh_enabled():
    """/status adds the 'fereastră' line when operating_hours is enabled."""
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(last_window_state="open")
    ctx = _dispatch_ctx(lifecycle=lifecycle, cfg=cfg)
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")
    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    # Guard first so a missing alert fails as an assertion, not an IndexError.
    assert status
    body = status[0].body
    assert "fereastră: deschisă" in body