feat(lifecycle): /window + stop screenshot-uri la închiderea bursei

Fix #1: la tranziția market_closed scheduler-ul e oprit forțat și FSM
resetat la IDLE (_handle_market_closed), ca o bulină dark_* rămasă PRIMED
să nu mai trimită screenshot-uri periodice după închidere — și ca scheduler-ul
să poată reporni curat în sesiunea următoare (muchia 0->1 n_primed_global).

Fix #2: comandă Telegram /window HH:MM-HH:MM (sau HH:MM HH:MM) — fereastră
de monitorizare în ora locală, recurentă zilnic, ANDed cu operating_hours;
în afara intervalului pauză automată. /window off șterge fereastra.
Discord e webhook outbound, fără poller — comanda e doar Telegram.

DOX pass: src/atm + notifier.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 13:25:41 +03:00
parent 39f2439246
commit 6f71c1d633
6 changed files with 267 additions and 20 deletions

View File

@@ -23,7 +23,10 @@ flag-urile `atm run/debug/calibrate`.
- **`detector.py`** — debounce + rolling window peste clasificarea per-ciclu.
- **`scheduler.py`** — asyncio task; capture + cv2 rulează în `to_thread`
(nu bloca event-loop-ul). Decizia 13: scheduler cheamă `capture()` direct,
NU prin `Detector`.
NU prin `Detector`. Pornit pe muchia `0→1` `n_primed_global`, oprit pe fire/
cooled. **La `market_closed`** (`_handle_market_closed` în `main.py`) e oprit
forțat + FSM resetat la IDLE, ca o bulină `dark_*` rămasă PRIMED să nu trimită
screenshot-uri la nesfârșit după închidere și să permită repornirea curată.
- **`vision.py`** — primitive partajate: crop ROI, perceptual hash, interpolare
pixel↔preț, Hough. Singurul loc unde trăiesc primitivele cv2 reutilizabile.
- **`canary.py`** — drift de layout via phash vs `baseline_phash`. Re-anchor prin

View File

@@ -17,15 +17,30 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume", "rebase", "help"]
CommandAction = Literal[
"set_interval", "stop", "status", "ss", "pause", "resume", "rebase", "window", "help"
]
_BASE = "https://api.telegram.org/bot{token}/{method}"
_HHMM = __import__("re").compile(r"^([01]?\d|2[0-3]):[0-5]\d$")
def _norm_hhmm(s: str) -> str | None:
"""Return zero-padded HH:MM if `s` is a valid 24h time, else None."""
if not _HHMM.match(s):
return None
hh, mm = s.split(":")
return f"{int(hh):02d}:{mm}"
@dataclass
class Command:
action: CommandAction
value: int | None = None # seconds; only for set_interval
# (start_hhmm, stop_hhmm) local wall-clock; only for action="window".
# None on a "window" command means "clear the window".
window: tuple[str, str] | None = None
class TelegramPoller:
@@ -168,6 +183,19 @@ class TelegramPoller:
if t == "rebase confirm":
# value=1 applies the pending proposal; plain "rebase" captures+proposes.
return Command(action="rebase", value=1)
if t == "window" or t.startswith("window "):
rest = t[len("window"):].strip()
if rest in ("", "off", "clear"):
# window=None signals "clear the session window".
return Command(action="window", window=None)
# accept "19:40-21:45" or "19:40 21:45"
seg = rest.replace("-", " ").split()
if len(seg) == 2:
start = _norm_hhmm(seg[0])
stop = _norm_hhmm(seg[1])
if start and stop:
return Command(action="window", window=(start, stop))
return None
# "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
parts = t.split()
if len(parts) == 1 and parts[0].isdigit():

View File

@@ -1173,6 +1173,9 @@ class LifecycleState:
"""
user_paused: bool = False
last_window_state: str | None = None # "open" / "closed" / None (uninitialized)
# Local wall-clock (start_hhmm, stop_hhmm) set via /window; None = no window.
# ANDed with operating_hours: detection runs only when inside BOTH.
session_window: tuple[str, str] | None = None
# Locale-independent weekday names; index matches datetime.weekday() (MON=0).
@@ -1197,21 +1200,22 @@ def _in_trading_window(now_ts: float, cfg) -> bool:
def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None:
"""Return a reason string if detection should be skipped, else None.
Order: user_paused > canary drift > operating-hours window. Uses the
ZoneInfo cached on cfg.operating_hours._tz_cache (populated at config load)
to avoid per-tick tz lookup cost.
Order: user_paused > canary drift > operating-hours window > session window.
Uses the ZoneInfo cached on cfg.operating_hours._tz_cache (populated at
config load) to avoid per-tick tz lookup cost. The session window (set via
/window) is local wall-clock and ANDed with operating_hours: outside EITHER
window detection is skipped.
"""
if state.user_paused:
return "user_paused"
if getattr(canary, "is_paused", False):
return "drift_paused"
oh = getattr(cfg, "operating_hours", None)
if oh is None or not oh.enabled:
return None
if oh is not None and oh.enabled:
tz = getattr(oh, "_tz_cache", None)
if not isinstance(tz, tzinfo):
# Enabled but no tz resolved (or mock cfg in tests) — skip rather than crash.
return None
# Enabled but no tz resolved (or mock cfg in tests) — skip the oh check
# rather than crash; the session window below may still apply.
if isinstance(tz, tzinfo):
now_exchange = datetime.fromtimestamp(now_ts, tz=tz)
# weekday() = 0..6 (MON..SUN). Locale-free; strftime('%a') is not.
if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays:
@@ -1219,6 +1223,12 @@ def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | Non
hhmm = now_exchange.strftime("%H:%M")
if hhmm < oh.start_hhmm or hhmm >= oh.stop_hhmm:
return "out_of_window_hours"
sw = getattr(state, "session_window", None)
if sw is not None:
# Local wall-clock (no tz) — matches --start-at/--stop-at convention.
hhmm_local = datetime.fromtimestamp(now_ts).strftime("%H:%M")
if hhmm_local < sw[0] or hhmm_local >= sw[1]:
return "out_of_window_hours"
return None
@@ -1412,6 +1422,8 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult:
title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title)
ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title})
await asyncio.sleep(0.15)
elif transition == "market_closed":
_handle_market_closed(ctx, now)
if skip is not None:
# No detection this tick. Empty result → _handle_fsm_result no-op.
return _TickSyncResult()
@@ -1496,6 +1508,33 @@ def _commit_layout_change(
ctx.detector = new_charts[0].detector
def _reset_charts_fsm(ctx: RunContext) -> None:
"""Reset every chart's FSM to IDLE and clear the primed counter.
Used on the market_closed transition so a stale PRIMED dot (dark_green/
dark_red) doesn't keep the screenshot scheduler alive past close — and so
the 0→1 primed edge can re-arm the scheduler cleanly in the next session.
"""
from atm.state_machine import StateMachine
for c in ctx.charts:
c.fsm = StateMachine(lockout_s=ctx.cfg.lockout_s)
ctx.state.n_primed_global = 0
if ctx.charts:
ctx.fsm = ctx.charts[0].fsm
else:
# Single-chart / legacy path keeps the FSM on ctx.fsm directly.
ctx.fsm = StateMachine(lockout_s=ctx.cfg.lockout_s)
def _handle_market_closed(ctx: RunContext, now: float) -> None:
"""Stop periodic screenshots and reset FSM when the market window closes."""
if getattr(ctx.scheduler, "is_running", False):
ctx.scheduler.stop()
ctx.audit.log({"ts": now, "event": "scheduler_stopped", "reason": "market_closed"})
_reset_charts_fsm(ctx)
def _detect_strips_for_ctx(ctx: RunContext, frame: Any) -> list:
"""Lazy strip detection for ctx.cfg + frame. Empty list when no vivid pixels."""
from atm.layout import detect_strips
@@ -1540,6 +1579,8 @@ async def _run_multi_tick(ctx: RunContext) -> "list[_TickSyncResult]":
title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title)
ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title})
await asyncio.sleep(0.15)
elif transition == "market_closed":
_handle_market_closed(ctx, now)
if skip is not None:
return []
@@ -1915,6 +1956,29 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
kind="screenshot", title=title_r,
body="\n\n".join(body_parts), image_path=path_r,
))
elif cmd.action == "window":
if ctx.lifecycle is None:
return
if cmd.window is None:
ctx.lifecycle.session_window = None
ctx.audit.log({"ts": time.time(), "event": "session_window_cleared"})
ctx.notifier.send(Alert(
kind="status",
title="Fereastră de monitorizare dezactivată",
body="Monitorizez conform operating_hours (sau continuu).",
))
else:
start_w, stop_w = cmd.window
ctx.lifecycle.session_window = (start_w, stop_w)
ctx.audit.log({
"ts": time.time(), "event": "session_window_set",
"start": start_w, "stop": stop_w,
})
ctx.notifier.send(Alert(
kind="status",
title=f"Fereastră monitorizare: {start_w}{stop_w} (ora locală, zilnic)",
body="În afara intervalului monitorizarea se pauzează automat.",
))
elif cmd.action == "rebase":
await _dispatch_rebase(ctx, cmd)
elif cmd.action == "help":
@@ -1926,6 +1990,8 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
"/rebase — propune phash nou pentru canary (confirm cu /rebase confirm)\n"
"/3 — screenshot automat la fiecare 3 min (sau orice număr)\n"
"/stop — oprește screenshot-urile automate\n"
"/window 19:40-21:45 — monitorizează doar în interval (ora locală, zilnic)\n"
"/window off — dezactivează fereastra\n"
"/h — acest mesaj"
)
ctx.notifier.send(Alert(kind="status", title="Comenzi ATM", body=body))
@@ -2249,7 +2315,13 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
while True:
await asyncio.sleep(60)
if time.monotonic() > heartbeat_due:
if not _in_trading_window(time.time(), cfg):
in_window = _in_trading_window(time.time(), cfg)
sw = ctx.lifecycle.session_window if ctx.lifecycle else None
if sw is not None:
hhmm_local = datetime.fromtimestamp(time.time()).strftime("%H:%M")
if hhmm_local < sw[0] or hhmm_local >= sw[1]:
in_window = False
if not in_window:
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
continue
uptime_h = (time.monotonic() - start) / 3600

View File

@@ -17,7 +17,8 @@ comenzi live Telegram. Fan-out trimite același eveniment pe mai multe canale.
- **`commands.py`** e singurul consumator de **httpx** (async long-poll).
`TelegramNotifier` (sync) rămâne pe **requests**. Nu amesteca cele două.
- **Comenzi live:** `/ss` `/status` `/pause` `/resume` `/rebase` `/3` (interval
min) `/stop`.
min) `/stop` `/window`. **Doar Telegram** primește comenzi — Discord e webhook
outbound, fără poller.
### Contracte pe comenzi (nu le slăbi fără update aici)
@@ -35,6 +36,12 @@ comenzi live Telegram. Fan-out trimite același eveniment pe mai multe canale.
- **Drift-pause** — un singur alert Telegram pe tranziție. Cât e pauzat,
`/set_interval` e refuzat, caption-ul `/ss` avertizează că detecția e oprită,
heartbeat arată `⚠️ pauzat (drift)` în loc de `activ`.
- **`/window HH:MM-HH:MM`** (sau `HH:MM HH:MM`) — fereastră de monitorizare în
**ora locală**, **recurentă zilnic**, stocată în `LifecycleState.session_window`.
Se combină prin **AND** cu `operating_hours` (vezi `../AGENTS.md` → scheduler):
în afara intervalului `_should_skip` întoarce `out_of_window_hours` ⇒ pauză
automată (alertă „Piața închisă" o dată + scheduler oprit + FSM reset).
`/window off` (sau `clear`) șterge fereastra. Format invalid e ignorat.
## Work Guidance

View File

@@ -65,3 +65,39 @@ def test_parse_existing_commands_still_work():
assert p._parse_command("status") == Command(action="status")
assert p._parse_command("ss") == Command(action="ss")
assert p._parse_command("3") == Command(action="set_interval", value=180)
def test_parse_window_dash():
p = _make_poller()
cmd = p._parse_command("window 19:40-21:45")
assert cmd == Command(action="window", window=("19:40", "21:45"))
assert p._parse_command("/window 19:40-21:45") == cmd
def test_parse_window_space():
p = _make_poller()
assert p._parse_command("window 20:50 22:45") == Command(
action="window", window=("20:50", "22:45")
)
def test_parse_window_zero_pads_hour():
p = _make_poller()
assert p._parse_command("window 9:05-9:30") == Command(
action="window", window=("09:05", "09:30")
)
def test_parse_window_off_clears():
p = _make_poller()
assert p._parse_command("window off") == Command(action="window", window=None)
assert p._parse_command("window") == Command(action="window", window=None)
assert p._parse_command("window clear") == Command(action="window", window=None)
def test_parse_window_invalid_returns_none():
p = _make_poller()
assert p._parse_command("window 25:99-26:00") is None
assert p._parse_command("window foo") is None
assert p._parse_command("window 19:40") is None
assert p._parse_command("window 19:60-20:00") is None

View File

@@ -769,6 +769,107 @@ def test_should_skip_canary_drift_wins_over_window():
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"
# ---------------------------------------------------------------------------
# Session window (/window): local wall-clock, recurring daily, ANDed with oh
# ---------------------------------------------------------------------------
def test_should_skip_session_window_local_only():
"""operating_hours off → only the local session window gates detection."""
import atm.main as _main
cfg = _oh_cfg(enabled=False)
lifecycle = _main.LifecycleState(session_window=("19:40", "21:45"))
canary = _fake_canary()
# Naive datetimes → .timestamp() interprets them as local wall-clock, and
# _should_skip reads them back via datetime.fromtimestamp (also local), so
# this is machine-timezone independent.
inside = _dt.datetime(2026, 4, 20, 20, 30).timestamp()
before = _dt.datetime(2026, 4, 20, 19, 0).timestamp()
after = _dt.datetime(2026, 4, 20, 22, 0).timestamp()
boundary_stop = _dt.datetime(2026, 4, 20, 21, 45).timestamp() # stop is exclusive
assert _main._should_skip(inside, lifecycle, cfg, canary) is None
assert _main._should_skip(before, lifecycle, cfg, canary) == "out_of_window_hours"
assert _main._should_skip(after, lifecycle, cfg, canary) == "out_of_window_hours"
assert _main._should_skip(boundary_stop, lifecycle, cfg, canary) == "out_of_window_hours"
def test_should_skip_session_window_no_window_is_open():
import atm.main as _main
cfg = _oh_cfg(enabled=False)
lifecycle = _main.LifecycleState(session_window=None)
any_ts = _dt.datetime(2026, 4, 20, 3, 0).timestamp()
assert _main._should_skip(any_ts, lifecycle, cfg, _fake_canary()) is None
def test_should_skip_operating_hours_wins_over_session_window():
"""Out of operating_hours → skip even if the local session window is open."""
import atm.main as _main
cfg = _oh_cfg() # NY 09:30-16:00 enabled
tz = cfg.operating_hours._tz_cache
pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp() # NY pre-open
# Session window that contains the corresponding local time (so only oh skips).
local_hhmm = _dt.datetime.fromtimestamp(pre_open).strftime("%H:%M")
lifecycle = _main.LifecycleState(session_window=("00:00", "23:59"))
skip = _main._should_skip(pre_open, lifecycle, cfg, _fake_canary())
assert skip == "out_of_window_hours"
# sanity: local time really is inside the wide session window
assert "00:00" <= local_hhmm < "23:59"
def test_should_skip_session_window_ands_inside_operating_hours():
"""Inside operating_hours but outside the local session window → skip."""
import atm.main as _main
cfg = _oh_cfg() # NY 09:30-16:00 enabled
tz = cfg.operating_hours._tz_cache
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() # in NY window
local_hhmm = _dt.datetime.fromtimestamp(mid).strftime("%H:%M")
h = int(local_hhmm[:2])
# 1-minute window two hours away from the current local time → excludes it.
off = (h + 2) % 24
lifecycle = _main.LifecycleState(session_window=(f"{off:02d}:00", f"{off:02d}:01"))
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "out_of_window_hours"
def test_handle_market_closed_stops_scheduler_and_resets_fsm():
import atm.main as _main
from atm.state_machine import State, StateMachine
ctx = _dispatch_ctx()
ctx.cfg.lockout_s = 240
ctx.scheduler.is_running = True
ctx.charts = []
ctx.state.n_primed_global = 2
_main._handle_market_closed(ctx, 0.0)
assert ctx.scheduler.is_running is False
assert ctx.state.n_primed_global == 0
assert isinstance(ctx.fsm, StateMachine)
assert ctx.fsm.state == State.IDLE
assert any(
e.get("event") == "scheduler_stopped" and e.get("reason") == "market_closed"
for e in ctx.audit.events
)
@pytest.mark.asyncio
async def test_window_command_sets_and_clears_session_window():
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
await _main._dispatch_command(ctx, Command(action="window", window=("19:40", "21:45")))
assert ctx.lifecycle.session_window == ("19:40", "21:45")
assert any(e.get("event") == "session_window_set" for e in ctx.audit.events)
await _main._dispatch_command(ctx, Command(action="window", window=None))
assert ctx.lifecycle.session_window is None
assert any(e.get("event") == "session_window_cleared" for e in ctx.audit.events)
# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------