feat(migrations): add one-shot import_openclaw_jobs_2026-04 script

Audit-trail tool that translates OpenClaw's nested jobs.json schema
(schedule.expr with optional tz, payload.message, agentId, state) into
echo-core's flat schema. UTC -> Europe/Bucharest cron conversion with
DST-aware offset; Bucharest-tagged source expressions pass through
unchanged. Rewrites `cd ~/clawd` / `/home/moltbot/clawd/` -> echo-core
without matching `clawd-archive` or `clawdbot` substrings.

Built-in skip list covers night-execute and antfarm/feature-dev/*; YouTube:
prefix is auto-skipped. --dry-run, --skip-disabled, --skip, --channel,
--source, --target flags. Duplicate job names in target are skipped with
a warning; existing entries are preserved.
This commit is contained in:
2026-04-21 07:13:50 +00:00
parent 67d10c4c9a
commit 5f87545b66

View File

@@ -0,0 +1,458 @@
#!/usr/bin/env python3
"""
One-shot migration: translate OpenClaw cron/jobs.json to echo-core schema.
Dated: 2026-04
Status: ONE-SHOT tool. Kept in git as an audit artifact for the consolidation.
Restore path: if this needs to be re-run, the original OpenClaw file is at
/home/moltbot/.openclaw/cron/jobs.json
/home/moltbot/.openclaw/cron/jobs.json.bak
and the pre-migration echo-core jobs.json is recoverable from git history
(commit preceding `feat(cron): populate jobs.json with decomposed ...`).
OpenClaw schema (nested):
{
"id": "<uuid>",
"agentId": "echo",
"name": "<name>",
"enabled": bool,
"schedule": {"kind": "cron", "expr": "0 6 * * *", "tz": "Europe/Bucharest"?},
"sessionTarget": "isolated",
"payload": {"kind": "agentTurn", "message": "<prompt>", "model": "sonnet"?},
"state": {...},
...
}
Echo-core schema (flat, Claude job):
{
"name": "<name>",
"cron": "<expr, Bucharest local>",
"channel": "<channel name>",
"model": "sonnet",
"prompt": "<prompt, path-rewritten>",
"allowed_tools": [],
"enabled": bool,
"last_run": null, "last_status": null, "next_run": null
}
Echo-core scheduler interprets cron expressions in Europe/Bucharest. OpenClaw
used UTC by default (per its runtime) unless schedule.tz is set explicitly.
This script converts UTC -> Europe/Bucharest for jobs without an explicit tz.
Usage:
python3 tools/migrations/import_openclaw_jobs_2026-04.py [flags]
Flags:
--dry-run Print what would change without writing.
--skip-disabled Skip jobs where enabled is false (default: import all).
--skip name1,name2,... Comma-separated list of job names to exclude.
--channel <name> Default channel for imported jobs (default: echo-work).
--source <path> Path to openclaw jobs.json.
--target <path> Path to echo-core jobs.json.
The script is idempotent with respect to existing jobs: if a job with the same
name is already present in the target, it is skipped with a warning, and the
existing entry is preserved untouched.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Repo root: this script is expected to live two levels below it
# (tools/migrations/<this file>.py).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# OpenClaw cron store — read-only input for this migration.
DEFAULT_SOURCE = Path("/home/moltbot/.openclaw/cron/jobs.json")
# Echo-core cron store — output; new jobs are appended, existing preserved.
DEFAULT_TARGET = PROJECT_ROOT / "cron" / "jobs.json"
DEFAULT_CHANNEL = "echo-work"
BUCHAREST = ZoneInfo("Europe/Bucharest")
UTC = ZoneInfo("UTC")
# Jobs to skip by default. Anti-foot-gun list for known-dead/bad openclaw jobs.
# Can be extended at invocation time via --skip (or disabled entirely with
# --no-default-skip).
SKIP_BY_DEFAULT: set[str] = {
    "night-execute",  # SSH to LXC, dead infra
    "antfarm/feature-dev/planner",
    "antfarm/feature-dev/setup",
    "antfarm/feature-dev/developer",
    "antfarm/feature-dev/verifier",
    "antfarm/feature-dev/tester",
    "antfarm/feature-dev/reviewer",
}
# YouTube:* one-off pinned prompts — always auto-skipped regardless of flags.
YOUTUBE_PREFIX = "YouTube:"
# Path rewrites applied to prompt bodies. Each pattern is a compiled regex;
# the replacement is a literal string. Order matters — longer/more-specific
# patterns first so the shorter ones don't eat them prematurely.
#
# Boundary rules, so that `clawd-archive`, `clawdbot`, `clawd.old`, etc. are
# NOT matched: the absolute-path pattern only fires when `clawd` is
# immediately followed by `/` (a bare `/home/moltbot/clawd` with nothing
# after it is deliberately left alone); the `cd` patterns fire when `clawd`
# is NOT followed by a word character, `/`, or `-`, so `cd ~/clawd` and
# `cd ~/clawd &&` rewrite while `cd ~/clawd-archive` does not.
PATH_REWRITES: list[tuple[re.Pattern[str], str]] = [
    # Absolute path: /home/moltbot/clawd/... -> /home/moltbot/echo-core/...
    (re.compile(r"/home/moltbot/clawd(?=/)"), "/home/moltbot/echo-core"),
    # Shell form: cd ~/clawd -> cd ~/echo-core (allow trailing & or space)
    (re.compile(r"(?<![\w-])cd\s+~/clawd(?![\w/-])"), "cd ~/echo-core"),
    # Shell form: cd /home/moltbot/clawd -> cd /home/moltbot/echo-core
    (re.compile(r"(?<![\w-])cd\s+/home/moltbot/clawd(?![\w/-])"),
     "cd /home/moltbot/echo-core"),
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_cron_field(field: str) -> list[int] | None:
"""Return sorted list of ints the field expands to, or None if uncertain.
Handles: "*", "N", "N,M", "N-M", "*/S", "A-B/S", "N,M-P/S".
Returns None for anything we don't recognise (caller should warn and leave
the job for manual review).
"""
# Don't attempt to resolve `*` here — caller handles it per-field since
# the valid range depends on which field it is.
result: set[int] = set()
parts = field.split(",")
for part in parts:
if part == "*":
return None # caller handles
step = 1
if "/" in part:
base, step_s = part.split("/", 1)
try:
step = int(step_s)
except ValueError:
return None
else:
base = part
if base == "*":
return None
if "-" in base:
try:
lo_s, hi_s = base.split("-", 1)
lo, hi = int(lo_s), int(hi_s)
except ValueError:
return None
for v in range(lo, hi + 1, step):
result.add(v)
else:
try:
result.add(int(base))
except ValueError:
return None
return sorted(result)
def _convert_hour_field(hour_field: str, day_shift_from_utc: int) -> tuple[str, bool]:
    """Shift a UTC cron hour field into Bucharest-local hours.

    Returns (converted_field, approx_ok). approx_ok is False when the
    translation is not trustworthy: either the field could not be parsed
    into concrete hours, or some hour wrapped past midnight — which would
    also change day-of-week/day-of-month in a way this script does not
    attempt to handle. The caller should warn and flag for manual review.

    "*" passes through untouched (every hour in UTC is every hour locally).
    Concrete hour lists are shifted by `day_shift_from_utc` (UTC+2 or UTC+3
    depending on DST) modulo 24 and emitted as a comma-separated list,
    which APScheduler accepts.
    """
    if hour_field == "*":
        return "*", True

    utc_hours = _parse_cron_field(hour_field)
    if utc_hours is None:
        # Unparseable/complex — hand back unchanged; caller warns.
        return hour_field, False

    wrapped = False
    local_hours: set[int] = set()
    for utc_hour in utc_hours:
        local = utc_hour + day_shift_from_utc
        if not 0 <= local <= 23:
            wrapped = True
            local %= 24
        local_hours.add(local)

    if not local_hours:
        return hour_field, False

    # We emit a plain comma-separated list rather than trying to re-compress
    # into an A-B/S step form; schedulers accept either.
    converted = ",".join(str(h) for h in sorted(local_hours))
    return converted, not wrapped
def convert_cron_utc_to_bucharest(
    expr: str,
    src_tz: str | None,
    reference_dt: datetime | None = None,
) -> tuple[str, list[str]]:
    """Translate a cron expression from src_tz to Europe/Bucharest.

    If `src_tz == 'Europe/Bucharest'` the expression is returned unchanged.
    Otherwise the source is assumed to be UTC (OpenClaw's default runtime)
    and the hour field is shifted by the current UTC->Bucharest offset.
    Any other explicit tz is still treated as UTC but flagged with a
    warning, since a blind UTC shift would be wrong for it.

    Returns (new_expr, warnings). warnings is a list of human-readable
    notes; if non-empty, the caller should flag the job for manual review.

    DST caveat: the offset is evaluated at `reference_dt` (default: now).
    Jobs that span DST transitions may need manual tuning. We emit a
    warning rather than trying to be clever.
    """
    warnings: list[str] = []
    if src_tz == "Europe/Bucharest":
        return expr, warnings
    if src_tz not in (None, "UTC", "Etc/UTC"):
        # Explicit tz that is neither Bucharest nor UTC — the shift below
        # assumes UTC, so a human must check this one.
        warnings.append(
            f"source tz {src_tz!r} is neither UTC nor Europe/Bucharest — "
            "treated as UTC, verify manually"
        )
    fields = expr.split()
    if len(fields) != 5:
        warnings.append(f"cron expr does not have 5 fields: {expr!r}")
        return expr, warnings
    minute, hour, dom, month, dow = fields
    ref = reference_dt or datetime.now(UTC)
    if ref.tzinfo is None:
        # Naive reference datetimes are interpreted as UTC; aware ones are
        # kept as-is (the old unconditional replace() would have silently
        # reinterpreted an aware non-UTC instant).
        ref = ref.replace(tzinfo=UTC)
    # Offset answering "what is UTC hour X in Bucharest?" at the reference
    # instant: +7200 s in winter (EET) or +10800 s in summer (EEST).
    offset_seconds = int(ref.astimezone(BUCHAREST).utcoffset().total_seconds())
    shift_hours = offset_seconds // 3600
    new_hour, ok = _convert_hour_field(hour, shift_hours)
    if not ok:
        warnings.append(
            f"hour field {hour!r} crosses day boundary or is complex — "
            "verify day-of-week/day-of-month manually"
        )
    return f"{minute} {new_hour} {dom} {month} {dow}", warnings
def rewrite_prompt_paths(text: str) -> tuple[str, list[tuple[str, str]]]:
    """Rewrite legacy clawd paths in a prompt body to their echo-core form.

    Returns (new_text, substitutions): `substitutions` records every rewrite
    performed as (old_snippet, new_snippet) pairs, for the audit summary.
    """
    performed: list[tuple[str, str]] = []
    rewritten = text
    for pattern, replacement in PATH_REWRITES:
        # Bind `replacement` as a default to avoid any late-binding surprise.
        def _record(match: re.Match[str], _repl: str = replacement) -> str:
            performed.append((match.group(0), _repl))
            return _repl
        rewritten = pattern.sub(_record, rewritten)
    return rewritten, performed
def translate_job(
    oc_job: dict,
    default_channel: str,
    reference_dt: datetime | None = None,
) -> tuple[dict | None, list[str]]:
    """Build an echo-core job dict from a single openclaw job dict.

    Returns (echo_job, warnings). echo_job is None when the job cannot be
    translated (non-cron schedule kind, missing/empty cron expression).
    """
    warnings: list[str] = []
    name = oc_job.get("name") or oc_job.get("id") or "<unnamed>"

    schedule = oc_job.get("schedule") or {}
    if schedule.get("kind") != "cron":
        warnings.append(
            f"job {name!r}: schedule.kind={schedule.get('kind')!r} "
            "is not 'cron' — skipping (manual review)"
        )
        return None, warnings

    expr = schedule.get("expr")
    if not (isinstance(expr, str) and expr.strip()):
        warnings.append(f"job {name!r}: missing/empty schedule.expr — skipping")
        return None, warnings

    local_expr, tz_notes = convert_cron_utc_to_bucharest(
        expr, schedule.get("tz"), reference_dt=reference_dt
    )
    warnings.extend(f"job {name!r}: {w}" for w in tz_notes)

    payload = oc_job.get("payload") or {}
    rewritten_prompt, subs = rewrite_prompt_paths(payload.get("message") or "")
    warnings.extend(f"job {name!r}: rewrote {old!r} -> {new!r}" for old, new in subs)

    # openclaw doesn't track allowedTools in the same way; start with [].
    allowed = oc_job.get("allowedTools") or payload.get("allowedTools") or []
    if not isinstance(allowed, list):
        allowed = []

    echo_job = {
        "name": name,
        "cron": local_expr,
        "channel": default_channel,
        "model": payload.get("model") or "sonnet",
        "prompt": rewritten_prompt,
        "allowed_tools": list(allowed),
        "enabled": bool(oc_job.get("enabled", False)),
        "last_run": None,
        "last_status": None,
        "next_run": None,
    }
    return echo_job, warnings
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def load_json(path: Path) -> object:
    """Parse the JSON document at *path* (UTF-8) and return it."""
    return json.loads(path.read_text(encoding="utf-8"))
def _is_skipped(name: str, skip_set: set[str], include_default_skip: bool) -> bool:
    """Decide whether a job name is excluded from the import.

    YouTube:* jobs are always skipped. The built-in SKIP_BY_DEFAULT list
    applies only when include_default_skip is true. `skip_set` holds the
    extra names supplied via --skip.
    """
    if name.startswith(YOUTUBE_PREFIX):
        return True
    in_default_list = include_default_skip and name in SKIP_BY_DEFAULT
    return in_default_list or name in skip_set
def run(argv: list[str] | None = None) -> int:
    """CLI entry point: import OpenClaw cron jobs into echo-core's jobs.json.

    Reads the source jobs file, translates each job (honouring the skip
    flags and built-in skip lists), prints a per-job decision summary, and
    — unless --dry-run — appends the new jobs to the target file via an
    atomic tmp-file-then-rename write.

    Returns an exit code: 0 on success (including dry-run / nothing to do),
    2 when the source or target file is missing or malformed.
    """
    # __doc__ starts with a newline, so splitlines()[0] would be "" and
    # --help would show an empty description; strip() first to get the
    # real one-line summary.
    doc_lines = (__doc__ or "").strip().splitlines()
    p = argparse.ArgumentParser(description=doc_lines[0] if doc_lines else None)
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--skip-disabled", action="store_true")
    p.add_argument("--skip", default="",
                   help="Comma-separated list of additional names to skip.")
    p.add_argument("--no-default-skip", action="store_true",
                   help="Disable the built-in SKIP_BY_DEFAULT list.")
    p.add_argument("--channel", default=DEFAULT_CHANNEL,
                   help=f"Default channel for imported jobs (default: {DEFAULT_CHANNEL}).")
    p.add_argument("--source", default=str(DEFAULT_SOURCE))
    p.add_argument("--target", default=str(DEFAULT_TARGET))
    args = p.parse_args(argv)

    source = Path(args.source)
    target = Path(args.target)
    extra_skip = {s.strip() for s in args.skip.split(",") if s.strip()}
    include_default_skip = not args.no_default_skip

    if not source.exists():
        print(f"ERROR: source not found: {source}", file=sys.stderr)
        return 2
    oc_data = load_json(source)
    if not isinstance(oc_data, dict) or "jobs" not in oc_data:
        print(f"ERROR: source {source} is not a dict with 'jobs' key",
              file=sys.stderr)
        return 2

    # Load target (may not exist yet — start from an empty list then).
    if target.exists():
        target_jobs = load_json(target)
        if not isinstance(target_jobs, list):
            print(f"ERROR: target {target} is not a JSON list", file=sys.stderr)
            return 2
    else:
        target_jobs = []
    existing_names = {j.get("name") for j in target_jobs}

    # One DST reference instant for the whole run, so every job is shifted
    # by the same offset.
    ref = datetime.now(UTC)
    to_add: list[dict] = []
    summary_lines: list[str] = []
    for oc_job in oc_data["jobs"]:
        name = oc_job.get("name") or oc_job.get("id") or "<unnamed>"
        if _is_skipped(name, extra_skip, include_default_skip):
            summary_lines.append(f" SKIP {name:40s} (skip list)")
            continue
        if args.skip_disabled and not oc_job.get("enabled", False):
            summary_lines.append(f" SKIP {name:40s} (disabled, --skip-disabled)")
            continue
        echo_job, warnings = translate_job(oc_job, args.channel, reference_dt=ref)
        if echo_job is None:
            for w in warnings:
                summary_lines.append(f" WARN {w}")
            summary_lines.append(f" SKIP {name:40s} (untranslatable)")
            continue
        if echo_job["name"] in existing_names:
            summary_lines.append(
                f" DUPE {name:40s} (already in target — existing entry preserved)"
            )
            continue
        for w in warnings:
            summary_lines.append(f" WARN {w}")
        summary_lines.append(
            f" ADD {name:40s} cron={echo_job['cron']!r:18s} "
            f"enabled={echo_job['enabled']} model={echo_job['model']}"
        )
        to_add.append(echo_job)
        # Record the name so a later SOURCE job with the same name takes the
        # DUPE path instead of being added twice in one run.
        existing_names.add(echo_job["name"])

    # Print summary
    print(f"Source: {source}")
    print(f"Target: {target}")
    print(f"Dry-run: {args.dry_run}")
    print(f"Default channel for imports: {args.channel}")
    print(f"Existing target jobs: {len(target_jobs)}")
    print(f"Source jobs: {len(oc_data['jobs'])}")
    print()
    print("Per-job decisions:")
    for line in summary_lines:
        print(line)
    print()
    print(f"Would add {len(to_add)} new job(s) to target.")

    if args.dry_run:
        print("[DRY-RUN] no changes written.")
        return 0
    if not to_add:
        print("Nothing to write.")
        return 0

    target_jobs.extend(to_add)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Atomic write: dump to a sibling tmp file, then rename over the target,
    # so a crash mid-write cannot leave a truncated jobs.json behind.
    tmp = target.with_suffix(target.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(target_jobs, f, indent=2, ensure_ascii=False)
        f.write("\n")
    tmp.replace(target)
    print(f"Wrote {len(target_jobs)} jobs to {target}")
    return 0
if __name__ == "__main__":
    # Propagate run()'s exit code (0 ok, 2 bad input) to the shell.
    sys.exit(run())