Channels: Cannot instantiate async_to_sync inside a SyncConsumer method

Created on 23 Feb 2018  路  8Comments  路  Source: django/channels

I'm getting this error from the worker (BTBitstampTrades). Any clue?

I think is because I'm defining the callback function inside the Worker method.

How could I send something to the channel layer from inside the callback def?

error from callback

My code:

class WsTradeHistory(JsonWebsocketConsumer):
    def connect(self):
        self.accept()
        async_to_sync(self.channel_layer.group_add)("users", self.channel_name)
        print('USUARIO CONECTADO' + self.channel_name)

    def disconnect(self, code):
        async_to_sync(self.channel_layer.group_discard)("users", self.channel_name)

    def receive_json(self, content, **kwargs):
        if content['group_request']:
            group_req = content['group_request']
            async_to_sync(self.channel_layer.group_add)(group_req, self.channel_name)
            if group_req == 'th_bitstamp_BTC_USD':
                print('BITSTAMP TRADES!')
                # self.bitstamp_trades('')
                async_to_sync(self.channel_layer.send)(
                    "bitstamp_trades",
                    {
                        "type": "get.data",
                    }
                )

    def trade_data(self, event):
        print('Enviando trade data al Usuario')
        self.send_json(json.loads(event["text"]))

    def one_trade(self, event):
        self.send_json(json.loads(event["text"]))

class BTBitstampTrades(SyncConsumer):

    def get_data(self, event):
        print('START WORKER')

        def callback(*args, **kwargs):
            async_to_sync(self.channel_layer.send)(
                "th_bitstamp_BTC_USD",
                {
                    "type": "one.trade",
                    "text": args[0]
                }
            )

        # We can't subscribe until we've connected, so we use a callback handler
        # to subscribe when able
        def connect_handler(data):
            channel = pusher.subscribe('live_trades')
            channel.bind('trade', callback)

        pusher = pysher.Pusher('de504dc5763aeef9ff52')
        pusher.connection.bind('pusher:connection_established', connect_handler)
        pusher.connect()

Ubuntu 16.04, Google Chrome, Django y Channels 2.02, Python 3.6

Most helpful comment

I just advise that you upgrade packages you're finding issues in before reporting an issue. It's a bit confusing for Channels as we have four - asgiref, daphne, channels and channels_redis.

All 8 comments

No, i don think it is to do with it being inside a nested def but rather that callback is called by pysher.Pusher and that is happing after def get_data(self, event): has completed
(your get_data method is automaticly wrapped in a SyncToAsync)

try:

def callback(*args, **kwargs):
    if getattr(SyncToAsync.threadlocal, "main_event_loop", None) is None:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(
            self.channel_layer.send(
                "th_bitstamp_BTC_USD",
                {
                    "type": "one.trade",
                    "text": args[0]
                }
            )
        )
        loop.close()
    else:
        async_to_sync(self.channel_layer.send)(
            "th_bitstamp_BTC_USD",
            {
                "type": "one.trade",
                "text": args[0]
            }
        )

Thank you for your quick response @hishnash

I did it and I got this error:

error from callback

Some general context: BTBitstampTrades is a worker, pysher is a library to consume third party Pusher websocket.

I'm trying that the method get_data() of the worker send the pysher response to a channel group via channel layer.

Please ignore the response @hishnash posted, it's not correct - that's just part of what the functions do for you. The correct solution is to upgrade your version of asgiref, as this problem was fixed in version 2.1.6.

Thank you for answer so quick.

I updated and I'm getting this error right now:

Task was destroyed but it is pending! task: <Task pending coro=<RedisConnection._read_data() done, defined at /home/pedro/anaconda2/envs/snowflakes/lib/python3.6/site-packages/aioredis/connection.py:175> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f766a6b53a8>()]> cb=[Future.set_result()]>

My code:

class WsTradeHistory(JsonWebsocketConsumer):

    def connect(self):
        self.accept()
        async_to_sync(self.channel_layer.group_add)("users", self.channel_name)
        print('USUARIO CONECTADO' + self.channel_name)

    def disconnect(self, code):
        async_to_sync(self.channel_layer.group_discard)("users", self.channel_name)

    def receive_json(self, content, **kwargs):
        if content['group_request']:
            group_req = content['group_request']
            async_to_sync(self.channel_layer.group_add)(group_req, self.channel_name)
            if group_req == 'th_bitstamp_BTC_USD':
                print('BITSTAMP TRADES!')
                async_to_sync(self.channel_layer.send)(
                    "bitstamp_trades",
                    {
                        "type": "get.data",
                    }
                )

    def trade_data(self, event):
        print('Enviando trade data al Usuario')
        self.send_json(json.loads(event["text"]))

    def one_trade(self, event):
        print('ONE TRADE')
        self.send_json(json.loads(event["text"]))

class BTBitstampTrades(SyncConsumer):

    def get_data(self, event):
        print('START WORKER')

        def callback(*args, **kwargs):
            print(args)
            async_to_sync(self.channel_layer.send)(
                "th_bitstamp_BTC_USD",
                {
                    "type": "one.trade",
                    "text": args[0]
                }
            )

        # We can't subscribe until we've connected, so we use a callback handler
        # to subscribe when able
        def connect_handler(data):
            channel = pusher.subscribe('live_trades')
            channel.bind('trade', callback)

        pusher = pysher.Pusher('de504dc5763aeef9ff52')
        pusher.connection.bind('pusher:connection_established', connect_handler)
        pusher.connect()

Have you updated channels_redis?

Hi @andrewgodwin I updated it and work! Thank you a lot.

I wish in the future be a pro like you guys @andrewgodwin @hishnash

This is my pip dependencies right now. Should I upgrade something else?

aiodns==1.1.1
aiohttp==2.3.10
aioredis==1.0.0
amqp==2.2.2
asgiref==2.1.6
asn1crypto==0.24.0
async-timeout==2.0.0
attrs==17.4.0
autobahn==17.10.1
Automat==0.6.0
Babel==2.5.3
billiard==3.5.0.3
biopython==1.69
cchardet==2.1.1
ccxt==1.10.1019
celery==4.1.0
certifi==2018.1.18
cffi==1.11.4
channels==2.0.2
channels-redis==2.1.0
chardet==3.0.4
constantly==15.1.0
cryptography==2.1.4
daphne==2.0.4
Django==2.0.2
django-celery-beat==1.1.1
django-celery-results==1.0.1
flower==0.9.2
hiredis==0.2.0
hyperlink==17.3.1
idna==2.6
idna-ssl==1.0.0
incremental==17.5.0
kombu==4.1.0
msgpack==0.5.6
multidict==4.1.0
mysqlclient==1.3.12
ndg-httpsclient==0.4.4
numpy==1.13.3
pandas==0.22.0
pyasn1==0.4.2
pyasn1-modules==0.2.1
pycares==2.3.0
pycparser==2.18
pyOpenSSL==17.5.0
Pysher==0.3.0
python-dateutil==2.6.1
pytz==2018.3
redis==2.10.6
requests==2.18.4
service-identity==17.0.0
six==1.11.0
tornado==4.5.3
Twisted==17.9.0
txaio==2.8.2
urllib3==1.22
vine==1.1.4
websocket-client==0.47.0
yarl==1.1.0
zope.interface==4.4.3

I just advise that you upgrade packages you're finding issues in before reporting an issue. It's a bit confusing for Channels as we have four - asgiref, daphne, channels and channels_redis.

Understood. I will follow your advice. Sorry.

Have a nice weekend.

Was this page helpful?
0 / 5 - 0 ratings