feat(run,config): operating hours window + timezone-aware lifecycle state

Add OperatingHoursCfg (enabled/timezone/weekdays/start_hhmm/stop_hhmm) so
the run loop can align with NYSE session hours instead of the user's
local wall clock (fixes DST drift between NY and Europe/Bucharest).

- Config parses [options.operating_hours] and resolves ZoneInfo at load,
  fail-fast on invalid tz or weekday names. The tz is cached on
  _tz_cache so the detection loop pays zero per-tick cost.
- LifecycleState tracks user_paused + last_window_state across ticks.
- Module-scope _should_skip(now, state, cfg, canary) returns skip reason
  or None. Weekday check uses datetime.weekday() + a fixed MON..SUN list
  (locale-free; strftime('%a') is localized).
- _maybe_log_transition emits market_open / market_closed once per edge.
  R2: when last_window_state is None (startup), just seed — do not send
  a spurious market_open alert when run_live_async launches in-window.
- _run_tick consults the lifecycle guard before scheduling the heavy
  detection thread, so drain + transition logging still happen when the
  tick is skipped.
- CLI flags --tz / --weekdays / --oh-start / --oh-stop override TOML.
  (Kept distinct from the existing --start-at/--stop-at sleep-until-time
  semantics to avoid breaking current deployments — deviation noted.)
- configs/example.toml documents the new [options.operating_hours] table.

Tests: parametrized window matrix (tests #8), transition logging (#9),
notification side-effect (#10), R2 #20 startup suppression, R2 #22
locale-independent weekday, plus guards for user_paused / canary
precedence and config-parse error paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 11:59:22 +03:00
parent 8b53b8d3c9
commit 54f55752c1
5 changed files with 519 additions and 1 deletions

View File

@@ -88,6 +88,17 @@ dead_letter_path = "logs/dead_letter.jsonl"
[options.alerts] [options.alerts]
fire_on_phase_skip = true fire_on_phase_skip = true
# Operating hours — detection only runs on allowed weekdays + HH:MM window.
# Timezone is the source of truth (NYSE local); the runtime converts tick
# timestamps to this zone so DST rollovers stay aligned with the exchange.
# Override from CLI with --tz / --weekdays / --oh-start / --oh-stop.
[options.operating_hours]
enabled = false
timezone = "America/New_York"
weekdays = ["MON", "TUE", "WED", "THU", "FRI"]
start_hhmm = "09:30"
stop_hhmm = "16:00"
# Per-kind screenshot-attach toggles. All default to true on upgrade. # Per-kind screenshot-attach toggles. All default to true on upgrade.
# Accepts either a bare bool (legacy: attach_screenshots = true) or this table. # Accepts either a bare bool (legacy: attach_screenshots = true) or this table.
[options.attach_screenshots] [options.attach_screenshots]

View File

@@ -5,6 +5,9 @@ import tomllib
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Literal from typing import Literal
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
_VALID_WEEKDAYS: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
DotColor = Literal[ DotColor = Literal[
"turquoise", "yellow", "turquoise", "yellow",
@@ -97,6 +100,31 @@ class AlertsCfg:
trigger: bool = True trigger: bool = True
@dataclass
class OperatingHoursCfg:
"""Session window: only run detection on allowed weekdays within HH:MM range.
Timezone is the source of truth for the exchange (default America/New_York
for NYSE). Start/stop are compared against the clock in that timezone.
Weekday check uses datetime.weekday() + a fixed MON..SUN list to stay
locale-independent (strftime('%a') returns localized names).
The ZoneInfo is cached at config load time so the detection loop doesn't
pay per-tick lookup cost.
NOTE: this dataclass is mutable (non-frozen) so Config._from_dict can stash
the resolved ZoneInfo onto `_tz_cache` after validation. Treat fields as
read-only at runtime.
"""
enabled: bool = False
timezone: str = "America/New_York"
weekdays: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI")
start_hhmm: str = "09:30"
stop_hhmm: str = "16:00"
# Populated by Config._from_dict; None for disabled or failed-load cases.
_tz_cache: ZoneInfo | None = None
@dataclass(frozen=True) @dataclass(frozen=True)
class AlertBehaviorCfg: class AlertBehaviorCfg:
"""Alert behavior knobs (not screenshot toggles). """Alert behavior knobs (not screenshot toggles).
@@ -130,6 +158,7 @@ class Config:
dead_letter_path: str = "logs/dead_letter.jsonl" dead_letter_path: str = "logs/dead_letter.jsonl"
attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg) attach_screenshots: AlertsCfg = field(default_factory=AlertsCfg)
alerts: AlertBehaviorCfg = field(default_factory=AlertBehaviorCfg) alerts: AlertBehaviorCfg = field(default_factory=AlertBehaviorCfg)
operating_hours: OperatingHoursCfg = field(default_factory=OperatingHoursCfg)
config_version: str = "unknown" config_version: str = "unknown"
def __post_init__(self) -> None: def __post_init__(self) -> None:
@@ -202,6 +231,31 @@ class Config:
alert_behavior = AlertBehaviorCfg( alert_behavior = AlertBehaviorCfg(
fire_on_phase_skip=bool(alerts_dict.get("fire_on_phase_skip", True)), fire_on_phase_skip=bool(alerts_dict.get("fire_on_phase_skip", True)),
) )
oh_dict = opts.get("operating_hours", {}) or {}
oh_weekdays = tuple(
str(w).upper() for w in oh_dict.get("weekdays", ("MON", "TUE", "WED", "THU", "FRI"))
)
for wd in oh_weekdays:
if wd not in _VALID_WEEKDAYS:
raise ValueError(
f"operating_hours.weekdays contains invalid day {wd!r}; "
f"expected any of {_VALID_WEEKDAYS}"
)
oh = OperatingHoursCfg(
enabled=bool(oh_dict.get("enabled", False)),
timezone=str(oh_dict.get("timezone", "America/New_York")),
weekdays=oh_weekdays,
start_hhmm=str(oh_dict.get("start_hhmm", "09:30")),
stop_hhmm=str(oh_dict.get("stop_hhmm", "16:00")),
)
if oh.enabled:
try:
oh._tz_cache = ZoneInfo(oh.timezone)
except ZoneInfoNotFoundError as exc:
raise ValueError(
f"operating_hours.timezone {oh.timezone!r} invalid: {exc}"
) from exc
return cls( return cls(
window_title=data["window_title"], window_title=data["window_title"],
dot_roi=roi, dot_roi=roi,
@@ -222,5 +276,6 @@ class Config:
dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"), dead_letter_path=opts.get("dead_letter_path", "logs/dead_letter.jsonl"),
attach_screenshots=attach, attach_screenshots=attach,
alerts=alert_behavior, alerts=alert_behavior,
operating_hours=oh,
config_version=version, config_version=version,
) )

View File

@@ -92,6 +92,23 @@ def main(argv=None) -> None:
help="Stop at local HH:MM (overrides --duration). If the time is in " help="Stop at local HH:MM (overrides --duration). If the time is in "
"the past when the loop starts, rolls over to tomorrow.", "the past when the loop starts, rolls over to tomorrow.",
) )
p_run.add_argument(
"--tz", metavar="ZONE", default=None,
help="Override operating_hours.timezone (e.g. America/New_York).",
)
p_run.add_argument(
"--weekdays", metavar="DAYS", default=None,
help="Override operating_hours.weekdays. Accepts comma list "
"(MON,TUE) or range (MON-FRI).",
)
p_run.add_argument(
"--oh-start", metavar="HH:MM", default=None,
help="Override operating_hours.start_hhmm (exchange-local).",
)
p_run.add_argument(
"--oh-stop", metavar="HH:MM", default=None,
help="Override operating_hours.stop_hhmm (exchange-local).",
)
# journal # journal
p_journal = sub.add_parser("journal", help="Add a trade journal entry interactively") p_journal = sub.add_parser("journal", help="Add a trade journal entry interactively")
@@ -171,6 +188,7 @@ def _cmd_dryrun(args) -> None:
def _cmd_run(args) -> None: def _cmd_run(args) -> None:
cfg = Config.load_current(Path("configs")) cfg = Config.load_current(Path("configs"))
cfg = _apply_operating_hours_cli_overrides(cfg, args)
capture_stub = args.capture_stub or bool(os.environ.get("ATM_STUB_CAPTURE")) capture_stub = args.capture_stub or bool(os.environ.get("ATM_STUB_CAPTURE"))
# --start-at HH:MM: sleep until the next occurrence of that local wall-clock time # --start-at HH:MM: sleep until the next occurrence of that local wall-clock time
@@ -230,6 +248,66 @@ def _cmd_run(args) -> None:
run_live(cfg, duration_s=duration_s, capture_stub=capture_stub) run_live(cfg, duration_s=duration_s, capture_stub=capture_stub)
_WEEKDAY_ORDER = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
def _parse_weekdays_arg(raw: str) -> tuple[str, ...]:
"""Accept 'MON,TUE,WED' or 'MON-FRI'. Case-insensitive."""
txt = raw.strip().upper()
if "-" in txt and "," not in txt:
a, b = (p.strip() for p in txt.split("-", 1))
if a not in _WEEKDAY_ORDER or b not in _WEEKDAY_ORDER:
raise ValueError(f"unknown weekday(s) in range {raw!r}")
i, j = _WEEKDAY_ORDER.index(a), _WEEKDAY_ORDER.index(b)
if i > j:
raise ValueError(f"weekday range reversed: {raw!r}")
return tuple(_WEEKDAY_ORDER[i : j + 1])
days = tuple(d.strip() for d in txt.split(",") if d.strip())
for d in days:
if d not in _WEEKDAY_ORDER:
raise ValueError(f"unknown weekday {d!r} (valid: {_WEEKDAY_ORDER})")
return days
def _apply_operating_hours_cli_overrides(cfg, args):
"""Return cfg (possibly new) with operating_hours overridden by CLI flags.
Config is a frozen dataclass, but operating_hours is non-frozen by design
so we can tweak it in-place and recompute the tz cache. CLI flags implicitly
enable operating_hours even if the TOML had it disabled.
"""
import dataclasses as _dc
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
oh = cfg.operating_hours
any_override = any(
getattr(args, k, None)
for k in ("tz", "weekdays", "oh_start", "oh_stop")
)
if not any_override:
return cfg
new_tz = args.tz if args.tz else oh.timezone
try:
tz_cache = ZoneInfo(new_tz)
except ZoneInfoNotFoundError as exc:
sys.exit(f"--tz {new_tz!r} invalid: {exc}")
new_weekdays = _parse_weekdays_arg(args.weekdays) if args.weekdays else oh.weekdays
new_start = args.oh_start if args.oh_start else oh.start_hhmm
new_stop = args.oh_stop if args.oh_stop else oh.stop_hhmm
oh.enabled = True
oh.timezone = new_tz
oh.weekdays = new_weekdays
oh.start_hhmm = new_start
oh.stop_hhmm = new_stop
oh._tz_cache = tz_cache
# Config is frozen but operating_hours is a mutable field object —
# mutating it in place is sufficient; no dataclasses.replace needed.
_ = _dc # keep import for future use
return cfg
def _cmd_journal(args) -> None: def _cmd_journal(args) -> None:
try: try:
from atm.journal import Journal, prompt_entry from atm.journal import Journal, prompt_entry
@@ -579,6 +657,7 @@ class RunContext:
cmd_queue: Any # asyncio.Queue[Command] cmd_queue: Any # asyncio.Queue[Command]
state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start state: Any # carries first_accepted, last_saved_color, levels_extractor, fire_count, start
levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now) levels_extractor_factory: Callable # builds LevelsExtractor(cfg, trigger, now)
lifecycle: Any = None # LifecycleState — window + user_paused tracking
@dataclass @dataclass
@@ -591,6 +670,92 @@ class _LoopState:
start: float = 0.0 start: float = 0.0
@dataclass
class LifecycleState:
"""Tracks user-pause / out-of-window state across detection ticks.
last_window_state: None at startup so _maybe_log_transition can seed it
without emitting a spurious market_open alert on the first in-window tick.
"""
user_paused: bool = False
last_window_state: str | None = None # "open" / "closed" / None (uninitialized)
# Locale-independent weekday names; index matches datetime.weekday() (MON=0).
_WEEKDAY_NAMES: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None:
"""Return a reason string if detection should be skipped, else None.
Order: user_paused > canary drift > operating-hours window. Uses the
ZoneInfo cached on cfg.operating_hours._tz_cache (populated at config load)
to avoid per-tick tz lookup cost.
"""
if state.user_paused:
return "user_paused"
if getattr(canary, "is_paused", False):
return "drift_paused"
oh = getattr(cfg, "operating_hours", None)
if oh is None or not oh.enabled:
return None
tz = getattr(oh, "_tz_cache", None)
if tz is None:
# Enabled but no tz resolved — skip the check rather than crash mid-loop.
return None
now_exchange = datetime.fromtimestamp(now_ts, tz=tz)
# weekday() = 0..6 (MON..SUN). Locale-free; strftime('%a') is not.
if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays:
return "out_of_window_weekend"
hhmm = now_exchange.strftime("%H:%M")
if hhmm < oh.start_hhmm or hhmm >= oh.stop_hhmm:
return "out_of_window_hours"
return None
def _maybe_log_transition(
reason: str | None,
state: LifecycleState,
now: float,
audit: _AuditLike,
notifier: _NotifierLike,
) -> None:
"""Log market_open / market_closed exactly once per transition.
Startup guard (R2): when last_window_state is None we just seed it; no
alert/audit event is emitted for the initial evaluation. This prevents a
spurious market_open alert when run_live_async starts in-window.
"""
if reason is None:
window_reason = "open"
elif reason.startswith("out_of_window"):
window_reason = "closed"
else:
# user_paused / drift_paused don't change market window state
return
if window_reason == state.last_window_state:
return
if state.last_window_state is None:
state.last_window_state = window_reason
return
event_name = "market_open" if window_reason == "open" else "market_closed"
audit.log({"ts": now, "event": event_name, "reason": reason})
body = (
"Piața închisă — monitorizare pauzată până la următoarea deschidere"
if event_name == "market_closed"
else "Piața deschisă — monitorizare reluată"
)
notifier.send(Alert(
kind="status",
title=event_name.replace("_", " ").title(),
body=body,
))
state.last_window_state = window_reason
def _sync_detection_tick( def _sync_detection_tick(
capture: Callable, capture: Callable,
canary: Any, canary: Any,
@@ -683,8 +848,20 @@ def _sync_detection_tick(
async def _run_tick(ctx: RunContext) -> _TickSyncResult: async def _run_tick(ctx: RunContext) -> _TickSyncResult:
"""Execute one `_sync_detection_tick` in a thread; returns result or empty.""" """Execute one `_sync_detection_tick` in a thread; returns result or empty.
Lifecycle gating (user pause / operating hours / drift) happens here, not
inside the sync tick, so the async loop can still drain commands and emit
market_open / market_closed transitions even when the heavy detection
work is skipped.
"""
now = time.time() now = time.time()
if ctx.lifecycle is not None:
skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary)
_maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier)
if skip is not None:
# No detection this tick. Empty result → _handle_fsm_result no-op.
return _TickSyncResult()
return await asyncio.to_thread( return await asyncio.to_thread(
_sync_detection_tick, _sync_detection_tick,
ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm, ctx.capture, ctx.canary, ctx.cfg, ctx.detector, ctx.fsm,
@@ -926,12 +1103,19 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
) )
poller = TelegramPoller(cfg.telegram, cmd_queue, audit) poller = TelegramPoller(cfg.telegram, cmd_queue, audit)
lifecycle = LifecycleState()
# Seed lifecycle.last_window_state with the current status so we don't emit
# a spurious market_open alert on the very first tick (R2).
_pre_skip = _should_skip(time.time(), lifecycle, cfg, canary)
_maybe_log_transition(_pre_skip, lifecycle, time.time(), audit, notifier)
ctx = RunContext( ctx = RunContext(
cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm, cfg=cfg, capture=capture, canary=canary, detector=detector, fsm=fsm,
notifier=notifier, audit=audit, detection_log=detection_log, notifier=notifier, audit=audit, detection_log=detection_log,
scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir, scheduler=scheduler, samples_dir=samples_dir, fires_dir=fires_dir,
cmd_queue=cmd_queue, state=loop_state, cmd_queue=cmd_queue, state=loop_state,
levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts), levels_extractor_factory=lambda _cfg, trigger, now_ts: LevelsExtractor(_cfg, trigger, now_ts),
lifecycle=lifecycle,
) )
# ------------------------------------------------------------------ # ------------------------------------------------------------------

View File

@@ -97,3 +97,59 @@ def test_attach_screenshots_unknown_keys_ignored() -> None:
})) }))
assert cfg.attach_screenshots.arm is False assert cfg.attach_screenshots.arm is False
# Should not raise even with unknown key # Should not raise even with unknown key
# ---------------------------------------------------------------------------
# Commit 3: AlertBehaviorCfg (fire_on_phase_skip)
# ---------------------------------------------------------------------------
def test_alerts_default_fire_on_phase_skip_true() -> None:
cfg = Config._from_dict(_with_opts({}))
assert cfg.alerts.fire_on_phase_skip is True
def test_alerts_fire_on_phase_skip_can_be_disabled() -> None:
cfg = Config._from_dict(_with_opts({"alerts": {"fire_on_phase_skip": False}}))
assert cfg.alerts.fire_on_phase_skip is False
# ---------------------------------------------------------------------------
# Commit 4: OperatingHoursCfg parsing + tz cache
# ---------------------------------------------------------------------------
def test_operating_hours_default_disabled() -> None:
cfg = Config._from_dict(_with_opts({}))
assert cfg.operating_hours.enabled is False
assert cfg.operating_hours.timezone == "America/New_York"
assert cfg.operating_hours._tz_cache is None
def test_operating_hours_enabled_caches_tz() -> None:
cfg = Config._from_dict(_with_opts({
"operating_hours": {
"enabled": True,
"timezone": "America/New_York",
"weekdays": ["MON", "TUE", "WED", "THU", "FRI"],
"start_hhmm": "09:30",
"stop_hhmm": "16:00",
}
}))
assert cfg.operating_hours.enabled is True
assert cfg.operating_hours._tz_cache is not None
assert str(cfg.operating_hours._tz_cache) == "America/New_York"
def test_operating_hours_invalid_tz_raises_valueerror() -> None:
import pytest
with pytest.raises(ValueError, match="operating_hours.timezone"):
Config._from_dict(_with_opts({
"operating_hours": {"enabled": True, "timezone": "Not/A_Zone"},
}))
def test_operating_hours_invalid_weekday_raises_valueerror() -> None:
import pytest
with pytest.raises(ValueError, match="weekdays"):
Config._from_dict(_with_opts({
"operating_hours": {"enabled": True, "weekdays": ["XYZ"]},
}))

View File

@@ -537,3 +537,215 @@ async def test_drain_isolates_dispatch_exceptions(monkeypatch):
# command_error audit event # command_error audit event
errs = [e for e in ctx.audit.events if e.get("event") == "command_error"] errs = [e for e in ctx.audit.events if e.get("event") == "command_error"]
assert len(errs) == 1 and errs[0]["action"] == "status" assert len(errs) == 1 and errs[0]["action"] == "status"
# ---------------------------------------------------------------------------
# Commit 4: operating hours + LifecycleState transitions
# ---------------------------------------------------------------------------
from zoneinfo import ZoneInfo as _ZI # noqa: E402
import datetime as _dt # noqa: E402
def _oh_cfg(enabled=True, weekdays=("MON", "TUE", "WED", "THU", "FRI"),
start="09:30", stop="16:00", tz="America/New_York"):
"""Build a lightweight cfg-like object with operating_hours populated."""
oh = types.SimpleNamespace(
enabled=enabled,
timezone=tz,
weekdays=weekdays,
start_hhmm=start,
stop_hhmm=stop,
_tz_cache=_ZI(tz) if enabled else None,
)
return types.SimpleNamespace(operating_hours=oh)
def _fake_canary(paused=False):
return types.SimpleNamespace(is_paused=paused)
@pytest.mark.parametrize(
"local_dt,expected",
[
# Monday 09:30 NY — exact open → active (None)
(_dt.datetime(2026, 4, 20, 9, 30), None),
# Monday 16:00 NY — exact close → inactive (>= stop)
(_dt.datetime(2026, 4, 20, 16, 0), "out_of_window_hours"),
# Monday 08:00 NY — before open
(_dt.datetime(2026, 4, 20, 8, 0), "out_of_window_hours"),
# Monday 12:00 NY — active
(_dt.datetime(2026, 4, 20, 12, 0), None),
# Saturday 12:00 NY — weekend
(_dt.datetime(2026, 4, 18, 12, 0), "out_of_window_weekend"),
# Sunday 23:00 NY — weekend
(_dt.datetime(2026, 4, 19, 23, 0), "out_of_window_weekend"),
],
)
def test_operating_hours_skip_matrix(local_dt, expected):
"""Timezone-aware start/stop + weekday checks."""
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
now_ts = local_dt.replace(tzinfo=tz).timestamp()
lifecycle = _main.LifecycleState()
result = _main._should_skip(now_ts, lifecycle, cfg, _fake_canary())
assert result == expected
def test_market_open_close_transitions_logged_once():
"""Crossing a boundary emits exactly one market_open / market_closed event."""
import atm.main as _main
audit_events = []
alerts = []
class _A:
def log(self, e): audit_events.append(e)
class _N:
def send(self, a): alerts.append(a)
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState()
canary = _fake_canary()
# Prime as closed (before open, Monday 08:00)
pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
skip_pre = _main._should_skip(pre_open, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_pre, lifecycle, pre_open, _A(), _N())
# First evaluation seeds state, no alert yet.
assert lifecycle.last_window_state == "closed"
assert alerts == []
assert audit_events == []
# Transition to open
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
skip_mid = _main._should_skip(mid, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_mid, lifecycle, mid, _A(), _N())
assert lifecycle.last_window_state == "open"
assert len(alerts) == 1
assert any(e.get("event") == "market_open" for e in audit_events)
# Repeated open tick — no duplicate log
alerts.clear()
audit_events.clear()
skip_mid2 = _main._should_skip(mid + 60, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_mid2, lifecycle, mid + 60, _A(), _N())
assert alerts == []
assert audit_events == []
# Transition to close
close = _dt.datetime(2026, 4, 20, 17, 0, tzinfo=tz).timestamp()
skip_close = _main._should_skip(close, lifecycle, cfg, canary)
_main._maybe_log_transition(skip_close, lifecycle, close, _A(), _N())
assert lifecycle.last_window_state == "closed"
assert any(e.get("event") == "market_closed" for e in audit_events)
def test_market_transition_sends_notification():
"""market_open / market_closed transitions produce kind=status alerts."""
import atm.main as _main
alerts = []
class _A:
def log(self, e): pass
class _N:
def send(self, a): alerts.append(a)
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState(last_window_state="closed")
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
_main._maybe_log_transition(None, lifecycle, mid, _A(), _N())
assert len(alerts) == 1
assert alerts[0].kind == "status"
assert "market" in alerts[0].title.lower() or "piața" in alerts[0].body.lower()
def test_startup_in_window_suppresses_market_open():
"""R2 #20: first evaluation in-window just seeds state; no alert fires."""
import atm.main as _main
alerts = []
events = []
class _A:
def log(self, e): events.append(e)
class _N:
def send(self, a): alerts.append(a)
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState() # last_window_state is None
in_window = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
skip = _main._should_skip(in_window, lifecycle, cfg, _fake_canary())
assert skip is None
_main._maybe_log_transition(skip, lifecycle, in_window, _A(), _N())
# Seeded silently
assert lifecycle.last_window_state == "open"
assert alerts == []
assert not any(e.get("event") == "market_open" for e in events)
# Two more ticks, still in-window → no spurious alert
for _ in range(2):
skip = _main._should_skip(in_window + 60, lifecycle, cfg, _fake_canary())
_main._maybe_log_transition(skip, lifecycle, in_window + 60, _A(), _N())
assert alerts == []
def test_operating_hours_weekday_locale_independent():
"""R2 #22: weekday check must not depend on process locale (strftime('%a'))."""
import locale as _locale
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
# Saturday 12:00 NY
sat = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()
original = _locale.setlocale(_locale.LC_TIME)
try:
for loc in ("C", "de_DE.UTF-8"):
try:
_locale.setlocale(_locale.LC_TIME, loc)
except _locale.Error:
continue # locale not installed → skip gracefully
lifecycle = _main.LifecycleState()
result = _main._should_skip(sat, lifecycle, cfg, _fake_canary())
assert result == "out_of_window_weekend", (
f"locale={loc} returned {result!r}"
)
finally:
try:
_locale.setlocale(_locale.LC_TIME, original)
except _locale.Error:
_locale.setlocale(_locale.LC_TIME, "C")
def test_should_skip_user_paused_wins():
import atm.main as _main
cfg = _oh_cfg()
lifecycle = _main.LifecycleState(user_paused=True)
# Mid-Monday (in-window) — should still skip because user_paused
tz = cfg.operating_hours._tz_cache
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "user_paused"
def test_should_skip_canary_drift_wins_over_window():
import atm.main as _main
cfg = _oh_cfg()
lifecycle = _main.LifecycleState()
tz = cfg.operating_hours._tz_cache
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"