1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# ParkingLot provides an abstraction for a fair waitqueue with cancellation
# and requeueing support. Inspiration:
#
#    https://webkit.org/blog/6161/locking-in-webkit/
#    https://amanieu.github.io/parking_lot/
#
# which were in turn heavily influenced by
#
#    http://gee.cs.oswego.edu/dl/papers/aqs.pdf
#
# Compared to these, our use of cooperative scheduling allows some
# simplifications (no need for internal locking). On the other hand, the need
# to support Trio's strong cancellation semantics adds some complications
# (tasks need to know where they're queued so they can cancel). Also, in the
# above work, the ParkingLot is a global structure that holds a collection of
# waitqueues keyed by lock address, and which are opportunistically allocated
# and destroyed as contention arises; this allows the worst-case memory usage
# for all waitqueues to be O(#tasks). Here we allocate a separate wait queue
# for each synchronization object, so we're O(#objects + #tasks). This isn't
# *so* bad since compared to our synchronization objects are heavier than
# theirs and our tasks are lighter, so for us #objects is smaller and #tasks
# is larger.
#
# This is in the core because for two reasons. First, it's used by
# UnboundedQueue, and UnboundedQueue is used for a number of things in the
# core. And second, it's responsible for providing fairness to all of our
# high-level synchronization primitives (locks, queues, etc.). For now with
# our FIFO scheduler this is relatively trivial (it's just a FIFO waitqueue),
# but in the future we ever start support task priorities or fair scheduling
#
#    https://github.com/python-trio/trio/issues/32
#
# then all we'll have to do is update this. (Well, full-fledged task
# priorities might also require priority inheritance, which would require more
# work.)
#
# For discussion of data structures to use here, see:
#
#     https://github.com/dabeaz/curio/issues/136
#
# (and also the articles above). Currently we use a SortedDict ordered by a
# global monotonic counter that ensures FIFO ordering. The main advantage of
# this is that it's easy to implement :-). An intrusive doubly-linked list
# would also be a natural approach, so long as we only handle FIFO ordering.
#
# XX: should we switch to the shared global ParkingLot approach?
#
# XX: we should probably add support for "parking tokens" to allow for
# task-fair RWlock (basically: when parking a task needs to be able to mark
# itself as a reader or a writer, and then a task-fair wakeup policy is, wake
# the next task, and if it's a reader than keep waking tasks so long as they
# are readers). Without this I think you can implement write-biased or
# read-biased RWlocks (by using two parking lots and drawing from whichever is
# preferred), but not task-fair -- and task-fair plays much more nicely with
# WFQ. (Consider what happens in the two-lot implementation if you're
# write-biased but all the pending writers are blocked at the scheduler level
# by the WFQ logic...)
# ...alternatively, "phase-fair" RWlocks are pretty interesting:
#    http://www.cs.unc.edu/~anderson/papers/ecrts09b.pdf
# Useful summary:
# https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html
#
# XX: if we do add WFQ, then we might have to drop the current feature where
# unpark returns the tasks that were unparked. Rationale: suppose that at the
# time we call unpark, the next task is deprioritized... and then, before it
# becomes runnable, a new task parks which *is* runnable. Ideally we should
# immediately wake the new task, and leave the old task on the queue for
# later. But this means we can't commit to which task we are unparking when
# unpark is called.
#
# See: https://github.com/python-trio/trio/issues/53
 
import attr
from collections import OrderedDict
 
from .. import _core
from .._util import Final
 
 
@attr.s(frozen=True, slots=True)
class _ParkingLotStatistics:
    tasks_waiting = attr.ib()
 
 
@attr.s(eq=False, hash=False, slots=True)
class ParkingLot(metaclass=Final):
    """A fair wait queue with cancellation and requeueing.
 
    This class encapsulates the tricky parts of implementing a wait
    queue. It's useful for implementing higher-level synchronization
    primitives like queues and locks.
 
    In addition to the methods below, you can use ``len(parking_lot)`` to get
    the number of parked tasks, and ``if parking_lot: ...`` to check whether
    there are any parked tasks.
 
    """
 
    # {task: None}, we just want a deque where we can quickly delete random
    # items
    _parked = attr.ib(factory=OrderedDict, init=False)
 
    def __len__(self):
        """Returns the number of parked tasks."""
        return len(self._parked)
 
    def __bool__(self):
        """True if there are parked tasks, False otherwise."""
        return bool(self._parked)
 
    # XX this currently returns None
    # if we ever add the ability to repark while one's resuming place in
    # line (for false wakeups), then we could have it return a ticket that
    # abstracts the "place in line" concept.
    @_core.enable_ki_protection
    async def park(self):
        """Park the current task until woken by a call to :meth:`unpark` or
        :meth:`unpark_all`.
 
        """
        task = _core.current_task()
        self._parked[task] = None
        task.custom_sleep_data = self
 
        def abort_fn(_):
            del task.custom_sleep_data._parked[task]
            return _core.Abort.SUCCEEDED
 
        await _core.wait_task_rescheduled(abort_fn)
 
    def _pop_several(self, count):
        for _ in range(min(count, len(self._parked))):
            task, _ = self._parked.popitem(last=False)
            yield task
 
    @_core.enable_ki_protection
    def unpark(self, *, count=1):
        """Unpark one or more tasks.
 
        This wakes up ``count`` tasks that are blocked in :meth:`park`. If
        there are fewer than ``count`` tasks parked, then wakes as many tasks
        are available and then returns successfully.
 
        Args:
          count (int): the number of tasks to unpark.
 
        """
        tasks = list(self._pop_several(count))
        for task in tasks:
            _core.reschedule(task)
        return tasks
 
    def unpark_all(self):
        """Unpark all parked tasks."""
        return self.unpark(count=len(self))
 
    @_core.enable_ki_protection
    def repark(self, new_lot, *, count=1):
        """Move parked tasks from one :class:`ParkingLot` object to another.
 
        This dequeues ``count`` tasks from one lot, and requeues them on
        another, preserving order. For example::
 
           async def parker(lot):
               print("sleeping")
               await lot.park()
               print("woken")
 
           async def main():
               lot1 = trio.lowlevel.ParkingLot()
               lot2 = trio.lowlevel.ParkingLot()
               async with trio.open_nursery() as nursery:
                   nursery.start_soon(parker, lot1)
                   await trio.testing.wait_all_tasks_blocked()
                   assert len(lot1) == 1
                   assert len(lot2) == 0
                   lot1.repark(lot2)
                   assert len(lot1) == 0
                   assert len(lot2) == 1
                   # This wakes up the task that was originally parked in lot1
                   lot2.unpark()
 
        If there are fewer than ``count`` tasks parked, then reparks as many
        tasks as are available and then returns successfully.
 
        Args:
          new_lot (ParkingLot): the parking lot to move tasks to.
          count (int): the number of tasks to move.
 
        """
        if not isinstance(new_lot, ParkingLot):
            raise TypeError("new_lot must be a ParkingLot")
        for task in self._pop_several(count):
            new_lot._parked[task] = None
            task.custom_sleep_data = new_lot
 
    def repark_all(self, new_lot):
        """Move all parked tasks from one :class:`ParkingLot` object to
        another.
 
        See :meth:`repark` for details.
 
        """
        return self.repark(new_lot, count=len(self))
 
    def statistics(self):
        """Return an object containing debugging information.
 
        Currently the following fields are defined:
 
        * ``tasks_waiting``: The number of tasks blocked on this lot's
          :meth:`park` method.
 
        """
        return _ParkingLotStatistics(tasks_waiting=len(self._parked))