The message-reading loop async for msg in ws: raises the low-level concurrent.futures._base.CancelledError when the connection is closed unexpectedly.
Expected: a message of type aiohttp.http_websocket.WSMsgType.ERROR, or the loop stopping silently, or at least aiohttp.http_websocket.WebSocketError.
To reproduce, run the following two scripts, server.py and client.py, then stop client.py with Ctrl+C.
# server.py
import logging

from aiohttp import web

logger = logging.getLogger(__name__)


async def index(request):
    ws = web.WebSocketResponse()
    request.app['websockets'].add(ws)
    try:
        await ws.prepare(request)
        logger.debug('Connected')
        async for msg in ws:
            logger.info('Received: %r', msg.data)
    except Exception:
        logger.exception('Error')
    logger.debug('Disconnected')
    request.app['websockets'].discard(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close()
    app['websockets'].clear()


def main():
    logging.basicConfig(level=logging.DEBUG)
    app = web.Application()
    app['websockets'] = set()
    app.router.add_get('/', index)
    app.on_shutdown.append(on_shutdown)
    web.run_app(app, host='127.0.0.1', port=9000)


if __name__ == '__main__':
    main()
# client.py
import asyncio

import aiohttp


async def communicate(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.ws_connect('http://127.0.0.1:9000') as ws:
            while True:
                await ws.send_str('Hello')
                await asyncio.sleep(1, loop=loop)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(communicate(loop))


if __name__ == '__main__':
    main()
server.py output:
$ python server.py
DEBUG:asyncio:Using selector: EpollSelector
======== Running on http://127.0.0.1:9000 ========
(Press CTRL+C to quit)
DEBUG:__main__:Connected
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
ERROR:__main__:Error
Traceback (most recent call last):
  File "server.py", line 16, in index
    async for msg in ws:
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 343, in __anext__
    msg = yield from self.receive()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 273, in receive
    msg = yield from self._reader.read()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 627, in read
    return (yield from super().read())
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 509, in read
    yield from self._waiter
  File "/usr/lib64/python3.5/asyncio/futures.py", line 380, in __iter__
    yield self # This tells Task to wait for completion.
  File "/usr/lib64/python3.5/asyncio/tasks.py", line 304, in _wakeup
    future.result()
  File "/usr/lib64/python3.5/asyncio/futures.py", line 285, in result
    raise CancelledError
concurrent.futures._base.CancelledError
DEBUG:__main__:Disconnected
INFO:aiohttp.access:- - - [06/Jul/2017:11:41:25 +0000] "GET / HTTP/1.1" 101 0 "-" "Python/3.5 aiohttp/2.2.3"
OS: CentOS Linux 7
Linux kernel: 3.10.0-514.16.1.el7.x86_64
Python: 3.5.3
aiohttp: 2.2.3
Technically, aiohttp creates a task per client request.
On client disconnection, the system stops the task as soon as possible.
The only way to do that is to cancel the task. (Suppose the web handler is waiting for a response from a DB or another service: we want to cancel that wait too, without waiting for an explicit operation on the websocket connection.)
Task.cancel() works by throwing an asyncio.CancelledError exception into the task; the exception class is derived from the standard Exception (as of the Python 3.5 used here). This is asyncio behavior, nothing specific to aiohttp itself.
The only thing I could suggest is catching CancelledError in your handler explicitly:
try:
    ...
except asyncio.CancelledError:
    pass
except Exception as exc:
    log(exc)
Or you could simply avoid catching a type as broad as Exception.
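The cancellation mechanics described in this comment can be seen without aiohttp at all. The following is a minimal standalone sketch, assuming Python 3.7+ (for asyncio.run); the handler coroutine and the events list are illustrative names, and the long sleep merely stands in for a handler waiting on a DB or a websocket.

```python
import asyncio

events = []


async def handler():
    # Stands in for a web handler blocked on a slow DB call or a websocket read.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        events.append("cancelled at await")
        raise  # re-raise so the task is actually marked as cancelled
    finally:
        events.append("cleanup")


async def main():
    task = asyncio.ensure_future(handler())
    await asyncio.sleep(0)  # let the handler start and reach its await
    task.cancel()           # roughly what happens on client disconnect
    try:
        await task
    except asyncio.CancelledError:
        events.append("task finished as cancelled")
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(events, was_cancelled)
```

Note that since Python 3.8 asyncio.CancelledError derives from BaseException, so on those versions a broad except Exception no longer intercepts it; on the Python 3.5 from the report it does.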
I see two options:
1. Keep the current behavior: CancelledError is normal in the async world.
2. Catch CancelledError and return a closed message. I think this is the better solution for a websocket handler.
Maybe it's better to introduce a separate ConnectionClosed exception, in the same way as was done in the websockets library?
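To make the ConnectionClosed idea concrete, here is a rough sketch of what it could look like from the handler's point of view. Everything in it is hypothetical: ConnectionClosed and receive_messages are illustrative names, not aiohttp APIs, and an asyncio.Queue stands in for the websocket so that the example runs on its own (Python 3.7+ assumed).

```python
import asyncio


class ConnectionClosed(Exception):
    """Hypothetical exception; not part of aiohttp."""


async def receive_messages(queue):
    # Hypothetical wrapper around a message source.
    # An asyncio.Queue stands in for the websocket here.
    while True:
        try:
            msg = await queue.get()
        except asyncio.CancelledError:
            # Translate the low-level cancellation into a domain-level error.
            raise ConnectionClosed("peer went away") from None
        yield msg


async def handler(queue, log):
    try:
        async for msg in receive_messages(queue):
            log.append(msg)
    except ConnectionClosed:
        log.append("closed")


async def main():
    queue = asyncio.Queue()
    log = []
    task = asyncio.ensure_future(handler(queue, log))
    await queue.put("Hello")
    await asyncio.sleep(0)  # let the handler consume the message
    task.cancel()           # simulate the client disconnecting
    await task              # the handler returns normally in this sketch
    return log


log = asyncio.run(main())
print(log)
```

One trade-off of this design: converting CancelledError into another exception swallows the cancellation, so the task ends normally rather than as cancelled, which may or may not be what a framework wants.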
> Technically, aiohttp creates a task per client request.
> On client disconnection, the system stops the task as soon as possible.
> The only way to do that is to cancel the task. (Suppose the web handler is waiting for a response from a DB or another service: we want to cancel that wait too, without waiting for an explicit operation on the websocket connection.)
I think I have just been bitten by this behavior. I had some code like this:
import contextlib

from aiohttp import web


async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())
        async for msg in ws:
            pass  # do stuff
    await ws.close()
    return ws
After putting it in production I found that the exiting part of acquire_resource_3() would be silently skipped. More logging revealed that a CancelledError was being raised inside acquire_resource_3. Here's what I think happened:
1. The async for msg in ws loop exits, the AsyncExitStack starts to unwind, and the exiting part of acquire_resource_3 starts to execute and hits an await.
2. aiohttp cancels the handler task.
3. CancelledError is raised inside the handler at the current await, which is inside acquire_resource_3, so the remaining part of acquire_resource_3 is skipped.
4. acquire_resource_2 and acquire_resource_1 still execute normally, since from their perspective they are simply exiting an async context on an exception.

This is a really weird problem, particularly because of how it breaks the expectation that the exiting part of a context manager will always run. I basically had to shield all the async contexts from cancellation, like this:
import asyncio
import contextlib

from aiohttp import web


async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # Run the real work in a separate task that is shielded from cancellation.
    await asyncio.shield(asyncio.ensure_future(actually_do_stuff(ws)))
    return ws


async def actually_do_stuff(ws):
    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())
        async for msg in ws:
            pass  # do stuff
    await ws.close()
Is there a better way to do this?
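One possible alternative, offered only as a sketch and not as an aiohttp recommendation, is to shield the cleanup step inside each context manager rather than the whole handler. The example below first reproduces the skipped exit with plain asyncio and then shows the shielded variant. All names (release, plain_resource, shielded_resource, cancel_during_exit) are hypothetical, the sleeps stand in for real I/O, and Python 3.7+ is assumed.

```python
import asyncio
import contextlib

log = []


async def release(name):
    # Hypothetical slow cleanup step for one resource.
    log.append(f"release {name}: start")
    await asyncio.sleep(0.05)
    log.append(f"release {name}: done")


@contextlib.asynccontextmanager
async def plain_resource(name):
    try:
        yield
    finally:
        await release(name)  # a cancellation landing here aborts the cleanup


@contextlib.asynccontextmanager
async def shielded_resource(name):
    try:
        yield
    finally:
        cleanup = asyncio.ensure_future(release(name))
        try:
            await asyncio.shield(cleanup)
        except asyncio.CancelledError:
            # Cancelled mid-cleanup: let the cleanup finish, then propagate.
            # (A second cancellation during this await would still abort it.)
            await cleanup
            raise


async def handler(resource):
    async with contextlib.AsyncExitStack() as stack:
        await stack.enter_async_context(resource("1"))
        await stack.enter_async_context(resource("2"))
        # The message loop would run here; this sketch exits at once.


async def cancel_during_exit(resource):
    log.clear()
    task = asyncio.ensure_future(handler(resource))
    await asyncio.sleep(0.01)  # handler is now inside the exit of resource 2
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return list(log)


plain_log = asyncio.run(cancel_during_exit(plain_resource))
shielded_log = asyncio.run(cancel_during_exit(shielded_resource))
print(plain_log)
print(shielded_log)
```

In the plain run, the exit of resource 2 starts but never completes, while resource 1 is released normally. In the shielded run both releases complete and the handler task still ends up cancelled, so the message loop itself stays cancellable.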