import contextvars import functools import platform import sys import threading import time import types import warnings import weakref from contextlib import contextmanager, ExitStack from math import inf from textwrap import dedent import gc import attr import outcome import sniffio import pytest from .tutil import ( slow, check_sequence_matches, gc_collect_harder, ignore_coroutine_never_awaited_warnings, buggy_pypy_asyncgens, restore_unraisablehook, create_asyncio_future_in_new_loop, ) from ... import _core from ..._core._multierror import MultiError, NonBaseMultiError from .._run import DEADLINE_HEAP_MIN_PRUNE_THRESHOLD from ..._threads import to_thread_run_sync from ..._timeouts import sleep, fail_after from ...testing import ( wait_all_tasks_blocked, Sequencer, assert_checkpoints, ) if sys.version_info < (3, 11): from exceptiongroup import ExceptionGroup # slightly different from _timeouts.sleep_forever because it returns the value # its rescheduled with, which is really only useful for tests of # rescheduling... async def sleep_forever(): return await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED) def test_basic(): async def trivial(x): return x assert _core.run(trivial, 8) == 8 with pytest.raises(TypeError): # Missing an argument _core.run(trivial) with pytest.raises(TypeError): # Not an async function _core.run(lambda: None) async def trivial2(x): await _core.checkpoint() return x assert _core.run(trivial2, 1) == 1 def test_initial_task_error(): async def main(x): raise ValueError(x) with pytest.raises(ValueError) as excinfo: _core.run(main, 17) assert excinfo.value.args == (17,) def test_run_nesting(): async def inception(): async def main(): # pragma: no cover pass return _core.run(main) with pytest.raises(RuntimeError) as excinfo: _core.run(inception) assert "from inside" in str(excinfo.value) async def test_nursery_warn_use_async_with(): with pytest.raises(RuntimeError) as excinfo: on = _core.open_nursery() with on: pass # pragma: no cover excinfo.match( r"use 'async with open_nursery\(...\)', not 'with open_nursery\(...\)'" ) # avoid unawaited coro. async with on: pass async def test_nursery_main_block_error_basic(): exc = ValueError("whoops") with pytest.raises(ValueError) as excinfo: async with _core.open_nursery(): raise exc assert excinfo.value is exc async def test_child_crash_basic(): exc = ValueError("uh oh") async def erroring(): raise exc try: # nursery.__aexit__ propagates exception from child back to parent async with _core.open_nursery() as nursery: nursery.start_soon(erroring) except ValueError as e: assert e is exc async def test_basic_interleave(): async def looper(whoami, record): for i in range(3): record.append((whoami, i)) await _core.checkpoint() record = [] async with _core.open_nursery() as nursery: nursery.start_soon(looper, "a", record) nursery.start_soon(looper, "b", record) check_sequence_matches( record, [{("a", 0), ("b", 0)}, {("a", 1), ("b", 1)}, {("a", 2), ("b", 2)}] ) def test_task_crash_propagation(): looper_record = [] async def looper(): try: while True: await _core.checkpoint() except _core.Cancelled: print("looper cancelled") looper_record.append("cancelled") async def crasher(): raise ValueError("argh") async def main(): async with _core.open_nursery() as nursery: nursery.start_soon(looper) nursery.start_soon(crasher) with pytest.raises(ValueError) as excinfo: _core.run(main) assert looper_record == ["cancelled"] assert excinfo.value.args == ("argh",) def test_main_and_task_both_crash(): # If main crashes and there's also a task crash, then we get both in a # MultiError async def crasher(): raise ValueError async def main(): async with _core.open_nursery() as nursery: nursery.start_soon(crasher) raise KeyError with pytest.raises(MultiError) as excinfo: _core.run(main) print(excinfo.value) assert {type(exc) for exc in excinfo.value.exceptions} == { ValueError, KeyError, } def test_two_child_crashes(): async def crasher(etype): raise etype async def main(): async with _core.open_nursery() as nursery: nursery.start_soon(crasher, KeyError) nursery.start_soon(crasher, ValueError) with pytest.raises(MultiError) as excinfo: _core.run(main) assert {type(exc) for exc in excinfo.value.exceptions} == { ValueError, KeyError, } async def test_child_crash_wakes_parent(): async def crasher(): raise ValueError with pytest.raises(ValueError): async with _core.open_nursery() as nursery: nursery.start_soon(crasher) await sleep_forever() async def test_reschedule(): t1 = None t2 = None async def child1(): nonlocal t1, t2 t1 = _core.current_task() print("child1 start") x = await sleep_forever() print("child1 woke") assert x == 0 print("child1 rescheduling t2") _core.reschedule(t2, outcome.Error(ValueError())) print("child1 exit") async def child2(): nonlocal t1, t2 print("child2 start") t2 = _core.current_task() _core.reschedule(t1, outcome.Value(0)) print("child2 sleep") with pytest.raises(ValueError): await sleep_forever() print("child2 successful exit") async with _core.open_nursery() as nursery: nursery.start_soon(child1) # let t1 run and fall asleep await _core.checkpoint() nursery.start_soon(child2) async def test_current_time(): t1 = _core.current_time() # Windows clock is pretty low-resolution -- appveyor tests fail unless we # sleep for a bit here. time.sleep(time.get_clock_info("perf_counter").resolution) t2 = _core.current_time() assert t1 < t2 async def test_current_time_with_mock_clock(mock_clock): start = mock_clock.current_time() assert mock_clock.current_time() == _core.current_time() assert mock_clock.current_time() == _core.current_time() mock_clock.jump(3.14) assert start + 3.14 == mock_clock.current_time() == _core.current_time() async def test_current_clock(mock_clock): assert mock_clock is _core.current_clock() async def test_current_task(): parent_task = _core.current_task() async def child(): assert _core.current_task().parent_nursery.parent_task is parent_task async with _core.open_nursery() as nursery: nursery.start_soon(child) async def test_root_task(): root = _core.current_root_task() assert root.parent_nursery is root.eventual_parent_nursery is None def test_out_of_context(): with pytest.raises(RuntimeError): _core.current_task() with pytest.raises(RuntimeError): _core.current_time() async def test_current_statistics(mock_clock): # Make sure all the early startup stuff has settled down await wait_all_tasks_blocked() # A child that sticks around to make some interesting stats: async def child(): try: await sleep_forever() except _core.Cancelled: pass stats = _core.current_statistics() print(stats) # 2 system tasks + us assert stats.tasks_living == 3 assert stats.run_sync_soon_queue_size == 0 async with _core.open_nursery() as nursery: nursery.start_soon(child) await wait_all_tasks_blocked() token = _core.current_trio_token() token.run_sync_soon(lambda: None) token.run_sync_soon(lambda: None, idempotent=True) stats = _core.current_statistics() print(stats) # 2 system tasks + us + child assert stats.tasks_living == 4 # the exact value here might shift if we change how we do accounting # (currently it only counts tasks that we already know will be # runnable on the next pass), but still useful to at least test the # difference between now and after we wake up the child: assert stats.tasks_runnable == 0 assert stats.run_sync_soon_queue_size == 2 nursery.cancel_scope.cancel() stats = _core.current_statistics() print(stats) assert stats.tasks_runnable == 1 # Give the child a chance to die and the run_sync_soon a chance to clear await _core.checkpoint() await _core.checkpoint() with _core.CancelScope(deadline=_core.current_time() + 5): stats = _core.current_statistics() print(stats) assert stats.seconds_to_next_deadline == 5 stats = _core.current_statistics() print(stats) assert stats.seconds_to_next_deadline == inf async def test_cancel_scope_repr(mock_clock): scope = _core.CancelScope() assert "unbound" in repr(scope) with scope: assert "active" in repr(scope) scope.deadline = _core.current_time() - 1 assert "deadline is 1.00 seconds ago" in repr(scope) scope.deadline = _core.current_time() + 10 assert "deadline is 10.00 seconds from now" in repr(scope) # when not in async context, can't get the current time assert "deadline" not in await to_thread_run_sync(repr, scope) scope.cancel() assert "cancelled" in repr(scope) assert "exited" in repr(scope) def test_cancel_points(): async def main1(): with _core.CancelScope() as scope: await _core.checkpoint_if_cancelled() scope.cancel() with pytest.raises(_core.Cancelled): await _core.checkpoint_if_cancelled() _core.run(main1) async def main2(): with _core.CancelScope() as scope: await _core.checkpoint() scope.cancel() with pytest.raises(_core.Cancelled): await _core.checkpoint() _core.run(main2) async def main3(): with _core.CancelScope() as scope: scope.cancel() with pytest.raises(_core.Cancelled): await sleep_forever() _core.run(main3) async def main4(): with _core.CancelScope() as scope: scope.cancel() await _core.cancel_shielded_checkpoint() await _core.cancel_shielded_checkpoint() with pytest.raises(_core.Cancelled): await _core.checkpoint() _core.run(main4) async def test_cancel_edge_cases(): with _core.CancelScope() as scope: # Two cancels in a row -- idempotent scope.cancel() scope.cancel() await _core.checkpoint() assert scope.cancel_called assert scope.cancelled_caught with _core.CancelScope() as scope: # Check level-triggering scope.cancel() with pytest.raises(_core.Cancelled): await sleep_forever() with pytest.raises(_core.Cancelled): await sleep_forever() async def test_cancel_scope_multierror_filtering(): async def crasher(): raise KeyError try: with _core.CancelScope() as outer: try: async with _core.open_nursery() as nursery: # Two children that get cancelled by the nursery scope nursery.start_soon(sleep_forever) # t1 nursery.start_soon(sleep_forever) # t2 nursery.cancel_scope.cancel() with _core.CancelScope(shield=True): await wait_all_tasks_blocked() # One child that gets cancelled by the outer scope nursery.start_soon(sleep_forever) # t3 outer.cancel() # And one that raises a different error nursery.start_soon(crasher) # t4 # and then our __aexit__ also receives an outer Cancelled except MultiError as multi_exc: # Since the outer scope became cancelled before the # nursery block exited, all cancellations inside the # nursery block continue propagating to reach the # outer scope. assert len(multi_exc.exceptions) == 5 summary = {} for exc in multi_exc.exceptions: summary.setdefault(type(exc), 0) summary[type(exc)] += 1 assert summary == {_core.Cancelled: 4, KeyError: 1} raise except AssertionError: # pragma: no cover raise except BaseException as exc: # This is outside the outer scope, so all the Cancelled # exceptions should have been absorbed, leaving just a regular # KeyError from crasher() assert type(exc) is KeyError else: # pragma: no cover assert False async def test_precancelled_task(): # a task that gets spawned into an already-cancelled nursery should begin # execution (https://github.com/python-trio/trio/issues/41), but get a # cancelled error at its first blocking call. record = [] async def blocker(): record.append("started") await sleep_forever() async with _core.open_nursery() as nursery: nursery.cancel_scope.cancel() nursery.start_soon(blocker) assert record == ["started"] async def test_cancel_shielding(): with _core.CancelScope() as outer: with _core.CancelScope() as inner: await _core.checkpoint() outer.cancel() with pytest.raises(_core.Cancelled): await _core.checkpoint() assert inner.shield is False with pytest.raises(TypeError): inner.shield = "hello" assert inner.shield is False inner.shield = True assert inner.shield is True # shield protects us from 'outer' await _core.checkpoint() with _core.CancelScope() as innerest: innerest.cancel() # but it doesn't protect us from scope inside inner with pytest.raises(_core.Cancelled): await _core.checkpoint() await _core.checkpoint() inner.shield = False # can disable shield again with pytest.raises(_core.Cancelled): await _core.checkpoint() # re-enable shield inner.shield = True await _core.checkpoint() # shield doesn't protect us from inner itself inner.cancel() # This should now raise, but be absorbed by the inner scope await _core.checkpoint() assert inner.cancelled_caught # make sure that cancellation propagates immediately to all children async def test_cancel_inheritance(): record = set() async def leaf(ident): try: await sleep_forever() except _core.Cancelled: record.add(ident) async def worker(ident): async with _core.open_nursery() as nursery: nursery.start_soon(leaf, ident + "-l1") nursery.start_soon(leaf, ident + "-l2") async with _core.open_nursery() as nursery: nursery.start_soon(worker, "w1") nursery.start_soon(worker, "w2") nursery.cancel_scope.cancel() assert record == {"w1-l1", "w1-l2", "w2-l1", "w2-l2"} async def test_cancel_shield_abort(): with _core.CancelScope() as outer: async with _core.open_nursery() as nursery: outer.cancel() nursery.cancel_scope.shield = True # The outer scope is cancelled, but this task is protected by the # shield, so it manages to get to sleep record = [] async def sleeper(): record.append("sleeping") try: await sleep_forever() except _core.Cancelled: record.append("cancelled") nursery.start_soon(sleeper) await wait_all_tasks_blocked() assert record == ["sleeping"] # now when we unshield, it should abort the sleep. nursery.cancel_scope.shield = False # wait for the task to finish before entering the nursery # __aexit__, because __aexit__ could make it spuriously look like # this worked by cancelling the nursery scope. (When originally # written, without these last few lines, the test spuriously # passed, even though shield assignment was buggy.) with _core.CancelScope(shield=True): await wait_all_tasks_blocked() assert record == ["sleeping", "cancelled"] async def test_basic_timeout(mock_clock): start = _core.current_time() with _core.CancelScope() as scope: assert scope.deadline == inf scope.deadline = start + 1 assert scope.deadline == start + 1 assert not scope.cancel_called mock_clock.jump(2) await _core.checkpoint() await _core.checkpoint() await _core.checkpoint() assert not scope.cancel_called start = _core.current_time() with _core.CancelScope(deadline=start + 1) as scope: mock_clock.jump(2) await sleep_forever() # But then the scope swallowed the exception... but we can still see it # here: assert scope.cancel_called assert scope.cancelled_caught # changing deadline start = _core.current_time() with _core.CancelScope() as scope: await _core.checkpoint() scope.deadline = start + 10 await _core.checkpoint() mock_clock.jump(5) await _core.checkpoint() scope.deadline = start + 1 with pytest.raises(_core.Cancelled): await _core.checkpoint() with pytest.raises(_core.Cancelled): await _core.checkpoint() async def test_cancel_scope_nesting(): # Nested scopes: if two triggering at once, the outer one wins with _core.CancelScope() as scope1: with _core.CancelScope() as scope2: with _core.CancelScope() as scope3: scope3.cancel() scope2.cancel() await sleep_forever() assert scope3.cancel_called assert not scope3.cancelled_caught assert scope2.cancel_called assert scope2.cancelled_caught assert not scope1.cancel_called assert not scope1.cancelled_caught # shielding with _core.CancelScope() as scope1: with _core.CancelScope() as scope2: scope1.cancel() with pytest.raises(_core.Cancelled): await _core.checkpoint() with pytest.raises(_core.Cancelled): await _core.checkpoint() scope2.shield = True await _core.checkpoint() scope2.cancel() with pytest.raises(_core.Cancelled): await _core.checkpoint() # if a scope is pending, but then gets popped off the stack, then it # isn't delivered with _core.CancelScope() as scope: scope.cancel() await _core.cancel_shielded_checkpoint() await _core.checkpoint() assert not scope.cancelled_caught # Regression test for https://github.com/python-trio/trio/issues/1175 async def test_unshield_while_cancel_propagating(): with _core.CancelScope() as outer: with _core.CancelScope() as inner: outer.cancel() try: await _core.checkpoint() finally: inner.shield = True assert outer.cancelled_caught and not inner.cancelled_caught async def test_cancel_unbound(): async def sleep_until_cancelled(scope): with scope, fail_after(1): await sleep_forever() # Cancel before entry scope = _core.CancelScope() scope.cancel() async with _core.open_nursery() as nursery: nursery.start_soon(sleep_until_cancelled, scope) # Cancel after entry scope = _core.CancelScope() async with _core.open_nursery() as nursery: nursery.start_soon(sleep_until_cancelled, scope) await wait_all_tasks_blocked() scope.cancel() # Shield before entry scope = _core.CancelScope() scope.shield = True with _core.CancelScope() as outer, scope: outer.cancel() await _core.checkpoint() scope.shield = False with pytest.raises(_core.Cancelled): await _core.checkpoint() # Can't reuse with _core.CancelScope() as scope: await _core.checkpoint() scope.cancel() await _core.checkpoint() assert scope.cancel_called assert not scope.cancelled_caught with pytest.raises(RuntimeError) as exc_info: with scope: pass # pragma: no cover assert "single 'with' block" in str(exc_info.value) # Can't reenter with _core.CancelScope() as scope: with pytest.raises(RuntimeError) as exc_info: with scope: pass # pragma: no cover assert "single 'with' block" in str(exc_info.value) # Can't enter from multiple tasks simultaneously scope = _core.CancelScope() async def enter_scope(): with scope: await sleep_forever() async with _core.open_nursery() as nursery: nursery.start_soon(enter_scope, name="this one") await wait_all_tasks_blocked() with pytest.raises(RuntimeError) as exc_info: with scope: pass # pragma: no cover assert "single 'with' block" in str(exc_info.value) nursery.cancel_scope.cancel() # If not yet entered, cancel_called is true when the deadline has passed # even if cancel() hasn't been called yet scope = _core.CancelScope(deadline=_core.current_time() + 1) assert not scope.cancel_called scope.deadline -= 1 assert scope.cancel_called scope.deadline += 1 assert scope.cancel_called # never become un-cancelled async def test_cancel_scope_misnesting(): outer = _core.CancelScope() inner = _core.CancelScope() with ExitStack() as stack: stack.enter_context(outer) with inner: with pytest.raises(RuntimeError, match="still within its child"): stack.close() # No further error is raised when exiting the inner context # If there are other tasks inside the abandoned part of the cancel tree, # they get cancelled when the misnesting is detected async def task1(): with pytest.raises(_core.Cancelled): await sleep_forever() # Even if inside another cancel scope async def task2(): with _core.CancelScope(): with pytest.raises(_core.Cancelled): await sleep_forever() with ExitStack() as stack: stack.enter_context(_core.CancelScope()) async with _core.open_nursery() as nursery: nursery.start_soon(task1) nursery.start_soon(task2) await wait_all_tasks_blocked() with pytest.raises(RuntimeError, match="still within its child"): stack.close() # Variant that makes the child tasks direct children of the scope # that noticed the misnesting: nursery_mgr = _core.open_nursery() nursery = await nursery_mgr.__aenter__() try: nursery.start_soon(task1) nursery.start_soon(task2) nursery.start_soon(sleep_forever) await wait_all_tasks_blocked() nursery.cancel_scope.__exit__(None, None, None) finally: with pytest.raises(RuntimeError) as exc_info: await nursery_mgr.__aexit__(*sys.exc_info()) assert "which had already been exited" in str(exc_info.value) assert type(exc_info.value.__context__) is NonBaseMultiError assert len(exc_info.value.__context__.exceptions) == 3 cancelled_in_context = False for exc in exc_info.value.__context__.exceptions: assert isinstance(exc, RuntimeError) assert "closed before the task exited" in str(exc) cancelled_in_context |= isinstance(exc.__context__, _core.Cancelled) assert cancelled_in_context # for the sleep_forever # Trying to exit a cancel scope from an unrelated task raises an error # without affecting any state async def task3(task_status): with _core.CancelScope() as scope: task_status.started(scope) await sleep_forever() async with _core.open_nursery() as nursery: scope = await nursery.start(task3) with pytest.raises(RuntimeError, match="from unrelated"): scope.__exit__(None, None, None) scope.cancel() @slow async def test_timekeeping(): # probably a good idea to use a real clock for *one* test anyway... TARGET = 1.0 # give it a few tries in case of random CI server flakiness for _ in range(4): real_start = time.perf_counter() with _core.CancelScope() as scope: scope.deadline = _core.current_time() + TARGET await sleep_forever() real_duration = time.perf_counter() - real_start accuracy = real_duration / TARGET print(accuracy) # Actual time elapsed should always be >= target time # (== is possible depending on system behavior for time.perf_counter resolution if 1.0 <= accuracy < 2: # pragma: no branch break else: # pragma: no cover assert False async def test_failed_abort(): stubborn_task = [None] stubborn_scope = [None] record = [] async def stubborn_sleeper(): stubborn_task[0] = _core.current_task() with _core.CancelScope() as scope: stubborn_scope[0] = scope record.append("sleep") x = await _core.wait_task_rescheduled(lambda _: _core.Abort.FAILED) assert x == 1 record.append("woke") try: await _core.checkpoint_if_cancelled() except _core.Cancelled: record.append("cancelled") async with _core.open_nursery() as nursery: nursery.start_soon(stubborn_sleeper) await wait_all_tasks_blocked() assert record == ["sleep"] stubborn_scope[0].cancel() await wait_all_tasks_blocked() # cancel didn't wake it up assert record == ["sleep"] # wake it up again by hand _core.reschedule(stubborn_task[0], outcome.Value(1)) assert record == ["sleep", "woke", "cancelled"] @restore_unraisablehook() def test_broken_abort(): async def main(): # These yields are here to work around an annoying warning -- we're # going to crash the main loop, and if we (by chance) do this before # the run_sync_soon task runs for the first time, then Python gives us # a spurious warning about it not being awaited. (I mean, the warning # is correct, but here we're testing our ability to deliver a # semi-meaningful error after things have gone totally pear-shaped, so # it's not relevant.) By letting the run_sync_soon_task run first, we # avoid the warning. await _core.checkpoint() await _core.checkpoint() with _core.CancelScope() as scope: scope.cancel() # None is not a legal return value here await _core.wait_task_rescheduled(lambda _: None) with pytest.raises(_core.TrioInternalError): _core.run(main) # Because this crashes, various __del__ methods print complaints on # stderr. Make sure that they get run now, so the output is attached to # this test. gc_collect_harder() @restore_unraisablehook() def test_error_in_run_loop(): # Blow stuff up real good to check we at least get a TrioInternalError async def main(): task = _core.current_task() task._schedule_points = "hello!" await _core.checkpoint() with ignore_coroutine_never_awaited_warnings(): with pytest.raises(_core.TrioInternalError): _core.run(main) async def test_spawn_system_task(): record = [] async def system_task(x): record.append(("x", x)) record.append(("ki", _core.currently_ki_protected())) await _core.checkpoint() _core.spawn_system_task(system_task, 1) await wait_all_tasks_blocked() assert record == [("x", 1), ("ki", True)] # intentionally make a system task crash def test_system_task_crash(): async def crasher(): raise KeyError async def main(): _core.spawn_system_task(crasher) await sleep_forever() with pytest.raises(_core.TrioInternalError): _core.run(main) def test_system_task_crash_MultiError(): async def crasher1(): raise KeyError async def crasher2(): raise ValueError async def system_task(): async with _core.open_nursery() as nursery: nursery.start_soon(crasher1) nursery.start_soon(crasher2) async def main(): _core.spawn_system_task(system_task) await sleep_forever() with pytest.raises(_core.TrioInternalError) as excinfo: _core.run(main) me = excinfo.value.__cause__ assert isinstance(me, MultiError) assert len(me.exceptions) == 2 for exc in me.exceptions: assert isinstance(exc, (KeyError, ValueError)) def test_system_task_crash_plus_Cancelled(): # Set up a situation where a system task crashes with a # MultiError([Cancelled, ValueError]) async def crasher(): try: await sleep_forever() except _core.Cancelled: raise ValueError async def cancelme(): await sleep_forever() async def system_task(): async with _core.open_nursery() as nursery: nursery.start_soon(crasher) nursery.start_soon(cancelme) async def main(): _core.spawn_system_task(system_task) # then we exit, triggering a cancellation with pytest.raises(_core.TrioInternalError) as excinfo: _core.run(main) assert type(excinfo.value.__cause__) is ValueError def test_system_task_crash_KeyboardInterrupt(): async def ki(): raise KeyboardInterrupt async def main(): _core.spawn_system_task(ki) await sleep_forever() with pytest.raises(_core.TrioInternalError) as excinfo: _core.run(main) assert isinstance(excinfo.value.__cause__, KeyboardInterrupt) # This used to fail because checkpoint was a yield followed by an immediate # reschedule. So we had: # 1) this task yields # 2) this task is rescheduled # ... # 3) next iteration of event loop starts, runs timeouts # 4) this task has timed out # 5) ...but it's on the run queue, so the timeout is queued to be delivered # the next time that it's blocked. async def test_yield_briefly_checks_for_timeout(mock_clock): with _core.CancelScope(deadline=_core.current_time() + 5): await _core.checkpoint() with pytest.raises(_core.Cancelled): mock_clock.jump(10) await _core.checkpoint() # This tests that sys.exc_info is properly saved/restored as we swap between # tasks. It turns out that the interpreter automagically handles this for us # so there's no special code in Trio required to pass this test, but it's # still nice to know that it works :-). # # Update: it turns out I was right to be nervous! see the next test... async def test_exc_info(): record = [] seq = Sequencer() async def child1(): with pytest.raises(ValueError) as excinfo: try: async with seq(0): pass # we don't yield until seq(2) below record.append("child1 raise") raise ValueError("child1") except ValueError: record.append("child1 sleep") async with seq(2): pass assert "child2 wake" in record record.append("child1 re-raise") raise assert excinfo.value.__context__ is None record.append("child1 success") async def child2(): with pytest.raises(KeyError) as excinfo: async with seq(1): pass # we don't yield until seq(3) below assert "child1 sleep" in record record.append("child2 wake") assert sys.exc_info() == (None, None, None) try: raise KeyError("child2") except KeyError: record.append("child2 sleep again") async with seq(3): pass assert "child1 re-raise" in record record.append("child2 re-raise") raise assert excinfo.value.__context__ is None record.append("child2 success") async with _core.open_nursery() as nursery: nursery.start_soon(child1) nursery.start_soon(child2) assert record == [ "child1 raise", "child1 sleep", "child2 wake", "child2 sleep again", "child1 re-raise", "child1 success", "child2 re-raise", "child2 success", ] # Before CPython 3.9, using .throw() to raise an exception inside a # coroutine/generator causes the original exc_info state to be lost, so things # like re-raising and exception chaining are broken. # # https://bugs.python.org/issue29587 async def test_exc_info_after_yield_error(): child_task = None async def child(): nonlocal child_task child_task = _core.current_task() try: raise KeyError except Exception: try: await sleep_forever() except Exception: pass raise with pytest.raises(KeyError): async with _core.open_nursery() as nursery: nursery.start_soon(child) await wait_all_tasks_blocked() _core.reschedule(child_task, outcome.Error(ValueError())) # Similar to previous test -- if the ValueError() gets sent in via 'throw', # then Python's normal implicit chaining stuff is broken. async def test_exception_chaining_after_yield_error(): child_task = None async def child(): nonlocal child_task child_task = _core.current_task() try: raise KeyError except Exception: await sleep_forever() with pytest.raises(ValueError) as excinfo: async with _core.open_nursery() as nursery: nursery.start_soon(child) await wait_all_tasks_blocked() _core.reschedule(child_task, outcome.Error(ValueError())) assert isinstance(excinfo.value.__context__, KeyError) async def test_nursery_exception_chaining_doesnt_make_context_loops(): async def crasher(): raise KeyError with pytest.raises(MultiError) as excinfo: async with _core.open_nursery() as nursery: nursery.start_soon(crasher) raise ValueError # the MultiError should not have the KeyError or ValueError as context assert excinfo.value.__context__ is None def test_TrioToken_identity(): async def get_and_check_token(): token = _core.current_trio_token() # Two calls in the same run give the same object assert token is _core.current_trio_token() return token t1 = _core.run(get_and_check_token) t2 = _core.run(get_and_check_token) assert t1 is not t2 assert t1 != t2 assert hash(t1) != hash(t2) async def test_TrioToken_run_sync_soon_basic(): record = [] def cb(x): record.append(("cb", x)) token = _core.current_trio_token() token.run_sync_soon(cb, 1) assert not record await wait_all_tasks_blocked() assert record == [("cb", 1)] def test_TrioToken_run_sync_soon_too_late(): token = None async def main(): nonlocal token token = _core.current_trio_token() _core.run(main) assert token is not None with pytest.raises(_core.RunFinishedError): token.run_sync_soon(lambda: None) # pragma: no branch async def test_TrioToken_run_sync_soon_idempotent(): record = [] def cb(x): record.append(x) token = _core.current_trio_token() token.run_sync_soon(cb, 1) token.run_sync_soon(cb, 1, idempotent=True) token.run_sync_soon(cb, 1, idempotent=True) token.run_sync_soon(cb, 1, idempotent=True) token.run_sync_soon(cb, 2, idempotent=True) token.run_sync_soon(cb, 2, idempotent=True) await wait_all_tasks_blocked() assert len(record) == 3 assert sorted(record) == [1, 1, 2] # ordering test record = [] for _ in range(3): for i in range(100): token.run_sync_soon(cb, i, idempotent=True) await wait_all_tasks_blocked() # We guarantee FIFO assert record == list(range(100)) def test_TrioToken_run_sync_soon_idempotent_requeue(): # We guarantee that if a call has finished, queueing it again will call it # again. Due to the lack of synchronization, this effectively means that # we have to guarantee that once a call has *started*, queueing it again # will call it again. Also this is much easier to test :-) record = [] def redo(token): record.append(None) try: token.run_sync_soon(redo, token, idempotent=True) except _core.RunFinishedError: pass async def main(): token = _core.current_trio_token() token.run_sync_soon(redo, token, idempotent=True) await _core.checkpoint() await _core.checkpoint() await _core.checkpoint() _core.run(main) assert len(record) >= 2 def test_TrioToken_run_sync_soon_after_main_crash(): record = [] async def main(): token = _core.current_trio_token() # After main exits but before finally cleaning up, callback processed # normally token.run_sync_soon(lambda: record.append("sync-cb")) raise ValueError with pytest.raises(ValueError): _core.run(main) assert record == ["sync-cb"] def test_TrioToken_run_sync_soon_crashes(): record = set() async def main(): token = _core.current_trio_token() token.run_sync_soon(lambda: dict()["nope"]) # check that a crashing run_sync_soon callback doesn't stop further # calls to run_sync_soon token.run_sync_soon(lambda: record.add("2nd run_sync_soon ran")) try: await sleep_forever() except _core.Cancelled: record.add("cancelled!") with pytest.raises(_core.TrioInternalError) as excinfo: _core.run(main) assert type(excinfo.value.__cause__) is KeyError assert record == {"2nd run_sync_soon ran", "cancelled!"} async def test_TrioToken_run_sync_soon_FIFO(): N = 100 record = [] token = _core.current_trio_token() for i in range(N): token.run_sync_soon(lambda j: record.append(j), i) await wait_all_tasks_blocked() assert record == list(range(N)) def test_TrioToken_run_sync_soon_starvation_resistance(): # Even if we push callbacks in from callbacks, so that the callback queue # never empties out, then we still can't starve out other tasks from # running. token = None record = [] def naughty_cb(i): nonlocal token try: token.run_sync_soon(naughty_cb, i + 1) except _core.RunFinishedError: record.append(("run finished", i)) async def main(): nonlocal token token = _core.current_trio_token() token.run_sync_soon(naughty_cb, 0) record.append("starting") for _ in range(20): await _core.checkpoint() _core.run(main) assert len(record) == 2 assert record[0] == "starting" assert record[1][0] == "run finished" assert record[1][1] >= 19 def test_TrioToken_run_sync_soon_threaded_stress_test(): cb_counter = 0 def cb(): nonlocal cb_counter cb_counter += 1 def stress_thread(token): try: while True: token.run_sync_soon(cb) time.sleep(0) except _core.RunFinishedError: pass async def main(): token = _core.current_trio_token() thread = threading.Thread(target=stress_thread, args=(token,)) thread.start() for _ in range(10): start_value = cb_counter while cb_counter == start_value: await sleep(0.01) _core.run(main) print(cb_counter) async def test_TrioToken_run_sync_soon_massive_queue(): # There are edge cases in the wakeup fd code when the wakeup fd overflows, # so let's try to make that happen. This is also just a good stress test # in general. (With the current-as-of-2017-02-14 code using a socketpair # with minimal buffer, Linux takes 6 wakeups to fill the buffer and macOS # takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes # ~600,000 wakeups, but has the same code paths...) COUNT = 1000 token = _core.current_trio_token() counter = [0] def cb(i): # This also tests FIFO ordering of callbacks assert counter[0] == i counter[0] += 1 for i in range(COUNT): token.run_sync_soon(cb, i) await wait_all_tasks_blocked() assert counter[0] == COUNT @pytest.mark.skipif(buggy_pypy_asyncgens, reason="PyPy 7.2 is buggy") def test_TrioToken_run_sync_soon_late_crash(): # Crash after system nursery is closed -- easiest way to do that is # from an async generator finalizer. record = [] saved = [] async def agen(): token = _core.current_trio_token() try: yield 1 finally: token.run_sync_soon(lambda: {}["nope"]) token.run_sync_soon(lambda: record.append("2nd ran")) async def main(): saved.append(agen()) await saved[-1].asend(None) record.append("main exiting") with pytest.raises(_core.TrioInternalError) as excinfo: _core.run(main) assert type(excinfo.value.__cause__) is KeyError assert record == ["main exiting", "2nd ran"] async def test_slow_abort_basic(): with _core.CancelScope() as scope: scope.cancel() with pytest.raises(_core.Cancelled): task = _core.current_task() token = _core.current_trio_token() def slow_abort(raise_cancel): result = outcome.capture(raise_cancel) token.run_sync_soon(_core.reschedule, task, result) return _core.Abort.FAILED await _core.wait_task_rescheduled(slow_abort) async def test_slow_abort_edge_cases(): record = [] async def slow_aborter(): task = _core.current_task() token = _core.current_trio_token() def slow_abort(raise_cancel): record.append("abort-called") result = outcome.capture(raise_cancel) token.run_sync_soon(_core.reschedule, task, result) return _core.Abort.FAILED with pytest.raises(_core.Cancelled): record.append("sleeping") await _core.wait_task_rescheduled(slow_abort) record.append("cancelled") # blocking again, this time it's okay, because we're shielded await _core.checkpoint() record.append("done") with _core.CancelScope() as outer1: with _core.CancelScope() as outer2: async with _core.open_nursery() as nursery: # So we have a task blocked on an operation that can't be # aborted immediately nursery.start_soon(slow_aborter) await wait_all_tasks_blocked() assert record == ["sleeping"] # And then we cancel it, so the abort callback gets run outer1.cancel() assert record == ["sleeping", "abort-called"] # In fact that happens twice! (This used to cause the abort # callback to be run twice) outer2.cancel() assert record == ["sleeping", "abort-called"] # But then before the abort finishes, the task gets shielded! nursery.cancel_scope.shield = True # Now we wait for the task to finish... # The cancellation was delivered, even though it was shielded assert record == ["sleeping", "abort-called", "cancelled", "done"] async def test_task_tree_introspection(): tasks = {} nurseries = {} async def parent(task_status=_core.TASK_STATUS_IGNORED): tasks["parent"] = _core.current_task() assert tasks["parent"].child_nurseries == [] async with _core.open_nursery() as nursery1: async with _core.open_nursery() as nursery2: assert tasks["parent"].child_nurseries == [nursery1, nursery2] assert tasks["parent"].child_nurseries == [] async with _core.open_nursery() as nursery: nurseries["parent"] = nursery await nursery.start(child1) # Upward links survive after tasks/nurseries exit assert nurseries["parent"].parent_task is tasks["parent"] assert tasks["child1"].parent_nursery is nurseries["parent"] assert nurseries["child1"].parent_task is tasks["child1"] assert tasks["child2"].parent_nursery is nurseries["child1"] nursery = _core.current_task().parent_nursery # Make sure that chaining eventually gives a nursery of None (and not, # for example, an error) while nursery is not None: t = nursery.parent_task nursery = t.parent_nursery async def child2(): tasks["child2"] = _core.current_task() assert tasks["parent"].child_nurseries == [nurseries["parent"]] assert nurseries["parent"].child_tasks == frozenset({tasks["child1"]}) assert tasks["child1"].child_nurseries == [nurseries["child1"]] assert nurseries["child1"].child_tasks == frozenset({tasks["child2"]}) assert tasks["child2"].child_nurseries == [] async def child1(task_status=_core.TASK_STATUS_IGNORED): me = tasks["child1"] = _core.current_task() assert me.parent_nursery.parent_task is tasks["parent"] assert me.parent_nursery is not nurseries["parent"] assert me.eventual_parent_nursery is nurseries["parent"] task_status.started() assert me.parent_nursery is nurseries["parent"] assert me.eventual_parent_nursery is None # Wait for the start() call to return and close its internal nursery, to # ensure consistent results in child2: await _core.wait_all_tasks_blocked() async with _core.open_nursery() as nursery: nurseries["child1"] = nursery nursery.start_soon(child2) async with _core.open_nursery() as nursery: nursery.start_soon(parent) # There are no pending starts, so no one should have a non-None # eventual_parent_nursery for task in tasks.values(): assert task.eventual_parent_nursery is None async def test_nursery_closure(): async def child1(nursery): # We can add new tasks to the nursery even after entering __aexit__, # so long as there are still tasks running nursery.start_soon(child2) async def child2(): pass async with _core.open_nursery() as nursery: nursery.start_soon(child1, nursery) # But once we've left __aexit__, the nursery is closed with pytest.raises(RuntimeError): nursery.start_soon(child2) async def test_spawn_name(): async def func1(expected): task = _core.current_task() assert expected in task.name async def func2(): # pragma: no cover pass async with _core.open_nursery() as nursery: for spawn_fn in [nursery.start_soon, _core.spawn_system_task]: spawn_fn(func1, "func1") spawn_fn(func1, "func2", name=func2) spawn_fn(func1, "func3", name="func3") spawn_fn(functools.partial(func1, "func1")) spawn_fn(func1, "object", name=object()) async def test_current_effective_deadline(mock_clock): assert _core.current_effective_deadline() == inf with _core.CancelScope(deadline=5) as scope1: with _core.CancelScope(deadline=10) as scope2: assert _core.current_effective_deadline() == 5 scope2.deadline = 3 assert _core.current_effective_deadline() == 3 scope2.deadline = 10 assert _core.current_effective_deadline() == 5 scope2.shield = True assert _core.current_effective_deadline() == 10 scope2.shield = False assert _core.current_effective_deadline() == 5 scope1.cancel() assert _core.current_effective_deadline() == -inf scope2.shield = True assert _core.current_effective_deadline() == 10 assert _core.current_effective_deadline() == -inf assert _core.current_effective_deadline() == inf def test_nice_error_on_bad_calls_to_run_or_spawn(): def bad_call_run(*args): _core.run(*args) def bad_call_spawn(*args): async def main(): async with _core.open_nursery() as nursery: nursery.start_soon(*args) _core.run(main) for bad_call in bad_call_run, bad_call_spawn: async def f(): # pragma: no cover pass with pytest.raises(TypeError, match="expecting an async function"): bad_call(f()) async def async_gen(arg): # pragma: no cover yield arg with pytest.raises( TypeError, match="expected an async function but got an async generator" ): bad_call(async_gen, 0) def test_calling_asyncio_function_gives_nice_error(): async def child_xyzzy(): await create_asyncio_future_in_new_loop() async def misguided(): await child_xyzzy() with pytest.raises(TypeError) as excinfo: _core.run(misguided) assert "asyncio" in str(excinfo.value) # The traceback should point to the location of the foreign await assert any( # pragma: no branch entry.name == "child_xyzzy" for entry in excinfo.traceback ) async def test_asyncio_function_inside_nursery_does_not_explode(): # Regression test for https://github.com/python-trio/trio/issues/552 with pytest.raises(TypeError) as excinfo: async with _core.open_nursery() as nursery: import asyncio nursery.start_soon(sleep_forever) await create_asyncio_future_in_new_loop() assert "asyncio" in str(excinfo.value) async def test_trivial_yields(): with assert_checkpoints(): await _core.checkpoint() with assert_checkpoints(): await _core.checkpoint_if_cancelled() await _core.cancel_shielded_checkpoint() with assert_checkpoints(): async with _core.open_nursery(): pass with _core.CancelScope() as cancel_scope: cancel_scope.cancel() with pytest.raises(MultiError) as excinfo: async with _core.open_nursery(): raise KeyError assert len(excinfo.value.exceptions) == 2 assert {type(e) for e in excinfo.value.exceptions} == { KeyError, _core.Cancelled, } async def test_nursery_start(autojump_clock): async def no_args(): # pragma: no cover pass # Errors in calling convention get raised immediately from start async with _core.open_nursery() as nursery: with pytest.raises(TypeError): await nursery.start(no_args) async def sleep_then_start(seconds, *, task_status=_core.TASK_STATUS_IGNORED): repr(task_status) # smoke test await sleep(seconds) task_status.started(seconds) await sleep(seconds) # Basic happy-path check: start waits for the task to call started(), then # returns, passes back the value, and the given nursery then waits for it # to exit. for seconds in [1, 2]: async with _core.open_nursery() as nursery: assert len(nursery.child_tasks) == 0 t0 = _core.current_time() assert await nursery.start(sleep_then_start, seconds) == seconds assert _core.current_time() - t0 == seconds assert len(nursery.child_tasks) == 1 assert _core.current_time() - t0 == 2 * seconds # Make sure TASK_STATUS_IGNORED works so task function can be called # directly t0 = _core.current_time() await sleep_then_start(3) assert _core.current_time() - t0 == 2 * 3 # calling started twice async def double_started(task_status=_core.TASK_STATUS_IGNORED): task_status.started() with pytest.raises(RuntimeError): task_status.started() async with _core.open_nursery() as nursery: await nursery.start(double_started) # child crashes before calling started -> error comes out of .start() async def raise_keyerror(task_status=_core.TASK_STATUS_IGNORED): raise KeyError("oops") async with _core.open_nursery() as nursery: with pytest.raises(KeyError): await nursery.start(raise_keyerror) # child exiting cleanly before calling started -> triggers a RuntimeError async def nothing(task_status=_core.TASK_STATUS_IGNORED): return async with _core.open_nursery() as nursery: with pytest.raises(RuntimeError) as excinfo: await nursery.start(nothing) assert "exited without calling" in str(excinfo.value) # if the call to start() is cancelled, then the call to started() does # nothing -- the child keeps executing under start(). The value it passed # is ignored; start() raises Cancelled. async def just_started(task_status=_core.TASK_STATUS_IGNORED): task_status.started("hi") async with _core.open_nursery() as nursery: with _core.CancelScope() as cs: cs.cancel() with pytest.raises(_core.Cancelled): await nursery.start(just_started) # and if after the no-op started(), the child crashes, the error comes out # of start() async def raise_keyerror_after_started(task_status=_core.TASK_STATUS_IGNORED): task_status.started() raise KeyError("whoopsiedaisy") async with _core.open_nursery() as nursery: with _core.CancelScope() as cs: cs.cancel() with pytest.raises(MultiError) as excinfo: await nursery.start(raise_keyerror_after_started) assert {type(e) for e in excinfo.value.exceptions} == { _core.Cancelled, KeyError, } # trying to start in a closed nursery raises an error immediately async with _core.open_nursery() as closed_nursery: pass t0 = _core.current_time() with pytest.raises(RuntimeError): await closed_nursery.start(sleep_then_start, 7) assert _core.current_time() == t0 async def test_task_nursery_stack(): task = _core.current_task() assert task._child_nurseries == [] async with _core.open_nursery() as nursery1: assert task._child_nurseries == [nursery1] with pytest.raises(KeyError): async with _core.open_nursery() as nursery2: assert task._child_nurseries == [nursery1, nursery2] raise KeyError assert task._child_nurseries == [nursery1] assert task._child_nurseries == [] async def test_nursery_start_with_cancelled_nursery(): # This function isn't testing task_status, it's using task_status as a # convenient way to get a nursery that we can test spawning stuff into. async def setup_nursery(task_status=_core.TASK_STATUS_IGNORED): async with _core.open_nursery() as nursery: task_status.started(nursery) await sleep_forever() # Calls started() while children are asleep, so we can make sure # that the cancellation machinery notices and aborts when a sleeping task # is moved into a cancelled scope. async def sleeping_children(fn, *, task_status=_core.TASK_STATUS_IGNORED): async with _core.open_nursery() as nursery: nursery.start_soon(sleep_forever) nursery.start_soon(sleep_forever) await wait_all_tasks_blocked() fn() task_status.started() # Cancelling the setup_nursery just *before* calling started() async with _core.open_nursery() as nursery: target_nursery = await nursery.start(setup_nursery) await target_nursery.start( sleeping_children, target_nursery.cancel_scope.cancel ) # Cancelling the setup_nursery just *after* calling started() async with _core.open_nursery() as nursery: target_nursery = await nursery.start(setup_nursery) await target_nursery.start(sleeping_children, lambda: None) target_nursery.cancel_scope.cancel() async def test_nursery_start_keeps_nursery_open(autojump_clock): async def sleep_a_bit(task_status=_core.TASK_STATUS_IGNORED): await sleep(2) task_status.started() await sleep(3) async with _core.open_nursery() as nursery1: t0 = _core.current_time() async with _core.open_nursery() as nursery2: # Start the 'start' call running in the background nursery1.start_soon(nursery2.start, sleep_a_bit) # Sleep a bit await sleep(1) # Start another one. nursery1.start_soon(nursery2.start, sleep_a_bit) # Then exit this nursery. At this point, there are no tasks # present in this nursery -- the only thing keeping it open is # that the tasks will be placed into it soon, when they call # started(). assert _core.current_time() - t0 == 6 # Check that it still works even if the task that the nursery is waiting # for ends up crashing, and never actually enters the nursery. async def sleep_then_crash(task_status=_core.TASK_STATUS_IGNORED): await sleep(7) raise KeyError async def start_sleep_then_crash(nursery): with pytest.raises(KeyError): await nursery.start(sleep_then_crash) async with _core.open_nursery() as nursery1: t0 = _core.current_time() async with _core.open_nursery() as nursery2: nursery1.start_soon(start_sleep_then_crash, nursery2) await wait_all_tasks_blocked() assert _core.current_time() - t0 == 7 async def test_nursery_explicit_exception(): with pytest.raises(KeyError): async with _core.open_nursery(): raise KeyError() async def test_nursery_stop_iteration(): async def fail(): raise ValueError try: async with _core.open_nursery() as nursery: nursery.start_soon(fail) raise StopIteration except MultiError as e: assert tuple(map(type, e.exceptions)) == (StopIteration, ValueError) async def test_nursery_stop_async_iteration(): class it: def __init__(self, count): self.count = count self.val = 0 async def __anext__(self): await sleep(0) val = self.val if val >= self.count: raise StopAsyncIteration self.val += 1 return val class async_zip: def __init__(self, *largs): self.nexts = [obj.__anext__ for obj in largs] async def _accumulate(self, f, items, i): items[i] = await f() def __aiter__(self): return self async def __anext__(self): nexts = self.nexts items = [None] * len(nexts) async with _core.open_nursery() as nursery: for i, f in enumerate(nexts): nursery.start_soon(self._accumulate, f, items, i) return items result = [] async for vals in async_zip(it(4), it(2)): result.append(vals) assert result == [[0, 0], [1, 1]] async def test_traceback_frame_removal(): async def my_child_task(): raise KeyError() try: # Trick: For now cancel/nursery scopes still leave a bunch of tb gunk # behind. But if there's a MultiError, they leave it on the MultiError, # which lets us get a clean look at the KeyError itself. Someday I # guess this will always be a MultiError (#611), but for now we can # force it by raising two exceptions. async with _core.open_nursery() as nursery: nursery.start_soon(my_child_task) nursery.start_soon(my_child_task) except MultiError as exc: first_exc = exc.exceptions[0] assert isinstance(first_exc, KeyError) # The top frame in the exception traceback should be inside the child # task, not trio/contextvars internals. And there's only one frame # inside the child task, so this will also detect if our frame-removal # is too eager. frame = first_exc.__traceback__.tb_frame assert frame.f_code is my_child_task.__code__ def test_contextvar_support(): var = contextvars.ContextVar("test") var.set("before") assert var.get() == "before" async def inner(): task = _core.current_task() assert task.context.get(var) == "before" assert var.get() == "before" var.set("after") assert var.get() == "after" assert var in task.context assert task.context.get(var) == "after" _core.run(inner) assert var.get() == "before" async def test_contextvar_multitask(): var = contextvars.ContextVar("test", default="hmmm") async def t1(): assert var.get() == "hmmm" var.set("hmmmm") assert var.get() == "hmmmm" async def t2(): assert var.get() == "hmmmm" async with _core.open_nursery() as n: n.start_soon(t1) await wait_all_tasks_blocked() assert var.get() == "hmmm" var.set("hmmmm") n.start_soon(t2) await wait_all_tasks_blocked() def test_system_task_contexts(): cvar = contextvars.ContextVar("qwilfish") cvar.set("water") async def system_task(): assert cvar.get() == "water" async def regular_task(): assert cvar.get() == "poison" async def inner(): async with _core.open_nursery() as nursery: cvar.set("poison") nursery.start_soon(regular_task) _core.spawn_system_task(system_task) await wait_all_tasks_blocked() _core.run(inner) def test_Nursery_init(): with pytest.raises(TypeError): _core._run.Nursery(None, None) async def test_Nursery_private_init(): # context manager creation should not raise async with _core.open_nursery() as nursery: assert False == nursery._closed def test_Nursery_subclass(): with pytest.raises(TypeError): class Subclass(_core._run.Nursery): pass def test_Cancelled_init(): with pytest.raises(TypeError): raise _core.Cancelled with pytest.raises(TypeError): _core.Cancelled() # private constructor should not raise _core.Cancelled._create() def test_Cancelled_str(): cancelled = _core.Cancelled._create() assert str(cancelled) == "Cancelled" def test_Cancelled_subclass(): with pytest.raises(TypeError): class Subclass(_core.Cancelled): pass def test_CancelScope_subclass(): with pytest.raises(TypeError): class Subclass(_core.CancelScope): pass def test_sniffio_integration(): with pytest.raises(sniffio.AsyncLibraryNotFoundError): sniffio.current_async_library() async def check_inside_trio(): assert sniffio.current_async_library() == "trio" _core.run(check_inside_trio) with pytest.raises(sniffio.AsyncLibraryNotFoundError): sniffio.current_async_library() async def test_Task_custom_sleep_data(): task = _core.current_task() assert task.custom_sleep_data is None task.custom_sleep_data = 1 assert task.custom_sleep_data == 1 await _core.checkpoint() assert task.custom_sleep_data is None @types.coroutine def async_yield(value): yield value async def test_permanently_detach_coroutine_object(): task = None pdco_outcome = None async def detachable_coroutine(task_outcome, yield_value): await sleep(0) nonlocal task, pdco_outcome task = _core.current_task() pdco_outcome = await outcome.acapture( _core.permanently_detach_coroutine_object, task_outcome ) await async_yield(yield_value) async with _core.open_nursery() as nursery: nursery.start_soon(detachable_coroutine, outcome.Value(None), "I'm free!") # If we get here then Trio thinks the task has exited... but the coroutine # is still iterable assert pdco_outcome is None assert task.coro.send("be free!") == "I'm free!" assert pdco_outcome == outcome.Value("be free!") with pytest.raises(StopIteration): task.coro.send(None) # Check the exception paths too task = None pdco_outcome = None with pytest.raises(KeyError): async with _core.open_nursery() as nursery: nursery.start_soon(detachable_coroutine, outcome.Error(KeyError()), "uh oh") throw_in = ValueError() assert task.coro.throw(throw_in) == "uh oh" assert pdco_outcome == outcome.Error(throw_in) with pytest.raises(StopIteration): task.coro.send(None) async def bad_detach(): async with _core.open_nursery(): with pytest.raises(RuntimeError) as excinfo: await _core.permanently_detach_coroutine_object(outcome.Value(None)) assert "open nurser" in str(excinfo.value) async with _core.open_nursery() as nursery: nursery.start_soon(bad_detach) async def test_detach_and_reattach_coroutine_object(): unrelated_task = None task = None async def unrelated_coroutine(): nonlocal unrelated_task unrelated_task = _core.current_task() async def reattachable_coroutine(): await sleep(0) nonlocal task task = _core.current_task() def abort_fn(_): # pragma: no cover return _core.Abort.FAILED got = await _core.temporarily_detach_coroutine_object(abort_fn) assert got == "not trio!" await async_yield(1) await async_yield(2) with pytest.raises(RuntimeError) as excinfo: await _core.reattach_detached_coroutine_object(unrelated_task, None) assert "does not match" in str(excinfo.value) await _core.reattach_detached_coroutine_object(task, "byebye") await sleep(0) async with _core.open_nursery() as nursery: nursery.start_soon(unrelated_coroutine) nursery.start_soon(reattachable_coroutine) await wait_all_tasks_blocked() assert unrelated_task is not None assert task is not None # Okay, it's detached. Here's our coroutine runner: assert task.coro.send("not trio!") == 1 assert task.coro.send(None) == 2 assert task.coro.send(None) == "byebye" # Now it's been reattached, and we can leave the nursery async def test_detached_coroutine_cancellation(): abort_fn_called = False task = None async def reattachable_coroutine(): await sleep(0) nonlocal task task = _core.current_task() def abort_fn(_): nonlocal abort_fn_called abort_fn_called = True return _core.Abort.FAILED await _core.temporarily_detach_coroutine_object(abort_fn) await _core.reattach_detached_coroutine_object(task, None) with pytest.raises(_core.Cancelled): await sleep(0) async with _core.open_nursery() as nursery: nursery.start_soon(reattachable_coroutine) await wait_all_tasks_blocked() assert task is not None nursery.cancel_scope.cancel() task.coro.send(None) assert abort_fn_called @restore_unraisablehook() def test_async_function_implemented_in_C(): # These used to crash because we'd try to mutate the coroutine object's # cr_frame, but C functions don't have Python frames. async def agen_fn(record): assert not _core.currently_ki_protected() record.append("the generator ran") yield run_record = [] agen = agen_fn(run_record) _core.run(agen.__anext__) assert run_record == ["the generator ran"] async def main(): start_soon_record = [] agen = agen_fn(start_soon_record) async with _core.open_nursery() as nursery: nursery.start_soon(agen.__anext__) assert start_soon_record == ["the generator ran"] _core.run(main) async def test_very_deep_cancel_scope_nesting(): # This used to crash with a RecursionError in CancelStatus.recalculate with ExitStack() as exit_stack: outermost_scope = _core.CancelScope() exit_stack.enter_context(outermost_scope) for _ in range(5000): exit_stack.enter_context(_core.CancelScope()) outermost_scope.cancel() async def test_cancel_scope_deadline_duplicates(): # This exercises an assert in Deadlines._prune, by intentionally creating # duplicate entries in the deadline heap. now = _core.current_time() with _core.CancelScope() as cscope: for _ in range(DEADLINE_HEAP_MIN_PRUNE_THRESHOLD * 2): cscope.deadline = now + 9998 cscope.deadline = now + 9999 await sleep(0.01) @pytest.mark.skipif( sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC" ) async def test_simple_cancel_scope_usage_doesnt_create_cyclic_garbage(): # https://github.com/python-trio/trio/issues/1770 gc.collect() async def do_a_cancel(): with _core.CancelScope() as cscope: cscope.cancel() await sleep_forever() async def crasher(): raise ValueError old_flags = gc.get_debug() try: gc.collect() gc.set_debug(gc.DEBUG_SAVEALL) # cover outcome.Error.unwrap # (See https://github.com/python-trio/outcome/pull/29) await do_a_cancel() # cover outcome.Error.unwrap if unrolled_run hangs on to exception refs # (See https://github.com/python-trio/trio/pull/1864) await do_a_cancel() with pytest.raises(ValueError): async with _core.open_nursery() as nursery: # cover NurseryManager.__aexit__ nursery.start_soon(crasher) gc.collect() assert not gc.garbage finally: gc.set_debug(old_flags) gc.garbage.clear() @pytest.mark.skipif( sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC" ) async def test_cancel_scope_exit_doesnt_create_cyclic_garbage(): # https://github.com/python-trio/trio/pull/2063 gc.collect() async def crasher(): raise ValueError old_flags = gc.get_debug() try: with pytest.raises(ValueError), _core.CancelScope() as outer: async with _core.open_nursery() as nursery: gc.collect() gc.set_debug(gc.DEBUG_SAVEALL) # One child that gets cancelled by the outer scope nursery.start_soon(sleep_forever) outer.cancel() # And one that raises a different error nursery.start_soon(crasher) # so that outer filters a Cancelled from the MultiError and # covers CancelScope.__exit__ (and NurseryManager.__aexit__) # (See https://github.com/python-trio/trio/pull/2063) gc.collect() assert not gc.garbage finally: gc.set_debug(old_flags) gc.garbage.clear() @pytest.mark.skipif( sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC" ) async def test_nursery_cancel_doesnt_create_cyclic_garbage(): # https://github.com/python-trio/trio/issues/1770#issuecomment-730229423 def toggle_collected(): nonlocal collected collected = True collected = False gc.collect() old_flags = gc.get_debug() try: gc.set_debug(0) gc.collect() gc.set_debug(gc.DEBUG_SAVEALL) # cover Nursery._nested_child_finished async with _core.open_nursery() as nursery: nursery.cancel_scope.cancel() weakref.finalize(nursery, toggle_collected) del nursery # a checkpoint clears the nursery from the internals, apparently # TODO: stop event loop from hanging on to the nursery at this point await _core.checkpoint() assert collected gc.collect() assert not gc.garbage finally: gc.set_debug(old_flags) gc.garbage.clear() @pytest.mark.skipif( sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC" ) async def test_locals_destroyed_promptly_on_cancel(): destroyed = False def finalizer(): nonlocal destroyed destroyed = True class A: pass async def task(): a = A() weakref.finalize(a, finalizer) await _core.checkpoint() async with _core.open_nursery() as nursery: nursery.start_soon(task) nursery.cancel_scope.cancel() assert destroyed def test_run_strict_exception_groups(): """ Test that nurseries respect the global context setting of strict_exception_groups. """ async def main(): async with _core.open_nursery(): raise Exception("foo") with pytest.raises(MultiError) as exc: _core.run(main, strict_exception_groups=True) assert len(exc.value.exceptions) == 1 assert type(exc.value.exceptions[0]) is Exception assert exc.value.exceptions[0].args == ("foo",) def test_run_strict_exception_groups_nursery_override(): """ Test that a nursery can override the global context setting of strict_exception_groups. """ async def main(): async with _core.open_nursery(strict_exception_groups=False): raise Exception("foo") with pytest.raises(Exception, match="foo"): _core.run(main, strict_exception_groups=True) async def test_nursery_strict_exception_groups(): """Test that strict exception groups can be enabled on a per-nursery basis.""" with pytest.raises(MultiError) as exc: async with _core.open_nursery(strict_exception_groups=True): raise Exception("foo") assert len(exc.value.exceptions) == 1 assert type(exc.value.exceptions[0]) is Exception assert exc.value.exceptions[0].args == ("foo",) async def test_nursery_collapse_strict(): """ Test that a single exception from a nested nursery with strict semantics doesn't get collapsed when CancelledErrors are stripped from it. """ async def raise_error(): raise RuntimeError("test error") with pytest.raises(MultiError) as exc: async with _core.open_nursery() as nursery: nursery.start_soon(sleep_forever) nursery.start_soon(raise_error) async with _core.open_nursery(strict_exception_groups=True) as nursery2: nursery2.start_soon(sleep_forever) nursery2.start_soon(raise_error) nursery.cancel_scope.cancel() exceptions = exc.value.exceptions assert len(exceptions) == 2 assert isinstance(exceptions[0], RuntimeError) assert isinstance(exceptions[1], MultiError) assert len(exceptions[1].exceptions) == 1 assert isinstance(exceptions[1].exceptions[0], RuntimeError) async def test_nursery_collapse_loose(): """ Test that a single exception from a nested nursery with loose semantics gets collapsed when CancelledErrors are stripped from it. """ async def raise_error(): raise RuntimeError("test error") with pytest.raises(MultiError) as exc: async with _core.open_nursery() as nursery: nursery.start_soon(sleep_forever) nursery.start_soon(raise_error) async with _core.open_nursery() as nursery2: nursery2.start_soon(sleep_forever) nursery2.start_soon(raise_error) nursery.cancel_scope.cancel() exceptions = exc.value.exceptions assert len(exceptions) == 2 assert isinstance(exceptions[0], RuntimeError) assert isinstance(exceptions[1], RuntimeError) async def test_cancel_scope_no_cancellederror(): """ Test that when a cancel scope encounters an exception group that does NOT contain a Cancelled exception, it will NOT set the ``cancelled_caught`` flag. """ with pytest.raises(ExceptionGroup): with _core.CancelScope() as scope: scope.cancel() raise ExceptionGroup("test", [RuntimeError(), RuntimeError()]) assert not scope.cancelled_caught