Rxjs: combineLatest on synchronous sources

Created on 24 Feb 2017  路  7Comments  路  Source: ReactiveX/rxjs

RxJS version: 5.1.0

Code to reproduce:

const a$ = Rx.Observable.of(1, 2);
const b$ = Rx.Observable.of(10);

const c$ = Rx.Observable.combineLatest(a$, b$, (a,b)=>a+b);

c$.subscribe(c => console.log(c));

Expected behavior:

11
12

Actual behavior:

12

Additional information:

I'm not sure is this a bug or a "feature". What should we do about this case? Is it just one of those "let's instruct people to choose a different scheduler if they want to have the expected behavior"?

For reference:
xstream: https://github.com/staltz/xstream/issues/173
most.js: https://github.com/cujojs/most/issues/414

bug

Most helpful comment

@mattpodwysocki you get this behavior from the recursive scheduler in v4. The "fix" in v5 is to use the queue scheduler. If you know of a solution that doesn't impact throughput performance, I'd be happy to change my recommendation.

All 7 comments

While zip is generally applicable to lists, combineLatest is undefined without the time dimension. The current behavior is fine, because the solution is to introduce your own notion of time. Rx generally shies away from defining one for you.

@trxcllnt no, this is a bug and a breaking change from RxJS v4.x. Please fix. The expected values from @staltz were how RxJS v4 operated

@mattpodwysocki you get this behavior from the recursive scheduler in v4. The "fix" in v5 is to use the queue scheduler. If you know of a solution that doesn't impact throughput performance, I'd be happy to change my recommendation.

I tend to agree with @trxcllnt on this one, although I haven't commented. Forcing queue scheduling on an operator in JavaScript is like trying to force the library to mimic some sort of gated/threaded model, which the majority of JS developers know it isn't. Once it's sunk in that Observable.of is synchronous by default the current behavior is obvious. What's outlined as the expected behavior here is less obvious as it deviates from how JavaScript fundamentally works.

Given this issue has been here this long and there's been little interest, I'd say the current behavior is fine and we can close the issue.

The "fix" if you want to change the behavior is to use the queue scheduler on incoming sources.

I like the recommendation to define your own scheduling here. I disagree that 11, 12 is expected when using a synchronous observable like that. I think at best this is an item for the docs to make the Scheduler argument of Observable.of clearer for what behavior it affects.

The biggest problem I see with Rxjs v5 semantics is that order matters, which is confusing at least to me.

That is to say

const a$ = Rx.Observable.of(1, 2);
const b$ = Rx.Observable.of(10);

const c$ = Rx.Observable.combineLatest(a$, b$, (a,b)=>a+b);

c$.subscribe(c => console.log(c));

will output 12 as Andre mentioned.

However, switch a and b and you will get 11, 12.

const a$ = Rx.Observable.of(1, 2);
const b$ = Rx.Observable.of(10);

const c$ = Rx.Observable.combineLatest(b$, a$, (a,b)=>a+b);

c$.subscribe(c => console.log(c));

With Rxjs v4, in both cases, the results were 11, 12.

I'd much prefer the rxjs v4 behaviour by default, as you would expect combineLatest by default to be commutative in its arguments. Then specific behaviour with specific schedulers.

For what is worth, Bacon.js has the same behavior as rxjs v5: http://jsfiddle.net/wp3xp50e/

Was this page helpful?
0 / 5 - 0 ratings

Related issues

samherrmann picture samherrmann  路  3Comments

benlesh picture benlesh  路  3Comments

jakovljevic-mladen picture jakovljevic-mladen  路  3Comments

matthewwithanm picture matthewwithanm  路  4Comments

chalin picture chalin  路  4Comments