zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import itertools
 
import pytest
 
from ... import _core
from ...testing import assert_checkpoints, wait_all_tasks_blocked
 
# Module-level pytest mark, applied to every test in this file: ignore
# warnings of category trio.TrioDeprecationWarning whose message matches
# ".*UnboundedQueue", so that using UnboundedQueue here does not trip the
# warning handling.
pytestmark = pytest.mark.filterwarnings(
    "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning"
)
 
 
async def test_UnboundedQueue_basic():
    """Cover the basic API: put, batch retrieval, size queries, statistics."""
    queue = _core.UnboundedQueue()

    # A single queued item comes back as a one-element batch.
    queue.put_nowait("hi")
    first_batch = await queue.get_batch()
    assert first_batch == ["hi"]

    # Nothing is queued now, so the non-blocking getter must refuse.
    with pytest.raises(_core.WouldBlock):
        queue.get_batch_nowait()

    # Several puts are handed back together, in insertion order.
    for item in (1, 2, 3):
        queue.put_nowait(item)
    assert queue.get_batch_nowait() == [1, 2, 3]

    # Size reporting before and after adding one more item (None is a
    # perfectly valid payload).
    assert queue.empty()
    assert queue.qsize() == 0
    queue.put_nowait(None)
    assert not queue.empty()
    assert queue.qsize() == 1

    # statistics() reports the same size, and no task is waiting.
    snapshot = queue.statistics()
    assert snapshot.qsize == 1
    assert snapshot.tasks_waiting == 0

    # Smoke test: repr() must not raise.
    repr(queue)
 
 
async def test_UnboundedQueue_blocking():
    """Check that a blocked consumer receives batches as items are put.

    The same scenario is run twice: first with a consumer that loops on
    ``get_batch()``, then with one that iterates the queue via ``async for``.
    """
    queue = _core.UnboundedQueue()
    batches = []

    async def consume_via_get_batch():
        while True:
            got = await queue.get_batch()
            assert got
            batches.append(got)

    async def consume_via_aiter():
        async for got in queue:
            assert got
            batches.append(got)

    for consumer in (consume_via_get_batch, consume_via_aiter):
        batches.clear()
        async with _core.open_nursery() as nursery:
            nursery.start_soon(consumer)
            await _core.wait_all_tasks_blocked()

            # The consumer is now waiting on an empty queue.
            snapshot = queue.statistics()
            assert snapshot.qsize == 0
            assert snapshot.tasks_waiting == 1

            # Two puts with no checkpoint in between are expected to be
            # delivered as a single batch...
            for item in (10, 11):
                queue.put_nowait(item)
            await _core.wait_all_tasks_blocked()

            # ...while a put after the consumer has run again forms its own.
            queue.put_nowait(12)
            await _core.wait_all_tasks_blocked()
            assert batches == [[10, 11], [12]]

            # The consumers loop forever; cancel to leave the nursery.
            nursery.cancel_scope.cancel()
 
 
async def test_UnboundedQueue_fairness():
    """Check who gets queued items when readers compete for them."""
    queue = _core.UnboundedQueue()

    # With no other reader present, whatever we put in we can immediately
    # take back out ourselves.
    for item in (1, 2):
        queue.put_nowait(item)
    assert queue.get_batch_nowait() == [1, 2]

    received = None

    async def blocked_getter(source):
        nonlocal received
        received = await source.get_batch()

    # Once a task is already waiting to read, the items belong to it: a
    # non-blocking get from here must fail even though items were just put.
    async with _core.open_nursery() as nursery:
        nursery.start_soon(blocked_getter, queue)
        await _core.wait_all_tasks_blocked()
        for item in (3, 4):
            queue.put_nowait(item)
        with pytest.raises(_core.WouldBlock):
            queue.get_batch_nowait()
    assert received == [3, 4]

    # Two readers looping on the same queue are expected to take turns.
    log = []

    async def reader(label):
        while True:
            batch = await queue.get_batch()
            log.append((label, batch))

    async with _core.open_nursery() as nursery:
        # Start "a" first and let it block before starting "b", so the order
        # in which they wait is deterministic.
        for label in "ab":
            nursery.start_soon(reader, label)
            await _core.wait_all_tasks_blocked()

        # Feed one item at a time, letting the readers settle after each.
        for value in range(20):
            queue.put_nowait(value)
            await _core.wait_all_tasks_blocked()

        nursery.cancel_scope.cancel()

    expected = [
        (label, [value])
        for label, value in zip(itertools.cycle("ab"), range(20))
    ]
    assert log == expected
 
 
async def test_UnboundedQueue_trivial_yields():
    """Run both retrieval paths under ``assert_checkpoints()``.

    In each case an item is already queued before the retrieval starts, so
    the operation has data available immediately.
    """
    queue = _core.UnboundedQueue()

    # Path 1: explicit get_batch() call.
    queue.put_nowait(None)
    with assert_checkpoints():
        await queue.get_batch()

    # Path 2: a single step of async iteration.
    queue.put_nowait(None)
    with assert_checkpoints():
        async for _batch in queue:  # noqa # pragma: no branch
            break
 
 
async def test_UnboundedQueue_no_spurious_wakeups():
    """With two tasks waiting, a burst of puts should wake only one of them.

    The first waiter is expected to receive the entire burst as one batch;
    the second must still be blocked when the check is made.
    """
    woken = []

    async def getter(source, ident):
        batch = await source.get_batch()
        woken.append((ident, batch))

    async with _core.open_nursery() as nursery:
        queue = _core.UnboundedQueue()

        # Start the getters one at a time so that getter 1 is waiting
        # before getter 2 is.
        for ident in (1, 2):
            nursery.start_soon(getter, queue, ident)
            await wait_all_tasks_blocked()

        # Put every item before giving the getters a chance to run.
        for item in range(10):
            queue.put_nowait(item)
        await wait_all_tasks_blocked()

        assert woken == [(1, list(range(10)))]

        # Getter 2 is still blocked; cancel it so the nursery can exit.
        nursery.cancel_scope.cancel()