I'm quite new to this Rx stuff, so forgive me if there's something simple I'm missing here.
I was trying to use scan and found that it did not behave as I expected. This code illustrates how:
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.util.functions.Action1;
import rx.util.functions.Func2;

public class ScanIssue
{
    public static void main(String[] args) throws InterruptedException
    {
        PublishSubject<Integer> source = PublishSubject.create();
        // Running sum: a is the accumulated value, b is the newly emitted item.
        Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>()
        {
            @Override
            public Integer call(Integer a, Integer b)
            {
                return a + b;
            }
        };
        Observable<Integer> scan = Observable.scan(source, accumulator);
        // Observable<Integer> scan = Observable.scan(source, 0, accumulator);
        // Three subscribers share the same scan Observable.
        print("A", scan);
        print("B", scan);
        print("C", scan);
        // print("D", scan);
        // print("E", scan);
        Thread.sleep(1000);
        source.onNext(1);
        Thread.sleep(1000);
        source.onNext(1);
    }

    // Subscribes to source and prints every item with the given prefix.
    private static <T> void print(final String prefix, Observable<T> source)
    {
        source.subscribe(new Action1<T>()
        {
            @Override
            public void call(T s)
            {
                System.out.println(prefix + ": " + s);
            }
        });
    }
}
I would expect it to output nothing on the first onNext and this on the second:
A: 2
B: 2
C: 2
I don't care about the order of A, B and C.
But instead it outputs this on the first one:
B: 1
B: 2
B: 3
and this on the second:
B: 4
B: 5
B: 6
Here _B_ can be any of A, B and C, but it is always the same one all 6 times.
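The values 1 through 6 are what a single running total would produce if all three subscribers were feeding the same accumulator. Nothing in this thread confirms that as the cause, so treat the following as a guess. It is a plain-Java sketch with no RxJava dependency; SharedStateSketch, Accumulator and run are hypothetical names used only for illustration, and it models only the emitted values, not why the prefix is always the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class SharedStateSketch
{
    // Hypothetical stand-in for an unseeded scan step: the first item passes through.
    static final class Accumulator
    {
        private Integer acc; // null until the first item arrives
        private final BinaryOperator<Integer> f;

        Accumulator(BinaryOperator<Integer> f)
        {
            this.f = f;
        }

        Integer next(Integer item)
        {
            acc = (acc == null) ? item : f.apply(acc, item);
            return acc;
        }
    }

    // Delivers each item to every subscriber in turn and records what each would see.
    public static List<Integer> run(int subscribers, boolean shared, int... items)
    {
        BinaryOperator<Integer> sum = Integer::sum;
        Accumulator single = new Accumulator(sum);
        Accumulator[] accs = new Accumulator[subscribers];
        for (int i = 0; i < subscribers; i++)
        {
            // Either every subscriber reuses one accumulator, or each gets its own.
            accs[i] = shared ? single : new Accumulator(sum);
        }
        List<Integer> out = new ArrayList<>();
        for (int item : items)
        {
            for (Accumulator a : accs)
            {
                out.add(a.next(item));
            }
        }
        return out;
    }

    public static void main(String[] args)
    {
        System.out.println(run(3, true, 1, 1));  // [1, 2, 3, 4, 5, 6]
        System.out.println(run(3, false, 1, 1)); // [1, 1, 1, 2, 2, 2]
    }
}
```

With per-subscriber state, each of the three subscribers would see 1 and then 2, independently of the others.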
The first value is always passed through as the first onNext. If no initial value is given, then the first source onNext is passed through and then fed as the initial value along with the next onNext.
So for this one:
source.scan(0, accumulator);
We see 0 come through first and then be accumulated with 1 and 2:
onNext => A: 0
FUNC=> a: 0 b: 1
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3
In this one:
Observable<Integer> scan = source.scan(accumulator);
it's now actually only passing in 2 values (1 and 2) instead of 0 as initial then 1 and 2, so we get:
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3
The first value is still passed directly through and then the next accumulated.
If I change the Observable to emit 0, 1, 2 it looks the same as when setting initialValue to 0.
source.onNext(0);
source.onNext(1);
source.onNext(2);
... now emits this:
onNext => A: 0
FUNC=> a: 0 b: 1
onNext => A: 1
FUNC=> a: 1 b: 2
onNext => A: 3
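The three traces above can be reproduced without RxJava. This is a minimal sketch of the semantics over a plain List, not the library's implementation; ScanSketch and its scan methods are hypothetical names, and returning an empty list for an empty unseeded input is my own assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ScanSketch
{
    // Seeded variant: the seed is emitted first, then each running result.
    public static <T> List<T> scan(List<T> items, T seed, BinaryOperator<T> f)
    {
        List<T> out = new ArrayList<>();
        T acc = seed;
        out.add(acc);
        for (T item : items)
        {
            acc = f.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    // Unseeded variant: the first item passes through and becomes the initial value.
    public static <T> List<T> scan(List<T> items, BinaryOperator<T> f)
    {
        if (items.isEmpty())
        {
            return new ArrayList<>(); // assumption: nothing to emit
        }
        return scan(items.subList(1, items.size()), items.get(0), f);
    }

    public static void main(String[] args)
    {
        BinaryOperator<Integer> sum = Integer::sum;
        System.out.println(scan(Arrays.asList(1, 2), 0, sum)); // [0, 1, 3]
        System.out.println(scan(Arrays.asList(1, 2), sum));    // [1, 3]
        System.out.println(scan(Arrays.asList(0, 1, 2), sum)); // [0, 1, 3]
    }
}
```

The first and third calls give the same result, which matches the observation that emitting 0, 1, 2 without a seed looks the same as emitting 1, 2 with an initial value of 0.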
Javadocs: http://netflix.github.io/RxJava/javadoc/rx/Observable.html#scan(rx.util.functions.Func2)
Returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by an Observable into the same function, and so on until all items have been emitted by the source Observable, emitting the result of each of these iterations.
