atm-backtesting/scripts/stats.py
"""Backtest statistics for ``data/jurnal.csv``.
Public API:
- ``compute_stats(csv_path, overlay) -> dict``
- ``render_stats(stats, overlay) -> str``
- ``compute_calibration(csv_path) -> dict``
- ``render_calibration(cal) -> str``
- ``main()`` — CLI entry point.
A "win" is a closed trade with ``pl_overlay > 0`` (where ``pl_overlay`` is
either ``pl_marius`` or ``pl_theoretical``). Pending trades — ``pl_marius``
blank, i.e. ``outcome_path in {pending, TP0->pending}`` — are excluded from
both WR and expectancy: there is no realised outcome yet.
The ``calitate`` field is a known-biased descriptor: it is classified
post-outcome (see ``STOPPING_RULE.md`` §3). The per-``calitate`` split is
reported with an explicit *descriptor only — biased post-outcome* caveat.
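
CLI usage (assumed entry point; run from the repo root so the
``scripts.append_row`` import resolves)::

    python -m scripts.stats --csv data/jurnal.csv --overlay pl_theoretical
    python -m scripts.stats --calibration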
"""
from __future__ import annotations
import argparse
import csv
import math
import sys
import zlib
from pathlib import Path
from typing import Any
import numpy as np
from scripts.append_row import CSV_COLUMNS
__all__ = [
"BACKTEST_SOURCES",
"CALIBRATION_SOURCES",
"CORE_CALIBRATION_FIELDS",
"NUMERIC_CALIBRATION_FIELDS",
"STOPPING_RULE_N",
"wilson_ci",
"bootstrap_expectancy_ci",
"compute_stats",
"render_stats",
"compute_calibration",
"render_calibration",
"main",
]
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
BACKTEST_SOURCES: frozenset[str] = frozenset({"vision", "manual"})
CALIBRATION_SOURCES: frozenset[str] = frozenset(
{"manual_calibration", "vision_calibration"}
)
# Calibration P4 gate (STOPPING_RULE.md §P4) — explicitly reported per field.
CORE_CALIBRATION_FIELDS: tuple[str, ...] = (
"entry",
"sl",
"tp0",
"tp1",
"tp2",
"outcome_path",
"max_reached",
"directie",
"instrument",
)
NUMERIC_CALIBRATION_FIELDS: frozenset[str] = frozenset(
{"entry", "sl", "tp0", "tp1", "tp2"}
)
# STOPPING_RULE.md §"GO LIVE" gate: N >= 40 per Set.
STOPPING_RULE_N: int = 40
# ---------------------------------------------------------------------------
# Loading
# ---------------------------------------------------------------------------
def _parse_optional_float(value: str) -> float | None:
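    """Parse one possibly-blank CSV cell; ``None`` for blank or unparsable input.

    >>> _parse_optional_float(" 1.5 ")
    1.5
    >>> _parse_optional_float("") is None
    True
    >>> _parse_optional_float("n/a") is None
    True
    """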
s = (value or "").strip()
if s == "":
return None
try:
return float(s)
except ValueError:
return None
def _load_rows(csv_path: Path | str) -> list[dict[str, str]]:
p = Path(csv_path)
if not p.exists() or p.stat().st_size == 0:
return []
with p.open("r", encoding="utf-8", newline="") as fh:
return list(csv.DictReader(fh))
# ---------------------------------------------------------------------------
# CI primitives
# ---------------------------------------------------------------------------
def wilson_ci(wins: int, n: int, z: float = 1.96) -> tuple[float, float]:
"""Wilson score interval for a binomial proportion.
Returns ``(lo, hi)`` clamped to ``[0.0, 1.0]``. For ``n == 0`` returns
``(0.0, 0.0)``. ``z = 1.96`` ≈ 95% confidence.
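
    Doctest sketch (endpoints rounded for display):

    >>> lo, hi = wilson_ci(7, 10)
    >>> (round(lo, 3), round(hi, 3))
    (0.397, 0.892)
    >>> wilson_ci(0, 0)
    (0.0, 0.0)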
"""
if n <= 0:
return (0.0, 0.0)
if wins < 0 or wins > n:
raise ValueError(f"wins={wins} out of range for n={n}")
p = wins / n
denom = 1.0 + (z * z) / n
center = (p + (z * z) / (2.0 * n)) / denom
spread = z * math.sqrt(p * (1.0 - p) / n + (z * z) / (4.0 * n * n)) / denom
return (max(0.0, center - spread), min(1.0, center + spread))
def bootstrap_expectancy_ci(
values: list[float] | np.ndarray,
n_resamples: int = 5000,
seed: int = 42,
) -> tuple[float, float]:
"""Percentile-method bootstrap 95% CI for the mean of ``values``.
Deterministic for a given ``seed``. Empty input → ``(0.0, 0.0)``.
Single value → ``(value, value)`` (no variance to resample).
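
    Doctest sketch; exact endpoints for ``n > 1`` depend on the RNG stream,
    so only the degenerate cases and the bracketing property are asserted:

    >>> bootstrap_expectancy_ci([])
    (0.0, 0.0)
    >>> bootstrap_expectancy_ci([2.5])
    (2.5, 2.5)
    >>> lo, hi = bootstrap_expectancy_ci([1.0, -1.0, 2.0])
    >>> lo <= (1.0 - 1.0 + 2.0) / 3 <= hi
    True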
"""
arr = np.asarray(list(values), dtype=float)
if arr.size == 0:
return (0.0, 0.0)
if arr.size == 1:
v = float(arr[0])
return (v, v)
rng = np.random.default_rng(seed)
boots = np.empty(n_resamples, dtype=float)
n = arr.size
for i in range(n_resamples):
idx = rng.integers(0, n, size=n)
boots[i] = float(arr[idx].mean())
lo = float(np.percentile(boots, 2.5))
hi = float(np.percentile(boots, 97.5))
return (lo, hi)
# ---------------------------------------------------------------------------
# compute_stats
# ---------------------------------------------------------------------------
def _group_stats(
overlay_values: list[float | None],
*,
include_ci: bool,
bootstrap_seed: int,
) -> dict[str, Any]:
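    """Aggregate win rate and expectancy for one group of overlay values.

    ``None`` entries are pending trades and are dropped before counting.

    >>> _group_stats([1.0, None, -0.5], include_ci=False, bootstrap_seed=42)
    {'n': 2, 'wr': 0.5, 'expectancy': 0.25}
    """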
closed = [v for v in overlay_values if v is not None]
n = len(closed)
wins = sum(1 for v in closed if v > 0)
wr = (wins / n) if n else 0.0
out: dict[str, Any] = {
"n": n,
"wr": wr,
"expectancy": (sum(closed) / n) if n else 0.0,
}
if include_ci:
out["wr_ci_95"] = wilson_ci(wins, n)
out["expectancy_ci_95"] = bootstrap_expectancy_ci(
closed, seed=bootstrap_seed
)
return out
def _overlay_value(row: dict[str, str], overlay: str) -> float | None:
raw = row.get(overlay, "")
return _parse_optional_float(raw)
def compute_stats(
csv_path: Path | str = "data/jurnal.csv",
overlay: str = "pl_marius",
) -> dict[str, Any]:
"""Compute aggregate WR + expectancy stats over the backtest rows.
Calibration rows (``manual_calibration`` / ``vision_calibration``) are
excluded; use :func:`compute_calibration` for the P4 mismatch report.
    ``overlay`` selects the P/L column: ``"pl_marius"`` (default; the overlay
    Marius actually trades) or ``"pl_theoretical"`` (1/3-1/3-1/3 hold-to-TP2).
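
    Usage sketch (assumes the CSV exists on disk; skipped under doctest):

    >>> stats = compute_stats("data/jurnal.csv")  # doctest: +SKIP
    >>> stats["n_closed"], stats["wr_ci_95"]  # doctest: +SKIP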
"""
if overlay not in {"pl_marius", "pl_theoretical"}:
raise ValueError(f"unknown overlay {overlay!r}")
rows = [r for r in _load_rows(csv_path) if r.get("source", "") in BACKTEST_SOURCES]
if not rows:
return {
"n_total": 0,
"n_pending": 0,
"n_closed": 0,
"wr": 0.0,
"wr_ci_95": (0.0, 0.0),
"expectancy": 0.0,
"expectancy_ci_95": (0.0, 0.0),
"per_set": {},
"per_calitate": {},
"per_directie": {},
}
# Pending status is overlay-independent: a trade is pending iff
# pl_marius is blank (outcome_path in {pending, TP0->pending}).
# pl_theoretical is concrete even for pending rows, so it would otherwise
# let pending trades sneak into the closed-trades stats — we mask those
# out explicitly here.
pending_mask = [_parse_optional_float(r.get("pl_marius", "")) is None for r in rows]
overlay_vals: list[float | None] = []
for r, is_pending in zip(rows, pending_mask):
overlay_vals.append(None if is_pending else _overlay_value(r, overlay))
n_total = len(rows)
n_pending = sum(1 for p in pending_mask if p)
n_closed = n_total - n_pending
overall = _group_stats(
overlay_vals, include_ci=True, bootstrap_seed=42
)
def _split(field: str, include_ci: bool) -> dict[str, dict[str, Any]]:
groups: dict[str, list[float | None]] = {}
for r, v in zip(rows, overlay_vals):
key = r.get(field, "") or "(blank)"
groups.setdefault(key, []).append(v)
out: dict[str, dict[str, Any]] = {}
for k in sorted(groups):
            # Stable per-group seed: builtin hash() is randomised per process
            # for strings, which would make per-split bootstrap CIs
            # non-reproducible across runs; crc32 is deterministic.
            sub_seed = 42 + (zlib.crc32(f"split|{field}|{k}".encode("utf-8")) % 1_000_000)
out[k] = _group_stats(
groups[k], include_ci=include_ci, bootstrap_seed=sub_seed
)
return out
return {
"n_total": n_total,
"n_pending": n_pending,
"n_closed": n_closed,
"wr": overall["wr"],
"wr_ci_95": overall["wr_ci_95"],
"expectancy": overall["expectancy"],
"expectancy_ci_95": overall["expectancy_ci_95"],
"per_set": _split("set", include_ci=True),
"per_calitate": _split("calitate", include_ci=True),
# per_directie skips CI per spec (no wr_ci_95 / expectancy_ci_95 keys).
"per_directie": {
k: {"n": v["n"], "wr": v["wr"], "expectancy": v["expectancy"]}
for k, v in _split("directie", include_ci=False).items()
},
}
# ---------------------------------------------------------------------------
# render_stats
# ---------------------------------------------------------------------------
def _fmt_pct(p: float) -> str:
return f"{100.0 * p:5.1f}%"
def _fmt_r(x: float) -> str:
return f"{x:+.2f} R"
def _set_sort_key(name: str) -> tuple[int, str]:
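    """Sort Set labels in their canonical display order; unknown labels last.

    >>> _set_sort_key("B")
    (3, 'B')
    >>> _set_sort_key("Z9")
    (7, 'Z9')
    """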
order = ["A1", "A2", "A3", "B", "C", "D", "Other"]
return (order.index(name), name) if name in order else (len(order), name)
def render_stats(stats: dict[str, Any], overlay: str) -> str:
lines: list[str] = []
lines.append(f"=== Stats jurnal.csv (overlay: {overlay}) ===")
    lines.append(
        f"Total trades: {stats['n_total']} | "
        f"closed: {stats['n_closed']} | pending: {stats['n_pending']}"
    )
if stats["n_total"] == 0:
lines.append("")
lines.append("(nu sunt trade-uri backtest în CSV)")
return "\n".join(lines) + "\n"
lines.append("")
lo, hi = stats["wr_ci_95"]
e_lo, e_hi = stats["expectancy_ci_95"]
lines.append(f"GLOBAL (n={stats['n_closed']}):")
lines.append(
f" WR: {_fmt_pct(stats['wr'])} "
f"[95% CI: {_fmt_pct(lo)}, {_fmt_pct(hi)}]"
)
lines.append(
f" Expectancy: {_fmt_r(stats['expectancy'])} "
f"[95% CI: {_fmt_r(e_lo)}, {_fmt_r(e_hi)}]"
)
lines.append("")
def _emit_split(
title: str,
data: dict[str, dict[str, Any]],
*,
sort_keys: list[str] | None = None,
include_ci: bool = True,
) -> None:
lines.append(title)
keys = sort_keys if sort_keys is not None else sorted(data)
for k in keys:
if k not in data:
continue
d = data[k]
if include_ci and "wr_ci_95" in d:
clo, chi = d["wr_ci_95"]
lines.append(
f" {k:<14} n={d['n']:>3} "
f"WR {_fmt_pct(d['wr'])} "
f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] "
f"E {_fmt_r(d['expectancy'])}"
)
else:
lines.append(
f" {k:<14} n={d['n']:>3} "
f"WR {_fmt_pct(d['wr'])} "
f"E {_fmt_r(d['expectancy'])}"
)
lines.append("")
_emit_split(
"PER SET:",
stats["per_set"],
sort_keys=sorted(stats["per_set"], key=_set_sort_key),
)
    lines.append(
        "PER CALITATE (⚠️ DESCRIPTOR ONLY — biased post-outcome, do NOT use as a filter):"
    )
    # NOTE: the keys below are CSV data values; keep them verbatim (Romanian labels).
    cal_order = ["Clară", "Mai mare ca impuls", "Slabă", "n/a"]
keys = [k for k in cal_order if k in stats["per_calitate"]] + [
k for k in sorted(stats["per_calitate"]) if k not in cal_order
]
for k in keys:
d = stats["per_calitate"][k]
clo, chi = d["wr_ci_95"]
lines.append(
f" {k:<20} n={d['n']:>3} "
f"WR {_fmt_pct(d['wr'])} "
f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] "
f"E {_fmt_r(d['expectancy'])}"
)
lines.append("")
_emit_split("PER DIRECȚIE:", stats["per_directie"], include_ci=False)
# STOPPING_RULE gate check — flag every Set that hasn't crossed N>=40.
lines.append(f"⚠️ STOPPING RULE check (vezi STOPPING_RULE.md, N>={STOPPING_RULE_N}):")
set_keys = sorted(stats["per_set"], key=_set_sort_key)
any_flagged = False
for k in set_keys:
n = stats["per_set"][k]["n"]
if n < STOPPING_RULE_N:
lines.append(f" {k}: N={n} < {STOPPING_RULE_N} → NEEDS MORE DATA")
any_flagged = True
if not any_flagged:
lines.append(f" toate Set-urile au N>={STOPPING_RULE_N} (eligibile pentru GO LIVE check).")
return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# compute_calibration
# ---------------------------------------------------------------------------
def _calibration_match(field: str, m_val: str, v_val: str, tol: float = 0.01) -> bool:
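    """Compare one manual/vision field pair for the P4 calibration report.

    Numeric fields match within ``tol``; unparsable numerics and all other
    fields fall back to exact string equality after ``strip()``.

    >>> _calibration_match("entry", "1.2345", "1.2349")
    True
    >>> _calibration_match("entry", "1.23", "1.25")
    False
    >>> _calibration_match("directie", "long", "Long")
    False
    """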
if field in NUMERIC_CALIBRATION_FIELDS:
try:
return abs(float(m_val) - float(v_val)) <= tol
except ValueError:
return (m_val or "").strip() == (v_val or "").strip()
return (m_val or "").strip() == (v_val or "").strip()
def compute_calibration(
csv_path: Path | str = "data/jurnal.csv",
) -> dict[str, Any]:
"""Pair calibration legs by ``screenshot_file`` and report per-field mismatch.
Returns a dict ``{"n_pairs": int, "fields": {field: {match, mismatch,
    match_rate, mismatch_examples}}}``. ``mismatch_examples`` holds up to 3
    strings per field of the form ``"<screenshot_file>: manual=<m> vs
    vision=<v>"``, with values rendered via ``repr``.
Numeric fields (``entry/sl/tp0/tp1/tp2``) use a tolerance of 0.01;
everything else is exact-string equality after strip.
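
    Usage sketch (needs calibration rows on disk; skipped under doctest):

    >>> cal = compute_calibration("data/jurnal.csv")  # doctest: +SKIP
    >>> cal["n_pairs"], cal["fields"]["entry"]["match_rate"]  # doctest: +SKIP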
"""
rows = _load_rows(csv_path)
manual: dict[str, dict[str, str]] = {}
vision: dict[str, dict[str, str]] = {}
for r in rows:
src = r.get("source", "")
if src == "manual_calibration":
manual[r.get("screenshot_file", "")] = r
elif src == "vision_calibration":
vision[r.get("screenshot_file", "")] = r
paired_files = sorted(set(manual) & set(vision))
fields_report: dict[str, dict[str, Any]] = {
f: {
"match": 0,
"mismatch": 0,
"match_rate": 0.0,
"mismatch_examples": [],
}
for f in CORE_CALIBRATION_FIELDS
}
for f in paired_files:
m = manual[f]
v = vision[f]
for fld in CORE_CALIBRATION_FIELDS:
mv = m.get(fld, "")
vv = v.get(fld, "")
if _calibration_match(fld, mv, vv):
fields_report[fld]["match"] += 1
else:
fields_report[fld]["mismatch"] += 1
examples = fields_report[fld]["mismatch_examples"]
if len(examples) < 3:
examples.append(f"{f}: manual={mv!r} vs vision={vv!r}")
for fld, data in fields_report.items():
total = data["match"] + data["mismatch"]
data["match_rate"] = (data["match"] / total) if total else 0.0
return {"n_pairs": len(paired_files), "fields": fields_report}
def render_calibration(cal: dict[str, Any]) -> str:
lines: list[str] = []
lines.append("=== Calibration P4 gate (vezi STOPPING_RULE.md §P4) ===")
lines.append(f"Perechi calibration: {cal['n_pairs']}")
if cal["n_pairs"] == 0:
lines.append("(nu există perechi manual_calibration ∩ vision_calibration)")
return "\n".join(lines) + "\n"
lines.append("")
lines.append(f"{'field':<14} match mismatch rate")
total_mismatches = 0
total_comparisons = 0
for fld in CORE_CALIBRATION_FIELDS:
d = cal["fields"][fld]
n = d["match"] + d["mismatch"]
total_mismatches += d["mismatch"]
total_comparisons += n
lines.append(
f"{fld:<14} {d['match']:>5} {d['mismatch']:>8} "
f"{_fmt_pct(d['match_rate'])}"
)
lines.append("")
overall_match_rate = (
(total_comparisons - total_mismatches) / total_comparisons
if total_comparisons
else 0.0
)
overall_mismatch_rate = 1.0 - overall_match_rate
verdict = "PASS" if overall_mismatch_rate <= 0.10 else "FAIL"
lines.append(
f"Overall mismatch rate: {_fmt_pct(overall_mismatch_rate)} "
f"({total_mismatches}/{total_comparisons}) → P4 gate: {verdict}"
)
has_examples = any(
cal["fields"][f]["mismatch_examples"] for f in CORE_CALIBRATION_FIELDS
)
if has_examples:
lines.append("")
lines.append("Mismatch examples (max 3 per field):")
for fld in CORE_CALIBRATION_FIELDS:
ex = cal["fields"][fld]["mismatch_examples"]
if not ex:
continue
lines.append(f" [{fld}]")
for e in ex:
lines.append(f" - {e}")
return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="stats",
description="Backtest statistics for data/jurnal.csv",
)
parser.add_argument(
"--csv",
type=Path,
default=Path("data/jurnal.csv"),
help="Path to the jurnal CSV (default: data/jurnal.csv).",
)
parser.add_argument(
"--overlay",
choices=("pl_marius", "pl_theoretical"),
default="pl_marius",
help="Which P/L overlay to use (default: pl_marius).",
)
parser.add_argument(
"--calibration",
action="store_true",
help="Show P4 calibration mismatch report instead of backtest stats.",
)
args = parser.parse_args(argv)
try:
sys.stdout.reconfigure(encoding="utf-8") # type: ignore[attr-defined]
except (AttributeError, OSError):
pass
if args.calibration:
cal = compute_calibration(args.csv)
sys.stdout.write(render_calibration(cal))
else:
stats = compute_stats(args.csv, overlay=args.overlay)
sys.stdout.write(render_stats(stats, args.overlay))
return 0
# CSV_COLUMNS is imported as a fail-fast dependency check: if someone removes
# append_row.CSV_COLUMNS, the import at the top of this module raises at once.
# The assert guards against the schema being None and must sit before the
# __main__ guard, otherwise it would never run in script mode.
assert CSV_COLUMNS is not None

if __name__ == "__main__":
    raise SystemExit(main())