Rxjava: Problem when combining BehaviorSubject and share

Created on 6 Oct 2016  路  5Comments  路  Source: ReactiveX/RxJava

I have a problem when using the BehaviorSubject. I want to share the subject between several observers. But when sharing it, it does not seem like the last item of the BehaviorSubject is emitted to the second subscriber.

Note that if I push another integer through with onNext AFTER I have subscribed the second subscriber, it arrives to both.

Have I misunderstood how it is supposed to work? If so, what are my alternatives?
Below is the test I wrote for this.

I'm using RxJava 1.2.1

Thank you!

@Test
public void behaviorSubjectDoesNotOutputLastItemWhenSharedAndNewSubscriber() {
        TestSubscriber<Integer> testSubOne = TestSubscriber.create();
        TestSubscriber<Integer> testSubTwo = TestSubscriber.create();

        BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

        Observable<Integer> behaviorObservable = behaviorSubject
                .share();

        behaviorObservable.subscribe(testSubOne);

        behaviorSubject.onNext(2);

        behaviorObservable.subscribe(testSubTwo);

        assertThat(testSubOne.isUnsubscribed()).isFalse();
        assertThat(testSubTwo.isUnsubscribed()).isFalse();

        testSubOne.unsubscribe();
        testSubTwo.unsubscribe();

        final List<Integer> testSubOneEvents = testSubOne.getOnNextEvents();
        assertThat(testSubOneEvents).hasSize(2);
        assertThat(testSubOneEvents.get(0)).isEqualTo(1);
        assertThat(testSubOneEvents.get(1)).isEqualTo(2);

        final List<Integer> testSubTwoEvents = testSubTwo.getOnNextEvents();
        assertThat(testSubTwoEvents).hasSize(1);
        assertThat(testSubTwoEvents.get(0)).isEqualTo(2);
}
Question

Most helpful comment

@HGyllensvard
1) instead of share() you can use .replay(1).refCount()
2) using the subject has no added value here better use smth like RxBroadcastReceiver or fromEmitter or some other lib out there like ReactiveNetwork

All 5 comments

when you apply share() you loose the behavior of BehaviorSubject check RefCount.

you could have get what you want only if you testSubOne.unsubscribe(); before subscribing the second one.

However the main thing is why would you use share() on a Subject.
They are both normally used for multicasting and don't need combining.

Thank you for the quick reply and the link, it's easy to miss some details of the implementation :)

I'm developing for Android, which got Broadcast receivers (you have to extend the BR class), these work as a kind of callback with information about the system, when you register you specify what data you are interested in.

As an example, I'm interested in if the device is connected to a wifi network or not. I use a BehaviorSubject to push out the notifications about the change, to start the stream, and that anyone that subscribes will automatically get the latest known state.

So why do I want to share it?
1) To follow the platform I want to register the BR when first subscribed, and unregister when the last observer unsubscribes to make sure the BR isn't listening to data without anyone being interested in it.
2) I have several different places in the code interested in the same data. I could register several receivers for the same data, but I would like to avoid that.

So I want to do something like this:


behaviorSubject
                  // Making sure these only occur upon first subscription and last unsubscribe
                .doOnSubscribe(registerTheBroadCastReceiver())
                .doOnUnsubscribe(unregisterTheBroadcastReceiver())
                .share()
                // After this I want all subscribers to get the last known value
                .asObservable()
                .subscribeOn(Schedulers.io())

I hope there's a better way to achieve this and that it's just my lack/misunderstanding of Rx skills :)
To progress, I am currently not sharing, and using an AtomicInteger to count the number of subscriptions.

Once again, thank you for the help!

@HGyllensvard
1) instead of share() you can use .replay(1).refCount()
2) using the subject has no added value here better use smth like RxBroadcastReceiver or fromEmitter or some other lib out there like ReactiveNetwork

Hi Again,

I tested at work and the replay and refCount works like a charm, thank you for that!
Hopefully this can also help others stuck in the same situation.

Glad it works :) however I would like to stress again that the using Subject here is not the best choice. If you anyway want to use Subject use PublishSubject

Was this page helpful?
0 / 5 - 0 ratings