1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import signal
 
import pytest
 
import trio
from .. import _core
from .._util import signal_raise
from .._signals import open_signal_receiver, _signal_handler
 
 
async def test_open_signal_receiver():
    orig = signal.getsignal(signal.SIGILL)
    with open_signal_receiver(signal.SIGILL) as receiver:
        # Raise it a few times, to exercise signal coalescing, both at the
        # call_soon level and at the SignalQueue level
        signal_raise(signal.SIGILL)
        signal_raise(signal.SIGILL)
        await _core.wait_all_tasks_blocked()
        signal_raise(signal.SIGILL)
        await _core.wait_all_tasks_blocked()
        async for signum in receiver:  # pragma: no branch
            assert signum == signal.SIGILL
            break
        assert receiver._pending_signal_count() == 0
        signal_raise(signal.SIGILL)
        async for signum in receiver:  # pragma: no branch
            assert signum == signal.SIGILL
            break
        assert receiver._pending_signal_count() == 0
    with pytest.raises(RuntimeError):
        await receiver.__anext__()
    assert signal.getsignal(signal.SIGILL) is orig
 
 
async def test_open_signal_receiver_restore_handler_after_one_bad_signal():
    orig = signal.getsignal(signal.SIGILL)
    with pytest.raises(ValueError):
        with open_signal_receiver(signal.SIGILL, 1234567):
            pass  # pragma: no cover
    # Still restored even if we errored out
    assert signal.getsignal(signal.SIGILL) is orig
 
 
async def test_open_signal_receiver_empty_fail():
    with pytest.raises(TypeError, match="No signals were provided"):
        with open_signal_receiver():
            pass
 
 
async def test_open_signal_receiver_restore_handler_after_duplicate_signal():
    orig = signal.getsignal(signal.SIGILL)
    with open_signal_receiver(signal.SIGILL, signal.SIGILL):
        pass
    # Still restored correctly
    assert signal.getsignal(signal.SIGILL) is orig
 
 
async def test_catch_signals_wrong_thread():
    async def naughty():
        with open_signal_receiver(signal.SIGINT):
            pass  # pragma: no cover
 
    with pytest.raises(RuntimeError):
        await trio.to_thread.run_sync(trio.run, naughty)
 
 
async def test_open_signal_receiver_conflict():
    with pytest.raises(trio.BusyResourceError):
        with open_signal_receiver(signal.SIGILL) as receiver:
            async with trio.open_nursery() as nursery:
                nursery.start_soon(receiver.__anext__)
                nursery.start_soon(receiver.__anext__)
 
 
# Blocks until all previous calls to run_sync_soon(idempotent=True) have been
# processed.
async def wait_run_sync_soon_idempotent_queue_barrier():
    ev = trio.Event()
    token = _core.current_trio_token()
    token.run_sync_soon(ev.set, idempotent=True)
    await ev.wait()
 
 
async def test_open_signal_receiver_no_starvation():
    # Set up a situation where there are always 2 pending signals available to
    # report, and make sure that instead of getting the same signal reported
    # over and over, it alternates between reporting both of them.
    with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver:
        try:
            print(signal.getsignal(signal.SIGILL))
            previous = None
            for _ in range(10):
                signal_raise(signal.SIGILL)
                signal_raise(signal.SIGFPE)
                await wait_run_sync_soon_idempotent_queue_barrier()
                if previous is None:
                    previous = await receiver.__anext__()
                else:
                    got = await receiver.__anext__()
                    assert got in [signal.SIGILL, signal.SIGFPE]
                    assert got != previous
                    previous = got
            # Clear out the last signal so it doesn't get redelivered
            while receiver._pending_signal_count() != 0:
                await receiver.__anext__()
        except:  # pragma: no cover
            # If there's an unhandled exception above, then exiting the
            # open_signal_receiver block might cause the signal to be
            # redelivered and give us a core dump instead of a traceback...
            import traceback
 
            traceback.print_exc()
 
 
async def test_catch_signals_race_condition_on_exit():
    delivered_directly = set()
 
    def direct_handler(signo, frame):
        delivered_directly.add(signo)
 
    print(1)
    # Test the version where the call_soon *doesn't* have a chance to run
    # before we exit the with block:
    with _signal_handler({signal.SIGILL, signal.SIGFPE}, direct_handler):
        with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver:
            signal_raise(signal.SIGILL)
            signal_raise(signal.SIGFPE)
        await wait_run_sync_soon_idempotent_queue_barrier()
    assert delivered_directly == {signal.SIGILL, signal.SIGFPE}
    delivered_directly.clear()
 
    print(2)
    # Test the version where the call_soon *does* have a chance to run before
    # we exit the with block:
    with _signal_handler({signal.SIGILL, signal.SIGFPE}, direct_handler):
        with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver:
            signal_raise(signal.SIGILL)
            signal_raise(signal.SIGFPE)
            await wait_run_sync_soon_idempotent_queue_barrier()
            assert receiver._pending_signal_count() == 2
    assert delivered_directly == {signal.SIGILL, signal.SIGFPE}
    delivered_directly.clear()
 
    # Again, but with a SIG_IGN signal:
 
    print(3)
    with _signal_handler({signal.SIGILL}, signal.SIG_IGN):
        with open_signal_receiver(signal.SIGILL) as receiver:
            signal_raise(signal.SIGILL)
        await wait_run_sync_soon_idempotent_queue_barrier()
    # test passes if the process reaches this point without dying
 
    print(4)
    with _signal_handler({signal.SIGILL}, signal.SIG_IGN):
        with open_signal_receiver(signal.SIGILL) as receiver:
            signal_raise(signal.SIGILL)
            await wait_run_sync_soon_idempotent_queue_barrier()
            assert receiver._pending_signal_count() == 1
    # test passes if the process reaches this point without dying
 
    # Check exception chaining if there are multiple exception-raising
    # handlers
    def raise_handler(signum, _):
        raise RuntimeError(signum)
 
    with _signal_handler({signal.SIGILL, signal.SIGFPE}, raise_handler):
        with pytest.raises(RuntimeError) as excinfo:
            with open_signal_receiver(signal.SIGILL, signal.SIGFPE) as receiver:
                signal_raise(signal.SIGILL)
                signal_raise(signal.SIGFPE)
                await wait_run_sync_soon_idempotent_queue_barrier()
                assert receiver._pending_signal_count() == 2
        exc = excinfo.value
        signums = {exc.args[0]}
        assert isinstance(exc.__context__, RuntimeError)
        signums.add(exc.__context__.args[0])
        assert signums == {signal.SIGILL, signal.SIGFPE}