feat(run): extract detection loop helpers + unconditional cmd drain

Refactor _detection_loop by moving _run_tick, _handle_fsm_result,
_dispatch_command, and _drain_cmd_queue to module scope, passing
dependencies via a RunContext dataclass. This unblocks direct unit
testing of the drain path.

CRITICAL bug fix: the previous loop issued `continue` when the tick
returned res=None (canary paused or similar), which skipped the
drain block. Commands piled up in cmd_queue while detection was
paused — the hang observed on 2026-04-17 after canary drift-pause.
The refactored loop now runs _drain_cmd_queue UNCONDITIONALLY on
every iteration, after _handle_fsm_result, so pause-state never
starves the command channel.

Tests: test_drain_works_when_canary_paused,
test_drain_works_when_out_of_window,
test_drain_isolates_dispatch_exceptions (exception isolation +
audit/warn wiring).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 11:52:28 +03:00
parent 153196f762
commit c5024ce600
2 changed files with 314 additions and 119 deletions

View File

@@ -531,6 +531,40 @@ class _TickSyncResult:
new_color: str | None = None # corpus sample color when changed
@dataclass
class RunContext:
    """Dependencies passed to module-scope detection-loop helpers.

    Keeps `_run_tick`, `_handle_fsm_result`, `_drain_cmd_queue`, and
    `_dispatch_command` at module scope so they are directly unit-testable
    without reconstructing `run_live_async`.
    """

    cfg: Any                         # application config (reads cfg.telegram.auto_poll_interval_s, cfg.loop_interval_s)
    capture: Callable                # frame grabber; returns a frame or None on capture failure
    canary: Any                      # drift canary; `.is_paused` is reported by /status
    detector: Any                    # rolling detector; `.rolling[-1]` inspected by /status
    fsm: Any                         # detection FSM; `.state.value` reported by /status
    notifier: _NotifierLike          # Alert sink (Telegram)
    audit: _AuditLike                # structured audit log (dict events)
    detection_log: _AuditLike        # per-detection log, passed through to _sync_detection_tick
    scheduler: Any                   # auto-poll scheduler (start/stop/is_running/interval_s)
    samples_dir: Path                # corpus sample output directory
    fires_dir: Path                  # annotated-frame output directory
    cmd_queue: Any  # asyncio.Queue[Command]
    state: Any  # carries first_accepted, last_saved_color, levels_extractor, fire_count, start
    levels_extractor_factory: Callable  # builds LevelsExtractor(cfg, trigger, now)
@dataclass
class _LoopState:
    """Per-loop mutable state (previously closure nonlocals)."""

    first_accepted: bool = True          # True until the first detection is consumed
    last_saved_color: str | None = None  # last corpus sample color persisted
    levels_extractor: Any = None         # active LevelsExtractor, or None when idle
    fire_count: int = 0                  # signals fired; reported by /status
    start: float = 0.0                   # time.monotonic() at loop start (uptime base)
def _sync_detection_tick(
capture: Callable,
canary: Any,
@@ -622,6 +656,136 @@ def _sync_detection_tick(
)
async def _run_tick(ctx: RunContext) -> _TickSyncResult:
    """Run a single `_sync_detection_tick` off the event loop in a worker thread."""
    tick_ts = time.time()
    tick_args = (
        ctx.capture,
        ctx.canary,
        ctx.cfg,
        ctx.detector,
        ctx.fsm,
        ctx.notifier,
        ctx.audit,
        ctx.detection_log,
        ctx.fires_dir,
        ctx.state.first_accepted,
        ctx.state.last_saved_color,
        tick_ts,
        ctx.samples_dir,
    )
    return await asyncio.to_thread(_sync_detection_tick, *tick_args)
async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
    """Fold one tick's outcome into loop state; drive scheduler + levels.

    Bookkeeping (`first_consumed`, `new_color`) is applied unconditionally;
    everything else is skipped when the tick produced no result (`res is
    None`) or was flagged as a late start.
    """
    state = ctx.state
    if result.first_consumed:
        state.first_accepted = False
    if result.new_color is not None:
        state.last_saved_color = result.new_color

    transition, outcome = result.tr, result.res
    if result.late_start or outcome is None:
        return  # nothing detected this tick — no scheduler/levels work

    # Accepted, colored detections drive scheduler lifecycle transitions.
    accepted = getattr(outcome, "accepted", False) and getattr(outcome, "color", None)
    if transition is not None and accepted:
        if transition.reason == "prime" and not ctx.scheduler.is_running:
            ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
            ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
        elif transition.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running:
            ctx.scheduler.stop()
            ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": transition.reason})

    # An unlocked trigger is a fire: count it, stop polling, start levels OCR.
    if transition is not None and transition.trigger and not transition.locked:
        state.fire_count += 1
        if ctx.scheduler.is_running:
            ctx.scheduler.stop()
            ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
        state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, transition.trigger, time.time())

    # Step the active levels extraction, if any, with this tick's frame.
    if state.levels_extractor is not None and result.frame is not None:
        lr = state.levels_extractor.step(result.frame, time.time())
        if lr.status in ("complete", "timeout"):
            if lr.status == "complete" and lr.levels:
                ctx.notifier.send(Alert(
                    kind="levels",
                    title="Niveluri",
                    body=(
                        f"SL={lr.levels.sl} "
                        f"TP1={lr.levels.tp1} "
                        f"TP2={lr.levels.tp2}"
                    ),
                ))
            # Extractor is spent on both completion and timeout.
            state.levels_extractor = None
async def _dispatch_command(ctx: RunContext, cmd) -> None:
    """Process a single Command; exceptions propagate to the caller's guard."""
    cfg = ctx.cfg
    action = cmd.action

    if action == "set_interval":
        # A falsy value (0/None) falls back to the configured auto-poll interval.
        interval = cmd.value or cfg.telegram.auto_poll_interval_s
        ctx.scheduler.start(interval)
        ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": interval})
        ctx.notifier.send(Alert(kind="status", title=f"Polling activ — interval {interval // 60} min", body=""))
        return

    if action == "stop":
        if not ctx.scheduler.is_running:
            ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
            return
        ctx.scheduler.stop()
        ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
        ctx.notifier.send(Alert(kind="status", title="Polling oprit", body=""))
        return

    if action == "status":
        uptime_s = time.monotonic() - ctx.state.start
        rolling = ctx.detector.rolling
        last_roll = rolling[-1] if rolling else None
        has_match = bool(last_roll and last_roll.match)
        last_conf = f"{last_roll.match.confidence:.2f}" if has_match else ""
        last_color = (last_roll.color or last_roll.match.name) if has_match else ""
        if ctx.scheduler.is_running:
            sched_info = f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
        else:
            sched_info = "oprit"
        canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok"
        body = (
            f"Stare: {ctx.fsm.state.value}\n"
            f"Ultima detecție: {last_color} (conf {last_conf})\n"
            f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n"
            f"Poller: {sched_info} | Canary: {canary_info}"
        )
        ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body))
        return

    if action == "ss":
        shot_ts = time.time()
        # Capture and save run in worker threads to keep the event loop responsive.
        frame = await asyncio.to_thread(ctx.capture)
        if frame is None:
            ctx.notifier.send(Alert(
                kind="warn",
                title="Captură eșuată — verificați fereastra TradeStation",
                body="",
            ))
            return
        saved = await asyncio.to_thread(
            _save_annotated_frame, frame, ctx.cfg, ctx.fires_dir, "ss", shot_ts, ctx.audit,
        )
        ctx.audit.log({"ts": shot_ts, "event": "screenshot_sent", "path": str(saved) if saved else None})
        ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=saved))
async def _drain_cmd_queue(ctx: RunContext) -> None:
"""Drain all pending commands, isolating each dispatch in try/except.
CRITICAL: this MUST run every loop iteration, unconditionally, even when
the detection tick returned nothing (canary paused, out-of-window, etc.).
Prior bug: the main loop `continue`'d past this drain when res=None,
causing commands to accumulate indefinitely while canary was drifted.
"""
while True:
try:
cmd = ctx.cmd_queue.get_nowait()
except asyncio.QueueEmpty:
return
try:
await _dispatch_command(ctx, cmd)
except Exception as exc:
ctx.audit.log({
"ts": time.time(), "event": "command_error",
"action": cmd.action, "error": str(exc),
})
print(f"ERR command_dispatch /{cmd.action}: {exc}", flush=True)
ctx.notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(exc)))
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
    """Synchronous entry point: spin an asyncio event loop around `run_live_async`."""
    coro = run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub)
    asyncio.run(coro)
@@ -699,10 +863,8 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
pass
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
first_accepted = True
last_saved_color: str | None = None
levels_extractor = None
fire_count = 0
loop_state = _LoopState(first_accepted=True, last_saved_color=None,
levels_extractor=None, fire_count=0, start=start)
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
@@ -715,8 +877,16 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
)
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
ctx = RunContext(
cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm,
notifier=notifier, audit=audit, detection_log=detection_log,
scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir,
cmd_queue=cmd_queue, state=loop_state,
levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
)
# ------------------------------------------------------------------
# Nested async coroutines — capture nonlocal state from run_live_async
# Nested async coroutines — heartbeat captures notifier + heartbeat_due
# ------------------------------------------------------------------
async def _heartbeat_loop() -> None:
@@ -738,124 +908,13 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
# NOTE(review): pre-refactor closure version, removed by this commit and
# superseded by the module-scope `_dispatch_command(ctx, cmd)`. It depended on
# `run_live_async` closure state, so it could not be unit-tested in isolation.
async def _dispatch_command(cmd: Command) -> None:
    # Only closure variable this handler mutates; everything else is read-only.
    nonlocal fire_count
    if cmd.action == "set_interval":
        # Falsy value (0/None) falls back to the configured auto-poll interval.
        secs = cmd.value or cfg.telegram.auto_poll_interval_s
        scheduler.start(secs)
        audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
        notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
    elif cmd.action == "stop":
        if scheduler.is_running:
            scheduler.stop()
            audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
            notifier.send(Alert(kind="status", title="Polling oprit", body=""))
        else:
            notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
    elif cmd.action == "status":
        uptime_s = time.monotonic() - start
        # Most recent rolling detection, if any, drives the status summary.
        last_roll = detector.rolling[-1] if detector.rolling else None
        last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else ""
        last_color = (
            (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else ""
        ) if last_roll else ""
        sched_info = (
            f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ"
        ) if scheduler.is_running else "oprit"
        canary_info = "drift (pauze)" if canary.is_paused else "ok"
        body = (
            f"Stare: {fsm.state.value}\n"
            f"Ultima detecție: {last_color} (conf {last_conf})\n"
            f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n"
            f"Poller: {sched_info} | Canary: {canary_info}"
        )
        notifier.send(Alert(kind="status", title="ATM Status", body=body))
    elif cmd.action == "ss":
        now_ss = time.time()
        # Capture + save run in worker threads to keep the event loop responsive.
        frame_ss = await asyncio.to_thread(capture)
        if frame_ss is None:
            notifier.send(Alert(
                kind="warn",
                title="Captură eșuată — verificați fereastra TradeStation",
                body="",
            ))
            return
        path_ss = await asyncio.to_thread(
            _save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit,
        )
        audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
        notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
# NOTE(review): pre-refactor detection loop, removed by this commit. Preserved
# here with the defect annotated; the replacement delegates to the module-scope
# helpers _run_tick / _handle_fsm_result / _drain_cmd_queue.
async def _detection_loop() -> None:
    nonlocal first_accepted, last_saved_color, levels_extractor, fire_count
    while True:
        if duration_s is not None and (time.monotonic() - start) >= duration_s:
            break
        now = time.time()
        result: _TickSyncResult = await asyncio.to_thread(
            _sync_detection_tick,
            capture, canary, cfg, detector, fsm, notifier, audit, detection_log,
            fires_dir, first_accepted, last_saved_color, now, samples_dir,
        )
        if result.first_consumed:
            first_accepted = False
        if result.new_color is not None:
            last_saved_color = result.new_color
        tr = result.tr
        res = result.res
        if result.late_start or res is None:
            await asyncio.sleep(cfg.loop_interval_s)
            # BUG (fixed by this commit): this `continue` also skips the
            # command-drain `while` below. While the canary is paused, res is
            # None on every tick, so queued commands were never processed —
            # the hang observed on 2026-04-17.
            continue
        if tr is not None and res.accepted and res.color:
            if tr.reason == "prime" and not scheduler.is_running:
                scheduler.start(cfg.telegram.auto_poll_interval_s)
                audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
            elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running:
                scheduler.stop()
                audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
        if tr is not None and tr.trigger and not tr.locked:
            fire_count += 1
            if scheduler.is_running:
                scheduler.stop()
                audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
            levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
        if levels_extractor is not None and result.frame is not None:
            lr = levels_extractor.step(result.frame, now)
            if lr.status in ("complete", "timeout"):
                if lr.status == "complete" and lr.levels:
                    notifier.send(Alert(
                        kind="levels",
                        title="Niveluri",
                        body=(
                            f"SL={lr.levels.sl} "
                            f"TP1={lr.levels.tp1} "
                            f"TP2={lr.levels.tp2}"
                        ),
                    ))
                levels_extractor = None
        # Drain block — only reached when the tick produced a result (see BUG above).
        while True:
            try:
                cmd = cmd_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            try:
                await _dispatch_command(cmd)
            except Exception as _cmd_exc:
                _msg = f"/{cmd.action}: {_cmd_exc}"
                audit.log({"ts": time.time(), "event": "command_error", "action": cmd.action, "error": str(_cmd_exc)})
                print(f"ERR command_dispatch {_msg}", flush=True)
                notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(_cmd_exc)))
result = await _run_tick(ctx)
await _handle_fsm_result(ctx, result)
await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang
await asyncio.sleep(cfg.loop_interval_s)
# Launch background tasks

View File

@@ -401,3 +401,139 @@ async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_p
start_idx = scheduler_events.index("start:180")
stop_idx = scheduler_events.index("stop")
assert start_idx < stop_idx, "scheduler started after it stopped"
# ---------------------------------------------------------------------------
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
# even when canary is paused or when detection is otherwise skipped.
# Prior bug: `continue` past the drain loop caused commands to pile up.
# ---------------------------------------------------------------------------
def _make_ctx_for_drain(cmd_queue, dispatched: list | None = None):
    """Build a minimal RunContext wired with inert fakes for drain tests.

    Fix: the `dispatched` parameter was required but entirely unused — the
    tests record dispatches by monkeypatching `_main._dispatch_command`, not
    through this fixture. It is now optional (default None) and kept only for
    call-site compatibility; the old docstring's claim that _dispatch_command
    "just records calls" here was inaccurate.

    The canary is constructed paused to model the drift-pause regression
    scenario; the scheduler fake is permanently not-running.
    """
    import atm.main as _main

    class _FakeAudit:
        # In-memory audit sink; tests inspect `.events`.
        def __init__(self):
            self.events = []

        def log(self, e):
            self.events.append(e)

    class _FakeNotifier:
        # Collects Alert objects instead of sending them; tests inspect `.alerts`.
        def __init__(self):
            self.alerts = []

        def send(self, a):
            self.alerts.append(a)

    class _FakeCanary:
        # Paused canary reproduces the res=None tick path.
        def __init__(self, paused=True):
            self.is_paused = paused

    class _FakeScheduler:
        # Never-running scheduler; start/stop are no-ops.
        is_running = False
        interval_s = None

        def start(self, s):
            pass

        def stop(self):
            pass

    state = _main._LoopState(start=0.0)
    return _main.RunContext(
        cfg=MagicMock(),
        capture=lambda: None,
        canary=_FakeCanary(paused=True),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_FakeNotifier(),
        audit=_FakeAudit(),
        detection_log=_FakeAudit(),
        scheduler=_FakeScheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=cmd_queue,
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
    """Regression: with canary paused, _drain_cmd_queue still dispatches.

    Prior bug: the detection loop `continue`'d past the drain block whenever
    the tick returned res=None (canary paused), so commands piled up forever.
    """
    import atm.main as _main
    from atm.commands import Command

    seen: list = []

    async def _record(ctx, cmd):
        seen.append(cmd.action)

    # Replace the real dispatcher so the drain path is tested in isolation.
    monkeypatch.setattr(_main, "_dispatch_command", _record)

    queue: asyncio.Queue = asyncio.Queue()
    for action in ("status", "ss"):
        await queue.put(Command(action=action))

    await _main._drain_cmd_queue(_make_ctx_for_drain(queue, seen))

    assert seen == ["status", "ss"]
    assert queue.empty()
@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
    """Drain must still fire when the tick skipped (e.g. out of operating hours).

    The refactored loop runs _drain_cmd_queue unconditionally after every tick,
    regardless of `_TickSyncResult` content.
    """
    import atm.main as _main
    from atm.commands import Command

    handled: list = []

    async def _record(ctx, cmd):
        handled.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", _record)

    queue: asyncio.Queue = asyncio.Queue()
    await queue.put(Command(action="stop"))
    ctx = _make_ctx_for_drain(queue, handled)

    # Simulate out-of-window tick (empty _TickSyncResult, no res), then drain.
    await _main._handle_fsm_result(ctx, _main._TickSyncResult())
    await _main._drain_cmd_queue(ctx)

    assert handled == ["stop"]
@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
    """If one command raises, remaining commands still drain + warn alert sent."""
    import atm.main as _main
    from atm.commands import Command

    tried: list = []

    async def _boom_on_status(ctx, cmd):
        tried.append(cmd.action)
        if cmd.action == "status":
            raise RuntimeError("boom")

    monkeypatch.setattr(_main, "_dispatch_command", _boom_on_status)

    queue: asyncio.Queue = asyncio.Queue()
    for action in ("status", "ss"):
        await queue.put(Command(action=action))
    ctx = _make_ctx_for_drain(queue, tried)

    await _main._drain_cmd_queue(ctx)

    # Both commands were attempted despite the first raising.
    assert tried == ["status", "ss"]
    # warn alert for the failed command
    warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
    assert any("status" in t for t in warn_titles)
    # command_error audit event
    errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
    assert len(errs) == 1 and errs[0]["action"] == "status"