All 12 modules built per reviewed plan: - detector, state_machine (5-state phased FSM), canary, levels Phase B - notifier fanout (Discord + Telegram, bounded queue, retry, dead-letter) - audit (JSONL daily rotation), journal, report (weekly R-multiple PnL) - calibrate + labeler (Tk, lazy-imported), dryrun with acceptance gate - unified CLI: atm calibrate|label|dryrun|run|journal|report README + Phase 2 prop-firm TOS audit checklist included. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
199 lines
6.0 KiB
Python
199 lines
6.0 KiB
Python
"""Tests for src/atm/detector.py."""
|
|
from __future__ import annotations
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from atm.config import (
|
|
CanaryRegion,
|
|
ColorSpec,
|
|
Config,
|
|
DiscordCfg,
|
|
ROI,
|
|
TelegramCfg,
|
|
YAxisCalib,
|
|
)
|
|
from atm.detector import DetectionResult, Detector
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
DOT_ROI = ROI(x=10, y=10, w=280, h=80)
|
|
BG_VAL = 18 # background pixel value (18, 18, 18)
|
|
|
|
# BGR values (OpenCV convention: B, G, R)
|
|
# turquoise RGB=(0,255,255) → BGR=(255,255,0)
|
|
# yellow RGB=(255,255,0) → BGR=(0,255,255)
|
|
TURQUOISE_BGR = (255, 255, 0)
|
|
YELLOW_BGR = (0, 255, 255)
|
|
# A purple-ish colour far from every palette entry (RGB=(100,150,50))
|
|
UNKNOWN_BGR = (50, 150, 100)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_frame(*dot_specs: tuple[tuple[int, int, int], int, int]) -> np.ndarray:
|
|
"""Create a (100, 300, 3) uint8 BGR frame filled with background.
|
|
|
|
Each spec is (bgr_color, roi_x_start, roi_x_end) and paints a
|
|
full-height stripe inside DOT_ROI. roi_x_end=280 reaches the right
|
|
boundary so pixel_rgb sampling stays within the dot.
|
|
"""
|
|
frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8)
|
|
for bgr, x0, x1 in dot_specs:
|
|
fx0 = DOT_ROI.x + x0
|
|
fx1 = DOT_ROI.x + x1
|
|
fy0 = DOT_ROI.y
|
|
fy1 = DOT_ROI.y + DOT_ROI.h
|
|
frame[fy0:fy1, fx0:fx1] = bgr
|
|
return frame
|
|
|
|
|
|
def _make_cfg(debounce_depth: int = 1) -> Config:
|
|
colors = {
|
|
"turquoise": ColorSpec(rgb=(0, 255, 255), tolerance=30.0),
|
|
"yellow": ColorSpec(rgb=(255, 255, 0), tolerance=30.0),
|
|
"dark_green": ColorSpec(rgb=(0, 100, 0), tolerance=30.0),
|
|
"dark_red": ColorSpec(rgb=(100, 0, 0), tolerance=30.0),
|
|
"light_green": ColorSpec(rgb=(0, 255, 0), tolerance=30.0),
|
|
"light_red": ColorSpec(rgb=(255, 0, 0), tolerance=30.0),
|
|
"gray": ColorSpec(rgb=(128, 128, 128), tolerance=30.0),
|
|
}
|
|
return Config(
|
|
window_title="test",
|
|
dot_roi=DOT_ROI,
|
|
chart_roi=ROI(x=0, y=0, w=600, h=400),
|
|
colors=colors,
|
|
y_axis=YAxisCalib(p1_y=0, p1_price=100.0, p2_y=400, p2_price=80.0),
|
|
canary=CanaryRegion(
|
|
roi=ROI(x=0, y=0, w=50, h=50),
|
|
baseline_phash="0" * 64,
|
|
drift_threshold=8,
|
|
),
|
|
discord=DiscordCfg(webhook_url="http://example.com/hook"),
|
|
telegram=TelegramCfg(bot_token="tok", chat_id="123"),
|
|
debounce_depth=debounce_depth,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_empty_roi_no_dot() -> None:
|
|
"""All-background frame → dot not found."""
|
|
frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8)
|
|
cfg = _make_cfg()
|
|
det = Detector(cfg, capture=lambda: frame)
|
|
|
|
r = det.step(0.0)
|
|
|
|
assert r.window_found is True
|
|
assert r.dot_found is False
|
|
assert r.rgb is None
|
|
assert r.match is None
|
|
assert r.accepted is False
|
|
|
|
|
|
def test_rightmost_cluster() -> None:
|
|
"""Two dots at different x positions → detector returns rightmost colour."""
|
|
# turquoise on the left, yellow extending to the right ROI edge
|
|
frame = _make_frame(
|
|
(TURQUOISE_BGR, 50, 100), # roi_x [50, 100)
|
|
(YELLOW_BGR, 200, 280), # roi_x [200, 280) → right edge
|
|
)
|
|
cfg = _make_cfg()
|
|
det = Detector(cfg, capture=lambda: frame)
|
|
|
|
r = det.step(0.0)
|
|
|
|
assert r.dot_found is True
|
|
assert r.match is not None
|
|
assert r.match.name == "yellow"
|
|
|
|
|
|
def test_debounce_depth_1() -> None:
|
|
"""depth=1: single valid frame → accepted=True."""
|
|
frame = _make_frame((YELLOW_BGR, 200, 280))
|
|
cfg = _make_cfg(debounce_depth=1)
|
|
det = Detector(cfg, capture=lambda: frame)
|
|
|
|
r = det.step(0.0)
|
|
|
|
assert r.accepted is True
|
|
assert r.color == "yellow"
|
|
|
|
|
|
def test_debounce_depth_2() -> None:
|
|
"""depth=2: first frame → accepted=False; second same → accepted=True."""
|
|
frame = _make_frame((YELLOW_BGR, 200, 280))
|
|
cfg = _make_cfg(debounce_depth=2)
|
|
det = Detector(cfg, capture=lambda: frame)
|
|
|
|
r1 = det.step(0.0)
|
|
r2 = det.step(1.0)
|
|
|
|
assert r1.accepted is False
|
|
assert r2.accepted is True
|
|
assert r2.color == "yellow"
|
|
|
|
|
|
def test_debounce_reset_on_change() -> None:
|
|
"""depth=2: A then B → neither accepted."""
|
|
frame_a = _make_frame((TURQUOISE_BGR, 200, 280))
|
|
frame_b = _make_frame((YELLOW_BGR, 200, 280))
|
|
cfg = _make_cfg(debounce_depth=2)
|
|
frames = iter([frame_a, frame_b])
|
|
det = Detector(cfg, capture=lambda: next(frames))
|
|
|
|
r1 = det.step(0.0)
|
|
r2 = det.step(1.0)
|
|
|
|
assert r1.accepted is False
|
|
assert r2.accepted is False
|
|
|
|
|
|
def test_unknown_not_accepted() -> None:
|
|
"""Colour outside every palette tolerance → UNKNOWN, accepted=False."""
|
|
frame = _make_frame((UNKNOWN_BGR, 200, 280))
|
|
cfg = _make_cfg(debounce_depth=1)
|
|
det = Detector(cfg, capture=lambda: frame)
|
|
|
|
r = det.step(0.0)
|
|
|
|
assert r.dot_found is True
|
|
assert r.match is not None
|
|
assert r.match.name == "UNKNOWN"
|
|
assert r.accepted is False
|
|
assert r.color is None
|
|
|
|
|
|
def test_window_lost() -> None:
|
|
"""capture() returns None → window_found=False, safe defaults."""
|
|
cfg = _make_cfg()
|
|
det = Detector(cfg, capture=lambda: None)
|
|
|
|
r = det.step(0.0)
|
|
|
|
assert r.window_found is False
|
|
assert r.dot_found is False
|
|
assert r.rgb is None
|
|
assert r.match is None
|
|
assert r.accepted is False
|
|
assert r.color is None
|
|
|
|
|
|
def test_rolling_window() -> None:
|
|
"""Rolling window never exceeds 20 entries."""
|
|
frame = _make_frame((YELLOW_BGR, 200, 280))
|
|
cfg = _make_cfg()
|
|
det = Detector(cfg, capture=lambda: frame)
|
|
|
|
for i in range(25):
|
|
det.step(float(i))
|
|
|
|
assert len(det.rolling) <= 20
|
|
assert len(det.rolling) == 20
|