Checklist:
I have included the output of celery -A proj report in the issue.
I have verified that the issue exists against the master branch of Celery.

# tasks.py
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y
I am testing the case where RabbitMQ is down, so I didn't start RabbitMQ at all. Then I ran the following in IPython:

from tasks import add
add.delay()

add.delay() always blocks. I tried setting retry to False, but it doesn't work:
add.apply_async((2, 2), retry=False)
I also tried providing a retry_policy, and that doesn't work either:
add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})
I expect to see an exception when I set retry to False. When I set retry to True, it should attempt the default number of max_retries and, if the broker is still unreachable, raise an exception.
Instead, the call runs forever and blocks the process.
I press Ctrl+C to get the following traceback:
In [1]: from tasks import add, mul
In [2]: add.delay(2,3)
^C---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in __call__(self)
35 try:
---> 36 return self.__value__
37 except AttributeError:
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
ConnectionRefusedError Traceback (most recent call last)
~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
337 try:
--> 338 return fun(*args, **kwargs)
339 except catch as exc:
~/github/celery-demo/.venv/src/kombu/kombu/connection.py in connect(self)
260 self._closed = False
--> 261 return self.connection
262
~/github/celery-demo/.venv/src/kombu/kombu/connection.py in connection(self)
801 self._default_channel = None
--> 802 self._connection = self._establish_connection()
803 self._closed = False
~/github/celery-demo/.venv/src/kombu/kombu/connection.py in _establish_connection(self)
756 self._debug('establishing connection...')
--> 757 conn = self.transport.establish_connection()
758 self._debug('connection established: %r', self)
~/github/celery-demo/.venv/src/kombu/kombu/transport/pyamqp.py in establish_connection(self)
129 conn.client = self.client
--> 130 conn.connect()
131 return conn
~/github/celery-demo/.venv/lib/python3.6/site-packages/amqp/connection.py in connect(self, callback)
281 )
--> 282 self.transport.connect()
283 self.on_inbound_frame = self.frame_handler_cls(
~/github/celery-demo/.venv/lib/python3.6/site-packages/amqp/transport.py in connect(self)
108 def connect(self):
--> 109 self._connect(self.host, self.port, self.connect_timeout)
110 self._init_socket(
~/github/celery-demo/.venv/lib/python3.6/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
149 self.sock.settimeout(timeout)
--> 150 self.sock.connect(sa)
151 except socket.error:
ConnectionRefusedError: [Errno 61] Connection refused
During handling of the above exception, another exception occurred:
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-2-e2f2f196d9be> in <module>()
----> 1 add.delay(2,3)
~/github/celery-demo/.venv/src/celery/celery/app/task.py in delay(self, *args, **kwargs)
414 celery.result.AsyncResult: Future promise.
415 """
--> 416 return self.apply_async(args, kwargs)
417
418 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
~/github/celery-demo/.venv/src/celery/celery/app/task.py in apply_async(self, args, kwargs, task_id, producer, link, link_error, shadow, **options)
537 link=link, link_error=link_error, result_cls=self.AsyncResult,
538 shadow=shadow, task_type=self,
--> 539 **options
540 )
541
~/github/celery-demo/.venv/src/celery/celery/app/base.py in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, group_id, retries, chord, reply_to, time_limit, soft_time_limit, root_id, parent_id, route_name, shadow, chain, task_type, **options)
747 with P.connection._reraise_as_library_errors():
748 self.backend.on_task_call(P, task_id)
--> 749 amqp.send_task_message(P, name, message, **options)
750 result = (result_cls or self.AsyncResult)(task_id)
751 if add_to_parent:
~/github/celery-demo/.venv/src/celery/celery/app/amqp.py in send_task_message(producer, name, message, exchange, routing_key, queue, event_dispatcher, retry, retry_policy, serializer, delivery_mode, compression, declare, headers, exchange_type, **kwargs)
552 delivery_mode=delivery_mode, declare=declare,
553 headers=headers2,
--> 554 **properties
555 )
556 if after_receivers:
~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, expiration, **properties)
179 body, priority, content_type, content_encoding,
180 headers, properties, routing_key, mandatory, immediate,
--> 181 exchange_name, declare,
182 )
183
~/github/celery-demo/.venv/src/kombu/kombu/connection.py in _ensured(*args, **kwargs)
492 for retries in count(0): # for infinity
493 try:
--> 494 return fun(*args, **kwargs)
495 except conn_errors as exc:
496 if got_connection and not has_modern_errors:
~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare)
185 headers, properties, routing_key, mandatory,
186 immediate, exchange, declare):
--> 187 channel = self.channel
188 message = channel.prepare_message(
189 body, priority, content_type,
~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in _get_channel(self)
207 channel = self._channel
208 if isinstance(channel, ChannelPromise):
--> 209 channel = self._channel = channel()
210 self.exchange.revive(channel)
211 if self.on_return:
~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in __call__(self)
36 return self.__value__
37 except AttributeError:
---> 38 value = self.__value__ = self.__contract__()
39 return value
40
~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in <lambda>()
222 connection = channel
223 self.__connection__ = connection
--> 224 channel = ChannelPromise(lambda: connection.default_channel)
225 if isinstance(channel, ChannelPromise):
226 self._channel = channel
~/github/celery-demo/.venv/src/kombu/kombu/connection.py in default_channel(self)
829
830 # make sure we're still connected, and if not refresh.
--> 831 self.ensure_connection(**conn_opts)
832 if self._default_channel is None:
833 self._default_channel = self.channel()
~/github/celery-demo/.venv/src/kombu/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors)
403 (), {}, on_error, max_retries,
404 interval_start, interval_step, interval_max,
--> 405 callback)
406 return self
407
~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
348 if callback:
349 callback()
--> 350 sleep(1.0)
351 # sleep remainder after int truncation above.
352 sleep(abs(int(tts) - tts))
KeyboardInterrupt:
I found that the main issue happens in kombu/kombu/messaging.py::_publish, which eventually reaches kombu/kombu/connection.py::default_channel. default_channel calls kombu/kombu/connection.py::ensure_connection with the max_retries, interval_start, interval_step and interval_max values taken from transport_options.
When transport_options is {} and the broker is down, the self.ensure_connection(**conn_opts) call in default_channel retries forever.
It seems that there are two retrying wrappers in kombu/messaging.py::Producer.publish: one based on the retry and retry_policy provided by celery's amqp layer, and another based on transport_options.
My current workaround is to add the retry configuration to transport_options as well:
app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
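For context, here is a minimal sketch of the whole setup with this workaround applied (the task module and broker URL are just the ones from the example above; the exact exception type is my expectation, not something verified in this thread):

# tasks.py -- example task plus the transport-options workaround
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Bound kombu's reconnection loop when the broker is unreachable; without this,
# max_retries defaults to None and publishing retries forever.
app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}

@app.task
def add(x, y):
    return x + y

With this in place, add.delay() should fail with a connection error (kombu wraps it as an OperationalError) once the retries are exhausted, instead of blocking.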
Hi,
I have hit a problem that sounds similar to yours:
https://github.com/celery/kombu/issues/795
As a workaround, using Kombu 4.0.2 has worked for me.
@HengfengLi what version are you on?
@markddavidoff It's celery (4.1.0).
I ran into this issue as well while trying to get Jobtastic updated to support celery 4. I needed to peg the version of Kombu in order to get tests to not hang forever.
In lower versions of kombu/celery, the timeouts aren't all passed to redis-py correctly, so in my case I ended up making a forked version of redis-py and overriding the timeouts manually.
https://github.com/celery/kombu/pull/769 this should fix the issue
The issue isn't solved for me. I'm using celery 4.1 and kombu 4.1. Should I use a newer kombu version?
When there is some problem with the workers, the program that manages the tasks at some point sends a task with app.send_task('module.name', []) and it freezes there. It should give a connection error or something, not stay blocked for a whole day. Maybe I'm missing a configuration? I'm using RabbitMQ and a database backend, as RPC gave me lots of problems.
Wait for the 4.2 release.
Why? Is this a bug of the 4.1.0 release? In the meantime should I downgrade to 4.0.x? Thanks
In the meantime, try installing from GitHub master.
We're also seeing a similar issue using celery & kombu from their master branches. When the Redis broker is down, apply_async() blocks until Redis becomes available again. I've experimented with different broker_transport_options, to no avail.
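For Redis specifically, a combination like the following is sometimes suggested; the socket_connect_timeout / socket_timeout keys are an assumption on my part (recent kombu Redis transports appear to pass them through to redis-py), so treat this as a sketch rather than a confirmed fix:

app.conf.broker_transport_options = {
    # bound kombu's reconnection loop
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.5,
    # assumed to be forwarded to redis-py so a dead host cannot hang the TCP connect
    'socket_connect_timeout': 5,
    'socket_timeout': 5,
}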
How do I set a delay of 5 minutes in the code below?
from flask import Flask, Blueprint, abort, jsonify, request, session
from os import environ

import settings
from celery import Celery

app = Flask(__name__)
app.config.from_object(settings)

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(app)

@celery.task(name="tasks.add")
def add(x, y):
    return x + y

@app.route("/test")
def hello_world(x=16, y=16):
    x = int(request.args.get("x", x))
    y = int(request.args.get("y", y))
    res = add.apply_async((x, y))
    context = {"id": res.task_id, "x": x, "y": y}
    result = "add((x){}, (y){})".format(context['x'], context['y'])
    goto = "{}".format(context['id'])
    return jsonify(result=result, goto=goto)

@app.route("/test/result/<task_id>")
def show_result(task_id):
    retval = add.AsyncResult(task_id).get(timeout=5.0)
    return repr(retval) + "mmk"

if __name__ == "__main__":
    port = int(environ.get("PORT", 5000))
    app.run(host='0.0.0.0', port=port, debug=True)
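For reference, a fixed delay can be requested at call time with apply_async's countdown or eta arguments (both are standard Celery API; the 300 seconds below is just the 5 minutes asked about):

# run add(x, y) roughly 5 minutes from now
res = add.apply_async((x, y), countdown=300)

# equivalently, with an absolute time
from datetime import datetime, timedelta
res = add.apply_async((x, y), eta=datetime.utcnow() + timedelta(minutes=5))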
Even with celery 4.1.1, I'm still experiencing hanging when the broker (RabbitMQ) is down.
What do you face in 4.2rc4? This isn't in the 4.1.x branch.
This sounds a lot like #4627, which is still a problem even on master.
thanks @clokep
So 4.2rc4 wouldn't work? Or how do I get that version?
please try that version and report
I am seeing what looks very much like the same problem. I am using celery 4.1.1 and kombu 4.2.1.
Later: Actually in my case, the server - redis - should be up and responding. celery beat seems to trigger events fine, but calling delay methods hangs forever. Will continue investigating from next week just in case I have done something else wrong.
This should be fixed in 4.2rc4, care to try that? We want to know its current status.
Will do so on Tuesday (Monday is a public holiday here).
ok thanks
Nevermind, turns out my problem was different. I wasn't correctly initializing Celery from Django, so it silently defaulted to using amqp instead of redis, where it silently hangs trying to connect to an AMQP socket that isn't actually listening.
please open a new issue after trying celery 4.2
I have tried with:
celery==4.2.0
kombu==4.2.1
and I also needed to specify broker_transport_options in order for apply_async not to block forever when Redis is down.
Still having this issue: the main process hangs and no exceptions are raised.
Update: solved with:
app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
Still an issue in the case where the broker was working, then died, and we send a task just after that moment:
celery: 4.2.1
kombu: 4.2.1
broker: rabbitmq
I tried to do some digging. My conclusions:
-- Celery asks for the Connection object's default_channel property.
-- default_channel knows about broker_transport_options and calls ensure_connection() -> retry_over_time() with broker_transport_options. retry_over_time() does return fun(*args, **kwargs), where fun is the bound method Connection.connect, and this successfully returns a Connection object.
-- So default_channel returns self._default_channel and celery tries to do something with it.
-- But after a series of processing steps, on_task_call -> maybe_declare -> _imaybe_declare from kombu.common is called. It takes the Connection object from the Queue and calls its ensure() method. ensure() knows nothing about broker_transport_options, so it is called without them and uses the default values: max_retries=None, which means infinite retries :(
I've added handling of broker_transport_options to kombu.connection.Connection.ensure() so that it uses broker_transport_options every time it checks the connection to the broker, but I have no idea whether that is the right fix for this problem or not.
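For illustration, here is a hedged sketch of that idea expressed as an external patch rather than the actual kombu change; it assumes the kombu 4.x Connection.ensure() signature and is only meant to show where broker_transport_options would be consulted:

# sketch: make Connection.ensure() fall back to transport_options for its retry policy
from kombu import Connection

_orig_ensure = Connection.ensure

def _ensure_with_transport_options(self, obj, fun, errback=None, max_retries=None,
                                   interval_start=1, interval_step=1, interval_max=1,
                                   on_revive=None):
    # fill the retry parameters from broker_transport_options when available
    opts = self.transport_options or {}
    if max_retries is None:
        max_retries = opts.get('max_retries')  # None still means retry forever
    return _orig_ensure(self, obj, fun, errback=errback, max_retries=max_retries,
                        interval_start=opts.get('interval_start', interval_start),
                        interval_step=opts.get('interval_step', interval_step),
                        interval_max=opts.get('interval_max', interval_max),
                        on_revive=on_revive)

Connection.ensure = _ensure_with_transport_options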
celery: 4.2.1
kombu: 4.2.1
Minimal example reproducing the problem:
from celery import Celery
celery_app = Celery('lol', broker='LOL_NOT_VALID')
celery_app.send_task('test')
I got a similar issue. The problem was that I forgot to add this:
Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:
proj/proj/__init__.py:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
Can anyone come up with a suitable solution?
I think we worked around this by providing transport options that only retry a certain number of times (by default Celery tries forever):
# Try 5 times. Initially try again immediately, then add 0.5 seconds for each
# subsequent try (with a maximum of 3 seconds). This comes out to roughly 3
# seconds of total delay (0, 0.5, 1, 1.5).
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'max_retries': 4,
    'interval_start': 0,
    'interval_step': 0.5,
    'interval_max': 3,
}
I can try to find more information if that isn't clear.
BROKER_TRANSPORT_OPTIONS fixed it for me, instead of CELERY_BROKER_TRANSPORT_OPTIONS. Using celery 4.2.1, kombu 4.2.1 and django 1.8.
Whether you use BROKER_TRANSPORT_OPTIONS vs. CELERY_BROKER_TRANSPORT_OPTIONS vs. broker_transport_options etc. depends somewhat on how you configure your Celery application. Glad it worked for you though! 👍
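To illustrate the difference (a sketch assuming a Django project; the module names are illustrative, the namespace argument is standard Celery 4 API):

from celery import Celery

app = Celery('proj')

# Django style: with a namespace, Celery reads CELERY_-prefixed settings,
# e.g. CELERY_BROKER_TRANSPORT_OPTIONS.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Old-style uppercase settings (pre-4.0 names) use the unprefixed name,
# e.g. BROKER_TRANSPORT_OPTIONS, while configuring the app object directly
# uses the new lowercase name:
app.conf.broker_transport_options = {'max_retries': 3}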
This is pretty dumb, but for me using Flask, I had done this:
app.py
...
app = Flask(__name__)
app.config.from_object(config[config_name])
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
...
tasks.py:
import celery
@celery.task(bind=True)
def my_task():
...
I needed to do:
app.py
...
app = Flask(__name__)
app.config.from_object(config[config_name])
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
app.celery = celery
...
tasks.py
from flask import current_app
celery = current_app.celery
@celery.task(bind=True)
def my_task():
...
This solved my issue of infinite hanging, but honestly I think that given that my setup was messed up, it would have been better for celery to tell me that.
The real solution is here: https://github.com/celery/celery/issues/4627#issuecomment-396907957
Just to let you know that I see the same default behaviour. The CELERY_BROKER_TRANSPORT_OPTIONS setting works well.
Same here. Was the problem reintroduced?
@jreed1701 I'm seeing the same issue as you, and we are getting the same traceback as the one mentioned in this thread, but I don't think it is for the same reason. For me, apply_async and delay exhibit this behavior for @shared_task functions but not @app.task functions. Same traceback as the one discussed in this thread, but it has nothing to do with the state of the broker.
Hi @chrisconlan, funny you point this out. I arrived at this conclusion yesterday when I modified my software to use a factory/extension pattern. I was using the @shared_task method you pointed out, but moved to the @app.task method when I changed to the new pattern. With the new implementation I create a celery instance without an application/task context and use it to decorate my task functions at import time. Then, later at runtime, I replace that celery object's task context with the appropriate app context, update it with config inputs, and run it. The infinite hangup behavior ceased and apply_async executed even though I had no broker connected. I wonder what it is about changing the context that freed this all up?
@jreed1701 Not sure about you, but I am using celery through django. I don't stray too far from the documented usage on that front because I don't want to break a deployment. Like you said, I'm not sure that anything is fundamentally broken about celery here, but I think the django-celery docs need an update to make sure everyone is following best practices.
@jreed1701 You gave me a great clue regarding celery context. I fixed it with the following ...
From the terminal, run python manage.py shell to initialize a Django shell, then...
# Make sure to initialize celery context
from django_project.celery import app
# Now you can import shared tasks
from other_app.tasks import the_shared_task
# Now the shared task won't hang
the_shared_task.delay()
Note to maintainers: @jreed1701 and I got here because the traceback at the beginning of the thread matched our own. I don't think our discussion has to do with the actual title of this issue. If anything, it might be worthwhile to raise an exception if .delay or .apply_async is attempted on a decorated function without a celery context, but that is outside the scope of this issue.
I'll make an issue about this.
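In the meantime, one way an application can fail fast on its own is to probe the broker before dispatching; this is a hedged sketch using public kombu API (the helper name and the 2-second timeout are just illustrative):

# fail fast if the broker is unreachable, instead of letting .delay() hang
from kombu import Connection

def assert_broker_reachable(app, timeout=2):
    conn = Connection(app.conf.broker_url, connect_timeout=timeout)
    try:
        # a single bounded attempt; raises kombu.exceptions.OperationalError
        # if the broker cannot be reached
        conn.ensure_connection(max_retries=1)
    finally:
        conn.release()

# usage:
# assert_broker_reachable(app)
# my_task.delay(...)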
@chrisconlan I'm using Flask. Haven't gotten around to Django yet.
Saw you're based in MD! I'm nearby :)
Glad you got it working! Agreed, it's a documentation fix, but also check out the Kombu issue log. Reading it, I was confused about whether there was an issue with Kombu communicating with Redis in addition to what we've experienced with Celery.
@jreed1701 Awesome man. Shoot me an email and stay in touch.