feat(telegram): /ss + /resume cu verify multi-bulină și header FSM step

/ss și /resume afișează acum markerii top-3 buline sub ROI (cercuri pline,
r=7, culoarea clasificată) cu tick vertical roșu pe pick-ul FSM (rightmost).
Caption compact: `N/3 STATE` header + `emoji c1/c2/c3: name ← pick`.
FIRE_{BUY|SELL} afișat ca 3/3 când fire_ts e în ultimele 30s.

/resume face capture ÎNAINTE de clearing state → zero race cu FSM tick
simultan. Capture fail → title marchează "⚠️ captură eșuată", resume-ul
rulează oricum.

config: <version> mutat din caption în /status (acolo are sens pentru
verificare de calibrare, nu la fiecare /ss).

Adaugă find_top_dots în vision.py (top-N variantă a find_rightmost_dot,
tie-break determinist pe y). 5 teste sintetice noi + 4 teste noi pentru
dispatcher resume (screenshot inline, capture-fail, order-of-ops,
parity /ss <-> fire path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-22 00:38:29 +03:00
parent 5ebe26e5d5
commit 45ed502b3d
6 changed files with 525 additions and 36 deletions

View File

@@ -30,7 +30,8 @@ Workflow după sesiune: review frame-urile noi din `frames/`, adaugi entry-uri
`/ss` `/status` `/pause` `/resume` `/3` (interval min) `/stop`
- `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias).
- `/ss` — verify multi-bulină: adnotează top-3 buline din `dot_roi` (cerc roșu gros pe pick-ul FSM, cercuri colorate subțiri pe vecini) + caption cu clasificarea fiecăreia (nume, RGB, distanță, confidence) + `config: {version}`. Cercul colorat folosește `cfg.colors[name].rgb` la runtime — DRY cu paleta activă.
- `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias). Trimite un singur Alert cu screenshot adnotat inline (capture rulează **înainte** de clearing state → zero race cu FSM tick-uri). Dacă capture eșuează, title conține `⚠️ captură eșuată` și resume-ul se execută oricum.
- Drift-pause emits a single Telegram alert on transition. While paused, `/set_interval` is refused and `/ss` captions warn that detection is off.
- Heartbeat shows `⚠️ pauzat (drift)` instead of `activ` while canary is paused.

View File

@@ -30,11 +30,20 @@ Per-kind mute toggles for notifications in case arm/prime turn out too noisy in
- `cfg.notify.arm: bool = true`
- `cfg.notify.prime: bool = true`
- `cfg.notify.late_start: bool = true`
- `cfg.notify.resume_screenshot: bool = true` — gate `_save_inspect_frame` + inline screenshot în `/resume` Alert dacă recover-urile dese din drift devin zgomotoase.
Default all true. Gate each `notifier.send()` in `_handle_tick()` on the flag. Start after 3+ live sessions confirm the signal/noise ratio.
Blocked on: Faza 1 baseline evidence.
## P3-inspect-top-n-configurable
Parser comandă `/ss N` (ex: `/ss 5`) ca override pentru `n` în `find_top_dots` (default 3). Util dacă ROI scope se extinde și vrei o privire de ansamblu pe mai multe buline.
- Extindere `_parse_command` în `commands.py` (similar cu `/set_interval N`).
- Caption scaling: pentru N>3 formatter-ul trebuie să limiteze cele mai puțin relevante detections (ex: doar top-3 labels vizibile în poză, restul doar listate în caption).
- Start când `find_top_dots` + caption multi-bulină s-au dovedit util în practică.
## P3-faza2-exec
Auto-execution on TradeLocker. Blocked on TOS audit (see `docs/phase2-prop-firm-audit.md`). Not started until GO decision + 20+ Faza 1 sessions.

View File

@@ -534,6 +534,174 @@ def _cmd_validate_calibration(args) -> None:
# Live loop
# ---------------------------------------------------------------------------
def _fsm_step_label(fsm, now: float, recent_fire_s: float = 30.0) -> str:
"""Translate FSM into `step/total STATE` shorthand.
M2D sequence to a fire: IDLE → ARMED_* (1/3) → PRIMED_* (2/3) → FIRE (3/3).
FIRE nu e state persistent în FSM (întoarce în IDLE), așa că îl afișăm pe
baza `_last_fire[direction]` când timestamp-ul e în ultimele `recent_fire_s`.
"""
last_fires = getattr(fsm, "_last_fire", None)
if isinstance(last_fires, dict) and last_fires:
recent = [(ts, d) for d, ts in last_fires.items()
if isinstance(ts, (int, float)) and now - ts < recent_fire_s]
if recent:
_ts, direction = max(recent)
return f"3/3 FIRE_{direction}"
state_value = getattr(getattr(fsm, "state", None), "value", None)
if isinstance(state_value, str):
if state_value.startswith("ARMED_"):
return f"1/3 {state_value}"
if state_value.startswith("PRIMED_"):
return f"2/3 {state_value}"
return state_value
return ""
def _build_resume_info_text(was_drift: bool, skip_now: str | None) -> "tuple[str, str]":
"""Resume alert title/body for the three outcomes (market closed, drift, normal).
Extracted from /resume dispatcher so we can reuse it around the inline screenshot.
"""
if skip_now and skip_now.startswith("out_of_window"):
return ("Pauză eliminată — piața e închisă acum",
"Monitorizarea va porni la următoarea fereastră.")
if was_drift:
return ("Monitorizare reluată — drift-pause anulat",
"Dacă driftul persistă, Canary va repauza la următoarea verificare.")
return ("Monitorizare reluată", "")
def _draw_roi_cyan(annotated, cfg) -> None:
"""Draw the dot_roi cyan rectangle onto annotated. Shared by annotate helpers."""
import cv2 # type: ignore[import-untyped]
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
def _save_inspect_frame(
frame,
cfg,
fires_dir: Path,
now: float,
audit: "_AuditLike | None" = None,
) -> "tuple[Path | None, list[dict]]":
"""Annotate frame with top-3 rightmost dots + classifications. Returns (path, detections).
FSM pick (rightmost, idx 0) → thick red circle. Neighbors (idx 1, 2) → thin circle
in the classified color (BGR derived from cfg.colors[name].rgb at runtime, UNKNOWN
→ gray). Labels `{name} d={distance}` next to each circle. Price overlay for
rightmost dot (same as _save_annotated_frame). Fail-safe: any error → (None, []).
"""
try:
import cv2 # type: ignore[import-untyped]
except ImportError as exc:
if audit is not None:
try:
audit.log({"ts": now, "event": "snapshot_fail", "label": "inspect", "error": f"cv2 missing: {exc}"})
except Exception:
pass
return None, []
try:
from atm.vision import crop_roi, find_top_dots, pixel_rgb, classify_pixel
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_inspect.png"
annotated = frame.copy()
_draw_roi_cyan(annotated, cfg)
bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18)
dot_crop = crop_roi(frame, cfg.dot_roi)
positions = find_top_dots(dot_crop, bg_rgb, n=3)
palette = {k: (v.rgb, v.tolerance) for k, v in cfg.colors.items() if k != "background"}
detections: list[dict] = []
for idx, (cx, cy) in enumerate(positions):
rgb = pixel_rgb(dot_crop, cx, cy)
match = classify_pixel(rgb, palette)
pos_abs = (cfg.dot_roi.x + cx, cfg.dot_roi.y + cy)
detections.append({
"idx": idx, "name": match.name, "rgb": rgb,
"distance": match.distance, "confidence": match.confidence,
"pos_abs": pos_abs,
})
# Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x.
# Toate cercurile au aceeași rază (r=7), pline, în culoarea clasificată.
# Primary (FSM pick, rightmost) = tick vertical roșu care leagă markerul de bulina.
marker_y = cfg.dot_roi.y + cfg.dot_roi.h + 8
for det in reversed(detections): # neighbors first, primary last on top
pos_abs = det["pos_abs"]
name = det["name"]
if name in cfg.colors:
rgb_pal = cfg.colors[name].rgb
bgr = (int(rgb_pal[2]), int(rgb_pal[1]), int(rgb_pal[0]))
else:
bgr = (128, 128, 128)
marker_pos = (pos_abs[0], marker_y)
cv2.circle(annotated, marker_pos, 7, bgr, -1)
if det["idx"] == 0:
cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1)
# Price overlay on rightmost (reuse same formula as _save_annotated_frame)
if detections and hasattr(cfg, "y_axis"):
try:
dot_y = detections[0]["pos_abs"][1]
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass
cv2.imwrite(str(fpath), annotated)
return fpath, detections
except Exception as exc:
if audit is not None:
try:
audit.log({"ts": now, "event": "snapshot_fail", "label": "inspect", "error": str(exc)})
except Exception:
pass
return None, []
_COLOR_EMOJI = {
"turquoise": "🔵",
"light_green": "🟢",
"dark_green": "🟢",
"light_red": "🔴",
"dark_red": "🔴",
"yellow": "🟡",
}
def _format_inspect_caption(detections: list[dict], cfg) -> str:
"""Compact caption: leftmost = c1, rightmost = cN (FSM pick).
Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption —
emoji-ul e cel mai aproape de „textul în aceeași culoare ca bulina").
"""
if not detections:
return "nicio bulină detectată în ROI"
# detections ordered idx=0 (rightmost) → idx=N-1 (leftmost)
# Display left-to-right so user reads the chart order natural.
ordered = list(reversed(detections))
total = len(ordered)
lines: list[str] = []
for display_idx, det in enumerate(ordered, start=1):
name = det["name"]
emoji = _COLOR_EMOJI.get(name, "")
suffix = " ← pick" if display_idx == total else ""
lines.append(f"{emoji} c{display_idx}: {name}{suffix}")
return "\n".join(lines)
def _save_annotated_frame(
frame,
cfg,
@@ -568,8 +736,7 @@ def _save_annotated_frame(
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_{label}.png"
annotated = frame.copy()
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
_draw_roi_cyan(annotated, cfg)
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
try:
_, dot_y = dot_pos_abs
@@ -1188,6 +1355,10 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
window_ro = {"open": "deschisă", "closed": "închisă"}.get(window_val or "", window_val or "")
lines.append(f"fereastră: {window_ro}")
cfg_name = getattr(ctx.cfg, "config_version", None) or getattr(ctx.cfg, "version", None)
if isinstance(cfg_name, str) and cfg_name not in ("", "unknown"):
lines.append(f"config: {cfg_name}")
ctx.notifier.send(Alert(kind="status", title="ATM Status", body="\n".join(lines)))
elif cmd.action == "ss":
now_ss = time.time()
@@ -1203,12 +1374,24 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
body="",
))
return
path_ss = await asyncio.to_thread(
_save_annotated_frame, frame_ss, ctx.cfg, ctx.fires_dir, "ss", now_ss, ctx.audit,
path_ss, detections_ss = await asyncio.to_thread(
_save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit,
)
ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
warn = "⚠️ DETECȚIE OPRITĂ (drift) — trimite /resume" if getattr(ctx.canary, "is_paused", False) else ""
ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body=warn, image_path=path_ss))
caption_ss = _format_inspect_caption(detections_ss, ctx.cfg)
state_hdr = _fsm_step_label(ctx.fsm, now_ss)
warn_parts = [state_hdr]
if getattr(ctx.canary, "is_paused", False):
warn_parts.append("⚠️ DETECȚIE OPRITĂ (drift) — trimite /resume")
warn_parts.append(caption_ss)
ctx.audit.log({
"ts": now_ss, "event": "screenshot_sent",
"path": str(path_ss) if path_ss else None,
"n_dots": len(detections_ss),
})
ctx.notifier.send(Alert(
kind="screenshot", title="Screenshot manual",
body="\n\n".join(warn_parts), image_path=path_ss,
))
elif cmd.action == "pause":
# User manually stops monitoring. Canary drift state is untouched.
if ctx.lifecycle is not None:
@@ -1223,6 +1406,24 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
# /resume clears BOTH user_paused AND canary drift in one shot.
# /resume force rămâne acceptat ca alias (legacy muscle memory);
# câmpul `force` e păstrat în audit pentru schema compat.
# Capture+annotate BEFORE clearing state to avoid race with a concurrent
# FSM tick firing between clear and screenshot.
now_r = time.time()
path_r: "Path | None" = None
detections_r: list[dict] = []
capture_failed = False
if cfg.window_title:
title = await asyncio.to_thread(_focus_window_by_title, cfg.window_title)
ctx.audit.log({"ts": now_r, "event": "window_focused", "command": "resume", "title": title})
await asyncio.sleep(0.15)
frame_r = await asyncio.to_thread(ctx.capture)
if frame_r is None:
capture_failed = True
else:
path_r, detections_r = await asyncio.to_thread(
_save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit,
)
was_drift = bool(getattr(ctx.canary, "is_paused", False))
was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False
force = cmd.value == 1
@@ -1233,25 +1434,26 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
ctx.audit.log({
"ts": time.time(), "event": "user_resumed",
"was_drift": was_drift, "was_user": was_user, "force": force,
"screenshot_path": str(path_r) if path_r else None,
"n_dots": len(detections_r),
})
if cfg.window_title:
title = await asyncio.to_thread(_focus_window_by_title, cfg.window_title)
ctx.audit.log({"ts": time.time(), "event": "window_focused", "command": "resume", "title": title})
await asyncio.sleep(0.15)
# Adaptive response
skip_now = None
if ctx.lifecycle is not None:
skip_now = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
if skip_now and skip_now.startswith("out_of_window"):
title = "Pauză eliminată — piața e închisă acum"
body = "Monitorizarea va porni la următoarea fereastră."
elif was_drift:
title = "Monitorizare reluată — drift-pause anulat"
body = "Dacă driftul persistă, Canary va repauza la următoarea verificare."
else:
title = "Monitorizare reluată"
body = ""
ctx.notifier.send(Alert(kind="status", title=title, body=body))
title_r, body_r = _build_resume_info_text(was_drift, skip_now)
if capture_failed:
title_r = f"{title_r} — ⚠️ captură eșuată"
state_hdr_r = _fsm_step_label(ctx.fsm, now_r)
body_parts = [state_hdr_r]
if body_r:
body_parts.append(body_r)
if not capture_failed:
body_parts.append(_format_inspect_caption(detections_r, ctx.cfg))
ctx.notifier.send(Alert(
kind="screenshot", title=title_r,
body="\n\n".join(body_parts), image_path=path_r,
))
elif cmd.action == "help":
body = (
"/status — stare FSM, uptime, ultima detecție\n"

View File

@@ -137,6 +137,45 @@ def find_rightmost_dot(
return (cx, cy)
def find_top_dots(
roi_img: np.ndarray,
bg_rgb: tuple[int, int, int],
bg_tol: float = 15.0,
min_cluster_px: int = 3,
n: int = 3,
) -> list[tuple[int, int]]:
"""Top-N rightmost non-background clusters as (cx, cy), sorted by right edge desc.
Same mask/erode/connectedComponents pipeline as `find_rightmost_dot`, but collects
all qualifying components and returns the top N by right-edge. Tie-break on equal
right_edge: smaller y first (deterministic for tests). Anchor logic identical —
fused blobs (comp_w > 12) anchor to `right_edge-2`, small isolated dots use centroid.
"""
bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(roi_img.astype(np.float32) - bgr_bg, axis=2)
mask = (diff > bg_tol).astype(np.uint8)
kernel = np.ones((3, 3), dtype=np.uint8)
mask = cv2.erode(mask, kernel, iterations=2)
n_labels, _labels, stats, centroids = cv2.connectedComponentsWithStats(
mask, connectivity=8,
)
candidates: list[tuple[int, int, int, int]] = [] # (-right_edge, y, cx, cy)
for i in range(1, n_labels): # skip background
if int(stats[i, cv2.CC_STAT_AREA]) < min_cluster_px:
continue
comp_w = int(stats[i, cv2.CC_STAT_WIDTH])
right_edge = int(stats[i, cv2.CC_STAT_LEFT]) + comp_w - 1
if comp_w > 12:
cx = max(right_edge - 2, 0)
else:
cx = int(centroids[i][0])
cy = int(centroids[i][1])
candidates.append((-right_edge, cy, cx, cy))
candidates.sort() # desc by right_edge, asc by y
return [(cx, cy) for (_neg_r, _y, cx, cy) in candidates[:n]]
def pixel_rgb(roi_img: np.ndarray, x: int, y: int, box: int = 3) -> tuple[int, int, int]:
"""Sample mean RGB of a (2*box+1)² patch around (x,y). Input BGR → returns RGB."""
h, w = roi_img.shape[:2]

View File

@@ -10,6 +10,8 @@ from dataclasses import dataclass
from pathlib import Path
from unittest.mock import MagicMock
import cv2
import numpy as np
import pytest
@@ -865,9 +867,9 @@ async def test_resume_plain_also_clears_canary_drift():
# Audit event still records was_drift + force=False for traceability
resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
assert resumed and resumed[0]["was_drift"] is True and resumed[0]["force"] is False
# Message mentions drift-pause was cleared
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status and ("drift" in (status[0].title + status[0].body).lower())
# Message mentions drift-pause was cleared (kind is "screenshot" now since /resume attaches image)
alerts = ctx.notifier.alerts
assert alerts and ("drift" in (alerts[0].title + alerts[0].body).lower())
@pytest.mark.asyncio
@@ -921,12 +923,163 @@ async def test_resume_out_of_window_responds_with_pending_message():
_mm.time = real_time
assert ctx.lifecycle.user_paused is False
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
combined = (status[0].title + status[0].body).lower()
alerts = ctx.notifier.alerts
assert alerts
combined = (alerts[0].title + alerts[0].body).lower()
assert "închis" in combined or "piața" in combined or "ferestr" in combined
@pytest.mark.asyncio
async def test_dispatch_resume_sends_inline_screenshot(monkeypatch, tmp_path):
"""/resume produces a single Alert with image_path + FSM pick caption when capture succeeds."""
import atm.main as _main
from atm.commands import Command
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
ctx.capture = lambda: object() # non-None frame
ctx.fires_dir = tmp_path
ctx.cfg.window_title = None
_fake_detections = [{
"idx": 0, "name": "light_green", "rgb": (0, 255, 0),
"distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200),
}]
monkeypatch.setattr(_main, "_save_inspect_frame",
lambda *a, **kw: (tmp_path / "fake_resume.png", _fake_detections))
await _main._dispatch_command(ctx, Command(action="resume"))
# Exactly one alert, with image attached + caption in body.
alerts = ctx.notifier.alerts
assert len(alerts) == 1
alert = alerts[0]
assert alert.image_path == tmp_path / "fake_resume.png"
assert "Monitorizare reluată" in alert.title
assert "← pick" in alert.body
assert "captură eșuată" not in alert.title
@pytest.mark.asyncio
async def test_dispatch_resume_capture_failed_still_resumes(monkeypatch, tmp_path):
"""/resume with capture=None → Alert title contains capture-failed, no image, resume still executes."""
import atm.main as _main
from atm.commands import Command
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
ctx.capture = lambda: None # capture fails
ctx.fires_dir = tmp_path
ctx.cfg.window_title = None
await _main._dispatch_command(ctx, Command(action="resume"))
# State still cleared despite capture failure.
assert ctx.lifecycle.user_paused is False
assert canary.is_paused is False
alerts = ctx.notifier.alerts
assert len(alerts) == 1
assert alerts[0].image_path is None
assert "captură eșuată" in alerts[0].title
assert "Monitorizare reluată" in alerts[0].title
@pytest.mark.asyncio
async def test_dispatch_resume_captures_before_state_clear(monkeypatch, tmp_path):
"""Capture must run BEFORE clearing user_paused / canary.resume() to avoid race with FSM tick."""
import atm.main as _main
from atm.commands import Command
class _Canary:
def __init__(self):
self._p = True
self.resumed_at: float | None = None
@property
def is_paused(self): return self._p
def resume(self):
self._p = False
self.resumed_at = _capture_sequence[0] if _capture_sequence else 0
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
ctx.fires_dir = tmp_path
ctx.cfg.window_title = None
_capture_sequence: list[int] = []
_capture_called = [0]
def _capture():
_capture_called[0] += 1
# State must still be paused at capture time.
assert ctx.lifecycle.user_paused is True, "capture ran AFTER user_paused was cleared"
assert canary.is_paused is True, "capture ran AFTER canary.resume()"
_capture_sequence.append(_capture_called[0])
return object()
ctx.capture = _capture
monkeypatch.setattr(_main, "_save_inspect_frame",
lambda *a, **kw: (tmp_path / "ok.png", []))
await _main._dispatch_command(ctx, Command(action="resume"))
assert _capture_called[0] == 1
assert ctx.lifecycle.user_paused is False # cleared after capture
@pytest.mark.asyncio
async def test_ss_and_fire_agree_on_rightmost_dot(tmp_path):
"""Parity: _save_inspect_frame's detections[0].pos_abs must match find_rightmost_dot
output on the same frame + ROI. Prevents silent drift between /ss verify and fire path."""
import atm.main as _main
from atm.vision import find_rightmost_dot, crop_roi
from atm.config import ROI, ColorSpec, YAxisCalib
# Synthetic frame with one bright green dot.
frame = np.zeros((100, 200, 3), dtype=np.uint8)
frame[:, :] = (18, 18, 18) # BGR background matching the palette entry below
cv2.circle(frame, (150, 50), 5, (0, 255, 0), -1)
cfg = types.SimpleNamespace(
dot_roi=ROI(x=10, y=10, w=180, h=80),
colors={
"background": ColorSpec(rgb=(18, 18, 18), tolerance=15.0),
"light_green": ColorSpec(rgb=(0, 255, 0), tolerance=60.0),
},
y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=90, p2_price=50.0),
version="parity-test",
)
dot_crop = crop_roi(frame, cfg.dot_roi)
fire_pos = find_rightmost_dot(dot_crop, cfg.colors["background"].rgb)
assert fire_pos is not None
fire_abs = (cfg.dot_roi.x + fire_pos[0], cfg.dot_roi.y + fire_pos[1])
path, detections = _main._save_inspect_frame(frame, cfg, tmp_path, now=123.0)
assert path is not None
assert detections, "inspect should detect the green dot"
inspect_abs = detections[0]["pos_abs"]
assert inspect_abs == fire_abs, (
f"Parity break: fire={fire_abs} inspect={inspect_abs}"
"fire path and /ss would show different rightmost positions."
)
@pytest.mark.asyncio
async def test_status_command_reports_pause_reason():
"""/status body must mention pause reason + window state."""
@@ -993,9 +1146,13 @@ async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path):
ctx.fires_dir = tmp_path
# window_title off so we skip focus branch
ctx.cfg.window_title = None
# stub _save_annotated_frame to return a dummy path
monkeypatch.setattr(_main, "_save_annotated_frame",
lambda *a, **kw: tmp_path / "fake_ss.png")
# stub _save_inspect_frame to return (path, detections)
_fake_detections = [{
"idx": 0, "name": "light_green", "rgb": (0, 255, 0),
"distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200),
}]
monkeypatch.setattr(_main, "_save_inspect_frame",
lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections))
await _main._dispatch_command(ctx, Command(action="ss"))
@@ -1003,11 +1160,13 @@ async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path):
assert screenshots
assert "DETECȚIE OPRITĂ" in screenshots[0].body or "drift" in screenshots[0].body.lower()
assert "/resume" in screenshots[0].body
# Caption with FSM pick must appear alongside the warn.
assert "← pick" in screenshots[0].body
@pytest.mark.asyncio
async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path):
"""/ss body must be empty when canary is not paused (no warn noise)."""
"""/ss body contains caption only when canary is not paused (no warn prefix)."""
import atm.main as _main
from atm.commands import Command
@@ -1016,14 +1175,20 @@ async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path):
ctx.capture = lambda: object()
ctx.fires_dir = tmp_path
ctx.cfg.window_title = None
monkeypatch.setattr(_main, "_save_annotated_frame",
lambda *a, **kw: tmp_path / "fake_ss.png")
_fake_detections = [{
"idx": 0, "name": "light_green", "rgb": (0, 255, 0),
"distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200),
}]
monkeypatch.setattr(_main, "_save_inspect_frame",
lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections))
await _main._dispatch_command(ctx, Command(action="ss"))
screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"]
assert screenshots
assert screenshots[0].body == ""
# Body should contain the caption (no warn), not be empty.
assert "← pick" in screenshots[0].body
assert "DETECȚIE OPRITĂ" not in screenshots[0].body
@pytest.mark.asyncio

73
tests/test_vision.py Normal file
View File

@@ -0,0 +1,73 @@
"""Unit tests for vision primitives (synthetic BGR masks, fast, deterministic)."""
from __future__ import annotations
import cv2
import numpy as np
from atm.vision import find_top_dots
BG_RGB = (18, 18, 18) # background in RGB
def _make_frame(h: int = 30, w: int = 100) -> np.ndarray:
"""Blank BGR frame filled with BG_RGB."""
bgr_bg = (BG_RGB[2], BG_RGB[1], BG_RGB[0])
frame = np.zeros((h, w, 3), dtype=np.uint8)
frame[:, :] = bgr_bg
return frame
def _paint_dot(frame: np.ndarray, cx: int, cy: int, radius: int = 5,
bgr: tuple[int, int, int] = (0, 255, 0)) -> None:
# radius ≥ 5 keeps blob above min_cluster_px after 2× erosion by 3x3 kernel.
cv2.circle(frame, (cx, cy), radius, bgr, -1)
def test_find_top_dots_happy_three_blobs_sorted_desc():
frame = _make_frame()
_paint_dot(frame, 10, 15)
_paint_dot(frame, 30, 15)
_paint_dot(frame, 50, 15)
result = find_top_dots(frame, BG_RGB, n=3)
assert len(result) == 3
# Sorted by right edge descending → x=50 first, then 30, then 10.
xs = [pt[0] for pt in result]
assert xs[0] > xs[1] > xs[2]
assert xs[0] >= 48 and xs[2] <= 12 # allow ±2px wobble from centroid
def test_find_top_dots_zero_blobs_returns_empty():
frame = _make_frame()
assert find_top_dots(frame, BG_RGB, n=3) == []
def test_find_top_dots_one_blob_n3_returns_one():
frame = _make_frame()
_paint_dot(frame, 25, 15)
result = find_top_dots(frame, BG_RGB, n=3)
assert len(result) == 1
cx, _cy = result[0]
assert 23 <= cx <= 27
def test_find_top_dots_fused_wide_blob_anchors_to_right_edge():
frame = _make_frame()
# Paint a wide stripe (width > 12) — simulates fused anti-aliased dots.
cv2.rectangle(frame, (20, 13), (60, 17), (0, 255, 0), -1)
result = find_top_dots(frame, BG_RGB, n=1)
assert len(result) == 1
cx, _cy = result[0]
# Anchor should be near right edge (~58 = 60-2), not centroid (~40).
assert cx >= 55
def test_find_top_dots_tie_break_by_y_ascending():
frame = _make_frame(h=40)
# Two dots at same right-edge x=50, different y.
_paint_dot(frame, 50, 10) # upper — should come first
_paint_dot(frame, 50, 30) # lower
result = find_top_dots(frame, BG_RGB, n=2)
assert len(result) == 2
# Tie-break: smaller y first.
assert result[0][1] < result[1][1]