I've been using Python for a very long time, possibly since around the 1.2 release, although I am not sure. I'm also relatively slow to adopt new technologies. As a result, I was using Python 2.7 until relatively recently. I dabbled in Python 3 in 2011, but the Python ecosystem was still firmly entrenched in the Python 2 branch. The situation has changed a lot since then, and Python 3 is coming into its own with significant performance improvements, support across nearly all of the most popular libraries, and some new language features.
I've taken the plunge into Python 3 (Python 3.6.1 specifically) and I will likely not start new projects in Python 2. I'm particularly interested in how the new asynchronous framework (asyncio) can help improve the code I write around microservices and websockets. Previously I employed the excellent gevent library to handle websockets under the bottle framework; however, Python 3.6 comes with an internal coroutine library (asyncio) and event-driven frameworks built upon it, like aiohttp and sanic, which could make working with websockets easier and improve their performance. I've done some experimentation with sanic, but first I wanted to delve into a part of Python 3.6 which I found confusing: the relationship between asyncio and concurrent.futures.
The concurrent.futures library allows you to set up a thread or process pool for concurrent paths of execution. It is very similar in design to asyncio in that a function is defined and scheduled to execute. During the scheduling, a Future object is returned (note that concurrent.futures.Future objects are similar to, but not compatible with, asyncio.Future objects). The status and return value of the function can then be accessed through the Future object. The difference is that concurrent asyncio routines run in a single thread of execution, typically yielding while waiting on I/O, whereas concurrent.futures routines run on a thread or process pool. Of course I wanted to know how I could mix them, so that I could run a computationally intensive routine inside an asyncio event-driven program.
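As an aside, the incompatibility between the two Future types can be seen directly. Below is a minimal sketch (my own, not from the examples that follow) using asyncio.wrap_future, the standard-library bridge from a concurrent.futures.Future to an awaitable asyncio.Future:

```python
import asyncio
import concurrent.futures

# The two Future classes live in different modules and are not
# interchangeable; a concurrent.futures.Future is not an asyncio.Future.
cf_future = concurrent.futures.Future()
print(isinstance(cf_future, asyncio.Future))  # False

async def main():
    # asyncio.wrap_future chains the concurrent.futures.Future to a new
    # asyncio.Future, which can then be awaited on the event loop.
    wrapped = asyncio.wrap_future(cf_future)
    cf_future.set_result(42)
    return await wrapped

loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
print(result)  # 42
loop.close()
```

This wrapping is essentially what the run_in_executor pattern discussed later does for us automatically.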
The Computationally Intensive Routine
I will run through several examples involving the following function, which determines whether a number greater than 2 is prime. This code is taken from the concurrent.futures example in the Python documentation.
import math

# Typical computationally intensive function
def is_prime(n):
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
Now for a baseline, let's just use this routine to check which numbers in a list are prime. We'll run through the numbers synchronously in a single process using the code below, and find out how much time it takes.
import math

# The definition of is_prime has been cut from this file

def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    for i, n in enumerate(prime_candidates):
        prime = is_prime(n)
        if prime:
            print("{}: {} is prime".format(i, n))
        else:
            print("{}: {} is not prime".format(i, n))

if __name__ == '__main__':
    main()
I found that this took about 4.2 seconds on my computer. My process viewer showed that a single Python process was started, with a single thread of execution.
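For completeness, the timings quoted throughout are simple wall-clock measurements. A sketch of how you might take them with time.perf_counter (the candidate list is shortened here, and your numbers will of course differ from mine):

```python
import math
import time

def is_prime(n):
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

start = time.perf_counter()
# A shortened candidate list keeps this quick to run
results = [is_prime(n) for n in (112272535095293, 115280095190773, 4)]
elapsed = time.perf_counter() - start
print(results)  # [True, True, False]
print("{:.2f}s".format(elapsed))
```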
If we use a process pool via the concurrent.futures library, we can improve this time quite a bit (provided we have multiple processors). A process pool is preferable to a thread pool for computationally intensive work in Python because Python has a global interpreter lock, which only allows one thread to execute at a time. Some, including me, consider this to be generally a good thing. Below is code which dispatches this function to a process pool. Note that I have created a get_result function which returns both the number I sent and whether it is prime. I did this because I will be receiving answers in the order in which they complete, and it is convenient to get back both the number and the boolean indicating whether it is prime.
import math
import concurrent.futures

# The definition of is_prime has been cut from this file

# Return the number as well, so we can keep track of which answer is which
def get_result(n):
    prime = is_prime(n)
    return n, prime

def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # create the process pool
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit jobs to the pool. These are concurrent.futures.Future objects.
        futures = [executor.submit(get_result, n) for n in prime_candidates]
        # As futures are completed they are returned and the result can be obtained
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            n, prime = future.result()
            if prime:
                print("{}: {} is prime".format(i, n))
            else:
                print("{}: {} is not prime".format(i, n))

if __name__ == '__main__':
    main()
This version of the code returned in 1.1 seconds. I noticed that it started 8 processes on my computer, which is equal to the number of cores in my CPU.
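That pool size is not a coincidence: when ProcessPoolExecutor is constructed without a max_workers argument, it defaults to os.cpu_count() workers. A small sketch (the pow call is just a stand-in for real work):

```python
import os
import concurrent.futures

# With no max_workers argument, ProcessPoolExecutor sizes its pool
# to the number of CPUs reported by os.cpu_count()
print(os.cpu_count())

# The pool size can also be set explicitly
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
    # Built-in functions like pow are picklable, so they can be
    # dispatched to worker processes
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024
```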
A Naive Approach
Without understanding the difference between asyncio and concurrent.futures, you may be inclined to simply modify the above code into something that uses the new async and await keywords and the asyncio event loop. That would look something like the code below.
import math
import asyncio

# DO NOT USE THIS. IT IS AN EXAMPLE OF THE WRONG WAY TO DO IT

# The definition of is_prime has been cut from this file

# Return the number as well, so we can keep track of which answer is which
async def get_result(n):
    prime = is_prime(n)
    return n, prime

# Scheduling the run in the asyncio event loop
async def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # Calling the coroutine functions returns coroutine objects,
    # which asyncio.as_completed schedules on the event loop.
    coroutines = [get_result(n) for n in prime_candidates]
    # As the coroutines complete, their results can be awaited
    for i, future in enumerate(asyncio.as_completed(coroutines)):
        n, prime = await future
        if prime:
            print("{}: {} is prime".format(i, n))
        else:
            print("{}: {} is not prime".format(i, n))

if __name__ == '__main__':
    # This creates the event loop and runs main in the loop until main returns.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
When I timed that implementation, it ran in 4.0 seconds, in only one thread in one process. Although we allow the coroutines to run concurrently, there is no waiting for I/O here, so each one runs to completion sequentially before all the results are ready. In this case concurrency isn't saving us any time, because we don't have a pool of processes running on separate CPUs or CPU cores.
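This sequential behavior is easy to demonstrate: a coroutine that never awaits anything runs start to finish before the event loop can switch to another. A minimal sketch (the task names and the order list are mine, added for illustration):

```python
import asyncio

order = []

async def task(name):
    # With no await inside, this coroutine runs to completion before
    # the event loop gets a chance to run any other task.
    order.append(("start", name))
    order.append(("end", name))

async def main():
    await asyncio.gather(task("a"), task("b"))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print(order)
# [('start', 'a'), ('end', 'a'), ('start', 'b'), ('end', 'b')]
loop.close()
```

If the tasks were interleaving, the two "start" entries would appear before either "end"; instead each task runs as an uninterrupted block.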
A Proper Mix of Futures
So, can one mix the benefits of asyncio event loops with concurrent.futures pools? The answer is yes. It is possible to let your event loop know that it can move on while a coroutine waits for a process to finish on a process pool, and the function that lets us bridge this divide is asyncio.AbstractEventLoop.run_in_executor. You can think of this function as creating an asyncio.Future wrapper around a concurrent.futures.Future object. You can then use await on the return value of run_in_executor to have your coroutine wait for the response from the underlying process pool.
The example below implements this for the is_prime function. To get the current event loop we use the asyncio.get_event_loop() function, and then use that loop's run_in_executor method.
import math
import concurrent.futures
import asyncio

# The definition of is_prime has been cut from this file

# Wrapping coroutine which waits for the return from the process pool
async def get_result(executor, n):
    loop = asyncio.get_event_loop()
    prime = await loop.run_in_executor(executor, is_prime, n)
    return n, prime

# Scheduling the run in the asyncio event loop
async def main():
    prime_candidates = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
        17,
        4]

    # create the process pool
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Calling the coroutine functions returns coroutine objects.
        coroutines = [get_result(executor, n) for n in prime_candidates]
        # As the coroutines complete, their results can be awaited
        for i, future in enumerate(asyncio.as_completed(coroutines)):
            n, prime = await future
            if prime:
                print("{}: {} is prime".format(i, n))
            else:
                print("{}: {} is not prime".format(i, n))

if __name__ == '__main__':
    # This creates the event loop and runs main in the loop until main returns.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
This program runs in 1.2 seconds on my computer, and uses 8 processes in the process pool. In this case using asyncio is unnecessary, but if we were running an event-driven web server and had a computationally intensive routine to perform, this would be the pattern to follow to ensure the web server didn't stop serving new events.
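One closing note on that pattern: run_in_executor also accepts None as the executor, in which case the loop falls back to its default ThreadPoolExecutor. That is often sufficient for blocking I/O calls, though for CPU-bound work like is_prime a process pool remains the better choice because of the global interpreter lock. A sketch (blocking_io is a made-up stand-in for a blocking call):

```python
import asyncio
import time

def blocking_io():
    # Stand-in for a blocking call, such as a file read or a
    # legacy client library without asyncio support
    time.sleep(0.1)
    return "done"

async def main():
    loop = asyncio.get_event_loop()
    # Passing None as the executor uses the loop's default thread pool
    return await loop.run_in_executor(None, blocking_io)

loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
print(result)  # done
loop.close()
```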