From c950a5a699003773da994a892c37b967046850bc Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Tue, 5 May 2026 17:59:18 +0300 Subject: [PATCH] =?UTF-8?q?feat(multi-chart):=20refactor=20=5Frun=5Fmulti?= =?UTF-8?q?=5Ftick=20+=20fix=20alert=20spam=20pe=20oscila=C8=9Bie=20strip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug critic: _strips_match(tol=10) trip pe pulsații naturale de lățime ~18px între ticks (ex. 792↔810px). Fiecare trip → _commit_layout_change → reset FSM + alert Telegram + scheduler stop. Logul 2026-05-04.jsonl arăta 576 evenimente layout_change/zi, plus prime alerts repetate la dark_red/dark_green (FSM resetat înghite lockout-ul) și sincronizare cross-chart pe ambele FSM-uri simultan. Fix: - main.py:1511 — gate doar pe count change (len(new) != len(current)); count stabil → silent update sub_roi indiferent de jitter - main.py:1438 — silent=True pe alert layout_change (Telegram fără sunet) - 2 teste regresie noi: width oscillation 792↔810 + silent assertion - 2 teste async reparate: bootstrap _detect_strips_for_ctx pentru ScriptedDetector (regresie după ce _run_multi_tick a devenit unica cale de detecție) Plus refactor multi-chart pre-existent: layout.py modul nou, _detect_strips_for_ctx, ChartState per-chart FSM/Detector, ROI per-strip pe screenshots, scripts/diag_*. Verificat: 292 passed, 2 skipped în 10s. Co-Authored-By: Claude Haiku 4.5 --- .gitignore | 14 + configs/2026-04-21-recalib.toml | 196 +++++----- scripts/diag_strip_detection.py | 133 +++++++ scripts/diag_strip_fixes.py | 162 +++++++++ scripts/inspect_image_pixels.py | 124 +++++++ scripts/repro_ss_resume.py | 173 +++++++++ src/atm/detector.py | 11 +- src/atm/layout.py | 48 +++ src/atm/main.py | 619 ++++++++++++++++++++++++-------- src/atm/notifier/__init__.py | 16 + tests/test_detector.py | 56 +++ tests/test_layout.py | 163 +++++++++ tests/test_main.py | 461 ++++++++++++++++++++++++ tests/test_notifier.py | 31 +- 14 files changed, 1952 insertions(+), 255 deletions(-) create mode 100644 scripts/diag_strip_detection.py create mode 100644 scripts/diag_strip_fixes.py create mode 100644 scripts/inspect_image_pixels.py create mode 100644 scripts/repro_ss_resume.py create mode 100644 src/atm/layout.py create mode 100644 tests/test_layout.py diff --git a/.gitignore b/.gitignore index 1b8aaec..639166c 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,17 @@ calibrate_capture_*.png # Debug captures debug_*.png logs/*.png + +# Test/dev scratch output +pytest-*.log +pytest-*.err +*.log +*.err + +# Auto-captured calibration frames (use `git add -f` to commit selected ones) +calibration/frames/*.png + +# Misc clutter +*.jpeg +*.jpg +!samples/*.jpg diff --git a/configs/2026-04-21-recalib.toml b/configs/2026-04-21-recalib.toml index dcd2b01..92f37cd 100644 --- a/configs/2026-04-21-recalib.toml +++ b/configs/2026-04-21-recalib.toml @@ -1,98 +1,98 @@ -window_title = "m2d" - -[dot_roi] -x = 0 -y = 712 -w = 1796 -h = 35 - -[chart_roi] -x = 17 -y = 125 -w = 1767 -h = 567 - -[colors] - -[colors.turquoise] -rgb = [0, 253, 253] -tolerance = 60.0 - -[colors.yellow] -rgb = [253, 253, 0] -tolerance = 60.0 - -[colors.dark_green] -rgb = [0, 122, 0] -tolerance = 60.0 - -[colors.dark_red] -rgb = [128, 0, 0] -tolerance = 60.0 - -[colors.light_green] -rgb = [0, 255, 0] -tolerance = 60.0 - -[colors.light_red] -rgb = [255, 0, 0] -tolerance = 60.0 - -[colors.gray] -rgb = [128, 128, 128] -tolerance = 60.0 - -[colors.background] -rgb = [0, 0, 0] -tolerance = 25.0 - -[y_axis] -p1_y = 166 -p1_price = 485.2 -p2_y = 664 -p2_price = 483.2 - -[canary] -baseline_phash = "fbe145390c1abec23204017757a326b8e37077288ef79947310a89c70e07ffff" -drift_threshold = 8 - -[canary.roi] -x = 26 -y = 27 -w = 197 -h = 15 - -[chart_window_region] -x = 3 -y = 0 -w = 1918 -h = 1029 - -# Secretele Discord/Telegram se setează în .env la rădăcina proiectului — vezi .env.example. - -[options] -debounce_depth = 1 -loop_interval_s = 5.0 -heartbeat_min = 30 -lockout_s = 240 -low_conf_threshold = 0.2 -low_conf_run = 3 -phaseb_timeout_s = 600 -dead_letter_path = "logs/dead_letter.jsonl" - -[options.alerts] -fire_on_phase_skip = true - -[options.operating_hours] -enabled = true -timezone = "America/New_York" -weekdays = ["MON", "TUE", "WED", "THU", "FRI"] -start_hhmm = "09:30" -stop_hhmm = "16:00" - -[options.attach_screenshots] -late_start = true -catchup = true -arm = true -prime = true -trigger = true +window_title = "m2d" + +[dot_roi] +x = 0 +y = 720 +w = 1796 +h = 40 + +[chart_roi] +x = 17 +y = 125 +w = 1767 +h = 567 + +[colors] + +[colors.turquoise] +rgb = [0, 253, 253] +tolerance = 60.0 + +[colors.yellow] +rgb = [253, 253, 0] +tolerance = 60.0 + +[colors.dark_green] +rgb = [0, 122, 0] +tolerance = 60.0 + +[colors.dark_red] +rgb = [128, 0, 0] +tolerance = 60.0 + +[colors.light_green] +rgb = [0, 255, 0] +tolerance = 60.0 + +[colors.light_red] +rgb = [255, 0, 0] +tolerance = 60.0 + +[colors.gray] +rgb = [128, 128, 128] +tolerance = 60.0 + +[colors.background] +rgb = [0, 0, 0] +tolerance = 25.0 + +[y_axis] +p1_y = 166 +p1_price = 485.2 +p2_y = 664 +p2_price = 483.2 + +[canary] +baseline_phash = "c11f4a852ec09f3a8de4e4cf4ad76d84f10b19d3e708663c38f5b538877c6624" +drift_threshold = 8 + +[canary.roi] +x = 26 +y = 27 +w = 197 +h = 15 + +[chart_window_region] +x = 3 +y = 0 +w = 1918 +h = 1029 + +# Secretele Discord/Telegram se setează în .env la rădăcina proiectului — vezi .env.example. + +[options] +debounce_depth = 1 +loop_interval_s = 5.0 +heartbeat_min = 30 +lockout_s = 240 +low_conf_threshold = 0.2 +low_conf_run = 3 +phaseb_timeout_s = 600 +dead_letter_path = "logs/dead_letter.jsonl" + +[options.alerts] +fire_on_phase_skip = true + +[options.operating_hours] +enabled = true +timezone = "America/New_York" +weekdays = ["MON", "TUE", "WED", "THU", "FRI"] +start_hhmm = "09:30" +stop_hhmm = "16:00" + +[options.attach_screenshots] +late_start = true +catchup = true +arm = true +prime = true +trigger = true diff --git a/scripts/diag_strip_detection.py b/scripts/diag_strip_detection.py new file mode 100644 index 0000000..0d43529 --- /dev/null +++ b/scripts/diag_strip_detection.py @@ -0,0 +1,133 @@ +r"""Diagnose why detect_strips finds 1 strip when there are 2 TS windows. + +Reuses the most recent raw capture under logs/repro/. For each strip detected, +prints the connected-components vivid mask, gaps, and the contiguous run lengths +so we can see exactly where the threshold is killing the second strip. +""" +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +import cv2 # noqa: E402 +import numpy as np # noqa: E402 + +from atm.config import Config # noqa: E402 +from atm.layout import VIVID_COLORS, detect_strips # noqa: E402 +from atm.vision import crop_roi, find_top_dots, classify_pixel, pixel_rgb # noqa: E402 + + +def _latest_raw() -> Path: + candidates = sorted((ROOT / "logs" / "repro").glob("*_raw.png")) + if not candidates: + raise SystemExit("No *_raw.png in logs/repro — run scripts/repro_ss_resume.py first.") + return candidates[-1] + + +def main() -> int: + cfg = Config.load_current(ROOT / "configs") + raw_path = _latest_raw() + print(f"Using raw frame: {raw_path}") + frame = cv2.imread(str(raw_path), cv2.IMREAD_COLOR) + if frame is None: + raise SystemExit(f"Could not read {raw_path}") + + palette = { + name: (spec.rgb, spec.tolerance) + for name, spec in cfg.colors.items() + if name != "background" + } + full_crop = crop_roi(frame, cfg.dot_roi) + h, w = full_crop.shape[:2] + print(f"dot_roi crop shape: {h}x{w} (cfg.dot_roi={cfg.dot_roi})") + + # 1) Build the vivid mask used by detect_strips + mask = np.zeros((h, w), dtype=np.uint8) + img_f = full_crop.astype(np.float32) + per_color_pixels = {} + for name in VIVID_COLORS: + if name not in palette: + continue + rgb, tol = palette[name] + bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32) + diff = np.linalg.norm(img_f - bgr, axis=2) + m = (diff < tol).astype(np.uint8) + per_color_pixels[name] = int(m.sum()) + mask |= m + + print("\nVIVID_COLORS pixel counts in dot_roi crop (pre-morphology):") + for n, c in per_color_pixels.items(): + print(f" {n:12s} {c:>7d} px") + print(f" total mask: {int(mask.sum()):>7d} px") + + # 2) Column projection: which x columns have any vivid pixel? + col_has = mask.any(axis=0).astype(np.uint8) + runs = [] + i = 0 + while i < w: + if col_has[i] == 0: + i += 1 + continue + j = i + while j < w and col_has[j] == 1: + j += 1 + runs.append((i, j - 1, j - i)) + i = j + print(f"\nContiguous vivid-column runs (raw, no morphology): {len(runs)}") + for x0, x1, run_w in runs[:25]: + print(f" x={x0:>4d}..{x1:>4d} width={run_w}") + if len(runs) > 25: + print(f" ... +{len(runs) - 25} more") + + # 3) Apply same morphology + connected components as detect_strips + strip_h = cfg.dot_roi.h + min_strip_px = max(150, strip_h * 8) + min_gap_px = max(20, int(strip_h * 0.8)) + kw = max(3, min_gap_px // 2) + print(f"\ndetect_strips params: min_strip_px={min_strip_px} min_gap_px={min_gap_px} morphology kw={kw}") + + kernel = np.ones((1, kw), dtype=np.uint8) + closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) + n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8) + print(f"\nConnected components after CLOSE (kw={kw}): n={n_labels - 1} (excluding bg)") + for i in range(1, n_labels): + x = int(stats[i, cv2.CC_STAT_LEFT]) + y = int(stats[i, cv2.CC_STAT_TOP]) + ww = int(stats[i, cv2.CC_STAT_WIDTH]) + hh = int(stats[i, cv2.CC_STAT_HEIGHT]) + passes = "PASS" if ww >= min_strip_px else "drop" + print(f" [{i:>2d}] x={x:>4d} y={y:>3d} w={ww:>4d} h={hh:>3d} -> {passes}") + + strips = detect_strips(full_crop, palette, min_gap_px, min_strip_px) + print(f"\ndetect_strips() final result: {len(strips)} strip(s)") + for i, s in enumerate(strips): + print(f" strip[{i}] x={s.x} y={s.y} w={s.w} h={s.h}") + + # 4) Run find_top_dots on the FULL dot_roi (single-chart fallback path used + # by /ss when ctx.charts < 2). This is what users see when the layout + # detector hasn't promoted the layout to multi-chart yet. + bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18) + bg_tol = cfg.colors["background"].tolerance if "background" in cfg.colors else 15.0 + full_dots = find_top_dots(full_crop, bg_rgb, bg_tol, n=3) + print(f"\nfind_top_dots on FULL dot_roi (n=3, single-chart fallback path):") + for i, (cx, cy) in enumerate(full_dots): + rgb = pixel_rgb(full_crop, cx, cy) + m = classify_pixel(rgb, palette) + print(f" c{i + 1}: pos=({cx + cfg.dot_roi.x},{cy + cfg.dot_roi.y}) rgb={rgb} -> {m.name} d={m.distance:.1f}") + + # 5) Save the binary mask + closed mask for visual inspection + out_dir = ROOT / "logs" / "repro" + cv2.imwrite(str(out_dir / "diag_mask_raw.png"), (mask * 255).astype(np.uint8)) + cv2.imwrite(str(out_dir / "diag_mask_closed.png"), (closed * 255).astype(np.uint8)) + print(f"\nWrote diag_mask_raw.png + diag_mask_closed.png to {out_dir}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/diag_strip_fixes.py b/scripts/diag_strip_fixes.py new file mode 100644 index 0000000..78e8e9e --- /dev/null +++ b/scripts/diag_strip_fixes.py @@ -0,0 +1,162 @@ +r"""Verify the two proposed fixes for detect_strips on the latest 2-window capture. + +Fix A: include gray in VIVID_COLORS mask (cheapest change). +Fix B: use non-background mask (any pixel where diff(bg) > bg_tol). + +Reuses the most recent raw capture in logs/repro/. +""" +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +import cv2 # noqa: E402 +import numpy as np # noqa: E402 + +from atm.config import Config, ROI # noqa: E402 +from atm.vision import crop_roi # noqa: E402 + + +def _detect_strips_with_palette( + full_dot_crop, palette, color_names, min_gap_px, min_strip_px, +): + """Same body as layout.detect_strips but with selectable color set.""" + h, w = full_dot_crop.shape[:2] + mask = np.zeros((h, w), dtype=np.uint8) + img_f = full_dot_crop.astype(np.float32) + for name in color_names: + if name not in palette: + continue + rgb, tol = palette[name] + bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32) + diff = np.linalg.norm(img_f - bgr, axis=2) + mask |= (diff < tol).astype(np.uint8) + kw = max(3, min_gap_px // 2) + kernel = np.ones((1, kw), dtype=np.uint8) + closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) + n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8) + out = [] + for i in range(1, n_labels): + x = int(stats[i, cv2.CC_STAT_LEFT]) + ww = int(stats[i, cv2.CC_STAT_WIDTH]) + if ww < min_strip_px: + continue + out.append(ROI(x=x, y=0, w=ww, h=h)) + out.sort(key=lambda r: r.x) + return out, mask, closed + + +def _detect_strips_non_bg(full_dot_crop, bg_rgb, bg_tol, min_gap_px, min_strip_px): + """Fix B: any pixel different from background → strip mask.""" + h, w = full_dot_crop.shape[:2] + bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32) + diff = np.linalg.norm(full_dot_crop.astype(np.float32) - bgr_bg, axis=2) + mask = (diff > bg_tol).astype(np.uint8) + kw = max(3, min_gap_px // 2) + kernel = np.ones((1, kw), dtype=np.uint8) + closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) + n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8) + out = [] + for i in range(1, n_labels): + x = int(stats[i, cv2.CC_STAT_LEFT]) + ww = int(stats[i, cv2.CC_STAT_WIDTH]) + if ww < min_strip_px: + continue + out.append(ROI(x=x, y=0, w=ww, h=h)) + out.sort(key=lambda r: r.x) + return out, mask, closed + + +def _annotate(frame, strips_abs, label, out_dir): + annotated = frame.copy() + for r in strips_abs: + cv2.rectangle(annotated, (r.x, r.y), (r.x + r.w, r.y + r.h), (0, 255, 255), 2) + p = out_dir / f"diag_fix_{label}.png" + cv2.imwrite(str(p), annotated) + return p + + +def main() -> int: + cfg = Config.load_current(ROOT / "configs") + raws = sorted(p for p in (ROOT / "logs" / "repro").glob("*_raw.png") if p.name[0].isdigit()) + if not raws: + raise SystemExit("Run scripts/repro_ss_resume.py first.") + raw_path = raws[-1] + print(f"Using: {raw_path}") + frame = cv2.imread(str(raw_path), cv2.IMREAD_COLOR) + + palette = {n: (s.rgb, s.tolerance) for n, s in cfg.colors.items() if n != "background"} + bg_rgb = cfg.colors["background"].rgb + bg_tol = cfg.colors["background"].tolerance + full_crop = crop_roi(frame, cfg.dot_roi) + h, w = full_crop.shape[:2] + strip_h = cfg.dot_roi.h + min_strip_px = max(150, strip_h * 8) + min_gap_px = max(20, int(strip_h * 0.8)) + print(f"params: min_strip_px={min_strip_px} min_gap_px={min_gap_px} crop={w}x{h}") + + out_dir = ROOT / "logs" / "repro" + + # Baseline (current code: vivid only, no gray) + vivid_only = ("turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red") + s0, m0, c0 = _detect_strips_with_palette(full_crop, palette, vivid_only, min_gap_px, min_strip_px) + print(f"\n[BASELINE vivid-only ] strips={len(s0)}") + for i, r in enumerate(s0): + print(f" [{i}] x={r.x:>4d} w={r.w:>4d}") + cv2.imwrite(str(out_dir / "diag_fix_BASELINE_mask_closed.png"), (c0 * 255).astype(np.uint8)) + _annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in s0], + "BASELINE", out_dir) + + # Fix A: include gray + fixA_palette = vivid_only + ("gray",) + sA, mA, cA = _detect_strips_with_palette(full_crop, palette, fixA_palette, min_gap_px, min_strip_px) + print(f"\n[FIX A vivid+gray ] strips={len(sA)}") + for i, r in enumerate(sA): + print(f" [{i}] x={r.x:>4d} w={r.w:>4d}") + cv2.imwrite(str(out_dir / "diag_fix_A_mask_closed.png"), (cA * 255).astype(np.uint8)) + _annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in sA], + "A_vivid_plus_gray", out_dir) + + # Fix B: any non-background + sB, mB, cB = _detect_strips_non_bg(full_crop, bg_rgb, bg_tol, min_gap_px, min_strip_px) + print(f"\n[FIX B non-bg ] strips={len(sB)}") + for i, r in enumerate(sB): + print(f" [{i}] x={r.x:>4d} w={r.w:>4d}") + cv2.imwrite(str(out_dir / "diag_fix_B_mask_closed.png"), (cB * 255).astype(np.uint8)) + _annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in sB], + "B_non_background", out_dir) + + # Sanity: where is the divider between the two TS windows? + # Project non-bg mask onto x; long zero-runs reveal the gap. + bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32) + diff = np.linalg.norm(full_crop.astype(np.float32) - bgr_bg, axis=2) + nonbg = (diff > bg_tol).astype(np.uint8) + col_any = nonbg.any(axis=0).astype(np.uint8) + # Find longest 0-run + longest = (0, 0, 0) # (length, x0, x1) + i = 0 + while i < w: + if col_any[i] == 1: + i += 1 + continue + j = i + while j < w and col_any[j] == 0: + j += 1 + run = j - i + if run > longest[0]: + longest = (run, i, j - 1) + i = j + print(f"\nLongest empty (background-only) horizontal stretch in dot_roi: " + f"{longest[0]}px at x={longest[1]}..{longest[2]} " + f"(this is where the window divider sits)") + print(f"\nWrote diag_fix_*.png to {out_dir}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/inspect_image_pixels.py b/scripts/inspect_image_pixels.py new file mode 100644 index 0000000..5a4b826 --- /dev/null +++ b/scripts/inspect_image_pixels.py @@ -0,0 +1,124 @@ +r"""Inspect ATM strip pixels without relying on shell pipelines. + +Usage: + .\.venv\Scripts\python.exe scripts\inspect_image_pixels.py 6033117943853423831.jpg + .\.venv\Scripts\python.exe scripts\inspect_image_pixels.py image.jpg --point 1780 725 + +The script intentionally parses only the config fields needed for pixel inspection, +so it does not require Discord/Telegram secrets to be valid. +""" +from __future__ import annotations + +import argparse +import json +import sys +import tomllib +from pathlib import Path + +import cv2 + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from atm.config import ROI # noqa: E402 +from atm.vision import classify_pixel, crop_roi, find_top_dots, pixel_rgb # noqa: E402 + + +def _load_probe_config(path: Path) -> dict: + data = tomllib.loads(path.read_text(encoding="utf-8")) + colors = { + name: (tuple(int(c) for c in spec["rgb"]), float(spec["tolerance"])) + for name, spec in data["colors"].items() + } + background = colors.pop("background", ((18, 18, 18), 15.0)) + return { + "dot_roi": ROI(**data["dot_roi"]), + "colors": colors, + "background_rgb": background[0], + "background_tol": background[1], + } + + +def _as_jsonable_match(match) -> dict: + return { + "name": match.name, + "distance": round(float(match.distance), 3), + "confidence": round(float(match.confidence), 3), + } + + +def main() -> int: + parser = argparse.ArgumentParser(description="Inspect ATM strip pixels in a JPG/PNG frame.") + parser.add_argument("image", type=Path, help="Frame image path.") + parser.add_argument( + "--config", + type=Path, + default=ROOT / "configs" / "2026-04-21-recalib.toml", + help="ATM TOML config path.", + ) + parser.add_argument("--top", type=int, default=3, help="Number of rightmost dots to report.") + parser.add_argument( + "--point", + nargs=2, + type=int, + metavar=("X", "Y"), + help="Optional absolute pixel coordinate to sample.", + ) + parser.add_argument("--box", type=int, default=3, help="Sampling radius for mean RGB.") + args = parser.parse_args() + + frame = cv2.imread(str(args.image), cv2.IMREAD_COLOR) + if frame is None: + raise SystemExit(f"Could not read image: {args.image}") + + probe = _load_probe_config(args.config) + roi = probe["dot_roi"] + roi_img = crop_roi(frame, roi) + dots = find_top_dots( + roi_img, + bg_rgb=probe["background_rgb"], + bg_tol=probe["background_tol"], + n=args.top, + ) + + result = { + "image": str(args.image), + "image_size": {"w": int(frame.shape[1]), "h": int(frame.shape[0])}, + "config": str(args.config), + "dot_roi": {"x": roi.x, "y": roi.y, "w": roi.w, "h": roi.h}, + "dots": [], + } + + for x, y in dots: + rgb = pixel_rgb(roi_img, x, y, box=args.box) + match = classify_pixel(rgb, probe["colors"]) + result["dots"].append( + { + "roi_xy": [int(x), int(y)], + "abs_xy": [int(roi.x + x), int(roi.y + y)], + "rgb": list(rgb), + "match": _as_jsonable_match(match), + } + ) + + if args.point: + px, py = args.point + if not (roi.x <= px < roi.x + roi.w and roi.y <= py < roi.y + roi.h): + raise SystemExit(f"Point {px},{py} is outside dot_roi") + rx, ry = px - roi.x, py - roi.y + rgb = pixel_rgb(roi_img, rx, ry, box=args.box) + result["point"] = { + "roi_xy": [rx, ry], + "abs_xy": [px, py], + "rgb": list(rgb), + "match": _as_jsonable_match(classify_pixel(rgb, probe["colors"])), + } + + print(json.dumps(result, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/repro_ss_resume.py b/scripts/repro_ss_resume.py new file mode 100644 index 0000000..cc36a6f --- /dev/null +++ b/scripts/repro_ss_resume.py @@ -0,0 +1,173 @@ +r"""Reproduce /ss and /resume Telegram screenshot pipelines for offline inspection. + +Brings the `m2d` window to front (Win32 trick — same as live loop), captures via mss +using the active config's chart_window_region, then runs the EXACT same annotators +that /ss and /resume use: + - _save_inspect_frame → /ss path (top-3 dots per detected strip + caption) + - _save_annotated_frame → /resume path (cyan rect on dot_roi/sub_roi) + +Outputs are saved under logs/repro/ alongside a JSON summary of detections. + +Usage: + .\.venv\Scripts\python.exe scripts\repro_ss_resume.py + .\.venv\Scripts\python.exe scripts\repro_ss_resume.py --no-focus +""" +from __future__ import annotations + +import argparse +import json +import sys +import time +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +import cv2 # noqa: E402 +import numpy as np # noqa: E402 + +from atm.config import Config # noqa: E402 +from atm.layout import detect_strips # noqa: E402 +from atm.main import ( # noqa: E402 + _focus_window_by_title, + _format_inspect_caption, + _save_annotated_frame, + _save_inspect_frame, +) +from atm.vision import crop_roi # noqa: E402 + + +def _capture_via_region(cfg) -> np.ndarray | None: + import mss # type: ignore[import-untyped] + + reg = cfg.chart_window_region + if reg is None: + # Fallback: grab full primary monitor + with mss.mss() as sct: + mon = sct.monitors[1] + img = sct.grab(mon) + return cv2.cvtColor(np.array(img), cv2.COLOR_BGRA2BGR) + with mss.mss() as sct: + mon = {"top": reg.y, "left": reg.x, "width": reg.w, "height": reg.h} + img = sct.grab(mon) + return cv2.cvtColor(np.array(img), cv2.COLOR_BGRA2BGR) + + +def main() -> int: + p = argparse.ArgumentParser() + p.add_argument("--no-focus", action="store_true", help="Skip Win32 focus call.") + p.add_argument("--delay", type=float, default=0.5, help="Seconds to sleep after focus.") + p.add_argument( + "--out", + type=Path, + default=ROOT / "logs" / "repro", + help="Output directory for annotated frames + JSON.", + ) + args = p.parse_args() + + cfg = Config.load_current(ROOT / "configs") + args.out.mkdir(parents=True, exist_ok=True) + + # 1) Focus + if not args.no_focus and cfg.window_title: + title = _focus_window_by_title(cfg.window_title) + print(f"[focus] needle={cfg.window_title!r} -> {title!r}") + if args.delay > 0: + time.sleep(args.delay) + + # 2) Capture + frame = _capture_via_region(cfg) + if frame is None: + print("[capture] FAILED — no frame") + return 1 + print(f"[capture] frame shape={frame.shape}") + + now = time.time() + ts_str = time.strftime("%Y%m%d_%H%M%S") + + # Save raw too so we can hand-inspect what mss actually grabbed + raw_path = args.out / f"{ts_str}_raw.png" + cv2.imwrite(str(raw_path), frame) + print(f"[raw] {raw_path}") + + # 3) Detect strips (same logic as live multi-chart split) + strip_h = cfg.dot_roi.h + min_strip_px = max(150, strip_h * 8) + min_gap_px = max(20, int(strip_h * 0.8)) + palette = { + name: (spec.rgb, spec.tolerance) + for name, spec in cfg.colors.items() + if name != "background" + } + full_dot_crop = crop_roi(frame, cfg.dot_roi) + raw_strips = detect_strips(full_dot_crop, palette, min_gap_px, min_strip_px) + # Translate back to absolute frame coords + from atm.config import ROI # noqa: E402 + + strips = [ + ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y + r.y, w=r.w, h=r.h) + for r in raw_strips + ] + print(f"[strips] dot_roi={cfg.dot_roi} detected={len(strips)} strips") + for i, s in enumerate(strips): + print(f" strip[{i}] x={s.x} y={s.y} w={s.w} h={s.h}") + + # Also dump a copy of the full dot_roi crop for visual sanity-check + crop_path = args.out / f"{ts_str}_dot_roi_crop.png" + cv2.imwrite(str(crop_path), full_dot_crop) + print(f"[crop] {crop_path}") + + # 4) /ss inspect-annotate (top-3 per strip, FSM-pick markers) + inspect_path, detections = _save_inspect_frame( + frame, cfg, args.out, now, audit=None, + strips=strips if strips else None, + ) + caption = _format_inspect_caption(detections, cfg) + print(f"[ss] {inspect_path}") + print(f"[ss] caption:\n{caption}") + + # 5) /resume annotate (cyan rect on dot_roi or first strip) + roi_for_resume = strips[0] if strips else cfg.dot_roi + resume_path = _save_annotated_frame( + frame, cfg, args.out, "resume_repro", now, audit=None, roi=roi_for_resume, + ) + print(f"[resume] {resume_path}") + + # 6) JSON summary + summary = { + "ts": ts_str, + "frame_shape": list(frame.shape), + "dot_roi": {"x": cfg.dot_roi.x, "y": cfg.dot_roi.y, "w": cfg.dot_roi.w, "h": cfg.dot_roi.h}, + "strips": [ + {"x": s.x, "y": s.y, "w": s.w, "h": s.h} for s in strips + ], + "detections": [ + { + "strip_idx": d["strip_idx"], + "idx": d["idx"], + "name": d["name"], + "rgb": list(d["rgb"]), + "distance": round(float(d["distance"]), 3), + "confidence": round(float(d["confidence"]), 3), + "pos_abs": list(d["pos_abs"]), + } + for d in detections + ], + "files": { + "raw": str(raw_path), + "dot_roi_crop": str(crop_path), + "inspect": str(inspect_path) if inspect_path else None, + "resume": str(resume_path) if resume_path else None, + }, + "ss_caption": caption, + } + json_path = args.out / f"{ts_str}_summary.json" + json_path.write_text(json.dumps(summary, indent=2), encoding="utf-8") + print(f"[json] {json_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/atm/detector.py b/src/atm/detector.py index e2bc603..5db6837 100644 --- a/src/atm/detector.py +++ b/src/atm/detector.py @@ -7,7 +7,7 @@ from typing import Callable import numpy as np -from .config import Config +from .config import Config, ROI from .vision import ( ColorMatch, classify_pixel, @@ -40,9 +40,11 @@ class Detector: capture: ScreenCapture, bg_rgb: tuple[int, int, int] | None = None, bg_tol: float | None = None, + dot_roi_override: ROI | None = None, ) -> None: self._cfg = cfg self._capture = capture + self._dot_roi = dot_roi_override if dot_roi_override is not None else cfg.dot_roi # Prefer config-defined background; fall back to dark-grey default. if "background" in cfg.colors: spec = cfg.colors["background"] @@ -84,7 +86,7 @@ class Detector: self._rolling.append(r) return r - roi_img = crop_roi(frame, self._cfg.dot_roi) + roi_img = crop_roi(frame, self._dot_roi) dot_pos = find_rightmost_dot(roi_img, self._bg_rgb, self._bg_tol) if dot_pos is None: @@ -124,11 +126,14 @@ class Detector: match=match, accepted=accepted, color=color, - dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y), + dot_pos_abs=(self._dot_roi.x + x, self._dot_roi.y + y), ) self._rolling.append(r) return r + def update_dot_roi(self, roi: ROI) -> None: + self._dot_roi = roi + @property def rolling(self) -> list[DetectionResult]: return list(self._rolling) diff --git a/src/atm/layout.py b/src/atm/layout.py new file mode 100644 index 0000000..3713665 --- /dev/null +++ b/src/atm/layout.py @@ -0,0 +1,48 @@ +import cv2 +import numpy as np + +from .config import ROI + +VIVID_COLORS = ("turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red", "gray") + + +def detect_strips( + full_dot_crop: np.ndarray, + palette: dict[str, tuple[tuple[int, int, int], float]], + min_gap_px: int, + min_strip_px: int, +) -> list[ROI]: + """Return list of sub-ROIs (relative to full_dot_crop) sorted left-to-right. + Empty list if no vivid pixels found.""" + h, w = full_dot_crop.shape[:2] + mask = np.zeros((h, w), dtype=np.uint8) + img_f = full_dot_crop.astype(np.float32) + for name in VIVID_COLORS: + if name not in palette: + continue + rgb, tol = palette[name] + bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32) + diff = np.linalg.norm(img_f - bgr, axis=2) + mask |= (diff < tol).astype(np.uint8) + kw = max(3, min_gap_px // 2) + kernel = np.ones((1, kw), dtype=np.uint8) + closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) + n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8) + strips: list[tuple[int, int]] = [] + for i in range(1, n_labels): + x = int(stats[i, cv2.CC_STAT_LEFT]) + ww = int(stats[i, cv2.CC_STAT_WIDTH]) + if ww < min_strip_px: + continue + strips.append((x, x + ww)) + strips.sort() + return [ROI(x=xs, y=0, w=xe - xs, h=h) for (xs, xe) in strips] + + +def _strips_match(a: list[ROI], b: list[ROI], tol: int = 10) -> bool: + if len(a) != len(b): + return False + return all( + abs(ra.x - rb.x) <= tol and abs((ra.x + ra.w) - (rb.x + rb.w)) <= tol + for ra, rb in zip(a, b) + ) diff --git a/src/atm/main.py b/src/atm/main.py index dffed8d..dbeb37f 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -7,7 +7,7 @@ import contextlib import os import sys import time -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, tzinfo from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Protocol, cast @@ -572,11 +572,10 @@ def _build_resume_info_text(was_drift: bool, skip_now: str | None) -> "tuple[str return ("Monitorizare reluată", "") -def _draw_roi_cyan(annotated, cfg) -> None: - """Draw the dot_roi cyan rectangle onto annotated. Shared by annotate helpers.""" +def _draw_roi_cyan(annotated, roi) -> None: + """Draw the given ROI as a cyan rectangle onto annotated. Shared by annotate helpers.""" import cv2 # type: ignore[import-untyped] - x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h - cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) + cv2.rectangle(annotated, (roi.x, roi.y), (roi.x + roi.w, roi.y + roi.h), (0, 255, 255), 2) _BASELINE_PHASH_LINE = __import__("re").compile( @@ -644,13 +643,14 @@ def _save_inspect_frame( fires_dir: Path, now: float, audit: "_AuditLike | None" = None, + strips: "list | None" = None, ) -> "tuple[Path | None, list[dict]]": """Annotate frame with top-3 rightmost dots + classifications. Returns (path, detections). - FSM pick (rightmost, idx 0) → thick red circle. Neighbors (idx 1, 2) → thin circle - in the classified color (BGR derived from cfg.colors[name].rgb at runtime, UNKNOWN - → gray). Labels `{name} d={distance}` next to each circle. Price overlay for - rightmost dot (same as _save_annotated_frame). Fail-safe: any error → (None, []). + Per-strip detection when ``strips`` is provided (multi-chart split). Each strip + runs its own find_top_dots; circles are offset by the strip origin. Detections + carry a ``strip_idx`` so callers can distinguish which chart they came from. + Single-chart fallback uses ``[cfg.dot_roi]``. """ try: import cv2 # type: ignore[import-untyped] @@ -666,58 +666,51 @@ def _save_inspect_frame( ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") fpath = fires_dir / f"{ts_str}_inspect.png" annotated = frame.copy() - _draw_roi_cyan(annotated, cfg) bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18) - dot_crop = crop_roi(frame, cfg.dot_roi) - positions = find_top_dots(dot_crop, bg_rgb, n=3) palette = {k: (v.rgb, v.tolerance) for k, v in cfg.colors.items() if k != "background"} + strip_list = strips if strips else [cfg.dot_roi] + detections: list[dict] = [] - for idx, (cx, cy) in enumerate(positions): - rgb = pixel_rgb(dot_crop, cx, cy) - match = classify_pixel(rgb, palette) - pos_abs = (cfg.dot_roi.x + cx, cfg.dot_roi.y + cy) - detections.append({ - "idx": idx, "name": match.name, "rgb": rgb, - "distance": match.distance, "confidence": match.confidence, - "pos_abs": pos_abs, - }) + for strip_idx, strip in enumerate(strip_list): + _draw_roi_cyan(annotated, strip) + dot_crop = crop_roi(frame, strip) + # Cer N+spare buline ca să avem rezerve după filtrarea UNKNOWN-urilor + # (text-artefacte / patch-uri întunecate adiacente bulinei rightmost). + positions = find_top_dots(dot_crop, bg_rgb, n=8) + kept = 0 + for cx, cy in positions: + if kept >= 3: + break + rgb = pixel_rgb(dot_crop, cx, cy) + match = classify_pixel(rgb, palette) + if match.name == "UNKNOWN": + continue + pos_abs = (strip.x + cx, strip.y + cy) + detections.append({ + "idx": kept, "name": match.name, "rgb": rgb, + "distance": match.distance, "confidence": match.confidence, + "pos_abs": pos_abs, "strip_idx": strip_idx, + }) + kept += 1 - # Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x. - # Toate cercurile au aceeași rază (r=7), pline, în culoarea clasificată. - # Primary (FSM pick, rightmost) = tick vertical roșu care leagă markerul de bulina. - marker_y = cfg.dot_roi.y + cfg.dot_roi.h + 8 - for det in reversed(detections): # neighbors first, primary last on top - pos_abs = det["pos_abs"] - name = det["name"] - if name in cfg.colors: - rgb_pal = cfg.colors[name].rgb - bgr = (int(rgb_pal[2]), int(rgb_pal[1]), int(rgb_pal[0])) - else: - bgr = (128, 128, 128) - marker_pos = (pos_abs[0], marker_y) - cv2.circle(annotated, marker_pos, 7, bgr, -1) - if det["idx"] == 0: - cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1) - - # Price overlay on rightmost (reuse same formula as _save_annotated_frame) - if detections and hasattr(cfg, "y_axis"): - try: - dot_y = detections[0]["pos_abs"][1] - ya = cfg.y_axis - slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y) - price = ya.p1_price + (dot_y - ya.p1_y) * slope - w_frame = annotated.shape[1] - text = f"${price:.2f}" - font = cv2.FONT_HERSHEY_SIMPLEX - scale, thickness = 1.2, 3 - (tw, th), _ = cv2.getTextSize(text, font, scale, thickness) - tx, ty = w_frame - tw - 10, th + 10 - cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1) - cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA) - except Exception: - pass + # Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x. + # Primary (FSM pick, rightmost) = tick vertical roșu peste markeri. + marker_y = strip.y + strip.h + 8 + strip_dets = [d for d in detections if d["strip_idx"] == strip_idx] + for det in reversed(strip_dets): + pos_abs = det["pos_abs"] + name = det["name"] + if name in cfg.colors: + rgb_pal = cfg.colors[name].rgb + bgr = (int(rgb_pal[2]), int(rgb_pal[1]), int(rgb_pal[0])) + else: + bgr = (128, 128, 128) + marker_pos = (pos_abs[0], marker_y) + cv2.circle(annotated, marker_pos, 7, bgr, -1) + if det["idx"] == 0: + cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1) cv2.imwrite(str(fpath), annotated) return fpath, detections @@ -743,22 +736,52 @@ _COLOR_EMOJI = { def _format_inspect_caption(detections: list[dict], cfg) -> str: """Compact caption: leftmost = c1, rightmost = cN (FSM pick). - Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption — - emoji-ul e cel mai aproape de „textul în aceeași culoare ca bulina"). + Single-strip: c1..cN flat list. Multi-strip: per-strip blocks cu header + ("stânga"/"dreapta" pt 2 chart-uri, "chart N" pt 3+) și pick separat per strip. + Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption). """ if not detections: return "nicio bulină detectată în ROI" - # detections ordered idx=0 (rightmost) → idx=N-1 (leftmost) - # Display left-to-right so user reads the chart order natural. - ordered = list(reversed(detections)) - total = len(ordered) - lines: list[str] = [] - for display_idx, det in enumerate(ordered, start=1): + + # Group by strip_idx. Old detections (pre multi-chart) lack the field — + # treat as single anonymous group. + groups: dict[int, list[dict]] = {} + for det in detections: + groups.setdefault(det.get("strip_idx", 0), []).append(det) + n_strips = len(groups) + + def _line(display_idx: int, det: dict, is_pick: bool) -> str: name = det["name"] emoji = _COLOR_EMOJI.get(name, "⚪") - suffix = " ← pick" if display_idx == total else "" - lines.append(f"{emoji} c{display_idx}: {name}{suffix}") - return "\n".join(lines) + suffix = " ← pick" if is_pick else "" + return f"{emoji} c{display_idx}: {name}{suffix}" + + def _strip_header(strip_idx: int) -> str: + if n_strips == 2: + return "stânga" if strip_idx == 0 else "dreapta" + return f"chart {strip_idx + 1}" + + if n_strips <= 1: + # Single-strip path preserved verbatim — leftmost=c1, rightmost=cN=pick. + only = next(iter(groups.values())) + ordered = list(reversed(only)) + total = len(ordered) + return "\n".join( + _line(i + 1, d, is_pick=(i + 1 == total)) + for i, d in enumerate(ordered) + ) + + # Multi-strip: one block per strip (sorted by strip_idx ascending). + blocks: list[str] = [] + for sidx in sorted(groups): + dets = groups[sidx] + ordered = list(reversed(dets)) # leftmost first + total = len(ordered) + lines = [f"[{_strip_header(sidx)}]"] + for i, det in enumerate(ordered): + lines.append(_line(i + 1, det, is_pick=(i + 1 == total))) + blocks.append("\n".join(lines)) + return "\n".join(blocks) def _save_annotated_frame( @@ -770,20 +793,19 @@ def _save_annotated_frame( audit: _AuditLike | None = None, dot_pos_abs: "tuple[int, int] | None" = None, canary_ok: bool = True, + roi: "Any | None" = None, ) -> "Path | None": - """Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``. + """Save BGR frame with cyan ROI rect to ``logs/fires/{ts}_{label}.png``. - Returns the path on success, ``None`` on any error. Failures are logged to - audit (when provided) so disk-full / permission issues don't become silent - regressions. Never raises — snapshot is a best-effort enhancement, the - text alert must still go out. + ``roi`` overrides ``cfg.dot_roi`` for the rectangle (used by multi-chart split + so per-strip alerts highlight the correct chart). Falls back to cfg.dot_roi + when None. - dot_pos_abs + canary_ok: when both are set the price overlay is drawn - (y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted - since calibration may be stale. + Returns the path on success, ``None`` on any error. Never raises — snapshot + is best-effort, the text alert must still go out. """ try: - import cv2 # type: ignore[import-untyped] + import cv2 # type: ignore[import-untyped] # noqa: F401 except ImportError as exc: if audit is not None: try: @@ -792,26 +814,12 @@ def _save_annotated_frame( pass return None try: + import cv2 # type: ignore[import-untyped] ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") fpath = fires_dir / f"{ts_str}_{label}.png" annotated = frame.copy() - _draw_roi_cyan(annotated, cfg) - if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"): - try: - _, dot_y = dot_pos_abs - ya = cfg.y_axis - slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y) - price = ya.p1_price + (dot_y - ya.p1_y) * slope - w_frame = annotated.shape[1] - text = f"${price:.2f}" - font = cv2.FONT_HERSHEY_SIMPLEX - scale, thickness = 1.2, 3 - (tw, th), _ = cv2.getTextSize(text, font, scale, thickness) - tx, ty = w_frame - tw - 10, th + 10 - cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1) - cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA) - except Exception: - pass # price overlay is best-effort; never break the screenshot + roi_to_draw = roi if roi is not None else cfg.dot_roi + _draw_roi_cyan(annotated, roi_to_draw) cv2.imwrite(str(fpath), annotated) return fpath except Exception as exc: @@ -829,15 +837,27 @@ def _build_heartbeat_alert( fire_count: int, uptime_h: float, canary_paused: bool, + charts: "list | None" = None, ) -> "Alert": """Construct the periodic heartbeat Alert. - When canary is drift-paused the title/body reflect it explicitly — - 2026-04-21 bug: previously the heartbeat said "activ ARMED_SELL" while - detection had been dead for hours, misleading the user into thinking - the system was running. + Single-chart compat: when ``charts`` is None or has length <=1, body is + ``"{state} | semnale: N | Hh"`` (drift-pause appended). Multi-chart split: + one line per chart with ``[chart_id] STATE`` plus a tail signals/uptime line. """ title = "⚠️ pauzat (drift)" if canary_paused else "activ" + if charts and len(charts) >= 2: + from atm.notifier import _alert_prefix as _prefix + chart_lines = [] + for c in charts: + label = _prefix(c.chart_id).strip() + state_value = getattr(getattr(c.fsm, "state", None), "value", "—") + chart_lines.append(f"{label} {state_value}".strip()) + tail = f"| semnale: {fire_count} | {uptime_h:.1f}h" + if canary_paused: + tail = f"| semnale: {fire_count} | {uptime_h:.1f}h [drift-pause]" + body = "\n".join(chart_lines) + "\n" + tail + return Alert(kind="heartbeat", title=title, body=body) state_label = f"{fsm_state} [drift-pause]" if canary_paused else fsm_state body = f"{state_label} | semnale: {fire_count} | {uptime_h:.1f}h" return Alert(kind="heartbeat", title=title, body=body) @@ -1048,6 +1068,23 @@ class _TickSyncResult: first_consumed: bool = False late_start: bool = False new_color: str | None = None # corpus sample color when changed + chart_id: str = "" # multi-chart split: which chart produced this result + + +@dataclass +class ChartState: + """Per-chart split-workspace state (FSM + Detector + sub-ROI). + + chart_id="" → single-chart mode (no Alert prefix). "left"/"right" for n=2. + "chart_0", "chart_1", ... for n>2. Each chart has its own debounce + FSM + independent of siblings. + """ + chart_id: str + sub_roi: Any + detector: Any + fsm: Any + first_accepted: bool = True + last_saved_color: "str | None" = None @dataclass @@ -1076,6 +1113,7 @@ class RunContext: # Pending /rebase proposal: (proposed_ts, new_phash, config_path) or None. # One slot; a second /rebase overwrites. `/rebase confirm` applies if within TTL. pending_rebase: Any = None + charts: list = field(default_factory=list) # list[ChartState] — multi-chart split @dataclass @@ -1086,6 +1124,7 @@ class _LoopState: levels_extractor: Any = None fire_count: int = 0 start: float = 0.0 + n_primed_global: int = 0 @dataclass @@ -1210,21 +1249,32 @@ def _sync_detection_tick( last_saved_color: "str | None", now: float, samples_dir: Path, + chart_id: str = "", + sub_roi: Any = None, + frame: Any = None, ) -> _TickSyncResult: - """One full detection tick (blocking I/O). Runs in asyncio.to_thread.""" - frame = capture() - if frame is None: - audit.log({"ts": now, "event": "window_lost"}) - return _TickSyncResult() + """One full detection tick (blocking I/O). Runs in asyncio.to_thread. - cr = canary.check(frame) - if canary.is_paused: - audit.log({"ts": now, "event": "paused", "drift": cr.distance}) - return _TickSyncResult(frame=frame) + When ``frame`` is provided, capture/canary work is skipped (caller already + did them — multi-chart orchestrator). ``chart_id`` is propagated to all + Alerts emitted by this tick. ``sub_roi`` overrides cfg.dot_roi for the + cyan rect on saved screenshots (per-chart ROI). + """ + if frame is None: + frame = capture() + if frame is None: + audit.log({"ts": now, "event": "window_lost"}) + return _TickSyncResult(chart_id=chart_id) + + cr = canary.check(frame) + if canary.is_paused: + audit.log({"ts": now, "event": "paused", "drift": cr.distance}) + return _TickSyncResult(frame=frame, chart_id=chart_id) res = detector.step(now, frame) detection_log.log({ "ts": now, "event": "frame", + "chart_id": chart_id, "window_found": res.window_found, "dot_found": res.dot_found, "rgb": list(res.rgb) if res.rgb is not None else None, @@ -1236,9 +1286,10 @@ def _sync_detection_tick( }) if not (res.accepted and res.color): - return _TickSyncResult(frame=frame, res=res) + return _TickSyncResult(frame=frame, res=res, chart_id=chart_id) is_first = first_accepted + roi_for_draw = sub_roi if sub_roi is not None else cfg.dot_roi def _snapshot(kind: str, label: str) -> "Path | None": if not getattr(cfg.attach_screenshots, kind, True): @@ -1246,18 +1297,29 @@ def _sync_detection_tick( return _save_annotated_frame( frame, cfg, fires_dir, label, now, audit=audit, dot_pos_abs=getattr(res, "dot_pos_abs", None), - canary_ok=True, + canary_ok=True, roi=roi_for_draw, ) - tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot, cfg=cfg) + # Wrap notifier so every Alert from _handle_tick carries chart_id. + if chart_id: + class _ChartNotifier: + def send(self, alert: Alert) -> None: + alert.chart_id = chart_id + notifier.send(alert) + wrapped_notifier: _NotifierLike = _ChartNotifier() + else: + wrapped_notifier = notifier + + tr = _handle_tick(fsm, res.color, now, wrapped_notifier, audit, is_first, snapshot=_snapshot, cfg=cfg) if tr is None: - return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True) + return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True, chart_id=chart_id) new_color: str | None = None if res.color != last_saved_color: ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") - sample_path = samples_dir / f"{ts_str}_{res.color}.png" + suffix = f"_{chart_id}" if chart_id else "" + sample_path = samples_dir / f"{ts_str}_{res.color}{suffix}.png" try: import cv2 # type: ignore[import-untyped] cv2.imwrite(str(sample_path), frame) @@ -1271,7 +1333,7 @@ def _sync_detection_tick( fire_path = _save_annotated_frame( frame, cfg, fires_dir, tr.trigger, now, audit=audit, dot_pos_abs=getattr(res, "dot_pos_abs", None), - canary_ok=True, + canary_ok=True, roi=roi_for_draw, ) notifier.send(Alert( kind="trigger", @@ -1279,17 +1341,19 @@ def _sync_detection_tick( body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", image_path=fire_path, direction=tr.trigger, + chart_id=chart_id, )) return _TickSyncResult( frame=frame, res=res, tr=tr, - first_consumed=is_first, new_color=new_color, + first_consumed=is_first, new_color=new_color, chart_id=chart_id, ) def _brief_status(ctx) -> str: h = (time.monotonic() - ctx.state.start) / 3600 - return f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {h:.1f}h" + fsm_state = ctx.charts[0].fsm.state.value if ctx.charts else ctx.fsm.state.value + return f"{fsm_state} | semnale: {ctx.state.fire_count} | {h:.1f}h" async def _run_tick(ctx: RunContext) -> _TickSyncResult: @@ -1323,48 +1387,264 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult: ) -async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None: - """Scheduler start/stop + levels extraction. No-op if res is None/late_start.""" - if result.first_consumed: - ctx.state.first_accepted = False - if result.new_color is not None: - ctx.state.last_saved_color = result.new_color +def _chart_id_for_index(i: int, n: int) -> str: + """Map (idx, total) → chart_id. n=1 → "" ; n=2 → "left"/"right" ; n>2 → "chart_N".""" + if n == 1: + return "" + if n == 2: + return ("left", "right")[i] + return f"chart_{i}" - tr = result.tr - res = result.res - if result.late_start or res is None: +def _commit_layout_change(ctx: RunContext, new_strips: list, now: float) -> None: + """Replace ctx.charts with fresh ChartState objects for new_strips. + + Resets per-chart FSM (StateMachine(lockout_s)) and Detector (with + dot_roi_override=strip), zeroes n_primed_global, stops scheduler if running, + audits the layout_change event, sends a status Alert. + """ + from atm.detector import Detector + from atm.state_machine import StateMachine + + cfg = ctx.cfg + old_n = len(ctx.charts) + new_n = len(new_strips) + + new_charts: list = [] + for i, strip in enumerate(new_strips): + chart_id = _chart_id_for_index(i, new_n) + det = Detector(cfg, ctx.capture, dot_roi_override=strip) + fsm = StateMachine(lockout_s=cfg.lockout_s) + new_charts.append(ChartState( + chart_id=chart_id, sub_roi=strip, detector=det, fsm=fsm, + first_accepted=True, last_saved_color=None, + )) + + ctx.state.n_primed_global = 0 + if getattr(ctx.scheduler, "is_running", False): + ctx.scheduler.stop() + ctx.audit.log({"ts": now, "event": "scheduler_stopped", "reason": "layout_change"}) + + ctx.audit.log({ + "ts": now, + "event": "layout_change", + "old_n": old_n, + "new_n": new_n, + "strips": [{"x": s.x, "w": s.w} for s in new_strips], + }) + # Suppress notification on first-ever detection (old_n=0 = startup bootstrap, + # not a real layout change). + if old_n > 0: + ctx.notifier.send(Alert( + kind="status", + title=f"🔄 Layout TS schimbat: {old_n} → {new_n} ferestre. FSM resetat.", + body="", + silent=True, + )) + ctx.charts = new_charts + # Keep ctx.fsm / ctx.detector in sync with charts[0] so legacy code paths + # (brief_status, /ss state_hdr, /status) continue to work after layout change. + if new_charts: + ctx.fsm = new_charts[0].fsm + ctx.detector = new_charts[0].detector + + +def _detect_strips_for_ctx(ctx: RunContext, frame: Any) -> list: + """Lazy strip detection for ctx.cfg + frame. Empty list when no vivid pixels.""" + from atm.layout import detect_strips + from atm.vision import crop_roi + cfg = ctx.cfg + strip_h = cfg.dot_roi.h + min_strip_px = max(150, strip_h * 8) + min_gap_px = max(20, int(strip_h * 0.8)) + palette = { + name: (spec.rgb, spec.tolerance) + for name, spec in cfg.colors.items() + if name != "background" + } + full_crop = crop_roi(frame, cfg.dot_roi) + raw = detect_strips(full_crop, palette, min_gap_px, min_strip_px) + # detect_strips returns ROIs relative to the cropped region. Translate + # back into frame coordinates by adding cfg.dot_roi origin so per-strip + # detector crops can use the absolute ROI. + from atm.config import ROI + return [ + ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y + r.y, w=r.w, h=r.h) + for r in raw + ] + + +async def _run_multi_tick(ctx: RunContext) -> "list[_TickSyncResult]": + """Multi-chart orchestrator: capture once, detect strips, run per-chart ticks. + + Returns a list of _TickSyncResult — one per chart (empty list if capture + failed or canary paused). Layout management is in-process: when strips + don't match current ctx.charts, _commit_layout_change is invoked before + feeding ticks to the new charts. + """ + now = time.time() + if ctx.lifecycle is not None: + skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary) + sb = _brief_status(ctx) + transition = _maybe_log_transition( + skip, ctx.lifecycle, now, ctx.audit, ctx.notifier, status_body=sb, + ) + if transition == "market_open" and ctx.cfg.window_title: + title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title) + ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title}) + await asyncio.sleep(0.15) + if skip is not None: + return [] + + frame = await asyncio.to_thread(ctx.capture) + if frame is None: + ctx.audit.log({"ts": now, "event": "window_lost"}) + return [] + + cr = ctx.canary.check(frame) + if ctx.canary.is_paused: + ctx.audit.log({"ts": now, "event": "paused", "drift": cr.distance}) + return [] + + new_strips = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame) + current_strips = [c.sub_roi for c in ctx.charts] + if new_strips: + if len(new_strips) != len(current_strips): + # Real count change (1↔2 ferestre) → rebuild charts. + _commit_layout_change(ctx, new_strips, now) + else: + # Same count → silent sub_roi update regardless of position/width jitter. + # Width pulsations (e.g. 792↔810px from antialiased edges) must not + # reset FSM or emit alerts — that path produced 500+ false alerts/day. + for c, ns in zip(ctx.charts, new_strips): + c.sub_roi = ns + if hasattr(c.detector, "update_dot_roi"): + c.detector.update_dot_roi(ns) + + results: list[_TickSyncResult] = [] + for c in ctx.charts: + result = await asyncio.to_thread( + _sync_detection_tick, + ctx.capture, ctx.canary, ctx.cfg, c.detector, c.fsm, + ctx.notifier, ctx.audit, ctx.detection_log, + ctx.fires_dir, c.first_accepted, c.last_saved_color, + now, ctx.samples_dir, + c.chart_id, c.sub_roi, frame, + ) + results.append(result) + return results + + +async def _handle_fsm_result(ctx: RunContext, result: "Any") -> None: + """Scheduler start/stop + levels extraction. + + Accepts either a single ``_TickSyncResult`` (legacy single-chart path) or a + ``list[_TickSyncResult]`` (multi-chart split). For multi-chart, scheduler + arbitration uses ``ctx.state.n_primed_global`` (start when 0→1, stop when + 1→0). Levels extraction is disabled when ``len(ctx.charts) >= 2`` since the + extractor reads the full window y-axis which is per-chart-private in split. + """ + results: list[_TickSyncResult] + if isinstance(result, list): + results = result + else: + results = [result] + + if not results: return - if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None): - if tr.reason == "prime" and not ctx.scheduler.is_running: - ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s) - ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"}) - elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running: - ctx.scheduler.stop() - ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason}) + multi_chart = len(ctx.charts) >= 2 - if tr is not None and tr.trigger and not tr.locked: - ctx.state.fire_count += 1 - if ctx.scheduler.is_running: - ctx.scheduler.stop() - ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) - ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time()) + for r in results: + if r.first_consumed: + if multi_chart: + for c in ctx.charts: + if c.chart_id == r.chart_id: + c.first_accepted = False + break + else: + ctx.state.first_accepted = False + if ctx.charts: + ctx.charts[0].first_accepted = False + if r.new_color is not None: + if multi_chart: + for c in ctx.charts: + if c.chart_id == r.chart_id: + c.last_saved_color = r.new_color + break + else: + ctx.state.last_saved_color = r.new_color + if ctx.charts: + ctx.charts[0].last_saved_color = r.new_color - if ctx.state.levels_extractor is not None and result.frame is not None: - lr = ctx.state.levels_extractor.step(result.frame, time.time()) - if lr.status in ("complete", "timeout"): - if lr.status == "complete" and lr.levels: - ctx.notifier.send(Alert( - kind="levels", - title="Niveluri", - body=( - f"SL={lr.levels.sl} " - f"TP1={lr.levels.tp1} " - f"TP2={lr.levels.tp2}" - ), - )) - ctx.state.levels_extractor = None + if not multi_chart: + # Legacy single-chart path — preserve scheduler+levels semantics exactly. + single = results[0] + tr = single.tr + res = single.res + + if single.late_start or res is None: + return + + if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None): + if tr.reason == "prime" and not ctx.scheduler.is_running: + ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s) + ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"}) + elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason}) + + if tr is not None and tr.trigger and not tr.locked: + ctx.state.fire_count += 1 + if ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) + ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time()) + + if ctx.state.levels_extractor is not None and single.frame is not None: + lr = ctx.state.levels_extractor.step(single.frame, time.time()) + if lr.status in ("complete", "timeout"): + if lr.status == "complete" and lr.levels: + ctx.notifier.send(Alert( + kind="levels", + title="Niveluri", + body=( + f"SL={lr.levels.sl} " + f"TP1={lr.levels.tp1} " + f"TP2={lr.levels.tp2}" + ), + )) + ctx.state.levels_extractor = None + return + + # Multi-chart path: aggregate scheduler arbitration over all results. + for r in results: + tr = r.tr + res = r.res + if r.late_start or res is None or tr is None: + continue + if not (getattr(res, "accepted", False) and getattr(res, "color", None)): + continue + + if tr.reason == "prime": + ctx.state.n_primed_global += 1 + if ctx.state.n_primed_global == 1 and not ctx.scheduler.is_running: + ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s) + ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"}) + elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm"): + if ctx.state.n_primed_global > 0: + ctx.state.n_primed_global -= 1 + if ctx.state.n_primed_global == 0 and ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason}) + + if tr.trigger and not tr.locked: + ctx.state.fire_count += 1 + if ctx.state.n_primed_global == 0 and ctx.scheduler.is_running: + ctx.scheduler.stop() + ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) + # Levels disabled in split (per-chart y-axis would need its own calib). + ctx.audit.log({"ts": time.time(), "event": "levels_skipped_split"}) async def _dispatch_command(ctx: RunContext, cmd) -> None: @@ -1436,8 +1716,21 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: body="", )) return + # Always re-detect strips fresh — out-of-window or pre-first-tick, + # ctx.charts is empty (lopul de detection nu rulează când e closed), + # dar /ss trebuie să arate corect oricând. Fallback pe ctx.charts + # dacă detecția eșuează, apoi pe cfg.dot_roi via ss_strips=None. + ss_strips: "list | None" = None + try: + fresh = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame_ss) + except Exception: + fresh = [] + if fresh: + ss_strips = fresh + elif ctx.charts: + ss_strips = [c.sub_roi for c in ctx.charts] path_ss, detections_ss = await asyncio.to_thread( - _save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit, + _save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit, ss_strips, ) caption_ss = _format_inspect_caption(detections_ss, ctx.cfg) state_hdr = _fsm_step_label(ctx.fsm, now_ss) @@ -1482,8 +1775,19 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: if frame_r is None: capture_failed = True else: + # Same fresh-detect logic ca /ss: out-of-window ctx.charts e gol, + # dar /resume tot trebuie să arate corect ambele chart-uri. + r_strips: "list | None" = None + try: + fresh_r = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame_r) + except Exception: + fresh_r = [] + if fresh_r: + r_strips = fresh_r + elif ctx.charts: + r_strips = [c.sub_roi for c in ctx.charts] path_r, detections_r = await asyncio.to_thread( - _save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit, + _save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit, r_strips, ) was_drift = bool(getattr(ctx.canary, "is_paused", False)) @@ -1833,6 +2137,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts), lifecycle=lifecycle, ) + # ctx.charts is populated on the first detection tick by _run_multi_tick + # which calls detect_strips → _commit_layout_change. Starting empty means + # the real strip geometry is used (not a hardcoded cfg.dot_roi placeholder) + # and avoids spurious layout-change alerts. + ctx.charts = [] # ------------------------------------------------------------------ # Nested async coroutines — heartbeat captures notifier + heartbeat_due @@ -1853,6 +2162,7 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No fire_count=ctx.state.fire_count, uptime_h=uptime_h, canary_paused=paused, + charts=ctx.charts if len(ctx.charts) >= 2 else None, )) heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 @@ -1860,8 +2170,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No while True: if duration_s is not None and (time.monotonic() - start) >= duration_s: break - result = await _run_tick(ctx) - await _handle_fsm_result(ctx, result) + # Always use _run_multi_tick: it detects strips each tick, + # manages layout changes (1→2 or 2→1), and handles single-chart + # mode identically to the old _run_tick path. + results = await _run_multi_tick(ctx) + await _handle_fsm_result(ctx, results) await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang await asyncio.sleep(cfg.loop_interval_s) diff --git a/src/atm/notifier/__init__.py b/src/atm/notifier/__init__.py index 2a61f79..841efd2 100644 --- a/src/atm/notifier/__init__.py +++ b/src/atm/notifier/__init__.py @@ -12,6 +12,22 @@ class Alert: image_path: Path | None = None # annotated screenshot direction: str | None = None # "BUY"/"SELL" when kind=trigger silent: bool = False # disable_notification for Telegram; ignored by Discord + chart_id: str = "" + + +def _alert_prefix(chart_id: str) -> str: + """Return Telegram title prefix for chart_id. Empty for single-chart mode.""" + if not chart_id: + return "" + if chart_id == "left": + return "[stânga] " + if chart_id == "right": + return "[dreapta] " + try: + n = int(chart_id.split("_")[1]) + return f"[chart {n + 1}] " + except (IndexError, ValueError): + return f"[{chart_id}] " class Notifier(Protocol): diff --git a/tests/test_detector.py b/tests/test_detector.py index 4cfb81e..11448ee 100644 --- a/tests/test_detector.py +++ b/tests/test_detector.py @@ -283,6 +283,62 @@ def test_real_screenshot_rightmost_dot(screenshot: str, expected: str) -> None: ) +def test_dot_roi_override_uses_sub_roi() -> None: + """dot_roi_override must be used instead of cfg.dot_roi for crop + offset. + + Paint a yellow dot inside the override ROI but **outside** cfg.dot_roi. + The default DOT_ROI is (10,10,280,80); we override with an ROI placed + well to the right (x=200, w=80) so the painted dot only intersects the + override. If the detector still cropped from cfg.dot_roi the yellow dot + would land at the rightmost edge of the larger ROI as well — so we use + a frame that has nothing in the cfg.dot_roi region except inside the + override window, and assert dot_pos_abs falls inside the override. + """ + override = ROI(x=200, y=20, w=80, h=60) + # Background-only frame, then paint yellow only inside the override + frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8) + fy0, fy1 = override.y, override.y + override.h + fx0, fx1 = override.x + 50, override.x + override.w # right edge of override + frame[fy0:fy1, fx0:fx1] = YELLOW_BGR + + cfg = _make_cfg(debounce_depth=1) + det = Detector(cfg, capture=lambda: frame, dot_roi_override=override) + + r = det.step(0.0) + + assert r.dot_found is True + assert r.match is not None + assert r.match.name == "yellow" + assert r.dot_pos_abs is not None + abs_x, abs_y = r.dot_pos_abs + assert override.x <= abs_x < override.x + override.w + assert override.y <= abs_y < override.y + override.h + + +def test_dot_pos_abs_with_offset() -> None: + """dot_pos_abs must include the override ROI's (x, y) offset.""" + override = ROI(x=100, y=20, w=50, h=40) + frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8) + # Paint a single full-height yellow stripe at roi-local x in [40, 50) + # so find_rightmost_dot lands somewhere inside that stripe. + fy0, fy1 = override.y, override.y + override.h + fx0, fx1 = override.x + 40, override.x + 50 + frame[fy0:fy1, fx0:fx1] = YELLOW_BGR + + cfg = _make_cfg(debounce_depth=1) + det = Detector(cfg, capture=lambda: frame, dot_roi_override=override) + + r = det.step(0.0) + + assert r.dot_found is True + assert r.dot_pos_abs is not None + abs_x, abs_y = r.dot_pos_abs + # Painted stripe: roi-local x in [40,50), y in [0, h). Absolute coords + # must be offset by override.(x, y). + assert override.x + 40 <= abs_x < override.x + 50 + assert override.y <= abs_y < override.y + override.h + + def test_fused_blob_samples_rightmost_dot() -> None: """Fused multi-colour stripe must classify the rightmost colour, not the centroid colour. Pre-fix the centroid fell on an interior gray segment diff --git a/tests/test_layout.py b/tests/test_layout.py new file mode 100644 index 0000000..5870ef2 --- /dev/null +++ b/tests/test_layout.py @@ -0,0 +1,163 @@ +import numpy as np +import pytest + +from atm.config import ROI +from atm.layout import _strips_match, detect_strips + + +PALETTE = { + "turquoise": ((0, 253, 253), 60.0), + "yellow": ((253, 253, 0), 60.0), + "dark_green": ((0, 128, 0), 60.0), + "dark_red": ((128, 0, 0), 60.0), + "light_green": ((0, 255, 0), 60.0), + "light_red": ((255, 0, 0), 60.0), +} + + +def _blank(h: int = 20, w: int = 200) -> np.ndarray: + return np.zeros((h, w, 3), dtype=np.uint8) + + +def _paint(img: np.ndarray, x0: int, x1: int, rgb: tuple[int, int, int]) -> None: + """Paint vivid color into BGR image (palette stores RGB).""" + bgr = (rgb[2], rgb[1], rgb[0]) + img[:, x0:x1] = bgr + + +def test_single_strip(): + img = _blank(20, 200) + _paint(img, 0, 200, (0, 253, 253)) # turquoise + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert len(out) == 1 + assert out[0].x == 0 + assert abs(out[0].w - 200) <= 1 + assert out[0].h == 20 + + +def test_split_50_50(): + img = _blank(20, 230) + _paint(img, 0, 100, (0, 253, 253)) + _paint(img, 130, 230, (253, 253, 0)) + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert len(out) == 2 + assert out[0].x < out[1].x # L->R + assert out[0].x == 0 + assert abs(out[0].w - 100) <= 1 + assert out[1].x == 130 + assert abs(out[1].w - 100) <= 1 + + +def test_split_asymmetric(): + img = _blank(20, 230) + _paint(img, 0, 70, (0, 253, 253)) # 35% width + _paint(img, 100, 230, (253, 253, 0)) # 65% width + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert len(out) == 2 + assert abs(out[0].w - 70) <= 1 + assert abs(out[1].w - 130) <= 1 + + +def test_gray_only_no_strip(): + img = _blank(20, 200) + img[:, 50:150] = (128, 128, 128) + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert out == [] + + +def test_cooldown_gray_dots_no_detect(): + img = _blank(20, 200) + # scattered gray dots + for x in (20, 50, 80, 110, 140, 170): + img[8:12, x:x + 4] = (100, 100, 100) + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert out == [] + + +def test_vivid_palette_match(): + img = _blank(20, 200) + _paint(img, 50, 80, (0, 255, 0)) # light_green + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert len(out) == 1 + + +def test_blank_frame(): + img = _blank(20, 200) + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert out == [] + + +def test_strip_too_narrow_filtered(): + img = _blank(20, 200) + _paint(img, 50, 53, (0, 253, 253)) # only 3px wide + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=10) + assert out == [] + + +def test_small_gap_fuses(): + img = _blank(20, 200) + _paint(img, 30, 70, (0, 253, 253)) + _paint(img, 75, 120, (0, 253, 253)) # 5px gap, < min_gap_px=20 + out = detect_strips(img, PALETTE, min_gap_px=20, min_strip_px=5) + assert len(out) == 1 + assert abs(out[0].x - 30) <= 2 + assert abs((out[0].x + out[0].w) - 120) <= 2 + + +def test_split_two_charts_with_interleaved_gray(): + # Regresie 2 ferestre TS: fiecare row de buline e mix vivid + gri, separate + # de un gap larg de background (dividerul dintre ferestre). Înainte de fix + # detect_strips picka doar runs vivid contiguu și rata fereastra stângă. + palette = {**PALETTE, "gray": ((128, 128, 128), 60.0)} + img = _blank(35, 1796) + # Left chart: dots vivid + gray la fiecare 26px, x=0..820 + for i, x in enumerate(range(0, 820, 26)): + rgb = (128, 128, 128) if i % 3 else (0, 128, 0) + _paint(img, x, x + 22, rgb) + # Window divider gap: x=820..910 rămâne background + # Right chart: same pattern, x=910..1796 + for i, x in enumerate(range(910, 1790, 26)): + rgb = (128, 128, 128) if i % 3 else (0, 128, 0) + _paint(img, x, x + 22, rgb) + out = detect_strips(img, palette, min_gap_px=28, min_strip_px=280) + assert len(out) == 2, f"expected 2 strips, got {len(out)}: {out}" + assert out[0].x == 0 + assert out[1].x >= 900 # right chart starts after divider + assert out[0].x + out[0].w < out[1].x # disjoint + + +def test_three_strips(): + img = _blank(20, 300) + _paint(img, 0, 60, (0, 253, 253)) + _paint(img, 100, 160, (253, 253, 0)) + _paint(img, 220, 300, (0, 255, 0)) + out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5) + assert len(out) == 3 + assert out[0].x < out[1].x < out[2].x + assert out[0].x == 0 + assert out[1].x == 100 + assert out[2].x == 220 + + +def test_strips_match_identical(): + a = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)] + b = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)] + assert _strips_match(a, b) is True + + +def test_strips_match_jitter_5px(): + a = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)] + b = [ROI(x=5, y=0, w=95, h=20), ROI(x=128, y=0, w=70, h=20)] + assert _strips_match(a, b, tol=10) is True + + +def test_strips_match_drift_12px(): + a = [ROI(x=0, y=0, w=100, h=20)] + b = [ROI(x=12, y=0, w=100, h=20)] + assert _strips_match(a, b, tol=10) is False + + +def test_strips_match_count_different(): + a = [ROI(x=0, y=0, w=100, h=20)] + b = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)] + assert _strips_match(a, b) is False diff --git a/tests/test_main.py b/tests/test_main.py index 9261927..c3a2168 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -257,6 +257,14 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path): monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler) + # Bootstrap a single chart so _run_multi_tick populates ctx.charts on tick 1. + # Frame is zeros → real detect_strips returns [] → without this, charts stays + # empty and the ScriptedDetector loop never advances (regression after the + # multi-chart refactor made _run_multi_tick the single detection path). + from atm.config import ROI + _bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)] + monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip) + with pytest.raises(_StopLoop): _main.run_live(cfg, duration_s=None) @@ -384,6 +392,11 @@ async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_p monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched) + # Bootstrap ctx.charts on tick 1 — see test_run_live_catchup_sell for context. + from atm.config import ROI + _bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)] + monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip) + with pytest.raises(_StopLoop): await _main.run_live_async(cfg, duration_s=None) @@ -1628,3 +1641,451 @@ async def test_status_window_line_when_oh_enabled(): status = [a for a in ctx.notifier.alerts if a.kind == "status"] body = status[0].body assert "fereastră: deschisă" in body + + +# --------------------------------------------------------------------------- +# Multi-chart split workspace tests +# --------------------------------------------------------------------------- + +def test_chart_state_defaults(): + """ChartState's first_accepted defaults to True; last_saved_color to None.""" + import atm.main as _main + from atm.config import ROI + + roi = ROI(x=0, y=0, w=200, h=35) + chart = _main.ChartState( + chart_id="", sub_roi=roi, detector=MagicMock(), fsm=MagicMock(), + ) + assert chart.first_accepted is True + assert chart.last_saved_color is None + assert chart.chart_id == "" + assert chart.sub_roi is roi + + +def test_alert_prefix_import(): + """_alert_prefix lives in atm.notifier and produces the expected prefixes.""" + from atm.notifier import _alert_prefix + assert _alert_prefix("") == "" + assert _alert_prefix("left") == "[stânga] " + assert _alert_prefix("right") == "[dreapta] " + assert _alert_prefix("chart_0") == "[chart 1] " + assert _alert_prefix("chart_2") == "[chart 3] " + + +def test_save_annotated_frame_no_price_overlay(tmp_path): + """_save_annotated_frame must NOT draw the price overlay anymore. + + Top-right area must contain no white pixels (text was rendered there before). + """ + import atm.main as _main + from atm.config import ROI, YAxisCalib, ColorSpec + + frame = np.zeros((200, 400, 3), dtype=np.uint8) + cfg = types.SimpleNamespace( + dot_roi=ROI(x=10, y=10, w=380, h=180), + y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=190, p2_price=50.0), + colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)}, + ) + + fpath = _main._save_annotated_frame( + frame, cfg, tmp_path, "test", now=123.0, + dot_pos_abs=(200, 100), canary_ok=True, + ) + assert fpath is not None + saved = cv2.imread(str(fpath)) + assert saved is not None + # Top-right corner where the "$..." text used to live (rows 0..40, cols 300..390). + # The cyan ROI rect is on top edge but only 2px thick → vast majority of that + # corner is still pure black. White text would average a much higher value. + corner = saved[0:40, 300:390] + # No bright white pixels (text was 255,255,255). + white_pixels = ((corner[:, :, 0] > 200) & (corner[:, :, 1] > 200) & (corner[:, :, 2] > 200)).sum() + assert white_pixels == 0, f"expected 0 white pixels (no price text), got {white_pixels}" + + +def test_save_annotated_frame_uses_roi_param(tmp_path): + """When roi= is passed, the cyan rect is drawn there (not at cfg.dot_roi).""" + import atm.main as _main + from atm.config import ROI, ColorSpec + + frame = np.zeros((200, 400, 3), dtype=np.uint8) + cfg = types.SimpleNamespace( + dot_roi=ROI(x=0, y=0, w=10, h=10), # tiny cfg ROI + colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)}, + ) + custom_roi = ROI(x=100, y=50, w=200, h=80) + fpath = _main._save_annotated_frame( + frame, cfg, tmp_path, "rtest", now=123.0, roi=custom_roi, + ) + assert fpath is not None + saved = cv2.imread(str(fpath)) + # ROI rect color is (0, 255, 255) BGR. Find any pixel where G+R are saturated and B=0. + edge = saved[49:52, 100:300] + rect_present = ((edge[:, :, 0] < 50) & (edge[:, :, 1] > 200) & (edge[:, :, 2] > 200)).any() + assert rect_present, "Expected ROI rect along custom roi top edge" + # Also check the cfg.dot_roi rect (x=0..10, y=0..10) is NOT drawn — proves we used roi=custom. + cfg_corner = saved[0:10, 0:10] + rect_at_cfg = ((cfg_corner[:, :, 0] < 50) & (cfg_corner[:, :, 1] > 200) & (cfg_corner[:, :, 2] > 200)).any() + assert not rect_at_cfg, "cfg.dot_roi rect must not appear when roi= override is used" + + +def test_commit_layout_change_resets_fsm(monkeypatch): + """_commit_layout_change rebuilds ctx.charts, resets FSM, zeroes n_primed_global.""" + import atm.main as _main + from atm.config import ROI + from atm.state_machine import StateMachine, State + + # Prebuild a ctx with one chart in ARMED_BUY. + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.colors = {} + cfg.debounce_depth = 3 + fsm_old = StateMachine(lockout_s=60) + fsm_old.feed("turquoise", 1.0) # IDLE -> ARMED_BUY + assert fsm_old.state == State.ARMED_BUY + initial_chart = _main.ChartState( + chart_id="", sub_roi=ROI(x=0, y=0, w=400, h=35), + detector=MagicMock(), fsm=fsm_old, + ) + + state = _main._LoopState() + state.n_primed_global = 5 # any non-zero value + + notifier_alerts: list = [] + audit_events: list = [] + + class _N: + def send(self, a): notifier_alerts.append(a) + + class _A: + def log(self, e): audit_events.append(e) + + class _S: + is_running = True + def stop(self): + type(self).is_running = False + + ctx = _main.RunContext( + cfg=cfg, capture=lambda: None, canary=MagicMock(), + detector=MagicMock(), fsm=fsm_old, + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."), + cmd_queue=MagicMock(), state=state, + levels_extractor_factory=lambda *a, **kw: None, + ) + ctx.charts = [initial_chart] + + new_strips = [ + ROI(x=0, y=0, w=200, h=35), + ROI(x=250, y=0, w=200, h=35), + ] + _main._commit_layout_change(ctx, new_strips, now=100.0) + + assert len(ctx.charts) == 2 + assert ctx.charts[0].chart_id == "left" + assert ctx.charts[1].chart_id == "right" + # New FSMs must be IDLE (fresh StateMachine) + assert ctx.charts[0].fsm.state == State.IDLE + assert ctx.charts[1].fsm.state == State.IDLE + # Old fsm not reused + assert ctx.charts[0].fsm is not fsm_old + # n_primed_global reset + assert state.n_primed_global == 0 + # layout_change audit + status alert + assert any(e.get("event") == "layout_change" and e["new_n"] == 2 for e in audit_events) + assert any("Layout TS schimbat" in a.title for a in notifier_alerts) + + +def test_commit_layout_change_chart_ids_for_n3(monkeypatch): + """For n>=3 charts use 'chart_0', 'chart_1', ...""" + import atm.main as _main + from atm.config import ROI + + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.colors = {} + cfg.debounce_depth = 3 + state = _main._LoopState() + + class _N: + def send(self, a): pass + class _A: + def log(self, e): pass + class _S: + is_running = False + def stop(self): pass + + ctx = _main.RunContext( + cfg=cfg, capture=lambda: None, canary=MagicMock(), + detector=MagicMock(), fsm=MagicMock(), + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."), + cmd_queue=MagicMock(), state=state, + levels_extractor_factory=lambda *a, **kw: None, + ) + ctx.charts = [] + + strips = [ROI(x=i * 100, y=0, w=80, h=35) for i in range(3)] + _main._commit_layout_change(ctx, strips, now=100.0) + assert [c.chart_id for c in ctx.charts] == ["chart_0", "chart_1", "chart_2"] + + +def test_strips_match_silent_update_jitter(): + """_strips_match returns True for <=10px jitter — used to trigger silent update.""" + from atm.layout import _strips_match + from atm.config import ROI + a = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)] + b = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)] + assert _strips_match(a, b, tol=10) is True + # Drift 12px breaks the match + c = [ROI(x=12, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)] + assert _strips_match(a, c, tol=10) is False + + +def test_build_heartbeat_alert_single_compat(): + """Without charts arg, _build_heartbeat_alert returns the legacy single-chart body.""" + import atm.main as _main + a = _main._build_heartbeat_alert( + fsm_state="IDLE", fire_count=0, uptime_h=1.0, canary_paused=False, + ) + assert a.kind == "heartbeat" + assert a.title == "activ" + assert a.body == "IDLE | semnale: 0 | 1.0h" + + +def test_build_heartbeat_alert_multi_chart(): + """charts=[left,right] body lines reference [stânga] / [dreapta].""" + import atm.main as _main + from atm.config import ROI + + fsm_left = types.SimpleNamespace(state=types.SimpleNamespace(value="ARMED_BUY")) + fsm_right = types.SimpleNamespace(state=types.SimpleNamespace(value="IDLE")) + cs_left = _main.ChartState( + chart_id="left", sub_roi=ROI(x=0, y=0, w=10, h=10), + detector=MagicMock(), fsm=fsm_left, + ) + cs_right = _main.ChartState( + chart_id="right", sub_roi=ROI(x=0, y=0, w=10, h=10), + detector=MagicMock(), fsm=fsm_right, + ) + a = _main._build_heartbeat_alert( + fsm_state="ignored", fire_count=2, uptime_h=1.5, canary_paused=False, + charts=[cs_left, cs_right], + ) + assert a.kind == "heartbeat" + assert "[stânga]" in a.body + assert "[dreapta]" in a.body + assert "ARMED_BUY" in a.body + assert "IDLE" in a.body + assert "semnale: 2" in a.body + + +@pytest.mark.asyncio +async def test_run_multi_tick_silent_jitter_update(monkeypatch, tmp_path): + """When detect_strips returns positions with <=10px jitter, sub_roi updates + silently (no _commit_layout_change → no extra status alert).""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + initial = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)] + jittered = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)] + + class _Det: + def __init__(self): self.last_roi = None + def step(self, ts, frame=None): + from atm.detector import DetectionResult + return DetectionResult( + ts=ts, window_found=True, dot_found=False, + rgb=None, match=None, accepted=False, color=None, + ) + def update_dot_roi(self, roi): + self.last_roi = roi + + class _FSM: + state = types.SimpleNamespace(value="IDLE") + _last_fire: dict = {} + + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.attach_screenshots = types.SimpleNamespace(arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False) + cfg.window_title = None + state_obj = _main._LoopState() + + notifier_alerts: list = [] + audit_events: list = [] + + class _N: + def send(self, a): notifier_alerts.append(a) + class _A: + def log(self, e): audit_events.append(e) + class _S: + is_running = False + def start(self, s): pass + def stop(self): pass + + canary = types.SimpleNamespace(is_paused=False, check=lambda f: types.SimpleNamespace(distance=0, drifted=False)) + + ctx = _main.RunContext( + cfg=cfg, capture=lambda: np.zeros((200, 600, 3), dtype=np.uint8), + canary=canary, detector=MagicMock(), fsm=_FSM(), + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path, + cmd_queue=MagicMock(), state=state_obj, + levels_extractor_factory=lambda *a, **kw: None, + lifecycle=None, + ) + det_l, det_r = _Det(), _Det() + ctx.charts = [ + _main.ChartState("left", initial[0], det_l, _FSM()), + _main.ChartState("right", initial[1], det_r, _FSM()), + ] + + # Stub strip detection to return jittered strips. + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: jittered) + + results = await _main._run_multi_tick(ctx) + assert len(results) == 2 + # Sub-ROI silently updated, but no layout_change event. + assert ctx.charts[0].sub_roi == jittered[0] + assert ctx.charts[1].sub_roi == jittered[1] + assert det_l.last_roi == jittered[0] + assert det_r.last_roi == jittered[1] + assert not any(e.get("event") == "layout_change" for e in audit_events) + # No status "Layout TS schimbat" alert. + assert not any("Layout TS schimbat" in a.title for a in notifier_alerts) + + +@pytest.mark.asyncio +async def test_run_multi_tick_silent_width_oscillation(monkeypatch, tmp_path): + """Regression: width pulsations (e.g. 792↔810px) on a stable chart count + must NOT trigger _commit_layout_change. Reproduces the production bug + visible in logs/2026-05-04.jsonl (576 layout_change events/day). + """ + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + initial = [ROI(x=0, y=0, w=792, h=35), ROI(x=912, y=0, w=845, h=35)] + pulsed = [ROI(x=0, y=0, w=810, h=35), ROI(x=912, y=0, w=863, h=35)] + + class _Det: + def __init__(self): self.last_roi = None + def step(self, ts, frame=None): + from atm.detector import DetectionResult + return DetectionResult( + ts=ts, window_found=True, dot_found=False, + rgb=None, match=None, accepted=False, color=None, + ) + def update_dot_roi(self, roi): + self.last_roi = roi + + class _FSM: + state = types.SimpleNamespace(value="PRIMED_BUY") + _last_fire: dict = {} + + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.attach_screenshots = types.SimpleNamespace(arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False) + cfg.window_title = None + state_obj = _main._LoopState() + + notifier_alerts: list = [] + audit_events: list = [] + + class _N: + def send(self, a): notifier_alerts.append(a) + class _A: + def log(self, e): audit_events.append(e) + class _S: + is_running = False + def start(self, s): pass + def stop(self): pass + + canary = types.SimpleNamespace(is_paused=False, check=lambda f: types.SimpleNamespace(distance=0, drifted=False)) + + fsm_left = _FSM() + fsm_right = _FSM() + ctx = _main.RunContext( + cfg=cfg, capture=lambda: np.zeros((200, 1800, 3), dtype=np.uint8), + canary=canary, detector=MagicMock(), fsm=fsm_left, + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path, + cmd_queue=MagicMock(), state=state_obj, + levels_extractor_factory=lambda *a, **kw: None, + lifecycle=None, + ) + det_l, det_r = _Det(), _Det() + ctx.charts = [ + _main.ChartState("left", initial[0], det_l, fsm_left), + _main.ChartState("right", initial[1], det_r, fsm_right), + ] + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: pulsed) + + await _main._run_multi_tick(ctx) + + # Sub-ROI silently updated. + assert ctx.charts[0].sub_roi == pulsed[0] + assert ctx.charts[1].sub_roi == pulsed[1] + assert det_l.last_roi == pulsed[0] + assert det_r.last_roi == pulsed[1] + # FSM identity preserved (not rebuilt). + assert ctx.charts[0].fsm is fsm_left + assert ctx.charts[1].fsm is fsm_right + # Critical: no layout_change event, no status alert. + assert not any(e.get("event") == "layout_change" for e in audit_events) + assert not any("Layout TS schimbat" in a.title for a in notifier_alerts) + + +def test_commit_layout_change_alert_is_silent(monkeypatch): + """Layout-change alert on real count change uses silent=True so Telegram + doesn't ping the user. Re-anchor info still visible in chat history. + """ + import atm.main as _main + from atm.config import ROI + from atm.state_machine import StateMachine + + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.colors = {} + cfg.debounce_depth = 3 + cfg.confidence_min = 0.5 + cfg.dot_roi = ROI(x=0, y=0, w=600, h=35) + + notifier_alerts: list = [] + audit_events: list = [] + + class _N: + def send(self, a): notifier_alerts.append(a) + class _A: + def log(self, e): audit_events.append(e) + class _S: + is_running = False + def stop(self): pass + + state_obj = _main._LoopState() + ctx = _main.RunContext( + cfg=cfg, capture=lambda: None, canary=MagicMock(), + detector=MagicMock(), fsm=StateMachine(60), + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."), + cmd_queue=MagicMock(), state=state_obj, + levels_extractor_factory=lambda *a, **kw: None, + lifecycle=None, + ) + ctx.charts = [ + _main.ChartState("only", ROI(x=0, y=0, w=400, h=35), MagicMock(), StateMachine(60)), + ] + + new_strips = [ + ROI(x=0, y=0, w=200, h=35), + ROI(x=250, y=0, w=200, h=35), + ] + _main._commit_layout_change(ctx, new_strips, now=100.0) + + layout_alerts = [a for a in notifier_alerts if "Layout TS schimbat" in a.title] + assert len(layout_alerts) == 1 + assert layout_alerts[0].silent is True diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 1d24518..6b18bcf 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -7,7 +7,7 @@ from pathlib import Path import pytest -from atm.notifier import Alert +from atm.notifier import Alert, _alert_prefix from atm.notifier.fanout import FanoutNotifier @@ -358,3 +358,32 @@ def test_fanout_on_drop_exception_swallowed(tmp_path: Path) -> None: s = fan.stats() # Some alerts still went through assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0 + + +# --------------------------------------------------------------------------- +# Alert.chart_id + _alert_prefix +# --------------------------------------------------------------------------- + +def test_alert_chart_id_default() -> None: + assert Alert(kind="arm", title="t", body="b").chart_id == "" + + +def test_alert_chart_id_set() -> None: + assert Alert(kind="arm", title="t", body="b", chart_id="left").chart_id == "left" + + +def test_alert_prefix_empty() -> None: + assert _alert_prefix("") == "" + + +def test_alert_prefix_left() -> None: + assert _alert_prefix("left") == "[stânga] " + + +def test_alert_prefix_right() -> None: + assert _alert_prefix("right") == "[dreapta] " + + +def test_alert_prefix_chart_n() -> None: + assert _alert_prefix("chart_0") == "[chart 1] " + assert _alert_prefix("chart_1") == "[chart 2] "