Checklist: I have included the output of celery -A proj report in the issue, and I have verified that the issue exists against the master branch of Celery.

software -> celery:4.0.2 (latentcall) kombu:4.0.2 py:3.5.2
            billiard:3.5.0.2 py-amqp:2.1.4
platform -> system:Darwin arch:64bit imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:redis://127.0.0.1/

result_backend: 'redis://127.0.0.1/'
broker_url: 'amqp://guest:********@localhost:5672//'
pubsub channels
Code example: https://gist.github.com/gugu/680452f754507ec6bea144a2b257c398
At steps 3 and 5 we should have 0 open channels.
At step 3 we actually have 4 open channels.
At step 5 we have 8 open channels.
In our live application, within 2 hours (between restarts) we accumulate 3,200,000 open channels, which slows everything down and opens the gates to tens of different issues.
PS: I will try to come back with a pull request later.
Any news about this? We are also running into this issue and it causes us a bit of a headache...
I wrote my own chord implementation and did not try to fix this
@gugu do you mind sharing your work? If you have it available in a public repository of course.
@MikaelWahlberg I have reproduced the bug with the prefork pool. I found some instances in the source code where the backend should unsubscribe from channels, but I have not resolved it completely yet. One temporary solution for the worker might be to set --max-tasks-per-child to a relatively low number (e.g. 5-10).
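For reference, a minimal sketch of that workaround as a configuration setting; the module name and URLs below are placeholders, and the CLI equivalent is the --max-tasks-per-child flag mentioned above:

# Hypothetical app module illustrating the workaround; broker/backend URLs
# are placeholders, not taken from this thread.
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

# Recycle each worker child process after a few tasks so any pubsub
# subscriptions it accumulated are dropped when the process exits.
# This is the configuration equivalent of --max-tasks-per-child.
app.conf.worker_max_tasks_per_child = 10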
@georgepsarakis Yeah, that could work as a workaround, but our main problem seems to be with Celery beat, and there we don't have that option...
Glad to have found this issue. My redis-cli pubsub channels | grep celery-task-meta | wc -l is currently at ~48k, which I found strange. It doesn't cause an issue yet, but it might soon...
@MikaelWahlberg what do you mean exactly? beat runs separately from the worker. From my research so far, the problem is caused by the worker processes, and my suggested option refers to those. I would suggest that anyone who encounters this issue use --max-tasks-per-child, which will allow the connections that appear subscribed to be closed.
@georgepsarakis Sorry, I missed this comment. What I mean is that the number of pubsub channels keeps growing, even if we set --max-tasks-per-child on the "receiving" workers. And once we restart the celery-beat process it goes down again.
Sure, as a workaround we could restart the celery-beat every day as well, that would probably help a lot.
@georgepsarakis yes, on Monday I'll create a separate module for this
It seems to me that there's no reason for celery-beat to even bother subscribing in the first place - it doesn't deal with results, it just queues up tasks.
A side effect of this issue can lead to some out-of-control resource usage - at a certain point, duplicate subscribe messages were being sent out from the beat worker to our Redis instance, and these grew substantially. I hypothesize that this was due to Redis taking a very long time (or just refusing) to acknowledge the subscription, so the task IDs stay in the queue of task IDs to subscribe to, but I haven't dug deep enough into the result backend code to confirm this as the cause.
Regardless, this can be addressed by ensuring that the beat worker doesn't establish a connection to the result backend at all.
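A minimal sketch of that idea, assuming you are free to give the beat process its own app instance (the module and task names here are hypothetical):

# Hypothetical beat-only app: it publishes task messages on a schedule but
# never reads results, so leaving result_backend unset means it never opens
# pubsub subscriptions against Redis.
from celery import Celery

beat_app = Celery('proj_beat', broker='amqp://guest@localhost//')

beat_app.conf.result_backend = None  # no result backend for the scheduler
beat_app.conf.beat_schedule = {
    'enqueue-every-5-minutes': {
        'task': 'proj.tasks.some_periodic_task',  # hypothetical task name
        'schedule': 300.0,
    },
}

The workers that actually execute the tasks keep their own result backend configuration.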
@georgepsarakis here is my very first ezy-chord release:
https://github.com/gugu/ezy-chord
(tomorrow I'll move it to my company's webpage: https://github.com/EzyInsights/ezy-chord)
Hi. Are you sure this behaviour is related to chords only? I have a simple test setup with a basic task from the introductory tutorial, and I found that something like the code below, executed from the Python shell, opens 100 channels, which are only closed once the Python process is terminated.
for i in xrange(1, 100):
    tasks.add.apply_async((2, 2))
@rdrongow yes, that's also true. If you do not need the result of a task, you can try @celery.task(ignore_result=True); this can help.
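A minimal sketch of that suggestion (the app and task below are placeholders, not code from this thread):

from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task(ignore_result=True)
def notify(message):
    # With ignore_result=True nothing is written to the result backend for
    # this task, so there is no celery-task-meta channel to subscribe to.
    print(message)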
I think I may have found the solution to this issue, I will follow up with details once I have concluded.
I suggest rolling back to how Celery worked with Redis in version 3; the new approach is broken by design. V3 worked fine, we had no issues.
I also have the same issue in a Celery chain workflow. After a few days, millions of pubsub channels had been opened by the first task in the chain.
I'm new to Celery. Only the next task in the chain needs the result of the first task; I don't need the result back in the Python code running this Celery workflow. Should I set celery.task(ignore_result=True) in this case?
@chuanma no, it will not help you. You can emulate a chain by running the next task from the end of the previous task.
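A minimal sketch of that workaround (the task bodies are placeholders): instead of chain(task1.s(), task2.s()), task1 enqueues task2 itself:

from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')

@app.task(ignore_result=True)
def task2(data):
    # second step of the "chain"
    print('processing', data)

@app.task(ignore_result=True)
def task1(data):
    result = data.upper()  # stand-in for the real first step
    task2.delay(result)    # hand off directly; no chain, no result backend needed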
Thanks for the response, @gugu
My workflow involves both celery.chain and celery.group. That is not upgraded to celery.chord, is it?
task1.s() | celery.group(task2.s(), task3.s())
I ran strace against a Celery worker for task1 and found out that it subscribes to the channels of both downstream tasks, task2 and task3, at the end of each run. This is weird because task1 is the first task and shouldn't rely on any downstream tasks. Does anyone know why the subscribe actions are required from task1?
Also, both downstream tasks don't return any results (I have celery.task(ignore_result=True) for both tasks), so the downstream tasks won't publish any data to those phantom channels. That means task1 is subscribing to channels that never have any published data. Is this a bug?
@chuanma you can run task1 and, at the end of task1, run task2.delay(); task3.delay() with the same effect.
Thanks! That will be my last resort. For now I'd prefer to keep task1 reusable in other Celery workflows.
@thedrow @auvipy @AlexHill FYI I am still working on solving this.
change to v5 if not possible to fix before 4.2 release
I had set up some tasks to replicate the issue, and from what I have tested so far, the problem does not seem to exist any more in current master. I think it was fixed by #4131, because now on_fulfilled is called, which performs the channel unsubscription on the Result Consumer. Can someone else please check/verify?
@chuanma @gugu if you could possibly check at some point please provide any feedback here. @auvipy @thedrow @AlexHill I used the integration test from #3815 to verify, opened #4468 for that purpose.
closing as fixed for now
I think the issue is still present. We are using Celery with a Redis broker + backend for data science enrichment of documents. The setup is the following: the enrichment is triggered with enrich.apply_async(), which then runs chord(child_tasks, enrich_result_callback.s(client_id, meta)).delay(), where the child tasks are different enrichments executed by invoking external microservices.
After running several thousands of document enrichments, we can observe a slowdown of the Celery workers; their throughput drops significantly. I tried experimenting with different Celery configuration options, such as the -O fair setting, increasing worker_prefetch_multiplier to larger values, and adding {'fanout_patterns': True} and {'fanout_prefix': True}. We also tried updating to the latest version of Celery, 4.2rc2. None of these helped.
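For reference, this is roughly how those options were set; the URLs below are placeholders and the prefetch value is only an example of a "larger" setting:

from celery import Celery

app = Celery('swarm',
             broker='redis://localhost:6379/1',
             backend='redis://localhost:6379/2')

app.conf.worker_prefetch_multiplier = 4  # example of a "larger" value
app.conf.broker_transport_options = {
    'visibility_timeout': 43200,
    'fanout_patterns': True,  # options mentioned above
    'fanout_prefix': True,
}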
By running the following, I can see how the number of channels constantly grows: redis-cli pubsub channels | grep celery-task-meta | wc -l is currently at around ~65k.
Below is the Celery configuration:
software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:3.5.5
billiard:3.5.0.3 redis:2.10.6
platform -> system:Linux arch:64bit, ELF imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://redis-xxx.xxx.ng.0001.euc1.cache.amazonaws.com:6379/2
REDIS_ENRICHMENTS_KEY: *
worker_max_tasks_per_child: 100
result_expires: 3600
REDIS_CLIENT_IDS_KEY: *
task_queues:
(
REDIS_RESULTS_TARGETS_KEY: '**'
REQUEST_TIMEOUT: 60
broker_transport_options: {
'visibility_timeout': 43200}
statsd_workers_prefix: 'swarm-workers'
STATSD: {
'host': 'statsd.collectd', 'port': 8125, 'prefix': 'swarm-api'}
broker_url: 'redis://redis-xxx.xxx.ng.0001.euc1.cache.amazonaws.com:6379/1'
imports:
('swarm.workers.tasks',)
result_backend: 'redis://redis-xxx.xxx.ng.0001.euc1.cache.amazonaws.com:6379/2'
task_default_queue: 'default'
DEBUG: False
worker_prefetch_multiplier: 2
We need to find a test case that reproduces the issue.
When @georgepsarakis wrote one, it passed without any code modification. See #4468.
@mleginus Someone has to dig deep and figure this out. We can't really do that since we don't have your code available to us.
@mleginus it would also help if you had a chart with the channel count over time, since starting the worker.
Hello @georgepsarakis @thedrow ,
sorry for the delayed response. I tried to simplify the code and provide an example of how the issue arises. I am running two workers like this:
celery -A swarm.workers.tasks worker -Q default -P solo
celery -A swarm.workers.tasks worker -Q children -P solo
I am executing the test task function, which checks the Redis channel count before and after the chord execution. I was running this against a locally installed Redis:
I am wondering what could be the reason for such behaviour when the snippet is quite similar to the test case written by @georgepsarakis. Here is the code snippet of tasks.py, which is under the swarm.workers package:
"""Celery tasks module."""
from celery import Celery
from celery import chord
from kombu import Queue
from redis import StrictRedis
from time import sleep
class CeleryConfig(object):
# celery configurations
broker_url = 'redis://localhost:6379/1'
task_default_queue = 'default'
task_queues = (
Queue('default', routing_key='parent.#,opbeat'),
Queue('children', routing_key='child.#,opbeat'),
)
imports = ('swarm.workers.tasks',)
result_backend = 'redis://localhost:6379/2'
worker_prefetch_multiplier = 2
worker_max_tasks_per_child = 100
def create_app(config):
"""Create a celery app instance."""
celery_app = Celery(__name__)
celery_app.config_from_object(config)
return celery_app
app = create_app(CeleryConfig)
@app.task
def add(x, y):
"""Add two numbers."""
return x + y
@app.task
def delayed_sum(numbers, pause_time=1):
"""Sum the iterable of numbers."""
# Allow the task to be in STARTED state for
# a limited period of time.
sleep(pause_time)
return sum(numbers)
def run_test_task():
"""Run test task."""
child_tasks = []
child_tasks.append(
add.signature(
(1, 2),
options={
'queue': 'children',
'routing_key': 'child.enrich',
}
)
)
child_tasks.append(
add.signature(
(3, 3),
options={
'queue': 'children',
'routing_key': 'child.enrich',
}
)
)
redis = StrictRedis()
channels_before = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count before chord execution {}'.format(channels_before))
async_result = chord(child_tasks)(delayed_sum.s())
print('The final sum is {}'.format(async_result.get()))
channels_after = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count after chord execution {}'.format(channels_after))
I also tried running this against AWS ElastiCache and I can see the same behaviour there. Below is a chart of how the number of channels increases over time. Please ignore the staircase shape of the curve; the reason for the "staircase" is that the tasks get triggered every 5 minutes. Nevertheless, the channel count is completely dependent on the number of chord + child tasks executed.
@georgepsarakis @thedrow can you reproduce this with the snippet on your machines?
see my pip freeze:
amqp==2.1.4
apipkg==1.4
appdirs==1.4.0
aspy.yaml==0.2.2
astroid==1.4.9
autopep8==1.2.4
billiard==3.5.0.2
blinker==1.4
boto3==1.4.4
botocore==1.5.7
cached-property==1.3.0
celery==4.1.0
certifi==2017.1.23
click==6.7
cov-core==1.15.0
coverage==3.7.1
decorator==4.0.11
docutils==0.13.1
execnet==1.4.1
factory-boy==2.8.1
Faker==0.7.7
flake8==3.5.0
Flask==0.12
freezegun==0.3.8
ipython==5.1.0
ipython-genutils==0.1.0
isort==4.2.5
itsdangerous==0.24
Jinja2==2.9.4
jmespath==0.9.1
jsonschema==2.5.1
kombu==4.1.0
lazy-object-proxy==1.2.2
MarkupSafe==0.23
marshmallow==2.12.1
mccabe==0.6.1
nodeenv==1.1.0
opbeat==3.5.2
packaging==16.8
pep8==1.7.0
pexpect==4.2.1
pickleshare==0.7.4
pre-commit==0.12.1
prompt-toolkit==1.0.9
ptyprocess==0.5.1
py==1.4.32
pycodestyle==2.3.1
pyflakes==1.6.0
Pygments==2.2.0
pylint==1.6.5
pyparsing==2.1.10
pytest==2.9.2
pytest-catchlog==1.2.2
pytest-cov==1.8.1
pytest-flask==0.10.0
pytest-xdist==1.15.0
python-dateutil==2.6.0
python-statsd==1.7.2
pytz==2016.10
PyYAML==3.12
redis==2.10.5
requests==2.13.0
s3transfer==0.1.10
simplegeneric==0.8.1
six==1.10.0
traitlets==4.3.1
urllib3==1.20
vine==1.1.3
virtualenv==15.1.0
wcwidth==0.1.7
webargs==1.3.4
Werkzeug==0.11.15
wrapt==1.10.8
@mleginus your pip freeze output displays Celery version 4.1.0, which does not contain the fixes. Could you please double-check that you are using the latest release candidate version, 4.2.0rc2?
@georgepsarakis thanks for pointing this out. The issue is gone with 4.2.0rc2. Let me try tomorrow, when I'm in the office, with the original code and Celery 4.2.0rc2 to see that the pubsub channels are cleaned up in AWS ElastiCache. If that's the case, sorry for the confusion!
No worries! Just let us know please once you verify.
I can reproduce the issue with 4.2.0rc2, although the code snippet is different. See the tasks.py snippet:
"""Celery tasks module."""
from celery import chord
from celery import Celery
from kombu import Queue
from redis import StrictRedis
from time import sleep
class CeleryConfig(object):
# celery configurations
broker_url = 'redis://localhost:6379/1'
task_default_queue = 'default'
task_queues = (
Queue('default', routing_key='parent.#,opbeat'),
Queue('children', routing_key='child.#,opbeat'),
)
imports = ('swarm.workers.tasks',)
result_backend = 'redis://localhost:6379/2'
worker_prefetch_multiplier = 2
worker_max_tasks_per_child = 100
def create_app(config):
"""Create a celery app instance."""
celery_app = Celery(__name__)
celery_app.config_from_object(config)
return celery_app
app = create_app(CeleryConfig)
@app.task
def enrich():
"""Main enrichment task."""
child_tasks = []
for _ in range(1, 10):
child_tasks.append(
enrich_single_call.signature(
kwargs={
'data': 'data',
},
options={
'queue': 'children',
'routing_key': 'child.enrich',
}
)
)
chord(child_tasks)(enrich_result_callback.s())
@app.task
def enrich_single_call(data):
"""Single call to a single enrichment service."""
result = {
'data': data.upper() # faking invocation of enrichment service :) :)
}
return result
@app.task
def enrich_result_callback(results_list):
"""Enrichment results publisher."""
results = {}
for result in results_list:
results['enrichment'] = result
return results
def run_test_task():
"""Run test task."""
redis = StrictRedis()
channels_before = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count before chord execution {}'.format(channels_before))
enrich.delay()
sleep(5)
channels_after = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count after chord execution {}'.format(channels_after))
running Celery workers like this:
celery -A swarm.workers.tasks worker -Q children -P solo
celery -A swarm.workers.tasks worker -Q default -P solo
then running run_test_task from ipython produces output like this:
@georgepsarakis can you reproduce this with the updated snippet?
@mleginus This is exactly why we introduced an RC process.
Will you be able to contribute this test case to our integration test suite?
@thedrow I would love to help with the integration test but it might be that I won't be able to do that until the middle of May (I am wrapping up this week in the office and then travelling for some time).
@mleginus @thedrow I opened #4666, which may resolve this issue. Please test it before deploying to production; it may have unknown consequences.
@mleginus do you perhaps have time to test the branch on PR #4666? Your feedback would be valuable.
@mleginus @georgepsarakis @thedrow Any update on this? Still blocking 4.2...
@johnarnold we would like some feedback on the possible solution. #4666 will probably be merged into master quite soon.
I have rerun @mleginus's sample code above and I still get the failure.
My pip3 freeze is:
pip3 freeze | grep celery
celery==4.2.0rc3
The output of his script is:
Redis channels count before chord execution 0
Redis channels count after chord execution 10
Redis channels count before chord execution 10
Redis channels count after chord execution 20
Redis channels count before chord execution 20
Redis channels count after chord execution 30
Several minutes after running the script, running 'redis-cli pubsub channels' still shows the celery-task-meta-xxx channels as subscribed.
I've tried this on two machines, one Ubuntu 16 and the other macOS. Redis 4.0.7.
I have confirmed that the tasks are actually succeeding: AsyncResult(task_id.id).status == "SUCCESS".
Therefore, I believe the issue is still that the pubsub client is not receiving the task-succeeded message from the worker. Today is my first day looking at Celery code, so my beliefs should not be trusted.
I have found a workaround for now, which is not very nice: for tasks that have no return value, I call task_id.backend.result_consumer.cancel_for(task_id.task_id), using the task_id returned from .delay().
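A minimal sketch of that workaround (the app and task below are placeholders; only the last line is the actual trick):

from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def fire_and_forget(x):
    return None  # the caller never reads this result

result = fire_and_forget.delay(42)

# Explicitly drop the pubsub subscription for this task's result channel.
# result_consumer is an internal of the Redis result backend, so treat this
# as a stopgap rather than a supported API.
result.backend.result_consumer.cancel_for(result.task_id)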
Let me know if I can provide any other info.
@bartloop thanks for the feedback. I am guessing that you are referring to this script, correct?
You need to call AsyncResult.get() so that the caller unsubscribes from the chord. With the code as it is now, the channels will remain active as long as the ipython process remains active.
Try running this for testing:
def run_test_task():
"""Run test task."""
redis = StrictRedis()
channels_before = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count before chord execution {}'.format(channels_before))
result = enrich.delay()
sleep(5)
channels_after = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count after chord execution {}'.format(channels_after))
result.get()
channels_after = len(redis.execute_command('PUBSUB CHANNELS'))
print('Redis channels count after result retrieval {}'.format(channels_after))
@georgepsarakis thank you for taking the time to show me that correction.
The code you modified decremented the count by 1, not by 10:
Redis channels count before chord execution 0
Redis channels count after chord execution 10
Redis channels count after result retrieval 9
Redis channels count before chord execution 9
Redis channels count after chord execution 19
Redis channels count after result retrieval 18
But our issue is not really about chords - we're struggling with doing .delay()s on single tasks with no return value.
So here's my newbie question: for tasks that have no result, how do I do the equivalent of .get() but in a non-blocking way? The documentation for AsyncResult shows a forget() function, which seems to be what I want, but it has no effect on the subscription count. I also tried adding ignore_result=True to the task decorators, but that had no effect. I assume there is some way to do this.
Thanks!
@bartloop This sounds like a different but related issue.
Please open a new one with the relevant test case.
If you are up to it, contribute an integration test and mark it as xfail.
@georgepsarakis I hope you don't mind me closing this one again.
I'd rather keep our issue list clean and this one should be resolved.
If there is an exact regression of this issue let's reopen. If not, please encourage our users to open issues with the relevant test cases.
@thedrow not at all, I just reopened it so as not to discuss on a closed issue, thanks a lot for explaining.
@georgepsarakis and @thedrow I can reproduce the behaviour reported by @bartloop in https://github.com/celery/celery/issues/3812#issuecomment-386639135. I would like to see whether I will be able to write an integration test case that could reproduce such behaviour. For this, I need to have two Celery workers running inside the test - one listening on the children queue and another on the default queue. Could you advise on how to create a second worker fixture (in addition to celery_session_worker, https://github.com/celery/celery/blob/master/t/integration/conftest.py#L65) and pass in the appropriate settings to listen on a specific queue? Thanks in advance.
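For reference, this is roughly what I have in mind - a possible (unverified) sketch built on celery.contrib.testing.worker.start_worker; it assumes the queues keyword is forwarded to the embedded WorkController, and it disables the ping check because the ping task would be routed to the default queue:

import pytest
from celery.contrib.testing.worker import start_worker


@pytest.fixture(scope='session')
def celery_children_worker(celery_session_app):
    # Second in-process worker consuming only the 'children' queue
    # (assumes 'queues' is passed through to the WorkController).
    with start_worker(celery_session_app,
                      pool='solo',
                      queues=['children'],
                      perform_ping_check=False) as worker:
        yield worker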
@mleginus could you please open a new issue so that we can analyze this further? Thanks!
Err... is it just me, or should this issue be reopened? I just tried out gugu's code on a recent 4.4.0 install and I still have the same symptoms - subscriptions for the chord's header subtasks are still not cleared.
NB: I'm still browsing the (numerous) related tickets etc., so I might have overlooked something - if yes, please enlighten me ;-)
celery -A app report:
software -> celery:4.4.0 (cliffs) kombu:4.6.7 py:2.7.17
billiard:3.6.1.0 py-amqp:2.5.2
platform -> system:Linux arch:64bit
kernel version:5.3.0-26-generic imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:redis://localhost/
worker_max_tasks_per_child: 10
result_extended: True
worker_prefetch_multiplier: 1
worker_send_sent_event: True
result_expires: 86400
worker_disable_rate_limits: True
worker_send_task_events: True
timezone: u'Europe/Paris'
track_started: True
broker_url: u'amqp://guest:********@localhost:5672//'
result_backend: u'redis://localhost/'
worker_pool_restarts: True