Channels: Testing with pytest-asyncio raises RuntimeError: Event loop is closed

Created on 16 Feb 2018 · 38 Comments · Source: django/channels

Hi, this is a duplicate of this question.

I'm trying to test the new Channels 2.0 with pytest-asyncio (0.8.0). If I place different assertions in the same function, like:

import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer


@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    communicator = await setup_database_and_websocket()
    sent = {"message": 'abc'}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

    communicator = await setup_database_and_websocket()
    sent = {"message": 1}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

then I don't get an error. But if I separate the test cases like:

@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    communicator = await setup_database_and_websocket()
    sent = {"message": 'abc'}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    communicator = await setup_database_and_websocket()
    sent = {"message": 1}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

then I get the following error on the second receive_from call:

with pytest.raises(TimeoutError):
>           await communicator.receive_from()

someapp/tests/test_consumers_async.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

def _check_closed(self):
if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError

Also if I do (as in https://channels.readthedocs.io/en/latest/topics/testing.html):

await communicator.disconnect()

instead of:

await communicator.send_input({
    "type": "websocket.disconnect",
    "code": 1000,
})

then the following error is raised:

>       await communicator.disconnect()

someapp/tests/test_consumers_async.py:96: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
    await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
    await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
    return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
    result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
    return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
    handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}

    def websocket_disconnect(self, message):
        """
            Called when a WebSocket connection is closed. Base level so you don't
            need to call super() all the time.
            """
        # TODO: group leaving
>       self.disconnect(message["code"])
E       TypeError: disconnect() takes 1 positional argument but 2 were given

What should I do to separate those test cases in the respective individual test functions?

All 38 comments

What version of all the Channels packages (daphne, channels, asgiref, channels_redis) are you running? Event loop is closed should have been fixed by channels_redis 2.0.3.

As for the disconnect error, it looks like your disconnect function does not take a code argument - its signature should be def disconnect(self, code):
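
To illustrate why a missing code parameter produces that TypeError, here is a minimal sketch in plain Python. BaseConsumer, BrokenConsumer and FixedConsumer are hypothetical stand-ins invented for this example, not the real Channels classes; the only thing they share with the real code is that the base handler calls self.disconnect(message["code"]).

```python
class BaseConsumer:
    """Simplified stand-in for a generic consumer; not the Channels class."""

    def websocket_disconnect(self, message):
        # The base handler always passes the close code positionally.
        self.disconnect(message["code"])

    def disconnect(self, code):
        pass


class BrokenConsumer(BaseConsumer):
    def disconnect(self):  # missing `code`, so the base handler's call fails
        pass


class FixedConsumer(BaseConsumer):
    def disconnect(self, code):
        self.close_code = code  # keep the code so it can be inspected


message = {"type": "websocket.disconnect", "code": 1000}

try:
    BrokenConsumer().websocket_disconnect(message)
except TypeError as exc:
    print("broken override:", exc)

fixed = FixedConsumer()
fixed.websocket_disconnect(message)
print("fixed override received code", fixed.close_code)
```

The override has to accept every argument the base class passes, even if the subclass never uses it.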

I'm running channels installed from the GitHub repo (master) (pip install git+git://github.com/django/channels.git@master).

$pip freeze
aioredis==1.1.0
asgiref==2.1.5
async-generator==1.9
async-timeout==2.0.0
autobahn==17.10.1
Automat==0.6.0
channels==2.0.2
channels-redis==2.0.3
daphne==2.0.3
Django==2.0.2
django-redis==4.8.0
hiredis==0.2.0
msgpack==0.5.4
pytest==3.4.0
pytest-asyncio==0.8.0
pytest-django==3.1.2
redis==2.10.6

The other redis packages are installed for caching.

This error with disconnect is strange, as generic/websocket.py has the correct signature:

def websocket_disconnect(self, message):
    """
    Called when a WebSocket connection is closed. Base level so you don't
    need to call super() all the time.
    """
    # TODO: group leaving
    self.disconnect(message["code"])
    raise StopConsumer()

def disconnect(self, code):
    """
    Called when a WebSocket connection is closed.
    """
    pass

That is very strange. I'd try sticking a debugger in where it fails and making sure the method signature is absolutely, definitely, what you think it is, and there's not bytecode caching or something going on.

As for the Event loop is closed problem, I won't be able to look into that until I can get a small failing example to reproduce (there's no way I can go by the traceback on this one). I'll leave this ticket open for a bit if you want to try and do that.

My bad on the disconnect error: I forgot to add the positional argument in the subclassed method:

from asgiref.sync import AsyncToSync
from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
    #
    def disconnect(self, code):
        # Leave the group when the socket closes.
        AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)

As for Event loop is closed, I'm still getting that error and still searching for a fix. I'll post here whatever happens with that.

So I've updated some channels packages (channels is on the latest master):

daphne==2.0.4
channels-redis==2.1.0
msgpack==0.5.5

and I refactored my test modules. As before, invoking a single test function always passes.

When I run a whole module, things go like this:

  • If there are 4 or fewer test functions, all of them pass.

  • If there are five, then _sometimes_ the last one fails with TimeoutError and with the old "Event loop is closed".

  • If there are more than 5 test functions, exactly 2 of them fail. If I comment out one of the failing tests, some other 2 fail. If I wrap one of the failing tests in with pytest.raises(TimeoutError):, it fails with "Error not raised" and all of the other test functions pass.

  • If I run the whole test suite with all the modules, one third of the websocket tests fail.

The output of the case when 2 tests failed:

(my_env) kradem@i5-4570:~/dev/some_repo/project$ pt someapp/tests/test_async_sample_consumer.py 
============================================== test session starts ==============================================
platform linux -- Python 3.6.4, pytest-3.4.1, py-1.5.2, pluggy-0.6.0 -- /home/kradem/.virtualenvs/my_env/bin/python3.6
cachedir: .pytest_cache
metadata: {'Python': '3.6.4', 'Platform': 'Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial', 'Packages': {'pytest': '3.4.1', 'py': '1.5.2', 'pluggy': '0.6.0'}, 'Plugins': {'xdist': '1.22.1', 'ordering': '0.5', 'metadata': '1.6.0', 'forked': '0.2', 'django': '3.1.2', 'cov': '2.5.1', 'asyncio': '0.8.0'}, 'JAVA_HOME': '/usr/lib/jvm/java-8-oracle'}
Django settings: project.settings.development (from environment variable)
rootdir: /home/kradem/dev/some_repo/project, inifile:
plugins: xdist-1.22.1, ordering-0.5, metadata-1.6.0, forked-0.2, django-3.1.2, cov-2.5.1, asyncio-0.8.0
collected 13 items                                                                                              

someapp/tests/test_async_sample_consumer.py::test_sample_1 PASSED   [  7%]
someapp/tests/test_async_sample_consumer.py::test_sample_2 PASSED [ 15%]
someapp/tests/test_async_sample_consumer.py::test_sample_3 PASSED          [ 23%]
someapp/tests/test_async_sample_consumer.py::test_sample_4 PASSED              [ 30%]
someapp/tests/test_async_sample_consumer.py::test_sample_5 PASSED                 [ 38%]
someapp/tests/test_async_sample_consumer.py::test_sample_6 PASSED [ 46%]
someapp/tests/test_async_sample_consumer.py::test_sample_7 PASSED [ 53%]
someapp/tests/test_async_sample_consumer.py::test_sample_8 FAILED [ 61%]
someapp/tests/test_async_sample_consumer.py::test_sample_9 Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:169> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d207d5a68>()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x7f7d22ff3200>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 173, in receive
    task.cancel()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive_loop() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:187> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive_loop at 0x7f7d22ff32b0>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 187, in receive_loop
    real_channel, message = await self.receive_single(general_channel)
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 236, in receive_single
    return channel, message
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 407, in __aexit__
    self.conn.close()
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/commands/__init__.py", line 54, in close
    self._pool_or_conn.close()
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 352, in close
    self._do_close(ConnectionForcedCloseError())
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
    self._writer.transport.close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d207d5b28>()]> cb=[Future.set_result()]>
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncConsumer.__call__() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object AsyncConsumer.__call__ at 0x7f7d24103990>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py", line 53, in await_many_dispatch
    task.cancel()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/lib/python3.6/asyncio/queues.py:167> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
FAILED [ 69%]
someapp/tests/test_async_sample_consumer.py::test_sample_10 PASSED [ 76%]
someapp/tests/test_async_sample_consumer.py::test_sample_11 PASSED [ 84%]
someapp/tests/test_async_sample_consumer.py::test_sample_12 PASSED [ 92%]
someapp/tests/test_async_sample_consumer.py::test_sample_13 PASSED [100%]

=================================================== FAILURES ====================================================
________________________ test_sample_8 ________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f7d20726e10>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f7d20700390 maxsize=0 tasks=1>

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

            If queue is empty, wait until an item is available.

            This method is a coroutine.
            """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               yield from getter
E               concurrent.futures._base.CancelledError

/usr/lib/python3.6/asyncio/queues.py:167: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db
    @pytest.mark.asyncio
    async def test_sample_8():
        communicator = await setup_database_and_sample_websocket()
        sent = {"sample": "word"}
        await communicator.send_json_to(sent)
>       response = await communicator.receive_json_from()

someapp/tests/test_async_sample_consumer.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
    payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
    response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: in receive_output
    return await self.output_queue.get()
/usr/lib/python3.6/asyncio/coroutines.py:212: in coro
    res = func(*args, **kw)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7f7d2070f390>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type):
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
____________________________ test_sample_9 _____________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f7d205b4128>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f7d205b4470 maxsize=0 tasks=1>

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

            If queue is empty, wait until an item is available.

            This method is a coroutine.
            """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               yield from getter
E               concurrent.futures._base.CancelledError

/usr/lib/python3.6/asyncio/queues.py:167: CancelledError

During handling of the above exception, another exception occurred:

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f7d205b4128>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

args = (<async_timeout.timeout object at 0x7f7d207006d8>, <class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7f7d20b79608>)
kw = {}

    @functools.wraps(func)
    def coro(*args, **kw):
>       res = func(*args, **kw)

/usr/lib/python3.6/asyncio/coroutines.py:212: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7f7d207006d8>
exc_type = <class 'concurrent.futures._base.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7f7d20b79608>

    @asyncio.coroutine
    def __aexit__(self, exc_type, exc_val, exc_tb):
>       self._do_exit(exc_type)

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7f7d207006d8>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type):
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db
    @pytest.mark.asyncio
    async def test_sample_9():
        communicator = await setup_database_and_sample_websocket()
        sent = {"sample": ((0, 0), '', '')}
        await communicator.send_json_to(sent)
>       response = await communicator.receive_json_from()

someapp/tests/test_async_sample_consumer.py:155: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
    payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
    response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
    self.future.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py:47: in await_many_dispatch
    result = task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:167: in receive
    task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:186: in receive_loop
    async with self.check_receive_lock():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <channels_redis.core.RedisChannelLayer object at 0x7f7d230b1f28>

    def check_receive_lock(self):
        """
            Returns the receive lock, doing current-loop checking.
            """
        loop = asyncio.get_event_loop()
        if self.receive_lock_loop is None:
            # Lock was not yet populated. Populate it!
            self.receive_lock_loop = loop
            self.receive_lock = asyncio.Lock()
        elif self.receive_lock_loop != loop:
            # See if the lock is locked
            if self.receive_lock.locked():
>               raise RuntimeError("Two event loops are trying to receive() on one channel layer at once!")
E               RuntimeError: Two event loops are trying to receive() on one channel layer at once!

../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:204: RuntimeError
====================================== 2 failed, 11 passed in 3.73 seconds ======================================
(my_env) kradem@i5-4570:~/dev/some_repo/project$

@kradem I don't think this is your issue, but why have you decorated the setup_database_and_websocket function when it is just called with plain old await from all of your other test cases?

@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():

Yep, I left that in after multiple refactorings. I removed those decorators but, as you predicted, it's not the issue here. Thanks for pointing that out.

hmm, could you share a gist.github.com with it all in one file?

The testing is quite complex, as database records have to be present to test the websockets code. I tried to strip it down before I posted here, but I gave up. I'll try with Mock and post here afterwards...

P.S. but I don't think it'll throw with Mock. :(

OK, you're interacting with the db. Is your consumer a subclass of Async or Sync? Is it possible that somewhere you are calling a db function that is not wrapped in database_sync_to_async?

It's a subclass of the sync WebsocketConsumer, and all channel layer calls are made with the help of the AsyncToSync wrapper. In tests, all db functions (two of them, and they are mutually exclusive) are wrapped in database_sync_to_async.

def connect(self):
    self.accept()
    AsyncToSync(self.channel_layer.group_add)(
        self.group_name, self.channel_name)

#

@property
def group_name(self):
    return 'foo'

#

def receive_sample(self):
    #
    AsyncToSync(self.channel_layer.group_send)(
        self.group_name,
        {
            "type": "client.send",
            'text': json.dumps({
                "sample":  sample,
                "sometype": 1,
            })},
    )

def client_send(self, event):
    self.send(text_data=event["text"])

Hmm, where is your model code, e.g. the queries that hit the db?

Your sync methods here will be run inside sync_to_async and not inside database_sync_to_async, but this should not cause the errors you are getting.

Above I stripped that down, the actual test code is like:

def get_sample():
    try:
        samples = Sample.objects.all()
        if samples:
            return samples[0]
    except Sample.DoesNotExist:
        return None


def create_sample():
    name = str(uuid.uuid4())
    user = User.objects.create_user(
        username=name,
        email='{}@test.com'.format(name)
    )
    sample = Sample.objects.create(
        description='ws person',
        profile=user.profile,
        data=TEST_DATA,
    )
    return sample


async def setup_database_and_sample_websocket():
    sample = await database_sync_to_async(get_sample)()
    if sample is None:
        sample = await database_sync_to_async(create_sample)()
    path = '/sample/{}/'.format(sample.id)
    communicator = WebsocketCommunicator(MyConsumer, path)
    communicator.scope['user'] = sample.profile.user
    communicator.scope['session'] = {}
    connected, subprotocol = await communicator.connect()
    assert connected

    return communicator

Are you leaving groups (if you're subscribed to them) in the disconnect method?

Yes:

def disconnect(self, code):
    AsyncToSync(self.channel_layer.group_discard)(
        self.group_name, self.channel_name)

can you try to use channels.layers.InMemoryChannelLayer if you haven't already.

What to put in the configuration? With:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
        "CONFIG": {
            "expiry": 100500,
        },
        "TEST_CONFIG": {
            "expiry": 100500,
        },
    },
}

or with:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}

a single test is frozen. Do I need an additional code change besides that settings value?

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}

or

@override_settings(CHANNEL_LAYERS={
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
})
@pytest.....
async def test....

should be good enough.

I'm afraid I can't get my project to work with it. The settings file change leaves the page stuck on HANDSHAKING (manual testing), and the override_settings decorator does the same in tests (it just freezes, quite possibly in the same handshaking process).

P.S. I had deleted all the .pyc files and __pycache__ folders.

that is interesting, this is with the latest master of this repo?

Yes, last time updated early today.

P.S. console says: "WebSocket connection to 'ws://127.0.0.1:8081/sample/1/' failed: WebSocket is closed before the connection is established."

you may have found multiple bugs ;)

Can you give a rough overview of this HANDSHAKING process and how you are using the layers within it?

The following js code (chat.js) is from one of the apps:

var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
var ws_path = ws_scheme + '://' + window.location.host + window.location.pathname;
console.log("Connecting to " + ws_path); 
var webSocketBridge = new channels.WebSocketBridge();
webSocketBridge.connect(ws_path);

webSocketBridge.listen(function(message) {
    var which = 'row-other';
    var row = '<div class="row col s12 ' + which + '">' +
        '<span class="time" data-time="data.time">' + locTime(message.time) + '</span>' +
        '<span class="body z-depth-2">' + message.message + '</span>' +
        '</div>';
    messages.prepend(row);
});

And at the end of template:

<script src="{% static 'channels/js/websocketbridge.js' %}"></script>
<script src="{% static 'js/chat.js' %}"></script>

Project routing file:

application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            url("^sample/(?P<wspath>.+)/$", ChatConsumer),
        ])
    ),
})

P.S. consumer's code is in this thread above...

What methods have you overridden in your ChatConsumer?

STOP THE PRESS!! I hadn't upgraded to master today, I did it just recently and channels.layers.InMemoryChannelLayer is working as expected. Still, all errors have remained the same.

SILLY ME!! They are all working with InMemoryChannelLayer.

@hishnash THANK YOU, THANK YOU, THANK YOU!! It's something with redis...

or something with how Redis is talking to channels...

I already mentioned that I have django-redis (4.8.0) and redis (2.10.6) installed for caching. I'll try removing the cache backend and test with redis again.

I switched the cache to memcache and reverted the channel layer to redis, but all the errors remain the same. So the installed django-redis and redis packages aren't the issue here.

I'll switch to @override_settings and channels.layers.InMemoryChannelLayer for all my websocket tests.

Let me know if I can do anything more on this; I'll certainly try reverting to redis in tests after every update of channels and/or redis-related packages.

Well, the actual bug here is waaaay back and is this: RuntimeError: Two event loops are trying to receive() on one channel layer at once!

The question is why. I presume what is happening is that a consumer is still living on in the background (and trying to receive) when the next test comes around.

Do all your tests that use the Communicator end with WebsocketCommunicator.disconnect()?
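
To make the suspected failure mode concrete, here is a minimal standard-library sketch. It is not the channels_redis implementation: `TinyLayer`, `first_test`, `second_test` and `run_demo` are hypothetical names, and the branch that replaces an unlocked lock is an assumption, since the traceback in this thread only shows the code up to the `raise`. It models a consumer from one test that still holds the receive lock while the next test starts on a fresh event loop.

```python
import asyncio

MESSAGE = "Two event loops are trying to receive() on one channel layer at once!"


class TinyLayer:
    """Toy stand-in for a channel layer; only the loop check is modelled."""

    def __init__(self):
        self.receive_lock = None
        self.receive_lock_loop = None

    def check_receive_lock(self):
        loop = asyncio.get_running_loop()
        if self.receive_lock_loop is None:
            self.receive_lock_loop = loop
            self.receive_lock = asyncio.Lock()
        elif self.receive_lock_loop is not loop:
            if self.receive_lock.locked():
                raise RuntimeError(MESSAGE)
            # Assumption: an unlocked lock left by an old loop is simply replaced.
            self.receive_lock_loop = loop
            self.receive_lock = asyncio.Lock()
        return self.receive_lock

    async def receive(self):
        async with self.check_receive_lock():
            await asyncio.Event().wait()  # never set: blocks like an idle receive()


async def first_test(layer):
    # Start a "consumer" and return without cancelling it.
    task = asyncio.ensure_future(layer.receive())
    await asyncio.sleep(0)  # give the consumer a chance to take the lock
    return task


async def second_test(layer):
    try:
        layer.check_receive_lock()
    except RuntimeError as exc:
        return str(exc)
    return "ok"


async def cancel_and_wait(task):
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)


def run_demo():
    layer = TinyLayer()
    loop_a = asyncio.new_event_loop()
    leaked = loop_a.run_until_complete(first_test(layer))

    loop_b = asyncio.new_event_loop()  # the "next test" gets its own loop
    blocked = loop_b.run_until_complete(second_test(layer))
    loop_b.close()

    # Once the leftover consumer is cancelled and awaited, the lock is free again.
    loop_a.run_until_complete(cancel_and_wait(leaked))
    loop_a.close()

    loop_c = asyncio.new_event_loop()
    after_cleanup = loop_c.run_until_complete(second_test(layer))
    loop_c.close()
    return blocked, after_cleanup
```

Calling `run_demo()` returns the outcome seen by the second loop while the first consumer is still pending, followed by the outcome after that consumer has been cancelled and awaited.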

Yep, the last line in every async pytest function is:

await communicator.disconnect()

Alright, then it may be an issue that the disconnect is not correctly waiting for the coroutines of the test consumers to exit.

Having a simple set of steps to reproduce is going to be the thing that makes this bug fixable - is there any subset of your tests you know fails if run one after the other and can simplify down?
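
As an illustration of what "disconnect correctly waiting for the coroutines to exit" could mean, here is a small asyncio-only sketch. The names `consumer`, `disconnect` and `scenario` are hypothetical and this is not the `WebsocketCommunicator` code; it only shows the general pattern of cancelling a task and then waiting until it has really finished before the test returns.

```python
import asyncio


async def consumer(log):
    """Toy consumer: waits forever and records when its cleanup has run."""
    try:
        await asyncio.Event().wait()  # never set; stands in for an idle receive()
    finally:
        log.append("consumer cleaned up")


async def disconnect(task, timeout=1):
    """Cancel the consumer task and wait until it has actually finished."""
    task.cancel()
    # asyncio.wait() does not raise on cancellation; it reports what finished.
    done, pending = await asyncio.wait({task}, timeout=timeout)
    return not pending


async def scenario():
    log = []
    task = asyncio.ensure_future(consumer(log))
    await asyncio.sleep(0)  # let the consumer start running
    finished = await disconnect(task)
    log.append("test finished")
    return log, finished


if __name__ == "__main__":
    print(asyncio.run(scenario()))
```

Because `disconnect` awaits the cancelled task, the consumer's cleanup is recorded before the test body continues, so nothing is left pending when the loop is closed.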

I've just upgraded to latest master, aioredis and msgpack.

If the following is a test module:

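import uuid

import pytest
from channels.db import database_sync_to_async
from channels.testing import WebsocketCommunicator

# Sample, User, DetailsConsumer and TEST_DATA come from the project itself;
# their import lines were not part of the original post.


def get_sample():
    # .first() returns None for an empty table, so no try/except is needed
    # (iterating a queryset never raises DoesNotExist).
    return Sample.objects.first()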


def create_sample():
    name = str(uuid.uuid4())
    user = User.objects.create_user(
        username=name,
        email='{}@test.com'.format(name)
    )
    sample = Sample.objects.create(
        description='ws person',
        profile=user.profile,
        data=TEST_DATA,
    )
    return sample


async def setup_database_and_sample_websocket():
    sample = await database_sync_to_async(get_sample)()
    if sample is None:
        sample = await database_sync_to_async(create_sample)()
    path = '/sample/{}/'.format(sample.id)
    communicator = WebsocketCommunicator(DetailsConsumer, path)
    communicator.scope['user'] = sample.profile.user
    communicator.scope['session'] = {}
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input():
    communicator = await setup_database_and_sample_websocket()
    await communicator.send_json_to({'message': 'sent text', 'ordinal': '1'})
    response = await communicator.receive_json_from()
    assert response['message'] == 'sent text'
    assert response['ordinal'] == '1'
    await communicator.disconnect()


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input2():
    communicator = await setup_database_and_sample_websocket()
    await communicator.send_json_to({'message': 'sent text2', 'ordinal': '1'})
    response = await communicator.receive_json_from()
    assert response['message'] == 'sent text2'
    assert response['ordinal'] == '1'
    await communicator.disconnect()


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_chat_returns_sent_for_valid_input3():
    communicator = await setup_database_and_sample_websocket()
    await communicator.send_json_to({'message': 'sent text3', 'ordinal': '1'})
    response = await communicator.receive_json_from()
    assert response['message'] == 'sent text3'
    assert response['ordinal'] == '1'
    await communicator.disconnect()

then the third test would fail with:

(my_env) kradem@i5-4570:~/dev/my_repo/project$ pt someapp/tests/test_async_consumer.py 
======================================== test session starts =========================================
platform linux -- Python 3.6.4, pytest-3.4.1, py-1.5.2, pluggy-0.6.0 -- /home/kradem/.virtualenvs/my_env/bin/python3.6
cachedir: .pytest_cache
metadata: {'Python': '3.6.4', 'Platform': 'Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial', 'Packages': {'pytest': '3.4.1', 'py': '1.5.2', 'pluggy': '0.6.0'}, 'Plugins': {'xdist': '1.22.1', 'ordering': '0.5', 'metadata': '1.6.0', 'forked': '0.2', 'django': '3.1.2', 'cov': '2.5.1', 'asyncio': '0.8.0'}, 'JAVA_HOME': '/usr/lib/jvm/java-8-oracle'}
Django settings: project.settings.development (from environment variable)
rootdir: /home/kradem/dev/my_repo/project, inifile:
plugins: xdist-1.22.1, ordering-0.5, metadata-1.6.0, forked-0.2, django-3.1.2, cov-2.5.1, asyncio-0.8.0
collected 3 items                                                                                    

someapp/tests/test_async_consumer.py::test_chat_returns_sent_for_valid_input PASSED           [ 33%]
someapp/tests/test_async_consumer.py::test_chat_returns_sent_for_valid_input2 PASSED          [ 66%]
someapp/tests/test_async_consumer.py::test_chat_returns_sent_for_valid_input3 FAILED          [100%]

============================================== FAILURES ==============================================
______________________________ test_chat_returns_sent_for_valid_input3 _______________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f04201a96a0>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7f04201a9cf8 maxsize=0 tasks=1>

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

            If queue is empty, wait until an item is available.

            This method is a coroutine.
            """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               yield from getter
E               concurrent.futures._base.CancelledError

/usr/lib/python3.6/asyncio/queues.py:167: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db
    @pytest.mark.asyncio
    async def test_chat_returns_sent_for_valid_input3():
        communicator = await setup_database_and_websocket()
        await communicator.send_json_to({'message': 'sent text3', 'ordinal': '1'})
>       response = await communicator.receive_json_from()

someapp/tests/test_async_consumer.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
    payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
    response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: in receive_output
    return await self.output_queue.get()
/usr/lib/python3.6/asyncio/coroutines.py:212: in coro
    res = func(*args, **kw)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <async_timeout.timeout object at 0x7f04201a9828>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type):
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
-------------------------------------- Captured stdout teardown --------------------------------------
Destroying test database for alias 'default'...
================================= 1 failed, 2 passed in 1.94 seconds =================================
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncConsumer.__call__() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object AsyncConsumer.__call__ at 0x7f0421942200>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py", line 53, in await_many_dispatch
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/lib/python3.6/asyncio/queues.py:167> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:169> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0420673d38>()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x7f042067cbf8>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 173, in receive
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive_loop() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:187> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive_loop at 0x7f042067cba0>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 187, in receive_loop
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 236, in receive_single
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py", line 407, in __aexit__
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/commands/__init__.py", line 54, in close
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 352, in close
  File "/home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/kradem/.virtualenvs/my_env/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0420673a08>()]> cb=[Future.set_result()]>

In another app and test case, the fifth test fails (edit: sorry, sometimes the sixth or seventh; these tests are all different, unlike the near-identical ones above) no matter how I rearrange them from top to bottom. The very next test always fails as well, and the rest of the suite always passes (just like the first four, five, or six):

Edit: Actually, I've managed to rearrange the test functions in some smaller test modules (8 or fewer test functions) so that they pass almost always when a single test module is run, by placing the "critical" test functions at the top.
However, if I run the whole test suite, the first module passes and "RuntimeError: Two event loops are trying to receive() on one channel layer at once!" is raised on the first few test functions of the very next async test module.

____________________ test_async_1 ____________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7fb1cbd902e8>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7fb1cbdb55f8 maxsize=0 tasks=1>

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

            If queue is empty, wait until an item is available.

            This method is a coroutine.
            """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               yield from getter
E               concurrent.futures._base.CancelledError

/usr/lib/python3.6/asyncio/queues.py:167: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db
    @pytest.mark.asyncio
    async def test_async_1():
        communicator = await setup_database_and_person_websocket()
        sent = {'words': []}
        await communicator.send_json_to(sent)
>       response = await communicator.receive_json_from()

someapp2/tests/test_async_person_consumer.py:138:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
    payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
    response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62: in receive_output
    return await self.output_queue.get()
/usr/lib/python3.6/asyncio/coroutines.py:212: in coro
    res = func(*args, **kw)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7fb1cbdb8898>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type):
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError
_________________ test_person_detail_websocket_words_returns_equal_length_collection _________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7fb1c84f7a20>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7fb1c84f7470 maxsize=0 tasks=1>

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

            If queue is empty, wait until an item is available.

            This method is a coroutine.
            """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               yield from getter
E               concurrent.futures._base.CancelledError

/usr/lib/python3.6/asyncio/queues.py:167: CancelledError

During handling of the above exception, another exception occurred:

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7fb1c84f7a20>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

args = (<async_timeout.timeout object at 0x7fb1e39b5780>, <class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7fb1e3d2c708>)
kw = {}

    @functools.wraps(func)
    def coro(*args, **kw):
>       res = func(*args, **kw)

/usr/lib/python3.6/asyncio/coroutines.py:212:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7fb1e39b5780>
exc_type = <class 'concurrent.futures._base.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7fb1e3d2c708>

    @asyncio.coroutine
    def __aexit__(self, exc_type, exc_val, exc_tb):
>       self._do_exit(exc_type)

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:43:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7fb1e39b5780>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type):
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

../../../.virtualenvs/my_env/lib/python3.6/site-packages/async_timeout/__init__.py:80: TimeoutError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db
    @pytest.mark.asyncio
    async def test_person_detail_websocket_words_returns_equal_length_collection():
        communicator = await setup_database_and_person_websocket()
        sent = {'words': ['ferist', "secnode"]}
        await communicator.send_json_to(sent)
>       response = await communicator.receive_json_from()

someapp2/tests/test_async_person_consumer.py:149:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:88: in receive_json_from
    payload = await self.receive_from(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
    response = await self.receive_output(timeout)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
    self.future.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels/utils.py:47: in await_many_dispatch
    result = task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:167: in receive
    task.result()
../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:186: in receive_loop
    async with self.check_receive_lock():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <channels_redis.core.RedisChannelLayer object at 0x7fb1e275ac88>

    def check_receive_lock(self):
        """
            Returns the receive lock, doing current-loop checking.
            """
        loop = asyncio.get_event_loop()
        if self.receive_lock_loop is None:
            # Lock was not yet populated. Populate it!
            self.receive_lock_loop = loop
            self.receive_lock = asyncio.Lock()
        elif self.receive_lock_loop != loop:
            # See if the lock is locked
            if self.receive_lock.locked():
>               raise RuntimeError("Two event loops are trying to receive() on one channel layer at once!")
E               RuntimeError: Two event loops are trying to receive() on one channel layer at once!

../../../.virtualenvs/my_env/lib/python3.6/site-packages/channels_redis/core.py:204: RuntimeError
-------------------------------------- Captured stdout teardown --------------------------------------
Destroying test database for alias 'default'...
================================= 2 failed, 6 passed in 3.44 seconds =================================

The error for the very next test that fails is:

someapp2/tests/test_async_person_consumer.py::test_async_sample_2 Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py:169> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb1cbe02228>()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncConsumer.__call__() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels/consumer.py:54> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x7fb1cbde89e8>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 173, in receive
    task.cancel()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<RedisChannelLayer.receive_loop() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py:187> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive_loop at 0x7fb1e26bf570>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 187, in receive_loop
    real_channel, message = await self.receive_single(general_channel)
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 236, in receive_single
    return channel, message
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels_redis/core.py", line 407, in __aexit__
    self.conn.close()
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/commands/__init__.py", line 54, in close
    self._pool_or_conn.close()
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/connection.py", line 352, in close
    self._do_close(ConnectionForcedCloseError())
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
    self._writer.transport.close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object AsyncConsumer.__call__ at 0x7fb1e39b2db0>
Traceback (most recent call last):
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/channels/utils.py", line 53, in await_many_dispatch
    task.cancel()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/lib/python3.6/asyncio/queues.py:167> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.6/asyncio/tasks.py:380]>
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/kradem/.virtualenvs/dev_hm/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb1cbe02618>()]> cb=[Future.set_result()]>
FAILED [100%]

Alright, I've managed to replicate it locally with that, thanks. Will investigate and fix soon.

I'm glad I was able to help. Best regards!

I have pushed commits to the master branch of both asgiref and channels. If you could pull those down and install them both, that should fix the issue and I can close this. (Your consumer is throwing another error, I presume; the problem here was that error handling skipped cleanup for all the async stuff and left it in a mess.)
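
The kind of problem described here, cleanup being skipped when the application raises, can be sketched with plain asyncio. This is not the actual asgiref or channels change; `run_application`, `failing_handler` and `scenario` are hypothetical names, and the background helper merely stands in for something like a pending channel-layer receive.

```python
import asyncio


async def run_application(handler, log):
    # Background helper standing in for e.g. a pending channel-layer receive().
    helper = asyncio.ensure_future(asyncio.Event().wait())
    try:
        await handler()
    finally:
        # Runs on success and on error alike, so nothing outlives this call.
        helper.cancel()
        await asyncio.gather(helper, return_exceptions=True)
        log.append("helper cancelled" if helper.cancelled() else "helper leaked")


async def failing_handler():
    raise ValueError("boom")


async def scenario():
    log = []
    try:
        await run_application(failing_handler, log)
    except ValueError as exc:
        log.append("handler error: {}".format(exc))
    return log


if __name__ == "__main__":
    print(asyncio.run(scenario()))
```

With the `finally` block in place, the handler's exception still propagates to the caller, but the helper task is cancelled and awaited first, so no pending task is left behind when the event loop closes.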

Works like a charm! Now I'm able to test some of the project's functionality that I haven't tested since those parts were at an early stage (on HTTP, not websocket). Together with ChannelsLiveServerTestCase, my project is secured like Fort Knox! :)

Thank you guys for your work on this beautiful package!
