From 4123b31a2261a145f37407835f055b0b3b505bd1 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Fri, 17 Apr 2026 10:18:08 +0000 Subject: [PATCH] feat(commands,scheduler): TelegramPoller + ScreenshotScheduler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TelegramPoller: httpx async long-poll, startup drain, chat_id filter, degrade after 3×401, Command dataclass with minute→second conversion. ScreenshotScheduler: asyncio task, capture+annotate in to_thread (decisions 9+13), silent=True on periodic screenshots, explicit constructor params. Co-Authored-By: Claude Sonnet 4.6 --- src/atm/commands.py | 163 +++++++++++++++++++++++++++++++++++++++++++ src/atm/scheduler.py | 118 +++++++++++++++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 src/atm/commands.py create mode 100644 src/atm/scheduler.py diff --git a/src/atm/commands.py b/src/atm/commands.py new file mode 100644 index 0000000..ca37977 --- /dev/null +++ b/src/atm/commands.py @@ -0,0 +1,163 @@ +"""Telegram command poller + Command dataclass. + +Uses httpx (async) for long-polling getUpdates. The sync TelegramNotifier +continues to use requests — this module is the only httpx consumer. +""" +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, Literal + +import httpx + +if TYPE_CHECKING: + from .config import TelegramCfg + +logger = logging.getLogger(__name__) + +CommandAction = Literal["set_interval", "stop", "status", "ss"] + +_BASE = "https://api.telegram.org/bot{token}/{method}" + + +@dataclass +class Command: + action: CommandAction + value: int | None = None # seconds; only for set_interval + + +class TelegramPoller: + """Long-poll Telegram getUpdates, emit Commands into asyncio.Queue. + + Security: rejects messages from chat_ids not in cfg.allowed_chat_ids. + Degrades (stops polling) after 3 consecutive 401 responses and warns + via Discord (caller responsibility — poller only logs + sets degraded flag). + """ + + def __init__( + self, + cfg: TelegramCfg, + cmd_queue: asyncio.Queue[Command], + audit, # _AuditLike + ) -> None: + self._cfg = cfg + self._cmd_queue = cmd_queue + self._audit = audit + self._offset = 0 + self._consecutive_401 = 0 + self._degraded = False + # fallback: if allowed_chat_ids is empty, accept only the primary chat + self._allowed = set(cfg.allowed_chat_ids) or {cfg.chat_id} + + @property + def degraded(self) -> bool: + return self._degraded + + async def run(self) -> None: + async with httpx.AsyncClient() as client: + await self._drain(client) + while True: + if self._degraded: + await asyncio.sleep(5) + continue + try: + await self._poll_once(client) + except asyncio.CancelledError: + raise + except (httpx.HTTPError, httpx.TimeoutException) as exc: + self._audit.log({"event": "poller_error", "error": str(exc)}) + await asyncio.sleep(5) + except Exception as exc: # json, unexpected + self._audit.log({"event": "poller_error", "error": str(exc)}) + await asyncio.sleep(5) + + async def _drain(self, client: httpx.AsyncClient) -> None: + """Discard all pending updates at startup so stale commands don't replay.""" + try: + resp = await client.get( + _BASE.format(token=self._cfg.bot_token, method="getUpdates"), + params={"timeout": 0, "offset": self._offset}, + timeout=10, + ) + body = resp.json() + if body.get("ok") and body.get("result"): + self._offset = body["result"][-1]["update_id"] + 1 + except Exception as exc: + logger.warning("TelegramPoller startup drain failed: %s", exc) + + async def _poll_once(self, client: httpx.AsyncClient) -> None: + resp = await client.get( + _BASE.format(token=self._cfg.bot_token, method="getUpdates"), + params={"timeout": self._cfg.poll_timeout_s, "offset": self._offset}, + timeout=self._cfg.poll_timeout_s + 5, + ) + + if resp.status_code == 401: + self._consecutive_401 += 1 + if self._consecutive_401 >= 3: + self._degraded = True + self._audit.log({"event": "poller_degraded", "reason": "3_consecutive_401"}) + return + self._consecutive_401 = 0 + + body = resp.json() + if not body.get("ok"): + return + + for update in body.get("result", []): + self._offset = update["update_id"] + 1 + await self._process_update(update) + + async def _process_update(self, update: dict) -> None: + if "callback_query" in update: + # Inline button pressed — may be expired; reply with fallback + cbq = update["callback_query"] + chat_id = str(cbq.get("from", {}).get("id", "")) + if chat_id not in self._allowed: + logger.info("Rejected callback_query from chat_id=%s", chat_id) + return + # Caller handles answerCallbackQuery; just note in audit + self._audit.log({"event": "command_received", "action": "callback_query", "chat_id": chat_id}) + return + + msg = update.get("message") or update.get("edited_message") + if not msg: + return + + chat_id = str(msg.get("chat", {}).get("id", "")) + if chat_id not in self._allowed: + logger.info("Rejected message from chat_id=%s", chat_id) + return + + text = (msg.get("text") or "").strip().lower() + cmd = self._parse_command(text) + if cmd is None: + return + + self._audit.log({ + "event": "command_received", + "action": cmd.action, + "value": cmd.value, + "chat_id": chat_id, + }) + await self._cmd_queue.put(cmd) + + def _parse_command(self, text: str) -> Command | None: + t = text.lstrip("/").strip() + if not t: + return None + if t == "stop": + return Command(action="stop") + if t == "status": + return Command(action="status") + if t in ("ss", "screenshot"): + return Command(action="ss") + # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted + parts = t.split() + if len(parts) == 1 and parts[0].isdigit(): + return Command(action="set_interval", value=int(parts[0]) * 60) + if len(parts) == 2 and parts[0] in ("interval", "set_interval") and parts[1].isdigit(): + return Command(action="set_interval", value=int(parts[1]) * 60) + return None diff --git a/src/atm/scheduler.py b/src/atm/scheduler.py new file mode 100644 index 0000000..2757e6f --- /dev/null +++ b/src/atm/scheduler.py @@ -0,0 +1,118 @@ +"""ScreenshotScheduler — periodic capture + annotate + send. + +Runs as an asyncio task. capture() and cv2 work execute in asyncio.to_thread +to avoid blocking the event loop. Decision 13: scheduler calls capture() +directly, NOT via Detector. +""" +from __future__ import annotations + +import asyncio +import logging +import time +from pathlib import Path +from typing import Callable + +from .notifier import Alert + +logger = logging.getLogger(__name__) + + +class ScreenshotScheduler: + """Periodic screenshot sender. + + Constructor params are explicit (decision 11 outside-voice finding). + """ + + def __init__( + self, + capture: Callable, # () -> ndarray | None + save_fn: Callable, # (frame, label, now) -> Path | None + notifier, # _NotifierLike + audit, # _AuditLike + interval_s: int | None = None, + ) -> None: + self._capture = capture + self._save_fn = save_fn + self._notifier = notifier + self._audit = audit + self._interval_s = interval_s + self._is_running = False + self._next_due: float | None = None # monotonic + + # ------------------------------------------------------------------ + # Public state + # ------------------------------------------------------------------ + + @property + def is_running(self) -> bool: + return self._is_running + + @property + def interval_s(self) -> int | None: + return self._interval_s + + @property + def next_due(self) -> float | None: + return self._next_due + + # ------------------------------------------------------------------ + # Control (called from async event loop) + # ------------------------------------------------------------------ + + def start(self, interval_s: int) -> None: + self._interval_s = interval_s + self._is_running = True + self._next_due = time.monotonic() + interval_s + + def stop(self) -> None: + self._is_running = False + self._next_due = None + + # ------------------------------------------------------------------ + # Task body + # ------------------------------------------------------------------ + + async def run(self) -> None: + """Runs until cancelled.""" + while True: + await asyncio.sleep(1) + if not self._is_running or self._next_due is None: + continue + if time.monotonic() >= self._next_due: + await self._take_screenshot() + if self._is_running and self._interval_s is not None: + self._next_due = time.monotonic() + self._interval_s + + async def _take_screenshot(self) -> None: + now = time.time() + try: + frame = await asyncio.to_thread(self._capture) + except Exception as exc: + logger.warning("ScreenshotScheduler capture failed: %s", exc) + self._audit.log({"ts": now, "event": "screenshot_sent", "status": "capture_failed", "error": str(exc)}) + self._notifier.send(Alert( + kind="warn", + title="Captură eșuată — verificați fereastra TradeStation", + body="", + silent=True, + )) + return + + if frame is None: + self._notifier.send(Alert( + kind="warn", + title="Captură eșuată — verificați fereastra TradeStation", + body="", + silent=True, + )) + return + + path = await asyncio.to_thread(self._save_fn, frame, "poll", now) + self._audit.log({"ts": now, "event": "screenshot_sent", "path": str(path) if path else None}) + self._notifier.send(Alert( + kind="screenshot", + title="Screenshot periodic", + body="", + image_path=path, + silent=True, + ))