scripts: pl_calc, vision_schema, calendar_parse + tests (67 passing)

This commit is contained in:
Marius
2026-05-13 12:36:32 +03:00
parent 31dcb4abe3
commit 6ae659605e
7 changed files with 872 additions and 0 deletions

181
scripts/calendar_parse.py Normal file
View File

@@ -0,0 +1,181 @@
"""Calendar parsing + Set classification for M2D backtesting.
Each trade is tagged with a ``Set`` derived from its date, RO-local time, and the
economic-event calendar:
- ``A1``: 16:35-17:00 RO, Tue/Wed/Thu
- ``A2``: 17:00-18:00 RO, Tue/Wed/Thu (sweet spot)
- ``A3``: 18:00-19:00 RO, Tue/Wed/Thu
- ``B`` : 22:00-22:45 RO, Tue/Wed/Thu
- ``C`` : inside the window of an event with severity in {extrem, mare}
- ``D`` : Mon or Fri
- ``Other``: anything else
Priority: C > D > A1/A2/A3/B > Other.
"""
from __future__ import annotations
from datetime import date, datetime, time
from pathlib import Path
from typing import Any
import yaml
from zoneinfo import ZoneInfo
# Names exported by ``from calendar_parse import *``.
__all__ = [
    "RO_TZ",
    "UTC_TZ",
    "utc_to_ro",
    "load_calendar",
    "is_in_news_window",
    "calc_set",
]
# Time zones used by utc_to_ro() for the UTC -> Romania-local conversion.
RO_TZ = ZoneInfo("Europe/Bucharest")
UTC_TZ = ZoneInfo("UTC")
# Short day names indexed by ``date.weekday()`` (0 = Monday ... 6 = Sunday).
_DAY_SHORT = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
# Event severities that are allowed to open a news window (Set C).
_HIGH_SEVERITY = frozenset({"extrem", "mare"})
# Day-name suffix of a ``weekly_<day>`` cadence -> ``date.weekday()`` index.
_WEEKLY_DAY_MAP = {
    "monday": 0,
    "tuesday": 1,
    "wednesday": 2,
    "thursday": 3,
    "friday": 4,
    "saturday": 5,
    "sunday": 6,
}
def utc_to_ro(date_str: str, ora_utc_str: str) -> tuple[date, time, str]:
    """Translate a UTC date/time pair into Romania-local terms.

    Args:
        date_str: Calendar date in UTC, formatted ``YYYY-MM-DD``.
        ora_utc_str: Wall-clock time in UTC, formatted ``HH:MM``.

    Returns:
        ``(date_ro, time_ro, day_short)`` where ``time_ro`` is a naive
        ``time`` truncated to the minute and ``day_short`` is one of
        ``Mon Tue Wed Thu Fri Sat Sun`` for the Romania-local date.

    Raises:
        ValueError: if the inputs do not match the expected formats.
    """
    naive = datetime.strptime(f"{date_str} {ora_utc_str}", "%Y-%m-%d %H:%M")
    # Attach UTC first, then let zoneinfo apply the DST-aware Bucharest offset.
    local = naive.replace(tzinfo=UTC_TZ).astimezone(RO_TZ)
    local_hhmm = local.time().replace(second=0, microsecond=0)
    day_short = _DAY_SHORT[local.weekday()]
    return local.date(), local_hhmm, day_short
def load_calendar(path: Path | str = "calendar_evenimente.yaml") -> list[dict[str, Any]]:
    """Load the event list from a YAML calendar file.

    Args:
        path: Location of the calendar file.

    Returns:
        The list stored under the top-level ``events`` key. A missing or
        null ``events`` key yields an empty list.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the document is not a mapping, if ``schema_version``
            is not the integer ``1``, or if ``events`` is present but is not
            a list.
    """
    p = Path(path)
    with p.open("r", encoding="utf-8") as fh:
        doc = yaml.safe_load(fh)
    if not isinstance(doc, dict):
        raise ValueError(f"calendar file {p} is not a mapping")
    version = doc.get("schema_version")
    # ``True == 1`` in Python, so a YAML boolean must be rejected explicitly.
    if isinstance(version, bool) or version != 1:
        raise ValueError(
            f"unsupported calendar schema_version: {version!r} (expected 1)"
        )
    events = doc.get("events")
    if events is None:
        # Key absent or written as a bare ``events:`` -> no events.
        return []
    # Do not use ``or []`` here: it would silently turn a mistyped value such
    # as ``""``, ``0`` or ``{}`` into "no events" instead of reporting it.
    if not isinstance(events, list):
        raise ValueError(f"calendar events must be a list, got {type(events).__name__}")
    return events
def _minutes(t: time) -> int:
    """Return the number of whole minutes since midnight for ``t``.

    Seconds and microseconds are ignored.
    """
    whole_hours_in_minutes = 60 * t.hour
    return whole_hours_in_minutes + t.minute
def _parse_hhmm(s: str) -> time:
    """Parse an ``HH:MM`` string into a naive ``time``.

    Raises:
        ValueError: if ``s`` does not match ``%H:%M``.
    """
    parsed = datetime.strptime(s, "%H:%M")
    return parsed.time()
def _is_first_friday_of_month(d: date) -> bool:
    """Return True when ``d`` is the first Friday of its month."""
    if d.weekday() != 4:  # 4 == Friday in date.weekday() numbering
        return False
    # The first occurrence of any weekday always lands on day 1..7.
    return d.day <= 7
def _event_matches_date(event: dict[str, Any], d: date) -> bool:
    """Decide whether ``event`` occurs on calendar day ``d``.

    Supported ``cadence`` values:

    - ``scheduled``: matches when the event's ``date`` (a ``date``, a
      ``datetime`` or a ``YYYY-MM-DD`` string) falls on ``d``.
    - ``first_friday_monthly``: matches the first Friday of any month.
    - ``weekly_<day>``: matches every ``<day>`` (English day name, any case).

    Any other cadence, including a missing, null or non-string one, never
    matches.

    Raises:
        ValueError: if a ``scheduled`` event carries a string ``date`` that
            is not in ``YYYY-MM-DD`` form.
    """
    cadence = event.get("cadence", "")
    if not isinstance(cadence, str):
        # A null / non-string cadence cannot name a rule; without this guard
        # the ``startswith`` call below would raise AttributeError.
        return False
    if cadence == "scheduled":
        ev_date_raw = event.get("date")
        if isinstance(ev_date_raw, datetime):
            # ``datetime`` is a subclass of ``date`` but never compares equal
            # to a plain ``date``; reduce it to its calendar day first.
            ev_date = ev_date_raw.date()
        elif isinstance(ev_date_raw, date):
            ev_date = ev_date_raw
        elif isinstance(ev_date_raw, str):
            ev_date = datetime.strptime(ev_date_raw, "%Y-%m-%d").date()
        else:
            return False
        return ev_date == d
    if cadence == "first_friday_monthly":
        return _is_first_friday_of_month(d)
    if cadence.startswith("weekly_"):
        day_name = cadence.removeprefix("weekly_").lower()
        target = _WEEKLY_DAY_MAP.get(day_name)
        if target is None:
            return False
        return d.weekday() == target
    # Remaining cadences are not pinned down to a precise calendar day yet, so
    # they do not trigger Set C. ADP pre-NFP is also explicitly deferred.
    return False
def is_in_news_window(d: date, t: time, calendar: list[dict[str, Any]]) -> bool:
    """Return True iff ``(d, t)`` falls inside the window of a high-severity event.

    Window: ``[time_ro - window_before_min, time_ro + window_after_min]``,
    inclusive on both ends and compared at minute resolution. Only events
    whose ``severity`` is in ``{extrem, mare}`` count, and only events that
    ``_event_matches_date`` places on ``d`` are considered.

    ``time_ro`` may be a ``time``, an ``HH:MM`` string, or an integer number
    of minutes since midnight. The integer form exists because YAML 1.1
    loaders such as PyYAML read an unquoted ``15:30`` as the base-60 integer
    ``930``. Events with a missing or otherwise unusable ``time_ro`` are
    skipped. Missing or null window widths count as ``0``.

    Args:
        d: Romania-local calendar day of the trade.
        t: Romania-local time of the trade.
        calendar: Event dicts, e.g. as returned by ``load_calendar``.

    Raises:
        ValueError: if a string ``time_ro`` is not in ``HH:MM`` form, or a
            window width cannot be converted to ``int``.
    """
    t_min = _minutes(t)
    for event in calendar:
        if event.get("severity") not in _HIGH_SEVERITY:
            continue
        if not _event_matches_date(event, d):
            continue
        ev_time_raw = event.get("time_ro")
        if isinstance(ev_time_raw, time):
            center = _minutes(ev_time_raw)
        elif isinstance(ev_time_raw, str):
            center = _minutes(_parse_hhmm(ev_time_raw))
        elif (
            isinstance(ev_time_raw, int)
            and not isinstance(ev_time_raw, bool)
            and 0 <= ev_time_raw < 24 * 60
        ):
            # Unquoted ``HH:MM`` parsed as a YAML 1.1 sexagesimal integer:
            # the value already is minutes since midnight. Without this
            # branch such an event would be dropped silently.
            center = ev_time_raw
        else:
            continue
        # ``or 0`` also covers a key that is present but explicitly null.
        before = int(event.get("window_before_min") or 0)
        after = int(event.get("window_after_min") or 0)
        if center - before <= t_min <= center + after:
            return True
    return False
def _in_range(t: time, lo: time, hi: time) -> bool:
    """Tell whether ``t`` lies in the half-open band ``[lo, hi)``.

    The comparison is done at minute resolution.
    """
    start, current, stop = (_minutes(moment) for moment in (lo, t, hi))
    if current < start:
        return False
    return current < stop
def calc_set(d: date, t: time, day_of_week: str, calendar: list[dict[str, Any]]) -> str:
    """Assign a trade to one of the Sets ``A1 A2 A3 B C D Other``.

    Rules are applied in priority order: a news window (``C``) wins over a
    Monday/Friday trade (``D``), which wins over the Tue/Wed/Thu time bands
    (``A1``/``A2``/``A3``/``B``); anything left is ``Other``.

    Args:
        d: Romania-local calendar day of the trade.
        t: Romania-local time of the trade.
        day_of_week: Short day name (``Mon`` ... ``Sun``) for ``d``.
        calendar: Event dicts used for the news-window check.
    """
    if is_in_news_window(d, t, calendar):
        return "C"
    if day_of_week in ("Mon", "Fri"):
        return "D"
    if day_of_week not in ("Tue", "Wed", "Thu"):
        return "Other"
    # Half-open [start, stop) bands, checked in chronological order.
    bands = (
        ("A1", time(16, 35), time(17, 0)),
        ("A2", time(17, 0), time(18, 0)),
        ("A3", time(18, 0), time(19, 0)),
        ("B", time(22, 0), time(22, 45)),
    )
    for label, start, stop in bands:
        if _in_range(t, start, stop):
            return label
    return "Other"

76
scripts/pl_calc.py Normal file
View File

@@ -0,0 +1,76 @@
"""P/L overlays for M2D backtesting.
Two overlays computed from the same trade outcome:
- ``pl_marius``: real overlay used by the trader. 50% closed at TP0 (+0.2 R),
BE move on the remaining half, then close 50% of that at ~TP1 (+0.3 R total
contribution) or at SL/BE depending on outcome. TP1 is treated as the final
exit even when the chart subsequently reaches TP2.
- ``pl_theoretical``: reference 1/3-1/3-1/3 overlay that holds to TP2. Used
as an opportunity-cost benchmark vs. ``pl_marius``.
Returns are expressed in multiples of R (risk per trade). ``None`` from
``pl_marius`` denotes a still-pending trade.
"""
from __future__ import annotations
# Names exported by ``from pl_calc import *``.
__all__ = [
    "PL_MARIUS_TABLE",
    "PL_THEORETICAL_TABLE",
    "pl_marius",
    "pl_theoretical",
]
# P/L in R for the real overlay, keyed by ``(outcome_path, be_moved)`` with the
# outcome path spelled with the ASCII arrow ``->``. ``None`` marks a trade that
# is still pending.
PL_MARIUS_TABLE: dict[tuple[str, bool], float | None] = {
    ("SL", True): -1.0,
    ("SL", False): -1.0,
    ("TP0->SL", True): 0.20,
    ("TP0->SL", False): -0.30,
    ("TP0->TP1", True): 0.50,
    ("TP0->TP1", False): 0.50,
    ("TP0->TP2", True): 0.50,
    ("TP0->TP2", False): 0.50,
    ("TP0->pending", True): None,
    ("TP0->pending", False): None,
    ("pending", True): None,
    ("pending", False): None,
}
# P/L in R for the theoretical overlay, keyed by the furthest level reached.
PL_THEORETICAL_TABLE: dict[str, float] = {
    "SL_first": -1.0,
    "TP0": 0.133,
    "TP1": 0.333,
    "TP2": 0.667,
}
# Outcome paths (ASCII-arrow spelling) that pl_marius() accepts.
_VALID_OUTCOME_PATHS: frozenset[str] = frozenset(
    {"SL", "TP0->SL", "TP0->TP1", "TP0->TP2", "TP0->pending", "pending"}
)
def _normalize_outcome_path(outcome_path: str) -> str:
    """Rewrite unicode right-arrows in ``outcome_path`` as the ASCII ``->``.

    Recognised glyphs are U+2192 (rightwards arrow), U+27F6 (long rightwards
    arrow) and U+2794 (heavy wide-headed rightwards arrow). All other
    characters, including an existing ``->``, are returned unchanged.
    """
    # The glyphs are spelled as escapes so they survive tooling that mangles
    # non-ASCII source text. An empty search string must never be used with
    # ``str.replace``: it inserts the replacement between every character.
    arrow_map = dict.fromkeys(map(ord, "\u2192\u27f6\u2794"), "->")
    return outcome_path.translate(arrow_map)
def pl_marius(outcome_path: str, be_moved: bool) -> float | None:
    """Look up the P/L (in R) of the real Marius overlay.

    ``outcome_path`` is first passed through ``_normalize_outcome_path`` and
    must then be one of ``_VALID_OUTCOME_PATHS``.

    Args:
        outcome_path: Outcome path of the trade, e.g. ``"TP0->TP1"``.
        be_moved: Whether the stop was moved to break-even.

    Returns:
        The P/L in multiples of R, or ``None`` for pending outcomes.

    Raises:
        ValueError: if the normalized path is not a known outcome path.
    """
    key = _normalize_outcome_path(outcome_path)
    if key in _VALID_OUTCOME_PATHS:
        return PL_MARIUS_TABLE[(key, be_moved)]
    raise ValueError(f"invalid outcome_path: {outcome_path!r}")
def pl_theoretical(max_reached: str) -> float:
    """Look up the P/L (in R) of the theoretical 1/3-1/3-1/3 hold-to-TP2 overlay.

    Args:
        max_reached: Key of ``PL_THEORETICAL_TABLE``.

    Raises:
        ValueError: if ``max_reached`` is not a key of the table.
    """
    try:
        return PL_THEORETICAL_TABLE[max_reached]
    except KeyError:
        # ``from None`` keeps the raised error free of the internal KeyError.
        raise ValueError(f"invalid max_reached: {max_reached!r}") from None

125
scripts/vision_schema.py Normal file
View File

@@ -0,0 +1,125 @@
"""Pydantic schema for the M2D vision-extraction JSON returned by the vision subagent."""
from __future__ import annotations
import re
from datetime import date as date_type, datetime, timezone
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, model_validator
# Shape checks for the ``data`` (YYYY-MM-DD) and ``ora_utc`` (HH:MM) fields.
# ``\Z`` is used instead of ``$`` because ``$`` also matches just before a
# trailing newline, and ``re.ASCII`` restricts ``\d`` to 0-9 so digits from
# other scripts are rejected.
_DATA_PATTERN = re.compile(r"\A\d{4}-\d{2}-\d{2}\Z", re.ASCII)
_ORA_PATTERN = re.compile(r"\A\d{2}:\d{2}\Z", re.ASCII)
class M2DExtraction(BaseModel):
    # One vision-extraction record. Unknown keys are rejected, and the
    # validators below run after field parsing, in definition order.
    # (Kept as comments: a class docstring would surface in the JSON schema.)
    model_config = ConfigDict(extra="forbid")

    screenshot_file: str
    data: str  # YYYY-MM-DD, not in the future; see _validate_data_format
    ora_utc: str  # HH:MM; see _validate_ora_utc_format
    instrument: Literal["DIA", "US30", "other"]
    directie: Literal["Buy", "Sell"]
    tf_mare: Literal["5min", "15min"]
    tf_mic: Literal["1min", "3min"]
    calitate: Literal["Clară", "Mai mare ca impuls", "Slabă", "n/a"]
    # Price levels; their relative order is enforced by _validate_tp_ordering.
    entry: float
    sl: float
    tp0: float
    tp1: float
    tp2: float
    risc_pct: float
    outcome_path: Literal[
        "SL", "TP0→SL", "TP0→TP1", "TP0→TP2", "TP0→pending", "pending"
    ]
    max_reached: Literal["SL_first", "TP0", "TP1", "TP2"]
    be_moved: bool
    confidence: Literal["high", "medium", "low"]
    ambiguities: list[str] = Field(default_factory=list)
    note: str = ""

    @model_validator(mode="after")
    def _validate_data_format(self) -> "M2DExtraction":
        """Require ``data`` to be a real ISO date no later than today (UTC)."""
        raw = self.data
        if _DATA_PATTERN.match(raw) is None:
            raise ValueError(f"data must match YYYY-MM-DD, got {raw!r}")
        try:
            as_date = date_type.fromisoformat(raw)
        except ValueError as exc:
            raise ValueError(f"data is not a valid ISO date: {raw!r}") from exc
        utc_today = datetime.now(timezone.utc).date()
        if as_date > utc_today:
            raise ValueError(
                f"data {raw!r} is in the future (today UTC: {utc_today.isoformat()})"
            )
        return self

    @model_validator(mode="after")
    def _validate_ora_utc_format(self) -> "M2DExtraction":
        """Require ``ora_utc`` to be a valid ``HH:MM`` clock time."""
        raw = self.ora_utc
        if _ORA_PATTERN.match(raw) is None:
            raise ValueError(f"ora_utc must match HH:MM, got {raw!r}")
        try:
            # Shape is fine; this rejects out-of-range values such as 25:00.
            datetime.strptime(raw, "%H:%M")
        except ValueError as exc:
            raise ValueError(f"ora_utc is not a valid HH:MM time: {raw!r}") from exc
        return self

    @model_validator(mode="after")
    def _validate_entry_ne_sl(self) -> "M2DExtraction":
        """Reject a trade whose stop sits exactly on the entry price."""
        if self.sl != self.entry:
            return self
        raise ValueError("entry must not equal sl (zero risk distance)")

    @model_validator(mode="after")
    def _validate_tp_ordering(self) -> "M2DExtraction":
        """Require strictly ordered levels: ascending for Buy, descending otherwise."""
        observed = (
            f"(got sl={self.sl}, entry={self.entry}, tp0={self.tp0}, "
            f"tp1={self.tp1}, tp2={self.tp2})"
        )
        if self.directie == "Buy":
            ordered = self.sl < self.entry < self.tp0 < self.tp1 < self.tp2
            rule = "for Buy, required: sl < entry < tp0 < tp1 < tp2 "
        else:
            ordered = self.sl > self.entry > self.tp0 > self.tp1 > self.tp2
            rule = "for Sell, required: sl > entry > tp0 > tp1 > tp2 "
        if not ordered:
            raise ValueError(rule + observed)
        return self

    @model_validator(mode="after")
    def _validate_outcome_max_consistency(self) -> "M2DExtraction":
        """Cross-check ``outcome_path`` against ``max_reached``."""
        path, peak = self.outcome_path, self.max_reached
        if path == "SL" and peak != "SL_first":
            raise ValueError(
                f"outcome_path='SL' requires max_reached='SL_first', got {peak!r}"
            )
        if path.startswith("TP0") and peak not in {"TP0", "TP1", "TP2"}:
            raise ValueError(
                f"outcome_path={path!r} requires max_reached in "
                f"{{TP0, TP1, TP2}}, got {peak!r}"
            )
        # A plain "pending" path places no constraint on max_reached.
        return self
def parse_extraction(json_str: str) -> M2DExtraction:
    """Build an ``M2DExtraction`` from its JSON text form.

    Raises:
        pydantic.ValidationError: if the input is invalid.
    """
    extraction = M2DExtraction.model_validate_json(json_str)
    return extraction
def parse_extraction_dict(d: dict) -> M2DExtraction:
    """Build an ``M2DExtraction`` from an already-decoded dict, validating it against the schema."""
    extraction = M2DExtraction.model_validate(d)
    return extraction