feat(run): extract detection loop helpers + unconditional cmd drain
Refactor _detection_loop by moving _run_tick, _handle_fsm_result, _dispatch_command, and _drain_cmd_queue to module scope, passing dependencies via a RunContext dataclass. This unblocks direct unit testing of the drain path. CRITICAL bug fix: the previous loop issued `continue` when the tick returned res=None (canary paused or similar), which skipped the drain block. Commands piled up in cmd_queue while detection was paused — the hang observed on 2026-04-17 after canary drift-pause. The refactored loop now runs _drain_cmd_queue UNCONDITIONALLY on every iteration, after _handle_fsm_result, so pause-state never starves the command channel. Tests: test_drain_works_when_canary_paused, test_drain_works_when_out_of_window, test_drain_isolates_dispatch_exceptions (exception isolation + audit/warn wiring). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
297
src/atm/main.py
297
src/atm/main.py
@@ -531,6 +531,40 @@ class _TickSyncResult:
|
||||
new_color: str | None = None # corpus sample color when changed
|
||||
|
||||
|
||||
@dataclass
|
||||
class RunContext:
|
||||
"""Dependencies passed to module-scope detection-loop helpers.
|
||||
|
||||
Keeps `_run_tick`, `_handle_fsm_result`, `_drain_cmd_queue`, and
|
||||
`_dispatch_command` at module scope so they are directly unit-testable
|
||||
without reconstructing `run_live_async`.
|
||||
"""
|
||||
cfg: Any
|
||||
capture: Callable
|
||||
canary: Any
|
||||
detector: Any
|
||||
fsm: Any
|
||||
notifier: _NotifierLike
|
||||
audit: _AuditLike
|
||||
detection_log: _AuditLike
|
||||
scheduler: Any
|
||||
samples_dir: Path
|
||||
fires_dir: Path
|
||||
cmd_queue: Any # asyncio.Queue[Command]
|
||||
state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start
|
||||
levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now)
|
||||
|
||||
|
||||
@dataclass
|
||||
class _LoopState:
|
||||
"""Per-loop mutable state (previously closure nonlocals)."""
|
||||
first_accepted: bool = True
|
||||
last_saved_color: str | None = None
|
||||
levels_extractor: Any = None
|
||||
fire_count: int = 0
|
||||
start: float = 0.0
|
||||
|
||||
|
||||
def _sync_detection_tick(
|
||||
capture: Callable,
|
||||
canary: Any,
|
||||
@@ -622,6 +656,136 @@ def _sync_detection_tick(
|
||||
)
|
||||
|
||||
|
||||
async def _run_tick(ctx: RunContext) -> _TickSyncResult:
|
||||
"""Execute one `_sync_detection_tick` in a thread; returns result or empty."""
|
||||
now = time.time()
|
||||
return await asyncio.to_thread(
|
||||
_sync_detection_tick,
|
||||
ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm,
|
||||
ctx.notifier, ctx.audit, ctx.detection_log,
|
||||
ctx.fires_dir, ctx.state.first_accepted, ctx.state.last_saved_color,
|
||||
now, ctx.samples_dir,
|
||||
)
|
||||
|
||||
|
||||
async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
|
||||
"""Scheduler start/stop + levels extraction. No-op if res is None/late_start."""
|
||||
if result.first_consumed:
|
||||
ctx.state.first_accepted = False
|
||||
if result.new_color is not None:
|
||||
ctx.state.last_saved_color = result.new_color
|
||||
|
||||
tr = result.tr
|
||||
res = result.res
|
||||
|
||||
if result.late_start or res is None:
|
||||
return
|
||||
|
||||
if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None):
|
||||
if tr.reason == "prime" and not ctx.scheduler.is_running:
|
||||
ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
|
||||
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
|
||||
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running:
|
||||
ctx.scheduler.stop()
|
||||
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
|
||||
|
||||
if tr is not None and tr.trigger and not tr.locked:
|
||||
ctx.state.fire_count += 1
|
||||
if ctx.scheduler.is_running:
|
||||
ctx.scheduler.stop()
|
||||
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
|
||||
ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time())
|
||||
|
||||
if ctx.state.levels_extractor is not None and result.frame is not None:
|
||||
lr = ctx.state.levels_extractor.step(result.frame, time.time())
|
||||
if lr.status in ("complete", "timeout"):
|
||||
if lr.status == "complete" and lr.levels:
|
||||
ctx.notifier.send(Alert(
|
||||
kind="levels",
|
||||
title="Niveluri",
|
||||
body=(
|
||||
f"SL={lr.levels.sl} "
|
||||
f"TP1={lr.levels.tp1} "
|
||||
f"TP2={lr.levels.tp2}"
|
||||
),
|
||||
))
|
||||
ctx.state.levels_extractor = None
|
||||
|
||||
|
||||
async def _dispatch_command(ctx: RunContext, cmd) -> None:
|
||||
"""Process a single Command. Exceptions bubble — caller wraps in try/except."""
|
||||
cfg = ctx.cfg
|
||||
if cmd.action == "set_interval":
|
||||
secs = cmd.value or cfg.telegram.auto_poll_interval_s
|
||||
ctx.scheduler.start(secs)
|
||||
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
|
||||
ctx.notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
|
||||
elif cmd.action == "stop":
|
||||
if ctx.scheduler.is_running:
|
||||
ctx.scheduler.stop()
|
||||
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
|
||||
ctx.notifier.send(Alert(kind="status", title="Polling oprit", body=""))
|
||||
else:
|
||||
ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
|
||||
elif cmd.action == "status":
|
||||
uptime_s = time.monotonic() - ctx.state.start
|
||||
last_roll = ctx.detector.rolling[-1] if ctx.detector.rolling else None
|
||||
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—"
|
||||
last_color = (
|
||||
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—"
|
||||
) if last_roll else "—"
|
||||
sched_info = (
|
||||
f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
|
||||
) if ctx.scheduler.is_running else "oprit"
|
||||
canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok"
|
||||
body = (
|
||||
f"Stare: {ctx.fsm.state.value}\n"
|
||||
f"Ultima detecție: {last_color} (conf {last_conf})\n"
|
||||
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n"
|
||||
f"Poller: {sched_info} | Canary: {canary_info}"
|
||||
)
|
||||
ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body))
|
||||
elif cmd.action == "ss":
|
||||
now_ss = time.time()
|
||||
frame_ss = await asyncio.to_thread(ctx.capture)
|
||||
if frame_ss is None:
|
||||
ctx.notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
))
|
||||
return
|
||||
path_ss = await asyncio.to_thread(
|
||||
_save_annotated_frame, frame_ss, ctx.cfg, ctx.fires_dir, "ss", now_ss, ctx.audit,
|
||||
)
|
||||
ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
|
||||
ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
|
||||
|
||||
|
||||
async def _drain_cmd_queue(ctx: RunContext) -> None:
|
||||
"""Drain all pending commands, isolating each dispatch in try/except.
|
||||
|
||||
CRITICAL: this MUST run every loop iteration, unconditionally, even when
|
||||
the detection tick returned nothing (canary paused, out-of-window, etc.).
|
||||
Prior bug: the main loop `continue`'d past this drain when res=None,
|
||||
causing commands to accumulate indefinitely while canary was drifted.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
cmd = ctx.cmd_queue.get_nowait()
|
||||
except asyncio.QueueEmpty:
|
||||
return
|
||||
try:
|
||||
await _dispatch_command(ctx, cmd)
|
||||
except Exception as exc:
|
||||
ctx.audit.log({
|
||||
"ts": time.time(), "event": "command_error",
|
||||
"action": cmd.action, "error": str(exc),
|
||||
})
|
||||
print(f"ERR command_dispatch /{cmd.action}: {exc}", flush=True)
|
||||
ctx.notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(exc)))
|
||||
|
||||
|
||||
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
"""Sync entry point — delegates to asyncio event loop."""
|
||||
asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub))
|
||||
@@ -699,10 +863,8 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
|
||||
pass
|
||||
|
||||
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
|
||||
first_accepted = True
|
||||
last_saved_color: str | None = None
|
||||
levels_extractor = None
|
||||
fire_count = 0
|
||||
loop_state = _LoopState(first_accepted=True, last_saved_color=None,
|
||||
levels_extractor=None, fire_count=0, start=start)
|
||||
|
||||
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
|
||||
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
|
||||
@@ -715,8 +877,16 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
|
||||
)
|
||||
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
|
||||
|
||||
ctx = RunContext(
|
||||
cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm,
|
||||
notifier=notifier, audit=audit, detection_log=detection_log,
|
||||
scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir,
|
||||
cmd_queue=cmd_queue, state=loop_state,
|
||||
levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Nested async coroutines — capture nonlocal state from run_live_async
|
||||
# Nested async coroutines — heartbeat captures notifier + heartbeat_due
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _heartbeat_loop() -> None:
|
||||
@@ -738,124 +908,13 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
|
||||
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||
|
||||
async def _dispatch_command(cmd: Command) -> None:
|
||||
nonlocal fire_count
|
||||
if cmd.action == "set_interval":
|
||||
secs = cmd.value or cfg.telegram.auto_poll_interval_s
|
||||
scheduler.start(secs)
|
||||
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
|
||||
notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
|
||||
elif cmd.action == "stop":
|
||||
if scheduler.is_running:
|
||||
scheduler.stop()
|
||||
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
|
||||
notifier.send(Alert(kind="status", title="Polling oprit", body=""))
|
||||
else:
|
||||
notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
|
||||
elif cmd.action == "status":
|
||||
uptime_s = time.monotonic() - start
|
||||
last_roll = detector.rolling[-1] if detector.rolling else None
|
||||
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—"
|
||||
last_color = (
|
||||
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—"
|
||||
) if last_roll else "—"
|
||||
sched_info = (
|
||||
f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ"
|
||||
) if scheduler.is_running else "oprit"
|
||||
canary_info = "drift (pauze)" if canary.is_paused else "ok"
|
||||
body = (
|
||||
f"Stare: {fsm.state.value}\n"
|
||||
f"Ultima detecție: {last_color} (conf {last_conf})\n"
|
||||
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n"
|
||||
f"Poller: {sched_info} | Canary: {canary_info}"
|
||||
)
|
||||
notifier.send(Alert(kind="status", title="ATM Status", body=body))
|
||||
elif cmd.action == "ss":
|
||||
now_ss = time.time()
|
||||
frame_ss = await asyncio.to_thread(capture)
|
||||
if frame_ss is None:
|
||||
notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
))
|
||||
return
|
||||
path_ss = await asyncio.to_thread(
|
||||
_save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit,
|
||||
)
|
||||
audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
|
||||
notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
|
||||
|
||||
async def _detection_loop() -> None:
|
||||
nonlocal first_accepted, last_saved_color, levels_extractor, fire_count
|
||||
|
||||
while True:
|
||||
if duration_s is not None and (time.monotonic() - start) >= duration_s:
|
||||
break
|
||||
|
||||
now = time.time()
|
||||
|
||||
result: _TickSyncResult = await asyncio.to_thread(
|
||||
_sync_detection_tick,
|
||||
capture, canary, cfg, detector, fsm, notifier, audit, detection_log,
|
||||
fires_dir, first_accepted, last_saved_color, now, samples_dir,
|
||||
)
|
||||
|
||||
if result.first_consumed:
|
||||
first_accepted = False
|
||||
if result.new_color is not None:
|
||||
last_saved_color = result.new_color
|
||||
|
||||
tr = result.tr
|
||||
res = result.res
|
||||
|
||||
if result.late_start or res is None:
|
||||
await asyncio.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
|
||||
if tr is not None and res.accepted and res.color:
|
||||
if tr.reason == "prime" and not scheduler.is_running:
|
||||
scheduler.start(cfg.telegram.auto_poll_interval_s)
|
||||
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
|
||||
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running:
|
||||
scheduler.stop()
|
||||
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
|
||||
|
||||
if tr is not None and tr.trigger and not tr.locked:
|
||||
fire_count += 1
|
||||
if scheduler.is_running:
|
||||
scheduler.stop()
|
||||
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
|
||||
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
|
||||
|
||||
if levels_extractor is not None and result.frame is not None:
|
||||
lr = levels_extractor.step(result.frame, now)
|
||||
if lr.status in ("complete", "timeout"):
|
||||
if lr.status == "complete" and lr.levels:
|
||||
notifier.send(Alert(
|
||||
kind="levels",
|
||||
title="Niveluri",
|
||||
body=(
|
||||
f"SL={lr.levels.sl} "
|
||||
f"TP1={lr.levels.tp1} "
|
||||
f"TP2={lr.levels.tp2}"
|
||||
),
|
||||
))
|
||||
levels_extractor = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
cmd = cmd_queue.get_nowait()
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
try:
|
||||
await _dispatch_command(cmd)
|
||||
except Exception as _cmd_exc:
|
||||
_msg = f"/{cmd.action}: {_cmd_exc}"
|
||||
audit.log({"ts": time.time(), "event": "command_error", "action": cmd.action, "error": str(_cmd_exc)})
|
||||
print(f"ERR command_dispatch {_msg}", flush=True)
|
||||
notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(_cmd_exc)))
|
||||
|
||||
result = await _run_tick(ctx)
|
||||
await _handle_fsm_result(ctx, result)
|
||||
await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang
|
||||
await asyncio.sleep(cfg.loop_interval_s)
|
||||
|
||||
# Launch background tasks
|
||||
|
||||
@@ -401,3 +401,139 @@ async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_p
|
||||
start_idx = scheduler_events.index("start:180")
|
||||
stop_idx = scheduler_events.index("stop")
|
||||
assert start_idx < stop_idx, "scheduler started after it stopped"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
|
||||
# even when canary is paused or when detection is otherwise skipped.
|
||||
# Prior bug: `continue` past the drain loop caused commands to pile up.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_ctx_for_drain(cmd_queue, dispatched: list):
|
||||
"""Build a minimal RunContext where _dispatch_command just records calls."""
|
||||
import atm.main as _main
|
||||
|
||||
class _FakeAudit:
|
||||
def __init__(self): self.events = []
|
||||
def log(self, e): self.events.append(e)
|
||||
|
||||
class _FakeNotifier:
|
||||
def __init__(self): self.alerts = []
|
||||
def send(self, a): self.alerts.append(a)
|
||||
|
||||
class _FakeCanary:
|
||||
def __init__(self, paused=True):
|
||||
self.is_paused = paused
|
||||
|
||||
class _FakeScheduler:
|
||||
is_running = False
|
||||
interval_s = None
|
||||
def start(self, s): pass
|
||||
def stop(self): pass
|
||||
|
||||
state = _main._LoopState(start=0.0)
|
||||
ctx = _main.RunContext(
|
||||
cfg=MagicMock(),
|
||||
capture=lambda: None,
|
||||
canary=_FakeCanary(paused=True),
|
||||
detector=MagicMock(),
|
||||
fsm=MagicMock(),
|
||||
notifier=_FakeNotifier(),
|
||||
audit=_FakeAudit(),
|
||||
detection_log=_FakeAudit(),
|
||||
scheduler=_FakeScheduler(),
|
||||
samples_dir=Path("."),
|
||||
fires_dir=Path("."),
|
||||
cmd_queue=cmd_queue,
|
||||
state=state,
|
||||
levels_extractor_factory=lambda *a, **kw: None,
|
||||
)
|
||||
return ctx
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_works_when_canary_paused(monkeypatch):
|
||||
"""Regression: when canary.is_paused, _drain_cmd_queue still dispatches.
|
||||
|
||||
Prior bug: detection loop `continue`'d past the drain block whenever the
|
||||
tick returned res=None (canary paused). Commands accumulated forever.
|
||||
"""
|
||||
import atm.main as _main
|
||||
from atm.commands import Command
|
||||
|
||||
q: asyncio.Queue = asyncio.Queue()
|
||||
await q.put(Command(action="status"))
|
||||
await q.put(Command(action="ss"))
|
||||
|
||||
dispatched: list = []
|
||||
|
||||
async def _fake_dispatch(ctx, cmd):
|
||||
dispatched.append(cmd.action)
|
||||
|
||||
monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
|
||||
|
||||
ctx = _make_ctx_for_drain(q, dispatched)
|
||||
|
||||
await _main._drain_cmd_queue(ctx)
|
||||
|
||||
assert dispatched == ["status", "ss"]
|
||||
assert q.empty()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_works_when_out_of_window(monkeypatch):
|
||||
"""Drain must still fire when the tick skipped (e.g. out of operating hours).
|
||||
|
||||
The refactored loop runs _drain_cmd_queue unconditionally after every tick,
|
||||
regardless of `_TickSyncResult` content.
|
||||
"""
|
||||
import atm.main as _main
|
||||
from atm.commands import Command
|
||||
|
||||
q: asyncio.Queue = asyncio.Queue()
|
||||
await q.put(Command(action="stop"))
|
||||
|
||||
dispatched: list = []
|
||||
|
||||
async def _fake_dispatch(ctx, cmd):
|
||||
dispatched.append(cmd.action)
|
||||
|
||||
monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
|
||||
|
||||
ctx = _make_ctx_for_drain(q, dispatched)
|
||||
# Simulate out-of-window tick (empty _TickSyncResult, no res)
|
||||
await _main._handle_fsm_result(ctx, _main._TickSyncResult())
|
||||
await _main._drain_cmd_queue(ctx)
|
||||
|
||||
assert dispatched == ["stop"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
|
||||
"""If one command raises, remaining commands still drain + warn alert sent."""
|
||||
import atm.main as _main
|
||||
from atm.commands import Command
|
||||
|
||||
q: asyncio.Queue = asyncio.Queue()
|
||||
await q.put(Command(action="status"))
|
||||
await q.put(Command(action="ss"))
|
||||
|
||||
attempts: list = []
|
||||
|
||||
async def _fake_dispatch(ctx, cmd):
|
||||
attempts.append(cmd.action)
|
||||
if cmd.action == "status":
|
||||
raise RuntimeError("boom")
|
||||
|
||||
monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
|
||||
|
||||
ctx = _make_ctx_for_drain(q, attempts)
|
||||
await _main._drain_cmd_queue(ctx)
|
||||
|
||||
assert attempts == ["status", "ss"]
|
||||
# warn alert for the failed command
|
||||
warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
|
||||
assert any("status" in t for t in warn_titles)
|
||||
# command_error audit event
|
||||
errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
|
||||
assert len(errs) == 1 and errs[0]["action"] == "status"
|
||||
|
||||
Reference in New Issue
Block a user