I define a custom aggregation as follows:
import dask.dataframe as dd
custom_agg = dd.Aggregation('custom_agg', lambda s: list(s), lambda s: list(s))
When using that on a dataframe grouped on one field, it works:
ipdb> df.groupby('url').agg(custom_agg)
Dask DataFrame Structure:
               referrer  session_id      ts  customer    hour
npartitions=1
                 object      object  object    object  object
                    ...         ...     ...       ...     ...
When grouping the same dataframe on more than one field, it throws an exception about requiring a MultiIndex:
ipdb> df.groupby(['customer', 'url', 'ts']).agg(custom_agg)
*** ValueError: Metadata inference failed in `apply`.
Original error is below:
------------------------
ValueError('multiple levels only valid with MultiIndex',)
Traceback:
---------
File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/utils.py", line 137, in raise_on_meta_error
yield
File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/core.py", line 3128, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/groupby.py", line 614, in _groupby_apply_funcs
grouped = _groupby_raise_unaligned(df, **kwargs)
File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/groupby.py", line 133, in _groupby_raise_unaligned
return df.groupby(**kwargs)
File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/generic.py", line 4416, in groupby
**kwargs)
File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/groupby.py", line 1699, in groupby
return klass(obj, by, **kwds)
File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/groupby.py", line 392, in __init__
mutated=self.mutated)
File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/groupby.py", line 2591, in _get_grouper
raise ValueError('multiple levels only valid with '
Is it something I'm doing wrong?
cc @chmp in case they have thoughts
@j-bennet: could you give a short description of your goal? I assume something similar to Postgres' array_agg, i.e., collecting all values into lists for each group. A simple implementation would be:
custom_agg = dd.Aggregation('custom_agg', lambda s: s.apply(list), lambda s: s.apply(list))
To flatten the result lists, use:
import itertools as it
custom_agg = dd.Aggregation(
'custom_agg',
lambda s: s.apply(list),
lambda s: s.apply(lambda chunks: list(it.chain.from_iterable(chunks))),
)
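For anyone wondering why the second step needs flattening at all, here is a rough plain-Python sketch of the two stages. It is only an emulation under assumptions, not Dask's actual implementation: the helper names `chunk` and `combine` and the sample partitions are hypothetical. The idea is that each partition yields one list per group, so the aggregation step sees a list of lists for every group key.

```python
import itertools as it
from collections import defaultdict


def chunk(rows):
    """Collect the values for each group key within a single partition."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return dict(groups)


def combine(chunk_results, flatten):
    """Merge the per-partition results for each group key."""
    merged = defaultdict(list)
    for result in chunk_results:
        for key, values in result.items():
            merged[key].append(values)  # one list per partition
    if flatten:
        # Same idea as it.chain.from_iterable in the aggregation above.
        return {k: list(it.chain.from_iterable(v)) for k, v in merged.items()}
    return dict(merged)


# Two hypothetical partitions of (group key, value) rows.
partitions = [
    [(("acme", "/home"), 1), (("acme", "/home"), 2)],
    [(("acme", "/home"), 3), (("beta", "/docs"), 4)],
]

chunks = [chunk(p) for p in partitions]
nested = combine(chunks, flatten=False)  # lists of per-partition lists
flat = combine(chunks, flatten=True)     # one flat list per group
```

Without the flattening step, a group that spans several partitions ends up as a nested list, which is what the first, simpler aggregation would give you.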
TBH, I am not completely sure where your implementation goes wrong. It seems you destroy the group structure of the result by returning a plain list rather than a Series or DataFrame indexed by the group keys, as Dask expects. Why it works with a single grouping key is a mystery to me.
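The difference between the two approaches can be checked in plain pandas, assuming the function passed to dd.Aggregation receives a pandas grouped Series for each partition. The sample data below is made up; only the column names come from the thread.

```python
import pandas as pd

# Hypothetical sample data; column names follow the thread.
df = pd.DataFrame({
    "customer": ["a", "a", "b"],
    "url": ["/x", "/x", "/y"],
    "ts": [1, 2, 3],
})

grouped = df.groupby(["customer", "url"])["ts"]

# list() iterates over the groupby object and yields (key, group) pairs,
# so the result is a plain Python list with no group index.
as_list = list(grouped)

# apply(list) returns a Series indexed by the group keys (a MultiIndex
# here), holding one list of values per group.
as_series = grouped.apply(list)
```

Here `as_series` keeps the two-level group index, while `as_list` carries no index for a later step to group on again.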
@chmp Yes, that's exactly what I want here: to combine all the elements into a list. The Spark SQL equivalent would be collect_list. Thank you!
P.S. I think the custom Aggregation needs a few more complex examples in the documentation. It's difficult to figure out how to do anything other than the usual sum, mean, etc.