Compare commits

...

2 Commits

Author SHA1 Message Date
839caacc95 agents 2026-05-05 18:02:37 +03:00
c950a5a699 feat(multi-chart): refactor _run_multi_tick + fix alert spam pe oscilație strip
Bug critic: _strips_match(tol=10) trip pe pulsații naturale de lățime ~18px între
ticks (ex. 792↔810px). Fiecare trip → _commit_layout_change → reset FSM + alert
Telegram + scheduler stop. Logul 2026-05-04.jsonl arăta 576 evenimente
layout_change/zi, plus prime alerts repetate la dark_red/dark_green (FSM resetat
înghite lockout-ul) și sincronizare cross-chart pe ambele FSM-uri simultan.

Fix:
- main.py:1511 — gate doar pe count change (len(new) != len(current));
  count stabil → silent update sub_roi indiferent de jitter
- main.py:1438 — silent=True pe alert layout_change (Telegram fără sunet)
- 2 teste regresie noi: width oscillation 792↔810 + silent assertion
- 2 teste async reparate: bootstrap _detect_strips_for_ctx pentru ScriptedDetector
  (regresie după ce _run_multi_tick a devenit unica cale de detecție)

Plus refactor multi-chart pre-existent: layout.py modul nou, _detect_strips_for_ctx,
ChartState per-chart FSM/Detector, ROI per-strip pe screenshots, scripts/diag_*.

Verificat: 292 passed, 2 skipped în 10s.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-05 17:59:18 +03:00
15 changed files with 2042 additions and 255 deletions

14
.gitignore vendored
View File

@@ -74,3 +74,17 @@ calibrate_capture_*.png
# Debug captures # Debug captures
debug_*.png debug_*.png
logs/*.png logs/*.png
# Test/dev scratch output
pytest-*.log
pytest-*.err
*.log
*.err
# Auto-captured calibration frames (use `git add -f` to commit selected ones)
calibration/frames/*.png
# Misc clutter
*.jpeg
*.jpg
!samples/*.jpg

90
AGENTS.md Normal file
View File

@@ -0,0 +1,90 @@
# ATM — Automated Trading Monitor
Personal Faza-1 tool for the M2D strategy. Python 3.11+.
## Quick Reference
```bash
pip install -e ".[windows]" # Windows: live capture
pip install -e ".[dev]" # Linux/macOS: dev + tests (WSL: create venv first)
cp .env.example .env # secretele Discord/Telegram (vezi README §Secrets)
atm calibrate # Tk wizard
atm debug --delay 5 # one-shot capture + detect
atm validate-calibration calibration/calibration_labels.json # offline color gate
atm run --start-at 16:30 --stop-at 23:00 # live session
atm run --tz America/New_York --oh-start 09:30 --oh-stop 16:00 # NYSE window override
pytest -q # 230+ tests (core + 8 scenarii regresie + env loader)
pytest tests/test_scenarios_regression.py -v # FSM pe imagini reale
```
## Codex sandbox/tooling notes
On this Windows checkout, do not assume `rg` or the global `python` has project deps.
Use the repo venv for diagnostics that need Pillow/OpenCV:
```powershell
.\.venv\Scripts\python.exe scripts\inspect_image_pixels.py 6033117943853423831.jpg
.\.venv\Scripts\python.exe scripts\inspect_image_pixels.py 6033117943853423831.jpg --point 1780 725
```
If `rg` is missing, use PowerShell fallbacks:
`Get-ChildItem -Recurse -File src,tests,scripts | Select-String -Pattern "needle"`.
If a command fails due to sandbox permissions and is required for the task, rerun it
with an escalation request instead of stopping the investigation without a verdict.
## Calibration corpus
`calibration/` — persistent, auto-suficient. Conține:
- `frames/` — PNG-uri raw `{ts}_{color}.png` scrise **automat** de live loop la fiecare schimbare de culoare (filename = culoarea detectată, poate fi greșită)
- `calibration_labels.json` — ground truth **manual** (gate offline pentru `atm validate-calibration`)
- `scenarios.json` — secvențe FSM pentru `tests/test_scenarios_regression.py`
Workflow după sesiune: review frame-urile noi din `frames/`, adaugi entry-uri în `calibration_labels.json` cu culoarea pe care ai văzut-o TU pe chart (nu neapărat cea din filename), rulezi `atm validate-calibration`.
## Telegram commands (live)
`/ss` `/status` `/pause` `/resume` `/rebase` `/3` (interval min) `/stop`
- `/rebase` — propune un `baseline_phash` nou pentru canary: capturează frame, crop pe `canary.roi`, phash → trimite screenshot adnotat (cerc roșu pe ROI) cu old/new hash + distance. `/rebase confirm` în ≤180s aplică: rescrie `baseline_phash` în TOML-ul activ (păstrează comentariile), mirror în `cfg` la runtime, clear `user_paused` + `drift_paused`. Fără confirm, nimic nu se modifică. Folosește-l când layout-ul TS s-a schimbat intenționat și vrei să re-ancorezi canary-ul fără `atm calibrate` full.
- `/ss` — verify multi-bulină: adnotează top-3 buline din `dot_roi` (cerc roșu gros pe pick-ul FSM, cercuri colorate subțiri pe vecini) + caption cu clasificarea fiecăreia (nume, RGB, distanță, confidence) + `config: {version}`. Cercul colorat folosește `cfg.colors[name].rgb` la runtime — DRY cu paleta activă.
- `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias). Trimite un singur Alert cu screenshot adnotat inline (capture rulează **înainte** de clearing state → zero race cu FSM tick-uri). Dacă capture eșuează, title conține `⚠️ captură eșuată` și resume-ul se execută oricum.
- Drift-pause emits a single Telegram alert on transition. While paused, `/set_interval` is refused and `/ss` captions warn that detection is off.
- Heartbeat shows `⚠️ pauzat (drift)` instead of `activ` while canary is paused.
## Operating-hours config
`[options.operating_hours]` in TOML: `enabled`, `timezone` (NYSE local, e.g. `America/New_York`), `weekdays`, `start_hhmm`, `stop_hhmm`. Timezone validated at load; `_tz_cache` reused per tick. Boundary crossings log `market_open` / `market_closed` and notify once. Startup in-window is silent.
## Phase-skip backstop
`[options.alerts] fire_on_phase_skip = true` (default) — ARMED→light_* direct (dark_* missed) still emits a `⚠️ PHASE SKIP` alert using FSM lockout to suppress spam.
## Palette gotcha (2026-04-21 recalibration)
TradeStation M2D indicators paint the four bright colors at near-pure saturation:
turquoise `(0,253,253)`, yellow `(253,253,0)`, light_green `(0,255,0)`, light_red `(255,0,0)`.
If Tk-wizard calibration samples a slightly desaturated pixel, classifier returns `UNKNOWN`
(distance > tolerance=60) → FSM never sees trigger → stuck in PRIMED → scheduler polls
forever. Always run `atm validate-calibration calibration/calibration_labels.json` after
recalibrating. Current active config: `configs/2026-04-21-recalib.toml`.
## Skill routing
When the user's request matches an available skill, ALWAYS invoke it using the Skill
tool as your FIRST action. Do NOT answer directly, do NOT use other tools first.
The skill has specialized workflows that produce better results than ad-hoc answers.
Key routing rules:
- Product ideas, "is this worth building", brainstorming → invoke office-hours
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
- Ship, deploy, push, create PR → invoke ship
- QA, test the site, find bugs → invoke qa
- Code review, check my diff → invoke review
- Update docs after shipping → invoke document-release
- Weekly retro → invoke retro
- Design system, brand → invoke design-consultation
- Visual audit, design polish → invoke design-review
- Architecture review → invoke plan-eng-review
- Save progress, checkpoint, resume → invoke checkpoint
- Code quality, health check → invoke health

View File

@@ -2,9 +2,9 @@ window_title = "m2d"
[dot_roi] [dot_roi]
x = 0 x = 0
y = 712 y = 720
w = 1796 w = 1796
h = 35 h = 40
[chart_roi] [chart_roi]
x = 17 x = 17
@@ -53,7 +53,7 @@ p2_y = 664
p2_price = 483.2 p2_price = 483.2
[canary] [canary]
baseline_phash = "fbe145390c1abec23204017757a326b8e37077288ef79947310a89c70e07ffff" baseline_phash = "c11f4a852ec09f3a8de4e4cf4ad76d84f10b19d3e708663c38f5b538877c6624"
drift_threshold = 8 drift_threshold = 8
[canary.roi] [canary.roi]

View File

@@ -0,0 +1,133 @@
r"""Diagnose why detect_strips finds 1 strip when there are 2 TS windows.
Reuses the most recent raw capture under logs/repro/. For each strip detected,
prints the connected-components vivid mask, gaps, and the contiguous run lengths
so we can see exactly where the threshold is killing the second strip.
"""
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from atm.config import Config # noqa: E402
from atm.layout import VIVID_COLORS, detect_strips # noqa: E402
from atm.vision import crop_roi, find_top_dots, classify_pixel, pixel_rgb # noqa: E402
def _latest_raw() -> Path:
candidates = sorted((ROOT / "logs" / "repro").glob("*_raw.png"))
if not candidates:
raise SystemExit("No *_raw.png in logs/repro — run scripts/repro_ss_resume.py first.")
return candidates[-1]
def main() -> int:
cfg = Config.load_current(ROOT / "configs")
raw_path = _latest_raw()
print(f"Using raw frame: {raw_path}")
frame = cv2.imread(str(raw_path), cv2.IMREAD_COLOR)
if frame is None:
raise SystemExit(f"Could not read {raw_path}")
palette = {
name: (spec.rgb, spec.tolerance)
for name, spec in cfg.colors.items()
if name != "background"
}
full_crop = crop_roi(frame, cfg.dot_roi)
h, w = full_crop.shape[:2]
print(f"dot_roi crop shape: {h}x{w} (cfg.dot_roi={cfg.dot_roi})")
# 1) Build the vivid mask used by detect_strips
mask = np.zeros((h, w), dtype=np.uint8)
img_f = full_crop.astype(np.float32)
per_color_pixels = {}
for name in VIVID_COLORS:
if name not in palette:
continue
rgb, tol = palette[name]
bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32)
diff = np.linalg.norm(img_f - bgr, axis=2)
m = (diff < tol).astype(np.uint8)
per_color_pixels[name] = int(m.sum())
mask |= m
print("\nVIVID_COLORS pixel counts in dot_roi crop (pre-morphology):")
for n, c in per_color_pixels.items():
print(f" {n:12s} {c:>7d} px")
print(f" total mask: {int(mask.sum()):>7d} px")
# 2) Column projection: which x columns have any vivid pixel?
col_has = mask.any(axis=0).astype(np.uint8)
runs = []
i = 0
while i < w:
if col_has[i] == 0:
i += 1
continue
j = i
while j < w and col_has[j] == 1:
j += 1
runs.append((i, j - 1, j - i))
i = j
print(f"\nContiguous vivid-column runs (raw, no morphology): {len(runs)}")
for x0, x1, run_w in runs[:25]:
print(f" x={x0:>4d}..{x1:>4d} width={run_w}")
if len(runs) > 25:
print(f" ... +{len(runs) - 25} more")
# 3) Apply same morphology + connected components as detect_strips
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
kw = max(3, min_gap_px // 2)
print(f"\ndetect_strips params: min_strip_px={min_strip_px} min_gap_px={min_gap_px} morphology kw={kw}")
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
print(f"\nConnected components after CLOSE (kw={kw}): n={n_labels - 1} (excluding bg)")
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
y = int(stats[i, cv2.CC_STAT_TOP])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
hh = int(stats[i, cv2.CC_STAT_HEIGHT])
passes = "PASS" if ww >= min_strip_px else "drop"
print(f" [{i:>2d}] x={x:>4d} y={y:>3d} w={ww:>4d} h={hh:>3d} -> {passes}")
strips = detect_strips(full_crop, palette, min_gap_px, min_strip_px)
print(f"\ndetect_strips() final result: {len(strips)} strip(s)")
for i, s in enumerate(strips):
print(f" strip[{i}] x={s.x} y={s.y} w={s.w} h={s.h}")
# 4) Run find_top_dots on the FULL dot_roi (single-chart fallback path used
# by /ss when ctx.charts < 2). This is what users see when the layout
# detector hasn't promoted the layout to multi-chart yet.
bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18)
bg_tol = cfg.colors["background"].tolerance if "background" in cfg.colors else 15.0
full_dots = find_top_dots(full_crop, bg_rgb, bg_tol, n=3)
print(f"\nfind_top_dots on FULL dot_roi (n=3, single-chart fallback path):")
for i, (cx, cy) in enumerate(full_dots):
rgb = pixel_rgb(full_crop, cx, cy)
m = classify_pixel(rgb, palette)
print(f" c{i + 1}: pos=({cx + cfg.dot_roi.x},{cy + cfg.dot_roi.y}) rgb={rgb} -> {m.name} d={m.distance:.1f}")
# 5) Save the binary mask + closed mask for visual inspection
out_dir = ROOT / "logs" / "repro"
cv2.imwrite(str(out_dir / "diag_mask_raw.png"), (mask * 255).astype(np.uint8))
cv2.imwrite(str(out_dir / "diag_mask_closed.png"), (closed * 255).astype(np.uint8))
print(f"\nWrote diag_mask_raw.png + diag_mask_closed.png to {out_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

162
scripts/diag_strip_fixes.py Normal file
View File

@@ -0,0 +1,162 @@
r"""Verify the two proposed fixes for detect_strips on the latest 2-window capture.
Fix A: include gray in VIVID_COLORS mask (cheapest change).
Fix B: use non-background mask (any pixel where diff(bg) > bg_tol).
Reuses the most recent raw capture in logs/repro/.
"""
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from atm.config import Config, ROI # noqa: E402
from atm.vision import crop_roi # noqa: E402
def _detect_strips_with_palette(
full_dot_crop, palette, color_names, min_gap_px, min_strip_px,
):
"""Same body as layout.detect_strips but with selectable color set."""
h, w = full_dot_crop.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
img_f = full_dot_crop.astype(np.float32)
for name in color_names:
if name not in palette:
continue
rgb, tol = palette[name]
bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32)
diff = np.linalg.norm(img_f - bgr, axis=2)
mask |= (diff < tol).astype(np.uint8)
kw = max(3, min_gap_px // 2)
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
out = []
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
if ww < min_strip_px:
continue
out.append(ROI(x=x, y=0, w=ww, h=h))
out.sort(key=lambda r: r.x)
return out, mask, closed
def _detect_strips_non_bg(full_dot_crop, bg_rgb, bg_tol, min_gap_px, min_strip_px):
"""Fix B: any pixel different from background → strip mask."""
h, w = full_dot_crop.shape[:2]
bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(full_dot_crop.astype(np.float32) - bgr_bg, axis=2)
mask = (diff > bg_tol).astype(np.uint8)
kw = max(3, min_gap_px // 2)
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
out = []
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
if ww < min_strip_px:
continue
out.append(ROI(x=x, y=0, w=ww, h=h))
out.sort(key=lambda r: r.x)
return out, mask, closed
def _annotate(frame, strips_abs, label, out_dir):
annotated = frame.copy()
for r in strips_abs:
cv2.rectangle(annotated, (r.x, r.y), (r.x + r.w, r.y + r.h), (0, 255, 255), 2)
p = out_dir / f"diag_fix_{label}.png"
cv2.imwrite(str(p), annotated)
return p
def main() -> int:
cfg = Config.load_current(ROOT / "configs")
raws = sorted(p for p in (ROOT / "logs" / "repro").glob("*_raw.png") if p.name[0].isdigit())
if not raws:
raise SystemExit("Run scripts/repro_ss_resume.py first.")
raw_path = raws[-1]
print(f"Using: {raw_path}")
frame = cv2.imread(str(raw_path), cv2.IMREAD_COLOR)
palette = {n: (s.rgb, s.tolerance) for n, s in cfg.colors.items() if n != "background"}
bg_rgb = cfg.colors["background"].rgb
bg_tol = cfg.colors["background"].tolerance
full_crop = crop_roi(frame, cfg.dot_roi)
h, w = full_crop.shape[:2]
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
print(f"params: min_strip_px={min_strip_px} min_gap_px={min_gap_px} crop={w}x{h}")
out_dir = ROOT / "logs" / "repro"
# Baseline (current code: vivid only, no gray)
vivid_only = ("turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red")
s0, m0, c0 = _detect_strips_with_palette(full_crop, palette, vivid_only, min_gap_px, min_strip_px)
print(f"\n[BASELINE vivid-only ] strips={len(s0)}")
for i, r in enumerate(s0):
print(f" [{i}] x={r.x:>4d} w={r.w:>4d}")
cv2.imwrite(str(out_dir / "diag_fix_BASELINE_mask_closed.png"), (c0 * 255).astype(np.uint8))
_annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in s0],
"BASELINE", out_dir)
# Fix A: include gray
fixA_palette = vivid_only + ("gray",)
sA, mA, cA = _detect_strips_with_palette(full_crop, palette, fixA_palette, min_gap_px, min_strip_px)
print(f"\n[FIX A vivid+gray ] strips={len(sA)}")
for i, r in enumerate(sA):
print(f" [{i}] x={r.x:>4d} w={r.w:>4d}")
cv2.imwrite(str(out_dir / "diag_fix_A_mask_closed.png"), (cA * 255).astype(np.uint8))
_annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in sA],
"A_vivid_plus_gray", out_dir)
# Fix B: any non-background
sB, mB, cB = _detect_strips_non_bg(full_crop, bg_rgb, bg_tol, min_gap_px, min_strip_px)
print(f"\n[FIX B non-bg ] strips={len(sB)}")
for i, r in enumerate(sB):
print(f" [{i}] x={r.x:>4d} w={r.w:>4d}")
cv2.imwrite(str(out_dir / "diag_fix_B_mask_closed.png"), (cB * 255).astype(np.uint8))
_annotate(frame, [ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y, w=r.w, h=r.h) for r in sB],
"B_non_background", out_dir)
# Sanity: where is the divider between the two TS windows?
# Project non-bg mask onto x; long zero-runs reveal the gap.
bgr_bg = np.array([bg_rgb[2], bg_rgb[1], bg_rgb[0]], dtype=np.float32)
diff = np.linalg.norm(full_crop.astype(np.float32) - bgr_bg, axis=2)
nonbg = (diff > bg_tol).astype(np.uint8)
col_any = nonbg.any(axis=0).astype(np.uint8)
# Find longest 0-run
longest = (0, 0, 0) # (length, x0, x1)
i = 0
while i < w:
if col_any[i] == 1:
i += 1
continue
j = i
while j < w and col_any[j] == 0:
j += 1
run = j - i
if run > longest[0]:
longest = (run, i, j - 1)
i = j
print(f"\nLongest empty (background-only) horizontal stretch in dot_roi: "
f"{longest[0]}px at x={longest[1]}..{longest[2]} "
f"(this is where the window divider sits)")
print(f"\nWrote diag_fix_*.png to {out_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,124 @@
r"""Inspect ATM strip pixels without relying on shell pipelines.
Usage:
.\.venv\Scripts\python.exe scripts\inspect_image_pixels.py 6033117943853423831.jpg
.\.venv\Scripts\python.exe scripts\inspect_image_pixels.py image.jpg --point 1780 725
The script intentionally parses only the config fields needed for pixel inspection,
so it does not require Discord/Telegram secrets to be valid.
"""
from __future__ import annotations
import argparse
import json
import sys
import tomllib
from pathlib import Path
import cv2
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from atm.config import ROI # noqa: E402
from atm.vision import classify_pixel, crop_roi, find_top_dots, pixel_rgb # noqa: E402
def _load_probe_config(path: Path) -> dict:
data = tomllib.loads(path.read_text(encoding="utf-8"))
colors = {
name: (tuple(int(c) for c in spec["rgb"]), float(spec["tolerance"]))
for name, spec in data["colors"].items()
}
background = colors.pop("background", ((18, 18, 18), 15.0))
return {
"dot_roi": ROI(**data["dot_roi"]),
"colors": colors,
"background_rgb": background[0],
"background_tol": background[1],
}
def _as_jsonable_match(match) -> dict:
return {
"name": match.name,
"distance": round(float(match.distance), 3),
"confidence": round(float(match.confidence), 3),
}
def main() -> int:
parser = argparse.ArgumentParser(description="Inspect ATM strip pixels in a JPG/PNG frame.")
parser.add_argument("image", type=Path, help="Frame image path.")
parser.add_argument(
"--config",
type=Path,
default=ROOT / "configs" / "2026-04-21-recalib.toml",
help="ATM TOML config path.",
)
parser.add_argument("--top", type=int, default=3, help="Number of rightmost dots to report.")
parser.add_argument(
"--point",
nargs=2,
type=int,
metavar=("X", "Y"),
help="Optional absolute pixel coordinate to sample.",
)
parser.add_argument("--box", type=int, default=3, help="Sampling radius for mean RGB.")
args = parser.parse_args()
frame = cv2.imread(str(args.image), cv2.IMREAD_COLOR)
if frame is None:
raise SystemExit(f"Could not read image: {args.image}")
probe = _load_probe_config(args.config)
roi = probe["dot_roi"]
roi_img = crop_roi(frame, roi)
dots = find_top_dots(
roi_img,
bg_rgb=probe["background_rgb"],
bg_tol=probe["background_tol"],
n=args.top,
)
result = {
"image": str(args.image),
"image_size": {"w": int(frame.shape[1]), "h": int(frame.shape[0])},
"config": str(args.config),
"dot_roi": {"x": roi.x, "y": roi.y, "w": roi.w, "h": roi.h},
"dots": [],
}
for x, y in dots:
rgb = pixel_rgb(roi_img, x, y, box=args.box)
match = classify_pixel(rgb, probe["colors"])
result["dots"].append(
{
"roi_xy": [int(x), int(y)],
"abs_xy": [int(roi.x + x), int(roi.y + y)],
"rgb": list(rgb),
"match": _as_jsonable_match(match),
}
)
if args.point:
px, py = args.point
if not (roi.x <= px < roi.x + roi.w and roi.y <= py < roi.y + roi.h):
raise SystemExit(f"Point {px},{py} is outside dot_roi")
rx, ry = px - roi.x, py - roi.y
rgb = pixel_rgb(roi_img, rx, ry, box=args.box)
result["point"] = {
"roi_xy": [rx, ry],
"abs_xy": [px, py],
"rgb": list(rgb),
"match": _as_jsonable_match(classify_pixel(rgb, probe["colors"])),
}
print(json.dumps(result, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

173
scripts/repro_ss_resume.py Normal file
View File

@@ -0,0 +1,173 @@
r"""Reproduce /ss and /resume Telegram screenshot pipelines for offline inspection.
Brings the `m2d` window to front (Win32 trick — same as live loop), captures via mss
using the active config's chart_window_region, then runs the EXACT same annotators
that /ss and /resume use:
- _save_inspect_frame → /ss path (top-3 dots per detected strip + caption)
- _save_annotated_frame → /resume path (cyan rect on dot_roi/sub_roi)
Outputs are saved under logs/repro/ alongside a JSON summary of detections.
Usage:
.\.venv\Scripts\python.exe scripts\repro_ss_resume.py
.\.venv\Scripts\python.exe scripts\repro_ss_resume.py --no-focus
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from atm.config import Config # noqa: E402
from atm.layout import detect_strips # noqa: E402
from atm.main import ( # noqa: E402
_focus_window_by_title,
_format_inspect_caption,
_save_annotated_frame,
_save_inspect_frame,
)
from atm.vision import crop_roi # noqa: E402
def _capture_via_region(cfg) -> np.ndarray | None:
import mss # type: ignore[import-untyped]
reg = cfg.chart_window_region
if reg is None:
# Fallback: grab full primary monitor
with mss.mss() as sct:
mon = sct.monitors[1]
img = sct.grab(mon)
return cv2.cvtColor(np.array(img), cv2.COLOR_BGRA2BGR)
with mss.mss() as sct:
mon = {"top": reg.y, "left": reg.x, "width": reg.w, "height": reg.h}
img = sct.grab(mon)
return cv2.cvtColor(np.array(img), cv2.COLOR_BGRA2BGR)
def main() -> int:
p = argparse.ArgumentParser()
p.add_argument("--no-focus", action="store_true", help="Skip Win32 focus call.")
p.add_argument("--delay", type=float, default=0.5, help="Seconds to sleep after focus.")
p.add_argument(
"--out",
type=Path,
default=ROOT / "logs" / "repro",
help="Output directory for annotated frames + JSON.",
)
args = p.parse_args()
cfg = Config.load_current(ROOT / "configs")
args.out.mkdir(parents=True, exist_ok=True)
# 1) Focus
if not args.no_focus and cfg.window_title:
title = _focus_window_by_title(cfg.window_title)
print(f"[focus] needle={cfg.window_title!r} -> {title!r}")
if args.delay > 0:
time.sleep(args.delay)
# 2) Capture
frame = _capture_via_region(cfg)
if frame is None:
print("[capture] FAILED — no frame")
return 1
print(f"[capture] frame shape={frame.shape}")
now = time.time()
ts_str = time.strftime("%Y%m%d_%H%M%S")
# Save raw too so we can hand-inspect what mss actually grabbed
raw_path = args.out / f"{ts_str}_raw.png"
cv2.imwrite(str(raw_path), frame)
print(f"[raw] {raw_path}")
# 3) Detect strips (same logic as live multi-chart split)
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
palette = {
name: (spec.rgb, spec.tolerance)
for name, spec in cfg.colors.items()
if name != "background"
}
full_dot_crop = crop_roi(frame, cfg.dot_roi)
raw_strips = detect_strips(full_dot_crop, palette, min_gap_px, min_strip_px)
# Translate back to absolute frame coords
from atm.config import ROI # noqa: E402
strips = [
ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y + r.y, w=r.w, h=r.h)
for r in raw_strips
]
print(f"[strips] dot_roi={cfg.dot_roi} detected={len(strips)} strips")
for i, s in enumerate(strips):
print(f" strip[{i}] x={s.x} y={s.y} w={s.w} h={s.h}")
# Also dump a copy of the full dot_roi crop for visual sanity-check
crop_path = args.out / f"{ts_str}_dot_roi_crop.png"
cv2.imwrite(str(crop_path), full_dot_crop)
print(f"[crop] {crop_path}")
# 4) /ss inspect-annotate (top-3 per strip, FSM-pick markers)
inspect_path, detections = _save_inspect_frame(
frame, cfg, args.out, now, audit=None,
strips=strips if strips else None,
)
caption = _format_inspect_caption(detections, cfg)
print(f"[ss] {inspect_path}")
print(f"[ss] caption:\n{caption}")
# 5) /resume annotate (cyan rect on dot_roi or first strip)
roi_for_resume = strips[0] if strips else cfg.dot_roi
resume_path = _save_annotated_frame(
frame, cfg, args.out, "resume_repro", now, audit=None, roi=roi_for_resume,
)
print(f"[resume] {resume_path}")
# 6) JSON summary
summary = {
"ts": ts_str,
"frame_shape": list(frame.shape),
"dot_roi": {"x": cfg.dot_roi.x, "y": cfg.dot_roi.y, "w": cfg.dot_roi.w, "h": cfg.dot_roi.h},
"strips": [
{"x": s.x, "y": s.y, "w": s.w, "h": s.h} for s in strips
],
"detections": [
{
"strip_idx": d["strip_idx"],
"idx": d["idx"],
"name": d["name"],
"rgb": list(d["rgb"]),
"distance": round(float(d["distance"]), 3),
"confidence": round(float(d["confidence"]), 3),
"pos_abs": list(d["pos_abs"]),
}
for d in detections
],
"files": {
"raw": str(raw_path),
"dot_roi_crop": str(crop_path),
"inspect": str(inspect_path) if inspect_path else None,
"resume": str(resume_path) if resume_path else None,
},
"ss_caption": caption,
}
json_path = args.out / f"{ts_str}_summary.json"
json_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
print(f"[json] {json_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -7,7 +7,7 @@ from typing import Callable
import numpy as np import numpy as np
from .config import Config from .config import Config, ROI
from .vision import ( from .vision import (
ColorMatch, ColorMatch,
classify_pixel, classify_pixel,
@@ -40,9 +40,11 @@ class Detector:
capture: ScreenCapture, capture: ScreenCapture,
bg_rgb: tuple[int, int, int] | None = None, bg_rgb: tuple[int, int, int] | None = None,
bg_tol: float | None = None, bg_tol: float | None = None,
dot_roi_override: ROI | None = None,
) -> None: ) -> None:
self._cfg = cfg self._cfg = cfg
self._capture = capture self._capture = capture
self._dot_roi = dot_roi_override if dot_roi_override is not None else cfg.dot_roi
# Prefer config-defined background; fall back to dark-grey default. # Prefer config-defined background; fall back to dark-grey default.
if "background" in cfg.colors: if "background" in cfg.colors:
spec = cfg.colors["background"] spec = cfg.colors["background"]
@@ -84,7 +86,7 @@ class Detector:
self._rolling.append(r) self._rolling.append(r)
return r return r
roi_img = crop_roi(frame, self._cfg.dot_roi) roi_img = crop_roi(frame, self._dot_roi)
dot_pos = find_rightmost_dot(roi_img, self._bg_rgb, self._bg_tol) dot_pos = find_rightmost_dot(roi_img, self._bg_rgb, self._bg_tol)
if dot_pos is None: if dot_pos is None:
@@ -124,11 +126,14 @@ class Detector:
match=match, match=match,
accepted=accepted, accepted=accepted,
color=color, color=color,
dot_pos_abs=(self._cfg.dot_roi.x + x, self._cfg.dot_roi.y + y), dot_pos_abs=(self._dot_roi.x + x, self._dot_roi.y + y),
) )
self._rolling.append(r) self._rolling.append(r)
return r return r
def update_dot_roi(self, roi: ROI) -> None:
self._dot_roi = roi
@property @property
def rolling(self) -> list[DetectionResult]: def rolling(self) -> list[DetectionResult]:
return list(self._rolling) return list(self._rolling)

48
src/atm/layout.py Normal file
View File

@@ -0,0 +1,48 @@
import cv2
import numpy as np
from .config import ROI
VIVID_COLORS = ("turquoise", "yellow", "dark_green", "dark_red", "light_green", "light_red", "gray")
def detect_strips(
full_dot_crop: np.ndarray,
palette: dict[str, tuple[tuple[int, int, int], float]],
min_gap_px: int,
min_strip_px: int,
) -> list[ROI]:
"""Return list of sub-ROIs (relative to full_dot_crop) sorted left-to-right.
Empty list if no vivid pixels found."""
h, w = full_dot_crop.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
img_f = full_dot_crop.astype(np.float32)
for name in VIVID_COLORS:
if name not in palette:
continue
rgb, tol = palette[name]
bgr = np.array([rgb[2], rgb[1], rgb[0]], dtype=np.float32)
diff = np.linalg.norm(img_f - bgr, axis=2)
mask |= (diff < tol).astype(np.uint8)
kw = max(3, min_gap_px // 2)
kernel = np.ones((1, kw), dtype=np.uint8)
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
n_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(closed, connectivity=8)
strips: list[tuple[int, int]] = []
for i in range(1, n_labels):
x = int(stats[i, cv2.CC_STAT_LEFT])
ww = int(stats[i, cv2.CC_STAT_WIDTH])
if ww < min_strip_px:
continue
strips.append((x, x + ww))
strips.sort()
return [ROI(x=xs, y=0, w=xe - xs, h=h) for (xs, xe) in strips]
def _strips_match(a: list[ROI], b: list[ROI], tol: int = 10) -> bool:
if len(a) != len(b):
return False
return all(
abs(ra.x - rb.x) <= tol and abs((ra.x + ra.w) - (rb.x + rb.w)) <= tol
for ra, rb in zip(a, b)
)

View File

@@ -7,7 +7,7 @@ import contextlib
import os import os
import sys import sys
import time import time
from dataclasses import dataclass from dataclasses import dataclass, field
from datetime import datetime, tzinfo from datetime import datetime, tzinfo
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
@@ -572,11 +572,10 @@ def _build_resume_info_text(was_drift: bool, skip_now: str | None) -> "tuple[str
return ("Monitorizare reluată", "") return ("Monitorizare reluată", "")
def _draw_roi_cyan(annotated, cfg) -> None: def _draw_roi_cyan(annotated, roi) -> None:
"""Draw the dot_roi cyan rectangle onto annotated. Shared by annotate helpers.""" """Draw the given ROI as a cyan rectangle onto annotated. Shared by annotate helpers."""
import cv2 # type: ignore[import-untyped] import cv2 # type: ignore[import-untyped]
x, y, w, h = cfg.dot_roi.x, cfg.dot_roi.y, cfg.dot_roi.w, cfg.dot_roi.h cv2.rectangle(annotated, (roi.x, roi.y), (roi.x + roi.w, roi.y + roi.h), (0, 255, 255), 2)
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
_BASELINE_PHASH_LINE = __import__("re").compile( _BASELINE_PHASH_LINE = __import__("re").compile(
@@ -644,13 +643,14 @@ def _save_inspect_frame(
fires_dir: Path, fires_dir: Path,
now: float, now: float,
audit: "_AuditLike | None" = None, audit: "_AuditLike | None" = None,
strips: "list | None" = None,
) -> "tuple[Path | None, list[dict]]": ) -> "tuple[Path | None, list[dict]]":
"""Annotate frame with top-3 rightmost dots + classifications. Returns (path, detections). """Annotate frame with top-3 rightmost dots + classifications. Returns (path, detections).
FSM pick (rightmost, idx 0) → thick red circle. Neighbors (idx 1, 2) → thin circle Per-strip detection when ``strips`` is provided (multi-chart split). Each strip
in the classified color (BGR derived from cfg.colors[name].rgb at runtime, UNKNOWN runs its own find_top_dots; circles are offset by the strip origin. Detections
→ gray). Labels `{name} d={distance}` next to each circle. Price overlay for carry a ``strip_idx`` so callers can distinguish which chart they came from.
rightmost dot (same as _save_annotated_frame). Fail-safe: any error → (None, []). Single-chart fallback uses ``[cfg.dot_roi]``.
""" """
try: try:
import cv2 # type: ignore[import-untyped] import cv2 # type: ignore[import-untyped]
@@ -666,58 +666,51 @@ def _save_inspect_frame(
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_inspect.png" fpath = fires_dir / f"{ts_str}_inspect.png"
annotated = frame.copy() annotated = frame.copy()
_draw_roi_cyan(annotated, cfg)
bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18) bg_rgb = cfg.colors["background"].rgb if "background" in cfg.colors else (18, 18, 18)
dot_crop = crop_roi(frame, cfg.dot_roi)
positions = find_top_dots(dot_crop, bg_rgb, n=3)
palette = {k: (v.rgb, v.tolerance) for k, v in cfg.colors.items() if k != "background"} palette = {k: (v.rgb, v.tolerance) for k, v in cfg.colors.items() if k != "background"}
strip_list = strips if strips else [cfg.dot_roi]
detections: list[dict] = [] detections: list[dict] = []
for idx, (cx, cy) in enumerate(positions): for strip_idx, strip in enumerate(strip_list):
rgb = pixel_rgb(dot_crop, cx, cy) _draw_roi_cyan(annotated, strip)
match = classify_pixel(rgb, palette) dot_crop = crop_roi(frame, strip)
pos_abs = (cfg.dot_roi.x + cx, cfg.dot_roi.y + cy) # Cer N+spare buline ca să avem rezerve după filtrarea UNKNOWN-urilor
detections.append({ # (text-artefacte / patch-uri întunecate adiacente bulinei rightmost).
"idx": idx, "name": match.name, "rgb": rgb, positions = find_top_dots(dot_crop, bg_rgb, n=8)
"distance": match.distance, "confidence": match.confidence, kept = 0
"pos_abs": pos_abs, for cx, cy in positions:
}) if kept >= 3:
break
rgb = pixel_rgb(dot_crop, cx, cy)
match = classify_pixel(rgb, palette)
if match.name == "UNKNOWN":
continue
pos_abs = (strip.x + cx, strip.y + cy)
detections.append({
"idx": kept, "name": match.name, "rgb": rgb,
"distance": match.distance, "confidence": match.confidence,
"pos_abs": pos_abs, "strip_idx": strip_idx,
})
kept += 1
# Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x. # Markerii stau pe un rând SUB ROI, aliniat vertical cu bulina prin x.
# Toate cercurile au aceeași rază (r=7), pline, în culoarea clasificată. # Primary (FSM pick, rightmost) = tick vertical roșu peste markeri.
# Primary (FSM pick, rightmost) = tick vertical roșu care leagă markerul de bulina. marker_y = strip.y + strip.h + 8
marker_y = cfg.dot_roi.y + cfg.dot_roi.h + 8 strip_dets = [d for d in detections if d["strip_idx"] == strip_idx]
for det in reversed(detections): # neighbors first, primary last on top for det in reversed(strip_dets):
pos_abs = det["pos_abs"] pos_abs = det["pos_abs"]
name = det["name"] name = det["name"]
if name in cfg.colors: if name in cfg.colors:
rgb_pal = cfg.colors[name].rgb rgb_pal = cfg.colors[name].rgb
bgr = (int(rgb_pal[2]), int(rgb_pal[1]), int(rgb_pal[0])) bgr = (int(rgb_pal[2]), int(rgb_pal[1]), int(rgb_pal[0]))
else: else:
bgr = (128, 128, 128) bgr = (128, 128, 128)
marker_pos = (pos_abs[0], marker_y) marker_pos = (pos_abs[0], marker_y)
cv2.circle(annotated, marker_pos, 7, bgr, -1) cv2.circle(annotated, marker_pos, 7, bgr, -1)
if det["idx"] == 0: if det["idx"] == 0:
cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1) cv2.line(annotated, (pos_abs[0], marker_y - 7), (pos_abs[0], pos_abs[1] + 4), (0, 0, 255), 1)
# Price overlay on rightmost (reuse same formula as _save_annotated_frame)
if detections and hasattr(cfg, "y_axis"):
try:
dot_y = detections[0]["pos_abs"][1]
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass
cv2.imwrite(str(fpath), annotated) cv2.imwrite(str(fpath), annotated)
return fpath, detections return fpath, detections
@@ -743,22 +736,52 @@ _COLOR_EMOJI = {
def _format_inspect_caption(detections: list[dict], cfg) -> str: def _format_inspect_caption(detections: list[dict], cfg) -> str:
"""Compact caption: leftmost = c1, rightmost = cN (FSM pick). """Compact caption: leftmost = c1, rightmost = cN (FSM pick).
Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption — Single-strip: c1..cN flat list. Multi-strip: per-strip blocks cu header
emoji-ul e cel mai aproape de „textul în aceeași culoare ca bulina"). ("stânga"/"dreapta" pt 2 chart-uri, "chart N" pt 3+) și pick separat per strip.
Emoji prefix = culoarea detectată (Telegram nu suportă text colorat în caption).
""" """
if not detections: if not detections:
return "nicio bulină detectată în ROI" return "nicio bulină detectată în ROI"
# detections ordered idx=0 (rightmost) → idx=N-1 (leftmost)
# Display left-to-right so user reads the chart order natural. # Group by strip_idx. Old detections (pre multi-chart) lack the field —
ordered = list(reversed(detections)) # treat as single anonymous group.
total = len(ordered) groups: dict[int, list[dict]] = {}
lines: list[str] = [] for det in detections:
for display_idx, det in enumerate(ordered, start=1): groups.setdefault(det.get("strip_idx", 0), []).append(det)
n_strips = len(groups)
def _line(display_idx: int, det: dict, is_pick: bool) -> str:
name = det["name"] name = det["name"]
emoji = _COLOR_EMOJI.get(name, "") emoji = _COLOR_EMOJI.get(name, "")
suffix = " ← pick" if display_idx == total else "" suffix = " ← pick" if is_pick else ""
lines.append(f"{emoji} c{display_idx}: {name}{suffix}") return f"{emoji} c{display_idx}: {name}{suffix}"
return "\n".join(lines)
def _strip_header(strip_idx: int) -> str:
if n_strips == 2:
return "stânga" if strip_idx == 0 else "dreapta"
return f"chart {strip_idx + 1}"
if n_strips <= 1:
# Single-strip path preserved verbatim — leftmost=c1, rightmost=cN=pick.
only = next(iter(groups.values()))
ordered = list(reversed(only))
total = len(ordered)
return "\n".join(
_line(i + 1, d, is_pick=(i + 1 == total))
for i, d in enumerate(ordered)
)
# Multi-strip: one block per strip (sorted by strip_idx ascending).
blocks: list[str] = []
for sidx in sorted(groups):
dets = groups[sidx]
ordered = list(reversed(dets)) # leftmost first
total = len(ordered)
lines = [f"[{_strip_header(sidx)}]"]
for i, det in enumerate(ordered):
lines.append(_line(i + 1, det, is_pick=(i + 1 == total)))
blocks.append("\n".join(lines))
return "\n".join(blocks)
def _save_annotated_frame( def _save_annotated_frame(
@@ -770,20 +793,19 @@ def _save_annotated_frame(
audit: _AuditLike | None = None, audit: _AuditLike | None = None,
dot_pos_abs: "tuple[int, int] | None" = None, dot_pos_abs: "tuple[int, int] | None" = None,
canary_ok: bool = True, canary_ok: bool = True,
roi: "Any | None" = None,
) -> "Path | None": ) -> "Path | None":
"""Save BGR frame with cyan dot_roi rect to ``logs/fires/{ts}_{label}.png``. """Save BGR frame with cyan ROI rect to ``logs/fires/{ts}_{label}.png``.
Returns the path on success, ``None`` on any error. Failures are logged to ``roi`` overrides ``cfg.dot_roi`` for the rectangle (used by multi-chart split
audit (when provided) so disk-full / permission issues don't become silent so per-strip alerts highlight the correct chart). Falls back to cfg.dot_roi
regressions. Never raises — snapshot is a best-effort enhancement, the when None.
text alert must still go out.
dot_pos_abs + canary_ok: when both are set the price overlay is drawn Returns the path on success, ``None`` on any error. Never raises — snapshot
(y-axis linear interpolation via cfg.y_axis). Skipped when canary drifted is best-effort, the text alert must still go out.
since calibration may be stale.
""" """
try: try:
import cv2 # type: ignore[import-untyped] import cv2 # type: ignore[import-untyped] # noqa: F401
except ImportError as exc: except ImportError as exc:
if audit is not None: if audit is not None:
try: try:
@@ -792,26 +814,12 @@ def _save_annotated_frame(
pass pass
return None return None
try: try:
import cv2 # type: ignore[import-untyped]
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_{label}.png" fpath = fires_dir / f"{ts_str}_{label}.png"
annotated = frame.copy() annotated = frame.copy()
_draw_roi_cyan(annotated, cfg) roi_to_draw = roi if roi is not None else cfg.dot_roi
if dot_pos_abs is not None and canary_ok and hasattr(cfg, "y_axis"): _draw_roi_cyan(annotated, roi_to_draw)
try:
_, dot_y = dot_pos_abs
ya = cfg.y_axis
slope = (ya.p2_price - ya.p1_price) / (ya.p2_y - ya.p1_y)
price = ya.p1_price + (dot_y - ya.p1_y) * slope
w_frame = annotated.shape[1]
text = f"${price:.2f}"
font = cv2.FONT_HERSHEY_SIMPLEX
scale, thickness = 1.2, 3
(tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
tx, ty = w_frame - tw - 10, th + 10
cv2.rectangle(annotated, (tx - 4, 4), (tx + tw + 4, ty + 4), (0, 0, 0), -1)
cv2.putText(annotated, text, (tx, ty), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)
except Exception:
pass # price overlay is best-effort; never break the screenshot
cv2.imwrite(str(fpath), annotated) cv2.imwrite(str(fpath), annotated)
return fpath return fpath
except Exception as exc: except Exception as exc:
@@ -829,15 +837,27 @@ def _build_heartbeat_alert(
fire_count: int, fire_count: int,
uptime_h: float, uptime_h: float,
canary_paused: bool, canary_paused: bool,
charts: "list | None" = None,
) -> "Alert": ) -> "Alert":
"""Construct the periodic heartbeat Alert. """Construct the periodic heartbeat Alert.
When canary is drift-paused the title/body reflect it explicitly — Single-chart compat: when ``charts`` is None or has length <=1, body is
2026-04-21 bug: previously the heartbeat said "activ ARMED_SELL" while ``"{state} | semnale: N | Hh"`` (drift-pause appended). Multi-chart split:
detection had been dead for hours, misleading the user into thinking one line per chart with ``[chart_id] STATE`` plus a tail signals/uptime line.
the system was running.
""" """
title = "⚠️ pauzat (drift)" if canary_paused else "activ" title = "⚠️ pauzat (drift)" if canary_paused else "activ"
if charts and len(charts) >= 2:
from atm.notifier import _alert_prefix as _prefix
chart_lines = []
for c in charts:
label = _prefix(c.chart_id).strip()
state_value = getattr(getattr(c.fsm, "state", None), "value", "")
chart_lines.append(f"{label} {state_value}".strip())
tail = f"| semnale: {fire_count} | {uptime_h:.1f}h"
if canary_paused:
tail = f"| semnale: {fire_count} | {uptime_h:.1f}h [drift-pause]"
body = "\n".join(chart_lines) + "\n" + tail
return Alert(kind="heartbeat", title=title, body=body)
state_label = f"{fsm_state} [drift-pause]" if canary_paused else fsm_state state_label = f"{fsm_state} [drift-pause]" if canary_paused else fsm_state
body = f"{state_label} | semnale: {fire_count} | {uptime_h:.1f}h" body = f"{state_label} | semnale: {fire_count} | {uptime_h:.1f}h"
return Alert(kind="heartbeat", title=title, body=body) return Alert(kind="heartbeat", title=title, body=body)
@@ -1048,6 +1068,23 @@ class _TickSyncResult:
first_consumed: bool = False first_consumed: bool = False
late_start: bool = False late_start: bool = False
new_color: str | None = None # corpus sample color when changed new_color: str | None = None # corpus sample color when changed
chart_id: str = "" # multi-chart split: which chart produced this result
@dataclass
class ChartState:
"""Per-chart split-workspace state (FSM + Detector + sub-ROI).
chart_id="" → single-chart mode (no Alert prefix). "left"/"right" for n=2.
"chart_0", "chart_1", ... for n>2. Each chart has its own debounce + FSM
independent of siblings.
"""
chart_id: str
sub_roi: Any
detector: Any
fsm: Any
first_accepted: bool = True
last_saved_color: "str | None" = None
@dataclass @dataclass
@@ -1076,6 +1113,7 @@ class RunContext:
# Pending /rebase proposal: (proposed_ts, new_phash, config_path) or None. # Pending /rebase proposal: (proposed_ts, new_phash, config_path) or None.
# One slot; a second /rebase overwrites. `/rebase confirm` applies if within TTL. # One slot; a second /rebase overwrites. `/rebase confirm` applies if within TTL.
pending_rebase: Any = None pending_rebase: Any = None
charts: list = field(default_factory=list) # list[ChartState] — multi-chart split
@dataclass @dataclass
@@ -1086,6 +1124,7 @@ class _LoopState:
levels_extractor: Any = None levels_extractor: Any = None
fire_count: int = 0 fire_count: int = 0
start: float = 0.0 start: float = 0.0
n_primed_global: int = 0
@dataclass @dataclass
@@ -1210,21 +1249,32 @@ def _sync_detection_tick(
last_saved_color: "str | None", last_saved_color: "str | None",
now: float, now: float,
samples_dir: Path, samples_dir: Path,
chart_id: str = "",
sub_roi: Any = None,
frame: Any = None,
) -> _TickSyncResult: ) -> _TickSyncResult:
"""One full detection tick (blocking I/O). Runs in asyncio.to_thread.""" """One full detection tick (blocking I/O). Runs in asyncio.to_thread.
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
return _TickSyncResult()
cr = canary.check(frame) When ``frame`` is provided, capture/canary work is skipped (caller already
if canary.is_paused: did them — multi-chart orchestrator). ``chart_id`` is propagated to all
audit.log({"ts": now, "event": "paused", "drift": cr.distance}) Alerts emitted by this tick. ``sub_roi`` overrides cfg.dot_roi for the
return _TickSyncResult(frame=frame) cyan rect on saved screenshots (per-chart ROI).
"""
if frame is None:
frame = capture()
if frame is None:
audit.log({"ts": now, "event": "window_lost"})
return _TickSyncResult(chart_id=chart_id)
cr = canary.check(frame)
if canary.is_paused:
audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return _TickSyncResult(frame=frame, chart_id=chart_id)
res = detector.step(now, frame) res = detector.step(now, frame)
detection_log.log({ detection_log.log({
"ts": now, "event": "frame", "ts": now, "event": "frame",
"chart_id": chart_id,
"window_found": res.window_found, "window_found": res.window_found,
"dot_found": res.dot_found, "dot_found": res.dot_found,
"rgb": list(res.rgb) if res.rgb is not None else None, "rgb": list(res.rgb) if res.rgb is not None else None,
@@ -1236,9 +1286,10 @@ def _sync_detection_tick(
}) })
if not (res.accepted and res.color): if not (res.accepted and res.color):
return _TickSyncResult(frame=frame, res=res) return _TickSyncResult(frame=frame, res=res, chart_id=chart_id)
is_first = first_accepted is_first = first_accepted
roi_for_draw = sub_roi if sub_roi is not None else cfg.dot_roi
def _snapshot(kind: str, label: str) -> "Path | None": def _snapshot(kind: str, label: str) -> "Path | None":
if not getattr(cfg.attach_screenshots, kind, True): if not getattr(cfg.attach_screenshots, kind, True):
@@ -1246,18 +1297,29 @@ def _sync_detection_tick(
return _save_annotated_frame( return _save_annotated_frame(
frame, cfg, fires_dir, label, now, audit=audit, frame, cfg, fires_dir, label, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None), dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True, canary_ok=True, roi=roi_for_draw,
) )
tr = _handle_tick(fsm, res.color, now, notifier, audit, is_first, snapshot=_snapshot, cfg=cfg) # Wrap notifier so every Alert from _handle_tick carries chart_id.
if chart_id:
class _ChartNotifier:
def send(self, alert: Alert) -> None:
alert.chart_id = chart_id
notifier.send(alert)
wrapped_notifier: _NotifierLike = _ChartNotifier()
else:
wrapped_notifier = notifier
tr = _handle_tick(fsm, res.color, now, wrapped_notifier, audit, is_first, snapshot=_snapshot, cfg=cfg)
if tr is None: if tr is None:
return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True) return _TickSyncResult(frame=frame, res=res, first_consumed=is_first, late_start=True, chart_id=chart_id)
new_color: str | None = None new_color: str | None = None
if res.color != last_saved_color: if res.color != last_saved_color:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
sample_path = samples_dir / f"{ts_str}_{res.color}.png" suffix = f"_{chart_id}" if chart_id else ""
sample_path = samples_dir / f"{ts_str}_{res.color}{suffix}.png"
try: try:
import cv2 # type: ignore[import-untyped] import cv2 # type: ignore[import-untyped]
cv2.imwrite(str(sample_path), frame) cv2.imwrite(str(sample_path), frame)
@@ -1271,7 +1333,7 @@ def _sync_detection_tick(
fire_path = _save_annotated_frame( fire_path = _save_annotated_frame(
frame, cfg, fires_dir, tr.trigger, now, audit=audit, frame, cfg, fires_dir, tr.trigger, now, audit=audit,
dot_pos_abs=getattr(res, "dot_pos_abs", None), dot_pos_abs=getattr(res, "dot_pos_abs", None),
canary_ok=True, canary_ok=True, roi=roi_for_draw,
) )
notifier.send(Alert( notifier.send(Alert(
kind="trigger", kind="trigger",
@@ -1279,17 +1341,19 @@ def _sync_detection_tick(
body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}", body=f"@ {datetime.fromtimestamp(now).isoformat(timespec='seconds')}",
image_path=fire_path, image_path=fire_path,
direction=tr.trigger, direction=tr.trigger,
chart_id=chart_id,
)) ))
return _TickSyncResult( return _TickSyncResult(
frame=frame, res=res, tr=tr, frame=frame, res=res, tr=tr,
first_consumed=is_first, new_color=new_color, first_consumed=is_first, new_color=new_color, chart_id=chart_id,
) )
def _brief_status(ctx) -> str: def _brief_status(ctx) -> str:
h = (time.monotonic() - ctx.state.start) / 3600 h = (time.monotonic() - ctx.state.start) / 3600
return f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {h:.1f}h" fsm_state = ctx.charts[0].fsm.state.value if ctx.charts else ctx.fsm.state.value
return f"{fsm_state} | semnale: {ctx.state.fire_count} | {h:.1f}h"
async def _run_tick(ctx: RunContext) -> _TickSyncResult: async def _run_tick(ctx: RunContext) -> _TickSyncResult:
@@ -1323,48 +1387,264 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult:
) )
async def _handle_fsm_result(ctx: RunContext, result: _TickSyncResult) -> None: def _chart_id_for_index(i: int, n: int) -> str:
"""Scheduler start/stop + levels extraction. No-op if res is None/late_start.""" """Map (idx, total) → chart_id. n=1 → "" ; n=2 → "left"/"right" ; n>2 → "chart_N"."""
if result.first_consumed: if n == 1:
ctx.state.first_accepted = False return ""
if result.new_color is not None: if n == 2:
ctx.state.last_saved_color = result.new_color return ("left", "right")[i]
return f"chart_{i}"
tr = result.tr
res = result.res
if result.late_start or res is None: def _commit_layout_change(ctx: RunContext, new_strips: list, now: float) -> None:
"""Replace ctx.charts with fresh ChartState objects for new_strips.
Resets per-chart FSM (StateMachine(lockout_s)) and Detector (with
dot_roi_override=strip), zeroes n_primed_global, stops scheduler if running,
audits the layout_change event, sends a status Alert.
"""
from atm.detector import Detector
from atm.state_machine import StateMachine
cfg = ctx.cfg
old_n = len(ctx.charts)
new_n = len(new_strips)
new_charts: list = []
for i, strip in enumerate(new_strips):
chart_id = _chart_id_for_index(i, new_n)
det = Detector(cfg, ctx.capture, dot_roi_override=strip)
fsm = StateMachine(lockout_s=cfg.lockout_s)
new_charts.append(ChartState(
chart_id=chart_id, sub_roi=strip, detector=det, fsm=fsm,
first_accepted=True, last_saved_color=None,
))
ctx.state.n_primed_global = 0
if getattr(ctx.scheduler, "is_running", False):
ctx.scheduler.stop()
ctx.audit.log({"ts": now, "event": "scheduler_stopped", "reason": "layout_change"})
ctx.audit.log({
"ts": now,
"event": "layout_change",
"old_n": old_n,
"new_n": new_n,
"strips": [{"x": s.x, "w": s.w} for s in new_strips],
})
# Suppress notification on first-ever detection (old_n=0 = startup bootstrap,
# not a real layout change).
if old_n > 0:
ctx.notifier.send(Alert(
kind="status",
title=f"🔄 Layout TS schimbat: {old_n}{new_n} ferestre. FSM resetat.",
body="",
silent=True,
))
ctx.charts = new_charts
# Keep ctx.fsm / ctx.detector in sync with charts[0] so legacy code paths
# (brief_status, /ss state_hdr, /status) continue to work after layout change.
if new_charts:
ctx.fsm = new_charts[0].fsm
ctx.detector = new_charts[0].detector
def _detect_strips_for_ctx(ctx: RunContext, frame: Any) -> list:
"""Lazy strip detection for ctx.cfg + frame. Empty list when no vivid pixels."""
from atm.layout import detect_strips
from atm.vision import crop_roi
cfg = ctx.cfg
strip_h = cfg.dot_roi.h
min_strip_px = max(150, strip_h * 8)
min_gap_px = max(20, int(strip_h * 0.8))
palette = {
name: (spec.rgb, spec.tolerance)
for name, spec in cfg.colors.items()
if name != "background"
}
full_crop = crop_roi(frame, cfg.dot_roi)
raw = detect_strips(full_crop, palette, min_gap_px, min_strip_px)
# detect_strips returns ROIs relative to the cropped region. Translate
# back into frame coordinates by adding cfg.dot_roi origin so per-strip
# detector crops can use the absolute ROI.
from atm.config import ROI
return [
ROI(x=cfg.dot_roi.x + r.x, y=cfg.dot_roi.y + r.y, w=r.w, h=r.h)
for r in raw
]
async def _run_multi_tick(ctx: RunContext) -> "list[_TickSyncResult]":
"""Multi-chart orchestrator: capture once, detect strips, run per-chart ticks.
Returns a list of _TickSyncResult — one per chart (empty list if capture
failed or canary paused). Layout management is in-process: when strips
don't match current ctx.charts, _commit_layout_change is invoked before
feeding ticks to the new charts.
"""
now = time.time()
if ctx.lifecycle is not None:
skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary)
sb = _brief_status(ctx)
transition = _maybe_log_transition(
skip, ctx.lifecycle, now, ctx.audit, ctx.notifier, status_body=sb,
)
if transition == "market_open" and ctx.cfg.window_title:
title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title)
ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title})
await asyncio.sleep(0.15)
if skip is not None:
return []
frame = await asyncio.to_thread(ctx.capture)
if frame is None:
ctx.audit.log({"ts": now, "event": "window_lost"})
return []
cr = ctx.canary.check(frame)
if ctx.canary.is_paused:
ctx.audit.log({"ts": now, "event": "paused", "drift": cr.distance})
return []
new_strips = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame)
current_strips = [c.sub_roi for c in ctx.charts]
if new_strips:
if len(new_strips) != len(current_strips):
# Real count change (1↔2 ferestre) → rebuild charts.
_commit_layout_change(ctx, new_strips, now)
else:
# Same count → silent sub_roi update regardless of position/width jitter.
# Width pulsations (e.g. 792↔810px from antialiased edges) must not
# reset FSM or emit alerts — that path produced 500+ false alerts/day.
for c, ns in zip(ctx.charts, new_strips):
c.sub_roi = ns
if hasattr(c.detector, "update_dot_roi"):
c.detector.update_dot_roi(ns)
results: list[_TickSyncResult] = []
for c in ctx.charts:
result = await asyncio.to_thread(
_sync_detection_tick,
ctx.capture, ctx.canary, ctx.cfg, c.detector, c.fsm,
ctx.notifier, ctx.audit, ctx.detection_log,
ctx.fires_dir, c.first_accepted, c.last_saved_color,
now, ctx.samples_dir,
c.chart_id, c.sub_roi, frame,
)
results.append(result)
return results
async def _handle_fsm_result(ctx: RunContext, result: "Any") -> None:
"""Scheduler start/stop + levels extraction.
Accepts either a single ``_TickSyncResult`` (legacy single-chart path) or a
``list[_TickSyncResult]`` (multi-chart split). For multi-chart, scheduler
arbitration uses ``ctx.state.n_primed_global`` (start when 0→1, stop when
1→0). Levels extraction is disabled when ``len(ctx.charts) >= 2`` since the
extractor reads the full window y-axis which is per-chart-private in split.
"""
results: list[_TickSyncResult]
if isinstance(result, list):
results = result
else:
results = [result]
if not results:
return return
if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None): multi_chart = len(ctx.charts) >= 2
if tr.reason == "prime" and not ctx.scheduler.is_running:
ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr is not None and tr.trigger and not tr.locked: for r in results:
ctx.state.fire_count += 1 if r.first_consumed:
if ctx.scheduler.is_running: if multi_chart:
ctx.scheduler.stop() for c in ctx.charts:
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"}) if c.chart_id == r.chart_id:
ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time()) c.first_accepted = False
break
else:
ctx.state.first_accepted = False
if ctx.charts:
ctx.charts[0].first_accepted = False
if r.new_color is not None:
if multi_chart:
for c in ctx.charts:
if c.chart_id == r.chart_id:
c.last_saved_color = r.new_color
break
else:
ctx.state.last_saved_color = r.new_color
if ctx.charts:
ctx.charts[0].last_saved_color = r.new_color
if ctx.state.levels_extractor is not None and result.frame is not None: if not multi_chart:
lr = ctx.state.levels_extractor.step(result.frame, time.time()) # Legacy single-chart path — preserve scheduler+levels semantics exactly.
if lr.status in ("complete", "timeout"): single = results[0]
if lr.status == "complete" and lr.levels: tr = single.tr
ctx.notifier.send(Alert( res = single.res
kind="levels",
title="Niveluri", if single.late_start or res is None:
body=( return
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} " if tr is not None and getattr(res, "accepted", False) and getattr(res, "color", None):
f"TP2={lr.levels.tp2}" if tr.reason == "prime" and not ctx.scheduler.is_running:
), ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
)) ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
ctx.state.levels_extractor = None elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm") and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr is not None and tr.trigger and not tr.locked:
ctx.state.fire_count += 1
if ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
ctx.state.levels_extractor = ctx.levels_extractor_factory(ctx.cfg, tr.trigger, time.time())
if ctx.state.levels_extractor is not None and single.frame is not None:
lr = ctx.state.levels_extractor.step(single.frame, time.time())
if lr.status in ("complete", "timeout"):
if lr.status == "complete" and lr.levels:
ctx.notifier.send(Alert(
kind="levels",
title="Niveluri",
body=(
f"SL={lr.levels.sl} "
f"TP1={lr.levels.tp1} "
f"TP2={lr.levels.tp2}"
),
))
ctx.state.levels_extractor = None
return
# Multi-chart path: aggregate scheduler arbitration over all results.
for r in results:
tr = r.tr
res = r.res
if r.late_start or res is None or tr is None:
continue
if not (getattr(res, "accepted", False) and getattr(res, "color", None)):
continue
if tr.reason == "prime":
ctx.state.n_primed_global += 1
if ctx.state.n_primed_global == 1 and not ctx.scheduler.is_running:
ctx.scheduler.start(ctx.cfg.telegram.auto_poll_interval_s)
ctx.audit.log({"ts": time.time(), "event": "scheduler_started", "reason": "primed"})
elif tr.reason in ("fire", "cooled", "phase_skip", "opposite_rearm"):
if ctx.state.n_primed_global > 0:
ctx.state.n_primed_global -= 1
if ctx.state.n_primed_global == 0 and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": tr.reason})
if tr.trigger and not tr.locked:
ctx.state.fire_count += 1
if ctx.state.n_primed_global == 0 and ctx.scheduler.is_running:
ctx.scheduler.stop()
ctx.audit.log({"ts": time.time(), "event": "scheduler_stopped", "reason": "fire"})
# Levels disabled in split (per-chart y-axis would need its own calib).
ctx.audit.log({"ts": time.time(), "event": "levels_skipped_split"})
async def _dispatch_command(ctx: RunContext, cmd) -> None: async def _dispatch_command(ctx: RunContext, cmd) -> None:
@@ -1436,8 +1716,21 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
body="", body="",
)) ))
return return
# Always re-detect strips fresh — out-of-window or pre-first-tick,
# ctx.charts is empty (lopul de detection nu rulează când e closed),
# dar /ss trebuie să arate corect oricând. Fallback pe ctx.charts
# dacă detecția eșuează, apoi pe cfg.dot_roi via ss_strips=None.
ss_strips: "list | None" = None
try:
fresh = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame_ss)
except Exception:
fresh = []
if fresh:
ss_strips = fresh
elif ctx.charts:
ss_strips = [c.sub_roi for c in ctx.charts]
path_ss, detections_ss = await asyncio.to_thread( path_ss, detections_ss = await asyncio.to_thread(
_save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit, _save_inspect_frame, frame_ss, ctx.cfg, ctx.fires_dir, now_ss, ctx.audit, ss_strips,
) )
caption_ss = _format_inspect_caption(detections_ss, ctx.cfg) caption_ss = _format_inspect_caption(detections_ss, ctx.cfg)
state_hdr = _fsm_step_label(ctx.fsm, now_ss) state_hdr = _fsm_step_label(ctx.fsm, now_ss)
@@ -1482,8 +1775,19 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
if frame_r is None: if frame_r is None:
capture_failed = True capture_failed = True
else: else:
# Same fresh-detect logic ca /ss: out-of-window ctx.charts e gol,
# dar /resume tot trebuie să arate corect ambele chart-uri.
r_strips: "list | None" = None
try:
fresh_r = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame_r)
except Exception:
fresh_r = []
if fresh_r:
r_strips = fresh_r
elif ctx.charts:
r_strips = [c.sub_roi for c in ctx.charts]
path_r, detections_r = await asyncio.to_thread( path_r, detections_r = await asyncio.to_thread(
_save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit, _save_inspect_frame, frame_r, ctx.cfg, ctx.fires_dir, now_r, ctx.audit, r_strips,
) )
was_drift = bool(getattr(ctx.canary, "is_paused", False)) was_drift = bool(getattr(ctx.canary, "is_paused", False))
@@ -1833,6 +2137,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts), levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
lifecycle=lifecycle, lifecycle=lifecycle,
) )
# ctx.charts is populated on the first detection tick by _run_multi_tick
# which calls detect_strips → _commit_layout_change. Starting empty means
# the real strip geometry is used (not a hardcoded cfg.dot_roi placeholder)
# and avoids spurious layout-change alerts.
ctx.charts = []
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Nested async coroutines — heartbeat captures notifier + heartbeat_due # Nested async coroutines — heartbeat captures notifier + heartbeat_due
@@ -1853,6 +2162,7 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
fire_count=ctx.state.fire_count, fire_count=ctx.state.fire_count,
uptime_h=uptime_h, uptime_h=uptime_h,
canary_paused=paused, canary_paused=paused,
charts=ctx.charts if len(ctx.charts) >= 2 else None,
)) ))
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
@@ -1860,8 +2170,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
while True: while True:
if duration_s is not None and (time.monotonic() - start) >= duration_s: if duration_s is not None and (time.monotonic() - start) >= duration_s:
break break
result = await _run_tick(ctx) # Always use _run_multi_tick: it detects strips each tick,
await _handle_fsm_result(ctx, result) # manages layout changes (1→2 or 2→1), and handles single-chart
# mode identically to the old _run_tick path.
results = await _run_multi_tick(ctx)
await _handle_fsm_result(ctx, results)
await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang await _drain_cmd_queue(ctx) # UNCONDITIONAL — fix for command hang
await asyncio.sleep(cfg.loop_interval_s) await asyncio.sleep(cfg.loop_interval_s)

View File

@@ -12,6 +12,22 @@ class Alert:
image_path: Path | None = None # annotated screenshot image_path: Path | None = None # annotated screenshot
direction: str | None = None # "BUY"/"SELL" when kind=trigger direction: str | None = None # "BUY"/"SELL" when kind=trigger
silent: bool = False # disable_notification for Telegram; ignored by Discord silent: bool = False # disable_notification for Telegram; ignored by Discord
chart_id: str = ""
def _alert_prefix(chart_id: str) -> str:
"""Return Telegram title prefix for chart_id. Empty for single-chart mode."""
if not chart_id:
return ""
if chart_id == "left":
return "[stânga] "
if chart_id == "right":
return "[dreapta] "
try:
n = int(chart_id.split("_")[1])
return f"[chart {n + 1}] "
except (IndexError, ValueError):
return f"[{chart_id}] "
class Notifier(Protocol): class Notifier(Protocol):

View File

@@ -283,6 +283,62 @@ def test_real_screenshot_rightmost_dot(screenshot: str, expected: str) -> None:
) )
def test_dot_roi_override_uses_sub_roi() -> None:
"""dot_roi_override must be used instead of cfg.dot_roi for crop + offset.
Paint a yellow dot inside the override ROI but **outside** cfg.dot_roi.
The default DOT_ROI is (10,10,280,80); we override with an ROI placed
well to the right (x=200, w=80) so the painted dot only intersects the
override. If the detector still cropped from cfg.dot_roi the yellow dot
would land at the rightmost edge of the larger ROI as well — so we use
a frame that has nothing in the cfg.dot_roi region except inside the
override window, and assert dot_pos_abs falls inside the override.
"""
override = ROI(x=200, y=20, w=80, h=60)
# Background-only frame, then paint yellow only inside the override
frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8)
fy0, fy1 = override.y, override.y + override.h
fx0, fx1 = override.x + 50, override.x + override.w # right edge of override
frame[fy0:fy1, fx0:fx1] = YELLOW_BGR
cfg = _make_cfg(debounce_depth=1)
det = Detector(cfg, capture=lambda: frame, dot_roi_override=override)
r = det.step(0.0)
assert r.dot_found is True
assert r.match is not None
assert r.match.name == "yellow"
assert r.dot_pos_abs is not None
abs_x, abs_y = r.dot_pos_abs
assert override.x <= abs_x < override.x + override.w
assert override.y <= abs_y < override.y + override.h
def test_dot_pos_abs_with_offset() -> None:
"""dot_pos_abs must include the override ROI's (x, y) offset."""
override = ROI(x=100, y=20, w=50, h=40)
frame = np.full((100, 300, 3), BG_VAL, dtype=np.uint8)
# Paint a single full-height yellow stripe at roi-local x in [40, 50)
# so find_rightmost_dot lands somewhere inside that stripe.
fy0, fy1 = override.y, override.y + override.h
fx0, fx1 = override.x + 40, override.x + 50
frame[fy0:fy1, fx0:fx1] = YELLOW_BGR
cfg = _make_cfg(debounce_depth=1)
det = Detector(cfg, capture=lambda: frame, dot_roi_override=override)
r = det.step(0.0)
assert r.dot_found is True
assert r.dot_pos_abs is not None
abs_x, abs_y = r.dot_pos_abs
# Painted stripe: roi-local x in [40,50), y in [0, h). Absolute coords
# must be offset by override.(x, y).
assert override.x + 40 <= abs_x < override.x + 50
assert override.y <= abs_y < override.y + override.h
def test_fused_blob_samples_rightmost_dot() -> None: def test_fused_blob_samples_rightmost_dot() -> None:
"""Fused multi-colour stripe must classify the rightmost colour, not the """Fused multi-colour stripe must classify the rightmost colour, not the
centroid colour. Pre-fix the centroid fell on an interior gray segment centroid colour. Pre-fix the centroid fell on an interior gray segment

163
tests/test_layout.py Normal file
View File

@@ -0,0 +1,163 @@
import numpy as np
import pytest
from atm.config import ROI
from atm.layout import _strips_match, detect_strips
PALETTE = {
"turquoise": ((0, 253, 253), 60.0),
"yellow": ((253, 253, 0), 60.0),
"dark_green": ((0, 128, 0), 60.0),
"dark_red": ((128, 0, 0), 60.0),
"light_green": ((0, 255, 0), 60.0),
"light_red": ((255, 0, 0), 60.0),
}
def _blank(h: int = 20, w: int = 200) -> np.ndarray:
return np.zeros((h, w, 3), dtype=np.uint8)
def _paint(img: np.ndarray, x0: int, x1: int, rgb: tuple[int, int, int]) -> None:
"""Paint vivid color into BGR image (palette stores RGB)."""
bgr = (rgb[2], rgb[1], rgb[0])
img[:, x0:x1] = bgr
def test_single_strip():
img = _blank(20, 200)
_paint(img, 0, 200, (0, 253, 253)) # turquoise
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 1
assert out[0].x == 0
assert abs(out[0].w - 200) <= 1
assert out[0].h == 20
def test_split_50_50():
img = _blank(20, 230)
_paint(img, 0, 100, (0, 253, 253))
_paint(img, 130, 230, (253, 253, 0))
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 2
assert out[0].x < out[1].x # L->R
assert out[0].x == 0
assert abs(out[0].w - 100) <= 1
assert out[1].x == 130
assert abs(out[1].w - 100) <= 1
def test_split_asymmetric():
img = _blank(20, 230)
_paint(img, 0, 70, (0, 253, 253)) # 35% width
_paint(img, 100, 230, (253, 253, 0)) # 65% width
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 2
assert abs(out[0].w - 70) <= 1
assert abs(out[1].w - 130) <= 1
def test_gray_only_no_strip():
img = _blank(20, 200)
img[:, 50:150] = (128, 128, 128)
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert out == []
def test_cooldown_gray_dots_no_detect():
img = _blank(20, 200)
# scattered gray dots
for x in (20, 50, 80, 110, 140, 170):
img[8:12, x:x + 4] = (100, 100, 100)
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert out == []
def test_vivid_palette_match():
img = _blank(20, 200)
_paint(img, 50, 80, (0, 255, 0)) # light_green
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 1
def test_blank_frame():
img = _blank(20, 200)
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert out == []
def test_strip_too_narrow_filtered():
img = _blank(20, 200)
_paint(img, 50, 53, (0, 253, 253)) # only 3px wide
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=10)
assert out == []
def test_small_gap_fuses():
img = _blank(20, 200)
_paint(img, 30, 70, (0, 253, 253))
_paint(img, 75, 120, (0, 253, 253)) # 5px gap, < min_gap_px=20
out = detect_strips(img, PALETTE, min_gap_px=20, min_strip_px=5)
assert len(out) == 1
assert abs(out[0].x - 30) <= 2
assert abs((out[0].x + out[0].w) - 120) <= 2
def test_split_two_charts_with_interleaved_gray():
# Regresie 2 ferestre TS: fiecare row de buline e mix vivid + gri, separate
# de un gap larg de background (dividerul dintre ferestre). Înainte de fix
# detect_strips picka doar runs vivid contiguu și rata fereastra stângă.
palette = {**PALETTE, "gray": ((128, 128, 128), 60.0)}
img = _blank(35, 1796)
# Left chart: dots vivid + gray la fiecare 26px, x=0..820
for i, x in enumerate(range(0, 820, 26)):
rgb = (128, 128, 128) if i % 3 else (0, 128, 0)
_paint(img, x, x + 22, rgb)
# Window divider gap: x=820..910 rămâne background
# Right chart: same pattern, x=910..1796
for i, x in enumerate(range(910, 1790, 26)):
rgb = (128, 128, 128) if i % 3 else (0, 128, 0)
_paint(img, x, x + 22, rgb)
out = detect_strips(img, palette, min_gap_px=28, min_strip_px=280)
assert len(out) == 2, f"expected 2 strips, got {len(out)}: {out}"
assert out[0].x == 0
assert out[1].x >= 900 # right chart starts after divider
assert out[0].x + out[0].w < out[1].x # disjoint
def test_three_strips():
img = _blank(20, 300)
_paint(img, 0, 60, (0, 253, 253))
_paint(img, 100, 160, (253, 253, 0))
_paint(img, 220, 300, (0, 255, 0))
out = detect_strips(img, PALETTE, min_gap_px=10, min_strip_px=5)
assert len(out) == 3
assert out[0].x < out[1].x < out[2].x
assert out[0].x == 0
assert out[1].x == 100
assert out[2].x == 220
def test_strips_match_identical():
a = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
b = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
assert _strips_match(a, b) is True
def test_strips_match_jitter_5px():
a = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
b = [ROI(x=5, y=0, w=95, h=20), ROI(x=128, y=0, w=70, h=20)]
assert _strips_match(a, b, tol=10) is True
def test_strips_match_drift_12px():
a = [ROI(x=0, y=0, w=100, h=20)]
b = [ROI(x=12, y=0, w=100, h=20)]
assert _strips_match(a, b, tol=10) is False
def test_strips_match_count_different():
a = [ROI(x=0, y=0, w=100, h=20)]
b = [ROI(x=0, y=0, w=100, h=20), ROI(x=130, y=0, w=70, h=20)]
assert _strips_match(a, b) is False

View File

@@ -257,6 +257,14 @@ def test_run_live_catchup_sell_from_gray_then_dark_red(monkeypatch, tmp_path):
monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller) monkeypatch.setattr("atm.commands.TelegramPoller", _StubPoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", _StubScheduler)
# Bootstrap a single chart so _run_multi_tick populates ctx.charts on tick 1.
# Frame is zeros → real detect_strips returns [] → without this, charts stays
# empty and the ScriptedDetector loop never advances (regression after the
# multi-chart refactor made _run_multi_tick the single detection path).
from atm.config import ROI
_bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)]
monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip)
with pytest.raises(_StopLoop): with pytest.raises(_StopLoop):
_main.run_live(cfg, duration_s=None) _main.run_live(cfg, duration_s=None)
@@ -384,6 +392,11 @@ async def test_lifecycle_idle_armed_primed_autopoll_fire_stop(monkeypatch, tmp_p
monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller) monkeypatch.setattr("atm.commands.TelegramPoller", FakePoller)
monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched) monkeypatch.setattr("atm.scheduler.ScreenshotScheduler", lambda *a, **kw: fake_sched)
# Bootstrap ctx.charts on tick 1 — see test_run_live_catchup_sell for context.
from atm.config import ROI
_bootstrap_strip = [ROI(x=0, y=0, w=10, h=10)]
monkeypatch.setattr("atm.main._detect_strips_for_ctx", lambda c, f: _bootstrap_strip)
with pytest.raises(_StopLoop): with pytest.raises(_StopLoop):
await _main.run_live_async(cfg, duration_s=None) await _main.run_live_async(cfg, duration_s=None)
@@ -1628,3 +1641,451 @@ async def test_status_window_line_when_oh_enabled():
status = [a for a in ctx.notifier.alerts if a.kind == "status"] status = [a for a in ctx.notifier.alerts if a.kind == "status"]
body = status[0].body body = status[0].body
assert "fereastră: deschisă" in body assert "fereastră: deschisă" in body
# ---------------------------------------------------------------------------
# Multi-chart split workspace tests
# ---------------------------------------------------------------------------
def test_chart_state_defaults():
"""ChartState's first_accepted defaults to True; last_saved_color to None."""
import atm.main as _main
from atm.config import ROI
roi = ROI(x=0, y=0, w=200, h=35)
chart = _main.ChartState(
chart_id="", sub_roi=roi, detector=MagicMock(), fsm=MagicMock(),
)
assert chart.first_accepted is True
assert chart.last_saved_color is None
assert chart.chart_id == ""
assert chart.sub_roi is roi
def test_alert_prefix_import():
"""_alert_prefix lives in atm.notifier and produces the expected prefixes."""
from atm.notifier import _alert_prefix
assert _alert_prefix("") == ""
assert _alert_prefix("left") == "[stânga] "
assert _alert_prefix("right") == "[dreapta] "
assert _alert_prefix("chart_0") == "[chart 1] "
assert _alert_prefix("chart_2") == "[chart 3] "
def test_save_annotated_frame_no_price_overlay(tmp_path):
"""_save_annotated_frame must NOT draw the price overlay anymore.
Top-right area must contain no white pixels (text was rendered there before).
"""
import atm.main as _main
from atm.config import ROI, YAxisCalib, ColorSpec
frame = np.zeros((200, 400, 3), dtype=np.uint8)
cfg = types.SimpleNamespace(
dot_roi=ROI(x=10, y=10, w=380, h=180),
y_axis=YAxisCalib(p1_y=10, p1_price=100.0, p2_y=190, p2_price=50.0),
colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
)
fpath = _main._save_annotated_frame(
frame, cfg, tmp_path, "test", now=123.0,
dot_pos_abs=(200, 100), canary_ok=True,
)
assert fpath is not None
saved = cv2.imread(str(fpath))
assert saved is not None
# Top-right corner where the "$..." text used to live (rows 0..40, cols 300..390).
# The cyan ROI rect is on top edge but only 2px thick → vast majority of that
# corner is still pure black. White text would average a much higher value.
corner = saved[0:40, 300:390]
# No bright white pixels (text was 255,255,255).
white_pixels = ((corner[:, :, 0] > 200) & (corner[:, :, 1] > 200) & (corner[:, :, 2] > 200)).sum()
assert white_pixels == 0, f"expected 0 white pixels (no price text), got {white_pixels}"
def test_save_annotated_frame_uses_roi_param(tmp_path):
"""When roi= is passed, the cyan rect is drawn there (not at cfg.dot_roi)."""
import atm.main as _main
from atm.config import ROI, ColorSpec
frame = np.zeros((200, 400, 3), dtype=np.uint8)
cfg = types.SimpleNamespace(
dot_roi=ROI(x=0, y=0, w=10, h=10), # tiny cfg ROI
colors={"background": ColorSpec(rgb=(0, 0, 0), tolerance=15.0)},
)
custom_roi = ROI(x=100, y=50, w=200, h=80)
fpath = _main._save_annotated_frame(
frame, cfg, tmp_path, "rtest", now=123.0, roi=custom_roi,
)
assert fpath is not None
saved = cv2.imread(str(fpath))
# ROI rect color is (0, 255, 255) BGR. Find any pixel where G+R are saturated and B=0.
edge = saved[49:52, 100:300]
rect_present = ((edge[:, :, 0] < 50) & (edge[:, :, 1] > 200) & (edge[:, :, 2] > 200)).any()
assert rect_present, "Expected ROI rect along custom roi top edge"
# Also check the cfg.dot_roi rect (x=0..10, y=0..10) is NOT drawn — proves we used roi=custom.
cfg_corner = saved[0:10, 0:10]
rect_at_cfg = ((cfg_corner[:, :, 0] < 50) & (cfg_corner[:, :, 1] > 200) & (cfg_corner[:, :, 2] > 200)).any()
assert not rect_at_cfg, "cfg.dot_roi rect must not appear when roi= override is used"
def test_commit_layout_change_resets_fsm(monkeypatch):
"""_commit_layout_change rebuilds ctx.charts, resets FSM, zeroes n_primed_global."""
import atm.main as _main
from atm.config import ROI
from atm.state_machine import StateMachine, State
# Prebuild a ctx with one chart in ARMED_BUY.
cfg = MagicMock()
cfg.lockout_s = 60
cfg.colors = {}
cfg.debounce_depth = 3
fsm_old = StateMachine(lockout_s=60)
fsm_old.feed("turquoise", 1.0) # IDLE -> ARMED_BUY
assert fsm_old.state == State.ARMED_BUY
initial_chart = _main.ChartState(
chart_id="", sub_roi=ROI(x=0, y=0, w=400, h=35),
detector=MagicMock(), fsm=fsm_old,
)
state = _main._LoopState()
state.n_primed_global = 5 # any non-zero value
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = True
def stop(self):
type(self).is_running = False
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=MagicMock(),
detector=MagicMock(), fsm=fsm_old,
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state,
levels_extractor_factory=lambda *a, **kw: None,
)
ctx.charts = [initial_chart]
new_strips = [
ROI(x=0, y=0, w=200, h=35),
ROI(x=250, y=0, w=200, h=35),
]
_main._commit_layout_change(ctx, new_strips, now=100.0)
assert len(ctx.charts) == 2
assert ctx.charts[0].chart_id == "left"
assert ctx.charts[1].chart_id == "right"
# New FSMs must be IDLE (fresh StateMachine)
assert ctx.charts[0].fsm.state == State.IDLE
assert ctx.charts[1].fsm.state == State.IDLE
# Old fsm not reused
assert ctx.charts[0].fsm is not fsm_old
# n_primed_global reset
assert state.n_primed_global == 0
# layout_change audit + status alert
assert any(e.get("event") == "layout_change" and e["new_n"] == 2 for e in audit_events)
assert any("Layout TS schimbat" in a.title for a in notifier_alerts)
def test_commit_layout_change_chart_ids_for_n3(monkeypatch):
"""For n>=3 charts use 'chart_0', 'chart_1', ..."""
import atm.main as _main
from atm.config import ROI
cfg = MagicMock()
cfg.lockout_s = 60
cfg.colors = {}
cfg.debounce_depth = 3
state = _main._LoopState()
class _N:
def send(self, a): pass
class _A:
def log(self, e): pass
class _S:
is_running = False
def stop(self): pass
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=MagicMock(),
detector=MagicMock(), fsm=MagicMock(),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state,
levels_extractor_factory=lambda *a, **kw: None,
)
ctx.charts = []
strips = [ROI(x=i * 100, y=0, w=80, h=35) for i in range(3)]
_main._commit_layout_change(ctx, strips, now=100.0)
assert [c.chart_id for c in ctx.charts] == ["chart_0", "chart_1", "chart_2"]
def test_strips_match_silent_update_jitter():
"""_strips_match returns True for <=10px jitter — used to trigger silent update."""
from atm.layout import _strips_match
from atm.config import ROI
a = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
b = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]
assert _strips_match(a, b, tol=10) is True
# Drift 12px breaks the match
c = [ROI(x=12, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
assert _strips_match(a, c, tol=10) is False
def test_build_heartbeat_alert_single_compat():
"""Without charts arg, _build_heartbeat_alert returns the legacy single-chart body."""
import atm.main as _main
a = _main._build_heartbeat_alert(
fsm_state="IDLE", fire_count=0, uptime_h=1.0, canary_paused=False,
)
assert a.kind == "heartbeat"
assert a.title == "activ"
assert a.body == "IDLE | semnale: 0 | 1.0h"
def test_build_heartbeat_alert_multi_chart():
"""charts=[left,right] body lines reference [stânga] / [dreapta]."""
import atm.main as _main
from atm.config import ROI
fsm_left = types.SimpleNamespace(state=types.SimpleNamespace(value="ARMED_BUY"))
fsm_right = types.SimpleNamespace(state=types.SimpleNamespace(value="IDLE"))
cs_left = _main.ChartState(
chart_id="left", sub_roi=ROI(x=0, y=0, w=10, h=10),
detector=MagicMock(), fsm=fsm_left,
)
cs_right = _main.ChartState(
chart_id="right", sub_roi=ROI(x=0, y=0, w=10, h=10),
detector=MagicMock(), fsm=fsm_right,
)
a = _main._build_heartbeat_alert(
fsm_state="ignored", fire_count=2, uptime_h=1.5, canary_paused=False,
charts=[cs_left, cs_right],
)
assert a.kind == "heartbeat"
assert "[stânga]" in a.body
assert "[dreapta]" in a.body
assert "ARMED_BUY" in a.body
assert "IDLE" in a.body
assert "semnale: 2" in a.body
@pytest.mark.asyncio
async def test_run_multi_tick_silent_jitter_update(monkeypatch, tmp_path):
"""When detect_strips returns positions with <=10px jitter, sub_roi updates
silently (no _commit_layout_change → no extra status alert)."""
import atm.main as _main
from atm.config import ROI
monkeypatch.chdir(tmp_path)
initial = [ROI(x=0, y=0, w=200, h=35), ROI(x=250, y=0, w=200, h=35)]
jittered = [ROI(x=3, y=0, w=200, h=35), ROI(x=253, y=0, w=200, h=35)]
class _Det:
def __init__(self): self.last_roi = None
def step(self, ts, frame=None):
from atm.detector import DetectionResult
return DetectionResult(
ts=ts, window_found=True, dot_found=False,
rgb=None, match=None, accepted=False, color=None,
)
def update_dot_roi(self, roi):
self.last_roi = roi
class _FSM:
state = types.SimpleNamespace(value="IDLE")
_last_fire: dict = {}
cfg = MagicMock()
cfg.lockout_s = 60
cfg.attach_screenshots = types.SimpleNamespace(arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False)
cfg.window_title = None
state_obj = _main._LoopState()
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = False
def start(self, s): pass
def stop(self): pass
canary = types.SimpleNamespace(is_paused=False, check=lambda f: types.SimpleNamespace(distance=0, drifted=False))
ctx = _main.RunContext(
cfg=cfg, capture=lambda: np.zeros((200, 600, 3), dtype=np.uint8),
canary=canary, detector=MagicMock(), fsm=_FSM(),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path,
cmd_queue=MagicMock(), state=state_obj,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=None,
)
det_l, det_r = _Det(), _Det()
ctx.charts = [
_main.ChartState("left", initial[0], det_l, _FSM()),
_main.ChartState("right", initial[1], det_r, _FSM()),
]
# Stub strip detection to return jittered strips.
monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: jittered)
results = await _main._run_multi_tick(ctx)
assert len(results) == 2
# Sub-ROI silently updated, but no layout_change event.
assert ctx.charts[0].sub_roi == jittered[0]
assert ctx.charts[1].sub_roi == jittered[1]
assert det_l.last_roi == jittered[0]
assert det_r.last_roi == jittered[1]
assert not any(e.get("event") == "layout_change" for e in audit_events)
# No status "Layout TS schimbat" alert.
assert not any("Layout TS schimbat" in a.title for a in notifier_alerts)
@pytest.mark.asyncio
async def test_run_multi_tick_silent_width_oscillation(monkeypatch, tmp_path):
"""Regression: width pulsations (e.g. 792↔810px) on a stable chart count
must NOT trigger _commit_layout_change. Reproduces the production bug
visible in logs/2026-05-04.jsonl (576 layout_change events/day).
"""
import atm.main as _main
from atm.config import ROI
monkeypatch.chdir(tmp_path)
initial = [ROI(x=0, y=0, w=792, h=35), ROI(x=912, y=0, w=845, h=35)]
pulsed = [ROI(x=0, y=0, w=810, h=35), ROI(x=912, y=0, w=863, h=35)]
class _Det:
def __init__(self): self.last_roi = None
def step(self, ts, frame=None):
from atm.detector import DetectionResult
return DetectionResult(
ts=ts, window_found=True, dot_found=False,
rgb=None, match=None, accepted=False, color=None,
)
def update_dot_roi(self, roi):
self.last_roi = roi
class _FSM:
state = types.SimpleNamespace(value="PRIMED_BUY")
_last_fire: dict = {}
cfg = MagicMock()
cfg.lockout_s = 60
cfg.attach_screenshots = types.SimpleNamespace(arm=False, prime=False, trigger=False, late_start=False, catchup=False, opposite_rearm=False, rearm=False, phase_skip=False)
cfg.window_title = None
state_obj = _main._LoopState()
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = False
def start(self, s): pass
def stop(self): pass
canary = types.SimpleNamespace(is_paused=False, check=lambda f: types.SimpleNamespace(distance=0, drifted=False))
fsm_left = _FSM()
fsm_right = _FSM()
ctx = _main.RunContext(
cfg=cfg, capture=lambda: np.zeros((200, 1800, 3), dtype=np.uint8),
canary=canary, detector=MagicMock(), fsm=fsm_left,
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path,
cmd_queue=MagicMock(), state=state_obj,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=None,
)
det_l, det_r = _Det(), _Det()
ctx.charts = [
_main.ChartState("left", initial[0], det_l, fsm_left),
_main.ChartState("right", initial[1], det_r, fsm_right),
]
monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: pulsed)
await _main._run_multi_tick(ctx)
# Sub-ROI silently updated.
assert ctx.charts[0].sub_roi == pulsed[0]
assert ctx.charts[1].sub_roi == pulsed[1]
assert det_l.last_roi == pulsed[0]
assert det_r.last_roi == pulsed[1]
# FSM identity preserved (not rebuilt).
assert ctx.charts[0].fsm is fsm_left
assert ctx.charts[1].fsm is fsm_right
# Critical: no layout_change event, no status alert.
assert not any(e.get("event") == "layout_change" for e in audit_events)
assert not any("Layout TS schimbat" in a.title for a in notifier_alerts)
def test_commit_layout_change_alert_is_silent(monkeypatch):
"""Layout-change alert on real count change uses silent=True so Telegram
doesn't ping the user. Re-anchor info still visible in chat history.
"""
import atm.main as _main
from atm.config import ROI
from atm.state_machine import StateMachine
cfg = MagicMock()
cfg.lockout_s = 60
cfg.colors = {}
cfg.debounce_depth = 3
cfg.confidence_min = 0.5
cfg.dot_roi = ROI(x=0, y=0, w=600, h=35)
notifier_alerts: list = []
audit_events: list = []
class _N:
def send(self, a): notifier_alerts.append(a)
class _A:
def log(self, e): audit_events.append(e)
class _S:
is_running = False
def stop(self): pass
state_obj = _main._LoopState()
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=MagicMock(),
detector=MagicMock(), fsm=StateMachine(60),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state_obj,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=None,
)
ctx.charts = [
_main.ChartState("only", ROI(x=0, y=0, w=400, h=35), MagicMock(), StateMachine(60)),
]
new_strips = [
ROI(x=0, y=0, w=200, h=35),
ROI(x=250, y=0, w=200, h=35),
]
_main._commit_layout_change(ctx, new_strips, now=100.0)
layout_alerts = [a for a in notifier_alerts if "Layout TS schimbat" in a.title]
assert len(layout_alerts) == 1
assert layout_alerts[0].silent is True

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import pytest import pytest
from atm.notifier import Alert from atm.notifier import Alert, _alert_prefix
from atm.notifier.fanout import FanoutNotifier from atm.notifier.fanout import FanoutNotifier
@@ -358,3 +358,32 @@ def test_fanout_on_drop_exception_swallowed(tmp_path: Path) -> None:
s = fan.stats() s = fan.stats()
# Some alerts still went through # Some alerts still went through
assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0 assert s["slow"]["sent"] > 0 or s["slow"]["dropped"] > 0
# ---------------------------------------------------------------------------
# Alert.chart_id + _alert_prefix
# ---------------------------------------------------------------------------
def test_alert_chart_id_default() -> None:
assert Alert(kind="arm", title="t", body="b").chart_id == ""
def test_alert_chart_id_set() -> None:
assert Alert(kind="arm", title="t", body="b", chart_id="left").chart_id == "left"
def test_alert_prefix_empty() -> None:
assert _alert_prefix("") == ""
def test_alert_prefix_left() -> None:
assert _alert_prefix("left") == "[stânga] "
def test_alert_prefix_right() -> None:
assert _alert_prefix("right") == "[dreapta] "
def test_alert_prefix_chart_n() -> None:
assert _alert_prefix("chart_0") == "[chart 1] "
assert _alert_prefix("chart_1") == "[chart 2] "