feat(commands): /pause /resume + adaptive dispatch + richer /status

Add two new Telegram commands so the user can manage monitoring without
restarting the process:

- /pause sets lifecycle.user_paused = True. The detection loop then
  short-circuits via _should_skip without touching FSM / canary state.
- /resume clears user_paused. R2 decision: drift-pause is NOT lifted by
  plain /resume (the drift may be legit and require recalibration).
  "/resume force" (value=1) also calls canary.resume(). The response
  message adapts to context:
    - drift active + plain resume → explains force requirement
    - force + drift → confirms override, warns about recurrence
    - out-of-window → explains monitor will resume at next open
    - otherwise → plain "Monitorizare reluată"
- /status now shows "Activ: <pause_reason | activ>" and window state.

commands.py: extend CommandAction literal and _parse_command to accept
pause, resume, and "resume force" (value=1 signal).

Tests: test_commands.py parse coverage;
test_pause_command_sets_user_paused_and_skips_detection,
test_resume_clears_user_paused_and_canary_when_forced,
test_resume_during_drift_keeps_canary_paused_without_force (R2 #21),
test_resume_out_of_window_responds_with_pending_message,
test_status_command_reports_pause_reason,
test_lifecycle_with_drift_then_resume_then_fire (E2E #16).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 12:01:19 +03:00
parent 54f55752c1
commit 23865776e3
4 changed files with 343 additions and 1 deletions

View File

@@ -17,7 +17,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
CommandAction = Literal["set_interval", "stop", "status", "ss"]
CommandAction = Literal["set_interval", "stop", "status", "ss", "pause", "resume"]
_BASE = "https://api.telegram.org/bot{token}/{method}"
@@ -154,6 +154,13 @@ class TelegramPoller:
return Command(action="status")
if t in ("ss", "screenshot"):
return Command(action="ss")
if t == "pause":
return Command(action="pause")
if t == "resume":
return Command(action="resume")
if t == "resume force":
# value=1 signals force: also lift canary drift-pause, not just user pause.
return Command(action="resume", value=1)
# "3" → set_interval 3 minutes → 180s; "interval 3" also accepted
parts = t.split()
if len(parts) == 1 and parts[0].isdigit():

View File

@@ -941,8 +941,23 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
) if ctx.scheduler.is_running else "oprit"
canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok"
# Active / pause reason + window state
active_info = "activ"
window_info = ""
if ctx.lifecycle is not None:
skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
if skip is not None:
active_info = f"pauzat:{skip}"
oh = getattr(ctx.cfg, "operating_hours", None)
if oh is not None and oh.enabled:
window_info = ctx.lifecycle.last_window_state or ""
else:
window_info = "always_on"
body = (
f"Stare: {ctx.fsm.state.value}\n"
f"Activ: {active_info} | Fereastră: {window_info}\n"
f"Ultima detecție: {last_color} (conf {last_conf})\n"
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n"
f"Poller: {sched_info} | Canary: {canary_info}"
@@ -963,6 +978,51 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
)
ctx.audit.log({"ts": now_ss, "event": "screenshot_sent", "path": str(path_ss) if path_ss else None})
ctx.notifier.send(Alert(kind="screenshot", title="Screenshot manual", body="", image_path=path_ss))
elif cmd.action == "pause":
# User manually stops monitoring. Canary drift state is untouched.
if ctx.lifecycle is not None:
ctx.lifecycle.user_paused = True
ctx.audit.log({"ts": time.time(), "event": "user_paused"})
ctx.notifier.send(Alert(
kind="status",
title="Monitorizare oprită manual",
body="Folosește /resume pentru a relua.",
))
elif cmd.action == "resume":
# R2: /resume clears only user_paused. Canary drift requires
# /resume force (value == 1) so the user acknowledges the risk.
was_drift = bool(getattr(ctx.canary, "is_paused", False))
was_user = bool(ctx.lifecycle.user_paused) if ctx.lifecycle is not None else False
force = cmd.value == 1
if ctx.lifecycle is not None:
ctx.lifecycle.user_paused = False
if force and was_drift:
ctx.canary.resume()
ctx.audit.log({
"ts": time.time(), "event": "user_resumed",
"was_drift": was_drift, "was_user": was_user, "force": force,
})
# Adaptive response
if was_drift and not force:
title = "Pauză user eliminată — dar Canary drift activ"
body = (
"Trimite /resume force pentru a anula drift-pause. "
"Recalibrează dacă driftul persistă."
)
elif force and was_drift:
title = "Drift-pause anulat manual (force)"
body = "Dacă driftul persistă, Canary va repauza."
else:
skip_now = None
if ctx.lifecycle is not None:
skip_now = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
if skip_now and skip_now.startswith("out_of_window"):
title = "Pauză eliminată — piața e închisă acum"
body = "Monitorizarea va porni la următoarea fereastră."
else:
title = "Monitorizare reluată"
body = ""
ctx.notifier.send(Alert(kind="status", title=title, body=body))
async def _drain_cmd_queue(ctx: RunContext) -> None:

45
tests/test_commands.py Normal file
View File

@@ -0,0 +1,45 @@
"""Tests for atm.commands — /pause /resume parsing (Commit 5)."""
from __future__ import annotations
from unittest.mock import MagicMock
from atm.commands import Command, TelegramPoller
def _make_poller() -> TelegramPoller:
cfg = MagicMock()
cfg.bot_token = "tok"
cfg.chat_id = "123"
cfg.allowed_chat_ids = ("123",)
cfg.poll_timeout_s = 1
return TelegramPoller(cfg, MagicMock(), MagicMock())
def test_parse_pause():
p = _make_poller()
assert p._parse_command("pause") == Command(action="pause")
assert p._parse_command("/pause") == Command(action="pause")
def test_parse_resume_plain():
p = _make_poller()
assert p._parse_command("resume") == Command(action="resume")
assert p._parse_command("/resume") == Command(action="resume")
def test_parse_resume_force():
p = _make_poller()
# "resume force" → value=1 signals force-resume of canary drift
cmd = p._parse_command("resume force")
assert cmd is not None
assert cmd.action == "resume"
assert cmd.value == 1
def test_parse_existing_commands_still_work():
"""Regression: adding pause/resume must not break stop/status/ss/interval."""
p = _make_poller()
assert p._parse_command("stop") == Command(action="stop")
assert p._parse_command("status") == Command(action="status")
assert p._parse_command("ss") == Command(action="ss")
assert p._parse_command("3") == Command(action="set_interval", value=180)

View File

@@ -749,3 +749,233 @@ def test_should_skip_canary_drift_wins_over_window():
tz = cfg.operating_hours._tz_cache
mid = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
assert _main._should_skip(mid, lifecycle, cfg, _fake_canary(paused=True)) == "drift_paused"
# ---------------------------------------------------------------------------
# Commit 5: /pause /resume dispatch (plan tests #11-15, #16, R2 #21)
# ---------------------------------------------------------------------------
def _dispatch_ctx(canary=None, lifecycle=None, cfg=None):
"""Minimal RunContext for _dispatch_command unit tests."""
import atm.main as _main
class _A:
def __init__(self): self.events = []
def log(self, e): self.events.append(e)
class _N:
def __init__(self): self.alerts = []
def send(self, a): self.alerts.append(a)
class _S:
is_running = False
interval_s = None
def start(self, s): self.is_running = True
def stop(self): self.is_running = False
if canary is None:
canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
if lifecycle is None:
lifecycle = _main.LifecycleState()
if cfg is None:
cfg = MagicMock()
cfg.telegram.auto_poll_interval_s = 180
cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
state = _main._LoopState(start=0.0)
ctx = _main.RunContext(
cfg=cfg, capture=lambda: None, canary=canary,
detector=MagicMock(), fsm=MagicMock(),
notifier=_N(), audit=_A(), detection_log=_A(),
scheduler=_S(), samples_dir=Path("."), fires_dir=Path("."),
cmd_queue=MagicMock(), state=state,
levels_extractor_factory=lambda *a, **kw: None,
lifecycle=lifecycle,
)
return ctx
@pytest.mark.asyncio
async def test_pause_command_sets_user_paused_and_skips_detection():
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
await _main._dispatch_command(ctx, Command(action="pause"))
assert ctx.lifecycle.user_paused is True
# When combined with _should_skip, we get user_paused
assert _main._should_skip(0.0, ctx.lifecycle, ctx.cfg, ctx.canary) == "user_paused"
# Audit + notif
assert any(e.get("event") == "user_paused" for e in ctx.audit.events)
assert any(a.kind == "status" and "oprit" in a.title.lower() for a in ctx.notifier.alerts)
@pytest.mark.asyncio
async def test_resume_clears_user_paused_and_canary_when_forced():
import atm.main as _main
from atm.commands import Command
canary_state = {"paused": True}
canary = types.SimpleNamespace(
is_paused=True,
resume=lambda: canary_state.__setitem__("paused", False),
)
# Re-bind is_paused via property so resume() effect is visible
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
await _main._dispatch_command(ctx, Command(action="resume", value=1))
assert ctx.lifecycle.user_paused is False
assert canary.is_paused is False
force_events = [e for e in ctx.audit.events if e.get("event") == "user_resumed"]
assert force_events and force_events[0]["force"] is True
@pytest.mark.asyncio
async def test_resume_during_drift_keeps_canary_paused_without_force():
"""R2 #21: plain /resume during drift clears user_paused but NOT canary."""
import atm.main as _main
from atm.commands import Command
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
ctx = _dispatch_ctx(canary=canary)
ctx.lifecycle.user_paused = True
await _main._dispatch_command(ctx, Command(action="resume")) # no force
assert ctx.lifecycle.user_paused is False
assert canary.is_paused is True # still drift-paused
# Message must mention drift
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status and ("drift" in (status[0].title + status[0].body).lower())
# Now force
ctx.notifier.alerts.clear()
await _main._dispatch_command(ctx, Command(action="resume", value=1))
assert canary.is_paused is False
@pytest.mark.asyncio
async def test_resume_out_of_window_responds_with_pending_message():
"""/resume while operating-hours window is closed → special body."""
import atm.main as _main
from atm.commands import Command
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
lifecycle = _main.LifecycleState(user_paused=True, last_window_state="closed")
canary = types.SimpleNamespace(is_paused=False, resume=lambda: None)
ctx = _dispatch_ctx(canary=canary, lifecycle=lifecycle, cfg=cfg)
# Pin time to Saturday
import atm.main as _mm
real_time = _mm.time
fake_ts = _dt.datetime(2026, 4, 18, 12, 0, tzinfo=tz).timestamp()
class _FakeTime:
def time(self): return fake_ts
def monotonic(self): return 0.0
_mm.time = _FakeTime()
try:
await _main._dispatch_command(ctx, Command(action="resume"))
finally:
_mm.time = real_time
assert ctx.lifecycle.user_paused is False
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
combined = (status[0].title + status[0].body).lower()
assert "închis" in combined or "piața" in combined or "ferestr" in combined
@pytest.mark.asyncio
async def test_status_command_reports_pause_reason():
"""/status body must mention pause reason + window state."""
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
ctx.lifecycle.user_paused = True
# Stub detector.rolling for status
ctx.detector.rolling = []
ctx.fsm.state = types.SimpleNamespace(value="IDLE")
await _main._dispatch_command(ctx, Command(action="status"))
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
body = status[0].body
assert "user_paused" in body or "pauzat:user_paused" in body
@pytest.mark.asyncio
async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path):
"""E2E #16: drift paused → /resume force → dark_red/light_red produce FIRE alert.
This test verifies the full command-driven lifecycle in isolation:
- canary starts drift-paused, _should_skip returns drift_paused
- /resume force clears canary + user_paused
- subsequent detection produces SELL fire through normal FSM path
"""
import atm.main as _main
from atm.commands import Command
# Canary with mutable pause state
class _Canary:
def __init__(self): self._p = True
@property
def is_paused(self): return self._p
def resume(self): self._p = False
canary = _Canary()
cfg = MagicMock()
cfg.telegram.auto_poll_interval_s = 180
cfg.operating_hours = types.SimpleNamespace(enabled=False, _tz_cache=None)
ctx = _dispatch_ctx(canary=canary, cfg=cfg)
# 1. While drift-paused, _should_skip returns drift_paused
assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) == "drift_paused"
# 2. User issues /resume force
await _main._dispatch_command(ctx, Command(action="resume", value=1))
assert canary.is_paused is False
assert _main._should_skip(0.0, ctx.lifecycle, cfg, canary) is None
# 3. Feed a yellow→light_red sequence through _handle_tick (FSM path)
from atm.state_machine import StateMachine, State
fsm = StateMachine(lockout_s=60)
class _N:
def __init__(self): self.alerts = []
def send(self, a): self.alerts.append(a)
class _A:
def log(self, _e): pass
notif = _N()
audit = _A()
cfg_mock = types.SimpleNamespace(alerts=types.SimpleNamespace(fire_on_phase_skip=True))
_main._handle_tick(fsm, "yellow", 1.0, notif, audit, first_accepted=False, cfg=cfg_mock)
_main._handle_tick(fsm, "dark_red", 2.0, notif, audit, first_accepted=False, cfg=cfg_mock)
tr = _main._handle_tick(fsm, "light_red", 3.0, notif, audit, first_accepted=False, cfg=cfg_mock)
# FSM reached fire via normal path
assert tr is not None and tr.trigger == "SELL"
assert fsm.state == State.IDLE