Dask: Impossible to have a user-defined Aggregation when grouping on more than one field

Created on 23 Sep 2017 · 3 Comments · Source: dask/dask

I define a custom aggregation as follows:

import dask.dataframe as dd
custom_agg = dd.Aggregation('custom_agg', lambda s: list(s), lambda s: list(s))

When using that on a dataframe grouped on one field, it works:

ipdb> df.groupby('url').agg(custom_agg)
Dask DataFrame Structure:
              referrer session_id      ts customer    hour
npartitions=1
                object     object  object   object  object
                   ...        ...     ...      ...     ...

When grouping the same dataframe on more than one field, it throws an exception about requiring a MultiIndex:

ipdb> df.groupby(['customer', 'url', 'ts']).agg(custom_agg)
*** ValueError: Metadata inference failed in `apply`.

Original error is below:
------------------------
ValueError('multiple levels only valid with MultiIndex',)

Traceback:
---------
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/utils.py", line 137, in raise_on_meta_error
    yield
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/core.py", line 3128, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/groupby.py", line 614, in _groupby_apply_funcs
    grouped = _groupby_raise_unaligned(df, **kwargs)
  File "/Users/irina/.pyenv/versions/talks/src/dask/dask/dataframe/groupby.py", line 133, in _groupby_raise_unaligned
    return df.groupby(**kwargs)
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/generic.py", line 4416, in groupby
    **kwargs)
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/groupby.py", line 1699, in groupby
    return klass(obj, by, **kwds)
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/groupby.py", line 392, in __init__
    mutated=self.mutated)
  File "/Users/irina/.pyenv/versions/talks/lib/python2.7/site-packages/pandas/core/groupby.py", line 2591, in _get_grouper
    raise ValueError('multiple levels only valid with '

Is it something I'm doing wrong?

All 3 comments

cc @chmp in case they have thoughts

@j-bennet: Could you give a short description of your goal? I assume you want something similar to Postgres' array_agg, i.e., collecting all values into a list for each group. A simple implementation would be:

custom_agg = dd.Aggregation('custom_agg', lambda s: s.apply(list), lambda s: s.apply(list))

With that version, each group ends up with a list of lists (one inner list per chunk). To flatten the result lists, use:

import itertools as it

custom_agg = dd.Aggregation(
    'custom_agg', 
    lambda s: s.apply(list), 
    lambda s: s.apply(lambda chunks: list(it.chain.from_iterable(chunks))),
)
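
To see why the second lambda has to flatten, here is a minimal plain-Python sketch of the two-stage pattern. It does not use dask or reproduce its internals. The names `partitions`, `chunk`, `aggregate`, and `collected`, and the sample rows, are made up for illustration.

```python
import itertools as it
from collections import defaultdict

# Hypothetical data: two partitions of (customer, url, ts) rows.
partitions = [
    [("a", "x", 1), ("a", "x", 2), ("b", "y", 3)],
    [("a", "x", 4), ("b", "y", 5)],
]

def chunk(rows):
    """Per-partition step: collect each group's values into a list."""
    groups = defaultdict(list)
    for customer, url, ts in rows:
        groups[(customer, url)].append(ts)
    return dict(groups)

def aggregate(chunk_results):
    """Combine step: every group holds one list per partition, so flatten."""
    per_group = defaultdict(list)
    for result in chunk_results:
        for key, values in result.items():
            per_group[key].append(values)  # builds a list of lists
    return {
        key: list(it.chain.from_iterable(lists))
        for key, lists in per_group.items()
    }

collected = aggregate(chunk(p) for p in partitions)
print(collected)  # {('a', 'x'): [1, 2, 4], ('b', 'y'): [3, 5]}
```

Without the `chain.from_iterable` call, group `('a', 'x')` would come out as `[[1, 2], [4]]`, which is the nested result the simpler version above produces.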

TBH, I am not completely sure where your implementation is going wrong. It seems you destroy the group structure of the result by returning a list rather than the dataframe dask expects. Why it works with a single grouping key is a mystery to me.

@chmp Yes, that's exactly what I want here: combine all the elements into a list. The Spark SQL equivalent would be collect_list. Thank you!

P.S. I think the custom Aggregation needs a few more complicated examples in the documentation. It's difficult to figure out how to do anything different from the usual sum, mean, etc.
