From 23865776e3885a8b82e2dee81da550853bf7365f Mon Sep 17 00:00:00 2001 From: Marius Mutu Date: Sat, 18 Apr 2026 12:01:19 +0300 Subject: [PATCH] feat(commands): /pause /resume + adaptive dispatch + richer /status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add two new Telegram commands so the user can manage monitoring without restarting the process: - /pause sets lifecycle.user_paused = True. The detection loop then short-circuits via _should_skip without touching FSM / canary state. - /resume clears user_paused. R2 decision: drift-pause is NOT lifted by plain /resume (the drift may be legit and require recalibration). "/resume force" (value=1) also calls canary.resume(). The response message adapts to context: - drift active + plain resume → explains force requirement - force + drift → confirms override, warns about recurrence - out-of-window → explains monitor will resume at next open - otherwise → plain "Monitorizare reluată" - /status now shows "Activ: " and window state. commands.py: extend CommandAction literal and _parse_command to accept pause, resume, and "resume force" (value=1 signal). Tests: test_commands.py parse coverage; test_pause_command_sets_user_paused_and_skips_detection, test_resume_clears_user_paused_and_canary_when_forced, test_resume_during_drift_keeps_canary_paused_without_force (R2 #21), test_resume_out_of_window_responds_with_pending_message, test_status_command_reports_pause_reason, test_lifecycle_with_drift_then_resume_then_fire (E2E #16). Co-Authored-By: Claude Sonnet 4.6 --- src/atm/commands.py | 9 +- src/atm/main.py | 60 +++++++++++ tests/test_commands.py | 45 ++++++++ tests/test_main.py | 230 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 343 insertions(+), 1 deletion(-) create mode 100644 tests/test_commands.py diff --git a/src/atm/commands.py b/src/atm/commands.py index ca37977..2d6f6cf 100644 --- a/src/atm/commands.py +++ b/src/atm/commands.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -CommandAction = Literal["set_interval", "stop", "status", "ss"] +CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume"] _BASE = "https://api.telegram.org/bot{token}/{method}" @@ -154,6 +154,13 @@ class TelegramPoller: return Command(action="status") if t in ("ss", "screenshot"): return Command(action="ss") + if t == "pause": + return Command(action="pause") + if t == "resume": + return Command(action="resume") + if t == "resume force": + # value=1 signals force: also lift canary drift-pause, not just user pause. + return Command(action="resume", value=1) # "3" → set_interval 3 minutes → 180s; "interval 3" also accepted parts = t.split() if len(parts) == 1 and parts[0].isdigit(): diff --git a/src/atm/main.py b/src/atm/main.py index 73f34c8..c82815f 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -941,8 +941,23 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ" ) if ctx.scheduler.is_running else "oprit" canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok" + + # Active / pause reason + window state + active_info = "activ" + window_info = "—" + if ctx.lifecycle is not None: + skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary) + if skip is not None: + active_info = f"pauzat:{skip}" + oh = getattr(ctx.cfg, "operating_hours", None) + if oh is not None and oh.enabled: + window_info = ctx.lifecycle.last_window_state or "—" + else: + window_info = "always_on" + body = ( f"Stare: {ctx.fsm.state.value}\n" + f"Activ: {active_info} | Fereastră: {window_info}\n" f"Ultima detecție: {last_color} (conf {last_conf})\n" f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n" f"Poller: {sched_info} | Canary: {canary_info}" @@ -963,6 +978,51 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: ) ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None}) ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss)) + elif cmd.action == "pause": + # User manually stops monitoring. Canary drift state is untouched. + if ctx.lifecycle is not None: + ctx.lifecycle.user_paused = True + ctx.audit.log({"ts": time.time(), "event": "user_paused"}) + ctx.notifier.send(Alert( + kind="status", + title="Monitorizare oprită manual", + body="Folosește /resume pentru a relua.", + )) + elif cmd.action == "resume": + # R2: /resume clears only user_paused. Canary drift requires + # /resume force (value == 1) so the user acknowledges the risk. + was_drift = bool(getattr(ctx.canary, "is_paused", False)) + was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False + force = cmd.value == 1 + if ctx.lifecycle is not None: + ctx.lifecycle.user_paused = False + if force and was_drift: + ctx.canary.resume() + ctx.audit.log({ + "ts": time.time(), "event": "user_resumed", + "was_drift": was_drift, "was_user": was_user, "force": force, + }) + # Adaptive response + if was_drift and not force: + title = "Pauză user eliminată — dar Canary drift activ" + body = ( + "Trimite /resume force pentru a anula drift-pause. " + "Recalibrează dacă driftul persistă." + ) + elif force and was_drift: + title = "Drift-pause anulat manual (force)" + body = "Dacă driftul persistă, Canary va repauza." + else: + skip_now = None + if ctx.lifecycle is not None: + skip_now = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary) + if skip_now and skip_now.startswith("out_of_window"): + title = "Pauză eliminată — piața e închisă acum" + body = "Monitorizarea va porni la următoarea fereastră." + else: + title = "Monitorizare reluată" + body = "" + ctx.notifier.send(Alert(kind="status", title=title, body=body)) async def _drain_cmd_queue(ctx: RunContext) -> None: diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..7a9bdf6 --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,45 @@ +"""Tests for atm.commands — /pause /resume parsing (Commit 5).""" +from __future__ import annotations + +from unittest.mock import MagicMock + +from atm.commands import Command, TelegramPoller + + +def _make_poller() -> TelegramPoller: + cfg = MagicMock() + cfg.bot_token = "tok" + cfg.chat_id = "123" + cfg.allowed_chat_ids = ("123",) + cfg.poll_timeout_s = 1 + return TelegramPoller(cfg, MagicMock(), MagicMock()) + + +def test_parse_pause(): + p = _make_poller() + assert p._parse_command("pause") == Command(action="pause") + assert p._parse_command("/pause") == Command(action="pause") + + +def test_parse_resume_plain(): + p = _make_poller() + assert p._parse_command("resume") == Command(action="resume") + assert p._parse_command("/resume") == Command(action="resume") + + +def test_parse_resume_force(): + p = _make_poller() + # "resume force" → value=1 signals force-resume of canary drift + cmd = p._parse_command("resume force") + assert cmd is not None + assert cmd.action == "resume" + assert cmd.value == 1 + + +def test_parse_existing_commands_still_work(): + """Regression: adding pause/resume must not break stop/status/ss/interval.""" + p = _make_poller() + assert p._parse_command("stop") == Command(action="stop") + assert p._parse_command("status") == Command(action="status") + assert p._parse_command("ss") == Command(action="ss") + assert p._parse_command("3") == Command(action="set_interval", value=180) diff --git a/tests/test_main.py b/tests/test_main.py index c1fb323..0901643 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -749,3 +749,233 @@ def test_should_skip_canary_drift_wins_over_window(): tz = cfg.operating_hours._tz_cache mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused" + + +# --------------------------------------------------------------------------- +# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21) +# --------------------------------------------------------------------------- + +def _dispatch_ctx(canary=None, lifecycle=None, cfg=None): + """Minimal RunContext for _dispatch_command unit tests.""" + import atm.main as _main + + class _A: + def __init__(self): self.events = [] + def log(self, e): self.events.append(e) + + class _N: + def __init__(self): self.alerts = [] + def send(self, a): self.alerts.append(a) + + class _S: + is_running = False + interval_s = None + def start(self, s): self.is_running = True + def stop(self): self.is_running = False + + if canary is None: + canary = types.SimpleNamespace(is_paused=False, resume=lambda: None) + if lifecycle is None: + lifecycle = _main.LifecycleState() + if cfg is None: + cfg = MagicMock() + cfg.telegram.auto_poll_interval_s = 180 + cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None) + + state = _main._LoopState(start=0.0) + ctx = _main.RunContext( + cfg=cfg, capture=lambda: None, canary=canary, + detector=MagicMock(), fsm=MagicMock(), + notifier=_N(), audit=_A(), detection_log=_A(), + scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."), + cmd_queue=MagicMock(), state=state, + levels_extractor_factory=lambda *a, **kw: None, + lifecycle=lifecycle, + ) + return ctx + + +@pytest.mark.asyncio +async def test_pause_command_sets_user_paused_and_skips_detection(): + import atm.main as _main + from atm.commands import Command + + ctx = _dispatch_ctx() + await _main._dispatch_command(ctx, Command(action="pause")) + + assert ctx.lifecycle.user_paused is True + # When combined with _should_skip, we get user_paused + assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused" + # Audit + notif + assert any(e.get("event") == "user_paused" for e in ctx.audit.events) + assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts) + + +@pytest.mark.asyncio +async def test_resume_clears_user_paused_and_canary_when_forced(): + import atm.main as _main + from atm.commands import Command + + canary_state = {"paused": True} + canary = types.SimpleNamespace( + is_paused=True, + resume=lambda: canary_state.__setitem__("paused", False), + ) + # Re-bind is_paused via property so resume() effect is visible + class _Canary: + def __init__(self): self._p = True + @property + def is_paused(self): return self._p + def resume(self): self._p = False + canary = _Canary() + + ctx = _dispatch_ctx(canary=canary) + ctx.lifecycle.user_paused = True + + await _main._dispatch_command(ctx, Command(action="resume", value=1)) + + assert ctx.lifecycle.user_paused is False + assert canary.is_paused is False + force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"] + assert force_events and force_events[0]["force"] is True + + +@pytest.mark.asyncio +async def test_resume_during_drift_keeps_canary_paused_without_force(): + """R2 #21: plain /resume during drift clears user_paused but NOT canary.""" + import atm.main as _main + from atm.commands import Command + + class _Canary: + def __init__(self): self._p = True + @property + def is_paused(self): return self._p + def resume(self): self._p = False + canary = _Canary() + + ctx = _dispatch_ctx(canary=canary) + ctx.lifecycle.user_paused = True + + await _main._dispatch_command(ctx, Command(action="resume")) # no force + + assert ctx.lifecycle.user_paused is False + assert canary.is_paused is True # still drift-paused + # Message must mention drift + status = [a for a in ctx.notifier.alerts if a.kind == "status"] + assert status and ("drift" in (status[0].title + status[0].body).lower()) + + # Now force + ctx.notifier.alerts.clear() + await _main._dispatch_command(ctx, Command(action="resume", value=1)) + assert canary.is_paused is False + + +@pytest.mark.asyncio +async def test_resume_out_of_window_responds_with_pending_message(): + """/resume while operating-hours window is closed → special body.""" + import atm.main as _main + from atm.commands import Command + + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed") + canary = types.SimpleNamespace(is_paused=False, resume=lambda: None) + + ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg) + + # Pin time to Saturday + import atm.main as _mm + real_time = _mm.time + fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp() + class _FakeTime: + def time(self): return fake_ts + def monotonic(self): return 0.0 + _mm.time = _FakeTime() + try: + await _main._dispatch_command(ctx, Command(action="resume")) + finally: + _mm.time = real_time + + assert ctx.lifecycle.user_paused is False + status = [a for a in ctx.notifier.alerts if a.kind == "status"] + assert status + combined = (status[0].title + status[0].body).lower() + assert "închis" in combined or "piața" in combined or "ferestr" in combined + + +@pytest.mark.asyncio +async def test_status_command_reports_pause_reason(): + """/status body must mention pause reason + window state.""" + import atm.main as _main + from atm.commands import Command + + ctx = _dispatch_ctx() + ctx.lifecycle.user_paused = True + # Stub detector.rolling for status + ctx.detector.rolling = [] + ctx.fsm.state = types.SimpleNamespace(value="IDLE") + + await _main._dispatch_command(ctx, Command(action="status")) + + status = [a for a in ctx.notifier.alerts if a.kind == "status"] + assert status + body = status[0].body + assert "user_paused" in body or "pauzat:user_paused" in body + + +@pytest.mark.asyncio +async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path): + """E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert. + + This test verifies the full command-driven lifecycle in isolation: + - canary starts drift-paused, _should_skip returns drift_paused + - /resume force clears canary + user_paused + - subsequent detection produces SELL fire through normal FSM path + """ + import atm.main as _main + from atm.commands import Command + + # Canary with mutable pause state + class _Canary: + def __init__(self): self._p = True + @property + def is_paused(self): return self._p + def resume(self): self._p = False + + canary = _Canary() + cfg = MagicMock() + cfg.telegram.auto_poll_interval_s = 180 + cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None) + + ctx = _dispatch_ctx(canary=canary, cfg=cfg) + + # 1. While drift-paused, _should_skip returns drift_paused + assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused" + + # 2. User issues /resume force + await _main._dispatch_command(ctx, Command(action="resume", value=1)) + assert canary.is_paused is False + assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None + + # 3. Feed a yellow→light_red sequence through _handle_tick (FSM path) + from atm.state_machine import StateMachine, State + fsm = StateMachine(lockout_s=60) + + class _N: + def __init__(self): self.alerts = [] + def send(self, a): self.alerts.append(a) + + class _A: + def log(self, _e): pass + + notif = _N() + audit = _A() + cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True)) + + _main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock) + _main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock) + tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock) + + # FSM reached fire via normal path + assert tr is not None and tr.trigger == "SELL" + assert fsm.state == State.IDLE