Current Behavior
We noticed that finalize is not called at the end of a pipe, in combination with concatMap.
Reproduction
We could reproduce our production problem when using a delay in the pipe. We are using delayWhen, maybe it has the same side-effect as delay:
https://stackblitz.com/edit/typescript-dhe1y9?file=index.ts
In production we use a very simplified version of the following SB:
https://stackblitz.com/edit/typescript-pf9zt7?file=index.ts
With delay, the finalize is called before next value or on complete.
Expected behavior
Finalize should be called at the end of a pipe, and before a next value is emitted from concatMap.
Environment
The same things happen in both StackBlitz examples:
When using delay(ms), every complete/next notification will be scheduled as a macrotask that will be invoked after ms milliseconds. By default, delay uses AsyncScheduler, which is why the notification will be scheduled as a macrotask. To visualize this, try using delay(0, queueScheduler) in both examples, and you should see the _expected_ results.
_Note: delay(msGreaterThanZero, queueScheduler) === delay(ms)_.
concatMap(fn) === mergeMap(fn, concurrent = 1). When mergeMap has the concurrent set and it is exceeded, it will buffer the _extra_ values. When one of its active inner observables completes, it will take the oldest buffered value and will create an inner observable out of it, using the provided fn.
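To make the buffering concrete, here is a minimal sketch of the bookkeeping described above. It is a simplified model and not the RxJS source: ConcurrencyBuffer, started and innerComplete are hypothetical names, and "starting" a value stands in for subscribing to the inner observable that fn returns.

```typescript
// Simplified, hypothetical model of the buffering mergeMap does when
// `concurrent` is set. This is NOT the RxJS implementation.
class ConcurrencyBuffer<T> {
  readonly started: T[] = []; // values whose inner observable was "subscribed"
  private buffer: T[] = [];
  private active = 0;
  private concurrent: number;

  constructor(concurrent: number) {
    this.concurrent = concurrent;
  }

  // Called for every value arriving from the outer source.
  next(value: T): void {
    if (this.active < this.concurrent) {
      this.active++;
      this.started.push(value); // stands in for subscribing to fn(value)
    } else {
      this.buffer.push(value); // limit exceeded: keep the extra value
    }
  }

  // Called when one of the active inner observables completes.
  innerComplete(): void {
    this.active--;
    const oldest = this.buffer.shift(); // oldest buffered value first
    if (oldest !== undefined) {
      this.next(oldest);
    }
  }
}

const concatLike = new ConcurrencyBuffer<number>(1); // concurrent = 1
concatLike.next(2000);
concatLike.next(1000); // buffered, because 2000 is still active
const startedBeforeComplete = concatLike.started.slice();
concatLike.innerComplete(); // 2000 completes, 1000 is taken from the buffer
const startedAfterComplete = concatLike.started.slice();
console.log(startedBeforeComplete, startedAfterComplete);
```

With a limit of 1 the second value only starts once the first one has completed, which is the concatMap behavior.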
finalize(cb): the callback will be invoked when its destination subscriber (more on this in the snippet below) unsubscribes. When the source completes or emits an error notification, the subscriber will have its unsubscribe method called as well.
// S#1 -> Subscriber#1
src$.pipe(
  a(), // S#4
  b(), // S#3
  finalize(cb), // S#2
).subscribe(observer) // S#1 - the `observer` is turned into a `Subscriber`
// S#1 is the destination subscriber for S#2
The finalize's callback will be called when S#1 unsubscribes, which can happen when the source completes or emits an error, or when you manually call S#1.unsubscribe().
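As a rough illustration of that sentence, the sketch below models S#1 and S#2 as two linked subscribers. It assumes, as described above, that unsubscribing the destination also tears down the subscriber created by finalize. ChainSubscriber and runChain are hypothetical names, and the model leaves out everything else a real Subscriber does.

```typescript
// Simplified model of a two-subscriber chain. Not the RxJS source.
class ChainSubscriber {
  closed = false;
  private teardowns: Array<() => void> = [];
  private destination: ChainSubscriber | undefined;

  constructor(destination?: ChainSubscriber) {
    this.destination = destination;
    // Assumption of this model: a subscriber is torn down with its destination.
    if (destination) {
      destination.add(() => this.unsubscribe());
    }
  }

  add(teardown: () => void): void {
    this.teardowns.push(teardown);
  }

  complete(): void {
    if (this.closed) return;
    if (this.destination) this.destination.complete(); // pass the notification on
    this.unsubscribe(); // a complete is always followed by teardown
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    for (const teardown of this.teardowns) teardown();
  }
}

function runChain(trigger: "source completes" | "manual unsubscribe"): string[] {
  const calls: string[] = [];
  const s1 = new ChainSubscriber(); // S#1: created by subscribe(observer)
  const s2 = new ChainSubscriber(s1); // S#2: created by finalize(cb)
  s2.add(() => { calls.push("finalize callback"); });
  if (trigger === "source completes") {
    s2.complete();
  } else {
    s1.unsubscribe();
  }
  return calls;
}

const afterComplete = runChain("source completes");
const afterManualUnsubscribe = runChain("manual unsubscribe");
console.log(afterComplete, afterManualUnsubscribe);
```

In both cases the callback runs exactly once, because the closed flag guards against a second teardown.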
Now let's see how this applies to your examples.
import { of, queueScheduler } from "rxjs";
import { concatMap, delay, finalize, tap } from "rxjs/operators";

const source = of(2000, 1000).pipe(concatMap(val => process(val)));

function process(val: number) {
  return of(val).pipe(
    tap(val => console.log("process-1", val)),
    delay(val, queueScheduler),
    tap(val => console.log("process-2", val)),
    finalize(() => console.log("finalize", val))
  );
}
When 2000 arrives, concatMap (more precisely, mergeMap) will create an inner observable from it. Notice that the inner observable (what process returns) contains the delay operator, which means that the inner observable will not complete in this tick. 1000 comes next, but as the inner observable has not completed yet, the value (1000) will be buffered.
After 2000 ms pass, the mergeMap's inner observable will emit the value and complete (because of the of operator).
Here's what happens when the inner observable completes:
protected _complete(): void {
  // parent refers to `mergeMap` in this case
  this.parent.notifyComplete(this);
  // When this happens, the `finalize`'s callback will be invoked as well
  this.unsubscribe();
}
mergeMap's notifyComplete looks like this:
notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  this.remove(innerSub);
  this.active--;
  if (buffer.length > 0) {
    this._next(buffer.shift()!); // Create an inner obs from the oldest buffered value
  } /* ... */
}
So, as you can see, when the inner observable completes, it will first notify the parent(mergeMap in this case), which will in turn create an inner observable from the oldest buffered value(1000 in this case). During the _creation process_, which involves calling the process function, the line where process-1 1000 is logged will be reached.
If you removed the delay() completely, you'd see the _expected_ result because all would happen synchronously. This means that when the inner observable created from 2000 completes, the buffer will still be empty, because 1000 hasn't arrived yet.
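The ordering described above can be replayed with a small, dependency-free simulation. This is only a sketch under stated assumptions: simulateConcatMap and its helpers are hypothetical, the timers are faked with a sorted list, and the only thing it models is "notify the parent first, unsubscribe second".

```typescript
// Simplified model of concatMap + delay + finalize. Not the RxJS source.
function simulateConcatMap(values: number[], delayed: boolean): string[] {
  const log: string[] = [];
  const timers: Array<{ at: number; run: () => void }> = [];
  const buffer: number[] = [];
  let active = 0;

  function subscribeInner(val: number, now: number): void {
    active++;
    log.push(`process-1 ${val}`); // the tap before delay runs on subscribe
    const emit = (): void => {
      log.push(`process-2 ${val}`); // the tap after delay
      innerComplete(val, now + (delayed ? val : 0));
    };
    if (delayed) {
      timers.push({ at: now + val, run: emit }); // emit later
    } else {
      emit(); // no delay: emit and complete synchronously
    }
  }

  function innerComplete(val: number, now: number): void {
    // Step 1: notify the parent, which may start the next buffered value.
    active--;
    const buffered = buffer.shift();
    if (buffered !== undefined) {
      subscribeInner(buffered, now);
    }
    // Step 2: only now is the completed inner subscriber unsubscribed.
    log.push(`finalize ${val}`);
  }

  for (const val of values) {
    if (active < 1) {
      subscribeInner(val, 0);
    } else {
      buffer.push(val); // concurrent = 1 is exceeded
    }
  }

  // Run the fake timers in chronological order.
  while (timers.length > 0) {
    timers.sort((a, b) => a.at - b.at);
    const timer = timers.shift();
    if (timer) timer.run();
  }
  return log;
}

const withDelay = simulateConcatMap([2000, 1000], true);
const withoutDelay = simulateConcatMap([2000, 1000], false);
console.log(withDelay);
// ["process-1 2000", "process-2 2000", "process-1 1000", "finalize 2000", "process-2 1000", "finalize 1000"]
console.log(withoutDelay);
// ["process-1 2000", "process-2 2000", "finalize 2000", "process-1 1000", "process-2 1000", "finalize 1000"]
```

In the delayed run, process-1 1000 is logged before finalize 2000 because 1000 is already waiting in the buffer when 2000 completes. In the synchronous run the buffer is still empty at that point, so finalize 2000 comes first.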
Thanks, Andrei, for shedding some light on the implementation.
I think your note should read:
Note: delay(msGreaterThanZero, asyncScheduler) === delay(ms).
We have a delayWhen, acting as a semaphore, to make sure that the other async tasks have all subscribed.
Independent of the scheduler used, the behavior will be the same: the inner finalize will be called on the next value or on complete/error of the outer subscription. That makes it dependent on the outer subscription, instead of being the last thing that runs when a pipe finishes.
This is not expected. Many other programming languages have a try/catch/finally or a using/destroy structure, where the final part is called before continuing to the next block (in RxJS: the next value).
@ devs
I just need a final statement: Is it a bug, or as-designed, or will-not-change.
If it is not accepted as a bug, finalize cannot be used. As a quick fix, I could move the finalize code to a catchError. As I have a defined end in the expand, I could throw an Error.
Is catchError run before the end of the pipe, or does it have the same implementation as finalize, running on the next value and on complete/error of the outer subscription?
> I think your note should read:
> Note: delay(msGreaterThanZero, asyncScheduler) === delay(ms).
I'd disagree. queueScheduler's actions are QueueAction instances. When you schedule a QueueAction with a delay, this line will be reached:
if (delay > 0) {
  return super.schedule(state, delay);
}
where super.schedule points to AsyncAction.schedule. The AsyncScheduler uses AsyncAction instances.
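A stripped-down model of that fallback might look like the sketch below. The class names, the pendingTimers list and the "run immediately" shortcut for a zero delay are all assumptions made for illustration. The real QueueAction flushes through its scheduler rather than calling the work directly.

```typescript
// Simplified model of the delay > 0 fallback. Not the RxJS source.
const pendingTimers: Array<{ delay: number; work: () => void }> = [];

class AsyncActionModel {
  protected work: () => void;

  constructor(work: () => void) {
    this.work = work;
  }

  schedule(delay: number): void {
    // Always deferred, even for a delay of 0 (stand-in for a macrotask).
    pendingTimers.push({ delay, work: this.work });
  }
}

class QueueActionModel extends AsyncActionModel {
  schedule(delay: number): void {
    if (delay > 0) {
      super.schedule(delay); // same path an async action takes
      return;
    }
    this.work(); // delay 0: run synchronously
  }
}

const order: string[] = [];
new QueueActionModel(() => { order.push("queue, delay 0"); }).schedule(0);
new QueueActionModel(() => { order.push("queue, delay 10"); }).schedule(10);
new AsyncActionModel(() => { order.push("async, delay 10"); }).schedule(10);
order.push("end of synchronous code");

// Pretend the timers fire after the synchronous code has finished.
pendingTimers.forEach(timer => timer.work());
console.log(order);
```

Only the zero-delay queue action runs before the end of the synchronous code. With a positive delay, the queue and async variants are deferred in the same way.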
IMO, the _problem_ in this case is that the inner observable notifies its parent outer observable about a complete notification before it(the inner one) unsubscribes.
If you want to run the callback _when the complete notification is sent_, you can use tap's third argument, which is a callback function. So you could replace finalize(() => console.log("finalize", val)) with tap(null, null, () => console.log("finalize", val)). This should comply with the expected behavior.
Thanks again Andrei, for your exhaustive answer.
About the scheduler, I was referencing:
https://rxjs-dev.firebaseapp.com/guide/scheduler
> Time-related operators like bufferTime, debounceTime, delay, auditTime, sampleTime, throttleTime, timeInterval, timeout, timeoutWith, windowTime all take a Scheduler as the last argument, and otherwise operate by default on the asyncScheduler.
I think, you wanted to point me to something else.
Thanks for the tip, this is a "real" finalize, how I expect it to be, and will solve my problem:
tap(null, null, () => console.log("finalize", val))
https://stackblitz.com/edit/typescript-pf9zt7?file=index.ts
@ devs
Would still be nice to have a final statement on this issue: Is it a bug, or as-designed, or will-not-change.
The repro seems to exhibit expected behaviour, IMO and, FWIW, I don't understand what is meant by this:
> Finalize should be called at the end of a pipe, and before a next value is emitted from concatMap.
I expected that finalize is called before leaving a block (RxJS: pipe) and going on to the next block (i.e. reading the next value for concatMap), which is also how other programming languages implement a finalize.
Without a delay / delayWhen, it behaves as expected. With them, finalize is called on the next value and on complete/error of the outer subscription.
An application can work for a long time, and then just adding a delay / delayWhen (or others) changes the behavior. If a function returning an Observable is used and later changed, it will not be obvious that this has an impact on a pipe with a finalize.
When you tell me it is as designed, I will never use finalize anymore and use Andrei's tip with tap.
I was just lucky that I kept a console.log in finalize. For a long time I had seen that the last two finalize calls came at the same time (at the end of the outer subscription). Because some branches (if/else) within the expand had no console.log, it took a while before I doubted whether it was correct to have two finalize calls at the same time (and the others being called at the "wrong" time).
I would urge you to add this changing behavior, when adding a delay, to the documentation.
EDIT: The last SB in my previous post was not saved, I updated it, showing the side-effect, explanation inside SB
https://stackblitz.com/edit/typescript-pf9zt7?file=index.ts
> I would urge you to add this changing behavior, when adding a delay, to the documentation.
I think the _unexpected_ behavior will occur whenever you're dealing with observables that don't emit in the same tick they are created. For example: delay will emit the value in the next tick, a click$ observable will emit sometime in the future, but not in this tick.
What I'm trying to emphasize is that it's not finalize()'s fault, it's just how mergeMap handles inner observables. If one inner observable completes, it will first notify the parent (mergeMap), which may cause a new inner observable to be created (if the buffer is not empty), and only then will the inner subscriber unsubscribe.
The buffer will be empty if everything is synchronous.
@FritzHerbers I'll have another look at this tomorrow, when I am less tired. I think this is the same issue that was raised in https://github.com/ReactiveX/rxjs/issues/4138 and https://github.com/ReactiveX/rxjs/issues/4931 and discussed in https://github.com/ReactiveX/rxjs/issues/4222
> ... Andrei's tip with tap
That won't run the callback on explicit unsubscription - which might or might not be what you want - so it's not the same thing. However, it is possible to write a finalize-like operator that will ensure the callbacks are run in a less surprising order - i.e. that source subscriptions are finalized before their sinks.
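The difference can be sketched with a small model in which one hook stands in for tap's complete callback and another for finalize's callback. ModelSubscriber, Hooks and recordCalls are hypothetical names. The sketch only shows which hook fires on which path and says nothing about ordering across operators.

```typescript
// Simplified model, not RxJS: it only tracks which callback fires when.
interface Hooks {
  onComplete: () => void; // stands in for tap(null, null, cb)
  onTeardown: () => void; // stands in for finalize(cb)
}

class ModelSubscriber {
  private closed = false;
  private hooks: Hooks;

  constructor(hooks: Hooks) {
    this.hooks = hooks;
  }

  // The source finished on its own.
  complete(): void {
    if (this.closed) return;
    this.hooks.onComplete();
    this.unsubscribe(); // completion is followed by teardown
  }

  // The consumer unsubscribed explicitly, or teardown after complete.
  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    this.hooks.onTeardown();
  }
}

function recordCalls(action: "complete" | "unsubscribe"): string[] {
  const calls: string[] = [];
  const subscriber = new ModelSubscriber({
    onComplete: () => { calls.push("tap complete callback"); },
    onTeardown: () => { calls.push("finalize callback"); },
  });
  if (action === "complete") {
    subscriber.complete();
  } else {
    subscriber.unsubscribe();
  }
  return calls;
}

const onCompletion = recordCalls("complete");
const onExplicitUnsubscribe = recordCalls("unsubscribe");
console.log(onCompletion, onExplicitUnsubscribe);
```

On completion both callbacks run, but on an explicit unsubscribe only the finalize-style callback does, so cleanup that must always happen cannot rely on the complete callback alone.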
@cartant
Did you have time to look into this issue?
What would the code of such a finalize-like operator look like?
@FritzHerbers It's still on my list of things to do. I know exactly what's going on and I'll write the operator tomorrow and explain how it works, etc.
There is a dispose operator here.
The difference between it and finalize is this bit:
unsubscribe() {
  super.unsubscribe();
  const { callback } = this;
  if (callback) {
    callback();
    this.callback = undefined!;
  }
}
This differs from finalize because the finalize operator adds the callback to the Subscription:
It's added in the Subscription constructor, so the callback is the first teardown in the subscription. The subscription to the source is going to be added after that:
And that means that - unlike in dispose - the finalize callback will be called before the unsubscription from the child subscription occurs.
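The two orderings can be compared side by side in a small model. It assumes that teardowns run in the order they were added, which is what the explanation above relies on. OrderedSubscription, finalizeStyle and DisposeStyleSubscription are hypothetical names, not the classes from RxJS or rxjs-etc.

```typescript
// Simplified model of teardown ordering. Not the RxJS or rxjs-etc source.
class OrderedSubscription {
  protected teardowns: Array<() => void> = [];

  add(teardown: () => void): void {
    this.teardowns.push(teardown);
  }

  unsubscribe(): void {
    const pending = this.teardowns;
    this.teardowns = [];
    for (const teardown of pending) teardown(); // runs in insertion order
  }
}

// finalize-style: the callback is registered at construction time,
// before the subscription to the source is added.
function finalizeStyle(calls: string[]): OrderedSubscription {
  const subscription = new OrderedSubscription();
  subscription.add(() => { calls.push("callback"); });
  subscription.add(() => { calls.push("unsubscribe from source"); });
  return subscription;
}

// dispose-style: unsubscribe() tears everything down first, then calls back.
class DisposeStyleSubscription extends OrderedSubscription {
  private callback: (() => void) | undefined;

  constructor(callback: () => void) {
    super();
    this.callback = callback;
  }

  unsubscribe(): void {
    super.unsubscribe();
    const callback = this.callback;
    if (callback) {
      this.callback = undefined; // make sure it runs only once
      callback();
    }
  }
}

const finalizeCalls: string[] = [];
finalizeStyle(finalizeCalls).unsubscribe();

const disposeCalls: string[] = [];
const disposeSubscription = new DisposeStyleSubscription(() => { disposeCalls.push("callback"); });
disposeSubscription.add(() => { disposeCalls.push("unsubscribe from source"); });
disposeSubscription.unsubscribe();

console.log(finalizeCalls, disposeCalls);
```

In the finalize-style run the callback fires before the source is unsubscribed. In the dispose-style run the source goes first and the callback comes last.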
IMO - well, in my changed opinion - this is a bug, but fixing it would be a breaking change. It's also possible that others might not consider this to be a bug.
[Edited] Based on the feedback from @cartant my issue is not the same as author's, but I guess someone might end up here.
I have a similar problem too; have a look at the following code from StackBlitz:
import { interval } from "rxjs";
import { finalize, mapTo, shareReplay, take, tap } from "rxjs/operators";

const getData = () =>
  interval(200).pipe(
    take(1),
    mapTo("Network Data")
  );

let loading = true;

const source = getData().pipe(
  finalize(() => (loading = false)),
  shareReplay()
);

source.pipe(
  // when data arrives, loading should be false
  tap(data => console.log('loading', loading)) // console output: loading true
).subscribe();
In my example, the loading starts, data is fetched from the network and with the help of finalize loading stops. I use shareReplay to cache the fetched data. When subscription to source happens, and data arrives, I expect loading to be stopped already. However, as seen in example, finalize hasn't been executed yet and loading is still true at this point in the tap.
I know that I can use tap to stop loading, but finalize does more than tap.
I agree with FritzHerbers, when source Observable completes or errors finalize should be executed before any subsequent code starts to execute from the chain.
@gogakoreli yours is not the same problem. Your tap is going to be invoked on a next notification and that is guaranteed to happen before the finalize.
@cartant thanks for the feedback. I made this suggestion because if execution is synchronous finalize happens first and then tap. It behaves differently based on the source Observable. Can you suggest something to overcome this issue and be able to utilize functionality similar to finalize?
@gogakoreli I see what you mean. The behaviour is surprising when the source is synchronous, but it's unrelated to finalize. It's because the destination subscribes to the subject after the subject subscribes to the source.
@cartant I was experimenting with it more and looks like the issue still happens without shareReplay. (shareReplay really doesn't seem to matter for the issue, I wanted to just showcase closely my real example)
@cartant This doesn't look like an issue with finalize, rather it seems to be a side effect of the implementation of mergeMap with a concurrency limit and how it interacts with synchronous observables like of.
In the majority of cases, observables _should not_ be synchronous, however, so this would be a non-issue.
Here's what's playing out:
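1. finalize _always_ happens during the _teardown_ phase, which happens after error or complete are called, OR during unsubscription.
2. concatMap is really mergeMap with a concurrency limit of 1.
3. mergeMap with a concurrency limit:
   a. When an inner observable completes, during that complete step it will synchronously check to see if there is another value buffered.
   b. If there is a buffered value, it will _synchronously_ "project" it with the mapping function, and subscribe to the returned observable.
   c. IF that returned observable is synchronous, it will emit before the inner observable gets to the teardown (because we're still technically synchronously in the first inner observable's complete event).

In the case of the first example in the original post, there's a tap before the delay in the inner observable, and our developer is confused by the outcome. However, this is the expected outcome, and we can't really get it to be any different unless we force microtask scheduling everywhere (ala Promises). In terms of some sort of standardization track maybe that's the right choice... however, in terms of performance, it would hurt us pretty bad.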
I'm going to close this issue as "expected behavior". And chalk it up to another case of "synchronous observables produce surprising outcomes because they're synchronous".
I hope my explanation helps, @FritzHerbers.
@benlesh
Thanks for your reply and summarizing the implementation.
From a programmer's perspective, the "expected behavior" is the one I know from various other programming languages. Without meaning to be disrespectful, for me it is "as implemented" or "as designed".
A second "misbehavior" in this issue is that the behavior changes when a delay (or a similar operator, possibly added very deep in a stream) is added, and that this might go unnoticed.
@cartant made a "dispose" operator, hooking things up differently. Would this be the way to go for finalize, or should "dispose" be added to rxjs?
Didn't try the operator (I noticed some days later it was your own repository), but will, as I was hoping others would comment on your implementation and this issue would resolve.
At the moment we are using the "extended" tap proposed by @Andrei0872, which works just fine and "as expected". We are not interested that the callback runs explicitly on unsubscription, it just needs to be called at the end of the "block".
I see @cartant ... so it's called during teardown, but we should be calling it last, I agree. The fix for this should be as simple as moving the addition of the callback to the operator call instead of in the constructor.
It _is_ a breaking change, however I doubt it will break many people because it'll only really be noticeable in synchronous cases, or cases with multiple finalize actions touching shared state in ways that don't behave well if they're out of order. Seems unlikely.
@FritzHerbers
> Didn't try the operator (I noticed some days later it was your own repository), but will, as I was hoping others would comment on your implementation and this issue would resolve.
It's in the rxjs-etc repo 'cause it's a breaking change and any changes to finalize won't be released until the next major version. It's MIT licensed, so if it solves a problem for you, you can copy dispose and the licence to your project, etc. without having to wait for the next major version.
As for the change to finalize, see #5433.