"""Tests for atm.main unified CLI.""" from __future__ import annotations import asyncio import os import subprocess import sys import types from dataclasses import dataclass from pathlib import Path from unittest.mock import MagicMock import pytest SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"] # Ensure subprocess invocations find the atm package even without pip install _SRC = str(Path(__file__).resolve().parent.parent / "src") _SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC} # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _mock_config_class(cfg=None): """Return a Config-like class whose load_current() returns *cfg*.""" if cfg is None: cfg = MagicMock() mock_cls = MagicMock() mock_cls.load_current.return_value = cfg return mock_cls # --------------------------------------------------------------------------- # test_help_works # --------------------------------------------------------------------------- def test_help_works(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) assert result.returncode == 0, result.stderr # --------------------------------------------------------------------------- # test_subcommands_listed # --------------------------------------------------------------------------- def test_subcommands_listed(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) output = result.stdout for cmd in SUBCOMMANDS: assert cmd in output, f"Expected subcommand '{cmd}' in --help output" # --------------------------------------------------------------------------- # test_dryrun_wiring # --------------------------------------------------------------------------- @dataclass class _DryrunResult: acceptance_pass: bool def _make_dryrun_module(acceptance_pass: bool): mod = types.ModuleType("atm.dryrun") mod.dryrun = lambda *a, **kw: _DryrunResult(acceptance_pass=acceptance_pass) mod.print_report = lambda r: None return mod def test_dryrun_wiring_pass(monkeypatch, tmp_path): import atm.main as _main monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=True)) monkeypatch.setattr("atm.main.Config", _mock_config_class()) with pytest.raises(SystemExit) as exc_info: _main.main(["dryrun", str(tmp_path)]) assert exc_info.value.code == 0 def test_dryrun_wiring_fail(monkeypatch, tmp_path): import atm.main as _main monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=False)) monkeypatch.setattr("atm.main.Config", _mock_config_class()) with pytest.raises(SystemExit) as exc_info: _main.main(["dryrun", str(tmp_path)]) assert exc_info.value.code == 1 # --------------------------------------------------------------------------- # test_report_current_week_default # --------------------------------------------------------------------------- def test_report_current_week_default(monkeypatch, tmp_path): import atm.main as _main # Journal.all returns no entries — report should print a zero-trade week monkeypatch.setattr("atm.journal.Journal.all", lambda self: []) # Should not raise; no sys.exit expected _main.main(["report", "--file", str(tmp_path / "trades.jsonl")]) # --------------------------------------------------------------------------- # test_run_live_dry # --------------------------------------------------------------------------- def test_run_live_dry(monkeypatch): import atm.main as _main calls: list[dict] = [] def _mock_run_live(cfg, duration_s=None, capture_stub=False): calls.append({"cfg": cfg, "duration_s": duration_s, "capture_stub": capture_stub}) monkeypatch.setattr("atm.main.run_live", _mock_run_live) monkeypatch.setattr("atm.main.Config", _mock_config_class()) _main.main(["run", "--duration", "0"]) assert len(calls) == 1 assert calls[0]["duration_s"] == pytest.approx(0.0) # --------------------------------------------------------------------------- # Regression integration test — user bug 2026-04-16. # Session starts with an accepted gray tick followed by dark_red. Catchup # synth-arm must fire on dark_red (the previous gray consumed first_accepted), # then light_red triggers SELL. Proves run_live wiring dispatches alerts for # the user's exact scenario. # --------------------------------------------------------------------------- def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path): import numpy as np import atm.main as _main from atm.detector import DetectionResult captured = [] class FakeFanout: def __init__(self, *a, **kw): pass def send(self, alert): captured.append(alert) def stop(self): pass class FakeCanaryResult: distance = 0 drifted = False paused = False class FakeCanary: def __init__(self, *a, **kw): self.is_paused = False def check(self, frame): return FakeCanaryResult() def resume(self): pass class _StopLoop(Exception): pass class ScriptedDetector: _script = [ ("gray", True), ("gray", True), ("dark_red", True), ("dark_red", True), ("light_red", True), ] def __init__(self, *a, **kw): self._i = 0 def step(self, ts, frame=None): if self._i >= len(self._script): raise _StopLoop color, accepted = self._script[self._i] self._i += 1 return DetectionResult( ts=ts, window_found=True, dot_found=True, rgb=(1, 1, 1), match=None, accepted=accepted, color=color, ) def fake_build_capture(cfg, capture_stub=False): return lambda: np.zeros((50, 50, 3), dtype=np.uint8) cfg = MagicMock() cfg.lockout_s = 60 cfg.heartbeat_min = 999 cfg.loop_interval_s = 0 cfg.config_version = "test" cfg.dead_letter_path = str(tmp_path / "dl.jsonl") cfg.canary.drift_threshold = 10 cfg.dot_roi.x = 0 cfg.dot_roi.y = 0 cfg.dot_roi.w = 10 cfg.dot_roi.h = 10 cfg.chart_window_region = None monkeypatch.chdir(tmp_path) class _Stub: def __init__(self, *a, **kw): pass def log(self, *a, **kw): pass def close(self, *a, **kw): pass def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None) class _StubPoller: def __init__(self, *a, **kw): pass async def run(self): await asyncio.sleep(9999) class _StubScheduler: def __init__(self, *a, **kw): self.is_running = False def start(self, interval_s): self.is_running = True def stop(self): self.is_running = False async def run(self): await asyncio.sleep(9999) monkeypatch.setattr("atm.detector.Detector", ScriptedDetector) monkeypatch.setattr("atm.canary.Canary", FakeCanary) monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout) monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub) monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub) monkeypatch.setattr("atm.audit.AuditLog", _Stub) monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub) monkeypatch.setattr("atm.main._build_capture", fake_build_capture) monkeypatch.setattr("atm.main.time.sleep", lambda s: None) monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler) with pytest.raises(_StopLoop): _main.run_live(cfg, duration_s=None) arm = [a for a in captured if a.kind == "arm"] prime = [a for a in captured if a.kind == "prime"] trigger = [a for a in captured if a.kind == "trigger"] assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in captured]})" assert arm[0].direction == "SELL" assert "recuperare" in (arm[0].title + arm[0].body).lower() assert len(prime) == 1 assert prime[0].direction == "SELL" assert "recuperare" in (prime[0].title + prime[0].body).lower() assert len(trigger) == 1 assert trigger[0].direction == "SELL" # --------------------------------------------------------------------------- # MUST-HAVE: async lifecycle integration test # IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops) # Tests: scheduler starts on prime, stops on fire, fire alert sent. # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path): import numpy as np import atm.main as _main from atm.detector import DetectionResult captured_alerts: list = [] scheduler_events: list[str] = [] class FakeFanout: def __init__(self, *a, **kw): pass def send(self, alert): captured_alerts.append(alert) def stop(self): pass def stats(self): return {} class FakeCanaryResult: distance = 0 drifted = False paused = False class FakeCanary: def __init__(self, *a, **kw): self.is_paused = False def check(self, frame): return FakeCanaryResult() def resume(self): pass # Scheduler tracks start/stop calls class FakeScheduler: def __init__(self, *a, **kw): self.is_running = False self.interval_s = None def start(self, interval_s): self.is_running = True self.interval_s = interval_s scheduler_events.append(f"start:{interval_s}") def stop(self): self.is_running = False scheduler_events.append("stop") async def run(self): await asyncio.sleep(9999) class FakePoller: def __init__(self, *a, **kw): pass async def run(self): await asyncio.sleep(9999) class _StopLoop(Exception): pass class ScriptedDetector: # turquoise→ARM, dark_green→PRIME, light_green→FIRE _script = [ ("turquoise", True), ("dark_green", True), ("light_green", True), ] def __init__(self, *a, **kw): self._i = 0 def step(self, ts, frame=None): if self._i >= len(self._script): raise _StopLoop color, accepted = self._script[self._i] self._i += 1 return DetectionResult(ts=ts, window_found=True, dot_found=True, rgb=(1, 1, 1), match=None, accepted=accepted, color=color) @property def rolling(self): return [] def fake_build_capture(cfg, capture_stub=False): return lambda: np.zeros((50, 50, 3), dtype=np.uint8) cfg = MagicMock() cfg.lockout_s = 60 cfg.heartbeat_min = 999 cfg.loop_interval_s = 0 cfg.config_version = "test" cfg.dead_letter_path = str(tmp_path / "dl.jsonl") cfg.canary.drift_threshold = 10 cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10 cfg.chart_window_region = None cfg.telegram.auto_poll_interval_s = 180 cfg.telegram.bot_token = "tok" cfg.telegram.chat_id = "123" cfg.telegram.allowed_chat_ids = ("123",) fake_sched = FakeScheduler() monkeypatch.chdir(tmp_path) class _Stub: def __init__(self, *a, **kw): pass def log(self, *a, **kw): pass def close(self, *a, **kw): pass def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None) monkeypatch.setattr("atm.detector.Detector", ScriptedDetector) monkeypatch.setattr("atm.canary.Canary", FakeCanary) monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout) monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub) monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub) monkeypatch.setattr("atm.audit.AuditLog", _Stub) monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub) monkeypatch.setattr("atm.main._build_capture", fake_build_capture) monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched) with pytest.raises(_StopLoop): await _main.run_live_async(cfg, duration_s=None) arm_alerts = [a for a in captured_alerts if a.kind == "arm"] prime_alerts = [a for a in captured_alerts if a.kind == "prime"] trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"] assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}" assert arm_alerts[0].direction == "BUY" assert len(prime_alerts) == 1 assert prime_alerts[0].direction == "BUY" assert len(trigger_alerts) == 1 assert trigger_alerts[0].direction == "BUY" # Scheduler must have started (on PRIME) and stopped (on FIRE) assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}" assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}" start_idx = scheduler_events.index("start:180") stop_idx = scheduler_events.index("stop") assert start_idx < stop_idx, "scheduler started after it stopped" # --------------------------------------------------------------------------- # Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally, # even when canary is paused or when detection is otherwise skipped. # Prior bug: `continue` past the drain loop caused commands to pile up. # --------------------------------------------------------------------------- def _make_ctx_for_drain(cmd_queue, dispatched: list): """Build a minimal RunContext where _dispatch_command just records calls.""" import atm.main as _main class _FakeAudit: def __init__(self): self.events = [] def log(self, e): self.events.append(e) class _FakeNotifier: def __init__(self): self.alerts = [] def send(self, a): self.alerts.append(a) class _FakeCanary: def __init__(self, paused=True): self.is_paused = paused class _FakeScheduler: is_running = False interval_s = None def start(self, s): pass def stop(self): pass state = _main._LoopState(start=0.0) ctx = _main.RunContext( cfg=MagicMock(), capture=lambda: None, canary=_FakeCanary(paused=True), detector=MagicMock(), fsm=MagicMock(), notifier=_FakeNotifier(), audit=_FakeAudit(), detection_log=_FakeAudit(), scheduler=_FakeScheduler(), samples_dir=Path("."), fires_dir=Path("."), cmd_queue=cmd_queue, state=state, levels_extractor_factory=lambda *a, **kw: None, ) return ctx @pytest.mark.asyncio async def test_drain_works_when_canary_paused(monkeypatch): """Regression: when canary.is_paused, _drain_cmd_queue still dispatches. Prior bug: detection loop `continue`'d past the drain block whenever the tick returned res=None (canary paused). Commands accumulated forever. """ import atm.main as _main from atm.commands import Command q: asyncio.Queue = asyncio.Queue() await q.put(Command(action="status")) await q.put(Command(action="ss")) dispatched: list = [] async def _fake_dispatch(ctx, cmd): dispatched.append(cmd.action) monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch) ctx = _make_ctx_for_drain(q, dispatched) await _main._drain_cmd_queue(ctx) assert dispatched == ["status", "ss"] assert q.empty() @pytest.mark.asyncio async def test_drain_works_when_out_of_window(monkeypatch): """Drain must still fire when the tick skipped (e.g. out of operating hours). The refactored loop runs _drain_cmd_queue unconditionally after every tick, regardless of `_TickSyncResult` content. """ import atm.main as _main from atm.commands import Command q: asyncio.Queue = asyncio.Queue() await q.put(Command(action="stop")) dispatched: list = [] async def _fake_dispatch(ctx, cmd): dispatched.append(cmd.action) monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch) ctx = _make_ctx_for_drain(q, dispatched) # Simulate out-of-window tick (empty _TickSyncResult, no res) await _main._handle_fsm_result(ctx, _main._TickSyncResult()) await _main._drain_cmd_queue(ctx) assert dispatched == ["stop"] @pytest.mark.asyncio async def test_drain_isolates_dispatch_exceptions(monkeypatch): """If one command raises, remaining commands still drain + warn alert sent.""" import atm.main as _main from atm.commands import Command q: asyncio.Queue = asyncio.Queue() await q.put(Command(action="status")) await q.put(Command(action="ss")) attempts: list = [] async def _fake_dispatch(ctx, cmd): attempts.append(cmd.action) if cmd.action == "status": raise RuntimeError("boom") monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch) ctx = _make_ctx_for_drain(q, attempts) await _main._drain_cmd_queue(ctx) assert attempts == ["status", "ss"] # warn alert for the failed command warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"] assert any("status" in t for t in warn_titles) # command_error audit event errs = [e for e in ctx.audit.events if e.get("event") == "command_error"] assert len(errs) == 1 and errs[0]["action"] == "status"