Almost every time I see the 'parallel' operator used, it is misunderstood and used incorrectly. This leads me to believe we should remove it and instead educate people on how to use merge and flatMap correctly, which is generally what uses of parallel are trying to achieve.
Does anyone have a good reason not to eliminate it?
/cc @headinthebox
I'm in favor of removing it. As you say, it's mostly used incorrectly, and anyone who wants to control parallelism can always use the ExecutorScheduler.
Thanks for raising this one, it's been on my list to mention.
A while back I used the parallel method for an HPC task and found it confusing. I expected to find an equivalent of the Scala collections par method, but there was none. For clarity's sake I later switched the code to use flatMap+subscribeOn to get the desired behaviour. I think the existing method tries to do too much (why get it to apply the sharding function when that could be done beforehand?).
Could we simplify Observable.parallel so it just does:
Observable<T> parallel() {
return flatMap( o -> o.subscribeOn(Schedulers.computation()));
}
and an overload to specify the scheduler:
Observable<T> parallel(Scheduler scheduler) {
return flatMap( o-> o.subscribeOn(scheduler));
}
@davidmoten Those signatures don't take a function with the work to do in parallel, so they aren't correct.
I suggest we remove them and let a new RxJavaParallel project explore alternatives, merging the result into RxJava itself only once it is proven and mature. This would include something like a ParallelObservable that allows map, flatMap, take, filter and other basic operators to execute in parallel without the normal serialized emission restriction.
Oh yeah, whoops, the merge means the parallelism is lost. So I wonder what the RxJava analogue of the Scala collections par method is?
I suppose it's likely to be
Observable<Observable<T>> parallel() {
return map( o -> just(o).subscribeOn(Schedulers.computation()));
}
I wonder if that's of use?
I just had a look at Scala's par and it looks like it returns a special Parallel instance of the collection in question. So the analogy would be
ParallelObservable<T> parallel(Scheduler scheduler) {
return new ParallelObservable<T>(this, scheduler);
}
where ParallelObservable<T> is really Observable<Observable<T>> under the covers but you can interact with it as though it is Observable<T>. I imagine that ParallelObservable.subscribe would perform the flatten before normal subscription.
Blimey I've got reading problems, Ben suggested exactly this. Ta Ben.
Work on ParallelObservable will be done in https://github.com/ReactiveX/RxJavaParallel
This has been completed and will be released in 0.20.5 and 1.0 RC4.
If you're looking for a replacement for the simple "run things in parallel" use case, you likely want this:
streamOfItems.flatMap(item -> {
    // the block lambda must return the inner Observable for flatMap to merge it
    return doStuffWithItem(item).subscribeOn(Schedulers.io());
});
The doStuffWithItem needs to return an Observable, obviously ... but the point is to kick off your work for each item inside flatMap, using subscribeOn to make it async, or using a function that already makes the calls async.
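For anyone who wants something they can run, here is a minimal sketch of that pattern against the RxJava 1.x API (rx.Observable, rx.schedulers.Schedulers). The class name, the squaresInParallel helper and the squaring body of doStuffWithItem are made up for illustration; they are not part of RxJava or of this issue.

```java
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.schedulers.Schedulers;

public class FlatMapParallelSketch {

    // Hypothetical stand-in for doStuffWithItem. defer() postpones the work
    // until subscription, so subscribeOn decides which thread runs it.
    static Observable<Integer> doStuffWithItem(int item) {
        return Observable.defer(() -> Observable.just(item * item));
    }

    // Hypothetical helper: squares each item on the computation scheduler.
    static List<Integer> squaresInParallel(List<Integer> items) {
        return Observable.from(items)
                // each inner Observable is subscribed on its own scheduler worker
                .flatMap(item -> doStuffWithItem(item).subscribeOn(Schedulers.computation()))
                // flatMap merges results in arrival order, so sort for a stable result
                .toSortedList()
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        System.out.println(squaresInParallel(Arrays.asList(1, 2, 3, 4)));
    }
}
```

Note that flatMap does not preserve the source order, which is why the sketch sorts before collecting. Swap Schedulers.computation() for Schedulers.io() if the per-item work is blocking IO rather than CPU-bound.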
Hi @benjchristensen, @davidmoten, @jbripley, I agree with using flatMap(). I have one question: flatMap does the work, merges the results back to the main thread, and onNext() is called. What if one of the Observables ends with Observable.error()? Will the error be delivered on the main thread? If so, will the main thread shut down, and what happens to the other Observables running in parallel? If not, will the results from the other Observables still be delivered to onNext()?
Correct me if I am wrong, as I have just started with RxJava.
@rishi-anand please post your question on StackOverflow with details on what you have already implemented.