Rxjava: published GroupedObservable emitted from groupBy() only emits a single item

Created on 19 Jun 2015  路  6Comments  路  Source: ReactiveX/RxJava

PublishSubject<Triplet<Integer, Observable<String>, Observable<Integer>>> subject =
    PublishSubject.create();
Observable<Triplet<Integer, Observable<String>, Observable<Integer>>> grouped =
    subject.groupBy(Triplet::getValue0).flatMap(
        tripletsById -> tripletsById.publish(pub -> Observable.just(Triplet.create(
            tripletsById.getKey(),
            pub.flatMap(Triplet::getValue1),
            pub.flatMap(Triplet::getValue2)))));
grouped.subscribe(triplet -> System.out.println("New triplet: " + triplet.getValue0()));
subject.onNext(Triplet.create(1, Observable.just("A"), Observable.just(1)));
subject.onNext(Triplet.create(1, Observable.just("B"), Observable.just(3)));
subject.onNext(Triplet.create(2, Observable.just("A"), Observable.just(2)));

This code outputs:

New triplet: 1
New triplet: 1
New triplet: 2

If I change the definition of grouped so that tripletsById is not published, I get the expected behavior:

Observable<Triplet<Integer, Observable<String>, Observable<Integer>>> grouped =
    subject.groupBy(Triplet::getValue0).map(tripletsById -> Triplet.create(
        tripletsById.getKey(),
        tripletsById.flatMap(Triplet::getValue1),
        tripletsById.flatMap(Triplet::getValue2)))));

Output:

New triplet: 1
New triplet: 2
Question

Most helpful comment

Here is full code reproducing the issue:

import rx.Observable;
import rx.subjects.PublishSubject;

public class GroupByPublish {

    public static void main(String... args) {
        PublishSubject<Triplet<Integer, Observable<String>, Observable<Integer>>> subject =
                PublishSubject.create();

        Observable<Triplet<Integer, Observable<String>, Observable<Integer>>> grouped =
                subject.groupBy(Triplet::getValue0)
                        .flatMap(tripletsById -> {
                            System.out.println("Received new group: " + tripletsById.getKey());
                            return tripletsById
                                    .doOnCompleted(() -> System.out.println("*** completed publish"))
                                    .doOnUnsubscribe(() -> System.out.println("*** unsubscribing publish"))
                                    .publish(pub ->
                                    Observable.just(Triplet.<Integer, Observable<String>, Observable<Integer>> create(
                                            tripletsById.getKey(),
                                            pub.flatMap(Triplet::getValue1),
                                            pub.flatMap(Triplet::getValue2))
                                            ));
                        });

        grouped.subscribe(triplet -> System.out.println("New triplet: " + triplet.getValue0()));
        subject.onNext(Triplet.create(1, Observable.just("A"), Observable.just(1)));
        subject.onNext(Triplet.create(1, Observable.just("B"), Observable.just(3)));
        subject.onNext(Triplet.create(2, Observable.just("A"), Observable.just(2)));
    }

    public static class Triplet<S1, S2, S3> {

        S1 v0;
        S2 v1;
        S3 v2;

        Triplet(S1 v0, S2 v1, S3 v2) {
            this.v0 = v0;
            this.v1 = v1;
            this.v2 = v2;
        }

        public static <S1, S2, S3> Triplet<S1, S2, S3> create(S1 v0, S2 v1, S3 v2) {
            return new Triplet<S1, S2, S3>(v0, v1, v2);
        }

        public S1 getValue0() {
            return v0;
        }

        public S2 getValue1() {
            return v1;
        }

        public S3 getValue2() {
            return v2;
        }
    }
}

Output:

Received new group: 1
New triplet: 1
*** unsubscribing publish
Received new group: 1
New triplet: 1
*** unsubscribing publish
Received new group: 2
New triplet: 2
*** unsubscribing publish

It's somewhat odd that we get the unsubscribe.

All 6 comments

It looks like the inner publish is unsubscribing from the tripletsById Observable which causes the groupBy to discard the Obserable and create a new one when it see that key again.

I've got it work by not using publish

        Observable<Triplet<Integer, String, Integer>> grouped = subject.groupBy(Triplet::getValue0).flatMap(tripletsById -> {
            return tripletsById.flatMap((triplet) -> triplet.getValue1().flatMap((one) -> triplet.getValue2().map((two) -> Triplet.create(tripletsById.getKey(), one, two))));
        });

@abersnaze Is that expected behavior? Should I just avoid publishing publishing the GroupedObservable?


Another issue has come up that may be related. The following code throws an IllegalStateException:

ArrayList<String> strings = new ArrayList<>();
    PublishSubject<Triplet<Integer, Observable<String>, Observable<Integer>>> subject =
        PublishSubject.create();
    Observable<Triplet<Integer, Observable<String>, Observable<Integer>>> grouped =
        subject.groupBy(Triplet::getValue0).map(
            pub -> Triplet.<Integer, Observable<String>, Observable<Integer>>with(pub.getKey(),
                pub.flatMap(Triplet::getValue1),
                pub.flatMap(Triplet::getValue2)));

    grouped.flatMap(triplet ->
        Observable.just("ID: " + triplet.getValue0()).concatWith(
            triplet.getValue1().map(s -> triplet.getValue0() + " String: " + s).mergeWith(
            triplet.getValue2().map(i -> triplet.getValue0() + " Int: " + i))))
        .subscribe(strings::add);

    subject.onNext(Triplet.with(1, Observable.just("A"), Observable.just(1)));
    subject.onNext(Triplet.with(1, Observable.just("B"), Observable.just(2)));
    subject.onNext(Triplet.with(2, Observable.just("A"), Observable.just(3)));
java.lang.IllegalStateException: Only one subscriber allowed!
  at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
  at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
  at rx.Observable$1.call(Observable.java:144)
  at rx.Observable$1.call(Observable.java:136)
  at rx.Observable.unsafeSubscribe(Observable.java:7531)
  at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.java:251)
  at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.java:236)
  at rx.Observable$1.call(Observable.java:144)
  at rx.Observable$1.call(Observable.java:136)
  at rx.Observable$1.call(Observable.java:144)
  at rx.Observable$1.call(Observable.java:136)
  at rx.Observable$1.call(Observable.java:144)
  at rx.Observable$1.call(Observable.java:136)
  at rx.Observable.unsafeSubscribe(Observable.java:7531)
  at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:215)
  at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:185)
  at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
  at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:106)
  at rx.Subscriber.setProducer(Subscriber.java:177)
  at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:50)
  at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
  at rx.Observable$1.call(Observable.java:144)
  at rx.Observable$1.call(Observable.java:136)
  at rx.Observable.unsafeSubscribe(Observable.java:7531)
  at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:176)
  at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:140)
  at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
  at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:106)
  at rx.Subscriber.setProducer(Subscriber.java:177)
  at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:50)
  at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
  at rx.Observable$1.call(Observable.java:144)
  at rx.Observable$1.call(Observable.java:136)
  at rx.Observable.unsafeSubscribe(Observable.java:7531)
  at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:215)
  at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:185)
  at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
  at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
  at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
  at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.createNewGroup(OperatorGroupBy.java:310)
  at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:223)
  at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
  at rx.subjects.PublishSubject.onNext(PublishSubject.java:114)

A GroupedObservable is a difficult and special beast coming out of groupBy in order to deal with "time gap" issues. (You can start in this issue to follow the many threads on that issue if you care: https://github.com/ReactiveX/RxJava/issues/844)

It is "hot", yet must support back pressure, and must also support async consumers without losing data, despite being hot (because each group receives data from upstream, it does not control the source). For these reasons it must always be subscribed to by a single subscriber, and can buffer data. Thus it can only be subscribed to once, due to it's hot nature within the lifecycle of the outer groupBy. You can however publish it to multicast it.

More on the specific issue with use of publish coming ...

Here is full code reproducing the issue:

import rx.Observable;
import rx.subjects.PublishSubject;

public class GroupByPublish {

    public static void main(String... args) {
        PublishSubject<Triplet<Integer, Observable<String>, Observable<Integer>>> subject =
                PublishSubject.create();

        Observable<Triplet<Integer, Observable<String>, Observable<Integer>>> grouped =
                subject.groupBy(Triplet::getValue0)
                        .flatMap(tripletsById -> {
                            System.out.println("Received new group: " + tripletsById.getKey());
                            return tripletsById
                                    .doOnCompleted(() -> System.out.println("*** completed publish"))
                                    .doOnUnsubscribe(() -> System.out.println("*** unsubscribing publish"))
                                    .publish(pub ->
                                    Observable.just(Triplet.<Integer, Observable<String>, Observable<Integer>> create(
                                            tripletsById.getKey(),
                                            pub.flatMap(Triplet::getValue1),
                                            pub.flatMap(Triplet::getValue2))
                                            ));
                        });

        grouped.subscribe(triplet -> System.out.println("New triplet: " + triplet.getValue0()));
        subject.onNext(Triplet.create(1, Observable.just("A"), Observable.just(1)));
        subject.onNext(Triplet.create(1, Observable.just("B"), Observable.just(3)));
        subject.onNext(Triplet.create(2, Observable.just("A"), Observable.just(2)));
    }

    public static class Triplet<S1, S2, S3> {

        S1 v0;
        S2 v1;
        S3 v2;

        Triplet(S1 v0, S2 v1, S3 v2) {
            this.v0 = v0;
            this.v1 = v1;
            this.v2 = v2;
        }

        public static <S1, S2, S3> Triplet<S1, S2, S3> create(S1 v0, S2 v1, S3 v2) {
            return new Triplet<S1, S2, S3>(v0, v1, v2);
        }

        public S1 getValue0() {
            return v0;
        }

        public S2 getValue1() {
            return v1;
        }

        public S3 getValue2() {
            return v2;
        }
    }
}

Output:

Received new group: 1
New triplet: 1
*** unsubscribing publish
Received new group: 1
New triplet: 1
*** unsubscribing publish
Received new group: 2
New triplet: 2
*** unsubscribing publish

It's somewhat odd that we get the unsubscribe.

I think the problem is that the just(triplet) isn't actually connected to the published Observable. the triplet just has references to Observables but doesn't subscribe to them before the end of the closure.

Closing due to age.

Was this page helpful?
0 / 5 - 0 ratings