The following tests fail because the cancel/dispose from the downstream comes before the actual connection is established which leaves the connectable source in a disposed state, refusing to take in new subscribers/observers:
@Test
public void test() {
BehaviorSubject<Integer> subject = BehaviorSubject.create();
Observable<Integer> observable = subject
.replay(1)
.refCount();
observable.takeUntil(Observable.just(1)).test();
subject.onNext(2);
observable.take(1).test().assertResult(2);
}
@Test
public void test2() {
BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
Flowable<Integer> flowable = processor
.replay(1)
.refCount();
// This line causes the test to fail.
flowable.takeUntil(Flowable.just(1)).test();
processor.onNext(2);
flowable.take(1).test().assertResult(2);
}
Thanks for letting us know. Was this discovery prompted by a report or a flash of realization?
@davidmoten This was a bug we found during development. Our actual use case was more complicated but this is what it simplified to.
@akarnokd Thanks for the quick response!
It is from a StackOverflow question: https://stackoverflow.com/questions/53216329/refcount-observable-does-not-emit-on-second-subscription
+1 to @davidmoten 's comment, this is how open source projects should tackle bugs
馃憤 @akarnokd
Fix posted in #6297.