This may be related to #2706 but just in case.
Suppose I have the following pandas DataFrame, which I convert to a Dask DataFrame:
import numpy as np
import pandas as pd
from dask import dataframe as dd

my_df = pd.DataFrame({"userid": ["john", "alice", "bob", "john", "bob", "john"],
                      "date": ["2017-01-01", "2017-02-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-02-01"],
                      "amount": [2.0, 5.0, 7.0, np.nan, np.nan, 6.0]})
my_df_dask = dd.from_pandas(my_df, npartitions=5)
I want to group by userid and date, and compute a custom aggregation over the groups:
# Custom aggregation function:
def nan_sum(array):
    if np.isnan(array).sum() == len(array):
        return np.nan
    else:
        return np.nansum(array)

# And we convert it to a Dask Aggregation:
nan_sum_dask = dd.Aggregation('custom_nan_sum', nan_sum, nan_sum)
If we try to execute it:
my_df_dask.groupby(["userid", "date"]).agg({"amount": nan_sum_dask})
The following exception is raised:
Exception Traceback (most recent call last)
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname)
136 try:
--> 137 yield
138 except Exception as e:
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
3263 with raise_on_meta_error(funcname(func)):
-> 3264 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3265
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in _groupby_apply_funcs(df, *index, **kwargs)
634 for result_column, func, func_kwargs in funcs:
--> 635 r = func(grouped, **func_kwargs)
636
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in _apply_func_to_column(df_like, column, func)
667
--> 668 return func(df_like[column])
669
<ipython-input-20-07afb8c1b440> in nan_sum(array)
1 def nan_sum(array):
----> 2 if np.isnan(array).sum() == len(array):
3 return np.nan
~/anaconda3/lib/python3.6/site-packages/pandas/core/base.py in __getitem__(self, key)
249 raise Exception('Column(s) {selection} already selected'
--> 250 .format(selection=self._selection))
251
Exception: Column(s) amount already selected
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-21-fc8a04d0ed3d> in <module>()
----> 1 my_df_dask.groupby(["userid", "date"]).agg({"amount": nan_sum_dask})
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
1150 @derived_from(pd.core.groupby.DataFrameGroupBy)
1151 def agg(self, arg, split_every=None, split_out=1):
-> 1152 return self.aggregate(arg, split_every=split_every, split_out=split_out)
1153
1154
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
1146 return self.size()
1147
-> 1148 return super(DataFrameGroupBy, self).aggregate(arg, split_every=split_every, split_out=split_out)
1149
1150 @derived_from(pd.core.groupby.DataFrameGroupBy)
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
1030 ),
1031 token='aggregate', split_every=split_every,
-> 1032 split_out=split_out, split_out_setup=split_out_on_index)
1033
1034 @insert_meta_param_description(pad=12)
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, **kwargs)
3220
3221 if meta is no_default:
-> 3222 meta_chunk = _emulate(chunk, *args, **chunk_kwargs)
3223 meta = _emulate(aggregate, _concat([meta_chunk]),
3224 **aggregate_kwargs)
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
3262 """
3263 with raise_on_meta_error(funcname(func)):
-> 3264 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3265
3266
~/anaconda3/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
97 value = type()
98 try:
---> 99 self.gen.throw(type, value, traceback)
100 except StopIteration as exc:
101 # Suppress StopIteration *unless* it's the same exception that
~/anaconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname)
148 ).format(" in `{0}`".format(funcname) if funcname else "",
149 repr(e), tb)
--> 150 raise ValueError(msg)
151
152
ValueError: Metadata inference failed in `_groupby_apply_funcs`.
Original error is below:
------------------------
Exception('Column(s) amount already selected',)
Traceback:
---------
File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error
yield
File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py", line 3264, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py", line 635, in _groupby_apply_funcs
r = func(grouped, **func_kwargs)
File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/dask/dataframe/groupby.py", line 668, in _apply_func_to_column
return func(df_like[column])
File "<ipython-input-20-07afb8c1b440>", line 2, in nan_sum
if np.isnan(array).sum() == len(array):
File "/home/ubuntu/anaconda3/lib/python3.6/site-packages/pandas/core/base.py", line 250, in __getitem__
.format(selection=self._selection))
Any ideas?
Thank you!
The exception Exception('Column(s) amount already selected',) is coming from pandas. It seems like your function is expecting an array and returning a scalar.
I think it should expect a SeriesGroupBy object, and return a Series.
I can take a closer look later if you're still having trouble. We need to improve the documentation on this.
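To illustrate the point about SeriesGroupBy, here is a minimal pandas-only sketch (no Dask involved). It assumes that selecting the "amount" column from a pandas groupby is a fair stand-in for what a chunk function receives for one partition. The data is the example from the report; nan_sum is slightly rewritten, and the names grouped and result are only illustrative.

```python
import numpy as np
import pandas as pd


def nan_sum(values):
    # Return NaN when every value in the group is NaN,
    # otherwise the sum of the non-NaN values.
    if np.isnan(values).all():
        return np.nan
    return np.nansum(values)


df = pd.DataFrame({
    "userid": ["john", "alice", "bob", "john", "bob", "john"],
    "date": ["2017-01-01", "2017-02-01", "2017-04-01",
             "2017-01-01", "2017-02-01", "2017-02-01"],
    "amount": [2.0, 5.0, 7.0, np.nan, np.nan, 6.0],
})

# Selecting one column from a groupby gives a SeriesGroupBy,
# not an array of values.
grouped = df.groupby(["userid", "date"])["amount"]
print(type(grouped).__name__)  # SeriesGroupBy

# Applying the scalar function per group via .agg returns a Series
# indexed by (userid, date), with one value per group.
result = grouped.agg(nan_sum)
print(result)
```

Calling nan_sum(grouped) directly would hand the whole SeriesGroupBy to np.isnan, which is not what the function was written for; grouped.agg(nan_sum) calls it once per group instead.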
Hmmm that's weird. If I happen to use my_df_dask.groupby(["userid", "date"]).agg(nan_sum_dask) instead of my_df_dask.groupby(["userid", "date"]).agg({"amount": nan_sum_dask}) it does work without any problem...
Hm that's too bad. It looks like Dask will need to be aware of whether an Aggregation is being applied to a selected column... Any further debugging you can do here would be appreciated.
Oh, never mind. The problem was in my dd.Aggregation object. I didn't know that the functions passed as the chunk and agg arguments of the constructor each receive a pandas.core.groupby.groupby.SeriesGroupBy as their argument, not an array.
This means that
nan_sum_dask = dd.Aggregation('custom_nan_sum',
                              lambda x: x.agg(nan_sum),
                              lambda y: y.agg(nan_sum))
Does work.
So, the issue is solved :)
I will try to think about how to extend the documentation for this use case.
Thank you again @TomAugspurger!