Httpx: Python 3.6 RuntimeError: read() called while another coroutine is already waiting for incoming data

Created on 24 Sep 2019 · 13 Comments · Source: encode/httpx

Running on Python 3.7 and 3.8 has been going beautifully, but now I've decided to add my httpx-based API client to a Python 3.6 project, and I get the above error whenever I call httpx. Below is the stack trace of one of the calls:

src/emcee/api.py:119: in get_with_retry
    return self.session.get(path, params=params)
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:818: in get
    proxies=proxies,
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:729: in request
    proxies=proxies,
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:760: in send
    async_response = concurrency_backend.run(coroutine, *args, **kwargs)
.nox/test-3-6/lib/python3.6/site-packages/httpx/concurrency/asyncio.py:241: in run
    return self.loop.run_until_complete(coroutine(*args, **kwargs))
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/base_events.py:484: in run_until_complete
    return future.result()
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:230: in _get_response
    return await get_response(request)
.nox/test-3-6/lib/python3.6/site-packages/httpx/middleware/redirect.py:31: in __call__
    response = await get_response(request)
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:191: in get_response
    request, verify=verify, cert=cert, timeout=timeout
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/connection_pool.py:126: in send
    raise exc
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/connection_pool.py:121: in send
    request, verify=verify, cert=cert, timeout=timeout
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/connection.py:65: in send
    response = await self.h11_connection.send(request, timeout=timeout)
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/http11.py:53: in send
    http_version, status_code, headers = await self._receive_response(timeout)
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/http11.py:133: in _receive_response
    event = await self._receive_event(timeout)
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/http11.py:174: in _receive_event
    self.READ_NUM_BYTES, timeout, flag=self.timeout_flag
.nox/test-3-6/lib/python3.6/site-packages/httpx/concurrency/asyncio.py:79: in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/streams.py:634: in read
    yield from self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <StreamReader t=<asyncio.sslproto._SSLProtocolTransport object at 0x10b508470>>, func_name = 'read'

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
>                              'already waiting for incoming data' % func_name)
E           RuntimeError: read() called while another coroutine is already waiting for incoming data

../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/streams.py:452: RuntimeError

Python: 3.6.9
httpx: 0.7.3

It appears to be related to the way I've instantiated TimeoutConfig (with no values, to allow an infinite timeout). Here's a minimal reproducible example:

>>> import httpx

>>> no_timeout = httpx.config.TimeoutConfig()
>>> session = httpx.Client(timeout=no_timeout)
>>> session.get("https://httpbin.org")
<Response [200 OK]>

This works fine on 3.7+ but fails on 3.6.

bug

All 13 comments

Thanks for reporting this :)

Could it be that in 3.6 asyncio's read() only allowed one coroutine to call it at a time, while that constraint was relaxed in 3.7?

Trio has a similar constraint on the write operation, and I remember we solved it using a simple lock.
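A minimal sketch of that lock idea transposed to asyncio, assuming a hypothetical `LockedReader` wrapper around `asyncio.StreamReader` (this is not httpx's code). The `asyncio.Lock` makes a second `read()` queue behind the first one, so it never reaches the stream while another coroutine is still waiting for data:

```python
import asyncio


class LockedReader:
    """Hypothetical wrapper that serializes read() calls on one StreamReader."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self._reader = reader
        self._lock = asyncio.Lock()

    async def read(self, n: int) -> bytes:
        # Only one coroutine at a time may be inside StreamReader.read(),
        # so its "another coroutine is already waiting" check never trips.
        async with self._lock:
            return await self._reader.read(n)


async def demo() -> list:
    reader = asyncio.StreamReader()
    locked = LockedReader(reader)
    # Two concurrent reads: the second one waits on the lock, not on the stream.
    first = asyncio.ensure_future(locked.read(5))
    second = asyncio.ensure_future(locked.read(5))
    await asyncio.sleep(0)  # let both tasks start
    reader.feed_data(b"hello")
    reader.feed_data(b"world")
    return await asyncio.gather(first, second)


print(asyncio.run(demo()))  # [b'hello', b'world']
```

The lock trades a crash for queuing: the second reader simply gets the next bytes once the first has finished.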

Do you get this behavior with the usual timeout config?

I do not. In further testing, passing no timeout value to httpx.Client, i.e.

session = httpx.Client()

or giving a concrete timeout value i.e.

timeout = httpx.config.TimeoutConfig(timeout=300)
session = httpx.Client(timeout=timeout)

both work as expected, with no RuntimeError on Python 3.6 or later versions.

I did see something about using an asyncio lock in my searches... https://stackoverflow.com/a/25799871

My question is: why would more than one .read() coroutine be active at any one time?

@sethmlarson I debugged it a bit, and it's a race condition: on a read timeout, asyncio is still cancelling the previous .read() when we start the new one. I'll file a fix. :)
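The invariant behind the error can be reproduced with `asyncio.StreamReader` alone, with no httpx and no network. This sketch simply assumes a second `read()` starts while the first one is still parked waiting for data, which is the situation the race described above produces:

```python
import asyncio


async def overlapping_reads():
    reader = asyncio.StreamReader()
    # The buffer is empty, so this read parks on the reader's internal waiter.
    first = asyncio.ensure_future(reader.read(100))
    await asyncio.sleep(0)  # give the first read a chance to start waiting

    try:
        await reader.read(100)  # a second read while the first is still waiting
        error = None
    except RuntimeError as exc:
        error = str(exc)

    reader.feed_data(b"payload")  # wake the first read so it completes cleanly
    data = await first
    return error, data


error, data = asyncio.run(overlapping_reads())
print(error)  # read() called while another coroutine is already waiting for incoming data
print(data)   # b'payload'
```

Nothing in this sketch times out; in the reported case the overlap came from a new read being started after a read timeout, before the old one had been fully cancelled.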

I also get the same error on Python 3.7. Here is the trace:

ERROR:__main__:Error
Traceback (most recent call last):
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/widgets/owimageembedding.py", line 283, in __set_results
    embeddings = f.result()
  File "/Users/primoz/miniconda3/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/Users/primoz/miniconda3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/primoz/miniconda3/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/widgets/owimageembedding.py", line 235, in run_embedding
    image_processed_callback=advance)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/image_embedder.py", line 109, in __call__
    return self.from_table(*args, **kwargs)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/image_embedder.py", line 142, in from_table
    file_paths_valid, image_processed_callback)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 75, in from_file_paths
    paths, repeats_count, image_processed_callback))
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 120, in embedd_batch
    embeddings = await asyncio.gather(*requests)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 173, in _send_to_server
    client, im, url, proc_callback)
  File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 194, in _send_request
    data=image
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 484, in post
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 626, in request
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 650, in send
    trust_env=trust_env,
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 265, in _get_response
    return await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/middleware/redirect.py", line 31, in __call__
    response = await get_response(request)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 240, in get_response
    await response.read()
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 996, in read
    self._content = b"".join([part async for part in self.stream()])
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 996, in <listcomp>
    self._content = b"".join([part async for part in self.stream()])
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 1007, in stream
    async for chunk in self.raw():
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 1035, in raw
    async for part in self._raw_stream:
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 192, in body_iter
    event = await self.receive_event(stream_id, timeout)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 206, in receive_event
    data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
  File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 114, in read
    data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 640, in read
    await self._wait_for_data('read')
  File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 460, in _wait_for_data
    f'{func_name}() called while another coroutine is '
RuntimeError: read() called while another coroutine is already waiting for incoming data

My code looks like this:

async def embedd_batch(self, file_paths, n_repeats, proc_callback=None):
    requests = []
    async with httpx.AsyncClient(timeout=self.timeouts, base_url=self.server_url) as client:
        for p in file_paths:
            requests.append(self._send_to_server(p, n_repeats, proc_callback, client))

        embeddings = await asyncio.gather(*requests)
    return embeddings

async def __wait_until_released(self):
    """Ensure that no more than 100 requests are sent at the same time and that no more than 100 images are loaded into memory."""
    while self.num_parallel_requests >= self.MAX_PARALLEL:
        await asyncio.sleep(0.1)

async def _send_to_server(self, image, retry_n, proc_callback, client):
    await self.__wait_until_released()

    self.num_parallel_requests += 1
    # load image
    im = self._image_loader.load_image_bytes(image, self._im_size)
    if im is None:
        self.num_parallel_requests -= 1
        return None

    url = f"/image/{self._model}?machine={self.machine_id}" \
          f"&session={self.session_id}&retry={retry_n}"
    emb = await self._send_request(client, im, url, proc_callback)

    self.num_parallel_requests -= 1
    return emb

@staticmethod
async def _send_request(client, image, url, callback):
    headers = {
        'Content-Type': 'image/jpeg',
        'Content-Length': str(len(image))
    }
    response = await client.post(
        url,
        headers=headers,
        data=image
    )
    return response

As you can see, the code ensures that at most 100 requests are sent to the server at the same time.
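For comparison, a common alternative to the polling loop in `__wait_until_released()` is an `asyncio.Semaphore`. The sketch below is not from the project's code: `MAX_PARALLEL = 3`, `process_all()` and the `asyncio.sleep()` placeholder for loading and posting an image are assumptions for illustration.

```python
import asyncio

MAX_PARALLEL = 3  # hypothetical small limit; the code above uses 100


async def process_all(items):
    semaphore = asyncio.Semaphore(MAX_PARALLEL)
    in_flight = 0
    peak = 0

    async def process(item):
        nonlocal in_flight, peak
        # Waits here while MAX_PARALLEL tasks are already inside the block.
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                await asyncio.sleep(0.01)  # placeholder for load image + POST
                return item * 2
            finally:
                in_flight -= 1  # runs even if the request raises

    results = await asyncio.gather(*(process(i) for i in items))
    return results, peak


results, peak = asyncio.run(process_all(range(10)))
print(results, peak)
```

Unlike a manually incremented counter, the semaphore is released automatically when the block exits, including on exceptions, and waiting tasks wake up immediately instead of polling every 0.1 s.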

@florimondmanca is it an error or did I make a mistake when using httpx?

@PrimozGodec I'm not sure about the error you've got. We'd need to narrow down the conditions under which this error occurs; currently it is not very clear.

Also, I noticed you have custom logic to rate-limit the requests. Did you know that HTTPX's connection pooling already performs this kind of limiting, with a hard limit of 100 by default? You could leverage this by using a lazy (i.e. streaming) request body. This would also spare you from having to deal with Content-Length etc.:

async def send_request(...):
    async def body():
        yield load_image(...)

    await client.post(url, headers=headers, data=body())
    ...
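The point of the async generator is laziness: the image is only read when the client starts consuming the body. The sketch below shows that with no httpx involved; `load_image()`, `make_body()` and `consume()` are hypothetical, with `consume()` standing in for the HTTP client draining the request body. Which httpx argument accepts such a streaming body depends on the httpx version.

```python
import asyncio

loaded = []  # records which (hypothetical) images have been read from disk


def load_image(path: str) -> bytes:
    """Hypothetical stand-in for the load_image(...) call above."""
    loaded.append(path)
    return b"image-bytes:" + path.encode()


def make_body(path: str):
    async def body():
        # Runs only when the consumer starts iterating, not when body() is called.
        yield load_image(path)
    return body()


async def consume(stream) -> bytes:
    # Stand-in for the HTTP client reading the request body.
    return b"".join([chunk async for chunk in stream])


async def main():
    stream = make_body("cat.jpg")
    before = list(loaded)  # still empty: creating the generator loaded nothing
    data = await consume(stream)
    return before, data


before, data = asyncio.run(main())
print(before, data, loaded)  # [] b'image-bytes:cat.jpg' ['cat.jpg']
```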

Can you see if your code can be adapted to use this idea, and then whether the asyncio error is still present?

@florimondmanca thank you very much for your suggestion and fast reply. I am aware of HTTPX's limit on the number of requests, but I added a custom limit because I didn't want all images to be loaded into memory at the same time. Your idea of lazy loading is very good, but the catch is that I removed part of the code from the _send_to_server function in the snippet above. Since we perform local caching, we need to load an image to decide whether to send it to the server. If an image is already in the local cache, we do not send it to the server (we do not call the _send_request function). If you want to see the complete code, it is here: https://github.com/biolab/orange3-imageanalytics/blob/2940bb04c1e3d8169c6dee08986377d51c42f460/orangecontrib/imageanalytics/server_embedder.py

I would really like to use the lazy loading you suggested, but I do not want to load images twice. I know the solution I implemented is not the best, but currently I do not have a better idea.

I see… If you are able to narrow down the situation in which the error occurs to a small reproducible example, I suggest you open a separate issue so that we can more easily track this other edge case. (Although it's the same error, this particular issue was about an edge case on Python 3.6.) Thanks!

Yep, I am currently trying to build a minimal standalone example that reproduces the error.

I was getting a similar error while trying to use the same AsyncClient to make multiple requests in parallel. Currently, this is not supported (see https://github.com/encode/httpx/pull/52). httpx.AsyncClient is NOT the same as aiohttp.ClientSession.

I was doing something like this:

import asyncio
import httpx

async def print_content_from_url(url, client):
    res = await client.get(url)
    bytes_res = await res.read()
    str_res = bytes_res.decode()
    print(str_res)

async def main():
    urls = ['https://iluxonchik.me', 'https://iluxonchik.github.io', 'https://google.com']*100
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*(print_content_from_url(url, client) for url in urls))

asyncio.run(main())    

Running the code above results in the following exception:

RuntimeError: read() called while another coroutine is already waiting for incoming data

Changing the code to the following fixes the problem:

import asyncio
import httpx

async def print_content_from_url(url):
    async with httpx.AsyncClient() as client:
        res = await client.get(url)
        bytes_res = await res.read()
        print(bytes_res)

async def main():
    urls = ['https://iluxonchik.me', 'https://iluxonchik.github.io', 'https://google.com']*100
    await asyncio.gather(*(print_content_from_url(url) for url in urls))

asyncio.run(main())

Again, keep a close eye on https://github.com/encode/httpx/pull/52. If you're reading this in the future (hello 2020 and beyond 👋), you might already have an alternative solution available.

@iluxonchik Can we continue discussion on this in #527?

Also, I think you don't need to .read() the response and decode it yourself. You can just use res.text, which takes care of applying the appropriate decoding.
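Why `res.text` is preferable can be seen with the standard library alone: a bare `.decode()` always assumes UTF-8, while the response's `Content-Type` header may declare a different charset. This stdlib sketch is not httpx's implementation; the header value and body are made up for illustration.

```python
from email.message import Message

body = "café".encode("latin-1")  # b'caf\xe9', which is not valid UTF-8
content_type = "text/html; charset=iso-8859-1"

# Naive approach: bytes.decode() assumes UTF-8 regardless of the headers.
try:
    naive = body.decode()
except UnicodeDecodeError:
    naive = None

# Header-aware approach: pick the codec from the Content-Type charset.
msg = Message()
msg["Content-Type"] = content_type
charset = msg.get_content_charset() or "utf-8"
text = body.decode(charset)

print(naive, charset, text)  # None iso-8859-1 café
```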
