Compare commits


17 Commits

Author SHA1 Message Date
8bae507bbd feat(cli): atm validate-calibration — offline color classification gate
Adds `atm validate-calibration LABEL_FILE` subcommand that runs the Detector
on a set of labeled PNG frames and reports per-sample PASS/FAIL with top-3
candidate colors and RGB-distance suggestions for failures. Exits 0 on 100%
PASS, 1 on any FAIL, 2 on missing/malformed label file.

- New module src/atm/validate.py with ValidationReport + SampleRecord
  dataclasses; reuses Detector.step(frame), does not reimplement color
  classification.
- main.py: new `validate-calibration` subparser and _cmd_validate_calibration
  handler wired into the dispatch map.
- samples/calibration_labels.json seeded with 3 entries from the 2026-04-17
  incident, plus a README describing the schema.
- tests/test_validate.py covers the 3 planned cases: PASS, FAIL w/ top-3
  + suggestion, missing file (graceful error, no traceback).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 12:02:48 +03:00
23865776e3 feat(commands): /pause /resume + adaptive dispatch + richer /status
Add two new Telegram commands so the user can manage monitoring without
restarting the process:

- /pause sets lifecycle.user_paused = True. The detection loop then
  short-circuits via _should_skip without touching FSM / canary state.
- /resume clears user_paused. R2 decision: drift-pause is NOT lifted by
  plain /resume (the drift may be legit and require recalibration).
  "/resume force" (value=1) also calls canary.resume(). The response
  message adapts to context:
    - drift active + plain resume → explains force requirement
    - force + drift → confirms override, warns about recurrence
    - out-of-window → explains monitor will resume at next open
    - otherwise → plain "Monitorizare reluată" ("Monitoring resumed")
- /status now shows "Activ: <pause_reason | activ>" and window state.

commands.py: extend CommandAction literal and _parse_command to accept
pause, resume, and "resume force" (value=1 signal).
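The adaptive /resume dispatch above can be sketched as a pure decision function. The function name and the English message strings are illustrative stand-ins, not the shipped (Romanian-localized) responses:

```python
# Sketch of the context-dependent /resume reply logic (hypothetical helper).
def resume_reply(drift_paused: bool, force: bool, in_window: bool) -> str:
    if drift_paused and not force:
        return "Drift pause active: use '/resume force' to override."
    if drift_paused and force:
        return "Drift pause overridden; it may recur until recalibration."
    if not in_window:
        return "Resumed; monitoring restarts at next market open."
    return "Monitoring resumed."
```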

Tests: test_commands.py parse coverage;
test_pause_command_sets_user_paused_and_skips_detection,
test_resume_clears_user_paused_and_canary_when_forced,
test_resume_during_drift_keeps_canary_paused_without_force (R2 #21),
test_resume_out_of_window_responds_with_pending_message,
test_status_command_reports_pause_reason,
test_lifecycle_with_drift_then_resume_then_fire (E2E #16).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 12:01:19 +03:00
54f55752c1 feat(run,config): operating hours window + timezone-aware lifecycle state
Add OperatingHoursCfg (enabled/timezone/weekdays/start_hhmm/stop_hhmm) so
the run loop can align with NYSE session hours instead of the user's
local wall clock (fixes DST drift between NY and Europe/Bucharest).

- Config parses [options.operating_hours] and resolves ZoneInfo at load,
  fail-fast on invalid tz or weekday names. The tz is cached on
  _tz_cache so the detection loop pays zero per-tick cost.
- LifecycleState tracks user_paused + last_window_state across ticks.
- Module-scope _should_skip(now, state, cfg, canary) returns skip reason
  or None. Weekday check uses datetime.weekday() + a fixed MON..SUN list
  (locale-free; strftime('%a') is localized).
- _maybe_log_transition emits market_open / market_closed once per edge.
  R2: when last_window_state is None (startup), just seed — do not send
  a spurious market_open alert when run_live_async launches in-window.
- _run_tick consults the lifecycle guard before scheduling the heavy
  detection thread, so drain + transition logging still happen when the
  tick is skipped.
- CLI flags --tz / --weekdays / --oh-start / --oh-stop override TOML.
  (Kept distinct from the existing --start-at/--stop-at sleep-until-time
  semantics to avoid breaking current deployments — deviation noted.)
- configs/example.toml documents the new [options.operating_hours] table.

Tests: parametrized window matrix (tests #8), transition logging (#9),
notification side-effect (#10), R2 #20 startup suppression, R2 #22
locale-independent weekday, plus guards for user_paused / canary
precedence and config-parse error paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 11:59:22 +03:00
8b53b8d3c9 feat(alerts): fire_on_phase_skip backstop + public FSM lockout API
When the FSM observes ARMED → light_{green,red} directly (the dark
prime was missed), the color classifier likely mis-labeled the dark
phase as gray/background. Missing a fire is worse than a noisy alert,
so the new [options.alerts] fire_on_phase_skip flag (default True)
emits a phase_skip_fire alert on that transition with the standard
240s lockout to dedupe detector bounces.

Adds public StateMachine.is_locked / record_fire so the handler does
not reach into private attrs. _handle_tick now accepts an optional
cfg to read the flag; falls back to True if cfg is absent (tests).

Config gains AlertBehaviorCfg (new alerts field), parsed from
[options.alerts]. Example TOML updated with an explanatory comment.
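The transition the backstop watches for can be sketched as a predicate; state and color names follow the commit text, the helper itself is hypothetical:

```python
# Illustrative check for the ARMED → light_* phase-skip transition.
def is_phase_skip(prev_state: str, color: str) -> bool:
    """True when the FSM jumps from ARMED straight to a light color,
    i.e. the dark prime phase was never observed."""
    return prev_state == "ARMED" and color in ("light_green", "light_red")
```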

Tests: test_phase_skip_fire_when_flag_on,
test_phase_skip_no_fire_when_flag_off,
test_phase_skip_lockout_suppresses_spam,
test_state_machine_is_locked_and_record_fire_public_api.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 11:55:39 +03:00
9cf49caf8a feat(canary): single-shot on_pause_callback + wire Telegram drift alert
Canary auto-pause was silent: when drift > threshold the module flipped
to paused without any user-facing notification, leaving the user to
wonder why detection went dark. Add an optional on_pause_callback
invoked exactly once per not_paused→paused transition. Wrap the call
in try/except so a notifier failure can never break the detection
cycle.

main.py wires the callback to emit canary_drift_paused audit event
plus a warn Alert guiding the user toward /resume or recalibration.

Tests: test_canary_pause_callback_fires_once (idempotent),
test_canary_resume_allows_new_pause_notification (re-arms after
resume), test_canary_pause_callback_exception_does_not_crash_check
(safety).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 11:53:22 +03:00
c5024ce600 feat(run): extract detection loop helpers + unconditional cmd drain
Refactor _detection_loop by moving _run_tick, _handle_fsm_result,
_dispatch_command, and _drain_cmd_queue to module scope, passing
dependencies via a RunContext dataclass. This unblocks direct unit
testing of the drain path.

CRITICAL bug fix: the previous loop issued `continue` when the tick
returned res=None (canary paused or similar), which skipped the
drain block. Commands piled up in cmd_queue while detection was
paused — the hang observed on 2026-04-17 after canary drift-pause.
The refactored loop now runs _drain_cmd_queue UNCONDITIONALLY on
every iteration, after _handle_fsm_result, so pause-state never
starves the command channel.
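The shape of the fix can be sketched as one loop iteration; `tick`, `handle`, and `drain` stand in for `_run_tick`, `_handle_fsm_result`, and `_drain_cmd_queue` (the real loop is async):

```python
# Sketch: drain runs on every iteration regardless of tick result.
def loop_once(tick, handle, drain) -> None:
    res = tick()
    if res is not None:      # previously: `continue` here also skipped the drain
        handle(res)
    drain()                  # unconditional, so pause-state never starves commands
```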

Tests: test_drain_works_when_canary_paused,
test_drain_works_when_out_of_window,
test_drain_isolates_dispatch_exceptions (exception isolation +
audit/warn wiring).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 11:52:28 +03:00
153196f762 chore(git): track logs dir; ignore runtime state files
Add logs/.gitkeep to track directory structure. Extend .gitignore with
logs/fires, logs/pause.flag, logs/detections/, and configs/current.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 10:10:21 +03:00
Claude Agent
3b40aed939 fix(run): isolate command dispatch exceptions from detection loop
Any exception in _dispatch_command (status, ss, etc.) was leaking out of the
asyncio.QueueEmpty try/except, crashing _detection_loop and cancelling the
poller — making the bot permanently unresponsive for the rest of the session.

Separate the queue-empty check from the dispatch into two try blocks.
Dispatch errors now log to audit + print to terminal + send a Telegram warn.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 11:29:42 +00:00
Claude Agent
0f7dd5dc84 fix(deps+tests): move httpx to prod deps; stub Poller+Scheduler in sync test
httpx was in dev deps only, causing ImportError for users doing `pip install -e .`
since atm.commands imports httpx at module level. Moved to main dependencies.

Also stubs TelegramPoller and ScreenshotScheduler in the sync catchup test to
prevent flaky CI failures from attempted real network connections.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 11:00:40 +00:00
Claude Agent
63642e71dd chore(todos): mark integration test done
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:54:24 +00:00
Claude Agent
424437ceaf fix(audit)+test: deadlock fix + lifecycle test + pytest-asyncio
AuditLog deadlock: log() held self._lock and called _open() which called
close() which tried to acquire self._lock again — RLock not needed,
refactored to _close_locked() (called while already holding lock).
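The fix pattern can be reduced to a minimal sketch (class and method names are illustrative, not the AuditLog code, which appears in the diff further down):

```python
# Sketch: public close() takes the lock; private _close_locked() assumes it is
# held, so a method already inside the lock never re-acquires it (no deadlock).
import threading

class LockedResource:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._open_count = 0

    def close(self) -> None:
        with self._lock:
            self._close_locked()

    def _close_locked(self) -> None:  # caller must hold self._lock
        pass

    def reopen(self) -> None:
        with self._lock:
            self._close_locked()  # NOT self.close() — that would deadlock
            self._open_count += 1
```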

pyproject.toml: pytest-asyncio + httpx in dev deps.

test_main.py:
- lifecycle integration test (MUST-HAVE): IDLE→ARMED→PRIMED→auto-poll
  starts→FIRE→auto-poll stops, asserts scheduler event order
- asyncio import for async test marker

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:54:10 +00:00
Claude Agent
ca6e578175 feat(run): async refactor — run_live_async + 7-step shutdown
run_live() is now a thin asyncio.run() wrapper. run_live_async():
- Blocking pipeline (capture→canary→detect→_handle_tick→snapshot) in
  asyncio.to_thread() per decision 1 (_sync_detection_tick function)
- TelegramPoller + ScreenshotScheduler as background asyncio tasks
- asyncio.Queue[Command] for inter-task communication
- Auto-start scheduler on PRIMED, auto-stop on fire/cooled/phase_skip
- 7-step graceful shutdown sequence
- heartbeat_due uses time.monotonic() (prevents immediate-fire regression)
- Status command: FSM state, last detection, uptime, fire count, canary health
- "ss" command: one-shot capture+annotate+send via to_thread
- Price overlay in _save_annotated_frame (dot_pos_abs + canary_ok params)
- test_main.py: ScriptedDetector.step(ts, frame=None) for zero regression

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:37:17 +00:00
Claude Agent
4123b31a22 feat(commands,scheduler): TelegramPoller + ScreenshotScheduler
TelegramPoller: httpx async long-poll, startup drain, chat_id filter,
degrade after 3×401, Command dataclass with minute→second conversion.

ScreenshotScheduler: asyncio task, capture+annotate in to_thread (decisions 9+13),
silent=True on periodic screenshots, explicit constructor params.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:18:08 +00:00
Claude Agent
c1b89ad6a9 feat(config,detector): TelegramCfg polling fields + Detector.step optional frame
TelegramCfg gains allowed_chat_ids (default: [chat_id]), poll_timeout_s=30,
auto_poll_interval_s=180. _from_dict reads from TOML; absent key defaults to
primary chat_id so existing configs need no changes.

Detector.step(ts, frame=None): when frame is provided the capture() call is
skipped — async loop pre-captures once, shares frame between canary+detection.
DetectionResult.dot_pos_abs carries absolute (x,y) for price overlay.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:17:17 +00:00
Claude Agent
fd04fcd5e6 fix(audit): threading.Lock on AuditLog.log + close (P1 bug)
detection thread and async heartbeat call log() concurrently.
Without a lock, two threads can both see today != _current_date
and double-open the file, corrupting the handle.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:16:28 +00:00
Claude Agent
c6714e8d5e feat(notifier): Alert.silent + TelegramNotifier disable_notification
Silent screenshots for periodic auto-poll — Telegram param
disable_notification=True suppresses phone notification sound.
Discord ignores the field (no equivalent).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 10:16:17 +00:00
Claude Agent
238243b1ce chore: add gstack skill routing rules to CLAUDE.md
2026-04-17 08:32:50 +00:00
25 changed files with 2655 additions and 121 deletions

.gitignore

@@ -46,14 +46,18 @@ ENV/
 # ATM runtime artefacts
 logs/*.jsonl
 logs/dead_letter.jsonl
+logs/detections/
+logs/fires
+logs/pause.flag
 samples/*.png
 samples/*.jpg
 samples/labels.json
 trades.jsonl
-# configs: keep template + current marker, not generated calibration
+# configs: keep template only; ignore generated calibration and runtime state
 configs/*.toml
 !configs/example.toml
+configs/current.txt
 # Claude scheduler state
 .claude/

CLAUDE.md

@@ -0,0 +1,35 @@
# ATM — Automated Trading Monitor
Personal Faza-1 tool for the M2D strategy. Python 3.11+.
## Quick Reference
```bash
pip install -e ".[windows]" # Windows: live capture
pip install -e . # Linux/macOS: dev/dryrun only
atm calibrate # Tk wizard
atm debug --delay 5 # one-shot capture + detect
atm run --start-at 16:30 --stop-at 23:00 # live session
atm dryrun samples # corpus gate
pytest # run tests
```
## Skill routing
When the user's request matches an available skill, ALWAYS invoke it using the Skill
tool as your FIRST action. Do NOT answer directly, do NOT use other tools first.
The skill has specialized workflows that produce better results than ad-hoc answers.
Key routing rules:
- Product ideas, "is this worth building", brainstorming → invoke office-hours
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
- Ship, deploy, push, create PR → invoke ship
- QA, test the site, find bugs → invoke qa
- Code review, check my diff → invoke review
- Update docs after shipping → invoke document-release
- Weekly retro → invoke retro
- Design system, brand → invoke design-consultation
- Visual audit, design polish → invoke design-review
- Architecture review → invoke plan-eng-review
- Save progress, checkpoint, resume → invoke checkpoint
- Code quality, health check → invoke health


@@ -49,9 +49,17 @@ Read-only web view of today's audit JSONL + recent triggers. Useful for review a
 ---
+## P2-yaxis-recalib-detect — Y-axis recalibration detection
+Price overlay (from Telegram commands feature) uses `y_axis` linear interpolation to show current price on screenshots. When the user rescales the chart y-axis (common after overnight price gaps), the calibration becomes stale and prices shown are incorrect. Canary check detects layout drift but NOT y-axis range changes.
+- Possible approaches: OCR on y-axis labels (fragile), track price range consistency across sessions, or simple "calibration age" warning after N hours.
+- Start after price overlay is live and the false-price frequency is known.
+- Depends on: Telegram commands + price overlay feature being shipped.
 ## Quality debt
-- [ ] **Integration test for run_live loop**: currently mocked at module level. Add a short-duration in-memory loop test that threads real detector/state_machine/audit together (no network).
+- [x] **Integration test for run_live loop**: lifecycle async test added in `tests/test_main.py` (IDLE→ARMED→PRIMED auto-poll→FIRE auto-stop).
 - [ ] **Coverage report**: run `pytest --cov=atm --cov-report=term-missing`, aim for ≥ 85% per module.
 - [ ] **Typing strictness**: run `pyright src/` with strict mode, fix reported issues.
 - [ ] **Perf baseline**: profile one detection cycle on a representative frame; ensure < 100ms so 5s loop has ample headroom.


@@ -81,6 +81,24 @@ low_conf_run = 3
 phaseb_timeout_s = 600
 dead_letter_path = "logs/dead_letter.jsonl"
+
+# Alert-behavior toggles (not screenshot-attachment; see attach_screenshots below).
+# fire_on_phase_skip: emit a backstop "PHASE SKIP" alert when the FSM observes
+# ARMED → light_green/light_red directly (skipping the dark prime). Default on
+# because missing a fire is worse than a false-positive phase-skip alert.
+[options.alerts]
+fire_on_phase_skip = true
+
+# Operating hours — detection only runs on allowed weekdays + HH:MM window.
+# Timezone is the source of truth (NYSE local); the runtime converts tick
+# timestamps to this zone so DST rollovers stay aligned with the exchange.
+# Override from CLI with --tz / --weekdays / --oh-start / --oh-stop.
+[options.operating_hours]
+enabled = false
+timezone = "America/New_York"
+weekdays = ["MON", "TUE", "WED", "THU", "FRI"]
+start_hhmm = "09:30"
+stop_hhmm = "16:00"
+
 # Per-kind screenshot-attach toggles. All default to true on upgrade.
 # Accepts either a bare bool (legacy: attach_screenshots = true) or this table.
 [options.attach_screenshots]

logs/.gitkeep


@@ -13,6 +13,7 @@ dependencies = [
     "pillow>=10.0",
     "requests>=2.31",
     "rich>=13.0",
+    "httpx>=0.27",
 ]
 
 [project.optional-dependencies]
@@ -24,6 +25,7 @@ windows = [
 dev = [
     "pytest>=8.0",
     "pytest-cov>=5.0",
+    "pytest-asyncio>=0.23",
 ]
 
 [project.scripts]


@@ -0,0 +1,33 @@
# calibration_labels.json — schema
Used by `atm validate-calibration` to check that the current color calibration
classifies known-good screenshots correctly before a live session.
## Schema
A JSON array of entries. Each entry:
| Field | Type | Required | Description |
|------------|---------|----------|----------------------------------------------------------------|
| `path` | string | yes | Path to a PNG frame (relative to CWD or absolute). |
| `expected` | string | yes | Expected color name: one of `turquoise`, `yellow`, `dark_green`, `dark_red`, `light_green`, `light_red`, `gray`. |
| `note` | string | no | Freeform annotation; shown in SUGGESTIONS output. |
## Usage
```bash
atm validate-calibration samples/calibration_labels.json
```
Exit codes:
- `0` — every sample PASS
- `1` — one or more FAIL
- `2` — label file missing or malformed JSON
## Adding new samples
1. Find a screenshot in `logs/fires/` whose dot color you can verify by eye.
2. Append an entry with `path`, `expected`, and an optional `note`.
3. Re-run validation. If it FAILs, the SUGGESTIONS section will tell you the
RGB distance between the observed pixel and the expected color's center —
use that as input for `atm calibrate`.


@@ -0,0 +1,17 @@
[
  {
    "path": "logs/fires/20260417_201500_arm_sell.png",
    "expected": "yellow",
    "note": "first arm of SELL cycle 2026-04-17"
  },
  {
    "path": "logs/fires/20260417_205302_ss.png",
    "expected": "dark_red",
    "note": "user confirmed via screenshot (missed live alert)"
  },
  {
    "path": "logs/fires/20260417_210441_ss.png",
    "expected": "light_red",
    "note": "fire phase (missed live alert)"
  }
]


@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import json
+import threading
 from datetime import datetime, date
 from pathlib import Path
 from typing import Callable, IO
@@ -16,21 +17,25 @@ class AuditLog:
         self._clock: Callable[[], datetime] = clock or datetime.now
         self._current_date: date | None = None
         self._fh: IO[str] | None = None
+        self._lock = threading.Lock()
 
     def log(self, event: dict) -> None:
         now = self._clock()
         today = now.date()
-        if today != self._current_date:
-            self._open(today)
         if "ts" not in event:
             event = {**event, "ts": now.isoformat()}
-        assert self._fh is not None
-        self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
+        with self._lock:
+            if today != self._current_date:
+                self._open(today)
+            assert self._fh is not None
+            self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
 
     def close(self) -> None:
+        with self._lock:
+            self._close_locked()
+
+    def _close_locked(self) -> None:
+        """Close file handle; must be called while holding self._lock."""
         if self._fh is not None:
             try:
                 self._fh.close()
@@ -47,7 +52,7 @@
         return self._base_dir / f"{self._current_date}.jsonl"
 
     def _open(self, today: date) -> None:
-        self.close()
+        self._close_locked()  # already holding self._lock
         self._base_dir.mkdir(parents=True, exist_ok=True)
         path = self._base_dir / f"{today}.jsonl"
         self._fh = open(path, "a", buffering=1, encoding="utf-8")


@@ -1,14 +1,18 @@
 """Layout drift detector via perceptual hash comparison."""
 
 from __future__ import annotations
 
+import logging
 from dataclasses import dataclass
 from pathlib import Path
+from typing import Callable
 
 import numpy as np
 
 from .config import Config
 from .vision import crop_roi, hamming_hex, phash
 
+logger = logging.getLogger(__name__)
+
 
 @dataclass
 class CanaryResult:
@@ -28,10 +32,15 @@ class Canary:
         self,
         cfg: Config,
         pause_flag_path: Path | None = None,
+        on_pause_callback: Callable[[int], None] | None = None,
     ) -> None:
         self._cfg = cfg
         self._pause_flag_path = pause_flag_path
         self._paused = False
+        # Single-shot callback invoked exactly once per not_paused→paused transition.
+        # Wrapped in try/except at call site so a faulty notifier never breaks
+        # the detection cycle.
+        self._on_pause = on_pause_callback
 
     def check(self, frame_bgr: np.ndarray) -> CanaryResult:
         roi_img = crop_roi(frame_bgr, self._cfg.canary.roi)
@@ -43,6 +52,12 @@
             self._paused = True
             if self._pause_flag_path is not None:
                 self._pause_flag_path.write_text("paused", encoding="utf-8")
+            if self._on_pause is not None:
+                try:
+                    self._on_pause(distance)
+                except Exception as exc:
+                    # Never let a notifier hiccup abort the detection cycle.
+                    logger.warning("canary on_pause_callback raised: %s", exc)
 
         return CanaryResult(distance=distance, drifted=drifted, paused=self._paused)

src/atm/commands.py

@@ -0,0 +1,170 @@
"""Telegram command poller + Command dataclass.

Uses httpx (async) for long-polling getUpdates. The sync TelegramNotifier
continues to use requests — this module is the only httpx consumer.
"""
from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal

import httpx

if TYPE_CHECKING:
    from .config import TelegramCfg

logger = logging.getLogger(__name__)

CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume"]

_BASE = "https://api.telegram.org/bot{token}/{method}"


@dataclass
class Command:
    action: CommandAction
    value: int | None = None  # seconds; only for set_interval


class TelegramPoller:
    """Long-poll Telegram getUpdates, emit Commands into asyncio.Queue.

    Security: rejects messages from chat_ids not in cfg.allowed_chat_ids.
    Degrades (stops polling) after 3 consecutive 401 responses and warns
    via Discord (caller responsibility — poller only logs + sets degraded flag).
    """

    def __init__(
        self,
        cfg: TelegramCfg,
        cmd_queue: asyncio.Queue[Command],
        audit,  # _AuditLike
    ) -> None:
        self._cfg = cfg
        self._cmd_queue = cmd_queue
        self._audit = audit
        self._offset = 0
        self._consecutive_401 = 0
        self._degraded = False
        # fallback: if allowed_chat_ids is empty, accept only the primary chat
        self._allowed = set(cfg.allowed_chat_ids) or {cfg.chat_id}

    @property
    def degraded(self) -> bool:
        return self._degraded

    async def run(self) -> None:
        async with httpx.AsyncClient() as client:
            await self._drain(client)
            while True:
                if self._degraded:
                    await asyncio.sleep(5)
                    continue
                try:
                    await self._poll_once(client)
                except asyncio.CancelledError:
                    raise
                except (httpx.HTTPError, httpx.TimeoutException) as exc:
                    self._audit.log({"event": "poller_error", "error": str(exc)})
                    await asyncio.sleep(5)
                except Exception as exc:  # json, unexpected
                    self._audit.log({"event": "poller_error", "error": str(exc)})
                    await asyncio.sleep(5)

    async def _drain(self, client: httpx.AsyncClient) -> None:
        """Discard all pending updates at startup so stale commands don't replay."""
        try:
            resp = await client.get(
                _BASE.format(token=self._cfg.bot_token, method="getUpdates"),
                params={"timeout": 0, "offset": self._offset},
                timeout=10,
            )
            body = resp.json()
            if body.get("ok") and body.get("result"):
                self._offset = body["result"][-1]["update_id"] + 1
        except Exception as exc:
            logger.warning("TelegramPoller startup drain failed: %s", exc)

    async def _poll_once(self, client: httpx.AsyncClient) -> None:
        resp = await client.get(
            _BASE.format(token=self._cfg.bot_token, method="getUpdates"),
            params={"timeout": self._cfg.poll_timeout_s, "offset": self._offset},
            timeout=self._cfg.poll_timeout_s + 5,
        )
        if resp.status_code == 401:
            self._consecutive_401 += 1
            if self._consecutive_401 >= 3:
                self._degraded = True
                self._audit.log({"event": "poller_degraded", "reason": "3_consecutive_401"})
            return
        self._consecutive_401 = 0
        body = resp.json()
        if not body.get("ok"):
            return
        for update in body.get("result", []):
            self._offset = update["update_id"] + 1
            await self._process_update(update)

    async def _process_update(self, update: dict) -> None:
        if "callback_query" in update:
            # Inline button pressed — may be expired; reply with fallback
            cbq = update["callback_query"]
            chat_id = str(cbq.get("from", {}).get("id", ""))
            if chat_id not in self._allowed:
                logger.info("Rejected callback_query from chat_id=%s", chat_id)
                return
            # Caller handles answerCallbackQuery; just note in audit
            self._audit.log({"event": "command_received", "action": "callback_query", "chat_id": chat_id})
            return
        msg = update.get("message") or update.get("edited_message")
        if not msg:
            return
        chat_id = str(msg.get("chat", {}).get("id", ""))
        if chat_id not in self._allowed:
            logger.info("Rejected message from chat_id=%s", chat_id)
            return
        text = (msg.get("text") or "").strip().lower()
        cmd = self._parse_command(text)
        if cmd is None:
            return
        self._audit.log({
            "event": "command_received",
            "action": cmd.action,
            "value": cmd.value,
            "chat_id": chat_id,
        })
        await self._cmd_queue.put(cmd)

    def _parse_command(self, text: str) -> Command | None:
        t = text.lstrip("/").strip()
        if not t:
            return None
        if t == "stop":
            return Command(action="stop")
        if t == "status":
            return Command(action="status")
        if t in ("ss", "screenshot"):
            return Command(action="ss")
        if t == "pause":
            return Command(action="pause")
        if t == "resume":
            return Command(action="resume")
        if t == "resume force":
            # value=1 signals force: also lift canary drift-pause, not just user pause.
            return Command(action="resume", value=1)
        # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
        parts = t.split()
        if len(parts) == 1 and parts[0].isdigit():
            return Command(action="set_interval", value=int(parts[0]) * 60)
        if len(parts) == 2 and parts[0] in ("interval", "set_interval") and parts[1].isdigit():
            return Command(action="set_interval", value=int(parts[1]) * 60)
        return None


@@ -5,6 +5,9 @@ import tomllib
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Literal from typing import Literal
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
_VALID_WEEKDAYS: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
DotColor = Literal[ DotColor = Literal[
"turquoise", "yellow", "turquoise", "yellow",
@@ -78,6 +81,9 @@ class DiscordCfg:
class TelegramCfg: class TelegramCfg:
bot_token: str bot_token: str
chat_id: str chat_id: str
allowed_chat_ids: tuple[str, ...] = ()
poll_timeout_s: int = 30
auto_poll_interval_s: int = 180
def __post_init__(self) -> None: def __post_init__(self) -> None:
if not self.bot_token or not self.chat_id: if not self.bot_token or not self.chat_id:
@@ -94,6 +100,43 @@ class AlertsCfg:
trigger: bool = True trigger: bool = True
@dataclass
class OperatingHoursCfg:
"""Session window: only run detection on allowed weekdays within HH:MM range.
Timezone is the source of truth for the exchange (default America/New_York
for NYSE). Start/stop are compared against the clock in that timezone.
Weekday check uses datetime.weekday() + a fixed MON..SUN list to stay
locale-independent (strftime('%a') returns localized names).
The ZoneInfo is cached at config load time so the detection loop doesn't
pay per-tick lookup cost.
NOTE: this dataclass is mutable (non-frozen) so Config._from_dict can stash
the resolved ZoneInfo onto `_tz_cache` after validation. Treat fields as
read-only at runtime.
"""
enabled: bool = False
timezone: str = "America/New_York"
weekdays: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI")
start_hhmm: str = "09:30"
stop_hhmm: str = "16:00"
# Populated by Config._from_dict; None for disabled or failed-load cases.
_tz_cache: ZoneInfo | None = None
@dataclass(frozen=True)
class AlertBehaviorCfg:
"""Alert behavior knobs (not screenshot toggles).
`fire_on_phase_skip`: backstop alert when FSM observes ARMED→light_{green,red}
directly (skipping the dark prime phase — often means dark color was
mis-classified as gray). Default True: missing a fire is worse than a noisy
phase-skip alert. Disable via `[options.alerts] fire_on_phase_skip = false`.
"""
fire_on_phase_skip: bool = True
@dataclass(frozen=True) @dataclass(frozen=True)
class Config: class Config:
window_title: str window_title: str
@@ -114,6 +157,8 @@ class Config:
phaseb_timeout_s: int = 600 phaseb_timeout_s: int = 600
dead_letter_path: str = "logs/dead_letter.jsonl" dead_letter_path: str = "logs/dead_letter.jsonl"
attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg) attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg)
alerts: AlertBehaviorCfg = field(default_factory=AlertBehaviorCfg)
operating_hours: OperatingHoursCfg = field(default_factory=OperatingHoursCfg)
config_version: str = "unknown" config_version: str = "unknown"
def __post_init__(self) -> None: def __post_init__(self) -> None:
@@ -156,9 +201,14 @@ class Config:
drift_threshold=int(data["canary"].get("drift_threshold", 8)), drift_threshold=int(data["canary"].get("drift_threshold", 8)),
) )
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"]) discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
tg = data["telegram"]
_allowed = [str(c) for c in tg.get("allowed_chat_ids", [])] or [str(tg["chat_id"])]
telegram = TelegramCfg(
bot_token=tg["bot_token"],
chat_id=str(tg["chat_id"]),
allowed_chat_ids=tuple(_allowed),
poll_timeout_s=int(tg.get("poll_timeout_s", 30)),
auto_poll_interval_s=int(tg.get("auto_poll_interval_s", 180)),
)
opts = data.get("options", {})
region = None
@@ -176,6 +226,36 @@ class Config:
)
else:
attach = AlertsCfg()
alerts_dict = opts.get("alerts", {}) or {}
alert_behavior = AlertBehaviorCfg(
fire_on_phase_skip=bool(alerts_dict.get("fire_on_phase_skip", True)),
)
oh_dict = opts.get("operating_hours", {}) or {}
oh_weekdays = tuple(
str(w).upper() for w in oh_dict.get("weekdays", ("MON", "TUE", "WED", "THU", "FRI"))
)
for wd in oh_weekdays:
if wd not in _VALID_WEEKDAYS:
raise ValueError(
f"operating_hours.weekdays contains invalid day {wd!r}; "
f"expected any of {_VALID_WEEKDAYS}"
)
oh = OperatingHoursCfg(
enabled=bool(oh_dict.get("enabled", False)),
timezone=str(oh_dict.get("timezone", "America/New_York")),
weekdays=oh_weekdays,
start_hhmm=str(oh_dict.get("start_hhmm", "09:30")),
stop_hhmm=str(oh_dict.get("stop_hhmm", "16:00")),
)
if oh.enabled:
try:
oh._tz_cache = ZoneInfo(oh.timezone)
except ZoneInfoNotFoundError as exc:
raise ValueError(
f"operating_hours.timezone {oh.timezone!r} invalid: {exc}"
) from exc
return cls(
window_title=data["window_title"],
dot_roi=roi,
@@ -195,5 +275,7 @@ class Config:
phaseb_timeout_s=int(opts.get("phaseb_timeout_s", 600)),
dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"),
attach_screenshots=attach,
alerts=alert_behavior,
operating_hours=oh,
config_version=version,
)

View File

@@ -28,6 +28,7 @@ class DetectionResult:
match: ColorMatch | None # None if no dot
accepted: bool # post-debounce; True only when match repeats debounce_depth times
color: str | None # accepted color name (UNKNOWN excluded)
dot_pos_abs: tuple[int, int] | None = None # absolute (x, y) in frame; set when dot_found
class Detector:
@@ -60,8 +61,14 @@ class Detector:
self._debounce: deque[str | None] = deque(maxlen=cfg.debounce_depth)
self._rolling: deque[DetectionResult] = deque(maxlen=20)
def step(self, ts: float, frame=None) -> DetectionResult:
"""Run one detection tick.
frame: pre-captured BGR ndarray (from asyncio.to_thread capture). When
None (default), calls self._capture() — preserving the sync-loop behaviour.
"""
if frame is None:
frame = self._capture()
if frame is None:
self._debounce.append(None)
@@ -117,6 +124,7 @@ class Detector:
match=match,
accepted=accepted,
color=color,
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y),
)
self._rolling.append(r)
return r

View File

@@ -2,12 +2,15 @@
from __future__ import annotations
import argparse
import asyncio
import contextlib
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
from atm.config import Config # stdlib-only (tomllib); safe at module level
from atm.notifier import Alert
@@ -89,6 +92,23 @@ def main(argv=None) -> None:
help="Stop at local HH:MM (overrides --duration). If the time is in "
"the past when the loop starts, rolls over to tomorrow.",
)
p_run.add_argument(
"--tz", metavar="ZONE", default=None,
help="Override operating_hours.timezone (e.g. America/New_York).",
)
p_run.add_argument(
"--weekdays", metavar="DAYS", default=None,
help="Override operating_hours.weekdays. Accepts comma list "
"(MON,TUE) or range (MON-FRI).",
)
p_run.add_argument(
"--oh-start", metavar="HH:MM", default=None,
help="Override operating_hours.start_hhmm (exchange-local).",
)
p_run.add_argument(
"--oh-stop", metavar="HH:MM", default=None,
help="Override operating_hours.stop_hhmm (exchange-local).",
)
# journal
p_journal = sub.add_parser("journal", help="Add a trade journal entry interactively")
@@ -115,6 +135,16 @@ def main(argv=None) -> None:
metavar="PATH", help="Journal JSONL file (default: trades.jsonl)",
)
# validate-calibration
p_valid = sub.add_parser(
"validate-calibration",
help="Offline: run Detector on labeled frames and report PASS/FAIL",
)
p_valid.add_argument(
"label_file", type=Path, metavar="LABEL_FILE",
help="JSON array with [{path, expected, note?}, ...] entries",
)
args = parser.parse_args(argv)
_dispatch = {
@@ -125,6 +155,7 @@ def main(argv=None) -> None:
"debug": _cmd_debug,
"journal": _cmd_journal,
"report": _cmd_report,
"validate-calibration": _cmd_validate_calibration,
}
_dispatch[args.command](args)
@@ -168,6 +199,7 @@ def _cmd_dryrun(args) -> None:
def _cmd_run(args) -> None:
cfg = Config.load_current(Path("configs"))
cfg = _apply_operating_hours_cli_overrides(cfg, args)
capture_stub = args.capture_stub or bool(os.environ.get("ATM_STUB_CAPTURE"))
# --start-at HH:MM: sleep until the next occurrence of that local wall-clock time
@@ -227,6 +259,66 @@ def _cmd_run(args) -> None:
run_live(cfg, duration_s=duration_s, capture_stub=capture_stub)
_WEEKDAY_ORDER = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
def _parse_weekdays_arg(raw: str) -> tuple[str, ...]:
"""Accept 'MON,TUE,WED' or 'MON-FRI'. Case-insensitive."""
txt = raw.strip().upper()
if "-" in txt and "," not in txt:
a, b = (p.strip() for p in txt.split("-", 1))
if a not in _WEEKDAY_ORDER or b not in _WEEKDAY_ORDER:
raise ValueError(f"unknown weekday(s) in range {raw!r}")
i, j = _WEEKDAY_ORDER.index(a), _WEEKDAY_ORDER.index(b)
if i > j:
raise ValueError(f"weekday range reversed: {raw!r}")
return tuple(_WEEKDAY_ORDER[i : j + 1])
days = tuple(d.strip() for d in txt.split(",") if d.strip())
for d in days:
if d not in _WEEKDAY_ORDER:
raise ValueError(f"unknown weekday {d!r} (valid: {_WEEKDAY_ORDER})")
return days
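The range/list grammar `_parse_weekdays_arg` accepts can be sketched standalone (reimplemented here so it runs without the package; the unknown-day checks are left to `tuple.index`, which raises `ValueError` on its own):

```python
WEEKDAY_ORDER = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")

def parse_weekdays(raw: str) -> tuple[str, ...]:
    # Mirrors _parse_weekdays_arg: 'MON-FRI' range or 'MON,TUE' list, case-insensitive.
    txt = raw.strip().upper()
    if "-" in txt and "," not in txt:
        a, b = (p.strip() for p in txt.split("-", 1))
        i, j = WEEKDAY_ORDER.index(a), WEEKDAY_ORDER.index(b)
        if i > j:
            raise ValueError(f"weekday range reversed: {raw!r}")
        return tuple(WEEKDAY_ORDER[i : j + 1])
    return tuple(d.strip() for d in txt.split(",") if d.strip())

assert parse_weekdays("mon-fri") == ("MON", "TUE", "WED", "THU", "FRI")
assert parse_weekdays("MON,WED") == ("MON", "WED")
```

Note the deliberate `"-" in txt and "," not in txt` guard: a mixed form like `MON-WED,FRI` falls through to the comma path and fails on `MON-WED`, rather than being silently mis-parsed as a range.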
def _apply_operating_hours_cli_overrides(cfg, args):
"""Return cfg (possibly new) with operating_hours overridden by CLI flags.
Config is a frozen dataclass, but operating_hours is non-frozen by design
so we can tweak it in-place and recompute the tz cache. CLI flags implicitly
enable operating_hours even if the TOML had it disabled.
"""
import dataclasses as _dc
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
oh = cfg.operating_hours
any_override = any(
getattr(args, k, None)
for k in ("tz", "weekdays", "oh_start", "oh_stop")
)
if not any_override:
return cfg
new_tz = args.tz if args.tz else oh.timezone
try:
tz_cache = ZoneInfo(new_tz)
except ZoneInfoNotFoundError as exc:
sys.exit(f"--tz {new_tz!r} invalid: {exc}")
new_weekdays = _parse_weekdays_arg(args.weekdays) if args.weekdays else oh.weekdays
new_start = args.oh_start if args.oh_start else oh.start_hhmm
new_stop = args.oh_stop if args.oh_stop else oh.stop_hhmm
oh.enabled = True
oh.timezone = new_tz
oh.weekdays = new_weekdays
oh.start_hhmm = new_start
oh.stop_hhmm = new_stop
oh._tz_cache = tz_cache
# Config is frozen but operating_hours is a mutable field object —
# mutating it in place is sufficient; no dataclasses.replace needed.
_ = _dc # keep import for future use
return cfg
def _cmd_journal(args) -> None:
try:
from atm.journal import Journal, prompt_entry
@@ -337,6 +429,37 @@ def _cmd_report(args) -> None:
)
def _cmd_validate_calibration(args) -> None:
"""Run offline calibration validation; exit 0 on 100% PASS, 1 otherwise."""
try:
from atm.validate import validate_calibration, ValidationError
except ImportError as exc:
sys.exit(f"validate module not available: {exc}")
label_file = Path(args.label_file)
try:
cfg = Config.load_current(Path("configs"))
except FileNotFoundError as exc:
sys.exit(f"config not found: {exc}")
try:
config_name = ""
cur_ptr = Path("configs") / "current.txt"
if cur_ptr.exists():
config_name = cur_ptr.read_text(encoding="utf-8").strip()
except Exception:
config_name = ""
try:
report = validate_calibration(label_file, cfg, config_name=config_name)
except ValidationError as exc:
print(f"error: {exc}", file=sys.stderr)
sys.exit(2)
print(report.render())
sys.exit(0 if report.all_pass else 1)
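The label-file contract the handler above relies on (a JSON array of `{path, expected, note?}` entries, per the commit message) can be checked with a few lines of stdlib code. This is an illustrative sketch, not the real validation in `atm.validate`:

```python
import json

def check_label_file(text: str) -> list[dict]:
    # Minimal schema check: top level must be an array, and every entry
    # needs "path" and "expected"; "note" stays optional.
    data = json.loads(text)
    if not isinstance(data, list):
        raise ValueError("label file must be a JSON array")
    for i, entry in enumerate(data):
        missing = {"path", "expected"} - set(entry)
        if missing:
            raise ValueError(f"entry {i} missing keys: {sorted(missing)}")
    return data

labels = check_label_file('[{"path": "samples/a.png", "expected": "dark_green"}]')
assert labels[0]["expected"] == "dark_green"
```

A malformed file maps to exit code 2 in `_cmd_validate_calibration`, keeping 0/1 reserved for the PASS/FAIL verdict.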
# ---------------------------------------------------------------------------
# Live loop
# ---------------------------------------------------------------------------
@@ -348,6 +471,8 @@ def _save_annotated_frame(
label: str,
now: float,
audit: _AuditLike | None = None,
dot_pos_abs: "tuple[int, int] | None" = None,
canary_ok: bool = True,
) -> "Path | None":
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
@@ -355,6 +480,10 @@ def _save_annotated_frame(
audit (when provided) so disk-full / permission issues don't become silent
regressions. Never raises — snapshot is a best-effort enhancement, the
text alert must still go out.
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
since calibration may be stale.
"""
try:
import cv2 # type: ignore[import-untyped]
@@ -371,6 +500,22 @@ def _save_annotated_frame(
annotated = frame.copy()
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
try:
_, dot_y = dot_pos_abs
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass # price overlay is best-effort; never break the screenshot
cv2.imwrite(str(fpath), annotated)
return fpath
except Exception as exc:
@@ -390,6 +535,7 @@ def _handle_tick(
audit: _AuditLike,
first_accepted: bool,
snapshot: Snapshot | None = None,
cfg: Any = None,
) -> Transition | None:
"""Feed FSM for a single accepted color and dispatch arm/prime/late_start
alerts. Returns the final Transition, or None when the color triggered a
@@ -493,10 +639,464 @@ def _handle_tick(
image_path=snap(prime_kind, prime_label),
direction=direction,
))
# PHASE_SKIP fire backstop: ARMED→light_{green,red} directly (dark was missed).
# Emits a fire-equivalent alert when cfg.alerts.fire_on_phase_skip (default True).
# Uses public FSM lockout API (is_locked/record_fire) to reuse the standard
# 240s dedupe window so bouncing detectors do not spam the user.
elif tr.reason == "phase_skip" and color in ("light_green", "light_red"):
flag_on = True
if cfg is not None:
alerts_cfg = getattr(cfg, "alerts", None)
if alerts_cfg is not None:
flag_on = bool(getattr(alerts_cfg, "fire_on_phase_skip", True))
if flag_on:
direction = "BUY" if color == "light_green" else "SELL"
if not fsm.is_locked(direction, now):
fsm.record_fire(direction, now)
dark_name = "dark_green" if direction == "BUY" else "dark_red"
notifier.send(Alert(
kind="phase_skip_fire",
title=f"PHASE SKIP {direction} — {dark_name} nu a fost detectat",
body=(
"Verifică chart-ul manual. Posibil necalibrare culoare "
f"(observat {color} direct după armare)."
),
image_path=snap("phase_skip", f"phase_skip_{direction.lower()}"),
direction=direction,
))
return tr
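The y-axis linear interpolation the price overlay applies in `_save_annotated_frame` reduces to two calibration points. A self-contained check, with the parameter names taken from the overlay code (`p1_y`, `p1_price`, `p2_y`, `p2_price`) and the calibration values invented:

```python
def y_to_price(dot_y: int, p1_y: int, p1_price: float, p2_y: int, p2_price: float) -> float:
    # Same arithmetic as the overlay: price = p1_price + (dot_y - p1_y) * slope.
    slope = (p2_price - p1_price) / (p2_y - p1_y)
    return p1_price + (dot_y - p1_y) * slope

# Screen y grows downward, so price falls as y grows: with calibration
# points (y=100 -> $50, y=300 -> $40), the midpoint y=200 maps to $45.
assert y_to_price(200, 100, 50.0, 300, 40.0) == 45.0
```

Note the slope is negative for a normal chart orientation, which is why the code computes it from the two calibration points rather than assuming a sign.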
@dataclass
class _TickSyncResult:
frame: Any = None
res: Any = None # DetectionResult | None
tr: Any = None # Transition | None
first_consumed: bool = False
late_start: bool = False
new_color: str | None = None # corpus sample color when changed
@dataclass
class RunContext:
"""Dependencies passed to module-scope detection-loop helpers.
Keeps `_run_tick`, `_handle_fsm_result`, `_drain_cmd_queue`, and
`_dispatch_command` at module scope so they are directly unit-testable
without reconstructing `run_live_async`.
"""
cfg: Any
capture: Callable
canary: Any
detector: Any
fsm: Any
notifier: _NotifierLike
audit: _AuditLike
detection_log: _AuditLike
scheduler: Any
samples_dir: Path
fires_dir: Path
cmd_queue: Any # asyncio.Queue[Command]
state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start
levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now)
lifecycle: Any = None # LifecycleState — window + user_paused tracking
@dataclass
class _LoopState:
"""Per-loop mutable state (previously closure nonlocals)."""
first_accepted: bool = True
last_saved_color: str | None = None
levels_extractor: Any = None
fire_count: int = 0
start: float = 0.0
@dataclass
class LifecycleState:
"""Tracks user-pause / out-of-window state across detection ticks.
last_window_state: None at startup so _maybe_log_transition can seed it
without emitting a spurious market_open alert on the first in-window tick.
"""
user_paused: bool = False
last_window_state: str | None = None # "open" / "closed" / None (uninitialized)
# Locale-independent weekday names; index matches datetime.weekday() (MON=0).
_WEEKDAY_NAMES: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None:
"""Return a reason string if detection should be skipped, else None.
Order: user_paused > canary drift > operating-hours window. Uses the
ZoneInfo cached on cfg.operating_hours._tz_cache (populated at config load)
to avoid per-tick tz lookup cost.
"""
if state.user_paused:
return "user_paused"
if getattr(canary, "is_paused", False):
return "drift_paused"
oh = getattr(cfg, "operating_hours", None)
if oh is None or not oh.enabled:
return None
tz = getattr(oh, "_tz_cache", None)
if tz is None:
# Enabled but no tz resolved — skip the check rather than crash mid-loop.
return None
now_exchange = datetime.fromtimestamp(now_ts, tz=tz)
# weekday() = 0..6 (MON..SUN). Locale-free; strftime('%a') is not.
if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays:
return "out_of_window_weekend"
hhmm = now_exchange.strftime("%H:%M")
if hhmm < oh.start_hhmm or hhmm >= oh.stop_hhmm:
return "out_of_window_hours"
return None
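The operating-hours part of `_should_skip` can be exercised in isolation. A standalone sketch (reimplemented so it runs without the package; requires the system tz database for `ZoneInfo`):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

WEEKDAY_NAMES = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")

def out_of_window(now_ts: float, tz_name: str, weekdays, start_hhmm: str, stop_hhmm: str) -> bool:
    # Same trick as _should_skip: zero-padded "HH:MM" strings compare
    # lexicographically in chronological order, so no time parsing needed.
    now = datetime.fromtimestamp(now_ts, tz=ZoneInfo(tz_name))
    if WEEKDAY_NAMES[now.weekday()] not in weekdays:
        return True
    hhmm = now.strftime("%H:%M")
    return hhmm < start_hhmm or hhmm >= stop_hhmm

# Wednesday 2026-04-15 12:00 in New York is inside a 09:30-16:00 window.
ts = datetime(2026, 4, 15, 12, 0, tzinfo=ZoneInfo("America/New_York")).timestamp()
assert out_of_window(ts, "America/New_York", ("MON", "TUE", "WED", "THU", "FRI"), "09:30", "16:00") is False
```

The `hhmm >= stop_hhmm` comparison makes the stop time exclusive, so a 16:00 tick is already out of window, matching the half-open interval convention in the code above.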
def _maybe_log_transition(
reason: str | None,
state: LifecycleState,
now: float,
audit: _AuditLike,
notifier: _NotifierLike,
) -> None:
"""Log market_open / market_closed exactly once per transition.
Startup guard (R2): when last_window_state is None we just seed it; no
alert/audit event is emitted for the initial evaluation. This prevents a
spurious market_open alert when run_live_async starts in-window.
"""
if reason is None:
window_reason = "open"
elif reason.startswith("out_of_window"):
window_reason = "closed"
else:
# user_paused / drift_paused don't change market window state
return
if window_reason == state.last_window_state:
return
if state.last_window_state is None:
state.last_window_state = window_reason
return
event_name = "market_open" if window_reason == "open" else "market_closed"
audit.log({"ts": now, "event": event_name, "reason": reason})
body = (
"Piața închisă — monitorizare pauzată până la următoarea deschidere"
if event_name == "market_closed"
else "Piața deschisă — monitorizare reluată"
)
notifier.send(Alert(
kind="status",
title=event_name.replace("_", " ").title(),
body=body,
))
state.last_window_state = window_reason
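The seed-then-alert rule above can be condensed for illustration. `window_transitions` is a hypothetical standalone helper, not part of the diff, and it omits the pause reasons (which return early without touching window state):

```python
def window_transitions(reasons):
    # Condensed _maybe_log_transition (R2 rule): the first evaluation only
    # seeds last_window_state; only later open/closed flips emit events.
    last, events = None, []
    for reason in reasons:
        state = "closed" if (reason or "").startswith("out_of_window") else "open"
        if state == last:
            continue
        if last is None:
            last = state  # seed silently — no spurious market_open at startup
            continue
        events.append("market_open" if state == "open" else "market_closed")
        last = state
    return events

# Starting in-window emits nothing; only the real close/open flips log.
assert window_transitions([None, None, "out_of_window_hours", None]) == ["market_closed", "market_open"]
```

Starting out-of-window is likewise silent: the first "closed" evaluation just seeds the state.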
def _sync_detection_tick(
capture: Callable,
canary: Any,
cfg: Any,
detector: Any,
fsm: Any,
notifier: _NotifierLike,
audit: _AuditLike,
detection_log: _AuditLike,
fires_dir: Path,
first_accepted: bool,
last_saved_color: "str | None",
now: float,
samples_dir: Path,
) -> _TickSyncResult:
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread."""
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
return _TickSyncResult()
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return _TickSyncResult(frame=frame)
res = detector.step(now, frame)
detection_log.log({
"ts": now, "event": "frame",
"window_found": res.window_found,
"dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None,
"match_name": res.match.name if res.match is not None else None,
"distance": round(res.match.distance, 2) if res.match is not None else None,
"confidence": round(res.match.confidence, 3) if res.match is not None else None,
"accepted": res.accepted,
"color": res.color,
})
if not (res.accepted and res.color):
return _TickSyncResult(frame=frame, res=res)
is_first = first_accepted
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
return None
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
)
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot, cfg=cfg)
if tr is None:
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True)
new_color: str | None = None
if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
try:
import cv2 # type: ignore[import-untyped]
cv2.imwrite(str(sample_path), frame)
except Exception as exc:
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
new_color = res.color
if tr.trigger and not tr.locked:
fire_path: "Path | None" = None
if cfg.attach_screenshots.trigger:
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
)
notifier.send(Alert(
kind="trigger",
title=f"Semnal {tr.trigger}",
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path,
direction=tr.trigger,
))
return _TickSyncResult(
frame=frame, res=res, tr=tr,
first_consumed=is_first, new_color=new_color,
)
async def _run_tick(ctx: RunContext) -> _TickSyncResult:
"""Execute one `_sync_detection_tick` in a thread; returns result or empty.
Lifecycle gating (user pause / operating hours / drift) happens here, not
inside the sync tick, so the async loop can still drain commands and emit
market_open / market_closed transitions even when the heavy detection
work is skipped.
"""
now = time.time()
if ctx.lifecycle is not None:
skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary)
_maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier)
if skip is not None:
# No detection this tick. Empty result → _handle_fsm_result no-op.
return _TickSyncResult()
return await asyncio.to_thread(
_sync_detection_tick,
ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm,
ctx.notifier, ctx.audit, ctx.detection_log,
ctx.fires_dir, ctx.state.first_accepted, ctx.state.last_saved_color,
now, ctx.samples_dir,
)
async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
"""Scheduler start/stop + levels extraction. No-op if res is None/late_start."""
if result.first_consumed:
ctx.state.first_accepted = False
if result.new_color is not None:
ctx.state.last_saved_color = result.new_color
tr = result.tr
res = result.res
if result.late_start or res is None:
return
if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None):
if tr.reason == "prime" and not ctx.scheduler.is_running:
ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr is not None and tr.trigger and not tr.locked:
ctx.state.fire_count += 1
if ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time())
if ctx.state.levels_extractor is not None and result.frame is not None:
lr = ctx.state.levels_extractor.step(result.frame, time.time())
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
ctx.notifier.send(Alert(
kind="levels",
title="Niveluri",
body=(
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} "
f"TP2={lr.levels.tp2}"
),
))
ctx.state.levels_extractor = None
async def _dispatch_command(ctx: RunContext, cmd) -> None:
"""Process a single Command. Exceptions bubble — caller wraps in try/except."""
cfg = ctx.cfg
if cmd.action == "set_interval":
secs = cmd.value or cfg.telegram.auto_poll_interval_s
ctx.scheduler.start(secs)
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "set_interval", "interval_s": secs})
ctx.notifier.send(Alert(kind="status", title=f"Polling activ — interval {secs // 60} min", body=""))
elif cmd.action == "stop":
if ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "command_stop"})
ctx.notifier.send(Alert(kind="status", title="Polling oprit", body=""))
else:
ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
elif cmd.action == "status":
uptime_s = time.monotonic() - ctx.state.start
last_roll = ctx.detector.rolling[-1] if ctx.detector.rolling else None
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else ""
last_color = (
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else ""
) if last_roll else ""
sched_info = (
f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
) if ctx.scheduler.is_running else "oprit"
canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok"
# Active / pause reason + window state
active_info = "activ"
window_info = ""
if ctx.lifecycle is not None:
skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
if skip is not None:
active_info = f"pauzat:{skip}"
oh = getattr(ctx.cfg, "operating_hours", None)
if oh is not None and oh.enabled:
window_info = ctx.lifecycle.last_window_state or ""
else:
window_info = "always_on"
body = (
f"Stare: {ctx.fsm.state.value}\n"
f"Activ: {active_info} | Fereastră: {window_info}\n"
f"Ultima detecție: {last_color} (conf {last_conf})\n"
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n"
f"Poller: {sched_info} | Canary: {canary_info}"
)
ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body))
elif cmd.action == "ss":
now_ss = time.time()
frame_ss = await asyncio.to_thread(ctx.capture)
if frame_ss is None:
ctx.notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
))
return
path_ss = await asyncio.to_thread(
_save_annotated_frame, frame_ss, ctx.cfg, ctx.fires_dir, "ss", now_ss, ctx.audit,
)
ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
elif cmd.action == "pause":
# User manually stops monitoring. Canary drift state is untouched.
if ctx.lifecycle is not None:
ctx.lifecycle.user_paused = True
ctx.audit.log({"ts": time.time(), "event": "user_paused"})
ctx.notifier.send(Alert(
kind="status",
title="Monitorizare oprită manual",
body="Folosește /resume pentru a relua.",
))
elif cmd.action == "resume":
# R2: /resume clears only user_paused. Canary drift requires
# /resume force (value == 1) so the user acknowledges the risk.
was_drift = bool(getattr(ctx.canary, "is_paused", False))
was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False
force = cmd.value == 1
if ctx.lifecycle is not None:
ctx.lifecycle.user_paused = False
if force and was_drift:
ctx.canary.resume()
ctx.audit.log({
"ts": time.time(), "event": "user_resumed",
"was_drift": was_drift, "was_user": was_user, "force": force,
})
# Adaptive response
if was_drift and not force:
title = "Pauză user eliminată — dar Canary drift activ"
body = (
"Trimite /resume force pentru a anula drift-pause. "
"Recalibrează dacă driftul persistă."
)
elif force and was_drift:
title = "Drift-pause anulat manual (force)"
body = "Dacă driftul persistă, Canary va repauza."
else:
skip_now = None
if ctx.lifecycle is not None:
skip_now = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
if skip_now and skip_now.startswith("out_of_window"):
title = "Pauză eliminată — piața e închisă acum"
body = "Monitorizarea va porni la următoarea fereastră."
else:
title = "Monitorizare reluată"
body = ""
ctx.notifier.send(Alert(kind="status", title=title, body=body))
async def _drain_cmd_queue(ctx: RunContext) -> None:
"""Drain all pending commands, isolating each dispatch in try/except.
CRITICAL: this MUST run every loop iteration, unconditionally, even when
the detection tick returned nothing (canary paused, out-of-window, etc.).
Prior bug: the main loop `continue`'d past this drain when res=None,
causing commands to accumulate indefinitely while canary was drifted.
"""
while True:
try:
cmd = ctx.cmd_queue.get_nowait()
except asyncio.QueueEmpty:
return
try:
await _dispatch_command(ctx, cmd)
except Exception as exc:
ctx.audit.log({
"ts": time.time(), "event": "command_error",
"action": cmd.action, "error": str(exc),
})
print(f"ERR command_dispatch /{cmd.action}: {exc}", flush=True)
ctx.notifier.send(Alert(kind="warn", title=f"Eroare comandă /{cmd.action}", body=str(exc)))
def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
"""Sync entry point — delegates to asyncio event loop."""
asyncio.run(run_live_async(cfg, duration_s=duration_s, capture_stub=capture_stub))
async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> None:
"""Main live monitoring loop. Imports are lazy to keep --help fast."""
try:
from atm.detector import Detector
@@ -506,14 +1106,38 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
from atm.notifier.discord import DiscordNotifier
from atm.notifier.telegram import TelegramNotifier
from atm.audit import AuditLog
from atm.commands import TelegramPoller, Command
from atm.scheduler import ScreenshotScheduler
except ImportError as exc:
sys.exit(f"run-loop dependencies not available: {exc}")
capture = _build_capture(cfg, capture_stub=capture_stub)
detector = Detector(cfg, capture)
fsm = StateMachine(lockout_s=cfg.lockout_s)
audit = AuditLog(Path("logs"))
# Forward-declare notifier so the canary pause callback can close over it.
# The notifier is constructed a few lines below once backends exist.
_notifier_ref: dict = {}
def _on_canary_pause(distance: int) -> None:
audit.log({"ts": time.time(), "event": "canary_drift_paused", "distance": distance})
n = _notifier_ref.get("n")
if n is not None:
n.send(Alert(
kind="warn",
title=f"Canary drift={distance} — monitorizare pauzată",
body=(
"Fereastra/paleta s-a schimbat. Trimite /resume pentru a relua "
"sau recalibrează."
),
))
canary = Canary(
cfg,
pause_flag_path=Path("logs/pause.flag"),
on_pause_callback=_on_canary_pause,
)
detection_log = AuditLog(Path("logs/detections"))
backends = [
DiscordNotifier(cfg.discord.webhook_url),
@@ -521,7 +1145,6 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
]
def _on_drop(backend_name: str, dropped: Alert) -> None:
"""Audit on queue overflow — makes the silent failure visible."""
audit.log({
"ts": time.time(),
"event": "queue_overflow_drop",
@@ -531,8 +1154,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
})
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
_notifier_ref["n"] = notifier
# Initial frame + canary check
first_frame = capture()
if first_frame is None:
print("WARN: first capture returned None — window/region missing", flush=True)
@@ -542,9 +1166,9 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
if first_check.drifted:
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
canary.resume()
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
notifier.send(Alert(
kind="heartbeat",
title="ATM pornit",
@@ -556,106 +1180,55 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
    audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
    start = time.monotonic()
-    heartbeat_due = time.time() + cfg.heartbeat_min * 60
+    heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
-    levels_extractor = None
-    last_saved_color: str | None = None
-    first_accepted = True
    samples_dir = Path("samples")
    samples_dir.mkdir(exist_ok=True)
    fires_dir = Path("logs") / "fires"
    fires_dir.mkdir(parents=True, exist_ok=True)
-    import cv2  # type: ignore[import-untyped]
    try:
-        while duration_s is None or (time.monotonic() - start) < duration_s:
-            now = time.time()
-            frame = capture()
-            if frame is None:
-                audit.log({"ts": now, "event": "window_lost"})
-                time.sleep(cfg.loop_interval_s)
-                continue
-            # canary check
-            cr = canary.check(frame)
-            if canary.is_paused:
-                audit.log({"ts": now, "event": "paused", "drift": cr.distance})
-                time.sleep(cfg.loop_interval_s)
-                continue
-            # detection
-            res = detector.step(now)
-            detection_log.log({
-                "ts": now,
-                "event": "frame",
-                "window_found": res.window_found,
-                "dot_found": res.dot_found,
-                "rgb": list(res.rgb) if res.rgb is not None else None,
-                "match_name": res.match.name if res.match is not None else None,
-                "distance": round(res.match.distance, 2) if res.match is not None else None,
-                "confidence": round(res.match.confidence, 3) if res.match is not None else None,
-                "accepted": res.accepted,
-                "color": res.color,
-            })
-            if res.accepted and res.color:
-                is_first = first_accepted
-                first_accepted = False
-                # Per-iteration closure — binds current frame/now, gates on config.
-                def _snapshot(kind: str, label: str) -> "Path | None":
-                    if not getattr(cfg.attach_screenshots, kind, True):
-                        return None
-                    return _save_annotated_frame(
-                        frame, cfg, fires_dir, label, now, audit=audit,
-                    )
-                tr = _handle_tick(
-                    fsm, res.color, now, notifier, audit, is_first,
-                    snapshot=_snapshot,
-                )
-                if tr is None:
-                    # late start: FSM untouched, skip FIRE + corpus save
-                    time.sleep(cfg.loop_interval_s)
-                    continue
-                # corpus: save the full frame at each new distinct color, for later labeling
-                if res.color != last_saved_color:
-                    ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
-                    sample_path = samples_dir / f"{ts_str}_{res.color}.png"
-                    try:
-                        cv2.imwrite(str(sample_path), frame)
-                    except Exception as exc:
-                        audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
-                    last_saved_color = res.color
-                # FIRE: annotate the frame + save it, attach to the alert
-                if tr.trigger and not tr.locked:
-                    fire_path: "Path | None" = None
-                    if cfg.attach_screenshots.trigger:
-                        fire_path = _save_annotated_frame(
-                            frame, cfg, fires_dir, tr.trigger, now, audit=audit,
-                        )
-                    notifier.send(Alert(
-                        kind="trigger",
-                        title=f"Semnal {tr.trigger}",
-                        body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
-                        image_path=fire_path,
-                        direction=tr.trigger,
-                    ))
-                    levels_extractor = LevelsExtractor(cfg, tr.trigger, now)
-            # phase-B levels
-            if levels_extractor is not None:
-                lr = levels_extractor.step(frame, now)
-                if lr.status in ("complete", "timeout"):
-                    if lr.status == "complete" and lr.levels:
-                        notifier.send(Alert(
-                            kind="levels",
-                            title="Niveluri",
-                            body=(
-                                f"SL={lr.levels.sl} "
-                                f"TP1={lr.levels.tp1} "
-                                f"TP2={lr.levels.tp2}"
-                            ),
-                        ))
-                    levels_extractor = None
-            # heartbeat: include per-backend stats so silent failures show up
-            # every 30 min instead of waiting for shutdown.
-            if time.time() > heartbeat_due:
+        import cv2  # noqa: F401  # type: ignore[import-untyped]
+    except ImportError:
+        raise RuntimeError("cv2 is required for run_live")  # fail fast if cv2 is missing
+
+    cmd_queue: asyncio.Queue[Command] = asyncio.Queue()
+    loop_state = _LoopState(first_accepted=True, last_saved_color=None,
+                            levels_extractor=None, fire_count=0, start=start)
+
+    def _bound_save(frame: Any, label: str, now: float) -> "Path | None":
+        return _save_annotated_frame(frame, cfg, fires_dir, label, now, audit=audit)
+
+    scheduler = ScreenshotScheduler(
+        capture=capture,
+        save_fn=_bound_save,
+        notifier=notifier,
+        audit=audit,
+    )
+    poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
+
+    lifecycle = LifecycleState()
+    # Seed lifecycle.last_window_state with the current status so we don't emit
+    # a spurious market_open alert on the very first tick (R2).
+    _pre_skip = _should_skip(time.time(), lifecycle, cfg, canary)
+    _maybe_log_transition(_pre_skip, lifecycle, time.time(), audit, notifier)
+
+    ctx = RunContext(
+        cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm,
+        notifier=notifier, audit=audit, detection_log=detection_log,
+        scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir,
+        cmd_queue=cmd_queue, state=loop_state,
+        levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
+        lifecycle=lifecycle,
+    )
+
+    # ------------------------------------------------------------------
+    # Nested async coroutines — heartbeat captures notifier + heartbeat_due
+    # ------------------------------------------------------------------
+    async def _heartbeat_loop() -> None:
+        nonlocal heartbeat_due
+        while True:
+            await asyncio.sleep(60)
+            if time.monotonic() > heartbeat_due:
                try:
                    stats = notifier.stats()
                    audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
@@ -668,9 +1241,40 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
                    notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
                except Exception:
                    notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
-                heartbeat_due = time.time() + cfg.heartbeat_min * 60
+                heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
-            time.sleep(cfg.loop_interval_s)
+    async def _detection_loop() -> None:
+        while True:
+            if duration_s is not None and (time.monotonic() - start) >= duration_s:
+                break
+            result = await _run_tick(ctx)
+            await _handle_fsm_result(ctx, result)
+            await _drain_cmd_queue(ctx)  # UNCONDITIONAL — fix for command hang
+            await asyncio.sleep(cfg.loop_interval_s)
+
+    # Launch background tasks
+    t_scheduler = asyncio.create_task(scheduler.run(), name="scheduler")
+    t_poller = asyncio.create_task(poller.run(), name="poller")
+    t_heartbeat = asyncio.create_task(_heartbeat_loop(), name="heartbeat")
+    try:
+        await _detection_loop()
    finally:
+        # 7-step graceful shutdown
+        # 1. cancel scheduler
+        t_scheduler.cancel()
+        with contextlib.suppress(asyncio.CancelledError, Exception):
+            await t_scheduler
+        # 2. cancel poller
+        t_poller.cancel()
+        with contextlib.suppress(asyncio.CancelledError, Exception):
+            await t_poller
+        # 3. cancel heartbeat
+        t_heartbeat.cancel()
+        with contextlib.suppress(asyncio.CancelledError, Exception):
+            await t_heartbeat
+        # 4. drain detection — complete (we awaited _detection_loop directly)
+        # 5. send shutdown alert
        try:
            stats = notifier.stats()
            lines = [f"după {time.monotonic() - start:.0f}s"]
@@ -679,13 +1283,12 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
                f"{name}: sent={s['sent']} failed={s['failed']} "
                f"dropped={s['dropped']} retries={s['retries']}"
            )
-            notifier.send(Alert(
-                kind="heartbeat", title="ATM oprit",
-                body="\n".join(lines),
-            ))
+            notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
        except Exception:
            pass
+        # 6. notifier.stop() — flush + join FanoutNotifier threads
        notifier.stop()
+        # 7. audit.close()
        audit.close()
        detection_log.close()
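The 7-step shutdown above hinges on the standard cancel-then-await pattern: `Task.cancel()` only requests cancellation, so each task must also be awaited to let its `CancelledError` surface and be consumed. A minimal self-contained sketch of that pattern (task names reused from the diff for flavor only):

```python
import asyncio
import contextlib

async def run_with_cleanup() -> list[bool]:
    """Cancel-then-await: each background task is cancelled, then awaited
    so its CancelledError is consumed before shutdown continues."""
    async def worker() -> None:
        while True:
            await asyncio.sleep(3600)

    tasks = [asyncio.create_task(worker(), name=n)
             for n in ("scheduler", "poller", "heartbeat")]
    try:
        await asyncio.sleep(0)  # stand-in for awaiting the detection loop
    finally:
        for t in tasks:
            t.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await t  # the re-raised CancelledError stops here
    return [t.cancelled() for t in tasks]

print(asyncio.run(run_with_cleanup()))  # → [True, True, True]
```

Skipping the `await t` step would leave "Task exception was never retrieved" warnings at interpreter exit, which is why the diff awaits every cancelled task under `contextlib.suppress`.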


@@ -5,11 +5,13 @@ from typing import Protocol
@dataclass
class Alert:
-    kind: str  # "trigger" | "heartbeat" | "levels" | "warn" | "arm" | "prime" | "late_start"
+    # flat union: "trigger"|"heartbeat"|"levels"|"warn"|"arm"|"prime"|"late_start"|"screenshot"|"status"
+    kind: str
    title: str
    body: str
    image_path: Path | None = None  # annotated screenshot
    direction: str | None = None  # "BUY"/"SELL" when kind=trigger
+    silent: bool = False  # disable_notification for Telegram; ignored by Discord


class Notifier(Protocol):


@@ -33,6 +33,7 @@ class TelegramNotifier:
                    "chat_id": self._chat_id,
                    "caption": text,
                    "parse_mode": "HTML",
+                    "disable_notification": str(alert.silent).lower(),
                },
                files={"photo": fh},
                timeout=10,
@@ -44,6 +45,7 @@ class TelegramNotifier:
                    "chat_id": self._chat_id,
                    "text": text,
                    "parse_mode": "HTML",
+                    "disable_notification": alert.silent,
                },
                timeout=10,
            )
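One subtlety in the two `disable_notification` additions: the sendPhoto path uses `str(alert.silent).lower()` while the sendMessage path passes the bool through. The likely reason is that the photo request goes out as multipart form data, where every field is transmitted as text. A tiny sketch of that serialization rule (the function name is ours, not the codebase's):

```python
def disable_notification_field(silent: bool, multipart: bool) -> object:
    # Multipart form fields are transmitted as text, so the photo path
    # sends "true"/"false"; a plain request body can carry the bool itself.
    return str(silent).lower() if multipart else silent

assert disable_notification_field(True, multipart=True) == "true"
assert disable_notification_field(False, multipart=True) == "false"
assert disable_notification_field(True, multipart=False) is True
```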

src/atm/scheduler.py Normal file

@@ -0,0 +1,118 @@
"""ScreenshotScheduler — periodic capture + annotate + send.
Runs as an asyncio task. capture() and cv2 work execute in asyncio.to_thread
to avoid blocking the event loop. Decision 13: scheduler calls capture()
directly, NOT via Detector.
"""
from __future__ import annotations
import asyncio
import logging
import time
from pathlib import Path
from typing import Callable
from .notifier import Alert
logger = logging.getLogger(__name__)
class ScreenshotScheduler:
"""Periodic screenshot sender.
Constructor params are explicit (decision 11 outside-voice finding).
"""
def __init__(
self,
capture: Callable, # () -> ndarray | None
save_fn: Callable, # (frame, label, now) -> Path | None
notifier, # _NotifierLike
audit, # _AuditLike
interval_s: int | None = None,
) -> None:
self._capture = capture
self._save_fn = save_fn
self._notifier = notifier
self._audit = audit
self._interval_s = interval_s
self._is_running = False
self._next_due: float | None = None # monotonic
# ------------------------------------------------------------------
# Public state
# ------------------------------------------------------------------
@property
def is_running(self) -> bool:
return self._is_running
@property
def interval_s(self) -> int | None:
return self._interval_s
@property
def next_due(self) -> float | None:
return self._next_due
# ------------------------------------------------------------------
# Control (called from async event loop)
# ------------------------------------------------------------------
def start(self, interval_s: int) -> None:
self._interval_s = interval_s
self._is_running = True
self._next_due = time.monotonic() + interval_s
def stop(self) -> None:
self._is_running = False
self._next_due = None
# ------------------------------------------------------------------
# Task body
# ------------------------------------------------------------------
async def run(self) -> None:
"""Runs until cancelled."""
while True:
await asyncio.sleep(1)
if not self._is_running or self._next_due is None:
continue
if time.monotonic() >= self._next_due:
await self._take_screenshot()
if self._is_running and self._interval_s is not None:
self._next_due = time.monotonic() + self._interval_s
async def _take_screenshot(self) -> None:
now = time.time()
try:
frame = await asyncio.to_thread(self._capture)
except Exception as exc:
logger.warning("ScreenshotScheduler capture failed: %s", exc)
self._audit.log({"ts": now, "event": "screenshot_sent", "status": "capture_failed", "error": str(exc)})
self._notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
silent=True,
))
return
if frame is None:
self._notifier.send(Alert(
kind="warn",
title="Captură eșuată — verificați fereastra TradeStation",
body="",
silent=True,
))
return
path = await asyncio.to_thread(self._save_fn, frame, "poll", now)
self._audit.log({"ts": now, "event": "screenshot_sent", "path": str(path) if path else None})
self._notifier.send(Alert(
kind="screenshot",
title="Screenshot periodic",
body="",
image_path=path,
silent=True,
))
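The scheduler's rearm logic (check once a second, fire when the monotonic deadline passes, reschedule relative to the firing time) can be isolated from asyncio for testing. A minimal sketch under the assumption that only the deadline bookkeeping matters, with the class name ours rather than the module's:

```python
from __future__ import annotations

import time

class DeadlineTicker:
    """Sketch of ScreenshotScheduler's deadline bookkeeping: a monotonic
    next_due, advanced by interval_s after each firing."""
    def __init__(self) -> None:
        self.interval_s: int | None = None
        self.next_due: float | None = None

    def start(self, interval_s: int, now: float | None = None) -> None:
        self.interval_s = interval_s
        self.next_due = (time.monotonic() if now is None else now) + interval_s

    def due(self, now: float) -> bool:
        return self.next_due is not None and now >= self.next_due

    def rearm(self, now: float) -> None:
        if self.interval_s is not None:
            self.next_due = now + self.interval_s

t = DeadlineTicker()
t.start(180, now=1000.0)
assert not t.due(1100.0)     # not yet
assert t.due(1180.0)         # deadline reached
t.rearm(1180.0)
assert t.next_due == 1360.0  # rescheduled relative to firing time
```

Using `time.monotonic()` here matters for the same reason as the heartbeat fix in the main.py diff: wall-clock jumps (NTP sync, DST) must not skew the schedule.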


@@ -232,3 +232,20 @@ class StateMachine:
        if last is None:
            return False
        return (ts - last) < self._lockout_s
# ------------------------------------------------------------------
# Public lockout API — used by fire_on_phase_skip handler outside the
# FSM. Mirrors _is_locked / _last_fire without leaking private attrs.
# ------------------------------------------------------------------
def is_locked(self, direction: str, ts: float) -> bool:
"""True if a FIRE in `direction` at ts would be within the lockout window."""
return self._is_locked(direction, ts)
def record_fire(self, direction: str, ts: float) -> None:
"""Mark a FIRE for `direction` at ts, starting the lockout timer.
Used by backstop handlers (e.g. fire_on_phase_skip) that emit a
fire-equivalent alert without going through the natural FSM path.
"""
self._last_fire[direction] = ts
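The intended caller shape for this public pair is check-then-record: consult `is_locked` before emitting the backstop alert, then `record_fire` to open the lockout window. A self-contained sketch; `LockoutBook` is a stand-in mirroring the `_last_fire` logic shown above, not the real `StateMachine`:

```python
class LockoutBook:
    """Stand-in for StateMachine's lockout bookkeeping (sketch only)."""
    def __init__(self, lockout_s: float) -> None:
        self._lockout_s = lockout_s
        self._last_fire: dict[str, float] = {}

    def is_locked(self, direction: str, ts: float) -> bool:
        last = self._last_fire.get(direction)
        return last is not None and (ts - last) < self._lockout_s

    def record_fire(self, direction: str, ts: float) -> None:
        self._last_fire[direction] = ts

sent: list[str] = []

def fire_phase_skip(fsm: LockoutBook, direction: str, ts: float) -> bool:
    # Hypothetical backstop handler: check lockout, alert, then record.
    if fsm.is_locked(direction, ts):
        return False                   # suppress the duplicate within lockout
    sent.append(direction)             # stand-in for notifier.send(...)
    fsm.record_fire(direction, ts)     # start the lockout window
    return True

fsm = LockoutBook(lockout_s=240)
assert fire_phase_skip(fsm, "SELL", 2.0) is True
assert fire_phase_skip(fsm, "SELL", 61.0) is False  # within 240 s lockout
assert fire_phase_skip(fsm, "BUY", 61.0) is True    # other direction unaffected
```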

src/atm/validate.py Normal file

@@ -0,0 +1,229 @@
"""Offline calibration validation: run Detector on labeled frames, report PASS/FAIL.
Used by the `atm validate-calibration` subcommand. Reports per-sample detection
results against expected labels, and for failures, computes RGB distance to
each color threshold and emits tuning suggestions.
Reuses `Detector.step(frame)` - does NOT reimplement color classification.
"""
from __future__ import annotations
import json
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .config import Config
@dataclass
class SampleRecord:
path: str
expected: str
detected: str | None
confidence: float
rgb: tuple[int, int, int] | None
top3: list[tuple[str, float]] # [(name, score), ...] ranked by RGB distance
passed: bool
note: str = ""
error: str | None = None # non-None if frame load failed / schema bad
@dataclass
class ValidationReport:
records: list[SampleRecord] = field(default_factory=list)
config_name: str = ""
@property
def total(self) -> int:
return len(self.records)
@property
def passed(self) -> int:
return sum(1 for r in self.records if r.passed)
@property
def failed(self) -> int:
return self.total - self.passed
@property
def all_pass(self) -> bool:
return self.total > 0 and self.failed == 0
def render(self) -> str:
lines: list[str] = []
hdr = f"Testing {self.total} frames"
if self.config_name:
hdr += f" against config {self.config_name}"
hdr += "..."
lines.append(hdr)
lines.append("")
for r in self.records:
name = Path(r.path).name or r.path
if r.error:
lines.append(f" [FAIL] {name}")
lines.append(f" error: {r.error}")
continue
tag = "PASS" if r.passed else "FAIL"
rgb_str = f"RGB {r.rgb}" if r.rgb is not None else "RGB n/a"
detected = r.detected if r.detected is not None else "none"
lines.append(f" [{tag}] {name}")
lines.append(
f" expected={r.expected} detected={detected} "
f"(conf {r.confidence:.2f}, {rgb_str})"
)
if not r.passed and r.top3:
top3_str = " ".join(f"{n}({c:.2f})" for n, c in r.top3)
lines.append(f" Top 3 candidates: {top3_str}")
lines.append("")
pct = (self.passed / self.total * 100.0) if self.total else 0.0
lines.append(f"SUMMARY: {self.passed}/{self.total} PASS ({pct:.0f}%)")
fails = [r for r in self.records if not r.passed]
if fails:
lines.append("FAILED:")
for r in fails:
name = Path(r.path).name or r.path
if r.error:
lines.append(f" - {name}: {r.error}")
continue
detected = r.detected if r.detected is not None else "none"
lines.append(
f" - {name}: expected {r.expected}, got {detected}"
)
sug_lines = [
r._suggestion # type: ignore[attr-defined]
for r in fails
if getattr(r, "_suggestion", "")
]
if sug_lines:
lines.append("")
lines.append("SUGGESTIONS:")
for s in sug_lines:
lines.append(f" - {s}")
return "\n".join(lines)
def __str__(self) -> str:
return self.render()
class ValidationError(Exception):
"""Raised for missing label files or invalid schema."""
def _rgb_distance(a: tuple[int, int, int], b: tuple[int, int, int]) -> float:
return math.sqrt(sum((a[i] - b[i]) ** 2 for i in range(3)))
def _load_labels(label_file: Path) -> list[dict[str, Any]]:
if not label_file.exists():
raise ValidationError(f"label file not found: {label_file}")
try:
data = json.loads(label_file.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
raise ValidationError(f"invalid JSON in {label_file}: {exc}") from exc
if not isinstance(data, list):
raise ValidationError(
f"label file must be a JSON array; got {type(data).__name__}"
)
return data
def validate_calibration(
label_file: Path,
cfg: Config,
config_name: str = "",
) -> ValidationReport:
"""Run Detector on each labeled frame; return a ValidationReport.
Reuses `Detector.step(frame)`. Loads frames via cv2.imread.
Raises ValidationError if the label file is missing or malformed.
"""
import cv2 # local import keeps module import cheap
from .detector import Detector
entries = _load_labels(label_file)
report = ValidationReport(config_name=config_name)
palette = {
name: spec.rgb
for name, spec in cfg.colors.items()
if name != "background"
}
detector = Detector(cfg=cfg, capture=lambda: None)
for entry in entries:
path = str(entry.get("path", ""))
expected = str(entry.get("expected", ""))
note = str(entry.get("note", ""))
if not path or not expected:
rec = SampleRecord(
path=path, expected=expected, detected=None, confidence=0.0,
rgb=None, top3=[], passed=False, note=note,
error="missing 'path' or 'expected' field",
)
rec._suggestion = "" # type: ignore[attr-defined]
report.records.append(rec)
continue
frame = cv2.imread(path)
if frame is None:
rec = SampleRecord(
path=path, expected=expected, detected=None, confidence=0.0,
rgb=None, top3=[], passed=False, note=note,
error=f"cv2.imread failed for {path}",
)
rec._suggestion = "" # type: ignore[attr-defined]
report.records.append(rec)
continue
result = detector.step(ts=0.0, frame=frame)
match = result.match
if match is None:
detected: str | None = None
confidence = 0.0
else:
detected = match.name if match.name != "UNKNOWN" else None
confidence = match.confidence
rgb = result.rgb
# Top 3 candidates: rank palette entries by RGB distance to observed.
top3: list[tuple[str, float]] = []
if rgb is not None:
scored: list[tuple[str, float]] = []
for name, ref in palette.items():
scored.append((name, _rgb_distance(rgb, ref)))
scored.sort(key=lambda t: t[1])
top3 = [(n, 1.0 / (1.0 + d / 20.0)) for n, d in scored[:3]]
passed = detected == expected
rec = SampleRecord(
path=path, expected=expected, detected=detected,
confidence=confidence, rgb=rgb, top3=top3, passed=passed, note=note,
)
if not passed and rgb is not None and expected in palette:
ref = palette[expected]
tol = cfg.colors[expected].tolerance
dist = _rgb_distance(rgb, ref)
rec._suggestion = ( # type: ignore[attr-defined]
f"{expected} praguri curente: RGB{ref} +/- {tol:.0f}. "
f"Pixelul observat {rgb} e la distanta {dist:.1f} "
f"-> recalibreaza cu acest sample."
)
else:
rec._suggestion = "" # type: ignore[attr-defined]
report.records.append(rec)
return report
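The commit message pins an exit-code contract for the subcommand: 0 on 100% PASS, 1 on any FAIL, 2 on a missing/malformed label file. A sketch of how the CLI handler might map `validate_calibration` onto that contract; the wrapper name and its callable parameter are ours, only `ValidationError` mirrors the module above:

```python
class ValidationError(Exception):
    """Mirrors atm.validate.ValidationError for this sketch."""

def exit_code(run) -> int:
    # run() stands in for validate_calibration(...).all_pass
    try:
        all_pass = run()
    except ValidationError:
        return 2      # missing or malformed label file
    return 0 if all_pass else 1

assert exit_code(lambda: True) == 0   # every sample passed

assert exit_code(lambda: False) == 1  # at least one FAIL

def _missing():
    raise ValidationError("label file not found")

assert exit_code(_missing) == 2
```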


@@ -140,6 +140,52 @@ def test_pause_file_written(tmp_path: Path) -> None:
    assert flag.exists()
def test_canary_pause_callback_fires_once() -> None:
"""Single-shot: callback invoked exactly once per not_paused→paused edge."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
calls: list[int] = []
canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d))
canary.check(DRIFTED_FRAME) # transition → callback fires
canary.check(DRIFTED_FRAME) # still paused → no new callback
canary.check(BASELINE_FRAME) # clean but still paused → no new callback
assert len(calls) == 1
assert calls[0] > 0 # distance should be positive
def test_canary_resume_allows_new_pause_notification() -> None:
"""After resume, a fresh drift must re-fire the callback."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
calls: list[int] = []
canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d))
canary.check(DRIFTED_FRAME)
assert len(calls) == 1
canary.resume()
canary.check(DRIFTED_FRAME) # new pause transition
assert len(calls) == 2
def test_canary_pause_callback_exception_does_not_crash_check() -> None:
"""A failing callback must not break canary.check (detection cycle safety)."""
cfg = _cfg_with_baseline(BASELINE_FRAME)
def _boom(_d: int) -> None:
raise RuntimeError("notifier down")
canary = Canary(cfg, on_pause_callback=_boom)
# Must not raise — exception is swallowed + logged.
result = canary.check(DRIFTED_FRAME)
assert result.paused is True
assert canary.is_paused is True
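The three tests above pin down an edge-triggered callback contract: fire exactly once per not_paused→paused transition, re-fire only after `resume()`, and never let a failing callback break `check()`. A distilled sketch of that contract (not the real Canary, only the transition/refire/exception rules):

```python
class EdgePause:
    """Edge-triggered pause callback: single-shot per transition."""
    def __init__(self, on_pause) -> None:
        self.is_paused = False
        self._on_pause = on_pause

    def check(self, drifted: bool, distance: int) -> bool:
        if drifted and not self.is_paused:
            self.is_paused = True
            try:
                self._on_pause(distance)  # fires only on the edge
            except Exception:
                pass  # callback failure must not break the detection cycle
        return self.is_paused

    def resume(self) -> None:
        self.is_paused = False

calls: list[int] = []
c = EdgePause(calls.append)
c.check(True, 5)
c.check(True, 5)      # still paused: no second call
assert calls == [5]
c.resume()
c.check(True, 7)      # fresh edge after resume
assert calls == [5, 7]
```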
def test_resume_deletes_pause_file(tmp_path: Path) -> None:
    """resume() deletes the pause flag file."""
    flag = tmp_path / "paused.flag"

tests/test_commands.py Normal file

@@ -0,0 +1,45 @@
"""Tests for atm.commands — /pause /resume parsing (Commit 5)."""
from __future__ import annotations
from unittest.mock import MagicMock
from atm.commands import Command, TelegramPoller
def _make_poller() -> TelegramPoller:
cfg = MagicMock()
cfg.bot_token = "tok"
cfg.chat_id = "123"
cfg.allowed_chat_ids = ("123",)
cfg.poll_timeout_s = 1
return TelegramPoller(cfg, MagicMock(), MagicMock())
def test_parse_pause():
p = _make_poller()
assert p._parse_command("pause") == Command(action="pause")
assert p._parse_command("/pause") == Command(action="pause")
def test_parse_resume_plain():
p = _make_poller()
assert p._parse_command("resume") == Command(action="resume")
assert p._parse_command("/resume") == Command(action="resume")
def test_parse_resume_force():
p = _make_poller()
# "resume force" → value=1 signals force-resume of canary drift
cmd = p._parse_command("resume force")
assert cmd is not None
assert cmd.action == "resume"
assert cmd.value == 1
def test_parse_existing_commands_still_work():
"""Regression: adding pause/resume must not break stop/status/ss/interval."""
p = _make_poller()
assert p._parse_command("stop") == Command(action="stop")
assert p._parse_command("status") == Command(action="status")
assert p._parse_command("ss") == Command(action="ss")
assert p._parse_command("3") == Command(action="set_interval", value=180)


@@ -97,3 +97,59 @@ def test_attach_screenshots_unknown_keys_ignored() -> None:
    }))
    assert cfg.attach_screenshots.arm is False
    # Should not raise even with unknown key
# ---------------------------------------------------------------------------
# Commit 3: AlertBehaviorCfg (fire_on_phase_skip)
# ---------------------------------------------------------------------------
def test_alerts_default_fire_on_phase_skip_true() -> None:
cfg = Config._from_dict(_with_opts({}))
assert cfg.alerts.fire_on_phase_skip is True
def test_alerts_fire_on_phase_skip_can_be_disabled() -> None:
cfg = Config._from_dict(_with_opts({"alerts": {"fire_on_phase_skip": False}}))
assert cfg.alerts.fire_on_phase_skip is False
# ---------------------------------------------------------------------------
# Commit 4: OperatingHoursCfg parsing + tz cache
# ---------------------------------------------------------------------------
def test_operating_hours_default_disabled() -> None:
cfg = Config._from_dict(_with_opts({}))
assert cfg.operating_hours.enabled is False
assert cfg.operating_hours.timezone == "America/New_York"
assert cfg.operating_hours._tz_cache is None
def test_operating_hours_enabled_caches_tz() -> None:
cfg = Config._from_dict(_with_opts({
"operating_hours": {
"enabled": True,
"timezone": "America/New_York",
"weekdays": ["MON", "TUE", "WED", "THU", "FRI"],
"start_hhmm": "09:30",
"stop_hhmm": "16:00",
}
}))
assert cfg.operating_hours.enabled is True
assert cfg.operating_hours._tz_cache is not None
assert str(cfg.operating_hours._tz_cache) == "America/New_York"
def test_operating_hours_invalid_tz_raises_valueerror() -> None:
import pytest
with pytest.raises(ValueError, match="operating_hours.timezone"):
Config._from_dict(_with_opts({
"operating_hours": {"enabled": True, "timezone": "Not/A_Zone"},
}))
def test_operating_hours_invalid_weekday_raises_valueerror() -> None:
import pytest
with pytest.raises(ValueError, match="weekdays"):
Config._from_dict(_with_opts({
"operating_hours": {"enabled": True, "weekdays": ["XYZ"]},
}))
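The weekday-validation test above only pins the error message; a plausible parser behind it maps `MON..SUN` to 0..6 and raises `ValueError` naming `weekdays` on anything else. A sketch under that assumption (the real parser lives in `Config._from_dict`; this mirrors only the error contract):

```python
_WEEKDAYS = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")

def parse_weekdays(names: list[str]) -> tuple[int, ...]:
    """Map weekday abbreviations to 0..6; reject unknown names loudly."""
    bad = [n for n in names if n.upper() not in _WEEKDAYS]
    if bad:
        raise ValueError(f"operating_hours.weekdays: unknown {bad}")
    return tuple(_WEEKDAYS.index(n.upper()) for n in names)

assert parse_weekdays(["MON", "FRI"]) == (0, 4)
try:
    parse_weekdays(["XYZ"])
except ValueError as exc:
    assert "weekdays" in str(exc)
```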


@@ -10,6 +10,8 @@ Covers the six cases from the arm+prime notification plan:
"""
from __future__ import annotations

+from types import SimpleNamespace
+
from atm.main import _handle_tick
from atm.notifier import Alert
from atm.state_machine import State, StateMachine
@@ -486,3 +488,82 @@ def test_save_annotated_frame_succeeds(tmp_path, monkeypatch):
    assert "BUY" in result.name
    assert len(written) == 1
    assert not any(e.get("event") == "snapshot_fail" for e in audit.events)
# ---------------------------------------------------------------------------
# Commit 3: fire_on_phase_skip backstop
# ---------------------------------------------------------------------------
def _cfg_with_flag(enabled: bool):
return SimpleNamespace(alerts=SimpleNamespace(fire_on_phase_skip=enabled))
def test_phase_skip_fire_when_flag_on():
"""ARMED_SELL → light_red directly with flag=True → phase_skip_fire alert."""
fsm = StateMachine(lockout_s=240)
notif = FakeNotifier()
audit = FakeAudit()
# Arm SELL (yellow from IDLE)
_handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False,
cfg=_cfg_with_flag(True))
assert fsm.state == State.ARMED_SELL
notif.alerts.clear()
# ARMED_SELL → light_red (skips dark_red) → phase_skip_fire
tr = _handle_tick(fsm, "light_red", 2.0, notif, audit, first_accepted=False,
cfg=_cfg_with_flag(True))
assert tr is not None and tr.reason == "phase_skip"
ps_alerts = [a for a in notif.alerts if a.kind == "phase_skip_fire"]
assert len(ps_alerts) == 1
assert ps_alerts[0].direction == "SELL"
assert "SELL" in ps_alerts[0].title
def test_phase_skip_no_fire_when_flag_off():
"""Same scenario, flag=False → no phase_skip_fire emitted."""
fsm = StateMachine(lockout_s=240)
notif = FakeNotifier()
audit = FakeAudit()
_handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False,
cfg=_cfg_with_flag(False))
notif.alerts.clear()
_handle_tick(fsm, "light_red", 2.0, notif, audit, first_accepted=False,
cfg=_cfg_with_flag(False))
ps_alerts = [a for a in notif.alerts if a.kind == "phase_skip_fire"]
assert ps_alerts == []
def test_phase_skip_lockout_suppresses_spam():
"""Two phase_skip events within lockout_s → only the first emits an alert."""
fsm = StateMachine(lockout_s=240)
notif = FakeNotifier()
audit = FakeAudit()
cfg = _cfg_with_flag(True)
# First cycle
_handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg)
_handle_tick(fsm, "light_red", 2.0, notif, audit, first_accepted=False, cfg=cfg)
# Second arm + phase_skip well within 240s
_handle_tick(fsm, "yellow", 60.0, notif, audit, first_accepted=False, cfg=cfg)
_handle_tick(fsm, "light_red", 61.0, notif, audit, first_accepted=False, cfg=cfg)
ps_alerts = [a for a in notif.alerts if a.kind == "phase_skip_fire"]
assert len(ps_alerts) == 1, (
f"expected 1 phase_skip_fire (lockout), got {len(ps_alerts)}"
)
def test_state_machine_is_locked_and_record_fire_public_api():
"""Public lockout helpers mirror the private _is_locked / _last_fire behavior."""
fsm = StateMachine(lockout_s=100)
assert fsm.is_locked("BUY", 0.0) is False
fsm.record_fire("BUY", 10.0)
assert fsm.is_locked("BUY", 50.0) is True # within 100s
assert fsm.is_locked("BUY", 150.0) is False # past lockout
assert fsm.is_locked("SELL", 50.0) is False # other direction unaffected


@@ -1,6 +1,7 @@
"""Tests for atm.main unified CLI."""
from __future__ import annotations

+import asyncio
import os
import subprocess
import sys
@@ -186,7 +187,7 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
        ]

        def __init__(self, *a, **kw):
            self._i = 0

-        def step(self, ts):
+        def step(self, ts, frame=None):
            if self._i >= len(self._script):
                raise _StopLoop
            color, accepted = self._script[self._i]
@@ -228,6 +229,17 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
        def step(self, *a, **kw):
            return types.SimpleNamespace(status="pending", levels=None)

+    class _StubPoller:
+        def __init__(self, *a, **kw): pass
+        async def run(self): await asyncio.sleep(9999)
+
+    class _StubScheduler:
+        def __init__(self, *a, **kw):
+            self.is_running = False
+        def start(self, interval_s): self.is_running = True
+        def stop(self): self.is_running = False
+        async def run(self): await asyncio.sleep(9999)
+
    monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
    monkeypatch.setattr("atm.canary.Canary", FakeCanary)
    monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
@@ -237,6 +249,8 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
    monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
    monkeypatch.setattr("atm.main.time.sleep", lambda s: None)
+    monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
+    monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
    with pytest.raises(_StopLoop):
        _main.run_live(cfg, duration_s=None)
@@ -255,3 +269,713 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
    assert len(trigger) == 1
    assert trigger[0].direction == "SELL"
# ---------------------------------------------------------------------------
# MUST-HAVE: async lifecycle integration test
# IDLE → ARMED → PRIMED (auto-poll scheduler starts) → FIRE (scheduler stops)
# Tests: scheduler starts on prime, stops on fire, fire alert sent.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_path):
import numpy as np
import atm.main as _main
from atm.detector import DetectionResult
captured_alerts: list = []
scheduler_events: list[str] = []
class FakeFanout:
def __init__(self, *a, **kw): pass
def send(self, alert): captured_alerts.append(alert)
def stop(self): pass
def stats(self): return {}
class FakeCanaryResult:
distance = 0
drifted = False
paused = False
class FakeCanary:
def __init__(self, *a, **kw): self.is_paused = False
def check(self, frame): return FakeCanaryResult()
def resume(self): pass
# Scheduler tracks start/stop calls
class FakeScheduler:
def __init__(self, *a, **kw):
self.is_running = False
self.interval_s = None
def start(self, interval_s):
self.is_running = True
self.interval_s = interval_s
scheduler_events.append(f"start:{interval_s}")
def stop(self):
self.is_running = False
scheduler_events.append("stop")
async def run(self):
await asyncio.sleep(9999)
class FakePoller:
def __init__(self, *a, **kw): pass
async def run(self): await asyncio.sleep(9999)
class _StopLoop(Exception): pass
class ScriptedDetector:
# turquoise→ARM, dark_green→PRIME, light_green→FIRE
_script = [
("turquoise", True),
("dark_green", True),
("light_green", True),
]
def __init__(self, *a, **kw): self._i = 0
def step(self, ts, frame=None):
if self._i >= len(self._script):
raise _StopLoop
color, accepted = self._script[self._i]
self._i += 1
return DetectionResult(ts=ts, window_found=True, dot_found=True,
rgb=(1, 1, 1), match=None, accepted=accepted, color=color)
@property
def rolling(self): return []
def fake_build_capture(cfg, capture_stub=False):
return lambda: np.zeros((50, 50, 3), dtype=np.uint8)
cfg = MagicMock()
cfg.lockout_s = 60
cfg.heartbeat_min = 999
cfg.loop_interval_s = 0
cfg.config_version = "test"
cfg.dead_letter_path = str(tmp_path / "dl.jsonl")
cfg.canary.drift_threshold = 10
cfg.dot_roi.x = 0; cfg.dot_roi.y = 0; cfg.dot_roi.w = 10; cfg.dot_roi.h = 10
cfg.chart_window_region = None
cfg.telegram.auto_poll_interval_s = 180
cfg.telegram.bot_token = "tok"
cfg.telegram.chat_id = "123"
cfg.telegram.allowed_chat_ids = ("123",)
fake_sched = FakeScheduler()
monkeypatch.chdir(tmp_path)
class _Stub:
def __init__(self, *a, **kw): pass
def log(self, *a, **kw): pass
def close(self, *a, **kw): pass
def step(self, *a, **kw): return types.SimpleNamespace(status="pending", levels=None)
monkeypatch.setattr("atm.detector.Detector", ScriptedDetector)
monkeypatch.setattr("atm.canary.Canary", FakeCanary)
monkeypatch.setattr("atm.notifier.fanout.FanoutNotifier", FakeFanout)
monkeypatch.setattr("atm.notifier.discord.DiscordNotifier", _Stub)
monkeypatch.setattr("atm.notifier.telegram.TelegramNotifier", _Stub)
monkeypatch.setattr("atm.audit.AuditLog", _Stub)
monkeypatch.setattr("atm.levels.LevelsExtractor", _Stub)
monkeypatch.setattr("atm.main._build_capture", fake_build_capture)
monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
with pytest.raises(_StopLoop):
await _main.run_live_async(cfg, duration_s=None)
arm_alerts = [a for a in captured_alerts if a.kind == "arm"]
prime_alerts = [a for a in captured_alerts if a.kind == "prime"]
trigger_alerts = [a for a in captured_alerts if a.kind == "trigger"]
assert len(arm_alerts) == 1, f"expected 1 arm, got {[a.title for a in captured_alerts]}"
assert arm_alerts[0].direction == "BUY"
assert len(prime_alerts) == 1
assert prime_alerts[0].direction == "BUY"
assert len(trigger_alerts) == 1
assert trigger_alerts[0].direction == "BUY"
# Scheduler must have started (on PRIME) and stopped (on FIRE)
assert "start:180" in scheduler_events, f"scheduler not started: {scheduler_events}"
assert "stop" in scheduler_events, f"scheduler not stopped: {scheduler_events}"
start_idx = scheduler_events.index("start:180")
stop_idx = scheduler_events.index("stop")
assert start_idx < stop_idx, "scheduler started after it stopped"
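The color ladder this test drives can be condensed into a tiny transition table. This is a sketch of the mapping implied by the ScriptedDetector script above (turquoise arms, dark_green primes and starts the scheduler, light_green fires and stops it); the names are illustrative, not the real `atm.state_machine` API:

```python
# Hypothetical transition table mirroring the BUY-side script above.
TRANSITIONS = {
    ("IDLE", "turquoise"): "ARMED",
    ("ARMED", "dark_green"): "PRIMED",     # scheduler.start() happens here
    ("PRIMED", "light_green"): "FIRED",    # scheduler.stop() + trigger alert
}

def advance(state: str, color: str) -> str:
    """Advance one tick; unknown (state, color) pairs leave the state alone."""
    return TRANSITIONS.get((state, color), state)

state = "IDLE"
for color in ("turquoise", "dark_green", "light_green"):
    state = advance(state, color)
```

Walking the full script ends in `FIRED`; skipping a rung (e.g. light_green while still IDLE) is a no-op, which is why the test asserts exactly one arm, one prime, and one trigger alert.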
# ---------------------------------------------------------------------------
# Commit 1 regression tests: _drain_cmd_queue MUST run unconditionally,
# even when canary is paused or when detection is otherwise skipped.
# Prior bug: `continue` past the drain loop caused commands to pile up.
# ---------------------------------------------------------------------------
def _make_ctx_for_drain(cmd_queue, dispatched: list):
"""Build a minimal RunContext where _dispatch_command just records calls."""
import atm.main as _main
class _FakeAudit:
def __init__(self): self.events = []
def log(self, e): self.events.append(e)
class _FakeNotifier:
def __init__(self): self.alerts = []
def send(self, a): self.alerts.append(a)
class _FakeCanary:
def __init__(self, paused=True):
self.is_paused = paused
class _FakeScheduler:
is_running = False
interval_s = None
def start(self, s): pass
def stop(self): pass
state = _main._LoopState(start=0.0)
ctx = _main.RunContext(
cfg=MagicMock(),
capture=lambda: None,
canary=_FakeCanary(paused=True),
detector=MagicMock(),
fsm=MagicMock(),
notifier=_FakeNotifier(),
audit=_FakeAudit(),
detection_log=_FakeAudit(),
scheduler=_FakeScheduler(),
samples_dir=Path("."),
fires_dir=Path("."),
cmd_queue=cmd_queue,
state=state,
levels_extractor_factory=lambda *a, **kw: None,
)
return ctx
@pytest.mark.asyncio
async def test_drain_works_when_canary_paused(monkeypatch):
"""Regression: when canary.is_paused, _drain_cmd_queue still dispatches.
Prior bug: detection loop `continue`'d past the drain block whenever the
tick returned res=None (canary paused). Commands accumulated forever.
"""
import atm.main as _main
from atm.commands import Command
q: asyncio.Queue = asyncio.Queue()
await q.put(Command(action="status"))
await q.put(Command(action="ss"))
dispatched: list = []
async def _fake_dispatch(ctx, cmd):
dispatched.append(cmd.action)
monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
ctx = _make_ctx_for_drain(q, dispatched)
await _main._drain_cmd_queue(ctx)
assert dispatched == ["status", "ss"]
assert q.empty()
@pytest.mark.asyncio
async def test_drain_works_when_out_of_window(monkeypatch):
"""Drain must still fire when the tick skipped (e.g. out of operating hours).
The refactored loop runs _drain_cmd_queue unconditionally after every tick,
regardless of `_TickSyncResult` content.
"""
import atm.main as _main
from atm.commands import Command
q: asyncio.Queue = asyncio.Queue()
await q.put(Command(action="stop"))
dispatched: list = []
async def _fake_dispatch(ctx, cmd):
dispatched.append(cmd.action)
monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
ctx = _make_ctx_for_drain(q, dispatched)
# Simulate out-of-window tick (empty _TickSyncResult, no res)
await _main._handle_fsm_result(ctx, _main._TickSyncResult())
await _main._drain_cmd_queue(ctx)
assert dispatched == ["stop"]
@pytest.mark.asyncio
async def test_drain_isolates_dispatch_exceptions(monkeypatch):
"""If one command raises, remaining commands still drain + warn alert sent."""
import atm.main as _main
from atm.commands import Command
q: asyncio.Queue = asyncio.Queue()
await q.put(Command(action="status"))
await q.put(Command(action="ss"))
attempts: list = []
async def _fake_dispatch(ctx, cmd):
attempts.append(cmd.action)
if cmd.action == "status":
raise RuntimeError("boom")
monkeypatch.setattr(_main, "_dispatch_command", _fake_dispatch)
ctx = _make_ctx_for_drain(q, attempts)
await _main._drain_cmd_queue(ctx)
assert attempts == ["status", "ss"]
# warn alert for the failed command
warn_titles = [a.title for a in ctx.notifier.alerts if a.kind == "warn"]
assert any("status" in t for t in warn_titles)
# command_error audit event
errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
assert len(errs) == 1 and errs[0]["action"] == "status"
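The contract these three tests pin down — drain runs after every tick and one failing command never blocks the rest — can be sketched as follows. This is a hypothetical standalone helper; the real `_drain_cmd_queue` additionally emits warn alerts and `command_error` audit events:

```python
import asyncio

async def drain(queue: asyncio.Queue, dispatch) -> list:
    """Drain every queued command; isolate per-command failures."""
    failures = []
    while True:
        try:
            cmd = queue.get_nowait()
        except asyncio.QueueEmpty:
            return failures
        try:
            await dispatch(cmd)
        except Exception as exc:  # record the failure and keep draining
            failures.append((cmd, exc))

async def _demo():
    q: asyncio.Queue = asyncio.Queue()
    for c in ("status", "boom", "ss"):
        q.put_nowait(c)
    handled = []
    async def dispatch(cmd):
        if cmd == "boom":
            raise RuntimeError("bad command")
        handled.append(cmd)
    return handled, await drain(q, dispatch)

handled, failures = asyncio.run(_demo())
```

Because `drain` uses `get_nowait()` until `QueueEmpty`, it is safe to call unconditionally on every loop iteration, including ticks that skipped detection entirely.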
# ---------------------------------------------------------------------------
# Commit 4: operating hours + LifecycleState transitions
# ---------------------------------------------------------------------------
from zoneinfo import ZoneInfo as _ZI # noqa: E402
import datetime as _dt # noqa: E402
def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
start="09:30", stop="16:00", tz="America/New_York"):
"""Build a lightweight cfg-like object with operating_hours populated."""
oh = types.SimpleNamespace(
enabled=enabled,
timezone=tz,
weekdays=weekdays,
start_hhmm=start,
stop_hhmm=stop,
_tz_cache=_ZI(tz) if enabled else None,
)
return types.SimpleNamespace(operating_hours=oh)
def _fake_canary(paused=False):
return types.SimpleNamespace(is_paused=paused)
@pytest.mark.parametrize(
"local_dt,expected",
[
# Monday 09:30 NY — exact open → active (None)
(_dt.datetime(2026, 4, 20, 9, 30), None),
# Monday 16:00 NY — exact close → inactive (>= stop)
(_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
# Monday 08:00 NY — before open
(_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
# Monday 12:00 NY — active
(_dt.datetime(2026, 4, 20, 12, 0), None),
# Saturday 12:00 NY — weekend
(_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
# Sunday 23:00 NY — weekend
(_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
],
)
def test_operating_hours_skip_matrix(local_dt, expected):
"""Timezone-aware start/stop + weekday checks."""
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
now_ts = local_dt.replace(tzinfo=tz).timestamp()
lifecycle = _main.LifecycleState()
result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())
assert result == expected
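A locale-independent version of the window check exercised by this matrix can be sketched like this. The shape is assumed from the test fixtures (`weekdays`, `start_hhmm`, `stop_hhmm`, half-open interval); the real `_should_skip` also consults `user_paused` and canary drift state:

```python
from datetime import datetime, time as dtime
from zoneinfo import ZoneInfo

# datetime.weekday() is locale-free: 0 == Monday, regardless of LC_TIME.
WEEKDAYS = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")

def out_of_window(ts: float, tz: str, days, start_hhmm: str, stop_hhmm: str):
    """Return a skip reason for timestamp ts, or None when the window is open."""
    local = datetime.fromtimestamp(ts, ZoneInfo(tz))
    if WEEKDAYS[local.weekday()] not in days:
        return "out_of_window_weekend"
    h1, m1 = (int(p) for p in start_hhmm.split(":"))
    h2, m2 = (int(p) for p in stop_hhmm.split(":"))
    # Half-open window: active at exactly start, inactive at exactly stop.
    if not (dtime(h1, m1) <= local.time() < dtime(h2, m2)):
        return "out_of_window_hours"
    return None
```

The half-open comparison reproduces the matrix's boundary cases: 09:30 sharp is active (`None`) while 16:00 sharp is already `out_of_window_hours`.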
def test_market_open_close_transitions_logged_once():
"""Crossing a boundary emits exactly one market_open / market_closed event."""
import atm.main as _main
audit_events = []
alerts = []
class _A:
def log(self, e): audit_events.append(e)
class _N:
def send(self, a): alerts.append(a)
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState()
canary = _fake_canary()
# Prime as closed (before open, Monday 08:00)
pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
# First evaluation seeds state, no alert yet.
assert lifecycle.last_window_state == "closed"
assert alerts == []
assert audit_events == []
# Transition to open
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
assert lifecycle.last_window_state == "open"
assert len(alerts) == 1
assert any(e.get("event") == "market_open" for e in audit_events)
# Repeated open tick — no duplicate log
alerts.clear()
audit_events.clear()
skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
assert alerts == []
assert audit_events == []
# Transition to close
close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
skip_close = _main._should_skip(close, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
assert lifecycle.last_window_state == "closed"
assert any(e.get("event") == "market_closed" for e in audit_events)
def test_market_transition_sends_notification():
"""market_open / market_closed transitions produce kind=status alerts."""
import atm.main as _main
alerts = []
class _A:
def log(self, e): pass
class _N:
def send(self, a): alerts.append(a)
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState(last_window_state="closed")
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
_main._maybe_log_transition(None, lifecycle, mid, _A(), _N())
assert len(alerts) == 1
assert alerts[0].kind == "status"
assert "market" in alerts[0].title.lower() or "piața" in alerts[0].body.lower()
def test_startup_in_window_suppresses_market_open():
"""R2 #20: first evaluation in-window just seeds state; no alert fires."""
import atm.main as _main
alerts = []
events = []
class _A:
def log(self, e): events.append(e)
class _N:
def send(self, a): alerts.append(a)
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState() # last_window_state is None
in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
assert skip is None
_main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())
# Seeded silently
assert lifecycle.last_window_state == "open"
assert alerts == []
assert not any(e.get("event") == "market_open" for e in events)
# Two more ticks, still in-window → no spurious alert
for _ in range(2):
skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
_main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
assert alerts == []
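The seed-then-edge-trigger behavior these transition tests verify can be sketched as a small pure function. This is illustrative only: it assumes any `out_of_window_*` reason means "closed" and everything else means "open", and the real `_maybe_log_transition` also sends a status alert alongside the audit event:

```python
def log_transition(skip_reason, lifecycle: dict, events: list) -> None:
    """Edge-triggered market_open/market_closed; the first evaluation seeds silently."""
    new = "closed" if (skip_reason or "").startswith("out_of_window") else "open"
    prev = lifecycle.get("last_window_state")
    lifecycle["last_window_state"] = new
    if prev is None or prev == new:
        return  # startup seed, or steady state: nothing to log
    events.append("market_open" if new == "open" else "market_closed")

lc: dict = {}
ev: list = []
log_transition(None, lc, ev)                   # startup in-window: seeds only
log_transition("out_of_window_hours", lc, ev)  # open -> closed: exactly one event
log_transition("out_of_window_hours", lc, ev)  # steady closed: nothing
```

Logging only on `prev != new` (with `prev is None` treated as a seed) is what suppresses both the startup alert and duplicate events on repeated ticks.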
def test_operating_hours_weekday_locale_independent():
"""R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
import locale as _locale
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
# Saturday 12:00 NY
sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()
original = _locale.setlocale(_locale.LC_TIME)
try:
for loc in ("C", "de_DE.UTF-8"):
try:
_locale.setlocale(_locale.LC_TIME, loc)
except _locale.Error:
continue # locale not installed → skip gracefully
lifecycle = _main.LifecycleState()
result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
assert result == "out_of_window_weekend", (
f"locale={loc} returned {result!r}"
)
finally:
try:
_locale.setlocale(_locale.LC_TIME, original)
except _locale.Error:
_locale.setlocale(_locale.LC_TIME, "C")
def test_should_skip_user_paused_wins():
import atm.main as _main
cfg = _oh_cfg()
lifecycle = _main.LifecycleState(user_paused=True)
# Mid-Monday (in-window) — should still skip because user_paused
tz = cfg.operating_hours._tz_cache
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"
def test_should_skip_canary_drift_wins_over_window():
import atm.main as _main
cfg = _oh_cfg()
lifecycle = _main.LifecycleState()
tz = cfg.operating_hours._tz_cache
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"
# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------
def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
"""Minimal RunContext for _dispatch_command unit tests."""
import atm.main as _main
class _A:
def __init__(self): self.events = []
def log(self, e): self.events.append(e)
class _N:
def __init__(self): self.alerts = []
def send(self, a): self.alerts.append(a)
class _S:
is_running = False
interval_s = None
def start(self, s): self.is_running = True
def stop(self): self.is_running = False
if canary is None:
canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
if lifecycle is None:
lifecycle = _main.LifecycleState()
if cfg is None:
cfg = MagicMock()
cfg.telegram.auto_poll_interval_s = 180
cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
state = _main._LoopState(start=0.0)
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=canary,
detector=MagicMock(), fsm=MagicMock(),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=lifecycle,
)
return ctx
@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
await _main._dispatch_command(ctx, Command(action="pause"))
assert ctx.lifecycle.user_paused is True
# When combined with _should_skip, we get user_paused
assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused"
# Audit + notif
assert any(e.get("event") == "user_paused" for e in ctx.audit.events)
assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts)
@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
import atm.main as _main
from atm.commands import Command
# Canary whose is_paused is a property, so the effect of resume() is observable
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
await _main._dispatch_command(ctx, Command(action="resume", value=1))
assert ctx.lifecycle.user_paused is False
assert canary.is_paused is False
force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
assert force_events and force_events[0]["force"] is True
@pytest.mark.asyncio
async def test_resume_during_drift_keeps_canary_paused_without_force():
"""R2 #21: plain /resume during drift clears user_paused but NOT canary."""
import atm.main as _main
from atm.commands import Command
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
await _main._dispatch_command(ctx, Command(action="resume")) # no force
assert ctx.lifecycle.user_paused is False
assert canary.is_paused is True # still drift-paused
# Message must mention drift
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status and ("drift" in (status[0].title + status[0].body).lower())
# Now force
ctx.notifier.alerts.clear()
await _main._dispatch_command(ctx, Command(action="resume", value=1))
assert canary.is_paused is False
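The branching these resume tests cover (the R2 decision: plain /resume never lifts a drift pause) condenses to the following. This is a hypothetical helper returning message keys rather than the real Romanian alert text:

```python
from types import SimpleNamespace

def handle_resume(lifecycle, canary, force: bool) -> str:
    """/resume always clears the user pause; only 'resume force' clears drift."""
    lifecycle.user_paused = False
    if canary.is_paused:
        if not force:
            return "drift_requires_force"  # drift may be legit; recalibration may be needed
        canary.resume()
        return "forced_over_drift"         # override confirmed; warn drift may recur
    return "resumed"

class _DemoCanary:
    def __init__(self): self._p = True
    @property
    def is_paused(self): return self._p
    def resume(self): self._p = False

lc = SimpleNamespace(user_paused=True)
c = _DemoCanary()
plain = handle_resume(lc, c, force=False)   # drift stays paused
forced = handle_resume(lc, c, force=True)   # drift cleared
```

Note the asymmetry: `user_paused` is cleared unconditionally on either path, while the canary pause survives anything short of an explicit force.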
@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
"""/resume while operating-hours window is closed → special body."""
import atm.main as _main
from atm.commands import Command
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)
# Pin time to Saturday by swapping atm.main's time module for a stub
real_time = _main.time
fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()
class _FakeTime:
def time(self): return fake_ts
def monotonic(self): return 0.0
_main.time = _FakeTime()
try:
await _main._dispatch_command(ctx, Command(action="resume"))
finally:
_main.time = real_time
assert ctx.lifecycle.user_paused is False
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
combined = (status[0].title + status[0].body).lower()
assert "închis" in combined or "piața" in combined or "ferestr" in combined
@pytest.mark.asyncio
async def test_status_command_reports_pause_reason():
"""/status body must mention pause reason + window state."""
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
ctx.lifecycle.user_paused = True
# Stub detector.rolling for status
ctx.detector.rolling = []
ctx.fsm.state = types.SimpleNamespace(value="IDLE")
await _main._dispatch_command(ctx, Command(action="status"))
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
body = status[0].body
assert "user_paused" in body  # covers both "user_paused" and "pauzat:user_paused" renderings
@pytest.mark.asyncio
async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path):
"""E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert.
This test verifies the full command-driven lifecycle in isolation:
- canary starts drift-paused, _should_skip returns drift_paused
- /resume force clears canary + user_paused
- subsequent detection produces SELL fire through normal FSM path
"""
import atm.main as _main
from atm.commands import Command
# Canary with mutable pause state
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
cfg = MagicMock()
cfg.telegram.auto_poll_interval_s = 180
cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
ctx = _dispatch_ctx(canary=canary, cfg=cfg)
# 1. While drift-paused, _should_skip returns drift_paused
assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused"
# 2. User issues /resume force
await _main._dispatch_command(ctx, Command(action="resume", value=1))
assert canary.is_paused is False
assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None
# 3. Feed a yellow→dark_red→light_red sequence through _handle_tick (FSM path)
from atm.state_machine import StateMachine, State
fsm = StateMachine(lockout_s=60)
class _N:
def __init__(self): self.alerts = []
def send(self, a): self.alerts.append(a)
class _A:
def log(self, _e): pass
notif = _N()
audit = _A()
cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True))
_main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock)
_main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock)
tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock)
# FSM reached fire via normal path
assert tr is not None and tr.trigger == "SELL"
assert fsm.state == State.IDLE

tests/test_validate.py Normal file

@@ -0,0 +1,214 @@
"""Tests for atm.validate — offline calibration validation.
Covers the 3 tests from plan section D':
17. test_validate_calibration_pass
18. test_validate_calibration_fail_reports_top_candidates
19. test_validate_calibration_file_not_found
"""
from __future__ import annotations
import json
from pathlib import Path
import numpy as np
import pytest
from atm.config import (
CanaryRegion,
ColorSpec,
Config,
DiscordCfg,
ROI,
TelegramCfg,
YAxisCalib,
)
from atm.detector import DetectionResult
from atm.vision import ColorMatch
def _make_config() -> Config:
"""Minimal Config with a palette large enough to support top-3 candidates."""
colors = {
"turquoise": ColorSpec(rgb=(0, 200, 200), tolerance=30),
"yellow": ColorSpec(rgb=(255, 255, 0), tolerance=30),
"dark_green": ColorSpec(rgb=(0, 100, 0), tolerance=30),
"dark_red": ColorSpec(rgb=(165, 42, 42), tolerance=30),
"light_green": ColorSpec(rgb=(144, 238, 144), tolerance=30),
"light_red": ColorSpec(rgb=(255, 182, 193), tolerance=30),
"gray": ColorSpec(rgb=(128, 128, 128), tolerance=30),
"background": ColorSpec(rgb=(18, 18, 18), tolerance=15),
}
return Config(
window_title="test",
dot_roi=ROI(x=0, y=0, w=100, h=100),
chart_roi=ROI(x=0, y=0, w=100, h=100),
colors=colors,
y_axis=YAxisCalib(p1_y=0, p1_price=100.0, p2_y=100, p2_price=0.0),
canary=CanaryRegion(
roi=ROI(x=0, y=0, w=10, h=10),
baseline_phash="0" * 64,
),
discord=DiscordCfg(webhook_url="http://localhost/fake"),
telegram=TelegramCfg(bot_token="fake_token", chat_id="123"),
debounce_depth=1,
)
def _write_labels(tmp_path: Path, entries: list[dict]) -> Path:
f = tmp_path / "labels.json"
f.write_text(json.dumps(entries), encoding="utf-8")
return f
def _write_blank_png(tmp_path: Path, name: str) -> Path:
"""Write a trivially-valid 10x10 BGR image so cv2.imread returns non-None."""
import cv2
p = tmp_path / name
arr = np.zeros((10, 10, 3), dtype=np.uint8)
cv2.imwrite(str(p), arr)
return p
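Test 18 below asserts that `dark_red` ranks among the top-3 candidates for the observed pixel; the ranking it relies on reduces to plain Euclidean RGB distance. A minimal sketch using a subset of the palette from `_make_config()` (the real report builder lives in `atm.validate`):

```python
import math

# Subset of the test palette; RGB values copied from _make_config() above.
PALETTE = {
    "dark_red": (165, 42, 42),
    "gray": (128, 128, 128),
    "yellow": (255, 255, 0),
    "turquoise": (0, 200, 200),
}

def top_candidates(rgb, palette=PALETTE, n=3):
    """Rank palette colors by Euclidean distance to the observed RGB triple."""
    ranked = sorted(palette, key=lambda name: math.dist(rgb, palette[name]))
    return [(name, round(math.dist(rgb, palette[name]), 1)) for name in ranked[:n]]

# The 2026-04-17 miss: the observed pixel sits nearer dark_red than gray,
# even though the detector classified it as gray.
candidates = top_candidates((135, 62, 67))
```

This is why a FAIL record can still surface the expected color: the detector's tolerance gate rejected `dark_red`, but the raw distance ranking puts it first.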
# ---------------------------------------------------------------------------
# Test 17: PASS path — mocked Detector.step returns expected color
# ---------------------------------------------------------------------------
def test_validate_calibration_pass(monkeypatch, tmp_path):
from atm import validate as validate_mod
img_path = _write_blank_png(tmp_path, "yellow_sample.png")
labels = _write_labels(
tmp_path,
[{"path": str(img_path), "expected": "yellow", "note": "test"}],
)
def fake_step(self, ts, frame=None):
return DetectionResult(
ts=ts,
window_found=True,
dot_found=True,
rgb=(250, 250, 5),
match=ColorMatch(name="yellow", distance=6.0, confidence=0.94),
accepted=True,
color="yellow",
)
monkeypatch.setattr("atm.detector.Detector.step", fake_step)
report = validate_mod.validate_calibration(labels, _make_config())
assert report.total == 1
assert report.passed == 1
assert report.failed == 0
assert report.all_pass is True
rec = report.records[0]
assert rec.passed is True
assert rec.detected == "yellow"
assert rec.expected == "yellow"
assert "[PASS]" in report.render()
# CLI wiring: exit 0
import atm.main as _main
class _Args:
label_file = labels
monkeypatch.setattr("atm.config.Config.load_current", classmethod(lambda cls, d: _make_config()))
with pytest.raises(SystemExit) as exc_info:
_main._cmd_validate_calibration(_Args())
assert exc_info.value.code == 0
# ---------------------------------------------------------------------------
# Test 18: FAIL path — Detector returns wrong color; report lists top 3
# candidates and a SUGGESTIONS line with RGB distance.
# ---------------------------------------------------------------------------
def test_validate_calibration_fail_reports_top_candidates(monkeypatch, tmp_path):
from atm import validate as validate_mod
img_path = _write_blank_png(tmp_path, "dark_red_sample.png")
labels = _write_labels(
tmp_path,
[{"path": str(img_path), "expected": "dark_red", "note": "missed dark_red"}],
)
# Detector misclassifies as gray even though the observed RGB is nearer
# dark_red (like the real 2026-04-17 miss).
def fake_step(self, ts, frame=None):
return DetectionResult(
ts=ts,
window_found=True,
dot_found=True,
rgb=(135, 62, 67),
match=ColorMatch(name="gray", distance=45.0, confidence=0.12),
accepted=True,
color="gray",
)
monkeypatch.setattr("atm.detector.Detector.step", fake_step)
report = validate_mod.validate_calibration(labels, _make_config())
assert report.total == 1
assert report.failed == 1
assert report.all_pass is False
rec = report.records[0]
assert rec.passed is False
assert rec.detected == "gray"
assert rec.expected == "dark_red"
# Top 3 candidates populated (name, score) sorted by RGB distance.
assert len(rec.top3) == 3
names = [n for n, _ in rec.top3]
# dark_red should appear in top candidates since observed RGB(135,62,67)
# is reasonably close to dark_red(165,42,42).
assert "dark_red" in names
rendered = report.render()
assert "[FAIL]" in rendered
assert "Top 3 candidates:" in rendered
assert "SUGGESTIONS:" in rendered
# The suggestion must mention the expected color's RGB and the measured distance.
assert "dark_red" in rendered
assert "(165, 42, 42)" in rendered
# CLI wiring: exit 1
import atm.main as _main
class _Args:
label_file = labels
monkeypatch.setattr("atm.config.Config.load_current", classmethod(lambda cls, d: _make_config()))
with pytest.raises(SystemExit) as exc_info:
_main._cmd_validate_calibration(_Args())
assert exc_info.value.code == 1
# ---------------------------------------------------------------------------
# Test 19: missing label file — clean error, non-zero exit, no stack trace
# ---------------------------------------------------------------------------
def test_validate_calibration_file_not_found(monkeypatch, tmp_path, capsys):
from atm import validate as validate_mod
missing = tmp_path / "nope.json"
# Library-level: raises ValidationError (not bare FileNotFoundError).
with pytest.raises(validate_mod.ValidationError) as exc_info:
validate_mod.validate_calibration(missing, _make_config())
assert "not found" in str(exc_info.value).lower()
# CLI-level: graceful sys.exit with non-zero code, message on stderr.
import atm.main as _main
class _Args:
label_file = missing
monkeypatch.setattr("atm.config.Config.load_current", classmethod(lambda cls, d: _make_config()))
with pytest.raises(SystemExit) as exc_info:
_main._cmd_validate_calibration(_Args())
assert exc_info.value.code != 0
err = capsys.readouterr().err
assert "not found" in err.lower()
# Ensure no python traceback leaked through.
assert "Traceback" not in err