distributed fails with large graph/dataset

Created on 6 Apr 2021 · 3 Comments · Source: dask/distributed

What happened:

I'm using a local distributed cluster to perform computations on a large 2190x432x432x184 array, with chunks of 7x432x432x184. Dask/distributed works beautifully when I run my computation on a subset of my data, but when running on my full dataset (~500GB) I get the following error: ValueError: 3713192179 exceeds max_bin_len(2147483647). Changing the number of workers/threads or the chunking seems to have no effect. I suspect this is an issue with the graph size, but Googling finds few matches, so I'm not sure.
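
For scale, the numbers quoted above can be checked with a few lines of arithmetic. This is only a rough sketch: the 8-byte element size (float64) is an assumption, since the issue does not state the dtype, and the variable names are illustrative. The limit in the error message, 2147483647, is 2**31 - 1.

```python
import math

shape = (2190, 432, 432, 184)   # full array, as described in the issue
chunks = (7, 432, 432, 184)     # chunk shape, as described in the issue
itemsize = 8                    # ASSUMPTION: float64; the dtype is not stated

n_elements = math.prod(shape)
# Number of chunks along each axis, rounding up for a partial final chunk.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
chunk_bytes = math.prod(chunks) * itemsize
total_bytes = n_elements * itemsize

MAX_BIN_LEN = 2**31 - 1         # the limit quoted in the error message
reported_len = 3713192179       # the length quoted in the error message

print(f"elements:        {n_elements:,}")
print(f"chunks:          {n_chunks}")
print(f"bytes per chunk: {chunk_bytes:,}")
print(f"total bytes:     {total_bytes:,}")
print(f"chunk fits under limit:   {chunk_bytes <= MAX_BIN_LEN}")
print(f"reported length / limit:  {reported_len / MAX_BIN_LEN:.2f}")
```

Under that assumption a single chunk (about 1.9 GB) is still below the limit, while the reported binary length is roughly 1.7 times the limit, so the oversized field does not appear to be one chunk's worth of data on its own.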

I get this error with dask and distributed version 2021.3.1. The error does not occur with the previous versions I checked. However, computation seems a lot faster with 2021.3.1 than with previous versions, so I'm reluctant to downgrade.

Below is the full trace:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 3713192179 exceeds max_bin_len(2147483647)
distributed.core - ERROR - 3713192179 exceeds max_bin_len(2147483647)
Traceback (most recent call last):
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/core.py", line 555, in handle_stream
    msgs = await comm.read()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/tcp.py", line 218, in read
    msg = await from_frames(
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    return protocol.loads(
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 3713192179 exceeds max_bin_len(2147483647)
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/core.py", line 501, in handle_comm
    result = await result
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/scheduler.py", line 4732, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/core.py", line 555, in handle_stream
    msgs = await comm.read()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/tcp.py", line 218, in read
    msg = await from_frames(
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    return protocol.loads(
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 3713192179 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f219fdf51f0>, <Task finished name='Task-116' coro=<BaseTCPListener._handle_stream() done, defined at /data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/tcp.py:476> exception=ValueError('3713192179 exceeds max_bin_len(2147483647)')>)
Traceback (most recent call last):
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/tcp.py", line 493, in _handle_stream
    await self.comm_handler(comm)
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/core.py", line 501, in handle_comm
    result = await result
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/scheduler.py", line 4732, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/core.py", line 555, in handle_stream
    msgs = await comm.read()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/tcp.py", line 218, in read
    msg = await from_frames(
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    return protocol.loads(
  File "/data/hpcdata/users/tomand/anaconda/envs/icenet2-dask/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 3713192179 exceeds max_bin_len(2147483647)

What you expected to happen:

No error.

Minimal Complete Verifiable Example:

I tried to create an MWE that mimics my script using random data, but I'm unable to get the same error. The gist is that it computes a mean over the 1st and 2nd axes of a 2190x432x432x184 array. In practice there are a few more steps, so the graph will be more complicated. This might be why I can't recreate the error in this MWE. Below is a Colab link.

https://colab.research.google.com/drive/1KUP6nrPQfzutsyQjgduD8TrPQMDQbiu_?usp=sharing
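
For readers who cannot open the notebook, the gist described above might look roughly like the following. This is a hedged sketch, not the notebook's actual contents: the shape is scaled down so it runs quickly, the data are random, "1st and 2nd axis" is read as 0-based axes 1 and 2, and it uses Dask's default scheduler rather than a local distributed cluster.

```python
import dask.array as da

# ASSUMPTION: scaled-down stand-in for the 2190x432x432x184 array,
# with the same chunking pattern (7 along the first axis, full extent elsewhere).
shape = (219, 43, 43, 18)
chunks = (7, 43, 43, 18)
x = da.random.random(shape, chunks=chunks)

# ASSUMPTION: "1st and 2nd axis" interpreted as 0-based axes 1 and 2.
result = x.mean(axis=(1, 2)).compute()

print(result.shape)
```

As the issue notes, a reduction this simple did not reproduce the error; the real script has more steps and a larger graph.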

Environment:

  • Dask version: 2021.3.1
  • Python version: 3.8.5
  • Operating System: CentOS Linux 7 (Core)
  • Install method (conda, pip, source): mamba

All 3 comments

Can you please test with Dask PR ( https://github.com/dask/dask/pull/7525 ) and Distributed PR ( https://github.com/dask/distributed/pull/4677 )?

cc @madsbk

That fixes this particular issue. Thank you very much!

Great, thanks for following up, @tom-andersson, and thanks to @madsbk for fixing the issue.
