# Source file: atm-backtesting/scripts/append_row.py
# (Python, 312 lines, 9.2 KiB)

"""Append a validated M2D extraction to ``data/jurnal.csv``.
Pipeline:
JSON file --> pydantic validate (M2DExtraction)
--> load data/_meta.yaml (versions)
--> compute id, ora_ro, zi, set, pl_marius, pl_theoretical, extracted_at
--> dedup on (screenshot_file, source)
--> atomic CSV write (sibling .tmp + os.replace)
Source values
- ``vision`` : produced by the vision subagent
- ``manual`` : Marius logged by hand
- ``manual_calibration`` : calibration P4 — manual leg
- ``vision_calibration`` : calibration P4 — vision leg
A row with ``source=manual_calibration`` and a row with ``source=vision_calibration``
for the *same* screenshot are allowed to coexist (different dedup keys).
Failure mode: ``append_extraction`` NEVER raises. On any error (missing JSON,
pydantic ValidationError, dedup hit, etc.) it returns
``{"status": "rejected", "reason": "...", "id": None, "row": None}`` so the
caller (a slash command) can decide what to do with the screenshot
(move to ``needs_review/``, log to workflow, etc.).
"""
from __future__ import annotations
import csv
import json
import os
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal
import yaml
from pydantic import ValidationError
from scripts.calendar_parse import calc_set, load_calendar, utc_to_ro
from scripts.pl_calc import pl_marius, pl_theoretical
from scripts.vision_schema import M2DExtraction, parse_extraction
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers defined in this file are intentionally left out.
__all__ = [
    "CSV_COLUMNS",
    "VALID_SOURCES",
    "ZI_RO_MAP",
    "csv_columns",
    "append_extraction",
]
# Static type for the ``source`` argument/column (for type checkers).
Source = Literal[
    "vision",
    "manual",
    "manual_calibration",
    "vision_calibration",
]

# Runtime counterpart of ``Source``: ``append_extraction`` rejects any
# ``source`` value that is not a member. Keep both definitions in sync.
VALID_SOURCES: frozenset[str] = frozenset(
    ("vision", "manual", "manual_calibration", "vision_calibration")
)
# Canonical column order (29) — must stay stable; regenerate_md + stats depend on it.
# The grouping below is purely visual; the tuple is flat and order-sensitive.
CSV_COLUMNS: tuple[str, ...] = (
    # Row identity; (screenshot_file, source) is the dedup key.
    "id", "screenshot_file", "source",
    # Timing: ``zi`` and ``ora_ro`` are derived in ``_build_row``.
    "data", "zi", "ora_ro", "ora_utc",
    # Trade setup as reported by the extraction.
    "instrument", "directie", "tf_mare", "tf_mic", "calitate",
    # Price levels and risk.
    "entry", "sl", "tp0", "tp1", "tp2", "risc_pct",
    # Outcome as reported by the extraction.
    "outcome_path", "max_reached", "be_moved",
    # Derived in ``_build_row`` from the outcome and the calendar.
    "pl_marius", "pl_theoretical", "set",
    # Version stamps copied from ``_meta.yaml``.
    "indicator_version", "pl_overlay_version", "csv_schema_version",
    # Bookkeeping.
    "extracted_at", "note",
)
# Weekday abbreviation (as returned by ``utc_to_ro``) -> two-letter label
# written to the ``zi`` column. A key missing here surfaces as ``KeyError``
# in ``_build_row``.
ZI_RO_MAP: dict[str, str] = {
    "Mon": "Lu", "Tue": "Ma", "Wed": "Mi", "Thu": "Jo", "Fri": "Vi",
    "Sat": "Sa", "Sun": "Du",
}
def csv_columns() -> list[str]:
    """Return a fresh, mutable list of the 29 header names in canonical order.

    A new list is built on every call, so callers may modify the result
    without affecting ``CSV_COLUMNS``.
    """
    return [*CSV_COLUMNS]
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _load_meta(meta_path: Path) -> dict[str, Any]:
    """Load ``_meta.yaml`` and verify it carries every required version key.

    Args:
        meta_path: Location of the YAML metadata file.

    Returns:
        The parsed top-level mapping.

    Raises:
        OSError: The file cannot be opened or read.
        yaml.YAMLError: The content is not valid YAML.
        ValueError: The document is not a mapping at top level, or one of
            ``indicator_version`` / ``pl_overlay_version`` /
            ``csv_schema_version`` is absent.
    """
    with meta_path.open("r", encoding="utf-8") as fh:
        loaded = yaml.safe_load(fh)

    # An empty document parses to None; treat it as an empty mapping so the
    # caller receives the "missing required keys" message below.
    meta = {} if loaded is None else loaded

    # A scalar or list root would otherwise blow up with TypeError (either in
    # the membership test below or later when the caller indexes by key),
    # which callers do not expect. Report it as an invalid file instead.
    if not isinstance(meta, dict):
        raise ValueError(
            "_meta.yaml must contain a mapping at top level, "
            f"got {type(meta).__name__}"
        )

    required = ("indicator_version", "pl_overlay_version", "csv_schema_version")
    missing = [k for k in required if k not in meta]
    if missing:
        raise ValueError(f"_meta.yaml missing required keys: {missing}")
    return meta
def _read_existing_rows(csv_path: Path) -> list[dict[str, str]]:
    """Return every data row currently stored in ``csv_path``.

    A missing or zero-byte file yields an empty list. Otherwise the first
    line is taken as the header and each following line becomes one dict
    keyed by those header names.
    """
    has_content = csv_path.exists() and csv_path.stat().st_size != 0
    if not has_content:
        return []

    # newline="" lets the csv module handle line endings itself.
    with csv_path.open("r", encoding="utf-8", newline="") as handle:
        return [*csv.DictReader(handle)]
def _next_id(rows: list[dict[str, str]]) -> int:
    """Return the id for the next row: one more than the largest existing id.

    Rows whose ``id`` cell is missing, empty, or not parseable as an integer
    are ignored. The result is never lower than 1.
    """

    def _numeric_ids():
        # Yield only the ids that parse cleanly; skip everything else.
        for record in rows:
            cell = record.get("id", "")
            if not cell:
                continue
            try:
                parsed = int(cell)
            except (TypeError, ValueError):
                continue
            yield parsed

    # The leading 0 is the floor: it covers "no usable ids" and keeps
    # negative ids from pulling the result below 1.
    return max([0, *_numeric_ids()]) + 1
def _format_optional(value: float | None) -> str:
    """Render ``value`` with four decimal places, or ``""`` when it is ``None``."""
    if value is None:
        return ""
    return format(value, ".4f")
def _write_csv_atomic(
    csv_path: Path, rows: list[dict[str, str]], columns: list[str]
) -> None:
    """Rewrite ``csv_path`` with ``rows`` via a sibling temp file + ``os.replace``.

    The complete file (header plus all rows) is written to
    ``<csv_path>.tmp`` and then moved over ``csv_path`` in a single
    ``os.replace`` call, so readers never observe a half-written file.

    Args:
        csv_path: Destination file; its parent directory is created if needed.
        rows: Row dicts. Keys not listed in ``columns`` are dropped and
            missing keys are written as empty cells.
        columns: Header names, in output order.

    Raises:
        OSError: Creating the directory, writing, syncing, or replacing failed.
            Any other exception raised while writing is propagated unchanged.
            In every failure case the temp file is removed (best effort) and
            an existing ``csv_path`` is left as it was.
    """
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = csv_path.with_suffix(csv_path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=columns)
            writer.writeheader()
            writer.writerows({k: row.get(k, "") for k in columns} for row in rows)
            # Push the data to disk before the rename so a crash right after
            # os.replace cannot leave an empty or truncated file in place.
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, csv_path)
    except BaseException:
        # Do not leave a stale .tmp behind. Cleanup is best effort and must
        # not mask the original error.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
def _build_row(
    extraction: M2DExtraction,
    *,
    source: str,
    row_id: int,
    meta: dict[str, Any],
    calendar: list[dict[str, Any]],
    extracted_at: str,
) -> dict[str, str]:
    """Assemble the complete CSV row for one validated extraction.

    Combines the fields carried by ``extraction`` with the derived columns
    (``zi``, ``ora_ro``, ``set``, ``pl_marius``, ``pl_theoretical``), the
    version stamps from ``meta`` and the caller-supplied ``row_id``,
    ``source`` and ``extracted_at``. Keys are inserted in ``CSV_COLUMNS``
    order.

    Raises:
        KeyError: The weekday returned by ``utc_to_ro`` is not in
            ``ZI_RO_MAP``, or ``meta`` lacks one of the version keys.
        Exception: Whatever ``utc_to_ro``, ``calc_set``, ``pl_marius`` or
            ``pl_theoretical`` raise is propagated unchanged.
    """
    ro_date, ro_time, weekday = utc_to_ro(extraction.data, extraction.ora_utc)
    session_set = calc_set(ro_date, ro_time, weekday, calendar)
    marius_pl = pl_marius(extraction.outcome_path, extraction.be_moved)
    theoretical_pl = pl_theoretical(extraction.max_reached)
    weekday_ro = ZI_RO_MAP[weekday]

    # NOTE(review): ``data`` is stored exactly as supplied by the extraction,
    # while ``zi``/``ora_ro`` come from the utc_to_ro conversion. Confirm this
    # is intended for timestamps whose conversion crosses midnight.
    row: dict[str, str] = {
        "id": str(row_id),
        "screenshot_file": extraction.screenshot_file,
        "source": source,
        "data": extraction.data,
        "zi": weekday_ro,
        "ora_ro": ro_time.strftime("%H:%M"),
    }

    # Fields copied from the extraction untouched.
    for field in (
        "ora_utc", "instrument", "directie", "tf_mare", "tf_mic", "calitate"
    ):
        row[field] = getattr(extraction, field)

    # Price levels and risk are rendered with their default string format.
    for field in ("entry", "sl", "tp0", "tp1", "tp2", "risc_pct"):
        row[field] = f"{getattr(extraction, field)}"

    row["outcome_path"] = extraction.outcome_path
    row["max_reached"] = extraction.max_reached
    row["be_moved"] = str(extraction.be_moved)
    row["pl_marius"] = _format_optional(marius_pl)
    row["pl_theoretical"] = _format_optional(theoretical_pl)
    row["set"] = session_set

    # Version stamps are stringified because YAML may load them as numbers.
    for field in ("indicator_version", "pl_overlay_version", "csv_schema_version"):
        row[field] = str(meta[field])

    row["extracted_at"] = extracted_at
    row["note"] = extraction.note
    return row
def _reject(reason: str) -> dict[str, Any]:
    """Build the uniform "rejected" result returned by ``append_extraction``.

    ``reason`` is passed through verbatim; ``id`` and ``row`` are always
    ``None`` because nothing was written.
    """
    return dict(status="rejected", reason=reason, id=None, row=None)
# ---------------------------------------------------------------------------
# public API
# ---------------------------------------------------------------------------
def append_extraction(
    json_path: Path | str,
    source: str,
    csv_path: Path | str = "data/jurnal.csv",
    meta_path: Path | str = "data/_meta.yaml",
    calendar_path: Path | str = "calendar_evenimente.yaml",
) -> dict[str, Any]:
    """Append one validated extraction to the jurnal CSV.

    Args:
        json_path: JSON file holding the extraction to validate and append.
        source: Origin of the row; must be a member of ``VALID_SOURCES``.
        csv_path: Journal CSV to extend (created if absent).
        meta_path: YAML file providing the version stamps.
        calendar_path: YAML event calendar used to compute the ``set`` column.

    Returns:
        One of:

        - ``{"status": "ok", "reason": "", "id": <int>, "row": <dict>}``
        - ``{"status": "rejected", "reason": <str>, "id": None, "row": None}``

    Never raises an ``Exception``: expected failures are mapped to specific
    rejection reasons by the pipeline helper, and anything unforeseen is
    caught here and reported as ``"unexpected error: ..."``.
    ``KeyboardInterrupt`` and ``SystemExit`` are deliberately not intercepted.
    """
    try:
        return _append_extraction_checked(
            json_path, source, csv_path, meta_path, calendar_path
        )
    except Exception as exc:  # boundary: the documented contract is "never raises"
        return _reject(
            f"unexpected error: {type(exc).__name__}: {exc}\n"
            f"{traceback.format_exc()}"
        )


def _append_extraction_checked(
    json_path: Path | str,
    source: str,
    csv_path: Path | str,
    meta_path: Path | str,
    calendar_path: Path | str,
) -> dict[str, Any]:
    """Run the append pipeline, turning each expected failure into a rejection.

    Unexpected exceptions are allowed to propagate; ``append_extraction``
    converts them into a rejection.
    """
    json_path = Path(json_path)
    csv_path = Path(csv_path)
    meta_path = Path(meta_path)
    calendar_path = Path(calendar_path)

    if source not in VALID_SOURCES:
        return _reject(
            f"invalid source {source!r}; must be one of {sorted(VALID_SOURCES)}"
        )

    # --- read + validate the extraction -----------------------------------
    if not json_path.exists():
        return _reject(f"JSON file not found: {json_path}")
    try:
        raw = json_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError: a non-UTF-8
        # file must be rejected rather than crash the caller.
        return _reject(f"failed to read JSON {json_path}: {exc}")

    try:
        extraction = parse_extraction(raw)
    except ValidationError as exc:
        return _reject(f"validation error: {exc}")
    except (ValueError, json.JSONDecodeError) as exc:
        return _reject(f"validation error (json parse): {exc}")

    # --- supporting data ---------------------------------------------------
    try:
        meta = _load_meta(meta_path)
    except OSError as exc:  # includes FileNotFoundError
        return _reject(f"_meta.yaml not found: {exc}")
    except (ValueError, yaml.YAMLError) as exc:
        return _reject(f"_meta.yaml invalid: {exc}")

    try:
        calendar = load_calendar(calendar_path)
    except OSError as exc:  # includes FileNotFoundError
        return _reject(f"calendar not found: {exc}")
    except (ValueError, yaml.YAMLError) as exc:
        return _reject(f"calendar invalid: {exc}")

    try:
        existing = _read_existing_rows(csv_path)
    except (OSError, UnicodeDecodeError, csv.Error) as exc:
        # A corrupt or mis-encoded journal is a rejection, not a crash.
        return _reject(f"failed to read existing CSV {csv_path}: {exc}")

    # --- dedup on (screenshot_file, source) ---------------------------------
    dedup_key = (extraction.screenshot_file, source)
    if any(
        (r.get("screenshot_file"), r.get("source")) == dedup_key for r in existing
    ):
        return _reject(
            f"duplicate row: screenshot_file={dedup_key[0]!r} "
            f"source={dedup_key[1]!r}"
        )

    # --- build + persist the new row ----------------------------------------
    row_id = _next_id(existing)
    extracted_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
    try:
        row = _build_row(
            extraction,
            source=source,
            row_id=row_id,
            meta=meta,
            calendar=calendar,
            extracted_at=extracted_at,
        )
    except (KeyError, ValueError) as exc:
        return _reject(f"derived-field computation failed: {exc}")

    try:
        _write_csv_atomic(csv_path, [*existing, row], list(CSV_COLUMNS))
    except (OSError, UnicodeEncodeError, csv.Error) as exc:
        return _reject(f"atomic write failed: {exc}\n{traceback.format_exc()}")

    return {"status": "ok", "reason": "", "id": row_id, "row": row}