diff --git a/scripts/calendar_parse.py b/scripts/calendar_parse.py
new file mode 100644
index 0000000..6680636
--- /dev/null
+++ b/scripts/calendar_parse.py
@@ -0,0 +1,181 @@
+"""Calendar parsing + Set classification for M2D backtesting.
+
+Each trade is tagged with a ``Set`` derived from its date, RO-local time, and the
+economic-event calendar:
+
+- ``A1``: 16:35-17:00 RO, Tue/Wed/Thu
+- ``A2``: 17:00-18:00 RO, Tue/Wed/Thu (sweet spot)
+- ``A3``: 18:00-19:00 RO, Tue/Wed/Thu
+- ``B`` : 22:00-22:45 RO, Tue/Wed/Thu
+- ``C`` : inside the window of an event with severity in {extrem, mare}
+- ``D`` : Mon or Fri
+- ``Other``: anything else
+
+Priority: C > D > A1/A2/A3/B > Other.
+"""
+
+from __future__ import annotations
+
+from datetime import date, datetime, time
+from pathlib import Path
+from typing import Any
+
+import yaml
+from zoneinfo import ZoneInfo
+
+__all__ = [
+    "RO_TZ",
+    "UTC_TZ",
+    "utc_to_ro",
+    "load_calendar",
+    "is_in_news_window",
+    "calc_set",
+]
+
+
+RO_TZ = ZoneInfo("Europe/Bucharest")
+UTC_TZ = ZoneInfo("UTC")
+
+_DAY_SHORT = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
+
+_HIGH_SEVERITY = frozenset({"extrem", "mare"})
+
+_WEEKLY_DAY_MAP = {
+    "monday": 0,
+    "tuesday": 1,
+    "wednesday": 2,
+    "thursday": 3,
+    "friday": 4,
+    "saturday": 5,
+    "sunday": 6,
+}
+
+
+def utc_to_ro(date_str: str, ora_utc_str: str) -> tuple[date, time, str]:
+    """Convert ``(YYYY-MM-DD, HH:MM UTC)`` to ``(date_ro, time_ro, day_short)``.
+
+    DST-aware via :mod:`zoneinfo`. ``day_short`` is one of
+    ``Mon Tue Wed Thu Fri Sat Sun``.
+    """
+    dt_utc = datetime.strptime(f"{date_str} {ora_utc_str}", "%Y-%m-%d %H:%M").replace(
+        tzinfo=UTC_TZ
+    )
+    dt_ro = dt_utc.astimezone(RO_TZ)
+    return dt_ro.date(), dt_ro.time().replace(second=0, microsecond=0), _DAY_SHORT[dt_ro.weekday()]
+
+
+def load_calendar(path: Path | str = "calendar_evenimente.yaml") -> list[dict[str, Any]]:
+    """Load a YAML calendar file.
+
+    Validates ``schema_version == 1`` and returns the list of event dicts under
+    the top-level ``events`` key.
+    """
+    p = Path(path)
+    with p.open("r", encoding="utf-8") as fh:
+        doc = yaml.safe_load(fh)
+    if not isinstance(doc, dict):
+        raise ValueError(f"calendar file {p} is not a mapping")
+    version = doc.get("schema_version")
+    if version != 1:
+        raise ValueError(
+            f"unsupported calendar schema_version: {version!r} (expected 1)"
+        )
+    events = doc.get("events") or []
+    if not isinstance(events, list):
+        raise ValueError(f"calendar events must be a list, got {type(events).__name__}")
+    return events
+
+
+def _minutes(t: time) -> int:
+    return t.hour * 60 + t.minute
+
+
+def _parse_hhmm(s: str) -> time:
+    return datetime.strptime(s, "%H:%M").time()
+
+
+def _is_first_friday_of_month(d: date) -> bool:
+    return d.weekday() == 4 and d.day <= 7
+
+
+def _event_matches_date(event: dict[str, Any], d: date) -> bool:
+    cadence = event.get("cadence", "")
+    if cadence == "scheduled":
+        ev_date_raw = event.get("date")
+        if isinstance(ev_date_raw, date):
+            ev_date = ev_date_raw
+        elif isinstance(ev_date_raw, str):
+            ev_date = datetime.strptime(ev_date_raw, "%Y-%m-%d").date()
+        else:
+            return False
+        return ev_date == d
+    if cadence == "first_friday_monthly":
+        return _is_first_friday_of_month(d)
+    if cadence.startswith("weekly_"):
+        day_name = cadence[len("weekly_") :].lower()
+        target = _WEEKLY_DAY_MAP.get(day_name)
+        if target is None:
+            return False
+        return d.weekday() == target
+    # cadences below are not pinned down to a precise calendar day yet, so we
+    # do not trigger Set C for them. ADP pre-NFP is also explicitly deferred.
+    return False
+
+
+def is_in_news_window(d: date, t: time, calendar: list[dict[str, Any]]) -> bool:
+    """Return True iff ``(d, t)`` falls inside the window of a high-severity event.
+
+    Window: ``[time_ro - window_before_min, time_ro + window_after_min]`` (inclusive
+    on both ends). Only events with ``severity`` in ``{extrem, mare}`` count.
+
+    Cadences honoured: ``scheduled``, ``first_friday_monthly``, ``weekly_``.
+    Other cadences (``monthly_mid``, ``monthly_end``, ``monthly_15``,
+    ``wednesday_pre_nfp``, ``monthly_first_week`` etc.) are deferred and never
+    trigger Set C.
+    """
+    t_min = _minutes(t)
+    for event in calendar:
+        if event.get("severity") not in _HIGH_SEVERITY:
+            continue
+        if not _event_matches_date(event, d):
+            continue
+        ev_time_raw = event.get("time_ro")
+        if isinstance(ev_time_raw, time):
+            ev_time = ev_time_raw
+        elif isinstance(ev_time_raw, str):
+            ev_time = _parse_hhmm(ev_time_raw)
+        else:
+            continue
+        center = _minutes(ev_time)
+        before = int(event.get("window_before_min", 0))
+        after = int(event.get("window_after_min", 0))
+        if center - before <= t_min <= center + after:
+            return True
+    return False
+
+
+def _in_range(t: time, lo: time, hi: time) -> bool:
+    """Half-open ``[lo, hi)`` containment."""
+    return _minutes(lo) <= _minutes(t) < _minutes(hi)
+
+
+def calc_set(d: date, t: time, day_of_week: str, calendar: list[dict[str, Any]]) -> str:
+    """Classify a trade into one of ``A1 A2 A3 B C D Other``.
+
+    Priority: ``C`` (news) > ``D`` (Mon/Fri) > ``A1/A2/A3/B`` (time bands on
+    Tue/Wed/Thu) > ``Other``.
+    """
+    if is_in_news_window(d, t, calendar):
+        return "C"
+    if day_of_week in ("Mon", "Fri"):
+        return "D"
+    if day_of_week in ("Tue", "Wed", "Thu"):
+        if _in_range(t, time(16, 35), time(17, 0)):
+            return "A1"
+        if _in_range(t, time(17, 0), time(18, 0)):
+            return "A2"
+        if _in_range(t, time(18, 0), time(19, 0)):
+            return "A3"
+        if _in_range(t, time(22, 0), time(22, 45)):
+            return "B"
+    return "Other"
diff --git a/scripts/pl_calc.py b/scripts/pl_calc.py
new file mode 100644
index 0000000..56676fe
--- /dev/null
+++ b/scripts/pl_calc.py
@@ -0,0 +1,76 @@
+"""P/L overlays for M2D backtesting.
+
+Two overlays computed from the same trade outcome:
+
+- ``pl_marius``: real overlay used by the trader. 50% closed at TP0 (+0.2 R),
+  BE move on the remaining half, then close 50% of that at ~TP1 (+0.3 R total
+  contribution) or at SL/BE depending on outcome. TP1 is treated as the final
+  exit even when the chart subsequently reaches TP2.
+
+- ``pl_theoretical``: reference 1/3-1/3-1/3 overlay that holds to TP2. Used
+  as an opportunity-cost benchmark vs. ``pl_marius``.
+
+Returns are expressed in multiples of R (risk per trade). ``None`` from
+``pl_marius`` denotes a still-pending trade.
+"""
+
+from __future__ import annotations
+
+__all__ = [
+    "PL_MARIUS_TABLE",
+    "PL_THEORETICAL_TABLE",
+    "pl_marius",
+    "pl_theoretical",
+]
+
+
+PL_MARIUS_TABLE: dict[tuple[str, bool], float | None] = {
+    ("SL", True): -1.0,
+    ("SL", False): -1.0,
+    ("TP0->SL", True): 0.20,
+    ("TP0->SL", False): -0.30,
+    ("TP0->TP1", True): 0.50,
+    ("TP0->TP1", False): 0.50,
+    ("TP0->TP2", True): 0.50,
+    ("TP0->TP2", False): 0.50,
+    ("TP0->pending", True): None,
+    ("TP0->pending", False): None,
+    ("pending", True): None,
+    ("pending", False): None,
+}
+
+
+PL_THEORETICAL_TABLE: dict[str, float] = {
+    "SL_first": -1.0,
+    "TP0": 0.133,
+    "TP1": 0.333,
+    "TP2": 0.667,
+}
+
+
+_VALID_OUTCOME_PATHS: frozenset[str] = frozenset(
+    {"SL", "TP0->SL", "TP0->TP1", "TP0->TP2", "TP0->pending", "pending"}
+)
+
+
+def _normalize_outcome_path(outcome_path: str) -> str:
+    return outcome_path.replace("→", "->").replace("→", "->")
+
+
+def pl_marius(outcome_path: str, be_moved: bool) -> float | None:
+    """Return the P/L (in R) for the real Marius overlay.
+
+    Accepts both ASCII arrow ``"TP0->TP1"`` and unicode arrow ``"TP0→TP1"``.
+    Returns ``None`` for pending outcomes.
+    """
+    normalized = _normalize_outcome_path(outcome_path)
+    if normalized not in _VALID_OUTCOME_PATHS:
+        raise ValueError(f"invalid outcome_path: {outcome_path!r}")
+    return PL_MARIUS_TABLE[(normalized, be_moved)]
+
+
+def pl_theoretical(max_reached: str) -> float:
+    """Return the P/L (in R) for the theoretical 1/3-1/3-1/3 hold-to-TP2 overlay."""
+    if max_reached not in PL_THEORETICAL_TABLE:
+        raise ValueError(f"invalid max_reached: {max_reached!r}")
+    return PL_THEORETICAL_TABLE[max_reached]
diff --git a/scripts/vision_schema.py b/scripts/vision_schema.py
new file mode 100644
index 0000000..0f45032
--- /dev/null
+++ b/scripts/vision_schema.py
@@ -0,0 +1,125 @@
+"""Pydantic schema for the M2D vision-extraction JSON returned by the vision subagent."""
+from __future__ import annotations
+
+import re
+from datetime import date as date_type, datetime, timezone
+from typing import Literal
+
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+
+
+_DATA_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")
+_ORA_PATTERN = re.compile(r"^\d{2}:\d{2}$")
+
+
+class M2DExtraction(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+
+    screenshot_file: str
+    data: str
+    ora_utc: str
+    instrument: Literal["DIA", "US30", "other"]
+    directie: Literal["Buy", "Sell"]
+    tf_mare: Literal["5min", "15min"]
+    tf_mic: Literal["1min", "3min"]
+    calitate: Literal["Clară", "Mai mare ca impuls", "Slabă", "n/a"]
+    entry: float
+    sl: float
+    tp0: float
+    tp1: float
+    tp2: float
+    risc_pct: float
+    outcome_path: Literal[
+        "SL", "TP0→SL", "TP0→TP1", "TP0→TP2", "TP0→pending", "pending"
+    ]
+    max_reached: Literal["SL_first", "TP0", "TP1", "TP2"]
+    be_moved: bool
+    confidence: Literal["high", "medium", "low"]
+    ambiguities: list[str] = Field(default_factory=list)
+    note: str = ""
+
+    @model_validator(mode="after")
+    def _validate_data_format(self) -> "M2DExtraction":
+        if not _DATA_PATTERN.match(self.data):
+            raise ValueError(
+                f"data must match YYYY-MM-DD, got {self.data!r}"
+            )
+        try:
+            parsed = date_type.fromisoformat(self.data)
+        except ValueError as exc:
+            raise ValueError(f"data is not a valid ISO date: {self.data!r}") from exc
+        today = datetime.now(timezone.utc).date()
+        if parsed > today:
+            raise ValueError(
+                f"data {self.data!r} is in the future (today UTC: {today.isoformat()})"
+            )
+        return self
+
+    @model_validator(mode="after")
+    def _validate_ora_utc_format(self) -> "M2DExtraction":
+        if not _ORA_PATTERN.match(self.ora_utc):
+            raise ValueError(
+                f"ora_utc must match HH:MM, got {self.ora_utc!r}"
+            )
+        try:
+            datetime.strptime(self.ora_utc, "%H:%M")
+        except ValueError as exc:
+            raise ValueError(
+                f"ora_utc is not a valid HH:MM time: {self.ora_utc!r}"
+            ) from exc
+        return self
+
+    @model_validator(mode="after")
+    def _validate_entry_ne_sl(self) -> "M2DExtraction":
+        if self.entry == self.sl:
+            raise ValueError("entry must not equal sl (zero risk distance)")
+        return self
+
+    @model_validator(mode="after")
+    def _validate_tp_ordering(self) -> "M2DExtraction":
+        if self.directie == "Buy":
+            if not (self.sl < self.entry < self.tp0 < self.tp1 < self.tp2):
+                raise ValueError(
+                    "for Buy, required: sl < entry < tp0 < tp1 < tp2 "
+                    f"(got sl={self.sl}, entry={self.entry}, tp0={self.tp0}, "
+                    f"tp1={self.tp1}, tp2={self.tp2})"
+                )
+        else:
+            if not (self.sl > self.entry > self.tp0 > self.tp1 > self.tp2):
+                raise ValueError(
+                    "for Sell, required: sl > entry > tp0 > tp1 > tp2 "
+                    f"(got sl={self.sl}, entry={self.entry}, tp0={self.tp0}, "
+                    f"tp1={self.tp1}, tp2={self.tp2})"
+                )
+        return self
+
+    @model_validator(mode="after")
+    def _validate_outcome_max_consistency(self) -> "M2DExtraction":
+        op = self.outcome_path
+        mr = self.max_reached
+        if op == "SL":
+            if mr != "SL_first":
+                raise ValueError(
+                    f"outcome_path='SL' requires max_reached='SL_first', got {mr!r}"
+                )
+        elif op.startswith("TP0"):
+            if mr not in {"TP0", "TP1", "TP2"}:
+                raise ValueError(
+                    f"outcome_path={op!r} requires max_reached in "
+                    f"{{TP0, TP1, TP2}}, got {mr!r}"
+                )
+        # op == "pending" → any max_reached accepted
+        return self
+
+
+def parse_extraction(json_str: str) -> M2DExtraction:
+    """Parse a JSON string into an M2DExtraction.
+
+    Raises pydantic.ValidationError on invalid input.
+    """
+    return M2DExtraction.model_validate_json(json_str)
+
+
+def parse_extraction_dict(d: dict) -> M2DExtraction:
+    """Validate a dict against the M2DExtraction schema."""
+    return M2DExtraction.model_validate(d)
diff --git a/tests/test_calendar_yaml.py b/tests/test_calendar_yaml.py
new file mode 100644
index 0000000..17e7abc
--- /dev/null
+++ b/tests/test_calendar_yaml.py
@@ -0,0 +1,88 @@
+"""Tests for the YAML loader and news-window logic in calendar_parse."""
+
+from __future__ import annotations
+
+import sys
+import textwrap
+from datetime import date, time
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from scripts.calendar_parse import (  # noqa: E402
+    is_in_news_window,
+    load_calendar,
+)
+
+
+REPO_ROOT = Path(__file__).resolve().parent.parent
+CALENDAR_PATH = REPO_ROOT / "calendar_evenimente.yaml"
+
+
+def test_load_calendar() -> None:
+    events = load_calendar(CALENDAR_PATH)
+    assert isinstance(events, list)
+    assert len(events) > 0
+    required = {"name", "cadence", "time_ro", "severity", "window_before_min", "window_after_min"}
+    for ev in events:
+        missing = required - set(ev.keys())
+        assert not missing, f"event {ev.get('name')!r} missing fields: {missing}"
+
+
+def test_load_calendar_bad_version(tmp_path: Path) -> None:
+    bad = tmp_path / "bad.yaml"
+    bad.write_text(
+        textwrap.dedent(
+            """
+            schema_version: 99
+            events: []
+            """
+        ).strip()
+        + "\n",
+        encoding="utf-8",
+    )
+    with pytest.raises(ValueError):
+        load_calendar(bad)
+
+
+def _scheduled(date_str: str, time_str: str, before: int, after: int, severity: str = "extrem") -> dict:
+    return {
+        "name": "Test",
+        "cadence": "scheduled",
+        "date": date_str,
+        "time_ro": time_str,
+        "severity": severity,
+        "window_before_min": before,
+        "window_after_min": after,
+    }
+
+
+class TestWindowBoundaries:
+    def setup_method(self) -> None:
+        self.cal = [_scheduled("2026-05-06", "15:30", 15, 15)]
+        self.d = date(2026, 5, 6)
+
+    def test_window_inside_boundary(self) -> None:
+        assert is_in_news_window(self.d, time(15, 14), self.cal) is False  # 1 min before lower bound
+        assert is_in_news_window(self.d, time(15, 15), self.cal) is True  # lower bound inclusive
+        assert is_in_news_window(self.d, time(15, 45), self.cal) is True  # upper bound inclusive
+
+    def test_window_outside(self) -> None:
+        assert is_in_news_window(self.d, time(15, 14), self.cal) is False
+        assert is_in_news_window(self.d, time(15, 46), self.cal) is False
+
+
+def test_severity_filter_mediu_excluded() -> None:
+    # JOLTS-like event with severity 'mediu' at 17:00 — even smack on time, no Set C trigger.
+    cal = [_scheduled("2026-05-06", "17:00", 10, 10, severity="mediu")]
+    assert is_in_news_window(date(2026, 5, 6), time(17, 0), cal) is False
+    assert is_in_news_window(date(2026, 5, 6), time(17, 5), cal) is False
+
+
+def test_fomc_powell_window() -> None:
+    """Real FOMC Powell Press Apr from calendar_evenimente.yaml (2026-04-29 21:30 RO, 0/45)."""
+    cal = load_calendar(CALENDAR_PATH)
+    assert is_in_news_window(date(2026, 4, 29), time(21, 35), cal) is True
+    assert is_in_news_window(date(2026, 4, 29), time(22, 16), cal) is False
diff --git a/tests/test_pl_calc.py b/tests/test_pl_calc.py
new file mode 100644
index 0000000..4bcb117
--- /dev/null
+++ b/tests/test_pl_calc.py
@@ -0,0 +1,89 @@
+"""Tests for scripts/pl_calc.py."""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from scripts.pl_calc import (  # noqa: E402
+    PL_MARIUS_TABLE,
+    PL_THEORETICAL_TABLE,
+    pl_marius,
+    pl_theoretical,
+)
+
+
+class TestPlMarius:
+    def test_sl(self) -> None:
+        assert pl_marius("SL", be_moved=True) == -1.0
+        assert pl_marius("SL", be_moved=False) == -1.0
+
+    def test_tp0_sl_be_moved(self) -> None:
+        assert pl_marius("TP0->SL", be_moved=True) == pytest.approx(0.20)
+
+    def test_tp0_sl_no_be(self) -> None:
+        assert pl_marius("TP0->SL", be_moved=False) == pytest.approx(-0.30)
+
+    def test_tp0_tp1(self) -> None:
+        assert pl_marius("TP0->TP1", be_moved=True) == pytest.approx(0.50)
+        assert pl_marius("TP0->TP1", be_moved=False) == pytest.approx(0.50)
+
+    def test_tp0_tp2_closes_at_tp1(self) -> None:
+        assert pl_marius("TP0->TP2", be_moved=True) == pytest.approx(0.50)
+        assert pl_marius("TP0->TP2", be_moved=False) == pytest.approx(0.50)
+
+    def test_tp0_pending_returns_none(self) -> None:
+        assert pl_marius("TP0->pending", be_moved=True) is None
+        assert pl_marius("TP0->pending", be_moved=False) is None
+
+    def test_pending_returns_none(self) -> None:
+        assert pl_marius("pending", be_moved=True) is None
+        assert pl_marius("pending", be_moved=False) is None
+
+    def test_unicode_arrow_accepted(self) -> None:
+        assert pl_marius("TP0→TP1", be_moved=True) == pytest.approx(0.50)
+        assert pl_marius("TP0→SL", be_moved=False) == pytest.approx(-0.30)
+
+    def test_invalid_outcome_path(self) -> None:
+        with pytest.raises(ValueError):
+            pl_marius("nonsense", be_moved=True)
+        with pytest.raises(ValueError):
+            pl_marius("TP3", be_moved=False)
+        with pytest.raises(ValueError):
+            pl_marius("", be_moved=True)
+
+
+class TestPlTheoretical:
+    def test_sl_first(self) -> None:
+        assert pl_theoretical("SL_first") == -1.0
+
+    def test_tp0(self) -> None:
+        assert pl_theoretical("TP0") == pytest.approx(0.133)
+
+    def test_tp1(self) -> None:
+        assert pl_theoretical("TP1") == pytest.approx(0.333)
+
+    def test_tp2(self) -> None:
+        assert pl_theoretical("TP2") == pytest.approx(0.667)
+
+    def test_invalid_max_reached(self) -> None:
+        with pytest.raises(ValueError):
+            pl_theoretical("TP3")
+        with pytest.raises(ValueError):
+            pl_theoretical("sl_first")  # case-sensitive
+        with pytest.raises(ValueError):
+            pl_theoretical("")
+
+
+class TestTables:
+    def test_marius_table_exported(self) -> None:
+        assert ("SL", True) in PL_MARIUS_TABLE
+        assert PL_MARIUS_TABLE[("TP0->TP1", True)] == pytest.approx(0.50)
+
+    def test_theoretical_table_exported(self) -> None:
+        assert PL_THEORETICAL_TABLE["TP2"] == pytest.approx(0.667)
+        assert PL_THEORETICAL_TABLE["SL_first"] == -1.0
diff --git a/tests/test_set_calc.py b/tests/test_set_calc.py
new file mode 100644
index 0000000..6e4a6d5
--- /dev/null
+++ b/tests/test_set_calc.py
@@ -0,0 +1,88 @@
+"""Tests for calc_set + utc_to_ro in calendar_parse."""
+
+from __future__ import annotations
+
+import sys
+from datetime import date, time
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from scripts.calendar_parse import (  # noqa: E402
+    calc_set,
+    load_calendar,
+    utc_to_ro,
+)
+
+
+REPO_ROOT = Path(__file__).resolve().parent.parent
+CALENDAR_PATH = REPO_ROOT / "calendar_evenimente.yaml"
+
+
+def _cal():
+    return load_calendar(CALENDAR_PATH)
+
+
+# Reference weekdays used below (verified via datetime):
+#   2026-05-13 Wed   2026-05-12 Tue   2026-05-14 Thu
+#   2026-05-11 Mon   2026-05-15 Fri
+#   2026-04-29 Wed   (FOMC Powell Press Apr — Set C trigger)
+
+
+def test_a1_mid() -> None:
+    assert calc_set(date(2026, 5, 13), time(16, 50), "Wed", _cal()) == "A1"
+
+
+def test_a1_boundary_low() -> None:
+    assert calc_set(date(2026, 5, 12), time(16, 35), "Tue", _cal()) == "A1"
+
+
+def test_a1_boundary_high() -> None:
+    assert calc_set(date(2026, 5, 14), time(16, 59), "Thu", _cal()) == "A1"
+
+
+def test_a2_sweet_spot() -> None:
+    assert calc_set(date(2026, 5, 13), time(17, 30), "Wed", _cal()) == "A2"
+
+
+def test_a3() -> None:
+    assert calc_set(date(2026, 5, 12), time(18, 30), "Tue", _cal()) == "A3"
+
+
+def test_b() -> None:
+    assert calc_set(date(2026, 5, 14), time(22, 15), "Thu", _cal()) == "B"
+
+
+def test_c_fomc() -> None:
+    # 2026-04-29 is Wed; would otherwise hit a time band — but FOMC Powell Press window dominates.
+    assert calc_set(date(2026, 4, 29), time(21, 35), "Wed", _cal()) == "C"
+
+
+def test_d_mon() -> None:
+    assert calc_set(date(2026, 5, 11), time(17, 0), "Mon", _cal()) == "D"
+
+
+def test_d_fri() -> None:
+    assert calc_set(date(2026, 5, 15), time(17, 0), "Fri", _cal()) == "D"
+
+
+def test_other() -> None:
+    # Tue 13:00 — not Mon/Fri, no news, before any A-band.
+    assert calc_set(date(2026, 5, 12), time(13, 0), "Tue", _cal()) == "Other"
+
+
+def test_dst_boundary_oct_2026() -> None:
+    """DST ends on Sun 2026-10-25 at 04:00 RO (clocks go back to 03:00).
+
+    Just before the shift, 00:30 UTC = 03:30 RO (EEST, UTC+3). The conversion must
+    pick the pre-shift offset and yield 03:30 — not 02:30 (which would be an
+    off-by-one-hour bug from naive +2h).
+    """
+    d_ro, t_ro, dow = utc_to_ro("2026-10-25", "00:30")
+    assert d_ro == date(2026, 10, 25)
+    assert t_ro == time(3, 30)
+    assert dow == "Sun"
+
+    # After the shift, 01:30 UTC also maps to 03:30 RO (EET, UTC+2) — sanity check.
+    d_ro2, t_ro2, _ = utc_to_ro("2026-10-25", "01:30")
+    assert (d_ro2, t_ro2) == (date(2026, 10, 25), time(3, 30))
diff --git a/tests/test_vision_schema.py b/tests/test_vision_schema.py
new file mode 100644
index 0000000..08368f0
--- /dev/null
+++ b/tests/test_vision_schema.py
@@ -0,0 +1,225 @@
+"""Tests for scripts.vision_schema.M2DExtraction."""
+from __future__ import annotations
+
+import json
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+
+import pytest
+from pydantic import ValidationError
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from scripts.vision_schema import (  # noqa: E402
+    M2DExtraction,
+    parse_extraction,
+    parse_extraction_dict,
+)
+
+
+def _buy_payload(**overrides) -> dict:
+    base = {
+        "screenshot_file": "dia-1min-example.png",
+        "data": "2026-05-13",
+        "ora_utc": "14:23",
+        "instrument": "DIA",
+        "directie": "Buy",
+        "tf_mare": "5min",
+        "tf_mic": "1min",
+        "calitate": "Clară",
+        "entry": 400.0,
+        "sl": 399.0,
+        "tp0": 400.5,
+        "tp1": 401.0,
+        "tp2": 402.0,
+        "risc_pct": 0.25,
+        "outcome_path": "TP0→TP1",
+        "max_reached": "TP1",
+        "be_moved": True,
+        "confidence": "high",
+        "ambiguities": [],
+        "note": "",
+    }
+    base.update(overrides)
+    return base
+
+
+def _sell_payload(**overrides) -> dict:
+    base = {
+        "screenshot_file": "dia-sell.png",
+        "data": "2026-05-13",
+        "ora_utc": "15:00",
+        "instrument": "US30",
+        "directie": "Sell",
+        "tf_mare": "15min",
+        "tf_mic": "3min",
+        "calitate": "Mai mare ca impuls",
+        "entry": 400.0,
+        "sl": 401.0,
+        "tp0": 399.5,
+        "tp1": 399.0,
+        "tp2": 398.0,
+        "risc_pct": 0.3,
+        "outcome_path": "TP0→TP2",
+        "max_reached": "TP2",
+        "be_moved": False,
+        "confidence": "medium",
+        "ambiguities": ["entry overlap with wick"],
+        "note": "nothing",
+    }
+    base.update(overrides)
+    return base
+
+
+def test_happy_path_buy():
+    m = parse_extraction_dict(_buy_payload())
+    assert m.directie == "Buy"
+    assert m.entry == 400.0
+
+
+def test_happy_path_sell():
+    m = parse_extraction_dict(_sell_payload())
+    assert m.directie == "Sell"
+    assert m.sl > m.entry > m.tp0 > m.tp1 > m.tp2
+
+
+def test_parse_extraction_from_json_str():
+    payload = _buy_payload()
+    m = parse_extraction(json.dumps(payload))
+    assert isinstance(m, M2DExtraction)
+
+
+@pytest.mark.parametrize(
+    "field,bad_value",
+    [
+        ("directie", "Long"),
+        ("instrument", "SPY"),
+        ("tf_mare", "30min"),
+        ("tf_mic", "2min"),
+        ("calitate", "Bună"),
+        ("outcome_path", "BE"),
+        ("max_reached", "BE"),
+        ("confidence", "very-high"),
+    ],
+)
+def test_each_literal_rejection(field, bad_value):
+    payload = _buy_payload(**{field: bad_value})
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(payload)
+
+
+def test_entry_equals_sl():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(entry=399.0, sl=399.0))
+
+
+def test_buy_tp_inverted():
+    # tp1 < tp0 violates ordering
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(tp0=401.0, tp1=400.5, tp2=402.0))
+
+
+def test_buy_sl_above_entry_rejected():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(sl=400.5))
+
+
+def test_sell_order_correct():
+    m = parse_extraction_dict(_sell_payload())
+    assert m.sl > m.entry
+    assert m.entry > m.tp0
+    assert m.tp0 > m.tp1 > m.tp2
+
+
+def test_sell_order_inverted_rejected():
+    # using Buy-ordering values with directie=Sell
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(
+            _sell_payload(sl=399.0, entry=400.0, tp0=400.5, tp1=401.0, tp2=402.0)
+        )
+
+
+def test_data_in_future():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(data="2099-01-01"))
+
+
+def test_data_today_ok():
+    today = datetime.now(timezone.utc).date().isoformat()
+    m = parse_extraction_dict(_buy_payload(data=today))
+    assert m.data == today
+
+
+def test_outcome_path_sl_max_reached_inconsistent():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(
+            _buy_payload(outcome_path="SL", max_reached="TP1")
+        )
+
+
+def test_outcome_path_sl_max_reached_sl_first_ok():
+    m = parse_extraction_dict(
+        _buy_payload(outcome_path="SL", max_reached="SL_first")
+    )
+    assert m.outcome_path == "SL"
+
+
+def test_outcome_path_tp0_max_reached_sl_first_rejected():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(
+            _buy_payload(outcome_path="TP0→TP1", max_reached="SL_first")
+        )
+
+
+def test_outcome_path_pending_any_max_reached_ok():
+    m = parse_extraction_dict(
+        _buy_payload(outcome_path="pending", max_reached="SL_first")
+    )
+    assert m.outcome_path == "pending"
+
+
+def test_extra_field_forbidden():
+    payload = _buy_payload()
+    payload["unexpected_field"] = "x"
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(payload)
+
+
+def test_data_bad_format():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(data="2026/05/13"))
+
+
+def test_data_bad_format_short():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(data="26-05-13"))
+
+
+def test_ora_utc_bad_format():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(ora_utc="14:23:00"))
+
+
+def test_ora_utc_bad_format_no_colon():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(ora_utc="1423"))
+
+
+def test_ora_utc_invalid_hour():
+    with pytest.raises(ValidationError):
+        parse_extraction_dict(_buy_payload(ora_utc="25:00"))
+
+
+def test_ambiguities_default_empty():
+    payload = _buy_payload()
+    del payload["ambiguities"]
+    m = parse_extraction_dict(payload)
+    assert m.ambiguities == []
+
+
+def test_note_default_empty():
+    payload = _buy_payload()
+    del payload["note"]
+    m = parse_extraction_dict(payload)
+    assert m.note == ""