From 424437ceaf7a44077813ef114508850ed92d8d0a Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Fri, 17 Apr 2026 10:54:10 +0000 Subject: [PATCH] fix(audit)+test: deadlock fix + lifecycle test + pytest-asyncio MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AuditLog deadlock: log() held self._lock and called _open() which called close() which tried to acquire self._lock again — RLock not needed, refactored to _close_locked() (called while already holding lock). pyproject.toml: pytest-asyncio + httpx in dev deps. test_main.py: - lifecycle integration test (MUST-HAVE): IDLE→ARMED→PRIMED→auto-poll starts→FIRE→auto-poll stops, asserts scheduler event order - asyncio import for async test marker Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 + src/atm/audit.py | 20 ++++--- tests/test_main.py | 133 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6f8a0a1..a5a56c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,8 @@ windows = [ dev = [ "pytest>=8.0", "pytest-cov>=5.0", + "pytest-asyncio>=0.23", + "httpx>=0.27", ] [project.scripts] diff --git a/src/atm/audit.py b/src/atm/audit.py index 710a4c7..19d12ba 100644 --- a/src/atm/audit.py +++ b/src/atm/audit.py @@ -32,13 +32,17 @@ class AuditLog: def close(self) -> None: with self._lock: - if self._fh is not None: - try: - self._fh.close() - except Exception: - pass - finally: - self._fh = None + self._close_locked() + + def _close_locked(self) -> None: + """Close file handle; must be called while holding self._lock.""" + if self._fh is not None: + try: + self._fh.close() + except Exception: + pass + finally: + self._fh = None @property def current_path(self) -> Path: @@ -48,7 +52,7 @@ class AuditLog: return self._base_dir / f"{self._current_date}.jsonl" def _open(self, today: date) -> None: - self.close() + self._close_locked() # already holding self._lock self._base_dir.mkdir(parents=True, exist_ok=True) path = self._base_dir / f"{today}.jsonl" self._fh = open(path, "a", buffering=1, encoding="utf-8") diff --git a/tests/test_main.py b/tests/test_main.py index 302cceb..39a380b 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,6 +1,7 @@ """Tests for atm.main unified CLI.""" from __future__ import annotations +import asyncio import os import subprocess import sys @@ -255,3 +256,135 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path): assert len(trigger) == 1 assert trigger[0].direction == "SELL" + + +# --------------------------------------------------------------------------- +# MUST-HAVE: async lifecycle integration test +# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops) +# Tests: scheduler starts on prime, stops on fire, fire alert sent. +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path): + import numpy as np + import atm.main as _main + from atm.detector import DetectionResult + + captured_alerts: list = [] + scheduler_events: list[str] = [] + + class FakeFanout: + def __init__(self, *a, **kw): pass + def send(self, alert): captured_alerts.append(alert) + def stop(self): pass + def stats(self): return {} + + class FakeCanaryResult: + distance = 0 + drifted = False + paused = False + + class FakeCanary: + def __init__(self, *a, **kw): self.is_paused = False + def check(self, frame): return FakeCanaryResult() + def resume(self): pass + + # Scheduler tracks start/stop calls + class FakeScheduler: + def __init__(self, *a, **kw): + self.is_running = False + self.interval_s = None + def start(self, interval_s): + self.is_running = True + self.interval_s = interval_s + scheduler_events.append(f"start:{interval_s}") + def stop(self): + self.is_running = False + scheduler_events.append("stop") + async def run(self): + await asyncio.sleep(9999) + + class FakePoller: + def __init__(self, *a, **kw): pass + async def run(self): await asyncio.sleep(9999) + + class _StopLoop(Exception): pass + + class ScriptedDetector: + # turquoise→ARM, dark_green→PRIME, light_green→FIRE + _script = [ + ("turquoise", True), + ("dark_green", True), + ("light_green", True), + ] + def __init__(self, *a, **kw): self._i = 0 + def step(self, ts, frame=None): + if self._i >= len(self._script): + raise _StopLoop + color, accepted = self._script[self._i] + self._i += 1 + return DetectionResult(ts=ts, window_found=True, dot_found=True, + rgb=(1, 1, 1), match=None, accepted=accepted, color=color) + @property + def rolling(self): return [] + + def fake_build_capture(cfg, capture_stub=False): + return lambda: np.zeros((50, 50, 3), dtype=np.uint8) + + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.heartbeat_min = 999 + cfg.loop_interval_s = 0 + cfg.config_version = "test" + cfg.dead_letter_path = str(tmp_path / "dl.jsonl") + cfg.canary.drift_threshold = 10 + cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10 + cfg.chart_window_region = None + cfg.telegram.auto_poll_interval_s = 180 + cfg.telegram.bot_token = "tok" + cfg.telegram.chat_id = "123" + cfg.telegram.allowed_chat_ids = ("123",) + + fake_sched = FakeScheduler() + + monkeypatch.chdir(tmp_path) + + class _Stub: + def __init__(self, *a, **kw): pass + def log(self, *a, **kw): pass + def close(self, *a, **kw): pass + def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None) + + monkeypatch.setattr("atm.detector.Detector", ScriptedDetector) + monkeypatch.setattr("atm.canary.Canary", FakeCanary) + monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout) + monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub) + monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub) + monkeypatch.setattr("atm.audit.AuditLog", _Stub) + monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub) + monkeypatch.setattr("atm.main._build_capture", fake_build_capture) + monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller) + monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched) + + with pytest.raises(_StopLoop): + await _main.run_live_async(cfg, duration_s=None) + + arm_alerts = [a for a in captured_alerts if a.kind == "arm"] + prime_alerts = [a for a in captured_alerts if a.kind == "prime"] + trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"] + + assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}" + assert arm_alerts[0].direction == "BUY" + + assert len(prime_alerts) == 1 + assert prime_alerts[0].direction == "BUY" + + assert len(trigger_alerts) == 1 + assert trigger_alerts[0].direction == "BUY" + + # Scheduler must have started (on PRIME) and stopped (on FIRE) + assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}" + assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}" + start_idx = scheduler_events.index("start:180") + stop_idx = scheduler_events.index("stop") + assert start_idx < stop_idx, "scheduler started after it stopped"