import attr from .. import _core from .._deprecate import deprecated from .._util import Final @attr.s(frozen=True) class _UnboundedQueueStats: qsize = attr.ib() tasks_waiting = attr.ib() class UnboundedQueue(metaclass=Final): """An unbounded queue suitable for certain unusual forms of inter-task communication. This class is designed for use as a queue in cases where the producer for some reason cannot be subjected to back-pressure, i.e., :meth:`put_nowait` has to always succeed. In order to prevent the queue backlog from actually growing without bound, the consumer API is modified to dequeue items in "batches". If a consumer task processes each batch without yielding, then this helps achieve (but does not guarantee) an effective bound on the queue's memory use, at the cost of potentially increasing system latencies in general. You should generally prefer to use a memory channel instead if you can. Currently each batch completely empties the queue, but `this may change in the future `__. A :class:`UnboundedQueue` object can be used as an asynchronous iterator, where each iteration returns a new batch of items. I.e., these two loops are equivalent:: async for batch in queue: ... while True: obj = await queue.get_batch() ... """ @deprecated( "0.9.0", issue=497, thing="trio.lowlevel.UnboundedQueue", instead="trio.open_memory_channel(math.inf)", ) def __init__(self): self._lot = _core.ParkingLot() self._data = [] # used to allow handoff from put to the first task in the lot self._can_get = False def __repr__(self): return "".format(len(self._data)) def qsize(self): """Returns the number of items currently in the queue.""" return len(self._data) def empty(self): """Returns True if the queue is empty, False otherwise. There is some subtlety to interpreting this method's return value: see `issue #63 `__. """ return not self._data @_core.enable_ki_protection def put_nowait(self, obj): """Put an object into the queue, without blocking. This always succeeds, because the queue is unbounded. We don't provide a blocking ``put`` method, because it would never need to block. Args: obj (object): The object to enqueue. """ if not self._data: assert not self._can_get if self._lot: self._lot.unpark(count=1) else: self._can_get = True self._data.append(obj) def _get_batch_protected(self): data = self._data.copy() self._data.clear() self._can_get = False return data def get_batch_nowait(self): """Attempt to get the next batch from the queue, without blocking. Returns: list: A list of dequeued items, in order. On a successful call this list is always non-empty; if it would be empty we raise :exc:`~trio.WouldBlock` instead. Raises: ~trio.WouldBlock: if the queue is empty. """ if not self._can_get: raise _core.WouldBlock return self._get_batch_protected() async def get_batch(self): """Get the next batch from the queue, blocking as necessary. Returns: list: A list of dequeued items, in order. This list is always non-empty. """ await _core.checkpoint_if_cancelled() if not self._can_get: await self._lot.park() return self._get_batch_protected() else: try: return self._get_batch_protected() finally: await _core.cancel_shielded_checkpoint() def statistics(self): """Return an object containing debugging information. Currently the following fields are defined: * ``qsize``: The number of items currently in the queue. * ``tasks_waiting``: The number of tasks blocked on this queue's :meth:`get_batch` method. """ return _UnboundedQueueStats( qsize=len(self._data), tasks_waiting=self._lot.statistics().tasks_waiting ) def __aiter__(self): return self async def __anext__(self): return await self.get_batch()