1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from math import inf
import time
 
import pytest
 
from trio import sleep
from ... import _core
from .. import wait_all_tasks_blocked
from .._mock_clock import MockClock
from .tutil import slow
 
 
def test_mock_clock():
    REAL_NOW = 123.0
    c = MockClock()
    c._real_clock = lambda: REAL_NOW
    repr(c)  # smoke test
    assert c.rate == 0
    assert c.current_time() == 0
    c.jump(1.2)
    assert c.current_time() == 1.2
    with pytest.raises(ValueError):
        c.jump(-1)
    assert c.current_time() == 1.2
    assert c.deadline_to_sleep_time(1.1) == 0
    assert c.deadline_to_sleep_time(1.2) == 0
    assert c.deadline_to_sleep_time(1.3) > 999999
 
    with pytest.raises(ValueError):
        c.rate = -1
    assert c.rate == 0
 
    c.rate = 2
    assert c.current_time() == 1.2
    REAL_NOW += 1
    assert c.current_time() == 3.2
    assert c.deadline_to_sleep_time(3.1) == 0
    assert c.deadline_to_sleep_time(3.2) == 0
    assert c.deadline_to_sleep_time(4.2) == 0.5
 
    c.rate = 0.5
    assert c.current_time() == 3.2
    assert c.deadline_to_sleep_time(3.1) == 0
    assert c.deadline_to_sleep_time(3.2) == 0
    assert c.deadline_to_sleep_time(4.2) == 2.0
 
    c.jump(0.8)
    assert c.current_time() == 4.0
    REAL_NOW += 1
    assert c.current_time() == 4.5
 
    c2 = MockClock(rate=3)
    assert c2.rate == 3
    assert c2.current_time() < 10
 
 
async def test_mock_clock_autojump(mock_clock):
    assert mock_clock.autojump_threshold == inf
 
    mock_clock.autojump_threshold = 0
    assert mock_clock.autojump_threshold == 0
 
    real_start = time.perf_counter()
 
    virtual_start = _core.current_time()
    for i in range(10):
        print("sleeping {} seconds".format(10 * i))
        await sleep(10 * i)
        print("woke up!")
        assert virtual_start + 10 * i == _core.current_time()
        virtual_start = _core.current_time()
 
    real_duration = time.perf_counter() - real_start
    print("Slept {} seconds in {} seconds".format(10 * sum(range(10)), real_duration))
    assert real_duration < 1
 
    mock_clock.autojump_threshold = 0.02
    t = _core.current_time()
    # this should wake up before the autojump threshold triggers, so time
    # shouldn't change
    await wait_all_tasks_blocked()
    assert t == _core.current_time()
    # this should too
    await wait_all_tasks_blocked(0.01)
    assert t == _core.current_time()
 
    # set up a situation where the autojump task is blocked for a long long
    # time, to make sure that cancel-and-adjust-threshold logic is working
    mock_clock.autojump_threshold = 10000
    await wait_all_tasks_blocked()
    mock_clock.autojump_threshold = 0
    # if the above line didn't take affect immediately, then this would be
    # bad:
    await sleep(100000)
 
 
async def test_mock_clock_autojump_interference(mock_clock):
    mock_clock.autojump_threshold = 0.02
 
    mock_clock2 = MockClock()
    # messing with the autojump threshold of a clock that isn't actually
    # installed in the run loop shouldn't do anything.
    mock_clock2.autojump_threshold = 0.01
 
    # if the autojump_threshold of 0.01 were in effect, then the next line
    # would block forever, as the autojump task kept waking up to try to
    # jump the clock.
    await wait_all_tasks_blocked(0.015)
 
    # but the 0.02 limit does apply
    await sleep(100000)
 
 
def test_mock_clock_autojump_preset():
    # Check that we can set the autojump_threshold before the clock is
    # actually in use, and it gets picked up
    mock_clock = MockClock(autojump_threshold=0.1)
    mock_clock.autojump_threshold = 0.01
    real_start = time.perf_counter()
    _core.run(sleep, 10000, clock=mock_clock)
    assert time.perf_counter() - real_start < 1
 
 
async def test_mock_clock_autojump_0_and_wait_all_tasks_blocked_0(mock_clock):
    # Checks that autojump_threshold=0 doesn't interfere with
    # calling wait_all_tasks_blocked with the default cushion=0.
 
    mock_clock.autojump_threshold = 0
 
    record = []
 
    async def sleeper():
        await sleep(100)
        record.append("yawn")
 
    async def waiter():
        await wait_all_tasks_blocked()
        record.append("waiter woke")
        await sleep(1000)
        record.append("waiter done")
 
    async with _core.open_nursery() as nursery:
        nursery.start_soon(sleeper)
        nursery.start_soon(waiter)
 
    assert record == ["waiter woke", "yawn", "waiter done"]
 
 
@slow
async def test_mock_clock_autojump_0_and_wait_all_tasks_blocked_nonzero(mock_clock):
    # Checks that autojump_threshold=0 doesn't interfere with
    # calling wait_all_tasks_blocked with a non-zero cushion.
 
    mock_clock.autojump_threshold = 0
 
    record = []
 
    async def sleeper():
        await sleep(100)
        record.append("yawn")
 
    async def waiter():
        await wait_all_tasks_blocked(1)
        record.append("waiter done")
 
    async with _core.open_nursery() as nursery:
        nursery.start_soon(sleeper)
        nursery.start_soon(waiter)
 
    assert record == ["waiter done", "yawn"]