Files
atm-backtesting/scripts/regenerate_md.py

241 lines
6.1 KiB
Python

"""Regenerate ``data/jurnal.md`` from ``data/jurnal.csv``.
CSV is the source of truth (29 columns, schema owned by ``scripts.append_row``).
MD is a human-readable mirror with a curated 18-column table.
CLI: ``python scripts/regenerate_md.py [csv_path] [md_path]``
"""
from __future__ import annotations
import csv
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Sequence
from scripts.append_row import csv_columns
# Public API of this module (the names exported by a star-import).
__all__ = ["MD_COLUMNS", "regenerate_md", "main"]
# Header labels of the Markdown table, left to right. ``_render_table`` emits
# them verbatim as the header row, and ``_row_to_cells`` produces one cell per
# entry in this same order.
MD_COLUMNS: tuple[str, ...] = (
    "#",
    "Data",
    "Zi",
    "Ora RO",
    "Set",
    "Instrument",
    "Direcție",
    "Calitate",
    "Entry",
    "SL",
    "TP0",
    "TP1",
    "TP2",
    "outcome_path",
    "P/L (Marius)",
    "P/L (theoretic)",
    "Source",
    "Note",
)
# CSV columns this module reads. ``_load_rows`` treats every one of them as
# required (rows are skipped when the header lacks any) and keeps only these
# keys. ``id`` is used for sorting; the "#" column of the table is filled with
# the row's 1-based display position instead.
_CSV_FIELDS_USED: tuple[str, ...] = (
    "id",
    "data",
    "zi",
    "ora_ro",
    "set",
    "instrument",
    "directie",
    "calitate",
    "entry",
    "sl",
    "tp0",
    "tp1",
    "tp2",
    "outcome_path",
    "pl_marius",
    "pl_theoretical",
    "source",
    "note",
)
# Maps a stripped, lower-cased ``directie`` value to the label shown in the
# table; both the long/short and the buy/sell spellings are accepted.
_DIRECTIE_DISPLAY = {"long": "Buy", "short": "Sell", "buy": "Buy", "sell": "Sell"}
def _fmt_pl(value: str) -> str:
if value is None or value == "":
return "pending"
try:
return f"{float(value):+.2f}"
except ValueError:
return value
def _fmt_directie(value: str) -> str:
if not value:
return ""
return _DIRECTIE_DISPLAY.get(value.strip().lower(), value)
def _escape_cell(value: str) -> str:
return (value or "").replace("|", "\\|").replace("\n", " ").strip()
def _placeholder_md() -> str:
return (
"# Jurnal M2D (auto-generated)\n"
"\n"
"*Niciun trade încă. Adaugă unul prin `/m2d-log` sau `/backtest`.*\n"
)
def _atomic_write_text(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_name = tempfile.mkstemp(
prefix=path.name + ".", suffix=".tmp", dir=str(path.parent)
)
try:
with os.fdopen(fd, "w", encoding="utf-8", newline="\n") as fh:
fh.write(content)
os.replace(tmp_name, path)
except Exception:
try:
os.unlink(tmp_name)
except OSError:
pass
raise
def _row_to_cells(row: dict[str, str], display_index: int) -> tuple[str, ...]:
    """Build the 18 display cells for one journal row, in ``MD_COLUMNS`` order.

    The first cell is *display_index* (not the CSV ``id``). ``directie`` goes
    through ``_fmt_directie`` and both P/L fields through ``_fmt_pl``; every
    other field is copied as text. Missing keys and ``None`` values become
    the empty string.
    """

    def text(field: str) -> str:
        # Absent keys and None values both collapse to "".
        return row.get(field, "") or ""

    return (
        str(display_index),
        *(text(name) for name in ("data", "zi", "ora_ro", "set", "instrument")),
        _fmt_directie(text("directie")),
        *(
            text(name)
            for name in (
                "calitate",
                "entry",
                "sl",
                "tp0",
                "tp1",
                "tp2",
                "outcome_path",
            )
        ),
        _fmt_pl(text("pl_marius")),
        _fmt_pl(text("pl_theoretical")),
        *(text(name) for name in ("source", "note")),
    )
def _render_table(rows: Sequence[dict[str, str]]) -> str:
    """Render *rows* as a pipe-delimited Markdown table.

    The output is the ``MD_COLUMNS`` header row, a ``---`` divider row, and
    one line per row, numbered from 1 in the order given. Data cells are
    passed through ``_escape_cell``; header labels are emitted as-is. The
    result has no trailing newline.
    """
    header = "| " + " | ".join(MD_COLUMNS) + " |"
    divider = "|" + "|".join("---" for _ in MD_COLUMNS) + "|"
    body = [
        "| " + " | ".join(map(_escape_cell, _row_to_cells(row, position))) + " |"
        for position, row in enumerate(rows, start=1)
    ]
    return "\n".join([header, divider] + body)
def _render_md(rows: Sequence[dict[str, str]]) -> str:
    """Return the full Markdown document for *rows*.

    With no rows the placeholder document is returned. Otherwise the document
    holds a title, the UTC generation timestamp, the row count, the rendered
    table and a footer pointing back at the CSV.
    """
    if not rows:
        return _placeholder_md()

    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    table = _render_table(rows)
    sections = [
        "# Jurnal M2D (auto-generated from data/jurnal.csv)",
        "",
        f"Generated: {generated_at}",
        f"Rows: {len(rows)}",
        "",
        table,
        "",
        "*Vezi `data/jurnal.csv` pentru toate cele 29 coloane "
        "(id, ora_utc, tf_*, risc_pct, be_moved, max_reached, versions, extracted_at).*",
    ]
    # Every line, including the footer, is newline-terminated.
    return "\n".join(sections) + "\n"
def _id_sort_key(raw: str) -> tuple[int, int | str]:
try:
return (0, int(raw))
except (ValueError, TypeError):
return (1, raw or "")
def _load_rows(csv_path: Path) -> list[dict[str, str]]:
    """Read the journal CSV and return its rows sorted by ``id``.

    Only the columns listed in ``_CSV_FIELDS_USED`` are kept; missing or
    ``None`` cell values become ``""``. A missing or zero-byte file yields an
    empty list. Rows are ordered with ``_id_sort_key``.

    The file is decoded as ``utf-8-sig`` so that a leading byte-order mark
    (for example after the file was re-saved by a spreadsheet tool) is not
    glued onto the first header name. With plain ``utf-8`` the first column
    would no longer match its expected name and every row would be skipped.

    Schema drift handling:

    - Extra header columns → one warning to stderr; the columns are dropped.
    - Missing required header columns → warning to stderr per affected row
      (row skipped).
    """
    if not csv_path.exists() or csv_path.stat().st_size == 0:
        return []

    expected = set(csv_columns())
    required = set(_CSV_FIELDS_USED)

    with csv_path.open("r", encoding="utf-8-sig", newline="") as fh:
        reader = csv.DictReader(fh)
        header = reader.fieldnames or []

        extras = [c for c in header if c not in expected]
        if extras:
            print(
                f"regenerate_md: warning: unknown CSV columns ignored: {extras}",
                file=sys.stderr,
            )

        missing_required = required - set(header)
        rows: list[dict[str, str]] = []
        for raw in reader:
            if missing_required:
                print(
                    f"regenerate_md: warning: row skipped (missing required "
                    f"columns: {sorted(missing_required)})",
                    file=sys.stderr,
                )
                continue
            # Iterate the tuple, not the set, so key order is deterministic.
            rows.append({k: (raw.get(k) or "") for k in _CSV_FIELDS_USED})

    rows.sort(key=lambda r: _id_sort_key(r.get("id", "")))
    return rows
def regenerate_md(
    csv_path: Path | str = "data/jurnal.csv",
    md_path: Path | str = "data/jurnal.md",
) -> int:
    """Rebuild the Markdown journal at *md_path* from the CSV at *csv_path*.

    The CSV rows are loaded, rendered to Markdown and written through
    ``_atomic_write_text``.

    Returns:
        The number of trade rows written to the Markdown file.
    """
    # Convert both paths up front so a bad argument fails before any I/O.
    source, target = Path(csv_path), Path(md_path)
    trades = _load_rows(source)
    _atomic_write_text(target, _render_md(trades))
    return len(trades)
def main() -> int:
    """Command-line entry point: ``regenerate_md.py [csv_path] [md_path]``.

    Both positional arguments are optional and fall back to the default
    journal paths; arguments beyond the second are ignored. Prints a one-line
    summary to stdout and returns ``0`` as the exit status.
    """
    positional = sys.argv[1:3]
    # Pad whatever was supplied with the defaults for the missing positions.
    defaults = ["data/jurnal.csv", "data/jurnal.md"]
    csv_arg, md_arg = positional + defaults[len(positional):]
    count = regenerate_md(csv_arg, md_arg)
    print(f"regenerate_md: wrote {md_arg} with {count} row(s)")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())