Rxjs: distinctUntilChanged seems to be comparing same value when used with combineLatest

Created on 5 May 2016 · 12 Comments · Source: ReactiveX/rxjs

RxJS version: 5.0.0-beta.7

Code to reproduce:

const test$ = Rx.Observable.from([1,2,3]);

Rx.Observable.combineLatest(test$)
    .do(x=>console.log('combineLatest:', x))
    .distinctUntilChanged((x,y)=>{console.log('Compare: ', x, y, x === y); return x === y})
    .subscribe(x=>console.log('Output: ', x));

Expected behavior:

combineLatest: [1]
Output:  [1]
combineLatest: [2]
Compare:  [1] [2] false
Output:  [2]
combineLatest: [3]
Compare:  [2] [3] false
Output:  [3]

Actual behavior:

combineLatest: [1]
Output:  [1]
combineLatest: [2]
Compare:  [2] [2] true
combineLatest: [3]
Compare:  [3] [3] true

All 12 comments

I believe this is because combineLatest doesn't copy its values Array when it emits. This can be worked around by providing a selector function:

Observable.combineLatest(...sources, (...values) => values)
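
Why this selector helps can be seen without RxJS at all: a rest parameter collects its arguments into a newly allocated array on every call. The following is only a minimal sketch in plain JavaScript; `latest` and `project` are hypothetical stand-ins for the operator's internal buffer and for the selector, not RxJS internals.

```javascript
// Hypothetical stand-in for the buffer that combineLatest keeps internally.
const latest = [1, 'x'];

// The workaround selector: rest parameters always build a new array.
const project = (...values) => values;

const first = project(...latest);
latest[0] = 2; // the buffer is overwritten in place
const second = project(...latest);

console.log(first, second, first === second);
// first is still [1, 'x'], second is [2, 'x'], and first === second is false
```

Because each emitted array is independent of the buffer, a later comparison sees the old and new values rather than one shared reference.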

I'm also experiencing this behavior when using distinctUntilChanged after debounceTime. Is there a workaround?

Also, for the original issue, is that the expected behavior? (it's not a bug?)

@abijr Did you try @trxcllnt suggested workaround?

It seems that combineLatest reuses the same array reference for produced values. For performance, this is ideal because it avoids creating new arrays, but it has the side effect of breaking distinctUntilChanged, since its held reference will always be the same as the latest value.

Perhaps a possible long term fix is to have distinctUntilChanged do a deep copy for its cached object to check against future values.
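
The aliasing described above, and the effect of caching a copy instead of a reference, can be sketched in plain JavaScript. Nothing here is RxJS code: `makeDistinct`, `run`, and `sameFirst` are hypothetical names, and the copy is a shallow `slice()` rather than a deep copy, which is enough only for flat arrays like the ones in this issue.

```javascript
// Minimal stand-in for distinctUntilChanged: remembers the last value that
// passed and drops a new value when compare() returns true.
function makeDistinct(compare, snapshot = v => v) {
  let hasLast = false;
  let last;
  return value => {
    if (hasLast && compare(last, value)) return false; // dropped
    hasLast = true;
    last = snapshot(value); // identity by default, so the reference is kept
    return true; // passed through
  };
}

const sameFirst = (a, b) => a[0] === b[0];

// A producer that reuses one array for every emission.
function run(distinct) {
  const shared = [];
  const passed = [];
  for (const n of [1, 2, 3]) {
    shared[0] = n; // mutate in place, then "emit" the same reference
    if (distinct(shared)) passed.push(shared[0]);
  }
  return passed;
}

const byReference = run(makeDistinct(sameFirst));
const bySnapshot = run(makeDistinct(sameFirst, v => v.slice()));
console.log(byReference); // [ 1 ]
console.log(bySnapshot);  // [ 1, 2, 3 ]
```

With the cached reference, the comparison always sees the value compared against itself, so everything after the first emission is dropped. Caching a copy restores the expected output, at the cost of one allocation per value that passes.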

I'm not using combineLatest, I'm using debounceTime, and it has a different type signature. I can't use @trxcllnt's workaround (well, I'm almost sure I can't). But, using filter instead of distinctUntilChanged solved my problem.

@abijr are you sure you can't use my workaround? another way of coding it (without ES6) is:

sourceA.combineLatest(sourceB, function() {
  return Array.prototype.slice.call(arguments);
})
.distinctUntilChanged(...);

Under the covers I can understand why you would want to avoid allocating a new array each call.

From the viewpoint of a consumer, though, it's confusing, because a value in the array could have changed out from under you later on down the subscriber chain (and you wouldn't know it!).

It seems like an odd side effect to have, since aren't most operators supposed to be immutable?
Does anyone know what the price would be to allocate a new array on the way out of the operator? (.concat())

@trxcllnt, here's how I'm currently using (trying to use) distinctUntilChanged. As I've mentioned before, I'm not using combineLatest at all. Here is the code that isn't working for me:

this.changes$
  .debounceTime(SAVE_DELAY)
  .distinctUntilChanged(...) // this doesn't work, I swapped this line with filter
  .subscribe(draft => {
    save(draft)
  })

Here's my workaround:

this.changes$
  .debounceTime(SAVE_DELAY)
  .filter(isDifferent()) // continue only if different to last save
  .subscribe(draft => {
    save(draft)
  })


function isDifferent() {
  let tmp: Draft = {...}; // 
  return (draft: Draft) => {/* return true if draft and tmp are different */}
}

I hope this clarifies my view/experience on the original issue.
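
One way the stateful predicate above could be filled in is sketched below. This is an assumption, not the commenter's actual code: it compares a `JSON.stringify` snapshot of each draft with the snapshot of the last draft that passed, which sidesteps shared references but is sensitive to key order and ignores values JSON cannot represent. The `title` field is made up for the example.

```javascript
// Hypothetical predicate factory: compares each draft with a serialized
// snapshot of the last draft that passed, not with a live reference.
function isDifferent() {
  let lastJson; // undefined until the first draft passes
  return draft => {
    const json = JSON.stringify(draft);
    if (json === lastJson) return false; // unchanged, skip
    lastJson = json;
    return true; // changed, let it through
  };
}

const check = isDifferent();
const draft = { title: 'a' }; // illustrative shape only
const results = [];
results.push(check(draft)); // first draft always passes
draft.title = 'b';          // same object, mutated in place
results.push(check(draft)); // passes, because the snapshot differs
results.push(check(draft)); // skipped, nothing changed
console.log(results); // [ true, true, false ]
```

Because the predicate holds a string rather than the draft object, mutating the draft in place cannot change what it is compared against.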

Quoting @trxcllnt: "I believe this is because combineLatest doesn't copy its values Array when it emits."

We should probably be copying that array when it emits. It's a perf hit, but I think it's a bug that it doesn't, because it's a footgun.

"We should probably be copying that array when it emits. It's a perf hit, but I think it's a bug that it doesn't, because it's a footgun."

An option flag/argument, or documentation about the solution, would be okay too IMO.

No additional operators are required to illustrate the incorrect and thus surprising behaviour of combineLatest.
This code:

var as = new Rx.Subject();
var bs = new Rx.Subject();

// Default projection
Rx.Observable.combineLatest(as, bs)
    .observeOn(Rx.Scheduler.asap)
    .subscribe(function(c) { console.log('default projection [' + c[0] + ', ' + c[1] + ']') });

// Custom projection
Rx.Observable.combineLatest(as, bs, function(a, b) { return [a, b] })
    .observeOn(Rx.Scheduler.asap)
    .subscribe(function(c) { console.log(' custom projection [' + c[0] + ', ' + c[1] + ']') });

as.next(0);
bs.next('x');
as.next(1);
bs.next('y');
as.next(2);

Produces the output:

default projection [2, y]
 custom projection [0, x]

default projection [2, y]
 custom projection [1, x]

default projection [2, y]
 custom projection [1, y]

default projection [2, y]
 custom projection [2, y]

The lack of array copy within the operator implementation causes incorrect asynchronous results for the default projection.
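
The same effect can be reproduced without RxJS. The sketch below is a simplified model under stated assumptions: `makeQueue` is a hypothetical stand-in for deferred delivery (as with `observeOn`), and `simulate` plays the five `next` calls from the example against a single reused buffer, with or without a copy at emission time.

```javascript
// A tiny deferred-delivery queue: callbacks run only when flush() is called.
function makeQueue() {
  const pending = [];
  return {
    schedule(fn) { pending.push(fn); },
    flush() { while (pending.length) pending.shift()(); },
  };
}

function simulate(copyOnEmit) {
  const queue = makeQueue();
  const buffer = [];       // reused for every emission
  const ready = new Set(); // which sources have produced a value so far
  const seen = [];
  // [source index, value], mirroring as.next(0), bs.next('x'), ...
  const updates = [[0, 0], [1, 'x'], [0, 1], [1, 'y'], [0, 2]];
  for (const [index, value] of updates) {
    buffer[index] = value;
    ready.add(index);
    if (ready.size < 2) continue; // wait until both sources have a value
    const emitted = copyOnEmit ? buffer.slice() : buffer;
    queue.schedule(() => seen.push('[' + emitted.join(', ') + ']'));
  }
  queue.flush(); // deliver everything after all updates have happened
  return seen;
}

console.log(simulate(false)); // [ '[2, y]', '[2, y]', '[2, y]', '[2, y]' ]
console.log(simulate(true));  // [ '[0, x]', '[1, x]', '[1, y]', '[2, y]' ]
```

Without the copy, every queued delivery reads the buffer only after the last update has been written, so all four show the final state.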

To fix this:

  • for the case where a custom projection function is provided, _no array copy is required_,
  • for the case of the default projection function, array copy is required to produce correct results.

Thus, only the default projection code path will be affected.
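
The two cases above can be sketched as a single emit step. This is an illustration of the proposal only, not the operator's actual source: `emitCombined` is a hypothetical name, `values` stands for the reused internal buffer, and `project` for the optional projection function.

```javascript
// Hypothetical emit step for a combineLatest-like operator.
function emitCombined(values, project) {
  return project
    ? project(...values) // the projection reads the buffer right away; no copy
    : values.slice();    // default projection: hand out a shallow copy
}

const buffer = [1, 'x'];
const defaultResult = emitCombined(buffer);
buffer[0] = 2; // later update written into the same buffer
const projected = emitCombined(buffer, (a, b) => String(a) + b);

console.log(defaultResult); // [ 1, 'x' ]
console.log(projected);     // 2x
```

The copy is paid only when no projection is supplied, which is exactly the code path where the buffer itself would otherwise escape to subscribers.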

If one values (GC) performance above and beyond "correct" behaviour, one could supply a suitably broken projection function, i.e. a projection function that returns a cached array/object.

I believe this issue is resolved via #1841

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
