"""Tests for scripts/append_row.py.""" from __future__ import annotations import csv import json import sys from pathlib import Path import pytest from pydantic import ValidationError sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from scripts.append_row import ( # noqa: E402 CSV_COLUMNS, VALID_SOURCES, append_row, append_row_from_json, build_row, read_rows, ) from scripts.vision_schema import parse_extraction_dict # noqa: E402 REPO_ROOT = Path(__file__).resolve().parent.parent CALENDAR_PATH = REPO_ROOT / "calendar_evenimente.yaml" META_PATH = REPO_ROOT / "data" / "_meta.yaml" # --------------------------------------------------------------------------- # fixtures / payload helpers # --------------------------------------------------------------------------- def _buy_payload(**overrides) -> dict: # 2026-05-13 14:23 UTC == 17:23 RO (EEST, Wed) → Set A2. base = { "screenshot_file": "dia-2026-05-13-1.png", "data": "2026-05-13", "ora_utc": "14:23", "instrument": "DIA", "directie": "Buy", "tf_mare": "5min", "tf_mic": "1min", "calitate": "Clară", "entry": 400.0, "sl": 399.0, "tp0": 400.5, "tp1": 401.0, "tp2": 402.0, "risc_pct": 0.25, "outcome_path": "TP0→TP1", "max_reached": "TP1", "be_moved": True, "confidence": "high", "ambiguities": [], "note": "", } base.update(overrides) return base @pytest.fixture def csv_path(tmp_path: Path) -> Path: return tmp_path / "trades.csv" # --------------------------------------------------------------------------- # build_row — computed fields # --------------------------------------------------------------------------- class TestBuildRow: def setup_method(self) -> None: import yaml with META_PATH.open("r", encoding="utf-8") as fh: self.meta = yaml.safe_load(fh) from scripts.calendar_parse import load_calendar self.calendar = load_calendar(CALENDAR_PATH) def test_happy_path_computed_fields(self) -> None: extr = parse_extraction_dict(_buy_payload()) row = build_row(extr, "manual", self.meta, self.calendar) # 14:23 UTC on 2026-05-13 = 17:23 RO (EEST), Wed → A2 assert row["ora_ro"] == "17:23" assert row["zi"] == "Wed" assert row["set"] == "A2" # pl_marius for TP0->TP1 with be_moved=True is +0.50R assert float(row["pl_marius"]) == pytest.approx(0.50) # pl_theoretical for max_reached=TP1 is 0.333 assert float(row["pl_theoretical"]) == pytest.approx(0.333) # version stamps copied from meta assert row["indicator_version"] == str(self.meta["indicator_version"]) assert row["pl_overlay_version"] == str(self.meta["pl_overlay_version"]) assert row["csv_schema_version"] == str(self.meta["csv_schema_version"]) def test_pending_overlay_is_blank(self) -> None: extr = parse_extraction_dict( _buy_payload(outcome_path="pending", max_reached="TP0") ) row = build_row(extr, "vision", self.meta, self.calendar) # pl_marius returns None for pending → empty string in CSV assert row["pl_marius"] == "" # pl_theoretical always concrete assert row["pl_theoretical"] != "" def test_invalid_source_rejected(self) -> None: extr = parse_extraction_dict(_buy_payload()) with pytest.raises(ValueError): build_row(extr, "auto_magic", self.meta, self.calendar) def test_all_valid_sources_accepted(self) -> None: extr = parse_extraction_dict(_buy_payload()) for s in VALID_SOURCES: row = build_row(extr, s, self.meta, self.calendar) assert row["source"] == s # --------------------------------------------------------------------------- # append_row — happy path, dedup, atomic writes # --------------------------------------------------------------------------- class TestAppendRow: def test_happy_path_writes_header_and_row(self, csv_path: Path) -> None: extr = parse_extraction_dict(_buy_payload()) row = append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) assert csv_path.exists() with csv_path.open("r", encoding="utf-8", newline="") as fh: reader = csv.DictReader(fh) assert reader.fieldnames == list(CSV_COLUMNS) rows = list(reader) assert len(rows) == 1 assert rows[0]["screenshot_file"] == row["screenshot_file"] assert rows[0]["set"] == "A2" assert rows[0]["source"] == "manual" def test_two_distinct_rows(self, csv_path: Path) -> None: e1 = parse_extraction_dict(_buy_payload(screenshot_file="a.png")) e2 = parse_extraction_dict(_buy_payload(screenshot_file="b.png")) append_row(e1, "manual", csv_path, META_PATH, CALENDAR_PATH) append_row(e2, "manual", csv_path, META_PATH, CALENDAR_PATH) rows = read_rows(csv_path) assert len(rows) == 2 assert {r["screenshot_file"] for r in rows} == {"a.png", "b.png"} def test_dedup_raises(self, csv_path: Path) -> None: extr = parse_extraction_dict(_buy_payload()) append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) with pytest.raises(ValueError, match="duplicate"): append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) # CSV still contains exactly the one row assert len(read_rows(csv_path)) == 1 def test_dedup_skip(self, csv_path: Path) -> None: extr = parse_extraction_dict(_buy_payload()) first = append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) # Mutate the extraction; the existing row should be returned untouched. extr2 = parse_extraction_dict(_buy_payload(note="changed")) existing = append_row( extr2, "manual", csv_path, META_PATH, CALENDAR_PATH, on_duplicate="skip" ) assert existing["note"] == first["note"] == "" assert len(read_rows(csv_path)) == 1 def test_calibration_coexistence(self, csv_path: Path) -> None: """manual_calibration + vision_calibration on the SAME screenshot must coexist.""" extr = parse_extraction_dict(_buy_payload()) append_row(extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH) # Vision leg may differ slightly — change entry by 0.1, still valid. extr_vision = parse_extraction_dict( _buy_payload(entry=400.1, confidence="medium") ) append_row( extr_vision, "vision_calibration", csv_path, META_PATH, CALENDAR_PATH ) rows = read_rows(csv_path) assert len(rows) == 2 sources = {r["source"] for r in rows} assert sources == {"manual_calibration", "vision_calibration"} # Same screenshot, different source ⇒ no dedup collision. files = {r["screenshot_file"] for r in rows} assert files == {extr.screenshot_file} def test_calibration_duplicate_same_source_rejected( self, csv_path: Path ) -> None: extr = parse_extraction_dict(_buy_payload()) append_row(extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH) with pytest.raises(ValueError, match="duplicate"): append_row( extr, "manual_calibration", csv_path, META_PATH, CALENDAR_PATH ) # --------------------------------------------------------------------------- # Cross-field invalid input # --------------------------------------------------------------------------- class TestInvalidInput: def test_buy_with_inverted_tp_rejected_before_append( self, csv_path: Path ) -> None: # tp1 < tp0 violates Buy ordering: caught at validation, not by append_row. with pytest.raises(ValidationError): parse_extraction_dict( _buy_payload(tp0=401.0, tp1=400.5, tp2=402.0) ) assert not csv_path.exists() # nothing written def test_outcome_path_sl_with_tp1_max_rejected(self, csv_path: Path) -> None: with pytest.raises(ValidationError): parse_extraction_dict( _buy_payload(outcome_path="SL", max_reached="TP1") ) assert not csv_path.exists() def test_append_row_from_json_invalid_payload( self, tmp_path: Path, csv_path: Path ) -> None: bad = tmp_path / "bad.json" payload = _buy_payload(directie="Long") # invalid Literal bad.write_text(json.dumps(payload), encoding="utf-8") with pytest.raises(ValidationError): append_row_from_json( bad, "vision", csv_path, META_PATH, CALENDAR_PATH ) assert not csv_path.exists() # --------------------------------------------------------------------------- # Atomic write: no temp file remains on disk # --------------------------------------------------------------------------- class TestAtomicWrite: def test_no_temp_file_left_behind(self, csv_path: Path) -> None: extr = parse_extraction_dict(_buy_payload()) append_row(extr, "manual", csv_path, META_PATH, CALENDAR_PATH) leftovers = [ p for p in csv_path.parent.iterdir() if p.name.endswith(".tmp") ] assert leftovers == [] def test_append_row_from_json_roundtrip( self, tmp_path: Path, csv_path: Path ) -> None: good = tmp_path / "good.json" good.write_text(json.dumps(_buy_payload()), encoding="utf-8") row = append_row_from_json( good, "vision", csv_path, META_PATH, CALENDAR_PATH ) assert row["source"] == "vision" assert read_rows(csv_path)[0]["screenshot_file"] == row["screenshot_file"]