Files
atm/tests/test_main.py
Marius Mutu c950a5a699 feat(multi-chart): refactor _run_multi_tick + fix alert spam pe oscilație strip
Bug critic: _strips_match(tol=10) trip pe pulsații naturale de lățime ~18px între
ticks (ex. 792↔810px). Fiecare trip → _commit_layout_change → reset FSM + alert
Telegram + scheduler stop. Logul 2026-05-04.jsonl arăta 576 evenimente
layout_change/zi, plus prime alerts repetate la dark_red/dark_green (FSM resetat
înghite lockout-ul) și sincronizare cross-chart pe ambele FSM-uri simultan.

Fix:
- main.py:1511 — gate doar pe count change (len(new) != len(current));
  count stabil → silent update sub_roi indiferent de jitter
- main.py:1438 — silent=True pe alert layout_change (Telegram fără sunet)
- 2 teste regresie noi: width oscillation 792↔810 + silent assertion
- 2 teste async reparate: bootstrap _detect_strips_for_ctx pentru ScriptedDetector
  (regresie după ce _run_multi_tick a devenit unica cale de detecție)

Plus refactor multi-chart pre-existent: layout.py modul nou, _detect_strips_for_ctx,
ChartState per-chart FSM/Detector, ROI per-strip pe screenshots, scripts/diag_*.

Verificat: 292 passed, 2 skipped în 10s.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-05 17:59:18 +03:00

2092 lines
73 KiB
Python

"""Tests for atm.main unified CLI."""
from __future__ import annotations
import asyncio
import os
import subprocess
import sys
import types
from dataclasses import dataclass
from pathlib import Path
from unittest.mock import MagicMock
import cv2
import numpy as np
import pytest
SUBCOMMANDS = ["calibrate", "label", "dryrun", "run", "journal", "report"]
# Ensure subprocess invocations find the atm package even without pip install
_SRC = str(Path(__file__).resolve().parent.parent / "src")
_SUBPROCESS_ENV = {**os.environ, "PYTHONPATH": _SRC}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_config_class(cfg=None):
    """Return a Config-like class whose load_current() returns *cfg*.

    Args:
        cfg: Object to hand back from ``load_current()``. When omitted, a
            fresh ``MagicMock`` is created with ``window_title`` set to
            ``None``.

    Returns:
        A ``MagicMock`` standing in for the Config class.
    """
    if cfg is None:
        cfg = MagicMock()
        # window_title must be a real falsy value, not a MagicMock auto-attribute;
        # otherwise _cmd_run enters _focus_window_by_title and TypeErrors.
        cfg.window_title = None
    mock_cls = MagicMock()
    mock_cls.load_current.return_value = cfg
    return mock_cls
# ---------------------------------------------------------------------------
# test_help_works
# ---------------------------------------------------------------------------
def test_help_works():
    """`python -m atm --help` must exit with status 0."""
    result = subprocess.run(
        [sys.executable, "-m", "atm", "--help"],
        capture_output=True,
        text=True,
        env=_SUBPROCESS_ENV,
    )
    # Surface stderr in the failure message so a crash is diagnosable.
    assert result.returncode == 0, result.stderr
# ---------------------------------------------------------------------------
# test_subcommands_listed
# ---------------------------------------------------------------------------
def test_subcommands_listed():
    """Every name in SUBCOMMANDS must appear in the `--help` output."""
    result = subprocess.run(
        [sys.executable, "-m", "atm", "--help"],
        capture_output=True,
        text=True,
        env=_SUBPROCESS_ENV,
    )
    # Fail on a crashed CLI first; otherwise an empty stdout would be
    # reported as a confusing "subcommand missing" error.
    assert result.returncode == 0, result.stderr
    output = result.stdout
    for cmd in SUBCOMMANDS:
        assert cmd in output, f"Expected subcommand '{cmd}' in --help output"
# ---------------------------------------------------------------------------
# test_dryrun_wiring
# ---------------------------------------------------------------------------
@dataclass
class _DryrunResult:
    """Stand-in result object returned by the fake dryrun module."""

    # Acceptance flag carried by the fake result.
    acceptance_pass: bool
def _make_dryrun_module(acceptance_pass: bool):
    """Build a fake ``atm.dryrun`` module.

    Args:
        acceptance_pass: Value stored on the ``_DryrunResult`` returned by the
            fake ``dryrun`` callable.

    Returns:
        A ``types.ModuleType`` named ``atm.dryrun`` exposing ``dryrun`` (accepts
        any arguments) and a no-op ``print_report``.
    """
    mod = types.ModuleType("atm.dryrun")
    mod.dryrun = lambda *a, **kw: _DryrunResult(acceptance_pass=acceptance_pass)
    mod.print_report = lambda r: None
    return mod
def test_dryrun_wiring_pass(monkeypatch, tmp_path):
    """A passing dryrun result must make the CLI exit with code 0."""
    import atm.main as _main

    monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=True))
    monkeypatch.setattr("atm.main.Config", _mock_config_class())
    with pytest.raises(SystemExit) as exc_info:
        _main.main(["dryrun", str(tmp_path)])
    assert exc_info.value.code == 0
def test_dryrun_wiring_fail(monkeypatch, tmp_path):
    """A failing dryrun result must make the CLI exit with code 1."""
    import atm.main as _main

    monkeypatch.setattr("atm.main.dryrun", _make_dryrun_module(acceptance_pass=False))
    monkeypatch.setattr("atm.main.Config", _mock_config_class())
    with pytest.raises(SystemExit) as exc_info:
        _main.main(["dryrun", str(tmp_path)])
    assert exc_info.value.code == 1
# ---------------------------------------------------------------------------
# test_report_current_week_default
# ---------------------------------------------------------------------------
def test_report_current_week_default(monkeypatch, tmp_path):
    """`report` with an empty journal must complete without raising."""
    import atm.main as _main

    # Journal.all returns no entries — report should print a zero-trade week
    monkeypatch.setattr("atm.journal.Journal.all", lambda self: [])
    # Should not raise; no sys.exit expected
    _main.main(["report", "--file", str(tmp_path / "trades.jsonl")])
# ---------------------------------------------------------------------------
# test_run_live_dry
# ---------------------------------------------------------------------------
def test_run_live_dry(monkeypatch):
    """`run --duration 0` must invoke run_live exactly once with duration_s == 0."""
    import atm.main as _main

    calls: list[dict] = []

    def _mock_run_live(cfg, duration_s=None, capture_stub=False):
        # Record the arguments instead of starting the real loop.
        calls.append({"cfg": cfg, "duration_s": duration_s, "capture_stub": capture_stub})

    monkeypatch.setattr("atm.main.run_live", _mock_run_live)
    monkeypatch.setattr("atm.main.Config", _mock_config_class())
    _main.main(["run", "--duration", "0"])
    assert len(calls) == 1
    assert calls[0]["duration_s"] == pytest.approx(0.0)
# ---------------------------------------------------------------------------
# Regression integration test — user bug 2026-04-16.
# Session starts with an accepted gray tick followed by dark_red. Catchup
# synth-arm must fire on dark_red (the previous gray consumed first_accepted),
# then light_red triggers SELL. Proves run_live wiring dispatches alerts for
# the user's exact scenario.
# ---------------------------------------------------------------------------
def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    """Scripted gray → dark_red → light_red session yields one SELL arm/prime/trigger.

    All collaborators of ``run_live`` are replaced with in-memory fakes; the
    scripted detector raises ``_StopLoop`` once its script is exhausted, which
    is how the otherwise endless loop is terminated.
    """
    import atm.main as _main
    from atm.detector import DetectionResult

    captured = []

    class FakeFanout:
        """Notifier fake that collects every alert it is asked to send."""

        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured.append(alert)

        def stop(self):
            pass

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        """Canary fake that never reports drift."""

        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    class _StopLoop(Exception):
        pass

    class ScriptedDetector:
        """Detector fake replaying a fixed (color, accepted) script."""

        _script = [
            ("gray", True),
            ("gray", True),
            ("dark_red", True),
            ("dark_red", True),
            ("light_red", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Script exhausted → break out of run_live.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

    def fake_build_capture(cfg, capture_stub=False):
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    monkeypatch.chdir(tmp_path)

    class _Stub:
        """Catch-all stub for notifiers, audit log and levels extractor."""

        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    class _StubPoller:
        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StubScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False

        def start(self, interval_s):
            self.is_running = True

        def stop(self):
            self.is_running = False

        async def run(self):
            await asyncio.sleep(9999)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
    monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
    # Bootstrap a single chart so _run_multi_tick populates ctx.charts on tick 1.
    # Frame is zeros → real detect_strips returns [] → without this, charts stays
    # empty and the ScriptedDetector loop never advances (regression after the
    # multi-chart refactor made _run_multi_tick the single detection path).
    from atm.config import ROI

    _bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)]
    monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip)

    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)

    arm = [a for a in captured if a.kind == "arm"]
    prime = [a for a in captured if a.kind == "prime"]
    trigger = [a for a in captured if a.kind == "trigger"]
    assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in captured]})"
    assert arm[0].direction == "SELL"
    assert "recuperare" in (arm[0].title + arm[0].body).lower()
    assert len(prime) == 1
    assert prime[0].direction == "SELL"
    assert "recuperare" in (prime[0].title + prime[0].body).lower()
    assert len(trigger) == 1
    assert trigger[0].direction == "SELL"
# ---------------------------------------------------------------------------
# MUST-HAVE: async lifecycle integration test
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Scripted BUY session: one arm/prime/trigger alert, scheduler start then stop.

    The scripted detector raises ``_StopLoop`` once its script is exhausted,
    which terminates ``run_live_async``.
    """
    import atm.main as _main
    from atm.detector import DetectionResult

    captured_alerts: list = []
    scheduler_events: list[str] = []

    class FakeFanout:
        def __init__(self, *a, **kw):
            pass

        def send(self, alert):
            captured_alerts.append(alert)

        def stop(self):
            pass

        def stats(self):
            return {}

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        def __init__(self, *a, **kw):
            self.is_paused = False

        def check(self, frame):
            return FakeCanaryResult()

        def resume(self):
            pass

    # Scheduler tracks start/stop calls
    class FakeScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None

        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            scheduler_events.append(f"start:{interval_s}")

        def stop(self):
            self.is_running = False
            scheduler_events.append("stop")

        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        def __init__(self, *a, **kw):
            pass

        async def run(self):
            await asyncio.sleep(9999)

    class _StopLoop(Exception):
        pass

    class ScriptedDetector:
        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]

        def __init__(self, *a, **kw):
            self._i = 0

        def step(self, ts, frame=None):
            # Script exhausted → break out of run_live_async.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=True,
                rgb=(1, 1, 1),
                match=None,
                accepted=accepted,
                color=color,
            )

        @property
        def rolling(self):
            return []

    def fake_build_capture(cfg, capture_stub=False):
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0
    cfg.dot_roi.y = 0
    cfg.dot_roi.w = 10
    cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)
    fake_sched = FakeScheduler()
    monkeypatch.chdir(tmp_path)

    class _Stub:
        def __init__(self, *a, **kw):
            pass

        def log(self, *a, **kw):
            pass

        def close(self, *a, **kw):
            pass

        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
    # Always hand back the same instance so start/stop calls are observable.
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
    # Bootstrap ctx.charts on tick 1 — see test_run_live_catchup_sell for context.
    from atm.config import ROI

    _bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)]
    monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip)

    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
    prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
    trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]
    assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
    assert arm_alerts[0].direction == "BUY"
    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"
    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"
    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
    assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
    start_idx = scheduler_events.index("start:180")
    stop_idx = scheduler_events.index("stop")
    assert start_idx < stop_idx, "scheduler started after it stopped"
# ---------------------------------------------------------------------------
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
# even when canary is paused or when detection is otherwise skipped.
# Prior bug: `continue` past the drain loop caused commands to pile up.
# ---------------------------------------------------------------------------
def _make_ctx_for_drain(cmd_queue, dispatched: list):
    """Build a minimal RunContext for the _drain_cmd_queue tests.

    The context uses a paused fake canary, recording fakes for ``notifier``
    (``.alerts``) and ``audit`` / ``detection_log`` (``.events``), and a no-op
    scheduler.

    Args:
        cmd_queue: Queue object stored on the context as ``cmd_queue``.
        dispatched: Not used by this helper; kept so existing call sites stay
            valid. Callers patch ``_dispatch_command`` themselves.

    Returns:
        The constructed ``atm.main.RunContext``.
    """
    import atm.main as _main

    class _FakeAudit:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _FakeNotifier:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _FakeCanary:
        def __init__(self, paused=True):
            self.is_paused = paused

    class _FakeScheduler:
        is_running = False
        interval_s = None

        def start(self, s):
            pass

        def stop(self):
            pass

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=MagicMock(),
        capture=lambda: None,
        canary=_FakeCanary(paused=True),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_FakeNotifier(),
        audit=_FakeAudit(),
        detection_log=_FakeAudit(),
        scheduler=_FakeScheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=cmd_queue,
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    return ctx
@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
    """Regression: when canary.is_paused, _drain_cmd_queue still dispatches.

    Prior bug: detection loop `continue`'d past the drain block whenever the
    tick returned res=None (canary paused). Commands accumulated forever.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))
    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
    ctx = _make_ctx_for_drain(q, dispatched)
    await _main._drain_cmd_queue(ctx)
    # Commands are dispatched in FIFO order and the queue ends up empty.
    assert dispatched == ["status", "ss"]
    assert q.empty()
@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
    """Drain must still fire when the tick skipped (e.g. out of operating hours).

    The refactored loop runs _drain_cmd_queue unconditionally after every tick,
    regardless of `_TickSyncResult` content.
    """
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="stop"))
    dispatched: list = []

    async def _fake_dispatch(ctx, cmd):
        dispatched.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
    ctx = _make_ctx_for_drain(q, dispatched)
    # Simulate out-of-window tick (empty _TickSyncResult, no res)
    await _main._handle_fsm_result(ctx, _main._TickSyncResult())
    await _main._drain_cmd_queue(ctx)
    assert dispatched == ["stop"]
@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
    """If one command raises, remaining commands still drain + warn alert sent."""
    import atm.main as _main
    from atm.commands import Command

    q: asyncio.Queue = asyncio.Queue()
    await q.put(Command(action="status"))
    await q.put(Command(action="ss"))
    attempts: list = []

    async def _fake_dispatch(ctx, cmd):
        attempts.append(cmd.action)
        # Only the first command fails; the second must still be attempted.
        if cmd.action == "status":
            raise RuntimeError("boom")

    monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
    ctx = _make_ctx_for_drain(q, attempts)
    await _main._drain_cmd_queue(ctx)
    assert attempts == ["status", "ss"]
    # warn alert for the failed command
    warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
    assert any("status" in t for t in warn_titles)
    # command_error audit event
    errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
    assert len(errs) == 1 and errs[0]["action"] == "status"
# ---------------------------------------------------------------------------
# Commit 4: operating hours + LifecycleState transitions
# ---------------------------------------------------------------------------
from zoneinfo import ZoneInfo as _ZI # noqa: E402
import datetime as _dt # noqa: E402
def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
            start="09:30", stop="16:00", tz="America/New_York"):
    """Build a lightweight cfg-like object with operating_hours populated.

    Args:
        enabled: Stored as ``operating_hours.enabled``; when falsy no
            ``ZoneInfo`` is built and ``_tz_cache`` is ``None``.
        weekdays: Stored as ``operating_hours.weekdays``.
        start: Stored as ``operating_hours.start_hhmm``.
        stop: Stored as ``operating_hours.stop_hhmm``.
        tz: IANA zone name, stored as ``operating_hours.timezone``.

    Returns:
        A ``SimpleNamespace`` with ``operating_hours`` and ``window_title=None``.
    """
    oh = types.SimpleNamespace(
        enabled=enabled,
        timezone=tz,
        weekdays=weekdays,
        start_hhmm=start,
        stop_hhmm=stop,
        _tz_cache=_ZI(tz) if enabled else None,
    )
    return types.SimpleNamespace(operating_hours=oh, window_title=None)
def _fake_canary(paused=False):
    """Return a canary stand-in exposing only ``is_paused``."""
    return types.SimpleNamespace(is_paused=paused)
@pytest.mark.parametrize(
    "local_dt,expected",
    [
        # Monday 09:30 NY — exact open → active (None)
        (_dt.datetime(2026, 4, 20, 9, 30), None),
        # Monday 16:00 NY — exact close → inactive (>= stop)
        (_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
        # Monday 08:00 NY — before open
        (_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
        # Monday 12:00 NY — active
        (_dt.datetime(2026, 4, 20, 12, 0), None),
        # Saturday 12:00 NY — weekend
        (_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
        # Sunday 23:00 NY — weekend
        (_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
    ],
)
def test_operating_hours_skip_matrix(local_dt, expected):
    """Timezone-aware start/stop + weekday checks."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # The parametrized datetimes are naive wall-clock values; attach the
    # configured zone before converting to an epoch timestamp.
    now_ts = local_dt.replace(tzinfo=tz).timestamp()
    lifecycle = _main.LifecycleState()
    result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())
    assert result == expected
def test_market_open_close_transitions_logged_once():
    """Crossing a boundary emits exactly one market_open / market_closed event."""
    import atm.main as _main

    audit_events = []
    alerts = []

    class _A:
        def log(self, e):
            audit_events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()
    canary = _fake_canary()

    # Prime as closed (before open, Monday 08:00)
    pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
    skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
    # First evaluation seeds state, no alert yet.
    assert lifecycle.last_window_state == "closed"
    assert alerts == []
    assert audit_events == []

    # Transition to open
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
    assert lifecycle.last_window_state == "open"
    assert len(alerts) == 1
    assert any(e.get("event") == "market_open" for e in audit_events)

    # Repeated open tick — no duplicate log
    alerts.clear()
    audit_events.clear()
    skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
    assert alerts == []
    assert audit_events == []

    # Transition to close
    close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
    skip_close = _main._should_skip(close, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
    assert lifecycle.last_window_state == "closed"
    assert any(e.get("event") == "market_closed" for e in audit_events)
def test_market_transition_sends_notification():
    """market_open / market_closed transitions produce kind=status alerts."""
    import atm.main as _main

    alerts = []

    class _A:
        def log(self, e):
            pass

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Start from a known "closed" state so passing skip=None is a transition.
    lifecycle = _main.LifecycleState(last_window_state="closed")
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    _main._maybe_log_transition(None, lifecycle, mid, _A(), _N())
    assert len(alerts) == 1
    assert alerts[0].kind == "status"
    assert "piața" in alerts[0].title.lower() or "monitorizare" in alerts[0].body.lower()
def test_startup_in_window_suppresses_market_open():
    """R2 #20: first evaluation in-window just seeds state; no alert fires."""
    import atm.main as _main

    alerts = []
    events = []

    class _A:
        def log(self, e):
            events.append(e)

    class _N:
        def send(self, a):
            alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()  # last_window_state is None
    in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
    assert skip is None
    _main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())
    # Seeded silently
    assert lifecycle.last_window_state == "open"
    assert alerts == []
    assert not any(e.get("event") == "market_open" for e in events)
    # Two more ticks, still in-window → no spurious alert
    for _ in range(2):
        skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
        _main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
    assert alerts == []
def test_operating_hours_weekday_locale_independent():
    """R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
    import locale as _locale
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Saturday 12:00 NY
    sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()
    # Remember the current LC_TIME so it can be restored afterwards.
    original = _locale.setlocale(_locale.LC_TIME)
    try:
        for loc in ("C", "de_DE.UTF-8"):
            try:
                _locale.setlocale(_locale.LC_TIME, loc)
            except _locale.Error:
                continue  # locale not installed → skip gracefully
            lifecycle = _main.LifecycleState()
            result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
            assert result == "out_of_window_weekend", (
                f"locale={loc} returned {result!r}"
            )
    finally:
        # Never leak a changed locale into other tests; fall back to "C" if
        # the original value cannot be re-applied.
        try:
            _locale.setlocale(_locale.LC_TIME, original)
        except _locale.Error:
            _locale.setlocale(_locale.LC_TIME, "C")
def test_should_skip_user_paused_wins():
    """With user_paused set, _should_skip returns "user_paused" even in-window."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(user_paused=True)
    # Mid-Monday (in-window) — should still skip because user_paused
    tz = cfg.operating_hours._tz_cache
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"
def test_should_skip_canary_drift_wins_over_window():
    """With a paused canary, _should_skip returns "drift_paused" even in-window."""
    import atm.main as _main

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState()
    tz = cfg.operating_hours._tz_cache
    # Mid-Monday is inside operating hours, so only the canary can cause a skip.
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"
# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------
def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
    """Minimal RunContext for _dispatch_command unit tests.

    Args:
        canary: Canary stand-in; defaults to an un-paused namespace with a
            no-op ``resume``.
        lifecycle: ``LifecycleState`` to attach; a fresh one is created when
            omitted.
        cfg: Config stand-in; defaults to a ``MagicMock`` with operating hours
            disabled, ``auto_poll_interval_s=180`` and ``window_title=None``.

    Returns:
        An ``atm.main.RunContext`` whose ``notifier.alerts`` and
        ``audit.events`` lists record what the code under test emitted.
    """
    import atm.main as _main

    class _A:
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _N:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _S:
        is_running = False
        interval_s = None

        def start(self, s):
            self.is_running = True

        def stop(self):
            self.is_running = False

    if canary is None:
        canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    if lifecycle is None:
        lifecycle = _main.LifecycleState()
    if cfg is None:
        cfg = MagicMock()
        cfg.telegram.auto_poll_interval_s = 180
        cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
        # Skip window focus in dispatch tests — MagicMock window_title would
        # propagate into _focus_window_by_title (real Win32 call).
        cfg.window_title = None
    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: None,
        canary=canary,
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_N(),
        audit=_A(),
        detection_log=_A(),
        scheduler=_S(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=lifecycle,
    )
    return ctx
@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
    """`pause` sets lifecycle.user_paused and emits an audit event + status alert."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    await _main._dispatch_command(ctx, Command(action="pause"))
    assert ctx.lifecycle.user_paused is True
    # When combined with _should_skip, we get user_paused
    assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused"
    # Audit + notif
    assert any(e.get("event") == "user_paused" for e in ctx.audit.events)
    assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts)
@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
    """`resume` with value=1 clears user_paused and canary pause, audits force=True."""
    import atm.main as _main
    from atm.commands import Command

    # is_paused is a property so the effect of resume() is visible afterwards.
    class _Canary:
        def __init__(self):
            self._p = True

        @property
        def is_paused(self):
            return self._p

        def resume(self):
            self._p = False

    canary = _Canary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert force_events and force_events[0]["force"] is True
@pytest.mark.asyncio
async def test_resume_plain_also_clears_canary_drift():
    """2026-04-21 decision: /resume (no arg) now clears BOTH user_paused and
    canary drift. /resume force remains accepted as legacy alias. Previous
    behavior (force required for drift) was a UX trap — see plan doc."""
    import atm.main as _main
    from atm.commands import Command

    class _Canary:
        def __init__(self):
            self._p = True

        @property
        def is_paused(self):
            return self._p

        def resume(self):
            self._p = False

    canary = _Canary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    await _main._dispatch_command(ctx, Command(action="resume"))  # no force
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False  # drift cleared without force
    # Audit event still records was_drift + force=False for traceability
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["was_drift"] is True and resumed[0]["force"] is False
    # Message mentions drift-pause was cleared (kind is "screenshot" now since /resume attaches image)
    alerts = ctx.notifier.alerts
    assert alerts and ("drift" in (alerts[0].title + alerts[0].body).lower())
@pytest.mark.asyncio
async def test_resume_force_alias_still_works():
    """/resume force (value=1) remains accepted — legacy muscle memory."""
    import atm.main as _main
    from atm.commands import Command

    class _Canary:
        def __init__(self):
            self._p = True

        @property
        def is_paused(self):
            return self._p

        def resume(self):
            self._p = False

    canary = _Canary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert resumed and resumed[0]["force"] is True
# ---------------------------------------------------------------------------
# /rebase: propose + confirm flow
# ---------------------------------------------------------------------------
def _rebase_cfg_and_path(tmp_path):
    """Write a minimal TOML with baseline_phash and build a matching cfg.

    The file is created at ``tmp_path / "configs" / "rebase_test.toml"``.

    Args:
        tmp_path: Directory under which the ``configs`` folder is created.

    Returns:
        ``(cfg, path, old_phash)`` — the cfg-like namespace, the path of the
        written TOML file, and the baseline phash string stored in both.
    """
    name = "rebase_test"
    path = tmp_path / "configs" / f"{name}.toml"
    path.parent.mkdir(parents=True, exist_ok=True)
    old = "a" * 64
    # The trailing comment on the baseline_phash line is part of the fixture.
    path.write_text(
        f'[canary]\n'
        f'baseline_phash = "{old}" # comment stays\n'
        f'drift_threshold = 8\n'
        f'[canary.roi]\nx = 0\ny = 0\nw = 4\nh = 4\n',
        encoding="utf-8",
    )
    cfg = types.SimpleNamespace(
        window_title=None,
        config_version=name,
        canary=types.SimpleNamespace(
            roi=types.SimpleNamespace(x=0, y=0, w=4, h=4),
            baseline_phash=old,
            drift_threshold=8,
        ),
        telegram=types.SimpleNamespace(auto_poll_interval_s=180),
        operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
    )
    return cfg, path, old
def _blue_frame(h=200, w=200):
    """Return an ``h`` x ``w`` 3-channel uint8 frame filled with (50, 80, 20)."""
    import numpy as _np

    return _np.full((h, w, 3), (50, 80, 20), dtype=_np.uint8)
def test_rewrite_baseline_phash_updates_only_target_line(tmp_path):
    """Rewriting replaces only the phash value, keeping the trailing comment."""
    import atm.main as _main

    p = tmp_path / "cfg.toml"
    p.write_text(
        '[canary]\n'
        'baseline_phash = "deadbeef" # keep this comment\n'
        'drift_threshold = 8\n',
        encoding="utf-8",
    )
    old = _main._rewrite_baseline_phash(p, "cafef00d")
    # The previous value is returned to the caller.
    assert old == "deadbeef"
    txt = p.read_text(encoding="utf-8")
    assert 'baseline_phash = "cafef00d" # keep this comment' in txt
    assert "drift_threshold = 8" in txt
def test_rewrite_baseline_phash_raises_when_missing(tmp_path):
    """A config file without a baseline_phash line must raise ValueError."""
    import atm.main as _main

    p = tmp_path / "cfg.toml"
    p.write_text("[canary]\ndrift_threshold = 8\n", encoding="utf-8")
    with pytest.raises(ValueError):
        _main._rewrite_baseline_phash(p, "ff")
@pytest.mark.asyncio
async def test_rebase_propose_sets_pending_and_does_not_touch_file(tmp_path, monkeypatch):
    """`rebase` without a value records a pending proposal and leaves the TOML as-is."""
    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    # The pending path is asserted as relative ("configs/<version>.toml"),
    # so run from tmp_path where the fixture file lives.
    monkeypatch.chdir(tmp_path)
    await _main._dispatch_command(ctx, Command(action="rebase"))
    assert ctx.pending_rebase is not None
    _proposed_ts, new_phash, pending_path = ctx.pending_rebase
    assert new_phash != old
    assert pending_path == Path("configs") / f"{cfg.config_version}.toml"
    # File unchanged at propose time
    assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
    events = [e["event"] for e in ctx.audit.events]
    assert "rebase_proposed" in events
@pytest.mark.asyncio
async def test_rebase_confirm_applies_and_clears_pauses(tmp_path, monkeypatch):
    """Propose then confirm (value=1): file + cfg updated, both pauses cleared."""
    import atm.main as _main
    from atm.commands import Command

    class _Canary:
        def __init__(self):
            self._p = True

        @property
        def is_paused(self):
            return self._p

        def resume(self):
            self._p = False

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    canary = _Canary()
    ctx = _dispatch_ctx(cfg=cfg, canary=canary)
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    ctx.lifecycle.user_paused = True
    monkeypatch.chdir(tmp_path)

    # Step 1: propose — stores the candidate phash on the context.
    await _main._dispatch_command(ctx, Command(action="rebase"))
    assert ctx.pending_rebase is not None
    _, new_phash, _ = ctx.pending_rebase

    # Step 2: confirm — applies the candidate and clears the pending slot.
    await _main._dispatch_command(ctx, Command(action="rebase", value=1))
    assert ctx.pending_rebase is None
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    assert cfg.canary.baseline_phash == new_phash
    txt = cfg_path.read_text(encoding="utf-8")
    assert f'"{new_phash}"' in txt
    assert f'"{old}"' not in txt
    applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
    assert applied and applied[0]["old_phash"] == old and applied[0]["new_phash"] == new_phash
@pytest.mark.asyncio
async def test_rebase_confirm_without_pending_warns(tmp_path):
    """Confirming with nothing staged yields a 'nimic' alert and no apply event."""
    import atm.main as _main
    from atm.commands import Command

    cfg, _, _ = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    assert ctx.pending_rebase is None

    sent = ctx.notifier.alerts
    assert sent and "nimic" in sent[0].title.lower()

    applied_events = [
        entry for entry in ctx.audit.events
        if entry.get("event") == "rebase_applied"
    ]
    assert not applied_events
@pytest.mark.asyncio
async def test_rebase_confirm_expired_pending_warns(tmp_path, monkeypatch):
    """Confirming a proposal stamped one hour ago is refused with an 'expirat' alert."""
    import time as _t

    import atm.main as _main
    from atm.commands import Command

    cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()

    # Stage a proposal whose timestamp lies 3600 s in the past.
    stale_ts = _t.time() - 3600
    ctx.pending_rebase = (stale_ts, "b" * 64, cfg_path)

    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    assert ctx.pending_rebase is None
    # The config on disk still carries the old hash.
    assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")

    sent = ctx.notifier.alerts
    assert sent and "expirat" in sent[0].title.lower()
@pytest.mark.asyncio
async def test_rebase_confirm_mutates_frozen_canary_region(tmp_path, monkeypatch):
    """Regression: confirm must update the hash even on a frozen CanaryRegion.

    CanaryRegion is ``@dataclass(frozen=True)``, so plain attribute assignment
    raises FrozenInstanceError. /rebase confirm still has to mirror the new
    phash into the live cfg; otherwise canary.check() keeps comparing against
    the stale hash and drift re-pauses within one tick after confirmation.
    """
    import atm.main as _main
    from atm.commands import Command
    from atm.config import CanaryRegion, ROI as _ROI

    class _IdleCanary:
        """Canary double that is never paused."""

        def __init__(self):
            self._paused = False

        @property
        def is_paused(self):
            return self._paused

        def resume(self):
            self._paused = False

    # Only the path and old hash come from the helper; the cfg itself is built
    # here around the REAL frozen dataclass instead of a SimpleNamespace.
    _, cfg_path, old = _rebase_cfg_and_path(tmp_path)
    frozen_region = CanaryRegion(
        roi=_ROI(x=0, y=0, w=4, h=4),
        baseline_phash=old,
        drift_threshold=8,
    )
    cfg = types.SimpleNamespace(
        window_title=None,
        config_version="rebase_test",
        canary=frozen_region,
        telegram=types.SimpleNamespace(auto_poll_interval_s=180),
        operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
    )

    ctx = _dispatch_ctx(cfg=cfg, canary=_IdleCanary())
    ctx.capture = lambda: _blue_frame()
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))
    _, new_phash, _ = ctx.pending_rebase
    await _main._dispatch_command(ctx, Command(action="rebase", value=1))

    # In-memory cfg reflects the new hash.
    assert cfg.canary.baseline_phash == new_phash
    # So does the TOML on disk.
    assert f'"{new_phash}"' in cfg_path.read_text(encoding="utf-8")

    applied_events = [
        entry for entry in ctx.audit.events
        if entry.get("event") == "rebase_applied"
    ]
    assert applied_events
@pytest.mark.asyncio
async def test_rebase_propose_capture_failed_warns(tmp_path, monkeypatch):
    """Proposing a rebase when capture returns None stages nothing and warns.

    The previous check (``"e" in title``) matched almost any title. The
    assertion now looks for the "failed" wording of the expected
    "Captură eșuată" title, tolerating an unaccented spelling.
    """
    import atm.main as _main
    from atm.commands import Command

    cfg, _, _ = _rebase_cfg_and_path(tmp_path)
    ctx = _dispatch_ctx(cfg=cfg)
    ctx.capture = lambda: None  # simulate a failed screen capture
    ctx.fires_dir = tmp_path / "fires"
    ctx.fires_dir.mkdir()
    monkeypatch.chdir(tmp_path)

    await _main._dispatch_command(ctx, Command(action="rebase"))

    assert ctx.pending_rebase is None
    alerts = ctx.notifier.alerts
    assert alerts
    title = alerts[0].title.lower()
    assert "eșuat" in title or "esuat" in title  # "Captură eșuată"
@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
    """/resume issued while the operating-hours window is closed.

    ``atm.main.time`` is temporarily swapped for a stub pinned to
    2026-04-18 12:00 (a Saturday) in the configured timezone. The command must
    clear ``user_paused`` and the first alert must mention the closed window.
    """
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
    canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)

    pinned_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    class _FrozenClock:
        """Stand-in for the ``time`` module as seen from atm.main."""

        def time(self):
            return pinned_ts

        def monotonic(self):
            return 0.0

    original_time = _main.time
    _main.time = _FrozenClock()
    try:
        await _main._dispatch_command(ctx, Command(action="resume"))
    finally:
        # Always restore the real module so later tests are unaffected.
        _main.time = original_time

    assert ctx.lifecycle.user_paused is False
    sent = ctx.notifier.alerts
    assert sent
    text = (sent[0].title + sent[0].body).lower()
    assert "închis" in text or "piața" in text or "ferestr" in text
@pytest.mark.asyncio
async def test_dispatch_resume_sends_inline_screenshot(monkeypatch, tmp_path):
    """/resume with a successful capture sends exactly one alert.

    That alert carries the image path returned by the (stubbed)
    ``_save_inspect_frame`` and a body containing the pick caption.
    """
    import atm.main as _main
    from atm.commands import Command

    class _PausedCanary:
        def __init__(self):
            self._paused = True

        @property
        def is_paused(self):
            return self._paused

        def resume(self):
            self._paused = False

    ctx = _dispatch_ctx(canary=_PausedCanary())
    ctx.lifecycle.user_paused = True
    ctx.capture = lambda: object()  # any non-None frame will do
    ctx.fires_dir = tmp_path
    ctx.cfg.window_title = None

    image_file = tmp_path / "fake_resume.png"
    stub_detections = [
        {
            "idx": 0,
            "name": "light_green",
            "rgb": (0, 255, 0),
            "distance": 5.0,
            "confidence": 0.9,
            "pos_abs": (100, 200),
        }
    ]
    monkeypatch.setattr(
        _main,
        "_save_inspect_frame",
        lambda *a, **kw: (image_file, stub_detections),
    )

    await _main._dispatch_command(ctx, Command(action="resume"))

    # One alert only: image attached, caption in the body.
    sent = ctx.notifier.alerts
    assert len(sent) == 1
    (only,) = sent
    assert only.image_path == tmp_path / "fake_resume.png"
    assert "Monitorizare reluată" in only.title
    assert "← pick" in only.body
    assert "captură eșuată" not in only.title
@pytest.mark.asyncio
async def test_dispatch_resume_capture_failed_still_resumes(monkeypatch, tmp_path):
    """/resume with capture returning None still resumes.

    The single alert has no image and its title carries both the resumed
    wording and the capture-failed wording.
    """
    import atm.main as _main
    from atm.commands import Command

    class _PausedCanary:
        def __init__(self):
            self._paused = True

        @property
        def is_paused(self):
            return self._paused

        def resume(self):
            self._paused = False

    canary = _PausedCanary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    ctx.capture = lambda: None  # simulate capture failure
    ctx.fires_dir = tmp_path
    ctx.cfg.window_title = None

    await _main._dispatch_command(ctx, Command(action="resume"))

    # Pause flags are cleared even though no frame was captured.
    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False

    sent = ctx.notifier.alerts
    assert len(sent) == 1
    only = sent[0]
    assert only.image_path is None
    assert "captură eșuată" in only.title
    assert "Monitorizare reluată" in only.title
@pytest.mark.asyncio
async def test_dispatch_resume_captures_before_state_clear(monkeypatch, tmp_path):
    """Capture must run BEFORE user_paused is cleared and canary.resume() is called.

    The ordering is checked twice:
      * inside the capture stub, which asserts both pause flags are still set;
      * after dispatch, by asserting that exactly one capture had completed at
        the moment ``resume()`` ran. This second check still fails if the
        dispatcher swallows exceptions raised from the capture callable.
    """
    import atm.main as _main
    from atm.commands import Command

    completed_captures: list[int] = []
    capture_calls = [0]

    class _Canary:
        """Paused canary that records how many captures preceded resume()."""

        def __init__(self):
            self._p = True
            self.captures_before_resume: int | None = None

        @property
        def is_paused(self):
            return self._p

        def resume(self):
            self._p = False
            self.captures_before_resume = len(completed_captures)

    canary = _Canary()
    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True
    ctx.fires_dir = tmp_path
    ctx.cfg.window_title = None

    def _capture():
        capture_calls[0] += 1
        # State must still be paused at capture time.
        assert ctx.lifecycle.user_paused is True, "capture ran AFTER user_paused was cleared"
        assert canary.is_paused is True, "capture ran AFTER canary.resume()"
        completed_captures.append(capture_calls[0])
        return object()

    ctx.capture = _capture
    monkeypatch.setattr(
        _main,
        "_save_inspect_frame",
        lambda *a, **kw: (tmp_path / "ok.png", []),
    )

    await _main._dispatch_command(ctx, Command(action="resume"))

    assert capture_calls[0] == 1
    assert ctx.lifecycle.user_paused is False  # cleared after capture
    # resume() must have observed the one successful capture already done.
    assert canary.captures_before_resume == 1, "canary.resume() ran before capture completed"
@pytest.mark.asyncio
async def test_ss_and_fire_agree_on_rightmost_dot(tmp_path):
    """Parity: /ss inspection and the fire path report the same rightmost dot.

    ``_save_inspect_frame``'s ``detections[0]["pos_abs"]`` must equal the
    absolute position derived from ``find_rightmost_dot`` on the same frame
    and ROI, so the two code paths cannot drift apart unnoticed.
    """
    import atm.main as _main
    from atm.vision import find_rightmost_dot, crop_roi
    from atm.config import ROI, ColorSpec, YAxisCalib

    # Synthetic frame: uniform background plus one bright green dot.
    frame = np.zeros((100, 200, 3), dtype=np.uint8)
    frame[:, :] = (18, 18, 18)  # BGR background matching the palette entry below
    cv2.circle(frame, (150, 50), 5, (0, 255, 0), -1)

    cfg = types.SimpleNamespace(
        dot_roi=ROI(x=10, y=10, w=180, h=80),
        colors={
            "background": ColorSpec(rgb=(18, 18, 18), tolerance=15.0),
            "light_green": ColorSpec(rgb=(0, 255, 0), tolerance=60.0),
        },
        y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=90, p2_price=50.0),
        version="parity-test",
    )

    # Fire path: crop to the ROI, locate the dot, convert to frame coordinates.
    dot_crop = crop_roi(frame, cfg.dot_roi)
    fire_pos = find_rightmost_dot(dot_crop, cfg.colors["background"].rgb)
    assert fire_pos is not None
    fire_abs = (cfg.dot_roi.x + fire_pos[0], cfg.dot_roi.y + fire_pos[1])

    # Inspect path.
    path, detections = _main._save_inspect_frame(frame, cfg, tmp_path, now=123.0)
    assert path is not None
    assert detections, "inspect should detect the green dot"
    inspect_abs = detections[0]["pos_abs"]

    # The separator keeps the two message fragments from running together.
    assert inspect_abs == fire_abs, (
        f"Parity break: fire={fire_abs} inspect={inspect_abs} — "
        "fire path and /ss would show different rightmost positions."
    )
@pytest.mark.asyncio
async def test_status_command_reports_pause_reason():
    """With ``user_paused`` set, the /status body names the manual pause."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    # Minimal detector / FSM attributes set up for the status command.
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")

    await _main._dispatch_command(ctx, Command(action="status"))

    status_alerts = [alert for alert in ctx.notifier.alerts if alert.kind == "status"]
    assert status_alerts
    text = status_alerts[0].body
    assert "pauză manuală" in text or "pauza" in text.lower()
@pytest.mark.asyncio
async def test_set_interval_refused_while_canary_paused():
    """2026-04-21: /set_interval must be refused while the canary is drift-paused.

    Starting the scheduler silently in that state misled the user into thinking
    detection was live. The command must leave the scheduler stopped, log no
    ``scheduler_started`` event, and send a warn alert pointing at /resume.
    """
    import atm.main as _main
    from atm.commands import Command

    paused_canary = types.SimpleNamespace(is_paused=True, resume=lambda: None)
    ctx = _dispatch_ctx(canary=paused_canary)

    await _main._dispatch_command(ctx, Command(action="set_interval", value=60))

    # Scheduler stays off.
    assert ctx.scheduler.is_running is False

    # Nothing was audited as a scheduler start.
    started = [e for e in ctx.audit.events if e.get("event") == "scheduler_started"]
    assert not started

    # The user is warned and told how to recover.
    warn_alerts = [alert for alert in ctx.notifier.alerts if alert.kind == "warn"]
    assert warn_alerts
    text = (warn_alerts[0].title + warn_alerts[0].body).lower()
    assert "drift" in text and "/resume" in text
@pytest.mark.asyncio
async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path):
    """2026-04-21: /ss still captures while the canary is paused.

    The screenshot alert body must warn that detection is off, mention
    /resume, and still include the pick caption.
    """
    import atm.main as _main
    from atm.commands import Command

    paused_canary = types.SimpleNamespace(is_paused=True, resume=lambda: None)
    base_ctx = _dispatch_ctx(canary=paused_canary)
    # Work on a plain-namespace shallow copy of the context's attributes.
    ctx = types.SimpleNamespace(**base_ctx.__dict__)

    stub_frame = object()
    capture_counter = {"n": 0}

    def _capture():
        capture_counter["n"] += 1
        return stub_frame

    ctx.capture = _capture
    ctx.fires_dir = tmp_path
    # No window title, so the focus branch is skipped.
    ctx.cfg.window_title = None

    # _save_inspect_frame is stubbed to hand back (path, detections).
    stub_detections = [
        {
            "idx": 0,
            "name": "light_green",
            "rgb": (0, 255, 0),
            "distance": 5.0,
            "confidence": 0.9,
            "pos_abs": (100, 200),
        }
    ]
    monkeypatch.setattr(
        _main,
        "_save_inspect_frame",
        lambda *a, **kw: (tmp_path / "fake_ss.png", stub_detections),
    )

    await _main._dispatch_command(ctx, Command(action="ss"))

    shots = [alert for alert in ctx.notifier.alerts if alert.kind == "screenshot"]
    assert shots
    text = shots[0].body
    assert "DETECȚIE OPRITĂ" in text or "drift" in text.lower()
    assert "/resume" in text
    # The pick caption accompanies the warning.
    assert "← pick" in text
@pytest.mark.asyncio
async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path):
    """/ss with an un-paused canary: body has the pick caption and no warning."""
    import atm.main as _main
    from atm.commands import Command

    healthy_canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    ctx = _dispatch_ctx(canary=healthy_canary)
    ctx.capture = lambda: object()
    ctx.fires_dir = tmp_path
    ctx.cfg.window_title = None

    stub_detections = [
        {
            "idx": 0,
            "name": "light_green",
            "rgb": (0, 255, 0),
            "distance": 5.0,
            "confidence": 0.9,
            "pos_abs": (100, 200),
        }
    ]
    monkeypatch.setattr(
        _main,
        "_save_inspect_frame",
        lambda *a, **kw: (tmp_path / "fake_ss.png", stub_detections),
    )

    await _main._dispatch_command(ctx, Command(action="ss"))

    shots = [alert for alert in ctx.notifier.alerts if alert.kind == "screenshot"]
    assert shots
    text = shots[0].body
    # Caption present, warning absent.
    assert "← pick" in text
    assert "DETECȚIE OPRITĂ" not in text
@pytest.mark.asyncio
async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path):
    """E2E #16: drift-paused, then /resume force, then a SELL fire via the FSM.

    Steps exercised:
      1. With the canary paused, ``_should_skip`` reports ``"drift_paused"``.
      2. ``/resume`` with value=1 un-pauses the canary; ``_should_skip`` is None.
      3. yellow, dark_red, light_red fed through ``_handle_tick`` on a fresh
         StateMachine end with a SELL transition and the FSM back in IDLE.
    """
    import atm.main as _main
    from atm.commands import Command
    from atm.state_machine import StateMachine, State

    class _PausedCanary:
        """Canary whose pause flag can be cleared via resume()."""

        def __init__(self):
            self._paused = True

        @property
        def is_paused(self):
            return self._paused

        def resume(self):
            self._paused = False

    class _RecordingNotifier:
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _NullAudit:
        def log(self, _e):
            pass

    canary = _PausedCanary()
    cfg = MagicMock()
    cfg.telegram.auto_poll_interval_s = 180
    cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
    cfg.window_title = None  # keeps the unit test off the Win32 focus path
    ctx = _dispatch_ctx(canary=canary, cfg=cfg)

    # 1. Drift pause is reported as the skip reason.
    skip_reason = _main._should_skip(0.0, ctx.lifecycle, cfg, canary)
    assert skip_reason == "drift_paused"

    # 2. Forced resume lifts the pause.
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert canary.is_paused is False
    assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None

    # 3. Drive the colour sequence through the FSM tick handler.
    fsm = StateMachine(lockout_s=60)
    notifier = _RecordingNotifier()
    audit = _NullAudit()
    tick_cfg = types.SimpleNamespace(
        alerts=types.SimpleNamespace(fire_on_phase_skip=True),
    )
    transition = None
    for ts, colour in ((1.0, "yellow"), (2.0, "dark_red"), (3.0, "light_red")):
        transition = _main._handle_tick(
            fsm, colour, ts, notifier, audit, first_accepted=False, cfg=tick_cfg,
        )

    # The last tick produced the fire through the normal path.
    assert transition is not None and transition.trigger == "SELL"
    assert fsm.state == State.IDLE
# ---------------------------------------------------------------------------
# _in_trading_window tests
# ---------------------------------------------------------------------------
import datetime as _dt
def test_in_trading_window_disabled():
    """With ``operating_hours.enabled`` False the window check returns True."""
    import atm.main as _main

    hours_off = types.SimpleNamespace(enabled=False)
    cfg = types.SimpleNamespace(operating_hours=hours_off)

    result = _main._in_trading_window(0.0, cfg)
    assert result is True
def test_in_trading_window_no_oh():
    """A cfg with no ``operating_hours`` attribute makes the check return True."""
    import atm.main as _main

    bare_cfg = types.SimpleNamespace()

    result = _main._in_trading_window(0.0, bare_cfg)
    assert result is True
def test_in_trading_window_in_window():
    """2026-04-20 12:00 in the configured timezone falls inside the window."""
    import atm.main as _main

    cfg = _oh_cfg()
    zone = cfg.operating_hours._tz_cache
    moment = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=zone)

    assert _main._in_trading_window(moment.timestamp(), cfg) is True
def test_in_trading_window_out_of_hours():
    """2026-04-20 08:00 in the configured timezone is outside the window."""
    import atm.main as _main

    cfg = _oh_cfg()
    zone = cfg.operating_hours._tz_cache
    moment = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=zone)

    assert _main._in_trading_window(moment.timestamp(), cfg) is False
def test_in_trading_window_weekend():
    """2026-04-19 12:00 (a Sunday) in the configured timezone is outside the window."""
    import atm.main as _main

    cfg = _oh_cfg()
    zone = cfg.operating_hours._tz_cache
    moment = _dt.datetime(2026, 4, 19, 12, 0, tzinfo=zone)

    assert _main._in_trading_window(moment.timestamp(), cfg) is False
def test_heartbeat_suppressed_outside_hours(monkeypatch):
    """With ``_in_trading_window`` forced to False, the heartbeat gate sends nothing.

    The earlier version never appended to ``sent`` on any path, so its final
    assertion could not fail. The gate below now drives ``sent``: if the
    patched predicate reported an open window, the test would fail.

    NOTE(review): this models the gate locally and does not drive the real
    heartbeat loop in atm.main; consider exercising that loop directly.
    """
    import atm.main as _main

    monkeypatch.setattr(_main, "_in_trading_window", lambda ts, cfg: False)

    sent = []
    # Heartbeat gate: emit only while inside the trading window.
    if _main._in_trading_window(0.0, None):
        sent.append("heartbeat")

    assert sent == []
def test_build_heartbeat_alert_active_when_not_paused():
    """Un-paused canary: title is ``activ`` and the body shows the FSM state as-is."""
    import atm.main as _main

    params = dict(fsm_state="ARMED_BUY", fire_count=2, uptime_h=1.5, canary_paused=False)
    alert = _main._build_heartbeat_alert(**params)

    assert alert.kind == "heartbeat"
    assert alert.title == "activ"
    body = alert.body
    assert "ARMED_BUY" in body
    assert "[drift-pause]" not in body
    assert "semnale: 2" in body
def test_build_heartbeat_alert_paused_when_canary_drift():
    """2026-04-21: a drift-paused canary must show in the heartbeat, not 'activ'."""
    import atm.main as _main

    params = dict(fsm_state="ARMED_SELL", fire_count=0, uptime_h=3.2, canary_paused=True)
    alert = _main._build_heartbeat_alert(**params)

    assert alert.kind == "heartbeat"
    title = alert.title
    assert "pauzat" in title
    assert "drift" in title.lower()
    body = alert.body
    assert "ARMED_SELL" in body
    assert "[drift-pause]" in body
@pytest.mark.asyncio
async def test_status_compact_active():
    """/status yields a two-line body that never mentions the canary."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="ARMED_BUY")
    ctx.state.fire_count = 3

    await _main._dispatch_command(ctx, Command(action="status"))

    status_alerts = [alert for alert in ctx.notifier.alerts if alert.kind == "status"]
    assert status_alerts
    text = status_alerts[0].body
    rows = text.splitlines()
    assert len(rows) == 2  # no fereastră line (oh.enabled=False)
    first_row = rows[0]
    assert "ARMED_BUY" in first_row
    assert "semnale: 3" in first_row
    for word in ("Canary", "canary"):
        assert word not in text
@pytest.mark.asyncio
async def test_status_compact_paused_manual():
    """/status body starts with 'pauză manuală' when ``user_paused`` is set.

    A missing status alert fails as an assertion (matching the sibling status
    tests) instead of an IndexError on ``status[0]``.
    """
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")

    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status, "expected a status alert"
    body = status[0].body
    assert body.startswith("pauză manuală")
@pytest.mark.asyncio
async def test_status_window_line_when_oh_enabled():
    """/status includes a 'fereastră' line when operating hours are enabled.

    A missing status alert fails as an assertion (matching the sibling status
    tests) instead of an IndexError on ``status[0]``.
    """
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    lifecycle = _main.LifecycleState(last_window_state="open")
    ctx = _dispatch_ctx(lifecycle=lifecycle, cfg=cfg)
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")

    await _main._dispatch_command(ctx, Command(action="status"))

    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status, "expected a status alert"
    body = status[0].body
    assert "fereastră: deschisă" in body
# ---------------------------------------------------------------------------
# Multi-chart split workspace tests
# ---------------------------------------------------------------------------
def test_chart_state_defaults():
    """A new ChartState has first_accepted True and last_saved_color None."""
    import atm.main as _main
    from atm.config import ROI

    strip = ROI(x=0, y=0, w=200, h=35)
    fields = dict(chart_id="", sub_roi=strip, detector=MagicMock(), fsm=MagicMock())
    chart = _main.ChartState(**fields)

    # Defaults for fields not passed in.
    assert chart.first_accepted is True
    assert chart.last_saved_color is None
    # Explicit arguments are stored unchanged.
    assert chart.chart_id == ""
    assert chart.sub_roi is strip
def test_alert_prefix_import():
    """``_alert_prefix`` is importable from atm.notifier and maps ids to prefixes."""
    from atm.notifier import _alert_prefix

    expected_by_id = (
        ("", ""),
        ("left", "[stânga] "),
        ("right", "[dreapta] "),
        ("chart_0", "[chart 1] "),
        ("chart_2", "[chart 3] "),
    )
    for chart_id, prefix in expected_by_id:
        assert _alert_prefix(chart_id) == prefix
def test_save_annotated_frame_no_price_overlay(tmp_path):
    """``_save_annotated_frame`` must no longer draw the price text overlay.

    The saved image's top-right region (rows 0..40, cols 300..390), where the
    "$..." text used to be rendered, must contain no near-white pixels.
    """
    import atm.main as _main
    from atm.config import ROI, YAxisCalib, ColorSpec

    black_frame = np.zeros((200, 400, 3), dtype=np.uint8)
    cfg = types.SimpleNamespace(
        dot_roi=ROI(x=10, y=10, w=380, h=180),
        y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=190, p2_price=50.0),
        colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
    )

    out_path = _main._save_annotated_frame(
        black_frame, cfg, tmp_path, "test", now=123.0,
        dot_pos_abs=(200, 100), canary_ok=True,
    )
    assert out_path is not None

    image = cv2.imread(str(out_path))
    assert image is not None

    # The ROI rectangle's top edge crosses this region, but it is thin and not
    # white; rendered text (255, 255, 255) would trip the all-channels check.
    region = image[0:40, 300:390]
    blue, green, red = region[:, :, 0], region[:, :, 1], region[:, :, 2]
    near_white = (blue > 200) & (green > 200) & (red > 200)
    white_pixels = near_white.sum()
    assert white_pixels == 0, f"expected 0 white pixels (no price text), got {white_pixels}"
def test_save_annotated_frame_uses_roi_param(tmp_path):
    """When ``roi=`` is passed, the ROI rect is drawn there, not at ``cfg.dot_roi``.

    The rectangle is detected by colour: BGR (0, 255, 255), i.e. blue near
    zero with green and red saturated.
    """
    import atm.main as _main
    from atm.config import ROI, ColorSpec

    frame = np.zeros((200, 400, 3), dtype=np.uint8)
    cfg = types.SimpleNamespace(
        dot_roi=ROI(x=0, y=0, w=10, h=10),  # tiny cfg ROI
        colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
    )
    custom_roi = ROI(x=100, y=50, w=200, h=80)

    fpath = _main._save_annotated_frame(
        frame, cfg, tmp_path, "rtest", now=123.0, roi=custom_roi,
    )
    assert fpath is not None

    saved = cv2.imread(str(fpath))
    # cv2.imread returns None on failure; fail here rather than with a
    # TypeError on the slicing below.
    assert saved is not None

    def _has_rect_colour(region):
        """True if any pixel in *region* matches the ROI rect colour."""
        return (
            (region[:, :, 0] < 50) & (region[:, :, 1] > 200) & (region[:, :, 2] > 200)
        ).any()

    # Rows around y=50 along the custom ROI's top edge must carry the rect.
    edge = saved[49:52, 100:300]
    assert _has_rect_colour(edge), "Expected ROI rect along custom roi top edge"

    # The cfg.dot_roi area (x=0..10, y=0..10) must stay clear, which proves
    # roi=custom was used.
    cfg_corner = saved[0:10, 0:10]
    assert not _has_rect_colour(cfg_corner), (
        "cfg.dot_roi rect must not appear when roi= override is used"
    )
def test_commit_layout_change_resets_fsm(monkeypatch):
    """``_commit_layout_change`` replaces ctx.charts with fresh per-strip state.

    Starting from one chart whose FSM is ARMED_BUY, committing two strips must
    give charts "left" and "right" with new IDLE FSMs, zero
    ``n_primed_global``, log a ``layout_change`` audit event with new_n == 2,
    and send an alert titled "Layout TS schimbat".
    """
    import atm.main as _main
    from atm.config import ROI
    from atm.state_machine import StateMachine, State

    sent_alerts: list = []
    logged_events: list = []

    class _Notifier:
        def send(self, a):
            sent_alerts.append(a)

    class _Audit:
        def log(self, e):
            logged_events.append(e)

    class _Scheduler:
        is_running = True

        def stop(self):
            type(self).is_running = False

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.colors = {}
    cfg.debounce_depth = 3

    # One pre-existing chart whose FSM has left IDLE.
    armed_fsm = StateMachine(lockout_s=60)
    armed_fsm.feed("turquoise", 1.0)  # IDLE -> ARMED_BUY
    assert armed_fsm.state == State.ARMED_BUY
    existing_chart = _main.ChartState(
        chart_id="",
        sub_roi=ROI(x=0, y=0, w=400, h=35),
        detector=MagicMock(),
        fsm=armed_fsm,
    )

    loop_state = _main._LoopState()
    loop_state.n_primed_global = 5  # any non-zero value

    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: None,
        canary=MagicMock(),
        detector=MagicMock(),
        fsm=armed_fsm,
        notifier=_Notifier(),
        audit=_Audit(),
        detection_log=_Audit(),
        scheduler=_Scheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=loop_state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    ctx.charts = [existing_chart]

    two_strips = [
        ROI(x=0, y=0, w=200, h=35),
        ROI(x=250, y=0, w=200, h=35),
    ]
    _main._commit_layout_change(ctx, two_strips, now=100.0)

    assert len(ctx.charts) == 2
    left_chart, right_chart = ctx.charts[0], ctx.charts[1]
    assert left_chart.chart_id == "left"
    assert right_chart.chart_id == "right"
    # Each chart got a fresh StateMachine, so both are IDLE.
    assert left_chart.fsm.state == State.IDLE
    assert right_chart.fsm.state == State.IDLE
    # The armed FSM was not carried over.
    assert left_chart.fsm is not armed_fsm
    # Priming counter is back to zero.
    assert loop_state.n_primed_global == 0
    # Audit trail and user-facing alert.
    assert any(e.get("event") == "layout_change" and e["new_n"] == 2 for e in logged_events)
    assert any("Layout TS schimbat" in a.title for a in sent_alerts)
def test_commit_layout_change_chart_ids_for_n3(monkeypatch):
    """Three strips produce chart ids 'chart_0', 'chart_1', 'chart_2'."""
    import atm.main as _main
    from atm.config import ROI

    class _Notifier:
        def send(self, a):
            pass

    class _Audit:
        def log(self, e):
            pass

    class _Scheduler:
        is_running = False

        def stop(self):
            pass

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.colors = {}
    cfg.debounce_depth = 3

    loop_state = _main._LoopState()
    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: None,
        canary=MagicMock(),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_Notifier(),
        audit=_Audit(),
        detection_log=_Audit(),
        scheduler=_Scheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=loop_state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    ctx.charts = []

    three_strips = []
    for index in range(3):
        three_strips.append(ROI(x=index * 100, y=0, w=80, h=35))

    _main._commit_layout_change(ctx, three_strips, now=100.0)

    ids = [chart.chart_id for chart in ctx.charts]
    assert ids == ["chart_0", "chart_1", "chart_2"]
def test_strips_match_silent_update_jitter():
    """``_strips_match`` with tol=10: a 3 px shift matches, a 12 px shift does not."""
    from atm.layout import _strips_match
    from atm.config import ROI

    def _pair(x_left, x_right):
        """Two 200x35 strips at the given x offsets."""
        return [
            ROI(x=x_left, y=0, w=200, h=35),
            ROI(x=x_right, y=0, w=200, h=35),
        ]

    reference = _pair(0, 250)

    # Both strips shifted by 3 px: within tolerance.
    small_shift = _pair(3, 253)
    assert _strips_match(reference, small_shift, tol=10) is True

    # First strip shifted by 12 px: outside tolerance.
    large_shift = _pair(12, 250)
    assert _strips_match(reference, large_shift, tol=10) is False
def test_build_heartbeat_alert_single_compat():
    """Called without ``charts``, the heartbeat keeps the legacy one-line body."""
    import atm.main as _main

    params = dict(fsm_state="IDLE", fire_count=0, uptime_h=1.0, canary_paused=False)
    alert = _main._build_heartbeat_alert(**params)

    assert alert.kind == "heartbeat"
    assert alert.title == "activ"
    assert alert.body == "IDLE | semnale: 0 | 1.0h"
def test_build_heartbeat_alert_multi_chart():
    """With charts=[left, right] the body names both charts and their FSM states."""
    import atm.main as _main
    from atm.config import ROI

    def _chart(chart_id, state_name):
        """ChartState whose FSM stub reports *state_name*."""
        fsm_stub = types.SimpleNamespace(state=types.SimpleNamespace(value=state_name))
        return _main.ChartState(
            chart_id=chart_id,
            sub_roi=ROI(x=0, y=0, w=10, h=10),
            detector=MagicMock(),
            fsm=fsm_stub,
        )

    left_chart = _chart("left", "ARMED_BUY")
    right_chart = _chart("right", "IDLE")

    alert = _main._build_heartbeat_alert(
        fsm_state="ignored",
        fire_count=2,
        uptime_h=1.5,
        canary_paused=False,
        charts=[left_chart, right_chart],
    )

    assert alert.kind == "heartbeat"
    body = alert.body
    assert "[stânga]" in body
    assert "[dreapta]" in body
    assert "ARMED_BUY" in body
    assert "IDLE" in body
    assert "semnale: 2" in body
@pytest.mark.asyncio
async def test_run_multi_tick_silent_jitter_update(monkeypatch, tmp_path):
    """Strip positions that shift by a few pixels update sub_roi silently.

    With strip detection stubbed to return both strips moved 3 px to the
    right, ``_run_multi_tick`` must push the new ROIs into each chart and its
    detector without logging ``layout_change`` or sending the
    "Layout TS schimbat" alert.
    """
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    initial = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
    jittered = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]

    class _Det:
        """Detector double: never finds a dot, remembers the last ROI it was given."""

        def __init__(self):
            self.last_roi = None

        def step(self, ts, frame=None):
            from atm.detector import DetectionResult

            return DetectionResult(
                ts=ts,
                window_found=True,
                dot_found=False,
                rgb=None,
                match=None,
                accepted=False,
                color=None,
            )

        def update_dot_roi(self, roi):
            self.last_roi = roi

    class _FSM:
        state = types.SimpleNamespace(value="IDLE")
        _last_fire: dict = {}

    sent_alerts: list = []
    logged_events: list = []

    class _N:
        def send(self, a):
            sent_alerts.append(a)

    class _A:
        def log(self, e):
            logged_events.append(e)

    class _S:
        is_running = False

        def start(self, s):
            pass

        def stop(self):
            pass

    cfg = MagicMock()
    cfg.lockout_s = 60
    # Every screenshot-attachment flag is switched off.
    screenshot_flags = (
        "arm", "prime", "trigger", "late_start",
        "catchup", "opposite_rearm", "rearm", "phase_skip",
    )
    cfg.attach_screenshots = types.SimpleNamespace(**dict.fromkeys(screenshot_flags, False))
    cfg.window_title = None

    loop_state = _main._LoopState()
    canary = types.SimpleNamespace(
        is_paused=False,
        check=lambda f: types.SimpleNamespace(distance=0, drifted=False),
    )
    ctx = _main.RunContext(
        cfg=cfg,
        capture=lambda: np.zeros((200, 600, 3), dtype=np.uint8),
        canary=canary,
        detector=MagicMock(),
        fsm=_FSM(),
        notifier=_N(),
        audit=_A(),
        detection_log=_A(),
        scheduler=_S(),
        samples_dir=tmp_path,
        fires_dir=tmp_path,
        cmd_queue=MagicMock(),
        state=loop_state,
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=None,
    )

    det_left = _Det()
    det_right = _Det()
    ctx.charts = [
        _main.ChartState("left", initial[0], det_left, _FSM()),
        _main.ChartState("right", initial[1], det_right, _FSM()),
    ]

    # Strip detection always reports the shifted positions.
    monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: jittered)

    results = await _main._run_multi_tick(ctx)
    assert len(results) == 2

    # ROIs were updated on both the chart and its detector.
    assert ctx.charts[0].sub_roi == jittered[0]
    assert ctx.charts[1].sub_roi == jittered[1]
    assert det_left.last_roi == jittered[0]
    assert det_right.last_roi == jittered[1]

    # No layout_change audit event.
    assert not any(e.get("event") == "layout_change" for e in logged_events)
    # No "Layout TS schimbat" status alert.
    assert not any("Layout TS schimbat" in a.title for a in sent_alerts)
@pytest.mark.asyncio
async def test_run_multi_tick_silent_width_oscillation(monkeypatch, tmp_path):
    """Regression: strip-width pulsation on a stable chart count stays silent.

    Two charts start with strips 792px and 845px wide; strip detection is
    stubbed to report 810px and 863px on the next tick (same count, same x).
    The tick must refresh each chart's sub-ROI and push it to that chart's
    detector, keep the existing FSM objects, and must NOT go through
    _commit_layout_change: no ``layout_change`` audit event and no
    "Layout TS schimbat" alert. Reproduces the production bug visible in
    logs/2026-05-04.jsonl (576 layout_change events/day).
    """
    import atm.main as _main
    from atm.config import ROI

    monkeypatch.chdir(tmp_path)

    initial = [ROI(x=0, y=0, w=792, h=35), ROI(x=912, y=0, w=845, h=35)]
    pulsed = [ROI(x=0, y=0, w=810, h=35), ROI(x=912, y=0, w=863, h=35)]

    class _StubDetector:
        """Detector double: never reports a dot, remembers the last ROI it was given."""

        def __init__(self):
            self.last_roi = None

        def step(self, ts, frame=None):
            from atm.detector import DetectionResult

            return DetectionResult(
                ts=ts, window_found=True, dot_found=False,
                rgb=None, match=None, accepted=False, color=None,
            )

        def update_dot_roi(self, roi):
            self.last_roi = roi

    class _StubFSM:
        """FSM double exposing only ``state`` and ``_last_fire``."""

        # Both attributes live on the class, so every stub instance shares them.
        state = types.SimpleNamespace(value="PRIMED_BUY")
        _last_fire: dict = {}

    # Every screenshot-attachment flag is switched off.
    screenshot_flags = dict.fromkeys(
        (
            "arm", "prime", "trigger", "late_start",
            "catchup", "opposite_rearm", "rearm", "phase_skip",
        ),
        False,
    )
    cfg = MagicMock()
    cfg.configure_mock(
        lockout_s=60,
        attach_screenshots=types.SimpleNamespace(**screenshot_flags),
        window_title=None,
    )

    loop_state = _main._LoopState()
    alerts_sent: list = []
    events_logged: list = []

    class _Notifier:
        def send(self, a):
            alerts_sent.append(a)

    class _Sink:
        # Used for both the audit log and the detection log; either one
        # appends into the same ``events_logged`` list.
        def log(self, e):
            events_logged.append(e)

    class _Scheduler:
        is_running = False

        def start(self, s):
            pass

        def stop(self):
            pass

    def _no_drift(f):
        return types.SimpleNamespace(distance=0, drifted=False)

    def _blank_frame():
        return np.zeros((200, 1800, 3), dtype=np.uint8)

    def _no_levels(*a, **kw):
        return None

    def _pulsed_strips(c, f):
        return pulsed

    canary = types.SimpleNamespace(is_paused=False, check=_no_drift)
    fsm_left = _StubFSM()
    fsm_right = _StubFSM()

    ctx = _main.RunContext(
        cfg=cfg,
        capture=_blank_frame,
        canary=canary,
        detector=MagicMock(),
        fsm=fsm_left,
        notifier=_Notifier(),
        audit=_Sink(),
        detection_log=_Sink(),
        scheduler=_Scheduler(),
        samples_dir=tmp_path,
        fires_dir=tmp_path,
        cmd_queue=MagicMock(),
        state=loop_state,
        levels_extractor_factory=_no_levels,
        lifecycle=None,
    )

    det_l, det_r = _StubDetector(), _StubDetector()
    ctx.charts = [
        _main.ChartState("left", initial[0], det_l, fsm_left),
        _main.ChartState("right", initial[1], det_r, fsm_right),
    ]

    monkeypatch.setattr(_main, "_detect_strips_for_ctx", _pulsed_strips)
    await _main._run_multi_tick(ctx)

    expected_per_chart = ((det_l, fsm_left), (det_r, fsm_right))
    for idx, (detector, fsm) in enumerate(expected_per_chart):
        chart = ctx.charts[idx]
        # Sub-ROI silently updated and forwarded to the chart's detector.
        assert chart.sub_roi == pulsed[idx]
        assert detector.last_roi == pulsed[idx]
        # FSM identity preserved (not rebuilt).
        assert chart.fsm is fsm

    # Critical: no layout_change event, no status alert.
    layout_events = [e for e in events_logged if e.get("event") == "layout_change"]
    assert not layout_events
    layout_alerts = [a for a in alerts_sent if "Layout TS schimbat" in a.title]
    assert not layout_alerts
def test_commit_layout_change_alert_is_silent(monkeypatch):
    """A real chart-count change sends its layout alert with ``silent=True``.

    Starts from a single chart and commits a two-strip layout through
    _commit_layout_change, then checks that exactly one alert whose title
    contains "Layout TS schimbat" was sent and that it is flagged silent,
    so Telegram doesn't ping the user while the re-anchor info remains
    visible in chat history.
    """
    import atm.main as _main
    from atm.config import ROI
    from atm.state_machine import StateMachine

    cfg = MagicMock()
    cfg.configure_mock(
        lockout_s=60,
        colors={},
        debounce_depth=3,
        confidence_min=0.5,
        dot_roi=ROI(x=0, y=0, w=600, h=35),
    )

    alerts_sent: list = []
    events_logged: list = []

    class _Notifier:
        def send(self, a):
            alerts_sent.append(a)

    class _Sink:
        # Used for both the audit log and the detection log.
        def log(self, e):
            events_logged.append(e)

    class _Scheduler:
        is_running = False

        def stop(self):
            pass

    def _no_frame():
        return None

    def _no_levels(*a, **kw):
        return None

    loop_state = _main._LoopState()
    ctx = _main.RunContext(
        cfg=cfg,
        capture=_no_frame,
        canary=MagicMock(),
        detector=MagicMock(),
        fsm=StateMachine(60),
        notifier=_Notifier(),
        audit=_Sink(),
        detection_log=_Sink(),
        scheduler=_Scheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=MagicMock(),
        state=loop_state,
        levels_extractor_factory=_no_levels,
        lifecycle=None,
    )

    # One chart before the change ...
    single_chart = _main.ChartState(
        "only", ROI(x=0, y=0, w=400, h=35), MagicMock(), StateMachine(60)
    )
    ctx.charts = [single_chart]

    # ... two strips after it, i.e. a genuine count change.
    new_strips = [
        ROI(x=0, y=0, w=200, h=35),
        ROI(x=250, y=0, w=200, h=35),
    ]
    _main._commit_layout_change(ctx, new_strips, now=100.0)

    layout_alerts = [
        alert for alert in alerts_sent if "Layout TS schimbat" in alert.title
    ]
    assert len(layout_alerts) == 1
    only_alert = layout_alerts[0]
    assert only_alert.silent is True