A structured concurrency library that makes your life easy!
Edit: The article was updated to take into account the latest major version of anyio (3.X).
In recent years, a new approach to concurrency has emerged in Python: asyncio. Its goal is to help us create more efficient concurrent programs than the ones we traditionally wrote with threads. It leverages the async/await keywords introduced in Python 3.5 and achieves better concurrency through coroutines, sometimes described as lightweight threads.
Although asyncio brings some fresh air into the Python concurrency world, it has confusing design decisions that make it difficult to use. For more information, you can look at these blog posts by Armin Ronacher (the creator of Flask) and Nathaniel J. Smith (a Python core developer and creator of trio):
- I don’t understand asyncio
- I’m not feeling the async pressure
- Some thoughts on asynchronous API design in a post-async/await world
Other async libraries came into play to correct asyncio flaws:
- curio: A simple and pleasant async library that tried to keep as much as possible the synchronous concurrency API well known by the developers and just added the async/await syntax on top of it.
- trio: Another user-friendly async library inspired by curio, which introduced the concept of structured concurrency in Python. IMHO structured concurrency is a revolutionary approach that makes concurrent programming easier. If you want to know more about it, there is a collection of resources related to this topic on the trio forum.
So in 2020, we have three libraries for async programming using the async/await keywords (there are libraries like Twisted or gevent that allow async programming using other concepts). Which one to use? Note that a program written in one of these libraries cannot be used in another one without some hacking.
This is where anyio comes to the rescue! I will just quote its definition:
AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio. It implements trio-like structured concurrency (SC) on top of asyncio, and works in harmony with the native SC of trio itself.
Unfortunately, curio is no longer supported due to some design decisions not matching with anyio development. More information can be found here.
The creator of anyio took the concepts of trio and applied them to asyncio, which does not have a concept of structured concurrency. This is the best of both worlds for me! Note, however, that if you write a program with anyio, you must use the primitives it provides and not use asyncio or trio primitives directly if you want to stay compatible with every backend.
In the rest of the article, I will show you practical examples of how to use it and make detailed comments. Some examples are taken directly from anyio documentation but with additional comments resulting from my experiences.
This tutorial is quite long, so take your time to test and understand the examples one after another. After all, there is no hurry. 😉
Anyio requires Python 3.6.2 or later to run. You can use pip to install anyio:
pip install anyio
# if you want to test the trio backend
pip install anyio[trio]
You can also use poetry if you want (and it’s better!):
poetry add anyio
# if you want to test the trio backend
poetry add anyio[trio]
Creating and managing tasks
This is the simplest possible example to show how to create tasks in anyio.
- The only way to spawn tasks with anyio is the create_task_group construct, used as an async context manager. The resulting object has a start_soon method for creating child tasks. You pass it the function followed by its arguments. Only positional arguments can be passed to the function you want to run concurrently. The start_soon method also accepts an optional keyword argument, name, to set the task name.
- By default, anyio runs with the asyncio backend. If you want to run it with another backend, look at the comments at the end of the example.
- Be aware that the main async function can also be run directly with the run callables of asyncio or trio, i.e., you can do asyncio.run(main()) (Python 3.7+) or trio.run(main).
- Last but not least, if you check the output of the program, you will see that it takes approximately 1s to complete. So the tasks are really run concurrently!
With concurrent programming, multiple tasks can raise errors at (approximately) the same time. So which error to raise? In anyio, the answer is “all of them”. There is an ExceptionGroup object that is raised by anyio in this case with a traceback including the traceback of every exception encountered.
- If you look carefully at the output of the program, you will notice that it stops as soon as the first error is encountered. But wait a minute… there is no ExceptionGroup raised, just a ValueError! Unfortunately, it is not simple to get the ExceptionGroup error, and the behavior changes slightly between backends. For example, if you change the backend to trio, you will notice that sometimes not even one task is finished, and the error raised can be TypeError or ValueError.
- I think that the different backends behave quite differently when it comes to handling exceptions, so it is difficult for anyio to offer consistent behavior here. Be aware of that.
Often, we need to perform operations and be sure not to be blocked forever. So in many libraries like requests, you will find some kind of timeout argument on operations. This is not bad, but it is really boilerplate and can quickly become difficult to follow. In anyio, it is simple to time out a block of statements with a context manager.
- The statement print(‘This should never be printed’) will never run because the sleep takes 2s and the timeout is 1s.
- If you look at the API documentation, you will see that move_on_after can take a shield argument to protect the block from being cancelled by any outside cancel operation. According to the creator of trio (remember that anyio follows its API), it is probably a bad idea for high-level code to use this property.
- There is also the context manager fail_after which raises a TimeoutError if the timeout expires.
In fact, when a context manager like move_on_after or fail_after cancels operations, it raises a cancellation error that is intercepted at the exit of the context manager. If you are in the middle of an important operation and absolutely want to perform some cleanup, here is what you can do.
- The exception raised in response to cancellation differs between backends, so the only way to be sure to catch it is to use the function get_cancelled_exc_class, which returns the relevant exception class.
- If your cleanup code involves async code, it should be placed in a CancelScope block with a shield attribute set to True. Otherwise, the operation will be canceled immediately since it’s in an already canceled scope.
- Don’t forget to re-raise the cancellation error; otherwise, the program will crash in an unexpected way.
Like in any concurrency library, in anyio you have all the classic synchronization primitives to coordinate tasks: locks, semaphores, events, etc. There is also a capacity limiter, which is a sort of enhanced semaphore. Let’s see some examples.
- In the first example, you have usage of a capacity limiter. Its usage is similar to that of a semaphore, but you have extra features like the ability to dynamically set the number of tokens to use, know how many tokens have been used, etc. Look at the API documentation for more information.
- In the second example, you have a classic usage of an event where tasks wait for a signal to continue their execution. Note that, unlike other concurrency libraries, there is no clear method for reusing an event. If you need to reuse an event, you just have to create a new one.
- To see other primitive examples, you can look at this section of the documentation.
Managing operating system signals
Often it is expected that programs respond to system signals. For example, if a user sends a SIGTERM signal, they expect the program to shut down gracefully; if they send SIGHUP, they expect the program to reload its configuration. Anyio provides a beautiful mechanism to intercept these signals. See for yourself.
- If you are a Windows user, SIGTERM and SIGHUP will probably not work. Sorry for that but Windows doesn’t handle many signals that exist on Unix systems. You can test the previous example with the SIGINT signal which works on Windows, replace lines 7 and 9 with SIGINT, and delete lines 12 and 13.
- As you can see you just iterate on the signals you want to check, with no callback to use like in the standard library. This is really clever and simple to use. 🙃
- If you want to test the program, open a first terminal window, run the script, open a second terminal window, and check the PID of the program with “ps -ef | grep myscript.py” (replace myscript.py with the real name of your program). The PID is the first number you will see on the relevant line. After that, you have to use the kill command with the signal you want to send. For SIGTERM it will be “kill -TERM PID” and for SIGHUP it will be “kill -HUP PID”.
- Again Windows users, the previous comment will not work for you, the only thing you can do is Ctrl+C to see that the signal is handled.
TCP client and server
One thing I really love about trio/anyio is the ease of writing network programs. Here is an example.
- To test this, you need to run the client script in a terminal window and the server script in another window. Start with the server.
- The client script is self-descriptive. You pass the host and port to connect_tcp and you have a SocketStream object with a clean interface to send and receive messages.
- For receiving messages you can use the syntax “async for message in client” and the client will continue iterating on the messages as long as there are messages, otherwise it will leave the loop.
- The server script is also easy to follow. You create a listener passing the port and you call the serve method on this listener with a callable. The callable takes the peer SocketStream object as its only argument.
- On line 7 of the server script, you probably noticed the usage of the extra method. Anyio has a system of typed attributes to easily retrieve socket or listener attributes, especially if they are wrapped in another socket or listener. For more information, you can look at this section of the anyio documentation.
TLS client and server
The previous example was simple but not realistic. In the real world, you need to secure your connections, and for that you need TLS. The server needs to have a certificate. There are many vendors that can provide certificates recognized by the majority of clients, but for testing purposes, you probably want to use a self-signed certificate. The following command creates one for you: a certificate named cert.pem with a private key key.pem, valid for 365 days.
openssl req -x509 -newkey rsa:2048 -subj '/CN=localhost' -keyout key.pem -out cert.pem -nodes -days 365
Now we can slightly modify the previous example to make it TLS-capable.
- For the client script, as stated in the comments, you can omit lines 11 and 12 if you are connecting to well-known services whose certificates are probably already recognized. In that case, instead of the ssl_context argument, you can just pass the tls argument with the value True.
- The server script is again easy to understand. We create an SSL context, passing the certificate and private key. After that, we create a tcp_listener like in the previous example, but this time we wrap it in a TLSListener, which takes the tcp_listener and the SSL context at creation. The rest is like the previous example.
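For reference, the two sides could look like the sketch below. It assumes that cert.pem and key.pem from the openssl command above sit in the current directory and that port 1234 is free; the function names are mine. Run serve_tls in one terminal and tls_client in another.

```python
import ssl

import anyio
from anyio.streams.tls import TLSListener


async def handle(client) -> None:
    async with client:
        name = await client.receive()
        await client.send(b"Hello, " + name)


async def serve_tls(port: int = 1234) -> None:
    # Server side: load the certificate and its private key.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="cert.pem", keyfile="key.pem")
    tcp_listener = await anyio.create_tcp_listener(local_port=port)
    listener = TLSListener(tcp_listener, context)
    await listener.serve(handle)


async def tls_client(port: int = 1234) -> bytes:
    # Client side: trust our self-signed certificate explicitly.
    # For public services, pass tls=True instead of ssl_context.
    context = ssl.create_default_context()
    context.load_verify_locations(cafile="cert.pem")
    async with await anyio.connect_tcp(
        "localhost", port, ssl_context=context
    ) as client:
        await client.send(b"anyio")
        return await client.receive()


# Server terminal: anyio.run(serve_tls)
# Client terminal: print(anyio.run(tls_client))
```

The client must connect to "localhost" rather than an IP address, since that is the name the self-signed certificate was issued for.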
Unix client and server
For Unix systems (sorry Windows!), there are Unix sockets that can be very convenient for inter-process communication. You can see in the next example that writing clients and servers for this type of socket is almost identical to the way to do it with TCP.
- I don’t think there is much to say: the interface is pretty much the same, except that the connect_unix and create_unix_listener functions take a file path as an argument instead of a host and port.
UDP client and server
With UDP we don’t have the notion of a connected socket but it is still useful for many protocols like DNS.
- For the client script, the interface is the same as before for sending and receiving messages. The UDP object used here is a ConnectedUDPSocket object.
- For the server script, the UDP object used is a UDPSocket. To send a message, we use the sendto method and we have to pass the message, the client host and the client port. It is a bit of boilerplate, but this is due to the nature of UDP sockets.
Bonus: a dual server
It is something I’ve been dreaming about lately, and anyio makes it easy to build one. I am talking about a server capable of serving UDP and TCP clients at the same time. This is especially useful for DNS or HTTP servers. Yes, for the latter: with the introduction of HTTP/3, UDP is used for transport! So HTTP servers will need to handle both TCP and UDP. Here is a simple example of a dual server.
- There are three tasks spawned in the main async function. The first checks signal handlers to stop the program if needed. The second creates a UDP server and the third creates a TCP server. This should be familiar to you now.
- However, note that when creating the servers, I use the “reuse_port” option, a socket option that allows multiple sockets to bind to the same address/port. Strictly speaking, TCP and UDP have separate port namespaces, so one TCP socket and one UDP socket can share port 1234 even without it; the option matters when several sockets of the same protocol bind to the same address/port. Unfortunately, this option is not available on Windows, so you cannot run the program as written on this platform.
- To test this example you can reuse the clients shown in the previous TCP and UDP sections.
Communication between tasks
In real life, we generally want some kind of communication between tasks. In anyio, it is achieved via the use of memory channels. Example:
- Line 17, we create a memory channel. The default size of the channel is 0 which is a good default to prevent bugs related to back pressure. If you want an infinite size, just pass the value math.inf but this is rarely a good idea. For more literature about this, I can only recommend you read this section of trio’s documentation.
- In producer and consumer functions, you will notice that we start with the syntax “async with..”. This is the correct way to ensure proper resource cleanup.
- If you prefer the traditional approach with a queue, you can combine the sender and receiver of the previous example into a single object using the StapledObjectStream class. If you do that, create the memory channel with a buffer size of at least 1; otherwise, with a size of 0, a task that sends and then receives on the same object will block forever on the send.
Managing multiple producers and/or multiple consumers
A more tricky case of communication between tasks is when multiple producers or consumers are involved. In this case, the clone method of a channel comes in handy.
- Even if we clone the channels, a value is sent only to a unique consumer.
- The original channel objects created at line 21 will not be closed until all clones are closed.
Working with threads
Sometimes, you will have to deal with blocking I/O (input/output) operations that you can’t perform natively with anyio, like manipulating paths. For that, you can use the helper function to_thread.run_sync.
Dealing with files
Anyio provides a simple construct open_file similar to the built-in open function to read or write files.
- If you want to test it, replace “path/to/file” with a real path.
- The interface of the file object returned by open_file is similar to the classical one you know, just add “await” before calling its io methods like read or write. You can read the API documentation here.
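As a quick sketch, the following writes a file and reads it back in two ways. The temporary directory and the file name demo.txt are my own choices so that the example cleans up after itself.

```python
import os
import tempfile

import anyio


async def main() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.txt")  # hypothetical file name

        # Same modes as the built-in open(); I/O methods are awaited.
        async with await anyio.open_file(path, "w") as f:
            await f.write("first line\nsecond line\n")

        async with await anyio.open_file(path) as f:
            content = await f.read()

        # The file object can also be iterated line by line.
        async with await anyio.open_file(path) as f:
            lines = [line async for line in f]

    return content, lines


if __name__ == "__main__":
    print(anyio.run(main))
```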
It is simple enough to run arbitrary executables in subprocesses with anyio.
- The second example is quite simple: we list the content of the current directory. For Windows users, you can replace this command with dir.
- For the first example, you have an advanced use case where you need more control over the process run. You can check the interface of the Process object in the API documentation. For Windows users, you can replace ps with tasklist.
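The two styles can be sketched as below. To stay portable, I run the Python interpreter itself (sys.executable) instead of ps or ls; that substitution and the function names are mine, while run_process and open_process are the anyio functions.

```python
import sys

import anyio


async def run_simple() -> str:
    # run_process waits for completion and captures the output.
    result = await anyio.run_process(
        [sys.executable, "-c", "print('hello from a child process')"]
    )
    return result.stdout.decode().strip()


async def run_with_process_object() -> tuple:
    # open_process gives a Process object for finer control.
    command = [sys.executable, "-c", "print('line 1'); print('line 2')"]
    async with await anyio.open_process(command) as process:
        chunks = []
        async for chunk in process.stdout:
            chunks.append(chunk)
    # Leaving the block closes the streams and waits for the process.
    return b"".join(chunks).decode().splitlines(), process.returncode


if __name__ == "__main__":
    print(anyio.run(run_simple))
    print(anyio.run(run_with_process_object))
```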
Running CPU-intensive computation
For operations that use heavy computation (pure mathematical functions, for example), running them in threads as shown above is not enough, because the thread running the CPU-bound task can starve the main thread, which contains the event loop. This happens because the Python interpreter cannot run code on multiple cores with threads due to the GIL. For that reason, we need a construct to run these functions in a separate process. Anyio has a helper function, to_process.run_sync, for that purpose.
- For now, you need to use the syntax “from anyio import to_process” to use this utility function.
- You need to run the code inside the block “if __name__ ..” due to some limitations of the anyio implementation. More details can be found here.
So that is all for this long tutorial. If you reach the end after thoroughly reading all the contents, congratulations!! You did very well. Let me know if you have suggestions to improve the article or if some sections were unclear. Also before we leave each other, I can only advise you to have a look at the documentation of anyio and trio to deepen your knowledge.
Take care of yourself and see you soon for the next tutorial. 😁