diff --git a/dashboard/api.py b/dashboard/api.py index 4f1fcd4..d7851e4 100644 --- a/dashboard/api.py +++ b/dashboard/api.py @@ -163,6 +163,8 @@ class TaskBoardHandler( self.handle_eco_doctor() elif self.path == '/api/ralph/status' or self.path.startswith('/api/ralph/status?'): self.handle_ralph_status() + elif self.path == '/api/ralph/usage' or self.path.startswith('/api/ralph/usage?'): + self.handle_ralph_usage() elif self.path.startswith('/api/ralph/'): # /api/ralph//log or /api/ralph//prd parts = self.path.split('?', 1)[0].split('/') diff --git a/dashboard/handlers/ralph.py b/dashboard/handlers/ralph.py index b2bd14b..85d84b1 100644 --- a/dashboard/handlers/ralph.py +++ b/dashboard/handlers/ralph.py @@ -1,9 +1,10 @@ -"""Ralph live dashboard endpoints (W3). +"""Ralph live dashboard endpoints (W3 + instrumentation). Endpoints: GET /api/ralph/status — toate proiectele Ralph (cards data) GET /api/ralph//log — tail progress.txt (default 100 lines) GET /api/ralph//prd — full prd.json content + GET /api/ralph/usage[?days=N] — rate limit budget summary (cross-project) POST /api/ralph//stop — SIGTERM la Ralph PID Polling: 5s din ralph.html (suficient pentru iter 8-15min Ralph). @@ -14,18 +15,30 @@ Citește status din `~/workspace//scripts/ralph/`: - progress.txt → log human-readable - logs/iteration-*.log → mtime ultimului iter - .ralph.pid → PID activ (verificat cu os.kill 0) +- usage.jsonl → token/cost log per iter (instrumentation MVP) Reuse path constants din `dashboard/constants.py` (WORKSPACE_DIR). """ import json import os import signal +import sys from datetime import datetime from pathlib import Path from urllib.parse import unquote import constants +# Best-effort import of pure functions for /api/ralph/usage (instrumentation MVP). +# Helper lives at /tools/ralph_usage.py — sibling of `dashboard/`. +_TOOLS_DIR = Path(__file__).resolve().parents[2] / "tools" +if str(_TOOLS_DIR) not in sys.path: + sys.path.insert(0, str(_TOOLS_DIR)) +try: + import ralph_usage # type: ignore +except ImportError: # pragma: no cover — diagnostic only + ralph_usage = None # type: ignore + # Path Ralph per proiect (mereu în scripts/ralph/) def _ralph_dir(project_dir: Path) -> Path: @@ -259,6 +272,58 @@ class RalphHandlers: except Exception as exc: self.send_json({"error": str(exc)}, 500) + # ── /api/ralph/usage (GET) ───────────────────────────────── + def handle_ralph_usage(self): + """Returnează rate limit budget summary cross-project. + + Citește toate `~/workspace//scripts/ralph/usage.jsonl`, le concatenează, + rulează `ralph_usage.summarize` cu `?days=N` (default 7). + + Răspuns: + { + "today": "YYYY-MM-DD", + "today_cost": float, + "today_runs": int, + "window_days": N, + "window_cost": float, + "window_runs": int, + "by_project": {...}, + "by_day": {...}, + "total_cost": float, + "total_runs": int + } + """ + try: + from urllib.parse import parse_qs, urlparse + qs = parse_qs(urlparse(self.path).query) + try: + days = int(qs.get("days", ["7"])[0]) + if days <= 0: + days = 7 + if days > 365: + days = 365 + except ValueError: + days = 7 + + if ralph_usage is None: + self.send_json({"error": "ralph_usage helper unavailable"}, 500) + return + + entries: list[dict] = [] + if constants.WORKSPACE_DIR.exists(): + for entry in sorted(constants.WORKSPACE_DIR.iterdir()): + if not entry.is_dir() or entry.name.startswith("."): + continue + usage_path = _ralph_dir(entry) / "usage.jsonl" + if usage_path.exists(): + entries.extend(ralph_usage.parse_usage_jsonl(usage_path)) + + summary = ralph_usage.summarize(entries, days=days) + summary["fetchedAt"] = datetime.now().isoformat() + self.send_json(summary) + except Exception as exc: + self.send_json({"error": str(exc)}, 500) + # ── /api/ralph//stop (POST) ────────────────────────── def handle_ralph_stop(self, slug: str): """Trimite SIGTERM la Ralph PID. Verifică că PID-ul e în WORKSPACE_DIR.""" diff --git a/src/router.py b/src/router.py index c07f960..d8bb222 100644 --- a/src/router.py +++ b/src/router.py @@ -241,8 +241,51 @@ def _maybe_whatsapp_redirect(text: str, adapter_name: str | None) -> str: return text +def _translate_whatsapp_text(text: str) -> str | None: + """Translate WhatsApp text-keyword commands to slash equivalents. + + Acoperă **doar** keyword-urile robuste (single-token + opțional slug): + - `aprob` → `/a` (listează pending) + - `aprob ` → `/a ` (aprobă proiect) + - `stop ` → `/k ` (oprește Ralph) + - `stare` → `/l` (status global) + - `stare ` → `/l ` (status filtrat) + + NU acoperă `propose` — descrierea liberă e prea fragilă pentru parsing + text-only (utilizatorii ar trimite descrieri multi-line care s-ar + interpreta greșit). Pentru propose, redirecționăm spre Discord/Telegram. + + Returnează slash command translatat sau None dacă text-ul nu match. + Case-insensitive pe keyword (slug-ul rămâne ca în input). + + Apelat DOAR pe adapter `whatsapp` în router (nu vrem ca un user pe + Discord să zică „stop" și să se întâmple ceva). + """ + if not text or not text.strip(): + return None + + parts = text.strip().split(None, 1) + keyword = parts[0].lower() + rest = parts[1].strip() if len(parts) > 1 else "" + + if keyword == "aprob": + return f"/a {rest}".rstrip() + if keyword == "stop" and rest: + # `stop` fără slug ar putea fi colocvial („stop, am uitat ceva") — nu translatăm. + return f"/k {rest}" + if keyword == "stare": + return f"/l {rest}".rstrip() + return None + + def _try_ralph_dispatch(text: str, adapter_name: str | None = None) -> str | None: """Parse and dispatch Ralph commands. Returns response string or None if no match.""" + # WhatsApp keyword preprocessing — doar pe whatsapp, înainte de dispatch. + if adapter_name == "whatsapp": + translated = _translate_whatsapp_text(text) + if translated is not None: + text = translated + low = text.lower() first = low.split(None, 1)[0] if low else "" diff --git a/tests/test_dashboard_ralph_endpoint.py b/tests/test_dashboard_ralph_endpoint.py index f0a72a2..aa46cff 100644 --- a/tests/test_dashboard_ralph_endpoint.py +++ b/tests/test_dashboard_ralph_endpoint.py @@ -186,6 +186,54 @@ class TestPrd: assert handler.captured_code == 400 +# ── /api/ralph/usage ──────────────────────────────────────────── + + +class TestUsageEndpoint: + def test_usage_empty_workspace(self, handler): + handler.path = "/api/ralph/usage" + handler.handle_ralph_usage() + assert handler.captured_code == 200 + assert handler.captured["today_runs"] == 0 + assert handler.captured["total_runs"] == 0 + assert handler.captured["by_project"] == {} + + def test_usage_aggregates_across_projects(self, handler, tmp_path): + # Create two projects, each with usage.jsonl + for slug, cost, ts in [("proj-a", 0.5, "2026-04-26T10:00:00+00:00"), + ("proj-b", 0.3, "2026-04-26T11:00:00+00:00")]: + ralph_dir = tmp_path / slug / "scripts" / "ralph" + ralph_dir.mkdir(parents=True) + (ralph_dir / "usage.jsonl").write_text( + json.dumps({"slug": slug, "ts": ts, "total_cost_usd": cost, + "input_tokens": 100, "output_tokens": 50, "cache_read": 0}) + "\n", + encoding="utf-8", + ) + handler.path = "/api/ralph/usage?days=30" + handler.handle_ralph_usage() + assert handler.captured_code == 200 + # Should have both projects + assert "proj-a" in handler.captured["by_project"] + assert "proj-b" in handler.captured["by_project"] + assert handler.captured["total_runs"] == 2 + assert handler.captured["window_runs"] == 2 + + def test_usage_invalid_days_falls_back(self, handler): + handler.path = "/api/ralph/usage?days=abc" + handler.handle_ralph_usage() + assert handler.captured_code == 200 + assert handler.captured["window_days"] == 7 + + def test_usage_handles_corrupt_jsonl(self, handler, tmp_path): + # Project with corrupt usage.jsonl shouldn't 500 + ralph_dir = tmp_path / "broken" / "scripts" / "ralph" + ralph_dir.mkdir(parents=True) + (ralph_dir / "usage.jsonl").write_text("not json\n", encoding="utf-8") + handler.path = "/api/ralph/usage" + handler.handle_ralph_usage() + assert handler.captured_code == 200 + + # ── _ralph_validate_slug ─────────────────────────────────────── diff --git a/tests/test_ralph_usage.py b/tests/test_ralph_usage.py new file mode 100644 index 0000000..955232f --- /dev/null +++ b/tests/test_ralph_usage.py @@ -0,0 +1,366 @@ +"""Tests for tools/ralph_usage.py — rate limit budget tracking. + +Acoperă: +- extract_usage_entry: shape corect, missing fields, JSON corupt → None +- parse_usage_jsonl: skip linii corupte, file lipsă → [] +- aggregate_by_day / aggregate_by_project: sume corecte, deduplicare +- filter_by_days: window inclusiv vs exclusiv +- summarize: today_cost/today_runs corecte +- append_entry: atomic write, JSONL roundtrip +- CLI append: idempotent la JSON corupt (no-op + exit 0) +""" +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +TOOLS = PROJECT_ROOT / "tools" +if str(TOOLS) not in sys.path: + sys.path.insert(0, str(TOOLS)) + +import ralph_usage # noqa: E402 + + +# ── Sample claude -p --output-format json envelopes ──────────────── + + +def _claude_envelope( + *, + cost: float = 0.55, + input_tokens: int = 1234, + output_tokens: int = 567, + cache_read: int = 890, + duration_ms: int = 49000, + model: str = "claude-opus-4-7-20260101", +) -> dict: + return { + "type": "result", + "subtype": "completed", + "session_id": "abc123", + "result": "Story implementat", + "is_error": False, + "total_cost_usd": cost, + "duration_ms": duration_ms, + "num_turns": 5, + "usage": { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": cache_read, + }, + "model": model, + } + + +# ── extract_usage_entry ──────────────────────────────────────────── + + +class TestExtractEntry: + def test_full_envelope_extracts_all_fields(self): + env = _claude_envelope() + entry = ralph_usage.extract_usage_entry( + env, slug="proj-a", story_id="US-001", iter_n=3, + ts="2026-04-26T12:00:00+00:00", + ) + assert entry == { + "ts": "2026-04-26T12:00:00+00:00", + "slug": "proj-a", + "story_id": "US-001", + "iter": 3, + "total_cost_usd": 0.55, + "input_tokens": 1234, + "output_tokens": 567, + "cache_read": 890, + "model": "claude-opus-4-7-20260101", + "duration_ms": 49000, + } + + def test_accepts_raw_string(self): + env = _claude_envelope() + entry = ralph_usage.extract_usage_entry( + json.dumps(env), slug="x", story_id=None, iter_n=None, + ts="2026-04-26T00:00:00+00:00", + ) + assert entry is not None + assert entry["story_id"] is None + assert entry["iter"] is None + assert entry["total_cost_usd"] == 0.55 + + def test_corrupt_json_returns_none(self): + assert ralph_usage.extract_usage_entry("{not json", slug="x") is None + assert ralph_usage.extract_usage_entry("", slug="x") is None + assert ralph_usage.extract_usage_entry("null", slug="x") is None + + def test_missing_usage_field_zeros(self): + env = {"total_cost_usd": 0.1, "duration_ms": 1000} + entry = ralph_usage.extract_usage_entry(env, slug="x") + assert entry["input_tokens"] == 0 + assert entry["output_tokens"] == 0 + assert entry["cache_read"] == 0 + assert entry["model"] is None + + def test_missing_cost_defaults_zero(self): + env = {"usage": {"input_tokens": 100}} + entry = ralph_usage.extract_usage_entry(env, slug="x") + assert entry["total_cost_usd"] == 0.0 + assert entry["input_tokens"] == 100 + + def test_non_dict_returns_none(self): + assert ralph_usage.extract_usage_entry([], slug="x") is None + assert ralph_usage.extract_usage_entry(123, slug="x") is None + + def test_alternative_cache_field_name(self): + # Defensive: dacă viitor schema folosește `cache_read` + env = {"usage": {"cache_read": 42}, "total_cost_usd": 0.1} + entry = ralph_usage.extract_usage_entry(env, slug="x") + assert entry["cache_read"] == 42 + + +# ── parse_usage_jsonl ────────────────────────────────────────────── + + +class TestParseJsonl: + def test_file_missing_returns_empty(self, tmp_path): + assert ralph_usage.parse_usage_jsonl(tmp_path / "ghost.jsonl") == [] + + def test_skips_corrupt_lines(self, tmp_path): + p = tmp_path / "u.jsonl" + p.write_text( + '{"slug": "a", "ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.1}\n' + "{not json}\n" + '{"slug": "b", "ts": "2026-04-26T01:00:00+00:00", "total_cost_usd": 0.2}\n' + "\n" + "[]\n", # not a dict + encoding="utf-8", + ) + entries = ralph_usage.parse_usage_jsonl(p) + slugs = [e["slug"] for e in entries] + assert slugs == ["a", "b"] + + def test_empty_file_returns_empty(self, tmp_path): + p = tmp_path / "u.jsonl" + p.write_text("", encoding="utf-8") + assert ralph_usage.parse_usage_jsonl(p) == [] + + +# ── aggregate_by_day / aggregate_by_project ─────────────────────── + + +class TestAggregate: + @pytest.fixture + def entries(self): + return [ + {"slug": "proj-a", "ts": "2026-04-26T10:00:00+00:00", + "total_cost_usd": 0.5, "input_tokens": 100, "output_tokens": 50, "cache_read": 200}, + {"slug": "proj-a", "ts": "2026-04-26T11:00:00+00:00", + "total_cost_usd": 0.3, "input_tokens": 80, "output_tokens": 30, "cache_read": 100}, + {"slug": "proj-b", "ts": "2026-04-25T22:00:00+00:00", + "total_cost_usd": 1.2, "input_tokens": 500, "output_tokens": 200, "cache_read": 0}, + ] + + def test_aggregate_by_day(self, entries): + result = ralph_usage.aggregate_by_day(entries) + assert result["2026-04-26"]["cost_usd"] == 0.8 + assert result["2026-04-26"]["runs"] == 2 + assert result["2026-04-26"]["input_tokens"] == 180 + assert result["2026-04-26"]["output_tokens"] == 80 + assert result["2026-04-26"]["cache_read"] == 300 + assert result["2026-04-25"]["cost_usd"] == 1.2 + assert result["2026-04-25"]["runs"] == 1 + # Sortare descrescătoare în iteration order + keys = list(result.keys()) + assert keys == ["2026-04-26", "2026-04-25"] + + def test_aggregate_by_project(self, entries): + result = ralph_usage.aggregate_by_project(entries) + assert result["proj-a"]["cost_usd"] == 0.8 + assert result["proj-a"]["runs"] == 2 + assert result["proj-b"]["cost_usd"] == 1.2 + assert result["proj-b"]["runs"] == 1 + + def test_aggregate_handles_missing_slug(self): + entries = [{"ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.1}] + result = ralph_usage.aggregate_by_project(entries) + assert "unknown" in result + + def test_aggregate_handles_missing_ts(self): + entries = [{"slug": "x", "total_cost_usd": 0.1}] + # Missing ts → skipped from by_day + result = ralph_usage.aggregate_by_day(entries) + assert result == {} + + def test_aggregate_empty_entries(self): + assert ralph_usage.aggregate_by_day([]) == {} + assert ralph_usage.aggregate_by_project([]) == {} + + +# ── filter_by_days ───────────────────────────────────────────────── + + +class TestFilterByDays: + def test_window_inclusive_today(self): + entries = [ + {"ts": "2026-04-26T00:00:00+00:00", "slug": "a"}, + {"ts": "2026-04-25T00:00:00+00:00", "slug": "a"}, + {"ts": "2026-04-20T00:00:00+00:00", "slug": "a"}, + ] + kept = ralph_usage.filter_by_days(entries, 7, today="2026-04-26") + # 7-day window inclusiv de la today: 2026-04-20 .. 2026-04-26 + slugs = [e["ts"][:10] for e in kept] + assert slugs == ["2026-04-26", "2026-04-25", "2026-04-20"] + + def test_window_exclusive_older(self): + entries = [ + {"ts": "2026-04-26T00:00:00+00:00"}, + {"ts": "2026-04-19T00:00:00+00:00"}, # 7 days before today → exclus + ] + kept = ralph_usage.filter_by_days(entries, 7, today="2026-04-26") + assert len(kept) == 1 + assert kept[0]["ts"] == "2026-04-26T00:00:00+00:00" + + def test_zero_days_empty(self): + entries = [{"ts": "2026-04-26T00:00:00+00:00"}] + assert ralph_usage.filter_by_days(entries, 0, today="2026-04-26") == [] + + def test_corrupt_ts_skipped(self): + entries = [{"ts": "garbage"}] + assert ralph_usage.filter_by_days(entries, 7, today="2026-04-26") == [] + + +# ── summarize ────────────────────────────────────────────────────── + + +class TestSummarize: + def test_summary_shape_and_today_split(self): + entries = [ + {"ts": "2026-04-26T10:00:00+00:00", "slug": "a", "total_cost_usd": 0.5, + "input_tokens": 100, "output_tokens": 50, "cache_read": 0}, + {"ts": "2026-04-26T11:00:00+00:00", "slug": "a", "total_cost_usd": 0.3, + "input_tokens": 80, "output_tokens": 30, "cache_read": 0}, + {"ts": "2026-04-25T00:00:00+00:00", "slug": "b", "total_cost_usd": 1.0, + "input_tokens": 0, "output_tokens": 0, "cache_read": 0}, + ] + s = ralph_usage.summarize(entries, days=7, today="2026-04-26") + assert s["today"] == "2026-04-26" + assert s["today_cost"] == 0.8 + assert s["today_runs"] == 2 + assert s["window_days"] == 7 + assert s["window_runs"] == 3 + assert "by_project" in s + assert "by_day" in s + assert s["total_runs"] == 3 + assert s["by_project"]["a"]["runs"] == 2 + assert s["by_project"]["b"]["runs"] == 1 + + def test_summary_empty_entries(self): + s = ralph_usage.summarize([], days=7, today="2026-04-26") + assert s["today_cost"] == 0 + assert s["today_runs"] == 0 + assert s["by_project"] == {} + assert s["by_day"] == {} + assert s["total_runs"] == 0 + + +# ── append_entry ─────────────────────────────────────────────────── + + +class TestAppendEntry: + def test_append_creates_file_with_jsonl_format(self, tmp_path): + usage = tmp_path / "usage.jsonl" + entry = {"slug": "x", "ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.1} + ralph_usage.append_entry(usage, entry) + text = usage.read_text(encoding="utf-8") + assert text.endswith("\n") + loaded = json.loads(text.strip()) + assert loaded == entry + + def test_append_preserves_existing_entries(self, tmp_path): + usage = tmp_path / "usage.jsonl" + usage.write_text( + '{"slug": "a", "ts": "2026-04-25T00:00:00+00:00", "total_cost_usd": 0.5}\n', + encoding="utf-8", + ) + ralph_usage.append_entry(usage, {"slug": "b", "ts": "2026-04-26T00:00:00+00:00", + "total_cost_usd": 0.3}) + entries = ralph_usage.parse_usage_jsonl(usage) + assert len(entries) == 2 + assert entries[0]["slug"] == "a" + assert entries[1]["slug"] == "b" + + def test_append_handles_missing_trailing_newline(self, tmp_path): + usage = tmp_path / "usage.jsonl" + usage.write_text( + '{"slug": "a", "ts": "2026-04-25T00:00:00+00:00"}', # no trailing \n + encoding="utf-8", + ) + ralph_usage.append_entry(usage, {"slug": "b", "ts": "2026-04-26T00:00:00+00:00"}) + entries = ralph_usage.parse_usage_jsonl(usage) + assert [e["slug"] for e in entries] == ["a", "b"] + + +# ── CLI: append subcommand ───────────────────────────────────────── + + +class TestCliAppend: + def test_append_from_log_file(self, tmp_path): + log = tmp_path / "iter.log" + log.write_text(json.dumps(_claude_envelope(cost=0.42)), encoding="utf-8") + usage = tmp_path / "usage.jsonl" + + rc = ralph_usage.main([ + "append", str(usage), str(log), + "--slug", "proj-a", + "--story-id", "US-001", + "--iter", "3", + ]) + assert rc == 0 + entries = ralph_usage.parse_usage_jsonl(usage) + assert len(entries) == 1 + e = entries[0] + assert e["slug"] == "proj-a" + assert e["story_id"] == "US-001" + assert e["iter"] == 3 + assert e["total_cost_usd"] == 0.42 + + def test_append_corrupt_log_no_op(self, tmp_path): + log = tmp_path / "iter.log" + log.write_text("not json", encoding="utf-8") + usage = tmp_path / "usage.jsonl" + + rc = ralph_usage.main([ + "append", str(usage), str(log), + "--slug", "proj-a", + ]) + # Idempotent: corrupt JSON → exit 0, no entry written + assert rc == 0 + assert not usage.exists() or ralph_usage.parse_usage_jsonl(usage) == [] + + def test_append_missing_log_no_op(self, tmp_path): + usage = tmp_path / "usage.jsonl" + rc = ralph_usage.main([ + "append", str(usage), str(tmp_path / "missing.log"), + "--slug", "x", + ]) + assert rc == 0 + + +# ── CLI: summarize subcommand ────────────────────────────────────── + + +class TestCliSummarize: + def test_summarize_outputs_json(self, tmp_path, capsys): + usage = tmp_path / "usage.jsonl" + usage.write_text( + json.dumps({"slug": "x", "ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.5}) + "\n", + encoding="utf-8", + ) + rc = ralph_usage.main(["summarize", str(usage), "--days", "7"]) + assert rc == 0 + out = json.loads(capsys.readouterr().out) + assert "today" in out + assert "by_project" in out + assert "by_day" in out diff --git a/tests/test_whatsapp_keywords.py b/tests/test_whatsapp_keywords.py new file mode 100644 index 0000000..0559d37 --- /dev/null +++ b/tests/test_whatsapp_keywords.py @@ -0,0 +1,139 @@ +"""Tests for WhatsApp text-keyword commands → slash translation. + +Acoperă `_translate_whatsapp_text` și integrarea cu `_try_ralph_dispatch`: +- aprob / aprob +- stop +- stare / stare +- case-insensitive pe keyword +- Discord/Telegram NU sunt afectate +- propose intentionally NOT supported +""" +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import patch + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from src.router import _translate_whatsapp_text, _try_ralph_dispatch + + +# ── _translate_whatsapp_text (pure helper) ──────────────────────── + + +class TestTranslate: + def test_aprob_alone_lists_pending(self): + # `aprob` fără slug → /a (listează pending) + assert _translate_whatsapp_text("aprob") == "/a" + + def test_aprob_with_slug(self): + assert _translate_whatsapp_text("aprob roa2web") == "/a roa2web" + + def test_aprob_case_insensitive(self): + assert _translate_whatsapp_text("APROB roa2web") == "/a roa2web" + assert _translate_whatsapp_text("Aprob roa2web") == "/a roa2web" + + def test_stop_with_slug(self): + assert _translate_whatsapp_text("stop roa2web") == "/k roa2web" + + def test_stop_case_insensitive(self): + assert _translate_whatsapp_text("STOP roa2web") == "/k roa2web" + + def test_stop_alone_not_translated(self): + # `stop` fără slug poate fi colocvial → nu translatăm + assert _translate_whatsapp_text("stop") is None + + def test_stare_alone(self): + assert _translate_whatsapp_text("stare") == "/l" + + def test_stare_with_slug(self): + assert _translate_whatsapp_text("stare roa2web") == "/l roa2web" + + def test_stare_case_insensitive(self): + assert _translate_whatsapp_text("STARE") == "/l" + + def test_other_text_not_translated(self): + assert _translate_whatsapp_text("hello") is None + assert _translate_whatsapp_text("ce mai faci") is None + assert _translate_whatsapp_text("propose roa2web descriere") is None + # Slash commands pass through unchanged (None — don't override) + assert _translate_whatsapp_text("/a") is None + + def test_empty_input(self): + assert _translate_whatsapp_text("") is None + assert _translate_whatsapp_text(" ") is None + + def test_propose_not_covered(self): + # Verifică explicit că nu acoperim propose (descrierea fragilă) + assert _translate_whatsapp_text("propose foo bar baz") is None + assert _translate_whatsapp_text("propune foo bar baz") is None + + +# ── Integration: _try_ralph_dispatch with adapter_name ──────────── + + +class TestDispatchIntegration: + def test_whatsapp_aprob_routes_to_approve(self): + # `aprob` pe whatsapp → trebuie să intre în Ralph dispatch + with patch("src.router._ralph_approve") as mock: + mock.return_value = "ok" + result = _try_ralph_dispatch("aprob foo", adapter_name="whatsapp") + assert result == "ok" + mock.assert_called_once_with(["foo"]) + + def test_whatsapp_stop_routes_to_stop(self): + with patch("src.router._ralph_stop") as mock: + mock.return_value = "stopped" + result = _try_ralph_dispatch("stop foo", adapter_name="whatsapp") + assert result == "stopped" + mock.assert_called_once_with("foo") + + def test_whatsapp_stare_routes_to_status(self): + with patch("src.router._ralph_status") as mock: + mock.return_value = "status" + result = _try_ralph_dispatch("stare", adapter_name="whatsapp") + # Status returnează cu redirect hint pe whatsapp + assert "status" in result + mock.assert_called_once_with(None) + + def test_whatsapp_stare_with_slug(self): + with patch("src.router._ralph_status") as mock: + mock.return_value = "status" + _try_ralph_dispatch("stare roa2web", adapter_name="whatsapp") + mock.assert_called_once_with("roa2web") + + def test_discord_keyword_not_translated(self): + # Pe Discord, "stop foo" NU ar trebui să match — nu e adapter whatsapp + with patch("src.router._ralph_stop") as mock: + result = _try_ralph_dispatch("stop foo", adapter_name="discord") + assert result is None + mock.assert_not_called() + + def test_telegram_keyword_not_translated(self): + with patch("src.router._ralph_approve") as mock: + result = _try_ralph_dispatch("aprob foo", adapter_name="telegram") + assert result is None + mock.assert_not_called() + + def test_no_adapter_keyword_not_translated(self): + # adapter_name=None → nu e whatsapp → no translation + with patch("src.router._ralph_approve") as mock: + result = _try_ralph_dispatch("aprob foo", adapter_name=None) + assert result is None + mock.assert_not_called() + + def test_whatsapp_slash_command_still_works(self): + # Slash-uri normale pe WhatsApp NU trebuie sparte de translation + with patch("src.router._ralph_approve") as mock: + mock.return_value = "ok" + result = _try_ralph_dispatch("/a foo", adapter_name="whatsapp") + assert result == "ok" + mock.assert_called_once_with(["foo"]) + + def test_whatsapp_chat_message_passthrough(self): + # Mesajul normal pe whatsapp (fără keyword) → None (cade pe Claude) + result = _try_ralph_dispatch("hello echo, ce mai faci", adapter_name="whatsapp") + assert result is None diff --git a/tools/ralph/ralph.sh b/tools/ralph/ralph.sh index f30e103..08b2d28 100755 --- a/tools/ralph/ralph.sh +++ b/tools/ralph/ralph.sh @@ -39,6 +39,21 @@ else DAG_HELPER="" fi +# Usage helper auto-detect (rate limit budget tracking — best effort, niciodată +# blochează rularea Ralph dacă lipsește) +if [ -n "$RALPH_USAGE_HELPER" ] && [ -f "$RALPH_USAGE_HELPER" ]; then + USAGE_HELPER="$RALPH_USAGE_HELPER" +elif [ -f "/home/moltbot/echo-core/tools/ralph_usage.py" ]; then + USAGE_HELPER="/home/moltbot/echo-core/tools/ralph_usage.py" +elif [ -f "/home/moltbot/echo-core-instr/tools/ralph_usage.py" ]; then + USAGE_HELPER="/home/moltbot/echo-core-instr/tools/ralph_usage.py" +elif [ -f "$SCRIPT_DIR/ralph_usage.py" ]; then + USAGE_HELPER="$SCRIPT_DIR/ralph_usage.py" +else + USAGE_HELPER="" +fi +USAGE_FILE="$SCRIPT_DIR/usage.jsonl" + # Verifică că jq este instalat if ! command -v jq &> /dev/null; then echo "Eroare: jq nu este instalat. Rulează: apt install jq" @@ -292,6 +307,15 @@ EOF set -e OUTPUT=$(cat "$LOG_FILE") + # Rate limit budget tracking (best-effort, never blocks Ralph) + if [ -n "$USAGE_HELPER" ]; then + "$RALPH_PYTHON" "$USAGE_HELPER" append \ + "$USAGE_FILE" "$LOG_FILE" \ + --slug "$PROJECT_NAME" \ + --story-id "$CURRENT_STORY" \ + --iter "$i" 2>/dev/null || true + fi + # W3: rate limit detection (max 1 retry per rulare) if is_rate_limited "$OUTPUT" || [ "$CLAUDE_EXIT" = "29" ]; then if [ "$RATE_LIMIT_RETRY_USED" = "0" ]; then diff --git a/tools/ralph_usage.py b/tools/ralph_usage.py new file mode 100755 index 0000000..587da20 --- /dev/null +++ b/tools/ralph_usage.py @@ -0,0 +1,384 @@ +#!/usr/bin/env python3 +"""Ralph usage tracking — rate limit budget MVP. + +Two responsabilități: +1. **Pure functions** (testable, no side-effects): parse usage JSONL, aggregate + by day / by project, summarize for dashboard. +2. **CLI subcommands** (chemate din `tools/ralph/ralph.sh` după fiecare iter): + atomic append usage entry derivat din `claude -p --output-format json`. + +JSON envelope produs de `claude -p --output-format json`: + + { + "type": "result", + "subtype": "completed" | "error_max_turns" | ..., + "session_id": "...", + "result": "...", + "is_error": false, + "total_cost_usd": 0.55, + "duration_ms": 49000, + "num_turns": 5, + "usage": { + "input_tokens": 1234, + "output_tokens": 567, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 890 + }, + "model": "claude-opus-4-7-...", // poate lipsi + ... + } + +Usage entry shape (one per JSONL line): + + { + "ts": "2026-04-26T12:00:00+00:00", + "slug": "roa2web", + "story_id": "US-001", // null dacă necunoscut + "iter": 3, // null dacă necunoscut + "total_cost_usd": 0.55, + "input_tokens": 1234, + "output_tokens": 567, + "cache_read": 890, + "model": "claude-opus-4-7-...", + "duration_ms": 49000 + } + +CLI subcommands: + + python3 ralph_usage.py append \\ + --slug [--story-id ] [--iter ] + → parse claude_log, atomic append entry in usage_jsonl. Idempotent + la JSON corupt (no-op + exit 0). + + python3 ralph_usage.py summarize [--days N] + → print JSON summary {today_cost, today_runs, by_project, by_day, ...}. +""" +from __future__ import annotations + +import argparse +import json +import os +import sys +import tempfile +from collections import defaultdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable + + +# --------------------------------------------------------------------------- +# Pure functions — extract / parse / aggregate +# --------------------------------------------------------------------------- + + +def extract_usage_entry( + claude_json: dict | str, + *, + slug: str, + story_id: str | None = None, + iter_n: int | None = None, + ts: str | None = None, +) -> dict | None: + """Build a usage entry from a claude -p JSON envelope. + + Acceptă dict deja parsat sau raw string. Returnează None dacă inputul + nu poate fi parsat sau nu e un dict (anti-corruption). + + Pure: no I/O, no side effects. + """ + if isinstance(claude_json, str): + try: + claude_json = json.loads(claude_json) + except (json.JSONDecodeError, TypeError, ValueError): + return None + if not isinstance(claude_json, dict): + return None + + usage = claude_json.get("usage") or {} + if not isinstance(usage, dict): + usage = {} + + return { + "ts": ts or datetime.now(timezone.utc).isoformat(timespec="seconds"), + "slug": slug, + "story_id": story_id if story_id else None, + "iter": int(iter_n) if iter_n is not None else None, + "total_cost_usd": _coerce_float(claude_json.get("total_cost_usd")), + "input_tokens": _coerce_int(usage.get("input_tokens")), + "output_tokens": _coerce_int(usage.get("output_tokens")), + "cache_read": _coerce_int( + usage.get("cache_read_input_tokens") or usage.get("cache_read") or 0 + ), + "model": str(claude_json.get("model") or "") or None, + "duration_ms": _coerce_int(claude_json.get("duration_ms")), + } + + +def _coerce_float(v: Any) -> float: + try: + return float(v) if v is not None else 0.0 + except (TypeError, ValueError): + return 0.0 + + +def _coerce_int(v: Any) -> int: + try: + return int(v) if v is not None else 0 + except (TypeError, ValueError): + return 0 + + +def parse_usage_jsonl(path: Path | str) -> list[dict]: + """Read a JSONL file of usage entries. Skip corrupt lines silently. + + Pure-ish (file I/O scoped to the path; no global state mutation). + Întoarce listă goală dacă fișierul lipsește. + """ + p = Path(path) + if not p.exists(): + return [] + entries: list[dict] = [] + try: + with p.open(encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(obj, dict): + entries.append(obj) + except OSError: + return [] + return entries + + +def _entry_day(entry: dict) -> str: + """Extract YYYY-MM-DD from entry.ts. Robust la formate timezone-aware/naive.""" + ts = entry.get("ts") or "" + if not ts: + return "" + # Accept both `+00:00` and `Z`; the date prefix is the same. + return ts[:10] if len(ts) >= 10 else "" + + +def aggregate_by_day(entries: Iterable[dict]) -> dict[str, dict]: + """Aggregate usage by YYYY-MM-DD. + + Returnează `{"2026-04-26": {cost_usd, runs, input_tokens, output_tokens, cache_read}}`. + Stabilizat sortat cronologic (descending) când dict-urile sunt iterate. + """ + buckets: dict[str, dict] = defaultdict( + lambda: {"cost_usd": 0.0, "runs": 0, "input_tokens": 0, + "output_tokens": 0, "cache_read": 0} + ) + for e in entries: + day = _entry_day(e) + if not day: + continue + b = buckets[day] + b["cost_usd"] += _coerce_float(e.get("total_cost_usd")) + b["runs"] += 1 + b["input_tokens"] += _coerce_int(e.get("input_tokens")) + b["output_tokens"] += _coerce_int(e.get("output_tokens")) + b["cache_read"] += _coerce_int(e.get("cache_read")) + # round cost to 4 decimals to reduce float noise in JSON dump + return { + d: {**v, "cost_usd": round(v["cost_usd"], 4)} + for d, v in sorted(buckets.items(), reverse=True) + } + + +def aggregate_by_project(entries: Iterable[dict]) -> dict[str, dict]: + """Aggregate usage by slug. + + Returnează `{"roa2web": {cost_usd, runs, input_tokens, output_tokens, cache_read}}`. + """ + buckets: dict[str, dict] = defaultdict( + lambda: {"cost_usd": 0.0, "runs": 0, "input_tokens": 0, + "output_tokens": 0, "cache_read": 0} + ) + for e in entries: + slug = e.get("slug") or "unknown" + b = buckets[slug] + b["cost_usd"] += _coerce_float(e.get("total_cost_usd")) + b["runs"] += 1 + b["input_tokens"] += _coerce_int(e.get("input_tokens")) + b["output_tokens"] += _coerce_int(e.get("output_tokens")) + b["cache_read"] += _coerce_int(e.get("cache_read")) + return { + s: {**v, "cost_usd": round(v["cost_usd"], 4)} + for s, v in sorted(buckets.items()) + } + + +def filter_by_days(entries: Iterable[dict], days: int, *, today: str | None = None) -> list[dict]: + """Keep only entries with ts within last `days` days (today inclusive). + + `today` defaults to UTC current date (testabil prin override). + `days <= 0` → entries goale. + """ + if days <= 0: + return [] + today = today or datetime.now(timezone.utc).date().isoformat() + try: + today_dt = datetime.fromisoformat(today).date() + except ValueError: + return list(entries) + out = [] + for e in entries: + d = _entry_day(e) + if not d: + continue + try: + d_dt = datetime.fromisoformat(d).date() + except ValueError: + continue + delta = (today_dt - d_dt).days + if 0 <= delta < days: + out.append(e) + return out + + +def summarize( + entries: list[dict], + *, + days: int = 7, + today: str | None = None, +) -> dict: + """Build summary {today_cost, today_runs, by_project, by_day, total_cost, total_runs}. + + `today` defaults la UTC date curentă (override pentru teste). `by_day` + și `by_project` se calculează DOAR pe fereastra `days` (cele mai recente). + """ + today_str = today or datetime.now(timezone.utc).date().isoformat() + windowed = filter_by_days(entries, days, today=today_str) + today_entries = [e for e in entries if _entry_day(e) == today_str] + return { + "today": today_str, + "today_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in today_entries), 4), + "today_runs": len(today_entries), + "window_days": days, + "window_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in windowed), 4), + "window_runs": len(windowed), + "by_project": aggregate_by_project(windowed), + "by_day": aggregate_by_day(windowed), + "total_runs": len(entries), + "total_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in entries), 4), + } + + +# --------------------------------------------------------------------------- +# Atomic append (CLI side) — used from ralph.sh +# --------------------------------------------------------------------------- + + +def append_entry(usage_path: Path | str, entry: dict) -> None: + """Append a single entry as JSONL with atomic write semantics. + + Uses temp file rename to avoid concurrent-writer corruption (read-existing, + write-existing+new, atomic replace). NU folosim `open(..., 'a')` direct pentru + că poate fi tăiat la mijloc dacă procesul e killed. + """ + p = Path(usage_path) + p.parent.mkdir(parents=True, exist_ok=True) + + existing = "" + if p.exists(): + try: + existing = p.read_text(encoding="utf-8") + except OSError: + existing = "" + if existing and not existing.endswith("\n"): + existing += "\n" + + new_line = json.dumps(entry, ensure_ascii=False) + "\n" + + fd, tmp_path = tempfile.mkstemp(prefix=".usage_", suffix=".jsonl.tmp", dir=str(p.parent)) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(existing) + f.write(new_line) + os.replace(tmp_path, p) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def cmd_append(args: argparse.Namespace) -> int: + """Read claude JSON log, derive entry, atomic append. + + Idempotent la JSON corupt: dacă fișierul nu poate fi parsat, exit 0 + (nu vrem să spargem ralph.sh pentru un parse warning). + """ + log_path = Path(args.claude_log) + if not log_path.exists(): + print(f"warn: claude log missing: {log_path}", file=sys.stderr) + return 0 + try: + text = log_path.read_text(encoding="utf-8") + except OSError as exc: + print(f"warn: read failed: {exc}", file=sys.stderr) + return 0 + + entry = extract_usage_entry( + text, + slug=args.slug, + story_id=args.story_id or None, + iter_n=args.iter if args.iter is not None else None, + ) + if entry is None: + print(f"warn: claude log not parseable as JSON envelope; no usage entry written", file=sys.stderr) + return 0 + + try: + append_entry(args.usage_jsonl, entry) + except OSError as exc: + print(f"error: append failed: {exc}", file=sys.stderr) + return 1 + return 0 + + +def cmd_summarize(args: argparse.Namespace) -> int: + entries = parse_usage_jsonl(args.usage_jsonl) + summary = summarize(entries, days=args.days) + print(json.dumps(summary, indent=2, ensure_ascii=False)) + return 0 + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(prog="ralph_usage", description=__doc__) + sub = parser.add_subparsers(dest="cmd", required=True) + + sp_app = sub.add_parser("append", help="Atomic append usage entry from claude JSON log") + sp_app.add_argument("usage_jsonl", help="Path to usage.jsonl (will be created)") + sp_app.add_argument("claude_log", help="Path to claude -p JSON output log") + sp_app.add_argument("--slug", required=True) + sp_app.add_argument("--story-id", default="", dest="story_id") + sp_app.add_argument("--iter", type=int, default=None) + + sp_sum = sub.add_parser("summarize", help="Print JSON summary of usage") + sp_sum.add_argument("usage_jsonl", help="Path to usage.jsonl") + sp_sum.add_argument("--days", type=int, default=7) + + args = parser.parse_args(argv) + if args.cmd == "append": + return cmd_append(args) + if args.cmd == "summarize": + return cmd_summarize(args) + parser.print_help() + return 2 + + +if __name__ == "__main__": + sys.exit(main())