Cudf: [BUG] dask_cudf fails to reset_index on multi-column groupby result

Created on 28 Aug 2019  ·  2 Comments  ·  Source: rapidsai/cudf

I'm doing a multi-column groupby to aggregate and reduce the number of rows in my DataFrame before performing a merge.

I can do this with cudf, as well as with pandas and dask, but not dask_cudf.

Repro:

import cudf, dask_cudf

df = cudf.DataFrame()
df['id_1'] = ['a', 'a', 'b']
df['id_2'] = [0, 0, 1]
df['val'] = [1, 2, 3]

df_lookup = cudf.DataFrame()
df_lookup['id_1'] = ['a', 'b']
df_lookup['metadata'] = [0, 1]

# works
df.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(df_lookup, on='id_1')

ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf_lookup = dask_cudf.from_cudf(df_lookup, npartitions=2)

# fails
ddf.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(ddf_lookup, on='id_1').compute()

Result:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/column/column.py in as_column(arbitrary, nan_as_null, dtype, name)
   1383             data = as_column(
-> 1384                 memoryview(arbitrary), dtype=dtype, nan_as_null=nan_as_null
   1385             )

TypeError: memoryview: a bytes-like object is required, not 'MultiIndex'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    169     try:
--> 170         yield
    171     except Exception as e:

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   4725     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 4726         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   4727 

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   4705     elif isinstance(x, tuple):
-> 4706         return tuple([_extract_meta(_x, nonempty) for _x in x])
   4707     elif isinstance(x, dict):

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in <listcomp>(.0)
   4705     elif isinstance(x, tuple):
-> 4706         return tuple([_extract_meta(_x, nonempty) for _x in x])
   4707     elif isinstance(x, dict):

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   4701     if isinstance(x, (Scalar, _Frame)):
-> 4702         return x._meta_nonempty if nonempty else x._meta
   4703     elif isinstance(x, list):

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _meta_nonempty(self)
    343         """ A non-empty version of `_meta` with fake data."""
--> 344         return meta_nonempty(self._meta)
    345 

/conda/envs/rapids/lib/python3.7/site-packages/dask/utils.py in __call__(self, arg, *args, **kwargs)
    485         meth = self.dispatch(type(arg))
--> 486         return meth(arg, *args, **kwargs)
    487 

/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0.10.0a0+800.g6407f5065-py3.7.egg/dask_cudf/backends.py in meta_nonempty_cudf(x, index)
     15     y = meta_nonempty(x.to_pandas())  # TODO: add iloc[:5]
---> 16     return cudf.from_pandas(y)
     17 

/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/dataframe.py in from_pandas(obj)
   3725     elif isinstance(obj, pd.Series):
-> 3726         return Series.from_pandas(obj)
   3727     elif isinstance(obj, pd.MultiIndex):

/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py in from_pandas(cls, s, nan_as_null)
    132     def from_pandas(cls, s, nan_as_null=True):
--> 133         return cls(s, nan_as_null=nan_as_null)
    134 

/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py in __init__(self, data, index, name, nan_as_null, dtype)
     93                 name = data.name
---> 94             index = as_index(data.index)
     95         elif isinstance(data, pd.Index):

/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/index.py in as_index(arbitrary, **kwargs)
    928     else:
--> 929         return as_index(column.as_column(arbitrary), **kwargs)
    930 

/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/column/column.py in as_column(arbitrary, nan_as_null, dtype, name)
   1400                 data = as_column(
-> 1401                     pa.array(arbitrary, type=pa_type, from_pandas=nan_as_null),
   1402                     dtype=dtype,

/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib.array()

/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib.get_series_values()

/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/pandas-shim.pxi in pyarrow.lib._PandasAPIShim.make_series()

/conda/envs/rapids/lib/python3.7/site-packages/pandas/core/series.py in __init__(self, data, index, dtype, name, copy, fastpath)
    176             if isinstance(data, MultiIndex):
--> 177                 raise NotImplementedError("initializing a Series from a "
    178                                           "MultiIndex is not supported")

NotImplementedError: initializing a Series from a MultiIndex is not supported

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-47-1f4b497560ca> in <module>
      2 ddf_lookup = dask_cudf.from_cudf(df_lookup, npartitions=2)
      3 
----> 4 ddf.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(ddf_lookup, on='id_2').compute()

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in reset_index(self, drop)
    458             Do not try to insert index into dataframe columns.
    459         """
--> 460         return self.map_partitions(M.reset_index, drop=drop).clear_divisions()
    461 
    462     @property

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    618         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    619         """
--> 620         return map_partitions(func, self, *args, **kwargs)
    621 
    622     @insert_meta_param_description(pad=12)

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in map_partitions(func, *args, **kwargs)
   4765         # Use non-normalized kwargs here, as we want the real values (not
   4766         # delayed values)
-> 4767         meta = _emulate(func, *args, udf=True, **kwargs)
   4768     else:
   4769         meta = make_meta(meta, index=meta_index)

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   4724     """
   4725     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 4726         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   4727 
   4728 

/conda/envs/rapids/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
    128                 value = type()
    129             try:
--> 130                 self.gen.throw(type, value, traceback)
    131             except StopIteration as exc:
    132                 # Suppress StopIteration *unless* it's the same exception that

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    189         )
    190         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 191         raise ValueError(msg)
    192 
    193 

ValueError: Metadata inference failed in `reset_index`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
NotImplementedError('initializing a Series from a MultiIndex is not supported')

Traceback:
---------
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py", line 170, in raise_on_meta_error
    yield
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4726, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4706, in _extract_meta
    return tuple([_extract_meta(_x, nonempty) for _x in x])
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4706, in <listcomp>
    return tuple([_extract_meta(_x, nonempty) for _x in x])
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4702, in _extract_meta
    return x._meta_nonempty if nonempty else x._meta
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 344, in _meta_nonempty
    return meta_nonempty(self._meta)
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/utils.py", line 486, in __call__
    return meth(arg, *args, **kwargs)
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0.10.0a0+800.g6407f5065-py3.7.egg/dask_cudf/backends.py", line 16, in meta_nonempty_cudf
    return cudf.from_pandas(y)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/dataframe.py", line 3726, in from_pandas
    return Series.from_pandas(obj)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py", line 133, in from_pandas
    return cls(s, nan_as_null=nan_as_null)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py", line 94, in __init__
    index = as_index(data.index)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/index.py", line 929, in as_index
    return as_index(column.as_column(arbitrary), **kwargs)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/column/column.py", line 1401, in as_column
    pa.array(arbitrary, type=pa_type, from_pandas=nan_as_null),
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 1566, in pyarrow.lib.get_series_values
  File "pyarrow/pandas-shim.pxi", line 88, in pyarrow.lib._PandasAPIShim.make_series
  File "/conda/envs/rapids/lib/python3.7/site-packages/pandas/core/series.py", line 177, in __init__
    raise NotImplementedError("initializing a Series from a "

Working Dask/Pandas version:

import dask

# uses the cudf dfs created in repro snippet above
ddf = dask.dataframe.from_pandas(df.to_pandas(), npartitions=1)
ddf_lookup = dask.dataframe.from_pandas(df_lookup.to_pandas(), npartitions=1)

ddf.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(ddf_lookup, on='id_1').compute()
Labels: bug, cuDF (Python), dask

Most helpful comment

No problem - I can take a good look tomorrow (tonight if I'm lucky).

All 2 comments

@rjzamora Can you take a look at this? We're hitting this: https://github.com/rapidsai/cudf/blob/branch-0.10/python/dask_cudf/dask_cudf/backends.py#L13-L16 and it errors when round-tripping a Series with a MultiIndex.

We already handle the MultiIndex nicely for a DataFrame, so I'm guessing there's just a bit of logic missing for the Series case. Here's a minimal reproducer:

import pandas as pd
import cudf

pdf = pd.DataFrame({'a': [1,2,3], 'b': [4,5,6], 'c': [7,8,9]}).set_index(['a', 'b'])
test_pdf = pdf['c']

result = cudf.from_pandas(test_pdf)
# NotImplementedError: initializing a Series from a MultiIndex is not supported

No problem - I can take a good look tomorrow (tonight if I'm lucky).

Was this page helpful?
0 / 5 - 0 ratings