Cudf: [FEA/QST] Ability to avoid a sort on the output for dask groupby

Created on 3 Sep 2019 · 18 comments · Source: rapidsai/cudf

Is your feature request related to a problem? Please describe.
Previous implementations of cudf's groupby used a hash-based approach that did not sort the output dataframe (deviating from the order of pandas output). The current behavior sets sort=True by default, which means that even when a hash-based groupby is used, the output is then sorted (presumably to match pandas output).

The dask API does not support setting the sort argument to True or False, resulting in a sort_values call after the groupby. This operation can be expensive in a distributed setting: it involves shuffles and can lead to memory issues (if multiple partitions are on the same GPU and the partition size is relatively large).

Describe the solution you'd like
It would be great to have the ability to avoid this relatively expensive sort_values call when the output order does not matter to the user.
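For reference, the pandas-style groupby API that cudf mirrors already exposes this switch via the sort= keyword. A minimal sketch (using pandas as a stand-in for cudf) showing that sort=True and sort=False produce the same aggregates and only the row order differs:

```python
import numpy as np
import pandas as pd

# Synthetic data: many repeated keys, random values.
rng = np.random.default_rng(0)
df = pd.DataFrame({"key": rng.integers(0, 1000, size=100_000),
                   "val": rng.random(100_000)})

sorted_res = df.groupby("key", sort=True)["val"].mean()
unsorted_res = df.groupby("key", sort=False)["val"].mean()

# Same groups and same values once both are put in index order;
# only the ordering of the output rows differs.
assert sorted_res.sort_index().equals(unsorted_res.sort_index())
```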

cuDF (Python) feature request

All 18 comments

Any opposition to just removing the sort from groupby in general? @ayushdg @beckernick @randerzander @shwina @thomcom @brandon-b-miller? I think this is a place where we shouldn't necessarily try to match pandas behavior.

Leaning slightly towards sort=True being default to match user expectations, and perhaps making it possible to change this globally via something like:

import cudf

cudf.settings.groupby_sort = False

I think that people tend to follow up a groupby with an explicit sort in practice, because they're looking to find the group or groups with the biggest/smallest values of some metric. Moreover, even if we actually guaranteed a sorted result, data scientists would call sort anyway because it makes their code easier to follow during review.

In that case there's no use assuming any desired order just so the user can immediately re-sort it.
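To illustrate the point: a minimal pandas sketch (the data and column names are invented) of the common pattern where the user skips the key sort and immediately re-sorts by the metric they care about:

```python
import pandas as pd

df = pd.DataFrame({"key": ["b", "a", "b", "c", "a"],
                   "val": [1, 2, 3, 4, 5]})

# Skip the implicit sort on the group keys...
totals = df.groupby("key", sort=False)["val"].sum()

# ...because the analysis sorts by the metric anyway.
top = totals.sort_values(ascending=False)
print(top.index[0])  # prints "a": the group with the largest total
```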

they're looking to find the group or groups with the biggest/smallest values of some metric.

This is precisely why I think sort=True should be the default behaviour (and why Pandas makes it so). Pandas recommends setting sort=False for improved performance and I think we should do the same.

Further, when sort-based and hash-based groupby are unified into a single API, users will see that _some aggregations return sorted dataframes while others return unsorted dataframes_. This will lead to confusion and frustration for users that are (unfairly) expecting a 100% Pandas experience.

I ran some tests for sort=True vs sort=False.
N_rows = 100 mil
N_cols = 2

| N_groups (approx) | Sort=True | Sort=False |
|-------------------|-----------|------------|
| 10 mil | 726 ms | 524 ms |
| 50 mil | 1.8 s | 1.0 s |
| 100 mil | 2.6 s | 1.26 s |

Link to the experiments: https://gist.github.com/ayushdg/7ead40391c1573bef975b417218b1d31

I have attached the dask profiles, which don't make too much sense to me (any insight would be appreciated). According to the profile, the majority of the time goes to sort_values, but according to the task graph the majority of the time is taken by transfer-agg...
Groupby perf.zip

FYI: I had a conversation with @jcrist about this when we happened to be at the same conference, and based on that conversation it sounds like Dask requires a sorted index, which in turn requires sorting the groupby result here.

@mrocklin any thoughts here? Happy to open an issue to continue discussion in the dask github if you think it's more appropriate.

If you're doing a groupby-aggregation then the cost of sorting won't matter either way. You'll have a single output partition.

If you're doing a groupby-apply then it's not clear to me that Dask Dataframe will perform the sort. I think it tends to just hash things (although, given the title of the issue, it sounds like I might be wrong).

@mrocklin If you have a somewhat large number of groups then the cost of sorting on the index, which are the group key columns in this instance, is non-negligible. Unless you're saying that we need to do the sorting anyway as part of shuffling around intermediate results?

Quick question: are you doing a groupby-apply or a groupby-aggregation (like df.groupby([x, y]).z.mean())? If groupby-aggregation, how many output groups?

In the gist above it looks like he tested 10MM, 50MM, and 100MM groups, so this is definitely a place where sorting is non-negligible.

Ah, I see the gist now. It looks like we're doing groupby-aggregation as well. (This is really the main split in terms of what choices we make.)

I looked through dask/dataframe/groupby.py and didn't see any explicit calls to sort.

I also ran a couple of groupby-aggregations on pandas code and saw that it seemed to be dependent on the aggregation used.

In [1]: import dask
In [2]: df = dask.datasets.timeseries()

In [3]: df.groupby("name").id.max().compute()
Out[3]:
name
Alice       1157
Bob         1136
Charlie     1130
Dan         1149
Edith       1149
Frank       1163
George      1146
Hannah      1147
Ingrid      1134
Jerry       1135
Kevin       1151
Laura       1142
Michael     1140
Norbert     1132
Oliver      1133
Patricia    1135
Quinn       1159
Ray         1135
Sarah       1137
Tim         1132
Ursula      1145
Victor      1137
Wendy       1144
Xavier      1144
Yvonne      1149
Zelda       1153
Name: id, dtype: int64

In [4]: df.groupby("id").name.count().compute()
Out[4]:
id
860      2
861      1
864      2
875      9
877     17
        ..
862      1
855      1
1137     3
1157     1
857      1
Name: name, Length: 304, dtype: int64

It seems like dask dataframe isn't always sorting by default. In this situation I recommend placing a breakpoint into the cudf sort operation, and then seeing where in the code it gets triggered.

It could also be that pandas is sorting when concatenating with certain indexes? I'm not sure. Regardless, I think there is enough uncertainty about what is causing the sort that I would go back to the application and probe more directly for the origin of the errant behavior.
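One lightweight way to probe for the origin of the sort, along the lines suggested above, is to wrap the sort entry point so it prints the stack of whoever calls it. A sketch shown for pandas; the same monkeypatch trick should apply to cudf's DataFrame, since it exposes the same method name:

```python
import traceback
import pandas as pd

# Wrap sort_values so every call announces its caller's stack.
_orig_sort_values = pd.DataFrame.sort_values

def traced_sort_values(self, *args, **kwargs):
    print("sort_values called from:")
    traceback.print_stack(limit=5)
    return _orig_sort_values(self, *args, **kwargs)

pd.DataFrame.sort_values = traced_sort_values

# Any code path that sorts now prints a stack trace first,
# while still returning the normal sorted result.
df = pd.DataFrame({"a": [3, 1, 2]})
result = df.sort_values("a")
```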

My understanding is that if the aggregation is a reduction-like operation (min, max, count, etc.), Dask does not sort the underlying dataframe. For example, if the aggregation is max, the algorithm should be:

  • groupby.max() for each partition
  • Transfer each partition's grouped result to one worker (if split_out = 1)
  • Recompute groupby.max() on this result df to get the output.
    (Not sure what the process is if split_out > 1, but my guess is that there might be sorting and indices involved to find a clean split for the output)

For the algorithm mentioned above, each groupby operation called on a partition invokes cudf's groupby, which by default sets sort=True.
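The tree-style reduction described above can be sketched without any Dask machinery (the partition contents and the max aggregation are illustrative only):

```python
def groupby_max(rows):
    """Per-partition groupby.max over (key, value) pairs."""
    out = {}
    for key, val in rows:
        if key not in out or val > out[key]:
            out[key] = val
    return list(out.items())

# Two hypothetical partitions of a distributed dataframe.
part1 = [("a", 3), ("b", 7), ("a", 5)]
part2 = [("b", 2), ("c", 9), ("a", 1)]

# Step 1: groupby.max() on each partition independently.
partials = [groupby_max(part1), groupby_max(part2)]

# Step 2: transfer all partial results to one worker (split_out=1)...
combined = [pair for partial in partials for pair in partial]

# Step 3: ...and recompute groupby.max() over the concatenated partials.
result = dict(groupby_max(combined))
print(result)  # {'a': 5, 'b': 7, 'c': 9}
```

Note that no step in this reduction needs the keys in sorted order, which is why the per-partition sort=True is pure overhead here.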

The Dask API does not support the sort parameter at all and defaults to whatever the behavior of the underlying dataframe is (for simple reduction-like operations). The API docs say the default is True, but that's because the docs were taken from the pandas docstring.

On the cudf side, we call libcudf's groupby (using a hash-based approach) and then sort the groupby output, presumably to match pandas output.

Non-reduction operations / custom aggregations / splitting the groupby output into multiple partitions are a whole different story and likely involve sorting indices, shuffling partitions around, knowing divisions, etc.

The dask-cudf timings shown in the gist were obtained by modifying cudf's groupby default between sort=False and sort=True.

A lot of the points I made are based on the dask docs here

@ayushdg I believe Dask DataFrame doesn't support passing as_index=False into the groupby call.

It was mentioned that Dask always sorts its index, so if we run a groupby and return the key columns as an unordered index, would Dask trigger a sort?

It was mentioned that Dask always sorts its index, so if we run a groupby and return the key columns as an unordered index, would Dask trigger a sort?

Dask actually doesn't always sort its index. The example above shows this.

In [4]: df.groupby("id").name.count().compute()
Out[4]:
id
860      2
861      1
864      2
875      9
877     17
        ..
862      1
855      1
1137     3
1157     1
857      1
Name: name, Length: 304, dtype: int64

What I'm hearing from @ayushdg's analysis (nice work, by the way) is that you'd prefer the default behavior for some groupby-aggregations to be to not sort the intermediaries, and possibly to expose the sort= parameter for groupby-aggregations so that its value is passed through to the final aggregation. These seem like reasonable changes to me. You might want to raise an issue with dask/dask, though, to check in with the Dask DataFrame maintainers.

@TomAugspurger any concerns with not sorting / exposing sorting for groupby-aggregations? Presumably this would err if we also had split_out=

Thanks for the explanation, @mrocklin. At this point it sounds like we should move the issue over to Dask to continue the discussion with the Dask maintainers. @ayushdg, would you mind opening the issue for discussion there and pinging the relevant parties here? 😄
