Rxjava: Scan Operator, with and without initial value behave wildly different. Intended?

Created on 3 Jun 2013  路  1Comment  路  Source: ReactiveX/RxJava

I'm quite new to this RX stuff so forgive me if there something simple I'm missing here.

I was trying to use scan and found that it did not behave as I expected. This code illustrates how:

public class ScanIssue
{
    public static void main(String[] args) throws InterruptedException
    {
        PublishSubject<Integer> source  = PublishSubject.create();

        Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>()
        {
            @Override
            public Integer call(Integer a, Integer b)
            {
                return a + b;
            }
        };

        Observable<Integer> scan = Observable.scan(source, accumulator);
//        Observable<Integer> scan = Observable.scan(source, 0, accumulator);


        print("A", scan);
        print("B", scan);
        print("C", scan);
//        print("D", scan);
//        print("E", scan);

        Thread.sleep(1000);
        source.onNext(1);
        Thread.sleep(1000);
        source.onNext(1);
    }

    private static <T> void print(final String prefix, Observable<T> source)
    {
        source.subscribe(new Action1<T>()
        {
            @Override
            public void call(T s)
            {
                System.out.println(prefix + ": " + s);
            }
        });
    }
}

I would expect it to output nothing on the first onNext and this on the second:

A: 2
B: 2
C: 2

Where I don't care about the order of A,B & C.

But instead it outputs this on the first one:

B: 1
B: 2
B: 3

and this on the second:

B: 4
B: 5
B: 6

_B_ can be any of A, B and C but always the same all 6 times.

  • If I switch to the one with initial value it works as I expect
  • The more subscriptions I add the worse it gets
  • The type of source does not seem to matter (tried with a hand rolled one)

Most helpful comment

The first value is always passed through as the first onNext. If no initial value is given, then the first source onNext is passed through and then fed as the initial value along with the next onNext.

So for this one:

source.scan(0, accumulator);

We see 0 come through first and then be accumulated with 1 and 2:

onNext => A: 0
FUNC=> a: 0 b: 1
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3

In this one:

Observable<Integer> scan = source.scan(accumulator);

it's now actually only passing in 2 values (1 and 2) instead of 0 as initial then 1 and 2, so we get:

onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3

The first value is still passed directly through and then the next accumulated.

If I change the Observable to emit 0, 1, 2 it looks the same as when setting initialValue to 0.

        source.onNext(0);
        source.onNext(1);
        source.onNext(2);

... now emits this:

onNext => A: 0
FUNC=> a: 0 b: 1
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3

Javadocs: http://netflix.github.io/RxJava/javadoc/rx/Observable.html#scan(rx.util.functions.Func2)

Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by an Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the result of each of these iterations.

scan

>All comments

The first value is always passed through as the first onNext. If no initial value is given, then the first source onNext is passed through and then fed as the initial value along with the next onNext.

So for this one:

source.scan(0, accumulator);

We see 0 come through first and then be accumulated with 1 and 2:

onNext => A: 0
FUNC=> a: 0 b: 1
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3

In this one:

Observable<Integer> scan = source.scan(accumulator);

it's now actually only passing in 2 values (1 and 2) instead of 0 as initial then 1 and 2, so we get:

onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3

The first value is still passed directly through and then the next accumulated.

If I change the Observable to emit 0, 1, 2 it looks the same as when setting initialValue to 0.

        source.onNext(0);
        source.onNext(1);
        source.onNext(2);

... now emits this:

onNext => A: 0
FUNC=> a: 0 b: 1
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3

Javadocs: http://netflix.github.io/RxJava/javadoc/rx/Observable.html#scan(rx.util.functions.Func2)

Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by an Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the result of each of these iterations.

scan

Was this page helpful?
0 / 5 - 0 ratings

Related issues

philleonard picture philleonard  路  3Comments

midnight-wonderer picture midnight-wonderer  路  3Comments

dsvoronin picture dsvoronin  路  4Comments

theblang picture theblang  路  3Comments

ZakTaccardi picture ZakTaccardi  路  3Comments