zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
U
Z±dfSã@sÞddlZddlmZddlmZddlmZddlmZmZGdd„dƒZ    Gd    d
„d
eej
d Z Gd d „d eej
d Z ddœdd„Z dd„Zdd„Zdd„ZGdd„dƒZGdd„deƒZGdd„deƒZdd„Zdd „ZdS)!éNé)Ú_core©Ú StapledStream)Ú_util)Ú
SendStreamÚ ReceiveStreamc@sPeZdZdd„Zdd„Zdd„Zdd„Zd    d
„Zd d „Zddd„Z    ddd„Z
d S)Ú_UnboundedByteQueuecCs(tƒ|_d|_t ¡|_t d¡|_dS)NFz%another task is already fetching data)    Ú    bytearrayÚ_dataÚ_closedrÚ
ParkingLotÚ_lotrÚConflictDetectorÚ _fetch_lock©Úself©rúSd:\z\workplace\vscode\pyvenv\venv\Lib\site-packages\trio/testing/_memory_streams.pyÚ__init__s 
ÿz_UnboundedByteQueue.__init__cCsd|_|j ¡dS©NT)r rÚ
unpark_allrrrrÚclosesz_UnboundedByteQueue.closecCstƒ|_| ¡dS©N)r
r rrrrrÚclose_and_wipesz"_UnboundedByteQueue.close_and_wipecCs,|jrt d¡‚|j|7_|j ¡dS)Nzvirtual connection closed)r rÚClosedResourceErrorr rr©rÚdatarrrÚput"s
z_UnboundedByteQueue.putcCs*|dkr dSt |¡}|dkr&tdƒ‚dS)Néúmax_bytes must be >= 1)ÚoperatorÚindexÚ
ValueError©rÚ    max_bytesrrrÚ_check_max_bytes(s
 
z$_UnboundedByteQueue._check_max_bytescCsX|js|jst‚|dkr"t|jƒ}|jrN|jd|…}|jd|…=|sJt‚|StƒSdSr)r r ÚAssertionErrorÚlenr
)rr%ÚchunkrrrÚ    _get_impl/s
 z_UnboundedByteQueue._get_implNc
CsD|j4| |¡|js$|js$tj‚| |¡W5QR£SQRXdSr)rr&r r rÚ
WouldBlockr*r$rrrÚ
get_nowait;s
 
 z_UnboundedByteQueue.get_nowaitc
Ãs^|jN| |¡|js0|js0|j ¡IdHnt ¡IdH| |¡W5QR£SQRXdSr)    rr&r r rÚparkrÚ
checkpointr*r$rrrÚgetBs 
 z_UnboundedByteQueue.get)N)N) Ú__name__Ú
__module__Ú __qualname__rrrrr&r*r,r/rrrrr     s 
r    c@sNeZdZdZddd„Zdd„Zdd„Zd    d
„Zd d „Zdd d„Z    ddd„Z
dS)ÚMemorySendStreamaÆAn in-memory :class:`~trio.abc.SendStream`.
 
    Args:
      send_all_hook: An async function, or None. Called from
          :meth:`send_all`. Can do whatever you like.
      wait_send_all_might_not_block_hook: An async function, or None. Called
          from :meth:`wait_send_all_might_not_block`. Can do whatever you
          like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.
 
    .. attribute:: send_all_hook
                   wait_send_all_might_not_block_hook
                   close_hook
 
       All of these hooks are also exposed as attributes on the object, and
       you can change them at any time.
 
    NcCs*t d¡|_tƒ|_||_||_||_dS)Nú!another task is using this stream)rrÚ_conflict_detectorr    Ú    _outgoingÚ send_all_hookÚ"wait_send_all_might_not_block_hookÚ
close_hook)rr7r8r9rrrrasÿzMemorySendStream.__init__c    ÃsV|jFt ¡IdHt ¡IdH|j |¡|jdk    rH| ¡IdHW5QRXdS)z}Places the given data into the object's internal buffer, and then
        calls the :attr:`send_all_hook` (if any).
 
        N)r5rr.r6rr7rrrrÚsend_allos  
zMemorySendStream.send_allc    ÃsV|jFt ¡IdHt ¡IdH|j d¡|jdk    rH| ¡IdHW5QRXdS)znCalls the :attr:`wait_send_all_might_not_block_hook` (if any), and
        then returns immediately.
 
        Nó)r5rr.r6rr8rrrrÚwait_send_all_might_not_block}s  
z.MemorySendStream.wait_send_all_might_not_blockcCs |j ¡|jdk    r| ¡dS)z^Marks this stream as closed, and then calls the :attr:`close_hook`
        (if any).
 
        N)r6rr9rrrrrŒs
 
zMemorySendStream.closecÃs| ¡t ¡IdHdS©z!Same as :meth:`close`, but async.N©rrr.rrrrÚacloseszMemorySendStream.aclosecÃs|j |¡IdHS)aÀRetrieves data from the internal buffer, blocking if necessary.
 
        Args:
          max_bytes (int or None): The maximum amount of data to
              retrieve. None (the default) means to retrieve all the data
              that's present (but still blocks until at least one byte is
              available).
 
        Returns:
          If this stream has been closed, an empty bytearray. Otherwise, the
          requested data.
 
        N)r6r/r$rrrÚget_data¢szMemorySendStream.get_datacCs |j |¡S)zÁRetrieves data from the internal buffer, but doesn't block.
 
        See :meth:`get_data` for details.
 
        Raises:
          trio.WouldBlock: if no data is available to retrieve.
 
        )r6r,r$rrrÚget_data_nowait²s    z MemorySendStream.get_data_nowait)NNN)N)N) r0r1r2Ú__doc__rr:r<rr?r@rArrrrr3Lsü
 
r3)Ú    metaclassc@sDeZdZdZddd„Zddd„Zdd„Zd    d
„Zd d „Zd d„Z    dS)ÚMemoryReceiveStreamaðAn in-memory :class:`~trio.abc.ReceiveStream`.
 
    Args:
      receive_some_hook: An async function, or None. Called from
          :meth:`receive_some`. Can do whatever you like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.
 
    .. attribute:: receive_some_hook
                   close_hook
 
       Both hooks are also exposed as attributes on the object, and you can
       change them at any time.
 
    NcCs*t d¡|_tƒ|_d|_||_||_dS)Nr4F)rrr5r    Ú    _incomingr Úreceive_some_hookr9)rrFr9rrrrÏsÿzMemoryReceiveStream.__init__c
Ãs€|jpt ¡IdHt ¡IdH|jr0tj‚|jdk    rH| ¡IdH|j |¡IdH}|jrftj‚|W5QR£SQRXdS)zˆCalls the :attr:`receive_some_hook` (if any), and then retrieves
        data from the internal buffer, blocking if necessary.
 
        N)r5rr.r rrFrEr/)rr%rrrrÚ receive_someØs
z MemoryReceiveStream.receive_somecCs&d|_|j ¡|jdk    r"| ¡dS)zfDiscards any pending data from the internal buffer, and marks this
        stream as closed.
 
        TN)r rErr9rrrrrïs
 
zMemoryReceiveStream.closecÃs| ¡t ¡IdHdSr=r>rrrrr?ùszMemoryReceiveStream.aclosecCs|j |¡dS)z.Appends the given data to the internal buffer.N)rErrrrrÚput_dataþszMemoryReceiveStream.put_datacCs|j ¡dS)z2Adds an end-of-file marker to the internal buffer.N)rErrrrrÚput_eofszMemoryReceiveStream.put_eof)NN)N)
r0r1r2rBrrGrr?rHrIrrrrrD¾s
    
 
rD)r%cCsjz| |¡}Wntjk
r&YdSXz|s8| ¡n
| |¡Wn tjk
rdt d¡‚YnXdS)aðTake data out of the given :class:`MemorySendStream`'s internal buffer,
    and put it into the given :class:`MemoryReceiveStream`'s internal buffer.
 
    Args:
      memory_send_stream (MemorySendStream): The stream to get data from.
      memory_receive_stream (MemoryReceiveStream): The stream to put data into.
      max_bytes (int or None): The maximum amount of data to transfer in this
          call, or None to transfer all available data.
 
    Returns:
      True if it successfully transferred some data, or False if there was no
      data to transfer.
 
    This is used to implement :func:`memory_stream_one_way_pair` and
    :func:`memory_stream_pair`; see the latter's docstring for an example
    of how you might use it yourself.
 
    FzMemoryReceiveStream was closedT)rArr+rIrHrÚBrokenResourceError)Zmemory_send_streamZmemory_receive_streamr%rrrrÚmemory_stream_pumps
rKcs:tƒ‰tƒ‰‡‡fdd„‰‡fdd„}|ˆ_ˆˆ_ˆˆfS)uQCreate a connected, pure-Python, unidirectional stream with infinite
    buffering and flexible configuration options.
 
    You can think of this as being a no-operating-system-involved
    Trio-streamsified version of :func:`os.pipe` (except that :func:`os.pipe`
    returns the streams in the wrong order â€“ we follow the superior convention
    that data flows from left to right).
 
    Returns:
      A tuple (:class:`MemorySendStream`, :class:`MemoryReceiveStream`), where
      the :class:`MemorySendStream` has its hooks set up so that it calls
      :func:`memory_stream_pump` from its
      :attr:`~MemorySendStream.send_all_hook` and
      :attr:`~MemorySendStream.close_hook`.
 
    The end result is that data automatically flows from the
    :class:`MemorySendStream` to the :class:`MemoryReceiveStream`. But you're
    also free to rearrange things however you like. For example, you can
    temporarily set the :attr:`~MemorySendStream.send_all_hook` to None if you
    want to simulate a stall in data transmission. Or see
    :func:`memory_stream_pair` for a more elaborate example.
 
    cstˆˆƒdSr)rKr)Ú recv_streamÚ send_streamrrÚ$pump_from_send_stream_to_recv_streamCszHmemory_stream_one_way_pair.<locals>.pump_from_send_stream_to_recv_streamc“s
ˆƒdSrrr)rNrrÚ*async_pump_from_send_stream_to_recv_streamFszNmemory_stream_one_way_pair.<locals>.async_pump_from_send_stream_to_recv_stream)r3rDr7r9)rOr)rNrLrMrÚmemory_stream_one_way_pair(s rPcCs0|ƒ\}}|ƒ\}}t||ƒ}t||ƒ}||fSrr)Z one_way_pairZ
pipe1_sendZ
pipe1_recvZ
pipe2_sendZ
pipe2_recvZstream1Zstream2rrrÚ_make_stapled_pairNs
 
 
 
 
rQcCsttƒS)a· Create a connected, pure-Python, bidirectional stream with infinite
    buffering and flexible configuration options.
 
    This is a convenience function that creates two one-way streams using
    :func:`memory_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.
 
    This is like a no-operating-system-involved, Trio-streamsified version of
    :func:`socket.socketpair`.
 
    Returns:
      A pair of :class:`~trio.StapledStream` objects that are connected so
      that data automatically flows from one to the other in both directions.
 
    After creating a stream pair, you can send data back and forth, which is
    enough for simple tests::
 
       left, right = memory_stream_pair()
       await left.send_all(b"123")
       assert await right.receive_some() == b"123"
       await right.send_all(b"456")
       assert await left.receive_some() == b"456"
 
    But if you read the docs for :class:`~trio.StapledStream` and
    :func:`memory_stream_one_way_pair`, you'll see that all the pieces
    involved in wiring this up are public APIs, so you can adjust to suit the
    requirements of your tests. For example, here's how to tweak a stream so
    that data flowing from left to right trickles in one byte at a time (but
    data flowing from right to left proceeds at full speed)::
 
        left, right = memory_stream_pair()
        async def trickle():
            # left is a StapledStream, and left.send_stream is a MemorySendStream
            # right is a StapledStream, and right.recv_stream is a MemoryReceiveStream
            while memory_stream_pump(left.send_stream, right.recv_stream, max_bytes=1):
                # Pause between each byte
                await trio.sleep(1)
        # Normally this send_all_hook calls memory_stream_pump directly without
        # passing in a max_bytes. We replace it with our custom version:
        left.send_stream.send_all_hook = trickle
 
    And here's a simple test using our modified stream objects::
 
        async def sender():
            await left.send_all(b"12345")
            await left.send_eof()
 
        async def receiver():
            async for data in right:
                print(data)
 
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(receiver)
 
    By default, this will print ``b"12345"`` and then immediately exit; with
    our trickle stream it instead sleeps 1 second, then prints ``b"1"``, then
    sleeps 1 second, then prints ``b"2"``, etc.
 
    Pro-tip: you can insert sleep calls (like in our example above) to
    manipulate the flow of data across tasks... and then use
    :class:`MockClock` and its :attr:`~MockClock.autojump_threshold`
    functionality to keep your test suite running quickly.
 
    If you want to stress test a protocol implementation, one nice trick is to
    use the :mod:`random` module (preferably with a fixed seed) to move random
    numbers of bytes at a time, and insert random sleeps in between them. You
    can also set up a custom :attr:`~MemoryReceiveStream.receive_some_hook` if
    you want to manipulate things on the receiving side, and not just the
    sending side.
 
    )rQrPrrrrÚmemory_stream_pairVsJrRc@sNeZdZdd„Zdd„Zdd„Zdd„Zd    d
„Zd d „Zd d„Z    ddd„Z
dS)Ú_LockstepByteQueuecCs@tƒ|_d|_d|_d|_t ¡|_t     d¡|_
t     d¡|_ dS)NFzanother task is already sendingz!another task is already receiving) r
r Ú_sender_closedÚ_receiver_closedÚ_receiver_waitingrr Ú_waitersrrÚ_send_conflict_detectorÚ_receive_conflict_detectorrrrrr©s
ÿÿz_LockstepByteQueue.__init__cCs|j ¡dSr)rWrrrrrÚ_something_happened¶sz&_LockstepByteQueue._something_happenedcÃs:|ƒrq(|js(|jrq(|j ¡IdHqt ¡IdHdSr)rTrUrWr-rr.)rÚfnrrrÚ    _wait_for»s  z_LockstepByteQueue._wait_forcCsd|_| ¡dSr)rTrZrrrrÚ close_senderÄsz_LockstepByteQueue.close_sendercCsd|_| ¡dSr)rUrZrrrrÚclose_receiverÈsz!_LockstepByteQueue.close_receiverc    ƒs„ˆjtˆjrtj‚ˆjr tj‚ˆjr*t‚ˆj|7_ˆ ¡ˆ     ‡fdd„¡IdHˆjrdtj‚ˆjrvˆjrvtj‚W5QRXdS)Ncsˆj Sr©r rrrrÚ<lambda>Õr;z-_LockstepByteQueue.send_all.<locals>.<lambda>)
rXrTrrrUrJr r'rZr\rrrrr:Ìs
 z_LockstepByteQueue.send_allc    ƒshˆjXˆjrtj‚ˆjr6t ¡IdHW5QR£dSˆ ‡fdd„¡IdHˆjrZtj‚W5QRXdS)NcsˆjSr)rVrrrrr`âr;zB_LockstepByteQueue.wait_send_all_might_not_block.<locals>.<lambda>)rXrTrrrUr.r\rrrrr<Ûsz0_LockstepByteQueue.wait_send_all_might_not_blockNc
ƒsԈjÄ|dk    r*t |¡}|dkr*tdƒ‚ˆjr6tj‚dˆ_ˆ ¡zˆ     ‡fdd„¡IdHW5dˆ_Xˆjrvtj‚ˆj
r®ˆj
d|…}ˆj
d|…=ˆ ¡|W5QR£Sˆj s¸t ‚W5QR£dSW5QRXdS)Nrr TFcsˆjSrr_rrrrr`ôr;z1_LockstepByteQueue.receive_some.<locals>.<lambda>r;) rYr!r"r#rUrrrVrZr\r rTr')rr%ÚgotrrrrGæs*
 
z_LockstepByteQueue.receive_some)N) r0r1r2rrZr\r]r^r:r<rGrrrrrS¨s      rSc@s4eZdZdd„Zdd„Zdd„Zdd„Zd    d
„Zd S) Ú_LockstepSendStreamcCs
||_dSr©Ú_lbq©rÚlbqrrrrsz_LockstepSendStream.__init__cCs|j ¡dSr)rdr]rrrrr
sz_LockstepSendStream.closecÃs| ¡t ¡IdHdSrr>rrrrr? sz_LockstepSendStream.aclosecÃs|j |¡IdHdSr)rdr:rrrrr:sz_LockstepSendStream.send_allcÃs|j ¡IdHdSr)rdr<rrrrr<sz1_LockstepSendStream.wait_send_all_might_not_blockN)r0r1r2rrr?r:r<rrrrrbs
rbc@s.eZdZdd„Zdd„Zdd„Zd
dd    „ZdS) Ú_LockstepReceiveStreamcCs
||_dSrrcrerrrrsz_LockstepReceiveStream.__init__cCs|j ¡dSr)rdr^rrrrrsz_LockstepReceiveStream.closecÃs| ¡t ¡IdHdSrr>rrrrr?sz_LockstepReceiveStream.acloseNcÃs|j |¡IdHSr)rdrGr$rrrrG#sz#_LockstepReceiveStream.receive_some)N)r0r1r2rrr?rGrrrrrgsrgcCstƒ}t|ƒt|ƒfS)a Create a connected, pure Python, unidirectional stream where data flows
    in lockstep.
 
    Returns:
      A tuple
      (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`).
 
    This stream has *absolutely no* buffering. Each call to
    :meth:`~trio.abc.SendStream.send_all` will block until all the given data
    has been returned by a call to
    :meth:`~trio.abc.ReceiveStream.receive_some`.
 
    This can be useful for testing flow control mechanisms in an extreme case,
    or for setting up "clogged" streams to use with
    :func:`check_one_way_stream` and friends.
 
    In addition to fulfilling the :class:`~trio.abc.SendStream` and
    :class:`~trio.abc.ReceiveStream` interfaces, the return objects
    also have a synchronous ``close`` method.
 
    )rSrbrg)rfrrrÚlockstep_stream_one_way_pair'srhcCsttƒS)a“Create a connected, pure-Python, bidirectional stream where data flows
    in lockstep.
 
    Returns:
      A tuple (:class:`~trio.StapledStream`, :class:`~trio.StapledStream`).
 
    This is a convenience function that creates two one-way streams using
    :func:`lockstep_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.
 
    )rQrhrrrrÚlockstep_stream_pairBs ri)r!ÚrZ_highlevel_genericrrÚabcrrr    ÚFinalr3rDrKrPrQrRrSrbrgrhrirrrrÚ<module>s    ?rI!&R^