Rxjs: [FeatrueRequest] new switch operator, which different from switchAll

Created on 23 May 2018  路  4Comments  路  Source: ReactiveX/rxjs

currently we have switchAll operator which implemented by _switchMap_ with the project function as x=>x.
the switchAll operator, would switch to the next observable directly evenif the observable is not emit any value:
observable A: -----a0----a1-----a2-----
observable B:-----------------b0-----b1-----b2-----
switchAll operator
output:-------------a0--------b0-----b1-----b2-----

Note: a1 is not output.
observable A: -----a0----a1-----a2-----
observable B:-----------------b0-----b1-----b2-----
switch operator
output:-------------a0----a1--b0-----b1-----b2-----

Note: a1 is outputed

the difference is switch operator is switched when new observable emit item, while switchAll operator switched when new observable available.

maybe the operator name should be switchIfEmit,

Most helpful comment

@benlesh

Here Is My Implementation:

/**
 * observable A: -----a0----a1-----a2-----
 * observable B:          -----b0-----b1-----b2-----
 * switchAll operator
 * output:       -----a0-------b0-----b1-----b2-----
 *
 * Note: a1 is not output.
 * observable A: -----a0----a1-----a2-----
 * observable B:          -----b0-----b1-----b2-----
 * switchIfEmit operator
 * output:       -----a0----a1-b0-----b1-----b2-----
 *
 * Note: a1 is outputed
 *
 * the difference is switchIfEmit operator is switched when new observable emit item,
 * while switchAll operator switched when new observable available.
 *
 * Semantics:
 * 1. it will wait all the inner observables, any one that emit a value will be switched to
 *    the outer observable output.
 * 2. if another inner observable emmit a value, then the outer would switch to it. the
 *    original inner observable would be unsubscribed.
 * 3. if the source observable(emmit observable items) completes/has error, it will continue waiting
 *    for the inner observables that already emited by the source.
 * 4. if the outer complete, all the active(emiting or waiting to emit) inner observables
 *    would be unsubscribed.
 * 5. if one inner observable completes, it will continue waiting for other inner observables.
 * 6. any inner observable error would be tramsmit to the ourter observable.
 * 7. if the outer observable is unsubscribed, the inner active observables and the upper
 *    source observable would be unsubscribed.
 */
export const switchIfEmit = () => <T>(source: Observable<Observable<T>>) =>
  new Observable<T>(observer => {
    const subscription = new Subscription();
    let lastInnerObservable: Observable<T>;
    const sourceSubscription = source.subscribe(
      innerObservable => {
        const innerSubscription = innerObservable.subscribe(item => {
          if (lastInnerObservable && lastInnerObservable !== innerObservable) {
            const sourceSubscription = (<any>lastInnerObservable).subscription;
            subscription.remove(sourceSubscription);
            sourceSubscription.unsubscribe();
            (<any>lastInnerObservable).subscription = undefined;
          }
          lastInnerObservable = innerObservable;
          observer.next(item);
        });
        (<any>innerObservable).subscription = innerSubscription;
        subscription.add(innerSubscription);
      },
      err => observer.error(err)
    );
    subscription.add(sourceSubscription);
    return subscription;
  });

All 4 comments

Interesting. So, you're saying that this operator would:

  1. Subscribe to the first observable
  2. Subscribe to every incoming observable, BUT only switch to it if it outputs a value

I'm not sure what the use cases would be here.

For now: I'd suggest implementing this operator in your own library, since it's just a function. If the need comes up frequently in the community, or people file a lot of issues looking to add this operator to the core library, we'll add it to the core library.

You'll have to figure out a lot of semantics about this though, like:

  1. What happens when more than one inner observable arrive, but neither emit values? wait for both, or just the most recent one?
  2. What happens when the outer completes, and the inner completes, but there's another waiting to emit a value?
  3. What happens if the second inner never emits a value, but instead errors while you're waiting for it's first value?

... and so many more questions.

@benlesh

Here Is My Implementation:

/**
 * observable A: -----a0----a1-----a2-----
 * observable B:          -----b0-----b1-----b2-----
 * switchAll operator
 * output:       -----a0-------b0-----b1-----b2-----
 *
 * Note: a1 is not output.
 * observable A: -----a0----a1-----a2-----
 * observable B:          -----b0-----b1-----b2-----
 * switchIfEmit operator
 * output:       -----a0----a1-b0-----b1-----b2-----
 *
 * Note: a1 is outputed
 *
 * the difference is switchIfEmit operator is switched when new observable emit item,
 * while switchAll operator switched when new observable available.
 *
 * Semantics:
 * 1. it will wait all the inner observables, any one that emit a value will be switched to
 *    the outer observable output.
 * 2. if another inner observable emmit a value, then the outer would switch to it. the
 *    original inner observable would be unsubscribed.
 * 3. if the source observable(emmit observable items) completes/has error, it will continue waiting
 *    for the inner observables that already emited by the source.
 * 4. if the outer complete, all the active(emiting or waiting to emit) inner observables
 *    would be unsubscribed.
 * 5. if one inner observable completes, it will continue waiting for other inner observables.
 * 6. any inner observable error would be tramsmit to the ourter observable.
 * 7. if the outer observable is unsubscribed, the inner active observables and the upper
 *    source observable would be unsubscribed.
 */
export const switchIfEmit = () => <T>(source: Observable<Observable<T>>) =>
  new Observable<T>(observer => {
    const subscription = new Subscription();
    let lastInnerObservable: Observable<T>;
    const sourceSubscription = source.subscribe(
      innerObservable => {
        const innerSubscription = innerObservable.subscribe(item => {
          if (lastInnerObservable && lastInnerObservable !== innerObservable) {
            const sourceSubscription = (<any>lastInnerObservable).subscription;
            subscription.remove(sourceSubscription);
            sourceSubscription.unsubscribe();
            (<any>lastInnerObservable).subscription = undefined;
          }
          lastInnerObservable = innerObservable;
          observer.next(item);
        });
        (<any>innerObservable).subscription = innerSubscription;
        subscription.add(innerSubscription);
      },
      err => observer.error(err)
    );
    subscription.add(sourceSubscription);
    return subscription;
  });

Cool! I hope you open source it and others find it useful! It's an interesting operator, I hadn't thought of it. If a lot of people start using this, we'll talk about adding it to core. :)

Was this page helpful?
0 / 5 - 0 ratings

Related issues

jakovljevic-mladen picture jakovljevic-mladen  路  3Comments

marcusradell picture marcusradell  路  4Comments

benlesh picture benlesh  路  3Comments

cartant picture cartant  路  3Comments

dooreelko picture dooreelko  路  3Comments