From ca6e57817537df8adadda32c0ec12e52190c751b Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Fri, 17 Apr 2026 10:37:17 +0000 Subject: [PATCH] =?UTF-8?q?feat(run):=20async=20refactor=20=E2=80=94=20run?= =?UTF-8?q?=5Flive=5Fasync=20+=207-step=20shutdown?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_live() is now a thin asyncio.run() wrapper. run_live_async(): - Blocking pipeline (capture→canary→detect→_handle_tick→snapshot) in asyncio.to_thread() per decision 1 (_sync_detection_tick function) - TelegramPoller + ScreenshotScheduler as background asyncio tasks - asyncio.Queue[Command] for inter-task communication - Auto-start scheduler on PRIMED, auto-stop on fire/cooled/phase_skip - 7-step graceful shutdown sequence - heartbeat_due uses time.monotonic() (prevents immediate-fire regression) - Status command: FSM state, last detection, uptime, fire count, canary health - "ss" command: one-shot capture+annotate+send via to_thread - Price overlay in _save_annotated_frame (dot_pos_abs + canary_ok params) - test_main.py: ScriptedDetector.step(ts, frame=None) for zero regression Co-Authored-By: Claude Sonnet 4.6 --- src/atm/main.py | 411 +++++++++++++++++++++++++++++++++------------ tests/test_main.py | 2 +- 2 files changed, 308 insertions(+), 105 deletions(-) diff --git a/src/atm/main.py b/src/atm/main.py index 4b71ff8..333b5f8 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -2,12 +2,15 @@ from __future__ import annotations import argparse +import asyncio +import contextlib import os import sys import time +from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Callable, Protocol, cast +from typing import TYPE_CHECKING, Any, Callable, Protocol, cast from atm.config import Config # stdlib-only (tomllib); safe at module level from atm.notifier import Alert @@ -348,6 +351,8 @@ def _save_annotated_frame( label: str, now: float, audit: _AuditLike | None = None, + dot_pos_abs: "tuple[int, int] | None" = None, + canary_ok: bool = True, ) -> "Path | None": """Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``. @@ -355,6 +360,10 @@ def _save_annotated_frame( audit (when provided) so disk-full / permission issues don't become silent regressions. Never raises — snapshot is a best-effort enhancement, the text alert must still go out. + + dot_pos_abs + canary_ok: when both are set the price overlay is drawn + (y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted + since calibration may be stale. """ try: import cv2 # type: ignore[import-untyped] @@ -371,6 +380,22 @@ def _save_annotated_frame( annotated = frame.copy() x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) + if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"): + try: + _, dot_y = dot_pos_abs + ya = cfg.y_axis + slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y) + price = ya.p1_price + (dot_y - ya.p1_y) * slope + w_frame = annotated.shape[1] + text = f"${price:.2f}" + font = cv2.FONT_HERSHEY_SIMPLEX + scale, thickness = 1.2, 3 + (tw, th), _ = cv2.getTextSize(text, font, scale, thickness) + tx, ty = w_frame - tw - 10, th + 10 + cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1) + cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA) + except Exception: + pass # price overlay is best-effort; never break the screenshot cv2.imwrite(str(fpath), annotated) return fpath except Exception as exc: @@ -496,7 +521,113 @@ def _handle_tick( return tr +@dataclass +class _TickSyncResult: + frame: Any = None + res: Any = None # DetectionResult | None + tr: Any = None # Transition | None + first_consumed: bool = False + late_start: bool = False + new_color: str | None = None # corpus sample color when changed + + +def _sync_detection_tick( + capture: Callable, + canary: Any, + cfg: Any, + detector: Any, + fsm: Any, + notifier: _NotifierLike, + audit: _AuditLike, + detection_log: _AuditLike, + fires_dir: Path, + first_accepted: bool, + last_saved_color: "str | None", + now: float, + samples_dir: Path, +) -> _TickSyncResult: + """One full detection tick (blocking I/O). Runs in asyncio.to_thread.""" + frame = capture() + if frame is None: + audit.log({"ts": now, "event": "window_lost"}) + return _TickSyncResult() + + cr = canary.check(frame) + if canary.is_paused: + audit.log({"ts": now, "event": "paused", "drift": cr.distance}) + return _TickSyncResult(frame=frame) + + res = detector.step(now, frame) + detection_log.log({ + "ts": now, "event": "frame", + "window_found": res.window_found, + "dot_found": res.dot_found, + "rgb": list(res.rgb) if res.rgb is not None else None, + "match_name": res.match.name if res.match is not None else None, + "distance": round(res.match.distance, 2) if res.match is not None else None, + "confidence": round(res.match.confidence, 3) if res.match is not None else None, + "accepted": res.accepted, + "color": res.color, + }) + + if not (res.accepted and res.color): + return _TickSyncResult(frame=frame, res=res) + + is_first = first_accepted + + def _snapshot(kind: str, label: str) -> "Path | None": + if not getattr(cfg.attach_screenshots, kind, True): + return None + return _save_annotated_frame( + frame, cfg, fires_dir, label, now, audit=audit, + dot_pos_abs=getattr(res, "dot_pos_abs", None), + canary_ok=True, + ) + + tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot) + + if tr is None: + return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True) + + new_color: str | None = None + if res.color != last_saved_color: + ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") + sample_path = samples_dir / f"{ts_str}_{res.color}.png" + try: + import cv2 # type: ignore[import-untyped] + cv2.imwrite(str(sample_path), frame) + except Exception as exc: + audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)}) + new_color = res.color + + if tr.trigger and not tr.locked: + fire_path: "Path | None" = None + if cfg.attach_screenshots.trigger: + fire_path = _save_annotated_frame( + frame, cfg, fires_dir, tr.trigger, now, audit=audit, + dot_pos_abs=getattr(res, "dot_pos_abs", None), + canary_ok=True, + ) + notifier.send(Alert( + kind="trigger", + title=f"Semnal {tr.trigger}", + body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", + image_path=fire_path, + direction=tr.trigger, + )) + + return _TickSyncResult( + frame=frame, res=res, tr=tr, + first_consumed=is_first, new_color=new_color, + ) + + def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: + """Sync entry point — delegates to asyncio event loop.""" + asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub)) + + +async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None: """Main live monitoring loop. Imports are lazy to keep --help fast.""" try: from atm.detector import Detector @@ -506,6 +637,8 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: from atm.notifier.discord import DiscordNotifier from atm.notifier.telegram import TelegramNotifier from atm.audit import AuditLog + from atm.commands import TelegramPoller, Command + from atm.scheduler import ScreenshotScheduler except ImportError as exc: sys.exit(f"run-loop dependencies not available: {exc}") @@ -521,7 +654,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: ] def _on_drop(backend_name: str, dropped: Alert) -> None: - """Audit la depășire coadă — face eșecul silențios vizibil.""" audit.log({ "ts": time.time(), "event": "queue_overflow_drop", @@ -532,7 +664,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop) - # Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea. + # Initial frame + canary check first_frame = capture() if first_frame is None: print("WARN: first capture returned None — window/region missing", flush=True) @@ -542,9 +674,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}" if first_check.drifted: print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True) - canary.resume() # clear the auto-pause so user can Ctrl+C and fix + canary.resume() - dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h" + dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h" notifier.send(Alert( kind="heartbeat", title="ATM pornit", @@ -556,106 +688,42 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status}) start = time.monotonic() - heartbeat_due = time.time() + cfg.heartbeat_min * 60 - levels_extractor = None - last_saved_color: str | None = None - first_accepted = True + heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 samples_dir = Path("samples") samples_dir.mkdir(exist_ok=True) fires_dir = Path("logs") / "fires" fires_dir.mkdir(parents=True, exist_ok=True) - import cv2 # type: ignore[import-untyped] - try: - while duration_s is None or (time.monotonic() - start) < duration_s: - now = time.time() - frame = capture() - if frame is None: - audit.log({"ts": now, "event": "window_lost"}) - time.sleep(cfg.loop_interval_s) - continue - # canary check - cr = canary.check(frame) - if canary.is_paused: - audit.log({"ts": now, "event": "paused", "drift": cr.distance}) - time.sleep(cfg.loop_interval_s) - continue - # detection - res = detector.step(now) - detection_log.log({ - "ts": now, - "event": "frame", - "window_found": res.window_found, - "dot_found": res.dot_found, - "rgb": list(res.rgb) if res.rgb is not None else None, - "match_name": res.match.name if res.match is not None else None, - "distance": round(res.match.distance, 2) if res.match is not None else None, - "confidence": round(res.match.confidence, 3) if res.match is not None else None, - "accepted": res.accepted, - "color": res.color, - }) - if res.accepted and res.color: - is_first = first_accepted - first_accepted = False + import cv2 # noqa: F401 fail fast if cv2 is missing # type: ignore[import-untyped] + except ImportError: + pass - # Per-iteration closure — binds current frame/now, gates on config. - def _snapshot(kind: str, label: str) -> "Path | None": - if not getattr(cfg.attach_screenshots, kind, True): - return None - return _save_annotated_frame( - frame, cfg, fires_dir, label, now, audit=audit, - ) + cmd_queue: asyncio.Queue[Command] = asyncio.Queue() + first_accepted = True + last_saved_color: str | None = None + levels_extractor = None + fire_count = 0 - tr = _handle_tick( - fsm, res.color, now, notifier, audit, is_first, - snapshot=_snapshot, - ) - if tr is None: - # pornire târzie: FSM neatins, sari peste FIRE + salvare corpus - time.sleep(cfg.loop_interval_s) - continue - # corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară - if res.color != last_saved_color: - ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") - sample_path = samples_dir / f"{ts_str}_{res.color}.png" - try: - cv2.imwrite(str(sample_path), frame) - except Exception as exc: - audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)}) - last_saved_color = res.color - # FIRE: adnotează frame-ul + salvează, atașează la alertă - if tr.trigger and not tr.locked: - fire_path: "Path | None" = None - if cfg.attach_screenshots.trigger: - fire_path = _save_annotated_frame( - frame, cfg, fires_dir, tr.trigger, now, audit=audit, - ) - notifier.send(Alert( - kind="trigger", - title=f"Semnal {tr.trigger}", - body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", - image_path=fire_path, - direction=tr.trigger, - )) - levels_extractor = LevelsExtractor(cfg, tr.trigger, now) - # phase-B levels - if levels_extractor is not None: - lr = levels_extractor.step(frame, now) - if lr.status in ("complete", "timeout"): - if lr.status == "complete" and lr.levels: - notifier.send(Alert( - kind="levels", - title="Niveluri", - body=( - f"SL={lr.levels.sl} " - f"TP1={lr.levels.tp1} " - f"TP2={lr.levels.tp2}" - ), - )) - levels_extractor = None - # heartbeat — include statistici per-backend ca eșecurile silențioase - # să apară la fiecare 30 min fără să aștepte oprirea. - if time.time() > heartbeat_due: + def _bound_save(frame: Any, label: str, now: float) -> "Path | None": + return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit) + + scheduler = ScreenshotScheduler( + capture=capture, + save_fn=_bound_save, + notifier=notifier, + audit=audit, + ) + poller = TelegramPoller(cfg.telegram, cmd_queue, audit) + + # ------------------------------------------------------------------ + # Nested async coroutines — capture nonlocal state from run_live_async + # ------------------------------------------------------------------ + + async def _heartbeat_loop() -> None: + nonlocal heartbeat_due + while True: + await asyncio.sleep(60) + if time.monotonic() > heartbeat_due: try: stats = notifier.stats() audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats}) @@ -668,9 +736,145 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines))) except Exception: notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok")) - heartbeat_due = time.time() + cfg.heartbeat_min * 60 - time.sleep(cfg.loop_interval_s) + heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 + + async def _dispatch_command(cmd: Command) -> None: + nonlocal fire_count + if cmd.action == "set_interval": + secs = cmd.value or cfg.telegram.auto_poll_interval_s + scheduler.start(secs) + audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs}) + notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body="")) + elif cmd.action == "stop": + if scheduler.is_running: + scheduler.stop() + audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"}) + notifier.send(Alert(kind="status", title="Polling oprit", body="")) + else: + notifier.send(Alert(kind="status", title="Polling nu este activ", body="")) + elif cmd.action == "status": + uptime_s = time.monotonic() - start + last_roll = detector.rolling[-1] if detector.rolling else None + last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—" + last_color = ( + (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—" + ) if last_roll else "—" + sched_info = ( + f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ" + ) if scheduler.is_running else "oprit" + canary_info = "drift (pauze)" if canary.is_paused else "ok" + body = ( + f"Stare: {fsm.state.value}\n" + f"Ultima detecție: {last_color} (conf {last_conf})\n" + f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n" + f"Poller: {sched_info} | Canary: {canary_info}" + ) + notifier.send(Alert(kind="status", title="ATM Status", body=body)) + elif cmd.action == "ss": + now_ss = time.time() + frame_ss = await asyncio.to_thread(capture) + if frame_ss is None: + notifier.send(Alert( + kind="warn", + title="Captură eșuată — verificați fereastra TradeStation", + body="", + )) + return + path_ss = await asyncio.to_thread( + _save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit, + ) + audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None}) + notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss)) + + async def _detection_loop() -> None: + nonlocal first_accepted, last_saved_color, levels_extractor, fire_count + + while True: + if duration_s is not None and (time.monotonic() - start) >= duration_s: + break + + now = time.time() + + result: _TickSyncResult = await asyncio.to_thread( + _sync_detection_tick, + capture, canary, cfg, detector, fsm, notifier, audit, detection_log, + fires_dir, first_accepted, last_saved_color, now, samples_dir, + ) + + if result.first_consumed: + first_accepted = False + if result.new_color is not None: + last_saved_color = result.new_color + + tr = result.tr + res = result.res + + if result.late_start or res is None: + await asyncio.sleep(cfg.loop_interval_s) + continue + + if tr is not None and res.accepted and res.color: + if tr.reason == "prime" and not scheduler.is_running: + scheduler.start(cfg.telegram.auto_poll_interval_s) + audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"}) + elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running: + scheduler.stop() + audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason}) + + if tr is not None and tr.trigger and not tr.locked: + fire_count += 1 + if scheduler.is_running: + scheduler.stop() + audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) + levels_extractor = LevelsExtractor(cfg, tr.trigger, now) + + if levels_extractor is not None and result.frame is not None: + lr = levels_extractor.step(result.frame, now) + if lr.status in ("complete", "timeout"): + if lr.status == "complete" and lr.levels: + notifier.send(Alert( + kind="levels", + title="Niveluri", + body=( + f"SL={lr.levels.sl} " + f"TP1={lr.levels.tp1} " + f"TP2={lr.levels.tp2}" + ), + )) + levels_extractor = None + + while True: + try: + cmd = cmd_queue.get_nowait() + await _dispatch_command(cmd) + except asyncio.QueueEmpty: + break + + await asyncio.sleep(cfg.loop_interval_s) + + # Launch background tasks + t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler") + t_poller = asyncio.create_task(poller.run(), name="poller") + t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat") + + try: + await _detection_loop() finally: + # 7-step graceful shutdown + # 1. cancel scheduler + t_scheduler.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await t_scheduler + # 2. cancel poller + t_poller.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await t_poller + # 3. cancel heartbeat + t_heartbeat.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await t_heartbeat + # 4. drain detection — complete (we awaited _detection_loop directly) + # 5. send shutdown alert try: stats = notifier.stats() lines = [f"după {time.monotonic() - start:.0f}s"] @@ -679,13 +883,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None: f"{name}: sent={s['sent']} failed={s['failed']} " f"dropped={s['dropped']} retries={s['retries']}" ) - notifier.send(Alert( - kind="heartbeat", title="ATM oprit", - body="\n".join(lines), - )) + notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines))) except Exception: pass + # 6. notifier.stop() — flush + join FanoutNotifier threads notifier.stop() + # 7. audit.close() audit.close() detection_log.close() diff --git a/tests/test_main.py b/tests/test_main.py index ee8e18e..302cceb 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -186,7 +186,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path): ] def __init__(self, *a, **kw): self._i = 0 - def step(self, ts): + def step(self, ts, frame=None): if self._i >= len(self._script): raise _StopLoop color, accepted = self._script[self._i]