feat(run): async refactor — run_live_async + 7-step shutdown

run_live() is now a thin asyncio.run() wrapper. run_live_async():
- Blocking pipeline (capture→canary→detect→_handle_tick→snapshot) in
  asyncio.to_thread() per decision 1 (_sync_detection_tick function)
- TelegramPoller + ScreenshotScheduler as background asyncio tasks
- asyncio.Queue[Command] for inter-task communication
- Auto-start scheduler on PRIMED, auto-stop on fire/cooled/phase_skip
- 7-step graceful shutdown sequence
- heartbeat_due uses time.monotonic() (prevents immediate-fire regression)
- Status command: FSM state, last detection, uptime, fire count, canary health
- "ss" command: one-shot capture+annotate+send via to_thread
- Price overlay in _save_annotated_frame (dot_pos_abs + canary_ok params)
- test_main.py: ScriptedDetector.step(ts, frame=None) for zero regression

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-04-17 10:37:17 +00:00
parent 4123b31a22
commit ca6e578175
2 changed files with 308 additions and 105 deletions

View File

@@ -2,12 +2,15 @@
from __future__ import annotations
import argparse
import asyncio
import contextlib
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Protocol, cast
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
from atm.config import Config # stdlib-only (tomllib); safe at module level
from atm.notifier import Alert
@@ -348,6 +351,8 @@ def _save_annotated_frame(
label: str,
now: float,
audit: _AuditLike | None = None,
dot_pos_abs: "tuple[int, int] | None" = None,
canary_ok: bool = True,
) -> "Path | None":
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
@@ -355,6 +360,10 @@ def _save_annotated_frame(
audit (when provided) so disk-full / permission issues don't become silent
regressions. Never raises — snapshot is a best-effort enhancement, the
text alert must still go out.
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
since calibration may be stale.
"""
try:
import cv2 # type: ignore[import-untyped]
@@ -371,6 +380,22 @@ def _save_annotated_frame(
annotated = frame.copy()
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
try:
_, dot_y = dot_pos_abs
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass # price overlay is best-effort; never break the screenshot
cv2.imwrite(str(fpath), annotated)
return fpath
except Exception as exc:
@@ -496,7 +521,113 @@ def _handle_tick(
return tr
@dataclass
class _TickSyncResult:
frame: Any = None
res: Any = None # DetectionResult | None
tr: Any = None # Transition | None
first_consumed: bool = False
late_start: bool = False
new_color: str | None = None # corpus sample color when changed
def _sync_detection_tick(
capture: Callable,
canary: Any,
cfg: Any,
detector: Any,
fsm: Any,
notifier: _NotifierLike,
audit: _AuditLike,
detection_log: _AuditLike,
fires_dir: Path,
first_accepted: bool,
last_saved_color: "str | None",
now: float,
samples_dir: Path,
) -> _TickSyncResult:
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread."""
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
return _TickSyncResult()
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return _TickSyncResult(frame=frame)
res = detector.step(now, frame)
detection_log.log({
"ts": now, "event": "frame",
"window_found": res.window_found,
"dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None,
"match_name": res.match.name if res.match is not None else None,
"distance": round(res.match.distance, 2) if res.match is not None else None,
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
"accepted": res.accepted,
"color": res.color,
})
if not (res.accepted and res.color):
return _TickSyncResult(frame=frame, res=res)
is_first = first_accepted
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
return None
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
)
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot)
if tr is None:
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True)
new_color: str | None = None
if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
try:
import cv2 # type: ignore[import-untyped]
cv2.imwrite(str(sample_path), frame)
except Exception as exc:
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
new_color = res.color
if tr.trigger and not tr.locked:
fire_path: "Path | None" = None
if cfg.attach_screenshots.trigger:
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
)
notifier.send(Alert(
kind="trigger",
title=f"Semnal {tr.trigger}",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path,
direction=tr.trigger,
))
return _TickSyncResult(
frame=frame, res=res, tr=tr,
first_consumed=is_first, new_color=new_color,
)
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
"""Sync entry point — delegates to asyncio event loop."""
asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub))
async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None:
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
try:
from atm.detector import Detector
@@ -506,6 +637,8 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
from atm.notifier.discord import DiscordNotifier
from atm.notifier.telegram import TelegramNotifier
from atm.audit import AuditLog
from atm.commands import TelegramPoller, Command
from atm.scheduler import ScreenshotScheduler
except ImportError as exc:
sys.exit(f"run-loop dependencies not available: {exc}")
@@ -521,7 +654,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
]
def _on_drop(backend_name: str, dropped: Alert) -> None:
"""Audit la depășire coadă — face eșecul silențios vizibil."""
audit.log({
"ts": time.time(),
"event": "queue_overflow_drop",
@@ -532,7 +664,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
# Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea.
# Initial frame + canary check
first_frame = capture()
if first_frame is None:
print("WARN: first capture returned None — window/region missing", flush=True)
@@ -542,9 +674,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
if first_check.drifted:
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
canary.resume() # clear the auto-pause so user can Ctrl+C and fix
canary.resume()
dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
notifier.send(Alert(
kind="heartbeat",
title="ATM pornit",
@@ -556,106 +688,42 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
start = time.monotonic()
heartbeat_due = time.time() + cfg.heartbeat_min * 60
levels_extractor = None
last_saved_color: str | None = None
first_accepted = True
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
samples_dir = Path("samples")
samples_dir.mkdir(exist_ok=True)
fires_dir = Path("logs") / "fires"
fires_dir.mkdir(parents=True, exist_ok=True)
import cv2 # type: ignore[import-untyped]
try:
while duration_s is None or (time.monotonic() - start) < duration_s:
now = time.time()
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
time.sleep(cfg.loop_interval_s)
continue
# canary check
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
time.sleep(cfg.loop_interval_s)
continue
# detection
res = detector.step(now)
detection_log.log({
"ts": now,
"event": "frame",
"window_found": res.window_found,
"dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None,
"match_name": res.match.name if res.match is not None else None,
"distance": round(res.match.distance, 2) if res.match is not None else None,
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
"accepted": res.accepted,
"color": res.color,
})
if res.accepted and res.color:
is_first = first_accepted
first_accepted = False
import cv2 # noqa: F401 fail fast if cv2 is missing # type: ignore[import-untyped]
except ImportError:
pass
# Per-iteration closure — binds current frame/now, gates on config.
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
return None
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
)
tr = _handle_tick(
fsm, res.color, now, notifier, audit, is_first,
snapshot=_snapshot,
)
if tr is None:
# pornire târzie: FSM neatins, sari peste FIRE + salvare corpus
time.sleep(cfg.loop_interval_s)
continue
# corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară
if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
try:
cv2.imwrite(str(sample_path), frame)
except Exception as exc:
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
last_saved_color = res.color
# FIRE: adnotează frame-ul + salvează, atașează la alertă
if tr.trigger and not tr.locked:
fire_path: "Path | None" = None
if cfg.attach_screenshots.trigger:
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
)
notifier.send(Alert(
kind="trigger",
title=f"Semnal {tr.trigger}",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path,
direction=tr.trigger,
))
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
# phase-B levels
if levels_extractor is not None:
lr = levels_extractor.step(frame, now)
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
notifier.send(Alert(
kind="levels",
title="Niveluri",
body=(
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} "
f"TP2={lr.levels.tp2}"
),
))
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
first_accepted = True
last_saved_color: str | None = None
levels_extractor = None
# heartbeat — include statistici per-backend ca eșecurile silențioase
# să apară la fiecare 30 min fără să aștepte oprirea.
if time.time() > heartbeat_due:
fire_count = 0
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
scheduler = ScreenshotScheduler(
capture=capture,
save_fn=_bound_save,
notifier=notifier,
audit=audit,
)
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
# ------------------------------------------------------------------
# Nested async coroutines — capture nonlocal state from run_live_async
# ------------------------------------------------------------------
async def _heartbeat_loop() -> None:
nonlocal heartbeat_due
while True:
await asyncio.sleep(60)
if time.monotonic() > heartbeat_due:
try:
stats = notifier.stats()
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
@@ -668,9 +736,145 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
except Exception:
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
heartbeat_due = time.time() + cfg.heartbeat_min * 60
time.sleep(cfg.loop_interval_s)
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
async def _dispatch_command(cmd: Command) -> None:
nonlocal fire_count
if cmd.action == "set_interval":
secs = cmd.value or cfg.telegram.auto_poll_interval_s
scheduler.start(secs)
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
elif cmd.action == "stop":
if scheduler.is_running:
scheduler.stop()
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
notifier.send(Alert(kind="status", title="Polling oprit", body=""))
else:
notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
elif cmd.action == "status":
uptime_s = time.monotonic() - start
last_roll = detector.rolling[-1] if detector.rolling else None
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else ""
last_color = (
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else ""
) if last_roll else ""
sched_info = (
f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ"
) if scheduler.is_running else "oprit"
canary_info = "drift (pauze)" if canary.is_paused else "ok"
body = (
f"Stare: {fsm.state.value}\n"
f"Ultima detecție: {last_color} (conf {last_conf})\n"
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n"
f"Poller: {sched_info} | Canary: {canary_info}"
)
notifier.send(Alert(kind="status", title="ATM Status", body=body))
elif cmd.action == "ss":
now_ss = time.time()
frame_ss = await asyncio.to_thread(capture)
if frame_ss is None:
notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
))
return
path_ss = await asyncio.to_thread(
_save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit,
)
audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
async def _detection_loop() -> None:
nonlocal first_accepted, last_saved_color, levels_extractor, fire_count
while True:
if duration_s is not None and (time.monotonic() - start) >= duration_s:
break
now = time.time()
result: _TickSyncResult = await asyncio.to_thread(
_sync_detection_tick,
capture, canary, cfg, detector, fsm, notifier, audit, detection_log,
fires_dir, first_accepted, last_saved_color, now, samples_dir,
)
if result.first_consumed:
first_accepted = False
if result.new_color is not None:
last_saved_color = result.new_color
tr = result.tr
res = result.res
if result.late_start or res is None:
await asyncio.sleep(cfg.loop_interval_s)
continue
if tr is not None and res.accepted and res.color:
if tr.reason == "prime" and not scheduler.is_running:
scheduler.start(cfg.telegram.auto_poll_interval_s)
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running:
scheduler.stop()
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr is not None and tr.trigger and not tr.locked:
fire_count += 1
if scheduler.is_running:
scheduler.stop()
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
if levels_extractor is not None and result.frame is not None:
lr = levels_extractor.step(result.frame, now)
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
notifier.send(Alert(
kind="levels",
title="Niveluri",
body=(
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} "
f"TP2={lr.levels.tp2}"
),
))
levels_extractor = None
while True:
try:
cmd = cmd_queue.get_nowait()
await _dispatch_command(cmd)
except asyncio.QueueEmpty:
break
await asyncio.sleep(cfg.loop_interval_s)
# Launch background tasks
t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler")
t_poller = asyncio.create_task(poller.run(), name="poller")
t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat")
try:
await _detection_loop()
finally:
# 7-step graceful shutdown
# 1. cancel scheduler
t_scheduler.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await t_scheduler
# 2. cancel poller
t_poller.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await t_poller
# 3. cancel heartbeat
t_heartbeat.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await t_heartbeat
# 4. drain detection — complete (we awaited _detection_loop directly)
# 5. send shutdown alert
try:
stats = notifier.stats()
lines = [f"după {time.monotonic() - start:.0f}s"]
@@ -679,13 +883,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
f"{name}: sent={s['sent']} failed={s['failed']} "
f"dropped={s['dropped']} retries={s['retries']}"
)
notifier.send(Alert(
kind="heartbeat", title="ATM oprit",
body="\n".join(lines),
))
notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
except Exception:
pass
# 6. notifier.stop() — flush + join FanoutNotifier threads
notifier.stop()
# 7. audit.close()
audit.close()
detection_log.close()

View File

@@ -186,7 +186,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
]
def __init__(self, *a, **kw):
self._i = 0
def step(self, ts):
def step(self, ts, frame=None):
if self._i >= len(self._script):
raise _StopLoop
color, accepted = self._script[self._i]