RxJava: 2.x: Best way to unsubscribe/dispose a Subject subscription?

Created on 29 Aug 2016 · 3 Comments · Source: ReactiveX/RxJava

Since subscribing a Subject to an Observable no longer returns a Subscription, I'm looking for the best way to unsubscribe/dispose.

Observable<Long> observableA = Observable.interval(200, TimeUnit.MILLISECONDS).map(value -> value);
Observable<Long> observableB = Observable.interval(200, TimeUnit.MILLISECONDS).map(value -> value * 100L);

BehaviorSubject<Long> subject = BehaviorSubject.create();
subject.subscribe(System.out::println);

System.out.println("Subscribing to observableA");
observableA.subscribe(subject);

TimeUnit.SECONDS.sleep(3);

// TODO: How to unsubscribe the subject from upstream observableA before subscribing it to upstream observableB?

System.out.println("Subscribing to observableB");
observableB.subscribe(subject);

TimeUnit.SECONDS.sleep(3);

What I've come up with so far is to call subscribeWith with an object that extends DisposableObserver and forwards onNext, onError, and onComplete to the subject, then call dispose() on that object at the appropriate time.
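For anyone reading later, here is a minimal sketch of that forwarding approach, assuming RxJava 2.x on the classpath (package io.reactivex). The helper forwardTo and the class name are hypothetical, and PublishSubject sources stand in for the interval Observables so the emissions are deterministic.

```java
import io.reactivex.observers.DisposableObserver;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

public class ForwardingObserverExample {

    // Hypothetical helper: a DisposableObserver that relays every signal to the subject.
    static <T> DisposableObserver<T> forwardTo(Subject<T> subject) {
        return new DisposableObserver<T>() {
            @Override public void onNext(T value) { subject.onNext(value); }
            @Override public void onError(Throwable e) { subject.onError(e); }
            @Override public void onComplete() { subject.onComplete(); }
        };
    }

    static List<Long> run() {
        // Stand-ins for observableA and observableB (assumption: manual sources
        // instead of Observable.interval, to avoid timing in the example).
        PublishSubject<Long> sourceA = PublishSubject.create();
        PublishSubject<Long> sourceB = PublishSubject.create();

        BehaviorSubject<Long> subject = BehaviorSubject.create();
        List<Long> received = new ArrayList<>();
        subject.subscribe(received::add);

        // subscribeWith returns the observer it was given, so we keep a handle to dispose.
        DisposableObserver<Long> linkA = sourceA.subscribeWith(forwardTo(subject));
        sourceA.onNext(1L);
        sourceA.onNext(2L);

        // Detach the subject from sourceA before attaching it to sourceB.
        linkA.dispose();
        sourceA.onNext(3L); // no longer forwarded

        DisposableObserver<Long> linkB = sourceB.subscribeWith(forwardTo(subject));
        sourceB.onNext(100L);
        linkB.dispose();

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subject itself is never disposed or completed here; only the link between each upstream and the subject is cut, so the subject's own subscribers stay attached across the switch.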

All 3 comments

Yes, that works. You can also use subscribe(subject::onNext) and the other lambda overloads, which return a Disposable as well.
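A rough sketch of the lambda-based variant, again assuming RxJava 2.x (package io.reactivex). It uses the three-argument subscribe(onNext, onError, onComplete) overload so that terminal events are forwarded too; the class name and the PublishSubject stand-ins for the upstream Observables are illustrative only.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class LambdaSubscribeExample {

    static List<Long> run() {
        // Stand-ins for observableA and observableB (assumption: manual sources).
        PublishSubject<Long> sourceA = PublishSubject.create();
        PublishSubject<Long> sourceB = PublishSubject.create();

        BehaviorSubject<Long> subject = BehaviorSubject.create();
        List<Long> received = new ArrayList<>();
        subject.subscribe(received::add);

        // The lambda overloads of subscribe return a Disposable for the link.
        Disposable linkA = sourceA.subscribe(
                subject::onNext, subject::onError, subject::onComplete);
        sourceA.onNext(1L);

        // Cut the link to sourceA, then attach sourceB the same way.
        linkA.dispose();
        sourceA.onNext(2L); // ignored: the link is disposed

        Disposable linkB = sourceB.subscribe(
                subject::onNext, subject::onError, subject::onComplete);
        sourceB.onNext(200L);
        linkB.dispose();

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Compared with a DisposableObserver subclass, this avoids writing a forwarding class at the cost of passing three method references at each subscription site.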

Works for me, thanks. Was just trying to make sure I wasn't overlooking something obvious.

In 2.x, exposing the dispose() method is the choice of the consumer Observer; this saves us a lot of allocation when such a feature is not required.
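To illustrate that point, here is a small sketch, assuming RxJava 2.x: subscribe(Observer) returns void, and the Observer receives the Disposable through onSubscribe, where it may keep it, expose it, or ignore it. The class name and the stop-after-three logic are made up for the example.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

public class OnSubscribeDisposableExample {

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        // subscribe(Observer) returns nothing; the Disposable arrives via onSubscribe.
        Observable.range(1, 10).subscribe(new Observer<Integer>() {
            private Disposable upstream;

            @Override public void onSubscribe(Disposable d) {
                upstream = d; // the Observer decides whether to keep or expose this
            }

            @Override public void onNext(Integer value) {
                received.add(value);
                if (value == 3) {
                    upstream.dispose(); // stop receiving further items
                }
            }

            @Override public void onError(Throwable e) { }

            @Override public void onComplete() { }
        });

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

DisposableObserver is essentially a ready-made Observer that makes this choice for you by storing the Disposable and exposing dispose().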
