Compare commits

...

11 Commits

Author SHA1 Message Date
153196f762 chore(git): track logs dir; ignore runtime state files
Add logs/.gitkeep to track directory structure. Extend .gitignore with
logs/fires, logs/pause.flag, logs/detections/, and configs/current.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 10:10:21 +03:00
Claude Agent
3b40aed939 fix(run): isolate command dispatch exceptions from detection loop
Any exception in _dispatch_command (status, ss, etc.) was leaking out of the
asyncio.QueueEmpty try/except, crashing _detection_loop and cancelling the
poller — making the bot permanently unresponsive for the rest of the session.

Separate the queue-empty check from the dispatch into two try blocks.
Dispatch errors now log to audit + print to terminal + send a Telegram warn.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 11:29:42 +00:00
Claude Agent
0f7dd5dc84 fix(deps+tests): move httpx to prod deps; stub Poller+Scheduler in sync test
httpx was in dev deps only, causing ImportError for users doing `pip install -e .`
since atm.commands imports httpx at module level. Moved to main dependencies.

Also stubs TelegramPoller and ScreenshotScheduler in the sync catchup test to
prevent flaky CI failures from attempted real network connections.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 11:00:40 +00:00
Claude Agent
63642e71dd chore(todos): mark integration test done
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:54:24 +00:00
Claude Agent
424437ceaf fix(audit)+test: deadlock fix + lifecycle test + pytest-asyncio
AuditLog deadlock: log() held self._lock and called _open() which called
close() which tried to acquire self._lock again — RLock not needed,
refactored to _close_locked() (called while already holding lock).

pyproject.toml: pytest-asyncio + httpx in dev deps.

test_main.py:
- lifecycle integration test (MUST-HAVE): IDLE→ARMED→PRIMED→auto-poll
  starts→FIRE→auto-poll stops, asserts scheduler event order
- asyncio import for async test marker

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:54:10 +00:00
Claude Agent
ca6e578175 feat(run): async refactor — run_live_async + 7-step shutdown
run_live() is now a thin asyncio.run() wrapper. run_live_async():
- Blocking pipeline (capture→canary→detect→_handle_tick→snapshot) in
  asyncio.to_thread() per decision 1 (_sync_detection_tick function)
- TelegramPoller + ScreenshotScheduler as background asyncio tasks
- asyncio.Queue[Command] for inter-task communication
- Auto-start scheduler on PRIMED, auto-stop on fire/cooled/phase_skip
- 7-step graceful shutdown sequence
- heartbeat_due uses time.monotonic() (prevents immediate-fire regression)
- Status command: FSM state, last detection, uptime, fire count, canary health
- "ss" command: one-shot capture+annotate+send via to_thread
- Price overlay in _save_annotated_frame (dot_pos_abs + canary_ok params)
- test_main.py: ScriptedDetector.step(ts, frame=None) for zero regression

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:37:17 +00:00
Claude Agent
4123b31a22 feat(commands,scheduler): TelegramPoller + ScreenshotScheduler
TelegramPoller: httpx async long-poll, startup drain, chat_id filter,
degrade after 3×401, Command dataclass with minute→second conversion.

ScreenshotScheduler: asyncio task, capture+annotate in to_thread (decisions 9+13),
silent=True on periodic screenshots, explicit constructor params.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:18:08 +00:00
Claude Agent
c1b89ad6a9 feat(config,detector): TelegramCfg polling fields + Detector.step optional frame
TelegramCfg gains allowed_chat_ids (default: [chat_id]), poll_timeout_s=30,
auto_poll_interval_s=180. _from_dict reads from TOML; absent key defaults to
primary chat_id so existing configs need no changes.

Detector.step(ts, frame=None): when frame is provided the capture() call is
skipped — async loop pre-captures once, shares frame between canary+detection.
DetectionResult.dot_pos_abs carries absolute (x,y) for price overlay.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:17:17 +00:00
Claude Agent
fd04fcd5e6 fix(audit): threading.Lock on AuditLog.log + close (P1 bug)
detection thread and async heartbeat call log() concurrently.
Without a lock, two threads can both see today != _current_date
and double-open the file, corrupting the handle.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:16:28 +00:00
Claude Agent
c6714e8d5e feat(notifier): Alert.silent + TelegramNotifier disable_notification
Silent screenshots for periodic auto-poll — Telegram param
disable_notification=True suppresses phone notification sound.
Discord ignores the field (no equivalent).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:16:17 +00:00
Claude Agent
238243b1ce chore: add gstack skill routing rules to CLAUDE.md 2026-04-17 08:32:50 +00:00
14 changed files with 830 additions and 120 deletions

6
.gitignore vendored
View File

@@ -46,14 +46,18 @@ ENV/
# ATM runtime artefacts
logs/*.jsonl
logs/dead_letter.jsonl
logs/detections/
logs/fires
logs/pause.flag
samples/*.png
samples/*.jpg
samples/labels.json
trades.jsonl
# configs: keep template + current marker, not generated calibration
# configs: keep template only; ignore generated calibration and runtime state
configs/*.toml
!configs/example.toml
configs/current.txt
# Claude scheduler state
.claude/

35
CLAUDE.md Normal file
View File

@@ -0,0 +1,35 @@
# ATM — Automated Trading Monitor
Personal Faza-1 tool for the M2D strategy. Python 3.11+.
## Quick Reference
```bash
pip install -e ".[windows]" # Windows: live capture
pip install -e . # Linux/macOS: dev/dryrun only
atm calibrate # Tk wizard
atm debug --delay 5 # one-shot capture + detect
atm run --start-at 16:30 --stop-at 23:00 # live session
atm dryrun samples # corpus gate
pytest # run tests
```
## Skill routing
When the user's request matches an available skill, ALWAYS invoke it using the Skill
tool as your FIRST action. Do NOT answer directly, do NOT use other tools first.
The skill has specialized workflows that produce better results than ad-hoc answers.
Key routing rules:
- Product ideas, "is this worth building", brainstorming → invoke office-hours
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
- Ship, deploy, push, create PR → invoke ship
- QA, test the site, find bugs → invoke qa
- Code review, check my diff → invoke review
- Update docs after shipping → invoke document-release
- Weekly retro → invoke retro
- Design system, brand → invoke design-consultation
- Visual audit, design polish → invoke design-review
- Architecture review → invoke plan-eng-review
- Save progress, checkpoint, resume → invoke checkpoint
- Code quality, health check → invoke health

View File

@@ -49,9 +49,17 @@ Read-only web view of today's audit JSONL + recent triggers. Useful for review a
---
## P2-yaxis-recalib-detect — Y-axis recalibration detection
Price overlay (from Telegram commands feature) uses `y_axis` linear interpolation to show current price on screenshots. When the user rescales the chart y-axis (common after overnight price gaps), the calibration becomes stale and prices shown are incorrect. Canary check detects layout drift but NOT y-axis range changes.
- Possible approaches: OCR on y-axis labels (fragile), track price range consistency across sessions, or simple "calibration age" warning after N hours.
- Start after price overlay is live and the false-price frequency is known.
- Depends on: Telegram commands + price overlay feature being shipped.
## Quality debt
- [ ] **Integration test for run_live loop**: currently mocked at module level. Add a short-duration in-memory loop test that threads real detector/state_machine/audit together (no network).
- [x] **Integration test for run_live loop**: lifecycle async test added in `tests/test_main.py` (IDLE→ARMED→PRIMED auto-poll→FIRE auto-stop).
- [ ] **Coverage report**: run `pytest --cov=atm --cov-report=term-missing`, aim for ≥ 85% per module.
- [ ] **Typing strictness**: run `pyright src/` with strict mode, fix reported issues.
- [ ] **Perf baseline**: profile one detection cycle on a representative frame; ensure < 100ms so 5s loop has ample headroom.

0
logs/.gitkeep Normal file
View File

View File

@@ -13,6 +13,7 @@ dependencies = [
"pillow>=10.0",
"requests>=2.31",
"rich>=13.0",
"httpx>=0.27",
]
[project.optional-dependencies]
@@ -24,6 +25,7 @@ windows = [
dev = [
"pytest>=8.0",
"pytest-cov>=5.0",
"pytest-asyncio>=0.23",
]
[project.scripts]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import threading
from datetime import datetime, date
from pathlib import Path
from typing import Callable, IO
@@ -16,21 +17,25 @@ class AuditLog:
self._clock: Callable[[], datetime] = clock or datetime.now
self._current_date: date | None = None
self._fh: IO[str] | None = None
self._lock = threading.Lock()
def log(self, event: dict) -> None:
now = self._clock()
today = now.date()
if today != self._current_date:
self._open(today)
if "ts" not in event:
event = {**event, "ts": now.isoformat()}
with self._lock:
if today != self._current_date:
self._open(today)
assert self._fh is not None
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
def close(self) -> None:
with self._lock:
self._close_locked()
def _close_locked(self) -> None:
"""Close file handle; must be called while holding self._lock."""
if self._fh is not None:
try:
self._fh.close()
@@ -47,7 +52,7 @@ class AuditLog:
return self._base_dir / f"{self._current_date}.jsonl"
def _open(self, today: date) -> None:
self.close()
self._close_locked() # already holding self._lock
self._base_dir.mkdir(parents=True, exist_ok=True)
path = self._base_dir / f"{today}.jsonl"
self._fh = open(path, "a", buffering=1, encoding="utf-8")

163
src/atm/commands.py Normal file
View File

@@ -0,0 +1,163 @@
"""Telegram command poller + Command dataclass.
Uses httpx (async) for long-polling getUpdates. The sync TelegramNotifier
continues to use requests — this module is the only httpx consumer.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal
import httpx
if TYPE_CHECKING:
from .config import TelegramCfg
logger = logging.getLogger(__name__)
CommandAction = Literal["set_interval", "stop", "status", "ss"]
_BASE = "https://api.telegram.org/bot{token}/{method}"
@dataclass
class Command:
action: CommandAction
value: int | None = None # seconds; only for set_interval
class TelegramPoller:
"""Long-poll Telegram getUpdates, emit Commands into asyncio.Queue.
Security: rejects messages from chat_ids not in cfg.allowed_chat_ids.
Degrades (stops polling) after 3 consecutive 401 responses and warns
via Discord (caller responsibility — poller only logs + sets degraded flag).
"""
def __init__(
self,
cfg: TelegramCfg,
cmd_queue: asyncio.Queue[Command],
audit, # _AuditLike
) -> None:
self._cfg = cfg
self._cmd_queue = cmd_queue
self._audit = audit
self._offset = 0
self._consecutive_401 = 0
self._degraded = False
# fallback: if allowed_chat_ids is empty, accept only the primary chat
self._allowed = set(cfg.allowed_chat_ids) or {cfg.chat_id}
@property
def degraded(self) -> bool:
return self._degraded
async def run(self) -> None:
async with httpx.AsyncClient() as client:
await self._drain(client)
while True:
if self._degraded:
await asyncio.sleep(5)
continue
try:
await self._poll_once(client)
except asyncio.CancelledError:
raise
except (httpx.HTTPError, httpx.TimeoutException) as exc:
self._audit.log({"event": "poller_error", "error": str(exc)})
await asyncio.sleep(5)
except Exception as exc: # json, unexpected
self._audit.log({"event": "poller_error", "error": str(exc)})
await asyncio.sleep(5)
async def _drain(self, client: httpx.AsyncClient) -> None:
"""Discard all pending updates at startup so stale commands don't replay."""
try:
resp = await client.get(
_BASE.format(token=self._cfg.bot_token, method="getUpdates"),
params={"timeout": 0, "offset": self._offset},
timeout=10,
)
body = resp.json()
if body.get("ok") and body.get("result"):
self._offset = body["result"][-1]["update_id"] + 1
except Exception as exc:
logger.warning("TelegramPoller startup drain failed: %s", exc)
async def _poll_once(self, client: httpx.AsyncClient) -> None:
resp = await client.get(
_BASE.format(token=self._cfg.bot_token, method="getUpdates"),
params={"timeout": self._cfg.poll_timeout_s, "offset": self._offset},
timeout=self._cfg.poll_timeout_s + 5,
)
if resp.status_code == 401:
self._consecutive_401 += 1
if self._consecutive_401 >= 3:
self._degraded = True
self._audit.log({"event": "poller_degraded", "reason": "3_consecutive_401"})
return
self._consecutive_401 = 0
body = resp.json()
if not body.get("ok"):
return
for update in body.get("result", []):
self._offset = update["update_id"] + 1
await self._process_update(update)
async def _process_update(self, update: dict) -> None:
if "callback_query" in update:
# Inline button pressed — may be expired; reply with fallback
cbq = update["callback_query"]
chat_id = str(cbq.get("from", {}).get("id", ""))
if chat_id not in self._allowed:
logger.info("Rejected callback_query from chat_id=%s", chat_id)
return
# Caller handles answerCallbackQuery; just note in audit
self._audit.log({"event": "command_received", "action": "callback_query", "chat_id": chat_id})
return
msg = update.get("message") or update.get("edited_message")
if not msg:
return
chat_id = str(msg.get("chat", {}).get("id", ""))
if chat_id not in self._allowed:
logger.info("Rejected message from chat_id=%s", chat_id)
return
text = (msg.get("text") or "").strip().lower()
cmd = self._parse_command(text)
if cmd is None:
return
self._audit.log({
"event": "command_received",
"action": cmd.action,
"value": cmd.value,
"chat_id": chat_id,
})
await self._cmd_queue.put(cmd)
def _parse_command(self, text: str) -> Command | None:
t = text.lstrip("/").strip()
if not t:
return None
if t == "stop":
return Command(action="stop")
if t == "status":
return Command(action="status")
if t in ("ss", "screenshot"):
return Command(action="ss")
# "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
parts = t.split()
if len(parts) == 1 and parts[0].isdigit():
return Command(action="set_interval", value=int(parts[0]) * 60)
if len(parts) == 2 and parts[0] in ("interval", "set_interval") and parts[1].isdigit():
return Command(action="set_interval", value=int(parts[1]) * 60)
return None

View File

@@ -78,6 +78,9 @@ class DiscordCfg:
class TelegramCfg:
bot_token: str
chat_id: str
allowed_chat_ids: tuple[str, ...] = ()
poll_timeout_s: int = 30
auto_poll_interval_s: int = 180
def __post_init__(self) -> None:
if not self.bot_token or not self.chat_id:
@@ -156,9 +159,14 @@ class Config:
drift_threshold=int(data["canary"].get("drift_threshold", 8)),
)
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
tg = data["telegram"]
_allowed = [str(c) for c in tg.get("allowed_chat_ids", [])] or [str(tg["chat_id"])]
telegram = TelegramCfg(
bot_token=data["telegram"]["bot_token"],
chat_id=str(data["telegram"]["chat_id"]),
bot_token=tg["bot_token"],
chat_id=str(tg["chat_id"]),
allowed_chat_ids=tuple(_allowed),
poll_timeout_s=int(tg.get("poll_timeout_s", 30)),
auto_poll_interval_s=int(tg.get("auto_poll_interval_s", 180)),
)
opts = data.get("options", {})
region = None

View File

@@ -28,6 +28,7 @@ class DetectionResult:
match: ColorMatch | None # None if no dot
accepted: bool # post-debounce; True only when match repeats debounce_depth times
color: str | None # accepted color name (UNKNOWN excluded)
dot_pos_abs: tuple[int, int] | None = None # absolute (x, y) in frame; set when dot_found
class Detector:
@@ -60,7 +61,13 @@ class Detector:
self._debounce: deque[str | None] = deque(maxlen=cfg.debounce_depth)
self._rolling: deque[DetectionResult] = deque(maxlen=20)
def step(self, ts: float) -> DetectionResult:
def step(self, ts: float, frame=None) -> DetectionResult:
"""Run one detection tick.
frame: pre-captured BGR ndarray (from asyncio.to_thread capture). When
None (default), calls self._capture() — preserving the sync-loop behaviour.
"""
if frame is None:
frame = self._capture()
if frame is None:
@@ -117,6 +124,7 @@ class Detector:
match=match,
accepted=accepted,
color=color,
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y),
)
self._rolling.append(r)
return r

View File

@@ -2,12 +2,15 @@
from __future__ import annotations
import argparse
import asyncio
import contextlib
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Protocol, cast
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
from atm.config import Config # stdlib-only (tomllib); safe at module level
from atm.notifier import Alert
@@ -348,6 +351,8 @@ def _save_annotated_frame(
label: str,
now: float,
audit: _AuditLike | None = None,
dot_pos_abs: "tuple[int, int] | None" = None,
canary_ok: bool = True,
) -> "Path | None":
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
@@ -355,6 +360,10 @@ def _save_annotated_frame(
audit (when provided) so disk-full / permission issues don't become silent
regressions. Never raises — snapshot is a best-effort enhancement, the
text alert must still go out.
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
since calibration may be stale.
"""
try:
import cv2 # type: ignore[import-untyped]
@@ -371,6 +380,22 @@ def _save_annotated_frame(
annotated = frame.copy()
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
try:
_, dot_y = dot_pos_abs
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass # price overlay is best-effort; never break the screenshot
cv2.imwrite(str(fpath), annotated)
return fpath
except Exception as exc:
@@ -496,7 +521,113 @@ def _handle_tick(
return tr
@dataclass
class _TickSyncResult:
frame: Any = None
res: Any = None # DetectionResult | None
tr: Any = None # Transition | None
first_consumed: bool = False
late_start: bool = False
new_color: str | None = None # corpus sample color when changed
def _sync_detection_tick(
capture: Callable,
canary: Any,
cfg: Any,
detector: Any,
fsm: Any,
notifier: _NotifierLike,
audit: _AuditLike,
detection_log: _AuditLike,
fires_dir: Path,
first_accepted: bool,
last_saved_color: "str | None",
now: float,
samples_dir: Path,
) -> _TickSyncResult:
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread."""
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
return _TickSyncResult()
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return _TickSyncResult(frame=frame)
res = detector.step(now, frame)
detection_log.log({
"ts": now, "event": "frame",
"window_found": res.window_found,
"dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None,
"match_name": res.match.name if res.match is not None else None,
"distance": round(res.match.distance, 2) if res.match is not None else None,
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
"accepted": res.accepted,
"color": res.color,
})
if not (res.accepted and res.color):
return _TickSyncResult(frame=frame, res=res)
is_first = first_accepted
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
return None
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
)
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot)
if tr is None:
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True)
new_color: str | None = None
if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
try:
import cv2 # type: ignore[import-untyped]
cv2.imwrite(str(sample_path), frame)
except Exception as exc:
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
new_color = res.color
if tr.trigger and not tr.locked:
fire_path: "Path | None" = None
if cfg.attach_screenshots.trigger:
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
)
notifier.send(Alert(
kind="trigger",
title=f"Semnal {tr.trigger}",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path,
direction=tr.trigger,
))
return _TickSyncResult(
frame=frame, res=res, tr=tr,
first_consumed=is_first, new_color=new_color,
)
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
"""Sync entry point — delegates to asyncio event loop."""
asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub))
async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None:
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
try:
from atm.detector import Detector
@@ -506,6 +637,8 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
from atm.notifier.discord import DiscordNotifier
from atm.notifier.telegram import TelegramNotifier
from atm.audit import AuditLog
from atm.commands import TelegramPoller, Command
from atm.scheduler import ScreenshotScheduler
except ImportError as exc:
sys.exit(f"run-loop dependencies not available: {exc}")
@@ -521,7 +654,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
]
def _on_drop(backend_name: str, dropped: Alert) -> None:
"""Audit la depășire coadă — face eșecul silențios vizibil."""
audit.log({
"ts": time.time(),
"event": "queue_overflow_drop",
@@ -532,7 +664,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
# Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea.
# Initial frame + canary check
first_frame = capture()
if first_frame is None:
print("WARN: first capture returned None — window/region missing", flush=True)
@@ -542,9 +674,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
if first_check.drifted:
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
canary.resume() # clear the auto-pause so user can Ctrl+C and fix
canary.resume()
dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
notifier.send(Alert(
kind="heartbeat",
title="ATM pornit",
@@ -556,106 +688,42 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
start = time.monotonic()
heartbeat_due = time.time() + cfg.heartbeat_min * 60
levels_extractor = None
last_saved_color: str | None = None
first_accepted = True
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
samples_dir = Path("samples")
samples_dir.mkdir(exist_ok=True)
fires_dir = Path("logs") / "fires"
fires_dir.mkdir(parents=True, exist_ok=True)
import cv2 # type: ignore[import-untyped]
try:
while duration_s is None or (time.monotonic() - start) < duration_s:
now = time.time()
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
time.sleep(cfg.loop_interval_s)
continue
# canary check
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
time.sleep(cfg.loop_interval_s)
continue
# detection
res = detector.step(now)
detection_log.log({
"ts": now,
"event": "frame",
"window_found": res.window_found,
"dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None,
"match_name": res.match.name if res.match is not None else None,
"distance": round(res.match.distance, 2) if res.match is not None else None,
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
"accepted": res.accepted,
"color": res.color,
})
if res.accepted and res.color:
is_first = first_accepted
first_accepted = False
import cv2 # noqa: F401 fail fast if cv2 is missing # type: ignore[import-untyped]
except ImportError:
pass
# Per-iteration closure — binds current frame/now, gates on config.
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
return None
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
)
tr = _handle_tick(
fsm, res.color, now, notifier, audit, is_first,
snapshot=_snapshot,
)
if tr is None:
# pornire târzie: FSM neatins, sari peste FIRE + salvare corpus
time.sleep(cfg.loop_interval_s)
continue
# corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară
if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
try:
cv2.imwrite(str(sample_path), frame)
except Exception as exc:
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
last_saved_color = res.color
# FIRE: adnotează frame-ul + salvează, atașează la alertă
if tr.trigger and not tr.locked:
fire_path: "Path | None" = None
if cfg.attach_screenshots.trigger:
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
)
notifier.send(Alert(
kind="trigger",
title=f"Semnal {tr.trigger}",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path,
direction=tr.trigger,
))
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
# phase-B levels
if levels_extractor is not None:
lr = levels_extractor.step(frame, now)
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
notifier.send(Alert(
kind="levels",
title="Niveluri",
body=(
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} "
f"TP2={lr.levels.tp2}"
),
))
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
first_accepted = True
last_saved_color: str | None = None
levels_extractor = None
# heartbeat — include statistici per-backend ca eșecurile silențioase
# să apară la fiecare 30 min fără să aștepte oprirea.
if time.time() > heartbeat_due:
fire_count = 0
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
scheduler = ScreenshotScheduler(
capture=capture,
save_fn=_bound_save,
notifier=notifier,
audit=audit,
)
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
# ------------------------------------------------------------------
# Nested async coroutines — capture nonlocal state from run_live_async
# ------------------------------------------------------------------
async def _heartbeat_loop() -> None:
nonlocal heartbeat_due
while True:
await asyncio.sleep(60)
if time.monotonic() > heartbeat_due:
try:
stats = notifier.stats()
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
@@ -668,9 +736,151 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
except Exception:
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
heartbeat_due = time.time() + cfg.heartbeat_min * 60
time.sleep(cfg.loop_interval_s)
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
async def _dispatch_command(cmd: Command) -> None:
nonlocal fire_count
if cmd.action == "set_interval":
secs = cmd.value or cfg.telegram.auto_poll_interval_s
scheduler.start(secs)
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
elif cmd.action == "stop":
if scheduler.is_running:
scheduler.stop()
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
notifier.send(Alert(kind="status", title="Polling oprit", body=""))
else:
notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
elif cmd.action == "status":
uptime_s = time.monotonic() - start
last_roll = detector.rolling[-1] if detector.rolling else None
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else ""
last_color = (
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else ""
) if last_roll else ""
sched_info = (
f"activ @{scheduler.interval_s // 60}min" if scheduler.interval_s else "activ"
) if scheduler.is_running else "oprit"
canary_info = "drift (pauze)" if canary.is_paused else "ok"
body = (
f"Stare: {fsm.state.value}\n"
f"Ultima detecție: {last_color} (conf {last_conf})\n"
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {fire_count}\n"
f"Poller: {sched_info} | Canary: {canary_info}"
)
notifier.send(Alert(kind="status", title="ATM Status", body=body))
elif cmd.action == "ss":
now_ss = time.time()
frame_ss = await asyncio.to_thread(capture)
if frame_ss is None:
notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
))
return
path_ss = await asyncio.to_thread(
_save_annotated_frame, frame_ss, cfg, fires_dir, "ss", now_ss, audit,
)
audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
async def _detection_loop() -> None:
nonlocal first_accepted, last_saved_color, levels_extractor, fire_count
while True:
if duration_s is not None and (time.monotonic() - start) >= duration_s:
break
now = time.time()
result: _TickSyncResult = await asyncio.to_thread(
_sync_detection_tick,
capture, canary, cfg, detector, fsm, notifier, audit, detection_log,
fires_dir, first_accepted, last_saved_color, now, samples_dir,
)
if result.first_consumed:
first_accepted = False
if result.new_color is not None:
last_saved_color = result.new_color
tr = result.tr
res = result.res
if result.late_start or res is None:
await asyncio.sleep(cfg.loop_interval_s)
continue
if tr is not None and res.accepted and res.color:
if tr.reason == "prime" and not scheduler.is_running:
scheduler.start(cfg.telegram.auto_poll_interval_s)
audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and scheduler.is_running:
scheduler.stop()
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr is not None and tr.trigger and not tr.locked:
fire_count += 1
if scheduler.is_running:
scheduler.stop()
audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
if levels_extractor is not None and result.frame is not None:
lr = levels_extractor.step(result.frame, now)
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
notifier.send(Alert(
kind="levels",
title="Niveluri",
body=(
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} "
f"TP2={lr.levels.tp2}"
),
))
levels_extractor = None
while True:
try:
cmd = cmd_queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
await _dispatch_command(cmd)
except Exception as _cmd_exc:
_msg = f"/{cmd.action}: {_cmd_exc}"
audit.log({"ts": time.time(), "event": "command_error", "action": cmd.action, "error": str(_cmd_exc)})
print(f"ERR command_dispatch {_msg}", flush=True)
notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(_cmd_exc)))
await asyncio.sleep(cfg.loop_interval_s)
# Launch background tasks
t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler")
t_poller = asyncio.create_task(poller.run(), name="poller")
t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat")
try:
await _detection_loop()
finally:
# 7-step graceful shutdown
# 1. cancel scheduler
t_scheduler.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await t_scheduler
# 2. cancel poller
t_poller.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await t_poller
# 3. cancel heartbeat
t_heartbeat.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await t_heartbeat
# 4. drain detection — complete (we awaited _detection_loop directly)
# 5. send shutdown alert
try:
stats = notifier.stats()
lines = [f"după {time.monotonic() - start:.0f}s"]
@@ -679,13 +889,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
f"{name}: sent={s['sent']} failed={s['failed']} "
f"dropped={s['dropped']} retries={s['retries']}"
)
notifier.send(Alert(
kind="heartbeat", title="ATM oprit",
body="\n".join(lines),
))
notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
except Exception:
pass
# 6. notifier.stop() — flush + join FanoutNotifier threads
notifier.stop()
# 7. audit.close()
audit.close()
detection_log.close()

View File

@@ -5,11 +5,13 @@ from typing import Protocol
@dataclass
class Alert:
kind: str # "trigger" | "heartbeat" | "levels" | "warn" | "arm" | "prime" | "late_start"
# flat union: "trigger"|"heartbeat"|"levels"|"warn"|"arm"|"prime"|"late_start"|"screenshot"|"status"
kind: str
title: str
body: str
image_path: Path | None = None # annotated screenshot
direction: str | None = None # "BUY"/"SELL" when kind=trigger
silent: bool = False # disable_notification for Telegram; ignored by Discord
class Notifier(Protocol):

View File

@@ -33,6 +33,7 @@ class TelegramNotifier:
"chat_id": self._chat_id,
"caption": text,
"parse_mode": "HTML",
"disable_notification": str(alert.silent).lower(),
},
files={"photo": fh},
timeout=10,
@@ -44,6 +45,7 @@ class TelegramNotifier:
"chat_id": self._chat_id,
"text": text,
"parse_mode": "HTML",
"disable_notification": alert.silent,
},
timeout=10,
)

118
src/atm/scheduler.py Normal file
View File

@@ -0,0 +1,118 @@
"""ScreenshotScheduler — periodic capture + annotate + send.
Runs as an asyncio task. capture() and cv2 work execute in asyncio.to_thread
to avoid blocking the event loop. Decision 13: scheduler calls capture()
directly, NOT via Detector.
"""
from __future__ import annotations
import asyncio
import logging
import time
from pathlib import Path
from typing import Callable
from .notifier import Alert
logger = logging.getLogger(__name__)
class ScreenshotScheduler:
"""Periodic screenshot sender.
Constructor params are explicit (decision 11 outside-voice finding).
"""
def __init__(
self,
capture: Callable, # () -> ndarray | None
save_fn: Callable, # (frame, label, now) -> Path | None
notifier, # _NotifierLike
audit, # _AuditLike
interval_s: int | None = None,
) -> None:
self._capture = capture
self._save_fn = save_fn
self._notifier = notifier
self._audit = audit
self._interval_s = interval_s
self._is_running = False
self._next_due: float | None = None # monotonic
# ------------------------------------------------------------------
# Public state
# ------------------------------------------------------------------
@property
def is_running(self) -> bool:
return self._is_running
@property
def interval_s(self) -> int | None:
return self._interval_s
@property
def next_due(self) -> float | None:
return self._next_due
# ------------------------------------------------------------------
# Control (called from async event loop)
# ------------------------------------------------------------------
def start(self, interval_s: int) -> None:
self._interval_s = interval_s
self._is_running = True
self._next_due = time.monotonic() + interval_s
def stop(self) -> None:
self._is_running = False
self._next_due = None
# ------------------------------------------------------------------
# Task body
# ------------------------------------------------------------------
async def run(self) -> None:
"""Runs until cancelled."""
while True:
await asyncio.sleep(1)
if not self._is_running or self._next_due is None:
continue
if time.monotonic() >= self._next_due:
await self._take_screenshot()
if self._is_running and self._interval_s is not None:
self._next_due = time.monotonic() + self._interval_s
async def _take_screenshot(self) -> None:
now = time.time()
try:
frame = await asyncio.to_thread(self._capture)
except Exception as exc:
logger.warning("ScreenshotScheduler capture failed: %s", exc)
self._audit.log({"ts": now, "event": "screenshot_sent", "status": "capture_failed", "error": str(exc)})
self._notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
silent=True,
))
return
if frame is None:
self._notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
silent=True,
))
return
path = await asyncio.to_thread(self._save_fn, frame, "poll", now)
self._audit.log({"ts": now, "event": "screenshot_sent", "path": str(path) if path else None})
self._notifier.send(Alert(
kind="screenshot",
title="Screenshot periodic",
body="",
image_path=path,
silent=True,
))

View File

@@ -1,6 +1,7 @@
"""Tests for atm.main unified CLI."""
from __future__ import annotations
import asyncio
import os
import subprocess
import sys
@@ -186,7 +187,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
]
def __init__(self, *a, **kw):
self._i = 0
def step(self, ts):
def step(self, ts, frame=None):
if self._i >= len(self._script):
raise _StopLoop
color, accepted = self._script[self._i]
@@ -228,6 +229,17 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
def step(self, *a, **kw):
return types.SimpleNamespace(status="pending", levels=None)
class _StubPoller:
def __init__(self, *a, **kw): pass
async def run(self): await asyncio.sleep(9999)
class _StubScheduler:
def __init__(self, *a, **kw):
self.is_running = False
def start(self, interval_s): self.is_running = True
def stop(self): self.is_running = False
async def run(self): await asyncio.sleep(9999)
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
@@ -237,6 +249,8 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
with pytest.raises(_StopLoop):
_main.run_live(cfg, duration_s=None)
@@ -255,3 +269,135 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
assert len(trigger) == 1
assert trigger[0].direction == "SELL"
# ---------------------------------------------------------------------------
# MUST-HAVE: async lifecycle integration test
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
import numpy as np
import atm.main as _main
from atm.detector import DetectionResult
captured_alerts: list = []
scheduler_events: list[str] = []
class FakeFanout:
def __init__(self, *a, **kw): pass
def send(self, alert): captured_alerts.append(alert)
def stop(self): pass
def stats(self): return {}
class FakeCanaryResult:
distance = 0
drifted = False
paused = False
class FakeCanary:
def __init__(self, *a, **kw): self.is_paused = False
def check(self, frame): return FakeCanaryResult()
def resume(self): pass
# Scheduler tracks start/stop calls
class FakeScheduler:
def __init__(self, *a, **kw):
self.is_running = False
self.interval_s = None
def start(self, interval_s):
self.is_running = True
self.interval_s = interval_s
scheduler_events.append(f"start:{interval_s}")
def stop(self):
self.is_running = False
scheduler_events.append("stop")
async def run(self):
await asyncio.sleep(9999)
class FakePoller:
def __init__(self, *a, **kw): pass
async def run(self): await asyncio.sleep(9999)
class _StopLoop(Exception): pass
class ScriptedDetector:
# turquoise→ARM, dark_green→PRIME, light_green→FIRE
_script = [
("turquoise", True),
("dark_green", True),
("light_green", True),
]
def __init__(self, *a, **kw): self._i = 0
def step(self, ts, frame=None):
if self._i >= len(self._script):
raise _StopLoop
color, accepted = self._script[self._i]
self._i += 1
return DetectionResult(ts=ts, window_found=True, dot_found=True,
rgb=(1, 1, 1), match=None, accepted=accepted, color=color)
@property
def rolling(self): return []
def fake_build_capture(cfg, capture_stub=False):
return lambda: np.zeros((50, 50, 3), dtype=np.uint8)
cfg = MagicMock()
cfg.lockout_s = 60
cfg.heartbeat_min = 999
cfg.loop_interval_s = 0
cfg.config_version = "test"
cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
cfg.canary.drift_threshold = 10
cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10
cfg.chart_window_region = None
cfg.telegram.auto_poll_interval_s = 180
cfg.telegram.bot_token = "tok"
cfg.telegram.chat_id = "123"
cfg.telegram.allowed_chat_ids = ("123",)
fake_sched = FakeScheduler()
monkeypatch.chdir(tmp_path)
class _Stub:
def __init__(self, *a, **kw): pass
def log(self, *a, **kw): pass
def close(self, *a, **kw): pass
def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None)
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
monkeypatch.setattr("atm.audit.AuditLog", _Stub)
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
with pytest.raises(_StopLoop):
await _main.run_live_async(cfg, duration_s=None)
arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]
assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
assert arm_alerts[0].direction == "BUY"
assert len(prime_alerts) == 1
assert prime_alerts[0].direction == "BUY"
assert len(trigger_alerts) == 1
assert trigger_alerts[0].direction == "BUY"
# Scheduler must have started (on PRIME) and stopped (on FIRE)
assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
start_idx = scheduler_events.index("start:180")
stop_idx = scheduler_events.index("stop")
assert start_idx < stop_idx, "scheduler started after it stopped"