Distributed: "Stream is closed"

Created on 31 Dec 2017 · 37 Comments · Source: dask/distributed

I am having workers die with the following error messages.

This occurs when I try to .persist() a large xarray dataset into memory. (There is more than enough memory in the cluster for the dataset by a factor of 5.)

I don't know what these errors mean, other than that the workers have died. Advice would be appreciated on how to debug more effectively.

distributed.worker - ERROR - failed during get data
Traceback (most recent call last):
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 221, in write
    yield future
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/worker.py", line 524, in get_data
    compressed = yield comm.write(msg)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 225, in write
    convert_stream_closed_error(self, e)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.core - WARNING - Lost connection to 'tcp://10.43.8.25:48251': in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.worker - ERROR - Worker stream died during communication: tcp://10.43.4.25:53234
Traceback (most recent call last):
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 182, in read
    frame = yield stream.read_bytes(length)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/worker.py", line 1763, in gather_dep
    who=self.address)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 516, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 350, in send_recv
    response = yield comm.read()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 188, in read
    convert_stream_closed_error(self, e)
  File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
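
The two `CommClosedError` messages in the log above have different shapes: one names an underlying `ConnectionResetError`, the other says only "Stream is closed". The sketch below reproduces the two format strings that are visible in the tracebacks (`tcp.py`, lines 124 and 126). The stand-in classes and the `real_error` attribute are assumptions modelled on Tornado's `StreamClosedError`, not code copied from the versions in the traceback.

```python
class StreamClosedError(IOError):
    """Stand-in for tornado.iostream.StreamClosedError (assumed shape)."""

    def __init__(self, real_error=None):
        super().__init__("Stream is closed")
        self.real_error = real_error


class CommClosedError(IOError):
    """Stand-in for distributed.comm.core.CommClosedError."""


def convert_stream_closed_error(obj, exc):
    """Re-raise a stream error as CommClosedError, keeping the OS error if any."""
    if exc.real_error is not None:
        # An OS-level error (e.g. ECONNRESET) closed the stream: report it.
        exc = exc.real_error
        raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
    # No underlying error was recorded: the stream was simply found closed.
    raise CommClosedError("in %s: %s" % (obj, exc))


def message_for(exc):
    """Return the CommClosedError message produced for a given stream error."""
    try:
        convert_stream_closed_error("<closed TCP>", exc)
    except CommClosedError as err:
        return str(err)


reset = ConnectionResetError(104, "Connection reset by peer")
print(message_for(StreamClosedError(real_error=reset)))
print(message_for(StreamClosedError()))
```

Read this way, the first worker saw its peer reset the connection while it was writing, and the second worker found the stream already closed while it was waiting to read. Neither message says why the peer went away.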

Most helpful comment

Just commenting to say that I had this same issue for a long time. I was trying to do something similar to Ryan (and to Julius over at https://github.com/pangeo-data/pangeo/issues/757): using xarray.open_mfdataset to open and load data from hundreds of netCDF files. After updating to dask v2.30.0 and dask-labextension v3.0.0, it seems to be working now! I'm very pleased.

All 37 comments

Unfortunately these don't give us very much information. While sending data between workers the connection was cut. I don't suppose the worker logs are any more informative?

That is the worker log.

I think it is something related to spill-to-disk. If I make the dataset
smaller by a factor of two, everything works fine.

I will try to find a more reproducible example. Sorry for the vagueness.

On Sun, Dec 31, 2017 at 11:55 AM, Matthew Rocklin notifications@github.com
wrote:

Unfortunately these don't give us very much information. While sending
data between workers the connection was cut. I don't suppose the worker
logs are any more informative?


There is more than enough memory in the cluster for the dataset by a factor of 5.

I think it is something related to spill-to-disk. If I make the dataset smaller by a factor of two, everything works fine.

These two statements together confuse me. Does it spill to disk? Does the worker itself fail?

That is the worker log.

The log from the other worker might be interesting to see
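
Some rough arithmetic shows why the two statements are hard to reconcile. If the cluster has 5x the memory of the dataset and the data is spread evenly, each worker is only about 20% full, which is far from the point where a worker would start spilling. The sketch below assumes the threshold fractions documented for later distributed releases (0.60 target, 0.70 spill, 0.80 pause, 0.95 terminate); whether the 2017 version in this thread used the same values is an assumption, and the sizes and the `imbalance` factor are hypothetical.

```python
# Assumed worker memory thresholds, as fractions of the per-worker memory limit.
THRESHOLDS = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}


def worker_fill_fraction(dataset_bytes, cluster_bytes, imbalance=1.0):
    """Fraction of one worker's memory used by its share of the dataset.

    imbalance=1.0 means a perfectly even spread; 3.0 means this worker
    holds three times its fair share.
    """
    return imbalance * dataset_bytes / cluster_bytes


def crossed(fraction):
    """Names of the thresholds that a given fill fraction has reached."""
    return [name for name, limit in THRESHOLDS.items() if fraction >= limit]


GB = 10**9
dataset, cluster = 100 * GB, 500 * GB  # hypothetical sizes with 5x headroom

even = worker_fill_fraction(dataset, cluster)
skewed = worker_fill_fraction(dataset, cluster, imbalance=3.75)
print(f"even spread: {even:.0%} full, crossed {crossed(even)}")
print(f"3.75x skew:  {skewed:.0%} full, crossed {crossed(skewed)}")
```

Under these assumptions, spilling would only be expected if some workers hold several times their fair share, or if the in-memory size is much larger than the size used for the estimate (for example, when the stored data is compressed).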

The worker does fail. I will have to check about spilling to disk.

I am going to try a test case where I just persist a big random array and
see if I can trigger the failure.

On Sun, Dec 31, 2017 at 12:57 PM, Matthew Rocklin notifications@github.com
wrote:

There is more than enough memory in the cluster for the dataset by a
factor of 5.

I think it is something related to spill-to-disk. If I make the dataset
smaller by a factor of two, everything works fine.

These two statements together confuse me. Does it spill to disk? Does the
worker itself fail?


I am also having the same problem. The CSV is just 582 MB, and I am using 9 c3.xlarge instances as workers with 4 processes and 1 c3.xlarge as the scheduler.
But I am doing a lot of dataframe operations and running some functions on them in parallel.

code.txt

That is the actual code. The command-line arguments are the scheduler IP with port and the number of chunks to run at a time (e.g. code.py x.x.x.x:yyyy 1000). Substitute the S3 path with the CSV file below:
https://drive.google.com/open?id=1BMHxeRBpq_dWN_fhffP5O2JQ3DdSIYVs

log file
dask_log.txt

I could see this error in my terminal:
`tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fb3565c4730>, {'keys': ["('loc-d5223415298072e7ae9e98e8b84112ea', 0)"], 'op': 'client-releases-keys', 'client': 'Client-5ed5b65c-f475-11e7-9312-704d7b70e106'})
Traceback (most recent call last):
  File "/home/lib/python3.5/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/home/lib/python3.5/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/homelib/python3.5/site-packages/distributed/client.py", line 677, in _send_to_scheduler_safe
    self.scheduler_comm.send(msg)
AttributeError: 'NoneType' object has no attribute 'send'`


Created a new issue, #1688, so it can be discussed separately.

Were you able to reproduce this on random data, @rabernat?

Sorry, I have no updates on this issue right now.

FWIW, I ran into this issue and https://github.com/dask/distributed/issues/1736 recently (surprised I hadn't run into these sooner). I was able to sort out both by having dask-worker write to node-local scratch space, so that might be worth trying if you haven't already.
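
One way to try this is to pass `--local-directory` to `dask-worker` so that spilled data lands on node-local storage rather than a shared filesystem. The sketch below only builds the command line. The scheduler address is a placeholder, and the environment variable names (`TMPDIR`, `SLURM_JOB_ID`) are assumptions that vary between sites.

```python
import os
import shlex


def worker_command(scheduler_address, scratch_root, job_id):
    """Build a dask-worker command that spills to a per-job local directory."""
    local_dir = os.path.join(scratch_root, f"dask-worker-{job_id}")
    return ["dask-worker", scheduler_address, "--local-directory", local_dir]


# Hypothetical values: many batch systems export a node-local scratch path
# and a job id, but the variable names differ between sites.
scratch = os.environ.get("TMPDIR", "/tmp")
job_id = os.environ.get("SLURM_JOB_ID", "interactive")

cmd = worker_command("tcp://10.0.0.1:8786", scratch, job_id)
print(shlex.join(cmd))
```

Using a per-job subdirectory keeps workers from different jobs on the same node from sharing a spill directory.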

@rabernat I am seeing this error associated with processes that: (1) do not release the GIL and (2) are spilling to disk. I believe spilling also holds the worker GIL, so it may be that it is a general issue with a connection timing out due to (apparent) lack of connectivity.
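
The mechanism suggested here can be illustrated with a small asyncio sketch: while something blocks the thread that runs the event loop, no heartbeat or network traffic is processed, so a peer may conclude that the connection is dead. This is a simplification under stated assumptions. Dask at the time used Tornado rather than plain asyncio, tasks normally run in separate threads, and `time.sleep` here merely stands in for a GIL-holding call or a blocking spill to disk.

```python
import asyncio
import time


async def heartbeat(interval, gaps, beats):
    """Record the real time elapsed between consecutive heartbeats."""
    last = time.monotonic()
    for _ in range(beats):
        await asyncio.sleep(interval)
        now = time.monotonic()
        gaps.append(now - last)
        last = now


async def blocking_task(seconds):
    """Stand-in for work that never yields to the event loop."""
    await asyncio.sleep(0.05)  # let a few heartbeats through first
    time.sleep(seconds)  # blocks the loop thread; no other coroutine runs


async def main():
    gaps = []
    await asyncio.gather(heartbeat(0.01, gaps, beats=20), blocking_task(0.3))
    return gaps


gaps = asyncio.run(main())
print(f"shortest gap: {min(gaps):.3f}s, longest gap: {max(gaps):.3f}s")
```

The heartbeat is scheduled every 10 ms, but one gap stretches to at least the length of the blocking call. If a peer's timeout is shorter than that gap, it would see an unresponsive worker even though the process is alive.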

I have also had this issue using the streamz.dask module. It took a couple of runs, but here is the code that eventually produced the error.

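import operator as op

import numpy as np
from dask.distributed import Client
from streamz import Stream

# Connect to an already-running scheduler.
client = Client('tcp://127.0.0.1:40990')

# Two source streams whose elements are scattered to the cluster.
s1 = Stream()
s2 = Stream()
d1 = s1.scatter()
d2 = s2.scatter()
# Pair the latest elements of both streams and subtract them on the workers.
res = d1.combine_latest(d2).starmap(op.sub)

# Buffer up to 10 results, gather them back locally, and collect them in a list.
lres = res.buffer(10).gather()
L = lres.sink_to_list()

ll = []
s2.emit(np.ones((10, 10)))
for i in range(10):
    s1.emit(np.ones((10, 10)))
    ll.append(i)
# sleep(5)
print('hi')
print(len(L))
print(len(ll))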


The resulting error message

/home/christopher/mc/envs/dp_dev/bin/python /home/christopher/.PyCharm2018.1/config/scratches/scratch_198.py
hi
9
10
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f8c6b0090d0>, <tornado.concurrent.Future object at 0x7f8c6a7c8470>)
Traceback (most recent call last):
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/comm/tcp.py", line 177, in read
    n_frames = yield stream.read_bytes(8)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/core.py", line 552, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/core.py", line 432, in send_recv
    response = yield comm.read(deserializers=deserializers)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/comm/tcp.py", line 198, in read
    convert_stream_closed_error(self, e)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/christopher/dev/streamz/streamz/core.py", line 886, in cb
    yield self._emit(x)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 828, in callback
    result_list.append(f.result())
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/christopher/dev/streamz/streamz/dask.py", line 123, in update
    result = yield client.gather(x, asynchronous=True)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/client.py", line 1468, in _gather
    response = yield self.scheduler.gather(keys=keys)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/core.py", line 555, in send_recv_from_rpc
    % (e, key,))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed: while trying to call remote method 'gather'

Process finished with exit code 0

conda env

# Name                    Version                   Build  Channel
dask                      0.18.1                     py_0    conda-forge
dask-core                 0.18.1                     py_0    conda-forge

I don't think that this should be spilling to disk (I have enough memory to hold 11 10x10 arrays). Nor is there anything special happening with the GIL (just regular numpy subtraction).

Any updates on this?

There have been no comments since the last one on July 9th.

The next step for this issue is for someone to generate a minimal failing example that demonstrates the issues with as few extra pieces as possible.

I have two laptops, both running Linux (Ubuntu, 4 GB RAM). One runs the worker and the other runs the client and scheduler. When I tried to read an 8 GB CSV file, I got the following error in the Jupyter notebook. Everything worked when I read a 25 MB file.

Code:

futures = train.map_partitions(lambda part: client.scatter([part])[0]).compute(scheduler='threads')  # single threaded local scheduler

train = dd.from_delayed(list(futures), meta=train.apply(lambda x: x, axis=1))
**Error Message in Jupyter Notebook:**
`StreamClosedError                         Traceback (most recent call last)
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    193         try:
--> 194             n_frames = yield stream.read_bytes(8)
    195             n_frames = struct.unpack("Q", n_frames)[0]

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:

StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

CommClosedError                           Traceback (most recent call last)
<ipython-input-7-f19dd6fd3721> in <module>()
----> 1 futures = train.map_partitions(lambda part: client.scatter([part])[0]).compute(scheduler='threads')  # single threaded local scheduler
      2 
      3 train = dd.from_delayed(list(futures), meta=train.apply(lambda x: x, axis=1))

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    460                         _execute_task(task, data)  # Re-execute locally
    461                     else:
--> 462                         raise_exception(exc, tb)
    463                 res, worker_id = loads(res_info)
    464                 state['cache'][key] = res

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
    110         if exc.__traceback__ is not tb:
    111             raise exc.with_traceback(tb)
--> 112         raise exc
    113 
    114     import pickle as cPickle

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/optimization.py in __call__(self, *args)
    940                              % (len(self.inkeys), len(args)))
    941         return core.get(self.dsk, self.outkey,
--> 942                         dict(zip(self.inkeys, args)))
    943 
    944     def __reduce__(self):

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/compatibility.py in apply(func, args, kwargs)
     91     def apply(func, args, kwargs=None):
     92         if kwargs:
---> 93             return func(*args, **kwargs)
     94         else:
     95             return func(*args)

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(*args, **kwargs)
   3877     func = kwargs.pop('_func')
   3878     meta = kwargs.pop('_meta')
-> 3879     df = func(*args, **kwargs)
   3880     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   3881         if not len(df):

<ipython-input-7-f19dd6fd3721> in <lambda>(part)
----> 1 futures = train.map_partitions(lambda part: client.scatter([part])[0]).compute(scheduler='threads')  # single threaded local scheduler
      2 
      3 train = dd.from_delayed(list(futures), meta=train.apply(lambda x: x, axis=1))

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, maxsize, timeout, asynchronous)
   2069                 timeout=timeout,
   2070                 asynchronous=asynchronous,
-> 2071                 hash=hash,
   2072             )
   2073 

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    751             return future
    752         else:
--> 753             return sync(self.loop, func, *args, **kwargs)
    754 
    755     def __repr__(self):

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    329             e.wait(10)
    330     if error[0]:
--> 331         six.reraise(*error[0])
    332     else:
    333         return result[0]

~/anaconda3/envs/py36/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/utils.py in f()
    314             if timeout is not None:
    315                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316             result[0] = yield future
    317         except Exception as exc:
    318             error[0] = sys.exc_info()

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
   1914                     client=self.id,
   1915                     broadcast=broadcast,
-> 1916                     timeout=timeout,
   1917                 )
   1918 

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    737             name, comm.name = comm.name, "ConnectionPool." + key
    738             try:
--> 739                 result = yield send_recv(comm=comm, op=key, **kwargs)
    740             finally:
    741                 self.pool.reuse(self.addr, comm)

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    531         yield comm.write(msg, serializers=serializers, on_error="raise")
    532         if reply:
--> 533             response = yield comm.read(deserializers=deserializers)
    534         else:
    535             response = None

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    212             self.stream = None
    213             if not shutting_down():
--> 214                 convert_stream_closed_error(self, e)
    215         else:
    216             try:

~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
    139         raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
    140     else:
--> 141         raise CommClosedError("in %s: %s" % (obj, exc))
    142 
    143 

CommClosedError: in <closed TCP>: Stream is closed`

**Error Message in Scheduler terminal:**
distributed.core - INFO - Starting established connection
distributed.core - INFO - Lost connection to 'tcp://192.168.1.104:43558': in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.core - INFO - Lost connection to 'tcp://192.168.1.104:43560': in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.core - INFO - Lost connection to 'tcp://192.168.1.104:43570': in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.utils_perf - INFO - full garbage collection released 291.62 MB from 3109 reference cycles (threshold: 10.00 MB)
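
The notebook traceback above stops at `stream.read_bytes(8)`, where the reader waits for an 8-byte frame count that it then decodes with `struct.unpack("Q", ...)`. A simplified sketch of that kind of length-prefixed framing shows why a connection dropped mid-message surfaces as "Stream is closed" on the reading side. Only the 8-byte count comes from the traceback; the list of per-frame lengths, the helper names, and the `StreamClosed` class are assumptions for illustration, and an in-memory buffer takes the place of a socket.

```python
import io
import struct


class StreamClosed(Exception):
    """Raised when the stream ends before a full message has arrived."""


def read_exactly(stream, n):
    """Read exactly n bytes or fail, like a blocking read_bytes(n)."""
    data = stream.read(n)
    if len(data) < n:
        raise StreamClosed("Stream is closed")
    return data


def write_frames(frames):
    """Encode frames as: frame count, then each length, then the payloads."""
    header = struct.pack("Q", len(frames))
    lengths = struct.pack(f"{len(frames)}Q", *(len(f) for f in frames))
    return header + lengths + b"".join(frames)


def read_frames(stream):
    """Decode one message written by write_frames."""
    (n_frames,) = struct.unpack("Q", read_exactly(stream, 8))
    lengths = struct.unpack(f"{n_frames}Q", read_exactly(stream, 8 * n_frames))
    return [read_exactly(stream, length) for length in lengths]


payload = write_frames([b"header", b"x" * 1000])
print(read_frames(io.BytesIO(payload))[0])

try:
    read_frames(io.BytesIO(payload[:100]))  # peer vanished mid-message
except StreamClosed as err:
    print("reader saw:", err)
```

The reader cannot tell from the error alone whether the peer crashed, was killed for memory, or timed out. That matches the scheduler log above, where the only extra detail is the `TimeoutError` reported on the other end.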

I'm experiencing this as well when working with a large number of workers (>200) across many nodes. My worker logs are indeed full of these:

 Event loop was unresponsive in Worker for 61.83s.  This is often caused by long-running 
GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

How can I fix this? Some of my functions have a lot of Python code (using numpy) so I don't know exactly why the GIL is being held. Who is moving large chunks of data? My partitions are around 100 MB in size and are being read over a CIFS file share (using the LocalFileSystem with paths such as \\servername\path-to.parquet)

I don't know exactly why the GIL is being held. Who is moving large chunks of data?

Sorry, the warning doesn't say this. It says that these are two common causes, not that they have occurred.

Something is blocking the event loop for about a minute. Unfortunately Dask can't tell you why this was. It's often something to do with the code that you're asking Dask to run. I don't know how to help you without knowing a lot more about your application, and diving in that deeply probably doesn't make sense for a public github issue.
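
One generic way to find out what is blocking a loop is a watchdog thread that dumps the stack of the loop thread whenever it stops checking in. The sketch below is not a Dask feature. The `LoopWatchdog` class, `ticker`, and `slow_function` are hypothetical names, it uses plain asyncio, and it relies on the CPython-specific `sys._current_frames()`.

```python
import asyncio
import sys
import threading
import time
import traceback


class LoopWatchdog:
    """Capture the stack of a watched thread when it stops checking in."""

    def __init__(self, thread_id, threshold):
        self.thread_id = thread_id
        self.threshold = threshold
        self.last_tick = time.monotonic()
        self.reports = []
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._watch, daemon=True)

    def tick(self):
        """Called from the watched thread whenever it is responsive."""
        self.last_tick = time.monotonic()

    def _watch(self):
        while not self._stop.wait(self.threshold / 4):
            if time.monotonic() - self.last_tick > self.threshold:
                frame = sys._current_frames().get(self.thread_id)
                if frame is not None:
                    self.reports.append("".join(traceback.format_stack(frame)))
                self.tick()  # restart the clock before checking again

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        self._stop.set()
        self._thread.join()


async def ticker(watchdog, interval):
    """Runs on the event loop and proves that the loop is still turning."""
    while True:
        watchdog.tick()
        await asyncio.sleep(interval)


def slow_function():
    time.sleep(0.5)  # stand-in for a long blocking call


async def main():
    with LoopWatchdog(threading.get_ident(), threshold=0.2) as watchdog:
        tick_task = asyncio.create_task(ticker(watchdog, 0.02))
        await asyncio.sleep(0.05)
        slow_function()  # blocks the loop thread for longer than the threshold
        await asyncio.sleep(0.05)
        tick_task.cancel()
    return watchdog.reports


reports = asyncio.run(main())
print(f"{len(reports)} stall report(s); first one ends with:")
print(reports[0].splitlines()[-2])
```

Each report is a stack trace taken while the loop was stalled, so the innermost frames name the function that was running at that moment.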

Understood. What is the proper design if my code IS a GIL-holding function? Should I wrap it in a function that forks a process and awaits its result? Wouldn't that cause havoc in the worker?

IMO it's not that uncommon to have custom code (e.g. for .apply and .map_partitions) that is not very GIL friendly.

Should I wrap it in a function that forks a process and awaits its result? Wouldn't that cause havoc in the worker?

I don't know why it would cause havoc, but you might find that it is quite slow.
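
A minimal sketch of the "run it in another process and wait for the result" idea, using only the standard library. The computation, the JSON hand-off, and the function name are hypothetical stand-ins. This is not a Dask API, and real data such as dataframe partitions would need a more suitable serialization.

```python
import json
import subprocess
import sys

# Code run in the child interpreter; a stand-in for a GIL-holding function.
CHILD_SOURCE = """
import json, sys
values = json.load(sys.stdin)
json.dump({"total": sum(v * v for v in values)}, sys.stdout)
"""


def sum_of_squares_in_subprocess(values, timeout=60):
    """Run the computation in a separate Python process and return its result."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        input=json.dumps(values),
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,
    )
    return json.loads(completed.stdout)["total"]


print(sum_of_squares_in_subprocess([1, 2, 3]))
```

The calling thread releases the GIL while it waits for the child, so the worker's event loop stays responsive. The price is an interpreter start-up plus serialization of inputs and outputs on every call, which is the likely source of the slowness mentioned above.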

IMO it's not that uncommon to have custom code (e.g. for .apply and .map_partitions) that is not very GIL friendly.

The most common solution here is to change the code so that it releases the GIL. This is pretty easy today with most Python wrapping systems (and is increasingly becoming the default).

Alternatively, you can ignore the warnings.

This conversation has gotten pretty off-topic from the original issue here. If you have more to discuss I recommend raising a separate issue.

I don't have a minimal example, but I do have a reproducible one.

import xarray as xr
from dask.distributed import Client, progress, performance_report
from dask_kubernetes import KubeCluster
import fsspec

cluster = KubeCluster(n_workers=30)
client = Client(cluster)

ds = xr.open_zarr(fsspec.get_mapper('s3://pangeo-data-uswest2/esip/adcirc/adcirc_01d', anon=False, requester_pays=True))

with performance_report(filename="dask-zarr-report.html"):
    max_var1 = ds['zeta'].max(dim='time').compute()

The first time I run this it dies with "stream is closed" when the maximum is computed.

---------------------------------------------------------------------------
StreamClosedError                         Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    197         try:
--> 198             n_frames = await stream.read_bytes(8)
    199             n_frames = struct.unpack("Q", n_frames)[0]

StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

CommClosedError                           Traceback (most recent call last)
<timed exec> in <module>

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in __exit__(self, typ, value, traceback)
   4598 
   4599     def __exit__(self, typ, value, traceback):
-> 4600         get_client().sync(self.__aexit__, type, value, traceback)
   4601 
   4602 

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    766         else:
    767             return sync(
--> 768                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    769             )
    770 

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    332     if error[0]:
    333         typ, exc, tb = error[0]
--> 334         raise exc.with_traceback(tb)
    335     else:
    336         return result[0]

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py in f()
    316             if callback_timeout is not None:
    317                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 318             result[0] = yield future
    319         except Exception as exc:
    320             error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in __aexit__(self, typ, value, traceback)
   4590 
   4591     async def __aexit__(self, typ, value, traceback):
-> 4592         data = await get_client().scheduler.performance_report(start=self.start)
   4593         with open(self.filename, "w") as f:
   4594             f.write(data)

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    755             name, comm.name = comm.name, "ConnectionPool." + key
    756             try:
--> 757                 result = await send_recv(comm=comm, op=key, **kwargs)
    758             finally:
    759                 self.pool.reuse(self.addr, comm)

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    538         await comm.write(msg, serializers=serializers, on_error="raise")
    539         if reply:
--> 540             response = await comm.read(deserializers=deserializers)
    541         else:
    542             response = None

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    216             self.stream = None
    217             if not shutting_down():
--> 218                 convert_stream_closed_error(self, e)
    219         else:
    220             try:

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
    130         raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
    131     else:
--> 132         raise CommClosedError("in %s: %s" % (obj, exc))
    133 
    134 

CommClosedError: in <closed TCP>: Stream is closed

However, if I run the maximum calculation again it succeeds.

In case it's useful, here is the Dask performance report.

And here's the binder link to reproduce.

I wonder if this is a timeout issue. Can you test with the following config:

distributed:
  comm:
    timeouts:
      tcp: 50s              # time before calling an unresponsive connection dead

or in the code:

import dask
import distributed
dask.config.set({"distributed.comm.timeouts.tcp": "50s"})

Looking into this a bit. Seeing some odd things after that performance_report context manager exits.

await client.scheduler.nbytes() will occasionally throw an error from Tornado.

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    755             name, comm.name = comm.name, "ConnectionPool." + key
    756             try:
--> 757                 result = await send_recv(comm=comm, op=key, **kwargs)
    758             finally:
    759                 self.pool.reuse(self.addr, comm)

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    536 
    537     try:
--> 538         await comm.write(msg, serializers=serializers, on_error="raise")
    539         if reply:
    540             response = await comm.read(deserializers=deserializers)

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in write(self, msg, serializers, on_error)
    248             if sum(lengths) < 2 ** 17:  # 128kiB
    249                 b = b"".join(length_bytes + frames)  # small enough, send in one go
--> 250                 stream.write(b)
    251             else:
    252                 stream.write(b"".join(length_bytes))  # avoid large memcpy, send in many

/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py in write(self, data)
    557         self._write_futures.append((self._total_write_index, future))
    558         if not self._connecting:
--> 559             self._handle_write()
    560             if self._write_buffer:
    561                 self._add_io_state(self.io_loop.WRITE)

/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py in _handle_write(self)
    963                 if num_bytes == 0:
    964                     break
--> 965                 self._write_buffer.advance(num_bytes)
    966                 self._total_write_done_index += num_bytes
    967             except (socket.error, IOError, OSError) as e:

/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py in advance(self, size)
    200         Advance the current buffer position by ``size`` bytes.
    201         """
--> 202         assert 0 < size <= self._size
    203         self._size -= size
    204         pos = self._first_pos

AssertionError: 

(self._size is 0).

This may be completely unrelated though.

@quasiben, I tried adding the suggested line to my notebook:

dask.config.set({"distributed.comm.timeouts.tcp": "50s"})

but no luck, same result:

First time through this dask calculation:

%%time
with performance_report(filename="dask-zarr-report.html"):
    max_var1 = ds['zeta'].max(dim='time').compute()

fails with "stream is closed".

Second time you execute it, it succeeds.

@TomAugspurger thanks for looking into this!

Small update.

  1. The reason things work the second time is that performance_report measures stuff from the start of the context manager. If we do
start = time()
max_var1 = ds['zeta'].max(dim='time').compute()
await client.scheduler.performance_report(start=start)

things reliably raise the exception.

  2. Inside Scheduler.performance_report, we make several remote calls on the workers to get data.
        compute, scheduler, workers = await asyncio.gather(
            *[
                self.get_profile(start=start),
                self.get_profile(scheduler=True, start=start),
                self.get_profile(server=True, start=start),
            ]
        )

For me, await self.get_profile() reliably succeeds, but self.get_profile(server=True) reliably fails. It's not really clear to me why that would matter, the two look fairly similar on the worker side of things. Still looking.

Hmm, @mrocklin is it possible that the Server.profile deque could have elements that reference themselves? In the worker logs, I'm seeing

tornado.application - ERROR - Exception in callback functools.partial(. at 0x7efd329bf7a0>,  exception=RecursionError('maximum recursion depth exceeded while calling a Python object')>)
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/tcpserver.py", line 327, in 
    gen.convert_yielded(future), lambda f: f.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 456, in _handle_stream
    await self.comm_handler(comm)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 423, in handle_comm
    await comm.write(result, serializers=serializers)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 240, in write
    context={"sender": self._local_addr, "recipient": self._peer_addr},
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 40, in to_frames
    if FRAME_OFFLOAD_THRESHOLD and sizeof(msg) > FRAME_OFFLOAD_THRESHOLD:
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/utils.py", line 505, in __call__
    return meth(arg, *args, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/sizeof.py", line 36, in sizeof_python_dict
...

which mimics the error you see from

from dask import sizeof

d = {'a': 1}
d['b'] = d  # the dict now contains itself
sizeof.sizeof_python_dict(d)

Got a local reproducer! Kinda...

import xarray as xr
import dask.array as da
import numpy as np
from distributed import Client, performance_report


def main():
    with Client():
        # Generate and write the data
        shape = (1032, 75, 50, 25)
        chunks = (12, 75, 50, 25)

        arr = da.random.random(shape, chunks=chunks)

        i = np.arange(shape[3], dtype='i4')
        j = np.arange(shape[2], dtype='i4')

        coords = {
            "i": i,
            "j": j,
            "lev": np.random.random(shape[1]).astype('f4'),
            "time": np.arange(shape[0]).view("datetime64[ns]"),
        }

        lat = xr.DataArray(da.random.random(shape[2:], chunks[2:]), name='latitude',
                           dims=('j', 'i'), coords={'i': i, 'j': j})
        lng = xr.DataArray(da.random.random(shape[2:], chunks[2:]), name='lng',
                           dims=('j', 'i'), coords={'i': i, 'j': j})

        coords['lat'] = lat
        coords['lng'] = lng

        xarr = xr.DataArray(arr, dims=('time', 'lev', 'j', 'i'), coords=coords, name='uo')

        dset = xr.Dataset({"uo": xarr})
        dset.to_zarr("data.zarr", mode="w", consolidated=True)

        # Read the data
        with performance_report():
            print("reading")
            xr.open_zarr("data.zarr").uo.mean(dim='time').compute()


if __name__ == "__main__":
    main()

That consistently raises a StreamClosedError when run in a notebook, but is fine when run as a standalone script. So we're likely looking at some poor interaction between distributed, tornado, and the notebook. I'll hopefully make some progress now that I can reproduce it locally.

Hmm, @mrocklin is it possible that the Server.profile deque could have elements that reference themselves?

Nothing comes to mind. If I were to debug this I would take that same
example, but run it asynchronously, and then use pdb to see what the object
was that was self-nested after it failed.

FWIW, I think I've ruled that out, but I'm not sure yet. I round-tripped the profile statistics through JSON with json.loads(json.dumps(prof)), which should have broken any cyclic references. I still saw the issue.
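
As a small illustration of why the JSON round trip rules out cycles: with its default settings, json.dumps refuses to serialize a self-referencing container, so anything that survives the round trip is acyclic. This is a minimal sketch using only the standard library; the helper name json_detects_cycle is hypothetical.

```python
import json


def json_detects_cycle(obj):
    """Return True if json.dumps reports a circular reference in `obj`."""
    try:
        json.dumps(obj)  # check_circular=True is the default
    except ValueError:
        return True
    return False


# A dict that contains itself, like the example earlier in the thread.
cyclic = {"a": 1}
cyclic["b"] = cyclic

# A nested but acyclic dict survives the round trip unchanged.
acyclic = {"children": {"children": {}}}
round_tripped = json.loads(json.dumps(acyclic))

print(json_detects_cycle(cyclic), json_detects_cycle(acyclic))
```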

It seems like dask.sizeof.sizeof_python_dict doesn't handle extremely short but deep data structures, like those created by profile.

import dask.sizeof

def f(n):
    original = outer = {}

    for i in range(n):
        inner = {}  # fresh dict per level, so the result is deep rather than cyclic
        outer['children'] = inner
        outer = inner

    return original

nested = f(10000)

dask.sizeof.sizeof_python_dict(nested)
# RecursionError

If the depth of the dict is larger than roughly the recursion limit, we'll see the error. I've confirmed that simply returning getsizeof(d) + 1000 * len(d), like we do for "long" dictionaries, fixes things on my example.

So, what to do? I'm having trouble. Detecting whether a dict is highly nested doesn't seem feasible. Passing along a flag to sizeof, like the current depth, doesn't seem like an option, since sizeof doesn't take keyword arguments. Right now I'm leaning towards changing the data structure we use for profile.
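
For readers unfamiliar with the failure mode, the following standard-library sketch contrasts a recursive size estimate with one that walks the structure using an explicit stack. It is only an illustration: the function names are hypothetical, it uses sys.getsizeof rather than dask.sizeof, and it is neither what Dask does nor the fix adopted in this thread.

```python
import sys


def make_nested(depth):
    """Build a dict nested `depth` levels deep: {'children': {'children': ...}}."""
    root = outer = {}
    for _ in range(depth):
        inner = {}
        outer["children"] = inner
        outer = inner
    return root


def recursive_size(obj):
    """Naive estimate: one Python call per nesting level."""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            size += recursive_size(key) + recursive_size(value)
    return size


def iterative_size(obj):
    """Same estimate with an explicit stack, so depth is not bounded by recursion."""
    total = 0
    stack = [obj]
    while stack:
        item = stack.pop()
        total += sys.getsizeof(item)
        if isinstance(item, dict):
            for key, value in item.items():
                stack.append(key)
                stack.append(value)
    return total


nested = make_nested(10000)
try:
    recursive_size(nested)
    recursion_failed = False
except RecursionError:
    recursion_failed = True

deep_size = iterative_size(nested)
print(recursion_failed, deep_size)
```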

Looking at the traceback, it looks like we're computing sizeof here to see if we should offload the serialization of this thing to a separate thread. If that computation fails in some way, then I'm confident saying that "yes, this thing is probably large and hairy enough that we should move its serialization off to a separate thread." We might consider putting a try-except around the offload conditional.
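
The try-except idea could look roughly like the sketch below. All names here (OFFLOAD_THRESHOLD, estimate_size, should_offload) are hypothetical stand-ins written for illustration; this is not the code in distributed.comm.utils and not necessarily how the eventual fix was implemented.

```python
import sys

OFFLOAD_THRESHOLD = 10_000_000  # bytes; assumed value for this sketch


def estimate_size(obj):
    """Hypothetical recursive size estimate standing in for a real sizeof."""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for value in obj.values():
            size += estimate_size(value)
    return size


def should_offload(msg, threshold=OFFLOAD_THRESHOLD):
    """Decide whether `msg` should be serialized in a separate thread."""
    try:
        return estimate_size(msg) > threshold
    except RecursionError:
        # Sizing failed, so treat the message as large and complicated enough to offload.
        return True


# Build a dict nested far deeper than the default recursion limit.
deep = {}
node = deep
for _ in range(10_000):
    node["children"] = {}
    node = node["children"]

small_decision = should_offload({"a": 1})
deep_decision = should_offload(deep)
print(small_decision, deep_decision)
```

The design choice is to fail in the safe direction: a message that cannot even be sized is assumed to be expensive, so it goes to the separate thread instead of crashing the write.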

@rsignell-usgs your issues should be fixed by https://github.com/dask/distributed/pull/3455. I don't know if any of the other issues reported here were due to https://github.com/dask/distributed/issues/1674#issuecomment-583692173

@TomAugspurger, that's great news! I probably won't be able to try this out until later this week, but will report back!

@TomAugspurger how were you able to look at the logs? It did not seem like the log page from the dashboard was populated when I looked.

I was confused about that too. I believe the output I put in https://github.com/dask/distributed/issues/1674#issuecomment-583130189 was after I made some modifications to the comms code (possibly handling a RecursionError?). So I'm not sure that we've tracked down all the causes of this error.

@TomAugspurger, I finally tried out my use case with the new dask=2.11.0, and now I get a different error when trying to use the performance report:

This code:

%%time
with performance_report(filename="dask-report1.html"):
    max_var1 = ds['zeta'].max(dim='time').compute()

now produces:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<timed exec> in <module>

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in __exit__(self, typ, value, traceback)
   4611     def __exit__(self, typ, value, traceback):
   4612         frame = inspect.currentframe().f_back
-> 4613         code = inspect.getsource(frame)
   4614         get_client().sync(self.__aexit__, type, value, traceback, code=code)
   4615 

/srv/conda/envs/notebook/lib/python3.7/inspect.py in getsource(object)
    971     or code object.  The source code is returned as a single string.  An
    972     OSError is raised if the source code cannot be retrieved."""
--> 973     lines, lnum = getsourcelines(object)
    974     return ''.join(lines)
    975 

/srv/conda/envs/notebook/lib/python3.7/inspect.py in getsourcelines(object)
    953     raised if the source code cannot be retrieved."""
    954     object = unwrap(object)
--> 955     lines, lnum = findsource(object)
    956 
    957     if istraceback(object):

/srv/conda/envs/notebook/lib/python3.7/inspect.py in findsource(object)
    784         lines = linecache.getlines(file)
    785     if not lines:
--> 786         raise OSError('could not get source code')
    787 
    788     if ismodule(object):

OSError: could not get source code

This can be reproduced by running the "ike_3ways" notebook on the Pangeo binder.

FYI, that second issue you found ("could not get source code") was fixed in https://github.com/dask/distributed/pull/3505.

Just commenting to say that I was having this same issue for a long time (I was trying to do something similar to Ryan (and to Julius over at https://github.com/pangeo-data/pangeo/issues/757), using xarray.open_mfdataset to open and load data from hundreds of netCDF files), but after updating to dask v2.30.0 and dask-labextension v3.0.0 it seems to be working now! I'm very pleased.
