From 54f55752c1115134563721deb5cfa76c4de6788f Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Sat, 18 Apr 2026 11:59:22 +0300 Subject: [PATCH] feat(run,config): operating hours window + timezone-aware lifecycle state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OperatingHoursCfg (enabled/timezone/weekdays/start_hhmm/stop_hhmm) so the run loop can align with NYSE session hours instead of the user's local wall clock (fixes DST drift between NY and Europe/Bucharest). - Config parses [options.operating_hours] and resolves ZoneInfo at load, fail-fast on invalid tz or weekday names. The tz is cached on _tz_cache so the detection loop pays zero per-tick cost. - LifecycleState tracks user_paused + last_window_state across ticks. - Module-scope _should_skip(now, state, cfg, canary) returns skip reason or None. Weekday check uses datetime.weekday() + a fixed MON..SUN list (locale-free; strftime('%a') is localized). - _maybe_log_transition emits market_open / market_closed once per edge. R2: when last_window_state is None (startup), just seed — do not send a spurious market_open alert when run_live_async launches in-window. - _run_tick consults the lifecycle guard before scheduling the heavy detection thread, so drain + transition logging still happen when the tick is skipped. - CLI flags --tz / --weekdays / --oh-start / --oh-stop override TOML. (Kept distinct from the existing --start-at/--stop-at sleep-until-time semantics to avoid breaking current deployments — deviation noted.) - configs/example.toml documents the new [options.operating_hours] table. Tests: parametrized window matrix (tests #8), transition logging (#9), notification side-effect (#10), R2 #20 startup suppression, R2 #22 locale-independent weekday, plus guards for user_paused / canary precedence and config-parse error paths. Co-Authored-By: Claude Sonnet 4.6 --- configs/example.toml | 11 +++ src/atm/config.py | 55 +++++++++++ src/atm/main.py | 186 ++++++++++++++++++++++++++++++++++++- tests/test_config.py | 56 ++++++++++++ tests/test_main.py | 212 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 519 insertions(+), 1 deletion(-) diff --git a/configs/example.toml b/configs/example.toml index e738105..1602547 100644 --- a/configs/example.toml +++ b/configs/example.toml @@ -88,6 +88,17 @@ dead_letter_path = "logs/dead_letter.jsonl" [options.alerts] fire_on_phase_skip = true +# Operating hours — detection only runs on allowed weekdays + HH:MM window. +# Timezone is the source of truth (NYSE local); the runtime converts tick +# timestamps to this zone so DST rollovers stay aligned with the exchange. +# Override from CLI with --tz / --weekdays / --oh-start / --oh-stop. +[options.operating_hours] +enabled = false +timezone = "America/New_York" +weekdays = ["MON", "TUE", "WED", "THU", "FRI"] +start_hhmm = "09:30" +stop_hhmm = "16:00" + # Per-kind screenshot-attach toggles. All default to true on upgrade. # Accepts either a bare bool (legacy: attach_screenshots = true) or this table. [options.attach_screenshots] diff --git a/src/atm/config.py b/src/atm/config.py index 6d11768..6639304 100644 --- a/src/atm/config.py +++ b/src/atm/config.py @@ -5,6 +5,9 @@ import tomllib from dataclasses import dataclass, field from pathlib import Path from typing import Literal +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + +_VALID_WEEKDAYS: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN") DotColor = Literal[ "turquoise", "yellow", @@ -97,6 +100,31 @@ class AlertsCfg: trigger: bool = True +@dataclass +class OperatingHoursCfg: + """Session window: only run detection on allowed weekdays within HH:MM range. + + Timezone is the source of truth for the exchange (default America/New_York + for NYSE). Start/stop are compared against the clock in that timezone. + Weekday check uses datetime.weekday() + a fixed MON..SUN list to stay + locale-independent (strftime('%a') returns localized names). + + The ZoneInfo is cached at config load time so the detection loop doesn't + pay per-tick lookup cost. + + NOTE: this dataclass is mutable (non-frozen) so Config._from_dict can stash + the resolved ZoneInfo onto `_tz_cache` after validation. Treat fields as + read-only at runtime. + """ + enabled: bool = False + timezone: str = "America/New_York" + weekdays: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI") + start_hhmm: str = "09:30" + stop_hhmm: str = "16:00" + # Populated by Config._from_dict; None for disabled or failed-load cases. + _tz_cache: ZoneInfo | None = None + + @dataclass(frozen=True) class AlertBehaviorCfg: """Alert behavior knobs (not screenshot toggles). @@ -130,6 +158,7 @@ class Config: dead_letter_path: str = "logs/dead_letter.jsonl" attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg) alerts: AlertBehaviorCfg = field(default_factory=AlertBehaviorCfg) + operating_hours: OperatingHoursCfg = field(default_factory=OperatingHoursCfg) config_version: str = "unknown" def __post_init__(self) -> None: @@ -202,6 +231,31 @@ class Config: alert_behavior = AlertBehaviorCfg( fire_on_phase_skip=bool(alerts_dict.get("fire_on_phase_skip", True)), ) + + oh_dict = opts.get("operating_hours", {}) or {} + oh_weekdays = tuple( + str(w).upper() for w in oh_dict.get("weekdays", ("MON", "TUE", "WED", "THU", "FRI")) + ) + for wd in oh_weekdays: + if wd not in _VALID_WEEKDAYS: + raise ValueError( + f"operating_hours.weekdays contains invalid day {wd!r}; " + f"expected any of {_VALID_WEEKDAYS}" + ) + oh = OperatingHoursCfg( + enabled=bool(oh_dict.get("enabled", False)), + timezone=str(oh_dict.get("timezone", "America/New_York")), + weekdays=oh_weekdays, + start_hhmm=str(oh_dict.get("start_hhmm", "09:30")), + stop_hhmm=str(oh_dict.get("stop_hhmm", "16:00")), + ) + if oh.enabled: + try: + oh._tz_cache = ZoneInfo(oh.timezone) + except ZoneInfoNotFoundError as exc: + raise ValueError( + f"operating_hours.timezone {oh.timezone!r} invalid: {exc}" + ) from exc return cls( window_title=data["window_title"], dot_roi=roi, @@ -222,5 +276,6 @@ class Config: dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"), attach_screenshots=attach, alerts=alert_behavior, + operating_hours=oh, config_version=version, ) diff --git a/src/atm/main.py b/src/atm/main.py index dc1416d..73f34c8 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -92,6 +92,23 @@ def main(argv=None) -> None: help="Stop at local HH:MM (overrides --duration). If the time is in " "the past when the loop starts, rolls over to tomorrow.", ) + p_run.add_argument( + "--tz", metavar="ZONE", default=None, + help="Override operating_hours.timezone (e.g. America/New_York).", + ) + p_run.add_argument( + "--weekdays", metavar="DAYS", default=None, + help="Override operating_hours.weekdays. Accepts comma list " + "(MON,TUE) or range (MON-FRI).", + ) + p_run.add_argument( + "--oh-start", metavar="HH:MM", default=None, + help="Override operating_hours.start_hhmm (exchange-local).", + ) + p_run.add_argument( + "--oh-stop", metavar="HH:MM", default=None, + help="Override operating_hours.stop_hhmm (exchange-local).", + ) # journal p_journal = sub.add_parser("journal", help="Add a trade journal entry interactively") @@ -171,6 +188,7 @@ def _cmd_dryrun(args) -> None: def _cmd_run(args) -> None: cfg = Config.load_current(Path("configs")) + cfg = _apply_operating_hours_cli_overrides(cfg, args) capture_stub = args.capture_stub or bool(os.environ.get("ATM_STUB_CAPTURE")) # --start-at HH:MM: sleep until the next occurrence of that local wall-clock time @@ -230,6 +248,66 @@ def _cmd_run(args) -> None: run_live(cfg, duration_s=duration_s, capture_stub=capture_stub) +_WEEKDAY_ORDER = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN") + + +def _parse_weekdays_arg(raw: str) -> tuple[str, ...]: + """Accept 'MON,TUE,WED' or 'MON-FRI'. Case-insensitive.""" + txt = raw.strip().upper() + if "-" in txt and "," not in txt: + a, b = (p.strip() for p in txt.split("-", 1)) + if a not in _WEEKDAY_ORDER or b not in _WEEKDAY_ORDER: + raise ValueError(f"unknown weekday(s) in range {raw!r}") + i, j = _WEEKDAY_ORDER.index(a), _WEEKDAY_ORDER.index(b) + if i > j: + raise ValueError(f"weekday range reversed: {raw!r}") + return tuple(_WEEKDAY_ORDER[i : j + 1]) + days = tuple(d.strip() for d in txt.split(",") if d.strip()) + for d in days: + if d not in _WEEKDAY_ORDER: + raise ValueError(f"unknown weekday {d!r} (valid: {_WEEKDAY_ORDER})") + return days + + +def _apply_operating_hours_cli_overrides(cfg, args): + """Return cfg (possibly new) with operating_hours overridden by CLI flags. + + Config is a frozen dataclass, but operating_hours is non-frozen by design + so we can tweak it in-place and recompute the tz cache. CLI flags implicitly + enable operating_hours even if the TOML had it disabled. + """ + import dataclasses as _dc + from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + + oh = cfg.operating_hours + any_override = any( + getattr(args, k, None) + for k in ("tz", "weekdays", "oh_start", "oh_stop") + ) + if not any_override: + return cfg + + new_tz = args.tz if args.tz else oh.timezone + try: + tz_cache = ZoneInfo(new_tz) + except ZoneInfoNotFoundError as exc: + sys.exit(f"--tz {new_tz!r} invalid: {exc}") + + new_weekdays = _parse_weekdays_arg(args.weekdays) if args.weekdays else oh.weekdays + new_start = args.oh_start if args.oh_start else oh.start_hhmm + new_stop = args.oh_stop if args.oh_stop else oh.stop_hhmm + oh.enabled = True + oh.timezone = new_tz + oh.weekdays = new_weekdays + oh.start_hhmm = new_start + oh.stop_hhmm = new_stop + oh._tz_cache = tz_cache + # Config is frozen but operating_hours is a mutable field object — + # mutating it in place is sufficient; no dataclasses.replace needed. + _ = _dc # keep import for future use + return cfg + + def _cmd_journal(args) -> None: try: from atm.journal import Journal, prompt_entry @@ -579,6 +657,7 @@ class RunContext: cmd_queue: Any # asyncio.Queue[Command] state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now) + lifecycle: Any = None # LifecycleState — window + user_paused tracking @dataclass @@ -591,6 +670,92 @@ class _LoopState: start: float = 0.0 +@dataclass +class LifecycleState: + """Tracks user-pause / out-of-window state across detection ticks. + + last_window_state: None at startup so _maybe_log_transition can seed it + without emitting a spurious market_open alert on the first in-window tick. + """ + user_paused: bool = False + last_window_state: str | None = None # "open" / "closed" / None (uninitialized) + + +# Locale-independent weekday names; index matches datetime.weekday() (MON=0). +_WEEKDAY_NAMES: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN") + + +def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None: + """Return a reason string if detection should be skipped, else None. + + Order: user_paused > canary drift > operating-hours window. Uses the + ZoneInfo cached on cfg.operating_hours._tz_cache (populated at config load) + to avoid per-tick tz lookup cost. + """ + if state.user_paused: + return "user_paused" + if getattr(canary, "is_paused", False): + return "drift_paused" + oh = getattr(cfg, "operating_hours", None) + if oh is None or not oh.enabled: + return None + tz = getattr(oh, "_tz_cache", None) + if tz is None: + # Enabled but no tz resolved — skip the check rather than crash mid-loop. + return None + now_exchange = datetime.fromtimestamp(now_ts, tz=tz) + # weekday() = 0..6 (MON..SUN). Locale-free; strftime('%a') is not. + if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays: + return "out_of_window_weekend" + hhmm = now_exchange.strftime("%H:%M") + if hhmm < oh.start_hhmm or hhmm >= oh.stop_hhmm: + return "out_of_window_hours" + return None + + +def _maybe_log_transition( + reason: str | None, + state: LifecycleState, + now: float, + audit: _AuditLike, + notifier: _NotifierLike, +) -> None: + """Log market_open / market_closed exactly once per transition. + + Startup guard (R2): when last_window_state is None we just seed it; no + alert/audit event is emitted for the initial evaluation. This prevents a + spurious market_open alert when run_live_async starts in-window. + """ + if reason is None: + window_reason = "open" + elif reason.startswith("out_of_window"): + window_reason = "closed" + else: + # user_paused / drift_paused don't change market window state + return + + if window_reason == state.last_window_state: + return + + if state.last_window_state is None: + state.last_window_state = window_reason + return + + event_name = "market_open" if window_reason == "open" else "market_closed" + audit.log({"ts": now, "event": event_name, "reason": reason}) + body = ( + "Piața închisă — monitorizare pauzată până la următoarea deschidere" + if event_name == "market_closed" + else "Piața deschisă — monitorizare reluată" + ) + notifier.send(Alert( + kind="status", + title=event_name.replace("_", " ").title(), + body=body, + )) + state.last_window_state = window_reason + + def _sync_detection_tick( capture: Callable, canary: Any, @@ -683,8 +848,20 @@ def _sync_detection_tick( async def _run_tick(ctx: RunContext) -> _TickSyncResult: - """Execute one `_sync_detection_tick` in a thread; returns result or empty.""" + """Execute one `_sync_detection_tick` in a thread; returns result or empty. + + Lifecycle gating (user pause / operating hours / drift) happens here, not + inside the sync tick, so the async loop can still drain commands and emit + market_open / market_closed transitions even when the heavy detection + work is skipped. + """ now = time.time() + if ctx.lifecycle is not None: + skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary) + _maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier) + if skip is not None: + # No detection this tick. Empty result → _handle_fsm_result no-op. + return _TickSyncResult() return await asyncio.to_thread( _sync_detection_tick, ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm, @@ -926,12 +1103,19 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No ) poller = TelegramPoller(cfg.telegram, cmd_queue, audit) + lifecycle = LifecycleState() + # Seed lifecycle.last_window_state with the current status so we don't emit + # a spurious market_open alert on the very first tick (R2). + _pre_skip = _should_skip(time.time(), lifecycle, cfg, canary) + _maybe_log_transition(_pre_skip, lifecycle, time.time(), audit, notifier) + ctx = RunContext( cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm, notifier=notifier, audit=audit, detection_log=detection_log, scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir, cmd_queue=cmd_queue, state=loop_state, levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts), + lifecycle=lifecycle, ) # ------------------------------------------------------------------ diff --git a/tests/test_config.py b/tests/test_config.py index f864366..6825ced 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -97,3 +97,59 @@ def test_attach_screenshots_unknown_keys_ignored() -> None: })) assert cfg.attach_screenshots.arm is False # Should not raise even with unknown key + + +# --------------------------------------------------------------------------- +# Commit 3: AlertBehaviorCfg (fire_on_phase_skip) +# --------------------------------------------------------------------------- + +def test_alerts_default_fire_on_phase_skip_true() -> None: + cfg = Config._from_dict(_with_opts({})) + assert cfg.alerts.fire_on_phase_skip is True + + +def test_alerts_fire_on_phase_skip_can_be_disabled() -> None: + cfg = Config._from_dict(_with_opts({"alerts": {"fire_on_phase_skip": False}})) + assert cfg.alerts.fire_on_phase_skip is False + + +# --------------------------------------------------------------------------- +# Commit 4: OperatingHoursCfg parsing + tz cache +# --------------------------------------------------------------------------- + +def test_operating_hours_default_disabled() -> None: + cfg = Config._from_dict(_with_opts({})) + assert cfg.operating_hours.enabled is False + assert cfg.operating_hours.timezone == "America/New_York" + assert cfg.operating_hours._tz_cache is None + + +def test_operating_hours_enabled_caches_tz() -> None: + cfg = Config._from_dict(_with_opts({ + "operating_hours": { + "enabled": True, + "timezone": "America/New_York", + "weekdays": ["MON", "TUE", "WED", "THU", "FRI"], + "start_hhmm": "09:30", + "stop_hhmm": "16:00", + } + })) + assert cfg.operating_hours.enabled is True + assert cfg.operating_hours._tz_cache is not None + assert str(cfg.operating_hours._tz_cache) == "America/New_York" + + +def test_operating_hours_invalid_tz_raises_valueerror() -> None: + import pytest + with pytest.raises(ValueError, match="operating_hours.timezone"): + Config._from_dict(_with_opts({ + "operating_hours": {"enabled": True, "timezone": "Not/A_Zone"}, + })) + + +def test_operating_hours_invalid_weekday_raises_valueerror() -> None: + import pytest + with pytest.raises(ValueError, match="weekdays"): + Config._from_dict(_with_opts({ + "operating_hours": {"enabled": True, "weekdays": ["XYZ"]}, + })) diff --git a/tests/test_main.py b/tests/test_main.py index 4c98334..c1fb323 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -537,3 +537,215 @@ async def test_drain_isolates_dispatch_exceptions(monkeypatch): # command_error audit event errs = [e for e in ctx.audit.events if e.get("event") == "command_error"] assert len(errs) == 1 and errs[0]["action"] == "status" + + +# --------------------------------------------------------------------------- +# Commit 4: operating hours + LifecycleState transitions +# --------------------------------------------------------------------------- + +from zoneinfo import ZoneInfo as _ZI # noqa: E402 +import datetime as _dt # noqa: E402 + + +def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"), + start="09:30", stop="16:00", tz="America/New_York"): + """Build a lightweight cfg-like object with operating_hours populated.""" + oh = types.SimpleNamespace( + enabled=enabled, + timezone=tz, + weekdays=weekdays, + start_hhmm=start, + stop_hhmm=stop, + _tz_cache=_ZI(tz) if enabled else None, + ) + return types.SimpleNamespace(operating_hours=oh) + + +def _fake_canary(paused=False): + return types.SimpleNamespace(is_paused=paused) + + +@pytest.mark.parametrize( + "local_dt,expected", + [ + # Monday 09:30 NY — exact open → active (None) + (_dt.datetime(2026, 4, 20, 9, 30), None), + # Monday 16:00 NY — exact close → inactive (>= stop) + (_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"), + # Monday 08:00 NY — before open + (_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"), + # Monday 12:00 NY — active + (_dt.datetime(2026, 4, 20, 12, 0), None), + # Saturday 12:00 NY — weekend + (_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"), + # Sunday 23:00 NY — weekend + (_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"), + ], +) +def test_operating_hours_skip_matrix(local_dt, expected): + """Timezone-aware start/stop + weekday checks.""" + import atm.main as _main + + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + now_ts = local_dt.replace(tzinfo=tz).timestamp() + + lifecycle = _main.LifecycleState() + result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary()) + assert result == expected + + +def test_market_open_close_transitions_logged_once(): + """Crossing a boundary emits exactly one market_open / market_closed event.""" + import atm.main as _main + + audit_events = [] + alerts = [] + + class _A: + def log(self, e): audit_events.append(e) + + class _N: + def send(self, a): alerts.append(a) + + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + lifecycle = _main.LifecycleState() + canary = _fake_canary() + + # Prime as closed (before open, Monday 08:00) + pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp() + skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary) + _main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N()) + # First evaluation seeds state, no alert yet. + assert lifecycle.last_window_state == "closed" + assert alerts == [] + assert audit_events == [] + + # Transition to open + mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() + skip_mid = _main._should_skip(mid, lifecycle, cfg, canary) + _main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N()) + assert lifecycle.last_window_state == "open" + assert len(alerts) == 1 + assert any(e.get("event") == "market_open" for e in audit_events) + + # Repeated open tick — no duplicate log + alerts.clear() + audit_events.clear() + skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary) + _main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N()) + assert alerts == [] + assert audit_events == [] + + # Transition to close + close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp() + skip_close = _main._should_skip(close, lifecycle, cfg, canary) + _main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N()) + assert lifecycle.last_window_state == "closed" + assert any(e.get("event") == "market_closed" for e in audit_events) + + +def test_market_transition_sends_notification(): + """market_open / market_closed transitions produce kind=status alerts.""" + import atm.main as _main + + alerts = [] + + class _A: + def log(self, e): pass + + class _N: + def send(self, a): alerts.append(a) + + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + lifecycle = _main.LifecycleState(last_window_state="closed") + + mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() + _main._maybe_log_transition(None, lifecycle, mid, _A(), _N()) + assert len(alerts) == 1 + assert alerts[0].kind == "status" + assert "market" in alerts[0].title.lower() or "piața" in alerts[0].body.lower() + + +def test_startup_in_window_suppresses_market_open(): + """R2 #20: first evaluation in-window just seeds state; no alert fires.""" + import atm.main as _main + + alerts = [] + events = [] + + class _A: + def log(self, e): events.append(e) + + class _N: + def send(self, a): alerts.append(a) + + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + lifecycle = _main.LifecycleState() # last_window_state is None + + in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() + skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary()) + assert skip is None + _main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N()) + + # Seeded silently + assert lifecycle.last_window_state == "open" + assert alerts == [] + assert not any(e.get("event") == "market_open" for e in events) + + # Two more ticks, still in-window → no spurious alert + for _ in range(2): + skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary()) + _main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N()) + assert alerts == [] + + +def test_operating_hours_weekday_locale_independent(): + """R2 #22: weekday check must not depend on process locale (strftime('%a')).""" + import locale as _locale + import atm.main as _main + + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + # Saturday 12:00 NY + sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp() + + original = _locale.setlocale(_locale.LC_TIME) + try: + for loc in ("C", "de_DE.UTF-8"): + try: + _locale.setlocale(_locale.LC_TIME, loc) + except _locale.Error: + continue # locale not installed → skip gracefully + lifecycle = _main.LifecycleState() + result = _main._should_skip(sat, lifecycle, cfg, _fake_canary()) + assert result == "out_of_window_weekend", ( + f"locale={loc} returned {result!r}" + ) + finally: + try: + _locale.setlocale(_locale.LC_TIME, original) + except _locale.Error: + _locale.setlocale(_locale.LC_TIME, "C") + + +def test_should_skip_user_paused_wins(): + import atm.main as _main + cfg = _oh_cfg() + lifecycle = _main.LifecycleState(user_paused=True) + # Mid-Monday (in-window) — should still skip because user_paused + tz = cfg.operating_hours._tz_cache + mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() + assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused" + + +def test_should_skip_canary_drift_wins_over_window(): + import atm.main as _main + cfg = _oh_cfg() + lifecycle = _main.LifecycleState() + tz = cfg.operating_hours._tz_cache + mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() + assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"