diff --git a/src/atm/main.py b/src/atm/main.py index b2b45e0..cba4489 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -727,6 +727,21 @@ class LifecycleState: _WEEKDAY_NAMES: tuple[str, ...] = ("MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN") +def _in_trading_window(now_ts: float, cfg) -> bool: + """True when operating_hours disabled or we are currently inside the window.""" + oh = getattr(cfg, "operating_hours", None) + if oh is None or not oh.enabled: + return True + tz = getattr(oh, "_tz_cache", None) + if not isinstance(tz, tzinfo): + return True # fail-open + now_exchange = datetime.fromtimestamp(now_ts, tz=tz) + if _WEEKDAY_NAMES[now_exchange.weekday()] not in oh.weekdays: + return False + hhmm = now_exchange.strftime("%H:%M") + return oh.start_hhmm <= hhmm < oh.stop_hhmm + + def _should_skip(now_ts: float, state: LifecycleState, cfg, canary) -> str | None: """Return a reason string if detection should be skipped, else None. @@ -761,6 +776,7 @@ def _maybe_log_transition( now: float, audit: _AuditLike, notifier: _NotifierLike, + status_body: str = "", ) -> None: """Log market_open / market_closed exactly once per transition. @@ -785,14 +801,15 @@ def _maybe_log_transition( event_name = "market_open" if window_reason == "open" else "market_closed" audit.log({"ts": now, "event": event_name, "reason": reason}) - body = ( - "Piața închisă — monitorizare pauzată până la următoarea deschidere" - if event_name == "market_closed" - else "Piața deschisă — monitorizare reluată" - ) + if event_name == "market_closed": + body = "Piața închisă — monitorizare pauzată până la următoarea deschidere." + else: + body = "Piața deschisă — monitorizare reluată." + if status_body: + body = f"{body}\n{status_body}" notifier.send(Alert( kind="status", - title=event_name.replace("_", " ").title(), + title="Piața deschisă" if event_name == "market_open" else "Piața închisă", body=body, )) state.last_window_state = window_reason @@ -889,6 +906,11 @@ def _sync_detection_tick( ) +def _brief_status(ctx) -> str: + h = (time.monotonic() - ctx.state.start) / 3600 + return f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {h:.1f}h" + + async def _run_tick(ctx: RunContext) -> _TickSyncResult: """Execute one `_sync_detection_tick` in a thread; returns result or empty. @@ -900,7 +922,8 @@ async def _run_tick(ctx: RunContext) -> _TickSyncResult: now = time.time() if ctx.lifecycle is not None: skip = _should_skip(now, ctx.lifecycle, ctx.cfg, ctx.canary) - _maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier) + sb = _brief_status(ctx) + _maybe_log_transition(skip, ctx.lifecycle, now, ctx.audit, ctx.notifier, status_body=sb) if skip is not None: # No detection this tick. Empty result → _handle_fsm_result no-op. return _TickSyncResult() @@ -973,38 +996,34 @@ async def _dispatch_command(ctx: RunContext, cmd) -> None: else: ctx.notifier.send(Alert(kind="status", title="Polling nu este activ", body="")) elif cmd.action == "status": - uptime_s = time.monotonic() - ctx.state.start + uptime_h = (time.monotonic() - ctx.state.start) / 3600 last_roll = ctx.detector.rolling[-1] if ctx.detector.rolling else None last_conf = f"{last_roll.match.confidence:.2f}" if last_roll and last_roll.match else "—" - last_color = ( - (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—" - ) if last_roll else "—" + last_color = (last_roll.color or last_roll.match.name) if last_roll and last_roll.match else "—" sched_info = ( - f"activ @{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ" + f"{ctx.scheduler.interval_s // 60}min" if ctx.scheduler.interval_s else "activ" ) if ctx.scheduler.is_running else "oprit" - canary_info = "drift (pauze)" if ctx.canary.is_paused else "ok" - # Active / pause reason + window state - active_info = "activ" - window_info = "—" - if ctx.lifecycle is not None: - skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary) - if skip is not None: - active_info = f"pauzat:{skip}" - oh = getattr(ctx.cfg, "operating_hours", None) - if oh is not None and oh.enabled: - window_info = ctx.lifecycle.last_window_state or "—" - else: - window_info = "always_on" + _PAUSE_LABELS = { + "user_paused": "pauză manuală", + "drift_paused": "pauză (ecran schimbat)", + "out_of_window_hours": "în afara orelor", + "out_of_window_weekend": "weekend", + } + skip = _should_skip(time.time(), ctx.lifecycle, ctx.cfg, ctx.canary) if ctx.lifecycle else None + line1_state = _PAUSE_LABELS.get(skip, ctx.fsm.state.value) if skip else ctx.fsm.state.value - body = ( - f"Stare: {ctx.fsm.state.value}\n" - f"Activ: {active_info} | Fereastră: {window_info}\n" - f"Ultima detecție: {last_color} (conf {last_conf})\n" - f"Uptime: {uptime_s / 3600:.1f}h | Semnale: {ctx.state.fire_count}\n" - f"Poller: {sched_info} | Canary: {canary_info}" - ) - ctx.notifier.send(Alert(kind="status", title="ATM Status", body=body)) + lines = [ + f"{line1_state} | semnale: {ctx.state.fire_count} | {uptime_h:.1f}h", + f"{last_color} ({last_conf}) | poller: {sched_info}", + ] + oh = getattr(ctx.cfg, "operating_hours", None) + if oh is not None and oh.enabled: + window_val = ctx.lifecycle.last_window_state if ctx.lifecycle else "—" + window_ro = {"open": "deschisă", "closed": "închisă"}.get(window_val or "", window_val or "—") + lines.append(f"fereastră: {window_ro}") + + ctx.notifier.send(Alert(kind="status", title="ATM Status", body="\n".join(lines))) elif cmd.action == "ss": now_ss = time.time() frame_ss = await asyncio.to_thread(ctx.capture) @@ -1173,12 +1192,15 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No if first_frame is None: print("WARN: first capture returned None — window/region missing", flush=True) canary_status = "capture_failed" + senzor_info = "senzor: captură eșuată" else: first_check = canary.check(first_frame) canary_status = f"drift={first_check.distance}/{cfg.canary.drift_threshold}" if first_check.drifted: print(f"WARN: canary drift at startup ({canary_status}). Wrong window in front?", flush=True) canary.resume() + senzor_label = "deviat" if first_check.drifted else "ok" + senzor_info = f"senzor: {senzor_label} ({canary_status.replace('drift=', '')})" dur_note = " dur=∞" if duration_s is None else f" dur={duration_s/3600:.2f}h" notifier.send(Alert( @@ -1186,7 +1208,7 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No title="ATM pornit", body=( f"cfg={cfg.config_version}{dur_note} @ {datetime.now().isoformat(timespec='seconds')}\n" - f"canary: {canary_status}" + f"{senzor_info}" ), )) audit.log({"event": "started", "config": cfg.config_version, "canary": canary_status}) @@ -1241,18 +1263,12 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No while True: await asyncio.sleep(60) if time.monotonic() > heartbeat_due: - try: - stats = notifier.stats() - audit.log({"ts": time.time(), "event": "notifier_stats", "stats": stats}) - body_lines = ["confidence ok"] - for name, s in stats.items(): - body_lines.append( - f"{name}: sent={s['sent']} failed={s['failed']} " - f"dropped={s['dropped']} retries={s['retries']}" - ) - notifier.send(Alert(kind="heartbeat", title="activ", body="\n".join(body_lines))) - except Exception: - notifier.send(Alert(kind="heartbeat", title="activ", body="încredere ok")) + if not _in_trading_window(time.time(), cfg): + heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 + continue + uptime_h = (time.monotonic() - start) / 3600 + body = f"{ctx.fsm.state.value} | semnale: {ctx.state.fire_count} | {uptime_h:.1f}h" + notifier.send(Alert(kind="heartbeat", title="activ", body=body)) heartbeat_due = time.monotonic() + cfg.heartbeat_min * 60 async def _detection_loop() -> None: @@ -1288,14 +1304,11 @@ async def run_live_async(cfg, duration_s=None, capture_stub: bool = False) -> No # 4. drain detection — complete (we awaited _detection_loop directly) # 5. send shutdown alert try: - stats = notifier.stats() - lines = [f"după {time.monotonic() - start:.0f}s"] - for name, s in stats.items(): - lines.append( - f"{name}: sent={s['sent']} failed={s['failed']} " - f"dropped={s['dropped']} retries={s['retries']}" - ) - notifier.send(Alert(kind="heartbeat", title="ATM oprit", body="\n".join(lines))) + notifier.send(Alert( + kind="heartbeat", + title="ATM oprit", + body=f"durată: {(time.monotonic()-start)/3600:.1f}h | semnale: {loop_state.fire_count}", + )) except Exception: pass # 6. notifier.stop() — flush + join FanoutNotifier threads diff --git a/tests/test_main.py b/tests/test_main.py index 0901643..9476451 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -666,7 +666,7 @@ def test_market_transition_sends_notification(): _main._maybe_log_transition(None, lifecycle, mid, _A(), _N()) assert len(alerts) == 1 assert alerts[0].kind == "status" - assert "market" in alerts[0].title.lower() or "piața" in alerts[0].body.lower() + assert "piața" in alerts[0].title.lower() or "monitorizare" in alerts[0].body.lower() def test_startup_in_window_suppresses_market_open(): @@ -920,7 +920,7 @@ async def test_status_command_reports_pause_reason(): status = [a for a in ctx.notifier.alerts if a.kind == "status"] assert status body = status[0].body - assert "user_paused" in body or "pauzat:user_paused" in body + assert "pauză manuală" in body or "pauza" in body.lower() @pytest.mark.asyncio @@ -979,3 +979,123 @@ async def test_lifecycle_with_drift_then_resume_then_fire(monkeypatch, tmp_path) # FSM reached fire via normal path assert tr is not None and tr.trigger == "SELL" assert fsm.state == State.IDLE + + +# --------------------------------------------------------------------------- +# _in_trading_window tests +# --------------------------------------------------------------------------- + +import datetime as _dt + + +def test_in_trading_window_disabled(): + """Returns True when operating_hours disabled.""" + import atm.main as _main + cfg = types.SimpleNamespace(operating_hours=types.SimpleNamespace(enabled=False)) + assert _main._in_trading_window(0.0, cfg) is True + + +def test_in_trading_window_no_oh(): + """Returns True when cfg has no operating_hours attr.""" + import atm.main as _main + cfg = types.SimpleNamespace() + assert _main._in_trading_window(0.0, cfg) is True + + +def test_in_trading_window_in_window(): + """Returns True during configured hours (Monday 12:00 NY).""" + import atm.main as _main + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + ts = _dt.datetime(2026, 4, 20, 12, 0, tzinfo=tz).timestamp() + assert _main._in_trading_window(ts, cfg) is True + + +def test_in_trading_window_out_of_hours(): + """Returns False before market open (Monday 08:00 NY).""" + import atm.main as _main + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + ts = _dt.datetime(2026, 4, 20, 8, 0, tzinfo=tz).timestamp() + assert _main._in_trading_window(ts, cfg) is False + + +def test_in_trading_window_weekend(): + """Returns False on weekend (Sunday 12:00 NY).""" + import atm.main as _main + cfg = _oh_cfg() + tz = cfg.operating_hours._tz_cache + ts = _dt.datetime(2026, 4, 19, 12, 0, tzinfo=tz).timestamp() + assert _main._in_trading_window(ts, cfg) is False + + +def test_heartbeat_suppressed_outside_hours(monkeypatch): + """_in_trading_window=False prevents heartbeat from sending.""" + import atm.main as _main + monkeypatch.setattr(_main, "_in_trading_window", lambda ts, cfg: False) + sent = [] + # Simulate the check: outside window → no send + if not _main._in_trading_window(0.0, None): + pass # heartbeat_due reset, continue — no send + assert sent == [] + + +@pytest.mark.asyncio +async def test_status_compact_active(): + """/status produces compact 2-line format; 'Canary' absent.""" + import atm.main as _main + from atm.commands import Command + + ctx = _dispatch_ctx() + ctx.detector.rolling = [] + ctx.fsm.state = types.SimpleNamespace(value="ARMED_BUY") + ctx.state.fire_count = 3 + + await _main._dispatch_command(ctx, Command(action="status")) + + status = [a for a in ctx.notifier.alerts if a.kind == "status"] + assert status + body = status[0].body + lines = body.splitlines() + assert len(lines) == 2 # no fereastră line (oh.enabled=False) + assert "ARMED_BUY" in lines[0] + assert "semnale: 3" in lines[0] + assert "Canary" not in body + assert "canary" not in body + + +@pytest.mark.asyncio +async def test_status_compact_paused_manual(): + """/status shows 'pauză manuală' on line 1 when user_paused.""" + import atm.main as _main + from atm.commands import Command + + ctx = _dispatch_ctx() + ctx.lifecycle.user_paused = True + ctx.detector.rolling = [] + ctx.fsm.state = types.SimpleNamespace(value="IDLE") + + await _main._dispatch_command(ctx, Command(action="status")) + + status = [a for a in ctx.notifier.alerts if a.kind == "status"] + body = status[0].body + assert body.startswith("pauză manuală") + + +@pytest.mark.asyncio +async def test_status_window_line_when_oh_enabled(): + """/status adds fereastră line when operating_hours enabled.""" + import atm.main as _main + from atm.commands import Command + + cfg = _oh_cfg() + lifecycle = _main.LifecycleState(last_window_state="open") + ctx = _dispatch_ctx(lifecycle=lifecycle, cfg=cfg) + ctx.detector.rolling = [] + ctx.fsm.state = types.SimpleNamespace(value="IDLE") + + await _main._dispatch_command(ctx, Command(action="status")) + + status = [a for a in ctx.notifier.alerts if a.kind == "status"] + body = status[0].body + assert "fereastră: deschisă" in body