Current Behavior
When you reuse a pipe that contains publish or publishReplay, all uses of the pipe share the same underlying subject.
As a result, connecting one of the resulting observables also feeds the subscribers of the others, and they receive data from the wrong source.
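import { ConnectableObservable, pipe } from 'rxjs';
import { map, publish } from 'rxjs/operators';

// src1 and src2 are defined elsewhere (see the reproduction link below).
const pipeLine = pipe(
  map((x: number) => x * x),
  publish(),
);
const obs1 = src1.pipe(pipeLine);
const obs2 = src2.pipe(pipeLine);
obs1.subscribe(d => console.log('sub 1', d));
obs2.subscribe(d => console.log('I should not be running', d));
// connect() lives on ConnectableObservable, hence the cast.
(obs1 as ConnectableObservable<number>).connect();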
This is what you get on the console:
sub 1 1
I should not be running 1
sub 1 4
I should not be running 4
sub 1 9
I should not be running 9
sub 1 16
I should not be running 16
sub 1 25
I should not be running 25
obs2, which was never connected, behaves as if it had been connected, and it is receiving data from the wrong source.
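To make the cause easier to see, here is a rough model of the problem without RxJS. Everything in it (TinySubject, TinyConnectable, tinyPublish) is a made-up name for illustration, and it only assumes that the operator creates its subject once, when the operator itself is created, rather than once per source.

```typescript
// Hypothetical, dependency-free model. None of these names are RxJS APIs.
type Listener = (value: number) => void;

class TinySubject {
  private listeners: Listener[] = [];
  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }
  next(value: number): void {
    this.listeners.forEach(l => l(value));
  }
}

interface TinyConnectable {
  subscribe(listener: Listener): void;
  connect(): void;
}

// The subject is created when the operator is created,
// not when the operator is applied to a source.
function tinyPublish(): (source: number[]) => TinyConnectable {
  const subject = new TinySubject(); // one subject per tinyPublish() call
  return (source) => ({
    subscribe: (listener) => subject.subscribe(listener),
    connect: () => source.forEach(v => subject.next(v)),
  });
}

const shared = tinyPublish();
const obs1 = shared([1, 2, 3]);
const obs2 = shared([10, 20, 30]);

const log: string[] = [];
obs1.subscribe(v => log.push(`sub 1 ${v}`));
obs2.subscribe(v => log.push(`sub 2 ${v}`));
obs1.connect(); // only obs1 is connected

console.log(log);
```

Because both results subscribe to the one shared subject, the second listener sees every value pushed by the first source even though obs2.connect() is never called.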
Reproduction
https://stackblitz.com/edit/rxjs-nrcyfu
Expected behavior
Reusing pipes should not affect which source connects to which subscription.
Only the 1st subscription should run as the 2nd one hasn't been connected yet.
Also the 2nd subscription should not be subscribed to the first observable.
Workaround
Use a factory method instead.
const pipeLine = () => pipe(
  map((x: number) => x * x),
  publish(),
);
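For completeness, this is roughly how the factory is used. It is a sketch only: the two of(...) sources and the seen1/seen2 arrays are placeholders I made up, and it assumes the RxJS 6 style imports used elsewhere in this issue.

```typescript
import { ConnectableObservable, of, pipe } from 'rxjs';
import { map, publish } from 'rxjs/operators';

// Each call to pipeLine() calls publish() again, so each use gets a fresh subject.
const pipeLine = () => pipe(
  map((x: number) => x * x),
  publish<number>(),
);

// Placeholder sources, for illustration only.
const src1 = of(1, 2, 3);
const src2 = of(10, 20, 30);

const obs1 = src1.pipe(pipeLine()) as ConnectableObservable<number>;
const obs2 = src2.pipe(pipeLine()) as ConnectableObservable<number>;

const seen1: number[] = [];
const seen2: number[] = [];
obs1.subscribe(d => seen1.push(d));
obs2.subscribe(d => seen2.push(d));

obs1.connect(); // obs2 is deliberately left unconnected

console.log(seen1, seen2);
```

The important part is the parentheses in pipeLine(): passing the factory's result, not a shared pipe value, keeps the two connections separate.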
Possible Solution
for publishReplay
https://github.com/ReactiveX/rxjs/blob/259e5cd2e8149bc5db14cc0216afec7131e8aa11/src/internal/operators/publishReplay.ts
Instead of creating the replay subject on line 22, it can be created in the callback on line 24. That way each source observable would have its own replay subject.
for publish
On line 61 we can return a factory that takes the source instead of calling multicast directly.
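A rough sketch of the idea as a user-land operator. This is not the actual patch: publishReplayPerSource is a hypothetical name, the of(...) sources are placeholders, and the sketch only assumes that multicast accepts a subject instance.

```typescript
import { ConnectableObservable, Observable, ReplaySubject, of, pipe } from 'rxjs';
import { map, multicast } from 'rxjs/operators';

// Hypothetical helper, not part of RxJS: the ReplaySubject is created when
// the operator is applied to a source, not when the operator is created.
function publishReplayPerSource<T>(
  bufferSize?: number,
): (source: Observable<T>) => ConnectableObservable<T> {
  return (source: Observable<T>) => {
    const subject = new ReplaySubject<T>(bufferSize); // one subject per source
    return multicast(subject)(source) as ConnectableObservable<T>;
  };
}

// The same pipe value is reused for two sources.
const pipeLine = pipe(
  map((x: number) => x * x),
  publishReplayPerSource<number>(),
);

const obs1 = of(1, 2, 3).pipe(pipeLine) as ConnectableObservable<number>;
const obs2 = of(10, 20, 30).pipe(pipeLine) as ConnectableObservable<number>;

const seen1: number[] = [];
const seen2: number[] = [];
obs1.subscribe(d => seen1.push(d));
obs2.subscribe(d => seen2.push(d));

obs1.connect(); // obs2 is never connected

console.log(seen1, seen2);
```

Once a source is bound, it keeps the same subject for good, so this only changes when the subject is created, not how the connectable behaves afterwards.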
Yeah, this looks like a bug to me. Thanks.
I have just stumbled on this issue as well in a project I am working on.
I am very confused by this since it appears to be pretty major to me. Some very popular blog posts explaining how those operators work also describe publishReplay(x) as an equivalent to multicast(() => new ReplaySubject(x)). The proposed solution for publishReplay would make this true.
Since you mentioned that the current behavior indeed looks like a bug, I feel like the article should be true... Except that I cannot find any trace of this operator ever working this way before. Unfortunately, the history doesn't go past 2018 so I am not convinced about this.
Would you mind sharing your view on this, @cartant? Is this really the correct behavior? Is using multicast(() => new ReplaySubject()) an appropriate workaround until a fix comes to light?
@kawazoe
Would you mind sharing your view on this cartant ? Is this really the correct behavior?
There is no fix needed for publishReplay's behaviour. It does not restart by design. That is the essential difference between the publish and share family of operators. For reference, see the discussion in #453. It's long, so start at this comment and work backwards.
@cartant I've just got through the entire thread and wow... what a can of worms this is... I definitely did not expect this issue to be this complex when I asked that initial question. It also explains why I felt like their behavior changed at some point.
Thanks a LOT for that pointer. I don't know why I couldn't find that issue initially.
I agree with the final decision, but I think the subtleties of those operators could be better documented. There is a reason why there are many people trying to explain them in blogs. They are not trivial to understand and have multiple edge cases like hot/cold sources, completed or ongoing sources, completed or ongoing subscribers, etc...
... I think the subtleties of those operators could be better documented.
Yeah. It is confusing. The publish variants should also be static and refCount should not be an operator - see https://github.com/ReactiveX/rxjs/pull/5585#issuecomment-657920508
I just want to clarify. This bug has nothing to do with restarting.
The issue arises when you create a pipe without a source observable and later use pipe to attach that source-less pipe to an observable, as presented here: https://stackblitz.com/edit/rxjs-nrcyfu
const pipeLine = () => pipe(
  map((x: number) => x * x),
  publishReplay(),
);
The pipeLine should've been able to be reused for multiple source observables, but it is not.
The proposed solution #5411 does not change its restart-ability. Once it's bound to an observable, it won't be able to restart. What it accomplishes is moving the creation of the subject into the factory, delaying its creation to when it's bound to the observable as opposed to when the pipe function is called. This solves the problem of sharing the pipe but does not change the behavior once it's bound to an observable.