RxJava: Zip behaviour with different schedulers in combining observables

Created on 18 Feb 2015  ·  11 Comments  ·  Source: ReactiveX/RxJava

I ran into unpredictable behaviour in how zip is scheduled. My observables emit items on different schedulers. The zip combine function runs when the "last" item of a set is supplied by one of the observables, and tick() and then zipFunction.call(...) run on the scheduler of that "last" observable. This means we cannot know in advance which scheduler the zip combine function will run on.

For example, I have a ConnectableObservable that was started with connect() on the main thread, and I then want to combine it with other observables.

//somewhere in main thread
observable1.connect();

//somewhere else
Observable.zip(
    observable1,
    observable2,
    (res1, res2) -> combine(res1, res2)
).subscribeOn(Schedulers.io())

Of course, observable2 will run on one of the Schedulers.io() threads.
But inside the combine function I have no idea which thread I am on: it can be the main thread or one of the Schedulers.io() threads. That is rather illogical in some cases.
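The behaviour described above can be reproduced with a small sketch. It assumes RxJava 3 (the `io.reactivex.rxjava3` packages) is on the classpath, uses Single instead of a ConnectableObservable for brevity, and introduces hypothetical names (`slowSource`, the `source-A` and `source-B` threads) purely for illustration. Each source emits on its own named thread after a delay, and the combine function reports the thread it was called on.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZipThreadDemo {

    // Hypothetical source: sleeps, then emits on a thread of the given executor.
    static Single<String> slowSource(String value, long delayMillis, ExecutorService executor) {
        return Single.fromCallable(() -> {
            Thread.sleep(delayMillis);
            return value;
        }).subscribeOn(Schedulers.from(executor));
    }

    // Returns the name of the thread on which the zip combine function ran.
    static String combinerThread(long delayAMillis, long delayBMillis) {
        ExecutorService execA = Executors.newSingleThreadExecutor(r -> new Thread(r, "source-A"));
        ExecutorService execB = Executors.newSingleThreadExecutor(r -> new Thread(r, "source-B"));
        try {
            return Single.zip(
                    slowSource("a", delayAMillis, execA),
                    slowSource("b", delayBMillis, execB),
                    // No scheduler is involved here: zip calls this on whichever
                    // thread delivers the last missing value.
                    (a, b) -> Thread.currentThread().getName())
                .blockingGet();
        } finally {
            execA.shutdown();
            execB.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combinerThread(50, 300));
        System.out.println(combinerThread(300, 50));
    }
}
```

With these delays the slower source emits last, so the two calls should report `source-B` and then `source-A`: swapping the delays is enough to move the combine function to a different thread.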

I suggest adding variants of zip that run on a specified scheduler, e.g. by passing a scheduler explicitly to the zip operator, or by adding an operator such as zipMap that runs on the scheduler you pass to the whole Observable.

Any ideas?

Question

All 11 comments

/cc @benjchristensen @akarnokd @mironov-nsk

Why does the thread of execution matter in this case? That is the nature of reactive: it can be any thread that calls you.

If you wish to move the work to another thread, combine the values in zip, then use observeOn to switch to a different Scheduler for further work.
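A minimal sketch of that suggestion, assuming RxJava 3 and using Single for brevity. The `target` executor and the string values are hypothetical stand-ins. The combine function stays cheap and thread-agnostic, and everything after observeOn runs on the chosen scheduler regardless of which source emitted last.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZipThenObserveOn {

    // Combines two values on an unspecified thread, then handles the result on "target".
    static String handle() {
        ExecutorService target = Executors.newSingleThreadExecutor(r -> new Thread(r, "target"));
        try {
            return Single.zip(
                    Single.just("car").subscribeOn(Schedulers.io()),
                    Single.just("engine").subscribeOn(Schedulers.computation()),
                    // May run on an io or a computation thread; it only combines values.
                    (car, engine) -> car + "+" + engine)
                // Everything below this line runs on the "target" thread.
                .observeOn(Schedulers.from(target))
                .map(combined -> combined + " handled on " + Thread.currentThread().getName())
                .blockingGet();
        } finally {
            target.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle());
    }
}
```

This does not pin the combine function itself to a thread; it only guarantees where the downstream work happens, which is enough when the combine function has no thread affinity.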

Any further discussion on this?

No, thanks a lot for clarification.

@benjchristensen I have the same problem. I want the zip operator to run on a particular Scheduler, because one of the variables can only be used on a particular Scheduler. On any other Scheduler it will crash.

I have a closely related issue. My scenario involves applying the zip operator to 2 Singles of unknown origin; when my zip function is called, I want to update the UI with the values emitted by these Singles. The UI update must happen on a particular scheduler, i.e. the main thread, and the Singles should operate on another scheduler.

What I am doing now is this:

// `single1` and `single2` have been gotten from unknown source

Single.zip(
    single1,
    single2,
    (car, engine) -> {
        Object[] objs = {car, engine};
        return objs;
    })
    .subscribeOn(schedulerA)
    .observeOn(mainThreadScheduler)
    .subscribe(objs -> {
        Car car = (Car) objs[0];
        Engine engine = (Engine) objs[1];
        // Use `car` and `engine` to update UI in main thread
    });

I'm not sure this is the most effective approach, but from what I can see, packing the emitted values into an Object array just to use them on the main thread is quite cumbersome. Whenever I run into this issue, I look for a version of Single.zip() that accepts one more parameter of type Scheduler, but unfortunately it doesn't seem to exist.
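One way to avoid the Object array and its casts is a small typed holder whose constructor serves as the combine function. The sketch below assumes RxJava 3; `CarAndEngine`, the string stand-ins for `Car` and `Engine`, and the `main-stand-in` thread are hypothetical and only illustrate the shape.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TypedZipResult {

    // Hypothetical typed holder that replaces the Object[] and its casts.
    static final class CarAndEngine {
        final String car;
        final String engine;

        CarAndEngine(String car, String engine) {
            this.car = car;
            this.engine = engine;
        }
    }

    static String updateUi() {
        // Stand-in for the main thread scheduler of a UI toolkit.
        ExecutorService mainStandIn =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        try {
            return Single.zip(
                    Single.just("car"),
                    Single.just("engine"),
                    CarAndEngine::new) // the constructor is the combine function
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.from(mainStandIn))
                // Typed access, no casts; runs on the stand-in main thread.
                .map(pair -> pair.car + " with " + pair.engine
                        + " on " + Thread.currentThread().getName())
                .blockingGet();
        } finally {
            mainStandIn.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(updateUi());
    }
}
```

The threading is the same as in the Object array version (subscribeOn for the sources, observeOn for the UI work); only the carrier type changes.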

Hi, @onelaview.

I'm facing the same issue right now. One workaround I tried was to use the instance method instead of the static method:

single1
    .observeOn(...)
    .zipWith(...)

I had some issues, though. The first time the operator is called, the zip function runs on the correct thread and the UI is updated correctly, but on the second run I get an exception about not using the correct thread to update the UI.

I'm still looking for a more reliable way to do this. I also find returning the Object array very cumbersome.

I hope it helps.

Hi @franciscojunior

Can you try specifying the scheduler that the second Single operates on? e.g.

single1
    .observeOn(mainThreadScheduler)
    .zipWith(single2.subscribeOn(mainThreadScheduler))

or, if you use the static method version:

Single.zip(
    single1.subscribeOn(mainThreadScheduler),
    single2.subscribeOn(mainThreadScheduler),
    (car, engine) -> {
        // do something in main thread
    })

From my observation, I think the zip operator's combine function is called on a scheduler that the source Singles are operating on. If the sources operate on different schedulers, it is uncertain which scheduler will run the combine function. My guess is that it will be the scheduler of the Single that emits the later item in each combination of zipped items. Specifying mainThreadScheduler for both Singles ensures that the combine function only runs on the main thread.

Thanks, @onelaview for your reply!

In my case, the observables are indeed already being "subscribed on" another scheduler when they are created. The scheduler used is the io() one.

> From my observation, I think the zip operator's combine function is called on a scheduler that the source Singles are operating on. If the sources operate on different schedulers, it is uncertain which scheduler will run the combine function. My guess is that it will be the scheduler of the Single that emits the later item in each combination of zipped items. Specifying mainThreadScheduler for both Singles ensures that the combine function only runs on the main thread.

I see. I'll try that.

What's strange to me is that I thought the observeOn operator would cause the scheduler to change regardless of the scheduler the observer was originally subscribed on. Maybe I'm missing something (or a lot of things :) ).

In my initial tests, I noticed that out of 8 calls to this part of the code, 7 ran on the scheduler I specified in the observeOn operator (as I would expect from observeOn) and 1 ran on the io scheduler (the one used to emit the item, which goes completely against my understanding of observeOn).

I'll keep testing and will try your suggestion. I'm not at my dev machine, so I can't run the tests right now.

Thank you again for your reply!

Hi, @onelaview

I tried it and it didn't work, because my second observable already specifies a scheduler to subscribe on. But what you said about the scheduler of the later item makes total sense, and I think that's why the function was running on different threads.
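The reason an extra subscribeOn has no visible effect here is that, when several are chained, the one closest to the source decides where the source does its work. A minimal sketch, assuming RxJava 3, with hypothetical thread names `first` and `second`:

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubscribeOnOrder {

    // Returns the name of the thread on which the source callable actually ran.
    static String sourceThread() {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        try {
            return Single.fromCallable(() -> Thread.currentThread().getName())
                // Closest to the source: the callable runs and emits here.
                .subscribeOn(Schedulers.from(first))
                // Added later, e.g. by the caller: only the subscribe call itself
                // travels through this thread before hopping to "first".
                .subscribeOn(Schedulers.from(second))
                .blockingGet();
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sourceThread()); // prints "first"
    }
}
```

So if a Single already carries its own subscribeOn, adding another one further down the chain does not change the thread it emits on.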

My requirements changed and I used another operator. But thank you anyway for your feedback! It was really helpful.

Hi @franciscojunior

You may try to use .observeOn() on your second observable instead of .subscribeOn(), i.e.

single1
    .observeOn(mainThreadScheduler)
    .zipWith(single2.observeOn(mainThreadScheduler))

No matter which scheduler a Single originally operates on, observeOn() instructs it to emit its item on a particular scheduler.
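A sketch of this approach, assuming RxJava 3. The `ui-stand-in` executor is a hypothetical replacement for `mainThreadScheduler`, and both sources already carry their own subscribeOn, as in the case discussed in this thread. Because each source is moved to the same single-threaded scheduler before it reaches zip, the combine function runs on that thread no matter which source emits last.

```java
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZipOnOneThread {

    // Returns the name of the thread on which the zip combine function ran.
    static String combinerThread() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-stand-in"));
        try {
            Scheduler uiScheduler = Schedulers.from(ui);

            // Sources that already specify where they subscribe and emit.
            Single<String> single1 = Single.just("car").subscribeOn(Schedulers.io());
            Single<String> single2 = Single.just("engine").subscribeOn(Schedulers.computation());

            return single1
                .observeOn(uiScheduler)
                // observeOn on the second source too: both values now arrive at zip
                // on the ui thread, so the "last" emitter is always that thread.
                .zipWith(single2.observeOn(uiScheduler),
                        (car, engine) -> Thread.currentThread().getName())
                .blockingGet();
        } finally {
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combinerThread()); // prints "ui-stand-in"
    }
}
```

Applying observeOn to only one of the two sources leaves the outcome dependent on emission order, which would be consistent with the "7 out of 8 calls" observation reported earlier in the thread.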

http://reactivex.io/documentation/operators/observeon.html
