"""Regenerate ``data/jurnal.md`` from ``data/jurnal.csv``.

CSV is the source of truth (29 columns, schema owned by
``scripts.append_row``). MD is a human-readable mirror with a curated
18-column table.

CLI: ``python scripts/regenerate_md.py [csv_path] [md_path]``
"""

from __future__ import annotations

import csv
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Sequence

from scripts.append_row import csv_columns

__all__ = ["MD_COLUMNS", "regenerate_md", "main"]

# Header labels of the Markdown table, in display order.
MD_COLUMNS: tuple[str, ...] = (
    "#",
    "Data",
    "Zi",
    "Ora RO",
    "Set",
    "Instrument",
    "Direcție",
    "Calitate",
    "Entry",
    "SL",
    "TP0",
    "TP1",
    "TP2",
    "outcome_path",
    "P/L (Marius)",
    "P/L (theoretic)",
    "Source",
    "Note",
)

# CSV columns this module reads; every one of them must be present in the
# CSV header for rows to be rendered (see ``_load_rows``).
_CSV_FIELDS_USED: tuple[str, ...] = (
    "id",
    "data",
    "zi",
    "ora_ro",
    "set",
    "instrument",
    "directie",
    "calitate",
    "entry",
    "sl",
    "tp0",
    "tp1",
    "tp2",
    "outcome_path",
    "pl_marius",
    "pl_theoretical",
    "source",
    "note",
)

# Case-insensitive mapping from stored direction values to display labels.
_DIRECTIE_DISPLAY = {"long": "Buy", "short": "Sell", "buy": "Buy", "sell": "Sell"}


def _fmt_pl(value: str | None) -> str:
    """Format a P/L cell.

    Empty or ``None`` becomes ``"pending"``; a value parseable by ``float``
    is rendered with an explicit sign and two decimals; anything else is
    returned unchanged.
    """
    if value is None or value == "":
        return "pending"
    try:
        return f"{float(value):+.2f}"
    except ValueError:
        return value


def _fmt_directie(value: str | None) -> str:
    """Map a direction value to its display label.

    Lookup ignores surrounding whitespace and case; unknown values are
    returned as given, and a falsy value yields ``""``.
    """
    if not value:
        return ""
    return _DIRECTIE_DISPLAY.get(value.strip().lower(), value)


def _escape_cell(value: str | None) -> str:
    """Make *value* safe to place inside a single Markdown table cell.

    Pipes are backslash-escaped and every line break is flattened to a
    space, then the result is stripped.
    """
    text = (value or "").replace("|", "\\|")
    # The CSV is read with newline="", so quoted multi-line fields keep their
    # original "\r\n" / "\r" terminators; normalise them before flattening so
    # no carriage return leaks into the table row.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    return text.replace("\n", " ").strip()


def _placeholder_md() -> str:
    """Return the document written when there are no rows to render."""
    return (
        "# Jurnal M2D (auto-generated)\n"
        "\n"
        "*Niciun trade încă. Adaugă unul prin `/m2d-log` sau `/backtest`.*\n"
    )


def _atomic_write_text(path: Path, content: str) -> None:
    """Write *content* to *path* via a temp file in the same directory.

    The parent directory is created if needed. The text is written as UTF-8
    with ``"\\n"`` line endings to a ``mkstemp`` file and then moved over
    *path* with ``os.replace``. On any failure the temp file is removed
    (best effort) and the original exception is re-raised.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so os.replace never crosses filesystems.
    fd, tmp_name = tempfile.mkstemp(
        prefix=path.name + ".", suffix=".tmp", dir=str(path.parent)
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8", newline="\n") as fh:
            fh.write(content)
        os.replace(tmp_name, path)
    except BaseException:
        # BaseException (not just Exception) so that an interrupt during the
        # write does not leave a stray "*.tmp" file next to the target.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass  # already moved or already gone; nothing left to clean up
        raise


def _row_to_cells(row: dict[str, str], display_index: int) -> tuple[str, ...]:
    """Project one CSV row onto the ``MD_COLUMNS`` layout.

    The first cell is *display_index* (the row's position in the rendered
    table), not the CSV ``id``. Missing or ``None`` fields become ``""``.
    """
    g = row.get
    return (
        str(display_index),
        g("data", "") or "",
        g("zi", "") or "",
        g("ora_ro", "") or "",
        g("set", "") or "",
        g("instrument", "") or "",
        _fmt_directie(g("directie", "") or ""),
        g("calitate", "") or "",
        g("entry", "") or "",
        g("sl", "") or "",
        g("tp0", "") or "",
        g("tp1", "") or "",
        g("tp2", "") or "",
        g("outcome_path", "") or "",
        _fmt_pl(g("pl_marius", "") or ""),
        _fmt_pl(g("pl_theoretical", "") or ""),
        g("source", "") or "",
        g("note", "") or "",
    )


def _render_table(rows: Sequence[dict[str, str]]) -> str:
    """Render *rows* as a Markdown table (header, separator, data lines).

    The result has no trailing newline.
    """
    header_line = "| " + " | ".join(MD_COLUMNS) + " |"
    sep_line = "|" + "|".join(["---"] * len(MD_COLUMNS)) + "|"
    data_lines = [
        "| " + " | ".join(_escape_cell(c) for c in _row_to_cells(row, i)) + " |"
        for i, row in enumerate(rows, start=1)
    ]
    return "\n".join([header_line, sep_line, *data_lines])


def _render_md(rows: Sequence[dict[str, str]]) -> str:
    """Build the full Markdown document for *rows*.

    With no rows the placeholder document is returned; otherwise the output
    contains a title, the current UTC timestamp, the row count, the table
    and a footer pointing back at the CSV.
    """
    if not rows:
        return _placeholder_md()
    now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    table = _render_table(rows)
    return (
        "# Jurnal M2D (auto-generated from data/jurnal.csv)\n"
        "\n"
        f"Generated: {now_iso}\n"
        f"Rows: {len(rows)}\n"
        "\n"
        f"{table}\n"
        "\n"
        "*Vezi `data/jurnal.csv` pentru toate cele 29 coloane "
        "(id, ora_utc, tf_*, risc_pct, be_moved, max_reached, versions, extracted_at).*\n"
    )


def _id_sort_key(raw: str | None) -> tuple[int, int | str]:
    """Sort key for the ``id`` column.

    Integer-like ids sort numerically and come first; everything else sorts
    after them as text. The leading tag keeps ints and strings from ever
    being compared with each other.
    """
    try:
        return (0, int(raw))
    except (ValueError, TypeError):
        return (1, raw or "")


def _load_rows(csv_path: Path) -> list[dict[str, str]]:
    """Read CSV, returning rows sorted by id.

    A missing or zero-byte file yields an empty list. Each returned row
    contains exactly the ``_CSV_FIELDS_USED`` keys, with absent values
    normalised to ``""``.

    Schema drift handling:
    - Extra header columns → warning to stderr, dropped.
    - Missing required header columns → warning to stderr per affected
      row (row skipped).
    """
    if not csv_path.exists() or csv_path.stat().st_size == 0:
        return []

    expected = set(csv_columns())
    required = set(_CSV_FIELDS_USED)
    rows: list[dict[str, str]] = []

    # "utf-8-sig" reads plain UTF-8 unchanged but also strips a leading BOM.
    # Without it a BOM turns the first header into "\ufeffid", which would
    # make "id" look missing and cause every row to be skipped.
    with csv_path.open("r", encoding="utf-8-sig", newline="") as fh:
        reader = csv.DictReader(fh)
        header = reader.fieldnames or []

        extras = [c for c in header if c not in expected]
        if extras:
            print(
                f"regenerate_md: warning: unknown CSV columns ignored: {extras}",
                file=sys.stderr,
            )

        missing_required = required - set(header)
        if missing_required:
            # The header is fixed for the whole file, so the message is too:
            # build it once and emit it once per (skipped) data row.
            skip_msg = (
                f"regenerate_md: warning: row skipped (missing required "
                f"columns: {sorted(missing_required)})"
            )
            for _ in reader:
                print(skip_msg, file=sys.stderr)
        else:
            # Iterate the tuple rather than the set so key order is stable.
            rows = [
                {k: (raw.get(k) or "") for k in _CSV_FIELDS_USED}
                for raw in reader
            ]

    rows.sort(key=lambda r: _id_sort_key(r.get("id", "")))
    return rows


def regenerate_md(
    csv_path: Path | str = "data/jurnal.csv",
    md_path: Path | str = "data/jurnal.md",
) -> int:
    """Read CSV → write MD atomically. Returns count of trade rows written."""
    rows = _load_rows(Path(csv_path))
    _atomic_write_text(Path(md_path), _render_md(rows))
    return len(rows)


def main(argv: Sequence[str] | None = None) -> int:
    """CLI entry point.

    Args:
        argv: Positional arguments ``[csv_path] [md_path]``. Defaults to
            ``sys.argv[1:]`` when ``None``.

    Returns:
        Process exit status (always ``0``; failures propagate as exceptions).
    """
    args = list(sys.argv[1:] if argv is None else argv)
    csv_arg = args[0] if len(args) >= 1 else "data/jurnal.csv"
    md_arg = args[1] if len(args) >= 2 else "data/jurnal.md"
    n = regenerate_md(csv_arg, md_arg)
    print(f"regenerate_md: wrote {md_arg} with {n} row(s)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())