Compare commits
5 Commits
d7305fbbfc
...
worktree-a
| Author | SHA1 | Date | |
|---|---|---|---|
| 3cb647e084 | |||
|
|
8ff31ed241 | ||
|
|
eca2b39e64 | ||
|
|
51e98ae3d3 | ||
|
|
840c23f74c |
@@ -181,6 +181,7 @@ atm report --week 2026-16 # weekly win rate + R PnL + slippage
|
||||
| `WARN: no window contains 'xxx'` at startup | `cfg.window_title` substring matches nothing | Edit `window_title` in TOML to a substring that's unique to TradeStation. |
|
||||
| No alerts even after trigger ought to fire | Check `logs/YYYY-MM-DD.jsonl` for `event=tick` entries — are colours accepted? Is `trigger` ever set? | If always UNKNOWN → tolerances too tight. If `trigger` but `locked=true` → lockout from prior fire, normal. |
|
||||
| Discord OK, Telegram silent (or vice versa) | `logs/dead_letter.jsonl` contains failed alerts with error | Fix credentials in TOML, restart. |
|
||||
| Heartbeat shows `telegram: failed > 0` | Telegram returned `ok:false` (bot blocked, invalid chat_id, parse error) | Check `logs/dead_letter.jsonl` for the `error_str` / `description` field. Common: bot never started by user in Telegram, or wrong `chat_id` flavor (channel vs group vs DM). |
|
||||
| Debug circle on mid-strip instead of right edge | Anti-aliasing bridges dots in the mask | Already fixed via erosion+connected-components — ensure `git pull` is current. |
|
||||
| Wizard window is tiny / image not visible | Tk geometry default on Windows | Already fixed — `git pull`. Image is scaled to fit screen. |
|
||||
|
||||
|
||||
@@ -80,3 +80,12 @@ low_conf_threshold = 0.2
|
||||
low_conf_run = 3
|
||||
phaseb_timeout_s = 600
|
||||
dead_letter_path = "logs/dead_letter.jsonl"
|
||||
|
||||
# Per-kind screenshot-attach toggles. All default to true on upgrade.
|
||||
# Accepts either a bare bool (legacy: attach_screenshots = true) or this table.
|
||||
[options.attach_screenshots]
|
||||
late_start = true # screenshot on startup-late alerts
|
||||
catchup = true # screenshot on mid-session catchup arm + prime
|
||||
arm = true # screenshot on normal arm (turquoise/yellow) — noisiest
|
||||
prime = true # screenshot on normal prime (dark_green/dark_red)
|
||||
trigger = true # screenshot on FIRE
|
||||
|
||||
33
samples/calibration_labels.README.md
Normal file
33
samples/calibration_labels.README.md
Normal file
@@ -0,0 +1,33 @@
|
||||
# calibration_labels.json — schema
|
||||
|
||||
Used by `atm validate-calibration` to check that the current color calibration
|
||||
classifies known-good screenshots correctly before a live session.
|
||||
|
||||
## Schema
|
||||
|
||||
A JSON array of entries. Each entry:
|
||||
|
||||
| Field | Type | Required | Description |
|
||||
|------------|---------|----------|----------------------------------------------------------------|
|
||||
| `path` | string | yes | Path to a PNG frame (relative to CWD or absolute). |
|
||||
| `expected` | string | yes | Expected color name: one of `turquoise`, `yellow`, `dark_green`, `dark_red`, `light_green`, `light_red`, `gray`. |
|
||||
| `note` | string | no | Freeform annotation; shown in SUGGESTIONS output. |
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
atm validate-calibration samples/calibration_labels.json
|
||||
```
|
||||
|
||||
Exit codes:
|
||||
- `0` — every sample PASS
|
||||
- `1` — one or more FAIL
|
||||
- `2` — label file missing or malformed JSON
|
||||
|
||||
## Adding new samples
|
||||
|
||||
1. Find a screenshot in `logs/fires/` whose dot color you can verify by eye.
|
||||
2. Append an entry with `path`, `expected`, and an optional `note`.
|
||||
3. Re-run validation. If it FAILs, the SUGGESTIONS section will tell you the
|
||||
RGB distance between the observed pixel and the expected color's center —
|
||||
use that as input for `atm calibrate`.
|
||||
17
samples/calibration_labels.json
Normal file
17
samples/calibration_labels.json
Normal file
@@ -0,0 +1,17 @@
|
||||
[
|
||||
{
|
||||
"path": "logs/fires/20260417_201500_arm_sell.png",
|
||||
"expected": "yellow",
|
||||
"note": "first arm of SELL cycle 2026-04-17"
|
||||
},
|
||||
{
|
||||
"path": "logs/fires/20260417_205302_ss.png",
|
||||
"expected": "dark_red",
|
||||
"note": "user confirmed via screenshot (missed live alert)"
|
||||
},
|
||||
{
|
||||
"path": "logs/fires/20260417_210441_ss.png",
|
||||
"expected": "light_red",
|
||||
"note": "fire phase (missed live alert)"
|
||||
}
|
||||
]
|
||||
@@ -2,7 +2,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import tomllib
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
@@ -84,6 +84,16 @@ class TelegramCfg:
|
||||
raise ValueError("telegram bot_token + chat_id required")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AlertsCfg:
|
||||
"""Per-kind screenshot-attach toggles. Default all True — zero behavior change."""
|
||||
late_start: bool = True
|
||||
catchup: bool = True
|
||||
arm: bool = True
|
||||
prime: bool = True
|
||||
trigger: bool = True
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Config:
|
||||
window_title: str
|
||||
@@ -103,6 +113,7 @@ class Config:
|
||||
low_conf_run: int = 3
|
||||
phaseb_timeout_s: int = 600
|
||||
dead_letter_path: str = "logs/dead_letter.jsonl"
|
||||
attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg)
|
||||
config_version: str = "unknown"
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
@@ -153,6 +164,18 @@ class Config:
|
||||
region = None
|
||||
if "chart_window_region" in data:
|
||||
region = ROI(**data["chart_window_region"])
|
||||
scr = opts.get("attach_screenshots")
|
||||
if isinstance(scr, bool):
|
||||
attach = AlertsCfg(
|
||||
late_start=scr, catchup=scr, arm=scr, prime=scr, trigger=scr,
|
||||
)
|
||||
elif isinstance(scr, dict):
|
||||
fields_set = set(AlertsCfg.__dataclass_fields__)
|
||||
attach = AlertsCfg(
|
||||
**{k: bool(v) for k, v in scr.items() if k in fields_set}
|
||||
)
|
||||
else:
|
||||
attach = AlertsCfg()
|
||||
return cls(
|
||||
window_title=data["window_title"],
|
||||
dot_roi=roi,
|
||||
@@ -171,5 +194,6 @@ class Config:
|
||||
low_conf_run=int(opts.get("low_conf_run", 3)),
|
||||
phaseb_timeout_s=int(opts.get("phaseb_timeout_s", 600)),
|
||||
dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"),
|
||||
attach_screenshots=attach,
|
||||
config_version=version,
|
||||
)
|
||||
|
||||
259
src/atm/main.py
259
src/atm/main.py
@@ -7,7 +7,7 @@ import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Protocol, cast
|
||||
from typing import TYPE_CHECKING, Callable, Protocol, cast
|
||||
|
||||
from atm.config import Config # stdlib-only (tomllib); safe at module level
|
||||
from atm.notifier import Alert
|
||||
@@ -17,6 +17,12 @@ if TYPE_CHECKING:
|
||||
from atm.state_machine import DotColor
|
||||
|
||||
|
||||
# Snapshot closure: called by _handle_tick when an alert is about to fire.
|
||||
# Takes (kind, label); returns path to annotated screenshot or None (gated off,
|
||||
# cv2 missing, or write failure — in which case the alert still sends text-only).
|
||||
Snapshot = Callable[[str, str], "Path | None"]
|
||||
|
||||
|
||||
class _NotifierLike(Protocol):
|
||||
def send(self, alert: Alert) -> None: ...
|
||||
|
||||
@@ -109,6 +115,16 @@ def main(argv=None) -> None:
|
||||
metavar="PATH", help="Journal JSONL file (default: trades.jsonl)",
|
||||
)
|
||||
|
||||
# validate-calibration
|
||||
p_valid = sub.add_parser(
|
||||
"validate-calibration",
|
||||
help="Offline: run Detector on labeled frames and report PASS/FAIL",
|
||||
)
|
||||
p_valid.add_argument(
|
||||
"label_file", type=Path, metavar="LABEL_FILE",
|
||||
help="JSON array with [{path, expected, note?}, ...] entries",
|
||||
)
|
||||
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
_dispatch = {
|
||||
@@ -119,6 +135,7 @@ def main(argv=None) -> None:
|
||||
"debug": _cmd_debug,
|
||||
"journal": _cmd_journal,
|
||||
"report": _cmd_report,
|
||||
"validate-calibration": _cmd_validate_calibration,
|
||||
}
|
||||
_dispatch[args.command](args)
|
||||
|
||||
@@ -331,10 +348,82 @@ def _cmd_report(args) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _cmd_validate_calibration(args) -> None:
|
||||
"""Run offline calibration validation; exit 0 on 100% PASS, 1 otherwise."""
|
||||
try:
|
||||
from atm.validate import validate_calibration, ValidationError
|
||||
except ImportError as exc:
|
||||
sys.exit(f"validate module not available: {exc}")
|
||||
|
||||
label_file = Path(args.label_file)
|
||||
try:
|
||||
cfg = Config.load_current(Path("configs"))
|
||||
except FileNotFoundError as exc:
|
||||
sys.exit(f"config not found: {exc}")
|
||||
|
||||
try:
|
||||
config_name = ""
|
||||
cur_ptr = Path("configs") / "current.txt"
|
||||
if cur_ptr.exists():
|
||||
config_name = cur_ptr.read_text(encoding="utf-8").strip()
|
||||
except Exception:
|
||||
config_name = ""
|
||||
|
||||
try:
|
||||
report = validate_calibration(label_file, cfg, config_name=config_name)
|
||||
except ValidationError as exc:
|
||||
print(f"error: {exc}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
print(report.render())
|
||||
sys.exit(0 if report.all_pass else 1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Live loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _save_annotated_frame(
|
||||
frame,
|
||||
cfg,
|
||||
fires_dir: Path,
|
||||
label: str,
|
||||
now: float,
|
||||
audit: _AuditLike | None = None,
|
||||
) -> "Path | None":
|
||||
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
|
||||
|
||||
Returns the path on success, ``None`` on any error. Failures are logged to
|
||||
audit (when provided) so disk-full / permission issues don't become silent
|
||||
regressions. Never raises — snapshot is a best-effort enhancement, the
|
||||
text alert must still go out.
|
||||
"""
|
||||
try:
|
||||
import cv2 # type: ignore[import-untyped]
|
||||
except ImportError as exc:
|
||||
if audit is not None:
|
||||
try:
|
||||
audit.log({"ts": now, "event": "snapshot_fail", "label": label, "error": f"cv2 missing: {exc}"})
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
try:
|
||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
||||
fpath = fires_dir / f"{ts_str}_{label}.png"
|
||||
annotated = frame.copy()
|
||||
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
|
||||
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
||||
cv2.imwrite(str(fpath), annotated)
|
||||
return fpath
|
||||
except Exception as exc:
|
||||
if audit is not None:
|
||||
try:
|
||||
audit.log({"ts": now, "event": "snapshot_fail", "label": label, "error": str(exc)})
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _handle_tick(
|
||||
fsm: StateMachine,
|
||||
color: str,
|
||||
@@ -342,16 +431,23 @@ def _handle_tick(
|
||||
notifier: _NotifierLike,
|
||||
audit: _AuditLike,
|
||||
first_accepted: bool,
|
||||
snapshot: Snapshot | None = None,
|
||||
) -> Transition | None:
|
||||
"""Feed FSM for a single accepted color and dispatch arm/prime/late_start
|
||||
alerts. Returns the final Transition, or None when the color triggered a
|
||||
late-start short-circuit (FSM untouched; caller should skip FIRE handling).
|
||||
|
||||
When ``snapshot`` is provided it is called with ``(kind, label)`` for each
|
||||
alert; it returns the PNG path to attach or None (gated off / write failed).
|
||||
Default None keeps test call sites numpy-free.
|
||||
|
||||
Pure in the sense that all state lives in the arguments — safe to unit-test
|
||||
with a FakeNotifier and FakeAudit.
|
||||
"""
|
||||
# Late start: the very first accepted color is already at FIRE phase.
|
||||
# User came online after the trade signal fired — warn and skip FSM feed.
|
||||
snap: Snapshot = snapshot or (lambda _k, _l: None)
|
||||
|
||||
# Pornire târzie: prima culoare acceptată e deja în faza FIRE.
|
||||
# Utilizatorul a venit online după ce semnalul s-a declanșat — avertizare fără feed FSM.
|
||||
if first_accepted and color in ("light_green", "light_red") and fsm.state == State.IDLE:
|
||||
direction = "BUY" if color == "light_green" else "SELL"
|
||||
audit.log({
|
||||
@@ -361,38 +457,47 @@ def _handle_tick(
|
||||
})
|
||||
notifier.send(Alert(
|
||||
kind="late_start",
|
||||
title=f"ATM started late — {direction} already fired",
|
||||
body=f"Observed {color} at startup. Check chart manually.",
|
||||
title=f"ATM pornit târziu — {direction} deja declanșat",
|
||||
body=f"Detectat {color} la pornire. Verifică graficul manual.",
|
||||
image_path=snap("late_start", f"late_start_{direction.lower()}"),
|
||||
direction=direction,
|
||||
))
|
||||
return None
|
||||
|
||||
# Catchup synth-arm: first accepted color is already at PRIME phase.
|
||||
# Drive FSM through a synthetic arm so the real PRIME transition fires a
|
||||
# normal prime alert below. Audit entry is tagged catchup:true.
|
||||
# Recuperare synth-arm: prima culoare acceptată e deja în faza PRIME.
|
||||
# Forțează FSM printr-un arm sintetic ca tranziția reală PRIME să emită
|
||||
# alertă normală mai jos. Intrarea audit e marcată catchup:true.
|
||||
#
|
||||
# Gardă contra punctelor dark_* reziduale post-FIRE: după ce light_green
|
||||
# declanșează ciclul, FSM revine la IDLE dar punctele dark_green continuă
|
||||
# restul ferestrei de 15m. Sunt zgomot, NU un semnal nou de prime.
|
||||
# Verificarea fired_in_session(direction) suprimă synth-arm în acest caz —
|
||||
# tick-ul trece la feed-ul normal _from_idle care îl clasifică drept zgomot.
|
||||
# (Un ciclu nou autentic vine întotdeauna cu turquoise/yellow proaspăt.)
|
||||
catchup = False
|
||||
if color in ("dark_green", "dark_red") and fsm.state == State.IDLE:
|
||||
assert fsm.state == State.IDLE, "synth-arm invariant: FSM must be IDLE"
|
||||
arm_color = "turquoise" if color == "dark_green" else "yellow"
|
||||
direction = "BUY" if color == "dark_green" else "SELL"
|
||||
tr_synth = fsm.feed(cast("DotColor", arm_color), now)
|
||||
audit.log({
|
||||
"ts": now,
|
||||
"event": "tick",
|
||||
"color": arm_color,
|
||||
"state": tr_synth.next.value,
|
||||
"reason": tr_synth.reason,
|
||||
"catchup": True,
|
||||
"synthesized_from": color,
|
||||
})
|
||||
notifier.send(Alert(
|
||||
kind="arm",
|
||||
title=f"{direction} armed ({arm_color}) — catchup",
|
||||
body=f"catchup — session already armed at startup "
|
||||
f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
direction=direction,
|
||||
))
|
||||
catchup = True
|
||||
if not fsm.fired_in_session(direction):
|
||||
arm_color = "turquoise" if color == "dark_green" else "yellow"
|
||||
tr_synth = fsm.feed(cast("DotColor", arm_color), now)
|
||||
audit.log({
|
||||
"ts": now,
|
||||
"event": "tick",
|
||||
"color": arm_color,
|
||||
"state": tr_synth.next.value,
|
||||
"reason": tr_synth.reason,
|
||||
"catchup": True,
|
||||
"synthesized_from": color,
|
||||
})
|
||||
notifier.send(Alert(
|
||||
kind="arm",
|
||||
title=f"{direction} armat ({arm_color}) — recuperare",
|
||||
body=f"recuperare — sesiunea era deja armată la pornire "
|
||||
f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=snap("catchup", f"catchup_arm_{direction.lower()}"),
|
||||
direction=direction,
|
||||
))
|
||||
catchup = True
|
||||
|
||||
# Normal FSM feed
|
||||
tr = fsm.feed(cast("DotColor", color), now)
|
||||
@@ -407,23 +512,27 @@ def _handle_tick(
|
||||
tick_event["catchup"] = True
|
||||
audit.log(tick_event)
|
||||
|
||||
# ARM: turquoise (BUY) / yellow (SELL) — only on fresh IDLE→ARMED
|
||||
# ARM: turquoise (BUY) / yellow (SELL) — doar la tranziție nouă IDLE→ARMED
|
||||
if tr.reason == "arm":
|
||||
direction = "BUY" if tr.next == State.ARMED_BUY else "SELL"
|
||||
notifier.send(Alert(
|
||||
kind="arm",
|
||||
title=f"{direction} armed ({color})",
|
||||
title=f"{direction} armat ({color})",
|
||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=snap("arm", f"arm_{direction.lower()}"),
|
||||
direction=direction,
|
||||
))
|
||||
# PRIME: dark_green (BUY) / dark_red (SELL) — only on ARMED→PRIMED
|
||||
# PRIME: dark_green (BUY) / dark_red (SELL) — doar la ARMED→PRIMED
|
||||
elif tr.reason == "prime":
|
||||
direction = "BUY" if tr.next == State.PRIMED_BUY else "SELL"
|
||||
suffix = " — catchup" if catchup else ""
|
||||
suffix = " — recuperare" if catchup else ""
|
||||
prime_kind = "catchup" if catchup else "prime"
|
||||
prime_label = f"prime_{direction.lower()}_catchup" if catchup else f"prime_{direction.lower()}"
|
||||
notifier.send(Alert(
|
||||
kind="prime",
|
||||
title=f"{direction} primed ({color}){suffix}",
|
||||
title=f"{direction} pregătit ({color}){suffix}",
|
||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=snap(prime_kind, prime_label),
|
||||
direction=direction,
|
||||
))
|
||||
return tr
|
||||
@@ -452,9 +561,20 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
DiscordNotifier(cfg.discord.webhook_url),
|
||||
TelegramNotifier(cfg.telegram.bot_token, cfg.telegram.chat_id),
|
||||
]
|
||||
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path))
|
||||
|
||||
# Sanity check: capture one frame, confirm canary matches calibration.
|
||||
def _on_drop(backend_name: str, dropped: Alert) -> None:
|
||||
"""Audit la depășire coadă — face eșecul silențios vizibil."""
|
||||
audit.log({
|
||||
"ts": time.time(),
|
||||
"event": "queue_overflow_drop",
|
||||
"backend": backend_name,
|
||||
"dropped_title": dropped.title,
|
||||
"dropped_kind": dropped.kind,
|
||||
})
|
||||
|
||||
notifier = FanoutNotifier(backends, Path(cfg.dead_letter_path), on_drop=_on_drop)
|
||||
|
||||
# Verificare inițială: captură un frame, confirmă că canary se potrivește cu calibrarea.
|
||||
first_frame = capture()
|
||||
if first_frame is None:
|
||||
print("WARN: first capture returned None — window/region missing", flush=True)
|
||||
@@ -469,7 +589,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
dur_note = f" dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
|
||||
notifier.send(Alert(
|
||||
kind="heartbeat",
|
||||
title="ATM started",
|
||||
title="ATM pornit",
|
||||
body=(
|
||||
f"cfg={cfg.config_version}{dur_note} @ {datetime.now().isoformat(timespec='seconds')}\n"
|
||||
f"canary: {canary_status}"
|
||||
@@ -478,7 +598,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
|
||||
|
||||
start = time.monotonic()
|
||||
heartbeat_due = start + cfg.heartbeat_min * 60
|
||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
||||
levels_extractor = None
|
||||
last_saved_color: str | None = None
|
||||
first_accepted = True
|
||||
@@ -519,12 +639,24 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
if res.accepted and res.color:
|
||||
is_first = first_accepted
|
||||
first_accepted = False
|
||||
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first)
|
||||
|
||||
# Per-iteration closure — binds current frame/now, gates on config.
|
||||
def _snapshot(kind: str, label: str) -> "Path | None":
|
||||
if not getattr(cfg.attach_screenshots, kind, True):
|
||||
return None
|
||||
return _save_annotated_frame(
|
||||
frame, cfg, fires_dir, label, now, audit=audit,
|
||||
)
|
||||
|
||||
tr = _handle_tick(
|
||||
fsm, res.color, now, notifier, audit, is_first,
|
||||
snapshot=_snapshot,
|
||||
)
|
||||
if tr is None:
|
||||
# late_start short-circuit: FSM untouched, skip FIRE + corpus save
|
||||
# pornire târzie: FSM neatins, sari peste FIRE + salvare corpus
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
continue
|
||||
# corpus: save full frame on each new distinct color for later labeling
|
||||
# corpus: salvează frame complet la fiecare culoare nouă distinctă, pt etichetare ulterioară
|
||||
if res.color != last_saved_color:
|
||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
||||
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
|
||||
@@ -533,20 +665,16 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
except Exception as exc:
|
||||
audit.log({"ts": now, "event": "sample_save_failed", "error": str(exc)})
|
||||
last_saved_color = res.color
|
||||
# FIRE: annotate frame + save, attach to alert
|
||||
# FIRE: adnotează frame-ul + salvează, atașează la alertă
|
||||
if tr.trigger and not tr.locked:
|
||||
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
|
||||
fire_path = fires_dir / f"{ts_str}_{tr.trigger}.png"
|
||||
try:
|
||||
annotated = frame.copy()
|
||||
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
|
||||
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
||||
cv2.imwrite(str(fire_path), annotated)
|
||||
except Exception:
|
||||
fire_path = None
|
||||
fire_path: "Path | None" = None
|
||||
if cfg.attach_screenshots.trigger:
|
||||
fire_path = _save_annotated_frame(
|
||||
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="trigger",
|
||||
title=f"{tr.trigger} signal",
|
||||
title=f"Semnal {tr.trigger}",
|
||||
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
|
||||
image_path=fire_path,
|
||||
direction=tr.trigger,
|
||||
@@ -559,7 +687,7 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
if lr.status == "complete" and lr.levels:
|
||||
notifier.send(Alert(
|
||||
kind="levels",
|
||||
title="Levels",
|
||||
title="Niveluri",
|
||||
body=(
|
||||
f"SL={lr.levels.sl} "
|
||||
f"TP1={lr.levels.tp1} "
|
||||
@@ -567,16 +695,35 @@ def run_live(cfg, duration_s=None, capture_stub: bool = False) -> None:
|
||||
),
|
||||
))
|
||||
levels_extractor = None
|
||||
# heartbeat
|
||||
# heartbeat — include statistici per-backend ca eșecurile silențioase
|
||||
# să apară la fiecare 30 min fără să aștepte oprirea.
|
||||
if time.time() > heartbeat_due:
|
||||
notifier.send(Alert(kind="heartbeat", title="alive", body="confidence ok"))
|
||||
try:
|
||||
stats = notifier.stats()
|
||||
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
|
||||
body_lines = ["confidence ok"]
|
||||
for name, s in stats.items():
|
||||
body_lines.append(
|
||||
f"{name}: sent={s['sent']} failed={s['failed']} "
|
||||
f"dropped={s['dropped']} retries={s['retries']}"
|
||||
)
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
|
||||
except Exception:
|
||||
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
|
||||
heartbeat_due = time.time() + cfg.heartbeat_min * 60
|
||||
time.sleep(cfg.loop_interval_s)
|
||||
finally:
|
||||
try:
|
||||
stats = notifier.stats()
|
||||
lines = [f"după {time.monotonic() - start:.0f}s"]
|
||||
for name, s in stats.items():
|
||||
lines.append(
|
||||
f"{name}: sent={s['sent']} failed={s['failed']} "
|
||||
f"dropped={s['dropped']} retries={s['retries']}"
|
||||
)
|
||||
notifier.send(Alert(
|
||||
kind="heartbeat", title="ATM stopped",
|
||||
body=f"after {time.monotonic() - start:.0f}s",
|
||||
kind="heartbeat", title="ATM oprit",
|
||||
body="\n".join(lines),
|
||||
))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -7,7 +7,7 @@ import time
|
||||
from copy import copy
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
from . import Alert, Notifier
|
||||
|
||||
@@ -30,12 +30,14 @@ class FanoutNotifier:
|
||||
queue_size: int = 50,
|
||||
max_retries: int = 3,
|
||||
backoff_base: float = 0.5,
|
||||
on_drop: Callable[[str, Alert], None] | None = None,
|
||||
) -> None:
|
||||
self._backends = backends
|
||||
self._dead_letter_path = Path(dead_letter_path)
|
||||
self._queue_size = queue_size
|
||||
self._max_retries = max_retries
|
||||
self._backoff_base = backoff_base
|
||||
self._on_drop = on_drop
|
||||
self._dl_lock = threading.Lock()
|
||||
|
||||
self._queues: dict[str, queue.Queue[Any]] = {}
|
||||
@@ -62,8 +64,13 @@ class FanoutNotifier:
|
||||
stats = self._stats[backend.name]
|
||||
if q.full():
|
||||
try:
|
||||
q.get_nowait()
|
||||
dropped = q.get_nowait()
|
||||
stats.dropped += 1
|
||||
if self._on_drop is not None and isinstance(dropped, Alert):
|
||||
try:
|
||||
self._on_drop(backend.name, dropped)
|
||||
except Exception:
|
||||
pass # audit failure must never break dispatch
|
||||
except queue.Empty:
|
||||
pass
|
||||
q.put(alert_copy)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import html as _html
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -22,7 +23,7 @@ class TelegramNotifier:
|
||||
return _BASE.format(token=self._token, method=method)
|
||||
|
||||
def send(self, alert: Alert) -> None:
|
||||
text = f"*{alert.title}*\n{alert.body}"
|
||||
text = f"<b>{_html.escape(alert.title)}</b>\n{_html.escape(alert.body)}"
|
||||
|
||||
if alert.image_path and Path(alert.image_path).exists():
|
||||
with open(alert.image_path, "rb") as fh:
|
||||
@@ -31,7 +32,7 @@ class TelegramNotifier:
|
||||
data={
|
||||
"chat_id": self._chat_id,
|
||||
"caption": text,
|
||||
"parse_mode": "Markdown",
|
||||
"parse_mode": "HTML",
|
||||
},
|
||||
files={"photo": fh},
|
||||
timeout=10,
|
||||
@@ -42,7 +43,7 @@ class TelegramNotifier:
|
||||
json={
|
||||
"chat_id": self._chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "Markdown",
|
||||
"parse_mode": "HTML",
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
@@ -53,3 +54,15 @@ class TelegramNotifier:
|
||||
raise RuntimeError(f"Telegram server error ({resp.status_code}): {resp.text}")
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(f"Telegram unexpected status ({resp.status_code}): {resp.text}")
|
||||
|
||||
# Telegram Bot API returns 200 OK with {"ok": false, ...} for several
|
||||
# logical failures (bot blocked, invalid chat_id, parse_mode errors).
|
||||
# Parse body and raise on ok:false so Fanout retries + DLQs + stats count it.
|
||||
try:
|
||||
body = resp.json()
|
||||
except ValueError:
|
||||
return # non-JSON body — treat as success (shouldn't happen in practice)
|
||||
if isinstance(body, dict) and body.get("ok") is False:
|
||||
code = body.get("error_code", "?")
|
||||
desc = body.get("description", "unknown")
|
||||
raise RuntimeError(f"Telegram logical failure ({code}): {desc}")
|
||||
|
||||
@@ -59,6 +59,16 @@ class StateMachine:
|
||||
def state(self) -> State:
|
||||
return self._state
|
||||
|
||||
def fired_in_session(self, direction: str) -> bool:
|
||||
"""True if this FSM instance has recorded any FIRE in the given direction.
|
||||
|
||||
Used by catchup logic to suppress synth-arm after a cycle already fired.
|
||||
Residual dark_green / dark_red dots for the remainder of the 15m window
|
||||
look identical to startup-catchup from (color, state) alone, but must
|
||||
be classified as noise, not a new prime signal.
|
||||
"""
|
||||
return direction in self._last_fire
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Core feed
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
229
src/atm/validate.py
Normal file
229
src/atm/validate.py
Normal file
@@ -0,0 +1,229 @@
|
||||
"""Offline calibration validation: run Detector on labeled frames, report PASS/FAIL.
|
||||
|
||||
Used by the `atm validate-calibration` subcommand. Reports per-sample detection
|
||||
results against expected labels, and for failures, computes RGB distance to
|
||||
each color threshold and emits tuning suggestions.
|
||||
|
||||
Reuses `Detector.step(frame)` - does NOT reimplement color classification.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .config import Config
|
||||
|
||||
|
||||
@dataclass
|
||||
class SampleRecord:
|
||||
path: str
|
||||
expected: str
|
||||
detected: str | None
|
||||
confidence: float
|
||||
rgb: tuple[int, int, int] | None
|
||||
top3: list[tuple[str, float]] # [(name, score), ...] ranked by RGB distance
|
||||
passed: bool
|
||||
note: str = ""
|
||||
error: str | None = None # non-None if frame load failed / schema bad
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationReport:
|
||||
records: list[SampleRecord] = field(default_factory=list)
|
||||
config_name: str = ""
|
||||
|
||||
@property
|
||||
def total(self) -> int:
|
||||
return len(self.records)
|
||||
|
||||
@property
|
||||
def passed(self) -> int:
|
||||
return sum(1 for r in self.records if r.passed)
|
||||
|
||||
@property
|
||||
def failed(self) -> int:
|
||||
return self.total - self.passed
|
||||
|
||||
@property
|
||||
def all_pass(self) -> bool:
|
||||
return self.total > 0 and self.failed == 0
|
||||
|
||||
def render(self) -> str:
|
||||
lines: list[str] = []
|
||||
hdr = f"Testing {self.total} frames"
|
||||
if self.config_name:
|
||||
hdr += f" against config {self.config_name}"
|
||||
hdr += "..."
|
||||
lines.append(hdr)
|
||||
lines.append("")
|
||||
|
||||
for r in self.records:
|
||||
name = Path(r.path).name or r.path
|
||||
if r.error:
|
||||
lines.append(f" [FAIL] {name}")
|
||||
lines.append(f" error: {r.error}")
|
||||
continue
|
||||
tag = "PASS" if r.passed else "FAIL"
|
||||
rgb_str = f"RGB {r.rgb}" if r.rgb is not None else "RGB n/a"
|
||||
detected = r.detected if r.detected is not None else "none"
|
||||
lines.append(f" [{tag}] {name}")
|
||||
lines.append(
|
||||
f" expected={r.expected} detected={detected} "
|
||||
f"(conf {r.confidence:.2f}, {rgb_str})"
|
||||
)
|
||||
if not r.passed and r.top3:
|
||||
top3_str = " ".join(f"{n}({c:.2f})" for n, c in r.top3)
|
||||
lines.append(f" Top 3 candidates: {top3_str}")
|
||||
|
||||
lines.append("")
|
||||
pct = (self.passed / self.total * 100.0) if self.total else 0.0
|
||||
lines.append(f"SUMMARY: {self.passed}/{self.total} PASS ({pct:.0f}%)")
|
||||
|
||||
fails = [r for r in self.records if not r.passed]
|
||||
if fails:
|
||||
lines.append("FAILED:")
|
||||
for r in fails:
|
||||
name = Path(r.path).name or r.path
|
||||
if r.error:
|
||||
lines.append(f" - {name}: {r.error}")
|
||||
continue
|
||||
detected = r.detected if r.detected is not None else "none"
|
||||
lines.append(
|
||||
f" - {name}: expected {r.expected}, got {detected}"
|
||||
)
|
||||
|
||||
sug_lines = [
|
||||
r._suggestion # type: ignore[attr-defined]
|
||||
for r in fails
|
||||
if getattr(r, "_suggestion", "")
|
||||
]
|
||||
if sug_lines:
|
||||
lines.append("")
|
||||
lines.append("SUGGESTIONS:")
|
||||
for s in sug_lines:
|
||||
lines.append(f" - {s}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.render()
|
||||
|
||||
|
||||
class ValidationError(Exception):
|
||||
"""Raised for missing label files or invalid schema."""
|
||||
|
||||
|
||||
def _rgb_distance(a: tuple[int, int, int], b: tuple[int, int, int]) -> float:
|
||||
return math.sqrt(sum((a[i] - b[i]) ** 2 for i in range(3)))
|
||||
|
||||
|
||||
def _load_labels(label_file: Path) -> list[dict[str, Any]]:
|
||||
if not label_file.exists():
|
||||
raise ValidationError(f"label file not found: {label_file}")
|
||||
try:
|
||||
data = json.loads(label_file.read_text(encoding="utf-8"))
|
||||
except json.JSONDecodeError as exc:
|
||||
raise ValidationError(f"invalid JSON in {label_file}: {exc}") from exc
|
||||
if not isinstance(data, list):
|
||||
raise ValidationError(
|
||||
f"label file must be a JSON array; got {type(data).__name__}"
|
||||
)
|
||||
return data
|
||||
|
||||
|
||||
def validate_calibration(
|
||||
label_file: Path,
|
||||
cfg: Config,
|
||||
config_name: str = "",
|
||||
) -> ValidationReport:
|
||||
"""Run Detector on each labeled frame; return a ValidationReport.
|
||||
|
||||
Reuses `Detector.step(frame)`. Loads frames via cv2.imread.
|
||||
Raises ValidationError if the label file is missing or malformed.
|
||||
"""
|
||||
import cv2 # local import keeps module import cheap
|
||||
from .detector import Detector
|
||||
|
||||
entries = _load_labels(label_file)
|
||||
report = ValidationReport(config_name=config_name)
|
||||
|
||||
palette = {
|
||||
name: spec.rgb
|
||||
for name, spec in cfg.colors.items()
|
||||
if name != "background"
|
||||
}
|
||||
|
||||
detector = Detector(cfg=cfg, capture=lambda: None)
|
||||
|
||||
for entry in entries:
|
||||
path = str(entry.get("path", ""))
|
||||
expected = str(entry.get("expected", ""))
|
||||
note = str(entry.get("note", ""))
|
||||
|
||||
if not path or not expected:
|
||||
rec = SampleRecord(
|
||||
path=path, expected=expected, detected=None, confidence=0.0,
|
||||
rgb=None, top3=[], passed=False, note=note,
|
||||
error="missing 'path' or 'expected' field",
|
||||
)
|
||||
rec._suggestion = "" # type: ignore[attr-defined]
|
||||
report.records.append(rec)
|
||||
continue
|
||||
|
||||
frame = cv2.imread(path)
|
||||
if frame is None:
|
||||
rec = SampleRecord(
|
||||
path=path, expected=expected, detected=None, confidence=0.0,
|
||||
rgb=None, top3=[], passed=False, note=note,
|
||||
error=f"cv2.imread failed for {path}",
|
||||
)
|
||||
rec._suggestion = "" # type: ignore[attr-defined]
|
||||
report.records.append(rec)
|
||||
continue
|
||||
|
||||
result = detector.step(ts=0.0, frame=frame)
|
||||
|
||||
match = result.match
|
||||
if match is None:
|
||||
detected: str | None = None
|
||||
confidence = 0.0
|
||||
else:
|
||||
detected = match.name if match.name != "UNKNOWN" else None
|
||||
confidence = match.confidence
|
||||
|
||||
rgb = result.rgb
|
||||
|
||||
# Top 3 candidates: rank palette entries by RGB distance to observed.
|
||||
top3: list[tuple[str, float]] = []
|
||||
if rgb is not None:
|
||||
scored: list[tuple[str, float]] = []
|
||||
for name, ref in palette.items():
|
||||
scored.append((name, _rgb_distance(rgb, ref)))
|
||||
scored.sort(key=lambda t: t[1])
|
||||
top3 = [(n, 1.0 / (1.0 + d / 20.0)) for n, d in scored[:3]]
|
||||
|
||||
passed = detected == expected
|
||||
|
||||
rec = SampleRecord(
|
||||
path=path, expected=expected, detected=detected,
|
||||
confidence=confidence, rgb=rgb, top3=top3, passed=passed, note=note,
|
||||
)
|
||||
|
||||
if not passed and rgb is not None and expected in palette:
|
||||
ref = palette[expected]
|
||||
tol = cfg.colors[expected].tolerance
|
||||
dist = _rgb_distance(rgb, ref)
|
||||
rec._suggestion = ( # type: ignore[attr-defined]
|
||||
f"{expected} praguri curente: RGB{ref} +/- {tol:.0f}. "
|
||||
f"Pixelul observat {rgb} e la distanta {dist:.1f} "
|
||||
f"-> recalibreaza cu acest sample."
|
||||
)
|
||||
else:
|
||||
rec._suggestion = "" # type: ignore[attr-defined]
|
||||
|
||||
report.records.append(rec)
|
||||
|
||||
return report
|
||||
99
tests/test_config.py
Normal file
99
tests/test_config.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Tests for atm.config — focused on attach_screenshots parsing (legacy bool vs new dict)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from atm.config import AlertsCfg, Config
|
||||
|
||||
|
||||
_BASE = {
|
||||
"window_title": "X",
|
||||
"dot_roi": {"x": 0, "y": 0, "w": 10, "h": 10},
|
||||
"chart_roi": {"x": 0, "y": 0, "w": 100, "h": 100},
|
||||
"colors": {
|
||||
"turquoise": {"rgb": [64, 224, 208], "tolerance": 30.0},
|
||||
"yellow": {"rgb": [255, 215, 0], "tolerance": 30.0},
|
||||
"dark_green": {"rgb": [0, 100, 0], "tolerance": 30.0},
|
||||
"dark_red": {"rgb": [139, 0, 0], "tolerance": 30.0},
|
||||
"light_green": {"rgb": [0, 230, 118], "tolerance": 30.0},
|
||||
"light_red": {"rgb": [255, 82, 82], "tolerance": 30.0},
|
||||
"gray": {"rgb": [128, 128, 128], "tolerance": 25.0},
|
||||
},
|
||||
"y_axis": {"p1_y": 100, "p1_price": 485.0, "p2_y": 200, "p2_price": 484.0},
|
||||
"canary": {
|
||||
"roi": {"x": 0, "y": 0, "w": 10, "h": 10},
|
||||
"baseline_phash": "0" * 16,
|
||||
"drift_threshold": 8,
|
||||
},
|
||||
"discord": {"webhook_url": "https://example.com/hook"},
|
||||
"telegram": {"bot_token": "tok", "chat_id": "123"},
|
||||
}
|
||||
|
||||
|
||||
def _with_opts(opts: dict) -> dict:
|
||||
d = {k: v for k, v in _BASE.items()}
|
||||
d["options"] = opts
|
||||
return d
|
||||
|
||||
|
||||
def test_attach_screenshots_default_all_true() -> None:
|
||||
"""Missing attach_screenshots → all fields True."""
|
||||
cfg = Config._from_dict(_with_opts({}))
|
||||
assert cfg.attach_screenshots == AlertsCfg(
|
||||
late_start=True, catchup=True, arm=True, prime=True, trigger=True,
|
||||
)
|
||||
|
||||
|
||||
def test_attach_screenshots_legacy_bool_true() -> None:
|
||||
"""Legacy: attach_screenshots = true → all fields True."""
|
||||
cfg = Config._from_dict(_with_opts({"attach_screenshots": True}))
|
||||
assert cfg.attach_screenshots.arm is True
|
||||
assert cfg.attach_screenshots.catchup is True
|
||||
assert cfg.attach_screenshots.trigger is True
|
||||
|
||||
|
||||
def test_attach_screenshots_legacy_bool_false() -> None:
|
||||
"""Legacy: attach_screenshots = false → all fields False."""
|
||||
cfg = Config._from_dict(_with_opts({"attach_screenshots": False}))
|
||||
assert cfg.attach_screenshots.arm is False
|
||||
assert cfg.attach_screenshots.catchup is False
|
||||
assert cfg.attach_screenshots.trigger is False
|
||||
assert cfg.attach_screenshots.late_start is False
|
||||
|
||||
|
||||
def test_attach_screenshots_partial_dict() -> None:
|
||||
"""Dict form with only some keys → specified False, others default True."""
|
||||
cfg = Config._from_dict(_with_opts({
|
||||
"attach_screenshots": {"arm": False, "prime": False},
|
||||
}))
|
||||
assert cfg.attach_screenshots.arm is False
|
||||
assert cfg.attach_screenshots.prime is False
|
||||
# Unspecified → dataclass default True
|
||||
assert cfg.attach_screenshots.trigger is True
|
||||
assert cfg.attach_screenshots.catchup is True
|
||||
assert cfg.attach_screenshots.late_start is True
|
||||
|
||||
|
||||
def test_attach_screenshots_full_dict() -> None:
|
||||
"""Dict form with all keys specified."""
|
||||
cfg = Config._from_dict(_with_opts({
|
||||
"attach_screenshots": {
|
||||
"late_start": False,
|
||||
"catchup": True,
|
||||
"arm": False,
|
||||
"prime": True,
|
||||
"trigger": True,
|
||||
},
|
||||
}))
|
||||
assert cfg.attach_screenshots.late_start is False
|
||||
assert cfg.attach_screenshots.catchup is True
|
||||
assert cfg.attach_screenshots.arm is False
|
||||
assert cfg.attach_screenshots.prime is True
|
||||
assert cfg.attach_screenshots.trigger is True
|
||||
|
||||
|
||||
def test_attach_screenshots_unknown_keys_ignored() -> None:
|
||||
"""Unknown keys are silently dropped (dataclass won't accept them)."""
|
||||
cfg = Config._from_dict(_with_opts({
|
||||
"attach_screenshots": {"arm": False, "nonexistent_knob": True},
|
||||
}))
|
||||
assert cfg.attach_screenshots.arm is False
|
||||
# Should not raise even with unknown key
|
||||
@@ -92,8 +92,7 @@ def test_handle_tick_prime_buy():
|
||||
assert notif.alerts[0].kind == "arm"
|
||||
assert notif.alerts[1].kind == "prime"
|
||||
assert notif.alerts[1].direction == "BUY"
|
||||
# Non-catchup prime alert must not mention catchup
|
||||
assert "catchup" not in notif.alerts[1].title.lower()
|
||||
assert "recuperare" not in notif.alerts[1].title.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -133,10 +132,10 @@ def test_handle_tick_catchup_dark_green():
|
||||
assert len(notif.alerts) == 2
|
||||
assert notif.alerts[0].kind == "arm"
|
||||
assert notif.alerts[0].direction == "BUY"
|
||||
assert "catchup" in notif.alerts[0].title.lower() or "catchup" in notif.alerts[0].body.lower()
|
||||
assert "recuperare" in notif.alerts[0].title.lower() or "recuperare" in notif.alerts[0].body.lower()
|
||||
assert notif.alerts[1].kind == "prime"
|
||||
assert notif.alerts[1].direction == "BUY"
|
||||
assert "catchup" in notif.alerts[1].title.lower() or "catchup" in notif.alerts[1].body.lower()
|
||||
assert "recuperare" in notif.alerts[1].title.lower() or "recuperare" in notif.alerts[1].body.lower()
|
||||
|
||||
# Audit: both tick entries tagged catchup:true
|
||||
catchup_events = [e for e in audit.events if e.get("catchup")]
|
||||
@@ -216,8 +215,8 @@ def test_handle_tick_first_turquoise_no_catchup_label():
|
||||
assert len(notif.alerts) == 1
|
||||
assert notif.alerts[0].kind == "arm"
|
||||
# Decision 3A: cannot distinguish fresh arm from catchup-at-arm-phase
|
||||
assert "catchup" not in notif.alerts[0].title.lower()
|
||||
assert "catchup" not in notif.alerts[0].body.lower()
|
||||
assert "recuperare" not in notif.alerts[0].title.lower()
|
||||
assert "recuperare" not in notif.alerts[0].body.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -238,7 +237,7 @@ def test_handle_tick_catchup_dark_red_when_not_first_accepted():
|
||||
assert len(notif.alerts) == 2
|
||||
assert notif.alerts[0].kind == "arm"
|
||||
assert notif.alerts[0].direction == "SELL"
|
||||
assert "catchup" in (notif.alerts[0].title + notif.alerts[0].body).lower()
|
||||
assert "recuperare" in (notif.alerts[0].title + notif.alerts[0].body).lower()
|
||||
assert notif.alerts[1].kind == "prime"
|
||||
assert notif.alerts[1].direction == "SELL"
|
||||
|
||||
@@ -256,3 +255,234 @@ def test_handle_tick_catchup_dark_green_when_not_first_accepted():
|
||||
assert len(notif.alerts) == 2
|
||||
assert notif.alerts[0].direction == "BUY"
|
||||
assert notif.alerts[1].direction == "BUY"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression: user bug 2026-04-16 (image.png). After a BUY FIRE, the chart
|
||||
# shows residual dark_green dots for the rest of the 15m window. Those are
|
||||
# noise, NOT a new prime signal. Previously triggered false catchup arm+prime
|
||||
# because FSM returns to IDLE after fire and the catchup branch only checked
|
||||
# (color, state). Now gated on fsm.fired_in_session(direction).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_handle_tick_dark_green_after_light_green_fire_no_catchup():
|
||||
"""REGRESSION: post-FIRE dark_green must NOT re-arm catchup."""
|
||||
fsm = StateMachine(lockout_s=240)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
|
||||
# Full BUY cycle: arm → prime → fire
|
||||
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False)
|
||||
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False)
|
||||
_handle_tick(fsm, "light_green", 3.0, notif, audit, first_accepted=False)
|
||||
# After fire: FSM is IDLE, _last_fire[BUY]=3.0
|
||||
assert fsm.state == State.IDLE
|
||||
assert fsm.fired_in_session("BUY") is True
|
||||
baseline_alerts = len(notif.alerts) # arm + prime (fire alert is handled in main, not here)
|
||||
|
||||
# Residual dark_green post-FIRE — must stay IDLE, must not fire any alert
|
||||
tr = _handle_tick(fsm, "dark_green", 10.0, notif, audit, first_accepted=False)
|
||||
|
||||
assert fsm.state == State.IDLE
|
||||
assert tr is not None
|
||||
assert tr.reason == "noise"
|
||||
assert len(notif.alerts) == baseline_alerts, (
|
||||
f"post-FIRE dark_green falsely fired: "
|
||||
f"new alerts {notif.alerts[baseline_alerts:]}"
|
||||
)
|
||||
|
||||
|
||||
def test_handle_tick_dark_red_after_light_red_fire_no_catchup():
|
||||
"""REGRESSION mirror — SELL side."""
|
||||
fsm = StateMachine(lockout_s=240)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
|
||||
_handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False)
|
||||
_handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False)
|
||||
_handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False)
|
||||
assert fsm.state == State.IDLE
|
||||
assert fsm.fired_in_session("SELL") is True
|
||||
baseline_alerts = len(notif.alerts)
|
||||
|
||||
tr = _handle_tick(fsm, "dark_red", 10.0, notif, audit, first_accepted=False)
|
||||
|
||||
assert fsm.state == State.IDLE
|
||||
assert tr is not None
|
||||
assert tr.reason == "noise"
|
||||
assert len(notif.alerts) == baseline_alerts
|
||||
|
||||
|
||||
def test_handle_tick_opposite_direction_catchup_after_fire():
|
||||
"""After BUY fires, seeing dark_red at IDLE should STILL trigger SELL
|
||||
catchup (direction-scoped gate, not session-wide). Proves the gate only
|
||||
suppresses same-direction residual, not a genuine opposite-direction cycle
|
||||
the user joined mid-way."""
|
||||
fsm = StateMachine(lockout_s=240)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
|
||||
# Fire BUY cycle
|
||||
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False)
|
||||
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False)
|
||||
_handle_tick(fsm, "light_green", 3.0, notif, audit, first_accepted=False)
|
||||
assert fsm.fired_in_session("BUY") is True
|
||||
assert fsm.fired_in_session("SELL") is False
|
||||
baseline_alerts = len(notif.alerts)
|
||||
|
||||
# Now dark_red at IDLE — SELL hasn't fired, so catchup must still work
|
||||
tr = _handle_tick(fsm, "dark_red", 10.0, notif, audit, first_accepted=False)
|
||||
|
||||
assert tr is not None
|
||||
assert tr.next == State.PRIMED_SELL
|
||||
assert tr.reason == "prime"
|
||||
# synth-arm alert + real prime alert = 2 new
|
||||
assert len(notif.alerts) == baseline_alerts + 2
|
||||
assert notif.alerts[baseline_alerts].kind == "arm"
|
||||
assert notif.alerts[baseline_alerts].direction == "SELL"
|
||||
assert "recuperare" in (notif.alerts[baseline_alerts].title + notif.alerts[baseline_alerts].body).lower()
|
||||
assert notif.alerts[baseline_alerts + 1].kind == "prime"
|
||||
assert notif.alerts[baseline_alerts + 1].direction == "SELL"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Snapshot callable: _handle_tick should call snapshot(kind, label) for each
|
||||
# alert and attach the returned path to Alert.image_path. None default keeps
|
||||
# existing tests oblivious.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_handle_tick_snapshot_called_for_each_alert():
|
||||
"""snapshot callable invoked with (kind, label); returned path attached."""
|
||||
from pathlib import Path
|
||||
|
||||
fsm = StateMachine(lockout_s=60)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
calls: list[tuple[str, str]] = []
|
||||
|
||||
def snap(kind: str, label: str):
|
||||
calls.append((kind, label))
|
||||
return Path(f"/tmp/fake_{label}.png")
|
||||
|
||||
# BUY cycle arm + prime (2 alerts, 2 snapshot calls)
|
||||
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False, snapshot=snap)
|
||||
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False, snapshot=snap)
|
||||
|
||||
assert len(notif.alerts) == 2
|
||||
assert calls == [("arm", "arm_buy"), ("prime", "prime_buy")]
|
||||
assert notif.alerts[0].image_path == Path("/tmp/fake_arm_buy.png")
|
||||
assert notif.alerts[1].image_path == Path("/tmp/fake_prime_buy.png")
|
||||
|
||||
|
||||
def test_handle_tick_snapshot_none_for_gated_kind():
|
||||
"""snapshot() returning None (config-gated off) → Alert.image_path=None,
|
||||
alert still sends text-only."""
|
||||
fsm = StateMachine(lockout_s=60)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
|
||||
def snap(kind: str, label: str):
|
||||
# Simulate cfg.attach_screenshots.arm = False
|
||||
return None if kind == "arm" else __import__("pathlib").Path(f"/tmp/{label}.png")
|
||||
|
||||
_handle_tick(fsm, "turquoise", 1.0, notif, audit, first_accepted=False, snapshot=snap)
|
||||
_handle_tick(fsm, "dark_green", 2.0, notif, audit, first_accepted=False, snapshot=snap)
|
||||
|
||||
assert notif.alerts[0].image_path is None # arm gated off
|
||||
assert notif.alerts[1].image_path is not None # prime went through
|
||||
|
||||
|
||||
def test_handle_tick_snapshot_called_for_late_start():
|
||||
"""late_start path also invokes snapshot."""
|
||||
fsm = StateMachine(lockout_s=60)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
calls: list[tuple[str, str]] = []
|
||||
|
||||
def snap(kind: str, label: str):
|
||||
calls.append((kind, label))
|
||||
return None
|
||||
|
||||
_handle_tick(fsm, "light_green", 1.0, notif, audit, first_accepted=True, snapshot=snap)
|
||||
|
||||
assert len(notif.alerts) == 1
|
||||
assert notif.alerts[0].kind == "late_start"
|
||||
assert calls == [("late_start", "late_start_buy")]
|
||||
|
||||
|
||||
def test_handle_tick_snapshot_called_for_catchup_prime():
|
||||
"""catchup path: arm snapshot uses kind=catchup, prime snapshot uses
|
||||
kind=catchup (so user's catchup toggle also gates the catchup prime)."""
|
||||
fsm = StateMachine(lockout_s=60)
|
||||
notif = FakeNotifier()
|
||||
audit = FakeAudit()
|
||||
calls: list[tuple[str, str]] = []
|
||||
|
||||
def snap(kind: str, label: str):
|
||||
calls.append((kind, label))
|
||||
return None
|
||||
|
||||
_handle_tick(fsm, "dark_red", 1.0, notif, audit, first_accepted=False, snapshot=snap)
|
||||
|
||||
# Synth-arm catchup alert + real prime alert (post-synth) — both tagged catchup
|
||||
assert len(notif.alerts) == 2
|
||||
assert calls == [("catchup", "catchup_arm_sell"), ("catchup", "prime_sell_catchup")]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _save_annotated_frame — audit-log failures instead of swallowing silently.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_save_annotated_frame_logs_audit_on_failure(tmp_path, monkeypatch):
|
||||
"""cv2.imwrite raising → return None AND audit.log event=snapshot_fail."""
|
||||
import atm.main as main_mod
|
||||
|
||||
# Force the lazy cv2 import to succeed but fail on imwrite
|
||||
class _FakeCv2:
|
||||
@staticmethod
|
||||
def rectangle(*a, **kw): pass
|
||||
@staticmethod
|
||||
def imwrite(*a, **kw):
|
||||
raise OSError("disk full")
|
||||
|
||||
import sys
|
||||
monkeypatch.setitem(sys.modules, "cv2", _FakeCv2)
|
||||
|
||||
cfg = type("C", (), {"dot_roi": type("R", (), {"x": 0, "y": 0, "w": 10, "h": 10})()})()
|
||||
frame = type("F", (), {"copy": lambda self: self})() # dummy with .copy()
|
||||
audit = FakeAudit()
|
||||
|
||||
result = main_mod._save_annotated_frame(frame, cfg, tmp_path, "test_label", 123.0, audit=audit)
|
||||
|
||||
assert result is None
|
||||
assert any(e.get("event") == "snapshot_fail" and e.get("label") == "test_label"
|
||||
for e in audit.events)
|
||||
|
||||
|
||||
def test_save_annotated_frame_succeeds(tmp_path, monkeypatch):
|
||||
"""Happy path: cv2 present + imwrite succeeds → returns path."""
|
||||
import atm.main as main_mod
|
||||
|
||||
written: list[str] = []
|
||||
|
||||
class _FakeCv2:
|
||||
@staticmethod
|
||||
def rectangle(*a, **kw): pass
|
||||
@staticmethod
|
||||
def imwrite(path, _img):
|
||||
written.append(path)
|
||||
|
||||
import sys
|
||||
monkeypatch.setitem(sys.modules, "cv2", _FakeCv2)
|
||||
|
||||
cfg = type("C", (), {"dot_roi": type("R", (), {"x": 0, "y": 0, "w": 10, "h": 10})()})()
|
||||
frame = type("F", (), {"copy": lambda self: self})()
|
||||
audit = FakeAudit()
|
||||
|
||||
result = main_mod._save_annotated_frame(frame, cfg, tmp_path, "BUY", 1700000000.0, audit=audit)
|
||||
|
||||
assert result is not None
|
||||
assert result.parent == tmp_path
|
||||
assert "BUY" in result.name
|
||||
assert len(written) == 1
|
||||
assert not any(e.get("event") == "snapshot_fail" for e in audit.events)
|
||||
|
||||
@@ -247,11 +247,11 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
|
||||
|
||||
assert len(arm) == 1, f"expected 1 arm alert, got {len(arm)} ({[a.title for a in captured]})"
|
||||
assert arm[0].direction == "SELL"
|
||||
assert "catchup" in (arm[0].title + arm[0].body).lower()
|
||||
assert "recuperare" in (arm[0].title + arm[0].body).lower()
|
||||
|
||||
assert len(prime) == 1
|
||||
assert prime[0].direction == "SELL"
|
||||
assert "catchup" in (prime[0].title + prime[0].body).lower()
|
||||
assert "recuperare" in (prime[0].title + prime[0].body).lower()
|
||||
|
||||
assert len(trigger) == 1
|
||||
assert trigger[0].direction == "SELL"
|
||||
|
||||
@@ -156,19 +156,43 @@ def test_stop_drains(tmp_path: Path) -> None:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _MockResponse:
|
||||
def __init__(self, status_code: int, text: str = "") -> None:
|
||||
def __init__(
|
||||
self,
|
||||
status_code: int,
|
||||
text: str = "",
|
||||
json_body: dict | None = None,
|
||||
raise_on_json: bool = False,
|
||||
) -> None:
|
||||
self.status_code = status_code
|
||||
self.text = text
|
||||
self._json_body = json_body if json_body is not None else {"ok": True, "result": {}}
|
||||
self._raise_on_json = raise_on_json
|
||||
|
||||
def json(self):
|
||||
if self._raise_on_json:
|
||||
raise ValueError("no JSON body")
|
||||
return self._json_body
|
||||
|
||||
|
||||
class _MockSession:
|
||||
def __init__(self, status_code: int = 204) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
status_code: int = 204,
|
||||
json_body: dict | None = None,
|
||||
raise_on_json: bool = False,
|
||||
) -> None:
|
||||
self.status_code = status_code
|
||||
self._json_body = json_body
|
||||
self._raise_on_json = raise_on_json
|
||||
self.calls: list[dict] = []
|
||||
|
||||
def post(self, url: str, **kwargs):
|
||||
self.calls.append({"url": url, **kwargs})
|
||||
return _MockResponse(self.status_code)
|
||||
return _MockResponse(
|
||||
self.status_code,
|
||||
json_body=self._json_body,
|
||||
raise_on_json=self._raise_on_json,
|
||||
)
|
||||
|
||||
|
||||
def test_discord_send_ok() -> None:
|
||||
@@ -204,7 +228,7 @@ def test_telegram_send_ok() -> None:
|
||||
n = TelegramNotifier("token", "chat123", session=session)
|
||||
n.send(_alert("Hi"))
|
||||
assert len(session.calls) == 1
|
||||
assert "*Hi*" in session.calls[0]["json"]["text"]
|
||||
assert "<b>Hi</b>" in session.calls[0]["json"]["text"]
|
||||
|
||||
|
||||
def test_telegram_429_raises() -> None:
|
||||
@@ -219,3 +243,118 @@ def test_telegram_5xx_raises() -> None:
|
||||
n = TelegramNotifier("token", "chat123", session=_MockSession(500))
|
||||
with pytest.raises(RuntimeError, match="500"):
|
||||
n.send(_alert("x"))
|
||||
|
||||
|
||||
# Telegram returns 200 OK with {"ok": false, ...} for logical failures (bot
|
||||
# blocked, invalid chat_id, parse_mode errors). Previously silent — now raises
|
||||
# so FanoutNotifier retries + DLQs + stats count the failure.
|
||||
|
||||
def test_telegram_ok_true_passes() -> None:
|
||||
"""200 + {ok:true} → success, no raise."""
|
||||
from atm.notifier.telegram import TelegramNotifier
|
||||
session = _MockSession(200, json_body={"ok": True, "result": {"message_id": 42}})
|
||||
n = TelegramNotifier("token", "chat123", session=session)
|
||||
n.send(_alert("ok body")) # must not raise
|
||||
assert len(session.calls) == 1
|
||||
|
||||
|
||||
def test_telegram_ok_false_raises() -> None:
|
||||
"""200 + {ok:false, ...} → RuntimeError with code + description."""
|
||||
from atm.notifier.telegram import TelegramNotifier
|
||||
session = _MockSession(
|
||||
200,
|
||||
json_body={
|
||||
"ok": False,
|
||||
"error_code": 403,
|
||||
"description": "Forbidden: bot was blocked by the user",
|
||||
},
|
||||
)
|
||||
n = TelegramNotifier("token", "chat123", session=session)
|
||||
with pytest.raises(RuntimeError, match="logical failure.*403.*blocked"):
|
||||
n.send(_alert("x"))
|
||||
|
||||
|
||||
def test_telegram_malformed_json_treated_as_success() -> None:
|
||||
"""200 with non-JSON body → no raise (edge case, shouldn't happen in practice)."""
|
||||
from atm.notifier.telegram import TelegramNotifier
|
||||
session = _MockSession(200, raise_on_json=True)
|
||||
n = TelegramNotifier("token", "chat123", session=session)
|
||||
n.send(_alert("x")) # must not raise
|
||||
|
||||
|
||||
def test_telegram_ok_false_goes_to_dlq(tmp_path: Path) -> None:
|
||||
"""Integration: ok:false → 3 retries → DLQ entry written with description."""
|
||||
from atm.notifier.telegram import TelegramNotifier
|
||||
session = _MockSession(
|
||||
200,
|
||||
json_body={"ok": False, "error_code": 400, "description": "chat not found"},
|
||||
)
|
||||
backend = TelegramNotifier("token", "chat123", session=session)
|
||||
|
||||
dl = tmp_path / "dead.jsonl"
|
||||
fan = FanoutNotifier([backend], dl, max_retries=3, backoff_base=0.01)
|
||||
fan.send(_alert("will-fail"))
|
||||
fan.stop(timeout=5.0)
|
||||
|
||||
# 4 HTTP calls (1 initial + 3 retries)
|
||||
assert len(session.calls) == 4
|
||||
s = fan.stats()
|
||||
assert s["telegram"]["failed"] == 1
|
||||
assert s["telegram"]["retries"] == 3
|
||||
assert s["telegram"]["sent"] == 0
|
||||
|
||||
assert dl.exists()
|
||||
lines = [json.loads(l) for l in dl.read_text().splitlines()]
|
||||
assert len(lines) == 1
|
||||
entry = lines[0]
|
||||
assert entry["backend"] == "telegram"
|
||||
assert entry["alert_title"] == "will-fail"
|
||||
assert "chat not found" in entry["error_str"]
|
||||
assert "400" in entry["error_str"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# on_drop callback — queue overflow audit trail
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_fanout_on_drop_callback_invoked(tmp_path: Path) -> None:
|
||||
"""Queue-overflow drop calls on_drop(backend_name, dropped_alert)."""
|
||||
drops: list[tuple[str, Alert]] = []
|
||||
|
||||
def on_drop(name: str, alert: Alert) -> None:
|
||||
drops.append((name, alert))
|
||||
|
||||
dl = tmp_path / "dead.jsonl"
|
||||
slow = FakeBackend("slow", sleep_s=0.2)
|
||||
fan = FanoutNotifier(
|
||||
[slow], dl, queue_size=2, backoff_base=0.01, on_drop=on_drop,
|
||||
)
|
||||
for i in range(10):
|
||||
fan.send(_alert(f"a{i}"))
|
||||
fan.stop(timeout=10.0)
|
||||
|
||||
assert len(drops) > 0
|
||||
assert all(name == "slow" for name, _ in drops)
|
||||
# Oldest alerts are the ones dropped
|
||||
dropped_titles = {a.title for _, a in drops}
|
||||
assert "a0" in dropped_titles or "a1" in dropped_titles
|
||||
|
||||
|
||||
def test_fanout_on_drop_exception_swallowed(tmp_path: Path) -> None:
|
||||
"""on_drop raising must not break dispatch — audit failure must not silence alerts."""
|
||||
def bad_on_drop(_name: str, _alert: Alert) -> None:
|
||||
raise RuntimeError("audit broken")
|
||||
|
||||
dl = tmp_path / "dead.jsonl"
|
||||
slow = FakeBackend("slow", sleep_s=0.2)
|
||||
fan = FanoutNotifier(
|
||||
[slow], dl, queue_size=2, backoff_base=0.01, on_drop=bad_on_drop,
|
||||
)
|
||||
# Must not raise despite every drop invoking bad_on_drop
|
||||
for i in range(10):
|
||||
fan.send(_alert(f"a{i}"))
|
||||
fan.stop(timeout=10.0)
|
||||
|
||||
s = fan.stats()
|
||||
# Some alerts still went through
|
||||
assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0
|
||||
|
||||
@@ -269,6 +269,58 @@ def test_refresh_arm_ts() -> None:
|
||||
assert t2.arm_ts == 9.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 11. fired_in_session — public API for catchup suppression after fire
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_fired_in_session_fresh_fsm() -> None:
|
||||
"""Fresh FSM — neither direction has fired."""
|
||||
sm = StateMachine()
|
||||
assert sm.fired_in_session("BUY") is False
|
||||
assert sm.fired_in_session("SELL") is False
|
||||
|
||||
|
||||
def test_fired_in_session_after_buy_fire() -> None:
|
||||
"""After a BUY fire: BUY=True, SELL=False (direction-scoped)."""
|
||||
sm = StateMachine(lockout_s=240)
|
||||
sm.feed("turquoise", 1.0)
|
||||
sm.feed("dark_green", 2.0)
|
||||
t = sm.feed("light_green", 3.0)
|
||||
assert t.reason == "fire"
|
||||
|
||||
assert sm.fired_in_session("BUY") is True
|
||||
assert sm.fired_in_session("SELL") is False
|
||||
|
||||
|
||||
def test_fired_in_session_after_sell_fire() -> None:
|
||||
"""Mirror — after SELL fire: SELL=True, BUY=False."""
|
||||
sm = StateMachine(lockout_s=240)
|
||||
sm.feed("yellow", 1.0)
|
||||
sm.feed("dark_red", 2.0)
|
||||
t = sm.feed("light_red", 3.0)
|
||||
assert t.reason == "fire"
|
||||
|
||||
assert sm.fired_in_session("SELL") is True
|
||||
assert sm.fired_in_session("BUY") is False
|
||||
|
||||
|
||||
def test_fired_in_session_both_directions() -> None:
|
||||
"""Fire both directions — both True."""
|
||||
sm = StateMachine(lockout_s=240)
|
||||
# BUY cycle
|
||||
sm.feed("turquoise", 1.0)
|
||||
sm.feed("dark_green", 2.0)
|
||||
sm.feed("light_green", 3.0)
|
||||
# SELL cycle
|
||||
sm.feed("yellow", 100.0)
|
||||
sm.feed("dark_red", 101.0)
|
||||
sm.feed("light_red", 102.0)
|
||||
|
||||
assert sm.fired_in_session("BUY") is True
|
||||
assert sm.fired_in_session("SELL") is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 11. exhaustive — parameterize over every (state, color) pair
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
214
tests/test_validate.py
Normal file
214
tests/test_validate.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""Tests for atm.validate — offline calibration validation.
|
||||
|
||||
Covers the 3 tests from plan section D':
|
||||
17. test_validate_calibration_pass
|
||||
18. test_validate_calibration_fail_reports_top_candidates
|
||||
19. test_validate_calibration_file_not_found
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from atm.config import (
|
||||
CanaryRegion,
|
||||
ColorSpec,
|
||||
Config,
|
||||
DiscordCfg,
|
||||
ROI,
|
||||
TelegramCfg,
|
||||
YAxisCalib,
|
||||
)
|
||||
from atm.detector import DetectionResult
|
||||
from atm.vision import ColorMatch
|
||||
|
||||
|
||||
def _make_config() -> Config:
|
||||
"""Minimal Config with a palette large enough to support top-3 candidates."""
|
||||
colors = {
|
||||
"turquoise": ColorSpec(rgb=(0, 200, 200), tolerance=30),
|
||||
"yellow": ColorSpec(rgb=(255, 255, 0), tolerance=30),
|
||||
"dark_green": ColorSpec(rgb=(0, 100, 0), tolerance=30),
|
||||
"dark_red": ColorSpec(rgb=(165, 42, 42), tolerance=30),
|
||||
"light_green": ColorSpec(rgb=(144, 238, 144), tolerance=30),
|
||||
"light_red": ColorSpec(rgb=(255, 182, 193), tolerance=30),
|
||||
"gray": ColorSpec(rgb=(128, 128, 128), tolerance=30),
|
||||
"background": ColorSpec(rgb=(18, 18, 18), tolerance=15),
|
||||
}
|
||||
return Config(
|
||||
window_title="test",
|
||||
dot_roi=ROI(x=0, y=0, w=100, h=100),
|
||||
chart_roi=ROI(x=0, y=0, w=100, h=100),
|
||||
colors=colors,
|
||||
y_axis=YAxisCalib(p1_y=0, p1_price=100.0, p2_y=100, p2_price=0.0),
|
||||
canary=CanaryRegion(
|
||||
roi=ROI(x=0, y=0, w=10, h=10),
|
||||
baseline_phash="0" * 64,
|
||||
),
|
||||
discord=DiscordCfg(webhook_url="http://localhost/fake"),
|
||||
telegram=TelegramCfg(bot_token="fake_token", chat_id="123"),
|
||||
debounce_depth=1,
|
||||
)
|
||||
|
||||
|
||||
def _write_labels(tmp_path: Path, entries: list[dict]) -> Path:
|
||||
f = tmp_path / "labels.json"
|
||||
f.write_text(json.dumps(entries), encoding="utf-8")
|
||||
return f
|
||||
|
||||
|
||||
def _write_blank_png(tmp_path: Path, name: str) -> Path:
|
||||
"""Write a trivially-valid 10x10 BGR image so cv2.imread returns non-None."""
|
||||
import cv2
|
||||
p = tmp_path / name
|
||||
arr = np.zeros((10, 10, 3), dtype=np.uint8)
|
||||
cv2.imwrite(str(p), arr)
|
||||
return p
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test 17: PASS path — mocked Detector.step returns expected color
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_validate_calibration_pass(monkeypatch, tmp_path):
|
||||
from atm import validate as validate_mod
|
||||
|
||||
img_path = _write_blank_png(tmp_path, "yellow_sample.png")
|
||||
labels = _write_labels(
|
||||
tmp_path,
|
||||
[{"path": str(img_path), "expected": "yellow", "note": "test"}],
|
||||
)
|
||||
|
||||
def fake_step(self, ts, frame=None):
|
||||
return DetectionResult(
|
||||
ts=ts,
|
||||
window_found=True,
|
||||
dot_found=True,
|
||||
rgb=(250, 250, 5),
|
||||
match=ColorMatch(name="yellow", distance=6.0, confidence=0.94),
|
||||
accepted=True,
|
||||
color="yellow",
|
||||
)
|
||||
|
||||
monkeypatch.setattr("atm.detector.Detector.step", fake_step)
|
||||
|
||||
report = validate_mod.validate_calibration(labels, _make_config())
|
||||
|
||||
assert report.total == 1
|
||||
assert report.passed == 1
|
||||
assert report.failed == 0
|
||||
assert report.all_pass is True
|
||||
rec = report.records[0]
|
||||
assert rec.passed is True
|
||||
assert rec.detected == "yellow"
|
||||
assert rec.expected == "yellow"
|
||||
assert "[PASS]" in report.render()
|
||||
|
||||
# CLI wiring: exit 0
|
||||
import atm.main as _main
|
||||
|
||||
class _Args:
|
||||
label_file = labels
|
||||
|
||||
monkeypatch.setattr("atm.config.Config.load_current", classmethod(lambda cls, d: _make_config()))
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_main._cmd_validate_calibration(_Args())
|
||||
assert exc_info.value.code == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test 18: FAIL path — Detector returns wrong color; report lists top 3
|
||||
# candidates and a SUGGESTIONS line with RGB distance.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_validate_calibration_fail_reports_top_candidates(monkeypatch, tmp_path):
|
||||
from atm import validate as validate_mod
|
||||
|
||||
img_path = _write_blank_png(tmp_path, "dark_red_sample.png")
|
||||
labels = _write_labels(
|
||||
tmp_path,
|
||||
[{"path": str(img_path), "expected": "dark_red", "note": "missed dark_red"}],
|
||||
)
|
||||
|
||||
# Observed RGB closer to gray than dark_red (like the real 2026-04-17 miss).
|
||||
def fake_step(self, ts, frame=None):
|
||||
return DetectionResult(
|
||||
ts=ts,
|
||||
window_found=True,
|
||||
dot_found=True,
|
||||
rgb=(135, 62, 67),
|
||||
match=ColorMatch(name="gray", distance=45.0, confidence=0.12),
|
||||
accepted=True,
|
||||
color="gray",
|
||||
)
|
||||
|
||||
monkeypatch.setattr("atm.detector.Detector.step", fake_step)
|
||||
|
||||
report = validate_mod.validate_calibration(labels, _make_config())
|
||||
|
||||
assert report.total == 1
|
||||
assert report.failed == 1
|
||||
assert report.all_pass is False
|
||||
|
||||
rec = report.records[0]
|
||||
assert rec.passed is False
|
||||
assert rec.detected == "gray"
|
||||
assert rec.expected == "dark_red"
|
||||
# Top 3 candidates populated (name, score) sorted by RGB distance.
|
||||
assert len(rec.top3) == 3
|
||||
names = [n for n, _ in rec.top3]
|
||||
# dark_red should appear in top candidates since observed RGB(135,62,67)
|
||||
# is reasonably close to dark_red(165,42,42).
|
||||
assert "dark_red" in names
|
||||
|
||||
rendered = report.render()
|
||||
assert "[FAIL]" in rendered
|
||||
assert "Top 3 candidates:" in rendered
|
||||
assert "SUGGESTIONS:" in rendered
|
||||
# The suggestion must mention the expected color's RGB and the measured distance.
|
||||
assert "dark_red" in rendered
|
||||
assert "(165, 42, 42)" in rendered
|
||||
|
||||
# CLI wiring: exit 1
|
||||
import atm.main as _main
|
||||
|
||||
class _Args:
|
||||
label_file = labels
|
||||
|
||||
monkeypatch.setattr("atm.config.Config.load_current", classmethod(lambda cls, d: _make_config()))
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_main._cmd_validate_calibration(_Args())
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test 19: missing label file — clean error, non-zero exit, no stack trace
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_validate_calibration_file_not_found(monkeypatch, tmp_path, capsys):
|
||||
from atm import validate as validate_mod
|
||||
|
||||
missing = tmp_path / "nope.json"
|
||||
|
||||
# Library-level: raises ValidationError (not bare FileNotFoundError).
|
||||
with pytest.raises(validate_mod.ValidationError) as exc_info:
|
||||
validate_mod.validate_calibration(missing, _make_config())
|
||||
assert "not found" in str(exc_info.value).lower()
|
||||
|
||||
# CLI-level: graceful sys.exit with non-zero code, message on stderr.
|
||||
import atm.main as _main
|
||||
|
||||
class _Args:
|
||||
label_file = missing
|
||||
|
||||
monkeypatch.setattr("atm.config.Config.load_current", classmethod(lambda cls, d: _make_config()))
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_main._cmd_validate_calibration(_Args())
|
||||
assert exc_info.value.code != 0
|
||||
err = capsys.readouterr().err
|
||||
assert "not found" in err.lower()
|
||||
# Ensure no python traceback leaked through.
|
||||
assert "Traceback" not in err
|
||||
Reference in New Issue
Block a user