Rxjava: 2.x changing some Observable operators to return Single.

Created on 9 Aug 2016  Â·  39Comments  Â·  Source: ReactiveX/RxJava

I was looking through the Observable operators and wondering if now would be a good time to change the following methods to return Singles instead of Observables.

  • [x] Single<Boolean> all(Predicate<? super T> predicate)
  • [x] Single<Boolean> any(Predicate<? super T> predicate)
  • [x] <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
  • [x] <U> Single<U> collectInto(final U initialValue, BiConsumer<? super U, ? super T> collector)
  • [x] Single<Boolean> contains(final Object o)
  • [x] Single<Long> count()
  • [x] Maybe<T> elementAt(long index)
  • [x] Single<T> elementAt(long index, T defaultValue)
  • [x] Maybe<T> firstElement()
  • [x] Single<T> first(T defaultItem)
  • [x] Completable ignoreElements()
  • [x] Single<Boolean> isEmpty()
  • [x] Maybe<T> lastElement()
  • [x] Single<T> last(T defaultItem)
  • [x] Maybe<T> reduce(BiFunction<T, T, T> reducer)
  • [x] <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)
  • [x] <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer)
  • [x] Maybe<T> singleElement()
  • [x] Single<T> single(T defaultItem)
  • [x] Single<List<T>> toList()
  • [x] Single<List<T>> toList(final int capacityHint)
  • [x] <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
  • [x] <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends K> keySelector, final Function<? super T, ? extends V> valueSelector)
  • [x] <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)
  • [x] <K, V> Single<Map<K, Collection<V>>> toMultimap(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector)
  • [x] Single<List<T>> toSortedList()
  • [x] Single<List<T>> toSortedList(final Comparator<? super T> comparator)
  • [x] Single<List<T>> toSortedList(final Comparator<? super T> comparator, int capacityHint)
  • [x] Single<List<T>> toSortedList(int capacityHint)

(These were removed from the API:)

  • Single<List<T>> takeLastBuffer(int count)
  • Single<List<T>> takeLastBuffer(int count, long time, TimeUnit unit)
  • Single<List<T>> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler)
  • Single<List<T>> takeLastBuffer(long time, TimeUnit unit)
  • Single<List<T>> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler)
Discussion

Most helpful comment

I'm a bit nervous about Single not supporting backpressure. We got into trouble with Observable.just(item) not supporting backpressure in 1.x. See
https://github.com/ReactiveX/RxJava/issues/3044 which includes references to failures.

All 39 comments

I'm +1 for it I also suggested that here for a few operators #4277

I don't know, looks like one needs to convert back to Flowable all the time plus one needs to update a large amount of unit tests manually.

@akarnokd Why don't the operators on Single return Flowables? Isn't singleSubscribe an implicit request one?

As for the unit tests it's a large task but not insurmountable.

Some do, like Single.concat that have to be "widened" but mostly Single stays Single. Plus, Single doesn't have backpressure at all so no request(1) inside it. Reactor's Mono (0, 1, error) is a Publisher so they have to honor backpressure for that maybe 1 element.

@akarnokd Yes, I agree Single should not have back pressure because it is fixed size and subscribing can be thought of as an implicit request one. In other words if the subscriber doesn't have enough room in the queue for one element then don't subscribe.

I'm a bit nervous about Single not supporting backpressure. We got into trouble with Observable.just(item) not supporting backpressure in 1.x. See
https://github.com/ReactiveX/RxJava/issues/3044 which includes references to failures.

I agree with @davidmoten. It was clear that a mistake was made with just ignoring backpressure and it feels like the same mistake is being made again. Plus gaining compatibility with Mono is something which I would much appreciate.

The main difference in this case is that Single isn't meant to support back pressure so it wouldn't mix of behavior like there was with just.

I understand. I just dislike the lack of consistency between the flow control mechanisms on Flowable and Single/Completeable i.e. for Flowable I call request to control flow but I need to hold off on subscription entirely for Single/Completeable.

I do get that Observable relates much better to Single/Completeable than Flowable, but I also feel that backpressure should be a default and only turned off if performance is truly a consideration - i.e. use Flowable until it is known that Observable is truly going to improve performance and transform an Observable to Flowable as soon as possible using one of the flow control mechanisms. (AFAIK the only reason Observable exists is for performance right?).

I don't see Single or Completable missing backpressure being a problem in
this specific case because Observable itself doesn't support it either. I
wouldn't expect the equivalent operators for Flowable to return Single.

On Wed, Aug 10, 2016, 8:44 PM Lalit Maganti [email protected]
wrote:

I understand. I just dislike the lack of consistency between the flow
control mechanisms on Flowable and Single/Completeable i.e. for Flowable
I call request to control flow but I need to hold off on subscription
entirely for Single/Completeable.

I do get that Observable relates much better to Single/Completeable than
Flowable, but I also feel that backpressure should be a default and only
turned off if performance is truly a consideration - i.e. use Flowable
until it is known that Observable is truly going to improve performance
and change Observable to Flowable as soon as possible. (AFAIK the only
reason Observable exists is for performance right?).

—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/4321#issuecomment-239047190,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEZfQ-Ml72NNlWwd_snzT3PnRHV9cks5qenB5gaJpZM4JfzIM
.

It's not necessarily a problem and I guess reworking Single is unfeasible anyway this close to 2.x RC. Just a small gripe I had :)

Backpressure is there to avoid buffer bloat with unknown length streams. Single has only one element to get buffered and generally not a problem.

Observable was introduced because some got annoyed with backpressure and want truly unbounded flows no matter the cost. It has lower overhead than with backpressure but generally, the late optimizations to Flowable put them quite close.

Personally, I'd get rid of all but Flowable because it can support all the other behaviors plus I don't have to do 3x the work to keep everybody symmetric in terms of features.

Yeah, even 1.x codebase is quite hard to maintain, 2.x with different Flowable/Observable makes it even harder to work with…

@akarnokd you can add basic support of Single and Consumable in 2.x (basically just make library architecture aware of them from the beginning instead of adding them post-factum as it was in 1.x), non-basic operators can be added by community according to its needs as it happens/happened in 1.x.

The reason for Single and Completable being in the main library is that people expect convenient transformations to them: Flowable.toSingle, Flowable.toCompletable(). If these returned SingleSource or CompletableSource, those don't provide operators at all and are practically useless. toXs need the full classes.

The alternative could be to make this dependence indirect via reflection:

Single<T> single = someFlowable.to(Single.class);

public Z to(Class<Z> target) {
   return (Z)target.getMethod("from", Publisher.class).invoke(null, this);
}

but even a cached method may be too expensive on some platforms.

Otherwise:

someFlowable.to(Single::from).blah().blah();

someSingle.toFlowable(); // Single library depending on Flowable library is okay

I was talking about this a bit more and noticed that reduce(Func2<R, T, R>) would have to return a Maybe<T> in the case that source Observable<T>/Flowable<T> was empty. I couldn't find any issues on thinking about supporting a Maybe<T> type. I'm also not sure if people are willing to support yet another type in rx java.

There's already an Optional and Try type!

On Wed, Aug 17, 2016, 5:44 PM George Campbell [email protected]
wrote:

I was talking about this a bit more and noticed that reduce(Func2 R>) would have to return a Maybe in the case that source Observable/
Flowable was empty. I couldn't find any issues on thinking about
supporting a Maybe type. I'm also not sure if people are willing to
support yet another type in rx java.

—
You are receiving this because you commented.

Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/4321#issuecomment-240558790,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEERkt4lVN62YrfObR5ZecushOv-Bxks5qg4C-gaJpZM4JfzIM
.

I'm not sure about Try but isn't java.util.Optional eager?

Ah I see. It's a custom Optional, not Java 8s, so we could make a lazy
factory, but it's not a good idea to reuse it for two behaviors.

On Wed, Aug 17, 2016, 7:09 PM George Campbell [email protected]
wrote:

I'm not sure about Try but isn't java.util.Optional eager?

—
You are receiving this because you commented.

Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/4321#issuecomment-240576699,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEeM0vdzXfveuBl7hN5wkylEDH5usks5qg5SagaJpZM4JfzIM
.

I've been going over method signatures trying to come up with a plan for all of the conversion methods:

| From\To | Completable | Maybe | Single | Flow/Observable |
| --- | --- | --- | --- | --- |
| Completable | | toMaybe() toMaybe(T) toMaybe(()->T) | toSingle(T) toSingle(()->T) | toFlow/Observable() |
| Maybe | toCompletable() | | toSingle(T) toSingle(()->T) | toFlow/Observable() |
| Single | toCompletable() | toMaybe() | | toFlow/Observable() |
| FlowObservable | toCompletable() | toMaybeLast() toMaybeFirst() | toSingleFirst(T) toSingleFirst(()->T) toSingleLast(T) toSingleLast(()->T) | |

From 5 reactive types I want to drop the methods like first, last, single, and ignoreElements since they are redundant. I would like to also take out the blocking* variants of the methods. Leaving the only the type specific blocking methods for each type.

This is the core of the changes that I'm working on the effects of which will ripple out to all of the unit tests.

Please don't remove the internal operators backing the original same-type behavior.

@abersnaze How is your progress on this? I held off changes to 2.x but RC3 time is nearing and I'd like to improve performance on various elements.

I've got a branch that compiles but they're unit tests that failing for reasons. I'll push my branch to my fork if you want to take over.

On Sep 20, 2016, at 03:23, David Karnok [email protected] wrote:

@abersnaze How is your progress on this? I held off changes to 2.x but RC3 time is nearing and I'd like to improve performance on various elements.

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.

No. I think it's best if I redo it step-by-step, one operator at a time to rule out test anomalies and also apply optimizations.

The operator .last() effect 100s of tests.

On Sep 20, 2016, at 08:03, David Karnok [email protected] wrote:

No. I think it's best if I redo it step-by-step, one operator at a time to rule out test anomalies and also apply optimizations.

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.

See #4570

What should happen with

  • single() vs toSingle(),
  • ignoreElements() vs toCompletable() ?

Should these be merged into the former or the latter of each?

I picked single, first, last, & ignoreElements over toSingle, toMaybe, & toCompletable because the former were more descriptive of what it does when there more than one value.

Thanks for the info.

first, last and reduce signal NoSuchElementException traditionally. Do you think we should have maybeFirst, maybeLast and maybeReduce to avoid the exception?

I think that if we are going to make a breaking change to the API that we should also take the opportunity to change the behavior (not throwing exceptions)

To be clear, this is what can happen:

Single<T> first(T defaultItem) and Maybe<T> first()
Single<T> last(T defaultItem) and Maybe<T> last()
Single<T> elementAt(long index, T defaultItem) and Maybe<T> elementAt(long index) <-- not sure
Single<T> single(T defaultItem) and Maybe<T> single() <-- looks odd

You could postfix "Element" onto the names which has precedence from ignoreElements (e.g., firstElement(), singleElement()). This makes the return type mismatch less odd looking.

Closing via #4573, #4574, #4576, #4579

Let me know if I missed some classic operator.

What about Observable<T> takeFirst(Predicate<T>). In my opinion it should become:

Maybe<T> takeFirst(Predicate<T>)
Single<T> takeFirstOrError(Predicate<T>

Actually, since all other such predicate versions were removed, takeFirst should be removed. Use filter().first() instead.

You want me to remove it?

Yes, but only the Api and please fix up the tests that used it.

What about Observable.timer()? That one only ever emits one item.

We need those to jumpstart their respective types without conversion.

Was this page helpful?
0 / 5 - 0 ratings