Files
atm/tests/test_main.py
Claude Agent 0f7dd5dc84 fix(deps+tests): move httpx to prod deps; stub Poller+Scheduler in sync test
httpx was in dev deps only, causing ImportError for users doing `pip install -e .`
since atm.commands imports httpx at module level. Moved to main dependencies.

Also stubs TelegramPoller and ScreenshotScheduler in the sync catchup test to
prevent flaky CI failures from attempted real network connections.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 11:00:40 +00:00

404 lines
14 KiB
Python

"""Tests for atm.main unified CLI."""
from __future__ import annotations
import asyncio
import os
import subprocess
import sys
import types
from dataclasses import dataclass
from pathlib import Path
from unittest.mock import MagicMock
import pytest
# Subcommand names that test_subcommands_listed expects to find in the
# `python -m atm --help` output.
SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"]
# Ensure subprocess invocations find the atm package even without pip install:
# _SRC is the "src" directory two levels above this test file.
_SRC = str(Path(__file__).resolve().parent.parent / "src")
# Copy of the current environment with PYTHONPATH set to _SRC (any PYTHONPATH
# already present in os.environ is replaced, not extended).
_SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_config_class(cfg=None):
"""Return a Config-like class whose load_current() returns *cfg*."""
if cfg is None:
cfg = MagicMock()
mock_cls = MagicMock()
mock_cls.load_current.return_value = cfg
return mock_cls
# ---------------------------------------------------------------------------
# test_help_works
# ---------------------------------------------------------------------------
def test_help_works():
    """``python -m atm --help`` must exit with status 0.

    The command runs in a child interpreter using ``_SUBPROCESS_ENV``; on
    failure the captured stderr is shown as the assertion message.
    """
    result = subprocess.run(
        [sys.executable, "-m", "atm", "--help"],
        capture_output=True,
        text=True,
        env=_SUBPROCESS_ENV,
    )
    assert result.returncode == 0, result.stderr
# ---------------------------------------------------------------------------
# test_subcommands_listed
# ---------------------------------------------------------------------------
def test_subcommands_listed():
    """Every name in ``SUBCOMMANDS`` must appear in the ``--help`` output."""
    result = subprocess.run(
        [sys.executable, "-m", "atm", "--help"],
        capture_output=True,
        text=True,
        env=_SUBPROCESS_ENV,
    )
    # Fail with the real cause (stderr) if the CLI itself crashed, rather
    # than reporting a misleading "subcommand missing" against empty stdout.
    assert result.returncode == 0, result.stderr
    output = result.stdout
    for cmd in SUBCOMMANDS:
        assert cmd in output, f"Expected subcommand '{cmd}' in --help output"
# ---------------------------------------------------------------------------
# test_dryrun_wiring
# ---------------------------------------------------------------------------
@dataclass
class _DryrunResult:
acceptance_pass: bool
def _make_dryrun_module(acceptance_pass: bool):
mod = types.ModuleType("atm.dryrun")
mod.dryrun = lambda *a, **kw: _DryrunResult(acceptance_pass=acceptance_pass)
mod.print_report = lambda r: None
return mod
def test_dryrun_wiring_pass(monkeypatch, tmp_path):
    """``atm dryrun`` exits with code 0 when the result has acceptance_pass=True."""
    import atm.main as _main

    monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=True))
    monkeypatch.setattr("atm.main.Config", _mock_config_class())
    with pytest.raises(SystemExit) as exc_info:
        _main.main(["dryrun", str(tmp_path)])
    # Checked after the ``with`` block: statements following the raising call
    # inside it would never execute.
    assert exc_info.value.code == 0
def test_dryrun_wiring_fail(monkeypatch, tmp_path):
    """``atm dryrun`` exits with code 1 when the result has acceptance_pass=False."""
    import atm.main as _main

    monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=False))
    monkeypatch.setattr("atm.main.Config", _mock_config_class())
    with pytest.raises(SystemExit) as exc_info:
        _main.main(["dryrun", str(tmp_path)])
    # Checked after the ``with`` block: statements following the raising call
    # inside it would never execute.
    assert exc_info.value.code == 1
# ---------------------------------------------------------------------------
# test_report_current_week_default
# ---------------------------------------------------------------------------
def test_report_current_week_default(monkeypatch, tmp_path):
    """``atm report`` with an empty journal completes without raising."""
    import atm.main as _main

    # Journal.all returns no entries — report should print a zero-trade week
    monkeypatch.setattr("atm.journal.Journal.all", lambda self: [])
    # Should not raise; no sys.exit expected
    _main.main(["report", "--file", str(tmp_path / "trades.jsonl")])
# ---------------------------------------------------------------------------
# test_run_live_dry
# ---------------------------------------------------------------------------
def test_run_live_dry(monkeypatch):
    """``atm run --duration 0`` calls ``run_live`` once with duration_s == 0."""
    import atm.main as _main

    calls: list[dict] = []

    def _mock_run_live(cfg, duration_s=None, capture_stub=False):
        # Record the arguments instead of starting the real live loop.
        calls.append({"cfg": cfg, "duration_s": duration_s, "capture_stub": capture_stub})

    monkeypatch.setattr("atm.main.run_live", _mock_run_live)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())
    _main.main(["run", "--duration", "0"])
    assert len(calls) == 1
    assert calls[0]["duration_s"] == pytest.approx(0.0)
# ---------------------------------------------------------------------------
# Regression integration test — user bug 2026-04-16.
# Session starts with an accepted gray tick followed by dark_red. Catchup
# synth-arm must fire on dark_red (the previous gray consumed first_accepted),
# then light_red triggers SELL. Proves run_live wiring dispatches alerts for
# the user's exact scenario.
# ---------------------------------------------------------------------------
def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    """Regression: gray ticks, then dark_red, then light_red must yield a SELL.

    Drives ``run_live`` with a scripted detector and stubbed collaborators,
    then checks that exactly one arm, one prime and one trigger alert were
    sent, each with direction ``SELL``, and that the arm and prime alerts
    contain the word "recuperare".
    """
    import numpy as np

    import atm.main as _main
    from atm.detector import DetectionResult

    captured = []

    class FakeFanout:
        """Notifier stand-in that records every alert passed to ``send``."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured.append(alert)

        def stop(self):
            pass

    class FakeCanaryResult:
        """Canary result reporting no drift and not paused."""

        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary stand-in whose ``check`` always returns a healthy result."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class _StopLoop(Exception):
        """Raised by the scripted detector to break out of the live loop."""

    class ScriptedDetector:
        """Detector stand-in that replays ``_script`` one entry per ``step``."""

        # (color, accepted) pairs returned in order by successive step() calls.
        _script = [
            ("gray", True),
            ("gray", True),
            ("dark_red", True),
            ("dark_red", True),
            ("light_red", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Once the script is exhausted, abort the loop under test.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

    def fake_build_capture(cfg, capture_stub=False):
        # Capture callable that always yields a black 50x50 RGB frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    monkeypatch.chdir(tmp_path)

    class _Stub:
        """Do-nothing replacement for the notifier, audit and levels classes."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    class _StubPoller:
        """TelegramPoller replacement whose ``run`` only sleeps."""

        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StubScheduler:
        """ScreenshotScheduler replacement that only tracks ``is_running``."""

        def __init__(self, *a, **kw):
            self.is_running = False

        def start(self, interval_s):
            self.is_running = True

        def stop(self):
            self.is_running = False

        async def run(self):
            await asyncio.sleep(9999)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
    monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)

    # The loop ends when the scripted detector runs out of entries.
    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)

    arm = [a for a in captured if a.kind == "arm"]
    prime = [a for a in captured if a.kind == "prime"]
    trigger = [a for a in captured if a.kind == "trigger"]
    assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in captured]})"
    assert arm[0].direction == "SELL"
    assert "recuperare" in (arm[0].title + arm[0].body).lower()
    assert len(prime) == 1
    assert prime[0].direction == "SELL"
    assert "recuperare" in (prime[0].title + prime[0].body).lower()
    assert len(trigger) == 1
    assert trigger[0].direction == "SELL"
# ---------------------------------------------------------------------------
# MUST-HAVE: async lifecycle integration test
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Async lifecycle: arm, prime and trigger alerts plus scheduler start/stop.

    Drives ``run_live_async`` with a scripted turquoise / dark_green /
    light_green sequence and stubbed collaborators. Asserts one BUY alert of
    each kind (arm, prime, trigger) and that the fake scheduler recorded a
    ``start:180`` event before its first ``stop`` event.
    """
    import numpy as np

    import atm.main as _main
    from atm.detector import DetectionResult

    captured_alerts: list = []
    scheduler_events: list[str] = []

    class FakeFanout:
        """Notifier stand-in that records every alert passed to ``send``."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured_alerts.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class FakeCanaryResult:
        """Canary result reporting no drift and not paused."""

        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary stand-in whose ``check`` always returns a healthy result."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    # Scheduler tracks start/stop calls
    class FakeScheduler:
        """Scheduler stand-in that appends start/stop events to a shared list."""

        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            scheduler_events.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            scheduler_events.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        """TelegramPoller replacement whose ``run`` only sleeps."""

        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StopLoop(Exception):
        """Raised by the scripted detector to break out of the live loop."""

    class ScriptedDetector:
        """Detector stand-in that replays ``_script`` one entry per ``step``."""

        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Once the script is exhausted, abort the loop under test.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    def fake_build_capture(cfg, capture_stub=False):
        # Capture callable that always yields a black 50x50 RGB frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    # Single shared instance so the test can observe the start/stop calls.
    fake_sched = FakeScheduler()
    monkeypatch.chdir(tmp_path)

    class _Stub:
        """Do-nothing replacement for the notifier, audit and levels classes."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)

    # The loop ends when the scripted detector runs out of entries.
    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
    prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
    trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]
    assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
    assert arm_alerts[0].direction == "BUY"
    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"
    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
    assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
    start_idx = scheduler_events.index("start:180")
    stop_idx = scheduler_events.index("stop")
    assert start_idx < stop_idx, "scheduler started after it stopped"