As mentioned [[1](https://twitter.com/benjchristensen/status/471312995352457219)] [[2](https://gist.github.com/torgeir/bdfffbdfdc8ed98056b8#file-statistics-java-L25-L26)] it would be nice for Observable.zip(iterable, funcn) to handle iterables of the same type, without having to go about casting the object to an integer for making something like this to work
List<Observable<Integer>> iterable = Arrays.asList(Observable.from(1), Observable.from(2), Observable.from(3));
Observable<List<Integer>> observable = Observable.zip(iterable, Arrays::asList);
Making FuncN<R> into FuncN<T, R> kind of snowballed on me as I reached the OperatorZip$Zip which seems to be needing the Object type.
I missed this issue being opened ... do you want to pick this up again?
Sure, though I'm off for holidays for a couple of weeks now, so I'm a little of the radar..
I believe these Object[] arrays was where I stumbled
Considering how badly this will break any code using FuncN and the fact that it is only for arities beyond 9, this is not a change we want to make this late in the project lifecycle.
Java doesn't really have a good way of handling this type of scenario, and FuncN is an "escape hatch" by definition. It would mostly end up being used as FuncN<Object, R> anyways for its intended use.
Here is a workaround (using int[]):
List<Observable<Integer>> iterable = Arrays.asList(Observable.from(1), Observable.from(2), Observable.from(3));
Observable<int[]> observable = Observable.zip(iterable, ns -> Arrays.stream(ns).mapToInt(n -> (Integer) n).toArray());
observable.forEach(i -> System.out.println("Int[]: " + Arrays.toString(i)));
Retaining the arity and types:
Observable.zip(Observable.from(1), Observable.from(2), Observable.from(3), (a, b, c) -> Arrays.asList(a, b, c));
I came across this same issue, but with combineLatest. I see how it causes an incompatibility, but I'd like this to be reconsidered for v2.
Currently I'm using the following overload of combineLatest:
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
The definition of FuncN is:
R call(Object... args);
Firstly, the type information gets lost. A user would presume T is passed, but it is actually Object. This caused quite a bit of confusion when using RxJava in Kotlin, where all types can be implicitly resolved and the end of an chain would result in some form of Object. Going back up the chain to see what function had misbehaving type information resulted in combineLatest.
Additionally, I personally find the use of varargs a bit confusing. I passed in a List and I assume a List is passed to the combineFunction. Instead it is an Array/[], but it is not explicitly typed as an array (as it is typed using varargs). I see how varargs would fit combineFunction if combineLatest was called with varargs, but in this particular overload it is not.
It might be a good idea to introduce the following function definition for V2:
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, Func1<List<? extends T>, ? extends R> combineFunction)
That way a list is passed into combineLatest and a list is passed to the combineFunction. That way, no type information is lost.
This is how we solved the problem in Kotlin:
fun <T, R> List<Observable<T>>.combineLatest(combineFunction: (List<T>) -> R): Observable<R> =
Observable.combineLatest(
this,
{ xs: Array<out Any> ->
combineFunction(xs.toList() as List<T>)
}
)
I agree, this should be revised for v2.
Considering how badly this will break any code using FuncN and the fact that it is only for arities beyond 9, this is not a change we want to make this late in the project lifecycle.
That's not how FuncN is used. In practice FuncN is used for variable length functions, which probably occur far more often than Func4-9.
v2.x is different from v1.x now. See https://github.com/ReactiveX/RxJava/blob/0705001df0b52334829b2e4963469533618f14cb/src/main/java/io/reactivex/Observable.java#L101
The signature looks like:
public static <T, R> Observable<R> combineLatest(ObservableConsumable<? extends T>[] sources, Function<? super Object[], ? extends R> combiner)
So, FuncN has been replaced with Function that accepts an array :smile::+1:. However, the type signature does forget the type information T as it is still passing Object[] to combiner :cry:. This type signature is already incompatible with RxJava v1.x, so why does it need to be Object[] instead of T[]. Or is this just a case of no-one having picked up this problem yet?
If that's the case, I've made a commit for this change for v2.x here: https://github.com/bobvanderlinden/RxJava/tree/pr-generictype. If people here think it looks like what is desired, I'll do a PR. All tests are passing on my machine.
It was a long time ago, but I think we had some type inference problems due to the contravariant input to the function. The question is that how many extra unchecked warnings pop up due to the change.
Yes, I do get one unchecked cast now. I suppressed it using: https://github.com/bobvanderlinden/RxJava/commit/477696bc5364cddbef317106f77039657e3facf7
It's due to the Queue being of type Object. 2 different types of objects are being polled and offered to the queue each time (CombinerSubscriber<T, R> and T[]). Both of these are offered in the same sequence (https://github.com/bobvanderlinden/RxJava/blob/477696bc5364cddbef317106f77039657e3facf7/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java#L187) and are polled in the same sequence (https://github.com/bobvanderlinden/RxJava/blob/477696bc5364cddbef317106f77039657e3facf7/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java#L232-L235), so practically it's always the correct type.
The same unchecked warning occurs here: https://github.com/bobvanderlinden/RxJava/blob/477696bc5364cddbef317106f77039657e3facf7/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java#L221. There it's being suppressed as well.
The problem is with the queue containing two types of objects. Using 2 queues might be a solution for the type-checking problem, or a pair of objects in the queue, but I guess the current method is used for performance reasons?
If these are limited to the operators themselves, then there is no problem with the extra SuppressWarnings.
I guess this is used for performance reasons?
Yes.
I'll do a PR.
Sure. I've merged a PR targeting combineLatest as well so you may need to rebase your changes. I'll move on with the test and operator sync with 1.x so you may post a PR.
Thanks for the feedback. PR #4211 submitted.
Flowable doesn't seem to have a similar feature, right?
@micHar This is an old issue; could you be more specific about what you think is missing?
Function<? super T[] does not work because Java doesn't allow creating a generic array and an Object[] array passed as T[] will cause ClassCastException as you can't upcast an Object[] to a String[] for example.
I'm confused. Looked into the pull request which seems to be merged and can find e.g. this:
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableConsumable<? extends T>> sources, Function<? super T[], ? extends R> combiner) {
But in 2.1.8 there's only
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> combiner) {
So this has been deleted and is impossible to implement, right?
The signature has been changed to Function<? super Object[], ? extends R> because it can't work otherwise, but this happened before 2.0.0 full release.
The linked PR was later undone due to #4524 via #4525.
Most helpful comment
This is how we solved the problem in Kotlin: