"""Backtest statistics for ``data/jurnal.csv``. Public API: - ``compute_stats(csv_path, overlay) -> dict`` - ``render_stats(stats, overlay) -> str`` - ``compute_calibration(csv_path) -> dict`` - ``render_calibration(cal) -> str`` - ``main()`` — CLI entry point. A "win" is a closed trade with ``pl_overlay > 0`` (where ``pl_overlay`` is either ``pl_marius`` or ``pl_theoretical``). Pending trades — ``pl_marius`` blank, i.e. ``outcome_path in {pending, TP0->pending}`` — are excluded from both WR and expectancy: there is no realised outcome yet. The ``calitate`` field is a known-biased descriptor: it is classified post-outcome (see ``STOPPING_RULE.md`` §3). The per-``calitate`` split is reported with an explicit *descriptor only — biased post-outcome* caveat. """ from __future__ import annotations import argparse import csv import math import sys from pathlib import Path from typing import Any, Iterable import numpy as np from scripts.append_row import CSV_COLUMNS __all__ = [ "BACKTEST_SOURCES", "CALIBRATION_SOURCES", "CORE_CALIBRATION_FIELDS", "NUMERIC_CALIBRATION_FIELDS", "STOPPING_RULE_N", "wilson_ci", "bootstrap_expectancy_ci", "compute_stats", "render_stats", "compute_calibration", "render_calibration", "main", ] # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- BACKTEST_SOURCES: frozenset[str] = frozenset({"vision", "manual"}) CALIBRATION_SOURCES: frozenset[str] = frozenset( {"manual_calibration", "vision_calibration"} ) # Calibration P4 gate (STOPPING_RULE.md §P4) — explicitly reported per field. CORE_CALIBRATION_FIELDS: tuple[str, ...] = ( "entry", "sl", "tp0", "tp1", "tp2", "outcome_path", "max_reached", "directie", "instrument", ) NUMERIC_CALIBRATION_FIELDS: frozenset[str] = frozenset( {"entry", "sl", "tp0", "tp1", "tp2"} ) # STOPPING_RULE.md §"GO LIVE" gate: N >= 40 per Set. STOPPING_RULE_N: int = 40 # --------------------------------------------------------------------------- # Loading # --------------------------------------------------------------------------- def _parse_optional_float(value: str) -> float | None: s = (value or "").strip() if s == "": return None try: return float(s) except ValueError: return None def _load_rows(csv_path: Path | str) -> list[dict[str, str]]: p = Path(csv_path) if not p.exists() or p.stat().st_size == 0: return [] with p.open("r", encoding="utf-8", newline="") as fh: return list(csv.DictReader(fh)) # --------------------------------------------------------------------------- # CI primitives # --------------------------------------------------------------------------- def wilson_ci(wins: int, n: int, z: float = 1.96) -> tuple[float, float]: """Wilson score interval for a binomial proportion. Returns ``(lo, hi)`` clamped to ``[0.0, 1.0]``. For ``n == 0`` returns ``(0.0, 0.0)``. ``z = 1.96`` ≈ 95% confidence. """ if n <= 0: return (0.0, 0.0) if wins < 0 or wins > n: raise ValueError(f"wins={wins} out of range for n={n}") p = wins / n denom = 1.0 + (z * z) / n center = (p + (z * z) / (2.0 * n)) / denom spread = z * math.sqrt(p * (1.0 - p) / n + (z * z) / (4.0 * n * n)) / denom return (max(0.0, center - spread), min(1.0, center + spread)) def bootstrap_expectancy_ci( values: list[float] | np.ndarray, n_resamples: int = 5000, seed: int = 42, ) -> tuple[float, float]: """Percentile-method bootstrap 95% CI for the mean of ``values``. Deterministic for a given ``seed``. Empty input → ``(0.0, 0.0)``. Single value → ``(value, value)`` (no variance to resample). """ arr = np.asarray(list(values), dtype=float) if arr.size == 0: return (0.0, 0.0) if arr.size == 1: v = float(arr[0]) return (v, v) rng = np.random.default_rng(seed) boots = np.empty(n_resamples, dtype=float) n = arr.size for i in range(n_resamples): idx = rng.integers(0, n, size=n) boots[i] = float(arr[idx].mean()) lo = float(np.percentile(boots, 2.5)) hi = float(np.percentile(boots, 97.5)) return (lo, hi) # --------------------------------------------------------------------------- # compute_stats # --------------------------------------------------------------------------- def _group_stats( overlay_values: list[float | None], *, include_ci: bool, bootstrap_seed: int, ) -> dict[str, Any]: closed = [v for v in overlay_values if v is not None] n = len(closed) wins = sum(1 for v in closed if v > 0) wr = (wins / n) if n else 0.0 out: dict[str, Any] = { "n": n, "wr": wr, "expectancy": (sum(closed) / n) if n else 0.0, } if include_ci: out["wr_ci_95"] = wilson_ci(wins, n) out["expectancy_ci_95"] = bootstrap_expectancy_ci( closed, seed=bootstrap_seed ) return out def _overlay_value(row: dict[str, str], overlay: str) -> float | None: raw = row.get(overlay, "") return _parse_optional_float(raw) def compute_stats( csv_path: Path | str = "data/jurnal.csv", overlay: str = "pl_marius", ) -> dict[str, Any]: """Compute aggregate WR + expectancy stats over the backtest rows. Calibration rows (``manual_calibration`` / ``vision_calibration``) are excluded; use :func:`compute_calibration` for the P4 mismatch report. ``overlay`` selects the P/L column: ``"pl_marius"`` (default — the real overlay Marius trades) or ``"pl_theoretical"`` (1/3-1/3-1/3 hold-to-TP2). """ if overlay not in {"pl_marius", "pl_theoretical"}: raise ValueError(f"unknown overlay {overlay!r}") rows = [r for r in _load_rows(csv_path) if r.get("source", "") in BACKTEST_SOURCES] if not rows: return { "n_total": 0, "n_pending": 0, "n_closed": 0, "wr": 0.0, "wr_ci_95": (0.0, 0.0), "expectancy": 0.0, "expectancy_ci_95": (0.0, 0.0), "per_set": {}, "per_calitate": {}, "per_directie": {}, } # Pending status is overlay-independent: a trade is pending iff # pl_marius is blank (outcome_path in {pending, TP0->pending}). # pl_theoretical is concrete even for pending rows, so it would otherwise # let pending trades sneak into the closed-trades stats — we mask those # out explicitly here. pending_mask = [_parse_optional_float(r.get("pl_marius", "")) is None for r in rows] overlay_vals: list[float | None] = [] for r, is_pending in zip(rows, pending_mask): overlay_vals.append(None if is_pending else _overlay_value(r, overlay)) n_total = len(rows) n_pending = sum(1 for p in pending_mask if p) n_closed = n_total - n_pending overall = _group_stats( overlay_vals, include_ci=True, bootstrap_seed=42 ) def _split(field: str, include_ci: bool) -> dict[str, dict[str, Any]]: groups: dict[str, list[float | None]] = {} for r, v in zip(rows, overlay_vals): key = r.get(field, "") or "(blank)" groups.setdefault(key, []).append(v) out: dict[str, dict[str, Any]] = {} for k in sorted(groups): sub_seed = 42 + (abs(hash(("split", field, k))) % 1_000_000) out[k] = _group_stats( groups[k], include_ci=include_ci, bootstrap_seed=sub_seed ) return out return { "n_total": n_total, "n_pending": n_pending, "n_closed": n_closed, "wr": overall["wr"], "wr_ci_95": overall["wr_ci_95"], "expectancy": overall["expectancy"], "expectancy_ci_95": overall["expectancy_ci_95"], "per_set": _split("set", include_ci=True), "per_calitate": _split("calitate", include_ci=True), # per_directie skips CI per spec (no wr_ci_95 / expectancy_ci_95 keys). "per_directie": { k: {"n": v["n"], "wr": v["wr"], "expectancy": v["expectancy"]} for k, v in _split("directie", include_ci=False).items() }, } # --------------------------------------------------------------------------- # render_stats # --------------------------------------------------------------------------- def _fmt_pct(p: float) -> str: return f"{100.0 * p:5.1f}%" def _fmt_r(x: float) -> str: return f"{x:+.2f} R" def _set_sort_key(name: str) -> tuple[int, str]: order = ["A1", "A2", "A3", "B", "C", "D", "Other"] return (order.index(name), name) if name in order else (len(order), name) def render_stats(stats: dict[str, Any], overlay: str) -> str: lines: list[str] = [] lines.append(f"=== Stats jurnal.csv (overlay: {overlay}) ===") lines.append( f"Trade-uri totale: {stats['n_total']} | " f"închise: {stats['n_closed']} | pending: {stats['n_pending']}" ) if stats["n_total"] == 0: lines.append("") lines.append("(nu sunt trade-uri backtest în CSV)") return "\n".join(lines) + "\n" lines.append("") lo, hi = stats["wr_ci_95"] e_lo, e_hi = stats["expectancy_ci_95"] lines.append(f"GLOBAL (n={stats['n_closed']}):") lines.append( f" WR: {_fmt_pct(stats['wr'])} " f"[95% CI: {_fmt_pct(lo)}, {_fmt_pct(hi)}]" ) lines.append( f" Expectancy: {_fmt_r(stats['expectancy'])} " f"[95% CI: {_fmt_r(e_lo)}, {_fmt_r(e_hi)}]" ) lines.append("") def _emit_split( title: str, data: dict[str, dict[str, Any]], *, sort_keys: list[str] | None = None, include_ci: bool = True, ) -> None: lines.append(title) keys = sort_keys if sort_keys is not None else sorted(data) for k in keys: if k not in data: continue d = data[k] if include_ci and "wr_ci_95" in d: clo, chi = d["wr_ci_95"] lines.append( f" {k:<14} n={d['n']:>3} " f"WR {_fmt_pct(d['wr'])} " f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] " f"E {_fmt_r(d['expectancy'])}" ) else: lines.append( f" {k:<14} n={d['n']:>3} " f"WR {_fmt_pct(d['wr'])} " f"E {_fmt_r(d['expectancy'])}" ) lines.append("") _emit_split( "PER SET:", stats["per_set"], sort_keys=sorted(stats["per_set"], key=_set_sort_key), ) lines.append( "PER CALITATE (⚠️ DESCRIPTOR ONLY — biased post-outcome, NU folosi ca filtru):" ) cal_order = ["Clară", "Mai mare ca impuls", "Slabă", "n/a"] keys = [k for k in cal_order if k in stats["per_calitate"]] + [ k for k in sorted(stats["per_calitate"]) if k not in cal_order ] for k in keys: d = stats["per_calitate"][k] clo, chi = d["wr_ci_95"] lines.append( f" {k:<20} n={d['n']:>3} " f"WR {_fmt_pct(d['wr'])} " f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] " f"E {_fmt_r(d['expectancy'])}" ) lines.append("") _emit_split("PER DIRECȚIE:", stats["per_directie"], include_ci=False) # STOPPING_RULE gate check — flag every Set that hasn't crossed N>=40. lines.append(f"⚠️ STOPPING RULE check (vezi STOPPING_RULE.md, N>={STOPPING_RULE_N}):") set_keys = sorted(stats["per_set"], key=_set_sort_key) any_flagged = False for k in set_keys: n = stats["per_set"][k]["n"] if n < STOPPING_RULE_N: lines.append(f" {k}: N={n} < {STOPPING_RULE_N} → NEEDS MORE DATA") any_flagged = True if not any_flagged: lines.append(f" toate Set-urile au N>={STOPPING_RULE_N} (eligibile pentru GO LIVE check).") return "\n".join(lines) + "\n" # --------------------------------------------------------------------------- # compute_calibration # --------------------------------------------------------------------------- def _calibration_match(field: str, m_val: str, v_val: str, tol: float = 0.01) -> bool: if field in NUMERIC_CALIBRATION_FIELDS: try: return abs(float(m_val) - float(v_val)) <= tol except ValueError: return (m_val or "").strip() == (v_val or "").strip() return (m_val or "").strip() == (v_val or "").strip() def compute_calibration( csv_path: Path | str = "data/jurnal.csv", ) -> dict[str, Any]: """Pair calibration legs by ``screenshot_file`` and report per-field mismatch. Returns a dict ``{"n_pairs": int, "fields": {field: {match, mismatch, match_rate, mismatch_examples}}}``. ``mismatch_examples`` holds up to 3 strings ``": manual=X vs vision=Y"`` per field. Numeric fields (``entry/sl/tp0/tp1/tp2``) use a tolerance of 0.01; everything else is exact-string equality after strip. """ rows = _load_rows(csv_path) manual: dict[str, dict[str, str]] = {} vision: dict[str, dict[str, str]] = {} for r in rows: src = r.get("source", "") if src == "manual_calibration": manual[r.get("screenshot_file", "")] = r elif src == "vision_calibration": vision[r.get("screenshot_file", "")] = r paired_files = sorted(set(manual) & set(vision)) fields_report: dict[str, dict[str, Any]] = { f: { "match": 0, "mismatch": 0, "match_rate": 0.0, "mismatch_examples": [], } for f in CORE_CALIBRATION_FIELDS } for f in paired_files: m = manual[f] v = vision[f] for fld in CORE_CALIBRATION_FIELDS: mv = m.get(fld, "") vv = v.get(fld, "") if _calibration_match(fld, mv, vv): fields_report[fld]["match"] += 1 else: fields_report[fld]["mismatch"] += 1 examples = fields_report[fld]["mismatch_examples"] if len(examples) < 3: examples.append(f"{f}: manual={mv!r} vs vision={vv!r}") for fld, data in fields_report.items(): total = data["match"] + data["mismatch"] data["match_rate"] = (data["match"] / total) if total else 0.0 return {"n_pairs": len(paired_files), "fields": fields_report} def render_calibration(cal: dict[str, Any]) -> str: lines: list[str] = [] lines.append("=== Calibration P4 gate (vezi STOPPING_RULE.md §P4) ===") lines.append(f"Perechi calibration: {cal['n_pairs']}") if cal["n_pairs"] == 0: lines.append("(nu există perechi manual_calibration ∩ vision_calibration)") return "\n".join(lines) + "\n" lines.append("") lines.append(f"{'field':<14} match mismatch rate") total_mismatches = 0 total_comparisons = 0 for fld in CORE_CALIBRATION_FIELDS: d = cal["fields"][fld] n = d["match"] + d["mismatch"] total_mismatches += d["mismatch"] total_comparisons += n lines.append( f"{fld:<14} {d['match']:>5} {d['mismatch']:>8} " f"{_fmt_pct(d['match_rate'])}" ) lines.append("") overall_match_rate = ( (total_comparisons - total_mismatches) / total_comparisons if total_comparisons else 0.0 ) overall_mismatch_rate = 1.0 - overall_match_rate verdict = "PASS" if overall_mismatch_rate <= 0.10 else "FAIL" lines.append( f"Overall mismatch rate: {_fmt_pct(overall_mismatch_rate)} " f"({total_mismatches}/{total_comparisons}) → P4 gate: {verdict}" ) has_examples = any( cal["fields"][f]["mismatch_examples"] for f in CORE_CALIBRATION_FIELDS ) if has_examples: lines.append("") lines.append("Mismatch examples (max 3 per field):") for fld in CORE_CALIBRATION_FIELDS: ex = cal["fields"][fld]["mismatch_examples"] if not ex: continue lines.append(f" [{fld}]") for e in ex: lines.append(f" - {e}") return "\n".join(lines) + "\n" # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( prog="stats", description="Backtest statistics for data/jurnal.csv", ) parser.add_argument( "--csv", type=Path, default=Path("data/jurnal.csv"), help="Path to the jurnal CSV (default: data/jurnal.csv).", ) parser.add_argument( "--overlay", choices=("pl_marius", "pl_theoretical"), default="pl_marius", help="Which P/L overlay to use (default: pl_marius).", ) parser.add_argument( "--calibration", action="store_true", help="Show P4 calibration mismatch report instead of backtest stats.", ) args = parser.parse_args(argv) try: sys.stdout.reconfigure(encoding="utf-8") # type: ignore[attr-defined] except (AttributeError, OSError): pass if args.calibration: cal = compute_calibration(args.csv) sys.stdout.write(render_calibration(cal)) else: stats = compute_stats(args.csv, overlay=args.overlay) sys.stdout.write(render_stats(stats, args.overlay)) return 0 if __name__ == "__main__": raise SystemExit(main()) # Ensure the canonical CSV schema is importable from one place — fail fast if # someone removes append_row.CSV_COLUMNS that this module depends on. assert CSV_COLUMNS is not None