zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import signal
import sys
 
import pytest
 
import trio
from .. import _core
from .._core.tests.tutil import (
    ignore_coroutine_never_awaited_warnings,
    create_asyncio_future_in_new_loop,
)
from .._util import (
    signal_raise,
    ConflictDetector,
    is_main_thread,
    coroutine_or_error,
    generic_function,
    Final,
    NoPublicConstructor,
)
from ..testing import wait_all_tasks_blocked
 
 
def test_signal_raise():
    """signal_raise() must invoke the Python-level handler for the signal."""
    signo = signal.SIGFPE
    delivered = []

    def on_signal(received_signum, _frame):
        delivered.append(received_signum)

    # Install a temporary handler and remember the previous one so that it
    # is put back regardless of how the call below turns out.
    previous_handler = signal.signal(signo, on_signal)
    try:
        signal_raise(signo)
    finally:
        signal.signal(signo, previous_handler)

    # The handler ran exactly once, with the signal number that was raised.
    assert delivered == [signo]
 
 
async def test_ConflictDetector():
    """Exercise ConflictDetector with nested, re-entrant and concurrent use."""
    first = ConflictDetector("ul1")
    second = ConflictDetector("ul2")

    # Holding two different detectors at the same time is allowed.
    with first, second:
        print("ok")

    # Entering a detector that is already held raises, and the error text
    # contains the string the detector was constructed with.
    with pytest.raises(_core.BusyResourceError) as reentry_info:
        with first, first:
            pass  # pragma: no cover
    assert "ul1" in str(reentry_info.value)

    async def hold_first_until_blocked():
        with first:
            await wait_all_tasks_blocked()

    # Two tasks that both try to hold the same detector: the error is
    # expected to propagate out of the nursery block.
    with pytest.raises(_core.BusyResourceError) as contention_info:
        async with _core.open_nursery() as nursery:
            for _ in range(2):
                nursery.start_soon(hold_first_until_blocked)
    assert "ul1" in str(contention_info.value)
 
 
def test_module_metadata_is_fixed_up():
    """Public trio objects must report their public module path and names."""
    import trio
    import trio.testing

    # (object, expected __module__): top-level objects first, then methods.
    module_expectations = [
        (trio.Cancelled, "trio"),
        (trio.open_nursery, "trio"),
        (trio.abc.Stream, "trio.abc"),
        (trio.lowlevel.wait_task_rescheduled, "trio.lowlevel"),
        (trio.testing.trio_test, "trio.testing"),
        (trio.lowlevel.ParkingLot.__init__, "trio.lowlevel"),
        (trio.abc.Stream.send_all, "trio.abc"),
    ]
    for obj, expected_module in module_expectations:
        assert obj.__module__ == expected_module

    # (object, expected __name__)
    name_expectations = [
        (trio.Cancelled, "Cancelled"),
        (trio.abc.SendStream.send_all, "send_all"),
        (trio.to_thread, "trio.to_thread"),
        (trio.to_thread.run_sync, "run_sync"),
    ]
    for obj, expected_name in name_expectations:
        assert obj.__name__ == expected_name

    # (object, expected __qualname__)
    qualname_expectations = [
        (trio.Cancelled, "Cancelled"),
        (trio.abc.SendStream.send_all, "SendStream.send_all"),
        (trio.to_thread.run_sync, "run_sync"),
    ]
    for obj, expected_qualname in qualname_expectations:
        assert obj.__qualname__ == expected_qualname
 
 
async def test_is_main_thread():
    """is_main_thread() is true for the test itself, false under to_thread."""

    def check_from_worker():
        # Handed to trio.to_thread.run_sync below, so this assertion is
        # evaluated wherever run_sync executes the callable.
        assert not is_main_thread()

    assert is_main_thread()
    await trio.to_thread.run_sync(check_from_worker)
 
 
# @coroutine is deprecated since Python 3.8, which is fine with us: the
# DeprecationWarning mentioning it is filtered out for this test.
@pytest.mark.filterwarnings("ignore:.*@coroutine.*:DeprecationWarning")
def test_coroutine_or_error():
    """Check the TypeError messages raised by coroutine_or_error for bad input.

    Each case hands coroutine_or_error something other than a plain async
    function (a coroutine object, asyncio objects, a ``Deferred`` look-alike,
    a synchronous callable, an async generator function) and asserts that a
    TypeError is raised whose message contains a specific fragment.
    """

    # Local stand-in, not a real twisted object.
    # NOTE(review): presumably coroutine_or_error recognizes it by its class
    # name, so the name "Deferred" should not be changed -- confirm in _util.
    class Deferred:
        "Just kidding"

    # Several cases below create coroutine objects that are never awaited;
    # the whole body runs inside this context so those warnings are ignored.
    with ignore_coroutine_never_awaited_warnings():

        async def f():  # pragma: no cover
            pass

        # A coroutine object (the result of calling f) instead of f itself.
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(f())
        assert "expecting an async function" in str(excinfo.value)

        import asyncio

        # The generator-based coroutine case is only exercised on
        # Python < 3.11, where the asyncio.coroutine decorator is used.
        if sys.version_info < (3, 11):

            @asyncio.coroutine
            def generator_based_coro():  # pragma: no cover
                yield from asyncio.sleep(1)

            with pytest.raises(TypeError) as excinfo:
                coroutine_or_error(generator_based_coro())
            assert "asyncio" in str(excinfo.value)

        # The object returned by the helper, passed directly.
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(create_asyncio_future_in_new_loop())
        assert "asyncio" in str(excinfo.value)

        # The helper itself, passed as the callable.
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(create_asyncio_future_in_new_loop)
        assert "asyncio" in str(excinfo.value)

        # A Deferred instance passed directly ...
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(Deferred())
        assert "twisted" in str(excinfo.value)

        # ... and a callable that returns one.
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(lambda: Deferred())
        assert "twisted" in str(excinfo.value)

        # An ordinary synchronous callable (len) together with an argument.
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(len, [[1, 2, 3]])

        assert "appears to be synchronous" in str(excinfo.value)

        async def async_gen(arg):  # pragma: no cover
            yield

        # An async generator function rather than an async function.
        with pytest.raises(TypeError) as excinfo:
            coroutine_or_error(async_gen, [0])
        msg = "expected an async function but got an async generator"
        assert msg in str(excinfo.value)

        # Make sure no references are kept around to keep anything alive
        del excinfo
 
 
def test_generic_function():
    """generic_function keeps the wrapped metadata and ignores subscripts."""

    @generic_function
    def test_func(arg):
        """Look, a docstring!"""
        return arg

    # Subscripting with one or several parameters gives back the same object.
    subscripted = test_func[int]
    assert test_func is subscripted
    assert subscripted is test_func[int, str]

    # Calls behave identically with or without the subscript.
    assert test_func(42) == 42
    assert subscripted(42) == 42

    # Metadata of the wrapped function is visible on the wrapper.
    expected_metadata = {
        "__doc__": "Look, a docstring!",
        "__qualname__": "test_generic_function.<locals>.test_func",
        "__name__": "test_func",
        "__module__": __name__,
    }
    for attribute, expected in expected_metadata.items():
        assert getattr(test_func, attribute) == expected
 
 
def test_final_metaclass():
    """Subclassing a class whose metaclass is Final raises TypeError."""

    class Sealed(metaclass=Final):
        pass

    # The class statement itself is what must fail, hence its placement
    # inside the raises block.
    with pytest.raises(TypeError):

        class Derived(Sealed):
            pass
 
 
def test_no_public_constructor_metaclass():
    """NoPublicConstructor blocks direct calls and subclassing, not _create."""

    class Guarded(metaclass=NoPublicConstructor):
        pass

    # Calling the class directly is rejected.
    with pytest.raises(TypeError):
        Guarded()

    # So is deriving from it.
    with pytest.raises(TypeError):

        class Derived(Guarded):
            pass

    # The private constructor must not raise and must build a real instance.
    instance = Guarded._create()
    assert isinstance(instance, Guarded)