Channels: Cannot test synchronous consumer that includes async_to_sync

Created on 1 Mar 2018  路  15Comments  路  Source: django/channels

I am having a problem calling async_to_sync from python manage.py tests. The relevant code is

class MyHandler(SyncConsumer):

    def websocket_connect(self, event):
        self.send({ "type": "websocket.accept"})
        async_to_sync(self.channel_layer.group_add('grpup', self.channel_name))

Test looks like this -- I use asynctest because I don't want this to be the only test in my project that uses pytest syntax.

class ChannelTests(asynctest.TestCase):

    async def test_websockets(self):
        communicator = WebsocketCommunicator(MyHandler, path)
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)

When I run this, I get RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. But I'm actually not sure it ever gets to my async_to_sync call. Looking at the call stack, it seems to bomb on the previous line:

Traceback (most recent call last):
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asynctest/case.py", line 297, in run
    self._run_test_method(testMethod)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asynctest/case.py", line 354, in _run_test_method
    self.loop.run_until_complete(result)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asynctest/case.py", line 224, in wrapper
    return method(*args, **kwargs)
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/jonathanstray/PycharmProjects/cjworkbench/server/tests/test_websockets.py", line 26, in test_websockets
    connected, subprotocol = await communicator.connect()
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/testing/websocket.py", line 33, in connect
    response = await self.receive_output(timeout)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/testing.py", line 66, in receive_output
    self.future.result()
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/utils.py", line 48, in await_many_dispatch
    await dispatch(result)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/sync.py", line 110, in __call__
    return await asyncio.wait_for(future, timeout=None)
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 373, in wait_for
    return (yield from fut)
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/jonathanstray/anaconda/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/db.py", line 13, in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/sync.py", line 125, in thread_handler
    return self.func(*args, **kwargs)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/consumer.py", line 99, in dispatch
    handler(message)
  File "/Users/jonathanstray/PycharmProjects/cjworkbench/server/websockets.py", line 36, in websocket_connect
    self.send({ "type": "websocket.accept"})
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/consumer.py", line 107, in send
    self.base_send(message)
  File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/sync.py", line 34, in __call__
    "You cannot use AsyncToSync in the same thread as an async event loop - "
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

This is on OS X / django 1.11 / channels 2.0.2

blockeuser-response bug

Most helpful comment

Maybe not directly related, but there's probably a problem with your code. The line async_to_sync(self.channel_layer.group_add('grpup', self.channel_name)) should be async_to_sync(self.channel_layer.group_add)('grpup', self.channel_name) instead. You want to convert the group_add method into a synchronous version and call it. See https://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions

All 15 comments

Does the same code work if you use pytest and its async support?

Maybe not directly related, but there's probably a problem with your code. The line async_to_sync(self.channel_layer.group_add('grpup', self.channel_name)) should be async_to_sync(self.channel_layer.group_add)('grpup', self.channel_name) instead. You want to convert the group_add method into a synchronous version and call it. See https://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions

Closing due to lack of response.

I'll get back to this, I promise, and then you'll have your answer. Will re-open then if needed.

I have the exact same issue.

I also tried to use django's Testcase and wrap the async code of the WebsocketCommunicator as follows:

communicator = WebsocketCommunicator(MyConsumer, "/ws/")
connected, subprotocol = async_to_sync(communicator.connect)()

I get the following error:

Traceback (most recent call last):
  File "/home/artemis/PycharmProjects/vcg_api_server/ws/tests/test_vcg_consumer.py", line 24, in test_non_registered_datacenter_connected
    connected, subprotocol = async_to_sync(communicator.connect)()
  File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py", line 64, in __call__
    return call_result.result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py", line 78, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/testing/websocket.py", line 38, in connect
    response = await self.receive_output(timeout)
  File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py", line 75, in receive_output
    return await self.output_queue.get()
  File "/usr/lib/python3.6/asyncio/queues.py", line 167, in get
    yield from getter
RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:78> cb=[_run_until_complete_cb() at /usr/lib/python3.6/asyncio/base_events.py:176]> got Future <Future pending> attached to a different loop

----------------------------------------------------------------------
Ran 1 test in 0.033s

@artemistomaras Could you post the code of the consumer you are testing, and the test case that's failing?

@andrewgodwin thank you for your time. I may be able to provide my full source code if that will assist you further. Anyways, the relevant code parts are:

Code for my consumer:

import logging
import uuid

from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer
from django.utils import timezone

from ws.models import DatacenterClient, DatacenterMessageLog

log = logging.getLogger(__name__)


class VCGConsumer(JsonWebsocketConsumer):
    """Channels consumer that handles websocket connection coming from DCs.
    """
    def connect(self):
        """Fired when the vcg_api client websocket connects.
        """
        headers = dict(self.scope['headers'])
        wallet_address = headers[b'wallet'].decode('utf-8')
        self.dc_name = self.scope['url_route']['kwargs']['dc_name']
        self.dc_client = DatacenterClient.objects.filter(address=wallet_address, name=self.dc_name)

        # Reject unknown clients
        if not self.dc_client.exists():
            log.warning('Rejecting client with name={} and address={}'.format(self.dc_name, wallet_address))
            self.close()
            self.dc_client = None
            return

        # Update the client details
        self.dc_client.update(
            channel_name=self.channel_name, last_connected=timezone.now(), status='connected'
        )
        self.dc_client = self.dc_client.first()

        async_to_sync(self.channel_layer.group_add)(
            self.dc_client.group_name,
            self.channel_name
        )
        self.accept()
        log.info("Datacenter `{}` connected.".format(self.dc_client.name))

    def disconnect(self, close_code):
        """Fired when the vcg_api client websocket disconnects.
        """
        if self.dc_client:
            async_to_sync(self.channel_layer.group_discard)(
                self.dc_client.group_name,
                self.channel_name
            )
            self.close()
            self.dc_client.last_disconnected = timezone.now()
            self.dc_client.status = 'disconnected'
            self.dc_client.save()
            log.info("Datacenter `{}` closed connection with code: {}".format(self.dc_client.name, close_code))

    def receive_json(self, content, **kwargs):
        """Fired when the vcg_api client websocket sends a message.

        Args:
            content (dict): the message payload
            **kwargs: extra parameters
        """
        now = timezone.now()
        log.debug("Received payload: {}".format(content))
        _request_id = content.pop('_request_id')
        evm_exception = content.pop('error', False)
        msg_log = DatacenterMessageLog.objects.filter(request_id=uuid.UUID(_request_id))

        # Sanity check
        if not msg_log.exists():
            log.error('No message log exists for request id: {}'.format(_request_id))
            return

        # Set the response payload
        msg_log.update(response_payload=content, response_timestamp=now, evm_exception=evm_exception)
        self.dc_client.last_message = now
        self.dc_client.save()

    def chain_message(self, message):
        """Message dispatcher to the underlying Datacenter.

        The dispatched message is logged and persisted in the database. Along with its payload,
        a unique request_id is attached. When the vcg_api client responds back, it also attaches
        the same uuid and the response is also logged.

        Args:
            message (dict): the parameters to send to the vcg_api.
                One required argument is `method` which is the signature of the vcg_api method to invoke.
                For a complete list of methods and their parameters check `vcg_api.ethereum.py`.
        """

        # Create a new message log and attach the request_id uuid (if it is not present)
        if '_request_id' not in message.keys():
            msg_log = DatacenterMessageLog.objects.create(dc_client=self.dc_client, request_payload=message)
            message['_request_id'] = str(msg_log.request_id)

        # Send message to dc vcg_api client
        self.send_json(message)

Relative routing.py inside ws app

from django.conf.urls import url

from . import consumers

websocket_urlpatterns = [
    url(r'^ws/(?P<dc_name>[^/]+)/$', consumers.VCGConsumer),
]

Relative models.py

import uuid

from django.db import models
from django.contrib.postgres.fields import JSONField
from django.core.serializers.json import DjangoJSONEncoder

class DatacenterClient(models.Model):
    DC_STATUS = (
        ('connected', 'Connected'),
        ('disconnected', 'Disconnected'),
        ('unknown', 'Unknown')
    )
    name = models.CharField(max_length=120, unique=True)
    channel_name = models.CharField(max_length=120, blank=True, null=True, help_text='Ephemeral channel name')
    status = models.CharField(max_length=40, choices=DC_STATUS, default='unknown')
    address = models.CharField(max_length=42, unique=True, help_text='Wallet address of the DC')
    last_connected = models.DateTimeField(blank=True, null=True, help_text='Last timestamp of ws connect')
    last_disconnected = models.DateTimeField(blank=True, null=True, help_text='Last timestamp of ws disconnect')
    last_message = models.DateTimeField(blank=True, null=True, help_text='Last timestamp of message receive')

    class Meta:
        db_table = 'datacenter_clients'
        verbose_name = 'Datacenter client'
        verbose_name_plural = 'Datacenter clients'

    def __str__(self):
        return '{} - {}'.format(self.name, self.status)

    @property
    def group_name(self):
        return 'dc_{}'.format(self.name)


class DatacenterMessageLog(models.Model):
    request_id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    dc_client = models.ForeignKey(DatacenterClient, related_name='message_logs', on_delete=models.CASCADE)
    request_payload = JSONField(encoder=DjangoJSONEncoder, help_text='Payload send to the DC ws client')
    response_payload = JSONField(encoder=DjangoJSONEncoder, blank=True, null=True,
                                 help_text='Payload received from the DC ws client')
    request_timestamp = models.DateTimeField(auto_now_add=True, help_text='When the message was sent')
    response_timestamp = models.DateTimeField(blank=True, null=True, help_text='When the message response was received')
    evm_exception = models.BooleanField(default=False, help_text='If the ethereum client threw an exception')

    class Meta:
        db_table = 'datacenter_message_logs'
        verbose_name = 'Datacenter message log'
        verbose_name_plural = 'Datacenter message logs'

    def __str__(self):
        return '{} @ {}'.format(str(self.request_id), self.dc_client.name)

The test that fails (invoked by pytest )

import pytest
from channels.testing import WebsocketCommunicator

from vcg_api_server.routing import application


@pytest.mark.asyncio
async def test_non_registered_datacenter_connected():
    dc_address = "0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
    communicator = WebsocketCommunicator(application, "/ws/DC1/", headers=[('wallet', dc_address)])

    connected, subprotocol = await communicator.connect()
    assert connected is False

Error that I am getting:

(vcg_api_server) artemis@artemis-pc:~/PycharmProjects/vcg_api_server$ pytest
========================================== test session starts ===========================================
platform linux -- Python 3.6.3, pytest-3.6.1, py-1.5.3, pluggy-0.6.0
Django settings: vcg_api_server.settings.test (from ini file)
rootdir: /home/artemis/PycharmProjects/vcg_api_server, inifile: pytest.ini
plugins: django-3.2.1, asyncio-0.8.0
collected 1 item

ws/tests/test_vcg_consumer.py F                                                                    [100%]

================================================ FAILURES ================================================
________________________________ test_non_registered_datacenter_connected ________________________________

    @pytest.mark.asyncio
    async def test_non_registered_datacenter_connected():
        dc_address = "0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
>       communicator = WebsocketCommunicator(application, "/ws/DC1/", headers=[('wallet', dc_address)])

ws/tests/test_vcg_consumer.py:12:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/testing/websocket.py:26: in __init__
    super().__init__(application, self.scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:17: in __init__
    self.instance = self.application(scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/routing.py:56: in __call__
    return self.application_mapping[scope["type"]](scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/sessions.py:40: in __call__
    return self.inner(dict(scope, cookies=cookies))
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/sessions.py:138: in __call__
    return SessionMiddlewareInstance(scope, self)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/sessions.py:161: in __init__
    self.inner = self.middleware.inner(self.scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/auth.py:143: in __call__
    scope["user"] = async_to_sync(get_user)(scope)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.sync.AsyncToSync object at 0x7f9ca102dc50>
args = ({'cookies': {}, 'headers': [('wallet', '0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0')], 'path': '/ws/DC1/', 'query_string': b'', ...},)
kwargs = {}, event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    def __call__(self, *args, **kwargs):
        # You can't call AsyncToSync from a thread with a running event loop
        try:
            event_loop = asyncio.get_event_loop()
        except RuntimeError:
            pass
        else:
            if event_loop.is_running():
                raise RuntimeError(
>                   "You cannot use AsyncToSync in the same thread as an async event loop - "
                    "just await the async function directly."
                )
E               RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:34: RuntimeError
======================================== 1 failed in 0.19 seconds ========================================
Exception ignored in: <bound method ApplicationCommunicator.__del__ of <channels.testing.websocket.WebsocketCommunicator object at 0x7f9ca102d748>>
Traceback (most recent call last):
  File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py", line 53, in __del__
  File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py", line 44, in stop
AttributeError: 'WebsocketCommunicator' object has no attribute 'future'

P.S.: My test settings

from .base import *

CELERY_ALWAYS_EAGER = True

ASGI_APPLICATION = 'vcg_api_server.routing.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels.layers.InMemoryChannelLayer',
    },
}

....

Quick update

I've managed to get pass my initial error by changing my test case to this:

import pytest
from channels.testing import WebsocketCommunicator

from ws.consumers import VCGConsumer


@pytest.mark.asyncio
async def test_non_registered_datacenter_connected():
    dc_address = b"0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
    communicator = WebsocketCommunicator(VCGConsumer, "ws/DC1/", headers=[(b'wallet', dc_address)])

    connected, subprotocol = await communicator.connect()
    assert connected is False

but then when run my test again, I get the following error

(vcg_api_server) artemis@artemis-pc:~/PycharmProjects/vcg_api_server$ pytest
========================================== test session starts ===========================================
platform linux -- Python 3.6.3, pytest-3.6.1, py-1.5.3, pluggy-0.6.0
Django settings: vcg_api_server.settings.test (from ini file)
rootdir: /home/artemis/PycharmProjects/vcg_api_server, inifile: pytest.ini
plugins: django-3.2.1, asyncio-0.8.0
collected 1 item

ws/tests/test_vcg_consumer.py F                                                                    [100%]

================================================ FAILURES ================================================
________________________________ test_non_registered_datacenter_connected ________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f99fcbaa978>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f99fcbaac18 maxsize=0 _getters[1]>

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

            If queue is empty, wait until an item is available.

            This method is a coroutine.
            """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               yield from getter
E               concurrent.futures._base.CancelledError

/usr/lib/python3.6/asyncio/queues.py:167: CancelledError

During handling of the above exception, another exception occurred:

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f99fcbaa978>, timeout = 1

    async def receive_output(self, timeout=1):
        """
            Receives a single message from the application, with optional timeout.
            """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout.timeout(timeout):
>               return await self.output_queue.get()

../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

args = (<async_timeout.timeout object at 0x7f99fcbaacc0>, <class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7f99fcbcf088>)
kw = {}

    @functools.wraps(func)
    def coro(*args, **kw):
>       res = func(*args, **kw)

/usr/lib/python3.6/asyncio/coroutines.py:210:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7f99fcbaacc0>
exc_type = <class 'concurrent.futures._base.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7f99fcbcf088>

    @asyncio.coroutine
    def __aexit__(self, exc_type, exc_val, exc_tb):
>       self._do_exit(exc_type)

../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/async_timeout/__init__.py:46:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <async_timeout.timeout object at 0x7f99fcbaacc0>
exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type):
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/async_timeout/__init__.py:83: TimeoutError

During handling of the above exception, another exception occurred:

    @pytest.mark.asyncio
    async def test_non_registered_datacenter_connected():
        dc_address = b"0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
        communicator = WebsocketCommunicator(VCGConsumer, "ws/DC1/", headers=[(b'wallet', dc_address)])

>       connected, subprotocol = await communicator.connect()

ws/tests/test_vcg_consumer.py:13:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/testing/websocket.py:38: in connect
    response = await self.receive_output(timeout)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:79: in receive_output
    self.future.result()
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/utils.py:50: in await_many_dispatch
    await dispatch(result)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:110: in __call__
    return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
    result = self.fn(*self.args, **self.kwargs)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:125: in thread_handler
    return self.func(*args, **kwargs)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
    handler(message)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/generic/websocket.py:32: in websocket_connect
    self.connect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ws.consumers.vcg_consumer.VCGConsumer object at 0x7f99fcbaab70>

    def connect(self):
        """Fired when the vcg_api client websocket connects.
            """
        headers = dict(self.scope['headers'])
        wallet_address = headers[b'wallet'].decode('utf-8')
>       self.dc_name = self.scope['url_route']['kwargs']['dc_name']
E       KeyError: 'url_route'

ws/consumers/vcg_consumer.py:21: KeyError
======================================== 1 failed in 1.21 seconds ========================================

which kind of makes sesnse because when I look inside the WebsocketCommunicator source code, the scope object has no url_route key

Ah, yes, if you use URL params you need to test via a URLRouter. Docs here: http://channels.readthedocs.io/en/latest/topics/testing.html#websocketcommunicator

I have the same issue.
My view has a send method that use async_to_sync

    @classmethod
    def send_message(cls, serialized_object, message_type):
        # get current channel layer and send the announce to all connected client
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)("announcement", {
            "type": "send.notification",
            "object": "announcement",
            "payload": serialized_object,
            "crud_type": message_type
        })

My test create a "request" with the view, I post data so the channel method is called.

request = factory.post(url, data)
force_authenticate(request, user=superuser)
view = announcement_view.AnnouncementMessageList.as_view()

await view(request)
response_ws = await communicator.receive_from()

And the result

def __call__(self, *args, **kwargs):
        # You can't call AsyncToSync from a thread with a running event loop
        try:
            event_loop = asyncio.get_event_loop()
        except RuntimeError:
            pass
        else:
            if event_loop.is_running():
                raise RuntimeError(
>                   "You cannot use AsyncToSync in the same thread as an async event loop - "
                    "just await the async function directly."
                )
E               RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

Also, testing inside a class like an apitestcase seems not possible. The async method is never called.

If an async test runs some function which in turn runs something within async_to_sync you can wrap that function in sync_to_async to get rid of the "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly" error.

I got the same message "You cannot use AsyncToSync..." and I fixed it by replacing async_to_sync with a mock, and then putting an async version of the same call to group_send in my test case itself:

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_views():
    # need to use context manager because it seems mock decorators
    # don't work on async functions
    with patch('asgiref.sync.async_to_sync') as mock_a2s:
        function_that_calls_async_to_sync()
        [group_send], _ = mock_a2s.call_args
        args, kwargs = mock_a2s.return_value.call_args
        await group_send(*args, **kwargs)
        # instantiate WebsocketCommunicator, do asserts...

If your code uses async_to_sync multiple times, then you will need to use call_args_list instead of call_args.

For me, wrapping the function in sync_to_async did not make the "You cannot use AsyncToSync..." error go away.

(I also needed to use @pytest.mark.django_db(transaction=True) to prevent the DB from locking, as described here).

Thanks @oTree-org!
I fixed this issue by mocking async_to_sync with following class:

class AsyncToSyncMock:
    def __init__(self):
        self.awaitable = None
        self.constructor_kwargs = None
        self.args = ()
        self.kwargs = {}

    def __call__(self, awaitable, **kwargs):
        self.awaitable = awaitable
        self.constructor_kwargs = kwargs

        def _inner_call(*i_args, **i_kwargs):
            self.args = i_args
            self.kwargs = i_kwargs
        return _inner_call

    async def call(self):
        return await self.awaitable(*self.args, **self.kwargs)

Example of usage:

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_steam_login(client, monkeypatch):
    async_mock = AsyncToSyncMock()
    monkeypatch.setattr(application.module, 'async_to_sync', async_mock)
    # Call your sync function.
    function_with_aync_to_sync_inside()
    # Be careful! async_to_sync is actually evaluted here.
    await async_mock.call()

This works fine only if async_to_sync is called once. Hope that will be useful for somebody.

I have the exact same problem. Are there any updates?

If an async test runs some function which in turn runs something within async_to_sync you can wrap that function in sync_to_async to get rid of the "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly" error.

This worked, thanks!

EDIT: Actually, after some testing it seems that the testcase doesn't close the DB connection and I get this error after running the tests:

django.db.utils.OperationalError: database "test_foobar" is being accessed by other users
DETAIL:  There is 1 other session using the database.

Is there any workaround?

Was this page helpful?
0 / 5 - 0 ratings