I am reading IntroToRx .com pararaph Disposal of connections and subscriptions
 .
And it says that if I reconnect to previously disconnected ConncectableObservable it will resume emissions to its subscribers.
I have written a test for it, and it fails because a subscriber gets only the values emitted before the first disconnection. Is it correct behavior and my misunderstanding or it is bug?
@Test
public void reconnectToConnectableObservable() throws InterruptedException, IOException {
final TestScheduler testScheduler = Schedulers.test();
final TestSubscriber<Long> subscriber = TestSubscriber.create();
final TestSubscriber<Long> sideEffectSubscriber = TestSubscriber.create();
final ConnectableObservable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS, testScheduler)
.doOnNext(sideEffectSubscriber::onNext)
.publish();
final Subscription subscription = observable.subscribe(subscriber);
final Subscription firstConnection = observable.connect();
testScheduler.advanceTimeBy(2, TimeUnit.MILLISECONDS);
firstConnection.unsubscribe();
testScheduler.advanceTimeBy(16, TimeUnit.MILLISECONDS);
final Subscription secondConnection = observable.connect();
testScheduler.advanceTimeBy(2, TimeUnit.MILLISECONDS);
secondConnection.unsubscribe();
sideEffectSubscriber.assertValueCount(4); //pass
assertThat(subscription.isUnsubscribed()).isFalse(); //pass
subscriber.assertValueCount(4); //fails
}
_P.S. Help me with correct issue's title_
As far as I can remember, standard ConnectableObservables never worked as a resuming sources. When you unsubscribe one, that connection is done with all subscribers listening on it are let go without further events.
With publish, after an unsubscribe you can set up fresh subscribers and connect will dispatch events to them.
But look at the mentioned link, I found it on http://reactivex.io/documentation/operators/connect.html in See Also section. And the code with the output presented in the linked article conflicts with your words. Which source should people use as the authority when learning new operators?
I've checked Rx.NET and indeed their disconnect doesn't throw away subscribers and reconnecting after the source completed completes all latecommers. Somehow, that property was not captured early on in RxJava tests.
We diverged from Rx.NET and that source quite some time ago so it is expected some properties don't hold with RxJava.
Rx.NET uses multicast through a Subject for its publisher which can be duplicated with some extra work:
PublishSubject<Integer> ps = PublishSubject.create();
ps.subscribe(System.out::println);
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS);
Subscription s = source.subscribe(ps);
Thread.sleep(2000);
s.unsubscribe();
Thread.sleep(1000);
s = source.subscribe(ps);
Thread.sleep(2000);
s.unsubscribe();
Which source should people use as the authority when learning new operators?
Anything that is dedicated to RxJava.
I think your question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
Most helpful comment
I've checked Rx.NET and indeed their disconnect doesn't throw away subscribers and reconnecting after the source completed completes all latecommers. Somehow, that property was not captured early on in RxJava tests.
We diverged from Rx.NET and that source quite some time ago so it is expected some properties don't hold with RxJava.
Rx.NET uses multicast through a Subject for its
publisherwhich can be duplicated with some extra work:Anything that is dedicated to RxJava.