Effects version 2.0.8. The ordering of the observable sources in Observable#combineLatest impacts its resulting emissions. For example;
Observable<Integer> intObs = Observable.just(1, 2, 3, 4, 5);
Observable<String> stringObs = Observable.just("One");
Observable<String> stringObs2 = Observable.just("Two");
Observable.combineLatest(intObs, stringObs, stringObs2,
(i, s, s2) -> "Observable 1: " + i + " " + s + " " + s2)
.blockingIterable()
.forEach(System.out::println);
Observable.combineLatest(stringObs, stringObs2, intObs,
(s, s2, i) -> "Observable 2: " + i + " " + s + " " + s2)
.blockingIterable()
.forEach(System.out::println);
gives the following output;
Observable 1: 5 One Two
Observable 2: 1 One Two
Observable 2: 2 One Two
Observable 2: 3 One Two
Observable 2: 4 One Two
Observable 2: 5 One Two
I am uncertain whether this is the intended functionality or not as some of the unit tests seem to accept that this is the behaviour. Therefore this is either a bug or I believe that the documentation should better indicate that the order dictates from which observable source the latest item is taken from first.
In my opinion, the problem comes from the ObservableCombineLatest.LatestCoordinator#combine method. Here values are passed in from each of the observable sources. When a multiple item _just_ observable (like above) is passed in first, combine() then skips each item until the last before there is a null signal from the completion of that observable source. The next _just_ observable sources are then passed in and an item of each is then collected, then added to the queue and subsequently drained. When the order is switched, and the multiple item _just_ is not first then an emission is added to the queue in combine() for every item as the upper bounding null is unknown.
I wrote the following breaking unit test to further explain; https://github.com/philleonard/RxJava/commit/acffb3b61492441aca9b9310f5aca898cab86e2d
I'm happy to implement the fix or documentation changes.
This is the expected behavior with synchronous sources. You are probably thinking in a zip-like operator which picks one item from each source to have them balanced.
Have you tried with asynchronous sources?
Let me tell you the story of combineLatest. When it was first concieved, the driving example was a GUI form field where each input field would fire off a change event and combineLatest would collect them all and once all field has fired at least once, it is supposed to fire off an event that enabled a button or performed validation.
This use case works to this day but many don't recognize why this case works and other async cases not so much. combineLatest works because each source fires off events from the same thread and only one of them at a time and these sources are hot as well. If these don't hold in other cases, you get "unexpected" output patterns: cold sources rush through and async sources come in in arbitrary order resulting in arbitrary combinations.
So yes, there is an inherent order because for loops start from 0 to n - 1 and give preferential to the front sources. But you are allowed to post in an array that is randomized so each time the sources get evaluated, you get a random order of events.
I see. Yes, I agree the problem does originate from synchronous sources such as just and fromIterable. Introducing a delay such as; Observable<Integer> intObs = Observable.just(1, 2, 3, 4, 5).delay(100, TimeUnit.MILLISECONDS) then matches the output. I would still say that the documentation could benefit from some further explanation of how synchronous sources are handled, I was personally expecting different behaviour.
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
Most helpful comment
This is the expected behavior with synchronous sources. You are probably thinking in a
zip-like operator which picks one item from each source to have them balanced.Have you tried with asynchronous sources?
Let me tell you the story of
combineLatest. When it was first concieved, the driving example was a GUI form field where each input field would fire off a change event andcombineLatestwould collect them all and once all field has fired at least once, it is supposed to fire off an event that enabled a button or performed validation.This use case works to this day but many don't recognize why this case works and other async cases not so much.
combineLatestworks because each source fires off events from the same thread and only one of them at a time and these sources are hot as well. If these don't hold in other cases, you get "unexpected" output patterns: cold sources rush through and async sources come in in arbitrary order resulting in arbitrary combinations.So yes, there is an inherent order because for loops start from 0 to n - 1 and give preferential to the front sources. But you are allowed to post in an array that is randomized so each time the sources get evaluated, you get a random order of events.