1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
from collections import deque
import threading
 
import attr
 
from .. import _core
from .._util import NoPublicConstructor
from ._wakeup_socketpair import WakeupSocketpair
 
 
@attr.s(slots=True)
class EntryQueue:
    # This used to use a queue.Queue. but that was broken, because Queues are
    # implemented in Python, and not reentrant -- so it was thread-safe, but
    # not signal-safe. deque is implemented in C, so each operation is atomic
    # WRT threads (and this is guaranteed in the docs), AND each operation is
    # atomic WRT signal delivery (signal handlers can run on either side, but
    # not *during* a deque operation). dict makes similar guarantees - and
    # it's even ordered!
    queue = attr.ib(factory=deque)
    idempotent_queue = attr.ib(factory=dict)
 
    wakeup = attr.ib(factory=WakeupSocketpair)
    done = attr.ib(default=False)
    # Must be a reentrant lock, because it's acquired from signal handlers.
    # RLock is signal-safe as of cpython 3.2. NB that this does mean that the
    # lock is effectively *disabled* when we enter from signal context. The
    # way we use the lock this is OK though, because when
    # run_sync_soon is called from a signal it's atomic WRT the
    # main thread -- it just might happen at some inconvenient place. But if
    # you look at the one place where the main thread holds the lock, it's
    # just to make 1 assignment, so that's atomic WRT a signal anyway.
    lock = attr.ib(factory=threading.RLock)
 
    async def task(self):
        assert _core.currently_ki_protected()
        # RLock has two implementations: a signal-safe version in _thread, and
        # and signal-UNsafe version in threading. We need the signal safe
        # version. Python 3.2 and later should always use this anyway, but,
        # since the symptoms if this goes wrong are just "weird rare
        # deadlocks", then let's make a little check.
        # See:
        #     https://bugs.python.org/issue13697#msg237140
        assert self.lock.__class__.__module__ == "_thread"
 
        def run_cb(job):
            # We run this with KI protection enabled; it's the callback's
            # job to disable it if it wants it disabled. Exceptions are
            # treated like system task exceptions (i.e., converted into
            # TrioInternalError and cause everything to shut down).
            sync_fn, args = job
            try:
                sync_fn(*args)
            except BaseException as exc:
 
                async def kill_everything(exc):
                    raise exc
 
                try:
                    _core.spawn_system_task(kill_everything, exc)
                except RuntimeError:
                    # We're quite late in the shutdown process and the
                    # system nursery is already closed.
                    # TODO(2020-06): this is a gross hack and should
                    # be fixed soon when we address #1607.
                    _core.current_task().parent_nursery.start_soon(kill_everything, exc)
 
            return True
 
        # This has to be carefully written to be safe in the face of new items
        # being queued while we iterate, and to do a bounded amount of work on
        # each pass:
        def run_all_bounded():
            for _ in range(len(self.queue)):
                run_cb(self.queue.popleft())
            for job in list(self.idempotent_queue):
                del self.idempotent_queue[job]
                run_cb(job)
 
        try:
            while True:
                run_all_bounded()
                if not self.queue and not self.idempotent_queue:
                    await self.wakeup.wait_woken()
                else:
                    await _core.checkpoint()
        except _core.Cancelled:
            # Keep the work done with this lock held as minimal as possible,
            # because it doesn't protect us against concurrent signal delivery
            # (see the comment above). Notice that this code would still be
            # correct if written like:
            #   self.done = True
            #   with self.lock:
            #       pass
            # because all we want is to force run_sync_soon
            # to either be completely before or completely after the write to
            # done. That's why we don't need the lock to protect
            # against signal handlers.
            with self.lock:
                self.done = True
            # No more jobs will be submitted, so just clear out any residual
            # ones:
            run_all_bounded()
            assert not self.queue
            assert not self.idempotent_queue
 
    def close(self):
        self.wakeup.close()
 
    def size(self):
        return len(self.queue) + len(self.idempotent_queue)
 
    def run_sync_soon(self, sync_fn, *args, idempotent=False):
        with self.lock:
            if self.done:
                raise _core.RunFinishedError("run() has exited")
            # We have to hold the lock all the way through here, because
            # otherwise the main thread might exit *while* we're doing these
            # calls, and then our queue item might not be processed, or the
            # wakeup call might trigger an OSError b/c the IO manager has
            # already been shut down.
            if idempotent:
                self.idempotent_queue[(sync_fn, args)] = None
            else:
                self.queue.append((sync_fn, args))
            self.wakeup.wakeup_thread_and_signal_safe()
 
 
@attr.s(eq=False, hash=False, slots=True)
class TrioToken(metaclass=NoPublicConstructor):
    """An opaque object representing a single call to :func:`trio.run`.
 
    It has no public constructor; instead, see :func:`current_trio_token`.
 
    This object has two uses:
 
    1. It lets you re-enter the Trio run loop from external threads or signal
       handlers. This is the low-level primitive that :func:`trio.to_thread`
       and `trio.from_thread` use to communicate with worker threads, that
       `trio.open_signal_receiver` uses to receive notifications about
       signals, and so forth.
 
    2. Each call to :func:`trio.run` has exactly one associated
       :class:`TrioToken` object, so you can use it to identify a particular
       call.
 
    """
 
    _reentry_queue = attr.ib()
 
    def run_sync_soon(self, sync_fn, *args, idempotent=False):
        """Schedule a call to ``sync_fn(*args)`` to occur in the context of a
        Trio task.
 
        This is safe to call from the main thread, from other threads, and
        from signal handlers. This is the fundamental primitive used to
        re-enter the Trio run loop from outside of it.
 
        The call will happen "soon", but there's no guarantee about exactly
        when, and no mechanism provided for finding out when it's happened.
        If you need this, you'll have to build your own.
 
        The call is effectively run as part of a system task (see
        :func:`~trio.lowlevel.spawn_system_task`). In particular this means
        that:
 
        * :exc:`KeyboardInterrupt` protection is *enabled* by default; if
          you want ``sync_fn`` to be interruptible by control-C, then you
          need to use :func:`~trio.lowlevel.disable_ki_protection`
          explicitly.
 
        * If ``sync_fn`` raises an exception, then it's converted into a
          :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You
          should be careful that ``sync_fn`` doesn't crash.
 
        All calls with ``idempotent=False`` are processed in strict
        first-in first-out order.
 
        If ``idempotent=True``, then ``sync_fn`` and ``args`` must be
        hashable, and Trio will make a best-effort attempt to discard any
        call submission which is equal to an already-pending call. Trio
        will process these in first-in first-out order.
 
        Any ordering guarantees apply separately to ``idempotent=False``
        and ``idempotent=True`` calls; there's no rule for how calls in the
        different categories are ordered with respect to each other.
 
        :raises trio.RunFinishedError:
              if the associated call to :func:`trio.run`
              has already exited. (Any call that *doesn't* raise this error
              is guaranteed to be fully processed before :func:`trio.run`
              exits.)
 
        """
        self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)