Compare commits

...

2 Commits

Author SHA1 Message Date
8a1be979fe chore(calibration): 22 frame-uri auto-captured din sesiunea 2026-04-22
Output din live loop (auto-capture pe schimbare de culoare) pentru
sesiunea de azi. Filename = culoare detectată de FSM (poate fi greșită).
Următorul pas manual: review și label-uire în calibration_labels.json
pentru atm validate-calibration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 22:59:21 +03:00
248ad6b10e feat(telegram): /rebase + /rebase confirm pentru re-anchor canary baseline
/rebase capturează + propune phash nou (screenshot adnotat cu red rect pe
canary.roi, old/new hash, distance, TTL 180s). /rebase confirm rescrie
baseline_phash în TOML-ul activ (regex line-match, păstrează comentariile),
mirror în cfg live via object.__setattr__ (CanaryRegion e frozen), clear
user_paused + drift_paused într-un singur shot — similar /resume.

Fix adiacent: _dispatch_ctx / _mock_config_class setează cfg.window_title=None
explicit; 5 teste _dispatch_command pre-existente eșuau pe MagicMock auto-
truthy care propaga în _focus_window_by_title.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 22:56:51 +03:00
27 changed files with 465 additions and 2 deletions

View File

@@ -28,7 +28,9 @@ Workflow după sesiune: review frame-urile noi din `frames/`, adaugi entry-uri
## Telegram commands (live)
`/ss` `/status` `/pause` `/resume` `/3` (interval min) `/stop`
`/ss` `/status` `/pause` `/resume` `/rebase` `/3` (interval min) `/stop`
- `/rebase` — propune un `baseline_phash` nou pentru canary: capturează frame, crop pe `canary.roi`, phash → trimite screenshot adnotat (cerc roșu pe ROI) cu old/new hash + distance. `/rebase confirm` în ≤180s aplică: rescrie `baseline_phash` în TOML-ul activ (păstrează comentariile), mirror în `cfg` la runtime, clear `user_paused` + `drift_paused`. Fără confirm, nimic nu se modifică. Folosește-l când layout-ul TS s-a schimbat intenționat și vrei să re-ancorezi canary-ul fără `atm calibrate` full.
- `/ss` — verify multi-bulină: adnotează top-3 buline din `dot_roi` (cerc roșu gros pe pick-ul FSM, cercuri colorate subțiri pe vecini) + caption cu clasificarea fiecăreia (nume, RGB, distanță, confidence) + `config: {version}`. Cercul colorat folosește `cfg.colors[name].rgb` la runtime — DRY cu paleta activă.
- `/resume` clears BOTH user pause and canary drift-pause in one shot (`/resume force` still accepted as legacy alias). Trimite un singur Alert cu screenshot adnotat inline (capture rulează **înainte** de clearing state → zero race cu FSM tick-uri). Dacă capture eșuează, title conține `⚠️ captură eșuată` și resume-ul se execută oricum.

Binary file not shown.

After

Width:  |  Height:  |  Size: 237 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 263 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 252 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 265 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 265 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 266 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 265 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 262 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 263 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 219 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 223 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 228 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 258 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 245 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 228 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 458 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 378 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 248 KiB

View File

@@ -17,7 +17,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume", "help"]
CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume", "rebase", "help"]
_BASE = "https://api.telegram.org/bot{token}/{method}"
@@ -163,6 +163,11 @@ class TelegramPoller:
if t == "resume force":
# value=1 signals force: also lift canary drift-pause, not just user pause.
return Command(action="resume", value=1)
if t == "rebase":
return Command(action="rebase")
if t == "rebase confirm":
# value=1 applies the pending proposal; plain "rebase" captures+proposes.
return Command(action="rebase", value=1)
# "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
parts = t.split()
if len(parts) == 1 and parts[0].isdigit():

View File

@@ -579,6 +579,65 @@ def _draw_roi_cyan(annotated, cfg) -> None:
cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 255), 2)
_BASELINE_PHASH_LINE = __import__("re").compile(
r'^(\s*baseline_phash\s*=\s*")([0-9a-fA-F]+)(")[^\n]*$',
__import__("re").MULTILINE,
)
def _rewrite_baseline_phash(config_path: Path, new_phash: str) -> str:
"""Replace baseline_phash in a TOML config file. Returns the old value.
Uses regex line-match to preserve comments and surrounding formatting.
Raises ValueError if the file has 0 or >1 baseline_phash lines.
"""
text = config_path.read_text(encoding="utf-8")
matches = list(_BASELINE_PHASH_LINE.finditer(text))
if len(matches) != 1:
raise ValueError(
f"{config_path}: expected exactly 1 baseline_phash line, found {len(matches)}"
)
m = matches[0]
old_phash = m.group(2)
new_text = text[: m.start(2)] + new_phash + text[m.end(2) :]
config_path.write_text(new_text, encoding="utf-8")
return old_phash
def _save_rebase_frame(
frame, cfg, fires_dir: Path, now: float, audit: "_AuditLike | None" = None,
) -> "Path | None":
"""Annotate frame with a red rectangle around canary.roi + save to fires_dir.
Purpose: let the user eyeball exactly what pixels will be hashed before
confirming /rebase. Fail-safe: any error → None (alert still sends text-only).
"""
try:
import cv2 # type: ignore[import-untyped]
except ImportError as exc:
if audit is not None:
try:
audit.log({"ts": now, "event": "snapshot_fail", "label": "rebase", "error": f"cv2 missing: {exc}"})
except Exception:
pass
return None
try:
ts_str = datetime.fromtimestamp(now).strftime("%Y%m%d_%H%M%S")
fpath = fires_dir / f"{ts_str}_rebase.png"
annotated = frame.copy()
r = cfg.canary.roi
cv2.rectangle(annotated, (r.x, r.y), (r.x + r.w, r.y + r.h), (0, 0, 255), 2)
cv2.imwrite(str(fpath), annotated)
return fpath
except Exception as exc:
if audit is not None:
try:
audit.log({"ts": now, "event": "snapshot_fail", "label": "rebase", "error": str(exc)})
except Exception:
pass
return None
def _save_inspect_frame(
frame,
cfg,
@@ -1014,6 +1073,9 @@ class RunContext:
state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start
levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now)
lifecycle: Any = None # LifecycleState — window + user_paused tracking
# Pending /rebase proposal: (proposed_ts, new_phash, config_path) or None.
# One slot; a second /rebase overwrites. `/rebase confirm` applies if within TTL.
pending_rebase: Any = None
@dataclass
@@ -1454,12 +1516,15 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
kind="screenshot", title=title_r,
body="\n\n".join(body_parts), image_path=path_r,
))
elif cmd.action == "rebase":
await _dispatch_rebase(ctx, cmd)
elif cmd.action == "help":
body = (
"/status — stare FSM, uptime, ultima detecție\n"
"/ss — screenshot acum\n"
"/pause — oprește detecția (heartbeat continuă)\n"
"/resume — reia detecția (șterge user-pause și drift-pause)\n"
"/rebase — propune phash nou pentru canary (confirm cu /rebase confirm)\n"
"/3 — screenshot automat la fiecare 3 min (sau orice număr)\n"
"/stop — oprește screenshot-urile automate\n"
"/h — acest mesaj"
@@ -1467,6 +1532,148 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
ctx.notifier.send(Alert(kind="status", title="Comenzi ATM", body=body))
_REBASE_TTL_S = 180.0
async def _dispatch_rebase(ctx: RunContext, cmd) -> None:
"""/rebase flow: propose (no value) captures+hashes, /rebase confirm (value=1) applies.
Propose stores (ts, new_phash, config_path) on ctx.pending_rebase. Confirm
rewrites the active config's baseline_phash, clears user + drift pauses,
mirrors the new phash into the in-memory cfg. TTL = 180s.
"""
cfg = ctx.cfg
confirm = cmd.value == 1
if not confirm:
now_r = time.time()
if cfg.window_title:
title = await asyncio.to_thread(_focus_window_by_title, cfg.window_title)
ctx.audit.log({"ts": now_r, "event": "window_focused", "command": "rebase", "title": title})
await asyncio.sleep(0.15)
frame = await asyncio.to_thread(ctx.capture)
if frame is None:
ctx.notifier.send(Alert(
kind="warn",
title="Captură eșuată — /rebase anulat",
body="Adu TradeStation în față și reîncearcă.",
))
return
from atm.vision import crop_roi, hamming_hex, phash as _phash
try:
roi_crop = crop_roi(frame, cfg.canary.roi)
except Exception as exc:
ctx.notifier.send(Alert(
kind="warn",
title="Crop canary ROI eșuat",
body=f"{exc}",
))
return
new_phash = _phash(roi_crop)
old_phash = cfg.canary.baseline_phash
distance = hamming_hex(old_phash, new_phash)
config_name = getattr(cfg, "config_version", None) or "unknown"
if config_name in ("", "unknown"):
ctx.notifier.send(Alert(
kind="warn",
title="Rebase imposibil — config necunoscut",
body="cfg.config_version lipsă; rulează `atm calibrate` în loc.",
))
return
config_path = Path("configs") / f"{config_name}.toml"
ctx.pending_rebase = (now_r, new_phash, config_path)
path_r = await asyncio.to_thread(
_save_rebase_frame, frame, cfg, ctx.fires_dir, now_r, ctx.audit,
)
ctx.audit.log({
"ts": now_r, "event": "rebase_proposed",
"old_phash": old_phash, "new_phash": new_phash,
"distance": distance, "config": config_name,
"screenshot_path": str(path_r) if path_r else None,
})
body = (
f"old: {old_phash[:16]}\n"
f"new: {new_phash[:16]}\n"
f"distance: {distance} (threshold={cfg.canary.drift_threshold})\n"
f"config: {config_name}\n\n"
f"Trimite /rebase confirm în {int(_REBASE_TTL_S)}s pentru a aplica."
)
ctx.notifier.send(Alert(
kind="screenshot",
title="Canary rebase — confirmi?",
body=body,
image_path=path_r,
))
return
# confirm path
pending = ctx.pending_rebase
now_r = time.time()
if pending is None:
ctx.notifier.send(Alert(
kind="warn",
title="Nimic de confirmat",
body="Trimite mai întâi /rebase pentru a propune un phash nou.",
))
return
proposed_ts, new_phash, config_path = pending
if now_r - proposed_ts > _REBASE_TTL_S:
ctx.pending_rebase = None
ctx.notifier.send(Alert(
kind="warn",
title="Rebase expirat",
body=f"Propunerea a depășit {int(_REBASE_TTL_S)}s. Trimite /rebase din nou.",
))
return
try:
old_phash = await asyncio.to_thread(_rewrite_baseline_phash, config_path, new_phash)
except Exception as exc:
ctx.audit.log({
"ts": now_r, "event": "rebase_failed",
"error": str(exc), "config": str(config_path),
})
ctx.notifier.send(Alert(
kind="warn",
title="Rebase eșuat — config nemodificat",
body=f"{exc}",
))
ctx.pending_rebase = None
return
# CanaryRegion is a frozen dataclass — bypass the freeze to mirror the new
# hash into the live cfg so the very next canary.check() uses it. Without
# this, the TOML is updated but in-memory canary.baseline_phash stays stale
# and drift re-pauses immediately after /rebase confirm.
object.__setattr__(cfg.canary, "baseline_phash", new_phash)
was_drift = bool(getattr(ctx.canary, "is_paused", False))
was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False
if ctx.lifecycle is not None:
ctx.lifecycle.user_paused = False
if was_drift:
ctx.canary.resume()
ctx.pending_rebase = None
ctx.audit.log({
"ts": now_r, "event": "rebase_applied",
"old_phash": old_phash, "new_phash": new_phash,
"config": str(config_path),
"was_drift": was_drift, "was_user": was_user,
})
ctx.notifier.send(Alert(
kind="status",
title="Canary rebase aplicat",
body=(
f"new: {new_phash[:16]}\n"
f"config: {config_path.name}\n"
f"Detecția repornită."
),
))
async def _drain_cmd_queue(ctx: RunContext) -> None:
"""Drain all pending commands, isolating each dispatch in try/except.

View File

@@ -36,6 +36,20 @@ def test_parse_resume_force():
assert cmd.value == 1
def test_parse_rebase_plain():
p = _make_poller()
assert p._parse_command("rebase") == Command(action="rebase")
assert p._parse_command("/rebase") == Command(action="rebase")
def test_parse_rebase_confirm():
p = _make_poller()
cmd = p._parse_command("rebase confirm")
assert cmd is not None
assert cmd.action == "rebase"
assert cmd.value == 1
def test_parse_help():
p = _make_poller()
assert p._parse_command("h") == Command(action="help")

View File

@@ -30,6 +30,9 @@ def _mock_config_class(cfg=None):
"""Return a Config-like class whose load_current() returns *cfg*."""
if cfg is None:
cfg = MagicMock()
# window_title must be a real falsy value, not a MagicMock auto-attribute;
# otherwise _cmd_run enters _focus_window_by_title and TypeErrors.
cfg.window_title = None
mock_cls = MagicMock()
mock_cls.load_current.return_value = cfg
return mock_cls
@@ -783,6 +786,9 @@ def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
cfg = MagicMock()
cfg.telegram.auto_poll_interval_s = 180
cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
# Skip window focus in dispatch tests — MagicMock window_title would
# propagate into _focus_window_by_title (real Win32 call).
cfg.window_title = None
state = _main._LoopState(start=0.0)
ctx = _main.RunContext(
@@ -896,6 +902,234 @@ async def test_resume_force_alias_still_works():
assert resumed and resumed[0]["force"] is True
# ---------------------------------------------------------------------------
# /rebase: propose + confirm flow
# ---------------------------------------------------------------------------
def _rebase_cfg_and_path(tmp_path):
"""Write a minimal TOML with baseline_phash, return (cfg_live, path, cfg_version)."""
name = "rebase_test"
path = tmp_path / "configs" / f"{name}.toml"
path.parent.mkdir(parents=True, exist_ok=True)
old = "a" * 64
path.write_text(
f'[canary]\n'
f'baseline_phash = "{old}" # comment stays\n'
f'drift_threshold = 8\n'
f'[canary.roi]\nx = 0\ny = 0\nw = 4\nh = 4\n',
encoding="utf-8",
)
cfg = types.SimpleNamespace(
window_title=None,
config_version=name,
canary=types.SimpleNamespace(
roi=types.SimpleNamespace(x=0, y=0, w=4, h=4),
baseline_phash=old,
drift_threshold=8,
),
telegram=types.SimpleNamespace(auto_poll_interval_s=180),
operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
)
return cfg, path, old
def _blue_frame(h=200, w=200):
import numpy as _np
return _np.full((h, w, 3), (50, 80, 20), dtype=_np.uint8)
def test_rewrite_baseline_phash_updates_only_target_line(tmp_path):
import atm.main as _main
p = tmp_path / "cfg.toml"
p.write_text(
'[canary]\n'
'baseline_phash = "deadbeef" # keep this comment\n'
'drift_threshold = 8\n',
encoding="utf-8",
)
old = _main._rewrite_baseline_phash(p, "cafef00d")
assert old == "deadbeef"
txt = p.read_text(encoding="utf-8")
assert 'baseline_phash = "cafef00d" # keep this comment' in txt
assert "drift_threshold = 8" in txt
def test_rewrite_baseline_phash_raises_when_missing(tmp_path):
import atm.main as _main
p = tmp_path / "cfg.toml"
p.write_text("[canary]\ndrift_threshold = 8\n", encoding="utf-8")
with pytest.raises(ValueError):
_main._rewrite_baseline_phash(p, "ff")
@pytest.mark.asyncio
async def test_rebase_propose_sets_pending_and_does_not_touch_file(tmp_path, monkeypatch):
import atm.main as _main
from atm.commands import Command
cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
ctx = _dispatch_ctx(cfg=cfg)
ctx.capture = lambda: _blue_frame()
ctx.fires_dir = tmp_path / "fires"
ctx.fires_dir.mkdir()
monkeypatch.chdir(tmp_path)
await _main._dispatch_command(ctx, Command(action="rebase"))
assert ctx.pending_rebase is not None
proposed_ts, new_phash, pending_path = ctx.pending_rebase
assert new_phash != old
assert pending_path == Path("configs") / f"{cfg.config_version}.toml"
# File unchanged at propose time
assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
events = [e["event"] for e in ctx.audit.events]
assert "rebase_proposed" in events
@pytest.mark.asyncio
async def test_rebase_confirm_applies_and_clears_pauses(tmp_path, monkeypatch):
import atm.main as _main
from atm.commands import Command
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
canary = _Canary()
ctx = _dispatch_ctx(cfg=cfg, canary=canary)
ctx.capture = lambda: _blue_frame()
ctx.fires_dir = tmp_path / "fires"
ctx.fires_dir.mkdir()
ctx.lifecycle.user_paused = True
monkeypatch.chdir(tmp_path)
await _main._dispatch_command(ctx, Command(action="rebase"))
assert ctx.pending_rebase is not None
_, new_phash, _ = ctx.pending_rebase
await _main._dispatch_command(ctx, Command(action="rebase", value=1))
assert ctx.pending_rebase is None
assert ctx.lifecycle.user_paused is False
assert canary.is_paused is False
assert cfg.canary.baseline_phash == new_phash
txt = cfg_path.read_text(encoding="utf-8")
assert f'"{new_phash}"' in txt
assert f'"{old}"' not in txt
applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
assert applied and applied[0]["old_phash"] == old and applied[0]["new_phash"] == new_phash
@pytest.mark.asyncio
async def test_rebase_confirm_without_pending_warns(tmp_path):
import atm.main as _main
from atm.commands import Command
cfg, _, _ = _rebase_cfg_and_path(tmp_path)
ctx = _dispatch_ctx(cfg=cfg)
ctx.fires_dir = tmp_path / "fires"
ctx.fires_dir.mkdir()
await _main._dispatch_command(ctx, Command(action="rebase", value=1))
assert ctx.pending_rebase is None
alerts = ctx.notifier.alerts
assert alerts and "nimic" in alerts[0].title.lower()
applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
assert not applied
@pytest.mark.asyncio
async def test_rebase_confirm_expired_pending_warns(tmp_path, monkeypatch):
import atm.main as _main
from atm.commands import Command
cfg, cfg_path, old = _rebase_cfg_and_path(tmp_path)
ctx = _dispatch_ctx(cfg=cfg)
ctx.fires_dir = tmp_path / "fires"
ctx.fires_dir.mkdir()
# Pretend propose happened 1h ago
import time as _t
ctx.pending_rebase = (_t.time() - 3600, "b" * 64, cfg_path)
await _main._dispatch_command(ctx, Command(action="rebase", value=1))
assert ctx.pending_rebase is None
# File untouched
assert f'"{old}"' in cfg_path.read_text(encoding="utf-8")
alerts = ctx.notifier.alerts
assert alerts and "expirat" in alerts[0].title.lower()
@pytest.mark.asyncio
async def test_rebase_confirm_mutates_frozen_canary_region(tmp_path, monkeypatch):
"""Regression: CanaryRegion is @dataclass(frozen=True). Plain assignment
raises FrozenInstanceError. /rebase confirm must mirror the new phash into
the live cfg anyway — otherwise canary.check() keeps the stale hash and
drift re-pauses within one tick after the user confirms."""
import atm.main as _main
from atm.commands import Command
from atm.config import CanaryRegion, ROI as _ROI
class _Canary:
def __init__(self): self._p = False
@property
def is_paused(self): return self._p
def resume(self): self._p = False
# Build cfg with the REAL frozen dataclass, not a SimpleNamespace.
_, cfg_path, old = _rebase_cfg_and_path(tmp_path)
cfg = types.SimpleNamespace(
window_title=None,
config_version="rebase_test",
canary=CanaryRegion(
roi=_ROI(x=0, y=0, w=4, h=4),
baseline_phash=old,
drift_threshold=8,
),
telegram=types.SimpleNamespace(auto_poll_interval_s=180),
operating_hours=types.SimpleNamespace(enabled=False, _tz_cache=None),
)
ctx = _dispatch_ctx(cfg=cfg, canary=_Canary())
ctx.capture = lambda: _blue_frame()
ctx.fires_dir = tmp_path / "fires"
ctx.fires_dir.mkdir()
monkeypatch.chdir(tmp_path)
await _main._dispatch_command(ctx, Command(action="rebase"))
_, new_phash, _ = ctx.pending_rebase
await _main._dispatch_command(ctx, Command(action="rebase", value=1))
# Live cfg mirrors the new hash (in-memory canary.check() sees it)
assert cfg.canary.baseline_phash == new_phash
# TOML on disk also updated
assert f'"{new_phash}"' in cfg_path.read_text(encoding="utf-8")
applied = [e for e in ctx.audit.events if e.get("event") == "rebase_applied"]
assert applied
@pytest.mark.asyncio
async def test_rebase_propose_capture_failed_warns(tmp_path, monkeypatch):
import atm.main as _main
from atm.commands import Command
cfg, _, _ = _rebase_cfg_and_path(tmp_path)
ctx = _dispatch_ctx(cfg=cfg)
ctx.capture = lambda: None
ctx.fires_dir = tmp_path / "fires"
ctx.fires_dir.mkdir()
monkeypatch.chdir(tmp_path)
await _main._dispatch_command(ctx, Command(action="rebase"))
assert ctx.pending_rebase is None
alerts = ctx.notifier.alerts
assert alerts and "e" in alerts[0].title.lower() # "Captură eșuată"
@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
"""/resume while operating-hours window is closed → special body."""
@@ -1214,6 +1448,7 @@ async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path)
cfg = MagicMock()
cfg.telegram.auto_poll_interval_s = 180
cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
cfg.window_title = None # skip Win32 focus path in unit test
ctx = _dispatch_ctx(canary=canary, cfg=cfg)