zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
import pytest
 
from ..testing import wait_all_tasks_blocked, assert_checkpoints
import trio
from trio import open_memory_channel, EndOfChannel
 
 
async def test_channel():
    with pytest.raises(TypeError):
        open_memory_channel(1.0)
    with pytest.raises(ValueError):
        open_memory_channel(-1)
 
    s, r = open_memory_channel(2)
    repr(s)  # smoke test
    repr(r)  # smoke test
 
    s.send_nowait(1)
    with assert_checkpoints():
        await s.send(2)
    with pytest.raises(trio.WouldBlock):
        s.send_nowait(None)
 
    with assert_checkpoints():
        assert await r.receive() == 1
    assert r.receive_nowait() == 2
    with pytest.raises(trio.WouldBlock):
        r.receive_nowait()
 
    s.send_nowait("last")
    await s.aclose()
    with pytest.raises(trio.ClosedResourceError):
        await s.send("too late")
    with pytest.raises(trio.ClosedResourceError):
        s.send_nowait("too late")
    with pytest.raises(trio.ClosedResourceError):
        s.clone()
    await s.aclose()
 
    assert r.receive_nowait() == "last"
    with pytest.raises(EndOfChannel):
        await r.receive()
    await r.aclose()
    with pytest.raises(trio.ClosedResourceError):
        await r.receive()
    with pytest.raises(trio.ClosedResourceError):
        await r.receive_nowait()
    await r.aclose()
 
 
async def test_553(autojump_clock):
    s, r = open_memory_channel(1)
    with trio.move_on_after(10) as timeout_scope:
        await r.receive()
    assert timeout_scope.cancelled_caught
    await s.send("Test for PR #553")
 
 
async def test_channel_multiple_producers():
    async def producer(send_channel, i):
        # We close our handle when we're done with it
        async with send_channel:
            for j in range(3 * i, 3 * (i + 1)):
                await send_channel.send(j)
 
    send_channel, receive_channel = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        # We hand out clones to all the new producers, and then close the
        # original.
        async with send_channel:
            for i in range(10):
                nursery.start_soon(producer, send_channel.clone(), i)
 
        got = []
        async for value in receive_channel:
            got.append(value)
 
        got.sort()
        assert got == list(range(30))
 
 
async def test_channel_multiple_consumers():
    successful_receivers = set()
    received = []
 
    async def consumer(receive_channel, i):
        async for value in receive_channel:
            successful_receivers.add(i)
            received.append(value)
 
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(1)
        async with send_channel:
            for i in range(5):
                nursery.start_soon(consumer, receive_channel, i)
            await wait_all_tasks_blocked()
            for i in range(10):
                await send_channel.send(i)
 
    assert successful_receivers == set(range(5))
    assert len(received) == 10
    assert set(received) == set(range(10))
 
 
async def test_close_basics():
    async def send_block(s, expect):
        with pytest.raises(expect):
            await s.send(None)
 
    # closing send -> other send gets ClosedResourceError
    s, r = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_block, s, trio.ClosedResourceError)
        await wait_all_tasks_blocked()
        await s.aclose()
 
    # and it's persistent
    with pytest.raises(trio.ClosedResourceError):
        s.send_nowait(None)
    with pytest.raises(trio.ClosedResourceError):
        await s.send(None)
 
    # and receive gets EndOfChannel
    with pytest.raises(EndOfChannel):
        r.receive_nowait()
    with pytest.raises(EndOfChannel):
        await r.receive()
 
    # closing receive -> send gets BrokenResourceError
    s, r = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_block, s, trio.BrokenResourceError)
        await wait_all_tasks_blocked()
        await r.aclose()
 
    # and it's persistent
    with pytest.raises(trio.BrokenResourceError):
        s.send_nowait(None)
    with pytest.raises(trio.BrokenResourceError):
        await s.send(None)
 
    # closing receive -> other receive gets ClosedResourceError
    async def receive_block(r):
        with pytest.raises(trio.ClosedResourceError):
            await r.receive()
 
    s, r = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(receive_block, r)
        await wait_all_tasks_blocked()
        await r.aclose()
 
    # and it's persistent
    with pytest.raises(trio.ClosedResourceError):
        r.receive_nowait()
    with pytest.raises(trio.ClosedResourceError):
        await r.receive()
 
 
async def test_close_sync():
    async def send_block(s, expect):
        with pytest.raises(expect):
            await s.send(None)
 
    # closing send -> other send gets ClosedResourceError
    s, r = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_block, s, trio.ClosedResourceError)
        await wait_all_tasks_blocked()
        s.close()
 
    # and it's persistent
    with pytest.raises(trio.ClosedResourceError):
        s.send_nowait(None)
    with pytest.raises(trio.ClosedResourceError):
        await s.send(None)
 
    # and receive gets EndOfChannel
    with pytest.raises(EndOfChannel):
        r.receive_nowait()
    with pytest.raises(EndOfChannel):
        await r.receive()
 
    # closing receive -> send gets BrokenResourceError
    s, r = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_block, s, trio.BrokenResourceError)
        await wait_all_tasks_blocked()
        r.close()
 
    # and it's persistent
    with pytest.raises(trio.BrokenResourceError):
        s.send_nowait(None)
    with pytest.raises(trio.BrokenResourceError):
        await s.send(None)
 
    # closing receive -> other receive gets ClosedResourceError
    async def receive_block(r):
        with pytest.raises(trio.ClosedResourceError):
            await r.receive()
 
    s, r = open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(receive_block, r)
        await wait_all_tasks_blocked()
        r.close()
 
    # and it's persistent
    with pytest.raises(trio.ClosedResourceError):
        r.receive_nowait()
    with pytest.raises(trio.ClosedResourceError):
        await r.receive()
 
 
async def test_receive_channel_clone_and_close():
    s, r = open_memory_channel(10)
 
    r2 = r.clone()
    r3 = r.clone()
 
    s.send_nowait(None)
    await r.aclose()
    with r2:
        pass
 
    with pytest.raises(trio.ClosedResourceError):
        r.clone()
 
    with pytest.raises(trio.ClosedResourceError):
        r2.clone()
 
    # Can still send, r3 is still open
    s.send_nowait(None)
 
    await r3.aclose()
 
    # But now the receiver is really closed
    with pytest.raises(trio.BrokenResourceError):
        s.send_nowait(None)
 
 
async def test_close_multiple_send_handles():
    # With multiple send handles, closing one handle only wakes senders on
    # that handle, but others can continue just fine
    s1, r = open_memory_channel(0)
    s2 = s1.clone()
 
    async def send_will_close():
        with pytest.raises(trio.ClosedResourceError):
            await s1.send("nope")
 
    async def send_will_succeed():
        await s2.send("ok")
 
    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_will_close)
        nursery.start_soon(send_will_succeed)
        await wait_all_tasks_blocked()
        await s1.aclose()
        assert await r.receive() == "ok"
 
 
async def test_close_multiple_receive_handles():
    # With multiple receive handles, closing one handle only wakes receivers on
    # that handle, but others can continue just fine
    s, r1 = open_memory_channel(0)
    r2 = r1.clone()
 
    async def receive_will_close():
        with pytest.raises(trio.ClosedResourceError):
            await r1.receive()
 
    async def receive_will_succeed():
        assert await r2.receive() == "ok"
 
    async with trio.open_nursery() as nursery:
        nursery.start_soon(receive_will_close)
        nursery.start_soon(receive_will_succeed)
        await wait_all_tasks_blocked()
        await r1.aclose()
        await s.send("ok")
 
 
async def test_inf_capacity():
    s, r = open_memory_channel(float("inf"))
 
    # It's accepted, and we can send all day without blocking
    with s:
        for i in range(10):
            s.send_nowait(i)
 
    got = []
    async for i in r:
        got.append(i)
    assert got == list(range(10))
 
 
async def test_statistics():
    s, r = open_memory_channel(2)
 
    assert s.statistics() == r.statistics()
    stats = s.statistics()
    assert stats.current_buffer_used == 0
    assert stats.max_buffer_size == 2
    assert stats.open_send_channels == 1
    assert stats.open_receive_channels == 1
    assert stats.tasks_waiting_send == 0
    assert stats.tasks_waiting_receive == 0
 
    s.send_nowait(None)
    assert s.statistics().current_buffer_used == 1
 
    s2 = s.clone()
    assert s.statistics().open_send_channels == 2
    await s.aclose()
    assert s2.statistics().open_send_channels == 1
 
    r2 = r.clone()
    assert s2.statistics().open_receive_channels == 2
    await r2.aclose()
    assert s2.statistics().open_receive_channels == 1
 
    async with trio.open_nursery() as nursery:
        s2.send_nowait(None)  # fill up the buffer
        assert s.statistics().current_buffer_used == 2
        nursery.start_soon(s2.send, None)
        nursery.start_soon(s2.send, None)
        await wait_all_tasks_blocked()
        assert s.statistics().tasks_waiting_send == 2
        nursery.cancel_scope.cancel()
    assert s.statistics().tasks_waiting_send == 0
 
    # empty out the buffer again
    try:
        while True:
            r.receive_nowait()
    except trio.WouldBlock:
        pass
 
    async with trio.open_nursery() as nursery:
        nursery.start_soon(r.receive)
        await wait_all_tasks_blocked()
        assert s.statistics().tasks_waiting_receive == 1
        nursery.cancel_scope.cancel()
    assert s.statistics().tasks_waiting_receive == 0
 
 
async def test_channel_fairness():
 
    # We can remove an item we just sent, and send an item back in after, if
    # no-one else is waiting.
    s, r = open_memory_channel(1)
    s.send_nowait(1)
    assert r.receive_nowait() == 1
    s.send_nowait(2)
    assert r.receive_nowait() == 2
 
    # But if someone else is waiting to receive, then they "own" the item we
    # send, so we can't receive it (even though we run first):
 
    result = None
 
    async def do_receive(r):
        nonlocal result
        result = await r.receive()
 
    async with trio.open_nursery() as nursery:
        nursery.start_soon(do_receive, r)
        await wait_all_tasks_blocked()
        s.send_nowait(2)
        with pytest.raises(trio.WouldBlock):
            r.receive_nowait()
    assert result == 2
 
    # And the analogous situation for send: if we free up a space, we can't
    # immediately send something in it if someone is already waiting to do
    # that
    s, r = open_memory_channel(1)
    s.send_nowait(1)
    with pytest.raises(trio.WouldBlock):
        s.send_nowait(None)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(s.send, 2)
        await wait_all_tasks_blocked()
        assert r.receive_nowait() == 1
        with pytest.raises(trio.WouldBlock):
            s.send_nowait(3)
        assert (await r.receive()) == 2
 
 
async def test_unbuffered():
    s, r = open_memory_channel(0)
    with pytest.raises(trio.WouldBlock):
        r.receive_nowait()
    with pytest.raises(trio.WouldBlock):
        s.send_nowait(1)
 
    async def do_send(s, v):
        with assert_checkpoints():
            await s.send(v)
 
    async with trio.open_nursery() as nursery:
        nursery.start_soon(do_send, s, 1)
        with assert_checkpoints():
            assert await r.receive() == 1
    with pytest.raises(trio.WouldBlock):
        r.receive_nowait()