AuditLog deadlock: log() held self._lock and called _open(), which called close(), which tried to acquire self._lock again. RLock not needed; refactored to _close_locked() (called while already holding the lock). pyproject.toml: pytest-asyncio + httpx in dev deps. test_main.py: lifecycle integration test (MUST-HAVE): IDLE→ARMED→PRIMED→auto-poll starts→FIRE→auto-poll stops, asserts scheduler event order; asyncio import for async test marker. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
391 lines
13 KiB
Python
391 lines
13 KiB
Python
"""Tests for atm.main unified CLI."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import types
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
|
|
SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"]
|
|
|
|
# Ensure subprocess invocations find the atm package even without pip install
|
|
_SRC = str(Path(__file__).resolve().parent.parent / "src")
|
|
_SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _mock_config_class(cfg=None):
    """Build a mock standing in for the Config class.

    Calling ``load_current()`` on the returned mock yields *cfg*. When *cfg*
    is omitted (``None``) a fresh ``MagicMock`` is used as the config object.
    """
    config_cls = MagicMock()
    config_cls.load_current.return_value = MagicMock() if cfg is None else cfg
    return config_cls
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_help_works
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_help_works():
    """``python -m atm --help`` exits with status 0."""
    argv = [sys.executable, "-m", "atm", "--help"]
    proc = subprocess.run(argv, capture_output=True, text=True, env=_SUBPROCESS_ENV)
    # Surface the child's stderr when the exit status is non-zero.
    assert proc.returncode == 0, proc.stderr
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_subcommands_listed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_subcommands_listed():
    """Every name in ``SUBCOMMANDS`` appears in ``python -m atm --help``.

    The command's exit status is checked first so that a crashing CLI is
    reported with its stderr instead of a misleading "missing subcommand".
    """
    import re

    result = subprocess.run(
        [sys.executable, "-m", "atm", "--help"],
        capture_output=True,
        text=True,
        env=_SUBPROCESS_ENV,
    )
    assert result.returncode == 0, result.stderr

    output = result.stdout
    for cmd in SUBCOMMANDS:
        # Whole-word match: with plain substring containment the presence of
        # "dryrun" alone would satisfy the check for "run".
        assert re.search(rf"\b{re.escape(cmd)}\b", output), (
            f"Expected subcommand '{cmd}' in --help output"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_dryrun_wiring
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class _DryrunResult:
    """Result stub handed back by the fake ``dryrun`` callable in these tests."""

    # The only field the stub carries; the wiring tests set it to True or False.
    acceptance_pass: bool
|
|
|
|
|
|
def _make_dryrun_module(acceptance_pass: bool):
    """Create a fake ``atm.dryrun`` module for patching into ``atm.main``.

    The module exposes ``dryrun``, which ignores its arguments and returns a
    ``_DryrunResult`` carrying *acceptance_pass*, and ``print_report``, which
    does nothing.
    """

    def _fake_dryrun(*a, **kw):
        return _DryrunResult(acceptance_pass=acceptance_pass)

    def _fake_print_report(r):
        return None

    fake_module = types.ModuleType("atm.dryrun")
    fake_module.dryrun = _fake_dryrun
    fake_module.print_report = _fake_print_report
    return fake_module
|
|
|
|
|
|
def test_dryrun_wiring_pass(monkeypatch, tmp_path):
    """``dryrun`` exits with code 0 when the dry run reports acceptance."""
    import atm.main as _main

    passing_module = _make_dryrun_module(acceptance_pass=True)
    monkeypatch.setattr("atm.main.dryrun", passing_module)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    argv = ["dryrun", str(tmp_path)]
    with pytest.raises(SystemExit) as excinfo:
        _main.main(argv)

    assert excinfo.value.code == 0
|
|
|
|
|
|
def test_dryrun_wiring_fail(monkeypatch, tmp_path):
    """``dryrun`` exits with code 1 when the dry run reports no acceptance."""
    import atm.main as _main

    failing_module = _make_dryrun_module(acceptance_pass=False)
    monkeypatch.setattr("atm.main.dryrun", failing_module)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    argv = ["dryrun", str(tmp_path)]
    with pytest.raises(SystemExit) as excinfo:
        _main.main(argv)

    assert excinfo.value.code == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_report_current_week_default
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_report_current_week_default(monkeypatch, tmp_path):
    """``report`` completes without raising when the journal has no entries."""
    import atm.main as _main

    # Journal.all is patched to return no entries, so the report is built
    # from an empty trade list.
    monkeypatch.setattr("atm.journal.Journal.all", lambda self: [])

    trades_file = tmp_path / "trades.jsonl"

    # Neither an exception nor a sys.exit is expected here.
    _main.main(["report", "--file", str(trades_file)])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# test_run_live_dry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_run_live_dry(monkeypatch):
    """``run --duration 0`` calls ``run_live`` exactly once with duration 0."""
    import atm.main as _main

    recorded: list[dict] = []

    def _mock_run_live(cfg, duration_s=None, capture_stub=False):
        # Record the arguments instead of starting the live loop.
        recorded.append(
            dict(cfg=cfg, duration_s=duration_s, capture_stub=capture_stub)
        )

    monkeypatch.setattr("atm.main.run_live", _mock_run_live)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())

    _main.main(["run", "--duration", "0"])

    assert len(recorded) == 1
    assert recorded[0]["duration_s"] == pytest.approx(0.0)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Regression integration test — user bug 2026-04-16.
|
|
# Session starts with an accepted gray tick followed by dark_red. Catchup
|
|
# synth-arm must fire on dark_red (the previous gray consumed first_accepted),
|
|
# then light_red triggers SELL. Proves run_live wiring dispatches alerts for
|
|
# the user's exact scenario.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    """Replay gray, dark_red, light_red through ``run_live`` and check SELL alerts.

    The collaborators patched below are local fakes: the detector replays a
    fixed script of accepted colors and raises ``_StopLoop`` once the script
    is exhausted, and the fan-out notifier records every alert it receives.
    Exactly one arm, one prime and one trigger alert are expected, all SELL.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    sent = []

    class _StopLoop(Exception):
        """Raised by the scripted detector to break out of the live loop."""

    class FakeFanout:
        """Notifier fake that stores each alert in ``sent``."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            sent.append(alert)

        def stop(self):
            pass

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary fake that is never paused and returns a fixed result."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class ScriptedDetector:
        """Detector fake returning one scripted (color, accepted) pair per step."""

        _script = [
            ("gray", True),
            ("gray", True),
            ("dark_red", True),
            ("dark_red", True),
            ("light_red", True),
        ]

        def __init__(self, *a, **kw):
            self._remaining = iter(self._script)

        def step(self, ts, frame=None):
            entry = next(self._remaining, None)
            if entry is None:
                # Script exhausted: end the otherwise endless loop.
                raise _StopLoop
            color, accepted = entry
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

    class _Stub:
        """Do-nothing replacement for the notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    def fake_build_capture(cfg, capture_stub=False):
        # The capture callable always yields the same black 50x50 RGB frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    for roi_field, roi_value in (("x", 0), ("y", 0), ("w", 10), ("h", 10)):
        setattr(cfg.dot_roi, roi_field, roi_value)
    cfg.chart_window_region = None

    monkeypatch.chdir(tmp_path)

    replacements = {
        "atm.detector.Detector": ScriptedDetector,
        "atm.canary.Canary": FakeCanary,
        "atm.notifier.fanout.FanoutNotifier": FakeFanout,
        "atm.notifier.discord.DiscordNotifier": _Stub,
        "atm.notifier.telegram.TelegramNotifier": _Stub,
        "atm.audit.AuditLog": _Stub,
        "atm.levels.LevelsExtractor": _Stub,
        "atm.main._build_capture": fake_build_capture,
    }
    for target, fake in replacements.items():
        monkeypatch.setattr(target, fake)
    monkeypatch.setattr("atm.main.time.sleep", lambda s: None)

    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)

    arm_alerts = [alert for alert in sent if alert.kind == "arm"]
    prime_alerts = [alert for alert in sent if alert.kind == "prime"]
    trigger_alerts = [alert for alert in sent if alert.kind == "trigger"]

    assert len(arm_alerts) == 1, (
        f"expected 1 arm alert, got {len(arm_alerts)} ({[a.title for a in sent]})"
    )
    first_arm = arm_alerts[0]
    assert first_arm.direction == "SELL"
    assert "recuperare" in (first_arm.title + first_arm.body).lower()

    assert len(prime_alerts) == 1
    first_prime = prime_alerts[0]
    assert first_prime.direction == "SELL"
    assert "recuperare" in (first_prime.title + first_prime.body).lower()

    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "SELL"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MUST-HAVE: async lifecycle integration test
|
|
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
|
|
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Replay turquoise, dark_green, light_green through ``run_live_async``.

    Expects exactly one arm, one prime and one trigger alert, all BUY, and
    expects the fake screenshot scheduler to record a ``start:180`` event
    before its first ``stop`` event. The scripted detector raises
    ``_StopLoop`` once its script is exhausted, which ends the run.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    alerts_sent: list = []
    sched_log: list[str] = []

    class _StopLoop(Exception):
        """Raised by the scripted detector to end the run."""

    class FakeFanout:
        """Notifier fake that stores each alert in ``alerts_sent``."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            alerts_sent.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary fake that is never paused and returns a fixed result."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class FakeScheduler:
        """Scheduler fake that logs every start/stop call into ``sched_log``."""

        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            sched_log.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            sched_log.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        """Poller fake whose ``run`` just sleeps."""

        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class ScriptedDetector:
        """Detector fake returning one scripted (color, accepted) pair per step."""

        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]

        def __init__(self, *a, **kw):
            self._remaining = iter(self._script)

        def step(self, ts, frame=None):
            entry = next(self._remaining, None)
            if entry is None:
                # Script exhausted: end the otherwise endless loop.
                raise _StopLoop
            color, accepted = entry
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    class _Stub:
        """Do-nothing replacement for the notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    def fake_build_capture(cfg, capture_stub=False):
        # The capture callable always yields the same black 50x50 RGB frame.
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    for roi_field, roi_value in (("x", 0), ("y", 0), ("w", 10), ("h", 10)):
        setattr(cfg.dot_roi, roi_field, roi_value)
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    # One shared instance, so the start/stop calls all land in one log.
    fake_sched = FakeScheduler()

    monkeypatch.chdir(tmp_path)

    replacements = {
        "atm.detector.Detector": ScriptedDetector,
        "atm.canary.Canary": FakeCanary,
        "atm.notifier.fanout.FanoutNotifier": FakeFanout,
        "atm.notifier.discord.DiscordNotifier": _Stub,
        "atm.notifier.telegram.TelegramNotifier": _Stub,
        "atm.audit.AuditLog": _Stub,
        "atm.levels.LevelsExtractor": _Stub,
        "atm.main._build_capture": fake_build_capture,
        "atm.commands.TelegramPoller": FakePoller,
        "atm.scheduler.ScreenshotScheduler": lambda *a, **kw: fake_sched,
    }
    for target, fake in replacements.items():
        monkeypatch.setattr(target, fake)

    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    arm_alerts = [alert for alert in alerts_sent if alert.kind == "arm"]
    prime_alerts = [alert for alert in alerts_sent if alert.kind == "prime"]
    trigger_alerts = [alert for alert in alerts_sent if alert.kind == "trigger"]

    assert len(arm_alerts) == 1, (
        f"expected 1 arm, got {[a.title for a in alerts_sent]}"
    )
    assert arm_alerts[0].direction == "BUY"

    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"

    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in sched_log, f"scheduler not started: {sched_log}"
    assert "stop" in sched_log, f"scheduler not stopped: {sched_log}"
    first_start = sched_log.index("start:180")
    first_stop = sched_log.index("stop")
    assert first_start < first_stop, "scheduler started after it stopped"
|