RxJava: 2.x Design: parallel flows

Created on 18 Aug 2016 · 4 Comments · Source: ReactiveX/RxJava

As part of the research effort in Reactive-Streams-Commons (Rsc), we have developed a parallel version of Publisher by vectorizing its Subscribers into a ParallelPublisher:

public abstract class ParallelPublisher<T> {
     public abstract int parallelism();
     public abstract boolean ordered();
     public abstract void subscribe(Subscriber<? super T>[] subscribers);
     // ... operators
}

A copy of this class is now part of Reactor-Core.
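To make the "array of subscribers" idea concrete, here is a minimal, JDK-only sketch. It is not the Rsc or Reactor implementation: `ParallelSourceSketch`, `fromIterable` and `demo` are hypothetical names, plain `Consumer`s stand in for Reactive Streams `Subscriber`s, and backpressure, `onError` and `onComplete` are left out entirely. It only shows the shape of the contract, where the caller hands over one consumer per rail and the source distributes items across them round-robin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the ParallelPublisher idea:
// no backpressure, no terminal signals, Consumers instead of Subscribers.
abstract class ParallelSourceSketch<T> {
    // Number of rails; the caller must supply exactly this many consumers.
    abstract int parallelism();

    // Counterpart of subscribe(Subscriber[]): one consumer per rail.
    abstract void subscribe(List<Consumer<T>> rails);

    // Splits an Iterable across the rails in round-robin order.
    static <T> ParallelSourceSketch<T> fromIterable(Iterable<T> source, int railCount) {
        return new ParallelSourceSketch<T>() {
            @Override
            int parallelism() {
                return railCount;
            }

            @Override
            void subscribe(List<Consumer<T>> rails) {
                if (rails.size() != railCount) {
                    throw new IllegalArgumentException(
                        "expected " + railCount + " rails, got " + rails.size());
                }
                int next = 0;
                for (T item : source) {
                    rails.get(next).accept(item);
                    next = (next + 1) % railCount;
                }
            }
        };
    }

    // Demo helper: sends 1..count through the source and records what each rail saw.
    static List<List<Integer>> demo(int count, int railCount) {
        List<List<Integer>> received = new ArrayList<>();
        List<Consumer<Integer>> rails = new ArrayList<>();
        for (int i = 0; i < railCount; i++) {
            List<Integer> bucket = new ArrayList<>();
            received.add(bucket);
            rails.add(bucket::add);
        }
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            items.add(i);
        }
        fromIterable(items, railCount).subscribe(rails);
        return received;
    }
}
```

Calling `ParallelSourceSketch.demo(10, 3)` gives the first rail 1, 4, 7, 10, the second 2, 5, 8 and the third 3, 6, 9. Everything here runs on the calling thread; moving each rail onto its own thread is a separate step.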

(RxJava 1.x has a companion library, RxJavaParallel, which doesn't seem to be working at all.)

The benefit of having a ParallelFlowable (and perhaps ParallelObservable) is that one can then fluently go parallel (and then back to sequential):

Flowable.range(1, 1000)
    .parallel()
    .runOn(Schedulers.computation())
    .map(v -> someHeavyWork(v))
    .sequential()
    .subscribe(System.out::println);
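As a rough illustration of what such a pipeline does conceptually, the following JDK-only sketch forks a list onto a fixed number of rails, applies a mapper on each rail's own thread, and merges the results back into a single list in arrival order. It is an assumption-laden analogy rather than RxJava code: `ForkJoinRailsSketch` and `parallelMap` are hypothetical names, an `ExecutorService` stands in for the scheduler, and there is no backpressure or error propagation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical analogy for parallel() -> runOn() -> map() -> sequential().
class ForkJoinRailsSketch {
    static <T, R> List<R> parallelMap(List<T> source, int rails, Function<T, R> mapper) {
        ExecutorService pool = Executors.newFixedThreadPool(rails);
        // Thread-safe queue playing the role of the non-ordered "sequential" merge.
        ConcurrentLinkedQueue<R> merged = new ConcurrentLinkedQueue<>();
        try {
            for (int rail = 0; rail < rails; rail++) {
                final int offset = rail;
                // Each rail takes every rails-th element, starting at its own offset.
                // Note: an exception thrown by the mapper is not propagated here.
                pool.execute(() -> {
                    for (int i = offset; i < source.size(); i += rails) {
                        merged.add(mapper.apply(source.get(i)));
                    }
                });
            }
        } finally {
            pool.shutdown();
        }
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("rails did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Arrival order: all results are present, but not in source order.
        return new ArrayList<>(merged);
    }
}
```

The returned list contains every mapped value exactly once, but its order depends on thread timing. An ordered variant would additionally have to track each item's original position and reassemble the results on the way back to a sequential flow.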

The drawback is the increase in the library's size, class count and method count, plus one more way for users to confuse asynchrony, concurrency and parallelism in RxJava.

I suggest having at least the non-ordered version of the Rsc parallel classes in 2.0 and perhaps adding the ordered variants in 2.1 or later.

Adding them should take little time, since it is mostly a matter of copying the classes and renaming elements in them.

Discussion

All 4 comments

Maybe this would be better off as a companion library? I can definitely see the potential, but I can also see many people not really needing it. Given the cons you mentioned (size, confusion), keeping it separate would ensure that only those who really need it pay that cost.

My opinion: a companion library has already proved to be an impractical option. RxJavaParallel hasn't been updated since November 2014.

I don't see the increase in the library's size, class count and method count as a problem. It's part of the improvement process.

The confusion about asynchrony, concurrency and parallelism can be solved with good documentation, just as Reactor will most likely do.

+1 for having everything together and working on effective documentation to avoid misuse.

I've added ParallelFlowable to RxJava 2 Extensions.

ParallelFlowable.from(Flowable.range(1, 100))
    .runOn(Schedulers.computation())
    .map(v -> v * v)
    .sequential()
    .subscribe(System.out::println, Throwable::printStackTrace);

// or

Flowable.range(1, 100)
    .to(ParallelFlowable::from)
    .runOn(Schedulers.computation())
    .filter(v -> (v & 1) != 0)
    .sequential()
    .subscribe(System.out::println, Throwable::printStackTrace);