From ce80151c58efa2b27a8cdbbb43e8db3953b2f0c2 Mon Sep 17 00:00:00 2001
From: Marius
Date: Wed, 13 May 2026 12:38:58 +0300
Subject: [PATCH] scripts: append_row.py + m2d-extractor agent prompt + tests (78 passing)
---
 .claude/agents/m2d-extractor.md | 180 ++++++++++++++++++++++
 scripts/append_row.py           | 237 +++++++++++++++++++++++++++++
 tests/test_append_row.py        | 258 ++++++++++++++++++++++++++++++++
 3 files changed, 675 insertions(+)
 create mode 100644 .claude/agents/m2d-extractor.md
 create mode 100644 scripts/append_row.py
 create mode 100644 tests/test_append_row.py

diff --git a/.claude/agents/m2d-extractor.md b/.claude/agents/m2d-extractor.md
new file mode 100644
index 0000000..b9806c5
--- /dev/null
+++ b/.claude/agents/m2d-extractor.md
@@ -0,0 +1,180 @@
---
name: m2d-extractor
description: Extracts M2D data from a TradeStation screenshot. Returns strict JSON matching the M2DExtraction schema (see scripts/vision_schema.py). Called by /backtest and /batch.
tools: Read, Write
model: opus
---

# M2D Vision Extractor

You are a specialized extractor for TradeStation M2D screenshots. You do exactly one job: look at an image, write a strict JSON file plus a log file, and reply with a short status. No chat, no coaching, no suggestions.

---

## Inputs

The caller gives you:

1. **`screenshot_path`** — absolute path to a PNG/JPG (e.g. `D:\PROIECTE\atm-backtesting\screenshots\inbox\2026-05-13-dia-1645.png`).
2. **`screenshot_file`** — basename only (e.g. `2026-05-13-dia-1645.png`). Echo it back in the JSON.
3. *(optional)* **`hint`** — a short string from the user (e.g. "sell on US30 5min/1min"). Treat it as a hypothesis and verify it against the image.

If `screenshot_path` is missing → write one line to the `.log` and stop.

---

## Path discipline — STRICT

The only paths you may write to:

- `data/extractions/<basename_no_ext>.json`
- `data/extractions/<basename_no_ext>.log`

**Do NOT edit**: the CSV, `scripts/`, `.claude/`, `screenshots/`, `jurnal.md`, or any other path. Compute `basename_no_ext` from `screenshot_file` by stripping only the last extension.

You read only the screenshot (and, optionally, `scripts/vision_schema.py` as a schema reference if it helps you verify the literals).

---

## Workflow

### Step 1 — Read the image

Use `Read` on `screenshot_path`. The image arrives as multimodal visual input. Study it carefully.

### Step 2 — Extract every field of `M2DExtraction`

The schema lives in `scripts/vision_schema.py`. It sets `extra="forbid"`, so any extra field means rejection. The literals are case-sensitive and carry Romanian diacritics.

| Field | How to read it |
|---|---|
| `screenshot_file` | echo the basename you received |
| `data` | X-axis timestamp at the trigger candle, normalized to `YYYY-MM-DD`. TradeStation uses American MM/DD/YY; convert it. It cannot be in the future relative to today's UTC date. |
| `ora_utc` | close time of the trigger candle, converted from RO local time to UTC (see the sketch after this table). Format `HH:MM` (24h). EEST = UTC+3 (summer), EET = UTC+2 (winter). If you are unsure of the season → `confidence: low` + put the assumed offset in `ambiguities`. |
| `instrument` | `DIA` if price ~400–500; `US30` if price ~30000–45000; otherwise `other`. |
| `directie` | `Buy` if the trigger is a light-green dot after dark-green after turquoise on the higher TF. `Sell` if light-red after dark-red after yellow on the higher TF. |
| `tf_mare` | exactly `5min` or `15min` — read from the higher-TF label/overlay. |
| `tf_mic` | exactly `1min` or `3min` — read from the visible chart label. |
| `calitate` | `Clară` (candle body clearly visible, no long wicks on the retracement), `Mai mare ca impuls` (retracement body ≥ body of the last impulse candle on the higher TF), `Slabă` (small body, long wicks, indecisive), `n/a` if the retracement is not legible. |
| `entry` | price at the close of the trigger candle. Read it off the price axis on the right (ground truth over any blackbox label). |
| `sl` | the price on the red `SL X.XX%` line. |
| `tp0`, `tp1`, `tp2` | the three TP levels drawn by the blackbox. TP2 is always the SL mirrored around entry. |
| `risc_pct` | the percentage on the SL label (e.g. `0.32%` → `0.32`). |
| `outcome_path` | see Step 3. |
| `max_reached` | see Step 3. |
| `be_moved` | see Step 3. |
| `confidence` | `high` if everything was unambiguous, `medium` if you estimated 1-2 off-axis prices, `low` if any required field needed a guess. |
| `ambiguities` | short list of what was uncertain (e.g. `["ora_utc DST boundary", "tp1 obscured by overlay"]`). Empty list if nothing. |
| `note` | one short sentence if something notable does not fit elsewhere. Empty string otherwise. |
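The `data` and `ora_utc` rows are the only places where real arithmetic happens. A minimal sketch of the intended conversion, assuming Python's `zoneinfo` database (the `Europe/Bucharest` zone resolves the EEST/EET switch automatically); the agent does this by reasoning rather than by running code, and the helper name is illustrative:

```python
# Illustrative only: pins down the date/time normalization described above.
from datetime import datetime
from zoneinfo import ZoneInfo


def ro_chart_time_to_utc(mmddyy: str, hhmm: str) -> tuple[str, str]:
    """Normalize a TradeStation MM/DD/YY date + RO local HH:MM to UTC."""
    local = datetime.strptime(f"{mmddyy} {hhmm}", "%m/%d/%y %H:%M").replace(
        tzinfo=ZoneInfo("Europe/Bucharest")  # DST-aware: EEST or EET as needed
    )
    utc = local.astimezone(ZoneInfo("UTC"))
    return utc.strftime("%Y-%m-%d"), utc.strftime("%H:%M")


# ro_chart_time_to_utc("05/13/26", "17:45") -> ("2026-05-13", "14:45")
```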
### Step 3 — `outcome_path`, `max_reached`, `be_moved`

Track what happened **post-trigger** in the screenshot, candle by candle.

**`outcome_path`** (use the UNICODE arrow `→`, NOT `->`) ∈:

- `SL` — SL hit first, no TP before it
- `TP0→SL` — TP0 hit, then price came all the way back to the original SL (BE was NOT moved — net loss)
- `TP0→TP1` — TP0 hit, then TP1
- `TP0→TP2` — TP0 hit, then TP2
- `TP0→pending` — TP0 hit, trade still open at the end of the screenshot
- `pending` — neither SL nor any TP hit by the end of the screenshot

**`max_reached`** — the highest level **touched by price**, regardless of any manual close ∈:

- `SL_first`
- `TP0`
- `TP1`
- `TP2`

**`be_moved`** — default `true` (rule-enforced per the M2D standard: after TP0 you move SL to entry). Set it to `false` ONLY when you clearly see the trade close at SL without BE (i.e. `outcome_path == "TP0→SL"`), or when `outcome_path == "SL"` (TP0 never hit, so BE is inapplicable — set `false` there too, for clarity).

### Step 4 — Cross-field checks before writing

The pydantic validators will reject the write if any of these fail (see `scripts/vision_schema.py`; a sketch follows this list):

1. `entry != sl`
2. Ordering depends on `directie`:
   - `Buy`: `sl < entry < tp0 < tp1 < tp2`
   - `Sell`: `sl > entry > tp0 > tp1 > tp2`
3. `data` not in the future (UTC today).
4. `data` strictly `YYYY-MM-DD`; `ora_utc` strictly `HH:MM`.
5. `outcome_path == "SL"` ⟹ `max_reached == "SL_first"`.
6. `outcome_path` starting with `TP0` ⟹ `max_reached ∈ {TP0, TP1, TP2}`.
7. `outcome_path == "pending"` ⟹ any `max_reached`.
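The real validators live in `scripts/vision_schema.py`, which is not part of this patch. A minimal sketch of what the ordering and consistency checks might look like in pydantic v2 (the model name and details here are hypothetical):

```python
# Hypothetical sketch; the actual model in scripts/vision_schema.py may differ.
from pydantic import BaseModel, model_validator


class _OrderingSketch(BaseModel):
    directie: str
    entry: float
    sl: float
    tp0: float
    tp1: float
    tp2: float
    outcome_path: str
    max_reached: str

    @model_validator(mode="after")
    def _check_cross_field(self) -> "_OrderingSketch":
        levels = (self.sl, self.entry, self.tp0, self.tp1, self.tp2)
        ascending = all(a < b for a, b in zip(levels, levels[1:]))
        descending = all(a > b for a, b in zip(levels, levels[1:]))
        if self.directie == "Buy" and not ascending:
            raise ValueError("Buy requires sl < entry < tp0 < tp1 < tp2")
        if self.directie == "Sell" and not descending:
            raise ValueError("Sell requires sl > entry > tp0 > tp1 > tp2")
        if self.outcome_path == "SL" and self.max_reached != "SL_first":
            raise ValueError("outcome_path 'SL' implies max_reached 'SL_first'")
        if self.outcome_path.startswith("TP0") and self.max_reached == "SL_first":
            raise ValueError("a TP0… path implies max_reached in {TP0, TP1, TP2}")
        return self
```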
### Step 5 — Write the two files

**`data/extractions/<basename_no_ext>.json`** — pretty-printed JSON, 2-space indent, UTF-8, trailing newline. Contains EXACTLY the M2DExtraction object, nothing more.

**`data/extractions/<basename_no_ext>.log`** — fixed format:

```
[extraction]
image: screenshots/inbox/<basename_no_ext>.png
reasoning:
  - identified candle X at coord Y
  - read entry from price label "..."
  - outcome: TP0 hit at HH:MM, TP1 hit at HH:MM
decisions:
  - outcome_path = TP0→TP1
  - max_reached = TP1
ambiguities: []
confidence: high
```

Adapt the lines to what you actually saw. `reasoning` has 2-5 bullet points; `decisions` records the key calls (outcome_path, max_reached, be_moved, confidence).

Assume `data/extractions/` exists. If Write fails, record the error in the `.log` (if you can) and stop.

### Step 6 — Final response to the orchestrator

After writing both files, return exactly one short message (max 3 sentences) as text:

```
Extracted to `data/extractions/<basename_no_ext>.json`. Confidence: <confidence>. Ambiguities: <ambiguities>.
```

No preamble, no markdown fence around JSON, no extra explanations. The caller is a script.

If the screenshot is COMPLETELY UNREADABLE, do NOT abort — write the JSON with `confidence: low`, `ambiguities: ["image_unreadable"]`, and every other field best-effort (even if guessed), then the matching `.log`, then the normal status line.

---

## Strict rules

1. **NEVER invent data.** If a field is not legible → `confidence: low` + add it to `ambiguities`. Estimate ONLY when the geometry allows it (TP2 symmetric to SL, TP0 ≈ 0.4·|entry−sl| from entry, TP1 ≈ 0.6·|entry−sl|); see the sketch after this list.
2. **The price axis is ground truth** over any blackbox label or tooltip when they differ.
3. **Do not speculate about TFs you cannot see.** If the screenshot does not include the daily, write nothing about the daily trend in `note`.
4. **The output must satisfy `scripts/vision_schema.py` EXACTLY** — no extra fields, case-sensitive literals with diacritics (`Clară`, `Slabă`, `Mai mare ca impuls`).
5. **One screenshot = one JSON.** Never batch.
6. **Strict output** — no preamble in the reply, only the status line after the write.
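A quick sketch of the geometric fallback in rule 1, checked against the example below (the helper name is illustrative; the agent applies this mentally, it does not run code):

```python
# Signed TP distances keyed off |entry - sl|; works for Buy and Sell.
def estimate_tp_levels(entry: float, sl: float) -> tuple[float, float, float]:
    risk = abs(entry - sl)
    sign = 1.0 if entry > sl else -1.0   # Buy: TPs above entry; Sell: below
    tp0 = entry + sign * 0.4 * risk
    tp1 = entry + sign * 0.6 * risk
    tp2 = entry + sign * risk            # TP2 mirrors SL around entry
    return tp0, tp1, tp2


# estimate_tp_levels(497.42, 496.80) ≈ (497.668, 497.792, 498.04),
# matching tp0/tp1/tp2 in the example JSON below after rounding.
```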
---

## Example JSON output

```json
{
  "screenshot_file": "2026-05-13-dia-1645.png",
  "data": "2026-05-13",
  "ora_utc": "14:45",
  "instrument": "DIA",
  "directie": "Buy",
  "tf_mare": "5min",
  "tf_mic": "1min",
  "calitate": "Clară",
  "entry": 497.42,
  "sl": 496.80,
  "tp0": 497.67,
  "tp1": 497.79,
  "tp2": 498.04,
  "risc_pct": 0.12,
  "outcome_path": "TP0→TP1",
  "max_reached": "TP1",
  "be_moved": true,
  "confidence": "high",
  "ambiguities": [],
  "note": ""
}
```
diff --git a/scripts/append_row.py b/scripts/append_row.py
new file mode 100644
index 0000000..93d7587
--- /dev/null
+++ b/scripts/append_row.py
@@ -0,0 +1,237 @@
"""Append a validated M2D extraction to ``data/trades.csv``.

Pipeline:
    JSON file --> pydantic validate (M2DExtraction)
              --> load data/_meta.yaml (versions + schema)
              --> compute ora_ro, zi, set, pl_marius, pl_theoretical
              --> dedup on (screenshot_file, source)
              --> atomic CSV write (temp file + os.replace)

Source values:
    - ``manual``             : Marius logged by hand
    - ``vision``             : produced by the vision subagent
    - ``manual_calibration`` : calibration P4, manual leg
    - ``vision_calibration`` : calibration P4, vision leg

A row with ``source=manual_calibration`` and a row with ``source=vision_calibration``
for the *same* screenshot are allowed to coexist (different dedup keys); a
duplicate ``(screenshot_file, source)`` pair is rejected (or skipped; see the
``on_duplicate`` argument of ``append_row``).
"""

from __future__ import annotations

import csv
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Literal

import yaml

from scripts.calendar_parse import calc_set, load_calendar, utc_to_ro
from scripts.pl_calc import pl_marius, pl_theoretical
from scripts.vision_schema import M2DExtraction, parse_extraction_dict

__all__ = [
    "CSV_COLUMNS",
    "VALID_SOURCES",
    "build_row",
    "read_rows",
    "append_row",
    "append_row_from_json",
]


Source = Literal["manual", "vision", "manual_calibration", "vision_calibration"]

VALID_SOURCES: frozenset[str] = frozenset(
    {"manual", "vision", "manual_calibration", "vision_calibration"}
)


CSV_COLUMNS: tuple[str, ...] = (
    "screenshot_file",
    "source",
    "data",
    "ora_utc",
    "ora_ro",
    "zi",
    "set",
    "instrument",
    "directie",
    "tf_mare",
    "tf_mic",
    "calitate",
    "entry",
    "sl",
    "tp0",
    "tp1",
    "tp2",
    "risc_pct",
    "outcome_path",
    "max_reached",
    "be_moved",
    "confidence",
    "ambiguities",
    "note",
    "pl_marius",
    "pl_theoretical",
    "indicator_version",
    "pl_overlay_version",
    "csv_schema_version",
)


def _load_meta(meta_path: Path) -> dict[str, Any]:
    with meta_path.open("r", encoding="utf-8") as fh:
        meta = yaml.safe_load(fh) or {}
    required = ("indicator_version", "pl_overlay_version", "csv_schema_version")
    missing = [k for k in required if k not in meta]
    if missing:
        raise ValueError(f"_meta.yaml missing required keys: {missing}")
    return meta


def _format_optional(value: float | None) -> str:
    return "" if value is None else f"{value:.4f}"


def build_row(
    extraction: M2DExtraction,
    source: str,
    meta: dict[str, Any],
    calendar: list[dict[str, Any]],
) -> dict[str, str]:
    """Compute the full CSV row dict for one extraction."""
    if source not in VALID_SOURCES:
        raise ValueError(
            f"invalid source {source!r}; must be one of {sorted(VALID_SOURCES)}"
        )

    d_ro, t_ro, zi = utc_to_ro(extraction.data, extraction.ora_utc)
    set_label = calc_set(d_ro, t_ro, zi, calendar)
    pl_m = pl_marius(extraction.outcome_path, extraction.be_moved)
    pl_t = pl_theoretical(extraction.max_reached)

    return {
        "screenshot_file": extraction.screenshot_file,
        "source": source,
        "data": extraction.data,
        "ora_utc": extraction.ora_utc,
        "ora_ro": t_ro.strftime("%H:%M"),
        "zi": zi,
        "set": set_label,
        "instrument": extraction.instrument,
        "directie": extraction.directie,
        "tf_mare": extraction.tf_mare,
        "tf_mic": extraction.tf_mic,
        "calitate": extraction.calitate,
        "entry": f"{extraction.entry}",
        "sl": f"{extraction.sl}",
        "tp0": f"{extraction.tp0}",
        "tp1": f"{extraction.tp1}",
        "tp2": f"{extraction.tp2}",
        "risc_pct": f"{extraction.risc_pct}",
        "outcome_path": extraction.outcome_path,
        "max_reached": extraction.max_reached,
        "be_moved": "true" if extraction.be_moved else "false",
        "confidence": extraction.confidence,
        "ambiguities": json.dumps(extraction.ambiguities, ensure_ascii=False),
        "note": extraction.note,
        "pl_marius": _format_optional(pl_m),
        "pl_theoretical": _format_optional(pl_t),
        "indicator_version": str(meta["indicator_version"]),
        "pl_overlay_version": str(meta["pl_overlay_version"]),
        "csv_schema_version": str(meta["csv_schema_version"]),
    }


def read_rows(csv_path: Path) -> list[dict[str, str]]:
    """Read existing rows; return [] if the file does not exist or is empty."""
    if not csv_path.exists() or csv_path.stat().st_size == 0:
        return []
    with csv_path.open("r", encoding="utf-8", newline="") as fh:
        reader = csv.DictReader(fh)
        return list(reader)


def _atomic_write(csv_path: Path, rows: list[dict[str, str]]) -> None:
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file is created in the destination directory so that
    # os.replace() stays on one filesystem and the swap is atomic.
    fd, tmp_name = tempfile.mkstemp(
        prefix=csv_path.name + ".",
        suffix=".tmp",
        dir=str(csv_path.parent),
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(CSV_COLUMNS))
            writer.writeheader()
            for r in rows:
                writer.writerow({k: r.get(k, "") for k in CSV_COLUMNS})
        os.replace(tmp_name, csv_path)
    except Exception:
        # Best-effort cleanup of the temp file; never mask the original error.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
def append_row(
    extraction: M2DExtraction,
    source: str,
    csv_path: Path,
    meta_path: Path,
    calendar_path: Path,
    on_duplicate: Literal["raise", "skip"] = "raise",
) -> dict[str, str]:
    """Append one extraction to the CSV.

    Dedup key: ``(screenshot_file, source)``. If a row with the same key
    already exists, behaviour is controlled by ``on_duplicate``:

    - ``"raise"`` (default): raise ``ValueError``.
    - ``"skip"``: leave the CSV untouched and return the *existing* row.
    """
    meta = _load_meta(meta_path)
    calendar = load_calendar(calendar_path)
    row = build_row(extraction, source, meta, calendar)

    existing = read_rows(csv_path)
    key = (row["screenshot_file"], row["source"])
    for r in existing:
        if (r.get("screenshot_file"), r.get("source")) == key:
            if on_duplicate == "skip":
                return r
            raise ValueError(
                f"duplicate row: screenshot_file={key[0]!r} source={key[1]!r} "
                f"already exists in {csv_path}"
            )

    existing.append(row)
    _atomic_write(csv_path, existing)
    return row


def append_row_from_json(
    json_path: Path,
    source: str,
    csv_path: Path,
    meta_path: Path,
    calendar_path: Path,
    on_duplicate: Literal["raise", "skip"] = "raise",
) -> dict[str, str]:
    """Convenience wrapper: load JSON, validate, append."""
    with Path(json_path).open("r", encoding="utf-8") as fh:
        payload = json.load(fh)
    extraction = parse_extraction_dict(payload)
    return append_row(
        extraction=extraction,
        source=source,
        csv_path=csv_path,
        meta_path=meta_path,
        calendar_path=calendar_path,
        on_duplicate=on_duplicate,
    )
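# ---------------------------------------------------------------------------
# Illustrative smoke-run, not a supported CLI (sketch only): run from the repo
# root as ``python -m scripts.append_row path/to/extraction.json``. The csv /
# meta / calendar paths below mirror the defaults used by the test suite.
# ---------------------------------------------------------------------------
if __name__ == "__main__":  # pragma: no cover
    import sys

    _row = append_row_from_json(
        json_path=Path(sys.argv[1]),
        source="vision",
        csv_path=Path("data/trades.csv"),
        meta_path=Path("data/_meta.yaml"),
        calendar_path=Path("calendar_evenimente.yaml"),
        on_duplicate="skip",
    )
    print(f"row for {_row['screenshot_file']}: set={_row['set']}")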
diff --git a/tests/test_append_row.py b/tests/test_append_row.py
new file mode 100644
index 0000000..3859ca0
--- /dev/null
+++ b/tests/test_append_row.py
@@ -0,0 +1,258 @@
"""Tests for scripts/append_row.py."""

from __future__ import annotations

import csv
import json
import sys
from pathlib import Path

import pytest
from pydantic import ValidationError

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from scripts.append_row import (  # noqa: E402
    CSV_COLUMNS,
    VALID_SOURCES,
    append_row,
    append_row_from_json,
    build_row,
    read_rows,
)
from scripts.vision_schema import parse_extraction_dict  # noqa: E402


REPO_ROOT = Path(__file__).resolve().parent.parent
CALENDAR_PATH = REPO_ROOT / "calendar_evenimente.yaml"
META_PATH = REPO_ROOT / "data" / "_meta.yaml"


# ---------------------------------------------------------------------------
# fixtures / payload helpers
# ---------------------------------------------------------------------------


def _buy_payload(**overrides) -> dict:
    # 2026-05-13 14:23 UTC == 17:23 RO (EEST, Wed) → Set A2.
    base = {
        "screenshot_file": "dia-2026-05-13-1.png",
        "data": "2026-05-13",
        "ora_utc": "14:23",
        "instrument": "DIA",
        "directie": "Buy",
        "tf_mare": "5min",
        "tf_mic": "1min",
        "calitate": "Clară",
        "entry": 400.0,
        "sl": 399.0,
        "tp0": 400.5,
        "tp1": 401.0,
        "tp2": 402.0,
        "risc_pct": 0.25,
        "outcome_path": "TP0→TP1",
        "max_reached": "TP1",
        "be_moved": True,
        "confidence": "high",
        "ambiguities": [],
        "note": "",
    }
    base.update(overrides)
    return base


@pytest.fixture
def csv_path(tmp_path: Path) -> Path:
    return tmp_path / "trades.csv"


# ---------------------------------------------------------------------------
# build_row — computed fields
# ---------------------------------------------------------------------------


class TestBuildRow:
    def setup_method(self) -> None:
        import yaml

        with META_PATH.open("r", encoding="utf-8") as fh:
            self.meta = yaml.safe_load(fh)
        from scripts.calendar_parse import load_calendar

        self.calendar = load_calendar(CALENDAR_PATH)

    def test_happy_path_computed_fields(self) -> None:
        extr = parse_extraction_dict(_buy_payload())
        row = build_row(extr, "manual", self.meta, self.calendar)
        # 14:23 UTC on 2026-05-13 = 17:23 RO (EEST), Wed → A2
        assert row["ora_ro"] == "17:23"
        assert row["zi"] == "Wed"
        assert row["set"] == "A2"
        # pl_marius for TP0→TP1 with be_moved=True is +0.50R
        assert float(row["pl_marius"]) == pytest.approx(0.50)
        # pl_theoretical for max_reached=TP1 is 0.333
        assert float(row["pl_theoretical"]) == pytest.approx(0.333)
        # version stamps copied from meta
        assert row["indicator_version"] == str(self.meta["indicator_version"])
        assert row["pl_overlay_version"] == str(self.meta["pl_overlay_version"])
        assert row["csv_schema_version"] == str(self.meta["csv_schema_version"])

    def test_pending_overlay_is_blank(self) -> None:
        extr = parse_extraction_dict(
            _buy_payload(outcome_path="pending", max_reached="TP0")
        )
        row = build_row(extr, "vision", self.meta, self.calendar)
        # pl_marius returns None for pending → empty string in CSV
        assert row["pl_marius"] == ""
        # pl_theoretical is always concrete
        assert row["pl_theoretical"] != ""

    def test_invalid_source_rejected(self) -> None:
        extr = parse_extraction_dict(_buy_payload())
        with pytest.raises(ValueError):
            build_row(extr, "auto_magic", self.meta, self.calendar)

    def test_all_valid_sources_accepted(self) -> None:
        extr = parse_extraction_dict(_buy_payload())
        for s in VALID_SOURCES:
            row = build_row(extr, s, self.meta, self.calendar)
            assert row["source"] == s


# ---------------------------------------------------------------------------
# append_row — happy path, dedup, atomic writes
# ---------------------------------------------------------------------------


class TestAppendRow:
    def test_happy_path_writes_header_and_row(self, csv_path: Path) -> None:
        extr = parse_extraction_dict(_buy_payload())
        row = append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH)
        assert csv_path.exists()

        with csv_path.open("r", encoding="utf-8", newline="") as fh:
            reader = csv.DictReader(fh)
            assert reader.fieldnames == list(CSV_COLUMNS)
            rows = list(reader)
        assert len(rows) == 1
        assert rows[0]["screenshot_file"] == row["screenshot_file"]
        assert rows[0]["set"] == "A2"
        assert rows[0]["source"] == "manual"

    def test_two_distinct_rows(self, csv_path: Path) -> None:
        e1 = parse_extraction_dict(_buy_payload(screenshot_file="a.png"))
        e2 = parse_extraction_dict(_buy_payload(screenshot_file="b.png"))
        append_row(e1, "manual", csv_path, META_PATH, CALENDAR_PATH)
        append_row(e2, "manual", csv_path, META_PATH, CALENDAR_PATH)
        rows = read_rows(csv_path)
        assert len(rows) == 2
        assert {r["screenshot_file"] for r in rows} == {"a.png", "b.png"}

    def test_dedup_raises(self, csv_path: Path) -> None:
        extr = parse_extraction_dict(_buy_payload())
        append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH)
        with pytest.raises(ValueError, match="duplicate"):
            append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH)
        # CSV still contains exactly the one row
        assert len(read_rows(csv_path)) == 1

    def test_dedup_skip(self, csv_path: Path) -> None:
        extr = parse_extraction_dict(_buy_payload())
        first = append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH)
        # Build a changed extraction; the existing row must be returned untouched.
        extr2 = parse_extraction_dict(_buy_payload(note="changed"))
        existing = append_row(
            extr2, "manual", csv_path, META_PATH, CALENDAR_PATH, on_duplicate="skip"
        )
        assert existing["note"] == first["note"] == ""
        assert len(read_rows(csv_path)) == 1

    def test_calibration_coexistence(self, csv_path: Path) -> None:
        """manual_calibration + vision_calibration on the SAME screenshot must coexist."""
        extr = parse_extraction_dict(_buy_payload())
        append_row(extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH)
        # The vision leg may differ slightly — change entry by 0.1, still valid.
        extr_vision = parse_extraction_dict(
            _buy_payload(entry=400.1, confidence="medium")
        )
        append_row(
            extr_vision, "vision_calibration", csv_path, META_PATH, CALENDAR_PATH
        )

        rows = read_rows(csv_path)
        assert len(rows) == 2
        sources = {r["source"] for r in rows}
        assert sources == {"manual_calibration", "vision_calibration"}
        # Same screenshot, different source ⇒ no dedup collision.
        files = {r["screenshot_file"] for r in rows}
        assert files == {extr.screenshot_file}

    def test_calibration_duplicate_same_source_rejected(
        self, csv_path: Path
    ) -> None:
        extr = parse_extraction_dict(_buy_payload())
        append_row(extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH)
        with pytest.raises(ValueError, match="duplicate"):
            append_row(
                extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH
            )


# ---------------------------------------------------------------------------
# Cross-field invalid input
# ---------------------------------------------------------------------------


class TestInvalidInput:
    def test_buy_with_inverted_tp_rejected_before_append(
        self, csv_path: Path
    ) -> None:
        # tp1 < tp0 violates Buy ordering: caught at validation, not by append_row.
+ with pytest.raises(ValidationError): + parse_extraction_dict( + _buy_payload(tp0=401.0, tp1=400.5, tp2=402.0) + ) + assert not csv_path.exists() # nothing written + + def test_outcome_path_sl_with_tp1_max_rejected(self, csv_path: Path) -> None: + with pytest.raises(ValidationError): + parse_extraction_dict( + _buy_payload(outcome_path="SL", max_reached="TP1") + ) + assert not csv_path.exists() + + def test_append_row_from_json_invalid_payload( + self, tmp_path: Path, csv_path: Path + ) -> None: + bad = tmp_path / "bad.json" + payload = _buy_payload(directie="Long") # invalid Literal + bad.write_text(json.dumps(payload), encoding="utf-8") + with pytest.raises(ValidationError): + append_row_from_json( + bad, "vision", csv_path, META_PATH, CALENDAR_PATH + ) + assert not csv_path.exists() + + +# --------------------------------------------------------------------------- +# Atomic write: no temp file remains on disk +# --------------------------------------------------------------------------- + + +class TestAtomicWrite: + def test_no_temp_file_left_behind(self, csv_path: Path) -> None: + extr = parse_extraction_dict(_buy_payload()) + append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) + leftovers = [ + p for p in csv_path.parent.iterdir() if p.name.endswith(".tmp") + ] + assert leftovers == [] + + def test_append_row_from_json_roundtrip( + self, tmp_path: Path, csv_path: Path + ) -> None: + good = tmp_path / "good.json" + good.write_text(json.dumps(_buy_payload()), encoding="utf-8") + row = append_row_from_json( + good, "vision", csv_path, META_PATH, CALENDAR_PATH + ) + assert row["source"] == "vision" + assert read_rows(csv_path)[0]["screenshot_file"] == row["screenshot_file"]