RxJava 2.1.4
Hello Im having trouble with contact, what I was trying tot do is reactively chose source depending on the result of the source before it. Im not sure if I understand how Observable.concat works, but this test is failing.
class RxTest {
val source1 = "first"
val source2 = "second"
val source3 = "third"
val SOURCE_1_FLAG = 0
val SOURCE_2_FLAG = 1
val SOURCE_3_FLAG = 2
@Test
fun unsatisfiedResultFromSource1ProceedToSource2Test() {
val observer: TestObserver<String> = TestObserver()
val loadData = ObservableTransformer<Int, String> {
it.publish {
Observable.concat(
it.filter { it == SOURCE_1_FLAG }
.flatMap { Observable.just(source1) }
.doOnNext{ System.out.println(it) }
.filter { false },
it.filter { it <= SOURCE_2_FLAG }
.flatMap { Observable.just(source2) }
.doOnNext{ System.out.println(it) },
it.filter { it <= SOURCE_3_FLAG }
.flatMap { Observable.just(source3) }
.doOnNext{ System.out.println(it) }
).take(1)
}
}
val observable = Observable.just(SOURCE_1_FLAG).compose(loadData)
observable.subscribe(observer)
observer.assertComplete()
observer.assertNoErrors()
observer.assertValue { it == source2 }
observer.assertNever { it == source1 }
observer.assertNever { it == source3 }
}
}
In this test I was expecting that the source1 will be executed and then because of the filter { false } it will proceed to source2. But what actually happens is it executes the first source and then completes without any emitted data, source2 havent got a chance to be executed.
With this shared concat setup, your first condition will consume the Observable.just(SOURCE_1_FLAG) and then switch to the next condition, which will then find an already completed sequence. You need either merge instead of cocnat or replay instead of publish.
Thanks @akarnokd, I was actually referring to merge when I was making this kind of setup, I assume concat was just like merge except it concatenate the stream. Thank you for clarifying how concat works, but still needs to find solution for the behavior that I need. I think you get my intention in this code.
val loadData = ObservableTransformer<Int, String> {
it.replay {
Observable.concat(
it.filter { it == SOURCE_1_FLAG }
.flatMap { Observable.just(source1) }
.doOnNext{ System.out.println(it) }
.filter { false },
it.filter { it <= SOURCE_2_FLAG }
.flatMap { Observable.just(source2) }
.doOnNext{ System.out.println(it) },
it.filter { it <= SOURCE_3_FLAG }
.flatMap { Observable.just(source3) }
.doOnNext{ System.out.println(it) }
).take(1)
}
}
Thanks again arnold, replay was what I need.
Hi @akarnokd, Im sorry I need more info with concat, consider this code;
val loadData = ObservableTransformer<Int, String> {
it.replay {
Observable.concat(
it.filter { it == SOURCE_1_FLAG }
.flatMap { Observable.just(source1) }
...
.doOnNext{ System.out.println(it) },
it.filter { it == SOURCE_2_FLAG }
.flatMap { Observable.just(source2) }
...
.doOnNext{ System.out.println(it) },
it.filter { it == SOURCE_3_FLAG }
.flatMap { Observable.just(source3) }
...
.doOnNext{ System.out.println(it) }
)
}
}
Assume the first observable in the concat completes already, then when upstream emits more data, the first observable in this concat will be skipped right?
Im having hard time tailoring what I want, what I wanted is that even if the first observable finishes, it will still receive the upstream data in the next emission. I was digging the available operators, and it seem to be impossible, is it?
Should I reopen this?
Try Observable.merge instead.
Yeah thanks, I did it using a series of publish and Observable.merge but its less compact, but still beautiful. I was attempting to share it here but its hard to explain. 馃槃
Most helpful comment
With this shared concat setup, your first condition will consume the
Observable.just(SOURCE_1_FLAG)and then switch to the next condition, which will then find an already completed sequence. You need eithermergeinstead ofcocnatorreplayinstead ofpublish.