Files
echo-core/tools/ralph_usage.py
Marius Mutu 3e7818286b feat(ralph): rate limit budget tracking + whatsapp text-keywords
Task #1 — Rate limit budget tracking MVP:
- tools/ralph_usage.py: pure functions (extract_usage_entry, parse_usage_jsonl,
  aggregate_by_day/_project, filter_by_days, summarize) + CLI append/summarize
  subcommands. Atomic write via temp+rename.
- tools/ralph/ralph.sh: după fiecare claude -p, append usage entry
  derivat din JSON envelope la <project>/scripts/ralph/usage.jsonl. Best-effort,
  niciodată blochează rularea (|| true).
- dashboard/handlers/ralph.py: GET /api/ralph/usage[?days=N] aggregează cross-
  project și returnează {today_cost, today_runs, by_project, by_day, ...}.

Task #2 — WhatsApp text-keyword commands:
- src/router.py: helper _translate_whatsapp_text mapează "aprob"/"stop <slug>"/
  "stare [<slug>]" → /a, /k, /l. Apelat DOAR pe adapter whatsapp în
  _try_ralph_dispatch (Discord/TG nu sunt afectate). NU acoperim propose
  intentionat — descrierea liberă e prea fragilă pentru parsing text-only.

Tests: 49 noi (test_ralph_usage 28 + test_whatsapp_keywords 21) + 4 noi în
test_dashboard_ralph_endpoint pentru /api/ralph/usage. Toate trec; regression
suite (test_router, test_router_planning, test_dashboard_ralph_endpoint,
test_whatsapp) — 90/90 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 19:05:50 +00:00

385 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""Ralph usage tracking — rate limit budget MVP.

Two responsibilities:

1. **Pure functions** (testable, no side-effects): parse usage JSONL,
   aggregate by day / by project, summarize for the dashboard.
2. **CLI subcommands** (invoked from `tools/ralph/ralph.sh` after each
   iteration): atomically append a usage entry derived from
   `claude -p --output-format json`.

JSON envelope produced by `claude -p --output-format json`:

    {
      "type": "result",
      "subtype": "completed" | "error_max_turns" | ...,
      "session_id": "...",
      "result": "...",
      "is_error": false,
      "total_cost_usd": 0.55,
      "duration_ms": 49000,
      "num_turns": 5,
      "usage": {
        "input_tokens": 1234,
        "output_tokens": 567,
        "cache_creation_input_tokens": 0,
        "cache_read_input_tokens": 890
      },
      "model": "claude-opus-4-7-...",  // may be missing
      ...
    }

Usage entry shape (one per JSONL line):

    {
      "ts": "2026-04-26T12:00:00+00:00",
      "slug": "roa2web",
      "story_id": "US-001",   // null if unknown
      "iter": 3,              // null if unknown
      "total_cost_usd": 0.55,
      "input_tokens": 1234,
      "output_tokens": 567,
      "cache_read": 890,
      "model": "claude-opus-4-7-...",
      "duration_ms": 49000
    }

CLI subcommands:

    python3 ralph_usage.py append <usage_jsonl> <claude_log> \\
        --slug <slug> [--story-id <id>] [--iter <N>]
        -> parse claude_log, atomically append an entry to usage_jsonl.
           Idempotent on corrupt JSON (no-op + exit 0).

    python3 ralph_usage.py summarize <usage_jsonl> [--days N]
        -> print JSON summary {today_cost, today_runs, by_project, by_day, ...}.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
# ---------------------------------------------------------------------------
# Pure functions — extract / parse / aggregate
# ---------------------------------------------------------------------------
def extract_usage_entry(
    claude_json: dict | str,
    *,
    slug: str,
    story_id: str | None = None,
    iter_n: int | None = None,
    ts: str | None = None,
) -> dict | None:
    """Build a usage entry from a ``claude -p`` JSON envelope.

    Accepts either an already-parsed dict or the raw JSON string.
    Returns ``None`` when the input cannot be parsed or does not decode
    to a dict (anti-corruption guard).  Pure: no I/O, no side effects.
    """
    envelope = claude_json
    if isinstance(envelope, str):
        try:
            envelope = json.loads(envelope)
        except (json.JSONDecodeError, TypeError, ValueError):
            return None
    if not isinstance(envelope, dict):
        return None

    token_usage = envelope.get("usage") or {}
    if not isinstance(token_usage, dict):
        token_usage = {}

    # Prefer the envelope's key, fall back to the entry-shaped key.
    cache_raw = (
        token_usage.get("cache_read_input_tokens")
        or token_usage.get("cache_read")
        or 0
    )
    model_name = str(envelope.get("model") or "") or None  # "" collapses to null
    when = ts or datetime.now(timezone.utc).isoformat(timespec="seconds")

    return {
        "ts": when,
        "slug": slug,
        "story_id": story_id if story_id else None,  # "" -> null
        "iter": int(iter_n) if iter_n is not None else None,
        "total_cost_usd": _coerce_float(envelope.get("total_cost_usd")),
        "input_tokens": _coerce_int(token_usage.get("input_tokens")),
        "output_tokens": _coerce_int(token_usage.get("output_tokens")),
        "cache_read": _coerce_int(cache_raw),
        "model": model_name,
        "duration_ms": _coerce_int(envelope.get("duration_ms")),
    }
def _coerce_float(v: Any) -> float:
try:
return float(v) if v is not None else 0.0
except (TypeError, ValueError):
return 0.0
def _coerce_int(v: Any) -> int:
try:
return int(v) if v is not None else 0
except (TypeError, ValueError):
return 0
def parse_usage_jsonl(path: Path | str) -> list[dict]:
"""Read a JSONL file of usage entries. Skip corrupt lines silently.
Pure-ish (file I/O scoped to the path; no global state mutation).
Întoarce listă goală dacă fișierul lipsește.
"""
p = Path(path)
if not p.exists():
return []
entries: list[dict] = []
try:
with p.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(obj, dict):
entries.append(obj)
except OSError:
return []
return entries
def _entry_day(entry: dict) -> str:
"""Extract YYYY-MM-DD from entry.ts. Robust la formate timezone-aware/naive."""
ts = entry.get("ts") or ""
if not ts:
return ""
# Accept both `+00:00` and `Z`; the date prefix is the same.
return ts[:10] if len(ts) >= 10 else ""
def aggregate_by_day(entries: Iterable[dict]) -> dict[str, dict]:
    """Aggregate usage totals per ``YYYY-MM-DD`` day.

    Returns ``{"2026-04-26": {cost_usd, runs, input_tokens, output_tokens,
    cache_read}}`` whose keys iterate newest-first (reverse chronological).
    Entries without a usable timestamp are dropped.
    """
    totals: dict[str, dict] = {}
    for entry in entries:
        day = _entry_day(entry)
        if not day:
            continue
        bucket = totals.setdefault(day, {
            "cost_usd": 0.0, "runs": 0, "input_tokens": 0,
            "output_tokens": 0, "cache_read": 0,
        })
        bucket["runs"] += 1
        bucket["cost_usd"] += _coerce_float(entry.get("total_cost_usd"))
        bucket["input_tokens"] += _coerce_int(entry.get("input_tokens"))
        bucket["output_tokens"] += _coerce_int(entry.get("output_tokens"))
        bucket["cache_read"] += _coerce_int(entry.get("cache_read"))
    # Round cost to 4 decimals to reduce float noise in the JSON dump.
    out: dict[str, dict] = {}
    for day in sorted(totals, reverse=True):
        stats = totals[day]
        out[day] = {**stats, "cost_usd": round(stats["cost_usd"], 4)}
    return out
def aggregate_by_project(entries: Iterable[dict]) -> dict[str, dict]:
    """Aggregate usage totals per project slug.

    Returns ``{"roa2web": {cost_usd, runs, input_tokens, output_tokens,
    cache_read}}`` with slugs iterating in ascending order.  Entries with
    no slug fall into the ``"unknown"`` bucket.
    """
    totals: dict[str, dict] = {}
    for entry in entries:
        project = entry.get("slug") or "unknown"
        bucket = totals.setdefault(project, {
            "cost_usd": 0.0, "runs": 0, "input_tokens": 0,
            "output_tokens": 0, "cache_read": 0,
        })
        bucket["runs"] += 1
        bucket["cost_usd"] += _coerce_float(entry.get("total_cost_usd"))
        bucket["input_tokens"] += _coerce_int(entry.get("input_tokens"))
        bucket["output_tokens"] += _coerce_int(entry.get("output_tokens"))
        bucket["cache_read"] += _coerce_int(entry.get("cache_read"))
    # Round cost to 4 decimals to reduce float noise in the JSON dump.
    out: dict[str, dict] = {}
    for project in sorted(totals):
        stats = totals[project]
        out[project] = {**stats, "cost_usd": round(stats["cost_usd"], 4)}
    return out
def filter_by_days(entries: Iterable[dict], days: int, *, today: str | None = None) -> list[dict]:
    """Keep only entries whose timestamp falls within the last ``days`` days.

    The window includes today.  ``today`` defaults to the current UTC date
    (injectable for tests).  ``days <= 0`` yields an empty list, and an
    unparseable ``today`` passes every entry through unfiltered.
    """
    if days <= 0:
        return []
    anchor = today or datetime.now(timezone.utc).date().isoformat()
    try:
        anchor_date = datetime.fromisoformat(anchor).date()
    except ValueError:
        return list(entries)
    kept: list[dict] = []
    for entry in entries:
        day = _entry_day(entry)
        if not day:
            continue
        try:
            entry_date = datetime.fromisoformat(day).date()
        except ValueError:
            continue  # malformed date prefix — skip, same as missing ts
        age_days = (anchor_date - entry_date).days
        if 0 <= age_days < days:
            kept.append(entry)
    return kept
def summarize(
    entries: list[dict],
    *,
    days: int = 7,
    today: str | None = None,
) -> dict:
    """Build the dashboard summary dict.

    Shape: ``{today_cost, today_runs, by_project, by_day, total_cost,
    total_runs, ...}``.  ``today`` defaults to the current UTC date
    (overridable for tests).  ``by_day`` and ``by_project`` are computed
    ONLY over the most recent ``days``-day window; ``total_*`` cover
    every entry ever recorded.
    """
    anchor = today or datetime.now(timezone.utc).date().isoformat()
    windowed = filter_by_days(entries, days, today=anchor)
    todays = [e for e in entries if _entry_day(e) == anchor]

    def _cost(items: list[dict]) -> float:
        # Round to 4 decimals to keep float noise out of the JSON dump.
        return round(sum(_coerce_float(e.get("total_cost_usd")) for e in items), 4)

    return {
        "today": anchor,
        "today_cost": _cost(todays),
        "today_runs": len(todays),
        "window_days": days,
        "window_cost": _cost(windowed),
        "window_runs": len(windowed),
        "by_project": aggregate_by_project(windowed),
        "by_day": aggregate_by_day(windowed),
        "total_runs": len(entries),
        "total_cost": _cost(entries),
    }
# ---------------------------------------------------------------------------
# Atomic append (CLI side) — used from ralph.sh
# ---------------------------------------------------------------------------
def append_entry(usage_path: Path | str, entry: dict) -> None:
"""Append a single entry as JSONL with atomic write semantics.
Uses temp file rename to avoid concurrent-writer corruption (read-existing,
write-existing+new, atomic replace). NU folosim `open(..., 'a')` direct pentru
că poate fi tăiat la mijloc dacă procesul e killed.
"""
p = Path(usage_path)
p.parent.mkdir(parents=True, exist_ok=True)
existing = ""
if p.exists():
try:
existing = p.read_text(encoding="utf-8")
except OSError:
existing = ""
if existing and not existing.endswith("\n"):
existing += "\n"
new_line = json.dumps(entry, ensure_ascii=False) + "\n"
fd, tmp_path = tempfile.mkstemp(prefix=".usage_", suffix=".jsonl.tmp", dir=str(p.parent))
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(existing)
f.write(new_line)
os.replace(tmp_path, p)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def cmd_append(args: argparse.Namespace) -> int:
    """Read the claude JSON log, derive an entry, atomically append it.

    Deliberately tolerant: a missing or unparseable log exits 0 with a
    warning (ralph.sh must never break on a parse hiccup); only a failed
    write to the usage file exits non-zero.
    """
    def _warn(msg: str) -> None:
        print(msg, file=sys.stderr)

    log_path = Path(args.claude_log)
    if not log_path.exists():
        _warn(f"warn: claude log missing: {log_path}")
        return 0
    try:
        raw = log_path.read_text(encoding="utf-8")
    except OSError as exc:
        _warn(f"warn: read failed: {exc}")
        return 0
    entry = extract_usage_entry(
        raw,
        slug=args.slug,
        story_id=args.story_id or None,
        iter_n=args.iter if args.iter is not None else None,
    )
    if entry is None:
        _warn("warn: claude log not parseable as JSON envelope; no usage entry written")
        return 0
    try:
        append_entry(args.usage_jsonl, entry)
    except OSError as exc:
        print(f"error: append failed: {exc}", file=sys.stderr)
        return 1
    return 0
def cmd_summarize(args: argparse.Namespace) -> int:
    """Print the JSON usage summary for ``args.usage_jsonl`` over ``args.days``."""
    report = summarize(parse_usage_jsonl(args.usage_jsonl), days=args.days)
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse argv and dispatch the chosen subcommand.

    Returns the subcommand's exit code; falls back to printing help and
    returning 2 for an unknown command (defensive — ``required=True``
    already rejects a missing subcommand).
    """
    parser = argparse.ArgumentParser(prog="ralph_usage", description=__doc__)
    sub = parser.add_subparsers(dest="cmd", required=True)

    append_p = sub.add_parser("append", help="Atomic append usage entry from claude JSON log")
    append_p.add_argument("usage_jsonl", help="Path to usage.jsonl (will be created)")
    append_p.add_argument("claude_log", help="Path to claude -p JSON output log")
    append_p.add_argument("--slug", required=True)
    append_p.add_argument("--story-id", default="", dest="story_id")
    append_p.add_argument("--iter", type=int, default=None)

    summarize_p = sub.add_parser("summarize", help="Print JSON summary of usage")
    summarize_p.add_argument("usage_jsonl", help="Path to usage.jsonl")
    summarize_p.add_argument("--days", type=int, default=7)

    args = parser.parse_args(argv)
    handlers = {"append": cmd_append, "summarize": cmd_summarize}
    handler = handlers.get(args.cmd)
    if handler is not None:
        return handler(args)
    parser.print_help()
    return 2
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    sys.exit(main())