Now that some changes have been merged into dask.dataframe, we should be able to remove some of the shuffle-based methods in the dask-cudf library and rely on the upstream implementations. This is in service of https://github.com/rapidsai/cudf/issues/2272.
To be clear, these implementations don't run fully on the device yet, but anecdotally they do seem to be faster than the current implementation, and they are likely less buggy (they are heavily exercised by the pandas community). Plus, once we have the GPU primitives we need, the work of transitioning things over will already be done.
I anticipate that we'll run into a couple of small issues along the way, but hopefully nothing as complex as we had to deal with when generalizing the shuffle code.
After dask#5205, Dask's set_index works with cudf dataframes when shuffle="tasks", but not for shuffle="disk" (the default). The disk-backed version constructs a graph using groupby (e.g. d = {i: g.get_group(i) for i in g.groups}), and cudf does not support groups or get_group.
In order to allow dask_cudf to completely fall back on dask here, we need to do one of two things:
(1) Change the disk-based shuffle in dask so it no longer relies on groups/get_group, or
(2) Add groups and get_group to cudf.
In the meantime, leaving a simple set_index wrapper in dask-cudf (using shuffle="tasks") also works, but (1) or (2) is probably a better long-term solution.
@kkraus14 - In #1720, you seemed hesitant to add the groupby functionality needed for (2). Do you believe (2) is a "no go"?
I think that we should just omit the use of the "disk" based shuffle technique for now and hard-code the "tasks" technique.
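For concreteness, a minimal sketch of what that interim wrapper could look like (the shuffle= keyword name follows the dask version current at the time; the class layout here is illustrative, not the exact dask_cudf code):

import dask.dataframe as dd

class DataFrame(dd.DataFrame):
    def set_index(self, other, **kwargs):
        # The disk-based shuffle builds its graph with groupby.groups/get_group,
        # which cudf does not provide, so always force the task-based shuffle.
        kwargs["shuffle"] = "tasks"
        return super().set_index(other, **kwargs)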
Just an update here...
The first step has been the removal of the dask_cudf-based set_index code in favor of the main-line dask implementation. However, the dask-based implementation still needs to move a column of data from device to host during the operation. Eliminating this data transfer is our highest priority, and the necessary work is in progress in cudf (i.e. cudf#2655 and cudf#2677).
Assuming set_index-on-device is on track, the next priority is the removal of the dask_cudf-based merge code (also in favor of main-line dask). This effort requires the same cudf changes as set_index to keep data on device. It also requires a small tweak in dask to use cudf's hash_columns method in lieu of hash_pandas_object.
Once the merge code is in a robust/accelerated state, I expect that we can/should rewrite join to rely on the optimized merge.
Overall, my plan is to open WIP PRs in both dask and dask_cudf to push on the integration of all these changes.
@rjzamora In exploring this, if there are places where we can dispatch an entire function, similar to what we're doing with #2677, please let us know. This typically gives us the ability to achieve even greater speedups by avoiding intermediate memory materializations and kernel launches.
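For reference, dask's function-level dispatch mechanism looks roughly like the sketch below (using dask.utils.Dispatch); the dispatch name and the cudf registration are illustrative, not existing dask/dask_cudf code:

import pandas as pd
from dask.utils import Dispatch

# A dispatchable hashing function: dask code would call hash_object_dispatch(df)
# and the backend is selected by the type of df.
hash_object_dispatch = Dispatch("hash_object_dispatch")

@hash_object_dispatch.register(pd.DataFrame)
def hash_object_pandas(df):
    # CPU path: pandas-backed hashing
    return pd.util.hash_pandas_object(df, index=False)

@hash_object_dispatch.register_lazy("cudf")
def register_cudf():
    import cudf

    @hash_object_dispatch.register(cudf.DataFrame)
    def hash_object_cudf(df):
        # GPU path: stays on device, no host round-trip
        return df.hash_columns()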
Sounds good @kkraus14 !
Just to clarify the high-level shuffle algorithm used by dask for the set_index/merge cases:
# Part 1: Generate a new column, with each value being the new partition for that row:
partitions = index.map_partitions(<some-partitioning-function>, ...)
df2 = df.assign(_partitions=partitions)
...
# Part 2 (EXPENSIVE PART) - Sort data into the new partitions:
df3 = rearrange_by_column(df2, "_partitions", npartitions=npartitions, ...)
# Return re-partitioned/sorted dask dataframe
del df3["_partitions"]
return df3
The goal of #2655 is to keep all data on the device between parts 1 and 2. The goal of #2677 is to accelerate the shuffle_group component of part 2 (which currently relies on pandas' groupsort_indexer).
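For context, the per-partition split that shuffle_group performs is semantically just this (a simplified sketch; the real dask implementation uses groupsort_indexer and avoids the repeated boolean scans):

import pandas as pd

def shuffle_group_sketch(df: pd.DataFrame, npartitions: int) -> dict:
    # Split one partition into a dict of sub-frames, keyed by the target
    # partition id stored in the "_partitions" column.
    codes = df["_partitions"].to_numpy()
    return {i: df[codes == i] for i in range(npartitions)}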
For phase 1 of merge, partitions are calculated by hashing, so we can probably fuse the hashing and column-assignment steps by adding an "inplace" option to hash_columns, directly adding a new column (rather than returning a series).
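Roughly, the current two-step pattern and the proposed fused call would look like this (hash_columns exists in cudf today; the inplace variant is hypothetical, and the toy data is only for illustration):

import cudf

df = cudf.DataFrame({"key": [1, 2, 3, 4], "val": [10, 20, 30, 40]})
npartitions = 2

# Today: hash the join key(s), then assign the result as a new column,
# materializing an intermediate series along the way.
df["_partitions"] = df.hash_columns(["key"]) % npartitions

# Proposed (hypothetical): a single fused call that writes the partition
# column directly, e.g. df.hash_columns(["key"], inplace=True)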
Phase 2 is much messier, but we can certainly find further optimizations if the speedup from #2677 proves too small.
@rjzamora Is Part 2 a sort or just a shuffle? We already have a partition_by_hash function which, given a set of columns, partitions the dataframe into n parts: https://rapidsai.github.io/projects/cudf/en/0.10.0/api.html#cudf.dataframe.DataFrame.partition_by_hash
It seems like this is aligned with our function; we'd just need a way to shuffle the locally partitioned dataframes among the workers.
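For reference, a small usage sketch of that API (per the 0.10 docs it splits one dataframe into nparts pieces by hashing the given columns; the toy data here is illustrative):

import cudf

df = cudf.DataFrame({"key": [1, 2, 3, 4, 5], "val": [10, 20, 30, 40, 50]})
# Returns a list of cudf DataFrames, one per output partition.
parts = df.partition_by_hash(["key"], 3)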
> We already have a partition_by_hash function which, given a set of columns, partitions the dataframe into n parts: https://rapidsai.github.io/projects/cudf/en/0.10.0/api.html#cudf.dataframe.DataFrame.partition_by_hash
I believe this is indeed pretty close to what we need for the cases where we are using hashing for partitioning (in merge), but it is not quite what we want for cases where known/specified divisions are used to define a "_partitions" column (which is in turn used for partitioning).
I will certainly see if I can work in partition_by_hash for hash-based partitioning - good call.
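For the divisions-based case, the assignment is essentially a searchsorted against the division boundaries; a rough sketch (the helper name and edge handling here are illustrative, not the exact dask code):

import numpy as np

def assign_partitions_by_divisions(index_values, divisions):
    # divisions holds npartitions + 1 sorted boundaries; a row whose index
    # value falls in [divisions[i], divisions[i + 1]) goes to partition i.
    parts = np.searchsorted(divisions, index_values, side="right") - 1
    # Values equal to the final boundary still belong to the last partition.
    return np.clip(parts, 0, len(divisions) - 2)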
I would prefer to keep the single unified code path for now and only start specializing if we find that it is a performance problem.
> I would prefer to keep the single unified code path for now and only start specializing if we find that it is a performance problem.
Agreed, but we should probably be testing bespoke optimizations along the way (as long as it is relatively easy!) so that we know how much performance we are leaving on the table (to help prioritize/plan further optimizations).
> I would prefer to keep the single unified code path for now and only start specializing if we find that it is a performance problem.
I can see the argument for wanting a unified code path, but in addition to this being a "future optimization", it more importantly guards against playing interoperability, API compatibility, and performance whack-a-mole to get MNMG functionality going on a critical feature that we'll surely be optimizing in the near future anyway.
Sure, the cost is that you're now requiring that Dask make another stage in its computation dispatchable, which requires a longer discussion on that side.
I suggest that we stick to the current plan, then benchmark things and see where performance problems are, and then revisit this conversation at that point.
Resolved in 0.11.