Compare commits
17 Commits
worktree-a
...
8bae507bbd
| Author | SHA1 | Date | |
|---|---|---|---|
| 8bae507bbd | |||
| 23865776e3 | |||
| 54f55752c1 | |||
| 8b53b8d3c9 | |||
| 9cf49caf8a | |||
| c5024ce600 | |||
| 153196f762 | |||
|
|
3b40aed939 | ||
|
|
0f7dd5dc84 | ||
|
|
63642e71dd | ||
|
|
424437ceaf | ||
|
|
ca6e578175 | ||
|
|
4123b31a22 | ||
|
|
c1b89ad6a9 | ||
|
|
fd04fcd5e6 | ||
|
|
c6714e8d5e | ||
|
|
238243b1ce |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -46,14 +46,18 @@ ENV/
|
||||
# ATM runtime artefacts
|
||||
logs/*.jsonl
|
||||
logs/dead_letter.jsonl
|
||||
logs/detections/
|
||||
logs/fires
|
||||
logs/pause.flag
|
||||
samples/*.png
|
||||
samples/*.jpg
|
||||
samples/labels.json
|
||||
trades.jsonl
|
||||
|
||||
# configs: keep template + current marker, not generated calibration
|
||||
# configs: keep template only; ignore generated calibration and runtime state
|
||||
configs/*.toml
|
||||
!configs/example.toml
|
||||
configs/current.txt
|
||||
|
||||
# Claude scheduler state
|
||||
.claude/
|
||||
|
||||
35
CLAUDE.md
Normal file
35
CLAUDE.md
Normal file
@@ -0,0 +1,35 @@
|
||||
# ATM — Automated Trading Monitor
|
||||
|
||||
Personal Faza-1 tool for the M2D strategy. Python 3.11+.
|
||||
|
||||
## Quick Reference
|
||||
|
||||
```bash
|
||||
pip install -e ".[windows]" # Windows: live capture
|
||||
pip install -e . # Linux/macOS: dev/dryrun only
|
||||
atm calibrate # Tk wizard
|
||||
atm debug --delay 5 # one-shot capture + detect
|
||||
atm run --start-at 16:30 --stop-at 23:00 # live session
|
||||
atm dryrun samples # corpus gate
|
||||
pytest # run tests
|
||||
```
|
||||
|
||||
## Skill routing
|
||||
|
||||
When the user's request matches an available skill, ALWAYS invoke it using the Skill
|
||||
tool as your FIRST action. Do NOT answer directly, do NOT use other tools first.
|
||||
The skill has specialized workflows that produce better results than ad-hoc answers.
|
||||
|
||||
Key routing rules:
|
||||
- Product ideas, "is this worth building", brainstorming → invoke office-hours
|
||||
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
|
||||
- Ship, deploy, push, create PR → invoke ship
|
||||
- QA, test the site, find bugs → invoke qa
|
||||
- Code review, check my diff → invoke review
|
||||
- Update docs after shipping → invoke document-release
|
||||
- Weekly retro → invoke retro
|
||||
- Design system, brand → invoke design-consultation
|
||||
- Visual audit, design polish → invoke design-review
|
||||
- Architecture review → invoke plan-eng-review
|
||||
- Save progress, checkpoint, resume → invoke checkpoint
|
||||
- Code quality, health check → invoke health
|
||||
10
TODOS.md
10
TODOS.md
@@ -49,9 +49,17 @@ Read-only web view of today's audit JSONL + recent triggers. Useful for review a
|
||||
|
||||
---
|
||||
|
||||
## P2-yaxis-recalib-detect — Y-axis recalibration detection
|
||||
|
||||
Price overlay (from Telegram commands feature) uses `y_axis` linear interpolation to show current price on screenshots. When the user rescales the chart y-axis (common after overnight price gaps), the calibration becomes stale and prices shown are incorrect. Canary check detects layout drift but NOT y-axis range changes.
|
||||
|
||||
- Possible approaches: OCR on y-axis labels (fragile), track price range consistency across sessions, or simple "calibration age" warning after N hours.
|
||||
- Start after price overlay is live and the false-price frequency is known.
|
||||
- Depends on: Telegram commands + price overlay feature being shipped.
|
||||
|
||||
## Quality debt
|
||||
|
||||
- [ ] **Integration test for run_live loop**: currently mocked at module level. Add a short-duration in-memory loop test that threads real detector/state_machine/audit together (no network).
|
||||
- [x] **Integration test for run_live loop**: lifecycle async test added in `tests/test_main.py` (IDLE→ARMED→PRIMED auto-poll→FIRE auto-stop).
|
||||
- [ ] **Coverage report**: run `pytest --cov=atm --cov-report=term-missing`, aim for ≥ 85% per module.
|
||||
- [ ] **Typing strictness**: run `pyright src/` with strict mode, fix reported issues.
|
||||
- [ ] **Perf baseline**: profile one detection cycle on a representative frame; ensure < 100ms so 5s loop has ample headroom.
|
||||
|
||||
@@ -81,6 +81,24 @@ low_conf_run = 3
|
||||
phaseb_timeout_s = 600
|
||||
dead_letter_path = "logs/dead_letter.jsonl"
|
||||
|
||||
# Alert-behavior toggles (not screenshot-attachment; see attach_screenshots below).
|
||||
# fire_on_phase_skip: emit a backstop "PHASE SKIP" alert when the FSM observes
|
||||
# ARMED → light_green/light_red directly (skipping the dark prime). Default on
|
||||
# because missing a fire is worse than a false-positive phase-skip alert.
|
||||
[options.alerts]
|
||||
fire_on_phase_skip = true
|
||||
|
||||
# Operating hours — detection only runs on allowed weekdays + HH:MM window.
|
||||
# Timezone is the source of truth (NYSE local); the runtime converts tick
|
||||
# timestamps to this zone so DST rollovers stay aligned with the exchange.
|
||||
# Override from CLI with --tz / --weekdays / --oh-start / --oh-stop.
|
||||
[options.operating_hours]
|
||||
enabled = false
|
||||
timezone = "America/New_York"
|
||||
weekdays = ["MON", "TUE", "WED", "THU", "FRI"]
|
||||
start_hhmm = "09:30"
|
||||
stop_hhmm = "16:00"
|
||||
|
||||
# Per-kind screenshot-attach toggles. All default to true on upgrade.
|
||||
# Accepts either a bare bool (legacy: attach_screenshots = true) or this table.
|
||||
[options.attach_screenshots]
|
||||
|
||||
0
logs/.gitkeep
Normal file
0
logs/.gitkeep
Normal file
@@ -13,6 +13,7 @@ dependencies = [
|
||||
"pillow>=10.0",
|
||||
"requests>=2.31",
|
||||
"rich>=13.0",
|
||||
"httpx>=0.27",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@@ -24,6 +25,7 @@ windows = [
|
||||
dev = [
|
||||
"pytest>=8.0",
|
||||
"pytest-cov>=5.0",
|
||||
"pytest-asyncio>=0.23",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import threading
|
||||
from datetime import datetime, date
|
||||
from pathlib import Path
|
||||
from typing import Callable, IO
|
||||
@@ -16,21 +17,25 @@ class AuditLog:
|
||||
self._clock: Callable[[], datetime] = clock or datetime.now
|
||||
self._current_date: date | None = None
|
||||
self._fh: IO[str] | None = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def log(self, event: dict) -> None:
|
||||
now = self._clock()
|
||||
today = now.date()
|
||||
|
||||
if today != self._current_date:
|
||||
self._open(today)
|
||||
|
||||
if "ts" not in event:
|
||||
event = {**event, "ts": now.isoformat()}
|
||||
|
||||
assert self._fh is not None
|
||||
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
|
||||
with self._lock:
|
||||
if today != self._current_date:
|
||||
self._open(today)
|
||||
assert self._fh is not None
|
||||
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
|
||||
|
||||
def close(self) -> None:
|
||||
with self._lock:
|
||||
self._close_locked()
|
||||
|
||||
def _close_locked(self) -> None:
|
||||
"""Close file handle; must be called while holding self._lock."""
|
||||
if self._fh is not None:
|
||||
try:
|
||||
self._fh.close()
|
||||
@@ -47,7 +52,7 @@ class AuditLog:
|
||||
return self._base_dir / f"{self._current_date}.jsonl"
|
||||
|
||||
def _open(self, today: date) -> None:
|
||||
self.close()
|
||||
self._close_locked() # already holding self._lock
|
||||
self._base_dir.mkdir(parents=True, exist_ok=True)
|
||||
path = self._base_dir / f"{today}.jsonl"
|
||||
self._fh = open(path, "a", buffering=1, encoding="utf-8")
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
"""Layout drift detector via perceptual hash comparison."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
import numpy as np
|
||||
|
||||
from .config import Config
|
||||
from .vision import crop_roi, hamming_hex, phash
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CanaryResult:
|
||||
@@ -28,10 +32,15 @@ class Canary:
|
||||
self,
|
||||
cfg: Config,
|
||||
pause_flag_path: Path | None = None,
|
||||
on_pause_callback: Callable[[int], None] | None = None,
|
||||
) -> None:
|
||||
self._cfg = cfg
|
||||
self._pause_flag_path = pause_flag_path
|
||||
self._paused = False
|
||||
# Single-shot callback invoked exactly once per not_paused→paused transition.
|
||||
# Wrapped in try/except at call site so a faulty notifier never breaks
|
||||
# the detection cycle.
|
||||
self._on_pause = on_pause_callback
|
||||
|
||||
def check(self, frame_bgr: np.ndarray) -> CanaryResult:
|
||||
roi_img = crop_roi(frame_bgr, self._cfg.canary.roi)
|
||||
@@ -43,6 +52,12 @@ class Canary:
|
||||
self._paused = True
|
||||
if self._pause_flag_path is not None:
|
||||
self._pause_flag_path.write_text("paused", encoding="utf-8")
|
||||
if self._on_pause is not None:
|
||||
try:
|
||||
self._on_pause(distance)
|
||||
except Exception as exc:
|
||||
# Never let a notifier hiccup abort the detection cycle.
|
||||
logger.warning("canary on_pause_callback raised: %s", exc)
|
||||
|
||||
return CanaryResult(distance=distance, drifted=drifted, paused=self._paused)
|
||||
|
||||
|
||||
170
src/atm/commands.py
Normal file
170
src/atm/commands.py
Normal file
@@ -0,0 +1,170 @@
|
||||
"""Telegram command poller + Command dataclass.
|
||||
|
||||
Uses httpx (async) for long-polling getUpdates. The sync TelegramNotifier
|
||||
continues to use requests — this module is the only httpx consumer.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
import httpx
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .config import TelegramCfg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume"]
|
||||
|
||||
_BASE = "https://api.telegram.org/bot{token}/{method}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Command:
|
||||
action: CommandAction
|
||||
value: int | None = None # seconds; only for set_interval
|
||||
|
||||
|
||||
class TelegramPoller:
    """Long-poll Telegram getUpdates, emit Commands into asyncio.Queue.

    Security: rejects messages from chat_ids not in cfg.allowed_chat_ids.
    Degrades (stops polling) after 3 consecutive 401 responses and warns
    via Discord (caller responsibility — poller only logs + sets degraded flag).
    """

    def __init__(
        self,
        cfg: TelegramCfg,
        cmd_queue: asyncio.Queue[Command],
        audit,  # _AuditLike
    ) -> None:
        self._cfg = cfg
        self._cmd_queue = cmd_queue
        self._audit = audit
        self._offset = 0  # getUpdates cursor: next update_id to request
        self._consecutive_401 = 0
        self._degraded = False
        # fallback: if allowed_chat_ids is empty, accept only the primary chat
        self._allowed = set(cfg.allowed_chat_ids) or {cfg.chat_id}

    @property
    def degraded(self) -> bool:
        """True once polling was disabled after repeated 401 (bad token) replies."""
        return self._degraded

    async def run(self) -> None:
        """Poll until cancelled; transient errors are logged and retried after 5s."""
        async with httpx.AsyncClient() as client:
            # Discard any backlog so stale commands are never replayed.
            await self._drain(client)
            while True:
                if self._degraded:
                    # Degraded is one-way; stay alive but idle so the caller
                    # can observe `.degraded` and alert the operator.
                    await asyncio.sleep(5)
                    continue
                try:
                    await self._poll_once(client)
                except asyncio.CancelledError:
                    raise  # cooperative shutdown must propagate
                except Exception as exc:
                    # Single handler for httpx transport errors, bad JSON, and
                    # anything unexpected: the previous separate
                    # (httpx.HTTPError, httpx.TimeoutException) branch had an
                    # identical body, and httpx.TimeoutException is already a
                    # subclass of httpx.HTTPError — the split was redundant.
                    self._audit.log({"event": "poller_error", "error": str(exc)})
                    await asyncio.sleep(5)

    async def _drain(self, client: httpx.AsyncClient) -> None:
        """Discard all pending updates at startup so stale commands don't replay."""
        try:
            resp = await client.get(
                _BASE.format(token=self._cfg.bot_token, method="getUpdates"),
                params={"timeout": 0, "offset": self._offset},
                timeout=10,
            )
            body = resp.json()
            if body.get("ok") and body.get("result"):
                # Jump the cursor past the newest pending update.
                self._offset = body["result"][-1]["update_id"] + 1
        except Exception as exc:
            # Best-effort: a failed drain only risks replaying old commands.
            logger.warning("TelegramPoller startup drain failed: %s", exc)

    async def _poll_once(self, client: httpx.AsyncClient) -> None:
        """One long-poll cycle: fetch updates, track 401 streak, dispatch each."""
        resp = await client.get(
            _BASE.format(token=self._cfg.bot_token, method="getUpdates"),
            params={"timeout": self._cfg.poll_timeout_s, "offset": self._offset},
            # Client-side timeout must exceed the server-side long-poll window.
            timeout=self._cfg.poll_timeout_s + 5,
        )

        if resp.status_code == 401:
            self._consecutive_401 += 1
            if self._consecutive_401 >= 3:
                self._degraded = True
                self._audit.log({"event": "poller_degraded", "reason": "3_consecutive_401"})
            return
        self._consecutive_401 = 0  # any non-401 response resets the streak

        body = resp.json()
        if not body.get("ok"):
            return

        for update in body.get("result", []):
            # Advance the cursor before processing so a crash can't replay it.
            self._offset = update["update_id"] + 1
            await self._process_update(update)

    async def _process_update(self, update: dict) -> None:
        """Authorize, parse, and enqueue a single Telegram update."""
        if "callback_query" in update:
            # Inline button pressed — may be expired; reply with fallback
            cbq = update["callback_query"]
            chat_id = str(cbq.get("from", {}).get("id", ""))
            if chat_id not in self._allowed:
                logger.info("Rejected callback_query from chat_id=%s", chat_id)
                return
            # Caller handles answerCallbackQuery; just note in audit
            self._audit.log({"event": "command_received", "action": "callback_query", "chat_id": chat_id})
            return

        msg = update.get("message") or update.get("edited_message")
        if not msg:
            return

        chat_id = str(msg.get("chat", {}).get("id", ""))
        if chat_id not in self._allowed:
            logger.info("Rejected message from chat_id=%s", chat_id)
            return

        text = (msg.get("text") or "").strip().lower()
        cmd = self._parse_command(text)
        if cmd is None:
            return  # unrecognized text is silently ignored

        self._audit.log({
            "event": "command_received",
            "action": cmd.action,
            "value": cmd.value,
            "chat_id": chat_id,
        })
        await self._cmd_queue.put(cmd)

    def _parse_command(self, text: str) -> Command | None:
        """Map normalized message text to a Command; None when unrecognized.

        Accepted forms: stop / status / ss / screenshot / pause / resume /
        "resume force" / bare integer minutes ("3") / "interval 3".
        """
        t = text.lstrip("/").strip()
        if not t:
            return None
        if t == "stop":
            return Command(action="stop")
        if t == "status":
            return Command(action="status")
        if t in ("ss", "screenshot"):
            return Command(action="ss")
        if t == "pause":
            return Command(action="pause")
        if t == "resume":
            return Command(action="resume")
        if t == "resume force":
            # value=1 signals force: also lift canary drift-pause, not just user pause.
            return Command(action="resume", value=1)
        # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
        parts = t.split()
        if len(parts) == 1 and parts[0].isdigit():
            return Command(action="set_interval", value=int(parts[0]) * 60)
        if len(parts) == 2 and parts[0] in ("interval", "set_interval") and parts[1].isdigit():
            return Command(action="set_interval", value=int(parts[1]) * 60)
        return None
|
||||
@@ -5,6 +5,9 @@ import tomllib
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
||||
|
||||
_VALID_WEEKDAYS: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
|
||||
|
||||
DotColor = Literal[
|
||||
"turquoise", "yellow",
|
||||
@@ -78,6 +81,9 @@ class DiscordCfg:
|
||||
class TelegramCfg:
|
||||
bot_token: str
|
||||
chat_id: str
|
||||
allowed_chat_ids: tuple[str, ...] = ()
|
||||
poll_timeout_s: int = 30
|
||||
auto_poll_interval_s: int = 180
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.bot_token or not self.chat_id:
|
||||
@@ -94,6 +100,43 @@ class AlertsCfg:
|
||||
trigger: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class OperatingHoursCfg:
|
||||
"""Session window: only run detection on allowed weekdays within HH:MM range.
|
||||
|
||||
Timezone is the source of truth for the exchange (default America/New_York
|
||||
for NYSE). Start/stop are compared against the clock in that timezone.
|
||||
Weekday check uses datetime.weekday() + a fixed MON..SUN list to stay
|
||||
locale-independent (strftime('%a') returns localized names).
|
||||
|
||||
The ZoneInfo is cached at config load time so the detection loop doesn't
|
||||
pay per-tick lookup cost.
|
||||
|
||||
NOTE: this dataclass is mutable (non-frozen) so Config._from_dict can stash
|
||||
the resolved ZoneInfo onto `_tz_cache` after validation. Treat fields as
|
||||
read-only at runtime.
|
||||
"""
|
||||
enabled: bool = False
|
||||
timezone: str = "America/New_York"
|
||||
weekdays: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI")
|
||||
start_hhmm: str = "09:30"
|
||||
stop_hhmm: str = "16:00"
|
||||
# Populated by Config._from_dict; None for disabled or failed-load cases.
|
||||
_tz_cache: ZoneInfo | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class AlertBehaviorCfg:
    """Alert behavior knobs (not screenshot toggles).

    fire_on_phase_skip: backstop alert for the FSM seeing
    ARMED→light_{green,red} directly, i.e. skipping the dark prime phase
    (often a dark color mis-classified as gray). Defaults to True because a
    missed fire costs more than a noisy phase-skip alert; turn off with
    `[options.alerts] fire_on_phase_skip = false`.
    """
    fire_on_phase_skip: bool = True
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Config:
|
||||
window_title: str
|
||||
@@ -114,6 +157,8 @@ class Config:
|
||||
phaseb_timeout_s: int = 600
|
||||
dead_letter_path: str = "logs/dead_letter.jsonl"
|
||||
attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg)
|
||||
alerts: AlertBehaviorCfg = field(default_factory=AlertBehaviorCfg)
|
||||
operating_hours: OperatingHoursCfg = field(default_factory=OperatingHoursCfg)
|
||||
config_version: str = "unknown"
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
@@ -156,9 +201,14 @@ class Config:
|
||||
drift_threshold=int(data["canary"].get("drift_threshold", 8)),
|
||||
)
|
||||
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
|
||||
tg = data["telegram"]
|
||||
_allowed = [str(c) for c in tg.get("allowed_chat_ids", [])] or [str(tg["chat_id"])]
|
||||
telegram = TelegramCfg(
|
||||
bot_token=data["telegram"]["bot_token"],
|
||||
chat_id=str(data["telegram"]["chat_id"]),
|
||||
bot_token=tg["bot_token"],
|
||||
chat_id=str(tg["chat_id"]),
|
||||
allowed_chat_ids=tuple(_allowed),
|
||||
poll_timeout_s=int(tg.get("poll_timeout_s", 30)),
|
||||
auto_poll_interval_s=int(tg.get("auto_poll_interval_s", 180)),
|
||||
)
|
||||
opts = data.get("options", {})
|
||||
region = None
|
||||
@@ -176,6 +226,36 @@ class Config:
|
||||
)
|
||||
else:
|
||||
attach = AlertsCfg()
|
||||
|
||||
alerts_dict = opts.get("alerts", {}) or {}
|
||||
alert_behavior = AlertBehaviorCfg(
|
||||
fire_on_phase_skip=bool(alerts_dict.get("fire_on_phase_skip", True)),
|
||||
)
|
||||
|
||||
oh_dict = opts.get("operating_hours", {}) or {}
|
||||
oh_weekdays = tuple(
|
||||
str(w).upper() for w in oh_dict.get("weekdays", ("MON", "TUE", "WED", "THU", "FRI"))
|
||||
)
|
||||
for wd in oh_weekdays:
|
||||
if wd not in _VALID_WEEKDAYS:
|
||||
raise ValueError(
|
||||
f"operating_hours.weekdays contains invalid day {wd!r}; "
|
||||
f"expected any of {_VALID_WEEKDAYS}"
|
||||
)
|
||||
oh = OperatingHoursCfg(
|
||||
enabled=bool(oh_dict.get("enabled", False)),
|
||||
timezone=str(oh_dict.get("timezone", "America/New_York")),
|
||||
weekdays=oh_weekdays,
|
||||
start_hhmm=str(oh_dict.get("start_hhmm", "09:30")),
|
||||
stop_hhmm=str(oh_dict.get("stop_hhmm", "16:00")),
|
||||
)
|
||||
if oh.enabled:
|
||||
try:
|
||||
oh._tz_cache = ZoneInfo(oh.timezone)
|
||||
except ZoneInfoNotFoundError as exc:
|
||||
raise ValueError(
|
||||
f"operating_hours.timezone {oh.timezone!r} invalid: {exc}"
|
||||
) from exc
|
||||
return cls(
|
||||
window_title=data["window_title"],
|
||||
dot_roi=roi,
|
||||
@@ -195,5 +275,7 @@ class Config:
|
||||
phaseb_timeout_s=int(opts.get("phaseb_timeout_s", 600)),
|
||||
dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"),
|
||||
attach_screenshots=attach,
|
||||
alerts=alert_behavior,
|
||||
operating_hours=oh,
|
||||
config_version=version,
|
||||
)
|
||||
|
||||
@@ -28,6 +28,7 @@ class DetectionResult:
|
||||
match: ColorMatch | None # None if no dot
|
||||
accepted: bool # post-debounce; True only when match repeats debounce_depth times
|
||||
color: str | None # accepted color name (UNKNOWN excluded)
|
||||
dot_pos_abs: tuple[int, int] | None = None # absolute (x, y) in frame; set when dot_found
|
||||
|
||||
|
||||
class Detector:
|
||||
@@ -60,8 +61,14 @@ class Detector:
|
||||
self._debounce: deque[str | None] = deque(maxlen=cfg.debounce_depth)
|
||||
self._rolling: deque[DetectionResult] = deque(maxlen=20)
|
||||
|
||||
def step(self, ts: float) -> DetectionResult:
|
||||
frame = self._capture()
|
||||
def step(self, ts: float, frame=None) -> DetectionResult:
|
||||
"""Run one detection tick.
|
||||
|
||||
frame: pre-captured BGR ndarray (from asyncio.to_thread capture). When
|
||||
None (default), calls self._capture() — preserving the sync-loop behaviour.
|
||||
"""
|
||||
if frame is None:
|
||||
frame = self._capture()
|
||||
|
||||
if frame is None:
|
||||
self._debounce.append(None)
|
||||
@@ -117,6 +124,7 @@ class Detector:
|
||||
match=match,
|
||||
accepted=accepted,
|
||||
color=color,
|
||||
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y),
|
||||
)
|
||||
self._rolling.append(r)
|
||||
return r
|
||||
|
||||
771
src/atm/main.py
771
src/atm/main.py
@@ -2,12 +2,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import contextlib
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Callable, Protocol, cast
|
||||
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
|
||||
|
||||
from atm.config import Config # stdlib-only (tomllib); safe at module level
|
||||
from atm.notifier import Alert
|
||||
@@ -89,6 +92,23 @@ def main(argv=None) -> None:
|
||||
help="Stop at local HH:MM (overrides --duration). If the time is in "
|
||||
"the past when the loop starts, rolls over to tomorrow.",
|
||||
)
|
||||
p_run.add_argument(
|
||||
"--tz", metavar="ZONE", default=None,
|
||||
help="Override operating_hours.timezone (e.g. America/New_York).",
|
||||
)
|
||||
p_run.add_argument(
|
||||
"--weekdays", metavar="DAYS", default=None,
|
||||
help="Override operating_hours.weekdays. Accepts comma list "
|
||||
"(MON,TUE) or range (MON-FRI).",
|
||||
)
|
||||
p_run.add_argument(
|
||||
"--oh-start", metavar="HH:MM", default=None,
|
||||
help="Override operating_hours.start_hhmm (exchange-local).",
|
||||
)
|
||||
p_run.add_argument(
|
||||
"--oh-stop", metavar="HH:MM", default=None,
|
||||
help="Override operating_hours.stop_hhmm (exchange-local).",
|
||||
)
|
||||
|
||||
# journal
|
||||
p_journal = sub.add_parser("journal", help="Add a trade journal entry interactively")
|
||||
@@ -179,6 +199,7 @@ def _cmd_dryrun(args) -> None:
|
||||
|
||||
def _cmd_run(args) -> None:
|
||||
cfg = Config.load_current(Path("configs"))
|
||||
cfg = _apply_operating_hours_cli_overrides(cfg, args)
|
||||
capture_stub = args.capture_stub or bool(os.environ.get("ATM_STUB_CAPTURE"))
|
||||
|
||||
# --start-at HH:MM: sleep until the next occurrence of that local wall-clock time
|
||||
@@ -238,6 +259,66 @@ def _cmd_run(args) -> None:
|
||||
run_live(cfg, duration_s=duration_s, capture_stub=capture_stub)
|
||||
|
||||
|
||||
# Canonical weekday ordering used to expand ranges and validate names.
_WEEKDAY_ORDER = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")


def _parse_weekdays_arg(raw: str) -> tuple[str, ...]:
    """Accept 'MON,TUE,WED' or 'MON-FRI'. Case-insensitive."""
    txt = raw.strip().upper()
    # A lone dash (and no commas) means a contiguous range like MON-FRI.
    if "-" in txt and "," not in txt:
        lo, hi = (piece.strip() for piece in txt.split("-", 1))
        if lo not in _WEEKDAY_ORDER or hi not in _WEEKDAY_ORDER:
            raise ValueError(f"unknown weekday(s) in range {raw!r}")
        lo_idx = _WEEKDAY_ORDER.index(lo)
        hi_idx = _WEEKDAY_ORDER.index(hi)
        if lo_idx > hi_idx:
            raise ValueError(f"weekday range reversed: {raw!r}")
        return tuple(_WEEKDAY_ORDER[lo_idx : hi_idx + 1])
    # Otherwise a comma list; empty pieces are dropped, each name validated.
    days: list[str] = []
    for d in (piece.strip() for piece in txt.split(",")):
        if not d:
            continue
        if d not in _WEEKDAY_ORDER:
            raise ValueError(f"unknown weekday {d!r} (valid: {_WEEKDAY_ORDER})")
        days.append(d)
    return tuple(days)
|
||||
|
||||
|
||||
def _apply_operating_hours_cli_overrides(cfg, args):
    """Return cfg (possibly new) with operating_hours overridden by CLI flags.

    Config is a frozen dataclass, but operating_hours is a non-frozen field
    object by design: it is mutated in place (no dataclasses.replace needed)
    and its timezone cache recomputed. Passing any of --tz / --weekdays /
    --oh-start / --oh-stop implicitly enables operating_hours even when the
    TOML had it disabled. Exits the process (sys.exit) on an unknown
    timezone, matching CLI error-handling conventions.

    (Removed: a dead `import dataclasses as _dc` / `_ = _dc` pair that was
    never used.)
    """
    from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

    oh = cfg.operating_hours
    any_override = any(
        getattr(args, k, None)
        for k in ("tz", "weekdays", "oh_start", "oh_stop")
    )
    if not any_override:
        return cfg  # nothing to do; keep TOML-derived settings untouched

    # Validate the timezone before mutating anything else.
    new_tz = args.tz if args.tz else oh.timezone
    try:
        tz_cache = ZoneInfo(new_tz)
    except ZoneInfoNotFoundError as exc:
        sys.exit(f"--tz {new_tz!r} invalid: {exc}")

    new_weekdays = _parse_weekdays_arg(args.weekdays) if args.weekdays else oh.weekdays
    new_start = args.oh_start if args.oh_start else oh.start_hhmm
    new_stop = args.oh_stop if args.oh_stop else oh.stop_hhmm
    oh.enabled = True
    oh.timezone = new_tz
    oh.weekdays = new_weekdays
    oh.start_hhmm = new_start
    oh.stop_hhmm = new_stop
    oh._tz_cache = tz_cache
    return cfg
|
||||
|
||||
|
||||
def _cmd_journal(args) -> None:
|
||||
try:
|
||||
from atm.journal import Journal, prompt_entry
|
||||
@@ -390,6 +471,8 @@ def _save_annotated_frame(
|
||||
label: str,
|
||||
now: float,
|
||||
audit: _AuditLike | None = None,
|
||||
dot_pos_abs: "tuple[int, int] | None" = None,
|
||||
canary_ok: bool = True,
|
||||
) -> "Path | None":
|
||||
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
|
||||
|
||||
@@ -397,6 +480,10 @@ def _save_annotated_frame(
|
||||
audit (when provided) so disk-full / permission issues don't become silent
|
||||
regressions. Never raises — snapshot is a best-effort enhancement, the
|
||||
text alert must still go out.
|
||||
|
||||
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
|
||||
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
|
||||
since calibration may be stale.
|
||||
"""
|
||||
try:
|
||||
import cv2 # type: ignore[import-untyped]
|
||||
@@ -413,6 +500,22 @@ def _save_annotated_frame(
|
||||
annotated = frame.copy()
|
||||
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
|
||||
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
||||
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
|
||||
try:
|
||||
_, dot_y = dot_pos_abs
|
||||
ya = cfg.y_axis
|
||||
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
|
||||
price = ya.p1_price + (dot_y - ya.p1_y) * slope
|
||||
w_frame = annotated.shape[1]
|
||||
text = f"${price:.2f}"
|
||||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||||
scale, thickness = 1.2, 3
|
||||
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
|
||||
tx, ty = w_frame - tw - 10, th + 10
|
||||
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
|
||||
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
|
||||
except Exception:
|
||||
pass # price overlay is best-effort; never break the screenshot
|
||||
cv2.imwrite(str(fpath), annotated)
|
||||
return fpath
|
||||
except Exception as exc:
|
||||
@@ -432,6 +535,7 @@ def _handle_tick(
|
||||
audit: _AuditLike,
|
||||
first_accepted: bool,
|
||||
snapshot: Snapshot | None = None,
|
||||
cfg: Any = None,
|
||||
) -> Transition | None:
|
||||
"""Feed FSM for a single accepted color and dispatch arm/prime/late_start
|
||||
alerts. Returns the final Transition, or None when the color triggered a
|
||||
@@ -535,10 +639,464 @@ def _handle_tick(
|
||||
image_path=snap(prime_kind, prime_label),
|
||||
direction=direction,
|
||||
))
|
||||
# PHASE_SKIP fire backstop: ARMED→light_{green,red} directly (dark was missed).
|
||||
# Emits a fire-equivalent alert when cfg.alerts.fire_on_phase_skip (default True).
|
||||
# Uses public FSM lockout API (is_locked/record_fire) to reuse the standard
|
||||
# 240s dedupe window so bouncing detectors do not spam the user.
|
||||
elif tr.reason == "phase_skip" and color in ("light_green", "light_red"):
|
||||
flag_on = True
|
||||
if cfg is not None:
|
||||
alerts_cfg = getattr(cfg, "alerts", None)
|
||||
if alerts_cfg is not None:
|
||||
flag_on = bool(getattr(alerts_cfg, "fire_on_phase_skip", True))
|
||||
if flag_on:
|
||||
direction = "BUY" if color == "light_green" else "SELL"
|
||||
if not fsm.is_locked(direction, now):
|
||||
fsm.record_fire(direction, now)
|
||||
dark_name = "dark_green" if direction == "BUY" else "dark_red"
|
||||
notifier.send(Alert(
|
||||
kind="phase_skip_fire",
|
||||
title=f"PHASE SKIP {direction} — {dark_name} nu a fost detectat",
|
||||
body=(
|
||||
"Verifică chart-ul manual. Posibil necalibrare culoare "
|
||||
f"(observat {color} direct după armare)."
|
||||
),
|
||||
image_path=snap("phase_skip", f"phase_skip_{direction.lower()}"),
|
||||
direction=direction,
|
||||
))
|
||||
return tr
|
||||
|
||||
|
||||
@dataclass
|
||||
class _TickSyncResult:
|
||||
frame: Any = None
|
||||
res: Any = None # DetectionResult | None
|
||||
tr: Any = None # Transition | None
|
||||
first_consumed: bool = False
|
||||
late_start: bool = False
|
||||
new_color: str | None = None # corpus sample color when changed
|
||||
|
||||
|
||||
@dataclass
class RunContext:
    """Dependency bundle handed to the module-scope detection-loop helpers.

    Keeping `_run_tick`, `_handle_fsm_result`, `_drain_cmd_queue`, and
    `_dispatch_command` at module scope (instead of closures inside
    `run_live_async`) makes them directly unit-testable.
    """
    cfg: Any
    capture: Callable
    canary: Any
    detector: Any
    fsm: Any
    notifier: _NotifierLike
    audit: _AuditLike
    detection_log: _AuditLike
    scheduler: Any
    samples_dir: Path
    fires_dir: Path
    cmd_queue: Any  # asyncio.Queue[Command]
    state: Any  # carries first_accepted, last_saved_color, levels_extractor, fire_count, start
    levels_extractor_factory: Callable  # builds LevelsExtractor(cfg, trigger, now)
    lifecycle: Any = None  # LifecycleState — window + user_paused tracking
|
||||
|
||||
|
||||
@dataclass
|
||||
class _LoopState:
|
||||
"""Per-loop mutable state (previously closure nonlocals)."""
|
||||
first_accepted: bool = True
|
||||
last_saved_color: str | None = None
|
||||
levels_extractor: Any = None
|
||||
fire_count: int = 0
|
||||
start: float = 0.0
|
||||
|
||||
|
||||
@dataclass
class LifecycleState:
    """Tracks user-pause / out-of-window state across detection ticks.

    last_window_state: None at startup so _maybe_log_transition can seed it
    without emitting a spurious market_open alert on the first in-window tick.
    """
    user_paused: bool = False  # set by /pause, cleared by /resume
    last_window_state: str | None = None  # "open" / "closed" / None (uninitialized)
||||
|
||||
|
||||
# Locale-independent weekday names; index matches datetime.weekday() (MON=0).
# Compared against cfg.operating_hours.weekdays when gating the detection loop.
_WEEKDAY_NAMES: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
|
||||
|
||||
|
||||
def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None:
    """Decide whether this tick's detection work should be skipped.

    Returns a reason string ("user_paused", "drift_paused",
    "out_of_window_weekend", "out_of_window_hours") or None to proceed.
    Precedence: user pause first, then canary drift, then the operating-hours
    window. The ZoneInfo cached on cfg.operating_hours._tz_cache (populated
    at config load) avoids a per-tick timezone lookup.
    """
    if state.user_paused:
        return "user_paused"
    if getattr(canary, "is_paused", False):
        return "drift_paused"

    hours = getattr(cfg, "operating_hours", None)
    if hours is None or not hours.enabled:
        return None
    zone = getattr(hours, "_tz_cache", None)
    if zone is None:
        # Enabled but no timezone resolved: fail open rather than crash mid-loop.
        return None

    local_now = datetime.fromtimestamp(now_ts, tz=zone)
    # weekday() is locale-independent (0=MON..6=SUN); strftime('%a') is not.
    day_name = _WEEKDAY_NAMES[local_now.weekday()]
    if day_name not in hours.weekdays:
        return "out_of_window_weekend"
    clock = local_now.strftime("%H:%M")
    if not (hours.start_hhmm <= clock < hours.stop_hhmm):
        return "out_of_window_hours"
    return None
||||
|
||||
|
||||
def _maybe_log_transition(
    reason: str | None,
    state: LifecycleState,
    now: float,
    audit: _AuditLike,
    notifier: _NotifierLike,
) -> None:
    """Emit market_open / market_closed exactly once per window transition.

    Pause reasons (user_paused / drift_paused) never touch the window state.
    Startup guard (R2): when last_window_state is still None it is seeded
    silently, so starting in-window never fires a spurious market_open alert.
    """
    if reason is None:
        current = "open"
    elif reason.startswith("out_of_window"):
        current = "closed"
    else:
        # Pause reasons do not represent a market-window change.
        return

    previous = state.last_window_state
    if previous == current:
        return
    if previous is None:
        # First evaluation after startup: seed only (R2), no alert/audit.
        state.last_window_state = current
        return

    if current == "open":
        event_name = "market_open"
        body = "Piața deschisă — monitorizare reluată"
    else:
        event_name = "market_closed"
        body = "Piața închisă — monitorizare pauzată până la următoarea deschidere"
    audit.log({"ts": now, "event": event_name, "reason": reason})
    notifier.send(Alert(
        kind="status",
        title=event_name.replace("_", " ").title(),
        body=body,
    ))
    state.last_window_state = current
||||
|
||||
|
||||
def _sync_detection_tick(
    capture: Callable,
    canary: Any,
    cfg: Any,
    detector: Any,
    fsm: Any,
    notifier: _NotifierLike,
    audit: _AuditLike,
    detection_log: _AuditLike,
    fires_dir: Path,
    first_accepted: bool,
    last_saved_color: "str | None",
    now: float,
    samples_dir: Path,
) -> _TickSyncResult:
    """One full detection tick (blocking I/O). Runs in asyncio.to_thread.

    Pipeline: capture frame -> canary drift check -> detector step (always
    logged to detection_log) -> FSM tick -> corpus sample save on new color
    -> FIRE alert on an unlocked trigger. Returns a `_TickSyncResult` whose
    fields tell the async side which loop-state updates to apply.
    """
    frame = capture()
    if frame is None:
        # Lost window: nothing else can run this tick.
        audit.log({"ts": now, "event": "window_lost"})
        return _TickSyncResult()

    cr = canary.check(frame)
    if canary.is_paused:
        # Drift pause: record drift distance but skip detection entirely.
        audit.log({"ts": now, "event": "paused", "drift": cr.distance})
        return _TickSyncResult(frame=frame)

    res = detector.step(now, frame)
    # Every frame is logged — accepted or not — for offline analysis.
    detection_log.log({
        "ts": now, "event": "frame",
        "window_found": res.window_found,
        "dot_found": res.dot_found,
        "rgb": list(res.rgb) if res.rgb is not None else None,
        "match_name": res.match.name if res.match is not None else None,
        "distance": round(res.match.distance, 2) if res.match is not None else None,
        "confidence": round(res.match.confidence, 3) if res.match is not None else None,
        "accepted": res.accepted,
        "color": res.color,
    })

    if not (res.accepted and res.color):
        # No confident color this tick — FSM untouched.
        return _TickSyncResult(frame=frame, res=res)

    is_first = first_accepted

    def _snapshot(kind: str, label: str) -> "Path | None":
        # Per-kind screenshot gate; absent config attributes default to on.
        if not getattr(cfg.attach_screenshots, kind, True):
            return None
        return _save_annotated_frame(
            frame, cfg, fires_dir, label, now, audit=audit,
            dot_pos_abs=getattr(res, "dot_pos_abs", None),
            canary_ok=True,
        )

    tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot, cfg=cfg)

    if tr is None:
        # Late start: FSM untouched, skip FIRE and corpus save this tick.
        return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True)

    # Corpus: save the full frame on every distinct new color, for later labeling.
    new_color: str | None = None
    if res.color != last_saved_color:
        ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
        sample_path = samples_dir / f"{ts_str}_{res.color}.png"
        try:
            import cv2  # type: ignore[import-untyped]
            cv2.imwrite(str(sample_path), frame)
        except Exception as exc:
            audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
        # Color is recorded even when the save failed, so a broken disk does
        # not retry the write on every subsequent tick.
        new_color = res.color

    # FIRE: annotate + save the frame, attach it to the trigger alert.
    if tr.trigger and not tr.locked:
        fire_path: "Path | None" = None
        if cfg.attach_screenshots.trigger:
            fire_path = _save_annotated_frame(
                frame, cfg, fires_dir, tr.trigger, now, audit=audit,
                dot_pos_abs=getattr(res, "dot_pos_abs", None),
                canary_ok=True,
            )
        notifier.send(Alert(
            kind="trigger",
            title=f"Semnal {tr.trigger}",
            body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
            image_path=fire_path,
            direction=tr.trigger,
        ))

    return _TickSyncResult(
        frame=frame, res=res, tr=tr,
        first_consumed=is_first, new_color=new_color,
    )
||||
|
||||
|
||||
async def _run_tick(ctx: RunContext) -> _TickSyncResult:
    """Run one detection tick off the event loop, after lifecycle gating.

    Gating (user pause / operating hours / drift) happens here rather than
    inside the sync tick, so the async loop keeps draining commands and
    emitting market_open / market_closed transitions even while the heavy
    detection work is skipped.
    """
    now = time.time()
    lifecycle = ctx.lifecycle
    if lifecycle is not None:
        skip_reason = _should_skip(now, lifecycle, ctx.cfg, ctx.canary)
        _maybe_log_transition(skip_reason, lifecycle, now, ctx.audit, ctx.notifier)
        if skip_reason is not None:
            # Skipped tick: an empty result makes _handle_fsm_result a no-op.
            return _TickSyncResult()
    return await asyncio.to_thread(
        _sync_detection_tick,
        ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm,
        ctx.notifier, ctx.audit, ctx.detection_log,
        ctx.fires_dir, ctx.state.first_accepted, ctx.state.last_saved_color,
        now, ctx.samples_dir,
    )
||||
|
||||
|
||||
async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
    """Scheduler start/stop + levels extraction. No-op if res is None/late_start.

    Applies the tick's side effects to loop state:
      * consumes the one-shot first_accepted flag,
      * records the last corpus-saved color,
      * starts the screenshot poller on "prime", stops it on terminal reasons,
      * on an unlocked FIRE, bumps fire_count and arms a LevelsExtractor,
      * steps any active LevelsExtractor and alerts when extraction finishes.
    """
    # Loop-state carry-over runs even when the rest is gated below.
    if result.first_consumed:
        ctx.state.first_accepted = False
    if result.new_color is not None:
        ctx.state.last_saved_color = result.new_color

    tr = result.tr
    res = result.res

    if result.late_start or res is None:
        return

    if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None):
        if tr.reason == "prime" and not ctx.scheduler.is_running:
            # Armed: begin periodic screenshot polling.
            ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
            ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
        elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running:
            # Terminal transition: stop polling until the next prime.
            ctx.scheduler.stop()
            ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})

    if tr is not None and tr.trigger and not tr.locked:
        ctx.state.fire_count += 1
        # Defensive: stop the poller if the branch above did not already.
        if ctx.scheduler.is_running:
            ctx.scheduler.stop()
            ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
        # Arm phase-B levels extraction for this trigger direction.
        ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time())

    if ctx.state.levels_extractor is not None and result.frame is not None:
        lr = ctx.state.levels_extractor.step(result.frame, time.time())
        if lr.status in ("complete", "timeout"):
            # Only a successful extraction produces an alert; timeout is silent.
            if lr.status == "complete" and lr.levels:
                ctx.notifier.send(Alert(
                    kind="levels",
                    title="Niveluri",
                    body=(
                        f"SL={lr.levels.sl} "
                        f"TP1={lr.levels.tp1} "
                        f"TP2={lr.levels.tp2}"
                    ),
                ))
            ctx.state.levels_extractor = None
||||
|
||||
|
||||
async def _dispatch_command(ctx: RunContext, cmd) -> None:
    """Process a single Command. Exceptions bubble — caller wraps in try/except.

    Supported actions: set_interval (start poller), stop (stop poller),
    status (summary alert), ss (manual screenshot), pause (user pause),
    resume (clear user pause; value == 1 means "force", which also clears
    a canary drift pause).
    """
    cfg = ctx.cfg
    if cmd.action == "set_interval":
        # Missing/zero value falls back to the configured default interval.
        secs = cmd.value or cfg.telegram.auto_poll_interval_s
        ctx.scheduler.start(secs)
        ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
        ctx.notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
    elif cmd.action == "stop":
        if ctx.scheduler.is_running:
            ctx.scheduler.stop()
            ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
            ctx.notifier.send(Alert(kind="status", title="Polling oprit", body=""))
        else:
            ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
    elif cmd.action == "status":
        uptime_s = time.monotonic() - ctx.state.start
        # Most recent detector reading, if any rolled in yet.
        last_roll = ctx.detector.rolling[-1] if ctx.detector.rolling else None
        last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—"
        last_color = (
            (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—"
        ) if last_roll else "—"
        sched_info = (
            f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
        ) if ctx.scheduler.is_running else "oprit"
        canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok"

        # Active / pause reason + window state
        active_info = "activ"
        window_info = "—"
        if ctx.lifecycle is not None:
            skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
            if skip is not None:
                active_info = f"pauzat:{skip}"
            oh = getattr(ctx.cfg, "operating_hours", None)
            if oh is not None and oh.enabled:
                window_info = ctx.lifecycle.last_window_state or "—"
            else:
                window_info = "always_on"

        body = (
            f"Stare: {ctx.fsm.state.value}\n"
            f"Activ: {active_info} | Fereastră: {window_info}\n"
            f"Ultima detecție: {last_color} (conf {last_conf})\n"
            f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n"
            f"Poller: {sched_info} | Canary: {canary_info}"
        )
        ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body))
    elif cmd.action == "ss":
        now_ss = time.time()
        # Capture + annotate off-thread so the event loop stays responsive.
        frame_ss = await asyncio.to_thread(ctx.capture)
        if frame_ss is None:
            ctx.notifier.send(Alert(
                kind="warn",
                title="Captură eșuată — verificați fereastra TradeStation",
                body="",
            ))
            return
        path_ss = await asyncio.to_thread(
            _save_annotated_frame, frame_ss, ctx.cfg, ctx.fires_dir, "ss", now_ss, ctx.audit,
        )
        ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
        ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
    elif cmd.action == "pause":
        # User manually stops monitoring. Canary drift state is untouched.
        if ctx.lifecycle is not None:
            ctx.lifecycle.user_paused = True
        ctx.audit.log({"ts": time.time(), "event": "user_paused"})
        ctx.notifier.send(Alert(
            kind="status",
            title="Monitorizare oprită manual",
            body="Folosește /resume pentru a relua.",
        ))
    elif cmd.action == "resume":
        # R2: /resume clears only user_paused. Canary drift requires
        # /resume force (value == 1) so the user acknowledges the risk.
        was_drift = bool(getattr(ctx.canary, "is_paused", False))
        was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False
        force = cmd.value == 1
        if ctx.lifecycle is not None:
            ctx.lifecycle.user_paused = False
        if force and was_drift:
            ctx.canary.resume()
        ctx.audit.log({
            "ts": time.time(), "event": "user_resumed",
            "was_drift": was_drift, "was_user": was_user, "force": force,
        })
        # Adaptive response: tell the user exactly what state they resumed into.
        if was_drift and not force:
            title = "Pauză user eliminată — dar Canary drift activ"
            body = (
                "Trimite /resume force pentru a anula drift-pause. "
                "Recalibrează dacă driftul persistă."
            )
        elif force and was_drift:
            title = "Drift-pause anulat manual (force)"
            body = "Dacă driftul persistă, Canary va repauza."
        else:
            skip_now = None
            if ctx.lifecycle is not None:
                skip_now = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
            if skip_now and skip_now.startswith("out_of_window"):
                title = "Pauză eliminată — piața e închisă acum"
                body = "Monitorizarea va porni la următoarea fereastră."
            else:
                title = "Monitorizare reluată"
                body = ""
        ctx.notifier.send(Alert(kind="status", title=title, body=body))
||||
|
||||
|
||||
async def _drain_cmd_queue(ctx: RunContext) -> None:
    """Process every queued command now, one failure never blocking the rest.

    CRITICAL: this MUST run every loop iteration, unconditionally, even when
    the detection tick returned nothing (canary paused, out-of-window, etc.).
    A prior version `continue`'d past this drain when res=None, so commands
    accumulated indefinitely while the canary was drifted.
    """
    while True:
        try:
            command = ctx.cmd_queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        try:
            await _dispatch_command(ctx, command)
        except Exception as exc:  # isolate: one bad command must not kill the loop
            ctx.audit.log({
                "ts": time.time(), "event": "command_error",
                "action": command.action, "error": str(exc),
            })
            print(f"ERR command_dispatch /{command.action}: {exc}", flush=True)
            ctx.notifier.send(Alert(kind="warn", title=f"Eroare comandă /{command.action}", body=str(exc)))
||||
|
||||
|
||||
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
    """Blocking entry point: drive `run_live_async` on a fresh asyncio loop."""
    coro = run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub)
    asyncio.run(coro)
||||
|
||||
|
||||
async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
|
||||
try:
|
||||
from atm.detector import Detector
|
||||
@@ -548,14 +1106,38 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
from atm.notifier.discord import DiscordNotifier
|
||||
from atm.notifier.telegram import TelegramNotifier
|
||||
from atm.audit import AuditLog
|
||||
from atm.commands import TelegramPoller, Command
|
||||
from atm.scheduler import ScreenshotScheduler
|
||||
except ImportError as exc:
|
||||
sys.exit(f"run-loop dependencies not available: {exc}")
|
||||
|
||||
capture = _build_capture(cfg, capture_stub=capture_stub)
|
||||
detector = Detector(cfg, capture)
|
||||
fsm = StateMachine(lockout_s=cfg.lockout_s)
|
||||
canary = Canary(cfg, pause_flag_path=Path("logs/pause.flag"))
|
||||
audit = AuditLog(Path("logs"))
|
||||
|
||||
# Forward-declare notifier so the canary pause callback can close over it.
|
||||
# The notifier is constructed a few lines below once backends exist.
|
||||
_notifier_ref: dict = {}
|
||||
|
||||
def _on_canary_pause(distance: int) -> None:
|
||||
audit.log({"ts": time.time(), "event": "canary_drift_paused", "distance": distance})
|
||||
n = _notifier_ref.get("n")
|
||||
if n is not None:
|
||||
n.send(Alert(
|
||||
kind="warn",
|
||||
title=f"Canary drift={distance} — monitorizare pauzată",
|
||||
body=(
|
||||
"Fereastra/paleta s-a schimbat. Trimite /resume pentru a relua "
|
||||
"sau recalibrează."
|
||||
),
|
||||
))
|
||||
|
||||
canary = Canary(
|
||||
cfg,
|
||||
pause_flag_path=Path("logs/pause.flag"),
|
||||
on_pause_callback=_on_canary_pause,
|
||||
)
|
||||
detection_log = AuditLog(Path("logs/detections"))
|
||||
backends = [
|
||||
DiscordNotifier(cfg.discord.webhook_url),
|
||||
@@ -563,7 +1145,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
]
|
||||
|
||||
def _on_drop(backend_name: str, dropped: Alert) -> None:
|
||||
"""Audit la depășire coadă — face eșecul silențios vizibil."""
|
||||
audit.log({
|
||||
"ts": time.time(),
|
||||
"event": "queue_overflow_drop",
|
||||
@@ -573,8 +1154,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
})
|
||||
|
||||
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
|
||||
_notifier_ref["n"] = notifier
|
||||
|
||||
# Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea.
|
||||
# Initial frame + canary check
|
||||
first_frame = capture()
|
||||
if first_frame is None:
|
||||
print("WARN: first capture returned None — window/region missing", flush=True)
|
||||
@@ -584,9 +1166,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
|
||||
if first_check.drifted:
|
||||
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
|
||||
canary.resume() # clear the auto-pause so user can Ctrl+C and fix
|
||||
canary.resume()
|
||||
|
||||
dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
||||
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
||||
notifier.send(Alert(
|
||||
kind="heartbeat",
|
||||
title="ATM pornit",
|
||||
@@ -598,106 +1180,55 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
|
||||
|
||||
start = time.monotonic()
|
||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
||||
levels_extractor = None
|
||||
last_saved_color: str | None = None
|
||||
first_accepted = True
|
||||
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||
samples_dir = Path("samples")
|
||||
samples_dir.mkdir(exist_ok=True)
|
||||
fires_dir = Path("logs") / "fires"
|
||||
fires_dir.mkdir(parents=True, exist_ok=True)
|
||||
import cv2 # type: ignore[import-untyped]
|
||||
|
||||
try:
|
||||
while duration_s is None or (time.monotonic() - start) < duration_s:
|
||||
now = time.time()
|
||||
frame = capture()
|
||||
if frame is None:
|
||||
audit.log({"ts": now, "event": "window_lost"})
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# canary check
|
||||
cr = canary.check(frame)
|
||||
if canary.is_paused:
|
||||
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# detection
|
||||
res = detector.step(now)
|
||||
detection_log.log({
|
||||
"ts": now,
|
||||
"event": "frame",
|
||||
"window_found": res.window_found,
|
||||
"dot_found": res.dot_found,
|
||||
"rgb": list(res.rgb) if res.rgb is not None else None,
|
||||
"match_name": res.match.name if res.match is not None else None,
|
||||
"distance": round(res.match.distance, 2) if res.match is not None else None,
|
||||
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
|
||||
"accepted": res.accepted,
|
||||
"color": res.color,
|
||||
})
|
||||
if res.accepted and res.color:
|
||||
is_first = first_accepted
|
||||
first_accepted = False
|
||||
import cv2 # noqa: F401 fail fast if cv2 is missing # type: ignore[import-untyped]
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Per-iteration closure — binds current frame/now, gates on config.
|
||||
def _snapshot(kind: str, label: str) -> "Path | None":
|
||||
if not getattr(cfg.attach_screenshots, kind, True):
|
||||
return None
|
||||
return _save_annotated_frame(
|
||||
frame, cfg, fires_dir, label, now, audit=audit,
|
||||
)
|
||||
cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
|
||||
loop_state = _LoopState(first_accepted=True, last_saved_color=None,
|
||||
levels_extractor=None, fire_count=0, start=start)
|
||||
|
||||
tr = _handle_tick(
|
||||
fsm, res.color, now, notifier, audit, is_first,
|
||||
snapshot=_snapshot,
|
||||
)
|
||||
if tr is None:
|
||||
# pornire târzie: FSM neatins, sari peste FIRE + salvare corpus
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară
|
||||
if res.color != last_saved_color:
|
||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
||||
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
|
||||
try:
|
||||
cv2.imwrite(str(sample_path), frame)
|
||||
except Exception as exc:
|
||||
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
|
||||
last_saved_color = res.color
|
||||
# FIRE: adnotează frame-ul + salvează, atașează la alertă
|
||||
if tr.trigger and not tr.locked:
|
||||
fire_path: "Path | None" = None
|
||||
if cfg.attach_screenshots.trigger:
|
||||
fire_path = _save_annotated_frame(
|
||||
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="trigger",
|
||||
title=f"Semnal {tr.trigger}",
|
||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=fire_path,
|
||||
direction=tr.trigger,
|
||||
))
|
||||
levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
|
||||
# phase-B levels
|
||||
if levels_extractor is not None:
|
||||
lr = levels_extractor.step(frame, now)
|
||||
if lr.status in ("complete", "timeout"):
|
||||
if lr.status == "complete" and lr.levels:
|
||||
notifier.send(Alert(
|
||||
kind="levels",
|
||||
title="Niveluri",
|
||||
body=(
|
||||
f"SL={lr.levels.sl} "
|
||||
f"TP1={lr.levels.tp1} "
|
||||
f"TP2={lr.levels.tp2}"
|
||||
),
|
||||
))
|
||||
levels_extractor = None
|
||||
# heartbeat — include statistici per-backend ca eșecurile silențioase
|
||||
# să apară la fiecare 30 min fără să aștepte oprirea.
|
||||
if time.time() > heartbeat_due:
|
||||
def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
|
||||
return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
|
||||
|
||||
scheduler = ScreenshotScheduler(
|
||||
capture=capture,
|
||||
save_fn=_bound_save,
|
||||
notifier=notifier,
|
||||
audit=audit,
|
||||
)
|
||||
poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
|
||||
|
||||
lifecycle = LifecycleState()
|
||||
# Seed lifecycle.last_window_state with the current status so we don't emit
|
||||
# a spurious market_open alert on the very first tick (R2).
|
||||
_pre_skip = _should_skip(time.time(), lifecycle, cfg, canary)
|
||||
_maybe_log_transition(_pre_skip, lifecycle, time.time(), audit, notifier)
|
||||
|
||||
ctx = RunContext(
|
||||
cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm,
|
||||
notifier=notifier, audit=audit, detection_log=detection_log,
|
||||
scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir,
|
||||
cmd_queue=cmd_queue, state=loop_state,
|
||||
levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
|
||||
lifecycle=lifecycle,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Nested async coroutines — heartbeat captures notifier + heartbeat_due
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _heartbeat_loop() -> None:
|
||||
nonlocal heartbeat_due
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
if time.monotonic() > heartbeat_due:
|
||||
try:
|
||||
stats = notifier.stats()
|
||||
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
|
||||
@@ -710,9 +1241,40 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
|
||||
except Exception:
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
|
||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
|
||||
|
||||
async def _detection_loop() -> None:
|
||||
while True:
|
||||
if duration_s is not None and (time.monotonic() - start) >= duration_s:
|
||||
break
|
||||
result = await _run_tick(ctx)
|
||||
await _handle_fsm_result(ctx, result)
|
||||
await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang
|
||||
await asyncio.sleep(cfg.loop_interval_s)
|
||||
|
||||
# Launch background tasks
|
||||
t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler")
|
||||
t_poller = asyncio.create_task(poller.run(), name="poller")
|
||||
t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat")
|
||||
|
||||
try:
|
||||
await _detection_loop()
|
||||
finally:
|
||||
# 7-step graceful shutdown
|
||||
# 1. cancel scheduler
|
||||
t_scheduler.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
await t_scheduler
|
||||
# 2. cancel poller
|
||||
t_poller.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
await t_poller
|
||||
# 3. cancel heartbeat
|
||||
t_heartbeat.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
await t_heartbeat
|
||||
# 4. drain detection — complete (we awaited _detection_loop directly)
|
||||
# 5. send shutdown alert
|
||||
try:
|
||||
stats = notifier.stats()
|
||||
lines = [f"după {time.monotonic() - start:.0f}s"]
|
||||
@@ -721,13 +1283,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
f"{name}: sent={s['sent']} failed={s['failed']} "
|
||||
f"dropped={s['dropped']} retries={s['retries']}"
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="heartbeat", title="ATM oprit",
|
||||
body="\n".join(lines),
|
||||
))
|
||||
notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
|
||||
except Exception:
|
||||
pass
|
||||
# 6. notifier.stop() — flush + join FanoutNotifier threads
|
||||
notifier.stop()
|
||||
# 7. audit.close()
|
||||
audit.close()
|
||||
detection_log.close()
|
||||
|
||||
|
||||
@@ -5,11 +5,13 @@ from typing import Protocol
|
||||
|
||||
@dataclass
|
||||
class Alert:
|
||||
kind: str # "trigger" | "heartbeat" | "levels" | "warn" | "arm" | "prime" | "late_start"
|
||||
# flat union: "trigger"|"heartbeat"|"levels"|"warn"|"arm"|"prime"|"late_start"|"screenshot"|"status"
|
||||
kind: str
|
||||
title: str
|
||||
body: str
|
||||
image_path: Path | None = None # annotated screenshot
|
||||
direction: str | None = None # "BUY"/"SELL" when kind=trigger
|
||||
silent: bool = False # disable_notification for Telegram; ignored by Discord
|
||||
|
||||
|
||||
class Notifier(Protocol):
|
||||
|
||||
@@ -33,6 +33,7 @@ class TelegramNotifier:
|
||||
"chat_id": self._chat_id,
|
||||
"caption": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": str(alert.silent).lower(),
|
||||
},
|
||||
files={"photo": fh},
|
||||
timeout=10,
|
||||
@@ -44,6 +45,7 @@ class TelegramNotifier:
|
||||
"chat_id": self._chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": alert.silent,
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
118
src/atm/scheduler.py
Normal file
118
src/atm/scheduler.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""ScreenshotScheduler — periodic capture + annotate + send.
|
||||
|
||||
Runs as an asyncio task. capture() and cv2 work execute in asyncio.to_thread
|
||||
to avoid blocking the event loop. Decision 13: scheduler calls capture()
|
||||
directly, NOT via Detector.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from .notifier import Alert
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ScreenshotScheduler:
|
||||
"""Periodic screenshot sender.
|
||||
|
||||
Constructor params are explicit (decision 11 outside-voice finding).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
capture: Callable, # () -> ndarray | None
|
||||
save_fn: Callable, # (frame, label, now) -> Path | None
|
||||
notifier, # _NotifierLike
|
||||
audit, # _AuditLike
|
||||
interval_s: int | None = None,
|
||||
) -> None:
|
||||
self._capture = capture
|
||||
self._save_fn = save_fn
|
||||
self._notifier = notifier
|
||||
self._audit = audit
|
||||
self._interval_s = interval_s
|
||||
self._is_running = False
|
||||
self._next_due: float | None = None # monotonic
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public state
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._is_running
|
||||
|
||||
@property
|
||||
def interval_s(self) -> int | None:
|
||||
return self._interval_s
|
||||
|
||||
@property
|
||||
def next_due(self) -> float | None:
|
||||
return self._next_due
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Control (called from async event loop)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def start(self, interval_s: int) -> None:
|
||||
self._interval_s = interval_s
|
||||
self._is_running = True
|
||||
self._next_due = time.monotonic() + interval_s
|
||||
|
||||
def stop(self) -> None:
|
||||
self._is_running = False
|
||||
self._next_due = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Task body
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def run(self) -> None:
|
||||
"""Runs until cancelled."""
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
if not self._is_running or self._next_due is None:
|
||||
continue
|
||||
if time.monotonic() >= self._next_due:
|
||||
await self._take_screenshot()
|
||||
if self._is_running and self._interval_s is not None:
|
||||
self._next_due = time.monotonic() + self._interval_s
|
||||
|
||||
async def _take_screenshot(self) -> None:
|
||||
now = time.time()
|
||||
try:
|
||||
frame = await asyncio.to_thread(self._capture)
|
||||
except Exception as exc:
|
||||
logger.warning("ScreenshotScheduler capture failed: %s", exc)
|
||||
self._audit.log({"ts": now, "event": "screenshot_sent", "status": "capture_failed", "error": str(exc)})
|
||||
self._notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
silent=True,
|
||||
))
|
||||
return
|
||||
|
||||
if frame is None:
|
||||
self._notifier.send(Alert(
|
||||
kind="warn",
|
||||
title="Captură eșuată — verificați fereastra TradeStation",
|
||||
body="",
|
||||
silent=True,
|
||||
))
|
||||
return
|
||||
|
||||
path = await asyncio.to_thread(self._save_fn, frame, "poll", now)
|
||||
self._audit.log({"ts": now, "event": "screenshot_sent", "path": str(path) if path else None})
|
||||
self._notifier.send(Alert(
|
||||
kind="screenshot",
|
||||
title="Screenshot periodic",
|
||||
body="",
|
||||
image_path=path,
|
||||
silent=True,
|
||||
))
|
||||
@@ -232,3 +232,20 @@ class StateMachine:
|
||||
if last is None:
|
||||
return False
|
||||
return (ts - last) < self._lockout_s
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public lockout API — used by fire_on_phase_skip handler outside the
|
||||
# FSM. Mirrors _is_locked / _last_fire without leaking private attrs.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
    def is_locked(self, direction: str, ts: float) -> bool:
        """True if a FIRE in `direction` at ts would be within the lockout window."""
        # Thin public wrapper so external handlers (fire_on_phase_skip) don't
        # reach into the private _is_locked predicate directly.
        return self._is_locked(direction, ts)
|
||||
|
||||
    def record_fire(self, direction: str, ts: float) -> None:
        """Mark a FIRE for `direction` at ts, starting the lockout timer.

        Used by backstop handlers (e.g. fire_on_phase_skip) that emit a
        fire-equivalent alert without going through the natural FSM path.
        """
        # Same bookkeeping the FSM does on a natural FIRE; is_locked() reads
        # this timestamp to suppress repeats within the lockout window.
        self._last_fire[direction] = ts
|
||||
|
||||
@@ -140,6 +140,52 @@ def test_pause_file_written(tmp_path: Path) -> None:
|
||||
assert flag.exists()
|
||||
|
||||
|
||||
def test_canary_pause_callback_fires_once() -> None:
    """Single-shot: callback invoked exactly once per not_paused→paused edge."""
    config = _cfg_with_baseline(BASELINE_FRAME)
    distances: list[int] = []

    canary = Canary(config, on_pause_callback=distances.append)

    canary.check(DRIFTED_FRAME)   # edge: not_paused → paused, callback fires
    canary.check(DRIFTED_FRAME)   # already paused — stays silent
    canary.check(BASELINE_FRAME)  # frame clean again but still paused — silent

    assert len(distances) == 1
    assert distances[0] > 0  # drift distance must be positive
|
||||
|
||||
|
||||
def test_canary_resume_allows_new_pause_notification() -> None:
    """After resume, a fresh drift must re-fire the callback."""
    config = _cfg_with_baseline(BASELINE_FRAME)
    seen: list[int] = []

    canary = Canary(config, on_pause_callback=seen.append)

    canary.check(DRIFTED_FRAME)  # first pause edge
    assert len(seen) == 1

    canary.resume()
    canary.check(DRIFTED_FRAME)  # second pause edge after explicit resume
    assert len(seen) == 2
|
||||
|
||||
|
||||
def test_canary_pause_callback_exception_does_not_crash_check() -> None:
    """A failing callback must not break canary.check (detection cycle safety)."""
    config = _cfg_with_baseline(BASELINE_FRAME)

    def failing_callback(_distance: int) -> None:
        raise RuntimeError("notifier down")

    canary = Canary(config, on_pause_callback=failing_callback)

    # check() must swallow + log the callback error and still report the pause.
    outcome = canary.check(DRIFTED_FRAME)
    assert outcome.paused is True
    assert canary.is_paused is True
|
||||
|
||||
|
||||
def test_resume_deletes_pause_file(tmp_path: Path) -> None:
|
||||
"""resume() deletes the pause flag file."""
|
||||
flag = tmp_path / "paused.flag"
|
||||
|
||||
45
tests/test_commands.py
Normal file
45
tests/test_commands.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""Tests for atm.commands — /pause /resume parsing (Commit 5)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from atm.commands import Command, TelegramPoller
|
||||
|
||||
|
||||
def _make_poller() -> TelegramPoller:
    """Poller wired to a mock Telegram config; queue + notifier are throwaway mocks."""
    cfg = MagicMock()
    cfg.poll_timeout_s = 1
    cfg.bot_token = "tok"
    cfg.chat_id = "123"
    cfg.allowed_chat_ids = ("123",)
    return TelegramPoller(cfg, MagicMock(), MagicMock())
|
||||
|
||||
|
||||
def test_parse_pause():
    """Both bare and slash-prefixed spellings map to the pause command."""
    poller = _make_poller()
    for text in ("pause", "/pause"):
        assert poller._parse_command(text) == Command(action="pause")
|
||||
|
||||
|
||||
def test_parse_resume_plain():
    """Both bare and slash-prefixed spellings map to the resume command."""
    poller = _make_poller()
    for text in ("resume", "/resume"):
        assert poller._parse_command(text) == Command(action="resume")
|
||||
|
||||
|
||||
def test_parse_resume_force():
    """'resume force' carries value=1 to signal force-resume of canary drift."""
    poller = _make_poller()
    cmd = poller._parse_command("resume force")
    assert cmd is not None
    assert (cmd.action, cmd.value) == ("resume", 1)
|
||||
|
||||
|
||||
def test_parse_existing_commands_still_work():
    """Regression: adding pause/resume must not break stop/status/ss/interval."""
    poller = _make_poller()
    expected = {
        "stop": Command(action="stop"),
        "status": Command(action="status"),
        "ss": Command(action="ss"),
        "3": Command(action="set_interval", value=180),
    }
    for text, cmd in expected.items():
        assert poller._parse_command(text) == cmd
|
||||
@@ -97,3 +97,59 @@ def test_attach_screenshots_unknown_keys_ignored() -> None:
|
||||
}))
|
||||
assert cfg.attach_screenshots.arm is False
|
||||
# Should not raise even with unknown key
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 3: AlertBehaviorCfg (fire_on_phase_skip)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_alerts_default_fire_on_phase_skip_true() -> None:
    """fire_on_phase_skip defaults to enabled when [alerts] is absent."""
    config = Config._from_dict(_with_opts({}))
    assert config.alerts.fire_on_phase_skip is True
|
||||
|
||||
|
||||
def test_alerts_fire_on_phase_skip_can_be_disabled() -> None:
    """An explicit alerts.fire_on_phase_skip = false is honored."""
    config = Config._from_dict(_with_opts({"alerts": {"fire_on_phase_skip": False}}))
    assert config.alerts.fire_on_phase_skip is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 4: OperatingHoursCfg parsing + tz cache
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_operating_hours_default_disabled() -> None:
    """Without [operating_hours], the feature is off and no tz is cached."""
    config = Config._from_dict(_with_opts({}))
    oh = config.operating_hours
    assert oh.enabled is False
    assert oh.timezone == "America/New_York"
    assert oh._tz_cache is None
|
||||
|
||||
|
||||
def test_operating_hours_enabled_caches_tz() -> None:
    """Enabling operating_hours eagerly resolves and caches the ZoneInfo."""
    section = {
        "enabled": True,
        "timezone": "America/New_York",
        "weekdays": ["MON", "TUE", "WED", "THU", "FRI"],
        "start_hhmm": "09:30",
        "stop_hhmm": "16:00",
    }
    config = Config._from_dict(_with_opts({"operating_hours": section}))
    oh = config.operating_hours
    assert oh.enabled is True
    assert oh._tz_cache is not None
    assert str(oh._tz_cache) == "America/New_York"
|
||||
|
||||
|
||||
def test_operating_hours_invalid_tz_raises_valueerror() -> None:
    """An unresolvable timezone is rejected at config-parse time."""
    import pytest
    bad = {"operating_hours": {"enabled": True, "timezone": "Not/A_Zone"}}
    with pytest.raises(ValueError, match="operating_hours.timezone"):
        Config._from_dict(_with_opts(bad))
|
||||
|
||||
|
||||
def test_operating_hours_invalid_weekday_raises_valueerror() -> None:
    """An unknown weekday token is rejected at config-parse time."""
    import pytest
    bad = {"operating_hours": {"enabled": True, "weekdays": ["XYZ"]}}
    with pytest.raises(ValueError, match="weekdays"):
        Config._from_dict(_with_opts(bad))
|
||||
|
||||
@@ -10,6 +10,8 @@ Covers the six cases from the arm+prime notification plan:
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
from atm.main import _handle_tick
|
||||
from atm.notifier import Alert
|
||||
from atm.state_machine import State, StateMachine
|
||||
@@ -486,3 +488,82 @@ def test_save_annotated_frame_succeeds(tmp_path, monkeypatch):
|
||||
assert "BUY" in result.name
|
||||
assert len(written) == 1
|
||||
assert not any(e.get("event") == "snapshot_fail" for e in audit.events)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 3: fire_on_phase_skip backstop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _cfg_with_flag(enabled: bool):
|
||||
return SimpleNamespace(alerts=SimpleNamespace(fire_on_phase_skip=enabled))
|
||||
|
||||
|
||||
def test_phase_skip_fire_when_flag_on():
    """ARMED_SELL → light_red directly with flag=True → phase_skip_fire alert."""
    fsm = StateMachine(lockout_s=240)
    notifier = FakeNotifier()
    audit_log = FakeAudit()
    cfg = _cfg_with_flag(True)

    # Arm SELL (yellow from IDLE)
    _handle_tick(fsm, "yellow", 1.0, notifier, audit_log, first_accepted=False, cfg=cfg)
    assert fsm.state == State.ARMED_SELL
    notifier.alerts.clear()

    # ARMED_SELL → light_red (skips dark_red) → phase_skip_fire
    transition = _handle_tick(fsm, "light_red", 2.0, notifier, audit_log,
                              first_accepted=False, cfg=cfg)
    assert transition is not None and transition.reason == "phase_skip"

    skips = [a for a in notifier.alerts if a.kind == "phase_skip_fire"]
    assert len(skips) == 1
    assert skips[0].direction == "SELL"
    assert "SELL" in skips[0].title
|
||||
|
||||
|
||||
def test_phase_skip_no_fire_when_flag_off():
    """Same scenario, flag=False → no phase_skip_fire emitted."""
    fsm = StateMachine(lockout_s=240)
    notifier = FakeNotifier()
    audit_log = FakeAudit()
    cfg = _cfg_with_flag(False)

    _handle_tick(fsm, "yellow", 1.0, notifier, audit_log, first_accepted=False, cfg=cfg)
    notifier.alerts.clear()
    _handle_tick(fsm, "light_red", 2.0, notifier, audit_log, first_accepted=False, cfg=cfg)

    assert [a for a in notifier.alerts if a.kind == "phase_skip_fire"] == []
|
||||
|
||||
|
||||
def test_phase_skip_lockout_suppresses_spam():
    """Two phase_skip events within lockout_s → only the first emits an alert."""
    fsm = StateMachine(lockout_s=240)
    notifier = FakeNotifier()
    audit_log = FakeAudit()
    cfg = _cfg_with_flag(True)

    # Two arm→phase_skip cycles; the second falls well inside the 240 s lockout.
    for arm_ts, skip_ts in ((1.0, 2.0), (60.0, 61.0)):
        _handle_tick(fsm, "yellow", arm_ts, notifier, audit_log,
                     first_accepted=False, cfg=cfg)
        _handle_tick(fsm, "light_red", skip_ts, notifier, audit_log,
                     first_accepted=False, cfg=cfg)

    skips = [a for a in notifier.alerts if a.kind == "phase_skip_fire"]
    assert len(skips) == 1, (
        f"expected 1 phase_skip_fire (lockout), got {len(skips)}"
    )
|
||||
|
||||
|
||||
def test_state_machine_is_locked_and_record_fire_public_api():
    """Public lockout helpers mirror the private _is_locked / _last_fire behavior."""
    fsm = StateMachine(lockout_s=100)

    assert fsm.is_locked("BUY", 0.0) is False  # nothing recorded yet

    fsm.record_fire("BUY", 10.0)
    assert fsm.is_locked("BUY", 50.0) is True    # within 100s
    assert fsm.is_locked("BUY", 150.0) is False  # past lockout
    assert fsm.is_locked("SELL", 50.0) is False  # other direction unaffected
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Tests for atm.main unified CLI."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -186,7 +187,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
]
|
||||
def __init__(self, *a, **kw):
|
||||
self._i = 0
|
||||
def step(self, ts):
|
||||
def step(self, ts, frame=None):
|
||||
if self._i >= len(self._script):
|
||||
raise _StopLoop
|
||||
color, accepted = self._script[self._i]
|
||||
@@ -228,6 +229,17 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
def step(self, *a, **kw):
|
||||
return types.SimpleNamespace(status="pending", levels=None)
|
||||
|
||||
class _StubPoller:
|
||||
def __init__(self, *a, **kw): pass
|
||||
async def run(self): await asyncio.sleep(9999)
|
||||
|
||||
class _StubScheduler:
|
||||
def __init__(self, *a, **kw):
|
||||
self.is_running = False
|
||||
def start(self, interval_s): self.is_running = True
|
||||
def stop(self): self.is_running = False
|
||||
async def run(self): await asyncio.sleep(9999)
|
||||
|
||||
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
|
||||
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
|
||||
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
|
||||
@@ -237,6 +249,8 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
|
||||
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
|
||||
monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
|
||||
monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
|
||||
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
|
||||
|
||||
with pytest.raises(_StopLoop):
|
||||
_main.run_live(cfg, duration_s=None)
|
||||
@@ -255,3 +269,713 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
|
||||
assert len(trigger) == 1
|
||||
assert trigger[0].direction == "SELL"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MUST-HAVE: async lifecycle integration test
|
||||
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
|
||||
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
    """Async lifecycle: IDLE → ARMED → PRIMED (scheduler starts) → FIRE (stops).

    Every collaborator of run_live_async is monkeypatched: the detector is
    scripted to walk the BUY color sequence, the scheduler fake records
    start/stop ordering, and notifiers just collect alerts.
    """
    import numpy as np
    import atm.main as _main
    from atm.detector import DetectionResult

    captured_alerts: list = []
    scheduler_events: list[str] = []  # ordered "start:<interval>" / "stop" markers

    class FakeFanout:
        def __init__(self, *a, **kw): pass
        def send(self, alert): captured_alerts.append(alert)
        def stop(self): pass
        def stats(self): return {}

    class FakeCanaryResult:
        distance = 0
        drifted = False
        paused = False

    class FakeCanary:
        def __init__(self, *a, **kw): self.is_paused = False
        def check(self, frame): return FakeCanaryResult()
        def resume(self): pass

    # Scheduler tracks start/stop calls
    class FakeScheduler:
        def __init__(self, *a, **kw):
            self.is_running = False
            self.interval_s = None
        def start(self, interval_s):
            self.is_running = True
            self.interval_s = interval_s
            scheduler_events.append(f"start:{interval_s}")
        def stop(self):
            self.is_running = False
            scheduler_events.append("stop")
        async def run(self):
            await asyncio.sleep(9999)

    class FakePoller:
        def __init__(self, *a, **kw): pass
        async def run(self): await asyncio.sleep(9999)

    class _StopLoop(Exception): pass

    class ScriptedDetector:
        # turquoise→ARM, dark_green→PRIME, light_green→FIRE
        _script = [
            ("turquoise", True),
            ("dark_green", True),
            ("light_green", True),
        ]
        def __init__(self, *a, **kw): self._i = 0
        def step(self, ts, frame=None):
            # Raise once the script is exhausted so run_live_async terminates.
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
            self._i += 1
            return DetectionResult(ts=ts, window_found=True, dot_found=True,
                                   rgb=(1, 1, 1), match=None, accepted=accepted, color=color)
        @property
        def rolling(self): return []

    def fake_build_capture(cfg, capture_stub=False):
        return lambda: np.zeros((50, 50, 3), dtype=np.uint8)

    cfg = MagicMock()
    cfg.lockout_s = 60
    cfg.heartbeat_min = 999
    cfg.loop_interval_s = 0
    cfg.config_version = "test"
    cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
    cfg.canary.drift_threshold = 10
    cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10
    cfg.chart_window_region = None
    cfg.telegram.auto_poll_interval_s = 180
    cfg.telegram.bot_token = "tok"
    cfg.telegram.chat_id = "123"
    cfg.telegram.allowed_chat_ids = ("123",)

    fake_sched = FakeScheduler()

    monkeypatch.chdir(tmp_path)

    class _Stub:
        def __init__(self, *a, **kw): pass
        def log(self, *a, **kw): pass
        def close(self, *a, **kw): pass
        def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None)

    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
    monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
    monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
    monkeypatch.setattr("atm.audit.AuditLog", _Stub)
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
    # Lambda factory so the single fake_sched instance is shared with the test.
    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)

    with pytest.raises(_StopLoop):
        await _main.run_live_async(cfg, duration_s=None)

    arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
    prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
    trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]

    assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
    assert arm_alerts[0].direction == "BUY"

    assert len(prime_alerts) == 1
    assert prime_alerts[0].direction == "BUY"

    assert len(trigger_alerts) == 1
    assert trigger_alerts[0].direction == "BUY"

    # Scheduler must have started (on PRIME) and stopped (on FIRE)
    assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
    assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
    start_idx = scheduler_events.index("start:180")
    stop_idx = scheduler_events.index("stop")
    assert start_idx < stop_idx, "scheduler started after it stopped"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
|
||||
# even when canary is paused or when detection is otherwise skipped.
|
||||
# Prior bug: `continue` past the drain loop caused commands to pile up.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_ctx_for_drain(cmd_queue, dispatched: list):
    """Build a minimal RunContext where _dispatch_command just records calls.

    NOTE(review): the `dispatched` parameter is accepted but never used here —
    callers patch _dispatch_command themselves; kept for signature
    compatibility. Canary is always paused to exercise the drain-while-paused
    regression path.
    """
    import atm.main as _main

    class _FakeAudit:
        def __init__(self): self.events = []
        def log(self, e): self.events.append(e)

    class _FakeNotifier:
        def __init__(self): self.alerts = []
        def send(self, a): self.alerts.append(a)

    class _FakeCanary:
        def __init__(self, paused=True):
            self.is_paused = paused

    class _FakeScheduler:
        is_running = False
        interval_s = None
        def start(self, s): pass
        def stop(self): pass

    state = _main._LoopState(start=0.0)
    ctx = _main.RunContext(
        cfg=MagicMock(),
        capture=lambda: None,
        canary=_FakeCanary(paused=True),
        detector=MagicMock(),
        fsm=MagicMock(),
        notifier=_FakeNotifier(),
        audit=_FakeAudit(),
        detection_log=_FakeAudit(),
        scheduler=_FakeScheduler(),
        samples_dir=Path("."),
        fires_dir=Path("."),
        cmd_queue=cmd_queue,
        state=state,
        levels_extractor_factory=lambda *a, **kw: None,
    )
    return ctx
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
    """Regression: when canary.is_paused, _drain_cmd_queue still dispatches.

    Prior bug: detection loop `continue`'d past the drain block whenever the
    tick returned res=None (canary paused). Commands accumulated forever.
    """
    import atm.main as _main
    from atm.commands import Command

    queue: asyncio.Queue = asyncio.Queue()
    for action in ("status", "ss"):
        await queue.put(Command(action=action))

    seen: list = []

    async def record_dispatch(ctx, cmd):
        seen.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", record_dispatch)
    ctx = _make_ctx_for_drain(queue, seen)

    await _main._drain_cmd_queue(ctx)

    assert seen == ["status", "ss"]
    assert queue.empty()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
    """Drain must still fire when the tick skipped (e.g. out of operating hours).

    The refactored loop runs _drain_cmd_queue unconditionally after every tick,
    regardless of `_TickSyncResult` content.
    """
    import atm.main as _main
    from atm.commands import Command

    queue: asyncio.Queue = asyncio.Queue()
    await queue.put(Command(action="stop"))

    seen: list = []

    async def record_dispatch(ctx, cmd):
        seen.append(cmd.action)

    monkeypatch.setattr(_main, "_dispatch_command", record_dispatch)
    ctx = _make_ctx_for_drain(queue, seen)

    # Simulate out-of-window tick (empty _TickSyncResult, no res)
    await _main._handle_fsm_result(ctx, _main._TickSyncResult())
    await _main._drain_cmd_queue(ctx)

    assert seen == ["stop"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
    """If one command raises, remaining commands still drain + warn alert sent."""
    import atm.main as _main
    from atm.commands import Command

    queue: asyncio.Queue = asyncio.Queue()
    for action in ("status", "ss"):
        await queue.put(Command(action=action))

    attempted: list = []

    async def exploding_dispatch(ctx, cmd):
        attempted.append(cmd.action)
        if cmd.action == "status":
            raise RuntimeError("boom")

    monkeypatch.setattr(_main, "_dispatch_command", exploding_dispatch)
    ctx = _make_ctx_for_drain(queue, attempted)

    await _main._drain_cmd_queue(ctx)

    assert attempted == ["status", "ss"]
    # warn alert for the failed command
    warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
    assert any("status" in t for t in warn_titles)
    # command_error audit event
    errors = [e for e in ctx.audit.events if e.get("event") == "command_error"]
    assert len(errors) == 1 and errors[0]["action"] == "status"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 4: operating hours + LifecycleState transitions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from zoneinfo import ZoneInfo as _ZI # noqa: E402
|
||||
import datetime as _dt # noqa: E402
|
||||
|
||||
|
||||
def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
|
||||
start="09:30", stop="16:00", tz="America/New_York"):
|
||||
"""Build a lightweight cfg-like object with operating_hours populated."""
|
||||
oh = types.SimpleNamespace(
|
||||
enabled=enabled,
|
||||
timezone=tz,
|
||||
weekdays=weekdays,
|
||||
start_hhmm=start,
|
||||
stop_hhmm=stop,
|
||||
_tz_cache=_ZI(tz) if enabled else None,
|
||||
)
|
||||
return types.SimpleNamespace(operating_hours=oh)
|
||||
|
||||
|
||||
def _fake_canary(paused=False):
|
||||
return types.SimpleNamespace(is_paused=paused)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "local_dt,expected",
    [
        # Monday 09:30 NY — exact open → active (None)
        (_dt.datetime(2026, 4, 20, 9, 30), None),
        # Monday 16:00 NY — exact close → inactive (>= stop)
        (_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
        # Monday 08:00 NY — before open
        (_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
        # Monday 12:00 NY — active
        (_dt.datetime(2026, 4, 20, 12, 0), None),
        # Saturday 12:00 NY — weekend
        (_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
        # Sunday 23:00 NY — weekend
        (_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
    ],
)
def test_operating_hours_skip_matrix(local_dt, expected):
    """Timezone-aware start/stop + weekday checks."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Attach the NY zone to the naive local time, then convert to an epoch ts.
    now_ts = local_dt.replace(tzinfo=tz).timestamp()

    # Fresh lifecycle per case: _should_skip must decide from the clock alone.
    lifecycle = _main.LifecycleState()
    result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())
    assert result == expected
|
||||
|
||||
|
||||
def test_market_open_close_transitions_logged_once():
    """Crossing a boundary emits exactly one market_open / market_closed event."""
    import atm.main as _main

    audit_events = []
    alerts = []

    class _A:
        def log(self, e): audit_events.append(e)

    class _N:
        def send(self, a): alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()
    canary = _fake_canary()

    # Prime as closed (before open, Monday 08:00)
    pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
    skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
    # First evaluation seeds state, no alert yet.
    assert lifecycle.last_window_state == "closed"
    assert alerts == []
    assert audit_events == []

    # Transition to open
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
    assert lifecycle.last_window_state == "open"
    assert len(alerts) == 1
    assert any(e.get("event") == "market_open" for e in audit_events)

    # Repeated open tick — no duplicate log
    alerts.clear()
    audit_events.clear()
    skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
    assert alerts == []
    assert audit_events == []

    # Transition to close
    close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
    skip_close = _main._should_skip(close, lifecycle, cfg, canary)
    _main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
    assert lifecycle.last_window_state == "closed"
    assert any(e.get("event") == "market_closed" for e in audit_events)
|
||||
|
||||
|
||||
def test_market_transition_sends_notification():
    """market_open / market_closed transitions produce kind=status alerts."""
    import atm.main as _main

    sent = []

    class _NullAudit:
        def log(self, e): pass

    class _CapturingNotifier:
        def send(self, a): sent.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(last_window_state="closed")

    # In-window tick while last state is "closed" → closed→open transition.
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    _main._maybe_log_transition(None, lifecycle, mid, _NullAudit(), _CapturingNotifier())

    assert len(sent) == 1
    assert sent[0].kind == "status"
    assert "market" in sent[0].title.lower() or "piața" in sent[0].body.lower()
|
||||
|
||||
|
||||
def test_startup_in_window_suppresses_market_open():
    """R2 #20: first evaluation in-window just seeds state; no alert fires."""
    import atm.main as _main

    alerts = []
    events = []

    class _A:
        def log(self, e): events.append(e)

    class _N:
        def send(self, a): alerts.append(a)

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState()  # last_window_state is None

    in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
    assert skip is None
    _main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())

    # Seeded silently
    assert lifecycle.last_window_state == "open"
    assert alerts == []
    assert not any(e.get("event") == "market_open" for e in events)

    # Two more ticks, still in-window → no spurious alert
    for _ in range(2):
        skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
        _main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
    assert alerts == []
|
||||
|
||||
|
||||
def test_operating_hours_weekday_locale_independent():
    """R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
    import locale as _locale
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Saturday 12:00 NY
    sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()

    # Remember the current LC_TIME so the test leaves the process unchanged.
    original = _locale.setlocale(_locale.LC_TIME)
    try:
        for loc in ("C", "de_DE.UTF-8"):
            try:
                _locale.setlocale(_locale.LC_TIME, loc)
            except _locale.Error:
                continue  # locale not installed → skip gracefully
            lifecycle = _main.LifecycleState()
            result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
            assert result == "out_of_window_weekend", (
                f"locale={loc} returned {result!r}"
            )
    finally:
        # Restore; fall back to "C" if the saved locale string can't be re-set.
        try:
            _locale.setlocale(_locale.LC_TIME, original)
        except _locale.Error:
            _locale.setlocale(_locale.LC_TIME, "C")
|
||||
|
||||
|
||||
def test_should_skip_user_paused_wins():
    """user_paused overrides an otherwise-open market window."""
    import atm.main as _main

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    lifecycle = _main.LifecycleState(user_paused=True)

    # Mid-Monday (in-window) — should still skip because user_paused
    mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
    assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"
|
||||
|
||||
|
||||
def test_should_skip_canary_drift_wins_over_window():
    """A drift-paused canary must produce a skip even inside the window."""
    import atm.main as _main

    config = _oh_cfg()
    zone = config.operating_hours._tz_cache
    # Monday noon NY time — the window is open, so only the canary can skip.
    monday_noon = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=zone).timestamp()
    fresh = _main.LifecycleState()
    reason = _main._should_skip(monday_noon, fresh, config, _fake_canary(paused=True))
    assert reason == "drift_paused"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
    """Build a minimal RunContext suitable for _dispatch_command unit tests.

    Any of *canary*, *lifecycle*, *cfg* may be supplied by the caller;
    missing pieces are replaced with inert stand-ins that record calls
    instead of doing real work.
    """
    import atm.main as _main

    class _RecordingAudit:
        # Collects every logged event so tests can assert on it.
        def __init__(self):
            self.events = []

        def log(self, event):
            self.events.append(event)

    class _RecordingNotifier:
        # Collects every outgoing alert instead of sending it.
        def __init__(self):
            self.alerts = []

        def send(self, alert):
            self.alerts.append(alert)

    class _StubScheduler:
        # Tracks only the running flag; never schedules anything real.
        is_running = False
        interval_s = None

        def start(self, s):
            self.is_running = True

        def stop(self):
            self.is_running = False

    if canary is None:
        canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
    if lifecycle is None:
        lifecycle = _main.LifecycleState()
    if cfg is None:
        cfg = MagicMock()
        cfg.telegram.auto_poll_interval_s = 180
        cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)

    return _main.RunContext(
        cfg=cfg, capture=lambda: None, canary=canary,
        detector=MagicMock(), fsm=MagicMock(),
        notifier=_RecordingNotifier(), audit=_RecordingAudit(),
        detection_log=_RecordingAudit(),
        scheduler=_StubScheduler(), samples_dir=Path("."), fires_dir=Path("."),
        cmd_queue=MagicMock(), state=_main._LoopState(start=0.0),
        levels_extractor_factory=lambda *a, **kw: None,
        lifecycle=lifecycle,
    )
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
    """/pause flips the lifecycle flag, skips detection, and logs + notifies."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    await _main._dispatch_command(ctx, Command(action="pause"))

    # The lifecycle flag is set and _should_skip reports it as the reason.
    assert ctx.lifecycle.user_paused is True
    skip_reason = _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary)
    assert skip_reason == "user_paused"

    # An audit event was written and a status alert (Romanian "oprit") sent.
    audit_hits = [e for e in ctx.audit.events if e.get("event") == "user_paused"]
    assert audit_hits
    notif_hits = [
        a for a in ctx.notifier.alerts
        if a.kind == "status" and "oprit" in a.title.lower()
    ]
    assert notif_hits
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
    """/resume with force=1 clears BOTH user_paused and the canary drift pause.

    Fix: the original built a throwaway dict-backed SimpleNamespace canary and
    then immediately shadowed it with ``_Canary()`` — dead code that misled
    readers about which canary the test actually exercises. Only the
    property-backed ``_Canary`` (whose ``is_paused`` reflects ``resume()``)
    is kept.
    """
    import atm.main as _main
    from atm.commands import Command

    # is_paused is a property so the effect of resume() is observable.
    class _Canary:
        def __init__(self): self._p = True
        @property
        def is_paused(self): return self._p
        def resume(self): self._p = False
    canary = _Canary()

    ctx = _dispatch_ctx(canary=canary)
    ctx.lifecycle.user_paused = True

    # value=1 marks the resume as forced.
    await _main._dispatch_command(ctx, Command(action="resume", value=1))

    assert ctx.lifecycle.user_paused is False
    assert canary.is_paused is False
    # The audit trail records the resume with the force flag set.
    force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
    assert force_events and force_events[0]["force"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_resume_during_drift_keeps_canary_paused_without_force():
    """R2 #21: plain /resume during drift clears user_paused but NOT canary."""
    import atm.main as _main
    from atm.commands import Command

    # is_paused is a property so resume() side effects are observable.
    class _Canary:
        def __init__(self): self._p = True
        @property
        def is_paused(self): return self._p
        def resume(self): self._p = False

    drift_canary = _Canary()
    ctx = _dispatch_ctx(canary=drift_canary)
    ctx.lifecycle.user_paused = True

    # Plain resume — no force value supplied.
    await _main._dispatch_command(ctx, Command(action="resume"))

    assert ctx.lifecycle.user_paused is False
    assert drift_canary.is_paused is True  # still drift-paused
    # The status message must call out the drift condition.
    status_alerts = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status_alerts
    first = status_alerts[0]
    assert "drift" in (first.title + first.body).lower()

    # A forced resume afterwards does clear the canary.
    ctx.notifier.alerts.clear()
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert drift_canary.is_paused is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
    """/resume while operating-hours window is closed → special body."""
    import atm.main as _main
    from atm.commands import Command

    cfg = _oh_cfg()
    tz = cfg.operating_hours._tz_cache
    # Start user-paused with the window already recorded as closed.
    lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
    canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)

    ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)

    # Pin time to Saturday by swapping atm.main's `time` module for a stub;
    # restored in the finally block so other tests are unaffected.
    import atm.main as _mm
    real_time = _mm.time
    fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()
    class _FakeTime:
        def time(self): return fake_ts
        def monotonic(self): return 0.0
    _mm.time = _FakeTime()
    try:
        await _main._dispatch_command(ctx, Command(action="resume"))
    finally:
        _mm.time = real_time

    # The pause is cleared, but the reply must explain the market/window is
    # closed (Romanian wording: "închis"/"piața"/"ferestr...").
    assert ctx.lifecycle.user_paused is False
    status = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status
    combined = (status[0].title + status[0].body).lower()
    assert "închis" in combined or "piața" in combined or "ferestr" in combined
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_status_command_reports_pause_reason():
    """/status body must mention pause reason + window state."""
    import atm.main as _main
    from atm.commands import Command

    ctx = _dispatch_ctx()
    ctx.lifecycle.user_paused = True
    # The status handler reads detector.rolling and fsm.state; stub both.
    ctx.detector.rolling = []
    ctx.fsm.state = types.SimpleNamespace(value="IDLE")

    await _main._dispatch_command(ctx, Command(action="status"))

    status_alerts = [a for a in ctx.notifier.alerts if a.kind == "status"]
    assert status_alerts
    report = status_alerts[0].body
    assert "user_paused" in report or "pauzat:user_paused" in report
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path):
    """E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert.

    This test verifies the full command-driven lifecycle in isolation:
    - canary starts drift-paused, _should_skip returns drift_paused
    - /resume force clears canary + user_paused
    - subsequent detection produces SELL fire through normal FSM path

    NOTE(review): the `monkeypatch` and `tmp_path` fixtures are not used in
    the body — confirm whether they can be dropped from the signature.
    """
    import atm.main as _main
    from atm.commands import Command

    # Canary with mutable pause state (is_paused is a property so resume()
    # side effects are observable).
    class _Canary:
        def __init__(self): self._p = True
        @property
        def is_paused(self): return self._p
        def resume(self): self._p = False

    canary = _Canary()
    cfg = MagicMock()
    cfg.telegram.auto_poll_interval_s = 180
    # Operating hours disabled so only pause flags can cause skips.
    cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)

    ctx = _dispatch_ctx(canary=canary, cfg=cfg)

    # 1. While drift-paused, _should_skip returns drift_paused
    assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused"

    # 2. User issues /resume force
    await _main._dispatch_command(ctx, Command(action="resume", value=1))
    assert canary.is_paused is False
    assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None

    # 3. Feed a yellow→light_red sequence through _handle_tick (FSM path)
    from atm.state_machine import StateMachine, State
    fsm = StateMachine(lockout_s=60)

    # Minimal notifier/audit stand-ins for the tick path.
    class _N:
        def __init__(self): self.alerts = []
        def send(self, a): self.alerts.append(a)

    class _A:
        def log(self, _e): pass

    notif = _N()
    audit = _A()
    cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True))

    # yellow → dark_red → light_red at increasing timestamps drives the FSM
    # to a SELL fire through its normal transition path.
    _main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock)
    _main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock)
    tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock)

    # FSM reached fire via normal path
    assert tr is not None and tr.trigger == "SELL"
    assert fsm.state == State.IDLE
|
||||
|
||||
Reference in New Issue
Block a user