
concurrent.futures

“The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.” (source)

This feature appeared in Python 3.2, but the good news is that it was backported to Python 2 as well. Under Python 2 you’ll need to install the futures module:

sudo pip install futures

(0) Basic example without concurrency
Let’s take the following basic example:

#!/usr/bin/env python

from Queue import Queue
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    for num in fred:
        f(num)
    #
    while not q.empty():
        print q.get()

if __name__ == "__main__":
    main()

We have a list of numbers and we want to calculate their squares. The results are stored in a queue. In general, the function “f()” performs a job that can take a longer time too (that’s why I added some random waiting). These jobs are executed one after the other, but they are independent of each other: calculating the square of 5 doesn’t rely on the square of 4, for instance, i.e. these jobs could be processed in parallel.

(1) Using ThreadPoolExecutor
Let’s execute the jobs mentioned above in parallel with threads:

#!/usr/bin/env python

from Queue import Queue
import concurrent.futures
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    # 4 worker threads process the submitted jobs in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    # the "with" block only exits once every submitted job is done
    while not q.empty():
        print q.get()

####################

if __name__ == "__main__":
    main()

Here we have a pool (a “list” of jobs) into which the jobs are added (see the for loop). We have 4 worker threads working on this pool: each thread takes a job from the pool, executes it, and when the job is done, takes the next job to be processed. When the 4 workers have processed all the jobs in the pool, execution continues after the “with” block. The “with” statement guarantees that execution waits until all worker threads have finished. So when we reach the “while” loop, all jobs have been processed and all the worker threads have finished.
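
By the way, the “with” block is equivalent to calling shutdown() on the executor once all the jobs have been submitted. A minimal sketch of the same main() without “with” (reusing f(), fred and q from above):

def main():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    for num in fred:
        executor.submit(f, num)
    # block until every submitted job has been processed
    # (this is what leaving the "with" block does implicitly)
    executor.shutdown(wait=True)
    #
    while not q.empty():
        print q.get()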

The results are stored in a Queue because it is thread-safe. “The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.”
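
Actually, you don’t strictly need a Queue at all: executor.submit() returns a Future object, and the results can be collected through it. A minimal sketch of this variant, assuming f() simply returns the square instead of putting it in a queue:

#!/usr/bin/env python

import concurrent.futures
import random
import time

fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    return x * x

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # submit() returns a Future; result() gives back the return value of f()
        futures = [executor.submit(f, num) for num in fred]
        for future in concurrent.futures.as_completed(futures):
            print future.result()

if __name__ == "__main__":
    main()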

(2) Using ProcessPoolExecutor
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
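
Since the return values are pickled and sent back to the parent process, the executor itself can also collect the results for you. A minimal sketch using map(); note that this is not the variant used in the example below:

#!/usr/bin/env python

import concurrent.futures

fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    # must be defined at module level so it can be pickled for the worker processes
    return x * x

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # map() yields the return values in the order of the inputs
        for res in executor.map(f, fred):
            print res

if __name__ == "__main__":
    main()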

So, instead of threads we will use processes here.

#!/usr/bin/env python

import sys
import redis
import concurrent.futures

r = redis.Redis()
fred = [1,2,3,4,5,6,7,8,9,10]

def check_server():
    try:
        r.info()
    except redis.exceptions.ConnectionError:
        print >>sys.stderr, "Error: cannot connect to redis server. Is the server running?"
        sys.exit(1)

def f(x):
    res = x * x
    # redis operations are atomic, so worker processes can push results safely
    r.rpush("test", res)

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    # all worker processes have finished here; print the collected results
    print r.lrange("test", 0, -1)

####################

if __name__ == "__main__":
    check_server()
    ###
    r.delete("test")
    main()

It’s basically the same; ThreadPoolExecutor was simply replaced with ProcessPoolExecutor.

Again, we want to store the results in a “list”. However, Queue is not a good choice here, because we are using processes, and Queue is made for threads. I decided to store the results in a redis list instead. For more information about redis, read this post of mine: redis: getting started. In redis all operations are atomic, so different processes can safely write their results into it.
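
If you’d rather not depend on an external service, another option that should work is a multiprocessing.Manager queue: its proxy object is picklable, so it can be passed to the worker processes. A sketch of that alternative (not the approach used in this post):

#!/usr/bin/env python

import multiprocessing
import concurrent.futures

fred = [1,2,3,4,5,6,7,8,9,10]

def f(x, q):
    # the queue proxy is picklable, so it can be sent to the worker processes
    q.put(x * x)

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num, q)
    #
    while not q.empty():
        print q.get()

if __name__ == "__main__":
    main()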

When using processes, you might get this error:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe

It seems to be a bug; I didn’t have this issue with Python 3. A possible workaround is to submit the elements more slowly, i.e. add “time.sleep(0.01)” after the line “executor.submit(…)”. More info here.
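
For reference, the workaround would look something like this (just the idea described above, assuming time is imported; it’s not a guaranteed fix):

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    for num in fred:
        executor.submit(f, num)
        # throttle submission a bit to work around the broken pipe issue
        time.sleep(0.01)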

Which solution to use?
Once you have a basic solution without concurrency, it’s quite easy to parallelize the code with concurrent.futures: you just need to add a few extra lines. Threads are good for I/O-bound tasks, while processes are good for CPU-bound tasks.
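
A typical I/O-bound case is downloading several pages: while one thread is waiting for the network, the others can keep working. A minimal sketch (the URLs and the download() helper are placeholders of my own):

#!/usr/bin/env python

import concurrent.futures
import urllib2

urls = ["http://www.python.org/", "http://www.google.com/"]

def download(url):
    return urllib2.urlopen(url).read()

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # map each Future back to its URL so we can label the results
        futures = dict((executor.submit(download, url), url) for url in urls)
        for future in concurrent.futures.as_completed(futures):
            print futures[future], len(future.result())

if __name__ == "__main__":
    main()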

However, you should run some tests because the results can be surprising. Out of curiosity, I tried the three methods above with a simple prime test. The source codes are available here. I go from 1 to 1000 and test whether each number is a prime or not. The prime test is very simple, and the whole exercise is CPU-bound.
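
The exact sources are linked above; a trial-division prime test of this kind might look something like this (my own reconstruction, not necessarily the code that was benchmarked):

def is_prime(n):
    # naive trial division: pure computation, no I/O at all
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True

def main():
    primes = [n for n in xrange(1, 1001) if is_prime(n)]
    print len(primes)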

Results:

$ time ./basic.py
real    0m0.026s
$ time ./with_threads.py
real    0m0.138s
$ time ./with_processes.py
real    0m0.255s

That is, the naive approach was the fastest, then threads, and finally processes. For a CPU-bound job this shouldn’t be too surprising: because of the GIL, the threads cannot execute Python bytecode in parallel, so the thread pool only adds locking and scheduling overhead; and although processes side-step the GIL, each tiny job has to be pickled and sent to a worker process, which costs more than the job itself.

I also tried testing numbers up to 100000. Basic: 0.3 sec, threads: 11 sec, processes: 17 sec (on a CPU with 4 cores).

I posted a question on reddit concerning multithreading and the GIL. I got interesting answers; you can read them here.

Comments

1. Ravit Ben Shlush (January 22, 2015 at 21:18):

   Hi, I installed futures the way you suggested (sudo pip install futures) and I still get “ImportError: No module named concurrent.futures”.

   Reply (January 22, 2015 at 22:48):

   Try it with Python 3 if you can. Python 3 contains this module in its stdlib.

2. (June 2, 2015 at 22:31):

   You don’t need to install Python 3, you just do a “# pip install futures” and that’s it.
