RxJava: Convert an Observable stream to Completable

Created on 29 Mar 2016 · 9 Comments · Source: ReactiveX/RxJava

If I'm not mistaken, Completable is supposed to be a replacement for things like Observable<Void>, but it seems like combining them with Observables is messier.

It's possible I'm doing something weird here, but this is what I want to do:

Given

Observable<Data> makeHttpCall();
Completable storeInDatabase(Data data);

Ideally I would be able to do something like this:

Completable syncData() {
    return makeHttpCall().flatMap(storeInDatabase);
}

But currently this is what I have:

Completable syncData() {
    return makeHttpCall()
        .flatMap(data -> storeInDatabase(data).<Void>toObservable())
        .toCompletable();
}

All 9 comments

You don't have to convert it to Observable<Void> explicitly first. All toCompletable() does is ignore onNext() emissions and listen for a terminal event. If you care about the non-terminal emissions, make sure you do whatever you need with them before calling toCompletable().

The problem is that makeHttpCall().map(storeInDatabase) returns Observable<Completable>, and if I call toCompletable() on that, I'm not actually waiting for the inner Completables to complete, right? But if I call toObservable() on the result of storeInDatabase and flatMap it, the subscription is hooked up correctly. I'm not sure I understand this properly. Am I wrong here?

You can do this with some conversions:

Completable.merge(makeHttpCall().map(v -> storeInDatabase(v))).await();
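
To illustrate why merge is needed here, below is a minimal plain-Java sketch. It is not RxJava code: MiniCompletable, ignoreElements and merge are hypothetical, synchronous stand-ins written only to model the behaviour discussed in this thread, and a List<String> stands in for the Observable<Data> source. The point is that mapping to a lazy completable and then ignoring the values never subscribes to the inner work, while merging subscribes to each one and completes only after all of them have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a Completable: work that runs only when subscribed.
interface MiniCompletable {
    void subscribe(Runnable onComplete);
}

class MergeSketch {
    // Records which items were actually "stored".
    static final List<String> stored = new ArrayList<>();

    // Hypothetical counterpart of storeInDatabase(Data): lazy until subscribed.
    static MiniCompletable storeInDatabase(String data) {
        return onComplete -> {
            stored.add(data);
            onComplete.run();
        };
    }

    // Models map(...) followed by toCompletable(): each inner completable is
    // created as a value and then dropped, so none of them is ever subscribed.
    static MiniCompletable ignoreElements(List<String> source,
                                          Function<String, MiniCompletable> mapper) {
        return onComplete -> {
            for (String item : source) {
                mapper.apply(item); // result discarded, never subscribed
            }
            onComplete.run();
        };
    }

    // Models merging: subscribes to every inner completable and signals
    // completion only after the last one has completed.
    static MiniCompletable merge(List<String> source,
                                 Function<String, MiniCompletable> mapper) {
        return onComplete -> {
            int[] remaining = {source.size()};
            if (remaining[0] == 0) {
                onComplete.run();
                return;
            }
            for (String item : source) {
                mapper.apply(item).subscribe(() -> {
                    if (--remaining[0] == 0) {
                        onComplete.run();
                    }
                });
            }
        };
    }
}
```

In this model, ignoreElements completes with nothing stored, whereas merge completes only after every item has been stored. The real operators also deal with errors, unsubscription and concurrency, none of which this sketch attempts.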

@akarnokd to be clear, you are calling this method:

[image not preserved]

Is that right?

Are there other methods that operate on Observable<Completable> like this or is it a special case?

Yes, Completable.concat and Completable.mergeDelayError.

So now I have the following

// Given
Observable<Data> makeHttpCall();
Completable storeInDatabase(Data data);

// Do Work
Completable syncData() {
    return Completable.merge(makeHttpCall().map(db::storeInDatabase));
}

// Subscribe
syncData().await();

Is this the correct way to accomplish this?

If you want to block the calling thread until it completes, then yes.
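
For readers unsure what blocking means in practice, here is a rough plain-Java sketch of the two styles. It does not use RxJava: AsyncTask, syncData and awaitBlocking are hypothetical names, and the CountDownLatch only models the general idea of parking the caller until a completion signal arrives. It makes no claim about how RxJava implements await().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class AwaitSketch {
    // Hypothetical stand-in for an asynchronous Completable.
    interface AsyncTask {
        void subscribe(Runnable onComplete);
    }

    // Pretends to sync data on another thread, then signals completion.
    static AsyncTask syncData(AtomicBoolean stored) {
        return onComplete -> new Thread(() -> {
            stored.set(true);
            onComplete.run();
        }).start();
    }

    // Blocking style: the caller is parked until the task signals completion.
    static void awaitBlocking(AsyncTask task) {
        CountDownLatch latch = new CountDownLatch(1);
        task.subscribe(latch::countDown);
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

The non-blocking alternative in this model is to call task.subscribe(callback) directly: the call returns immediately and the callback runs later on the worker thread. That is usually the better fit on a UI or event-loop thread.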

Ah sorry, I didn't read your full sample snippet before. Disregard my earlier comment.

It looks like Completable.merge solves my issue of transforming Observable<Completable> to Completable then. Thanks!
