import attr
|
import logging
|
import sys
|
import warnings
|
import weakref
|
|
from .._util import name_asyncgen
|
from . import _run
|
from .. import _core
|
|
# Used to log exceptions in async generator finalizers
|
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors")
|
|
|
@attr.s(eq=False, slots=True)
|
class AsyncGenerators:
|
# Async generators are added to this set when first iterated. Any
|
# left after the main task exits will be closed before trio.run()
|
# returns. During most of the run, this is a WeakSet so GC works.
|
# During shutdown, when we're finalizing all the remaining
|
# asyncgens after the system nursery has been closed, it's a
|
# regular set so we don't have to deal with GC firing at
|
# unexpected times.
|
alive = attr.ib(factory=weakref.WeakSet)
|
|
# This collects async generators that get garbage collected during
|
# the one-tick window between the system nursery closing and the
|
# init task starting end-of-run asyncgen finalization.
|
trailing_needs_finalize = attr.ib(factory=set)
|
|
prev_hooks = attr.ib(init=False)
|
|
def install_hooks(self, runner):
|
def firstiter(agen):
|
if hasattr(_run.GLOBAL_RUN_CONTEXT, "task"):
|
self.alive.add(agen)
|
else:
|
# An async generator first iterated outside of a Trio
|
# task doesn't belong to Trio. Probably we're in guest
|
# mode and the async generator belongs to our host.
|
# The locals dictionary is the only good place to
|
# remember this fact, at least until
|
# https://bugs.python.org/issue40916 is implemented.
|
agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True
|
if self.prev_hooks.firstiter is not None:
|
self.prev_hooks.firstiter(agen)
|
|
def finalize_in_trio_context(agen, agen_name):
|
try:
|
runner.spawn_system_task(
|
self._finalize_one,
|
agen,
|
agen_name,
|
name=f"close asyncgen {agen_name} (abandoned)",
|
)
|
except RuntimeError:
|
# There is a one-tick window where the system nursery
|
# is closed but the init task hasn't yet made
|
# self.asyncgens a strong set to disable GC. We seem to
|
# have hit it.
|
self.trailing_needs_finalize.add(agen)
|
|
def finalizer(agen):
|
agen_name = name_asyncgen(agen)
|
try:
|
is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen")
|
except AttributeError: # pragma: no cover
|
is_ours = True
|
|
if is_ours:
|
runner.entry_queue.run_sync_soon(
|
finalize_in_trio_context, agen, agen_name
|
)
|
|
# Do this last, because it might raise an exception
|
# depending on the user's warnings filter. (That
|
# exception will be printed to the terminal and
|
# ignored, since we're running in GC context.)
|
warnings.warn(
|
f"Async generator {agen_name!r} was garbage collected before it "
|
f"had been exhausted. Surround its use in 'async with "
|
f"aclosing(...):' to ensure that it gets cleaned up as soon as "
|
f"you're done using it.",
|
ResourceWarning,
|
stacklevel=2,
|
source=agen,
|
)
|
else:
|
# Not ours -> forward to the host loop's async generator finalizer
|
if self.prev_hooks.finalizer is not None:
|
self.prev_hooks.finalizer(agen)
|
else:
|
# Host has no finalizer. Reimplement the default
|
# Python behavior with no hooks installed: throw in
|
# GeneratorExit, step once, raise RuntimeError if
|
# it doesn't exit.
|
closer = agen.aclose()
|
try:
|
# If the next thing is a yield, this will raise RuntimeError
|
# which we allow to propagate
|
closer.send(None)
|
except StopIteration:
|
pass
|
else:
|
# If the next thing is an await, we get here. Give a nicer
|
# error than the default "async generator ignored GeneratorExit"
|
raise RuntimeError(
|
f"Non-Trio async generator {agen_name!r} awaited something "
|
f"during finalization; install a finalization hook to "
|
f"support this, or wrap it in 'async with aclosing(...):'"
|
)
|
|
self.prev_hooks = sys.get_asyncgen_hooks()
|
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)
|
|
async def finalize_remaining(self, runner):
|
# This is called from init after shutting down the system nursery.
|
# The only tasks running at this point are init and
|
# the run_sync_soon task, and since the system nursery is closed,
|
# there's no way for user code to spawn more.
|
assert _core.current_task() is runner.init_task
|
assert len(runner.tasks) == 2
|
|
# To make async generator finalization easier to reason
|
# about, we'll shut down asyncgen garbage collection by turning
|
# the alive WeakSet into a regular set.
|
self.alive = set(self.alive)
|
|
# Process all pending run_sync_soon callbacks, in case one of
|
# them was an asyncgen finalizer that snuck in under the wire.
|
runner.entry_queue.run_sync_soon(runner.reschedule, runner.init_task)
|
await _core.wait_task_rescheduled(
|
lambda _: _core.Abort.FAILED # pragma: no cover
|
)
|
self.alive.update(self.trailing_needs_finalize)
|
self.trailing_needs_finalize.clear()
|
|
# None of the still-living tasks use async generators, so
|
# every async generator must be suspended at a yield point --
|
# there's no one to be doing the iteration. That's good,
|
# because aclose() only works on an asyncgen that's suspended
|
# at a yield point. (If it's suspended at an event loop trap,
|
# because someone is in the middle of iterating it, then you
|
# get a RuntimeError on 3.8+, and a nasty surprise on earlier
|
# versions due to https://bugs.python.org/issue32526.)
|
#
|
# However, once we start aclose() of one async generator, it
|
# might start fetching the next value from another, thus
|
# preventing us from closing that other (at least until
|
# aclose() of the first one is complete). This constraint
|
# effectively requires us to finalize the remaining asyncgens
|
# in arbitrary order, rather than doing all of them at the
|
# same time. On 3.8+ we could defer any generator with
|
# ag_running=True to a later batch, but that only catches
|
# the case where our aclose() starts after the user's
|
# asend()/etc. If our aclose() starts first, then the
|
# user's asend()/etc will raise RuntimeError, since they're
|
# probably not checking ag_running.
|
#
|
# It might be possible to allow some parallelized cleanup if
|
# we can determine that a certain set of asyncgens have no
|
# interdependencies, using gc.get_referents() and such.
|
# But just doing one at a time will typically work well enough
|
# (since each aclose() executes in a cancelled scope) and
|
# is much easier to reason about.
|
|
# It's possible that that cleanup code will itself create
|
# more async generators, so we iterate repeatedly until
|
# all are gone.
|
while self.alive:
|
batch = self.alive
|
self.alive = set()
|
for agen in batch:
|
await self._finalize_one(agen, name_asyncgen(agen))
|
|
def close(self):
|
sys.set_asyncgen_hooks(*self.prev_hooks)
|
|
async def _finalize_one(self, agen, name):
|
try:
|
# This shield ensures that finalize_asyncgen never exits
|
# with an exception, not even a Cancelled. The inside
|
# is cancelled so there's no deadlock risk.
|
with _core.CancelScope(shield=True) as cancel_scope:
|
cancel_scope.cancel()
|
await agen.aclose()
|
except BaseException:
|
ASYNCGEN_LOGGER.exception(
|
"Exception ignored during finalization of async generator %r -- "
|
"surround your use of the generator in 'async with aclosing(...):' "
|
"to raise exceptions like this in the context where they're generated",
|
name,
|
)
|