Rxjs: Using AbortSignal and AbortController

Created on 16 Nov 2017  路  39Comments  路  Source: ReactiveX/rxjs

The standards proposals are leaning towards using AbortSignal and AbortController (ala fetch cancellation) for cancellation with Observables. I generally find this favorable for a few reasons and I think I see a migration path for us.

Proposal

  1. Observable subscribe accepts Observer and an optional AbortSignal.
  2. Observable forEach accepts an optional AbortSignal.
  3. The subscriber function for new Observable(subscriber) gives both a safe Observer and an AbortSignal, but is expected to return void.
  4. The Observer passed to subscribe will have an optional abort handler that will get the AbortError if the observable is aborted.
  5. If an abort is triggered on a subscription that happened with forEach, the AbortError would go down the rejection path just like it does with fetch. Since the returned promise is a guarantee to a future where the producer either errors or completes, it makes sense to send the AbortError down the rejection path in this case.

Advantages

  • Synchronous observables cannot firehose and lock up an app, the cancellation semantic exists prioer to subscription.
  • If multiple data producers are set up in a subscriber function (in the Observable constructor), you can add them to the abort signal one at a time, guaranteeing they can be torn down if a subsequent step fails. This wasn't possible before without some serious hackery.
  • Observables all the way down. Since AbortSignal is an EventTarget, and EventTarget may soon have an on method that returns an observable. you could use abortSignal.on('abort') to compose teardown if you so choose.
  • We will now have the ability to be notified, uniquely, of cancellation (without having to do trickery to figure out if it was really an error or a completion)

Migration path

We'll start using AbortSignal internally throughout the library.

  1. Add an overload to subscribe that accepts an Observer and an AbortSignal. If AbortSignal is passed, it does not return a Subscription.
  2. Make Subscription into an AbortController

    • Add the ability to create an AbortSignal.

    • Alias unsubscribe as abort.

  3. Have the subscriber function passed to the Observable constructor receive an AbortSignal.
  4. Any function/Subscription returned from the subscriber function (from the constructor), will automatically be added to the AbortSignal.
  5. Remove the observer start method I just added. :) lol It's no longer necessary.
  6. Add the observer abort method that accepts AbortError.
  7. Ensure that aborts arriving at the tail-end of a forEach get sent as rejections of type AbortError to the returned Promise.
  8. BONUS: Have toPromise accept an abort signal
  9. Add deprecation messages for usages that don't align with the spec.
  10. Long term, I'd like to phase out the 3 handlers method: subscribe(fn, fn , fn), because object literals are more readable anyhow, an forEach can be used for the general case of just needing next. Think subscribe(null, (err) => doAThing(err)) vs subscribe({ error: doAThing })
    NOTE: The current TC39 proposal may not reflect this yet

cc/ @jhusain @trxcllnt @kwonoj @mattpodwysocki

discussion

Most helpful comment

It's a bit more nuanced:

  • You need to handle an AbortSignal that's already aborted, as that will not fire the abort event anymore
  • You need to remove the abort listener if the Observable completes, errors, or gets unsubscribed from, else it's gonna leak memory (could do this by using takeUntil(fromEvent(signal, 'abort').pipe(take(1))) instead of a Subject)

All 39 comments

Also cc @domenic who might have interest.

So before, unsubscribe simply made sure no handlers (next, error, complete) were called. With AbortSignal, does that mean that cancelling an Observable will call error with an AbortError?

@felixfbecker No, not at all. It doesn't change anything semantically for the Observable type, just the cancellation mechanism. In the short term, it would be an additional feature rather than a replacement.

I could see that as a confusion point because that is what the Promise returned by fetch does

I'm not sure we could do that, it would make error handling pretty much awful, as any unsubscription would be treated like an error. switchMap would be a hot mess.

In fetch it makes a little sense, because Promises are always multicast... however cancellation is not an error state. Seems like, if anything, a finally clause would need to be notified in a Promise, but that's a matter for debate elsewhere.

I just know with Observable it seems like a really bad idea.

After talking with @jhusain about this last night, I think we're going to add an abort signal to Observer.

  1. For a Promise (which, per the name, is a guaranteed future) it makes sense that aborting would cause a rejection with AbortError.
  2. For Observable, which is lazy and does not guarantee completion, aborting would not cause an error, rather it would cause another event. This is a litlte interesting, because @leebyron actually needed this abstraction so much Relay rolled their own Observable.
  3. For Observable forEach, which returns a promise, an abort would purcolate down the rejection path as an AbortError just like it does in fetch.

For Observable forEach, which returns a promise, an abort would purcolate down the rejection path as an AbortError just like it does in fetch.

I'm not so sure about this. Does this mean

document.body.on("mousemove").takeUntil(document.body.on("click")).forEach(e => { ... });

would result in a (here-unhandled) rejected promise?

It feels OK to me to use the AbortSignal mechanism to communicate cancelations without using "AbortError".

@domenic take, takeUntil, et al, result in a producer-fired completion rather than an abort. So no AbortError or abort notification would fire.

A trickier one would be switchMap, which actually aborts inner observables. For that one, the abort errors would need to be swallowed unless the parent aborts. It's either that or we might have to change it to complete those inner observables instead. :thinking:

Ah, right, in that case I don't have any concerns, especially since I've never used/don't understand switchMap :).

Well, OK, no, maybe I have concerns. It's weird that the equivalent of removeEventListener() would cause an "exception". This isn't exceptional.

@domenic: It feels OK to me to use the AbortSignal mechanism to communicate cancelations without using "AbortError".

Maybe I need to disambiguate this a little.

  • There's abortSignal ... the thing we're passing in as a cancellation token of sorts.
  • There's abort notifications, which come to a handler on an Observer objects passed to subscribe. This could recieve the AbortError object.
  • There's AbortError as sent to the rejected promise of forEach.

It's weird that the equivalent of removeEventListener() would cause an "exception". This isn't exceptional.

In the case of using Observable.prototype.subscribe, it would not send anything down the error path of the observer chain, rather it would notify down the abort path of the observer chain.

In the case of using Observable.prototype.forEach it would do the same thing it does with fetch, which is send an AbortError down the rejection path. (This is mostly to mirror what fetch is doing)

I don't think that is the right dimension along which to mirror fetch. In fetch, canceling is at least somewhat exceptional; it prevents a response from being received. Removing an event listener is not exceptional.

@domenic Interesting, so you're saying you would not reject the promise returned by forEach with AbortError? (Or presumably use AbortError at all?)

Yeah, I don't think I would reject the promise.

A function that accepts an AbortSignal can decide how to react to the news of the process being aborted. If it means premature, somewhat-exceptional termination, it can return a rejected promise. If it's expected termination, and this is just one signalling channel, it can return a fulfilled promise. It's up to the function in question. For forEach, it seems expected.

Now, if the observable was created via something like

const o = new Observable((observer, signal) => {
  signal.addEventListener("abort", () => {
    observer.error(new DOMException("AbortError", "..."));
  });
});

then forEach would reject. This might make sense for some observable where unsubscribing is somehow an error. But I don't think this should be the default behavior.

@domenic ... I see. So you're saying leave it to the Observable creator to decide whether or not to send the AbortSignal? That makes sense.

What about for the case of signal.on('abort')? :smile: There might be a few people that try this inside of their Observable:

const o = new Observable((observer, signal) => {
  signal.on('abort').forEach(() => {
    observer.error(new DOMException('AbortError', '')
  }, otherSignal);
});

... if otherSignal is aborted above, does the promise returned by forEach reject?

Assumine you mean o.forEach(), not signal.on('abort').forEach(), and assuming that otherSignal is aborted before signal is, then I think the answer is no. The () => { observer.error(...) } block never executes, because the forEach unsubscribed.

@domenic I meant in a world where EventTarget has an on method... what will it do? I suppose that's orthogonal to this particular issue. I'm just curious.

I think I'm totally with you that the Observable creator should be left to decide whether or not an abort signals an error.

Right, I don't think observables created by on() should be ones that treat unsubscription/aborting as an error.

@domenic Ah, right, in that case I don't have any concerns, especially since I've never used/don't understand switchMap :).

switchMap is like flatMap, except that instead of merging all inner Observables, it switches to the latest Observable (and if applicable, disposes the subscription of the current Observable). In pseudocode:

Observable.prototype.switchMap = function(sel: (x: any) => Observable) {
  let source = this;
  return new Observable((sink) => {
    let outerSubscription = new Subscription();
    let innerSubscription = new Subscription();
    outerSubscription.add(source.subscribe((x) => {
      innerSubscription.unsubscribe();
      outerSubscription.remove(innerSubscription);
      outerSubscription.add(innerSubscription = sel(x).subscribe({
        next(x) { sink.next(x); },
        error: sink.error.bind(sink), 
        complete: sink.complete.bind(sink)
      }));
    }, sink.error.bind(sink), sink.complete.bind(obs)));
    return outerSubscription;
  });
}

It's useful in cases you're mapping discrete events into asynchronous actions, and the arrival of a new event should cancel the current pending action, and switch to the latest asynchronous action:

tabNavigatorSelections
  .switchMap((selectedTab) => loadTabContent(selectedTab))
  .subscribe((selectedTabContent) => render(selectedTabContent));

mouseDownEvents
  .switchMap((down) => mouseMoveEvents.takeUntil(mouseUp))
  .subscribe((moveEvent) => doDrag(moveEvent))

I'm currently thinking that in a world where observable is using AbortSignal internally, we might want to provide an AbortSignal to flattening functions. Use case:

clicks.switchMap((e, i, signal) => fetch(url, { signal }))

:thinking:

Then again I'd worry a little bit that people would use it for silly things inside of that function.

FWIW people can do it like this:

switchMap(
  () =>
    new Observable(observer => {
      const abortController = new AbortController()
      fetch(url, { signal: abortController.signal })
        .then(res => {
          observer.next(res)
          observer.complete()
        })
        .catch(err => observer.error(err))
      return () => abortController.abort()
    })
)

@felixfbecker in a world where we were using AbortSignal as well, it would be easier than that, as we'd be providing a signal to the subscriber function.

switchMap(
  () =>
    new Observable((observer, signal) => {
      fetch(url, { signal })
        .then(res => {
          observer.next(res)
          observer.complete()
        })
        .catch(err => observer.error(err))
    })
)

I'm just trying to think of ways to make that more ergonomic. I'm tired of everyone maintaining an HTTP client.

After looking starting to implement a toy version of this, I think I've decided that it would not work out. The problem was clear after some discussion with @jhusain. The reasons it won't work are a bit nuanced:

  1. Since @domenic and @jhusain had agreed that aborting should cause a rejection of the promise returned by forEach, any abort would therefor result in that rejection (this is fine on it's own... but...)
  2. All teardown logic must be registered with the abortSignal passed to the subscriber function.
  3. All teardown logic must be executed not only on abort (unsubscription) but also on complete or error.
  4. Since the abortSignal is handed to us by a third party, that means that every subscription will have to create it's own AbortController, wrap it in a handler passed to the parent abortsignal, and send it's abort signal down the chain.
  5. This is done so that when complete or error are triggered, we can tear down by calling abort on the abort controller... BUT, because of 1 above, that means that we're going to reject the promise returned by forEach... Therefore all completions will cause UnhandledPromiseExceptions if not explicitly handled and using forEach.
  6. Perhaps slightly less bad are the semantic implications of this, that all completions require an abort, all errors require an abort, and of course unsubscription is really an abort.

The truth is that streams like Observables are made to be subscribed to and unsubscribed from, and it's not an error if you unsubscribe, it's expected. So while Subscription and AbortController are _nearly identical in shape_ they are not identical in use case or behavior, and cannot be used interchangeably for this type.

As of right now, I think I'll close this issue. :cry:

I'm still interested in reading anyone's thoughts, and I'm happy to reopen if necessary.

So, their behavior is completely up to us. They only reject forEach because we decided that. And we only decided that because @jhusain convinced me nobody would use forEach for subscription, essentially.

If we think people will use forEach for a subscription where they don't care about the completion value, we should not reject with AbortError.

@domenic I thought the nasty issue with not rejecting the promise returned by forEach is ambiguous behavior between "complete" and "unsubscribed" within an async function:

async function foo() {
  await someObservable.forEach(signal);
  doThisWhenComplete();
}

... where some third party to the execution of foo aborts the signal. If the returned promise doesn't reject, then doThisWhenComplete will be called regardless. Effectively we wouldn't have a way to differentiate between "complete" and "unsubscribed" within an async function. Where, the other way around, we have a way to differentiate between errors and unsubscribe by doing a try-catch and checking the error to see if it's an AbortError.

It sucks. I really want this to work, but currently I'm not seeing a great solution. Hopefully I'm just near-sighted. haha.

Without completely changing how Observables work, would it still be possible to improve the interoperability for the use case of calling abortable async functions? Taking from ideas proposed in this thread, provide an AbortSignal to all Observable-factory-like functions, e.g.

switchMap((val, i, signal) => fetch(url, { signal }))

defer(signal => fetch(url, { signal }))

new Observable((subscriber, signal) => {
  fetch(url, { signal })
    .then(res => {
      subscriber.next(res)
      subscriber.complete()
    })
    .catch(err => observer.error(err))
})

There would be no change in behaviour - when the Observable is unsubscribed from, it first runs the unsubscribe logic, _then_ also aborts the AbortSignal. So whether the Promise rejects with an AbortError or not doesn't matter anymore because the consumer unsubscribed and will never know. It is nothing but a convenience to make it easier to abort unneeded work.

There are lots of use cases where Observables and their "dont care" unsubscribe semantics are the right abstraction (e.g. events), but as part of an Observable chain you often need to trigger async work, and for a simple async function like fetch a Promise/AbortSignal pair is the right abstraction (this applies to most user-land async functions). RxJS should make crossing that boundary as smooth as possible (as it does today by accepting ObservableInput almost everywhere, just cancellation is missing).

The primary difference to the proposal in the OP is that this strictly improves the conversion from Promise/AbortSignal -> Observable, not the other way around. I.e. it answers "I have a function that takes AbortSignal, how can I get an Observable", it intentionally does not solve "I have a function that returns an Observable and an AbortSignal, how can I cancel it" because I don't think the use case is as strong for that, and it's not hard to do manually.

Ping @benlesh, thoughts?

I wrapped my proposal into a library of functions that can be used as drop-in replacement for RxJS factories/operators where AbortSignal makes sense: https://github.com/felixfbecker/abortable-rx

@benlesh it would be nice if atleast toPromise accepted an AbortSignal

Has this had any movement in the last 3/4 year?

I just sat down today to replace all my native fetch calls with Angular's HttpClient. I figured, it's a modern framework, surely it'll be a half-hour drop-in replacement job. But for the life of me I can't figure out what to do with my AbortSignal -- a lot of my existing code passes an AbortSignal to all calls being made for one logical operation, and triggers it when the user clicks a "cancel" or "stop" button. I read through the backlog on this issue but if there's a simple pattern for reproducing this behavior with an Observable-based HTTP library, I can't find it.

@thw0rted you can try the helpers I linked above

Thanks Felix. I'm probably not going to add the whole library but I will compare your approach to what I worked out myself.

@thw0rted Just curious, have you tried using takeUntil with a Subject as your abort signal? I haven't needed to abort my Http requests, so I've not actually tried this, but using these two RxJs facilities, I'd imagine it would allow you to abort the call?

private abortSignal$$ = new Subject();

public abort() {
  this.abortSignal$$.next();
}

public loadData(): Observable<Data[]> {
  return this.http.get<Data[]>(`/api/data`).pipe(takeUntil(this.abortSignal$$));
}

Perhaps I'm missing a nuance here...it may be that using takeUntil in this way will only cancel the Observable, but not otherwise affect the underlying Http call?

That's a neat idea! I'm an Observables yellow-belt at best, so I would never have come up with that on my own, but it seems like it gets the spirit of the thing. My understanding is that the underlying XHR / fetch call is supposed to be aborted when the last subscriber unsubscribes from the returned observable. So, as long as takeUntil unsubscribes when its trigger observable fires, that "should" happen. I'll have to test to see if it actually does.

This also simplifies my code -- I can basically write a tiny wrapper to turn an AbortSignal into a Subject:

ts public load(signal?: AbortSignal) { const ret = this.http.get<Data[]>(`/api/data`); if (!signal) { return ret; } const subj = new Subject(); signal.addEventListener("abort", () => subj.next(), {once: true}); return ret.pipe(takeUntil(subj)); }

It's a bit more nuanced:

  • You need to handle an AbortSignal that's already aborted, as that will not fire the abort event anymore
  • You need to remove the abort listener if the Observable completes, errors, or gets unsubscribed from, else it's gonna leak memory (could do this by using takeUntil(fromEvent(signal, 'abort').pipe(take(1))) instead of a Subject)

Revised per Felix's suggestions, in case anybody finds it helpful:

```ts // LikeObservable#toPromise` but unsubscribes when the passed signal aborts
export function observableToPromise(obs: Observable, signal?: AbortSignal): Promise {
if (!signal) { return obs.toPromise(); }
if (signal.aborted) { return NEVER.toPromise(); }

const stop = fromEvent(signal, "abort").pipe(take(1));

return obs.pipe(takeUntil(stop)).toPromise();

}
````

Nice. Just one last tip: You probably want to reject the Promise with an AbortError instead of never resolving it, because otherwise you'll have listeners hang forever (memory leak) and never run cleanup (finally blocks).

The test suite of abortable-rx's toPromise() may be of interest for you: https://github.com/felixfbecker/abortable-rx/blob/0b60bfc80e61904e720f99e9ca8c662c72dc4f51/src/index.test.ts#L71

From my point of view, Cancellation is just an Observable with a specific context. It is an action invoked by an entity that represents of canceling something. This definition is pretty much close to what an Observable is (an object when observed emits a data representation).
This could mean that for an Observable with a need of clearing things up, it needs to listen for an Observable that will invoke a cancellation like takeUntil operator does.

But this idea tells us more about what an Observable is. If cancellation is an Observable in the context of canceling something and some Observables has a need for it. Then it could be that Observable, as requires by a reactive program, fundamentally reacts from another Observable outside.

const fromArray = array => new Observable((next, externalObservable) => {
  let cancelled = false;
  // listen to an external observable that could
  // probably emit a cancellation context (token)
  // as an indication to break the loop
  externalObservable
    .filter(value => value === 'CANCEL')
    .take(1) // automatic unsubscribe external observable
    .subscribe(() => (cancelled = true))

  for (let index = 0; index < array.length; index++) {
    if (cancelled) break;
    next(array[index]);
  }
});

const subject = new Subject();

fromArray([1, 2, 3, 4, 5])
  .subscribe(
    value => value === 3 && subject.next('CANCEL'), // emit a cancellation context
    subject // pass an observable that could emit a cancellation context
  )

Pseudo-code above shows that fromArray reacts from an external Observable that emits cancellation token and thus break the propgagation.

If the idea that cancellation is just another form of Observable is correct and fundamental

new Observable(next => next('CANCEL'));

fromEvent(button, 'click').mapTo('CANCEL');

timeout(1000).mapTo('CANCEL')

then it could be that Observable fundamentally reacts to another Observable. This not just limited to cancellation context it could be used for aborting fetch requests, pausing an observable, pulling data manually from an observable, and many more. I've explained the idea further in this article (https://github.com/cedmandocdoc/redefining-observable) and provided workable examples to play with.

Closing this in favor of discussions #5591 #5545 #5683 et al.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

dooreelko picture dooreelko  路  3Comments

matthewwithanm picture matthewwithanm  路  4Comments

chalin picture chalin  路  4Comments

jakovljevic-mladen picture jakovljevic-mladen  路  3Comments

benlesh picture benlesh  路  3Comments