Running on Python 3.7 and 3.8 has been going beautifully, but now that I've decided to add my httpx-based API client to a Python 3.6 project, I'm getting the above error whenever I call httpx. Below is the stack trace from one of the calls:
src/emcee/api.py:119: in get_with_retry
return self.session.get(path, params=params)
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:818: in get
proxies=proxies,
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:729: in request
proxies=proxies,
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:760: in send
async_response = concurrency_backend.run(coroutine, *args, **kwargs)
.nox/test-3-6/lib/python3.6/site-packages/httpx/concurrency/asyncio.py:241: in run
return self.loop.run_until_complete(coroutine(*args, **kwargs))
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/base_events.py:484: in run_until_complete
return future.result()
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:230: in _get_response
return await get_response(request)
.nox/test-3-6/lib/python3.6/site-packages/httpx/middleware/redirect.py:31: in __call__
response = await get_response(request)
.nox/test-3-6/lib/python3.6/site-packages/httpx/client.py:191: in get_response
request, verify=verify, cert=cert, timeout=timeout
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/connection_pool.py:126: in send
raise exc
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/connection_pool.py:121: in send
request, verify=verify, cert=cert, timeout=timeout
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/connection.py:65: in send
response = await self.h11_connection.send(request, timeout=timeout)
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/http11.py:53: in send
http_version, status_code, headers = await self._receive_response(timeout)
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/http11.py:133: in _receive_response
event = await self._receive_event(timeout)
.nox/test-3-6/lib/python3.6/site-packages/httpx/dispatch/http11.py:174: in _receive_event
self.READ_NUM_BYTES, timeout, flag=self.timeout_flag
.nox/test-3-6/lib/python3.6/site-packages/httpx/concurrency/asyncio.py:79: in read
data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/streams.py:634: in read
yield from self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <StreamReader t=<asyncio.sslproto._SSLProtocolTransport object at 0x10b508470>>, func_name = 'read'
@coroutine
def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called.
If stream was paused, automatically resume it.
"""
# StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know
# which coroutine would get the next data.
if self._waiter is not None:
raise RuntimeError('%s() called while another coroutine is '
> 'already waiting for incoming data' % func_name)
E RuntimeError: read() called while another coroutine is already waiting for incoming data
../../../.pyenv/versions/3.6.9/lib/python3.6/asyncio/streams.py:452: RuntimeError
Python: 3.6.9
httpx: 0.7.3
It appears to be related to the way I've instantiated TimeoutConfig (with no values, to allow an infinite timeout). Here's a minimal reproducible example:
>>> import httpx
>>> no_timeout = httpx.config.TimeoutConfig()
>>> session = httpx.Client(timeout=no_timeout)
>>> session.get("https://httpbin.org")
<Response [200 OK]>
Works fine in 3.7+, fails in 3.6
Thanks for reporting this :)
Could it be that in 3.6 asyncio’s read() only allowed one coroutine to call it at a time, while that constraint was relaxed in 3.7?
Trio has a similar constraint on the write operation, and I remember we solved it using a simple lock.
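For illustration, here is a minimal sketch of the constraint and of the lock idea mentioned above. It is not HTTPX's actual fix: `LockedReader` and `demo` are hypothetical names, and a bare `asyncio.StreamReader` fed by hand stands in for a real connection.

```python
import asyncio


class LockedReader:
    """Hypothetical wrapper that serializes read() calls on one StreamReader."""

    def __init__(self, reader):
        self._reader = reader
        self._lock = asyncio.Lock()

    async def read(self, n=-1):
        # Only one coroutine at a time may wait inside StreamReader.read().
        async with self._lock:
            return await self._reader.read(n)


async def demo():
    # 1) Two unguarded reads on the same reader: the second one raises.
    reader = asyncio.StreamReader()
    first = asyncio.ensure_future(reader.read(5))
    await asyncio.sleep(0)  # let `first` start waiting for data
    try:
        await reader.read(5)
        unguarded_error = None
    except RuntimeError as exc:
        unguarded_error = str(exc)
    reader.feed_data(b"hello")
    await first

    # 2) The same pattern through the lock: the reads simply queue up.
    inner = asyncio.StreamReader()
    locked = LockedReader(inner)
    tasks = [asyncio.ensure_future(locked.read(5)) for _ in range(2)]
    await asyncio.sleep(0)
    inner.feed_data(b"helloworld")
    results = await asyncio.gather(*tasks)
    return unguarded_error, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The lock only prevents overlapping reads. It does not decide which caller should receive which bytes, so it suits cases where the reads are logically sequential anyway.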
Do you get this behavior with the usual timeout config?
I do not. In further testing, giving no timeout value to httpx.Client, i.e.
session = httpx.Client()
or giving a concrete timeout value, i.e.
timeout = httpx.config.TimeoutConfig(timeout=300)
session = httpx.Client(timeout=timeout)
both proceed as expected, with no RuntimeError in py3.6 or others.
I did see something about using an asyncio lock in my searches... https://stackoverflow.com/a/25799871
My question is why would more than one .read() coroutine call be active at any one time?
@sethmlarson I debugged it a bit and it's a race condition between asyncio cancelling the previous .read() and us starting the new one in case of read timeout. I'll file a fix. :)
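One way to avoid this kind of race, sketched under assumptions and not taken from the actual patch, is to wait until a timed-out read has really finished cancelling before letting anyone start the next read. As far as I know, `asyncio.wait_for` only began waiting for the cancelled awaitable in Python 3.7, which would fit the 3.6-only symptom. `read_with_timeout` and `demo` are hypothetical names, and a hand-fed `asyncio.StreamReader` stands in for the socket.

```python
import asyncio


async def read_with_timeout(reader, n, timeout):
    """Read up to n bytes, or raise asyncio.TimeoutError after `timeout` seconds.

    On timeout, the pending read is cancelled AND awaited, so the reader's
    internal waiter is cleared before this function returns control.
    """
    task = asyncio.ensure_future(reader.read(n))
    done, _pending = await asyncio.wait({task}, timeout=timeout)
    if task in done:
        return task.result()
    task.cancel()
    try:
        await task  # let the cancellation actually complete
    except asyncio.CancelledError:
        pass
    raise asyncio.TimeoutError()


async def demo():
    reader = asyncio.StreamReader()
    timed_out = False
    try:
        await read_with_timeout(reader, 1024, timeout=0.01)
    except asyncio.TimeoutError:
        timed_out = True
    # A new read right after the timeout must not hit the RuntimeError.
    reader.feed_data(b"hello")
    data = await read_with_timeout(reader, 1024, timeout=1.0)
    return timed_out, data


if __name__ == "__main__":
    print(asyncio.run(demo()))
```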
I also get the same error with Python 3.7. The trace is:
ERROR:__main__:Error
Traceback (most recent call last):
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/widgets/owimageembedding.py", line 283, in __set_results
embeddings = f.result()
File "/Users/primoz/miniconda3/lib/python3.7/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/Users/primoz/miniconda3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/Users/primoz/miniconda3/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/widgets/owimageembedding.py", line 235, in run_embedding
image_processed_callback=advance)
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/image_embedder.py", line 109, in __call__
return self.from_table(*args, **kwargs)
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/image_embedder.py", line 142, in from_table
file_paths_valid, image_processed_callback)
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 75, in from_file_paths
paths, repeats_count, image_processed_callback))
File "/Users/primoz/miniconda3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 120, in embedd_batch
embeddings = await asyncio.gather(*requests)
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 173, in _send_to_server
client, im, url, proc_callback)
File "/Users/primoz/PycharmProjects/orange3-imageanalytics/orangecontrib/imageanalytics/server_embedder.py", line 194, in _send_request
data=image
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 484, in post
trust_env=trust_env,
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 626, in request
trust_env=trust_env,
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 650, in send
trust_env=trust_env,
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 265, in _get_response
return await get_response(request)
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/middleware/redirect.py", line 31, in __call__
response = await get_response(request)
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/client.py", line 240, in get_response
await response.read()
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 996, in read
self._content = b"".join([part async for part in self.stream()])
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 996, in <listcomp>
self._content = b"".join([part async for part in self.stream()])
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 1007, in stream
async for chunk in self.raw():
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/models.py", line 1035, in raw
async for part in self._raw_stream:
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 192, in body_iter
event = await self.receive_event(stream_id, timeout)
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/dispatch/http2.py", line 206, in receive_event
data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
File "/Users/primoz/venv/orange/lib/python3.7/site-packages/httpx/concurrency/asyncio.py", line 114, in read
data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
File "/Users/primoz/miniconda3/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
return fut.result()
File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 640, in read
await self._wait_for_data('read')
File "/Users/primoz/miniconda3/lib/python3.7/asyncio/streams.py", line 460, in _wait_for_data
f'{func_name}() called while another coroutine is '
RuntimeError: read() called while another coroutine is already waiting for incoming data
My code looks like this:
async def embedd_batch(self, file_paths, n_repeats, proc_callback=None):
    requests = []
    async with httpx.AsyncClient(timeout=self.timeouts, base_url=self.server_url) as client:
        for p in file_paths:
            requests.append(self._send_to_server(p, n_repeats, proc_callback, client))
        embeddings = await asyncio.gather(*requests)
    return embeddings

async def __wait_until_released(self):
    """This function makes sure that no more than 100 requests are sent at the same time and that no more than 100 images are loaded into memory"""
    while self.num_parallel_requests >= self.MAX_PARALLEL:
        await asyncio.sleep(0.1)

async def _send_to_server(self, image, retry_n, proc_callback, client):
    await self.__wait_until_released()
    self.num_parallel_requests += 1
    # load image
    im = self._image_loader.load_image_bytes(image, self._im_size)
    if im is None:
        self.num_parallel_requests -= 1
        return None
    url = f"/image/{self._model}?machine={self.machine_id}" \
        f"&session={self.session_id}&retry={retry_n}"
    emb = await self._send_request(client, im, url, proc_callback)
    self.num_parallel_requests -= 1
    return emb

@staticmethod
async def _send_request(client, image, url, callback):
    headers = {
        'Content-Type': 'image/jpeg',
        'Content-Length': str(len(image))
    }
    response = await client.post(
        url,
        headers=headers,
        data=image
    )
    return response
As you can see, the code ensures that at most 100 requests are sent to the server at the same time.
@florimondmanca is this a bug, or did I make a mistake in how I use httpx?
@PrimozGodec I'm not sure about the error you've got. We'd need to narrow down the conditions under which this error occurs, as they are currently not very clear.
Also, I noticed you have custom logic to rate-limit the requests. Did you know that HTTPX's connection pooling already performs this kind of limiting, also with a hard limit of 100 by default? You could leverage this by using a lazy (i.e. streaming) request body. This would also save you from having to deal with Content-Length etc.:
async def send_request(...):
    async def body():
        yield load_image(...)

    await client.post(url, headers=headers, data=body())
    ...
Can you see if your code can be adapted to use this idea, and then whether the asyncio error is still present?
@florimondmanca thank you very much for your suggestion and the fast reply. I am aware of HTTPX's limit on the number of requests, but I added a custom limit because I didn't want all the images loaded into memory at the same time. Your lazy-loading idea is very good, but I left part of the _send_to_server function out of the snippet above. Since we do local caching, we need to load an image to decide whether to send it to the server. If an image is already in the local cache, we do not send it to the server (we do not call the _send_request function). If you want to see the complete code, it is here: https://github.com/biolab/orange3-imageanalytics/blob/2940bb04c1e3d8169c6dee08986377d51c42f460/orangecontrib/imageanalytics/server_embedder.py
I would really like to use the lazy loading you suggested, but I do not want to load images twice. I know the solution I implemented is not the best, but currently I do not have another idea.
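One possible alternative to the polling loop, offered only as a sketch: an `asyncio.Semaphore` can bound both how many images are held in memory and how many requests are in flight, while each image is still loaded just once. Everything here is hypothetical and stands in for the real code: `process_one`, `process_all`, `fake_load` and `fake_send` are made-up names, and no HTTPX call is involved.

```python
import asyncio


async def process_one(path, semaphore, load_image, send):
    # The image is loaded only after a slot is free, so at most
    # `max_parallel` images are in memory at any time.
    async with semaphore:
        image = load_image(path)
        if image is None:  # nothing to send for this path
            return None
        return await send(image)


async def process_all(paths, load_image, send, max_parallel=100):
    semaphore = asyncio.Semaphore(max_parallel)
    tasks = [process_one(p, semaphore, load_image, send) for p in paths]
    return await asyncio.gather(*tasks)


async def demo():
    in_flight = 0
    peak = 0

    def fake_load(path):
        # Assumption: None marks a path that needs no request.
        return None if path == "skip" else path.encode()

    async def fake_send(image):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for the network round trip
        in_flight -= 1
        return len(image)

    paths = ["a", "skip", "ccc"] * 5
    results = await process_all(paths, fake_load, fake_send, max_parallel=2)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the slot is released by the `async with` block even when `send` raises, there is no counter that can get stuck after a failed request.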
I see… If you are able to narrow down the situation in which the error occurs to a small reproducible example, I suggest you open a separate issue so that we can more easily track this other edge case. (Although it's the same error, this particular issue was related to an edge case on Python 3.6.) Thanks!
Yep, right now I am trying to put together the smallest standalone example that reproduces the error.
I was getting a similar error while trying to use the same AsyncClient to make multiple requests in parallel. Currently, this is not supported (see https://github.com/encode/httpx/pull/52). httpx.AsyncClient is NOT the same as aiohttp.ClientSession.
I was doing something like this:
import asyncio
import httpx

async def print_content_from_url(url, client):
    res = await client.get(url)
    bytes_res = await res.read()
    str_res = bytes_res.decode()
    print(str_res)

async def main():
    urls = ['https://iluxonchik.me', 'https://iluxonchik.github.io', 'https://google.com']*100
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*(print_content_from_url(url, client) for url in urls))

asyncio.run(main())
Running the code above results in the following exception:
RuntimeError: read() called while another coroutine is already waiting for incoming data
Changing the code to the following fixes the problem:
import asyncio
import httpx

async def print_content_from_url(url):
    # One client per request, so no connection is shared between coroutines.
    async with httpx.AsyncClient() as client:
        res = await client.get(url)
        bytes_res = await res.read()
        print(bytes_res)

async def main():
    urls = ['https://iluxonchik.me', 'https://iluxonchik.github.io', 'https://google.com']*100
    await asyncio.gather(*(print_content_from_url(url) for url in urls))

asyncio.run(main())
Again, keep a close eye on https://github.com/encode/httpx/pull/52. If you're reading this in the future (hello 2020 and beyond 👋), you might already have an alternative solution to use.
@iluxonchik Can we continue discussion on this in #527?
Also, I think you don't need to .read() the response and decode it yourself. You can just use res.text, which will take care of applying the appropriate decoders.