"""Teste T6/T7: liveness probe worker + backup online SQLite.""" from __future__ import annotations import os import sqlite3 import tempfile from datetime import datetime, timedelta, timezone import pytest @pytest.fixture() def env(monkeypatch): tmp = tempfile.mkdtemp() monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) from app.config import get_settings get_settings.cache_clear() from app.db import get_connection, init_db init_db() yield get_connection, get_settings() get_settings.cache_clear() # --------------------------------------------------------------------------- # # T6 — worker liveness probe # # --------------------------------------------------------------------------- # def _set_beat(conn, dt: datetime | None): conn.execute( "UPDATE worker_heartbeat SET last_beat=? WHERE id=1", (dt.isoformat(timespec="seconds") if dt else None,), ) def test_healthy_when_beat_fresh(env): from app.worker.healthcheck import worker_healthy get_connection, settings = env conn = get_connection() try: _set_beat(conn, datetime.now(timezone.utc)) assert worker_healthy(conn, settings) is True finally: conn.close() def test_unhealthy_when_beat_stale(env): from app.worker.healthcheck import worker_healthy get_connection, settings = env conn = get_connection() try: stale = datetime.now(timezone.utc) - timedelta(seconds=settings.worker_heartbeat_stale_s + 60) _set_beat(conn, stale) assert worker_healthy(conn, settings) is False finally: conn.close() def test_unhealthy_when_never_started(env): from app.worker.healthcheck import worker_healthy get_connection, settings = env conn = get_connection() try: _set_beat(conn, None) # never started assert worker_healthy(conn, settings) is False finally: conn.close() # --------------------------------------------------------------------------- # # T7 — backup online # # --------------------------------------------------------------------------- # def test_backup_db_roundtrip(env): from tools.backup import backup_db get_connection, settings = env conn = get_connection() try: conn.execute( "INSERT INTO submissions (idempotency_key, status, payload_json) VALUES ('bk1','queued','{}')" ) finally: conn.close() dest = settings.db_path.parent / "backups" / "snap.db" out = backup_db(settings.db_path, dest) assert out.exists() # Copia contine randul scris (backup consistent, nu fisier gol). bk = sqlite3.connect(out) try: n = bk.execute("SELECT COUNT(*) FROM submissions WHERE idempotency_key='bk1'").fetchone()[0] finally: bk.close() assert n == 1 def test_backup_prune_keeps_newest(env): from tools.backup import prune get_connection, settings = env bdir = settings.db_path.parent / "backups" bdir.mkdir(parents=True, exist_ok=True) # Creeaza 5 snapshot-uri cu nume-timestamp crescator. names = [f"autopass-2026010{i}-000000.db" for i in range(1, 6)] for n in names: (bdir / n).write_bytes(b"x") removed = prune(bdir, keep=2) remaining = sorted(p.name for p in bdir.glob("autopass-*.db")) assert len(remaining) == 2 assert remaining == names[-2:] # cele mai noi doua assert len(removed) == 3