From 6f71c1d633f7bcc272896e9d0d669a864d51f394 Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Sat, 13 Jun 2026 13:25:41 +0300 Subject: [PATCH] =?UTF-8?q?feat(lifecycle):=20/window=20+=20stop=20screens?= =?UTF-8?q?hot-uri=20la=20=C3=AEnchiderea=20bursei?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix #1: la tranziția market_closed scheduler-ul e oprit forțat și FSM resetat la IDLE (_handle_market_closed), ca o bulină dark_* rămasă PRIMED să nu mai trimită screenshot-uri periodice după închidere — și ca scheduler-ul să poată reporni curat în sesiunea următoare (muchia 0->1 n_primed_global). Fix #2: comandă Telegram /window HH:MM-HH:MM (sau HH:MM HH:MM) — fereastră de monitorizare în ora locală, recurentă zilnic, ANDed cu operating_hours; în afara intervalului pauză automată. /window off șterge fereastra. Discord e webhook outbound, fără poller — comanda e doar Telegram. DOX pass: src/atm + notifier. Co-Authored-By: Claude Opus 4.8 --- src/atm/AGENTS.md | 5 +- src/atm/commands.py | 30 ++++++++++- src/atm/main.py | 106 +++++++++++++++++++++++++++++++------ src/atm/notifier/AGENTS.md | 9 +++- tests/test_commands.py | 36 +++++++++++++ tests/test_main.py | 101 +++++++++++++++++++++++++++++++++++ 6 files changed, 267 insertions(+), 20 deletions(-) diff --git a/src/atm/AGENTS.md b/src/atm/AGENTS.md index ba0e3f8..2eba101 100644 --- a/src/atm/AGENTS.md +++ b/src/atm/AGENTS.md @@ -23,7 +23,10 @@ flag-urile `atm run/debug/calibrate`. - **`detector.py`** — debounce + rolling window peste clasificarea per-ciclu. - **`scheduler.py`** — asyncio task; capture + cv2 rulează în `to_thread` (nu bloca event-loop-ul). Decizia 13: scheduler cheamă `capture()` direct, - NU prin `Detector`. + NU prin `Detector`. Pornit pe muchia `0→1` `n_primed_global`, oprit pe fire/ + cooled. **La `market_closed`** (`_handle_market_closed` în `main.py`) e oprit + forțat + FSM resetat la IDLE, ca o bulină `dark_*` rămasă PRIMED să nu trimită + screenshot-uri la nesfârșit după închidere și să permită repornirea curată. - **`vision.py`** — primitive partajate: crop ROI, perceptual hash, interpolare pixel↔preț, Hough. Singurul loc unde trăiesc primitivele cv2 reutilizabile. - **`canary.py`** — drift de layout via phash vs `baseline_phash`. Re-anchor prin diff --git a/src/atm/commands.py b/src/atm/commands.py index d6c04d2..cea039e 100644 --- a/src/atm/commands.py +++ b/src/atm/commands.py @@ -17,15 +17,30 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume", "rebase", "help"] +CommandAction = Literal[ + "set_interval", "stop", "status", "ss", "pause", "resume", "rebase", "window", "help" +] _BASE = "https://api.telegram.org/bot{token}/{method}" +_HHMM = __import__("re").compile(r"^([01]?\d|2[0-3]):[0-5]\d$") + + +def _norm_hhmm(s: str) -> str | None: + """Return zero-padded HH:MM if `s` is a valid 24h time, else None.""" + if not _HHMM.match(s): + return None + hh, mm = s.split(":") + return f"{int(hh):02d}:{mm}" + @dataclass class Command: action: CommandAction value: int | None = None # seconds; only for set_interval + # (start_hhmm, stop_hhmm) local wall-clock; only for action="window". + # None on a "window" command means "clear the window". + window: tuple[str, str] | None = None class TelegramPoller: @@ -168,6 +183,19 @@ class TelegramPoller: if t == "rebase confirm": # value=1 applies the pending proposal; plain "rebase" captures+proposes. return Command(action="rebase", value=1) + if t == "window" or t.startswith("window "): + rest = t[len("window"):].strip() + if rest in ("", "off", "clear"): + # window=None signals "clear the session window". + return Command(action="window", window=None) + # accept "19:40-21:45" or "19:40 21:45" + seg = rest.replace("-", " ").split() + if len(seg) == 2: + start = _norm_hhmm(seg[0]) + stop = _norm_hhmm(seg[1]) + if start and stop: + return Command(action="window", window=(start, stop)) + return None # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted parts = t.split() if len(parts) == 1 and parts[0].isdigit(): diff --git a/src/atm/main.py b/src/atm/main.py index f49b988..46282a3 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -1173,6 +1173,9 @@ class LifecycleState: """ user_paused: bool = False last_window_state: str | None = None # "open" / "closed" / None (uninitialized) + # Local wall-clock (start_hhmm, stop_hhmm) set via /window; None = no window. + # ANDed with operating_hours: detection runs only when inside BOTH. + session_window: tuple[str, str] | None = None # Locale-independent weekday names; index matches datetime.weekday() (MON=0). @@ -1197,28 +1200,35 @@ def _in_trading_window(now_ts: float, cfg) -> bool: def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None: """Return a reason string if detection should be skipped, else None. - Order: user_paused > canary drift > operating-hours window. Uses the - ZoneInfo cached on cfg.operating_hours._tz_cache (populated at config load) - to avoid per-tick tz lookup cost. + Order: user_paused > canary drift > operating-hours window > session window. + Uses the ZoneInfo cached on cfg.operating_hours._tz_cache (populated at + config load) to avoid per-tick tz lookup cost. The session window (set via + /window) is local wall-clock and ANDed with operating_hours: outside EITHER + window detection is skipped. """ if state.user_paused: return "user_paused" if getattr(canary, "is_paused", False): return "drift_paused" oh = getattr(cfg, "operating_hours", None) - if oh is None or not oh.enabled: - return None - tz = getattr(oh, "_tz_cache", None) - if not isinstance(tz, tzinfo): - # Enabled but no tz resolved (or mock cfg in tests) — skip rather than crash. - return None - now_exchange = datetime.fromtimestamp(now_ts, tz=tz) - # weekday() = 0..6 (MON..SUN). Locale-free; strftime('%a') is not. - if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays: - return "out_of_window_weekend" - hhmm = now_exchange.strftime("%H:%M") - if hhmm < oh.start_hhmm or hhmm >= oh.stop_hhmm: - return "out_of_window_hours" + if oh is not None and oh.enabled: + tz = getattr(oh, "_tz_cache", None) + # Enabled but no tz resolved (or mock cfg in tests) — skip the oh check + # rather than crash; the session window below may still apply. + if isinstance(tz, tzinfo): + now_exchange = datetime.fromtimestamp(now_ts, tz=tz) + # weekday() = 0..6 (MON..SUN). Locale-free; strftime('%a') is not. + if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays: + return "out_of_window_weekend" + hhmm = now_exchange.strftime("%H:%M") + if hhmm < oh.start_hhmm or hhmm >= oh.stop_hhmm: + return "out_of_window_hours" + sw = getattr(state, "session_window", None) + if sw is not None: + # Local wall-clock (no tz) — matches --start-at/--stop-at convention. + hhmm_local = datetime.fromtimestamp(now_ts).strftime("%H:%M") + if hhmm_local < sw[0] or hhmm_local >= sw[1]: + return "out_of_window_hours" return None @@ -1412,6 +1422,8 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult: title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title) ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title}) await asyncio.sleep(0.15) + elif transition == "market_closed": + _handle_market_closed(ctx, now) if skip is not None: # No detection this tick. Empty result → _handle_fsm_result no-op. return _TickSyncResult() @@ -1496,6 +1508,33 @@ def _commit_layout_change( ctx.detector = new_charts[0].detector +def _reset_charts_fsm(ctx: RunContext) -> None: + """Reset every chart's FSM to IDLE and clear the primed counter. + + Used on the market_closed transition so a stale PRIMED dot (dark_green/ + dark_red) doesn't keep the screenshot scheduler alive past close — and so + the 0→1 primed edge can re-arm the scheduler cleanly in the next session. + """ + from atm.state_machine import StateMachine + + for c in ctx.charts: + c.fsm = StateMachine(lockout_s=ctx.cfg.lockout_s) + ctx.state.n_primed_global = 0 + if ctx.charts: + ctx.fsm = ctx.charts[0].fsm + else: + # Single-chart / legacy path keeps the FSM on ctx.fsm directly. + ctx.fsm = StateMachine(lockout_s=ctx.cfg.lockout_s) + + +def _handle_market_closed(ctx: RunContext, now: float) -> None: + """Stop periodic screenshots and reset FSM when the market window closes.""" + if getattr(ctx.scheduler, "is_running", False): + ctx.scheduler.stop() + ctx.audit.log({"ts": now, "event": "scheduler_stopped", "reason": "market_closed"}) + _reset_charts_fsm(ctx) + + def _detect_strips_for_ctx(ctx: RunContext, frame: Any) -> list: """Lazy strip detection for ctx.cfg + frame. Empty list when no vivid pixels.""" from atm.layout import detect_strips @@ -1540,6 +1579,8 @@ async def _run_multi_tick(ctx: RunContext) -> "list[_TickSyncResult]": title = await asyncio.to_thread(_focus_window_by_title, ctx.cfg.window_title) ctx.audit.log({"ts": now, "event": "window_focused", "command": "market_open", "title": title}) await asyncio.sleep(0.15) + elif transition == "market_closed": + _handle_market_closed(ctx, now) if skip is not None: return [] @@ -1915,6 +1956,29 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: kind="screenshot", title=title_r, body="\n\n".join(body_parts), image_path=path_r, )) + elif cmd.action == "window": + if ctx.lifecycle is None: + return + if cmd.window is None: + ctx.lifecycle.session_window = None + ctx.audit.log({"ts": time.time(), "event": "session_window_cleared"}) + ctx.notifier.send(Alert( + kind="status", + title="Fereastră de monitorizare dezactivată", + body="Monitorizez conform operating_hours (sau continuu).", + )) + else: + start_w, stop_w = cmd.window + ctx.lifecycle.session_window = (start_w, stop_w) + ctx.audit.log({ + "ts": time.time(), "event": "session_window_set", + "start": start_w, "stop": stop_w, + }) + ctx.notifier.send(Alert( + kind="status", + title=f"Fereastră monitorizare: {start_w}–{stop_w} (ora locală, zilnic)", + body="În afara intervalului monitorizarea se pauzează automat.", + )) elif cmd.action == "rebase": await _dispatch_rebase(ctx, cmd) elif cmd.action == "help": @@ -1926,6 +1990,8 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: "/rebase — propune phash nou pentru canary (confirm cu /rebase confirm)\n" "/3 — screenshot automat la fiecare 3 min (sau orice număr)\n" "/stop — oprește screenshot-urile automate\n" + "/window 19:40-21:45 — monitorizează doar în interval (ora locală, zilnic)\n" + "/window off — dezactivează fereastra\n" "/h — acest mesaj" ) ctx.notifier.send(Alert(kind="status", title="Comenzi ATM", body=body)) @@ -2249,7 +2315,13 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No while True: await asyncio.sleep(60) if time.monotonic() > heartbeat_due: - if not _in_trading_window(time.time(), cfg): + in_window = _in_trading_window(time.time(), cfg) + sw = ctx.lifecycle.session_window if ctx.lifecycle else None + if sw is not None: + hhmm_local = datetime.fromtimestamp(time.time()).strftime("%H:%M") + if hhmm_local < sw[0] or hhmm_local >= sw[1]: + in_window = False + if not in_window: heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 continue uptime_h = (time.monotonic() - start) / 3600 diff --git a/src/atm/notifier/AGENTS.md b/src/atm/notifier/AGENTS.md index 7758cce..0655111 100644 --- a/src/atm/notifier/AGENTS.md +++ b/src/atm/notifier/AGENTS.md @@ -17,7 +17,8 @@ comenzi live Telegram. Fan-out trimite același eveniment pe mai multe canale. - **`commands.py`** e singurul consumator de **httpx** (async long-poll). `TelegramNotifier` (sync) rămâne pe **requests**. Nu amesteca cele două. - **Comenzi live:** `/ss` `/status` `/pause` `/resume` `/rebase` `/3` (interval - min) `/stop`. + min) `/stop` `/window`. **Doar Telegram** primește comenzi — Discord e webhook + outbound, fără poller. ### Contracte pe comenzi (nu le slăbi fără update aici) @@ -35,6 +36,12 @@ comenzi live Telegram. Fan-out trimite același eveniment pe mai multe canale. - **Drift-pause** — un singur alert Telegram pe tranziție. Cât e pauzat, `/set_interval` e refuzat, caption-ul `/ss` avertizează că detecția e oprită, heartbeat arată `⚠️ pauzat (drift)` în loc de `activ`. +- **`/window HH:MM-HH:MM`** (sau `HH:MM HH:MM`) — fereastră de monitorizare în + **ora locală**, **recurentă zilnic**, stocată în `LifecycleState.session_window`. + Se combină prin **AND** cu `operating_hours` (vezi `../AGENTS.md` → scheduler): + în afara intervalului `_should_skip` întoarce `out_of_window_hours` ⇒ pauză + automată (alertă „Piața închisă" o dată + scheduler oprit + FSM reset). + `/window off` (sau `clear`) șterge fereastra. Format invalid e ignorat. ## Work Guidance diff --git a/tests/test_commands.py b/tests/test_commands.py index 91f696b..0287a84 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -65,3 +65,39 @@ def test_parse_existing_commands_still_work(): assert p._parse_command("status") == Command(action="status") assert p._parse_command("ss") == Command(action="ss") assert p._parse_command("3") == Command(action="set_interval", value=180) + + +def test_parse_window_dash(): + p = _make_poller() + cmd = p._parse_command("window 19:40-21:45") + assert cmd == Command(action="window", window=("19:40", "21:45")) + assert p._parse_command("/window 19:40-21:45") == cmd + + +def test_parse_window_space(): + p = _make_poller() + assert p._parse_command("window 20:50 22:45") == Command( + action="window", window=("20:50", "22:45") + ) + + +def test_parse_window_zero_pads_hour(): + p = _make_poller() + assert p._parse_command("window 9:05-9:30") == Command( + action="window", window=("09:05", "09:30") + ) + + +def test_parse_window_off_clears(): + p = _make_poller() + assert p._parse_command("window off") == Command(action="window", window=None) + assert p._parse_command("window") == Command(action="window", window=None) + assert p._parse_command("window clear") == Command(action="window", window=None) + + +def test_parse_window_invalid_returns_none(): + p = _make_poller() + assert p._parse_command("window 25:99-26:00") is None + assert p._parse_command("window foo") is None + assert p._parse_command("window 19:40") is None + assert p._parse_command("window 19:60-20:00") is None diff --git a/tests/test_main.py b/tests/test_main.py index c42dc23..58d78fa 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -769,6 +769,107 @@ def test_should_skip_canary_drift_wins_over_window(): assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused" +# --------------------------------------------------------------------------- +# Session window (/window): local wall-clock, recurring daily, ANDed with oh +# --------------------------------------------------------------------------- + +def test_should_skip_session_window_local_only(): + """operating_hours off → only the local session window gates detection.""" + import atm.main as _main + + cfg = _oh_cfg(enabled=False) + lifecycle = _main.LifecycleState(session_window=("19:40", "21:45")) + canary = _fake_canary() + # Naive datetimes → .timestamp() interprets them as local wall-clock, and + # _should_skip reads them back via datetime.fromtimestamp (also local), so + # this is machine-timezone independent. + inside = _dt.datetime(2026, 4, 20, 20, 30).timestamp() + before = _dt.datetime(2026, 4, 20, 19, 0).timestamp() + after = _dt.datetime(2026, 4, 20, 22, 0).timestamp() + boundary_stop = _dt.datetime(2026, 4, 20, 21, 45).timestamp() # stop is exclusive + + assert _main._should_skip(inside, lifecycle, cfg, canary) is None + assert _main._should_skip(before, lifecycle, cfg, canary) == "out_of_window_hours" + assert _main._should_skip(after, lifecycle, cfg, canary) == "out_of_window_hours" + assert _main._should_skip(boundary_stop, lifecycle, cfg, canary) == "out_of_window_hours" + + +def test_should_skip_session_window_no_window_is_open(): + import atm.main as _main + cfg = _oh_cfg(enabled=False) + lifecycle = _main.LifecycleState(session_window=None) + any_ts = _dt.datetime(2026, 4, 20, 3, 0).timestamp() + assert _main._should_skip(any_ts, lifecycle, cfg, _fake_canary()) is None + + +def test_should_skip_operating_hours_wins_over_session_window(): + """Out of operating_hours → skip even if the local session window is open.""" + import atm.main as _main + + cfg = _oh_cfg() # NY 09:30-16:00 enabled + tz = cfg.operating_hours._tz_cache + pre_open = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp() # NY pre-open + # Session window that contains the corresponding local time (so only oh skips). + local_hhmm = _dt.datetime.fromtimestamp(pre_open).strftime("%H:%M") + lifecycle = _main.LifecycleState(session_window=("00:00", "23:59")) + skip = _main._should_skip(pre_open, lifecycle, cfg, _fake_canary()) + assert skip == "out_of_window_hours" + # sanity: local time really is inside the wide session window + assert "00:00" <= local_hhmm < "23:59" + + +def test_should_skip_session_window_ands_inside_operating_hours(): + """Inside operating_hours but outside the local session window → skip.""" + import atm.main as _main + + cfg = _oh_cfg() # NY 09:30-16:00 enabled + tz = cfg.operating_hours._tz_cache + mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() # in NY window + local_hhmm = _dt.datetime.fromtimestamp(mid).strftime("%H:%M") + h = int(local_hhmm[:2]) + # 1-minute window two hours away from the current local time → excludes it. + off = (h + 2) % 24 + lifecycle = _main.LifecycleState(session_window=(f"{off:02d}:00", f"{off:02d}:01")) + assert _main._should_skip(mid, lifecycle, cfg, _fake_canary()) == "out_of_window_hours" + + +def test_handle_market_closed_stops_scheduler_and_resets_fsm(): + import atm.main as _main + from atm.state_machine import State, StateMachine + + ctx = _dispatch_ctx() + ctx.cfg.lockout_s = 240 + ctx.scheduler.is_running = True + ctx.charts = [] + ctx.state.n_primed_global = 2 + + _main._handle_market_closed(ctx, 0.0) + + assert ctx.scheduler.is_running is False + assert ctx.state.n_primed_global == 0 + assert isinstance(ctx.fsm, StateMachine) + assert ctx.fsm.state == State.IDLE + assert any( + e.get("event") == "scheduler_stopped" and e.get("reason") == "market_closed" + for e in ctx.audit.events + ) + + +@pytest.mark.asyncio +async def test_window_command_sets_and_clears_session_window(): + import atm.main as _main + from atm.commands import Command + + ctx = _dispatch_ctx() + await _main._dispatch_command(ctx, Command(action="window", window=("19:40", "21:45"))) + assert ctx.lifecycle.session_window == ("19:40", "21:45") + assert any(e.get("event") == "session_window_set" for e in ctx.audit.events) + + await _main._dispatch_command(ctx, Command(action="window", window=None)) + assert ctx.lifecycle.session_window is None + assert any(e.get("event") == "session_window_cleared" for e in ctx.audit.events) + + # --------------------------------------------------------------------------- # Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21) # ---------------------------------------------------------------------------