I'm using RxJava 2.1.2 and trying to use the repeat operator to keep producing a sequence from a source publisher.
However, the repeat operator seems to replay the stale sequence that the source publisher emitted the first time, instead of calling the upstream function that the publisher uses to produce the sequence again.
Sample code:
import io.reactivex.Single
import java.util.concurrent.TimeUnit

class SourcePublisher() {
    var counter = 0
    val list1 = listOf<Int>(1, 3, 5, 7, 9)
    val list2 = listOf<Int>(0, 2, 4, 6, 8)

    // Alternates between list1 and list2 on each call
    fun getSource(): Single<List<Int>> {
        return if (counter++ % 2 == 0) Single.just(list1) else Single.just(list2)
    }
}

fun main(args: Array<String>) {
    val source = SourcePublisher()
    source.getSource()
        .repeatWhen { completed -> completed.delay(5, TimeUnit.SECONDS) }
        .subscribe { list ->
            println(list)
        }
}
...produces the following output, printing one line every 5 seconds:
[1, 3, 5, 7, 9]
[1, 3, 5, 7, 9]
[1, 3, 5, 7, 9]
...
Expected output, one line every 5 seconds:
[1, 3, 5, 7, 9]
[0, 2, 4, 6, 8]
[1, 3, 5, 7, 9]
[0, 2, 4, 6, 8]
...
Is this the expected behavior of the repeat operator, meaning it will only keep repeating the very first sequence generated after the publisher was initialized? Or am I misunderstanding the operator and using it in a way that makes it replay stale data?
Do you see that you call getSource() exactly once and that its returned value is a constant flow?
You effectively wrote
Single.just(list1).repeatWhen(...)
To get a fresh sequence on each repeat, you have to defer the call to getSource() so that it is evaluated again on every subscription.
Single.defer(() -> source.getSource()).repeatWhen(...)
Or change the original getSource() to return a Single that computes its value on each subscribe call:
fun getSource() = Single.fromCallable { if (counter++ % 2 == 0) list1 else list2 }
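To make the difference concrete, here is a minimal sketch, assuming RxJava 2.x on the classpath. The names getFreshSource, eagerFirsts, deferredFirsts and callableFirsts are hypothetical and exist only for this illustration. It uses repeat(4) and blockingGet() instead of repeatWhen with a delay so that the program terminates and can be checked quickly; each function collects only the first element of every emitted list.

```kotlin
import io.reactivex.Single

class SourcePublisher {
    private var counter = 0
    private val list1 = listOf(1, 3, 5, 7, 9)
    private val list2 = listOf(0, 2, 4, 6, 8)

    // Original version: the list is chosen when getSource() is called,
    // and the returned Single always emits that same list.
    fun getSource(): Single<List<Int>> =
        if (counter++ % 2 == 0) Single.just(list1) else Single.just(list2)

    // Hypothetical variant: the list is chosen on every subscription.
    fun getFreshSource(): Single<List<Int>> =
        Single.fromCallable { if (counter++ % 2 == 0) list1 else list2 }
}

// getSource() runs once; repeat resubscribes to the same constant Single.
fun eagerFirsts(): List<Int> {
    val source = SourcePublisher()
    return source.getSource()
        .repeat(4)
        .map { it.first() }
        .toList()
        .blockingGet()
}

// defer calls getSource() again for each of the four subscriptions.
fun deferredFirsts(): List<Int> {
    val source = SourcePublisher()
    return Single.defer { source.getSource() }
        .repeat(4)
        .map { it.first() }
        .toList()
        .blockingGet()
}

// fromCallable evaluates its lambda again for each subscription.
fun callableFirsts(): List<Int> {
    val source = SourcePublisher()
    return source.getFreshSource()
        .repeat(4)
        .map { it.first() }
        .toList()
        .blockingGet()
}

fun main() {
    println(eagerFirsts())    // [1, 1, 1, 1]
    println(deferredFirsts()) // [1, 0, 1, 0]
    println(callableFirsts()) // [1, 0, 1, 0]
}
```

Replacing repeat(4) with the repeatWhen { completed -> completed.delay(5, TimeUnit.SECONDS) } call from the question should give the same alternation with a 5 second pause between emissions, although the main thread would then need to be kept alive while the delayed repeats run.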
That fromCallable works like a charm. Thank you all.
Great! Please close the issue then :)
I was struggling to understand the behaviour of repeat operator. This helped, thanks @akarnokd @artem-zinnatullin ✌🏼