I can't know whether a connection is available?
websocket connection is closing.
socket.send() raised exception.
async def worker(url, msg_queue, loop, on_message=None):
async with aiohttp.ClientSession(loop=loop) as session,\
session.ws_connect(url) as ws:
close_flag = False
async def bg_sending():
nonlocal close_flag
try:
while True:
if close_flag is True:
break
msg = await msg_queue.get()
if isinstance(msg, dict):
await ws.send_json(msg)
elif msg == "close":
print("WS: GOT SELF CLOSE")
await ws.close()
close_flag = True
break
except Exception as e:
close_flag = True
raise e
async def bg_receive():
nonlocal close_flag
try:
async for msg in ws:
if close_flag is True:
break
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
close_flag = True
print("WS: ACCEPT CLOSE")
_thread.interrupt_main() #kill is not work here
break
else:
if on_message is not None:
res = on_message(msg.data)
if res is not None:
ws.send_str(res)
elif msg.type == aiohttp.WSMsgType.CLOSED:
print("WS: CONNECTION IS CLOSED")
close_flag = True
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"WS: ERROR:{msg}")
close_flag = True
raise Exception(str(msg))
except Exception as e:
close_flag = True
raise e
bg_send_task = asyncio.ensure_future(bg_sending(), loop=loop)
bg_receive_task = asyncio.ensure_future(bg_receive(), loop=loop)
while close_flag is False:
await asyncio.sleep(1)
if not bg_send_task.done():
bg_send_task.cancel()
if not bg_receive_task.done():
bg_receive_task.cancel()
if not bg_receive_task.cancelled() and bg_receive_task.exception() is not None:
print("WS: RECEIVE EXCEPTION")
raise bg_receive_task.exception()
if not bg_send_task.cancelled() and bg_send_task.exception() is not None:
print("WS: SEND EXCEPTION")
raise bg_send_task.exception()
If there is a error, an Exception should be raise.
It is a warning? not Exception raised
wait a long time, and websocket send_json failed with this warning
Ubuntu 16.04
Anaconda 4.3.1
Python 3.6.1
aiohttp 2.0.4
Without looking at the example in detail I suspect you're getting CancelledError which aiohttp is then catching silently.
What's weird and more problematic for me is that (perhaps because it's CancelledError or perhaps because of the way it's raised) any awaited coroutine after it's raised gets cancelled too. This is causing me to leak connections from the asyncpg connection pool, see https://github.com/MagicStack/asyncpg/issues/97.
If you put
try:
...
except Exception as e:
print(e.__class__.__name__, e)
raise
Around your code you'll see the error.
maybe we beed to add signal for disconnection or something
That would be useful, this confused me for a while and has required a fix in asyncpg.
I re-read issue, i need more information. what do you get? warning? error?
I can not repreduce this bug now, I've made some change, add ws.closed test
@fafhrd91 Now I just test weather the connection is closed silently, I raised my own Exception in this case.
The connection is always exit with error code 258, I've found nothing about it.
I got this issue as same. Basically when the remote connection is closing, the async for msg in ws will end silently as if you have exhausted the iterable ws. Comparing to another python websocket library "websockets", the async for msg in ws in websockets will raise an ConnectionClosed error if the remote is closing.
In my case, most causes is closing action (cancel request) by client, raised asyncio.CancelledError Exception. You can catch exception like this:
try:
async for msgobj in ws:
....
except asyncio.CancelledError:
print(f"the websocket({ws}) cancelled")
finally:
await ws.close()
Actually I’ve already caught the CancelledError outside the for loop.
On Mar 17, 2019, at 9:11 PM, Michael notifications@github.com wrote:
In my case, most causes is closing action (cancel request) by client, raised asyncio.CancelledError Exception. You can catch exception like this:
try:
async for msgobj in ws:
....
except asyncio.CancelledError:
printf(f"the websocket({ws}) cancelled")
finally:
await conn.close('finally')
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub https://github.com/aio-libs/aiohttp/issues/1768#issuecomment-473763852, or mute the thread https://github.com/notifications/unsubscribe-auth/AA88ztiGsfmLAms2pD5okSUdu_K96_DYks5vXxIHgaJpZM4MspxH.
I have the code like this:
class A:
async def _start(self):
async with self.session.ws_connect("...") as ws:
async for msg in ws:
pass
while True:
try:
await self._start()
logger.warning("Loop exit abnormally")
except CancelledError:
logger.warning(f"WS exiting because of cancellation")
break
except Exception as e:
logger.critical(f"WS unrecoverable exception: {type(e)}: {e}")
traceback.print_exception(*sys.exc_info())
break
And in log it always complains "Loop exit abnormally"
Maybe you can try to check more type of wsobj, such as:
try:
async for msgobj in ws:
if msgobj.type == aiohttp.WSMsgType.TEXT:
if msgobj.data:
await conn.handle_msg_here(..)
elif msgobj.type == aiohttp.WSMsgType.PING:
ws.pong()
elif msgobj.type == aiohttp.WSMsgType.CLOSED:
logging.warn(f"ws: Closed the websocket {ws} ")
break
elif msgobj.type == aiohttp.WSMsgType.ERROR:
break
except asyncio.CancelledError:
print(f"the websocket({ws}) cancelled")
finally:
await ws.close()
Maybe you can try to check more type of wsobj, such as:
try: async for msgobj in ws: if msgobj.type == aiohttp.WSMsgType.TEXT: if msgobj.data: await conn.handle_msg_here(..) elif msgobj.type == aiohttp.WSMsgType.PING: ws.pong() elif msgobj.type == aiohttp.WSMsgType.CLOSED: logging.warn(f"ws: Closed the websocket {ws} ") break elif msgobj.type == aiohttp.WSMsgType.ERROR: break except asyncio.CancelledError: print(f"the websocket({ws}) cancelled") finally: await ws.close()
Interestingly, I have something like
async for msg in ws:
if msg.type != WSMsgType.TEXT:
logger.warning(f"Non text ws message: {msg.data}")
continue
But I never see anything related in the log which suggests the for loop exits silently.
By looking into the source code, I found this in aiohttp
class ClientWebSocketResponse:
...
async def __anext__(self) -> WSMessage:
msg = await self.receive()
if msg.type in (WSMsgType.CLOSE,
WSMsgType.CLOSING,
WSMsgType.CLOSED):
raise StopAsyncIteration # NOQA
return msg
https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_ws.py#L295
I think this perfectly solved this problem: aiohttp WON'T inform you about the websocket closing if you are using async for.
The problem for me (and probably in this case) is that you have another thread (bg_sending) that is trying to send over the websocket after the bg_receive function.
The websocket connection is closing. error is actually coming from https://github.com/aio-libs/aiohttp/blob/master/aiohttp/http_websocket.py#L555 - this is happening from bg_sending, which explains why you aren't seeing the exception from within the async for in bg_receive.
It _does_ inform you of the websocket connection closing by virtue of exiting the loop.
I know this is years late and I doubt it's hopeful to the OP but it might be useful to the debugging efforts of others as the other comments on this thread have been for me! :)
Your bg_sending thread MIGHT be throwing the exception on send(), but you're only logging it if not bg_send_task.cancelled(), AND bg_send_task.exception() is not None.
The exception might not be None, however just a few lines before you call bg_send_task.cancel() and cancel it. So this condition will never be true and nothing will be logged.
I think the exception to catch is concurrent.futures.CancelledError, which is different from asyncio.CancelledError.
Most helpful comment
By looking into the source code, I found this in aiohttp
https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_ws.py#L295
I think this perfectly solved this problem: aiohttp WON'T inform you about the websocket closing if you are using
async for.