Archive

Posts Tagged ‘threads’

Flask: cannot fetch a URL on localhost

November 2, 2013

Problem
I had a simple Flask application that included a web service, i.e. calling an address returned some value (a JSON result, for instance). I wanted to reuse this service inside the app by simply calling it (via HTTP) and reading the return value. However, this call never finished: the browser kept loading and I got no result.

What happened?
I posted the problem here and it turned out that “the development server is single threaded, so when you call a url served by that application from within the application itself, you create a deadlock situation.” Hmm…

My first idea was to replace the development server with a more serious one. With gunicorn I could make it work:

gunicorn -w 4 -b 127.0.0.1:5000 hello:app

However, I deploy the app on Heroku, where the free tier gives you just 1 worker, so it behaves just like the development server!

Solution
I had to rewrite the code to eliminate this extra call. (Or, I could have kept this call if I had had at least 2 worker threads.)

Example
Here is a simplified code that demonstrates the problem:

#!/usr/bin/env python

# hello.py

from flask import Flask
from flask import url_for
import requests

app = Flask(__name__)

@app.route('/')
def hello_world():
    return "Hello, World!"

@app.route('/get')
def get():
    url = url_for("hello_world", _external=True)    # full URL
    print '!!!', url    # debug info
    r = requests.get(url)    # it hangs at this point
    return "from get: " + r.text

if __name__ == "__main__":
    app.run(debug=True)
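For completeness, here is a sketch of how the rewrite could look (my assumption, not the original fix; the file name hello_fixed.py is made up). Instead of a real HTTP round-trip to our own server, it uses Flask's built-in test client, which calls the view in-process, so no second worker is needed and no deadlock can occur:

```python
#!/usr/bin/env python

# hello_fixed.py -- a sketch: reuse the service without an HTTP round-trip

from flask import Flask

app = Flask(__name__)

@app.route('/')
def hello_world():
    return "Hello, World!"

@app.route('/get')
def get():
    # in-process call via the test client: no socket, no extra worker needed
    with app.test_client() as client:
        r = client.get('/')
    return "from get: " + r.get_data(as_text=True)

if __name__ == "__main__":
    app.run(debug=True)
```
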

Python 3 is slower than Python 2?

August 15, 2013

Recently I was playing with concurrent.futures. Following a comment on reddit, I got to the presentation of David Beazley entitled Understanding the Python GIL.

It’s a very interesting talk, and from it I learned that Python 3.2 got a new GIL implementation! Out of curiosity I compared the performance of Python 2.7 and 3.3. The test machine had 4 cores. I made a CPU-bound test script with three variations: (1) a basic, single-threaded version, (2) one using 4 threads, and (3) one using 4 processes.

The results were surprising for me because Python 2.7 turned out to be faster!

(Legends: Py2 = Python 2.7.4, Py3 = Python 3.3.1)

basic.py:
Py2: 5.32 sec, Py3: 9.66 sec

with_threads.py:
Py2: 13.41 sec, Py3: 17.32 sec

with_processes.py:
Py2: 1.28 sec, Py3: 2.27 sec

You can also try the scripts; they are available here.
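For reference, here is a minimal sketch of the kind of CPU-bound test used here (my reconstruction in modern Python 3, not the original scripts): a busy countdown loop in the spirit of Beazley's GIL talk, run once directly and once split across 4 threads.

```python
#!/usr/bin/env python3

# sketch of a CPU-bound benchmark: a busy countdown loop
import time
from threading import Thread

N = 2_000_000

def count(n):
    while n > 0:
        n -= 1
    return n

if __name__ == "__main__":
    # (1) single-threaded
    start = time.time()
    count(N)
    print("basic:  ", time.time() - start)

    # (2) the same total work split across 4 threads -- on CPython this is
    # usually no faster (often slower), because the GIL lets only one
    # thread execute Python bytecode at a time
    start = time.time()
    threads = [Thread(target=count, args=(N // 4,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("threads:", time.time() - start)
```
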


Download files with threads easily

August 13, 2013

Problem
You have a file with a list of URLs that you want to download. You already know the wget trick:

wget -i down.txt

However, if you want to fetch a lot of files, it can be slow.

Solution
Well, let’s launch wget instances in parallel and fetch those files quickly. With concurrent.futures, it’s just a few lines:

#!/usr/bin/env python

import os
import concurrent.futures
from threading import Lock

lock = Lock()
INPUT = "down.txt"
THREADS = 10

def download(url):
    cmd = "wget -q {url}".format(url=url)
    with lock:
        print cmd
    os.system(cmd)

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as ex:
        with open(INPUT) as f:
            for line in f:
                line = line.rstrip("\n")
                ex.submit(download, line)

##########

if __name__ == "__main__":
    main()

Thanks to defnull on reddit, who directed me towards concurrent.futures.
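One caveat with the script above: os.system passes the URL through a shell, so a URL containing characters like & or ; would break (or worse). A safer variation (my addition, not part of the original post; it still assumes wget is installed) hands an argument list to subprocess, bypassing the shell entirely:

```python
#!/usr/bin/env python3

# sketch: a shell-free variant of download()
import subprocess

def build_cmd(url):
    # an argument list instead of a command string: no shell is involved,
    # so special characters in the URL need no quoting
    return ["wget", "-q", url]

def download(url):
    subprocess.call(build_cmd(url))
```
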

Download PyCon US 2012 videos in a multithreaded way

August 13, 2013

Note: I wrote this script some time ago when I didn’t know about concurrent.futures. This script does the job nicely but it’s a bit verbose. With concurrent.futures it would be just a few lines.

With the following script you can download all the videos of PyCon US 2012:

#!/usr/bin/env python

"""
Download PyCon US 2012 videos in a multithreaded way.
Requirement: youtube-dl script.

written by Jabba Laci, 2013 (jabba.laci@gmail.com)
https://pythonadventures.wordpress.com/

PyCon US 2012 videos: http://pyvideo.org/category/17
youtube-dl: http://rg3.github.io/youtube-dl/

Usage:
======
1) download youtube-dl and put it somewhere in the PATH
2) create a subdirectory called "download"
3) launch this script

Total size of the videos is about 27.5 GB.
The download process can take several hours.
You can interrupt the downloading with "killall python".
If you re-launch the script, the downloading will resume.

Tested under Linux with Python 2.7.
"""

import os
from Queue import Queue
from threading import Thread, Lock

TO_DIR = "download"
THREADS = 10

lock = Lock()
q = Queue()
threads = []

DATA = ["AeQxx4zXd5Q", "O8WXXtDUUOE", "ktLyuWoRHH8", "tKTW8Jd0BlQ", "A3Qe5wUbXzM",
"ZwBiQEHS4T8", "Rmg4-Ae1P1o", "9XlPKEessD8", "MIAKOMzRl1I", "q_i3CHNITQ4",
"3CSxYKbxfPU", "4bWC_VXffq4", "v7HH_CNIdXc", "ziz2lh-14i8", "dhUo_lpD7v0",
"WMUXMqYhQ-M", "qLXllxd4Z1c", "3FcAcE3Zq2Q", "U1Y5Uxn2Rcw", "x-JDra36m38",
"Me9SZohibPQ", "KUOoStyV7Zs", "Qh4Gkkgi1Mw", "Hx6VxszpvsY", "CFt6QrzavH0",
"AMMBYLB3qd0", "fVpvd7OX6PQ", "OceCWIqZt7I", "VuFW0PkNS74", "5jRLjGWWaHs",
"_CPNLY_Gf7s", "67l4czkKsz8", "FCiA6e44aOI", "uUEwEMMCZhE", "cY7pE7vX6MU",
"vP6j7VDpPrI", "QrITN6GZDu4", "euh9ZQi339o", "EBRMq2Ioxsc", "3BYN3ouwkRA",
"tCUdeLIj4hE", "Wk8zAr0R9zQ", "NUQMr5R3dlk", "twQKAoq2OPE", "dJJDndQrsSw",
"Q0Q9K93bK-4", "5YQrFiWa50M", "VMIj6eB9baY", "KOfB5WQb39g", "M5IPlMe83yI",
"2gha47uSk5c", "lJL2asANiyM", "YHXX3KuB23Q", "LddeJ06JoXE", "gpKMwPoldak",
"BoMQqW0lxVE", "NkUTLRZBWLM", "fekA2mRGTTE", "b7R3-_ViNxk", "nhr-YErfW8k",
"WZoeqnsY9AY", "Wh9a0obtQUQ", "ahM4GBZ-6qg", "399c-ycBvo4", "kdZuUIj4lMo",
"E09qigk_hnY", "nvkCqFLtcJI", "NIcijUt-HlE", "l_HBRhcgeuQ", "dX3DRdFKW_E",
"y_cXzaymXm0", "RBOScqRGHZA", "QPgqfnKG_T4", "fWONoZvTi80", "sgHbC6udIqc",
"1CjX385y3e4", "hnhN2_TpY8g", "GxyfYEe8MiQ", "wslWYg0CTkY", "54XwSUC8klI",
"6wZoBbE-rOo", "Zv26xHYlc8s", "N4zdWLuSbV0", "H841U6RhrDU", "bwwf_HbEJQM",
"qmgh14LUOjQ", "qTwvObrRGdY", "Ycvg0PCQ-sM", "ickNQcNXiS4", "C9K8DOe1zWw",
"47NSfuuuMfs", "3UHE-zD1r_M", "bTXert2uRco", "Bt2HStzaBzE", "z1RQMm37Xmw",
"LnVkLXRIbIg", "P5ad6NpjR3M", "hyzPYaAmVOc", "tYW52SLy_w0", "JOXwclgvXB0",
"188mXjwdkak", "9G6-GksU7Ko", "TmuEDxX1FDQ", "jXlR0Icvvh8", "vfYul2E56fo",
"cSbD5SKwak0", "bGWytn-Ff9E", "hvPYuqzTPIk", "RAxiiRPHS9k", "Mv3xgBQJPaE",
"jOu0D9ttCFI", "4-TwdBuTR1A", "yflKOoAohEk", "ANhTacigaf8", "vfPtGsSJldg",
"YdnBK5yO4zU", "26wgEsg9Mcc", "R9ITLdmfdLI", "KUpIFhNW89A", "OBbvj0WWT-g",
"9q8LTZSvpr8", "qbYYamU42Sw", "-Mx1JVTFOBY", "AZDWveIdqjY", "__s45TTXxps",
"QGfxLXoMpPk", "3dMq_3UUPxg", "9LVqBQcFmyw", "Adr_QuDZxuM", "YyEReiAYGlU",
"G-lGCC4KKok", "1VZfL9JVgFg", "n6145JSeqWc", "XGF3Qu4dUqk", "Xu5EhKVZdV8",
"o9pEzgHorH0", "miGolgp9xq8", "Xk6gQ6s2QjU", "tYk4_Nzl-Gg", "sdkAXM36C7M",
"L-fXOoxrt0M", "Iw9-GckD-gQ", "xHqlzuPq_qQ", "duc3jYgAaR0", "Zd5dfooZWG4",
"g0CankXpFZg", "ULdDuwf48kM", "P7SVi0YTIuE", "Pi9NpxAvYSs", "qgGqaBAEy3Q",
"bobeo5kFz1g", "w26x-z-BdWQ", "t_ziKY1ayCo", "Bs6-sai1fKE", "oZw8m_lyhvo",
"hp5ymCrD9yw", "2G5YTlheCbw", "SULKL7TMRsU", "Thd8yoBou7k", "52wxGESwQSA",
"NBSosX8xiRk"]


def read_urls():
    global q
    #
    for yid in DATA:
        q.put("https://www.youtube.com/watch?v={yid}".format(yid=yid))


class DownLoadThread(Thread):
    def __init__(self, thread_id):
        super(DownLoadThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        from Queue import Empty    # raised by get_nowait() on an empty queue
        global q
        #
        while True:
            try:
                # testing q.empty() and then calling a blocking q.get() would
                # be racy: another thread can take the last item in between,
                # leaving get() blocked forever; get_nowait() avoids that
                url = q.get_nowait()
            except Empty:
                break
            cmd = "youtube-dl {url} -t -c 1>/dev/null".format(url=url)
            with lock:
                print "{tid}: START {cmd}".format(tid=self.thread_id, cmd=cmd)
                print "# queue size:", q.qsize()
            os.system(cmd)
            with lock:
                print "{tid}: STOP {cmd}".format(tid=self.thread_id, cmd=cmd)


def main():
    global threads
    #
    read_urls()
    #
    os.chdir(TO_DIR)
    #
    for i in xrange(THREADS):
        t = DownLoadThread(i)
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    print "# END"

##########

if __name__ == "__main__":
    main()
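As the note above says, with concurrent.futures the same job shrinks to a few lines. A sketch of that condensed version (my rewrite, keeping the same youtube-dl command; only the first few video IDs are shown):

```python
#!/usr/bin/env python3

# sketch: the same downloader condensed with concurrent.futures
import os
import concurrent.futures

THREADS = 10
DATA = ["AeQxx4zXd5Q", "O8WXXtDUUOE", "ktLyuWoRHH8"]  # ...the full ID list from above

def yid_to_url(yid):
    return "https://www.youtube.com/watch?v={yid}".format(yid=yid)

def download(yid):
    # same youtube-dl command as in the original script
    os.system("youtube-dl {url} -t -c 1>/dev/null".format(url=yid_to_url(yid)))

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as ex:
        for yid in DATA:
            ex.submit(download, yid)
```
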


concurrent.futures

August 13, 2013

“The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.” (source)

This feature appeared in Python 3.2, but the good news is that it was backported to Python 2 too. You’ll need to install the futures module:

sudo pip install futures

(0) Basic example without concurrency
Let’s take the following basic example:

#!/usr/bin/env python

from Queue import Queue
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    for num in fred:
        f(num)
    #
    while not q.empty():
        print q.get()

if __name__ == "__main__":
    main()

We have a list of numbers and we want to calculate their squares. The results are stored in a queue. In general, function “f()” performs a job that can take a long time (that’s why I added some random waiting). These jobs are executed one after the other, but they are independent of each other: calculating the square of 5 doesn’t rely on the square of 4, for instance, i.e. these jobs could be processed in parallel.

(1) Using ThreadPoolExecutor
Let’s execute the jobs mentioned above in parallel with threads:

#!/usr/bin/env python

from Queue import Queue
import concurrent.futures
import random
import time

q = Queue()
fred = [1,2,3,4,5,6,7,8,9,10]

def f(x):
    if random.randint(0,1):
        time.sleep(0.1)
    #
    res = x * x
    q.put(res)

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    #
    while not q.empty():
        print q.get()

####################

if __name__ == "__main__":
    main()

Here we have a pool (a “list”) to which the jobs are added (see the for loop). We have 4 threads working on this pool: each thread takes a job from the pool, executes it, and when the job is done, it takes the next job-to-be-processed from the pool. When the 4 workers have processed all the jobs in the pool, execution continues after the “with” block. The “with” statement guarantees that execution waits until all worker threads finish. By the time we reach the “while” loop, all jobs have been processed and all the worker threads have finished.

The results are stored in a Queue because it is thread-safe. “The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.”
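As a side note (my variation, not part of the quoted docs): the executor itself can carry the results back, so the queue can be dropped entirely. submit() returns a Future whose .result() is the function's return value, or, even shorter, executor.map() collects the return values in input order:

```python
#!/usr/bin/env python3

import concurrent.futures
import random
import time

fred = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def f(x):
    if random.randint(0, 1):
        time.sleep(0.1)
    return x * x          # return the result instead of pushing to a queue

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # map() preserves the input order even though execution is parallel
        return list(executor.map(f, fred))

if __name__ == "__main__":
    print(main())    # → [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```
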

(2) Using ProcessPoolExecutor
“The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.”

So, instead of threads we will use processes here.

#!/usr/bin/env python

import sys
import redis
import concurrent.futures

r = redis.Redis()
fred = [1,2,3,4,5,6,7,8,9,10]

def check_server():
    try:
        r.info()
    except redis.exceptions.ConnectionError:
        print >>sys.stderr, "Error: cannot connect to redis server. Is the server running?"
        sys.exit(1)

def f(x):
    res = x * x
    r.rpush("test", res)

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    #
    print r.lrange("test", 0, -1)

####################

if __name__ == "__main__":
    check_server()
    ###
    r.delete("test")
    main()

It’s basically the same; ThreadPoolExecutor was simply replaced with ProcessPoolExecutor.

Again, we want to store the results in a “list”. However, Queue.Queue is not a good choice here because we are using processes, and that Queue class is made for threads. I decided to store the results in a redis list. For more information about redis, read this post of mine: redis: getting started. In redis all operations are atomic, so different processes can safely write their results into it.
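Another option (my variation, requiring no external store at all): since return values of submitted functions are pickled in the workers and sent back to the parent, a ProcessPoolExecutor can simply return the squares directly, just like in the threaded case:

```python
#!/usr/bin/env python3

import concurrent.futures

fred = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def f(x):
    return x * x

def main():
    # results are pickled in the worker processes and sent back to the parent
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(f, fred))

if __name__ == "__main__":
    print(main())
```
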

When using processes, you might get this error:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe

It seems to be a bug; I didn’t have this issue with Python 3. A possible workaround is to submit the elements more slowly, i.e. add “time.sleep(0.01)” after the “executor.submit(…)” line. More info here.

Which solution to use?
Once you have a basic solution without concurrency, it’s quite easy to parallelize the code with concurrent.futures; you just need to add a few extra lines. Threads are good for I/O-bound tasks, while processes are good for CPU-bound tasks.

However, you should run some tests because the results can be surprising. Out of curiosity, I tried the three methods above with a simple prime test. The source codes are available here. I go from 1 to 1000 and test every number for primality. The prime test is very simple, and the whole exercise is CPU-bound.
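A sketch of the kind of naive prime test meant here (my reconstruction, not the original source code):

```python
#!/usr/bin/env python3

def is_prime(n):
    # naive trial division -- deliberately simple, purely CPU-bound work
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    primes = [n for n in range(1, 1000) if is_prime(n)]
    print(len(primes))    # → 168
```
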

Results:

$ time ./basic.py
real    0m0.026s
$ time ./with_threads.py
real    0m0.138s
$ time ./with_processes.py
real    0m0.255s

That is, the naive approach was the fastest, then came threads, and finally processes.

I also tried to test numbers up to 100000. Basic: 0.3 sec, threads: 11 sec, processes: 17 sec (on a CPU with 4 cores).

I posed a question on reddit concerning multithreading and GIL. I got interesting answers, you can read them here.