RxJava: 2.1.4 Observable.concat only executes first source

Created on 10 Jun 2018 · 6 Comments · Source: ReactiveX/RxJava

RxJava 2.1.4

Hello, I'm having trouble with concat. What I was trying to do is reactively choose a source depending on the result of the source before it. I'm not sure I understand how Observable.concat works, but this test is failing.

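import io.reactivex.Observable
import io.reactivex.ObservableTransformer
import io.reactivex.observers.TestObserver
import org.junit.Test // assuming JUnit 4; the original post does not say which test framework is used

class RxTest {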

    val source1 = "first"

    val source2 = "second"

    val source3 = "third"

    val SOURCE_1_FLAG = 0

    val SOURCE_2_FLAG = 1

    val SOURCE_3_FLAG = 2

    @Test
    fun unsatisfiedResultFromSource1ProceedToSource2Test() {
        val observer: TestObserver<String> = TestObserver()

        val loadData = ObservableTransformer<Int, String> {
            it.publish {
                Observable.concat(
                        it.filter { it == SOURCE_1_FLAG }
                                .flatMap { Observable.just(source1) }
                                .doOnNext{ System.out.println(it) }
                                .filter { false },
                        it.filter { it <= SOURCE_2_FLAG }
                                .flatMap { Observable.just(source2) }
                                .doOnNext{ System.out.println(it) },
                        it.filter { it <= SOURCE_3_FLAG }
                                .flatMap { Observable.just(source3) }
                                .doOnNext{ System.out.println(it) }
                ).take(1)
            }
        }
        val observable = Observable.just(SOURCE_1_FLAG).compose(loadData)

        observable.subscribe(observer)

        observer.assertComplete()
        observer.assertNoErrors()

        observer.assertValue { it == source2 }

        observer.assertNever { it == source1 }
        observer.assertNever { it == source3 }
    }
}

In this test I was expecting that source1 would be executed and then, because of the filter { false }, it would proceed to source2. What actually happens is that it executes the first source and then completes without emitting any data; source2 never gets a chance to run.

All 6 comments

With this shared concat setup, your first condition will consume the Observable.just(SOURCE_1_FLAG) and then switch to the next condition, which will then find an already completed sequence. You need either merge instead of concat or replay instead of publish.
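
To make the difference concrete, here is a minimal sketch (not the code from the question) that runs the same concat chain once through publish and once through replay. It assumes RxJava 2.x on the classpath; the names firstThenSecond, withPublish and withReplay are made up for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: the first branch swallows its result (filter { false }),
// so concat has to move on to the second branch to produce anything.
fun firstThenSecond(shared: Observable<Int>): Observable<String> =
    Observable.concat(
        shared.filter { it == 0 }.map { "first" }.filter { false },
        shared.filter { it <= 1 }.map { "second" }
    ).take(1)

// publish: the second branch subscribes only after the shared source has
// already emitted and completed, so it sees nothing but the completion.
fun withPublish(): List<String> =
    Observable.just(0)
        .publish { shared -> firstThenSecond(shared) }
        .toList()
        .blockingGet()

// replay: the shared source buffers its items, so the late second branch
// still receives the flag and can emit.
fun withReplay(): List<String> =
    Observable.just(0)
        .replay { shared -> firstThenSecond(shared) }
        .toList()
        .blockingGet()

fun main() {
    println(withPublish()) // []
    println(withReplay())  // [second]
}
```

Note that replay without a bound keeps every upstream item in memory, which is fine for a single flag but may matter for long-running sources.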

Thanks @akarnokd. I was actually thinking of merge when I made this kind of setup; I assumed concat was just like merge except that it concatenates the streams. Thank you for clarifying how concat works, but I still need to find a solution for the behavior I need. I think you get my intention in this code.

        val loadData = ObservableTransformer<Int, String> {
            it.replay {
                Observable.concat(
                        it.filter { it == SOURCE_1_FLAG }
                                .flatMap { Observable.just(source1) }
                                .doOnNext{ System.out.println(it) }
                                .filter { false },
                        it.filter { it <= SOURCE_2_FLAG }
                                .flatMap { Observable.just(source2) }
                                .doOnNext{ System.out.println(it) },
                        it.filter { it <= SOURCE_3_FLAG }
                                .flatMap { Observable.just(source3) }
                                .doOnNext{ System.out.println(it) }
                ).take(1)
            }
        }

Thanks again arnold, replay was what I needed.

Hi @akarnokd, I'm sorry, I need more info about concat. Consider this code:

        val loadData = ObservableTransformer<Int, String> {
            it.replay {
                Observable.concat(
                        it.filter { it == SOURCE_1_FLAG }
                                .flatMap { Observable.just(source1) }
                                ...
                                .doOnNext{ System.out.println(it) },
                        it.filter { it == SOURCE_2_FLAG }
                                .flatMap { Observable.just(source2) }
                                ...
                                .doOnNext{ System.out.println(it) },
                        it.filter { it == SOURCE_3_FLAG }
                                .flatMap { Observable.just(source3) }
                                ...
                                .doOnNext{ System.out.println(it) }
                )
            }
        }

Assume the first observable in the concat has already completed. Then, when upstream emits more data, the first observable in this concat will be skipped, right?

I'm having a hard time tailoring what I want. What I want is that even if the first observable finishes, it will still receive the upstream data on the next emission. I was digging through the available operators and it seems to be impossible. Is it?

Should I reopen this?

Try Observable.merge instead.

Yeah, thanks. I did it using a series of publish and Observable.merge; it's less compact, but still beautiful. I was going to share it here, but it's hard to explain. 😄
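
For readers landing here later, the merge suggestion might look roughly like the sketch below. This is not the poster's final code (it was never shared); it assumes RxJava 2.x, and the FLAG_* constants and the route function are hypothetical stand-ins for the SOURCE_*_FLAG values in the question. Because merge subscribes to all branches up front, every branch sees every upstream emission, so none of them is "used up" the way it would be with concat.

```kotlin
import io.reactivex.Observable

// Hypothetical flag values mirroring SOURCE_1_FLAG..SOURCE_3_FLAG in the question.
const val FLAG_1 = 0
const val FLAG_2 = 1
const val FLAG_3 = 2

// Hypothetical router: each branch stays subscribed to the shared upstream
// and reacts only to its own flag.
fun route(upstream: Observable<Int>): Observable<String> =
    upstream.publish { shared ->
        Observable.merge(
            shared.filter { it == FLAG_1 }.map { "first" },
            shared.filter { it == FLAG_2 }.map { "second" },
            shared.filter { it == FLAG_3 }.map { "third" }
        )
    }

fun main() {
    val result = route(Observable.just(FLAG_1, FLAG_2, FLAG_1, FLAG_3))
        .toList()
        .blockingGet()
    println(result) // [first, second, first, third]
}
```

The first branch fires again on the third emission, which is the behavior asked for above.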
