Compare commits
9 Commits
8ff31ed241
...
0f7dd5dc84
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f7dd5dc84 | ||
|
|
63642e71dd | ||
|
|
424437ceaf | ||
|
|
ca6e578175 | ||
|
|
4123b31a22 | ||
|
|
c1b89ad6a9 | ||
|
|
fd04fcd5e6 | ||
|
|
c6714e8d5e | ||
|
|
238243b1ce |
35
CLAUDE.md
Normal file
35
CLAUDE.md
Normal file
@@ -0,0 +1,35 @@
|
||||
# ATM — Automated Trading Monitor
|
||||
|
||||
Personal Faza-1 tool for the M2D strategy. Python 3.11+.
|
||||
|
||||
## Quick Reference
|
||||
|
||||
```bash
|
||||
pip install -e ".[windows]" # Windows: live capture
|
||||
pip install -e . # Linux/macOS: dev/dryrun only
|
||||
atm calibrate # Tk wizard
|
||||
atm debug --delay 5 # one-shot capture + detect
|
||||
atm run --start-at 16:30 --stop-at 23:00 # live session
|
||||
atm dryrun samples # corpus gate
|
||||
pytest # run tests
|
||||
```
|
||||
|
||||
## Skill routing
|
||||
|
||||
When the user's request matches an available skill, ALWAYS invoke it using the Skill
|
||||
tool as your FIRST action. Do NOT answer directly, do NOT use other tools first.
|
||||
The skill has specialized workflows that produce better results than ad-hoc answers.
|
||||
|
||||
Key routing rules:
|
||||
- Product ideas, "is this worth building", brainstorming → invoke office-hours
|
||||
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
|
||||
- Ship, deploy, push, create PR → invoke ship
|
||||
- QA, test the site, find bugs → invoke qa
|
||||
- Code review, check my diff → invoke review
|
||||
- Update docs after shipping → invoke document-release
|
||||
- Weekly retro → invoke retro
|
||||
- Design system, brand → invoke design-consultation
|
||||
- Visual audit, design polish → invoke design-review
|
||||
- Architecture review → invoke plan-eng-review
|
||||
- Save progress, checkpoint, resume → invoke checkpoint
|
||||
- Code quality, health check → invoke health
|
||||
10
TODOS.md
10
TODOS.md
@@ -49,9 +49,17 @@ Read-only web view of today's audit JSONL + recent triggers. Useful for review a
|
||||
|
||||
---
|
||||
|
||||
## P2-yaxis-recalib-detect — Y-axis recalibration detection
|
||||
|
||||
Price overlay (from Telegram commands feature) uses `y_axis` linear interpolation to show current price on screenshots. When the user rescales the chart y-axis (common after overnight price gaps), the calibration becomes stale and prices shown are incorrect. Canary check detects layout drift but NOT y-axis range changes.
|
||||
|
||||
- Possible approaches: OCR on y-axis labels (fragile), track price range consistency across sessions, or simple "calibration age" warning after N hours.
|
||||
- Start after price overlay is live and the false-price frequency is known.
|
||||
- Depends on: Telegram commands + price overlay feature being shipped.
|
||||
|
||||
## Quality debt
|
||||
|
||||
- [ ] **Integration test for run_live loop**: currently mocked at module level. Add a short-duration in-memory loop test that threads real detector/state_machine/audit together (no network).
|
||||
- [x] **Integration test for run_live loop**: lifecycle async test added in `tests/test_main.py` (IDLE→ARMED→PRIMED auto-poll→FIRE auto-stop).
|
||||
- [ ] **Coverage report**: run `pytest --cov=atm --cov-report=term-missing`, aim for ≥ 85% per module.
|
||||
- [ ] **Typing strictness**: run `pyright src/` with strict mode, fix reported issues.
|
||||
- [ ] **Perf baseline**: profile one detection cycle on a representative frame; ensure < 100ms so 5s loop has ample headroom.
|
||||
|
||||
@@ -13,6 +13,7 @@ dependencies = [
|
||||
"pillow>=10.0",
|
||||
"requests>=2.31",
|
||||
"rich>=13.0",
|
||||
"httpx>=0.27",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@@ -24,6 +25,7 @@ windows = [
|
||||
dev = [
|
||||
"pytest>=8.0",
|
||||
"pytest-cov>=5.0",
|
||||
"pytest-asyncio>=0.23",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import threading
|
||||
from datetime import datetime, date
|
||||
from pathlib import Path
|
||||
from typing import Callable, IO
|
||||
@@ -16,21 +17,25 @@ class AuditLog:
|
||||
self._clock: Callable[[], datetime] = clock or datetime.now
|
||||
self._current_date: date | None = None
|
||||
self._fh: IO[str] | None = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def log(self, event: dict) -> None:
|
||||
now = self._clock()
|
||||
today = now.date()
|
||||
|
||||
if today != self._current_date:
|
||||
self._open(today)
|
||||
|
||||
if "ts" not in event:
|
||||
event = {**event, "ts": now.isoformat()}
|
||||
|
||||
assert self._fh is not None
|
||||
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
|
||||
with self._lock:
|
||||
if today != self._current_date:
|
||||
self._open(today)
|
||||
assert self._fh is not None
|
||||
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
|
||||
|
||||
def close(self) -> None:
|
||||
with self._lock:
|
||||
self._close_locked()
|
||||
|
||||
def _close_locked(self) -> None:
|
||||
"""Close file handle; must be called while holding self._lock."""
|
||||
if self._fh is not None:
|
||||
try:
|
||||
self._fh.close()
|
||||
@@ -47,7 +52,7 @@ class AuditLog:
|
||||
return self._base_dir / f"{self._current_date}.jsonl"
|
||||
|
||||
def _open(self, today: date) -> None:
|
||||
self.close()
|
||||
self._close_locked() # already holding self._lock
|
||||
self._base_dir.mkdir(parents=True, exist_ok=True)
|
||||
path = self._base_dir / f"{today}.jsonl"
|
||||
self._fh = open(path, "a", buffering=1, encoding="utf-8")
|
||||
|
||||
163
src/atm/commands.py
Normal file
163
src/atm/commands.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""Telegram command poller + Command dataclass.
|
||||
|
||||
Uses httpx (async) for long-polling getUpdates. The sync TelegramNotifier
|
||||
continues to use requests — this module is the only httpx consumer.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
import httpx
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .config import TelegramCfg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CommandAction = Literal["set_interval", "stop", "status", "ss"]
|
||||
|
||||
_BASE = "https://api.telegram.org/bot{token}/{method}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Command:
|
||||
action: CommandAction
|
||||
value: int | None = None # seconds; only for set_interval
|
||||
|
||||
|
||||
class TelegramPoller:
|
||||
"""Long-poll Telegram getUpdates, emit Commands into asyncio.Queue.
|
||||
|
||||
Security: rejects messages from chat_ids not in cfg.allowed_chat_ids.
|
||||
Degrades (stops polling) after 3 consecutive 401 responses and warns
|
||||
via Discord (caller responsibility — poller only logs + sets degraded flag).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cfg: TelegramCfg,
|
||||
cmd_queue: asyncio.Queue[Command],
|
||||
audit, # _AuditLike
|
||||
) -> None:
|
||||
self._cfg = cfg
|
||||
self._cmd_queue = cmd_queue
|
||||
self._audit = audit
|
||||
self._offset = 0
|
||||
self._consecutive_401 = 0
|
||||
self._degraded = False
|
||||
# fallback: if allowed_chat_ids is empty, accept only the primary chat
|
||||
self._allowed = set(cfg.allowed_chat_ids) or {cfg.chat_id}
|
||||
|
||||
@property
|
||||
def degraded(self) -> bool:
|
||||
return self._degraded
|
||||
|
||||
async def run(self) -> None:
|
||||
async with httpx.AsyncClient() as client:
|
||||
await self._drain(client)
|
||||
while True:
|
||||
if self._degraded:
|
||||
await asyncio.sleep(5)
|
||||
continue
|
||||
try:
|
||||
await self._poll_once(client)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except (httpx.HTTPError, httpx.TimeoutException) as exc:
|
||||
self._audit.log({"event": "poller_error", "error": str(exc)})
|
||||
await asyncio.sleep(5)
|
||||
except Exception as exc: # json, unexpected
|
||||
self._audit.log({"event": "poller_error", "error": str(exc)})
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def _drain(self, client: httpx.AsyncClient) -> None:
|
||||
"""Discard all pending updates at startup so stale commands don't replay."""
|
||||
try:
|
||||
resp = await client.get(
|
||||
_BASE.format(token=self._cfg.bot_token, method="getUpdates"),
|
||||
params={"timeout": 0, "offset": self._offset},
|
||||
timeout=10,
|
||||
)
|
||||
body = resp.json()
|
||||
if body.get("ok") and body.get("result"):
|
||||
self._offset = body["result"][-1]["update_id"] + 1
|
||||
except Exception as exc:
|
||||
logger.warning("TelegramPoller startup drain failed: %s", exc)
|
||||
|
||||
async def _poll_once(self, client: httpx.AsyncClient) -> None:
|
||||
resp = await client.get(
|
||||
_BASE.format(token=self._cfg.bot_token, method="getUpdates"),
|
||||
params={"timeout": self._cfg.poll_timeout_s, "offset": self._offset},
|
||||
timeout=self._cfg.poll_timeout_s + 5,
|
||||
)
|
||||
|
||||
if resp.status_code == 401:
|
||||
self._consecutive_401 += 1
|
||||
if self._consecutive_401 >= 3:
|
||||
self._degraded = True
|
||||
self._audit.log({"event": "poller_degraded", "reason": "3_consecutive_401"})
|
||||
return
|
||||
self._consecutive_401 = 0
|
||||
|
||||
body = resp.json()
|
||||
if not body.get("ok"):
|
||||
return
|
||||
|
||||
for update in body.get("result", []):
|
||||
self._offset = update["update_id"] + 1
|
||||
await self._process_update(update)
|
||||
|
||||
async def _process_update(self, update: dict) -> None:
|
||||
if "callback_query" in update:
|
||||
# Inline button pressed — may be expired; reply with fallback
|
||||
cbq = update["callback_query"]
|
||||
chat_id = str(cbq.get("from", {}).get("id", ""))
|
||||
if chat_id not in self._allowed:
|
||||
logger.info("Rejected callback_query from chat_id=%s", chat_id)
|
||||
return
|
||||
# Caller handles answerCallbackQuery; just note in audit
|
||||
self._audit.log({"event": "command_received", "action": "callback_query", "chat_id": chat_id})
|
||||
return
|
||||
|
||||
msg = update.get("message") or update.get("edited_message")
|
||||
if not msg:
|
||||
return
|
||||
|
||||
chat_id = str(msg.get("chat", {}).get("id", ""))
|
||||
if chat_id not in self._allowed:
|
||||
logger.info("Rejected message from chat_id=%s", chat_id)
|
||||
return
|
||||
|
||||
text = (msg.get("text") or "").strip().lower()
|
||||
cmd = self._parse_command(text)
|
||||
if cmd is None:
|
||||
return
|
||||
|
||||
self._audit.log({
|
||||
"event": "command_received",
|
||||
"action": cmd.action,
|
||||
"value": cmd.value,
|
||||
"chat_id": chat_id,
|
||||
})
|
||||
await self._cmd_queue.put(cmd)
|
||||
|
||||
def _parse_command(self, text: str) -> Command | None:
|
||||
t = text.lstrip("/").strip()
|
||||
if not t:
|
||||
return None
|
||||
if t == "stop":
|
||||
return Command(action="stop")
|
||||
if t == "status":
|
||||
return Command(action="status")
|
||||
if t in ("ss", "screenshot"):
|
||||
return Command(action="ss")
|
||||
# "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
|
||||
parts = t.split()
|
||||
if len(parts) == 1 and parts[0].isdigit():
|
||||
return Command(action="set_interval", value=int(parts[0]) * 60)
|
||||
if len(parts) == 2 and parts[0] in ("interval", "set_interval") and parts[1].isdigit():
|
||||
return Command(action="set_interval", value=int(parts[1]) * 60)
|
||||
return None
|
||||
@@ -78,6 +78,9 @@ class DiscordCfg:
|
||||
class TelegramCfg:
|
||||
bot_token: str
|
||||
chat_id: str
|
||||
allowed_chat_ids: tuple[str, ...] = ()
|
||||
poll_timeout_s: int = 30
|
||||
auto_poll_interval_s: int = 180
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.bot_token or not self.chat_id:
|
||||
@@ -156,9 +159,14 @@ class Config:
|
||||
drift_threshold=int(data["canary"].get("drift_threshold", 8)),
|
||||
)
|
||||
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
|
||||
tg = data["telegram"]
|
||||
_allowed = [str(c) for c in tg.get("allowed_chat_ids", [])] or [str(tg["chat_id"])]
|
||||
telegram = TelegramCfg(
|
||||
bot_token=data["telegram"]["bot_token"],
|
||||
chat_id=str(data["telegram"]["chat_id"]),
|
||||
bot_token=tg["bot_token"],
|
||||
chat_id=str(tg["chat_id"]),
|
||||
allowed_chat_ids=tuple(_allowed),
|
||||
poll_timeout_s=int(tg.get("poll_timeout_s", 30)),
|
||||
auto_poll_interval_s=int(tg.get("auto_poll_interval_s", 180)),
|
||||
)
|
||||
opts = data.get("options", {})
|
||||
region = None
|
||||
|
||||
@@ -28,6 +28,7 @@ class DetectionResult:
|
||||
match: ColorMatch | None # None if no dot
|
||||
accepted: bool # post-debounce; True only when match repeats debounce_depth times
|
||||
color: str | None # accepted color name (UNKNOWN excluded)
|
||||
dot_pos_abs: tuple[int, int] | None = None # absolute (x, y) in frame; set when dot_found
|
||||
|
||||
|
||||
class Detector:
|
||||
@@ -60,8 +61,14 @@ class Detector:
|
||||
self._debounce: deque[str | None] = deque(maxlen=cfg.debounce_depth)
|
||||
self._rolling: deque[DetectionResult] = deque(maxlen=20)
|
||||
|
||||
def step(self, ts: float) -> DetectionResult:
|
||||
frame = self._capture()
|
||||
def step(self, ts: float, frame=None) -> DetectionResult:
|
||||
"""Run one detection tick.
|
||||
|
||||
frame: pre-captured BGR ndarray (from asyncio.to_thread capture). When
|
||||
None (default), calls self._capture() — preserving the sync-loop behaviour.
|
||||
"""
|
||||
if frame is None:
|
||||
frame = self._capture()
|
||||
|
||||
if frame is None:
|
||||
self._debounce.append(None)
|
||||
@@ -117,6 +124,7 @@ class Detector:
|
||||
match=match,
|
||||
accepted=accepted,
|
||||
color=color,
|
||||
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y),
|
||||
)
|
||||
self._rolling.append(r)
|
||||
return r
|
||||
|
||||
411
src/atm/main.py
411
src/atm/main.py
@@ -2,12 +2,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import contextlib
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Callable, Protocol, cast
|
||||
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
|
||||
|
||||
from atm.config import Config # stdlib-only (tomllib); safe at module level
|
||||
from atm.notifier import Alert
|
||||
@@ -348,6 +351,8 @@ def _save_annotated_frame(
|
||||
label: str,
|
||||
now: float,
|
||||
audit: _AuditLike | None = None,
|
||||
dot_pos_abs: "tuple[int, int] | None" = None,
|
||||
canary_ok: bool = True,
|
||||
) -> "Path | None":
|
||||
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
|
||||
|
||||
@@ -355,6 +360,10 @@ def _save_annotated_frame(
|
||||
audit (when provided) so disk-full / permission issues don't become silent
|
||||
regressions. Never raises — snapshot is a best-effort enhancement, the
|
||||
text alert must still go out.
|
||||
|
||||
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
|
||||
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
|
||||
since calibration may be stale.
|
||||
"""
|
||||
try:
|
||||
import cv2 # type: ignore[import-untyped]
|
||||
@@ -371,6 +380,22 @@ def _save_annotated_frame(
|
||||
annotated = frame.copy()
|
||||
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
|
||||
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
||||
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
|
||||
try:
|
||||
_, dot_y = dot_pos_abs
|
||||
ya = cfg.y_axis
|
||||
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
|
||||
price = ya.p1_price + (dot_y - ya.p1_y) * slope
|
||||
w_frame = annotated.shape[1]
|
||||
text = f"${price:.2f}"
|
||||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||||
scale, thickness = 1.2, 3
|
||||
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
|
||||
tx, ty = w_frame - tw - 10, th + 10
|
||||
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
|
||||
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
|
||||
except Exception:
|
||||
pass # price overlay is best-effort; never break the screenshot
|
||||
cv2.imwrite(str(fpath), annotated)
|
||||
return fpath
|
||||
except Exception as exc:
|
||||
@@ -496,7 +521,113 @@ def _handle_tick(
|
||||
return tr
|
||||
|
||||
|
||||
@dataclass
|
||||
class _TickSyncResult:
|
||||
frame: Any = None
|
||||
res: Any = None # DetectionResult | None
|
||||
tr: Any = None # Transition | None
|
||||
first_consumed: bool = False
|
||||
late_start: bool = False
|
||||
new_color: str | None = None # corpus sample color when changed
|
||||
|
||||
|
||||
def _sync_detection_tick(
|
||||
capture: Callable,
|
||||
canary: Any,
|
||||
cfg: Any,
|
||||
detector: Any,
|
||||
fsm: Any,
|
||||
notifier: _NotifierLike,
|
||||
audit: _AuditLike,
|
||||
detection_log: _AuditLike,
|
||||
fires_dir: Path,
|
||||
first_accepted: bool,
|
||||
last_saved_color: "str | None",
|
||||
now: float,
|
||||
samples_dir: Path,
|
||||
) -> _TickSyncResult:
|
||||
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread."""
|
||||
frame = capture()
|
||||
if frame is None:
|
||||
audit.log({"ts": now, "event": "window_lost"})
|
||||
return _TickSyncResult()
|
||||
|
||||
cr = canary.check(frame)
|
||||
if canary.is_paused:
|
||||
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
|
||||
return _TickSyncResult(frame=frame)
|
||||
|
||||
res = detector.step(now, frame)
|
||||
detection_log.log({
|
||||
"ts": now, "event": "frame",
|
||||
"window_found": res.window_found,
|
||||
"dot_found": res.dot_found,
|
||||
"rgb": list(res.rgb) if res.rgb is not None else None,
|
||||
"match_name": res.match.name if res.match is not None else None,
|
||||
"distance": round(res.match.distance, 2) if res.match is not None else None,
|
||||
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
|
||||
"accepted": res.accepted,
|
||||
"color": res.color,
|
||||
})
|
||||
|
||||
if not (res.accepted and res.color):
|
||||
return _TickSyncResult(frame=frame, res=res)
|
||||
|
||||
is_first = first_accepted
|
||||
|
||||
def _snapshot(kind: str, label: str) -> "Path | None":
|
||||
if not getattr(cfg.attach_screenshots, kind, True):
|
||||
return None
|
||||
return _save_annotated_frame(
|
||||
frame, cfg, fires_dir, label, now, audit=audit,
|
||||
dot_pos_abs=getattr(res, "dot_pos_abs", None),
|
||||
canary_ok=True,
|
||||
)
|
||||
|
||||
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot)
|
||||
|
||||
if tr is None:
|
||||
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True)
|
||||
|
||||
new_color: str | None = None
|
||||
if res.color != last_saved_color:
|
||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
||||
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
|
||||
try:
|
||||
import cv2 # type: ignore[import-untyped]
|
||||
cv2.imwrite(str(sample_path), frame)
|
||||
except Exception as exc:
|
||||
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
|
||||
new_color = res.color
|
||||
|
||||
if tr.trigger and not tr.locked:
|
||||
fire_path: "Path | None" = None
|
||||
if cfg.attach_screenshots.trigger:
|
||||
fire_path = _save_annotated_frame(
|
||||
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
|
||||
dot_pos_abs=getattr(res, "dot_pos_abs", None),
|
||||
canary_ok=True,
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="trigger",
|
||||
title=f"Semnal {tr.trigger}",
|
||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=fire_path,
|
||||
direction=tr.trigger,
|
||||
))
|
||||
|
||||
return _TickSyncResult(
|
||||
frame=frame, res=res, tr=tr,
|
||||
first_consumed=is_first, new_color=new_color,
|
||||
)
|
||||
|
||||
|
||||
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
"""Sync entry point — delegates to asyncio event loop."""
|
||||
asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub))
|
||||
|
||||
|
||||
async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
|
||||
try:
|
||||
from atm.detector import Detector
|
||||
@@ -506,6 +637,8 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
from atm.notifier.discord import DiscordNotifier
|
||||
from atm.notifier.telegram import TelegramNotifier
|
||||
from atm.audit import AuditLog
|
||||
from atm.commands import TelegramPoller, Command
|
||||
from atm.scheduler import ScreenshotScheduler
|
||||
except ImportError as exc:
|
||||
sys.exit(f"run-loop dependencies not available: {exc}")
|
||||
|
||||
@@ -521,7 +654,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
]
|
||||
|
||||
def _on_drop(backend_name: str, dropped: Alert) -> None:
|
||||
"""Audit la depășire coadă — face eșecul silențios vizibil."""
|
||||
audit.log({
|
||||
"ts": time.time(),
|
||||
"event": "queue_overflow_drop",
|
||||
@@ -532,7 +664,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
|
||||
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
|
||||
|
||||
# Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea.
|
||||
# Initial frame + canary check
|
||||
first_frame = capture()
|
||||
if first_frame is None:
|
||||
print("WARN: first capture returned None — window/region missing", flush=True)
|
||||
@@ -542,9 +674,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
|
||||
if first_check.drifted:
|
||||
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
|
||||
canary.resume() # clear the auto-pause so user can Ctrl+C and fix
|
||||
canary.resume()
|
||||
|
||||
dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
||||
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
||||
notifier.send(Alert(
|
||||
kind="heartbeat",
|
||||
title="ATM pornit",
|
||||
@@ -556,106 +688,42 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
|
||||
|
||||
start = time.monotonic()
|
||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
||||
levels_extractor = None
|
||||
last_saved_color: str | None = None
|
||||
first_accepted = True
|
||||
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||
samples_dir = Path("samples")
|
||||
samples_dir.mkdir(exist_ok=True)
|
||||
fires_dir = Path("logs") / "fires"
|
||||
fires_dir.mkdir(parents=True, exist_ok=True)
|
||||
import cv2 # type: ignore[import-untyped]
|
||||
|
||||
try:
|
||||
while duration_s is None or (time.monotonic() - start) < duration_s:
|
||||
now = time.time()
|
||||
frame = capture()
|
||||
if frame is None:
|
||||
audit.log({"ts": now, "event": "window_lost"})
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# canary check
|
||||
cr = canary.check(frame)
|
||||
if canary.is_paused:
|
||||
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# detection
|
||||
res = detector.step(now)
|
||||
detection_log.log({
|
||||
"ts": now,
|
||||
"event": "frame",
|
||||
"window_found": res.window_found,
|
||||
"dot_found": res.dot_found,
|
||||
"rgb": list(res.rgb) if res.rgb is not None else None,
|
||||
"match_name": res.match.name if res.match is not None else None,
|
||||
"distance": round(res.match.distance, 2) if res.match is not None else None,
|
||||
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
|
||||
"accepted": res.accepted,
|
||||
"color": res.color,
|
||||
})
|
||||
if res.accepted and res.color:
|
||||
is_first = first_accepted
|
||||
first_accepted = False
|
||||
import cv2 # noqa: F401 fail fast if cv2 is missing # type: ignore[import-untyped]
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Per-iteration closure — binds current frame/now, gates on config.
|
||||
def _snapshot(kind: str, label: str) -> "Path | None":
|
||||
if not getattr(cfg.attach_screenshots, kind, True):
|
||||
return None
|
||||
return _save_annotated_frame(
|
||||
frame, cfg, fires_dir, label, now, audit=audit,
|
||||
)
|
||||
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
|
||||
first_accepted = True
|
||||
last_saved_color: str | None = None
|
||||
levels_extractor = None
|
||||
fire_count = 0
|
||||
|
||||
tr = _handle_tick(
|
||||
fsm, res.color, now, notifier, audit, is_first,
|
||||
snapshot=_snapshot,
|
||||
)
|
||||
if tr is None:
|
||||
# pornire târzie: FSM neatins, sari peste FIRE + salvare corpus
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară
|
||||
if res.color != last_saved_color:
|
||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
||||
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
|
||||
try:
|
||||
cv2.imwrite(str(sample_path), frame)
|
||||
except Exception as exc:
|
||||
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
|
||||
last_saved_color = res.color
|
||||
# FIRE: adnotează frame-ul + salvează, atașează la alertă
|
||||
if tr.trigger and not tr.locked:
|
||||
fire_path: "Path | None" = None
|
||||
if cfg.attach_screenshots.trigger:
|
||||
fire_path = _save_annotated_frame(
|
||||
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="trigger",
|
||||
title=f"Semnal {tr.trigger}",
|
||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=fire_path,
|
||||
direction=tr.trigger,
|
||||
))
|
||||
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
|
||||
# phase-B levels
|
||||
if levels_extractor is not None:
|
||||
lr = levels_extractor.step(frame, now)
|
||||
if lr.status in ("complete", "timeout"):
|
||||
if lr.status == "complete" and lr.levels:
|
||||
notifier.send(Alert(
|
||||
kind="levels",
|
||||
title="Niveluri",
|
||||
body=(
|
||||
f"SL={lr.levels.sl} "
|
||||
f"TP1={lr.levels.tp1} "
|
||||
f"TP2={lr.levels.tp2}"
|
||||
),
|
||||
))
|
||||
levels_extractor = None
|
||||
# heartbeat — include statistici per-backend ca eșecurile silențioase
|
||||
# să apară la fiecare 30 min fără să aștepte oprirea.
|
||||
if time.time() > heartbeat_due:
|
||||
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
|
||||
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
|
||||
|
||||
scheduler = ScreenshotScheduler(
|
||||
capture=capture,
|
||||
save_fn=_bound_save,
|
||||
notifier=notifier,
|
||||
audit=audit,
|
||||
)
|
||||
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Nested async coroutines — capture nonlocal state from run_live_async
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _heartbeat_loop() -> None:
|
||||
nonlocal heartbeat_due
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
if time.monotonic() > heartbeat_due:
|
||||
try:
|
||||
stats = notifier.stats()
|
||||
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
|
||||
@@ -668,9 +736,145 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
|
||||
except Exception:
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
|
||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||
|
||||
async def _dispatch_command(cmd: Command) -> None:
|
||||
nonlocal fire_count
|
||||
if cmd.action == "set_interval":
|
||||
secs = cmd.value or cfg.telegram.auto_poll_interval_s
|
||||
scheduler.start(secs)
|
||||
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
|
||||
notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
|
||||
elif cmd.action == "stop":
|
||||
if scheduler.is_running:
|
||||
scheduler.stop()
|
||||
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
|
||||
notifier.send(Alert(kind="status", title="Polling oprit", body=""))
|
||||
else:
|
||||
notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
|
||||
elif cmd.action == "status":
|
||||
uptime_s = time.monotonic() - start
|
||||
last_roll = detector.rolling[-1] if detector.rolling else None
|
||||
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—"
|
||||
last_color = (
|
||||
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—"
|
||||
) if last_roll else "—"
|
||||
sched_info = (
|
||||
f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ"
|
||||
) if scheduler.is_running else "oprit"
|
||||
canary_info = "drift (pauze)" if canary.is_paused else "ok"
|
||||
body = (
|
||||
f"Stare: {fsm.state.value}\n"
|
||||
f"Ultima detecție: {last_color} (conf {last_conf})\n"
|
||||
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n"
|
||||
f"Poller: {sched_info} | Canary: {canary_info}"
|
||||
)
|
||||
notifier.send(Alert(kind="status", title="ATM Status", body=body))
|
||||
elif cmd.action == "ss":
|
||||
now_ss = time.time()
|
||||
frame_ss = await asyncio.to_thread(capture)
|
||||
if frame_ss is None:
|
||||
notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
))
|
||||
return
|
||||
path_ss = await asyncio.to_thread(
|
||||
_save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit,
|
||||
)
|
||||
audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
|
||||
notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
|
||||
|
||||
async def _detection_loop() -> None:
|
||||
nonlocal first_accepted, last_saved_color, levels_extractor, fire_count
|
||||
|
||||
while True:
|
||||
if duration_s is not None and (time.monotonic() - start) >= duration_s:
|
||||
break
|
||||
|
||||
now = time.time()
|
||||
|
||||
result: _TickSyncResult = await asyncio.to_thread(
|
||||
_sync_detection_tick,
|
||||
capture, canary, cfg, detector, fsm, notifier, audit, detection_log,
|
||||
fires_dir, first_accepted, last_saved_color, now, samples_dir,
|
||||
)
|
||||
|
||||
if result.first_consumed:
|
||||
first_accepted = False
|
||||
if result.new_color is not None:
|
||||
last_saved_color = result.new_color
|
||||
|
||||
tr = result.tr
|
||||
res = result.res
|
||||
|
||||
if result.late_start or res is None:
|
||||
await asyncio.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
|
||||
if tr is not None and res.accepted and res.color:
|
||||
if tr.reason == "prime" and not scheduler.is_running:
|
||||
scheduler.start(cfg.telegram.auto_poll_interval_s)
|
||||
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
|
||||
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running:
|
||||
scheduler.stop()
|
||||
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
|
||||
|
||||
if tr is not None and tr.trigger and not tr.locked:
|
||||
fire_count += 1
|
||||
if scheduler.is_running:
|
||||
scheduler.stop()
|
||||
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
|
||||
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
|
||||
|
||||
if levels_extractor is not None and result.frame is not None:
|
||||
lr = levels_extractor.step(result.frame, now)
|
||||
if lr.status in ("complete", "timeout"):
|
||||
if lr.status == "complete" and lr.levels:
|
||||
notifier.send(Alert(
|
||||
kind="levels",
|
||||
title="Niveluri",
|
||||
body=(
|
||||
f"SL={lr.levels.sl} "
|
||||
f"TP1={lr.levels.tp1} "
|
||||
f"TP2={lr.levels.tp2}"
|
||||
),
|
||||
))
|
||||
levels_extractor = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
cmd = cmd_queue.get_nowait()
|
||||
await _dispatch_command(cmd)
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
|
||||
await asyncio.sleep(cfg.loop_interval_s)
|
||||
|
||||
# Launch background tasks
|
||||
t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler")
|
||||
t_poller = asyncio.create_task(poller.run(), name="poller")
|
||||
t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat")
|
||||
|
||||
try:
|
||||
await _detection_loop()
|
||||
finally:
|
||||
# 7-step graceful shutdown
|
||||
# 1. cancel scheduler
|
||||
t_scheduler.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
await t_scheduler
|
||||
# 2. cancel poller
|
||||
t_poller.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
await t_poller
|
||||
# 3. cancel heartbeat
|
||||
t_heartbeat.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
await t_heartbeat
|
||||
# 4. drain detection — complete (we awaited _detection_loop directly)
|
||||
# 5. send shutdown alert
|
||||
try:
|
||||
stats = notifier.stats()
|
||||
lines = [f"după {time.monotonic() - start:.0f}s"]
|
||||
@@ -679,13 +883,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
f"{name}: sent={s['sent']} failed={s['failed']} "
|
||||
f"dropped={s['dropped']} retries={s['retries']}"
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="heartbeat", title="ATM oprit",
|
||||
body="\n".join(lines),
|
||||
))
|
||||
notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
|
||||
except Exception:
|
||||
pass
|
||||
# 6. notifier.stop() — flush + join FanoutNotifier threads
|
||||
notifier.stop()
|
||||
# 7. audit.close()
|
||||
audit.close()
|
||||
detection_log.close()
|
||||
|
||||
|
||||
@@ -5,11 +5,13 @@ from typing import Protocol
|
||||
|
||||
@dataclass
|
||||
class Alert:
|
||||
kind: str # "trigger" | "heartbeat" | "levels" | "warn" | "arm" | "prime" | "late_start"
|
||||
# flat union: "trigger"|"heartbeat"|"levels"|"warn"|"arm"|"prime"|"late_start"|"screenshot"|"status"
|
||||
kind: str
|
||||
title: str
|
||||
body: str
|
||||
image_path: Path | None = None # annotated screenshot
|
||||
direction: str | None = None # "BUY"/"SELL" when kind=trigger
|
||||
silent: bool = False # disable_notification for Telegram; ignored by Discord
|
||||
|
||||
|
||||
class Notifier(Protocol):
|
||||
|
||||
@@ -33,6 +33,7 @@ class TelegramNotifier:
|
||||
"chat_id": self._chat_id,
|
||||
"caption": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": str(alert.silent).lower(),
|
||||
},
|
||||
files={"photo": fh},
|
||||
timeout=10,
|
||||
@@ -44,6 +45,7 @@ class TelegramNotifier:
|
||||
"chat_id": self._chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": alert.silent,
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
118
src/atm/scheduler.py
Normal file
118
src/atm/scheduler.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""ScreenshotScheduler — periodic capture + annotate + send.
|
||||
|
||||
Runs as an asyncio task. capture() and cv2 work execute in asyncio.to_thread
|
||||
to avoid blocking the event loop. Decision 13: scheduler calls capture()
|
||||
directly, NOT via Detector.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from .notifier import Alert
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ScreenshotScheduler:
|
||||
"""Periodic screenshot sender.
|
||||
|
||||
Constructor params are explicit (decision 11 outside-voice finding).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
capture: Callable, # () -> ndarray | None
|
||||
save_fn: Callable, # (frame, label, now) -> Path | None
|
||||
notifier, # _NotifierLike
|
||||
audit, # _AuditLike
|
||||
interval_s: int | None = None,
|
||||
) -> None:
|
||||
self._capture = capture
|
||||
self._save_fn = save_fn
|
||||
self._notifier = notifier
|
||||
self._audit = audit
|
||||
self._interval_s = interval_s
|
||||
self._is_running = False
|
||||
self._next_due: float | None = None # monotonic
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public state
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._is_running
|
||||
|
||||
@property
|
||||
def interval_s(self) -> int | None:
|
||||
return self._interval_s
|
||||
|
||||
@property
|
||||
def next_due(self) -> float | None:
|
||||
return self._next_due
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Control (called from async event loop)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def start(self, interval_s: int) -> None:
|
||||
self._interval_s = interval_s
|
||||
self._is_running = True
|
||||
self._next_due = time.monotonic() + interval_s
|
||||
|
||||
def stop(self) -> None:
|
||||
self._is_running = False
|
||||
self._next_due = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Task body
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def run(self) -> None:
|
||||
"""Runs until cancelled."""
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
if not self._is_running or self._next_due is None:
|
||||
continue
|
||||
if time.monotonic() >= self._next_due:
|
||||
await self._take_screenshot()
|
||||
if self._is_running and self._interval_s is not None:
|
||||
self._next_due = time.monotonic() + self._interval_s
|
||||
|
||||
async def _take_screenshot(self) -> None:
|
||||
now = time.time()
|
||||
try:
|
||||
frame = await asyncio.to_thread(self._capture)
|
||||
except Exception as exc:
|
||||
logger.warning("ScreenshotScheduler capture failed: %s", exc)
|
||||
self._audit.log({"ts": now, "event": "screenshot_sent", "status": "capture_failed", "error": str(exc)})
|
||||
self._notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
silent=True,
|
||||
))
|
||||
return
|
||||
|
||||
if frame is None:
|
||||
self._notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
silent=True,
|
||||
))
|
||||
return
|
||||
|
||||
path = await asyncio.to_thread(self._save_fn, frame, "poll", now)
|
||||
self._audit.log({"ts": now, "event": "screenshot_sent", "path": str(path) if path else None})
|
||||
self._notifier.send(Alert(
|
||||
kind="screenshot",
|
||||
title="Screenshot periodic",
|
||||
body="",
|
||||
image_path=path,
|
||||
silent=True,
|
||||
))
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Tests for atm.main unified CLI."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -186,7 +187,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
]
|
||||
def __init__(self, *a, **kw):
|
||||
self._i = 0
|
||||
def step(self, ts):
|
||||
def step(self, ts, frame=None):
|
||||
if self._i >= len(self._script):
|
||||
raise _StopLoop
|
||||
color, accepted = self._script[self._i]
|
||||
@@ -228,6 +229,17 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
def step(self, *a, **kw):
|
||||
return types.SimpleNamespace(status="pending", levels=None)
|
||||
|
||||
class _StubPoller:
|
||||
def __init__(self, *a, **kw): pass
|
||||
async def run(self): await asyncio.sleep(9999)
|
||||
|
||||
class _StubScheduler:
|
||||
def __init__(self, *a, **kw):
|
||||
self.is_running = False
|
||||
def start(self, interval_s): self.is_running = True
|
||||
def stop(self): self.is_running = False
|
||||
async def run(self): await asyncio.sleep(9999)
|
||||
|
||||
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
|
||||
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
|
||||
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
|
||||
@@ -237,6 +249,8 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
|
||||
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
|
||||
monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
|
||||
monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
|
||||
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
|
||||
|
||||
with pytest.raises(_StopLoop):
|
||||
_main.run_live(cfg, duration_s=None)
|
||||
@@ -255,3 +269,135 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
|
||||
assert len(trigger) == 1
|
||||
assert trigger[0].direction == "SELL"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MUST-HAVE: async lifecycle integration test
|
||||
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
|
||||
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
|
||||
import numpy as np
|
||||
import atm.main as _main
|
||||
from atm.detector import DetectionResult
|
||||
|
||||
captured_alerts: list = []
|
||||
scheduler_events: list[str] = []
|
||||
|
||||
class FakeFanout:
|
||||
def __init__(self, *a, **kw): pass
|
||||
def send(self, alert): captured_alerts.append(alert)
|
||||
def stop(self): pass
|
||||
def stats(self): return {}
|
||||
|
||||
class FakeCanaryResult:
|
||||
distance = 0
|
||||
drifted = False
|
||||
paused = False
|
||||
|
||||
class FakeCanary:
|
||||
def __init__(self, *a, **kw): self.is_paused = False
|
||||
def check(self, frame): return FakeCanaryResult()
|
||||
def resume(self): pass
|
||||
|
||||
# Scheduler tracks start/stop calls
|
||||
class FakeScheduler:
|
||||
def __init__(self, *a, **kw):
|
||||
self.is_running = False
|
||||
self.interval_s = None
|
||||
def start(self, interval_s):
|
||||
self.is_running = True
|
||||
self.interval_s = interval_s
|
||||
scheduler_events.append(f"start:{interval_s}")
|
||||
def stop(self):
|
||||
self.is_running = False
|
||||
scheduler_events.append("stop")
|
||||
async def run(self):
|
||||
await asyncio.sleep(9999)
|
||||
|
||||
class FakePoller:
|
||||
def __init__(self, *a, **kw): pass
|
||||
async def run(self): await asyncio.sleep(9999)
|
||||
|
||||
class _StopLoop(Exception): pass
|
||||
|
||||
class ScriptedDetector:
|
||||
# turquoise→ARM, dark_green→PRIME, light_green→FIRE
|
||||
_script = [
|
||||
("turquoise", True),
|
||||
("dark_green", True),
|
||||
("light_green", True),
|
||||
]
|
||||
def __init__(self, *a, **kw): self._i = 0
|
||||
def step(self, ts, frame=None):
|
||||
if self._i >= len(self._script):
|
||||
raise _StopLoop
|
||||
color, accepted = self._script[self._i]
|
||||
self._i += 1
|
||||
return DetectionResult(ts=ts, window_found=True, dot_found=True,
|
||||
rgb=(1, 1, 1), match=None, accepted=accepted, color=color)
|
||||
@property
|
||||
def rolling(self): return []
|
||||
|
||||
def fake_build_capture(cfg, capture_stub=False):
|
||||
return lambda: np.zeros((50, 50, 3), dtype=np.uint8)
|
||||
|
||||
cfg = MagicMock()
|
||||
cfg.lockout_s = 60
|
||||
cfg.heartbeat_min = 999
|
||||
cfg.loop_interval_s = 0
|
||||
cfg.config_version = "test"
|
||||
cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
|
||||
cfg.canary.drift_threshold = 10
|
||||
cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10
|
||||
cfg.chart_window_region = None
|
||||
cfg.telegram.auto_poll_interval_s = 180
|
||||
cfg.telegram.bot_token = "tok"
|
||||
cfg.telegram.chat_id = "123"
|
||||
cfg.telegram.allowed_chat_ids = ("123",)
|
||||
|
||||
fake_sched = FakeScheduler()
|
||||
|
||||
monkeypatch.chdir(tmp_path)
|
||||
|
||||
class _Stub:
|
||||
def __init__(self, *a, **kw): pass
|
||||
def log(self, *a, **kw): pass
|
||||
def close(self, *a, **kw): pass
|
||||
def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None)
|
||||
|
||||
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
|
||||
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
|
||||
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
|
||||
monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
|
||||
monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
|
||||
monkeypatch.setattr("atm.audit.AuditLog", _Stub)
|
||||
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
|
||||
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
|
||||
monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
|
||||
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
|
||||
|
||||
with pytest.raises(_StopLoop):
|
||||
await _main.run_live_async(cfg, duration_s=None)
|
||||
|
||||
arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
|
||||
prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
|
||||
trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]
|
||||
|
||||
assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
|
||||
assert arm_alerts[0].direction == "BUY"
|
||||
|
||||
assert len(prime_alerts) == 1
|
||||
assert prime_alerts[0].direction == "BUY"
|
||||
|
||||
assert len(trigger_alerts) == 1
|
||||
assert trigger_alerts[0].direction == "BUY"
|
||||
|
||||
# Scheduler must have started (on PRIME) and stopped (on FIRE)
|
||||
assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
|
||||
assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
|
||||
start_idx = scheduler_events.index("start:180")
|
||||
stop_idx = scheduler_events.index("stop")
|
||||
assert start_idx < stop_idx, "scheduler started after it stopped"
|
||||
|
||||
Reference in New Issue
Block a user