I was pairing with a colleague on some asynchronous streaming abstractions[^1] implemented in Python. I realized that there are some pretty significant footguns with predictable cleanup in async generators.
In fact, the same footguns are present in normal generators, but don't cause quite as many issues as in async generators. Crucially, we can observe unexpected cleanup behavior when we don't fully consume a generator.
Let's take a look!
```python
from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    # Contrived example wrapping an infinite iterator in some cleanup code.
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    generator = gen()
    # We break out of this loop early, meaning we have not fully consumed `generator`.
    for elem in generator:
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()
```
When running this, the output looks something like this:
```
0
1
2
3
4
finished iterating
cleaned up
```
The order of these print statements suggests that the cleanup code only gets executed at the exit of the function (e.g. when the reference count of the generator object is decremented to 0).
We can do a little bit better by inlining the call to `gen` or explicitly using the `del` statement to ensure that the generator is cleaned up before we start some expensive operation. In this contrived example, it doesn't seem like that big of a deal, but it can often be desirable to relinquish resources (such as open file descriptors or database connections) as soon as we're finished with them.
```python
from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    for elem in gen():
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()
```
The output this time:
```
0
1
2
3
4
cleaned up
finished iterating
```
So we clean up first! However, this relies pretty heavily on some implementation details of CPython. In fact, when I run this same code with `pypy`, the cleanup never gets run[^2]! So what options do we have?
Let's take a look at some of the code in the standard library; specifically, let's look at `Lib/_collections_abc.py` to get a better sense of what operations may be available on generators.
This snippet seems promising:
```python
class Generator(Iterator):

    __slots__ = ()

    def __next__(self):
        """Return the next item from the generator.
        When exhausted, raise StopIteration.
        """
        return self.send(None)

    @abstractmethod
    def send(self, value):
        """Send a value into the generator.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the generator.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """Raise GeneratorExit inside generator.
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Generator:
            return _check_methods(C, '__iter__', '__next__',
                                  'send', 'throw', 'close')
        return NotImplemented
```
In particular, this suggests that generators have a `close` method! So what happens if we explicitly invoke `close` on our generator after we've finished iteration?
```python
from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    generator = gen()
    for elem in generator:
        print(elem)
        if elem > 3:
            break
    generator.close()
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()
```
Running with both CPython and `pypy` seems to work as desired! But we can even do a little bit better using the `closing` context manager from `contextlib`:
```python
from __future__ import annotations

import itertools
import time
from collections.abc import Generator
from contextlib import closing


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    with closing(gen()) as generator:
        for elem in generator:
            print(elem)
            if elem > 3:
                break
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()
```
Even when an exception is raised out of the body of the `for` loop, we'll still invoke the generator cleanup.
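To illustrate that exception-safety property, here's a small sketch (the `ValueError` is just a stand-in for any failure partway through iteration): because `closing.__exit__` runs on the exception path, the generator's `finally` block executes before the error reaches our handler.

```python
from __future__ import annotations

import itertools
from collections.abc import Generator
from contextlib import closing


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    try:
        with closing(gen()) as generator:
            for elem in generator:
                if elem > 3:
                    # Simulate a failure partway through iteration.
                    raise ValueError("boom")
    except ValueError:
        # `closing` has already invoked `generator.close()` by the time
        # the exception propagates to this handler.
        print("caught the error")


if __name__ == "__main__":
    main()
```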
Cool, so we have finally arrived at a sensible idiom for ensuring that we invoke cleanup code with reasonable confidence (as much confidence as the chosen Python runtime can provide, at least).
In practice, very few people write code like this; CPython cleanup behavior is fairly predictable. But what happens when we venture into "asyncland"? Here's a similar example:
```python
from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator


async def gen() -> AsyncGenerator[int]:
    try:
        # `yield from` is not supported in async generators
        for x in itertools.count(0):
            yield x
    finally:
        # Required to enable "cancellation"
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    generator = gen()
    async for elem in generator:
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
```
Running this with CPython demonstrates an issue very similar to what we observed when running the normal generator examples with `pypy`: the cleanup code is simply never run! But wait, when we run this using `pypy`, the cleanup code gets run?!
This is related to how async generator cleanup is managed: the event loop is required to register async generator hooks so that the language runtime can communicate with the event loop when an async generator object gets "finalized" (the work associated with cleaning up an unused object during garbage collection).
If you're curious why the `await asyncio.sleep(0)` is there: `asyncio.run` will invoke `cancel` on outstanding tasks once the provided coroutine has finished execution. The async generator cleanup task is one such task. When omitting the `asyncio.sleep(0)`, the print statement is executed.
Let's make the similar adjustment of inlining the `gen` call into the `async for` loop and see what happens:
```python
from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator


async def gen() -> AsyncGenerator[int]:
    try:
        for x in itertools.count(0):
            yield x
    finally:
        # Required for "reasons"
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    async for elem in gen():
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
```
With CPython, the cleanup is now executed. This is because the generator is finalized as we exit the `async for` loop, so the cleanup task is scheduled before the `await asyncio.sleep(1)`, which allows the event loop to "work on" the cleanup task prior to the completion of the coroutine. With `pypy`, the cleanup is still executed, but due to the differences in garbage collection behavior, only after the `await asyncio.sleep(1)` completes.
Finally, let's take a look at using a similar explicit cleanup idiom with async code. The `contextlib` library provides an asynchronous variant of the `closing` context manager called `aclosing`:
```python
from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator
from contextlib import aclosing


async def gen() -> AsyncGenerator[int]:
    try:
        for x in itertools.count(0):
            yield x
    finally:
        # Required for "reasons"
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    async with aclosing(gen()) as generator:
        async for elem in generator:
            print(elem)
            if elem > 3:
                break
    print("finished iterating")
    # Perform some expensive operation.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
```
This code has consistent, reliable behavior across both CPython and `pypy`.
There are some extra reasons why explicit cleanup code is especially important in async code:
- different event loop implementations may implement async generator hooks differently, so there is more surface area for weird behavior (see some of the linked PyScript examples, which use Pyodide's webloop)
- cleanup code that uses runtime context (e.g. inspects the currently running async task) will observe the runtime context of the tasks started by the event loop's async generator hooks
## Conclusion
It's probably a reasonably good practice to follow predictable cleanup idioms when using generators (async or not). Practically, it seems unlikely that most folks will use these idioms with normal generators, but you should really consider applying these idioms when interacting with async generators.
I learned a ton while researching this post. Async generators are an interesting example of a language feature that requires the language runtime to work in tandem with an async event loop. Async generator cleanup used to be magic but now I think I have a reasonable mental model of what is going on and have a better sense of the idioms that might help to ensure predictable behavior.
## Additional reading
- PEP 525: The PEP that describes async generators and their implementation. Includes details about async generator hooks.
- PEP 533: A "deferred" PEP that proposes extensions to synchronous and asynchronous iterator protocols to support deterministic cleanup in for loops.
- `contextlib` documentation: The first place that I noticed the `async with aclosing(gen()) as generator:` idiom.
[^1]: Lots of "streaming" when working with language models to improve UX of high-latency completions.

[^2]: CPython leverages reference counting, with a generational garbage collector only for dealing with cyclic references. PyPy uses its own incremental garbage collector (and no reference counting). PyPy's docs provide a reasonable discussion of some of the differences.