Aiohttp: Websocket reading message loop raises low-level CancelledError when connection is closed unexpectedly

Created on 6 Jul 2017  ·  4 Comments  ·  Source: aio-libs/aiohttp

Actual behaviour

The message-reading loop (async for msg in ws:) raises the low-level concurrent.futures._base.CancelledError when the connection is closed unexpectedly.

Expected behaviour

Expected to get a message with type aiohttp.http_websocket.WSMsgType.ERROR, or for the loop to stop silently, or at least an aiohttp.http_websocket.WebSocketError.

Steps to reproduce

Run the following two scripts, server.py and client.py, then stop client.py with Ctrl+C.

server.py

import logging

from aiohttp import web


logger = logging.getLogger(__name__)


async def index(request):
    ws = web.WebSocketResponse()
    request.app['websockets'].add(ws)

    try:
        await ws.prepare(request)
        logger.debug('Connected')
        async for msg in ws:
            logger.info('Received: %r', msg.data)
    except Exception:
        logger.exception('Error')
    logger.debug('Disconnected')

    request.app['websockets'].discard(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close()
    app['websockets'].clear()


def main():
    logging.basicConfig(level=logging.DEBUG)

    app = web.Application()
    app['websockets'] = set()
    app.router.add_get('/', index)
    app.on_shutdown.append(on_shutdown)

    web.run_app(app, host='127.0.0.1', port=9000)


if __name__ == '__main__':
    main()

client.py

import asyncio

import aiohttp


async def communicate(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.ws_connect('http://127.0.0.1:9000') as ws:
            while True:
                await ws.send_str('Hello')
                await asyncio.sleep(1, loop=loop)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(communicate(loop))


if __name__ == '__main__':
    main()

Log output of server.py

$ python server.py 
DEBUG:asyncio:Using selector: EpollSelector
======== Running on http://127.0.0.1:9000 ========
(Press CTRL+C to quit)
DEBUG:__main__:Connected
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
ERROR:__main__:Error
Traceback (most recent call last):
  File "server.py", line 16, in index
    async for msg in ws:
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 343, in __anext__
    msg = yield from self.receive()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 273, in receive
    msg = yield from self._reader.read()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 627, in read
    return (yield from super().read())
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 509, in read
    yield from self._waiter
  File "/usr/lib64/python3.5/asyncio/futures.py", line 380, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib64/python3.5/asyncio/tasks.py", line 304, in _wakeup
    future.result()
  File "/usr/lib64/python3.5/asyncio/futures.py", line 285, in result
    raise CancelledError
concurrent.futures._base.CancelledError
DEBUG:__main__:Disconnected
INFO:aiohttp.access:- - - [06/Jul/2017:11:41:25 +0000] "GET / HTTP/1.1" 101 0 "-" "Python/3.5 aiohttp/2.2.3"

Your environment

OS: CentOS Linux 7
Linux kernel: 3.10.0-514.16.1.el7.x86_64
Python: 3.5.3
aiohttp: 2.2.3

All 4 comments

Technically, aiohttp creates a task per client request.
When the client disconnects, the system stops the task as soon as possible.
The only way to do that is to cancel the task (assume the web handler is waiting for a response from a DB or another service: we want to cancel that too, without waiting for an explicit operation on the connection to the websocket client).

Task.cancel() works by raising asyncio.CancelledError inside the task. In Python 3.7 and earlier this exception class derives from the standard Exception (since Python 3.8 it derives from BaseException instead). This is asyncio behavior, nothing specific to aiohttp itself.
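To make the mechanism concrete, here is a minimal sketch using only asyncio (no aiohttp). The names waits_forever and demo are made up for illustration; the long sleep stands in for a handler blocked on a websocket read or a database call.

```python
import asyncio


async def waits_forever(events):
    try:
        # Stand-in for a handler blocked on a websocket read or a DB call.
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # The cancellation surfaces here, at the await the task was suspended on.
        events.append("cancelled at await")
        raise  # re-raise so the task is really marked as cancelled


async def demo():
    events = []
    task = asyncio.ensure_future(waits_forever(events))
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()           # per the comment above, done on client disconnect
    try:
        await task
    except asyncio.CancelledError:
        events.append("task finished as cancelled")
    return events, task.cancelled()


events, cancelled = asyncio.run(demo())
print(events, cancelled)
```

The exception is delivered at whatever await the task happens to be suspended on, which is why it shows up inside the async for loop in the traceback above.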

The only thing I could suggest is catching CancelledError in your handler explicitly:

try:
    ...
except asyncio.CancelledError:
    pass
except Exception as exc:
    log(exc)

Or you could simply avoid catching a type as broad as Exception.

I see two options:

  • do not change anything: CancelledError is normal in the async world.
  • catch CancelledError and return a closed message. I think this is the better solution for a websocket handler.
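A rough sketch of what the second option could look like, written against plain asyncio rather than aiohttp's internals. Message, CLOSED, receive and read_loop are hypothetical stand-ins invented for this example, not aiohttp's API, and an asyncio.Queue plays the role of the websocket reader.

```python
import asyncio
from collections import namedtuple

# Hypothetical stand-ins for the library's message types; not aiohttp's API.
Message = namedtuple("Message", ["type", "data"])
CLOSED = Message(type="closed", data=None)


async def receive(queue):
    """Return the next message, or CLOSED if the wait is cancelled."""
    try:
        return await queue.get()
    except asyncio.CancelledError:
        # Translate the cancellation into an ordinary "closed" message.
        return CLOSED


async def read_loop(queue, received):
    while True:
        msg = await receive(queue)
        if msg.type == "closed":
            break
        received.append(msg.data)
    return "loop ended normally"


async def demo():
    queue = asyncio.Queue()
    received = []
    task = asyncio.ensure_future(read_loop(queue, received))
    await queue.put(Message("text", "Hello"))
    await asyncio.sleep(0)  # let the loop consume the message and block again
    task.cancel()           # simulate the disconnect-triggered cancellation
    result = await task     # no CancelledError: receive() swallowed it
    return received, result


received, result = asyncio.run(demo())
print(received, result)
```

The trade-off in this sketch is that the cancellation is swallowed: whoever cancelled the task no longer sees it finish as cancelled, and any later await in the handler would run as if nothing had happened.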

Maybe it's better to introduce a separate ConnectionClosed exception, in the same way as was done in websockets library?

Technically, aiohttp creates a task per client request.
When the client disconnects, the system stops the task as soon as possible.
The only way to do that is to cancel the task (assume the web handler is waiting for a response from a DB or another service: we want to cancel that too, without waiting for an explicit operation on the connection to the websocket client).

I think I have just been bitten by this behavior. I had some code like this:

import contextlib

from aiohttp import web


async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())

        async for msg in ws:
            pass  # do stuff

    await ws.close()

    return ws

After putting it in production I found that the exiting part of acquire_resource_3() would be silently skipped. More logging revealed that a CancelledError was being raised inside acquire_resource_3. Here's what I think happened:

  • Client closes the WebSocket
  • async for msg in ws loop exits, the AsyncExitStack starts to unwind, the exiting part of acquire_resource_3 starts to execute, hits an await
  • aiohttp cancels the handler task
  • CancelledError is raised inside handler at the current await, which is inside acquire_resource_3, therefore the remaining part of acquire_resource_3 is skipped
  • The exiting part of acquire_resource_2 and acquire_resource_1 still executes normally, since from their perspective they are simply exiting an async context on an exception
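The sequence above can be reproduced without aiohttp. The sketch below is an assumption-laden model of it: resource is a made-up async context manager whose exit optionally awaits something slow, and demo cancels the handler task once the body of the async with has finished, which is roughly what the steps describe.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def resource(name, events, slow_exit=False):
    events.append("enter " + name)
    try:
        yield
    finally:
        if slow_exit:
            events.append("exit " + name + " started")
            await asyncio.sleep(3600)  # cleanup that has to await something
        events.append("exit " + name + " finished")


async def handler(events, leaving):
    async with contextlib.AsyncExitStack() as stack:
        await stack.enter_async_context(resource("r1", events))
        await stack.enter_async_context(resource("r2", events))
        await stack.enter_async_context(resource("r3", events, slow_exit=True))
        leaving.set()  # the message loop would have ended here


async def demo():
    events = []
    leaving = asyncio.Event()
    task = asyncio.ensure_future(handler(events, leaving))
    await leaving.wait()  # the handler body is done, the stack is unwinding
    task.cancel()         # models the server cancelling the handler task
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return events


events = asyncio.run(demo())
print(events)
```

In this model the log contains "exit r3 started" but never "exit r3 finished", while r2 and r1 still run their exit code because they only see an exception passing through.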

This is a really weird problem, particularly because it breaks the expectation that the exiting part of a context manager will always run. I basically had to shield all the async contexts from cancellation, like this:

import asyncio
import contextlib

from aiohttp import web


async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Run the real work in a separate task that handler cancellation cannot reach.
    await asyncio.shield(asyncio.ensure_future(actually_do_stuff(ws)))

    return ws

async def actually_do_stuff(ws):
    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())

        async for msg in ws:
            pass  # do stuff

    await ws.close()

Is there a better way to do this?
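One possible alternative, offered as a sketch rather than a confirmed recommendation, is to shield only the cleanup step instead of the whole handler. Everything here is hypothetical naming (run_to_completion, release, resource, _pending_cleanups); the idea is that each exit runs in its own task, a module-level set keeps a strong reference to it, and asyncio.shield stops the handler's cancellation from reaching it.

```python
import asyncio
import contextlib

# Strong references, so shielded cleanup tasks are not garbage-collected early.
_pending_cleanups = set()


async def run_to_completion(coro):
    """Await coro in its own task so cancelling the caller does not interrupt it."""
    task = asyncio.ensure_future(coro)
    _pending_cleanups.add(task)
    task.add_done_callback(_pending_cleanups.discard)
    await asyncio.shield(task)


async def release(name, events):
    events.append("exit " + name + " started")
    await asyncio.sleep(0.01)  # stand-in for real asynchronous cleanup
    events.append("exit " + name + " finished")


@contextlib.asynccontextmanager
async def resource(name, events):
    events.append("enter " + name)
    try:
        yield
    finally:
        await run_to_completion(release(name, events))


async def handler(events, leaving):
    async with resource("r3", events):
        leaving.set()  # the message loop would have ended here


async def demo():
    events = []
    leaving = asyncio.Event()
    task = asyncio.ensure_future(handler(events, leaving))
    await leaving.wait()
    task.cancel()  # models the server cancelling the handler task
    with contextlib.suppress(asyncio.CancelledError):
        await task
    # The handler is gone, but the shielded cleanup may still be running.
    if _pending_cleanups:
        await asyncio.wait(_pending_cleanups)
    return events


events = asyncio.run(demo())
print(events)
```

The handler still ends as cancelled, but the cleanup finishes on its own. The cost is that the cleanup then runs detached from the handler: errors it raises after the cancellation are not propagated to the handler, and the application has to stay alive long enough for the pending tasks to complete.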
