I am trying to understand how to unsubscribe from an Observable that is created from a live feed.
Here is more or less the code:
SomeFeed feed = new SomeFeed();
Observable<PriceTick> observable = Observable.create(s ->
feed.register(new SomeListener() {
@Override
public void priceTick(PriceTick event) {
s.onNext(event);
}
@Override
public void error(Throwable throwable) {
s.onError(throwable);
}
})
);
Subscription subscription = observable.subscribe(System.out::println);
subscription.unsubscribe();
System.out.println("Is unsubscribed:" + subscription.isUnsubscribed()); // prints true
I am finding that after the subscription is unsubscribed, the subscribed is still outputting the event stream.
How can I get the unsubscribe to remove the subscriber from the notifications?
Add a check for s.isUnsubscribed() before emitting values. But you should consider using other alternatives for creating an Observable, e.g. Observable.fromCallable() etc.
check for unsubscribed works, thanks. But how would I use fromCallable() to convert an existing feed to a hot observable?
Use Observable.fromEmitter(). The javadoc has an example how to setup a listener-based source:
Observable.<Event>fromEmitter(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onNext(e);
if (e.isLast()) {
emitter.onCompleted();
}
}
@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};
AutoCloseable c = api.someMethod(listener);
emitter.setCancellable(c::close);
}, BackpressureMode.BUFFER);
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
Most helpful comment
Use
Observable.fromEmitter(). The javadoc has an example how to setup a listener-based source: