import pytest import threading from queue import Queue import time import sys from contextlib import contextmanager from .tutil import slow, gc_collect_harder, disable_threading_excepthook from .. import _thread_cache from .._thread_cache import start_thread_soon, ThreadCache def test_thread_cache_basics(): q = Queue() def fn(): raise RuntimeError("hi") def deliver(outcome): q.put(outcome) start_thread_soon(fn, deliver) outcome = q.get() with pytest.raises(RuntimeError, match="hi"): outcome.unwrap() def test_thread_cache_deref(): res = [False] class del_me: def __call__(self): return 42 def __del__(self): res[0] = True q = Queue() def deliver(outcome): q.put(outcome) start_thread_soon(del_me(), deliver) outcome = q.get() assert outcome.unwrap() == 42 gc_collect_harder() assert res[0] @slow def test_spawning_new_thread_from_deliver_reuses_starting_thread(): # We know that no-one else is using the thread cache, so if we keep # submitting new jobs the instant the previous one is finished, we should # keep getting the same thread over and over. This tests both that the # thread cache is LIFO, and that threads can be assigned new work *before* # deliver exits. # Make sure there are a few threads running, so if we weren't LIFO then we # could grab the wrong one. q = Queue() COUNT = 5 for _ in range(COUNT): start_thread_soon(lambda: time.sleep(1), lambda result: q.put(result)) for _ in range(COUNT): q.get().unwrap() seen_threads = set() done = threading.Event() def deliver(n, _): print(n) seen_threads.add(threading.current_thread()) if n == 0: done.set() else: start_thread_soon(lambda: None, lambda _: deliver(n - 1, _)) start_thread_soon(lambda: None, lambda _: deliver(5, _)) done.wait() assert len(seen_threads) == 1 @slow def test_idle_threads_exit(monkeypatch): # Temporarily set the idle timeout to something tiny, to speed up the # test. (But non-zero, so that the worker loop will at least yield the # CPU.) monkeypatch.setattr(_thread_cache, "IDLE_TIMEOUT", 0.0001) q = Queue() start_thread_soon(lambda: None, lambda _: q.put(threading.current_thread())) seen_thread = q.get() # Since the idle timeout is 0, after sleeping for 1 second, the thread # should have exited time.sleep(1) assert not seen_thread.is_alive() @contextmanager def _join_started_threads(): before = frozenset(threading.enumerate()) try: yield finally: for thread in threading.enumerate(): if thread not in before: thread.join() def test_race_between_idle_exit_and_job_assignment(monkeypatch): # This is a lock where the first few times you try to acquire it with a # timeout, it waits until the lock is available and then pretends to time # out. Using this in our thread cache implementation causes the following # sequence: # # 1. start_thread_soon grabs the worker thread, assigns it a job, and # releases its lock. # 2. The worker thread wakes up (because the lock has been released), but # the JankyLock lies to it and tells it that the lock timed out. So the # worker thread tries to exit. # 3. The worker thread checks for the race between exiting and being # assigned a job, and discovers that it *is* in the process of being # assigned a job, so it loops around and tries to acquire the lock # again. # 4. Eventually the JankyLock admits that the lock is available, and # everything proceeds as normal. class JankyLock: def __init__(self): self._lock = threading.Lock() self._counter = 3 def acquire(self, timeout=None): self._lock.acquire() if timeout is None: return True else: if self._counter > 0: self._counter -= 1 self._lock.release() return False return True def release(self): self._lock.release() monkeypatch.setattr(_thread_cache, "Lock", JankyLock) with disable_threading_excepthook(), _join_started_threads(): tc = ThreadCache() done = threading.Event() tc.start_thread_soon(lambda: None, lambda _: done.set()) done.wait() # Let's kill the thread we started, so it doesn't hang around until the # test suite finishes. Doesn't really do any harm, but it can be confusing # to see it in debug output. This is hacky, and leaves our ThreadCache # object in an inconsistent state... but it doesn't matter, because we're # not going to use it again anyway. tc.start_thread_soon(lambda: None, lambda _: sys.exit())