From c5024ce600325cd8bfdcd1b7c1ca1f33006a9808 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Sat, 18 Apr 2026 11:52:28 +0300 Subject: [PATCH] feat(run): extract detection loop helpers + unconditional cmd drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor _detection_loop by moving _run_tick, _handle_fsm_result, _dispatch_command, and _drain_cmd_queue to module scope, passing dependencies via a RunContext dataclass. This unblocks direct unit testing of the drain path. CRITICAL bug fix: the previous loop issued `continue` when the tick returned res=None (canary paused or similar), which skipped the drain block. Commands piled up in cmd_queue while detection was paused — the hang observed on 2026-04-17 after canary drift-pause. The refactored loop now runs _drain_cmd_queue UNCONDITIONALLY on every iteration, after _handle_fsm_result, so pause-state never starves the command channel. Tests: test_drain_works_when_canary_paused, test_drain_works_when_out_of_window, test_drain_isolates_dispatch_exceptions (exception isolation + audit/warn wiring). Co-Authored-By: Claude Sonnet 4.6 --- src/atm/main.py | 297 +++++++++++++++++++++++++++------------------ tests/test_main.py | 136 +++++++++++++++++++++ 2 files changed, 314 insertions(+), 119 deletions(-) diff --git a/src/atm/main.py b/src/atm/main.py index 965fdd4..d70312a 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -531,6 +531,40 @@ class _TickSyncResult: new_color: str | None = None # corpus sample color when changed +@dataclass +class RunContext: + """Dependencies passed to module-scope detection-loop helpers. + + Keeps `_run_tick`, `_handle_fsm_result`, `_drain_cmd_queue`, and + `_dispatch_command` at module scope so they are directly unit-testable + without reconstructing `run_live_async`. 
+ """ + cfg: Any + capture: Callable + canary: Any + detector: Any + fsm: Any + notifier: _NotifierLike + audit: _AuditLike + detection_log: _AuditLike + scheduler: Any + samples_dir: Path + fires_dir: Path + cmd_queue: Any # asyncio.Queue[Command] + state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start + levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now) + + +@dataclass +class _LoopState: + """Per-loop mutable state (previously closure nonlocals).""" + first_accepted: bool = True + last_saved_color: str | None = None + levels_extractor: Any = None + fire_count: int = 0 + start: float = 0.0 + + def _sync_detection_tick( capture: Callable, canary: Any, @@ -622,6 +656,136 @@ def _sync_detection_tick( ) +async def _run_tick(ctx: RunContext) -> _TickSyncResult: + """Execute one `_sync_detection_tick` in a thread; returns result or empty.""" + now = time.time() + return await asyncio.to_thread( + _sync_detection_tick, + ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm, + ctx.notifier, ctx.audit, ctx.detection_log, + ctx.fires_dir, ctx.state.first_accepted, ctx.state.last_saved_color, + now, ctx.samples_dir, + ) + + +async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None: + """Scheduler start/stop + levels extraction. 
Loop-state flags (first_accepted, last_saved_color) are always synced; the rest is skipped if res is None/late_start.""" + if result.first_consumed: + ctx.state.first_accepted = False + if result.new_color is not None: + ctx.state.last_saved_color = result.new_color + + tr = result.tr + res = result.res + + if result.late_start or res is None: + return + + if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None): + if tr.reason == "prime" and not ctx.scheduler.is_running: + ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s) + ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"}) + elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason}) + + if tr is not None and tr.trigger and not tr.locked: + ctx.state.fire_count += 1 + if ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) + ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time()) + + if ctx.state.levels_extractor is not None and result.frame is not None: + lr = ctx.state.levels_extractor.step(result.frame, time.time()) + if lr.status in ("complete", "timeout"): + if lr.status == "complete" and lr.levels: + ctx.notifier.send(Alert( + kind="levels", + title="Niveluri", + body=( + f"SL={lr.levels.sl} " + f"TP1={lr.levels.tp1} " + f"TP2={lr.levels.tp2}" + ), + )) + ctx.state.levels_extractor = None + + +async def _dispatch_command(ctx: RunContext, cmd) -> None: + """Process a single Command. 
Exceptions bubble — caller wraps in try/except.""" + cfg = ctx.cfg + if cmd.action == "set_interval": + secs = cmd.value or cfg.telegram.auto_poll_interval_s + ctx.scheduler.start(secs) + ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs}) + ctx.notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body="")) + elif cmd.action == "stop": + if ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"}) + ctx.notifier.send(Alert(kind="status", title="Polling oprit", body="")) + else: + ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body="")) + elif cmd.action == "status": + uptime_s = time.monotonic() - ctx.state.start + last_roll = ctx.detector.rolling[-1] if ctx.detector.rolling else None + last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—" + last_color = ( + (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—" + ) if last_roll else "—" + sched_info = ( + f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ" + ) if ctx.scheduler.is_running else "oprit" + canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok" + body = ( + f"Stare: {ctx.fsm.state.value}\n" + f"Ultima detecție: {last_color} (conf {last_conf})\n" + f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n" + f"Poller: {sched_info} | Canary: {canary_info}" + ) + ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body)) + elif cmd.action == "ss": + now_ss = time.time() + frame_ss = await asyncio.to_thread(ctx.capture) + if frame_ss is None: + ctx.notifier.send(Alert( + kind="warn", + title="Captură eșuată — verificați fereastra TradeStation", + body="", + )) + return + path_ss = await asyncio.to_thread( + _save_annotated_frame, frame_ss, ctx.cfg, ctx.fires_dir, "ss", now_ss, 
ctx.audit, + ) + ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None}) + ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss)) + + +async def _drain_cmd_queue(ctx: RunContext) -> None: + """Drain all pending commands, isolating each dispatch in try/except. + + CRITICAL: this MUST run every loop iteration, unconditionally, even when + the detection tick returned nothing (canary paused, out-of-window, etc.). + Prior bug: the main loop `continue`'d past this drain when res=None, + causing commands to accumulate indefinitely while canary was drifted. + """ + while True: + try: + cmd = ctx.cmd_queue.get_nowait() + except asyncio.QueueEmpty: + return + try: + await _dispatch_command(ctx, cmd) + except Exception as exc: + ctx.audit.log({ + "ts": time.time(), "event": "command_error", + "action": cmd.action, "error": str(exc), + }) + print(f"ERR command_dispatch /{cmd.action}: {exc}", flush=True) + ctx.notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(exc))) + + def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: """Sync entry point — delegates to asyncio event loop.""" asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub)) @@ -699,10 +863,8 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No pass cmd_queue: asyncio.Queue[Command] = asyncio.Queue() - first_accepted = True - last_saved_color: str | None = None - levels_extractor = None - fire_count = 0 + loop_state = _LoopState(first_accepted=True, last_saved_color=None, + levels_extractor=None, fire_count=0, start=start) def _bound_save(frame: Any, label: str, now: float) -> "Path | None": return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit) @@ -715,8 +877,16 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No ) poller = TelegramPoller(cfg.telegram, cmd_queue, audit) 
+ ctx = RunContext( + cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm, + notifier=notifier, audit=audit, detection_log=detection_log, + scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir, + cmd_queue=cmd_queue, state=loop_state, + levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts), + ) + # ------------------------------------------------------------------ - # Nested async coroutines — capture nonlocal state from run_live_async + # Nested async coroutines — heartbeat captures notifier + heartbeat_due # ------------------------------------------------------------------ async def _heartbeat_loop() -> None: @@ -738,124 +908,13 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok")) heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 - async def _dispatch_command(cmd: Command) -> None: - nonlocal fire_count - if cmd.action == "set_interval": - secs = cmd.value or cfg.telegram.auto_poll_interval_s - scheduler.start(secs) - audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs}) - notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body="")) - elif cmd.action == "stop": - if scheduler.is_running: - scheduler.stop() - audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"}) - notifier.send(Alert(kind="status", title="Polling oprit", body="")) - else: - notifier.send(Alert(kind="status", title="Polling nu este activ", body="")) - elif cmd.action == "status": - uptime_s = time.monotonic() - start - last_roll = detector.rolling[-1] if detector.rolling else None - last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—" - last_color = ( - (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—" - ) if last_roll else "—" 
- sched_info = ( - f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ" - ) if scheduler.is_running else "oprit" - canary_info = "drift (pauze)" if canary.is_paused else "ok" - body = ( - f"Stare: {fsm.state.value}\n" - f"Ultima detecție: {last_color} (conf {last_conf})\n" - f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n" - f"Poller: {sched_info} | Canary: {canary_info}" - ) - notifier.send(Alert(kind="status", title="ATM Status", body=body)) - elif cmd.action == "ss": - now_ss = time.time() - frame_ss = await asyncio.to_thread(capture) - if frame_ss is None: - notifier.send(Alert( - kind="warn", - title="Captură eșuată — verificați fereastra TradeStation", - body="", - )) - return - path_ss = await asyncio.to_thread( - _save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit, - ) - audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None}) - notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss)) - async def _detection_loop() -> None: - nonlocal first_accepted, last_saved_color, levels_extractor, fire_count - while True: if duration_s is not None and (time.monotonic() - start) >= duration_s: break - - now = time.time() - - result: _TickSyncResult = await asyncio.to_thread( - _sync_detection_tick, - capture, canary, cfg, detector, fsm, notifier, audit, detection_log, - fires_dir, first_accepted, last_saved_color, now, samples_dir, - ) - - if result.first_consumed: - first_accepted = False - if result.new_color is not None: - last_saved_color = result.new_color - - tr = result.tr - res = result.res - - if result.late_start or res is None: - await asyncio.sleep(cfg.loop_interval_s) - continue - - if tr is not None and res.accepted and res.color: - if tr.reason == "prime" and not scheduler.is_running: - scheduler.start(cfg.telegram.auto_poll_interval_s) - audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"}) - elif 
tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running: - scheduler.stop() - audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason}) - - if tr is not None and tr.trigger and not tr.locked: - fire_count += 1 - if scheduler.is_running: - scheduler.stop() - audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) - levels_extractor = LevelsExtractor(cfg, tr.trigger, now) - - if levels_extractor is not None and result.frame is not None: - lr = levels_extractor.step(result.frame, now) - if lr.status in ("complete", "timeout"): - if lr.status == "complete" and lr.levels: - notifier.send(Alert( - kind="levels", - title="Niveluri", - body=( - f"SL={lr.levels.sl} " - f"TP1={lr.levels.tp1} " - f"TP2={lr.levels.tp2}" - ), - )) - levels_extractor = None - - while True: - try: - cmd = cmd_queue.get_nowait() - except asyncio.QueueEmpty: - break - try: - await _dispatch_command(cmd) - except Exception as _cmd_exc: - _msg = f"/{cmd.action}: {_cmd_exc}" - audit.log({"ts": time.time(), "event": "command_error", "action": cmd.action, "error": str(_cmd_exc)}) - print(f"ERR command_dispatch {_msg}", flush=True) - notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(_cmd_exc))) - + result = await _run_tick(ctx) + await _handle_fsm_result(ctx, result) + await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang await asyncio.sleep(cfg.loop_interval_s) # Launch background tasks diff --git a/tests/test_main.py b/tests/test_main.py index 4626726..4c98334 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -401,3 +401,139 @@ async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_p start_idx = scheduler_events.index("start:180") stop_idx = scheduler_events.index("stop") assert start_idx < stop_idx, "scheduler started after it stopped" + + +# --------------------------------------------------------------------------- +# Commit 1 regression tests: 
_drain_cmd_queue MUST run unconditionally, +# even when canary is paused or when detection is otherwise skipped. +# Prior bug: `continue` past the drain loop caused commands to pile up. +# --------------------------------------------------------------------------- + +def _make_ctx_for_drain(cmd_queue, dispatched: list): + """Build a minimal RunContext from in-memory fakes; tests monkeypatch _dispatch_command themselves (`dispatched` is unused here).""" + import atm.main as _main + + class _FakeAudit: + def __init__(self): self.events = [] + def log(self, e): self.events.append(e) + + class _FakeNotifier: + def __init__(self): self.alerts = [] + def send(self, a): self.alerts.append(a) + + class _FakeCanary: + def __init__(self, paused=True): + self.is_paused = paused + + class _FakeScheduler: + is_running = False + interval_s = None + def start(self, s): pass + def stop(self): pass + + state = _main._LoopState(start=0.0) + ctx = _main.RunContext( + cfg=MagicMock(), + capture=lambda: None, + canary=_FakeCanary(paused=True), + detector=MagicMock(), + fsm=MagicMock(), + notifier=_FakeNotifier(), + audit=_FakeAudit(), + detection_log=_FakeAudit(), + scheduler=_FakeScheduler(), + samples_dir=Path("."), + fires_dir=Path("."), + cmd_queue=cmd_queue, + state=state, + levels_extractor_factory=lambda *a, **kw: None, + ) + return ctx + + +@pytest.mark.asyncio +async def test_drain_works_when_canary_paused(monkeypatch): + """Regression: when canary.is_paused, _drain_cmd_queue still dispatches. + + Prior bug: detection loop `continue`'d past the drain block whenever the + tick returned res=None (canary paused). Commands accumulated forever. 
+ """ + import atm.main as _main + from atm.commands import Command + + q: asyncio.Queue = asyncio.Queue() + await q.put(Command(action="status")) + await q.put(Command(action="ss")) + + dispatched: list = [] + + async def _fake_dispatch(ctx, cmd): + dispatched.append(cmd.action) + + monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch) + + ctx = _make_ctx_for_drain(q, dispatched) + + await _main._drain_cmd_queue(ctx) + + assert dispatched == ["status", "ss"] + assert q.empty() + + +@pytest.mark.asyncio +async def test_drain_works_when_out_of_window(monkeypatch): + """Drain must still fire when the tick skipped (e.g. out of operating hours). + + The refactored loop runs _drain_cmd_queue unconditionally after every tick, + regardless of `_TickSyncResult` content. + """ + import atm.main as _main + from atm.commands import Command + + q: asyncio.Queue = asyncio.Queue() + await q.put(Command(action="stop")) + + dispatched: list = [] + + async def _fake_dispatch(ctx, cmd): + dispatched.append(cmd.action) + + monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch) + + ctx = _make_ctx_for_drain(q, dispatched) + # Simulate out-of-window tick (empty _TickSyncResult, no res) + await _main._handle_fsm_result(ctx, _main._TickSyncResult()) + await _main._drain_cmd_queue(ctx) + + assert dispatched == ["stop"] + + +@pytest.mark.asyncio +async def test_drain_isolates_dispatch_exceptions(monkeypatch): + """If one command raises, remaining commands still drain + warn alert sent.""" + import atm.main as _main + from atm.commands import Command + + q: asyncio.Queue = asyncio.Queue() + await q.put(Command(action="status")) + await q.put(Command(action="ss")) + + attempts: list = [] + + async def _fake_dispatch(ctx, cmd): + attempts.append(cmd.action) + if cmd.action == "status": + raise RuntimeError("boom") + + monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch) + + ctx = _make_ctx_for_drain(q, attempts) + await _main._drain_cmd_queue(ctx) + + assert 
attempts == ["status", "ss"] + # warn alert for the failed command + warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"] + assert any("status" in t for t in warn_titles) + # command_error audit event + errs = [e for e in ctx.audit.events if e.get("event") == "command_error"] + assert len(errs) == 1 and errs[0]["action"] == "status"