Rxjava: How can I get my Observable to unsubscribe?

Created on 24 Aug 2016  路  4Comments  路  Source: ReactiveX/RxJava

I am trying to understand how to unsubscribe from an Observable that is created from a live feed.
Here is more or less the code:

SomeFeed feed = new SomeFeed();
Observable<PriceTick> observable = Observable.create(s ->
  feed.register(new SomeListener() {
    @Override
    public void priceTick(PriceTick event) {
      s.onNext(event);
    }

    @Override
    public void error(Throwable throwable) {
      s.onError(throwable);
    }
  })
);
Subscription subscription = observable.subscribe(System.out::println);
subscription.unsubscribe();
System.out.println("Is unsubscribed:" + subscription.isUnsubscribed()); // prints true

I am finding that after the subscription is unsubscribed, the subscribed is still outputting the event stream.

How can I get the unsubscribe to remove the subscriber from the notifications?

Question

Most helpful comment

Use Observable.fromEmitter(). The javadoc has an example how to setup a listener-based source:

Observable.<Event>fromEmitter(emitter -> {
     Callback listener = new Callback() {
         @Override
         public void onEvent(Event e) {
             emitter.onNext(e);
             if (e.isLast()) {
                 emitter.onCompleted();
             }
         }

         @Override
         public void onFailure(Exception e) {
             emitter.onError(e);
         }
    };

    AutoCloseable c = api.someMethod(listener);

    emitter.setCancellable(c::close);

}, BackpressureMode.BUFFER);

All 4 comments

Add a check for s.isUnsubscribed() before emitting values. But you should consider using other alternatives for creating an Observable, e.g. Observable.fromCallable() etc.

check for unsubscribed works, thanks. But how would I use fromCallable() to convert an existing feed to a hot observable?

Use Observable.fromEmitter(). The javadoc has an example how to setup a listener-based source:

Observable.<Event>fromEmitter(emitter -> {
     Callback listener = new Callback() {
         @Override
         public void onEvent(Event e) {
             emitter.onNext(e);
             if (e.isLast()) {
                 emitter.onCompleted();
             }
         }

         @Override
         public void onFailure(Exception e) {
             emitter.onError(e);
         }
    };

    AutoCloseable c = api.someMethod(listener);

    emitter.setCancellable(c::close);

}, BackpressureMode.BUFFER);

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

yubaokang picture yubaokang  路  3Comments

dzharikhin picture dzharikhin  路  4Comments

archenroot picture archenroot  路  3Comments

dsvoronin picture dsvoronin  路  4Comments

gfx picture gfx  路  3Comments