Describe the bug
I am running into issues when I try to repartition with multiple workers (>= 4), even though I have enough memory; I am at just 43% capacity.
This seems to work fine if I have just 2 workers.
Steps/Code to reproduce bug
Helper Functions
import cudf
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import numpy as np
import pandas as pd
import io
n_cols = 40
#n_rows per worker
n_rows = 4_850_000
n_parts_per_worker = 8
# approx 38.8 million rows per worker
dtypes = dict(zip([str(i) for i in range(0,n_cols)],[np.int32]*(n_cols)))
df = pd.read_csv(io.StringIO(""),names=list(dtypes.keys()), dtype=dtypes)
meta_df = cudf.from_pandas(df)
#works with 2 workers but fails at 4
n_workers = 4
# Create Cluster
cluster = LocalCUDACluster(n_workers=n_workers)
client = Client(cluster)
## DataFrame Helper Function
def create_df(n_rows, n_cols):
    df = cudf.DataFrame()
    for col_id in range(0, n_cols):
        df[str(col_id)] = np.ones(shape=n_rows, dtype=np.int32)
    return df
Create dataframe
# Create Data Frame
parts = [dask.delayed(create_df)(n_rows,n_cols=n_cols) for i in range(0,n_workers*n_parts_per_worker)]
df = dask_cudf.from_delayed(parts,meta=meta_df)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
! nvidia-smi
nvidia-smi output
Wed Jul 17 11:56:55 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.48 Driver Version: 410.48 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla P100-SXM2... On | 00000000:06:00.0 Off | 0 |
| N/A 35C P0 48W / 300W | 7104MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla P100-SXM2... On | 00000000:07:00.0 Off | 0 |
| N/A 35C P0 49W / 300W | 6757MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla P100-SXM2... On | 00000000:0A:00.0 Off | 0 |
| N/A 32C P0 46W / 300W | 6757MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla P100-SXM2... On | 00000000:0B:00.0 Off | 0 |
| N/A 32C P0 44W / 300W | 6757MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 4 Tesla P100-SXM2... On | 00000000:85:00.0 Off | 0 |
| N/A 32C P0 33W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 5 Tesla P100-SXM2... On | 00000000:86:00.0 Off | 0 |
| N/A 31C P0 34W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 6 Tesla P100-SXM2... On | 00000000:89:00.0 Off | 0 |
| N/A 31C P0 35W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 7 Tesla P100-SXM2... On | 00000000:8A:00.0 Off | 0 |
| N/A 32C P0 32W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 59424 C ...naconda3/envs/rapids_nightly/bin/python 347MiB |
| 0 59468 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
| 1 59472 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
| 2 59466 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
| 3 59470 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
+-----------------------------------------------------------------------------+
Repartition df
## repartition Df
## Run into OOM issues at repartition
df = df.repartition(npartitions=n_workers)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
Error Trace
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-3-219dd226ab30> in <module>
4 df = df.persist()
5 wait(df)
----> 6 print("len of df = {:,}".format(len(df)))
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
510 def __len__(self):
511 return self.reduction(
--> 512 len, np.sum, token="len", meta=int, split_every=False
513 ).compute()
514
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
173 dask.base.compute
174 """
--> 175 (result,) = compute(self, traverse=False, **kwargs)
176 return result
177
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2525 should_rejoin = False
2526 try:
-> 2527 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2528 finally:
2529 for f in futures.values():
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1821 direct=direct,
1822 local_worker=local_worker,
-> 1823 asynchronous=asynchronous,
1824 )
1825
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
761 else:
762 return sync(
--> 763 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
764 )
765
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
330 e.wait(10)
331 if error[0]:
--> 332 six.reraise(*error[0])
333 else:
334 return result[0]
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in f()
315 if callback_timeout is not None:
316 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317 result[0] = yield future
318 except Exception as exc:
319 error[0] = sys.exc_info()
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
740 if exc_info is not None:
741 try:
--> 742 yielded = self.gen.throw(*exc_info) # type: ignore
743 finally:
744 # Break up a reference to itself
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1678 exc = CancelledError(key)
1679 else:
-> 1680 six.reraise(type(exception), exception, traceback)
1681 raise exc
1682 if errors == "skip":
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/methods.py in concat()
341 func = concat_dispatch.dispatch(type(dfs[0]))
342 return func(
--> 343 dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
344 )
345
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf()
31 assert axis == 0
32 assert join == "outer"
---> 33 return cudf.concat(dfs)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/multi.py in concat()
52
53 if typ is DataFrame:
---> 54 return DataFrame._concat(objs, axis=axis, ignore_index=ignore_index)
55 elif typ is Series:
56 return Series._concat(objs, axis=axis)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in _concat()
1444 data = [
1445 (c, Series._concat([o[c] for o in objs], index=index))
-> 1446 for c in objs[0].columns
1447 ]
1448 out = cls(data)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in <listcomp>()
1444 data = [
1445 (c, Series._concat([o[c] for o in objs], index=index))
-> 1446 for c in objs[0].columns
1447 ]
1448 out = cls(data)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/series.py in _concat()
1035 else:
1036 name = None
-> 1037 col = Column._concat([o._column for o in objs])
1038 return cls(data=col, index=index, name=name)
1039
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/column.py in _concat()
110 # Performance the actual concatenation
111 if newsize > 0:
--> 112 col = _column_concat(objs, col)
113
114 return col
cudf/bindings/concat.pyx in cudf.bindings.concat._column_concat()
RuntimeError: CUDA error encountered at: /conda/envs/gdf/conda-bld/libcudf_1563314405241/work/cpp/src/column/legacy/column.cpp:101: 11 cudaErrorInvalidValue invalid argument
Environment overview (please complete the following information)
cudf 0.9.0a py37_1312 rapidsai-nightly
dask-cudf 0.9.0a1 py37_0 rapidsai-nightly/label/cuda10.0
libcudf 0.9.0a cuda10.0_1312 rapidsai-nightly
dask-cuda 0.9.0a0+17.g3057f94.dirty pypi_0 pypi
Edit: Added Nvidia-smi output.
Edit 2: Removed OOM from heading as this seems to be unrelated.
Sorry for the confusion
Is this issue deterministic? I just ran the example above on a DGX with 16GB GPUs and I can't reproduce that. nvidia-smi also doesn't report memory growing past 7399MB for me.
For completeness, I tested this on nightly build.
@pentschev, can you provide the exact versions you are on? I am on the nightly too (just updated everything).
This seems to be non-deterministic for me.
Also, this might be unrelated to OOM, because memory seems to peak at 12000 MiB for me, yet I still cannot repartition successfully.
I have updated the heading to reflect that.
%%bash
conda list | grep 'cudf'
conda list | grep 'dask'
cudf 0.9.0a py37_1412 rapidsai-nightly
dask-cudf 0.9.0a1 py37_0 rapidsai-nightly/label/cuda10.0
libcudf 0.9.0a cuda10.0_1412 rapidsai-nightly
dask 2.1.0 py_0
dask-core 2.1.0 py_0
dask-cuda 0.9.0a0+17.g3057f94.dirty pypi_0 pypi
dask-cudf 0.9.0a1 py37_0 rapidsai-nightly/label/cuda10.0
@VibhuJawa here's a list of what I'm using:
cudf 0.9.0a py37_1435 rapidsai-nightly
dask 2.1.0 py_0 conda-forge
dask-core 2.1.0 py_0 conda-forge
dask-cuda 0.9.0a0+17.g3057f94.dirty pypi_0 pypi
dask-cudf 0.9.0a1 py37_0 rapidsai-nightly
libcudf 0.9.0a cuda9.2_1435 rapidsai-nightly
I still couldn't reproduce this after 3-4 attempts just now.
The only major difference I see is that you're using CUDA 10; I'm not sure whether that's a big concern. Also, I just updated everything, so maybe your cudf build is from yesterday.
Alright, I was able to reproduce this, and it was my mistake that I couldn't before (I was only running the first two blocks of code, and missed the third one).
This is directly related to the issue in https://github.com/rapidsai/dask-cuda/issues/57. What happens here is the worker crashes due to the amount of data. Setting device_memory_limit may help by spilling it to disk, but it will make it slower.
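A minimal sketch of the device_memory_limit suggestion above; the 10 GB limit here is an illustrative value, not a recommendation, and the right setting depends on the card:

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# Hypothetical setup: 4 workers on 16 GB cards. Once a worker's device
# memory use passes the limit, Dask spills device objects to host memory.
cluster = LocalCUDACluster(
    n_workers=4,
    device_memory_limit="10GB",
)
client = Client(cluster)
```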
I discussed offline with @VibhuJawa, and for such pipelines we have two options:
Both will incur overhead, and this may be very dependent on the algorithm in question, but I tend to believe that option 1 will perform better.
I have been working on benchmarking alternatives for spilling memory in https://github.com/rapidsai/dask-cuda/issues/92, but the outlook isn't great. Besides the cost of copying the memory to host, there's also a cost in serializing that memory. For an idea of the current status, spilling to host currently has a bandwidth of about 550 MB/s, in contrast to the real bandwidth of 6 GB/s we can achieve when using unpinned memory. I expect to be able to speed up serialization, but the actual spilling bandwidth will certainly be < 6 GB/s.
@pentschev ,
Apologies for the late follow up.
Just to be clear, could you make spill-over work with repartition on smaller chunks on a 16 GB card?
I changed the number of rows to just 37_890 and parts per worker to 1024 (we still have about 38 million rows per worker) and set device_memory_limit='10000 MiB', but I am still getting the same error.
## n_rows per worker
n_rows = 4_850_000 // 128  # 37_890
n_parts_per_worker = 8 * 128  # 1024
Why do you want only one partition per worker? This seems possibly inefficient. For example, what happens if a few of the partitions end up on the same worker? Dask makes no guarantees about behavior here.
So say I have a 16 GB card and 7 GB of memory per worker spread among lots of partitions. Then I move the data around so that I can repartition it. Let's say that in a pessimistic case most pieces of data have to move, so now I have 14 GB of memory occupied per worker. Then, just for a moment, I need to copy 7 GB of that data into a single 7 GB dataframe before I can release the 7 GB of smaller dataframes. Now I have, briefly, 21 GB of data on my 16 GB cards.
Perhaps this explains the problem?
Dask's scheduling heuristics aren't good at playing very close to the line like this. I recommend keeping many small partitions. Things work more smoothly. If you need giant chunks for some reason, then Dask may not be a good fit. This approach is more common in MPI workloads.
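The back-of-envelope arithmetic above can be written out explicitly; the numbers are the hypothetical ones from the comment (a 16 GB card, 7 GB of partitions per worker, and a pessimistic shuffle where most data moves):

```python
card_gb = 16       # device memory per GPU
resident_gb = 7    # partitions already on the worker
moved_gb = 7       # partitions shuffled in from other workers
output_gb = 7      # concatenated output frame, allocated before inputs are freed

peak_gb = resident_gb + moved_gb + output_gb
print(peak_gb, peak_gb > card_gb)  # 21 True: briefly over the 16 GB card
```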
Is there a general estimate of how close we can get to the limit? Usually in host-based workflows there is a sweet spot for the number of partitions (not too many, not too few) that gives the best performance.
My experience running different workflows with dask_cudf has been that performance keeps improving as we increase the partition size to be as large as possible without going OOM. So while optimizing for performance I often keep pushing for giant chunks until I get memory errors.
Our general recommendations are here: https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions . Note that in your case I'm assuming that you're running with one thread rather than 10, as in that example.
You should have space for a few times more chunks to be in memory than you have concurrent tasks.
My understanding from @kkraus14 was that performance benefits should drop off a bit after you reach a large enough size to saturate all of the thread blocks. The estimate I was given was that this was likely to occur in the few hundred MB range. I don't know what's optimal though, it sounds like you have evidence against this.
In your case, I'd recommend chunks in the gigabyte range, unless there is some very pressing need to go larger.
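For scale, a rough per-partition size for the reproducer above (4,850,000 rows by 40 int32 columns per partition), ignoring index and allocator overhead:

```python
import numpy as np

n_rows = 4_850_000
n_cols = 40
itemsize = np.dtype(np.int32).itemsize  # 4 bytes per int32

bytes_per_partition = n_rows * n_cols * itemsize
print(bytes_per_partition / 1e6)  # 776.0, i.e. ~776 MB, already near the gigabyte range
```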
Why do you want only one partition per worker? This seems possibly inefficient. For example, what happens if a few of the partitions end up on the same worker? Dask makes no guarantees about behavior here.
I wanted to repartition this because I was sending it to Dask-XGBoost to train, which concats the dataframes on each worker before training, and I wanted to side-step that by doing this repartition.
See: https://github.com/rapidsai/dask-xgboost/blob/dask-cudf/dask_xgboost/core.py#L67-L69
@mt-jones, can you please confirm whether doing this prevents the memory spike we were seeing, or is it completely unrelated?
I have a 16 GB card, and 7 GB of memory per worker spread among lots of partitions. Then I move the data around so that I can repartition it. Let's say that in a pessimistic case most pieces of data have to move, so now I have 14 GB of memory occupied per worker. Then, just for a moment, I need to copy 7 GB of that data into a single 7 GB dataframe before I can release the 7 GB of smaller dataframes. Now I have, briefly, 21 GB of data on my 16 GB cards.
Perhaps this explains the problem?
Yup, that explains it, but can we not side-step that problem by making the config changes below?
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})
On the performance aspects of large chunks vs. small chunks, my experience is similar to @ayushdg's.
I ran some performance tests on a workflow yesterday, which I can share on side channels, that show the performance drop with decreasing chunk size.
I do not immediately see how those changes would avoid the situation I'm talking about above.
The bit that does this in XGBoost intentionally looks at the data that is already on the workers and concatenates that. That should remove the extra 7GB from moving data around. This is unrelated to the config settings you mention.
I ran some performance tests on a workflow yesterday, which I can share on side channels, that show the performance drop with decreasing chunk size.
FWIW I think that it'd be great to show such a benchmark in a public place like a GitHub issue if you're able to make something that is shareable.
FWIW I think that it'd be great to show such a benchmark in a public place like a GitHub issue if you're able to make something that is shareable.
Okay, let me try to run the same operations on some dummy data and try to make something shareable.
Also, I suspect the conversation will engage more people if it doesn't include Dask. It would be interesting to see how cudf operations react as data sizes get larger. If you can show performance degradation with small chunks using just cudf, then you'll have a lot of people who can engage with the problem. If Dask is involved, then it's probably just a couple of us (most of whom are saturated).
This is resolved as of 0.14.