From dcc50425863ad00ee2dd355cf1fe89810651fec3 Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Fri, 26 Jun 2026 09:59:41 +0000 Subject: [PATCH] feat(maintenance): guard DB + log growth (Option B + daily prune + rotation) Root cause of the 2GB prod import.db: the sync_run_orders audit junction recorded every order on every run; under the 1-minute scheduler ~98% of 21.7M rows were no-op ALREADY_IMPORTED re-observations. NSSM stdout/stderr also grew unbounded (rotation never applied to the live service). Changes: - sqlite_service: skip ALREADY_IMPORTED rows in sync_run_orders (write-side guard, _SKIP_JUNCTION_STATUSES); add prune_sync_history(retention_days) with incremental_vacuum. - maintenance_service (new): cleanup_old_logs + run_daily_maintenance. - scheduler_service: start_maintenance_job (daily CronTrigger). - main.py: RotatingFileHandler (sync_comenzi_current.log, 10MB x5) instead of a new timestamped file per start; schedule daily maintenance + one-shot catch-up at startup. - scripts/db_maintenance.py (new): one-shot prune + VACUUM + log cleanup, plain sqlite3, invoked by deploy.ps1 while the service is stopped. - deploy.ps1: stop -> run db_maintenance.py -> (re)apply NSSM AppRotate* idempotently -> start, so rotation reaches pre-existing services. Retention defaults: 7 days history, 7 days logs. Co-Authored-By: Claude Opus 4.8 (1M context) --- api/app/main.py | 20 ++++- api/app/services/maintenance_service.py | 75 +++++++++++++++++ api/app/services/scheduler_service.py | 26 ++++++ api/app/services/sqlite_service.py | 100 +++++++++++++++++++---- deploy.ps1 | 37 ++++++--- scripts/db_maintenance.py | 102 ++++++++++++++++++++++++ 6 files changed, 331 insertions(+), 29 deletions(-) create mode 100644 api/app/services/maintenance_service.py create mode 100644 scripts/db_maintenance.py diff --git a/api/app/main.py b/api/app/main.py index d4ae30e..fb043bc 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -1,9 +1,10 @@ +import asyncio from contextlib import asynccontextmanager -from datetime import datetime from fastapi import FastAPI from fastapi.staticfiles import StaticFiles from pathlib import Path import logging +import logging.handlers import os from .config import settings @@ -19,8 +20,12 @@ _stream_handler.setFormatter(_formatter) _log_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'logs') os.makedirs(_log_dir, exist_ok=True) -_log_filename = f"sync_comenzi_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" -_file_handler = logging.FileHandler(os.path.join(_log_dir, _log_filename), encoding='utf-8') +# Rotating handler (10MB x 5 backups) instead of a new timestamped file per +# start — caps log growth and stops file proliferation across restarts. Fixed +# name still matches the QA glob `sync_comenzi_*.log`. +_file_handler = logging.handlers.RotatingFileHandler( + os.path.join(_log_dir, "sync_comenzi_current.log"), + maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8') _file_handler.setFormatter(_formatter) _root_logger = logging.getLogger() @@ -54,6 +59,15 @@ async def lifespan(app: FastAPI): except Exception: pass + # Daily DB/log maintenance (prune audit history + cleanup old logs) + a + # one-shot catch-up so a long-down service reclaims immediately on start. + try: + from .services import maintenance_service + scheduler_service.start_maintenance_job() + asyncio.create_task(maintenance_service.run_daily_maintenance()) + except Exception as e: + logger.warning(f"Maintenance scheduling failed: {e}") + logger.info("GoMag Import Manager started") yield diff --git a/api/app/services/maintenance_service.py b/api/app/services/maintenance_service.py new file mode 100644 index 0000000..cc03613 --- /dev/null +++ b/api/app/services/maintenance_service.py @@ -0,0 +1,75 @@ +"""Periodic maintenance: prune audit history + clean up old log files. + +Keeps the SQLite DB and the logs/ directory from growing unbounded. The audit +tables (sync_runs, sync_run_orders) were the only DB growth source under the +1-minute scheduler; business tables (orders, order_items) are never touched. + +The one-shot heavy reclaim (full VACUUM, run while the service is stopped) lives +in scripts/db_maintenance.py and is invoked by deploy.ps1. +""" +import logging +import os +import time + +logger = logging.getLogger(__name__) + +DEFAULT_HISTORY_RETENTION_DAYS = 7 +DEFAULT_LOG_RETENTION_DAYS = 7 + + +def _logs_dir() -> str: + """Absolute path to the repo-root logs/ directory (matches main.py).""" + here = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(os.path.abspath(os.path.join(here, "..", "..", "..")), "logs") + + +def cleanup_old_logs(retention_days: int = DEFAULT_LOG_RETENTION_DAYS, + log_dir: str | None = None) -> int: + """Delete log files older than `retention_days`. Returns count removed. + + Targets any file with `.log` in its name (covers `sync_comenzi_current.log`, + NSSM `service_stdout.log`, and rotated backups like `*.log.3`). The live + rotating files stay fresh (recent mtime) so they fall inside the window. + """ + log_dir = log_dir or _logs_dir() + if not os.path.isdir(log_dir): + return 0 + cutoff = time.time() - retention_days * 86400 + removed = 0 + for name in os.listdir(log_dir): + if ".log" not in name: + continue + path = os.path.join(log_dir, name) + try: + if os.path.isfile(path) and os.path.getmtime(path) < cutoff: + os.remove(path) + removed += 1 + except OSError as e: + logger.warning(f"cleanup_old_logs: could not remove {name}: {e}") + if removed: + logger.info(f"cleanup_old_logs: removed {removed} file(s) older than " + f"{retention_days}d from {log_dir}") + return removed + + +async def run_daily_maintenance( + history_days: int = DEFAULT_HISTORY_RETENTION_DAYS, + log_days: int = DEFAULT_LOG_RETENTION_DAYS) -> dict: + """Daily job: prune audit history (+reclaim pages) and clean old log files. + + Each step is isolated — a failure in one does not skip the other. + """ + from . import sqlite_service + + result: dict = {} + try: + result["db"] = await sqlite_service.prune_sync_history(history_days) + except Exception as e: + logger.warning(f"run_daily_maintenance: prune_sync_history failed: {e}") + result["db_error"] = str(e) + try: + result["logs_removed"] = cleanup_old_logs(log_days) + except Exception as e: + logger.warning(f"run_daily_maintenance: cleanup_old_logs failed: {e}") + result["logs_error"] = str(e) + return result diff --git a/api/app/services/scheduler_service.py b/api/app/services/scheduler_service.py index 7a54f82..1a20d18 100644 --- a/api/app/services/scheduler_service.py +++ b/api/app/services/scheduler_service.py @@ -1,6 +1,7 @@ import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger logger = logging.getLogger(__name__) @@ -42,6 +43,31 @@ def start_scheduler(interval_minutes: int = 10): logger.info(f"Scheduler started with interval {interval_minutes}min") +def start_maintenance_job(hour: int = 3): + """Schedule the daily DB/log maintenance job (prune history + cleanup logs). + + Runs independently of the sync job — starts the scheduler if it isn't already + running so maintenance happens even when auto-sync is disabled. + """ + if _scheduler is None: + init_scheduler() + + from . import maintenance_service + + _scheduler.add_job( + maintenance_service.run_daily_maintenance, + trigger=CronTrigger(hour=hour, minute=0), + id="maintenance_job", + name="Daily DB/Log Maintenance", + replace_existing=True + ) + + if not _scheduler.running: + _scheduler.start() + + logger.info(f"Maintenance job scheduled daily at {hour:02d}:00") + + def stop_scheduler(): """Stop the scheduler.""" global _is_running diff --git a/api/app/services/sqlite_service.py b/api/app/services/sqlite_service.py index ce5e02d..f9d1cf3 100644 --- a/api/app/services/sqlite_service.py +++ b/api/app/services/sqlite_service.py @@ -2,7 +2,7 @@ import json import logging import logging.handlers import os -from datetime import datetime +from datetime import datetime, timedelta from zoneinfo import ZoneInfo from ..database import get_sqlite, get_sqlite_sync from ..constants import OrderStatus @@ -114,6 +114,45 @@ async def update_sync_run(run_id: str, status: str, total_orders: int = 0, await db.close() +async def prune_sync_history(retention_days: int = 7) -> dict: + """Delete sync_runs + sync_run_orders older than `retention_days`. + + Audit-only tables — `orders`/`order_items` (business data) are never touched. + Frees pages via incremental_vacuum (prod DB is auto_vacuum=INCREMENTAL after + the initial reclaim). Returns counts for logging. See _SKIP_JUNCTION_STATUSES + for the complementary write-side guard. + """ + cutoff = (datetime.now(_tz_bucharest).replace(tzinfo=None) + - timedelta(days=retention_days)).strftime("%Y-%m-%d") + db = await get_sqlite() + try: + cur = await db.execute( + "DELETE FROM sync_run_orders WHERE sync_run_id IN " + "(SELECT run_id FROM sync_runs WHERE substr(started_at,1,10) < ?)", + (cutoff,)) + junction_deleted = cur.rowcount + cur = await db.execute( + "DELETE FROM sync_runs WHERE substr(started_at,1,10) < ?", (cutoff,)) + runs_deleted = cur.rowcount + # Drop phase-failure rows orphaned by the run deletion. + await db.execute( + "DELETE FROM sync_phase_failures " + "WHERE run_id NOT IN (SELECT run_id FROM sync_runs)") + await db.commit() + try: + await db.execute("PRAGMA incremental_vacuum") + await db.commit() + except Exception as e: # auto_vacuum may be OFF on a fresh dev DB + logger.debug(f"prune_sync_history: incremental_vacuum skipped: {e}") + logger.info( + f"prune_sync_history: cutoff<{cutoff} runs_deleted={runs_deleted} " + f"junction_deleted={junction_deleted}") + return {"cutoff": cutoff, "runs_deleted": runs_deleted, + "junction_deleted": junction_deleted} + finally: + await db.close() + + async def upsert_order(sync_run_id: str, order_number: str, order_date: str, customer_name: str, status: str, id_comanda: int = None, id_partener: int = None, error_message: str = None, @@ -171,8 +210,28 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str, await db.close() +# Audit junction policy (DB-size guard): +# The sync_run_orders junction recorded EVERY order seen on EVERY run. Under the +# 1-minute scheduler, ~98% of rows were no-op ALREADY_IMPORTED re-observations, +# which grew the table to 21M+ rows / 2GB. We no longer record those: the order's +# current state still lives in `orders`; the junction now only lists orders a run +# actually touched (new / changed / skipped / errored / cancelled). Run-detail +# views therefore show only meaningful orders per run. +_SKIP_JUNCTION_STATUSES = {OrderStatus.ALREADY_IMPORTED.value} + + +def _record_in_junction(status_at_run: str) -> bool: + """Whether this per-run status is worth persisting in sync_run_orders.""" + return status_at_run not in _SKIP_JUNCTION_STATUSES + + async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str): - """Record that this run processed this order (junction table).""" + """Record that this run processed this order (junction table). + + No-op ALREADY_IMPORTED observations are skipped — see _SKIP_JUNCTION_STATUSES. + """ + if not _record_in_junction(status_at_run): + return db = await get_sqlite() try: await db.execute(""" @@ -258,10 +317,16 @@ async def _insert_orders_only(db, orders: list[dict]): if not orders: return await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders]) - await db.executemany( - "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", - [(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) for d in orders], - ) + junction_rows = [ + (d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) + for d in orders + if _record_in_junction(d.get("status_at_run", d["status"])) + ] + if junction_rows: + await db.executemany( + "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", + junction_rows, + ) async def _insert_valid_batch(db, orders: list[dict]): @@ -273,10 +338,16 @@ async def _insert_valid_batch(db, orders: list[dict]): if not orders: return await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders]) - await db.executemany( - "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", - [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders], - ) + junction_rows = [ + (d["sync_run_id"], d["order_number"], d["status_at_run"]) + for d in orders + if _record_in_junction(d["status_at_run"]) + ] + if junction_rows: + await db.executemany( + "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", + junction_rows, + ) all_items: list[tuple] = [] order_numbers_with_items: set = set() @@ -314,10 +385,11 @@ async def _insert_single_order(db, d: dict): Caller wraps in SAVEPOINT so a per-row failure doesn't poison the batch. """ await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d)) - await db.execute( - "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", - (d["sync_run_id"], d["order_number"], d["status_at_run"]), - ) + if _record_in_junction(d["status_at_run"]): + await db.execute( + "INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)", + (d["sync_run_id"], d["order_number"], d["status_at_run"]), + ) raw_items = d.get("items", []) if raw_items: await db.execute("DELETE FROM order_items WHERE order_number = ?", (d["order_number"],)) diff --git a/deploy.ps1 b/deploy.ps1 index c6ebfcf..d96d1fa 100644 --- a/deploy.ps1 +++ b/deploy.ps1 @@ -431,26 +431,39 @@ if ($NssmExe) { $existingService = Get-Service -Name $ServiceName -ErrorAction SilentlyContinue - if ($existingService) { - Write-Info "Serviciu existent, restarteaza..." - & $NssmExe restart $ServiceName - Write-OK "Serviciu $ServiceName restartat" - } else { + if (-not $existingService) { Write-Info "Instalez serviciu $ServiceName cu NSSM..." & $NssmExe install $ServiceName (Join-Path $RepoPath "start.bat") & $NssmExe set $ServiceName AppDirectory $RepoPath & $NssmExe set $ServiceName DisplayName "GoMag Vending Import Manager" & $NssmExe set $ServiceName Description "Import comenzi web GoMag -> ROA Oracle" & $NssmExe set $ServiceName Start SERVICE_AUTO_START - & $NssmExe set $ServiceName AppStdout (Join-Path $RepoPath "logs\service_stdout.log") - & $NssmExe set $ServiceName AppStderr (Join-Path $RepoPath "logs\service_stderr.log") - & $NssmExe set $ServiceName AppRotateFiles 1 - & $NssmExe set $ServiceName AppRotateOnline 1 - & $NssmExe set $ServiceName AppRotateBytes 10485760 - & $NssmExe start $ServiceName - Write-OK "Serviciu $ServiceName instalat si pornit" + } else { + Write-Info "Serviciu existent, il opresc pentru mentenanta..." + & $NssmExe stop $ServiceName 2>$null + Start-Sleep -Seconds 3 } + # Mentenanta DB + log-uri cu serviciul oprit: prune istoric (7z) + VACUUM + # (reclaim disc) + cleanup log-uri (7z). Non-fatal daca esueaza. + Write-Info "Mentenanta DB/log-uri (prune + VACUUM + cleanup)..." + try { + & $VenvPy (Join-Path $RepoPath "scripts\db_maintenance.py") --history-days 7 --log-days 7 + } catch { + Write-Warn "Mentenanta DB a esuat (continui): $_" + } + + # (Re)aplica config log/rotatie de fiecare data — idempotent, astfel rotatia + # ajunge si pe serviciile instalate inainte ca aceste setari sa existe. + & $NssmExe set $ServiceName AppStdout (Join-Path $RepoPath "logs\service_stdout.log") + & $NssmExe set $ServiceName AppStderr (Join-Path $RepoPath "logs\service_stderr.log") + & $NssmExe set $ServiceName AppRotateFiles 1 + & $NssmExe set $ServiceName AppRotateOnline 1 + & $NssmExe set $ServiceName AppRotateBytes 10485760 + + & $NssmExe start $ServiceName + Write-OK "Serviciu $ServiceName pornit (mentenanta + rotatie aplicate)" + } else { # Fallback: Task Scheduler Write-Warn "NSSM nu este instalat" diff --git a/scripts/db_maintenance.py b/scripts/db_maintenance.py new file mode 100644 index 0000000..acd6826 --- /dev/null +++ b/scripts/db_maintenance.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +One-shot SQLite + log maintenance, invoked by deploy.ps1 while the GoMagVending +service is stopped. + +What it does: + 1. Prune audit history older than --history-days (sync_runs, sync_run_orders, + orphaned sync_phase_failures). Business tables (orders, order_items) are + NEVER touched. + 2. Enable PRAGMA auto_vacuum=INCREMENTAL and run a full VACUUM to reclaim disk. + 3. Delete log files older than --log-days from logs/. + +Plain sqlite3 only — no app imports, no Oracle, no event loop — so it runs even +if the app/Oracle env isn't set up. + +Usage: + python scripts/db_maintenance.py # defaults: 7/7 days + python scripts/db_maintenance.py --history-days 7 --log-days 7 + python scripts/db_maintenance.py --db C:\\path\\import.db +""" +import argparse +import os +import sqlite3 +import sys +import time +from datetime import datetime, timedelta + +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +DEFAULT_DB = os.path.join(REPO_ROOT, "api", "data", "import.db") +DEFAULT_LOGS = os.path.join(REPO_ROOT, "logs") + + +def prune_and_vacuum(db_path: str, history_days: int) -> None: + cutoff = (datetime.now() - timedelta(days=history_days)).strftime("%Y-%m-%d") + before = os.path.getsize(db_path) / 1048576.0 + conn = sqlite3.connect(db_path, timeout=120) + try: + cur = conn.cursor() + cur.execute( + "DELETE FROM sync_run_orders WHERE sync_run_id IN " + "(SELECT run_id FROM sync_runs WHERE substr(started_at,1,10) < ?)", + (cutoff,)) + junction = cur.rowcount + cur.execute( + "DELETE FROM sync_runs WHERE substr(started_at,1,10) < ?", (cutoff,)) + runs = cur.rowcount + cur.execute( + "DELETE FROM sync_phase_failures " + "WHERE run_id NOT IN (SELECT run_id FROM sync_runs)") + conn.commit() + # auto_vacuum mode change only takes effect on the next VACUUM. + conn.isolation_level = None + conn.execute("PRAGMA auto_vacuum=INCREMENTAL") + t0 = time.time() + conn.execute("VACUUM") + vac = time.time() - t0 + finally: + conn.close() + after = os.path.getsize(db_path) / 1048576.0 + print(f"[db_maintenance] cutoff<{cutoff} runs_deleted={runs} " + f"junction_deleted={junction} size {before:.1f}MB -> {after:.1f}MB " + f"(VACUUM {vac:.1f}s)") + + +def cleanup_logs(log_dir: str, log_days: int) -> None: + if not os.path.isdir(log_dir): + print(f"[db_maintenance] logs dir not found: {log_dir}") + return + cutoff = time.time() - log_days * 86400 + removed = 0 + for name in os.listdir(log_dir): + if ".log" not in name: + continue + path = os.path.join(log_dir, name) + try: + if os.path.isfile(path) and os.path.getmtime(path) < cutoff: + os.remove(path) + removed += 1 + except OSError as e: + print(f"[db_maintenance] could not remove {name}: {e}") + print(f"[db_maintenance] removed {removed} log file(s) older than {log_days}d") + + +def main() -> int: + ap = argparse.ArgumentParser(description="SQLite + log maintenance") + ap.add_argument("--db", default=DEFAULT_DB) + ap.add_argument("--logs-dir", default=DEFAULT_LOGS) + ap.add_argument("--history-days", type=int, default=7) + ap.add_argument("--log-days", type=int, default=7) + args = ap.parse_args() + + if not os.path.exists(args.db): + # Non-fatal: a fresh install may not have a DB yet. + print(f"[db_maintenance] DB not found, skipping: {args.db}", file=sys.stderr) + else: + prune_and_vacuum(args.db, args.history_days) + cleanup_logs(args.logs_dir, args.log_days) + return 0 + + +if __name__ == "__main__": + sys.exit(main())