From bb241bf050f01746967400fb3fad986bc9868520 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Tue, 5 May 2026 18:30:57 +0300 Subject: [PATCH] =?UTF-8?q?feat(canary):=20auto-rebase=20pe=20layout=20cha?= =?UTF-8?q?nge=20(2=E2=86=941=20ferestre)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Când canary drift coincide cu schimbare strip-count pe același frame (ex: TS comută 2→1 chart-uri și mută menu-ul peste care e ancorat ROI), sistemul rescrie automat baseline_phash în TOML, commit layout change și trimite o singură alertă combinată — fără pauză, fără /rebase manual. Drift fără strip-count change rămâne pauză ca azi (drift real). Gate pe două semnale independente previne fals-pozitive. Canary.check() despărțit în măsurare pură + commit_pause/rebase explicit; tick-loop-ul orchestrează decizia. Co-Authored-By: Claude Haiku 4.5 --- src/atm/canary.py | 40 +++-- src/atm/main.py | 105 +++++++++++- tests/test_canary.py | 91 ++++++++--- tests/test_main.py | 376 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 576 insertions(+), 36 deletions(-) diff --git a/src/atm/canary.py b/src/atm/canary.py index 602caf2..c2688c3 100644 --- a/src/atm/canary.py +++ b/src/atm/canary.py @@ -24,8 +24,10 @@ class CanaryResult: class Canary: """Compare live canary ROI phash against a known-good baseline. - Once drift is detected the instance stays paused until resume() is called, - even if subsequent frames look clean again. + ``check()`` is pure measurement — it never mutates ``_paused`` or fires + side-effects. Callers decide whether to ``commit_pause()`` (real drift) or + ``rebase()`` (legitimate layout change). This split exists so the tick loop + can use a second signal (strip-count change) to disambiguate before pausing. """ def __init__( @@ -47,20 +49,30 @@ class Canary: current_hash = phash(roi_img) distance = hamming_hex(current_hash, self._cfg.canary.baseline_phash) drifted = distance > self._cfg.canary.drift_threshold - - if drifted and not self._paused: - self._paused = True - if self._pause_flag_path is not None: - self._pause_flag_path.write_text("paused", encoding="utf-8") - if self._on_pause is not None: - try: - self._on_pause(distance) - except Exception as exc: - # Never let a notifier hiccup abort the detection cycle. - logger.warning("canary on_pause_callback raised: %s", exc) - return CanaryResult(distance=distance, drifted=drifted, paused=self._paused) + def commit_pause(self, distance: int) -> None: + """Transition to paused state. Idempotent — no-op if already paused.""" + if self._paused: + return + self._paused = True + if self._pause_flag_path is not None: + self._pause_flag_path.write_text("paused", encoding="utf-8") + if self._on_pause is not None: + try: + self._on_pause(distance) + except Exception as exc: + # Never let a notifier hiccup abort the detection cycle. + logger.warning("canary on_pause_callback raised: %s", exc) + + def rebase(self, new_phash: str) -> None: + """Replace baseline_phash in the live cfg (in-memory mirror). + + Caller is responsible for persisting to TOML separately. Does NOT touch + ``_paused`` — used in the auto-rebase path where we never paused. + """ + object.__setattr__(self._cfg.canary, "baseline_phash", new_phash) + @property def is_paused(self) -> bool: return self._paused diff --git a/src/atm/main.py b/src/atm/main.py index dbeb37f..f49b988 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -603,6 +603,43 @@ def _rewrite_baseline_phash(config_path: Path, new_phash: str) -> str: return old_phash +def _auto_rebase_canary( + ctx: "RunContext", frame, now: float +) -> "tuple[str, str] | None": + """Compute new canary phash from frame, persist to TOML, mirror in-memory. + + Used by the tick loop when canary drift coincides with a strip-count change + (legitimate layout transition like 2↔1 chart windows). Returns (old_phash, + new_phash) on success. Returns None on TOML rewrite failure or when + config_version is unknown — caller falls back to commit_pause(). + """ + from atm.vision import crop_roi, phash as _phash + + cfg = ctx.cfg + config_name = getattr(cfg, "config_version", None) or "unknown" + if config_name in ("", "unknown"): + ctx.audit.log({ + "ts": now, "event": "auto_rebase_failed", + "error": "config_version unknown", + }) + return None + config_path = Path("configs") / f"{config_name}.toml" + try: + new_phash = _phash(crop_roi(frame, cfg.canary.roi)) + except Exception as exc: + ctx.audit.log({"ts": now, "event": "auto_rebase_failed", + "error": f"phash compute: {exc}"}) + return None + try: + old_phash = _rewrite_baseline_phash(config_path, new_phash) + except Exception as exc: + ctx.audit.log({"ts": now, "event": "auto_rebase_failed", + "error": str(exc), "config": str(config_path)}) + return None + ctx.canary.rebase(new_phash) + return old_phash, new_phash + + def _save_rebase_frame( frame, cfg, fires_dir: Path, now: float, audit: "_AuditLike | None" = None, ) -> "Path | None": @@ -1396,12 +1433,21 @@ def _chart_id_for_index(i: int, n: int) -> str: return f"chart_{i}" -def _commit_layout_change(ctx: RunContext, new_strips: list, now: float) -> None: +def _commit_layout_change( + ctx: RunContext, + new_strips: list, + now: float, + suppress_alert: bool = False, +) -> None: """Replace ctx.charts with fresh ChartState objects for new_strips. Resets per-chart FSM (StateMachine(lockout_s)) and Detector (with dot_roi_override=strip), zeroes n_primed_global, stops scheduler if running, audits the layout_change event, sends a status Alert. + + ``suppress_alert=True`` skips the status Alert (caller will send a combined + one — used by the auto-rebase path so the user gets a single notification + covering both the layout change and the canary rebase). """ from atm.detector import Detector from atm.state_machine import StateMachine @@ -1433,8 +1479,9 @@ def _commit_layout_change(ctx: RunContext, new_strips: list, now: float) -> None "strips": [{"x": s.x, "w": s.w} for s in new_strips], }) # Suppress notification on first-ever detection (old_n=0 = startup bootstrap, - # not a real layout change). - if old_n > 0: + # not a real layout change). Also suppress when caller (auto-rebase path) + # will send a combined alert. + if old_n > 0 and not suppress_alert: ctx.notifier.send(Alert( kind="status", title=f"🔄 Layout TS schimbat: {old_n} → {new_n} ferestre. FSM resetat.", @@ -1503,12 +1550,60 @@ async def _run_multi_tick(ctx: RunContext) -> "list[_TickSyncResult]": cr = ctx.canary.check(frame) if ctx.canary.is_paused: + # Already paused from a prior tick — short-circuit. Auto-rebase only + # runs on the not_paused→drifted edge; once paused, only /resume or + # /rebase confirm clears it. ctx.audit.log({"ts": now, "event": "paused", "drift": cr.distance}) return [] new_strips = await asyncio.to_thread(_detect_strips_for_ctx, ctx, frame) current_strips = [c.sub_roi for c in ctx.charts] - if new_strips: + + if cr.drifted: + # Two-signal gate: drift ALONE could be a real anomaly (window occluded, + # screen blackout) OR a layout transition that moved the canary anchor + # within the frame (e.g. 2→1 chart split shifts the menu chrome). We + # disambiguate by checking strip count on the same frame. If strip count + # changed, treat as legitimate layout change → silently rebase canary + + # commit layout change + single combined alert. Otherwise pause as before. + count_changed = bool(new_strips) and len(new_strips) != len(current_strips) + if count_changed: + rebase_result = _auto_rebase_canary(ctx, frame, now) + if rebase_result is not None: + old_phash, new_phash = rebase_result + old_n = len(current_strips) + new_n = len(new_strips) + _commit_layout_change(ctx, new_strips, now, suppress_alert=True) + ctx.audit.log({ + "ts": now, "event": "layout_change_with_rebase", + "old_n": old_n, "new_n": new_n, + "old_phash": old_phash, "new_phash": new_phash, + "distance": cr.distance, + }) + shot = await asyncio.to_thread( + _save_rebase_frame, frame, ctx.cfg, ctx.fires_dir, now, ctx.audit, + ) + ctx.notifier.send(Alert( + kind="status", + title=f"🔄 Layout TS schimbat: {old_n}→{new_n} ferestre + canary auto-rebased", + body=f"distance={cr.distance}, old={old_phash[:12]}…, new={new_phash[:12]}…", + image_path=shot, + silent=True, + )) + # Fall through to per-chart tick loop with new charts. + else: + # Rebase failed (TOML malformed / config_version unknown) → + # fall back to standard drift-pause behavior. + ctx.canary.commit_pause(cr.distance) + ctx.audit.log({"ts": now, "event": "paused", "drift": cr.distance, + "reason": "auto_rebase_failed"}) + return [] + else: + # Drift without strip-count change → real drift. Pause. + ctx.canary.commit_pause(cr.distance) + ctx.audit.log({"ts": now, "event": "paused", "drift": cr.distance}) + return [] + elif new_strips: if len(new_strips) != len(current_strips): # Real count change (1↔2 ferestre) → rebuild charts. _commit_layout_change(ctx, new_strips, now) @@ -2078,6 +2173,8 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}" if first_check.drifted: print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True) + # check() no longer auto-pauses; resume() kept defensively in case + # an earlier startup path or a future change pauses before this. canary.resume() senzor_label = "deviat" if first_check.drifted else "ok" senzor_info = f"senzor: {senzor_label} ({canary_status.replace('drift=', '')})" diff --git a/tests/test_canary.py b/tests/test_canary.py index 4dcce37..fac6a40 100644 --- a/tests/test_canary.py +++ b/tests/test_canary.py @@ -91,24 +91,37 @@ def test_no_drift() -> None: assert canary.is_paused is False -def test_drift_triggers_pause() -> None: - """Drastically different canary ROI → drifted=True, paused=True.""" +def test_check_does_not_auto_pause() -> None: + """check() is pure measurement — never transitions to paused on its own.""" cfg = _cfg_with_baseline(BASELINE_FRAME) canary = Canary(cfg) result = canary.check(DRIFTED_FRAME) assert result.drifted is True - assert result.paused is True + assert result.paused is False # not committed + assert canary.is_paused is False + + +def test_drift_triggers_pause() -> None: + """check() detects drift; commit_pause() transitions state.""" + cfg = _cfg_with_baseline(BASELINE_FRAME) + canary = Canary(cfg) + + result = canary.check(DRIFTED_FRAME) + assert result.drifted is True + canary.commit_pause(result.distance) + assert canary.is_paused is True def test_persists_paused() -> None: - """After drift, feeding back a clean frame keeps paused=True.""" + """After commit_pause, feeding back a clean frame keeps paused=True.""" cfg = _cfg_with_baseline(BASELINE_FRAME) canary = Canary(cfg) - canary.check(DRIFTED_FRAME) # trigger pause + r1 = canary.check(DRIFTED_FRAME) + canary.commit_pause(r1.distance) result = canary.check(BASELINE_FRAME) # clean frame, but still paused assert result.paused is True @@ -120,7 +133,8 @@ def test_resume_clears() -> None: cfg = _cfg_with_baseline(BASELINE_FRAME) canary = Canary(cfg) - canary.check(DRIFTED_FRAME) # pause + r = canary.check(DRIFTED_FRAME) + canary.commit_pause(r.distance) canary.resume() assert canary.is_paused is False @@ -130,13 +144,15 @@ def test_resume_clears() -> None: def test_pause_file_written(tmp_path: Path) -> None: - """When pause_flag_path is provided, the file is created on drift.""" + """When pause_flag_path is provided, the file is created on commit_pause.""" flag = tmp_path / "paused.flag" cfg = _cfg_with_baseline(BASELINE_FRAME) canary = Canary(cfg, pause_flag_path=flag) assert not flag.exists() - canary.check(DRIFTED_FRAME) + r = canary.check(DRIFTED_FRAME) + assert not flag.exists() # check() alone does NOT write the flag + canary.commit_pause(r.distance) assert flag.exists() @@ -147,14 +163,32 @@ def test_canary_pause_callback_fires_once() -> None: canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d)) - canary.check(DRIFTED_FRAME) # transition → callback fires - canary.check(DRIFTED_FRAME) # still paused → no new callback - canary.check(BASELINE_FRAME) # clean but still paused → no new callback + r1 = canary.check(DRIFTED_FRAME) + canary.commit_pause(r1.distance) # transition → callback fires + canary.commit_pause(r1.distance) # idempotent → no new callback + r2 = canary.check(DRIFTED_FRAME) + canary.commit_pause(r2.distance) # still paused → no new callback + canary.check(BASELINE_FRAME) # clean but still paused → no new callback assert len(calls) == 1 assert calls[0] > 0 # distance should be positive +def test_commit_pause_idempotent() -> None: + """commit_pause is no-op when already paused — no flag re-write, no callback.""" + cfg = _cfg_with_baseline(BASELINE_FRAME) + calls: list[int] = [] + + canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d)) + + canary.commit_pause(100) + canary.commit_pause(200) + canary.commit_pause(300) + + assert len(calls) == 1 + assert calls[0] == 100 + + def test_canary_resume_allows_new_pause_notification() -> None: """After resume, a fresh drift must re-fire the callback.""" cfg = _cfg_with_baseline(BASELINE_FRAME) @@ -162,17 +196,19 @@ def test_canary_resume_allows_new_pause_notification() -> None: canary = Canary(cfg, on_pause_callback=lambda d: calls.append(d)) - canary.check(DRIFTED_FRAME) + r1 = canary.check(DRIFTED_FRAME) + canary.commit_pause(r1.distance) assert len(calls) == 1 canary.resume() - canary.check(DRIFTED_FRAME) # new pause transition + r2 = canary.check(DRIFTED_FRAME) + canary.commit_pause(r2.distance) # new pause transition assert len(calls) == 2 -def test_canary_pause_callback_exception_does_not_crash_check() -> None: - """A failing callback must not break canary.check (detection cycle safety).""" +def test_canary_pause_callback_exception_does_not_crash_commit_pause() -> None: + """A failing callback must not break commit_pause (detection cycle safety).""" cfg = _cfg_with_baseline(BASELINE_FRAME) def _boom(_d: int) -> None: @@ -181,8 +217,8 @@ def test_canary_pause_callback_exception_does_not_crash_check() -> None: canary = Canary(cfg, on_pause_callback=_boom) # Must not raise — exception is swallowed + logged. - result = canary.check(DRIFTED_FRAME) - assert result.paused is True + r = canary.check(DRIFTED_FRAME) + canary.commit_pause(r.distance) assert canary.is_paused is True @@ -192,7 +228,26 @@ def test_resume_deletes_pause_file(tmp_path: Path) -> None: cfg = _cfg_with_baseline(BASELINE_FRAME) canary = Canary(cfg, pause_flag_path=flag) - canary.check(DRIFTED_FRAME) + r = canary.check(DRIFTED_FRAME) + canary.commit_pause(r.distance) assert flag.exists() canary.resume() assert not flag.exists() + + +def test_rebase_updates_baseline_in_memory() -> None: + """rebase(new_h) mirrors hash into cfg.canary; subsequent check uses it.""" + cfg = _cfg_with_baseline(BASELINE_FRAME) + canary = Canary(cfg) + + # Compute the phash of the drifted frame; rebase to it. + drifted_hash = phash(crop_roi(DRIFTED_FRAME, CANARY_ROI)) + assert cfg.canary.baseline_phash != drifted_hash + + canary.rebase(drifted_hash) + + assert cfg.canary.baseline_phash == drifted_hash + # Now the drifted frame reads as clean. + result = canary.check(DRIFTED_FRAME) + assert result.drifted is False + assert result.paused is False diff --git a/tests/test_main.py b/tests/test_main.py index c3a2168..c42dc23 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -2089,3 +2089,379 @@ def test_commit_layout_change_alert_is_silent(monkeypatch): layout_alerts = [a for a in notifier_alerts if "Layout TS schimbat" in a.title] assert len(layout_alerts) == 1 assert layout_alerts[0].silent is True + + +# --------------------------------------------------------------------------- +# Auto-rebase on layout change (two-signal gate: drift + strip-count change) +# --------------------------------------------------------------------------- + +class _FakeCanary: + """Stand-in for atm.canary.Canary that lets tests drive drift state. + + Mirrors the new split API (check pure, commit_pause + rebase explicit). + """ + + def __init__(self, drift_distance: int = 0): + self.drift_distance = drift_distance + self.is_paused = False + self.commit_pause_calls: list[int] = [] + self.rebase_calls: list[str] = [] + self.resume_calls = 0 + + def check(self, _frame): + return types.SimpleNamespace( + distance=self.drift_distance, + drifted=self.drift_distance > 0, + paused=self.is_paused, + ) + + def commit_pause(self, distance: int) -> None: + if self.is_paused: + return + self.is_paused = True + self.commit_pause_calls.append(distance) + + def rebase(self, new_phash: str) -> None: + self.rebase_calls.append(new_phash) + + def resume(self) -> None: + self.is_paused = False + self.resume_calls += 1 + + +def _build_multi_tick_ctx(tmp_path, cfg, canary, initial_strips): + """Construct a RunContext suitable for driving _run_multi_tick. + + Returns (ctx, notifier_alerts, audit_events, detector_stubs). + Charts are seeded with stub Detectors that record dot_roi updates. + """ + import atm.main as _main + from atm.detector import DetectionResult + + notifier_alerts: list = [] + audit_events: list = [] + + class _Det: + def __init__(self): + self.last_roi = None + + def step(self, ts, frame=None): + return DetectionResult( + ts=ts, window_found=True, dot_found=False, + rgb=None, match=None, accepted=False, color=None, + ) + + def update_dot_roi(self, roi): + self.last_roi = roi + + class _FSM: + state = types.SimpleNamespace(value="IDLE") + _last_fire: dict = {} + + class _N: + def send(self, a): notifier_alerts.append(a) + + class _A: + def log(self, e): audit_events.append(e) + + class _S: + is_running = False + def start(self, s): pass + def stop(self): pass + + state_obj = _main._LoopState() + detectors = [_Det() for _ in initial_strips] + charts = [ + _main.ChartState( + chart_id=("left" if i == 0 else "right") if len(initial_strips) == 2 else "only", + sub_roi=strip, detector=detectors[i], fsm=_FSM(), + ) + for i, strip in enumerate(initial_strips) + ] + ctx = _main.RunContext( + cfg=cfg, capture=lambda: np.zeros((200, 1800, 3), dtype=np.uint8), + canary=canary, detector=MagicMock(), fsm=_FSM(), + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=tmp_path, fires_dir=tmp_path, + cmd_queue=MagicMock(), state=state_obj, + levels_extractor_factory=lambda *a, **kw: None, + lifecycle=None, + ) + ctx.charts = charts + return ctx, notifier_alerts, audit_events, detectors + + +def _make_minimal_cfg_for_multi_tick(): + """Minimal cfg that satisfies _run_multi_tick + _commit_layout_change paths.""" + from atm.config import ROI + cfg = MagicMock() + cfg.lockout_s = 60 + cfg.colors = {} + cfg.debounce_depth = 3 + cfg.confidence_min = 0.5 + cfg.dot_roi = ROI(x=0, y=0, w=600, h=35) + cfg.window_title = None + cfg.attach_screenshots = types.SimpleNamespace( + arm=False, prime=False, trigger=False, late_start=False, + catchup=False, opposite_rearm=False, rearm=False, phase_skip=False, + ) + cfg.config_version = "test-cfg" + cfg.canary = types.SimpleNamespace( + roi=ROI(x=0, y=0, w=50, h=50), + baseline_phash="0" * 64, + drift_threshold=8, + ) + return cfg + + +@pytest.mark.asyncio +async def test_multi_tick_drift_with_strip_count_change_auto_rebases(monkeypatch, tmp_path): + """Drift + strip count 2→1 → silent auto-rebase, no pause, single combined alert.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=156) + initial = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + new_strips = [ROI(x=0, y=0, w=900, h=35)] # 2 → 1 + ctx, notifier_alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: new_strips) + rewrite_calls: list = [] + def _stub_rewrite(path, new_phash): + rewrite_calls.append((path, new_phash)) + return "OLD_PHASH" + monkeypatch.setattr(_main, "_rewrite_baseline_phash", _stub_rewrite) + + await _main._run_multi_tick(ctx) + + # Layout committed: charts shrank to 1. + assert len(ctx.charts) == 1 + assert ctx.charts[0].sub_roi == new_strips[0] + # Canary NOT paused (auto-rebase path). + assert canary.is_paused is False + assert canary.commit_pause_calls == [] + # rebase() called with the new phash that was also passed to _rewrite_baseline_phash. + assert len(canary.rebase_calls) == 1 + assert len(rewrite_calls) == 1 + new_phash = canary.rebase_calls[0] + assert rewrite_calls[0][1] == new_phash + # Audit: layout_change_with_rebase + standard layout_change (from _commit_layout_change). + rebase_events = [e for e in audit_events if e.get("event") == "layout_change_with_rebase"] + assert len(rebase_events) == 1 + assert rebase_events[0]["old_n"] == 2 + assert rebase_events[0]["new_n"] == 1 + assert rebase_events[0]["distance"] == 156 + assert rebase_events[0]["old_phash"] == "OLD_PHASH" + assert rebase_events[0]["new_phash"] == new_phash + # No "paused" audit event. + assert not any(e.get("event") == "paused" for e in audit_events) + # Exactly one combined alert. Generic layout_change alert was suppressed. + combined = [a for a in notifier_alerts if "auto-rebased" in a.title] + assert len(combined) == 1 + assert "2→1" in combined[0].title + plain_layout = [a for a in notifier_alerts + if "Layout TS schimbat" in a.title and "auto-rebased" not in a.title] + assert plain_layout == [] + + +@pytest.mark.asyncio +async def test_multi_tick_drift_with_same_strip_count_pauses(monkeypatch, tmp_path): + """Drift without strip count change → real drift → pause as before.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=156) + initial = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + same_count = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + ctx, notifier_alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: same_count) + monkeypatch.setattr(_main, "_rewrite_baseline_phash", + lambda *a, **kw: pytest.fail("should not be called")) + + results = await _main._run_multi_tick(ctx) + + assert results == [] + assert canary.is_paused is True + assert canary.commit_pause_calls == [156] + assert len(ctx.charts) == 2 # unchanged + paused_events = [e for e in audit_events if e.get("event") == "paused"] + assert len(paused_events) == 1 + assert paused_events[0]["drift"] == 156 + assert canary.rebase_calls == [] + + +@pytest.mark.asyncio +async def test_multi_tick_drift_with_zero_strips_pauses(monkeypatch, tmp_path): + """Strip detection returns [] (chart blackout) → pause, do NOT auto-rebase.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=156) + initial = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + ctx, _alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: []) + monkeypatch.setattr(_main, "_rewrite_baseline_phash", + lambda *a, **kw: pytest.fail("should not be called")) + + results = await _main._run_multi_tick(ctx) + + assert results == [] + assert canary.is_paused is True + assert canary.commit_pause_calls == [156] + assert canary.rebase_calls == [] + assert any(e.get("event") == "paused" for e in audit_events) + + +@pytest.mark.asyncio +async def test_multi_tick_no_drift_strip_count_change_unchanged_path(monkeypatch, tmp_path): + """Strip count change without drift → existing _commit_layout_change path, + canary baseline untouched, silent layout-change alert.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=0) # no drift + initial = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + new_strips = [ROI(x=0, y=0, w=900, h=35)] + ctx, notifier_alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: new_strips) + monkeypatch.setattr(_main, "_rewrite_baseline_phash", + lambda *a, **kw: pytest.fail("should not be called")) + + await _main._run_multi_tick(ctx) + + assert len(ctx.charts) == 1 + assert canary.rebase_calls == [] + # Standard layout_change alert (not suppressed). + layout_alerts = [a for a in notifier_alerts if "Layout TS schimbat" in a.title] + assert len(layout_alerts) == 1 + assert "auto-rebased" not in layout_alerts[0].title + assert any(e.get("event") == "layout_change" for e in audit_events) + assert not any(e.get("event") == "layout_change_with_rebase" for e in audit_events) + + +@pytest.mark.asyncio +async def test_multi_tick_auto_rebase_toml_failure_falls_back_to_pause(monkeypatch, tmp_path): + """When _rewrite_baseline_phash raises, fall back to commit_pause + audit.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=156) + initial = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + new_strips = [ROI(x=0, y=0, w=900, h=35)] + ctx, notifier_alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: new_strips) + def _boom(_path, _new): + raise ValueError("expected exactly 1 baseline_phash line, found 0") + monkeypatch.setattr(_main, "_rewrite_baseline_phash", _boom) + + results = await _main._run_multi_tick(ctx) + + assert results == [] + assert canary.is_paused is True + assert canary.commit_pause_calls == [156] + # Charts NOT mutated. + assert len(ctx.charts) == 2 + fail_events = [e for e in audit_events if e.get("event") == "auto_rebase_failed"] + assert len(fail_events) == 1 + paused_events = [e for e in audit_events + if e.get("event") == "paused" and e.get("reason") == "auto_rebase_failed"] + assert len(paused_events) == 1 + + +@pytest.mark.asyncio +async def test_multi_tick_symmetric_1_to_2(monkeypatch, tmp_path): + """1→2 transition is symmetric: same auto-rebase + layout commit path.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=120) + initial = [ROI(x=0, y=0, w=900, h=35)] + new_strips = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + ctx, notifier_alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + monkeypatch.setattr(_main, "_detect_strips_for_ctx", lambda c, f: new_strips) + monkeypatch.setattr(_main, "_rewrite_baseline_phash", + lambda path, new_phash: "OLD") + + await _main._run_multi_tick(ctx) + + assert len(ctx.charts) == 2 + assert canary.is_paused is False + assert len(canary.rebase_calls) == 1 + rebase_events = [e for e in audit_events if e.get("event") == "layout_change_with_rebase"] + assert len(rebase_events) == 1 + assert rebase_events[0]["old_n"] == 1 + assert rebase_events[0]["new_n"] == 2 + combined = [a for a in notifier_alerts if "auto-rebased" in a.title] + assert len(combined) == 1 + assert "1→2" in combined[0].title + + +@pytest.mark.asyncio +async def test_multi_tick_already_paused_short_circuits(monkeypatch, tmp_path): + """is_paused=True at tick start → return [], no auto-rebase attempted.""" + import atm.main as _main + from atm.config import ROI + + monkeypatch.chdir(tmp_path) + + cfg = _make_minimal_cfg_for_multi_tick() + canary = _FakeCanary(drift_distance=156) + canary.is_paused = True # already paused from a prior tick + initial = [ROI(x=0, y=0, w=400, h=35), ROI(x=500, y=0, w=400, h=35)] + new_strips = [ROI(x=0, y=0, w=900, h=35)] + ctx, _alerts, audit_events, _ = _build_multi_tick_ctx( + tmp_path, cfg, canary, initial, + ) + + # Strip detection should NOT run on the already-paused short-circuit path. + detect_calls: list = [] + def _detect(c, f): + detect_calls.append((c, f)) + return new_strips + monkeypatch.setattr(_main, "_detect_strips_for_ctx", _detect) + monkeypatch.setattr(_main, "_rewrite_baseline_phash", + lambda *a, **kw: pytest.fail("should not be called")) + + results = await _main._run_multi_tick(ctx) + + assert results == [] + assert detect_calls == [] # short-circuited before strip detection + assert canary.rebase_calls == [] + assert canary.commit_pause_calls == [] # already paused; no new commit + assert len(ctx.charts) == 2 + assert any(e.get("event") == "paused" for e in audit_events)