Rxjava: Ignore(don't save reference to) disposable returning after subscription. RxJava2

Created on 19 Jul 2017  路  16Comments  路  Source: ReactiveX/RxJava

Hello, and thx for your great work. I'm Android developer and I faced with some situation. I use Observable.create(...) to wrap listeners and move into reactive-world:

  Observable.create(emitter -> {
            SharedPreferences sharedPreferences = getSharedPreferences();
            SharedPreferences.OnSharedPreferenceChangeListener listener = (sharedPreferences, key) -> emitter.onNext(sharedPreferences.getBoolean("Some key", false));
            sharedPreferences.registerOnSharedPreferenceChangeListener(listener);
            emitter.setCancellable(() -> sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener));
        });

The problem is, this listener is stroring as WeakReference at SharedPreferences.
In that case, only Disposable, returning after Observable.subscribe(...) method will save strong-reference to this listener.

Sometimes it does not neccessary for me to persist that Disposable, because i won't dispose() my subscription. But in this case( don't store reference to Disposable), my listener will be cleared by GC and my Observable stops emit items.

My question: Whether it is valid to not store Disposable, returning after subscribe(...) or not in common case.
Or my code is wrong and I need to create some proxy-class, that will store strong-references in this case

Thx!

2.x Android Question

Most helpful comment

@akarnokd Nice solution! in our project we decided to go for SharedPrefs decorator that stores strong refs

All 16 comments

You have to retain a strong reference to the listener in your case of which the easiest way is to store the Disposable of the subscription in a strong reference (as it practically does the same thing as a proxy wrapper would at this point).

  • I know, undestand and wrote about saving listener inside strong reference at _Disposable_
  • Storing strong-referenceto listener at _Disposable_ is a native behaviour in such cases, because(as i mention earlier), it storing by using _Emitter#setCancellable(...)_

And so, my question was: Whether it is valid to not store Disposable( returning after Observable#subscribe(...))in common case

I work at team, and we want to maintain some rules during developing. Necessity(Or lack of such necessity) of mandatory preservation strong-reference to disposable is one of such rules.

On Android, you have to store that Disposable to support cancelling the sequence in case the activity is stopped or paused. The usual issue is opposite to your situation, namely the strong reference may leak the activity beyond its lifecycle. From this perspective, the fact that registerOnSharedPreferenceChangeListener uses weak references and unless you store a reference to the Observer/Disposable, the setup will end up being GCd, the objective here is to have a working solution counteracting this property and not the philosophical question of whether storing Disposables is good practice or not.

The counter-question is, how would you handle the situation without RxJava and just by interacting with the listener the traditional Android way?

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@akarnokd Hello! I'd like to rephrase original question: doesn't observable contract guarantee that its subscriptions must work with no regard to whether client code stores references to them or not?

How do you define "must work"? If you don't store references, you can't call dispose thus can't stop an ongoing Observable flow.

@akarnokd I'd define it as follows: subscription works while its observer receives items it has subscribed to

Yes. There are two cases to consider:

  • Running a synchronous source will work because the current thread will keep references to it through the end Observable.subscribe() call.
  • Running asynchronous source with standard schedulers will keep a reference to the whole chain from the backing ScheduledExecutorService's thread as the task it executes has strong references to the inner components of the Observable operators.

This guarantee actually comes from Java, because otherwise everything would break that somehow doesn't store reference in somewhere reachable from static roots.

Ok, let's see code, that subscribes for settings and show/hide error:

  private void someMethod() {
        observePreferences().subscribe(someValue -> someValue ? showNotification(),
                hideNotifiaction());
    }

At that part of code I ignored disposable, returned after Observable#subscribe(...) call, because I'm not interested in disposing subscription. Also, I have not any thread switching, thus not using schedulers.
observePreferences() can be realized at this manner:


 private Observable<Boolean> observePreferences() {
        return Observable.create(emitter -> {
            SharedPreferences sharedPreferences = getSharedPreferences();
            SharedPreferences.OnSharedPreferenceChangeListener listener = (sharedPreferences, key) -> emitter.onNext(sharedPreferences.getBoolean("some_pref_key", false));
            sharedPreferences.registerOnSharedPreferenceChangeListener(listener);
            emitter.setCancellable(() -> sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener));
        })
    }

Now, let's see io.reactivex.internal.operators.observable.ObservableCreate.java:

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

As you can see, only place, where disposable is storing is
_observer.onSubscribe(parent);_

Also, as we can see at Observable's

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;

results disposable is not storing at any places.

Thus, at that implementation, no one storing disposable after subscription. Thus, weak listener will be removed by GC soon and my subscriber/observer/consumer will not receive events after that.

Is registerOnSharedPreferenceChangeListener RxJava ? Let's see a simplified example:

WeakReference<Integer> wr = new WeakReference<>(123456789);

Observable<Integer> obs = Observable.fromCallable(() -> wr.get());

System.gc();
Thread.sleep(200);

obs.test()
  .assertFailure(NullPointerException.class);

Unsurprisingly, RxJava failed to keep 123456789 alive.

RxJava can't change such weak sources into strong ones and also can't make you hold strong references to its own components.

Your case has nothing to do with RxJava. I assume the weak reference property of registerOnSharedPreferenceChangeListener was there all along and when you manually registered a listener, you kept a strong reference to the listener somewhere until it could be released. Now RxJava is taking the place of the listener thus you have to keep a strong reference of the observer or disposable you use/get back to prevent losing the listener. (This is no contradiction to my answer to @einmalfel question as he didn't specify what sources he meant and my answer assumed standard RxJava sources that don't have any weak references anywhere near them.)

@akarnokd so, does the observable implementation in question violate Observable contract?

Let's see:

  • onSubscribe called? Yes, by create itself
  • onXXX methods called sequentially? Yes, the sharedPreferences signals from the main thread, right?
  • onXXX stops if the consumer disposes? Yes, the setCancellation call after the register call ensures this.

Verdict: contract honored.

If the question had been, how to work around such weak listeners, that's a more interesting question. For example, have a companion task that keeps a reference to the listener as part of the Observable setup:

```java
private Observable observePreferences() {
return Observable.create(emitter -> {
SharedPreferences sharedPreferences = getSharedPreferences();
SharedPreferences.OnSharedPreferenceChangeListener listener =
(sharedPreferences, key) ->
emitter.onNext(sharedPreferences.getBoolean("some_pref_key", false));
sharedPreferences.registerOnSharedPreferenceChangeListener(listener);

        Disposable d = Schedulers.io().schedulePeriodicallyDirect(
           () -> log(listener.toString()), 100000, TimeUnit.DAYS);

        emitter.setCancellable(() -> 
              sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener);
              d.dispose();
        });
  });

}

@akarnokd Nice solution! in our project we decided to go for SharedPrefs decorator that stores strong refs

In a similar scenario, I used a IdentityHashMap to store the references, and removed the references on cancellation.

Was this page helpful?
0 / 5 - 0 ratings