import outcome
|
import pytest
|
import sys
|
import os
|
import signal
|
import threading
|
import contextlib
|
import time
|
|
from async_generator import (
|
async_generator,
|
yield_,
|
isasyncgenfunction,
|
asynccontextmanager,
|
)
|
|
from ... import _core
|
from ...testing import wait_all_tasks_blocked
|
from ..._util import signal_raise, is_main_thread
|
from ..._timeouts import sleep
|
from .tutil import slow
|
|
|
def ki_self():
|
signal_raise(signal.SIGINT)
|
|
|
def test_ki_self():
|
with pytest.raises(KeyboardInterrupt):
|
ki_self()
|
|
|
async def test_ki_enabled():
|
# Regular tasks aren't KI-protected
|
assert not _core.currently_ki_protected()
|
|
# Low-level call-soon callbacks are KI-protected
|
token = _core.current_trio_token()
|
record = []
|
|
def check():
|
record.append(_core.currently_ki_protected())
|
|
token.run_sync_soon(check)
|
await wait_all_tasks_blocked()
|
assert record == [True]
|
|
@_core.enable_ki_protection
|
def protected():
|
assert _core.currently_ki_protected()
|
unprotected()
|
|
@_core.disable_ki_protection
|
def unprotected():
|
assert not _core.currently_ki_protected()
|
|
protected()
|
|
@_core.enable_ki_protection
|
async def aprotected():
|
assert _core.currently_ki_protected()
|
await aunprotected()
|
|
@_core.disable_ki_protection
|
async def aunprotected():
|
assert not _core.currently_ki_protected()
|
|
await aprotected()
|
|
# make sure that the decorator here overrides the automatic manipulation
|
# that start_soon() does:
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(aprotected)
|
nursery.start_soon(aunprotected)
|
|
@_core.enable_ki_protection
|
def gen_protected():
|
assert _core.currently_ki_protected()
|
yield
|
|
for _ in gen_protected():
|
pass
|
|
@_core.disable_ki_protection
|
def gen_unprotected():
|
assert not _core.currently_ki_protected()
|
yield
|
|
for _ in gen_unprotected():
|
pass
|
|
|
# This used to be broken due to
|
#
|
# https://bugs.python.org/issue29590
|
#
|
# Specifically, after a coroutine is resumed with .throw(), then the stack
|
# makes it look like the immediate caller is the function that called
|
# .throw(), not the actual caller. So child() here would have a caller deep in
|
# the guts of the run loop, and always be protected, even when it shouldn't
|
# have been. (Solution: we don't use .throw() anymore.)
|
async def test_ki_enabled_after_yield_briefly():
|
@_core.enable_ki_protection
|
async def protected():
|
await child(True)
|
|
@_core.disable_ki_protection
|
async def unprotected():
|
await child(False)
|
|
async def child(expected):
|
import traceback
|
|
traceback.print_stack()
|
assert _core.currently_ki_protected() == expected
|
await _core.checkpoint()
|
traceback.print_stack()
|
assert _core.currently_ki_protected() == expected
|
|
await protected()
|
await unprotected()
|
|
|
# This also used to be broken due to
|
# https://bugs.python.org/issue29590
|
async def test_generator_based_context_manager_throw():
|
@contextlib.contextmanager
|
@_core.enable_ki_protection
|
def protected_manager():
|
assert _core.currently_ki_protected()
|
try:
|
yield
|
finally:
|
assert _core.currently_ki_protected()
|
|
with protected_manager():
|
assert not _core.currently_ki_protected()
|
|
with pytest.raises(KeyError):
|
# This is the one that used to fail
|
with protected_manager():
|
raise KeyError
|
|
|
async def test_agen_protection():
|
@_core.enable_ki_protection
|
@async_generator
|
async def agen_protected1():
|
assert _core.currently_ki_protected()
|
try:
|
await yield_()
|
finally:
|
assert _core.currently_ki_protected()
|
|
@_core.disable_ki_protection
|
@async_generator
|
async def agen_unprotected1():
|
assert not _core.currently_ki_protected()
|
try:
|
await yield_()
|
finally:
|
assert not _core.currently_ki_protected()
|
|
# Swap the order of the decorators:
|
@async_generator
|
@_core.enable_ki_protection
|
async def agen_protected2():
|
assert _core.currently_ki_protected()
|
try:
|
await yield_()
|
finally:
|
assert _core.currently_ki_protected()
|
|
@async_generator
|
@_core.disable_ki_protection
|
async def agen_unprotected2():
|
assert not _core.currently_ki_protected()
|
try:
|
await yield_()
|
finally:
|
assert not _core.currently_ki_protected()
|
|
# Native async generators
|
@_core.enable_ki_protection
|
async def agen_protected3():
|
assert _core.currently_ki_protected()
|
try:
|
yield
|
finally:
|
assert _core.currently_ki_protected()
|
|
@_core.disable_ki_protection
|
async def agen_unprotected3():
|
assert not _core.currently_ki_protected()
|
try:
|
yield
|
finally:
|
assert not _core.currently_ki_protected()
|
|
for agen_fn in [
|
agen_protected1,
|
agen_protected2,
|
agen_protected3,
|
agen_unprotected1,
|
agen_unprotected2,
|
agen_unprotected3,
|
]:
|
async for _ in agen_fn(): # noqa
|
assert not _core.currently_ki_protected()
|
|
# asynccontextmanager insists that the function passed must itself be an
|
# async gen function, not a wrapper around one
|
if isasyncgenfunction(agen_fn):
|
async with asynccontextmanager(agen_fn)():
|
assert not _core.currently_ki_protected()
|
|
# Another case that's tricky due to:
|
# https://bugs.python.org/issue29590
|
with pytest.raises(KeyError):
|
async with asynccontextmanager(agen_fn)():
|
raise KeyError
|
|
|
# Test the case where there's no magic local anywhere in the call stack
|
def test_ki_disabled_out_of_context():
|
assert _core.currently_ki_protected()
|
|
|
def test_ki_disabled_in_del():
|
def nestedfunction():
|
return _core.currently_ki_protected()
|
|
def __del__():
|
assert _core.currently_ki_protected()
|
assert nestedfunction()
|
|
@_core.disable_ki_protection
|
def outerfunction():
|
assert not _core.currently_ki_protected()
|
assert not nestedfunction()
|
__del__()
|
|
__del__()
|
outerfunction()
|
assert nestedfunction()
|
|
|
def test_ki_protection_works():
|
async def sleeper(name, record):
|
try:
|
while True:
|
await _core.checkpoint()
|
except _core.Cancelled:
|
record.add(name + " ok")
|
|
async def raiser(name, record):
|
try:
|
# os.kill runs signal handlers before returning, so we don't need
|
# to worry that the handler will be delayed
|
print("killing, protection =", _core.currently_ki_protected())
|
ki_self()
|
except KeyboardInterrupt:
|
print("raised!")
|
# Make sure we aren't getting cancelled as well as siginted
|
await _core.checkpoint()
|
record.add(name + " raise ok")
|
raise
|
else:
|
print("didn't raise!")
|
# If we didn't raise (b/c protected), then we *should* get
|
# cancelled at the next opportunity
|
try:
|
await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
|
except _core.Cancelled:
|
record.add(name + " cancel ok")
|
|
# simulated control-C during raiser, which is *unprotected*
|
print("check 1")
|
record = set()
|
|
async def check_unprotected_kill():
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(sleeper, "s1", record)
|
nursery.start_soon(sleeper, "s2", record)
|
nursery.start_soon(raiser, "r1", record)
|
|
with pytest.raises(KeyboardInterrupt):
|
_core.run(check_unprotected_kill)
|
assert record == {"s1 ok", "s2 ok", "r1 raise ok"}
|
|
# simulated control-C during raiser, which is *protected*, so the KI gets
|
# delivered to the main task instead
|
print("check 2")
|
record = set()
|
|
async def check_protected_kill():
|
async with _core.open_nursery() as nursery:
|
nursery.start_soon(sleeper, "s1", record)
|
nursery.start_soon(sleeper, "s2", record)
|
nursery.start_soon(_core.enable_ki_protection(raiser), "r1", record)
|
# __aexit__ blocks, and then receives the KI
|
|
with pytest.raises(KeyboardInterrupt):
|
_core.run(check_protected_kill)
|
assert record == {"s1 ok", "s2 ok", "r1 cancel ok"}
|
|
# kill at last moment still raises (run_sync_soon until it raises an
|
# error, then kill)
|
print("check 3")
|
|
async def check_kill_during_shutdown():
|
token = _core.current_trio_token()
|
|
def kill_during_shutdown():
|
assert _core.currently_ki_protected()
|
try:
|
token.run_sync_soon(kill_during_shutdown)
|
except _core.RunFinishedError:
|
# it's too late for regular handling! handle this!
|
print("kill! kill!")
|
ki_self()
|
|
token.run_sync_soon(kill_during_shutdown)
|
|
with pytest.raises(KeyboardInterrupt):
|
_core.run(check_kill_during_shutdown)
|
|
# KI arrives very early, before main is even spawned
|
print("check 4")
|
|
class InstrumentOfDeath:
|
def before_run(self):
|
ki_self()
|
|
async def main():
|
await _core.checkpoint()
|
|
with pytest.raises(KeyboardInterrupt):
|
_core.run(main, instruments=[InstrumentOfDeath()])
|
|
# checkpoint_if_cancelled notices pending KI
|
print("check 5")
|
|
@_core.enable_ki_protection
|
async def main():
|
assert _core.currently_ki_protected()
|
ki_self()
|
with pytest.raises(KeyboardInterrupt):
|
await _core.checkpoint_if_cancelled()
|
|
_core.run(main)
|
|
# KI arrives while main task is not abortable, b/c already scheduled
|
print("check 6")
|
|
@_core.enable_ki_protection
|
async def main():
|
assert _core.currently_ki_protected()
|
ki_self()
|
await _core.cancel_shielded_checkpoint()
|
await _core.cancel_shielded_checkpoint()
|
await _core.cancel_shielded_checkpoint()
|
with pytest.raises(KeyboardInterrupt):
|
await _core.checkpoint()
|
|
_core.run(main)
|
|
# KI arrives while main task is not abortable, b/c refuses to be aborted
|
print("check 7")
|
|
@_core.enable_ki_protection
|
async def main():
|
assert _core.currently_ki_protected()
|
ki_self()
|
task = _core.current_task()
|
|
def abort(_):
|
_core.reschedule(task, outcome.Value(1))
|
return _core.Abort.FAILED
|
|
assert await _core.wait_task_rescheduled(abort) == 1
|
with pytest.raises(KeyboardInterrupt):
|
await _core.checkpoint()
|
|
_core.run(main)
|
|
# KI delivered via slow abort
|
print("check 8")
|
|
@_core.enable_ki_protection
|
async def main():
|
assert _core.currently_ki_protected()
|
ki_self()
|
task = _core.current_task()
|
|
def abort(raise_cancel):
|
result = outcome.capture(raise_cancel)
|
_core.reschedule(task, result)
|
return _core.Abort.FAILED
|
|
with pytest.raises(KeyboardInterrupt):
|
assert await _core.wait_task_rescheduled(abort)
|
await _core.checkpoint()
|
|
_core.run(main)
|
|
# KI arrives just before main task exits, so the run_sync_soon machinery
|
# is still functioning and will accept the callback to deliver the KI, but
|
# by the time the callback is actually run, main has exited and can't be
|
# aborted.
|
print("check 9")
|
|
@_core.enable_ki_protection
|
async def main():
|
ki_self()
|
|
with pytest.raises(KeyboardInterrupt):
|
_core.run(main)
|
|
print("check 10")
|
# KI in unprotected code, with
|
# restrict_keyboard_interrupt_to_checkpoints=True
|
record = []
|
|
async def main():
|
# We're not KI protected...
|
assert not _core.currently_ki_protected()
|
ki_self()
|
# ...but even after the KI, we keep running uninterrupted...
|
record.append("ok")
|
# ...until we hit a checkpoint:
|
with pytest.raises(KeyboardInterrupt):
|
await sleep(10)
|
|
_core.run(main, restrict_keyboard_interrupt_to_checkpoints=True)
|
assert record == ["ok"]
|
record = []
|
# Exact same code raises KI early if we leave off the argument, doesn't
|
# even reach the record.append call:
|
with pytest.raises(KeyboardInterrupt):
|
_core.run(main)
|
assert record == []
|
|
# KI arrives while main task is inside a cancelled cancellation scope
|
# the KeyboardInterrupt should take priority
|
print("check 11")
|
|
@_core.enable_ki_protection
|
async def main():
|
assert _core.currently_ki_protected()
|
with _core.CancelScope() as cancel_scope:
|
cancel_scope.cancel()
|
with pytest.raises(_core.Cancelled):
|
await _core.checkpoint()
|
ki_self()
|
with pytest.raises(KeyboardInterrupt):
|
await _core.checkpoint()
|
with pytest.raises(_core.Cancelled):
|
await _core.checkpoint()
|
|
_core.run(main)
|
|
|
def test_ki_is_good_neighbor():
|
# in the unlikely event someone overwrites our signal handler, we leave
|
# the overwritten one be
|
try:
|
orig = signal.getsignal(signal.SIGINT)
|
|
def my_handler(signum, frame): # pragma: no cover
|
pass
|
|
async def main():
|
signal.signal(signal.SIGINT, my_handler)
|
|
_core.run(main)
|
|
assert signal.getsignal(signal.SIGINT) is my_handler
|
finally:
|
signal.signal(signal.SIGINT, orig)
|
|
|
# Regression test for #461
|
def test_ki_with_broken_threads():
|
thread = threading.main_thread()
|
|
# scary!
|
original = threading._active[thread.ident]
|
|
# put this in a try finally so we don't have a chance of cascading a
|
# breakage down to everything else
|
try:
|
del threading._active[thread.ident]
|
|
@_core.enable_ki_protection
|
async def inner():
|
assert signal.getsignal(signal.SIGINT) != signal.default_int_handler
|
|
_core.run(inner)
|
finally:
|
threading._active[thread.ident] = original
|