feat(multi-chart): refactor _run_multi_tick + fix alert spam pe oscilație strip

Bug critic: _strips_match(tol=10) trip pe pulsații naturale de lățime ~18px între
ticks (ex. 792↔810px). Fiecare trip → _commit_layout_change → reset FSM + alert
Telegram + scheduler stop. Logul 2026-05-04.jsonl arăta 576 evenimente
layout_change/zi, plus prime alerts repetate la dark_red/dark_green (FSM resetat
înghite lockout-ul) și sincronizare cross-chart pe ambele FSM-uri simultan.

Fix:
- main.py:1511 — gate doar pe count change (len(new) != len(current));
  count stabil → silent update sub_roi indiferent de jitter
- main.py:1438 — silent=True pe alert layout_change (Telegram fără sunet)
- 2 teste regresie noi: width oscillation 792↔810 + silent assertion
- 2 teste async reparate: bootstrap _detect_strips_for_ctx pentru ScriptedDetector
  (regresie după ce _run_multi_tick a devenit unica cale de detecție)

Plus refactor multi-chart pre-existent: layout.py modul nou, _detect_strips_for_ctx,
ChartState per-chart FSM/Detector, ROI per-strip pe screenshots, scripts/diag_*.

Verificat: 292 passed, 2 skipped în 10s.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-05-05 17:59:18 +03:00
parent 8a1be979fe
commit c950a5a699
14 changed files with 1952 additions and 255 deletions

14
.gitignore vendored
View File

@@ -74,3 +74,17 @@ calibrate_capture_*.png
# Debug captures
debug_*.png
logs/*.png
# Test/dev scratch output
pytest-*.log
pytest-*.err
*.log
*.err
# Auto-captured calibration frames (use `git add -f` to commit selected ones)
calibration/frames/*.png
# Misc clutter
*.jpeg
*.jpg
!samples/*.jpg

View File

@@ -2,9 +2,9 @@ window_title = "m2d"
[dot_roi]
x = 0
y = 712
y = 720
w = 1796
h = 35
h = 40
[chart_roi]
x = 17
@@ -53,7 +53,7 @@ p2_y = 664
p2_price = 483.2
[canary]
baseline_phash = "fbe145390c1abec23204017757a326b8e37077288ef79947310a89c70e07ffff"
baseline_phash = "c11f4a852ec09f3a8de4e4cf4ad76d84f10b19d3e708663c38f5b538877c6624"
drift_threshold = 8
[canary.roi]

View File

@@ -0,0 +1,133 @@
r"""Diagnose why detect_strips finds 1 strip when there are 2 TS windows.
Reuses the most recent raw capture under logs/repro/. For each strip detected,
prints the connected-components vivid mask, gaps, and the contiguous run lengths
so we can see exactly where the threshold is killing the second strip.
"""
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from atm.config import Config # noqa: E402
from atm.layout import VIVID_COLORS, detect_strips # noqa: E402
from atm.vision import crop_roi, find_top_dots, classify_pixel, pixel_rgb # noqa: E402
def _latest_raw() -> Path:
candidates = sorted((ROOT / "logs" / "repro").glob("*_raw.png"))
if not candidates:
raise SystemExit("No *_raw.png in logs/repro — run scripts/repro_ss_resume.py first.")
return candidates[-1]
def main() -> int:
cfg = Config.load_current(ROOT / "configs")
raw_path = _latest_raw()
print(f"Using raw frame: {raw_path}")
frame = cv2.imread(str(raw_path), cv2.IMREAD_COLOR)
if frame is None:
raise SystemExit(f"Could not read {raw_path}")
palette = {
name: (spec.rgb, spec.tolerance)
for name, spec in cfg.colors.items()
if name != "background"
}
full_crop = crop_roi(frame, cfg.dot_roi)
h, w = full_crop.shape[:2]
print(f"dot_roi crop shape: {h}x{w} (cfg.dot_roi={cfg.dot_roi})")
# 1) Build the vivid mask used by detect_strips
mask = np.zeros((h, w), dtype=np.uint8)
img_f = full_crop.astype(np.float32)
per_color_pixels = {}
for name in VIVID_COLORS:
if name not in palette:
continue
rgb, tol = palette[name]
bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32)
diff = np.linalg.norm(img_f - bgr, axis=2)
m = (diff < tol).astype(np.uint8)
per_color_pixels[name] = int(m.sum())
mask |= m
print("\nVIVID_COLORS pixel counts in dot_roi crop (pre-morphology):")
for n, c in per_color_pixels.items():
print(f" {n:12s} {c:>7d} px")
print(f" total mask: {int(mask.sum()):>7d} px")
# 2) Column projection: which x columns have any vivid pixel?
col_has = mask.any(axis=0).astype(np.uint8)
runs = []
i = 0
while i < w:
if col_has[i] == 0:
i += 1
continue
j = i
while j < w and col_has[j] == 1:
j += 1
runs.append((i, j - 1, j - i))
i = j
print(f"\nContiguous vivid-column runs (raw, no morphology): {len(runs)}")
for x0, x1, run_w in runs[:25]:
print(f" x={x0:>4d}..{x1:>4d} width={run_w}")
if len(runs) > 25:
print(f" ... +{len(runs) - 25} more")
# 3) Apply same morphology + connected components as detect_strips
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
kw = max(3, min_gap_px // 2)
print(f"\ndetect_strips params: min_strip_px={min_strip_px} min_gap_px={min_gap_px} morphology kw={kw}")
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
print(f"\nConnected components after CLOSE (kw={kw}): n={n_labels - 1} (excluding bg)")
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
y = int(stats[i, cv2.CC_STAT_TOP])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
hh = int(stats[i, cv2.CC_STAT_HEIGHT])
passes = "PASS" if ww >= min_strip_px else "drop"
print(f" [{i:>2d}] x={x:>4d} y={y:>3d} w={ww:>4d} h={hh:>3d} -> {passes}")
strips = detect_strips(full_crop, palette, min_gap_px, min_strip_px)
print(f"\ndetect_strips() final result: {len(strips)} strip(s)")
for i, s in enumerate(strips):
print(f" strip[{i}] x={s.x} y={s.y} w={s.w} h={s.h}")
# 4) Run find_top_dots on the FULL dot_roi (single-chart fallback path used
# by /ss when ctx.charts < 2). This is what users see when the layout
# detector hasn't promoted the layout to multi-chart yet.
bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18)
bg_tol = cfg.colors["background"].tolerance if "background" in cfg.colors else 15.0
full_dots = find_top_dots(full_crop, bg_rgb, bg_tol, n=3)
print(f"\nfind_top_dots on FULL dot_roi (n=3, single-chart fallback path):")
for i, (cx, cy) in enumerate(full_dots):
rgb = pixel_rgb(full_crop, cx, cy)
m = classify_pixel(rgb, palette)
print(f" c{i + 1}: pos=({cx + cfg.dot_roi.x},{cy + cfg.dot_roi.y}) rgb={rgb} -> {m.name} d={m.distance:.1f}")
# 5) Save the binary mask + closed mask for visual inspection
out_dir = ROOT / "logs" / "repro"
cv2.imwrite(str(out_dir / "diag_mask_raw.png"), (mask * 255).astype(np.uint8))
cv2.imwrite(str(out_dir / "diag_mask_closed.png"), (closed * 255).astype(np.uint8))
print(f"\nWrote diag_mask_raw.png + diag_mask_closed.png to {out_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

162
scripts/diag_strip_fixes.py Normal file
View File

@@ -0,0 +1,162 @@
r"""Verify the two proposed fixes for detect_strips on the latest 2-window capture.
Fix A: include gray in VIVID_COLORS mask (cheapest change).
Fix B: use non-background mask (any pixel where diff(bg) > bg_tol).
Reuses the most recent raw capture in logs/repro/.
"""
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from atm.config import Config, ROI # noqa: E402
from atm.vision import crop_roi # noqa: E402
def _detect_strips_with_palette(
full_dot_crop, palette, color_names, min_gap_px, min_strip_px,
):
"""Same body as layout.detect_strips but with selectable color set."""
h, w = full_dot_crop.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
img_f = full_dot_crop.astype(np.float32)
for name in color_names:
if name not in palette:
continue
rgb, tol = palette[name]
bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32)
diff = np.linalg.norm(img_f - bgr, axis=2)
mask |= (diff < tol).astype(np.uint8)
kw = max(3, min_gap_px // 2)
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
out = []
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
if ww < min_strip_px:
continue
out.append(ROI(x=x, y=0, w=ww, h=h))
out.sort(key=lambda r: r.x)
return out, mask, closed
def _detect_strips_non_bg(full_dot_crop, bg_rgb, bg_tol, min_gap_px, min_strip_px):
"""Fix B: any pixel different from background → strip mask."""
h, w = full_dot_crop.shape[:2]
bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(full_dot_crop.astype(np.float32) - bgr_bg, axis=2)
mask = (diff > bg_tol).astype(np.uint8)
kw = max(3, min_gap_px // 2)
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
out = []
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
if ww < min_strip_px:
continue
out.append(ROI(x=x, y=0, w=ww, h=h))
out.sort(key=lambda r: r.x)
return out, mask, closed
def _annotate(frame, strips_abs, label, out_dir):
annotated = frame.copy()
for r in strips_abs:
cv2.rectangle(annotated, (r.x, r.y), (r.x + r.w, r.y + r.h), (0, 255, 255), 2)
p = out_dir / f"diag_fix_{label}.png"
cv2.imwrite(str(p), annotated)
return p
def main() -> int:
cfg = Config.load_current(ROOT / "configs")
raws = sorted(p for p in (ROOT / "logs" / "repro").glob("*_raw.png") if p.name[0].isdigit())
if not raws:
raise SystemExit("Run scripts/repro_ss_resume.py first.")
raw_path = raws[-1]
print(f"Using: {raw_path}")
frame = cv2.imread(str(raw_path), cv2.IMREAD_COLOR)
palette = {n: (s.rgb, s.tolerance) for n, s in cfg.colors.items() if n != "background"}
bg_rgb = cfg.colors["background"].rgb
bg_tol = cfg.colors["background"].tolerance
full_crop = crop_roi(frame, cfg.dot_roi)
h, w = full_crop.shape[:2]
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
print(f"params: min_strip_px={min_strip_px} min_gap_px={min_gap_px} crop={w}x{h}")
out_dir = ROOT / "logs" / "repro"
# Baseline (current code: vivid only, no gray)
vivid_only = ("turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red")
s0, m0, c0 = _detect_strips_with_palette(full_crop, palette, vivid_only, min_gap_px, min_strip_px)
print(f"\n[BASELINE vivid-only ] strips={len(s0)}")
for i, r in enumerate(s0):
print(f" [{i}] x={r.x:>4d} w={r.w:>4d}")
cv2.imwrite(str(out_dir / "diag_fix_BASELINE_mask_closed.png"), (c0 * 255).astype(np.uint8))
_annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in s0],
"BASELINE", out_dir)
# Fix A: include gray
fixA_palette = vivid_only + ("gray",)
sA, mA, cA = _detect_strips_with_palette(full_crop, palette, fixA_palette, min_gap_px, min_strip_px)
print(f"\n[FIX A vivid+gray ] strips={len(sA)}")
for i, r in enumerate(sA):
print(f" [{i}] x={r.x:>4d} w={r.w:>4d}")
cv2.imwrite(str(out_dir / "diag_fix_A_mask_closed.png"), (cA * 255).astype(np.uint8))
_annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in sA],
"A_vivid_plus_gray", out_dir)
# Fix B: any non-background
sB, mB, cB = _detect_strips_non_bg(full_crop, bg_rgb, bg_tol, min_gap_px, min_strip_px)
print(f"\n[FIX B non-bg ] strips={len(sB)}")
for i, r in enumerate(sB):
print(f" [{i}] x={r.x:>4d} w={r.w:>4d}")
cv2.imwrite(str(out_dir / "diag_fix_B_mask_closed.png"), (cB * 255).astype(np.uint8))
_annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in sB],
"B_non_background", out_dir)
# Sanity: where is the divider between the two TS windows?
# Project non-bg mask onto x; long zero-runs reveal the gap.
bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(full_crop.astype(np.float32) - bgr_bg, axis=2)
nonbg = (diff > bg_tol).astype(np.uint8)
col_any = nonbg.any(axis=0).astype(np.uint8)
# Find longest 0-run
longest = (0, 0, 0) # (length, x0, x1)
i = 0
while i < w:
if col_any[i] == 1:
i += 1
continue
j = i
while j < w and col_any[j] == 0:
j += 1
run = j - i
if run > longest[0]:
longest = (run, i, j - 1)
i = j
print(f"\nLongest empty (background-only) horizontal stretch in dot_roi: "
f"{longest[0]}px at x={longest[1]}..{longest[2]} "
f"(this is where the window divider sits)")
print(f"\nWrote diag_fix_*.png to {out_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,124 @@
r"""Inspect ATM strip pixels without relying on shell pipelines.
Usage:
.\.venv\Scripts\python.exe scripts\inspect_image_pixels.py 6033117943853423831.jpg
.\.venv\Scripts\python.exe scripts\inspect_image_pixels.py image.jpg --point 1780 725
The script intentionally parses only the config fields needed for pixel inspection,
so it does not require Discord/Telegram secrets to be valid.
"""
from __future__ import annotations
import argparse
import json
import sys
import tomllib
from pathlib import Path
import cv2
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from atm.config import ROI # noqa: E402
from atm.vision import classify_pixel, crop_roi, find_top_dots, pixel_rgb # noqa: E402
def _load_probe_config(path: Path) -> dict:
data = tomllib.loads(path.read_text(encoding="utf-8"))
colors = {
name: (tuple(int(c) for c in spec["rgb"]), float(spec["tolerance"]))
for name, spec in data["colors"].items()
}
background = colors.pop("background", ((18, 18, 18), 15.0))
return {
"dot_roi": ROI(**data["dot_roi"]),
"colors": colors,
"background_rgb": background[0],
"background_tol": background[1],
}
def _as_jsonable_match(match) -> dict:
return {
"name": match.name,
"distance": round(float(match.distance), 3),
"confidence": round(float(match.confidence), 3),
}
def main() -> int:
parser = argparse.ArgumentParser(description="Inspect ATM strip pixels in a JPG/PNG frame.")
parser.add_argument("image", type=Path, help="Frame image path.")
parser.add_argument(
"--config",
type=Path,
default=ROOT / "configs" / "2026-04-21-recalib.toml",
help="ATM TOML config path.",
)
parser.add_argument("--top", type=int, default=3, help="Number of rightmost dots to report.")
parser.add_argument(
"--point",
nargs=2,
type=int,
metavar=("X", "Y"),
help="Optional absolute pixel coordinate to sample.",
)
parser.add_argument("--box", type=int, default=3, help="Sampling radius for mean RGB.")
args = parser.parse_args()
frame = cv2.imread(str(args.image), cv2.IMREAD_COLOR)
if frame is None:
raise SystemExit(f"Could not read image: {args.image}")
probe = _load_probe_config(args.config)
roi = probe["dot_roi"]
roi_img = crop_roi(frame, roi)
dots = find_top_dots(
roi_img,
bg_rgb=probe["background_rgb"],
bg_tol=probe["background_tol"],
n=args.top,
)
result = {
"image": str(args.image),
"image_size": {"w": int(frame.shape[1]), "h": int(frame.shape[0])},
"config": str(args.config),
"dot_roi": {"x": roi.x, "y": roi.y, "w": roi.w, "h": roi.h},
"dots": [],
}
for x, y in dots:
rgb = pixel_rgb(roi_img, x, y, box=args.box)
match = classify_pixel(rgb, probe["colors"])
result["dots"].append(
{
"roi_xy": [int(x), int(y)],
"abs_xy": [int(roi.x + x), int(roi.y + y)],
"rgb": list(rgb),
"match": _as_jsonable_match(match),
}
)
if args.point:
px, py = args.point
if not (roi.x <= px < roi.x + roi.w and roi.y <= py < roi.y + roi.h):
raise SystemExit(f"Point {px},{py} is outside dot_roi")
rx, ry = px - roi.x, py - roi.y
rgb = pixel_rgb(roi_img, rx, ry, box=args.box)
result["point"] = {
"roi_xy": [rx, ry],
"abs_xy": [px, py],
"rgb": list(rgb),
"match": _as_jsonable_match(classify_pixel(rgb, probe["colors"])),
}
print(json.dumps(result, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

173
scripts/repro_ss_resume.py Normal file
View File

@@ -0,0 +1,173 @@
r"""Reproduce /ss and /resume Telegram screenshot pipelines for offline inspection.
Brings the `m2d` window to front (Win32 trick — same as live loop), captures via mss
using the active config's chart_window_region, then runs the EXACT same annotators
that /ss and /resume use:
- _save_inspect_frame → /ss path (top-3 dots per detected strip + caption)
- _save_annotated_frame → /resume path (cyan rect on dot_roi/sub_roi)
Outputs are saved under logs/repro/ alongside a JSON summary of detections.
Usage:
.\.venv\Scripts\python.exe scripts\repro_ss_resume.py
.\.venv\Scripts\python.exe scripts\repro_ss_resume.py --no-focus
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from atm.config import Config # noqa: E402
from atm.layout import detect_strips # noqa: E402
from atm.main import ( # noqa: E402
_focus_window_by_title,
_format_inspect_caption,
_save_annotated_frame,
_save_inspect_frame,
)
from atm.vision import crop_roi # noqa: E402
def _capture_via_region(cfg) -> np.ndarray | None:
import mss # type: ignore[import-untyped]
reg = cfg.chart_window_region
if reg is None:
# Fallback: grab full primary monitor
with mss.mss() as sct:
mon = sct.monitors[1]
img = sct.grab(mon)
return cv2.cvtColor(np.array(img), cv2.COLOR_BGRA2BGR)
with mss.mss() as sct:
mon = {"top": reg.y, "left": reg.x, "width": reg.w, "height": reg.h}
img = sct.grab(mon)
return cv2.cvtColor(np.array(img), cv2.COLOR_BGRA2BGR)
def main() -> int:
p = argparse.ArgumentParser()
p.add_argument("--no-focus", action="store_true", help="Skip Win32 focus call.")
p.add_argument("--delay", type=float, default=0.5, help="Seconds to sleep after focus.")
p.add_argument(
"--out",
type=Path,
default=ROOT / "logs" / "repro",
help="Output directory for annotated frames + JSON.",
)
args = p.parse_args()
cfg = Config.load_current(ROOT / "configs")
args.out.mkdir(parents=True, exist_ok=True)
# 1) Focus
if not args.no_focus and cfg.window_title:
title = _focus_window_by_title(cfg.window_title)
print(f"[focus] needle={cfg.window_title!r} -> {title!r}")
if args.delay > 0:
time.sleep(args.delay)
# 2) Capture
frame = _capture_via_region(cfg)
if frame is None:
print("[capture] FAILED — no frame")
return 1
print(f"[capture] frame shape={frame.shape}")
now = time.time()
ts_str = time.strftime("%Y%m%d_%H%M%S")
# Save raw too so we can hand-inspect what mss actually grabbed
raw_path = args.out / f"{ts_str}_raw.png"
cv2.imwrite(str(raw_path), frame)
print(f"[raw] {raw_path}")
# 3) Detect strips (same logic as live multi-chart split)
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
palette = {
name: (spec.rgb, spec.tolerance)
for name, spec in cfg.colors.items()
if name != "background"
}
full_dot_crop = crop_roi(frame, cfg.dot_roi)
raw_strips = detect_strips(full_dot_crop, palette, min_gap_px, min_strip_px)
# Translate back to absolute frame coords
from atm.config import ROI # noqa: E402
strips = [
ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y + r.y, w=r.w, h=r.h)
for r in raw_strips
]
print(f"[strips] dot_roi={cfg.dot_roi} detected={len(strips)} strips")
for i, s in enumerate(strips):
print(f" strip[{i}] x={s.x} y={s.y} w={s.w} h={s.h}")
# Also dump a copy of the full dot_roi crop for visual sanity-check
crop_path = args.out / f"{ts_str}_dot_roi_crop.png"
cv2.imwrite(str(crop_path), full_dot_crop)
print(f"[crop] {crop_path}")
# 4) /ss inspect-annotate (top-3 per strip, FSM-pick markers)
inspect_path, detections = _save_inspect_frame(
frame, cfg, args.out, now, audit=None,
strips=strips if strips else None,
)
caption = _format_inspect_caption(detections, cfg)
print(f"[ss] {inspect_path}")
print(f"[ss] caption:\n{caption}")
# 5) /resume annotate (cyan rect on dot_roi or first strip)
roi_for_resume = strips[0] if strips else cfg.dot_roi
resume_path = _save_annotated_frame(
frame, cfg, args.out, "resume_repro", now, audit=None, roi=roi_for_resume,
)
print(f"[resume] {resume_path}")
# 6) JSON summary
summary = {
"ts": ts_str,
"frame_shape": list(frame.shape),
"dot_roi": {"x": cfg.dot_roi.x, "y": cfg.dot_roi.y, "w": cfg.dot_roi.w, "h": cfg.dot_roi.h},
"strips": [
{"x": s.x, "y": s.y, "w": s.w, "h": s.h} for s in strips
],
"detections": [
{
"strip_idx": d["strip_idx"],
"idx": d["idx"],
"name": d["name"],
"rgb": list(d["rgb"]),
"distance": round(float(d["distance"]), 3),
"confidence": round(float(d["confidence"]), 3),
"pos_abs": list(d["pos_abs"]),
}
for d in detections
],
"files": {
"raw": str(raw_path),
"dot_roi_crop": str(crop_path),
"inspect": str(inspect_path) if inspect_path else None,
"resume": str(resume_path) if resume_path else None,
},
"ss_caption": caption,
}
json_path = args.out / f"{ts_str}_summary.json"
json_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
print(f"[json] {json_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -7,7 +7,7 @@ from typing import Callable
import numpy as np
from .config import Config
from .config import Config, ROI
from .vision import (
ColorMatch,
classify_pixel,
@@ -40,9 +40,11 @@ class Detector:
capture: ScreenCapture,
bg_rgb: tuple[int, int, int] | None = None,
bg_tol: float | None = None,
dot_roi_override: ROI | None = None,
) -> None:
self._cfg = cfg
self._capture = capture
self._dot_roi = dot_roi_override if dot_roi_override is not None else cfg.dot_roi
# Prefer config-defined background; fall back to dark-grey default.
if "background" in cfg.colors:
spec = cfg.colors["background"]
@@ -84,7 +86,7 @@ class Detector:
self._rolling.append(r)
return r
roi_img = crop_roi(frame, self._cfg.dot_roi)
roi_img = crop_roi(frame, self._dot_roi)
dot_pos = find_rightmost_dot(roi_img, self._bg_rgb, self._bg_tol)
if dot_pos is None:
@@ -124,11 +126,14 @@ class Detector:
match=match,
accepted=accepted,
color=color,
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y),
dot_pos_abs=(self._dot_roi.x + x, self._dot_roi.y + y),
)
self._rolling.append(r)
return r
def update_dot_roi(self, roi: ROI) -> None:
self._dot_roi = roi
@property
def rolling(self) -> list[DetectionResult]:
return list(self._rolling)

48
src/atm/layout.py Normal file
View File

@@ -0,0 +1,48 @@
import cv2
import numpy as np
from .config import ROI
VIVID_COLORS = ("turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red", "gray")
def detect_strips(
full_dot_crop: np.ndarray,
palette: dict[str, tuple[tuple[int, int, int], float]],
min_gap_px: int,
min_strip_px: int,
) -> list[ROI]:
"""Return list of sub-ROIs (relative to full_dot_crop) sorted left-to-right.
Empty list if no vivid pixels found."""
h, w = full_dot_crop.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
img_f = full_dot_crop.astype(np.float32)
for name in VIVID_COLORS:
if name not in palette:
continue
rgb, tol = palette[name]
bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32)
diff = np.linalg.norm(img_f - bgr, axis=2)
mask |= (diff < tol).astype(np.uint8)
kw = max(3, min_gap_px // 2)
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
strips: list[tuple[int, int]] = []
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
if ww < min_strip_px:
continue
strips.append((x, x + ww))
strips.sort()
return [ROI(x=xs, y=0, w=xe - xs, h=h) for (xs, xe) in strips]
def _strips_match(a: list[ROI], b: list[ROI], tol: int = 10) -> bool:
if len(a) != len(b):
return False
return all(
abs(ra.x - rb.x) <= tol and abs((ra.x + ra.w) - (rb.x + rb.w)) <= tol
for ra, rb in zip(a, b)
)

View File

@@ -7,7 +7,7 @@ import contextlib
import os
import sys
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, tzinfo
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
@@ -572,11 +572,10 @@ def _build_resume_info_text(was_drift: bool, skip_now: str | None) -> "tuple[str
return ("Monitorizare reluată", "")
def _draw_roi_cyan(annotated, cfg) -> None:
"""Draw the dot_roi cyan rectangle onto annotated. Shared by annotate helpers."""
def _draw_roi_cyan(annotated, roi) -> None:
"""Draw the given ROI as a cyan rectangle onto annotated. Shared by annotate helpers."""
import cv2 # type: ignore[import-untyped]
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
cv2.rectangle(annotated, (roi.x, roi.y), (roi.x + roi.w, roi.y + roi.h), (0, 255, 255), 2)
_BASELINE_PHASH_LINE = __import__("re").compile(
@@ -644,13 +643,14 @@ def _save_inspect_frame(
fires_dir: Path,
now: float,
audit: "_AuditLike | None" = None,
strips: "list | None" = None,
) -> "tuple[Path | None, list[dict]]":
"""Annotate frame with top-3 rightmost dots + classifications. Returns (path, detections).
FSM pick (rightmost, idx 0) → thick red circle. Neighbors (idx 1, 2) → thin circle
in the classified color (BGR derived from cfg.colors[name].rgb at runtime, UNKNOWN
→ gray). Labels `{name} d={distance}` next to each circle. Price overlay for
rightmost dot (same as _save_annotated_frame). Fail-safe: any error → (None, []).
Per-strip detection when ``strips`` is provided (multi-chart split). Each strip
runs its own find_top_dots; circles are offset by the strip origin. Detections
carry a ``strip_idx`` so callers can distinguish which chart they came from.
Single-chart fallback uses ``[cfg.dot_roi]``.
"""
try:
import cv2 # type: ignore[import-untyped]
@@ -666,29 +666,40 @@ def _save_inspect_frame(
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_inspect.png"
annotated = frame.copy()
_draw_roi_cyan(annotated, cfg)
bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18)
dot_crop = crop_roi(frame, cfg.dot_roi)
positions = find_top_dots(dot_crop, bg_rgb, n=3)
palette = {k: (v.rgb, v.tolerance) for k, v in cfg.colors.items() if k != "background"}
strip_list = strips if strips else [cfg.dot_roi]
detections: list[dict] = []
for idx, (cx, cy) in enumerate(positions):
for strip_idx, strip in enumerate(strip_list):
_draw_roi_cyan(annotated, strip)
dot_crop = crop_roi(frame, strip)
# Cer N+spare buline ca să avem rezerve după filtrarea UNKNOWN-urilor
# (text-artefacte / patch-uri întunecate adiacente bulinei rightmost).
positions = find_top_dots(dot_crop, bg_rgb, n=8)
kept = 0
for cx, cy in positions:
if kept >= 3:
break
rgb = pixel_rgb(dot_crop, cx, cy)
match = classify_pixel(rgb, palette)
pos_abs = (cfg.dot_roi.x + cx, cfg.dot_roi.y + cy)
if match.name == "UNKNOWN":
continue
pos_abs = (strip.x + cx, strip.y + cy)
detections.append({
"idx": idx, "name": match.name, "rgb": rgb,
"idx": kept, "name": match.name, "rgb": rgb,
"distance": match.distance, "confidence": match.confidence,
"pos_abs": pos_abs,
"pos_abs": pos_abs, "strip_idx": strip_idx,
})
kept += 1
# Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x.
# Toate cercurile au aceeași rază (r=7), pline, în culoarea clasificată.
# Primary (FSM pick, rightmost) = tick vertical roșu care leagă markerul de bulina.
marker_y = cfg.dot_roi.y + cfg.dot_roi.h + 8
for det in reversed(detections): # neighbors first, primary last on top
# Primary (FSM pick, rightmost) = tick vertical roșu peste markeri.
marker_y = strip.y + strip.h + 8
strip_dets = [d for d in detections if d["strip_idx"] == strip_idx]
for det in reversed(strip_dets):
pos_abs = det["pos_abs"]
name = det["name"]
if name in cfg.colors:
@@ -701,24 +712,6 @@ def _save_inspect_frame(
if det["idx"] == 0:
cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1)
# Price overlay on rightmost (reuse same formula as _save_annotated_frame)
if detections and hasattr(cfg, "y_axis"):
try:
dot_y = detections[0]["pos_abs"][1]
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass
cv2.imwrite(str(fpath), annotated)
return fpath, detections
except Exception as exc:
@@ -743,22 +736,52 @@ _COLOR_EMOJI = {
def _format_inspect_caption(detections: list[dict], cfg) -> str:
"""Compact caption: leftmost = c1, rightmost = cN (FSM pick).
Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption —
emoji-ul e cel mai aproape de „textul în aceeași culoare ca bulina").
Single-strip: c1..cN flat list. Multi-strip: per-strip blocks cu header
("stânga"/"dreapta" pt 2 chart-uri, "chart N" pt 3+) și pick separat per strip.
Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption).
"""
if not detections:
return "nicio bulină detectată în ROI"
# detections ordered idx=0 (rightmost) → idx=N-1 (leftmost)
# Display left-to-right so user reads the chart order natural.
ordered = list(reversed(detections))
total = len(ordered)
lines: list[str] = []
for display_idx, det in enumerate(ordered, start=1):
# Group by strip_idx. Old detections (pre multi-chart) lack the field —
# treat as single anonymous group.
groups: dict[int, list[dict]] = {}
for det in detections:
groups.setdefault(det.get("strip_idx", 0), []).append(det)
n_strips = len(groups)
def _line(display_idx: int, det: dict, is_pick: bool) -> str:
name = det["name"]
emoji = _COLOR_EMOJI.get(name, "")
suffix = " ← pick" if display_idx == total else ""
lines.append(f"{emoji} c{display_idx}: {name}{suffix}")
return "\n".join(lines)
suffix = " ← pick" if is_pick else ""
return f"{emoji} c{display_idx}: {name}{suffix}"
def _strip_header(strip_idx: int) -> str:
if n_strips == 2:
return "stânga" if strip_idx == 0 else "dreapta"
return f"chart {strip_idx + 1}"
if n_strips <= 1:
# Single-strip path preserved verbatim — leftmost=c1, rightmost=cN=pick.
only = next(iter(groups.values()))
ordered = list(reversed(only))
total = len(ordered)
return "\n".join(
_line(i + 1, d, is_pick=(i + 1 == total))
for i, d in enumerate(ordered)
)
# Multi-strip: one block per strip (sorted by strip_idx ascending).
blocks: list[str] = []
for sidx in sorted(groups):
dets = groups[sidx]
ordered = list(reversed(dets)) # leftmost first
total = len(ordered)
lines = [f"[{_strip_header(sidx)}]"]
for i, det in enumerate(ordered):
lines.append(_line(i + 1, det, is_pick=(i + 1 == total)))
blocks.append("\n".join(lines))
return "\n".join(blocks)
def _save_annotated_frame(
@@ -770,20 +793,19 @@ def _save_annotated_frame(
audit: _AuditLike | None = None,
dot_pos_abs: "tuple[int, int] | None" = None,
canary_ok: bool = True,
roi: "Any | None" = None,
) -> "Path | None":
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``.
"""Save BGR frame with cyan ROI rect to ``logs/fires/{ts}_{label}.png``.
Returns the path on success, ``None`` on any error. Failures are logged to
audit (when provided) so disk-full / permission issues don't become silent
regressions. Never raises — snapshot is a best-effort enhancement, the
text alert must still go out.
``roi`` overrides ``cfg.dot_roi`` for the rectangle (used by multi-chart split
so per-strip alerts highlight the correct chart). Falls back to cfg.dot_roi
when None.
dot_pos_abs + canary_ok: when both are set the price overlay is drawn
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted
since calibration may be stale.
Returns the path on success, ``None`` on any error. Never raises — snapshot
is best-effort, the text alert must still go out.
"""
try:
import cv2 # type: ignore[import-untyped]
import cv2 # type: ignore[import-untyped] # noqa: F401
except ImportError as exc:
if audit is not None:
try:
@@ -792,26 +814,12 @@ def _save_annotated_frame(
pass
return None
try:
import cv2 # type: ignore[import-untyped]
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_{label}.png"
annotated = frame.copy()
_draw_roi_cyan(annotated, cfg)
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"):
try:
_, dot_y = dot_pos_abs
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass # price overlay is best-effort; never break the screenshot
roi_to_draw = roi if roi is not None else cfg.dot_roi
_draw_roi_cyan(annotated, roi_to_draw)
cv2.imwrite(str(fpath), annotated)
return fpath
except Exception as exc:
@@ -829,15 +837,27 @@ def _build_heartbeat_alert(
fire_count: int,
uptime_h: float,
canary_paused: bool,
charts: "list | None" = None,
) -> "Alert":
"""Construct the periodic heartbeat Alert.
When canary is drift-paused the title/body reflect it explicitly —
2026-04-21 bug: previously the heartbeat said "activ ARMED_SELL" while
detection had been dead for hours, misleading the user into thinking
the system was running.
Single-chart compat: when ``charts`` is None or has length <=1, body is
``"{state} | semnale: N | Hh"`` (drift-pause appended). Multi-chart split:
one line per chart with ``[chart_id] STATE`` plus a tail signals/uptime line.
"""
title = "⚠️ pauzat (drift)" if canary_paused else "activ"
if charts and len(charts) >= 2:
from atm.notifier import _alert_prefix as _prefix
chart_lines = []
for c in charts:
label = _prefix(c.chart_id).strip()
state_value = getattr(getattr(c.fsm, "state", None), "value", "")
chart_lines.append(f"{label} {state_value}".strip())
tail = f"| semnale: {fire_count} | {uptime_h:.1f}h"
if canary_paused:
tail = f"| semnale: {fire_count} | {uptime_h:.1f}h [drift-pause]"
body = "\n".join(chart_lines) + "\n" + tail
return Alert(kind="heartbeat", title=title, body=body)
state_label = f"{fsm_state} [drift-pause]" if canary_paused else fsm_state
body = f"{state_label} | semnale: {fire_count} | {uptime_h:.1f}h"
return Alert(kind="heartbeat", title=title, body=body)
@@ -1048,6 +1068,23 @@ class _TickSyncResult:
first_consumed: bool = False
late_start: bool = False
new_color: str | None = None # corpus sample color when changed
chart_id: str = "" # multi-chart split: which chart produced this result
@dataclass
class ChartState:
"""Per-chart split-workspace state (FSM + Detector + sub-ROI).
chart_id="" → single-chart mode (no Alert prefix). "left"/"right" for n=2.
"chart_0", "chart_1", ... for n>2. Each chart has its own debounce + FSM
independent of siblings.
"""
chart_id: str
sub_roi: Any
detector: Any
fsm: Any
first_accepted: bool = True
last_saved_color: "str | None" = None
@dataclass
@@ -1076,6 +1113,7 @@ class RunContext:
# Pending /rebase proposal: (proposed_ts, new_phash, config_path) or None.
# One slot; a second /rebase overwrites. `/rebase confirm` applies if within TTL.
pending_rebase: Any = None
charts: list = field(default_factory=list) # list[ChartState] — multi-chart split
@dataclass
@@ -1086,6 +1124,7 @@ class _LoopState:
levels_extractor: Any = None
fire_count: int = 0
start: float = 0.0
n_primed_global: int = 0
@dataclass
@@ -1210,21 +1249,32 @@ def _sync_detection_tick(
last_saved_color: "str | None",
now: float,
samples_dir: Path,
chart_id: str = "",
sub_roi: Any = None,
frame: Any = None,
) -> _TickSyncResult:
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread."""
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread.
When ``frame`` is provided, capture/canary work is skipped (caller already
did them — multi-chart orchestrator). ``chart_id`` is propagated to all
Alerts emitted by this tick. ``sub_roi`` overrides cfg.dot_roi for the
cyan rect on saved screenshots (per-chart ROI).
"""
if frame is None:
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
return _TickSyncResult()
return _TickSyncResult(chart_id=chart_id)
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return _TickSyncResult(frame=frame)
return _TickSyncResult(frame=frame, chart_id=chart_id)
res = detector.step(now, frame)
detection_log.log({
"ts": now, "event": "frame",
"chart_id": chart_id,
"window_found": res.window_found,
"dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None,
@@ -1236,9 +1286,10 @@ def _sync_detection_tick(
})
if not (res.accepted and res.color):
return _TickSyncResult(frame=frame, res=res)
return _TickSyncResult(frame=frame, res=res, chart_id=chart_id)
is_first = first_accepted
roi_for_draw = sub_roi if sub_roi is not None else cfg.dot_roi
def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True):
@@ -1246,18 +1297,29 @@ def _sync_detection_tick(
return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
canary_ok=True, roi=roi_for_draw,
)
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot, cfg=cfg)
# Wrap notifier so every Alert from _handle_tick carries chart_id.
if chart_id:
class _ChartNotifier:
def send(self, alert: Alert) -> None:
alert.chart_id = chart_id
notifier.send(alert)
wrapped_notifier: _NotifierLike = _ChartNotifier()
else:
wrapped_notifier = notifier
tr = _handle_tick(fsm, res.color, now, wrapped_notifier, audit, is_first, snapshot=_snapshot, cfg=cfg)
if tr is None:
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True)
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True, chart_id=chart_id)
new_color: str | None = None
if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png"
suffix = f"_{chart_id}" if chart_id else ""
sample_path = samples_dir / f"{ts_str}_{res.color}{suffix}.png"
try:
import cv2 # type: ignore[import-untyped]
cv2.imwrite(str(sample_path), frame)
@@ -1271,7 +1333,7 @@ def _sync_detection_tick(
fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True,
canary_ok=True, roi=roi_for_draw,
)
notifier.send(Alert(
kind="trigger",
@@ -1279,17 +1341,19 @@ def _sync_detection_tick(
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path,
direction=tr.trigger,
chart_id=chart_id,
))
return _TickSyncResult(
frame=frame, res=res, tr=tr,
first_consumed=is_first, new_color=new_color,
first_consumed=is_first, new_color=new_color, chart_id=chart_id,
)
def _brief_status(ctx) -> str:
h = (time.monotonic() - ctx.state.start) / 3600
return f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {h:.1f}h"
fsm_state = ctx.charts[0].fsm.state.value if ctx.charts else ctx.fsm.state.value
return f"{fsm_state} | semnale: {ctx.state.fire_count} | {h:.1f}h"
async def _run_tick(ctx: RunContext) -> _TickSyncResult:
@@ -1323,17 +1387,203 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult:
)
async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
"""Scheduler start/stop + levels extraction. No-op if res is None/late_start."""
if result.first_consumed:
def _chart_id_for_index(i: int, n: int) -> str:
"""Map (idx, total) → chart_id. n=1 → "" ; n=2 → "left"/"right" ; n>2 → "chart_N"."""
if n == 1:
return ""
if n == 2:
return ("left", "right")[i]
return f"chart_{i}"
def _commit_layout_change(ctx: RunContext, new_strips: list, now: float) -> None:
"""Replace ctx.charts with fresh ChartState objects for new_strips.
Resets per-chart FSM (StateMachine(lockout_s)) and Detector (with
dot_roi_override=strip), zeroes n_primed_global, stops scheduler if running,
audits the layout_change event, sends a status Alert.
"""
from atm.detector import Detector
from atm.state_machine import StateMachine
cfg = ctx.cfg
old_n = len(ctx.charts)
new_n = len(new_strips)
new_charts: list = []
for i, strip in enumerate(new_strips):
chart_id = _chart_id_for_index(i, new_n)
det = Detector(cfg, ctx.capture, dot_roi_override=strip)
fsm = StateMachine(lockout_s=cfg.lockout_s)
new_charts.append(ChartState(
chart_id=chart_id, sub_roi=strip, detector=det, fsm=fsm,
first_accepted=True, last_saved_color=None,
))
ctx.state.n_primed_global = 0
if getattr(ctx.scheduler, "is_running", False):
ctx.scheduler.stop()
ctx.audit.log({"ts": now, "event": "scheduler_stopped", "reason": "layout_change"})
ctx.audit.log({
"ts": now,
"event": "layout_change",
"old_n": old_n,
"new_n": new_n,
"strips": [{"x": s.x, "w": s.w} for s in new_strips],
})
# Suppress notification on first-ever detection (old_n=0 = startup bootstrap,
# not a real layout change).
if old_n > 0:
ctx.notifier.send(Alert(
kind="status",
title=f"🔄 Layout TS schimbat: {old_n}{new_n} ferestre. FSM resetat.",
body="",
silent=True,
))
ctx.charts = new_charts
# Keep ctx.fsm / ctx.detector in sync with charts[0] so legacy code paths
# (brief_status, /ss state_hdr, /status) continue to work after layout change.
if new_charts:
ctx.fsm = new_charts[0].fsm
ctx.detector = new_charts[0].detector
def _detect_strips_for_ctx(ctx: RunContext, frame: Any) -> list:
"""Lazy strip detection for ctx.cfg + frame. Empty list when no vivid pixels."""
from atm.layout import detect_strips
from atm.vision import crop_roi
cfg = ctx.cfg
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
palette = {
name: (spec.rgb, spec.tolerance)
for name, spec in cfg.colors.items()
if name != "background"
}
full_crop = crop_roi(frame, cfg.dot_roi)
raw = detect_strips(full_crop, palette, min_gap_px, min_strip_px)
# detect_strips returns ROIs relative to the cropped region. Translate
# back into frame coordinates by adding cfg.dot_roi origin so per-strip
# detector crops can use the absolute ROI.
from atm.config import ROI
return [
ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y + r.y, w=r.w, h=r.h)
for r in raw
]
async def _run_multi_tick(ctx: RunContext) -> "list[_TickSyncResult]":
"""Multi-chart orchestrator: capture once, detect strips, run per-chart ticks.
Returns a list of _TickSyncResult — one per chart (empty list if capture
failed or canary paused). Layout management is in-process: when strips
don't match current ctx.charts, _commit_layout_change is invoked before
feeding ticks to the new charts.
"""
now = time.time()
if ctx.lifecycle is not None:
skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary)
sb = _brief_status(ctx)
transition = _maybe_log_transition(
skip, ctx.lifecycle, now, ctx.audit, ctx.notifier, status_body=sb,
)
if transition == "market_open" and ctx.cfg.window_title:
title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title)
ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title})
await asyncio.sleep(0.15)
if skip is not None:
return []
frame = await asyncio.to_thread(ctx.capture)
if frame is None:
ctx.audit.log({"ts": now, "event": "window_lost"})
return []
cr = ctx.canary.check(frame)
if ctx.canary.is_paused:
ctx.audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return []
new_strips = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame)
current_strips = [c.sub_roi for c in ctx.charts]
if new_strips:
if len(new_strips) != len(current_strips):
# Real count change (1↔2 ferestre) → rebuild charts.
_commit_layout_change(ctx, new_strips, now)
else:
# Same count → silent sub_roi update regardless of position/width jitter.
# Width pulsations (e.g. 792↔810px from antialiased edges) must not
# reset FSM or emit alerts — that path produced 500+ false alerts/day.
for c, ns in zip(ctx.charts, new_strips):
c.sub_roi = ns
if hasattr(c.detector, "update_dot_roi"):
c.detector.update_dot_roi(ns)
results: list[_TickSyncResult] = []
for c in ctx.charts:
result = await asyncio.to_thread(
_sync_detection_tick,
ctx.capture, ctx.canary, ctx.cfg, c.detector, c.fsm,
ctx.notifier, ctx.audit, ctx.detection_log,
ctx.fires_dir, c.first_accepted, c.last_saved_color,
now, ctx.samples_dir,
c.chart_id, c.sub_roi, frame,
)
results.append(result)
return results
async def _handle_fsm_result(ctx: RunContext, result: "Any") -> None:
"""Scheduler start/stop + levels extraction.
Accepts either a single ``_TickSyncResult`` (legacy single-chart path) or a
``list[_TickSyncResult]`` (multi-chart split). For multi-chart, scheduler
arbitration uses ``ctx.state.n_primed_global`` (start when 0→1, stop when
1→0). Levels extraction is disabled when ``len(ctx.charts) >= 2`` since the
extractor reads the full window y-axis which is per-chart-private in split.
"""
results: list[_TickSyncResult]
if isinstance(result, list):
results = result
else:
results = [result]
if not results:
return
multi_chart = len(ctx.charts) >= 2
for r in results:
if r.first_consumed:
if multi_chart:
for c in ctx.charts:
if c.chart_id == r.chart_id:
c.first_accepted = False
break
else:
ctx.state.first_accepted = False
if result.new_color is not None:
ctx.state.last_saved_color = result.new_color
if ctx.charts:
ctx.charts[0].first_accepted = False
if r.new_color is not None:
if multi_chart:
for c in ctx.charts:
if c.chart_id == r.chart_id:
c.last_saved_color = r.new_color
break
else:
ctx.state.last_saved_color = r.new_color
if ctx.charts:
ctx.charts[0].last_saved_color = r.new_color
tr = result.tr
res = result.res
if not multi_chart:
# Legacy single-chart path — preserve scheduler+levels semantics exactly.
single = results[0]
tr = single.tr
res = single.res
if result.late_start or res is None:
if single.late_start or res is None:
return
if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None):
@@ -1351,8 +1601,8 @@ async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time())
if ctx.state.levels_extractor is not None and result.frame is not None:
lr = ctx.state.levels_extractor.step(result.frame, time.time())
if ctx.state.levels_extractor is not None and single.frame is not None:
lr = ctx.state.levels_extractor.step(single.frame, time.time())
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
ctx.notifier.send(Alert(
@@ -1365,6 +1615,36 @@ async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None:
),
))
ctx.state.levels_extractor = None
return
# Multi-chart path: aggregate scheduler arbitration over all results.
for r in results:
tr = r.tr
res = r.res
if r.late_start or res is None or tr is None:
continue
if not (getattr(res, "accepted", False) and getattr(res, "color", None)):
continue
if tr.reason == "prime":
ctx.state.n_primed_global += 1
if ctx.state.n_primed_global == 1 and not ctx.scheduler.is_running:
ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm"):
if ctx.state.n_primed_global > 0:
ctx.state.n_primed_global -= 1
if ctx.state.n_primed_global == 0 and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr.trigger and not tr.locked:
ctx.state.fire_count += 1
if ctx.state.n_primed_global == 0 and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
# Levels disabled in split (per-chart y-axis would need its own calib).
ctx.audit.log({"ts": time.time(), "event": "levels_skipped_split"})
async def _dispatch_command(ctx: RunContext, cmd) -> None:
@@ -1436,8 +1716,21 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
body="",
))
return
# Always re-detect strips fresh — out-of-window or pre-first-tick,
# ctx.charts is empty (lopul de detection nu rulează când e closed),
# dar /ss trebuie să arate corect oricând. Fallback pe ctx.charts
# dacă detecția eșuează, apoi pe cfg.dot_roi via ss_strips=None.
ss_strips: "list | None" = None
try:
fresh = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame_ss)
except Exception:
fresh = []
if fresh:
ss_strips = fresh
elif ctx.charts:
ss_strips = [c.sub_roi for c in ctx.charts]
path_ss, detections_ss = await asyncio.to_thread(
_save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit,
_save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit, ss_strips,
)
caption_ss = _format_inspect_caption(detections_ss, ctx.cfg)
state_hdr = _fsm_step_label(ctx.fsm, now_ss)
@@ -1482,8 +1775,19 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
if frame_r is None:
capture_failed = True
else:
# Same fresh-detect logic ca /ss: out-of-window ctx.charts e gol,
# dar /resume tot trebuie să arate corect ambele chart-uri.
r_strips: "list | None" = None
try:
fresh_r = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame_r)
except Exception:
fresh_r = []
if fresh_r:
r_strips = fresh_r
elif ctx.charts:
r_strips = [c.sub_roi for c in ctx.charts]
path_r, detections_r = await asyncio.to_thread(
_save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit,
_save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit, r_strips,
)
was_drift = bool(getattr(ctx.canary, "is_paused", False))
@@ -1833,6 +2137,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
lifecycle=lifecycle,
)
# ctx.charts is populated on the first detection tick by _run_multi_tick
# which calls detect_strips → _commit_layout_change. Starting empty means
# the real strip geometry is used (not a hardcoded cfg.dot_roi placeholder)
# and avoids spurious layout-change alerts.
ctx.charts = []
# ------------------------------------------------------------------
# Nested async coroutines — heartbeat captures notifier + heartbeat_due
@@ -1853,6 +2162,7 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
fire_count=ctx.state.fire_count,
uptime_h=uptime_h,
canary_paused=paused,
charts=ctx.charts if len(ctx.charts) >= 2 else None,
))
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
@@ -1860,8 +2170,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
while True:
if duration_s is not None and (time.monotonic() - start) >= duration_s:
break
result = await _run_tick(ctx)
await _handle_fsm_result(ctx, result)
# Always use _run_multi_tick: it detects strips each tick,
# manages layout changes (1→2 or 2→1), and handles single-chart
# mode identically to the old _run_tick path.
results = await _run_multi_tick(ctx)
await _handle_fsm_result(ctx, results)
await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang
await asyncio.sleep(cfg.loop_interval_s)

View File

@@ -12,6 +12,22 @@ class Alert:
image_path: Path | None = None # annotated screenshot
direction: str | None = None # "BUY"/"SELL" when kind=trigger
silent: bool = False # disable_notification for Telegram; ignored by Discord
chart_id: str = ""
def _alert_prefix(chart_id: str) -> str:
"""Return Telegram title prefix for chart_id. Empty for single-chart mode."""
if not chart_id:
return ""
if chart_id == "left":
return "[stânga] "
if chart_id == "right":
return "[dreapta] "
try:
n = int(chart_id.split("_")[1])
return f"[chart {n + 1}] "
except (IndexError, ValueError):
return f"[{chart_id}] "
class Notifier(Protocol):

View File

@@ -283,6 +283,62 @@ def test_real_screenshot_rightmost_dot(screenshot: str, expected: str) -> None:
)
def test_dot_roi_override_uses_sub_roi() -> None:
"""dot_roi_override must be used instead of cfg.dot_roi for crop + offset.
Paint a yellow dot inside the override ROI but **outside** cfg.dot_roi.
The default DOT_ROI is (10,10,280,80); we override with an ROI placed
well to the right (x=200, w=80) so the painted dot only intersects the
override. If the detector still cropped from cfg.dot_roi the yellow dot
would land at the rightmost edge of the larger ROI as well — so we use
a frame that has nothing in the cfg.dot_roi region except inside the
override window, and assert dot_pos_abs falls inside the override.
"""
override = ROI(x=200, y=20, w=80, h=60)
# Background-only frame, then paint yellow only inside the override
frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8)
fy0, fy1 = override.y, override.y + override.h
fx0, fx1 = override.x + 50, override.x + override.w # right edge of override
frame[fy0:fy1, fx0:fx1] = YELLOW_BGR
cfg = _make_cfg(debounce_depth=1)
det = Detector(cfg, capture=lambda: frame, dot_roi_override=override)
r = det.step(0.0)
assert r.dot_found is True
assert r.match is not None
assert r.match.name == "yellow"
assert r.dot_pos_abs is not None
abs_x, abs_y = r.dot_pos_abs
assert override.x <= abs_x < override.x + override.w
assert override.y <= abs_y < override.y + override.h
def test_dot_pos_abs_with_offset() -> None:
"""dot_pos_abs must include the override ROI's (x, y) offset."""
override = ROI(x=100, y=20, w=50, h=40)
frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8)
# Paint a single full-height yellow stripe at roi-local x in [40, 50)
# so find_rightmost_dot lands somewhere inside that stripe.
fy0, fy1 = override.y, override.y + override.h
fx0, fx1 = override.x + 40, override.x + 50
frame[fy0:fy1, fx0:fx1] = YELLOW_BGR
cfg = _make_cfg(debounce_depth=1)
det = Detector(cfg, capture=lambda: frame, dot_roi_override=override)
r = det.step(0.0)
assert r.dot_found is True
assert r.dot_pos_abs is not None
abs_x, abs_y = r.dot_pos_abs
# Painted stripe: roi-local x in [40,50), y in [0, h). Absolute coords
# must be offset by override.(x, y).
assert override.x + 40 <= abs_x < override.x + 50
assert override.y <= abs_y < override.y + override.h
def test_fused_blob_samples_rightmost_dot() -> None:
"""Fused multi-colour stripe must classify the rightmost colour, not the
centroid colour. Pre-fix the centroid fell on an interior gray segment

163
tests/test_layout.py Normal file
View File

@@ -0,0 +1,163 @@
import numpy as np
import pytest
from atm.config import ROI
from atm.layout import _strips_match, detect_strips
PALETTE = {
"turquoise": ((0, 253, 253), 60.0),
"yellow": ((253, 253, 0), 60.0),
"dark_green": ((0, 128, 0), 60.0),
"dark_red": ((128, 0, 0), 60.0),
"light_green": ((0, 255, 0), 60.0),
"light_red": ((255, 0, 0), 60.0),
}
def _blank(h: int = 20, w: int = 200) -> np.ndarray:
return np.zeros((h, w, 3), dtype=np.uint8)
def _paint(img: np.ndarray, x0: int, x1: int, rgb: tuple[int, int, int]) -> None:
"""Paint vivid color into BGR image (palette stores RGB)."""
bgr = (rgb[2], rgb[1], rgb[0])
img[:, x0:x1] = bgr
def test_single_strip():
img = _blank(20, 200)
_paint(img, 0, 200, (0, 253, 253)) # turquoise
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 1
assert out[0].x == 0
assert abs(out[0].w - 200) <= 1
assert out[0].h == 20
def test_split_50_50():
img = _blank(20, 230)
_paint(img, 0, 100, (0, 253, 253))
_paint(img, 130, 230, (253, 253, 0))
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 2
assert out[0].x < out[1].x # L->R
assert out[0].x == 0
assert abs(out[0].w - 100) <= 1
assert out[1].x == 130
assert abs(out[1].w - 100) <= 1
def test_split_asymmetric():
img = _blank(20, 230)
_paint(img, 0, 70, (0, 253, 253)) # 35% width
_paint(img, 100, 230, (253, 253, 0)) # 65% width
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 2
assert abs(out[0].w - 70) <= 1
assert abs(out[1].w - 130) <= 1
def test_gray_only_no_strip():
img = _blank(20, 200)
img[:, 50:150] = (128, 128, 128)
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert out == []
def test_cooldown_gray_dots_no_detect():
img = _blank(20, 200)
# scattered gray dots
for x in (20, 50, 80, 110, 140, 170):
img[8:12, x:x + 4] = (100, 100, 100)
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert out == []
def test_vivid_palette_match():
img = _blank(20, 200)
_paint(img, 50, 80, (0, 255, 0)) # light_green
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 1
def test_blank_frame():
img = _blank(20, 200)
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert out == []
def test_strip_too_narrow_filtered():
img = _blank(20, 200)
_paint(img, 50, 53, (0, 253, 253)) # only 3px wide
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=10)
assert out == []
def test_small_gap_fuses():
img = _blank(20, 200)
_paint(img, 30, 70, (0, 253, 253))
_paint(img, 75, 120, (0, 253, 253)) # 5px gap, < min_gap_px=20
out = detect_strips(img, PALETTE, min_gap_px=20, min_strip_px=5)
assert len(out) == 1
assert abs(out[0].x - 30) <= 2
assert abs((out[0].x + out[0].w) - 120) <= 2
def test_split_two_charts_with_interleaved_gray():
# Regresie 2 ferestre TS: fiecare row de buline e mix vivid + gri, separate
# de un gap larg de background (dividerul dintre ferestre). Înainte de fix
# detect_strips picka doar runs vivid contiguu și rata fereastra stângă.
palette = {**PALETTE, "gray": ((128, 128, 128), 60.0)}
img = _blank(35, 1796)
# Left chart: dots vivid + gray la fiecare 26px, x=0..820
for i, x in enumerate(range(0, 820, 26)):
rgb = (128, 128, 128) if i % 3 else (0, 128, 0)
_paint(img, x, x + 22, rgb)
# Window divider gap: x=820..910 rămâne background
# Right chart: same pattern, x=910..1796
for i, x in enumerate(range(910, 1790, 26)):
rgb = (128, 128, 128) if i % 3 else (0, 128, 0)
_paint(img, x, x + 22, rgb)
out = detect_strips(img, palette, min_gap_px=28, min_strip_px=280)
assert len(out) == 2, f"expected 2 strips, got {len(out)}: {out}"
assert out[0].x == 0
assert out[1].x >= 900 # right chart starts after divider
assert out[0].x + out[0].w < out[1].x # disjoint
def test_three_strips():
img = _blank(20, 300)
_paint(img, 0, 60, (0, 253, 253))
_paint(img, 100, 160, (253, 253, 0))
_paint(img, 220, 300, (0, 255, 0))
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 3
assert out[0].x < out[1].x < out[2].x
assert out[0].x == 0
assert out[1].x == 100
assert out[2].x == 220
def test_strips_match_identical():
a = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
b = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
assert _strips_match(a, b) is True
def test_strips_match_jitter_5px():
a = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
b = [ROI(x=5, y=0, w=95, h=20), ROI(x=128, y=0, w=70, h=20)]
assert _strips_match(a, b, tol=10) is True
def test_strips_match_drift_12px():
a = [ROI(x=0, y=0, w=100, h=20)]
b = [ROI(x=12, y=0, w=100, h=20)]
assert _strips_match(a, b, tol=10) is False
def test_strips_match_count_different():
a = [ROI(x=0, y=0, w=100, h=20)]
b = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
assert _strips_match(a, b) is False

View File

@@ -257,6 +257,14 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
# Bootstrap a single chart so _run_multi_tick populates ctx.charts on tick 1.
# Frame is zeros → real detect_strips returns [] → without this, charts stays
# empty and the ScriptedDetector loop never advances (regression after the
# multi-chart refactor made _run_multi_tick the single detection path).
from atm.config import ROI
_bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)]
monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip)
with pytest.raises(_StopLoop):
_main.run_live(cfg, duration_s=None)
@@ -384,6 +392,11 @@ async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_p
monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
# Bootstrap ctx.charts on tick 1 — see test_run_live_catchup_sell for context.
from atm.config import ROI
_bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)]
monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip)
with pytest.raises(_StopLoop):
await _main.run_live_async(cfg, duration_s=None)
@@ -1628,3 +1641,451 @@ async def test_status_window_line_when_oh_enabled():
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
body = status[0].body
assert "fereastră: deschisă" in body
# ---------------------------------------------------------------------------
# Multi-chart split workspace tests
# ---------------------------------------------------------------------------
def test_chart_state_defaults():
"""ChartState's first_accepted defaults to True; last_saved_color to None."""
import atm.main as _main
from atm.config import ROI
roi = ROI(x=0, y=0, w=200, h=35)
chart = _main.ChartState(
chart_id="", sub_roi=roi, detector=MagicMock(), fsm=MagicMock(),
)
assert chart.first_accepted is True
assert chart.last_saved_color is None
assert chart.chart_id == ""
assert chart.sub_roi is roi
def test_alert_prefix_import():
"""_alert_prefix lives in atm.notifier and produces the expected prefixes."""
from atm.notifier import _alert_prefix
assert _alert_prefix("") == ""
assert _alert_prefix("left") == "[stânga] "
assert _alert_prefix("right") == "[dreapta] "
assert _alert_prefix("chart_0") == "[chart 1] "
assert _alert_prefix("chart_2") == "[chart 3] "
def test_save_annotated_frame_no_price_overlay(tmp_path):
"""_save_annotated_frame must NOT draw the price overlay anymore.
Top-right area must contain no white pixels (text was rendered there before).
"""
import atm.main as _main
from atm.config import ROI, YAxisCalib, ColorSpec
frame = np.zeros((200, 400, 3), dtype=np.uint8)
cfg = types.SimpleNamespace(
dot_roi=ROI(x=10, y=10, w=380, h=180),
y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=190, p2_price=50.0),
colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
)
fpath = _main._save_annotated_frame(
frame, cfg, tmp_path, "test", now=123.0,
dot_pos_abs=(200, 100), canary_ok=True,
)
assert fpath is not None
saved = cv2.imread(str(fpath))
assert saved is not None
# Top-right corner where the "$..." text used to live (rows 0..40, cols 300..390).
# The cyan ROI rect is on top edge but only 2px thick → vast majority of that
# corner is still pure black. White text would average a much higher value.
corner = saved[0:40, 300:390]
# No bright white pixels (text was 255,255,255).
white_pixels = ((corner[:, :, 0] > 200) & (corner[:, :, 1] > 200) & (corner[:, :, 2] > 200)).sum()
assert white_pixels == 0, f"expected 0 white pixels (no price text), got {white_pixels}"
def test_save_annotated_frame_uses_roi_param(tmp_path):
"""When roi= is passed, the cyan rect is drawn there (not at cfg.dot_roi)."""
import atm.main as _main
from atm.config import ROI, ColorSpec
frame = np.zeros((200, 400, 3), dtype=np.uint8)
cfg = types.SimpleNamespace(
dot_roi=ROI(x=0, y=0, w=10, h=10), # tiny cfg ROI
colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
)
custom_roi = ROI(x=100, y=50, w=200, h=80)
fpath = _main._save_annotated_frame(
frame, cfg, tmp_path, "rtest", now=123.0, roi=custom_roi,
)
assert fpath is not None
saved = cv2.imread(str(fpath))
# ROI rect color is (0, 255, 255) BGR. Find any pixel where G+R are saturated and B=0.
edge = saved[49:52, 100:300]
rect_present = ((edge[:, :, 0] < 50) & (edge[:, :, 1] > 200) & (edge[:, :, 2] > 200)).any()
assert rect_present, "Expected ROI rect along custom roi top edge"
# Also check the cfg.dot_roi rect (x=0..10, y=0..10) is NOT drawn — proves we used roi=custom.
cfg_corner = saved[0:10, 0:10]
rect_at_cfg = ((cfg_corner[:, :, 0] < 50) & (cfg_corner[:, :, 1] > 200) & (cfg_corner[:, :, 2] > 200)).any()
assert not rect_at_cfg, "cfg.dot_roi rect must not appear when roi= override is used"
def test_commit_layout_change_resets_fsm(monkeypatch):
"""_commit_layout_change rebuilds ctx.charts, resets FSM, zeroes n_primed_global."""
import atm.main as _main
from atm.config import ROI
from atm.state_machine import StateMachine, State
# Prebuild a ctx with one chart in ARMED_BUY.
cfg = MagicMock()
cfg.lockout_s = 60
cfg.colors = {}
cfg.debounce_depth = 3
fsm_old = StateMachine(lockout_s=60)
fsm_old.feed("turquoise", 1.0) # IDLE -> ARMED_BUY
assert fsm_old.state == State.ARMED_BUY
initial_chart = _main.ChartState(
chart_id="", sub_roi=ROI(x=0, y=0, w=400, h=35),
detector=MagicMock(), fsm=fsm_old,
)
state = _main._LoopState()
state.n_primed_global = 5 # any non-zero value
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = True
def stop(self):
type(self).is_running = False
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=MagicMock(),
detector=MagicMock(), fsm=fsm_old,
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state,
levels_extractor_factory=lambda *a, **kw: None,
)
ctx.charts = [initial_chart]
new_strips = [
ROI(x=0, y=0, w=200, h=35),
ROI(x=250, y=0, w=200, h=35),
]
_main._commit_layout_change(ctx, new_strips, now=100.0)
assert len(ctx.charts) == 2
assert ctx.charts[0].chart_id == "left"
assert ctx.charts[1].chart_id == "right"
# New FSMs must be IDLE (fresh StateMachine)
assert ctx.charts[0].fsm.state == State.IDLE
assert ctx.charts[1].fsm.state == State.IDLE
# Old fsm not reused
assert ctx.charts[0].fsm is not fsm_old
# n_primed_global reset
assert state.n_primed_global == 0
# layout_change audit + status alert
assert any(e.get("event") == "layout_change" and e["new_n"] == 2 for e in audit_events)
assert any("Layout TS schimbat" in a.title for a in notifier_alerts)
def test_commit_layout_change_chart_ids_for_n3(monkeypatch):
"""For n>=3 charts use 'chart_0', 'chart_1', ..."""
import atm.main as _main
from atm.config import ROI
cfg = MagicMock()
cfg.lockout_s = 60
cfg.colors = {}
cfg.debounce_depth = 3
state = _main._LoopState()
class _N:
def send(self, a): pass
class _A:
def log(self, e): pass
class _S:
is_running = False
def stop(self): pass
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=MagicMock(),
detector=MagicMock(), fsm=MagicMock(),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state,
levels_extractor_factory=lambda *a, **kw: None,
)
ctx.charts = []
strips = [ROI(x=i * 100, y=0, w=80, h=35) for i in range(3)]
_main._commit_layout_change(ctx, strips, now=100.0)
assert [c.chart_id for c in ctx.charts] == ["chart_0", "chart_1", "chart_2"]
def test_strips_match_silent_update_jitter():
"""_strips_match returns True for <=10px jitter — used to trigger silent update."""
from atm.layout import _strips_match
from atm.config import ROI
a = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
b = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]
assert _strips_match(a, b, tol=10) is True
# Drift 12px breaks the match
c = [ROI(x=12, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
assert _strips_match(a, c, tol=10) is False
def test_build_heartbeat_alert_single_compat():
"""Without charts arg, _build_heartbeat_alert returns the legacy single-chart body."""
import atm.main as _main
a = _main._build_heartbeat_alert(
fsm_state="IDLE", fire_count=0, uptime_h=1.0, canary_paused=False,
)
assert a.kind == "heartbeat"
assert a.title == "activ"
assert a.body == "IDLE | semnale: 0 | 1.0h"
def test_build_heartbeat_alert_multi_chart():
"""charts=[left,right] body lines reference [stânga] / [dreapta]."""
import atm.main as _main
from atm.config import ROI
fsm_left = types.SimpleNamespace(state=types.SimpleNamespace(value="ARMED_BUY"))
fsm_right = types.SimpleNamespace(state=types.SimpleNamespace(value="IDLE"))
cs_left = _main.ChartState(
chart_id="left", sub_roi=ROI(x=0, y=0, w=10, h=10),
detector=MagicMock(), fsm=fsm_left,
)
cs_right = _main.ChartState(
chart_id="right", sub_roi=ROI(x=0, y=0, w=10, h=10),
detector=MagicMock(), fsm=fsm_right,
)
a = _main._build_heartbeat_alert(
fsm_state="ignored", fire_count=2, uptime_h=1.5, canary_paused=False,
charts=[cs_left, cs_right],
)
assert a.kind == "heartbeat"
assert "[stânga]" in a.body
assert "[dreapta]" in a.body
assert "ARMED_BUY" in a.body
assert "IDLE" in a.body
assert "semnale: 2" in a.body
@pytest.mark.asyncio
async def test_run_multi_tick_silent_jitter_update(monkeypatch, tmp_path):
"""When detect_strips returns positions with <=10px jitter, sub_roi updates
silently (no _commit_layout_change → no extra status alert)."""
import atm.main as _main
from atm.config import ROI
monkeypatch.chdir(tmp_path)
initial = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
jittered = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]
class _Det:
def __init__(self): self.last_roi = None
def step(self, ts, frame=None):
from atm.detector import DetectionResult
return DetectionResult(
ts=ts, window_found=True, dot_found=False,
rgb=None, match=None, accepted=False, color=None,
)
def update_dot_roi(self, roi):
self.last_roi = roi
class _FSM:
state = types.SimpleNamespace(value="IDLE")
_last_fire: dict = {}
cfg = MagicMock()
cfg.lockout_s = 60
cfg.attach_screenshots = types.SimpleNamespace(arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False)
cfg.window_title = None
state_obj = _main._LoopState()
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = False
def start(self, s): pass
def stop(self): pass
canary = types.SimpleNamespace(is_paused=False, check=lambda f: types.SimpleNamespace(distance=0, drifted=False))
ctx = _main.RunContext(
cfg=cfg, capture=lambda: np.zeros((200, 600, 3), dtype=np.uint8),
canary=canary, detector=MagicMock(), fsm=_FSM(),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path,
cmd_queue=MagicMock(), state=state_obj,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=None,
)
det_l, det_r = _Det(), _Det()
ctx.charts = [
_main.ChartState("left", initial[0], det_l, _FSM()),
_main.ChartState("right", initial[1], det_r, _FSM()),
]
# Stub strip detection to return jittered strips.
monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: jittered)
results = await _main._run_multi_tick(ctx)
assert len(results) == 2
# Sub-ROI silently updated, but no layout_change event.
assert ctx.charts[0].sub_roi == jittered[0]
assert ctx.charts[1].sub_roi == jittered[1]
assert det_l.last_roi == jittered[0]
assert det_r.last_roi == jittered[1]
assert not any(e.get("event") == "layout_change" for e in audit_events)
# No status "Layout TS schimbat" alert.
assert not any("Layout TS schimbat" in a.title for a in notifier_alerts)
@pytest.mark.asyncio
async def test_run_multi_tick_silent_width_oscillation(monkeypatch, tmp_path):
"""Regression: width pulsations (e.g. 792↔810px) on a stable chart count
must NOT trigger _commit_layout_change. Reproduces the production bug
visible in logs/2026-05-04.jsonl (576 layout_change events/day).
"""
import atm.main as _main
from atm.config import ROI
monkeypatch.chdir(tmp_path)
initial = [ROI(x=0, y=0, w=792, h=35), ROI(x=912, y=0, w=845, h=35)]
pulsed = [ROI(x=0, y=0, w=810, h=35), ROI(x=912, y=0, w=863, h=35)]
class _Det:
def __init__(self): self.last_roi = None
def step(self, ts, frame=None):
from atm.detector import DetectionResult
return DetectionResult(
ts=ts, window_found=True, dot_found=False,
rgb=None, match=None, accepted=False, color=None,
)
def update_dot_roi(self, roi):
self.last_roi = roi
class _FSM:
state = types.SimpleNamespace(value="PRIMED_BUY")
_last_fire: dict = {}
cfg = MagicMock()
cfg.lockout_s = 60
cfg.attach_screenshots = types.SimpleNamespace(arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False)
cfg.window_title = None
state_obj = _main._LoopState()
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = False
def start(self, s): pass
def stop(self): pass
canary = types.SimpleNamespace(is_paused=False, check=lambda f: types.SimpleNamespace(distance=0, drifted=False))
fsm_left = _FSM()
fsm_right = _FSM()
ctx = _main.RunContext(
cfg=cfg, capture=lambda: np.zeros((200, 1800, 3), dtype=np.uint8),
canary=canary, detector=MagicMock(), fsm=fsm_left,
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path,
cmd_queue=MagicMock(), state=state_obj,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=None,
)
det_l, det_r = _Det(), _Det()
ctx.charts = [
_main.ChartState("left", initial[0], det_l, fsm_left),
_main.ChartState("right", initial[1], det_r, fsm_right),
]
monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: pulsed)
await _main._run_multi_tick(ctx)
# Sub-ROI silently updated.
assert ctx.charts[0].sub_roi == pulsed[0]
assert ctx.charts[1].sub_roi == pulsed[1]
assert det_l.last_roi == pulsed[0]
assert det_r.last_roi == pulsed[1]
# FSM identity preserved (not rebuilt).
assert ctx.charts[0].fsm is fsm_left
assert ctx.charts[1].fsm is fsm_right
# Critical: no layout_change event, no status alert.
assert not any(e.get("event") == "layout_change" for e in audit_events)
assert not any("Layout TS schimbat" in a.title for a in notifier_alerts)
def test_commit_layout_change_alert_is_silent(monkeypatch):
"""Layout-change alert on real count change uses silent=True so Telegram
doesn't ping the user. Re-anchor info still visible in chat history.
"""
import atm.main as _main
from atm.config import ROI
from atm.state_machine import StateMachine
cfg = MagicMock()
cfg.lockout_s = 60
cfg.colors = {}
cfg.debounce_depth = 3
cfg.confidence_min = 0.5
cfg.dot_roi = ROI(x=0, y=0, w=600, h=35)
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = False
def stop(self): pass
state_obj = _main._LoopState()
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=MagicMock(),
detector=MagicMock(), fsm=StateMachine(60),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state_obj,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=None,
)
ctx.charts = [
_main.ChartState("only", ROI(x=0, y=0, w=400, h=35), MagicMock(), StateMachine(60)),
]
new_strips = [
ROI(x=0, y=0, w=200, h=35),
ROI(x=250, y=0, w=200, h=35),
]
_main._commit_layout_change(ctx, new_strips, now=100.0)
layout_alerts = [a for a in notifier_alerts if "Layout TS schimbat" in a.title]
assert len(layout_alerts) == 1
assert layout_alerts[0].silent is True

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import pytest
from atm.notifier import Alert
from atm.notifier import Alert, _alert_prefix
from atm.notifier.fanout import FanoutNotifier
@@ -358,3 +358,32 @@ def test_fanout_on_drop_exception_swallowed(tmp_path: Path) -> None:
s = fan.stats()
# Some alerts still went through
assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0
# ---------------------------------------------------------------------------
# Alert.chart_id + _alert_prefix
# ---------------------------------------------------------------------------
def test_alert_chart_id_default() -> None:
assert Alert(kind="arm", title="t", body="b").chart_id == ""
def test_alert_chart_id_set() -> None:
assert Alert(kind="arm", title="t", body="b", chart_id="left").chart_id == "left"
def test_alert_prefix_empty() -> None:
assert _alert_prefix("") == ""
def test_alert_prefix_left() -> None:
assert _alert_prefix("left") == "[stânga] "
def test_alert_prefix_right() -> None:
assert _alert_prefix("right") == "[dreapta] "
def test_alert_prefix_chart_n() -> None:
assert _alert_prefix("chart_0") == "[chart 1] "
assert _alert_prefix("chart_1") == "[chart 2] "