RxJS version:
5.5.2
Code to reproduce:
This is more of a question so I don't have a reproducible example.
Expected behavior:
Same as above.
Actual behavior:
Same as above.
Additional information:
At first, I encountered what I thought was a bug with the shareReplay operator, but after finding an older issue that brought up the same concerns, I discovered that it was a misunderstanding on my part about how shareReplay was supposed to work.
However, in the process, I dug through some of the source code and now I have a question about the teardown logic in the shareReplay operator:
Since the shareReplay operator does not create its own Subscriber, it looks like that teardown logic isn't getting attached to anything. Will the teardown logic ever get called?
Thanks for all the hard work!
The teardown logic is called every time an observer unsubscribes. Just this small part gets called only if the source Observable completed https://github.com/ReactiveX/rxjs/blob/master/src/operators/shareReplay.ts#L47.
Thanks for the quick response.
Can you elaborate — for shareReplay specifically, how does any Subscriber access the teardown function?
My understanding is that the teardown function needs to get attached to a Subscriber in order for it to be called during an unsubscribe. That attachment happens during Observable.subscribe at L200 to L204:
/* L200 */ if (operator) {
/* L201 */ operator.call(sink, this.source);
/* L202 */ } else {
/* L203 */ sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
/* L204 */ }
When we use shareReplay, L201 executes, so we'll be executing the shareReplayOperation function. That function returns the teardown function, but the teardown function does not subsequently get used.
Many other operators instantiate a Subscriber object, which takes care of attaching the teardown function in its constructor, but shareReplay is not instantiating a Subscriber, so what is the mechanism by which its teardown function gets attached?
Hmm, I think you might be right. This line https://github.com/ReactiveX/rxjs/blob/master/src/operators/shareReplay.ts#L43 should be probably this:
this.add(() =>
refCount--;
...
};
instead of this:
return () => {
refCount--;
...
};
Actually, as the this context is already a Subscriber, the innerSub subscription seems unnecessary -
the subscription to the ReplaySubject will be added to the Subscriber when this is passed to subscribe. Unsubscription from the subject will occur when the Subscriber is unsubscribed. So the implementation could be:
subject.subscribe(this);
this.add(() => {
refCount--;
if (subscription && refCount === 0 && isComplete) {
subscription.unsubscribe();
}
});
I was tempted to write a PR for this, but with everything of interest in a closure, I can't see how to write an initial, failing test.
It's also awkward as the unsubscription from the source can only occur if the source has completed - at which time the unsubscription is redundant. As far as I can see, the refCount is unnesessary. In fact, the entire teardown looks to be redundant, to me.
I think this demo shows not exactly the problem described here but it's not would I expect to happen:
Observable.interval(250)
.finally(i => console.log('finally 1'))
.shareReplay(1)
.take(5)
.finally(i => console.log('finally 2'))
.subscribe(console.log);
This prints the following output:
1
2
3
4
finally 2
I'd expect that finally 1 would be called too but it never does (even when I used this.add with the dispose callback like I mentioned above). This is because isComplete is false when it's called.
I understand this was the point of of PR #2910 but it requires me to always complete the chain before using shereReplay (.take(5).shareReplay(1) instead of .shareReplay(1).take(5)) which is hard to achieve in practise.
Is there any resolution on this issue? I've bumped into exactly the same thing yesterday and was wondering why it doesn't work...
@martinsik My two cents: I think it's all down to refCount not pleasing all of the people all of the time -
and, now, the reference count is essentially ignored for shareReplay. If you've not seen it, Paul Taylor made an interesting comment on refCount here.
However, I, too, would be interesting in knowing whether the problem you've highlighted was considered when PR #2910 was made. The change in behaviour was a surprise to me.
The refCount is still relevant for subscribing the inner observable, no? So I don't think it's entirely ignored. I actually came to find that what I was looking for in #3238 is exactly the new behavior of shareReplay*: connect on subscribe, cache values even if the refCount goes to 0, but make the observable retryable in case of errors.
In my case this is exactly how caching is supposed to work because I want to cache values even if there are no subscribers for a little while.
*) Going off of the descriptions, I haven't yet tested it.
FYI, in my case I complete the chain by using takeUntil with an observable that emits when the cache should be torn down.
@Airblader As far as I can see, refCount is only used to control the unsubscription from the source. However, the unsubscription is only allowed if the source has completed. This seems redundant, as if the source has completed (or errored) the subscription will have been automatically unsubscribed, as that's part of the observable contract.
As far as I can see, refCount is only used to control the unsubscription from the source.
I hadn't actually checked the source yet and just assumed the refCount was also used to ensure connecting on the first subscription (since refCount() as an operator does exactly that).
This seems redundant, as if the source has completed (or errored) the subscription will have been automatically unsubscribed, as that's part of the observable contract.
Point taken and agreed. :-)
Closing this in favour of https://github.com/ReactiveX/rxjs/issues/3336.