#!/usr/bin/env python3 """Ralph usage tracking — rate limit budget MVP. Two responsabilități: 1. **Pure functions** (testable, no side-effects): parse usage JSONL, aggregate by day / by project, summarize for dashboard. 2. **CLI subcommands** (chemate din `tools/ralph/ralph.sh` după fiecare iter): atomic append usage entry derivat din `claude -p --output-format json`. JSON envelope produs de `claude -p --output-format json`: { "type": "result", "subtype": "completed" | "error_max_turns" | ..., "session_id": "...", "result": "...", "is_error": false, "total_cost_usd": 0.55, "duration_ms": 49000, "num_turns": 5, "usage": { "input_tokens": 1234, "output_tokens": 567, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 890 }, "model": "claude-opus-4-7-...", // poate lipsi ... } Usage entry shape (one per JSONL line): { "ts": "2026-04-26T12:00:00+00:00", "slug": "roa2web", "story_id": "US-001", // null dacă necunoscut "iter": 3, // null dacă necunoscut "total_cost_usd": 0.55, "input_tokens": 1234, "output_tokens": 567, "cache_read": 890, "model": "claude-opus-4-7-...", "duration_ms": 49000 } CLI subcommands: python3 ralph_usage.py append \\ --slug [--story-id ] [--iter ] → parse claude_log, atomic append entry in usage_jsonl. Idempotent la JSON corupt (no-op + exit 0). python3 ralph_usage.py summarize [--days N] → print JSON summary {today_cost, today_runs, by_project, by_day, ...}. """ from __future__ import annotations import argparse import json import os import sys import tempfile from collections import defaultdict from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterable # --------------------------------------------------------------------------- # Pure functions — extract / parse / aggregate # --------------------------------------------------------------------------- def extract_usage_entry( claude_json: dict | str, *, slug: str, story_id: str | None = None, iter_n: int | None = None, ts: str | None = None, ) -> dict | None: """Build a usage entry from a claude -p JSON envelope. Acceptă dict deja parsat sau raw string. Returnează None dacă inputul nu poate fi parsat sau nu e un dict (anti-corruption). Pure: no I/O, no side effects. """ if isinstance(claude_json, str): try: claude_json = json.loads(claude_json) except (json.JSONDecodeError, TypeError, ValueError): return None if not isinstance(claude_json, dict): return None usage = claude_json.get("usage") or {} if not isinstance(usage, dict): usage = {} return { "ts": ts or datetime.now(timezone.utc).isoformat(timespec="seconds"), "slug": slug, "story_id": story_id if story_id else None, "iter": int(iter_n) if iter_n is not None else None, "total_cost_usd": _coerce_float(claude_json.get("total_cost_usd")), "input_tokens": _coerce_int(usage.get("input_tokens")), "output_tokens": _coerce_int(usage.get("output_tokens")), "cache_read": _coerce_int( usage.get("cache_read_input_tokens") or usage.get("cache_read") or 0 ), "model": str(claude_json.get("model") or "") or None, "duration_ms": _coerce_int(claude_json.get("duration_ms")), } def _coerce_float(v: Any) -> float: try: return float(v) if v is not None else 0.0 except (TypeError, ValueError): return 0.0 def _coerce_int(v: Any) -> int: try: return int(v) if v is not None else 0 except (TypeError, ValueError): return 0 def parse_usage_jsonl(path: Path | str) -> list[dict]: """Read a JSONL file of usage entries. Skip corrupt lines silently. Pure-ish (file I/O scoped to the path; no global state mutation). Întoarce listă goală dacă fișierul lipsește. """ p = Path(path) if not p.exists(): return [] entries: list[dict] = [] try: with p.open(encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: continue if isinstance(obj, dict): entries.append(obj) except OSError: return [] return entries def _entry_day(entry: dict) -> str: """Extract YYYY-MM-DD from entry.ts. Robust la formate timezone-aware/naive.""" ts = entry.get("ts") or "" if not ts: return "" # Accept both `+00:00` and `Z`; the date prefix is the same. return ts[:10] if len(ts) >= 10 else "" def aggregate_by_day(entries: Iterable[dict]) -> dict[str, dict]: """Aggregate usage by YYYY-MM-DD. Returnează `{"2026-04-26": {cost_usd, runs, input_tokens, output_tokens, cache_read}}`. Stabilizat sortat cronologic (descending) când dict-urile sunt iterate. """ buckets: dict[str, dict] = defaultdict( lambda: {"cost_usd": 0.0, "runs": 0, "input_tokens": 0, "output_tokens": 0, "cache_read": 0} ) for e in entries: day = _entry_day(e) if not day: continue b = buckets[day] b["cost_usd"] += _coerce_float(e.get("total_cost_usd")) b["runs"] += 1 b["input_tokens"] += _coerce_int(e.get("input_tokens")) b["output_tokens"] += _coerce_int(e.get("output_tokens")) b["cache_read"] += _coerce_int(e.get("cache_read")) # round cost to 4 decimals to reduce float noise in JSON dump return { d: {**v, "cost_usd": round(v["cost_usd"], 4)} for d, v in sorted(buckets.items(), reverse=True) } def aggregate_by_project(entries: Iterable[dict]) -> dict[str, dict]: """Aggregate usage by slug. Returnează `{"roa2web": {cost_usd, runs, input_tokens, output_tokens, cache_read}}`. """ buckets: dict[str, dict] = defaultdict( lambda: {"cost_usd": 0.0, "runs": 0, "input_tokens": 0, "output_tokens": 0, "cache_read": 0} ) for e in entries: slug = e.get("slug") or "unknown" b = buckets[slug] b["cost_usd"] += _coerce_float(e.get("total_cost_usd")) b["runs"] += 1 b["input_tokens"] += _coerce_int(e.get("input_tokens")) b["output_tokens"] += _coerce_int(e.get("output_tokens")) b["cache_read"] += _coerce_int(e.get("cache_read")) return { s: {**v, "cost_usd": round(v["cost_usd"], 4)} for s, v in sorted(buckets.items()) } def filter_by_days(entries: Iterable[dict], days: int, *, today: str | None = None) -> list[dict]: """Keep only entries with ts within last `days` days (today inclusive). `today` defaults to UTC current date (testabil prin override). `days <= 0` → entries goale. """ if days <= 0: return [] today = today or datetime.now(timezone.utc).date().isoformat() try: today_dt = datetime.fromisoformat(today).date() except ValueError: return list(entries) out = [] for e in entries: d = _entry_day(e) if not d: continue try: d_dt = datetime.fromisoformat(d).date() except ValueError: continue delta = (today_dt - d_dt).days if 0 <= delta < days: out.append(e) return out def summarize( entries: list[dict], *, days: int = 7, today: str | None = None, ) -> dict: """Build summary {today_cost, today_runs, by_project, by_day, total_cost, total_runs}. `today` defaults la UTC date curentă (override pentru teste). `by_day` și `by_project` se calculează DOAR pe fereastra `days` (cele mai recente). """ today_str = today or datetime.now(timezone.utc).date().isoformat() windowed = filter_by_days(entries, days, today=today_str) today_entries = [e for e in entries if _entry_day(e) == today_str] return { "today": today_str, "today_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in today_entries), 4), "today_runs": len(today_entries), "window_days": days, "window_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in windowed), 4), "window_runs": len(windowed), "by_project": aggregate_by_project(windowed), "by_day": aggregate_by_day(windowed), "total_runs": len(entries), "total_cost": round(sum(_coerce_float(e.get("total_cost_usd")) for e in entries), 4), } # --------------------------------------------------------------------------- # Atomic append (CLI side) — used from ralph.sh # --------------------------------------------------------------------------- def append_entry(usage_path: Path | str, entry: dict) -> None: """Append a single entry as JSONL with atomic write semantics. Uses temp file rename to avoid concurrent-writer corruption (read-existing, write-existing+new, atomic replace). NU folosim `open(..., 'a')` direct pentru că poate fi tăiat la mijloc dacă procesul e killed. """ p = Path(usage_path) p.parent.mkdir(parents=True, exist_ok=True) existing = "" if p.exists(): try: existing = p.read_text(encoding="utf-8") except OSError: existing = "" if existing and not existing.endswith("\n"): existing += "\n" new_line = json.dumps(entry, ensure_ascii=False) + "\n" fd, tmp_path = tempfile.mkstemp(prefix=".usage_", suffix=".jsonl.tmp", dir=str(p.parent)) try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(existing) f.write(new_line) os.replace(tmp_path, p) except BaseException: try: os.unlink(tmp_path) except OSError: pass raise # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def cmd_append(args: argparse.Namespace) -> int: """Read claude JSON log, derive entry, atomic append. Idempotent la JSON corupt: dacă fișierul nu poate fi parsat, exit 0 (nu vrem să spargem ralph.sh pentru un parse warning). """ log_path = Path(args.claude_log) if not log_path.exists(): print(f"warn: claude log missing: {log_path}", file=sys.stderr) return 0 try: text = log_path.read_text(encoding="utf-8") except OSError as exc: print(f"warn: read failed: {exc}", file=sys.stderr) return 0 entry = extract_usage_entry( text, slug=args.slug, story_id=args.story_id or None, iter_n=args.iter if args.iter is not None else None, ) if entry is None: print(f"warn: claude log not parseable as JSON envelope; no usage entry written", file=sys.stderr) return 0 try: append_entry(args.usage_jsonl, entry) except OSError as exc: print(f"error: append failed: {exc}", file=sys.stderr) return 1 return 0 def cmd_summarize(args: argparse.Namespace) -> int: entries = parse_usage_jsonl(args.usage_jsonl) summary = summarize(entries, days=args.days) print(json.dumps(summary, indent=2, ensure_ascii=False)) return 0 def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(prog="ralph_usage", description=__doc__) sub = parser.add_subparsers(dest="cmd", required=True) sp_app = sub.add_parser("append", help="Atomic append usage entry from claude JSON log") sp_app.add_argument("usage_jsonl", help="Path to usage.jsonl (will be created)") sp_app.add_argument("claude_log", help="Path to claude -p JSON output log") sp_app.add_argument("--slug", required=True) sp_app.add_argument("--story-id", default="", dest="story_id") sp_app.add_argument("--iter", type=int, default=None) sp_sum = sub.add_parser("summarize", help="Print JSON summary of usage") sp_sum.add_argument("usage_jsonl", help="Path to usage.jsonl") sp_sum.add_argument("--days", type=int, default=7) args = parser.parse_args(argv) if args.cmd == "append": return cmd_append(args) if args.cmd == "summarize": return cmd_summarize(args) parser.print_help() return 2 if __name__ == "__main__": sys.exit(main())