"""Tests for atm.main unified CLI.""" from __future__ import annotations import asyncio import os import subprocess import sys import types from dataclasses import dataclass from pathlib import Path from unittest.mock import MagicMock import cv2 import numpy as np import pytest SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"] # Ensure subprocess invocations find the atm package even without pip install _SRC = str(Path(__file__).resolve().parent.parent / "src") _SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC} # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _mock_config_class(cfg=None): """Return a Config-like class whose load_current() returns *cfg*.""" if cfg is None: cfg = MagicMock() # window_title must be a real falsy value, not a MagicMock auto-attribute; # otherwise _cmd_run enters _focus_window_by_title and TypeErrors. cfg.window_title = None mock_cls = MagicMock() mock_cls.load_current.return_value = cfg return mock_cls # --------------------------------------------------------------------------- # test_help_works # --------------------------------------------------------------------------- def test_help_works(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) assert result.returncode == 0, result.stderr # --------------------------------------------------------------------------- # test_subcommands_listed # --------------------------------------------------------------------------- def test_subcommands_listed(): result = subprocess.run( [sys.executable, "-m", "atm", "--help"], capture_output=True, text=True, env=_SUBPROCESS_ENV, ) output = result.stdout for cmd in SUBCOMMANDS: assert cmd in output, f"Expected subcommand '{cmd}' in --help output" # --------------------------------------------------------------------------- # 
test_dryrun_wiring # --------------------------------------------------------------------------- @dataclass class _DryrunResult: acceptance_pass: bool def _make_dryrun_module(acceptance_pass: bool): mod = types.ModuleType("atm.dryrun") mod.dryrun = lambda *a, **kw: _DryrunResult(acceptance_pass=acceptance_pass) mod.print_report = lambda r: None return mod def test_dryrun_wiring_pass(monkeypatch, tmp_path): import atm.main as _main monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=True)) monkeypatch.setattr("atm.main.Config", _mock_config_class()) with pytest.raises(SystemExit) as exc_info: _main.main(["dryrun", str(tmp_path)]) assert exc_info.value.code == 0 def test_dryrun_wiring_fail(monkeypatch, tmp_path): import atm.main as _main monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=False)) monkeypatch.setattr("atm.main.Config", _mock_config_class()) with pytest.raises(SystemExit) as exc_info: _main.main(["dryrun", str(tmp_path)]) assert exc_info.value.code == 1 # --------------------------------------------------------------------------- # test_report_current_week_default # --------------------------------------------------------------------------- def test_report_current_week_default(monkeypatch, tmp_path): import atm.main as _main # Journal.all returns no entries — report should print a zero-trade week monkeypatch.setattr("atm.journal.Journal.all", lambda self: []) # Should not raise; no sys.exit expected _main.main(["report", "--file", str(tmp_path / "trades.jsonl")]) # --------------------------------------------------------------------------- # test_run_live_dry # --------------------------------------------------------------------------- def test_run_live_dry(monkeypatch): import atm.main as _main calls: list[dict] = [] def _mock_run_live(cfg, duration_s=None, capture_stub=False): calls.append({"cfg": cfg, "duration_s": duration_s, "capture_stub": capture_stub}) monkeypatch.setattr("atm.main.run_live", 
_mock_run_live) monkeypatch.setattr("atm.main.Config", _mock_config_class()) _main.main(["run", "--duration", "0"]) assert len(calls) == 1 assert calls[0]["duration_s"] == pytest.approx(0.0) # --------------------------------------------------------------------------- # Regression integration test — user bug 2026-04-16. # Session starts with an accepted gray tick followed by dark_red. Catchup # synth-arm must fire on dark_red (the previous gray consumed first_accepted), # then light_red triggers SELL. Proves run_live wiring dispatches alerts for # the user's exact scenario. # --------------------------------------------------------------------------- def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path): import numpy as np import atm.main as _main from atm.detector import DetectionResult captured = [] class FakeFanout: def __init__(self, *a, **kw): pass def send(self, alert): captured.append(alert) def stop(self): pass class FakeCanaryResult: distance = 0 drifted = False paused = False class FakeCanary: def __init__(self, *a, **kw): self.is_paused = False def check(self, frame): return FakeCanaryResult() def resume(self): pass class _StopLoop(Exception): pass class ScriptedDetector: _script = [ ("gray", True), ("gray", True), ("dark_red", True), ("dark_red", True), ("light_red", True), ] def __init__(self, *a, **kw): self._i = 0 def step(self, ts, frame=None): if self._i >= len(self._script): raise _StopLoop color, accepted = self._script[self._i] self._i += 1 return DetectionResult( ts=ts, window_found=True, dot_found=True, rgb=(1, 1, 1), match=None, accepted=accepted, color=color, ) def fake_build_capture(cfg, capture_stub=False): return lambda: np.zeros((50, 50, 3), dtype=np.uint8) cfg = MagicMock() cfg.lockout_s = 60 cfg.heartbeat_min = 999 cfg.loop_interval_s = 0 cfg.config_version = "test" cfg.dead_letter_path = str(tmp_path / "dl.jsonl") cfg.canary.drift_threshold = 10 cfg.dot_roi.x = 0 cfg.dot_roi.y = 0 cfg.dot_roi.w = 10 
cfg.dot_roi.h = 10 cfg.chart_window_region = None monkeypatch.chdir(tmp_path) class _Stub: def __init__(self, *a, **kw): pass def log(self, *a, **kw): pass def close(self, *a, **kw): pass def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None) class _StubPoller: def __init__(self, *a, **kw): pass async def run(self): await asyncio.sleep(9999) class _StubScheduler: def __init__(self, *a, **kw): self.is_running = False def start(self, interval_s): self.is_running = True def stop(self): self.is_running = False async def run(self): await asyncio.sleep(9999) monkeypatch.setattr("atm.detector.Detector", ScriptedDetector) monkeypatch.setattr("atm.canary.Canary", FakeCanary) monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout) monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub) monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub) monkeypatch.setattr("atm.audit.AuditLog", _Stub) monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub) monkeypatch.setattr("atm.main._build_capture", fake_build_capture) monkeypatch.setattr("atm.main.time.sleep", lambda s: None) monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler) # Bootstrap a single chart so _run_multi_tick populates ctx.charts on tick 1. # Frame is zeros → real detect_strips returns [] → without this, charts stays # empty and the ScriptedDetector loop never advances (regression after the # multi-chart refactor made _run_multi_tick the single detection path). 
from atm.config import ROI _bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)] monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip) with pytest.raises(_StopLoop): _main.run_live(cfg, duration_s=None) arm = [a for a in captured if a.kind == "arm"] prime = [a for a in captured if a.kind == "prime"] trigger = [a for a in captured if a.kind == "trigger"] assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in captured]})" assert arm[0].direction == "SELL" assert "recuperare" in (arm[0].title + arm[0].body).lower() assert len(prime) == 1 assert prime[0].direction == "SELL" assert "recuperare" in (prime[0].title + prime[0].body).lower() assert len(trigger) == 1 assert trigger[0].direction == "SELL" # --------------------------------------------------------------------------- # MUST-HAVE: async lifecycle integration test # IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops) # Tests: scheduler starts on prime, stops on fire, fire alert sent. 
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Run run_live_async over a turquoise → dark_green → light_green script.

    All collaborators are stubbed. The test asserts one BUY alert each of
    kind arm / prime / trigger, and that the fake scheduler recorded a
    ``start:180`` event before its first ``stop`` event.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    sent_alerts: list = []
    sched_log: list[str] = []

    class _RecordingFanout:
        """Notifier stub collecting alerts into ``sent_alerts``."""

        def __init__(self, *args, **kwargs):
            pass

        def send(self, alert):
            sent_alerts.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class _CleanCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class _QuietCanary:
        """Canary stub that always reports a clean, un-paused state."""

        def __init__(self, *args, **kwargs):
            self.is_paused = False

        def check(self, frame):
            return _CleanCanaryResult()

        def resume(self):
            pass

    class _RecordingScheduler:
        """Scheduler stub that logs each start/stop call into ``sched_log``."""

        def __init__(self, *args, **kwargs):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            sched_log.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            sched_log.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class _IdlePoller:
        def __init__(self, *args, **kwargs):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _HaltLoop(Exception):
        """Raised once the detector script is exhausted, ending the run loop."""

    # turquoise→ARM, dark_green→PRIME, light_green→FIRE
    script = (
        ("turquoise", True),
        ("dark_green", True),
        ("light_green", True),
    )

    class _ScriptedDetector:
        """Detector stub yielding one scripted result per ``step`` call."""

        def __init__(self, *args, **kwargs):
            self._remaining = iter(script)

        def step(self, ts, frame=None):
            entry = next(self._remaining, None)
            if entry is None:
                raise _HaltLoop
            color, accepted = entry
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    def _black_frame_capture(cfg, capture_stub=False):
        def _grab():
            return np.zeros((50, 50, 3), dtype=np.uint8)

        return _grab

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    for field, value in (("x", 0), ("y", 0), ("w", 10), ("h", 10)):
        setattr(cfg.dot_roi, field, value)
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    scheduler = _RecordingScheduler()
    monkeypatch.chdir(tmp_path)

    class _Inert:
        """Catch-all stub for notifiers, the audit log and the levels extractor."""

        def __init__(self, *args, **kwargs):
            pass

        def log(self, *args, **kwargs):
            pass

        def close(self, *args, **kwargs):
            pass

        def step(self, *args, **kwargs):
            return types.SimpleNamespace(status="pending", levels=None)

    # Applied in this order; every target is replaced for the test's duration.
    replacements = (
        ("atm.detector.Detector", _ScriptedDetector),
        ("atm.canary.Canary", _QuietCanary),
        ("atm.notifier.fanout.FanoutNotifier", _RecordingFanout),
        ("atm.notifier.discord.DiscordNotifier", _Inert),
        ("atm.notifier.telegram.TelegramNotifier", _Inert),
        ("atm.audit.AuditLog", _Inert),
        ("atm.levels.LevelsExtractor", _Inert),
        ("atm.main._build_capture", _black_frame_capture),
        ("atm.commands.TelegramPoller", _IdlePoller),
        # Factory always hands back the one shared recording scheduler.
        ("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: scheduler),
    )
    for target, replacement in replacements:
        monkeypatch.setattr(target, replacement)

    # Bootstrap ctx.charts on tick 1: force strip detection to return one ROI
    # so the scripted detector is actually stepped on the all-zero frame.
    from atm.config import ROI

    strips = [ROI(x=0, y=0, w=10, h=10)]
    monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: strips)

    with pytest.raises(_HaltLoop):
        await _main.run_live_async(cfg, duration_s=None)

    def _of_kind(kind):
        return [alert for alert in sent_alerts if alert.kind == kind]

    arms = _of_kind("arm")
    primes = _of_kind("prime")
    triggers = _of_kind("trigger")

    assert len(arms) == 1, f"expected 1 arm, got {[a.title for a in sent_alerts]}"
    assert arms[0].direction == "BUY"
    assert len(primes) == 1
    assert primes[0].direction == "BUY"
    assert len(triggers) == 1
    assert triggers[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in sched_log, f"scheduler not started: {sched_log}"
    assert "stop" in sched_log, f"scheduler not stopped: {sched_log}"
    first_start = sched_log.index("start:180")
    first_stop = sched_log.index("stop")
    assert first_start < first_stop, "scheduler started after it stopped"


# ---------------------------------------------------------------------------
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
# even when canary is paused or when detection is otherwise skipped.
# Prior bug: `continue` past the drain loop caused commands to pile up.
# ---------------------------------------------------------------------------


def _make_ctx_for_drain(cmd_queue, dispatched: list):
    """Build a minimal RunContext where _dispatch_command just records calls.

    The context uses recording audit/notifier fakes and a canary whose
    ``is_paused`` is True. *dispatched* is not used here; it is kept in the
    signature so existing call sites keep working.
    """
    import atm.main as _main

    class _FakeAudit:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _FakeNotifier:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _FakeCanary:
        def __init__(self, paused=True):
            self.is_paused = paused

    class _FakeScheduler:
        is_running = False
        interval_s = None

        def start(self, s):
            pass

        def stop(self):
            pass

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=MagicMock(),
        capture=lambda: None,
        canary=_FakeCanary(paused=True),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_FakeNotifier(),
        audit=_FakeAudit(),
        detection_log=_FakeAudit(),
        scheduler=_FakeScheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=cmd_queue,
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    return ctx


@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
    """Regression: when canary.is_paused, _drain_cmd_queue still dispatches.

    Prior bug: detection loop `continue`'d past the drain block whenever the
    tick returned res=None (canary paused). Commands accumulated forever.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))

    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
    ctx = _make_ctx_for_drain(q, dispatched)
    await _main._drain_cmd_queue(ctx)

    assert dispatched == ["status", "ss"]
    assert q.empty()


@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
    """Drain must still fire when the tick skipped (e.g. out of operating hours).

    The refactored loop runs _drain_cmd_queue unconditionally after every
    tick, regardless of `_TickSyncResult` content.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="stop"))

    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
    ctx = _make_ctx_for_drain(q, dispatched)

    # Simulate out-of-window tick (empty _TickSyncResult, no res)
    await _main._handle_fsm_result(ctx, _main._TickSyncResult())
    await _main._drain_cmd_queue(ctx)

    assert dispatched == ["stop"]


@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
    """If one command raises, remaining commands still drain + warn alert sent."""
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))

    attempts: list = []

    async def _fake_dispatch(ctx, cmd):
        attempts.append(cmd.action)
        if cmd.action == "status":
            raise RuntimeError("boom")

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
    ctx = _make_ctx_for_drain(q, attempts)
    await _main._drain_cmd_queue(ctx)

    assert attempts == ["status", "ss"]
    # warn alert for the failed command
    warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
    assert any("status" in t for t in warn_titles)
    # command_error audit event
    errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
    assert len(errs) == 1 and errs[0]["action"] == "status"


# ---------------------------------------------------------------------------
# Commit 4: operating hours + LifecycleState transitions
# ---------------------------------------------------------------------------

from zoneinfo import ZoneInfo as _ZI  # noqa: E402
import datetime as _dt  # noqa: E402


def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
            start="09:30", stop="16:00", tz="America/New_York"):
    """Build a lightweight cfg-like object with operating_hours populated.

    ``_tz_cache`` holds a ``ZoneInfo`` for *tz* when *enabled*, else ``None``;
    ``window_title`` is always ``None``.
    """
    oh = types.SimpleNamespace(
        enabled=enabled,
        timezone=tz,
        weekdays=weekdays,
        start_hhmm=start,
        stop_hhmm=stop,
        _tz_cache=_ZI(tz) if enabled else None,
    )
    return types.SimpleNamespace(operating_hours=oh, window_title=None)


def _fake_canary(paused=False):
    """Return an object exposing only ``is_paused``."""
    return types.SimpleNamespace(is_paused=paused)


@pytest.mark.parametrize(
    "local_dt,expected",
    [
        # Monday 09:30 NY — exact open → active (None)
        (_dt.datetime(2026, 4, 20, 9, 30), None),
        # Monday 16:00 NY — exact close → inactive (>= stop)
        (_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
        # Monday 08:00 NY — before open
        (_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
        # Monday 12:00 NY — active
        (_dt.datetime(2026, 4, 20, 12, 0), None),
        # Saturday 12:00 NY — weekend
        (_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
        # Sunday 23:00 NY — weekend
        (_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
    ],
)
def test_operating_hours_skip_matrix(local_dt, expected):
    """Timezone-aware start/stop + weekday checks."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    now_ts = local_dt.replace(tzinfo=tz).timestamp()
    lifecycle = _main.LifecycleState()
    result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())
    assert result == expected


def test_market_open_close_transitions_logged_once():
    """Crossing a boundary emits exactly one market_open / market_closed event."""
    import atm.main as _main

    audit_events = []
    alerts = []

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()
    canary = _fake_canary()

    # Prime as closed (before open, Monday 08:00)
    pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
    skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
    # First evaluation seeds state, no alert yet.
    assert lifecycle.last_window_state == "closed"
    assert alerts == []
    assert audit_events == []

    # Transition to open
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
    assert lifecycle.last_window_state == "open"
    assert len(alerts) == 1
    assert any(e.get("event") == "market_open" for e in audit_events)

    # Repeated open tick — no duplicate log
    alerts.clear()
    audit_events.clear()
    skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
    assert alerts == []
    assert audit_events == []

    # Transition to close
    close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
    skip_close = _main._should_skip(close, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
    assert lifecycle.last_window_state == "closed"
    assert any(e.get("event") == "market_closed" for e in audit_events)


def test_market_transition_sends_notification():
    """market_open / market_closed transitions produce kind=status alerts."""
    import atm.main as _main

    alerts = []

    class _A:
        def log(self, e):
            pass

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(last_window_state="closed")
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    _main._maybe_log_transition(None, lifecycle, mid, _A(), _N())
    assert len(alerts) == 1
    assert alerts[0].kind == "status"
    assert "piața" in alerts[0].title.lower() or "monitorizare" in alerts[0].body.lower()


def test_startup_in_window_suppresses_market_open():
    """R2 #20: first evaluation in-window just seeds state; no alert fires."""
    import atm.main as _main

    alerts = []
    events = []

    class _A:
        def log(self, e):
            events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()  # last_window_state is None
    in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
    assert skip is None
    _main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())
    # Seeded silently
    assert lifecycle.last_window_state == "open"
    assert alerts == []
    assert not any(e.get("event") == "market_open" for e in events)

    # Two more ticks, still in-window → no spurious alert
    for _ in range(2):
        skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
        _main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
    assert alerts == []


def test_operating_hours_weekday_locale_independent():
    """R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
    import locale as _locale
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Saturday 12:00 NY
    sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    original = _locale.setlocale(_locale.LC_TIME)
    try:
        for loc in ("C", "de_DE.UTF-8"):
            try:
                _locale.setlocale(_locale.LC_TIME, loc)
            except _locale.Error:
                continue  # locale not installed → skip gracefully
            lifecycle = _main.LifecycleState()
            result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
            assert result == "out_of_window_weekend", (
                f"locale={loc} returned {result!r}"
            )
    finally:
        # Restore the process locale; fall back to "C" if that fails.
        try:
            _locale.setlocale(_locale.LC_TIME, original)
        except _locale.Error:
            _locale.setlocale(_locale.LC_TIME, "C")


def test_should_skip_user_paused_wins():
    """user_paused yields "user_paused" even at an in-window timestamp."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(user_paused=True)
    # Mid-Monday (in-window) — should still skip because user_paused
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"


def test_should_skip_canary_drift_wins_over_window():
    """A paused canary yields "drift_paused" at an in-window timestamp."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState()
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"


# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------


class _ResumableCanary:
    """Canary stand-in whose ``is_paused`` flag is cleared by ``resume()``.

    ``is_paused`` is a read-only property so a test can observe the effect of
    ``resume()`` being called by the code under test.
    """

    def __init__(self, paused=True):
        self._p = paused

    @property
    def is_paused(self):
        return self._p

    def resume(self):
        self._p = False


def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
    """Minimal RunContext for _dispatch_command unit tests.

    Any argument left as ``None`` is replaced by a default: an un-paused
    canary with a no-op ``resume``, a fresh ``LifecycleState``, and a
    ``MagicMock`` cfg with operating hours disabled.
    """
    import atm.main as _main

    class _A:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _N:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _S:
        is_running = False
        interval_s = None

        def start(self, s):
            self.is_running = True

        def stop(self):
            self.is_running = False

    if canary is None:
        canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    if lifecycle is None:
        lifecycle = _main.LifecycleState()
    if cfg is None:
        cfg = MagicMock()
        cfg.telegram.auto_poll_interval_s = 180
        cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
        # Skip window focus in dispatch tests — MagicMock window_title would
        # propagate into _focus_window_by_title (real Win32 call).
        cfg.window_title = None
    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: None,
        canary=canary,
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_N(),
        audit=_A(),
        detection_log=_A(),
        scheduler=_S(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=lifecycle,
    )
    return ctx


@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
    """The pause command sets user_paused, logs an audit event and notifies."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    await _main._dispatch_command(ctx, Command(action="pause"))
    assert ctx.lifecycle.user_paused is True
    # When combined with _should_skip, we get user_paused
    assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused"
    # Audit + notif
    assert any(e.get("event") == "user_paused" for e in ctx.audit.events)
    assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts)


@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
    """resume with value=1 clears user_paused and the canary pause; force is audited."""
    import atm.main as _main
    from atm.commands import Command

    # is_paused is exposed via a property so the resume() effect is visible.
    canary = _ResumableCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert force_events and force_events[0]["force"] is True


@pytest.mark.asyncio
async def test_resume_plain_also_clears_canary_drift():
    """2026-04-21 decision: /resume (no arg) now clears BOTH user_paused and canary drift.

    /resume force remains accepted as legacy alias. Previous behavior (force
    required for drift) was a UX trap — see plan doc.
    """
    import atm.main as _main
    from atm.commands import Command

    canary = _ResumableCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    await _main._dispatch_command(ctx, Command(action="resume"))  # no force
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False  # drift cleared without force
    # Audit event still records was_drift + force=False for traceability
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["was_drift"] is True and resumed[0]["force"] is False
    # Message mentions drift-pause was cleared (kind is "screenshot" now since /resume attaches image)
    alerts = ctx.notifier.alerts
    assert alerts and ("drift" in (alerts[0].title + alerts[0].body).lower())


@pytest.mark.asyncio
async def test_resume_force_alias_still_works():
    """/resume force (value=1) remains accepted — legacy muscle memory."""
    import atm.main as _main
    from atm.commands import Command

    canary = _ResumableCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["force"] is True


# ---------------------------------------------------------------------------
# /rebase: propose + confirm flow
# ---------------------------------------------------------------------------


def _rebase_cfg_and_path(tmp_path):
    """Write a minimal TOML with baseline_phash, return (cfg, path, old_phash).

    The file is created at ``tmp_path/configs/rebase_test.toml``; ``cfg`` is a
    ``SimpleNamespace`` whose ``canary.baseline_phash`` equals ``old_phash``.
    """
    name = "rebase_test"
    path = tmp_path / "configs" / f"{name}.toml"
    path.parent.mkdir(parents=True, exist_ok=True)
    old = "a" * 64
    path.write_text(
        f'[canary]\n'
        f'baseline_phash = "{old}" # comment stays\n'
        f'drift_threshold = 8\n'
        f'[canary.roi]\nx = 0\ny = 0\nw = 4\nh = 4\n',
        encoding="utf-8",
    )
    cfg = types.SimpleNamespace(
        window_title=None,
        config_version=name,
        canary=types.SimpleNamespace(
            roi=types.SimpleNamespace(x=0, y=0, w=4, h=4),
            baseline_phash=old,
            drift_threshold=8,
        ),
        telegram=types.SimpleNamespace(auto_poll_interval_s=180),
        operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
    )
    return cfg, path, old


def _blue_frame(h=200, w=200):
    """Return an (h, w, 3) uint8 array filled with the pixel (50, 80, 20)."""
    import numpy as _np

    return _np.full((h, w, 3), (50, 80, 20), dtype=_np.uint8)


def test_rewrite_baseline_phash_updates_only_target_line(tmp_path):
    """_rewrite_baseline_phash swaps the hash, keeps the comment, returns the old hash."""
    import atm.main as _main

    p = tmp_path / "cfg.toml"
    p.write_text(
        '[canary]\n'
        'baseline_phash = "deadbeef" # keep this comment\n'
        'drift_threshold = 8\n',
        encoding="utf-8",
    )
    old = _main._rewrite_baseline_phash(p, "cafef00d")
    assert old == "deadbeef"
    txt = p.read_text(encoding="utf-8")
    assert 'baseline_phash = "cafef00d" # keep this comment' in txt
    assert "drift_threshold = 8" in txt


def test_rewrite_baseline_phash_raises_when_missing(tmp_path):
    """A file without a baseline_phash line makes _rewrite_baseline_phash raise ValueError."""
    import atm.main as _main

    p = tmp_path / "cfg.toml"
    p.write_text("[canary]\ndrift_threshold = 8\n", encoding="utf-8")
    with pytest.raises(ValueError):
        _main._rewrite_baseline_phash(p, "ff")


@pytest.mark.asyncio
async def test_rebase_propose_sets_pending_and_does_not_touch_file(tmp_path, monkeypatch):
    """rebase (no value) records a pending proposal and leaves the TOML untouched."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    assert ctx.pending_rebase is not None
    _proposed_ts, new_phash, pending_path = ctx.pending_rebase
    assert new_phash != old
    assert pending_path == Path("configs") / f"{cfg.config_version}.toml"
    # File unchanged at propose time
    assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
    events = [e["event"] for e in ctx.audit.events]
    assert "rebase_proposed" in events


@pytest.mark.asyncio
async def test_rebase_confirm_applies_and_clears_pauses(tmp_path, monkeypatch):
    """rebase value=1 after a proposal rewrites the hash and clears both pauses."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    canary = _ResumableCanary()
    ctx = _dispatch_ctx(cfg=cfg, canary=canary)
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    ctx.lifecycle.user_paused = True
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    assert ctx.pending_rebase is not None
    _, new_phash, _ = ctx.pending_rebase

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))
    assert ctx.pending_rebase is None
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    assert cfg.canary.baseline_phash == new_phash
    txt = cfg_path.read_text(encoding="utf-8")
    assert f'"{new_phash}"' in txt
    assert f'"{old}"' not in txt
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert applied and applied[0]["old_phash"] == old and applied[0]["new_phash"] == new_phash


@pytest.mark.asyncio
async def test_rebase_confirm_without_pending_warns(tmp_path):
    """rebase value=1 with nothing pending sends an alert and applies nothing."""
    import atm.main as _main
    from atm.commands import Command

    cfg, _, _ = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))
    assert ctx.pending_rebase is None
    alerts = ctx.notifier.alerts
    assert alerts and "nimic" in alerts[0].title.lower()
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert not applied


@pytest.mark.asyncio
async def test_rebase_confirm_expired_pending_warns(tmp_path, monkeypatch):
    """A proposal timestamped one hour ago is dropped on confirm; the file stays as is."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    # Pretend propose happened 1h ago
    import time as _t

    ctx.pending_rebase = (_t.time() - 3600, "b" * 64, cfg_path)

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))
    assert ctx.pending_rebase is None
    # File untouched
    assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
    alerts = ctx.notifier.alerts
    assert alerts and "expirat" in alerts[0].title.lower()


@pytest.mark.asyncio
async def test_rebase_confirm_mutates_frozen_canary_region(tmp_path, monkeypatch):
    """Regression: CanaryRegion is @dataclass(frozen=True).

    Plain assignment raises FrozenInstanceError. /rebase confirm must mirror
    the new phash into the live cfg anyway — otherwise canary.check() keeps
    the stale hash and drift re-pauses within one tick after the user
    confirms.
    """
    import atm.main as _main
    from atm.commands import Command
    from atm.config import CanaryRegion, ROI as _ROI

    # Build cfg with the REAL frozen dataclass, not a SimpleNamespace.
    _, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    cfg = types.SimpleNamespace(
        window_title=None,
        config_version="rebase_test",
        canary=CanaryRegion(
            roi=_ROI(x=0, y=0, w=4, h=4),
            baseline_phash=old,
            drift_threshold=8,
        ),
        telegram=types.SimpleNamespace(auto_poll_interval_s=180),
        operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
    )
    ctx = _dispatch_ctx(cfg=cfg, canary=_ResumableCanary(paused=False))
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    _, new_phash, _ = ctx.pending_rebase
    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    # Live cfg mirrors the new hash (in-memory canary.check() sees it)
    assert cfg.canary.baseline_phash == new_phash
    # TOML on disk also updated
    assert f'"{new_phash}"' in cfg_path.read_text(encoding="utf-8")
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert applied


@pytest.mark.asyncio
async def test_rebase_propose_capture_failed_warns(tmp_path, monkeypatch):
    """rebase with a capture that returns None sets no proposal and sends an alert."""
    import atm.main as _main
    from atm.commands import Command

    cfg, _, _ = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.capture = lambda: None
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    assert ctx.pending_rebase is None
    alerts = ctx.notifier.alerts
    # NOTE(review): this only requires the letter "e" somewhere in the title,
    # so it passes for almost any alert — consider matching a distinctive
    # fragment once the exact title text is confirmed.
    assert alerts and "e" in alerts[0].title.lower()  # "Captură eșuată"


@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
    """/resume while operating-hours window is closed → special body."""
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
    canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)
    #
Pin time to Saturday import atm.main as _mm real_time = _mm.time fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp() class _FakeTime: def time(self): return fake_ts def monotonic(self): return 0.0 _mm.time = _FakeTime() try: await _main._dispatch_command(ctx, Command(action="resume")) finally: _mm.time = real_time assert ctx.lifecycle.user_paused is False alerts = ctx.notifier.alerts assert alerts combined = (alerts[0].title + alerts[0].body).lower() assert "închis" in combined or "piața" in combined or "ferestr" in combined @pytest.mark.asyncio async def test_dispatch_resume_sends_inline_screenshot(monkeypatch, tmp_path): """/resume produces a single Alert with image_path + FSM pick caption when capture succeeds.""" import atm.main as _main from atm.commands import Command class _Canary: def __init__(self): self._p = True @property def is_paused(self): return self._p def resume(self): self._p = False canary = _Canary() ctx = _dispatch_ctx(canary=canary) ctx.lifecycle.user_paused = True ctx.capture = lambda: object() # non-None frame ctx.fires_dir = tmp_path ctx.cfg.window_title = None _fake_detections = [{ "idx": 0, "name": "light_green", "rgb": (0, 255, 0), "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), }] monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "fake_resume.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="resume")) # Exactly one alert, with image attached + caption in body. 
alerts = ctx.notifier.alerts assert len(alerts) == 1 alert = alerts[0] assert alert.image_path == tmp_path / "fake_resume.png" assert "Monitorizare reluată" in alert.title assert "← pick" in alert.body assert "captură eșuată" not in alert.title @pytest.mark.asyncio async def test_dispatch_resume_capture_failed_still_resumes(monkeypatch, tmp_path): """/resume with capture=None → Alert title contains capture-failed, no image, resume still executes.""" import atm.main as _main from atm.commands import Command class _Canary: def __init__(self): self._p = True @property def is_paused(self): return self._p def resume(self): self._p = False canary = _Canary() ctx = _dispatch_ctx(canary=canary) ctx.lifecycle.user_paused = True ctx.capture = lambda: None # capture fails ctx.fires_dir = tmp_path ctx.cfg.window_title = None await _main._dispatch_command(ctx, Command(action="resume")) # State still cleared despite capture failure. assert ctx.lifecycle.user_paused is False assert canary.is_paused is False alerts = ctx.notifier.alerts assert len(alerts) == 1 assert alerts[0].image_path is None assert "captură eșuată" in alerts[0].title assert "Monitorizare reluată" in alerts[0].title @pytest.mark.asyncio async def test_dispatch_resume_captures_before_state_clear(monkeypatch, tmp_path): """Capture must run BEFORE clearing user_paused / canary.resume() to avoid race with FSM tick.""" import atm.main as _main from atm.commands import Command class _Canary: def __init__(self): self._p = True self.resumed_at: float | None = None @property def is_paused(self): return self._p def resume(self): self._p = False self.resumed_at = _capture_sequence[0] if _capture_sequence else 0 canary = _Canary() ctx = _dispatch_ctx(canary=canary) ctx.lifecycle.user_paused = True ctx.fires_dir = tmp_path ctx.cfg.window_title = None _capture_sequence: list[int] = [] _capture_called = [0] def _capture(): _capture_called[0] += 1 # State must still be paused at capture time. 
assert ctx.lifecycle.user_paused is True, "capture ran AFTER user_paused was cleared" assert canary.is_paused is True, "capture ran AFTER canary.resume()" _capture_sequence.append(_capture_called[0]) return object() ctx.capture = _capture monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "ok.png", [])) await _main._dispatch_command(ctx, Command(action="resume")) assert _capture_called[0] == 1 assert ctx.lifecycle.user_paused is False # cleared after capture @pytest.mark.asyncio async def test_ss_and_fire_agree_on_rightmost_dot(tmp_path): """Parity: _save_inspect_frame's detections[0].pos_abs must match find_rightmost_dot output on the same frame + ROI. Prevents silent drift between /ss verify and fire path.""" import atm.main as _main from atm.vision import find_rightmost_dot, crop_roi from atm.config import ROI, ColorSpec, YAxisCalib # Synthetic frame with one bright green dot. frame = np.zeros((100, 200, 3), dtype=np.uint8) frame[:, :] = (18, 18, 18) # BGR background matching the palette entry below cv2.circle(frame, (150, 50), 5, (0, 255, 0), -1) cfg = types.SimpleNamespace( dot_roi=ROI(x=10, y=10, w=180, h=80), colors={ "background": ColorSpec(rgb=(18, 18, 18), tolerance=15.0), "light_green": ColorSpec(rgb=(0, 255, 0), tolerance=60.0), }, y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=90, p2_price=50.0), version="parity-test", ) dot_crop = crop_roi(frame, cfg.dot_roi) fire_pos = find_rightmost_dot(dot_crop, cfg.colors["background"].rgb) assert fire_pos is not None fire_abs = (cfg.dot_roi.x + fire_pos[0], cfg.dot_roi.y + fire_pos[1]) path, detections = _main._save_inspect_frame(frame, cfg, tmp_path, now=123.0) assert path is not None assert detections, "inspect should detect the green dot" inspect_abs = detections[0]["pos_abs"] assert inspect_abs == fire_abs, ( f"Parity break: fire={fire_abs} inspect={inspect_abs} — " "fire path and /ss would show different rightmost positions." 
) @pytest.mark.asyncio async def test_status_command_reports_pause_reason(): """/status body must mention pause reason + window state.""" import atm.main as _main from atm.commands import Command ctx = _dispatch_ctx() ctx.lifecycle.user_paused = True # Stub detector.rolling for status ctx.detector.rolling = [] ctx.fsm.state = types.SimpleNamespace(value="IDLE") await _main._dispatch_command(ctx, Command(action="status")) status = [a for a in ctx.notifier.alerts if a.kind == "status"] assert status body = status[0].body assert "pauză manuală" in body or "pauza" in body.lower() @pytest.mark.asyncio async def test_set_interval_refused_while_canary_paused(): """2026-04-21: /set_interval must not start scheduler while canary is drift-paused. Previously started scheduler silently, misleading the user into thinking detection was live. Now emits a warn and refuses.""" import atm.main as _main from atm.commands import Command canary = types.SimpleNamespace(is_paused=True, resume=lambda: None) ctx = _dispatch_ctx(canary=canary) await _main._dispatch_command(ctx, Command(action="set_interval", value=60)) # Scheduler must NOT have started assert ctx.scheduler.is_running is False # No scheduler_started audit event assert not any(e.get("event") == "scheduler_started" for e in ctx.audit.events) # A warn alert must have been sent referencing /resume warns = [a for a in ctx.notifier.alerts if a.kind == "warn"] assert warns combined = (warns[0].title + warns[0].body).lower() assert "drift" in combined and "/resume" in combined @pytest.mark.asyncio async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path): """2026-04-21: /ss still captures while canary is paused, but the alert body must warn that detection is off.""" import atm.main as _main from atm.commands import Command canary = types.SimpleNamespace(is_paused=True, resume=lambda: None) ctx = _dispatch_ctx(canary=canary) # Bypass window focus + use a simple non-None capture result fake_frame = object() ctx = 
types.SimpleNamespace(**{**ctx.__dict__}) # shallow copy RunContext fields # Simpler: just override the capture and save functions used async_capture_called = {"n": 0} def _capture(): async_capture_called["n"] += 1 return fake_frame ctx.capture = _capture ctx.fires_dir = tmp_path # window_title off so we skip focus branch ctx.cfg.window_title = None # stub _save_inspect_frame to return (path, detections) _fake_detections = [{ "idx": 0, "name": "light_green", "rgb": (0, 255, 0), "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), }] monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="ss")) screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"] assert screenshots assert "DETECȚIE OPRITĂ" in screenshots[0].body or "drift" in screenshots[0].body.lower() assert "/resume" in screenshots[0].body # Caption with FSM pick must appear alongside the warn. assert "← pick" in screenshots[0].body @pytest.mark.asyncio async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path): """/ss body contains caption only when canary is not paused (no warn prefix).""" import atm.main as _main from atm.commands import Command canary = types.SimpleNamespace(is_paused=False, resume=lambda: None) ctx = _dispatch_ctx(canary=canary) ctx.capture = lambda: object() ctx.fires_dir = tmp_path ctx.cfg.window_title = None _fake_detections = [{ "idx": 0, "name": "light_green", "rgb": (0, 255, 0), "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), }] monkeypatch.setattr(_main, "_save_inspect_frame", lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="ss")) screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"] assert screenshots # Body should contain the caption (no warn), not be empty. 
assert "← pick" in screenshots[0].body assert "DETECȚIE OPRITĂ" not in screenshots[0].body @pytest.mark.asyncio async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path): """E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert. This test verifies the full command-driven lifecycle in isolation: - canary starts drift-paused, _should_skip returns drift_paused - /resume force clears canary + user_paused - subsequent detection produces SELL fire through normal FSM path """ import atm.main as _main from atm.commands import Command # Canary with mutable pause state class _Canary: def __init__(self): self._p = True @property def is_paused(self): return self._p def resume(self): self._p = False canary = _Canary() cfg = MagicMock() cfg.telegram.auto_poll_interval_s = 180 cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None) cfg.window_title = None # skip Win32 focus path in unit test ctx = _dispatch_ctx(canary=canary, cfg=cfg) # 1. While drift-paused, _should_skip returns drift_paused assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused" # 2. User issues /resume force await _main._dispatch_command(ctx, Command(action="resume", value=1)) assert canary.is_paused is False assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None # 3. 
Feed a yellow→light_red sequence through _handle_tick (FSM path) from atm.state_machine import StateMachine, State fsm = StateMachine(lockout_s=60) class _N: def __init__(self): self.alerts = [] def send(self, a): self.alerts.append(a) class _A: def log(self, _e): pass notif = _N() audit = _A() cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True)) _main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock) _main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock) tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock) # FSM reached fire via normal path assert tr is not None and tr.trigger == "SELL" assert fsm.state == State.IDLE # --------------------------------------------------------------------------- # _in_trading_window tests # --------------------------------------------------------------------------- import datetime as _dt def test_in_trading_window_disabled(): """Returns True when operating_hours disabled.""" import atm.main as _main cfg = types.SimpleNamespace(operating_hours=types.SimpleNamespace(enabled=False)) assert _main._in_trading_window(0.0, cfg) is True def test_in_trading_window_no_oh(): """Returns True when cfg has no operating_hours attr.""" import atm.main as _main cfg = types.SimpleNamespace() assert _main._in_trading_window(0.0, cfg) is True def test_in_trading_window_in_window(): """Returns True during configured hours (Monday 12:00 NY).""" import atm.main as _main cfg = _oh_cfg() tz = cfg.operating_hours._tz_cache ts = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() assert _main._in_trading_window(ts, cfg) is True def test_in_trading_window_out_of_hours(): """Returns False before market open (Monday 08:00 NY).""" import atm.main as _main cfg = _oh_cfg() tz = cfg.operating_hours._tz_cache ts = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp() assert _main._in_trading_window(ts, cfg) is False 
def test_in_trading_window_weekend():
    """Returns False on weekend (Sunday 12:00 NY)."""
    import atm.main as _main
    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    ts = _dt.datetime(2026, 4, 19, 12, 0, tzinfo=tz).timestamp()
    assert _main._in_trading_window(ts, cfg) is False


def test_heartbeat_suppressed_outside_hours(monkeypatch):
    """_in_trading_window=False prevents heartbeat from sending."""
    import atm.main as _main
    monkeypatch.setattr(_main, "_in_trading_window", lambda ts, cfg: False)
    sent = []
    # Simulate the check: outside window → no send
    # NOTE(review): nothing in this test ever appends to ``sent``, so the
    # final assertion holds regardless of the patched value; no production
    # heartbeat code is exercised here.
    if not _main._in_trading_window(0.0, None):
        pass  # heartbeat_due reset, continue — no send
    assert sent == []


def test_build_heartbeat_alert_active_when_not_paused():
    """Healthy state → title=activ, body shows fsm.state plainly."""
    import atm.main as _main
    a = _main._build_heartbeat_alert(
        fsm_state="ARMED_BUY", fire_count=2, uptime_h=1.5, canary_paused=False,
    )
    assert a.kind == "heartbeat"
    assert a.title == "activ"
    assert "ARMED_BUY" in a.body
    assert "[drift-pause]" not in a.body
    assert "semnale: 2" in a.body


def test_build_heartbeat_alert_paused_when_canary_drift():
    """2026-04-21: heartbeat must reflect canary drift instead of lying with 'activ'."""
    import atm.main as _main
    a = _main._build_heartbeat_alert(
        fsm_state="ARMED_SELL", fire_count=0, uptime_h=3.2, canary_paused=True,
    )
    assert a.kind == "heartbeat"
    assert "pauzat" in a.title
    assert "drift" in a.title.lower()
    assert "ARMED_SELL" in a.body
    assert "[drift-pause]" in a.body


@pytest.mark.asyncio
async def test_status_compact_active():
    """/status produces compact 2-line format; 'Canary' absent."""
    import atm.main as _main
    from atm.commands import Command
    ctx = _dispatch_ctx()
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="ARMED_BUY")
    ctx.state.fire_count = 3
    await _main._dispatch_command(ctx, Command(action="status"))
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status
    body = status[0].body
    lines = body.splitlines()
    assert len(lines) == 2  # no fereastră line (oh.enabled=False)
    assert "ARMED_BUY" in lines[0]
    assert "semnale: 3" in lines[0]
    assert "Canary" not in body
    assert "canary" not in body


@pytest.mark.asyncio
async def test_status_compact_paused_manual():
    """/status shows 'pauză manuală' on line 1 when user_paused."""
    import atm.main as _main
    from atm.commands import Command
    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")
    await _main._dispatch_command(ctx, Command(action="status"))
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    body = status[0].body
    assert body.startswith("pauză manuală")


@pytest.mark.asyncio
async def test_status_window_line_when_oh_enabled():
    """/status adds fereastră line when operating_hours enabled."""
    import atm.main as _main
    from atm.commands import Command
    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(last_window_state="open")
    ctx = _dispatch_ctx(lifecycle=lifecycle, cfg=cfg)
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")
    await _main._dispatch_command(ctx, Command(action="status"))
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    body = status[0].body
    assert "fereastră: deschisă" in body


# ---------------------------------------------------------------------------
# Multi-chart split workspace tests
# ---------------------------------------------------------------------------

def test_chart_state_defaults():
    """ChartState's first_accepted defaults to True; last_saved_color to None."""
    import atm.main as _main
    from atm.config import ROI
    roi = ROI(x=0, y=0, w=200, h=35)
    chart = _main.ChartState(
        chart_id="", sub_roi=roi, detector=MagicMock(), fsm=MagicMock(),
    )
    assert chart.first_accepted is True
    assert chart.last_saved_color is None
    assert chart.chart_id == ""
    assert chart.sub_roi is roi


def test_alert_prefix_import():
    """_alert_prefix lives in atm.notifier and produces the expected prefixes."""
    from atm.notifier import _alert_prefix
    assert _alert_prefix("") == ""
    assert _alert_prefix("left") == "[stânga] "
    assert _alert_prefix("right") == "[dreapta] "
    # The expected labels are 1-based while the chart ids are 0-based.
    assert _alert_prefix("chart_0") == "[chart 1] "
    assert _alert_prefix("chart_2") == "[chart 3] "


def test_save_annotated_frame_no_price_overlay(tmp_path):
    """_save_annotated_frame must NOT draw the price overlay anymore.

    Top-right area must contain no white pixels (text was rendered there before).
    """
    import atm.main as _main
    from atm.config import ROI, YAxisCalib, ColorSpec
    frame = np.zeros((200, 400, 3), dtype=np.uint8)
    cfg = types.SimpleNamespace(
        dot_roi=ROI(x=10, y=10, w=380, h=180),
        y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=190, p2_price=50.0),
        colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
    )
    fpath = _main._save_annotated_frame(
        frame, cfg, tmp_path, "test", now=123.0, dot_pos_abs=(200, 100), canary_ok=True,
    )
    assert fpath is not None
    saved = cv2.imread(str(fpath))
    assert saved is not None
    # Top-right corner where the "$..." text used to live (rows 0..40, cols 300..390).
    # The cyan ROI rect is on top edge but only 2px thick → vast majority of that
    # corner is still pure black. White text would average a much higher value.
    corner = saved[0:40, 300:390]
    # No bright white pixels (text was 255,255,255).
    white_pixels = ((corner[:, :, 0] > 200) & (corner[:, :, 1] > 200) & (corner[:, :, 2] > 200)).sum()
    assert white_pixels == 0, f"expected 0 white pixels (no price text), got {white_pixels}"


def test_save_annotated_frame_uses_roi_param(tmp_path):
    """When roi= is passed, the cyan rect is drawn there (not at cfg.dot_roi)."""
    import atm.main as _main
    from atm.config import ROI, ColorSpec
    frame = np.zeros((200, 400, 3), dtype=np.uint8)
    cfg = types.SimpleNamespace(
        dot_roi=ROI(x=0, y=0, w=10, h=10),  # tiny cfg ROI
        colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
    )
    custom_roi = ROI(x=100, y=50, w=200, h=80)
    fpath = _main._save_annotated_frame(
        frame, cfg, tmp_path, "rtest", now=123.0, roi=custom_roi,
    )
    assert fpath is not None
    saved = cv2.imread(str(fpath))
    # ROI rect color is (0, 255, 255) BGR. Find any pixel where G+R are saturated and B=0.
    edge = saved[49:52, 100:300]
    rect_present = ((edge[:, :, 0] < 50) & (edge[:, :, 1] > 200) & (edge[:, :, 2] > 200)).any()
    assert rect_present, "Expected ROI rect along custom roi top edge"
    # Also check the cfg.dot_roi rect (x=0..10, y=0..10) is NOT drawn — proves we used roi=custom.
    cfg_corner = saved[0:10, 0:10]
    rect_at_cfg = ((cfg_corner[:, :, 0] < 50) & (cfg_corner[:, :, 1] > 200) & (cfg_corner[:, :, 2] > 200)).any()
    assert not rect_at_cfg, "cfg.dot_roi rect must not appear when roi= override is used"


def test_commit_layout_change_resets_fsm(monkeypatch):
    """_commit_layout_change rebuilds ctx.charts, resets FSM, zeroes n_primed_global."""
    import atm.main as _main
    from atm.config import ROI
    from atm.state_machine import StateMachine, State
    # Prebuild a ctx with one chart in ARMED_BUY.
    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.colors = {}
    cfg.debounce_depth = 3
    fsm_old = StateMachine(lockout_s=60)
    fsm_old.feed("turquoise", 1.0)  # IDLE -> ARMED_BUY
    assert fsm_old.state == State.ARMED_BUY
    initial_chart = _main.ChartState(
        chart_id="", sub_roi=ROI(x=0, y=0, w=400, h=35), detector=MagicMock(), fsm=fsm_old,
    )
    state = _main._LoopState()
    state.n_primed_global = 5  # any non-zero value
    notifier_alerts: list = []
    audit_events: list = []

    class _N:
        # Notifier stub recording alerts into the enclosing list.
        def send(self, a):
            notifier_alerts.append(a)

    class _A:
        # Audit / detection-log stub recording events into the enclosing list.
        def log(self, e):
            audit_events.append(e)

    class _S:
        # Scheduler stub; stop() flips the class-level flag.
        is_running = True

        def stop(self):
            type(self).is_running = False

    ctx = _main.RunContext(
        cfg=cfg, capture=lambda: None, canary=MagicMock(), detector=MagicMock(), fsm=fsm_old,
        notifier=_N(), audit=_A(), detection_log=_A(), scheduler=_S(),
        samples_dir=Path("."), fires_dir=Path("."), cmd_queue=MagicMock(), state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    ctx.charts = [initial_chart]
    new_strips = [
        ROI(x=0, y=0, w=200, h=35),
        ROI(x=250, y=0, w=200, h=35),
    ]
    _main._commit_layout_change(ctx, new_strips, now=100.0)
    assert len(ctx.charts) == 2
    assert ctx.charts[0].chart_id == "left"
    assert ctx.charts[1].chart_id == "right"
    # New FSMs must be IDLE (fresh StateMachine)
    assert ctx.charts[0].fsm.state == State.IDLE
    assert ctx.charts[1].fsm.state == State.IDLE
    # Old fsm not reused
    assert ctx.charts[0].fsm is not fsm_old
    # n_primed_global reset
    assert state.n_primed_global == 0
    # layout_change audit + status alert
    assert any(e.get("event") == "layout_change" and e["new_n"] == 2 for e in audit_events)
    assert any("Layout TS schimbat" in a.title for a in notifier_alerts)


def test_commit_layout_change_chart_ids_for_n3(monkeypatch):
    """For n>=3 charts use 'chart_0', 'chart_1', ..."""
    import atm.main as _main
    from atm.config import ROI
    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.colors = {}
    cfg.debounce_depth = 3
    state = _main._LoopState()

    class _N:
        # Notifier stub that discards alerts.
        def send(self, a):
            pass

    class _A:
        # Audit stub that discards events.
        def log(self, e):
            pass

    class _S:
        # Scheduler stub that is never running.
        is_running = False

        def stop(self):
            pass

    ctx = _main.RunContext(
        cfg=cfg, capture=lambda: None, canary=MagicMock(), detector=MagicMock(), fsm=MagicMock(),
        notifier=_N(), audit=_A(), detection_log=_A(), scheduler=_S(),
        samples_dir=Path("."), fires_dir=Path("."), cmd_queue=MagicMock(), state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    ctx.charts = []
    strips = [ROI(x=i * 100, y=0, w=80, h=35) for i in range(3)]
    _main._commit_layout_change(ctx, strips, now=100.0)
    assert [c.chart_id for c in ctx.charts] == ["chart_0", "chart_1", "chart_2"]


def test_strips_match_silent_update_jitter():
    """_strips_match returns True for <=10px jitter — used to trigger silent update."""
    from atm.layout import _strips_match
    from atm.config import ROI
    a = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
    b = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]
    assert _strips_match(a, b, tol=10) is True
    # Drift 12px breaks the match
    c = [ROI(x=12, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
    assert _strips_match(a, c, tol=10) is False


def test_build_heartbeat_alert_single_compat():
    """Without charts arg, _build_heartbeat_alert returns the legacy single-chart body."""
    import atm.main as _main
    a = _main._build_heartbeat_alert(
        fsm_state="IDLE", fire_count=0, uptime_h=1.0, canary_paused=False,
    )
    assert a.kind == "heartbeat"
    assert a.title == "activ"
    assert a.body == "IDLE | semnale: 0 | 1.0h"


def test_build_heartbeat_alert_multi_chart():
    """charts=[left,right] body lines reference [stânga] / [dreapta]."""
    import atm.main as _main
    from atm.config import ROI
    fsm_left = types.SimpleNamespace(state=types.SimpleNamespace(value="ARMED_BUY"))
    fsm_right = types.SimpleNamespace(state=types.SimpleNamespace(value="IDLE"))
    cs_left = _main.ChartState(
        chart_id="left", sub_roi=ROI(x=0, y=0, w=10, h=10), detector=MagicMock(), fsm=fsm_left,
    )
    cs_right = _main.ChartState(
        chart_id="right", sub_roi=ROI(x=0, y=0, w=10, h=10), detector=MagicMock(), fsm=fsm_right,
    )
    a = _main._build_heartbeat_alert(
        fsm_state="ignored", fire_count=2, uptime_h=1.5, canary_paused=False,
        charts=[cs_left, cs_right],
    )
    assert a.kind == "heartbeat"
    assert "[stânga]" in a.body
    assert "[dreapta]" in a.body
    assert "ARMED_BUY" in a.body
    assert "IDLE" in a.body
    assert "semnale: 2" in a.body


@pytest.mark.asyncio
async def test_run_multi_tick_silent_jitter_update(monkeypatch, tmp_path):
    """When detect_strips returns positions with <=10px jitter, sub_roi updates
    silently (no _commit_layout_change → no extra status alert)."""
    import atm.main as _main
    from atm.config import ROI
    monkeypatch.chdir(tmp_path)
    initial = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
    jittered = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]

    class _Det:
        # Detector stub: never finds a dot, remembers the last ROI it was given.
        def __init__(self):
            self.last_roi = None

        def step(self, ts, frame=None):
            from atm.detector import DetectionResult
            return DetectionResult(
                ts=ts, window_found=True, dot_found=False, rgb=None,
                match=None, accepted=False, color=None,
            )

        def update_dot_roi(self, roi):
            self.last_roi = roi

    class _FSM:
        # FSM stub pinned to IDLE.
        # NOTE(review): ``_last_fire`` is a class-level dict and therefore
        # shared by every _FSM instance in this test.
        state = types.SimpleNamespace(value="IDLE")
        _last_fire: dict = {}

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.attach_screenshots = types.SimpleNamespace(
        arm=False, prime=False, trigger=False, late_start=False,
        catchup=False, opposite_rearm=False, rearm=False, phase_skip=False)
    cfg.window_title = None
    state_obj = _main._LoopState()
    notifier_alerts: list = []
    audit_events: list = []

    class _N:
        def send(self, a):
            notifier_alerts.append(a)

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _S:
        is_running = False

        def start(self, s):
            pass

        def stop(self):
            pass

    # Canary stub that always reports zero distance / no drift.
    canary = types.SimpleNamespace(
        is_paused=False,
        check=lambda f: types.SimpleNamespace(distance=0, drifted=False))
    ctx = _main.RunContext(
        cfg=cfg, capture=lambda: np.zeros((200, 600, 3), dtype=np.uint8),
        canary=canary, detector=MagicMock(), fsm=_FSM(),
        notifier=_N(), audit=_A(), detection_log=_A(), scheduler=_S(),
        samples_dir=tmp_path, fires_dir=tmp_path,
        cmd_queue=MagicMock(), state=state_obj,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=None,
    )
    det_l, det_r = _Det(), _Det()
    ctx.charts = [
        _main.ChartState("left", initial[0], det_l, _FSM()),
        _main.ChartState("right", initial[1], det_r, _FSM()),
    ]
    # Stub strip detection to return jittered strips.
    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: jittered)
    results = await _main._run_multi_tick(ctx)
    assert len(results) == 2
    # Sub-ROI silently updated, but no layout_change event.
    assert ctx.charts[0].sub_roi == jittered[0]
    assert ctx.charts[1].sub_roi == jittered[1]
    assert det_l.last_roi == jittered[0]
    assert det_r.last_roi == jittered[1]
    assert not any(e.get("event") == "layout_change" for e in audit_events)
    # No status "Layout TS schimbat" alert.
    assert not any("Layout TS schimbat" in a.title for a in notifier_alerts)


@pytest.mark.asyncio
async def test_run_multi_tick_silent_width_oscillation(monkeypatch, tmp_path):
    """Regression: width pulsations (e.g. 792↔810px) on a stable chart count
    must NOT trigger _commit_layout_change.

    Reproduces the production bug visible in logs/2026-05-04.jsonl
    (576 layout_change events/day).
    """
    import atm.main as _main
    from atm.config import ROI
    monkeypatch.chdir(tmp_path)
    # Same x origins, widths differ by 18px between the two lists.
    initial = [ROI(x=0, y=0, w=792, h=35), ROI(x=912, y=0, w=845, h=35)]
    pulsed = [ROI(x=0, y=0, w=810, h=35), ROI(x=912, y=0, w=863, h=35)]

    class _Det:
        # Detector stub: never finds a dot, remembers the last ROI it was given.
        def __init__(self):
            self.last_roi = None

        def step(self, ts, frame=None):
            from atm.detector import DetectionResult
            return DetectionResult(
                ts=ts, window_found=True, dot_found=False, rgb=None,
                match=None, accepted=False, color=None,
            )

        def update_dot_roi(self, roi):
            self.last_roi = roi

    class _FSM:
        # FSM stub pinned to PRIMED_BUY.
        state = types.SimpleNamespace(value="PRIMED_BUY")
        _last_fire: dict = {}

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.attach_screenshots = types.SimpleNamespace(
        arm=False, prime=False, trigger=False, late_start=False,
        catchup=False, opposite_rearm=False, rearm=False, phase_skip=False)
    cfg.window_title = None
    state_obj = _main._LoopState()
    notifier_alerts: list = []
    audit_events: list = []

    class _N:
        def send(self, a):
            notifier_alerts.append(a)

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _S:
        is_running = False

        def start(self, s):
            pass

        def stop(self):
            pass

    canary = types.SimpleNamespace(
        is_paused=False,
        check=lambda f: types.SimpleNamespace(distance=0, drifted=False))
    # Keep references so FSM identity can be checked after the tick.
    fsm_left = _FSM()
    fsm_right = _FSM()
    ctx = _main.RunContext(
        cfg=cfg, capture=lambda: np.zeros((200, 1800, 3), dtype=np.uint8),
        canary=canary, detector=MagicMock(), fsm=fsm_left,
        notifier=_N(), audit=_A(), detection_log=_A(), scheduler=_S(),
        samples_dir=tmp_path, fires_dir=tmp_path,
        cmd_queue=MagicMock(), state=state_obj,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=None,
    )
    det_l, det_r = _Det(), _Det()
    ctx.charts = [
        _main.ChartState("left", initial[0], det_l, fsm_left),
        _main.ChartState("right", initial[1], det_r, fsm_right),
    ]
    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: pulsed)
    await _main._run_multi_tick(ctx)
    # Sub-ROI silently updated.
    assert ctx.charts[0].sub_roi == pulsed[0]
    assert ctx.charts[1].sub_roi == pulsed[1]
    assert det_l.last_roi == pulsed[0]
    assert det_r.last_roi == pulsed[1]
    # FSM identity preserved (not rebuilt).
    assert ctx.charts[0].fsm is fsm_left
    assert ctx.charts[1].fsm is fsm_right
    # Critical: no layout_change event, no status alert.
    assert not any(e.get("event") == "layout_change" for e in audit_events)
    assert not any("Layout TS schimbat" in a.title for a in notifier_alerts)


def test_commit_layout_change_alert_is_silent(monkeypatch):
    """Layout-change alert on real count change uses silent=True so Telegram
    doesn't ping the user.

    Re-anchor info still visible in chat history.
    """
    import atm.main as _main
    from atm.config import ROI
    from atm.state_machine import StateMachine
    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.colors = {}
    cfg.debounce_depth = 3
    cfg.confidence_min = 0.5
    cfg.dot_roi = ROI(x=0, y=0, w=600, h=35)
    notifier_alerts: list = []
    audit_events: list = []

    class _N:
        def send(self, a):
            notifier_alerts.append(a)

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _S:
        is_running = False

        def stop(self):
            pass

    state_obj = _main._LoopState()
    ctx = _main.RunContext(
        cfg=cfg, capture=lambda: None, canary=MagicMock(), detector=MagicMock(),
        fsm=StateMachine(60),
        notifier=_N(), audit=_A(), detection_log=_A(), scheduler=_S(),
        samples_dir=Path("."), fires_dir=Path("."), cmd_queue=MagicMock(), state=state_obj,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=None,
    )
    ctx.charts = [
        _main.ChartState("only", ROI(x=0, y=0, w=400, h=35), MagicMock(), StateMachine(60)),
    ]
    # One chart becomes two strips.
    new_strips = [
        ROI(x=0, y=0, w=200, h=35),
        ROI(x=250, y=0, w=200, h=35),
    ]
    _main._commit_layout_change(ctx, new_strips, now=100.0)
    layout_alerts = [a for a in notifier_alerts if "Layout TS schimbat" in a.title]
    assert len(layout_alerts) == 1
    assert layout_alerts[0].silent is True


# ---------------------------------------------------------------------------
# Auto-rebase on layout change (two-signal gate: drift + strip-count change)
# ---------------------------------------------------------------------------


class _FakeCanary:
    """Stand-in for atm.canary.Canary that lets tests drive drift state.

    Mirrors the new split API (check pure, commit_pause + rebase explicit).
    Every mutating call is recorded on the instance so tests can assert on
    ``commit_pause_calls``, ``rebase_calls`` and ``resume_calls``.
    """

    def __init__(self, drift_distance: int = 0):
        # Distance reported by check(); any value > 0 counts as drifted.
        self.drift_distance = drift_distance
        self.is_paused = False
        self.commit_pause_calls: list[int] = []
        self.rebase_calls: list[str] = []
        self.resume_calls = 0

    def check(self, _frame):
        """Report the configured drift without changing any state."""
        return types.SimpleNamespace(
            distance=self.drift_distance,
            drifted=self.drift_distance > 0,
            paused=self.is_paused,
        )

    def commit_pause(self, distance: int) -> None:
        """Pause and record *distance*; does nothing when already paused."""
        if self.is_paused:
            return
        self.is_paused = True
        self.commit_pause_calls.append(distance)

    def rebase(self, new_phash: str) -> None:
        """Record the requested hash; the pause flag is left untouched."""
        self.rebase_calls.append(new_phash)

    def resume(self) -> None:
        """Clear the pause flag and count the call."""
        self.is_paused = False
        self.resume_calls += 1


def _build_multi_tick_ctx(tmp_path, cfg, canary, initial_strips):
    """Construct a RunContext suitable for driving _run_multi_tick.

    Returns (ctx, notifier_alerts, audit_events, detector_stubs). Charts are
    seeded with stub Detectors that record dot_roi updates.
""" import atm.main as _main from atm.detector import DetectionResult notifier_alerts: list = [] audit_events: list = [] class _Det: def __init__(self): self.last_roi = None def step(self, ts, frame=None): return DetectionResult( ts=ts, window_found=True, dot_found=False, rgb=None, match=None, accepted=False, color=None, ) def update_dot_roi(self, roi): self.last_roi = roi class _FSM: state = types.SimpleNamespace(value="IDLE") _last_fire: dict = {} class _N: def send(self, a): notifier_alerts.append(a) class _A: def log(self, e): audit_events.append(e) class _S: is_running = False def start(self, s): pass def stop(self): pass state_obj = _main._LoopState() detectors = [_Det() for _ in initial_strips] charts = [ _main.ChartState( chart_id=("left" if i == 0 else "right") if len(initial_strips) == 2 else "only", sub_roi=strip, detector=detectors[i], fsm=_FSM(), ) for i, strip in enumerate(initial_strips) ] ctx = _main.RunContext( cfg=cfg, capture=lambda: np.zeros((200, 1800, 3), dtype=np.uint8), canary=canary, detector=MagicMock(), fsm=_FSM(), notifier=_N(), audit=_A(), detection_log=_A(), scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path, cmd_queue=MagicMock(), state=state_obj, levels_extractor_factory=lambda *a, **kw: None, lifecycle=None, ) ctx.charts = charts return ctx, notifier_alerts, audit_events, detectors def _make_minimal_cfg_for_multi_tick(): """Minimal cfg that satisfies _run_multi_tick + _commit_layout_change paths.""" from atm.config import ROI cfg = MagicMock() cfg.lockout_s = 60 cfg.colors = {} cfg.debounce_depth = 3 cfg.confidence_min = 0.5 cfg.dot_roi = ROI(x=0, y=0, w=600, h=35) cfg.window_title = None cfg.attach_screenshots = types.SimpleNamespace( arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False, ) cfg.config_version = "test-cfg" cfg.canary = types.SimpleNamespace( roi=ROI(x=0, y=0, w=50, h=50), baseline_phash="0" * 64, drift_threshold=8, ) return cfg 
@pytest.mark.asyncio
async def test_multi_tick_drift_with_strip_count_change_auto_rebases(monkeypatch, tmp_path):
    """Drift plus a 2→1 strip-count change is auto-rebased silently.

    Expected: layout committed, canary never paused, baseline phash rewritten
    exactly once, one combined "auto-rebased" alert and no plain layout alert.
    """
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=156)
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]
    after = [ROI(x=0, y=0, w=900, h=35)]  # 2 → 1

    ctx, alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)
    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: after)

    # Record every baseline rewrite and pretend the previous phash was known.
    phash_writes: list = []

    def _stub_rewrite(path, new_phash):
        phash_writes.append((path, new_phash))
        return "OLD_PHASH"

    monkeypatch.setattr(_main, "_rewrite_baseline_phash", _stub_rewrite)

    await _main._run_multi_tick(ctx)

    # The layout was committed: one chart left, carrying the new strip.
    assert len(ctx.charts) == 1
    assert ctx.charts[0].sub_roi == after[0]

    # Auto-rebase path: the canary was never paused.
    assert fake_canary.is_paused is False
    assert fake_canary.commit_pause_calls == []

    # rebase() received the same phash that was written to the baseline.
    assert len(fake_canary.rebase_calls) == 1
    assert len(phash_writes) == 1
    rebased_phash = fake_canary.rebase_calls[0]
    _, written_phash = phash_writes[0]
    assert written_phash == rebased_phash

    # Exactly one layout_change_with_rebase audit record with full details.
    rebase_events = [e for e in audit if e.get("event") == "layout_change_with_rebase"]
    assert len(rebase_events) == 1
    record = rebase_events[0]
    assert record["old_n"] == 2
    assert record["new_n"] == 1
    assert record["distance"] == 156
    assert record["old_phash"] == "OLD_PHASH"
    assert record["new_phash"] == rebased_phash

    # Nothing was logged as "paused".
    assert all(e.get("event") != "paused" for e in audit)

    # One combined alert; the generic layout-change alert was suppressed.
    combined = [a for a in alerts if "auto-rebased" in a.title]
    assert len(combined) == 1
    assert "2→1" in combined[0].title
    plain_layout = [
        a for a in alerts
        if "Layout TS schimbat" in a.title and "auto-rebased" not in a.title
    ]
    assert plain_layout == []


@pytest.mark.asyncio
async def test_multi_tick_drift_with_same_strip_count_pauses(monkeypatch, tmp_path):
    """Drift while the strip count stays at 2 is treated as real drift: pause."""
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=156)
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]
    detected = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]

    ctx, alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)

    def _unexpected_rewrite(*a, **kw):
        return pytest.fail("should not be called")

    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: detected)
    monkeypatch.setattr(_main, "_rewrite_baseline_phash", _unexpected_rewrite)

    results = await _main._run_multi_tick(ctx)

    assert results == []
    assert fake_canary.is_paused is True
    assert fake_canary.commit_pause_calls == [156]
    assert len(ctx.charts) == 2  # unchanged

    paused_events = [e for e in audit if e.get("event") == "paused"]
    assert len(paused_events) == 1
    assert paused_events[0]["drift"] == 156
    assert fake_canary.rebase_calls == []


@pytest.mark.asyncio
async def test_multi_tick_drift_with_zero_strips_pauses(monkeypatch, tmp_path):
    """Drift with strip detection returning [] (chart blackout) pauses, no rebase."""
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=156)
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]

    ctx, _alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)

    def _unexpected_rewrite(*a, **kw):
        return pytest.fail("should not be called")

    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: [])
    monkeypatch.setattr(_main, "_rewrite_baseline_phash", _unexpected_rewrite)

    results = await _main._run_multi_tick(ctx)

    assert results == []
    assert fake_canary.is_paused is True
    assert fake_canary.commit_pause_calls == [156]
    assert fake_canary.rebase_calls == []
    assert any(e.get("event") == "paused" for e in audit)


@pytest.mark.asyncio
async def test_multi_tick_no_drift_strip_count_change_unchanged_path(monkeypatch, tmp_path):
    """A strip-count change without drift takes the plain layout-change path.

    The canary baseline stays untouched and the standard (non-rebase)
    layout-change alert and audit event are produced.
    """
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=0)  # no drift
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]
    after = [ROI(x=0, y=0, w=900, h=35)]

    ctx, alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)

    def _unexpected_rewrite(*a, **kw):
        return pytest.fail("should not be called")

    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: after)
    monkeypatch.setattr(_main, "_rewrite_baseline_phash", _unexpected_rewrite)

    await _main._run_multi_tick(ctx)

    assert len(ctx.charts) == 1
    assert fake_canary.rebase_calls == []

    # Standard layout_change alert (not suppressed, not the combined one).
    layout_alerts = [a for a in alerts if "Layout TS schimbat" in a.title]
    assert len(layout_alerts) == 1
    assert "auto-rebased" not in layout_alerts[0].title

    logged = [e.get("event") for e in audit]
    assert "layout_change" in logged
    assert "layout_change_with_rebase" not in logged


@pytest.mark.asyncio
async def test_multi_tick_auto_rebase_toml_failure_falls_back_to_pause(monkeypatch, tmp_path):
    """If _rewrite_baseline_phash raises, the tick falls back to pause + audit."""
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=156)
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]
    after = [ROI(x=0, y=0, w=900, h=35)]

    ctx, alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)
    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: after)

    def _boom(_path, _new):
        raise ValueError("expected exactly 1 baseline_phash line, found 0")

    monkeypatch.setattr(_main, "_rewrite_baseline_phash", _boom)

    results = await _main._run_multi_tick(ctx)

    assert results == []
    assert fake_canary.is_paused is True
    assert fake_canary.commit_pause_calls == [156]
    # Charts NOT mutated.
    assert len(ctx.charts) == 2

    fail_events = [e for e in audit if e.get("event") == "auto_rebase_failed"]
    assert len(fail_events) == 1
    paused_events = [
        e for e in audit
        if e.get("event") == "paused" and e.get("reason") == "auto_rebase_failed"
    ]
    assert len(paused_events) == 1


@pytest.mark.asyncio
async def test_multi_tick_symmetric_1_to_2(monkeypatch, tmp_path):
    """The 1→2 transition takes the same auto-rebase + layout-commit path."""
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=120)
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=900, h=35)]
    after = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]

    ctx, alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)
    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: after)
    monkeypatch.setattr(_main, "_rewrite_baseline_phash", lambda path, new_phash: "OLD")

    await _main._run_multi_tick(ctx)

    assert len(ctx.charts) == 2
    assert fake_canary.is_paused is False
    assert len(fake_canary.rebase_calls) == 1

    rebase_events = [e for e in audit if e.get("event") == "layout_change_with_rebase"]
    assert len(rebase_events) == 1
    record = rebase_events[0]
    assert record["old_n"] == 1
    assert record["new_n"] == 2

    combined = [a for a in alerts if "auto-rebased" in a.title]
    assert len(combined) == 1
    assert "1→2" in combined[0].title


@pytest.mark.asyncio
async def test_multi_tick_already_paused_short_circuits(monkeypatch, tmp_path):
    """With is_paused=True at tick start the tick returns [] without rebasing."""
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    fake_canary = _FakeCanary(drift_distance=156)
    fake_canary.is_paused = True  # already paused from a prior tick
    config = _make_minimal_cfg_for_multi_tick()
    before = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)]
    after = [ROI(x=0, y=0, w=900, h=35)]

    ctx, _alerts, audit, _ = _build_multi_tick_ctx(tmp_path, config, fake_canary, before)

    # Strip detection should NOT run on the already-paused short-circuit path.
    detect_calls: list = []

    def _detect(c, f):
        detect_calls.append((c, f))
        return after

    def _unexpected_rewrite(*a, **kw):
        return pytest.fail("should not be called")

    monkeypatch.setattr(_main, "_detect_strips_for_ctx", _detect)
    monkeypatch.setattr(_main, "_rewrite_baseline_phash", _unexpected_rewrite)

    results = await _main._run_multi_tick(ctx)

    assert results == []
    assert detect_calls == []  # short-circuited before strip detection
    assert fake_canary.rebase_calls == []
    assert fake_canary.commit_pause_calls == []  # already paused; no new commit
    assert len(ctx.charts) == 2
    assert any(e.get("event") == "paused" for e in audit)