httpx was in dev deps only, causing ImportError for users doing `pip install -e .` since atm.commands imports httpx at module level. Moved to main dependencies. Also stubs TelegramPoller and ScreenshotScheduler in the sync catchup test to prevent flaky CI failures from attempted real network connections. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
404 lines
14 KiB
Python
404 lines
14 KiB
Python
"""Tests for atm.main unified CLI."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import types
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
|
|
# Subcommand names that must be advertised by ``python -m atm --help``.
SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"]

# Ensure subprocess invocations find the atm package even without pip install.
_SRC = str(Path(__file__).resolve().parent.parent / "src")

# Prepend _SRC to any inherited PYTHONPATH instead of overwriting it, so that
# search paths the parent process relies on remain visible to the child
# interpreter while the local ``src`` tree still takes precedence.
_SUBPROCESS_ENV = {
    **os.environ,
    "PYTHONPATH": os.pathsep.join(
        entry for entry in (_SRC, os.environ.get("PYTHONPATH", "")) if entry
    ),
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _mock_config_class(cfg=None):
    """Build a stand-in for ``Config`` whose ``load_current()`` yields *cfg*.

    When *cfg* is ``None`` a fresh ``MagicMock`` is used as the loaded config.
    """
    config_cls = MagicMock()
    # Only ``None`` triggers the default; other falsy values are passed through.
    config_cls.load_current.return_value = MagicMock() if cfg is None else cfg
    return config_cls
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_help_works
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_help_works():
    """``python -m atm --help`` must exit with status 0."""
    help_cmd = [sys.executable, "-m", "atm", "--help"]
    proc = subprocess.run(
        help_cmd,
        env=_SUBPROCESS_ENV,
        text=True,
        capture_output=True,
    )
    # Surface the child's stderr in the failure message for easier diagnosis.
    assert proc.returncode == 0, proc.stderr
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_subcommands_listed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_subcommands_listed():
    """Every name in ``SUBCOMMANDS`` must appear in the ``--help`` output."""
    result = subprocess.run(
        [sys.executable, "-m", "atm", "--help"],
        capture_output=True,
        text=True,
        env=_SUBPROCESS_ENV,
    )
    # If the CLI itself crashed, stdout is empty and the loop below would
    # report a misleading "missing subcommand"; fail with the real stderr.
    assert result.returncode == 0, result.stderr
    output = result.stdout
    for cmd in SUBCOMMANDS:
        assert cmd in output, f"Expected subcommand '{cmd}' in --help output"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_dryrun_wiring
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class _DryrunResult:
    """Minimal stand-in for the object returned by the stubbed ``dryrun``."""

    # Whether the (fake) dry run met its acceptance criteria.
    acceptance_pass: bool
|
|
|
|
|
|
def _make_dryrun_module(acceptance_pass: bool):
    """Create a fake ``atm.dryrun`` module for wiring tests.

    The module exposes ``dryrun``, which accepts any arguments and returns a
    ``_DryrunResult`` carrying *acceptance_pass*, and ``print_report``, which
    does nothing.
    """

    def _fake_dryrun(*args, **kwargs):
        return _DryrunResult(acceptance_pass=acceptance_pass)

    def _silent_report(r):
        return None

    fake = types.ModuleType("atm.dryrun")
    fake.dryrun = _fake_dryrun
    fake.print_report = _silent_report
    return fake
|
|
|
|
|
|
def test_dryrun_wiring_pass(monkeypatch, tmp_path):
    """A dryrun whose result has ``acceptance_pass=True`` exits with code 0."""
    import atm.main as _main

    passing_module = _make_dryrun_module(acceptance_pass=True)
    monkeypatch.setattr("atm.main.dryrun", passing_module)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    argv = ["dryrun", str(tmp_path)]
    with pytest.raises(SystemExit) as exit_info:
        _main.main(argv)

    assert exit_info.value.code == 0
|
|
|
|
|
|
def test_dryrun_wiring_fail(monkeypatch, tmp_path):
    """A dryrun whose result has ``acceptance_pass=False`` exits with code 1."""
    import atm.main as _main

    failing_module = _make_dryrun_module(acceptance_pass=False)
    monkeypatch.setattr("atm.main.dryrun", failing_module)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    argv = ["dryrun", str(tmp_path)]
    with pytest.raises(SystemExit) as exit_info:
        _main.main(argv)

    assert exit_info.value.code == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_report_current_week_default
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_report_current_week_default(monkeypatch, tmp_path):
    """``report`` with an empty journal completes without raising."""
    import atm.main as _main

    # Journal.all returns no entries — report should print a zero-trade week
    def _no_entries(self):
        return []

    monkeypatch.setattr("atm.journal.Journal.all", _no_entries)

    journal_path = tmp_path / "trades.jsonl"

    # Should not raise; no sys.exit expected
    _main.main(["report", "--file", str(journal_path)])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_run_live_dry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_run_live_dry(monkeypatch):
    """``run --duration 0`` calls ``run_live`` exactly once with duration 0."""
    import atm.main as _main

    # Each invocation of the stubbed run_live is recorded here.
    recorded: list[dict] = []

    def _mock_run_live(cfg, duration_s=None, capture_stub=False):
        recorded.append(
            dict(cfg=cfg, duration_s=duration_s, capture_stub=capture_stub)
        )

    monkeypatch.setattr("atm.main.run_live", _mock_run_live)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    _main.main(["run", "--duration", "0"])

    assert len(recorded) == 1
    first_call = recorded[0]
    assert first_call["duration_s"] == pytest.approx(0.0)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Regression integration test — user bug 2026-04-16.
|
|
# Session starts with an accepted gray tick followed by dark_red. Catchup
|
|
# synth-arm must fire on dark_red (the previous gray consumed first_accepted),
|
|
# then light_red triggers SELL. Proves run_live wiring dispatches alerts for
|
|
# the user's exact scenario.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    """Regression: gray, then dark_red, then light_red must yield SELL alerts.

    ``atm.main.run_live`` is driven by a scripted detector while every
    collaborator (canary, notifiers, audit log, levels extractor, capture,
    Telegram poller, screenshot scheduler) is replaced by an in-memory stub.
    The test expects exactly one ``arm``, one ``prime`` and one ``trigger``
    alert, all with direction ``SELL``.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    # Alerts passed to the fake fan-out notifier, in send order.
    captured = []

    class _StopLoop(Exception):
        """Raised by the scripted detector once its script is exhausted."""

    class FakeFanout:
        """Notifier stand-in that records every alert it is asked to send."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured.append(alert)

        def stop(self):
            pass

    class FakeCanaryResult:
        """Canary check result reporting no drift and no pause."""

        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary stand-in that is never paused and never reports drift."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class ScriptedDetector:
        """Detector stand-in that replays a fixed (color, accepted) script."""

        _script = [
            ("gray", True),
            ("gray", True),
            ("dark_red", True),
            ("dark_red", True),
            ("light_red", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            idx = self._i
            if idx >= len(self._script):
                # Script exhausted: break out of the otherwise endless loop.
                raise _StopLoop
            self._i = idx + 1
            color, accepted = self._script[idx]
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

    def fake_build_capture(cfg, capture_stub=False):
        def _grab():
            return np.zeros((50, 50, 3), dtype=np.uint8)

        return _grab

    class _Stub:
        """Catch-all stand-in for the notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    class _StubPoller:
        """Telegram poller stand-in whose ``run`` just sleeps."""

        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StubScheduler:
        """Screenshot scheduler stand-in that only tracks ``is_running``."""

        def __init__(self, *a, **kw):
            self.is_running = False

        def start(self, interval_s):
            self.is_running = True

        def stop(self):
            self.is_running = False

        async def run(self):
            await asyncio.sleep(9999)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    for roi_attr, roi_value in (("x", 0), ("y", 0), ("w", 10), ("h", 10)):
        setattr(cfg.dot_roi, roi_attr, roi_value)
    cfg.chart_window_region = None

    monkeypatch.chdir(tmp_path)

    # Applied in the listed order.
    replacements = [
        ("atm.detector.Detector", ScriptedDetector),
        ("atm.canary.Canary", FakeCanary),
        ("atm.notifier.fanout.FanoutNotifier", FakeFanout),
        ("atm.notifier.discord.DiscordNotifier", _Stub),
        ("atm.notifier.telegram.TelegramNotifier", _Stub),
        ("atm.audit.AuditLog", _Stub),
        ("atm.levels.LevelsExtractor", _Stub),
        ("atm.main._build_capture", fake_build_capture),
        ("atm.main.time.sleep", lambda s: None),
        ("atm.commands.TelegramPoller", _StubPoller),
        ("atm.scheduler.ScreenshotScheduler", _StubScheduler),
    ]
    for target, replacement in replacements:
        monkeypatch.setattr(target, replacement)

    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)

    def _alerts_of(kind):
        return [a for a in captured if a.kind == kind]

    arm = _alerts_of("arm")
    prime = _alerts_of("prime")
    trigger = _alerts_of("trigger")

    assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in captured]})"
    arm_alert = arm[0]
    assert arm_alert.direction == "SELL"
    assert "recuperare" in (arm_alert.title + arm_alert.body).lower()

    assert len(prime) == 1
    prime_alert = prime[0]
    assert prime_alert.direction == "SELL"
    assert "recuperare" in (prime_alert.title + prime_alert.body).lower()

    assert len(trigger) == 1
    assert trigger[0].direction == "SELL"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MUST-HAVE: async lifecycle integration test
|
|
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
|
|
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Drive ``run_live_async`` through arm, prime and fire with stubbed collaborators.

    A scripted detector emits turquoise, dark_green and light_green. The test
    expects one ``arm``, one ``prime`` and one ``trigger`` alert, all ``BUY``,
    and that the fake scheduler recorded ``start:180`` before ``stop``.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    captured_alerts: list = []
    scheduler_events: list[str] = []

    class _StopLoop(Exception):
        """Raised by the scripted detector once its script is exhausted."""

    class FakeFanout:
        """Notifier stand-in that records every alert it is asked to send."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured_alerts.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class FakeCanaryResult:
        """Canary check result reporting no drift and no pause."""

        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary stand-in that is never paused and never reports drift."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    # Scheduler tracks start/stop calls
    class FakeScheduler:
        """Scheduler stand-in that logs start/stop into ``scheduler_events``."""

        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            scheduler_events.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            scheduler_events.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        """Telegram poller stand-in whose ``run`` just sleeps."""

        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class ScriptedDetector:
        """Detector stand-in that replays a fixed (color, accepted) script."""

        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            idx = self._i
            if idx >= len(self._script):
                # Script exhausted: break out of the otherwise endless loop.
                raise _StopLoop
            self._i = idx + 1
            color, accepted = self._script[idx]
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    def fake_build_capture(cfg, capture_stub=False):
        def _grab():
            return np.zeros((50, 50, 3), dtype=np.uint8)

        return _grab

    class _Stub:
        """Catch-all stand-in for the notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    for roi_attr, roi_value in (("x", 0), ("y", 0), ("w", 10), ("h", 10)):
        setattr(cfg.dot_roi, roi_attr, roi_value)
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    # One shared instance so start/stop calls made by the code under test
    # are observable through ``scheduler_events``.
    fake_sched = FakeScheduler()

    monkeypatch.chdir(tmp_path)

    # Applied in the listed order.
    replacements = [
        ("atm.detector.Detector", ScriptedDetector),
        ("atm.canary.Canary", FakeCanary),
        ("atm.notifier.fanout.FanoutNotifier", FakeFanout),
        ("atm.notifier.discord.DiscordNotifier", _Stub),
        ("atm.notifier.telegram.TelegramNotifier", _Stub),
        ("atm.audit.AuditLog", _Stub),
        ("atm.levels.LevelsExtractor", _Stub),
        ("atm.main._build_capture", fake_build_capture),
        ("atm.commands.TelegramPoller", FakePoller),
        ("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched),
    ]
    for target, replacement in replacements:
        monkeypatch.setattr(target, replacement)

    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    def _alerts_of(kind):
        return [a for a in captured_alerts if a.kind == kind]

    arm_alerts = _alerts_of("arm")
    prime_alerts = _alerts_of("prime")
    trigger_alerts = _alerts_of("trigger")

    assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
    assert arm_alerts[0].direction == "BUY"

    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"

    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
    assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
    first_start = scheduler_events.index("start:180")
    first_stop = scheduler_events.index("stop")
    assert first_start < first_stop, "scheduler started after it stopped"
|