Rxjs: Different behavior with Rx 4 and buffer

Created on 12 Apr 2016 · 16 comments · Source: ReactiveX/rxjs

RxJS version: 5.0.0-beta5

Code to reproduce:

const a = Observable.of(1, 2, 3, 4).share();
a
  .buffer(a.filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

Expected behavior:

[1, 2]
[3, 4]
[]

Actual behavior:

[]
[]

Additional information:

The "expected behavior" is what Rx 4 does, but maybe there's a more correct way to do this?

bug

All 16 comments

The default scheduler in RxJS 5 is the null (recursive) one. This is the breaking change with the most impact compared to RxJS 4, where the scheduler is trampoline by default. I think if you changed the source Observable from of to interval, it would work as expected.
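To illustrate the difference between the two defaults, here is a minimal plain-JavaScript sketch (no RxJS). immediateScheduler and makeTrampolineScheduler are hypothetical stand-ins, not RxJS APIs: the first runs every action on the spot, so a nested action runs before its parent finishes; the second queues nested actions and runs them afterwards, in order.

```javascript
// Hypothetical stand-ins for the two default-scheduling strategies;
// these are not RxJS APIs.

// "Recursive"/immediate: run the action right now, on the current stack.
const immediateScheduler = {
  schedule(action) {
    action();
  },
};

// Trampoline: if an action is already running, queue the new one;
// otherwise run it and then drain anything it queued, in FIFO order.
function makeTrampolineScheduler() {
  const queue = [];
  let running = false;
  return {
    schedule(action) {
      queue.push(action);
      if (running) return; // the outer drain loop will pick it up
      running = true;
      try {
        while (queue.length > 0) {
          queue.shift()();
        }
      } finally {
        running = false;
      }
    },
  };
}

// Schedule an outer action that schedules an inner one; record the order.
function recordOrder(scheduler) {
  const log = [];
  scheduler.schedule(() => {
    log.push("outer start");
    scheduler.schedule(() => log.push("inner"));
    log.push("outer end");
  });
  return log;
}

const immediateOrder = recordOrder(immediateScheduler);
const trampolineOrder = recordOrder(makeTrampolineScheduler());
console.log(immediateOrder);  // [ 'outer start', 'inner', 'outer end' ]
console.log(trampolineOrder); // [ 'outer start', 'outer end', 'inner' ]
```

Under a trampoline, work that a synchronous source schedules while a subscription is still being set up can wait until that setup finishes, provided the setup itself runs as a trampolined action.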

buffer first subscribes to the closing notifier, a.filter(...), which consumes all of the synchronous emissions. Only then does it subscribe to the source a, which emits nothing because everything has already been emitted.
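A minimal model of that sequence in plain JavaScript (no RxJS). sharedSync, filter and bufferNotifierFirst are hypothetical helpers: sharedSync roughly mimics Observable.of(...).share() with no scheduler, in that the first subscriber triggers a synchronous burst to whoever is subscribed at that moment. In this simplified model a later subscriber just misses the burst, and completion is not modeled.

```javascript
// Rough, hypothetical model of `Observable.of(...values).share()` with no
// scheduler: the first subscribe synchronously emits every value to the
// observers registered at that moment. Not RxJS.
function sharedSync(values) {
  const observers = [];
  let started = false;
  return {
    subscribe(next) {
      observers.push(next);
      if (!started) {
        started = true;
        for (const v of values) {
          for (const o of observers.slice()) o(v);
        }
      }
    },
  };
}

// Minimal filter over the model above.
function filter(source, predicate) {
  return {
    subscribe(next) {
      source.subscribe(v => {
        if (predicate(v)) next(v);
      });
    },
  };
}

// buffer that subscribes to the closing notifier BEFORE the source,
// as described above for RxJS 5.
function bufferNotifierFirst(source, notifier, emit) {
  let current = [];
  notifier.subscribe(() => {
    emit(current); // close the current buffer
    current = [];
  });
  source.subscribe(v => current.push(v));
}

const a = sharedSync([1, 2, 3, 4]);
const out = [];
bufferNotifierFirst(a, filter(a, x => x % 2 === 0), b => out.push(b));
console.log(out); // [ [], [] ]
```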

If you do this:

const a = Rx.Observable.of(1, 2, 3, 4)
  .subscribeOn(Rx.Scheduler.async)
  .share();
a
  .buffer(a.filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

you'll see this output:

[1]
[2, 3]

This makes sense once you see that a.filter is subscribed to before buffer's source a is. When 2 is emitted, the current buffer [1] is closed first, and only then is 2 inserted into the new buffer.

You can do it with scan, though:

Rx = Rx.KitchenSink;
const a = Rx.Observable.of(1, 2, 3, 4)
  .scan(({ready, arr}, i) => {
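    if (i % 2 === 0) {
      // an even value closes the chunk; start fresh if the previous chunk was already closed
      return {ready: true, arr: (ready ? [] : arr).concat(i)};
    } else if (ready) {
      return {ready: false, arr: [i]};
    } else {
      return {ready: false, arr: arr.concat(i)};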
    }
  }, {ready: false, arr: []})
  .filter(acc => acc.ready)
  .map(acc => acc.arr)
  .subscribe(x => console.log(x));

Output:

[1, 2]
[3, 4]
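Since the scan accumulator is a pure function, it can be sanity-checked without Rx by folding it over a plain array. The sketch below uses a compacted form of that accumulator (step and chunkAtEvens are hypothetical names), written so that a value following a closed chunk always starts a new one, including when two even values arrive back to back.

```javascript
// Compacted accumulator: if the previous value closed a chunk, start a new
// one; an even value marks the chunk as ready (closed).
function step({ ready, arr }, i) {
  const base = ready ? [] : arr;
  return { ready: i % 2 === 0, arr: base.concat(i) };
}

// Emulate scan + filter + map over an array: keep every intermediate
// accumulator, then keep only the "ready" chunks.
function chunkAtEvens(values) {
  const states = [];
  let acc = { ready: false, arr: [] };
  for (const v of values) {
    acc = step(acc, v);
    states.push(acc);
  }
  return states.filter(s => s.ready).map(s => s.arr);
}

console.log(chunkAtEvens([1, 2, 3, 4])); // [ [ 1, 2 ], [ 3, 4 ] ]
console.log(chunkAtEvens([1, 2, 4]));    // [ [ 1, 2 ], [ 4 ] ]
console.log(chunkAtEvens([1, 3, 2, 5])); // [ [ 1, 3, 2 ] ]
```

As in the scan version, a trailing group that never sees an even value (the 5 in the last call) is not emitted.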

@staltz I would argue that this is a breaking change for no good reason, and is in fact a bug.

@trxcllnt ?

> a breaking change for no good reason

There were several really good reasons for this breaking change.

  1. 95% of observables are async, so default scheduling is unnecessary, and this issue outlines an edge case.
  2. It's extremely rare that scheduling is needed to avoid a stack overflow, so default trampoline scheduling is unnecessary.
  3. Recursive scheduling is much, much faster than trampoline scheduling. Given 1 and 2 above, recursive was the better choice as the default to ensure optimal performance.
  4. To make all scheduling trampoline by default would throw out a lot of the benefit of this rewrite.

Other things:

of and from, per the es-observable spec, are supposed to be scheduled on the next microtask. We're not currently doing that, but we should be. That would make this problem go away by itself.

Also, the sample code is really an illustration of an anti-pattern in Rx: shared synchronous observables with share() or refCount(). That's bad juju and a prime use case for publish() and connect().
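The difference between share()/refCount() and publish()/connect() on a synchronous source can be modeled in a few lines of plain JavaScript. multicastSync is a hypothetical helper, not an RxJS API: with autoConnect set it starts emitting on the first subscribe (roughly like share()), otherwise it waits for an explicit connect() (roughly like publish()). In this simplified model a subscriber that arrives after the synchronous burst just misses it.

```javascript
// Hypothetical model of a synchronous multicast source; not RxJS.
function multicastSync(values, autoConnect) {
  const observers = [];
  let connected = false;
  function connect() {
    if (connected) return; // emit the burst only once
    connected = true;
    for (const v of values) {
      for (const observer of observers.slice()) observer(v);
    }
  }
  return {
    subscribe(observer) {
      observers.push(observer);
      if (autoConnect) connect();
    },
    connect,
  };
}

// share()-like: the first subscriber receives the whole burst right away.
const shared = multicastSync([1, 2, 3, 4], true);
const s1 = [];
const s2 = [];
shared.subscribe(v => s1.push(v));
shared.subscribe(v => s2.push(v)); // arrives after the burst
console.log(s1, s2); // [ 1, 2, 3, 4 ] []

// publish()-like: subscribe everyone first, then connect.
const published = multicastSync([1, 2, 3, 4], false);
const p1 = [];
const p2 = [];
published.subscribe(v => p1.push(v));
published.subscribe(v => p2.push(v));
published.connect();
console.log(p1, p2); // [ 1, 2, 3, 4 ] [ 1, 2, 3, 4 ]
```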

@trxcllnt I disagree

  1. In my many years of using RxJS since 2010, most RxJS sources have NOT been async at all: ranges of numbers, static data, etc.
  2. Default trampoline scheduling served two purposes. The first was to keep items in order for recursive scheduling (not just plain recursion, but passing in a scheduler to schedule more items). The second was to avoid stack overflows with really large sequences. For example:
       Rx.Observable.of(1,2,3).subscribe(x => { Rx.Scheduler.queue.schedule(42, (scheduler, state) => scheduler.schedule(null, () => console.log(state))) })
  3. Using the queue scheduler by default gave you both whenever you did a scheduleRecursive in RxJS v4 and earlier.
  4. I care more about correctness here, where once again the user's expectations were not met.
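The stack-overflow half of point 2 can be illustrated with a small plain-JavaScript sketch that emits a sequence by having each scheduled action schedule the next one. emitRecursively, immediate and makeTrampoline are hypothetical names, not RxJS v4 APIs; depth counts how many of these actions are nested on the call stack at once.

```javascript
// Emit `count` items by having each scheduled action schedule the next one
// (the recursive-scheduling pattern), tracking how deeply actions nest.
function emitRecursively(schedule, count) {
  let depth = 0;
  let maxDepth = 0;
  const seen = [];
  const step = i => {
    depth++;
    maxDepth = Math.max(maxDepth, depth);
    seen.push(i);
    if (i + 1 < count) schedule(() => step(i + 1));
    depth--;
  };
  schedule(() => step(0));
  return { seen, maxDepth };
}

// Immediate: run the action right away, nested on the current stack.
const immediate = action => action();

// Trampoline: queue actions scheduled while another one is running.
function makeTrampoline() {
  const queue = [];
  let running = false;
  return action => {
    queue.push(action);
    if (running) return;
    running = true;
    try {
      while (queue.length > 0) queue.shift()();
    } finally {
      running = false;
    }
  };
}

const viaImmediate = emitRecursively(immediate, 1000);
const viaTrampoline = emitRecursively(makeTrampoline(), 1000);
console.log(viaImmediate.maxDepth);  // 1000
console.log(viaTrampoline.maxDepth); // 1
```

The items come out in the same order either way; the difference is that the immediate version nests one action per item, so a large enough count would overflow the stack, while the trampoline never nests more than one.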

of and from are not the root cause here: if I used create with a synchronous source and the right recursive scheduler, I'd still have this problem.

Setting aside the sync issue for a sec, the results are different for async too:

const a = Observable.interval(1000).take(5).share();
a
  .buffer(a.filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

RxJS 4:

[ 0 ]
[ 1, 2 ]
[ 3, 4 ]
[]

RxJS 5:

[]
[ 0, 1 ]
[ 2, 3 ]

In v4, buffer first subscribes to the source and then to the notifier; in v5 it's the other way around. Which is correct? Is there any reason to subscribe to the notifier first?
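One way to see why the order matters: when both subscriptions share one source, each value reaches the observers in the order they subscribed. The plain-JavaScript model below (makeSubject and runBuffer are hypothetical helpers, not RxJS, and values are pushed synchronously after both subscriptions exist, standing in for the interval) reproduces both outputs above. The flushOnComplete flag assumes, based on the trailing [] in the RxJS 4 output, that v4's buffer emits the pending buffer on completion while v5's did not.

```javascript
// Hypothetical subject: delivers each value to observers in subscription order.
function makeSubject() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    next(v) { for (const o of observers.slice()) o.next(v); },
    complete() { for (const o of observers.slice()) o.complete(); },
  };
}

// buffer(source.filter(x => x % 2 === 0)) over one shared subject, with the
// subscription order and the flush-on-complete behavior as parameters.
function runBuffer(values, { sourceFirst, flushOnComplete }) {
  const subject = makeSubject();
  const out = [];
  let current = [];
  const sourceObserver = {
    next: v => current.push(v),
    complete: () => { if (flushOnComplete) out.push(current); },
  };
  const notifierObserver = {
    next: v => {
      if (v % 2 === 0) { // the filter, folded into the notifier
        out.push(current);
        current = [];
      }
    },
    complete: () => {},
  };
  if (sourceFirst) {
    subject.subscribe(sourceObserver);
    subject.subscribe(notifierObserver);
  } else {
    subject.subscribe(notifierObserver);
    subject.subscribe(sourceObserver);
  }
  values.forEach(v => subject.next(v));
  subject.complete();
  return out;
}

const values = [0, 1, 2, 3, 4];
const v4Like = runBuffer(values, { sourceFirst: true, flushOnComplete: true });
const v5Like = runBuffer(values, { sourceFirst: false, flushOnComplete: false });
console.log(v4Like); // [ [ 0 ], [ 1, 2 ], [ 3, 4 ], [] ]
console.log(v5Like); // [ [], [ 0, 1 ], [ 2, 3 ] ]
```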

> In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa.

This is probably the source of the real issue here.

@mattpodwysocki I whole-heartedly agree with you. Now that we've got the operator-subscribes-to-the-source PR merged in, we can fix the real issue, which is:

> In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa.

@trxcllnt so you're saying that you agree with @mattpodwysocki's assertion that we should be trampoline scheduling by default (if I'm understanding his assertions correctly)? Because I'm against that. It would be counter to many of the stated goals of this project.

@blesh oh no, just that this is a bug. the recursive vs. trampoline scheduling was settled a long time ago by RxJava.

Yeah, I think we all agree this is a bug.

Leaving this open, as we'll make this change in the next major version (it's breaking for some users).

Just got tripped up by this. Three questions:

1) Until this is released, is there a better workaround than using a Subject to force an immediate subscription to the source? That is:

source$.publish(stream$ => stream$.buffer(stream$.throttle(100)));

becomes

source$.publish(stream$ => {
    const sub$ = new Subject();
    stream$.subscribe(sub$);
    return sub$.buffer(stream$.throttle(100));
});

2) Should this also be applied to other operators with notifiers, such as bufferToggle(openings), delayWhen(subscriptionDelay), distinct(flushes), sample(notifier), skipUntil(notifier), takeUntil(notifier), window(windowBoundaries), and windowToggle(openings)?
3) By extension, should this also be applied to all operators that take observables as arguments, even, for example, combineLatest, onErrorResumeNext, and sequenceEqual?

I wonder, however, what the expected behaviour of the notifier is in some of these other cases, given that it may be intended to be inclusive or exclusive (consider skipUntil with a filter, for example). In light of this, should there be a way to change the subscription order in the operator, rather than forcing the caller to use a Subject to reverse the default order?
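To illustrate the inclusive/exclusive point: with skipUntil and a notifier that is a filter of the same shared source, the subscription order decides whether the triggering value itself gets through. skipUntilShared is a hypothetical plain-JavaScript model, not RxJS; values are delivered to the two observers in the order they subscribed.

```javascript
// Hypothetical model of source.skipUntil(source.filter(isTrigger)) on a
// shared source. Not RxJS.
function skipUntilShared(values, isTrigger, notifierFirst) {
  const out = [];
  let open = false;
  const notifierObserver = v => { if (isTrigger(v)) open = true; };
  const sourceObserver = v => { if (open) out.push(v); };
  const observers = notifierFirst
    ? [notifierObserver, sourceObserver]
    : [sourceObserver, notifierObserver];
  for (const v of values) {
    for (const o of observers) o(v);
  }
  return out;
}

const isTwo = x => x === 2;
console.log(skipUntilShared([1, 2, 3, 4], isTwo, true));  // [ 2, 3, 4 ] (inclusive)
console.log(skipUntilShared([1, 2, 3, 4], isTwo, false)); // [ 3, 4 ] (exclusive)
```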

Note that merge and zip already have the correct order, because they just prepend their source to the array of their arguments and subscribe to them in order.

Also, operators like sequenceEqual technically don't care about the order unless there is an error, in which case we get the error from compareTo before the one from the source. This probably has no chance of ever biting anyone, but it's still an inconsistency.

Related #5654
