I am having workers die with the following error messages.
This occurs when I try to .persist() a large xarray dataset into memory. (There is more than enough memory in the cluster for the dataset by a factor of 5.)
I don't know what these errors mean, other than that the workers have died. Advice would be appreciated on how to debug more effectively.
distributed.worker - ERROR - failed during get data
Traceback (most recent call last):
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 221, in write
yield future
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/worker.py", line 524, in get_data
compressed = yield comm.write(msg)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 225, in write
convert_stream_closed_error(self, e)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.core - WARNING - Lost connection to 'tcp://10.43.8.25:48251': in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.worker - ERROR - Worker stream died during communication: tcp://10.43.4.25:53234
Traceback (most recent call last):
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 182, in read
frame = yield stream.read_bytes(length)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/worker.py", line 1763, in gather_dep
who=self.address)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 516, in send_recv_from_rpc
result = yield send_recv(comm=comm, op=key, **kwargs)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 350, in send_recv
response = yield comm.read()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 188, in read
convert_stream_closed_error(self, e)
File "/rigel/ocp/users/ra2697/conda/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
Unfortunately these don't give us very much information. While sending data between workers the connection was cut. I don't suppose the worker logs are any more informative?
That is the worker log.
I think it is something related to spill-to-disk. If I make the dataset
smaller by a factor of two, everything works fine.
I will try to find a more reproducible example. Sorry for the vagueness.
On Sun, Dec 31, 2017 at 11:55 AM, Matthew Rocklin wrote:
> Unfortunately these don't give us very much information. While sending data between workers the connection was cut. I don't suppose the worker logs are any more informative?
There is more than enough memory in the cluster for the dataset by a factor of 5.
I think it is something related to spill-to-disk. If I make the dataset smaller by a factor of two, everything works fine.
These two statements together confuse me. Does it spill to disk? Does the worker itself fail?
That is the worker log.
The log from the other worker might be interesting to see
The worker does fail. I will have to check about spilling to disk.
I am going to try a test case where I just persist a big random array and
see if I can trigger the failure.
On Sun, Dec 31, 2017 at 12:57 PM, Matthew Rocklin wrote:
> There is more than enough memory in the cluster for the dataset by a factor of 5.
> I think it is something related to spill-to-disk. If I make the dataset smaller by a factor of two, everything works fine.
> These two statements together confuse me. Does it spill to disk? Does the worker itself fail?
I am also having the same problem. The CSV is just 582 MB, and I am using 9 c3.xlarge instances as workers with 4 processes and 1 c3.xlarge as the scheduler.
However, I am doing a lot of dataframe operations and running some functions on them in parallel.
That is the actual code. The command-line arguments are the scheduler IP with port and the number of chunks to run at a time (e.g. code.py x.x.x.x:yyyy 1000). Substitute the S3 path with the CSV file below:
https://drive.google.com/open?id=1BMHxeRBpq_dWN_fhffP5O2JQ3DdSIYVs
Log file:
dask_log.txt
I could see this error in my terminal:
`tornado.application - ERROR - Exception in callback functools.partial(
Traceback (most recent call last):
File "/home/lib/python3.5/site-packages/tornado/ioloop.py", line 605, in _run_callback
ret = callback()
File "/home/lib/python3.5/site-packages/tornado/stack_context.py", line 277, in null_wrapper
return fn(*args, **kwargs)
File "/homelib/python3.5/site-packages/distributed/client.py", line 677, in _send_to_scheduler_safe
self.scheduler_comm.send(msg)
AttributeError: 'NoneType' object has no attribute 'send'`
created a new issue #1688 so it can be discussed separately
Were you able to reproduce this on random data, @rabernat?
Sorry, I have no updates on this issue right now.
FWIW ran into this issue and issue ( https://github.com/dask/distributed/issues/1736 ) recently (surprised I hadn’t run into these sooner). Was able to sort out both by having dask-worker write to node local scratch space. So might be worth trying if you haven’t already.
@rabernat I am seeing this error associated with processes that: (1) do not release the GIL and (2) are spilling to disk. I believe spilling also holds the worker GIL, so it may be that it is a general issue with a connection timing out due to (apparent) lack of connectivity.
I have also had this issue using the streamz.dask module. It took a couple runs but here is the code that eventually produced the error.
from dask.distributed import Client
client = Client('tcp://127.0.0.1:40990')
from streamz import Stream
import operator as op
import numpy as np
s1 = Stream()
s2 = Stream()
d1 = s1.scatter()
d2 = s2.scatter()
res = d1.combine_latest(d2).starmap(op.sub)
lres = res.buffer(10).gather()
L = lres.sink_to_list()
ll = []
s2.emit(np.ones((10, 10)))
for i in range(10):
    s1.emit(np.ones((10, 10)))
    ll.append(i)
# sleep(5)
print('hi')
print(len(L))
print(len(ll))
The resulting error message
/home/christopher/mc/envs/dp_dev/bin/python /home/christopher/.PyCharm2018.1/config/scratches/scratch_198.py
hi
9
10
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f8c6b0090d0>, <tornado.concurrent.Future object at 0x7f8c6a7c8470>)
Traceback (most recent call last):
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/comm/tcp.py", line 177, in read
n_frames = yield stream.read_bytes(8)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/core.py", line 552, in send_recv_from_rpc
result = yield send_recv(comm=comm, op=key, **kwargs)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/core.py", line 432, in send_recv
response = yield comm.read(deserializers=deserializers)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/comm/tcp.py", line 198, in read
convert_stream_closed_error(self, e)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/ioloop.py", line 605, in _run_callback
ret = callback()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/stack_context.py", line 277, in null_wrapper
return fn(*args, **kwargs)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/christopher/dev/streamz/streamz/core.py", line 886, in cb
yield self._emit(x)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 828, in callback
result_list.append(f.result())
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/christopher/dev/streamz/streamz/dask.py", line 123, in update
result = yield client.gather(x, asynchronous=True)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/client.py", line 1468, in _gather
response = yield self.scheduler.gather(keys=keys)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/christopher/mc/envs/dp_dev/lib/python3.5/site-packages/distributed/core.py", line 555, in send_recv_from_rpc
% (e, key,))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed: while trying to call remote method 'gather'
Process finished with exit code 0
conda env
# Name Version Build Channel
dask 0.18.1 py_0 conda-forge
dask-core 0.18.1 py_0 conda-forge
I don't think that this should be spilling to disk (I have enough memory to hold 11 10x10 arrays). Nor is there anything special happening with the GIL (just regular numpy subtraction).
Any updates on this?
There hasn't been any comment since the last comment on July 9th.
The next step for this issue is for someone to generate a minimal failing example that demonstrates the issues with as few extra pieces as possible.
I have two laptops, both running Linux (Ubuntu, 4 GB RAM). One is the worker and the other runs the client and the scheduler. When I tried to read an 8 GB CSV file I got the following error in the Jupyter notebook. Everything worked when I read a 25 MB file.
Code:
futures = train.map_partitions(lambda part: client.scatter([part])[0]).compute(scheduler='threads')  # local threaded scheduler
train = dd.from_delayed(list(futures), meta=train.apply(lambda x: x, axis=1))
**Error Message in Jupyter Notebook:**
`StreamClosedError Traceback (most recent call last)
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
193 try:
--> 194 n_frames = yield stream.read_bytes(8)
195 n_frames = struct.unpack("Q", n_frames)[0]
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1132 try:
-> 1133 value = future.result()
1134 except Exception:
StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
CommClosedError Traceback (most recent call last)
<ipython-input-7-f19dd6fd3721> in <module>()
----> 1 futures = train.map_partitions(lambda part: client.scatter([part])[0]).compute(scheduler='threads') # single threaded local scheduler
2
3 train = dd.from_delayed(list(futures), meta=train.apply(lambda x: x, axis=1))
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 pack_exception=pack_exception, **kwargs)
77
78 # Cleanup pools associated to dead threads
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
460 _execute_task(task, data) # Re-execute locally
461 else:
--> 462 raise_exception(exc, tb)
463 res, worker_id = loads(res_info)
464 state['cache'][key] = res
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
110 if exc.__traceback__ is not tb:
111 raise exc.with_traceback(tb)
--> 112 raise exc
113
114 import pickle as cPickle
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/optimization.py in __call__(self, *args)
940 % (len(self.inkeys), len(args)))
941 return core.get(self.dsk, self.outkey,
--> 942 dict(zip(self.inkeys, args)))
943
944 def __reduce__(self):
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/compatibility.py in apply(func, args, kwargs)
91 def apply(func, args, kwargs=None):
92 if kwargs:
---> 93 return func(*args, **kwargs)
94 else:
95 return func(*args)
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(*args, **kwargs)
3877 func = kwargs.pop('_func')
3878 meta = kwargs.pop('_meta')
-> 3879 df = func(*args, **kwargs)
3880 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
3881 if not len(df):
<ipython-input-7-f19dd6fd3721> in <lambda>(part)
----> 1 futures = train.map_partitions(lambda part: client.scatter([part])[0]).compute(scheduler='threads') # single threaded local scheduler
2
3 train = dd.from_delayed(list(futures), meta=train.apply(lambda x: x, axis=1))
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, maxsize, timeout, asynchronous)
2069 timeout=timeout,
2070 asynchronous=asynchronous,
-> 2071 hash=hash,
2072 )
2073
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
751 return future
752 else:
--> 753 return sync(self.loop, func, *args, **kwargs)
754
755 def __repr__(self):
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
329 e.wait(10)
330 if error[0]:
--> 331 six.reraise(*error[0])
332 else:
333 return result[0]
~/anaconda3/envs/py36/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/utils.py in f()
314 if timeout is not None:
315 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316 result[0] = yield future
317 except Exception as exc:
318 error[0] = sys.exc_info()
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
1914 client=self.id,
1915 broadcast=broadcast,
-> 1916 timeout=timeout,
1917 )
1918
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
737 name, comm.name = comm.name, "ConnectionPool." + key
738 try:
--> 739 result = yield send_recv(comm=comm, op=key, **kwargs)
740 finally:
741 self.pool.reuse(self.addr, comm)
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
531 yield comm.write(msg, serializers=serializers, on_error="raise")
532 if reply:
--> 533 response = yield comm.read(deserializers=deserializers)
534 else:
535 response = None
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
~/anaconda3/envs/py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/comm/tcp.py in read(self, deserializers)
212 self.stream = None
213 if not shutting_down():
--> 214 convert_stream_closed_error(self, e)
215 else:
216 try:
~/anaconda3/envs/py36/lib/python3.6/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
139 raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
140 else:
--> 141 raise CommClosedError("in %s: %s" % (obj, exc))
142
143
CommClosedError: in <closed TCP>: Stream is closed`
**Error Message in Scheduler terminal:**
`distributed.core - INFO - Starting established connection
distributed.core - INFO - Lost connection to 'tcp://192.168.1.104:43558': in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.core - INFO - Lost connection to 'tcp://192.168.1.104:43560': in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.core - INFO - Lost connection to 'tcp://192.168.1.104:43570': in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.utils_perf - INFO - full garbage collection released 291.62 MB from 3109 reference cycles (threshold: 10.00 MB)`
I'm experiencing this as well when working with a large number of workers (>200) across many nodes. My worker logs are indeed full of these:
Event loop was unresponsive in Worker for 61.83s. This is often caused by long-running
GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
How can I fix this? Some of my functions have a lot of Python code (using numpy), so I don't know exactly why the GIL is being held. Who is moving large chunks of data? My partitions are around 100 MB in size and are being read over a CIFS file share (using the LocalFileSystem with paths such as \\servername\path-to.parquet).
I don't know exactly why the GIL is being held. Who is moving large chunks of data?
Sorry, the warning doesn't say this. It says that these are two common causes, not that they have occurred.
Something is blocking the event loop for about a minute. Unfortunately Dask can't tell you why this was. It's often something to do with the code that you're asking Dask to run. I don't know how to help you without knowing a lot more about your application, and diving in that deeply probably doesn't make sense for a public github issue.
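To make "blocking the event loop" concrete, here is a minimal standard-library sketch. It is not Dask's implementation; `blocking_work` and `max_heartbeat_gap` are hypothetical names, and `time.sleep` stands in for a long-running task. A heartbeat coroutine ticks every 10 ms and records the largest gap between ticks. When the blocking call runs directly on the loop the gap grows to roughly the length of the call, which is the kind of stall the warning reports.

```python
import asyncio
import time


def blocking_work(seconds: float) -> None:
    """Hypothetical stand-in for a long task; time.sleep releases the GIL."""
    time.sleep(seconds)


async def max_heartbeat_gap(run_in_thread: bool, work_seconds: float = 0.3) -> float:
    """Return the largest gap (in seconds) seen between 10 ms heartbeat ticks."""
    loop = asyncio.get_running_loop()
    gaps = []
    stop = asyncio.Event()

    async def heartbeat() -> None:
        last = time.monotonic()
        while not stop.is_set():
            await asyncio.sleep(0.01)
            now = time.monotonic()
            gaps.append(now - last)
            last = now

    hb = asyncio.ensure_future(heartbeat())
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times first
    if run_in_thread:
        # Off the loop: the event loop stays free to service the heartbeat.
        await loop.run_in_executor(None, blocking_work, work_seconds)
    else:
        # On the loop: nothing else can run until this call returns.
        blocking_work(work_seconds)
    await asyncio.sleep(0.05)
    stop.set()
    await hb
    return max(gaps)


blocked_gap = asyncio.run(max_heartbeat_gap(run_in_thread=False))
free_gap = asyncio.run(max_heartbeat_gap(run_in_thread=True))
print(f"on-loop gap: {blocked_gap:.2f}s, in-thread gap: {free_gap:.2f}s")
```

Because `time.sleep` releases the GIL, moving it to a thread is enough in this sketch. Code that holds the GIL for long stretches can still starve the loop from another thread, which is why releasing the GIL or using a separate process comes up in this discussion.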
Understood. What is the proper design if my code is a GIL-holding function? Should I wrap it in a function that forks a process and awaits its result? Wouldn't that cause havoc in the worker?
IMO it's not that uncommon to have custom code (e.g for .apply and .map_partitions) that is not very GIL friendly.
Should I wrap it in a function that forks a process and awaits its result? Wouldn't that cause havoc in the worker?
I don't know why it would cause havoc, but you might find that it is quite slow.
IMO it's not that uncommon to have custom code (e.g for .apply and .map_partitions) that is not very GIL friendly.
The most common solution here is to change the code to release the GIL. This is pretty easy today with most Python wrapping systems (and is increasingly becoming the default).
Alternatively, you can ignore the warnings.
This conversation has gotten pretty off-topic from the original issue here. If you have more to discuss I recommend raising a separate issue.
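For the "run it in a separate process and wait for the result" idea raised above, a minimal sketch using only the standard library might look like the following. The names `CHILD_CODE` and `sum_of_squares_in_child` are hypothetical, and the pure-Python loop is only a placeholder for real GIL-holding code. The calling thread waits on the child process rather than executing Python bytecode, so it should not hold the parent's GIL while the work runs.

```python
import subprocess
import sys

# Hypothetical pure-Python loop that would hold the GIL if run in-process.
CHILD_CODE = "import sys; n = int(sys.argv[1]); print(sum(i * i for i in range(n)))"


def sum_of_squares_in_child(n: int, timeout: float = 60.0) -> int:
    """Run the loop in a fresh interpreter and return the parsed result."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_CODE, str(n)],
        capture_output=True,  # collect stdout/stderr instead of inheriting them
        text=True,            # decode output as str
        check=True,           # raise CalledProcessError on a non-zero exit code
        timeout=timeout,      # do not wait forever on a stuck child
    )
    return int(completed.stdout.strip())


result = sum_of_squares_in_child(1000)
print(result)
```

Starting an interpreter per call has a noticeable fixed cost, and arguments and results have to be serialized, which is consistent with the caution above that this approach can be quite slow.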
I don't have a minimal example, but I do have a reproducible one.
import xarray as xr
from dask.distributed import Client, progress, performance_report
from dask_kubernetes import KubeCluster
import fsspec
cluster = KubeCluster(n_workers=30)
client = Client(cluster)
ds = xr.open_zarr(fsspec.get_mapper('s3://pangeo-data-uswest2/esip/adcirc/adcirc_01d', anon=False, requester_pays=True))
with performance_report(filename="dask-zarr-report.html"):
    max_var1 = ds['zeta'].max(dim='time').compute()
The first time I run this it dies with "stream is closed" when the maximum is computed.
---------------------------------------------------------------------------
StreamClosedError Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
197 try:
--> 198 n_frames = await stream.read_bytes(8)
199 n_frames = struct.unpack("Q", n_frames)[0]
StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
CommClosedError Traceback (most recent call last)
<timed exec> in <module>
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in __exit__(self, typ, value, traceback)
4598
4599 def __exit__(self, typ, value, traceback):
-> 4600 get_client().sync(self.__aexit__, type, value, traceback)
4601
4602
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
766 else:
767 return sync(
--> 768 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
769 )
770
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
332 if error[0]:
333 typ, exc, tb = error[0]
--> 334 raise exc.with_traceback(tb)
335 else:
336 return result[0]
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py in f()
316 if callback_timeout is not None:
317 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 318 result[0] = yield future
319 except Exception as exc:
320 error[0] = sys.exc_info()
/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in __aexit__(self, typ, value, traceback)
4590
4591 async def __aexit__(self, typ, value, traceback):
-> 4592 data = await get_client().scheduler.performance_report(start=self.start)
4593 with open(self.filename, "w") as f:
4594 f.write(data)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
755 name, comm.name = comm.name, "ConnectionPool." + key
756 try:
--> 757 result = await send_recv(comm=comm, op=key, **kwargs)
758 finally:
759 self.pool.reuse(self.addr, comm)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
538 await comm.write(msg, serializers=serializers, on_error="raise")
539 if reply:
--> 540 response = await comm.read(deserializers=deserializers)
541 else:
542 response = None
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
216 self.stream = None
217 if not shutting_down():
--> 218 convert_stream_closed_error(self, e)
219 else:
220 try:
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
130 raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
131 else:
--> 132 raise CommClosedError("in %s: %s" % (obj, exc))
133
134
CommClosedError: in <closed TCP>: Stream is closed
However, if I run the maximum calculation again it succeeds.
In case it's useful, here is the Dask performance report.
And here's the binder link to reproduce.
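Since the second attempt succeeds in this report, one stopgap (not a fix for the underlying cause) is to retry the call once when the connection error is raised. The sketch below is standard-library only. `call_with_retry`, `FakeCommClosedError` and `flaky_max` are hypothetical names for illustration. In real use the exception type would presumably be `distributed.comm.core.CommClosedError`, and the retried callable would need to wrap the whole `performance_report` block, because the traceback above shows the error being raised when that context manager exits.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    exc_types: Tuple[Type[BaseException], ...],
    attempts: int = 2,
    delay: float = 0.0,
) -> T:
    """Call func(); on one of exc_types, wait `delay` seconds and try again."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exc_types:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            time.sleep(delay)
    raise ValueError("attempts must be at least 1")


class FakeCommClosedError(Exception):
    """Hypothetical stand-in for distributed.comm.core.CommClosedError."""


calls = {"n": 0}


def flaky_max() -> int:
    """Fail on the first call and succeed afterwards, like the report above."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise FakeCommClosedError("in <closed TCP>: Stream is closed")
    return 42


value = call_with_retry(flaky_max, (FakeCommClosedError,))
print(value, calls["n"])
```

A retry like this hides the first failure rather than explaining it, so it is best treated as a temporary measure while the cause is being tracked down.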
I wonder if this is a timeout issue. Can you test with the following config:
distributed:
  comm:
    timeouts:
      tcp: 50s # time before calling an unresponsive connection dead
or in the code:
import dask
import distributed
dask.config.set({"distributed.comm.timeouts.tcp": "50s"})
Looking into this a bit. Seeing some odd things after that performance_report context manager exits.
await client.scheduler.nbytes() will occasionally throw an error from Tornado.
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
755 name, comm.name = comm.name, "ConnectionPool." + key
756 try:
--> 757 result = await send_recv(comm=comm, op=key, **kwargs)
758 finally:
759 self.pool.reuse(self.addr, comm)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
536
537 try:
--> 538 await comm.write(msg, serializers=serializers, on_error="raise")
539 if reply:
540 response = await comm.read(deserializers=deserializers)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py in write(self, msg, serializers, on_error)
248 if sum(lengths) < 2 ** 17: # 128kiB
249 b = b"".join(length_bytes + frames) # small enough, send in one go
--> 250 stream.write(b)
251 else:
252 stream.write(b"".join(length_bytes)) # avoid large memcpy, send in many
/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py in write(self, data)
557 self._write_futures.append((self._total_write_index, future))
558 if not self._connecting:
--> 559 self._handle_write()
560 if self._write_buffer:
561 self._add_io_state(self.io_loop.WRITE)
/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py in _handle_write(self)
963 if num_bytes == 0:
964 break
--> 965 self._write_buffer.advance(num_bytes)
966 self._total_write_done_index += num_bytes
967 except (socket.error, IOError, OSError) as e:
/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py in advance(self, size)
200 Advance the current buffer position by ``size`` bytes.
201 """
--> 202 assert 0 < size <= self._size
203 self._size -= size
204 pos = self._first_pos
AssertionError:
(self._size is 0).
This may be completely unrelated though.
@quasiben, I tried adding the suggested line to my notebook:
dask.config.set({"distributed.comm.timeouts.tcp": "50s"})
but no luck, same result:
First time through this dask calculation:
%%time
with performance_report(filename="dask-zarr-report.html"):
    max_var1 = ds['zeta'].max(dim='time').compute()
fails with "stream is closed".
Second time you execute it, it succeeds.
@TomAugspurger thanks for looking into this!
Small update.
performance_report measures stuff from the start of the context manager. If we do
start = time()
max_var1 = ds['zeta'].max(dim='time').compute()
await client.scheduler.performance_report(start=start)
things reliably raise the exception.
compute, scheduler, workers = await asyncio.gather(
    *[
        self.get_profile(start=start),
        self.get_profile(scheduler=True, start=start),
        self.get_profile(server=True, start=start),
    ]
)
For me, await self.get_profile() reliably succeeds, but self.get_profile(server=True) reliably fails. It's not really clear to me why that would matter; the two look fairly similar on the worker side of things. Still looking.
Hmm, @mrocklin is it possible that the Server.profile deque could have elements that reference themselves? In the worker logs, I'm seeing
tornado.application - ERROR - Exception in callback functools.partial(. at 0x7efd329bf7a0>, exception=RecursionError('maximum recursion depth exceeded while calling a Python object')>)
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/tcpserver.py", line 327, in
gen.convert_yielded(future), lambda f: f.result()
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 456, in _handle_stream
await self.comm_handler(comm)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 423, in handle_comm
await comm.write(result, serializers=serializers)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 240, in write
context={"sender": self._local_addr, "recipient": self._peer_addr},
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 40, in to_frames
if FRAME_OFFLOAD_THRESHOLD and sizeof(msg) > FRAME_OFFLOAD_THRESHOLD:
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/utils.py", line 505, in __call__
return meth(arg, *args, **kwargs)
File "/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/sizeof.py", line 36, in sizeof_python_dict
...
which mimics the error you see from
from dask import sizeof
d = {'a': 1}
d['b'] = d  # the dict now contains itself
sizeof.sizeof_python_dict(d)
Got a local reproducer! Kinda...
import xarray as xr
import dask.array as da
import numpy as np
from distributed import Client, performance_report

def main():
    with Client():
        # Generate and write the data
        shape = (1032, 75, 50, 25)
        chunks = (12, 75, 50, 25)
        arr = da.random.random(shape, chunks=chunks)
        i = np.arange(shape[3], dtype='i4')
        j = np.arange(shape[2], dtype='i4')
        coords = {
            "i": i,
            "j": j,
            "lev": np.random.random(shape[1]).astype('f4'),
            "time": np.arange(shape[0]).view("datetime64[ns]"),
        }
        lat = xr.DataArray(da.random.random(shape[2:], chunks[2:]), name='latitude',
                           dims=('j', 'i'), coords={'i': i, 'j': j})
        lng = xr.DataArray(da.random.random(shape[2:], chunks[2:]), name='lng',
                           dims=('j', 'i'), coords={'i': i, 'j': j})
        coords['lat'] = lat
        coords['lng'] = lng
        xarr = xr.DataArray(arr, dims=('time', 'lev', 'j', 'i'), coords=coords, name='uo')
        dset = xr.Dataset({"uo": xarr})
        dset.to_zarr("data.zarr", mode="w", consolidated=True)
        # Read the data
        with performance_report():
            print("reading")
            xr.open_zarr("data.zarr").uo.mean(dim='time').compute()

if __name__ == "__main__":
    main()
That consistently raises a StreamClosedError when run in a notebook, but is fine when run as a standalone script. So we're likely looking at some poor interaction between distributed, tornado, and the notebook. I'll hopefully make some progress now that I can reproduce it locally.
Hmm, @mrocklin is it possible that the
Server.profile deque could have elements that reference themselves?
Nothing comes to mind. If I were to debug this I would take that same
example, but run it asynchronously, and then use pdb to see what the object
was that was self-nested after it failed.
FWIW, I think I've ruled that out, but I'm not sure yet. I round-tripped the profile statistics through JSON with json.loads(json.dumps(prof)), which should have broken any cyclic references. I still saw the issue.
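As a sanity check on that reasoning: the standard-library json module refuses to serialize a dict that contains itself, so a successful round trip does rule out cycles. A minimal sketch with made-up data (the `acyclic` dict only imitates the general shape of nested profile output and is an assumption, not real profile data):

```python
import json

# A dict that references itself, as in the d['b'] = d example above.
cyclic = {"a": 1}
cyclic["b"] = cyclic

try:
    json.dumps(cyclic)
    cycle_detected = False
except ValueError:
    # json.dumps checks for circular references by default and raises ValueError.
    cycle_detected = True

# Hypothetical stand-in for nested, acyclic profile-like data.
acyclic = {"count": 2, "children": {"f": {"count": 1, "children": {}}}}

# The round trip succeeds and yields fresh objects with no shared references.
round_tripped = json.loads(json.dumps(acyclic))
```

If the real profile data survives this round trip, whatever still trips up sizeof afterwards has to be depth rather than a cycle.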
It seems like dask.sizeof.sizeof_python_dict doesn't handle extremely short but deep data structures, like those created by profile.
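import dask.sizeof

def f(n):
    # Build a dict nested n levels deep, with no cycles
    original = outer = {}
    for i in range(n):
        inner = {}  # a fresh dict per level, otherwise inner ends up containing itself
        outer['children'] = inner
        outer = inner
    return original

nested = f(10000)
dask.sizeof.sizeof_python_dict(nested)
# RecursionError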
If the depth of the dict is larger than roughly the recursion limit, we'll see the error. I've confirmed that simply returning getsizeof(d) + 1000 * len(d), like we do for "long" dictionaries, fixes things on my example.
So, what to do? I'm having trouble. Detecting whether a dict is highly nested doesn't seem feasible. Passing along a flag to sizeof that's like the current depth doesn't seem like an option, since sizeof doesn't take keyword arguments. Right now I'm leaning towards changing the data structure we use for profile.
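For illustration only, here is roughly what a depth-aware estimate could look like if it were a standalone function rather than part of dask's sizeof dispatch. This is a sketch under assumptions: `bounded_sizeof`, `MAX_DEPTH` and `nest` are hypothetical names, and the fallback reuses the getsizeof(d) + 1000 * len(d) heuristic mentioned above. It is not how dask computes sizes.

```python
import sys

MAX_DEPTH = 100  # hypothetical cap, well below the default recursion limit


def bounded_sizeof(obj, depth=0):
    """Estimate the size of nested dicts without recursing past MAX_DEPTH."""
    if not isinstance(obj, dict):
        return sys.getsizeof(obj)
    if depth >= MAX_DEPTH:
        # Too deep to keep descending: fall back to a flat heuristic.
        return sys.getsizeof(obj) + 1000 * len(obj)
    total = sys.getsizeof(obj)
    for key, value in obj.items():
        total += sys.getsizeof(key) + bounded_sizeof(value, depth + 1)
    return total


def nest(levels):
    """Build an acyclic dict nested `levels` deep."""
    root = node = {}
    for _ in range(levels):
        node["children"] = {}
        node = node["children"]
    return root


estimate = bounded_sizeof(nest(10_000))
```

The catch is the one already described: the depth has to be threaded through every call, which a single-argument dispatch like sizeof has no room for.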
Looking at the traceback it looks like we're computing sizeof here to see if we should offload the serialization of this thing to a separate thread. If that computation fails in some way then I'm confident saying that "yes, this thing is probably large and hairy enough that we should move its serialization off to a separate thread." We might consider putting a try-except around the offload conditional.
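A minimal sketch of that try-except idea, written as standalone code rather than the actual change to distributed. The names `naive_sizeof`, `should_offload`, `nest` and the threshold value are assumptions made for the example; `naive_sizeof` is only a stand-in for dask's recursive sizeof.

```python
import sys


def naive_sizeof(obj):
    """Recursive size estimate (hypothetical stand-in for dask's sizeof)."""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            size += naive_sizeof(key) + naive_sizeof(value)
    return size


def should_offload(msg, threshold=10_000_000):
    """Decide whether serialization should move to a separate thread."""
    if not threshold:
        return False
    try:
        return naive_sizeof(msg) > threshold
    except RecursionError:
        # The message is too deeply nested to measure, so treat it as large.
        return True


def nest(levels):
    """Build an acyclic dict nested `levels` deep."""
    root = node = {}
    for _ in range(levels):
        node["children"] = {}
        node = node["children"]
    return root


small_result = should_offload({"a": 1})
deep_result = should_offload(nest(10_000))
```

With this shape, a failed size computation no longer kills the write; it just pushes the message down the offload path.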
@rsignell-usgs your issues should be fixed by https://github.com/dask/distributed/pull/3455. I don't know if any of the other issues reported here were due to https://github.com/dask/distributed/issues/1674#issuecomment-583692173
@TomAugspurger, that's great news! I probably won't be able to try this out until later this week, but will report back!
@TomAugspurger how were you able to look at the logs? It did not seem like the log page from the dashboard was populated when I looked.
I was confused about that too. I believe the output I put in https://github.com/dask/distributed/issues/1674#issuecomment-583130189 was after I made some modifications to the comms code (possibly handling a RecursionError?). So I'm not sure that we've tracked down all the causes of this error.
@TomAugspurger, I finally tried out my use case with the new dask=2.11.0, and now I get a different error when trying to use the performance report:
This code:
%%time
with performance_report(filename="dask-report1.html"):
    max_var1 = ds['zeta'].max(dim='time').compute()
now produces:
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<timed exec> in <module>
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in __exit__(self, typ, value, traceback)
4611 def __exit__(self, typ, value, traceback):
4612 frame = inspect.currentframe().f_back
-> 4613 code = inspect.getsource(frame)
4614 get_client().sync(self.__aexit__, type, value, traceback, code=code)
4615
/srv/conda/envs/notebook/lib/python3.7/inspect.py in getsource(object)
971 or code object. The source code is returned as a single string. An
972 OSError is raised if the source code cannot be retrieved."""
--> 973 lines, lnum = getsourcelines(object)
974 return ''.join(lines)
975
/srv/conda/envs/notebook/lib/python3.7/inspect.py in getsourcelines(object)
953 raised if the source code cannot be retrieved."""
954 object = unwrap(object)
--> 955 lines, lnum = findsource(object)
956
957 if istraceback(object):
/srv/conda/envs/notebook/lib/python3.7/inspect.py in findsource(object)
784 lines = linecache.getlines(file)
785 if not lines:
--> 786 raise OSError('could not get source code')
787
788 if ismodule(object):
OSError: could not get source code
This can be reproduced by running the "ike_3ways" notebook on the Pangeo binder:
FYI, that second issue you found ("could not get source code") was fixed in https://github.com/dask/distributed/pull/3505.
Just commenting to say that I was having this same issue for a long time (I was trying to do something similar to Ryan (and to Julius over at https://github.com/pangeo-data/pangeo/issues/757), using xarray.open_mfdataset to open and load data from hundreds of netCDF files), but after updating to dask v2.30.0 and dask-labextension v3.0.0 it seems to be working now! I'm very pleased.
