Distributed: `dask.distributed.Client` fails to spin down gracefully in its context manager

Created on 14 Feb 2018 · 7 Comments · Source: dask/distributed

Versions

dask==0.16.1
distributed==1.20.2
tornado==4.5.3

Issue

I'm using dask.distributed.Client in a pytest.fixture, as follows:

import pytest
from dask.distributed import Client


@pytest.fixture(scope='module')
def client():
    # One local cluster with four workers, shared by every test in the module.
    with Client(n_workers=4) as dask_client:
        yield dask_client


def test_some_function(client):
    ...


def test_some_other_function(client):
    ...

Sometimes, when the test suite finishes, I get the error shown at the bottom of this ticket. This leads me to believe the test suite finishes before the Client teardown is actually complete, which seems like it could only happen if the Client's __exit__ implementation returns too early. I also get the same error message if I simply execute the following, which seems to indicate the same.

while True:
    with Client(n_workers=4) as dask_client:
        pass  # do something with dask_client

tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x1a150a7510>, <tornado.concurrent.Future object at 0x1a14fac748>)
Traceback (most recent call last):
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/comm/tcp.py", line 174, in read
    n_frames = yield stream.read_bytes(8)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/iostream.py", line 324, in read_bytes
    self._try_inline_read()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/iostream.py", line 709, in _try_inline_read
    self._check_closed()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/iostream.py", line 925, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/core.py", line 464, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/core.py", line 350, in send_recv
    response = yield comm.read()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/comm/tcp.py", line 188, in read
    convert_stream_closed_error(self, e)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/client.py", line 804, in _update_scheduler_info
    self._scheduler_identity = yield self.scheduler.identity()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/vru959/anaconda2/envs/py3k/lib/python3.6/site-packages/distributed/core.py", line 467, in send_recv_from_rpc
    % (e, key,))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe: while trying to call remote method 'identity'
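
The last traceback shows a callback running `_update_scheduler_info` and failing because the comm it uses is already closed. The sketch below is a toy model of that kind of race, written with only `asyncio`. It is not how dask or tornado implement anything: `FakeConnection`, `poll`, and both `close_*` functions are hypothetical names invented for illustration. It only shows why closing a connection while a periodic task is still scheduled can produce an error after teardown, and why stopping the task first avoids it.

```python
import asyncio


class FakeConnection:
    """Stand-in for a network connection; not a dask or tornado class."""

    def __init__(self):
        self.closed = False

    async def identity(self):
        # Mimic a remote call that fails once the connection is gone.
        if self.closed:
            raise BrokenPipeError("connection is closed")
        return {"type": "Scheduler"}


async def poll(conn, interval, errors):
    """Call the remote end periodically, like a background status update."""
    while True:
        try:
            await conn.identity()
        except BrokenPipeError as exc:
            errors.append(exc)
            return
        await asyncio.sleep(interval)


async def close_without_stopping_poller():
    conn, errors = FakeConnection(), []
    task = asyncio.ensure_future(poll(conn, 0.01, errors))
    await asyncio.sleep(0.03)
    conn.closed = True         # teardown finishes while the poller is still scheduled
    await asyncio.sleep(0.03)  # the poller wakes up once more and fails
    await task
    return errors


async def close_after_stopping_poller():
    conn, errors = FakeConnection(), []
    task = asyncio.ensure_future(poll(conn, 0.01, errors))
    await asyncio.sleep(0.03)
    task.cancel()              # stop the background work before closing
    try:
        await task
    except asyncio.CancelledError:
        pass
    conn.closed = True
    await asyncio.sleep(0.03)
    return errors


racy_errors = asyncio.run(close_without_stopping_poller())
clean_errors = asyncio.run(close_after_stopping_poller())
print(len(racy_errors), len(clean_errors))
```

In this toy model the first variant records one `BrokenPipeError` and the second records none. Whether the real client behaves the same way depends on the installed version.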

All 7 comments

Can I ask you to try this again on the most recent release of dask? Either

conda install dask

or

pip install dask[distributed] --upgrade
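
After upgrading, it can help to confirm which versions are actually installed in the environment that runs the tests. The following is a minimal sketch using the standard library's `importlib.metadata` (Python 3.8+). The helper name `installed_versions` is made up for this example.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


report = installed_versions(["dask", "distributed", "tornado"])
for name, ver in report.items():
    print(f"{name}=={ver}" if ver else f"{name} is not installed")
```

The printed lines use the same `name==version` form as the Versions section of the report, so they can be pasted directly into an issue.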

That seemed to resolve it. Thanks! So I guess between 0.16.0 and 0.17.0, there was some bug fix that addressed this?

Yes

Glad this worked. I'm going to close for now. Please reach out again if the problem recurs.

I'm experiencing this behavior when running Dask 2.6 in Docker, but only intermittently. It appears that Python or Docker is not waiting for the Dask cluster to wrap up its child processes. Or maybe this is a symptom of the Docker zombie-reaping problem. Does anyone have ideas or solutions?

EDIT: Details are important.

dask==2.6.0
distributed==1.26.0
tornado==6.0.3

from dask.distributed import Client, LocalCluster

dask_cluster = LocalCluster(processes=True, threads_per_worker=1)
with Client(dask_cluster) as dask_client:
    exit_code = run_distributed_processing(args.manifest_file, dask_client, db_state).value
    dask_client.wait()

dask_cluster.close()

Same exact errors as @macks22 experienced.

EDIT 2: Terrible workaround. Putting a sleep(5) at the end of the script drastically reduced (but did not eliminate) the number of errors, so it obviously doesn't actually fix anything.
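
A fixed `sleep(5)` guesses how long shutdown takes. One alternative is to poll until the child processes have actually exited, with a timeout as a safety net. The sketch below uses only the standard library. It assumes the worker processes are visible to `multiprocessing.active_children()` in the calling process, which depends on how they were started and is not confirmed for this setup. The helper name `wait_for_children` is hypothetical, and whether this removes the errors reported here is untested.

```python
import multiprocessing
import time


def wait_for_children(timeout=10.0, poll_interval=0.1):
    """Block until this process has no live multiprocessing children.

    Returns True if every child exited before the timeout, else False.
    """
    deadline = time.monotonic() + timeout
    # active_children() also joins children that have already finished.
    while multiprocessing.active_children():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


all_exited = wait_for_children(timeout=1.0)
print("all children exited:", all_exited)
```

Called after `dask_cluster.close()`, this would return as soon as the children are gone instead of always waiting the full five seconds, and a `False` return value would flag a shutdown that is taking longer than expected.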

@macks22 this issue is closed. If you think that you have found a bug then please raise a new issue. If you're able to provide a minimal reproducible example, that would be very helpful.

Closed issues tend not to be tracked as effectively, especially when the volume of issues is high.
