RxJS version: 5.0.0-beta5
Code to reproduce:
const a = Observable.of(1, 2, 3, 4).share();
a
.buffer(a.filter(x => x % 2 === 0))
.subscribe(x => console.log(x));
Expected behavior:
[1, 2]
[3, 4]
[]
Actual behavior:
[]
[]
Additional information:
The "expected behavior" is what Rx 4 does, but maybe there's a more correct way to do this?
The default scheduler in RxJS 5 is the null (recursive) one. This is the breaking change with the most impact compared to RxJS 4, where the default scheduler is the trampoline one. I think that if you changed the source Observable from of to interval, it would work as expected.
Buffer first subscribes to the closing notifier a.filter, consuming all its synchronous emissions, then it subscribes to the given source a, which happens to emit nothing because everything was emitted already.
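The sequence described above can be reproduced without RxJS. The following is only a rough sketch: sharedSync, filter and bufferNotifierFirst are hypothetical stand-ins written for this illustration, not the library's implementation, and completion handling is left out.

```javascript
// Hypothetical stand-in for Observable.of(...).share(): the first subscribe()
// runs the whole sequence synchronously; later subscribers receive nothing.
function sharedSync(values) {
  const listeners = [];
  let done = false;
  return {
    subscribe(next) {
      if (done) return; // the sequence has already finished
      listeners.push(next);
      if (listeners.length === 1) {
        for (const v of values) {
          listeners.slice().forEach(l => l(v));
        }
        done = true;
      }
    },
  };
}

// Hypothetical filter: forwards only the values that match the predicate.
function filter(source, predicate) {
  return {
    subscribe(next) {
      source.subscribe(v => { if (predicate(v)) next(v); });
    },
  };
}

// Hypothetical buffer that subscribes to the notifier BEFORE the source.
function bufferNotifierFirst(source, notifier) {
  const emitted = [];
  let current = [];
  notifier.subscribe(() => { emitted.push(current); current = []; });
  source.subscribe(v => current.push(v));
  return emitted;
}

const a = sharedSync([1, 2, 3, 4]);
const result = bufferNotifierFirst(a, filter(a, x => x % 2 === 0));
console.log(JSON.stringify(result)); // [[],[]]
```

In this model the notifier fires for 2 and 4 while the buffer is still empty, and the source listener attaches only after the run has finished, which matches the two empty arrays reported under "Actual behavior".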
If you do this
const a = Rx.Observable.of(1, 2, 3, 4)
.subscribeOn(Rx.Scheduler.async)
.share();
a
.buffer(a.filter(x => x % 2 === 0))
.subscribe(x => console.log(x));
You'll notice the output
[1]
[2, 3]
Which makes sense if you see that a.filter is subscribed to before a.buffer is. When 2 is emitted, the current buffer with [1] is closed, then 2 is inserted in the buffer.
You can do it with scan though:
Rx = Rx.KitchenSink;
const a = Rx.Observable.of(1, 2, 3, 4)
.scan(({ready, arr}, i) => {
  if (i % 2 === 0) {
    return {ready: true, arr: arr.concat(i)};
  } else if (ready) {
    return {ready: false, arr: [i]};
  } else {
    return {ready: false, arr: arr.concat(i)};
  }
}, {ready: false, arr: []})
.filter(acc => acc.ready)
.map(acc => acc.arr)
.subscribe(x => console.log(x));
[1, 2]
[3, 4]
@staltz I would argue that this is a breaking change for no good reason and is in fact a bug
@trxcllnt ?
a breaking change for no good reason
There were several really good reasons for this breaking change.
Other things:
of and from, per the es-observable spec, are supposed to be scheduled on the next microtask. We're not currently doing that, but we should be. That would make this problem go away by itself.
Also, the sample code is really an illustration of an anti-pattern in Rx: shared synchronous observables with share() or refCount(). That's bad juju and a prime use case for publish() and connect().
@trxcllnt I disagree
Rx.Observable.of(1,2,3).subscribe(x => {
  Rx.Scheduler.queue.schedule(42, (scheduler, state) => scheduler.schedule(null, () => console.log(state)));
});
scheduleRecursive in RxJS v4 and before. of and from are not the root cause here, because if I used create with synchronous emissions and the right recursive scheduler, I'd still have a problem here.
Setting aside the sync issue for a sec, the results are different for async too:
const a = Observable.interval(1000).take(5).share();
a
.buffer(a.filter(x => x % 2 === 0))
.subscribe(x => console.log(x));
RxJS 4:
[ 0 ]
[ 1, 2 ]
[ 3, 4 ]
[]
RxJS 5:
[]
[ 0, 1 ]
[ 2, 3 ]
In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa. What is correct? Is there any reason to subscribe to the notifier first?
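The effect of the two subscription orders can be compared with a small model that does not depend on RxJS. This is a sketch under assumptions: createSubject, evens and runBuffer are hypothetical helpers invented for the illustration, the values are pushed by hand instead of coming from interval, and completion is not modelled, so the final flush is missing from both results.

```javascript
// Hypothetical minimal hot source: listeners are called in subscription order.
function createSubject() {
  const listeners = [];
  return {
    subscribe(next) { listeners.push(next); },
    next(v) { listeners.slice().forEach(l => l(v)); },
  };
}

// Hypothetical notifier: fires for every even value of the source.
function evens(source) {
  return {
    subscribe(next) {
      source.subscribe(v => { if (v % 2 === 0) next(v); });
    },
  };
}

// Runs a buffer over 0..4 with the given subscription order and
// returns the list of buffers closed by the notifier.
function runBuffer(order) {
  const source = createSubject();
  const notifier = evens(source);
  const emitted = [];
  let current = [];
  const collect = v => current.push(v);
  const close = () => { emitted.push(current); current = []; };

  if (order === 'source-first') {
    source.subscribe(collect);
    notifier.subscribe(close);
  } else {
    notifier.subscribe(close);
    source.subscribe(collect);
  }

  [0, 1, 2, 3, 4].forEach(v => source.next(v));
  return emitted;
}

const sourceFirst = runBuffer('source-first');
const notifierFirst = runBuffer('notifier-first');
console.log(JSON.stringify(sourceFirst));   // [[0],[1,2],[3,4]]
console.log(JSON.stringify(notifierFirst)); // [[],[0,1],[2,3]]
```

With the source subscribed first, each even value lands in the buffer before the notifier closes it. With the notifier subscribed first, the buffer is closed before the value that triggered the notifier is collected, so that value opens the next buffer instead.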
In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa.
This is probably a source of the real issue here.
@mattpodwysocki I whole-heartedly agree with you. Now that we've got the operator-subscribes-to-the-source PR merged in, we can fix the real issue, which is:
In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa.
@trxcllnt so you're saying that you agree with @mattpodwysocki's assertion that we should be trampoline scheduling by default? (if I'm understanding his assertions correctly). Because I'm against that. It would be counter to many of the stated goals of this project
@blesh oh no, just that this is a bug. the recursive vs. trampoline scheduling was settled a long time ago by RxJava.
Yeah, I think we all agree this is a bug.
Leaving this open, as we'll have this change in the next major version (as it's breaking for some users)
Just got tripped up by this. Three questions:
1) Until this is released, is there a better work-around than using a Subject to cause an immediate subscription for the source? i.e.:
source$.publish(stream$ => stream$.buffer(stream$.throttle(100)));
becomes
source$.publish(stream$ => {
const sub$ = new Subject();
stream$.subscribe(sub$);
return sub$.buffer(stream$.throttle(100));
});
2) Should this also be applied to other operators with notifiers, such as bufferToggle(openings), delayWhen(subscriptionDelay), distinct(flushes), sample(notifier), skipUntil(notifier), takeUntil(notifier), window(windowBoundaries), and windowToggle(openings)?
3) Should this, by extension, also be applied to all operators that take observables as arguments, for example even combineLatest, onErrorResumeNext, and sequenceEqual?
I wonder however what the expected behaviour is of the notifier in some of these other cases given that it may be intended to be inclusive or exclusive (consider skipUntil with a filter for example). In light of this should there be a way to change the subscription order in the operator, rather than forcing the caller to use a Subject to reverse the order from the default?
Note that merge and zip already have the correct order, because they just prepend their source to an array with their arguments and subscribe to them in order.
Also, things like sequenceEqual technically don't care about the order unless there is an error, in which case we will get the error from compareTo before the source. This probably has no chance of biting anyone ever, but it's still an inconsistency.
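As for the Subject workaround in question 1, a rough model suggests why it helps: the relay attaches to the stream before the notifier does, so each value reaches the buffer before the notifier closes it. Everything below is a sketch under assumptions. createSubject, evens and bufferNotifierFirst are hypothetical helpers, and an even-number filter stands in for throttle(100) to keep the run deterministic.

```javascript
// Hypothetical minimal hot source: listeners are called in subscription order.
function createSubject() {
  const listeners = [];
  return {
    subscribe(next) { listeners.push(next); },
    next(v) { listeners.slice().forEach(l => l(v)); },
  };
}

// Hypothetical notifier: fires for every even value of the source.
function evens(source) {
  return {
    subscribe(next) {
      source.subscribe(v => { if (v % 2 === 0) next(v); });
    },
  };
}

// Hypothetical buffer that subscribes to the notifier BEFORE the source.
function bufferNotifierFirst(source, notifier) {
  const emitted = [];
  let current = [];
  notifier.subscribe(() => { emitted.push(current); current = []; });
  source.subscribe(v => current.push(v));
  return emitted;
}

const stream = createSubject();
const relay = createSubject();

// The relay is attached to the stream first, mirroring stream$.subscribe(sub$).
stream.subscribe(v => relay.next(v));

// The buffer reads from the relay, while the notifier is derived from the stream.
const emitted = bufferNotifierFirst(relay, evens(stream));

[0, 1, 2, 3, 4].forEach(v => stream.next(v));
console.log(JSON.stringify(emitted)); // [[0],[1,2],[3,4]]
```

Although this buffer still subscribes to its notifier first, the relay's listener was registered on the stream earlier, so it runs first for every value.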
Related #5654