channel_layer.send seems to be an async function. So how would I call it from sync code? Should asgiref.sync.async_to_sync be used in such case?
Also docs don't mention it being async.
You will need to wrap it in asgiref.sync.AsyncToSync. I'll mention that in the docs.
New docs are in 358bf6c
Somewhat related question: channel_layer.group_send, channel_layer.group_add are not implemented. Should I open a new issue to track this feature?
No, they are in a half-done state on my machine, it's the last piece of work on asgi_redis 2.0. They are coming, don't worry.
I've got another problem. Wrapping with AsyncToSync works only if we are somewhere in asgi app. If I call wrapped function from another process (management command for example) then it fails with Cannot call async functions without an event loop running. If I call it from another process which has its own asyncio loop then it just hangs.
As example, I have the following function:
def send_email(recipient, subject, text):
channel_layer = get_channel_layer()
AsyncToSync(channel_layer.send)(
'email.dispatcher',
dict(type='dispatch', email=dict(recipient=recipient, subject=subject, text=text))
)
I'd like it to be multipurpose, and avoid writing 3 different versions of this function.
Can something be done about this?
Yes, I can make AsyncToSync run its own tiny event loop when you've not got one running. I've opened an issue here to track it: https://github.com/django/asgiref/issues/26
OK, I have updated asgiref @laevilgenius - if you could pull the master version of it and check you can now do what you want?
Sync code works (both asgi and management command). But async in management command still fails:
If I call it directly it blocks my loop. If I call it with ThreadPoolExecutor it throws You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync.
It is problematic for me, because send_email is buried inside of big chunk of sync code, which I want to call pretty much everywhere.
Hmm, that's strange, it works in testing. Let me try to replicate it more.
Can you clarify what you mean by blocks my loop? A code example would be great.
Basically it boils down to
class Command(BaseCommand):
def handle(self, *args, **options):
loop = asyncio.get_event_loop()
pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
loop.run_until_complete(self.run(loop, pool))
loop.close()
async def run(self, loop, pool):
(1) send_email('[email protected]', 'test', 'text')
(2) await loop.run_in_executor(pool, functools.partial(send_email, '[email protected]', 'test', 'text'))
(1) hangs because it waits for a concurrent.futures.Future if I understand AsyncToSync code correctly.
(2) throws RuntimeError: You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync.
I'm not quite sure why you're going in and out of async mode so much; why not do one of these?
Option A: Don't instantiate an async event loop outside and just call your function:
class Command(BaseCommand):
def handle(self, *args, **options):
send_email('[email protected]', 'test', 'text')
Option B: Use AsyncToSync:
class Command(BaseCommand):
def handle(self, *args, **options):
run_sync = AsyncToSync(self.run)
run_sync()
async def run(self):
# The only reason you would do this would be if send_email was its own async function,
# but here I have called it sync anyway.
send_email('[email protected]', 'test', 'text')
I have an async loop which does its own thing - takes events from celery. And I have async handlers for those events which may call some sync code either directly, if it is fast or in executor. In any case if such code has AsyncToSync somewhere - it fails. Hanging is especially dangerous here.
If you either just write the email send handler as native async then, or use SyncToAsync to wrap it, you should not have this problem - the one thing that AsyncToSync will not let you do is run async code inside of a synchronous thread that was NOT made by SyncToAsync.
Yeah, I'll probably go with native async. But hanging issue still bothers me. It would be helpful to check if Future is going to wait in the same thread as asyncio loop and throw an exception, if it is possible.
I'll try and clean that up if I can replicate the environment it happens in (I already fixed one hanging issue yesterday, but it makes my brain hurt a bit so I need a short break)
I am having the same problem, calling async_to_sync from a manage command, in this case python manage.py tests. The relevant code is
class MyHandler(SyncConsumer):
def websocket_connect(self, event):
self.send({ "type": "websocket.accept"})
async_to_sync(self.channel_layer.group_add('grpup', self.channel_name))
Test looks like this -- I use asynctest because I don't want this to be the only test in my project that uses pytest
class ChannelTests(asynctest.TestCase):
async def test_websockets(self):
communicator = WebsocketCommunicator(MyHandler, path)
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
When I run this, I get RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
@andrewgodwin
hi, i hit a strange error
channels == 2.1.3
channels-redis == 2.3.0
daphne -b 0.0.0.0 -p 80 antwork_backend.asgi:channel_layer
i use group_send to send message to websocket by channels 2 outside
the message come from mqtt client, and very frequently
the memory is increase 100M every 10 min
I let the chat_message func in consumer return directly, then the memory stop increase
so i guess if the consumer handle message slower than the message income
if it is how can i handle this, and how can i run more consumer?
i test the chat_message func, it was called ~3s later after the message income.
the send method is:
def send_to_ws(channel, data):
channel_layer = get_channel_layer()
send_data = {'room_name': channel, 'type': 'chat_message', 'message': data, 'username': channel}
async_to_sync(channel_layer.group_send)(channel, send_data)
and the consumer is:
class ChatConsumer(AsyncJsonWebsocketConsumer):
rooms = set()
##### WebSocket event handlers
def get_plane_group_name(self, id):
return "DEV_" + id
async def connect(self):
"""
Called when the websocket is handshaking as part of initial connection.
"""
params = QueryDict(self.scope["query_string"]).dict()
room_name = "all"
if params.get("room_name"):
room_name = self.get_plane_group_name(params.get("room_name"))
self.scope["user_detail"] = {"username": params.get("token")}
await self.accept()
async def receive_json(self, content):
"""
Called when we get a text frame. Channels will JSON-decode the payload
for us and pass it as the first argument.
"""
command = content.get("command", None)
try:
if command == "enter":
# Make them join the room
await self.join_room(content["room_name"])
elif command == "leave":
# Leave the room
await self.leave_room(content["room_name"])
elif command == "send":
await self.send_room(content["room_name"], content["message"])
elif command == "control":
msg = json.dumps(content["message"])
mqtt_content = encode_mqtt_content(content["topic"], msg)
if mqtt_content:
cnt = mq_client.publish(content["topic"], mqtt_content, qos=2)
print("send msg: {}".format(cnt))
elif command == "apply_control":
pass
elif command == "give_control":
pass
except Exception as er:
print("error: {}".format(er))
async def disconnect(self, code):
"""
Called when the WebSocket closes for any reason.
"""
# Leave all the rooms we are still in
for room_id in list(self.rooms):
try:
await self.leave_room(room_id)
except ClientError:
pass
##### Command helper methods called by receive_json
async def join_room(self, room_name):
"""
Called by receive_json when someone sent a join command.
"""
await self.channel_layer.group_send(
room_name,
{
"type": "chat.join",
"room_name": room_name,
"username": self.scope["user_detail"]["username"],
}
)
await self.channel_layer.group_add(
room_name,
self.channel_name,
)
self.rooms.add(room_name)
async def leave_room(self, room_name):
"""
Called by receive_json when someone sent a leave command.
"""
await self.channel_layer.group_send(
room_name,
{
"type": "chat.leave",
"room_name": room_name,
"username": self.scope["user_detail"]["username"],
}
)
# Remove that we're in the room
self.rooms.discard(room_name)
# Remove them from the group so they no longer get room messages
await self.channel_layer.group_discard(
room_name,
self.channel_name,
)
async def send_room(self, room_name, message):
"""
Called by receive_json when someone sends a message to a room.
"""
await self.channel_layer.group_send(
room_name,
{
"type": "chat.message",
"room_name": room_name,
"username": self.scope["user_detail"]["username"],
"message": message,
}
)
# These helper methods are named by the types we send - so chat.join becomes chat_join
async def chat_join(self, event):
"""
Called when someone has joined our chat.
"""
# Send a message down to the client
await self.send_json(
{
"msg_type": settings.MSG_TYPE_ENTER,
"room_name": event["room_name"],
"username": event["username"],
},
)
async def chat_leave(self, event):
"""
Called when someone has left our chat.
"""
# Send a message down to the client
await self.send_json(
{
"msg_type": settings.MSG_TYPE_LEAVE,
"room_name": event["room_name"],
"username": event["username"],
},
)
async def chat_message(self, event):
"""
Called when someone has messaged our chat.
"""
# Send a message down to the client
await self.send_json(
{
"msg_type": settings.MSG_TYPE_MESSAGE,
"room_name": event["room_name"],
"username": event["username"],
"message": event["message"],
},
)
thanks.
Most helpful comment
I've got another problem. Wrapping with
AsyncToSyncworks only if we are somewhere in asgi app. If I call wrapped function from another process (management command for example) then it fails withCannot call async functions without an event loop running. If I call it from another process which has its own asyncio loop then it just hangs.As example, I have the following function:
I'd like it to be multipurpose, and avoid writing 3 different versions of this function.
Can something be done about this?