Dask: Metadata inference failed in custom DataFrame aggregation

Created on 17 Sep 2018 · 4 Comments · Source: dask/dask

This may be related to #2706, but I'm reporting it just in case.

Suppose I have the following pandas DataFrame, which I then convert to a Dask DataFrame:

import numpy as np
import pandas as pd
from dask import dataframe as dd

my_df = pd.DataFrame({"userid": ["john", "alice", "bob", "john", "bob", "john"],
                      "date": ["2017-01-01", "2017-02-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-02-01"],
                      "amount": [2.0, 5.0, 7.0, np.nan, np.nan, 6.0]})

my_df_dask = dd.from_pandas(my_df, npartitions=5)

I want to group by userid and date, and compute a custom aggregation over the groups:

# Custom aggregation function:
def nan_sum(array):
    if np.isnan(array).sum() == len(array):
        return np.nan
    else:
        return np.nansum(array)

# And we convert it to a Dask Aggregation:
nan_sum_dask = dd.Aggregation('custom_nan_sum', nan_sum, nan_sum)
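
For reference, a small sketch of the result the Dask aggregation should reproduce, computed eagerly in pandas on the same data. The names grouped, reference, plain_sum and builtin are illustrative only. It also shows why nan_sum is needed at all: pandas' default sum treats an all-NaN group as 0.0, while sum(min_count=1) returns NaN for it, which appears to match nan_sum on this data.

```python
import numpy as np
import pandas as pd

# Same data as in the issue
my_df = pd.DataFrame({"userid": ["john", "alice", "bob", "john", "bob", "john"],
                      "date": ["2017-01-01", "2017-02-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-02-01"],
                      "amount": [2.0, 5.0, 7.0, np.nan, np.nan, 6.0]})

def nan_sum(array):
    # NaN if every value in the group is NaN, otherwise the NaN-ignoring sum
    if np.isnan(array).sum() == len(array):
        return np.nan
    return np.nansum(array)

grouped = my_df.groupby(["userid", "date"])["amount"]  # a SeriesGroupBy

reference = grouped.agg(nan_sum)    # nan_sum is called once per group with that group's Series
plain_sum = grouped.sum()           # default min_count=0: an all-NaN group sums to 0.0
builtin = grouped.sum(min_count=1)  # needs at least one non-NaN value, otherwise NaN
```

Here (bob, 2017-02-01) is NaN in reference and builtin but 0.0 in plain_sum; the other four groups are alice/2017-02-01 = 5.0, bob/2017-04-01 = 7.0, john/2017-01-01 = 2.0 and john/2017-02-01 = 6.0.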

If we try to execute it:

my_df_dask.groupby(["userid", "date"]).agg({"amount": nan_sum_dask})

The following exception is raised:

Exception                                 Traceback (most recent call last)
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname)
    136     try:
--> 137         yield
    138     except Exception as e:

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   3263     with raise_on_meta_error(funcname(func)):
-> 3264         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3265 

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in _groupby_apply_funcs(df, *index, **kwargs)
    634     for result_column, func, func_kwargs in funcs:
--> 635         r = func(grouped, **func_kwargs)
    636 

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in _apply_func_to_column(df_like, column, func)
    667 
--> 668     return func(df_like[column])
    669 

<ipython-input-20-07afb8c1b440> in nan_sum(array)
      1 def nan_sum(array):
----> 2     if np.isnan(array).sum() == len(array):
      3         return np.nan

~/anaconda3/lib/python3.6/site-packages/pandas/core/base.py in __getitem__(self, key)
    249             raise Exception('Column(s) {selection} already selected'
--> 250                             .format(selection=self._selection))
    251 

Exception: Column(s) amount already selected

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-21-fc8a04d0ed3d> in <module>()
----> 1 my_df_dask.groupby(["userid", "date"]).agg({"amount": nan_sum_dask})

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   1150     @derived_from(pd.core.groupby.DataFrameGroupBy)
   1151     def agg(self, arg, split_every=None, split_out=1):
-> 1152         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   1153 
   1154 

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1146             return self.size()
   1147 
-> 1148         return super(DataFrameGroupBy, self).aggregate(arg, split_every=split_every, split_out=split_out)
   1149 
   1150     @derived_from(pd.core.groupby.DataFrameGroupBy)

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1030                    ),
   1031                    token='aggregate', split_every=split_every,
-> 1032                    split_out=split_out, split_out_setup=split_out_on_index)
   1033 
   1034     @insert_meta_param_description(pad=12)

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, **kwargs)
   3220 
   3221     if meta is no_default:
-> 3222         meta_chunk = _emulate(chunk, *args, **chunk_kwargs)
   3223         meta = _emulate(aggregate, _concat([meta_chunk]),
   3224                         **aggregate_kwargs)

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   3262     """
   3263     with raise_on_meta_error(funcname(func)):
-> 3264         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3265 
   3266 

~/anaconda3/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname)
    148                ).format(" in `{0}`".format(funcname) if funcname else "",
    149                         repr(e), tb)
--> 150         raise ValueError(msg)
    151 
    152 

ValueError: Metadata inference failed in `_groupby_apply_funcs`.

Original error is below:
------------------------
Exception('Column(s) amount already selected',)

Traceback:
---------
  File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error
    yield
  File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py", line 3264, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py", line 635, in _groupby_apply_funcs
    r = func(grouped, **func_kwargs)
  File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py", line 668, in _apply_func_to_column
    return func(df_like[column])
  File "<ipython-input-20-07afb8c1b440>", line 2, in nan_sum
    if np.isnan(array).sum() == len(array):
  File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/pandas/core/base.py", line 250, in __getitem__
    .format(selection=self._selection))

Any ideas?

Thank you!

All 4 comments

The exception Exception('Column(s) amount already selected',) is coming from pandas. It seems like your function is expecting an array and returning a scalar.

I think it should expect a SeriesGroupBy object and return a Series.
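
To make that contract concrete, a pandas-only sketch that uses a plain pandas groupby as a stand-in for a single Dask partition. The helper chunk_nan_sum and the list seen_types are hypothetical and exist only to show what such a callable is handed (a SeriesGroupBy rather than an array) and what it should give back (a Series indexed by the group keys).

```python
import numpy as np
import pandas as pd
from pandas.core.groupby import SeriesGroupBy

# A few rows standing in for one partition of the Dask DataFrame
partition = pd.DataFrame({"userid": ["john", "bob", "john"],
                          "date": ["2017-01-01", "2017-02-01", "2017-01-01"],
                          "amount": [2.0, np.nan, np.nan]})

def nan_sum(array):
    if np.isnan(array).sum() == len(array):
        return np.nan
    return np.nansum(array)

seen_types = []

def chunk_nan_sum(grouped):
    # Hypothetical chunk callable: record the argument's type, then reduce each group
    seen_types.append(type(grouped))
    return grouped.agg(nan_sum)  # a Series indexed by (userid, date)

out = chunk_nan_sum(partition.groupby(["userid", "date"])["amount"])
```

Passing nan_sum itself as the chunk function would hand it the whole SeriesGroupBy, which is what triggers the pandas error in the traceback above; wrapping it in .agg applies it per group instead.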

I can take a closer look later if you're still having trouble. We need to improve the documentation on this.

Hmm, that's weird. If I use my_df_dask.groupby(["userid", "date"]).agg(nan_sum_dask) instead of my_df_dask.groupby(["userid", "date"]).agg({"amount": nan_sum_dask}), it works without any problem...

Hm, that's too bad. It looks like Dask will need to be aware of whether an Aggregation is being applied to a selected column... Any further debugging you can do here would be appreciated.

Oh, never mind. The problem was in my dd.Aggregation object: I didn't know that the chunk and agg functions passed to the constructor receive a pandas.core.groupby.groupby.SeriesGroupBy as their argument.

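This means that the following version does work:

# chunk and agg each receive a pandas SeriesGroupBy, not an array:
# chunk is applied per partition, agg to the concatenated chunk results.
nan_sum_dask = dd.Aggregation('custom_nan_sum',
                              lambda x: x.agg(nan_sum),
                              lambda y: y.agg(nan_sum))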
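
To see why the fixed version gives the right answer, here is a rough pandas-only emulation of the two stages. It is an approximation of what dd.Aggregation does, not Dask's actual code path: chunk is applied to each partition's SeriesGroupBy, the partial Series are concatenated and regrouped on their index levels, and agg is applied to that. The three-way row split is an arbitrary assumption; Dask picks its own partitions.

```python
import numpy as np
import pandas as pd

my_df = pd.DataFrame({"userid": ["john", "alice", "bob", "john", "bob", "john"],
                      "date": ["2017-01-01", "2017-02-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-02-01"],
                      "amount": [2.0, 5.0, 7.0, np.nan, np.nan, 6.0]})

def nan_sum(array):
    if np.isnan(array).sum() == len(array):
        return np.nan
    return np.nansum(array)

# The two callables from the fixed Aggregation
chunk = lambda x: x.agg(nan_sum)
agg = lambda y: y.agg(nan_sum)

keys = ["userid", "date"]
# Assumed split into three "partitions" by row position
partitions = [my_df.iloc[0:2], my_df.iloc[2:4], my_df.iloc[4:6]]

# Stage 1: chunk receives each partition's SeriesGroupBy and returns a Series
partials = [chunk(part.groupby(keys)["amount"]) for part in partitions]

# Stage 2: concatenate the partial Series, regroup on the index levels, apply agg
emulated = agg(pd.concat(partials).groupby(level=[0, 1]))

# Eager pandas result for comparison
expected = my_df.groupby(keys)["amount"].agg(nan_sum)
```

Note that (john, 2017-01-01) is split across two partitions here (2.0 in one, NaN in the other), and the agg stage still combines the partials into 2.0.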

So, the issue is solved :)

I will try to think about how to extend the documentation for this use case.
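
One point such documentation might cover: reusing the same reducer for chunk and agg is only valid when it is decomposable, i.e. reducing the per-partition results gives the same answer as reducing all values at once. A small stdlib-only sketch follows; nan_sum is restated without NumPy, and the helper two_stage and the partition contents are hypothetical. It shows that this holds for nan_sum but not for a mean.

```python
import math
from statistics import mean

def nan_sum(values):
    # NaN if every value is NaN, otherwise the sum of the non-NaN values
    present = [v for v in values if not math.isnan(v)]
    return sum(present) if present else math.nan

def two_stage(func, partitions):
    # Hypothetical helper: apply func to each partition, then func again to the partials
    return func([func(part) for part in partitions])

partitions = [[2.0, math.nan], [math.nan], [6.0, 1.0, 1.0]]
flat = [v for part in partitions for v in part]

# nan_sum: two-stage result matches the single-pass result
nan_sum_two_stage = two_stage(nan_sum, partitions)
nan_sum_single = nan_sum(flat)

# mean: the mean of per-partition means is not the overall mean
mean_parts = [[1.0, 2.0, 3.0], [10.0]]
mean_two_stage = two_stage(mean, mean_parts)
mean_single = mean([v for part in mean_parts for v in part])
```

For a non-decomposable statistic such as a mean, chunk would instead need to return partial sums and counts, agg would add those up, and dd.Aggregation's finalize argument would do the final division.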

Thank you again @TomAugspurger!