from collections import deque import threading import attr from .. import _core from .._util import NoPublicConstructor from ._wakeup_socketpair import WakeupSocketpair @attr.s(slots=True) class EntryQueue: # This used to use a queue.Queue. but that was broken, because Queues are # implemented in Python, and not reentrant -- so it was thread-safe, but # not signal-safe. deque is implemented in C, so each operation is atomic # WRT threads (and this is guaranteed in the docs), AND each operation is # atomic WRT signal delivery (signal handlers can run on either side, but # not *during* a deque operation). dict makes similar guarantees - and # it's even ordered! queue = attr.ib(factory=deque) idempotent_queue = attr.ib(factory=dict) wakeup = attr.ib(factory=WakeupSocketpair) done = attr.ib(default=False) # Must be a reentrant lock, because it's acquired from signal handlers. # RLock is signal-safe as of cpython 3.2. NB that this does mean that the # lock is effectively *disabled* when we enter from signal context. The # way we use the lock this is OK though, because when # run_sync_soon is called from a signal it's atomic WRT the # main thread -- it just might happen at some inconvenient place. But if # you look at the one place where the main thread holds the lock, it's # just to make 1 assignment, so that's atomic WRT a signal anyway. lock = attr.ib(factory=threading.RLock) async def task(self): assert _core.currently_ki_protected() # RLock has two implementations: a signal-safe version in _thread, and # and signal-UNsafe version in threading. We need the signal safe # version. Python 3.2 and later should always use this anyway, but, # since the symptoms if this goes wrong are just "weird rare # deadlocks", then let's make a little check. # See: # https://bugs.python.org/issue13697#msg237140 assert self.lock.__class__.__module__ == "_thread" def run_cb(job): # We run this with KI protection enabled; it's the callback's # job to disable it if it wants it disabled. Exceptions are # treated like system task exceptions (i.e., converted into # TrioInternalError and cause everything to shut down). sync_fn, args = job try: sync_fn(*args) except BaseException as exc: async def kill_everything(exc): raise exc try: _core.spawn_system_task(kill_everything, exc) except RuntimeError: # We're quite late in the shutdown process and the # system nursery is already closed. # TODO(2020-06): this is a gross hack and should # be fixed soon when we address #1607. _core.current_task().parent_nursery.start_soon(kill_everything, exc) return True # This has to be carefully written to be safe in the face of new items # being queued while we iterate, and to do a bounded amount of work on # each pass: def run_all_bounded(): for _ in range(len(self.queue)): run_cb(self.queue.popleft()) for job in list(self.idempotent_queue): del self.idempotent_queue[job] run_cb(job) try: while True: run_all_bounded() if not self.queue and not self.idempotent_queue: await self.wakeup.wait_woken() else: await _core.checkpoint() except _core.Cancelled: # Keep the work done with this lock held as minimal as possible, # because it doesn't protect us against concurrent signal delivery # (see the comment above). Notice that this code would still be # correct if written like: # self.done = True # with self.lock: # pass # because all we want is to force run_sync_soon # to either be completely before or completely after the write to # done. That's why we don't need the lock to protect # against signal handlers. with self.lock: self.done = True # No more jobs will be submitted, so just clear out any residual # ones: run_all_bounded() assert not self.queue assert not self.idempotent_queue def close(self): self.wakeup.close() def size(self): return len(self.queue) + len(self.idempotent_queue) def run_sync_soon(self, sync_fn, *args, idempotent=False): with self.lock: if self.done: raise _core.RunFinishedError("run() has exited") # We have to hold the lock all the way through here, because # otherwise the main thread might exit *while* we're doing these # calls, and then our queue item might not be processed, or the # wakeup call might trigger an OSError b/c the IO manager has # already been shut down. if idempotent: self.idempotent_queue[(sync_fn, args)] = None else: self.queue.append((sync_fn, args)) self.wakeup.wakeup_thread_and_signal_safe() @attr.s(eq=False, hash=False, slots=True) class TrioToken(metaclass=NoPublicConstructor): """An opaque object representing a single call to :func:`trio.run`. It has no public constructor; instead, see :func:`current_trio_token`. This object has two uses: 1. It lets you re-enter the Trio run loop from external threads or signal handlers. This is the low-level primitive that :func:`trio.to_thread` and `trio.from_thread` use to communicate with worker threads, that `trio.open_signal_receiver` uses to receive notifications about signals, and so forth. 2. Each call to :func:`trio.run` has exactly one associated :class:`TrioToken` object, so you can use it to identify a particular call. """ _reentry_queue = attr.ib() def run_sync_soon(self, sync_fn, *args, idempotent=False): """Schedule a call to ``sync_fn(*args)`` to occur in the context of a Trio task. This is safe to call from the main thread, from other threads, and from signal handlers. This is the fundamental primitive used to re-enter the Trio run loop from outside of it. The call will happen "soon", but there's no guarantee about exactly when, and no mechanism provided for finding out when it's happened. If you need this, you'll have to build your own. The call is effectively run as part of a system task (see :func:`~trio.lowlevel.spawn_system_task`). In particular this means that: * :exc:`KeyboardInterrupt` protection is *enabled* by default; if you want ``sync_fn`` to be interruptible by control-C, then you need to use :func:`~trio.lowlevel.disable_ki_protection` explicitly. * If ``sync_fn`` raises an exception, then it's converted into a :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You should be careful that ``sync_fn`` doesn't crash. All calls with ``idempotent=False`` are processed in strict first-in first-out order. If ``idempotent=True``, then ``sync_fn`` and ``args`` must be hashable, and Trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio will process these in first-in first-out order. Any ordering guarantees apply separately to ``idempotent=False`` and ``idempotent=True`` calls; there's no rule for how calls in the different categories are ordered with respect to each other. :raises trio.RunFinishedError: if the associated call to :func:`trio.run` has already exited. (Any call that *doesn't* raise this error is guaranteed to be fully processed before :func:`trio.run` exits.) """ self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)