Rxjava: What is the best solution to get download progress ?

Created on 19 Aug 2016  Â·  11Comments  Â·  Source: ReactiveX/RxJava

I want to create a simple Observable that downloads a file, and send progress. Here's what I got for now :

@Override
    public Observable<Progress> downloadTiles(final String tilesUrl) {
        return Observable.create(new Observable.OnSubscribe<Progress>() {
            @Override
            public void call(Subscriber<? super Progress> subscriber) {
                Request request = new Request.Builder()
                        .url(tilesUrl)
                        .build();

                try {
                    // see http://stackoverflow.com/a/34345052/1343969
                    Call runningRequest =  mClient.newCall(request);
                    Response response = runningRequest.execute();
                    long contentLength = response.body().contentLength();
                    File destinationFile = TileCacheHelper.getZipFile(mContext, tilesUrl);
                    Timber.d("Saving tiles zip to %s", destinationFile);
                    BufferedSink sink = Okio.buffer(Okio.sink(destinationFile));

                    long totalRead = 0;
                    long lastRead;
                    while ((lastRead = response.body().source().read(sink.buffer(), BUFFER_SIZE)) != -1) {
                        if(subscriber.isUnsubscribed()){
                            runningRequest.cancel();
                            response.body().close();
                            sink.close();
                            Timber.d("Download cancelled");
                            return;
                        }
                        sink.emitCompleteSegments();
                        totalRead += lastRead;
                        subscriber.onNext(new Progress(totalRead, contentLength, false));
                    }
                    sink.writeAll(response.body().source());
                    sink.close();

                    subscriber.onNext(new Progress(totalRead, contentLength, true));
                    subscriber.onCompleted();
                } catch (IOException e) {
                    subscriber.onError(e);
                }
            }
        }).sample(100, TimeUnit.MILLISECONDS);
    }

I'm fairly new to RxJava, so I don't think I get all the concepts behind it. I'll explain my thought, so don't hesitate to point any mistake I would make.

I read everywhere : "Don't use create, it doesn't handle backpressure and you won't write it correctly" and... That's completely right. When my app subscribe to this, I get the backpressure exception. I tried to add the backpressure operators (OnBackpressureDrop would be completely correct) but with no luck. That's the reason why I added the sample, and even if it works, I'd like the UI to display the progress as soon as it can.

I found this, and it seems that SyncOnSubscribe is close to what I want. But from what I understand, the "next" is called in a "pull" fashion (also called "cold", is that it ?), so the subscriber ask the observable to do the next thing it should do. That's not exactly what I want : I'd like the Observer to do his stuff in the background continuously, maybe even "push" progress as soon as it can (what is called "hot" ?), and the UI should update as fast as possible, but if it is too slow (and it is), it should get only the last value. Letting the UI request the progress when it is ready would be great, but I don't know how to do that and let the observable download the file.

Also, there is the new fromAsync, but I'd like to keep the Observable content sequential as it is, instead of adding a number of indirections that makes the code harder to read).

So, what is the best way to implement this ?

Question

Most helpful comment

For the record, here's what I ended up doing :

public Observable<Progress> downloadFile(final String url, final File dest) {
        return Observable
                .fromEmitter(new Action1<Emitter<Progress>>() {
                    @Override
                    public void call(final Emitter<Progress> progressEmitter) {
                        mIsFinished = false;

                        Request request = new Request.Builder()
                                .url(url)
                                .build();

                        final Call call = mClient.newCall(request);
                        call.enqueue(new Callback() {
                            @Override
                            public void onFailure(Call call, IOException e) {
                                Timber.w("onFailure called");
                                progressEmitter.onError(e);
                            }

                            @Override
                            public void onResponse(Call call, Response response) throws IOException {
                                Timber.d("OnResponse called");
                                BufferedSink sink = Okio.buffer(Okio.sink(dest));

                                try {
                                    long contentLength = response.body().contentLength();
                                    long totalRead = 0;
                                    long lastRead;
                                    while ((lastRead = response.body().source().read(sink.buffer(), BUFFER_SIZE)) != -1) {
                                        totalRead += lastRead;
                                        sink.emitCompleteSegments();
                                        progressEmitter.onNext(new Progress(totalRead, contentLength));
                                    }
                                    sink.writeAll(response.body().source());
                                    sink.close();
                                    response.body().close();
                                    mIsFinished = true;
                                    progressEmitter.onCompleted();
                                } catch (IOException e) {
                                    response.body().close();
                                    sink.close();
                                    progressEmitter.onError(e);
                                }
                            }
                        });

                        progressEmitter.setCancellation(new Cancellable() {
                            @Override
                            public void cancel() throws Exception {
                                Timber.w("downloadFile: Download cancelled");
                                call.cancel();
                            }
                        });
                    }
                }, Emitter.BackpressureMode.LATEST)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (!mIsFinished) {
                            if (dest.delete()) {
                                Timber.d("Incomplete file deleted : %s", dest);
                            } else {
                                Timber.w("Incomplete file CANNOT be deleted : %s", dest);
                            }
                        }
                    }
                })
                .unsubscribeOn(Schedulers.io())
                .onBackpressureLatest();

    }

The weird thing is that if I do not add the last onBackpressureLatest(), I get MissingBackpressureException, but if I remove doOnUnsubscribe and unsubscribeOn, it works. I don't know why these 2 calls break the backpressure.

All 11 comments

I think you should use a Subject (PublishSubject in this case). That way you can easily push values to the Observers outside of your download code.

BTW, since your download code and your progress receiving code are presumably gonna be in on different threads, you need to wrap the Subject in a SerializedSubject.

Downloading happens on single thread, no need for serialisation here.

I would suggest to use Observable.fromAsync() here, it's not that hard :)

On Fri, 19 Aug 2016, 23:19 Sasa Sekulic, [email protected] wrote:

I think you should use a Subject
http://akarnokd.blogspot.hu/2015/06/subjects-part-1.html (PublishSubject
http://reactivex.io/documentation/subject.html in this case). That way
you can easily push values to the Observers outside of your download code.

BTW, since your download code and your progress receiving code are
presumably gonna be in on different threads, you need to wrap the Subject
in a SerializedSubject https://github.com/ReactiveX/RxJava/wiki/Subject.

—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/4376#issuecomment-241124395,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AA7B3BnCl9Fi9IUc-Wmbz9NfzvlsJg5Iks5qhg_igaJpZM4Jocf3
.

true, fromAsync() would be a good solution - you can check the docs here.

I can't try it for now, but I'll do it as soon as possible. I find it confusing to use fromAsync() when my code is completely synchronous though.

Hey guys, sorry for the delay. I tried using fromAsync with a drop policy, but the result is still not correct : I receive the 16 first updates, but nothing after that. Why is that ?

Can you post a standalone code example? How do you consume the results? 16 is the default buffer size and sounds like something doesn't request more.

Here's a demo : https://github.com/Oyashirox/FromAsyncTest

Notice that I tried to add request(1) in onStart and onNext() without success.

Try with AsyncEmitter.BackpressureMode.BUFFER instead so you don't lose data.

Same behavior with BUFFER. I don't mind losing data in this case, it's even the expected behavior : the UI won't be able to display every step, so I want it to display only the last one emitted.

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

For the record, here's what I ended up doing :

public Observable<Progress> downloadFile(final String url, final File dest) {
        return Observable
                .fromEmitter(new Action1<Emitter<Progress>>() {
                    @Override
                    public void call(final Emitter<Progress> progressEmitter) {
                        mIsFinished = false;

                        Request request = new Request.Builder()
                                .url(url)
                                .build();

                        final Call call = mClient.newCall(request);
                        call.enqueue(new Callback() {
                            @Override
                            public void onFailure(Call call, IOException e) {
                                Timber.w("onFailure called");
                                progressEmitter.onError(e);
                            }

                            @Override
                            public void onResponse(Call call, Response response) throws IOException {
                                Timber.d("OnResponse called");
                                BufferedSink sink = Okio.buffer(Okio.sink(dest));

                                try {
                                    long contentLength = response.body().contentLength();
                                    long totalRead = 0;
                                    long lastRead;
                                    while ((lastRead = response.body().source().read(sink.buffer(), BUFFER_SIZE)) != -1) {
                                        totalRead += lastRead;
                                        sink.emitCompleteSegments();
                                        progressEmitter.onNext(new Progress(totalRead, contentLength));
                                    }
                                    sink.writeAll(response.body().source());
                                    sink.close();
                                    response.body().close();
                                    mIsFinished = true;
                                    progressEmitter.onCompleted();
                                } catch (IOException e) {
                                    response.body().close();
                                    sink.close();
                                    progressEmitter.onError(e);
                                }
                            }
                        });

                        progressEmitter.setCancellation(new Cancellable() {
                            @Override
                            public void cancel() throws Exception {
                                Timber.w("downloadFile: Download cancelled");
                                call.cancel();
                            }
                        });
                    }
                }, Emitter.BackpressureMode.LATEST)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (!mIsFinished) {
                            if (dest.delete()) {
                                Timber.d("Incomplete file deleted : %s", dest);
                            } else {
                                Timber.w("Incomplete file CANNOT be deleted : %s", dest);
                            }
                        }
                    }
                })
                .unsubscribeOn(Schedulers.io())
                .onBackpressureLatest();

    }

The weird thing is that if I do not add the last onBackpressureLatest(), I get MissingBackpressureException, but if I remove doOnUnsubscribe and unsubscribeOn, it works. I don't know why these 2 calls break the backpressure.

Was this page helpful?
0 / 5 - 0 ratings