I'm doing a multi-column groupby to aggregate and reduce the number of rows in my DataFrame before performing a merge.
I can do this with cudf, as well as with pandas and dask, but not with dask_cudf.
Repro:
import cudf, dask_cudf
df = cudf.DataFrame()
df['id_1'] = ['a', 'a', 'b']
df['id_2'] = [0, 0, 1]
df['val'] = [1, 2, 3]
df_lookup = cudf.DataFrame()
df_lookup['id_1'] = ['a', 'b']
df_lookup['metadata'] = [0, 1]
# works
df.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(df_lookup, on='id_1')
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf_lookup = dask_cudf.from_cudf(df_lookup, npartitions=2)
# fails
ddf.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(ddf_lookup, on='id_1').compute()
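Result (the traceback below was captured with `on='id_2'` rather than `on='id_1'`, but that makes no difference: the error is raised by `reset_index` during metadata inference, before the merge runs):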
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/column/column.py in as_column(arbitrary, nan_as_null, dtype, name)
1383 data = as_column(
-> 1384 memoryview(arbitrary), dtype=dtype, nan_as_null=nan_as_null
1385 )
TypeError: memoryview: a bytes-like object is required, not 'MultiIndex'
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
169 try:
--> 170 yield
171 except Exception as e:
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
4725 with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 4726 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
4727
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
4705 elif isinstance(x, tuple):
-> 4706 return tuple([_extract_meta(_x, nonempty) for _x in x])
4707 elif isinstance(x, dict):
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in <listcomp>(.0)
4705 elif isinstance(x, tuple):
-> 4706 return tuple([_extract_meta(_x, nonempty) for _x in x])
4707 elif isinstance(x, dict):
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
4701 if isinstance(x, (Scalar, _Frame)):
-> 4702 return x._meta_nonempty if nonempty else x._meta
4703 elif isinstance(x, list):
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _meta_nonempty(self)
343 """ A non-empty version of `_meta` with fake data."""
--> 344 return meta_nonempty(self._meta)
345
/conda/envs/rapids/lib/python3.7/site-packages/dask/utils.py in __call__(self, arg, *args, **kwargs)
485 meth = self.dispatch(type(arg))
--> 486 return meth(arg, *args, **kwargs)
487
/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0.10.0a0+800.g6407f5065-py3.7.egg/dask_cudf/backends.py in meta_nonempty_cudf(x, index)
15 y = meta_nonempty(x.to_pandas()) # TODO: add iloc[:5]
---> 16 return cudf.from_pandas(y)
17
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/dataframe.py in from_pandas(obj)
3725 elif isinstance(obj, pd.Series):
-> 3726 return Series.from_pandas(obj)
3727 elif isinstance(obj, pd.MultiIndex):
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py in from_pandas(cls, s, nan_as_null)
132 def from_pandas(cls, s, nan_as_null=True):
--> 133 return cls(s, nan_as_null=nan_as_null)
134
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py in __init__(self, data, index, name, nan_as_null, dtype)
93 name = data.name
---> 94 index = as_index(data.index)
95 elif isinstance(data, pd.Index):
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/index.py in as_index(arbitrary, **kwargs)
928 else:
--> 929 return as_index(column.as_column(arbitrary), **kwargs)
930
/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/column/column.py in as_column(arbitrary, nan_as_null, dtype, name)
1400 data = as_column(
-> 1401 pa.array(arbitrary, type=pa_type, from_pandas=nan_as_null),
1402 dtype=dtype,
/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib.array()
/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib.get_series_values()
/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/pandas-shim.pxi in pyarrow.lib._PandasAPIShim.make_series()
/conda/envs/rapids/lib/python3.7/site-packages/pandas/core/series.py in __init__(self, data, index, dtype, name, copy, fastpath)
176 if isinstance(data, MultiIndex):
--> 177 raise NotImplementedError("initializing a Series from a "
178 "MultiIndex is not supported")
NotImplementedError: initializing a Series from a MultiIndex is not supported
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-47-1f4b497560ca> in <module>
2 ddf_lookup = dask_cudf.from_cudf(df_lookup, npartitions=2)
3
----> 4 ddf.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(ddf_lookup, on='id_2').compute()
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in reset_index(self, drop)
458 Do not try to insert index into dataframe columns.
459 """
--> 460 return self.map_partitions(M.reset_index, drop=drop).clear_divisions()
461
462 @property
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
618 >>> ddf.map_partitions(func).clear_divisions() # doctest: +SKIP
619 """
--> 620 return map_partitions(func, self, *args, **kwargs)
621
622 @insert_meta_param_description(pad=12)
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in map_partitions(func, *args, **kwargs)
4765 # Use non-normalized kwargs here, as we want the real values (not
4766 # delayed values)
-> 4767 meta = _emulate(func, *args, udf=True, **kwargs)
4768 else:
4769 meta = make_meta(meta, index=meta_index)
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
4724 """
4725 with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 4726 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
4727
4728
/conda/envs/rapids/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
128 value = type()
129 try:
--> 130 self.gen.throw(type, value, traceback)
131 except StopIteration as exc:
132 # Suppress StopIteration *unless* it's the same exception that
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
189 )
190 msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 191 raise ValueError(msg)
192
193
ValueError: Metadata inference failed in `reset_index`.
You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.
To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.
Original error is below:
------------------------
NotImplementedError('initializing a Series from a MultiIndex is not supported')
Traceback:
---------
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py", line 170, in raise_on_meta_error
yield
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4726, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4706, in _extract_meta
return tuple([_extract_meta(_x, nonempty) for _x in x])
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4706, in <listcomp>
return tuple([_extract_meta(_x, nonempty) for _x in x])
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 4702, in _extract_meta
return x._meta_nonempty if nonempty else x._meta
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 344, in _meta_nonempty
return meta_nonempty(self._meta)
File "/conda/envs/rapids/lib/python3.7/site-packages/dask/utils.py", line 486, in __call__
return meth(arg, *args, **kwargs)
File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0.10.0a0+800.g6407f5065-py3.7.egg/dask_cudf/backends.py", line 16, in meta_nonempty_cudf
return cudf.from_pandas(y)
File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/dataframe.py", line 3726, in from_pandas
return Series.from_pandas(obj)
File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py", line 133, in from_pandas
return cls(s, nan_as_null=nan_as_null)
File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/series.py", line 94, in __init__
index = as_index(data.index)
File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/index.py", line 929, in as_index
return as_index(column.as_column(arbitrary), **kwargs)
File "/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/column/column.py", line 1401, in as_column
pa.array(arbitrary, type=pa_type, from_pandas=nan_as_null),
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 1566, in pyarrow.lib.get_series_values
File "pyarrow/pandas-shim.pxi", line 88, in pyarrow.lib._PandasAPIShim.make_series
File "/conda/envs/rapids/lib/python3.7/site-packages/pandas/core/series.py", line 177, in __init__
raise NotImplementedError("initializing a Series from a "
Working Dask/pandas version (same pipeline, which succeeds):
import dask
# uses the cudf dfs created in repro snippet above
ddf = dask.dataframe.from_pandas(df.to_pandas(), npartitions=1)
ddf_lookup = dask.dataframe.from_pandas(df_lookup.to_pandas(), npartitions=1)
ddf.groupby(by=['id_1', 'id_2']).val.sum().reset_index().merge(ddf_lookup, on='id_1').compute()
@rjzamora Can you take a look at this? We're hitting this code path: https://github.com/rapidsai/cudf/blob/branch-0.10/python/dask_cudf/dask_cudf/backends.py#L13-L16, which fails when round-tripping a Series that has a MultiIndex.
We already handle a MultiIndex correctly for a DataFrame, so I'm guessing the equivalent logic is simply missing for Series. Here's a minimal reproducer:
import pandas as pd
import cudf
pdf = pd.DataFrame({'a': [1,2,3], 'b': [4,5,6], 'c': [7,8,9]}).set_index(['a', 'b'])
test_pdf = pdf['c']
result = cudf.from_pandas(test_pdf)
# NotImplementedError: initializing a Series from a MultiIndex is not supported
No problem - I can take a good look tomorrow (tonight if I'm lucky).
Most helpful comment
No problem - I can take a good look tomorrow (tonight if I'm lucky).