From 34af5b631e15fdc1ee21f5a4c33558d2af080d26 Mon Sep 17 00:00:00 2001 From: Marius Date: Wed, 13 May 2026 12:48:26 +0300 Subject: [PATCH] commands: m2d-log + backtest + batch + stats slash commands (124 tests pass) --- .claude/commands/backtest.md | 73 ++++ .claude/commands/batch.md | 95 +++++ .claude/commands/m2d-log.md | 99 +++++ .claude/commands/stats.md | 42 ++ scripts/stats.py | 801 ++++++++++++++++++----------------- tests/test_stats.py | 648 ++++++++++++++-------------- tests/test_stats_ci.py | 83 ++++ 7 files changed, 1111 insertions(+), 730 deletions(-) create mode 100644 .claude/commands/backtest.md create mode 100644 .claude/commands/batch.md create mode 100644 .claude/commands/m2d-log.md create mode 100644 .claude/commands/stats.md create mode 100644 tests/test_stats_ci.py diff --git a/.claude/commands/backtest.md b/.claude/commands/backtest.md new file mode 100644 index 0000000..b79fc13 --- /dev/null +++ b/.claude/commands/backtest.md @@ -0,0 +1,73 @@ +--- +description: Run vision extraction on a single TradeStation screenshot, then append to jurnal CSV + regenerate MD. +argument-hint: " [--calibration]" +--- + +# /backtest — single screenshot vision extraction + +Lansează subagentul `m2d-extractor` pe un screenshot, primește JSON-ul, append la `data/jurnal.csv`, regenerează `data/jurnal.md`. + +## Arguments + +- `$1` (obligatoriu) — path la screenshot. Acceptă: + - basename (`2026-05-13-dia-1645.png`) — caută în `screenshots/inbox/`, fallback `screenshots/processed/` + - path relativ sau absolut explicit +- `--calibration` (flag) — `source=vision_calibration` în loc de `source=vision`. Folosit împreună cu `/m2d-log --calibration` pe același screenshot pentru P4 mismatch report. + +## Workflow + +1. **Rezolvă path-ul** screenshot-ului. Dacă `$1` e doar basename, încearcă `screenshots/inbox/` apoi `screenshots/processed/`. Dacă nu există nicăieri, raportezi eroare și te oprești. + +2. 
**Invocă subagentul `m2d-extractor`** (definit în `.claude/agents/m2d-extractor.md`) prin Task tool cu `subagent_type: "m2d-extractor"`. Prompt-ul către agent: + + ``` + screenshot_path: <resolved_path> + screenshot_file: <basename> + ``` + + Agentul scrie `data/extractions/<basename_no_ext>.json` + `.log` și returnează status-line scurt. + +3. **Verifică output-ul**: + - Dacă fișierul `data/extractions/<basename_no_ext>.json` nu există după ce agentul revine → eroare; raportezi și muți screenshot-ul la `screenshots/needs_review/`. + - Citește JSON-ul. Dacă `confidence == "low"` SAU `ambiguities` non-empty cu `image_unreadable` → muți screenshot-ul la `screenshots/needs_review/`, raportezi, nu apelezi append. + +4. **Append la CSV**: + + ```bash + python -c "from pathlib import Path; from scripts.append_row import append_extraction; import json; r = append_extraction(Path('data/extractions/<basename_no_ext>.json'), source='<source>'); print(json.dumps(r, default=str))" + ``` + + `<source>` = `vision_calibration` dacă `--calibration`, altfel `vision`. + + Parsezi răspunsul. Dacă `status == "rejected"`: + - `reason` conține "duplicate" → screenshot deja procesat cu acest source; raportezi și NU îl muți. + - `reason` conține "validation error" → JSON-ul agentului a fost respins; muți screenshot la `screenshots/needs_review/` și raportezi. + - Alte erori → raportezi și lași screenshot-ul unde e. + +5. **Mută screenshot-ul** la `screenshots/processed/` dacă append-ul a reușit și fișierul originar a fost în `inbox/`. Dacă era deja în `processed/`, nu-l muta. + +6. **Regenerează MD**: + + ```bash + python scripts/regenerate_md.py + ``` + +7. **Raport final** (în română): + + ``` + /backtest → trade #<id> adăugat (source=<source>, set=<set>, pl_marius=<pl_marius>, confidence=<confidence>). + Regenerat data/jurnal.md (<n> rânduri). + ``` + + Dacă screenshot-ul a fost mutat la `needs_review`: + + ``` + /backtest → NEEDS REVIEW: <motiv>. Mutat la screenshots/needs_review/. + ``` + +## Reguli + +- O singură invocare per screenshot. Nu reapelezi agentul dacă output-ul e dubios — îl muți la `needs_review` și raportezi. 
+- NU edita CSV direct. +- NU regenera MD dacă append-ul a fost respins. +- Path discipline: subagentul scrie doar la `data/extractions/`; tu (slash command) muți screenshot-uri și apelezi scripts/. diff --git a/.claude/commands/batch.md b/.claude/commands/batch.md new file mode 100644 index 0000000..8ce74ed --- /dev/null +++ b/.claude/commands/batch.md @@ -0,0 +1,95 @@ +--- +description: Run vision extraction in parallel on multiple screenshots (default screenshots/inbox/), then serial-append the results with partial-failure handling. +argument-hint: "[dir_or_glob] [--limit N] [--calibration]" +--- + +# /batch — parallel vision extraction over multiple screenshots + +Procesează screenshot-uri multiple. Lansează până la **5 subagenți `m2d-extractor` în paralel** (cap rigid — protejează context window și rate limits). După ce toți revin, append-ezi rezultatele **serial** (`append_row` citește/scrie CSV — paralelism la write = corupție garantată). + +## Arguments + +- `$1` (opțional) — director sau glob. Default `screenshots/inbox/`. Exemplu: `screenshots/inbox/2025-09-*.png`. +- `--limit N` (opțional) — procesează doar primele N screenshot-uri (în ordine alfabetică). Default: toate. +- `--calibration` (flag) — `source=vision_calibration` în loc de `vision`. + +## Workflow + +### Fază 1 — Colectează lista + +1. Enumeră fișierele PNG/JPG match-uind argumentul. Sortează alfabetic. Aplică `--limit` dacă există. +2. Dacă lista e goală → raportezi "Nimic de procesat în " și te oprești. +3. Dacă lista are 1 element → sugerează `/backtest` în loc și continuă cu batch. + +### Fază 2 — Extracție paralelă (max 5 concurent) + +Procesezi în **batch-uri de 5**. Pentru fiecare batch: + +- Lansezi câte un Task tool call cu `subagent_type: "m2d-extractor"` pentru fiecare screenshot, ÎN ACELAȘI MESAJ (tool calls paralele). Prompt-ul per agent: + + ``` + screenshot_path: + screenshot_file: + ``` + +- Aștepți să se întoarcă toți cinci. 
Pentru fiecare, verifici că `data/extractions/<basename_no_ext>.json` a fost scris. + - Treci la următorul batch de 5. + +**De ce 5**: peste 5 sub-agenți paraleli începi să saturezi context window-ul orchestratorului cu output-urile lor și rate limits-urile API-ului. Cap rigid. + +### Fază 3 — Append serial cu partial-failure + +Pentru fiecare screenshot din lista originală, **în ordine**: + +1. Verifică `data/extractions/<basename_no_ext>.json`: + - Lipsă → log "missing JSON, agent abort", mută screenshot-ul la `screenshots/needs_review/`, continuă cu următorul. + - Citește JSON. Dacă `confidence == "low"` SAU `"image_unreadable" in ambiguities` → mută la `needs_review/`, continuă. + +2. Apelează append: + + ```bash + python -c "from pathlib import Path; from scripts.append_row import append_extraction; import json; r = append_extraction(Path('data/extractions/<basename_no_ext>.json'), source='<source>'); print(json.dumps(r, default=str))" + ``` + + `<source>` = `vision_calibration` dacă `--calibration`, altfel `vision`. + +3. Reacționezi la rezultat: + - `status == "ok"` → ține minte ID-ul, mută screenshot la `screenshots/processed/` dacă era în inbox. + - `status == "rejected"`, `reason` conține "duplicate" → ține minte ca skip; NU muta screenshot-ul (deja procesat). + - `status == "rejected"`, alt reason → log motivul, mută la `needs_review/`. + +4. NU oprești batch-ul la primul fail. Continuă până la capăt. + +### Fază 4 — Regenerează MD o singură dată + +După ce toate append-urile s-au terminat (chiar și parțial), rulezi UNA SINGURĂ DATĂ: + +```bash +python scripts/regenerate_md.py +``` + +(Regenerarea după fiecare append e wasteful; CSV-ul e sursa de adevăr, MD-ul e mirror.) + +### Fază 5 — Raport final + +Format: + +``` +/batch terminat. Procesat <N> screenshot-uri. + OK: <n_ok> (trade-uri #<id>, #<id>, ...) + Duplicate: <n_dup> (skipped — deja în CSV) + Needs review: <n_review> (mutate la screenshots/needs_review/) + - <screenshot>: <motiv> + - <screenshot>: <motiv> + Erori: <n_err> + - <screenshot>: <eroare> +Regenerat data/jurnal.md (<n> rânduri). +``` + +## Reguli + +- **Cap concurrency la 5**. 
Niciodată mai mulți subagenți paraleli — chiar și pentru un batch mare. Procesezi în secvențe de batch-uri de 5. +- **Append serial obligatoriu**. `append_extraction` citește CSV-ul, computează `next_id` și scrie atomic; rulat în paralel ar duce la ID-uri duplicat sau pierderi. +- **Partial failure = continuă**. Un screenshot prost nu blochează restul batch-ului. +- **MD regen o singură dată** la final. +- **Path discipline pentru subagent neschimbată**: agentul scrie doar la `data/extractions/`. Tu, ca orchestrator, muți screenshot-uri. diff --git a/.claude/commands/m2d-log.md b/.claude/commands/m2d-log.md new file mode 100644 index 0000000..a6a0154 --- /dev/null +++ b/.claude/commands/m2d-log.md @@ -0,0 +1,99 @@ +--- +description: Adaugă manual un rând în jurnal.csv (source=manual sau manual_calibration). Pentru calibrare P4 sau forward paper. +argument-hint: "[--calibration] " +--- + +# /m2d-log — manual M2D trade entry + +Marius extrage manual TOATE câmpurile trade-ului. Folosit pentru calibration P4 (împreună cu `/backtest --calibration` pe același screenshot) sau ca log direct fără vision. + +## Workflow + +1. **Parse `$ARGUMENTS`** — detectează flag `--calibration` și ``. Dacă `` lipsește, întreabă user-ul. Calculează `basename = basename()` și `basename_no_ext = basename` minus ultima extensie. + +2. **Promptează user-ul în română**, pe rând, pentru fiecare câmp din schema `M2DExtraction` (vezi `scripts/vision_schema.py`). 
Ordinea + opțiuni valide: + + - `data` — `YYYY-MM-DD` + - `ora_utc` — `HH:MM` (conversie din RO local: EEST=UTC+3 vară, EET=UTC+2 iarnă; întreabă user-ul direct dacă nu e clar) + - `instrument` — `DIA` / `US30` / `other` + - `directie` — `Buy` / `Sell` + - `tf_mare` — `5min` / `15min` + - `tf_mic` — `1min` / `3min` + - `calitate` — `Clară` / `Mai mare ca impuls` / `Slabă` / `n/a` + - `entry`, `sl`, `tp0`, `tp1`, `tp2` — float-uri + - `risc_pct` — float (ex: `0.12` pentru 0.12%) + - `outcome_path` — `SL` / `TP0→SL` / `TP0→TP1` / `TP0→TP2` / `TP0→pending` / `pending` (UNICODE `→`) + - `max_reached` — `SL_first` / `TP0` / `TP1` / `TP2` + - `be_moved` — `true` / `false` + - `confidence` — default `high` (manual e by definition high) + - `note` — string opțional, default `""` + + `screenshot_file` se setează automat la `basename`; `ambiguities` se setează automat la `[]`. Dacă user-ul dă valoare invalidă, repetă întrebarea. + +3. **Construiește JSON-ul** complet, valid contra `M2DExtraction`. + +4. **Scrie JSON-ul** la `data/extractions/<basename_no_ext>.manual.json` — pretty-print indent 2, UTF-8, newline final. Sufixul `.manual` previne coliziunea cu output-ul vision (`<basename_no_ext>.json`). + +5. **Determină source**: `manual_calibration` dacă `--calibration` e prezent, altfel `manual`. + +6. **Append la CSV**: + + ```bash + python -c "from pathlib import Path; from scripts.append_row import append_extraction; import json; r = append_extraction(Path('data/extractions/<basename_no_ext>.manual.json'), source='<source>'); print(json.dumps(r, default=str))" + ``` + + Parsezi răspunsul JSON. + +7. **Dacă `status == "ok"`**: + + ```bash + python -m scripts.regenerate_md + ``` + + Apoi afișezi: + + ``` + ✅ Trade adăugat la jurnal. ID: <id>. Set: <set>. P/L Marius: <pl_marius>. outcome_path: <outcome_path>. + ``` + +8. **Dacă `status == "rejected"`**: + + ``` + ❌ Trade respins: <reason> + ``` + + NU regenera MD. 
Dacă `reason` conține "duplicate": + - pentru `--calibration`: spui user-ului că există deja rând `manual_calibration` pentru acest screenshot; nu poți avea două leg-uri manual de calibrare pe același screenshot. + - pentru `source=manual` simplu: user-ul decide dacă suprascrie (atunci șterge manual rândul din `data/jurnal.csv` și re-rulează). + +## Reguli + +- NU edita CSV direct — folosește `append_extraction`. +- NU regenera MD dacă append-ul a fost respins. + +## Output skeleton JSON + +```json +{ + "screenshot_file": "2026-05-13-dia-1645.png", + "data": "2026-05-13", + "ora_utc": "14:45", + "instrument": "DIA", + "directie": "Buy", + "tf_mare": "5min", + "tf_mic": "1min", + "calitate": "Clară", + "entry": 497.42, + "sl": 496.80, + "tp0": 497.67, + "tp1": 497.79, + "tp2": 498.04, + "risc_pct": 0.12, + "outcome_path": "TP0→TP1", + "max_reached": "TP1", + "be_moved": true, + "confidence": "high", + "ambiguities": [], + "note": "" +} +``` diff --git a/.claude/commands/stats.md b/.claude/commands/stats.md new file mode 100644 index 0000000..470e01a --- /dev/null +++ b/.claude/commands/stats.md @@ -0,0 +1,42 @@ +--- +description: Show backtest statistics for data/jurnal.csv (overall, per-Set, per-calitate, per-instrument with Wilson + bootstrap CIs). --calibration shows P4 mismatch report. +argument-hint: "[--calibration] [--seed N]" +--- + +# /stats — backtest statistics + +Rulează `scripts/stats.py` și afișează raportul. + +## Arguments + +- `--calibration` (flag) — afișează raportul P4 (mismatch field-by-field pe perechi `manual_calibration` ↔ `vision_calibration` join-uite pe `screenshot_file`). +- `--seed N` (opțional) — seed pentru bootstrap RNG (default fără seed → output ne-determinist între run-uri). Folosește când vrei reproducibilitate. + +Default (fără flag-uri): backtest stats — overall + per-Set + per-calitate + per-instrument WR, expectancy, Wilson 95% CI pe WR, bootstrap 95% CI pe expectancy. + +## Workflow + +1. 
Construiește comanda: + + ```bash + python scripts/stats.py [--calibration] [--seed N] + ``` + + `--csv data/jurnal.csv` e default-ul scriptului — nu îl pasezi. + +2. Rulează prin Bash tool. Output-ul vine pe stdout în UTF-8. + +3. Afișează output-ul **as-is** către user. Nu reformata, nu re-rezuma, nu interpreta. Scriptul are deja format ales (tabel + secțiuni text). + +4. **Interpretare** scurtă (max 3 propoziții) DACĂ user-ul cere explicit sau dacă observi ceva ce merită menționat: + - În modul backtest: Set-uri cu N ≥ 40 și Wilson lower bound > 50% → candidat pentru GO LIVE (vezi `STOPPING_RULE.md`). + - În modul `--calibration`: dacă există ≥10 perechi și mismatch rate > 10% pe câmpuri core (`entry/sl/tp0/1/2/outcome_path/max_reached/directie`) → P4 FAIL, vision agent are nevoie de fix (`.claude/agents/m2d-extractor.md`). + +5. NU edita CSV. NU regenera MD (citire pură). + +## Reguli + +- Read-only. Această comandă nu scrie nimic. +- Output-ul scriptului e ground truth — nu inventezi numere. +- `calitate` e descriptor biased (post-outcome) — vezi `STOPPING_RULE.md` §3 — raportul îl afișează informational only. NU sugerezi user-ului să folosească `calitate` ca filtru pentru GO LIVE. +- Pentru calibration P4: minimum 10 perechi pentru ca verdictul să aibă sens. Sub 10 perechi → raportezi "insuficient pentru P4 — continuă să acumulezi calibrare". diff --git a/scripts/stats.py b/scripts/stats.py index 944cef3..24e7a6c 100644 --- a/scripts/stats.py +++ b/scripts/stats.py @@ -1,21 +1,20 @@ """Backtest statistics for ``data/jurnal.csv``. -Outputs: -- Overall + per-Set + per-calitate + per-instrument WR, expectancy. -- Wilson 95% CI for WR (closed form). -- Bootstrap percentile 95% CI for expectancy (deterministic via ``seed``). -- ``--calibration`` mode: joins ``manual_calibration`` rows with their - ``vision_calibration`` counterparts on ``screenshot_file`` and reports - field-by-field mismatch rates for the P4 gate (see ``STOPPING_RULE.md``). 
+Public API: + - ``compute_stats(csv_path, overlay) -> dict`` + - ``render_stats(stats, overlay) -> str`` + - ``compute_calibration(csv_path) -> dict`` + - ``render_calibration(cal) -> str`` + - ``main()`` — CLI entry point. -A "win" is any trade with ``pl_marius > 0``. Pending trades -(``pl_marius`` blank, i.e. ``outcome_path in {pending, TP0->pending}``) are -excluded from both WR and expectancy: there is no realised outcome yet. +A "win" is a closed trade with ``pl_overlay > 0`` (where ``pl_overlay`` is +either ``pl_marius`` or ``pl_theoretical``). Pending trades — ``pl_marius`` +blank, i.e. ``outcome_path in {pending, TP0->pending}`` — are excluded from +both WR and expectancy: there is no realised outcome yet. -The ``calitate`` field is a known-biased descriptor (post-outcome -classification — see ``STOPPING_RULE.md`` §3). It is reported as -informational only and explicitly flagged as such; do NOT use it as a -filter for GO LIVE decisions. +The ``calitate`` field is a known-biased descriptor: it is classified +post-outcome (see ``STOPPING_RULE.md`` §3). The per-``calitate`` split is +reported with an explicit *descriptor only — biased post-outcome* caveat. 
""" from __future__ import annotations @@ -23,32 +22,42 @@ from __future__ import annotations import argparse import csv import math -import random import sys -from dataclasses import dataclass, field from pathlib import Path -from typing import Iterable +from typing import Any, Iterable + +import numpy as np + +from scripts.append_row import CSV_COLUMNS __all__ = [ - "CORE_CALIBRATION_FIELDS", "BACKTEST_SOURCES", "CALIBRATION_SOURCES", - "Trade", - "GroupStats", - "load_trades", + "CORE_CALIBRATION_FIELDS", + "NUMERIC_CALIBRATION_FIELDS", + "STOPPING_RULE_N", "wilson_ci", - "bootstrap_ci", - "win_rate", - "expectancy", - "group_by", - "compute_group_stats", - "calibration_mismatch", - "format_report", + "bootstrap_expectancy_ci", + "compute_stats", + "render_stats", + "compute_calibration", + "render_calibration", "main", ] -# Fields compared in the calibration mismatch gate (STOPPING_RULE.md §P4). +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + + +BACKTEST_SOURCES: frozenset[str] = frozenset({"vision", "manual"}) +CALIBRATION_SOURCES: frozenset[str] = frozenset( + {"manual_calibration", "vision_calibration"} +) + + +# Calibration P4 gate (STOPPING_RULE.md §P4) — explicitly reported per field. CORE_CALIBRATION_FIELDS: tuple[str, ...] = ( "entry", "sl", @@ -58,315 +67,205 @@ CORE_CALIBRATION_FIELDS: tuple[str, ...] = ( "outcome_path", "max_reached", "directie", + "instrument", ) -BACKTEST_SOURCES: frozenset[str] = frozenset({"vision", "manual"}) -CALIBRATION_SOURCES: frozenset[str] = frozenset( - {"manual_calibration", "vision_calibration"} +NUMERIC_CALIBRATION_FIELDS: frozenset[str] = frozenset( + {"entry", "sl", "tp0", "tp1", "tp2"} ) +# STOPPING_RULE.md §"GO LIVE" gate: N >= 40 per Set. 
+STOPPING_RULE_N: int = 40 + + # --------------------------------------------------------------------------- -# Loading / typed access +# Loading # --------------------------------------------------------------------------- -@dataclass(frozen=True) -class Trade: - """One realised (or pending) trade row, typed.""" - - id: int - screenshot_file: str - source: str - data: str - zi: str - ora_ro: str - instrument: str - directie: str - calitate: str - set: str - outcome_path: str - max_reached: str - be_moved: bool - pl_marius: float | None - pl_theoretical: float - raw: dict[str, str] = field(default_factory=dict) - - @property - def is_pending(self) -> bool: - return self.pl_marius is None - - @property - def is_win(self) -> bool: - return self.pl_marius is not None and self.pl_marius > 0 - - def _parse_optional_float(value: str) -> float | None: s = (value or "").strip() if s == "": return None - return float(s) + try: + return float(s) + except ValueError: + return None -def _parse_bool(value: str) -> bool: - return (value or "").strip().lower() in {"true", "1", "yes", "da"} - - -def _row_to_trade(row: dict[str, str]) -> Trade: - return Trade( - id=int(row.get("id") or 0), - screenshot_file=row.get("screenshot_file", ""), - source=row.get("source", ""), - data=row.get("data", ""), - zi=row.get("zi", ""), - ora_ro=row.get("ora_ro", ""), - instrument=row.get("instrument", ""), - directie=row.get("directie", ""), - calitate=row.get("calitate", ""), - set=row.get("set", ""), - outcome_path=row.get("outcome_path", ""), - max_reached=row.get("max_reached", ""), - be_moved=_parse_bool(row.get("be_moved", "")), - pl_marius=_parse_optional_float(row.get("pl_marius", "")), - pl_theoretical=float(row.get("pl_theoretical") or 0.0), - raw=dict(row), - ) - - -def load_trades(csv_path: Path | str) -> list[Trade]: - """Load all rows of ``csv_path`` as :class:`Trade` objects. - - Returns ``[]`` if the file does not exist or is empty. 
- """ +def _load_rows(csv_path: Path | str) -> list[dict[str, str]]: p = Path(csv_path) if not p.exists() or p.stat().st_size == 0: return [] with p.open("r", encoding="utf-8", newline="") as fh: - reader = csv.DictReader(fh) - return [_row_to_trade(r) for r in reader] + return list(csv.DictReader(fh)) # --------------------------------------------------------------------------- -# Statistics primitives +# CI primitives # --------------------------------------------------------------------------- def wilson_ci(wins: int, n: int, z: float = 1.96) -> tuple[float, float]: """Wilson score interval for a binomial proportion. - Returns ``(lo, hi)`` as proportions in [0, 1]. For ``n == 0`` returns - ``(0.0, 0.0)``. ``z = 1.96`` corresponds to a 95% CI. + Returns ``(lo, hi)`` clamped to ``[0.0, 1.0]``. For ``n == 0`` returns + ``(0.0, 0.0)``. ``z = 1.96`` ≈ 95% confidence. """ if n <= 0: return (0.0, 0.0) if wins < 0 or wins > n: raise ValueError(f"wins={wins} out of range for n={n}") - p_hat = wins / n + p = wins / n denom = 1.0 + (z * z) / n - center = p_hat + (z * z) / (2.0 * n) - half = z * math.sqrt((p_hat * (1.0 - p_hat) + (z * z) / (4.0 * n)) / n) - lo = (center - half) / denom - hi = (center + half) / denom - return (max(0.0, lo), min(1.0, hi)) + center = (p + (z * z) / (2.0 * n)) / denom + spread = z * math.sqrt(p * (1.0 - p) / n + (z * z) / (4.0 * n * n)) / denom + return (max(0.0, center - spread), min(1.0, center + spread)) -def bootstrap_ci( - values: list[float], - *, - iterations: int = 2000, - alpha: float = 0.05, - seed: int | None = None, +def bootstrap_expectancy_ci( + values: list[float] | np.ndarray, + n_resamples: int = 5000, + seed: int = 42, ) -> tuple[float, float]: - """Percentile-method bootstrap CI for the mean of ``values``. + """Percentile-method bootstrap 95% CI for the mean of ``values``. - Deterministic when ``seed`` is provided. Returns ``(lo, hi)``. For - ``len(values) < 2`` returns ``(mean, mean)``. + Deterministic for a given ``seed``. 
Empty input → ``(0.0, 0.0)``. + Single value → ``(value, value)`` (no variance to resample). """ - if not values: + arr = np.asarray(list(values), dtype=float) + if arr.size == 0: return (0.0, 0.0) - n = len(values) - mean = sum(values) / n - if n < 2 or iterations <= 0: - return (mean, mean) - - rng = random.Random(seed) - means: list[float] = [] - for _ in range(iterations): - s = 0.0 - for _ in range(n): - s += values[rng.randrange(n)] - means.append(s / n) - means.sort() - lo_idx = int(math.floor((alpha / 2.0) * iterations)) - hi_idx = int(math.ceil((1.0 - alpha / 2.0) * iterations)) - 1 - lo_idx = max(0, min(iterations - 1, lo_idx)) - hi_idx = max(0, min(iterations - 1, hi_idx)) - return (means[lo_idx], means[hi_idx]) + if arr.size == 1: + v = float(arr[0]) + return (v, v) + rng = np.random.default_rng(seed) + boots = np.empty(n_resamples, dtype=float) + n = arr.size + for i in range(n_resamples): + idx = rng.integers(0, n, size=n) + boots[i] = float(arr[idx].mean()) + lo = float(np.percentile(boots, 2.5)) + hi = float(np.percentile(boots, 97.5)) + return (lo, hi) -def win_rate(trades: Iterable[Trade]) -> tuple[int, int, float]: - """Return ``(wins, n_resolved, wr)`` ignoring pending trades.""" - resolved = [t for t in trades if not t.is_pending] - wins = sum(1 for t in resolved if t.is_win) - n = len(resolved) +# --------------------------------------------------------------------------- +# compute_stats +# --------------------------------------------------------------------------- + + +def _group_stats( + overlay_values: list[float | None], + *, + include_ci: bool, + bootstrap_seed: int, +) -> dict[str, Any]: + closed = [v for v in overlay_values if v is not None] + n = len(closed) + wins = sum(1 for v in closed if v > 0) wr = (wins / n) if n else 0.0 - return wins, n, wr - - -def expectancy(trades: Iterable[Trade], overlay: str = "pl_marius") -> float: - """Mean P/L (in R) over non-pending trades, on the given overlay.""" - if overlay not in {"pl_marius", 
"pl_theoretical"}: - raise ValueError(f"unknown overlay {overlay!r}") - if overlay == "pl_marius": - vals = [t.pl_marius for t in trades if t.pl_marius is not None] - else: - vals = [t.pl_theoretical for t in trades if not t.is_pending] - if not vals: - return 0.0 - return sum(vals) / len(vals) - - -# --------------------------------------------------------------------------- -# Group stats -# --------------------------------------------------------------------------- - - -@dataclass(frozen=True) -class GroupStats: - key: str - n_total: int - n_resolved: int - wins: int - wr: float - wr_ci_lo: float - wr_ci_hi: float - exp_marius: float - exp_marius_ci_lo: float - exp_marius_ci_hi: float - exp_theoretical: float - exp_theoretical_ci_lo: float - exp_theoretical_ci_hi: float - - -def group_by(trades: Iterable[Trade], field_name: str) -> dict[str, list[Trade]]: - out: dict[str, list[Trade]] = {} - for t in trades: - key = getattr(t, field_name, "") or "(blank)" - out.setdefault(key, []).append(t) + out: dict[str, Any] = { + "n": n, + "wr": wr, + "expectancy": (sum(closed) / n) if n else 0.0, + } + if include_ci: + out["wr_ci_95"] = wilson_ci(wins, n) + out["expectancy_ci_95"] = bootstrap_expectancy_ci( + closed, seed=bootstrap_seed + ) return out -def compute_group_stats( - trades: list[Trade], - *, - label: str, - bootstrap_iterations: int = 2000, - seed: int | None = None, -) -> GroupStats: - wins, n_resolved, wr = win_rate(trades) - wr_lo, wr_hi = wilson_ci(wins, n_resolved) - - pl_m_vals = [t.pl_marius for t in trades if t.pl_marius is not None] - exp_m = (sum(pl_m_vals) / len(pl_m_vals)) if pl_m_vals else 0.0 - exp_m_lo, exp_m_hi = bootstrap_ci( - pl_m_vals, iterations=bootstrap_iterations, seed=seed - ) - - pl_t_vals = [t.pl_theoretical for t in trades if not t.is_pending] - exp_t = (sum(pl_t_vals) / len(pl_t_vals)) if pl_t_vals else 0.0 - exp_t_lo, exp_t_hi = bootstrap_ci( - pl_t_vals, - iterations=bootstrap_iterations, - seed=None if seed is None else seed + 
1, - ) - - return GroupStats( - key=label, - n_total=len(trades), - n_resolved=n_resolved, - wins=wins, - wr=wr, - wr_ci_lo=wr_lo, - wr_ci_hi=wr_hi, - exp_marius=exp_m, - exp_marius_ci_lo=exp_m_lo, - exp_marius_ci_hi=exp_m_hi, - exp_theoretical=exp_t, - exp_theoretical_ci_lo=exp_t_lo, - exp_theoretical_ci_hi=exp_t_hi, - ) +def _overlay_value(row: dict[str, str], overlay: str) -> float | None: + raw = row.get(overlay, "") + return _parse_optional_float(raw) -# --------------------------------------------------------------------------- -# Calibration mode -# --------------------------------------------------------------------------- +def compute_stats( + csv_path: Path | str = "data/jurnal.csv", + overlay: str = "pl_marius", +) -> dict[str, Any]: + """Compute aggregate WR + expectancy stats over the backtest rows. + Calibration rows (``manual_calibration`` / ``vision_calibration``) are + excluded; use :func:`compute_calibration` for the P4 mismatch report. -@dataclass(frozen=True) -class CalibrationReport: - pairs: int - field_mismatches: dict[str, int] - total_comparisons: int - - @property - def overall_mismatch_rate(self) -> float: - if self.total_comparisons == 0: - return 0.0 - total = sum(self.field_mismatches.values()) - return total / self.total_comparisons - - -def _normalise_for_compare(field_name: str, value: str) -> str: - s = (value or "").strip() - if field_name in {"entry", "sl", "tp0", "tp1", "tp2"}: - try: - return f"{float(s):.4f}" - except ValueError: - return s - return s - - -def calibration_mismatch( - trades: Iterable[Trade], - *, - fields: tuple[str, ...] = CORE_CALIBRATION_FIELDS, -) -> CalibrationReport: - """Pair ``manual_calibration`` and ``vision_calibration`` rows by - ``screenshot_file``, then count mismatches per ``fields``. - - Returns a :class:`CalibrationReport`. Unpaired calibration rows are - silently ignored — they cannot contribute to a comparison. 
+ ``overlay`` selects the P/L column: ``"pl_marius"`` (default — the real + overlay Marius trades) or ``"pl_theoretical"`` (1/3-1/3-1/3 hold-to-TP2). """ - manual: dict[str, Trade] = {} - vision: dict[str, Trade] = {} - for t in trades: - if t.source == "manual_calibration": - manual[t.screenshot_file] = t - elif t.source == "vision_calibration": - vision[t.screenshot_file] = t + if overlay not in {"pl_marius", "pl_theoretical"}: + raise ValueError(f"unknown overlay {overlay!r}") - paired_files = sorted(set(manual) & set(vision)) - field_mismatches: dict[str, int] = {f: 0 for f in fields} - for f in paired_files: - m = manual[f] - v = vision[f] - for fld in fields: - mv = _normalise_for_compare(fld, m.raw.get(fld, "")) - vv = _normalise_for_compare(fld, v.raw.get(fld, "")) - if mv != vv: - field_mismatches[fld] += 1 + rows = [r for r in _load_rows(csv_path) if r.get("source", "") in BACKTEST_SOURCES] - total_comparisons = len(paired_files) * len(fields) - return CalibrationReport( - pairs=len(paired_files), - field_mismatches=field_mismatches, - total_comparisons=total_comparisons, + if not rows: + return { + "n_total": 0, + "n_pending": 0, + "n_closed": 0, + "wr": 0.0, + "wr_ci_95": (0.0, 0.0), + "expectancy": 0.0, + "expectancy_ci_95": (0.0, 0.0), + "per_set": {}, + "per_calitate": {}, + "per_directie": {}, + } + + # Pending status is overlay-independent: a trade is pending iff + # pl_marius is blank (outcome_path in {pending, TP0->pending}). + # pl_theoretical is concrete even for pending rows, so it would otherwise + # let pending trades sneak into the closed-trades stats — we mask those + # out explicitly here. 
+ pending_mask = [_parse_optional_float(r.get("pl_marius", "")) is None for r in rows] + overlay_vals: list[float | None] = [] + for r, is_pending in zip(rows, pending_mask): + overlay_vals.append(None if is_pending else _overlay_value(r, overlay)) + n_total = len(rows) + n_pending = sum(1 for p in pending_mask if p) + n_closed = n_total - n_pending + + overall = _group_stats( + overlay_vals, include_ci=True, bootstrap_seed=42 ) + def _split(field: str, include_ci: bool) -> dict[str, dict[str, Any]]: + groups: dict[str, list[float | None]] = {} + for r, v in zip(rows, overlay_vals): + key = r.get(field, "") or "(blank)" + groups.setdefault(key, []).append(v) + out: dict[str, dict[str, Any]] = {} + for k in sorted(groups): + sub_seed = 42 + (abs(hash(("split", field, k))) % 1_000_000) + out[k] = _group_stats( + groups[k], include_ci=include_ci, bootstrap_seed=sub_seed + ) + return out + + return { + "n_total": n_total, + "n_pending": n_pending, + "n_closed": n_closed, + "wr": overall["wr"], + "wr_ci_95": overall["wr_ci_95"], + "expectancy": overall["expectancy"], + "expectancy_ci_95": overall["expectancy_ci_95"], + "per_set": _split("set", include_ci=True), + "per_calitate": _split("calitate", include_ci=True), + # per_directie skips CI per spec (no wr_ci_95 / expectancy_ci_95 keys). 
+ "per_directie": { + k: {"n": v["n"], "wr": v["wr"], "expectancy": v["expectancy"]} + for k, v in _split("directie", include_ci=False).items() + }, + } + # --------------------------------------------------------------------------- -# Reporting +# render_stats # --------------------------------------------------------------------------- @@ -375,110 +274,228 @@ def _fmt_pct(p: float) -> str: def _fmt_r(x: float) -> str: - return f"{x:+.3f}R" + return f"{x:+.2f} R" -def _fmt_stats_row(s: GroupStats) -> str: - return ( - f"{s.key:<14} N={s.n_total:>3} (resolved {s.n_resolved:>3}) " - f"WR={_fmt_pct(s.wr)} [{_fmt_pct(s.wr_ci_lo)}, {_fmt_pct(s.wr_ci_hi)}] " - f"E_marius={_fmt_r(s.exp_marius)} " - f"[{_fmt_r(s.exp_marius_ci_lo)}, {_fmt_r(s.exp_marius_ci_hi)}] " - f"E_theor={_fmt_r(s.exp_theoretical)}" - ) +def _set_sort_key(name: str) -> tuple[int, str]: + order = ["A1", "A2", "A3", "B", "C", "D", "Other"] + return (order.index(name), name) if name in order else (len(order), name) -def format_report( - trades: list[Trade], - *, - bootstrap_iterations: int = 2000, - seed: int | None = None, -) -> str: - """Render the main stats report. - - Only ``source in {vision, manual}`` rows are included in the WR / - expectancy computations; calibration rows are reported separately via - ``--calibration``. 
- """ - backtest = [t for t in trades if t.source in BACKTEST_SOURCES] +def render_stats(stats: dict[str, Any], overlay: str) -> str: lines: list[str] = [] - lines.append("=== M2D Backtest Stats ===") - lines.append(f"Backtest rows: {len(backtest)} (calibration excluded)") - lines.append("") - - if not backtest: - lines.append("(no backtest trades yet)") - return "\n".join(lines) - - overall = compute_group_stats( - backtest, - label="OVERALL", - bootstrap_iterations=bootstrap_iterations, - seed=seed, - ) - lines.append("-- Overall --") - lines.append(_fmt_stats_row(overall)) - lines.append("") - - def _emit_group(title: str, field_name: str, key_order: list[str] | None = None) -> None: - lines.append(f"-- By {title} --") - groups = group_by(backtest, field_name) - keys = key_order if key_order is not None else sorted(groups) - for k in keys: - if k not in groups: - continue - sub_seed = None if seed is None else seed + abs(hash(k)) % 10_000 - s = compute_group_stats( - groups[k], - label=k, - bootstrap_iterations=bootstrap_iterations, - seed=sub_seed, - ) - lines.append(_fmt_stats_row(s)) - lines.append("") - - _emit_group( - "Set", - "set", - key_order=["A1", "A2", "A3", "B", "C", "D", "Other"], - ) - _emit_group("Instrument", "instrument") + lines.append(f"=== Stats jurnal.csv (overlay: {overlay}) ===") lines.append( - "[!] By calitate — descriptor only (post-outcome, biased; do not use " - "as a GO LIVE filter — see STOPPING_RULE.md §3)." 
- ) - _emit_group( - "calitate", - "calitate", - key_order=["Clară", "Mai mare ca impuls", "Slabă", "n/a"], + f"Trade-uri totale: {stats['n_total']} | " + f"închise: {stats['n_closed']} | pending: {stats['n_pending']}" ) - return "\n".join(lines).rstrip() + "\n" - - -def format_calibration_report(trades: list[Trade]) -> str: - cal = calibration_mismatch(trades) - lines: list[str] = [] - lines.append("=== Calibration P4 gate ===") - lines.append(f"Paired screenshots (manual ∩ vision): {cal.pairs}") - if cal.pairs == 0: - lines.append("(no calibration pairs yet)") + if stats["n_total"] == 0: + lines.append("") + lines.append("(nu sunt trade-uri backtest în CSV)") return "\n".join(lines) + "\n" lines.append("") - lines.append(f"{'field':<14} mismatches / pairs rate") - for fld in CORE_CALIBRATION_FIELDS: - m = cal.field_mismatches.get(fld, 0) - rate = (m / cal.pairs) if cal.pairs else 0.0 - lines.append(f"{fld:<14} {m:>3} / {cal.pairs:<3} {_fmt_pct(rate)}") - lines.append("") + lo, hi = stats["wr_ci_95"] + e_lo, e_hi = stats["expectancy_ci_95"] + lines.append(f"GLOBAL (n={stats['n_closed']}):") lines.append( - f"Overall mismatch rate: {_fmt_pct(cal.overall_mismatch_rate)} " - f"({sum(cal.field_mismatches.values())} of {cal.total_comparisons} comparisons)" + f" WR: {_fmt_pct(stats['wr'])} " + f"[95% CI: {_fmt_pct(lo)}, {_fmt_pct(hi)}]" ) - threshold = 0.10 - verdict = "PASS" if cal.overall_mismatch_rate <= threshold else "FAIL" - lines.append(f"P4 gate (<= 10%): {verdict}") + lines.append( + f" Expectancy: {_fmt_r(stats['expectancy'])} " + f"[95% CI: {_fmt_r(e_lo)}, {_fmt_r(e_hi)}]" + ) + lines.append("") + + def _emit_split( + title: str, + data: dict[str, dict[str, Any]], + *, + sort_keys: list[str] | None = None, + include_ci: bool = True, + ) -> None: + lines.append(title) + keys = sort_keys if sort_keys is not None else sorted(data) + for k in keys: + if k not in data: + continue + d = data[k] + if include_ci and "wr_ci_95" in d: + clo, chi = d["wr_ci_95"] + 
lines.append( + f" {k:<14} n={d['n']:>3} " + f"WR {_fmt_pct(d['wr'])} " + f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] " + f"E {_fmt_r(d['expectancy'])}" + ) + else: + lines.append( + f" {k:<14} n={d['n']:>3} " + f"WR {_fmt_pct(d['wr'])} " + f"E {_fmt_r(d['expectancy'])}" + ) + lines.append("") + + _emit_split( + "PER SET:", + stats["per_set"], + sort_keys=sorted(stats["per_set"], key=_set_sort_key), + ) + + lines.append( + "PER CALITATE (⚠️ DESCRIPTOR ONLY — biased post-outcome, NU folosi ca filtru):" + ) + cal_order = ["Clară", "Mai mare ca impuls", "Slabă", "n/a"] + keys = [k for k in cal_order if k in stats["per_calitate"]] + [ + k for k in sorted(stats["per_calitate"]) if k not in cal_order + ] + for k in keys: + d = stats["per_calitate"][k] + clo, chi = d["wr_ci_95"] + lines.append( + f" {k:<20} n={d['n']:>3} " + f"WR {_fmt_pct(d['wr'])} " + f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] " + f"E {_fmt_r(d['expectancy'])}" + ) + lines.append("") + + _emit_split("PER DIRECȚIE:", stats["per_directie"], include_ci=False) + + # STOPPING_RULE gate check — flag every Set that hasn't crossed N>=40. 
+ lines.append(f"⚠️ STOPPING RULE check (vezi STOPPING_RULE.md, N>={STOPPING_RULE_N}):") + set_keys = sorted(stats["per_set"], key=_set_sort_key) + any_flagged = False + for k in set_keys: + n = stats["per_set"][k]["n"] + if n < STOPPING_RULE_N: + lines.append(f" {k}: N={n} < {STOPPING_RULE_N} → NEEDS MORE DATA") + any_flagged = True + if not any_flagged: + lines.append(f" toate Set-urile au N>={STOPPING_RULE_N} (eligibile pentru GO LIVE check).") + + return "\n".join(lines) + "\n" + + +# --------------------------------------------------------------------------- +# compute_calibration +# --------------------------------------------------------------------------- + + +def _calibration_match(field: str, m_val: str, v_val: str, tol: float = 0.01) -> bool: + if field in NUMERIC_CALIBRATION_FIELDS: + try: + return abs(float(m_val) - float(v_val)) <= tol + except ValueError: + return (m_val or "").strip() == (v_val or "").strip() + return (m_val or "").strip() == (v_val or "").strip() + + +def compute_calibration( + csv_path: Path | str = "data/jurnal.csv", +) -> dict[str, Any]: + """Pair calibration legs by ``screenshot_file`` and report per-field mismatch. + + Returns a dict ``{"n_pairs": int, "fields": {field: {match, mismatch, + match_rate, mismatch_examples}}}``. ``mismatch_examples`` holds up to 3 + strings ``": manual=X vs vision=Y"`` per field. + + Numeric fields (``entry/sl/tp0/tp1/tp2``) use a tolerance of 0.01; + everything else is exact-string equality after strip. 
+ """ + rows = _load_rows(csv_path) + manual: dict[str, dict[str, str]] = {} + vision: dict[str, dict[str, str]] = {} + for r in rows: + src = r.get("source", "") + if src == "manual_calibration": + manual[r.get("screenshot_file", "")] = r + elif src == "vision_calibration": + vision[r.get("screenshot_file", "")] = r + + paired_files = sorted(set(manual) & set(vision)) + fields_report: dict[str, dict[str, Any]] = { + f: { + "match": 0, + "mismatch": 0, + "match_rate": 0.0, + "mismatch_examples": [], + } + for f in CORE_CALIBRATION_FIELDS + } + + for f in paired_files: + m = manual[f] + v = vision[f] + for fld in CORE_CALIBRATION_FIELDS: + mv = m.get(fld, "") + vv = v.get(fld, "") + if _calibration_match(fld, mv, vv): + fields_report[fld]["match"] += 1 + else: + fields_report[fld]["mismatch"] += 1 + examples = fields_report[fld]["mismatch_examples"] + if len(examples) < 3: + examples.append(f"{f}: manual={mv!r} vs vision={vv!r}") + + for fld, data in fields_report.items(): + total = data["match"] + data["mismatch"] + data["match_rate"] = (data["match"] / total) if total else 0.0 + + return {"n_pairs": len(paired_files), "fields": fields_report} + + +def render_calibration(cal: dict[str, Any]) -> str: + lines: list[str] = [] + lines.append("=== Calibration P4 gate (vezi STOPPING_RULE.md §P4) ===") + lines.append(f"Perechi calibration: {cal['n_pairs']}") + if cal["n_pairs"] == 0: + lines.append("(nu există perechi manual_calibration ∩ vision_calibration)") + return "\n".join(lines) + "\n" + + lines.append("") + lines.append(f"{'field':<14} match mismatch rate") + total_mismatches = 0 + total_comparisons = 0 + for fld in CORE_CALIBRATION_FIELDS: + d = cal["fields"][fld] + n = d["match"] + d["mismatch"] + total_mismatches += d["mismatch"] + total_comparisons += n + lines.append( + f"{fld:<14} {d['match']:>5} {d['mismatch']:>8} " + f"{_fmt_pct(d['match_rate'])}" + ) + + lines.append("") + overall_match_rate = ( + (total_comparisons - total_mismatches) / total_comparisons 
+ if total_comparisons + else 0.0 + ) + overall_mismatch_rate = 1.0 - overall_match_rate + verdict = "PASS" if overall_mismatch_rate <= 0.10 else "FAIL" + lines.append( + f"Overall mismatch rate: {_fmt_pct(overall_mismatch_rate)} " + f"({total_mismatches}/{total_comparisons}) → P4 gate: {verdict}" + ) + + has_examples = any( + cal["fields"][f]["mismatch_examples"] for f in CORE_CALIBRATION_FIELDS + ) + if has_examples: + lines.append("") + lines.append("Mismatch examples (max 3 per field):") + for fld in CORE_CALIBRATION_FIELDS: + ex = cal["fields"][fld]["mismatch_examples"] + if not ex: + continue + lines.append(f" [{fld}]") + for e in ex: + lines.append(f" - {e}") + return "\n".join(lines) + "\n" @@ -498,43 +515,37 @@ def main(argv: list[str] | None = None) -> int: default=Path("data/jurnal.csv"), help="Path to the jurnal CSV (default: data/jurnal.csv).", ) + parser.add_argument( + "--overlay", + choices=("pl_marius", "pl_theoretical"), + default="pl_marius", + help="Which P/L overlay to use (default: pl_marius).", + ) parser.add_argument( "--calibration", action="store_true", help="Show P4 calibration mismatch report instead of backtest stats.", ) - parser.add_argument( - "--bootstrap-iterations", - type=int, - default=2000, - help="Bootstrap iterations for expectancy CI (default: 2000).", - ) - parser.add_argument( - "--seed", - type=int, - default=None, - help="Seed for the bootstrap RNG (set for deterministic output).", - ) args = parser.parse_args(argv) - trades = load_trades(args.csv) - if args.calibration: - out = format_calibration_report(trades) - else: - out = format_report( - trades, - bootstrap_iterations=args.bootstrap_iterations, - seed=args.seed, - ) - # Force UTF-8 on stdout: the report contains diacritics ("Clară", "Slabă") - # and a console codepage like cp1252 would crash on those. 
try: sys.stdout.reconfigure(encoding="utf-8") # type: ignore[attr-defined] except (AttributeError, OSError): pass - sys.stdout.write(out) + + if args.calibration: + cal = compute_calibration(args.csv) + sys.stdout.write(render_calibration(cal)) + else: + stats = compute_stats(args.csv, overlay=args.overlay) + sys.stdout.write(render_stats(stats, args.overlay)) return 0 if __name__ == "__main__": raise SystemExit(main()) + + +# Ensure the canonical CSV schema is importable from one place — fail fast if +# someone removes append_row.CSV_COLUMNS that this module depends on. +assert CSV_COLUMNS is not None diff --git a/tests/test_stats.py b/tests/test_stats.py index 0de1d07..bd12a41 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -1,4 +1,5 @@ -"""Tests for scripts/stats.py.""" +"""CSV-fixture tests for scripts.stats — compute_stats, render_stats, +compute_calibration, render_calibration, main().""" from __future__ import annotations @@ -12,24 +13,17 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from scripts.append_row import CSV_COLUMNS # noqa: E402 from scripts.stats import ( # noqa: E402 - BACKTEST_SOURCES, CORE_CALIBRATION_FIELDS, - bootstrap_ci, - calibration_mismatch, - compute_group_stats, - expectancy, - format_calibration_report, - format_report, - group_by, - load_trades, + compute_calibration, + compute_stats, main, - win_rate, - wilson_ci, + render_calibration, + render_stats, ) # --------------------------------------------------------------------------- -# Synthetic CSV fixture: 30 trades +# Fixture row builder # --------------------------------------------------------------------------- @@ -78,55 +72,61 @@ def _write_csv(path: Path, rows: list[dict[str, str]]) -> None: w.writerow({k: r.get(k, "") for k in CSV_COLUMNS}) -def _synthetic_30(tmp_path: Path) -> Path: - """30 vision-source trades engineered for known stats. +# Outcome templates (P/L values) — match scripts.pl_calc tables. 
+_SL = {"outcome_path": "SL", "max_reached": "SL_first", "be_moved": "False", + "pl_marius": "-1.0000", "pl_theoretical": "-1.0000"} +_TP0_SL_BE = {"outcome_path": "TP0→SL", "max_reached": "TP0", "be_moved": "True", + "pl_marius": "0.2000", "pl_theoretical": "0.1330"} +_TP0_TP1 = {"outcome_path": "TP0→TP1", "max_reached": "TP1", "be_moved": "True", + "pl_marius": "0.5000", "pl_theoretical": "0.3330"} +_TP0_TP2 = {"outcome_path": "TP0→TP2", "max_reached": "TP2", "be_moved": "True", + "pl_marius": "0.5000", "pl_theoretical": "0.6670"} +_PENDING = {"outcome_path": "pending", "max_reached": "TP0", "be_moved": "False", + "pl_marius": "", "pl_theoretical": "0.1330"} - Layout (by Set): - - A1: 10 trades — 6 wins TP0->TP1 (+0.5), 4 losses SL (-1.0) → WR 60% - - A2: 10 trades — 7 wins TP0->TP2 (+0.5), 3 losses SL (-1.0) → WR 70% - - A3: 10 trades — 4 wins TP0->TP1 (+0.5), 6 losses SL (-1.0) → WR 40% - Overall: 17 wins / 30, WR ≈ 56.67%. +def _synthetic_csv(tmp_path: Path) -> Path: + """30-trade backtest fixture. + + Set distribution: + A1: 8 rows (all closed; 3 SL, 2 TP0→SL, 2 TP0→TP1, 1 TP0→TP2) + A2: 10 rows (all closed; 4 SL, 3 TP0→SL, 2 TP0→TP1, 1 TP0→TP2) + B : 7 rows (2 pending, 5 closed; 2 SL, 2 TP0→TP1, 1 TP0→TP2) + D : 5 rows (3 pending, 2 closed; 1 SL, 1 TP0→TP1) + + Totals: n_total=30, n_pending=5, n_closed=25. + + Wins by pl_marius (>0): all TP0→SL_BE + TP0→TP1 + TP0→TP2 + A1: 2 + 2 + 1 = 5 wins / 8 + A2: 3 + 2 + 1 = 6 wins / 10 + B : 0 + 2 + 1 = 3 wins / 5 + D : 0 + 1 + 0 = 1 win / 2 + Total wins = 15 / 25 = 60.0%. + + Calitate distribution: half "Clară", half "Slabă" (alternating). + Directie distribution: 2/3 Buy, 1/3 Sell. 
""" rows: list[dict[str, str]] = [] rid = 0 - def add(set_label: str, n_win: int, n_loss: int, calitate: str = "Clară") -> None: + def add(set_label: str, outcomes: list[dict[str, str]]) -> None: nonlocal rid - for _ in range(n_win): + for i, outcome in enumerate(outcomes): rid += 1 - rows.append( - _base_row( - id=rid, - screenshot_file=f"win-{rid}.png", - set=set_label, - calitate=calitate, - outcome_path="TP0→TP1", - max_reached="TP1", - be_moved="True", - pl_marius="0.5000", - pl_theoretical="0.3330", - ) - ) - for _ in range(n_loss): - rid += 1 - rows.append( - _base_row( - id=rid, - screenshot_file=f"loss-{rid}.png", - set=set_label, - calitate=calitate, - outcome_path="SL", - max_reached="SL_first", - be_moved="False", - pl_marius="-1.0000", - pl_theoretical="-1.0000", - ) + row = _base_row( + id=rid, + screenshot_file=f"{set_label.lower()}-{rid}.png", + set=set_label, + calitate="Clară" if rid % 2 == 0 else "Slabă", + directie="Buy" if rid % 3 != 0 else "Sell", ) + row.update({k: str(v) for k, v in outcome.items()}) + rows.append(row) - add("A1", 6, 4) - add("A2", 7, 3) - add("A3", 4, 6) + add("A1", [_SL] * 3 + [_TP0_SL_BE] * 2 + [_TP0_TP1] * 2 + [_TP0_TP2] * 1) + add("A2", [_SL] * 4 + [_TP0_SL_BE] * 3 + [_TP0_TP1] * 2 + [_TP0_TP2] * 1) + add("B", [_PENDING] * 2 + [_SL] * 2 + [_TP0_TP1] * 2 + [_TP0_TP2] * 1) + add("D", [_PENDING] * 3 + [_SL] * 1 + [_TP0_TP1] * 1) path = tmp_path / "jurnal.csv" _write_csv(path, rows) @@ -134,336 +134,314 @@ def _synthetic_30(tmp_path: Path) -> Path: # --------------------------------------------------------------------------- -# Wilson CI — reference values +# compute_stats — core # --------------------------------------------------------------------------- -class TestWilsonCI: - def test_n_zero(self) -> None: - assert wilson_ci(0, 0) == (0.0, 0.0) +class TestComputeStats: + def test_compute_stats_n_pending(self, tmp_path: Path) -> None: + path = _synthetic_csv(tmp_path) + s = compute_stats(path) + assert s["n_total"] == 30 
+ assert s["n_pending"] == 5 + assert s["n_closed"] == 25 - def test_50pct_at_n40(self) -> None: - lo, hi = wilson_ci(20, 40) - assert lo == pytest.approx(0.3519927879709976, abs=1e-9) - assert hi == pytest.approx(0.6480072120290024, abs=1e-9) + def test_compute_stats_wr_correct(self, tmp_path: Path) -> None: + """Manual win count: 15 / 25 = 60.0%.""" + path = _synthetic_csv(tmp_path) + s = compute_stats(path) + assert s["wr"] == pytest.approx(15 / 25) + lo, hi = s["wr_ci_95"] + assert 0.0 <= lo <= s["wr"] <= hi <= 1.0 - def test_55pct_at_n40(self) -> None: - lo, hi = wilson_ci(22, 40) - assert lo == pytest.approx(0.3982882988844078, abs=1e-9) - assert hi == pytest.approx(0.6929492471905531, abs=1e-9) + def test_compute_stats_per_set(self, tmp_path: Path) -> None: + path = _synthetic_csv(tmp_path) + s = compute_stats(path) + a2 = s["per_set"]["A2"] + assert a2["n"] == 10 # 10 closed A2 trades + # A2 wins (pl_marius > 0): 3 BE + 2 TP1 + 1 TP2 = 6 / 10 + assert a2["wr"] == pytest.approx(0.60) - def test_55pct_at_n100(self) -> None: - # Larger N tightens the CI; lower bound rises above 45%. - lo, hi = wilson_ci(55, 100) - assert lo == pytest.approx(0.4524442703164345, abs=1e-9) - assert hi == pytest.approx(0.6438562489359655, abs=1e-9) - assert lo > 0.45 # STOPPING_RULE GO-LIVE gate + def test_per_set_b_pending_excluded(self, tmp_path: Path) -> None: + """Set B has 7 total rows (2 pending + 5 closed). 
n must be 5.""" + path = _synthetic_csv(tmp_path) + s = compute_stats(path) + assert s["per_set"]["B"]["n"] == 5 + # B wins: 0 BE + 2 TP1 + 1 TP2 = 3 / 5 + assert s["per_set"]["B"]["wr"] == pytest.approx(0.60) - def test_zero_wins(self) -> None: - lo, hi = wilson_ci(0, 10) - assert lo == pytest.approx(0.0, abs=1e-12) - assert hi == pytest.approx(0.2775401687666165, abs=1e-9) + def test_per_directie_no_ci_keys(self, tmp_path: Path) -> None: + """per_directie omits CI fields per spec (only n / wr / expectancy).""" + path = _synthetic_csv(tmp_path) + s = compute_stats(path) + for k, d in s["per_directie"].items(): + assert set(d.keys()) == {"n", "wr", "expectancy"}, k - def test_all_wins(self) -> None: - lo, hi = wilson_ci(10, 10) - assert lo == pytest.approx(0.7224598312333834, abs=1e-9) - assert hi == pytest.approx(1.0, abs=1e-12) + def test_overlay_theoretical_vs_marius(self, tmp_path: Path) -> None: + path = _synthetic_csv(tmp_path) + s_m = compute_stats(path, overlay="pl_marius") + s_t = compute_stats(path, overlay="pl_theoretical") + # Same N, but different expectancy. 
+ assert s_m["n_closed"] == s_t["n_closed"] + assert s_m["expectancy"] != s_t["expectancy"] - def test_wins_out_of_range(self) -> None: + def test_unknown_overlay_raises(self, tmp_path: Path) -> None: + path = _synthetic_csv(tmp_path) with pytest.raises(ValueError): - wilson_ci(11, 10) - with pytest.raises(ValueError): - wilson_ci(-1, 10) + compute_stats(path, overlay="pl_imaginary") + def test_empty_csv_no_crash(self, tmp_path: Path) -> None: + path = tmp_path / "empty.csv" + _write_csv(path, []) + s = compute_stats(path) + assert s["n_total"] == 0 + assert s["n_closed"] == 0 + assert s["per_set"] == {} + assert s["wr"] == 0.0 + assert s["wr_ci_95"] == (0.0, 0.0) -# --------------------------------------------------------------------------- -# Bootstrap CI — determinism + sanity -# --------------------------------------------------------------------------- + def test_missing_csv_no_crash(self, tmp_path: Path) -> None: + # Nonexistent path: treat as empty, do not raise. + s = compute_stats(tmp_path / "ghost.csv") + assert s["n_total"] == 0 - -class TestBootstrapCI: - def test_deterministic_with_seed(self) -> None: - vals = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] - lo1, hi1 = bootstrap_ci(vals, iterations=500, seed=42) - lo2, hi2 = bootstrap_ci(vals, iterations=500, seed=42) - assert (lo1, hi1) == (lo2, hi2) - - def test_different_seed_different_result(self) -> None: - vals = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] - r1 = bootstrap_ci(vals, iterations=500, seed=1) - r2 = bootstrap_ci(vals, iterations=500, seed=2) - assert r1 != r2 - - def test_brackets_the_mean(self) -> None: - vals = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] * 5 - mean = sum(vals) / len(vals) - lo, hi = bootstrap_ci(vals, iterations=1000, seed=7) - assert lo <= mean <= hi - - def test_empty_input(self) -> None: - assert bootstrap_ci([], iterations=100, seed=0) == (0.0, 0.0) - - def test_single_value(self) -> None: - lo, hi = bootstrap_ci([0.5], 
iterations=100, seed=0) - # No variance with n=1: short-circuited to (mean, mean). - assert lo == pytest.approx(0.5) - assert hi == pytest.approx(0.5) - - -# --------------------------------------------------------------------------- -# Loading + group stats on the 30-trade fixture -# --------------------------------------------------------------------------- - - -class TestSyntheticFixture: - def test_load_30(self, tmp_path: Path) -> None: - path = _synthetic_30(tmp_path) - trades = load_trades(path) - assert len(trades) == 30 - assert all(t.source == "vision" for t in trades) - - def test_overall_wr(self, tmp_path: Path) -> None: - trades = load_trades(_synthetic_30(tmp_path)) - wins, n, wr = win_rate(trades) - assert wins == 17 - assert n == 30 - assert wr == pytest.approx(17 / 30) - - def test_overall_expectancy(self, tmp_path: Path) -> None: - trades = load_trades(_synthetic_30(tmp_path)) - # 17 wins * 0.5 + 13 losses * -1.0 = 8.5 - 13.0 = -4.5 → mean = -0.15 - assert expectancy(trades) == pytest.approx(-0.15, abs=1e-9) - - def test_per_set_wr(self, tmp_path: Path) -> None: - trades = load_trades(_synthetic_30(tmp_path)) - by_set = group_by(trades, "set") - wr_a1 = win_rate(by_set["A1"])[2] - wr_a2 = win_rate(by_set["A2"])[2] - wr_a3 = win_rate(by_set["A3"])[2] - assert wr_a1 == pytest.approx(0.60) - assert wr_a2 == pytest.approx(0.70) - assert wr_a3 == pytest.approx(0.40) - - def test_group_stats_a2(self, tmp_path: Path) -> None: - trades = load_trades(_synthetic_30(tmp_path)) - a2 = [t for t in trades if t.set == "A2"] - s = compute_group_stats( - a2, label="A2", bootstrap_iterations=500, seed=11 - ) - assert s.n_total == 10 - assert s.n_resolved == 10 - assert s.wins == 7 - assert s.wr == pytest.approx(0.70) - # Wilson 7/10 - assert s.wr_ci_lo == pytest.approx(0.3967732199795652, abs=1e-9) - assert s.wr_ci_hi == pytest.approx(0.892210712513788, abs=1e-9) - # Expectancy A2 = 7*0.5 + 3*(-1.0) = 0.5 → mean = 0.05 - assert s.exp_marius == pytest.approx(0.05, 
abs=1e-9) - assert s.exp_marius_ci_lo <= s.exp_marius <= s.exp_marius_ci_hi - - -# --------------------------------------------------------------------------- -# Pending-trade handling -# --------------------------------------------------------------------------- - - -class TestPendingHandling: - def test_pending_excluded_from_wr(self, tmp_path: Path) -> None: + def test_calibration_rows_excluded(self, tmp_path: Path) -> None: rows = [ - _base_row( - id=1, screenshot_file="a.png", - outcome_path="TP0→TP1", max_reached="TP1", - be_moved="True", pl_marius="0.5000", pl_theoretical="0.3330", - ), - _base_row( - id=2, screenshot_file="b.png", - outcome_path="pending", max_reached="TP0", - be_moved="False", pl_marius="", pl_theoretical="0.1330", - ), - _base_row( - id=3, screenshot_file="c.png", - outcome_path="SL", max_reached="SL_first", - be_moved="False", pl_marius="-1.0000", pl_theoretical="-1.0000", - ), + _base_row(id=1, source="vision", screenshot_file="v.png"), + _base_row(id=2, source="manual_calibration", screenshot_file="c.png"), + _base_row(id=3, source="vision_calibration", screenshot_file="c.png"), ] - p = tmp_path / "j.csv" - _write_csv(p, rows) - trades = load_trades(p) - - wins, n, wr = win_rate(trades) - assert wins == 1 - assert n == 2 # pending excluded - assert wr == pytest.approx(0.5) - # Expectancy on pl_marius averages only resolved rows: (0.5 + -1.0) / 2 = -0.25 - assert expectancy(trades, "pl_marius") == pytest.approx(-0.25) + path = tmp_path / "j.csv" + _write_csv(path, rows) + s = compute_stats(path) + assert s["n_total"] == 1 # calibration rows filtered out # --------------------------------------------------------------------------- -# Source filtering: calibration rows excluded from main report +# render_stats # --------------------------------------------------------------------------- -class TestSourceFiltering: - def test_calibration_rows_excluded_from_backtest_stats( - self, tmp_path: Path - ) -> None: - rows = [ - _base_row(id=1, 
source="vision", screenshot_file="v.png", - pl_marius="0.5000"), - _base_row(id=2, source="manual", screenshot_file="m.png", - pl_marius="0.5000"), - _base_row(id=3, source="manual_calibration", screenshot_file="c.png", - pl_marius="-1.0000"), - _base_row(id=4, source="vision_calibration", screenshot_file="c.png", - pl_marius="-1.0000"), - ] - p = tmp_path / "j.csv" - _write_csv(p, rows) - trades = load_trades(p) - backtest = [t for t in trades if t.source in BACKTEST_SOURCES] - assert len(backtest) == 2 - wins, n, wr = win_rate(backtest) - assert (wins, n) == (2, 2) - assert wr == pytest.approx(1.0) +class TestRenderStats: + def test_render_stats_no_crash(self, tmp_path: Path) -> None: + path = _synthetic_csv(tmp_path) + s = compute_stats(path) + out = render_stats(s, "pl_marius") + assert isinstance(out, str) + assert out # non-empty + assert "STOPPING RULE" in out + + def test_render_stats_contains_sections(self, tmp_path: Path) -> None: + path = _synthetic_csv(tmp_path) + out = render_stats(compute_stats(path), "pl_marius") + for marker in ( + "Stats jurnal.csv", + "Trade-uri totale", + "GLOBAL", + "PER SET:", + "PER CALITATE", + "PER DIRECȚIE", + "DESCRIPTOR ONLY", + ): + assert marker in out, f"missing section: {marker!r}" + + def test_render_stats_flags_under_threshold(self, tmp_path: Path) -> None: + """All Sets in synthetic fixture have N<40 → all should be flagged.""" + path = _synthetic_csv(tmp_path) + out = render_stats(compute_stats(path), "pl_marius") + for k in ("A1", "A2", "B", "D"): + assert f"{k}: N=" in out + assert "NEEDS MORE DATA" in out + + def test_render_stats_empty(self, tmp_path: Path) -> None: + path = tmp_path / "empty.csv" + _write_csv(path, []) + out = render_stats(compute_stats(path), "pl_marius") + assert "Trade-uri totale: 0" in out + # No crash, no per-Set table for an empty dataset. 
+ assert "NEEDS MORE DATA" not in out # --------------------------------------------------------------------------- -# Calibration mode: pairing + mismatch +# compute_calibration # --------------------------------------------------------------------------- -class TestCalibration: - def test_pairs_and_zero_mismatch(self, tmp_path: Path) -> None: - m = _base_row( - id=1, source="manual_calibration", screenshot_file="cal-1.png" - ) - v = _base_row( - id=2, source="vision_calibration", screenshot_file="cal-1.png" - ) - p = tmp_path / "j.csv" - _write_csv(p, [m, v]) - trades = load_trades(p) - rep = calibration_mismatch(trades) - assert rep.pairs == 1 - assert sum(rep.field_mismatches.values()) == 0 - assert rep.overall_mismatch_rate == 0.0 - - def test_one_field_mismatch(self, tmp_path: Path) -> None: - m = _base_row( - id=1, source="manual_calibration", screenshot_file="cal-1.png", - entry="400.0", - ) - v = _base_row( - id=2, source="vision_calibration", screenshot_file="cal-1.png", - entry="400.10", # different entry - ) - p = tmp_path / "j.csv" - _write_csv(p, [m, v]) - trades = load_trades(p) - rep = calibration_mismatch(trades) - assert rep.pairs == 1 - assert rep.field_mismatches["entry"] == 1 - # all other core fields match +class TestComputeCalibration: + def test_compute_calibration_pairs(self, tmp_path: Path) -> None: + rows: list[dict[str, str]] = [] + for i in range(5): + f = f"cal-{i}.png" + rows.append(_base_row( + id=i * 2 + 1, source="manual_calibration", screenshot_file=f + )) + rows.append(_base_row( + id=i * 2 + 2, source="vision_calibration", screenshot_file=f + )) + path = tmp_path / "j.csv" + _write_csv(path, rows) + cal = compute_calibration(path) + assert cal["n_pairs"] == 5 for fld in CORE_CALIBRATION_FIELDS: - if fld == "entry": - continue - assert rep.field_mismatches[fld] == 0 - # 1 mismatch / (1 pair * 8 fields) = 12.5% - assert rep.overall_mismatch_rate == pytest.approx(1.0 / len(CORE_CALIBRATION_FIELDS)) + assert fld in cal["fields"] + # 
All identical → 5 matches, 0 mismatches per field. + assert cal["fields"][fld]["match"] == 5 + assert cal["fields"][fld]["mismatch"] == 0 + assert cal["fields"][fld]["match_rate"] == pytest.approx(1.0) - def test_unpaired_rows_ignored(self, tmp_path: Path) -> None: - # Only a manual leg — no pair → 0 pairs. - m = _base_row( - id=1, source="manual_calibration", screenshot_file="lonely.png" - ) - p = tmp_path / "j.csv" - _write_csv(p, [m]) - trades = load_trades(p) - rep = calibration_mismatch(trades) - assert rep.pairs == 0 - assert rep.total_comparisons == 0 - assert rep.overall_mismatch_rate == 0.0 + def test_compute_calibration_mismatch_examples(self, tmp_path: Path) -> None: + """Modify entry on 2 pairs → mismatch_examples contains both.""" + rows: list[dict[str, str]] = [] + for i in range(5): + f = f"cal-{i}.png" + manual_entry = "400.0" + # First two pairs differ on entry; the rest match exactly. + vision_entry = "401.5" if i < 2 else "400.0" + rows.append(_base_row( + id=i * 2 + 1, source="manual_calibration", + screenshot_file=f, entry=manual_entry, + )) + rows.append(_base_row( + id=i * 2 + 2, source="vision_calibration", + screenshot_file=f, entry=vision_entry, + )) + path = tmp_path / "j.csv" + _write_csv(path, rows) + cal = compute_calibration(path) + assert cal["n_pairs"] == 5 + entry = cal["fields"]["entry"] + assert entry["match"] == 3 + assert entry["mismatch"] == 2 + assert entry["match_rate"] == pytest.approx(3 / 5) + assert len(entry["mismatch_examples"]) == 2 + for ex in entry["mismatch_examples"]: + assert "manual=" in ex and "vision=" in ex - def test_numeric_equivalence_tolerated(self, tmp_path: Path) -> None: - """'400' and '400.0000' should NOT count as a mismatch on entry.""" - m = _base_row( - id=1, source="manual_calibration", screenshot_file="cal-1.png", - entry="400", - ) - v = _base_row( - id=2, source="vision_calibration", screenshot_file="cal-1.png", - entry="400.0000", - ) - p = tmp_path / "j.csv" - _write_csv(p, [m, v]) - rep = 
calibration_mismatch(load_trades(p)) - assert rep.field_mismatches["entry"] == 0 + def test_calibration_examples_capped_at_3(self, tmp_path: Path) -> None: + """5 mismatches but mismatch_examples is capped at 3.""" + rows: list[dict[str, str]] = [] + for i in range(5): + f = f"cal-{i}.png" + rows.append(_base_row( + id=i * 2 + 1, source="manual_calibration", + screenshot_file=f, entry="400.0", + )) + rows.append(_base_row( + id=i * 2 + 2, source="vision_calibration", + screenshot_file=f, entry="500.0", + )) + path = tmp_path / "j.csv" + _write_csv(path, rows) + cal = compute_calibration(path) + assert cal["fields"]["entry"]["mismatch"] == 5 + assert len(cal["fields"]["entry"]["mismatch_examples"]) == 3 - -# --------------------------------------------------------------------------- -# Report formatting + CLI -# --------------------------------------------------------------------------- - - -class TestReporting: - def test_format_report_contains_sections(self, tmp_path: Path) -> None: - out = format_report( - load_trades(_synthetic_30(tmp_path)), - bootstrap_iterations=200, - seed=0, - ) - assert "M2D Backtest Stats" in out - assert "Overall" in out - assert "By Set" in out - assert "A1" in out and "A2" in out and "A3" in out - # calitate warning present - assert "descriptor only" in out.lower() or "biased" in out.lower() - - def test_format_calibration_report(self, tmp_path: Path) -> None: + def test_calibration_numeric_tolerance(self, tmp_path: Path) -> None: + """Floats within 0.01 must NOT count as a mismatch.""" rows = [ _base_row( - id=1, source="manual_calibration", screenshot_file="cal-1.png" + id=1, source="manual_calibration", + screenshot_file="cal-1.png", entry="400.005", ), _base_row( - id=2, source="vision_calibration", screenshot_file="cal-1.png", - directie="Sell", # mismatch on directie - entry="400.0", sl="401.0", tp0="399.5", tp1="399.0", tp2="398.0", + id=2, source="vision_calibration", + screenshot_file="cal-1.png", entry="400.010", ), ] - p = 
tmp_path / "j.csv" - _write_csv(p, rows) - out = format_calibration_report(load_trades(p)) - assert "Paired screenshots" in out + path = tmp_path / "j.csv" + _write_csv(path, rows) + cal = compute_calibration(path) + assert cal["fields"]["entry"]["match"] == 1 + assert cal["fields"]["entry"]["mismatch"] == 0 + + def test_calibration_outside_tolerance(self, tmp_path: Path) -> None: + """Floats > 0.01 apart DO count as a mismatch.""" + rows = [ + _base_row( + id=1, source="manual_calibration", + screenshot_file="cal-1.png", entry="400.00", + ), + _base_row( + id=2, source="vision_calibration", + screenshot_file="cal-1.png", entry="400.05", + ), + ] + path = tmp_path / "j.csv" + _write_csv(path, rows) + cal = compute_calibration(path) + assert cal["fields"]["entry"]["mismatch"] == 1 + + def test_calibration_no_pairs(self, tmp_path: Path) -> None: + """No paired screenshot → n_pairs=0, all rates 0.0.""" + path = tmp_path / "j.csv" + _write_csv(path, [ + _base_row(id=1, source="manual_calibration", screenshot_file="lonely.png"), + ]) + cal = compute_calibration(path) + assert cal["n_pairs"] == 0 + for fld in CORE_CALIBRATION_FIELDS: + assert cal["fields"][fld]["match"] == 0 + assert cal["fields"][fld]["mismatch"] == 0 + + def test_render_calibration_no_crash(self, tmp_path: Path) -> None: + rows = [ + _base_row(id=1, source="manual_calibration", + screenshot_file="cal-1.png", directie="Buy"), + _base_row(id=2, source="vision_calibration", + screenshot_file="cal-1.png", directie="Sell", + entry="400.0", sl="401.0", tp0="399.5", + tp1="399.0", tp2="398.0"), + ] + path = tmp_path / "j.csv" + _write_csv(path, rows) + out = render_calibration(compute_calibration(path)) + assert "Calibration P4" in out assert "directie" in out - # 1 mismatch (directie) of 8 fields = 12.5% → FAIL P4 gate - assert "FAIL" in out - def test_empty_csv_report(self, tmp_path: Path) -> None: - p = tmp_path / "empty.csv" - _write_csv(p, []) - out = format_report(load_trades(p)) - assert "no backtest 
trades" in out.lower() + def test_render_calibration_empty(self, tmp_path: Path) -> None: + path = tmp_path / "empty.csv" + _write_csv(path, []) + out = render_calibration(compute_calibration(path)) + assert "0" in out + assert "FAIL" not in out + assert "PASS" not in out - def test_main_cli_runs( + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +class TestCLI: + def test_main_stats( self, tmp_path: Path, capsys: pytest.CaptureFixture ) -> None: - path = _synthetic_30(tmp_path) - rc = main(["--csv", str(path), "--seed", "0", "--bootstrap-iterations", "100"]) + path = _synthetic_csv(tmp_path) + rc = main(["--csv", str(path)]) assert rc == 0 - captured = capsys.readouterr() - assert "M2D Backtest Stats" in captured.out + assert "Stats jurnal.csv" in capsys.readouterr().out - def test_main_cli_calibration( + def test_main_overlay( + self, tmp_path: Path, capsys: pytest.CaptureFixture + ) -> None: + path = _synthetic_csv(tmp_path) + rc = main(["--csv", str(path), "--overlay", "pl_theoretical"]) + assert rc == 0 + assert "pl_theoretical" in capsys.readouterr().out + + def test_main_calibration( self, tmp_path: Path, capsys: pytest.CaptureFixture ) -> None: rows = [ - _base_row(id=1, source="manual_calibration", screenshot_file="cal-1.png"), - _base_row(id=2, source="vision_calibration", screenshot_file="cal-1.png"), + _base_row(id=1, source="manual_calibration", + screenshot_file="cal-1.png"), + _base_row(id=2, source="vision_calibration", + screenshot_file="cal-1.png"), ] - p = tmp_path / "j.csv" - _write_csv(p, rows) - rc = main(["--csv", str(p), "--calibration"]) + path = tmp_path / "j.csv" + _write_csv(path, rows) + rc = main(["--csv", str(path), "--calibration"]) assert rc == 0 out = capsys.readouterr().out - assert "Calibration P4 gate" in out - assert "PASS" in out # all fields match → PASS + assert "Calibration P4" in out + assert "PASS" in 
out diff --git a/tests/test_stats_ci.py b/tests/test_stats_ci.py new file mode 100644 index 0000000..12d5367 --- /dev/null +++ b/tests/test_stats_ci.py @@ -0,0 +1,83 @@ +"""Pure-math tests for stats CI primitives (no I/O).""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from scripts.stats import bootstrap_expectancy_ci, wilson_ci # noqa: E402 + + +# --------------------------------------------------------------------------- +# Wilson CI +# --------------------------------------------------------------------------- + + +class TestWilsonCI: + def test_wilson_n_zero(self) -> None: + assert wilson_ci(0, 0) == (0.0, 0.0) + + def test_wilson_perfect_winrate(self) -> None: + lo, hi = wilson_ci(10, 10) + assert lo > 0.65 + assert hi == pytest.approx(1.0, abs=1e-12) + + def test_wilson_reference_15_55(self) -> None: + """wins=8, n=15 (WR≈53%) → CI approximately [29%, 76%] ±2%.""" + lo, hi = wilson_ci(8, 15) + assert lo == pytest.approx(0.29, abs=0.02) + assert hi == pytest.approx(0.76, abs=0.02) + + def test_wilson_all_losses(self) -> None: + lo, hi = wilson_ci(0, 10) + assert lo == pytest.approx(0.0, abs=1e-12) + assert hi < 0.35 + + def test_wilson_wins_out_of_range(self) -> None: + with pytest.raises(ValueError): + wilson_ci(11, 10) + with pytest.raises(ValueError): + wilson_ci(-1, 10) + + def test_wilson_clamps_at_50pct_n40(self) -> None: + """Reference at WR=50%, N=40: CI ≈ [35.2%, 64.8%].""" + lo, hi = wilson_ci(20, 40) + assert lo == pytest.approx(0.352, abs=0.005) + assert hi == pytest.approx(0.648, abs=0.005) + + +# --------------------------------------------------------------------------- +# Bootstrap CI +# --------------------------------------------------------------------------- + + +class TestBootstrap: + def test_bootstrap_deterministic(self) -> None: + values = [1.0, -0.5, 0.5, -1.0] + a = bootstrap_expectancy_ci(values, n_resamples=1000, 
seed=42) + b = bootstrap_expectancy_ci(values, n_resamples=1000, seed=42) + assert a == b + + def test_bootstrap_different_seed_different_result(self) -> None: + values = [1.0, -0.5, 0.5, -1.0, 0.2, -0.3, 0.5] + a = bootstrap_expectancy_ci(values, n_resamples=1000, seed=1) + b = bootstrap_expectancy_ci(values, n_resamples=1000, seed=2) + assert a != b + + def test_bootstrap_empty(self) -> None: + assert bootstrap_expectancy_ci([], n_resamples=100, seed=0) == (0.0, 0.0) + + def test_bootstrap_single_value(self) -> None: + lo, hi = bootstrap_expectancy_ci([0.5], n_resamples=100, seed=0) + assert lo == pytest.approx(0.5, abs=1e-9) + assert hi == pytest.approx(0.5, abs=1e-9) + + def test_bootstrap_brackets_the_mean(self) -> None: + values = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] * 5 + mean = sum(values) / len(values) + lo, hi = bootstrap_expectancy_ci(values, n_resamples=1000, seed=7) + assert lo <= mean <= hi