Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions tests/test_thread_safety.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""Thread-safety contention tests for lock-protected paths.

Intentional OS-thread stress of lock-protected paths; production uses
event-loop cooperative scheduling + snapshot reads.

NOTE: FakePool is not thread-safe. These tests do not call pool methods
concurrently; they only stress ISOProber._stats_lock and Scheduler._health_lock.
"""

from __future__ import annotations

import threading
import time
from unittest.mock import MagicMock

from paperscout.models import CycleStatus
from paperscout.monitor import Scheduler, _compute_probe_success_rate
from paperscout.protocols import SOURCE_ISO_PROBE, SOURCE_WG21_INDEX
from paperscout.sources import ISOProber, WG21Index
from paperscout.storage import ProbeState, UserWatchlist
from tests.conftest import make_test_settings

# Key set that every ISOProber.snapshot_stats() result is compared against.
_STATS_KEYS = frozenset(ISOProber._STATS_TEMPLATE.keys())

# Key set that every Scheduler.health_snapshot() result is compared against.
_HEALTH_SNAPSHOT_KEYS = frozenset(
    (
        "last_updated",
        "poll_count",
        "last_successful_poll",
        "last_cycle_status",
        "last_cycle_error",
        "probe_stats",
        "probe_success_rate",
    )
)

# Seconds allowed when joining worker threads.
THREAD_JOIN_TIMEOUT = 5.0

# Loop counts for the ISOProber stats workers (reset / snapshot / bump).
RESET_ITERATIONS = 1_000
SNAPSHOT_ITERATIONS = 1_000
BUMPER_ITERATIONS = 1_000

# Loop counts for the Scheduler health writer and reader workers.
PUBLISH_ITERATIONS = 500
READ_ITERATIONS = 500


def _make_prober(fake_pool) -> ISOProber:
    """Build an ISOProber backed by ``fake_pool`` and a mocked user watchlist.

    The watchlist mock is spec'd on UserWatchlist and reports an empty set of
    watched paper numbers.
    """
    paper_index = WG21Index(fake_pool)
    probe_state = ProbeState(fake_pool)
    watchlist = MagicMock(spec=UserWatchlist)
    watchlist.get_all_watched_paper_nums.return_value = set()
    return ISOProber(
        paper_index,
        probe_state,
        user_watchlist=watchlist,
        cfg=make_test_settings(),
    )


def _make_scheduler(fake_pool) -> Scheduler:
    """Build a Scheduler wired to two mocked sources and a mocked watchlist.

    The sources are plain MagicMocks carrying the WG21-index and ISO-probe
    source ids; the watchlist mock is spec'd on UserWatchlist and reports no
    matches. Probe state is backed by ``fake_pool``.
    """
    # MagicMock(**kwargs) sets the given attributes on the new mock.
    stub_sources = [
        MagicMock(source_id=source_id)
        for source_id in (SOURCE_WG21_INDEX, SOURCE_ISO_PROBE)
    ]
    watchlist = MagicMock(spec=UserWatchlist)
    watchlist.matches_for_users.return_value = {}
    probe_state = ProbeState(fake_pool)
    return Scheduler(
        sources=stub_sources,
        user_watchlist=watchlist,
        state=probe_state,
        cfg=make_test_settings(),
    )


def _join_threads(threads: list[threading.Thread]) -> None:
    """Join every thread and fail if any of them is still running afterwards.

    All threads share a single deadline of ``THREAD_JOIN_TIMEOUT`` seconds, so
    the total wait stays bounded no matter how many threads are passed. Every
    thread is joined before the assertion runs, so the failure message names
    all stragglers instead of stopping at the first one.

    Args:
        threads: Already-started threads to wait for.

    Raises:
        AssertionError: If at least one thread is still alive once the shared
            deadline has passed.
    """
    deadline = time.monotonic() + THREAD_JOIN_TIMEOUT
    for thread in threads:
        # Clamp at zero: once the deadline has passed, a zero-timeout join
        # still returns immediately for threads that have already finished.
        thread.join(timeout=max(0.0, deadline - time.monotonic()))

    stuck = [thread.name for thread in threads if thread.is_alive()]
    assert not stuck, (
        f"thread(s) {', '.join(stuck)} did not finish within {THREAD_JOIN_TIMEOUT}s"
    )


def _assert_valid_probe_stats(stats: dict[str, int]) -> None:
    """Assert ``stats`` has exactly the expected keys and non-negative int values."""
    assert set(stats) == _STATS_KEYS
    for count in stats.values():
        assert isinstance(count, int) and count >= 0


def _assert_health_snapshot_consistent(snap: dict) -> None:
    """Assert one health snapshot has the expected keys and field types.

    Checks, in order: the exact key set, a non-negative int ``poll_count``, a
    dict of int ``probe_stats``, a ``probe_success_rate`` that equals the value
    recomputed from those stats (and is ``None`` or a float), ``None``-or-str
    status/error/last-successful-poll fields, and a ``last_updated`` that is
    ``None`` or a non-empty str.
    """
    assert set(snap) == _HEALTH_SNAPSHOT_KEYS

    poll_count = snap["poll_count"]
    assert isinstance(poll_count, int) and poll_count >= 0

    probe_stats = snap["probe_stats"]
    assert isinstance(probe_stats, dict)
    for count in probe_stats.values():
        assert isinstance(count, int)

    success_rate = snap["probe_success_rate"]
    # The rate stored in the snapshot must agree with the stats stored beside it.
    assert success_rate == _compute_probe_success_rate(probe_stats)
    assert success_rate is None or isinstance(success_rate, float)

    for key in ("last_cycle_status", "last_cycle_error", "last_successful_poll"):
        value = snap[key]
        assert value is None or isinstance(value, str)

    last_updated = snap["last_updated"]
    assert last_updated is None or (isinstance(last_updated, str) and last_updated != "")


class TestISOProberStatsContention:
    """Intentional OS-thread stress of ISOProber._stats_lock."""

    def test_concurrent_bump_stat_totals(self, fake_pool):
        """Concurrent _bump_stat() calls produce correct totals."""
        prober = _make_prober(fake_pool)
        n_threads = 32
        bumps_per_thread = 100
        barrier = threading.Barrier(n_threads)
        errors: list[Exception] = []
        errors_lock = threading.Lock()

        def worker() -> None:
            try:
                # Bounded wait: if a participant never arrives the barrier
                # breaks and raises instead of parking this thread forever.
                barrier.wait(timeout=THREAD_JOIN_TIMEOUT)
                for _ in range(bumps_per_thread):
                    prober._bump_stat("miss")
            except Exception as exc:
                # Surface worker failures directly rather than only as a
                # mismatched total in the final assertion.
                with errors_lock:
                    errors.append(exc)

        threads = [threading.Thread(target=worker, name=f"bump-{i}") for i in range(n_threads)]
        for thread in threads:
            thread.start()
        _join_threads(threads)
        assert not errors, f"thread errors: {errors!r}"
        assert prober.snapshot_stats()["miss"] == n_threads * bumps_per_thread

    def test_snapshot_stats_consistent_under_concurrent_reset(self, fake_pool):
        """snapshot_stats() concurrent with _reset_stats() returns consistent state."""
        prober = _make_prober(fake_pool)
        errors: list[Exception] = []
        errors_lock = threading.Lock()

        def record_error(exc: Exception) -> None:
            with errors_lock:
                errors.append(exc)

        def resetter() -> None:
            try:
                barrier.wait(timeout=THREAD_JOIN_TIMEOUT)
                for _ in range(RESET_ITERATIONS):
                    prober._reset_stats()
            except Exception as exc:
                record_error(exc)

        def snapshotter() -> None:
            try:
                barrier.wait(timeout=THREAD_JOIN_TIMEOUT)
                for _ in range(SNAPSHOT_ITERATIONS):
                    _assert_valid_probe_stats(prober.snapshot_stats())
            except Exception as exc:
                record_error(exc)

        def bumper() -> None:
            try:
                barrier.wait(timeout=THREAD_JOIN_TIMEOUT)
                for _ in range(BUMPER_ITERATIONS):
                    prober._bump_stat("miss")
            except Exception as exc:
                record_error(exc)

        workers = [
            (resetter, "resetter"),
            (snapshotter, "snapshot-0"),
            (snapshotter, "snapshot-1"),
            (bumper, "bumper-0"),
            (bumper, "bumper-1"),
        ]
        # Size the barrier from the worker list so the two cannot drift apart.
        # The closures above only read `barrier` once their thread is running,
        # which is after this assignment.
        barrier = threading.Barrier(len(workers))
        threads = [threading.Thread(target=target, name=name) for target, name in workers]
        for thread in threads:
            thread.start()
        _join_threads(threads)
        assert not errors, f"thread errors: {errors!r}"


class TestSchedulerHealthContention:
    """Intentional OS-thread stress of Scheduler._health_lock."""

    def test_health_snapshot_consistent_under_concurrent_publish(self, fake_pool):
        """health_snapshot() from non-event-loop threads returns consistent data."""
        scheduler = _make_scheduler(fake_pool)
        # Reset the published fields and publish once before any thread starts.
        scheduler._poll_count = 0
        scheduler._last_probe_stats = {}
        scheduler._last_cycle_status = None
        scheduler._last_successful_poll = None
        scheduler._publish_health_snapshot()

        n_readers = 6
        # One party per reader plus the writer: releasing them together makes
        # the publish and read loops actually overlap instead of depending on
        # thread start-up timing.
        barrier = threading.Barrier(n_readers + 1)
        errors: list[Exception] = []
        errors_lock = threading.Lock()

        def record_error(exc: Exception) -> None:
            with errors_lock:
                errors.append(exc)

        def writer() -> None:
            try:
                # Bounded wait: a missing participant breaks the barrier and
                # raises here instead of blocking forever.
                barrier.wait(timeout=THREAD_JOIN_TIMEOUT)
                # Writer thread is the sole mutator; attribute writes happen-before
                # _publish_health_snapshot() reads on the same thread. _health_lock
                # only guards _health_snapshot assignment vs reader threads.
                for i in range(PUBLISH_ITERATIONS):
                    scheduler._poll_count = i + 1
                    scheduler._last_probe_stats = {
                        "miss": i % 10,
                        "error": i % 3,
                        "hit_recent": i % 5,
                        "hit_old": 0,
                        "hit_no_lm": 0,
                        "skipped_discovered": 0,
                        "skipped_in_index": 0,
                    }
                    scheduler._last_cycle_status = (
                        CycleStatus.SUCCESS if i % 2 == 0 else CycleStatus.EMPTY
                    )
                    scheduler._last_successful_poll = time.time()
                    scheduler._publish_health_snapshot()
            except Exception as exc:
                record_error(exc)

        def reader() -> None:
            try:
                barrier.wait(timeout=THREAD_JOIN_TIMEOUT)
                for _ in range(READ_ITERATIONS):
                    _assert_health_snapshot_consistent(scheduler.health_snapshot())
            except Exception as exc:
                record_error(exc)

        writer_thread = threading.Thread(target=writer, name="writer")
        reader_threads = [
            threading.Thread(target=reader, name=f"reader-{i}") for i in range(n_readers)
        ]
        writer_thread.start()
        for thread in reader_threads:
            thread.start()
        _join_threads([writer_thread, *reader_threads])
        assert not errors, f"thread errors: {errors!r}"