Compare commits
9 Commits
8ff31ed241
...
0f7dd5dc84
| Author | SHA1 | Date |
|---|---|---|
|
|
0f7dd5dc84 | ||
|
|
63642e71dd | ||
|
|
424437ceaf | ||
|
|
ca6e578175 | ||
|
|
4123b31a22 | ||
|
|
c1b89ad6a9 | ||
|
|
fd04fcd5e6 | ||
|
|
c6714e8d5e | ||
|
|
238243b1ce |
35
CLAUDE.md
Normal file
35
CLAUDE.md
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
# ATM — Automated Trading Monitor
|
||||||
|
|
||||||
|
Personal Faza-1 tool for the M2D strategy. Python 3.11+.
|
||||||
|
|
||||||
|
## Quick Reference
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install -e ".[windows]" # Windows: live capture
|
||||||
|
pip install -e . # Linux/macOS: dev/dryrun only
|
||||||
|
atm calibrate # Tk wizard
|
||||||
|
atm debug --delay 5 # one-shot capture + detect
|
||||||
|
atm run --start-at 16:30 --stop-at 23:00 # live session
|
||||||
|
atm dryrun samples # corpus gate
|
||||||
|
pytest # run tests
|
||||||
|
```
|
||||||
|
|
||||||
|
## Skill routing
|
||||||
|
|
||||||
|
When the user's request matches an available skill, ALWAYS invoke it using the Skill
|
||||||
|
tool as your FIRST action. Do NOT answer directly, do NOT use other tools first.
|
||||||
|
The skill has specialized workflows that produce better results than ad-hoc answers.
|
||||||
|
|
||||||
|
Key routing rules:
|
||||||
|
- Product ideas, "is this worth building", brainstorming → invoke office-hours
|
||||||
|
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
|
||||||
|
- Ship, deploy, push, create PR → invoke ship
|
||||||
|
- QA, test the site, find bugs → invoke qa
|
||||||
|
- Code review, check my diff → invoke review
|
||||||
|
- Update docs after shipping → invoke document-release
|
||||||
|
- Weekly retro → invoke retro
|
||||||
|
- Design system, brand → invoke design-consultation
|
||||||
|
- Visual audit, design polish → invoke design-review
|
||||||
|
- Architecture review → invoke plan-eng-review
|
||||||
|
- Save progress, checkpoint, resume → invoke checkpoint
|
||||||
|
- Code quality, health check → invoke health
|
||||||
10
TODOS.md
10
TODOS.md
@@ -49,9 +49,17 @@ Read-only web view of today's audit JSONL + recent triggers. Useful for review a
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## P2-yaxis-recalib-detect — Y-axis recalibration detection
|
||||||
|
|
||||||
|
Price overlay (from Telegram commands feature) uses `y_axis` linear interpolation to show current price on screenshots. When the user rescales the chart y-axis (common after overnight price gaps), the calibration becomes stale and prices shown are incorrect. Canary check detects layout drift but NOT y-axis range changes.
|
||||||
|
|
||||||
|
- Possible approaches: OCR on y-axis labels (fragile), track price range consistency across sessions, or simple "calibration age" warning after N hours.
|
||||||
|
- Start after price overlay is live and the false-price frequency is known.
|
||||||
|
- Depends on: Telegram commands + price overlay feature being shipped.
|
||||||
|
|
||||||
## Quality debt
|
## Quality debt
|
||||||
|
|
||||||
- [ ] **Integration test for run_live loop**: currently mocked at module level. Add a short-duration in-memory loop test that threads real detector/state_machine/audit together (no network).
|
- [x] **Integration test for run_live loop**: lifecycle async test added in `tests/test_main.py` (IDLE→ARMED→PRIMED auto-poll→FIRE auto-stop).
|
||||||
- [ ] **Coverage report**: run `pytest --cov=atm --cov-report=term-missing`, aim for ≥ 85% per module.
|
- [ ] **Coverage report**: run `pytest --cov=atm --cov-report=term-missing`, aim for ≥ 85% per module.
|
||||||
- [ ] **Typing strictness**: run `pyright src/` with strict mode, fix reported issues.
|
- [ ] **Typing strictness**: run `pyright src/` with strict mode, fix reported issues.
|
||||||
- [ ] **Perf baseline**: profile one detection cycle on a representative frame; ensure < 100ms so 5s loop has ample headroom.
|
- [ ] **Perf baseline**: profile one detection cycle on a representative frame; ensure < 100ms so 5s loop has ample headroom.
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ dependencies = [
|
|||||||
"pillow>=10.0",
|
"pillow>=10.0",
|
||||||
"requests>=2.31",
|
"requests>=2.31",
|
||||||
"rich>=13.0",
|
"rich>=13.0",
|
||||||
|
"httpx>=0.27",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
@@ -24,6 +25,7 @@ windows = [
|
|||||||
dev = [
|
dev = [
|
||||||
"pytest>=8.0",
|
"pytest>=8.0",
|
||||||
"pytest-cov>=5.0",
|
"pytest-cov>=5.0",
|
||||||
|
"pytest-asyncio>=0.23",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import threading
|
||||||
from datetime import datetime, date
|
from datetime import datetime, date
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Callable, IO
|
from typing import Callable, IO
|
||||||
@@ -16,21 +17,25 @@ class AuditLog:
|
|||||||
self._clock: Callable[[], datetime] = clock or datetime.now
|
self._clock: Callable[[], datetime] = clock or datetime.now
|
||||||
self._current_date: date | None = None
|
self._current_date: date | None = None
|
||||||
self._fh: IO[str] | None = None
|
self._fh: IO[str] | None = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
def log(self, event: dict) -> None:
|
def log(self, event: dict) -> None:
|
||||||
now = self._clock()
|
now = self._clock()
|
||||||
today = now.date()
|
today = now.date()
|
||||||
|
|
||||||
if today != self._current_date:
|
|
||||||
self._open(today)
|
|
||||||
|
|
||||||
if "ts" not in event:
|
if "ts" not in event:
|
||||||
event = {**event, "ts": now.isoformat()}
|
event = {**event, "ts": now.isoformat()}
|
||||||
|
with self._lock:
|
||||||
assert self._fh is not None
|
if today != self._current_date:
|
||||||
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
|
self._open(today)
|
||||||
|
assert self._fh is not None
|
||||||
|
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._close_locked()
|
||||||
|
|
||||||
|
def _close_locked(self) -> None:
|
||||||
|
"""Close file handle; must be called while holding self._lock."""
|
||||||
if self._fh is not None:
|
if self._fh is not None:
|
||||||
try:
|
try:
|
||||||
self._fh.close()
|
self._fh.close()
|
||||||
@@ -47,7 +52,7 @@ class AuditLog:
|
|||||||
return self._base_dir / f"{self._current_date}.jsonl"
|
return self._base_dir / f"{self._current_date}.jsonl"
|
||||||
|
|
||||||
def _open(self, today: date) -> None:
|
def _open(self, today: date) -> None:
|
||||||
self.close()
|
self._close_locked() # already holding self._lock
|
||||||
self._base_dir.mkdir(parents=True, exist_ok=True)
|
self._base_dir.mkdir(parents=True, exist_ok=True)
|
||||||
path = self._base_dir / f"{today}.jsonl"
|
path = self._base_dir / f"{today}.jsonl"
|
||||||
self._fh = open(path, "a", buffering=1, encoding="utf-8")
|
self._fh = open(path, "a", buffering=1, encoding="utf-8")
|
||||||
|
|||||||
163
src/atm/commands.py
Normal file
163
src/atm/commands.py
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
"""Telegram command poller + Command dataclass.
|
||||||
|
|
||||||
|
Uses httpx (async) for long-polling getUpdates. The sync TelegramNotifier
|
||||||
|
continues to use requests — this module is the only httpx consumer.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .config import TelegramCfg
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CommandAction = Literal["set_interval", "stop", "status", "ss"]
|
||||||
|
|
||||||
|
_BASE = "https://api.telegram.org/bot{token}/{method}"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Command:
|
||||||
|
action: CommandAction
|
||||||
|
value: int | None = None # seconds; only for set_interval
|
||||||
|
|
||||||
|
|
||||||
|
class TelegramPoller:
    """Long-poll Telegram getUpdates and emit Commands into an asyncio.Queue.

    Security: rejects messages from chat_ids not in cfg.allowed_chat_ids
    (falling back to the primary cfg.chat_id when that list is empty).
    Degrades (stops polling) after 3 consecutive 401 responses; warning the
    operator is the caller's responsibility — the poller only audits the
    event and sets the ``degraded`` flag.
    """

    def __init__(
        self,
        cfg: TelegramCfg,
        cmd_queue: asyncio.Queue[Command],
        audit,  # _AuditLike
    ) -> None:
        self._cfg = cfg
        self._cmd_queue = cmd_queue
        self._audit = audit
        self._offset = 0  # getUpdates offset: last seen update_id + 1
        self._consecutive_401 = 0
        self._degraded = False
        # fallback: if allowed_chat_ids is empty, accept only the primary chat
        self._allowed = set(cfg.allowed_chat_ids) or {cfg.chat_id}

    @property
    def degraded(self) -> bool:
        """True once polling has stopped after repeated auth failures."""
        return self._degraded

    async def run(self) -> None:
        """Poll forever; audit transport/parse errors and back off 5s each time."""
        async with httpx.AsyncClient() as client:
            await self._drain(client)
            while True:
                if self._degraded:
                    await asyncio.sleep(5)
                    continue
                try:
                    await self._poll_once(client)
                except asyncio.CancelledError:
                    raise
                except httpx.HTTPError as exc:
                    # httpx.TimeoutException subclasses HTTPError, so network
                    # timeouts are covered here as well.
                    self._audit.log({"event": "poller_error", "error": str(exc)})
                    await asyncio.sleep(5)
                except Exception as exc:  # json, unexpected
                    self._audit.log({"event": "poller_error", "error": str(exc)})
                    await asyncio.sleep(5)

    async def _drain(self, client: httpx.AsyncClient) -> None:
        """Discard all pending updates at startup so stale commands don't replay."""
        try:
            resp = await client.get(
                _BASE.format(token=self._cfg.bot_token, method="getUpdates"),
                params={"timeout": 0, "offset": self._offset},
                timeout=10,
            )
            body = resp.json()
            if body.get("ok") and body.get("result"):
                self._offset = body["result"][-1]["update_id"] + 1
        except Exception as exc:
            # Best-effort: a failed drain only risks replaying stale commands.
            logger.warning("TelegramPoller startup drain failed: %s", exc)

    async def _poll_once(self, client: httpx.AsyncClient) -> None:
        """One long-poll cycle: fetch updates, advance the offset, dispatch each."""
        resp = await client.get(
            _BASE.format(token=self._cfg.bot_token, method="getUpdates"),
            params={"timeout": self._cfg.poll_timeout_s, "offset": self._offset},
            # client-side timeout must exceed the server-side long-poll window
            timeout=self._cfg.poll_timeout_s + 5,
        )

        if resp.status_code == 401:
            self._consecutive_401 += 1
            if self._consecutive_401 >= 3:
                self._degraded = True
                self._audit.log({"event": "poller_degraded", "reason": "3_consecutive_401"})
            return
        self._consecutive_401 = 0

        body = resp.json()
        if not body.get("ok"):
            return

        for update in body.get("result", []):
            # Advance offset first so a failing update is not replayed forever.
            self._offset = update["update_id"] + 1
            await self._process_update(update)

    async def _process_update(self, update: dict) -> None:
        """Validate the sender, parse the text, audit and enqueue the Command."""
        if "callback_query" in update:
            # Inline button pressed — may be expired; reply with fallback
            cbq = update["callback_query"]
            chat_id = str(cbq.get("from", {}).get("id", ""))
            if chat_id not in self._allowed:
                logger.info("Rejected callback_query from chat_id=%s", chat_id)
                return
            # Caller handles answerCallbackQuery; just note in audit
            self._audit.log({"event": "command_received", "action": "callback_query", "chat_id": chat_id})
            return

        msg = update.get("message") or update.get("edited_message")
        if not msg:
            return

        chat_id = str(msg.get("chat", {}).get("id", ""))
        if chat_id not in self._allowed:
            logger.info("Rejected message from chat_id=%s", chat_id)
            return

        text = (msg.get("text") or "").strip().lower()
        cmd = self._parse_command(text)
        if cmd is None:
            return

        self._audit.log({
            "event": "command_received",
            "action": cmd.action,
            "value": cmd.value,
            "chat_id": chat_id,
        })
        await self._cmd_queue.put(cmd)

    def _parse_command(self, text: str) -> Command | None:
        """Map lowercased message text to a Command; None when unrecognized.

        Accepts "stop", "status", "ss"/"screenshot", a bare minute count
        ("3" → 180s), and "interval 3" / "set_interval 3". Also accepts the
        Telegram group-chat form "/cmd@botname" by dropping the "@..." suffix.
        """
        t = text.lstrip("/").strip()
        if not t:
            return None
        # Group chats address commands as "/stop@mybot" — strip the suffix
        # from the command word (text is already lowercased by the caller).
        words = t.split()
        if "@" in words[0]:
            words[0] = words[0].split("@", 1)[0]
            t = " ".join(words).strip()
            if not t:
                return None
        if t == "stop":
            return Command(action="stop")
        if t == "status":
            return Command(action="status")
        if t in ("ss", "screenshot"):
            return Command(action="ss")
        # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
        parts = t.split()
        if len(parts) == 1 and parts[0].isdigit():
            return Command(action="set_interval", value=int(parts[0]) * 60)
        if len(parts) == 2 and parts[0] in ("interval", "set_interval") and parts[1].isdigit():
            return Command(action="set_interval", value=int(parts[1]) * 60)
        return None
|
||||||
@@ -78,6 +78,9 @@ class DiscordCfg:
|
|||||||
class TelegramCfg:
|
class TelegramCfg:
|
||||||
bot_token: str
|
bot_token: str
|
||||||
chat_id: str
|
chat_id: str
|
||||||
|
allowed_chat_ids: tuple[str, ...] = ()
|
||||||
|
poll_timeout_s: int = 30
|
||||||
|
auto_poll_interval_s: int = 180
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
if not self.bot_token or not self.chat_id:
|
if not self.bot_token or not self.chat_id:
|
||||||
@@ -156,9 +159,14 @@ class Config:
|
|||||||
drift_threshold=int(data["canary"].get("drift_threshold", 8)),
|
drift_threshold=int(data["canary"].get("drift_threshold", 8)),
|
||||||
)
|
)
|
||||||
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
|
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
|
||||||
|
tg = data["telegram"]
|
||||||
|
_allowed = [str(c) for c in tg.get("allowed_chat_ids", [])] or [str(tg["chat_id"])]
|
||||||
telegram = TelegramCfg(
|
telegram = TelegramCfg(
|
||||||
bot_token=data["telegram"]["bot_token"],
|
bot_token=tg["bot_token"],
|
||||||
chat_id=str(data["telegram"]["chat_id"]),
|
chat_id=str(tg["chat_id"]),
|
||||||
|
allowed_chat_ids=tuple(_allowed),
|
||||||
|
poll_timeout_s=int(tg.get("poll_timeout_s", 30)),
|
||||||
|
auto_poll_interval_s=int(tg.get("auto_poll_interval_s", 180)),
|
||||||
)
|
)
|
||||||
opts = data.get("options", {})
|
opts = data.get("options", {})
|
||||||
region = None
|
region = None
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ class DetectionResult:
|
|||||||
match: ColorMatch | None # None if no dot
|
match: ColorMatch | None # None if no dot
|
||||||
accepted: bool # post-debounce; True only when match repeats debounce_depth times
|
accepted: bool # post-debounce; True only when match repeats debounce_depth times
|
||||||
color: str | None # accepted color name (UNKNOWN excluded)
|
color: str | None # accepted color name (UNKNOWN excluded)
|
||||||
|
dot_pos_abs: tuple[int, int] | None = None # absolute (x, y) in frame; set when dot_found
|
||||||
|
|
||||||
|
|
||||||
class Detector:
|
class Detector:
|
||||||
@@ -60,8 +61,14 @@ class Detector:
|
|||||||
self._debounce: deque[str | None] = deque(maxlen=cfg.debounce_depth)
|
self._debounce: deque[str | None] = deque(maxlen=cfg.debounce_depth)
|
||||||
self._rolling: deque[DetectionResult] = deque(maxlen=20)
|
self._rolling: deque[DetectionResult] = deque(maxlen=20)
|
||||||
|
|
||||||
def step(self, ts: float) -> DetectionResult:
|
def step(self, ts: float, frame=None) -> DetectionResult:
|
||||||
frame = self._capture()
|
"""Run one detection tick.
|
||||||
|
|
||||||
|
frame: pre-captured BGR ndarray (from asyncio.to_thread capture). When
|
||||||
|
None (default), calls self._capture() — preserving the sync-loop behaviour.
|
||||||
|
"""
|
||||||
|
if frame is None:
|
||||||
|
frame = self._capture()
|
||||||
|
|
||||||
if frame is None:
|
if frame is None:
|
||||||
self._debounce.append(None)
|
self._debounce.append(None)
|
||||||
@@ -117,6 +124,7 @@ class Detector:
|
|||||||
match=match,
|
match=match,
|
||||||
accepted=accepted,
|
accepted=accepted,
|
||||||
color=color,
|
color=color,
|
||||||
|
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y),
|
||||||
)
|
)
|
||||||
self._rolling.append(r)
|
self._rolling.append(r)
|
||||||
return r
|
return r
|
||||||
|
|||||||
411
src/atm/main.py
411
src/atm/main.py
@@ -2,12 +2,15 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import contextlib
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import TYPE_CHECKING, Callable, Protocol, cast
|
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
|
||||||
|
|
||||||
from atm.config import Config # stdlib-only (tomllib); safe at module level
|
from atm.config import Config # stdlib-only (tomllib); safe at module level
|
||||||
from atm.notifier import Alert
|
from atm.notifier import Alert
|
||||||
@@ -348,6 +351,8 @@ def _save_annotated_frame(
|
|||||||
label: str,
|
label: str,
|
||||||
now: float,
|
now: float,
|
||||||
audit: _AuditLike | None = None,
|
audit: _AuditLike | None = None,
|
||||||
|
dot_pos_abs: "tuple[int, int] | None" = None,
|
||||||
|
canary_ok: bool = True,
|
||||||
) -> "Path | None":
|
) -> "Path | None":
|
||||||
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
|
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
|
||||||
|
|
||||||
@@ -355,6 +360,10 @@ def _save_annotated_frame(
|
|||||||
audit (when provided) so disk-full / permission issues don't become silent
|
audit (when provided) so disk-full / permission issues don't become silent
|
||||||
regressions. Never raises — snapshot is a best-effort enhancement, the
|
regressions. Never raises — snapshot is a best-effort enhancement, the
|
||||||
text alert must still go out.
|
text alert must still go out.
|
||||||
|
|
||||||
|
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
|
||||||
|
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
|
||||||
|
since calibration may be stale.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
import cv2 # type: ignore[import-untyped]
|
import cv2 # type: ignore[import-untyped]
|
||||||
@@ -371,6 +380,22 @@ def _save_annotated_frame(
|
|||||||
annotated = frame.copy()
|
annotated = frame.copy()
|
||||||
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
|
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
|
||||||
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
||||||
|
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
|
||||||
|
try:
|
||||||
|
_, dot_y = dot_pos_abs
|
||||||
|
ya = cfg.y_axis
|
||||||
|
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
|
||||||
|
price = ya.p1_price + (dot_y - ya.p1_y) * slope
|
||||||
|
w_frame = annotated.shape[1]
|
||||||
|
text = f"${price:.2f}"
|
||||||
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
||||||
|
scale, thickness = 1.2, 3
|
||||||
|
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
|
||||||
|
tx, ty = w_frame - tw - 10, th + 10
|
||||||
|
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
|
||||||
|
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
|
||||||
|
except Exception:
|
||||||
|
pass # price overlay is best-effort; never break the screenshot
|
||||||
cv2.imwrite(str(fpath), annotated)
|
cv2.imwrite(str(fpath), annotated)
|
||||||
return fpath
|
return fpath
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
@@ -496,7 +521,113 @@ def _handle_tick(
|
|||||||
return tr
|
return tr
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _TickSyncResult:
|
||||||
|
frame: Any = None
|
||||||
|
res: Any = None # DetectionResult | None
|
||||||
|
tr: Any = None # Transition | None
|
||||||
|
first_consumed: bool = False
|
||||||
|
late_start: bool = False
|
||||||
|
new_color: str | None = None # corpus sample color when changed
|
||||||
|
|
||||||
|
|
||||||
|
def _sync_detection_tick(
    capture: Callable,
    canary: Any,
    cfg: Any,
    detector: Any,
    fsm: Any,
    notifier: _NotifierLike,
    audit: _AuditLike,
    detection_log: _AuditLike,
    fires_dir: Path,
    first_accepted: bool,
    last_saved_color: "str | None",
    now: float,
    samples_dir: Path,
) -> _TickSyncResult:
    """Run one complete detection tick (blocking I/O; executed in asyncio.to_thread).

    Returns a _TickSyncResult describing how far the tick progressed: no
    frame, canary-paused, detection only, or a full FSM transition —
    possibly with a trigger alert sent.
    """
    frame = capture()
    if frame is None:
        # Window/region not capturable this tick — record and bail out.
        audit.log({"ts": now, "event": "window_lost"})
        return _TickSyncResult()

    canary_res = canary.check(frame)
    if canary.is_paused:
        # Layout drift detected — skip detection until the canary resumes.
        audit.log({"ts": now, "event": "paused", "drift": canary_res.distance})
        return _TickSyncResult(frame=frame)

    res = detector.step(now, frame)
    m = res.match
    detection_log.log({
        "ts": now, "event": "frame",
        "window_found": res.window_found,
        "dot_found": res.dot_found,
        "rgb": list(res.rgb) if res.rgb is not None else None,
        "match_name": m.name if m is not None else None,
        "distance": round(m.distance, 2) if m is not None else None,
        "confidence": round(m.confidence, 3) if m is not None else None,
        "accepted": res.accepted,
        "color": res.color,
    })

    if not (res.accepted and res.color):
        # Nothing debounced-in yet: detection-only result.
        return _TickSyncResult(frame=frame, res=res)

    consumed_first = first_accepted
    dot_abs = getattr(res, "dot_pos_abs", None)

    def _snapshot(kind: str, label: str) -> "Path | None":
        # Per-kind screenshot gate from config; snapshot is best-effort.
        if not getattr(cfg.attach_screenshots, kind, True):
            return None
        return _save_annotated_frame(
            frame, cfg, fires_dir, label, now, audit=audit,
            dot_pos_abs=dot_abs,
            canary_ok=True,
        )

    tr = _handle_tick(fsm, res.color, now, notifier, audit, consumed_first, snapshot=_snapshot)
    if tr is None:
        # Late start: FSM untouched — skip FIRE handling and corpus save.
        return _TickSyncResult(frame=frame, res=res, first_consumed=consumed_first, late_start=True)

    # Corpus: save the full frame whenever a new distinct color appears,
    # for later labelling.
    new_color: str | None = None
    if res.color != last_saved_color:
        stamp = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
        sample_path = samples_dir / f"{stamp}_{res.color}.png"
        try:
            import cv2  # type: ignore[import-untyped]
            cv2.imwrite(str(sample_path), frame)
        except Exception as exc:
            audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
        new_color = res.color

    # FIRE: annotate + save the frame and attach it to the trigger alert.
    if tr.trigger and not tr.locked:
        fire_path: "Path | None" = None
        if cfg.attach_screenshots.trigger:
            fire_path = _save_annotated_frame(
                frame, cfg, fires_dir, tr.trigger, now, audit=audit,
                dot_pos_abs=dot_abs,
                canary_ok=True,
            )
        notifier.send(Alert(
            kind="trigger",
            title=f"Semnal {tr.trigger}",
            body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
            image_path=fire_path,
            direction=tr.trigger,
        ))

    return _TickSyncResult(
        frame=frame, res=res, tr=tr,
        first_consumed=consumed_first, new_color=new_color,
    )
|
||||||
|
|
||||||
|
|
||||||
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||||
|
"""Sync entry point — delegates to asyncio event loop."""
|
||||||
|
asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub))
|
||||||
|
|
||||||
|
|
||||||
|
async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||||
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
|
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
|
||||||
try:
|
try:
|
||||||
from atm.detector import Detector
|
from atm.detector import Detector
|
||||||
@@ -506,6 +637,8 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
from atm.notifier.discord import DiscordNotifier
|
from atm.notifier.discord import DiscordNotifier
|
||||||
from atm.notifier.telegram import TelegramNotifier
|
from atm.notifier.telegram import TelegramNotifier
|
||||||
from atm.audit import AuditLog
|
from atm.audit import AuditLog
|
||||||
|
from atm.commands import TelegramPoller, Command
|
||||||
|
from atm.scheduler import ScreenshotScheduler
|
||||||
except ImportError as exc:
|
except ImportError as exc:
|
||||||
sys.exit(f"run-loop dependencies not available: {exc}")
|
sys.exit(f"run-loop dependencies not available: {exc}")
|
||||||
|
|
||||||
@@ -521,7 +654,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
]
|
]
|
||||||
|
|
||||||
def _on_drop(backend_name: str, dropped: Alert) -> None:
|
def _on_drop(backend_name: str, dropped: Alert) -> None:
|
||||||
"""Audit la depășire coadă — face eșecul silențios vizibil."""
|
|
||||||
audit.log({
|
audit.log({
|
||||||
"ts": time.time(),
|
"ts": time.time(),
|
||||||
"event": "queue_overflow_drop",
|
"event": "queue_overflow_drop",
|
||||||
@@ -532,7 +664,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
|
|
||||||
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
|
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
|
||||||
|
|
||||||
# Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea.
|
# Initial frame + canary check
|
||||||
first_frame = capture()
|
first_frame = capture()
|
||||||
if first_frame is None:
|
if first_frame is None:
|
||||||
print("WARN: first capture returned None — window/region missing", flush=True)
|
print("WARN: first capture returned None — window/region missing", flush=True)
|
||||||
@@ -542,9 +674,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
|
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
|
||||||
if first_check.drifted:
|
if first_check.drifted:
|
||||||
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
|
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
|
||||||
canary.resume() # clear the auto-pause so user can Ctrl+C and fix
|
canary.resume()
|
||||||
|
|
||||||
dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
||||||
notifier.send(Alert(
|
notifier.send(Alert(
|
||||||
kind="heartbeat",
|
kind="heartbeat",
|
||||||
title="ATM pornit",
|
title="ATM pornit",
|
||||||
@@ -556,106 +688,42 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
|
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
|
||||||
|
|
||||||
start = time.monotonic()
|
start = time.monotonic()
|
||||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||||
levels_extractor = None
|
|
||||||
last_saved_color: str | None = None
|
|
||||||
first_accepted = True
|
|
||||||
samples_dir = Path("samples")
|
samples_dir = Path("samples")
|
||||||
samples_dir.mkdir(exist_ok=True)
|
samples_dir.mkdir(exist_ok=True)
|
||||||
fires_dir = Path("logs") / "fires"
|
fires_dir = Path("logs") / "fires"
|
||||||
fires_dir.mkdir(parents=True, exist_ok=True)
|
fires_dir.mkdir(parents=True, exist_ok=True)
|
||||||
import cv2 # type: ignore[import-untyped]
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while duration_s is None or (time.monotonic() - start) < duration_s:
|
import cv2 # noqa: F401 fail fast if cv2 is missing # type: ignore[import-untyped]
|
||||||
now = time.time()
|
except ImportError:
|
||||||
frame = capture()
|
pass
|
||||||
if frame is None:
|
|
||||||
audit.log({"ts": now, "event": "window_lost"})
|
|
||||||
time.sleep(cfg.loop_interval_s)
|
|
||||||
continue
|
|
||||||
# canary check
|
|
||||||
cr = canary.check(frame)
|
|
||||||
if canary.is_paused:
|
|
||||||
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
|
|
||||||
time.sleep(cfg.loop_interval_s)
|
|
||||||
continue
|
|
||||||
# detection
|
|
||||||
res = detector.step(now)
|
|
||||||
detection_log.log({
|
|
||||||
"ts": now,
|
|
||||||
"event": "frame",
|
|
||||||
"window_found": res.window_found,
|
|
||||||
"dot_found": res.dot_found,
|
|
||||||
"rgb": list(res.rgb) if res.rgb is not None else None,
|
|
||||||
"match_name": res.match.name if res.match is not None else None,
|
|
||||||
"distance": round(res.match.distance, 2) if res.match is not None else None,
|
|
||||||
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
|
|
||||||
"accepted": res.accepted,
|
|
||||||
"color": res.color,
|
|
||||||
})
|
|
||||||
if res.accepted and res.color:
|
|
||||||
is_first = first_accepted
|
|
||||||
first_accepted = False
|
|
||||||
|
|
||||||
# Per-iteration closure — binds current frame/now, gates on config.
|
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
|
||||||
def _snapshot(kind: str, label: str) -> "Path | None":
|
first_accepted = True
|
||||||
if not getattr(cfg.attach_screenshots, kind, True):
|
last_saved_color: str | None = None
|
||||||
return None
|
levels_extractor = None
|
||||||
return _save_annotated_frame(
|
fire_count = 0
|
||||||
frame, cfg, fires_dir, label, now, audit=audit,
|
|
||||||
)
|
|
||||||
|
|
||||||
tr = _handle_tick(
|
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
|
||||||
fsm, res.color, now, notifier, audit, is_first,
|
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
|
||||||
snapshot=_snapshot,
|
|
||||||
)
|
scheduler = ScreenshotScheduler(
|
||||||
if tr is None:
|
capture=capture,
|
||||||
# pornire târzie: FSM neatins, sari peste FIRE + salvare corpus
|
save_fn=_bound_save,
|
||||||
time.sleep(cfg.loop_interval_s)
|
notifier=notifier,
|
||||||
continue
|
audit=audit,
|
||||||
# corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară
|
)
|
||||||
if res.color != last_saved_color:
|
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
|
||||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
|
||||||
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
|
# ------------------------------------------------------------------
|
||||||
try:
|
# Nested async coroutines — capture nonlocal state from run_live_async
|
||||||
cv2.imwrite(str(sample_path), frame)
|
# ------------------------------------------------------------------
|
||||||
except Exception as exc:
|
|
||||||
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
|
async def _heartbeat_loop() -> None:
|
||||||
last_saved_color = res.color
|
nonlocal heartbeat_due
|
||||||
# FIRE: adnotează frame-ul + salvează, atașează la alertă
|
while True:
|
||||||
if tr.trigger and not tr.locked:
|
await asyncio.sleep(60)
|
||||||
fire_path: "Path | None" = None
|
if time.monotonic() > heartbeat_due:
|
||||||
if cfg.attach_screenshots.trigger:
|
|
||||||
fire_path = _save_annotated_frame(
|
|
||||||
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
|
|
||||||
)
|
|
||||||
notifier.send(Alert(
|
|
||||||
kind="trigger",
|
|
||||||
title=f"Semnal {tr.trigger}",
|
|
||||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
|
||||||
image_path=fire_path,
|
|
||||||
direction=tr.trigger,
|
|
||||||
))
|
|
||||||
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
|
|
||||||
# phase-B levels
|
|
||||||
if levels_extractor is not None:
|
|
||||||
lr = levels_extractor.step(frame, now)
|
|
||||||
if lr.status in ("complete", "timeout"):
|
|
||||||
if lr.status == "complete" and lr.levels:
|
|
||||||
notifier.send(Alert(
|
|
||||||
kind="levels",
|
|
||||||
title="Niveluri",
|
|
||||||
body=(
|
|
||||||
f"SL={lr.levels.sl} "
|
|
||||||
f"TP1={lr.levels.tp1} "
|
|
||||||
f"TP2={lr.levels.tp2}"
|
|
||||||
),
|
|
||||||
))
|
|
||||||
levels_extractor = None
|
|
||||||
# heartbeat — include statistici per-backend ca eșecurile silențioase
|
|
||||||
# să apară la fiecare 30 min fără să aștepte oprirea.
|
|
||||||
if time.time() > heartbeat_due:
|
|
||||||
try:
|
try:
|
||||||
stats = notifier.stats()
|
stats = notifier.stats()
|
||||||
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
|
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
|
||||||
@@ -668,9 +736,145 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
|
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
|
||||||
except Exception:
|
except Exception:
|
||||||
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
|
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
|
||||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||||
time.sleep(cfg.loop_interval_s)
|
|
||||||
|
async def _dispatch_command(cmd: Command) -> None:
|
||||||
|
nonlocal fire_count
|
||||||
|
if cmd.action == "set_interval":
|
||||||
|
secs = cmd.value or cfg.telegram.auto_poll_interval_s
|
||||||
|
scheduler.start(secs)
|
||||||
|
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
|
||||||
|
notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
|
||||||
|
elif cmd.action == "stop":
|
||||||
|
if scheduler.is_running:
|
||||||
|
scheduler.stop()
|
||||||
|
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
|
||||||
|
notifier.send(Alert(kind="status", title="Polling oprit", body=""))
|
||||||
|
else:
|
||||||
|
notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
|
||||||
|
elif cmd.action == "status":
|
||||||
|
uptime_s = time.monotonic() - start
|
||||||
|
last_roll = detector.rolling[-1] if detector.rolling else None
|
||||||
|
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—"
|
||||||
|
last_color = (
|
||||||
|
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—"
|
||||||
|
) if last_roll else "—"
|
||||||
|
sched_info = (
|
||||||
|
f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ"
|
||||||
|
) if scheduler.is_running else "oprit"
|
||||||
|
canary_info = "drift (pauze)" if canary.is_paused else "ok"
|
||||||
|
body = (
|
||||||
|
f"Stare: {fsm.state.value}\n"
|
||||||
|
f"Ultima detecție: {last_color} (conf {last_conf})\n"
|
||||||
|
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n"
|
||||||
|
f"Poller: {sched_info} | Canary: {canary_info}"
|
||||||
|
)
|
||||||
|
notifier.send(Alert(kind="status", title="ATM Status", body=body))
|
||||||
|
elif cmd.action == "ss":
|
||||||
|
now_ss = time.time()
|
||||||
|
frame_ss = await asyncio.to_thread(capture)
|
||||||
|
if frame_ss is None:
|
||||||
|
notifier.send(Alert(
|
||||||
|
kind="warn",
|
||||||
|
title="Captură eșuată — verificați fereastra TradeStation",
|
||||||
|
body="",
|
||||||
|
))
|
||||||
|
return
|
||||||
|
path_ss = await asyncio.to_thread(
|
||||||
|
_save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit,
|
||||||
|
)
|
||||||
|
audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
|
||||||
|
notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
|
||||||
|
|
||||||
|
async def _detection_loop() -> None:
|
||||||
|
nonlocal first_accepted, last_saved_color, levels_extractor, fire_count
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if duration_s is not None and (time.monotonic() - start) >= duration_s:
|
||||||
|
break
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
result: _TickSyncResult = await asyncio.to_thread(
|
||||||
|
_sync_detection_tick,
|
||||||
|
capture, canary, cfg, detector, fsm, notifier, audit, detection_log,
|
||||||
|
fires_dir, first_accepted, last_saved_color, now, samples_dir,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.first_consumed:
|
||||||
|
first_accepted = False
|
||||||
|
if result.new_color is not None:
|
||||||
|
last_saved_color = result.new_color
|
||||||
|
|
||||||
|
tr = result.tr
|
||||||
|
res = result.res
|
||||||
|
|
||||||
|
if result.late_start or res is None:
|
||||||
|
await asyncio.sleep(cfg.loop_interval_s)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if tr is not None and res.accepted and res.color:
|
||||||
|
if tr.reason == "prime" and not scheduler.is_running:
|
||||||
|
scheduler.start(cfg.telegram.auto_poll_interval_s)
|
||||||
|
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
|
||||||
|
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running:
|
||||||
|
scheduler.stop()
|
||||||
|
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
|
||||||
|
|
||||||
|
if tr is not None and tr.trigger and not tr.locked:
|
||||||
|
fire_count += 1
|
||||||
|
if scheduler.is_running:
|
||||||
|
scheduler.stop()
|
||||||
|
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
|
||||||
|
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
|
||||||
|
|
||||||
|
if levels_extractor is not None and result.frame is not None:
|
||||||
|
lr = levels_extractor.step(result.frame, now)
|
||||||
|
if lr.status in ("complete", "timeout"):
|
||||||
|
if lr.status == "complete" and lr.levels:
|
||||||
|
notifier.send(Alert(
|
||||||
|
kind="levels",
|
||||||
|
title="Niveluri",
|
||||||
|
body=(
|
||||||
|
f"SL={lr.levels.sl} "
|
||||||
|
f"TP1={lr.levels.tp1} "
|
||||||
|
f"TP2={lr.levels.tp2}"
|
||||||
|
),
|
||||||
|
))
|
||||||
|
levels_extractor = None
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
cmd = cmd_queue.get_nowait()
|
||||||
|
await _dispatch_command(cmd)
|
||||||
|
except asyncio.QueueEmpty:
|
||||||
|
break
|
||||||
|
|
||||||
|
await asyncio.sleep(cfg.loop_interval_s)
|
||||||
|
|
||||||
|
# Launch background tasks
|
||||||
|
t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler")
|
||||||
|
t_poller = asyncio.create_task(poller.run(), name="poller")
|
||||||
|
t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat")
|
||||||
|
|
||||||
|
try:
|
||||||
|
await _detection_loop()
|
||||||
finally:
|
finally:
|
||||||
|
# 7-step graceful shutdown
|
||||||
|
# 1. cancel scheduler
|
||||||
|
t_scheduler.cancel()
|
||||||
|
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||||
|
await t_scheduler
|
||||||
|
# 2. cancel poller
|
||||||
|
t_poller.cancel()
|
||||||
|
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||||
|
await t_poller
|
||||||
|
# 3. cancel heartbeat
|
||||||
|
t_heartbeat.cancel()
|
||||||
|
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||||
|
await t_heartbeat
|
||||||
|
# 4. drain detection — complete (we awaited _detection_loop directly)
|
||||||
|
# 5. send shutdown alert
|
||||||
try:
|
try:
|
||||||
stats = notifier.stats()
|
stats = notifier.stats()
|
||||||
lines = [f"după {time.monotonic() - start:.0f}s"]
|
lines = [f"după {time.monotonic() - start:.0f}s"]
|
||||||
@@ -679,13 +883,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
|||||||
f"{name}: sent={s['sent']} failed={s['failed']} "
|
f"{name}: sent={s['sent']} failed={s['failed']} "
|
||||||
f"dropped={s['dropped']} retries={s['retries']}"
|
f"dropped={s['dropped']} retries={s['retries']}"
|
||||||
)
|
)
|
||||||
notifier.send(Alert(
|
notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
|
||||||
kind="heartbeat", title="ATM oprit",
|
|
||||||
body="\n".join(lines),
|
|
||||||
))
|
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
# 6. notifier.stop() — flush + join FanoutNotifier threads
|
||||||
notifier.stop()
|
notifier.stop()
|
||||||
|
# 7. audit.close()
|
||||||
audit.close()
|
audit.close()
|
||||||
detection_log.close()
|
detection_log.close()
|
||||||
|
|
||||||
|
|||||||
@@ -5,11 +5,13 @@ from typing import Protocol
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Alert:
|
class Alert:
|
||||||
kind: str # "trigger" | "heartbeat" | "levels" | "warn" | "arm" | "prime" | "late_start"
|
# flat union: "trigger"|"heartbeat"|"levels"|"warn"|"arm"|"prime"|"late_start"|"screenshot"|"status"
|
||||||
|
kind: str
|
||||||
title: str
|
title: str
|
||||||
body: str
|
body: str
|
||||||
image_path: Path | None = None # annotated screenshot
|
image_path: Path | None = None # annotated screenshot
|
||||||
direction: str | None = None # "BUY"/"SELL" when kind=trigger
|
direction: str | None = None # "BUY"/"SELL" when kind=trigger
|
||||||
|
silent: bool = False # disable_notification for Telegram; ignored by Discord
|
||||||
|
|
||||||
|
|
||||||
class Notifier(Protocol):
|
class Notifier(Protocol):
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ class TelegramNotifier:
|
|||||||
"chat_id": self._chat_id,
|
"chat_id": self._chat_id,
|
||||||
"caption": text,
|
"caption": text,
|
||||||
"parse_mode": "HTML",
|
"parse_mode": "HTML",
|
||||||
|
"disable_notification": str(alert.silent).lower(),
|
||||||
},
|
},
|
||||||
files={"photo": fh},
|
files={"photo": fh},
|
||||||
timeout=10,
|
timeout=10,
|
||||||
@@ -44,6 +45,7 @@ class TelegramNotifier:
|
|||||||
"chat_id": self._chat_id,
|
"chat_id": self._chat_id,
|
||||||
"text": text,
|
"text": text,
|
||||||
"parse_mode": "HTML",
|
"parse_mode": "HTML",
|
||||||
|
"disable_notification": alert.silent,
|
||||||
},
|
},
|
||||||
timeout=10,
|
timeout=10,
|
||||||
)
|
)
|
||||||
|
|||||||
118
src/atm/scheduler.py
Normal file
118
src/atm/scheduler.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
"""ScreenshotScheduler — periodic capture + annotate + send.
|
||||||
|
|
||||||
|
Runs as an asyncio task. capture() and cv2 work execute in asyncio.to_thread
|
||||||
|
to avoid blocking the event loop. Decision 13: scheduler calls capture()
|
||||||
|
directly, NOT via Detector.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
from .notifier import Alert
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ScreenshotScheduler:
|
||||||
|
"""Periodic screenshot sender.
|
||||||
|
|
||||||
|
Constructor params are explicit (decision 11 outside-voice finding).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
capture: Callable, # () -> ndarray | None
|
||||||
|
save_fn: Callable, # (frame, label, now) -> Path | None
|
||||||
|
notifier, # _NotifierLike
|
||||||
|
audit, # _AuditLike
|
||||||
|
interval_s: int | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._capture = capture
|
||||||
|
self._save_fn = save_fn
|
||||||
|
self._notifier = notifier
|
||||||
|
self._audit = audit
|
||||||
|
self._interval_s = interval_s
|
||||||
|
self._is_running = False
|
||||||
|
self._next_due: float | None = None # monotonic
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Public state
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
return self._is_running
|
||||||
|
|
||||||
|
@property
|
||||||
|
def interval_s(self) -> int | None:
|
||||||
|
return self._interval_s
|
||||||
|
|
||||||
|
@property
|
||||||
|
def next_due(self) -> float | None:
|
||||||
|
return self._next_due
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Control (called from async event loop)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def start(self, interval_s: int) -> None:
|
||||||
|
self._interval_s = interval_s
|
||||||
|
self._is_running = True
|
||||||
|
self._next_due = time.monotonic() + interval_s
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
self._is_running = False
|
||||||
|
self._next_due = None
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Task body
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def run(self) -> None:
|
||||||
|
"""Runs until cancelled."""
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
if not self._is_running or self._next_due is None:
|
||||||
|
continue
|
||||||
|
if time.monotonic() >= self._next_due:
|
||||||
|
await self._take_screenshot()
|
||||||
|
if self._is_running and self._interval_s is not None:
|
||||||
|
self._next_due = time.monotonic() + self._interval_s
|
||||||
|
|
||||||
|
async def _take_screenshot(self) -> None:
|
||||||
|
now = time.time()
|
||||||
|
try:
|
||||||
|
frame = await asyncio.to_thread(self._capture)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("ScreenshotScheduler capture failed: %s", exc)
|
||||||
|
self._audit.log({"ts": now, "event": "screenshot_sent", "status": "capture_failed", "error": str(exc)})
|
||||||
|
self._notifier.send(Alert(
|
||||||
|
kind="warn",
|
||||||
|
title="Captură eșuată — verificați fereastra TradeStation",
|
||||||
|
body="",
|
||||||
|
silent=True,
|
||||||
|
))
|
||||||
|
return
|
||||||
|
|
||||||
|
if frame is None:
|
||||||
|
self._notifier.send(Alert(
|
||||||
|
kind="warn",
|
||||||
|
title="Captură eșuată — verificați fereastra TradeStation",
|
||||||
|
body="",
|
||||||
|
silent=True,
|
||||||
|
))
|
||||||
|
return
|
||||||
|
|
||||||
|
path = await asyncio.to_thread(self._save_fn, frame, "poll", now)
|
||||||
|
self._audit.log({"ts": now, "event": "screenshot_sent", "path": str(path) if path else None})
|
||||||
|
self._notifier.send(Alert(
|
||||||
|
kind="screenshot",
|
||||||
|
title="Screenshot periodic",
|
||||||
|
body="",
|
||||||
|
image_path=path,
|
||||||
|
silent=True,
|
||||||
|
))
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
"""Tests for atm.main unified CLI."""
|
"""Tests for atm.main unified CLI."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
@@ -186,7 +187,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
|||||||
]
|
]
|
||||||
def __init__(self, *a, **kw):
|
def __init__(self, *a, **kw):
|
||||||
self._i = 0
|
self._i = 0
|
||||||
def step(self, ts):
|
def step(self, ts, frame=None):
|
||||||
if self._i >= len(self._script):
|
if self._i >= len(self._script):
|
||||||
raise _StopLoop
|
raise _StopLoop
|
||||||
color, accepted = self._script[self._i]
|
color, accepted = self._script[self._i]
|
||||||
@@ -228,6 +229,17 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
|||||||
def step(self, *a, **kw):
|
def step(self, *a, **kw):
|
||||||
return types.SimpleNamespace(status="pending", levels=None)
|
return types.SimpleNamespace(status="pending", levels=None)
|
||||||
|
|
||||||
|
class _StubPoller:
|
||||||
|
def __init__(self, *a, **kw): pass
|
||||||
|
async def run(self): await asyncio.sleep(9999)
|
||||||
|
|
||||||
|
class _StubScheduler:
|
||||||
|
def __init__(self, *a, **kw):
|
||||||
|
self.is_running = False
|
||||||
|
def start(self, interval_s): self.is_running = True
|
||||||
|
def stop(self): self.is_running = False
|
||||||
|
async def run(self): await asyncio.sleep(9999)
|
||||||
|
|
||||||
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
|
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
|
||||||
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
|
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
|
||||||
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
|
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
|
||||||
@@ -237,6 +249,8 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
|||||||
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
|
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
|
||||||
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
|
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
|
||||||
monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
|
monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
|
||||||
|
monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
|
||||||
|
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
|
||||||
|
|
||||||
with pytest.raises(_StopLoop):
|
with pytest.raises(_StopLoop):
|
||||||
_main.run_live(cfg, duration_s=None)
|
_main.run_live(cfg, duration_s=None)
|
||||||
@@ -255,3 +269,135 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
|||||||
|
|
||||||
assert len(trigger) == 1
|
assert len(trigger) == 1
|
||||||
assert trigger[0].direction == "SELL"
|
assert trigger[0].direction == "SELL"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# MUST-HAVE: async lifecycle integration test
|
||||||
|
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
|
||||||
|
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
|
||||||
|
import numpy as np
|
||||||
|
import atm.main as _main
|
||||||
|
from atm.detector import DetectionResult
|
||||||
|
|
||||||
|
captured_alerts: list = []
|
||||||
|
scheduler_events: list[str] = []
|
||||||
|
|
||||||
|
class FakeFanout:
|
||||||
|
def __init__(self, *a, **kw): pass
|
||||||
|
def send(self, alert): captured_alerts.append(alert)
|
||||||
|
def stop(self): pass
|
||||||
|
def stats(self): return {}
|
||||||
|
|
||||||
|
class FakeCanaryResult:
|
||||||
|
distance = 0
|
||||||
|
drifted = False
|
||||||
|
paused = False
|
||||||
|
|
||||||
|
class FakeCanary:
|
||||||
|
def __init__(self, *a, **kw): self.is_paused = False
|
||||||
|
def check(self, frame): return FakeCanaryResult()
|
||||||
|
def resume(self): pass
|
||||||
|
|
||||||
|
# Scheduler tracks start/stop calls
|
||||||
|
class FakeScheduler:
|
||||||
|
def __init__(self, *a, **kw):
|
||||||
|
self.is_running = False
|
||||||
|
self.interval_s = None
|
||||||
|
def start(self, interval_s):
|
||||||
|
self.is_running = True
|
||||||
|
self.interval_s = interval_s
|
||||||
|
scheduler_events.append(f"start:{interval_s}")
|
||||||
|
def stop(self):
|
||||||
|
self.is_running = False
|
||||||
|
scheduler_events.append("stop")
|
||||||
|
async def run(self):
|
||||||
|
await asyncio.sleep(9999)
|
||||||
|
|
||||||
|
class FakePoller:
|
||||||
|
def __init__(self, *a, **kw): pass
|
||||||
|
async def run(self): await asyncio.sleep(9999)
|
||||||
|
|
||||||
|
class _StopLoop(Exception): pass
|
||||||
|
|
||||||
|
class ScriptedDetector:
|
||||||
|
# turquoise→ARM, dark_green→PRIME, light_green→FIRE
|
||||||
|
_script = [
|
||||||
|
("turquoise", True),
|
||||||
|
("dark_green", True),
|
||||||
|
("light_green", True),
|
||||||
|
]
|
||||||
|
def __init__(self, *a, **kw): self._i = 0
|
||||||
|
def step(self, ts, frame=None):
|
||||||
|
if self._i >= len(self._script):
|
||||||
|
raise _StopLoop
|
||||||
|
color, accepted = self._script[self._i]
|
||||||
|
self._i += 1
|
||||||
|
return DetectionResult(ts=ts, window_found=True, dot_found=True,
|
||||||
|
rgb=(1, 1, 1), match=None, accepted=accepted, color=color)
|
||||||
|
@property
|
||||||
|
def rolling(self): return []
|
||||||
|
|
||||||
|
def fake_build_capture(cfg, capture_stub=False):
|
||||||
|
return lambda: np.zeros((50, 50, 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
cfg = MagicMock()
|
||||||
|
cfg.lockout_s = 60
|
||||||
|
cfg.heartbeat_min = 999
|
||||||
|
cfg.loop_interval_s = 0
|
||||||
|
cfg.config_version = "test"
|
||||||
|
cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
|
||||||
|
cfg.canary.drift_threshold = 10
|
||||||
|
cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10
|
||||||
|
cfg.chart_window_region = None
|
||||||
|
cfg.telegram.auto_poll_interval_s = 180
|
||||||
|
cfg.telegram.bot_token = "tok"
|
||||||
|
cfg.telegram.chat_id = "123"
|
||||||
|
cfg.telegram.allowed_chat_ids = ("123",)
|
||||||
|
|
||||||
|
fake_sched = FakeScheduler()
|
||||||
|
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
|
||||||
|
class _Stub:
|
||||||
|
def __init__(self, *a, **kw): pass
|
||||||
|
def log(self, *a, **kw): pass
|
||||||
|
def close(self, *a, **kw): pass
|
||||||
|
def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None)
|
||||||
|
|
||||||
|
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
|
||||||
|
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
|
||||||
|
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
|
||||||
|
monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
|
||||||
|
monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
|
||||||
|
monkeypatch.setattr("atm.audit.AuditLog", _Stub)
|
||||||
|
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
|
||||||
|
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
|
||||||
|
monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
|
||||||
|
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
|
||||||
|
|
||||||
|
with pytest.raises(_StopLoop):
|
||||||
|
await _main.run_live_async(cfg, duration_s=None)
|
||||||
|
|
||||||
|
arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
|
||||||
|
prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
|
||||||
|
trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]
|
||||||
|
|
||||||
|
assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
|
||||||
|
assert arm_alerts[0].direction == "BUY"
|
||||||
|
|
||||||
|
assert len(prime_alerts) == 1
|
||||||
|
assert prime_alerts[0].direction == "BUY"
|
||||||
|
|
||||||
|
assert len(trigger_alerts) == 1
|
||||||
|
assert trigger_alerts[0].direction == "BUY"
|
||||||
|
|
||||||
|
# Scheduler must have started (on PRIME) and stopped (on FIRE)
|
||||||
|
assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
|
||||||
|
assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
|
||||||
|
start_idx = scheduler_events.index("start:180")
|
||||||
|
stop_idx = scheduler_events.index("stop")
|
||||||
|
assert start_idx < stop_idx, "scheduler started after it stopped"
|
||||||
|
|||||||
Reference in New Issue
Block a user