Rxjs: Create Observable from Promise function

Created on 18 Apr 2018  路  9Comments  路  Source: ReactiveX/rxjs

It is a very common pattern -- especially when interoperating with libraries that use promises -- to depend on some Promise/async functionality to create an Observable. For example, you might need to get some value from a storage library that implements Promises:

const loadData = async () => {
  const token = await storage.get('token');
  return ajax({ url: '/data', headers: { token } });
}

loadData returns a promise, so you have a Promise that returns an Observable. If you want to use this with other Observables you will have to use fromPromise. You could also do something like:

// or `fromPromise` with an IIFE
const loadData = () => defer(async () => {
  const token = await storage.get('token');
  return ajax({ url: '/data', { headers: { token } }));
}).pipe(switchAll());

// or ... but if you have a lot of async data to pass through it becomes uglier
const loadData = () => fromPromise(storage.get('token')).pipe(
  switchMap(token => ajax({ url: '/data', { headers: { token } }))
);

Since this is such a common pattern, I think it would make sense to have an Observable creation function that creates an Observable returned from a Promise. I think that using defer would be cool, but that would also probably overload a lot of defer's functionality and may not be possible with the current implementation, so the best name I can come up with is deferAsync.

const loadData = () => deferAsync(async () => {
  const token = await storage.get('token');
  const token2 = await storage.getFromToken('token2', token);
  const token3 = await storage.getFromToken('token3', token2);

  return ajax({ url: '/data', { headers: { token3 } }));
});

This would effectively return the inner ajax Observable once the promise resolved. It doesn't necessarily have to function like defer where it returns a new Observable on each subscription, but I'm not sure how easy / hard it would be to do either way and deferAsync probably isn't the best name for it.

I think that the existing solutions to this problem as stated in this issue are okay ... but I think that this additional method can simplify some of the interop between typical Promise-based workflows and libraries and ease the transition to using Observable patterns for some developers while keeping things a bit cleaner.

Most helpful comment

Our general recommendation is if implementation is trivial / and doesn't require modification in core, we do recommend for user-land first. We want to gather usecases around how much common it is before we include into core, since adding new api surface is easy but deprecating / changing behavior is exceptionally hard, as well as we don't want to core api surface it too broad (it is alreay, and we are trying to shrink it down).

All 9 comments

Using your first solution, a user-land deferAsync function is simple to implement:

const deferAsync = <T>(factory: () => Promise<Observable<T>>) => defer(factory).pipe(mergeAll());

mergeAll can be used instead of switchAll, as the promise will resolve only once, so there'll never be a need to switch.

I agree that this could be relatively easy to implement in user land, but since it鈥檚 such a common pattern (I think) it would make sense to be a core method. Otherwise this creates the need for an additional and possibly inconsistent addition of a local or third party dependency.

Can you explain more about using merge vs. switch? Does it make a difference in this case?

@cartant this may not be the best forum, but I was just looking at an answer you had on a question of mine on SO. Small reactive world.

Our general recommendation is if implementation is trivial / and doesn't require modification in core, we do recommend for user-land first. We want to gather usecases around how much common it is before we include into core, since adding new api surface is easy but deprecating / changing behavior is exceptionally hard, as well as we don't want to core api surface it too broad (it is alreay, and we are trying to shrink it down).

@ajcrites Using switchAll instead of mergeAll won't make any difference. It's just that it's overkill, as the promise will only resolve once and only one observable will be emitted, so there will never be any need to switch - that is, there will be no need to unsubscribe from a previously emitted observable to subscribe to a subsequently emitted observable because there'll only be one emitted observable.

@ajcrites Regarding SO, if there's anything you want to discuss, just DM me on Twitter.

@cartant thanks that makes sense. So are you saying that using mergeAll is not overkill in this instance and is the preferred higher order observable function?

@ajcrites Because of the time dimension, there are multiple flattening strategies for observables:

  • With mergeMap (which has flatMap as an alias), received observables are subscribed to concurrently and their emitted values are flattened into the output stream.
  • With concatMap, received observables are queued and are subscribed to one after the other, as each completes. (concatMap is mergeMap with a concurrency of one.)
  • With switchMap, when an observable is received it's subscribed to and any subscription to a previously received observable is unsubscribed.
  • With exhaustMap, when an observable is received it's subscribed to unless there is a subscription to a previously received observable and that observable has not yet completed - in which case the received observable is ignored.

The concatAll, exhaust, mergeAll and switchAll operators for higher-order observables are implemented using the *Map operators.

Basically, I view mergeMap as being the most fundamental of the flattening operators and unless there's a compelling reason to use one of the others, I go with mergeMap.

In this case, the promise is only going to resolve once and there is only going to be a single observable emitted from the higher-order observable. So concatAll, exhaust and switchAll are redundant. The effected behaviour will be identical to that with mergeAll, they just involve functionality that will never be required in this instance.

I agree with @kwonoj and @cartant that this should be a user-land implementation first. If it does, indeed become a popular approach, we can move it into the core library.

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

Was this page helpful?
0 / 5 - 0 ratings