import sys
import weakref
import pytest
from math import inf
from functools import partial
from async_generator import aclosing
from ... import _core
from .tutil import gc_collect_harder, buggy_pypy_asyncgens, restore_unraisablehook


def test_asyncgen_basics():
    """Check async generator cleanup across several usage patterns.

    Covers a generator that is garbage-collected before exhaustion, ones
    closed through ``aclosing()``, ones iterated to exhaustion, and one that
    is still referenced (and unexhausted) when the run ends.
    """
    collected = []

    async def example(cause):
        # `cause` labels the scenario; it is appended to `collected` once the
        # generator's cleanup path has run to completion.
        try:
            try:
                yield 42
            except GeneratorExit:
                pass
            await _core.checkpoint()
        except _core.Cancelled:
            # Cleanup path where the checkpoint above raised Cancelled.
            assert "exhausted" not in cause
            task_name = _core.current_task().name
            # NOTE(review): comparing against an empty task name looks like a
            # literal that lost its contents somewhere (possibly an
            # angle-bracketed name such as "<init>") -- confirm against the
            # name Trio gives the task that runs end-of-run finalizers.
            assert cause in task_name or task_name == ""
            assert _core.current_effective_deadline() == -inf
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()
            collected.append(cause)
        else:
            # Cleanup path where the generator was closed/exhausted at its
            # point of use, inside async_main itself.
            assert "async_main" in _core.current_task().name
            assert "exhausted" in cause
            assert _core.current_effective_deadline() == inf
            await _core.checkpoint()
            collected.append(cause)

    saved = []

    async def async_main():
        # GC'ed before exhausted
        with pytest.warns(
            ResourceWarning, match="Async generator.*collected before.*exhausted"
        ):
            assert 42 == await example("abandoned").asend(None)
            gc_collect_harder()
        await _core.wait_all_tasks_blocked()
        assert collected.pop() == "abandoned"

        # aclosing() ensures it's cleaned up at point of use
        async with aclosing(example("exhausted 1")) as aiter:
            assert 42 == await aiter.asend(None)
        assert collected.pop() == "exhausted 1"

        # Also fine if you exhaust it at point of use
        async for val in example("exhausted 2"):
            assert val == 42
        assert collected.pop() == "exhausted 2"

        gc_collect_harder()

        # No problems saving the geniter when using either of these patterns
        async with aclosing(example("exhausted 3")) as aiter:
            saved.append(aiter)
            assert 42 == await aiter.asend(None)
        assert collected.pop() == "exhausted 3"

        # Also fine if you exhaust it at point of use
        saved.append(example("exhausted 4"))
        async for val in saved[-1]:
            assert val == 42
        assert collected.pop() == "exhausted 4"

        # Leave one referenced-but-unexhausted and make sure it gets cleaned up
        if buggy_pypy_asyncgens:
            # Fake the result so the assertion after the run still holds.
            collected.append("outlived run")
        else:
            saved.append(example("outlived run"))
            assert 42 == await saved[-1].asend(None)
            assert collected == []

    _core.run(async_main)
    assert collected.pop() == "outlived run"
    for agen in saved:
        assert agen.ag_frame is None  # all should now be exhausted


async def test_asyncgen_throws_during_finalization(caplog):
    """An exception raised while finalizing a collected generator is logged.

    The generator's ``finally`` block raises ``ValueError``; the test expects
    the first captured log record to carry that exception and a message
    mentioning async generator finalization.
    """
    record = []

    async def agen():
        try:
            yield 1
        finally:
            await _core.cancel_shielded_checkpoint()
            record.append("crashing")
            raise ValueError("oops")

    with restore_unraisablehook():
        # Start the generator, drop the only reference, and force collection.
        await agen().asend(None)
        gc_collect_harder()
    await _core.wait_all_tasks_blocked()
    assert record == ["crashing"]
    exc_type, exc_value, exc_traceback = caplog.records[0].exc_info
    assert exc_type is ValueError
    assert str(exc_value) == "oops"
    assert "during finalization of async generator" in caplog.records[0].message


@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy")
def test_firstiter_after_closing():
    """A generator first iterated during another's cleanup is cleaned up too.

    ``funky_agen``'s second ``finally`` block starts a brand-new
    ``funky_agen``; after the run both cleanup markers must be recorded,
    "cleanup 2" first.
    """
    saved = []
    record = []

    async def funky_agen():
        try:
            yield 1
        except GeneratorExit:
            record.append("cleanup 1")
            raise
        try:
            yield 2
        finally:
            record.append("cleanup 2")
            # Begin iterating a fresh generator from inside cleanup.
            await funky_agen().asend(None)

    async def async_main():
        aiter = funky_agen()
        saved.append(aiter)  # keep it alive until the run ends
        assert 1 == await aiter.asend(None)
        assert 2 == await aiter.asend(None)

    _core.run(async_main)
    assert record == ["cleanup 2", "cleanup 1"]


@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy")
def test_interdependent_asyncgen_cleanup_order():
    """Chained generators finish cleanup in innermost-to-outermost order."""
    saved = []
    record = []

    async def innermost():
        try:
            yield 1
        finally:
            await _core.cancel_shielded_checkpoint()
            record.append("innermost")

    async def agen(label, inner):
        try:
            yield await inner.asend(None)
        finally:
            # Either `inner` has already been cleaned up, or
            # we're about to exhaust it. Either way, we wind
            # up with `record` containing the labels in
            # innermost-to-outermost order.
            with pytest.raises(StopAsyncIteration):
                await inner.asend(None)
            record.append(label)

    async def async_main():
        # This makes a chain of 101 interdependent asyncgens:
        # agen(99)'s cleanup will iterate agen(98)'s will iterate
        # ... agen(0)'s will iterate innermost()'s
        ag_chain = innermost()
        for idx in range(100):
            ag_chain = agen(idx, ag_chain)
        saved.append(ag_chain)
        assert 1 == await ag_chain.asend(None)
        assert record == []

    _core.run(async_main)
    assert record == ["innermost"] + list(range(100))


@restore_unraisablehook()
def test_last_minute_gc_edge_case():
    """Collect an async generator in a narrow window during run shutdown.

    The window is when the system nursery is already closed but
    ``runner.asyncgens.alive`` is still a ``weakref.WeakSet``. The run is
    retried up to 50 times because hitting that window is timing-dependent.
    """
    saved = []
    record = []
    needs_retry = True

    async def agen():
        try:
            yield 1
        finally:
            record.append("cleaned up")

    def collect_at_opportune_moment(token):
        runner = _core._run.GLOBAL_RUN_CONTEXT.runner
        if runner.system_nursery._closed and isinstance(
            runner.asyncgens.alive, weakref.WeakSet
        ):
            # We are inside the target window: drop the last reference and
            # force a collection right now.
            saved.clear()
            record.append("final collection")
            gc_collect_harder()
            record.append("done")
        else:
            # Not there yet; reschedule ourselves and check again later.
            try:
                token.run_sync_soon(collect_at_opportune_moment, token)
            except _core.RunFinishedError:  # pragma: no cover
                nonlocal needs_retry
                needs_retry = True

    async def async_main():
        token = _core.current_trio_token()
        token.run_sync_soon(collect_at_opportune_moment, token)
        saved.append(agen())
        await saved[-1].asend(None)

    # Actually running into the edge case requires that the run_sync_soon task
    # execute in between the system nursery's closure and the strong-ification
    # of runner.asyncgens. There's about a 25% chance that it doesn't
    # (if the run_sync_soon task runs before init on one tick and after init
    # on the next tick); if we try enough times, we can make the chance of
    # failure as small as we want.
    for attempt in range(50):
        needs_retry = False
        del record[:]
        del saved[:]
        _core.run(async_main)
        if needs_retry:  # pragma: no cover
            if not buggy_pypy_asyncgens:
                assert record == ["cleaned up"]
        else:
            assert record == ["final collection", "done", "cleaned up"]
            break
    else:  # pragma: no cover
        # `attempt` is zero-based, so the number of tries is attempt + 1.
        pytest.fail(
            "Didn't manage to hit the trailing_finalizer_asyncgens case "
            f"despite trying {attempt + 1} times"
        )


async def step_outside_async_context(aiter):
    """Advance ``aiter`` one step from inside an abort function.

    A child task is parked in ``wait_task_rescheduled`` and the nursery's
    deadline is then set to the current time, so that the abort function
    (which performs the first ``asend``) runs as a result of deadline expiry.
    The step is expected to finish with ``StopIteration`` matching "42".
    """
    # abort_fns run outside of task context, at least if they're
    # triggered by a deadline expiry rather than a direct
    # cancellation.  Thus, an asyncgen first iterated inside one
    # will appear non-Trio, and since no other hooks were installed,
    # will use the last-ditch fallback handling (that tries to mimic
    # CPython's behavior with no hooks).
    #
    # NB: the strangeness with aiter being an attribute of abort_fn is
    # to make it as easy as possible to ensure we don't hang onto a
    # reference to aiter inside the guts of the run loop.
    def abort_fn(_):
        with pytest.raises(StopIteration, match="42"):
            abort_fn.aiter.asend(None).send(None)
        del abort_fn.aiter
        return _core.Abort.SUCCEEDED

    abort_fn.aiter = aiter

    async with _core.open_nursery() as nursery:
        nursery.start_soon(_core.wait_task_rescheduled, abort_fn)
        await _core.wait_all_tasks_blocked()
        # Expire the deadline immediately to trigger abort_fn.
        nursery.cancel_scope.deadline = _core.current_time()


@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy")
async def test_fallback_when_no_hook_claims_it(capsys):
    """Check stderr output for generators first iterated via an abort_fn.

    A well-behaved generator must produce no stderr output when collected;
    one that yields again after ``GeneratorExit`` and one that awaits after
    it must each produce a specific diagnostic on stderr.
    """

    async def well_behaved():
        yield 42

    async def yields_after_yield():
        with pytest.raises(GeneratorExit):
            yield 42
        yield 100

    async def awaits_after_yield():
        with pytest.raises(GeneratorExit):
            yield 42
        await _core.cancel_shielded_checkpoint()

    with restore_unraisablehook():
        await step_outside_async_context(well_behaved())
        gc_collect_harder()
        assert capsys.readouterr().err == ""

        await step_outside_async_context(yields_after_yield())
        gc_collect_harder()
        assert "ignored GeneratorExit" in capsys.readouterr().err

        await step_outside_async_context(awaits_after_yield())
        gc_collect_harder()
        assert "awaited something during finalization" in capsys.readouterr().err


@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy")
def test_delegation_to_existing_hooks():
    """Pre-installed asyncgen hooks still see generators started via abort_fn.

    Custom ``firstiter``/``finalizer`` hooks are installed before the run.
    The generator stepped via ``step_outside_async_context`` must be reported
    to those hooks, the one iterated directly in ``async_main`` must be
    cleaned up with a Trio cancellation, and the custom hooks must be back in
    place once the run finishes.
    """
    record = []

    def my_firstiter(agen):
        record.append("firstiter " + agen.ag_frame.f_locals["arg"])

    def my_finalizer(agen):
        record.append("finalizer " + agen.ag_frame.f_locals["arg"])

    async def example(arg):
        try:
            yield 42
        finally:
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()
            record.append("trio collected " + arg)

    async def async_main():
        await step_outside_async_context(example("theirs"))
        assert 42 == await example("ours").asend(None)
        gc_collect_harder()
        # Only the "theirs" generator went through the custom hooks.
        assert record == ["firstiter theirs", "finalizer theirs"]
        record[:] = []
        await _core.wait_all_tasks_blocked()
        assert record == ["trio collected ours"]

    with restore_unraisablehook():
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(my_firstiter, my_finalizer)
        try:
            _core.run(async_main)
        finally:
            # The run must have put our hooks back before returning.
            assert sys.get_asyncgen_hooks() == (my_firstiter, my_finalizer)
            sys.set_asyncgen_hooks(*old_hooks)