reboot: replace vision pipeline with Excel-first manual journal

Pipeline-ul vision (screenshot extraction + CSV append + Python stats) era
greoi pentru backtest semi-manual. Înlocuit cu un singur template Excel
generat din openpyxl + Dashboard cu comparație 5 strategii management pe
aceleași semnale blackbox.

- Strategii: TP0 only / TP1 only / TP2 only / Hybrid+BE / Hybrid no BE
- Input minim (12 coloane galbene); Sesiune și Zi derivate auto din Data+Ora
- Dashboard cu coloana "Cum citesc" + secțiune Glosar cu exemple concrete
- Breakdowns PER SESIUNE / STRATEGIE / INDICATOR / DIRECȚIE
- Equity curve cu 5 linii

Eliminat: m2d-extractor agent, /backtest, /batch, /m2d-log, /stats slash
commands, scripts/{append_row,pl_calc,stats,manual_log,regenerate_md,
vision_schema,calendar_parse}.py, tests/, screenshots/, data/extractions/.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Marius
2026-05-13 18:30:33 +03:00
parent a73ec30c13
commit 017921794e
40 changed files with 906 additions and 4391 deletions

View File

@@ -1,311 +0,0 @@
"""Append a validated M2D extraction to ``data/jurnal.csv``.
Pipeline:
JSON file --> pydantic validate (M2DExtraction)
--> load data/_meta.yaml (versions)
--> compute id, ora_ro, zi, set, pl_marius, pl_theoretical, extracted_at
--> dedup on (screenshot_file, source)
--> atomic CSV write (sibling .tmp + os.replace)
Source values
- ``vision`` : produced by the vision subagent
- ``manual`` : Marius logged by hand
- ``manual_calibration`` : calibration P4 — manual leg
- ``vision_calibration`` : calibration P4 — vision leg
A row with ``source=manual_calibration`` and a row with ``source=vision_calibration``
for the *same* screenshot are allowed to coexist (different dedup keys).
Failure mode: ``append_extraction`` NEVER raises. On any error (missing JSON,
pydantic ValidationError, dedup hit, etc.) it returns
``{"status": "rejected", "reason": "...", "id": None, "row": None}`` so the
caller (a slash command) can decide what to do with the screenshot
(move to ``needs_review/``, log to workflow, etc.).
"""
from __future__ import annotations
import csv
import json
import os
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal
import yaml
from pydantic import ValidationError
from scripts.calendar_parse import calc_set, load_calendar, utc_to_ro
from scripts.pl_calc import pl_marius, pl_theoretical
from scripts.vision_schema import M2DExtraction, parse_extraction
__all__ = [
    "CSV_COLUMNS",
    "VALID_SOURCES",
    "ZI_RO_MAP",
    "csv_columns",
    "append_extraction",
]
# Provenance tag for a row; the module docstring explains how the two
# calibration sources may coexist for the same screenshot.
Source = Literal["vision", "manual", "manual_calibration", "vision_calibration"]
VALID_SOURCES: frozenset[str] = frozenset(
    {"vision", "manual", "manual_calibration", "vision_calibration"}
)
# Canonical column order (29) — must stay stable; regenerate_md + stats depend on it.
CSV_COLUMNS: tuple[str, ...] = (
    "id",
    "screenshot_file",
    "source",
    "data",
    "zi",
    "ora_ro",
    "ora_utc",
    "instrument",
    "directie",
    "tf_mare",
    "tf_mic",
    "calitate",
    "entry",
    "sl",
    "tp0",
    "tp1",
    "tp2",
    "risc_pct",
    "outcome_path",
    "max_reached",
    "be_moved",
    "pl_marius",
    "pl_theoretical",
    "set",
    "indicator_version",
    "pl_overlay_version",
    "csv_schema_version",
    "extracted_at",
    "note",
)
# English short weekday (from utc_to_ro) -> Romanian two-letter "zi" value.
ZI_RO_MAP: dict[str, str] = {
    "Mon": "Lu",
    "Tue": "Ma",
    "Wed": "Mi",
    "Thu": "Jo",
    "Fri": "Vi",
    "Sat": "Sa",
    "Sun": "Du",
}
def csv_columns() -> list[str]:
    """Return the 29-column header in canonical order (a fresh list copy)."""
    return [*CSV_COLUMNS]
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _load_meta(meta_path: Path) -> dict[str, Any]:
    """Load ``_meta.yaml`` and ensure the three version keys are present.

    Raises ``ValueError`` listing any missing key.
    """
    with meta_path.open("r", encoding="utf-8") as fh:
        meta = yaml.safe_load(fh) or {}
    missing = [
        key
        for key in ("indicator_version", "pl_overlay_version", "csv_schema_version")
        if key not in meta
    ]
    if missing:
        raise ValueError(f"_meta.yaml missing required keys: {missing}")
    return meta
def _read_existing_rows(csv_path: Path) -> list[dict[str, str]]:
if not csv_path.exists() or csv_path.stat().st_size == 0:
return []
with csv_path.open("r", encoding="utf-8", newline="") as fh:
reader = csv.DictReader(fh)
return list(reader)
def _next_id(rows: list[dict[str, str]]) -> int:
max_id = 0
for r in rows:
raw = r.get("id", "")
if not raw:
continue
try:
v = int(raw)
except (TypeError, ValueError):
continue
if v > max_id:
max_id = v
return max_id + 1
def _format_optional(value: float | None) -> str:
return "" if value is None else f"{value:.4f}"
def _write_csv_atomic(
csv_path: Path, rows: list[dict[str, str]], columns: list[str]
) -> None:
csv_path.parent.mkdir(parents=True, exist_ok=True)
tmp = csv_path.with_suffix(csv_path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8", newline="") as fh:
writer = csv.DictWriter(fh, fieldnames=columns)
writer.writeheader()
for row in rows:
writer.writerow({k: row.get(k, "") for k in columns})
os.replace(tmp, csv_path)
def _build_row(
    extraction: M2DExtraction,
    *,
    source: str,
    row_id: int,
    meta: dict[str, Any],
    calendar: list[dict[str, Any]],
    extracted_at: str,
) -> dict[str, str]:
    """Materialise one jurnal CSV row (all values as strings) from a validated
    extraction, adding the derived fields: RO-local date/time, zi, set label,
    both P/L variants, and the versions from ``_meta.yaml``.

    Raises KeyError/ValueError if a derived-field computation fails; the
    caller (``append_extraction``) converts that into a rejection.
    """
    # Convert the UTC timestamp to Romanian local date/time + short weekday.
    d_ro, t_ro, day_short = utc_to_ro(extraction.data, extraction.ora_utc)
    # Classify the trade window (A1/A2/A3/B/C/D/Other) against the calendar.
    set_label = calc_set(d_ro, t_ro, day_short, calendar)
    pl_m = pl_marius(extraction.outcome_path, extraction.be_moved)
    pl_t = pl_theoretical(extraction.max_reached)
    # Short English weekday -> Romanian two-letter abbreviation.
    zi_ro = ZI_RO_MAP[day_short]
    # Keys must exactly mirror CSV_COLUMNS (the writer fills gaps with "").
    return {
        "id": str(row_id),
        "screenshot_file": extraction.screenshot_file,
        "source": source,
        "data": extraction.data,
        "zi": zi_ro,
        "ora_ro": t_ro.strftime("%H:%M"),
        "ora_utc": extraction.ora_utc,
        "instrument": extraction.instrument,
        "directie": extraction.directie,
        "tf_mare": extraction.tf_mare,
        "tf_mic": extraction.tf_mic,
        "calitate": extraction.calitate,
        "entry": f"{extraction.entry}",
        "sl": f"{extraction.sl}",
        "tp0": f"{extraction.tp0}",
        "tp1": f"{extraction.tp1}",
        "tp2": f"{extraction.tp2}",
        "risc_pct": f"{extraction.risc_pct}",
        "outcome_path": extraction.outcome_path,
        "max_reached": extraction.max_reached,
        "be_moved": str(extraction.be_moved),
        "pl_marius": _format_optional(pl_m),
        "pl_theoretical": _format_optional(pl_t),
        "set": set_label,
        "indicator_version": str(meta["indicator_version"]),
        "pl_overlay_version": str(meta["pl_overlay_version"]),
        "csv_schema_version": str(meta["csv_schema_version"]),
        "extracted_at": extracted_at,
        "note": extraction.note,
    }
def _reject(reason: str) -> dict[str, Any]:
return {"status": "rejected", "reason": reason, "id": None, "row": None}
# ---------------------------------------------------------------------------
# public API
# ---------------------------------------------------------------------------
def append_extraction(
    json_path: Path | str,
    source: str,
    csv_path: Path | str = "data/jurnal.csv",
    meta_path: Path | str = "data/_meta.yaml",
    calendar_path: Path | str = "calendar_evenimente.yaml",
) -> dict[str, Any]:
    """Append one validated extraction to the jurnal CSV.
    Never raises. Returns one of:
    - ``{"status": "ok", "reason": "", "id": <int>, "row": <dict>}``
    - ``{"status": "rejected", "reason": <str>, "id": None, "row": None}``
    """
    # Normalise all path-like arguments up front.
    json_path = Path(json_path)
    csv_path = Path(csv_path)
    meta_path = Path(meta_path)
    calendar_path = Path(calendar_path)
    # 1) Guard the provenance tag before touching the filesystem.
    if source not in VALID_SOURCES:
        return _reject(
            f"invalid source {source!r}; must be one of {sorted(VALID_SOURCES)}"
        )
    if not json_path.exists():
        return _reject(f"JSON file not found: {json_path}")
    # 2) Read + validate the extraction JSON (pydantic schema).
    try:
        with json_path.open("r", encoding="utf-8") as fh:
            raw = fh.read()
    except OSError as exc:
        return _reject(f"failed to read JSON {json_path}: {exc}")
    try:
        extraction = parse_extraction(raw)
    except ValidationError as exc:
        return _reject(f"validation error: {exc}")
    except (ValueError, json.JSONDecodeError) as exc:
        return _reject(f"validation error (json parse): {exc}")
    # 3) Load the version metadata and the economic-event calendar.
    try:
        meta = _load_meta(meta_path)
    except (FileNotFoundError, OSError) as exc:
        return _reject(f"_meta.yaml not found: {exc}")
    except (ValueError, yaml.YAMLError) as exc:
        return _reject(f"_meta.yaml invalid: {exc}")
    try:
        calendar = load_calendar(calendar_path)
    except (FileNotFoundError, OSError) as exc:
        return _reject(f"calendar not found: {exc}")
    except (ValueError, yaml.YAMLError) as exc:
        return _reject(f"calendar invalid: {exc}")
    # 4) Dedup on (screenshot_file, source) against the current CSV contents.
    try:
        existing = _read_existing_rows(csv_path)
    except OSError as exc:
        return _reject(f"failed to read existing CSV {csv_path}: {exc}")
    key = (extraction.screenshot_file, source)
    for r in existing:
        if (r.get("screenshot_file"), r.get("source")) == key:
            return _reject(
                f"duplicate row: screenshot_file={key[0]!r} source={key[1]!r}"
            )
    # 5) Compute the new id and the UTC "extracted_at" timestamp (trailing Z).
    row_id = _next_id(existing)
    extracted_at = (
        datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
    )
    # 6) Derive all computed fields; failures become rejections, not raises.
    try:
        row = _build_row(
            extraction,
            source=source,
            row_id=row_id,
            meta=meta,
            calendar=calendar,
            extracted_at=extracted_at,
        )
    except (KeyError, ValueError) as exc:
        return _reject(f"derived-field computation failed: {exc}")
    # 7) Rewrite the whole CSV atomically (tmp sibling + os.replace).
    try:
        _write_csv_atomic(csv_path, [*existing, row], list(CSV_COLUMNS))
    except OSError as exc:
        return _reject(
            f"atomic write failed: {exc}\n{traceback.format_exc()}"
        )
    return {"status": "ok", "reason": "", "id": row_id, "row": row}

View File

@@ -1,181 +0,0 @@
"""Calendar parsing + Set classification for M2D backtesting.
Each trade is tagged with a ``Set`` derived from its date, RO-local time, and the
economic-event calendar:
- ``A1``: 16:35-17:00 RO, Tue/Wed/Thu
- ``A2``: 17:00-18:00 RO, Tue/Wed/Thu (sweet spot)
- ``A3``: 18:00-19:00 RO, Tue/Wed/Thu
- ``B`` : 22:00-22:45 RO, Tue/Wed/Thu
- ``C`` : inside the window of an event with severity in {extrem, mare}
- ``D`` : Mon or Fri
- ``Other``: anything else
Priority: C > D > A1/A2/A3/B > Other.
"""
from __future__ import annotations
from datetime import date, datetime, time
from pathlib import Path
from typing import Any
import yaml
from zoneinfo import ZoneInfo
__all__ = [
    "RO_TZ",
    "UTC_TZ",
    "utc_to_ro",
    "load_calendar",
    "is_in_news_window",
    "calc_set",
]
# Timezones for the UTC -> Romania conversion (DST handled by zoneinfo).
RO_TZ = ZoneInfo("Europe/Bucharest")
UTC_TZ = ZoneInfo("UTC")
# Index-aligned with datetime.weekday(): 0=Mon ... 6=Sun.
_DAY_SHORT = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
# Only these severities can trigger Set C (news window).
_HIGH_SEVERITY = frozenset({"extrem", "mare"})
# Lower-case day name from a ``weekly_<day>`` cadence -> weekday() index.
_WEEKLY_DAY_MAP = {
    "monday": 0,
    "tuesday": 1,
    "wednesday": 2,
    "thursday": 3,
    "friday": 4,
    "saturday": 5,
    "sunday": 6,
}
def utc_to_ro(date_str: str, ora_utc_str: str) -> tuple[date, time, str]:
    """Convert ``(YYYY-MM-DD, HH:MM UTC)`` to ``(date_ro, time_ro, day_short)``.

    DST-aware via :mod:`zoneinfo`. ``day_short`` is one of
    ``Mon Tue Wed Thu Fri Sat Sun``.
    """
    naive = datetime.strptime(f"{date_str} {ora_utc_str}", "%Y-%m-%d %H:%M")
    dt_ro = naive.replace(tzinfo=UTC_TZ).astimezone(RO_TZ)
    # Drop sub-minute precision (strptime never fills it, but be explicit).
    time_ro = dt_ro.time().replace(second=0, microsecond=0)
    return dt_ro.date(), time_ro, _DAY_SHORT[dt_ro.weekday()]
def load_calendar(path: Path | str = "calendar_evenimente.yaml") -> list[dict[str, Any]]:
    """Load a YAML calendar file.

    Validates ``schema_version == 1`` and returns the list of event dicts under
    the top-level ``events`` key.
    """
    p = Path(path)
    with p.open("r", encoding="utf-8") as fh:
        doc = yaml.safe_load(fh)
    if not isinstance(doc, dict):
        raise ValueError(f"calendar file {p} is not a mapping")
    version = doc.get("schema_version")
    if version != 1:
        raise ValueError(
            f"unsupported calendar schema_version: {version!r} (expected 1)"
        )
    # A missing/None "events" key degrades to an empty list.
    events = doc.get("events") or []
    if isinstance(events, list):
        return events
    raise ValueError(f"calendar events must be a list, got {type(events).__name__}")
def _minutes(t: time) -> int:
return t.hour * 60 + t.minute
def _parse_hhmm(s: str) -> time:
return datetime.strptime(s, "%H:%M").time()
def _is_first_friday_of_month(d: date) -> bool:
return d.weekday() == 4 and d.day <= 7
def _event_matches_date(event: dict[str, Any], d: date) -> bool:
    """True iff *event*'s cadence pins it to the concrete date *d*.

    Supported cadences: ``scheduled`` (exact date), ``first_friday_monthly``
    and ``weekly_<day>``; any other cadence is deferred and never matches.
    """
    cadence = event.get("cadence", "")
    if cadence == "scheduled":
        raw = event.get("date")
        if isinstance(raw, date):
            return raw == d
        if isinstance(raw, str):
            return datetime.strptime(raw, "%Y-%m-%d").date() == d
        return False
    if cadence == "first_friday_monthly":
        return _is_first_friday_of_month(d)
    if cadence.startswith("weekly_"):
        target = _WEEKLY_DAY_MAP.get(cadence[len("weekly_") :].lower())
        return target is not None and d.weekday() == target
    # Cadences without a pinned calendar day (monthly_mid, ADP pre-NFP, ...)
    # deliberately never trigger Set C.
    return False
def is_in_news_window(d: date, t: time, calendar: list[dict[str, Any]]) -> bool:
    """Return True iff ``(d, t)`` falls inside the window of a high-severity event.

    Window: ``[time_ro - window_before_min, time_ro + window_after_min]`` (inclusive
    on both ends). Only events with ``severity`` in ``{extrem, mare}`` count.
    Cadences honoured: ``scheduled``, ``first_friday_monthly``, ``weekly_<day>``.
    Other cadences (``monthly_mid``, ``monthly_end``, ``monthly_15``,
    ``wednesday_pre_nfp``, ``monthly_first_week`` etc.) are deferred and never
    trigger Set C.
    """
    trade_min = _minutes(t)

    def _hits(event: dict[str, Any]) -> bool:
        # Filter: high severity AND occurring on this exact date.
        if event.get("severity") not in _HIGH_SEVERITY:
            return False
        if not _event_matches_date(event, d):
            return False
        raw = event.get("time_ro")
        if isinstance(raw, time):
            ev_time = raw
        elif isinstance(raw, str):
            ev_time = _parse_hhmm(raw)
        else:
            return False
        center = _minutes(ev_time)
        lo = center - int(event.get("window_before_min", 0))
        hi = center + int(event.get("window_after_min", 0))
        return lo <= trade_min <= hi

    return any(_hits(event) for event in calendar)
def _in_range(t: time, lo: time, hi: time) -> bool:
    """Half-open ``[lo, hi)`` containment, compared as minutes-of-day."""
    probe = _minutes(t)
    return _minutes(lo) <= probe and probe < _minutes(hi)
def calc_set(d: date, t: time, day_of_week: str, calendar: list[dict[str, Any]]) -> str:
    """Classify a trade into one of ``A1 A2 A3 B C D Other``.

    Priority: ``C`` (news) > ``D`` (Mon/Fri) > ``A1/A2/A3/B`` (time bands on
    Tue/Wed/Thu) > ``Other``.
    """
    if is_in_news_window(d, t, calendar):
        return "C"
    if day_of_week in ("Mon", "Fri"):
        return "D"
    if day_of_week in ("Tue", "Wed", "Thu"):
        # Time bands, checked in order; none overlapping.
        bands = (
            ("A1", time(16, 35), time(17, 0)),
            ("A2", time(17, 0), time(18, 0)),
            ("A3", time(18, 0), time(19, 0)),
            ("B", time(22, 0), time(22, 45)),
        )
        for label, lo, hi in bands:
            if _in_range(t, lo, hi):
                return label
    return "Other"

View File

@@ -0,0 +1,731 @@
"""Generator pentru data/backtest.xlsx.
5 strategii de management comparate side-by-side pe semnale blackbox:
- TP0 only : 100% close la TP0
- TP1 only : 100% OCO la SL/TP1
- TP2 only : 100% OCO la SL/TP2
- Hybrid + BE : 50% TP0 + mut SL la BE + 50% TP1 (recomandat de trader)
- Hybrid no BE : 50% TP0 + 50% TP1, fără BE (control pentru a izola valoarea BE-ului)
Rulare:
pip install openpyxl
python scripts/generate_template.py
"""
from __future__ import annotations
from datetime import date, time
from pathlib import Path
from openpyxl import Workbook
from openpyxl.chart import LineChart, Reference
from openpyxl.formatting.rule import CellIsRule
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.datavalidation import DataValidation
OUTPUT = Path(__file__).resolve().parent.parent / "data" / "backtest.xlsx"
MAX_ROWS = 500  # rows pre-filled with formulas in the Trades sheet
# ---------------------------------------------------------------------------
# Styles
# ---------------------------------------------------------------------------
HEADER_FILL = PatternFill("solid", fgColor="1F3864")
HEADER_FONT = Font(name="Calibri", size=11, bold=True, color="FFFFFF")
INPUT_FILL = PatternFill("solid", fgColor="FFF8E1")
DERIVED_FILL = PatternFill("solid", fgColor="E8F1FA")
HIDDEN_FILL = PatternFill("solid", fgColor="F0F0F0")
TITLE_FONT = Font(name="Calibri", size=16, bold=True, color="1F3864")
SUBTITLE_FONT = Font(name="Calibri", size=12, bold=True, color="1F3864")
THIN = Side(border_style="thin", color="BFBFBF")
BORDER = Border(left=THIN, right=THIN, top=THIN, bottom=THIN)
CENTER = Alignment(horizontal="center", vertical="center")
LEFT = Alignment(horizontal="left", vertical="center")
RIGHT = Alignment(horizontal="right", vertical="center")
# ---------------------------------------------------------------------------
# Lists
# ---------------------------------------------------------------------------
STRATEGIES = ["M2D", "EMA cross", "Order block", "Liquidity sweep", "Custom"]
SESSIONS = ["A1", "A2", "A3", "B", "C", "D", "Other"]
INDICATORS = ["DIA", "US30", "SPY", "QQQ", "ES", "NQ"]
TIMEFRAMES = ["1min", "3min", "15min"]
DIRECTIONS = ["Buy", "Sell"]
OUTCOMES = ["SL", "TP0 only", "TP1", "TP2"]
# The 5 management strategies (suffix used in column names) + friendly label
STRAT_KEYS = ["tp0only", "tp1only", "tp2only", "hybrid_be", "hybrid_nobe"]
STRAT_LABELS = {
    "tp0only": "TP0 only",
    "tp1only": "TP1 only",
    "tp2only": "TP2 only",
    "hybrid_be": "Hybrid + BE",
    "hybrid_nobe": "Hybrid no BE",
}
# ---------------------------------------------------------------------------
# Trades sheet — schema
# ---------------------------------------------------------------------------
INPUT_HEADERS = [
    "#", "Data", "Ora RO", "Zi", "Sesiune",
    "Strategie", "Indicator", "TF",
    "Direcție", "SL %", "TP0 %", "TP1 %", "TP2 %",
    "Outcome", "Notes",
]
# Per strategy: R multiple, dollar P/L, running balance (visible, blue).
DERIVED_HEADERS = (
    [f"R_{s}" for s in STRAT_KEYS]
    + [f"$_{s}" for s in STRAT_KEYS]
    + [f"Bal_{s}" for s in STRAT_KEYS]
)
# Per strategy: win flag, running peak, drawdown (helper, grey).
HELPER_HEADERS = (
    [f"Win_{s}" for s in STRAT_KEYS]
    + [f"Peak_{s}" for s in STRAT_KEYS]
    + [f"DD_{s}" for s in STRAT_KEYS]
)
TRADES_HEADERS = INPUT_HEADERS + DERIVED_HEADERS + HELPER_HEADERS
# Map: header name → Excel column letter
COL = {name: get_column_letter(i + 1) for i, name in enumerate(TRADES_HEADERS)}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _col_to_int(letter: str) -> int:
n = 0
for ch in letter:
n = n * 26 + (ord(ch) - ord("A") + 1)
return n
# ---------------------------------------------------------------------------
# Config sheet
# ---------------------------------------------------------------------------
def build_config(wb: Workbook) -> None:
    """Create the Config sheet: account settings (yellow = editable) plus the
    dropdown source lists in columns E-J consumed by the Trades validations.
    """
    ws = wb.create_sheet("Config", 0)
    ws.sheet_view.showGridLines = False
    ws["A1"] = "📋 Config — editează doar celulele galbene"
    ws["A1"].font = TITLE_FONT
    ws.merge_cells("A1:C1")
    ws["A3"] = "Setting"
    ws["B3"] = "Value"
    ws["C3"] = "Note"
    for c in ("A3", "B3", "C3"):
        ws[c].font = HEADER_FONT
        ws[c].fill = HEADER_FILL
        ws[c].alignment = CENTER
    # B4/B5 are user inputs; B6 is derived from them.
    ws["A4"] = "Account Size Start ($)"
    ws["B4"] = 10000
    ws["C4"] = "Balanța inițială pentru calcule $ și HWM"
    ws["A5"] = "Risk per Trade (%)"
    ws["B5"] = 1.0
    ws["C5"] = "% din account riscat per trade (= -1R)"
    ws["A6"] = "Risk per Trade ($)"
    ws["B6"] = "=B4*B5/100"
    ws["C6"] = "Auto — derivat din B4 și B5"
    for r in (4, 5):
        ws.cell(row=r, column=2).fill = INPUT_FILL
        ws.cell(row=r, column=2).border = BORDER
    ws["B6"].fill = DERIVED_FILL
    ws["B6"].border = BORDER
    ws["B4"].number_format = "$#,##0"
    ws["B5"].number_format = '0.0"%"'
    ws["B6"].number_format = "$#,##0.00"
    # Dropdown source lists — columns E-J (6 columns)
    list_columns = [
        ("Strategii", STRATEGIES),
        ("Sesiuni (auto)", SESSIONS),
        ("Indicatori", INDICATORS),
        ("TF", TIMEFRAMES),
        ("Direcție", DIRECTIONS),
        ("Outcome", OUTCOMES),
    ]
    # start=5 puts the first list in column E; values begin on row 4.
    for col_idx, (label, values) in enumerate(list_columns, start=5):
        cell = ws.cell(row=3, column=col_idx, value=label)
        cell.font = HEADER_FONT
        cell.fill = HEADER_FILL
        cell.alignment = CENTER
        for row_idx, v in enumerate(values, start=4):
            c = ws.cell(row=row_idx, column=col_idx, value=v)
            c.alignment = CENTER
    widths = {
        "A": 24, "B": 14, "C": 38, "D": 2,
        "E": 14, "F": 14, "G": 13, "H": 10, "I": 10, "J": 12,
    }
    for col, w in widths.items():
        ws.column_dimensions[col].width = w
# ---------------------------------------------------------------------------
# Formula builders pentru Trades sheet
# ---------------------------------------------------------------------------
def _f_day(r: int) -> str:
    """Formula for the Zi column: Romanian two-letter weekday, blank if no date."""
    data_cell = f'{COL["Data"]}{r}'
    weekday = f'CHOOSE(WEEKDAY({data_cell},2),"Lu","Ma","Mi","Jo","Vi","Sa","Du")'
    return f'=IF({data_cell}="","",{weekday})'
def _f_session(r: int) -> str:
    """Derive the M2D session (A1/A2/A3/B/C/D/Other) from Data + Ora RO."""
    d = f'{COL["Data"]}{r}'
    t = f'{COL["Ora RO"]}{r}'
    wd = f"WEEKDAY({d},2)"
    mid_week = f"AND({wd}>=2,{wd}<=4)"
    # One nested IF per rule, in priority order; 7 opens, 7 closing parens.
    rules = (
        f'IF(OR({wd}=1,{wd}=5),"D",'
        f'IF(AND({t}>=TIME(15,30,0),{t}<TIME(16,30,0)),"C",'
        f'IF(AND({mid_week},{t}>=TIME(16,35,0),{t}<TIME(17,0,0)),"A1",'
        f'IF(AND({mid_week},{t}>=TIME(17,0,0),{t}<TIME(18,0,0)),"A2",'
        f'IF(AND({mid_week},{t}>=TIME(18,0,0),{t}<TIME(19,0,0)),"A3",'
        f'IF(AND({mid_week},{t}>=TIME(22,0,0),{t}<TIME(22,45,0)),"B",'
    )
    return f'=IF(OR({d}="",{t}=""),"",' + rules + '"Other")))))))'
def _f_r_tp0only(r: int) -> str:
    """R multiple for 'TP0 only': -1 on SL, otherwise TP0% / SL%."""
    outcome, sl, tp0 = (f"{COL[c]}{r}" for c in ("Outcome", "SL %", "TP0 %"))
    return f'=IF({outcome}="","",IF({outcome}="SL",-1,{tp0}/{sl}))'
def _f_r_tp1only(r: int) -> str:
    """R multiple for 'TP1 only': -1 on SL or TP0-only, otherwise TP1% / SL%."""
    outcome, sl, tp1 = (f"{COL[c]}{r}" for c in ("Outcome", "SL %", "TP1 %"))
    loss_branch = f'IF(OR({outcome}="SL",{outcome}="TP0 only"),-1,{tp1}/{sl})'
    return f'=IF({outcome}="","",' + loss_branch + ")"
def _f_r_tp2only(r: int) -> str:
    """R multiple for 'TP2 only': TP2% / SL% when TP2 is reached, else -1."""
    outcome, sl, tp2 = (f"{COL[c]}{r}" for c in ("Outcome", "SL %", "TP2 %"))
    return f'=IF({outcome}="","",IF({outcome}="TP2",{tp2}/{sl},-1))'
def _f_r_hybrid_be(r: int) -> str:
    """R multiple for 'Hybrid + BE': 50% out at TP0, SL to BE, rest to TP1.

    TP0-only exit yields half the TP0 R with no loss on the runner (BE).
    """
    outcome, sl, tp0, tp1 = (
        f"{COL[c]}{r}" for c in ("Outcome", "SL %", "TP0 %", "TP1 %")
    )
    branches = [
        f'=IF({outcome}="","",',
        f'IF({outcome}="SL",-1,',
        f'IF({outcome}="TP0 only",0.5*{tp0}/{sl},',
        f'0.5*({tp0}+{tp1})/{sl})))',
    ]
    return "".join(branches)
def _f_r_hybrid_nobe(r: int) -> str:
    """R multiple for 'Hybrid no BE': like Hybrid + BE but the runner keeps
    the original stop, so a TP0-only exit costs an extra -0.5R.
    """
    outcome, sl, tp0, tp1 = (
        f"{COL[c]}{r}" for c in ("Outcome", "SL %", "TP0 %", "TP1 %")
    )
    branches = [
        f'=IF({outcome}="","",',
        f'IF({outcome}="SL",-1,',
        f'IF({outcome}="TP0 only",0.5*{tp0}/{sl}-0.5,',
        f'0.5*({tp0}+{tp1})/{sl})))',
    ]
    return "".join(branches)
# Dispatch: strategy key -> builder of that strategy's R-column formula.
R_FN: dict[str, callable] = {
    "tp0only": _f_r_tp0only,
    "tp1only": _f_r_tp1only,
    "tp2only": _f_r_tp2only,
    "hybrid_be": _f_r_hybrid_be,
    "hybrid_nobe": _f_r_hybrid_nobe,
}
def _f_dollar(r: int, r_col: str) -> str:
    """Convert an R cell into dollars via Config!$B$6 (risk per trade, $)."""
    cell = f"{COL[r_col]}{r}"
    return f'=IF({cell}="","",{cell}*Config!$B$6)'
def _f_balance(r: int, dollar_col: str) -> str:
    """Running balance: start capital + cumulative $ P/L through row *r*."""
    letter = COL[dollar_col]
    cumulative = f"SUM(${letter}$2:{letter}{r})"
    return f'=IF({letter}{r}="","",Config!$B$4 + {cumulative})'
def _f_win(r: int, r_col: str) -> str:
    """Win flag: 1 if the R cell is positive, 0 otherwise; blank propagates."""
    cell = f"{COL[r_col]}{r}"
    return f'=IF({cell}="","",IF({cell}>0,1,0))'
def _f_peak(r: int, balance_col: str, peak_col: str) -> str:
    """Running high-water mark of the balance column.

    Row 2 seeds the peak with its own balance; later rows take
    max(previous peak, current balance), with blanks passing through.
    """
    bal = COL[balance_col]
    pk = COL[peak_col]
    current = f"{bal}{r}"
    if r == 2:
        return f'=IF({current}="","",{current})'
    prev = f"{pk}{r - 1}"
    return (
        f'=IF({current}="","",'
        f'IF({prev}="",{current},MAX({prev},{current})))'
    )
def _f_drawdown(r: int, peak_col: str, balance_col: str) -> str:
    """Dollar drawdown on row *r*: running peak minus current balance."""
    peak_cell = f"{COL[peak_col]}{r}"
    balance_cell = f"{COL[balance_col]}{r}"
    return f'=IF({balance_cell}="","",{peak_cell}-{balance_cell})'
# ---------------------------------------------------------------------------
# Trades sheet
# ---------------------------------------------------------------------------
def build_trades(wb: Workbook) -> None:
    """Create the Trades sheet: inputs, derived formulas and helper columns.

    Rows 2..MAX_ROWS+1 are pre-filled with formulas (Zi, Sesiune and the
    per-strategy R / $ / Balance / Win / Peak / DD columns); row 2 also holds
    a sample trade the user is expected to delete. Input columns are yellow,
    derived blue, helpers grey; the five R columns get green/red/grey
    conditional formatting for win/loss/break-even.
    """
    ws = wb.create_sheet("Trades", 1)
    ws.sheet_view.showGridLines = False
    ws.freeze_panes = "B2"
    # Headers
    for col_idx, header in enumerate(TRADES_HEADERS, start=1):
        cell = ws.cell(row=1, column=col_idx, value=header)
        cell.font = HEADER_FONT
        cell.fill = HEADER_FILL
        cell.alignment = CENTER
        cell.border = BORDER
    # Formulas on every pre-prepared row
    for r in range(2, MAX_ROWS + 2):
        ws.cell(row=r, column=1, value="=ROW()-1")
        ws[f'{COL["Zi"]}{r}'] = _f_day(r)
        ws[f'{COL["Sesiune"]}{r}'] = _f_session(r)
        for strat in STRAT_KEYS:
            ws[f'{COL[f"R_{strat}"]}{r}'] = R_FN[strat](r)
            ws[f'{COL[f"$_{strat}"]}{r}'] = _f_dollar(r, f"R_{strat}")
            ws[f'{COL[f"Bal_{strat}"]}{r}'] = _f_balance(r, f"$_{strat}")
            ws[f'{COL[f"Win_{strat}"]}{r}'] = _f_win(r, f"R_{strat}")
            ws[f'{COL[f"Peak_{strat}"]}{r}'] = _f_peak(
                r, f"Bal_{strat}", f"Peak_{strat}"
            )
            ws[f'{COL[f"DD_{strat}"]}{r}'] = _f_drawdown(
                r, f"Peak_{strat}", f"Bal_{strat}"
            )
    # Sample row 2
    ws["B2"] = date(2026, 5, 13)
    ws["C2"] = time(17, 33)
    ws[f'{COL["Strategie"]}2'] = "M2D"
    ws[f'{COL["Indicator"]}2'] = "DIA"
    ws[f'{COL["TF"]}2'] = "1min"
    ws[f'{COL["Direcție"]}2'] = "Sell"
    ws[f'{COL["SL %"]}2'] = 0.30
    ws[f'{COL["TP0 %"]}2'] = 0.10
    ws[f'{COL["TP1 %"]}2'] = 0.15
    ws[f'{COL["TP2 %"]}2'] = 0.30
    ws[f'{COL["Outcome"]}2'] = "TP1"
    ws[f'{COL["Notes"]}2'] = "Exemplu — șterge când începi"
    # Number formats
    for col_name in ("SL %", "TP0 %", "TP1 %", "TP2 %"):
        for r in range(2, MAX_ROWS + 2):
            ws[f"{COL[col_name]}{r}"].number_format = '0.000"%"'
    for strat in STRAT_KEYS:
        for r in range(2, MAX_ROWS + 2):
            ws[f"{COL[f'R_{strat}']}{r}"].number_format = "+0.000;-0.000;0.000"
            # BUGFIX: this prefix loop used to run AFTER the row loop, so the
            # dollar format only reached the final row (r = MAX_ROWS + 1);
            # now every row of $_/Bal_/Peak_/DD_ is formatted.
            for prefix in ("$_", "Bal_", "Peak_", "DD_"):
                ws[f"{COL[f'{prefix}{strat}']}{r}"].number_format = '"$"#,##0.00'
    for r in range(2, MAX_ROWS + 2):
        ws[f"B{r}"].number_format = "yyyy-mm-dd"
    # Coloring
    input_letters = {
        COL[n]
        for n in (
            "Data", "Ora RO", "Strategie", "Indicator", "TF",
            "Direcție", "SL %", "TP0 %", "TP1 %", "TP2 %",
            "Outcome", "Notes",
        )
    }
    derived_letters = {COL["Zi"], COL["Sesiune"]}
    for strat in STRAT_KEYS:
        derived_letters.add(COL[f"R_{strat}"])
        derived_letters.add(COL[f"$_{strat}"])
        derived_letters.add(COL[f"Bal_{strat}"])
    helper_letters = set()
    for strat in STRAT_KEYS:
        for prefix in ("Win_", "Peak_", "DD_"):
            helper_letters.add(COL[f"{prefix}{strat}"])
    for r in range(2, MAX_ROWS + 2):
        for cl in input_letters:
            ws[f"{cl}{r}"].fill = INPUT_FILL
        for cl in derived_letters:
            ws[f"{cl}{r}"].fill = DERIVED_FILL
        for cl in helper_letters:
            ws[f"{cl}{r}"].fill = HIDDEN_FILL
    # Column widths
    widths = {
        "A": 5, "B": 12, "C": 9, "D": 5, "E": 9,
        "F": 12, "G": 11, "H": 8, "I": 9,
        "J": 9, "K": 9, "L": 9, "M": 9,
        "N": 11, "O": 28,
    }
    for col, w in widths.items():
        ws.column_dimensions[col].width = w
    # Derived + helper columns: width 11
    for strat in STRAT_KEYS:
        for prefix in ("R_", "$_", "Bal_", "Win_", "Peak_", "DD_"):
            ws.column_dimensions[COL[f"{prefix}{strat}"]].width = 11
    # Data validation dropdowns
    def _add_dv(col_name: str, source: str) -> None:
        # One list-validation per input column, spanning all data rows.
        cl = COL[col_name]
        dv = DataValidation(
            type="list", formula1=source,
            allow_blank=True, showErrorMessage=True,
        )
        dv.error = "Valoare invalidă — folosește dropdown-ul."
        dv.errorTitle = "Input invalid"
        dv.add(f"{cl}2:{cl}{MAX_ROWS + 1}")
        ws.add_data_validation(dv)
    # Config columns: E=Strategii, F=Sesiuni, G=Indicatori, H=TF, I=Direcție, J=Outcome
    _add_dv("Strategie", "=Config!$E$4:$E$8")
    _add_dv("Indicator", "=Config!$G$4:$G$9")
    _add_dv("TF", "=Config!$H$4:$H$6")
    _add_dv("Direcție", "=Config!$I$4:$I$5")
    _add_dv("Outcome", "=Config!$J$4:$J$7")
    # Conditional formatting on the five R columns
    green_fill = PatternFill("solid", fgColor="C6EFCE")
    red_fill = PatternFill("solid", fgColor="FFC7CE")
    grey_fill = PatternFill("solid", fgColor="D9D9D9")
    for strat in STRAT_KEYS:
        cl = COL[f"R_{strat}"]
        rng = f"{cl}2:{cl}{MAX_ROWS + 1}"
        ws.conditional_formatting.add(
            rng, CellIsRule(operator="greaterThan", formula=["0"], fill=green_fill)
        )
        ws.conditional_formatting.add(
            rng, CellIsRule(operator="lessThan", formula=["0"], fill=red_fill)
        )
        ws.conditional_formatting.add(
            rng, CellIsRule(operator="equal", formula=["0"], fill=grey_fill)
        )
# ---------------------------------------------------------------------------
# Dashboard sheet
# ---------------------------------------------------------------------------
def _range(col_name: str) -> str:
    """Absolute Trades-sheet range for a named column (rows 2..MAX_ROWS+1)."""
    letter = COL[col_name]
    return f"Trades!${letter}$2:${letter}${MAX_ROWS + 1}"
# Dashboard "Cum citesc" column: metric label -> one-line Romanian reading
# hint written verbatim into the sheet (runtime strings — do not translate).
METRIC_HINTS: dict[str, str] = {
    "Trades Placed": "Numărul total de trade-uri logate",
    "Wins": "Trade-uri cu R > 0",
    "Win Ratio": "% wins. Singur NU spune mult — vezi împreună cu R:R și Expectancy",
    "Average Win ($)": "Câștigul mediu pe trade winning",
    "Average Loss ($)": "Pierderea medie pe trade losing",
    "Best Trade ($)": "Cel mai mare câștig individual",
    "Worst Trade ($)": "Cea mai mare pierdere individuală",
    "Profit Factor": ">1.0 profitabil • >1.5 solid • >2.0 foarte bun • <1.0 pierzător",
    "Risk:Reward": "Avg Win ÷ |Avg Loss|. >1 = câștig mediu > pierdere medie",
    "Expectancy (R)": "★ STEAUA NORDULUI ★ >+0.20R = GO LIVE • negativ = ABANDON",
    "Expectancy ($)": "Expectancy R convertit în $ (folosește Risk per Trade)",
    "Cumulative P&L ($)": "P&L total în $ pe toate trade-urile",
    "HWM Balance ($)": "Highest watermark — balanța de vârf atinsă",
    "Max Drawdown ($)": "Cea mai mare cădere ($) din vârf la fund",
}
def build_dashboard(wb: Workbook) -> None:
ws = wb.create_sheet("Dashboard", 2)
ws.sheet_view.showGridLines = False
ws["A1"] = "📊 Backtest Dashboard"
ws["A1"].font = TITLE_FONT
ws.merge_cells("A1:G1")
ws["A2"] = (
"Comparație 5 strategii management — pe aceleași semnale blackbox"
)
ws["A2"].font = Font(name="Calibri", size=10, italic=True, color="595959")
ws.merge_cells("A2:G2")
# Row 4: headers (5 columns B-F pentru strategii + G pentru "Cum citesc")
ws["A4"] = "Metric"
strat_cols = {} # strat_key → column letter (B/C/D/E/F)
for i, strat in enumerate(STRAT_KEYS):
letter = get_column_letter(2 + i)
strat_cols[strat] = letter
ws[f"{letter}4"] = STRAT_LABELS[strat]
ws["G4"] = "Cum citesc"
for letter in ["A"] + list(strat_cols.values()) + ["G"]:
c = ws[f"{letter}4"]
c.font = HEADER_FONT
c.fill = HEADER_FILL
c.alignment = CENTER
c.border = BORDER
# Ranges per strategie
R = {s: _range(f"R_{s}") for s in STRAT_KEYS}
D = {s: _range(f"$_{s}") for s in STRAT_KEYS}
W = {s: _range(f"Win_{s}") for s in STRAT_KEYS}
BAL = {s: _range(f"Bal_{s}") for s in STRAT_KEYS}
DD = {s: _range(f"DD_{s}") for s in STRAT_KEYS}
OUTCOME_RANGE = _range("Outcome")
# Metric rows — fiecare metric e un dict cu per-strategy formula + format
metrics: list[tuple[str, callable, str]] = [
# (label, fn(strat_key) -> formula, number_format)
("Trades Placed", lambda s: f'=COUNTA({OUTCOME_RANGE})', "0"),
("Wins", lambda s: f'=COUNTIF({W[s]},1)', "0"),
# Win Ratio: depends on rows above — handled after metrics list (placeholder)
("Win Ratio", lambda s: None, "0.0%"),
("Average Win ($)", lambda s: f'=IFERROR(AVERAGEIF({D[s]},">0"),0)', '"$"#,##0.00'),
("Average Loss ($)", lambda s: f'=IFERROR(AVERAGEIF({D[s]},"<0"),0)', '"$"#,##0.00'),
("Best Trade ($)", lambda s: f'=IFERROR(MAX({D[s]}),0)', '"$"#,##0.00'),
("Worst Trade ($)", lambda s: f'=IFERROR(MIN({D[s]}),0)', '"$"#,##0.00'),
("Profit Factor", lambda s: f'=IFERROR(SUMIF({D[s]},">0")/ABS(SUMIF({D[s]},"<0")),0)', "0.00"),
# Risk:Reward — placeholder; bazat pe rândurile Avg Win/Loss
("Risk:Reward", lambda s: None, "0.00"),
("Expectancy (R)", lambda s: f'=IFERROR(AVERAGE({R[s]}),0)', "+0.000;-0.000;0.000"),
("Expectancy ($)", lambda s: f'=IFERROR(AVERAGE({D[s]}),0)', '"$"#,##0.00'),
("Cumulative P&L ($)", lambda s: f'=SUM({D[s]})', '"$"#,##0.00'),
# HWM — placeholder cu ref la Trades Placed (row 5)
("HWM Balance ($)", lambda s: None, '"$"#,##0.00'),
("Max Drawdown ($)", lambda s: f'=IFERROR(MAX({DD[s]}),0)', '"$"#,##0.00'),
]
# Determine row indexes pentru formule speciale (depind de poziție)
label_to_row = {label: 5 + idx for idx, (label, _, _) in enumerate(metrics)}
trades_row = label_to_row["Trades Placed"]
wins_row = label_to_row["Wins"]
avg_win_row = label_to_row["Average Win ($)"]
avg_loss_row = label_to_row["Average Loss ($)"]
for idx, (label, fn, fmt) in enumerate(metrics):
r = 5 + idx
ws[f"A{r}"] = label
ws[f"A{r}"].font = Font(name="Calibri", size=11, bold=True)
ws[f"A{r}"].border = BORDER
ws[f"A{r}"].alignment = LEFT
for strat in STRAT_KEYS:
letter = strat_cols[strat]
if label == "Win Ratio":
formula = f"=IFERROR({letter}{wins_row}/{letter}{trades_row},0)"
elif label == "Risk:Reward":
formula = f"=IFERROR({letter}{avg_win_row}/ABS({letter}{avg_loss_row}),0)"
elif label == "HWM Balance ($)":
formula = (
f"=IF({letter}{trades_row}=0,Config!$B$4,MAX({BAL[strat]}))"
)
else:
formula = fn(strat)
cell = ws[f"{letter}{r}"]
cell.value = formula
cell.number_format = fmt
cell.fill = DERIVED_FILL
cell.border = BORDER
cell.alignment = RIGHT
# Coloana G — interpretare scurtă
hint_cell = ws[f"G{r}"]
hint_cell.value = METRIC_HINTS.get(label, "")
hint_cell.font = Font(name="Calibri", size=10, italic=True, color="595959")
hint_cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True)
hint_cell.border = BORDER
# ---- Glosar section: exemple concrete pentru metricile-cheie ----
glosar_start = 5 + len(metrics) + 2 # 2 rânduri spațiu după metrici
ws[f"A{glosar_start}"] = "📖 Glosar metrici — exemple concrete"
ws[f"A{glosar_start}"].font = SUBTITLE_FONT
ws.merge_cells(f"A{glosar_start}:G{glosar_start}")
glosar_entries = [
(
"Profit Factor",
"Suma câștigurilor ÷ |suma pierderilor|. Total cumulativ, nu mediu.",
"10 trade-uri: 4 wins de $50 (=$200) + 6 losses de $30 (=$180). PF = 200÷180 = 1.11 (marginal profitabil). La PF=2.0 câștigi de 2× cât pierzi în total.",
),
(
"Risk:Reward",
"Avg Win ÷ |Avg Loss|. Privește per-trade, nu total.",
"Avg win $50, avg loss $30 → R:R = 1.67. La R:R=2.0 ești profitabil chiar cu Win Ratio doar 40%. La R:R=0.5 ai nevoie de WR >67%.",
),
(
"Expectancy (R)",
"Câștigul mediu per trade exprimat în multipli de risc (R). CEA MAI ONESTĂ metrică — combină WR și R:R într-un singur număr.",
"10 trade-uri cu R = [+0.5, +0.5, +0.5, +0.5, 1, 1, 1, 1, 1, 1] → media = 0.30R (pierdere) chiar dacă WR=40%. Pragul GO LIVE din STOPPING_RULE.md: ≥ +0.20R.",
),
(
"Win Ratio (WR)",
"% trade-uri cu R > 0. ÎNȘELĂTOR singur — un WR mare cu R:R mic poate fi pierzător.",
"WR=70% pare excelent, dar dacă R:R=0.3 (câștigi $30, pierzi $100) → Expectancy = 0.7·30 0.3·100 = $9 per trade. Pierzător.",
),
(
"Max Drawdown",
"Cea mai mare cădere din vârful balanței la fundul ulterior. Măsoară 'durerea psihologică'.",
"Balance peak $11,500 → fund $9,800 → DD = $1,700 (17% din peak). DD mare la backtest = greu de tolerat în live.",
),
]
row = glosar_start + 1
for term, definition, example in glosar_entries:
ws[f"A{row}"] = term
ws[f"A{row}"].font = Font(name="Calibri", size=11, bold=True, color="1F3864")
ws[f"A{row}"].alignment = Alignment(horizontal="left", vertical="top", wrap_text=True)
ws[f"B{row}"] = definition
ws[f"B{row}"].font = Font(name="Calibri", size=10)
ws[f"B{row}"].alignment = Alignment(horizontal="left", vertical="top", wrap_text=True)
ws.merge_cells(f"B{row}:C{row}")
ws[f"D{row}"] = f"Exemplu: {example}"
ws[f"D{row}"].font = Font(name="Calibri", size=10, italic=True, color="595959")
ws[f"D{row}"].alignment = Alignment(horizontal="left", vertical="top", wrap_text=True)
ws.merge_cells(f"D{row}:G{row}")
ws.row_dimensions[row].height = 48
row += 1
glosar_end = row # primul rând după glosar
# Helper pentru a emite un block breakdown (per Sesiune / Strategie / etc.)
    def _emit_breakdown(
        start_row: int, title: str, first_col_label: str,
        items: list[str], item_range: str, overlay_strat: str,
    ) -> int:
        # Emit one breakdown table (title row, styled header row, one data row
        # per item) aggregated over the chosen overlay strategy. Returns the
        # first free row after the table. Relies on enclosing-scope state:
        # `ws`, the shared style constants, and the W/R/D range-lookup dicts
        # (presumably win-flag / R-multiple / dollar-P&L column ranges on the
        # Trades sheet — confirm against build_trades).
        ws[f"A{start_row}"] = title
        ws[f"A{start_row}"].font = SUBTITLE_FONT
        ws.merge_cells(f"A{start_row}:F{start_row}")
        headers = [first_col_label, "N", "Wins", "WR", "Expectancy R", "Cum $"]
        for col_idx, h in enumerate(headers, start=1):
            c = ws.cell(row=start_row + 1, column=col_idx, value=h)
            c.font = HEADER_FONT
            c.fill = HEADER_FILL
            c.alignment = CENTER
            c.border = BORDER
        for i, item in enumerate(items):
            r = start_row + 2 + i
            ws[f"A{r}"] = item
            # N = rows whose category column equals this item.
            ws[f"B{r}"] = f'=COUNTIF({item_range},"{item}")'
            # Wins = category rows where the overlay's W range equals 1.
            ws[f"C{r}"] = f'=COUNTIFS({item_range},"{item}",{W[overlay_strat]},1)'
            # WR = Wins / N, guarded against division by an empty bucket.
            ws[f"D{r}"] = f"=IFERROR(C{r}/B{r},0)"
            # Expectancy = mean of the overlay's R range over the bucket.
            ws[f"E{r}"] = (
                f'=IFERROR(AVERAGEIFS({R[overlay_strat]},{item_range},"{item}"),0)'
            )
            # Cum $ = sum of the overlay's D range over the bucket.
            ws[f"F{r}"] = f'=SUMIFS({D[overlay_strat]},{item_range},"{item}")'
            ws[f"B{r}"].number_format = "0"
            ws[f"C{r}"].number_format = "0"
            ws[f"D{r}"].number_format = "0.0%"
            ws[f"E{r}"].number_format = "+0.000;-0.000;0.000"
            ws[f"F{r}"].number_format = '"$"#,##0.00'
            for c in ("A", "B", "C", "D", "E", "F"):
                ws[f"{c}{r}"].border = BORDER
                ws[f"{c}{r}"].alignment = RIGHT if c != "A" else LEFT
        # First row after the data rows (callers add their own spacing).
        return start_row + 2 + len(items)
# Breakdowns — toate folosesc overlay-ul Hybrid+BE (recomandat de trader)
overlay = "hybrid_be"
start = glosar_end + 2 # 2 rânduri spațiu după glosar
after_sess = _emit_breakdown(
start, "PER SESIUNE (overlay: Hybrid + BE)", "Sesiune",
SESSIONS, _range("Sesiune"), overlay,
)
after_strat = _emit_breakdown(
after_sess + 2, "PER STRATEGIE (overlay: Hybrid + BE)", "Strategie",
STRATEGIES, _range("Strategie"), overlay,
)
after_ind = _emit_breakdown(
after_strat + 2, "PER INDICATOR (overlay: Hybrid + BE)", "Indicator",
INDICATORS, _range("Indicator"), overlay,
)
_emit_breakdown(
after_ind + 2, "PER DIRECȚIE (overlay: Hybrid + BE)", "Direcție",
DIRECTIONS, _range("Direcție"), overlay,
)
# Column widths
widths = {"A": 22, "B": 14, "C": 14, "D": 14, "E": 16, "F": 16, "G": 50}
for col, w in widths.items():
ws.column_dimensions[col].width = w
# Row height pentru rândurile cu hint (cu wrap)
for r in range(5, 5 + len(metrics)):
ws.row_dimensions[r].height = 22
# Equity curve chart — 5 linii
chart = LineChart()
chart.title = "Equity Curve — 5 strategii"
chart.style = 12
chart.y_axis.title = "Balance ($)"
chart.x_axis.title = "Trade #"
chart.height = 12
chart.width = 24
data = Reference(
wb["Trades"],
min_col=_col_to_int(COL[f"Bal_{STRAT_KEYS[0]}"]),
max_col=_col_to_int(COL[f"Bal_{STRAT_KEYS[-1]}"]),
min_row=1,
max_row=MAX_ROWS + 1,
)
chart.add_data(data, titles_from_data=True)
cats = Reference(
wb["Trades"], min_col=1, max_col=1,
min_row=2, max_row=MAX_ROWS + 1,
)
chart.set_categories(cats)
ws.add_chart(chart, "H4")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def build_workbook() -> Workbook:
    """Assemble the full template: Config, Trades and Dashboard sheets.

    The default sheet openpyxl creates is dropped first; Dashboard is made
    the active sheet so the file opens directly on it.
    """
    workbook = Workbook()
    # openpyxl always starts with one empty sheet — remove it before building.
    workbook.remove(workbook.active)
    for builder in (build_config, build_trades, build_dashboard):
        builder(workbook)
    workbook.active = workbook.sheetnames.index("Dashboard")
    return workbook
def main() -> int:
    """CLI entry point: build the workbook and save it to ``OUTPUT``."""
    # Make sure the destination directory exists before saving.
    OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    workbook = build_workbook()
    workbook.save(OUTPUT)
    print(f"Wrote {OUTPUT}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,134 +0,0 @@
"""Helper for manual M2D trade entry — derives full M2DExtraction dict from minimal user inputs.
User provides 6 required fields: data, ora_ro, directie, entry, sl, outcome_path.
All other fields default or are computed:
- tp0 = entry ± 0.4 × |entry - sl|
- tp1 = entry ± 0.6 × |entry - sl|
- tp2 = entry ± 1.0 × |entry - sl| (symmetric with sl)
- risc_pct = 100 × |entry - sl| / entry
- ora_utc = ora_ro converted via Europe/Bucharest (DST-aware)
- max_reached derived from outcome_path
- be_moved = True if outcome contains TP0 else False
- tf_mare/tf_mic default 5min/1min
- calitate default 'n/a'
- confidence = 'high' (manual entry)
- screenshot_file generated if not provided: <data>-<instrument>-<ora_ro>.png
"""
from __future__ import annotations
from datetime import date, datetime, time
from typing import Literal
from zoneinfo import ZoneInfo
# Timezones for the RO-local -> UTC conversion (DST handled by zoneinfo).
RO_TZ = ZoneInfo("Europe/Bucharest")
UTC_TZ = ZoneInfo("UTC")

# Default `max_reached` derived from each outcome_path (caller may override).
OUTCOME_TO_MAX_REACHED = {
    "SL": "SL_first",
    "TP0→SL": "TP0",
    "TP0→TP1": "TP1",
    "TP0→TP2": "TP2",
    "TP0→pending": "TP0",
    "pending": "SL_first",  # placeholder; user can override
}

# Default `be_moved`: once TP0 is hit, the break-even move is rule-enforced.
OUTCOME_TO_BE_MOVED = {
    "SL": False,
    "TP0→SL": True,  # BE move should have happened; True = rule-enforced
    "TP0→TP1": True,
    "TP0→TP2": True,
    "TP0→pending": True,
    "pending": False,
}
def ro_to_utc(data_iso: str, ora_ro_str: str) -> str:
    """Convert a Romanian local time to an ``HH:MM`` UTC string.

    DST-aware: the Europe/Bucharest offset is resolved for the given date,
    so winter times shift by 2 hours and summer times by 3.
    """
    local_moment = datetime.combine(
        date.fromisoformat(data_iso),
        datetime.strptime(ora_ro_str, "%H:%M").time(),
        tzinfo=ZoneInfo("Europe/Bucharest"),
    )
    return local_moment.astimezone(ZoneInfo("UTC")).strftime("%H:%M")
def build_extraction(
    data: str,
    ora_ro: str,
    directie: Literal["Buy", "Sell"],
    entry: float,
    sl: float,
    outcome_path: Literal["SL", "TP0→SL", "TP0→TP1", "TP0→TP2", "TP0→pending", "pending"],
    instrument: Literal["DIA", "US30", "other"] = "DIA",
    tf_mare: Literal["5min", "15min"] = "5min",
    tf_mic: Literal["1min", "3min"] = "1min",
    calitate: Literal["Clară", "Mai mare ca impuls", "Slabă", "n/a"] = "n/a",
    max_reached: Literal["SL_first", "TP0", "TP1", "TP2"] | None = None,
    be_moved: bool | None = None,
    screenshot_file: str | None = None,
    note: str = "",
) -> dict:
    """Build a M2DExtraction-compatible dict from minimal manual inputs.

    Derived fields:
    - ora_utc from ora_ro (DST-aware)
    - tp0/tp1/tp2 from entry/sl geometry (0.4R / 0.6R / 1.0R from entry)
    - risc_pct from |entry-sl|/entry
    - max_reached/be_moved from outcome_path (overridable)
    - screenshot_file generated from data+instrument+ora_ro if not provided

    The returned dict satisfies scripts.vision_schema.M2DExtraction.

    Raises ValueError when entry == sl or the SL sits on the wrong side of
    entry for the given direction.
    """
    # Zero risk distance would collapse every derived level onto entry.
    if entry == sl:
        raise ValueError("entry == sl — zero risk distance")
    risk_abs = abs(entry - sl)
    risc_pct = round(100 * risk_abs / entry, 4)
    # TP levels mirror the risk distance on the profit side; tp2 is the
    # full 1R target (symmetric with sl).
    if directie == "Buy":
        if sl >= entry:
            raise ValueError(f"Buy: sl ({sl}) must be < entry ({entry})")
        tp0 = round(entry + 0.4 * risk_abs, 4)
        tp1 = round(entry + 0.6 * risk_abs, 4)
        tp2 = round(entry + risk_abs, 4)
    else:  # Sell
        if sl <= entry:
            raise ValueError(f"Sell: sl ({sl}) must be > entry ({entry})")
        tp0 = round(entry - 0.4 * risk_abs, 4)
        tp1 = round(entry - 0.6 * risk_abs, 4)
        tp2 = round(entry - risk_abs, 4)
    ora_utc = ro_to_utc(data, ora_ro)
    # Outcome-derived defaults only fill in when the caller did not override.
    if max_reached is None:
        max_reached = OUTCOME_TO_MAX_REACHED[outcome_path]
    if be_moved is None:
        be_moved = OUTCOME_TO_BE_MOVED[outcome_path]
    if screenshot_file is None:
        # e.g. "2024-07-15-dia-1030.png"
        ora_compact = ora_ro.replace(":", "")
        screenshot_file = f"{data}-{instrument.lower()}-{ora_compact}.png"
    return {
        "screenshot_file": screenshot_file,
        "data": data,
        "ora_utc": ora_utc,
        "instrument": instrument,
        "directie": directie,
        "tf_mare": tf_mare,
        "tf_mic": tf_mic,
        "calitate": calitate,
        "entry": round(float(entry), 4),
        "sl": round(float(sl), 4),
        "tp0": tp0,
        "tp1": tp1,
        "tp2": tp2,
        "risc_pct": risc_pct,
        "outcome_path": outcome_path,
        "max_reached": max_reached,
        "be_moved": be_moved,
        # Manual entries are trusted, hence fixed high confidence and no
        # ambiguity list (those exist for the vision extraction path).
        "confidence": "high",
        "ambiguities": [],
        "note": note,
    }

View File

@@ -1,76 +0,0 @@
"""P/L overlays for M2D backtesting.
Two overlays computed from the same trade outcome:
- ``pl_marius``: real overlay used by the trader. 50% closed at TP0 (+0.2 R),
BE move on the remaining half, then close 50% of that at ~TP1 (+0.3 R total
contribution) or at SL/BE depending on outcome. TP1 is treated as the final
exit even when the chart subsequently reaches TP2.
- ``pl_theoretical``: reference 1/3-1/3-1/3 overlay that holds to TP2. Used
as an opportunity-cost benchmark vs. ``pl_marius``.
Returns are expressed in multiples of R (risk per trade). ``None`` from
``pl_marius`` denotes a still-pending trade.
"""
from __future__ import annotations
__all__ = [
"PL_MARIUS_TABLE",
"PL_THEORETICAL_TABLE",
"pl_marius",
"pl_theoretical",
]
# Real Marius overlay, in R, keyed by (normalized outcome_path, be_moved).
# Per the module docstring: 50% closes at TP0 (+0.2R contribution) and the
# remainder's fate depends on whether the stop was moved to break-even.
# ``None`` marks a still-pending trade (no realised P/L yet).
PL_MARIUS_TABLE: dict[tuple[str, bool], float | None] = {
    ("SL", True): -1.0,
    ("SL", False): -1.0,
    ("TP0->SL", True): 0.20,    # BE moved: only the TP0 half was realised
    ("TP0->SL", False): -0.30,  # no BE: remaining half rides back to SL
    ("TP0->TP1", True): 0.50,
    ("TP0->TP1", False): 0.50,
    ("TP0->TP2", True): 0.50,   # TP1 is treated as the final exit even at TP2
    ("TP0->TP2", False): 0.50,
    ("TP0->pending", True): None,
    ("TP0->pending", False): None,
    ("pending", True): None,
    ("pending", False): None,
}

# Theoretical 1/3-1/3-1/3 hold-to-TP2 overlay, keyed by max_reached.
# Values are consistent with TP0/TP1/TP2 at 0.4R/0.6R/1.0R — e.g.
# TP2 = (0.4 + 0.6 + 1.0)/3 ≈ 0.667 — TODO confirm the partial-exit
# assumptions for TP0/TP1 against the strategy doc.
PL_THEORETICAL_TABLE: dict[str, float] = {
    "SL_first": -1.0,
    "TP0": 0.133,
    "TP1": 0.333,
    "TP2": 0.667,
}

# Canonical (ASCII-arrow) outcome paths accepted after normalization.
_VALID_OUTCOME_PATHS: frozenset[str] = frozenset(
    {"SL", "TP0->SL", "TP0->TP1", "TP0->TP2", "TP0->pending", "pending"}
)
def _normalize_outcome_path(outcome_path: str) -> str:
return outcome_path.replace("", "->").replace("", "->")
def pl_marius(outcome_path: str, be_moved: bool) -> float | None:
    """P/L in R for the real Marius overlay.

    Both the ASCII arrow (``"TP0->TP1"``) and the unicode arrow
    (``"TP0→TP1"``) are accepted; pending outcomes yield ``None``.

    Raises ValueError for an unrecognised ``outcome_path``.
    """
    key = _normalize_outcome_path(outcome_path)
    if key in _VALID_OUTCOME_PATHS:
        return PL_MARIUS_TABLE[(key, be_moved)]
    raise ValueError(f"invalid outcome_path: {outcome_path!r}")


def pl_theoretical(max_reached: str) -> float:
    """P/L in R for the theoretical 1/3-1/3-1/3 hold-to-TP2 overlay.

    Raises ValueError for an unrecognised ``max_reached``.
    """
    try:
        return PL_THEORETICAL_TABLE[max_reached]
    except KeyError:
        raise ValueError(f"invalid max_reached: {max_reached!r}") from None

View File

@@ -1,240 +0,0 @@
"""Regenerate ``data/jurnal.md`` from ``data/jurnal.csv``.
CSV is the source of truth (29 columns, schema owned by ``scripts.append_row``).
MD is a human-readable mirror with a curated 18-column table.
CLI: ``python scripts/regenerate_md.py [csv_path] [md_path]``
"""
from __future__ import annotations
import csv
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Sequence
from scripts.append_row import csv_columns
__all__ = ["MD_COLUMNS", "regenerate_md", "main"]
MD_COLUMNS: tuple[str, ...] = (
"#",
"Data",
"Zi",
"Ora RO",
"Set",
"Instrument",
"Direcție",
"Calitate",
"Entry",
"SL",
"TP0",
"TP1",
"TP2",
"outcome_path",
"P/L (Marius)",
"P/L (theoretic)",
"Source",
"Note",
)
_CSV_FIELDS_USED: tuple[str, ...] = (
"id",
"data",
"zi",
"ora_ro",
"set",
"instrument",
"directie",
"calitate",
"entry",
"sl",
"tp0",
"tp1",
"tp2",
"outcome_path",
"pl_marius",
"pl_theoretical",
"source",
"note",
)
_DIRECTIE_DISPLAY = {"long": "Buy", "short": "Sell", "buy": "Buy", "sell": "Sell"}
def _fmt_pl(value: str) -> str:
if value is None or value == "":
return "pending"
try:
return f"{float(value):+.2f}"
except ValueError:
return value
def _fmt_directie(value: str) -> str:
if not value:
return ""
return _DIRECTIE_DISPLAY.get(value.strip().lower(), value)
def _escape_cell(value: str) -> str:
return (value or "").replace("|", "\\|").replace("\n", " ").strip()
def _placeholder_md() -> str:
return (
"# Jurnal M2D (auto-generated)\n"
"\n"
"*Niciun trade încă. Adaugă unul prin `/m2d-log` sau `/backtest`.*\n"
)
def _atomic_write_text(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_name = tempfile.mkstemp(
prefix=path.name + ".", suffix=".tmp", dir=str(path.parent)
)
try:
with os.fdopen(fd, "w", encoding="utf-8", newline="\n") as fh:
fh.write(content)
os.replace(tmp_name, path)
except Exception:
try:
os.unlink(tmp_name)
except OSError:
pass
raise
def _row_to_cells(row: dict[str, str], display_index: int) -> tuple[str, ...]:
    """Project one CSV row onto the 18 curated MD columns (all strings)."""
    def cell(key: str) -> str:
        return row.get(key, "") or ""

    return (
        str(display_index),
        cell("data"),
        cell("zi"),
        cell("ora_ro"),
        cell("set"),
        cell("instrument"),
        _fmt_directie(cell("directie")),
        cell("calitate"),
        cell("entry"),
        cell("sl"),
        cell("tp0"),
        cell("tp1"),
        cell("tp2"),
        cell("outcome_path"),
        _fmt_pl(cell("pl_marius")),
        _fmt_pl(cell("pl_theoretical")),
        cell("source"),
        cell("note"),
    )


def _render_table(rows: Sequence[dict[str, str]]) -> str:
    """Render rows as a Markdown table headed by MD_COLUMNS."""
    rendered = [
        "| " + " | ".join(MD_COLUMNS) + " |",
        "|" + "|".join(["---"] * len(MD_COLUMNS)) + "|",
    ]
    rendered.extend(
        "| " + " | ".join(_escape_cell(c) for c in _row_to_cells(row, idx)) + " |"
        for idx, row in enumerate(rows, start=1)
    )
    return "\n".join(rendered)


def _render_md(rows: Sequence[dict[str, str]]) -> str:
    """Render the full jurnal.md document (placeholder when no rows)."""
    if not rows:
        return _placeholder_md()
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return (
        "# Jurnal M2D (auto-generated from data/jurnal.csv)\n"
        "\n"
        f"Generated: {stamp}\n"
        f"Rows: {len(rows)}\n"
        "\n"
        f"{_render_table(rows)}\n"
        "\n"
        "*Vezi `data/jurnal.csv` pentru toate cele 29 coloane "
        "(id, ora_utc, tf_*, risc_pct, be_moved, max_reached, versions, extracted_at).*\n"
    )
def _id_sort_key(raw: str) -> tuple[int, int | str]:
try:
return (0, int(raw))
except (ValueError, TypeError):
return (1, raw or "")
def _load_rows(csv_path: Path) -> list[dict[str, str]]:
    """Read CSV, returning rows sorted by id.

    Schema drift handling:
    - Extra header columns → warning to stderr, dropped.
    - Missing required header columns → warning to stderr per affected row (row skipped).
    """
    if not csv_path.exists() or csv_path.stat().st_size == 0:
        return []
    # `expected` is the full canonical 29-column schema (owned by
    # scripts.append_row); `required` is the 18-column subset the MD mirror
    # actually renders.
    expected = set(csv_columns())
    required = set(_CSV_FIELDS_USED)
    with csv_path.open("r", encoding="utf-8", newline="") as fh:
        reader = csv.DictReader(fh)
        header = reader.fieldnames or []
        header_set = set(header)
        extras = [c for c in header if c not in expected]
        if extras:
            print(
                f"regenerate_md: warning: unknown CSV columns ignored: {extras}",
                file=sys.stderr,
            )
        missing_required = required - header_set
        rows: list[dict[str, str]] = []
        for raw in reader:
            # NOTE(review): when required columns are missing this warns once
            # PER ROW (every row is skipped) — noisy but deliberate-looking,
            # since the condition is header-level; confirm intent.
            if missing_required:
                print(
                    f"regenerate_md: warning: row skipped (missing required "
                    f"columns: {sorted(missing_required)})",
                    file=sys.stderr,
                )
                continue
            # Keep only the rendered columns; None cells become "".
            rows.append({k: (raw.get(k) or "") for k in required})
    rows.sort(key=lambda r: _id_sort_key(r.get("id", "")))
    return rows
def regenerate_md(
    csv_path: Path | str = "data/jurnal.csv",
    md_path: Path | str = "data/jurnal.md",
) -> int:
    """Read the CSV and atomically (re)write the MD mirror.

    Returns the number of trade rows written to the table.
    """
    rows = _load_rows(Path(csv_path))
    _atomic_write_text(Path(md_path), _render_md(rows))
    return len(rows)


def main() -> int:
    """CLI: ``python scripts/regenerate_md.py [csv_path] [md_path]``."""
    argv = sys.argv[1:]
    csv_arg = argv[0] if argv else "data/jurnal.csv"
    md_arg = argv[1] if len(argv) > 1 else "data/jurnal.md"
    count = regenerate_md(csv_arg, md_arg)
    print(f"regenerate_md: wrote {md_arg} with {count} row(s)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,551 +0,0 @@
"""Backtest statistics for ``data/jurnal.csv``.
Public API:
- ``compute_stats(csv_path, overlay) -> dict``
- ``render_stats(stats, overlay) -> str``
- ``compute_calibration(csv_path) -> dict``
- ``render_calibration(cal) -> str``
- ``main()`` — CLI entry point.
A "win" is a closed trade with ``pl_overlay > 0`` (where ``pl_overlay`` is
either ``pl_marius`` or ``pl_theoretical``). Pending trades — ``pl_marius``
blank, i.e. ``outcome_path in {pending, TP0->pending}`` — are excluded from
both WR and expectancy: there is no realised outcome yet.
The ``calitate`` field is a known-biased descriptor: it is classified
post-outcome (see ``STOPPING_RULE.md`` §3). The per-``calitate`` split is
reported with an explicit *descriptor only — biased post-outcome* caveat.
"""
from __future__ import annotations
import argparse
import csv
import math
import sys
from pathlib import Path
from typing import Any, Iterable
import numpy as np
from scripts.append_row import CSV_COLUMNS
__all__ = [
"BACKTEST_SOURCES",
"CALIBRATION_SOURCES",
"CORE_CALIBRATION_FIELDS",
"NUMERIC_CALIBRATION_FIELDS",
"STOPPING_RULE_N",
"wilson_ci",
"bootstrap_expectancy_ci",
"compute_stats",
"render_stats",
"compute_calibration",
"render_calibration",
"main",
]
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Row sources that count toward backtest stats (calibration legs excluded).
BACKTEST_SOURCES: frozenset[str] = frozenset({"vision", "manual"})
CALIBRATION_SOURCES: frozenset[str] = frozenset(
    {"manual_calibration", "vision_calibration"}
)
# Calibration P4 gate (STOPPING_RULE.md §P4) — explicitly reported per field.
CORE_CALIBRATION_FIELDS: tuple[str, ...] = (
    "entry",
    "sl",
    "tp0",
    "tp1",
    "tp2",
    "outcome_path",
    "max_reached",
    "directie",
    "instrument",
)
# Fields compared with a 0.01 numeric tolerance in _calibration_match; the
# rest use exact string equality after strip.
NUMERIC_CALIBRATION_FIELDS: frozenset[str] = frozenset(
    {"entry", "sl", "tp0", "tp1", "tp2"}
)
# STOPPING_RULE.md §"GO LIVE" gate: N >= 40 per Set.
STOPPING_RULE_N: int = 40
# ---------------------------------------------------------------------------
# Loading
# ---------------------------------------------------------------------------
def _parse_optional_float(value: str) -> float | None:
s = (value or "").strip()
if s == "":
return None
try:
return float(s)
except ValueError:
return None
def _load_rows(csv_path: Path | str) -> list[dict[str, str]]:
p = Path(csv_path)
if not p.exists() or p.stat().st_size == 0:
return []
with p.open("r", encoding="utf-8", newline="") as fh:
return list(csv.DictReader(fh))
# ---------------------------------------------------------------------------
# CI primitives
# ---------------------------------------------------------------------------
def wilson_ci(wins: int, n: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for a binomial proportion.

    Returns ``(lo, hi)`` clamped to ``[0.0, 1.0]``; ``n == 0`` gives
    ``(0.0, 0.0)``. The default ``z = 1.96`` ≈ 95% confidence.

    Raises ValueError when ``wins`` is outside ``[0, n]``.
    """
    if n <= 0:
        return (0.0, 0.0)
    if not 0 <= wins <= n:
        raise ValueError(f"wins={wins} out of range for n={n}")
    z2 = z * z
    p_hat = wins / n
    denom = 1.0 + z2 / n
    center = (p_hat + z2 / (2.0 * n)) / denom
    half = z * math.sqrt(p_hat * (1.0 - p_hat) / n + z2 / (4.0 * n * n)) / denom
    return (max(0.0, center - half), min(1.0, center + half))
def bootstrap_expectancy_ci(
values: list[float] | np.ndarray,
n_resamples: int = 5000,
seed: int = 42,
) -> tuple[float, float]:
"""Percentile-method bootstrap 95% CI for the mean of ``values``.
Deterministic for a given ``seed``. Empty input → ``(0.0, 0.0)``.
Single value → ``(value, value)`` (no variance to resample).
"""
arr = np.asarray(list(values), dtype=float)
if arr.size == 0:
return (0.0, 0.0)
if arr.size == 1:
v = float(arr[0])
return (v, v)
rng = np.random.default_rng(seed)
boots = np.empty(n_resamples, dtype=float)
n = arr.size
for i in range(n_resamples):
idx = rng.integers(0, n, size=n)
boots[i] = float(arr[idx].mean())
lo = float(np.percentile(boots, 2.5))
hi = float(np.percentile(boots, 97.5))
return (lo, hi)
# ---------------------------------------------------------------------------
# compute_stats
# ---------------------------------------------------------------------------
def _group_stats(
    overlay_values: list[float | None],
    *,
    include_ci: bool,
    bootstrap_seed: int,
) -> dict[str, Any]:
    """Aggregate n / WR / expectancy (plus optional 95% CIs) for one group.

    ``None`` entries represent pending trades and are excluded from every
    metric; a win is a realised value strictly greater than zero.
    """
    realised = [v for v in overlay_values if v is not None]
    count = len(realised)
    win_count = sum(1 for v in realised if v > 0)
    summary: dict[str, Any] = {
        "n": count,
        "wr": win_count / count if count else 0.0,
        "expectancy": sum(realised) / count if count else 0.0,
    }
    if include_ci:
        summary["wr_ci_95"] = wilson_ci(win_count, count)
        summary["expectancy_ci_95"] = bootstrap_expectancy_ci(
            realised, seed=bootstrap_seed
        )
    return summary


def _overlay_value(row: dict[str, str], overlay: str) -> float | None:
    """Read the selected overlay column from a raw CSV row (None when blank)."""
    return _parse_optional_float(row.get(overlay, ""))
def compute_stats(
    csv_path: Path | str = "data/jurnal.csv",
    overlay: str = "pl_marius",
) -> dict[str, Any]:
    """Compute aggregate WR + expectancy stats over the backtest rows.

    Calibration rows (``manual_calibration`` / ``vision_calibration``) are
    excluded; use :func:`compute_calibration` for the P4 mismatch report.

    ``overlay`` selects the P/L column: ``"pl_marius"`` (default — the real
    overlay Marius trades) or ``"pl_theoretical"`` (1/3-1/3-1/3 hold-to-TP2).
    """
    if overlay not in {"pl_marius", "pl_theoretical"}:
        raise ValueError(f"unknown overlay {overlay!r}")
    rows = [r for r in _load_rows(csv_path) if r.get("source", "") in BACKTEST_SOURCES]
    if not rows:
        # Empty result keeps the full key set so renderers need no special-casing.
        return {
            "n_total": 0,
            "n_pending": 0,
            "n_closed": 0,
            "wr": 0.0,
            "wr_ci_95": (0.0, 0.0),
            "expectancy": 0.0,
            "expectancy_ci_95": (0.0, 0.0),
            "per_set": {},
            "per_calitate": {},
            "per_directie": {},
        }
    # Pending status is overlay-independent: a trade is pending iff
    # pl_marius is blank (outcome_path in {pending, TP0->pending}).
    # pl_theoretical is concrete even for pending rows, so it would otherwise
    # let pending trades sneak into the closed-trades stats — we mask those
    # out explicitly here.
    pending_mask = [_parse_optional_float(r.get("pl_marius", "")) is None for r in rows]
    overlay_vals: list[float | None] = []
    for r, is_pending in zip(rows, pending_mask):
        overlay_vals.append(None if is_pending else _overlay_value(r, overlay))
    n_total = len(rows)
    n_pending = sum(1 for p in pending_mask if p)
    n_closed = n_total - n_pending
    overall = _group_stats(
        overlay_vals, include_ci=True, bootstrap_seed=42
    )
    def _split(field: str, include_ci: bool) -> dict[str, dict[str, Any]]:
        # Group overlay values by the row's value in `field`; blank cells
        # collapse into a single "(blank)" bucket.
        groups: dict[str, list[float | None]] = {}
        for r, v in zip(rows, overlay_vals):
            key = r.get(field, "") or "(blank)"
            groups.setdefault(key, []).append(v)
        out: dict[str, dict[str, Any]] = {}
        for k in sorted(groups):
            # NOTE(review): str hash() is randomized per process
            # (PYTHONHASHSEED), so these per-group bootstrap seeds — and
            # hence the split CIs — are only reproducible within one run.
            sub_seed = 42 + (abs(hash(("split", field, k))) % 1_000_000)
            out[k] = _group_stats(
                groups[k], include_ci=include_ci, bootstrap_seed=sub_seed
            )
        return out
    return {
        "n_total": n_total,
        "n_pending": n_pending,
        "n_closed": n_closed,
        "wr": overall["wr"],
        "wr_ci_95": overall["wr_ci_95"],
        "expectancy": overall["expectancy"],
        "expectancy_ci_95": overall["expectancy_ci_95"],
        "per_set": _split("set", include_ci=True),
        "per_calitate": _split("calitate", include_ci=True),
        # per_directie skips CI per spec (no wr_ci_95 / expectancy_ci_95 keys).
        "per_directie": {
            k: {"n": v["n"], "wr": v["wr"], "expectancy": v["expectancy"]}
            for k, v in _split("directie", include_ci=False).items()
        },
    }
# ---------------------------------------------------------------------------
# render_stats
# ---------------------------------------------------------------------------
def _fmt_pct(p: float) -> str:
return f"{100.0 * p:5.1f}%"
def _fmt_r(x: float) -> str:
return f"{x:+.2f} R"
def _set_sort_key(name: str) -> tuple[int, str]:
order = ["A1", "A2", "A3", "B", "C", "D", "Other"]
return (order.index(name), name) if name in order else (len(order), name)
def render_stats(stats: dict[str, Any], overlay: str) -> str:
    """Render :func:`compute_stats` output as a plain-text report.

    Always returns a string ending in a newline; the empty-CSV case
    short-circuits after the header lines.
    """
    lines: list[str] = []
    lines.append(f"=== Stats jurnal.csv (overlay: {overlay}) ===")
    lines.append(
        f"Trade-uri totale: {stats['n_total']} | "
        f"închise: {stats['n_closed']} | pending: {stats['n_pending']}"
    )
    if stats["n_total"] == 0:
        lines.append("")
        lines.append("(nu sunt trade-uri backtest în CSV)")
        return "\n".join(lines) + "\n"
    lines.append("")
    lo, hi = stats["wr_ci_95"]
    e_lo, e_hi = stats["expectancy_ci_95"]
    lines.append(f"GLOBAL (n={stats['n_closed']}):")
    lines.append(
        f"  WR: {_fmt_pct(stats['wr'])} "
        f"[95% CI: {_fmt_pct(lo)}, {_fmt_pct(hi)}]"
    )
    lines.append(
        f"  Expectancy: {_fmt_r(stats['expectancy'])} "
        f"[95% CI: {_fmt_r(e_lo)}, {_fmt_r(e_hi)}]"
    )
    lines.append("")
    # Shared emitter for the per-Set and per-Direcție sections (per-Calitate
    # needs its own loop because of the custom ordering + warning banner).
    def _emit_split(
        title: str,
        data: dict[str, dict[str, Any]],
        *,
        sort_keys: list[str] | None = None,
        include_ci: bool = True,
    ) -> None:
        lines.append(title)
        keys = sort_keys if sort_keys is not None else sorted(data)
        for k in keys:
            if k not in data:
                continue
            d = data[k]
            if include_ci and "wr_ci_95" in d:
                clo, chi = d["wr_ci_95"]
                lines.append(
                    f"  {k:<14} n={d['n']:>3} "
                    f"WR {_fmt_pct(d['wr'])} "
                    f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] "
                    f"E {_fmt_r(d['expectancy'])}"
                )
            else:
                lines.append(
                    f"  {k:<14} n={d['n']:>3} "
                    f"WR {_fmt_pct(d['wr'])} "
                    f"E {_fmt_r(d['expectancy'])}"
                )
        lines.append("")
    _emit_split(
        "PER SET:",
        stats["per_set"],
        sort_keys=sorted(stats["per_set"], key=_set_sort_key),
    )
    lines.append(
        "PER CALITATE (⚠️ DESCRIPTOR ONLY — biased post-outcome, NU folosi ca filtru):"
    )
    # Known calitate values first (fixed order), any stragglers sorted after.
    cal_order = ["Clară", "Mai mare ca impuls", "Slabă", "n/a"]
    keys = [k for k in cal_order if k in stats["per_calitate"]] + [
        k for k in sorted(stats["per_calitate"]) if k not in cal_order
    ]
    for k in keys:
        d = stats["per_calitate"][k]
        clo, chi = d["wr_ci_95"]
        lines.append(
            f"  {k:<20} n={d['n']:>3} "
            f"WR {_fmt_pct(d['wr'])} "
            f"[{_fmt_pct(clo)}, {_fmt_pct(chi)}] "
            f"E {_fmt_r(d['expectancy'])}"
        )
    lines.append("")
    _emit_split("PER DIRECȚIE:", stats["per_directie"], include_ci=False)
    # STOPPING_RULE gate check — flag every Set that hasn't crossed N>=40.
    lines.append(f"⚠️ STOPPING RULE check (vezi STOPPING_RULE.md, N>={STOPPING_RULE_N}):")
    set_keys = sorted(stats["per_set"], key=_set_sort_key)
    any_flagged = False
    for k in set_keys:
        n = stats["per_set"][k]["n"]
        if n < STOPPING_RULE_N:
            lines.append(f"  {k}: N={n} < {STOPPING_RULE_N} → NEEDS MORE DATA")
            any_flagged = True
    if not any_flagged:
        lines.append(f"  toate Set-urile au N>={STOPPING_RULE_N} (eligibile pentru GO LIVE check).")
    return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# compute_calibration
# ---------------------------------------------------------------------------
def _calibration_match(field: str, m_val: str, v_val: str, tol: float = 0.01) -> bool:
    """True when the manual and vision values agree for ``field``.

    Numeric price fields compare within ``tol``; everything else — and
    numeric cells that fail to parse — falls back to stripped string equality.
    """
    strings_equal = (m_val or "").strip() == (v_val or "").strip()
    if field not in NUMERIC_CALIBRATION_FIELDS:
        return strings_equal
    try:
        delta = abs(float(m_val) - float(v_val))
    except ValueError:
        return strings_equal
    return delta <= tol
def compute_calibration(
    csv_path: Path | str = "data/jurnal.csv",
) -> dict[str, Any]:
    """Pair calibration legs by ``screenshot_file`` and report per-field mismatch.

    Returns a dict ``{"n_pairs": int, "fields": {field: {match, mismatch,
    match_rate, mismatch_examples}}}``. ``mismatch_examples`` holds up to 3
    strings ``"<screenshot_file>: manual=X vs vision=Y"`` per field.

    Numeric fields (``entry/sl/tp0/tp1/tp2``) use a tolerance of 0.01;
    everything else is exact-string equality after strip.
    """
    rows = _load_rows(csv_path)
    # Index each leg by screenshot; if the same screenshot appears twice for
    # one source the last row wins (plain dict overwrite).
    manual: dict[str, dict[str, str]] = {}
    vision: dict[str, dict[str, str]] = {}
    for r in rows:
        src = r.get("source", "")
        if src == "manual_calibration":
            manual[r.get("screenshot_file", "")] = r
        elif src == "vision_calibration":
            vision[r.get("screenshot_file", "")] = r
    # Only screenshots with BOTH legs are comparable.
    paired_files = sorted(set(manual) & set(vision))
    fields_report: dict[str, dict[str, Any]] = {
        f: {
            "match": 0,
            "mismatch": 0,
            "match_rate": 0.0,
            "mismatch_examples": [],
        }
        for f in CORE_CALIBRATION_FIELDS
    }
    for f in paired_files:
        m = manual[f]
        v = vision[f]
        for fld in CORE_CALIBRATION_FIELDS:
            mv = m.get(fld, "")
            vv = v.get(fld, "")
            if _calibration_match(fld, mv, vv):
                fields_report[fld]["match"] += 1
            else:
                fields_report[fld]["mismatch"] += 1
                # Keep at most 3 concrete examples per field for the report.
                examples = fields_report[fld]["mismatch_examples"]
                if len(examples) < 3:
                    examples.append(f"{f}: manual={mv!r} vs vision={vv!r}")
    for fld, data in fields_report.items():
        total = data["match"] + data["mismatch"]
        data["match_rate"] = (data["match"] / total) if total else 0.0
    return {"n_pairs": len(paired_files), "fields": fields_report}
def render_calibration(cal: dict[str, Any]) -> str:
    """Render the P4 report produced by :func:`compute_calibration`.

    The gate verdict is PASS when at most 10% of all field comparisons
    (summed across fields and pairs) disagree.
    """
    lines: list[str] = []
    lines.append("=== Calibration P4 gate (vezi STOPPING_RULE.md §P4) ===")
    lines.append(f"Perechi calibration: {cal['n_pairs']}")
    if cal["n_pairs"] == 0:
        lines.append("(nu există perechi manual_calibration ∩ vision_calibration)")
        return "\n".join(lines) + "\n"
    lines.append("")
    lines.append(f"{'field':<14} match mismatch rate")
    total_mismatches = 0
    total_comparisons = 0
    for fld in CORE_CALIBRATION_FIELDS:
        d = cal["fields"][fld]
        n = d["match"] + d["mismatch"]
        total_mismatches += d["mismatch"]
        total_comparisons += n
        lines.append(
            f"{fld:<14} {d['match']:>5} {d['mismatch']:>8} "
            f"{_fmt_pct(d['match_rate'])}"
        )
    lines.append("")
    overall_match_rate = (
        (total_comparisons - total_mismatches) / total_comparisons
        if total_comparisons
        else 0.0
    )
    overall_mismatch_rate = 1.0 - overall_match_rate
    # P4 gate threshold: <= 10% overall mismatch.
    verdict = "PASS" if overall_mismatch_rate <= 0.10 else "FAIL"
    lines.append(
        f"Overall mismatch rate: {_fmt_pct(overall_mismatch_rate)} "
        f"({total_mismatches}/{total_comparisons}) → P4 gate: {verdict}"
    )
    has_examples = any(
        cal["fields"][f]["mismatch_examples"] for f in CORE_CALIBRATION_FIELDS
    )
    if has_examples:
        lines.append("")
        lines.append("Mismatch examples (max 3 per field):")
        for fld in CORE_CALIBRATION_FIELDS:
            ex = cal["fields"][fld]["mismatch_examples"]
            if not ex:
                continue
            lines.append(f"  [{fld}]")
            for e in ex:
                lines.append(f"    - {e}")
    return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
    """CLI entry point; ``--calibration`` switches to the P4 mismatch report."""
    parser = argparse.ArgumentParser(
        prog="stats",
        description="Backtest statistics for data/jurnal.csv",
    )
    parser.add_argument(
        "--csv",
        type=Path,
        default=Path("data/jurnal.csv"),
        help="Path to the jurnal CSV (default: data/jurnal.csv).",
    )
    parser.add_argument(
        "--overlay",
        choices=("pl_marius", "pl_theoretical"),
        default="pl_marius",
        help="Which P/L overlay to use (default: pl_marius).",
    )
    parser.add_argument(
        "--calibration",
        action="store_true",
        help="Show P4 calibration mismatch report instead of backtest stats.",
    )
    args = parser.parse_args(argv)
    # Force UTF-8 stdout so the report's diacritics/symbols survive —
    # presumably for consoles without a UTF-8 default (Windows); best-effort.
    try:
        sys.stdout.reconfigure(encoding="utf-8")  # type: ignore[attr-defined]
    except (AttributeError, OSError):
        pass
    if args.calibration:
        cal = compute_calibration(args.csv)
        sys.stdout.write(render_calibration(cal))
    else:
        stats = compute_stats(args.csv, overlay=args.overlay)
        sys.stdout.write(render_stats(stats, args.overlay))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())


# Ensure the canonical CSV schema is importable from one place — fail fast if
# someone removes append_row.CSV_COLUMNS that this module depends on.
# NOTE(review): this runs at import time but is stripped under `python -O`;
# an explicit `if CSV_COLUMNS is None: raise ImportError(...)` would be sturdier.
assert CSV_COLUMNS is not None

View File

@@ -1,125 +0,0 @@
"""Pydantic schema for the M2D vision-extraction JSON returned by the vision subagent."""
from __future__ import annotations
import re
from datetime import date as date_type, datetime, timezone
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, model_validator
_DATA_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")
_ORA_PATTERN = re.compile(r"^\d{2}:\d{2}$")
class M2DExtraction(BaseModel):
    """One trade extracted from a single M2D screenshot.

    ``extra="forbid"`` turns unknown JSON keys into validation errors, so
    any drift in the vision-extraction prompt surfaces immediately.
    """

    model_config = ConfigDict(extra="forbid")

    screenshot_file: str
    data: str     # trade date, YYYY-MM-DD — validated below, must not be in the future
    ora_utc: str  # entry time, HH:MM in UTC — validated below
    instrument: Literal["DIA", "US30", "other"]
    directie: Literal["Buy", "Sell"]
    tf_mare: Literal["5min", "15min"]
    tf_mic: Literal["1min", "3min"]
    calitate: Literal["Clară", "Mai mare ca impuls", "Slabă", "n/a"]
    entry: float
    sl: float
    tp0: float
    tp1: float
    tp2: float
    risc_pct: float
    outcome_path: Literal[
        "SL", "TP0→SL", "TP0→TP1", "TP0→TP2", "TP0→pending", "pending"
    ]
    max_reached: Literal["SL_first", "TP0", "TP1", "TP2"]
    be_moved: bool
    confidence: Literal["high", "medium", "low"]
    ambiguities: list[str] = Field(default_factory=list)
    note: str = ""

    # `data` must be a strict YYYY-MM-DD string, a real calendar date, and
    # not later than "today" in UTC.
    @model_validator(mode="after")
    def _validate_data_format(self) -> "M2DExtraction":
        if not _DATA_PATTERN.match(self.data):
            raise ValueError(
                f"data must match YYYY-MM-DD, got {self.data!r}"
            )
        try:
            parsed = date_type.fromisoformat(self.data)
        except ValueError as exc:
            raise ValueError(f"data is not a valid ISO date: {self.data!r}") from exc
        today = datetime.now(timezone.utc).date()
        if parsed > today:
            raise ValueError(
                f"data {self.data!r} is in the future (today UTC: {today.isoformat()})"
            )
        return self

    # `ora_utc` must be a strict HH:MM string that parses as a valid time.
    @model_validator(mode="after")
    def _validate_ora_utc_format(self) -> "M2DExtraction":
        if not _ORA_PATTERN.match(self.ora_utc):
            raise ValueError(
                f"ora_utc must match HH:MM, got {self.ora_utc!r}"
            )
        try:
            datetime.strptime(self.ora_utc, "%H:%M")
        except ValueError as exc:
            raise ValueError(
                f"ora_utc is not a valid HH:MM time: {self.ora_utc!r}"
            ) from exc
        return self

    # Entry and SL coinciding would mean a zero risk distance.
    @model_validator(mode="after")
    def _validate_entry_ne_sl(self) -> "M2DExtraction":
        if self.entry == self.sl:
            raise ValueError("entry must not equal sl (zero risk distance)")
        return self

    # Price levels must be strictly ordered in the trade's direction:
    # SL below entry and TPs above for Buy, mirrored for Sell.
    @model_validator(mode="after")
    def _validate_tp_ordering(self) -> "M2DExtraction":
        if self.directie == "Buy":
            if not (self.sl < self.entry < self.tp0 < self.tp1 < self.tp2):
                raise ValueError(
                    "for Buy, required: sl < entry < tp0 < tp1 < tp2 "
                    f"(got sl={self.sl}, entry={self.entry}, tp0={self.tp0}, "
                    f"tp1={self.tp1}, tp2={self.tp2})"
                )
        else:
            if not (self.sl > self.entry > self.tp0 > self.tp1 > self.tp2):
                raise ValueError(
                    "for Sell, required: sl > entry > tp0 > tp1 > tp2 "
                    f"(got sl={self.sl}, entry={self.entry}, tp0={self.tp0}, "
                    f"tp1={self.tp1}, tp2={self.tp2})"
                )
        return self

    # `max_reached` must be consistent with the narrated outcome path.
    @model_validator(mode="after")
    def _validate_outcome_max_consistency(self) -> "M2DExtraction":
        op = self.outcome_path
        mr = self.max_reached
        if op == "SL":
            if mr != "SL_first":
                raise ValueError(
                    f"outcome_path='SL' requires max_reached='SL_first', got {mr!r}"
                )
        elif op.startswith("TP0"):
            if mr not in {"TP0", "TP1", "TP2"}:
                raise ValueError(
                    f"outcome_path={op!r} requires max_reached in "
                    f"{{TP0, TP1, TP2}}, got {mr!r}"
                )
        # op == "pending" → any max_reached accepted
        return self
def parse_extraction(json_str: str) -> M2DExtraction:
    """Validate a raw JSON string into an :class:`M2DExtraction`.

    Raises ``pydantic.ValidationError`` on invalid input.
    """
    model = M2DExtraction.model_validate_json(json_str)
    return model


def parse_extraction_dict(d: dict) -> M2DExtraction:
    """Validate an already-parsed mapping against the schema."""
    return M2DExtraction.model_validate(d)