Celery: OSError: Socket closed in celery worker with eventlet

Created on 14 Jun 2018 · 23 comments · Source: celery/celery

Celery version: 4.2.0

Steps to reproduce

docker-compose.yml

version: '2'

services:
  worker:
    build: .
    depends_on:
      - rabbitmq

  rabbitmq:
    image: rabbitmq:alpine

Dockerfile

FROM alpine

RUN apk add --no-cache build-base python3 python3-dev
RUN pip3 install celery eventlet

CMD celery -b amqp://rabbitmq worker -P eventlet --loglevel=DEBUG

$ docker-compose up --build

Expected behavior

Worker stays connected without errors

Actual behavior

3 minutes after startup, the worker reports several errors with a Traceback, similar to the following:

[2018-06-14 20:11:44,213: WARNING/MainProcess] Traceback (most recent call last):
[2018-06-14 20:11:44,213: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/eventlet/hubs/poll.py", line 114, in wait
    listener.cb(fileno)
[2018-06-14 20:11:44,214: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2018-06-14 20:11:44,214: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 301, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2018-06-14 20:11:44,215: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2018-06-14 20:11:44,216: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2018-06-14 20:11:44,217: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2018-06-14 20:11:44,217: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2018-06-14 20:11:44,218: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 426, in _read
    raise IOError('Socket closed')
[2018-06-14 20:11:44,218: WARNING/MainProcess] OSError: Socket closed
[2018-06-14 20:11:44,219: WARNING/MainProcess] Removing descriptor: 7

In addition, the RabbitMQ server reports warnings similar to the following (repeated twice):

2018-06-14 20:11:44.209 [warning] <0.586.0> closing AMQP connection <0.586.0> (172.19.0.3:42678 -> 172.19.0.2:5672):
missed heartbeats from client, timeout: 60s
Labels: Eventlet Workers Pool, Gevent Workers Pool, RabbitMQ Broker, Bug Report

Most helpful comment

@auvipy why did you close this? The problem still exists.

All 23 comments

Note that when using prefork instead of eventlet, the RabbitMQ server still reports the missed-heartbeats warning, but no warnings are printed in the worker:

2018-06-14 20:36:04.807 [warning] <0.698.0> closing AMQP connection <0.698.0> (172.19.0.3:46040 -> 172.19.0.2:5672):
missed heartbeats from client, timeout: 60s

And gevent fails with a similar error message to eventlet:

[2018-06-14 20:43:29,908: WARNING/MainProcess] Traceback (most recent call last):
[2018-06-14 20:43:29,908: WARNING/MainProcess] File "src/gevent/_waiter.py", line 119, in gevent.__waiter.Waiter.switch
[2018-06-14 20:43:29,908: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2018-06-14 20:43:29,909: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 301, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2018-06-14 20:43:29,909: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2018-06-14 20:43:29,909: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2018-06-14 20:43:29,910: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2018-06-14 20:43:29,910: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2018-06-14 20:43:29,911: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/amqp/transport.py", line 418, in _read
    s = recv(n - len(rbuf))
[2018-06-14 20:43:29,911: WARNING/MainProcess] File "/usr/lib/python3.6/site-packages/gevent/_socket3.py", line 380, in recv
    return _socket.socket.recv(self._sock, *args)
[2018-06-14 20:43:29,912: WARNING/MainProcess] ConnectionResetError: [Errno 104] Connection reset by peer
[2018-06-14 20:43:29,912: WARNING/MainProcess] 2018-06-14T20:43:29Z
[2018-06-14 20:43:29,913: WARNING/MainProcess] <built-in method switch of gevent.__greenlet_primitives.TrackedRawGreenlet object at 0x7f791830ce88> failed with ConnectionResetError

Also, I can confirm that I see the same behavior on the latest development versions of all libraries, using this Dockerfile:

FROM alpine

RUN apk add --no-cache build-base python3 python3-dev
RUN pip3 install https://github.com/celery/celery/zipball/master#egg=celery https://github.com/celery/billiard/zipball/master#egg=billiard https://github.com/celery/py-amqp/zipball/master#egg=amqp https://github.com/celery/kombu/zipball/master#egg=kombu https://github.com/celery/vine/zipball/master#egg=vine eventlet gevent

CMD celery -b amqp://rabbitmq worker -P eventlet --loglevel=DEBUG

This matches what we are experiencing. On a single queue, we have multiple workers with task_acks_late = True and worker_prefetch_multiplier = 1, and a mixture of long-running and short tasks.
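
For reference, a minimal sketch of that configuration (the app name and broker URL are assumptions, not taken from our project):

from celery import Celery

app = Celery('myapp', broker='amqp://rabbitmq//')
app.conf.task_acks_late = True            # acknowledge only after the task has finished
app.conf.worker_prefetch_multiplier = 1   # each worker process prefetches a single message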

Switching back to Celery 3.x makes this issue "disappear"

FYI

At first, retries weren't working when using Celery 4.1.0 and kombu 4.1.0. After updating them to 4.2.0 and 4.2.1 respectively, retries started to work, but the same timeout messages started to appear: tasks seemed to be delivered correctly but were never processed by the workers. We are using prefork.

-amqp==2.2.2
+amqp==1.4.9
-billiard==3.5.0.3
+billiard==3.3.0.23
-celery==4.2.0
+celery==3.1.23
-kombu==4.2.1
+kombu==3.0.34
-pyramid-celery==3.0.0
+pyramid-celery==2.0.0

The broker connection uses the heartbeat setting from the app config file by default (since version 4.2.0, #4148).

Configuration and defaults - Documentation

broker_heartbeat

transports supported:

  • pyamqp

Default: 120.0 (negotiated by server).

You can try to set broker_heartbeat=0. Hope this helps.
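
For example, a minimal sketch of that workaround (the config module name is an assumption):

# celeryconfig.py (assumed module name)
broker_heartbeat = 0   # disable AMQP heartbeats so RabbitMQ never times out the connection

# or equivalently on the app object:
# app.conf.broker_heartbeat = 0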

@y0ngdi I set broker_heartbeat=0; the errors occur less often, but they still happen.

amqp==2.3.2
celery==4.2.0
gevent==1.3.4
greenlet==0.4.13
kombu==4.2.0
The worker reports several errors with a traceback:
[2018-06-22 09:41:21,467: INFO/MainProcess] sync with celery@device_producer_tasks
[2018-06-22 09:41:21,468: ERROR/MainProcess] Control command error: error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/celery/worker/pidbox.py", line 46, in on_message
    self.node.handle_message(body, message)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 129, in handle_message
    return self.dispatch(**body)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 112, in dispatch
    ticket=ticket)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 135, in reply
    serializer=self.mailbox.serializer)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/pidbox.py", line 265, in _publish_reply
    **opts
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/channel.py", line 1732, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/home/ubuntu/.local/share/virtualenvs/backend-uRCQ3Clv/local/lib/python2.7/site-packages/amqp/transport.py", line 275, in write
    self._write(s)
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe

I encountered this issue with Celery 4.2 and eventlet, but haven't found a proper solution.

I did a little investigation. Celery does create a new connection but does not call heartbeat_check for it. This results in the RabbitMQ server closing the connection. I have experienced this error with the control-command connection (e.g. stats, ping, ..)

Minimal example:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'myapp',
    broker='amqp://guest@localhost//',
)
app.conf.broker_heartbeat = 5


if __name__ == '__main__':
    app.start()

Run the example

$ python3 example.py worker -l DEBUG

RabbitMQ logs

2018-06-27 19:49:41.109 [info] <0.471.1> accepting AMQP connection <0.471.1> (127.0.0.1:42744 -> 127.0.0.1:5672)
2018-06-27 19:49:41.111 [info] <0.471.1> connection <0.471.1> (127.0.0.1:42744 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'
2018-06-27 19:49:41.116 [info] <0.479.1> accepting AMQP connection <0.479.1> (127.0.0.1:42746 -> 127.0.0.1:5672)
2018-06-27 19:49:41.118 [info] <0.479.1> connection <0.479.1> (127.0.0.1:42746 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'
2018-06-27 19:49:41.131 [info] <0.500.1> accepting AMQP connection <0.500.1> (127.0.0.1:42748 -> 127.0.0.1:5672)
2018-06-27 19:49:41.133 [info] <0.500.1> connection <0.500.1> (127.0.0.1:42748 -> 127.0.0.1:5672): user 'guest' authenticated and granted access to vhost '/'
2018-06-27 19:49:56.135 [warning] <0.500.1> closing AMQP connection <0.500.1> (127.0.0.1:42748 -> 127.0.0.1:5672):
missed heartbeats from client, timeout: 5s

Attempt to send a command after the connection has been closed by RabbitMQ:

$ celery inspect ping

Celery worker log prints this when it receives the command:

[2018-06-27 19:51:25,638: DEBUG/MainProcess] pidbox received method ping() [reply_to:{'exchange': 'reply.celery.pidbox', 'routing_key': '5a7fe1f1-be67-397f-879c-d939ea3c076e'} ticket:d80183f3-a236-4057-841b-6b8cd2926917]
[2018-06-27 19:51:25,639: ERROR/MainProcess] Control command error: ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 46, in on_message
    self.node.handle_message(body, message)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 129, in handle_message
    return self.dispatch(**body)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 112, in dispatch
    ticket=ticket)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 135, in reply
    serializer=self.mailbox.serializer)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/pidbox.py", line 265, in _publish_reply
    **opts
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/kombu/entity.py", line 185, in declare
    nowait=nowait, passive=passive,
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/channel.py", line 614, in exchange_declare
    wait=None if nowait else spec.Exchange.DeclareOk,
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/home/bar/Desktop/foo/virtualenv/lib/python3.6/site-packages/amqp/transport.py", line 275, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-06-27 19:51:25,647: DEBUG/MainProcess] Closed channel #2
[2018-06-27 19:51:25,647: DEBUG/MainProcess] using channel_id: 2
[2018-06-27 19:51:25,648: DEBUG/MainProcess] Channel open

Possible cause of error

I think there is a connection that gets terminated from the server side because kombu.connection.Connection.heartbeat_check() is not called periodically on that instance.
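
As a rough mitigation sketch (not a fix in Celery itself), one could keep such a connection alive by calling heartbeat_check() from a periodic timer; the connection URL and interval below are assumptions:

import threading
from kombu import Connection

conn = Connection('amqp://guest@localhost//', heartbeat=5)
conn.connect()

def keep_alive():
    # heartbeat_check() sends/validates AMQP heartbeats; calling it at roughly
    # twice the heartbeat rate keeps RabbitMQ from closing the connection
    conn.heartbeat_check()
    threading.Timer(2.5, keep_alive).start()

keep_alive()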

I'm seeing the same issue with gevent.

Workaround with broker_heartbeat = 0 works for me.

Installed versions:

amqp==2.2.2
celery==4.2.0
eventlet==0.23.0
kombu==4.2.1

Worker:
celery -A celery_app worker --loglevel=info -P eventlet

P.S. Tested on Windows 10, with Python 2 & 3.

@auvipy: Why closing? The issue is not fixed...

did @stojan-jovic's workaround work?

It seems so, but a workaround is not a bug fix.
And it still leads to a ton of time wasted by more people every day looking for a solution.

If you come up with a possible fix then it will be reopened.

Workaround with broker_heartbeat = 0 seems to work for me for Celery 4.2.0 and gevent 1.2.2 (Python 2) / RabbitMQ 3.7.8. At least it didn't hang up on the workers at the default 60s timeout for me.

Django 1.8.4, Python 2.7.12
Issue appeared after upgrade from 3.1.25 to 4.2.1
CELERY_BROKER_HEARTBEAT = 0 in settings.py solved this.

@auvipy why did you close this? The problem still exists.

@djlambert He is known to close issues/prs for no reason.

Any updates on this? A workaround is not a bug fix.

Feel free to investigate the issue @gvdmarck; since you are facing this, it would be easier for you to find the root cause.

Django 1.8.4, Python 2.7.12
Issue appeared after upgrade from 3.1.25 to 4.2.1
CELERY_BROKER_HEARTBEAT = 0 in settings.py solved this.

Should it be BROKER_HEARTBEAT instead of CELERY_BROKER_HEARTBEAT?
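
Which name is read depends on how the Django settings are wired into Celery; a sketch, assuming the common config_from_object setup with a CELERY namespace:

# settings.py
CELERY_BROKER_HEARTBEAT = 0   # read when the app uses namespace='CELERY'
# BROKER_HEARTBEAT = 0        # old-style name, read when no namespace is set

# celery.py (assumed project module)
# app.config_from_object('django.conf:settings', namespace='CELERY')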

amqp==2.2.2

I am trying to run a Celery worker with this configuration, but the worker is not starting up.
The command I am using is as follows:
pipenv run celery worker -A src.celery_app -l debug -P eventlet
But I don't see any Celery startup log or anything else; it returns directly to the Windows cmd prompt.

@stojan-jovic, can you let me know how to set up the environment for Celery?
