import itertools
|
|
import pytest
|
|
from ... import _core
|
from ...testing import assert_checkpoints, wait_all_tasks_blocked
|
|
pytestmark = pytest.mark.filterwarnings(
|
"ignore:.*UnboundedQueue:trio.TrioDeprecationWarning"
|
)
|
|
|
async def test_UnboundedQueue_basic():
|
q = _core.UnboundedQueue()
|
q.put_nowait("hi")
|
assert await q.get_batch() == ["hi"]
|
with pytest.raises(_core.WouldBlock):
|
q.get_batch_nowait()
|
q.put_nowait(1)
|
q.put_nowait(2)
|
q.put_nowait(3)
|
assert q.get_batch_nowait() == [1, 2, 3]
|
|
assert q.empty()
|
assert q.qsize() == 0
|
q.put_nowait(None)
|
assert not q.empty()
|
assert q.qsize() == 1
|
|
stats = q.statistics()
|
assert stats.qsize == 1
|
assert stats.tasks_waiting == 0
|
|
# smoke test
|
repr(q)
|
|
|
async def test_UnboundedQueue_blocking():
|
record = []
|
q = _core.UnboundedQueue()
|
|
async def get_batch_consumer():
|
while True:
|
batch = await q.get_batch()
|
assert batch
|
record.append(batch)
|
|
async def aiter_consumer():
|
async for batch in q:
|
assert batch
|
record.append(batch)
|
|
for consumer in (get_batch_consumer, aiter_consumer):
|
record.clear()
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(consumer)
|
await _core.wait_all_tasks_blocked()
|
stats = q.statistics()
|
assert stats.qsize == 0
|
assert stats.tasks_waiting == 1
|
q.put_nowait(10)
|
q.put_nowait(11)
|
await _core.wait_all_tasks_blocked()
|
q.put_nowait(12)
|
await _core.wait_all_tasks_blocked()
|
assert record == [[10, 11], [12]]
|
nursery.cancel_scope.cancel()
|
|
|
async def test_UnboundedQueue_fairness():
|
q = _core.UnboundedQueue()
|
|
# If there's no-one else around, we can put stuff in and take it out
|
# again, no problem
|
q.put_nowait(1)
|
q.put_nowait(2)
|
assert q.get_batch_nowait() == [1, 2]
|
|
result = None
|
|
async def get_batch(q):
|
nonlocal result
|
result = await q.get_batch()
|
|
# But if someone else is waiting to read, then they get dibs
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(get_batch, q)
|
await _core.wait_all_tasks_blocked()
|
q.put_nowait(3)
|
q.put_nowait(4)
|
with pytest.raises(_core.WouldBlock):
|
q.get_batch_nowait()
|
assert result == [3, 4]
|
|
# If two tasks are trying to read, they alternate
|
record = []
|
|
async def reader(name):
|
while True:
|
record.append((name, await q.get_batch()))
|
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(reader, "a")
|
await _core.wait_all_tasks_blocked()
|
nursery.start_soon(reader, "b")
|
await _core.wait_all_tasks_blocked()
|
|
for i in range(20):
|
q.put_nowait(i)
|
await _core.wait_all_tasks_blocked()
|
|
nursery.cancel_scope.cancel()
|
|
assert record == list(zip(itertools.cycle("ab"), [[i] for i in range(20)]))
|
|
|
async def test_UnboundedQueue_trivial_yields():
|
q = _core.UnboundedQueue()
|
|
q.put_nowait(None)
|
with assert_checkpoints():
|
await q.get_batch()
|
|
q.put_nowait(None)
|
with assert_checkpoints():
|
async for _ in q: # noqa # pragma: no branch
|
break
|
|
|
async def test_UnboundedQueue_no_spurious_wakeups():
|
# If we have two tasks waiting, and put two items into the queue... then
|
# only one task wakes up
|
record = []
|
|
async def getter(q, i):
|
got = await q.get_batch()
|
record.append((i, got))
|
|
async with _core.open_nursery() as nursery:
|
q = _core.UnboundedQueue()
|
nursery.start_soon(getter, q, 1)
|
await wait_all_tasks_blocked()
|
nursery.start_soon(getter, q, 2)
|
await wait_all_tasks_blocked()
|
|
for i in range(10):
|
q.put_nowait(i)
|
await wait_all_tasks_blocked()
|
|
assert record == [(1, list(range(10)))]
|
|
nursery.cancel_scope.cancel()
|