Distributed: Interesting Issue while loading pandas in distributed dask

Created on 18 Jul 2018  ·  4 Comments  ·  Source: dask/distributed

Hello,
I built a cluster of 10 workers and 1 scheduler. I am trying to do a very simple read of a CSV file and load it into memory, but I keep getting the error below when I call df.head() directly:

`---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1374 try:
-> 1375 st = self.futures[key]
1376 exception = st.exception

KeyError: "('head-1-5-from-delayed-92da537e2c34b41efc1c80f715ffe7c2', 0)"

During handling of the above exception, another exception occurred:

CancelledError Traceback (most recent call last)
in ()
----> 1 df.head()

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
906
907 if compute:
--> 908 result = result.compute()
909 return result
910

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
152 dask.base.compute
153 """
--> 154 (result,) = compute(self, traverse=False, **kwargs)
155 return result
156

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
405 keys = [x.__dask_keys__() for x in collections]
406 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407 results = get(dsk, keys, **kwargs)
408 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
409

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
2096 try:
2097 results = self.gather(packed, asynchronous=asynchronous,
-> 2098 direct=direct)
2099 finally:
2100 for f in futures.values():

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1506 return self.sync(self._gather, futures, errors=errors,
1507 direct=direct, local_worker=local_worker,
-> 1508 asynchronous=asynchronous)
1509
1510 @gen.coroutine

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
613 return future
614 else:
--> 615 return sync(self.loop, func, *args, **kwargs)
616
617 def __repr__(self):

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
251 e.wait(10)
252 if error[0]:
--> 253 six.reraise(*error[0])
254 else:
255 return result[0]

~/anaconda3/envs/python3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/utils.py in f()
236 yield gen.moment
237 thread_state.asynchronous = True
--> 238 result[0] = yield make_coro()
239 except Exception as exc:
240 error[0] = sys.exc_info()

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1097
1098 try:
-> 1099 value = future.result()
1100 except Exception:
1101 self.had_exception = True

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1105 if exc_info is not None:
1106 try:
-> 1107 yielded = self.gen.throw(*exc_info)
1108 finally:
1109 # Break up a reference to itself

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1379 six.reraise(CancelledError,
1380 CancelledError(key),
-> 1381 None)
1382 else:
1383 six.reraise(type(exception),

~/anaconda3/envs/python3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

CancelledError: ('head-1-5-from-delayed-92da537e2c34b41efc1c80f715ffe7c2', 0)`

When I try to load it into memory using client.persist(df), I get the error below:
`---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
in ()
----> 1 df.head()

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
906
907 if compute:
--> 908 result = result.compute()
909 return result
910

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
152 dask.base.compute
153 """
--> 154 (result,) = compute(self, traverse=False, **kwargs)
155 return result
156

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
405 keys = [x.__dask_keys__() for x in collections]
406 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407 results = get(dsk, keys, **kwargs)
408 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
409

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
2084 fifo_timeout=fifo_timeout,
2085 retries=retries,
-> 2086 user_priority=priority,
2087 )
2088 packed = pack_data(keys, futures)

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout)
2019 for v in s:
2020 if v not in self.futures:
-> 2021 raise CancelledError(v)
2022
2023 for k, v in dsk3.items():

CancelledError: ('from-delayed-92da537e2c34b41efc1c80f715ffe7c2', 0)`

Any idea why I am getting these errors?

Some of the worker logs:
distributed.worker - ERROR - None Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 346, in handle_scheduler self.ensure_computing]) File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run yielded = self.gen.throw(exc_info) File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 361, in handle_stream msgs = yield comm.read() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run yielded = self.gen.throw(exc_info) File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 203, in read deserializers=deserializers) File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 315, in wrapper yielded = next(result) File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 79, in from_frames res = _from_frames() File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 65, in _from_frames deserializers=deserializers) File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 122, in loads value = _deserialize(head, fs, deserializers=deserializers) File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 235, in deserialize dumps, loads = families[name] KeyError: None

distributed.worker - INFO - Connection to scheduler broken. Reconnecting...

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Registered to: tcp://172.31.31.234:8786

distributed.worker - INFO - -------------------------------------------------

All 4 comments

I recommend producing a minimal reproducible example to help others
diagnose the problem.

See https://stackoverflow.com/help/mcve and
http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports

On Tue, Jul 17, 2018 at 11:44 PM, willbadr notifications@github.com wrote:

Hello,
I built a cluster of 10 workers and 1 scheduler and I am trying to do a
very simple read for a CSV file and load it into memory and I keep getting
the below error when I do df.head() directly:

`-----------------------------------------------------------

KeyError Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in _gather(self, futures, errors, direct, local_worker)
1374 try:
-> 1375 st = self.futures[key]
1376 exception = st.exception

KeyError: "('head-1-5-from-delayed-92da537e2c34b41efc1c80f715ffe7c2', 0)"

During handling of the above exception, another exception occurred:

CancelledError Traceback (most recent call last)
in ()
----> 1 df.head()

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/dataframe/core.py
in head(self, n, npartitions, compute)
906
907 if compute:
--> 908 result = result.compute()
909 return result
910

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in
compute(self, **kwargs)
152 dask.base.compute
153 """
--> 154 (result,) = compute(self, traverse=False, **kwargs)
155 return result
156

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in
compute(*args, **kwargs)
405 keys = [x.__dask_keys__() for x in collections]
406 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407 results = get(dsk, keys, **kwargs)
408 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
409

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync,
asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
2096 try:
2097 results = self.gather(packed, asynchronous=asynchronous,
-> 2098 direct=direct)
2099 finally:
2100 for f in futures.values():

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in gather(self, futures, errors, maxsize, direct, asynchronous)
1506 return self.sync(self._gather, futures, errors=errors,
1507 direct=direct, local_worker=local_worker,
-> 1508 asynchronous=asynchronous)
1509
1510 @gen.coroutine

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in sync(self, func, *args, **kwargs)
613 return future
614 else:
--> 615 return sync(self.loop, func, *args, **kwargs)
616
617 def __repr__(self):

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/utils.py
in sync(loop, func, *args, **kwargs)
251 e.wait(10)
252 if error[0]:
--> 253 six.reraise(*error[0])
254 else:
255 return result[0]

~/anaconda3/envs/python3/lib/python3.6/site-packages/six.py in
reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/utils.py
in f()
236 yield gen.moment
237 thread_state.asynchronous = True
--> 238 result[0] = yield make_coro()
239 except Exception as exc:
240 error[0] = sys.exc_info()

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/gen.py in
run(self)
1097
1098 try:
-> 1099 value = future.result()
1100 except Exception:
1101 self.had_exception = True

~/anaconda3/envs/python3/lib/python3.6/site-packages/tornado/gen.py in
run(self)
1105 if exc_info is not None:
1106 try:
-> 1107 yielded = self.gen.throw(*exc_info)
1108 finally:
1109 # Break up a reference to itself

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in _gather(self, futures, errors, direct, local_worker)
1379 six.reraise(CancelledError,
1380 CancelledError(key),
-> 1381 None)
1382 else:
1383 six.reraise(type(exception),

~/anaconda3/envs/python3/lib/python3.6/site-packages/six.py in
reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

CancelledError: ('head-1-5-from-delayed-92da537e2c34b41efc1c80f715ffe7c2',
0)`

When I try to load it into memory using client.persist(df), I get the
error below:

`-----------------------------------------------------------

CancelledError Traceback (most recent call last)
in ()
----> 1 df.head()

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/dataframe/core.py
in head(self, n, npartitions, compute)
906
907 if compute:
--> 908 result = result.compute()
909 return result
910

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in
compute(self, **kwargs)
152 dask.base.compute
153 """
--> 154 (result,) = compute(self, traverse=False, **kwargs)
155 return result
156

~/anaconda3/envs/python3/lib/python3.6/site-packages/dask/base.py in
compute(*args, **kwargs)
405 keys = [x.__dask_keys__() for x in collections]
406 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407 results = get(dsk, keys, **kwargs)
408 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
409

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync,
asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
2084 fifo_timeout=fifo_timeout,
2085 retries=retries,
-> 2086 user_priority=priority,
2087 )
2088 packed = pack_data(keys, futures)

~/anaconda3/envs/python3/lib/python3.6/site-packages/distributed/client.py
in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions,
priority, user_priority, resources, retries, fifo_timeout)
2019 for v in s:
2020 if v not in self.futures:
-> 2021 raise CancelledError(v)
2022
2023 for k, v in dsk3.items():

CancelledError: ('from-delayed-92da537e2c34b41efc1c80f715ffe7c2', 0)`

Any idea why I am getting these errors?

Some of the worker logs:
distributed.worker - ERROR - None Traceback (most recent call last): File
"/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 346,
in handle_scheduler self.ensure_computing]) File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py",
line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py",
line 1107, in run yielded = self.gen.throw(exc_info) File
"/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 361,
in handle_stream msgs = yield comm.read() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py",
line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py",
line 1107, in run yielded = self.gen.throw(
exc_info) File
"/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line
203, in read deserializers=deserializers) File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py",
line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py",
line 315, in wrapper yielded = next(result) File "/opt/conda/lib/python3.6/
site-packages/distributed/comm/utils.py", line 79, in from_frames res =
_from_frames() File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py",
line 65, in _from_frames deserializers=deserializers) File
"/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py",
line 122, in loads value = _deserialize(head, fs,
deserializers=deserializers) File "/opt/conda/lib/python3.6/
site-packages/distributed/protocol/serialize.py", line 235, in
deserialize dumps, loads = families[name] KeyError: None

distributed.worker - INFO - Connection to scheduler broken. Reconnecting...

distributed.worker - INFO - ------------------------------

distributed.worker - INFO - Registered to: tcp://172.31.31.234:8786

distributed.worker - INFO - ------------------------------

—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/dask/distributed/issues/2124, or mute the thread
https://github.com/notifications/unsubscribe-auth/AASszHYqg8ietMnET30S_sYKQq_tibtmks5uHq8mgaJpZM4VT5cl
.

Thanks for the reply and apologies for the first messy message. I am trying to follow the example in:
http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes

I started the scheduler on one server and 10 workers in Docker containers on a few other servers. When I start the client using the code below, I can list all 10 workers and the scheduler.

from dask.distributed import Client
client = Client('172.31.XX.XXX:8786')  # set up local cluster on your laptop
client.restart()

Then I load a small dataframe from a CSV file (as in the first line of the example code). When I run

import dask.dataframe as dd
df = dd.read_csv('s3://sagemaker-walebadr/yellow_tripdata_2015-01.csv')
df = df.persist()
df.head()

I get a whole bunch of errors. On the output, I get the error:

CancelledError: ('head-1-5-from-delayed-5d024ea72f095d82070467d7fc4fb1b5', 0)

On the scheduler server, I see the error below:

distributed.core - ERROR - 'heartbeat'
distributed.scheduler - INFO - Close client connection: Client-a5ec4e70-8a85-11e8-b910-0e74a1836ef8
distributed.core - ERROR - 'heartbeat'
Traceback (most recent call last):
 File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 374, in handle_stream
handler = self.stream_handlers[op]

On the worker side:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 122, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 235, in deserialize
dumps, loads = families[name]
KeyError: None

I recommend verifying that the version of dask is consistent across your
cluster:

client.get_versions(check=True)

On Wed, Jul 18, 2018 at 8:31 AM, willbadr notifications@github.com wrote:

Thanks for the reply. I am trying to follow the example in:
http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes

I started the scheduler in one server and another 10 workers in docker
containers in few other servers. When i start the client using the below, I
can list all the 10 workers and one scheduler.

from dask.distributed import Client
client = Client('172.31.XX.XXX:8786') # set up local cluster on your laptop
client.restart()

Then I load a simple small dataframe from a CSV file (as in the example
code in the first line). when I do

import dask.dataframe as dd
df = dd.read_csv('s3://sagemaker-walebadr/yellow_tripdata_2015-01.csv')
df = df.persist()
df.head()

I get a whole bunch of errors. On the output, I get the error:

CancelledError: ('head-1-5-from-delayed-5d024ea72f095d82070467d7fc4fb1b5', 0)

On the scheduler server, I see the error below:

distributed.core - ERROR - 'heartbeat'
distributed.scheduler - INFO - Close client connection: Client-a5ec4e70-8a85-11e8-b910-0e74a1836ef8
distributed.core - ERROR - 'heartbeat'
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 374, in handle_stream
handler = self.stream_handlers[op]

On the worker side:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 122, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 235, in deserialize
dumps, loads = families[name]
KeyError: None

—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
https://github.com/dask/distributed/issues/2124#issuecomment-405914324,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AASszIjBsueg094DiJ-FBYmWSvBxc13oks5uHyqwgaJpZM4VT5cl
.

@mrocklin Thank you so much!! My Jupyter notebook runs on a third node, and the client version there was older than the scheduler version. The issue is now resolved.

Was this page helpful?
0 / 5 - 0 ratings