"""Tests for src.jsonlock — flock-based JSON locking primitives. Covers: - read_locked: missing file, valid JSON - write_locked: create-on-missing, mutator chain, atomic temp file - re-entrant write_locked (same thread, no deadlock) - LockTimeoutError on contended lock (cross-thread) """ from __future__ import annotations import fcntl import json import os import threading import time from pathlib import Path import pytest from src import jsonlock from src.jsonlock import ( LockTimeoutError, read_locked, write_locked, ) # ── read_locked ────────────────────────────────────────────────────── def test_read_locked_missing_file(tmp_path): """read_locked surfaces FileNotFoundError on missing path. (Callers are expected to catch and default to {}; the helper itself does not swallow this error.) """ target = tmp_path / "nope.json" with pytest.raises(FileNotFoundError): read_locked(str(target)) def test_read_locked_valid_json(tmp_path): target = tmp_path / "ok.json" target.write_text(json.dumps({"a": 1, "b": [2, 3]}), encoding="utf-8") assert read_locked(str(target)) == {"a": 1, "b": [2, 3]} # ── write_locked ───────────────────────────────────────────────────── def test_write_locked_creates_file(tmp_path): target = tmp_path / "new.json" seen: list = [] def _mut(data): seen.append(dict(data)) data["created"] = True return data out = write_locked(str(target), _mut) assert out == {"created": True} assert seen == [{}] assert target.exists() assert json.loads(target.read_text()) == {"created": True} def test_write_locked_mutator_applied(tmp_path): target = tmp_path / "exists.json" target.write_text(json.dumps({"counter": 1}), encoding="utf-8") seen: list = [] def _mut(data): seen.append(dict(data)) data["counter"] = data.get("counter", 0) + 1 return data out = write_locked(str(target), _mut) assert out == {"counter": 2} assert seen == [{"counter": 1}] assert json.loads(target.read_text()) == {"counter": 2} def test_write_locked_atomic(tmp_path): """After write_locked returns, the .tmp sibling must be gone (rename clean).""" target = tmp_path / "atomic.json" write_locked(str(target), lambda d: {"x": 42}) assert target.exists() assert not (tmp_path / "atomic.json.tmp").exists() assert json.loads(target.read_text()) == {"x": 42} # ── re-entry guard ─────────────────────────────────────────────────── def test_reentrant_write_locked(tmp_path): """Same-thread write_locked nested inside the mutator must not deadlock.""" target = tmp_path / "reentry.json" def _outer(data): data["outer"] = True # Re-entrant call to the same path — must skip flock acquisition # (would deadlock otherwise on the held LOCK_EX). write_locked(str(target), _inner) # Re-read after inner; the inner write replaced the file. return data def _inner(data): # Inner observes whatever is currently on disk (data is read at start # of write_locked; we just stash an inner marker). data["inner"] = True return data out = write_locked(str(target), _outer) # The outer mutator's `data` is independent of the inner write and # is what gets persisted last (replaces the inner-only file). persisted = json.loads(target.read_text()) assert persisted.get("outer") is True # `out` is the dict the outer mutator returned. assert out is not None # ── timeout ────────────────────────────────────────────────────────── def test_lock_timeout_raises(tmp_path, monkeypatch): """If another thread holds LOCK_EX, write_locked must give up with LockTimeoutError. We patch _TIMEOUT_SEC down to keep the test fast (the real value of 5s × 2 retries would make the test take 10s). """ monkeypatch.setattr(jsonlock, "_TIMEOUT_SEC", 0.1) monkeypatch.setattr(jsonlock, "_POLL_INTERVAL", 0.01) target = tmp_path / "contended.json" target.write_text("{}", encoding="utf-8") lock_path = str(target) + ".lock" holder_acquired = threading.Event() holder_release = threading.Event() def _holder(): fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o644) try: fcntl.flock(fd, fcntl.LOCK_EX) holder_acquired.set() # Hold until the test signals release. holder_release.wait(timeout=5.0) try: fcntl.flock(fd, fcntl.LOCK_UN) except OSError: pass finally: os.close(fd) t = threading.Thread(target=_holder, daemon=True) t.start() assert holder_acquired.wait(timeout=2.0), "holder thread did not acquire lock" try: with pytest.raises(LockTimeoutError): write_locked(str(target), lambda d: {"should": "fail"}) finally: holder_release.set() t.join(timeout=2.0)