Cudf: [BUG] `repartition` failing on multiple-workers

Created on 17 Jul 2019  路  19Comments  路  Source: rapidsai/cudf

Describe the bug
I am running into issues when i try to re partition on (multiple workers>=4) even though i have enough memory , I am at just 43 % capacity.

This seems to work fine if i have just 2 workers.

Steps/Code to reproduce bug
Helper Functions

import cudf
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

import numpy as np
import pandas as pd
import numpy as np
import io


n_cols = 40
#n_rows per worker 
n_rows = 4_850_000 
n_parts_per_worker = 8
# appx 33 Mill per worker

dtypes = dict(zip([str(i) for i in range(0,n_cols)],[np.int32]*(n_cols)))
df = pd.read_csv(io.StringIO(""),names=list(dtypes.keys()), dtype=dtypes)
meta_df = cudf.from_pandas(df)


#works with 2 workers but fails at 4
n_workers = 4
# Create Cluster
cluster = LocalCUDACluster(n_workers=n_workers)
client = Client(cluster)


## DataFrame Helper Function
def create_df(n_rows,n_cols):
    df=cudf.DataFrame()
    for col_id in range(0,n_cols):
        df[str(col_id)]= np.ones(shape=n_rows,dtype=np.int32)
    return df

Create dataframe

# Create Data Frame
parts = [dask.delayed(create_df)(n_rows,n_cols=n_cols) for i in range(0,n_workers*n_parts_per_worker)]
df = dask_cudf.from_delayed(parts,meta=meta_df)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
! nvidia-smi

nvidia-smi output

Wed Jul 17 11:56:55 2019       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.48                 Driver Version: 410.48                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   35C    P0    48W / 300W |   7104MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla P100-SXM2...  On   | 00000000:07:00.0 Off |                    0 |
| N/A   35C    P0    49W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla P100-SXM2...  On   | 00000000:0A:00.0 Off |                    0 |
| N/A   32C    P0    46W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   3  Tesla P100-SXM2...  On   | 00000000:0B:00.0 Off |                    0 |
| N/A   32C    P0    44W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   4  Tesla P100-SXM2...  On   | 00000000:85:00.0 Off |                    0 |
| N/A   32C    P0    33W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   5  Tesla P100-SXM2...  On   | 00000000:86:00.0 Off |                    0 |
| N/A   31C    P0    34W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   6  Tesla P100-SXM2...  On   | 00000000:89:00.0 Off |                    0 |
| N/A   31C    P0    35W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   7  Tesla P100-SXM2...  On   | 00000000:8A:00.0 Off |                    0 |
| N/A   32C    P0    32W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     59424      C   ...naconda3/envs/rapids_nightly/bin/python   347MiB |
|    0     59468      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    1     59472      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    2     59466      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    3     59470      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
+-----------------------------------------------------------------------------+

Repartition df

## repartition Df
## Run into OOM issues at repartition
df = df.repartition(npartitions=n_workers)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))

Error Trace

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-3-219dd226ab30> in <module>
      4 df = df.persist()
      5 wait(df)
----> 6 print("len of df = {:,}".format(len(df)))

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
    510     def __len__(self):
    511         return self.reduction(
--> 512             len, np.sum, token="len", meta=int, split_every=False
    513         ).compute()
    514 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2525                     should_rejoin = False
   2526             try:
-> 2527                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2528             finally:
   2529                 for f in futures.values():

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1821                 direct=direct,
   1822                 local_worker=local_worker,
-> 1823                 asynchronous=asynchronous,
   1824             )
   1825 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    761         else:
    762             return sync(
--> 763                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    764             )
    765 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330             e.wait(10)
    331     if error[0]:
--> 332         six.reraise(*error[0])
    333     else:
    334         return result[0]

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
    740                     if exc_info is not None:
    741                         try:
--> 742                             yielded = self.gen.throw(*exc_info)  # type: ignore
    743                         finally:
    744                             # Break up a reference to itself

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1678                             exc = CancelledError(key)
   1679                         else:
-> 1680                             six.reraise(type(exception), exception, traceback)
   1681                         raise exc
   1682                     if errors == "skip":

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/methods.py in concat()
    341         func = concat_dispatch.dispatch(type(dfs[0]))
    342         return func(
--> 343             dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
    344         )
    345 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf()
     31     assert axis == 0
     32     assert join == "outer"
---> 33     return cudf.concat(dfs)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/multi.py in concat()
     52 
     53     if typ is DataFrame:
---> 54         return DataFrame._concat(objs, axis=axis, ignore_index=ignore_index)
     55     elif typ is Series:
     56         return Series._concat(objs, axis=axis)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in _concat()
   1444         data = [
   1445             (c, Series._concat([o[c] for o in objs], index=index))
-> 1446             for c in objs[0].columns
   1447         ]
   1448         out = cls(data)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in <listcomp>()
   1444         data = [
   1445             (c, Series._concat([o[c] for o in objs], index=index))
-> 1446             for c in objs[0].columns
   1447         ]
   1448         out = cls(data)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/series.py in _concat()
   1035         else:
   1036             name = None
-> 1037         col = Column._concat([o._column for o in objs])
   1038         return cls(data=col, index=index, name=name)
   1039 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/column.py in _concat()
    110         # Performance the actual concatenation
    111         if newsize > 0:
--> 112             col = _column_concat(objs, col)
    113 
    114         return col

cudf/bindings/concat.pyx in cudf.bindings.concat._column_concat()

RuntimeError: CUDA error encountered at: /conda/envs/gdf/conda-bld/libcudf_1563314405241/work/cpp/src/column/legacy/column.cpp:101: 11 cudaErrorInvalidValue invalid argument

Environment overview (please complete the following information)

  • Method of cuDF install: [conda]
cudf                      0.9.0a                py37_1312    rapidsai-nightly
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0
libcudf                   0.9.0a            cuda10.0_1312    rapidsai-nightly
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi

Edit: Added Nvidia-smi output.

Edit 2: Removed OOM from heading as this seems to be unrelated.
Sorry for the confusion

bug cuDF (Python) dask

Most helpful comment

In your case, I'd recommend chunks in the gigabyte range, unless there is some very pressing need to go larger.

All 19 comments

Is this issue deterministic? I just ran the example above on a DGX with 16GB GPUs and I can't reproduce that. nvidia-smi also doesn't report memory growing past 7399MB for me.

For completeness, I tested this on nightly build.

@pentschev , Can you provide the exact versions that you are on , Cause i am on nightly too (just updated everything) .

This seems to be non-deterministic for me.

Also, this might be un-related to OOM cause it seems to be peaking at 12000 MiB for me but i can still not do a re partition successfully.

I have updated the heading to reflect that.

%%bash
conda list | grep 'cudf'
conda list | grep 'dask'
cudf                      0.9.0a                py37_1412    rapidsai-nightly
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0
libcudf                   0.9.0a            cuda10.0_1412    rapidsai-nightly
dask                      2.1.0                      py_0  
dask-core                 2.1.0                      py_0  
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0 

@VibhuJawa here's a list of what I'm using:

cudf                      0.9.0a                py37_1435    rapidsai-nightly
dask                      2.1.0                      py_0    conda-forge
dask-core                 2.1.0                      py_0    conda-forge
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly
libcudf                   0.9.0a             cuda9.2_1435    rapidsai-nightly

I still couldn't reproduce this after some 3-4 attempts just now

The only major difference I see is you're using CUDA 10, not sure if that is of big concern. Also, I just updated everything, so maybe your cudf build is from yesterday.

Alright, I was able to reproduce this, and it was my mistake that I couldn't before (I was only running the first two blocks of code, and missed the third one).

This is directly related to the issue in https://github.com/rapidsai/dask-cuda/issues/57. What happens here is the worker crashes due to the amount of data. Setting device_memory_limit may help by spilling it to disk, but it will make it slower.

I discussed offline with @VibhuJawa, and for such pipelines we have two options:

  1. Paying the price of working with smaller chunks; or
  2. Paying the price of spilling data to host more often.

Both will incur in overhead, and this may be very dependent on the algorithm in question, but I tend to believe that option 1 will tend to perform better.

I have been working on benchmarking alternatives for spilling memory in https://github.com/rapidsai/dask-cuda/issues/92, but the outlook isn't great. Besides the cost of copying the memory to host, there's also a cost into serializing that memory. For an idea of the current status, spilling to host has currently a bandwidth of about 550 MB/s, in contrast to the real bandwidth we can achieve of 6 GB/s when using unpinned memory. I expect to be able to speedup serialization, but the actual spilling bandwidth will certainly be < 6 GB/s.

@pentschev ,

Apologies for the late follow up.

Just to be clear, could you make spill over work with repartition on smaller chunks on a 16 gb card ?

I changed the number of rows to just 37_890 and parts per worker to 1024 . (We have about 38 million rows per worker) and set device_memory_limit='10000 MiB' but am still getting the same error.

## n_rows per worker 
n_rows = 4_850_000//128 (`37890`)
n_parts_per_worker = 8*128

Why do you want only one partition per worker? This seems possibly inefficient. For example, what happens if a few of the partitions end up on the same worker? Dask makes no guarantees about behavior here.

So if I have a 16GB card, and 7GB of memory per worker spread among lots of partitions. Then I move the data around so that I can repartition it. Lets say that in a pessimistic case most pieces of data have to move, so now I have 14 GB of memory occupied per worker. Then, just for a moment, I need to copy 7GB of that data into a single 7GB dataframe before I can release the 7GB of smaller dataframes. Now I have, briefly, 21 GB of data on my 16GB cards.

Perhaps this explains the problem?

Dask's scheduling heuristics aren't good at playing very close to the line like this. I recommend keeping many small partitions. Things work more smoothly. If you need giant chunks for some reason, then Dask may not be a good fit. This approach is more common in MPI workloads.

Is there a general estimate on how close can we be to the limit? Usually on host based workflows there is a sweet spot for the number of partitions (not too many, not too few) which gives the best performance.
My experience running different workflows using Dask_cudf has been that the performance seems to keep improving as we increase the partition size to being as large as possible without going OOM. So often while optimizing for performance I seem to keep pushing for giant chunks until I get memory errors.

Our general recommendations are here: https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions . Note that in your case I'm assuming that you're running with one thread rather than 10, as in that example.

You should have space for a few times more chunks to be in memory than you have concurrent tasks.

My understanding from @kkraus14 was that performance benefits should drop off a bit after you reach a large enough size to saturate all of the thread blocks. The estimate I was given was that this was likely to occur in the few hundred MB range. I don't know what's optimal though, it sounds like you have evidence against this.

In your case, I'd recommend chunks in the gigabyte range, unless there is some very pressing need to go larger.

Why do you want only one partition per worker? This seems possibly inefficient. For example, what happens if a few of the partitions end up on the same worker? Dask makes no guarantees about behavior here.

I wanted to re-partition this because i was sending it to Dask-XGBoost to train, which concats the dataframes on each worker before training and i wanted to side-step that by doing this re-partition.

See: https://github.com/rapidsai/dask-xgboost/blob/dask-cudf/dask_xgboost/core.py#L67-L69

@mt-jones , Can you please confirm if we can prevent the memory shoot up we were seeing by doing this or is it completely unrelated ?

I have a 16GB card, and 7GB of memory per worker spread among lots of partitions. Then I move the data around so that I can repartition it. Lets say that in a pessimistic case most pieces of data have to move, so now I have 14 GB of memory occupied per worker. Then, just for a moment, I need to copy 7GB of that data into a single 7GB dataframe before I can release the 7GB of smaller dataframes. Now I have, briefly, 21 GB of data on my 16GB cards.

Perhaps this explains the problem?

Yup, that explains it but can we not side-step that problem by doing the below config changes ?

dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

On the performance aspects of large chunks vs small-chunks, my experience is similar as @ayushdg .

I ran some performance tests on a workflow yesterday which i can share on side-channels which show the performance drop with decreasing chunksize .

I do not immediately see how those changes would avoid the situation I'm talking about above.

The bit that does this in XGBoost intentionally looks at the data that is already on the workers and concatenates that. That should remove the extra 7GB from moving data around. This is unrelated to the config settings you mention.

I ran some performance tests on a workflow yesterday which i can share on side-channels which show the performance drop with decreasing chunksize .

FWIW I think that it'd be great to show such a benchmark in a public place like a GitHub issue if you're able to make something that is shareable.

FWIW I think that it'd be great to show such a benchmark in a public place like a GitHub issue if you're able to make something that is shareable.

Okay, let me try to run the same operations on some dummy data and try to make something sharable.

Also, I suspect that the conversation will be able to engage more people if it doesn't include Dask. It would be interesting to see how cudf operations react as data sizes get larger. If you can show performance degredation with small chunks with just cudf then you'll have a lot of people who can engage on the problem. If Dask is involved then it's probably just a couple of us (most of whom are saturated).

This is resolved as of 0.14.

Was this page helpful?
0 / 5 - 0 ratings