Cudf: [FEA] dask-cudf doesn't support "corr"/correlation function like Pandas and cuDF

Created on 12 Nov 2019  ·  28 comments  ·  Source: rapidsai/cudf

When attempting to perform a correlation such as sales_corr = sales['pr_review_rating', 'count'].corr(sales['pr_review_rating', 'mean']), dask-cudf fails with the following error.

TypeError: cannot concatenate object of type "<class 'cudf.core.series.Series'>"; only pd.Series, pd.DataFrame, and pd.Panel (deprecated) objs are valid

It seems that we explicitly restrict this at the moment. However, dask-cudf should behave exactly like cuDF and Pandas. https://github.com/rapidsai/cudf/blob/4613ba821e4ed03a2db744f2c0bb0959fd450191/python/dask_cudf/dask_cudf/backends.py#L30-L33

cuDF (Python) dask feature request

Most helpful comment

The shape= argument was introduced by me in NumPy and CuPy to address exactly that shortcoming. It's the only special case for array creation with __array_function__. The *_like functions will allow dispatching via __array_function__ according to the first argument (if NumPy, dispatch to NumPy itself, if CuPy dispatch to CuPy, etc.), and the new shape= argument allows us to create an arbitrarily-shaped array with the correct array type, which wasn't possible before.

All 28 comments

Reproducer in a dev-nightly container as of 11/12/19:

import dask_cudf
import cudf
df = cudf.datasets.randomdata(10)
ddf = dask_cudf.from_cudf(df, 2)
ddf.x.corr(ddf.y)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-3-4ce11b32272a> in <module>
      4 df = cudf.datasets.randomdata(10)
      5 ddf = dask_cudf.from_cudf(df, 2)
----> 6 ddf.x.corr(ddf.y)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in corr(self, other, method, min_periods, split_every)
   2978         if method != "pearson":
   2979             raise NotImplementedError("Only Pearson correlation has been implemented")
-> 2980         df = concat([self, other], axis=1)
   2981         return cov_corr(
   2982             df, min_periods, corr=True, scalar=True, split_every=split_every

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/multi.py in concat(dfs, axis, join, interleave_partitions)
   1046     if axis == 1:
   1047         if all(df.known_divisions for df in dasks):
-> 1048             return concat_indexed_dataframes(dfs, axis=axis, join=join)
   1049         elif (
   1050             len(dasks) == len(dfs)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/multi.py in concat_indexed_dataframes(dfs, axis, join)
    884     warn = axis != 0
    885     meta = methods.concat(
--> 886         [df._meta for df in dfs], axis=axis, join=join, filter_warning=warn
    887     )
    888     empties = [strip_unknown_categories(df._meta) for df in dfs]

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/methods.py in concat(dfs, axis, join, uniform, filter_warning)
    354         func = concat_dispatch.dispatch(type(dfs[0]))
    355         return func(
--> 356             dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
    357         )
    358 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf(dfs, axis, join, uniform, filter_warning, sort)
     31     dfs, axis=0, join="outer", uniform=False, filter_warning=True, sort=None
     32 ):
---> 33     assert axis == 0
     34     assert join == "outer"
     35     return cudf.concat(dfs)

AssertionError: 

In the following code:

https://github.com/dask/dask/blob/9c72876af571ef90ecad5df7c8bd123b06220305/dask/dataframe/core.py#L5124-L5134

It looks like we need to change the np.zeros functions to np.zeros_like to respect the cupy array values.
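As a CPU-only illustration of the difference (MaskedArray stands in here for a non-default array type, since running CuPy requires a GPU): np.zeros always allocates a base ndarray, while np.zeros_like follows the type of its argument. MaskedArray keeps its type via subclassing; for CuPy the same effect comes from __array_function__ dispatch.

```python
import numpy as np

m = np.ma.array([1.0, 2.0], mask=[True, False])

# np.zeros ignores any existing array and always returns a plain ndarray.
plain = np.zeros(2)

# np.zeros_like allocates "the same kind of thing" as its argument; with
# NEP 18 (__array_function__) the same call on a CuPy array returns CuPy.
like = np.zeros_like(m)

print(type(plain).__name__)  # ndarray
print(type(like).__name__)   # MaskedArray
```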

However, it looks like this function might also use record arrays, which may not be supported in cupy. We can probably have this function return a tuple/list/dict instead.
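As a rough sketch of that suggestion (the helper names below are hypothetical; the real code lives in Dask's cov_corr_chunk), the structured-array result could become a plain dict whose values can be any array type:

```python
import numpy as np

def chunk_stats_record(x):
    # Structured ("record") array result: fields packed into one dtype.
    # CuPy has no structured-dtype support, so this forces NumPy.
    out = np.empty(1, dtype=[("sum", x.dtype), ("count", "i8")])
    out["sum"] = x.sum()
    out["count"] = len(x)
    return out

def chunk_stats_dict(x):
    # Same information as a dict: each value may be NumPy *or* CuPy,
    # so the chunk function stays array-type agnostic.
    return {"sum": x.sum(), "count": len(x)}
```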

@rjzamora any interest in diving in here?

> @rjzamora any interest in diving in here?

Sure - as long as you are okay with my suggestion here :)

Re-opening this due to new information and cross-linking.

This is partially dependent on dask/dask#5643, but it also currently fails for me for a different reason. In a small test, I end up with a different error than the one in dask/dask#5643. This test can get past that issue because the dataframes have known divisions, which leads to the generalized concat_indexed_dataframes during the concat. Here, instead, we end up in a codepath that executes lambda self, other: elemwise(op, self, other), where the operation is __ne__, self is the dask dataframe, and other is the string "pearson". This binary op then causes a cuDF error far down the call stack.

Example:

import cudf
import dask_cudf
d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
d2 = cudf.datasets.randomdata(100, dtypes={"c":float, "d":float})
dd1 = dask_cudf.from_cudf(d1, 5)
dd2 = dask_cudf.from_cudf(d2, 5)
dd1.corr(dd2)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    168     try:
--> 169         yield
    170     except Exception as e:
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   4421         with raise_on_meta_error(funcname(op)):
-> 4422             meta = partial_by_order(*parts, function=op, other=other)
   4423 
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/utils.py in partial_by_order(*args, **kwargs)
   1075         args2.insert(i, arg)
-> 1076     return function(*args2, **kwargs)
   1077 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py in __ne__(self, other)
    893     def __ne__(self, other):
--> 894         return self._apply_op("__ne__", other)
    895 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py in _apply_op(self, fn, other, fill_value)
    754             for k, col in enumerate(self._cols):
--> 755                 result[col] = getattr(self._cols[col], fn)(other[k])
    756         elif isinstance(other, DataFrame):
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in __ne__(self, other)
   1000     def __ne__(self, other):
-> 1001         return self._unordered_compare(other, "ne")
   1002 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in _unordered_compare(self, other, cmpops)
    948         result_name = utils.get_result_name(self, other)
--> 949         other = self._normalize_binop_value(other)
    950         outcol = self._column.unordered_compare(cmpops, other)
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in _normalize_binop_value(self, other)
    943         else:
--> 944             return self._column.normalize_binop_value(other)
    945 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/column/numerical.py in normalize_binop_value(self, other)
    139         else:
--> 140             raise TypeError("cannot broadcast {}".format(type(other)))
    141 
TypeError: cannot broadcast <class 'str'>
During handling of the above exception, another exception occurred:
ValueError                                Traceback (most recent call last)
<ipython-input-36-b0dbaa32cc44> in <module>
      6 dd1 = dask_cudf.from_cudf(d1, 5)
      7 dd2 = dask_cudf.from_cudf(d2, 5)
----> 8 dd1.corr(dd2)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in corr(self, method, min_periods, split_every)
   4013     @derived_from(pd.DataFrame)
   4014     def corr(self, method="pearson", min_periods=None, split_every=False):
-> 4015         if method != "pearson":
   4016             raise NotImplementedError("Only Pearson correlation has been implemented")
   4017         return cov_corr(self, min_periods, True, split_every=split_every)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in <lambda>(self, other)
   1333             return lambda self, other: elemwise(op, other, self)
   1334         else:
-> 1335             return lambda self, other: elemwise(op, self, other)
   1336 
   1337     def rolling(self, window, min_periods=None, center=False, win_type=None, axis=0):
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   4420         ]
   4421         with raise_on_meta_error(funcname(op)):
-> 4422             meta = partial_by_order(*parts, function=op, other=other)
   4423 
   4424     result = new_dd_object(graph, _name, meta, divisions)
/opt/conda/envs/rapids/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    188         )
    189         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 190         raise ValueError(msg)
    191 
    192 
ValueError: Metadata inference failed in `ne`.
Original error is below:
------------------------
TypeError("cannot broadcast <class 'str'>",)
Traceback:
---------
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py", line 169, in raise_on_meta_error
    yield
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py", line 4422, in elemwise
    meta = partial_by_order(*parts, function=op, other=other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/utils.py", line 1076, in partial_by_order
    return function(*args2, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py", line 894, in __ne__
    return self._apply_op("__ne__", other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py", line 755, in _apply_op
    result[col] = getattr(self._cols[col], fn)(other[k])
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 1001, in __ne__
    return self._unordered_compare(other, "ne")
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 949, in _unordered_compare
    other = self._normalize_binop_value(other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 944, in _normalize_binop_value
    return self._column.normalize_binop_value(other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/column/numerical.py", line 140, in normalize_binop_value
    raise TypeError("cannot broadcast {}".format(type(other)))
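The root cause is the call signature rather than cuDF itself: dask's DataFrame.corr takes method as its first parameter (there is no other parameter), so dd2 is bound to method and method != "pearson" becomes an elementwise frame-vs-string comparison. A pure-Python stand-in with the same parameter list shows the binding:

```python
def corr(method="pearson", min_periods=None, split_every=False):
    # Stand-in mirroring dask's DataFrame.corr parameter list. Anything
    # passed positionally lands in `method` -- including another dataframe.
    return method

print(corr())                      # pearson
print(corr("a-second-dataframe"))  # a-second-dataframe
```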

@rjzamora, thanks for pointing out that the above example shouldn't work. It incorrectly uses dataframes, which likely explains the binary op issue. The following is representative of the workflow (correlation of two columns):

import cudf
import dask_cudf
d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
dd1 = dask_cudf.from_cudf(d1, 5)
dd1.a.corr(dd1.b).compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-7816cf3671a0> in <module>
      4 d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
      5 dd1 = dask_cudf.from_cudf(d1, 5)
----> 6 dd1.a.corr(dd1.b).compute()

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     79         get_id=_thread_get_id,
     80         pack_exception=pack_exception,
---> 81         **kwargs
     82     )
     83 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in cov_corr_chunk(df, corr)
   5147             )
   5148             mu_discrepancy[mask] = np.nan
-> 5149             m[idx] = np.nansum(mu_discrepancy, axis=0)
   5150         m = m.T
   5151         dtype.append(("m", m.dtype))

<__array_function__ internals> in nansum(*args, **kwargs)

TypeError: no implementation found for 'numpy.nansum' on types that implement __array_function__: [<class 'cupy.core.core.ndarray'>]

CuPy just recently implemented nansum in v7.0, which is ahead of the version we use. I'll test this with the master branch of CuPy and report back. Thanks for looking into it.

> CuPy just recently implemented nansum in v7.0, which is ahead of the version we use. I'll test this with the master branch of CuPy and report back. Thanks for looking into it.

Right, I'm using '7.0.0rc1', and this seems to run fine.

Love it @rjzamora, that solves the problem. Let's leave this issue open for tracking purposes, keep an eye on when CuPy v7.0 is considered the "stable" release, and then evaluate updating our CuPy dependency accordingly.

Brief summary of the current status below:

In a vanilla cuDF / Dask environment, we cannot use Dask-cuDF's correlation. To use correlation on indexed dataframes, we need the following:

  • NumPy > 1.17
  • CuPy > 7.0

The NumPy upgrade can be done via conda, and the CuPy upgrade is best done by cloning the repository and then running pip install . from the cupy directory.

To use correlation on unindexed dataframes, we also need:

  • Dask master (for this new feature https://github.com/dask/dask/pull/5659)

Since general usage can include both, the functional requirements are:

  • NumPy > 1.17
  • CuPy > 7.0
  • Dask master at least as recent as 2019-12-03
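A quick sanity check against these minimums can be sketched with a small stdlib-only helper (at_least is a hypothetical name, not part of any of these libraries):

```python
def at_least(version: str, minimum: str) -> bool:
    # Compare dotted release numbers; pre-release tags such as "7.0.0rc1"
    # are ignored by this rough check (use packaging.version for real code).
    parse = lambda v: tuple(int(p) for p in v.split(".") if p.isdigit())
    return parse(version) >= parse(minimum)

print(at_least("1.17.4", "1.17"))   # True
print(at_least("1.16.5", "1.17"))   # False
print(at_least("7.0.0rc1", "7.0"))  # True
```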

@kkraus14 @shwina , do you have any strong feelings on updating our dependency to NumPy >= 1.17? I suspect Dask will take care of itself and CuPy v7.0 is not yet considered the stable release.

We haven't pinned a numpy version in general for cuDF so I'm somewhat hesitant to require such a new version. What chunk of code requires numpy > 1.17? I'm fine with upgrading to Cupy >= 7.0 once there's a conda-forge release.

Ah, I guess I always just get 1.16 from another dependency then. Good to know.

This is the chunk of code that necessitates it, currently:

https://github.com/dask/dask/blob/efadf5a60b6f469099b2f63f704b80448f21c872/dask/array/utils.py#L325-L334

Without NumPy > 1.17, we don't currently leverage duck typing to create a CuPy array internal to Dask's cov_corr_chunk function. Thus, a slice of a NumPy array ends up being assigned a CuPy array coming from a .values call on a cuDF object.

https://github.com/dask/dask/blob/efadf5a60b6f469099b2f63f704b80448f21c872/dask/array/utils.py#L325-L334

From my perspective this should be a dispatched function to allow for CuPy/Numpy agnostic behavior. @mrocklin @pentschev do you agree?

We've been relying on the __array_function__ protocol for NumPy/CuPy-agnostic behavior. I'm somewhat against creating an internal dispatching layer for NumPy within Dask Array if we can avoid it.

I suspect that the 1.17 requirement @beckernick is referring to is just that 1.17 is when __array_function__ became enabled by default.

To complement @mrocklin's comment: for that functionality we need only CuPy >= 6.4.0, and we can still use NumPy 1.16 if people are fine with setting the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 environment variable (which I assume users will not be).

I think this is something different than the __array_function__ protocol, because this is a function that allocates a NumPy array. As far as I'm aware, there's no way to override that as of yet, though the community is discussing what it would look like.

> To complement @mrocklin's comment: for that functionality we need only CuPy >= 6.4.0, and we can still use NumPy 1.16 if people are fine with setting the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 environment variable (which I assume users will not be).

I'm happy to enforce numpy >= 1.17 for cudf then, but does the zeros_like function allow for allocating a Cupy array if a cupy array is passed in?

cc @rjzamora , too.

My understanding is consistent with Keith's. The NEP-18 flag doesn't solve the problem, because if zeros_like in the NumPy namespace does not have the shape argument, we default to a pure np.zeros(shape), which will allocate a NumPy array. The shape argument was not added until 1.17.
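The dispatch mechanics can be shown without a GPU using a toy duck array (Tagged is a made-up class for illustration; it implements only enough of the NEP 18 protocol to intercept np.zeros_like):

```python
import numpy as np

class Tagged:
    """Toy duck array implementing the NEP 18 __array_function__ protocol."""
    def __init__(self, data):
        self.data = np.asarray(data)

    def __array_function__(self, func, types, args, kwargs):
        if func is np.zeros_like:
            # Called because the first argument to np.zeros_like is a Tagged;
            # honour the new-in-1.17 shape= override, defaulting to our shape.
            shape = kwargs.get("shape") or self.data.shape
            return Tagged(np.zeros(shape, dtype=self.data.dtype))
        return NotImplemented

t = Tagged(np.arange(5.0))
out = np.zeros_like(t, shape=3)  # dispatches to Tagged: type is preserved
print(type(out).__name__, out.data.shape)   # Tagged (3,)
print(type(np.zeros(3)).__name__)           # ndarray: np.zeros never dispatches
```

This mirrors the CuPy situation: *_like functions dispatch on their first argument, and only the 1.17+ shape= argument lets a caller pick an arbitrary shape while keeping the array type.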

@pentschev I believe we also need CuPy >= 7.0 in order to dispatch to cupy.nansum (which doesn't exist before v7.0) internal to the cov_corr_chunk function.

Confirmed this will work as expected, so no need for a Dask dispatch. Sorry for the false alarm 😅

In [1]: import numpy as np

In [2]: import cupy

In [3]: test = cupy.zeros(10)

In [4]: type(np.zeros_like(test, shape=5))
Out[4]: cupy.core.core.ndarray

zeros_like allows for CuPy array allocation:

import numpy as np
import cupy as cp
garr = cp.array(range(5))
print(type(np.zeros_like(garr)))
<class 'cupy.core.core.ndarray'>

But the issue is that without the shape parameter we switch codepaths, currently.

The shape= argument was introduced by me in NumPy and CuPy to address exactly that shortcoming. It's the only special case for array creation with __array_function__. The *_like functions will allow dispatching via __array_function__ according to the first argument (if NumPy, dispatch to NumPy itself, if CuPy dispatch to CuPy, etc.), and the new shape= argument allows us to create an arbitrarily-shaped array with the correct array type, which wasn't possible before.

As long as we enforce a sufficiently new numpy and CuPy version all should be well which I'm perfectly happy to do for cuDF.

Now that CuPy v7.0 is officially released, are we ready to undo the CuPy version restriction from https://github.com/rapidsai/cudf/pull/3539 and also enforce NumPy > 1.17?

I believe that would officially close this issue. cc @randerzander

CuPy 7.0 is available from conda-forge

We now support CuPy 7.0.0 ( https://github.com/rapidsai/cudf/pull/3619 ) 🙂

This should be resolved as of now with CuPy >= 7.

@kkraus14 are there unit tests covering that? If not, maybe it is worth keeping this issue open just for adding unit tests, so an eventual regression can be spotted in the future. Any idea if it works with groupby already? Or should that be a separate issue?

It's important to note that this failure was the product of issues in both cuDF and upstream libraries. That said, IIRC @rjzamora contributed fixes to both cuDF and Dask, each of which includes tests. @pentschev also implemented nansum in CuPy, which has its own test. So I think this is covered pretty well. That said, if there is another test you would like to add, I think that would be happily accepted 🙂

@jakirkham thanks for clarifying. It was not obvious from reading this thread. It is always useful to reference the issue from the commit or PR so this is clear.
