RxJava: Execute map, filter, flatMap using multiple threads in RxJava/Scala

Created on 17 Nov 2015 · 1 Comment · Source: ReactiveX/RxJava

Is there any way to execute filter, map and flatMap using multiple threads? The only option I have found is to run the subscribe block concurrently, as mentioned here: https://github.com/ReactiveX/RxJava/issues/1673

Here is a code snippet that illustrates the problem (it's in Scala, but I can post it in Java if needed):

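import rx.lang.scala.Observable
import scala.concurrent.duration._

// Simulates slow work: blocks the calling thread, then evaluates t.
def withDelay[T](delay: Duration)(t: => T): T = {
  Thread.sleep(delay.toMillis)
  t
}

// filter and map both run sequentially on the thread that emits the values.
Observable
  .interval(500.milliseconds)
  .filter(x => withDelay(1.second) { x % 2 == 0 })
  .map(x => withDelay(1.second) { x * x })
  .subscribe(println(_))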

I have also posted this question on Stack Overflow: http://stackoverflow.com/questions/33749045/how-to-execute-map-filter-flatmap-using-multiple-threads-in-rxscala-java

Thanks in advance for all the responses.

Most helpful comment

RxJava is inherently sequential: an operator never calls onNext, onError, or onCompleted concurrently. Thus, unlike Java 8 Streams, there is no mid-stream parallelism. If you want concurrent execution, you have to split the sequence and recombine it. Depending on your scenario, you can do this via flatMap or groupBy + flatMap. For example:

source.flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation()).filter(...))

// Round-robin counter used to spread values over n groups.
AtomicLong group = new AtomicLong();
int n = Runtime.getRuntime().availableProcessors();

source.groupBy(k -> group.getAndIncrement() % n)
    .flatMap(g -> g.observeOn(Schedulers.computation()).filter(...))
    .subscribe(...)

In these examples, filter runs in parallel on different values, and the results are recombined into a single stream. Note that the merged output is not guaranteed to preserve the source order.
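
To make the grouping key in the second example concrete, here is a minimal sketch in plain Java (standard library only, not RxJava) of how a counter taken modulo n assigns values to groups in round-robin fashion. The class name `RoundRobinKeySketch` and the method `partition` are hypothetical names chosen for illustration, and `Collectors.groupingBy` merely stands in for RxJava's groupBy.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class RoundRobinKeySketch {

    // Hypothetical helper: distributes the values start..start+count-1 over n groups.
    // The key depends only on arrival order (the counter), not on the value itself.
    static Map<Long, List<Long>> partition(long start, long count, int n) {
        AtomicLong group = new AtomicLong();
        return LongStream.range(start, start + count)
                .boxed()
                // Sequential stream, so the counter is incremented in encounter order.
                .collect(Collectors.groupingBy(v -> group.getAndIncrement() % n));
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> groups = partition(10, 6, 3);
        // Key 0 holds [10, 13], key 1 holds [11, 14], key 2 holds [12, 15].
        for (long key = 0; key < 3; key++) {
            System.out.println(key + " -> " + groups.get(key));
        }
    }
}
```

In the RxJava version, each such group would then be moved to its own computation thread by observeOn, so at most n values are processed at the same time.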

There is an RxJavaParallel library that aims to provide more convenient parallelism, but it isn't actively developed at the moment.
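
For contrast, this is roughly what the mid-stream parallelism of Java 8 Streams mentioned above looks like. It is only a sketch using the standard library: the class `ParallelStreamSketch`, the helper `pause`, and the 10 ms delay are assumptions made for illustration, with `pause` standing in for the slow work in the question.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ParallelStreamSketch {

    // Hypothetical stand-in for slow work: blocks the calling thread.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Squares the even numbers in [0, n). After parallel(), the filter and map
    // stages may run on several threads of the common fork-join pool.
    static List<Long> squaresOfEvens(long n) {
        return LongStream.range(0, n)
                .parallel()
                .filter(x -> { pause(10); return x % 2 == 0; })
                .map(x -> { pause(10); return x * x; })
                .boxed()
                .collect(Collectors.toList()); // keeps encounter order
    }

    public static void main(String[] args) {
        System.out.println(squaresOfEvens(10)); // prints [0, 4, 16, 36, 64]
    }
}
```

A single parallel() call is enough here because the stream is finite and pulled by the collector. An Observable pushes values downstream one at a time, which is why the split-and-recombine approach via flatMap is needed instead.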
