What happened:
Serialization fails with TypeError: can not serialize 'numpy.uint16' object when using master.
What you expected to happen:
Serialization should succeed.
Minimal Complete Verifiable Example:
import dask.array as da
import numpy as np
from distributed.protocol.highlevelgraph import highlevelgraph_pack
def fn(x, dt):
    return x.astype(dt)
arr = da.blockwise(fn, "x", da.ones(1000), "x", np.uint16(0), None, dtype=np.uint16)
highlevelgraph_pack(arr.__dask_graph__(), None, None)
Anything else we need to know?:
This happens for a number of numpy data types.
Environment:
The following also fails:
import dask.array as da
import numpy as np
from distributed.protocol.highlevelgraph import highlevelgraph_pack
def fn(x, y):
    return x
arr = da.blockwise(fn, "x", da.ones(1000), "x", np.ones(1), None)
highlevelgraph_pack(arr.__dask_graph__(), None, None)
I have confirmed that this bug is also present in the latest release (distributed==2021.2.0).
The last version on which the following example runs successfully is distributed==2.30.1; every later version fails. Note that the failure does not occur with optimize_graph=True. However, I am busy experimenting with annotations and I need to disable task fusion.
import dask.array as da
import numpy as np
from distributed import LocalCluster, Client
def fn(x, y):
    return x

if __name__ == "__main__":
    cluster = LocalCluster(processes=True,
                           n_workers=1,
                           threads_per_worker=1)
    client = Client(cluster)
    arr = da.blockwise(fn, "x",
                       da.ones(1000), "x",
                       np.ones(1), None)
    da.compute(arr, optimize_graph=False)
Thanks for the report. Taking a look now.
I'm able to reproduce. It looks like the root cause is that Blockwise.__dask_distributed_pack__ can return inputs that aren't msgpack serializable. In this case, the blockwise layer's .indices includes the ndarray np.array([1.]).
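To make the root cause concrete, here is a minimal sketch of a checker that walks a nested structure and reports every leaf that a hook-free msgpack encoder would not accept. The helper name find_unpackable and the MSGPACK_NATIVE_LEAVES tuple are hypothetical, the type list is a simplification of msgpack's real rules, and the list of (argument, index) pairs only loosely mimics a layer's .indices. The Literal class stands in for a NumPy scalar or ndarray.

```python
from typing import Any, List

# Simplified, assumed list of leaf types msgpack encodes without a
# `default` hook (subclass handling and strict_types are not modelled).
MSGPACK_NATIVE_LEAVES = (type(None), bool, int, float, str, bytes)


def find_unpackable(obj: Any, path: str = "root") -> List[str]:
    """Return "path: type" for every leaf that is not msgpack-native.

    Hypothetical helper: recurses into lists, tuples and dicts and reports
    any other object that is not one of MSGPACK_NATIVE_LEAVES.
    """
    if isinstance(obj, MSGPACK_NATIVE_LEAVES):
        return []
    bad: List[str] = []
    if isinstance(obj, (list, tuple)):
        for i, item in enumerate(obj):
            bad.extend(find_unpackable(item, f"{path}[{i}]"))
        return bad
    if isinstance(obj, dict):
        for key, value in obj.items():
            bad.extend(find_unpackable(key, f"{path}<key {key!r}>"))
            bad.extend(find_unpackable(value, f"{path}[{key!r}]"))
        return bad
    return [f"{path}: {type(obj).__name__}"]


class Literal:
    """Stands in for a NumPy scalar or ndarray embedded as a literal."""


# (argument, index) pairs, loosely modelled on a Blockwise layer's .indices
indices = [("ones-array", ("x",)), (Literal(), None)]
print(find_unpackable(indices))  # ['root[1][0]: Literal']
```

A check like this only points at the offending value; it does not make it serializable.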
@madsbk I'd be curious to get your thoughts here. Do we require that the values in __dask_distributed_pack__ be msgpack serializable?
Yes. Because of the new high-level graph, da.blockwise doesn't support NumPy arrays directly. For example, the da.ufunc class calls da.asarray() on its inputs to make sure that they are Dask arrays, which in turn makes them compatible with __dask_distributed_pack__.
The following should work:
import dask.array as da
import numpy as np
from distributed.protocol.highlevelgraph import highlevelgraph_pack
def fn(x, y):
    return x
arr = da.blockwise(fn, "x", da.ones(1000), "x", da.asarray(np.uint16(0)), None)
highlevelgraph_pack(arr.__dask_graph__(), None, None)
Having said that, we might consider making da.blockwise support NumPy arrays directly.
If I may ask @madsbk, is it expected that the last example I posted above used to work and now doesn't? That seems like it might be a regression, but I appreciate that I am no expert.
I think this is a regression, because if the argument index is None, then the docs say that the argument is treated as a literal, not a Dask array. Hence any literal will now need to be msgpackable, whereas previously it would all go through distributed.protocol.serialize.{serialize,deserialize}.
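The calling convention referred to here can be sketched as follows: da.blockwise takes arguments and index strings in alternating pairs, and an index of None marks a literal that is handed to the function unchanged. split_blockwise_args is a hypothetical helper, not part of Dask; it only illustrates that a literal is kept as-is, so whichever serializer handles the layer must be able to handle that object.

```python
from typing import Any, List, Optional, Tuple


def split_blockwise_args(*args: Any) -> List[Tuple[Any, Optional[Tuple[str, ...]], bool]]:
    """Pair alternating (argument, index) inputs, blockwise-style.

    Hypothetical helper, not Dask's implementation: an index of None marks
    the argument as a literal that is passed to the function unchanged.
    """
    if len(args) % 2:
        raise ValueError("arguments must come in (argument, index) pairs")
    pairs = []
    for arg, ind in zip(args[::2], args[1::2]):
        if ind is None:
            pairs.append((arg, None, True))  # literal: stored in the graph as-is
        else:
            pairs.append((arg, tuple(ind), False))  # array indexed by e.g. "x"
    return pairs


for arg, ind, is_literal in split_blockwise_args("ones-array", "x", 3.5, None):
    print(arg, ind, is_literal)
# ones-array ('x',) False
# 3.5 None True
```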
I agree, this is a regression, and if it is something that is useful for people we should support it :)
This is quite critical/useful to my project (I am already using this behaviour), so a definite thumbs up from me.
@madsbk do you have a sense for what the best path forward is?
1. Have Blockwise.__dask_distributed_pack__ use dask-serialize.
2. Teach dumps_msgpack how to handle ndarrays.
3. Have highlevelgraph_pack and highlevelgraph_unpack use (or optionally fall back to) Dask's serialization?
We've been discussing optimizing the serialization pipeline to go through MsgPack more directly (with fewer passes over the data) ( https://github.com/dask/distributed/issues/4379 ). The goal is to improve serialization performance and simplify that code. This would effectively include option 2, though it doesn't preclude us from doing other things as well in the interim.
It seems that msgpack-numpy is an adequate workaround in the interim. Not sure if it is something to rely on, but it unblocks me for now.
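msgpack's packb and unpackb accept default and object_hook callbacks, and msgpack-numpy works by supplying such hooks; teaching the encoder about ndarrays has the same general shape. Since msgpack may not be installed, the sketch below shows the hook pattern with the standard library's json module, which uses the same parameter names, with array.array standing in for a typed NumPy array. The tag key "__typed_array__" and both helper names are arbitrary choices for illustration.

```python
import array
import json
from typing import Any


def encode_typed_array(obj: Any) -> Any:
    """`default` hook: turn a typed array into a tagged, JSON-safe dict."""
    if isinstance(obj, array.array):
        return {"__typed_array__": True, "typecode": obj.typecode, "data": obj.tolist()}
    raise TypeError(f"cannot serialize {type(obj).__name__!r} object")


def decode_typed_array(obj: dict) -> Any:
    """`object_hook`: rebuild typed arrays from their tagged form."""
    if obj.get("__typed_array__"):
        return array.array(obj["typecode"], obj["data"])
    return obj


# "H" is unsigned short (2 bytes on common platforms), similar to uint16
payload = {"literal": array.array("H", [0, 1, 2])}
text = json.dumps(payload, default=encode_typed_array)
restored = json.loads(text, object_hook=decode_typed_array)
print(restored["literal"])  # array('H', [0, 1, 2])
```

Note that both ends must agree on the hooks; a decoder without object_hook would hand back the tagged dict instead of an array.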
Sorry if I overlooked it, but what NumPy version is this using?
numpy==1.20.0
Yeah NumPy 1.20 made some significant changes with dtypes. It might be worth retrying with 1.19 to see if this is related to those changes in NumPy
Sure! Will let you know what happens.
I have tested numpy==1.19.5 and, unfortunately, the problem persists.
Interesting, thanks for the update.
What if np.dtype("u2") is used instead? Or even just "u2"? Maybe there is something peculiar about np.uint16 as opposed to a dtype object.
Or perhaps it is just the scalar value? Can 0 be used without typing?
What if np.dtype("u2") is used instead? Or even just "u2"? Maybe something peculiar about np.uint16 as opposed to a dtype object
I have tried this - it doesn't matter what the specific type is. I have tried several and they all fail (np.float64 and np.dtype('f8') both fail, as an example). I think it just boils down to none of them being msgpack serializable.
Or perhaps it is just the scalar value? Can 0 be used without typing?
Scalar values work but likely only because msgpack can cope with Python's builtins. np.float32(0) does not work, for example.
Right, so I think we are getting to the crux of the issue. Namely, I don't think Dask (currently) handles serialization of NumPy scalars. I think this support could be added and should be pretty straightforward. Is this something you would be interested in doing? 🙂
I don't believe this only affects numpy scalars and arrays, as the following example demonstrates:
import numpy as np
import dask
import dask.array as da
from distributed import Client
class A:
    def __init__(self, value):
        self._value = value

    def __reduce__(self):
        return (A, (self._value,))

    @property
    def value(self):
        return self._value

def fn(data, a):
    return data * a.value

def test():
    data = da.ones(1000, chunks=100)
    a = A(2)
    xformed = da.blockwise(fn, "x",
                           data, "x",
                           a, None,
                           dtype=data.dtype)
    client = Client(processes=False)
    # Succeeds
    result = client.compute(xformed, optimize_graph=True)
    # Fails
    result = client.compute(xformed, optimize_graph=False)

if __name__ == "__main__":
    test()
With optimisation (the default distributed/dask behaviour for a long time), the above succeeds, because distributed.protocol.serialize.{serialize,deserialize} is invoked on the optimised low-level graph. distributed.protocol.serialize.{serialize,deserialize} can defer to custom serialisation routines, in addition to standard pickle. blockwise does support embedding literals (presumably of any type) in the graph.
Without optimisation, the above fails because distributed.protocol.highlevelgraph.highlevelgraph_pack is invoked on the HLG, which only supports msgpack as a serialisation protocol.
This regression is only exposed when graph optimisation is turned off.
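The contrast between the two code paths can be sketched with the standard library alone. Here pickle stands in for the generic serialization used on the optimised low-level graph, and a simplified, shallow isinstance check stands in for what a hook-free msgpack encoder accepts. fractions.Fraction replaces the class A from the report so the sketch has no Dask dependency; MSGPACK_NATIVE, survives_pickle and msgpack_native are hypothetical names.

```python
import pickle
from fractions import Fraction

# Simplified, assumed list of types msgpack encodes without a `default` hook.
MSGPACK_NATIVE = (type(None), bool, int, float, str, bytes, list, tuple, dict)


def survives_pickle(obj) -> bool:
    """Round-trip obj through pickle, the kind of fallback generic
    serialization can use for arbitrary Python objects."""
    return pickle.loads(pickle.dumps(obj)) == obj


def msgpack_native(obj) -> bool:
    """Shallow check: would a hook-free msgpack encoder accept obj?"""
    return isinstance(obj, MSGPACK_NATIVE)


literal = Fraction(1, 3)  # stand-in for a user-defined literal such as A(2)
print(survives_pickle(literal), msgpack_native(literal))  # True False
```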
I think the way forward here is to determine why msgpack is the only serialisation option for HLGs. IIRC this is to avoid remote code execution on the scheduler?
If not, would it be possible to use distributed's standard serialisation routines for HLG serialisation?
I believe (though Mads may correct me) the reason for HLG using only msgpack is to keep serialization efficient.
There's some additional overhead we have been finding in Dask's custom serialization, but we are looking at reworking that to be more performant ( https://github.com/dask/distributed/pull/4531 )
I think the way forward here is to determine why msgpack is the only serialisation option for HLGs. IIRC this is to avoid remote code execution on the scheduler?
Yes, this is primarily to avoid remote code execution on the scheduler.
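For context on why pickle is kept off the scheduler: unpickling calls whatever importable callable the payload names via __reduce__. The sketch below uses the harmless built-in print as that callable; a malicious payload could name something far more dangerous, which is the class of problem a msgpack-only scheduler avoids. The Payload class is purely illustrative.

```python
import contextlib
import io
import pickle


class Payload:
    """A pickled object can name an arbitrary importable callable to run on load."""

    def __reduce__(self):
        # Harmless stand-in; a malicious payload could name e.g. os.system here.
        return (print, ("this ran inside pickle.loads",))


blob = pickle.dumps(Payload())

captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    result = pickle.loads(blob)  # calls print(...) while unpickling

print(repr(captured.getvalue()))  # 'this ran inside pickle.loads\n'
print(result)  # None: the "object" is just print's return value
```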
Since the literals are not needed by the scheduler but by the workers, we can make Blockwise.__dask_distributed_pack__ serialize them using to_serialize(). This should be fairly straightforward to handle once #4531 is in. If there is no rush, I can put it on my to do list.
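A minimal sketch of that idea, under stated assumptions: pickle stands in for distributed's to_serialize(), and pack_literal, unpack_literal_on_worker and the "__opaque_literal__" tag are hypothetical names. Non-native literals become an opaque bytes blob that a msgpack-only scheduler can store and forward without deserializing; only the worker calls pickle.loads. This is not necessarily how the eventual fix works.

```python
import pickle
from fractions import Fraction
from typing import Any

# Assumed simplification of what msgpack can encode without hooks.
NATIVE = (type(None), bool, int, float, str, bytes)

OPAQUE_TAG = "__opaque_literal__"  # hypothetical marker key


def pack_literal(value: Any) -> Any:
    """Client side: keep native values as-is, wrap everything else.

    The wrapped form is a plain dict of str -> bytes, so a msgpack-only
    scheduler can store and forward it without ever unpickling it.
    """
    if isinstance(value, NATIVE):
        return value
    return {OPAQUE_TAG: pickle.dumps(value)}


def unpack_literal_on_worker(packed: Any) -> Any:
    """Worker side: only here is the opaque payload deserialized."""
    if isinstance(packed, dict) and OPAQUE_TAG in packed:
        return pickle.loads(packed[OPAQUE_TAG])
    return packed


packed = pack_literal(Fraction(2, 3))
print(type(packed[OPAQUE_TAG]).__name__)  # bytes (what the scheduler sees)
print(unpack_literal_on_worker(packed))  # 2/3
print(unpack_literal_on_worker(pack_literal(7)))  # 7
```

The design choice is that trust moves to the worker, which already executes user code, while the scheduler only ever handles msgpack-level data.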
Thanks @madsbk, I would appreciate that! As mentioned, I have managed to use msgpack-numpy to unblock myself in the interim so I am not in a particular rush.
Turns out that my msgpack-numpy trick only works in some cases. So this is perhaps a little more urgent than I thought.
@JSKenyon can you please test with Mads' PR ( https://github.com/dask/dask/pull/7353 ) (just merged)? 🙂
@jakirkham Will do! If not today, then tomorrow.
This seems to have resolved the problem! Thanks @madsbk! I will be doing further testing over the coming week and will let you know if I encounter any further issues.
Awesome, thanks @JSKenyon