I am having a problem calling async_to_sync from tests run with python manage.py test. The relevant code is:
class MyHandler(SyncConsumer):
    """Synchronous consumer that accepts the websocket and then calls group_add."""

    def websocket_connect(self, event):
        """Handle the websocket connect event.

        Sends a ``websocket.accept`` message and then touches the channel
        layer's ``group_add`` with the group name and this channel's name.
        """
        # Accept the incoming connection.
        self.send({ "type": "websocket.accept"})
        # NOTE(review): group_add(...) is invoked first and its return value is
        # what gets handed to async_to_sync; the object async_to_sync returns is
        # never called on this line. Confirm this is the intended call shape.
        async_to_sync(self.channel_layer.group_add('grpup', self.channel_name))
Test looks like this -- I use asynctest because I don't want this to be the only test in my project that uses pytest syntax.
class ChannelTests(asynctest.TestCase):
    """asynctest-based test case that drives MyHandler through a WebsocketCommunicator."""

    async def test_websockets(self):
        """Connect to MyHandler and assert the connection is reported as established."""
        # `path` is not defined in this snippet; presumably it is set elsewhere
        # in the test module -- verify.
        communicator = WebsocketCommunicator(MyHandler, path)
        # connect() is unpacked into a (connected, subprotocol) pair.
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)
When I run this, I get RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. But I'm actually not sure it ever gets to my async_to_sync call. Looking at the call stack, it seems to bomb on the previous line:
Traceback (most recent call last):
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asynctest/case.py", line 297, in run
self._run_test_method(testMethod)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asynctest/case.py", line 354, in _run_test_method
self.loop.run_until_complete(result)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asynctest/case.py", line 224, in wrapper
return method(*args, **kwargs)
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "/Users/jonathanstray/PycharmProjects/cjworkbench/server/tests/test_websockets.py", line 26, in test_websockets
connected, subprotocol = await communicator.connect()
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/testing/websocket.py", line 33, in connect
response = await self.receive_output(timeout)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/testing.py", line 66, in receive_output
self.future.result()
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/consumer.py", line 54, in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/utils.py", line 48, in await_many_dispatch
await dispatch(result)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/sync.py", line 110, in __call__
return await asyncio.wait_for(future, timeout=None)
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 373, in wait_for
return (yield from fut)
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
future.result()
File "/Users/jonathanstray/anaconda/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/Users/jonathanstray/anaconda/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/db.py", line 13, in thread_handler
return super().thread_handler(loop, *args, **kwargs)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/sync.py", line 125, in thread_handler
return self.func(*args, **kwargs)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/consumer.py", line 99, in dispatch
handler(message)
File "/Users/jonathanstray/PycharmProjects/cjworkbench/server/websockets.py", line 36, in websocket_connect
self.send({ "type": "websocket.accept"})
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/channels/consumer.py", line 107, in send
self.base_send(message)
File "/Users/jonathanstray/anaconda/lib/python3.5/site-packages/asgiref/sync.py", line 34, in __call__
"You cannot use AsyncToSync in the same thread as an async event loop - "
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
This is on OS X / django 1.11 / channels 2.0.2
Does the same code work if you use pytest and its async support?
Maybe not directly related, but there's probably a problem with your code. The line async_to_sync(self.channel_layer.group_add('grpup', self.channel_name)) should be async_to_sync(self.channel_layer.group_add)('grpup', self.channel_name) instead. You want to convert the group_add method into a synchronous version and call it. See https://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions
Closing due to lack of response.
I'll get back to this, I promise, and then you'll have your answer. Will re-open then if needed.
I have the exact same issue.
I also tried to use Django's TestCase and wrap the async code of the WebsocketCommunicator as follows:
communicator = WebsocketCommunicator(MyConsumer, "/ws/")
connected, subprotocol = async_to_sync(communicator.connect)()
I get the following error:
Traceback (most recent call last):
File "/home/artemis/PycharmProjects/vcg_api_server/ws/tests/test_vcg_consumer.py", line 24, in test_non_registered_datacenter_connected
connected, subprotocol = async_to_sync(communicator.connect)()
File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py", line 64, in __call__
return call_result.result()
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py", line 78, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/testing/websocket.py", line 38, in connect
response = await self.receive_output(timeout)
File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py", line 75, in receive_output
return await self.output_queue.get()
File "/usr/lib/python3.6/asyncio/queues.py", line 167, in get
yield from getter
RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:78> cb=[_run_until_complete_cb() at /usr/lib/python3.6/asyncio/base_events.py:176]> got Future <Future pending> attached to a different loop
----------------------------------------------------------------------
Ran 1 test in 0.033s
@artemistomaras Could you post the code of the consumer you are testing, and the test case that's failing?
@andrewgodwin thank you for your time. I may be able to provide my full source code if that will assist you further. Anyways, the relevant code parts are:
Code for my consumer:
import logging
import uuid
from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer
from django.utils import timezone
from ws.models import DatacenterClient, DatacenterMessageLog
log = logging.getLogger(__name__)
class VCGConsumer(JsonWebsocketConsumer):
    """Channels consumer that handles websocket connections coming from DCs.

    State kept on the instance:
      * ``dc_name``   -- the ``dc_name`` URL kwarg taken from the scope.
      * ``dc_client`` -- the matching ``DatacenterClient`` row, or ``None``
        when the connecting client was rejected.
    """

    def connect(self):
        """Fired when the vcg_api client websocket connects.

        Looks up the ``DatacenterClient`` whose address equals the ``wallet``
        header and whose name equals the ``dc_name`` URL kwarg. When no such
        row exists the socket is closed and ``dc_client`` is reset to ``None``.
        Otherwise the row is marked as connected, the channel is added to the
        client's group and the connection is accepted.
        """
        headers = dict(self.scope['headers'])
        # Header names are looked up as bytes; the value is decoded to text.
        wallet_address = headers[b'wallet'].decode('utf-8')
        # Needs a 'url_route' entry in the scope; a KeyError is raised here
        # when the scope carries none.
        self.dc_name = self.scope['url_route']['kwargs']['dc_name']
        # At this point dc_client is a queryset, not a model instance.
        self.dc_client = DatacenterClient.objects.filter(address=wallet_address, name=self.dc_name)

        # Reject unknown clients
        if not self.dc_client.exists():
            log.warning('Rejecting client with name={} and address={}'.format(self.dc_name, wallet_address))
            self.close()
            self.dc_client = None
            return

        # Update the client details
        self.dc_client.update(
            channel_name=self.channel_name, last_connected=timezone.now(), status='connected'
        )
        # Swap the queryset for the first matching instance.
        self.dc_client = self.dc_client.first()
        # group_add is wrapped first; the resulting sync callable is then invoked.
        async_to_sync(self.channel_layer.group_add)(
            self.dc_client.group_name,
            self.channel_name
        )
        self.accept()
        log.info("Datacenter `{}` connected.".format(self.dc_client.name))

    def disconnect(self, close_code):
        """Fired when the vcg_api client websocket disconnects.

        Does nothing when ``dc_client`` is falsy. Otherwise the channel is
        removed from the client's group, the socket is closed and the client
        row is saved with status ``'disconnected'`` and a fresh
        ``last_disconnected`` timestamp.

        Args:
            close_code: the close code, only used in the log message.
        """
        if self.dc_client:
            async_to_sync(self.channel_layer.group_discard)(
                self.dc_client.group_name,
                self.channel_name
            )
            self.close()
            self.dc_client.last_disconnected = timezone.now()
            self.dc_client.status = 'disconnected'
            self.dc_client.save()
            log.info("Datacenter `{}` closed connection with code: {}".format(self.dc_client.name, close_code))

    def receive_json(self, content, **kwargs):
        """Fired when the vcg_api client websocket sends a message.

        Stores the payload as the response of the message log identified by
        ``_request_id`` and refreshes the client's ``last_message`` timestamp.
        Logs an error and returns early when no such message log exists.

        Args:
            content (dict): the message payload
            **kwargs: extra parameters
        """
        now = timezone.now()
        log.debug("Received payload: {}".format(content))
        # Both keys are removed from the payload before it is persisted.
        # '_request_id' is mandatory (KeyError when absent); 'error' defaults to False.
        _request_id = content.pop('_request_id')
        evm_exception = content.pop('error', False)
        msg_log = DatacenterMessageLog.objects.filter(request_id=uuid.UUID(_request_id))

        # Sanity check
        if not msg_log.exists():
            log.error('No message log exists for request id: {}'.format(_request_id))
            return

        # Set the response payload
        msg_log.update(response_payload=content, response_timestamp=now, evm_exception=evm_exception)
        self.dc_client.last_message = now
        self.dc_client.save()

    def chain_message(self, message):
        """Message dispatcher to the underlying Datacenter.

        The dispatched message is logged and persisted in the database. Along with its payload,
        a unique request_id is attached. When the vcg_api client responds back, it also attaches
        the same uuid and the response is also logged.

        Args:
            message (dict): the parameters to send to the vcg_api.
                One required argument is `method` which is the signature of the vcg_api method to invoke.
                For a complete list of methods and their parameters check `vcg_api.ethereum.py`.
        """
        # Create a new message log and attach the request_id uuid (if it is not present)
        if '_request_id' not in message.keys():
            msg_log = DatacenterMessageLog.objects.create(dc_client=self.dc_client, request_payload=message)
            message['_request_id'] = str(msg_log.request_id)

        # Send message to dc vcg_api client
        self.send_json(message)
Relevant routing.py inside the ws app:
from django.conf.urls import url
from . import consumers
# Websocket routes of the ws app. The path segment after "ws/" is captured
# as the named group `dc_name` and the connection is routed to VCGConsumer.
websocket_urlpatterns = [
    url(r'^ws/(?P<dc_name>[^/]+)/$', consumers.VCGConsumer),
]
Relevant models.py:
import uuid
from django.db import models
from django.contrib.postgres.fields import JSONField
from django.core.serializers.json import DjangoJSONEncoder
class DatacenterClient(models.Model):
    """A datacenter (DC) client, unique by both name and wallet address."""

    # Allowed values for `status`, as (stored value, display label) pairs.
    DC_STATUS = (
        ('connected', 'Connected'),
        ('disconnected', 'Disconnected'),
        ('unknown', 'Unknown')
    )
    name = models.CharField(max_length=120, unique=True)
    channel_name = models.CharField(max_length=120, blank=True, null=True, help_text='Ephemeral channel name')
    status = models.CharField(max_length=40, choices=DC_STATUS, default='unknown')
    address = models.CharField(max_length=42, unique=True, help_text='Wallet address of the DC')
    last_connected = models.DateTimeField(blank=True, null=True, help_text='Last timestamp of ws connect')
    last_disconnected = models.DateTimeField(blank=True, null=True, help_text='Last timestamp of ws disconnect')
    last_message = models.DateTimeField(blank=True, null=True, help_text='Last timestamp of message receive')

    class Meta:
        db_table = 'datacenter_clients'
        verbose_name = 'Datacenter client'
        verbose_name_plural = 'Datacenter clients'

    def __str__(self):
        """Return ``"<name> - <status>"``."""
        return '{} - {}'.format(self.name, self.status)

    @property
    def group_name(self):
        """Group name derived from the client name: ``dc_<name>``."""
        return 'dc_{}'.format(self.name)
class DatacenterMessageLog(models.Model):
    """One request sent to a DC client together with its (optional) response."""

    # The UUID doubles as primary key and as the correlation id of the request.
    request_id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    dc_client = models.ForeignKey(DatacenterClient, related_name='message_logs', on_delete=models.CASCADE)
    request_payload = JSONField(encoder=DjangoJSONEncoder, help_text='Payload send to the DC ws client')
    response_payload = JSONField(encoder=DjangoJSONEncoder, blank=True, null=True,
                                 help_text='Payload received from the DC ws client')
    # Filled in automatically when the row is created.
    request_timestamp = models.DateTimeField(auto_now_add=True, help_text='When the message was sent')
    response_timestamp = models.DateTimeField(blank=True, null=True, help_text='When the message response was received')
    evm_exception = models.BooleanField(default=False, help_text='If the ethereum client threw an exception')

    class Meta:
        db_table = 'datacenter_message_logs'
        verbose_name = 'Datacenter message log'
        verbose_name_plural = 'Datacenter message logs'

    def __str__(self):
        """Return ``"<request_id> @ <client name>"``."""
        return '{} @ {}'.format(str(self.request_id), self.dc_client.name)
The test that fails (invoked by pytest):
import pytest
from channels.testing import WebsocketCommunicator
from vcg_api_server.routing import application
@pytest.mark.asyncio
async def test_non_registered_datacenter_connected():
    """Connect through the routed application and expect the connection to be refused."""
    dc_address = "0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
    # The communicator is built around the whole routing `application`,
    # not around the bare consumer class.
    # NOTE(review): header name and value are str here, while the consumer
    # looks the header up under a bytes key -- confirm the expected type.
    communicator = WebsocketCommunicator(application, "/ws/DC1/", headers=[('wallet', dc_address)])
    connected, subprotocol = await communicator.connect()
    assert connected is False
Error that I am getting:
(vcg_api_server) artemis@artemis-pc:~/PycharmProjects/vcg_api_server$ pytest
========================================== test session starts ===========================================
platform linux -- Python 3.6.3, pytest-3.6.1, py-1.5.3, pluggy-0.6.0
Django settings: vcg_api_server.settings.test (from ini file)
rootdir: /home/artemis/PycharmProjects/vcg_api_server, inifile: pytest.ini
plugins: django-3.2.1, asyncio-0.8.0
collected 1 item
ws/tests/test_vcg_consumer.py F [100%]
================================================ FAILURES ================================================
________________________________ test_non_registered_datacenter_connected ________________________________
@pytest.mark.asyncio
async def test_non_registered_datacenter_connected():
dc_address = "0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
> communicator = WebsocketCommunicator(application, "/ws/DC1/", headers=[('wallet', dc_address)])
ws/tests/test_vcg_consumer.py:12:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/testing/websocket.py:26: in __init__
super().__init__(application, self.scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:17: in __init__
self.instance = self.application(scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/routing.py:56: in __call__
return self.application_mapping[scope["type"]](scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/sessions.py:40: in __call__
return self.inner(dict(scope, cookies=cookies))
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/sessions.py:138: in __call__
return SessionMiddlewareInstance(scope, self)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/sessions.py:161: in __init__
self.inner = self.middleware.inner(self.scope)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/auth.py:143: in __call__
scope["user"] = async_to_sync(get_user)(scope)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asgiref.sync.AsyncToSync object at 0x7f9ca102dc50>
args = ({'cookies': {}, 'headers': [('wallet', '0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0')], 'path': '/ws/DC1/', 'query_string': b'', ...},)
kwargs = {}, event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
def __call__(self, *args, **kwargs):
# You can't call AsyncToSync from a thread with a running event loop
try:
event_loop = asyncio.get_event_loop()
except RuntimeError:
pass
else:
if event_loop.is_running():
raise RuntimeError(
> "You cannot use AsyncToSync in the same thread as an async event loop - "
"just await the async function directly."
)
E RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:34: RuntimeError
======================================== 1 failed in 0.19 seconds ========================================
Exception ignored in: <bound method ApplicationCommunicator.__del__ of <channels.testing.websocket.WebsocketCommunicator object at 0x7f9ca102d748>>
Traceback (most recent call last):
File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py", line 53, in __del__
File "/home/artemis/.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py", line 44, in stop
AttributeError: 'WebsocketCommunicator' object has no attribute 'future'
P.S.: My test settings
from .base import *
# Overrides for the test settings module.
CELERY_ALWAYS_EAGER = True
# Dotted path to the ASGI application object.
ASGI_APPLICATION = 'vcg_api_server.routing.application'
# The default channel layer uses the in-memory backend.
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels.layers.InMemoryChannelLayer',
    },
}
....
Quick update
I've managed to get past my initial error by changing my test case to this:
import pytest
from channels.testing import WebsocketCommunicator
from ws.consumers import VCGConsumer
@pytest.mark.asyncio
async def test_non_registered_datacenter_connected():
    """Connect straight to VCGConsumer and expect the connection to be refused."""
    # Header name and value are passed as bytes in this version.
    dc_address = b"0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
    # The consumer class itself is handed to the communicator, so no URL
    # router sits in front of it.
    communicator = WebsocketCommunicator(VCGConsumer, "ws/DC1/", headers=[(b'wallet', dc_address)])
    connected, subprotocol = await communicator.connect()
    assert connected is False
but then when I run my test again, I get the following error:
(vcg_api_server) artemis@artemis-pc:~/PycharmProjects/vcg_api_server$ pytest
========================================== test session starts ===========================================
platform linux -- Python 3.6.3, pytest-3.6.1, py-1.5.3, pluggy-0.6.0
Django settings: vcg_api_server.settings.test (from ini file)
rootdir: /home/artemis/PycharmProjects/vcg_api_server, inifile: pytest.ini
plugins: django-3.2.1, asyncio-0.8.0
collected 1 item
ws/tests/test_vcg_consumer.py F [100%]
================================================ FAILURES ================================================
________________________________ test_non_registered_datacenter_connected ________________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f99fcbaa978>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f99fcbaac18 maxsize=0 _getters[1]>
@coroutine
def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> yield from getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.6/asyncio/queues.py:167: CancelledError
During handling of the above exception, another exception occurred:
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f99fcbaa978>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout.timeout(timeout):
> return await self.output_queue.get()
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (<async_timeout.timeout object at 0x7f99fcbaacc0>, <class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7f99fcbcf088>)
kw = {}
@functools.wraps(func)
def coro(*args, **kw):
> res = func(*args, **kw)
/usr/lib/python3.6/asyncio/coroutines.py:210:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7f99fcbaacc0>
exc_type = <class 'concurrent.futures._base.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7f99fcbcf088>
@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
> self._do_exit(exc_type)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/async_timeout/__init__.py:46:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <async_timeout.timeout object at 0x7f99fcbaacc0>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type):
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/async_timeout/__init__.py:83: TimeoutError
During handling of the above exception, another exception occurred:
@pytest.mark.asyncio
async def test_non_registered_datacenter_connected():
dc_address = b"0xFFcf8FDEE72ac11b5c542428B35EEF5769C409f0"
communicator = WebsocketCommunicator(VCGConsumer, "ws/DC1/", headers=[(b'wallet', dc_address)])
> connected, subprotocol = await communicator.connect()
ws/tests/test_vcg_consumer.py:13:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/testing/websocket.py:38: in connect
response = await self.receive_output(timeout)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/testing.py:79: in receive_output
self.future.result()
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/utils.py:50: in await_many_dispatch
await dispatch(result)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:110: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/asgiref/sync.py:125: in thread_handler
return self.func(*args, **kwargs)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../.virtualenvs/vcg_api_server/lib/python3.6/site-packages/channels/generic/websocket.py:32: in websocket_connect
self.connect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <ws.consumers.vcg_consumer.VCGConsumer object at 0x7f99fcbaab70>
def connect(self):
"""Fired when the vcg_api client websocket connects.
"""
headers = dict(self.scope['headers'])
wallet_address = headers[b'wallet'].decode('utf-8')
> self.dc_name = self.scope['url_route']['kwargs']['dc_name']
E KeyError: 'url_route'
ws/consumers/vcg_consumer.py:21: KeyError
======================================== 1 failed in 1.21 seconds ========================================
which kind of makes sense, because when I look inside the WebsocketCommunicator source code, the scope object has no url_route key.
Ah, yes, if you use URL params you need to test via a URLRouter. Docs here: http://channels.readthedocs.io/en/latest/topics/testing.html#websocketcommunicator
I have the same issue.
My view has a send method that uses async_to_sync:
@classmethod
def send_message(cls, serialized_object, message_type):
    """Broadcast an announcement event to every client in the "announcement" group.

    Args:
        serialized_object: value sent under the "payload" key.
        message_type: value sent under the "crud_type" key.
    """
    # Event delivered to each consumer in the group.
    notification = {
        "type": "send.notification",
        "object": "announcement",
        "payload": serialized_object,
        "crud_type": message_type
    }
    # Wrap the layer's coroutine function first, then call the sync wrapper.
    group_send_sync = async_to_sync(get_channel_layer().group_send)
    group_send_sync("announcement", notification)
My test creates a "request" with the view; I post data so that the channel method is called.
request = factory.post(url, data)
force_authenticate(request, user=superuser)
view = announcement_view.AnnouncementMessageList.as_view()
await view(request)
response_ws = await communicator.receive_from()
And the result
def __call__(self, *args, **kwargs):
# You can't call AsyncToSync from a thread with a running event loop
try:
event_loop = asyncio.get_event_loop()
except RuntimeError:
pass
else:
if event_loop.is_running():
raise RuntimeError(
> "You cannot use AsyncToSync in the same thread as an async event loop - "
"just await the async function directly."
)
E RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
Also, testing inside a class like an apitestcase seems not possible. The async method is never called.
If an async test runs some function which in turn runs something within async_to_sync you can wrap that function in sync_to_async to get rid of the "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly" error.
I got the same message "You cannot use AsyncToSync..." and I fixed it by replacing async_to_sync with a mock, and then putting an async version of the same call to group_send in my test case itself:
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_views():
    """Run the sync code with async_to_sync patched out, then await the captured call."""
    # need to use context manager because it seems mock decorators
    # don't work on async functions
    # NOTE(review): the patch replaces the attribute on asgiref.sync itself;
    # a module that already did `from asgiref.sync import async_to_sync`
    # keeps its own reference -- confirm the code under test sees the mock.
    with patch('asgiref.sync.async_to_sync') as mock_a2s:
        function_that_calls_async_to_sync()
    # The single positional argument given to async_to_sync(...) is unpacked
    # as the coroutine function.
    [group_send], _ = mock_a2s.call_args
    # Arguments the code passed to the object async_to_sync(...) returned.
    args, kwargs = mock_a2s.return_value.call_args
    await group_send(*args, **kwargs)
    # instantiate WebsocketCommunicator, do asserts...
If your code uses async_to_sync multiple times, then you will need to use call_args_list instead of call_args.
For me, wrapping the function in sync_to_async did not make the "You cannot use AsyncToSync..." error go away.
(I also needed to use @pytest.mark.django_db(transaction=True) to prevent the DB from locking, as described here).
Thanks @oTree-org!
I fixed this issue by mocking async_to_sync with following class:
class AsyncToSyncMock:
    """Test double for ``async_to_sync`` that records calls instead of running them.

    Calling the mock the way ``async_to_sync`` is called returns a plain
    synchronous wrapper. Invoking that wrapper only records what would have
    been executed and returns ``None``; the test later awaits the recorded
    work itself with ``call()`` or ``call_all()``.

    Attributes describing the most recent invocation (or, before any
    invocation, the most recent wrap): ``awaitable``, ``constructor_kwargs``,
    ``args`` and ``kwargs``. ``calls`` holds every invocation in order as
    ``(awaitable, args, kwargs)`` tuples.
    """

    def __init__(self):
        self.awaitable = None
        self.constructor_kwargs = None
        self.args = ()
        self.kwargs = {}
        # Full history, so code that uses async_to_sync more than once can be replayed.
        self.calls = []

    def __call__(self, awaitable, **kwargs):
        """Stand in for ``async_to_sync(awaitable, **kwargs)``.

        Args:
            awaitable: the coroutine function that would have been wrapped.
            **kwargs: keyword arguments that would have gone to ``async_to_sync``.

        Returns:
            A synchronous callable that records its arguments and returns ``None``.
        """
        self.awaitable = awaitable
        self.constructor_kwargs = kwargs

        def _inner_call(*i_args, **i_kwargs):
            # Re-bind the awaitable at invocation time: another wrapper may
            # have been created in between, and the stored arguments must
            # always belong to the function they were passed to.
            self.awaitable = awaitable
            self.constructor_kwargs = kwargs
            self.args = i_args
            self.kwargs = i_kwargs
            self.calls.append((awaitable, i_args, i_kwargs))

        return _inner_call

    async def call(self):
        """Await the most recently recorded call and return its result."""
        return await self.awaitable(*self.args, **self.kwargs)

    async def call_all(self):
        """Await every recorded call in order and return the list of results."""
        results = []
        for awaitable, args, kwargs in self.calls:
            results.append(await awaitable(*args, **kwargs))
        return results
Example of usage:
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_steam_login(client, monkeypatch):
    """Swap async_to_sync for an AsyncToSyncMock, run the sync code, then await the recorded call."""
    async_mock = AsyncToSyncMock()
    # Replace the `async_to_sync` attribute of application.module with the mock.
    monkeypatch.setattr(application.module, 'async_to_sync', async_mock)
    # Call your sync function.
    function_with_aync_to_sync_inside()
    # Be careful! async_to_sync is actually evaluated here.
    await async_mock.call()
This works fine only if async_to_sync is called once. Hope that will be useful for somebody.
I have the exact same problem. Are there any updates?
If an async test runs some function which in turn runs something within
async_to_sync, you can wrap that function in sync_to_async to get rid of the "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly" error.
This worked, thanks!
EDIT: Actually, after some testing it seems that the testcase doesn't close the DB connection and I get this error after running the tests:
django.db.utils.OperationalError: database "test_foobar" is being accessed by other users
DETAIL: There is 1 other session using the database.
Is there any workaround?
Most helpful comment
Maybe not directly related, but there's probably a problem with your code. The line
async_to_sync(self.channel_layer.group_add('grpup', self.channel_name)) should be async_to_sync(self.channel_layer.group_add)('grpup', self.channel_name) instead. You want to convert the group_add method into a synchronous version and call it. See https://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions