for rxjava 2.0.x
Anyway to create Observable<T> from Stream<T> ?
btw I'm not sure this feature exists on Rxjava 2.0.x now
but I had created this one for my personal use.
public class ObsearvableStream {
public static <T> Observable<T> fromCallable(Callable<Stream<T>> callable) {
UnicastSubject<T> subject = UnicastSubject.create();
Completable task = Single
.fromCallable(callable)
.doOnSuccess(stream -> stream.forEach(subject::onNext))
.toCompletable()
.doOnComplete(subject::onComplete)
.doOnError(subject::onError);
return subject.doOnSubscribe(d -> task.subscribe());
}
public static <T> Observable<T> fromStream(Stream<T> stream) {
UnicastSubject<T> subject = UnicastSubject.create();
Completable task = Completable
.fromAction(() -> stream.forEach(subject::onNext))
.doOnComplete(subject::onComplete)
.doOnError(subject::onError);
return subject.doOnSubscribe(d -> task.subscribe());
}
}
Using Observable.create would be much cleaner.
another version
public class ObsearvableStream {
public static <T> Observable<T> fromCallable(Callable<Stream<T>> callable) {
return Observable.create(emitter -> {
try {
callable.call().forEach(emitter::onNext);
emitter.onComplete();
} catch (Throwable t) {
emitter.onError(t);
}
});
}
public static <T> Observable<T> fromStream(Stream<T> stream) {
return Observable.create(emitter -> {
try {
stream.forEach(emitter::onNext);
emitter.onComplete();
} catch (Throwable t) {
emitter.onError(t);
}
});
}
}
Much nicer!
On Mon, Jan 30, 2017, 1:51 AM Siwapun Siwaporn notifications@github.com
wrote:
another version
public class ObsearvableStream {
public static <T> Observable<T> fromCallable(Callable<Stream<T>> callable) { return Observable.create(emitter -> { try { callable.call().forEach(emitter::onNext); emitter.onComplete(); } catch (Throwable t) { emitter.onError(t); } }); } public static <T> Observable<T> fromStream(Stream<T> stream) { return Observable.create(emitter -> { try { stream.forEach(emitter::onNext); emitter.onComplete(); } catch (Throwable t) { emitter.onError(t); } }); }}
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/5032#issuecomment-275990977,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEESO5TOD7cud9eDocFfMzycR2qf08ks5rXYh8gaJpZM4LxDu8
.
Or use the Jdk 8 interop library.
@akarnokd
I'm not strong on stream and iterator. but when I dig to your code there is Observable.fromIterable(() -> stream.iterator());.
Is there any performance impact when using stream.iterator() specially when stream is very large ?
@mapkuff It depends on what's behind the stream but the transformation itself is as low overhead as currently possible. It has way less overhead than what you have and support backpressure for Flowable out of box.
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
@akarnokd
If stream come from a resource like fetching from database (Spring data jpa) and it need to be closed after using. Is there any problem with this ? (I'm not sure how Stream.iterator works.)
btw I did create a util which create observable from stream with ObservableInterop and manually close a stream but I want to omit my util if ObservableInterop work fine xD.
Best regards
I'm not sure at the moment if the interop closes a stream on its own but you can add doFinally and have it call close.
I know this ticket is closed, but is there any thought of moving some of the great Java 8 interop functionality into the RxJava library itself rather than requiring a 3rd party library?
Converting from a Stream to an Observable seems like a very worthy candidate, and I'm sure there are others, but just ran into it and was a bit dismayed to be honest that I couldn't do it natively.
Props to @akarnokd for making the interop library in the first place, but be great to see some of standard use-cases supported OOTB in RxJava.
As long as Java 8 support is not widespread well enough on Android, it is not feasible for us to up the JDK requirement. Besides, streaming a stream is a source-like operation that requires static method which can live anywhere.
The types are present in Android, you just can't use them on all API
levels. A way to do this is to use animal sniffer to ensure only APIs from
Java 6 and Android API 14 are used and then add exceptions to the check for
these factories. This is a common approach for pure-Java libraries
targeting Android.
On Thu, Feb 14, 2019, 2:18 PM David Karnok <[email protected] wrote:
As long as Java 8 support is not widespread well enough on Android, it is
not feasible for us to up the JDK requirement. Besides, streaming a stream
is a source-like operation that requires static method which can live
anywhere.—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/5032#issuecomment-463755802,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEeGrdcaFPDMVk33cK8Ukslsf4moyks5vNbaBgaJpZM4LxDu8
.
As long as Java 8 support is not widespread well enough on Android, it is not feasible for us to up the JDK requirement. Besides, streaming a stream is a source-like operation that requires static method which can live anywhere.
Possibly, but in that case your library could at least be adopted as an additional official RxJava library in that case rather than folding it into the core. The problem for me is developer experience, and Principle of Least Surprise. I see Observable.fromArray(), Observable.fromIterable() etc., and it's a real PITA when I can't simply do Observable.fromStream() but instead have to google and download a 3rd party library I'm not familiar with, to do something very common.
It's non-trivial also because we have to vet all 3rd party libraries to make sure there's a low security risk of including it in Production environments.
The fact Java 8 is already EOL and been out of years doesn't help either. But I do appreciate your effort to bridge the gap with your library, so thanks.
Most helpful comment
Or use the Jdk 8 interop library.