The standards proposals are leaning towards using AbortSignal and AbortController (ala fetch cancellation) for cancellation with Observables. I generally find this favorable for a few reasons and I think I see a migration path for us.
subscribe accepts Observer and an optional AbortSignal.forEach accepts an optional AbortSignal.new Observable(subscriber) gives both a safe Observer and an AbortSignal, but is expected to return void.Observer passed to subscribe will have an optional abort handler that will get the AbortError if the observable is aborted.forEach, the AbortError would go down the rejection path just like it does with fetch. Since the returned promise is a guarantee to a future where the producer either errors or completes, it makes sense to send the AbortError down the rejection path in this case.AbortSignal is an EventTarget, and EventTarget may soon have an on method that returns an observable. you could use abortSignal.on('abort') to compose teardown if you so choose.We'll start using AbortSignal internally throughout the library.
subscribe that accepts an Observer and an AbortSignal. If AbortSignal is passed, it does not return a Subscription.Subscription into an AbortControllerAbortSignal.unsubscribe as abort.Observable constructor receive an AbortSignal.AbortSignal.start method I just added. :) lol It's no longer necessary.abort method that accepts AbortError.forEach get sent as rejections of type AbortError to the returned Promise.toPromise accept an abort signalsubscribe(fn, fn , fn), because object literals are more readable anyhow, an forEach can be used for the general case of just needing next. Think subscribe(null, (err) => doAThing(err)) vs subscribe({ error: doAThing })cc/ @jhusain @trxcllnt @kwonoj @mattpodwysocki
Also cc @domenic who might have interest.
So before, unsubscribe simply made sure no handlers (next, error, complete) were called. With AbortSignal, does that mean that cancelling an Observable will call error with an AbortError?
@felixfbecker No, not at all. It doesn't change anything semantically for the Observable type, just the cancellation mechanism. In the short term, it would be an additional feature rather than a replacement.
I could see that as a confusion point because that is what the Promise returned by fetch does
I'm not sure we could do that, it would make error handling pretty much awful, as any unsubscription would be treated like an error. switchMap would be a hot mess.
In fetch it makes a little sense, because Promises are always multicast... however cancellation is not an error state. Seems like, if anything, a finally clause would need to be notified in a Promise, but that's a matter for debate elsewhere.
I just know with Observable it seems like a really bad idea.
After talking with @jhusain about this last night, I think we're going to add an abort signal to Observer.
forEach, which returns a promise, an abort would purcolate down the rejection path as an AbortError just like it does in fetch.For Observable forEach, which returns a promise, an abort would purcolate down the rejection path as an AbortError just like it does in fetch.
I'm not so sure about this. Does this mean
document.body.on("mousemove").takeUntil(document.body.on("click")).forEach(e => { ... });
would result in a (here-unhandled) rejected promise?
It feels OK to me to use the AbortSignal mechanism to communicate cancelations without using "AbortError".
@domenic take, takeUntil, et al, result in a producer-fired completion rather than an abort. So no AbortError or abort notification would fire.
A trickier one would be switchMap, which actually aborts inner observables. For that one, the abort errors would need to be swallowed unless the parent aborts. It's either that or we might have to change it to complete those inner observables instead. :thinking:
Ah, right, in that case I don't have any concerns, especially since I've never used/don't understand switchMap :).
Well, OK, no, maybe I have concerns. It's weird that the equivalent of removeEventListener() would cause an "exception". This isn't exceptional.
@domenic: It feels OK to me to use the AbortSignal mechanism to communicate cancelations without using "AbortError".
Maybe I need to disambiguate this a little.
abortSignal ... the thing we're passing in as a cancellation token of sorts.abort notifications, which come to a handler on an Observer objects passed to subscribe. This could recieve the AbortError object.AbortError as sent to the rejected promise of forEach.It's weird that the equivalent of removeEventListener() would cause an "exception". This isn't exceptional.
In the case of using Observable.prototype.subscribe, it would not send anything down the error path of the observer chain, rather it would notify down the abort path of the observer chain.
In the case of using Observable.prototype.forEach it would do the same thing it does with fetch, which is send an AbortError down the rejection path. (This is mostly to mirror what fetch is doing)
I don't think that is the right dimension along which to mirror fetch. In fetch, canceling is at least somewhat exceptional; it prevents a response from being received. Removing an event listener is not exceptional.
@domenic Interesting, so you're saying you would not reject the promise returned by forEach with AbortError? (Or presumably use AbortError at all?)
Yeah, I don't think I would reject the promise.
A function that accepts an AbortSignal can decide how to react to the news of the process being aborted. If it means premature, somewhat-exceptional termination, it can return a rejected promise. If it's expected termination, and this is just one signalling channel, it can return a fulfilled promise. It's up to the function in question. For forEach, it seems expected.
Now, if the observable was created via something like
const o = new Observable((observer, signal) => {
signal.addEventListener("abort", () => {
observer.error(new DOMException("AbortError", "..."));
});
});
then forEach would reject. This might make sense for some observable where unsubscribing is somehow an error. But I don't think this should be the default behavior.
@domenic ... I see. So you're saying leave it to the Observable creator to decide whether or not to send the AbortSignal? That makes sense.
What about for the case of signal.on('abort')? :smile: There might be a few people that try this inside of their Observable:
const o = new Observable((observer, signal) => {
signal.on('abort').forEach(() => {
observer.error(new DOMException('AbortError', '')
}, otherSignal);
});
... if otherSignal is aborted above, does the promise returned by forEach reject?
Assumine you mean o.forEach(), not signal.on('abort').forEach(), and assuming that otherSignal is aborted before signal is, then I think the answer is no. The () => { observer.error(...) } block never executes, because the forEach unsubscribed.
@domenic I meant in a world where EventTarget has an on method... what will it do? I suppose that's orthogonal to this particular issue. I'm just curious.
I think I'm totally with you that the Observable creator should be left to decide whether or not an abort signals an error.
Right, I don't think observables created by on() should be ones that treat unsubscription/aborting as an error.
@domenic Ah, right, in that case I don't have any concerns, especially since I've never used/don't understand switchMap :).
switchMap is like flatMap, except that instead of merging all inner Observables, it switches to the latest Observable (and if applicable, disposes the subscription of the current Observable). In pseudocode:
Observable.prototype.switchMap = function(sel: (x: any) => Observable) {
let source = this;
return new Observable((sink) => {
let outerSubscription = new Subscription();
let innerSubscription = new Subscription();
outerSubscription.add(source.subscribe((x) => {
innerSubscription.unsubscribe();
outerSubscription.remove(innerSubscription);
outerSubscription.add(innerSubscription = sel(x).subscribe({
next(x) { sink.next(x); },
error: sink.error.bind(sink),
complete: sink.complete.bind(sink)
}));
}, sink.error.bind(sink), sink.complete.bind(obs)));
return outerSubscription;
});
}
It's useful in cases you're mapping discrete events into asynchronous actions, and the arrival of a new event should cancel the current pending action, and switch to the latest asynchronous action:
tabNavigatorSelections
.switchMap((selectedTab) => loadTabContent(selectedTab))
.subscribe((selectedTabContent) => render(selectedTabContent));
mouseDownEvents
.switchMap((down) => mouseMoveEvents.takeUntil(mouseUp))
.subscribe((moveEvent) => doDrag(moveEvent))
I'm currently thinking that in a world where observable is using AbortSignal internally, we might want to provide an AbortSignal to flattening functions. Use case:
clicks.switchMap((e, i, signal) => fetch(url, { signal }))
:thinking:
Then again I'd worry a little bit that people would use it for silly things inside of that function.
FWIW people can do it like this:
switchMap(
() =>
new Observable(observer => {
const abortController = new AbortController()
fetch(url, { signal: abortController.signal })
.then(res => {
observer.next(res)
observer.complete()
})
.catch(err => observer.error(err))
return () => abortController.abort()
})
)
@felixfbecker in a world where we were using AbortSignal as well, it would be easier than that, as we'd be providing a signal to the subscriber function.
switchMap(
() =>
new Observable((observer, signal) => {
fetch(url, { signal })
.then(res => {
observer.next(res)
observer.complete()
})
.catch(err => observer.error(err))
})
)
I'm just trying to think of ways to make that more ergonomic. I'm tired of everyone maintaining an HTTP client.
After looking starting to implement a toy version of this, I think I've decided that it would not work out. The problem was clear after some discussion with @jhusain. The reasons it won't work are a bit nuanced:
forEach, any abort would therefor result in that rejection (this is fine on it's own... but...)forEach... Therefore all completions will cause UnhandledPromiseExceptions if not explicitly handled and using forEach.The truth is that streams like Observables are made to be subscribed to and unsubscribed from, and it's not an error if you unsubscribe, it's expected. So while Subscription and AbortController are _nearly identical in shape_ they are not identical in use case or behavior, and cannot be used interchangeably for this type.
As of right now, I think I'll close this issue. :cry:
I'm still interested in reading anyone's thoughts, and I'm happy to reopen if necessary.
So, their behavior is completely up to us. They only reject forEach because we decided that. And we only decided that because @jhusain convinced me nobody would use forEach for subscription, essentially.
If we think people will use forEach for a subscription where they don't care about the completion value, we should not reject with AbortError.
@domenic I thought the nasty issue with not rejecting the promise returned by forEach is ambiguous behavior between "complete" and "unsubscribed" within an async function:
async function foo() {
await someObservable.forEach(signal);
doThisWhenComplete();
}
... where some third party to the execution of foo aborts the signal. If the returned promise doesn't reject, then doThisWhenComplete will be called regardless. Effectively we wouldn't have a way to differentiate between "complete" and "unsubscribed" within an async function. Where, the other way around, we have a way to differentiate between errors and unsubscribe by doing a try-catch and checking the error to see if it's an AbortError.
It sucks. I really want this to work, but currently I'm not seeing a great solution. Hopefully I'm just near-sighted. haha.
Without completely changing how Observables work, would it still be possible to improve the interoperability for the use case of calling abortable async functions? Taking from ideas proposed in this thread, provide an AbortSignal to all Observable-factory-like functions, e.g.
switchMap((val, i, signal) => fetch(url, { signal }))
defer(signal => fetch(url, { signal }))
new Observable((subscriber, signal) => {
fetch(url, { signal })
.then(res => {
subscriber.next(res)
subscriber.complete()
})
.catch(err => observer.error(err))
})
There would be no change in behaviour - when the Observable is unsubscribed from, it first runs the unsubscribe logic, _then_ also aborts the AbortSignal. So whether the Promise rejects with an AbortError or not doesn't matter anymore because the consumer unsubscribed and will never know. It is nothing but a convenience to make it easier to abort unneeded work.
There are lots of use cases where Observables and their "dont care" unsubscribe semantics are the right abstraction (e.g. events), but as part of an Observable chain you often need to trigger async work, and for a simple async function like fetch a Promise/AbortSignal pair is the right abstraction (this applies to most user-land async functions). RxJS should make crossing that boundary as smooth as possible (as it does today by accepting ObservableInput almost everywhere, just cancellation is missing).
The primary difference to the proposal in the OP is that this strictly improves the conversion from Promise/AbortSignal -> Observable, not the other way around. I.e. it answers "I have a function that takes AbortSignal, how can I get an Observable", it intentionally does not solve "I have a function that returns an Observable and an AbortSignal, how can I cancel it" because I don't think the use case is as strong for that, and it's not hard to do manually.
Ping @benlesh, thoughts?
I wrapped my proposal into a library of functions that can be used as drop-in replacement for RxJS factories/operators where AbortSignal makes sense: https://github.com/felixfbecker/abortable-rx
@benlesh it would be nice if atleast toPromise accepted an AbortSignal
Has this had any movement in the last 3/4 year?
I just sat down today to replace all my native fetch calls with Angular's HttpClient. I figured, it's a modern framework, surely it'll be a half-hour drop-in replacement job. But for the life of me I can't figure out what to do with my AbortSignal -- a lot of my existing code passes an AbortSignal to all calls being made for one logical operation, and triggers it when the user clicks a "cancel" or "stop" button. I read through the backlog on this issue but if there's a simple pattern for reproducing this behavior with an Observable-based HTTP library, I can't find it.
@thw0rted you can try the helpers I linked above
Thanks Felix. I'm probably not going to add the whole library but I will compare your approach to what I worked out myself.
@thw0rted Just curious, have you tried using takeUntil with a Subject as your abort signal? I haven't needed to abort my Http requests, so I've not actually tried this, but using these two RxJs facilities, I'd imagine it would allow you to abort the call?
private abortSignal$$ = new Subject();
public abort() {
this.abortSignal$$.next();
}
public loadData(): Observable<Data[]> {
return this.http.get<Data[]>(`/api/data`).pipe(takeUntil(this.abortSignal$$));
}
Perhaps I'm missing a nuance here...it may be that using takeUntil in this way will only cancel the Observable, but not otherwise affect the underlying Http call?
That's a neat idea! I'm an Observables yellow-belt at best, so I would never have come up with that on my own, but it seems like it gets the spirit of the thing. My understanding is that the underlying XHR / fetch call is supposed to be aborted when the last subscriber unsubscribes from the returned observable. So, as long as takeUntil unsubscribes when its trigger observable fires, that "should" happen. I'll have to test to see if it actually does.
This also simplifies my code -- I can basically write a tiny wrapper to turn an AbortSignal into a Subject:
ts
public load(signal?: AbortSignal) {
const ret = this.http.get<Data[]>(`/api/data`);
if (!signal) { return ret; }
const subj = new Subject();
signal.addEventListener("abort", () => subj.next(), {once: true});
return ret.pipe(takeUntil(subj));
}
It's a bit more nuanced:
AbortSignal that's already aborted, as that will not fire the abort event anymoreabort listener if the Observable completes, errors, or gets unsubscribed from, else it's gonna leak memory (could do this by using takeUntil(fromEvent(signal, 'abort').pipe(take(1))) instead of a Subject)Revised per Felix's suggestions, in case anybody finds it helpful:
```ts
// LikeObservable#toPromise` but unsubscribes when the passed signal aborts
export function observableToPromise
if (!signal) { return obs.toPromise(); }
if (signal.aborted) { return NEVER.toPromise(); }
const stop = fromEvent(signal, "abort").pipe(take(1));
return obs.pipe(takeUntil(stop)).toPromise();
}
````
Nice. Just one last tip: You probably want to reject the Promise with an AbortError instead of never resolving it, because otherwise you'll have listeners hang forever (memory leak) and never run cleanup (finally blocks).
The test suite of abortable-rx's toPromise() may be of interest for you: https://github.com/felixfbecker/abortable-rx/blob/0b60bfc80e61904e720f99e9ca8c662c72dc4f51/src/index.test.ts#L71
From my point of view, Cancellation is just an Observable with a specific context. It is an action invoked by an entity that represents of canceling something. This definition is pretty much close to what an Observable is (an object when observed emits a data representation).
This could mean that for an Observable with a need of clearing things up, it needs to listen for an Observable that will invoke a cancellation like takeUntil operator does.
But this idea tells us more about what an Observable is. If cancellation is an Observable in the context of canceling something and some Observables has a need for it. Then it could be that Observable, as requires by a reactive program, fundamentally reacts from another Observable outside.
const fromArray = array => new Observable((next, externalObservable) => {
let cancelled = false;
// listen to an external observable that could
// probably emit a cancellation context (token)
// as an indication to break the loop
externalObservable
.filter(value => value === 'CANCEL')
.take(1) // automatic unsubscribe external observable
.subscribe(() => (cancelled = true))
for (let index = 0; index < array.length; index++) {
if (cancelled) break;
next(array[index]);
}
});
const subject = new Subject();
fromArray([1, 2, 3, 4, 5])
.subscribe(
value => value === 3 && subject.next('CANCEL'), // emit a cancellation context
subject // pass an observable that could emit a cancellation context
)
Pseudo-code above shows that fromArray reacts from an external Observable that emits cancellation token and thus break the propgagation.
If the idea that cancellation is just another form of Observable is correct and fundamental
new Observable(next => next('CANCEL'));
fromEvent(button, 'click').mapTo('CANCEL');
timeout(1000).mapTo('CANCEL')
then it could be that Observable fundamentally reacts to another Observable. This not just limited to cancellation context it could be used for aborting fetch requests, pausing an observable, pulling data manually from an observable, and many more. I've explained the idea further in this article (https://github.com/cedmandocdoc/redefining-observable) and provided workable examples to play with.
Closing this in favor of discussions #5591 #5545 #5683 et al.
Most helpful comment
It's a bit more nuanced:
AbortSignalthat's already aborted, as that will not fire theabortevent anymoreabortlistener if the Observable completes, errors, or gets unsubscribed from, else it's gonna leak memory (could do this by usingtakeUntil(fromEvent(signal, 'abort').pipe(take(1)))instead of a Subject)