Okay, this one might sound a little strange, but I was talking with @benjamingr, and he reminded me of past discussions about just using Observable as a signal by talking about what a shame it was we had AbortSignal natively, but no Observable. Like "what if we wanted to signal something _other_ than cancellation?".
To that end, I'd like to re-explore this idea. Currently, we have the idea of takeUntil as an operator, which is effectively cancellation of the source observable, and notification of completion downstream. But _what if_ we allowed that cancellation notification at the point of subscription, and we also provided an observable of cancellations to the observable initializer:
These two things would be _very_ similar, only one wouldn't cause the completion notification:
source$.pipe(
takeUntil(notifier$)
).subscribe({
next: console.log,
complete: () => console.log('complete')
});
source$.subscribe({
next: console.log,
complete: () => console.log('complete') // never called because it was cancelled.
}, notifier$);
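To make the difference concrete, here is a minimal sketch that does not use RxJS at all. `createNotifier`, `createSource`, and this hand-rolled `takeUntil` are hypothetical stand-ins invented for illustration, and the second argument to `subscribe` is only the proposed shape, not an existing API.

```javascript
// Tiny hand-rolled notifier: call notify() to signal every listener.
function createNotifier() {
  const listeners = new Set();
  return {
    subscribe: (fn) => listeners.add(fn),
    notify: () => listeners.forEach((fn) => fn()),
  };
}

// A source that emits to its observers whenever push() is called.
function createSource() {
  const observers = new Set();
  return {
    push: (value) => observers.forEach((o) => o.next(value)),
    // Hypothetical second argument: a notifier that cancels silently.
    subscribe(observer, notifier) {
      observers.add(observer);
      if (notifier) notifier.subscribe(() => observers.delete(observer));
    },
  };
}

// takeUntil-style: cancel the source AND notify completion downstream.
function takeUntil(source, notifier) {
  return {
    subscribe(observer) {
      let done = false;
      source.subscribe({ next: (v) => { if (!done) observer.next(v); } }, notifier);
      notifier.subscribe(() => {
        if (done) return;
        done = true;
        observer.complete();
      });
    },
  };
}

const log = [];
const source = createSource();
const stop = createNotifier();

takeUntil(source, stop).subscribe({
  next: (v) => log.push(`A next ${v}`),
  complete: () => log.push('A complete'),
});
source.subscribe({
  next: (v) => log.push(`B next ${v}`),
  complete: () => log.push('B complete'), // never called: cancelled, not completed
}, stop);

source.push(1);
stop.notify();
source.push(2); // nobody is listening any more

console.log(log); // [ 'A next 1', 'B next 1', 'A complete' ]
```

Subscriber A sees a completion because the operator converts cancellation into `complete`; subscriber B is simply dropped.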
Let's take the example from #5649:
let sideEffects = 0;
const firehose = new Observable((subscriber) => {
let n = 0;
while (!subscriber.closed && n < 1000) {
sideEffects++;
subscriber.next(n++);
}
});
// Let's say this observable here is the result of some sort of operator that someone made.
// If this is hard to understand, I guess a challenge would be to have someone "roll their own"
// concatMap operator without using our Subscribers and having working knowledge of
// our implementation details.
const flatteningSyncOuter = new Observable((subscriber, teardown$) => {
// Notice how now we are threading the signal through. Pretty straight forward.
firehose.subscribe({
next: value => subscriber.next(value)
}, teardown$);
firehose.subscribe({
next: value => subscriber.next(value),
complete: () => subscriber.complete()
}, teardown$);
});
const unsubController = new Subject();
const teardown$ = unsubController.asObservable(); // this is really optional
flatteningSyncOuter.pipe(
tap(() => {
// Give it a 1 in 10 chance of unsubbing externally.
if (Math.random() > 0.9) unsubController.next();
}),
take(300)
).subscribe(undefined, teardown$);
expect(sideEffects).to.equal(3);
As you can see, the API difference is minimal.
Furthermore, we could easily provide something to get an Observable from an AbortSignal, and vice versa. AND we could even allow for users to pass AbortSignal there, if it's available, and we could do the conversion for them under the hood.
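A rough sketch of what such conversions might look like, assuming a runtime with a global `AbortController` (modern browsers and Node.js). `fromAbortSignal` and `toAbortSignal` are hypothetical names, and the "observable" here is just an object with a `subscribe(fn)` method rather than the real RxJS type.

```javascript
// Hypothetical helper: expose an AbortSignal as a minimal observable-like
// object whose subscribe(fn) calls fn once when the signal aborts.
function fromAbortSignal(signal) {
  return {
    subscribe(fn) {
      if (signal.aborted) {
        fn(); // already aborted: notify synchronously
        return;
      }
      signal.addEventListener('abort', () => fn(), { once: true });
    },
  };
}

// Hypothetical helper for the other direction: abort a fresh controller
// the first time the notifier emits, and hand back its signal.
function toAbortSignal(notifier) {
  const controller = new AbortController();
  notifier.subscribe(() => controller.abort());
  return controller.signal;
}

const controller = new AbortController();
const teardown$ = fromAbortSignal(controller.signal);
const roundTripped = toAbortSignal(teardown$);

let closedCount = 0;
teardown$.subscribe(() => closedCount++);

controller.abort();
console.log(closedCount, roundTripped.aborted); // 1 true
```

Checking `signal.aborted` up front matters because an already-aborted signal never fires its `abort` event again.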
I think this solves a few problems around AbortSignal, in particular, if someone wanted to use the provided teardown$ to register teardowns, it's MUCH more ergonomic than AbortSignal:
AbortSignal
const source1$ = new Observable((subscriber, signal) => {
const resource = new ExpensiveResource();
signal.addEventListener('abort', () => resource.close(), { once: true });
});
Observable
const source1$ = new Observable((subscriber, teardown$) => {
const resource = new ExpensiveResource();
teardown$.subscribe(() => resource.close());
});
It's a lot cleaner. I think the confusing bit above might be that users will wonder if they need to unsubscribe from the teardown they've subscribed to. But that's unnecessary, because teardown _must_ happen eventually. Unless the subscription runs forever.
If Promise was designed better, and allowed sync emission, it would have been better for this particular use case. Alas. We have what we have.
Hmm, this is interesting - some thoughts on this:
.subscribes (alongside other non Rx APIs).
.subscribe take a signal (optionally).
Tangent:
If Promise was designed better, and allowed sync emission, it would have been better for this particular use case. Alas. We have what we have.
For what it's worth, when promises needed to do this we used callbacks, which were both the shortest option and had no error handling / timing guarantees that had to be upheld.
In your case that would be:
const source1$ = new Observable((subscriber, teardown) => {
const resource = new ExpensiveResource();
teardown(() => resource.close());
});
// which is almost as short, and provides no benefit over the 'current'
const source1$ = new Observable((subscriber, teardown) => {
const resource = new ExpensiveResource();
return () => resource.close();
});
We mainly did this because cancellation (in bluebird) was an afterthought of the promise constructor which already had other guarantees and constraints.
Also:
he reminded me of past discussions about just using Observable as a signal by talking about what a shame it was we had AbortSignal natively, but no Observable
I would very much be interested in exploring Observables in the platform again (as a much simpler primitive). I had to learn a lot just to get a (pretty basic) understanding of EventTarget and it's a very complicated primitive in some surprising ways.
Interesting idea. I like it.
One observation is that using a Subject as the notifier is a little different to using an AbortSignal. To behave more similarly to AbortSignal, it would need to be a ReplaySubject(1). Passing a Subject kinda assumes that all use of the notifier within observable's implementation is going to involve synchronous calls to subscribe - i.e. no possibility of a missed notification.
What would the implementation of Observable#subscribe look like - given that (as a garden-variety observable) the notifier won't have an equivalent of Subscription#closed or AbortSignal#aborted? How would it exit early upon receipt of an already-signalled notifier?
I guess a BehaviorSubject might model that better.
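The missed-notification concern can be sketched without RxJS. Both notifiers below are hypothetical stand-ins: the first behaves like a plain Subject, the second remembers that it has fired, in the spirit of ReplaySubject(1) or `AbortSignal#aborted`.

```javascript
// Plain subject-like notifier: listeners that arrive late miss the signal.
function createPlainNotifier() {
  const listeners = new Set();
  return {
    subscribe: (fn) => listeners.add(fn),
    next: () => listeners.forEach((fn) => fn()),
  };
}

// Replaying notifier: remembers that it has fired and notifies late
// listeners immediately, so an already-signalled notifier is never missed.
function createReplayNotifier() {
  const listeners = new Set();
  let fired = false;
  return {
    subscribe(fn) {
      if (fired) fn();
      else listeners.add(fn);
    },
    next() {
      if (fired) return;
      fired = true;
      listeners.forEach((fn) => fn());
      listeners.clear();
    },
  };
}

const plain = createPlainNotifier();
const replay = createReplayNotifier();

// Signal BEFORE anyone subscribes, as an async implementation might.
plain.next();
replay.next();

let plainSawSignal = false;
let replaySawSignal = false;
plain.subscribe(() => { plainSawSignal = true; });
replay.subscribe(() => { replaySawSignal = true; });

console.log(plainSawSignal, replaySawSignal); // false true
```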
Exciting! I was exploring this idea too a couple of months ago (check out the article). My thoughts:
The foundation we currently have is that Observable has two fundamental properties: data propagation and cancelation. Data propagation is evidently fundamental, since it is what makes an Observable observable: it is how it expresses something to the subscriber, and we achieve it with the subscriber.next callback. Cancelation may seem fundamental at first, but it occurred to me that it may not be; it may just be an emergent property. Exploring this, I found that cancelation is a demand from the subscriber: when to cancel is decided purely by application logic. So it is not cancelation that is fundamental, but rather the ability of an Observable to accept a demand or request from the outside. That makes sense because, at a larger scale, a subscriber could demand something other than cancelation, depending on the application's requirements. This ability of an Observable to react to subscriber requests leads me to the conclusion that cancelation is itself an Observable, because sending a request is very similar to what an Observable does (data propagation).
@benlesh, is the idea of accepting another Observable inside the producer function as a means of cancelation open to being more generic, like what you said:
Like "what if we wanted to signal something other than cancellation?".
If so then the API that I am imagining:
Observable.CANCEL = Symbol('CANCEL');
const obs = new Observable((subscriber, talkback) => {
const id = setInterval(() => subscriber.next('hey'), 500);
talkback.pipe(
filter(token => token === Observable.CANCEL),
takeWhile(token => token !== Observable.CANCEL, true)
).subscribe(() => {
clearInterval(id);
subscriber.complete();
});
})
const cancel = fromEvent(document.body, "click").pipe(mapTo(Observable.CANCEL))
obs.subscribe(data => console.log(data), cancel);
Then maybe we can extend Observable with a boilerplate encapsulation for automatic cancelation, like:
// CancelObservable extends Observable
new CancelObservable((subscriber, cancel) => {
cancel.subscribe(); // filtered subscriber request down to Observable.CANCEL only
})
I hope that RxJS team will explore this idea, it is very exciting! It will open up many opportunities.
I'd like to reiterate that supporting/using AbortSignals would _not_ mean passing the signal to the subscriber function (in the observable constructor). It would just mean subscribe accepting an optional signal as a second argument and, if a signal is passed there, invoking the unsubscribe function when it aborts:
const source1$ = new Observable((subscriber) => {
const resource = new ExpensiveResource();
// call next somewhere
return () => resource.close();
});
const unsubscribe = source1$.subscribe(...); // today, still works
source1$.subscribe(..., signal); // I am suggesting we add support for this
This would break very little existing code while enabling interop with all other async code :]
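A minimal sketch of that suggestion, assuming a global `AbortController`. `TinyObservable` is a hypothetical stand-in for the real class, and having `subscribe` return a plain unsubscribe function is a simplification made here for brevity.

```javascript
// Stand-in for the resource used in the examples above.
class ExpensiveResource {
  constructor() { this.closed = false; }
  close() { this.closed = true; }
}

// Minimal observable stand-in: init(subscriber) may return a teardown function.
class TinyObservable {
  constructor(init) { this.init = init; }

  // Hypothetical signature: subscribe(observer, signal?) returns unsubscribe.
  subscribe(observer, signal) {
    // An already-aborted signal means we never start the producer at all.
    if (signal && signal.aborted) return () => {};

    let closed = false;
    const teardown = this.init({
      next: (value) => { if (!closed) observer.next(value); },
    });
    const unsubscribe = () => {
      if (closed) return;
      closed = true;
      if (typeof teardown === 'function') teardown();
    };
    // The signal simply triggers the same unsubscribe path used today.
    if (signal) signal.addEventListener('abort', unsubscribe, { once: true });
    return unsubscribe;
  }
}

const resources = [];
const source1$ = new TinyObservable((subscriber) => {
  const resource = new ExpensiveResource();
  resources.push(resource);
  subscriber.next('ready');
  return () => resource.close();
});

const received = [];
const unsubscribe = source1$.subscribe({ next: (v) => received.push(v) }); // today's style
const controller = new AbortController();
source1$.subscribe({ next: (v) => received.push(v) }, controller.signal); // suggested style

unsubscribe();
controller.abort();
console.log(resources.map((r) => r.closed)); // [ true, true ]
```

The producer function is unchanged in both cases; only the caller decides how cancellation is triggered.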
Bear with me, because I'm probably missing the point here... But out of curiosity, why introduce a new cancellation mechanism when rx already has one?
Also, when AbortSignal is used for a Promise, it would reject with AbortError once it is signalled. Wouldn't it be consistent if something similar is done for Observable? In that case, support for cancellation signal can be handled with just a new operator similar to takeUntil, but instead of completing, it errors out with AbortError when abortSignal emits. Something like:
source$.pipe(
abortWhen(abortSignal)
).subscribe({
next: console.log,
complete: () => console.log('not called on abort'),
error: e => console.error('called on abort', e)
});
This way, upstream is cancelled the normal way (returned function from subscribe function is called).
Also, one can make abortWhen operator accept an observable to signal the cancel as well.
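One way such an operator could be sketched, assuming a global `AbortController`. Here an "observable" is reduced to a function `(observer) => unsubscribe`, and `abortWhen` plus the hand-built `AbortError` are illustrative assumptions, not existing RxJS APIs.

```javascript
// Hypothetical operator: forwards the source, but errors with an
// AbortError (instead of completing) once the signal aborts.
function abortWhen(signal) {
  return (source) => (observer) => {
    let done = false;
    let unsubscribeSource = () => {};

    const finish = () => {
      done = true;
      signal.removeEventListener('abort', onAbort);
      unsubscribeSource(); // upstream is cancelled the normal way
    };
    const onAbort = () => {
      if (done) return;
      finish();
      const error = new Error('The operation was aborted');
      error.name = 'AbortError';
      observer.error(error);
    };

    if (signal.aborted) {
      onAbort();
      return () => {};
    }
    signal.addEventListener('abort', onAbort, { once: true });
    unsubscribeSource = source({
      next: (value) => { if (!done) observer.next(value); },
      error: (err) => { if (!done) { finish(); observer.error(err); } },
      complete: () => { if (!done) { finish(); observer.complete(); } },
    });
    if (done) unsubscribeSource(); // source finished synchronously
    return () => { if (!done) finish(); };
  };
}

// A manually driven source so the example stays synchronous.
let emit;
let sourceTornDown = false;
const source = (observer) => {
  emit = (value) => observer.next(value);
  return () => { sourceTornDown = true; };
};

const events = [];
const controller = new AbortController();
abortWhen(controller.signal)(source)({
  next: (v) => events.push(`next ${v}`),
  complete: () => events.push('complete'),
  error: (e) => events.push(`error ${e.name}`),
});

emit(1);
controller.abort();
emit(2); // ignored: the subscription is already closed

console.log(events, sourceTornDown); // [ 'next 1', 'error AbortError' ] true
```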
OK, I indeed missed the point. It turns out (for me) that the plan all along was to move away from the old way of cancellation to a new one: #5863. As I understand it, to be able to support aborting synchronously emitting observables (like in the firehose example), you need to be able to set up cancellation ahead of time.
I'll shut up (for now 😜).
@benjamingr
I think I'd agree with you that the current way of detecting a cancellation should still be supported, so existing code is not broken. But consider the following example, which as I understand it is the main reason for the idea of passing the abort signal to the subscriber function:
const firehose$ = new Observable((subscriber) => {
let cancelled = false;
let count = 0;
while(!cancelled) {
subscriber.next(count++);
}
return () => cancelled = true;
});
In this synchronous 'firehose' example, the subscriber function will never reach the point where the cancellation function is returned. Hence it can never be cancelled.
But in the case where an abort signal is passed, it can still be cancelled like this:
const firehose$ = new Observable((subscriber, signal) => {
let count = 0;
while(!signal.aborted) {
subscriber.next(count++);
}
});
This is because cancellation is set up prior to calling the synchronous subscription function, whereas in the first example, cancellation is (attempted to be) set up _after_ calling it.
Still, my opinion is that cancellation in the first example should still be supported (as well), because I'd say that in more than 99% of the cases, observables are used in asynchronous ways, and as you say, this way it can be backward compatible with existing code.
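A small runnable sketch of the signal-based firehose, assuming a global `AbortController`. `subscribeWithSignal` is a hypothetical helper standing in for a `subscribe` that threads the signal into the producer; the upper bound of 1000 is borrowed from the earlier firehose example so the loop cannot run away.

```javascript
// Hypothetical helper: the signal exists BEFORE the producer runs, so a
// synchronous loop inside the producer can observe cancellation.
function subscribeWithSignal(init, observer, signal) {
  init({
    next: (value) => { if (!signal.aborted) observer.next(value); },
  }, signal);
}

let sideEffects = 0;
const firehose = (subscriber, signal) => {
  let count = 0;
  while (!signal.aborted && count < 1000) {
    sideEffects++;
    subscriber.next(count++);
  }
};

const controller = new AbortController();
const seen = [];
subscribeWithSignal(firehose, {
  next: (value) => {
    seen.push(value);
    if (seen.length === 3) controller.abort(); // cancel from inside the loop
  },
}, controller.signal);

console.log(seen, sideEffects); // [ 0, 1, 2 ] 3
```

With the returned-teardown style, the `return` statement would sit after the loop and could not take effect while the loop is still emitting.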