Merge branch 'ralph/instrumentation' — rate limit budget + WhatsApp keywords

Rate limit budget tracking (TODO P2):
- tools/ralph_usage.py — pure functions extract/parse/aggregate; CLI subcomenzi
  append/summarize. Atomic write JSONL.
- tools/ralph/ralph.sh: după fiecare claude -p, append usage entry la
  workspace/<slug>/scripts/ralph/usage.jsonl (best-effort)
- dashboard/handlers/ralph.py: GET /api/ralph/usage[?days=N] cross-project
  aggregation cu today_cost, today_runs, by_project, by_day

WhatsApp text-keyword commands (TODO P3):
- src/router.py: helper _translate_whatsapp_text — `aprob <slug>` → `/a <slug>`,
  `stop <slug>` → `/k <slug>`, `stare`/`stare <slug>` → `/l`/`/l <slug>`. Aplicat
  DOAR pe adapter whatsapp în _try_ralph_dispatch (Discord/TG nu sunt afectate).
  Propose intentionally NOT covered (descrierea fragilă).

Tests: 53 noi (28 ralph_usage + 21 whatsapp_keywords + 4 dashboard endpoint extend)
+ 0 regressions pe modulele atinse.
This commit is contained in:
2026-04-26 19:12:43 +00:00
8 changed files with 1072 additions and 1 deletions

View File

@@ -163,6 +163,8 @@ class TaskBoardHandler(
self.handle_eco_doctor()
elif self.path == '/api/ralph/status' or self.path.startswith('/api/ralph/status?'):
self.handle_ralph_status()
elif self.path == '/api/ralph/usage' or self.path.startswith('/api/ralph/usage?'):
self.handle_ralph_usage()
elif self.path.startswith('/api/ralph/'):
# /api/ralph/<slug>/log or /api/ralph/<slug>/prd
parts = self.path.split('?', 1)[0].split('/')

View File

@@ -1,9 +1,10 @@
"""Ralph live dashboard endpoints (W3).
"""Ralph live dashboard endpoints (W3 + instrumentation).
Endpoints:
GET /api/ralph/status — toate proiectele Ralph (cards data)
GET /api/ralph/<slug>/log — tail progress.txt (default 100 lines)
GET /api/ralph/<slug>/prd — full prd.json content
GET /api/ralph/usage[?days=N] — rate limit budget summary (cross-project)
POST /api/ralph/<slug>/stop — SIGTERM la Ralph PID
Polling: 5s din ralph.html (suficient pentru iter 8-15min Ralph).
@@ -14,18 +15,30 @@ Citește status din `~/workspace/<slug>/scripts/ralph/`:
- progress.txt → log human-readable
- logs/iteration-*.log → mtime ultimului iter
- .ralph.pid → PID activ (verificat cu os.kill 0)
- usage.jsonl → token/cost log per iter (instrumentation MVP)
Reuse path constants din `dashboard/constants.py` (WORKSPACE_DIR).
"""
import json
import os
import signal
import sys
from datetime import datetime
from pathlib import Path
from urllib.parse import unquote
import constants
# Best-effort import of pure functions for /api/ralph/usage (instrumentation MVP).
# Helper lives at <repo>/tools/ralph_usage.py — sibling of `dashboard/`.
_TOOLS_DIR = Path(__file__).resolve().parents[2] / "tools"
if str(_TOOLS_DIR) not in sys.path:
sys.path.insert(0, str(_TOOLS_DIR))
try:
import ralph_usage # type: ignore
except ImportError: # pragma: no cover — diagnostic only
ralph_usage = None # type: ignore
# Path Ralph per proiect (mereu în scripts/ralph/)
def _ralph_dir(project_dir: Path) -> Path:
@@ -259,6 +272,58 @@ class RalphHandlers:
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/usage (GET) ─────────────────────────────────
def handle_ralph_usage(self):
"""Returnează rate limit budget summary cross-project.
Citește toate `~/workspace/<slug>/scripts/ralph/usage.jsonl`, le concatenează,
rulează `ralph_usage.summarize` cu `?days=N` (default 7).
Răspuns:
{
"today": "YYYY-MM-DD",
"today_cost": float,
"today_runs": int,
"window_days": N,
"window_cost": float,
"window_runs": int,
"by_project": {...},
"by_day": {...},
"total_cost": float,
"total_runs": int
}
"""
try:
from urllib.parse import parse_qs, urlparse
qs = parse_qs(urlparse(self.path).query)
try:
days = int(qs.get("days", ["7"])[0])
if days <= 0:
days = 7
if days > 365:
days = 365
except ValueError:
days = 7
if ralph_usage is None:
self.send_json({"error": "ralph_usage helper unavailable"}, 500)
return
entries: list[dict] = []
if constants.WORKSPACE_DIR.exists():
for entry in sorted(constants.WORKSPACE_DIR.iterdir()):
if not entry.is_dir() or entry.name.startswith("."):
continue
usage_path = _ralph_dir(entry) / "usage.jsonl"
if usage_path.exists():
entries.extend(ralph_usage.parse_usage_jsonl(usage_path))
summary = ralph_usage.summarize(entries, days=days)
summary["fetchedAt"] = datetime.now().isoformat()
self.send_json(summary)
except Exception as exc:
self.send_json({"error": str(exc)}, 500)
# ── /api/ralph/<slug>/stop (POST) ──────────────────────────
def handle_ralph_stop(self, slug: str):
"""Trimite SIGTERM la Ralph PID. Verifică că PID-ul e în WORKSPACE_DIR."""

View File

@@ -241,8 +241,51 @@ def _maybe_whatsapp_redirect(text: str, adapter_name: str | None) -> str:
return text
def _translate_whatsapp_text(text: str) -> str | None:
"""Translate WhatsApp text-keyword commands to slash equivalents.
Acoperă **doar** keyword-urile robuste (single-token + opțional slug):
- `aprob` → `/a` (listează pending)
- `aprob <slug>` → `/a <slug>` (aprobă proiect)
- `stop <slug>` → `/k <slug>` (oprește Ralph)
- `stare` → `/l` (status global)
- `stare <slug>` → `/l <slug>` (status filtrat)
NU acoperă `propose` — descrierea liberă e prea fragilă pentru parsing
text-only (utilizatorii ar trimite descrieri multi-line care s-ar
interpreta greșit). Pentru propose, redirecționăm spre Discord/Telegram.
Returnează slash command translatat sau None dacă text-ul nu match.
Case-insensitive pe keyword (slug-ul rămâne ca în input).
Apelat DOAR pe adapter `whatsapp` în router (nu vrem ca un user pe
Discord să zică „stop" și să se întâmple ceva).
"""
if not text or not text.strip():
return None
parts = text.strip().split(None, 1)
keyword = parts[0].lower()
rest = parts[1].strip() if len(parts) > 1 else ""
if keyword == "aprob":
return f"/a {rest}".rstrip()
if keyword == "stop" and rest:
# `stop` fără slug ar putea fi colocvial („stop, am uitat ceva") — nu translatăm.
return f"/k {rest}"
if keyword == "stare":
return f"/l {rest}".rstrip()
return None
def _try_ralph_dispatch(text: str, adapter_name: str | None = None) -> str | None:
"""Parse and dispatch Ralph commands. Returns response string or None if no match."""
# WhatsApp keyword preprocessing — doar pe whatsapp, înainte de dispatch.
if adapter_name == "whatsapp":
translated = _translate_whatsapp_text(text)
if translated is not None:
text = translated
low = text.lower()
first = low.split(None, 1)[0] if low else ""

View File

@@ -186,6 +186,54 @@ class TestPrd:
assert handler.captured_code == 400
# ── /api/ralph/usage ────────────────────────────────────────────
class TestUsageEndpoint:
def test_usage_empty_workspace(self, handler):
handler.path = "/api/ralph/usage"
handler.handle_ralph_usage()
assert handler.captured_code == 200
assert handler.captured["today_runs"] == 0
assert handler.captured["total_runs"] == 0
assert handler.captured["by_project"] == {}
def test_usage_aggregates_across_projects(self, handler, tmp_path):
# Create two projects, each with usage.jsonl
for slug, cost, ts in [("proj-a", 0.5, "2026-04-26T10:00:00+00:00"),
("proj-b", 0.3, "2026-04-26T11:00:00+00:00")]:
ralph_dir = tmp_path / slug / "scripts" / "ralph"
ralph_dir.mkdir(parents=True)
(ralph_dir / "usage.jsonl").write_text(
json.dumps({"slug": slug, "ts": ts, "total_cost_usd": cost,
"input_tokens": 100, "output_tokens": 50, "cache_read": 0}) + "\n",
encoding="utf-8",
)
handler.path = "/api/ralph/usage?days=30"
handler.handle_ralph_usage()
assert handler.captured_code == 200
# Should have both projects
assert "proj-a" in handler.captured["by_project"]
assert "proj-b" in handler.captured["by_project"]
assert handler.captured["total_runs"] == 2
assert handler.captured["window_runs"] == 2
def test_usage_invalid_days_falls_back(self, handler):
handler.path = "/api/ralph/usage?days=abc"
handler.handle_ralph_usage()
assert handler.captured_code == 200
assert handler.captured["window_days"] == 7
def test_usage_handles_corrupt_jsonl(self, handler, tmp_path):
# Project with corrupt usage.jsonl shouldn't 500
ralph_dir = tmp_path / "broken" / "scripts" / "ralph"
ralph_dir.mkdir(parents=True)
(ralph_dir / "usage.jsonl").write_text("not json\n", encoding="utf-8")
handler.path = "/api/ralph/usage"
handler.handle_ralph_usage()
assert handler.captured_code == 200
# ── _ralph_validate_slug ───────────────────────────────────────

366
tests/test_ralph_usage.py Normal file
View File

@@ -0,0 +1,366 @@
"""Tests for tools/ralph_usage.py — rate limit budget tracking.
Acoperă:
- extract_usage_entry: shape corect, missing fields, JSON corupt → None
- parse_usage_jsonl: skip linii corupte, file lipsă → []
- aggregate_by_day / aggregate_by_project: sume corecte, deduplicare
- filter_by_days: window inclusiv vs exclusiv
- summarize: today_cost/today_runs corecte
- append_entry: atomic write, JSONL roundtrip
- CLI append: idempotent la JSON corupt (no-op + exit 0)
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
TOOLS = PROJECT_ROOT / "tools"
if str(TOOLS) not in sys.path:
sys.path.insert(0, str(TOOLS))
import ralph_usage # noqa: E402
# ── Sample claude -p --output-format json envelopes ────────────────
def _claude_envelope(
*,
cost: float = 0.55,
input_tokens: int = 1234,
output_tokens: int = 567,
cache_read: int = 890,
duration_ms: int = 49000,
model: str = "claude-opus-4-7-20260101",
) -> dict:
return {
"type": "result",
"subtype": "completed",
"session_id": "abc123",
"result": "Story implementat",
"is_error": False,
"total_cost_usd": cost,
"duration_ms": duration_ms,
"num_turns": 5,
"usage": {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": cache_read,
},
"model": model,
}
# ── extract_usage_entry ────────────────────────────────────────────
class TestExtractEntry:
def test_full_envelope_extracts_all_fields(self):
env = _claude_envelope()
entry = ralph_usage.extract_usage_entry(
env, slug="proj-a", story_id="US-001", iter_n=3,
ts="2026-04-26T12:00:00+00:00",
)
assert entry == {
"ts": "2026-04-26T12:00:00+00:00",
"slug": "proj-a",
"story_id": "US-001",
"iter": 3,
"total_cost_usd": 0.55,
"input_tokens": 1234,
"output_tokens": 567,
"cache_read": 890,
"model": "claude-opus-4-7-20260101",
"duration_ms": 49000,
}
def test_accepts_raw_string(self):
env = _claude_envelope()
entry = ralph_usage.extract_usage_entry(
json.dumps(env), slug="x", story_id=None, iter_n=None,
ts="2026-04-26T00:00:00+00:00",
)
assert entry is not None
assert entry["story_id"] is None
assert entry["iter"] is None
assert entry["total_cost_usd"] == 0.55
def test_corrupt_json_returns_none(self):
assert ralph_usage.extract_usage_entry("{not json", slug="x") is None
assert ralph_usage.extract_usage_entry("", slug="x") is None
assert ralph_usage.extract_usage_entry("null", slug="x") is None
def test_missing_usage_field_zeros(self):
env = {"total_cost_usd": 0.1, "duration_ms": 1000}
entry = ralph_usage.extract_usage_entry(env, slug="x")
assert entry["input_tokens"] == 0
assert entry["output_tokens"] == 0
assert entry["cache_read"] == 0
assert entry["model"] is None
def test_missing_cost_defaults_zero(self):
env = {"usage": {"input_tokens": 100}}
entry = ralph_usage.extract_usage_entry(env, slug="x")
assert entry["total_cost_usd"] == 0.0
assert entry["input_tokens"] == 100
def test_non_dict_returns_none(self):
assert ralph_usage.extract_usage_entry([], slug="x") is None
assert ralph_usage.extract_usage_entry(123, slug="x") is None
def test_alternative_cache_field_name(self):
# Defensive: dacă viitor schema folosește `cache_read`
env = {"usage": {"cache_read": 42}, "total_cost_usd": 0.1}
entry = ralph_usage.extract_usage_entry(env, slug="x")
assert entry["cache_read"] == 42
# ── parse_usage_jsonl ──────────────────────────────────────────────
class TestParseJsonl:
def test_file_missing_returns_empty(self, tmp_path):
assert ralph_usage.parse_usage_jsonl(tmp_path / "ghost.jsonl") == []
def test_skips_corrupt_lines(self, tmp_path):
p = tmp_path / "u.jsonl"
p.write_text(
'{"slug": "a", "ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.1}\n'
"{not json}\n"
'{"slug": "b", "ts": "2026-04-26T01:00:00+00:00", "total_cost_usd": 0.2}\n'
"\n"
"[]\n", # not a dict
encoding="utf-8",
)
entries = ralph_usage.parse_usage_jsonl(p)
slugs = [e["slug"] for e in entries]
assert slugs == ["a", "b"]
def test_empty_file_returns_empty(self, tmp_path):
p = tmp_path / "u.jsonl"
p.write_text("", encoding="utf-8")
assert ralph_usage.parse_usage_jsonl(p) == []
# ── aggregate_by_day / aggregate_by_project ───────────────────────
class TestAggregate:
@pytest.fixture
def entries(self):
return [
{"slug": "proj-a", "ts": "2026-04-26T10:00:00+00:00",
"total_cost_usd": 0.5, "input_tokens": 100, "output_tokens": 50, "cache_read": 200},
{"slug": "proj-a", "ts": "2026-04-26T11:00:00+00:00",
"total_cost_usd": 0.3, "input_tokens": 80, "output_tokens": 30, "cache_read": 100},
{"slug": "proj-b", "ts": "2026-04-25T22:00:00+00:00",
"total_cost_usd": 1.2, "input_tokens": 500, "output_tokens": 200, "cache_read": 0},
]
def test_aggregate_by_day(self, entries):
result = ralph_usage.aggregate_by_day(entries)
assert result["2026-04-26"]["cost_usd"] == 0.8
assert result["2026-04-26"]["runs"] == 2
assert result["2026-04-26"]["input_tokens"] == 180
assert result["2026-04-26"]["output_tokens"] == 80
assert result["2026-04-26"]["cache_read"] == 300
assert result["2026-04-25"]["cost_usd"] == 1.2
assert result["2026-04-25"]["runs"] == 1
# Sortare descrescătoare în iteration order
keys = list(result.keys())
assert keys == ["2026-04-26", "2026-04-25"]
def test_aggregate_by_project(self, entries):
result = ralph_usage.aggregate_by_project(entries)
assert result["proj-a"]["cost_usd"] == 0.8
assert result["proj-a"]["runs"] == 2
assert result["proj-b"]["cost_usd"] == 1.2
assert result["proj-b"]["runs"] == 1
def test_aggregate_handles_missing_slug(self):
entries = [{"ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.1}]
result = ralph_usage.aggregate_by_project(entries)
assert "unknown" in result
def test_aggregate_handles_missing_ts(self):
entries = [{"slug": "x", "total_cost_usd": 0.1}]
# Missing ts → skipped from by_day
result = ralph_usage.aggregate_by_day(entries)
assert result == {}
def test_aggregate_empty_entries(self):
assert ralph_usage.aggregate_by_day([]) == {}
assert ralph_usage.aggregate_by_project([]) == {}
# ── filter_by_days ─────────────────────────────────────────────────
class TestFilterByDays:
def test_window_inclusive_today(self):
entries = [
{"ts": "2026-04-26T00:00:00+00:00", "slug": "a"},
{"ts": "2026-04-25T00:00:00+00:00", "slug": "a"},
{"ts": "2026-04-20T00:00:00+00:00", "slug": "a"},
]
kept = ralph_usage.filter_by_days(entries, 7, today="2026-04-26")
# 7-day window inclusiv de la today: 2026-04-20 .. 2026-04-26
slugs = [e["ts"][:10] for e in kept]
assert slugs == ["2026-04-26", "2026-04-25", "2026-04-20"]
def test_window_exclusive_older(self):
entries = [
{"ts": "2026-04-26T00:00:00+00:00"},
{"ts": "2026-04-19T00:00:00+00:00"}, # 7 days before today → exclus
]
kept = ralph_usage.filter_by_days(entries, 7, today="2026-04-26")
assert len(kept) == 1
assert kept[0]["ts"] == "2026-04-26T00:00:00+00:00"
def test_zero_days_empty(self):
entries = [{"ts": "2026-04-26T00:00:00+00:00"}]
assert ralph_usage.filter_by_days(entries, 0, today="2026-04-26") == []
def test_corrupt_ts_skipped(self):
entries = [{"ts": "garbage"}]
assert ralph_usage.filter_by_days(entries, 7, today="2026-04-26") == []
# ── summarize ──────────────────────────────────────────────────────
class TestSummarize:
def test_summary_shape_and_today_split(self):
entries = [
{"ts": "2026-04-26T10:00:00+00:00", "slug": "a", "total_cost_usd": 0.5,
"input_tokens": 100, "output_tokens": 50, "cache_read": 0},
{"ts": "2026-04-26T11:00:00+00:00", "slug": "a", "total_cost_usd": 0.3,
"input_tokens": 80, "output_tokens": 30, "cache_read": 0},
{"ts": "2026-04-25T00:00:00+00:00", "slug": "b", "total_cost_usd": 1.0,
"input_tokens": 0, "output_tokens": 0, "cache_read": 0},
]
s = ralph_usage.summarize(entries, days=7, today="2026-04-26")
assert s["today"] == "2026-04-26"
assert s["today_cost"] == 0.8
assert s["today_runs"] == 2
assert s["window_days"] == 7
assert s["window_runs"] == 3
assert "by_project" in s
assert "by_day" in s
assert s["total_runs"] == 3
assert s["by_project"]["a"]["runs"] == 2
assert s["by_project"]["b"]["runs"] == 1
def test_summary_empty_entries(self):
s = ralph_usage.summarize([], days=7, today="2026-04-26")
assert s["today_cost"] == 0
assert s["today_runs"] == 0
assert s["by_project"] == {}
assert s["by_day"] == {}
assert s["total_runs"] == 0
# ── append_entry ───────────────────────────────────────────────────
class TestAppendEntry:
def test_append_creates_file_with_jsonl_format(self, tmp_path):
usage = tmp_path / "usage.jsonl"
entry = {"slug": "x", "ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.1}
ralph_usage.append_entry(usage, entry)
text = usage.read_text(encoding="utf-8")
assert text.endswith("\n")
loaded = json.loads(text.strip())
assert loaded == entry
def test_append_preserves_existing_entries(self, tmp_path):
usage = tmp_path / "usage.jsonl"
usage.write_text(
'{"slug": "a", "ts": "2026-04-25T00:00:00+00:00", "total_cost_usd": 0.5}\n',
encoding="utf-8",
)
ralph_usage.append_entry(usage, {"slug": "b", "ts": "2026-04-26T00:00:00+00:00",
"total_cost_usd": 0.3})
entries = ralph_usage.parse_usage_jsonl(usage)
assert len(entries) == 2
assert entries[0]["slug"] == "a"
assert entries[1]["slug"] == "b"
def test_append_handles_missing_trailing_newline(self, tmp_path):
usage = tmp_path / "usage.jsonl"
usage.write_text(
'{"slug": "a", "ts": "2026-04-25T00:00:00+00:00"}', # no trailing \n
encoding="utf-8",
)
ralph_usage.append_entry(usage, {"slug": "b", "ts": "2026-04-26T00:00:00+00:00"})
entries = ralph_usage.parse_usage_jsonl(usage)
assert [e["slug"] for e in entries] == ["a", "b"]
# ── CLI: append subcommand ─────────────────────────────────────────
class TestCliAppend:
def test_append_from_log_file(self, tmp_path):
log = tmp_path / "iter.log"
log.write_text(json.dumps(_claude_envelope(cost=0.42)), encoding="utf-8")
usage = tmp_path / "usage.jsonl"
rc = ralph_usage.main([
"append", str(usage), str(log),
"--slug", "proj-a",
"--story-id", "US-001",
"--iter", "3",
])
assert rc == 0
entries = ralph_usage.parse_usage_jsonl(usage)
assert len(entries) == 1
e = entries[0]
assert e["slug"] == "proj-a"
assert e["story_id"] == "US-001"
assert e["iter"] == 3
assert e["total_cost_usd"] == 0.42
def test_append_corrupt_log_no_op(self, tmp_path):
log = tmp_path / "iter.log"
log.write_text("not json", encoding="utf-8")
usage = tmp_path / "usage.jsonl"
rc = ralph_usage.main([
"append", str(usage), str(log),
"--slug", "proj-a",
])
# Idempotent: corrupt JSON → exit 0, no entry written
assert rc == 0
assert not usage.exists() or ralph_usage.parse_usage_jsonl(usage) == []
def test_append_missing_log_no_op(self, tmp_path):
usage = tmp_path / "usage.jsonl"
rc = ralph_usage.main([
"append", str(usage), str(tmp_path / "missing.log"),
"--slug", "x",
])
assert rc == 0
# ── CLI: summarize subcommand ──────────────────────────────────────
class TestCliSummarize:
def test_summarize_outputs_json(self, tmp_path, capsys):
usage = tmp_path / "usage.jsonl"
usage.write_text(
json.dumps({"slug": "x", "ts": "2026-04-26T00:00:00+00:00", "total_cost_usd": 0.5}) + "\n",
encoding="utf-8",
)
rc = ralph_usage.main(["summarize", str(usage), "--days", "7"])
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert "today" in out
assert "by_project" in out
assert "by_day" in out

View File

@@ -0,0 +1,139 @@
"""Tests for WhatsApp text-keyword commands → slash translation.
Acoperă `_translate_whatsapp_text` și integrarea cu `_try_ralph_dispatch`:
- aprob / aprob <slug>
- stop <slug>
- stare / stare <slug>
- case-insensitive pe keyword
- Discord/Telegram NU sunt afectate
- propose intentionally NOT supported
"""
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import patch
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from src.router import _translate_whatsapp_text, _try_ralph_dispatch
# ── _translate_whatsapp_text (pure helper) ────────────────────────
class TestTranslate:
def test_aprob_alone_lists_pending(self):
# `aprob` fără slug → /a (listează pending)
assert _translate_whatsapp_text("aprob") == "/a"
def test_aprob_with_slug(self):
assert _translate_whatsapp_text("aprob roa2web") == "/a roa2web"
def test_aprob_case_insensitive(self):
assert _translate_whatsapp_text("APROB roa2web") == "/a roa2web"
assert _translate_whatsapp_text("Aprob roa2web") == "/a roa2web"
def test_stop_with_slug(self):
assert _translate_whatsapp_text("stop roa2web") == "/k roa2web"
def test_stop_case_insensitive(self):
assert _translate_whatsapp_text("STOP roa2web") == "/k roa2web"
def test_stop_alone_not_translated(self):
# `stop` fără slug poate fi colocvial → nu translatăm
assert _translate_whatsapp_text("stop") is None
def test_stare_alone(self):
assert _translate_whatsapp_text("stare") == "/l"
def test_stare_with_slug(self):
assert _translate_whatsapp_text("stare roa2web") == "/l roa2web"
def test_stare_case_insensitive(self):
assert _translate_whatsapp_text("STARE") == "/l"
def test_other_text_not_translated(self):
assert _translate_whatsapp_text("hello") is None
assert _translate_whatsapp_text("ce mai faci") is None
assert _translate_whatsapp_text("propose roa2web descriere") is None
# Slash commands pass through unchanged (None — don't override)
assert _translate_whatsapp_text("/a") is None
def test_empty_input(self):
assert _translate_whatsapp_text("") is None
assert _translate_whatsapp_text(" ") is None
def test_propose_not_covered(self):
# Verifică explicit că nu acoperim propose (descrierea fragilă)
assert _translate_whatsapp_text("propose foo bar baz") is None
assert _translate_whatsapp_text("propune foo bar baz") is None
# ── Integration: _try_ralph_dispatch with adapter_name ────────────
class TestDispatchIntegration:
def test_whatsapp_aprob_routes_to_approve(self):
# `aprob` pe whatsapp → trebuie să intre în Ralph dispatch
with patch("src.router._ralph_approve") as mock:
mock.return_value = "ok"
result = _try_ralph_dispatch("aprob foo", adapter_name="whatsapp")
assert result == "ok"
mock.assert_called_once_with(["foo"])
def test_whatsapp_stop_routes_to_stop(self):
with patch("src.router._ralph_stop") as mock:
mock.return_value = "stopped"
result = _try_ralph_dispatch("stop foo", adapter_name="whatsapp")
assert result == "stopped"
mock.assert_called_once_with("foo")
def test_whatsapp_stare_routes_to_status(self):
with patch("src.router._ralph_status") as mock:
mock.return_value = "status"
result = _try_ralph_dispatch("stare", adapter_name="whatsapp")
# Status returnează cu redirect hint pe whatsapp
assert "status" in result
mock.assert_called_once_with(None)
def test_whatsapp_stare_with_slug(self):
with patch("src.router._ralph_status") as mock:
mock.return_value = "status"
_try_ralph_dispatch("stare roa2web", adapter_name="whatsapp")
mock.assert_called_once_with("roa2web")
def test_discord_keyword_not_translated(self):
# Pe Discord, "stop foo" NU ar trebui să match — nu e adapter whatsapp
with patch("src.router._ralph_stop") as mock:
result = _try_ralph_dispatch("stop foo", adapter_name="discord")
assert result is None
mock.assert_not_called()
def test_telegram_keyword_not_translated(self):
with patch("src.router._ralph_approve") as mock:
result = _try_ralph_dispatch("aprob foo", adapter_name="telegram")
assert result is None
mock.assert_not_called()
def test_no_adapter_keyword_not_translated(self):
# adapter_name=None → nu e whatsapp → no translation
with patch("src.router._ralph_approve") as mock:
result = _try_ralph_dispatch("aprob foo", adapter_name=None)
assert result is None
mock.assert_not_called()
def test_whatsapp_slash_command_still_works(self):
# Slash-uri normale pe WhatsApp NU trebuie sparte de translation
with patch("src.router._ralph_approve") as mock:
mock.return_value = "ok"
result = _try_ralph_dispatch("/a foo", adapter_name="whatsapp")
assert result == "ok"
mock.assert_called_once_with(["foo"])
def test_whatsapp_chat_message_passthrough(self):
# Mesajul normal pe whatsapp (fără keyword) → None (cade pe Claude)
result = _try_ralph_dispatch("hello echo, ce mai faci", adapter_name="whatsapp")
assert result is None

View File

@@ -39,6 +39,21 @@ else
DAG_HELPER=""
fi
# Usage helper auto-detect (rate limit budget tracking — best effort, niciodată
# blochează rularea Ralph dacă lipsește)
if [ -n "$RALPH_USAGE_HELPER" ] && [ -f "$RALPH_USAGE_HELPER" ]; then
USAGE_HELPER="$RALPH_USAGE_HELPER"
elif [ -f "/home/moltbot/echo-core/tools/ralph_usage.py" ]; then
USAGE_HELPER="/home/moltbot/echo-core/tools/ralph_usage.py"
elif [ -f "/home/moltbot/echo-core-instr/tools/ralph_usage.py" ]; then
USAGE_HELPER="/home/moltbot/echo-core-instr/tools/ralph_usage.py"
elif [ -f "$SCRIPT_DIR/ralph_usage.py" ]; then
USAGE_HELPER="$SCRIPT_DIR/ralph_usage.py"
else
USAGE_HELPER=""
fi
USAGE_FILE="$SCRIPT_DIR/usage.jsonl"
# Verifică că jq este instalat
if ! command -v jq &> /dev/null; then
echo "Eroare: jq nu este instalat. Rulează: apt install jq"
@@ -292,6 +307,15 @@ EOF
set -e
OUTPUT=$(cat "$LOG_FILE")
# Rate limit budget tracking (best-effort, never blocks Ralph)
if [ -n "$USAGE_HELPER" ]; then
"$RALPH_PYTHON" "$USAGE_HELPER" append \
"$USAGE_FILE" "$LOG_FILE" \
--slug "$PROJECT_NAME" \
--story-id "$CURRENT_STORY" \
--iter "$i" 2>/dev/null || true
fi
# W3: rate limit detection (max 1 retry per rulare)
if is_rate_limited "$OUTPUT" || [ "$CLAUDE_EXIT" = "29" ]; then
if [ "$RATE_LIMIT_RETRY_USED" = "0" ]; then

384
tools/ralph_usage.py Executable file
View File

@@ -0,0 +1,384 @@
#!/usr/bin/env python3
"""Ralph usage tracking — rate limit budget MVP.
Two responsabilități:
1. **Pure functions** (testable, no side-effects): parse usage JSONL, aggregate
by day / by project, summarize for dashboard.
2. **CLI subcommands** (chemate din `tools/ralph/ralph.sh` după fiecare iter):
atomic append usage entry derivat din `claude -p --output-format json`.
JSON envelope produs de `claude -p --output-format json`:
{
"type": "result",
"subtype": "completed" | "error_max_turns" | ...,
"session_id": "...",
"result": "...",
"is_error": false,
"total_cost_usd": 0.55,
"duration_ms": 49000,
"num_turns": 5,
"usage": {
"input_tokens": 1234,
"output_tokens": 567,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 890
},
"model": "claude-opus-4-7-...", // poate lipsi
...
}
Usage entry shape (one per JSONL line):
{
"ts": "2026-04-26T12:00:00+00:00",
"slug": "roa2web",
"story_id": "US-001", // null dacă necunoscut
"iter": 3, // null dacă necunoscut
"total_cost_usd": 0.55,
"input_tokens": 1234,
"output_tokens": 567,
"cache_read": 890,
"model": "claude-opus-4-7-...",
"duration_ms": 49000
}
CLI subcommands:
python3 ralph_usage.py append <usage_jsonl> <claude_log> \\
--slug <slug> [--story-id <id>] [--iter <N>]
→ parse claude_log, atomic append entry in usage_jsonl. Idempotent
la JSON corupt (no-op + exit 0).
python3 ralph_usage.py summarize <usage_jsonl> [--days N]
→ print JSON summary {today_cost, today_runs, by_project, by_day, ...}.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
# ---------------------------------------------------------------------------
# Pure functions — extract / parse / aggregate
# ---------------------------------------------------------------------------
def extract_usage_entry(
claude_json: dict | str,
*,
slug: str,
story_id: str | None = None,
iter_n: int | None = None,
ts: str | None = None,
) -> dict | None:
"""Build a usage entry from a claude -p JSON envelope.
Acceptă dict deja parsat sau raw string. Returnează None dacă inputul
nu poate fi parsat sau nu e un dict (anti-corruption).
Pure: no I/O, no side effects.
"""
if isinstance(claude_json, str):
try:
claude_json = json.loads(claude_json)
except (json.JSONDecodeError, TypeError, ValueError):
return None
if not isinstance(claude_json, dict):
return None
usage = claude_json.get("usage") or {}
if not isinstance(usage, dict):
usage = {}
return {
"ts": ts or datetime.now(timezone.utc).isoformat(timespec="seconds"),
"slug": slug,
"story_id": story_id if story_id else None,
"iter": int(iter_n) if iter_n is not None else None,
"total_cost_usd": _coerce_float(claude_json.get("total_cost_usd")),
"input_tokens": _coerce_int(usage.get("input_tokens")),
"output_tokens": _coerce_int(usage.get("output_tokens")),
"cache_read": _coerce_int(
usage.get("cache_read_input_tokens") or usage.get("cache_read") or 0
),
"model": str(claude_json.get("model") or "") or None,
"duration_ms": _coerce_int(claude_json.get("duration_ms")),
}
def _coerce_float(v: Any) -> float:
try:
return float(v) if v is not None else 0.0
except (TypeError, ValueError):
return 0.0
def _coerce_int(v: Any) -> int:
try:
return int(v) if v is not None else 0
except (TypeError, ValueError):
return 0
def parse_usage_jsonl(path: Path | str) -> list[dict]:
"""Read a JSONL file of usage entries. Skip corrupt lines silently.
Pure-ish (file I/O scoped to the path; no global state mutation).
Întoarce listă goală dacă fișierul lipsește.
"""
p = Path(path)
if not p.exists():
return []
entries: list[dict] = []
try:
with p.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(obj, dict):
entries.append(obj)
except OSError:
return []
return entries
def _entry_day(entry: dict) -> str:
"""Extract YYYY-MM-DD from entry.ts. Robust la formate timezone-aware/naive."""
ts = entry.get("ts") or ""
if not ts:
return ""
# Accept both `+00:00` and `Z`; the date prefix is the same.
return ts[:10] if len(ts) >= 10 else ""
def aggregate_by_day(entries: Iterable[dict]) -> dict[str, dict]:
"""Aggregate usage by YYYY-MM-DD.
Returnează `{"2026-04-26": {cost_usd, runs, input_tokens, output_tokens, cache_read}}`.
Stabilizat sortat cronologic (descending) când dict-urile sunt iterate.
"""
buckets: dict[str, dict] = defaultdict(
lambda: {"cost_usd": 0.0, "runs": 0, "input_tokens": 0,
"output_tokens": 0, "cache_read": 0}
)
for e in entries:
day = _entry_day(e)
if not day:
continue
b = buckets[day]
b["cost_usd"] += _coerce_float(e.get("total_cost_usd"))
b["runs"] += 1
b["input_tokens"] += _coerce_int(e.get("input_tokens"))
b["output_tokens"] += _coerce_int(e.get("output_tokens"))
b["cache_read"] += _coerce_int(e.get("cache_read"))
# round cost to 4 decimals to reduce float noise in JSON dump
return {
d: {**v, "cost_usd": round(v["cost_usd"], 4)}
for d, v in sorted(buckets.items(), reverse=True)
}
def aggregate_by_project(entries: Iterable[dict]) -> dict[str, dict]:
"""Aggregate usage by slug.
Returnează `{"roa2web": {cost_usd, runs, input_tokens, output_tokens, cache_read}}`.
"""
buckets: dict[str, dict] = defaultdict(
lambda: {"cost_usd": 0.0, "runs": 0, "input_tokens": 0,
"output_tokens": 0, "cache_read": 0}
)
for e in entries:
slug = e.get("slug") or "unknown"
b = buckets[slug]
b["cost_usd"] += _coerce_float(e.get("total_cost_usd"))
b["runs"] += 1
b["input_tokens"] += _coerce_int(e.get("input_tokens"))
b["output_tokens"] += _coerce_int(e.get("output_tokens"))
b["cache_read"] += _coerce_int(e.get("cache_read"))
return {
s: {**v, "cost_usd": round(v["cost_usd"], 4)}
for s, v in sorted(buckets.items())
}
def filter_by_days(entries: Iterable[dict], days: int, *, today: str | None = None) -> list[dict]:
"""Keep only entries with ts within last `days` days (today inclusive).
`today` defaults to UTC current date (testabil prin override).
`days <= 0` → entries goale.
"""
if days <= 0:
return []
today = today or datetime.now(timezone.utc).date().isoformat()
try:
today_dt = datetime.fromisoformat(today).date()
except ValueError:
return list(entries)
out = []
for e in entries:
d = _entry_day(e)
if not d:
continue
try:
d_dt = datetime.fromisoformat(d).date()
except ValueError:
continue
delta = (today_dt - d_dt).days
if 0 <= delta < days:
out.append(e)
return out
def summarize(
entries: list[dict],
*,
days: int = 7,
today: str | None = None,
) -> dict:
"""Build summary {today_cost, today_runs, by_project, by_day, total_cost, total_runs}.
`today` defaults la UTC date curentă (override pentru teste). `by_day`
și `by_project` se calculează DOAR pe fereastra `days` (cele mai recente).
"""
today_str = today or datetime.now(timezone.utc).date().isoformat()
windowed = filter_by_days(entries, days, today=today_str)
today_entries = [e for e in entries if _entry_day(e) == today_str]
return {
"today": today_str,
"today_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in today_entries), 4),
"today_runs": len(today_entries),
"window_days": days,
"window_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in windowed), 4),
"window_runs": len(windowed),
"by_project": aggregate_by_project(windowed),
"by_day": aggregate_by_day(windowed),
"total_runs": len(entries),
"total_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in entries), 4),
}
# ---------------------------------------------------------------------------
# Atomic append (CLI side) — used from ralph.sh
# ---------------------------------------------------------------------------
def append_entry(usage_path: Path | str, entry: dict) -> None:
"""Append a single entry as JSONL with atomic write semantics.
Uses temp file rename to avoid concurrent-writer corruption (read-existing,
write-existing+new, atomic replace). NU folosim `open(..., 'a')` direct pentru
că poate fi tăiat la mijloc dacă procesul e killed.
"""
p = Path(usage_path)
p.parent.mkdir(parents=True, exist_ok=True)
existing = ""
if p.exists():
try:
existing = p.read_text(encoding="utf-8")
except OSError:
existing = ""
if existing and not existing.endswith("\n"):
existing += "\n"
new_line = json.dumps(entry, ensure_ascii=False) + "\n"
fd, tmp_path = tempfile.mkstemp(prefix=".usage_", suffix=".jsonl.tmp", dir=str(p.parent))
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(existing)
f.write(new_line)
os.replace(tmp_path, p)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def cmd_append(args: argparse.Namespace) -> int:
"""Read claude JSON log, derive entry, atomic append.
Idempotent la JSON corupt: dacă fișierul nu poate fi parsat, exit 0
(nu vrem să spargem ralph.sh pentru un parse warning).
"""
log_path = Path(args.claude_log)
if not log_path.exists():
print(f"warn: claude log missing: {log_path}", file=sys.stderr)
return 0
try:
text = log_path.read_text(encoding="utf-8")
except OSError as exc:
print(f"warn: read failed: {exc}", file=sys.stderr)
return 0
entry = extract_usage_entry(
text,
slug=args.slug,
story_id=args.story_id or None,
iter_n=args.iter if args.iter is not None else None,
)
if entry is None:
print(f"warn: claude log not parseable as JSON envelope; no usage entry written", file=sys.stderr)
return 0
try:
append_entry(args.usage_jsonl, entry)
except OSError as exc:
print(f"error: append failed: {exc}", file=sys.stderr)
return 1
return 0
def cmd_summarize(args: argparse.Namespace) -> int:
entries = parse_usage_jsonl(args.usage_jsonl)
summary = summarize(entries, days=args.days)
print(json.dumps(summary, indent=2, ensure_ascii=False))
return 0
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(prog="ralph_usage", description=__doc__)
sub = parser.add_subparsers(dest="cmd", required=True)
sp_app = sub.add_parser("append", help="Atomic append usage entry from claude JSON log")
sp_app.add_argument("usage_jsonl", help="Path to usage.jsonl (will be created)")
sp_app.add_argument("claude_log", help="Path to claude -p JSON output log")
sp_app.add_argument("--slug", required=True)
sp_app.add_argument("--story-id", default="", dest="story_id")
sp_app.add_argument("--iter", type=int, default=None)
sp_sum = sub.add_parser("summarize", help="Print JSON summary of usage")
sp_sum.add_argument("usage_jsonl", help="Path to usage.jsonl")
sp_sum.add_argument("--days", type=int, default=7)
args = parser.parse_args(argv)
if args.cmd == "append":
return cmd_append(args)
if args.cmd == "summarize":
return cmd_summarize(args)
parser.print_help()
return 2
if __name__ == "__main__":
sys.exit(main())