Merge feat/db-log-maintenance: guard DB + log growth (prune + Option B + rotation)
This commit is contained in:
@@ -1,9 +1,10 @@
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime
|
||||
from fastapi import FastAPI
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from pathlib import Path
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
|
||||
from .config import settings
|
||||
@@ -19,8 +20,12 @@ _stream_handler.setFormatter(_formatter)
|
||||
|
||||
_log_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'logs')
|
||||
os.makedirs(_log_dir, exist_ok=True)
|
||||
_log_filename = f"sync_comenzi_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
|
||||
_file_handler = logging.FileHandler(os.path.join(_log_dir, _log_filename), encoding='utf-8')
|
||||
# Rotating handler (10MB x 5 backups) instead of a new timestamped file per
|
||||
# start — caps log growth and stops file proliferation across restarts. Fixed
|
||||
# name still matches the QA glob `sync_comenzi_*.log`.
|
||||
_file_handler = logging.handlers.RotatingFileHandler(
|
||||
os.path.join(_log_dir, "sync_comenzi_current.log"),
|
||||
maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8')
|
||||
_file_handler.setFormatter(_formatter)
|
||||
|
||||
_root_logger = logging.getLogger()
|
||||
@@ -54,6 +59,15 @@ async def lifespan(app: FastAPI):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Daily DB/log maintenance (prune audit history + cleanup old logs) + a
|
||||
# one-shot catch-up so a long-down service reclaims immediately on start.
|
||||
try:
|
||||
from .services import maintenance_service
|
||||
scheduler_service.start_maintenance_job()
|
||||
asyncio.create_task(maintenance_service.run_daily_maintenance())
|
||||
except Exception as e:
|
||||
logger.warning(f"Maintenance scheduling failed: {e}")
|
||||
|
||||
logger.info("GoMag Import Manager started")
|
||||
yield
|
||||
|
||||
|
||||
75
api/app/services/maintenance_service.py
Normal file
75
api/app/services/maintenance_service.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""Periodic maintenance: prune audit history + clean up old log files.
|
||||
|
||||
Keeps the SQLite DB and the logs/ directory from growing unbounded. The audit
|
||||
tables (sync_runs, sync_run_orders) were the only DB growth source under the
|
||||
1-minute scheduler; business tables (orders, order_items) are never touched.
|
||||
|
||||
The one-shot heavy reclaim (full VACUUM, run while the service is stopped) lives
|
||||
in scripts/db_maintenance.py and is invoked by deploy.ps1.
|
||||
"""
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_HISTORY_RETENTION_DAYS = 7
|
||||
DEFAULT_LOG_RETENTION_DAYS = 7
|
||||
|
||||
|
||||
def _logs_dir() -> str:
|
||||
"""Absolute path to the repo-root logs/ directory (matches main.py)."""
|
||||
here = os.path.dirname(os.path.abspath(__file__))
|
||||
return os.path.join(os.path.abspath(os.path.join(here, "..", "..", "..")), "logs")
|
||||
|
||||
|
||||
def cleanup_old_logs(retention_days: int = DEFAULT_LOG_RETENTION_DAYS,
|
||||
log_dir: str | None = None) -> int:
|
||||
"""Delete log files older than `retention_days`. Returns count removed.
|
||||
|
||||
Targets any file with `.log` in its name (covers `sync_comenzi_current.log`,
|
||||
NSSM `service_stdout.log`, and rotated backups like `*.log.3`). The live
|
||||
rotating files stay fresh (recent mtime) so they fall inside the window.
|
||||
"""
|
||||
log_dir = log_dir or _logs_dir()
|
||||
if not os.path.isdir(log_dir):
|
||||
return 0
|
||||
cutoff = time.time() - retention_days * 86400
|
||||
removed = 0
|
||||
for name in os.listdir(log_dir):
|
||||
if ".log" not in name:
|
||||
continue
|
||||
path = os.path.join(log_dir, name)
|
||||
try:
|
||||
if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
|
||||
os.remove(path)
|
||||
removed += 1
|
||||
except OSError as e:
|
||||
logger.warning(f"cleanup_old_logs: could not remove {name}: {e}")
|
||||
if removed:
|
||||
logger.info(f"cleanup_old_logs: removed {removed} file(s) older than "
|
||||
f"{retention_days}d from {log_dir}")
|
||||
return removed
|
||||
|
||||
|
||||
async def run_daily_maintenance(
|
||||
history_days: int = DEFAULT_HISTORY_RETENTION_DAYS,
|
||||
log_days: int = DEFAULT_LOG_RETENTION_DAYS) -> dict:
|
||||
"""Daily job: prune audit history (+reclaim pages) and clean old log files.
|
||||
|
||||
Each step is isolated — a failure in one does not skip the other.
|
||||
"""
|
||||
from . import sqlite_service
|
||||
|
||||
result: dict = {}
|
||||
try:
|
||||
result["db"] = await sqlite_service.prune_sync_history(history_days)
|
||||
except Exception as e:
|
||||
logger.warning(f"run_daily_maintenance: prune_sync_history failed: {e}")
|
||||
result["db_error"] = str(e)
|
||||
try:
|
||||
result["logs_removed"] = cleanup_old_logs(log_days)
|
||||
except Exception as e:
|
||||
logger.warning(f"run_daily_maintenance: cleanup_old_logs failed: {e}")
|
||||
result["logs_error"] = str(e)
|
||||
return result
|
||||
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -42,6 +43,31 @@ def start_scheduler(interval_minutes: int = 10):
|
||||
logger.info(f"Scheduler started with interval {interval_minutes}min")
|
||||
|
||||
|
||||
def start_maintenance_job(hour: int = 3):
|
||||
"""Schedule the daily DB/log maintenance job (prune history + cleanup logs).
|
||||
|
||||
Runs independently of the sync job — starts the scheduler if it isn't already
|
||||
running so maintenance happens even when auto-sync is disabled.
|
||||
"""
|
||||
if _scheduler is None:
|
||||
init_scheduler()
|
||||
|
||||
from . import maintenance_service
|
||||
|
||||
_scheduler.add_job(
|
||||
maintenance_service.run_daily_maintenance,
|
||||
trigger=CronTrigger(hour=hour, minute=0),
|
||||
id="maintenance_job",
|
||||
name="Daily DB/Log Maintenance",
|
||||
replace_existing=True
|
||||
)
|
||||
|
||||
if not _scheduler.running:
|
||||
_scheduler.start()
|
||||
|
||||
logger.info(f"Maintenance job scheduled daily at {hour:02d}:00")
|
||||
|
||||
|
||||
def stop_scheduler():
|
||||
"""Stop the scheduler."""
|
||||
global _is_running
|
||||
|
||||
@@ -2,7 +2,7 @@ import json
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
from ..database import get_sqlite, get_sqlite_sync
|
||||
from ..constants import OrderStatus
|
||||
@@ -114,6 +114,45 @@ async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
|
||||
await db.close()
|
||||
|
||||
|
||||
async def prune_sync_history(retention_days: int = 7) -> dict:
|
||||
"""Delete sync_runs + sync_run_orders older than `retention_days`.
|
||||
|
||||
Audit-only tables — `orders`/`order_items` (business data) are never touched.
|
||||
Frees pages via incremental_vacuum (prod DB is auto_vacuum=INCREMENTAL after
|
||||
the initial reclaim). Returns counts for logging. See _SKIP_JUNCTION_STATUSES
|
||||
for the complementary write-side guard.
|
||||
"""
|
||||
cutoff = (datetime.now(_tz_bucharest).replace(tzinfo=None)
|
||||
- timedelta(days=retention_days)).strftime("%Y-%m-%d")
|
||||
db = await get_sqlite()
|
||||
try:
|
||||
cur = await db.execute(
|
||||
"DELETE FROM sync_run_orders WHERE sync_run_id IN "
|
||||
"(SELECT run_id FROM sync_runs WHERE substr(started_at,1,10) < ?)",
|
||||
(cutoff,))
|
||||
junction_deleted = cur.rowcount
|
||||
cur = await db.execute(
|
||||
"DELETE FROM sync_runs WHERE substr(started_at,1,10) < ?", (cutoff,))
|
||||
runs_deleted = cur.rowcount
|
||||
# Drop phase-failure rows orphaned by the run deletion.
|
||||
await db.execute(
|
||||
"DELETE FROM sync_phase_failures "
|
||||
"WHERE run_id NOT IN (SELECT run_id FROM sync_runs)")
|
||||
await db.commit()
|
||||
try:
|
||||
await db.execute("PRAGMA incremental_vacuum")
|
||||
await db.commit()
|
||||
except Exception as e: # auto_vacuum may be OFF on a fresh dev DB
|
||||
logger.debug(f"prune_sync_history: incremental_vacuum skipped: {e}")
|
||||
logger.info(
|
||||
f"prune_sync_history: cutoff<{cutoff} runs_deleted={runs_deleted} "
|
||||
f"junction_deleted={junction_deleted}")
|
||||
return {"cutoff": cutoff, "runs_deleted": runs_deleted,
|
||||
"junction_deleted": junction_deleted}
|
||||
finally:
|
||||
await db.close()
|
||||
|
||||
|
||||
async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
|
||||
customer_name: str, status: str, id_comanda: int = None,
|
||||
id_partener: int = None, error_message: str = None,
|
||||
@@ -171,8 +210,28 @@ async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
|
||||
await db.close()
|
||||
|
||||
|
||||
# Audit junction policy (DB-size guard):
|
||||
# The sync_run_orders junction recorded EVERY order seen on EVERY run. Under the
|
||||
# 1-minute scheduler, ~98% of rows were no-op ALREADY_IMPORTED re-observations,
|
||||
# which grew the table to 21M+ rows / 2GB. We no longer record those: the order's
|
||||
# current state still lives in `orders`; the junction now only lists orders a run
|
||||
# actually touched (new / changed / skipped / errored / cancelled). Run-detail
|
||||
# views therefore show only meaningful orders per run.
|
||||
_SKIP_JUNCTION_STATUSES = {OrderStatus.ALREADY_IMPORTED.value}
|
||||
|
||||
|
||||
def _record_in_junction(status_at_run: str) -> bool:
|
||||
"""Whether this per-run status is worth persisting in sync_run_orders."""
|
||||
return status_at_run not in _SKIP_JUNCTION_STATUSES
|
||||
|
||||
|
||||
async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str):
|
||||
"""Record that this run processed this order (junction table)."""
|
||||
"""Record that this run processed this order (junction table).
|
||||
|
||||
No-op ALREADY_IMPORTED observations are skipped — see _SKIP_JUNCTION_STATUSES.
|
||||
"""
|
||||
if not _record_in_junction(status_at_run):
|
||||
return
|
||||
db = await get_sqlite()
|
||||
try:
|
||||
await db.execute("""
|
||||
@@ -258,10 +317,16 @@ async def _insert_orders_only(db, orders: list[dict]):
|
||||
if not orders:
|
||||
return
|
||||
await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
|
||||
await db.executemany(
|
||||
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
|
||||
[(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"])) for d in orders],
|
||||
)
|
||||
junction_rows = [
|
||||
(d["sync_run_id"], d["order_number"], d.get("status_at_run", d["status"]))
|
||||
for d in orders
|
||||
if _record_in_junction(d.get("status_at_run", d["status"]))
|
||||
]
|
||||
if junction_rows:
|
||||
await db.executemany(
|
||||
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
|
||||
junction_rows,
|
||||
)
|
||||
|
||||
|
||||
async def _insert_valid_batch(db, orders: list[dict]):
|
||||
@@ -273,10 +338,16 @@ async def _insert_valid_batch(db, orders: list[dict]):
|
||||
if not orders:
|
||||
return
|
||||
await db.executemany(_ORDERS_UPSERT_SQL, [_orders_row(d) for d in orders])
|
||||
await db.executemany(
|
||||
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
|
||||
[(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders],
|
||||
)
|
||||
junction_rows = [
|
||||
(d["sync_run_id"], d["order_number"], d["status_at_run"])
|
||||
for d in orders
|
||||
if _record_in_junction(d["status_at_run"])
|
||||
]
|
||||
if junction_rows:
|
||||
await db.executemany(
|
||||
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
|
||||
junction_rows,
|
||||
)
|
||||
|
||||
all_items: list[tuple] = []
|
||||
order_numbers_with_items: set = set()
|
||||
@@ -314,10 +385,11 @@ async def _insert_single_order(db, d: dict):
|
||||
Caller wraps in SAVEPOINT so a per-row failure doesn't poison the batch.
|
||||
"""
|
||||
await db.execute(_ORDERS_UPSERT_SQL, _orders_row(d))
|
||||
await db.execute(
|
||||
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
|
||||
(d["sync_run_id"], d["order_number"], d["status_at_run"]),
|
||||
)
|
||||
if _record_in_junction(d["status_at_run"]):
|
||||
await db.execute(
|
||||
"INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?)",
|
||||
(d["sync_run_id"], d["order_number"], d["status_at_run"]),
|
||||
)
|
||||
raw_items = d.get("items", [])
|
||||
if raw_items:
|
||||
await db.execute("DELETE FROM order_items WHERE order_number = ?", (d["order_number"],))
|
||||
|
||||
37
deploy.ps1
37
deploy.ps1
@@ -431,26 +431,39 @@ if ($NssmExe) {
|
||||
|
||||
$existingService = Get-Service -Name $ServiceName -ErrorAction SilentlyContinue
|
||||
|
||||
if ($existingService) {
|
||||
Write-Info "Serviciu existent, restarteaza..."
|
||||
& $NssmExe restart $ServiceName
|
||||
Write-OK "Serviciu $ServiceName restartat"
|
||||
} else {
|
||||
if (-not $existingService) {
|
||||
Write-Info "Instalez serviciu $ServiceName cu NSSM..."
|
||||
& $NssmExe install $ServiceName (Join-Path $RepoPath "start.bat")
|
||||
& $NssmExe set $ServiceName AppDirectory $RepoPath
|
||||
& $NssmExe set $ServiceName DisplayName "GoMag Vending Import Manager"
|
||||
& $NssmExe set $ServiceName Description "Import comenzi web GoMag -> ROA Oracle"
|
||||
& $NssmExe set $ServiceName Start SERVICE_AUTO_START
|
||||
& $NssmExe set $ServiceName AppStdout (Join-Path $RepoPath "logs\service_stdout.log")
|
||||
& $NssmExe set $ServiceName AppStderr (Join-Path $RepoPath "logs\service_stderr.log")
|
||||
& $NssmExe set $ServiceName AppRotateFiles 1
|
||||
& $NssmExe set $ServiceName AppRotateOnline 1
|
||||
& $NssmExe set $ServiceName AppRotateBytes 10485760
|
||||
& $NssmExe start $ServiceName
|
||||
Write-OK "Serviciu $ServiceName instalat si pornit"
|
||||
} else {
|
||||
Write-Info "Serviciu existent, il opresc pentru mentenanta..."
|
||||
& $NssmExe stop $ServiceName 2>$null
|
||||
Start-Sleep -Seconds 3
|
||||
}
|
||||
|
||||
# Mentenanta DB + log-uri cu serviciul oprit: prune istoric (7z) + VACUUM
|
||||
# (reclaim disc) + cleanup log-uri (7z). Non-fatal daca esueaza.
|
||||
Write-Info "Mentenanta DB/log-uri (prune + VACUUM + cleanup)..."
|
||||
try {
|
||||
& $VenvPy (Join-Path $RepoPath "scripts\db_maintenance.py") --history-days 7 --log-days 7
|
||||
} catch {
|
||||
Write-Warn "Mentenanta DB a esuat (continui): $_"
|
||||
}
|
||||
|
||||
# (Re)aplica config log/rotatie de fiecare data — idempotent, astfel rotatia
|
||||
# ajunge si pe serviciile instalate inainte ca aceste setari sa existe.
|
||||
& $NssmExe set $ServiceName AppStdout (Join-Path $RepoPath "logs\service_stdout.log")
|
||||
& $NssmExe set $ServiceName AppStderr (Join-Path $RepoPath "logs\service_stderr.log")
|
||||
& $NssmExe set $ServiceName AppRotateFiles 1
|
||||
& $NssmExe set $ServiceName AppRotateOnline 1
|
||||
& $NssmExe set $ServiceName AppRotateBytes 10485760
|
||||
|
||||
& $NssmExe start $ServiceName
|
||||
Write-OK "Serviciu $ServiceName pornit (mentenanta + rotatie aplicate)"
|
||||
|
||||
} else {
|
||||
# Fallback: Task Scheduler
|
||||
Write-Warn "NSSM nu este instalat"
|
||||
|
||||
102
scripts/db_maintenance.py
Normal file
102
scripts/db_maintenance.py
Normal file
@@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
One-shot SQLite + log maintenance, invoked by deploy.ps1 while the GoMagVending
|
||||
service is stopped.
|
||||
|
||||
What it does:
|
||||
1. Prune audit history older than --history-days (sync_runs, sync_run_orders,
|
||||
orphaned sync_phase_failures). Business tables (orders, order_items) are
|
||||
NEVER touched.
|
||||
2. Enable PRAGMA auto_vacuum=INCREMENTAL and run a full VACUUM to reclaim disk.
|
||||
3. Delete log files older than --log-days from logs/.
|
||||
|
||||
Plain sqlite3 only — no app imports, no Oracle, no event loop — so it runs even
|
||||
if the app/Oracle env isn't set up.
|
||||
|
||||
Usage:
|
||||
python scripts/db_maintenance.py # defaults: 7/7 days
|
||||
python scripts/db_maintenance.py --history-days 7 --log-days 7
|
||||
python scripts/db_maintenance.py --db C:\\path\\import.db
|
||||
"""
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
DEFAULT_DB = os.path.join(REPO_ROOT, "api", "data", "import.db")
|
||||
DEFAULT_LOGS = os.path.join(REPO_ROOT, "logs")
|
||||
|
||||
|
||||
def prune_and_vacuum(db_path: str, history_days: int) -> None:
|
||||
cutoff = (datetime.now() - timedelta(days=history_days)).strftime("%Y-%m-%d")
|
||||
before = os.path.getsize(db_path) / 1048576.0
|
||||
conn = sqlite3.connect(db_path, timeout=120)
|
||||
try:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"DELETE FROM sync_run_orders WHERE sync_run_id IN "
|
||||
"(SELECT run_id FROM sync_runs WHERE substr(started_at,1,10) < ?)",
|
||||
(cutoff,))
|
||||
junction = cur.rowcount
|
||||
cur.execute(
|
||||
"DELETE FROM sync_runs WHERE substr(started_at,1,10) < ?", (cutoff,))
|
||||
runs = cur.rowcount
|
||||
cur.execute(
|
||||
"DELETE FROM sync_phase_failures "
|
||||
"WHERE run_id NOT IN (SELECT run_id FROM sync_runs)")
|
||||
conn.commit()
|
||||
# auto_vacuum mode change only takes effect on the next VACUUM.
|
||||
conn.isolation_level = None
|
||||
conn.execute("PRAGMA auto_vacuum=INCREMENTAL")
|
||||
t0 = time.time()
|
||||
conn.execute("VACUUM")
|
||||
vac = time.time() - t0
|
||||
finally:
|
||||
conn.close()
|
||||
after = os.path.getsize(db_path) / 1048576.0
|
||||
print(f"[db_maintenance] cutoff<{cutoff} runs_deleted={runs} "
|
||||
f"junction_deleted={junction} size {before:.1f}MB -> {after:.1f}MB "
|
||||
f"(VACUUM {vac:.1f}s)")
|
||||
|
||||
|
||||
def cleanup_logs(log_dir: str, log_days: int) -> None:
|
||||
if not os.path.isdir(log_dir):
|
||||
print(f"[db_maintenance] logs dir not found: {log_dir}")
|
||||
return
|
||||
cutoff = time.time() - log_days * 86400
|
||||
removed = 0
|
||||
for name in os.listdir(log_dir):
|
||||
if ".log" not in name:
|
||||
continue
|
||||
path = os.path.join(log_dir, name)
|
||||
try:
|
||||
if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
|
||||
os.remove(path)
|
||||
removed += 1
|
||||
except OSError as e:
|
||||
print(f"[db_maintenance] could not remove {name}: {e}")
|
||||
print(f"[db_maintenance] removed {removed} log file(s) older than {log_days}d")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser(description="SQLite + log maintenance")
|
||||
ap.add_argument("--db", default=DEFAULT_DB)
|
||||
ap.add_argument("--logs-dir", default=DEFAULT_LOGS)
|
||||
ap.add_argument("--history-days", type=int, default=7)
|
||||
ap.add_argument("--log-days", type=int, default=7)
|
||||
args = ap.parse_args()
|
||||
|
||||
if not os.path.exists(args.db):
|
||||
# Non-fatal: a fresh install may not have a DB yet.
|
||||
print(f"[db_maintenance] DB not found, skipping: {args.db}", file=sys.stderr)
|
||||
else:
|
||||
prune_and_vacuum(args.db, args.history_days)
|
||||
cleanup_logs(args.logs_dir, args.log_days)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user