# ParkingLot provides an abstraction for a fair waitqueue with cancellation # and requeueing support. Inspiration: # # https://webkit.org/blog/6161/locking-in-webkit/ # https://amanieu.github.io/parking_lot/ # # which were in turn heavily influenced by # # http://gee.cs.oswego.edu/dl/papers/aqs.pdf # # Compared to these, our use of cooperative scheduling allows some # simplifications (no need for internal locking). On the other hand, the need # to support Trio's strong cancellation semantics adds some complications # (tasks need to know where they're queued so they can cancel). Also, in the # above work, the ParkingLot is a global structure that holds a collection of # waitqueues keyed by lock address, and which are opportunistically allocated # and destroyed as contention arises; this allows the worst-case memory usage # for all waitqueues to be O(#tasks). Here we allocate a separate wait queue # for each synchronization object, so we're O(#objects + #tasks). This isn't # *so* bad since compared to our synchronization objects are heavier than # theirs and our tasks are lighter, so for us #objects is smaller and #tasks # is larger. # # This is in the core because for two reasons. First, it's used by # UnboundedQueue, and UnboundedQueue is used for a number of things in the # core. And second, it's responsible for providing fairness to all of our # high-level synchronization primitives (locks, queues, etc.). For now with # our FIFO scheduler this is relatively trivial (it's just a FIFO waitqueue), # but in the future we ever start support task priorities or fair scheduling # # https://github.com/python-trio/trio/issues/32 # # then all we'll have to do is update this. (Well, full-fledged task # priorities might also require priority inheritance, which would require more # work.) # # For discussion of data structures to use here, see: # # https://github.com/dabeaz/curio/issues/136 # # (and also the articles above). Currently we use a SortedDict ordered by a # global monotonic counter that ensures FIFO ordering. The main advantage of # this is that it's easy to implement :-). An intrusive doubly-linked list # would also be a natural approach, so long as we only handle FIFO ordering. # # XX: should we switch to the shared global ParkingLot approach? # # XX: we should probably add support for "parking tokens" to allow for # task-fair RWlock (basically: when parking a task needs to be able to mark # itself as a reader or a writer, and then a task-fair wakeup policy is, wake # the next task, and if it's a reader than keep waking tasks so long as they # are readers). Without this I think you can implement write-biased or # read-biased RWlocks (by using two parking lots and drawing from whichever is # preferred), but not task-fair -- and task-fair plays much more nicely with # WFQ. (Consider what happens in the two-lot implementation if you're # write-biased but all the pending writers are blocked at the scheduler level # by the WFQ logic...) # ...alternatively, "phase-fair" RWlocks are pretty interesting: # http://www.cs.unc.edu/~anderson/papers/ecrts09b.pdf # Useful summary: # https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html # # XX: if we do add WFQ, then we might have to drop the current feature where # unpark returns the tasks that were unparked. Rationale: suppose that at the # time we call unpark, the next task is deprioritized... and then, before it # becomes runnable, a new task parks which *is* runnable. Ideally we should # immediately wake the new task, and leave the old task on the queue for # later. But this means we can't commit to which task we are unparking when # unpark is called. # # See: https://github.com/python-trio/trio/issues/53 import attr from collections import OrderedDict from .. import _core from .._util import Final @attr.s(frozen=True, slots=True) class _ParkingLotStatistics: tasks_waiting = attr.ib() @attr.s(eq=False, hash=False, slots=True) class ParkingLot(metaclass=Final): """A fair wait queue with cancellation and requeueing. This class encapsulates the tricky parts of implementing a wait queue. It's useful for implementing higher-level synchronization primitives like queues and locks. In addition to the methods below, you can use ``len(parking_lot)`` to get the number of parked tasks, and ``if parking_lot: ...`` to check whether there are any parked tasks. """ # {task: None}, we just want a deque where we can quickly delete random # items _parked = attr.ib(factory=OrderedDict, init=False) def __len__(self): """Returns the number of parked tasks.""" return len(self._parked) def __bool__(self): """True if there are parked tasks, False otherwise.""" return bool(self._parked) # XX this currently returns None # if we ever add the ability to repark while one's resuming place in # line (for false wakeups), then we could have it return a ticket that # abstracts the "place in line" concept. @_core.enable_ki_protection async def park(self): """Park the current task until woken by a call to :meth:`unpark` or :meth:`unpark_all`. """ task = _core.current_task() self._parked[task] = None task.custom_sleep_data = self def abort_fn(_): del task.custom_sleep_data._parked[task] return _core.Abort.SUCCEEDED await _core.wait_task_rescheduled(abort_fn) def _pop_several(self, count): for _ in range(min(count, len(self._parked))): task, _ = self._parked.popitem(last=False) yield task @_core.enable_ki_protection def unpark(self, *, count=1): """Unpark one or more tasks. This wakes up ``count`` tasks that are blocked in :meth:`park`. If there are fewer than ``count`` tasks parked, then wakes as many tasks are available and then returns successfully. Args: count (int): the number of tasks to unpark. """ tasks = list(self._pop_several(count)) for task in tasks: _core.reschedule(task) return tasks def unpark_all(self): """Unpark all parked tasks.""" return self.unpark(count=len(self)) @_core.enable_ki_protection def repark(self, new_lot, *, count=1): """Move parked tasks from one :class:`ParkingLot` object to another. This dequeues ``count`` tasks from one lot, and requeues them on another, preserving order. For example:: async def parker(lot): print("sleeping") await lot.park() print("woken") async def main(): lot1 = trio.lowlevel.ParkingLot() lot2 = trio.lowlevel.ParkingLot() async with trio.open_nursery() as nursery: nursery.start_soon(parker, lot1) await trio.testing.wait_all_tasks_blocked() assert len(lot1) == 1 assert len(lot2) == 0 lot1.repark(lot2) assert len(lot1) == 0 assert len(lot2) == 1 # This wakes up the task that was originally parked in lot1 lot2.unpark() If there are fewer than ``count`` tasks parked, then reparks as many tasks as are available and then returns successfully. Args: new_lot (ParkingLot): the parking lot to move tasks to. count (int): the number of tasks to move. """ if not isinstance(new_lot, ParkingLot): raise TypeError("new_lot must be a ParkingLot") for task in self._pop_several(count): new_lot._parked[task] = None task.custom_sleep_data = new_lot def repark_all(self, new_lot): """Move all parked tasks from one :class:`ParkingLot` object to another. See :meth:`repark` for details. """ return self.repark(new_lot, count=len(self)) def statistics(self): """Return an object containing debugging information. Currently the following fields are defined: * ``tasks_waiting``: The number of tasks blocked on this lot's :meth:`park` method. """ return _ParkingLotStatistics(tasks_waiting=len(self._parked))