What happened:
When computing a Dask dataframe read from multiple parquet files into a single pandas dataframe, using a distributed client on a single VM, I get the following traceback:
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
sub_header, sub_frames, deserializers=deserializers
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
return deserialize(header, merged_frames, deserializers=deserializers)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
return loads(header, frames)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
return pickle.loads(x, buffers=new)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
return pickle.loads(x, buffers=buffers)
_pickle.UnpicklingError: pickle data was truncated
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 283, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 565, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 2666, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1981, in gather
asynchronous=asynchronous,
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 844, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 353, in sync
raise exc.with_traceback(tb)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 336, in f
result[0] = yield future
File "/home/cedric/venv/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1869, in _gather
response = await future
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1920, in _gather_remote
response = await retry_operation(self.scheduler.gather, keys=keys)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
operation=operation,
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 645, in send_recv
response = await comm.read(deserializers=deserializers)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/tcp.py", line 222, in read
allow_offload=self.allow_offload,
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 77, in from_frames
res = await offload(_from_frames)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in offload
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/usr/local/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in <lambda>
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 63, in _from_frames
frames, deserialize=deserialize, deserializers=deserializers
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
sub_header, sub_frames, deserializers=deserializers
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
return deserialize(header, merged_frames, deserializers=deserializers)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
return loads(header, frames)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
return pickle.loads(x, buffers=new)
File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
return pickle.loads(x, buffers=buffers)
_pickle.UnpicklingError: pickle data was truncated
What you expected to happen:
I would expect no issue since the parquet files have the same schema (they don't have any parquet metadata though).
The same process worked fine on distributed 2021.2.0 and 2021.3.0
Minimal Complete Verifiable Example:
Install a new virtualenv with Python 3.7.9, install the following libraries with pip:
requirements.txt :
dask==2021.3.0
pickle5==0.0.11
numpy==1.20.1
fastparquet==0.5.0
pandas==1.2.3
git+https://github.com/dask/distributed.git@aac50f63fdacfb43be64279ab540f68cabe7351b
zict==2.0.0
tblib==1.7.0
sortedcontainers==2.3.0
cloudpickle==1.6.0
click==7.1.2
dask-glm==0.2.0
dask-ml==1.8.0
tornado==6.1
toolz==0.11.1
psutil==5.8.0
msgpack==1.0.2
partd==1.1.0
fsspec==0.8.5
Then run the following python script :
from dask.distributed import Client
import dask.dataframe as dd
client = Client()
df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
df.compute()
Anything else we need to know?:
Here is the result from the following command:
client.get_versions(check=True)
Result
{
"scheduler": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"workers": {
"tcp://127.0.0.1:34472": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:34639": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:35957": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:35992": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:39976": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:40883": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:43619": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:43934": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:44065": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:45409": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:45573": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
},
"tcp://127.0.0.1:46478": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
}
},
"client": {
"host": {
"python": "3.7.9.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "3.10.0-1062.9.1.el7.x86_64",
"machine": "x86_64",
"processor": "x86_64",
"byteorder": "little",
"LC_ALL": "null",
"LANG": "fr_FR.UTF-8"
},
"packages": {
"python": "3.7.9.final.0",
"dask": "2021.03.0",
"distributed": "2021.03.1+12.gaac50f6",
"msgpack": "1.0.2",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.20.1",
"lz4": null,
"blosc": null
}
}
}
This issue was discovered trying to debug #4645.
@jakirkham has raised something interesting. He said that, on Python 3.7, distributed needs pickle5 on all the workers, the scheduler and the client(s). Here, pickle5 is installed when the LocalCluster is created, but I am not sure it is available to all workers, the scheduler and the client(s): pickle5 is not in the install requirements for Python 3.7, and it does not appear in the packages reported by client.get_versions.
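One way to check this directly, rather than relying on client.get_versions, is to ask each process whether it can import pickle5. The snippet below is only a sketch: pickle_support is a hypothetical helper name, and the commented-out calls assume a running cluster with a `client` object like the one in the script above.

```python
import importlib.util
import sys


def pickle_support():
    """Report the Python version and whether pickle5 is importable here."""
    return {
        "python": "%d.%d" % sys.version_info[:2],
        "pickle5": importlib.util.find_spec("pickle5") is not None,
    }


# Local check: this runs in the current (client) process only.
local_report = pickle_support()
print(local_report)

# With a running cluster, the same function could be sent to the other
# processes (assumes `client` is a connected distributed.Client):
#   client.run(pickle_support)               # one result per worker
#   client.run_on_scheduler(pickle_support)  # result from the scheduler
```

If every process reports the same value for "pickle5", a mismatch between client, scheduler and workers can be ruled out as the cause.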
Environment:
Just to be clear I don't think pickle5 is needed. Am simply suggesting things to try as we debug the issue
Also linking this example that Matt shared ( https://github.com/dask/distributed/issues/4645#issuecomment-811957991 )
Also this was tested with Python 3.8 and found to still be an issue ( https://github.com/dask/distributed/issues/4645#issuecomment-811974266 ). So it is not unique to Python 3.7 + pickle5.
Without pickle5 on Python 3.7, we saw this issue ( https://github.com/dask/distributed/issues/4645#issuecomment-811440906 ). This is interesting because it could indicate this was serialized with something other than pickle (like dask serialization), which would indicate this is a serialization bug
I'm blocked for the next few hours, but the next thing I recommend doing, if anyone has the time, is to construct a test.
I tried this with the following:
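import numpy as np
import pytest
from distributed.utils_test import gen_cluster, inc

@gen_cluster(client=True)
async def test_large_pandas_dataframe(c, s, a, b):
    pd = pytest.importorskip("pandas")
    # Build a large array, wrap it in a DataFrame on worker a, then force a transfer to worker b
    w = c.submit(np.arange, 20000000)
    x = c.submit(pd.DataFrame, {"x": w}, workers=a.address)
    y = c.submit(inc, x, workers=b.address)
    await y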
But I learned that the frames that were coming out of Pandas had length 1, and so maybe weren't being cut up. I think that this is maybe because they're blocks in a pandas block manager. For some reason the blocks coming out of the example dataset that @williamblazing posted in https://github.com/dask/dask/issues/7490 have a much longer length. Maybe this is a dtype difference?
The next thing I would try is to construct a Pandas dataframe that we can send through my test above that reproduces this error. My guess is that this will require a different dtype or categorical or something. Once that's done I'm hoping that we can put breakpoints strategically in the serialization pipeline to find out where things are being cut, and why they're not being stitched back together.
Yeah I've been trying NumPy arrays and bytes objects locally to see if a simpler object also reproduces
Am still trying to wrap my head around where the issue is coming from. ATM am thinking pickle protocol 5 is a red herring (especially since this issue shows up in 2021.3.1 and not 2021.3.0, suggesting this is related to other serialization changes). Instead am thinking some of our logic around splitting and joining frames may be having issues. Though have yet to narrow that down
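For context on why the splitting and joining logic is a plausible suspect, the standard-library sketch below shows what happens when pickle receives only part of a payload. It does not use distributed at all; cutting the payload in half is just a stand-in for a frame that was split and never stitched back together.

```python
import pickle

# A large bytes object, similar in spirit to a big buffer inside a frame.
payload = pickle.dumps(b"x" * 1_000_000, protocol=pickle.HIGHEST_PROTOCOL)

# Pretend the payload was split into pieces and only the first one arrived.
first_piece = payload[: len(payload) // 2]

error = None
try:
    pickle.loads(first_piece)
except (pickle.UnpicklingError, EOFError) as exc:
    # CPython typically reports this as an UnpicklingError for truncated data.
    error = exc
    print(type(exc).__name__, exc)
```

The full payload round-trips fine, so an error of this kind points at how the bytes were delivered rather than at how they were pickled.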
Yeah, I think that the thing to do here is to start with the data that William provided, which we know fails, and then try to simplify and reduce that down.
There are pretty solid tests and special code paths around numpy arrays. I'm fairly confident about that path
Ok have found a simpler reproducer
import numpy as np
from dask.distributed import Client, wait
class Data:
    def __init__(self, n):
        self.data = np.empty(n, dtype="u1")
c = Client(n_workers=1)
f = c.submit(Data, 200_000_000)
wait(f)
d = f.result()
Testing a roundtrip through dumps/loads doesn't reproduce the issue. So seems like it is not there
import numpy as np
from distributed.protocol import Serialize, dumps, loads
class Data:
    def __init__(self, n):
        self.data = np.empty(n, dtype="u1")
d = Data(200_000_000)
loads(dumps(Serialize(d)))
I'm tracking this down to the code that handles frames in msgpack now.
Yeah, ok, so the _encode_default and _decode_default functions in dumps and loads in protocol/core.py create a new convention where they split and then compress frames, and record the number of sub-frames under a new "num-sub-frames" key. Unfortunately, in the case where they don't immediately deserialize, but instead create a Serialized object (this happens when things pass through the scheduler), the object that they create will get deserialized now with a completely different code path, which is unaware of the new num-sub-frames convention.
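The mismatch described above can be illustrated with a toy model. This is a sketch only, not the code in protocol/core.py: the "num-sub-frames" key comes from the description above, while FRAME_SIZE, split_frames, encode, decode_aware and decode_unaware are hypothetical names invented for the illustration, and compression is left out.

```python
import pickle

FRAME_SIZE = 64  # hypothetical split size, kept tiny for illustration


def split_frames(frame, size=FRAME_SIZE):
    """Cut one bytes frame into sub-frames of at most `size` bytes."""
    return [frame[i : i + size] for i in range(0, len(frame), size)] or [b""]


def encode(obj):
    """Writer: pickle, split, and record how many sub-frames were produced."""
    sub_frames = split_frames(pickle.dumps(obj))
    header = {"num-sub-frames": len(sub_frames)}
    return header, sub_frames


def decode_aware(header, frames):
    """Reader that honors the header and stitches the sub-frames together."""
    merged = b"".join(frames[: header["num-sub-frames"]])
    return pickle.loads(merged)


def decode_unaware(header, frames):
    """Reader that ignores the convention and assumes one frame per object."""
    return pickle.loads(frames[0])


header, frames = encode(list(range(1000)))
restored = decode_aware(header, frames)

try:
    decode_unaware(header, frames)
    unaware_failed = False
except Exception as exc:  # truncated input; usually UnpicklingError or EOFError
    unaware_failed = True
    print(type(exc).__name__, exc)
```

In this model the writer and the aware reader agree on the convention, so the object round-trips. The unaware reader sees only the first sub-frame, which is the same kind of failure as the truncated pickle in the traceback.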
cc @madsbk
Resolved I think in https://github.com/dask/distributed/pull/4666
Tests are running now.
Yeah was also digging into the MsgPack serialization code, but hadn't gotten quite that far. That makes sense. Thanks for digging deeper 🙂
FWIW Mads is out this week. I believe he gets back next week, but am not sure
Ok this should now be fixed. Please retest and let us know how it goes. Also it should be possible to test without pickle5. If you are still seeing issues with main, please let us know as soon as you can (we are planning to release tomorrow) 🙂
cc @alejandrofiel @Cedric-Magnan @gabicca @williamBlazing
I've just run a few tests that were failing before and now they pass with the latest code on main, on Python 3.7!
Thank you everyone for all the effort you put into fixing this!! :)
I've tested with the code I had provided to read parquet files with distributed from the current main branch and it seems to work as well! Congrats and thanks for the effort! (It also works without pickle5)
Great, thank you all for testing! 😄