zmc
2023-10-12 ed135d79df12a2466b52dae1a82326941211dcc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import inspect
import signal
import sys
from functools import wraps
import attr
 
import async_generator
 
from .._util import is_main_thread
 
if False:
    from typing import Any, TypeVar, Callable
 
    F = TypeVar("F", bound=Callable[..., Any])
 
# In ordinary single-threaded Python code, when you hit control-C, it raises
# an exception and automatically does all the regular unwinding stuff.
#
# In Trio code, we would like hitting control-C to raise an exception and
# automatically do all the regular unwinding stuff. In particular, we would
# like to maintain our invariant that all tasks always run to completion (one
# way or another), by unwinding all of them.
#
# But it's basically impossible to write the core task running code in such a
# way that it can maintain this invariant in the face of KeyboardInterrupt
# exceptions arising at arbitrary bytecode positions. Similarly, if a
# KeyboardInterrupt happened at the wrong moment inside pretty much any of our
# inter-task synchronization or I/O primitives, then the system state could
# get corrupted and prevent our being able to clean up properly.
#
# So, we need a way to defer KeyboardInterrupt processing from these critical
# sections.
#
# Things that don't work:
#
# - Listen for SIGINT and process it in a system task: works fine for
#   well-behaved programs that regularly pass through the event loop, but if
#   user-code goes into an infinite loop then it can't be interrupted. Which
#   is unfortunate, since dealing with infinite loops is what
#   KeyboardInterrupt is for!
#
# - Use pthread_sigmask to disable signal delivery during critical section:
#   (a) windows has no pthread_sigmask, (b) python threads start with all
#   signals unblocked, so if there are any threads around they'll receive the
#   signal and then tell the main thread to run the handler, even if the main
#   thread has that signal blocked.
#
# - Install a signal handler which checks a global variable to decide whether
#   to raise the exception immediately (if we're in a non-critical section),
#   or to schedule it on the event loop (if we're in a critical section). The
#   problem here is that it's impossible to transition safely out of user code:
#
#     with keyboard_interrupt_enabled:
#         msg = coro.send(value)
#
#   If this raises a KeyboardInterrupt, it might be because the coroutine got
#   interrupted and has unwound... or it might be the KeyboardInterrupt
#   arrived just *after* 'send' returned, so the coroutine is still running
#   but we just lost the message it sent. (And worse, in our actual task
#   runner, the send is hidden inside a utility function etc.)
#
# Solution:
#
# Mark *stack frames* as being interrupt-safe or interrupt-unsafe, and from
# the signal handler check which kind of frame we're currently in when
# deciding whether to raise or schedule the exception.
#
# There are still some cases where this can fail, like if someone hits
# control-C while the process is in the event loop, and then it immediately
# enters an infinite loop in user code. In this case the user has to hit
# control-C a second time. And of course if the user code is written so that
# it doesn't actually exit after a task crashes and everything gets cancelled,
# then there's not much to be done. (Hitting control-C repeatedly might help,
# but in general the solution is to kill the process some other way, just like
# for any Python program that's written to catch and ignore
# KeyboardInterrupt.)
 
# We use this special string as a unique key into the frame locals dictionary.
# The @ ensures it is not a valid identifier and can't clash with any possible
# real local name. See: https://github.com/python-trio/trio/issues/469
LOCALS_KEY_KI_PROTECTION_ENABLED = "@TRIO_KI_PROTECTION_ENABLED"
 
 
# NB: according to the signal.signal docs, 'frame' can be None on entry to
# this function:
def ki_protection_enabled(frame):
    while frame is not None:
        if LOCALS_KEY_KI_PROTECTION_ENABLED in frame.f_locals:
            return frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED]
        if frame.f_code.co_name == "__del__":
            return True
        frame = frame.f_back
    return True
 
 
def currently_ki_protected():
    r"""Check whether the calling code has :exc:`KeyboardInterrupt` protection
    enabled.
 
    It's surprisingly easy to think that one's :exc:`KeyboardInterrupt`
    protection is enabled when it isn't, or vice-versa. This function tells
    you what Trio thinks of the matter, which makes it useful for ``assert``\s
    and unit tests.
 
    Returns:
      bool: True if protection is enabled, and False otherwise.
 
    """
    return ki_protection_enabled(sys._getframe())
 
 
def _ki_protection_decorator(enabled):
    def decorator(fn):
        # In some version of Python, isgeneratorfunction returns true for
        # coroutine functions, so we have to check for coroutine functions
        # first.
        if inspect.iscoroutinefunction(fn):
 
            @wraps(fn)
            def wrapper(*args, **kwargs):
                # See the comment for regular generators below
                coro = fn(*args, **kwargs)
                coro.cr_frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
                return coro
 
            return wrapper
        elif inspect.isgeneratorfunction(fn):
 
            @wraps(fn)
            def wrapper(*args, **kwargs):
                # It's important that we inject this directly into the
                # generator's locals, as opposed to setting it here and then
                # doing 'yield from'. The reason is, if a generator is
                # throw()n into, then it may magically pop to the top of the
                # stack. And @contextmanager generators in particular are a
                # case where we often want KI protection, and which are often
                # thrown into! See:
                #     https://bugs.python.org/issue29590
                gen = fn(*args, **kwargs)
                gen.gi_frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
                return gen
 
            return wrapper
        elif async_generator.isasyncgenfunction(fn):
 
            @wraps(fn)
            def wrapper(*args, **kwargs):
                # See the comment for regular generators above
                agen = fn(*args, **kwargs)
                agen.ag_frame.f_locals[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
                return agen
 
            return wrapper
        else:
 
            @wraps(fn)
            def wrapper(*args, **kwargs):
                locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = enabled
                return fn(*args, **kwargs)
 
            return wrapper
 
    return decorator
 
 
enable_ki_protection = _ki_protection_decorator(True)  # type: Callable[[F], F]
enable_ki_protection.__name__ = "enable_ki_protection"
 
disable_ki_protection = _ki_protection_decorator(False)  # type: Callable[[F], F]
disable_ki_protection.__name__ = "disable_ki_protection"
 
 
@attr.s
class KIManager:
    handler = attr.ib(default=None)
 
    def install(self, deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
        assert self.handler is None
        if (
            not is_main_thread()
            or signal.getsignal(signal.SIGINT) != signal.default_int_handler
        ):
            return
 
        def handler(signum, frame):
            assert signum == signal.SIGINT
            protection_enabled = ki_protection_enabled(frame)
            if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
                deliver_cb()
            else:
                raise KeyboardInterrupt
 
        self.handler = handler
        signal.signal(signal.SIGINT, handler)
 
    def close(self):
        if self.handler is not None:
            if signal.getsignal(signal.SIGINT) is self.handler:
                signal.signal(signal.SIGINT, signal.default_int_handler)
            self.handler = None