Anyio: all you need for async programming stuff

Kevin Tewouda
14 min read · Nov 25, 2020

A structured concurrency library that makes your life easy!

Edit: The article was updated to take into account the latest major version of anyio (4.X).

Foreword

In recent years, a new approach to concurrency has emerged in Python: asyncio. Its goal is to help us write more efficient concurrent programs than the ones we traditionally wrote with threads. It leverages the async/await keywords introduced in Python 3.5 and achieves better concurrency through coroutines, sometimes described as lightweight threads.

Although asyncio brings some fresh air into the Python concurrency world, it has confusing design decisions that make it difficult to use. For more information, you can look at the blog posts on this topic by Armin Ronacher (the creator of Flask) and Nathaniel J Smith (a Python core developer and creator of trio).

Other async libraries came into play to correct asyncio’s flaws:

  • curio: A simple and pleasant async library that keeps as much as possible of the synchronous concurrency API developers already know and simply adds the async/await syntax on top of it.
  • trio: Another user-friendly async library, inspired by curio, which introduced the concept of structured concurrency in Python. IMHO structured concurrency is a revolutionary approach that makes concurrent programming easier. If you want to know more about it, there is a collection of resources related to this topic on the trio forum.

So in 2020, we have three libraries for async programming using the async/await keywords (there are libraries like Twisted or gevent that allow async programming using other concepts). Which one to use? Note that a program written in one of these libraries cannot be used in another one without some hacking.

This is where anyio comes to the rescue! I will just quote its definition:

AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio. It implements trio-like structured concurrency (SC) on top of asyncio, and works in harmony with the native SC of trio itself.

Unfortunately, anyio no longer supports curio as a backend because some of curio’s design decisions did not fit anyio’s development. More information can be found here.

The creator of anyio took the concepts of trio and applied them to asyncio (which historically had no built-in concept of structured concurrency). This is the best of both worlds for me! Note however that if you write a program with anyio, you must use the primitives it provides rather than asyncio or trio primitives directly, so that your code remains compatible with both backends.

In the rest of the article, I will show you practical examples of how to use it and make detailed comments. Some examples are taken directly from anyio documentation but with additional comments resulting from my experiences.

This tutorial is quite long, so take your time to test and understand the examples one after another. After all, there is no hurry. 😉

Installation

Anyio requires Python 3.8 or later to run. You can use pip to install anyio:

pip install anyio
# if you want to test the trio backend (quotes keep shells like zsh from expanding the brackets)
pip install "anyio[trio]"

You can also use poetry if you want (and it’s better!):

poetry add anyio
# if you want to test the trio backend (quotes keep shells like zsh from expanding the brackets)
poetry add "anyio[trio]"

If you don’t know poetry, I wrote an article on it here:

Usage

Creating and managing tasks

This is the simplest possible example to show how to create tasks in anyio.

Notes:

  • The only way to spawn tasks with anyio is to use create_task_group as an async context manager. The resulting object has a start_soon method for creating child tasks. You pass the function followed by its arguments, as in the example. Only positional arguments can be passed to the function you want to run concurrently (wrap it with functools.partial if you need keyword arguments). The start_soon method also accepts an optional keyword argument, name, to set the task name.
  • By default, anyio runs with the asyncio backend. If you want to run it with another backend, look at the comments at the end of the example.
  • Be aware that the main async function can also be run directly with the run functions of asyncio or trio, i.e. asyncio.run(main()) (Python 3.7+) or trio.run(main).
  • Last but not least, if you check the output of the program, you will see that it takes approximately 1s to complete. So the tasks are really run concurrently!

Handling exceptions

With concurrent programming, multiple tasks can raise errors at (approximately) the same time. So which error should be raised? In anyio, the answer is “all of them”. In this case anyio raises an ExceptionGroup whose traceback includes the traceback of every exception encountered.

If you run this program, you will have an output somewhat similar to the following (here I’m using Python 3.11 which is more informative in terms of error messages):

Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
+ Exception Group Traceback (most recent call last):
| File "C:\Users\rolla\PycharmProjects\python\tutorials\io\main.py", line 23, in <module>
| anyio.run(main)
Task 0 finished
Task 1 finished
Task 3 finished
| File "C:\Users\rolla\AppData\Local\pypoetry\Cache\virtualenvs\io-Y-7FwA9r-py3.11\Lib\site-packages\anyio\_core\_eventloop.py", line 66, in run
| return async_backend.run(func, args, {}, backend_options)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\rolla\AppData\Local\pypoetry\Cache\virtualenvs\io-Y-7FwA9r-py3.11\Lib\site-packages\anyio\_backends\_asyncio.py", line 1968, in run
| return native_run(wrapper(), debug=debug)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\rolla\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
| return runner.run(main)
| ^^^^^^^^^^^^^^^^
| File "C:\Users\rolla\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
| return self._loop.run_until_complete(task)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\rolla\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
| return future.result()
| ^^^^^^^^^^^^^^^
| File "C:\Users\rolla\AppData\Local\pypoetry\Cache\virtualenvs\io-Y-7FwA9r-py3.11\Lib\site-packages\anyio\_backends\_asyncio.py", line 1961, in wrapper
| return await func(*args)
| ^^^^^^^^^^^^^^^^^
| File "C:\Users\rolla\PycharmProjects\python\tutorials\io\main.py", line 16, in main
| async with anyio.create_task_group() as tg:
| File "C:\Users\rolla\AppData\Local\pypoetry\Cache\virtualenvs\io-Y-7FwA9r-py3.11\Lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (2 sub-exceptions)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "C:\Users\rolla\PycharmProjects\python\tutorials\io\main.py", line 8, in task
| raise ValueError
| ValueError
+---------------- 2 ----------------
| Traceback (most recent call last):
| File "C:\Users\rolla\PycharmProjects\python\tutorials\io\main.py", line 10, in task
| raise TypeError
| TypeError
+------------------------------------

If you want to catch these errors, there are two cases.

First, if you are using Python 3.11+, you can write the following code:

async def main():
    start = anyio.current_time()
    try:
        async with anyio.create_task_group() as tg:
            for i in range(5):
                tg.start_soon(task, i)
    except* ValueError as exc_group:
        for exc in exc_group.exceptions:
            # handle ValueError here
            print(exc)
    except* TypeError as exc_group:
        for exc in exc_group.exceptions:
            # handle TypeError here
            print(exc)

    runtime = anyio.current_time() - start
    print(f'program executed in {runtime:.2f}s')

If you are using an older Python version, you need to install the exceptiongroup library which is a backport of the ExceptionGroup feature, and write the following code:

import anyio
from exceptiongroup import ExceptionGroup, catch

...

def value_type_error_handler(exc_group: ExceptionGroup) -> None:
    for exc in exc_group.exceptions:
        print(exc)

async def main():
    start = anyio.current_time()
    # you can define different handlers for different types of error
    with catch({
        (ValueError, TypeError): value_type_error_handler,
    }):
        async with anyio.create_task_group() as tg:
            for i in range(5):
                tg.start_soon(task, i)

    runtime = anyio.current_time() - start
    print(f'program executed in {runtime:.2f}s')

Note: if a handler needs to assign to a variable of the enclosing function, define the handler inside that function and declare the variable as nonlocal.

    def handle_value_error(exc):
        nonlocal somevariable
        somevariable = 'whatever'

Timeouts

Often, we need to perform operations while making sure we are not blocked forever. That is why many libraries, like requests, offer some kind of timeout argument on their operations. This works, but it is boilerplate that quickly becomes difficult to follow. In anyio, it is simple to put a timeout on a block of statements with a context manager.

Notes:

  • The statement print('This should never be printed') is never executed because the sleep takes 2s and the timeout is 1s.
  • If you look at the API documentation, you will see that it can take a shield argument to prevent the block from being canceled from any outside cancel operation. According to the creator of trio (remember that anyio follows its API), it is probably a bad idea if high-level code uses this property.
  • There is also the context manager fail_after which raises a TimeoutError if the timeout expires.

Cancellation

In fact, when a context manager like move_on_after or fail_after cancels operations, a cancellation exception is raised inside the block and intercepted at the exit of the context manager. If you are in the middle of an important operation and absolutely want to perform some cleanup, here is what you can do.

Notes:

  • The exception raised in response to cancellation differs between backends, so the only way to be sure to catch it is to use the function get_cancelled_exc_class, which returns the relevant exception class.
  • If your cleanup code involves async code, it should be placed in a CancelScope block with the shield attribute set to True. Otherwise, the operation will be canceled immediately since it runs in an already canceled scope.
  • Don’t forget to re-raise the cancellation exception; otherwise, the program may behave in unexpected ways.

Synchronization primitives

Like any concurrency library, anyio has all the classic synchronization primitives to coordinate tasks: locks, semaphores, events, etc. There is also a capacity limiter, which is a sort of enhanced semaphore. Let’s see some examples.

Notes:

  • In the first example, you have a usage of the capacity limiter. As I said earlier, the usage is similar to that of a semaphore, but you have extra features like the ability to dynamically set the number of tokens, know how many tokens are in use, etc. Look at the API documentation for more information.
  • In the second example, you have a classic usage of an event where tasks wait for a signal to continue their execution. Note that, unlike in other concurrency libraries, there is no clear() method to reset an event. If you need to reuse an event, you just have to create a new one.
  • To see other primitive examples, you can look at this section of the documentation.

Managing operating system signals

Programs are often expected to respond to system signals. For example, a user who sends SIGTERM expects the program to shut down gracefully, and a user who sends SIGHUP expects it to reload its configuration. Anyio provides a beautiful mechanism to intercept these signals. See for yourself.

Notes:

  • If you are a Windows user, I’m sorry but signal handling does not work. trio tries its best to support SIGINT on Windows, but it is not a good idea to rely on it.
  • As you can see you just iterate on the signals you want to check, with no callback to use like in the standard library. This is really clever and simple to use. 🙃
  • If you want to test the program, open a first terminal window, run the script, open a second terminal window, and check the PID of the program with “ps -ef | grep myscript.py” (replace myscript.py with the real name of your program). The PID is the first number you will see on the relevant line. After that, you have to use the kill command with the signal you want to send. For SIGTERM it will be “kill -TERM PID” and for SIGHUP it will be “kill -HUP PID”.

TCP client and server

One thing I really love about trio/anyio is the ease of writing network programs. Here is an example.

Notes:

  • To test this, you need to run the client script in a terminal window and the server script in another window. Start with the server.
  • The client script is self-descriptive. You pass the host and port to connect_tcp and you have a SocketStream object with a clean interface to send and receive messages.
  • For receiving messages you can use the syntax “async for message in client” and the client will continue iterating on the messages as long as there are messages, otherwise it will leave the loop.
  • The server script is also easy to follow. You create a listener passing the port and call the serve method on this listener with a callable. The callable takes the peer SocketStream object as its only argument.
  • On line 7 of the server script, you probably noticed the usage of the extra method. Anyio has a system of typed attributes to easily retrieve socket or listener attributes, especially when they are wrapped in another socket or listener. For more information, you can look at this section of anyio documentation.

TLS client and server

The previous example was simple but not realistic. In the real world, you need to secure your connections and for that, the usage of TLS is needed. The server needs to have a certificate. There are many vendors that can provide certificates recognized by the majority of clients, but for testing purposes, you probably want to use a self-signed certificate. The following command creates this for you, a certificate named cert.pem with a private key key.pem that will be valid for 365 days.

openssl req -x509 -newkey rsa:2048 -subj '/CN=localhost' -keyout key.pem -out cert.pem -nodes -days 365

We can slightly modify the previous example to make it TLS-capable.

Notes:

  • For the client script, as stated in the comments, you can omit lines 11 and 12 if you are connecting to well-known services whose certificates are probably already recognized. In that case, instead of the ssl_context argument, you can just pass the tls argument with the value True.
  • The server script is again easy to understand. We create an SSL context, passing the certificate and private key. After that, we create a TCP listener like in the previous example, but this time we wrap it in a TLSListener, which takes the TCP listener and the SSL context. The rest is like the previous example.
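
A rough sketch of both sides, assuming cert.pem and key.pem from the openssl command above are in the working directory and that port 1234 is free. The function names are hypothetical, and each function is meant to be called from its own script:

```python
import ssl

import anyio
from anyio.streams.tls import TLSListener


async def handle(client) -> None:
    async with client:
        async for data in client:
            await client.send(b"secure echo: " + data)


async def serve_tls(port: int = 1234) -> None:
    # Server side: load the self-signed certificate and its private key.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="cert.pem", keyfile="key.pem")
    listener = TLSListener(await anyio.create_tcp_listener(local_port=port), context)
    await listener.serve(handle)


async def query_tls(port: int = 1234) -> bytes:
    # Client side: trust the self-signed certificate explicitly.
    # For public services, pass tls=True instead of ssl_context.
    context = ssl.create_default_context(cafile="cert.pem")
    async with await anyio.connect_tcp("localhost", port, ssl_context=context) as client:
        await client.send(b"hello")
        return await client.receive()


# Server script: anyio.run(serve_tls)
# Client script: print(anyio.run(query_tls))
```

The client connects to "localhost" rather than an IP address because the certificate was issued for CN=localhost.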

Unix client and server

For Unix systems (sorry Windows!), there are Unix sockets that can be very convenient for inter-process communication. You can see in the next example that writing clients and servers for this type of socket is almost identical to the way to do it with TCP.

Notes:

  • I don’t think there is much to say: the interface is pretty much the same, except that the connect_unix and create_unix_listener functions take a file path as an argument instead of a host and port.

UDP client and server

With UDP we don’t have the notion of a connected socket but it is still useful for many protocols like DNS.

Notes:

  • For the client script, the interface for sending and receiving messages is the same as before. The UDP object used here is a ConnectedUDPSocket.
  • For the server script, the UDP object used is a UDPSocket. To send a message, we use the sendto method and have to pass the message, the client host and the client port. It is a bit of boilerplate, but that is due to the connectionless nature of UDP sockets.

Bonus: a dual server

It is something I’ve been dreaming about lately, and anyio makes it easy to build: a server capable of serving UDP and TCP clients at the same time. This is especially useful for DNS or HTTP servers. For the latter, with the introduction of HTTP/3, UDP is used for transport, so HTTP servers need to handle both TCP and UDP. Here is a simple example of a dual server.

Notes:

  • There are three tasks spawned in the main async function. The first checks signal handlers to stop the program if needed. The second creates a UDP server and the third creates a TCP server. This should be familiar to you now.
  • However, note that for the creation of the servers, I use the “reuse_port” option, a socket option that allows multiple sockets to bind to the same address/port. Strictly speaking, TCP and UDP have separate port spaces, so a TCP socket and a UDP socket can share port 1234 even without it; the option matters when several sockets of the same protocol bind to that port. Unfortunately, this option is not available on Windows, so you cannot run the program as written on this platform.
  • To test this example you can reuse the clients shown in the previous TCP and UDP sections.

Communication between tasks

In real life, we generally want some kind of communication between tasks. In anyio, it is achieved via the use of memory channels. Example:

Notes:

  • Line 17, we create a memory channel. The default size of the channel is 0 which is a good default to prevent bugs related to back pressure. If you want an infinite size, just pass the value math.inf but this is rarely a good idea. For more literature about this, I can only recommend you read this section of trio’s documentation.
  • In the producer and consumer functions, you will notice that we start with the syntax “async with..”. This is the correct way to ensure proper resource cleanup.
  • If you prefer the traditional approach with a queue, you can combine the sender and receiver of the previous example into a single object using the StapledObjectStream class. If you do that, create the memory channel with a buffer size of at least 1; otherwise, a send will block until another task is ready to receive, which can leave your program stuck.

Managing multiple producers and/or multiple consumers

A trickier case of communication between tasks is when multiple producers or consumers are involved. In this case, the clone method of a channel comes in handy.

Notes:

  • Even if we clone the channels, a value is sent only to a unique consumer.
  • The original channel objects created at line 21 will not be closed until all clones are closed.

Working with threads

Sometimes, you will have to deal with some blocking io (input/output) operations you can’t perform with anyio like manipulating paths. For that, you can use the helper function to_thread.run_sync.

Dealing with files

Anyio provides a simple construct open_file similar to the built-in open function to read or write files.

Notes:

  • If you want to test it, replace “path/to/file” with a real path.
  • The interface of the file object returned by open_file is similar to the classical one you know, just add “await” before calling its io methods like read or write. You can read the API documentation here.
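
A small write-then-read sketch; it uses a temporary file (an assumption made so it can run without touching real files) and shows that the file object also supports async iteration over lines:

```python
import tempfile
from pathlib import Path

import anyio


async def main() -> str:
    with tempfile.TemporaryDirectory() as directory:
        path = Path(directory) / "demo.txt"  # hypothetical file
        async with await anyio.open_file(path, "w") as f:
            await f.write("first line\nsecond line\n")
        async with await anyio.open_file(path) as f:
            lines = [line async for line in f]
        return "".join(lines)


if __name__ == "__main__":
    print(anyio.run(main))
```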

Using subprocesses

It is simple enough to run arbitrary executables in subprocesses with anyio.

Notes:

  • The second example is quite simple. We list the content of the current directory. Windows users can replace this command with notepad, which opens the default text editor.
  • For the first example, you have an advanced use case where you need more control over the process run. You can check the interface of the Process object in the API documentation. For Windows users, you can replace this example with the following one:
from anyio import open_process, run
from charset_normalizer import from_bytes


async def main():
    async with await open_process(['tasklist']) as process:
        async for data in process.stdout:
            print(from_bytes(data).best())


run(main)

Here we use the library charset_normalizer to detect the correct encoding since it is trickier on Windows. We don’t have the ps command but instead, a tasklist command doing approximately the same thing.
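
For a portable sketch of both styles, the child command below is the current Python interpreter itself (an assumption chosen so the same code runs on any platform): run_process waits for completion and collects the output, while open_process gives access to the output stream as it arrives.

```python
import sys

import anyio


async def main():
    # Simple case: run to completion and collect the output.
    result = await anyio.run_process(
        [sys.executable, "-c", "print('hello from child')"]
    )
    simple = result.stdout.decode().strip()

    # Advanced case: keep a handle on the process and stream its output.
    command = [sys.executable, "-c", "print('a'); print('b')"]
    async with await anyio.open_process(command) as process:
        chunks = [chunk async for chunk in process.stdout]
    streamed = b"".join(chunks).decode().split()
    return simple, streamed


if __name__ == "__main__":
    print(anyio.run(main))
```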

Running CPU-intensive computation

For operations that use heavy computation (pure mathematics functions for example), running them in threads as shown above is not enough because the thread running the CPU task can starve the main thread, which contains the event loop. This happens because the Python interpreter cannot run Python code on multiple cores at once with threads due to the GIL. For that reason, we need a construct to run these functions in a separate process. Anyio has a helper function, to_process.run_sync, for that purpose.

Notes:

  • For now, you need to use the syntax “from anyio import to_process” to use this utility function.
  • You need to run the code inside the “if __name__ ..” block due to some limitations of the anyio implementation. More details can be found here.

So that is all for this long tutorial. If you reach the end after thoroughly reading all the contents, congratulations!! You did very well. Let me know if you have suggestions to improve the article or if some sections are unclear. Also before we leave each other, I can only advise you to have a look at the documentation of anyio and trio to deepen your knowledge.

Take care of yourself and see you soon for the next tutorial. 😁
