Current Behavior
Unsubscribe method calls early
6.3.1 order
Reproduction
import {from, Observable} from "rxjs";
import {tap,mergeMap} from "rxjs/operators";
new Observable<number>((observer)=>{
observer.next(1);
observer.complete();
return ()=>{
console.log("unsubscribe");
}
})
.pipe(
tap((a)=>{console.log("tap",a)}),
mergeMap((a)=>from(new Promise<number>((resolve)=>{
setTimeout(()=>resolve(a),3000);
})))
)
.subscribe((a)=>{console.log("value",a)},()=>{},()=>{
console.log("sub complete")
})
Expected behavior
6.2.2 order
This is the expected behavior. Teardown is supposed to occur synchronously after completion. The 6.2 behavior was a leak.
Strange. It works from 5 version without changes. The first version was break is 6.3
How can I detect that pipeline is unsubscribed before mergeMap resolve? complete event not happen
Strange. It works from 5 version without changes. The first version was break is 6.3
Version 5 had the leak too.
Basically, the guarantee Rx is _supposed_ to provide is that resources are torn down as soon as possible. The problem was that RxJS 5-6.2 was holding onto source subscriptions for _some_ operators.
Consider the following (albeit contrived) example:
import { Observable, timer } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';
const source$ = new Observable(subscriber => {
let i = 0;
const id = setInterval(() => {
console.log('tick');
subscriber.next(i++);
subscriber.complete();
}, 200);
return () => {
console.log('clean up');
clearInterval(id);
}
});
source$.pipe(
mergeMap(() => {
return timer(1000);
}),
)
.subscribe(x => console.log(x));
The setInterval is what we'll call an "expensive resource"... In 6.2 and under, you'll see it tick until that 1 second timer returned by mergeMap completes, then it will clean up. In 6.3, it will clean up that interval immediately after the first tick when subscriber.complete is called.
Now just imagine this with something like a web socket, and you only want one value from it, now you're keeping a socket open consuming resources on both the client and the server longer than necessary. And if that one value from the websocket happens to be flattened into an endless stream like an interval or the like? That socket stays open forever. Even though you only wanted one value.
I hope this is an adequate explanation.
How can I detect that pipeline is unsubscribed before mergeMap resolve? complete event not happen
To detect error, complete or clean up:
source$.pipe(
catchError(err => { /* handle and return new observable here */ })
)
// or
source$.pipe(
tap({ error(err) { /* detected error, side effect here */ } }),
)
source$.pipe(
tap({ complete() { /* completion detected, side effect */ }),
)
source$.pipe(
finalize(() => { /* observable is done */ })
)
Sincere apologies for any trouble this caused. I think this may fall under a "one person's bug is another person's feature". Hopefully my explanation above illustrates why this was a bug though.