“The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.” (source)
This feature appeared in Python 3.2, but the good news is that it was backported to Python 2 as well. You’ll need to install the futures module:
sudo pip install futures
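Once it’s installed, a quick sanity check (no output means the import succeeded):

$ python -c "import concurrent.futures"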
(0) Basic example without concurrency
Let’s take the following basic example:
#!/usr/bin/env python

from Queue import Queue
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    for num in fred:
        f(num)
    #
    while not q.empty():
        print q.get()

if __name__ == "__main__":
    main()
We have a list of numbers and we want to calculate their squares. The results are stored in a queue. In general, the function “f()” performs a job that can take a longer time (that’s why I added some random waiting). These jobs are executed one after the other, but they are independent from each other: calculating the square of 5 doesn’t rely on the square of 4, for instance, i.e. these jobs could be processed in parallel.
(1) Using ThreadPoolExecutor
Let’s execute the jobs mentioned above in parallel with threads:
#!/usr/bin/env python

from Queue import Queue
import concurrent.futures
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    #
    while not q.empty():
        print q.get()

####################

if __name__ == "__main__":
    main()
Here we have a pool (a “list”) to which the jobs are added (see the “for” loop). We have 4 threads working on this pool: each thread takes a job from the pool, executes it, and when it’s done, takes the next unprocessed job. When the 4 workers have processed all the jobs in the pool, execution continues after the “with” block. The “with” statement guarantees that execution waits until all worker threads have finished. By the time we reach the “while” loop, all jobs have been processed and all worker threads have finished.
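By the way, leaving the “with” block is equivalent to calling executor.shutdown(wait=True) yourself. A minimal sketch of the same main(), reusing f(), fred and q from the example above:

def main():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    for num in fred:
        executor.submit(f, num)
    executor.shutdown(wait=True)    # block until every submitted job is done
    #
    while not q.empty():
        print q.get()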
The results are stored in a Queue because it is thread-safe. “The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.”
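Note that a Queue is not the only option: executor.submit() returns a Future object, so the results can also be collected from the futures directly. A minimal sketch of this alternative (my rewrite of the example above), using concurrent.futures.as_completed(), which yields each future as soon as its job finishes:

#!/usr/bin/env python

import concurrent.futures
import random
import time

fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    return x * x    # return the result instead of putting it in a queue

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(f, num) for num in fred]
        # as_completed() yields each future as soon as its job is done
        for future in concurrent.futures.as_completed(futures):
            print future.result()

if __name__ == "__main__":
    main()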
(2) Using ProcessPoolExecutor
“The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.”
So, instead of threads we will use processes here. (One consequence of the pickling requirement: the submitted function must be defined at module level, as f() is below, otherwise it cannot be pickled.)
#!/usr/bin/env python

import sys
import redis
import concurrent.futures

r = redis.Redis()
fred = [1,2,3,4,5,6,7,8,9,10]

def check_server():
    try:
        r.info()
    except redis.exceptions.ConnectionError:
        print >>sys.stderr, "Error: cannot connect to redis server. Is the server running?"
        sys.exit(1)

def f(x):
    res = x * x
    r.rpush("test", res)

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    #
    print r.lrange("test", 0, -1)

####################

if __name__ == "__main__":
    check_server()
    ###
    r.delete("test")
    main()
It’s basically the same; ThreadPoolExecutor was simply replaced with ProcessPoolExecutor. Again, we want to store the results in a “list”. However, Queue is not a good choice here because we are using processes, and Queue is made for threads. I decided to store the results in a redis list. For more information about redis, read this post of mine: redis: getting started. In redis all operations are atomic, thus different processes can safely write the results into it.
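Redis is not the only way, though. Since the return values of worker functions are pickled and sent back to the parent process, you can also skip shared storage entirely and just collect what f() returns. A minimal sketch of this variation, using executor.map():

#!/usr/bin/env python

import concurrent.futures

fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    return x * x    # the return value is pickled and sent back to the parent

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # map() yields the results in the order of the input list
        for res in executor.map(f, fred):
            print res

if __name__ == "__main__":
    main()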
When using processes, you might get this error:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
It seems to be a bug; I didn’t have this issue with Python 3. A possible workaround is to submit the jobs more slowly, i.e. add “time.sleep(0.01)” after the line “executor.submit(…)”. More info here.
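In context, the workaround would look like this (reusing f() and fred from the example above; remember to import time, and treat 0.01 sec as a value that happened to work, not a magic number):

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
            time.sleep(0.01)    # workaround: throttle the submissions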
Which solution to use?
Once you have a basic solution without concurrency, it’s quite easy to parallelize the code with concurrent.futures; you just need to add a few extra lines. As a rule of thumb, threads are good for I/O-bound tasks, while processes are good for CPU-bound tasks.
However, you should run some tests because the results can be surprising. Out of curiosity, I tried the three methods above with a simple prime test. The source codes are available here. I go from 1 to 1000 and test whether each number is prime or not. The prime test is very simple, and the whole exercise is CPU-bound.
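The prime test was along these lines (a sketch of the idea, not necessarily the exact benchmarked source):

def is_prime(n):
    # deliberately naive trial division
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True

def main():
    for n in xrange(1, 1001):
        is_prime(n)

if __name__ == "__main__":
    main()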
$ time ./basic.py
real    0m0.026s

$ time ./with_threads.py
real    0m0.138s

$ time ./with_processes.py
real    0m0.255s
That is, the naive approach was the fastest, followed by threads, and finally processes. I also tried testing numbers up to 100000. Basic: 0.3 sec, threads: 11 sec, processes: 17 sec (on a CPU with 4 cores). This is less surprising than it looks: for CPU-bound work the GIL prevents Python threads from executing bytecode in parallel (they just add switching overhead), and for such tiny jobs the pickling and process-management overhead of ProcessPoolExecutor dwarfs the actual computation.
I posted a question on reddit concerning multithreading and the GIL. I got some interesting answers; you can read them here.