initial: scaffold atm trading monitor (Faza 1)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Claude Agent
2026-04-15 22:03:36 +00:00
commit 9207197a56
21 changed files with 2139 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,64 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
*.egg
MANIFEST
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
.nox/
.hypothesis/
.mypy_cache/
.ruff_cache/
.pyright/
# Virtualenv
.venv/
venv/
env/
ENV/
# IDE
.vscode/
.idea/
*.swp
*.swo
.DS_Store
# ATM runtime artefacts
logs/*.jsonl
logs/dead_letter.jsonl
samples/*.png
samples/*.jpg
samples/labels.json
trades.jsonl
# configs: keep template + current marker, not generated calibration
configs/*.toml
!configs/example.toml
# Secrets
config.toml
.env
*.secret
# Images in root (e.g. reference screenshots)
/image.png

conftest.py Normal file

@@ -0,0 +1,5 @@
"""Root conftest — add src/ to sys.path so 'import atm' works without install."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "src"))


@@ -0,0 +1,149 @@
# Design: ATM — Automated Trading Monitor (M2D Strategy)
Generated by /office-hours on 2026-04-15
Branch: master
Repo: /workspace/atm (greenfield)
Status: APPROVED
Mode: Builder (personal live-trading tool, high-stakes)
## Problem Statement
User trades the M2D strategy on DIA (TradeStation chart with custom indicator) with execution on TradeLocker US30 CFD (prop firm account). The same strategy also applies to GLD → XAUUSD. Bridging the signal source (TradeStation Windows app) with execution (TradeLocker web) currently requires the user to watch both screens for 4 hours per evening. Goal: the bot detects the trigger signal automatically and notifies the user via Telegram/Discord with a chart screenshot + SL/TP levels so the user can execute the trade in TradeLocker.
## Strategy M2D — Full Spec
**Setup:** TradeStation, 3-minute chart, DIA (or GLD) symbol, custom indicator "M2D MAPS" that renders a horizontal strip of colored dots below the price panel. Dots are indexed by time, y-position is fixed.
### BUY sequence (sequential in time, rightmost N dots):
1. **Turquoise dot** — 15-minute buy trigger
2. **Dark green dot** — 3-minute sell
3. **Light green dot** — 3-minute buy → **TRIGGER**
At trigger:
- Execute BUY on TradeLocker, instrument US30 CFD
- Stop Loss 0.6%
- Volume 0.1 lots maximum
- TP1, TP2, SL are drawn automatically as horizontal lines on the TradeStation chart after entry
- User-managed lifecycle: at TP1 close half, move SL to ~breakeven; at TP2 close the remaining half
### SELL sequence (mirror):
1. **Yellow dot** — 15-minute sell (red 15min candle)
2. **Dark red dot** — 3-minute buy
3. **Light red dot** — 3-minute sell → **TRIGGER**
Same size (0.1 lots), same SL %, same TP management.
### Instrument mapping (intentional asymmetry):
- DIA chart (TradeStation) ↔ US30 CFD (TradeLocker)
- GLD chart (TradeStation) ↔ XAUUSD CFD (TradeLocker)
### Trading window:
- NY open first 2 hours + NY close last 2 hours
- Romania (RO) summer time: 16:30-18:30 and 21:00-23:00
- Typical frequency: 1 trade per evening
## Constraints
- **Prop firm account on TradeLocker.** Faza 2 (auto-execution) requires reading prop TOS first — many prop firms prohibit automation or detect robotic timing patterns.
- No API on TradeLocker. No signal export on TradeStation for compiled custom indicator.
- Bot runs on the same Windows machine as TradeStation. Cross-machine (RDP/VNC) screenshot adds latency and fragility.
## Premises (agreed)
1. Screenshot + visual detection is the only viable bridge.
2. Notification-first (Faza 1) is the right sequencing. Zero-click MVP removes all financial bug risk.
3. M2D MAPS dot strip has stable y-position on fixed TradeStation layout → ROI color sampling is the right detection method.
4. DIA→US30 price divergence is acceptable risk (user's judgment, has been trading this pairing live).
5. Bot runs on the same Windows machine as TradeStation.
## Recommended Approach — B: Structured Service with Dry-Run and Audit Log
Python package on Windows, structured for clean extension to Faza 2.
### Components:
- **Detector core:** `mss` screenshot of TradeStation window (located by title via `pygetwindow`) → crop M2D MAPS ROI → scan rightmost N dot positions → classify each by closest-color match with tolerance → feed into state machine that tracks 3-dot sequences (turquoise→dark-green→light-green = BUY trigger; yellow→dark-red→light-red = SELL trigger).
- **Level extractor:** after trigger, scan chart region for horizontal colored lines (SL/TP1/TP2). Convert pixel y to price via calibration of y-axis scale.
- **Calibration tool (Tkinter):** interactive — user clicks on each dot color sample, captures RGB + tolerance, clicks on ROI corners, captures y-axis price references. Writes to `config.toml`.
- **Dry-run mode:** runs detector against a folder of saved screenshots (recorded during normal operation). Shows what notification WOULD have been sent for each. Used to validate new color thresholds or strategy tweaks without live risk.
- **Notifier abstraction:** interface with Discord webhook and Telegram bot implementations. Sends: annotated screenshot + decoded SL/TP1/TP2 prices + signal type (BUY/SELL) + timestamp.
- **Audit log (JSONL):** every detection cycle — timestamp, detected dots, classification, decision, notification sent y/n. Replayable, debuggable.
- **Scheduler:** Windows Task Scheduler entry, auto-start/stop at 16:30 / 18:30 / 21:00 / 23:00 local time (summer/winter offset aware).
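The notifier abstraction can be sketched as a minimal base class; the `send(signal, screenshot, levels)` signature is the one the original plan names for the interface, and `MemoryNotifier` is a hypothetical test double, not part of the design:

```python
from abc import ABC, abstractmethod

class Notifier(ABC):
    """Abstract notification backend; Discord and Telegram subclass this."""

    @abstractmethod
    def send(self, signal: str, screenshot: bytes, levels: dict) -> None:
        """Deliver one alert: BUY/SELL signal, annotated screenshot, SL/TP prices."""

class MemoryNotifier(Notifier):
    """Hypothetical in-memory backend, useful as a dry-run/test double."""

    def __init__(self):
        self.sent = []

    def send(self, signal, screenshot, levels):
        # Record instead of delivering; dry-run mode can inspect self.sent.
        self.sent.append((signal, levels))
```

Dry-run mode can then swap `MemoryNotifier` in for the real backends without touching the detector.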
### Structure:
```
atm/
├── pyproject.toml
├── config.toml # populated by calibration tool
├── src/atm/
│ ├── detector.py # screenshot + color classification + state machine
│ ├── levels.py # SL/TP1/TP2 pixel-to-price extraction
│ ├── notifier/
│ │ ├── __init__.py # abstract Notifier
│ │ ├── discord.py
│ │ └── telegram.py
│ ├── audit.py # JSONL logger
│ ├── calibrate.py # Tkinter UI
│ ├── dryrun.py # replay on saved screenshots
│ └── main.py # orchestration + scheduler hooks
├── samples/ # saved screenshots for dry-run corpus
└── logs/ # JSONL audit
```
### Detection algorithm (core loop):
1. Every 1 second during trading window:
- Locate TradeStation window
- If not foreground or minimized, log + skip
- Screenshot M2D MAPS ROI (fixed offsets from window bounds)
- For rightmost N=5 dot positions, sample center pixel, classify to nearest labeled color within tolerance
- Update rolling window of last 10 dots with their timestamps
- Evaluate state machine: did the last 3 classified dots (within a bounded time window) complete a BUY or SELL sequence?
- If trigger fired AND not already fired for this bar: extract SL/TP1/TP2 levels, send notification, log, mark fired.
### Anti-duplicate logic:
- Each trigger dot is keyed by (x-pixel position at capture, color). Once fired, stored in "recently fired" set with 10-minute TTL. Prevents re-fire if same dot persists across cycles.
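A minimal sketch of that "recently fired" TTL set; the injectable `clock` parameter is an addition for testability, not part of the design:

```python
import time

class FiredSet:
    """Keys (x-pixel, color) that recently fired; entries expire after a TTL."""

    def __init__(self, ttl_s: float = 600.0, clock=time.monotonic):
        self.ttl = ttl_s
        self.clock = clock
        self._fired = {}  # key -> timestamp of the fire

    def try_fire(self, key) -> bool:
        """True (and remember the key) on first sight; False within the TTL."""
        now = self.clock()
        # Purge expired entries so the set cannot grow unbounded.
        self._fired = {k: t for k, t in self._fired.items() if now - t < self.ttl}
        if key in self._fired:
            return False
        self._fired[key] = now
        return True
```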
### Sanity guards:
- If classification confidence (color distance) low for 3+ cycles in a row → push "bot lost sight" alert to user. Layout may have changed.
- If TradeStation window not found for 60 seconds → push "bot cannot find chart" alert.
## Open Questions (non-blocking)
- Exact color tolerance values — determined during calibration session, not a design question.
- GLD/XAUUSD: same M2D indicator on GLD chart? Assume yes, confirm during calibration.
- Multi-symbol monitoring — single window switched manually, or two TradeStation windows side by side? Defer; v1 = single chart at a time, user switches manually.
## Success Criteria (Faza 1)
- Over 20 live trading sessions, bot detects ≥95% of signals user also spotted manually.
- Zero false-positive notifications during the bot's first 5 sessions (tune tolerances aggressively).
- Notification delivered within 3 seconds of trigger dot appearing.
- Audit log lets user reproduce "why was no notification sent" for any missed signal.
## Distribution Plan
Personal tool, single user. No distribution channel needed — runs locally on the user's Windows box. Git repo at `/workspace/atm`. `pyproject.toml` + `pip install -e .` for local dev. No CI/CD; the user's own Windows scheduled task starts/stops it.
## Risk Flag — Faza 2 (deferred)
Before extending to auto-execution in TradeLocker:
1. Read prop firm TOS (search for "EA", "automation", "bot", "copy trading", "external signal"). If prohibited, **Faza 2 is off the table** — tool stays notification-only.
2. If permitted, implement via Playwright browser automation against TradeLocker web UI.
3. Add human-like click timing randomization (100-400ms jitter) to avoid robotic detection.
4. Dry-run mode then becomes: "click coordinates resolved, action NOT sent" — user reviews the intended click before enabling live.
## Next Steps (concrete)
1. Init `/workspace/atm` as Python project. `pyproject.toml`, basic structure.
2. Build calibration tool first. Without calibrated config, nothing works.
3. Record 20-30 sample screenshots across several trading sessions (can start this today — doesn't need any code yet; just `mss` screenshot on a 5-second timer dumping to disk).
4. Build detector + state machine. Validate against recorded screenshots in dry-run mode.
5. Wire Discord webhook first (simpler than Telegram bot). Test end-to-end on live session.
6. Add audit log.
7. Schedule Windows task for trading hours.
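Step 3's standalone dumper can be sketched with the capture function injected, so the loop itself runs without `mss`; in live use `grab` would wrap an `mss` screenshot call and return PNG bytes:

```python
import time
from pathlib import Path

def dump_screenshots(grab, out_dir, interval_s=5.0, max_shots=None, sleep=time.sleep):
    """Call grab() every interval_s seconds and write the PNG bytes it returns
    into out_dir under a timestamped name. grab and sleep are injected so the
    loop is testable; live, grab wraps an mss capture of the chart window."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    n = 0
    while max_shots is None or n < max_shots:
        name = time.strftime("%Y%m%d-%H%M%S") + f"-{n:04d}.png"
        (out / name).write_bytes(grab())
        n += 1
        if max_shots is None or n < max_shots:
            sleep(interval_s)
    return n
```

`max_shots` is a convenience for testing; a live session would run it open-ended under the scheduler.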
## What I noticed about how you think
- You explicitly asked for dry-run before writing a line of code. "To check whether it would click correctly, without actually clicking." That's not a common instinct for someone building their own tool; it's the instinct of someone who has already had something break expensively.
- You phased the project yourself — "phase 2 after I'm convinced it works." That's the right ordering and you arrived at it unprompted.
- When I challenged the API premise, you answered with specifics: the indicator is custom, the account doesn't support API. You knew the constraint, not guessed it.
- You flagged the prop account almost casually at the end. A lot of builders would have skipped that detail. It turned out to be the most important constraint in the entire design.


@@ -0,0 +1,43 @@
# Plan: ATM Eng Review — Findings Applied
## Context
User ran `/plan-eng-review` on `partitioned-honking-unicorn.md` (ATM trading monitor, Faza 1). Eng review complete. All 4 decisions resolved, obvious fixes applied, plan file updated in place.
## Where the changes live
The reviewed plan (with all eng-review edits) is at:
**`/home/claude/.claude/plans/partitioned-honking-unicorn.md`**
Test plan artifact at:
**`~/.gstack/projects/romfast-workspace/claude-master-eng-review-test-plan-20260415-212932.md`**
## What changed in the reviewed plan
### 4 decisions (AskUserQuestion)
1. **Bar flicker** → debounce depth=1 (configurable); screenshot in alert = visual check.
2. **Phase A entry price** → dropped; Phase A is direction + screenshot only; user puts manual 0.6% SL in TradeLocker; Phase B sends real levels from chart.
3. **Notifier blocking** → fire-and-forget worker threads per backend, bounded queue, retry + dead-letter.
4. **Alert SPoF** → Discord + Telegram parallel from day 1.
### Obvious fixes (stated, applied)
- Exhaustive state transition table (default-noise rule, SELL mirror explicit, phase-skip handling).
- Python 3.11+ pin → drop `tomli`, use stdlib `tomllib`.
- Windows symlink replaced by `configs/current.txt` marker file.
- New `vision.py` shared module (ROI/hash/interp/Hough).
- `@dataclass Config` with load-time validation.
- DPI check added to calibrate + README note.
### Test coverage
Expanded from state-machine-only to: every module + 1 E2E replay harness. Acceptance gate unchanged (precision=100%, recall≥95% on labeled corpus).
## Verification (post-implementation)
Run the full verification checklist from `partitioned-honking-unicorn.md` (sections 1-9). Specifically:
- `pytest tests/` — all new unit tests + E2E replay pass.
- `atm dryrun ./samples` hits acceptance gate.
- Live 2-session test: both Discord and Telegram fire; kill one mid-session and confirm the other still delivers + dead-letter file gets the failed alert.
## Status
**CEO + ENG CLEARED.** No further reviews required before implementation. Design + DX reviews properly skipped (no UI scope; personal single-user tool). Run `/ship` after implementation.

docs/image.png Normal file (binary, 160 KiB; not shown)


@@ -0,0 +1,258 @@
# Plan: ATM — Automated Trading Monitor (M2D, Faza 1) — ENG-REVIEWED
**Source plan:** `/home/claude/.claude/plans/swirling-drifting-starfish.md`
**CEO plan artifact:** `~/.gstack/projects/romfast-workspace/ceo-plans/2026-04-15-atm-trading.md`
**Eng review mode:** FULL_REVIEW (4 decisions made, 0 unresolved)
**Design doc:** `~/.gstack/projects/romfast-workspace/claude-master-design-20260415-atm-trading.md` (APPROVED)
**Eng test plan:** `~/.gstack/projects/romfast-workspace/claude-master-eng-review-test-plan-20260415-212932.md`
---
## Context
User trades M2D strategy manually on DIA (TradeStation) with execution on TradeLocker US30 CFD (prop firm). Same strategy on GLD → XAUUSD. 4h/evening dual-screen monitoring. Faza 1 goal: bot auto-detects M2D trigger, sends Discord/Telegram notification with screenshot + SL/TP1/TP2 levels; user executes manually in TradeLocker. Faza 2 (auto-execution) deferred until prop firm TOS verified and Faza 1 proven over 20+ sessions.
**Review changed two things from the original plan:**
1. **State machine spec corrected.** Original "last 3 consecutive non-gray dots" is wrong. Actual M2D is phased: Phase 1 arming (turquoise → gray/dark-green) → Phase 2 trigger (light-green).
2. **Levels extraction corrected.** Original plan had levels.py extracting SL/TP at trigger. But those lines only appear on TradeStation chart *after* user enters trade in TradeLocker. Corrected to two-phase: spec-math at trigger, chart-scan after entry.
Plus 5 accepted expansions (labeled corpus, level fallback, layout canary, trade journal, TOS checklist).
---
## Approach: B (Structured Python service, dry-run, audit log) + CEO-reviewed additions
Runs on Windows machine alongside TradeStation. `mss` screenshots → ROI color-sample on M2D MAPS strip → phased state machine → Discord webhook + Telegram bot → JSONL audit + trade journal → dry-run replay against labeled corpus.
---
## State Machine Spec (corrected + exhaustive)
States:
- `IDLE`
- `ARMED_BUY` — turquoise seen
- `PRIMED_BUY` — turquoise + at least one dark-green seen
- `ARMED_SELL` — yellow seen
- `PRIMED_SELL` — yellow + at least one dark-red seen
**Default rule:** any (state, event) pair not listed below → stay in current state, no action, log as `noise`.
Transitions — BUY side:
| From | Event | To | Action |
|------|-------|-----|--------|
| IDLE | turquoise | ARMED_BUY | log arm_ts |
| IDLE | yellow | ARMED_SELL | log arm_ts (sell) |
| IDLE | dark-green / dark-red / light-green / light-red / gray | IDLE | noise (log phase-skip if light-green/light-red) |
| ARMED_BUY | gray | ARMED_BUY | persist |
| ARMED_BUY | turquoise | ARMED_BUY | refresh arm_ts |
| ARMED_BUY | dark-green | PRIMED_BUY | log prime_ts |
| ARMED_BUY | yellow | ARMED_SELL | opposite rearm |
| ARMED_BUY | dark-red | ARMED_BUY | ignore (minority noise) |
| ARMED_BUY | light-green | IDLE | **skip detected** — no FIRE, log phase_skip |
| ARMED_BUY | light-red | IDLE | skip detected, log |
| PRIMED_BUY | dark-green | PRIMED_BUY | accumulate |
| PRIMED_BUY | dark-red | PRIMED_BUY | ignore (minority noise) |
| PRIMED_BUY | **light-green** | IDLE | **FIRE BUY**, lockout(BUY)=4min |
| PRIMED_BUY | light-red | IDLE | skip detected (wrong trigger) |
| PRIMED_BUY | gray | IDLE | **COOLED** — signal dead, log |
| PRIMED_BUY | turquoise | ARMED_BUY | rearm fresh |
| PRIMED_BUY | yellow | ARMED_SELL | opposite rearm |
SELL side mirrors exactly: swap turquoise↔yellow, dark-green↔dark-red, light-green↔light-red, BUY↔SELL.
Notes:
- No time-based TTL on ARMED/PRIMED. State persists until trigger fires, cooled by gray after PRIMED, opposite-color rearm, or process restart (Windows Task Scheduler stops bot at session end → natural session-boundary reset).
- Cooling rule: "gray after dark-green" = the signal has gone cold (user's term: "răcit"). Gray during ARMED_BUY (before any dark-green) is OK.
- After FIRE: 4-minute lockout per-direction. BUY lockout doesn't block SELL and vice versa. Single timestamp per direction.
- Opposite-color-Phase-1 triggers rearm to opposite side (captures direction flip).
- Phase-skip (arming color → trigger color with no phase-2 step) → IDLE, no FIRE, logged. Would be legitimate only if indicator collapses phases, which it doesn't per observed behavior.
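The table and notes above can be sketched directly as an explicit machine; the action strings (`fire_buy`, `cooled`, `phase_skip`, ...) are illustrative names, and the 4-minute lockout is assumed to live in the caller, outside the machine:

```python
class M2DStateMachine:
    """Phased M2D machine per the transition table. Any (state, color) pair
    not matched below falls through to 'noise': stay put, no action."""

    def __init__(self):
        self.state = "IDLE"

    def step(self, color):
        """Feed one accepted (post-debounce) dot color; return the action."""
        s = self.state
        if s == "IDLE":
            if color == "turquoise":
                self.state = "ARMED_BUY"; return "armed_buy"
            if color == "yellow":
                self.state = "ARMED_SELL"; return "armed_sell"
            if color in ("light-green", "light-red"):
                return "phase_skip"  # trigger color with no arming: log, no FIRE
            return "noise"
        if s == "ARMED_BUY":
            if color == "dark-green":
                self.state = "PRIMED_BUY"; return "primed"
            if color == "turquoise":
                return "refresh"  # refresh arm_ts, stay armed
            if color == "yellow":
                self.state = "ARMED_SELL"; return "rearm_sell"
            if color in ("light-green", "light-red"):
                self.state = "IDLE"; return "phase_skip"
            return "noise"  # gray persists; dark-red is minority noise
        if s == "PRIMED_BUY":
            if color == "light-green":
                self.state = "IDLE"; return "fire_buy"  # caller applies lockout
            if color == "gray":
                self.state = "IDLE"; return "cooled"  # signal went cold
            if color == "light-red":
                self.state = "IDLE"; return "phase_skip"  # wrong trigger
            if color == "turquoise":
                self.state = "ARMED_BUY"; return "rearm_buy"
            if color == "yellow":
                self.state = "ARMED_SELL"; return "rearm_sell"
            return "noise"  # dark-green accumulates; dark-red ignored
        if s == "ARMED_SELL":  # SELL side: exact mirror with colors swapped
            if color == "dark-red":
                self.state = "PRIMED_SELL"; return "primed"
            if color == "yellow":
                return "refresh"
            if color == "turquoise":
                self.state = "ARMED_BUY"; return "rearm_buy"
            if color in ("light-red", "light-green"):
                self.state = "IDLE"; return "phase_skip"
            return "noise"
        if s == "PRIMED_SELL":
            if color == "light-red":
                self.state = "IDLE"; return "fire_sell"
            if color == "gray":
                self.state = "IDLE"; return "cooled"
            if color == "light-green":
                self.state = "IDLE"; return "phase_skip"
            if color == "yellow":
                self.state = "ARMED_SELL"; return "rearm_sell"
            if color == "turquoise":
                self.state = "ARMED_BUY"; return "rearm_buy"
            return "noise"
```

The parameterized all-state×color test in the build order would walk every pair through `step()` and compare against this table.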
---
## Detection Details
- **Loop interval:** 5 seconds (36 cycles per 3-min bar; stays well inside notification-latency target).
- **Rightmost-dot detection:** scan ROI from right edge leftward, find first non-background pixel cluster → that's the rightmost dot. Don't hardcode x-pixel positions (chart scrolls; hardcoded positions drift).
- **Debounce:** configurable `debounce_depth` in config.toml (default `1` — single-read acceptance). Increase if future sessions show mid-bar color flicker. Screenshot-in-notification is the user's visual verification on top.
- **Rolling window:** keep last 20 classified dots with their detection timestamps. State machine consumes the newest *accepted* (post-debounce) dot per cycle.
- **Classification:** nearest-color match in RGB Euclidean distance, per-color tolerance from calibration. Report confidence = `1 - distance_nearest / distance_second_nearest`. Log confidence every cycle. If all distances > tolerance → `UNKNOWN`, state unchanged.
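A sketch of the classifier as specified (nearest RGB match, per-color tolerance, margin-based confidence); palette and tolerance values in the usage example are placeholders, the real ones come from calibration:

```python
import math

def classify(pixel, palette, tolerances):
    """Classify an (r, g, b) pixel against palette {name: rgb}.
    Returns (label, confidence); label is 'UNKNOWN' when the nearest color
    is outside its tolerance. confidence = 1 - d_nearest / d_second_nearest."""
    scored = sorted((math.dist(pixel, rgb), name) for name, rgb in palette.items())
    d1, best = scored[0]
    d2 = scored[1][0] if len(scored) > 1 else float("inf")
    # d2 == 0 means two palette entries sit exactly on the pixel: no margin.
    confidence = 1.0 - d1 / d2 if d2 > 0 else 0.0
    if d1 > tolerances[best]:
        return "UNKNOWN", confidence
    return best, confidence
```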
---
## Levels Extraction (two-phase, simplified)
**Phase A — at trigger (immediate alert to Discord + Telegram):**
- No entry-price compute. No spec-math SL/TP. User places a manual 0.6% SL in TradeLocker at entry; actual TP1/TP2/SL come in Phase B from the chart.
- Notification: `🟢 BUY signal DIA→US30 | 22:47:03` + annotated screenshot (detected dot highlighted).
**Phase B — after user trades (chart-scan confirmation):**
- After Phase A fires, detector keeps watching the chart ROI for horizontal colored lines (red=SL, green=TP1/TP2).
- When lines appear (user has entered trade in TradeLocker and TradeStation drew them) → scan y-pixels via Hough + color mask, convert via y-axis calibration → send second alert to both channels: `✅ Levels: SL=484.35 | TP1=485.20 | TP2=485.88`.
- If chart-line scan times out (no lines in 10 min) → silent (user didn't trade).
- If only 2 lines detected (user didn't set TP2 or line not rendered yet) → partial-result alert.
- Phase B overlap with next signal: guarded by per-direction lockout + Phase-B completion flag; a new FIRE cannot issue until prior Phase B closes (timeout or success).
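The pixel-to-price conversion behind the Phase B prices reduces to a 2-point linear interpolation over the calibrated y-axis references; a sketch, assuming calibration stores two `(y_pixel, price)` pairs:

```python
def pixel_to_price(y, calib):
    """Map a screen y-pixel to a price via 2-point linear interpolation.
    calib: ((y1, price1), (y2, price2)) captured by the calibrate tool.
    Screen y grows downward, so the slope is negative on a normal chart."""
    (y1, p1), (y2, p2) = calib
    if y1 == y2:
        raise ValueError("degenerate calibration: identical y references")
    return p1 + (y - y1) * (p2 - p1) / (y2 - y1)
```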
---
## Dedup / Lockout
- Time-based lockout: after any FIRE, block re-fire for 4 minutes (one 3-min bar + 1 min safety).
- Tracked per-direction: BUY lockout doesn't block SELL.
- Stored as single timestamp per direction (not pixel-keyed).
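A sketch of the lockout exactly as specified: one timestamp per direction, fixed window, directions independent (the injectable clock is a test seam, not part of the plan):

```python
import time

class DirectionLockout:
    """Per-direction re-fire guard; BUY never blocks SELL and vice versa."""

    def __init__(self, window_s: float = 240.0, clock=time.monotonic):
        self.window = window_s
        self.clock = clock
        self._last = {}  # direction -> last fire timestamp

    def try_fire(self, direction: str) -> bool:
        """True (and record the fire) unless this direction fired inside the window."""
        now = self.clock()
        last = self._last.get(direction)
        if last is not None and now - last < self.window:
            return False
        self._last[direction] = now
        return True
```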
---
## Observability
- **Heartbeat:** every 30 min to a separate Discord thread (not main alerts channel): `🟢 22:00 alive | 0 triggers | confidence avg 0.85 | chart OK`. Silence >35 min = watchdog concern (user notices).
- **Layout canary:** every 60 cycles (5 min), hash a stable reference region (axis labels, chart border). Stored baseline in config. On significant divergence (>threshold) → `⚠️ Layout changed — auto-paused, recalibrate` to alerts channel. Bot pauses detection until operator acknowledges (touch a pause-file or restart).
- **Low-confidence alert:** 3+ consecutive cycles with confidence below threshold → `⚠️ Bot lost sight` (already in original plan).
- **Window-lost alert:** TradeStation window not found for 60s → `⚠️ Cannot find chart`.
- **Audit JSONL:** per-cycle, daily rotation (`logs/YYYY-MM-DD.jsonl`), fields: `{ts, window_found, roi_ok, rightmost_dot_color, confidence, state, transition, trigger, notified, reason}`.
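The canary fingerprint can be as simple as an average-hash over the reference region. This pure-Python sketch stands in for the `vision.py` perceptual hash (a real build would hash the grayscale ROI from the screenshot; the region is assumed at least `grid` pixels per side), with Hamming distance as the drift score:

```python
def ahash(gray, grid=8):
    """Average-hash: downsample a grayscale region (list of equal-length rows)
    into grid x grid block means, threshold at the global mean, pack into an int."""
    h, w = len(gray), len(gray[0])
    means = []
    for gy in range(grid):
        for gx in range(grid):
            block = [gray[y][x]
                     for y in range(gy * h // grid, (gy + 1) * h // grid)
                     for x in range(gx * w // grid, (gx + 1) * w // grid)]
            means.append(sum(block) / len(block))
    avg = sum(means) / len(means)
    bits = 0
    for m in means:
        bits = (bits << 1) | (m > avg)
    return bits

def drift(h1, h2):
    """Hamming distance between two hashes: the layout-drift score that gets
    compared against the configured auto-pause threshold."""
    return bin(h1 ^ h2).count("1")
```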
---
## Files to Create
- `/workspace/atm/pyproject.toml` — Python 3.11+ required. Deps: `mss`, `opencv-python`, `numpy`, `requests`, `pygetwindow`, `pywin32` (DPI + window capture), `rich` (CLI), `pillow` (screenshot annotation). **No `tomli` — use stdlib `tomllib`.**
- `/workspace/atm/config.toml` — populated by calibration tool (ROI coords, per-color RGB + tolerance, `debounce_depth`, y-axis scale, canary-region baseline hash, Discord webhook URL, Telegram bot token + chat_id)
- `/workspace/atm/src/atm/config.py` — **[ENG-REVIEW]** `@dataclass Config` with `Config.load(path)` that validates on load (RGB tuples, positive tolerances, both notifier credentials present, y-axis 2-point pair). Fail fast at startup.
- `/workspace/atm/src/atm/vision.py` — **[ENG-REVIEW]** shared primitives: ROI crop, perceptual hash, pixel-to-price linear interp, Hough line detection with color mask. Used by detector/canary/levels to avoid drift.
- `/workspace/atm/src/atm/detector.py` — screenshot loop, rightmost-dot scan, color classification, rolling window, debounce
- `/workspace/atm/src/atm/state_machine.py` — explicit phased state machine (spec above), exhaustive transition table
- `/workspace/atm/src/atm/levels.py` — Phase B chart-scan only (Phase A entry-price compute removed after ENG-REVIEW)
- `/workspace/atm/src/atm/canary.py` — layout fingerprint hash + drift check + auto-pause
- `/workspace/atm/src/atm/notifier/__init__.py` — abstract `Notifier` protocol: `send_alert()`, `send_heartbeat()`, `send_levels_confirm()`
- `/workspace/atm/src/atm/notifier/fanout.py` — **[ENG-REVIEW]** `FanoutNotifier` wraps N backends, each with its own worker thread + bounded queue (size 50, drop-oldest on overflow) + retry with exponential backoff + dead-letter file on total failure. Main loop never blocks.
- `/workspace/atm/src/atm/notifier/discord.py` — webhook POST, annotated screenshot upload (multipart)
- `/workspace/atm/src/atm/notifier/telegram.py` — **[ENG-REVIEW]** built in parallel with Discord (no longer deferred); bot API, photo upload
- `/workspace/atm/src/atm/audit.py` — JSONL logger with daily local-midnight rotation, line-buffered write for crash safety
- `/workspace/atm/src/atm/calibrate.py` — Tkinter: window pick → DPI check → ROI corners → per-color sample → y-axis scale → canary region → save versioned config
- `/workspace/atm/src/atm/labeler.py` — **[EXPANSION]** Tkinter label UI → `labels.json`
- `/workspace/atm/src/atm/dryrun.py` — replay with precision/recall/confusion matrix when labels present
- `/workspace/atm/src/atm/journal.py` — **[EXPANSION]** `atm journal` CLI → `trades.jsonl`
- `/workspace/atm/src/atm/report.py` — **[EXPANSION]** weekly aggregation
- `/workspace/atm/src/atm/main.py` — CLI: `atm calibrate`, `atm label <dir>`, `atm dryrun <dir>`, `atm run [--duration Xh]`, `atm journal`, `atm report [--week YYYY-WW]`
- `/workspace/atm/tests/` — **[ENG-REVIEW]** unit + E2E per test plan at `~/.gstack/projects/romfast-workspace/claude-master-eng-review-test-plan-20260415-212932.md`
- `/workspace/atm/samples/`, `/workspace/atm/logs/`
- `/workspace/atm/configs/` — versioned config archive. **[ENG-REVIEW]** No symlink (Windows admin-required); use `configs/current.txt` marker file storing the active filename. `Config.load()` reads the marker.
- `/workspace/atm/docs/phase2-prop-firm-audit.md` — structured TOS checklist
- `/workspace/atm/README.md` — setup, calibration workflow, per-session operating checklist, DPI/multi-monitor notes
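The fanout design (worker thread per backend, bounded queue with drop-oldest, dead-letter on failure) can be sketched with stdlib `queue` + `threading`; retry with exponential backoff is elided here and marked in a comment:

```python
import queue
import threading

class _Worker:
    """One backend: own thread + bounded queue, drop-oldest on overflow."""

    def __init__(self, send, maxsize=50):
        self.send = send
        self.q = queue.Queue(maxsize=maxsize)
        self.dead_letters = []  # real impl: append to a dead-letter file
        self._t = threading.Thread(target=self._run, daemon=True)
        self._t.start()

    def submit(self, msg):
        while True:
            try:
                self.q.put_nowait(msg)
                return
            except queue.Full:
                try:
                    self.q.get_nowait()  # drop-oldest so the newest alert survives
                except queue.Empty:
                    pass

    def _run(self):
        while True:
            msg = self.q.get()
            if msg is None:  # shutdown sentinel
                return
            try:
                # Real impl: retry with exponential backoff before dead-lettering.
                self.send(msg)
            except Exception:
                self.dead_letters.append(msg)

    def close(self):
        self.q.put(None)
        self._t.join()

class FanoutNotifier:
    """Fans out to N backends; the main loop never blocks on a slow/down one."""

    def __init__(self, senders):
        self.workers = [_Worker(s) for s in senders]

    def send_alert(self, msg):
        for w in self.workers:
            w.submit(msg)

    def close(self):
        for w in self.workers:
            w.close()
```

One backend raising (e.g. Telegram token revoked) lands its messages in that worker's dead letters while the other backend keeps delivering, which is exactly the kill-one-notifier live test.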
---
## Build Order
1. **`pyproject.toml` + package scaffold** — Python 3.11+, `pip install -e .`, `atm --help` works.
2. **Standalone screenshot-dump script** — `mss` timer dumps to `samples/` every 5s during trading sessions. Build corpus in parallel.
3. **`config.py` + `vision.py`** — Config dataclass with validation; shared vision primitives. Ship with unit tests for config load + pixel-to-price interp.
4. **`calibrate.py`** — versioned config in `configs/YYYY-MM-DD-HHMM.toml`; `configs/current.txt` marker file points at active. DPI check + canary region capture.
5. **`labeler.py`** — once ~30 samples exist, tag them. `labels.json` is ground truth.
6. **`state_machine.py`** + **unit tests** (clean BUY, clean SELL, cooling, opposite-rearm, lockout per-direction, noise, phase-skip, all state×color pairs via parameterized test).
7. **`detector.py`** + **unit tests** (empty/background ROI, rightmost-cluster, rolling window FIFO, debounce depth=1, classification edges including UNKNOWN).
8. **`canary.py`** + **unit tests** (drift threshold, pause-file gating).
9. **`levels.py`** (Phase B only) + **unit tests** (Hough line detection with color mask, 2 vs 3 lines, 10-min timeout, pixel-to-price roundtrip).
10. **`notifier/fanout.py` + `discord.py` + `telegram.py`** + **unit tests** (queue overflow drop-oldest, 429 backoff, dead-letter on total failure, fanout: one backend down still delivers). Both channels built in parallel — fire together from day 1.
11. **`audit.py`** + **unit tests** (daily rotation at local midnight, line-buffered flush crash safety).
12. **`dryrun.py`** — replay on `samples/` against `labels.json`. **Acceptance gate before live: precision = 100%, recall ≥ 95%.**
13. **E2E replay test** — feed `samples/` through detector → state_machine → notifier-mock → in-memory audit; assert labels match FIREs.
14. **`journal.py`**, **`report.py`**, **`main.py`** (unified CLI).
15. **Windows Task Scheduler setup** — 16:30→18:30, 21:00→23:00. `atm run --duration 2h`. Manual DST check twice yearly.
16. **`docs/phase2-prop-firm-audit.md`** — TOS checklist template.
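Step 11's audit contract (daily files, line-buffered crash safety) is small enough to sketch; the injectable `today` is a test seam, not part of the plan:

```python
import json
from datetime import date
from pathlib import Path

class AuditLog:
    """JSONL audit with one file per local day (logs/YYYY-MM-DD.jsonl).
    Line-buffered writes mean a crash loses at most the in-flight line."""

    def __init__(self, log_dir, today=date.today):
        self.dir = Path(log_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.today = today
        self._day = None
        self._fh = None

    def write(self, record: dict):
        day = self.today().isoformat()
        if day != self._day:  # first write of a new local day: rotate
            if self._fh:
                self._fh.close()
            self._fh = open(self.dir / f"{day}.jsonl", "a", buffering=1)
            self._day = day
        self._fh.write(json.dumps(record) + "\n")
```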
---
## Existing Utilities to Reuse
Greenfield Python project. No internal utilities. External libs: `mss` (screenshot), `pygetwindow` (window locate), `opencv-python` (line detection in Phase B), `numpy` (color math), `requests` (Discord webhook), stdlib `tomllib` (config parsing, Python 3.11+), `pillow` (annotated screenshots).
---
## Verification
End-to-end, in build order:
1. **State machine unit tests:** `pytest tests/test_state_machine.py` — all scenarios (clean BUY, clean SELL, cooling, rearm, lockout, noise) pass.
2. **Calibration:** `atm calibrate` → step through → `config.toml` populated with plausible RGBs for described colors + y-axis scale sane + canary region picked.
3. **Labeled corpus:** ≥30 screenshots in `samples/`, `atm label ./samples` tags each.
4. **Dry-run with metrics:** `atm dryrun ./samples` → precision + recall + confusion matrix printed. **Acceptance gate:** precision = 100%, recall ≥ 95%. If not met → tune tolerances, re-run.
5. **Live test notification-only (2 sessions):** `atm run`. Verify:
- Discord + Telegram notifications within 5s of trigger, both channels receive.
- Phase A message: direction + timestamp + annotated screenshot.
- Phase B levels-alert fires once TradeStation draws SL/TP lines; correct SL/TP1/TP2 prices.
- Heartbeat messages every 30 min in thread.
- Audit JSONL complete, state transitions visible.
- Kill one notifier (e.g. wrong token) → other still delivers, dead-letter file for failed one.
6. **Canary test:** manually move TradeStation window during session → layout-changed alert within 5 min. Move back → restart bot → resumes.
7. **Scheduler test:** Windows Task Scheduler starts bot at 16:30, stops at 18:30 cleanly, log rotates at midnight.
8. **Journal test:** after real trade, `atm journal` → prompt flow complete → `trades.jsonl` entry present.
9. **Report test:** after 1 week of live use, `atm report --week 2026-16` → precision per color, slippage distribution, P&L summary.
---
## Risk Register
- **Prop firm TOS (Faza 2 blocker):** read TOS using `docs/phase2-prop-firm-audit.md` checklist before any auto-execution work. If EA/automation prohibited → Faza 2 dead, stay on Faza 1 permanently.
- **TradeStation layout change:** canary catches it within 5 min → auto-pause. Recalibrate. Losing a session to a layout change is acceptable cost.
- **Calibration drift over time:** versioned configs in `configs/` let you roll back to last-known-good if new calibration misfires.
- **DIA↔US30 price divergence:** accepted (user's judgment). Phase 1 journal captures slippage per signal, feeding Faza 2 go/no-go.
- **Screen sharing / RDP during trading:** overlay can break classification. Low prob, documented in README as operator hygiene.
- **Windows Task Scheduler DST transitions:** twice per year, schedule may misfire. Manual check first week of each DST change.
---
## Out of Scope (Faza 1)
- Any automated click in TradeLocker (Faza 2 work)
- Multi-symbol concurrent monitoring (single chart at a time; user switches manually between DIA and GLD)
- Backtesting on historical data (strategy already manually validated)
- Web UI / dashboard (headless + Discord/Telegram only)
- Ack feedback loop (react-on-notification labeling) — deferred to TODOS.md as `P2-ack-loop`: shipping baseline first, adding feedback once detection quality verified
---
## Accepted Expansions (CEO review, SELECTIVE mode)
1. **Labeled sample corpus + dry-run metrics** — `labeler.py`, `labels.json`, automated precision/recall in dryrun. Makes acceptance criteria ("false-positives = 0, false-negatives ≤ 5%") machine-checkable.
2. **Level-extractor fallback (spec-math)** — Phase A always uses spec-math; Phase B validates against chart. Redundancy on fragile piece. (Superseded by eng-review decision 2: Phase A spec-math dropped.)
3. **Layout canary + auto-pause** — `canary.py` hashes stable UI region, auto-pauses on drift. Catches silent classification-with-wrong-positions failure mode.
4. **Trade journal CLI** — `atm journal` + `trades.jsonl` + weekly report. Data for Faza 2 go/no-go decision.
5. **Prop-firm TOS audit checklist** — `docs/phase2-prop-firm-audit.md`. Structured Faza 2 evaluation framework shipped now.
## Deferred to TODOS.md
- **Ack feedback loop** — Discord reaction emojis feeding precision tuning. High value, operationally heavier (bot vs webhook). Add after Faza 1 baseline stable.
---
## GSTACK REVIEW REPORT
| Review | Trigger | Why | Runs | Status | Findings |
|--------|---------|-----|------|--------|----------|
| CEO Review | `/plan-ceo-review` | Scope & strategy | 1 | CLEAR (SELECTIVE EXPANSION) | 6 proposals, 5 accepted, 1 deferred; 2 arch corrections |
| Codex Review | `/codex review` | Independent 2nd opinion | 0 | — | — |
| Eng Review | `/plan-eng-review` | Architecture & tests (required) | 1 | CLEAR (FULL_REVIEW) | 9 issues found, 0 critical gaps; 4 decisions made, 0 unresolved |
| Design Review | `/plan-design-review` | UI/UX gaps | 0 | — | SKIPPED (no UI scope — CLI + Discord/Telegram) |
| DX Review | `/plan-devex-review` | Developer experience gaps | 0 | — | SKIPPED (personal tool, single user) |
**UNRESOLVED:** 0
**ENG REVIEW DECISIONS:**
1. **Bar flicker** → debounce depth=1 (configurable), rely on screenshot-in-notification for visual verification.
2. **Phase A entry price** → dropped. User places manual 0.6% SL in TradeLocker at entry. Phase A = direction + screenshot only. Phase B = real SL/TP1/TP2 from chart.
3. **Notifier blocking** → fire-and-forget worker threads per backend, bounded queue (size 50, drop-oldest), retry w/ backoff, dead-letter on total failure.
4. **Alert SPoF** → Discord + Telegram built in parallel from day 1, both fire together.
**ENG REVIEW OBVIOUS FIXES (stated, no decision):**
- Exhaustive state transition table (all state×color pairs, default-noise rule, SELL mirror explicit).
- Python 3.11+ pin, drop `tomli` dep, use stdlib `tomllib`.
- Windows symlink → `configs/current.txt` marker file.
- Shared `vision.py` module (ROI, hash, interp, Hough).
- `@dataclass Config` with fail-fast load-time validation.
- DPI check + multi-monitor note in calibrate + README.
**ENG REVIEW TEST SCOPE (accepted: FULL):** unit tests for every module (state_machine, detector, levels Phase B, canary, audit, notifier fanout/retry, calibrate roundtrip, config validate) + 1 E2E replay harness asserting labeled-corpus precision/recall. Test plan artifact: `~/.gstack/projects/romfast-workspace/claude-master-eng-review-test-plan-20260415-212932.md`.
**VERDICT:** CEO + ENG CLEARED — ready to implement. Run `/ship` after implementation. No further reviews required before build.

View File

@@ -0,0 +1,74 @@
# Plan: ATM — Automated Trading Monitor (M2D, Faza 1)
## Context
The user manually trades the M2D strategy on DIA (TradeStation) with execution on TradeLocker US30 CFD (prop firm account). The same strategy also applies to GLD → XAUUSD. This currently means watching 2 screens for 4 hours every evening. Phase 1 goal: the bot detects the trigger automatically and sends a Telegram/Discord notification with a screenshot + SL/TP1/TP2 levels; the user executes manually in TradeLocker. Phase 2 (auto-execution) is deferred until the prop firm TOS is verified and Phase 1 is proven.
The full design doc is saved at `~/.gstack/projects/romfast-workspace/claude-master-design-20260415-atm-trading.md` (includes the M2D strategy in full detail).
## Approach: B — Structured Python service + dry-run + audit log
Runs on the same Windows machine as TradeStation. ROI color sampling on the M2D MAPS strip, a state machine for the 3-dot sequence, a notifier abstraction (Discord/Telegram), a Tkinter calibration tool, and dry-run over saved screenshots.
## Files to Create
- `/workspace/atm/pyproject.toml` — packaging; deps: `mss`, `opencv-python`, `numpy`, `requests`, `pygetwindow` (TOML parsing uses stdlib `tomllib` per the eng-review fix, so no `tomli`)
- `/workspace/atm/config.toml` — populated by the calibration tool (ROI coords, reference colors + tolerances, y-axis scale)
- `/workspace/atm/src/atm/detector.py` — screenshot loop + color classification + 3-dot state machine
- `/workspace/atm/src/atm/levels.py` — extract SL/TP1/TP2 from the horizontal lines (pixel y → price)
- `/workspace/atm/src/atm/notifier/__init__.py` — interface `Notifier.send(signal, screenshot, levels)`
- `/workspace/atm/src/atm/notifier/discord.py` — webhook POST
- `/workspace/atm/src/atm/notifier/telegram.py` — bot API
- `/workspace/atm/src/atm/audit.py` — JSONL logger, one record per cycle
- `/workspace/atm/src/atm/calibrate.py` — Tkinter UI: click a dot → capture RGB + tolerance; click the ROI corners → save; click 2 points on the Y axis and enter their prices → scale calibration
- `/workspace/atm/src/atm/dryrun.py` — replay the detector over a folder of screenshots
- `/workspace/atm/src/atm/main.py` — orchestration, CLI (`atm run`, `atm calibrate`, `atm dryrun <dir>`)
- `/workspace/atm/samples/` — screenshot directory for the dry-run corpus
- `/workspace/atm/logs/` — JSONL audit log directory
- `/workspace/atm/README.md` — setup + calibration workflow
## Build Order
1. **`pyproject.toml` + scaffold package** — `pip install -e .`, `atm --help` works.
2. **Standalone sample-capture script** (before any logic) — run it during the next trading sessions, dumping a screenshot into `samples/` every 5s. This builds the dry-run corpus.
3. **`calibrate.py`** — without a calibrated config, nothing works. Tkinter flow: step 1 (select the TradeStation window by title), step 2 (click the M2D MAPS ROI corners), step 3 (click each color: turquoise, dark green, light green, yellow, dark red, light red + neutral gray; capture RGB + a default tolerance radius of 20), step 4 (2 clicks on the Y axis + entered price values → pixel→price scale factor). Saves `config.toml`.
4. **`detector.py`** — 1s loop: locate the window, screenshot the ROI, sample the rightmost 5 dots at the calibrated positions, classify each to its nearest color (Euclidean distance in RGB with tolerance). Rolling window of the last 10 classifications + timestamps. State machine: do the last 3 consecutive non-gray dots form a BUY or SELL sequence? Fire once per trigger (dedup set with a 10-minute TTL).
5. **`levels.py`** — after a trigger, scan the chart region for the red (SL) and green (TP1/TP2) horizontal lines. Extract each line's y-pixel and convert it to a price using the calibrated scale.
6. **`notifier/discord.py`** — multipart POST with the annotated screenshot + a formatted message: `🟢 BUY DIA→US30 | SL: 484.35 | TP1: 485.20 | TP2: 485.90 | 22:47:03`.
7. **`dryrun.py`** — iterate over `samples/`, run the detector, print what WOULD have been sent. Validates detection logic before going live.
8. **`audit.py`** — wrap the detector loop, write JSONL: `{ts, window_found, roi_ok, dots:[...], classification:[...], trigger:null|"BUY"|"SELL", notified:true|false, reason}`.
9. **`main.py`** — unified CLI: `atm calibrate`, `atm dryrun ./samples`, `atm run` (live loop with audit).
10. **Windows Task Scheduler** — 2 tasks: start 16:30 (stop 18:30), start 21:00 (stop 23:00). `atm run --duration 2h`.
11. **`notifier/telegram.py`** — optional, after Discord is stable.
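The "dedup set with a 10-minute TTL" from step 4 can be sketched as below. Names (`TTLDedup`, `should_fire`) are illustrative, not the final `detector.py` API; the injectable clock mirrors the pattern `AuditLog` uses for testability.

```python
import time


class TTLDedup:
    """Remember fired triggers for ttl_s seconds so a signal fires only once.

    Hypothetical helper; the real detector.py may structure this differently.
    """

    def __init__(self, ttl_s: float = 600.0, clock=time.monotonic) -> None:
        self._ttl_s = ttl_s
        self._clock = clock
        self._seen: dict[str, float] = {}  # key -> timestamp of last fire

    def should_fire(self, key: str) -> bool:
        """Return True (and record the fire) unless key fired within the TTL."""
        now = self._clock()
        # purge expired entries so the dict stays small over a long session
        self._seen = {k: t for k, t in self._seen.items() if now - t < self._ttl_s}
        if key in self._seen:
            return False
        self._seen[key] = now
        return True


# Fake clock so the example is deterministic
fake_now = [0.0]
dedup = TTLDedup(ttl_s=600.0, clock=lambda: fake_now[0])
print(dedup.should_fire("BUY"))   # → True  (first BUY fires)
print(dedup.should_fire("BUY"))   # → False (suppressed within the TTL)
fake_now[0] = 601.0
print(dedup.should_fire("BUY"))   # → True  (TTL expired, fires again)
```

Keying on direction alone matches the Phase 1 scope (one chart, one symbol at a time).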
## Existing Utilities to Reuse
N/A — greenfield project. No internal utilities to reuse.
## Verification
End-to-end, in build order:
1. **Calibration workflow:** `atm calibrate` → follow the steps → a complete `config.toml` results. Manually verify that the captured RGB values are plausible for the described colors.
2. **Dry-run corpus:** ≥20 screenshots from real sessions in `samples/`. Run `atm dryrun ./samples` → per-screenshot output: classification + trigger decision. Manually verify that cases where you saw a real signal → trigger; neutral cases → no-trigger. Target: 0 false positives, ≤5% false negatives.
3. **Live notification-only test (2 sessions):** `atm run` during the trading window. Verify:
- Discord notifications arrive within 3s of the trigger appearing on the chart.
- The attached screenshot is clear and legible.
- Extracted SL/TP1/TP2 are within ≤$0.05 of the actual levels on the chart.
- The audit log (`logs/YYYY-MM-DD.jsonl`) contains every cycle; a missed signal can be reproduced from it.
4. **Sanity alerts:** move/resize the TradeStation window → the bot detects "window lost" within 60s → notification. Restore the window → the bot resumes.
5. **Scheduler validation:** Windows Task Scheduler starts `atm run` at 16:30, stops it cleanly at 18:30, and the audit log is written without corruption.
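The dry-run gate in step 2 (0 false positives, ≤5% false negatives) can be checked mechanically once the corpus is labeled. A minimal scorer might look like the following; the per-screenshot label values ("BUY"/"SELL"/"NONE") are an assumed format, not the final `labels.json` schema.

```python
def score(labels: list[str], predictions: list[str]) -> dict[str, float]:
    """Compare expected vs detected triggers, one entry per screenshot.

    Each entry is "BUY", "SELL", or "NONE". Illustrative sketch; the real
    dryrun.py output format may differ.
    """
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must align 1:1")
    # a trigger where the label says none -> false positive
    false_pos = sum(1 for l, p in zip(labels, predictions) if l == "NONE" and p != "NONE")
    # a labeled signal the detector missed -> false negative
    false_neg = sum(1 for l, p in zip(labels, predictions) if l != "NONE" and p == "NONE")
    signals = sum(1 for l in labels if l != "NONE")
    fn_rate = false_neg / signals if signals else 0.0
    return {"false_positives": false_pos, "false_negative_rate": fn_rate}


labels      = ["NONE", "BUY", "NONE", "SELL", "BUY"]
predictions = ["NONE", "BUY", "NONE", "NONE", "BUY"]
result = score(labels, predictions)
print(result)  # one missed SELL out of three labeled signals
assert result["false_positives"] == 0
assert result["false_negative_rate"] > 0.05  # this corpus would fail the gate
```

With a real ≥20-screenshot corpus the same two assertions become the pass/fail condition for step 2.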
## Risk Register
- **Prop firm TOS (Phase 2 blocker, NOT Phase 1):** before any extension toward auto-execution in TradeLocker, read the prop firm's TOS and search for "EA / automation / bot / copy trading / external signals". If prohibited, Phase 2 is dead and you stay on Phase 1 permanently.
- **Indicator layout change:** if a TradeStation update changes the M2D MAPS rendering → re-calibration. The audit log will show gradual confidence degradation → active alert via "bot lost sight".
- **Price divergence DIA↔US30:** the trigger fires on DIA; there can be a second in which US30 has already moved differently. Acceptable risk (user judgment), but monitored in Phase 2 via slippage analysis.
- **Screenshots during screen sharing / AnyDesk / RDP:** if someone connects remotely to the Windows machine during trading, screenshots may include unwanted overlays. Small but notable.
## Out of Scope (Faza 1)
- Any automated clicking in TradeLocker
- Multi-symbol concurrent monitoring (single chart at a time)
- Backtesting on historical data (the strategy is already manually validated)
- Web UI / dashboard — everything runs headless with external notifications

37
pyproject.toml Normal file
View File

@@ -0,0 +1,37 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "atm"
version = "0.1.0"
description = "M2D Trading Monitor — TradeStation chart watcher, Discord/Telegram notifier"
requires-python = ">=3.11"
dependencies = [
"numpy>=1.26",
"opencv-python>=4.9",
"pillow>=10.0",
"requests>=2.31",
"rich>=13.0",
]
[project.optional-dependencies]
windows = [
"mss>=9.0",
"pygetwindow>=0.0.9",
"pywin32>=306",
]
dev = [
"pytest>=8.0",
"pytest-cov>=5.0",
]
[project.scripts]
atm = "atm.main:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]

1
src/atm/__init__.py Normal file
View File

@@ -0,0 +1 @@
__version__ = "0.1.0"

54
src/atm/audit.py Normal file
View File

@@ -0,0 +1,54 @@
from __future__ import annotations
import json
from datetime import datetime, date
from pathlib import Path
from typing import Callable, IO
class AuditLog:
def __init__(
self,
base_dir: Path,
clock: Callable[[], datetime] | None = None,
) -> None:
self._base_dir = Path(base_dir)
self._clock: Callable[[], datetime] = clock or datetime.now
self._current_date: date | None = None
self._fh: IO[str] | None = None
def log(self, event: dict) -> None:
now = self._clock()
today = now.date()
if today != self._current_date:
self._open(today)
if "ts" not in event:
event = {**event, "ts": now.isoformat()}
assert self._fh is not None
self._fh.write(json.dumps(event, separators=(",", ":")) + "\n")
def close(self) -> None:
if self._fh is not None:
try:
self._fh.close()
except Exception:
pass
finally:
self._fh = None
@property
def current_path(self) -> Path:
if self._current_date is None:
today = self._clock().date()
return self._base_dir / f"{today}.jsonl"
return self._base_dir / f"{self._current_date}.jsonl"
def _open(self, today: date) -> None:
self.close()
self._base_dir.mkdir(parents=True, exist_ok=True)
path = self._base_dir / f"{today}.jsonl"
self._fh = open(path, "a", buffering=1, encoding="utf-8")
self._current_date = today

170
src/atm/config.py Normal file
View File

@@ -0,0 +1,170 @@
"""Config dataclass with load-time validation. Fail fast."""
from __future__ import annotations
import tomllib
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
DotColor = Literal[
"turquoise", "yellow",
"dark_green", "dark_red",
"light_green", "light_red",
"gray", "background",
]
VALID_COLORS: tuple[str, ...] = (
"turquoise", "yellow", "dark_green", "dark_red",
"light_green", "light_red", "gray", "background",
)
@dataclass(frozen=True)
class ROI:
x: int
y: int
w: int
h: int
def __post_init__(self) -> None:
if self.w <= 0 or self.h <= 0:
raise ValueError(f"ROI w/h must be positive: {self}")
if self.x < 0 or self.y < 0:
raise ValueError(f"ROI x/y must be non-negative: {self}")
@dataclass(frozen=True)
class ColorSpec:
rgb: tuple[int, int, int]
tolerance: float
def __post_init__(self) -> None:
if len(self.rgb) != 3 or any(not (0 <= c <= 255) for c in self.rgb):
raise ValueError(f"rgb must be 3 ints 0-255: {self.rgb}")
if self.tolerance <= 0:
raise ValueError(f"tolerance must be positive: {self.tolerance}")
@dataclass(frozen=True)
class YAxisCalib:
"""Two reference points (pixel_y, price) for linear interp."""
p1_y: int
p1_price: float
p2_y: int
p2_price: float
def __post_init__(self) -> None:
if self.p1_y == self.p2_y:
raise ValueError("y-axis calibration points must differ in y")
@dataclass(frozen=True)
class CanaryRegion:
roi: ROI
baseline_phash: str
drift_threshold: int = 8
@dataclass(frozen=True)
class DiscordCfg:
webhook_url: str
def __post_init__(self) -> None:
if not self.webhook_url.startswith("http"):
raise ValueError("discord webhook_url required")
@dataclass(frozen=True)
class TelegramCfg:
bot_token: str
chat_id: str
def __post_init__(self) -> None:
if not self.bot_token or not self.chat_id:
raise ValueError("telegram bot_token + chat_id required")
@dataclass(frozen=True)
class Config:
window_title: str
dot_roi: ROI
chart_roi: ROI
colors: dict[str, ColorSpec]
y_axis: YAxisCalib
canary: CanaryRegion
discord: DiscordCfg
telegram: TelegramCfg
debounce_depth: int = 1
loop_interval_s: float = 5.0
heartbeat_min: int = 30
lockout_s: int = 240
low_conf_threshold: float = 0.2
low_conf_run: int = 3
phaseb_timeout_s: int = 600
dead_letter_path: str = "logs/dead_letter.jsonl"
config_version: str = "unknown"
def __post_init__(self) -> None:
required = {"turquoise", "yellow", "dark_green", "dark_red",
"light_green", "light_red", "gray"}
missing = required - set(self.colors.keys())
if missing:
raise ValueError(f"config.colors missing: {missing}")
if self.debounce_depth < 1:
raise ValueError("debounce_depth >= 1")
if self.loop_interval_s <= 0:
raise ValueError("loop_interval_s > 0")
@classmethod
def load(cls, path: str | Path) -> "Config":
p = Path(path)
data = tomllib.loads(p.read_text(encoding="utf-8"))
return cls._from_dict(data, version=p.stem)
@classmethod
def load_current(cls, configs_dir: str | Path) -> "Config":
"""Resolve configs/current.txt → active config file."""
d = Path(configs_dir)
marker = d / "current.txt"
if not marker.exists():
raise FileNotFoundError(f"marker not found: {marker}")
name = marker.read_text(encoding="utf-8").strip()
return cls.load(d / name)
@classmethod
def _from_dict(cls, data: dict, version: str = "unknown") -> "Config":
roi = ROI(**data["dot_roi"])
chart = ROI(**data["chart_roi"])
colors = {k: ColorSpec(rgb=tuple(v["rgb"]), tolerance=float(v["tolerance"]))
for k, v in data["colors"].items()}
y = YAxisCalib(**data["y_axis"])
canary = CanaryRegion(
roi=ROI(**data["canary"]["roi"]),
baseline_phash=data["canary"]["baseline_phash"],
drift_threshold=int(data["canary"].get("drift_threshold", 8)),
)
discord = DiscordCfg(webhook_url=data["discord"]["webhook_url"])
telegram = TelegramCfg(
bot_token=data["telegram"]["bot_token"],
chat_id=str(data["telegram"]["chat_id"]),
)
opts = data.get("options", {})
return cls(
window_title=data["window_title"],
dot_roi=roi,
chart_roi=chart,
colors=colors,
y_axis=y,
canary=canary,
discord=discord,
telegram=telegram,
debounce_depth=int(opts.get("debounce_depth", 1)),
loop_interval_s=float(opts.get("loop_interval_s", 5.0)),
heartbeat_min=int(opts.get("heartbeat_min", 30)),
lockout_s=int(opts.get("lockout_s", 240)),
low_conf_threshold=float(opts.get("low_conf_threshold", 0.2)),
low_conf_run=int(opts.get("low_conf_run", 3)),
phaseb_timeout_s=int(opts.get("phaseb_timeout_s", 600)),
dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"),
config_version=version,
)
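For reference, a `config.toml` shaped to satisfy `_from_dict` above might look like the fragment below. All values are illustrative placeholders (the real file is written by `atm calibrate`), and the elided `[colors.<name>]` tables must cover every color in the required set for `Config.__post_init__` to pass.

```toml
window_title = "TradeStation - DIA 2min"

[dot_roi]
x = 1200
y = 840
w = 300
h = 40

[chart_roi]
x = 100
y = 80
w = 1400
h = 700

[colors.turquoise]
rgb = [64, 224, 208]
tolerance = 20.0

# ...one [colors.<name>] table per calibrated color...

[y_axis]
p1_y = 120
p1_price = 486.00
p2_y = 620
p2_price = 482.00

[canary]
baseline_phash = "0f3a..."  # placeholder; captured at calibration time

[canary.roi]
x = 0
y = 0
w = 64
h = 64

[discord]
webhook_url = "https://discord.com/api/webhooks/..."

[telegram]
bot_token = "123456:ABC..."
chat_id = "987654321"

[options]
debounce_depth = 1
loop_interval_s = 5.0
```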

18
src/atm/notifier/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
@dataclass
class Alert:
kind: str # "trigger" | "heartbeat" | "levels" | "warn"
title: str
body: str
image_path: Path | None = None # annotated screenshot
direction: str | None = None # "BUY"/"SELL" when kind=trigger
class Notifier(Protocol):
name: str
def send(self, alert: Alert) -> None: ... # raises on failure

42
src/atm/notifier/discord.py Normal file
View File

@@ -0,0 +1,42 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import requests
from . import Alert
class DiscordNotifier:
name = "discord"
def __init__(self, webhook_url: str, session: Any = None) -> None:
self._url = webhook_url
self._session = session or requests.Session()
def send(self, alert: Alert) -> None:
content = f"**{alert.title}**\n{alert.body}"
if alert.image_path and Path(alert.image_path).exists():
with open(alert.image_path, "rb") as fh:
resp = self._session.post(
self._url,
data={"content": content},
files={"file": fh},
timeout=10,
)
else:
resp = self._session.post(
self._url,
json={"content": content},
timeout=10,
)
if resp.status_code == 429:
raise RuntimeError(f"Discord rate-limited (429): {resp.text}")
if resp.status_code >= 500:
raise RuntimeError(f"Discord server error ({resp.status_code}): {resp.text}")
# 200 / 204 are OK; any other 4xx is treated as an error too
if resp.status_code not in (200, 204):
raise RuntimeError(f"Discord unexpected status ({resp.status_code}): {resp.text}")

128
src/atm/notifier/fanout.py Normal file
View File

@@ -0,0 +1,128 @@
from __future__ import annotations
import json
import queue
import threading
import time
from copy import copy
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from . import Alert, Notifier
_SENTINEL = object()
@dataclass
class _Stats:
sent: int = 0
failed: int = 0
dropped: int = 0
retries: int = 0
class FanoutNotifier:
def __init__(
self,
backends: list[Notifier],
dead_letter_path: Path,
queue_size: int = 50,
max_retries: int = 3,
backoff_base: float = 0.5,
) -> None:
self._backends = backends
self._dead_letter_path = Path(dead_letter_path)
self._queue_size = queue_size
self._max_retries = max_retries
self._backoff_base = backoff_base
self._dl_lock = threading.Lock()
self._queues: dict[str, queue.Queue[Any]] = {}
self._stats: dict[str, _Stats] = {}
self._workers: list[threading.Thread] = []
for backend in backends:
q: queue.Queue[Any] = queue.Queue(maxsize=queue_size)
self._queues[backend.name] = q
self._stats[backend.name] = _Stats()
t = threading.Thread(
target=self._worker,
args=(backend, q),
daemon=True,
name=f"fanout-{backend.name}",
)
t.start()
self._workers.append(t)
def send(self, alert: Alert) -> None:
alert_copy = copy(alert)
for backend in self._backends:
q = self._queues[backend.name]
stats = self._stats[backend.name]
if q.full():
try:
q.get_nowait()
stats.dropped += 1
except queue.Empty:
pass
q.put(alert_copy)
def stop(self, timeout: float = 5.0) -> None:
for q in self._queues.values():
q.put(_SENTINEL)
for w in self._workers:
w.join(timeout=timeout)
def stats(self) -> dict[str, dict[str, int]]:
return {
name: {
"sent": s.sent,
"failed": s.failed,
"dropped": s.dropped,
"retries": s.retries,
}
for name, s in self._stats.items()
}
def _worker(self, backend: Notifier, q: queue.Queue[Any]) -> None:
stats = self._stats[backend.name]
while True:
item = q.get()
if item is _SENTINEL:
break
self._dispatch(backend, item, stats)
def _dispatch(self, backend: Notifier, alert: Alert, stats: _Stats) -> None:
last_exc: Exception | None = None
for attempt in range(self._max_retries + 1):
if attempt > 0:
delay = self._backoff_base * (2 ** (attempt - 1))
time.sleep(delay)
stats.retries += 1
try:
backend.send(alert)
stats.sent += 1
return
except Exception as exc:
last_exc = exc
# Exhausted all retries
stats.failed += 1
self._write_dead_letter(backend.name, alert, last_exc)
def _write_dead_letter(
self, backend_name: str, alert: Alert, exc: Exception | None
) -> None:
record = {
"backend": backend_name,
"alert_title": alert.title,
"alert_kind": alert.kind,
"error_str": str(exc) if exc else "",
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
line = json.dumps(record) + "\n"
with self._dl_lock:
self._dead_letter_path.parent.mkdir(parents=True, exist_ok=True)
with open(self._dead_letter_path, "a", encoding="utf-8") as fh:
fh.write(line)

55
src/atm/notifier/telegram.py Normal file
View File

@@ -0,0 +1,55 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import requests
from . import Alert
_BASE = "https://api.telegram.org/bot{token}/{method}"
class TelegramNotifier:
name = "telegram"
def __init__(self, bot_token: str, chat_id: str, session: Any = None) -> None:
self._token = bot_token
self._chat_id = chat_id
self._session = session or requests.Session()
def _url(self, method: str) -> str:
return _BASE.format(token=self._token, method=method)
def send(self, alert: Alert) -> None:
text = f"*{alert.title}*\n{alert.body}"
if alert.image_path and Path(alert.image_path).exists():
with open(alert.image_path, "rb") as fh:
resp = self._session.post(
self._url("sendPhoto"),
data={
"chat_id": self._chat_id,
"caption": text,
"parse_mode": "Markdown",
},
files={"photo": fh},
timeout=10,
)
else:
resp = self._session.post(
self._url("sendMessage"),
json={
"chat_id": self._chat_id,
"text": text,
"parse_mode": "Markdown",
},
timeout=10,
)
if resp.status_code == 429:
raise RuntimeError(f"Telegram rate-limited (429): {resp.text}")
if resp.status_code >= 500:
raise RuntimeError(f"Telegram server error ({resp.status_code}): {resp.text}")
if resp.status_code != 200:
raise RuntimeError(f"Telegram unexpected status ({resp.status_code}): {resp.text}")

224
src/atm/state_machine.py Normal file
View File

@@ -0,0 +1,224 @@
"""ATM trading state machine — pure stdlib, no cv2/numpy."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Literal
class State(Enum):
IDLE = "IDLE"
ARMED_BUY = "ARMED_BUY"
PRIMED_BUY = "PRIMED_BUY"
ARMED_SELL = "ARMED_SELL"
PRIMED_SELL = "PRIMED_SELL"
DotColor = Literal[
"turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red", "gray"
]
@dataclass(frozen=True)
class Transition:
prev: State
next: State
event: str
reason: str
trigger: Literal["BUY", "SELL"] | None = None
locked: bool = False
arm_ts: float | None = None
prime_ts: float | None = None
fire_ts: float | None = None
class StateMachine:
"""Finite state machine for ATM dot-color trading signals.
BUY side and SELL side are mirrors of each other:
turquoise ↔ yellow
dark_green ↔ dark_red
light_green ↔ light_red
Lockout: after a fire event, same-direction fires within lockout_s
seconds produce locked=True transitions (caller suppresses notify).
Opposite direction is unaffected.
"""
def __init__(self, lockout_s: int = 240) -> None:
self._state = State.IDLE
self._lockout_s = lockout_s
# arm_ts / prime_ts are reset per-arm / per-prime cycle
self._arm_ts: float | None = None
self._prime_ts: float | None = None
# last fire timestamps per direction
self._last_fire: dict[str, float] = {}
@property
def state(self) -> State:
return self._state
# ------------------------------------------------------------------
# Core feed
# ------------------------------------------------------------------
def feed(self, color: DotColor, ts: float) -> Transition: # noqa: C901
prev = self._state
match self._state:
case State.IDLE:
t = self._from_idle(color, ts)
case State.ARMED_BUY:
t = self._from_armed(color, ts, direction="BUY")
case State.ARMED_SELL:
t = self._from_armed(color, ts, direction="SELL")
case State.PRIMED_BUY:
t = self._from_primed(color, ts, direction="BUY")
case State.PRIMED_SELL:
t = self._from_primed(color, ts, direction="SELL")
case _: # pragma: no cover
t = self._noise(color, ts)
assert t.prev == prev
self._state = t.next
return t
# ------------------------------------------------------------------
# State handlers
# ------------------------------------------------------------------
def _from_idle(self, color: DotColor, ts: float) -> Transition:
prev = State.IDLE
match color:
case "turquoise":
self._arm_ts = ts
self._prime_ts = None
return Transition(prev, State.ARMED_BUY, color, "arm", arm_ts=ts)
case "yellow":
self._arm_ts = ts
self._prime_ts = None
return Transition(prev, State.ARMED_SELL, color, "arm", arm_ts=ts)
case "light_green" | "light_red":
return Transition(prev, State.IDLE, color, "phase_skip")
case _:
# dark_green, dark_red, gray → noise
return Transition(prev, State.IDLE, color, "noise")
def _from_armed(
self, color: DotColor, ts: float, direction: Literal["BUY", "SELL"]
) -> Transition:
"""Handle events from ARMED_BUY or ARMED_SELL."""
prev = State.ARMED_BUY if direction == "BUY" else State.ARMED_SELL
# Map color names to logical roles for the given direction
arm_color = "turquoise" if direction == "BUY" else "yellow"
opposite_arm = "yellow" if direction == "BUY" else "turquoise"
prime_color = "dark_green" if direction == "BUY" else "dark_red"
skip_correct = "light_green" if direction == "BUY" else "light_red"
skip_wrong = "light_red" if direction == "BUY" else "light_green"
primed_state = State.PRIMED_BUY if direction == "BUY" else State.PRIMED_SELL
opposite_state = State.ARMED_SELL if direction == "BUY" else State.ARMED_BUY
match color:
case "gray":
return Transition(prev, prev, color, "persist", arm_ts=self._arm_ts)
case c if c == arm_color:
# refresh: update arm_ts
self._arm_ts = ts
return Transition(prev, prev, color, "refresh", arm_ts=ts)
case c if c == prime_color:
self._prime_ts = ts
return Transition(
prev, primed_state, color, "prime",
arm_ts=self._arm_ts, prime_ts=ts,
)
case c if c == opposite_arm:
self._arm_ts = ts
self._prime_ts = None
return Transition(
prev, opposite_state, color, "opposite_rearm", arm_ts=ts
)
case c if c == skip_correct:
# phase_skip — no fire
self._arm_ts = None
self._prime_ts = None
return Transition(prev, State.IDLE, color, "phase_skip")
case c if c == skip_wrong:
self._arm_ts = None
self._prime_ts = None
return Transition(prev, State.IDLE, color, "phase_skip")
case _:
# dark_red (for BUY) or dark_green (for SELL) → ignore
return Transition(prev, prev, color, "ignore", arm_ts=self._arm_ts)
def _from_primed(
self, color: DotColor, ts: float, direction: Literal["BUY", "SELL"]
) -> Transition:
"""Handle events from PRIMED_BUY or PRIMED_SELL."""
prev = State.PRIMED_BUY if direction == "BUY" else State.PRIMED_SELL
accumulate_color = "dark_green" if direction == "BUY" else "dark_red"
ignore_color = "dark_red" if direction == "BUY" else "dark_green"
fire_color = "light_green" if direction == "BUY" else "light_red"
skip_color = "light_red" if direction == "BUY" else "light_green"
arm_color = "turquoise" if direction == "BUY" else "yellow"
opposite_arm = "yellow" if direction == "BUY" else "turquoise"
armed_self = State.ARMED_BUY if direction == "BUY" else State.ARMED_SELL
armed_opp = State.ARMED_SELL if direction == "BUY" else State.ARMED_BUY
match color:
case c if c == accumulate_color:
return Transition(
prev, prev, color, "accumulate",
arm_ts=self._arm_ts, prime_ts=self._prime_ts,
)
case c if c == ignore_color:
return Transition(
prev, prev, color, "ignore",
arm_ts=self._arm_ts, prime_ts=self._prime_ts,
)
case c if c == fire_color:
locked = self._is_locked(direction, ts)
fire_ts = ts
self._last_fire[direction] = ts
self._arm_ts = None
self._prime_ts = None
return Transition(
prev, State.IDLE, color, "fire",
trigger=direction,
locked=locked,
fire_ts=fire_ts,
)
case c if c == skip_color:
self._arm_ts = None
self._prime_ts = None
return Transition(prev, State.IDLE, color, "phase_skip")
case "gray":
self._arm_ts = None
self._prime_ts = None
return Transition(prev, State.IDLE, color, "cooled")
case c if c == arm_color:
self._arm_ts = ts
self._prime_ts = None
return Transition(prev, armed_self, color, "rearm", arm_ts=ts)
case c if c == opposite_arm:
self._arm_ts = ts
self._prime_ts = None
return Transition(prev, armed_opp, color, "opposite_rearm", arm_ts=ts)
case _:
return Transition(
prev, prev, color, "noise",
arm_ts=self._arm_ts, prime_ts=self._prime_ts,
)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _noise(self, color: DotColor, ts: float) -> Transition: # pragma: no cover
return Transition(self._state, self._state, color, "noise")
def _is_locked(self, direction: str, ts: float) -> bool:
last = self._last_fire.get(direction)
if last is None:
return False
return (ts - last) < self._lockout_s

162
src/atm/vision.py Normal file
View File

@@ -0,0 +1,162 @@
"""Shared vision primitives: ROI crop, perceptual hash, pixel↔price interp, Hough lines."""
from __future__ import annotations
from dataclasses import dataclass
import cv2
import numpy as np
from .config import ROI, YAxisCalib
def crop_roi(img: np.ndarray, roi: ROI) -> np.ndarray:
"""Crop RGB/BGR image by ROI. Raises on out-of-bounds."""
h, w = img.shape[:2]
if roi.x + roi.w > w or roi.y + roi.h > h:
raise ValueError(f"ROI {roi} exceeds image {w}x{h}")
return img[roi.y:roi.y + roi.h, roi.x:roi.x + roi.w].copy()
def phash(img: np.ndarray, size: int = 16) -> str:
"""Perceptual hash via DCT. Returns hex string. Image may be RGB/BGR/gray."""
if img.ndim == 3:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
else:
gray = img
small = cv2.resize(gray, (32, 32), interpolation=cv2.INTER_AREA).astype(np.float32)
dct = cv2.dct(small)
block = dct[:size, :size]
med = np.median(block[1:, 1:]) # skip DC term
bits = (block > med).flatten()
# pack into hex
out = 0
for b in bits:
out = (out << 1) | int(b)
return f"{out:0{len(bits) // 4}x}"
def hamming_hex(a: str, b: str) -> int:
if len(a) != len(b):
raise ValueError("hash length mismatch")
return bin(int(a, 16) ^ int(b, 16)).count("1")
def pixel_y_to_price(y: int, calib: YAxisCalib) -> float:
"""Linear interp from screen-y to price. y grows downward; prices usually invert."""
dy = calib.p2_y - calib.p1_y
dp = calib.p2_price - calib.p1_price
return calib.p1_price + (y - calib.p1_y) * (dp / dy)
def price_to_pixel_y(price: float, calib: YAxisCalib) -> int:
dy = calib.p2_y - calib.p1_y
dp = calib.p2_price - calib.p1_price
return int(round(calib.p1_y + (price - calib.p1_price) * (dy / dp)))
@dataclass(frozen=True)
class ColorMatch:
name: str
distance: float
confidence: float # 1 - d_nearest / d_second
def classify_pixel(
rgb: tuple[int, int, int],
palette: dict[str, tuple[tuple[int, int, int], float]],
) -> ColorMatch:
"""Nearest-color in RGB Euclidean. palette: name → (rgb, tolerance).
Returns UNKNOWN if nearest exceeds its tolerance.
"""
best_name = "UNKNOWN"
best_d = float("inf")
second_d = float("inf")
best_tol = 0.0
p = np.array(rgb, dtype=np.float32)
for name, (ref, tol) in palette.items():
d = float(np.linalg.norm(p - np.array(ref, dtype=np.float32)))
if d < best_d:
second_d = best_d
best_d = d
best_name = name
best_tol = tol
elif d < second_d:
second_d = d
if best_d > best_tol:
return ColorMatch(name="UNKNOWN", distance=best_d, confidence=0.0)
conf = 1.0 - (best_d / second_d) if second_d > 0 else 1.0
return ColorMatch(name=best_name, distance=best_d, confidence=max(0.0, conf))
def find_rightmost_dot(
roi_img: np.ndarray,
bg_rgb: tuple[int, int, int],
bg_tol: float = 15.0,
min_cluster_px: int = 3,
) -> tuple[int, int] | None:
"""Scan ROI right→left; return (x, y) of rightmost non-background cluster center.
roi_img is BGR (OpenCV convention). bg_rgb is RGB.
"""
bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(roi_img.astype(np.float32) - bgr_bg, axis=2)
mask = diff > bg_tol # True = non-background
_h, w = mask.shape
for x in range(w - 1, -1, -1):
col = mask[:, x]
if col.sum() >= min_cluster_px:
ys = np.where(col)[0]
return (x, int(ys.mean()))
return None
def pixel_rgb(roi_img: np.ndarray, x: int, y: int, box: int = 3) -> tuple[int, int, int]:
"""Sample mean RGB of a (2*box+1)² patch around (x,y). Input BGR → returns RGB."""
h, w = roi_img.shape[:2]
x0, x1 = max(0, x - box), min(w, x + box + 1)
y0, y1 = max(0, y - box), min(h, y + box + 1)
patch = roi_img[y0:y1, x0:x1]
mean = patch.reshape(-1, 3).mean(axis=0)
# BGR → RGB
return (int(mean[2]), int(mean[1]), int(mean[0]))
def detect_color_lines(
chart_img: np.ndarray,
color_rgb: tuple[int, int, int],
tol: float = 30.0,
min_line_length_frac: float = 0.3,
) -> list[int]:
"""Find horizontal lines of given color. Returns list of y-coords (sorted).
chart_img is BGR. Color mask + Hough for near-horizontal lines.
"""
_h, w = chart_img.shape[:2]
bgr_target = np.array([color_rgb[2], color_rgb[1], color_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(chart_img.astype(np.float32) - bgr_target, axis=2)
mask = (diff < tol).astype(np.uint8) * 255
if mask.sum() == 0:
return []
min_len = max(10, int(w * min_line_length_frac))
lines = cv2.HoughLinesP(
mask,
rho=1, theta=np.pi / 180,
threshold=min_len,
minLineLength=min_len,
maxLineGap=int(w * 0.05),
)
if lines is None:
return []
ys: list[int] = []
for _x1, y1, _x2, y2 in lines[:, 0]:
if abs(y2 - y1) <= 2: # near-horizontal
ys.append(int((y1 + y2) / 2))
# collapse near-duplicates within 3 px (keep the first y of each cluster)
ys.sort()
merged: list[int] = []
for y in ys:
if merged and abs(y - merged[-1]) <= 3:
continue
merged.append(y)
return merged

0
tests/__init__.py Normal file
View File

107
tests/test_audit.py Normal file
View File

@@ -0,0 +1,107 @@
"""Tests for AuditLog."""
from __future__ import annotations
import json
import os
from datetime import datetime
from pathlib import Path
import pytest
from atm.audit import AuditLog
def _dt(s: str) -> datetime:
return datetime.fromisoformat(s)
def test_writes_jsonl(tmp_path: Path) -> None:
clock_dt = _dt("2026-04-15T10:00:00")
log = AuditLog(tmp_path / "logs", clock=lambda: clock_dt)
events = [
{"msg": "first", "ts": "2026-04-15T10:00:00"},
{"msg": "second", "ts": "2026-04-15T10:01:00"},
{"msg": "third", "ts": "2026-04-15T10:02:00"},
]
for e in events:
log.log(e)
log.close()
lines = (tmp_path / "logs" / "2026-04-15.jsonl").read_text().splitlines()
assert len(lines) == 3
for original, line in zip(events, lines):
parsed = json.loads(line)
assert parsed["msg"] == original["msg"]
assert parsed["ts"] == original["ts"]
def test_daily_rotation(tmp_path: Path) -> None:
base = tmp_path / "logs"
times = [
_dt("2026-04-15T23:59:59"), # just before midnight
_dt("2026-04-16T00:00:01"), # just after midnight
]
idx = 0
def clock() -> datetime:
return times[min(idx, len(times) - 1)]
log = AuditLog(base, clock=clock)
log.log({"msg": "before"})
idx = 1
log.log({"msg": "after"})
log.close()
file_15 = base / "2026-04-15.jsonl"
file_16 = base / "2026-04-16.jsonl"
assert file_15.exists(), "File for Apr 15 should exist"
assert file_16.exists(), "File for Apr 16 should exist"
lines_15 = [json.loads(ln) for ln in file_15.read_text().splitlines()]
lines_16 = [json.loads(ln) for ln in file_16.read_text().splitlines()]
assert lines_15[0]["msg"] == "before"
assert lines_16[0]["msg"] == "after"
def test_line_buffered(tmp_path: Path) -> None:
base = tmp_path / "logs"
clock_dt = _dt("2026-04-15T12:00:00")
log = AuditLog(base, clock=lambda: clock_dt)
log.log({"msg": "hello", "ts": "2026-04-15T12:00:00"})
# Do NOT call close() — file should already have content due to line buffering
path = base / "2026-04-15.jsonl"
assert os.stat(path).st_size > 0
log.close()
def test_adds_ts_when_missing(tmp_path: Path) -> None:
clock_dt = _dt("2026-04-15T09:30:00")
log = AuditLog(tmp_path / "logs", clock=lambda: clock_dt)
log.log({"msg": "hi"})
log.close()
lines = (tmp_path / "logs" / "2026-04-15.jsonl").read_text().splitlines()
parsed = json.loads(lines[0])
assert "ts" in parsed
assert parsed["ts"] == "2026-04-15T09:30:00"
def test_preserves_existing_ts(tmp_path: Path) -> None:
clock_dt = _dt("2026-04-15T09:30:00")
log = AuditLog(tmp_path / "logs", clock=lambda: clock_dt)
log.log({"ts": "2026-01-01T00:00:00", "msg": "hi"})
log.close()
lines = (tmp_path / "logs" / "2026-04-15.jsonl").read_text().splitlines()
parsed = json.loads(lines[0])
assert parsed["ts"] == "2026-01-01T00:00:00"
def test_close_idempotent(tmp_path: Path) -> None:
clock_dt = _dt("2026-04-15T10:00:00")
log = AuditLog(tmp_path / "logs", clock=lambda: clock_dt)
log.log({"msg": "x", "ts": "2026-04-15T10:00:00"})
log.close()
log.close() # should not raise

221
tests/test_notifier.py Normal file

@@ -0,0 +1,221 @@
"""Tests for notifier module: FanoutNotifier, DiscordNotifier, TelegramNotifier."""
from __future__ import annotations
import json
import time
from pathlib import Path
import pytest
from atm.notifier import Alert
from atm.notifier.fanout import FanoutNotifier
# ---------------------------------------------------------------------------
# Fake backends
# ---------------------------------------------------------------------------
class FakeBackend:
"""Configurable fake backend for testing."""
def __init__(
self,
name: str = "fake",
always_fail: bool = False,
fail_first_n: int = 0,
sleep_s: float = 0.0,
) -> None:
self.name = name
self._always_fail = always_fail
self._fail_first_n = fail_first_n
self._sleep_s = sleep_s
self._call_count = 0
def send(self, alert: Alert) -> None:
self._call_count += 1
if self._sleep_s:
time.sleep(self._sleep_s)
if self._always_fail:
raise RuntimeError(f"{self.name}: simulated failure")
if self._call_count <= self._fail_first_n:
raise RuntimeError(f"{self.name}: simulated failure #{self._call_count}")
def _alert(title: str = "test", kind: str = "trigger") -> Alert:
return Alert(kind=kind, title=title, body="body text")
# ---------------------------------------------------------------------------
# FanoutNotifier tests
# ---------------------------------------------------------------------------
def test_fanout_both_delivered(tmp_path: Path) -> None:
dl = tmp_path / "dead.jsonl"
b1 = FakeBackend("b1")
b2 = FakeBackend("b2")
fan = FanoutNotifier([b1, b2], dl, backoff_base=0.01)
for i in range(3):
fan.send(_alert(f"alert-{i}"))
fan.stop(timeout=5.0)
s = fan.stats()
assert s["b1"]["sent"] == 3
assert s["b2"]["sent"] == 3
assert s["b1"]["failed"] == 0
assert s["b2"]["failed"] == 0
def test_one_backend_down_other_delivers(tmp_path: Path) -> None:
dl = tmp_path / "dead.jsonl"
ok_backend = FakeBackend("ok")
bad_backend = FakeBackend("bad", always_fail=True)
fan = FanoutNotifier(
[ok_backend, bad_backend], dl, max_retries=1, backoff_base=0.01
)
for i in range(2):
fan.send(_alert(f"a{i}"))
fan.stop(timeout=5.0)
s = fan.stats()
assert s["ok"]["sent"] == 2
assert s["bad"]["failed"] == 2
# dead letter file should have entries for the bad backend
assert dl.exists()
lines = [json.loads(ln) for ln in dl.read_text().splitlines()]
assert all(e["backend"] == "bad" for e in lines)
assert len(lines) == 2
def test_dead_letter_on_exhausted_retries(tmp_path: Path) -> None:
dl = tmp_path / "dead.jsonl"
bad = FakeBackend("bad", always_fail=True)
fan = FanoutNotifier([bad], dl, max_retries=3, backoff_base=0.01)
fan.send(_alert("my-alert"))
fan.stop(timeout=5.0)
s = fan.stats()
assert s["bad"]["failed"] == 1
# retries = max_retries (3 extra attempts after first)
assert s["bad"]["retries"] == 3
assert dl.exists()
lines = [json.loads(ln) for ln in dl.read_text().splitlines()]
assert len(lines) == 1
entry = lines[0]
assert entry["backend"] == "bad"
assert entry["alert_title"] == "my-alert"
assert "error_str" in entry
assert "timestamp" in entry
def test_queue_drop_oldest(tmp_path: Path) -> None:
dl = tmp_path / "dead.jsonl"
# slow backend: each send takes 0.5s so queue fills fast
slow = FakeBackend("slow", sleep_s=0.5)
fan = FanoutNotifier([slow], dl, queue_size=2, backoff_base=0.01)
# Pump 10 alerts rapidly; worker can't keep up
for i in range(10):
fan.send(_alert(f"a{i}"))
fan.stop(timeout=10.0)
s = fan.stats()
assert s["slow"]["dropped"] > 0
assert s["slow"]["sent"] <= 2 + 1 # queue_size + possibly 1 in-flight
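The drop-oldest policy this test relies on can be sketched with a plain `queue.Queue`; `put_drop_oldest` is an illustrative name, and the real `FanoutNotifier` worker internals are not shown here:

```python
import queue


def put_drop_oldest(q: "queue.Queue", item) -> int:
    """Enqueue item; if the queue is full, evict oldest entries to make room.

    Returns how many items were dropped, so the caller can bump a
    per-backend `dropped` counter like the one asserted in the test above.
    """
    dropped = 0
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # evict the oldest queued alert
                dropped += 1
            except queue.Empty:
                pass  # raced with the consumer; retry the put
```

Dropping the oldest (rather than rejecting the newest) keeps the most recent alerts, which is usually the right bias for a monitoring tool.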
def test_retry_backoff_recovers(tmp_path: Path) -> None:
dl = tmp_path / "dead.jsonl"
# Fails only the very first call, succeeds after
b = FakeBackend("b", fail_first_n=1)
fan = FanoutNotifier([b], dl, max_retries=3, backoff_base=0.01)
fan.send(_alert("recover"))
fan.stop(timeout=5.0)
s = fan.stats()
assert s["b"]["sent"] == 1
assert s["b"]["retries"] == 1
assert s["b"]["failed"] == 0
assert not dl.exists()
def test_stop_drains(tmp_path: Path) -> None:
dl = tmp_path / "dead.jsonl"
b = FakeBackend("b")
fan = FanoutNotifier([b], dl, backoff_base=0.01)
for i in range(5):
fan.send(_alert(f"a{i}"))
fan.stop(timeout=5.0)
# All items should have been processed before stop returned
assert fan.stats()["b"]["sent"] == 5
# ---------------------------------------------------------------------------
# DiscordNotifier unit tests (no real HTTP)
# ---------------------------------------------------------------------------
class _MockResponse:
def __init__(self, status_code: int, text: str = "") -> None:
self.status_code = status_code
self.text = text
class _MockSession:
def __init__(self, status_code: int = 204) -> None:
self.status_code = status_code
self.calls: list[dict] = []
def post(self, url: str, **kwargs):
self.calls.append({"url": url, **kwargs})
return _MockResponse(self.status_code)
def test_discord_send_ok() -> None:
from atm.notifier.discord import DiscordNotifier
session = _MockSession(204)
n = DiscordNotifier("https://discord.example/hook", session=session)
n.send(_alert("Hello"))
assert len(session.calls) == 1
assert "**Hello**" in session.calls[0]["json"]["content"]
def test_discord_429_raises() -> None:
from atm.notifier.discord import DiscordNotifier
n = DiscordNotifier("https://discord.example/hook", session=_MockSession(429))
with pytest.raises(RuntimeError, match="429"):
n.send(_alert("x"))
def test_discord_5xx_raises() -> None:
from atm.notifier.discord import DiscordNotifier
n = DiscordNotifier("https://discord.example/hook", session=_MockSession(500))
with pytest.raises(RuntimeError, match="500"):
n.send(_alert("x"))
# ---------------------------------------------------------------------------
# TelegramNotifier unit tests (no real HTTP)
# ---------------------------------------------------------------------------
def test_telegram_send_ok() -> None:
from atm.notifier.telegram import TelegramNotifier
session = _MockSession(200)
n = TelegramNotifier("token", "chat123", session=session)
n.send(_alert("Hi"))
assert len(session.calls) == 1
assert "*Hi*" in session.calls[0]["json"]["text"]
def test_telegram_429_raises() -> None:
from atm.notifier.telegram import TelegramNotifier
n = TelegramNotifier("token", "chat123", session=_MockSession(429))
with pytest.raises(RuntimeError, match="429"):
n.send(_alert("x"))
def test_telegram_5xx_raises() -> None:
from atm.notifier.telegram import TelegramNotifier
n = TelegramNotifier("token", "chat123", session=_MockSession(500))
with pytest.raises(RuntimeError, match="500"):
n.send(_alert("x"))
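The retry accounting asserted above (`retries == max_retries` on exhaustion, one retry after a single transient failure) corresponds to a loop like the following. This is a sketch under the assumption that the worker retries per backend with exponential backoff; threading and dead-letter writing are not modelled:

```python
import time
from typing import Callable, Tuple


def send_with_retry(
    send: Callable[[object], None],
    alert: object,
    max_retries: int = 3,
    backoff_base: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[bool, int]:
    """Try send(alert); retry with exponential backoff on failure.

    Returns (delivered, retries_used). With max_retries=3 an always-failing
    backend is attempted 4 times total (1 initial + 3 retries), matching
    test_dead_letter_on_exhausted_retries.
    """
    retries = 0
    while True:
        try:
            send(alert)
            return True, retries
        except Exception:
            if retries >= max_retries:
                return False, retries  # caller writes the dead-letter entry
            sleep(backoff_base * (2 ** retries))
            retries += 1
```

Injecting `sleep` (as the tests inject `backoff_base=0.01`) keeps retry tests fast and deterministic.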

327
tests/test_state_machine.py Normal file

@@ -0,0 +1,327 @@
"""Tests for atm.state_machine."""
from __future__ import annotations
import pytest
from atm.state_machine import DotColor, State, StateMachine, Transition
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _drive(sm: StateMachine, events: list[tuple[DotColor, float]]) -> list[Transition]:
return [sm.feed(color, ts) for color, ts in events]
def _buy_sequence_to_primed(sm: StateMachine, start_ts: float = 1.0) -> None:
"""Drive sm from IDLE → PRIMED_BUY (does not fire)."""
sm.feed("turquoise", start_ts) # → ARMED_BUY
sm.feed("dark_green", start_ts + 1) # → PRIMED_BUY
def _sell_sequence_to_primed(sm: StateMachine, start_ts: float = 1.0) -> None:
"""Drive sm from IDLE → PRIMED_SELL (does not fire)."""
sm.feed("yellow", start_ts) # → ARMED_SELL
sm.feed("dark_red", start_ts + 1) # → PRIMED_SELL
# ---------------------------------------------------------------------------
# 1. clean_buy
# ---------------------------------------------------------------------------
def test_clean_buy() -> None:
sm = StateMachine()
t1 = sm.feed("turquoise", 1.0)
assert t1.prev == State.IDLE
assert t1.next == State.ARMED_BUY
assert t1.reason == "arm"
assert sm.state == State.ARMED_BUY
t2 = sm.feed("gray", 2.0)
assert t2.prev == State.ARMED_BUY
assert t2.next == State.ARMED_BUY
assert t2.reason == "persist"
assert sm.state == State.ARMED_BUY
t3 = sm.feed("dark_green", 3.0)
assert t3.prev == State.ARMED_BUY
assert t3.next == State.PRIMED_BUY
assert t3.reason == "prime"
assert sm.state == State.PRIMED_BUY
t4 = sm.feed("light_green", 4.0)
assert t4.prev == State.PRIMED_BUY
assert t4.next == State.IDLE
assert t4.reason == "fire"
assert t4.trigger == "BUY"
assert t4.locked is False
assert sm.state == State.IDLE
# ---------------------------------------------------------------------------
# 2. clean_sell (mirror of clean_buy)
# ---------------------------------------------------------------------------
def test_clean_sell() -> None:
sm = StateMachine()
t1 = sm.feed("yellow", 1.0)
assert t1.prev == State.IDLE
assert t1.next == State.ARMED_SELL
assert t1.reason == "arm"
t2 = sm.feed("gray", 2.0)
assert t2.prev == State.ARMED_SELL
assert t2.next == State.ARMED_SELL
assert t2.reason == "persist"
t3 = sm.feed("dark_red", 3.0)
assert t3.prev == State.ARMED_SELL
assert t3.next == State.PRIMED_SELL
assert t3.reason == "prime"
t4 = sm.feed("light_red", 4.0)
assert t4.prev == State.PRIMED_SELL
assert t4.next == State.IDLE
assert t4.reason == "fire"
assert t4.trigger == "SELL"
assert t4.locked is False
# ---------------------------------------------------------------------------
# 3. cooled
# ---------------------------------------------------------------------------
def test_cooled() -> None:
sm = StateMachine()
_buy_sequence_to_primed(sm, start_ts=1.0)
assert sm.state == State.PRIMED_BUY
t = sm.feed("gray", 10.0)
assert t.prev == State.PRIMED_BUY
assert t.next == State.IDLE
assert t.reason == "cooled"
assert t.trigger is None
# ---------------------------------------------------------------------------
# 4. opposite_rearm from ARMED_BUY
# ---------------------------------------------------------------------------
def test_opposite_rearm_from_armed_buy() -> None:
sm = StateMachine()
sm.feed("turquoise", 1.0) # → ARMED_BUY
assert sm.state == State.ARMED_BUY
t = sm.feed("yellow", 2.0)
assert t.prev == State.ARMED_BUY
assert t.next == State.ARMED_SELL
assert t.reason == "opposite_rearm"
assert sm.state == State.ARMED_SELL
# ---------------------------------------------------------------------------
# 5. opposite_rearm from PRIMED_BUY
# ---------------------------------------------------------------------------
def test_opposite_rearm_from_primed_buy() -> None:
sm = StateMachine()
_buy_sequence_to_primed(sm, start_ts=1.0)
assert sm.state == State.PRIMED_BUY
t = sm.feed("yellow", 5.0)
assert t.prev == State.PRIMED_BUY
assert t.next == State.ARMED_SELL
assert t.reason == "opposite_rearm"
assert sm.state == State.ARMED_SELL
# ---------------------------------------------------------------------------
# 6. lockout_same_direction
# ---------------------------------------------------------------------------
def test_lockout_same_direction() -> None:
sm = StateMachine(lockout_s=240)
# First fire at t=100
_buy_sequence_to_primed(sm, start_ts=90.0)
t_fire1 = sm.feed("light_green", 100.0)
assert t_fire1.trigger == "BUY"
assert t_fire1.locked is False
# Re-prime
_buy_sequence_to_primed(sm, start_ts=110.0)
# Second fire at t=200 — inside lockout window (200 - 100 = 100 < 240)
t_fire2 = sm.feed("light_green", 200.0)
assert t_fire2.trigger == "BUY"
assert t_fire2.locked is True
assert t_fire2.next == State.IDLE
# Lockout is measured from the last fire (t=200); the window ends at 200 + 240 = 440.
# Re-prime and fire at t=441: 241 s elapsed >= 240 → unlocked.
_buy_sequence_to_primed(sm, start_ts=430.0)
t_fire3 = sm.feed("light_green", 441.0)
assert t_fire3.trigger == "BUY"
assert t_fire3.locked is False
def test_lockout_same_direction_boundary() -> None:
"""Boundary: 239 s since last fire (< 240) → locked; 241 s (>= 240) → unlocked."""
sm = StateMachine(lockout_s=240)
_buy_sequence_to_primed(sm, start_ts=90.0)
t_fire1 = sm.feed("light_green", 100.0)
assert t_fire1.locked is False
# Re-prime and fire just inside window: 339-100=239 < 240 → locked
_buy_sequence_to_primed(sm, start_ts=110.0)
t_locked = sm.feed("light_green", 339.0)
assert t_locked.locked is True
# last_fire is now 339. Re-prime and fire just outside: 580-339=241 >= 240 → unlocked
_buy_sequence_to_primed(sm, start_ts=340.0)
t_free = sm.feed("light_green", 580.0)
assert t_free.locked is False
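Both lockout tests reduce the boundary semantics to a strict less-than comparison. As a standalone predicate (illustrative name, not the repo's API):

```python
from typing import Optional


def lockout_active(
    now: float, last_fire_ts: Optional[float], lockout_s: float = 240.0
) -> bool:
    """True while a same-direction fire should be flagged as locked.

    Strict `<`: exactly lockout_s after the last fire is already unlocked,
    matching the 241 >= 240 → free case in the boundary test above.
    """
    return last_fire_ts is not None and (now - last_fire_ts) < lockout_s
```

For example, `lockout_active(339.0, 100.0)` is True (239 s elapsed) while `lockout_active(580.0, 339.0)` is False (241 s elapsed).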
# ---------------------------------------------------------------------------
# 7. lockout_does_not_block_opposite
# ---------------------------------------------------------------------------
def test_lockout_does_not_block_opposite() -> None:
sm = StateMachine(lockout_s=240)
# Fire BUY at t=100
_buy_sequence_to_primed(sm, start_ts=90.0)
sm.feed("light_green", 100.0)
# Drive SELL sequence — opposite direction must not be locked
_sell_sequence_to_primed(sm, start_ts=110.0)
t_sell = sm.feed("light_red", 200.0)
assert t_sell.trigger == "SELL"
assert t_sell.locked is False
# ---------------------------------------------------------------------------
# 8. phase_skip from ARMED_BUY (light_green without priming)
# ---------------------------------------------------------------------------
def test_phase_skip_armed_buy() -> None:
sm = StateMachine()
sm.feed("turquoise", 1.0) # → ARMED_BUY
assert sm.state == State.ARMED_BUY
t = sm.feed("light_green", 2.0)
assert t.prev == State.ARMED_BUY
assert t.next == State.IDLE
assert t.reason == "phase_skip"
assert t.trigger is None
# ---------------------------------------------------------------------------
# 9. noise_from_idle
# ---------------------------------------------------------------------------
def test_noise_from_idle() -> None:
sm = StateMachine()
t = sm.feed("dark_green", 1.0)
assert t.prev == State.IDLE
assert t.next == State.IDLE
assert t.reason == "noise"
assert t.trigger is None
assert sm.state == State.IDLE
# ---------------------------------------------------------------------------
# 10. refresh_arm_ts
# ---------------------------------------------------------------------------
def test_refresh_arm_ts() -> None:
sm = StateMachine()
sm.feed("turquoise", 1.0) # arm at t=1
t1 = sm.feed("turquoise", 5.0) # refresh at t=5
assert t1.prev == State.ARMED_BUY
assert t1.next == State.ARMED_BUY
assert t1.reason == "refresh"
assert t1.arm_ts == 5.0
t2 = sm.feed("turquoise", 9.0) # refresh again at t=9
assert t2.arm_ts == 9.0
# ---------------------------------------------------------------------------
# 11. exhaustive — parameterize over every (state, color) pair
# ---------------------------------------------------------------------------
ALL_STATES = list(State)
ALL_COLORS: list[DotColor] = [
"turquoise", "yellow", "dark_green", "dark_red",
"light_green", "light_red", "gray",
]
FIRE_DIRECTIONS: dict[str, str] = {
State.PRIMED_BUY.value: "BUY",
State.PRIMED_SELL.value: "SELL",
}
VALID_STATES = set(State)
def _sm_in_state(target: State) -> StateMachine:
"""Return a fresh StateMachine already in the given state."""
sm = StateMachine()
match target:
case State.IDLE:
pass
case State.ARMED_BUY:
sm.feed("turquoise", 1.0)
case State.ARMED_SELL:
sm.feed("yellow", 1.0)
case State.PRIMED_BUY:
sm.feed("turquoise", 1.0)
sm.feed("dark_green", 2.0)
case State.PRIMED_SELL:
sm.feed("yellow", 1.0)
sm.feed("dark_red", 2.0)
assert sm.state == target, f"Setup failed: wanted {target}, got {sm.state}"
return sm
@pytest.mark.parametrize("state", ALL_STATES)
@pytest.mark.parametrize("color", ALL_COLORS)
def test_exhaustive(state: State, color: DotColor) -> None:
sm = _sm_in_state(state)
t = sm.feed(color, 10.0)
# (a) resulting state is valid
assert t.next in VALID_STATES, f"Invalid next state: {t.next}"
# (b) reason is non-empty
assert t.reason, f"Empty reason for ({state}, {color})"
# (c) if fire, trigger matches direction
if t.reason == "fire":
expected_dir = FIRE_DIRECTIONS.get(state.value)
assert t.trigger == expected_dir, (
f"Wrong trigger for fire from {state}: got {t.trigger}, expected {expected_dir}"
)