Hi, sorry to bother you guys, but something is puzzling me while using RxJava.
List<Integer> test = Arrays.asList(1, 2, 3);
Observable.from(test)
        .flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(final Integer number) {
                return Observable
                        .create(new Observable.OnSubscribe<Integer>() {
                            @Override
                            public void call(Subscriber<? super Integer> subscriber) {
                                Log.d("TAG", "createIntDelay: " + Thread.currentThread());
                                try {
                                    if (number == 2) {
                                        Thread.sleep(3000);
                                    } else if (number == 1) {
                                        Thread.sleep(5000);
                                    } else {
                                        Thread.sleep(1000);
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                                subscriber.onNext(number);
                                subscriber.onCompleted();
                            }
                        })
                        .subscribeOn(Schedulers.newThread());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("TAG", "call: " + integer);
            }
        });
This is the code I wrote, and I got the output in this order:
3, 2, 1
in 5 seconds.
That obviously changes the list's order, so I used concatMap instead:
List<Integer> test = Arrays.asList(1, 2, 3);
Observable.from(test)
        .concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(final Integer number) {
                return Observable
                        .create(new Observable.OnSubscribe<Integer>() {
                            @Override
                            public void call(Subscriber<? super Integer> subscriber) {
                                Log.d("TAG", "createIntDelay: " + Thread.currentThread());
                                try {
                                    if (number == 2) {
                                        Thread.sleep(3000);
                                    } else if (number == 1) {
                                        Thread.sleep(5000);
                                    } else {
                                        Thread.sleep(1000);
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                                subscriber.onNext(number);
                                subscriber.onCompleted();
                            }
                        })
                        .subscribeOn(Schedulers.newThread());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("TAG", "call: " + integer);
            }
        });
The output order is now:
1, 2, 3
But the duration also changed: it takes 9 seconds, because the three tasks are executed sequentially.
I looked at the source code of concatMap and found a variable named active that controls the workflow. While a task is running, active is set to true so that the other tasks cannot be executed; they just wait in a for loop.
I wonder whether there is a workaround for this problem: running the tasks concurrently without changing their order.
I am looking forward to your reply. Thanks a lot!
Maybe concatMapEager solves this.
@ytRino It works. Thanks a lot.
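For later readers: in the second snippet above, the change amounts to replacing .concatMap( with .concatMapEager( and leaving the rest as it is. The idea behind it, as far as I understand it, is to start all inner tasks right away and then hand their results on in source order. The sketch below shows that idea with plain JDK classes only. It is an analogy, not RxJava's implementation. The names EagerOrderedDemo, slowTask and runEagerInOrder are made up for illustration, and the delays are scaled down from seconds to hundreds of milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerOrderedDemo {

    // Hypothetical stand-in for the slow task in the question:
    // 1 is the slowest, 3 the fastest (delays scaled down for the demo).
    static int slowTask(int number) {
        long delayMs = number == 1 ? 500 : number == 2 ? 300 : 100;
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return number;
    }

    // Starts every task immediately on its own thread, then collects the
    // results in the order of the input list, not in completion order.
    static List<Integer> runEagerInOrder(List<Integer> input) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, input.size()));
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (Integer n : input) {
                // All tasks are submitted before any result is awaited.
                futures.add(CompletableFuture.supplyAsync(() -> slowTask(n), pool));
            }
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> future : futures) {
                // join() blocks until this particular task is done, so
                // results are appended in source order.
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> out = runEagerInOrder(Arrays.asList(1, 2, 3));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(out + " in about " + elapsedMs + " ms");
    }
}
```

The list comes back as [1, 2, 3], and the total time should be close to the slowest task rather than the sum of all three. The trade-off is that the results of faster tasks have to be buffered until the slower ones ahead of them finish.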