Hi, this is a duplicate of this question.
I'm trying to test new channels 2.0 with pytest-asyncio (0.8.0). If I place different assertions in the same function like:
import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer
@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    """Open a test WebSocket connection to ``MyConsumer`` and return it.

    NOTE(review): the tests await this helper directly, so the pytest marks
    above are presumably redundant -- confirm before relying on them.

    Returns:
        The connected ``WebsocketCommunicator``.

    Raises:
        AssertionError: if ``connect()`` reports the connection was refused.
    """
    communicator = WebsocketCommunicator(MyConsumer, 'foo')
    # connect() yields an (accepted, subprotocol) pair; only the flag is used.
    accepted, _subprotocol = await communicator.connect()
    assert accepted
    return communicator
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    """Run both scenarios back to back inside one test function.

    For each payload a fresh connection is opened, the payload is sent as
    JSON, ``receive_from()`` is expected to time out, and finally a
    ``websocket.disconnect`` event is fed to the consumer.
    """
    for payload in ({"message": 'abc'}, {"message": 1}):
        communicator = await setup_database_and_websocket()
        await communicator.send_json_to(payload)
        # No reply is expected for these messages, so the read must time out.
        with pytest.raises(TimeoutError):
            await communicator.receive_from()
        await communicator.send_input(
            {"type": "websocket.disconnect", "code": 1000}
        )
then I'm not getting an error. But if I separate test cases like:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    """Send a string message and expect no reply before the timeout."""
    ws = await setup_database_and_websocket()
    payload = {"message": 'abc'}
    await ws.send_json_to(payload)
    # The read is expected to time out rather than return a frame.
    with pytest.raises(TimeoutError):
        await ws.receive_from()
    # Tell the consumer that the client has gone away.
    disconnect_event = {"type": "websocket.disconnect", "code": 1000}
    await ws.send_input(disconnect_event)
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    """Send an integer message and expect no reply before the timeout."""
    ws = await setup_database_and_websocket()
    payload = {"message": 1}
    await ws.send_json_to(payload)
    # The read is expected to time out rather than return a frame.
    with pytest.raises(TimeoutError):
        await ws.receive_from()
    # Tell the consumer that the client has gone away.
    disconnect_event = {"type": "websocket.disconnect", "code": 1000}
    await ws.send_input(disconnect_event)
then I'm getting the following error upon the second receive_from call:
with pytest.raises(TimeoutError):
> await communicator.receive_from()
someapp/tests/test_consumers_async.py:106:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError
Also if I do (as in https://channels.readthedocs.io/en/latest/topics/testing.html):
await communicator.disconnect()
instead of:
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
then the following error is raised:
> await communicator.disconnect()
someapp/tests/test_consumers_async.py:96:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}
def websocket_disconnect(self, message):
"""
Called when a WebSocket connection is closed. Base level so you don't
need to call super() all the time.
"""
# TODO: group leaving
> self.disconnect(message["code"])
E TypeError: disconnect() takes 1 positional argument but 2 were given
What should I do to separate those test cases in the respective individual test functions?
What version of all the Channels packages (daphne, channels, asgiref, channels_redis) are you running? Event loop is closed should have been fixed by channels_redis 2.0.3.
As for the disconnect error, it looks like your disconnect function does not take a code argument - its signature should be def disconnect(self, code):
I'm with channels cloned from github repo (master) (pip install git+git://github.com/django/channels.git@master).
$pip freeze
aioredis==1.1.0
asgiref==2.1.5
async-generator==1.9
async-timeout==2.0.0
autobahn==17.10.1
Automat==0.6.0
channels==2.0.2
channels-redis==2.0.3
daphne==2.0.3
Django==2.0.2
django-redis==4.8.0
hiredis==0.2.0
msgpack==0.5.4
pytest==3.4.0
pytest-asyncio==0.8.0
pytest-django==3.1.2
redis==2.10.6
The rest of redis packages are installed for the cache purpose.
This error with disconnect is strange as generic/websocket.py has correct signature:
def websocket_disconnect(self, message):
"""
Called when a WebSocket connection is closed. Base level so you don't
need to call super() all the time.
"""
# TODO: group leaving
self.disconnect(message["code"])
raise StopConsumer()
def disconnect(self, code):
"""
Called when a WebSocket connection is closed.
"""
pass
That is very strange. I'd try sticking a debugger in where it fails and making sure the method signature is absolutely, definitely, what you think it is, and there's not bytecode caching or something going on.
As for the Event loop is closed problem, I won't be able to look into that until I can get a small failing example to reproduce (there's no way I can go by the traceback on this one). I'll leave this ticket open for a bit if you want to try and do that.
My bad for disconnect error, I forgot to add the positional argument in subclassed method:
from channels.generic.websocket import WebsocketConsumer
class MyConsumer(WebsocketConsumer):
#
def disconnect(self, code):
AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)
As for Event loop is closed: I'm still getting that error and am still searching for a fix; I'll post here whatever happens with that.
So I've updated some channels packages (channels is on the latest master):
daphne==2.0.4
channels-redis==2.1.0
msgpack==0.5.5
and I refactored my test modules. As before, invocation of single test function always passes.
And for the whole modules calls, things are going like this:
If there are 4 or fewer test functions then all of them would pass.
If there are five of them then _sometimes_ the last of them fails with TimeoutError and with the old "Event loop is closed".
If there are more than 5 test functions then exactly 2 of them fail. If I comment out one of the failing ones then some other 2 fail. If I wrap one of the failing tests in with pytest.raises(TimeoutError): then it fails with "Error not raised" and all of the other test functions pass.
If I call the whole test suite with all the modules then one third of websocket tests fail.
The output of the case when 2 tests failed:
(my_env) kradem@i5-4570:~/dev/some_repo/project$ pt someapp/tests/test_async_sample_consumer.py
============================================== test session starts ==============================================
platform linux -- Python 3.6.4, pytest-3.4.1, py-1.5.2, pluggy-0.6.0 -- /home/kradem/.virtualenvs/my_env/bin/python3.6
cachedir: .pytest_cache
metadata: {'Python': '3.6.4', 'Platform': 'Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial', 'Packages': {'pytest': '3.4.1', 'py': '1.5.2', 'pluggy': '0.6.0'}, 'Plugins': {'xdist': '1.22.1', 'ordering': '0.5', 'metadata': '1.6.0', 'forked': '0.2', 'django': '3.1.2', 'cov': '2.5.1', 'asyncio': '0.8.0'}, 'JAVA_HOME': '/usr/lib/jvm/java-8-oracle'}
Django settings: project.settings.development (from environment variable)
rootdir: /home/kradem/dev/some_repo/project, inifile:
plugins: xdist-1.22.1, ordering-0.5, metadata-1.6.0, forked-0.2, django-3.1.2, cov-2.5.1, asyncio-0.8.0
collected 13 items
someapp/tests/test_async_sample_consumer.py::test_sample_1 PASSED [ 7%]
someapp/tests/test_async_sample_consumer.py::test_sample_2 PASSED [ 15%]
someapp/tests/test_async_sample_consumer.py::test_sample_3 PASSED [ 23%]
someapp/tests/test_async_sample_consumer.py::test_sample_4 PASSED [ 30%]
someapp/tests/test_async_sample_consumer.py::test_sample_5 PASSED [ 38%]
someapp/tests/test_async_sample_consumer.py::test_sample_6 PASSED [ 46%]
someapp/tests/test_async_sample_consumer.py::test_sample_7 PASSED [ 53%]
someapp/tests/test_async_sample_consumer.py::test_sample_8 FAILED [ 61%]
someapp/tests/test_async_sample_consumer.py::test_sample_9 Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:169> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d207d5a68>()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x7f7d22ff3200>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 173, in receive
task.cancel()
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive_loop() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:187> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive_loop at 0x7f7d22ff32b0>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 187, in receive_loop
real_channel, message = await self.receive_single(general_channel)
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 236, in receive_single
return channel, message
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 407, in __aexit__
self.conn.close()
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/commands/__init__.py", line 54, in close
self._pool_or_conn.close()
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 352, in close
self._do_close(ConnectionForcedCloseError())
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
self._writer.transport.close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d207d5b28>()]> cb=[Future.set_result()]>
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncConsumer.__call__() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object AsyncConsumer.__call__ at 0x7f7d24103990>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py", line 53, in await_many_dispatch
task.cancel()
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/lib/python3.6/asyncio/queues.py:167> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
FAILED [ 69%]
someapp/tests/test_async_sample_consumer.py::test_sample_10 PASSED [ 76%]
someapp/tests/test_async_sample_consumer.py::test_sample_11 PASSED [ 84%]
someapp/tests/test_async_sample_consumer.py::test_sample_12 PASSED [ 92%]
someapp/tests/test_async_sample_consumer.py::test_sample_13 PASSED [100%]
=================================================== FAILURES ====================================================
________________________ test_sample_8 ________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f7d20726e10>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f7d20700390 maxsize=0 tasks=1>
@coroutine
def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> yield from getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.6/asyncio/queues.py:167: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_sample_8():
communicator = await setup_database_and_sample_websocket()
sent = {"sample": "word"}
await communicator.send_json_to(sent)
> response = await communicator.receive_json_from()
someapp/tests/test_async_sample_consumer.py:144:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: in receive_output
return await self.output_queue.get()
/usr/lib/python3.6/asyncio/coroutines.py:212: in coro
res = func(*args, **kw)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7f7d2070f390>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type):
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
____________________________ test_sample_9 _____________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f7d205b4128>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f7d205b4470 maxsize=0 tasks=1>
@coroutine
def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> yield from getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.6/asyncio/queues.py:167: CancelledError
During handling of the above exception, another exception occurred:
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f7d205b4128>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (<async_timeout.timeout object at 0x7f7d207006d8>, <class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7f7d20b79608>)
kw = {}
@functools.wraps(func)
def coro(*args, **kw):
> res = func(*args, **kw)
/usr/lib/python3.6/asyncio/coroutines.py:212:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7f7d207006d8>
exc_type = <class 'concurrent.futures._base.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7f7d20b79608>
@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
> self._do_exit(exc_type)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7f7d207006d8>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type):
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_sample_9():
communicator = await setup_database_and_sample_websocket()
sent = {"sample": ((0, 0), '', '')}
await communicator.send_json_to(sent)
> response = await communicator.receive_json_from()
someapp/tests/test_async_sample_consumer.py:155:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py:47: in await_many_dispatch
result = task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:167: in receive
task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:186: in receive_loop
async with self.check_receive_lock():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <channels_redis.core.RedisChannelLayer object at 0x7f7d230b1f28>
def check_receive_lock(self):
"""
Returns the receive lock, doing current-loop checking.
"""
loop = asyncio.get_event_loop()
if self.receive_lock_loop is None:
# Lock was not yet populated. Populate it!
self.receive_lock_loop = loop
self.receive_lock = asyncio.Lock()
elif self.receive_lock_loop != loop:
# See if the lock is locked
if self.receive_lock.locked():
> raise RuntimeError("Two event loops are trying to receive() on one channel layer at once!")
E RuntimeError: Two event loops are trying to receive() on one channel layer at once!
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:204: RuntimeError
====================================== 2 failed, 11 passed in 3.73 seconds ======================================
(my_env) kradem@i5-4570:~/dev/some_repo/project$
@kradem I don't think this is your issue, but why have you decorated the setup_database_and_websocket function? It is just called with a plain old await from all of your other test cases.
@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
Yep, I left that in after multiple refactorings. I removed those decorators but, as you predicted, it's not the issue here. Thanks for pointing that out.
hmm, could you share a gist.github.com with it all in one file?
It's a quite complex testing as database records have to be present to test the websockets code. I kind of tried to strip it down before I posted here but I gave up. I'll try with Mock and post here afterwards...
P.S. but I don't think it'll throw with Mock. :(
OK, you're interacting with the db. Is your consumer a subclass of Async or Sync? Is it possible that somewhere you are calling a db function that is not wrapped in database_sync_to_async?
It's a subclass of sync WebsocketConsumer and all calls are made with the help of AsyncToSync wrapper. In tests all db functions (two of them and they are mutually exclusive) are wrapped to database_sync_to_async.
def connect(self):
self.accept()
AsyncToSync(self.channel_layer.group_add)(
self.group_name, self.channel_name)
#
@property
def group_name(self):
return 'foo'
#
def receive_sample(self):
#
AsyncToSync(self.channel_layer.group_send)(
self.group_name,
{
"type": "client.send",
'text': json.dumps({
"sample": sample,
"sometype": 1,
})},
)
def client_send(self, event):
self.send(text_data=event["text"])
hmm where is your model code? queries/eg that hit the db.
Your sync methods here will be run inside sync_to_async and not inside database_sync_to_async, but this should not cause the errors you are getting.
Above I stripped that down, the actual test code is like:
def get_sample():
    """Return an existing ``Sample`` row, or ``None`` if there are none.

    The earlier version wrapped ``Sample.objects.all()`` in
    ``except Sample.DoesNotExist``, but neither ``.all()`` nor indexing a
    non-empty queryset raises that exception, so the handler was dead code
    and the empty case returned ``None`` only by falling off the end.

    Returns:
        A ``Sample`` instance, or ``None`` when the table is empty.
    """
    # first() returns None for an empty queryset instead of raising; on an
    # unordered queryset it orders by primary key.
    return Sample.objects.first()
def create_sample():
    """Create a fresh user and a ``Sample`` bound to that user's profile.

    Returns:
        The newly created ``Sample`` instance.
    """
    # A random UUID doubles as the username and the e-mail local part.
    unique = str(uuid.uuid4())
    owner = User.objects.create_user(
        username=unique,
        email='{}@test.com'.format(unique),
    )
    return Sample.objects.create(
        description='ws person',
        profile=owner.profile,
        data=TEST_DATA,
    )
async def setup_database_and_sample_websocket():
    """Ensure a ``Sample`` exists, then connect a communicator for it.

    The sample is looked up (and created when missing) through
    ``database_sync_to_async``.  Its owner is placed in the connection
    scope together with an empty session before connecting.

    Returns:
        The connected ``WebsocketCommunicator``.

    Raises:
        AssertionError: if ``connect()`` reports the connection was refused.
    """
    fetch_sample = database_sync_to_async(get_sample)
    sample = await fetch_sample()
    if sample is None:
        make_sample = database_sync_to_async(create_sample)
        sample = await make_sample()

    ws = WebsocketCommunicator(MyConsumer, '/sample/{}/'.format(sample.id))
    # Populate the scope entries by hand before connecting.
    ws.scope['user'] = sample.profile.user
    ws.scope['session'] = {}

    accepted, _subprotocol = await ws.connect()
    assert accepted
    return ws
are you leaving groups? if you're subscribed to them.
in the disconnect method
Yes:
def disconnect(self, code):
AsyncToSync(self.channel_layer.group_discard)(
self.group_name, self.channel_name)
can you try to use channels.layers.InMemoryChannelLayer if you haven't already.
What to put in the configuration? With:
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
"CONFIG": {
"expiry": 100500,
},
"TEST_CONFIG": {
"expiry": 100500,
},
},
}
or with:
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
},
}
a single test is frozen. Do I need an additional code change besides that settings value?
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
},
}
or
@override_settings(CHANNEL_LAYERS={
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
},
})
@pytest.....
async def test....
should be good enough.
I'm afraid I can't get my project to work with it. Settings file change makes page stuck on HANDSHAKING (manual testing), just like setting override decorator in tests (it just freezes, very possibly on the same handshaking process).
P.S. I had deleted all the .pyc files and __pycache__ folders.
that is interesting, this is with the latest master of this repo?
Yes, last time updated early today.
P.S. console says: "WebSocket connection to 'ws://127.0.0.1:8081/sample/1/' failed: WebSocket is closed before the connection is established."
you may have found multiple bugs ;)
Can you give a rough overview of this HANDSHAKING process and how you are using the layers within it?
The following js code (chat.js) is from one of the apps:
var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
var ws_path = ws_scheme + '://' + window.location.host + window.location.pathname;
console.log("Connecting to " + ws_path);
var webSocketBridge = new channels.WebSocketBridge();
webSocketBridge.connect(ws_path);
webSocketBridge.listen(function(message) {
var which = 'row-other';
var row = '<div class="row col s12 ' + which + '">' +
'<span class="time" data-time="data.time">' + locTime(message.time) + '</span>' +
'<span class="body z-depth-2">' + message.message + '</span>' +
'</div>';
messages.prepend(row);
});
And at the end of template:
<script src="{% static 'channels/js/websocketbridge.js' %}"></script>
<script src="{% static 'js/chat.js' %}"></script>
Project routing file:
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
url("^sample/(?P<wspath>.+)/$", ChatConsumer),
])
),
})
P.S. consumer's code is in this thread above...
what methods have you overridden in your ChatConsumer
STOP THE PRESS!! I hadn't upgraded to master today, I did it just recently and channels.layers.InMemoryChannelLayer is working as expected. Still, all errors have remained the same.
Still, all errors have remained the same.
SILLY ME!! They are all working with InMemoryChannelLayer.
@hishnash THANK YOU, THANK YOU, THANK YOU!! It's something with redis...
or something with how Redis is talking to channels...
I already mentioned I have installed django-redis (4.8.0) and redis (2.10.6) for the cache purpose. I'll try to remove the cache backend and test with redis again.
I already mentioned I have installed django-redis (4.8.0) and redis (2.10.6) for the cache purpose. I'll try to remove the cache backend and test with redis again.
I switched the cache to memcache and reverted the channel layer to redis, but all the errors remain the same. So the installed django-redis and redis packages aren't the issue here.
I'll switch to @override_settings and channels.layers.InMemoryChannelLayer for all my websockets tests.
Let me know if I may do something more with this; I'll surely try to revert back to redis in tests after every update of channels and/or redis related packages.
Well, the actual bug here is waaaay back and is this: RuntimeError: Two event loops are trying to receive() on one channel layer at once!
The question is, why. I presume what is happening is a consumer is still living on in the background (and trying to receive) as the next test is coming around.
Do all your tests that use the Communicator end with WebsocketCommunicator.disconnect()?
Do all your tests that use the Communicator end with WebsocketCommunicator.disconnect()?
Yep, the last line in every async pytest function is:
await communicator.disconnect()
Alright, then it may be an issue that the disconnect is not correctly waiting for the coroutines of the test consumers to exit.
Having a simple set of steps to reproduce is going to be the thing that makes this bug fixable - is there any subset of your tests you know fails if run one after the other and can simplify down?
I've just upgraded to latest master, aioredis and msgpack.
If the following is a test module:
def get_sample():
    """Return one existing ``Sample`` row, or ``None`` if there are none.

    Returns:
        A ``Sample`` instance when at least one row exists, otherwise
        ``None``.
    """
    # ``QuerySet.first()`` yields ``None`` on an empty queryset instead of
    # raising, so no ``DoesNotExist`` handling is required (``all()`` and
    # indexing never raise that exception anyway). It also fetches a single
    # row rather than evaluating the whole table just to test truthiness.
    # NOTE: on an unordered queryset ``first()`` orders by primary key.
    return Sample.objects.first()
def create_sample():
    """Create a new user and a ``Sample`` attached to that user's profile.

    The username is a freshly generated UUID4 string and the e-mail address
    is derived from it, so each call creates a distinct user.

    Returns:
        The newly created ``Sample`` instance.
    """
    unique_name = str(uuid.uuid4())
    address = '{}@test.com'.format(unique_name)
    owner = User.objects.create_user(username=unique_name, email=address)
    return Sample.objects.create(
        description='ws person',
        profile=owner.profile,
        data=TEST_DATA,
    )
async def setup_database_and_sample_websocket():
    """Connect a ``WebsocketCommunicator`` to ``DetailsConsumer`` for a sample.

    Looks up a sample via ``get_sample`` (creating one with ``create_sample``
    when none is returned), builds a communicator for ``/sample/<id>/``,
    populates ``user`` and ``session`` in its scope, and connects.

    Returns:
        The connected communicator.

    Raises:
        AssertionError: if the connection was not accepted.
    """
    target = await database_sync_to_async(get_sample)()
    if target is None:
        # Nothing in the database yet: create the fixture on demand.
        target = await database_sync_to_async(create_sample)()

    ws = WebsocketCommunicator(DetailsConsumer, '/sample/{}/'.format(target.id))
    ws.scope['user'] = target.profile.user
    ws.scope['session'] = {}

    accepted, _subprotocol = await ws.connect()
    assert accepted
    return ws
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input():
    """Send a JSON chat message and check the reply's message and ordinal."""
    ws = await setup_database_and_sample_websocket()

    outgoing = {'message': 'sent text', 'ordinal': '1'}
    await ws.send_json_to(outgoing)

    reply = await ws.receive_json_from()
    assert reply['message'] == 'sent text'
    assert reply['ordinal'] == '1'

    await ws.disconnect()
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input2():
    """Send a second JSON chat message and check the reply's message and ordinal."""
    ws = await setup_database_and_sample_websocket()

    outgoing = {'message': 'sent text2', 'ordinal': '1'}
    await ws.send_json_to(outgoing)

    reply = await ws.receive_json_from()
    assert reply['message'] == 'sent text2'
    assert reply['ordinal'] == '1'

    await ws.disconnect()
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input3():
    """Send a third JSON chat message and check the reply's message and ordinal."""
    ws = await setup_database_and_sample_websocket()

    outgoing = {'message': 'sent text3', 'ordinal': '1'}
    await ws.send_json_to(outgoing)

    reply = await ws.receive_json_from()
    assert reply['message'] == 'sent text3'
    assert reply['ordinal'] == '1'

    await ws.disconnect()
then the third test would fail with:
(my_env) kradem@i5-4570:~/dev/my_repo/project$ pt someapp/tests/test_async_consumer.py
======================================== test session starts =========================================
platform linux -- Python 3.6.4, pytest-3.4.1, py-1.5.2, pluggy-0.6.0 -- /home/kradem/.virtualenvs/my_env/bin/python3.6
cachedir: .pytest_cache
metadata: {'Python': '3.6.4', 'Platform': 'Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial', 'Packages': {'pytest': '3.4.1', 'py': '1.5.2', 'pluggy': '0.6.0'}, 'Plugins': {'xdist': '1.22.1', 'ordering': '0.5', 'metadata': '1.6.0', 'forked': '0.2', 'django': '3.1.2', 'cov': '2.5.1', 'asyncio': '0.8.0'}, 'JAVA_HOME': '/usr/lib/jvm/java-8-oracle'}
Django settings: project.settings.development (from environment variable)
rootdir: /home/kradem/dev/my_repo/project, inifile:
plugins: xdist-1.22.1, ordering-0.5, metadata-1.6.0, forked-0.2, django-3.1.2, cov-2.5.1, asyncio-0.8.0
collected 3 items
someapp/tests/test_async_consumer.py::test_chat_returns_sent_for_valid_input PASSED [ 33%]
someapp/tests/test_async_consumer.py::test_chat_returns_sent_for_valid_input2 PASSED [ 66%]
someapp/tests/test_async_consumer.py::test_chat_returns_sent_for_valid_input3 FAILED [100%]
============================================== FAILURES ==============================================
______________________________ test_chat_returns_sent_for_valid_input3 _______________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f04201a96a0>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f04201a9cf8 maxsize=0 tasks=1>
@coroutine
def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> yield from getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.6/asyncio/queues.py:167: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input3():
communicator = await setup_database_and_websocket()
await communicator.send_json_to({'message': 'sent text3', 'ordinal': '1'})
> response = await communicator.receive_json_from()
someapp/tests/test_async_consumer.py:118:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: in receive_output
return await self.output_queue.get()
/usr/lib/python3.6/asyncio/coroutines.py:212: in coro
res = func(*args, **kw)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7f04201a9828>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type):
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
-------------------------------------- Captured stdout teardown --------------------------------------
Destroying test database for alias 'default'...
================================= 1 failed, 2 passed in 1.94 seconds =================================
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncConsumer.__call__() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object AsyncConsumer.__call__ at 0x7f0421942200>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py", line 53, in await_many_dispatch
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/lib/python3.6/asyncio/queues.py:167> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:169> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0420673d38>()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x7f042067cbf8>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 173, in receive
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive_loop() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:187> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive_loop at 0x7f042067cba0>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 187, in receive_loop
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 236, in receive_single
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 407, in __aexit__
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/commands/__init__.py", line 54, in close
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 352, in close
File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0420673a08>()]> cb=[Future.set_result()]>
In another app and test case the fifth (edit: sorry, sometimes sixth or seventh...) test will fail (those tests are different, not like the "same" ones from above) no matter how I rearrange them from top to bottom - and always the very next one fails too, and always the rest of the suite will pass (just like the first four or five or six):
Edit: Actually I've managed to rearrange test functions in some smaller test modules (8 or fewer test functions) to pass almost always if single test module is called, by placing "critical" test functions on top.
However, if I call the whole test suite then the first module would pass and the "RuntimeError: Two event loops are trying to receive() on one channel layer at once!" would be raised on the first few test functions in the very next async test module.
____________________ test_async_1 ____________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7fb1cbd902e8>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7fb1cbdb55f8 maxsize=0 tasks=1>
@coroutine
def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> yield from getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.6/asyncio/queues.py:167: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_async_1():
communicator = await setup_database_and_person_websocket()
sent = {'words': []}
await communicator.send_json_to(sent)
> response = await communicator.receive_json_from()
someapp2/tests/test_async_person_consumer.py:138:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: in receive_output
return await self.output_queue.get()
/usr/lib/python3.6/asyncio/coroutines.py:212: in coro
res = func(*args, **kw)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7fb1cbdb8898>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type):
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
_________________ test_person_detail_websocket_words_returns_equal_length_collection _________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7fb1c84f7a20>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7fb1c84f7470 maxsize=0 tasks=1>
@coroutine
def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> yield from getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.6/asyncio/queues.py:167: CancelledError
During handling of the above exception, another exception occurred:
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7fb1c84f7a20>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (<async_timeout.timeout object at 0x7fb1e39b5780>, <class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7fb1e3d2c708>)
kw = {}
@functools.wraps(func)
def coro(*args, **kw):
> res = func(*args, **kw)
/usr/lib/python3.6/asyncio/coroutines.py:212:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7fb1e39b5780>
exc_type = <class 'concurrent.futures._base.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7fb1e3d2c708>
@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
> self._do_exit(exc_type)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7fb1e39b5780>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type):
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_person_detail_websocket_words_returns_equal_length_collection():
communicator = await setup_database_and_person_websocket()
sent = {'words': ['ferist', "secnode"]}
await communicator.send_json_to(sent)
> response = await communicator.receive_json_from()
someapp2/tests/test_async_person_consumer.py:149:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py:47: in await_many_dispatch
result = task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:167: in receive
task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:186: in receive_loop
async with self.check_receive_lock():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <channels_redis.core.RedisChannelLayer object at 0x7fb1e275ac88>
def check_receive_lock(self):
"""
Returns the receive lock, doing current-loop checking.
"""
loop = asyncio.get_event_loop()
if self.receive_lock_loop is None:
# Lock was not yet populated. Populate it!
self.receive_lock_loop = loop
self.receive_lock = asyncio.Lock()
elif self.receive_lock_loop != loop:
# See if the lock is locked
if self.receive_lock.locked():
> raise RuntimeError("Two event loops are trying to receive() on one channel layer at once!")
E RuntimeError: Two event loops are trying to receive() on one channel layer at once!
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:204: RuntimeError
-------------------------------------- Captured stdout teardown --------------------------------------
Destroying test database for alias 'default'...
================================= 2 failed, 6 passed in 3.44 seconds =================================
The error for the very next test that fails is:
someapp2/tests/test_async_person_consumer.py::test_async_sample_2 Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py:169> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb1cbe02228>()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncConsumer.__call__() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels/consumer.py:54> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x7fb1cbde89e8>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 173, in receive
task.cancel()
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive_loop() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py:187> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive_loop at 0x7fb1e26bf570>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 187, in receive_loop
real_channel, message = await self.receive_single(general_channel)
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 236, in receive_single
return channel, message
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 407, in __aexit__
self.conn.close()
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/commands/__init__.py", line 54, in close
self._pool_or_conn.close()
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/connection.py", line 352, in close
self._do_close(ConnectionForcedCloseError())
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
self._writer.transport.close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object AsyncConsumer.__call__ at 0x7fb1e39b2db0>
Traceback (most recent call last):
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels/utils.py", line 53, in await_many_dispatch
task.cancel()
File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/lib/python3.6/asyncio/queues.py:167> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb1cbe02618>()]> cb=[Future.set_result()]>
FAILED [100%]
Alright, I've managed to replicate it locally with that, thanks. Will investigate and fix soon.
I'm glad I was able to help. Best regards!
I have pushed up commits to the master branch of both asgiref and channels. If you could pull those down and install them both, that should fix the issue and I can close this. (Your consumer is throwing another error, I presume; the problem here was that error handling skipped cleanup for all the async stuff and left it in a mess.)
Works like a charm! Now I'm able to test some of the project's functionality that I haven't tested since those parts were in the early stage (on http, not websocket). Together with ChannelsLiveServerTestCase, my project is secured like Fort Knox! :)
Thank you guys for your work on this beautiful package!