Requests: Requests seems to get stuck when used with futures.ThreadPoolExecutor

Created on 23 Jun 2015 · 24 Comments · Source: psf/requests

Hello,

I'm using Python 2.7.9 with futures (3.0.3) and requests (2.7.0) on Debian (also tested on Win8 and results are same).

The problem is that Requests doesn't time out and gets stuck, so it seems my threads never finish their jobs and stop processing the queue.

I'm trying to make a multi-threaded web crawler. I fetch the to-be-crawled URLs from a frontier (which returns a JSON list of domains) and populate a queue with them.

After this I populate the thread pool with the code below:

while not url_queue.empty():
    queue_data = url_queue.get()
    task_pool.submit(processItem, queue_data)

In the processItem() function, I fetch the URL with get_data() and mark the queue item with task_done().

My get_data() function is as follows:

import requests

def get_data(fqdn):
    # `headers` is defined elsewhere in the script
    try:
        response = requests.get("http://"+fqdn, headers=headers, allow_redirects=True, timeout=3)

        if response.status_code == requests.codes.ok:
            result = response.text
        else:
            result = ""

    except requests.exceptions.RequestException as e:
        print "ERROR OCCURED:"
        print fqdn
        print e.message
        result = ""

    return result

If I comment out get_data() in processItem(), all threads and the queue work fine. If I uncomment it, it works fine for most requests but gets stuck on some, and that affects the whole queue and script because queue.join() waits for the threads to complete their requests. I suppose it's a bug in the requests module, since everything works fine without calling get_data() and since requests doesn't time out the GET request.
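Because queue.join() returns only after task_done() has been called once for every item taken from the queue, a worker that raises before reaching task_done() leaves join() blocked forever. Below is a minimal sketch of a more defensive worker, assuming Python 3 module names; process_item and fetch are hypothetical stand-ins for processItem() and get_data(), and fetch does no network I/O.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

url_queue = queue.Queue()
results = {}


def fetch(fqdn):
    # Hypothetical stand-in for get_data(); raises for one input to mimic
    # an exception that a narrow except clause would not catch.
    if fqdn == "bad.example":
        raise ValueError("unexpected failure for %s" % fqdn)
    return "body of %s" % fqdn


def process_item(fqdn):
    result = ""  # defined up front so it exists on every code path
    try:
        result = fetch(fqdn)
    except Exception as exc:  # broad on purpose: log and keep going
        print("ERROR OCCURRED: %s: %s" % (fqdn, exc))
    finally:
        results[fqdn] = result
        url_queue.task_done()  # always runs, so url_queue.join() can return


for name in ["a.example", "bad.example", "b.example"]:
    url_queue.put(name)

with ThreadPoolExecutor(max_workers=2) as task_pool:
    while not url_queue.empty():
        task_pool.submit(process_item, url_queue.get())
    url_queue.join()  # returns once every item has been marked done
```

This only rules out one cause of a hang (a skipped task_done()); it does not help if the fetch itself never returns.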

Any help will be greatly appreciated... Thank you very much..

Needs More Information Propose Close

All 24 comments

Have you tried requests-futures? If not, could you reproduce this with requests-futures?

Additionally, is it possible for us to obtain a traceback of where we're getting stuck? I want to know if we're getting stuck during the connection phase (in which case it's our fault) or if we're getting stuck in the read phase (in which case it's httplib's fault).

@Lukasa I would like to provide a traceback, but how can I do that? I'm getting lots of exceptions because it uses 200 threads. They generally look like these, in no particular order. After that it gets stuck and does nothing.


('Connection aborted.', gaierror(-2, 'Name or service not known'))
ERROR OCCURED:
karawanghosting.net
('Connection aborted.', gaierror(-2, 'Name or service not known'))
ERROR OCCURED:
bjjzk.net
HTTPConnectionPool(host='bjjzk.net', port=80): Max retries exceeded with url: / (Caused by ConnectTimeoutError(, 'Connection to bjjzk.net timed out. (connect timeout=3)'))


Can you broaden your except statement to catch all exceptions of any kind, and then re-run your code? I want to see if anything isn't getting logged out.

It's weird: I changed "except requests.exceptions.RequestException as e" to "except Exception as e:" and now it seems to be working fine (at least it keeps working for longer than before).

So my theory is that if a thread dies because of an exception it can't get waited on.
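As far as concurrent.futures itself is concerned, an exception raised inside a submitted callable does not kill the worker thread. It is stored on the returned Future and re-raised only when result() is called, so failures pass silently if the futures are discarded. A small sketch, with a hypothetical flaky() task, of collecting those exceptions:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def flaky(n):
    # Hypothetical task: fails for odd inputs.
    if n % 2:
        raise RuntimeError("task %d failed" % n)
    return n * 10


failures = []
successes = []
with ThreadPoolExecutor(max_workers=2) as pool:
    # Map each future back to its input so failures can be attributed.
    futures = {pool.submit(flaky, n): n for n in range(4)}
    for future in as_completed(futures):
        exc = future.exception()  # None if the callable returned normally
        if exc is not None:
            failures.append((futures[future], str(exc)))
        else:
            successes.append(future.result())
```

Keeping the futures and inspecting them is one way to see exceptions that an except clause inside the task did not log.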

After a long time it got stuck again, and unfortunately it doesn't give any exception different from the many others. :( Do you have any other ideas to try?

 ('Connection aborted.', gaierror(-2, 'Name or service not known'))

Indicates a DNS issue.

 HTTPConnectionPool(host='bjjzk.net', port=80): Max retries exceeded with url: / (Caused by ConnectTimeoutError(, 'Connection to bjjzk.net timed out. (connect timeout=3)'))

Sounds like you're being ratelimited by bjjzk.net.

Have you tried using requests-futures?

No, I haven't tried requests-futures yet, and that's not the only domain I crawl. I'm crawling millions of domains and accessing each of them only once, so being rate-limited is not likely.

Now I've made a small change to the get_data() function: I declared the "result" variable as an empty string above the try/except block. I'm not sure whether there is any way to leave the try/except block, with or without an exception, and return a null result, but I wanted to try.

Same issue here.

Also developing a web crawler intended to process a continuous stream of URLs.

My code behaviour is something like the following:

from concurrent.futures import ThreadPoolExecutor
import logging
import random
import time

import requests

NTHREADS = 2
DELAY_SECONDS = 0.5
URLS = ['https://google.com', 'http://yahoo.com', 'http://github.com', 'https://bing.com']

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

def callback():
    response = requests.get(random.choice(URLS), timeout=120)
    logging.info('status_code=%d ok=%s', response.status_code, response.ok)

with ThreadPoolExecutor(NTHREADS) as executor:
    while True:
        time.sleep(DELAY_SECONDS) # do not hit the site too hard
        queued_works = executor._work_queue.qsize()
        logging.info('queued works: %s', queued_works)
        if queued_works < 10: # do not flood executor's queue
            executor.submit(callback)

I wasn't able to reproduce this exact error with this small list of URLs, but in my production environment (after running for some time, say 2~3 hours), the log messages start looking like this:

2015-10-01 16:51:41,488 : INFO : queued works: 10
2015-10-01 16:51:41,489 : INFO : queued works: 10
2015-10-01 16:51:41,489 : INFO : queued works: 10
2015-10-01 16:51:41,489 : INFO : queued works: 10
2015-10-01 16:51:41,490 : INFO : queued works: 10
2015-10-01 16:51:41,490 : INFO : queued works: 10
2015-10-01 16:51:41,491 : INFO : queued works: 10
2015-10-01 16:51:41,491 : INFO : queued works: 10
2015-10-01 16:51:41,492 : INFO : queued works: 10
2015-10-01 16:51:41,492 : INFO : queued works: 10
2015-10-01 16:51:41,492 : INFO : queued works: 10
2015-10-01 16:51:41,493 : INFO : queued works: 10
.... (and it goes on like this forever; it did not stop even after a few days)

I checked ThreadPoolExecutor's implementation and I'm pretty convinced the problem is NOT related to it. The code just seems to get stuck on line 55:

result = self.fn(*self.args, **self.kwargs)

edit: by "the issue is not related to ThreadPoolExecutor", I mean: it doesn't matter if callback() raises an exception or not; it's supposed to work just fine. The thing is that _WorkItem:run() method never stops.
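A caller can at least detect a callable that never returns by waiting on its futures with a timeout. The sketch below uses concurrent.futures.wait for that; stuck() is a hypothetical stand-in for a request that hangs, and the Event exists only so the example can shut down cleanly.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

release = threading.Event()


def stuck():
    # Hypothetical callable that blocks until released, standing in for a
    # request that never returns.
    release.wait()
    return "finally done"


def quick():
    return "done"


executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(stuck), executor.submit(quick)]

# Wait at most half a second for everything, then inspect what is left.
done, not_done = wait(futures, timeout=0.5)
stuck_count = len(not_done)

release.set()  # unblock the worker so the example can exit cleanly
executor.shutdown(wait=True)
```

Note that wait() only reports the unfinished futures. It does not stop the running callable, and Python offers no supported way to kill a thread, so a truly stuck worker still occupies its slot in the pool.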

edit 2: python 2.7

@eltermann Can you please add timeouts to your requests call and see if the problem persists?

@Lukasa, the snippet didn't have a timeout, but my actual call does. I've edited the snippet anyway.
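One thing worth checking here: per the Requests documentation, timeout is not a limit on the whole download. It bounds the connect phase and each wait for bytes on the socket, so a server that trickles data slowly can keep a call alive far longer than the configured value. Below is a hedged sketch of an overall deadline using stream=True. The names read_until_deadline and fetch_with_deadline are hypothetical helpers, not part of Requests, and the timeout values are arbitrary.

```python
import time


def read_until_deadline(chunks, deadline_seconds, clock=time.time):
    """Join byte chunks from an iterable, giving up after deadline_seconds."""
    started = clock()
    parts = []
    for chunk in chunks:
        if clock() - started > deadline_seconds:
            raise RuntimeError("overall deadline exceeded")
        parts.append(chunk)
    return b"".join(parts)


def fetch_with_deadline(url, deadline_seconds=30):
    # Imported here so the helper above can be used without Requests installed.
    import requests

    # timeout=(connect, read): each bounds one socket wait, not the whole call.
    response = requests.get(url, timeout=(3, 10), stream=True)
    try:
        return read_until_deadline(response.iter_content(8192), deadline_seconds)
    finally:
        response.close()
```

The deadline is checked only when a chunk arrives, so the worst case is roughly the deadline plus one read timeout. This sketch also does nothing about time spent in name resolution, which happens before the socket timeout applies.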

@eltermann Interesting. It would be really insightful to try to get stacks from those threads.

@Lukasa, what do you recommend for printing a useful stack? And where should I place it?

Good question. Try this.
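The link in the reply above did not survive. One common approach, not necessarily the one that was linked, is to dump the current stack of every live thread using sys._current_frames() and the traceback module. A minimal sketch (format_all_stacks is a hypothetical helper name):

```python
import sys
import threading
import traceback


def format_all_stacks():
    """Return a string with the current stack of every live thread."""
    names = {t.ident: t.name for t in threading.enumerate()}
    lines = []
    for thread_id, frame in sys._current_frames().items():
        lines.append("Thread %s (%s):" % (thread_id, names.get(thread_id, "unknown")))
        # format_stack returns the frames from outermost to innermost call.
        lines.extend(line.rstrip() for line in traceback.format_stack(frame))
    return "\n".join(lines)
```

It could be called from a watchdog thread or a signal handler once the crawler appears stuck, and the output logged. On Python 3 the standard library's faulthandler module offers similar dumps.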

@sezginriggs so have you resolved the problem? I am stuck with it too.

@metrue, I changed my approach to use processes instead of threads -- also because I found that Python does not really run threads in parallel because of the GIL.

You have two choices:

  1. you deal with multiple processes yourself in your python code -- I recommend looking at how scrapy does the parallelization (even though it uses twisted under the hood and not requests)
  2. (and that's what I did) you write a simple "stream-consumer" python program and let something else do the parallelization (something like Kafka or Storm) -- then, you start multiple processes for your "stream-consumer" and voilà

@eltermann, I do know that 'python does not really parallelize threads execution because of GIL', so I am using ProcessPoolExecutor instead of ThreadPoolExecutor. But requests still gets stuck.

If you are using a process pool executor you _must not_ use a Session that is shared across those processes.
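One way to follow that advice is to create the Session lazily inside each worker process instead of passing one in from the parent. The sketch below keys the cached object on the process ID so that a forked child builds its own; get_session and its factory parameter are hypothetical, and requests.Session is used only when no factory is supplied.

```python
import os

_session = None
_session_pid = None


def get_session(factory=None):
    """Return an object created in the current process, rebuilding it after a fork."""
    global _session, _session_pid
    if _session is None or _session_pid != os.getpid():
        if factory is None:
            import requests  # assumed installed; imported lazily

            factory = requests.Session
        _session = factory()
        _session_pid = os.getpid()
    return _session
```

A task submitted to ProcessPoolExecutor would then call get_session() itself rather than receive a Session as an argument. The creation step is not guarded by a lock, so this sketch assumes one thread per worker process.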

@Lukasa

Right, I realized that. I am not a Python expert, but I wonder what the best practice is for sharing data (let's say a global task queue) between those processes?

@metrue to maintain a thread-safe/multiprocess-safe queue, you can use the standard library's Queue implementation. If you're on Python 2:

import Queue

task_queue = Queue.Queue()

If you're on Python 3:

import queue

task_queue = queue.Queue()
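Note that queue.Queue (Queue.Queue on Python 2) is shared between the threads of a single process; for separate processes the standard-library counterpart is multiprocessing.Queue. A minimal sketch of the threaded case, assuming Python 3 names, with a hypothetical worker() that upper-cases each item in place of real work:

```python
import queue
import threading

task_queue = queue.Queue()
seen = []
seen_lock = threading.Lock()


def worker():
    while True:
        item = task_queue.get()
        try:
            if item is None:  # sentinel: no more work for this worker
                return
            with seen_lock:
                seen.append(item.upper())
        finally:
            task_queue.task_done()  # runs for real items and sentinels alike


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for name in ["a.example", "b.example", "c.example"]:
    task_queue.put(name)
for _ in threads:
    task_queue.put(None)  # one sentinel per worker
task_queue.join()
for t in threads:
    t.join()
```

The sentinel values let each worker exit on its own once the real items are consumed.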

Have you fixed this problem?

I think I had, and still have, the same problem: example. Sometimes this example deadlocks.

@antongulikov We have not. We are still missing a big chunk of debugging data as discussed earlier in the thread.

After you make the requests, make sure to kill the process: call both driver.close and driver.quit. That should keep your memory stable across all those requests and keep jobs from getting stuck due to memory issues.
