I have a scheduled Celery beat task (celery.py):
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(10.0, test_event, name='test')
And the task (events/tasks.py):
    @shared_task
    def test_event():
        from .models import Event
        Event.objects.create()
When the event is created, a receiver fires that should send a message to a channels group (events/receivers.py):
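    import json

    from channels import Group
    from django.db.models.signals import post_save
    from django.dispatch import receiver

    from .models import Event

    @receiver(post_save, sender=Event)
    def event_post_add(sender, instance, created, *args, **kwargs):
        if created:
            print("receiver fired")
            Group("test").send({
                "text": json.dumps({
                    'type': 'test',
                })
            })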
The main problem is that the receiver fires in the Celery beat process, but nothing is sent via Django Channels. There are no error messages; the message is simply never sent.
I've also tried without Celery beat. In that case my Django process even stops responding to API calls.
I've also tried with runserver, and with runworker and daphne running separately. Same issue.
How can I integrate the two so that I can send messages to channels from Celery background processes?
My package versions:
Django==1.10.4
channels==1.0.0
djangorestframework-jwt==1.9.0
asgi-redis==1.0.0
daphne==1.0.1
celery==4.0.2
Which CHANNEL_LAYERS Django setting are you using? Did you provide the DJANGO_SETTINGS_MODULE environment variable to the running Celery worker? Did you call django.setup() after the Celery worker started?
The celery process probably has no running consumer. Then the messages will be buffered forever. Try using
    Group("test").send({
        "text": json.dumps({
            'type': 'test',
        })
    }, immediately=True)
@AlexejStukov Thanks, that did the trick, it's working now. What do you mean by not having a running consumer and how does immediately=True help the situation?
Also, since these receivers could be called from elsewhere, not only from Celery background processes, how does this change the way my application works?
Ah, I understand. By default, Channel.send and Group.send store the message in an outgoing queue instead of actually sending it. The channels worker processes this queue when the consumer finishes. You can send the channels.signals.consumer_finished signal at the end of a base task so you don't need to type immediately=True each time. Or you can use Celery signals to emit the channels signal.
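To make the buffering behaviour described above concrete, here is a toy model in plain Python. It is only a sketch of the idea, not channels' actual implementation: `ToyChannelLayer`, `ToyPendingStore` and their methods are hypothetical names invented for illustration. It shows why a message sent outside a consumer can sit in the buffer forever unless something flushes it or `immediately=True` is passed.

```python
import json


class ToyChannelLayer:
    """Stands in for a real channel layer; records what was actually delivered."""

    def __init__(self):
        self.delivered = []

    def send_group(self, group, message):
        self.delivered.append((group, message))


class ToyPendingStore:
    """Toy version of the outgoing buffer (hypothetical, not channels' code)."""

    def __init__(self, layer):
        self.layer = layer
        self.pending = []

    def send(self, group, message, immediately=False):
        if immediately:
            # Bypass the buffer: the message reaches the layer right away.
            self.layer.send_group(group, message)
        else:
            # Held until something signals that the consumer has finished.
            self.pending.append((group, message))

    def consumer_finished(self):
        # In this model, a worker would call this after each consumer returns.
        for group, message in self.pending:
            self.layer.send_group(group, message)
        self.pending = []


layer = ToyChannelLayer()
store = ToyPendingStore(layer)
payload = {"text": json.dumps({"type": "test"})}

store.send("test", payload)                    # buffered, nothing delivered yet
buffered_only = list(layer.delivered)

store.send("test", payload, immediately=True)  # delivered right away
after_immediate = list(layer.delivered)

store.consumer_finished()                      # flushes the buffered message
after_flush = list(layer.delivered)
```

In a Celery task there is no worker calling the equivalent of `consumer_finished()`, so in this model the first message would never leave `pending`. That matches the symptom in the question.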
Actually, @proofit404, sent messages are now buffered only when they are sent from inside a consumer. I suspect @xtrinch is not on version 1.0.2, where this was fixed.
True, updating fixed it. Thank you both!
@andrewgodwin Good to know. It did not occur to me that this was a bug.
With channels==1.1.1 I also need immediately=True.
@wangwanzhong In what context? If you are inside a consumer, messages are still buffered until the consumer exits.
@wangwanzhong Check that you're running with the Redis backend; otherwise separate processes won't be able to find each other's channels.
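For reference, a Redis-backed channel layer in a channels 1.x settings file usually looks roughly like the sketch below. The Redis host and port and the `myproject.routing.channel_routing` path are assumptions for illustration, so adjust them to your project.

```python
# settings.py (sketch): a Redis-backed channel layer shared by all processes,
# so Celery workers, daphne and runworker all talk to the same backend.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgi_redis.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],  # assumed Redis location
        },
        "ROUTING": "myproject.routing.channel_routing",  # hypothetical module path
    },
}
```

The Celery process has to load the same settings module, so that it sends to the same Redis instance the channels workers read from.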