RxJava: Make OperatorDoOnEach also call onStart

Created on 19 Apr 2015  ·  9 Comments  ·  Source: ReactiveX/RxJava

I modified the OperatorDoOnEach code like this:

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> observer) {
        return new Subscriber<T>(observer) {

            private boolean done = false;


            @Override
            public void onStart() { // <-- my addition
                super.onStart();
                observer.onStart();
            }

so that onStart is also called on subscription.

Does this cause any problems?

What I want to do is this: in onStart I show a ProgressDialog, and in onError or onCompleted I dismiss it. That is why I need an onStart callback.

If I want to implement an operator that shows progress information, how should I do it?

Question

All 9 comments

Using onStart is okay, but don't forget you need SwingUtilities.invokeLater() to create, display and destroy the GUI. First of all, you need to determine how values passing through the Subscriber will tell you how far you progressed. If you know the expected total number of events, then you can just count onNexts and update the GUI accordingly.
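The counting idea above could look roughly like this. It is a minimal sketch in plain Java with no RxJava or GUI types; `ProgressCounter` and its methods are hypothetical names, not part of any library. In a real Subscriber, onNext would call `increment()` and hand the returned percentage to the GUI thread.

```java
/** Hypothetical helper: tracks progress when the total number of events is known up front. */
final class ProgressCounter {
    private final int total;
    private int seen;

    ProgressCounter(int total) {
        if (total <= 0) {
            throw new IllegalArgumentException("total must be positive");
        }
        this.total = total;
    }

    /** Call once per onNext; returns the completed percentage, capped at 100. */
    int increment() {
        if (seen < total) {
            seen++;
        }
        return percent();
    }

    /** Integer percentage of events seen so far (rounded down). */
    int percent() {
        return (int) (100L * seen / total);
    }
}
```

Capping at the total keeps the displayed value sane if the source emits more items than expected.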

I added onStart to OperatorDoOnEach, but it doesn't work. Unlike onNext, which is called from within onSubscribe, onStart() is not propagated along the chain. I work on Android: I want to create a progress dialog in onStart and call dialog.dismiss() in onCompleted or onError.

@jiqimaogou How about calling onNext(0) before starting the real work? Then you can check the progress in the onNext method. If it's 0, show the progressDialog, and if greater than 0, update the progressDialog.
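A rough sketch of that sentinel convention, in plain Java so it runs without RxJava or Android. `ProgressDialogDriver` is a hypothetical name, and returning a string that describes the dialog action is only an assumption made so the logic can be checked; real code would call the dialog directly from onNext.

```java
/** Hypothetical consumer of progress values delivered through onNext. */
final class ProgressDialogDriver {

    /**
     * Maps a progress value to a dialog action.
     * 0 is the "work is starting" sentinel; anything greater is a progress update.
     */
    String onNext(int progress) {
        if (progress < 0) {
            throw new IllegalArgumentException("progress must be >= 0");
        }
        if (progress == 0) {
            return "show";               // sentinel: open the dialog
        }
        return "update:" + progress;     // real work: refresh the dialog
    }
}
```

The trade-off is that 0 can no longer be emitted as an ordinary value, so every consumer of the stream has to know about the convention.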

@jiqimaogou You can show the dialog in onStart(). (In that method you need to create a Handler using the main thread's Looper.)

    // mainThread() is presumably AndroidSchedulers.mainThread() from RxAndroid (static import).
    private static final <T> Observable<T> bindDialogLoading(final Dialog dlg, final Observable<T> source) {
        // Project-specific helper; comment this line out if you don't have it.
        Assertions.assertUiThread();
        // Deliver all callbacks on the main thread so the dialog can be touched safely.
        final Observable<T> o = source.observeOn(mainThread());
        return o.lift(new Observable.Operator<T, T>() {
            @Override
            public Subscriber<? super T> call(final Subscriber<? super T> child) {
                // Pass child to the constructor so both share one subscription.
                return new Subscriber<T>(child) {
                    @Override
                    public void onStart() {
                        super.onStart();
                        // onStart may run off the main thread, so post the show() call.
                        new Handler(Looper.getMainLooper()).post(() -> dlg.show());
                        child.onStart();
                    }

                    @Override
                    public void onCompleted() {
                        dlg.dismiss();
                        child.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        dlg.dismiss();
                        child.onError(e);
                    }

                    @Override
                    public void onNext(T t) {
                        dlg.dismiss(); // Dismiss or update the dialog, depending on your logic.
                        child.onNext(t);
                    }
                };
            }
        });
    }
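To try the show/dismiss ordering of this operator without an Android device, the same wrapping pattern can be modelled in plain Java. This is only a sketch under stated assumptions: `Sink` and `LoadingView` are hypothetical stand-ins for rx.Subscriber and android.app.Dialog, not real RxJava or Android types, and threading is left out entirely. It also guards the dismiss call so it happens only once, which the code above does not do.

```java
/** Hypothetical stand-in for rx.Subscriber; not a real RxJava type. */
interface Sink<T> {
    void onStart();
    void onNext(T value);
    void onError(Throwable e);
    void onCompleted();
}

/** Hypothetical stand-in for android.app.Dialog. */
interface LoadingView {
    void show();
    void dismiss();
}

/** Wraps a child Sink: shows the view on start, dismisses it on the first item or terminal event. */
final class LoadingSink<T> implements Sink<T> {
    private final LoadingView view;
    private final Sink<? super T> child;
    private boolean dismissed;

    LoadingSink(LoadingView view, Sink<? super T> child) {
        this.view = view;
        this.child = child;
    }

    /** Dismisses the view at most once, however many events arrive. */
    private void dismissOnce() {
        if (!dismissed) {
            dismissed = true;
            view.dismiss();
        }
    }

    @Override public void onStart() { view.show(); child.onStart(); }
    @Override public void onNext(T value) { dismissOnce(); child.onNext(value); }
    @Override public void onError(Throwable e) { dismissOnce(); child.onError(e); }
    @Override public void onCompleted() { dismissOnce(); child.onCompleted(); }
}

/** Drives the wrapper by hand and records every call so the order can be inspected. */
final class RecordingDemo {
    static java.util.List<String> run() {
        final java.util.List<String> log = new java.util.ArrayList<>();
        LoadingView view = new LoadingView() {
            @Override public void show() { log.add("show"); }
            @Override public void dismiss() { log.add("dismiss"); }
        };
        Sink<String> child = new Sink<String>() {
            @Override public void onStart() { log.add("child.onStart"); }
            @Override public void onNext(String v) { log.add("child.onNext:" + v); }
            @Override public void onError(Throwable e) { log.add("child.onError"); }
            @Override public void onCompleted() { log.add("child.onCompleted"); }
        };
        Sink<String> wrapped = new LoadingSink<>(view, child);
        wrapped.onStart();
        wrapped.onNext("a");
        wrapped.onCompleted();
        return log;
    }
}
```

Calling `RecordingDemo.run()` returns the recorded call order, which makes it easy to see that the view is shown before the child starts and dismissed before the first item is forwarded.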

Closing due to inactivity.

@quangson91 Hi, I didn't understand how to use your answer. Could you write a demo?

@guoyoujin
For example, suppose you create an Observable like this:

    Observable<T> yourObservable() { return Observable.create(...); }

Then you can use:

    bindDialogLoading(YOUR_DIALOG, yourObservable())

Or you can check this tutorial:
http://blog.danlew.net/2015/03/02/dont-break-the-chain/

@quangson91 Hi, I can't find the method “Assertions.assertUiThread();”

Please comment out that line.
