Hi,
I'm using Kubernetes to deploy my Python application. Kubernetes provides a livenessProbe and a readinessProbe, see here.
How can I check whether my celery beat or celery worker is alive and in a correct state?
The PID is not a solution because it cannot be used to catch a deadlock, for example.
Thanks in advance for your help,
Best regards,
Celery has a monitoring API you can use.
A pod should be considered live if the Celery worker sends a heartbeat.
A pod should be considered ready if the worker has sent the worker-online event.
If you have specific problems or feature requests, please open a separate issue.
Would this work?
```yaml
readinessProbe:
  exec:
    command:
      - "/bin/sh"
      - "-c"
      - "celery -A path.to.app status | grep -o ': OK'"
  initialDelaySeconds: 30
  periodSeconds: 10
```
@7wonders You'd need to extract the celery node name first. This readinessProbe will fail if any celery instance fails, which is not what you want.
@thedrow Hmm, I think it's actually that it will succeed even if the actual node has failed but another one is OK, which is also not a great outcome.
Looks like
/bin/sh -c 'exec celery -A path.to.app inspect ping -d celery@$HOSTNAME'
is good enough for a readiness check and verifies just one node.
Beware that in some apps running this command can take a few seconds at full CPU, and the Kubernetes default is to run it every 10 seconds.
It is thus much safer to set a high periodSeconds (ours is set to 300).
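Putting those pieces together, a per-node readiness probe might look like the sketch below; the app path and the timings are placeholders to adapt to your deployment:

```yaml
readinessProbe:
  exec:
    command:
      - "/bin/sh"
      - "-c"
      - "exec celery -A path.to.app inspect ping -d celery@$HOSTNAME"
  initialDelaySeconds: 30   # give the worker time to boot
  periodSeconds: 300        # keep this high; the ping burns CPU on every run
  timeoutSeconds: 10        # inspect ping can be slow under load
```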
@redbaron did that command work for you? If it worked, what settings did you use for the liveness and readiness probes?
For some reason, this readiness probe is nowhere near satisfactory for us. The inspect command responds non-deterministically even with no load on our cluster. We run it in this format:
celery inspect ping -b "redis://archii-redis-master:6379" -d celery@archii-task-crawl-integration-7d96d86b9d-jwtq7
And with normal ping periods (10 seconds), our cluster is completely killed by the CPU that Celery requires.
~~I use this for liveness with a 30s interval: sh -c celery -A path.to.app status | grep "${HOSTNAME}:.*OK"~~
I use this for liveness with a 30s interval: sh -c celery -A path.to.app inspect ping --destination celery@${HOSTNAME}
Doesn't seem to cause any extra load; I run a fleet of well over 100 workers.
Readiness probes aren't necessary since Celery is never used behind a Service. I just set minReadySeconds: 10, which is good enough for delaying worker startup in rolling Deployments, but it obviously depends on the startup time of Celery for your project, so examine your logs and set it accordingly.
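For illustration, that approach might be wired into a Deployment roughly like this; the names, image, and command are placeholders, not a prescribed setup:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker              # illustrative name
spec:
  minReadySeconds: 10              # delay before a new worker pod counts as available
  replicas: 2
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      containers:
        - name: worker
          image: registry.example.com/my-app:latest   # illustrative image
          command: ["celery", "-A", "path.to.app", "worker"]
```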
Readiness probes are still useful even if they're not used in a Service. Specifically, when you do a deployment of workers and want to make sure the deployment was successful, you usually use kubectl rollout status deployment. Without readiness probes, we've deployed bad code that didn't start Celery and didn't know it.
My solution was:
```yaml
readinessProbe:
  exec:
    command:
      [
        "/usr/local/bin/python",
        "-c",
        "import os;from celery.task.control import inspect;from <APP> import celery_app;exit(0 if os.environ['HOSTNAME'] in ','.join(inspect(app=celery_app).stats().keys()) else 1)"
      ]
```
Others seem to not work 🤷♂️
Thanks @yardensachs!
Spent a lot of time debugging what was wrong with the other solutions, but no luck.
Seems like the celery inspect ping command does not return exit(0) or anything along those lines.
celery inspect ping does work, but you need bash to replace the environment variable, like this:
```yaml
livenessProbe:
  exec:
    # bash is needed to replace the environment variable
    command: [
      "bash",
      "-c",
      "celery inspect ping -A apps -d celery@$HOSTNAME"
    ]
  initialDelaySeconds: 30  # startup takes some time
  periodSeconds: 60        # the default is quite frequent, and celery uses a lot of cpu/ram for each ping
  timeoutSeconds: 10       # the default is too low
```
good to know
We ended up ripping celery inspect ping out from our liveness probes because we found that under heavier load, the ping would just hang for minutes at a time even though the jobs were processing fine and there was no backlog. I have a feeling it had something to do with using eventlet, but we're continuing to look into it.
@WillPlatnick That won't happen with 5.0 because Celery will be async so there will be reserved capacity for control coroutines.
I'm having trouble with inspect ping spawning defunct / zombie processes:

```
root      2296  0.0  0.0      0     0 ?   Z   16:04   0:00 [python] <defunct>
root      2323  0.0  0.0      0     0 ?   Z   16:05   0:00 [python] <defunct>
...
```

Anyone else encountering this? There isn't a --pool argument for forcing single-process execution.
Can I ask what you are using instead of celery inspect ping, @WillPlatnick? We've encountered a similar issue with the probe failing under heavy load.
@mcyprian We got rid of the liveness probe. My gut is telling me it has something to do with eventlet, but we haven't made it a priority to figure it out.
We hit the same CPU problem with the Redis broker.
Has anyone found a solution?
We were also experimenting with scheduling a "debug_task" on a queue whose name we based on the container name. The problem is that we have tons of old queues in RabbitMQ now.
Please be aware that using
sh -c celery -A path.to.app status | grep "${HOSTNAME}:.*OK"
as suggested in https://github.com/celery/celery/issues/4079#issuecomment-437415370 will lead to massive error reports in rabbitmq; see https://github.com/celery/celery/issues/4355#issuecomment-578786369
I think I have found a way to reduce the CPU usage of inspect ping:
celery -b amqp://user:pass@rabbitmq:5672/vhost inspect ping
Not loading the Celery config with -A path.to.celery certainly helped with the CPU usage. Could somebody verify?
Nice! It's much better than with the app loaded.
But we still get the huge overhead of a python process starting + celery import. I still would recommend a high period.
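If the broker-only ping holds up, it could be wired into a probe roughly like the sketch below; the CELERY_BROKER_URL environment variable and the timings are assumptions to adapt to your setup:

```yaml
livenessProbe:
  exec:
    command:
      - bash
      - "-c"
      - "celery -b $CELERY_BROKER_URL inspect ping -d celery@$HOSTNAME"
  initialDelaySeconds: 30
  periodSeconds: 300   # each ping still starts a fresh Python process, so keep it infrequent
  timeoutSeconds: 10
```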
Hi,
celery inspect ping -A app.tasks -d celery@$HOSTNAME gives me "Error: Broadcast not supported by transport 'sqs'".
I'm using SQS as the broker, so does this mean the 'inspect' / 'status' commands won't work with SQS?
We have found that at scale, all the remote control features cause CPU spikes on the Redis instance due to the set commands on the kombu.pidbox key, so we can't use ping, status, or inspect; they all rely on remote control, which we are trying to disable for our production use case.
Having a dedicated health-check queue seems like the right way to me, but I'm not sure at all.
Does anyone have another direction for health checks that doesn't involve remote control?
We have been using dedicated health-check queues with RabbitMQ eviction policies (queues are deleted automatically) successfully for some time now, and we are happy with the solution, mostly because this check actually verifies that the worker processes the task and finishes it. Since we introduced it we have had no more problems with stuck workers.
@bartoszhernas Mind sharing some code for that? Do you queue these via beat and then the workers pick them up?
Would love to see the code plus the liveness probe section.
Hi, the code is really easy:
In Kubernetes I specify the queue name based on the POD_NAME and pass it to the livecheck script:
```yaml
livenessProbe:
  initialDelaySeconds: 120
  periodSeconds: 70
  failureThreshold: 1
  exec:
    command:
      - bash
      - "-c"
      - |
        python celery_liveness_probe.py $LIVENESS_QUEUE_NAME
env:
  - name: MY_POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  - name: LIVENESS_QUEUE_NAME
    value: queue-$(MY_POD_NAME)
```
(you need to use bash -c, because Kubernetes doesn't expand ENVs when trying to pass it as command directly)
Then celery_liveness_probe.py just sets up Django so it can use Celery and schedules the task on the pod's queue:
```python
# encoding: utf-8
from __future__ import absolute_import, unicode_literals

import os
import sys

if __name__ == "__main__":
    import django

    sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ahoy.archive.settings")
    django.setup()

    from ahoy.archive.apps.eventbus.service import eventbus_service

    exit(0 if eventbus_service.health_check(sys.argv[1] if sys.argv and len(sys.argv) > 1 else None) else 1)
```
The health-check function sends the task and waits for the result:
```python
def health_check(self, queue_name: Optional[str] = None) -> bool:
    event = self.celery.send_task(
        AhoyEventBusTaskName.LIVENESS_PROBE,
        None,
        queue=queue_name or self.origin_queue,
        ignore_result=False,
        acks_late=True,
        retry=False,
        priority=255
    )
    try:
        is_success = event.get(timeout=10)
    except (celery.exceptions.TimeoutError, AttributeError):
        is_success = False

    return is_success
```
So basically: send a task, and if it returns a result then the worker is healthy. If the worker got stuck (which happened a lot of times), the task never finishes, the Pod is restarted, and everything gets back to normal.
The only caveat is that you need to deal with old queues. With RabbitMQ it's easy: we just set an expiry policy on the queues.
https://www.rabbitmq.com/ttl.html#queue-ttl
@bartoszhernas thank you for sharing the code!
Like you said, our queues are dynamic and we are using Redis, so we really need to find a way to deal with queue-name expiration on Redis.
Yeah, we have a similar issue with BullMQ on Redis. My idea is to write a Kubernetes CronJob that clears the queues every once in a while.
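A rough sketch of such a cleanup CronJob is below. The schedule, image, Redis host, and the queue-* key pattern are all assumptions; note that it deletes every matching key indiscriminately (the per-pod liveness queues get recreated on the next probe), and SCAN would be kinder than KEYS on a large keyspace:

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: stale-queue-cleanup                  # illustrative name
spec:
  schedule: "0 * * * *"                      # hourly; adjust to taste
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: cleanup
              image: redis:7                 # used only for the redis-cli binary
              command:
                - sh
                - "-c"
                # drop per-pod liveness queues left behind by terminated pods
                - "redis-cli -h redis-master KEYS 'queue-*' | xargs -r -n 100 redis-cli -h redis-master DEL"
```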