Rxjava: 2.x: refCount doesn't reset its source if one of its subscribers cancels immediately

Created on 8 Nov 2018  路  5Comments  路  Source: ReactiveX/RxJava

The following tests fail because the cancel/dispose from the downstream comes before the actual connection is established which leaves the connectable source in a disposed state, refusing to take in new subscribers/observers:

@Test
public void test() {
    BehaviorSubject<Integer> subject = BehaviorSubject.create();

    Observable<Integer> observable = subject
            .replay(1)
            .refCount();

    observable.takeUntil(Observable.just(1)).test();

    subject.onNext(2);

    observable.take(1).test().assertResult(2);
}


@Test
public void test2() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.create();

    Flowable<Integer> flowable = processor
            .replay(1)
            .refCount();

    // This line causes the test to fail.
    flowable.takeUntil(Flowable.just(1)).test();

    processor.onNext(2);

    flowable.take(1).test().assertResult(2);
}
2.x Bug

All 5 comments

Thanks for letting us know. Was this discovery prompted by a report or a flash of realization?

@davidmoten This was a bug we found during development. Our actual use case was more complicated but this is what it simplified to.
@akarnokd Thanks for the quick response!

+1 to @davidmoten 's comment, this is how open source projects should tackle bugs

馃憤 @akarnokd

Fix posted in #6297.

Was this page helpful?
0 / 5 - 0 ratings