Is there any way to execute filter, map and flatMap using multiple threads? The only option I found is to run the subscribe block concurrently, as mentioned here https://github.com/ReactiveX/RxJava/issues/1673
Here is a code snippet that illustrates the problem (it's in Scala, but I can post it as Java if needed):
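import rx.lang.scala.Observable
import scala.concurrent.duration._
import scala.language.postfixOps

// Blocks the calling thread for `delay`, then evaluates and returns `t`.
def withDelay[T](delay: Duration)(t: => T): T = {
  Thread.sleep(delay.toMillis)
  t
}

Observable
  .interval(500 millisecond)
  .filter(x => withDelay(1 second) { x % 2 == 0 })  // slow predicate
  .map(x => withDelay(1 second) { x * x })          // slow transformation
  .subscribe(println(_))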
The corresponding question is also posted on Stack Overflow: http://stackoverflow.com/questions/33749045/how-to-execute-map-filter-flatmap-using-multiple-threads-in-rxscala-java
Thanks in advance for all the responses.
RxJava is inherently sequential: operators never call onNext, onError and onCompleted concurrently. Thus, unlike Java 8 Streams, there is no mid-stream parallelism. If you want concurrent execution, you have to split the sequence and recombine it. Depending on your scenario, you can do this via flatMap or groupBy+flatMap. For example:
// Variant 1: flatMap, one inner Observable per value
source.flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation()).filter(...))

// Variant 2: groupBy + flatMap, values spread round-robin over n groups
AtomicLong group = new AtomicLong();
int n = Runtime.getRuntime().availableProcessors();
source.groupBy(k -> group.getAndIncrement() % n)
    .flatMap(g -> g.observeOn(Schedulers.computation()).filter(...))
    .subscribe(...)
In these examples, filter runs in parallel on different values, and the results are recombined into a single stream.
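To make the split-and-recombine idea concrete without depending on RxJava, here is a rough plain-JDK analogue using CompletableFuture. It is only a sketch of the concept, not RxJava's API or how flatMap is implemented: the class SplitAndRecombine and the helpers slowIsEven, slowSquare and filterAndMap are hypothetical names standing in for the delayed filter and map from the question, and the 100 ms delay is arbitrary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SplitAndRecombine {

    // Hypothetical slow predicate, standing in for the delayed filter.
    static boolean slowIsEven(long x) {
        pause(100);
        return x % 2 == 0;
    }

    // Hypothetical slow transformation, standing in for the delayed map.
    static long slowSquare(long x) {
        pause(100);
        return x * x;
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Filter and map a single value; an empty Optional means "filtered out".
    static Optional<Long> filterAndMap(long v) {
        return slowIsEven(v) ? Optional.of(slowSquare(v)) : Optional.empty();
    }

    static List<Long> process(List<Long> source, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Split: submit one asynchronous task per value.
            List<CompletableFuture<Optional<Long>>> tasks = source.stream()
                    .map(v -> CompletableFuture.supplyAsync(() -> filterAndMap(v), pool))
                    .collect(Collectors.toList());

            // Recombine: wait for every task and keep the surviving values.
            return tasks.stream()
                    .map(CompletableFuture::join)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [0, 4, 16]
        System.out.println(process(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L), 4));
    }
}
```

One difference worth noting: this sketch joins the tasks in source order, so the output order matches the input. With the RxJava flatMap variants above, results from different inner sequences may arrive interleaved.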
There is an RxJavaParallel library that aims for providing more convenient parallelism but it isn't actively developed at the moment.