From 248ad6b10e1b97740be41c2f9f9409427142f366 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Wed, 22 Apr 2026 22:56:51 +0300 Subject: [PATCH] feat(telegram): /rebase + /rebase confirm pentru re-anchor canary baseline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit /rebase capturează + propune phash nou (screenshot adnotat cu red rect pe canary.roi, old/new hash, distance, TTL 180s). /rebase confirm rescrie baseline_phash în TOML-ul activ (regex line-match, păstrează comentariile), mirror în cfg live via object.__setattr__ (CanaryRegion e frozen), clear user_paused + drift_paused într-un singur shot — similar /resume. Fix adiacent: _dispatch_ctx / _mock_config_class setează cfg.window_title=None explicit; 5 teste _dispatch_command pre-existente eșuau pe MagicMock auto- truthy care propaga în _focus_window_by_title. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 4 +- src/atm/commands.py | 7 +- src/atm/main.py | 207 ++++++++++++++++++++++++++++++++++++ tests/test_commands.py | 14 +++ tests/test_main.py | 235 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 465 insertions(+), 2 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 6301816..f6ba64c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -28,7 +28,9 @@ Workflow după sesiune: review frame-urile noi din `frames/`, adaugi entry-uri ## Telegram commands (live) -`/ss` `/status` `/pause` `/resume` `/3` (interval min) `/stop` +`/ss` `/status` `/pause` `/resume` `/rebase` `/3` (interval min) `/stop` + +- `/rebase` — propune un `baseline_phash` nou pentru canary: capturează frame, crop pe `canary.roi`, phash → trimite screenshot adnotat (cerc roșu pe ROI) cu old/new hash + distance. `/rebase confirm` în ≤180s aplică: rescrie `baseline_phash` în TOML-ul activ (păstrează comentariile), mirror în `cfg` la runtime, clear `user_paused` + `drift_paused`. Fără confirm, nimic nu se modifică. Folosește-l când layout-ul TS s-a schimbat intenționat și vrei să re-ancorezi canary-ul fără `atm calibrate` full. - `/ss` — verify multi-bulină: adnotează top-3 buline din `dot_roi` (cerc roșu gros pe pick-ul FSM, cercuri colorate subțiri pe vecini) + caption cu clasificarea fiecăreia (nume, RGB, distanță, confidence) + `config: {version}`. Cercul colorat folosește `cfg.colors[name].rgb` la runtime — DRY cu paleta activă. - `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias). Trimite un singur Alert cu screenshot adnotat inline (capture rulează **înainte** de clearing state → zero race cu FSM tick-uri). Dacă capture eșuează, title conține `⚠️ captură eșuată` și resume-ul se execută oricum. diff --git a/src/atm/commands.py b/src/atm/commands.py index 7b774d5..d6c04d2 100644 --- a/src/atm/commands.py +++ b/src/atm/commands.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume", "help"] +CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume", "rebase", "help"] _BASE = "https://api.telegram.org/bot{token}/{method}" @@ -163,6 +163,11 @@ class TelegramPoller: if t == "resume force": # value=1 signals force: also lift canary drift-pause, not just user pause. return Command(action="resume", value=1) + if t == "rebase": + return Command(action="rebase") + if t == "rebase confirm": + # value=1 applies the pending proposal; plain "rebase" captures+proposes. + return Command(action="rebase", value=1) # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted parts = t.split() if len(parts) == 1 and parts[0].isdigit(): diff --git a/src/atm/main.py b/src/atm/main.py index 44fbb33..dffed8d 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -579,6 +579,65 @@ def _draw_roi_cyan(annotated, cfg) -> None: cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2) +_BASELINE_PHASH_LINE = __import__("re").compile( + r'^(\s*baseline_phash\s*=\s*")([0-9a-fA-F]+)(")[^\n]*$', + __import__("re").MULTILINE, +) + + +def _rewrite_baseline_phash(config_path: Path, new_phash: str) -> str: + """Replace baseline_phash in a TOML config file. Returns the old value. + + Uses regex line-match to preserve comments and surrounding formatting. + Raises ValueError if the file has 0 or >1 baseline_phash lines. + """ + text = config_path.read_text(encoding="utf-8") + matches = list(_BASELINE_PHASH_LINE.finditer(text)) + if len(matches) != 1: + raise ValueError( + f"{config_path}: expected exactly 1 baseline_phash line, found {len(matches)}" + ) + m = matches[0] + old_phash = m.group(2) + new_text = text[: m.start(2)] + new_phash + text[m.end(2) :] + config_path.write_text(new_text, encoding="utf-8") + return old_phash + + +def _save_rebase_frame( + frame, cfg, fires_dir: Path, now: float, audit: "_AuditLike | None" = None, +) -> "Path | None": + """Annotate frame with a red rectangle around canary.roi + save to fires_dir. + + Purpose: let the user eyeball exactly what pixels will be hashed before + confirming /rebase. Fail-safe: any error → None (alert still sends text-only). + """ + try: + import cv2 # type: ignore[import-untyped] + except ImportError as exc: + if audit is not None: + try: + audit.log({"ts": now, "event": "snapshot_fail", "label": "rebase", "error": f"cv2 missing: {exc}"}) + except Exception: + pass + return None + try: + ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S") + fpath = fires_dir / f"{ts_str}_rebase.png" + annotated = frame.copy() + r = cfg.canary.roi + cv2.rectangle(annotated, (r.x, r.y), (r.x + r.w, r.y + r.h), (0, 0, 255), 2) + cv2.imwrite(str(fpath), annotated) + return fpath + except Exception as exc: + if audit is not None: + try: + audit.log({"ts": now, "event": "snapshot_fail", "label": "rebase", "error": str(exc)}) + except Exception: + pass + return None + + def _save_inspect_frame( frame, cfg, @@ -1014,6 +1073,9 @@ class RunContext: state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now) lifecycle: Any = None # LifecycleState — window + user_paused tracking + # Pending /rebase proposal: (proposed_ts, new_phash, config_path) or None. + # One slot; a second /rebase overwrites. `/rebase confirm` applies if within TTL. + pending_rebase: Any = None @dataclass @@ -1454,12 +1516,15 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: kind="screenshot", title=title_r, body="\n\n".join(body_parts), image_path=path_r, )) + elif cmd.action == "rebase": + await _dispatch_rebase(ctx, cmd) elif cmd.action == "help": body = ( "/status — stare FSM, uptime, ultima detecție\n" "/ss — screenshot acum\n" "/pause — oprește detecția (heartbeat continuă)\n" "/resume — reia detecția (șterge user-pause și drift-pause)\n" + "/rebase — propune phash nou pentru canary (confirm cu /rebase confirm)\n" "/3 — screenshot automat la fiecare 3 min (sau orice număr)\n" "/stop — oprește screenshot-urile automate\n" "/h — acest mesaj" @@ -1467,6 +1532,148 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: ctx.notifier.send(Alert(kind="status", title="Comenzi ATM", body=body)) +_REBASE_TTL_S = 180.0 + + +async def _dispatch_rebase(ctx: RunContext, cmd) -> None: + """/rebase flow: propose (no value) captures+hashes, /rebase confirm (value=1) applies. + + Propose stores (ts, new_phash, config_path) on ctx.pending_rebase. Confirm + rewrites the active config's baseline_phash, clears user + drift pauses, + mirrors the new phash into the in-memory cfg. TTL = 180s. + """ + cfg = ctx.cfg + confirm = cmd.value == 1 + + if not confirm: + now_r = time.time() + if cfg.window_title: + title = await asyncio.to_thread(_focus_window_by_title, cfg.window_title) + ctx.audit.log({"ts": now_r, "event": "window_focused", "command": "rebase", "title": title}) + await asyncio.sleep(0.15) + frame = await asyncio.to_thread(ctx.capture) + if frame is None: + ctx.notifier.send(Alert( + kind="warn", + title="Captură eșuată — /rebase anulat", + body="Adu TradeStation în față și reîncearcă.", + )) + return + + from atm.vision import crop_roi, hamming_hex, phash as _phash + try: + roi_crop = crop_roi(frame, cfg.canary.roi) + except Exception as exc: + ctx.notifier.send(Alert( + kind="warn", + title="Crop canary ROI eșuat", + body=f"{exc}", + )) + return + new_phash = _phash(roi_crop) + old_phash = cfg.canary.baseline_phash + distance = hamming_hex(old_phash, new_phash) + + config_name = getattr(cfg, "config_version", None) or "unknown" + if config_name in ("", "unknown"): + ctx.notifier.send(Alert( + kind="warn", + title="Rebase imposibil — config necunoscut", + body="cfg.config_version lipsă; rulează `atm calibrate` în loc.", + )) + return + config_path = Path("configs") / f"{config_name}.toml" + + ctx.pending_rebase = (now_r, new_phash, config_path) + path_r = await asyncio.to_thread( + _save_rebase_frame, frame, cfg, ctx.fires_dir, now_r, ctx.audit, + ) + ctx.audit.log({ + "ts": now_r, "event": "rebase_proposed", + "old_phash": old_phash, "new_phash": new_phash, + "distance": distance, "config": config_name, + "screenshot_path": str(path_r) if path_r else None, + }) + body = ( + f"old: {old_phash[:16]}…\n" + f"new: {new_phash[:16]}…\n" + f"distance: {distance} (threshold={cfg.canary.drift_threshold})\n" + f"config: {config_name}\n\n" + f"Trimite /rebase confirm în {int(_REBASE_TTL_S)}s pentru a aplica." + ) + ctx.notifier.send(Alert( + kind="screenshot", + title="Canary rebase — confirmi?", + body=body, + image_path=path_r, + )) + return + + # confirm path + pending = ctx.pending_rebase + now_r = time.time() + if pending is None: + ctx.notifier.send(Alert( + kind="warn", + title="Nimic de confirmat", + body="Trimite mai întâi /rebase pentru a propune un phash nou.", + )) + return + proposed_ts, new_phash, config_path = pending + if now_r - proposed_ts > _REBASE_TTL_S: + ctx.pending_rebase = None + ctx.notifier.send(Alert( + kind="warn", + title="Rebase expirat", + body=f"Propunerea a depășit {int(_REBASE_TTL_S)}s. Trimite /rebase din nou.", + )) + return + + try: + old_phash = await asyncio.to_thread(_rewrite_baseline_phash, config_path, new_phash) + except Exception as exc: + ctx.audit.log({ + "ts": now_r, "event": "rebase_failed", + "error": str(exc), "config": str(config_path), + }) + ctx.notifier.send(Alert( + kind="warn", + title="Rebase eșuat — config nemodificat", + body=f"{exc}", + )) + ctx.pending_rebase = None + return + + # CanaryRegion is a frozen dataclass — bypass the freeze to mirror the new + # hash into the live cfg so the very next canary.check() uses it. Without + # this, the TOML is updated but in-memory canary.baseline_phash stays stale + # and drift re-pauses immediately after /rebase confirm. + object.__setattr__(cfg.canary, "baseline_phash", new_phash) + was_drift = bool(getattr(ctx.canary, "is_paused", False)) + was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False + if ctx.lifecycle is not None: + ctx.lifecycle.user_paused = False + if was_drift: + ctx.canary.resume() + ctx.pending_rebase = None + + ctx.audit.log({ + "ts": now_r, "event": "rebase_applied", + "old_phash": old_phash, "new_phash": new_phash, + "config": str(config_path), + "was_drift": was_drift, "was_user": was_user, + }) + ctx.notifier.send(Alert( + kind="status", + title="Canary rebase aplicat", + body=( + f"new: {new_phash[:16]}…\n" + f"config: {config_path.name}\n" + f"Detecția repornită." + ), + )) + + async def _drain_cmd_queue(ctx: RunContext) -> None: """Drain all pending commands, isolating each dispatch in try/except. diff --git a/tests/test_commands.py b/tests/test_commands.py index 9df48c4..91f696b 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -36,6 +36,20 @@ def test_parse_resume_force(): assert cmd.value == 1 +def test_parse_rebase_plain(): + p = _make_poller() + assert p._parse_command("rebase") == Command(action="rebase") + assert p._parse_command("/rebase") == Command(action="rebase") + + +def test_parse_rebase_confirm(): + p = _make_poller() + cmd = p._parse_command("rebase confirm") + assert cmd is not None + assert cmd.action == "rebase" + assert cmd.value == 1 + + def test_parse_help(): p = _make_poller() assert p._parse_command("h") == Command(action="help") diff --git a/tests/test_main.py b/tests/test_main.py index c7a3aca..9261927 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -30,6 +30,9 @@ def _mock_config_class(cfg=None): """Return a Config-like class whose load_current() returns *cfg*.""" if cfg is None: cfg = MagicMock() + # window_title must be a real falsy value, not a MagicMock auto-attribute; + # otherwise _cmd_run enters _focus_window_by_title and TypeErrors. + cfg.window_title = None mock_cls = MagicMock() mock_cls.load_current.return_value = cfg return mock_cls @@ -783,6 +786,9 @@ def _dispatch_ctx(canary=None, lifecycle=None, cfg=None): cfg = MagicMock() cfg.telegram.auto_poll_interval_s = 180 cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None) + # Skip window focus in dispatch tests — MagicMock window_title would + # propagate into _focus_window_by_title (real Win32 call). + cfg.window_title = None state = _main._LoopState(start=0.0) ctx = _main.RunContext( @@ -896,6 +902,234 @@ async def test_resume_force_alias_still_works(): assert resumed and resumed[0]["force"] is True +# --------------------------------------------------------------------------- +# /rebase: propose + confirm flow +# --------------------------------------------------------------------------- + +def _rebase_cfg_and_path(tmp_path): + """Write a minimal TOML with baseline_phash, return (cfg_live, path, cfg_version).""" + name = "rebase_test" + path = tmp_path / "configs" / f"{name}.toml" + path.parent.mkdir(parents=True, exist_ok=True) + old = "a" * 64 + path.write_text( + f'[canary]\n' + f'baseline_phash = "{old}" # comment stays\n' + f'drift_threshold = 8\n' + f'[canary.roi]\nx = 0\ny = 0\nw = 4\nh = 4\n', + encoding="utf-8", + ) + cfg = types.SimpleNamespace( + window_title=None, + config_version=name, + canary=types.SimpleNamespace( + roi=types.SimpleNamespace(x=0, y=0, w=4, h=4), + baseline_phash=old, + drift_threshold=8, + ), + telegram=types.SimpleNamespace(auto_poll_interval_s=180), + operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None), + ) + return cfg, path, old + + +def _blue_frame(h=200, w=200): + import numpy as _np + return _np.full((h, w, 3), (50, 80, 20), dtype=_np.uint8) + + +def test_rewrite_baseline_phash_updates_only_target_line(tmp_path): + import atm.main as _main + p = tmp_path / "cfg.toml" + p.write_text( + '[canary]\n' + 'baseline_phash = "deadbeef" # keep this comment\n' + 'drift_threshold = 8\n', + encoding="utf-8", + ) + old = _main._rewrite_baseline_phash(p, "cafef00d") + assert old == "deadbeef" + txt = p.read_text(encoding="utf-8") + assert 'baseline_phash = "cafef00d" # keep this comment' in txt + assert "drift_threshold = 8" in txt + + +def test_rewrite_baseline_phash_raises_when_missing(tmp_path): + import atm.main as _main + p = tmp_path / "cfg.toml" + p.write_text("[canary]\ndrift_threshold = 8\n", encoding="utf-8") + with pytest.raises(ValueError): + _main._rewrite_baseline_phash(p, "ff") + + +@pytest.mark.asyncio +async def test_rebase_propose_sets_pending_and_does_not_touch_file(tmp_path, monkeypatch): + import atm.main as _main + from atm.commands import Command + + cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path) + ctx = _dispatch_ctx(cfg=cfg) + ctx.capture = lambda: _blue_frame() + ctx.fires_dir = tmp_path / "fires" + ctx.fires_dir.mkdir() + monkeypatch.chdir(tmp_path) + + await _main._dispatch_command(ctx, Command(action="rebase")) + + assert ctx.pending_rebase is not None + proposed_ts, new_phash, pending_path = ctx.pending_rebase + assert new_phash != old + assert pending_path == Path("configs") / f"{cfg.config_version}.toml" + # File unchanged at propose time + assert f'"{old}"' in cfg_path.read_text(encoding="utf-8") + events = [e["event"] for e in ctx.audit.events] + assert "rebase_proposed" in events + + +@pytest.mark.asyncio +async def test_rebase_confirm_applies_and_clears_pauses(tmp_path, monkeypatch): + import atm.main as _main + from atm.commands import Command + + class _Canary: + def __init__(self): self._p = True + @property + def is_paused(self): return self._p + def resume(self): self._p = False + + cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path) + canary = _Canary() + ctx = _dispatch_ctx(cfg=cfg, canary=canary) + ctx.capture = lambda: _blue_frame() + ctx.fires_dir = tmp_path / "fires" + ctx.fires_dir.mkdir() + ctx.lifecycle.user_paused = True + monkeypatch.chdir(tmp_path) + + await _main._dispatch_command(ctx, Command(action="rebase")) + assert ctx.pending_rebase is not None + _, new_phash, _ = ctx.pending_rebase + + await _main._dispatch_command(ctx, Command(action="rebase", value=1)) + + assert ctx.pending_rebase is None + assert ctx.lifecycle.user_paused is False + assert canary.is_paused is False + assert cfg.canary.baseline_phash == new_phash + txt = cfg_path.read_text(encoding="utf-8") + assert f'"{new_phash}"' in txt + assert f'"{old}"' not in txt + applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"] + assert applied and applied[0]["old_phash"] == old and applied[0]["new_phash"] == new_phash + + +@pytest.mark.asyncio +async def test_rebase_confirm_without_pending_warns(tmp_path): + import atm.main as _main + from atm.commands import Command + + cfg, _, _ = _rebase_cfg_and_path(tmp_path) + ctx = _dispatch_ctx(cfg=cfg) + ctx.fires_dir = tmp_path / "fires" + ctx.fires_dir.mkdir() + + await _main._dispatch_command(ctx, Command(action="rebase", value=1)) + + assert ctx.pending_rebase is None + alerts = ctx.notifier.alerts + assert alerts and "nimic" in alerts[0].title.lower() + applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"] + assert not applied + + +@pytest.mark.asyncio +async def test_rebase_confirm_expired_pending_warns(tmp_path, monkeypatch): + import atm.main as _main + from atm.commands import Command + + cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path) + ctx = _dispatch_ctx(cfg=cfg) + ctx.fires_dir = tmp_path / "fires" + ctx.fires_dir.mkdir() + # Pretend propose happened 1h ago + import time as _t + ctx.pending_rebase = (_t.time() - 3600, "b" * 64, cfg_path) + + await _main._dispatch_command(ctx, Command(action="rebase", value=1)) + + assert ctx.pending_rebase is None + # File untouched + assert f'"{old}"' in cfg_path.read_text(encoding="utf-8") + alerts = ctx.notifier.alerts + assert alerts and "expirat" in alerts[0].title.lower() + + +@pytest.mark.asyncio +async def test_rebase_confirm_mutates_frozen_canary_region(tmp_path, monkeypatch): + """Regression: CanaryRegion is @dataclass(frozen=True). Plain assignment + raises FrozenInstanceError. /rebase confirm must mirror the new phash into + the live cfg anyway — otherwise canary.check() keeps the stale hash and + drift re-pauses within one tick after the user confirms.""" + import atm.main as _main + from atm.commands import Command + from atm.config import CanaryRegion, ROI as _ROI + + class _Canary: + def __init__(self): self._p = False + @property + def is_paused(self): return self._p + def resume(self): self._p = False + + # Build cfg with the REAL frozen dataclass, not a SimpleNamespace. + _, cfg_path, old = _rebase_cfg_and_path(tmp_path) + cfg = types.SimpleNamespace( + window_title=None, + config_version="rebase_test", + canary=CanaryRegion( + roi=_ROI(x=0, y=0, w=4, h=4), + baseline_phash=old, + drift_threshold=8, + ), + telegram=types.SimpleNamespace(auto_poll_interval_s=180), + operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None), + ) + ctx = _dispatch_ctx(cfg=cfg, canary=_Canary()) + ctx.capture = lambda: _blue_frame() + ctx.fires_dir = tmp_path / "fires" + ctx.fires_dir.mkdir() + monkeypatch.chdir(tmp_path) + + await _main._dispatch_command(ctx, Command(action="rebase")) + _, new_phash, _ = ctx.pending_rebase + await _main._dispatch_command(ctx, Command(action="rebase", value=1)) + + # Live cfg mirrors the new hash (in-memory canary.check() sees it) + assert cfg.canary.baseline_phash == new_phash + # TOML on disk also updated + assert f'"{new_phash}"' in cfg_path.read_text(encoding="utf-8") + applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"] + assert applied + + +@pytest.mark.asyncio +async def test_rebase_propose_capture_failed_warns(tmp_path, monkeypatch): + import atm.main as _main + from atm.commands import Command + + cfg, _, _ = _rebase_cfg_and_path(tmp_path) + ctx = _dispatch_ctx(cfg=cfg) + ctx.capture = lambda: None + ctx.fires_dir = tmp_path / "fires" + ctx.fires_dir.mkdir() + monkeypatch.chdir(tmp_path) + + await _main._dispatch_command(ctx, Command(action="rebase")) + + assert ctx.pending_rebase is None + alerts = ctx.notifier.alerts + assert alerts and "e" in alerts[0].title.lower() # "Captură eșuată" + + @pytest.mark.asyncio async def test_resume_out_of_window_responds_with_pending_message(): """/resume while operating-hours window is closed → special body.""" @@ -1214,6 +1448,7 @@ async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path) cfg = MagicMock() cfg.telegram.auto_poll_interval_s = 180 cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None) + cfg.window_title = None # skip Win32 focus path in unit test ctx = _dispatch_ctx(canary=canary, cfg=cfg)