Celery version: 4.2.0 (windowlicker)
Report:
software -> celery:4.2.0 (windowlicker) kombu:4.2.2-post1 py:3.6.6
billiard:3.5.0.5 sqs:N/A
platform -> system:Linux arch:64bit, ELF
kernel version:3.13.0-139-generic imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:sqs results:disabled
broker_url: 'sqs://localhost//'
include: [...]
worker_hijack_root_logger: False
task_serializer: 'json'
result_expires: 3600
accept_content: ['json']
result_serializer: 'json'
timezone: 'Europe/Berlin'
enable_utc: True
broker_transport_options: {
'polling_interval': 1,
'region': 'eu-west-1',
'visibility_timeout': 10860}
task_ignore_result: True
task_acks_late: True
worker_prefetch_multiplier: 1
worker_max_tasks_per_child: 10
worker_pool: 'celery.concurrency.prefork:TaskPool'
task_time_limit: 10800
worker_enable_remote_control: False
worker_send_task_events: False
task_default_queue: 'celery'
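For reference, a minimal sketch of what those settings correspond to in code, reconstructed from the report above (the SQS credentials, the include list and the task body are placeholders, not the real project's code):

from celery import Celery

app = Celery('repro', broker='sqs://')
app.conf.update(
    broker_transport_options={
        'polling_interval': 1,
        'region': 'eu-west-1',
        'visibility_timeout': 10860,
    },
    task_acks_late=True,
    task_ignore_result=True,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=10,  # the child-recycling limit discussed below
)

@app.task
def noop():
    # placeholder task body; the real project's tasks are elided above
    return None

# e.g. enqueue 500 of these and watch the parent process once children recycle:
# for _ in range(500): noop.delay()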
Steps to reproduce (in addition to the settings above): edit kombu/asynchronous/hub.py to uncomment the print statements in create_loop, and run with worker_max_tasks_per_child: 10.
Expected behavior: the main worker process should settle to ~1% CPU usage after the 500 tasks were run. The print statements should not fire very often, because of the sleep call at the end of the create_loop method.
Actual behavior: the main worker hammers the CPU at a constant 100%, and the console output (due to the uncommented print statements) is flooded with these messages, arriving roughly every millisecond and never stopping:
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,502; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,503; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,505; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,506; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,507; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,508; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,509; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,510; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,511; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,513; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
Also, the strace output is full of these:
epoll_ctl(9, EPOLL_CTL_DEL, 43, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLERR|0x36f80000, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 46, {EPOLLERR|0x36ca1800, {u32=32711, u64=4295000007}}) = -1 EBADF (Bad file descriptor)
epoll_ctl(9, EPOLL_CTL_DEL, 44, {EPOLLERR|0x36ca1800, {u32=32711, u64=4295000007}}) = -1 EBADF (Bad file descriptor)
epoll_ctl(9, EPOLL_CTL_DEL, 19, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 23, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 27, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 11, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 15, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
There have been similar issues before, but most of them were related to Redis. What's also different from earlier issues is that the worker still distributes / consumes new tasks. So basically everything works, but instead of idling the CPU hammers the loop. I can consistently reproduce this with my SQS worker (concurrency 5, by the way), so please let me know what other information I can gather to hunt down this problem. Thanks!
Hello again. The latest updates don't seem to help. Currently we are at
billiard==3.6.0.0
celery==4.3.0
kombu==4.5.0
django-celery-beat==1.4.0
The whole issue is very similar to https://github.com/celery/celery/issues/1845, only that we are using SQS as the broker. The problem seems to be caused by worker_max_tasks_per_child, because the main process goes to 100% CPU as soon as it has to restart its children because of that task limit.
I did some further investigation with gdb's Python support. Here is a Python backtrace obtained via py-bt:
Traceback (most recent call first):
<built-in method unregister of select.epoll object at remote 0x7fadac9b8600>
File "$ENV/lib/python3.6/site-packages/kombu/utils/eventio.py", line 75, in unregister
self._epoll.unregister(fd)
File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 243, in _unregister
self.poller.unregister(fd)
File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 160, in _remove_from_loop
self._unregister(fd)
File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 181, in remove
self._remove_from_loop(fd)
File "$ENV/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 721, in <listcomp>
[hub_remove(fd) for fd in diff(active_writes)]
File "$ENV/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 721, in on_poll_start
[hub_remove(fd) for fd in diff(active_writes)]
File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 295, in create_loop
tick_callback()
<built-in method next of module object at remote 0x7fadcaf37638>
File "$ENV/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "$ENV/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "$ENV/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "$ENV/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "$ENV/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "$ENV/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "$ENV/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "$ENV/lib/python3.6/site-packages/celery/bin/worker.py", line 258, in run
worker.start()
File "$ENV/lib/python3.6/site-packages/celery/bin/base.py", line 252, in __call__
ret = self.run(*args, **kwargs)
File "$ENV/lib/python3.6/site-packages/celery/bin/worker.py", line 223, in run_from_argv
return self(*args, **options)
File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 420, in execute
).run_from_argv(self.prog_name, argv[1:], command=argv[0])
File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 488, in handle_argv
return self.execute(command, argv)
File "$ENV/lib/python3.6/site-packages/celery/bin/base.py", line 298, in execute_from_commandline
return self.handle_argv(self.prog_name, argv[1:])
File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 496, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 322, in main
cmd.execute_from_commandline(argv)
File "$ENV/lib/python3.6/site-packages/celery/__main__.py", line 16, in main
_main()
File "$ENV/lib/python3.6/site-packages/celery/__main__.py", line 20, in <module>
main()
<built-in method exec of module object at remote 0x7fadcaf37638>
File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
It fails to do the self._epoll.unregister(fd) call, which relates nicely to the strace output I originally posted. Stepping through for a while I end up at
241 def _unregister(self, fd):
242 try:
243 self.poller.unregister(fd)
244 except (AttributeError, KeyError, OSError):
>245 pass
246
247 def close(self, *args):
248 [self._unregister(fd) for fd in self.readers]
249 self.readers.clear()
250 [self._unregister(fd) for fd in self.writers]
so unregister clearly raises an exception.
The FDs there, like 27, 11, 15, are all pipes according to /proc/$PID/fd. There I can get the pipe FDs, and according to lsof | grep $PIPE_FD they are pipes between the main process and the child processes: 15614 is the main process and 23179, e.g., is a child process:
python3.6 15614 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 15614 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18344 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18344 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18344 18346 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18344 18346 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18368 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18368 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18368 18406 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18368 18406 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18369 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18369 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18369 18382 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18369 18382 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18370 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18370 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18370 18389 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18370 18389 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18371 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18371 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18371 18391 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18371 18391 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18372 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18372 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18372 18390 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18372 18390 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18373 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18373 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18373 18393 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18373 18393 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18375 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18375 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18375 18405 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18375 18405 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18376 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18376 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18376 18395 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18376 18395 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18378 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18378 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18378 18403 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18378 18403 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18380 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18380 18400 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18383 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18383 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18383 18404 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18383 18404 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18384 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18384 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18384 18424 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18384 18424 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18386 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18386 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18386 18420 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18386 18420 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18387 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18387 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18387 18423 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18387 18423 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18396 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18396 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18396 18426 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18396 18426 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18397 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18397 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18397 18422 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18397 18422 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18399 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18399 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 18399 18425 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 18399 18425 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 23122 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 23122 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
python3.6 23179 ubuntu 39r FIFO 0,8 0t0 49209822 pipe
python3.6 23179 ubuntu 40w FIFO 0,8 0t0 49209822 pipe
Without knowing the kombu / billiard code, it seems to me that the child processes were successfully restarted, but somehow the main process does not realize this and keeps trying to get rid of the old ones, which don't exist anymore.
You are now bringing us to a conclusion we can use to debug this further.
Would you mind changing the code so that the exception will be raised?
If the exception complains about a file descriptor which does not exist, maybe we could simply remove it from the readers/writers list.
So maybe this exception should not be silently passed and ignored.
I'm going to make Celery emit a log for these failures.
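For illustration, a rough sketch (not an actual kombu patch) of what "log instead of silently pass, and drop the stale fd" could look like in the hub's _unregister, assuming the _discard helper mentioned further down in this thread removes the fd from both the readers and the writers maps:

import logging

logger = logging.getLogger(__name__)

def _unregister(self, fd):
    try:
        self.poller.unregister(fd)
    except (AttributeError, KeyError, OSError) as exc:
        # The fd is gone or was never registered: log it and make the hub
        # forget about it instead of retrying on every loop iteration.
        logger.warning('could not unregister fd %r from poller: %r', fd, exc)
        self._discard(fd)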
Thanks for your support.
In kombu's eventio.py I had to modify _epoll to not swallow exceptions (so above I probably did not really run into an exception; the GNU debugger probably just falsely thought it was in that path). With this I could make all the repeating unregister errors visible:
def unregister(self, fd):
    try:
        self._epoll.unregister(fd)
    except (socket.error, ValueError, KeyError, TypeError):
        raise
    except (IOError, OSError) as exc:
        raise
        # with the raise above, the original errno check below is unreachable
        if getattr(exc, 'errno', None) not in (errno.ENOENT, errno.EPERM):
            raise
so that in hub.py I could do
def _unregister(self, fd):
    try:
        self.poller.unregister(fd)
    except (AttributeError, KeyError, OSError):
        logger.exception("pls debug me, fd = %s", fd, extra={'fd_debug': fd})
with many fd numbers repeating thousands of times within seconds. E.g. the message was pls debug me, fd = 56 with traceback:
Traceback (most recent call last):
File ".../kombu/asynchronous/hub.py", line 243, in _unregister
self.poller.unregister(fd)
File ".../kombu/utils/eventio.py", line 78, in unregister
self._epoll.unregister(fd)
FileNotFoundError: [Errno 2] No such file or directory
The eventio module specifically ignores these exceptions and does not raise them.
We could handle them in the _unregister method in the hub module as well; in that case we will have to handle the exception whenever _unregister is called.
Can you please check if #5499 helps in any way?
I will try it out.
Unfortunately it does not help. Depending on the setting (-Ofair vs. -Odefault) the worker still goes crazy at these lines:
https://github.com/celery/celery/blob/master/celery/concurrency/asynpool.py#L735
https://github.com/celery/celery/blob/master/celery/concurrency/asynpool.py#L742
Edit: and in the logs I find no occurrences of Encountered OSError while trying.
I don't understand why. If we're calling hub.remove we're eventually calling hub._discard, which should remove the file descriptor from both the readers and the writers.
If you add a breakpoint there, do you see the fd being removed?
Is it added again somewhere?
So for the -Ofair case, I modified on_poll_start like so:
def on_poll_start():
    if outbound and len(busy_workers) < len(all_inqueues):
        # print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
        #                               len(active_writes)))
        inactive = diff(active_writes)
        [hub_add(fd, None, WRITE | ERR, consolidate=True)
         for fd in inactive]
    else:
        for fd in diff(active_writes):
            aw = list(self._active_writes)
            ai = list(self._all_inqueues)
            result = hub_remove(fd)
            extra = {
                'fd': fd,
                'result': repr(result),
                'active_writes': aw,
                'all_inqueues': ai,
            }
            logger.warn('removed fd %r result %r aw %r ai %r', fd, result, aw, ai, extra=extra)
I'm logging self._active_writes and self._all_inqueues, because diff(active_writes) seems to be equivalent to self._all_inqueues - self._active_writes.
Then I started the worker with --concurrency 1 and worker_max_tasks_per_child=10 and queued a few tasks with .delay(), so the restarting should kick in quickly.
Even before the restarting took place, my logs were full of this:
removed fd 8 result None aw [] ai [8]
removed fd 8 result None aw [] ai [8]
removed fd 8 result None aw [] ai [8]
The frequency and CPU are not high yet. But as soon as restarting takes place and no more tasks arrive, this goes crazy to 7000 logs per 5 seconds. From start to end, the fd 8 never changes. To me, it seems like AsynPool._all_inqueues is not properly cleaned up. Debugging on_inqueue_close now.
So, on_inqueue_close is being called successfully, but every time, at least once afterwards, on_process_alive is being called and the line https://github.com/celery/celery/blob/e7ae4290ef044de4ead45314d8fe2b190e497322/celery/concurrency/asynpool.py#L1083 adds the 8 back in there.
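To make the mechanism concrete, here is a toy model (plain Python sets, not Celery code) of the state described above, with fd 8 standing in for the recycled child's inqueue:

all_inqueues = {8}       # queue fds the pool knows about
active_writes = set()    # fds with an in-flight write

# on_inqueue_close() for the recycled child removes the fd...
all_inqueues.discard(8)

# ...but on_process_alive() for the replacement child puts the same number
# back, even though the pipe it referred to no longer exists:
all_inqueues.add(8)

# diff(active_writes) == all_inqueues - active_writes == {8}, so on_poll_start
# keeps calling hub_remove(8) on every single loop iteration:
print(all_inqueues - active_writes)   # {8}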
So we do not remove the process from the pool after the worker recycles itself?
Whenever we start a process we append the worker to the pool.
I don't see anyone calling _join_exited_workers(), which should clean them from the pool.
Could this be the problem?
Can you please verify?
I am not so sure anymore that this is the overall culprit. I mean, after all, some code must be responsible for the cleanup being called so frequently. So I moved up the traceback to place some logging into the hub's loop in https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L301:
…
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
logger.info('new loop run with poll_timeout %r', poll_timeout)
if readers or writers:
…
Let me show you how I analyze this log message now with Kibana:
To me it seems that something is short-circuiting the loop, and that's why the CPU goes to 100%. However, I have no idea how to effectively debug this further. There are so many except and continue clauses inside of it.
Can you please verify?
I'm not sure I understand. Do you want me to verify whether _join_exited_workers is being called?
I analysed the loop with lots of log statements, but all I can say is that it most of the time ran into https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L362, which is normal I guess. So there are just way too many events that don't seem to disappear. Just as in my original post:
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,502; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,503; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
Can you please verify?
I'm not sure I understand. Do you want me to verify whether _join_exited_workers is being called?
Yes.
Maybe add it to the top of the loop as well...
Is it possible the Celery code already has a comment describing how this could happen?
https://github.com/celery/celery/pull/5499/files#diff-c80fd9891efbbe68275a133d83cd22a4L456
def _track_child_process(self, proc, hub):
    try:
        fd = proc._sentinel_poll
    except AttributeError:
        # we need to duplicate the fd here to carefully
        # control when the fd is removed from the process table,
        # as once the original fd is closed we cannot unregister
        # the fd from epoll(7) anymore, causing a 100% CPU poll loop.
        fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
    hub.add_reader(fd, self._event_process_exit, hub, proc)
Can you share the command you run @tuky for launching the celery worker for this bug report? Is it possible you are using heartbeats and don't have --without-mingle --without-gossip?
Here is an original commit back when Ask was trying to solve this for Celery 3, where he related it back to epoll: https://github.com/celery/celery/commit/ca57e722b25f8fca817084ec7562be3698c7ee02
In the commit he mentions he found out that:
1) epoll_wait always returned an error state for a Popen pipe fd,
2) the worker was trying to unregister this fd from epoll, but
3) epoll.unregister refused to do so, giving an IOError(ENOENT) error.
So turns out this is an epoll quirk, and the solution is to duplicate the pipe fd so that we can carefully control when it's removed from the process file descriptor table.
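The quirk is easy to reproduce with the stdlib alone (Linux only). The standalone demo below is my own illustration, not code from celery/kombu, but it shows both the failure mode and why duplicating the fd, as _track_child_process does, gives back control:

import os
import select

ep = select.epoll()

# 1) Register a pipe fd and close it before unregistering: epoll_ctl now
#    fails, because the fd number no longer refers to anything we can name.
r, w = os.pipe()
ep.register(r, select.EPOLLIN)
os.close(r)
try:
    ep.unregister(r)
except OSError as exc:
    print('unregister after close failed:', exc)   # typically EBADF

# 2) Register a duplicate instead: the original side can be closed whenever,
#    and we still decide exactly when the registration is removed.
r2, w2 = os.pipe()
dup = os.dup(r2)
ep.register(dup, select.EPOLLIN)
os.close(r2)
ep.unregister(dup)   # works, dup is still a valid fd
for fd in (dup, w, w2):
    os.close(fd)
ep.close()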
Can you share the command you run @tuky for launching the celery worker for this bug report?
Thank you for your investigations. Here you go @matteius
python3.6 -m celery worker --loglevel=INFO --app=foo.worker -Ofair --concurrency=1 --logfile=/var/log/celery/foo.log --pidfile=/var/run/celery/foo.pid --hostname=foo@bar
Can you tell from that whether I'm using heartbeats? At least I am not using the --without-heartbeat option. Should I try running with all 3 options --without-gossip --without-mingle --without-heartbeat?
@tuky It would be worth a try for the sake of science, though after reading more of the code last night I suspect the issue is inherent to the SQS support and/or just how it uses epoll. I started working on a code change, similar to my other PR that was merged and linked in here earlier, that would also try to accept these other file descriptor points in the async pool -- hoping to spend some more time on it later today, but it will take time to understand everything going on, as there is a lot. Any chance you can work on a TDD-style unit test that reproduces the ENOENT issue in some way?
Also note (since you asked): my experience with heartbeats is on the RabbitMQ AMQP broker, where I believe the broker is set up to send heartbeats, and in that case you have to have the worker respond within the heartbeat interval or it drops the connection -- we run this way and with the flags --without-gossip --without-mingle because those protocols were way too chatty, causing some performance bugs back on Celery 3.
I tried running with --without-gossip --without-mingle --without-heartbeat, but without success. @thedrow I also finally verified that _join_exited_workers is indeed being called from time to time.
I will try to come up with a test similar to the test in https://github.com/celery/celery/pull/5499/files, but I will probably fail, because I know too few details :crossed_fingers:
@tuky Alright, well I figured I would give you a chance to write that unit test and focus more on the code change and refactor, as it did seem oddly similar to the other PR that was linked in. So now I have this new PR specifically for this ticket -- maybe you can try these changes to see if that resolves the bug or not, and we can keep working towards tests. I plan not to write unit tests this weekend, as this was already above and beyond what I thought I would even get to: https://github.com/celery/celery/pull/5604/files
The effort is much appreciated. However, it does not help in our case. There is no occurrence of Encountered OSError when accessing fd in my logs. Only when I deactivate the exception swallowing in https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L245 and https://github.com/celery/kombu/blob/master/kombu/utils/eventio.py#L79 do I get this right after starting the worker:
Traceback (most recent call last):
File "venv/src/celery/celery/concurrency/asynpool.py", line 240, in iterate_file_descriptors_safely
hub_method(fd, *hub_args, **hub_kwargs)
File "venv/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 181, in remove
self._remove_from_loop(fd)
File "venv/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 160, in _remove_from_loop
self._unregister(fd)
File "venv/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 243, in _unregister
self.poller.unregister(fd)
File "venv/lib/python3.6/site-packages/kombu/utils/eventio.py", line 75, in unregister
self._epoll.unregister(fd)
FileNotFoundError: [Errno 2] No such file or directory
This fixes the 100% CPU loop, but only one task can be consumed and then the worker is dead.
@tuky It would appear that kombu also tracks those same FDs in the asynchronous hub, and the eventio _epoll code there may not catch all relevant exceptions. I may try to make a somewhat similar change in kombu to make that code safer, or at least log warnings rather than just pass. For your case, though, you say it consumed one task and then died -- well, the error is FileNotFoundError and the connection being the file descriptor, so something closed or removed a temporary file descriptor after the first connection. Is that FD the connection to SQS? Is there a way for you to specify persistent connections with that AWS service?
It is odd that you are getting the FileNotFoundError raised from that iterate_file_descriptors_safely when it in fact is one of the two exceptions it tries to catch. If it had caught it, it would have printed "Encountered OSError when accessing fd", which you said is not there, but then there is the FileNotFoundError traceback.
It is odd that you are getting the FileNotFoundError raised from that iterate_file_descriptors_safely when it in fact is one of the two exceptions it tries to catch. If it had caught it, it would have printed "Encountered OSError when accessing fd"
I'm sorry, I confused you. The traceback I posted did have the message Encountered OSError when accessing fd attached to it. I copied the traceback from Kibana, and the message there is a field separate from the traceback.
Is that FD the connection to SQS?
I don't know how I could find out / debug this. Do you have a command I could run on my OS or in a Python shell or something?
Is there a way for you to specify persistent connections with that AWS service?
Looking at https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#configuration there is indeed a tcp_keepalive option. I will try that.
Is that FD the connection to SQS?
I don't know how I could find out / debug this. Do you have a command I could run on my OS or in a Python shell or something?
Via
ls -l /proc/4460/fd/
I get
8 -> pipe:[163974]
and then with
lsof | grep 163974
I get
python3.6 4460 ubuntu 7r FIFO 0,10 0t0 163974 pipe
python3.6 4460 ubuntu 8w FIFO 0,10 0t0 163974 pipe
python3.6 4530 ubuntu 7r FIFO 0,10 0t0 163974 pipe
where 4460 is the parent process pid and 4530 is the child, so the fd is the pipe from the parent to the child process. I am wondering why the parent is calling unregister right at the beginning, when no tasks have been processed at all yet :confused:
Reading https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/ I investigated a bit further and figured that the Linux kernel in use might be too old. We had 4.4, which does not yet provide the EPOLLEXCLUSIVE feature. But it is not used by default anyway. I upgraded the kernel to 4.5 and activated the flag when _epoll.register is called, but that did not work. For now we are using poll instead of epoll by hacking
import select
del select.epoll
With poll I cannot reproduce the 100% CPU issue at all. With SQS as the queue service it also seems like an unnecessary optimization to use epoll over poll. Would you consider adding a setting which makes _get_poller prefer poll over epoll?
That's alarming since all event loop implementations including gevent use epoll.
@jamadden At this point I'd like to bring you to the discussion as you are the main maintainer of gevent.
Is there a bug in gevent which can cause this?
Reading https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/ I investigated a bit further and figured that the Linux kernel in use might be too old. We had 4.4, which does not yet provide the EPOLLEXCLUSIVE feature. But it is not used by default anyway. I upgraded the kernel to 4.5 and activated the flag when _epoll.register is called, but that did not work. For now we are using poll instead of epoll by hacking import select; del select.epoll
With poll I cannot reproduce the 100% CPU issue at all. With SQS as the queue service it also seems like an unnecessary optimization to use epoll over poll. Would you consider adding a setting which makes _get_poller prefer poll over epoll?
I'd rather not expose these details to the users. We need to find the proper fix and implement it.
@tuky I feel like we are really getting somewhere with this, and I went back over this entire conversation again today and realized what you said on May 20th is now way more clear. So the pipe FDs between the master Celery process and the children are breaking in this configuration when the max tasks per child limit is reached. It clears the file descriptor out of the list, but the new worker gets created with the same file descriptor added back to the list. I was trying to read through the billiard code more to see how the maxtasks feature is implemented.
So, on_inqueue_close is being called successfully, but every time, at least once afterwards, on_process_alive is being called and the line adds the 8 back in there.
Clearly the new worker that starts up to replace the old one is calling on_process_alive (as you stated), and that adds the old file descriptor back (which no longer exists), and then later in the loop it can't communicate with the new worker, and nothing restarts it because it doesn't get tasks anymore.
This spot in Billiard is where it seems to create a Worker process using the prior known FDs in _create_worker_process: https://github.com/celery/billiard/blob/master/billiard/pool.py#L1143
I also found this in Billiard where maxtasks is implemented: https://github.com/celery/billiard/blob/master/billiard/pool.py#L389
I do question the order of methods called in the Worker __call__ method: https://github.com/celery/billiard/blob/master/billiard/pool.py#L288
First it calls self._make_child_methods() using self.inq and self.synq before calling self.after_fork(), which may call close on the writers of self.inq and self.synq ... not saying that will keep it from adding the old FD back in, but it does make me scratch my head, though this is totally speculative.
Then I looked back to what @thedrow mentioned earlier in the question about whether _join_exited_workers is being called, and I found that it is called from _maintain_pool in billiard, which is called from two spots; one is in the worker handler's body method. It gets called while the process _state is RUN, with a periodic sleep of 0.8 seconds, but I suspect _state could have changed when the worker returns a state of EX_RECYCLE from __call__. Plus, the only other place that calls it is the public method maintain_pool (which is called from the Celery AsynPool on _event_process_exit), but if I am right about the _state variable at that time, it won't actually call _maintain_pool, because maintain_pool checks if self._worker_handler._state == RUN and self._state == RUN before doing this, so perhaps that important piece of code isn't being called.
So while the exact fix for the repeating file descriptor in the list is unknown, it's clear that the old FD is getting added back in Tuky's test output. I wonder, given the naming of the exit code EX_RECYCLE, whether the Celery/billiard/kombu logic intends to reuse the file descriptor pipes or not -- perhaps they break in the SQS implementation when the worker is restarted? Perhaps it is a bug for more than just SQS, like the other reports you mentioned that involved Redis, and I would be willing to believe that -- except we do use max tasks per child with the AsynPool and amqp backend, and very rarely do the workers get hung up like this on task restarts, though I can point to a handful of times where that may be what happened. If the file descriptor goes away and it needs to be there for the restart to work, that could explain it. My other PR doesn't do anything to recreate bad FDs; it just adds some safety checks around places where we go to use the ones we already know about.
It would be really nice if I had a way to reproduce this locally. Even if I have to set up an AWS SQS sandbox to run against, it would be useful to have some sample config and tasks to run without much setup. It might not be a requirement to fix it, but it would possibly help me ask fewer questions.
It would be really nice if I had a way to reproduce this locally. Even if I have to set up an AWS SQS sandbox to run against, it would be useful to have some sample config and tasks to run without much setup.
That would be awesome, thanks for your further investigation. I will look into creating a reproduction repo and provide you with some SQS credentials for a queue.
I invited you @matteius @thedrow and @auvipy to a private repo which is ready to use - including credentials - for an SQS queue. Thank you, I hope you can reproduce the issue with this.
Is this fixed already? Is there any workaround for it?
I think I'm having the same/similar issue without "worker_max_tasks_per_child". It seems the pipe between the workers is gone and it gets stuck.
I'm running python3.7-alpine with:
celery[sqs]==4.4.0rc4
kombu==4.6.6
It is a simple task which has a single parameter, a UUID. max_retries is set to None and retry_backoff=2, with task_acks_late=True and task_acks_on_failure_or_timeout=False.
The task in my case always raises an exception to keep the message on the queue for testing purposes.
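Roughly, the task looks like this (a reconstruction with made-up names, not the actual project code):

from celery import Celery

app = Celery('my_module', broker='sqs://')
app.conf.task_acks_late = True
app.conf.task_acks_on_failure_or_timeout = False

@app.task(bind=True, max_retries=None, retry_backoff=2)
def process(self, item_uuid):
    # Always fail so the message stays on the queue while testing.
    raise RuntimeError('keeping %s on the queue' % item_uuid)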
/code # ps aux
PID USER TIME COMMAND
1 root 0:00 {start-celery.sh} /bin/sh /code/docker/start-celery.sh
8 root 0:01 {celery} /usr/local/bin/python /usr/local/bin/celery -A my_module worker -l debug -c1
11 root 0:00 {celery} /usr/local/bin/python /usr/local/bin/celery -A my_module worker -l debug -c1
$ strace -fp 11 -s 10000
strace: Process 11 attached
read(8,
$ ls -l /proc/11/fd/8
lr-x------ 1 root root 64 Nov 30 04:10 /proc/11/fd/8 -> pipe:[23027111]
# (find /proc -type l | xargs ls -l | fgrep 'pipe:[23027111]') 2>/dev/null
lr-x------ 1 root root 64 Nov 30 04:10 /proc/11/fd/8 -> pipe:[23027111]
lr-x------ 1 root root 64 Nov 30 04:10 /proc/11/task/11/fd/8 -> pipe:[23027111]
lr-x------ 1 root root 64 Nov 30 04:10 /proc/8/fd/8 -> pipe:[23027111]
l-wx------ 1 root root 64 Nov 30 04:10 /proc/8/fd/9 -> pipe:[23027111]
lr-x------ 1 root root 64 Nov 30 04:10 /proc/8/task/8/fd/8 -> pipe:[23027111]
l-wx------ 1 root root 64 Nov 30 04:10 /proc/8/task/8/fd/9 -> pipe:[23027111]
$ strace -fp 8 -s 10000
# Normal messages
epoll_ctl(7, EPOLL_CTL_ADD, 10, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=10, u64=139878494896138}}) = -1 EEXIST (File exists)
mmap(NULL, 262144, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f3815615000
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
epoll_pwait(7, [], 1023, 666, NULL, 8) = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
sysinfo({uptime=58265, loads=[112224, 126656, 139264], totalram=24973213696, freeram=880857088, sharedram=1203023872, bufferram=2054041600, totalswap=8258580480, freeswap=7548694528, procs=2268, totalhigh=0, freehigh=0, mem_unit=1}) = 0
epoll_pwait(7, [], 1023, 67, NULL, 8) = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
epoll_pwait(7, [], 1023, 898, NULL, 8) = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
wait4(11, 0x7ffdfeee5e34, WNOHANG, NULL) = 0
epoll_pwait(7, [], 1023, 100, NULL, 8) = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
... same message
(as you can see here, the message repeats itself forever.)
@tuky I have been dealing with a similar setup: Celery with SQS as the broker. We noticed that when a worker terminates, the new worker comes online but CPU usage spikes to 100%.
After a lot of trial and error, downgrading Python worked and the CPU spikes disappeared (base Docker image changed from python:3.6 (latest?) -> python:3.6.8). I understand this is not a fix; just putting it out here in case it helps anyone or the investigation itself.
@tuky I have been dealing with a similar setup: Celery with SQS as the broker. We noticed that when a worker terminates, the new worker comes online but CPU usage spikes to 100%.
After a lot of trial and error, downgrading Python worked and the CPU spikes disappeared (base Docker image changed from python:3.6 (latest?) -> python:3.6.8). I understand this is not a fix; just putting it out here in case it helps anyone or the investigation itself.
Thank you @code-haven.
Has something changed between 3.6.8 & 3.6.10 that might affect this?
This is the path to investigate.
I had tried python:3.6.8-slim (Docker) and celery==4.2.2, but Celery still runs into the same issue when the worker reloads after reaching max_tasks_per_child.
The PyPI infrastructure began seeing this behavior when upgrading from python:3.7.3-slim-stretch to python:3.8.2-slim-buster in https://github.com/pypa/warehouse/pull/7828/files.
We're using SQS with --max-tasks-per-child.
With python:3.8.2-slim-buster, kombu==4.6.8 and pycurl==7.43.0.2 the issue is always reproducible like this:
After some stracing, I can see that in my case the file which breaks epoll is not a pipe between processes, but a socket to SQS. So maybe the cause is unrelated to the one the author originally had.
As I see it, epoll breaks right after the SQS socket is closed (and kombu does not delete the socket from epoll, which, as we all already know, is wrong). When we start our main loop, pycurl establishes a keepalive HTTPS connection to SQS. Then kombu sets up a periodic routine which sends a "give me some new messages" HTTP request over that connection. But several minutes after the start, curl suddenly decides to close it. So when kombu regains control inside the CurlClient._handle_socket() callback, it has an already-closed file descriptor, which cannot be removed from epoll.
Without --max-tasks-per-child it would still work - epoll holds a sort of weak reference to file objects, so when all strong references are destroyed, the file is also deleted from epoll. But in our case new workers are spawned by forking from the main process, which means the worker gets all open file descriptors from the main one.
I met this issue about a year ago and managed to work around it in an extremely dumb but extremely effective way: just go to kombu.asynchronous.hub.Hub.create_loop() and insert a sleep() after every N iterations 🙂
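For completeness, the same idea as a monkeypatch; I actually edited kombu's source directly, so the wrapper below, and the choice of N and the pause, are only a sketch:

import time
from kombu.asynchronous import hub as _hub

_orig_create_loop = _hub.Hub.create_loop

def _throttled_create_loop(self, *args, **kwargs):
    # Pass every tick of the original loop through, but nap occasionally so a
    # degenerate busy loop cannot pin the CPU at 100%.
    for i, tick in enumerate(_orig_create_loop(self, *args, **kwargs), 1):
        yield tick
        if i % 100 == 0:
            time.sleep(0.05)

_hub.Hub.create_loop = _throttled_create_loop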
I met this issue about a year ago and managed to work around it in an extremely dumb but extremely effective way: just go to kombu.asynchronous.hub.Hub.create_loop() and insert a sleep() after every N iterations :slightly_smiling_face:
Would you mind sending a PR on kombu?
The PyPI infrastructure began seeing this behavior when upgrading from python:3.7.3-slim-stretch to python:3.8.2-slim-buster in https://github.com/pypa/warehouse/pull/7828/files. We're using SQS with --max-tasks-per-child.
Would you mind checking the proposed fix in kombu? https://github.com/celery/kombu/pull/1189
This is exciting news. I will check next month whether it fixes everything for us, too.