Files
ROMFASTSQL/scripts/convert_to_10g.py
Marius 29e88631ba feat(scripts): add PACK_CONTAFIN Oracle 10g converter
Script Python + bat care converteste automat FORALL/BULK_ROWCOUNT
din PACK_CONTAFIN.pck in FOR LOOP compatibil Oracle 10g.
Include pre/post validare, scriere atomica si diff afisare.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 15:19:36 +02:00

365 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Convert PACK_CONTAFIN.pck from Oracle 21c (FORALL/BULK) to Oracle 10g compatible syntax.
Replaces 3 FORALL blocks in SCRIE_JC_2007 with equivalent FOR LOOPs:
T1: FORALL UPDATE -> FOR LOOP UPDATE
T2: FOR + SQL%BULK_ROWCOUNT -> inline SQL%ROWCOUNT (folds into T1's loop)
T3: FORALL INSERT -> FOR LOOP INSERT
T4: add END LOOP after last INSERT value
T5: FORALL DELETE -> FOR LOOP DELETE
T6: add END LOOP after last DELETE condition
Exit codes: 0=ok, 1=validation failed, 2=IO error
---------------------------------------------------------------------------
UTILIZARE / USAGE
---------------------------------------------------------------------------
Linux / WSL:
python3 scripts/convert_to_10g.py # conversie normala
python3 scripts/convert_to_10g.py --dry-run # doar arata diff, nu scrie
python3 scripts/convert_to_10g.py --no-backup # fara fisier .bak
python3 scripts/convert_to_10g.py --input alt.pck --output out.pck
Windows (din radacina proiectului):
scripts\\convert_to_10g.bat
scripts\\convert_to_10g.bat --dry-run
scripts\\convert_to_10g.bat --input alt.pck --output out.pck
Argumente optionale:
--dry-run Afiseaza diff-ul fara sa scrie nimic pe disk
--no-backup Nu crea fisierul <output>.bak inainte de scriere
--input FILE Fisier sursa (default: input/PACK_CONTAFIN.pck)
--output FILE Fisier destinatie (default: input/PACK_CONTAFIN_ORACLE_10G.pck)
---------------------------------------------------------------------------
"""
import argparse
import difflib
import os
import sys
import tempfile
# ---------------------------------------------------------------------------
# Transformations (exact string, \n normalized — CRLF handled at I/O layer)
# ---------------------------------------------------------------------------
# Ordered table of exact-text rewrites.
#
# Keys of every entry:
#   id              short label printed in the pre-validation report
#   desc            human-readable summary printed next to the id
#   old             exact text searched for in the LF-normalized input
#   new             text that replaces the first occurrence of "old"
#   expected_count  number of times "old" must occur in the input for
#                   pre-validation to pass
#
# Matching is a plain substring comparison, so every character of "old"
# (including whitespace inside the literals) has to match the package source.
TRANSFORMS = [
    {
        # Turns the FORALL header of the UPDATE into a FOR ... LOOP header.
        "id": "T1",
        "desc": "FORALL UPDATE -> FOR LOOP UPDATE",
        "old": (
            " -- BULK UPDATE all rows at once using FORALL\n"
            " FORALL i IN 1 .. S.COUNT\n"
            " UPDATE JC2007 J"
        ),
        "new": (
            " -- UPDATE all rows using FOR loop (Oracle 10g compatible)\n"
            " FOR i IN 1 .. S.COUNT LOOP\n"
            " UPDATE JC2007 J"
        ),
        "expected_count": 1,
    },
    {
        # Drops the second "FOR i IN 1 .. S.COUNT LOOP" header and swaps
        # SQL%BULK_ROWCOUNT(i) for SQL%ROWCOUNT, so the check follows the
        # UPDATE inside the loop opened by T1.
        "id": "T2",
        "desc": "FOR + SQL%BULK_ROWCOUNT -> inline SQL%ROWCOUNT",
        "old": (
            " AND NVL(J.ID_SUCURSALA, -99) = NVL(S(i).ID_SUCURSALA, -99);\n"
            "\n"
            " -- Populate SI and SD collections for INSERT and DELETE\n"
            " FOR i IN 1 .. S.COUNT LOOP\n"
            " IF SQL%BULK_ROWCOUNT(i) = 0 THEN"
        ),
        "new": (
            " AND NVL(J.ID_SUCURSALA, -99) = NVL(S(i).ID_SUCURSALA, -99);\n"
            "\n"
            " -- Check result of UPDATE and populate SI/SD collections\n"
            " IF SQL%ROWCOUNT = 0 THEN"
        ),
        "expected_count": 1,
    },
    {
        # Turns the FORALL header of the INSERT into a FOR ... LOOP header.
        "id": "T3",
        "desc": "FORALL INSERT -> FOR LOOP INSERT",
        "old": (
            " -- BULK INSERT for new rows\n"
            " IF SI.COUNT > 0 THEN\n"
            " FORALL i IN 1 .. SI.COUNT\n"
            " INSERT INTO JC2007"
        ),
        "new": (
            " -- INSERT for new rows (Oracle 10g compatible)\n"
            " IF SI.COUNT > 0 THEN\n"
            " FOR i IN 1 .. SI.COUNT LOOP\n"
            " INSERT INTO JC2007"
        ),
        "expected_count": 1,
    },
    {
        # Closes the loop opened by T3 and rewords the DELETE section comment.
        "id": "T4",
        "desc": "Add END LOOP after last INSERT value",
        "old": (
            " SI(i).ID_SUCURSALA);\n"
            " END IF;\n"
            "\n"
            " -- BULK DELETE for rows where all values became 0 after UPDATE"
        ),
        "new": (
            " SI(i).ID_SUCURSALA);\n"
            " END LOOP;\n"
            " END IF;\n"
            "\n"
            " -- DELETE for rows where all values became 0 after UPDATE (Oracle 10g compatible)"
        ),
        "expected_count": 1,
    },
    {
        # Turns the FORALL header of the DELETE into a FOR ... LOOP header.
        "id": "T5",
        "desc": "FORALL DELETE -> FOR LOOP DELETE",
        "old": (
            " FORALL i IN 1 .. SD.COUNT\n"
            " DELETE FROM JC2007 J"
        ),
        "new": (
            " FOR i IN 1 .. SD.COUNT LOOP\n"
            " DELETE FROM JC2007 J"
        ),
        "expected_count": 1,
    },
    {
        # Closes the loop opened by T5.
        "id": "T6",
        "desc": "Add END LOOP after last DELETE condition",
        "old": (
            " AND J.ROTN100 = 0;\n"
            " END IF;"
        ),
        "new": (
            " AND J.ROTN100 = 0;\n"
            " END LOOP;\n"
            " END IF;"
        ),
        "expected_count": 1,
    },
]
# ---------------------------------------------------------------------------
# I/O helpers
# ---------------------------------------------------------------------------
def load_file(path):
    """Read *path* and return its text with line endings normalized to LF.

    The file is read once as bytes and decoded with the first candidate
    encoding that succeeds: utf-8-sig, then utf-8, then cp1250.

    Args:
        path: Location of the file to read.

    Returns:
        A tuple ``(text, used_crlf, encoding)``: ``text`` contains only LF
        line endings, ``used_crlf`` is True when the raw bytes contained a
        CR LF pair, and ``encoding`` names the codec that decoded the file.

    Raises:
        OSError: If the file cannot be opened or read (for example
            FileNotFoundError). The caller is expected to handle it.

    Exits the process with status 2 when no candidate encoding can decode
    the file.
    """
    # Read once up front: the bytes do not change between decode attempts.
    with open(path, "rb") as fh:
        raw = fh.read()
    used_crlf = b"\r\n" in raw

    # utf-8-sig also decodes input that has no BOM, so it is reported for
    # any valid UTF-8 file; the order is kept so the reported name is stable.
    for enc in ("utf-8-sig", "utf-8", "cp1250"):
        try:
            text = raw.decode(enc)
        except (UnicodeDecodeError, LookupError):
            continue
        # Normalize to LF for processing (CR LF first, then lone CR).
        return text.replace("\r\n", "\n").replace("\r", "\n"), used_crlf, enc

    print(f"ERROR: Cannot decode {path} with utf-8-sig / utf-8 / cp1250", file=sys.stderr)
    sys.exit(2)
def write_atomic(path, text, used_crlf, encoding, no_backup):
    """Write *text* to *path* through a temp file, optionally keeping a .bak.

    Args:
        path: Destination file.
        text: Content using LF line endings.
        used_crlf: When True, every LF is expanded to CR LF before encoding.
        encoding: Codec used to encode the text; "utf-8-sig" is written as
            plain "utf-8", so no BOM is emitted.
        no_backup: When False and *path* already exists, it is first copied
            to ``path + ".bak"``.

    A failed backup only produces a warning. Any OSError while creating,
    writing or renaming the temp file prints an error and exits the process
    with status 2.
    """
    import shutil

    if used_crlf:
        text = text.replace("\n", "\r\n")
    encoded = text.encode(encoding if encoding != "utf-8-sig" else "utf-8")

    # Backup existing output (best effort: a failure must not block the write)
    if not no_backup and os.path.exists(path):
        bak = path + ".bak"
        try:
            shutil.copy2(path, bak)
            print(f" Backup: {bak}")
        except OSError as exc:
            print(f"WARNING: Could not create backup: {exc}", file=sys.stderr)

    # Atomic write via temp file in same directory, so os.replace never has
    # to move data across filesystems.
    dir_ = os.path.dirname(os.path.abspath(path))
    tmp = None
    try:
        # mkstemp is inside the try so that a missing or unwritable target
        # directory is reported as an IO error instead of a traceback.
        fd, tmp = tempfile.mkstemp(dir=dir_, suffix=".tmp")
        with os.fdopen(fd, "wb") as fh:
            fh.write(encoded)
        os.replace(tmp, path)
    except OSError as exc:
        if tmp is not None:
            try:
                os.unlink(tmp)
            except OSError:
                pass
        print(f"ERROR writing {path}: {exc}", file=sys.stderr)
        sys.exit(2)
# ---------------------------------------------------------------------------
# Validation phases
# ---------------------------------------------------------------------------
def count_forall(text):
    """Count lines whose first token is the FORALL keyword.

    The comparison is case-insensitive and accepts any whitespace (space,
    tab, or end of line) after the keyword. Lines that merely mention FORALL
    later on, such as ``-- ...`` comment lines, are not counted because
    their first token is something else.

    Args:
        text: Source text to scan.

    Returns:
        The number of lines that start with a FORALL statement.
    """
    count = 0
    for line in text.splitlines():
        # split(None, 1) skips leading whitespace and isolates the first word
        # whatever separator follows it.
        words = line.split(None, 1)
        if words and words[0].upper() == "FORALL":
            count += 1
    return count
def phase1_validate(text):
    """Check that every transform's search text occurs the expected number of times.

    One OK or FAIL line is printed per entry of TRANSFORMS.

    Args:
        text: LF-normalized input text.

    Returns:
        True when all entries match their expected count, False otherwise.
    """
    all_matched = True
    for spec in TRANSFORMS:
        expected = spec["expected_count"]
        actual = text.count(spec["old"])
        if actual == expected:
            print(f" OK [{spec['id']}] {spec['desc']}: found {actual}x")
            continue
        print(
            f" FAIL [{spec['id']}] {spec['desc']}: "
            f"expected {expected} occurrence(s), found {actual}"
        )
        all_matched = False
    return all_matched
def phase3_validate(text):
    """Confirm that the converted text contains no FORALL statements.

    Prints an OK line on success or a FAIL line with the leftover count.

    Args:
        text: Converted text to check.

    Returns:
        True when count_forall reports zero, False otherwise.
    """
    leftover = count_forall(text)
    if leftover == 0:
        print(" OK No FORALL statements in output.")
        return True
    print(f" FAIL: {leftover} FORALL statement(s) remain in output!")
    return False
# ---------------------------------------------------------------------------
# Transforms
# ---------------------------------------------------------------------------
def apply_transforms(text):
    """Run every entry of TRANSFORMS over *text*, in list order.

    Each entry replaces only the first occurrence of its "old" text with its
    "new" text; later entries see the result of earlier ones.

    Args:
        text: LF-normalized input text.

    Returns:
        The text after all replacements.
    """
    result = text
    for spec in TRANSFORMS:
        search, replacement = spec["old"], spec["new"]
        result = result.replace(search, replacement, 1)
    return result
# ---------------------------------------------------------------------------
# Diff display
# ---------------------------------------------------------------------------
def show_diff(original, converted, src_label, dst_label):
    """Print a unified diff (3 lines of context) between two texts.

    Args:
        original: Text before conversion.
        converted: Text after conversion.
        src_label: Name shown for the "from" side of the diff.
        dst_label: Name shown for the "to" side of the diff.

    When the texts are identical a single "(no differences)" line is printed.
    Otherwise the number of diff lines is printed, followed by a blank line,
    the diff itself, and a trailing blank line.
    """
    before = original.splitlines(keepends=True)
    after = converted.splitlines(keepends=True)
    delta = list(
        difflib.unified_diff(before, after, fromfile=src_label, tofile=dst_label, n=3)
    )
    if delta:
        print(f" {len(delta)} diff lines")
        print()
        # Diff lines carry their own line terminators, so write them verbatim.
        sys.stdout.writelines(delta)
        print()
    else:
        print(" (no differences)")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def parse_args():
    """Parse the command-line options of the converter.

    Options:
        --input FILE   Source file; defaults to ``input/PACK_CONTAFIN.pck``
                       under the parent of this script's directory.
        --output FILE  Destination file; None when not given (the caller
                       picks the default).
        --dry-run      Boolean flag, stored as ``dry_run``.
        --no-backup    Boolean flag, stored as ``no_backup``.

    Returns:
        The argparse.Namespace produced from sys.argv.
    """
    p = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring is laid out line by line (usage examples,
        # option table); the raw formatter keeps that layout in --help
        # instead of re-wrapping it into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument(
        "--input",
        default=os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "input",
            "PACK_CONTAFIN.pck",
        ),
        help="Source file (default: input/PACK_CONTAFIN.pck)",
    )
    p.add_argument(
        "--output",
        default=None,
        help="Output file (default: input/PACK_CONTAFIN_ORACLE_10G.pck)",
    )
    p.add_argument("--dry-run", action="store_true", help="Show diff only, do not write")
    p.add_argument("--no-backup", action="store_true", help="Skip .bak creation")
    return p.parse_args()
def main():
    """Run the conversion end to end and exit with the matching status code.

    Steps: parse arguments, load the input, stop early if it has no FORALL
    statements, pre-validate the search patterns, apply the transforms,
    post-validate the result, print the diff, and (unless --dry-run) write
    the output file.

    Exit codes:
        0: Converted, dry-run finished, or input already converted.
        1: Pre- or post-validation failed; nothing is written.
        2: The input could not be read (the I/O helpers also exit with 2
           on decode or write failures).
    """
    args = parse_args()
    input_path = args.input
    # Without --output the result is written next to the input file.
    output_path = args.output or os.path.join(
        os.path.dirname(input_path), "PACK_CONTAFIN_ORACLE_10G.pck"
    )
    print(f"Input: {input_path}")
    print(f"Output: {output_path}")
    print()

    # Load
    try:
        original, used_crlf, encoding = load_file(input_path)
    except FileNotFoundError:
        print(f"ERROR: Input file not found: {input_path}", file=sys.stderr)
        sys.exit(2)
    except OSError as exc:
        # Permission errors, directories passed as --input, etc. are IO
        # errors too and must map to exit code 2 rather than a traceback.
        print(f"ERROR: Cannot read {input_path}: {exc}", file=sys.stderr)
        sys.exit(2)

    forall_count = count_forall(original)
    print(f"Input FORALL statements: {forall_count} (encoding={encoding}, crlf={used_crlf})")
    print()

    # Idempotency check
    if forall_count == 0:
        print("Already converted (0 FORALL statements). Nothing to do.")
        sys.exit(0)

    # Phase 1 — pre-validate
    print("Phase 1: Pre-validate patterns")
    if not phase1_validate(original):
        print()
        print("ERROR: One or more patterns not found. Aborting without changes.")
        sys.exit(1)
    print()

    # Apply transforms
    converted = apply_transforms(original)

    # Phase 3 — post-validate
    print("Phase 3: Post-validate output")
    if not phase3_validate(converted):
        print()
        print("ERROR: FORALL statements remain after conversion. Aborting.")
        sys.exit(1)
    print()

    # Show diff
    print("Diff:")
    show_diff(original, converted, input_path, output_path)
    if args.dry_run:
        print("Dry-run mode: no files written.")
        sys.exit(0)

    # Write
    write_atomic(output_path, converted, used_crlf, encoding, args.no_backup)
    print(f"Written: {output_path}")
    print("Done.")
    sys.exit(0)
# Run the converter only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()