RxJS version: 5.0.0-beta.6
Code to reproduce:
http://codepen.io/anon/pen/zBrPzj?editors=1111
Expected behavior:
The subscribe callback should only be called once when using the first operator.
Actual behavior:
It doesn't complete the subscription.
Additional information:
Replacing first with take(1) works as expected.
To add some detail:
The call to the BehaviorSubject.next inside of the subscribe method seems to be the issue. Since first() calls complete after emitting the value in the _emitValue function, calling next on the behavior subject synchronously will cause an infinite loop. This doesn't occur with take() since the recursive call will complete when the number of emitted items is greater than the count. If you make the "next" call async, then the first operator behaves correctly.
.subscribe((x) => {
  console.log("I have been called " + x);
  // Defer the next() call so it no longer runs synchronously inside the handler.
  Rx.Observable.timer(1).map(_ => {
    console.log("hello");
    store$.next({a: x[0] + 1, b: x[1] + 1});
  }).subscribe();
});
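To make the difference described above concrete, here is a toy model in plain JavaScript. It is not the RxJS source: `createToySubject`, `naiveFirst`, `countingTake`, and `countCalls` are hypothetical names. The model assumes only what the comment describes, namely that first() records completion after forwarding the value while take() counts before forwarding. `maxDepth` exists only to stop the demo from recursing forever.

```javascript
// Toy model (NOT the RxJS source): a subject that delivers synchronously,
// plus two hypothetical operators that differ only in WHEN they record
// that a value has been seen.
function createToySubject() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    next(value) { observers.slice().forEach((o) => o.next(value)); },
  };
}

// Marks itself done only AFTER forwarding, as described for first().
function naiveFirst(source, onValue) {
  let done = false;
  source.subscribe({
    next(value) {
      if (done) return;
      onValue(value); // a re-entrant next() lands here while done is still false
      done = true;
    },
  });
}

// Counts BEFORE forwarding, as described for take(count).
function countingTake(source, count, onValue) {
  let seen = 0;
  source.subscribe({
    next(value) {
      if (++seen > count) return; // the re-entrant call is dropped here
      onValue(value);
    },
  });
}

// The handler re-enters next() synchronously; maxDepth bounds the recursion.
function countCalls(operator, maxDepth) {
  const subject = createToySubject();
  let calls = 0;
  operator(subject, (x) => {
    calls += 1;
    if (calls < maxDepth) subject.next({ a: x.a + 1 });
  });
  subject.next({ a: 1 });
  return calls;
}

const firstCalls = countCalls((s, cb) => naiveFirst(s, cb), 5);
const takeCalls = countCalls((s, cb) => countingTake(s, 1, cb), 5);
console.log(firstCalls, takeCalls);
```

With a depth limit of 5 this logs `5 1`: the first()-style handler keeps being re-entered until the limit stops it, while the take(1)-style handler runs once.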
It's worth noting that you are not allowed to call store$.next concurrently. This is one of the serialization properties of Rx: callers need to ensure they implement a proper locking mechanism, so that downstream operators do not have to incur extensive overhead to protect themselves against multiple concurrent calls.
@onlyann yup, that's a bug.
It appears to be an issue with the synchronous nexting inside of the observation. Here's a more minimal example showing the bug:
var store$ = new Rx.Subject();
store$.first().subscribe((x) => {
  console.log("I have been called " + x.a);
  // Synchronous re-entrant next() from inside the handler.
  store$.next({a: x.a + 1, b: x.b + 1});
});
store$.next({ a: 1, b: 2 });
But if you use a setTimeout it works fine:
var store$ = new Rx.Subject();
store$.first().subscribe((x) => {
  console.log("I have been called " + x.a);
  // Deferred next(): runs after the current call stack has unwound.
  setTimeout(() => store$.next({a: x.a + 1, b: x.b + 1}));
});
store$.next({ a: 1, b: 2 });
Is it expected that the source may emit a value synchronously from inside its own subscription, or should that emission be scheduled differently?
(Just FYI: the same thing occurs on v4.)
.take(1) also does not complete when called with synchronous nexting. It behaves as expected in the example, but doesn't complete:
http://codepen.io/anon/pen/GqNjra?editors=0011
Since it doesn't complete, it holds on to the observers and can cause memory leaks:
http://codepen.io/anon/pen/EyNyGW?editors=0011
In both cases, wrapping the next call in a setTimeout fixes the completion issue, but you lose the synchronicity that might have been the original intent. Also, calling .unsubscribe() explicitly fixes the problem of holding on to the observers, but they still don't "complete".
var store$ = new Rx.Subject();
store$.take(1).subscribe((x) => {
  console.log("I have been called " + x.a);
  store$.next({a: x.a + 1, b: x.b + 1});
}, () => {}, () => console.log("completed"));
store$.next({ a: 1, b: 2 });
// "I have been called 1"

var store$ = new Rx.Subject();
store$.take(1).subscribe((x) => {
  console.log("I have been called " + x.a);
  setTimeout(() => store$.next({a: x.a + 1, b: x.b + 1}));
}, () => {}, () => console.log("completed"));
store$.next({ a: 1, b: 2 });
// "I have been called 1"
// "completed"
Protecting against re-entrancy without throwing errors or dropping values would necessitate introducing unbounded buffers, which was a conscious design trade-off we made in the early days.
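The three options named here (throw, drop, or buffer) can be sketched with a small guard in plain JavaScript. This is an illustration only, not how RxJS Subjects are implemented: `guardReentrancy`, `run`, and the `policy` strings are hypothetical names introduced for the example.

```javascript
// Hypothetical guard around a handler; `policy` picks what happens when
// the handler is re-entered while it is still running.
function guardReentrancy(handler, policy) {
  let running = false;
  const buffer = []; // under "buffer" this can grow without bound
  return function guarded(value) {
    if (running) {
      if (policy === "throw") throw new Error("re-entrant call");
      if (policy === "buffer") buffer.push(value);
      return; // "drop" discards the value; "buffer" has queued it
    }
    running = true;
    try {
      handler(value, guarded);
      // Drain values that were queued by re-entrant calls.
      while (buffer.length > 0) handler(buffer.shift(), guarded);
    } finally {
      running = false;
    }
  };
}

// Handler that re-enters itself until the value reaches 3.
function run(policy) {
  const seen = [];
  const guarded = guardReentrancy((value, self) => {
    seen.push(value);
    if (value < 3) self(value + 1);
  }, policy);
  try {
    guarded(1);
  } catch (err) {
    seen.push(err.message);
  }
  return seen;
}

const dropped = run("drop");
const buffered = run("buffer");
const thrown = run("throw");
console.log(dropped, buffered, thrown);
```

Only the buffering policy delivers every value (`[1, 2, 3]`), and it does so by holding pending values in a queue whose size depends entirely on how often the handler re-enters. Dropping yields `[1]`, and throwing yields `[1, "re-entrant call"]`.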
In this case, the take operator forwards on to its Subscriber, which immediately feeds another event into the head, and the stack never unwinds to allow the take operator to send the completion message (like a snake eating its own tail).
This isn't a bug in take, since modifying it to complete before next'ing would mean the subscriber never hears the next event, and modifying it to unsubscribe before next'ing would dispose subscriptions in the wrong order.
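The first half of that argument (completing before next'ing loses the value) can be shown with a toy subscriber. This is a sketch under one assumption: the subscriber ignores anything that arrives after completion, as the Observable contract requires. `createSafeSubscriber`, `nextThenComplete`, and `completeThenNext` are hypothetical names, not RxJS APIs.

```javascript
// Hypothetical subscriber that ignores notifications after complete().
function createSafeSubscriber(log) {
  let stopped = false;
  return {
    next(value) {
      if (stopped) return;
      log.push("next " + value);
    },
    complete() {
      if (stopped) return;
      stopped = true;
      log.push("complete");
    },
  };
}

// The usual take(1)-style order: deliver the value, then complete.
function nextThenComplete(subscriber, value) {
  subscriber.next(value);
  subscriber.complete();
}

// The swapped order: the subscriber is already stopped when the value arrives.
function completeThenNext(subscriber, value) {
  subscriber.complete();
  subscriber.next(value);
}

const usualOrder = [];
nextThenComplete(createSafeSubscriber(usualOrder), 1);

const swappedOrder = [];
completeThenNext(createSafeSubscriber(swappedOrder), 1);

console.log(usualOrder, swappedOrder);
```

The usual order records `["next 1", "complete"]`, while the swapped order records only `["complete"]`.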
As demonstrated, introducing an asynchronous boundary between emission and consumption (or vice-versa) is the best alternative here, though it doesn't have to be async across tick boundaries; async on the asap scheduler (which still blocks the current event loop) does the trick:
console.clear();
setTimeout(() => console.log('setTimeout flushed'));
var store$ = new Rx.Subject();
store$.take(1)
  .observeOn(Rx.Scheduler.asap)
  .subscribe((x) => {
    console.log("I have been called " + x.a);
    store$.next({a: x.a + 1, b: x.b + 1});
  }, () => {}, () => console.log("completed"));
store$.next({ a: 1, b: 2 });
// "I have been called 1"
// "completed"
Rx.Scheduler.asap.schedule(() => console.log('asap flushed'));
// "asap flushed"
// "setTimeout flushed"
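The underlying idea, letting the current call return before the re-entrant emission is processed, can also be sketched without any scheduler. The example below uses a hand-rolled trampoline as a stand-in for observeOn(Rx.Scheduler.asap); it is not how the asap scheduler is implemented, and `createTrampoline`, `schedule`, and `deliver` are hypothetical names.

```javascript
// Hypothetical trampoline: work scheduled while a drain is in progress is
// queued and run only after the currently running task has returned.
function createTrampoline() {
  const queue = [];
  let draining = false;
  return function schedule(task) {
    queue.push(task);
    if (draining) return; // re-entrant call: just enqueue
    draining = true;
    try {
      while (queue.length > 0) queue.shift()();
    } finally {
      draining = false;
    }
  };
}

const log = [];
const schedule = createTrampoline();

// Toy take(1)-style consumer: deliver the first value, then complete.
let seen = 0;
function deliver(value) {
  if (++seen > 1) {
    log.push("dropped " + value.a);
    return;
  }
  log.push("next " + value.a);
  // Re-entrant emission: queued, so it runs only after "completed" is logged.
  schedule(() => deliver({ a: value.a + 1 }));
  log.push("completed");
}

schedule(() => deliver({ a: 1 }));
console.log(log);
```

The log ends up as `["next 1", "completed", "dropped 2"]`: completion is recorded before the second value is even looked at, because the re-entrant call waits in the queue until the first delivery has returned.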
@trxcllnt is correct. It's the reentrancy protection that is causing this effect. I think we can close this issue for now, however, it might be worth discussing the merits of the reentrancy protections in Subjects, as I know that we've discussed it in the past, but it can't hurt to revisit.
@blesh & @trxcllnt can you take a look at #1993 ?