samgeo.codes

Resource management and generators in Python

I was pairing with a colleague on some asynchronous streaming abstractions1 implemented in Python. I realized that there are some pretty significant footguns with predictable cleanup in async generators.

In fact, the same footguns are present in normal generators, but don't cause quite as many issues as in async generators. Crucially, we can observe unexpected cleanup behavior when we don't fully consume a generator.

Let's take a look!

from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    # Contrived example wrapping an infinite iterator in some cleanup code.
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    generator = gen()
    # We break out of this loop early, meaning we have not fully consumed `generator`.
    for elem in generator:
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()

Run with PyScript

When running this, the output looks something like this:

0
1
2
3
4
finished iterating
cleaned up

The order of these print statements suggests that the cleanup code only gets executed at the exit of the function (e.g. when the reference count of the generator object is decremented to 0).

We can do a little bit better by inlining the call to gen or explicitly using the del statement to ensure that the generator is cleaned up before we start some expensive operation. In this contrived example, it doesn't seem like that big of a deal, but it can often be desirable to relinquish resources (such as open file descriptors or database connections) as soon as we're finished with them.

from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    for elem in gen():
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()

Run with PyScript

The output this time:

0
1
2
3
4
cleaned up
finished iterating

So we clean up first! However, this relies pretty heavily on some implementation details of CPython. In fact, when I run this same code with pypy, the cleanup never gets run2! So what options do we have?

Let's take a look at some of the code in the standard library; specifically, let's look at Lib/_collections_abc.py to get a better sense of what operations may be available on generators.

This snippet seems promising:

class Generator(Iterator):

    __slots__ = ()

    def __next__(self):
        """Return the next item from the generator.
        When exhausted, raise StopIteration.
        """
        return self.send(None)

    @abstractmethod
    def send(self, value):
        """Send a value into the generator.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the generator.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """Raise GeneratorExit inside generator.
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Generator:
            return _check_methods(C, '__iter__', '__next__',
                                  'send', 'throw', 'close')
        return NotImplemented

In particular, this suggests that generators have a close method!

So what happens if we explicitly invoke close on our generator after we've finished iteration?

from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    generator = gen()
    for elem in generator:
        print(elem)
        if elem > 3:
            break
    generator.close()
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()

Run with PyScript

Running with both CPython and pypy seems to work as desired! But we can even do a little bit better using the closing context manager from contextlib:

from __future__ import annotations

import itertools
import time
from collections.abc import Generator
from contextlib import closing


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    with closing(gen()) as generator:
        for elem in generator:
            print(elem)
            if elem > 3:
                break
    print("finished iterating")
    # Perform some expensive operation.
    time.sleep(2)


if __name__ == "__main__":
    main()

Run with PyScript

Even when an exception is raised out of the body of the for loop, we'll still invoke the generator cleanup.

Cool, so we have finally arrived at a sensible idiom for ensuring that we invoke cleanup code with reasonable confidence (as much confidence as the chosen Python runtime can provide, at least).

In practice, very few people write code like this; CPython cleanup behavior is fairly predictable. But what happens when we venture into "asyncland"? Here's a similar example:

from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator


async def gen() -> AsyncGenerator[int]:
    try:
        # `yield from` is not supported in async generators
        for x in itertools.count(0):
            yield x
    finally:
        # Required to enable "cancellation"
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    generator = gen()
    async for elem in generator:
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

Run with PyScript

Running this with CPython demonstrates an issue very similar to what we observed when running the normal generator examples with pypy: the cleanup code is simply never run! But wait, when we run this using pypy, the cleanup code gets run?!

This is related to how async generator cleanup is managed: the event loop is required to register async generator hooks so that the language runtime can communicate with the event loop when an async generator object gets "finalized" (the work associated with cleaning up an unused object during garbage collection).

If you're curious why the await asyncio.sleep(0) is there: asyncio.run will invoke cancel on outstanding tasks once the provided coroutine has finished execution. The async generator cleanup task is one such task. When omitting the asyncio.sleep(0), the print statement is executed.

Let's make the similar adjustment of inlining the gen call into the async for loop and see what happens:

from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator


async def gen() -> AsyncGenerator[int]:
    try:
        for x in itertools.count(0):
            yield x
    finally:
        # Required for "reasons"
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    async for elem in gen():
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # Perform some expensive operation.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

Run with PyScript

With CPython, the cleanup is now executed. This is because the generator is finalized as we exit the async for loop, so the cleanup task is scheduled before the await syncio.sleep(1) which allows the event loop to "work on" the cleanup task prior to the completion of the coroutine. With pypy, the cleanup is still executed, but due to the differences in garbage collection behavior, only after the await asyncio.sleep(1) completes.

Finally, let's take a look at using a similar explicit cleanup idiom with async code. The contextlib library provides an asynchronous variant of the closing context manager called aclosing:

from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator
from contextlib import aclosing


async def gen() -> AsyncGenerator[int]:
    try:
        for x in itertools.count(0):
            yield x
    finally:
        # Required for "reasons"
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    async with aclosing(gen()) as generator:
        async for elem in generator:
            print(elem)
            if elem > 3:
                break
    print("finished iterating")
    # Perform some expensive operation.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

Run with PyScript

This code has consistent, reliable behavior across both CPython and pypy.

There are some extra reasons why explicit cleanup code is especially important in async code:

  • different event loop implementations may implement async generator hooks differently; so there is more surface area for weird behavior (see some of the linked PyScript examples which use Pyodide's webloop)
  • cleanup code that uses runtime context (e.g. inspects the currently running async task) will observe the runtime context of the tasks started by the event loop's async generator hooks

Conclusion

It's probably a reasonably good practice to follow predictable cleanup idioms when using generators (async or not). Practically, it seems unlikely that most folks will use these idioms with normal generators, but you should really consider applying these idioms when interacting with async generators.

I learned a ton while researching this post. Async generators are an interesting example of a language feature that requires the language runtime to work in tandem with an async event loop. Async generator cleanup used to be magic but now I think I have a reasonable mental model of what is going on and have a better sense of the idioms that might help to ensure predictable behavior.

Additional reading

  • PEP 525: The PEP that describes async generators and their implementation. Includes details about async generator hooks.
  • PEP 533: A "deferred" PEP that proposes extensions to synchronous and asynchronous iterator protocols to support deterministic cleanup in for loops.
  • contextlib documentation: The first place that I noticed the async with aclosing(gen()) as generator: idiom.
1

Lots of "streaming" when working with language models to improve UX of high-latency completions.

2

CPython leverages reference counting with a generation garbage collector only for dealing with cyclic references. PyPy uses its own incremental garbage collector (and no reference counting). PyPy's docs provide a reasonable discussion of some of the differences.