feat(maintenance): guard DB + log growth (Option B + daily prune + rotation)

Root cause of the 2GB prod import.db: the sync_run_orders audit junction
recorded every order on every run; under the 1-minute scheduler ~98% of
21.7M rows were no-op ALREADY_IMPORTED re-observations. NSSM stdout/stderr
also grew unbounded (rotation never applied to the live service).

Changes:
- sqlite_service: skip ALREADY_IMPORTED rows in sync_run_orders (write-side
  guard, _SKIP_JUNCTION_STATUSES); add prune_sync_history(retention_days)
  with incremental_vacuum.
- maintenance_service (new): cleanup_old_logs + run_daily_maintenance.
- scheduler_service: start_maintenance_job (daily CronTrigger).
- main.py: RotatingFileHandler (sync_comenzi_current.log, 10MB x5) instead
  of a new timestamped file per start; schedule daily maintenance + one-shot
  catch-up at startup.
- scripts/db_maintenance.py (new): one-shot prune + VACUUM + log cleanup,
  plain sqlite3, invoked by deploy.ps1 while the service is stopped.
- deploy.ps1: stop -> run db_maintenance.py -> (re)apply NSSM AppRotate*
  idempotently -> start, so rotation reaches pre-existing services.

Retention defaults: 7 days history, 7 days logs.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-06-26 09:59:41 +00:00
parent ccc6a933fa
commit dcc5042586
6 changed files with 331 additions and 29 deletions

View File

@@ -0,0 +1,75 @@
"""Periodic maintenance: prune audit history + clean up old log files.
Keeps the SQLite DB and the logs/ directory from growing unbounded. The audit
tables (sync_runs, sync_run_orders) were the only DB growth source under the
1-minute scheduler; business tables (orders, order_items) are never touched.
The one-shot heavy reclaim (full VACUUM, run while the service is stopped) lives
in scripts/db_maintenance.py and is invoked by deploy.ps1.
"""
import logging
import os
import time
logger = logging.getLogger(__name__)
DEFAULT_HISTORY_RETENTION_DAYS = 7
DEFAULT_LOG_RETENTION_DAYS = 7
def _logs_dir() -> str:
"""Absolute path to the repo-root logs/ directory (matches main.py)."""
here = os.path.dirname(os.path.abspath(__file__))
return os.path.join(os.path.abspath(os.path.join(here, "..", "..", "..")), "logs")
def cleanup_old_logs(retention_days: int = DEFAULT_LOG_RETENTION_DAYS,
log_dir: str | None = None) -> int:
"""Delete log files older than `retention_days`. Returns count removed.
Targets any file with `.log` in its name (covers `sync_comenzi_current.log`,
NSSM `service_stdout.log`, and rotated backups like `*.log.3`). The live
rotating files stay fresh (recent mtime) so they fall inside the window.
"""
log_dir = log_dir or _logs_dir()
if not os.path.isdir(log_dir):
return 0
cutoff = time.time() - retention_days * 86400
removed = 0
for name in os.listdir(log_dir):
if ".log" not in name:
continue
path = os.path.join(log_dir, name)
try:
if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
os.remove(path)
removed += 1
except OSError as e:
logger.warning(f"cleanup_old_logs: could not remove {name}: {e}")
if removed:
logger.info(f"cleanup_old_logs: removed {removed} file(s) older than "
f"{retention_days}d from {log_dir}")
return removed
async def run_daily_maintenance(
history_days: int = DEFAULT_HISTORY_RETENTION_DAYS,
log_days: int = DEFAULT_LOG_RETENTION_DAYS) -> dict:
"""Daily job: prune audit history (+reclaim pages) and clean old log files.
Each step is isolated — a failure in one does not skip the other.
"""
from . import sqlite_service
result: dict = {}
try:
result["db"] = await sqlite_service.prune_sync_history(history_days)
except Exception as e:
logger.warning(f"run_daily_maintenance: prune_sync_history failed: {e}")
result["db_error"] = str(e)
try:
result["logs_removed"] = cleanup_old_logs(log_days)
except Exception as e:
logger.warning(f"run_daily_maintenance: cleanup_old_logs failed: {e}")
result["logs_error"] = str(e)
return result

View File

@@ -1,6 +1,7 @@
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
logger = logging.getLogger(__name__)
@@ -42,6 +43,31 @@ def start_scheduler(interval_minutes: int = 10):
logger.info(f"Scheduler started with interval {interval_minutes}min")
def start_maintenance_job(hour: int = 3):
"""Schedule the daily DB/log maintenance job (prune history + cleanup logs).
Runs independently of the sync job — starts the scheduler if it isn't already
running so maintenance happens even when auto-sync is disabled.
"""
if _scheduler is None:
init_scheduler()
from . import maintenance_service
_scheduler.add_job(
maintenance_service.run_daily_maintenance,
trigger=CronTrigger(hour=hour, minute=0),
id="maintenance_job",
name="Daily DB/Log Maintenance",
replace_existing=True
)
if not _scheduler.running:
_scheduler.start()
logger.info(f"Maintenance job scheduled daily at {hour:02d}:00")
def stop_scheduler():
"""Stop the scheduler."""
global _is_running

View File

@@ -2,7 +2,7 @@ import json
import logging
import logging.handlers
import os
from datetime import datetime
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from ..database import get_sqlite, get_sqlite_sync
from ..constants import OrderStatus
@@ -114,6 +114,45 @@ async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
await db.close()
async def prune_sync_history(retention_days: int = 7) -> dict:
"""Delete sync_runs + sync_run_orders older than `retention_days`.
Audit-only tables — `orders`/`order_items` (business data) are never touched.
Frees pages via incremental_vacuum (prod DB is auto_vacuum=INCREMENTAL after
the initial reclaim). Returns counts for logging. See _SKIP_JUNCTION_STATUSES
for the complementary write-side guard.
"""
cutoff = (datetime.now(_tz_bucharest).replace(tzinfo=None)
- timedelta(days=retention_days)).strftime("%Y-%m-%d")
db = await get_sqlite()
try:
cur = await db.execute(
"DELETE FROM sync_run_orders WHERE sync_run_id IN "
"(SELECT run_id FROM sync_runs WHERE substr(started_at,1,10) < ?)",
(cutoff,))
junction_deleted = cur.rowcount
cur = await db.execute(
"DELETE FROM sync_runs WHERE substr(started_at,1,10) < ?", (cutoff,))
runs_deleted = cur.rowcount
# Drop phase-failure rows orphaned by the run deletion.
await db.execute(
"DELETE FROM sync_phase_failures "
"WHERE run_id NOT IN (SELECT run_id FROM sync_runs)")
await db.commit()
try:
await db.execute("PRAGMA incremental_vacuum")
await db.commit()
except Exception as e: # auto_vacuum may be OFF on a fresh dev DB
logger.debug(f"prune_sync_history: incremental_vacuum skipped: {e}")
logger.info(
f"prune_sync_history: cutoff<{cutoff} runs_deleted={runs_deleted} "
f"junction_deleted={junction_deleted}")
return {"cutoff": cutoff, "runs_deleted": runs_deleted,
"junction_deleted": junction_deleted}
finally:
await db.close()
async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
customer_name: str, status: str, id_comanda: int = None,
id_partener: int = None, error_message: str = None,
@@ -171,8 +210,28 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
await db.close()
# Audit junction policy (DB-size guard):
# The sync_run_orders junction recorded EVERY order seen on EVERY run. Under the
# 1-minute scheduler, ~98% of rows were no-op ALREADY_IMPORTED re-observations,
# which grew the table to 21M+ rows / 2GB. We no longer record those: the order's
# current state still lives in `orders`; the junction now only lists orders a run
# actually touched (new / changed / skipped / errored / cancelled). Run-detail
# views therefore show only meaningful orders per run.
_SKIP_JUNCTION_STATUSES = {OrderStatus.ALREADY_IMPORTED.value}
def _record_in_junction(status_at_run: str) -> bool:
"""Whether this per-run status is worth persisting in sync_run_orders."""
return status_at_run not in _SKIP_JUNCTION_STATUSES
async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str):
"""Record that this run processed this order (junction table)."""
"""Record that this run processed this order (junction table).
No-op ALREADY_IMPORTED observations are skipped — see _SKIP_JUNCTION_STATUSES.
"""
if not _record_in_junction(status_at_run):
return
db = await get_sqlite()
try:
await db.execute("""
@@ -258,10 +317,16 @@ async def _insert_orders_only(db, orders: list[dict]):
if not orders:
return
await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
await db.executemany(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
[(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) for d in orders],
)
junction_rows = [
(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"]))
for d in orders
if _record_in_junction(d.get("status_at_run", d["status"]))
]
if junction_rows:
await db.executemany(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
junction_rows,
)
async def _insert_valid_batch(db, orders: list[dict]):
@@ -273,10 +338,16 @@ async def _insert_valid_batch(db, orders: list[dict]):
if not orders:
return
await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
await db.executemany(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
[(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders],
)
junction_rows = [
(d["sync_run_id"], d["order_number"], d["status_at_run"])
for d in orders
if _record_in_junction(d["status_at_run"])
]
if junction_rows:
await db.executemany(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
junction_rows,
)
all_items: list[tuple] = []
order_numbers_with_items: set = set()
@@ -314,10 +385,11 @@ async def _insert_single_order(db, d: dict):
Caller wraps in SAVEPOINT so a per-row failure doesn't poison the batch.
"""
await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d))
await db.execute(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
(d["sync_run_id"], d["order_number"], d["status_at_run"]),
)
if _record_in_junction(d["status_at_run"]):
await db.execute(
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
(d["sync_run_id"], d["order_number"], d["status_at_run"]),
)
raw_items = d.get("items", [])
if raw_items:
await db.execute("DELETE FROM order_items WHERE order_number = ?", (d["order_number"],))