diff --git a/src/atm/calibrate.py b/src/atm/calibrate.py index f042525..cf35ed9 100644 --- a/src/atm/calibrate.py +++ b/src/atm/calibrate.py @@ -1,6 +1,8 @@ """Calibration wizard for chart window — Tk-based, safe to import headlessly.""" from __future__ import annotations +import os +import time from datetime import datetime, timezone from pathlib import Path @@ -30,7 +32,6 @@ def _toml_scalar(v: object) -> str: def _emit_table(lines: list[str], data: dict, prefix: str) -> None: - """Emit scalar key-value pairs then recurse into sub-tables.""" for k, v in data.items(): if not isinstance(v, dict): lines.append(f"{k} = {_toml_scalar(v)}") @@ -48,77 +49,351 @@ def _dict_to_toml(data: dict) -> str: return "\n".join(lines) + "\n" -# --------------------------------------------------------------------------- -# Pure helper — testable without Tk -# --------------------------------------------------------------------------- - def write_config(data: dict, out_dir: Path) -> Path: - """Serialise *data* to a timestamped TOML file in *out_dir* and update current.txt. - - Returns the path of the newly written config file. - """ + """Serialise *data* to a timestamped TOML file in *out_dir* and update current.txt.""" out_dir = Path(out_dir) out_dir.mkdir(parents=True, exist_ok=True) ts = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M") filename = f"{ts}.toml" config_path = out_dir / filename - config_path.write_text(_dict_to_toml(data), encoding="utf-8") (out_dir / "current.txt").write_text(filename, encoding="utf-8") - return config_path # --------------------------------------------------------------------------- -# Interactive wizard — Tk imported only at runtime +# Window capture (Windows live) / screenshot fallback # --------------------------------------------------------------------------- -def run_calibration(out_dir: Path) -> Path: - """Launch the guided calibration wizard and return the saved config path.""" +def _capture_window(title_substr: str): + """Screenshot the first window matching *title_substr*. Windows-only (mss+pygetwindow). + + Returns a PIL RGB Image. + """ + import mss # type: ignore[import-untyped] + import pygetwindow as gw # type: ignore[import-untyped] + from PIL import Image + + wins = gw.getWindowsWithTitle(title_substr) + if not wins: + raise RuntimeError(f"No window matching title substring: {title_substr!r}") + win = wins[0] + try: + win.activate() + except Exception: + pass # activate may fail on some windows; continue — mss still captures by region + time.sleep(0.3) + + with mss.mss() as sct: + mon = { + "top": int(win.top), "left": int(win.left), + "width": int(win.width), "height": int(win.height), + } + shot = sct.grab(mon) + return Image.frombytes("RGB", shot.size, shot.rgb) + + +# --------------------------------------------------------------------------- +# Wizard +# --------------------------------------------------------------------------- + +_COLOR_ORDER = [ + "turquoise", "yellow", "dark_green", "dark_red", + "light_green", "light_red", "gray", "background", +] + + +def run_calibration(out_dir: Path, screenshot: Path | None = None) -> Path: + """Launch the guided calibration wizard and return the saved config path. + + If *screenshot* is provided, use that image instead of live window capture + (useful for dev/testing on non-Windows hosts). + """ import tkinter as tk from tkinter import simpledialog + from PIL import Image out_dir = Path(out_dir) - configs_dir = out_dir / "configs" + configs_dir = out_dir / "configs" if out_dir.name != "configs" else out_dir configs_dir.mkdir(parents=True, exist_ok=True) - root = tk.Tk() - root.withdraw() + # ------------------------------------------------------------------ + # 1. Acquire image + window title + # ------------------------------------------------------------------ + pil_img: "Image.Image" + if screenshot is not None: + pil_img = Image.open(screenshot).convert("RGB") + root0 = tk.Tk(); root0.withdraw() + window_title = simpledialog.askstring( + "Window title", + "Window title substring used by 'atm run' to locate the chart:", + parent=root0, + initialvalue=screenshot.stem, + ) or screenshot.stem + root0.destroy() + else: + root0 = tk.Tk(); root0.withdraw() + window_title = simpledialog.askstring( + "Window title", + "Substring of the chart window title (e.g. 'DIA' or 'TradeStation'):", + parent=root0, + ) or "" + root0.destroy() + if not window_title: + raise ValueError("Window title is required to proceed.") + pil_img = _capture_window(window_title) - # Step 1: window title - window_title = simpledialog.askstring( - "Step 1 — Window title", - "Enter the exact title of the chart window:", - parent=root, - ) or "" - if not window_title: - root.destroy() - raise ValueError("Window title is required to proceed.") + # ------------------------------------------------------------------ + # 2. Launch interactive wizard on the image + # ------------------------------------------------------------------ + wizard = _CalibrationWizard(pil_img, window_title) + data = wizard.run() - # Steps 2-5 require a visible window; skeleton data used until full wizard is built. - data: dict = { - "window_title": window_title, - "dot_roi": {"x": 0, "y": 0, "w": 120, "h": 120}, - "chart_roi": {"x": 0, "y": 0, "w": 800, "h": 600}, - "colors": { - "turquoise": {"rgb": [0, 200, 200], "tolerance": 30.0}, - "yellow": {"rgb": [255, 255, 0], "tolerance": 30.0}, - "dark_green": {"rgb": [0, 128, 0], "tolerance": 30.0}, - "dark_red": {"rgb": [139, 0, 0], "tolerance": 30.0}, - "light_green": {"rgb": [144, 238, 144], "tolerance": 30.0}, - "light_red": {"rgb": [255, 182, 193], "tolerance": 30.0}, - "gray": {"rgb": [128, 128, 128], "tolerance": 30.0}, - }, - "y_axis": {"p1_y": 0, "p1_price": 0.0, "p2_y": 1, "p2_price": 1.0}, - "canary": { - "roi": {"x": 0, "y": 0, "w": 50, "h": 50}, - "baseline_phash": "", - "drift_threshold": 8, - }, - "discord": {"webhook_url": "http://placeholder"}, - "telegram": {"bot_token": "placeholder", "chat_id": "0"}, + # ------------------------------------------------------------------ + # 3. Inject notifier creds (env → placeholders otherwise) + # ------------------------------------------------------------------ + data["discord"] = { + "webhook_url": os.environ.get( + "ATM_DISCORD_URL", + "https://discord.com/api/webhooks/REPLACE_ME", + ), } + data["telegram"] = { + "bot_token": os.environ.get("ATM_TG_TOKEN", "REPLACE_ME"), + "chat_id": os.environ.get("ATM_TG_CHAT", "0"), + } + data.setdefault("options", {}) - root.destroy() return write_config(data, configs_dir) + + +class _CalibrationWizard: + """Single-window Tk wizard with step-by-step click instructions.""" + + def __init__(self, pil_img, window_title: str) -> None: + import tkinter as tk + from PIL import ImageTk + + self._pil_img = pil_img + self._window_title = window_title + + self._root = tk.Tk() + self._root.title("ATM calibration wizard") + self._result: dict | None = None + + # --- build step plan --- + self._steps: list[tuple[str, str]] = [ + ("dot_roi_tl", "M2D MAPS strip: click TOP-LEFT corner"), + ("dot_roi_br", "M2D MAPS strip: click BOTTOM-RIGHT corner"), + ] + for c in _COLOR_ORDER: + if c == "background": + self._steps.append(("color_background", "Click on the chart BACKGROUND (empty area)")) + else: + self._steps.append((f"color_{c}", f"Click on a {c.replace('_',' ').upper()} dot")) + self._steps += [ + ("chart_roi_tl", "Chart area: click TOP-LEFT (where SL/TP lines will appear)"), + ("chart_roi_br", "Chart area: click BOTTOM-RIGHT"), + ("y1", "Click on a known price level (any horizontal grid line) — reference 1"), + ("y2", "Click on ANOTHER price level — reference 2"), + ("canary_tl", "Canary region: click TOP-LEFT of a stable UI element (axis label, logo)"), + ("canary_br", "Canary region: click BOTTOM-RIGHT"), + ("done", "Calibration complete — click Save"), + ] + self._idx = 0 + + self._data: dict = { + "window_title": window_title, + "dot_roi": None, + "chart_roi": None, + "colors": {}, + "y_axis": {}, + "canary_roi": None, + } + self._tmp: dict = {} # first-click holders + + # --- layout --- + screen_w = self._root.winfo_screenwidth() - 100 + screen_h = self._root.winfo_screenheight() - 200 + orig_w, orig_h = pil_img.size + self._scale = min(screen_w / orig_w, screen_h / orig_h, 1.0) + disp_w = int(orig_w * self._scale) + disp_h = int(orig_h * self._scale) + + scaled = pil_img.resize((disp_w, disp_h)) + self._tkimg = ImageTk.PhotoImage(scaled) + + self._status = tk.Label( + self._root, text="", font=("Arial", 13), fg="white", bg="#2b7de9", + padx=8, pady=6, anchor="w", + ) + self._status.pack(fill="x") + + bar = tk.Frame(self._root) + bar.pack(fill="x") + self._back_btn = tk.Button(bar, text="← Back", command=self._back) + self._back_btn.pack(side="left", padx=4, pady=4) + self._skip_btn = tk.Button(bar, text="Skip (can't find)", command=self._skip) + self._skip_btn.pack(side="left", padx=4, pady=4) + self._save_btn = tk.Button(bar, text="💾 Save", command=self._save, state="disabled") + self._save_btn.pack(side="right", padx=4, pady=4) + + self._canvas = tk.Canvas( + self._root, width=disp_w, height=disp_h, bg="#222", cursor="crosshair", + highlightthickness=0, + ) + self._canvas.pack() + self._canvas.create_image(0, 0, anchor="nw", image=self._tkimg) + self._canvas.bind("", self._on_click) + + self._markers: list[int] = [] + self._refresh() + + # -------- state control -------- + def _refresh(self) -> None: + name, prompt = self._steps[self._idx] + self._status.config(text=f"Step {self._idx + 1}/{len(self._steps)} — {prompt}") + self._back_btn.config(state=("normal" if self._idx > 0 else "disabled")) + self._skip_btn.config(state=("normal" if name.startswith("color_") else "disabled")) + self._save_btn.config(state=("normal" if name == "done" else "disabled")) + + def _advance(self) -> None: + self._idx = min(self._idx + 1, len(self._steps) - 1) + self._refresh() + + def _back(self) -> None: + if self._idx == 0: + return + self._idx -= 1 + if self._markers: + self._canvas.delete(self._markers.pop()) + self._refresh() + + def _skip(self) -> None: + name, _ = self._steps[self._idx] + if name.startswith("color_"): + cname = name.split("_", 1)[1] + self._data["colors"][cname] = {"rgb": [128, 128, 128], "tolerance": 30.0} + self._advance() + + # -------- click handler -------- + def _on_click(self, event) -> None: + name, _ = self._steps[self._idx] + if name == "done": + return + ox = int(event.x / self._scale) + oy = int(event.y / self._scale) + + # draw marker + r = 8 + mk = self._canvas.create_oval( + event.x - r, event.y - r, event.x + r, event.y + r, + outline="#ff3838", width=3, + ) + self._markers.append(mk) + + if name == "dot_roi_tl": + self._tmp["dot_tl"] = (ox, oy) + elif name == "dot_roi_br": + x0, y0 = self._tmp["dot_tl"] + self._data["dot_roi"] = { + "x": min(ox, x0), "y": min(oy, y0), + "w": abs(ox - x0), "h": abs(oy - y0), + } + elif name.startswith("color_"): + cname = name.split("_", 1)[1] + rgb = self._sample_rgb(ox, oy) + tol = 15.0 if cname == "background" else 30.0 + self._data["colors"][cname] = {"rgb": list(rgb), "tolerance": tol} + elif name == "chart_roi_tl": + self._tmp["chart_tl"] = (ox, oy) + elif name == "chart_roi_br": + x0, y0 = self._tmp["chart_tl"] + self._data["chart_roi"] = { + "x": min(ox, x0), "y": min(oy, y0), + "w": abs(ox - x0), "h": abs(oy - y0), + } + elif name in ("y1", "y2"): + from tkinter import simpledialog + price = simpledialog.askfloat( + "Price", f"Enter the price at this y-coordinate (pixel y={oy}):", + parent=self._root, + ) + if price is None: + self._canvas.delete(self._markers.pop()) + return + if name == "y1": + self._data["y_axis"]["p1_y"] = oy + self._data["y_axis"]["p1_price"] = price + else: + if oy == self._data["y_axis"].get("p1_y"): + from tkinter import messagebox + messagebox.showerror("Invalid", "Reference 2 must have a DIFFERENT y-coordinate.") + self._canvas.delete(self._markers.pop()) + return + self._data["y_axis"]["p2_y"] = oy + self._data["y_axis"]["p2_price"] = price + elif name == "canary_tl": + self._tmp["canary_tl"] = (ox, oy) + elif name == "canary_br": + x0, y0 = self._tmp["canary_tl"] + self._data["canary_roi"] = { + "x": min(ox, x0), "y": min(oy, y0), + "w": abs(ox - x0), "h": abs(oy - y0), + } + + self._advance() + + # -------- helpers -------- + def _sample_rgb(self, x: int, y: int) -> tuple[int, int, int]: + """Average RGB of a 7x7 patch centered on (x,y) in the original image.""" + box = (max(0, x - 3), max(0, y - 3), x + 4, y + 4) + pixels = list(self._pil_img.crop(box).getdata()) + n = len(pixels) or 1 + r = sum(p[0] for p in pixels) // n + g = sum(p[1] for p in pixels) // n + b = sum(p[2] for p in pixels) // n + return (r, g, b) + + def _save(self) -> None: + from tkinter import messagebox + missing = [k for k in ("dot_roi", "chart_roi", "canary_roi") if not self._data.get(k)] + if missing: + messagebox.showerror("Incomplete", f"Missing: {', '.join(missing)}") + return + if len(self._data["colors"]) < 7: + messagebox.showerror("Incomplete", "All dot colours are required (background optional).") + return + if "p1_y" not in self._data["y_axis"] or "p2_y" not in self._data["y_axis"]: + messagebox.showerror("Incomplete", "Both y-axis reference points are required.") + return + + # Compute canary baseline phash + import numpy as np + from .vision import phash, crop_roi + from .config import ROI + arr = np.array(self._pil_img)[:, :, ::-1] # RGB → BGR + roi = ROI(**self._data["canary_roi"]) + ph = phash(crop_roi(arr, roi)) + + out = { + "window_title": self._window_title, + "dot_roi": self._data["dot_roi"], + "chart_roi": self._data["chart_roi"], + "colors": self._data["colors"], + "y_axis": self._data["y_axis"], + "canary": { + "roi": self._data["canary_roi"], + "baseline_phash": ph, + "drift_threshold": 8, + }, + } + self._result = out + self._root.destroy() + + def run(self) -> dict: + self._root.mainloop() + if self._result is None: + raise ValueError("Calibration cancelled (window closed before save)") + return self._result diff --git a/src/atm/main.py b/src/atm/main.py index 03995d7..248412d 100644 --- a/src/atm/main.py +++ b/src/atm/main.py @@ -30,7 +30,11 @@ def main(argv=None) -> None: sub.required = True # calibrate - sub.add_parser("calibrate", help="Launch guided calibration wizard (Tk)") + p_cal = sub.add_parser("calibrate", help="Launch guided calibration wizard (Tk)") + p_cal.add_argument( + "--screenshot", type=Path, default=None, metavar="PATH", + help="Use a saved screenshot instead of live window capture (dev/testing)", + ) # label p_label = sub.add_parser("label", help="Label dot-colour samples (Tk)") @@ -91,7 +95,7 @@ def _cmd_calibrate(args) -> None: from atm.calibrate import run_calibration except ImportError as exc: sys.exit(f"calibrate module not available: {exc}") - run_calibration(Path("configs")) + run_calibration(Path("configs"), screenshot=args.screenshot) def _cmd_label(args) -> None: