AnyIO: all you need for async programming


Foreword

  • curio: A simple and pleasant async library which tries to keep as much as possible of the synchronous concurrency API well known to developers, just adding the async/await syntax on top of it.
  • trio: Another user-friendly async library, inspired by curio, which introduced the concept of structured concurrency in Python. IMHO structured concurrency is a revolutionary approach which makes concurrent programming easier. If you want to know more about it, there is a collection of resources related to this topic on the trio forum.

Installation

pip install anyio
# if you want to test the trio backend
pip install anyio[trio]
poetry add anyio
# if you want to test the trio backend
poetry add anyio[trio]

Usage

Creating and managing tasks

  • The only way to spawn tasks with anyio is the create_task_group construct, used as an async context manager. The resulting object has a start_soon method for creating child tasks: you pass the function followed by its arguments, as in the previous example. Only positional arguments can be passed to the function you want to run concurrently. The start_soon method also accepts an optional keyword argument name to set the task name.
  • By default anyio runs with the asyncio backend. If you want to run it with another backend, look at the comments at the end of the example.
  • Be aware that the main async function can be run directly with the run callables of asyncio or trio, i.e. you can do asyncio.run(main()) (Python 3.7+) or trio.run(main).
  • Last but not least, if you check the output of the program, you will see that it takes approximately 1s to complete. So the tasks really are run concurrently!

Handling exceptions

  • If you look carefully at the output of the program, you will notice that it stops as soon as the first error is encountered. But wait a minute… there is no ExceptionGroup raised, just a ValueError! Yeah… unfortunately, it is not simple to get the ExceptionGroup error, and the behaviour changes slightly between backends. For example, if you change the backend to trio, you will notice that sometimes not even one task finishes, and the error raised can be TypeError or ValueError.
  • I think the different backends have divergent behaviours when it comes to handling exceptions, so it is difficult for anyio to be consistent here. Be aware of that.

Timeouts

  • The statement print('This should never be printed') will not be printed because the sleep takes 2s and the timeout is 1s.
  • If you look at the API documentation, you will see that it can take a shield argument to prevent the block from being cancelled by any outside cancel operation. According to the creator of trio (remember that anyio follows its API), it is probably a bad idea for high-level code to use this property.
  • There is also the context manager fail_after which raises a TimeoutError if the timeout expires.

Cancellation

  • The exception raised in response to cancellation differs between backends, so the only way to be sure to catch it is to use the function get_cancelled_exc_class, which returns the relevant exception class.
  • If your cleanup code involves async code, it should be placed in a CancelScope block with the shield attribute set to True. Otherwise the operation will be cancelled immediately, since it is in an already cancelled scope.
  • Don't forget to re-raise the cancellation error; otherwise the program will crash in an unexpected way.

Synchronization primitives

  • In the first example you have a usage of a capacity limiter. Like I said earlier, the usage is similar to that of a semaphore, but you have extra features like the ability to set the number of tokens dynamically, to know how many tokens are in use, etc. Look at the API documentation for more information.
  • In the second example, you have a classic usage of an event, where tasks wait for a signal to continue their execution. Note that unlike in other concurrency libraries, there is no clear method for reusing an event. If you need to reuse an event, you just create a new one.
  • To see other primitive examples, you can look at this section of the documentation.

Managing operating system signals

  • If you are a Windows user, SIGTERM and SIGHUP will probably not work. Sorry about that, but Windows doesn't handle many of the signals that exist on Unix systems. You can test the previous example with the SIGINT signal, which works on Windows: replace lines 7 and 9 with SIGINT and delete lines 12 and 13.
  • As you can see, you just iterate over the signals you want to watch; there is no callback to register like in the standard library. This is really clever and simple to use. 🙃
  • If you want to test the program, open a first terminal window and run the script, then open a second terminal window and find the pid of the program with "ps -ef | grep myscript.py" (replace myscript.py with the real name of your program). The pid is the first number on the relevant line. After that, use the kill command with the signal you want to send: for SIGTERM it will be "kill -TERM pid" and for SIGHUP it will be "kill -HUP pid".
  • Again, Windows users, the previous comment will not work for you; the only thing you can do is Ctrl+C to see that the signal is handled.

TCP client and server

  • To test this, you need to run the client script in a terminal window and the server script in another window. Start with the server.
  • The client script is self descriptive. You pass the host and port to connect_tcp and you have a SocketStream object with a clean interface to send and receive messages.
  • For receiving messages you can use the syntax "async for message in client", and the client will keep iterating as long as there are messages to receive; otherwise it will leave the loop.
  • The server script is also easy to follow. You create a listener passing the port and you call the serve method on this listener with a callable. The callable takes the peer SocketStream object as its only argument.
  • On line 7 of the server script, you probably noticed the usage of the extra method. Anyio has a system of typed attributes to easily retrieve socket or listener attributes, especially when they are wrapped in another socket or listener. For more information, you can look at this section of the anyio documentation.

TLS client and server

openssl req -x509 -newkey rsa:2048 -subj '/CN=localhost' -keyout key.pem -out cert.pem -nodes -days 365
  • For the client script, as noted in the comments, you can omit lines 11 and 12 if you are connecting to well-known services whose certificates are signed by a recognized authority. In that case, instead of the ssl_context argument, you can just pass the tls argument with the value True.
  • The server script is again easy to understand. We create an ssl context, passing the certificate and private key. After that we create a tcp_listener like in the previous example, but this time we wrap it in a TLSListener, which takes the tcp_listener and the ssl context at creation. The rest is like the previous example.

Unix client and server

  • I don't think there is much to say; the interface is pretty much the same, except that the connect_unix and create_unix_listener functions take a file path as argument instead of a host and port.

UDP client and server

  • For the client script, the interface for sending and receiving messages is the same as before. The udp object used here is a ConnectedUDPSocket object.
  • For the server script, the udp object used is a UDPSocket one. To send a message we use the sendto method, and we have to pass the message, the client host and the client port. Yeah… a bit of boilerplate, but it is due to the nature of udp sockets.

Bonus: a dual server

  • There are three tasks spawned in the main async function. The first watches for signals to stop the program if needed, the second creates a udp server and the third creates a tcp server. This should be familiar to you by now.
  • However, note that for the creation of the servers, I use the "reuse_port" option, which is a socket option allowing multiple sockets to bind to the same address/port. This is part of the magic that allows handling both tcp and udp on port 1234 like in the previous example. Unfortunately this option is not available on Windows, so you cannot run the program on that platform.
  • To test this example you can reuse the clients shown in the previous tcp and udp sections.

Communication between tasks

  • On line 17, we create a memory channel. The default size of the channel is 0, which is a good default to prevent bugs related to back pressure. If you want an infinite size, just pass the value math.inf, but this is rarely a good idea. For more literature on this, I can only recommend reading this section of trio's documentation.
  • In the producer and consumer functions, you will notice that we start with the syntax "async with…". This is the correct way to ensure proper resource cleanup.
  • If you prefer the traditional approach with a queue, you can combine the sender and receiver of the previous example into a single object using the StapledStream class. If you do that, create the memory channel with a minimum size of 1; otherwise, you will have surprises.

Managing multiple producers and/or multiple consumers

  • Even if we clone the channels, a value is sent only to a unique consumer.
  • The original channel objects created at line 21 will not be closed until all clones are closed.

Working with threads

Dealing with files

  • If you want to test it, replace “path/to/file” with a real path.
  • The interface of the file object returned by open_file is similar to the classical one you know; just add "await" before calling its io methods like read or write. You can read the API documentation here.

Using subprocesses

  • The second example is quite simple: we read the content of the current directory. Windows users can replace this command with dir.
  • The first example is a more advanced use case where you need more control over the process being run. You can check the interface of the Process object in the API documentation. Windows users can replace ps with tasklist.

Running CPU intensive computation

  • For now, you need to use the syntax "from anyio import to_process" to use this utility function.
  • You need to run the code inside the "if __name__ .." block due to some limitations of the anyio implementation. More details can be found here.

--

Kevin Tewouda

Cameroonian deserter now living in France. Passionate about programming, sport, cinema and manga. I write in French and English owing to my origins.
