I've been using Python for a very long time, possibly since around the 1.2 release, although I am not sure. I'm also relatively slow to adopt new technologies, so I was still using Python 2.7 until relatively recently. I dabbled in Python 3 in 2011, but the Python ecosystem was still firmly entrenched in Python 2. The situation has changed a lot since then, and Python 3 is coming into its own with significant performance improvements, support across nearly all of the most popular libraries, and some new language features.

I've taken the plunge into Python 3 (Python 3.6.1 specifically), and I will likely not start new projects in Python 2. I'm particularly interested in how the new asynchronous framework (asyncio) can help improve the code I write around microservices and websockets. Previously I employed the excellent gevent library to handle websockets under the bottle framework. Python 3.6, however, ships with a built-in coroutine library (asyncio), and event-driven frameworks built upon it, like aiohttp and sanic, could make working with websockets easier and improve their performance. I've done some experimentation with sanic, but first I wanted to delve into a part of Python 3.6 that I found confusing: the relationship between asyncio and concurrent.futures.

The concurrent.futures library allows you to set up a thread or process pool for concurrent paths of execution. It is very similar in design to asyncio in that a function is defined and scheduled to execute. When the function is scheduled, a Future object is returned (note that concurrent.futures.Future objects are similar to, but not compatible with, asyncio.Future objects). The status and return value of the function can then be accessed through the Future object. The difference is that concurrent asyncio routines run in a single thread of execution, typically yielding while they wait for I/O, whereas a concurrent.futures routine runs on a thread or process pool. Of course, I wanted to know how I could mix them so that I could run a computationally intensive function in an asyncio event-driven program.
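As a rough illustration of the schedule-then-query pattern, here is a minimal sketch of how a concurrent.futures.Future exposes the status and return value of a submitted function. The names wait_then_add and demo_future_states are made up for this example, and a thread pool with a threading.Event gate is used only so that the "not done yet" state can be observed reliably.

```python
import concurrent.futures
import threading


def wait_then_add(gate, a, b):
    # Hypothetical worker: blocks until the gate is opened, then returns a sum.
    gate.wait()
    return a + b


def demo_future_states():
    gate = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # submit() schedules the call and immediately returns a Future.
        future = executor.submit(wait_then_add, gate, 2, 3)
        before = future.done()             # False: the worker is still waiting
        gate.set()                         # let the worker finish
        result = future.result(timeout=5)  # blocks until the value is ready
        after = future.done()              # True: the call has completed
    return before, result, after


if __name__ == '__main__':
    print(demo_future_states())  # (False, 5, True)
```

Note that future.result() blocks the calling thread, which is exactly what you do not want inside an asyncio coroutine. That is the incompatibility the rest of this post works around.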

The Computationally Intensive Routine

I will run through several examples involving the following function, which determines whether a number greater than 2 is prime. This code is taken from the concurrent.futures example in the Python documentation.

import math

# Typical computationally intensive function
def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

Now for a baseline, let's just use this routine to check whether each number in a list is prime. We'll run through the numbers synchronously in a single process using the code below, and find out how much time it takes.

import math

# The definition of is_prime has been cut from this file

def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    for i, n in enumerate(prime_candidates):
        prime = is_prime(n)
        if prime:
            print("{}: {} is prime".format(i,n))
        else:
            print("{}: {} is not prime".format(i,n))

if __name__=='__main__':
    main()

That took about 4.2 seconds on my computer. My process viewer showed that a single Python process was started with a single thread of execution.

If we use a process pool and the concurrent.futures library, we can improve this time quite a bit (provided we have multiple processors). For computationally intensive work in Python, a process pool is preferable to a thread pool because CPython has a global interpreter lock (GIL), which allows only one thread to execute Python bytecode at a time. Some, including me, consider this to be generally a good thing. Below is code which dispatches this function to be calculated on a process pool. Note that I have created a get_result function which returns both the number I sent and whether it is prime. I did this because I am going to be receiving answers in the order in which they are completed, and it is convenient to get back both the number and the boolean indicating whether it is prime.

import math
import concurrent.futures

# The definition of is_prime has been cut from this file

# Return the number as well, so we can keep track of which answer is which
def get_result(n):
    prime = is_prime(n)
    return n, prime

def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # create the process pool
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit jobs to the pool. These are concurrent.futures.Future objects.
        futures = [executor.submit(get_result, n) for n in prime_candidates]

        # As futures are completed they are returned and the result can be obtained
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            n, prime = future.result()
            if prime:
                print("{}: {} is prime".format(i,n))
            else:
                print("{}: {} is not prime".format(i,n))

if __name__=='__main__':
    main()

This version of the code finished in 1.1 seconds. I noticed that it started 8 processes on my computer, which is equal to the number of cores in my CPU.

A Naive Approach

Without understanding the difference between asyncio and concurrent.futures, you may be inclined to just modify the above code into something that uses the new async and await keywords, and uses the asyncio event loop. That would look something like the code below.

import math
import asyncio

# DO NOT USE THIS. IT IS AN EXAMPLE OF THE WRONG WAY TO DO IT

# The definition of is_prime has been cut from this file

# Return the number as well, so we can keep track of which answer is which
async def get_result(n):
    prime = is_prime(n)
    return n, prime

# Scheduling the run in the asyncio event loop
async def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # Calling a coroutine function returns a coroutine object; nothing runs yet.
    futures = [get_result(n) for n in prime_candidates]

    # As futures are completed they are returned and the result can be obtained
    for i, future in enumerate(asyncio.as_completed(futures)):
        n, prime = await future
        if prime:
            print("{}: {} is prime".format(i,n))
        else:
            print("{}: {} is not prime".format(i,n))

if __name__=='__main__':
    # This creates the event loop and runs main in the loop until main returns.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

When I timed that implementation, it ran in 4.0 seconds, using only one thread in one process. Although the coroutines are scheduled concurrently, none of them ever awaits I/O, so each one runs to completion before the event loop can start the next. Concurrency doesn't save any time here because there is no pool of processes running on separate CPUs or CPU cores to do the work in parallel.
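The effect is easier to see with a second coroutine competing for the loop. The sketch below is not part of the timed examples: busy_work, blocking_job, ticker, and run_demo are hypothetical names, and a short busy-wait stands in for is_prime. It uses asyncio.new_event_loop() rather than the default loop so that it does not depend on any loop state outside the function.

```python
import asyncio
import time


def busy_work(seconds):
    # Stand-in for CPU-bound work: spins without ever yielding to the loop.
    end = time.monotonic() + seconds
    while time.monotonic() < end:
        pass


async def blocking_job(log):
    log.append("job start")
    busy_work(0.2)  # no await here, so the event loop cannot switch tasks
    log.append("job end")


async def ticker(log, ticks):
    for i in range(ticks):
        log.append("tick {}".format(i))
        await asyncio.sleep(0.01)  # yields control back to the event loop


async def demo():
    log = []
    await asyncio.gather(blocking_job(log), ticker(log, 3))
    return log


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()


if __name__ == '__main__':
    # ['job start', 'job end', 'tick 0', 'tick 1', 'tick 2']
    print(run_demo())
```

The ticker does not get to log anything until the blocking job has finished, even though both were handed to the loop at the same time.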

A Proper Mix of Futures

So, can one mix the benefits of asyncio event loops with concurrent.futures pools? Yes. It is possible to let your event loop know that it can move on while a coroutine waits for a function to finish on a process pool, and the method that lets us bridge this divide is asyncio.AbstractEventLoop.run_in_executor. You can think of this method as creating an asyncio.Future wrapper around a concurrent.futures.Future object. You can then use await on the return value of run_in_executor to have your coroutine wait for the response from the underlying process pool.
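To make the "wrapper" mental model concrete, here is a small sketch that does the wrapping by hand with asyncio.wrap_future and compares it with run_in_executor. This illustrates the idea and is not a claim about how run_in_executor is implemented internally. The function names (triple, via_wrap_future, via_run_in_executor, run_demo) are invented for the example, and a thread pool is used only to keep the sketch small.

```python
import asyncio
import concurrent.futures


def triple(x):
    # Hypothetical stand-in for a function we want to run on a pool.
    return 3 * x


async def via_wrap_future(executor, x):
    loop = asyncio.get_event_loop()
    cf_future = executor.submit(triple, x)  # a concurrent.futures.Future
    # Wrap it in an asyncio.Future so that it can be awaited.
    aio_future = asyncio.wrap_future(cf_future, loop=loop)
    return await aio_future


async def via_run_in_executor(executor, x):
    loop = asyncio.get_event_loop()
    # Submits the call and returns an awaitable in a single step.
    return await loop.run_in_executor(executor, triple, x)


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            a = loop.run_until_complete(via_wrap_future(executor, 14))
            b = loop.run_until_complete(via_run_in_executor(executor, 14))
    finally:
        loop.close()
    return a, b


if __name__ == '__main__':
    print(run_demo())  # (42, 42)
```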

The example below implements this for the is_prime function. To get the current event loop we use the asyncio.get_event_loop() function, and then use that loop's run_in_executor method.

import math
import concurrent.futures
import asyncio

# The definition of is_prime has been cut from this file

# Wrapping coroutine which waits for the result from the process pool.
async def get_result(executor, n):
    loop = asyncio.get_event_loop()
    prime = await loop.run_in_executor(executor, is_prime, n)
    return n, prime

# Scheduling the run in the asyncio event loop
async def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # create the process pool
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Calling the coroutine function returns coroutine objects; they have not started yet.
        futures = [get_result(executor, n) for n in prime_candidates]

        # As futures are completed they are returned and the result can be obtained
        for i, future in enumerate(asyncio.as_completed(futures)):
            n, prime = await future
            if prime:
                print("{}: {} is prime".format(i,n))
            else:
                print("{}: {} is not prime".format(i,n))

if __name__=='__main__':
    # This creates the event loop and runs main in the loop until main returns.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

This program runs in 1.2 seconds on my computer, and uses 8 processes in the process pool. In this case using asyncio is unnecessary, but if we were running an event-driven web server and had a computationally intensive function to call, this would be the pattern to follow to ensure that the web server didn't stop serving new events.
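As a rough sketch of that server scenario, the code below runs a "heartbeat" coroutine (playing the role of the other events a server must keep handling) alongside a coroutine that hands slow work to an executor. All of the names are hypothetical, and time.sleep stands in for the heavy computation. A ThreadPoolExecutor is used so the sketch stays small; for genuinely CPU-bound work you would pass a ProcessPoolExecutor instead, as in the example above.

```python
import asyncio
import concurrent.futures
import time


def slow_square(x):
    # Stand-in for a computationally intensive function (hypothetical).
    time.sleep(0.2)
    return x * x


async def heartbeat(log, done):
    # Represents the server's other events; it should keep ticking.
    while not done.is_set():
        log.append("tick")
        await asyncio.sleep(0.02)


async def compute(executor, log, done, x):
    loop = asyncio.get_event_loop()
    # The coroutine is suspended here, so the loop is free to run heartbeat().
    result = await loop.run_in_executor(executor, slow_square, x)
    log.append("result {}".format(result))
    done.set()
    return result


async def serve(executor):
    log = []
    done = asyncio.Event()
    _, result = await asyncio.gather(heartbeat(log, done),
                                     compute(executor, log, done, 7))
    return result, log


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            return loop.run_until_complete(serve(executor))
    finally:
        loop.close()


if __name__ == '__main__':
    result, log = run_demo()
    print(result, log.count("tick"))
```

The heartbeat logs several ticks while slow_square is still running, which is the behaviour you want from a server that must stay responsive during a long calculation.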
