Rxjs: No multicasting when using share

Created on 1 Sep 2016 · 7 Comments · Source: ReactiveX/rxjs

Version 5.0.0-beta.11

The following code works as expected

(() => {
const x = Rx.Observable.of(1).do(x => console.log('side effect'));
const y = x.publish().refCount(1);
y.subscribe(console.log)
y.subscribe(console.log)
})()
// > side effect
// > 1

I read in the docs that share is an alias for publish().refCount(). However, using share I am seeing different behaviour (no multicasting):

(() => {
const x = Rx.Observable.of(1).do(x => console.log('side effect'));
const y = x.share();
y.subscribe(console.log)
y.subscribe(console.log)
})()
// > side effect
// > 1
// > side effect
// > 1

All 7 comments

share is an alias for multicast with a Subject factory (so it can support repeating and retrying), followed by refCount.

The shared instance of Observable.of(1).do(...) synchronously emits and completes upon the first subscription. When the second subscription to the shared Observable happens, multicast recreates the internal Subject instance and resubscribes to the source Observable.

If your source doesn't complete synchronously, you'll see share working as expected:

import { Observable } from 'rxjs';

const x = Observable.timer(100).do(x => console.log('side effect'));
const y = x.share();

y.subscribe(console.log.bind(console, 'A:'))
y.subscribe(console.log.bind(console, 'B:'))
// > side effect
// > A: 0
// > B: 0

Thanks, that makes sense. Could the documentation on share be updated to
replace where it says share is an alias for publish + refCount?

@oliverjash yes, it probably should be. It was my understanding that share _was_ an alias for publish + refCount for backwards compatibility, but @blesh would know more.

To be honest, the publish variants (that don't use selectors) are not as useful in a functional system (the shared Subject can't be invalidated, so subscribing after error or completion doesn't re-subscribe to the source), and are only included for backwards compatibility. I don't use publish for that reason.

The docs are still misleading on this; I just got tripped up by the very same thing. Can we fix the docs?

Upon further investigation, publish:

multicast(new Subject())

and share:

multicast(() => new Subject()).refCount();

The code difference between publish().refCount() and .share() is very subtle:

  • with publish, we pass a subject to multicast
  • with share, we pass a subject factory function to multicast

Why do these argument types lead to different behaviour, and is this documented?

For example:

(() => {
const x = Rx.Observable.of(1).do(x => console.log('side effect'));
// identical to `publish().refCount()`
const y = x.multicast(new Rx.Subject()).refCount()
y.subscribe(console.log)
y.subscribe(console.log)
})()
// => side effect
// => 1
(() => {
const x = Rx.Observable.of(1).do(x => console.log('side effect'));
// identical to `share()`
const y = x.multicast(() => new Rx.Subject()).refCount()
y.subscribe(console.log)
y.subscribe(console.log)
})()
// => side effect
// => 1
// => side effect
// => 1

@OliverJAsh The reason you're seeing this behavior is because your source synchronously completes.

About the first example: multicast connects the Subject to the source on your first subscription to refCount(). The source emits one value then completes (synchronously). Once the Subject has been completed by the source, it's permanently in the stopped state. It will ignore values emitted to it, and automatically complete any new Subscribers. This is why the second subscription in your first example doesn't hear a value, and doesn't cause a new subscription to the source. Since you used a Subject instance, multicast is forced to use the same Subject as before, which is stopped.
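
As a rough illustration of the stopped state described above, here is a toy Subject written from scratch. It is not the RxJS implementation: ToySubject, its fields, and the observer helper are made-up names, and it only models next and complete (no errors, no unsubscription).

```javascript
// Toy model of a Subject's "stopped" state. NOT the RxJS source code;
// ToySubject and observer() are hypothetical names used for illustration.
class ToySubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
  }

  subscribe(observer) {
    if (this.isStopped) {
      // A stopped subject completes late subscribers immediately.
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }

  next(value) {
    if (this.isStopped) return; // values sent after completion are ignored
    this.observers.forEach(o => o.next(value));
  }

  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    const current = this.observers;
    this.observers = [];
    current.forEach(o => o.complete());
  }
}

// Record events in an array so the order is easy to inspect.
const log = [];
const observer = name => ({
  next: v => log.push(`${name} next ${v}`),
  complete: () => log.push(`${name} complete`),
});

const subject = new ToySubject();
subject.subscribe(observer('A'));
subject.next(1);
subject.complete();               // stands in for a source that completes synchronously
subject.subscribe(observer('B')); // late subscriber: completed right away
subject.next(2);                  // ignored, the subject is stopped

console.log(log);
// [ 'A next 1', 'A complete', 'B complete' ]
```

Subscriber B never sees a value, which matches the second subscription in the publish().refCount() example.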

In your second example multicast has a Subject factory method. When you first subscribe to refCount(), multicast calls the factory method to get the Subject instance to connect with. When the source synchronously completes, the refCount drops to 0, and the Subject is dereferenced. The second subscription ups the refCount from 0 to 1 again, and multicast uses the Subject factory to get a brand new Subject to use for this connection.
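
The difference between the two argument types can be sketched with a small hand-rolled model. This is only an approximation under stated assumptions: makeSubject, syncSource, toyMulticastRefCount and run are hypothetical names, the source is a plain function rather than an Observable, and only the completion path is modelled (no unsubscription, no errors).

```javascript
// Toy model only, NOT RxJS internals. All names here are hypothetical.
const log = [];

function makeSubject() {
  let observers = [];
  let stopped = false;
  return {
    // Returns true if the observer was registered, false if the subject
    // had already stopped (the observer is then completed immediately).
    subscribe(o) {
      if (stopped) { o.complete(); return false; }
      observers.push(o);
      return true;
    },
    next(v) { if (!stopped) observers.forEach(o => o.next(v)); },
    complete() {
      if (stopped) return;
      stopped = true;
      const current = observers;
      observers = [];
      current.forEach(o => o.complete());
    },
  };
}

// A cold, synchronous source comparable to of(1).do(...).
function syncSource(observer) {
  log.push('side effect');
  observer.next(1);
  observer.complete();
}

// Accepts either a subject instance or a subject factory.
function toyMulticastRefCount(source, subjectOrFactory) {
  const isFactory = typeof subjectOrFactory === 'function';
  let subject = isFactory ? null : subjectOrFactory;
  let connected = false;
  return {
    subscribe(observer) {
      if (!subject) subject = subjectOrFactory(); // only reachable with a factory
      const current = subject;
      const active = current.subscribe(observer);
      // A stopped subject already completed the observer: nothing to connect.
      if (!active || connected) return;
      connected = true;
      source({
        next: v => current.next(v),
        complete: () => {
          current.complete();
          connected = false;
          if (isFactory) subject = null; // next subscriber gets a fresh subject
        },
      });
    },
  };
}

function run(subjectOrFactory) {
  log.length = 0;
  const shared = toyMulticastRefCount(syncSource, subjectOrFactory);
  const observer = { next: v => log.push(v), complete: () => {} };
  shared.subscribe(observer);
  shared.subscribe(observer);
  return log.slice();
}

const withInstance = run(makeSubject());      // like publish().refCount()
const withFactory = run(() => makeSubject()); // like share()
console.log(withInstance); // [ 'side effect', 1 ]
console.log(withFactory);  // [ 'side effect', 1, 'side effect', 1 ]
```

With an instance, the stopped subject is reused and the source is never touched again; with a factory, each new connection starts from a fresh subject, so a synchronously completing source runs once per subscriber.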

This behavior is more consistent with functional idioms of Observables, as we've eliminated the hidden state that forever blocks re-subscription, and is essential (for example) in retrying and repeating shared Observables.

If you change your example source to async (or manually connect yourself, instead of relying on refCount()), you'll see the subscriptions do indeed share the same underlying subscription to the source:

const x = Rx.Observable.of(1).do(x => console.log('side effect'));
const y = x.multicast(() => new Rx.Subject());
y.subscribe(console.log)
y.subscribe(console.log);
y.connect();
// > side effect
// > 1
// > 1

/* ~or~ */

const x = Rx.Observable.timer(100).do(x => console.log('side effect'));
const y = x.multicast(() => new Rx.Subject()).refCount();
y.subscribe(console.log)
y.subscribe(console.log);
// > side effect
// > 0
// > 0

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
