Ray: [ParallelIterator] built-in reduceByKey()

Created on 7 Jul 2020 · 8 Comments · Source: ray-project/ray

I think there should be a built-in reduce-by-key transform in ParallelIterator. I'm aware of the DIY MapReduce example, but it does not combine values by key.

For instance, Spark's rdd.reduceByKey():

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V.
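To pin down the requested semantics, here is a single-process sketch in plain Python. It is not Ray or Spark code, and `reduce_by_key` is a hypothetical name. Values sharing a key are folded with `func`, which has type (V, V) -> V. Spark's API docs also require the function to be associative and commutative, because a distributed implementation may combine values in any grouping or order.

```python
from typing import Callable, Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def reduce_by_key(pairs: Iterable[Tuple[K, V]],
                  func: Callable[[V, V], V]) -> List[Tuple[K, V]]:
    """Reference semantics of a reduceByKey over (key, value) pairs.

    Values that share a key are folded together with func: (V, V) -> V.
    Keys come back in first-seen order (dicts preserve insertion order).
    """
    acc: Dict[K, V] = {}
    for key, value in pairs:
        if key in acc:
            acc[key] = func(acc[key], value)  # merge into the running value
        else:
            acc[key] = value  # first value seen for this key
    return list(acc.items())


pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
print(reduce_by_key(pairs, lambda x, y: x + y))
# [('a', 4), ('b', 7), ('c', 4)]
```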

P3 enhancement

All 8 comments

cc @amogkam @wuisawesome

Totally agree we should have this.

We really need two operations first:

  1. Reduce: (self : ParallelIterator[B], initial_value : A, reduce_fn : (state : A, value : B) => res : A) => A

  2. GroupBy (the exact function signature may be a little different, but it might look like): (self : ParallelIterator[U]) => Map[U, ParallelIterator[U]]

(1) wouldn't be too difficult to write (not sure if it would be useful in a vacuum).
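A minimal sketch of the Reduce signature from (1), assuming the shards of a ParallelIterator can be modeled as plain Python iterables. This is not Ray's API, and `reduce_shards` is a hypothetical name. The state type A and element type B may differ, as the signature allows.

```python
import functools
from typing import Callable, Iterable, List, TypeVar

A = TypeVar("A")  # accumulated state type
B = TypeVar("B")  # element type of the iterator


def reduce_shards(shards: List[Iterable[B]], initial_value: A,
                  reduce_fn: Callable[[A, B], A]) -> A:
    """Fold every element of every shard into one value of type A.

    Shards are plain iterables standing in for ParallelIterator shards.
    With only reduce_fn: (A, B) -> A there is no way to merge two partial
    states, so a single state is threaded through all shards in order.
    """
    state = initial_value
    for shard in shards:
        # functools.reduce(fn, iterable, initializer) is a left fold.
        state = functools.reduce(reduce_fn, shard, state)
    return state


shards = [["ab", "c"], ["def"], []]
# int state, str elements: 2 + 1 + 3 characters.
print(reduce_shards(shards, 0, lambda n, s: n + len(s)))  # 6
```

Folding each shard on its own worker in parallel would additionally need a combine function of type (A, A) -> A to merge the per-shard states, in the spirit of Spark's RDD.aggregate(zeroValue, seqOp, combOp).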

(2) would require some care to do the data shuffling properly.
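The shuffle in (2) has to route every value for a given key to the same partition before grouping. Hash partitioning, as in MapReduce and Spark, is the usual approach. Below is a single-process simulation in plain Python. Shards are lists of (key, value) pairs, and `shuffle`, `group_by_key` and `sharded_reduce_by_key` are hypothetical names, not Ray APIs. It also shows how reduceByKey falls out of GroupBy plus a per-group reduce.

```python
import functools
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def shuffle(shards: List[List[Tuple[K, V]]],
            num_partitions: int) -> List[List[Tuple[K, V]]]:
    """Hash-partition (key, value) pairs so each key lands in one partition."""
    # Map side: each shard splits its pairs into one bucket per partition.
    # Python's hash() of str is salted per process unless PYTHONHASHSEED is
    # fixed, so a real multi-process shuffle needs a stable hash function.
    buckets_per_shard = []
    for shard in shards:
        buckets: List[List[Tuple[K, V]]] = [[] for _ in range(num_partitions)]
        for key, value in shard:
            buckets[hash(key) % num_partitions].append((key, value))
        buckets_per_shard.append(buckets)
    # Reduce side: partition i gathers bucket i from every shard.
    return [
        [pair for buckets in buckets_per_shard for pair in buckets[i]]
        for i in range(num_partitions)
    ]


def group_by_key(partition: List[Tuple[K, V]]) -> Dict[K, List[V]]:
    """Collect all values for each key within a single partition."""
    groups: Dict[K, List[V]] = defaultdict(list)
    for key, value in partition:
        groups[key].append(value)
    return dict(groups)


def sharded_reduce_by_key(shards: List[List[Tuple[K, V]]],
                          func: Callable[[V, V], V],
                          num_partitions: int) -> Dict[K, V]:
    """reduceByKey as: shuffle, then group and reduce inside each partition."""
    result: Dict[K, V] = {}
    for partition in shuffle(shards, num_partitions):
        for key, values in group_by_key(partition).items():
            # A key never spans partitions, so no cross-partition merge.
            result[key] = functools.reduce(func, values)
    return result


shards = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)], [("b", 5)]]
print(sharded_reduce_by_key(shards, lambda x, y: x + y, num_partitions=2))
# Same contents as {'a': 4, 'b': 7, 'c': 4}; key order depends on hashing.
```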

cc @pdames since I believe you've expressed interest in this as well

Thanks for the callout, @wuisawesome. We definitely have use cases for this feature at Amazon as well, and could potentially assist with some of the initial implementation work.

Awesome! @pdames I'll reach out to you offline

@crypdick, can you provide some context about the scale of your ParallelIterator needs (approximate number of keys, dataset size, etc.)?

@wuisawesome ~7.1M keys, ~30 items per key
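For a sense of scale, taking those approximate figures at face value:

$$
7.1 \times 10^{6}\ \text{keys} \times 30\ \frac{\text{items}}{\text{key}} \approx 2.1 \times 10^{8}\ \text{items}
$$

Spark's reduceByKey merges values locally on each mapper before the shuffle. With a map-side combine like that, each shard sends at most one partial value per distinct key it holds, which cuts shuffle volume most when a key's ~30 items are concentrated on few shards.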

I see that this functionality is in the streaming module: https://github.com/ray-project/ray/blob/master/streaming/python/examples/wordcount.py#L78-L83
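A streaming keyed reduce differs from a batch reduceByKey in that the input never ends. One common model, assumed here and not taken from the Ray streaming source, keeps a running value per key and emits the updated (key, value) as each record arrives. A plain-Python sketch of that model applied to word count (`running_reduce_by_key` is a hypothetical name, not Ray's streaming API):

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def running_reduce_by_key(records: Iterable[Tuple[K, V]],
                          func: Callable[[V, V], V]) -> Iterator[Tuple[K, V]]:
    """Keep a running value per key; emit the updated (key, value) per record."""
    state: Dict[K, V] = {}
    for key, value in records:
        state[key] = func(state[key], value) if key in state else value
        yield key, state[key]


words = "to be or not to be".split()
updates = running_reduce_by_key(((w, 1) for w in words), lambda a, b: a + b)
print(list(updates))
# [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 2), ('be', 2)]
```

The final value per key matches the batch result, but intermediate counts are emitted along the way, so a batch reduceByKey for ParallelIterator would still need its own operator.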