#!/usr/bin/env python3
"""One-shot migration: translate OpenClaw cron/jobs.json to echo-core schema.

Dated: 2026-04
Status: ONE-SHOT tool. Kept in git as an audit artifact for the consolidation.
Restore path: if this needs to be re-run, the original OpenClaw file is at
    /home/moltbot/.openclaw/cron/jobs.json
    /home/moltbot/.openclaw/cron/jobs.json.bak
and the pre-migration echo-core jobs.json is recoverable from git history
(commit preceding `feat(cron): populate jobs.json with decomposed ...`).

OpenClaw schema (nested):
    {
      "id": "<job id>",
      "agentId": "echo",
      "name": "<job name>",
      "enabled": bool,
      "schedule": {"kind": "cron", "expr": "0 6 * * *", "tz": "Europe/Bucharest"?},
      "sessionTarget": "isolated",
      "payload": {"kind": "agentTurn", "message": "<prompt>", "model": "sonnet"?},
      "state": {...},
      ...
    }

Echo-core schema (flat, Claude job):
    {
      "name": "<job name>",
      "cron": "<5-field cron expression>",
      "channel": "<channel>",
      "model": "sonnet",
      "prompt": "<prompt>",
      "allowed_tools": [],
      "enabled": bool,
      "last_run": null, "last_status": null, "next_run": null
    }

Echo-core scheduler interprets cron expressions in Europe/Bucharest. OpenClaw
used UTC by default (per its runtime) unless schedule.tz is set explicitly.
This script converts UTC -> Europe/Bucharest for jobs without an explicit tz;
expressions already tagged Europe/Bucharest pass through unchanged.

Prompt bodies are rewritten so that `cd ~/clawd` and `/home/moltbot/clawd/`
point at echo-core, without touching `clawd-archive` or `clawdbot`.
A built-in skip list covers night-execute and antfarm/feature-dev/*; jobs
whose name starts with `YouTube:` are always skipped.

Usage:
    python3 tools/migrations/import_openclaw_jobs_2026-04.py [flags]

Flags:
    --dry-run               Print what would change without writing.
    --skip-disabled         Skip jobs where enabled is false (default: import all).
    --skip name1,name2,...  Comma-separated list of job names to exclude.
    --no-default-skip       Disable the built-in SKIP_BY_DEFAULT list.
    --channel NAME          Default channel for imported jobs (default: echo-work).
    --source PATH           Path to openclaw jobs.json.
    --target PATH           Path to echo-core jobs.json.

The script is idempotent with respect to existing jobs: if a job with the same
name is already present in the target (or appeared earlier in the same source
file), it is skipped with a warning, and the existing entry is preserved
untouched.
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from zoneinfo import ZoneInfo

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------


def _resolve_project_root(script: Path, levels_up: int = 2) -> Path:
    """Return the directory `levels_up` levels above the folder holding *script*.

    Falls back to the top-most available ancestor when the script lives in a
    path that is too shallow, so importing the module never raises IndexError.
    """
    ancestors = script.resolve().parents
    return ancestors[min(levels_up, len(ancestors) - 1)]


PROJECT_ROOT = _resolve_project_root(Path(__file__))
DEFAULT_SOURCE = Path("/home/moltbot/.openclaw/cron/jobs.json")
DEFAULT_TARGET = PROJECT_ROOT / "cron" / "jobs.json"
DEFAULT_CHANNEL = "echo-work"

BUCHAREST = ZoneInfo("Europe/Bucharest")
UTC = ZoneInfo("UTC")

# schedule.tz values that are accepted as "UTC" without a warning.
_UTC_TZ_NAMES: tuple[str, ...] = ("UTC", "Etc/UTC")

# Jobs to skip by default. Anti-foot-gun list for known-dead/bad openclaw jobs.
# Can be extended at invocation time via --skip.
SKIP_BY_DEFAULT: set[str] = {
    "night-execute",  # SSH to LXC, dead infra
    "antfarm/feature-dev/planner",
    "antfarm/feature-dev/setup",
    "antfarm/feature-dev/developer",
    "antfarm/feature-dev/verifier",
    "antfarm/feature-dev/tester",
    "antfarm/feature-dev/reviewer",
}

# YouTube:* one-off pinned prompts — always auto-skipped regardless of flags.
YOUTUBE_PREFIX = "YouTube:"

# Path rewrites applied to prompt bodies. Each pattern is a compiled regex;
# the replacement is a literal string. Order matters — longer/more-specific
# patterns first so the shorter ones don't eat them prematurely.
#
# We use lookahead/boundary tricks so that `clawd-archive`, `clawdbot`,
# `clawd.old`, etc. are NOT matched. `clawd` must be immediately followed
# by `/` (path boundary) or `$` / whitespace (end-of-token).
#
# NOTE(review): the second and third patterns were reconstructed from the
# contract described above; confirm them against the original commit.
PATH_REWRITES: list[tuple[re.Pattern[str], str]] = [
    # Absolute path: /home/moltbot/clawd/... -> /home/moltbot/echo-core/...
    (re.compile(r"/home/moltbot/clawd(?=/)"), "/home/moltbot/echo-core"),
    # Shell form: cd ~/clawd -> cd ~/echo-core (allow trailing &, ; or space)
    (re.compile(r"(?<![\w/.-])cd ~/clawd(?=[/\s&;]|$)"), "cd ~/echo-core"),
    # Bare absolute cd: cd /home/moltbot/clawd -> cd /home/moltbot/echo-core
    (
        re.compile(r"(?<![\w/.-])cd /home/moltbot/clawd(?=[\s&;]|$)"),
        "cd /home/moltbot/echo-core",
    ),
]


# ---------------------------------------------------------------------------
# Translation
# ---------------------------------------------------------------------------


def _parse_cron_field(field: str) -> list[int] | None:
    """Return the sorted list of ints the field expands to, or None if uncertain.

    Handles: "N", "N,M", "N-M", "A-B/S", "N,M-P/S".
    Returns None for anything else, so the caller can warn and leave the job
    for manual review. That includes:
      * "*" and "*/S" — the valid range depends on which field it is, so the
        caller has to deal with those;
      * "N/S" — its meaning differs between cron implementations;
      * a zero or negative step, a reversed range, or non-numeric text.
    """
    values: set[int] = set()
    for part in field.split(","):
        base, has_step, step_text = part.partition("/")
        if base == "*":
            return None  # caller handles

        step = 1
        if has_step:
            if "-" not in base:
                return None  # "N/S" is ambiguous; do not guess
            try:
                step = int(step_text)
            except ValueError:
                return None
            if step <= 0:
                return None  # range() would raise on 0 and be empty on < 0

        lo_text, has_range, hi_text = base.partition("-")
        try:
            lo = int(lo_text)
            hi = int(hi_text) if has_range else lo
        except ValueError:
            return None
        if lo > hi:
            return None
        values.update(range(lo, hi + 1, step))
    return sorted(values)


def _convert_hour_field(hour_field: str, day_shift_from_utc: int) -> tuple[str, bool]:
    """Convert a UTC hour field to a Bucharest-local hour field.

    Args:
        hour_field: The hour column of a 5-field cron expression.
        day_shift_from_utc: Despite the name, the number of HOURS to add to a
            UTC hour to get local time (+2 or +3 for Bucharest).

    Returns:
        (converted_field, approx_ok). approx_ok is False when the field could
        not be translated confidently: it is not a plain list/range, it holds
        hours outside 0-23, or at least one hour wrapped past midnight (which
        would also require changing day-of-week / day-of-month, something
        this script does not attempt). On a parse failure the field is
        returned unchanged. The caller should warn when approx_ok is False.
    """
    if hour_field == "*":
        return "*", True

    hours = _parse_cron_field(hour_field)
    if not hours:
        return hour_field, False
    if hours[0] < 0 or hours[-1] > 23:
        return hour_field, False  # not a valid hour; do not "wrap" garbage

    crossed_midnight = False
    local_hours: set[int] = set()
    for hour in hours:
        day_delta, local_hour = divmod(hour + day_shift_from_utc, 24)
        if day_delta:
            crossed_midnight = True
        local_hours.add(local_hour)

    # Always emit a comma-separated list rather than re-compressing into a
    # range/step form; APScheduler accepts that.
    return ",".join(str(h) for h in sorted(local_hours)), not crossed_midnight


def convert_cron_utc_to_bucharest(
    expr: str,
    src_tz: str | None,
    reference_dt: datetime | None = None,
) -> tuple[str, list[str]]:
    """Translate a cron expression from src_tz to Europe/Bucharest.

    If `src_tz == 'Europe/Bucharest'` the expression is returned unchanged.
    Otherwise the source is assumed to be UTC (OpenClaw's default runtime) and
    the hour field is shifted by the UTC->Bucharest offset. A `src_tz` that is
    set but is neither UTC nor Europe/Bucharest is still treated as UTC, and a
    warning is added.

    Args:
        expr: 5-field cron expression.
        src_tz: Value of schedule.tz, or None when unset.
        reference_dt: Instant at which the UTC offset is evaluated (default:
            now). A naive datetime is taken to be UTC; an aware one is
            converted, not relabelled.

    Returns:
        (new_expr, warnings). warnings is a list of human-readable notes; if
        non-empty, the caller should flag the job for manual review.

    DST caveat: the offset is fixed at `reference_dt`, so a converted job runs
    one hour off relative to its original UTC time on the other side of a DST
    transition. No warning is emitted for that; it applies to every converted
    job.
    """
    warnings: list[str] = []
    if src_tz == "Europe/Bucharest":
        return expr, warnings

    if src_tz and src_tz not in _UTC_TZ_NAMES:
        warnings.append(
            f"schedule.tz={src_tz!r} is neither UTC nor Europe/Bucharest — "
            "treated as UTC, verify manually"
        )

    fields = expr.split()
    if len(fields) != 5:
        warnings.append(f"cron expr does not have 5 fields: {expr!r}")
        return expr, warnings

    minute, hour, dom, month, dow = fields

    ref = reference_dt if reference_dt is not None else datetime.now(UTC)
    if ref.tzinfo is None:
        ref = ref.replace(tzinfo=UTC)
    # offset for "what is UTC hour X in Bucharest?" —
    # +7200 s in winter, +10800 s in summer.
    offset = ref.astimezone(BUCHAREST).utcoffset()
    shift_hours = int(offset.total_seconds()) // 3600

    new_hour, ok = _convert_hour_field(hour, shift_hours)
    if not ok:
        warnings.append(
            f"hour field {hour!r} crosses day boundary or is complex — "
            "verify day-of-week/day-of-month manually"
        )

    return f"{minute} {new_hour} {dom} {month} {dow}", warnings


def rewrite_prompt_paths(text: str) -> tuple[str, list[tuple[str, str]]]:
    """Apply PATH_REWRITES to a prompt body.

    Returns (new_text, substitutions) where substitutions is a list of
    (old_snippet, new_snippet) tuples, one per rewrite performed, in the order
    the patterns were applied.
    """
    substitutions: list[tuple[str, str]] = []
    rewritten = text
    for pattern, replacement in PATH_REWRITES:
        # A callable keeps the replacement literal (no backslash/group
        # expansion); the default argument pins this iteration's value.
        def _record(match: re.Match[str], _new: str = replacement) -> str:
            substitutions.append((match.group(0), _new))
            return _new

        rewritten = pattern.sub(_record, rewritten)
    return rewritten, substitutions


def _job_name(oc_job: dict) -> str:
    """Return the display name of an openclaw job: name, else id, else ''."""
    return str(oc_job.get("name") or oc_job.get("id") or "")


def translate_job(
    oc_job: dict,
    default_channel: str,
    reference_dt: datetime | None = None,
) -> tuple[dict | None, list[str]]:
    """Translate one openclaw job dict to an echo-core job dict.

    Args:
        oc_job: Job entry from the openclaw jobs.json.
        default_channel: Channel assigned to the translated job.
        reference_dt: Passed through to convert_cron_utc_to_bucharest.

    Returns:
        (echo_job, warnings). echo_job is None if the job cannot be translated:
        non-cron schedule, missing/empty schedule.expr, or a payload.message
        that is not a string. warnings also records every path rewrite applied
        to the prompt.
    """
    warnings: list[str] = []
    name = _job_name(oc_job)

    sched = oc_job.get("schedule")
    if not isinstance(sched, dict):
        sched = {}
    if sched.get("kind") != "cron":
        warnings.append(
            f"job {name!r}: schedule.kind={sched.get('kind')!r} "
            "is not 'cron' — skipping (manual review)"
        )
        return None, warnings

    expr = sched.get("expr")
    if not isinstance(expr, str) or not expr.strip():
        warnings.append(f"job {name!r}: missing/empty schedule.expr — skipping")
        return None, warnings

    payload = oc_job.get("payload")
    if not isinstance(payload, dict):
        payload = {}
    prompt = payload.get("message") or ""
    if not isinstance(prompt, str):
        warnings.append(
            f"job {name!r}: payload.message is not a string — "
            "skipping (manual review)"
        )
        return None, warnings

    new_expr, tz_warnings = convert_cron_utc_to_bucharest(
        expr, sched.get("tz"), reference_dt=reference_dt
    )
    warnings.extend(f"job {name!r}: {w}" for w in tz_warnings)

    new_prompt, subs = rewrite_prompt_paths(prompt)
    warnings.extend(f"job {name!r}: rewrote {old!r} -> {new!r}" for old, new in subs)

    # openclaw doesn't track allowedTools in the same way; start with [].
    allowed = oc_job.get("allowedTools") or payload.get("allowedTools") or []
    if not isinstance(allowed, list):
        allowed = []

    echo_job = {
        "name": name,
        "cron": new_expr,
        "channel": default_channel,
        "model": payload.get("model") or "sonnet",
        "prompt": new_prompt,
        "allowed_tools": list(allowed),
        "enabled": bool(oc_job.get("enabled", False)),
        "last_run": None,
        "last_status": None,
        "next_run": None,
    }
    return echo_job, warnings


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def load_json(path: Path) -> object:
    """Read and parse the UTF-8 JSON file at *path*."""
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def _is_skipped(name: str, skip_set: set[str], include_default_skip: bool) -> bool:
    """Return True if the job called *name* must not be imported.

    YouTube-prefixed names are always skipped; SKIP_BY_DEFAULT applies only
    when include_default_skip is set; skip_set holds the names from --skip.
    """
    if name.startswith(YOUTUBE_PREFIX):
        return True
    if include_default_skip and name in SKIP_BY_DEFAULT:
        return True
    return name in skip_set


def _doc_summary() -> str:
    """Return the first non-blank line of the module docstring ('' if none)."""
    for line in (__doc__ or "").splitlines():
        if line.strip():
            return line.strip()
    return ""


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the migration script."""
    p = argparse.ArgumentParser(description=_doc_summary())
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--skip-disabled", action="store_true")
    p.add_argument("--skip", default="",
                   help="Comma-separated list of additional names to skip.")
    p.add_argument("--no-default-skip", action="store_true",
                   help="Disable the built-in SKIP_BY_DEFAULT list.")
    p.add_argument("--channel", default=DEFAULT_CHANNEL,
                   help=f"Default channel for imported jobs (default: {DEFAULT_CHANNEL}).")
    p.add_argument("--source", default=str(DEFAULT_SOURCE))
    p.add_argument("--target", default=str(DEFAULT_TARGET))
    return p


def _plan_imports(
    oc_jobs: list,
    known_names: set,
    *,
    skip_set: set[str],
    include_default_skip: bool,
    skip_disabled: bool,
    channel: str,
    reference_dt: datetime,
) -> tuple[list[dict], list[str]]:
    """Decide, per source job, whether it is added, skipped or a duplicate.

    Returns (jobs_to_add, summary_lines). *known_names* is copied, not mutated.
    """
    seen = set(known_names)
    to_add: list[dict] = []
    summary: list[str] = []

    for index, oc_job in enumerate(oc_jobs):
        if not isinstance(oc_job, dict):
            summary.append(f" WARN source entry #{index} is not a JSON object")
            summary.append(f" SKIP {'#' + str(index):40s} (untranslatable)")
            continue

        name = _job_name(oc_job)

        if _is_skipped(name, skip_set, include_default_skip):
            summary.append(f" SKIP {name:40s} (skip list)")
            continue

        if skip_disabled and not oc_job.get("enabled", False):
            summary.append(f" SKIP {name:40s} (disabled, --skip-disabled)")
            continue

        echo_job, warnings = translate_job(oc_job, channel, reference_dt=reference_dt)

        if echo_job is None:
            summary.extend(f" WARN {w}" for w in warnings)
            summary.append(f" SKIP {name:40s} (untranslatable)")
            continue

        if echo_job["name"] in seen:
            summary.append(
                f" DUPE {name:40s} (already in target — existing entry preserved)"
            )
            continue

        summary.extend(f" WARN {w}" for w in warnings)
        summary.append(
            f" ADD {name:40s} cron={echo_job['cron']!r:18s} "
            f"enabled={echo_job['enabled']} model={echo_job['model']}"
        )
        to_add.append(echo_job)
        # Remember the name so a later source job with the same name is
        # reported as a duplicate instead of being imported twice.
        seen.add(echo_job["name"])

    return to_add, summary


def _write_jobs_atomically(target: Path, jobs: list) -> None:
    """Write *jobs* as JSON to *target* via a sibling .tmp file and a rename."""
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp = target.with_suffix(target.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(jobs, f, indent=2, ensure_ascii=False)
            f.write("\n")
        tmp.replace(target)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        tmp.unlink(missing_ok=True)


def run(argv: list[str] | None = None) -> int:
    """Run the migration.

    Args:
        argv: Command-line arguments (default: sys.argv[1:]).

    Returns:
        Process exit code: 0 on success (including dry-run and "nothing to
        write"), 2 when the source or target file is missing, unreadable or
        has an unexpected shape.
    """
    args = _build_parser().parse_args(argv)

    source = Path(args.source)
    target = Path(args.target)
    extra_skip = {s.strip() for s in args.skip.split(",") if s.strip()}

    if not source.exists():
        print(f"ERROR: source not found: {source}", file=sys.stderr)
        return 2

    try:
        oc_data = load_json(source)
    except (OSError, ValueError) as exc:
        print(f"ERROR: cannot read source {source}: {exc}", file=sys.stderr)
        return 2
    if not isinstance(oc_data, dict) or "jobs" not in oc_data:
        print(f"ERROR: source {source} is not a dict with 'jobs' key",
              file=sys.stderr)
        return 2
    oc_jobs = oc_data["jobs"]
    if not isinstance(oc_jobs, list):
        print(f"ERROR: source {source}: 'jobs' is not a JSON list",
              file=sys.stderr)
        return 2

    # load target (may not exist yet)
    if target.exists():
        try:
            target_jobs = load_json(target)
        except (OSError, ValueError) as exc:
            print(f"ERROR: cannot read target {target}: {exc}", file=sys.stderr)
            return 2
        if not isinstance(target_jobs, list):
            print(f"ERROR: target {target} is not a JSON list", file=sys.stderr)
            return 2
    else:
        target_jobs = []

    existing_names = {j.get("name") for j in target_jobs if isinstance(j, dict)}

    to_add, summary_lines = _plan_imports(
        oc_jobs,
        existing_names,
        skip_set=extra_skip,
        include_default_skip=not args.no_default_skip,
        skip_disabled=args.skip_disabled,
        channel=args.channel,
        reference_dt=datetime.now(UTC),
    )

    # Print summary
    print(f"Source: {source}")
    print(f"Target: {target}")
    print(f"Dry-run: {args.dry_run}")
    print(f"Default channel for imports: {args.channel}")
    print(f"Existing target jobs: {len(target_jobs)}")
    print(f"Source jobs: {len(oc_jobs)}")
    print()
    print("Per-job decisions:")
    for line in summary_lines:
        print(line)
    print()
    print(f"Would add {len(to_add)} new job(s) to target.")

    if args.dry_run:
        print("[DRY-RUN] no changes written.")
        return 0

    if not to_add:
        print("Nothing to write.")
        return 0

    target_jobs.extend(to_add)
    _write_jobs_atomically(target, target_jobs)
    print(f"Wrote {len(target_jobs)} jobs to {target}")
    return 0


if __name__ == "__main__":
    sys.exit(run())