Rxjava: Is there any way to get a Scheduler from OnSubscribe function?

Created on 4 Mar 2014  路  16Comments  路  Source: ReactiveX/RxJava

Hello.

My Observable is very simple and looks like:

Observable.create(new Observable.OnSubscribe<Content>() {
    @Override
    public void call(final Subscriber<? super Content> subscriber) {
        final ContentObserver observer = new ContentObserver() {
            @Override
            public void onContentChanged() {
                subscriber.onNext(loadContent());
            }
        };

        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                unregisterContentObserver(observer);
            }
        }));

        registerContentObserver(observer);
        subscriber.onNext(loadContent());
    }
});

loadContent is a heavy operation and have to be executed on the background thread (which is provided by the Scheduler that I pass to subscribeOn method).

PROBLEM
onContentChanged is always invoked from the UI thread and this cannot be changed, so I need to reschedule loadContent call to the background thread, but I failed to find a way to get a current Scheduler.

What is the right way to do it?

Most helpful comment

I think @benjchristensen nailed it.
I believe Schedulers.current() could be very useful for wrapping some non-rx code that is switching threads for some reason. Happens on Android often, when callback is called on main thread. Consider this example:

private static class MyOnSubscribe implements ObservableOnSubscribe<Foo> {
    @Override
    public void subscribe(ObservableEmitter<Foo> emitter) throws Exception {
        // worker thread
        callbackManager.registerCallback( //some sdk method that always delivers on main
                new Callback() {
                    @Override
                    public void onSuccess(@NonNull Foo result) {
                        // Main Thread. How to move onSuccess() back to worker?
                        emitter.onSuccess(result);
                    }
                }
         ...
        );
    }
}

Yes, I can just leave it as it is, let it deliver on main and fix it later with observeOn. But it just feels right to do it here, rather than outside.

EDIT: onSubscribe --> observeOn

All 16 comments

Is it possible to move loadContent to Subscriber? Then you can use observeOn to dispatch the messages to a Scheduler.

so I need to reschedule loadContent call to the background thread, but I failed to find a way to get a current Scheduler.

You said onContentChanged is always invoked from the UI thread, so what do you mean of "current Scheduler"? If you just want to reschedule loadContent call to a background thread, you can use Schedulers.io(), Schedulers.computation(), or Schedulers.newThread().

Is it possible to move loadContent to Subscriber? Then you can use observeOn to dispatch the messages to a Scheduler

Could you please elaborate?

what do you mean of "current Scheduler"

By "current Scheduler" I mean the Scheduler I passed to the subscribeOn method. I know that I can make the Observable know about the Scheduler and the code will be like:

final Scheduler scheduler = Schedulers.executor(Executors.newFixedThreadPool(3));
final Observable<Content> observable = Observable.create(new Observable.OnSubscribe<Content>() {
    @Override
    public void call(final Subscriber<? super Content> subscriber) {
        final ContentObserver observer = new ContentObserver() {
            @Override
            public void onContentChanged() {
                scheduler.schedule(new Action1<Scheduler.Inner>() {
                    @Override
                    public void call(Scheduler.Inner inner) {
                        subscriber.onNext(loadContent());
                    }
                });
            }
        };

        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                unregisterContentObserver(observer);
            }
        }));

        registerContentObserver(observer);
        subscriber.onNext(loadContent());
    }
}).subscribeOn(scheduler);

I'm just interested if it's possible to make this knowledge about the Scheduler more implicit.

Could you please elaborate?

I'm not sure if the following example is what you want.

        Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(final Subscriber<? super Void> subscriber) {
                final ContentObserver observer = new ContentObserver() {
                    @Override
                    public void onContentChanged() {
                        subscriber.onNext(null);
                    }
                };

                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        unregisterContentObserver(observer);
                    }
                }));

                registerContentObserver(observer);
                subscriber.onNext(null);
            }
        }).observeOn(scheduler).map(new Func1<Void, Content>() {

            @Override
            public Content call(Void ignored) {
                return loadContent();
            }

        });

@zsxwing Don't like the solution, but it does what I need. Thank for the help

@mironov-nsk Are you suggesting some kind of ThreadLocal that would allow retrieving the current Scheduler if running within one? If so, that's an interesting idea.

It could be something like Schedulers.current() which could return null if not inside one (or Immediate perhaps which is a no-op so as to not worry about null) or the real Scheduler.

I'm not 100% certain it would work, but it would be interesting to explore. It could be useful for trampolining and recursive scheduling, particularly when subscribeOn is used. I don't have time to pursue this right now, but if you or anyone else wants to play with it and propose something for review I'd be interested.

@benjchristensen yes, something like Schedulers.current() is really what I want

Was too curious to not try ... here's a simple experiment with NewThreadScheduler: https://github.com/Netflix/RxJava/pull/946/files

Here it is working with subscribeOn:

      Observable.create(new OnSubscribe<Integer>() {

           @Override
           public void call(final Subscriber<? super Integer> s) {
               c.incrementAndGet();
               Schedulers.current().schedule(new Action1<Inner>() {

                   @Override
                   public void call(Inner t1) {
                       c.incrementAndGet();
                       s.onCompleted();
                   }

               });
           }

       }).subscribeOn(Schedulers.newThread()).subscribe(ts);

This would be great. I literally needed something like this 2 days ago. I
ended up injecting the scheduler instead.

Dylan Sale
On 6 Mar 2014 17:10, "Ben Christensen" [email protected] wrote:

Here it is working with subscribeOn:

  Observable.create(new OnSubscribe<Integer>() {

       @Override
       public void call(final Subscriber<? super Integer> s) {
           c.incrementAndGet();
           Schedulers.current().schedule(new Action1<Inner>() {

               @Override
               public void call(Inner t1) {
                   c.incrementAndGet();
                   s.onCompleted();
               }

           });
       }

   }).subscribeOn(Schedulers.newThread()).subscribe(ts);

Reply to this email directly or view it on GitHubhttps://github.com/Netflix/RxJava/issues/941#issuecomment-36829181
.

A bit of a necro but I would also find this to be a useful feature. I'd like to be able to do some work in an inner Observable (or an Operator) and return control flow to the original subscribeOn thread when the work is done. Schedulers.tramploline() seemed promising, but when I tested it it seemed to capture the current thread context too late -- at the time of schedule and not the time of createWorker.

You can wrap over a single threaded ExecutorService with Schedulers.from() and use it as many times as you wish to put tasks back to the same executor. If you have multiple independent instantiations of the same sequence, you can use defer() and create the single threaded Scheduler in there:

Observable.defer(() -> {
    ExecutorService pool = Execturos.newFixedThreadPool(1);
    Scheduler s = Schedulers.from(pool);
    return Observable.just(1).subscribeOn(s)
              .map(v -> v + 1)
              .observeOn(Schedulers.io())
              .flatMap(v -> Observable.range(v, 2))
              .observeOn(s)
              .doOnNext(System.out::println)
              .doOnUnsubscribe(() -> pool.shutdown());
});

I think @benjchristensen nailed it.
I believe Schedulers.current() could be very useful for wrapping some non-rx code that is switching threads for some reason. Happens on Android often, when callback is called on main thread. Consider this example:

private static class MyOnSubscribe implements ObservableOnSubscribe<Foo> {
    @Override
    public void subscribe(ObservableEmitter<Foo> emitter) throws Exception {
        // worker thread
        callbackManager.registerCallback( //some sdk method that always delivers on main
                new Callback() {
                    @Override
                    public void onSuccess(@NonNull Foo result) {
                        // Main Thread. How to move onSuccess() back to worker?
                        emitter.onSuccess(result);
                    }
                }
         ...
        );
    }
}

Yes, I can just leave it as it is, let it deliver on main and fix it later with observeOn. But it just feels right to do it here, rather than outside.

EDIT: onSubscribe --> observeOn

@Ghedeon This is what observeOn stands for:

Observable.create(new MyOnSubscribe())
.observeOn(Schedulers.computation())
.map(...)
.filter(...)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);

@akarnokd Right, no question it's gonna work. It just bothers me, that it's not clean, internal implementation introduced some mess and I'm exposing this crooked behavior of MyOnSubscribe and make consumers of this class deal with it. Adding comments like "Note: always on main", etc. Also, normally I would pass this Observable up, without having a single observeOn/subscribeOn and at the very end inject a Scheduler (using DI). But now I'm forced to inject it much earlier and in many places. It kinda changes the flow, it's subtle but definitely, makes a difference.

Sorry for digging up the thread, but we're facing the same issue as OP -- callback is called on a different thread than the subscription, and we'd like to emit value on the original subscribeOn scheduler anyway. Did anything change with RxJava2? I don't believe https://github.com/ReactiveX/RxJava/issues/941#issuecomment-125480042 is sufficient if we really care about going back to the same scheduler without having to inject it manually. I can also open a new issue if that's more appropriate

Was this page helpful?
0 / 5 - 0 ratings