Please add
map(Func2<? super T, int index, ? extends R> func)
Where the second parameter of Func2 is the index of the element.
A similar overload could be useful on operators like scan, reduce, filter etc.
Hi, at one point there were a few indexed methods, but they were removed to reduce the API surface.
You can use rxjava-extras which is on Maven Central like this:
Observable
.just("a", "b", "c")
.compose(Transformers.mapWithIndex())
.map(x -> x.index() + "->" + x.value())
.forEach(System.out::println);
and you get
0->a
1->b
2->c
Looks like we can close this issue: @akarnokd pointed out why there is no such operator in the current version of RxJava (1.1.0), and @davidmoten gave a working solution.
That's a weird criterion for closing a feature request. Most projects close them if the feature is already present.
Also, scan(tuple2.create(null, 0), (tup, item) -> tuple2.create(item, tup.value2() + 1)) works just fine; I just got lazy writing that every time, so I put up the request.
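For readers who want to see the scan-based indexing idea spelled out, here is a minimal plain-Java sketch. It does not use RxJava: `scan` below is a hypothetical list-based stand-in for a seeded scan, and `Indexed` and `ScanIndexSketch` are illustrative names, not library types. The sketch seeds the index with -1 so the first real item gets index 0; with a seed index of 0 the first item would get index 1 here. Seeded scan in RxJava 1.x also emits the seed itself, so a real pipeline would probably need a skip(1) as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ScanIndexSketch {
    /** Hypothetical immutable pair of an item and its index. */
    static final class Indexed<T> {
        final T value;
        final int index;

        Indexed(T value, int index) {
            this.value = value;
            this.index = index;
        }

        @Override
        public String toString() {
            return index + "->" + value;
        }
    }

    /** List-based stand-in for a seeded scan: the seed, then every running accumulation. */
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc);
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    /** Pairs each item with its zero-based index by carrying the index through the scan. */
    static List<String> indexed(List<String> source) {
        // Seed index is -1 so that the first real item ends up with index 0.
        List<Indexed<String>> scanned = scan(
                source,
                new Indexed<String>(null, -1),
                (acc, item) -> new Indexed<String>(item, acc.index + 1));
        List<String> out = new ArrayList<>();
        // Drop the first entry: it is the seed, not an element of the source.
        for (Indexed<String> pair : scanned.subList(1, scanned.size())) {
            out.add(pair.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(indexed(Arrays.asList("a", "b", "c"))); // [0->a, 1->b, 2->c]
    }
}
```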
Why not use rxjava-extras? :)
RxJava already has ~4k methods, and it has compose() and some other ways to be extendable.
I'd say that the existing operators are enough to do what you want, though it's a little less efficient than a native operator because it creates a new Pair each time:
Observable
.just("a", "b", "c")
.map(new Func1<String, Pair<Integer, String>>() {
private int counter; // Or AtomicInteger if needed.
@Override
public Pair<Integer, String> call(String s) {
return new Pair<Integer, String>(counter++, s);
}
})
// I have nothing personal against indexed operators, but at the same time I'd vote for keeping API "small".
@artem-zinnatullin the map function should be stateless. Otherwise, it only supports one subscriber.
@zsxwing a lot of operators have state -> support only one subscriber. There is a group of connectable operators to support multiple subscribers.
@artem-zinnatullin A correctly implemented operator might have a state internally, but does not share this state between subscribers. For example Scan.
The code
hotObservable.scan(...).subscribe(a);
hotObservable.scan(...).subscribe(b);
is identical to
Observable<T> hot2 = hotObservable.scan(...);
hot2.subscribe(a);
hot2.subscribe(b);
And should be identical for all operators. The only exception to the rule is perhaps publish().
A common strategy to make state 'local' to the subscription is to use Observable.create().
I'd like to see the API grow with some simple useful operators so I'm not against mapWithIndex being reinstated.
A quick check. What are the concerns with an increased API surface? Are they
Since the introduction of backpressure into RxJava, constructing even simple transformations is something that should be done with care, and the team here is perfect for managing these additions.
@artem-zinnatullin defer is also useful for getting per-subscriber state:
Observable.defer(() -> {
Func1<String, Pair<Integer,String>> f = new Func1<String, Pair<Integer, String>>() {
private int counter; // Or AtomicInteger if needed.
@Override
public Pair<Integer, String> call(String s) {
return new Pair<Integer, String>(counter++, s);
}
};
return Observable
.just("a", "b", "c")
.map(f);
});
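To make the difference between a shared counter and per-subscriber state concrete, here is a plain-Java sketch that avoids RxJava entirely. The `subscribe` helper is a hypothetical stand-in for one subscription running the source through the mapper, and all names are illustrative assumptions. Sharing one mapper instance lets the second run continue counting where the first stopped, while building a fresh mapper per run (the role defer plays above) restarts at 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class PerSubscriberStateSketch {
    /** Stateful mapper in the style of the anonymous Func1 above: one counter per instance. */
    static Function<String, String> indexingMapper() {
        return new Function<String, String>() {
            private int counter;

            @Override
            public String apply(String s) {
                return (counter++) + "->" + s;
            }
        };
    }

    /** Hypothetical stand-in for one subscription: pushes every item through the mapper. */
    static List<String> subscribe(List<String> source, Function<String, String> mapper) {
        List<String> received = new ArrayList<>();
        for (String s : source) {
            received.add(mapper.apply(s));
        }
        return received;
    }

    /** Two runs share one mapper instance; returns what the second run receives. */
    static List<String> secondRunShared(List<String> source) {
        Function<String, String> shared = indexingMapper();
        subscribe(source, shared);
        return subscribe(source, shared);
    }

    /** Each run asks the factory for a fresh mapper; returns what the second run receives. */
    static List<String> secondRunDeferred(List<String> source) {
        Supplier<Function<String, String>> factory = PerSubscriberStateSketch::indexingMapper;
        subscribe(source, factory.get());
        return subscribe(source, factory.get());
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b", "c");
        System.out.println(secondRunShared(source));   // [3->a, 4->b, 5->c]
        System.out.println(secondRunDeferred(source)); // [0->a, 1->b, 2->c]
    }
}
```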
By the way, mapWithIndex can be implemented by zipWith(Observable.range(0, Integer.MAX_VALUE)).
I prefer to keep the surface API as small as possible.
I believe @zsxwing's suggestion (or rxjava-extras) should be the recommended one instead of adding another method.
I'm surprised the proposal by @zsxwing didn't cause problems for anybody. Isn't it true that the lack of backpressure and the size of that range must cause some issues if you are writing async code?
For example:
val observable = Observable.timer(100, TimeUnit.MILLISECONDS)
val indices = Observable.range(0, 1000).doOnNext { println("computed index: $it") }
observable.zipWith(indices)
.forEach { (a, b) ->
println("got: [$a, $b]")
}
Prints:
computed index: 1
computed index: 2
computed index: 3
// ...
computed index: 999
got: [0, 0]
So I'd expect the application to hang if you pass Integer.MAX_VALUE there (actually that's what happened in my case, I had to terminate it).
Isn't it true that the lack of backpressure
This issue refers to RxJava 1.x where Observable had support for backpressure.
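The role demand plays here can be illustrated with a pull-based analogy in plain Java. This is only a sketch under stated assumptions: iterators stand in for a source that produces values on request, `countingRange` and `zipWithIndex` are hypothetical helpers, and the batching a real backpressured zip does is not modelled. When the index source only produces what is asked for, zipping three items against a range of Integer.MAX_VALUE computes just three indices.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

public class DemandDrivenZipSketch {
    /** Lazy range of `count` integers from `start`; `computed` counts how many were produced. */
    static Iterator<Integer> countingRange(int start, int count, AtomicInteger computed) {
        final long end = (long) start + count;
        return new Iterator<Integer>() {
            private long next = start;

            @Override
            public boolean hasNext() {
                return next < end;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                computed.incrementAndGet();
                return (int) next++;
            }
        };
    }

    /** Pairs items with indices, pulling an index only when another item is available. */
    static <T> List<String> zipWithIndex(Iterator<T> items, Iterator<Integer> indices) {
        List<String> out = new ArrayList<>();
        while (items.hasNext() && indices.hasNext()) {
            out.add(indices.next() + "->" + items.next());
        }
        return out;
    }

    /** Returns how many indices were computed while zipping the given items. */
    static int indicesComputedFor(List<String> items) {
        AtomicInteger computed = new AtomicInteger();
        zipWithIndex(items.iterator(), countingRange(0, Integer.MAX_VALUE, computed));
        return computed.get();
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");
        AtomicInteger computed = new AtomicInteger();
        System.out.println(zipWithIndex(items.iterator(),
                countingRange(0, Integer.MAX_VALUE, computed))); // [0->a, 1->b, 2->c]
        System.out.println(computed.get());                      // 3
    }
}
```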