import inspect
|
import signal
|
import sys
|
from functools import wraps
|
import attr
|
|
import async_generator
|
|
from .._util import is_main_thread
|
|
if False:
|
from typing import Any, TypeVar, Callable
|
|
F = TypeVar("F", bound=Callable[..., Any])
|
|
# In ordinary single-threaded Python code, when you hit control-C, it raises
|
# an exception and automatically does all the regular unwinding stuff.
|
#
|
# In Trio code, we would like hitting control-C to raise an exception and
|
# automatically do all the regular unwinding stuff. In particular, we would
|
# like to maintain our invariant that all tasks always run to completion (one
|
# way or another), by unwinding all of them.
|
#
|
# But it's basically impossible to write the core task running code in such a
|
# way that it can maintain this invariant in the face of KeyboardInterrupt
|
# exceptions arising at arbitrary bytecode positions. Similarly, if a
|
# KeyboardInterrupt happened at the wrong moment inside pretty much any of our
|
# inter-task synchronization or I/O primitives, then the system state could
|
# get corrupted and prevent our being able to clean up properly.
|
#
|
# So, we need a way to defer KeyboardInterrupt processing from these critical
|
# sections.
|
#
|
# Things that don't work:
|
#
|
# - Listen for SIGINT and process it in a system task: works fine for
|
# well-behaved programs that regularly pass through the event loop, but if
|
# user-code goes into an infinite loop then it can't be interrupted. Which
|
# is unfortunate, since dealing with infinite loops is what
|
# KeyboardInterrupt is for!
|
#
|
# - Use pthread_sigmask to disable signal delivery during critical section:
|
# (a) windows has no pthread_sigmask, (b) python threads start with all
|
# signals unblocked, so if there are any threads around they'll receive the
|
# signal and then tell the main thread to run the handler, even if the main
|
# thread has that signal blocked.
|
#
|
# - Install a signal handler which checks a global variable to decide whether
|
# to raise the exception immediately (if we're in a non-critical section),
|
# or to schedule it on the event loop (if we're in a critical section). The
|
# problem here is that it's impossible to transition safely out of user code:
|
#
|
# with keyboard_interrupt_enabled:
|
# msg = coro.send(value)
|
#
|
# If this raises a KeyboardInterrupt, it might be because the coroutine got
|
# interrupted and has unwound... or it might be the KeyboardInterrupt
|
# arrived just *after* 'send' returned, so the coroutine is still running
|
# but we just lost the message it sent. (And worse, in our actual task
|
# runner, the send is hidden inside a utility function etc.)
|
#
|
# Solution:
|
#
|
# Mark *stack frames* as being interrupt-safe or interrupt-unsafe, and from
|
# the signal handler check which kind of frame we're currently in when
|
# deciding whether to raise or schedule the exception.
|
#
|
# There are still some cases where this can fail, like if someone hits
|
# control-C while the process is in the event loop, and then it immediately
|
# enters an infinite loop in user code. In this case the user has to hit
|
# control-C a second time. And of course if the user code is written so that
|
# it doesn't actually exit after a task crashes and everything gets cancelled,
|
# then there's not much to be done. (Hitting control-C repeatedly might help,
|
# but in general the solution is to kill the process some other way, just like
|
# for any Python program that's written to catch and ignore
|
# KeyboardInterrupt.)
|
|
# We use this special string as a unique key into the frame locals dictionary.
|
# The @ ensures it is not a valid identifier and can't clash with any possible
|
# real local name. See: https://github.com/python-trio/trio/issues/469
|
LOCALS_KEY_KI_PROTECTION_ENABLED = "@TRIO_KI_PROTECTION_ENABLED"
|
|
|
# NB: according to the signal.signal docs, 'frame' can be None on entry to
|
# this function:
|
def ki_protection_enabled(frame):
|
while frame is not None:
|
if LOCALS_KEY_KI_PROTECTION_ENABLED in frame.f_locals:
|
return frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED]
|
if frame.f_code.co_name == "__del__":
|
return True
|
frame = frame.f_back
|
return True
|
|
|
def currently_ki_protected():
|
r"""Check whether the calling code has :exc:`KeyboardInterrupt` protection
|
enabled.
|
|
It's surprisingly easy to think that one's :exc:`KeyboardInterrupt`
|
protection is enabled when it isn't, or vice-versa. This function tells
|
you what Trio thinks of the matter, which makes it useful for ``assert``\s
|
and unit tests.
|
|
Returns:
|
bool: True if protection is enabled, and False otherwise.
|
|
"""
|
return ki_protection_enabled(sys._getframe())
|
|
|
def _ki_protection_decorator(enabled):
|
def decorator(fn):
|
# In some version of Python, isgeneratorfunction returns true for
|
# coroutine functions, so we have to check for coroutine functions
|
# first.
|
if inspect.iscoroutinefunction(fn):
|
|
@wraps(fn)
|
def wrapper(*args, **kwargs):
|
# See the comment for regular generators below
|
coro = fn(*args, **kwargs)
|
coro.cr_frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
|
return coro
|
|
return wrapper
|
elif inspect.isgeneratorfunction(fn):
|
|
@wraps(fn)
|
def wrapper(*args, **kwargs):
|
# It's important that we inject this directly into the
|
# generator's locals, as opposed to setting it here and then
|
# doing 'yield from'. The reason is, if a generator is
|
# throw()n into, then it may magically pop to the top of the
|
# stack. And @contextmanager generators in particular are a
|
# case where we often want KI protection, and which are often
|
# thrown into! See:
|
# https://bugs.python.org/issue29590
|
gen = fn(*args, **kwargs)
|
gen.gi_frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
|
return gen
|
|
return wrapper
|
elif async_generator.isasyncgenfunction(fn):
|
|
@wraps(fn)
|
def wrapper(*args, **kwargs):
|
# See the comment for regular generators above
|
agen = fn(*args, **kwargs)
|
agen.ag_frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
|
return agen
|
|
return wrapper
|
else:
|
|
@wraps(fn)
|
def wrapper(*args, **kwargs):
|
locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
|
return fn(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
enable_ki_protection = _ki_protection_decorator(True) # type: Callable[[F], F]
|
enable_ki_protection.__name__ = "enable_ki_protection"
|
|
disable_ki_protection = _ki_protection_decorator(False) # type: Callable[[F], F]
|
disable_ki_protection.__name__ = "disable_ki_protection"
|
|
|
@attr.s
|
class KIManager:
|
handler = attr.ib(default=None)
|
|
def install(self, deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
|
assert self.handler is None
|
if (
|
not is_main_thread()
|
or signal.getsignal(signal.SIGINT) != signal.default_int_handler
|
):
|
return
|
|
def handler(signum, frame):
|
assert signum == signal.SIGINT
|
protection_enabled = ki_protection_enabled(frame)
|
if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
|
deliver_cb()
|
else:
|
raise KeyboardInterrupt
|
|
self.handler = handler
|
signal.signal(signal.SIGINT, handler)
|
|
def close(self):
|
if self.handler is not None:
|
if signal.getsignal(signal.SIGINT) is self.handler:
|
signal.signal(signal.SIGINT, signal.default_int_handler)
|
self.handler = None
|