I have included the output of celery -A proj report in the issue.
I have verified that the issue exists against the master branch of Celery.

tasks.py:
from celery import Celery

celery = Celery('tasks', broker='pyamqp://guest@localhost//')

@celery.task
def add(x, y):
    return x + y

@celery.on_after_configure.connect
def add_periodic():
    celery.add_periodic_task(10.0, add.s(2, 3), name='add every 10')

if __name__ == '__main__':
    add_periodic()
Step 1: RabbitMQ is up:
rabbitmq 1186 1 0 Nov12 ? 00:00:00 /bin/sh /usr/sbin/rabbitmq-server
Step 2: execute tasks.py:
python tasks.py
Step 3: start the beat service:
celery -A tasks -l info beat
celery beat v4.0.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2016-11-12 17:37:58
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2016-11-12 17:37:58,912: INFO/MainProcess] beat: Starting...
I expect the scheduler to trigger the add() function every ten seconds.
The add() function never gets triggered, and I don't see any exception in the terminal. Am I missing anything?
I had the same problem :(
Your example works well for me.
NOTE: your signal handler needs to accept **kwargs; failing to do so will be an error in the future.
Using your example
# file: tasks.py
from celery import Celery

celery = Celery('tasks', broker='pyamqp://guest@localhost//')

@celery.task
def add(x, y):
    return x + y

@celery.on_after_configure.connect
def add_periodic(**kwargs):
    celery.add_periodic_task(10.0, add.s(2, 3), name='add every 10')
I start the beat service as follows:
$ celery -A tasks beat -l debug
celery beat v4.0.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2016-12-01 11:54:56
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%DEBUG
. maxinterval -> 5.00 minutes (300s)
[2016-12-01 11:54:56,511: DEBUG/MainProcess] Setting default socket timeout to 30
[2016-12-01 11:54:56,511: INFO/MainProcess] beat: Starting...
[2016-12-01 11:54:56,517: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>
<ScheduleEntry: add every 10 tasks.add(2, 3) <freq: 10.00 seconds>
[2016-12-01 11:54:56,517: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2016-12-01 11:54:56,528: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@grain', 'platform': 'Erlang/OTP', 'version': '3.6.4'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2016-12-01 11:54:56,531: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.add)
[2016-12-01 11:54:56,534: DEBUG/MainProcess] using channel_id: 1
[2016-12-01 11:54:56,535: DEBUG/MainProcess] Channel open
[2016-12-01 11:54:56,537: DEBUG/MainProcess] beat: Synchronizing schedule...
[2016-12-01 11:54:56,537: DEBUG/MainProcess] tasks.add sent. id->af224838-cf72-4d0d-9076-1c39cdbeffb8
[2016-12-01 11:54:56,537: DEBUG/MainProcess] beat: Waking up in 9.97 seconds.
[2016-12-01 11:55:06,519: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.add)
[2016-12-01 11:55:06,520: DEBUG/MainProcess] tasks.add sent. id->907cf307-e36f-455a-97a8-441c79b8ab92
Hi, I have the same issue, but I'm trying to start Celery programmatically in a thread; maybe that is the cause.
This is my thread:
from __future__ import absolute_import, unicode_literals
import threading

from celery import current_app
from celery.bin import worker

app = current_app._get_current_object()

class CeleryThread(threading.Thread):
    def __init__(self):
        super(CeleryThread, self).__init__()
        self.app = app
        self.worker = worker.worker(app=self.app)
        self.options = {
            'broker': 'amqp://guest:guest@localhost:5672//',
            'loglevel': 'INFO',
            'traceback': True,
        }
        app.add_periodic_task(5.0, test.s('hello'), name='add every 10')

    def run(self):
        self.worker.run(**self.options)

@app.task
def test(args1):
    print(args1)
And the main.py to launch this:

celery_thread = CeleryThread()
# used to kill the thread when the main program stops
# celery_thread.daemon = True
celery_thread.start()
My console output is:
-------------- celery@ubuntu v4.0.0 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-51-generic-x86_64-with-Ubuntu-16.04-xenial 2016-12-03 14:33:10
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: default:0x7f75775bfc50 (.default.Loader)
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. kalliope.core.CrontabManager2.CeleryThread.test
[2016-12-03 14:33:10,458: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-12-03 14:33:10,466: INFO/MainProcess] mingle: searching for neighbors
[2016-12-03 14:33:11,486: INFO/MainProcess] mingle: all alone
[2016-12-03 14:33:11,515: INFO/MainProcess] celery@ubuntu ready.
Am I forgetting an option? I can see you have a "scheduler" set in your output @ask.
Thanks in advance for any help.
I have the same config as @liuallen1981 and the same issue. Has anyone figured out what's happening? For now I have to use

celery.conf.beat_schedule = {
    'do-something-periodically': {
        'task': 'tasks.my_task',
        'schedule': 3.0,
    },
}

instead of using a setup_periodic_tasks function with the on_after_configure.connect decorator.
+1 Also having this issue.
+1 Also having this issue.
Celery version 4.0.2 (latentcall)
+1 Also having this issue.
+1 Also having this issue. Went on and tested with @liuallen1981's code and got the same result as with my own code.
Celery: 4.0.2
To run periodic tasks, you also have to invoke the scheduler when starting a worker, using the -B option:

celery -A proj worker -B

When using Celery in Django applications, where tasks are autodiscovered from apps, you need to use the on_after_finalize signal instead of on_after_configure.
References:
http://stackoverflow.com/questions/40712678/setup-periodic-task/40727526
http://stackoverflow.com/questions/41119053/connect-new-celery-periodic-task-in-django
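For the Django case, a minimal sketch (proj and myapp.tasks.my_task are placeholder names; by the time on_after_finalize fires, the autodiscovered tasks are registered, so they can be looked up by name in the registry):

# proj/celery.py -- a sketch, not a definitive setup
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Look the task up by its registered name; the path is a placeholder.
    sender.add_periodic_task(
        10.0,
        sender.tasks['myapp.tasks.my_task'].s(),
        name='run my_task every 10 seconds',
    )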
-B is not for production use and simply starts the beat scheduler, which at least in my case is already running.
+1 having the same issue with Celery(4.0.2)
Same issue here....
You just started a beat service; beat only schedules tasks and sends the due ones to the broker, so you should also start a worker to execute them.
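For example, with the tasks.py app from this thread, in two separate terminals:

celery -A tasks beat -l info
celery -A tasks worker -l info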
+1
same issue here
same issue here,
and I tried to print something inside the callback; it seems the callback is never called, even though RabbitMQ is working (it works fine when I trigger a task in code):

@celery.on_after_configure.connect
def setup_periodic_tasks(**kwargs):
    print('after connect')
(py35) ➜ celery -A celery.beat beat
celery beat v4.0.2 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2017-08-08 02:42:18
Configuration ->
. broker -> amqp://**:**@**:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
I use the Celery config celery.conf.beat_schedule instead of dynamically calling add_periodic_task to solve this, since I don't need to set the schedule dynamically, but I still don't know why this issue is happening.
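For completeness, a minimal runnable sketch of that workaround; the 'task' key must match the registered task name (here tasks.my_task):

# tasks.py -- static beat_schedule instead of add_periodic_task
from celery import Celery

celery = Celery('tasks', broker='pyamqp://guest@localhost//')

@celery.task
def my_task():
    print('periodic task fired')

celery.conf.beat_schedule = {
    'do-something-periodically': {
        'task': 'tasks.my_task',  # registered name: <module>.<function>
        'schedule': 3.0,          # seconds
    },
}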
I stepped through the library and found that my signal listener was being created/attached after the on_after_configure signal was fired. (I was placing my signal listener in app/tasks.py, and it was not working.)
I reasoned that Django's app ready signal would probably execute after Celery configuration and it is working well for me so far.
NOTE: I am not sure what celery configuration actually entails and whether it is possible that app.ready could fire before Celery is configured... however, I expect it would at least throw a runtime error.
Sample code from my app/apps.py:
from django.apps import AppConfig
import django.db.utils
from celery_app import app as celery_app
from celery.schedules import crontab
import utils.cron

class MyAppConfig(AppConfig):
    name = 'app'
    verbose_name = "MyApp"

    def ready(self):
        print("MyAppConfig.ready invoked.")
        import app.signals

        print("* * * Setting up periodic tasks!")
        import app.models
        import app.tasks
        for cron in app.models.CronTask.objects.all():
            celery_app.add_periodic_task(
                crontab(**utils.cron.parse_crontab_expression_to_celery(cron.frequency)),
                app.tasks.do_cron.s(cron.id),
                name='do cron'
            )
Note you also need to point INSTALLED_APPS to your new AppConfig in settings.py:

INSTALLED_APPS = [
    # ...
    'app.apps.MyAppConfig',
]
A good approach or fix would probably be to write a new decorator that 1) checks whether Celery is already configured and, if so, executes immediately, and 2) if Celery is not configured, adds the listener via @celery.on_after_configure.connect.
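A minimal sketch of that idea, reusing the celery app and add task from the top of the thread. The configured flag checked below is an internal Celery attribute (an assumption, not a public API), and weak=False matters for the reasons covered later in this thread:

def connect_or_run_now(fun):
    # Hypothetical helper: run the handler immediately if the app is already
    # configured, otherwise defer it to the on_after_configure signal.
    if getattr(celery, 'configured', False):  # internal flag -- an assumption
        fun(sender=celery)
    else:
        celery.on_after_configure.connect(fun, weak=False)
    return fun

@connect_or_run_now
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(2, 3), name='add every 10')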
As it stands, the docs are problematic since so many of us ran into this issue.
CCing @rustanacexd @viennadd just so you can try this fix if you still need to dynamically schedule tasks?
Putting my two cents out there: I got bitten by this and ended up having to reorder some of my tasks. We have about 8 scheduled tasks that are supposed to fire; however, I noticed that the following would happen:
Example:
@celery.on_after_configure.connect
def setup_periodic_tasks(**kwargs):
    celery.add_periodic_task(5.0, do_thing_b.s(), name='Test B')
    celery.add_periodic_task(4.9, do_thing_a.s(), name='Test A')
Ordering them like this means that do_thing_a would never fire, as it would be overwritten by do_thing_b. Originally they were both set to 5, although only one would fire (I believe in this case it would have been B, as it was added first). Next I changed one interval to a decimal, offsetting it by 0.1, to see if that would fix it. No dice. Then I ordered them so the lower interval was registered first and the higher one second, and that ended up fixing it, i.e.:
@celery.on_after_configure.connect
def setup_periodic_tasks(**kwargs):
    celery.add_periodic_task(4.9, do_thing_b.s(), name='Test B')
    celery.add_periodic_task(5.0, do_thing_a.s(), name='Test A')
We're also using some crontab()s, though those are sort of a mystery to get running, as some work and some don't; I suspect it is the same issue as above. I haven't completely played around with it, as those intervals are generally set to occur every X hours/days, so I usually forget they exist.
Maybe this kind of behavior is mentioned in the documentation, or I'm going down a different rabbit hole, but this behavior doesn't make much sense. For reference, we're using Redis instead of RabbitMQ, and Celery 4.1.0.
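For the crontab case, a sketch that follows the schedule-expression pattern from the Celery docs (do_thing_a is the task from the example above):

from celery.schedules import crontab

@celery.on_after_configure.connect
def setup_crontab_tasks(sender, **kwargs):
    # Fires every Monday at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        do_thing_a.s(),
        name='do thing A every Monday morning',
    )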
I was able to make this work. Check my answer here:
@prasanna-balaraman That does seem to work, thank you for the suggestion!
Same issue for me: I will test the other solution: https://stackoverflow.com/a/41119054/6149867
Closing. If it still appears, and anyone has code or docs suggestions, please feel free to send a PR referencing this issue.
It took me a while to realize that if any exception is raised in setup_periodic_tasks, it gets silently suppressed.
The function is called here: https://github.com/celery/celery/blob/master/celery/app/base.py#L950
If anything goes wrong, the exception is only saved in the responses; there is no re-raise or log:
https://github.com/celery/celery/blob/master/celery/utils/dispatch/signal.py#L276
So my suggestion is to keep setup_periodic_tasks as simple as possible.
Hope this helps!
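One way to make such failures visible is to log and re-raise inside the handler yourself. A minimal sketch, reusing the celery app and add task from the top of the thread:

import logging

logger = logging.getLogger(__name__)

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    try:
        sender.add_periodic_task(10.0, add.s(2, 3), name='add every 10')
    except Exception:
        # The dispatcher only stores the exception in its responses list,
        # so surface it here before it disappears.
        logger.exception('setup_periodic_tasks failed')
        raise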
@chiang831 do you have any suggestions to improve it? If so, please send a PR or open a discussion on the celery-users mailing list.
Defining them in on_after_finalize is what worked for me (non-Django Celery app).
@app.on_after_finalize.connect
def app_ready(**kwargs):
    """
    Called once after app has been finalized.
    """
    sender = kwargs.get('sender')

    # periodic tasks
    speed = 15
    sender.add_periodic_task(speed, update_leases.s(),
                             name='update leases every {} seconds'.format(speed))
Just ran into this, and none of the previous solutions worked for me. The exact scenarios that cause this are confusing and depend on the behavior of ref-counting/GC and the exact lifetimes of your decorated functions.
Signal.connect by default only holds a weak reference to the signal handler. This makes sense for other use cases of the Signal object (a short-lived object that wires up signals shouldn't be kept alive by its signal handlers), but it is very surprising in this case.
My specific use case was a decorator to make it easy to add new periodic tasks:
def call_every_5_min(task):
    @app.on_after_configure.connect
    def register_task(sender, **_):
        sender.add_periodic_task(5 * 60, task.signature())

@call_every_5_min
@app.task
def my_celery_task():
    pass
The fix is to explicitly ask for a strong reference:
def call_every_5_min(task):
    def register_task(sender, **_):
        sender.add_periodic_task(5 * 60, task.signature())
    app.on_after_configure.connect(register_task, weak=False)
The example in the docs only works if your decorated function is at module or class scope, in which case the module or class continues to hold a strong reference to the function. Otherwise, the only strong reference dies at the end of the scope it's defined in.
I recommend changing the docs to pass weak=False, which should work in the cases listed above. I have not explicitly tested this in a Django context, though.
> To run periodic tasks, you also have to invoke the scheduler when starting a worker, using the -B option:
> celery -A proj worker -B
> When using Celery in Django applications, where tasks are autodiscovered from apps, you need to use the on_after_finalize signal instead of on_after_configure.
> References:
> http://stackoverflow.com/questions/40712678/setup-periodic-task/40727526
> http://stackoverflow.com/questions/41119053/connect-new-celery-periodic-task-in-django
My process of python -m celery -A app_name worker -l info --autoscale=20,5 -BE blocked at the end of app_name.celery.py when using on_after_finalize.
> The same config as @liuallen1981 and the same issue. Has anyone figured out what's happening? For now I have to use
> celery.conf.beat_schedule = { 'do-something-periodically': { 'task': 'tasks.my_task', 'schedule': 3.0, }, }
> instead of using a setup_periodic_tasks function with the on_after_configure.connect decorator.

This works for me instead.
If you're trying to solve this issue, reboot your Docker engine first; it may be a bug in the signals system.
Should we close this issue as not a bug?
@auvipy not sure. Looks like it's a Celery bug.
It is a bug we must fix.