zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import attr
import logging
import sys
import warnings
import weakref
 
from .._util import name_asyncgen
from . import _run
from .. import _core
 
# Used to log exceptions in async generator finalizers
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors")
 
 
@attr.s(eq=False, slots=True)
class AsyncGenerators:
    # Async generators are added to this set when first iterated. Any
    # left after the main task exits will be closed before trio.run()
    # returns.  During most of the run, this is a WeakSet so GC works.
    # During shutdown, when we're finalizing all the remaining
    # asyncgens after the system nursery has been closed, it's a
    # regular set so we don't have to deal with GC firing at
    # unexpected times.
    alive = attr.ib(factory=weakref.WeakSet)
 
    # This collects async generators that get garbage collected during
    # the one-tick window between the system nursery closing and the
    # init task starting end-of-run asyncgen finalization.
    trailing_needs_finalize = attr.ib(factory=set)
 
    prev_hooks = attr.ib(init=False)
 
    def install_hooks(self, runner):
        def firstiter(agen):
            if hasattr(_run.GLOBAL_RUN_CONTEXT, "task"):
                self.alive.add(agen)
            else:
                # An async generator first iterated outside of a Trio
                # task doesn't belong to Trio. Probably we're in guest
                # mode and the async generator belongs to our host.
                # The locals dictionary is the only good place to
                # remember this fact, at least until
                # https://bugs.python.org/issue40916 is implemented.
                agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True
                if self.prev_hooks.firstiter is not None:
                    self.prev_hooks.firstiter(agen)
 
        def finalize_in_trio_context(agen, agen_name):
            try:
                runner.spawn_system_task(
                    self._finalize_one,
                    agen,
                    agen_name,
                    name=f"close asyncgen {agen_name} (abandoned)",
                )
            except RuntimeError:
                # There is a one-tick window where the system nursery
                # is closed but the init task hasn't yet made
                # self.asyncgens a strong set to disable GC. We seem to
                # have hit it.
                self.trailing_needs_finalize.add(agen)
 
        def finalizer(agen):
            agen_name = name_asyncgen(agen)
            try:
                is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen")
            except AttributeError:  # pragma: no cover
                is_ours = True
 
            if is_ours:
                runner.entry_queue.run_sync_soon(
                    finalize_in_trio_context, agen, agen_name
                )
 
                # Do this last, because it might raise an exception
                # depending on the user's warnings filter. (That
                # exception will be printed to the terminal and
                # ignored, since we're running in GC context.)
                warnings.warn(
                    f"Async generator {agen_name!r} was garbage collected before it "
                    f"had been exhausted. Surround its use in 'async with "
                    f"aclosing(...):' to ensure that it gets cleaned up as soon as "
                    f"you're done using it.",
                    ResourceWarning,
                    stacklevel=2,
                    source=agen,
                )
            else:
                # Not ours -> forward to the host loop's async generator finalizer
                if self.prev_hooks.finalizer is not None:
                    self.prev_hooks.finalizer(agen)
                else:
                    # Host has no finalizer.  Reimplement the default
                    # Python behavior with no hooks installed: throw in
                    # GeneratorExit, step once, raise RuntimeError if
                    # it doesn't exit.
                    closer = agen.aclose()
                    try:
                        # If the next thing is a yield, this will raise RuntimeError
                        # which we allow to propagate
                        closer.send(None)
                    except StopIteration:
                        pass
                    else:
                        # If the next thing is an await, we get here. Give a nicer
                        # error than the default "async generator ignored GeneratorExit"
                        raise RuntimeError(
                            f"Non-Trio async generator {agen_name!r} awaited something "
                            f"during finalization; install a finalization hook to "
                            f"support this, or wrap it in 'async with aclosing(...):'"
                        )
 
        self.prev_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)
 
    async def finalize_remaining(self, runner):
        # This is called from init after shutting down the system nursery.
        # The only tasks running at this point are init and
        # the run_sync_soon task, and since the system nursery is closed,
        # there's no way for user code to spawn more.
        assert _core.current_task() is runner.init_task
        assert len(runner.tasks) == 2
 
        # To make async generator finalization easier to reason
        # about, we'll shut down asyncgen garbage collection by turning
        # the alive WeakSet into a regular set.
        self.alive = set(self.alive)
 
        # Process all pending run_sync_soon callbacks, in case one of
        # them was an asyncgen finalizer that snuck in under the wire.
        runner.entry_queue.run_sync_soon(runner.reschedule, runner.init_task)
        await _core.wait_task_rescheduled(
            lambda _: _core.Abort.FAILED  # pragma: no cover
        )
        self.alive.update(self.trailing_needs_finalize)
        self.trailing_needs_finalize.clear()
 
        # None of the still-living tasks use async generators, so
        # every async generator must be suspended at a yield point --
        # there's no one to be doing the iteration. That's good,
        # because aclose() only works on an asyncgen that's suspended
        # at a yield point.  (If it's suspended at an event loop trap,
        # because someone is in the middle of iterating it, then you
        # get a RuntimeError on 3.8+, and a nasty surprise on earlier
        # versions due to https://bugs.python.org/issue32526.)
        #
        # However, once we start aclose() of one async generator, it
        # might start fetching the next value from another, thus
        # preventing us from closing that other (at least until
        # aclose() of the first one is complete).  This constraint
        # effectively requires us to finalize the remaining asyncgens
        # in arbitrary order, rather than doing all of them at the
        # same time. On 3.8+ we could defer any generator with
        # ag_running=True to a later batch, but that only catches
        # the case where our aclose() starts after the user's
        # asend()/etc. If our aclose() starts first, then the
        # user's asend()/etc will raise RuntimeError, since they're
        # probably not checking ag_running.
        #
        # It might be possible to allow some parallelized cleanup if
        # we can determine that a certain set of asyncgens have no
        # interdependencies, using gc.get_referents() and such.
        # But just doing one at a time will typically work well enough
        # (since each aclose() executes in a cancelled scope) and
        # is much easier to reason about.
 
        # It's possible that that cleanup code will itself create
        # more async generators, so we iterate repeatedly until
        # all are gone.
        while self.alive:
            batch = self.alive
            self.alive = set()
            for agen in batch:
                await self._finalize_one(agen, name_asyncgen(agen))
 
    def close(self):
        sys.set_asyncgen_hooks(*self.prev_hooks)
 
    async def _finalize_one(self, agen, name):
        try:
            # This shield ensures that finalize_asyncgen never exits
            # with an exception, not even a Cancelled. The inside
            # is cancelled so there's no deadlock risk.
            with _core.CancelScope(shield=True) as cancel_scope:
                cancel_scope.cancel()
                await agen.aclose()
        except BaseException:
            ASYNCGEN_LOGGER.exception(
                "Exception ignored during finalization of async generator %r -- "
                "surround your use of the generator in 'async with aclosing(...):' "
                "to raise exceptions like this in the context where they're generated",
                name,
            )