Since there is no longer a Subscription returned when subscribing a Subject to an Observable, looking for the best way unsubscribe/dispose.
Observable<Long> observableA = Observable.interval(200, TimeUnit.MILLISECONDS).map(value -> value);
Observable<Long> observableB = Observable.interval(200, TimeUnit.MILLISECONDS).map(value -> value * 100L);
BehaviorSubject<Long> subject = BehaviorSubject.create();
subject.subscribe(System.out::println);
System.out.println("Subscribing to observableA");
observableA.subscribe(subject);
TimeUnit.SECONDS.sleep(3);
// TODO: How to unsubscribe the subject from upstream observableA before subscribing it to upstream observableB?
System.out.println("Subscribing to observableB");
observableB.subscribe(subject);
TimeUnit.SECONDS.sleep(3);
What I've come up with so far is to subscribeWith an object that extends DisposableObserver that forwards onNext, onError, onComplete to the subject and use that to dispose() at the appropriate time.
Yes, that works, also subscribe(subject::onNext) etc. where you get back a Disposable too.
Works for me, thanks. Was just trying to make sure I wasn't overlooking something obvious.
In 2.x, exposing the dispose() method is the choice of the consumer Observer; this saves us a lot of allocation when such feature is not required.
Most helpful comment
Yes, that works, also
subscribe(subject::onNext)etc. where you get back aDisposabletoo.