Rxjava: 2.x: repeat() / repeatWhen operator unexpected behavior

Created on 16 Aug 2017  ·  5Comments  ·  Source: ReactiveX/RxJava

I'm using Rxjava 2.1.2, where I try to use repeat operator to keep producing a sequence from a source publisher.

However, the repeat operator seems to repeat the stale sequence emitted by the source publisher when the publisher produced at the very beginning instead of recall the top upstream function inside that source publisher, which is used by the publisher to produce the sequence.

Example sample code:

class SourcePublisher() {
    var counter = 0
    val list1 = listOf<Int>(1, 3, 5, 7, 9)
    val list2 = listOf<Int>(0, 2, 4, 6, 8)

    fun getSource(): Single<List<Int>> {
        return if (counter++ % 2 == 0) Single.just(list1) else Single.just(list2)
    }
}

fun main(args: Array<String>) {
    val source = SourcePublisher()

    source.getSource()
           .repeatWhen { completed -> completed.delay(5, TimeUnit.SECONDS) }
               .subscribe { list -> 
                println(list)
            }
}

...produced the followin output, every 5 seconds print out:
[1, 3, 5, 7, 9]
[1, 3, 5, 7, 9]
[1, 3, 5, 7, 9]
...

Excepted output, every 5 seconds following print out: 
[1, 3, 5, 7, 9]
[0, 2, 4, 6, 8]
[1, 3, 5, 7, 9]
[0, 2, 4, 6, 8]
...

I'm wondering is this an expected behavior for repeat operator, which means it will only keep repeating the very first sequence generated after publisher initialized? Or I misunderstand and use it in a wrong way to make it keep playing stale data?

2.x Question

Most helpful comment

Do you see that you call fun getSource() exactly once and its returned value is a constant flow?

You practically wrote

Single.just(list1).repeatWhen(...)

In order to get a fresh sequence on each repeat, you have to defer the subscription to the getSource() so it gets evaluated multiple times.

    Single.defer(() -> source.getSource()).repeatWhen(...)

All 5 comments

Do you see that you call fun getSource() exactly once and its returned value is a constant flow?

You practically wrote

Single.just(list1).repeatWhen(...)

In order to get a fresh sequence on each repeat, you have to defer the subscription to the getSource() so it gets evaluated multiple times.

    Single.defer(() -> source.getSource()).repeatWhen(...)

Or change original fun getSource() to produce Single that computes value for each subscribe call:

fun getSource() = Single.fromCallable { if (counter++ % 2 == 0) list1 else list2 }

that fromCallable works like a charm. Thank you all.

Great! Please close the issue then :)

I was struggling to understand the behaviour of repeat operator. This helped, thanks @akarnokd @artem-zinnatullin ✌🏼

Was this page helpful?
0 / 5 - 0 ratings