1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
# Generic stream tests
 
from contextlib import contextmanager
import random
 
from .. import _core
from .._highlevel_generic import aclose_forcefully
from .._abc import SendStream, ReceiveStream, Stream, HalfCloseableStream
from ._checkpoints import assert_checkpoints
 
 
class _ForceCloseBoth:
    """Async context manager around a pair of closeable resources.

    Entering yields the pair (as a list, so it can be unpacked with
    ``as (a, b)``); leaving forcefully closes both members, whether or not
    the body raised.
    """

    def __init__(self, both):
        # Materialise the argument so that one-shot iterables (for example
        # the result of ``reversed(...)``) can be indexed later on.
        self._both = [*both]

    async def __aenter__(self):
        pair = self._both
        return pair

    async def __aexit__(self, *exc_details):
        # The second close sits in ``finally`` so that it is still attempted
        # when closing the first resource raises.
        resources = self._both
        try:
            await aclose_forcefully(resources[0])
        finally:
            await aclose_forcefully(resources[1])
 
 
@contextmanager
def _assert_raises(exc):
    """Context manager asserting that its body raises ``exc``.

    ``exc`` is used directly in an ``except`` clause, so it may be a single
    exception class or a tuple of classes. A matching exception is swallowed;
    any other exception propagates unchanged; a body that finishes without
    raising results in an :class:`AssertionError`.
    """
    __tracebackhide__ = True
    try:
        yield
    except exc:
        # The expected failure happened: nothing more to check.
        return
    raise AssertionError("expected exception: {}".format(exc))
 
 
async def check_one_way_stream(stream_maker, clogged_stream_maker):
    """Perform a number of generic tests on a custom one-way stream
    implementation.

    The function runs a sequence of independent scenarios. Each scenario
    obtains a fresh pair from one of the maker functions and wraps it in
    ``_ForceCloseBoth``, so both ends are forcefully closed when the scenario
    exits, including when it exits with an exception.

    Args:
      stream_maker: An async (!) function which returns a connected
          (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`)
          pair.
      clogged_stream_maker: Either None, or an async function similar to
          stream_maker, but with the extra property that the returned stream
          is in a state where ``send_all`` and
          ``wait_send_all_might_not_block`` will block until ``receive_some``
          has been called. This allows for more thorough testing of some edge
          cases, especially around ``wait_send_all_might_not_block``. When it
          is None, the scenarios that need a clogged stream are skipped.

    Raises:
      AssertionError: if a test fails.

    """
    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        assert isinstance(s, SendStream)
        assert isinstance(r, ReceiveStream)

        # The helpers below refer to ``s`` and ``r`` as free variables of this
        # function. Later scenarios rebind ``s`` and ``r`` in their own
        # ``async with ... as (s, r)`` and keep using these helpers, which then
        # operate on the newly bound pair.

        # Send ``data`` under assert_checkpoints() and check that send_all
        # returns None.
        async def do_send_all(data):
            with assert_checkpoints():
                assert await s.send_all(data) is None

        # Call receive_some under assert_checkpoints() and hand back the data.
        async def do_receive_some(*args):
            with assert_checkpoints():
                return await r.receive_some(*args)

        # Receive at most one byte and compare it against ``expected``.
        async def checked_receive_1(expected):
            assert await do_receive_some(1) == expected

        # Gracefully close ``resource`` under assert_checkpoints().
        async def do_aclose(resource):
            with assert_checkpoints():
                await resource.aclose()

        # Simple sending/receiving
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, b"x")
            nursery.start_soon(checked_receive_1, b"x")

        async def send_empty_then_y():
            # Streams should tolerate sending b"" without giving it any
            # special meaning.
            await do_send_all(b"")
            await do_send_all(b"y")

        async with _core.open_nursery() as nursery:
            nursery.start_soon(send_empty_then_y)
            nursery.start_soon(checked_receive_1, b"y")

        # ---- Checking various argument types ----

        # send_all accepts bytearray and memoryview
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, bytearray(b"1"))
            nursery.start_soon(checked_receive_1, b"1")

        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, memoryview(b"2"))
            nursery.start_soon(checked_receive_1, b"2")

        # max_bytes must be a positive integer
        with _assert_raises(ValueError):
            await r.receive_some(-1)
        with _assert_raises(ValueError):
            await r.receive_some(0)
        with _assert_raises(TypeError):
            await r.receive_some(1.5)
        # it can also be missing or None
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, b"x")
            assert await do_receive_some() == b"x"
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, b"x")
            assert await do_receive_some(None) == b"x"

        # Two receive_some calls running at the same time on the same stream
        # must be rejected with BusyResourceError.
        with _assert_raises(_core.BusyResourceError):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(do_receive_some, 1)
                nursery.start_soon(do_receive_some, 1)

        # Method always has to exist, and an empty stream with a blocked
        # receive_some should *always* allow send_all. (Technically it's legal
        # for send_all to wait until receive_some is called to run, though; a
        # stream doesn't *have* to have any internal buffering. That's why we
        # start a concurrent receive_some call, then cancel it.)
        async def simple_check_wait_send_all_might_not_block(scope):
            with assert_checkpoints():
                await s.wait_send_all_might_not_block()
            # Cancelling the nursery's scope stops the sibling receive_some.
            scope.cancel()

        async with _core.open_nursery() as nursery:
            nursery.start_soon(
                simple_check_wait_send_all_might_not_block, nursery.cancel_scope
            )
            nursery.start_soon(do_receive_some, 1)

        # closing the r side leads to BrokenResourceError on the s side
        # (eventually)
        async def expect_broken_stream_on_send():
            with _assert_raises(_core.BrokenResourceError):
                # Keep sending until the failure shows up.
                while True:
                    await do_send_all(b"x" * 100)

        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_broken_stream_on_send)
            nursery.start_soon(do_aclose, r)

        # once detected, the stream stays broken
        with _assert_raises(_core.BrokenResourceError):
            await do_send_all(b"x" * 100)

        # r closed -> ClosedResourceError on the receive side
        with _assert_raises(_core.ClosedResourceError):
            await do_receive_some(4096)

        # we can close the same stream repeatedly, it's fine
        await do_aclose(r)
        await do_aclose(r)

        # closing the sender side
        await do_aclose(s)

        # now trying to send raises ClosedResourceError
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"x" * 100)

        # even if it's an empty send
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"")

        # ditto for wait_send_all_might_not_block
        with _assert_raises(_core.ClosedResourceError):
            with assert_checkpoints():
                await s.wait_send_all_might_not_block()

        # and again, repeated closing is fine
        await do_aclose(s)
        await do_aclose(s)

    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        # if send-then-graceful-close, receiver gets data then b""
        async def send_then_close():
            await do_send_all(b"y")
            await do_aclose(s)

        async def receive_send_then_close():
            # We want to make sure that if the sender closes the stream before
            # we read anything, then we still get all the data. But some
            # streams might block on the do_send_all call. So we let the
            # sender get as far as it can, then we receive.
            await _core.wait_all_tasks_blocked()
            await checked_receive_1(b"y")
            await checked_receive_1(b"")
            await do_aclose(r)

        async with _core.open_nursery() as nursery:
            nursery.start_soon(send_then_close)
            nursery.start_soon(receive_send_then_close)

    # Receiver forcefully closed before anything is sent: the sender must
    # eventually get BrokenResourceError, and the closed receiver itself must
    # raise ClosedResourceError.
    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        await aclose_forcefully(r)

        with _assert_raises(_core.BrokenResourceError):
            while True:
                await do_send_all(b"x" * 100)

        with _assert_raises(_core.ClosedResourceError):
            await do_receive_some(4096)

    # Sender forcefully closed: sending on it must raise ClosedResourceError.
    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        await aclose_forcefully(s)

        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"123")

        # after the sender does a forceful close, the receiver might either
        # get BrokenResourceError or a clean b""; either is OK. Not OK would be
        # if it freezes, or returns data.
        try:
            await checked_receive_1(b"")
        except _core.BrokenResourceError:
            pass

    # cancelled aclose still closes
    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        # Each aclose() runs inside a scope that was cancelled beforehand.
        with _core.CancelScope() as scope:
            scope.cancel()
            await r.aclose()

        with _core.CancelScope() as scope:
            scope.cancel()
            await s.aclose()

        # Both ends must nevertheless report themselves as closed.
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"123")

        with _assert_raises(_core.ClosedResourceError):
            await do_receive_some(4096)

    # Check that we can still gracefully close a stream after an operation has
    # been cancelled. This can be challenging if cancellation can leave the
    # stream internals in an inconsistent state, e.g. for
    # SSLStream. Unfortunately this test isn't very thorough; the really
    # challenging case for something like SSLStream is it gets cancelled
    # *while* it's sending data on the underlying, not before. But testing
    # that requires some special-case handling of the particular stream setup;
    # we can't do it here. Maybe we could do a bit better with
    #     https://github.com/python-trio/trio/issues/77
    async with _ForceCloseBoth(await stream_maker()) as (s, r):

        # Run ``afn(*args)`` and require that it raises Cancelled.
        async def expect_cancelled(afn, *args):
            with _assert_raises(_core.Cancelled):
                await afn(*args)

        # The scope is cancelled before the nursery starts, so both operations
        # begin in an already-cancelled context.
        with _core.CancelScope() as scope:
            scope.cancel()
            async with _core.open_nursery() as nursery:
                nursery.start_soon(expect_cancelled, do_send_all, b"x")
                nursery.start_soon(expect_cancelled, do_receive_some, 1)

        # Graceful close of both ends must still work afterwards.
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_aclose, s)
            nursery.start_soon(do_aclose, r)

    # Check that if a task is blocked in receive_some, then closing the
    # receive stream causes it to wake up.
    async with _ForceCloseBoth(await stream_maker()) as (s, r):

        async def receive_expecting_closed():
            with _assert_raises(_core.ClosedResourceError):
                await r.receive_some(10)

        async with _core.open_nursery() as nursery:
            nursery.start_soon(receive_expecting_closed)
            # Give the receiving task time to block before closing under it.
            await _core.wait_all_tasks_blocked()
            await aclose_forcefully(r)

    # check wait_send_all_might_not_block, if we can
    if clogged_stream_maker is not None:
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            # Ordered log of events; compared against the expected sequence
            # once the nursery has finished.
            record = []

            async def waiter(cancel_scope):
                record.append("waiter sleeping")
                with assert_checkpoints():
                    await s.wait_send_all_might_not_block()
                record.append("waiter wokeup")
                # Stops the receiver's endless drain loop.
                cancel_scope.cancel()

            async def receiver():
                # give wait_send_all_might_not_block a chance to block
                await _core.wait_all_tasks_blocked()
                record.append("receiver starting")
                # NOTE(review): 16834 is possibly a transposition of 16384
                # (2**14) -- confirm. Here it is only passed as max_bytes
                # while draining.
                while True:
                    await r.receive_some(16834)

            async with _core.open_nursery() as nursery:
                nursery.start_soon(waiter, nursery.cancel_scope)
                await _core.wait_all_tasks_blocked()
                nursery.start_soon(receiver)

            # The waiter must not wake up before the receiver starts reading.
            assert record == [
                "waiter sleeping",
                "receiver starting",
                "waiter wokeup",
            ]

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            # simultaneous wait_send_all_might_not_block fails
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s.wait_send_all_might_not_block)
                    nursery.start_soon(s.wait_send_all_might_not_block)

            # and simultaneous send_all and wait_send_all_might_not_block (NB
            # this test might destroy the stream b/c we end up cancelling
            # send_all and e.g. SSLStream can't handle that, so we have to
            # recreate afterwards)
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s.wait_send_all_might_not_block)
                    nursery.start_soon(s.send_all, b"123")

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            # send_all and send_all blocked simultaneously should also raise
            # (but again this might destroy the stream)
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s.send_all, b"123")
                    nursery.start_soon(s.send_all, b"123")

        # closing the receiver causes wait_send_all_might_not_block to return,
        # with or without an exception
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):

            async def sender():
                try:
                    with assert_checkpoints():
                        await s.wait_send_all_might_not_block()
                except _core.BrokenResourceError:  # pragma: no cover
                    pass

            async def receiver():
                # Let the sender block first, then close the receive side.
                await _core.wait_all_tasks_blocked()
                await aclose_forcefully(r)

            async with _core.open_nursery() as nursery:
                nursery.start_soon(sender)
                nursery.start_soon(receiver)

        # and again with the call starting after the close
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            await aclose_forcefully(r)
            try:
                with assert_checkpoints():
                    await s.wait_send_all_might_not_block()
            except _core.BrokenResourceError:  # pragma: no cover
                pass

        # Check that if a task is blocked in a send-side method, then closing
        # the send stream causes it to wake up.
        async def close_soon(s):
            # Wait until the send-side call is blocked, then close under it.
            await _core.wait_all_tasks_blocked()
            await aclose_forcefully(s)

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(close_soon, s)
                with _assert_raises(_core.ClosedResourceError):
                    await s.send_all(b"xyzzy")

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(close_soon, s)
                with _assert_raises(_core.ClosedResourceError):
                    await s.wait_send_all_might_not_block()
 
 
async def check_two_way_stream(stream_maker, clogged_stream_maker):
    """Run the generic stream checks against a bidirectional stream
    implementation.

    The maker functions follow the same protocol as for
    :func:`check_one_way_stream`, except that both objects in each returned
    pair must implement the :class:`~trio.abc.Stream` interface.

    Everything :func:`check_one_way_stream` verifies is verified here as well
    (once per direction), so calling this function makes a separate call to
    :func:`check_one_way_stream` unnecessary.

    """

    def flip(maker):
        # Wrap ``maker`` so that the pair it produces comes back in the
        # opposite order, swapping which end sends and which end receives.
        async def flipped_maker():
            return reversed(await maker())

        return flipped_maker

    # One-way checks in the forward direction ...
    await check_one_way_stream(stream_maker, clogged_stream_maker)

    # ... and once more with the two ends swapped.
    if clogged_stream_maker is None:
        flipped_clogged_stream_maker = None
    else:
        flipped_clogged_stream_maker = flip(clogged_stream_maker)
    await check_one_way_stream(flip(stream_maker), flipped_clogged_stream_maker)

    async with _ForceCloseBoth(await stream_maker()) as (s1, s2):
        for endpoint in (s1, s2):
            assert isinstance(endpoint, Stream)

        # Full-duplex transfer: both ends send and receive at the same time,
        # using randomly sized (but reproducibly seeded) chunks.
        total_bytes = 2**20
        max_chunk = 2**14

        forward_payload = (
            random.Random(0)
            .getrandbits(8 * total_bytes)
            .to_bytes(total_bytes, "little")
        )
        backward_payload = forward_payload[::-1]

        async def sender(s, data, seed):
            rng = random.Random(seed)
            view = memoryview(data)
            position = 0
            while position < len(view):
                step = rng.randint(1, max_chunk)
                # Slicing past the end just yields the remaining tail.
                await s.send_all(view[position : position + step])
                position += step

        async def receiver(s, data, seed):
            rng = random.Random(seed)
            collected = bytearray()
            wanted = len(data)
            while len(collected) < wanted:
                piece = await s.receive_some(rng.randint(1, max_chunk))
                # An empty result here would mean EOF before all data arrived.
                assert piece
                collected += piece
            assert collected == data

        async with _core.open_nursery() as nursery:
            nursery.start_soon(sender, s1, forward_payload, 0)
            nursery.start_soon(sender, s2, backward_payload, 1)
            nursery.start_soon(receiver, s1, backward_payload, 2)
            nursery.start_soon(receiver, s2, forward_payload, 3)

        # Closing one end must show up as end-of-stream (b"") on the other.
        async def expect_receive_some_empty():
            leftover = await s2.receive_some(10)
            assert leftover == b""
            await s2.aclose()

        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_receive_some_empty)
            nursery.start_soon(s1.aclose)
 
 
async def check_half_closeable_stream(stream_maker, clogged_stream_maker):
    """Run the generic stream checks against a half-closeable stream
    implementation.

    The maker functions follow the same protocol as for
    :func:`check_two_way_stream`, except that both objects in each returned
    pair must implement the :class:`~trio.abc.HalfCloseableStream` interface.

    Everything :func:`check_two_way_stream` verifies is verified here as well,
    so calling this function makes a separate call to
    :func:`check_two_way_stream` unnecessary.

    """
    await check_two_way_stream(stream_maker, clogged_stream_maker)

    async with _ForceCloseBoth(await stream_maker()) as (s1, s2):
        for endpoint in (s1, s2):
            assert isinstance(endpoint, HalfCloseableStream)

        async def send_x_then_eof(s):
            await s.send_all(b"x")
            with assert_checkpoints():
                await s.send_eof()

        async def expect_x_then_eof(r):
            # Wait for the other tasks to block before starting to read.
            await _core.wait_all_tasks_blocked()
            payload = await r.receive_some(10)
            assert payload == b"x"
            eof_marker = await r.receive_some(10)
            assert eof_marker == b""

        async def exchange(source, sink):
            # One b"x" followed by EOF travels from ``source`` to ``sink``.
            async with _core.open_nursery() as nursery:
                nursery.start_soon(send_x_then_eof, source)
                nursery.start_soon(expect_x_then_eof, sink)

        await exchange(s1, s2)

        # After send_eof, further sending on that side is rejected ...
        with _assert_raises(_core.ClosedResourceError):
            await s1.send_all(b"y")

        # ... while calling send_eof a second time is allowed ...
        with assert_checkpoints():
            await s1.send_eof()

        # ... and the opposite direction is still fully usable.
        await exchange(s2, s1)

    if clogged_stream_maker is None:
        return

    # Neither send_all nor wait_send_all_might_not_block may be in progress
    # while send_eof is called on the same stream. Each case gets a fresh
    # clogged pair.
    blocked_calls = (
        ("send_all", (b"x",)),
        ("wait_send_all_might_not_block", ()),
    )
    for method_name, call_args in blocked_calls:
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s1, s2):
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(getattr(s1, method_name), *call_args)
                    await _core.wait_all_tasks_blocked()
                    nursery.start_soon(s1.send_eof)