Rxjs: Early unsubscription event in 6.3.x

Created on 31 Aug 2018  路  6Comments  路  Source: ReactiveX/rxjs

Bug Report

Current Behavior
Unsubscribe method calls early

6.3.1 order

  1. tap 1
  2. unsubscribe
  3. value 1
  4. sub complete

Reproduction

import {from, Observable} from "rxjs";
import {tap,mergeMap} from "rxjs/operators";

new Observable<number>((observer)=>{
  observer.next(1);
  observer.complete();
  return ()=>{
    console.log("unsubscribe");
  }
})
.pipe(
  tap((a)=>{console.log("tap",a)}),
  mergeMap((a)=>from(new Promise<number>((resolve)=>{
    setTimeout(()=>resolve(a),3000);
  })))
)
.subscribe((a)=>{console.log("value",a)},()=>{},()=>{
  console.log("sub complete")
})

Expected behavior
6.2.2 order

  1. tap 1
  2. value 1
  3. sub complete
  4. unsubscribe

All 6 comments

This is the expected behavior. Teardown is supposed to occur synchronously after completion. The 6.2 behavior was a leak.

Strange. It works from 5 version without changes. The first version was break is 6.3

How can I detect that pipeline is unsubscribed before mergeMap resolve? complete event not happen

Strange. It works from 5 version without changes. The first version was break is 6.3

Version 5 had the leak too.

Basically, the guarantee Rx is _supposed_ to provide is that resources are torn down as soon as possible. The problem was that RxJS 5-6.2 was holding onto source subscriptions for _some_ operators.

Consider the following (albeit contrived) example:

import { Observable, timer } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

const source$ = new Observable(subscriber => {
  let i = 0;
  const id = setInterval(() => {
    console.log('tick');
    subscriber.next(i++);
    subscriber.complete();
  }, 200);

  return () => {
    console.log('clean up');
    clearInterval(id);
  }
});


source$.pipe(
  mergeMap(() => {
    return timer(1000);
  }),
)
.subscribe(x => console.log(x));

The setInterval is what we'll call an "expensive resource"... In 6.2 and under, you'll see it tick until that 1 second timer returned by mergeMap completes, then it will clean up. In 6.3, it will clean up that interval immediately after the first tick when subscriber.complete is called.

Now just imagine this with something like a web socket, and you only want one value from it, now you're keeping a socket open consuming resources on both the client and the server longer than necessary. And if that one value from the websocket happens to be flattened into an endless stream like an interval or the like? That socket stays open forever. Even though you only wanted one value.

I hope this is an adequate explanation.

How can I detect that pipeline is unsubscribed before mergeMap resolve? complete event not happen

To detect error, complete or clean up:

  • error:
source$.pipe(
  catchError(err => { /* handle and return new observable here */ })
)

// or 
source$.pipe(
  tap({ error(err) { /* detected error, side effect here */ } }),
)
  • complete
source$.pipe(
  tap({ complete() { /* completion detected, side effect */ }),
)
  • teardown (which happens on error, complete and unsubscribe)
source$.pipe(
  finalize(() => { /* observable is done */ })
)

Sincere apologies for any trouble this caused. I think this may fall under a "one person's bug is another person's feature". Hopefully my explanation above illustrates why this was a bug though.

Was this page helpful?
0 / 5 - 0 ratings