From ebbdcd542bbcb16f726f22b9376d9854ef9b65b9 Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 25 Jun 2026 02:23:22 +0800 Subject: [PATCH 1/4] Add thread-safety contention tests for ISOProber stats and Scheduler health snapshots --- tests/test_thread_safety.py | 206 ++++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 tests/test_thread_safety.py diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py new file mode 100644 index 0000000..1109a88 --- /dev/null +++ b/tests/test_thread_safety.py @@ -0,0 +1,206 @@ +"""Thread-safety contention tests for lock-protected paths. + +Intentional OS-thread stress of lock-protected paths; production uses +event-loop cooperative scheduling + snapshot reads. +""" + +from __future__ import annotations + +import threading +import time +from unittest.mock import MagicMock + +from paperscout.models import CycleStatus +from paperscout.monitor import Scheduler, _compute_probe_success_rate +from paperscout.protocols import SOURCE_ISO_PROBE, SOURCE_WG21_INDEX +from paperscout.sources import ISOProber, WG21Index +from paperscout.storage import ProbeState, UserWatchlist +from tests.conftest import make_test_settings + +_STATS_KEYS = frozenset(ISOProber._STATS_TEMPLATE.keys()) +_HEALTH_SNAPSHOT_KEYS = frozenset( + { + "last_updated", + "poll_count", + "last_successful_poll", + "last_cycle_status", + "last_cycle_error", + "probe_stats", + "probe_success_rate", + } +) + +THREAD_JOIN_TIMEOUT = 5.0 +RESET_ITERATIONS = 1000 +SNAPSHOT_ITERATIONS = 1000 +BUMPER_ITERATIONS = 1000 +PUBLISH_ITERATIONS = 500 +READ_ITERATIONS = 500 + + +def _make_prober(fake_pool) -> ISOProber: + index = WG21Index(fake_pool) + state = ProbeState(fake_pool) + wl = MagicMock(spec=UserWatchlist) + wl.get_all_watched_paper_nums.return_value = set() + return ISOProber(index, state, user_watchlist=wl, cfg=make_test_settings()) + + +def _make_scheduler(fake_pool) -> Scheduler: + wg21 = MagicMock() + wg21.source_id = SOURCE_WG21_INDEX + iso = MagicMock() + iso.source_id = SOURCE_ISO_PROBE + user_watchlist = MagicMock(spec=UserWatchlist) + user_watchlist.matches_for_users.return_value = {} + state = ProbeState(fake_pool) + return Scheduler( + sources=[wg21, iso], + user_watchlist=user_watchlist, + state=state, + cfg=make_test_settings(), + ) + + +def _join_threads(threads: list[threading.Thread]) -> None: + for thread in threads: + thread.join(timeout=THREAD_JOIN_TIMEOUT) + assert not thread.is_alive(), f"thread {thread.name} did not finish within {THREAD_JOIN_TIMEOUT}s" + + +def _assert_valid_probe_stats(stats: dict[str, int]) -> None: + assert set(stats.keys()) == _STATS_KEYS + assert all(isinstance(v, int) and v >= 0 for v in stats.values()) + + +def _assert_health_snapshot_consistent(snap: dict) -> None: + assert set(snap.keys()) == _HEALTH_SNAPSHOT_KEYS + assert isinstance(snap["poll_count"], int) and snap["poll_count"] >= 0 + assert isinstance(snap["probe_stats"], dict) + assert all(isinstance(v, int) for v in snap["probe_stats"].values()) + assert snap["probe_success_rate"] == _compute_probe_success_rate(snap["probe_stats"]) + last_updated = snap["last_updated"] + assert last_updated is None or (isinstance(last_updated, str) and len(last_updated) > 0) + + +class TestISOProberStatsContention: + """Intentional OS-thread stress of ISOProber._stats_lock.""" + + def test_concurrent_bump_stat_totals(self, fake_pool): + """Concurrent _bump_stat() calls produce correct totals.""" + prober = _make_prober(fake_pool) + n_threads = 32 + bumps_per_thread = 100 + barrier = threading.Barrier(n_threads) + + def worker() -> None: + barrier.wait() + for _ in range(bumps_per_thread): + prober._bump_stat("miss") + + threads = [threading.Thread(target=worker, name=f"bump-{i}") for i in range(n_threads)] + for thread in threads: + thread.start() + _join_threads(threads) + assert prober.snapshot_stats()["miss"] == n_threads * bumps_per_thread + + def test_snapshot_stats_consistent_under_concurrent_reset(self, fake_pool): + """snapshot_stats() concurrent with _reset_stats() returns consistent state.""" + prober = _make_prober(fake_pool) + errors: list[BaseException] = [] + errors_lock = threading.Lock() + + def record_error(exc: BaseException) -> None: + with errors_lock: + errors.append(exc) + + def resetter() -> None: + try: + for _ in range(RESET_ITERATIONS): + prober._reset_stats() + except BaseException as exc: + record_error(exc) + + def snapshotter() -> None: + try: + for _ in range(SNAPSHOT_ITERATIONS): + _assert_valid_probe_stats(prober.snapshot_stats()) + except BaseException as exc: + record_error(exc) + + def bumper() -> None: + try: + for _ in range(BUMPER_ITERATIONS): + prober._bump_stat("miss") + except BaseException as exc: + record_error(exc) + + threads = [ + threading.Thread(target=resetter, name="resetter"), + threading.Thread(target=snapshotter, name="snapshot-0"), + threading.Thread(target=snapshotter, name="snapshot-1"), + threading.Thread(target=bumper, name="bumper-0"), + threading.Thread(target=bumper, name="bumper-1"), + ] + for thread in threads: + thread.start() + _join_threads(threads) + assert not errors, f"thread errors: {errors!r}" + + +class TestSchedulerHealthContention: + """Intentional OS-thread stress of Scheduler._health_lock.""" + + def test_health_snapshot_consistent_under_concurrent_publish(self, fake_pool): + """health_snapshot() from non-event-loop threads returns consistent data.""" + scheduler = _make_scheduler(fake_pool) + scheduler._poll_count = 0 + scheduler._last_probe_stats = {} + scheduler._last_cycle_status = None + scheduler._last_successful_poll = None + scheduler._publish_health_snapshot() + + errors: list[BaseException] = [] + errors_lock = threading.Lock() + + def record_error(exc: BaseException) -> None: + with errors_lock: + errors.append(exc) + + def writer() -> None: + try: + for i in range(PUBLISH_ITERATIONS): + scheduler._poll_count = i + 1 + scheduler._last_probe_stats = { + "miss": i % 10, + "error": i % 3, + "hit_recent": i % 5, + "hit_old": 0, + "hit_no_lm": 0, + "skipped_discovered": 0, + "skipped_in_index": 0, + } + scheduler._last_cycle_status = ( + CycleStatus.SUCCESS if i % 2 == 0 else CycleStatus.EMPTY + ) + scheduler._last_successful_poll = time.time() + scheduler._publish_health_snapshot() + except BaseException as exc: + record_error(exc) + + def reader() -> None: + try: + for _ in range(READ_ITERATIONS): + _assert_health_snapshot_consistent(scheduler.health_snapshot()) + except BaseException as exc: + record_error(exc) + + writer_thread = threading.Thread(target=writer, name="writer") + reader_threads = [ + threading.Thread(target=reader, name=f"reader-{i}") for i in range(6) + ] + writer_thread.start() + for thread in reader_threads: + thread.start() + _join_threads([writer_thread, *reader_threads]) + assert not errors, f"thread errors: {errors!r}" From f7bea58fbfd2bdfb276a8fc82fec951f0582a12c Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 25 Jun 2026 02:26:01 +0800 Subject: [PATCH 2/4] fixed lint error --- tests/test_thread_safety.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py index 1109a88..1a23d8c 100644 --- a/tests/test_thread_safety.py +++ b/tests/test_thread_safety.py @@ -65,7 +65,9 @@ def _make_scheduler(fake_pool) -> Scheduler: def _join_threads(threads: list[threading.Thread]) -> None: for thread in threads: thread.join(timeout=THREAD_JOIN_TIMEOUT) - assert not thread.is_alive(), f"thread {thread.name} did not finish within {THREAD_JOIN_TIMEOUT}s" + assert not thread.is_alive(), ( + f"thread {thread.name} did not finish within {THREAD_JOIN_TIMEOUT}s" + ) def _assert_valid_probe_stats(stats: dict[str, int]) -> None: @@ -196,9 +198,7 @@ def reader() -> None: record_error(exc) writer_thread = threading.Thread(target=writer, name="writer") - reader_threads = [ - threading.Thread(target=reader, name=f"reader-{i}") for i in range(6) - ] + reader_threads = [threading.Thread(target=reader, name=f"reader-{i}") for i in range(6)] writer_thread.start() for thread in reader_threads: thread.start() From 01a6b1e81973c61bcdf347d3f08092a2868f6fd2 Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 25 Jun 2026 02:30:16 +0800 Subject: [PATCH 3/4] fixed exception processing --- tests/test_thread_safety.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py index 1a23d8c..d01343a 100644 --- a/tests/test_thread_safety.py +++ b/tests/test_thread_safety.py @@ -109,10 +109,10 @@ def worker() -> None: def test_snapshot_stats_consistent_under_concurrent_reset(self, fake_pool): """snapshot_stats() concurrent with _reset_stats() returns consistent state.""" prober = _make_prober(fake_pool) - errors: list[BaseException] = [] + errors: list[Exception] = [] errors_lock = threading.Lock() - def record_error(exc: BaseException) -> None: + def record_error(exc: Exception) -> None: with errors_lock: errors.append(exc) @@ -120,21 +120,21 @@ def resetter() -> None: try: for _ in range(RESET_ITERATIONS): prober._reset_stats() - except BaseException as exc: + except Exception as exc: record_error(exc) def snapshotter() -> None: try: for _ in range(SNAPSHOT_ITERATIONS): _assert_valid_probe_stats(prober.snapshot_stats()) - except BaseException as exc: + except Exception as exc: record_error(exc) def bumper() -> None: try: for _ in range(BUMPER_ITERATIONS): prober._bump_stat("miss") - except BaseException as exc: + except Exception as exc: record_error(exc) threads = [ @@ -162,10 +162,10 @@ def test_health_snapshot_consistent_under_concurrent_publish(self, fake_pool): scheduler._last_successful_poll = None scheduler._publish_health_snapshot() - errors: list[BaseException] = [] + errors: list[Exception] = [] errors_lock = threading.Lock() - def record_error(exc: BaseException) -> None: + def record_error(exc: Exception) -> None: with errors_lock: errors.append(exc) @@ -187,14 +187,14 @@ def writer() -> None: ) scheduler._last_successful_poll = time.time() scheduler._publish_health_snapshot() - except BaseException as exc: + except Exception as exc: record_error(exc) def reader() -> None: try: for _ in range(READ_ITERATIONS): _assert_health_snapshot_consistent(scheduler.health_snapshot()) - except BaseException as exc: + except Exception as exc: record_error(exc) writer_thread = threading.Thread(target=writer, name="writer") From b4f5a925eb59481661548f3f9e50cf5937a83d17 Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 25 Jun 2026 02:50:52 +0800 Subject: [PATCH 4/4] - Added threading.Barrier(5) with barrier.wait() in all five workers so reset/snapshot/bump threads start concurrently. - Extended _assert_health_snapshot_consistent with str/None checks for last_cycle_status, last_cycle_error, and last_successful_poll, plus a float check for probe_success_rate (using str for status, not CycleStatus, matching production). - Documented that same-thread attribute writes happen-before _publish_health_snapshot() reads; _health_lock only guards cross-thread snapshot access. - Added module docstring NOTE that FakePool is not thread-safe and these tests don't call pool methods concurrently. --- tests/test_thread_safety.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py index d01343a..5005806 100644 --- a/tests/test_thread_safety.py +++ b/tests/test_thread_safety.py @@ -2,6 +2,9 @@ Intentional OS-thread stress of lock-protected paths; production uses event-loop cooperative scheduling + snapshot reads. + +NOTE: FakePool is not thread-safe. These tests do not call pool methods +concurrently; they only stress ISOProber._stats_lock and Scheduler._health_lock. """ from __future__ import annotations @@ -81,6 +84,10 @@ def _assert_health_snapshot_consistent(snap: dict) -> None: assert isinstance(snap["probe_stats"], dict) assert all(isinstance(v, int) for v in snap["probe_stats"].values()) assert snap["probe_success_rate"] == _compute_probe_success_rate(snap["probe_stats"]) + assert snap["probe_success_rate"] is None or isinstance(snap["probe_success_rate"], float) + assert snap["last_cycle_status"] is None or isinstance(snap["last_cycle_status"], str) + assert snap["last_cycle_error"] is None or isinstance(snap["last_cycle_error"], str) + assert snap["last_successful_poll"] is None or isinstance(snap["last_successful_poll"], str) last_updated = snap["last_updated"] assert last_updated is None or (isinstance(last_updated, str) and len(last_updated) > 0) @@ -111,6 +118,8 @@ def test_snapshot_stats_consistent_under_concurrent_reset(self, fake_pool): prober = _make_prober(fake_pool) errors: list[Exception] = [] errors_lock = threading.Lock() + n_threads = 5 + barrier = threading.Barrier(n_threads) def record_error(exc: Exception) -> None: with errors_lock: @@ -118,6 +127,7 @@ def record_error(exc: Exception) -> None: def resetter() -> None: try: + barrier.wait() for _ in range(RESET_ITERATIONS): prober._reset_stats() except Exception as exc: @@ -125,6 +135,7 @@ def resetter() -> None: def snapshotter() -> None: try: + barrier.wait() for _ in range(SNAPSHOT_ITERATIONS): _assert_valid_probe_stats(prober.snapshot_stats()) except Exception as exc: @@ -132,6 +143,7 @@ def snapshotter() -> None: def bumper() -> None: try: + barrier.wait() for _ in range(BUMPER_ITERATIONS): prober._bump_stat("miss") except Exception as exc: @@ -171,6 +183,9 @@ def record_error(exc: Exception) -> None: def writer() -> None: try: + # Writer thread is the sole mutator; attribute writes happen-before + # _publish_health_snapshot() reads on the same thread. _health_lock + # only guards _health_snapshot assignment vs reader threads. for i in range(PUBLISH_ITERATIONS): scheduler._poll_count = i + 1 scheduler._last_probe_stats = {