Celery: Delay and apply_async waiting forever when the broker is down.

Created on 28 Sep 2017 · 44 Comments · Source: celery/celery

Checklist

  • [X] I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • [X] I have verified that the issue exists against the master branch of Celery.

Steps to reproduce

# tasks.py
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

I am testing the case where RabbitMQ is down, so I didn't start RabbitMQ at all. Then I ran the following commands in IPython:

from tasks import add
add.delay()

add.delay() always blocks. I tried setting retry to False, but it doesn't work:

add.apply_async((2, 2), retry=False)

I also tried providing a retry_policy, but that doesn't work either:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0, 
    'interval_step': 0.2,
    'interval_max': 0.2,
})

Expected behavior

With retry set to False, I expect an exception to be raised immediately. With retry set to True, it should retry up to max_retries and then, if the broker is still unreachable, raise an exception.
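
For illustration, this is the kind of failure I would expect to be able to catch (a minimal sketch; I'm assuming kombu's OperationalError is what would surface once retries are exhausted):

from kombu.exceptions import OperationalError

try:
    add.apply_async((2, 2), retry=False)
except OperationalError as exc:
    # Expected: the publish gives up quickly instead of blocking forever.
    print('broker unreachable: {!r}'.format(exc))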

Actual behavior

The function runs forever and blocks the process.

Error Stack Trace

I pressed Ctrl+C and got the following traceback:

In [1]: from tasks import add, mul

In [2]: add.delay(2,3)
^C---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in __call__(self)
     35         try:
---> 36             return self.__value__
     37         except AttributeError:

AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
    337         try:
--> 338             return fun(*args, **kwargs)
    339         except catch as exc:

~/github/celery-demo/.venv/src/kombu/kombu/connection.py in connect(self)
    260         self._closed = False
--> 261         return self.connection
    262

~/github/celery-demo/.venv/src/kombu/kombu/connection.py in connection(self)
    801                 self._default_channel = None
--> 802                 self._connection = self._establish_connection()
    803                 self._closed = False

~/github/celery-demo/.venv/src/kombu/kombu/connection.py in _establish_connection(self)
    756         self._debug('establishing connection...')
--> 757         conn = self.transport.establish_connection()
    758         self._debug('connection established: %r', self)

~/github/celery-demo/.venv/src/kombu/kombu/transport/pyamqp.py in establish_connection(self)
    129         conn.client = self.client
--> 130         conn.connect()
    131         return conn

~/github/celery-demo/.venv/lib/python3.6/site-packages/amqp/connection.py in connect(self, callback)
    281         )
--> 282         self.transport.connect()
    283         self.on_inbound_frame = self.frame_handler_cls(

~/github/celery-demo/.venv/lib/python3.6/site-packages/amqp/transport.py in connect(self)
    108     def connect(self):
--> 109         self._connect(self.host, self.port, self.connect_timeout)
    110         self._init_socket(

~/github/celery-demo/.venv/lib/python3.6/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
    149                 self.sock.settimeout(timeout)
--> 150                 self.sock.connect(sa)
    151             except socket.error:

ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-2-e2f2f196d9be> in <module>()
----> 1 add.delay(2,3)

~/github/celery-demo/.venv/src/celery/celery/app/task.py in delay(self, *args, **kwargs)
    414             celery.result.AsyncResult: Future promise.
    415         """
--> 416         return self.apply_async(args, kwargs)
    417
    418     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,

~/github/celery-demo/.venv/src/celery/celery/app/task.py in apply_async(self, args, kwargs, task_id, producer, link, link_error, shadow, **options)
    537             link=link, link_error=link_error, result_cls=self.AsyncResult,
    538             shadow=shadow, task_type=self,
--> 539             **options
    540         )
    541

~/github/celery-demo/.venv/src/celery/celery/app/base.py in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, group_id, retries, chord, reply_to, time_limit, soft_time_limit, root_id, parent_id, route_name, shadow, chain, task_type, **options)
    747             with P.connection._reraise_as_library_errors():
    748                 self.backend.on_task_call(P, task_id)
--> 749                 amqp.send_task_message(P, name, message, **options)
    750         result = (result_cls or self.AsyncResult)(task_id)
    751         if add_to_parent:

~/github/celery-demo/.venv/src/celery/celery/app/amqp.py in send_task_message(producer, name, message, exchange, routing_key, queue, event_dispatcher, retry, retry_policy, serializer, delivery_mode, compression, declare, headers, exchange_type, **kwargs)
    552                 delivery_mode=delivery_mode, declare=declare,
    553                 headers=headers2,
--> 554                 **properties
    555             )
    556             if after_receivers:

~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, expiration, **properties)
    179             body, priority, content_type, content_encoding,
    180             headers, properties, routing_key, mandatory, immediate,
--> 181             exchange_name, declare,
    182         )
    183

~/github/celery-demo/.venv/src/kombu/kombu/connection.py in _ensured(*args, **kwargs)
    492                 for retries in count(0):  # for infinity
    493                     try:
--> 494                         return fun(*args, **kwargs)
    495                     except conn_errors as exc:
    496                         if got_connection and not has_modern_errors:

~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare)
    185                  headers, properties, routing_key, mandatory,
    186                  immediate, exchange, declare):
--> 187         channel = self.channel
    188         message = channel.prepare_message(
    189             body, priority, content_type,

~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in _get_channel(self)
    207         channel = self._channel
    208         if isinstance(channel, ChannelPromise):
--> 209             channel = self._channel = channel()
    210             self.exchange.revive(channel)
    211             if self.on_return:

~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in __call__(self)
     36             return self.__value__
     37         except AttributeError:
---> 38             value = self.__value__ = self.__contract__()
     39             return value
     40

~/github/celery-demo/.venv/src/kombu/kombu/messaging.py in <lambda>()
    222             connection = channel
    223             self.__connection__ = connection
--> 224             channel = ChannelPromise(lambda: connection.default_channel)
    225         if isinstance(channel, ChannelPromise):
    226             self._channel = channel

~/github/celery-demo/.venv/src/kombu/kombu/connection.py in default_channel(self)
    829
    830         # make sure we're still connected, and if not refresh.
--> 831         self.ensure_connection(**conn_opts)
    832         if self._default_channel is None:
    833             self._default_channel = self.channel()

~/github/celery-demo/.venv/src/kombu/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors)
    403                             (), {}, on_error, max_retries,
    404                             interval_start, interval_step, interval_max,
--> 405                             callback)
    406         return self
    407

~/github/celery-demo/.venv/src/kombu/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
    348                     if callback:
    349                         callback()
--> 350                     sleep(1.0)
    351                 # sleep remainder after int truncation above.
    352                 sleep(abs(int(tts) - tts))

KeyboardInterrupt:

I found that the main issue happens in kombu/kombu/messaging.py::_publish, which eventually reaches kombu/kombu/connection.py::default_channel. default_channel calls kombu/kombu/connection.py::ensure_connection with the max_retries, interval_start, interval_step and interval_max values taken from transport_options.

When transport_options is {} and the broker is down, self.ensure_connection(**conn_opts) in default_channel runs forever.

It seems there are two retry wrappers around kombu/messaging.py::Producer.publish: one driven by the retry and retry_policy provided by Celery's amqp layer, and another driven by transport_options.

My current workaround is to add the retry configuration to transport_options as well:

app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
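
For completeness, a sketch of how both retry layers could be bounded at once (the values are just examples; task_publish_retry_policy is the Celery 4 setting behind the publish-side retry_policy default):

app.conf.broker_transport_options = {    # used by kombu's ensure_connection()
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
app.conf.task_publish_retry_policy = {   # used by Producer.publish()'s retry wrapper
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
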
Kombu Bug Report

Most helpful comment

I got a similar issue. The problem was that I had forgotten to add this.

Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:

proj/proj/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

All 44 comments

Hi,
I have hit a problem that sounds similar to yours:
https://github.com/celery/kombu/issues/795

For me as a workaround, using Kombu 4.0.2 has worked.

@HengfengLi what version are you on?

@markddavidoff It's celery (4.1.0).

I ran into this issue as well while trying to get Jobtastic updated to support Celery 4. I had to pin the Kombu version to keep the tests from hanging forever.

So, in older versions of kombu/celery the timeouts aren't all passed to redis-py correctly, so in my case I ended up making a forked version of redis-py and overriding the timeouts manually.

https://github.com/celery/kombu/pull/769 this should fix the issue

The issue isn't solved for me. I'm using celery 4.1 and kombu 4.1. Should I use a newer kombu version?

When there is some problem with the workers, the program that manages the tasks at some point sends a task. The call is app.send_task('module.name', []) and it freezes there. It should give a connection error or something, not stay blocked there for a whole day. Maybe I'm missing a configuration? I'm using RabbitMQ and a database backend, as rpc gave me lots of problems.

wait for 4.2 release

Why? Is this a bug in the 4.1.0 release? In the meantime, should I downgrade to 4.0.x? Thanks.

In the meantime, try installing from GitHub master.

We're also seeing a similar issue using celery & kombu from their master branches. When the redis broker is down, apply_async() will block until redis becomes available again. I've experimented with different broker_transport_options, to no avail.

How to set a delay of 5 minutes in the code below?

from os import environ

from flask import Flask, Blueprint, abort, jsonify, request, session
import settings
from celery import Celery

app = Flask(__name__)
app.config.from_object(settings)

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # run every task inside the Flask application context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(app)

@celery.task(name="tasks.add")
def add(x, y):
    return x + y

@app.route("/test")
def hello_world(x=16, y=16):
    x = int(request.args.get("x", x))
    y = int(request.args.get("y", y))
    res = add.apply_async((x, y))
    context = {"id": res.task_id, "x": x, "y": y}
    result = "add((x){}, (y){})".format(context['x'], context['y'])
    goto = "{}".format(context['id'])
    return jsonify(result=result, goto=goto)

@app.route("/test/result/<task_id>")
def show_result(task_id):
    retval = add.AsyncResult(task_id).get(timeout=5.0)
    return repr(retval) + "mmk"

if __name__ == "__main__":
    port = int(environ.get("PORT", 5000))
    app.run(host='0.0.0.0', port=port, debug=True)
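
For reference, the standard way to delay a task by five minutes is the countdown option of apply_async (in seconds), or eta with an absolute datetime; a minimal sketch using the add task above:

# run add(x, y) roughly 5 minutes from now
res = add.apply_async((x, y), countdown=300)

# or, equivalently, with an absolute time
from datetime import datetime, timedelta
res = add.apply_async((x, y), eta=datetime.utcnow() + timedelta(minutes=5))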

Even with celery 4.1.1, I'm still experiencing hanging when the broker (RabbitMQ) is down.

What do you see with 4.2rc4? This isn't in the 4.1.x branch.

This sounds a lot like #4627, which is still a problem even on master.

thanks @clokep

So 4.2rc4 wouldn't work? Or how do I get that version?

please try that version and report

I am seeing what looks very much like the same problem. I am using celery 4.1.1 and kombu 4.2.1.

Later: Actually in my case, the server - redis - should be up and responding. celery beat seems to trigger events fine, but calling delay methods hangs forever. Will continue investigating from next week just in case I have done something else wrong.

This should be fixed in 4.2rc4, care to try that? We want to know its current status.

Asif Saifuddin Auvi notifications@github.com writes:

this should be fixed in 4.2rc4, care to try that?

Will do so on Tuesday (Monday is a public holiday here and I have already finished for Friday). Thanks for the quick response.

Brian May brian@linuxpenguins.xyz
https://linuxpenguins.xyz/brian/

ok thanks

Nevermind, turns out my problem was different. I wasn't correctly initializing Celery from Django, so it silently defaulted to using amqp instead of redis, where it silently hangs trying to connect to an AMQP socket that isn't actually listening.

please open a new issue after trying celery 4.2

I have tried with:

celery==4.2.0
kombu==4.2.1

and also needed to specify broker_transport_options in order for apply_async to not block forever when Redis is down.
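
The exact options aren't given in this comment; a minimal sketch of the kind of configuration meant here (the two socket timeouts are my addition and are specific to kombu's Redis transport):

app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.5,
    'socket_connect_timeout': 5,  # seconds; Redis transport only
    'socket_timeout': 5,          # seconds; Redis transport only
}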

Still having this issue:

  • celery: 4.2.1
  • kombu: 4.2.1
  • broker: rabbitmq

The main process hangs and no exception is raised.

Update:

solved with:

app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}

Still an issue when the broker was working, then died, and a task is sent just after that moment:

  • celery: 4.2.1
  • kombu: 4.2.1
  • broker: rabbitmq

Steps to reproduce:

  1. Run rabbitmq and send a task to it. AsyncResult object is returned.
  2. Stop rabbitmq and try to send the task again. .delay() hangs forever

I tried to do some digging. My conclusions:
-- Celery asks for the Connection object's default_channel property.
-- default_channel knows about broker_transport_options and calls ensure_connection() -> retry_over_time() with broker_transport_options.
retry_over_time() does return fun(*args, **kwargs), where fun is the bound method Connection.connect, and it successfully returns a Connection object,
so default_channel returns self._default_channel and celery tries to do something with it.

But after a series of processing steps, on_task_call -> maybe_declare -> _imaybe_declare from kombu.common is called. It takes the Connection object from the Queue and calls its ensure() method. ensure() knows nothing about broker_transport_options, so it is called without them and uses the default values: max_retries=None, which means infinite retries :(
I've added handling of broker_transport_options to the kombu.connection.Connection ensure() method, so it uses broker_transport_options every time it checks the connection to the broker, but I have no idea whether that is the right fix for this problem or not.
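
A minimal illustration of the difference described above, using the same ensure_connection() call that appears in the traceback (values are examples only):

from kombu import Connection

conn = Connection('pyamqp://guest@localhost//')

# With the defaults, max_retries is None, so this keeps retrying for as
# long as the broker is down:
# conn.ensure_connection()

# With explicit limits it gives up after a few attempts and raises a
# connection error (kombu's OperationalError) instead of blocking forever:
conn.ensure_connection(max_retries=3, interval_start=0,
                       interval_step=0.2, interval_max=0.2)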

celery: 4.2.1
kombu: 4.2.1

Minimal example reproducing the problem:

from celery import Celery

celery_app = Celery('lol', broker='LOL_NOT_VALID')
celery_app.send_task('test')

I got a similar issue. The problem was that I had forgotten to add this.

Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:

proj/proj/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Can anyone come up with a suitable solution?

I think we worked around this by providing transport options that only retry a certain number of times (by default Celery tries forever):

# Try 5 times. Initially try again immediately, then add 0.5 seconds for each
# subsequent try (with a maximum of 3 seconds). This comes out to roughly 3
# seconds of total delay (0, 0.5, 1, 1.5).
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'max_retries': 4,
    'interval_start': 0,
    'interval_step': 0.5,
    'interval_max': 3,
}

I can try to find more information if that isn't clear.

BROKER_TRANSPORT_OPTIONS fixed it for me, instead of CELERY_BROKER_TRANSPORT_OPTIONS. Using celery 4.2.1, kombu 4.2.1 and django 1.8.

Whether you use BROKER_TRANSPORT_OPTIONS vs. CELERY_BROKER_TRANSPORT_OPTIONS vs. broker_transport_options etc. depends somewhat on how you configure your Celery application. Glad it worked for you though! 👍
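
For anyone unsure which name applies, a rough mapping of the three spellings mentioned above (example values only):

# 1. Set directly on the app (lowercase Celery 4 settings):
app.conf.broker_transport_options = {'max_retries': 3}

# 2. In Django settings.py, when the app is configured with
#    app.config_from_object('django.conf:settings', namespace='CELERY'):
CELERY_BROKER_TRANSPORT_OPTIONS = {'max_retries': 3}

# 3. Old-style uppercase settings without the CELERY_ namespace:
BROKER_TRANSPORT_OPTIONS = {'max_retries': 3}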

This is pretty dumb, but for me using Flask, I had done this:

app.py

    ...
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    ...

tasks.py:

import celery

@celery.task(bind=True)
def my_task():
    ...

I needed to do:
app.py

    ...
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    app.celery = celery
    ...

tasks.py

from flask import current_app

celery = current_app.celery

@celery.task(bind=True)
def my_task():
    ...

This solved my issue of infinite hanging, but honestly, given that my setup was messed up, it would have been better for Celery to tell me so.

The real solution is here: https://github.com/celery/celery/issues/4627#issuecomment-396907957

Just to let you know that I have the same default behaviour with:

  • Celery 4.3.0
  • Kombu 4.6.4

The CELERY_BROKER_TRANSPORT_OPTIONS setting works well.

Same here. I'm using:

  • Celery 4.4.2
  • Kombu 4.6.8

Was the problem reintroduced?

@jreed1701 I'm seeing the same issue as you, and we are getting the same traceback as the one mentioned in this thread, but I don't think it is for the same reason. For me, apply_async and delay are exhibiting this behavior for @shared_task functions but not @app.task functions. Same traceback as the one discussed in this thread, but it has nothing to do with the state of the broker.

Hi @chrisconlan, funny you point this out. I arrived at this conclusion yesterday when I modified my software to use a factory/extension pattern. I was using the @shared_task method you pointed out, but moved to the @app.task method when I changed to the new pattern. With the change, I create a Celery instance without an application/task context and use it to decorate my task functions at import time; then, at runtime, I replace that same Celery instance's context with the appropriate app context, update it with config inputs, and run it. The infinite hang-up behavior ceased and apply_async executed even though I had no broker connected. I wonder what it is about changing the context that freed this all up?

@jreed1701 Not sure about you, but I am using celery through django. I don't stray too far from the documented usage on that front because I don't want to break a deployment. Like you said, I'm not sure that anything is fundamentally broken about celery here, but I think the django-celery docs need an update to make sure everyone is following best practices.

@jreed1701 You gave me a great clue regarding celery context. I fixed it with the following ...

From the terminal, run python manage.py shell to initialize a Django shell, then ...

# Make sure to initialize celery context
from django_project.celery import app

# Now you can import shared tasks
from other_app.tasks import the_shared_task

# Now the shared task won't hang
the_shared_task.delay()

Note to maintainers: @jreed1701 and I got here because the traceback at the beginning of the thread matched our own. I don't think our discussion has to do with the actual title of this issue. If anything, it might be a worthwhile change to raise an exception if .delay or .apply_async is attempted on a decorated function without a Celery context, but that is outside the scope of this issue.

I'll make an issue about this.

@chrisconlan I'm using Flask. Haven't gotten around to Django yet.
Saw you're based in MD! I'm nearby :)
Glad you got it working! Agreed, it's a documentation fix, but also check out the Kombu issue log. I was confused about whether it was an issue with Kombu communicating with Redis, in addition to what we've experienced with Celery.

@jreed1701 Awesome man. Shoot me an email and stay in touch.

