diff --git a/CLAUDE.md b/CLAUDE.md index 489c4c3..6301816 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,7 +30,8 @@ Workflow după sesiune: review frame-urile noi din `frames/`, adaugi entry-uri `/ss` `/status` `/pause` `/resume` `/3` (interval min) `/stop` -- `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias). +- `/ss` — verify multi-bulină: adnotează top-3 buline din `dot_roi` (cerc roșu gros pe pick-ul FSM, cercuri colorate subțiri pe vecini) + caption cu clasificarea fiecăreia (nume, RGB, distanță, confidence) + `config: {version}`. Cercul colorat folosește `cfg.colors[name].rgb` la runtime — DRY cu paleta activă. +- `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias). Trimite un singur Alert cu screenshot adnotat inline (capture rulează **înainte** de clearing state → zero race cu FSM tick-uri). Dacă capture eșuează, title conține `⚠️ captură eșuată` și resume-ul se execută oricum. - Drift-pause emits a single Telegram alert on transition. While paused, `/set_interval` is refused and `/ss` captions warn that detection is off. - Heartbeat shows `⚠️ pauzat (drift)` instead of `activ` while canary is paused. diff --git a/TODOS.md b/TODOS.md index 64c4e82..8a9676b 100644 --- a/TODOS.md +++ b/TODOS.md @@ -30,11 +30,20 @@ Per-kind mute toggles for notifications in case arm/prime turn out too noisy in - `cfg.notify.arm: bool = true` - `cfg.notify.prime: bool = true` - `cfg.notify.late_start: bool = true` +- `cfg.notify.resume_screenshot: bool = true` — gate `_save_inspect_frame` + inline screenshot în `/resume` Alert dacă recover-urile dese din drift devin zgomotoase. Default all true. Gate each `notifier.send()` in `_handle_tick()` on the flag. Start after 3+ live sessions confirm the signal/noise ratio. Blocked on: Faza 1 baseline evidence. +## P3-inspect-top-n-configurable + +Parser comandă `/ss N` (ex: `/ss 5`) ca override pentru `n` în `find_top_dots` (default 3). Util dacă ROI scope se extinde și vrei o privire de ansamblu pe mai multe buline. + +- Extindere `_parse_command` în `commands.py` (similar cu `/set_interval N`). +- Caption scaling: pentru N>3 formatter-ul trebuie să limiteze cele mai puțin relevante detections (ex: doar top-3 labels vizibile în poză, restul doar listate în caption). +- Start când `find_top_dots` + caption multi-bulină s-au dovedit util în practică. + ## P3-faza2-exec Auto-execution on TradeLocker. Blocked on TOS audit (see `docs/phase2-prop-firm-audit.md`). Not started until GO decision + 20+ Faza 1 sessions. diff --git a/src/atm/main.py b/src/atm/main.py index e32eb27..44fbb33 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -534,6 +534,174 @@ def _cmd_validate_calibration(args) -> None: # Live loop # --------------------------------------------------------------------------- +def _fsm_step_label(fsm, now: float, recent_fire_s: float = 30.0) -> str: + """Translate FSM into `step/total STATE` shorthand. + + M2D sequence to a fire: IDLE → ARMED_* (1/3) → PRIMED_* (2/3) → FIRE (3/3). + FIRE nu e state persistent în FSM (întoarce în IDLE), așa că îl afișăm pe + baza `_last_fire[direction]` când timestamp-ul e în ultimele `recent_fire_s`. + """ + last_fires = getattr(fsm, "_last_fire", None) + if isinstance(last_fires, dict) and last_fires: + recent = [(ts, d) for d, ts in last_fires.items() + if isinstance(ts, (int, float)) and now - ts < recent_fire_s] + if recent: + _ts, direction = max(recent) + return f"3/3 FIRE_{direction}" + state_value = getattr(getattr(fsm, "state", None), "value", None) + if isinstance(state_value, str): + if state_value.startswith("ARMED_"): + return f"1/3 {state_value}" + if state_value.startswith("PRIMED_"): + return f"2/3 {state_value}" + return state_value + return "—" + + +def _build_resume_info_text(was_drift: bool, skip_now: str | None) -> "tuple[str, str]": + """Resume alert title/body for the three outcomes (market closed, drift, normal). + + Extracted from /resume dispatcher so we can reuse it around the inline screenshot. + """ + if skip_now and skip_now.startswith("out_of_window"): + return ("Pauză eliminată — piața e închisă acum", + "Monitorizarea va porni la următoarea fereastră.") + if was_drift: + return ("Monitorizare reluată — drift-pause anulat", + "Dacă driftul persistă, Canary va repauza la următoarea verificare.") + return ("Monitorizare reluată", "") + + +def _draw_roi_cyan(annotated, cfg) -> None: + """Draw the dot_roi cyan rectangle onto annotated. Shared by annotate helpers.""" + import cv2 # type: ignore[import-untyped] + x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h + cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) + + +def _save_inspect_frame( + frame, + cfg, + fires_dir: Path, + now: float, + audit: "_AuditLike | None" = None, +) -> "tuple[Path | None, list[dict]]": + """Annotate frame with top-3 rightmost dots + classifications. Returns (path, detections). + + FSM pick (rightmost, idx 0) → thick red circle. Neighbors (idx 1, 2) → thin circle + in the classified color (BGR derived from cfg.colors[name].rgb at runtime, UNKNOWN + → gray). Labels `{name} d={distance}` next to each circle. Price overlay for + rightmost dot (same as _save_annotated_frame). Fail-safe: any error → (None, []). + """ + try: + import cv2 # type: ignore[import-untyped] + except ImportError as exc: + if audit is not None: + try: + audit.log({"ts": now, "event": "snapshot_fail", "label": "inspect", "error": f"cv2 missing: {exc}"}) + except Exception: + pass + return None, [] + try: + from atm.vision import crop_roi, find_top_dots, pixel_rgb, classify_pixel + ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") + fpath = fires_dir / f"{ts_str}_inspect.png" + annotated = frame.copy() + _draw_roi_cyan(annotated, cfg) + + bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18) + dot_crop = crop_roi(frame, cfg.dot_roi) + positions = find_top_dots(dot_crop, bg_rgb, n=3) + palette = {k: (v.rgb, v.tolerance) for k, v in cfg.colors.items() if k != "background"} + + detections: list[dict] = [] + for idx, (cx, cy) in enumerate(positions): + rgb = pixel_rgb(dot_crop, cx, cy) + match = classify_pixel(rgb, palette) + pos_abs = (cfg.dot_roi.x + cx, cfg.dot_roi.y + cy) + detections.append({ + "idx": idx, "name": match.name, "rgb": rgb, + "distance": match.distance, "confidence": match.confidence, + "pos_abs": pos_abs, + }) + + # Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x. + # Toate cercurile au aceeași rază (r=7), pline, în culoarea clasificată. + # Primary (FSM pick, rightmost) = tick vertical roșu care leagă markerul de bulina. + marker_y = cfg.dot_roi.y + cfg.dot_roi.h + 8 + for det in reversed(detections): # neighbors first, primary last on top + pos_abs = det["pos_abs"] + name = det["name"] + if name in cfg.colors: + rgb_pal = cfg.colors[name].rgb + bgr = (int(rgb_pal[2]), int(rgb_pal[1]), int(rgb_pal[0])) + else: + bgr = (128, 128, 128) + marker_pos = (pos_abs[0], marker_y) + cv2.circle(annotated, marker_pos, 7, bgr, -1) + if det["idx"] == 0: + cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1) + + # Price overlay on rightmost (reuse same formula as _save_annotated_frame) + if detections and hasattr(cfg, "y_axis"): + try: + dot_y = detections[0]["pos_abs"][1] + ya = cfg.y_axis + slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y) + price = ya.p1_price + (dot_y - ya.p1_y) * slope + w_frame = annotated.shape[1] + text = f"${price:.2f}" + font = cv2.FONT_HERSHEY_SIMPLEX + scale, thickness = 1.2, 3 + (tw, th), _ = cv2.getTextSize(text, font, scale, thickness) + tx, ty = w_frame - tw - 10, th + 10 + cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1) + cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA) + except Exception: + pass + + cv2.imwrite(str(fpath), annotated) + return fpath, detections + except Exception as exc: + if audit is not None: + try: + audit.log({"ts": now, "event": "snapshot_fail", "label": "inspect", "error": str(exc)}) + except Exception: + pass + return None, [] + + +_COLOR_EMOJI = { + "turquoise": "🔵", + "light_green": "🟢", + "dark_green": "🟢", + "light_red": "🔴", + "dark_red": "🔴", + "yellow": "🟡", +} + + +def _format_inspect_caption(detections: list[dict], cfg) -> str: + """Compact caption: leftmost = c1, rightmost = cN (FSM pick). + + Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption — + emoji-ul e cel mai aproape de „textul în aceeași culoare ca bulina"). + """ + if not detections: + return "nicio bulină detectată în ROI" + # detections ordered idx=0 (rightmost) → idx=N-1 (leftmost) + # Display left-to-right so user reads the chart order natural. + ordered = list(reversed(detections)) + total = len(ordered) + lines: list[str] = [] + for display_idx, det in enumerate(ordered, start=1): + name = det["name"] + emoji = _COLOR_EMOJI.get(name, "⚪") + suffix = " ← pick" if display_idx == total else "" + lines.append(f"{emoji} c{display_idx}: {name}{suffix}") + return "\n".join(lines) + + def _save_annotated_frame( frame, cfg, @@ -568,8 +736,7 @@ def _save_annotated_frame( ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") fpath = fires_dir / f"{ts_str}_{label}.png" annotated = frame.copy() - x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h - cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) + _draw_roi_cyan(annotated, cfg) if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"): try: _, dot_y = dot_pos_abs @@ -1188,6 +1355,10 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: window_ro = {"open": "deschisă", "closed": "închisă"}.get(window_val or "", window_val or "—") lines.append(f"fereastră: {window_ro}") + cfg_name = getattr(ctx.cfg, "config_version", None) or getattr(ctx.cfg, "version", None) + if isinstance(cfg_name, str) and cfg_name not in ("", "unknown"): + lines.append(f"config: {cfg_name}") + ctx.notifier.send(Alert(kind="status", title="ATM Status", body="\n".join(lines))) elif cmd.action == "ss": now_ss = time.time() @@ -1203,12 +1374,24 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: body="", )) return - path_ss = await asyncio.to_thread( - _save_annotated_frame, frame_ss, ctx.cfg, ctx.fires_dir, "ss", now_ss, ctx.audit, + path_ss, detections_ss = await asyncio.to_thread( + _save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit, ) - ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None}) - warn = "⚠️ DETECȚIE OPRITĂ (drift) — trimite /resume" if getattr(ctx.canary, "is_paused", False) else "" - ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body=warn, image_path=path_ss)) + caption_ss = _format_inspect_caption(detections_ss, ctx.cfg) + state_hdr = _fsm_step_label(ctx.fsm, now_ss) + warn_parts = [state_hdr] + if getattr(ctx.canary, "is_paused", False): + warn_parts.append("⚠️ DETECȚIE OPRITĂ (drift) — trimite /resume") + warn_parts.append(caption_ss) + ctx.audit.log({ + "ts": now_ss, "event": "screenshot_sent", + "path": str(path_ss) if path_ss else None, + "n_dots": len(detections_ss), + }) + ctx.notifier.send(Alert( + kind="screenshot", title="Screenshot manual", + body="\n\n".join(warn_parts), image_path=path_ss, + )) elif cmd.action == "pause": # User manually stops monitoring. Canary drift state is untouched. if ctx.lifecycle is not None: @@ -1223,6 +1406,24 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: # /resume clears BOTH user_paused AND canary drift in one shot. # /resume force rămâne acceptat ca alias (legacy muscle memory); # câmpul `force` e păstrat în audit pentru schema compat. + # Capture+annotate BEFORE clearing state to avoid race with a concurrent + # FSM tick firing between clear and screenshot. + now_r = time.time() + path_r: "Path | None" = None + detections_r: list[dict] = [] + capture_failed = False + if cfg.window_title: + title = await asyncio.to_thread(_focus_window_by_title, cfg.window_title) + ctx.audit.log({"ts": now_r, "event": "window_focused", "command": "resume", "title": title}) + await asyncio.sleep(0.15) + frame_r = await asyncio.to_thread(ctx.capture) + if frame_r is None: + capture_failed = True + else: + path_r, detections_r = await asyncio.to_thread( + _save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit, + ) + was_drift = bool(getattr(ctx.canary, "is_paused", False)) was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False force = cmd.value == 1 @@ -1233,25 +1434,26 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: ctx.audit.log({ "ts": time.time(), "event": "user_resumed", "was_drift": was_drift, "was_user": was_user, "force": force, + "screenshot_path": str(path_r) if path_r else None, + "n_dots": len(detections_r), }) - if cfg.window_title: - title = await asyncio.to_thread(_focus_window_by_title, cfg.window_title) - ctx.audit.log({"ts": time.time(), "event": "window_focused", "command": "resume", "title": title}) - await asyncio.sleep(0.15) - # Adaptive response + skip_now = None if ctx.lifecycle is not None: skip_now = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary) - if skip_now and skip_now.startswith("out_of_window"): - title = "Pauză eliminată — piața e închisă acum" - body = "Monitorizarea va porni la următoarea fereastră." - elif was_drift: - title = "Monitorizare reluată — drift-pause anulat" - body = "Dacă driftul persistă, Canary va repauza la următoarea verificare." - else: - title = "Monitorizare reluată" - body = "" - ctx.notifier.send(Alert(kind="status", title=title, body=body)) + title_r, body_r = _build_resume_info_text(was_drift, skip_now) + if capture_failed: + title_r = f"{title_r} — ⚠️ captură eșuată" + state_hdr_r = _fsm_step_label(ctx.fsm, now_r) + body_parts = [state_hdr_r] + if body_r: + body_parts.append(body_r) + if not capture_failed: + body_parts.append(_format_inspect_caption(detections_r, ctx.cfg)) + ctx.notifier.send(Alert( + kind="screenshot", title=title_r, + body="\n\n".join(body_parts), image_path=path_r, + )) elif cmd.action == "help": body = ( "/status — stare FSM, uptime, ultima detecție\n" diff --git a/src/atm/vision.py b/src/atm/vision.py index 9a46287..8413526 100644 --- a/src/atm/vision.py +++ b/src/atm/vision.py @@ -137,6 +137,45 @@ def find_rightmost_dot( return (cx, cy) +def find_top_dots( + roi_img: np.ndarray, + bg_rgb: tuple[int, int, int], + bg_tol: float = 15.0, + min_cluster_px: int = 3, + n: int = 3, +) -> list[tuple[int, int]]: + """Top-N rightmost non-background clusters as (cx, cy), sorted by right edge desc. + + Same mask/erode/connectedComponents pipeline as `find_rightmost_dot`, but collects + all qualifying components and returns the top N by right-edge. Tie-break on equal + right_edge: smaller y first (deterministic for tests). Anchor logic identical — + fused blobs (comp_w > 12) anchor to `right_edge-2`, small isolated dots use centroid. + """ + bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32) + diff = np.linalg.norm(roi_img.astype(np.float32) - bgr_bg, axis=2) + mask = (diff > bg_tol).astype(np.uint8) + + kernel = np.ones((3, 3), dtype=np.uint8) + mask = cv2.erode(mask, kernel, iterations=2) + n_labels, _labels, stats, centroids = cv2.connectedComponentsWithStats( + mask, connectivity=8, + ) + candidates: list[tuple[int, int, int, int]] = [] # (-right_edge, y, cx, cy) + for i in range(1, n_labels): # skip background + if int(stats[i, cv2.CC_STAT_AREA]) < min_cluster_px: + continue + comp_w = int(stats[i, cv2.CC_STAT_WIDTH]) + right_edge = int(stats[i, cv2.CC_STAT_LEFT]) + comp_w - 1 + if comp_w > 12: + cx = max(right_edge - 2, 0) + else: + cx = int(centroids[i][0]) + cy = int(centroids[i][1]) + candidates.append((-right_edge, cy, cx, cy)) + candidates.sort() # desc by right_edge, asc by y + return [(cx, cy) for (_neg_r, _y, cx, cy) in candidates[:n]] + + def pixel_rgb(roi_img: np.ndarray, x: int, y: int, box: int = 3) -> tuple[int, int, int]: """Sample mean RGB of a (2*box+1)² patch around (x,y). Input BGR → returns RGB.""" h, w = roi_img.shape[:2] diff --git a/tests/test_main.py b/tests/test_main.py index 84356a5..c7a3aca 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -10,6 +10,8 @@ from dataclasses import dataclass from pathlib import Path from unittest.mock import MagicMock +import cv2 +import numpy as np import pytest @@ -865,9 +867,9 @@ async def test_resume_plain_also_clears_canary_drift(): # Audit event still records was_drift + force=False for traceability resumed = [e for e in ctx.audit.events if e.get("event") == "user_resumed"] assert resumed and resumed[0]["was_drift"] is True and resumed[0]["force"] is False - # Message mentions drift-pause was cleared - status = [a for a in ctx.notifier.alerts if a.kind == "status"] - assert status and ("drift" in (status[0].title + status[0].body).lower()) + # Message mentions drift-pause was cleared (kind is "screenshot" now since /resume attaches image) + alerts = ctx.notifier.alerts + assert alerts and ("drift" in (alerts[0].title + alerts[0].body).lower()) @pytest.mark.asyncio @@ -921,12 +923,163 @@ async def test_resume_out_of_window_responds_with_pending_message(): _mm.time = real_time assert ctx.lifecycle.user_paused is False - status = [a for a in ctx.notifier.alerts if a.kind == "status"] - assert status - combined = (status[0].title + status[0].body).lower() + alerts = ctx.notifier.alerts + assert alerts + combined = (alerts[0].title + alerts[0].body).lower() assert "închis" in combined or "piața" in combined or "ferestr" in combined +@pytest.mark.asyncio +async def test_dispatch_resume_sends_inline_screenshot(monkeypatch, tmp_path): + """/resume produces a single Alert with image_path + FSM pick caption when capture succeeds.""" + import atm.main as _main + from atm.commands import Command + + class _Canary: + def __init__(self): self._p = True + @property + def is_paused(self): return self._p + def resume(self): self._p = False + canary = _Canary() + + ctx = _dispatch_ctx(canary=canary) + ctx.lifecycle.user_paused = True + ctx.capture = lambda: object() # non-None frame + ctx.fires_dir = tmp_path + ctx.cfg.window_title = None + + _fake_detections = [{ + "idx": 0, "name": "light_green", "rgb": (0, 255, 0), + "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), + }] + monkeypatch.setattr(_main, "_save_inspect_frame", + lambda *a, **kw: (tmp_path / "fake_resume.png", _fake_detections)) + + await _main._dispatch_command(ctx, Command(action="resume")) + + # Exactly one alert, with image attached + caption in body. + alerts = ctx.notifier.alerts + assert len(alerts) == 1 + alert = alerts[0] + assert alert.image_path == tmp_path / "fake_resume.png" + assert "Monitorizare reluată" in alert.title + assert "← pick" in alert.body + assert "captură eșuată" not in alert.title + + +@pytest.mark.asyncio +async def test_dispatch_resume_capture_failed_still_resumes(monkeypatch, tmp_path): + """/resume with capture=None → Alert title contains capture-failed, no image, resume still executes.""" + import atm.main as _main + from atm.commands import Command + + class _Canary: + def __init__(self): self._p = True + @property + def is_paused(self): return self._p + def resume(self): self._p = False + canary = _Canary() + + ctx = _dispatch_ctx(canary=canary) + ctx.lifecycle.user_paused = True + ctx.capture = lambda: None # capture fails + ctx.fires_dir = tmp_path + ctx.cfg.window_title = None + + await _main._dispatch_command(ctx, Command(action="resume")) + + # State still cleared despite capture failure. + assert ctx.lifecycle.user_paused is False + assert canary.is_paused is False + alerts = ctx.notifier.alerts + assert len(alerts) == 1 + assert alerts[0].image_path is None + assert "captură eșuată" in alerts[0].title + assert "Monitorizare reluată" in alerts[0].title + + +@pytest.mark.asyncio +async def test_dispatch_resume_captures_before_state_clear(monkeypatch, tmp_path): + """Capture must run BEFORE clearing user_paused / canary.resume() to avoid race with FSM tick.""" + import atm.main as _main + from atm.commands import Command + + class _Canary: + def __init__(self): + self._p = True + self.resumed_at: float | None = None + @property + def is_paused(self): return self._p + def resume(self): + self._p = False + self.resumed_at = _capture_sequence[0] if _capture_sequence else 0 + canary = _Canary() + + ctx = _dispatch_ctx(canary=canary) + ctx.lifecycle.user_paused = True + ctx.fires_dir = tmp_path + ctx.cfg.window_title = None + + _capture_sequence: list[int] = [] + _capture_called = [0] + + def _capture(): + _capture_called[0] += 1 + # State must still be paused at capture time. + assert ctx.lifecycle.user_paused is True, "capture ran AFTER user_paused was cleared" + assert canary.is_paused is True, "capture ran AFTER canary.resume()" + _capture_sequence.append(_capture_called[0]) + return object() + ctx.capture = _capture + + monkeypatch.setattr(_main, "_save_inspect_frame", + lambda *a, **kw: (tmp_path / "ok.png", [])) + + await _main._dispatch_command(ctx, Command(action="resume")) + + assert _capture_called[0] == 1 + assert ctx.lifecycle.user_paused is False # cleared after capture + + +@pytest.mark.asyncio +async def test_ss_and_fire_agree_on_rightmost_dot(tmp_path): + """Parity: _save_inspect_frame's detections[0].pos_abs must match find_rightmost_dot + output on the same frame + ROI. Prevents silent drift between /ss verify and fire path.""" + import atm.main as _main + from atm.vision import find_rightmost_dot, crop_roi + from atm.config import ROI, ColorSpec, YAxisCalib + + # Synthetic frame with one bright green dot. + frame = np.zeros((100, 200, 3), dtype=np.uint8) + frame[:, :] = (18, 18, 18) # BGR background matching the palette entry below + cv2.circle(frame, (150, 50), 5, (0, 255, 0), -1) + + cfg = types.SimpleNamespace( + dot_roi=ROI(x=10, y=10, w=180, h=80), + colors={ + "background": ColorSpec(rgb=(18, 18, 18), tolerance=15.0), + "light_green": ColorSpec(rgb=(0, 255, 0), tolerance=60.0), + }, + y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=90, p2_price=50.0), + version="parity-test", + ) + + dot_crop = crop_roi(frame, cfg.dot_roi) + fire_pos = find_rightmost_dot(dot_crop, cfg.colors["background"].rgb) + assert fire_pos is not None + fire_abs = (cfg.dot_roi.x + fire_pos[0], cfg.dot_roi.y + fire_pos[1]) + + path, detections = _main._save_inspect_frame(frame, cfg, tmp_path, now=123.0) + + assert path is not None + assert detections, "inspect should detect the green dot" + inspect_abs = detections[0]["pos_abs"] + assert inspect_abs == fire_abs, ( + f"Parity break: fire={fire_abs} inspect={inspect_abs} — " + "fire path and /ss would show different rightmost positions." + ) + + @pytest.mark.asyncio async def test_status_command_reports_pause_reason(): """/status body must mention pause reason + window state.""" @@ -993,9 +1146,13 @@ async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path): ctx.fires_dir = tmp_path # window_title off so we skip focus branch ctx.cfg.window_title = None - # stub _save_annotated_frame to return a dummy path - monkeypatch.setattr(_main, "_save_annotated_frame", - lambda *a, **kw: tmp_path / "fake_ss.png") + # stub _save_inspect_frame to return (path, detections) + _fake_detections = [{ + "idx": 0, "name": "light_green", "rgb": (0, 255, 0), + "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), + }] + monkeypatch.setattr(_main, "_save_inspect_frame", + lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="ss")) @@ -1003,11 +1160,13 @@ async def test_ss_includes_warn_body_while_canary_paused(monkeypatch, tmp_path): assert screenshots assert "DETECȚIE OPRITĂ" in screenshots[0].body or "drift" in screenshots[0].body.lower() assert "/resume" in screenshots[0].body + # Caption with FSM pick must appear alongside the warn. + assert "← pick" in screenshots[0].body @pytest.mark.asyncio async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path): - """/ss body must be empty when canary is not paused (no warn noise).""" + """/ss body contains caption only when canary is not paused (no warn prefix).""" import atm.main as _main from atm.commands import Command @@ -1016,14 +1175,20 @@ async def test_ss_no_warn_when_canary_healthy(monkeypatch, tmp_path): ctx.capture = lambda: object() ctx.fires_dir = tmp_path ctx.cfg.window_title = None - monkeypatch.setattr(_main, "_save_annotated_frame", - lambda *a, **kw: tmp_path / "fake_ss.png") + _fake_detections = [{ + "idx": 0, "name": "light_green", "rgb": (0, 255, 0), + "distance": 5.0, "confidence": 0.9, "pos_abs": (100, 200), + }] + monkeypatch.setattr(_main, "_save_inspect_frame", + lambda *a, **kw: (tmp_path / "fake_ss.png", _fake_detections)) await _main._dispatch_command(ctx, Command(action="ss")) screenshots = [a for a in ctx.notifier.alerts if a.kind == "screenshot"] assert screenshots - assert screenshots[0].body == "" + # Body should contain the caption (no warn), not be empty. + assert "← pick" in screenshots[0].body + assert "DETECȚIE OPRITĂ" not in screenshots[0].body @pytest.mark.asyncio diff --git a/tests/test_vision.py b/tests/test_vision.py new file mode 100644 index 0000000..bc6f8ae --- /dev/null +++ b/tests/test_vision.py @@ -0,0 +1,73 @@ +"""Unit tests for vision primitives (synthetic BGR masks, fast, deterministic).""" +from __future__ import annotations + +import cv2 +import numpy as np + +from atm.vision import find_top_dots + + +BG_RGB = (18, 18, 18) # background in RGB + + +def _make_frame(h: int = 30, w: int = 100) -> np.ndarray: + """Blank BGR frame filled with BG_RGB.""" + bgr_bg = (BG_RGB[2], BG_RGB[1], BG_RGB[0]) + frame = np.zeros((h, w, 3), dtype=np.uint8) + frame[:, :] = bgr_bg + return frame + + +def _paint_dot(frame: np.ndarray, cx: int, cy: int, radius: int = 5, + bgr: tuple[int, int, int] = (0, 255, 0)) -> None: + # radius ≥ 5 keeps blob above min_cluster_px after 2× erosion by 3x3 kernel. + cv2.circle(frame, (cx, cy), radius, bgr, -1) + + +def test_find_top_dots_happy_three_blobs_sorted_desc(): + frame = _make_frame() + _paint_dot(frame, 10, 15) + _paint_dot(frame, 30, 15) + _paint_dot(frame, 50, 15) + result = find_top_dots(frame, BG_RGB, n=3) + assert len(result) == 3 + # Sorted by right edge descending → x=50 first, then 30, then 10. + xs = [pt[0] for pt in result] + assert xs[0] > xs[1] > xs[2] + assert xs[0] >= 48 and xs[2] <= 12 # allow ±2px wobble from centroid + + +def test_find_top_dots_zero_blobs_returns_empty(): + frame = _make_frame() + assert find_top_dots(frame, BG_RGB, n=3) == [] + + +def test_find_top_dots_one_blob_n3_returns_one(): + frame = _make_frame() + _paint_dot(frame, 25, 15) + result = find_top_dots(frame, BG_RGB, n=3) + assert len(result) == 1 + cx, _cy = result[0] + assert 23 <= cx <= 27 + + +def test_find_top_dots_fused_wide_blob_anchors_to_right_edge(): + frame = _make_frame() + # Paint a wide stripe (width > 12) — simulates fused anti-aliased dots. + cv2.rectangle(frame, (20, 13), (60, 17), (0, 255, 0), -1) + result = find_top_dots(frame, BG_RGB, n=1) + assert len(result) == 1 + cx, _cy = result[0] + # Anchor should be near right edge (~58 = 60-2), not centroid (~40). + assert cx >= 55 + + +def test_find_top_dots_tie_break_by_y_ascending(): + frame = _make_frame(h=40) + # Two dots at same right-edge x=50, different y. + _paint_dot(frame, 50, 10) # upper — should come first + _paint_dot(frame, 50, 30) # lower + result = find_top_dots(frame, BG_RGB, n=2) + assert len(result) == 2 + # Tie-break: smaller y first. + assert result[0][1] < result[1][1]