zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import attr
 
from .. import _core
from .._deprecate import deprecated
from .._util import Final
 
 
@attr.s(frozen=True)
class _UnboundedQueueStats:
    qsize = attr.ib()
    tasks_waiting = attr.ib()
 
 
class UnboundedQueue(metaclass=Final):
    """An unbounded queue suitable for certain unusual forms of inter-task
    communication.
 
    This class is designed for use as a queue in cases where the producer for
    some reason cannot be subjected to back-pressure, i.e., :meth:`put_nowait`
    has to always succeed. In order to prevent the queue backlog from actually
    growing without bound, the consumer API is modified to dequeue items in
    "batches". If a consumer task processes each batch without yielding, then
    this helps achieve (but does not guarantee) an effective bound on the
    queue's memory use, at the cost of potentially increasing system latencies
    in general. You should generally prefer to use a memory channel
    instead if you can.
 
    Currently each batch completely empties the queue, but `this may change in
    the future <https://github.com/python-trio/trio/issues/51>`__.
 
    A :class:`UnboundedQueue` object can be used as an asynchronous iterator,
    where each iteration returns a new batch of items. I.e., these two loops
    are equivalent::
 
       async for batch in queue:
           ...
 
       while True:
           obj = await queue.get_batch()
           ...
 
    """
 
    @deprecated(
        "0.9.0",
        issue=497,
        thing="trio.lowlevel.UnboundedQueue",
        instead="trio.open_memory_channel(math.inf)",
    )
    def __init__(self):
        self._lot = _core.ParkingLot()
        self._data = []
        # used to allow handoff from put to the first task in the lot
        self._can_get = False
 
    def __repr__(self):
        return "<UnboundedQueue holding {} items>".format(len(self._data))
 
    def qsize(self):
        """Returns the number of items currently in the queue."""
        return len(self._data)
 
    def empty(self):
        """Returns True if the queue is empty, False otherwise.
 
        There is some subtlety to interpreting this method's return value: see
        `issue #63 <https://github.com/python-trio/trio/issues/63>`__.
 
        """
        return not self._data
 
    @_core.enable_ki_protection
    def put_nowait(self, obj):
        """Put an object into the queue, without blocking.
 
        This always succeeds, because the queue is unbounded. We don't provide
        a blocking ``put`` method, because it would never need to block.
 
        Args:
          obj (object): The object to enqueue.
 
        """
        if not self._data:
            assert not self._can_get
            if self._lot:
                self._lot.unpark(count=1)
            else:
                self._can_get = True
        self._data.append(obj)
 
    def _get_batch_protected(self):
        data = self._data.copy()
        self._data.clear()
        self._can_get = False
        return data
 
    def get_batch_nowait(self):
        """Attempt to get the next batch from the queue, without blocking.
 
        Returns:
          list: A list of dequeued items, in order. On a successful call this
              list is always non-empty; if it would be empty we raise
              :exc:`~trio.WouldBlock` instead.
 
        Raises:
          ~trio.WouldBlock: if the queue is empty.
 
        """
        if not self._can_get:
            raise _core.WouldBlock
        return self._get_batch_protected()
 
    async def get_batch(self):
        """Get the next batch from the queue, blocking as necessary.
 
        Returns:
          list: A list of dequeued items, in order. This list is always
              non-empty.
 
        """
        await _core.checkpoint_if_cancelled()
        if not self._can_get:
            await self._lot.park()
            return self._get_batch_protected()
        else:
            try:
                return self._get_batch_protected()
            finally:
                await _core.cancel_shielded_checkpoint()
 
    def statistics(self):
        """Return an object containing debugging information.
 
        Currently the following fields are defined:
 
        * ``qsize``: The number of items currently in the queue.
        * ``tasks_waiting``: The number of tasks blocked on this queue's
          :meth:`get_batch` method.
 
        """
        return _UnboundedQueueStats(
            qsize=len(self._data), tasks_waiting=self._lot.statistics().tasks_waiting
        )
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        return await self.get_batch()