Anyio: all you need for async programming


Foreword

In recent years there has been a new trend for concurrency in Python called asyncio. Its goal is to help us create more efficient concurrent programs than the ones we traditionally wrote with threads. It leverages the async/await keywords introduced in Python 3.5 and achieves better concurrency through the usage of coroutines, sometimes described as lightweight threads. Other async libraries have since emerged on the same foundations:

  • curio: A simple and pleasant async library which tries to keep as much as possible of the synchronous concurrency API well known by developers, just adding the async/await syntax on top of it.
  • trio: Another user-friendly async library, inspired by curio, which introduced the concept of structured concurrency in Python. IMHO structured concurrency is a revolutionary approach which makes concurrent programming easier. If you want to know more about it, there is a collection of resources related to this topic on the trio forum.

Anyio builds on these ideas: it implements trio-like structured concurrency on top of asyncio or trio, so the same code can run on either backend.

Installation

Anyio requires Python 3.6.2 or later to run. You can use pip or poetry to install anyio:

pip install anyio
# if you want to test the trio backend
pip install anyio[trio]

# or with poetry
poetry add anyio
# if you want to test the trio backend
poetry add anyio[trio]

Usage

Creating and managing tasks

This is the simplest possible example to show how to create tasks in anyio.

  • The only way to spawn tasks in anyio is the create_task_group construct, used as an async context manager. The resulting object has a start_soon method for creating child tasks: you pass the function and its arguments as in the previous example. Only positional arguments can be passed to the function you want to run concurrently. The start_soon method also accepts an optional keyword argument name to set the task name.
  • By default anyio runs with the asyncio backend. If you want to run it with another backend, look at the comments at the end of the example.
  • Be aware that the main async function can also be run directly with the run callables of asyncio or trio, i.e. you can do asyncio.run(main()) (Python 3.7+) or trio.run(main).
  • Last but not least, if you check the output of the program, you will see that it takes approximately 1s to complete, so the tasks really do run concurrently!

Handling exceptions

With concurrent programming, errors can be raised by multiple tasks at (approximately) the same time. So which error to raise? In anyio, the answer is “all of them”. There is an ExceptionGroup object that is raised by anyio in this case with a traceback including the traceback of every exception encountered.
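Here is a sketch of the kind of program being described (the failing tasks are my own illustration; exactly which exception surfaces depends on the anyio version and backend, as discussed below):

```python
import anyio

async def fail_with(exc_type, delay):
    await anyio.sleep(delay)
    raise exc_type('boom')

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(fail_with, ValueError, 0.1)
        tg.start_soon(fail_with, TypeError, 0.1)

try:
    anyio.run(main)
except BaseException as exc:
    # depending on the version and backend, this may be an ExceptionGroup
    # wrapping both errors, or a single ValueError/TypeError
    print(type(exc).__name__, exc)
```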

  • If you look carefully at the output of the program, you will notice that it stops as soon as the first error is encountered. But wait a minute... there is no ExceptionGroup raised, just a ValueError! Yeah... unfortunately, it is not simple to get the ExceptionGroup error, and the behaviour changes slightly between backends. For example, if you change the backend to trio, you will notice that sometimes not even one task finishes, and the error raised can be TypeError or ValueError.
  • The different backends have divergent behaviours when it comes to handling exceptions, so it is difficult for anyio to be consistent here. Be aware of that.

Timeouts

Often, we need to perform operations without being blocked forever. So in many libraries like requests, you will find some kind of timeout argument on operations. This is not bad, but it is boilerplate and can quickly become difficult to follow. In anyio, it is simple to time out a block of statements with a context manager.

  • The statement print('This should never be printed') will not be printed because the sleep takes 2s and the timeout is 1s.
  • If you look at the api documentation, you will see that move_on_after can take a shield argument to prevent the block from being cancelled by any outside cancel operation. According to the creator of trio (remember that anyio follows its api), it is probably a bad idea for high level code to use this property.
  • There is also the fail_after context manager, which raises a TimeoutError if the timeout expires.

Cancellation

In fact, when a context manager like move_on_after or fail_after cancels operations, it raises a cancellation error that is intercepted at the exit of the context manager. If you are in the middle of an important operation and absolutely want to perform some cleanup, here is what you can do.

  • The exception raised in response to cancellation differs between backends, so the only way to be sure to catch it is to use the get_cancelled_exc_class function, which returns the relevant exception class.
  • If your cleanup code involves async code, it should be placed in a CancelScope block with the shield attribute set to True. Otherwise the operation will be cancelled immediately, since it is in an already cancelled scope.
  • Don't forget to re-raise the cancellation error; if you don't, the program may crash in an unexpected way.

Synchronization primitives

Like in any concurrency library, in anyio you have all the classic synchronization primitives to coordinate tasks: locks, semaphores, events, etc. There is also a capacity limiter, which is a sort of enhanced semaphore. Let's see some examples.

  • The first example shows the usage of a capacity limiter. Like I said earlier, the usage is similar to that of a semaphore, but you get extra features like the ability to dynamically set the number of tokens, to know how many tokens are currently borrowed, etc. Look at the api documentation for more information.
  • The second example is a classic usage of an event, where tasks wait for a signal to continue their execution. Note that unlike in other concurrency libraries, there is no clear method for reusing an event. If you need to reuse an event, just create a new one.
  • To see other primitive examples, you can look at this section of the documentation.

Managing operating system signals

Programs are often expected to respond to system signals. For example, if a user sends a SIGTERM signal, they expect the program to shut down gracefully; if they send SIGHUP, they expect it to reload its configuration. Anyio provides a beautiful mechanism to intercept these signals. See for yourselves.

  • If you are a Windows user, SIGTERM and SIGHUP will probably not work. Sorry for that, but Windows does not handle many of the signals that exist on Unix systems. You can test the previous example with the SIGINT signal, which does work on Windows: replace SIGTERM with SIGINT and remove the SIGHUP handling.
  • As you can see you just iterate on the signals you want to check, no callback to use like in the standard library. This is really clever and simple to use. 🙃
  • If you want to test the program, open a first terminal window and run the script, then open a second terminal window and find the pid of the program with "ps -ef | grep myscript.py" (replace myscript.py with the real name of your program). The pid is the first number on the relevant line. After that, use the kill command with the signal you want to send: "kill -TERM pid" for SIGTERM and "kill -HUP pid" for SIGHUP.
  • Again, Windows users, the previous comment will not work for you; the only thing you can do is press Ctrl+C to see that the signal is handled.

TCP client and server

One thing I really love with trio/anyio is the ease of writing network programs. Here is an example.

  • To test this, you need to run the client script in a terminal window and the server script in another window. Start with the server.
  • The client script is self-descriptive. You pass the host and port to connect_tcp and you get a SocketStream object with a clean interface to send and receive messages.
  • For receiving messages, you can use the syntax "async for message in client": the loop keeps yielding messages as long as they arrive, then exits.
  • The server script is also easy to follow. You create a listener, passing the port, and you call the serve method on this listener with a callable. The callable takes the peer SocketStream object as its only argument.
  • In the server script, you have probably noticed the usage of the extra method. Anyio has a system of typed attributes to easily retrieve socket or listener attributes, especially when they are wrapped in another socket or listener. For more information, look at this section of the anyio documentation.

TLS client and server

The previous example was simple but not realistic. In the real world you need to secure your connections, and for that TLS is needed. The server needs a certificate. There are many vendors that can provide certificates recognized by the majority of clients, but for testing purposes you probably want a self-signed certificate. The following command creates one for you: a certificate named cert.pem with a private key key.pem, valid for 365 days.

openssl req -x509 -newkey rsa:2048 -subj '/CN=localhost' -keyout key.pem -out cert.pem -nodes -days 365
  • For the client script, you can omit the custom ssl context when you are connecting to well-known services whose certificates are signed by recognized authorities. In that case, instead of the ssl_context argument, just pass the tls argument with a True value.
  • The server script is again easy to understand. We create an ssl context, passing the certificate and private key. After that we create a tcp_listener like in the previous example, but this time we wrap it in a TLSListener, which takes the tcp_listener and the ssl context at creation. The rest is like the previous example.

Unix client and server

For Unix systems (sorry Windows!), there are unix sockets that can be very convenient for inter-process communication. You can see in the next example that writing clients and servers for this type of socket is almost identical to the way it is done with tcp.

  • I don't think there is much to say: the interface is pretty much the same, except that the connect_unix and create_unix_listener functions take a filesystem path as argument instead of a host and port.

UDP client and server

With UDP we don't have the notion of a connected socket, but it is still useful for many protocols like DNS.

  • For the client script, the interface is the same as before for sending and receiving messages. The udp object used here is a ConnectedUDPSocket object.
  • For the server script, the udp object is a UDPSocket. To send a message we use the sendto method, passing the message, the client host and the client port. Yeah... a bit of boilerplate, but that is due to the nature of udp sockets.

Bonus: a dual server

It is something I've been dreaming about lately, and anyio makes it easy to realize: a server capable of serving udp and tcp clients at the same time. This is especially useful for DNS or HTTP servers. For the latter, with the upcoming introduction of HTTP/3, UDP will be used for transport, so HTTP servers will need to handle both TCP and UDP. Here is a simple example of a dual server.

  • There are three tasks spawned in the main async function. The first checks signal handlers to stop the program if needed, the second creates a udp server and the third creates a tcp server. This should be familiar to you by now.
  • However, note that for the creation of the servers, I use the reuse_port option, a socket option which allows multiple sockets to bind to the same address/port. This is part of the magic that allows handling both tcp and udp on port 1234 like in the previous example. Unfortunately this option is not available on Windows, so you cannot run the program on that platform.
  • To test this example you can reuse the clients showed in previous tcp and udp sections.

Communication between tasks

In real life, we generally want some kind of communication between tasks. In anyio, it is achieved via the use of memory channels. Example:

  • When creating the memory channel, the default buffer size is 0, which is a good default to prevent bugs related to back pressure. If you want an infinite size, just pass the value math.inf, but this is rarely a good idea. For more literature about this, I can only recommend reading this section of trio's documentation.
  • In the producer and consumer functions, you will notice that we start with the syntax "async with..". This is the correct way to ensure proper resource cleanup.
  • If you prefer the traditional approach with a queue, you can combine the sender and receiver of the previous example into a single object using the StapledObjectStream class. If you do that, create the memory channel with a buffer size of at least 1; otherwise you will get surprises.

Managing multiple producers and/or multiple consumers

A trickier case of communication between tasks is when multiple producers or consumers are involved. In this case, the clone method of a channel comes in handy.

  • Even though we clone the channels, each value is delivered to exactly one consumer.
  • The underlying channel is not closed until the original channel objects and all their clones are closed.

Working with threads

Sometimes, you will have to deal with blocking io (input/output) operations that you can't perform asynchronously with anyio, like manipulating paths. For that, you can use the helper function to_thread.run_sync.

Dealing with files

Anyio provides a simple construct open_file similar to the builtin open function to read or write files.
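A sketch with open_file that writes a file and reads it back (the path is my own placeholder, as the comments below suggest):

```python
import anyio

async def main():
    # write then read back a small file; replace the path with a real one
    async with await anyio.open_file('/tmp/anyio_demo.txt', 'w') as f:
        await f.write('hello files')
    async with await anyio.open_file('/tmp/anyio_demo.txt') as f:
        print(await f.read())

anyio.run(main)
```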

  • If you want to test it, replace “path/to/file” with a real path.
  • The interface of the file object returned by open_file is similar to the classical one you know, just add “await” before calling its io methods like read or write. You can read the api documentation here.

Using subprocesses

It is simple enough to run arbitrary executables in subprocesses with anyio.
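A sketch of both use cases discussed below (the Unix commands are the ones mentioned in the comments; the Windows alternatives are noted inline):

```python
import anyio
from anyio import open_process, run_process
from anyio.streams.text import TextReceiveStream

async def main():
    # first example: interact with the running process through its streams
    async with await open_process(['ps']) as process:  # Windows: ['tasklist']
        async for text in TextReceiveStream(process.stdout):
            print(text, end='')

    # second example: run the command and collect its output at once
    result = await run_process(['ls', '-l'])  # Windows: ['cmd', '/c', 'dir']
    print(result.stdout.decode())

anyio.run(main)
```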

  • The second example is quite simple: we read the contents of the current directory. Windows users can replace this command with dir.
  • The first example is a more advanced use case where you need more control over the process being run. You can check the interface of the Process object in the api documentation. Windows users can replace ps with tasklist.

Running CPU intensive computation

For operations involving heavy computation (pure mathematical functions, for example), running them in threads as shown above is not enough, because the thread running the CPU task can starve the main thread containing the event loop. This happens because the Python interpreter cannot run code on multiple cores with threads, due to the GIL. For that reason, we need a construct to run these functions in a separate process. Anyio has a helper function to_process.run_sync for that purpose.

  • For now, you need to use the syntax "from anyio import to_process" to use this utility function.
  • You need to run the code inside the "if __name__ .." block due to some limitations of anyio's implementation. More details can be found here.

Kevin Tewouda

A Cameroonian deserter now residing in France. Passionate about programming, sport, cinema and manga. I write in French and English on account of my origins.