RxJava: Observable<List<X>> to Observable<X> that emits each element in List<X>

Created on 21 Jan 2015 · 11 Comments · Source: ReactiveX/RxJava (https://github.com/ReactiveX/RxJava/issues/2473)

Is there a utility operator for converting from Observable<List<X>> to an Observable<X> that emits each element in List<X>?

Otherwise, are there any obvious optimizations for something like:

getChannels().switchMap(channels -> Observable.from(channels))

flatMap() could be used as well, depending.

Question

All 11 comments

You can create an Operator for use in lift() quite easily. I think there might be generics problems in trying to create one that's applicable to all ? super Iterable<?>, but I really can't remember.

Quickly, for your example,

new Observable.Operator<T, List<T>>() {
  @Override public Subscriber<? super List<T>> call(final Subscriber<? super T> subscriber) {
    return new Subscriber<List<T>>() {
      @Override public void onCompleted() {
        subscriber.onCompleted();
      }

      @Override public void onError(Throwable e) {
        subscriber.onError(e);
      }

      @Override public void onNext(List<T> ts) {
        for (T t : ts) {
          subscriber.onNext(t);
        }
      }
    };
  }
};

Then you'd have:

getChannels().lift(unroll()).forEach(c -> c.doSomething());

Looks good to me. I would also add an isUnsubscribed check before emitting onNext.

An alternative that avoids writing the low-level Operator implementation is to use compose with a Transformer implementation.

You can also use flatMapIterable like getChannels().flatMapIterable(l -> l).

Jake's example has two problems (and one minor issue):

  • does not chain with the downstream Subscriber and thus can't be properly unsubscribed,
  • disregards backpressure and emits all elements from the list regardless of how many were requested,
  • (minor) the already mentioned lack of isUnsubscribed check.
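
To make the three points above concrete, here is a minimal single-threaded sketch of a flattening stage that chains to its downstream, emits only what was requested, and checks for cancellation before each emission. It is an illustration under stated assumptions, not RxJava's implementation: to stay runnable without the library it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), where request(n) and cancel() stand in for RxJava 1.x backpressure and unsubscription. The names Flattener and takeFirst are hypothetical, and the code is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

/** Single-threaded sketch: turns List<T> signals into T signals, honouring demand and cancellation. */
final class Flattener<T> implements Flow.Subscriber<List<T>>, Flow.Subscription {
    private final Flow.Subscriber<? super T> downstream;
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private Flow.Subscription upstream;
    private long requested;        // items downstream asked for but has not received yet
    private boolean awaitingList;  // true while one upstream request(1) is outstanding
    private boolean upstreamDone;
    private boolean cancelled;
    private boolean draining;      // guards against re-entrant drain() calls

    Flattener(Flow.Subscriber<? super T> downstream) { this.downstream = downstream; }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this); // chain: downstream drives demand and cancellation through us
    }
    @Override public void onNext(List<T> list) { awaitingList = false; buffer.addAll(list); drain(); }
    @Override public void onError(Throwable t) { if (!cancelled) { cancelled = true; downstream.onError(t); } }
    @Override public void onComplete() { upstreamDone = true; drain(); }

    @Override public void request(long n) {
        if (n <= 0 || cancelled) return; // simplification: a full implementation would signal onError for n <= 0
        requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n; // cap on overflow
        drain();
    }
    @Override public void cancel() { cancelled = true; buffer.clear(); upstream.cancel(); }

    private void drain() {
        if (draining) return;
        draining = true;
        try {
            while (!cancelled) {                 // cancellation is re-checked before every emission
                if (!buffer.isEmpty()) {
                    if (requested == 0) return;  // backpressure: hold items until they are requested
                    requested--;
                    downstream.onNext(buffer.poll());
                } else if (upstreamDone) {
                    cancelled = true;
                    downstream.onComplete();
                } else if (requested > 0 && !awaitingList) {
                    awaitingList = true;
                    upstream.request(1);         // pull the next list only while there is demand
                } else {
                    return;
                }
            }
        } finally {
            draining = false;
        }
    }

    /** Hypothetical demo harness: requests `demand` items from a synchronous source of lists. */
    static List<Integer> takeFirst(List<List<Integer>> lists, int demand) {
        List<Integer> received = new ArrayList<>();
        Flattener<Integer> flattener = new Flattener<>(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        Iterator<List<Integer>> it = lists.iterator();
        flattener.onSubscribe(new Flow.Subscription() { // minimal upstream: one list per requested unit
            @Override public void request(long n) {
                for (long i = 0; i < n; i++) {
                    if (!it.hasNext()) { flattener.onComplete(); return; }
                    flattener.onNext(it.next());
                }
            }
            @Override public void cancel() { }
        });
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> lists = List.of(List.of(1, 2, 3), List.of(4, 5));
        System.out.println(takeFirst(lists, 4));  // [1, 2, 3, 4]
        System.out.println(takeFirst(lists, 10)); // [1, 2, 3, 4, 5]
    }
}
```

With a demand of 4, the fifth element stays buffered instead of being pushed downstream, which is the behaviour the plain for-loop in the operator cannot provide. In practice a built-in operator such as flatMapIterable is likely the safer choice than hand-rolling this logic.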

@zsxwing flatMapIterable looks like it may be close to what I want. But is there something like switchMapIterable? I don't think I actually need it in the case I have in mind, but I could see it coming up from time to time.

Possible documentation improvement: flatMapIterable is pretty hard to understand from the function documentation (see below), and the wiki currently links to http://reactivex.io/documentation/operators/flatmap.html, which doesn't describe flatMapIterable.

Function documentation:

/* Returns an Observable that merges each item emitted by the source Observable with the values in an
 * Iterable corresponding to that item that is generated by a selector.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param <R>
 *            the type of item emitted by the resulting Observable
 * @param collectionSelector
 *            a function that returns an Iterable sequence of values for when given an item emitted by the
 *            source Observable
 * @return an Observable that emits the results of merging the items emitted by the source Observable with
 *         the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable">RxJava wiki: flatMapIterable</a>
 */
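
One way to read the Javadoc above: the selector is applied to each source item, and every value of the Iterable it returns is emitted in order. The sketch below mirrors that contract over plain lists so it runs without RxJava. It is a library-free stand-in, not the operator itself: the class FlatMapIterableSketch and its static flatMapIterable helper are hypothetical, and the channel names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FlatMapIterableSketch {
    /**
     * List-based stand-in for the operator's contract: apply the selector to each
     * source item, then append every value of the Iterable it returns, in order.
     */
    static <T, R> List<R> flatMapIterable(
            List<T> source, Function<? super T, ? extends Iterable<? extends R>> collectionSelector) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            for (R value : collectionSelector.apply(item)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // When the source items are already lists, the identity selector is enough.
        List<List<String>> channelLists = List.of(List.of("general", "random"), List.of("dev"));
        System.out.println(flatMapIterable(channelLists, list -> list)); // [general, random, dev]

        // A non-identity selector: each number expands into two values.
        System.out.println(flatMapIterable(List.of(1, 2), n -> List.of(n, n * 10))); // [1, 10, 2, 20]
    }
}
```

This is why flatMapIterable(l -> l) answers the original question: the source already emits lists, so the selector has nothing to compute and simply hands each list back.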

/cc @DavidMGross for documentation.

@davidmoten Nice catch! I only meant to demonstrate that you could do it yourself, not to suggest an idiomatic solution. I went through our custom operators after seeing your post, though, and found some of those errors there as well. Eeek!

I'm closing this due to inactivity. If you have further questions, don't hesitate to reopen this issue or post a new one.

For anyone else who comes across this thread, I think that flatMapIterable will do the trick. You should be able to call listObservable.flatMapIterable(items -> items) and get a stream of the items from the list that way.
