RxJava: A problem about concurrency in RxJava

Created on 1 Feb 2017 · 2 Comments · Source: ReactiveX/RxJava

Hi, sorry to bother you, but I have run into a problem that puzzles me while using RxJava.

       List<Integer> test = Arrays.asList(1,2,3);
       Observable.from(test)
                .flatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(final Integer number) {
                        return Observable
                                .create(new Observable.OnSubscribe<Integer>() {
                                    @Override
                                    public void call(Subscriber<? super Integer> subscriber) {
                                        Log.d("TAG", "createIntDelay: " + Thread.currentThread());
                                        try {
                                            if(number == 2){
                                                Thread.sleep(3000);
                                            }else if(number == 1){
                                                Thread.sleep(5000);
                                            }else {
                                                Thread.sleep(1000);
                                            }
                                        }catch (Exception e){
                                            e.printStackTrace();
                                        }
                                        subscriber.onNext(number);
                                        subscriber.onCompleted();
                                    }
                                })
                                .subscribeOn(Schedulers.newThread());
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("TAG", "call: " + integer);
                    }
        });

This is the code I wrote. The output arrives in this order:
3, 2, 1
in 5 seconds.
This obviously changes the order of the list, so I used concatMap instead:

       List<Integer> test = Arrays.asList(1,2,3);
       Observable.from(test)
                .concatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(final Integer number) {
                        return Observable
                                .create(new Observable.OnSubscribe<Integer>() {
                                    @Override
                                    public void call(Subscriber<? super Integer> subscriber) {
                                        Log.d("TAG", "createIntDelay: " + Thread.currentThread());
                                        try {
                                            if(number == 2){
                                                Thread.sleep(3000);
                                            }else if(number == 1){
                                                Thread.sleep(5000);
                                            }else {
                                                Thread.sleep(1000);
                                            }
                                        }catch (Exception e){
                                            e.printStackTrace();
                                        }
                                        subscriber.onNext(number);
                                        subscriber.onCompleted();
                                    }
                                })
                                .subscribeOn(Schedulers.newThread());
                    }

                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                               @Override
                               public void onCompleted() {

                               }

                               @Override
                               public void onError(Throwable e) {
                                   // Log failures instead of silently dropping them.
                                   Log.e("TAG", "onError", e);
                               }

                               @Override
                               public void onNext(Integer integer) {
                                   Log.d("TAG", "call: " + integer);
                               }
                           }
         );

The output now keeps the original order:
1, 2, 3
But the duration also changed: it takes 9 seconds, because the three tasks are executed sequentially.
I looked at the source code of concatMap and found a variable named active that controls the workflow. While a task is running, active is set to true, so the other tasks cannot start and just wait in a for loop.
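
The gating behaviour described here can be modelled outside RxJava. The following is only a simplified sketch in plain Java, not the actual concatMap source: the class `SerialGateSketch`, its methods, and the scaled-down delays (50, 30 and 10 ms instead of 5, 3 and 1 seconds) are all hypothetical. It shows why a single `active` flag forces the total time to be the sum of the individual delays.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

// Simplified model (an assumption, not RxJava code): one task in flight at a time.
final class SerialGateSketch {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean active; // true while a task is running

    // Queue a task and start it if nothing else is running.
    synchronized void submit(Runnable task) {
        pending.add(task);
        drain();
    }

    // Start the next queued task only when no task is active.
    private synchronized void drain() {
        if (active) {
            return;
        }
        Runnable next = pending.poll();
        if (next == null) {
            return;
        }
        active = true;
        new Thread(() -> {
            try {
                next.run();
            } finally {
                finished();
            }
        }).start();
    }

    // Called when a task ends: release the gate and start the next one.
    private synchronized void finished() {
        active = false;
        drain();
    }

    // Submits tasks 1, 2, 3 with delays 50, 30, 10 ms; returns completion order.
    static List<Integer> runDemo() {
        SerialGateSketch gate = new SerialGateSketch();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        long[] delays = {50, 30, 10};
        CountDownLatch done = new CountDownLatch(delays.length);
        for (int i = 0; i < delays.length; i++) {
            final int number = i + 1;
            final long delay = delays[i];
            gate.submit(() -> {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                order.add(number);
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because each task starts only after the previous one has finished, the completion order always matches the submission order, at the cost of running nothing in parallel.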
Is there a workaround for this problem? What I want is:

To run the tasks concurrently without changing the order of their results.

I am looking forward to your reply. Thanks a lot!

All 2 comments

Maybe concatMapEager solves this.

@ytRino It works. Thanks a lot!
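
For readers landing here: in RxJava 1.x the change is to replace `.concatMap(` with `.concatMapEager(` in the second snippet of the question. The operator subscribes to the inner Observables eagerly and buffers their values, then emits them in source order. The idea can be sketched without RxJava as follows. This is only an illustration in plain Java using `CompletableFuture`; the class `EagerOrderedSketch`, the method `runEager`, and the scaled-down delays are hypothetical and not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only (not RxJava): start everything at once, read results in order.
final class EagerOrderedSketch {

    // Starts one task per input immediately, then collects results in input order.
    static List<Integer> runEager(List<Integer> inputs, long[] delaysMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, inputs.size()));
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                final int number = inputs.get(i);
                final long delay = delaysMs[i];
                // Every task is submitted before any result is awaited.
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return number;
                }, pool));
            }
            List<Integer> results = new ArrayList<>();
            // Joining in submission order preserves the original ordering,
            // even if later tasks finished earlier.
            for (CompletableFuture<Integer> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<Integer> results = runEager(Arrays.asList(1, 2, 3), new long[] {500, 300, 100});
        long elapsed = System.currentTimeMillis() - start;
        System.out.println(results + " in about " + elapsed + " ms");
    }
}
```

With the delays from the question, this approach should take roughly as long as the slowest task (about 5 seconds) rather than the 9-second sum, while still delivering 1, 2, 3 in that order.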
