"""Tests for scripts/regenerate_md.py.""" from __future__ import annotations import csv import sys from pathlib import Path import pytest sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from scripts.append_row import csv_columns # noqa: E402 from scripts.regenerate_md import MD_COLUMNS, regenerate_md # noqa: E402 def _row(**overrides: str) -> dict[str, str]: base = { "id": "1", "screenshot_file": "2026-05-13_dia_5min.png", "source": "vision", "data": "2026-05-13", "zi": "Mi", "ora_ro": "17:23", "ora_utc": "14:23", "instrument": "DIA", "directie": "long", "tf_mare": "5min", "tf_mic": "1min", "calitate": "Clară", "entry": "497.42", "sl": "496.80", "tp0": "497.67", "tp1": "497.79", "tp2": "498.04", "risc_pct": "0.50", "outcome_path": "TP0→TP1", "max_reached": "TP1", "be_moved": "true", "pl_marius": "0.5000", "pl_theoretical": "0.3330", "set": "A2", "indicator_version": "1", "pl_overlay_version": "1", "csv_schema_version": "1", "extracted_at": "2026-05-13T14:30:00Z", "note": "", } base.update(overrides) return base def _write_csv( path: Path, rows: list[dict[str, str]], extra_columns: list[str] | None = None, ) -> None: fieldnames = csv_columns() if extra_columns: fieldnames = fieldnames + extra_columns with path.open("w", encoding="utf-8", newline="") as fh: writer = csv.DictWriter(fh, fieldnames=fieldnames) writer.writeheader() for r in rows: writer.writerow({k: r.get(k, "") for k in fieldnames}) def _data_lines(md_text: str) -> list[str]: header_prefix = "| " + MD_COLUMNS[0] + " | " + MD_COLUMNS[1] return [ ln for ln in md_text.splitlines() if ln.startswith("|") and not ln.startswith(header_prefix) and not ln.startswith("|---") ] def test_empty_csv_placeholder(tmp_path: Path) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" _write_csv(csv_p, []) n = regenerate_md(csv_p, md_p) assert n == 0 content = md_p.read_text(encoding="utf-8") assert "# Jurnal M2D (auto-generated)" in content assert "Niciun trade încă" in content assert "| # |" not in content def test_missing_csv_placeholder(tmp_path: Path) -> None: csv_p = tmp_path / "does_not_exist.csv" md_p = tmp_path / "jurnal.md" n = regenerate_md(csv_p, md_p) assert n == 0 content = md_p.read_text(encoding="utf-8") assert "Niciun trade încă" in content assert md_p.exists() def test_single_row_format(tmp_path: Path) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" _write_csv(csv_p, [_row()]) n = regenerate_md(csv_p, md_p) assert n == 1 content = md_p.read_text(encoding="utf-8") assert "# Jurnal M2D (auto-generated from data/jurnal.csv)" in content assert "Rows: 1" in content header_line = "| " + " | ".join(MD_COLUMNS) + " |" assert header_line in content rows = _data_lines(content) assert len(rows) == 1 cells = [c.strip() for c in rows[0].strip("|").split("|")] assert cells[0] == "1" assert cells[1] == "2026-05-13" assert cells[2] == "Mi" assert cells[3] == "17:23" assert cells[4] == "A2" assert cells[5] == "DIA" assert cells[6] == "Buy" assert cells[7] == "Clară" assert cells[13] == "TP0→TP1" assert cells[14] == "+0.50" assert cells[15] == "+0.33" assert cells[16] == "vision" def test_three_rows(tmp_path: Path) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" rows = [ _row(id="3", data="2026-05-15", pl_marius="-1.0000"), _row(id="1", data="2026-05-13"), _row(id="2", data="2026-05-14", pl_marius="0.2000"), ] _write_csv(csv_p, rows) n = regenerate_md(csv_p, md_p) assert n == 3 content = md_p.read_text(encoding="utf-8") assert "Rows: 3" in content data = _data_lines(content) assert len(data) == 3 assert "| 1 | 2026-05-13 |" in data[0] assert "| 2 | 2026-05-14 |" in data[1] assert "| 3 | 2026-05-15 |" in data[2] def test_pending_pl_displayed(tmp_path: Path) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" _write_csv(csv_p, [_row(pl_marius="", pl_theoretical="")]) n = regenerate_md(csv_p, md_p) assert n == 1 content = md_p.read_text(encoding="utf-8") rows = _data_lines(content) cells = [c.strip() for c in rows[0].strip("|").split("|")] assert cells[14] == "pending" assert cells[15] == "pending" def test_unknown_column_graceful( tmp_path: Path, capsys: pytest.CaptureFixture[str] ) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" _write_csv(csv_p, [_row()], extra_columns=["extra_field"]) n = regenerate_md(csv_p, md_p) assert n == 1 content = md_p.read_text(encoding="utf-8") assert "Rows: 1" in content captured = capsys.readouterr() assert "unknown CSV columns ignored" in captured.err assert "extra_field" in captured.err def test_atomic_write_no_tmp_leftover(tmp_path: Path) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" _write_csv(csv_p, [_row()]) regenerate_md(csv_p, md_p) leftovers = list(tmp_path.glob("*.tmp")) assert leftovers == [] assert md_p.exists() def test_rows_count_returned(tmp_path: Path) -> None: csv_p = tmp_path / "jurnal.csv" md_p = tmp_path / "jurnal.md" _write_csv(csv_p, [_row(id=str(i)) for i in range(1, 6)]) n = regenerate_md(csv_p, md_p) assert n == 5