I'm encountering the following exception when trying to perform a custom tree-reduce on a set of large cupy objects. I don't get this exception with smaller objects, so I have not been able to reproduce it with the normal pytests. The n_features in the HashingVectorizer that is used as input to the Naive Bayes pytest, however, can be modified to 8M in order to reproduce this exception.
Traceback (most recent call last):
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 255, in deserialize
deserializers=deserializers,
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 268, in deserialize
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 608, in deserialize
v = deserialize(h, f)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 268, in deserialize
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 63, in cuda_deserialize_cupy_ndarray
frame = PatchedCudaArrayInterface(frame)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.utils - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
yield
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/comm/ucx.py", line 207, in read
frames, deserialize=self.deserialize, deserializers=deserializers
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
res = await offload(_from_frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
frames, deserialize=deserialize, deserializers=deserializers
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 255, in deserialize
deserializers=deserializers,
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 268, in deserialize
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 608, in deserialize
v = deserialize(h, f)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 268, in deserialize
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 63, in cuda_deserialize_cupy_ndarray
frame = PatchedCudaArrayInterface(frame)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.worker - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
I believe this is the same as https://github.com/rapidsai/ucx-py/issues/421, and while it only occurs when protocol=ucx, the stack trace is giving me all dask.distributed errors, so I've opted to start a fresh thread here.
cc @jakirkham
I think I found the culprit here. I raised this value [1] to 2^28 and everything seems to execute successfully. @jakirkham pointed out that it looks like something is modifying the frames but not the header and I've verified the large frames that are causing the issue are passing through [2] inside frame_split_size.
I also verified this is where it's being called [3] and I don't see the header being updated to reflect that the cupy array being serialized is now on host. However, IIUC, modifying the header here would mean the deserializer would produce a different object than what was serialized.
I'd think we would still want a CuPy array to be returned from the deserializer, right? This would imply that we either want the CuPy deserializer to understand how to deserialize memoryview objects, or that we don't want cuda frames moved to host when they are broken into smaller chunks.
cc @pentschev @quasiben
[1] https://github.com/dask/distributed/blob/master/distributed/protocol/utils.py#L6
[2] https://github.com/dask/distributed/blob/master/distributed/protocol/utils.py#L49
[3] https://github.com/dask/distributed/blob/master/distributed/protocol/core.py#L56
@cjnolet I'm confused about how the byte array is getting to frame_split_size to begin with. Is the dask serializer being used in cuML ?
@quasiben, it's a cupy ndarray all the way through the frame_split_size. I believe it might be getting turned into the byte array in the msgpack serialization, somehow.
We are using the dask serializer and also the cuda serializer in cuml. This error seems to only happen when UCX protocol is used in Dask, however.
I would be surprised if a cupy array arrived at msgpack. I would certainly expect this to fail.
The frame_split_size function was designed to cut up very large frames (some communication and compression libraries can't handle things above a certain size). In Numpy+TCP-land we cut up large memoryviews into many smaller memoryviews. I don't know what CuPy+UCX does though in this case.
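A minimal sketch of the split-then-merge idea described above, for host memory only. The names split_frame and merge_chunks and the 4-byte shard size are made up for illustration; this is not the code in distributed/protocol/utils.py.

```python
def split_frame(frame, n):
    """Cut one bytes-like frame into zero-copy memoryview chunks of at most n bytes."""
    mv = memoryview(frame).cast("B")  # view the buffer as raw bytes
    return [mv[i:i + n] for i in range(0, mv.nbytes, n)]


def merge_chunks(chunks):
    """Join the chunks back into a single host bytes object (this copies)."""
    return b"".join(chunks)


payload = bytes(range(10))
chunks = split_frame(payload, 4)
print([c.nbytes for c in chunks])  # [4, 4, 2]

restored = merge_chunks(chunks)
print(restored == payload)         # True
```

Note that the merge step always produces host bytes, which is harmless for NumPy+TCP but is exactly the kind of step that would be a problem if a device buffer were fed through it.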
I'm running the MRE posted in https://github.com/rapidsai/ucx-py/issues/421 and will get back with an update shortly.
@mrocklin, my mistake. I saw the msgpack.dumps at the end of protocol.core.dumps and assumed it was doing this for the payload.
The part I find weird is that everything works when my arrays are smaller than BIG_BYTES_SHARD_SIZE and the serialization fails with a mismatched header in the deserializer when the threshold is surpassed. Increasing the value of BIG_BYTES_SHARD_SIZE appears to cause successful execution (not saying we need to change this value, btw).
I ran with latest ucx-py/distributed/cuml and was not able to reproduce the error
Sorry, not the latest cuml:
cuml 0.13.0a200313 cuda10.1_py37_1380 rapidsai-nightly
libcuml 0.13.0a200313 cuda10.1_1380 rapidsai-nightly
libcumlprims 0.13.0a200313 cuda10.1_11 rapidsai-nightly
@quasiben Did you make the changes outlined in the GitHub issue?
@quasiben, actually, here's a simpler reproducible example:
import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cupy as cp
cluster = LocalCUDACluster(protocol="ucx")
client = Client(cluster)
def make_large_data(x):
    return cp.random.random((5, 8000000)), cp.random.random((5))
fut = [client.submit(make_large_data, i) for i in range(5)]
client.compute(fut, sync=True)
This is the exception I get:
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/core.py", line 125, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 258, in deserialize
deserializers=deserializers,
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 271, in deserialize
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 63, in cuda_deserialize_cupy_ndarray
frame = PatchedCudaArrayInterface(frame)
File "/raid/cnolet/miniconda3/envs/cuml_dev_013/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.utils - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
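For scale, here is a back-of-the-envelope check of how big the larger array in the reproducer is. It assumes float64 elements (the default for cp.random.random) and, purely for comparison, uses the 2^28 value tried earlier in the thread as a hypothetical shard size.

```python
import math

rows, cols = 5, 8_000_000
itemsize = 8  # bytes per float64 element

nbytes = rows * cols * itemsize
raised_threshold = 2 ** 28  # the value tried earlier in this thread

print(nbytes)                                # 320000000
print(raised_threshold)                      # 268435456
print(nbytes > raised_threshold)             # True
print(math.ceil(nbytes / raised_threshold))  # 2
```

So a single (5, 8000000) array is about 320 MB, large enough that a splitter working in pieces of at most 2^28 bytes would still cut it in two.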
Thanks for that simple reproducer Corey! 😄
Lol, it took me a while to figure out which behavior was triggering it so that I could reproduce it.
Still digging through this but I think it's possible we are inadvertently calling memoryview on part of the data during merge_frames:
FWIW CuPy doesn't support the Python Buffer Protocol ( https://github.com/cupy/cupy/issues/1532 ). So if it got there, we would see an error. Not sure how Distributed would handle that (if at all).
In [1]: import cupy
In [2]: a = cupy.arange(5)
In [3]: memoryview(a)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-3-dd900d27ccee> in <module>
----> 1 memoryview(a)
TypeError: memoryview: a bytes-like object is required, not 'cupy.core.core.ndarray'
I think I found the issue. With large and irregular sizes, merge_frames contains logic that calls ensure_bytes:
ensure_bytes calls bytes, not on a CuPy array, but rather on an rmm.DeviceBuffer:
bytes(rmm.DeviceBuffer(size=10))
We can avoid this routine by a check at the end of the merge_frames routine:
    else:
        if any([hasattr(f, "__cuda_array_interface__") for f in L]):
            out.extend(L)
        else:
            out.append(b"".join(map(ensure_bytes, L)))
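To make the effect of that check concrete without a GPU, here is a rough sketch using a pure-Python stand-in. FakeDeviceFrame and join_frames are hypothetical names invented for this illustration; they only mimic the relevant behavior (calling bytes on a device buffer yields a host bytes object) and are not the actual merge_frames code.

```python
class FakeDeviceFrame:
    """Hypothetical stand-in for a GPU buffer such as rmm.DeviceBuffer."""

    def __init__(self, data):
        self._data = bytes(data)
        # Real device objects expose a dict describing device memory here.
        self.__cuda_array_interface__ = {
            "shape": (len(self._data),),
            "typestr": "|u1",
        }

    def __bytes__(self):
        # Mimics a device-to-host copy: the result is a plain bytes object.
        return self._data


def join_frames(parts, keep_device_frames):
    """Recombine the pieces of one logical frame.

    With keep_device_frames=False, everything is coerced to host bytes.
    With keep_device_frames=True, the pieces are passed through untouched
    if any of them advertises __cuda_array_interface__.
    """
    if keep_device_frames and any(
        hasattr(p, "__cuda_array_interface__") for p in parts
    ):
        return list(parts)
    return [b"".join(bytes(p) for p in parts)]


parts = [FakeDeviceFrame(b"abcd"), FakeDeviceFrame(b"ef")]

naive = join_frames(parts, keep_device_frames=False)
print(type(naive[0]).__name__, hasattr(naive[0], "__cuda_array_interface__"))
# bytes False

guarded = join_frames(parts, keep_device_frames=True)
print(all(hasattr(p, "__cuda_array_interface__") for p in guarded))
# True
```

The naive path hands the deserializer a bytes object with no __cuda_array_interface__, which matches the AttributeError in the tracebacks. The guarded path keeps the device objects, but it also returns more frames than the header describes.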
That's a nice find Ben! 😄
IIUC this function is used to combine frames into their original sizes. So how do we wind up in merge_frames from the UCX side? Are we splitting and merging the frames there somehow?
Thank you for digging into this, Ben!
I ran the Naive Bayes example again after making the change and I'm seeing some slightly strange behavior. I modified the reproducer script so that I could inspect the values in the resulting arrays, but I'm getting an exception. It's possible there's still something being done on the UCX side that is causing this one.
import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cupy as cp
if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    def make_large_data(x):
        return cp.ones((5, 8000000)), cp.ones((5))
    fut = [client.submit(make_large_data, i) for i in range(5)]
    local = [cp.sum(f[0], axis=1) for f in client.compute(fut, sync=True)]
    print(str(local))
Here's the exception:
Traceback (most recent call last):
File "reproduce.py", line 19, in <module>
print(str(local))
File "cupy/core/core.pyx", line 1363, in cupy.core.core.ndarray.__repr__
File "cupy/core/core.pyx", line 1456, in cupy.core.core.ndarray.get
File "cupy/core/core.pyx", line 1462, in cupy.core.core.ndarray.get
File "cupy/cuda/memory.pyx", line 449, in cupy.cuda.memory.MemoryPointer.copy_to_host
File "cupy/cuda/runtime.pyx", line 378, in cupy.cuda.runtime.memcpy
File "cupy/cuda/runtime.pyx", line 201, in cupy.cuda.runtime.check_status
cupy.cuda.runtime.CUDARuntimeError: cudaErrorIllegalAddress: an illegal memory access was encountered
Traceback (most recent call last):
File "cupy/cuda/driver.pyx", line 247, in cupy.cuda.driver.moduleUnload
File "cupy/cuda/driver.pyx", line 118, in cupy.cuda.driver.check_status
cupy.cuda.driver.CUDADriverError: CUDA_ERROR_ILLEGAL_ADDRESS: an illegal memory access was encountered
Exception ignored in: 'cupy.cuda.function.Module.__dealloc__'
Traceback (most recent call last):
File "cupy/cuda/driver.pyx", line 247, in cupy.cuda.driver.moduleUnload
File "cupy/cuda/driver.pyx", line 118, in cupy.cuda.driver.check_status
cupy.cuda.driver.CUDADriverError: CUDA_ERROR_ILLEGAL_ADDRESS: an illegal memory access was encountered
Can you guys see if this error occurs for you?
> We can avoid this routine by a check at the end of the merge_frames routine:
Alternatively, maybe with UCX we shouldn't be splitting and merging frames at all?
@cjnolet, yup I am seeing that as well.
@mrocklin @jakirkham seems like you two both want the same thing :)
I just verified that not splitting frames that have __cuda_array_interface__ appears to make the reproducible example succeed.
if nbytes(frame) > n and not hasattr(frame, "__cuda_array_interface__"):
https://github.com/dask/distributed/blob/master/distributed/protocol/utils.py#L41
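As a rough illustration of that condition, the sketch below skips splitting for anything that advertises __cuda_array_interface__ and splits ordinary host buffers as before. split_unless_device and DeviceLike are hypothetical names for this example only, and the 4-byte limit is arbitrary.

```python
def split_unless_device(frame, n):
    """Return a list of chunks; device-backed frames are returned whole."""
    if hasattr(frame, "__cuda_array_interface__"):
        return [frame]  # leave device memory alone, whatever its size
    mv = memoryview(frame).cast("B")
    if mv.nbytes <= n:
        return [frame]
    return [mv[i:i + n] for i in range(0, mv.nbytes, n)]


class DeviceLike:
    """Hypothetical object that only advertises the CUDA array interface."""

    __cuda_array_interface__ = {"shape": (1000,), "typestr": "|u1"}


host = bytes(10)
dev = DeviceLike()

print(len(split_unless_device(host, 4)))     # 3
print(split_unless_device(dev, 4) == [dev])  # True
```

Because the device frame is never cut, there is nothing to merge on the receiving side, so the frames stay consistent with the header.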
I think we want to check whether we are using UCX, not whether the frame has __cuda_array_interface__.
Sounds good to me! I just wanted to verify that it works when the frame has not been split
@cjnolet, can you please give PR ( https://github.com/dask/distributed/pull/3584 ) a try? 🙂