What is your question?
Hello, I am trying to merge 3 very large dataframes using multiple GPUs. I am able to merge them together, but when I try to save the resulting dataframe as a CSV or convert it to a pandas dataframe using the compute() method, I get an out-of-memory error, even though only about 400 MB of memory is used on each GPU. I have 4 RTX 2080 Ti Max-Q GPUs, each with 12 GB of memory. I have observed that this error occurs whenever I use the compute() function.
Code Snippets:
import dask_cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)
sh = dask_cudf.read_csv(path_sh, npartitions = 4)
al = dask_cudf.read_csv(path_al, npartitions = 4)
pn = dask_cudf.read_csv(path_pn, npartitions = 4)
uk = dask_cudf.read_csv(path_uk, npartitions = 4)
sh = sh[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
pn = pn[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
uk = uk[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
al = al[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
def merge_mine(left, right, suffixes):
    merge = left.merge(right, on='ID', how='outer', suffixes=suffixes)
    return merge
m1 = merge_mine(sh,al,suffixes=('_sh','_al'))
m2 = merge_mine(m1,uk,suffixes=('_m1','_uk'))
m3 = merge_mine(m2,pn,suffixes=('_m2','_pn'))
m3.compute().to_csv('/Data')
MemoryError Traceback (most recent call last)
<ipython-input-21-4949ca2328c8> in <module>
----> 1 m3.compute().to_csv('/Data')
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
442 postcomputes.append(x.__dask_postcompute__())
443
--> 444 results = schedule(dsk, keys, **kwargs)
445 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
446
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2680 should_rejoin = False
2681 try:
-> 2682 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2683 finally:
2684 for f in futures.values():
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1980 direct=direct,
1981 local_worker=local_worker,
-> 1982 asynchronous=asynchronous,
1983 )
1984
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
830 else:
831 return sync(
--> 832 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
833 )
834
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1839 exc = CancelledError(key)
1840 else:
-> 1841 raise exception.with_traceback(traceback)
1842 raise exc
1843 if errors == "skip":
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/dask/utils.py in apply()
29 def apply(func, args, kwargs=None):
30 if kwargs:
---> 31 return func(*args, **kwargs)
32 else:
33 return func(*args)
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/dask/dataframe/multi.py in merge_chunk()
232 def merge_chunk(lhs, *args, **kwargs):
233 empty_index_dtype = kwargs.pop("empty_index_dtype", None)
--> 234 out = lhs.merge(*args, **kwargs)
235 # Workaround pandas bug where if the output result of a merge operation is
236 # an empty dataframe, the output index is `int64` in all cases, regardless
~/anaconda3/envs/pytorch/lib/python3.7/contextlib.py in inner()
72 def inner(*args, **kwds):
73 with self._recreate_cm():
---> 74 return func(*args, **kwds)
75 return inner
76
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/cudf/core/dataframe.py in merge()
3280 method,
3281 indicator,
-> 3282 suffixes,
3283 )
3284 return gdf_result
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/cudf/core/frame.py in _merge()
1502 suffixes,
1503 )
-> 1504 to_return = mergeop.perform_merge()
1505
1506 # If sort=True, Pandas would sort on the key columns in the
~/anaconda3/envs/pytorch/lib/python3.7/site-packages/cudf/core/join/join.py in perform_merge()
116 right_on=self.right_on,
117 left_index=self.left_index,
--> 118 right_index=self.right_index,
119 )
120 result = self.out_class._from_table(libcudf_result)
cudf/_lib/join.pyx in cudf._lib.join.join()
cudf/_lib/join.pyx in cudf._lib.join.join()
MemoryError: std::bad_alloc: CUDA error at: /conda/conda-bld/librmm_1591196517113/work/include/rmm/mr/device/cuda_memory_resource.hpp:66: cudaErrorMemoryAllocation out of memory
I am encountering the exact same issue here and would love to hear a potential solution.
Output of nvidia-smi:
| NVIDIA-SMI 418.116.00 Driver Version: 418.116.00 CUDA Version: 10.1 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla V100-SXM3... On | 00000000:39:00.0 Off | 0 |
| N/A 39C P0 67W / 350W | 4110MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla V100-SXM3... On | 00000000:3B:00.0 Off | 0 |
| N/A 36C P0 50W / 350W | 11MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla V100-SXM3... On | 00000000:57:00.0 Off | 0 |
| N/A 32C P0 50W / 350W | 11MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla V100-SXM3... On | 00000000:59:00.0 Off | 0 |
| N/A 37C P0 48W / 350W | 11MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 663 C /user/es.aau.dk/kh/miniconda3/bin/python 1821MiB |
| 0 96897 C /user/es.aau.dk/kh/miniconda3/bin/python 569MiB |
| 0 96985 C /user/es.aau.dk/kh/miniconda3/bin/python 569MiB |
| 0 97057 C /user/es.aau.dk/kh/miniconda3/bin/python 569MiB |
| 0 97136 C /user/es.aau.dk/kh/miniconda3/bin/python 569MiB |
+-----------------------------------------------------------------------------+
Output of /usr/local/cuda/bin/nvcc --version:
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2019 NVIDIA Corporation
Built on Sun_Jul_28_19:07:16_PDT_2019
Cuda compilation tools, release 10.1, V10.1.243
Running RAPIDS version 0.14.0.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf
cluster = LocalCUDACluster()
client = Client(cluster)
ddf_crypto_market_entries = ... # load from disk
assert type(ddf_crypto_market_entries) == dask_cudf.core.DataFrame
ddf_crypto_self_merged = ddf_crypto_market_entries.merge(ddf_crypto_market_entries, on='cc')
ddf_crypto_self_merged.compute()
With output:
MemoryError Traceback (most recent call last)
<ipython-input-42-15ade0ed6518> in <module>
----> 1 ddf_crypto_self_merged.compute()
~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
442 postcomputes.append(x.__dask_postcompute__())
443
--> 444 results = schedule(dsk, keys, **kwargs)
445 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
446
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2680 should_rejoin = False
2681 try:
-> 2682 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2683 finally:
2684 for f in futures.values():
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1980 direct=direct,
1981 local_worker=local_worker,
-> 1982 asynchronous=asynchronous,
1983 )
1984
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
830 else:
831 return sync(
--> 832 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
833 )
834
~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
~/miniconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1839 exc = CancelledError(key)
1840 else:
-> 1841 raise exception.with_traceback(traceback)
1842 raise exc
1843 if errors == "skip":
~/miniconda3/lib/python3.7/site-packages/dask/dataframe/multi.py in merge_chunk()
232 def merge_chunk(lhs, *args, **kwargs):
233 empty_index_dtype = kwargs.pop("empty_index_dtype", None)
--> 234 out = lhs.merge(*args, **kwargs)
235 # Workaround pandas bug where if the output result of a merge operation is
236 # an empty dataframe, the output index is `int64` in all cases, regardless
~/miniconda3/lib/python3.7/contextlib.py in inner()
72 def inner(*args, **kwds):
73 with self._recreate_cm():
---> 74 return func(*args, **kwds)
75 return inner
76
~/miniconda3/lib/python3.7/site-packages/cudf/core/dataframe.py in merge()
3280 method,
3281 indicator,
-> 3282 suffixes,
3283 )
3284 return gdf_result
~/miniconda3/lib/python3.7/site-packages/cudf/core/frame.py in _merge()
1502 suffixes,
1503 )
-> 1504 to_return = mergeop.perform_merge()
1505
1506 # If sort=True, Pandas would sort on the key columns in the
~/miniconda3/lib/python3.7/site-packages/cudf/core/join/join.py in perform_merge()
116 right_on=self.right_on,
117 left_index=self.left_index,
--> 118 right_index=self.right_index,
119 )
120 result = self.out_class._from_table(libcudf_result)
cudf/_lib/join.pyx in cudf._lib.join.join()
cudf/_lib/join.pyx in cudf._lib.join.join()
MemoryError: std::bad_alloc: CUDA error at: /conda/conda-bld/librmm_1591196551527/work/include/rmm/mr/device/cuda_memory_resource.hpp:66: cudaErrorMemoryAllocation out of memory
@Divyanshupy, in your example you are doing a series of chained outer joins.
@kdhageman, it looks like you are doing a self-join, which usually causes significant expansion as well.
Without seeing the actual data, it's quite possible that these outer and self joins are expanding your dataframe such that it cannot fit on a single GPU. I would recommend instead you use persist rather than compute.
Please see the second half of this comment as well, which may (hopefully) provide a solution: https://github.com/rapidsai/cudf/issues/5829#issuecomment-668062379
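For illustration, a minimal sketch of what that change could look like for the merge example above (the output path is the one from the question; dask typically writes one CSV file per partition):
# Keep the merged result on the GPU workers instead of gathering
# everything into the client process with .compute()
m3 = m3.persist()
# Let the workers handle the writing themselves
m3.to_csv('/Data')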
Without seeing the actual data, it's quite possible that these outer and self joins are expanding your dataframe such that it cannot fit on a single GPU. I would recommend instead you use persist rather than compute.
It looks like in the above stacktraces the OOM error is happening in the worker computation as opposed to in transferring to the client process where using .persist() will likely still lead to OOM errors. If this is the case and the problem can't be reworked to lower memory usage, I would suggest using the device_memory_limit option of LocalCUDACluster (https://dask-cuda.readthedocs.io/en/latest/api.html#dask_cuda.LocalCUDACluster) to allow it to automatically handle spilling out of GPU memory.
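As a rough sketch of that suggestion (the limit value here is only an example and should be tuned to the GPUs in question):
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
# device_memory_limit tells each dask-cuda worker to start spilling device (GPU)
# memory to host memory once it holds roughly this much data
cluster = LocalCUDACluster(device_memory_limit="10GB")
client = Client(cluster)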
@kkraus14
To give an indication of the size of the DataFrame that I self-join:
>>> ddf_crypto_market_entries.compute().shape
(873343, 2)
I tried controlling the GPU memory spilling by using LocalCUDACluster(device_memory_limit='n MiB'), with various settings of n (i.e. 100, 1000 and 10000), but with all of these values compute() resulted in the same out-of-memory exception.
@beckernick
I'm afraid I don't fully understand how persist works compared to compute (even after reading the docs); hopefully you don't mind elaborating.
It seems that compute returns a concrete result immediately, whereas this is different for persist. When persisting the previously computed Dataframe as follows:
result = ddf_crypto_self_merged.persist()
the result variable is of type dask_cudf.core.DataFrame and
>>> result.dask
<dask.highlevelgraph.HighLevelGraph at 0x7f2685263550>
How do I get concrete results out of this result object? I've looked at the Dask docs, but it seems that the dask_cudf implementation does not return the same kind of Future objects that Dask does.
cluster = LocalCUDACluster() will create a separate worker process for each GPU in your system, 4 in this case.
Calling .persist() will instruct the workers to perform computation on the task graph they've been given and then hold onto the result.
Calling .compute() will instruct the workers to transfer their result to the local client process, i.e. the process where you're defining your control flow. Typically you only want to do this with very small amounts of data at the end of your workflow.
Based on your above example, I would remove the .compute() call so that the CSV writing is handled by Dask on the workers instead of in the client. If you're still running into memory issues, I would suggest using the device_memory_limit property mentioned above, and if you're still running into memory issues after that I would suggest increasing the partitioning in the read_csv calls to more than 4 so that the partitioned pieces are smaller which would allow for more granular spilling.
Putting all of this together:
cluster = LocalCUDACluster()
# cluster = LocalCUDACluster(device_memory_limit="6GB") # Do this second to allow spilling from GPU memory to host memory in the workers
client = Client(cluster)
sh = dask_cudf.read_csv(path_sh, npartitions = 4)
al = dask_cudf.read_csv(path_al, npartitions = 4)
pn = dask_cudf.read_csv(path_pn, npartitions = 4)
uk = dask_cudf.read_csv(path_uk, npartitions = 4)
# Do this third to reduce the size of each piece of work
# sh = dask_cudf.read_csv(path_sh, npartitions = 8)
# al = dask_cudf.read_csv(path_al, npartitions = 8)
# pn = dask_cudf.read_csv(path_pn, npartitions = 8)
# uk = dask_cudf.read_csv(path_uk, npartitions = 8)
sh = sh[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
pn = pn[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
uk = uk[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
al = al[['ID', 'Label', 'FramesAnnotated', 'TotalFrames']]
def merge_mine(left, right, suffixes):
    merge = left.merge(right, on='ID', how='outer', suffixes=suffixes)
    return merge
m1 = merge_mine(sh,al,suffixes=('_sh','_al'))
m2 = merge_mine(m1,uk,suffixes=('_m1','_uk'))
m3 = merge_mine(m2,pn,suffixes=('_m2','_pn'))
m3.compute().to_csv('/Data')
# m3.to_csv('/Data') # Do this first as this makes all of the work properly run on the workers
@kkraus14 thanks for the response.
I have tinkered with various settings of npartitions and device_memory_limit. It seems that LocalCUDACluster() does not respect the device_memory_limit parameter. Code snippet:
>>> cluster = LocalCUDACluster(device_memory_limit="10GB")
>>> client = Client(cluster)
>>> client
Client
Scheduler: tcp://127.0.0.1:35839
Dashboard: http://127.0.0.1:32923/status
Cluster
Workers: 3
Cores: 3
Memory: 1.62 TB
When spinning up a Dask cluster from the terminal instead (using dask-scheduler and dask-worker --memory-limit=10GB), the following is happening in Python:
>>> client = Client(address="localhost:8786")
Client
Scheduler: tcp://127.0.0.1:8786
Dashboard: http://127.0.0.1:37873/status
Cluster
Workers: 3
Cores: 3
Memory: 30.00 GB
With this terminal-started Dask cluster, and with the DataFrame using npartitions=3000, the computation now seems to succeed 🥳:
>>> result = ddf_crypto_self_merged.persist()
>>> result.dask.values()
[<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 0)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 1)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 2)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 3)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 4)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 5)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 6)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 7)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 8)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 9)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 10)>,
<Future: pending, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 11)>,
<Future: finished, type: cudf.DataFrame, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 12)>,
<Future: finished, type: cudf.DataFrame, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 13)>,
<Future: finished, type: cudf.DataFrame, key: ('drop_by_shallow_copy-5eefecbbf971d04e1179e03ddd745955', 14)>,
...
]
As a follow-up to my previous comment, running nvidia-smi reveals very low usage of the GPUs (note that this is measured during the computation):
Every 1.0s: nvidia-smi nv-ai-01.srv.aau.dk: Wed Aug 19 11:56:46 2020
Wed Aug 19 11:56:46 2020
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 418.126.02 Driver Version: 418.126.02 CUDA Version: 10.1 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla V100-SXM3... On | 00000000:34:00.0 Off | 0 |
| N/A 34C P0 66W / 350W | 2101MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla V100-SXM3... On | 00000000:36:00.0 Off | 0 |
| N/A 33C P0 68W / 350W | 2198MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla V100-SXM3... On | 00000000:39:00.0 Off | 0 |
| N/A 40C P0 66W / 350W | 908MiB / 32480MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 44145 C /user/es.aau.dk/kh/miniconda3/bin/python 1097MiB |
| 0 77519 C /user/es.aau.dk/kh/miniconda3/bin/python 983MiB |
| 1 77583 C /user/es.aau.dk/kh/miniconda3/bin/python 2187MiB |
| 2 77614 C /user/es.aau.dk/kh/miniconda3/bin/python 897MiB |
+-----------------------------------------------------------------------------+
Any explanation for/insight into this?
I'm seeing the exact same issue in 0.15 (nightly).
The following code:
# notebook cell 1
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=[0, 1, 2, 3])
from dask.distributed import Client, wait
client = Client(cluster)
# notebook cell 2
import dask_cudf
ddf = dask_cudf.read_parquet(f'/mnt/data/2019-taxi-dataset/')
ddf = ddf.repartition(npartitions=120) # optional
ddf = ddf.persist()
wait(ddf)
Completely saturates just one of my four GPUs:
# !nvidia-smi
Wed Aug 19 23:20:17 2020
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.82 Driver Version: 440.82 CUDA Version: 10.2 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla V100-SXM2... Off | 00000000:00:1B.0 Off | 0 |
| N/A 48C P0 55W / 300W | 16142MiB / 16160MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla V100-SXM2... Off | 00000000:00:1C.0 Off | 0 |
| N/A 42C P0 37W / 300W | 11MiB / 16160MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla V100-SXM2... Off | 00000000:00:1D.0 Off | 0 |
| N/A 41C P0 40W / 300W | 11MiB / 16160MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla V100-SXM2... Off | 00000000:00:1E.0 Off | 0 |
| N/A 44C P0 42W / 300W | 11MiB / 16160MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
Attempting to compute a value that takes up a trivial amount of memory results in an OOM error:
ddf.passenger_count.value_counts().compute()
MemoryError: std::bad_alloc: CUDA error at: ../include/rmm/mr/device/cuda_memory_resource.hpp:68: cudaErrorMemoryAllocation out of memory
I am seeing the same behavior that @kdhageman is seeing in terms of device_memory_limit having seemingly no effect on actual GPU utilization.
To me it really looks like LocalCUDACluster is borked somehow, e.g. it isn't actually successfully moving the user off of the default one-GPU mode. My understanding is that it should be giving me something that looks like this.
There is a bug in numba 0.51.0 which causes dask-cuda to not be able to properly target multiple GPUs. We have updated our numba version pinning to address the issue: https://github.com/rapidsai/dask-cuda/pull/385 but in the meantime I would suggest downgrading to numba 0.50.0.
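A quick way to check which numba version an environment is actually using (a minimal sanity check, assuming numba is importable):
import numba
# dask-cuda cannot properly target multiple GPUs with numba 0.51.0; 0.50.0 works
print(numba.__version__)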
As a follow-up to my previous comment, running nvidia-smi reveals very low usage of the GPUs (note that this is measured during the computation) [...] Any explanation for/insight into this?
With 3000 partitions, you've likely broken the problem down into such small pieces that there isn't enough work in each piece to saturate a GPU. I would try reducing the number of partitions so that each one is larger, as well as moving where you set the partitioning: instead of increasing the partitioning in the read_csv call, let read_csv produce larger partitions that make better use of the GPU, and then call repartition afterwards if later steps need more partitions. Dask has some more docs on partitioning here: https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead
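A rough sketch of that idea, reusing variable names from the earlier example (the partition count is illustrative and depends on the data sizes):
import dask_cudf
# path_sh as in the original snippet; read with larger partitions so each task
# has enough work to keep a GPU busy
sh = dask_cudf.read_csv(path_sh)
# Repartition afterwards only if later steps (e.g. the merges) need smaller pieces
sh = sh.repartition(npartitions=32)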
This has been answered and the Numba issues are resolved, so I'm closing this. Feel free to open a new issue if there are further problems.