diff --git a/scripts/append_row.py b/scripts/append_row.py index 93d7587..6e1202a 100644 --- a/scripts/append_row.py +++ b/scripts/append_row.py @@ -1,22 +1,26 @@ -"""Append a validated M2D extraction to ``data/trades.csv``. +"""Append a validated M2D extraction to ``data/jurnal.csv``. Pipeline: JSON file --> pydantic validate (M2DExtraction) - --> load data/_meta.yaml (versions + schema) - --> compute ora_ro, zi, set, pl_marius, pl_theoretical + --> load data/_meta.yaml (versions) + --> compute id, ora_ro, zi, set, pl_marius, pl_theoretical, extracted_at --> dedup on (screenshot_file, source) - --> atomic CSV write (temp file + os.replace) + --> atomic CSV write (sibling .tmp + os.replace) Source values - - ``manual`` : Marius logged by hand - ``vision`` : produced by the vision subagent + - ``manual`` : Marius logged by hand - ``manual_calibration`` : calibration P4 — manual leg - ``vision_calibration`` : calibration P4 — vision leg A row with ``source=manual_calibration`` and a row with ``source=vision_calibration`` -for the *same* screenshot are allowed to coexist (different dedup keys); a -duplicate ``(screenshot_file, source)`` pair is rejected (or skipped — see -``append_row`` ``on_duplicate`` argument). +for the *same* screenshot are allowed to coexist (different dedup keys). + +Failure mode: ``append_extraction`` NEVER raises. On any error (missing JSON, +pydantic ValidationError, dedup hit, etc.) it returns +``{"status": "rejected", "reason": "...", "id": None, "row": None}`` so the +caller (a slash command) can decide what to do with the screenshot +(move to ``needs_review/``, log to workflow, etc.). 
""" from __future__ import annotations @@ -24,41 +28,43 @@ from __future__ import annotations import csv import json import os -import tempfile +import traceback +from datetime import datetime, timezone from pathlib import Path from typing import Any, Literal import yaml +from pydantic import ValidationError from scripts.calendar_parse import calc_set, load_calendar, utc_to_ro from scripts.pl_calc import pl_marius, pl_theoretical -from scripts.vision_schema import M2DExtraction, parse_extraction_dict +from scripts.vision_schema import M2DExtraction, parse_extraction __all__ = [ "CSV_COLUMNS", "VALID_SOURCES", - "build_row", - "read_rows", - "append_row", - "append_row_from_json", + "ZI_RO_MAP", + "csv_columns", + "append_extraction", ] -Source = Literal["manual", "vision", "manual_calibration", "vision_calibration"] +Source = Literal["vision", "manual", "manual_calibration", "vision_calibration"] VALID_SOURCES: frozenset[str] = frozenset( - {"manual", "vision", "manual_calibration", "vision_calibration"} + {"vision", "manual", "manual_calibration", "vision_calibration"} ) +# Canonical column order (29) — must stay stable; regenerate_md + stats depend on it. CSV_COLUMNS: tuple[str, ...] = ( + "id", "screenshot_file", "source", "data", - "ora_utc", - "ora_ro", "zi", - "set", + "ora_ro", + "ora_utc", "instrument", "directie", "tf_mare", @@ -73,17 +79,38 @@ CSV_COLUMNS: tuple[str, ...] 
= ( "outcome_path", "max_reached", "be_moved", - "confidence", - "ambiguities", - "note", "pl_marius", "pl_theoretical", + "set", "indicator_version", "pl_overlay_version", "csv_schema_version", + "extracted_at", + "note", ) +ZI_RO_MAP: dict[str, str] = { + "Mon": "Lu", + "Tue": "Ma", + "Wed": "Mi", + "Thu": "Jo", + "Fri": "Vi", + "Sat": "Sa", + "Sun": "Du", +} + + +def csv_columns() -> list[str]: + """Return the 29-column header in canonical order.""" + return list(CSV_COLUMNS) + + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + + def _load_meta(meta_path: Path) -> dict[str, Any]: with meta_path.open("r", encoding="utf-8") as fh: meta = yaml.safe_load(fh) or {} @@ -94,35 +121,69 @@ def _load_meta(meta_path: Path) -> dict[str, Any]: return meta +def _read_existing_rows(csv_path: Path) -> list[dict[str, str]]: + if not csv_path.exists() or csv_path.stat().st_size == 0: + return [] + with csv_path.open("r", encoding="utf-8", newline="") as fh: + reader = csv.DictReader(fh) + return list(reader) + + +def _next_id(rows: list[dict[str, str]]) -> int: + max_id = 0 + for r in rows: + raw = r.get("id", "") + if not raw: + continue + try: + v = int(raw) + except (TypeError, ValueError): + continue + if v > max_id: + max_id = v + return max_id + 1 + + def _format_optional(value: float | None) -> str: return "" if value is None else f"{value:.4f}" -def build_row( +def _write_csv_atomic( + csv_path: Path, rows: list[dict[str, str]], columns: list[str] +) -> None: + csv_path.parent.mkdir(parents=True, exist_ok=True) + tmp = csv_path.with_suffix(csv_path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8", newline="") as fh: + writer = csv.DictWriter(fh, fieldnames=columns) + writer.writeheader() + for row in rows: + writer.writerow({k: row.get(k, "") for k in columns}) + os.replace(tmp, csv_path) + + +def _build_row( extraction: M2DExtraction, + *, 
source: str, + row_id: int, meta: dict[str, Any], calendar: list[dict[str, Any]], + extracted_at: str, ) -> dict[str, str]: - """Compute the full CSV row dict for one extraction.""" - if source not in VALID_SOURCES: - raise ValueError( - f"invalid source {source!r}; must be one of {sorted(VALID_SOURCES)}" - ) - - d_ro, t_ro, zi = utc_to_ro(extraction.data, extraction.ora_utc) - set_label = calc_set(d_ro, t_ro, zi, calendar) + d_ro, t_ro, day_short = utc_to_ro(extraction.data, extraction.ora_utc) + set_label = calc_set(d_ro, t_ro, day_short, calendar) pl_m = pl_marius(extraction.outcome_path, extraction.be_moved) pl_t = pl_theoretical(extraction.max_reached) + zi_ro = ZI_RO_MAP[day_short] return { + "id": str(row_id), "screenshot_file": extraction.screenshot_file, "source": source, "data": extraction.data, - "ora_utc": extraction.ora_utc, + "zi": zi_ro, "ora_ro": t_ro.strftime("%H:%M"), - "zi": zi, - "set": set_label, + "ora_utc": extraction.ora_utc, "instrument": extraction.instrument, "directie": extraction.directie, "tf_mare": extraction.tf_mare, @@ -136,102 +197,115 @@ def build_row( "risc_pct": f"{extraction.risc_pct}", "outcome_path": extraction.outcome_path, "max_reached": extraction.max_reached, - "be_moved": "true" if extraction.be_moved else "false", - "confidence": extraction.confidence, - "ambiguities": json.dumps(extraction.ambiguities, ensure_ascii=False), - "note": extraction.note, + "be_moved": str(extraction.be_moved), "pl_marius": _format_optional(pl_m), "pl_theoretical": _format_optional(pl_t), + "set": set_label, "indicator_version": str(meta["indicator_version"]), "pl_overlay_version": str(meta["pl_overlay_version"]), "csv_schema_version": str(meta["csv_schema_version"]), + "extracted_at": extracted_at, + "note": extraction.note, } -def read_rows(csv_path: Path) -> list[dict[str, str]]: - """Read existing rows; return [] if the file does not exist or is empty.""" - if not csv_path.exists() or csv_path.stat().st_size == 0: - return [] - with 
csv_path.open("r", encoding="utf-8", newline="") as fh: - reader = csv.DictReader(fh) - return list(reader) +def _reject(reason: str) -> dict[str, Any]: + return {"status": "rejected", "reason": reason, "id": None, "row": None} -def _atomic_write(csv_path: Path, rows: list[dict[str, str]]) -> None: - csv_path.parent.mkdir(parents=True, exist_ok=True) - fd, tmp_name = tempfile.mkstemp( - prefix=csv_path.name + ".", - suffix=".tmp", - dir=str(csv_path.parent), - ) - try: - with os.fdopen(fd, "w", encoding="utf-8", newline="") as fh: - writer = csv.DictWriter(fh, fieldnames=list(CSV_COLUMNS)) - writer.writeheader() - for r in rows: - writer.writerow({k: r.get(k, "") for k in CSV_COLUMNS}) - os.replace(tmp_name, csv_path) - except Exception: - try: - os.unlink(tmp_name) - except OSError: - pass - raise +# --------------------------------------------------------------------------- +# public API +# --------------------------------------------------------------------------- -def append_row( - extraction: M2DExtraction, +def append_extraction( + json_path: Path | str, source: str, - csv_path: Path, - meta_path: Path, - calendar_path: Path, - on_duplicate: Literal["raise", "skip"] = "raise", -) -> dict[str, str]: - """Append one extraction to the CSV. + csv_path: Path | str = "data/jurnal.csv", + meta_path: Path | str = "data/_meta.yaml", + calendar_path: Path | str = "calendar_evenimente.yaml", +) -> dict[str, Any]: + """Append one validated extraction to the jurnal CSV. - Dedup key: ``(screenshot_file, source)``. If a row with the same key - already exists, behaviour is controlled by ``on_duplicate``: + Never raises. Returns one of: - - ``"raise"`` (default): raise ``ValueError``. - - ``"skip"``: leave the CSV untouched and return the *existing* row. 
+ - ``{"status": "ok", "reason": "", "id": , "row": }`` + - ``{"status": "rejected", "reason": , "id": None, "row": None}`` """ - meta = _load_meta(meta_path) - calendar = load_calendar(calendar_path) - row = build_row(extraction, source, meta, calendar) + json_path = Path(json_path) + csv_path = Path(csv_path) + meta_path = Path(meta_path) + calendar_path = Path(calendar_path) - existing = read_rows(csv_path) - key = (row["screenshot_file"], row["source"]) + if source not in VALID_SOURCES: + return _reject( + f"invalid source {source!r}; must be one of {sorted(VALID_SOURCES)}" + ) + + if not json_path.exists(): + return _reject(f"JSON file not found: {json_path}") + + try: + with json_path.open("r", encoding="utf-8") as fh: + raw = fh.read() + except OSError as exc: + return _reject(f"failed to read JSON {json_path}: {exc}") + + try: + extraction = parse_extraction(raw) + except ValidationError as exc: + return _reject(f"validation error: {exc}") + except (ValueError, json.JSONDecodeError) as exc: + return _reject(f"validation error (json parse): {exc}") + + try: + meta = _load_meta(meta_path) + except (FileNotFoundError, OSError) as exc: + return _reject(f"_meta.yaml not found: {exc}") + except (ValueError, yaml.YAMLError) as exc: + return _reject(f"_meta.yaml invalid: {exc}") + + try: + calendar = load_calendar(calendar_path) + except (FileNotFoundError, OSError) as exc: + return _reject(f"calendar not found: {exc}") + except (ValueError, yaml.YAMLError) as exc: + return _reject(f"calendar invalid: {exc}") + + try: + existing = _read_existing_rows(csv_path) + except OSError as exc: + return _reject(f"failed to read existing CSV {csv_path}: {exc}") + + key = (extraction.screenshot_file, source) for r in existing: if (r.get("screenshot_file"), r.get("source")) == key: - if on_duplicate == "skip": - return r - raise ValueError( - f"duplicate row: screenshot_file={key[0]!r} source={key[1]!r} " - f"already exists in {csv_path}" + return _reject( + f"duplicate row: 
screenshot_file={key[0]!r} source={key[1]!r}" ) - existing.append(row) - _atomic_write(csv_path, existing) - return row - - -def append_row_from_json( - json_path: Path, - source: str, - csv_path: Path, - meta_path: Path, - calendar_path: Path, - on_duplicate: Literal["raise", "skip"] = "raise", -) -> dict[str, str]: - """Convenience wrapper: load JSON, validate, append.""" - with Path(json_path).open("r", encoding="utf-8") as fh: - payload = json.load(fh) - extraction = parse_extraction_dict(payload) - return append_row( - extraction=extraction, - source=source, - csv_path=csv_path, - meta_path=meta_path, - calendar_path=calendar_path, - on_duplicate=on_duplicate, + row_id = _next_id(existing) + extracted_at = ( + datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z" ) + + try: + row = _build_row( + extraction, + source=source, + row_id=row_id, + meta=meta, + calendar=calendar, + extracted_at=extracted_at, + ) + except (KeyError, ValueError) as exc: + return _reject(f"derived-field computation failed: {exc}") + + try: + _write_csv_atomic(csv_path, [*existing, row], list(CSV_COLUMNS)) + except OSError as exc: + return _reject( + f"atomic write failed: {exc}\n{traceback.format_exc()}" + ) + + return {"status": "ok", "reason": "", "id": row_id, "row": row} diff --git a/scripts/regenerate_md.py b/scripts/regenerate_md.py new file mode 100644 index 0000000..c799b5d --- /dev/null +++ b/scripts/regenerate_md.py @@ -0,0 +1,240 @@ +"""Regenerate ``data/jurnal.md`` from ``data/jurnal.csv``. + +CSV is the source of truth (29 columns, schema owned by ``scripts.append_row``). +MD is a human-readable mirror with a curated 18-column table. 
+ +CLI: ``python scripts/regenerate_md.py [csv_path] [md_path]`` +""" + +from __future__ import annotations + +import csv +import os +import sys +import tempfile +from datetime import datetime, timezone +from pathlib import Path +from typing import Sequence + +from scripts.append_row import csv_columns + +__all__ = ["MD_COLUMNS", "regenerate_md", "main"] + + +MD_COLUMNS: tuple[str, ...] = ( + "#", + "Data", + "Zi", + "Ora RO", + "Set", + "Instrument", + "Direcție", + "Calitate", + "Entry", + "SL", + "TP0", + "TP1", + "TP2", + "outcome_path", + "P/L (Marius)", + "P/L (theoretic)", + "Source", + "Note", +) + + +_CSV_FIELDS_USED: tuple[str, ...] = ( + "id", + "data", + "zi", + "ora_ro", + "set", + "instrument", + "directie", + "calitate", + "entry", + "sl", + "tp0", + "tp1", + "tp2", + "outcome_path", + "pl_marius", + "pl_theoretical", + "source", + "note", +) + + +_DIRECTIE_DISPLAY = {"long": "Buy", "short": "Sell", "buy": "Buy", "sell": "Sell"} + + +def _fmt_pl(value: str) -> str: + if value is None or value == "": + return "pending" + try: + return f"{float(value):+.2f}" + except ValueError: + return value + + +def _fmt_directie(value: str) -> str: + if not value: + return "" + return _DIRECTIE_DISPLAY.get(value.strip().lower(), value) + + +def _escape_cell(value: str) -> str: + return (value or "").replace("|", "\\|").replace("\n", " ").strip() + + +def _placeholder_md() -> str: + return ( + "# Jurnal M2D (auto-generated)\n" + "\n" + "*Niciun trade încă. 
Adaugă unul prin `/m2d-log` sau `/backtest`.*\n" + ) + + +def _atomic_write_text(path: Path, content: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_name = tempfile.mkstemp( + prefix=path.name + ".", suffix=".tmp", dir=str(path.parent) + ) + try: + with os.fdopen(fd, "w", encoding="utf-8", newline="\n") as fh: + fh.write(content) + os.replace(tmp_name, path) + except Exception: + try: + os.unlink(tmp_name) + except OSError: + pass + raise + + +def _row_to_cells(row: dict[str, str], display_index: int) -> tuple[str, ...]: + g = row.get + return ( + str(display_index), + g("data", "") or "", + g("zi", "") or "", + g("ora_ro", "") or "", + g("set", "") or "", + g("instrument", "") or "", + _fmt_directie(g("directie", "") or ""), + g("calitate", "") or "", + g("entry", "") or "", + g("sl", "") or "", + g("tp0", "") or "", + g("tp1", "") or "", + g("tp2", "") or "", + g("outcome_path", "") or "", + _fmt_pl(g("pl_marius", "") or ""), + _fmt_pl(g("pl_theoretical", "") or ""), + g("source", "") or "", + g("note", "") or "", + ) + + +def _render_table(rows: Sequence[dict[str, str]]) -> str: + header_line = "| " + " | ".join(MD_COLUMNS) + " |" + sep_line = "|" + "|".join(["---"] * len(MD_COLUMNS)) + "|" + data_lines = [] + for i, row in enumerate(rows, start=1): + cells = _row_to_cells(row, i) + data_lines.append( + "| " + " | ".join(_escape_cell(c) for c in cells) + " |" + ) + return "\n".join([header_line, sep_line, *data_lines]) + + +def _render_md(rows: Sequence[dict[str, str]]) -> str: + if not rows: + return _placeholder_md() + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + table = _render_table(rows) + return ( + "# Jurnal M2D (auto-generated from data/jurnal.csv)\n" + "\n" + f"Generated: {now_iso}\n" + f"Rows: {len(rows)}\n" + "\n" + f"{table}\n" + "\n" + "*Vezi `data/jurnal.csv` pentru toate cele 29 coloane " + "(id, ora_utc, tf_*, risc_pct, be_moved, max_reached, versions, extracted_at).*\n" + ) + + +def 
_id_sort_key(raw: str) -> tuple[int, int | str]: + try: + return (0, int(raw)) + except (ValueError, TypeError): + return (1, raw or "") + + +def _load_rows(csv_path: Path) -> list[dict[str, str]]: + """Read CSV, returning rows sorted by id. + + Schema drift handling: + - Extra header columns → warning to stderr, dropped. + - Missing required header columns → warning to stderr per affected row (row skipped). + """ + if not csv_path.exists() or csv_path.stat().st_size == 0: + return [] + + expected = set(csv_columns()) + required = set(_CSV_FIELDS_USED) + + with csv_path.open("r", encoding="utf-8", newline="") as fh: + reader = csv.DictReader(fh) + header = reader.fieldnames or [] + header_set = set(header) + + extras = [c for c in header if c not in expected] + if extras: + print( + f"regenerate_md: warning: unknown CSV columns ignored: {extras}", + file=sys.stderr, + ) + + missing_required = required - header_set + rows: list[dict[str, str]] = [] + for raw in reader: + if missing_required: + print( + f"regenerate_md: warning: row skipped (missing required " + f"columns: {sorted(missing_required)})", + file=sys.stderr, + ) + continue + rows.append({k: (raw.get(k) or "") for k in required}) + + rows.sort(key=lambda r: _id_sort_key(r.get("id", ""))) + return rows + + +def regenerate_md( + csv_path: Path | str = "data/jurnal.csv", + md_path: Path | str = "data/jurnal.md", +) -> int: + """Read CSV → write MD atomically. 
Returns count of trade rows written.""" + csv_p = Path(csv_path) + md_p = Path(md_path) + rows = _load_rows(csv_p) + content = _render_md(rows) + _atomic_write_text(md_p, content) + return len(rows) + + +def main() -> int: + args = sys.argv[1:] + csv_arg = args[0] if len(args) >= 1 else "data/jurnal.csv" + md_arg = args[1] if len(args) >= 2 else "data/jurnal.md" + n = regenerate_md(csv_arg, md_arg) + print(f"regenerate_md: wrote {md_arg} with {n} row(s)") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/stats.py b/scripts/stats.py new file mode 100644 index 0000000..944cef3 --- /dev/null +++ b/scripts/stats.py @@ -0,0 +1,540 @@ +"""Backtest statistics for ``data/jurnal.csv``. + +Outputs: +- Overall + per-Set + per-calitate + per-instrument WR, expectancy. +- Wilson 95% CI for WR (closed form). +- Bootstrap percentile 95% CI for expectancy (deterministic via ``seed``). +- ``--calibration`` mode: joins ``manual_calibration`` rows with their + ``vision_calibration`` counterparts on ``screenshot_file`` and reports + field-by-field mismatch rates for the P4 gate (see ``STOPPING_RULE.md``). + +A "win" is any trade with ``pl_marius > 0``. Pending trades +(``pl_marius`` blank, i.e. ``outcome_path in {pending, TP0->pending}``) are +excluded from both WR and expectancy: there is no realised outcome yet. + +The ``calitate`` field is a known-biased descriptor (post-outcome +classification — see ``STOPPING_RULE.md`` §3). It is reported as +informational only and explicitly flagged as such; do NOT use it as a +filter for GO LIVE decisions. 
+""" + +from __future__ import annotations + +import argparse +import csv +import math +import random +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Iterable + +__all__ = [ + "CORE_CALIBRATION_FIELDS", + "BACKTEST_SOURCES", + "CALIBRATION_SOURCES", + "Trade", + "GroupStats", + "load_trades", + "wilson_ci", + "bootstrap_ci", + "win_rate", + "expectancy", + "group_by", + "compute_group_stats", + "calibration_mismatch", + "format_report", + "main", +] + + +# Fields compared in the calibration mismatch gate (STOPPING_RULE.md §P4). +CORE_CALIBRATION_FIELDS: tuple[str, ...] = ( + "entry", + "sl", + "tp0", + "tp1", + "tp2", + "outcome_path", + "max_reached", + "directie", +) + + +BACKTEST_SOURCES: frozenset[str] = frozenset({"vision", "manual"}) +CALIBRATION_SOURCES: frozenset[str] = frozenset( + {"manual_calibration", "vision_calibration"} +) + + +# --------------------------------------------------------------------------- +# Loading / typed access +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class Trade: + """One realised (or pending) trade row, typed.""" + + id: int + screenshot_file: str + source: str + data: str + zi: str + ora_ro: str + instrument: str + directie: str + calitate: str + set: str + outcome_path: str + max_reached: str + be_moved: bool + pl_marius: float | None + pl_theoretical: float + raw: dict[str, str] = field(default_factory=dict) + + @property + def is_pending(self) -> bool: + return self.pl_marius is None + + @property + def is_win(self) -> bool: + return self.pl_marius is not None and self.pl_marius > 0 + + +def _parse_optional_float(value: str) -> float | None: + s = (value or "").strip() + if s == "": + return None + return float(s) + + +def _parse_bool(value: str) -> bool: + return (value or "").strip().lower() in {"true", "1", "yes", "da"} + + +def _row_to_trade(row: dict[str, str]) -> Trade: + return Trade( + 
id=int(row.get("id") or 0), + screenshot_file=row.get("screenshot_file", ""), + source=row.get("source", ""), + data=row.get("data", ""), + zi=row.get("zi", ""), + ora_ro=row.get("ora_ro", ""), + instrument=row.get("instrument", ""), + directie=row.get("directie", ""), + calitate=row.get("calitate", ""), + set=row.get("set", ""), + outcome_path=row.get("outcome_path", ""), + max_reached=row.get("max_reached", ""), + be_moved=_parse_bool(row.get("be_moved", "")), + pl_marius=_parse_optional_float(row.get("pl_marius", "")), + pl_theoretical=float(row.get("pl_theoretical") or 0.0), + raw=dict(row), + ) + + +def load_trades(csv_path: Path | str) -> list[Trade]: + """Load all rows of ``csv_path`` as :class:`Trade` objects. + + Returns ``[]`` if the file does not exist or is empty. + """ + p = Path(csv_path) + if not p.exists() or p.stat().st_size == 0: + return [] + with p.open("r", encoding="utf-8", newline="") as fh: + reader = csv.DictReader(fh) + return [_row_to_trade(r) for r in reader] + + +# --------------------------------------------------------------------------- +# Statistics primitives +# --------------------------------------------------------------------------- + + +def wilson_ci(wins: int, n: int, z: float = 1.96) -> tuple[float, float]: + """Wilson score interval for a binomial proportion. + + Returns ``(lo, hi)`` as proportions in [0, 1]. For ``n == 0`` returns + ``(0.0, 0.0)``. ``z = 1.96`` corresponds to a 95% CI. 
+ """ + if n <= 0: + return (0.0, 0.0) + if wins < 0 or wins > n: + raise ValueError(f"wins={wins} out of range for n={n}") + p_hat = wins / n + denom = 1.0 + (z * z) / n + center = p_hat + (z * z) / (2.0 * n) + half = z * math.sqrt((p_hat * (1.0 - p_hat) + (z * z) / (4.0 * n)) / n) + lo = (center - half) / denom + hi = (center + half) / denom + return (max(0.0, lo), min(1.0, hi)) + + +def bootstrap_ci( + values: list[float], + *, + iterations: int = 2000, + alpha: float = 0.05, + seed: int | None = None, +) -> tuple[float, float]: + """Percentile-method bootstrap CI for the mean of ``values``. + + Deterministic when ``seed`` is provided. Returns ``(lo, hi)``. For + ``len(values) < 2`` returns ``(mean, mean)``. + """ + if not values: + return (0.0, 0.0) + n = len(values) + mean = sum(values) / n + if n < 2 or iterations <= 0: + return (mean, mean) + + rng = random.Random(seed) + means: list[float] = [] + for _ in range(iterations): + s = 0.0 + for _ in range(n): + s += values[rng.randrange(n)] + means.append(s / n) + means.sort() + lo_idx = int(math.floor((alpha / 2.0) * iterations)) + hi_idx = int(math.ceil((1.0 - alpha / 2.0) * iterations)) - 1 + lo_idx = max(0, min(iterations - 1, lo_idx)) + hi_idx = max(0, min(iterations - 1, hi_idx)) + return (means[lo_idx], means[hi_idx]) + + +def win_rate(trades: Iterable[Trade]) -> tuple[int, int, float]: + """Return ``(wins, n_resolved, wr)`` ignoring pending trades.""" + resolved = [t for t in trades if not t.is_pending] + wins = sum(1 for t in resolved if t.is_win) + n = len(resolved) + wr = (wins / n) if n else 0.0 + return wins, n, wr + + +def expectancy(trades: Iterable[Trade], overlay: str = "pl_marius") -> float: + """Mean P/L (in R) over non-pending trades, on the given overlay.""" + if overlay not in {"pl_marius", "pl_theoretical"}: + raise ValueError(f"unknown overlay {overlay!r}") + if overlay == "pl_marius": + vals = [t.pl_marius for t in trades if t.pl_marius is not None] + else: + vals = [t.pl_theoretical for 
t in trades if not t.is_pending] + if not vals: + return 0.0 + return sum(vals) / len(vals) + + +# --------------------------------------------------------------------------- +# Group stats +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class GroupStats: + key: str + n_total: int + n_resolved: int + wins: int + wr: float + wr_ci_lo: float + wr_ci_hi: float + exp_marius: float + exp_marius_ci_lo: float + exp_marius_ci_hi: float + exp_theoretical: float + exp_theoretical_ci_lo: float + exp_theoretical_ci_hi: float + + +def group_by(trades: Iterable[Trade], field_name: str) -> dict[str, list[Trade]]: + out: dict[str, list[Trade]] = {} + for t in trades: + key = getattr(t, field_name, "") or "(blank)" + out.setdefault(key, []).append(t) + return out + + +def compute_group_stats( + trades: list[Trade], + *, + label: str, + bootstrap_iterations: int = 2000, + seed: int | None = None, +) -> GroupStats: + wins, n_resolved, wr = win_rate(trades) + wr_lo, wr_hi = wilson_ci(wins, n_resolved) + + pl_m_vals = [t.pl_marius for t in trades if t.pl_marius is not None] + exp_m = (sum(pl_m_vals) / len(pl_m_vals)) if pl_m_vals else 0.0 + exp_m_lo, exp_m_hi = bootstrap_ci( + pl_m_vals, iterations=bootstrap_iterations, seed=seed + ) + + pl_t_vals = [t.pl_theoretical for t in trades if not t.is_pending] + exp_t = (sum(pl_t_vals) / len(pl_t_vals)) if pl_t_vals else 0.0 + exp_t_lo, exp_t_hi = bootstrap_ci( + pl_t_vals, + iterations=bootstrap_iterations, + seed=None if seed is None else seed + 1, + ) + + return GroupStats( + key=label, + n_total=len(trades), + n_resolved=n_resolved, + wins=wins, + wr=wr, + wr_ci_lo=wr_lo, + wr_ci_hi=wr_hi, + exp_marius=exp_m, + exp_marius_ci_lo=exp_m_lo, + exp_marius_ci_hi=exp_m_hi, + exp_theoretical=exp_t, + exp_theoretical_ci_lo=exp_t_lo, + exp_theoretical_ci_hi=exp_t_hi, + ) + + +# --------------------------------------------------------------------------- +# Calibration mode +# 
--------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class CalibrationReport: + pairs: int + field_mismatches: dict[str, int] + total_comparisons: int + + @property + def overall_mismatch_rate(self) -> float: + if self.total_comparisons == 0: + return 0.0 + total = sum(self.field_mismatches.values()) + return total / self.total_comparisons + + +def _normalise_for_compare(field_name: str, value: str) -> str: + s = (value or "").strip() + if field_name in {"entry", "sl", "tp0", "tp1", "tp2"}: + try: + return f"{float(s):.4f}" + except ValueError: + return s + return s + + +def calibration_mismatch( + trades: Iterable[Trade], + *, + fields: tuple[str, ...] = CORE_CALIBRATION_FIELDS, +) -> CalibrationReport: + """Pair ``manual_calibration`` and ``vision_calibration`` rows by + ``screenshot_file``, then count mismatches per ``fields``. + + Returns a :class:`CalibrationReport`. Unpaired calibration rows are + silently ignored — they cannot contribute to a comparison. 
+ """ + manual: dict[str, Trade] = {} + vision: dict[str, Trade] = {} + for t in trades: + if t.source == "manual_calibration": + manual[t.screenshot_file] = t + elif t.source == "vision_calibration": + vision[t.screenshot_file] = t + + paired_files = sorted(set(manual) & set(vision)) + field_mismatches: dict[str, int] = {f: 0 for f in fields} + for f in paired_files: + m = manual[f] + v = vision[f] + for fld in fields: + mv = _normalise_for_compare(fld, m.raw.get(fld, "")) + vv = _normalise_for_compare(fld, v.raw.get(fld, "")) + if mv != vv: + field_mismatches[fld] += 1 + + total_comparisons = len(paired_files) * len(fields) + return CalibrationReport( + pairs=len(paired_files), + field_mismatches=field_mismatches, + total_comparisons=total_comparisons, + ) + + +# --------------------------------------------------------------------------- +# Reporting +# --------------------------------------------------------------------------- + + +def _fmt_pct(p: float) -> str: + return f"{100.0 * p:5.1f}%" + + +def _fmt_r(x: float) -> str: + return f"{x:+.3f}R" + + +def _fmt_stats_row(s: GroupStats) -> str: + return ( + f"{s.key:<14} N={s.n_total:>3} (resolved {s.n_resolved:>3}) " + f"WR={_fmt_pct(s.wr)} [{_fmt_pct(s.wr_ci_lo)}, {_fmt_pct(s.wr_ci_hi)}] " + f"E_marius={_fmt_r(s.exp_marius)} " + f"[{_fmt_r(s.exp_marius_ci_lo)}, {_fmt_r(s.exp_marius_ci_hi)}] " + f"E_theor={_fmt_r(s.exp_theoretical)}" + ) + + +def format_report( + trades: list[Trade], + *, + bootstrap_iterations: int = 2000, + seed: int | None = None, +) -> str: + """Render the main stats report. + + Only ``source in {vision, manual}`` rows are included in the WR / + expectancy computations; calibration rows are reported separately via + ``--calibration``. 
+ """ + backtest = [t for t in trades if t.source in BACKTEST_SOURCES] + lines: list[str] = [] + lines.append("=== M2D Backtest Stats ===") + lines.append(f"Backtest rows: {len(backtest)} (calibration excluded)") + lines.append("") + + if not backtest: + lines.append("(no backtest trades yet)") + return "\n".join(lines) + + overall = compute_group_stats( + backtest, + label="OVERALL", + bootstrap_iterations=bootstrap_iterations, + seed=seed, + ) + lines.append("-- Overall --") + lines.append(_fmt_stats_row(overall)) + lines.append("") + + def _emit_group(title: str, field_name: str, key_order: list[str] | None = None) -> None: + lines.append(f"-- By {title} --") + groups = group_by(backtest, field_name) + keys = key_order if key_order is not None else sorted(groups) + for k in keys: + if k not in groups: + continue + sub_seed = None if seed is None else seed + abs(hash(k)) % 10_000 + s = compute_group_stats( + groups[k], + label=k, + bootstrap_iterations=bootstrap_iterations, + seed=sub_seed, + ) + lines.append(_fmt_stats_row(s)) + lines.append("") + + _emit_group( + "Set", + "set", + key_order=["A1", "A2", "A3", "B", "C", "D", "Other"], + ) + _emit_group("Instrument", "instrument") + lines.append( + "[!] By calitate — descriptor only (post-outcome, biased; do not use " + "as a GO LIVE filter — see STOPPING_RULE.md §3)." 
+ ) + _emit_group( + "calitate", + "calitate", + key_order=["Clară", "Mai mare ca impuls", "Slabă", "n/a"], + ) + + return "\n".join(lines).rstrip() + "\n" + + +def format_calibration_report(trades: list[Trade]) -> str: + cal = calibration_mismatch(trades) + lines: list[str] = [] + lines.append("=== Calibration P4 gate ===") + lines.append(f"Paired screenshots (manual ∩ vision): {cal.pairs}") + if cal.pairs == 0: + lines.append("(no calibration pairs yet)") + return "\n".join(lines) + "\n" + + lines.append("") + lines.append(f"{'field':<14} mismatches / pairs rate") + for fld in CORE_CALIBRATION_FIELDS: + m = cal.field_mismatches.get(fld, 0) + rate = (m / cal.pairs) if cal.pairs else 0.0 + lines.append(f"{fld:<14} {m:>3} / {cal.pairs:<3} {_fmt_pct(rate)}") + lines.append("") + lines.append( + f"Overall mismatch rate: {_fmt_pct(cal.overall_mismatch_rate)} " + f"({sum(cal.field_mismatches.values())} of {cal.total_comparisons} comparisons)" + ) + threshold = 0.10 + verdict = "PASS" if cal.overall_mismatch_rate <= threshold else "FAIL" + lines.append(f"P4 gate (<= 10%): {verdict}") + return "\n".join(lines) + "\n" + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="stats", + description="Backtest statistics for data/jurnal.csv", + ) + parser.add_argument( + "--csv", + type=Path, + default=Path("data/jurnal.csv"), + help="Path to the jurnal CSV (default: data/jurnal.csv).", + ) + parser.add_argument( + "--calibration", + action="store_true", + help="Show P4 calibration mismatch report instead of backtest stats.", + ) + parser.add_argument( + "--bootstrap-iterations", + type=int, + default=2000, + help="Bootstrap iterations for expectancy CI (default: 2000).", + ) + parser.add_argument( + "--seed", + type=int, + default=None, + help="Seed for the bootstrap RNG 
(set for deterministic output).", + ) + args = parser.parse_args(argv) + + trades = load_trades(args.csv) + if args.calibration: + out = format_calibration_report(trades) + else: + out = format_report( + trades, + bootstrap_iterations=args.bootstrap_iterations, + seed=args.seed, + ) + # Force UTF-8 on stdout: the report contains diacritics ("Clară", "Slabă") + # and a console codepage like cp1252 would crash on those. + try: + sys.stdout.reconfigure(encoding="utf-8") # type: ignore[attr-defined] + except (AttributeError, OSError): + pass + sys.stdout.write(out) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_append_row.py b/tests/test_append_row.py index 3859ca0..7139a30 100644 --- a/tests/test_append_row.py +++ b/tests/test_append_row.py @@ -1,26 +1,26 @@ -"""Tests for scripts/append_row.py.""" +"""Tests for scripts/append_row.py — append_extraction pipeline.""" from __future__ import annotations import csv import json +import re import sys +from datetime import datetime from pathlib import Path import pytest -from pydantic import ValidationError +import yaml sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from scripts.append_row import ( # noqa: E402 CSV_COLUMNS, VALID_SOURCES, - append_row, - append_row_from_json, - build_row, - read_rows, + ZI_RO_MAP, + append_extraction, + csv_columns, ) -from scripts.vision_schema import parse_extraction_dict # noqa: E402 REPO_ROOT = Path(__file__).resolve().parent.parent @@ -29,12 +29,12 @@ META_PATH = REPO_ROOT / "data" / "_meta.yaml" # --------------------------------------------------------------------------- -# fixtures / payload helpers +# helpers / fixtures # --------------------------------------------------------------------------- def _buy_payload(**overrides) -> dict: - # 2026-05-13 14:23 UTC == 17:23 RO (EEST, Wed) → Set A2. + # 2026-05-13 14:23 UTC == 17:23 RO (EEST, Wed) → set A2, zi=Mi. 
base = { "screenshot_file": "dia-2026-05-13-1.png", "data": "2026-05-13", @@ -61,198 +61,227 @@ def _buy_payload(**overrides) -> dict: return base +def _write_payload(tmp_path: Path, name: str, **overrides) -> Path: + p = tmp_path / name + p.write_text(json.dumps(_buy_payload(**overrides)), encoding="utf-8") + return p + + +def _read_rows(csv_path: Path) -> list[dict[str, str]]: + with csv_path.open("r", encoding="utf-8", newline="") as fh: + return list(csv.DictReader(fh)) + + @pytest.fixture def csv_path(tmp_path: Path) -> Path: - return tmp_path / "trades.csv" + return tmp_path / "jurnal.csv" # --------------------------------------------------------------------------- -# build_row — computed fields +# schema / column layout # --------------------------------------------------------------------------- -class TestBuildRow: - def setup_method(self) -> None: - import yaml - with META_PATH.open("r", encoding="utf-8") as fh: - self.meta = yaml.safe_load(fh) - from scripts.calendar_parse import load_calendar - self.calendar = load_calendar(CALENDAR_PATH) - - def test_happy_path_computed_fields(self) -> None: - extr = parse_extraction_dict(_buy_payload()) - row = build_row(extr, "manual", self.meta, self.calendar) - # 14:23 UTC on 2026-05-13 = 17:23 RO (EEST), Wed → A2 - assert row["ora_ro"] == "17:23" - assert row["zi"] == "Wed" - assert row["set"] == "A2" - # pl_marius for TP0->TP1 with be_moved=True is +0.50R - assert float(row["pl_marius"]) == pytest.approx(0.50) - # pl_theoretical for max_reached=TP1 is 0.333 - assert float(row["pl_theoretical"]) == pytest.approx(0.333) - # version stamps copied from meta - assert row["indicator_version"] == str(self.meta["indicator_version"]) - assert row["pl_overlay_version"] == str(self.meta["pl_overlay_version"]) - assert row["csv_schema_version"] == str(self.meta["csv_schema_version"]) - - def test_pending_overlay_is_blank(self) -> None: - extr = parse_extraction_dict( - _buy_payload(outcome_path="pending", max_reached="TP0") 
- ) - row = build_row(extr, "vision", self.meta, self.calendar) - # pl_marius returns None for pending → empty string in CSV - assert row["pl_marius"] == "" - # pl_theoretical always concrete - assert row["pl_theoretical"] != "" - - def test_invalid_source_rejected(self) -> None: - extr = parse_extraction_dict(_buy_payload()) - with pytest.raises(ValueError): - build_row(extr, "auto_magic", self.meta, self.calendar) - - def test_all_valid_sources_accepted(self) -> None: - extr = parse_extraction_dict(_buy_payload()) - for s in VALID_SOURCES: - row = build_row(extr, s, self.meta, self.calendar) - assert row["source"] == s +def test_csv_columns_canonical_29() -> None: + cols = csv_columns() + assert len(cols) == 29 + assert cols[0] == "id" + assert cols[-1] == "note" + assert cols == list(CSV_COLUMNS) # --------------------------------------------------------------------------- -# append_row — happy path, dedup, atomic writes +# core tests as specified in task #9 # --------------------------------------------------------------------------- -class TestAppendRow: - def test_happy_path_writes_header_and_row(self, csv_path: Path) -> None: - extr = parse_extraction_dict(_buy_payload()) - row = append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) - assert csv_path.exists() +def test_happy_path(tmp_path: Path, csv_path: Path) -> None: + j = _write_payload(tmp_path, "t.json") + result = append_extraction( + j, "vision", csv_path, META_PATH, CALENDAR_PATH + ) + assert result["status"] == "ok", result + assert result["reason"] == "" + assert result["id"] == 1 - with csv_path.open("r", encoding="utf-8", newline="") as fh: - reader = csv.DictReader(fh) - assert reader.fieldnames == list(CSV_COLUMNS) - rows = list(reader) - assert len(rows) == 1 - assert rows[0]["screenshot_file"] == row["screenshot_file"] - assert rows[0]["set"] == "A2" - assert rows[0]["source"] == "manual" + rows = _read_rows(csv_path) + assert len(rows) == 1 + r = rows[0] + assert r["id"] == "1" + 
assert r["screenshot_file"] == "dia-2026-05-13-1.png" + assert r["source"] == "vision" + assert r["data"] == "2026-05-13" + assert r["zi"] == "Mi" + assert r["ora_ro"] == "17:23" + assert r["ora_utc"] == "14:23" + assert r["set"] == "A2" + assert r["instrument"] == "DIA" + assert r["directie"] == "Buy" + assert r["be_moved"] == "True" - def test_two_distinct_rows(self, csv_path: Path) -> None: - e1 = parse_extraction_dict(_buy_payload(screenshot_file="a.png")) - e2 = parse_extraction_dict(_buy_payload(screenshot_file="b.png")) - append_row(e1, "manual", csv_path, META_PATH, CALENDAR_PATH) - append_row(e2, "manual", csv_path, META_PATH, CALENDAR_PATH) - rows = read_rows(csv_path) - assert len(rows) == 2 - assert {r["screenshot_file"] for r in rows} == {"a.png", "b.png"} - def test_dedup_raises(self, csv_path: Path) -> None: - extr = parse_extraction_dict(_buy_payload()) - append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) - with pytest.raises(ValueError, match="duplicate"): - append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) - # CSV still contains exactly the one row - assert len(read_rows(csv_path)) == 1 +def test_pl_calc_overlay(tmp_path: Path, csv_path: Path) -> None: + """outcome_path=TP0->TP1, max_reached=TP1 → pl_marius=0.5, pl_theoretical=0.333.""" + j = _write_payload(tmp_path, "t.json") + result = append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert result["status"] == "ok" + r = _read_rows(csv_path)[0] + assert float(r["pl_marius"]) == pytest.approx(0.50) + assert float(r["pl_theoretical"]) == pytest.approx(0.333) - def test_dedup_skip(self, csv_path: Path) -> None: - extr = parse_extraction_dict(_buy_payload()) - first = append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) - # Mutate the extraction; the existing row should be returned untouched. 
- extr2 = parse_extraction_dict(_buy_payload(note="changed")) - existing = append_row( - extr2, "manual", csv_path, META_PATH, CALENDAR_PATH, on_duplicate="skip" - ) - assert existing["note"] == first["note"] == "" - assert len(read_rows(csv_path)) == 1 - def test_calibration_coexistence(self, csv_path: Path) -> None: - """manual_calibration + vision_calibration on the SAME screenshot must coexist.""" - extr = parse_extraction_dict(_buy_payload()) - append_row(extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH) - # Vision leg may differ slightly — change entry by 0.1, still valid. - extr_vision = parse_extraction_dict( - _buy_payload(entry=400.1, confidence="medium") - ) - append_row( - extr_vision, "vision_calibration", csv_path, META_PATH, CALENDAR_PATH - ) +def test_dedup_same_source(tmp_path: Path, csv_path: Path) -> None: + j = _write_payload(tmp_path, "t.json") + r1 = append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + r2 = append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert r1["status"] == "ok" + assert r2["status"] == "rejected" + assert "duplicate" in r2["reason"].lower() + assert r2["id"] is None + assert r2["row"] is None + assert len(_read_rows(csv_path)) == 1 - rows = read_rows(csv_path) - assert len(rows) == 2 - sources = {r["source"] for r in rows} - assert sources == {"manual_calibration", "vision_calibration"} - # Same screenshot, different source ⇒ no dedup collision. 
- files = {r["screenshot_file"] for r in rows} - assert files == {extr.screenshot_file} - def test_calibration_duplicate_same_source_rejected( - self, csv_path: Path - ) -> None: - extr = parse_extraction_dict(_buy_payload()) - append_row(extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH) - with pytest.raises(ValueError, match="duplicate"): - append_row( - extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH - ) +def test_dedup_different_source_ok(tmp_path: Path, csv_path: Path) -> None: + """Same screenshot_file + different source ⇒ both rows accepted.""" + j = _write_payload(tmp_path, "t.json") + r1 = append_extraction( + j, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH + ) + r2 = append_extraction( + j, "vision_calibration", csv_path, META_PATH, CALENDAR_PATH + ) + assert r1["status"] == "ok" + assert r2["status"] == "ok" + rows = _read_rows(csv_path) + assert len(rows) == 2 + assert {r["source"] for r in rows} == {"manual_calibration", "vision_calibration"} + # Distinct sequential ids. 
+ assert {r["id"] for r in rows} == {"1", "2"} + + +def test_invalid_pydantic_rejected(tmp_path: Path, csv_path: Path) -> None: + """entry == sl is rejected by pydantic; no CSV is written.""" + j = _write_payload(tmp_path, "bad.json", entry=399.0, sl=399.0) + result = append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert result["status"] == "rejected" + assert "validation" in result["reason"].lower() + assert not csv_path.exists() + + +def test_missing_json_file(tmp_path: Path, csv_path: Path) -> None: + missing = tmp_path / "ghost.json" + result = append_extraction( + missing, "vision", csv_path, META_PATH, CALENDAR_PATH + ) + assert result["status"] == "rejected" + assert "not found" in result["reason"].lower() + assert not csv_path.exists() + + +def test_id_increments(tmp_path: Path, csv_path: Path) -> None: + paths = [ + _write_payload(tmp_path, "a.json", screenshot_file="a.png"), + _write_payload(tmp_path, "b.json", screenshot_file="b.png"), + _write_payload(tmp_path, "c.json", screenshot_file="c.png"), + ] + ids = [] + for p in paths: + r = append_extraction(p, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert r["status"] == "ok" + ids.append(r["id"]) + assert ids == [1, 2, 3] + csv_ids = [int(r["id"]) for r in _read_rows(csv_path)] + assert csv_ids == [1, 2, 3] + + +def test_set_a2(tmp_path: Path, csv_path: Path) -> None: + """Wed 2026-05-13 14:30 UTC → 17:30 RO → A2 sweet spot.""" + j = _write_payload(tmp_path, "t.json", ora_utc="14:30") + r = append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert r["status"] == "ok" + row = _read_rows(csv_path)[0] + assert row["ora_ro"] == "17:30" + assert row["zi"] == "Mi" + assert row["set"] == "A2" + + +def test_set_c_fomc(tmp_path: Path, csv_path: Path) -> None: + """2026-04-29 18:35 UTC == 21:35 RO (FOMC Powell Press window) → Set C.""" + j = _write_payload( + tmp_path, + "t.json", + data="2026-04-29", + ora_utc="18:35", + screenshot_file="fomc-apr.png", + ) + r = 
append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert r["status"] == "ok" + row = _read_rows(csv_path)[0] + assert row["ora_ro"] == "21:35" + assert row["set"] == "C" + + +def test_versions_stamped(tmp_path: Path, csv_path: Path) -> None: + j = _write_payload(tmp_path, "t.json") + append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + row = _read_rows(csv_path)[0] + meta = yaml.safe_load(META_PATH.read_text(encoding="utf-8")) + assert row["indicator_version"] == str(meta["indicator_version"]) + assert row["pl_overlay_version"] == str(meta["pl_overlay_version"]) + assert row["csv_schema_version"] == str(meta["csv_schema_version"]) + + +def test_extracted_at_format(tmp_path: Path, csv_path: Path) -> None: + j = _write_payload(tmp_path, "t.json") + append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + val = _read_rows(csv_path)[0]["extracted_at"] + # ISO 8601 UTC with trailing 'Z': YYYY-MM-DDTHH:MM:SSZ + assert re.match(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$", val), val + # Round-trip through datetime.fromisoformat after dropping the Z. + parsed = datetime.fromisoformat(val[:-1]) + assert parsed.year >= 2026 # --------------------------------------------------------------------------- -# Cross-field invalid input +# additional safety nets # --------------------------------------------------------------------------- -class TestInvalidInput: - def test_buy_with_inverted_tp_rejected_before_append( - self, csv_path: Path - ) -> None: - # tp1 < tp0 violates Buy ordering: caught at validation, not by append_row. 
- with pytest.raises(ValidationError): - parse_extraction_dict( - _buy_payload(tp0=401.0, tp1=400.5, tp2=402.0) - ) - assert not csv_path.exists() # nothing written - - def test_outcome_path_sl_with_tp1_max_rejected(self, csv_path: Path) -> None: - with pytest.raises(ValidationError): - parse_extraction_dict( - _buy_payload(outcome_path="SL", max_reached="TP1") - ) - assert not csv_path.exists() - - def test_append_row_from_json_invalid_payload( - self, tmp_path: Path, csv_path: Path - ) -> None: - bad = tmp_path / "bad.json" - payload = _buy_payload(directie="Long") # invalid Literal - bad.write_text(json.dumps(payload), encoding="utf-8") - with pytest.raises(ValidationError): - append_row_from_json( - bad, "vision", csv_path, META_PATH, CALENDAR_PATH - ) - assert not csv_path.exists() +def test_invalid_source_rejected(tmp_path: Path, csv_path: Path) -> None: + j = _write_payload(tmp_path, "t.json") + r = append_extraction(j, "auto_magic", csv_path, META_PATH, CALENDAR_PATH) + assert r["status"] == "rejected" + assert "source" in r["reason"].lower() + assert not csv_path.exists() -# --------------------------------------------------------------------------- -# Atomic write: no temp file remains on disk -# --------------------------------------------------------------------------- +def test_all_valid_sources_accepted(tmp_path: Path, csv_path: Path) -> None: + for i, src in enumerate(sorted(VALID_SOURCES)): + j = _write_payload(tmp_path, f"t{i}.json", screenshot_file=f"s{i}.png") + r = append_extraction(j, src, csv_path, META_PATH, CALENDAR_PATH) + assert r["status"] == "ok", (src, r) + rows = _read_rows(csv_path) + assert {r["source"] for r in rows} == set(VALID_SOURCES) -class TestAtomicWrite: - def test_no_temp_file_left_behind(self, csv_path: Path) -> None: - extr = parse_extraction_dict(_buy_payload()) - append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) - leftovers = [ - p for p in csv_path.parent.iterdir() if p.name.endswith(".tmp") - ] - assert 
leftovers == [] +def test_atomic_write_leaves_no_tmp(tmp_path: Path, csv_path: Path) -> None: + j = _write_payload(tmp_path, "t.json") + append_extraction(j, "vision", csv_path, META_PATH, CALENDAR_PATH) + leftovers = [p for p in csv_path.parent.iterdir() if p.name.endswith(".tmp")] + assert leftovers == [] - def test_append_row_from_json_roundtrip( - self, tmp_path: Path, csv_path: Path - ) -> None: - good = tmp_path / "good.json" - good.write_text(json.dumps(_buy_payload()), encoding="utf-8") - row = append_row_from_json( - good, "vision", csv_path, META_PATH, CALENDAR_PATH - ) - assert row["source"] == "vision" - assert read_rows(csv_path)[0]["screenshot_file"] == row["screenshot_file"] + +def test_zi_ro_map_covers_all_weekdays() -> None: + """Internal sanity: the Romanian-day map covers all 7 short weekday names.""" + assert set(ZI_RO_MAP.keys()) == {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"} + assert set(ZI_RO_MAP.values()) == {"Lu", "Ma", "Mi", "Jo", "Vi", "Sa", "Du"} + + +def test_malformed_json_rejected(tmp_path: Path, csv_path: Path) -> None: + bad = tmp_path / "broken.json" + bad.write_text("{not valid json", encoding="utf-8") + r = append_extraction(bad, "vision", csv_path, META_PATH, CALENDAR_PATH) + assert r["status"] == "rejected" + assert "validation" in r["reason"].lower() or "json" in r["reason"].lower() + assert not csv_path.exists() diff --git a/tests/test_regenerate_md.py b/tests/test_regenerate_md.py new file mode 100644 index 0000000..a9f0f6b --- /dev/null +++ b/tests/test_regenerate_md.py @@ -0,0 +1,208 @@ +"""Tests for scripts/regenerate_md.py.""" + +from __future__ import annotations + +import csv +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from scripts.append_row import csv_columns # noqa: E402 +from scripts.regenerate_md import MD_COLUMNS, regenerate_md # noqa: E402 + + +def _row(**overrides: str) -> dict[str, str]: + base = { + "id": "1", + 
"screenshot_file": "2026-05-13_dia_5min.png", + "source": "vision", + "data": "2026-05-13", + "zi": "Mi", + "ora_ro": "17:23", + "ora_utc": "14:23", + "instrument": "DIA", + "directie": "long", + "tf_mare": "5min", + "tf_mic": "1min", + "calitate": "Clară", + "entry": "497.42", + "sl": "496.80", + "tp0": "497.67", + "tp1": "497.79", + "tp2": "498.04", + "risc_pct": "0.50", + "outcome_path": "TP0→TP1", + "max_reached": "TP1", + "be_moved": "true", + "pl_marius": "0.5000", + "pl_theoretical": "0.3330", + "set": "A2", + "indicator_version": "1", + "pl_overlay_version": "1", + "csv_schema_version": "1", + "extracted_at": "2026-05-13T14:30:00Z", + "note": "", + } + base.update(overrides) + return base + + +def _write_csv( + path: Path, + rows: list[dict[str, str]], + extra_columns: list[str] | None = None, +) -> None: + fieldnames = csv_columns() + if extra_columns: + fieldnames = fieldnames + extra_columns + with path.open("w", encoding="utf-8", newline="") as fh: + writer = csv.DictWriter(fh, fieldnames=fieldnames) + writer.writeheader() + for r in rows: + writer.writerow({k: r.get(k, "") for k in fieldnames}) + + +def _data_lines(md_text: str) -> list[str]: + header_prefix = "| " + MD_COLUMNS[0] + " | " + MD_COLUMNS[1] + return [ + ln + for ln in md_text.splitlines() + if ln.startswith("|") + and not ln.startswith(header_prefix) + and not ln.startswith("|---") + ] + + +def test_empty_csv_placeholder(tmp_path: Path) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + _write_csv(csv_p, []) + + n = regenerate_md(csv_p, md_p) + + assert n == 0 + content = md_p.read_text(encoding="utf-8") + assert "# Jurnal M2D (auto-generated)" in content + assert "Niciun trade încă" in content + assert "| # |" not in content + + +def test_missing_csv_placeholder(tmp_path: Path) -> None: + csv_p = tmp_path / "does_not_exist.csv" + md_p = tmp_path / "jurnal.md" + + n = regenerate_md(csv_p, md_p) + + assert n == 0 + content = md_p.read_text(encoding="utf-8") + 
assert "Niciun trade încă" in content + assert md_p.exists() + + +def test_single_row_format(tmp_path: Path) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + _write_csv(csv_p, [_row()]) + + n = regenerate_md(csv_p, md_p) + + assert n == 1 + content = md_p.read_text(encoding="utf-8") + assert "# Jurnal M2D (auto-generated from data/jurnal.csv)" in content + assert "Rows: 1" in content + header_line = "| " + " | ".join(MD_COLUMNS) + " |" + assert header_line in content + rows = _data_lines(content) + assert len(rows) == 1 + cells = [c.strip() for c in rows[0].strip("|").split("|")] + assert cells[0] == "1" + assert cells[1] == "2026-05-13" + assert cells[2] == "Mi" + assert cells[3] == "17:23" + assert cells[4] == "A2" + assert cells[5] == "DIA" + assert cells[6] == "Buy" + assert cells[7] == "Clară" + assert cells[13] == "TP0→TP1" + assert cells[14] == "+0.50" + assert cells[15] == "+0.33" + assert cells[16] == "vision" + + +def test_three_rows(tmp_path: Path) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + rows = [ + _row(id="3", data="2026-05-15", pl_marius="-1.0000"), + _row(id="1", data="2026-05-13"), + _row(id="2", data="2026-05-14", pl_marius="0.2000"), + ] + _write_csv(csv_p, rows) + + n = regenerate_md(csv_p, md_p) + + assert n == 3 + content = md_p.read_text(encoding="utf-8") + assert "Rows: 3" in content + data = _data_lines(content) + assert len(data) == 3 + assert "| 1 | 2026-05-13 |" in data[0] + assert "| 2 | 2026-05-14 |" in data[1] + assert "| 3 | 2026-05-15 |" in data[2] + + +def test_pending_pl_displayed(tmp_path: Path) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + _write_csv(csv_p, [_row(pl_marius="", pl_theoretical="")]) + + n = regenerate_md(csv_p, md_p) + + assert n == 1 + content = md_p.read_text(encoding="utf-8") + rows = _data_lines(content) + cells = [c.strip() for c in rows[0].strip("|").split("|")] + assert cells[14] == "pending" + assert cells[15] == 
"pending" + + +def test_unknown_column_graceful( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + _write_csv(csv_p, [_row()], extra_columns=["extra_field"]) + + n = regenerate_md(csv_p, md_p) + + assert n == 1 + content = md_p.read_text(encoding="utf-8") + assert "Rows: 1" in content + captured = capsys.readouterr() + assert "unknown CSV columns ignored" in captured.err + assert "extra_field" in captured.err + + +def test_atomic_write_no_tmp_leftover(tmp_path: Path) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + _write_csv(csv_p, [_row()]) + + regenerate_md(csv_p, md_p) + + leftovers = list(tmp_path.glob("*.tmp")) + assert leftovers == [] + assert md_p.exists() + + +def test_rows_count_returned(tmp_path: Path) -> None: + csv_p = tmp_path / "jurnal.csv" + md_p = tmp_path / "jurnal.md" + _write_csv(csv_p, [_row(id=str(i)) for i in range(1, 6)]) + + n = regenerate_md(csv_p, md_p) + + assert n == 5 diff --git a/tests/test_stats.py b/tests/test_stats.py new file mode 100644 index 0000000..0de1d07 --- /dev/null +++ b/tests/test_stats.py @@ -0,0 +1,469 @@ +"""Tests for scripts/stats.py.""" + +from __future__ import annotations + +import csv +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from scripts.append_row import CSV_COLUMNS # noqa: E402 +from scripts.stats import ( # noqa: E402 + BACKTEST_SOURCES, + CORE_CALIBRATION_FIELDS, + bootstrap_ci, + calibration_mismatch, + compute_group_stats, + expectancy, + format_calibration_report, + format_report, + group_by, + load_trades, + main, + win_rate, + wilson_ci, +) + + +# --------------------------------------------------------------------------- +# Synthetic CSV fixture: 30 trades +# --------------------------------------------------------------------------- + + +def _base_row(**overrides) -> dict[str, str]: + base = { + "id": "0", + 
 "screenshot_file": "",
+ "source": "vision",
+ "data": "2026-05-13",
+ "zi": "Mi",
+ "ora_ro": "17:30",
+ "ora_utc": "14:30",
+ "instrument": "DIA",
+ "directie": "Buy",
+ "tf_mare": "5min",
+ "tf_mic": "1min",
+ "calitate": "Clară",
+ "entry": "400.0",
+ "sl": "399.0",
+ "tp0": "400.5",
+ "tp1": "401.0",
+ "tp2": "402.0",
+ "risc_pct": "0.25",
+ "outcome_path": "TP0→TP1",
+ "max_reached": "TP1",
+ "be_moved": "True",
+ "pl_marius": "0.5000",
+ "pl_theoretical": "0.3330",
+ "set": "A2",
+ "indicator_version": "v-2026-05",
+ "pl_overlay_version": "marius-v1",
+ "csv_schema_version": "1",
+ "extracted_at": "2026-05-13T10:00:00Z",
+ "note": "",
+ }
+ base.update({k: str(v) for k, v in overrides.items()})
+ return base
+
+
+def _write_csv(path: Path, rows: list[dict[str, str]]) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ with path.open("w", encoding="utf-8", newline="") as fh:
+ w = csv.DictWriter(fh, fieldnames=list(CSV_COLUMNS))
+ w.writeheader()
+ for r in rows:
+ w.writerow({k: r.get(k, "") for k in CSV_COLUMNS})
+
+
+def _synthetic_30(tmp_path: Path) -> Path:
+ """30 vision-source trades engineered for known stats.
+
+ Layout (by Set):
+ - A1: 10 trades — 6 wins TP0->TP1 (+0.5), 4 losses SL (-1.0) → WR 60%
+ - A2: 10 trades — 7 wins TP0->TP1 (+0.5), 3 losses SL (-1.0) → WR 70%
+ - A3: 10 trades — 4 wins TP0->TP1 (+0.5), 6 losses SL (-1.0) → WR 40%
+
+ Overall: 17 wins / 30, WR ≈ 56.67%. 
+ """ + rows: list[dict[str, str]] = [] + rid = 0 + + def add(set_label: str, n_win: int, n_loss: int, calitate: str = "Clară") -> None: + nonlocal rid + for _ in range(n_win): + rid += 1 + rows.append( + _base_row( + id=rid, + screenshot_file=f"win-{rid}.png", + set=set_label, + calitate=calitate, + outcome_path="TP0→TP1", + max_reached="TP1", + be_moved="True", + pl_marius="0.5000", + pl_theoretical="0.3330", + ) + ) + for _ in range(n_loss): + rid += 1 + rows.append( + _base_row( + id=rid, + screenshot_file=f"loss-{rid}.png", + set=set_label, + calitate=calitate, + outcome_path="SL", + max_reached="SL_first", + be_moved="False", + pl_marius="-1.0000", + pl_theoretical="-1.0000", + ) + ) + + add("A1", 6, 4) + add("A2", 7, 3) + add("A3", 4, 6) + + path = tmp_path / "jurnal.csv" + _write_csv(path, rows) + return path + + +# --------------------------------------------------------------------------- +# Wilson CI — reference values +# --------------------------------------------------------------------------- + + +class TestWilsonCI: + def test_n_zero(self) -> None: + assert wilson_ci(0, 0) == (0.0, 0.0) + + def test_50pct_at_n40(self) -> None: + lo, hi = wilson_ci(20, 40) + assert lo == pytest.approx(0.3519927879709976, abs=1e-9) + assert hi == pytest.approx(0.6480072120290024, abs=1e-9) + + def test_55pct_at_n40(self) -> None: + lo, hi = wilson_ci(22, 40) + assert lo == pytest.approx(0.3982882988844078, abs=1e-9) + assert hi == pytest.approx(0.6929492471905531, abs=1e-9) + + def test_55pct_at_n100(self) -> None: + # Larger N tightens the CI; lower bound rises above 45%. 
+ lo, hi = wilson_ci(55, 100) + assert lo == pytest.approx(0.4524442703164345, abs=1e-9) + assert hi == pytest.approx(0.6438562489359655, abs=1e-9) + assert lo > 0.45 # STOPPING_RULE GO-LIVE gate + + def test_zero_wins(self) -> None: + lo, hi = wilson_ci(0, 10) + assert lo == pytest.approx(0.0, abs=1e-12) + assert hi == pytest.approx(0.2775401687666165, abs=1e-9) + + def test_all_wins(self) -> None: + lo, hi = wilson_ci(10, 10) + assert lo == pytest.approx(0.7224598312333834, abs=1e-9) + assert hi == pytest.approx(1.0, abs=1e-12) + + def test_wins_out_of_range(self) -> None: + with pytest.raises(ValueError): + wilson_ci(11, 10) + with pytest.raises(ValueError): + wilson_ci(-1, 10) + + +# --------------------------------------------------------------------------- +# Bootstrap CI — determinism + sanity +# --------------------------------------------------------------------------- + + +class TestBootstrapCI: + def test_deterministic_with_seed(self) -> None: + vals = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] + lo1, hi1 = bootstrap_ci(vals, iterations=500, seed=42) + lo2, hi2 = bootstrap_ci(vals, iterations=500, seed=42) + assert (lo1, hi1) == (lo2, hi2) + + def test_different_seed_different_result(self) -> None: + vals = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] + r1 = bootstrap_ci(vals, iterations=500, seed=1) + r2 = bootstrap_ci(vals, iterations=500, seed=2) + assert r1 != r2 + + def test_brackets_the_mean(self) -> None: + vals = [0.5, -1.0, 0.5, 0.5, -1.0, 0.2, -0.3, 0.5, -1.0, 0.5] * 5 + mean = sum(vals) / len(vals) + lo, hi = bootstrap_ci(vals, iterations=1000, seed=7) + assert lo <= mean <= hi + + def test_empty_input(self) -> None: + assert bootstrap_ci([], iterations=100, seed=0) == (0.0, 0.0) + + def test_single_value(self) -> None: + lo, hi = bootstrap_ci([0.5], iterations=100, seed=0) + # No variance with n=1: short-circuited to (mean, mean). 
+ assert lo == pytest.approx(0.5) + assert hi == pytest.approx(0.5) + + +# --------------------------------------------------------------------------- +# Loading + group stats on the 30-trade fixture +# --------------------------------------------------------------------------- + + +class TestSyntheticFixture: + def test_load_30(self, tmp_path: Path) -> None: + path = _synthetic_30(tmp_path) + trades = load_trades(path) + assert len(trades) == 30 + assert all(t.source == "vision" for t in trades) + + def test_overall_wr(self, tmp_path: Path) -> None: + trades = load_trades(_synthetic_30(tmp_path)) + wins, n, wr = win_rate(trades) + assert wins == 17 + assert n == 30 + assert wr == pytest.approx(17 / 30) + + def test_overall_expectancy(self, tmp_path: Path) -> None: + trades = load_trades(_synthetic_30(tmp_path)) + # 17 wins * 0.5 + 13 losses * -1.0 = 8.5 - 13.0 = -4.5 → mean = -0.15 + assert expectancy(trades) == pytest.approx(-0.15, abs=1e-9) + + def test_per_set_wr(self, tmp_path: Path) -> None: + trades = load_trades(_synthetic_30(tmp_path)) + by_set = group_by(trades, "set") + wr_a1 = win_rate(by_set["A1"])[2] + wr_a2 = win_rate(by_set["A2"])[2] + wr_a3 = win_rate(by_set["A3"])[2] + assert wr_a1 == pytest.approx(0.60) + assert wr_a2 == pytest.approx(0.70) + assert wr_a3 == pytest.approx(0.40) + + def test_group_stats_a2(self, tmp_path: Path) -> None: + trades = load_trades(_synthetic_30(tmp_path)) + a2 = [t for t in trades if t.set == "A2"] + s = compute_group_stats( + a2, label="A2", bootstrap_iterations=500, seed=11 + ) + assert s.n_total == 10 + assert s.n_resolved == 10 + assert s.wins == 7 + assert s.wr == pytest.approx(0.70) + # Wilson 7/10 + assert s.wr_ci_lo == pytest.approx(0.3967732199795652, abs=1e-9) + assert s.wr_ci_hi == pytest.approx(0.892210712513788, abs=1e-9) + # Expectancy A2 = 7*0.5 + 3*(-1.0) = 0.5 → mean = 0.05 + assert s.exp_marius == pytest.approx(0.05, abs=1e-9) + assert s.exp_marius_ci_lo <= s.exp_marius <= s.exp_marius_ci_hi + + +# 
--------------------------------------------------------------------------- +# Pending-trade handling +# --------------------------------------------------------------------------- + + +class TestPendingHandling: + def test_pending_excluded_from_wr(self, tmp_path: Path) -> None: + rows = [ + _base_row( + id=1, screenshot_file="a.png", + outcome_path="TP0→TP1", max_reached="TP1", + be_moved="True", pl_marius="0.5000", pl_theoretical="0.3330", + ), + _base_row( + id=2, screenshot_file="b.png", + outcome_path="pending", max_reached="TP0", + be_moved="False", pl_marius="", pl_theoretical="0.1330", + ), + _base_row( + id=3, screenshot_file="c.png", + outcome_path="SL", max_reached="SL_first", + be_moved="False", pl_marius="-1.0000", pl_theoretical="-1.0000", + ), + ] + p = tmp_path / "j.csv" + _write_csv(p, rows) + trades = load_trades(p) + + wins, n, wr = win_rate(trades) + assert wins == 1 + assert n == 2 # pending excluded + assert wr == pytest.approx(0.5) + # Expectancy on pl_marius averages only resolved rows: (0.5 + -1.0) / 2 = -0.25 + assert expectancy(trades, "pl_marius") == pytest.approx(-0.25) + + +# --------------------------------------------------------------------------- +# Source filtering: calibration rows excluded from main report +# --------------------------------------------------------------------------- + + +class TestSourceFiltering: + def test_calibration_rows_excluded_from_backtest_stats( + self, tmp_path: Path + ) -> None: + rows = [ + _base_row(id=1, source="vision", screenshot_file="v.png", + pl_marius="0.5000"), + _base_row(id=2, source="manual", screenshot_file="m.png", + pl_marius="0.5000"), + _base_row(id=3, source="manual_calibration", screenshot_file="c.png", + pl_marius="-1.0000"), + _base_row(id=4, source="vision_calibration", screenshot_file="c.png", + pl_marius="-1.0000"), + ] + p = tmp_path / "j.csv" + _write_csv(p, rows) + trades = load_trades(p) + backtest = [t for t in trades if t.source in BACKTEST_SOURCES] + assert 
len(backtest) == 2 + wins, n, wr = win_rate(backtest) + assert (wins, n) == (2, 2) + assert wr == pytest.approx(1.0) + + +# --------------------------------------------------------------------------- +# Calibration mode: pairing + mismatch +# --------------------------------------------------------------------------- + + +class TestCalibration: + def test_pairs_and_zero_mismatch(self, tmp_path: Path) -> None: + m = _base_row( + id=1, source="manual_calibration", screenshot_file="cal-1.png" + ) + v = _base_row( + id=2, source="vision_calibration", screenshot_file="cal-1.png" + ) + p = tmp_path / "j.csv" + _write_csv(p, [m, v]) + trades = load_trades(p) + rep = calibration_mismatch(trades) + assert rep.pairs == 1 + assert sum(rep.field_mismatches.values()) == 0 + assert rep.overall_mismatch_rate == 0.0 + + def test_one_field_mismatch(self, tmp_path: Path) -> None: + m = _base_row( + id=1, source="manual_calibration", screenshot_file="cal-1.png", + entry="400.0", + ) + v = _base_row( + id=2, source="vision_calibration", screenshot_file="cal-1.png", + entry="400.10", # different entry + ) + p = tmp_path / "j.csv" + _write_csv(p, [m, v]) + trades = load_trades(p) + rep = calibration_mismatch(trades) + assert rep.pairs == 1 + assert rep.field_mismatches["entry"] == 1 + # all other core fields match + for fld in CORE_CALIBRATION_FIELDS: + if fld == "entry": + continue + assert rep.field_mismatches[fld] == 0 + # 1 mismatch / (1 pair * 8 fields) = 12.5% + assert rep.overall_mismatch_rate == pytest.approx(1.0 / len(CORE_CALIBRATION_FIELDS)) + + def test_unpaired_rows_ignored(self, tmp_path: Path) -> None: + # Only a manual leg — no pair → 0 pairs. 
+ m = _base_row( + id=1, source="manual_calibration", screenshot_file="lonely.png" + ) + p = tmp_path / "j.csv" + _write_csv(p, [m]) + trades = load_trades(p) + rep = calibration_mismatch(trades) + assert rep.pairs == 0 + assert rep.total_comparisons == 0 + assert rep.overall_mismatch_rate == 0.0 + + def test_numeric_equivalence_tolerated(self, tmp_path: Path) -> None: + """'400' and '400.0000' should NOT count as a mismatch on entry.""" + m = _base_row( + id=1, source="manual_calibration", screenshot_file="cal-1.png", + entry="400", + ) + v = _base_row( + id=2, source="vision_calibration", screenshot_file="cal-1.png", + entry="400.0000", + ) + p = tmp_path / "j.csv" + _write_csv(p, [m, v]) + rep = calibration_mismatch(load_trades(p)) + assert rep.field_mismatches["entry"] == 0 + + +# --------------------------------------------------------------------------- +# Report formatting + CLI +# --------------------------------------------------------------------------- + + +class TestReporting: + def test_format_report_contains_sections(self, tmp_path: Path) -> None: + out = format_report( + load_trades(_synthetic_30(tmp_path)), + bootstrap_iterations=200, + seed=0, + ) + assert "M2D Backtest Stats" in out + assert "Overall" in out + assert "By Set" in out + assert "A1" in out and "A2" in out and "A3" in out + # calitate warning present + assert "descriptor only" in out.lower() or "biased" in out.lower() + + def test_format_calibration_report(self, tmp_path: Path) -> None: + rows = [ + _base_row( + id=1, source="manual_calibration", screenshot_file="cal-1.png" + ), + _base_row( + id=2, source="vision_calibration", screenshot_file="cal-1.png", + directie="Sell", # mismatch on directie + entry="400.0", sl="401.0", tp0="399.5", tp1="399.0", tp2="398.0", + ), + ] + p = tmp_path / "j.csv" + _write_csv(p, rows) + out = format_calibration_report(load_trades(p)) + assert "Paired screenshots" in out + assert "directie" in out + # 1 mismatch (directie) of 8 fields = 12.5% → FAIL 
P4 gate + assert "FAIL" in out + + def test_empty_csv_report(self, tmp_path: Path) -> None: + p = tmp_path / "empty.csv" + _write_csv(p, []) + out = format_report(load_trades(p)) + assert "no backtest trades" in out.lower() + + def test_main_cli_runs( + self, tmp_path: Path, capsys: pytest.CaptureFixture + ) -> None: + path = _synthetic_30(tmp_path) + rc = main(["--csv", str(path), "--seed", "0", "--bootstrap-iterations", "100"]) + assert rc == 0 + captured = capsys.readouterr() + assert "M2D Backtest Stats" in captured.out + + def test_main_cli_calibration( + self, tmp_path: Path, capsys: pytest.CaptureFixture + ) -> None: + rows = [ + _base_row(id=1, source="manual_calibration", screenshot_file="cal-1.png"), + _base_row(id=2, source="vision_calibration", screenshot_file="cal-1.png"), + ] + p = tmp_path / "j.csv" + _write_csv(p, rows) + rc = main(["--csv", str(p), "--calibration"]) + assert rc == 0 + out = capsys.readouterr().out + assert "Calibration P4 gate" in out + assert "PASS" in out # all fields match → PASS