All the currently provided channel abstractions in kotlinx.coroutines are _hot_. The data is being produced regardless of the presence of subscriber. This is good for data sources and applications that are inherently hot, like incoming network and UI-events.
However, hot streams are not an ideal solution for cases where data stream is produced on demand. Consider, for example, the following simple code that produces ReceiveChannel<Int>:
produce<Int> {
while (true) {
val x = computeNextValue()
send(x)
}
}
One obvious downside is the computeNextValue() is invoked before send, so even when receiver is not ready, the next value gets computed. Of course, it gets suspended in send if there is no receiver, but it is not as lazy as you get with cold reactive Publisher/Observable/Flowable/Flux/Flow.
We need the abstraction for cold streams in kotlinx.coroutines that is going to be just as lazy, computing data in "push" mode versus "pull" mode of hot channels that we have now.
There are the following related discussions:
Let me add here that we are looking at an asynchronous analogue of a standard Kotlin's Sequence here. We _already_ can write the following code in Kotlin:
val s = buildSequence<Int> {
while (true) {
val x = computeNextValue()
yield(x)
}
}
and this code is perfectly lazy in the sense that computeNextValue is not invoked until sequence gets to be iterated and only actually requested values are ever compute. However, we cannot do arbitrary suspension from inside of buildSequence. We cannot delay, we cannot do asynchronous network requests, etc.
Hi,
I was writing a kotlinx-coroutine reactive (=cold) library based on reactive-streams. But following Elizarov advice on #201 I switched to a pure kotlin suspend version, strongly inspired by your StreamBenchmarks project.
Here is the library : Reactivity
It provides 2 cold producers (Reactor inspiration) :
As they are reactive objects, they start producing items only when a client calls a Terminal (final/consuming) operation function.
Reactivity is a multiplatform project, with common and platform specific tests and extensions. Providing for example platform specific Solo.toPromise in JS, Solo.toCompletableFuture or Stream.toMulti in JVM (in JDK8 project).
There are only a few operators right now (map, filter, delay) but they can be easily added as they are very simple.
I would be really happy if Reactivity can save time, or serve as source of inspiration for this issue
The biggest issue with Channel<T> in my experience is that .openSubscription() has to be called before operators can be applied. I personally really liked LiveData<T>'s approach.
It shipped with LiveData<T>, MutableLiveData<T>, and MediatorLiveData<T>, and two transformations, .map() and .switchMap{ }.
Implementing custom operators is easy, just add an extension function.
Unfortunately, LiveData<T> falls completely flat because it's not thread safe and highly coupled to the main thread.
Ultimately, my use case is that I need a lightweight RxJava for scenarios where I can't use RxJava but want a reactive-state based architecture, and I am hoping co-routines can solve this problem.
@ScottPierce We can open a separate issue about LinkedListBroadcastChannel (that's a broadcast channel implementation with UNLIMITED capacity) if that helps you. What's your specific use-case about it? Go ahead, create a separate issue and describe your use-case for it.
@pull-vert I'm not a big fan of Single-like abstraction. Solo is great name, btw, but the if we include it in the library, then we'll have three slightly different ways to asynchronously perform an operation that returns a value:
async { something } -- starts computation in background immediately to produce result later.async(start = CoroutineStart.LAZY) { something } -- starts computation only when it is requested, _sharing_ result with subsequent requests.solo { something } -- starts computation when it is requested and _every time_ it is requested do it again from scratch. They are all _different_ ways, but do we really need an abstraction for that last one? Consider this definition:
typealias Solo<T> = suspend () -> T
Isn't this functional type the Solo we are looking for? Do we really need to give it some other name like Solo? We can always declare all the extensions we might need directly on top of that suspend () -> T functional type.
Let me also quote Eric Meijer's tweet here: https://twitter.com/headinthebox/status/971492821151580160
I'd prefer a real type for a few reasons.
The typealias trick can go really far for lots of abstractions but that doesn't always mean it's appropriate. Would you do the same for a multi-value source in suspend (Consumer<T>) -> Unit or would it get a proper type?
These types can and will leak into Java and I'd much prefer a semantically named type than an arbitrary complex function type that only the Kotlin metadata can disambiguate. Ignoring the fact that there's a strong possibility these types will be partially usable from Java, the ability to do simple things like instanceof and isAssignableFrom checks against a real type will help this abstract to be wired into libraries like Retrofit, JAX RS, etc. Even just having them able to be received by Java code and converted into something else (like an RxJava type) goes a long way.
A Solo<T> is a specialization of a Many<T> (or whatever you want to call it) and thus should be polymorphic with respect to it. It's really convenient being able to return a Solo<T> from a call to a Many<T>.flatMap( T -> Many<T> ) function without requiring overloads, for example.
Eric's tweet does not make sense. Single is a factory of a coroutine/async+awaitable. Your typealias is evidence of this. And since the context of the tweet was Java, when we rasterize the typealias into a proper type (as the Java language requires) and desugar coroutines into their underlying form of callbacks (as the Java language requires) we're left with the exact shape of Rx's single so they're actually exactly the same thing.
I question the very need of Single/Solo. The typealias digression was just to demonstrate what this concept actually does. I question the very use-cases for Single/Solo. Why would you ever use it? Why would you need a dedicated type to denote a reference to a computation that reruns every time you ask for it? Why would you need this level of indirection? Your code will be much easier to understand if you use suspending functions directly.
To illustrate. Instead of:
interface A { fun doSomething(): Single<T> }
write
interface A { suspend fun doSomething(): T }
Instead of:
fun foo() = // return Single<R>
doSomething().map { it.transform() }
do
suspend fun foo() = // return R
doSomething().transform()
you can continue this example with flatMap, etc, etc. The code that does not use Single/Solo will be invariably more direct and easier to understand.
On the other hand, we need a dedicated abstraction for asynchronous streams of values simply because there is no other way to represent it in Kotlin, but via a separate type.
Fair point. I tend to think in a world where strong interop is a must. For
the core coroutines library I'd be happy with a single abstraction on a
multi-value suspending source.
On Thu, Mar 15, 2018 at 9:59 AM Roman Elizarov notifications@github.com
wrote:
I question the very need of Single/Solo. The typealias digression was
just to demonstrate what this concept actually does. I question the very
use-cases for Single/Solo. Why would you ever use it? Why would you need
a dedicated type to denote a reference to a computation that reruns every
time you ask for it. Why would you need this level of indirection? Your
code will be much easier to understand if you use suspending functions
directly.To illustrate. Instead of:
interface A { fun doSomething(): Single
} write
interface A { suspend fun doSomething(): T }
Instead of:
fun foo() = // return Single
doSomething().map { it.transform() }do
suspend fun foo() = // return R
doSomething().transform()you can continue this example with flatMap, etc, etc. The code that does
not use Single/Solo will be invariably more direct and easier to
understand.On the other hand, we need a dedicated abstraction for asynchronous
streams of values simply because there is no other way to represent it in
Kotlin, but via a separate type.—
You are receiving this because you commented.Reply to this email directly, view it on GitHub
https://github.com/Kotlin/kotlinx.coroutines/issues/254#issuecomment-373385015,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEETtozZbVXUPMhCWfONpMXxbvREgyks5tenPQgaJpZM4SNhnC
.
Isn't this
suspend fun foo() = // return R
doSomething().transform()
eagerly executing doSomething and you call a method on the returned value T?
I'd think that given a suspend () -> T, you'd want to create a new function suspend () -> R that when invoked, calls the previous suspend function only then, transform its results and returns that as the value R, right?
Yes, but you wouldn't invoke the method until you needed the value (i.e., when you'd otherwise subscribe). When it's suspend functions all the way down you don't need the abstraction because you can just call the suspending function over and over.
I like the Solo = Single value cold emmitter because it can provide specific Operators, and some intermediate (= cold/not consuming) Operators on Multi can return a Solo :
fun <E> Solo<E>.merge(vararg others: Solo<E>): Multi<E>
inline fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R):): Solo<R>
fun <E> Multi<E>.first(): Solo<E>
See https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html to see some Operators of Flux that return Mono.
After that some Terminal (final/consuming) Solo specific extensions I mentioned before are useful :
fun <T> Solo<T>.toPromise(): Promise<T>
fun <E> Solo<E>.toCompletableFuture(
coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E>
The point above is that operators on a single async value and operators
which narrow a multi-async stream into a single value can be simple,
imperative suspending functions.
Conversion to a Promise or CF is straightforward when all you have is
suspending functions.
The most interesting case is multiple singles to a multi. It's conceptually
easy (we already do concatMap all the time often without realizing it) but
I'm curious how terse it can be made syntactically.
On Thu, Mar 15, 2018, 7:29 PM pull-vert notifications@github.com wrote:
I like the Solo = Single value cold emmitter because it can provide
specific Operators, and some intermediate (= cold/not consuming) Operators
on Multi can return a Solo :fun
Solo .merge(vararg others: Solo ): Multi
inline funMulti .reduce(initial: R, crossinline operation: (acc: R, E) -> R):): Solo
funMulti .first(): Solo See
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
to see some Operators of Flux that return Mono.After that some Terminal (final/consuming) Solo specific extensions I
mentioned before are useful :fun
Solo .toPromise(): Promise
funSolo .toCompletableFuture(
coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture—
You are receiving this because you commented.Reply to this email directly, view it on GitHub
https://github.com/Kotlin/kotlinx.coroutines/issues/254#issuecomment-373555008,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEES6Bbp6kpWjmTomRcUT-CFrM2w8vks5tevlYgaJpZM4SNhnC
.
Hi,
I tried to picture some code to understand better the final shape.
I played with _Solo_, using a type alias can lead to some interesting collateral effects.
typealias AsyncCallable<T> = suspend () -> T
fun <E> AsyncCallable<E>.toCompletableFuture(
coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E> = future(coroutineContext) { invoke() }
fun main(vararg args: String) = runBlocking {
val deferred = async { "Hello" }
val deferredFuture = deferred::await.toCompletableFuture()
}
Then I tried to implement the concatMap, but I don't know if it is possible to do better.
interface AsyncSource<E> {
suspend fun consumeEach(consumer: suspend (E) -> Unit)
}
fun <E> buildAsyncSource(builder: suspend AsyncSourceBuilder<E>.() -> Unit): AsyncSource<E> = TODO()
interface AsyncSourceBuilder<E> {
suspend fun yield(item: E)
suspend fun yieldAll(items: AsyncSource<E>) = items.consumeEach { yield(it) }
}
suspend fun <E> AsyncSource<E>.concatMap(block: suspend (E) -> AsyncSource<E>): AsyncSource<E> =
buildAsyncSource {
[email protected] { item ->
yieldAll(block(item))
}
}
@pull-vert In kotlinx.coroutines terminal operations on a cold reactive streams (let's call it Multi for this example) that you mention will have the following signatures:
inline suspend fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R):): R
suspend fun <E> Multi<E>.first(): E
@JakeWharton I don't think we need any kind of "multiple singles to a multi" (merge) at all. Where can those "multiple singles" can come from? They can appear in a situation like this:
suspend fun listUsers(): List<User>
suspend fun getUserProfile(userId: UserId): UserProfile
Here we want to get a list of users, then get a profile for each user. In reactive Java world we'd define getUserProfile(user: UserId): Single<UserProfile> and we'd have to use some kind of merging operation on those Single types. With coroutines all we need is a plain map:
val profiles: List<UserProfile> = listUsers().map { getUserProfile(it.userId) }
Note, that sometimes you'd want to perform this map "in parallel" (but be careful that unlimited parallelism can exhaust your local resources and/or DOS your server unless you take appropriate precautions). We have separate issues about that (see #172 and #180).
@elizarov I understand and agree your example for terminal/consuming Operations.
But for me the "first" Operator must be Intermediate, that will suspend and send the Single first value of a Multi cold Source but should not be terminal/consuming.
I would prefer the resulting Solo/Single value to be cold and not consuming, to be able to chain Operators until the first terminal/consuming Operator (can have multiple consumers, each will receive all elements).
For exemple a stupid Reactive Operator chain
Multi
.fromValues("one", "two", "three")
.filter { it.startsWith("t") } // Intermediate cold = returns Multi with "two" and "three"
.first() // Intermediate cold = returns a Mono with "two"
.concatMap { it.splitAndDelay(15)} // Intermediate cold = returns a Multi with "t", "w" and "o" (adding a 15ms delay between each send)
.consumeEach { println(it)} // terminal/consuming operation printing "t", "w" then "o"
@pull-vert In a world of channels and cold streams we might have the following function defined in some API:
suspend fun subscribeToSomeStrings(): ReceiveChannel<String>
It performs an operation to subscribe to the data stream (establishing network connection, etc) and returns a channel of strings flowing from the server. It is hot. There is no much reason to defer the actual subscription until the terminal operation. It is better to "fail-fast" (return an error immediately) if we have a problem establishing communication channel to the server.
The actual processing of this hot data stream would use cold channels in intermediate steps, like this:
subscribeToSomeStrings() // establishes connection to datasource & returns a hot stream
.filter { it.startWith("t" } // intermediate - returns cold stream (not coroutine to filter it yet)
.first() // terminal, starts filtering incoming stream, returns a String
.split() // a regular operator on String, no need to wrap this operator
.delayEach(15) // intermediate - returns cold stream with delays between each items
.consumeEach { ... } // terminal
There is no much need for special "glue" operators that work on Single/Solo. You can apply regular functions (like String.split()) directly on the values that you have. Interestingly, if we replace the first line with a function that returns List<String> instead of ReceiveChannel<String>, then the code will compile and continue to work. It would just become more eager in processing the data.
And that is whole beauty. You can take a whole synchronous data processing pipeline with intermediate operators like filter, map and terminal operators like first and introduce asynchrony into it (mabe make a source asynchronous or maybe introduce some asynchronous request inside one of the map or filter operators) and it would continue to work without you having to change a single line of code. IDE will helpfully highlight with suspension marks in the gutter what pieces of your pipeline have become asynchronous.
@elizarov Thanks for this clarification I now understand your vision for introducing cold Intermediate Operators to Channels, not using new Coroutine for each Operator with a lot better performance than actual.
I am still a bit confused by this chaining of mixed hot and cold Operators, it will require good documentation to be sure in what state (hot instantly usable or cold that will require terminal consuming operator) we are at every step of the chain.
I wonder how this will interact with existing pure cold reactive libraries (rxjs, rxjava, reactor, java 9 Flow...), for example if we want to use kotlinx-coroutines between a cold Observable rxjava reactive Couchbase driver as Input and return a cold Mono
If I understand your example delayEach or map cold intermediate Operator will have to be declared as Extension on kotlinx ReceiveChannel, java 8 Stream, kotlin Sequence, kotlin Collection, kotlin Array, maybe more.
@pull-vert The mental model is simple. There are no cold operators. You just "setup your pipeline" (that is what cold operators do) that is later executed by the terminal operator. Since every chain of operators ultimately ends in a terminal operator, you don't really care what happens in between. It just works.
With respect to the kind of operators we need to provide, we have huge advantage over reactive libraries because we only need to define basic ones like map and filter. For example, there is no need to have ready-to-use delayEach(time) in the library, because it can be trivially implemented with .map { delay(time); it }
@elizarov Thanks again now I see and understand exactly the direction for this cold stream operators topic for kotlinx-coroutines.
I will continue enrich my pure cold reactive library for fun (strongly based on kotlinx-coroutines), and of course follow what is going on here!
Let me give you an update on the current concept that is on the table for the cold streams to solicit feedback.
The plan is to introduce an interface (tentatively called) Source<T> that represents an asynchronous source of elements of type T that can be consumed in a "push" fashion (you specify a suspending function to be sequentially invoked on each element). The contract for the Source interface would not specify how many times it can be consumed, so it seems logical that there will be different implementations:
ReceiveChannel<T> implements Source<T> and can be consumed just once. It is a hot source. The producer of the elements is active regardless of the presence of consumer. Note, that a channel is a kind of resource. It must be consumed exactly once or cancelled, or otherwise the producer coroutine is suspended forever.
BroadcastChannel<T> implements Source<T> and can be consumed multiple times. It is a hot source. The producer of the elements is active regardless of the presence of consumer and while the broadcast channel can be consumed multiple times or not at all, it is still a resource that must be canceled to cancel the producer coroutine.
source { .... } coroutine builder returns a cold implementation of the Source<T>. The producer coroutine is started for each consumer. It is not resource and need not be closed.
Various operators like filter, map, etc would be defined as extensions on Source and would return Source. Whether the resulting source can be consumed multiple times or not would depend on the original source and is not reflected in the type itself.
The later, however, is the main problem of this design. If some business function is declared like this:
fun requestElementsFromServer(): Source<Element>
then is not clear from its signature what requestElementsFromServer actually _does_. Does it actually perform an operation and returns a channel (which is resource that must be consumed to release its resources) or does it return a cold source that makes a request to the server each time it is consumed?
Now, if the function was declared like this:
fun requestElementsFromServer2(): ReceiveChannel<Element>
Then it would have been perfectly clear that requestElementsFromServer2 opens a communication channel with the server over the network and returns it, so that it is now responsibility of the caller to consume it or to close it.
The worst incarnation of this problem is that if you now do requestElementsFromServer2().filter { ... }, then the type of the result is Source<Element> and the fact it it must be consumed or cancelled is completely lost.
This seems to derail the idea of unification of channels and cold sources to avoid duplication of filter, map operators. To salvage this unification we need to adjust this plan somehow:
Either add some additional type parameter to the type of Source<T> to indicate the "resource-like" nature of sources that were build from channels and ability to "cancel" them.
Or declare that all sources are resource-like and must be consumed or cancelled and add an empty implementation of cancel operation for cold sources that result from source { ... } coroutine builder.
Thanks for the detailed feedback.
I am not sure using Source<Element> + extensions for cold stream would very nice from an API perspective. It does not sound "first class" support to me.
As you said, it is not clear from a signature returning Source<Element> what the function actually does.
Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions. Extensions are awesome, but I don't think they should be used to provide such basic functionality (even if IDEA does a good job to propose them, but Kotlin is not just IDEA, especially in the long term). I think I would prefer if filter and map would be part of a regular class or interface, with extensions only used to add additional capabilities if needed.
While I agree cold streams don't need to be closed, I am not sure it should be an empty implementation. Isn't it possible to cancel a cold stream after getting just a few elements?
So even if that seems not in the 2 solutions you proposed, why not just introducing a type dedicated for cold streams (that could implement Source<T> as well) that would implement map and filter with the right return type? From a signature perspective, the behavior would be explicit, that would avoid the need of a full extension based API, and ReceiveChannel, BroadcastChannel and this new type design would be more consistent.
Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions
This is a bit a side topic, but i'd like to say that extensions function cannot really be overuse IMO. If something can be written as an extension function then do it. It will in most cases (if not always) provide better separation of concerns, decoupling and help to fulfill many SOLID principles like interface segregation principle, and single responsibility principle. I "overuse" them for two years now an I'm not able to see any drawback, and see only benefits. The only limitation is when you could end up with many extensions functions with clashing signature.
Having filter, map and all other operators part of the interface would make the interface big, break the "interface segregation principle" and it'll be painful for the implementors.
Or declare that all sources are resource-like and must be consumed or cancelled and add an empty implementation of cancel operation for cold sources that result from source { ... } coroutine builder.
I would go for that personally. It's not a burden for the client code to ensure the source is consumed (thanks to onCompletion = consumes(), consume and consumeEach).
I wish to provide an example use case to explicit my concerns.
I want to provide an API to parse a file.
I use a non blocking file access, so coroutine works well.
I parse next block only when it is effectively requested, so it is a cold stream.
I keep the file open while reading, so I have to close it.
It is possible to consume this source to dispose it, but it is really expensive.
What I should use?
i agree with @elizarov and @jcornaz about map, filter and other operators as extension functions, leaving base API pure and simple.
I also agree with @sdeleuze : Cold extensions always returning Source is a problem for me.
I can understand that internal implementation of Source.map and Source.filter return a new Source Object, but IMO when user will use ReceiveChannel.map, it must return a ReceiveChannel.
Same the new Cold Channel.
Does writing operators for an interface (like ReceiveChannel<T>) create limitations compared to writing operators over a class (like Observable<T>)? I'd imagine the class can provide some nice encapsulation benefits with access to privately scoped variables.
How does implementing more complex operators like .switchMap work when the operators are written over extension functions on an interface? Is it even possible?
then is not clear from its signature what requestElementsFromServer actually does. Does it actually perform an operation and returns a channel (which is resource that must be consumed to release its resources) or does it return a cold source that makes a request to the server each time it is consumed?
The issue is that a stream being hot or cold would not be known by simply looking at the type? I don't feel that was ever a problem with Rx (in fact, I feel that this abstraction is an advantage of Rx), so why would it be a problem here?
Does writing operators for an interface (like
ReceiveChannel<T>) create limitations compared to writing operators over a class (likeObservable<T>)? I'd imagine the class can provide some nice encapsulation benefits with access to privately scoped variables.
Your operator is free to be a class. All of the RxJava operators are classes which deal with the ObservableSource interface and not Observable. The latter only exists because of limitations of the Java language.
Your operator is free to be a class.
I wasn't referring to the operator being a class, but to ReceiveChannel<T> being a class and not an interface.
All of the RxJava operators are classes which deal with the ObservableSource interface and not Observable.
👍 . My confusion here stems from .switchMap not existing in the official coroutines lib and not knowing how to implement it myself. Assuming it's possible with the current API, then my complaint about classes vs interfaces is invalid and doesn't add to this discussion.
It is possible to write ReceiveChannel<T>.switchMap using an extension: https://gist.github.com/hannesstruss/927ec8120d7cb312d80685f230d50c6e
(don't use this, it's probably broken, but the general idea works)
@fvasco
I want to provide an API to parse a file.
I use a non blocking file access, so coroutine works well.
I parse next block only when it is effectively requested, so it is a cold stream.
I keep the file open while reading, so I have to close it.
It is possible to consume this source to dispose it, but it is really expensive.
What I should use?
If you return a cold stream from your file-parsing API, then you should open the file only when there is a consumer. You should be able to write the corresponding file-parsing code like this:
fun parseFile(file: File): Source<Data> = source {
file.bufferedReader().use {
while (true) {
val line = it.readLine() ? break
send(parseLineToData(line))
}
}
}
In this case you get a properly cold source. Invoking parseFile(f) does nothing and its result can be safely ignored (no need to close it to release resources).
@pull-vert
I can understand that internal implementation of Source.map and Source.filter return a new Source > Object, but IMO when user will use ReceiveChannel.map, it must return a ReceiveChannel.
Returning a new channel on each map, filter, etc on another channel is extremely expensive. Channels are expensive multi-threaded abstractions. They should not be overused.
One of explicit design goals of this proposal is to make sure that the result of ReceiveChannel.map is NOT a channel. But what it is? It is some kind of source that is in-between hot and cold. Just like a resource it must be consumed or closed (or otherwise the coroutine sending data on the other side is lost forever), but it is NOT a channel. It it a weaker abstraction than a channel, since you cannot consume it from multiple coroutines in a _fan-out_ fashion. You can convert it to a channel, but you don't have to if all you plan to do is just to consume it once.
Hi @elizarov, you are right,
I wish to explain better my concerns adding a bit of code:
val data = parseFile(dataFile) .first { it.id == requiredDataId }
I suppose that we should consider a mechanism to dispose the source in the first function.
Reading your example lead me to reconsider:
source { .... } coroutine builder returns a cold implementation of the Source
. The producer coroutine is started for each consumer. It is not resource and need not be closed.
@sdeleuze
As you said, it is not clear from a signature returning Source
what the function actually does.
I indeed consider it as a weak point, yet people in Rx/Reactive world are actually used to that. They represent both hot and cold data sources by the same type in the type-system and they used to manually managing resources of hot sources if needed.
Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions. Extensions are awesome, but I don't think they should be used to provide such basic functionality (even if IDEA does a good job to propose them, but Kotlin is not just IDEA, especially in the long term). I think I would prefer if filter and map would be part of a regular class or interface, with extensions only used to add additional capabilities if needed.
Interface methods don't scale. We cannot and don't want to provide every conceivable operator that one might need, so we should design about the fact that other people are going to add their own extensions and those extensions should naturally fit just if they were provided by in the core library.
While I agree cold streams don't need to be closed, I am not sure it should be an empty implementation. Isn't it possible to cancel a cold stream after getting just a few elements?
Cancelling a cold after receiving a few elements would indeed be possible. The current idea that you'll have a dedicated cancel() function _in scope_ of consumeEach operator for that purpose. In this case, the producer block in source { ... } will get a cancellation exception and will run its finally sections to close all the resources it was using.
So even if that seems not in the 2 solutions you proposed, why not just introducing a type dedicated for cold streams (that could implement Source
as well) that would implement map and filter with the right return type? From a signature perspective, the behavior would be explicit, that would avoid the need of a full extension based API, and ReceiveChannel, BroadcastChannel and this new type design would be more consistent.
We can introduce a dedicated ColdSource<T> : Source<T> and define that source { ... } returns a ColdSource<T>. But what would filter and map be declared to return? It cannot be ColdSource, since you can apply filter to a ReceiveChannel and the result is not really cold....
@fvasco
I suppose that we should consider a mechanism to dispose the source in the first function.
The first function will cause the send inside the source { .... } block to throw a cancellation exception, thus exit and close a file.
@ZakTaccardi
The issue is that a stream being hot or cold would not be known by simply looking at the type? I don't feel that was ever a problem with Rx (in fact, I feel that this abstraction is an advantage of Rx), so why would it be a problem here?
In Rx the whole design is usually centered around lazy cold streams and lazy cold values (singles), so you only rarely get to interact with hot sources that are backed by something that needs to be closed.
However, we plan to integrate this new cold streams with channels (that do not really exist in Rx world) and that is why we might get this problem.
Generally it doesn't matter in Rx if a source is hot or cold as applying most operators will establish a chain of cold sources anyway. For example, given this setup:
PublishSubject<Integer> subject = PublishSubject.create();
Observable<Integer> observable = subject.filter(v -> v % 2 == 0).map(v -> v + 1);
subject.onNext(1);
subject.onNext(2);
none of the user functions get executed without subscribing to observable and the two onNexts get lost as subject doesn't have a consumer subscribed to it at all.
In a sense, you can know in Rx if a source is hot if the type name includes Subject or Processor, otherwise it's cold/doesn't matter. Hot sources are things that exist outside of your local context anyway and they run/execute likely beyond your control.
So I'm not sure why you'd want to differentiate between them in the type system in a way that somehow retains the hotness property over applied operators that don't change temperature (I can't imagine such an operator btw at the moment). Distinction based on cardinality does work (i.e., Single), trying to caputure the exception type does not (everything ends up as Observable<T, Throwable> after combinations).
@akarnokd Thanks for the useful perspective on Rx.
They chief reason that I'm asking this question is that coroutines provide an easy way to create a hot sources (that is outside of your control). The hot source with coroutines that is doing some kind of channel.send is being suspended until somebody receives (subscribes to) on the other side, as opposed to being lost with subject.onNext. However, this creates a problem that now you've effectively created a resource that must be consumed. There does not seem to a problem like that in the Rx world, so the absence of the corresponding distinction is quite natural for Rx.
However, this creates a problem that now you've effectively created a resource that must be consumed. There does not seem to a problem like that in the Rx world, so the absence of the corresponding distinction is quite natural for Rx.
In the Rx world subscriptions may be (and should be) disposed when done using it. Doing so will release underling resources if any.
Except the terminology ("cancel" for channel and "dispose" for observable), I don't see the difference...
is being suspended until somebody receives (subscribes to) on the other side
So practically you want to handle the cases where
The first case is a form of deferred (lazy) execution which you can emulate by flatMapping over a consumer object stream. So the input of such generator is a queue of consumers which consumer take a queue of items. The generator is enumerating on this queue-of-consumers and once one arrives, it creates the queue-of-items, calls the consumer with it and then starts a producer routine which then feeds the queue-of-items:
Queue<Consumer<Queue<T>>> consumerQueue = ...
// generator
consumerQueue.forEachAsync(consumer -> {
Queue<T> itemQueue = ...
consumer.accept(itemQueue);
async {
for (T item : someItems) {
itemQueue.send(item);
}
}
});
consumerQueue.send(queue -> {
for (T item : queue) {
whatEver(item);
}
});
For the second case, I believe you have to actually design the queue itself to not suspend on the send() side but suspend on the poll() side. Also for efficiency, you may want to design different queues for different retention modes. In addition, the queue could check if there was ever a poll() call and if so, allow suspension on the send() side, depending on the requirements of course.
@jcornaz In the Rx world subscriptions may be (and should be) disposed when done using it. Doing so will release underling resources if any.
Same here. Subscriptions are always resources. The conundrum we are having here revolves around the source/observables/streams themselves. A cold stream is typically a purely garbage-collected entity and does not hold any resources until subscribed to. However, a hot stream (channel) can be backed by an actual resource. Imagine a network server that accepts a connection from a client, represents it as a hot stream of bytes and passes it down to your code for processing. Now you have to ultimately close it even if you decide not to subscribe to it.
@akarnokd The primary use-case we have in mind is a "connected application" that has incoming and outgoing network connections. You can model on outgoing connection (client sockets) with a cold stream (that actually opens connection on subscription), but incoming connections (servers sockets) are inherently resources and they are more naturally modelled with hot streams (channels). If you program a connected application list that on top of an abstraction of channels, then lots of things like data flow control (backpressure) become easy. You don't usually need this "flatMapping over a consumer object stream", since a typical relation between a producer and consumer in such an application is one-to-one.
Now, the design challenge we have, if you let me rephrase it, is how to reuse implementation of all the operators like map, filter, etc both for those hot streams (channels) and for fully lazy Rx-like cold streams alike, given the fact that hot streams must be closed even if they are never subscribed to.
Would it perhaps work to annotate the Source type to both
fun parseFile(file: File): @Cold Source<Data> = source {
file.bufferedReader().use {
//...
}
}
fun postComment(comment: String) : @Hot @MustBeClosed Source<NetworkResult> {
return api.networkCall().whatEver()
}
RxJava didn't split hot vs. cold in 2.x.
Maybe, Single, and Completable are just T?, T, and Unit suspend functions.
No need for types.
On Sun, Aug 5, 2018, 2:20 PM Scott Pierce notifications@github.com wrote:
@elizarov https://github.com/elizarov A common type to bridge between
hot / cold abstractions seems like the wrong way to go. That sounds like
RxJava1 Observable, which was problematic. I understand this isn't
exactly an apples to apples comparison, but it was a significant
improvement when RxJava2 separated it's hot and cold sources into distinct
types. I understand that this would be potentially more code duplication,
but from a pure api / usability perspective, distinct types is the way to
go in my opinion.You mentioned above:
I'm not a big fan of Single-like abstraction
My team loves Singles, and also Completables, and even found some uses
for Maybes. We used them heavily in our applications and found them to be
an improvement over when we used only Observables everywhere. Our feeling
is that the library would be incomplete without these.Will you guys be providing something similar to Subject as well? That
would also be useful.Channels seem somewhat similar to Processors (although I've never
actually found a use for one). Is there a plan to add something that would
fill the role of Flowable?—
You are receiving this because you were mentioned.Reply to this email directly, view it on GitHub
https://github.com/Kotlin/kotlinx.coroutines/issues/254#issuecomment-410538121,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEXc_UxKnMwEUYOPn6tuVkj15AlZ6ks5uNzdWgaJpZM4SNhnC
.
unification of channels and cold sources to avoid duplication of filter, map operators
I'd like to be based on Clojure 1.7 Transducers. Java 8 have some transducers, called Collectors,such as Comparator.comparing, java.util.stream.Collectors.mapping、reducing、groupingBy、partitioningBy、summing 、Collectors.flatMapping(in Java 9) etc.
So a Java 8 Collector is a "reducing function",it's supplier fn is arity-0, it's accumulator fn is arity-2,it's arity-1 fn is finisher or completion. And Stream.collect() is similar to Clojure into function.
see : https://github.com/matthiasn/talk-transcripts/blob/master/Hickey_Rich/InsideTransducers.md
Kotlinx channels,kotlin Sequence, kotlin Collection, kotlin Array, maybe more, could provide a extension collect(Collector) in Java https://docs.oracle.com/javase/10/docs/api/java/util/stream/Stream.html#collect(java.util.stream.Collector)
Hence, in Source
val evens = Source.of(1,2,3,4).collect(Collectors.filtering(it %2 == 0) ,Collectors.toSet())
Transducers / Collectors are composable algorithmic transformations, separate the algorithm and the seq-like things.
https://clojure.org/reference/transducers
Kotlinx channels,kotlin Sequence, kotlin Collection, kotlin Array, maybe more, could provide a extension collect(Collector) in Java
Besides the fact it is not related to the topic of cold-stream IMO, a generic colector is not necessary, because we can write such terminal operators as extension functions. And actually, many are already provided by the standard library (for Sequence and Iterable) and in kotlinx.coroutines (for ReceiveChannel)
@SolomonSun2010, here's your example rewritten using Sequence:
// here `toSet()` is equivalent to `collect(Collectors.toSet())`, but in a more readable fashion.
val evens = sequenceOf(1, 2, 3, 4).filter { it % 2 == 0 }.toSet()
Are also available reduce, fold, groupBy, partition and others. So most of the time, you already have the needed collector. And if one wants to define a custom one, he can simply write it as an extension function.
thanks, I mean, separate the algorithm and the seq-like things, write these operators only in one place, such as in Collectors , eliminate duplication in all seq-like things.
Perhaps, some advanced type system (@raulraja : https://github.com/Kotlin/KEEP/pull/87 ) could eliminate duplication , or refactor technique in Scala 2.13:
https://scala-lang.org/blog/2018/06/13/scala-213-collections.html
@elizarov I guess you are super busy with the upcoming 1.3 release, but I would be curious to know your plans about that (IMO) key issue. Do you plan to fix it before Coroutines 1.0? After? Isn't there a risk that it could imply major changes in other part of Coroutines design?
@sdeleuze
I guess you are super busy with the upcoming 1.3 release, but I would be curious to know your plans about that (IMO) key issue. Do you plan to fix it before Coroutines 1.0? After? Isn't there a risk that it could imply major changes in other part of Coroutines design?
This will go after 1.0 release. It will be one of the first things we'll be working on. We've ruled out all the risk to the core abstractions (like jobs and etc) that we'll be finalizing in 1.0, but it will definitely affect Channel APIs, so for 1.0 release we'll label all the parts of Channel APIs that will be changing as a result of introducing lazy streams as Experimental, which would mean "you can use it, but note that it will change in one of the upcoming major updates".
FYI we have scheduled official Coroutines support in Spring Framework for upcoming 5.2 release expected first half of next year, and a cold stream abstraction seems mandatory for our use case since we will have to translate Flux cold stream to Coroutines.
So any early progress after 1.0 on that topic would be appreciated in order to allow us to expose the right APIs. For now Coroutines support incubates in Spring Fu.
@jcornaz
Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions
This is a bit a side topic, but i'd like to say that extensions function cannot really be overuse IMO. If something can be written as an extension function then do it. It will in most cases (if not always) provide better separation of concerns, decoupling and help to fulfill many SOLID principles like interface segregation principle, and single responsibility principle. I "overuse" them for two years now an I'm not able to see any drawback, and see only benefits. The only _limitation_ is when you could end up with many extensions functions with clashing signature.
Having
filter,mapand all other operators part of the interface would make the interface big, break the "interface segregation principle" and it'll be painful for the implementors.
Something Rust does in lots of its standard library traits (basically interfaces) is implement operators as default methods. One extra thing that allows is for a default operator implementation to be overridden when you know it can be implemented more efficiently for the specific type. It also still allows for adding operators while maintaining backwards compatibility.
So in Rust's Iterator<T> type, the only method you have to implement is next(), which provides the next element of the iterator, and by default all other methods are implemented in terms of that method. But any concrete type that implements Iterator<T> could also choose to manually implement any other operators when it makes sense, for example if the type supports random access, then skip(5) can be overridden like self.offset += 5 instead of for i in 0..5 { next(); }.
One reason I wanted to suggest default methods instead of extension functions is that I find extension functions hurt discoverability. A problem I've encountered with Okio is that they've deprecated lots of its static methods in favour of extension functions, but when using ByteArray.toByteString() instead of ByteString.of() you have to know what you're looking for, the extension function isn't available unless ByteString.Companion.* is imported in scope.
@angusholder, Using default implementation in interfaces unnecessarily pollutes and increase complexity of the interface. Extension functions are just helpers allowing a simpler usage of the interface, and they may even be declared in a separate module wherever it makes sense.
Remember that any type (but it is even more true for interfaces) should stay as simple and as short as possible.
The reason I wanted to suggest default methods instead of extension functions is that I find extension functions hurt discoverability.
I think the exact opposite. For me extension have great discoverability, because they automatically show up in auto-completion like if they were member of type.
While I agree with that interfaces should be kept short, I think that in the case of streams, map/filter/etc are all core functionality of the type. I want to be able to go look at the type and instantly see all the things I can do with it, but in using extension functions those functions get spread all over the codebase, in an attempt to pretend it's simpler than it is. Keeping the interface small feels like optimizing for people looking to implement the type, but far more people are going to be reading its code in order to consume it, so having its functionality all available in the interface itself better aids the common situation. If I just wanted to see a list of all the map/filter/etc functions, being extension functions I wouldn't know where to look, but with default methods, I can just do the obvious thing of ctrl+n "ReceiveChannel" to look at the interface body.
The largest use-case for extension functions was to take the place of static utility methods in Java where you wanted to add functionality to a class but couldn't because it wasn't your type, but here they are being written in the same project as the type,
Default methods are quite new in Java and Kotlin so in general there's not much guidance for their idiomatic uses, but in this case I think they're a better choice than extension functions: semantically they're in the place you'd expect to find them, and functionally they allow polymorphism.
I think that in the case of streams, map/filter/etc are all core functionality of the type
Not to me. The only core functionality is to be able to iterate over them. For instance in the Kotlin stdlib, interface Sequence defines a unique method iterator(), because that's the only responsibility of a Sequence. And then all operators map, filter, etc. are defined as extension functions, because they are only helper for using Sequence.
The largest use-case for extension functions was to take the place of static utility methods in Java where you wanted to add functionality to a class but couldn't because it wasn't your type, but here they are being written in the same project as the type,
Extensions functions are not only meant to extend types of third party libraries. They are very useful to achieve better decoupling, separation of concern and interface segregation. Look at the Kotlin standard library and how many extension functions are written over types provided by that same library. Extensions functions are really useful, no matter if you own the type or not.
When I look at an interface I want to see at a glance what is the contract. What does it mean to have an object implementing that interface? If that interface contains hundred methods with default implementations, it definitely makes harder for me to understand it.
If I just wanted to see a list of all the map/filter/etc functions, being extension functions I wouldn't know where to look
You probably have a hard time using the kotlin standard library then. Personally, if I want to know how I can use a type I press ctrl + space to see all members as well as extensions functions.
semantically they're in the place you'd expect to find them
I guess that's personal opinion. I'd expect to find them as extension functions.
and functionally they allow polymorphism
If polymorphism make sense, then yes it should be a member of the interface. But only in that case.
More details about the planning constraints we have on Spring Framework 5.2 (where we need this issue to be solved to expose our cold Flux in our Coroutines extensions to be able to claim real Coroutines support):
@elizarov Any news on when we'll see something concerning cold streams?
We are actively working on it.
We have finished the design phase and resolved most of the issues related to API surface and mental model; now we are prototyping it, the first eap build will arrive at the beginning of April (though I am not sure this one will be publicly announced and merged into upstream as "ready to use" feature)
Awesome! Looking forward to trying it out and providing feedback.
On Sat, Mar 16, 2019, 8:39 AM Vsevolod Tolstopyatov <
[email protected]> wrote:
We are actively working on it.
We have finished the design phase and resolved most of the issues related
to API surface and mental model; now we are prototyping it and there will
be the first eap build at the beginning of April (though I am not sure this
one will be publicly announced and merged into upstream as "ready to use"
feature)—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/Kotlin/kotlinx.coroutines/issues/254#issuecomment-473526722,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AA1LAYggq1dZLXc_cm5hO_S8CJDjH5vrks5vXOX8gaJpZM4SNhnC
.
@qwwdfsad do you plan to show an early version of the architecture before diving deep onto it? is there any reference or existing codebase you've taken?
@qwwdfsad I'm looking forward to the cold streams!
We have finished the design phase
Is there an online design doc or write-up somewhere what your team's current thinking is for the design? I'm writing up a presentation for DroidCon and would love to know the near future of cold channels. Thanks!
@streetsofboston We'll start with early access preview of the code (as experimental feature) and gradually add additional documentation. The design is actually extremely straightforward and mostly self-explanatory.
Thank you, @elizarov . Is the current thinking still like the ideas you put forward in July 2018?
https://github.com/Kotlin/kotlinx.coroutines/issues/254#issuecomment-408379789
It had changed somewhat:
Flow<T>Flow<T>, but provide .asFlow() converters for all the appropriate types instead. There is no write-up (yet), but a documented preview-candidate: https://github.com/Kotlin/kotlinx.coroutines/commits/flow-eap
I see cancellation is implemented via throwing. Does the following pattern work?
var count = 0;
flow { while (true) emit(1) }
.retry { count++ % 2 != 0 }
.take(2)
.collect { System.out.println(it) }
If I understand correctly, this type of retry will get fromDownstream false the first time so that the predicate is invoked again, yieding a retry which then starts the flow over and take() will keep emitting beyond the limit.
This pattern does not work yet, early-terminating operators are one of our open questions actually (though this one can be trivially workaround in retry implementation).
Flow preview is now available in 1.2.0-alpha-2.
Note that preview features have no compatibility guarantees, read more about preview status here
Try it, use it, break it!
Any feedback is highly appreciated.
@qwertyfinger
Flow.collect is commonly named forEach, a collect should returns something _collected_ for me.
This collect looks to have two constraints, both are numbered 1 and both use "should" instead of "must".
Moreover the proposed example does not look related to the collect method, please reconsider it.
Moreover the proposed example does not look related to the collect method
It is an example of collect body. Will improve it, thanks.
About collect naming: we had a lot of debates about naming and reached the consensus with Flow, collect and emit.
I am not ready to duplicate this discussion again, especially in the form of Github issue; but at some point we will release a detailed write up/design document "why things are the way they are" and probably it will cover naming as well
concatenate is already known flatten, flatMap.
(flatMap)[https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map.html)
Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
I agree, please reconsider the name.
Moreover the behavior of this operator is subject to a custom threshold (consider a simple delay into the mapper).
I propose to force an explicit threshold for a concurrent mapper to avoid any kind of surprise.
Flow<T>.flatMap(mapper: suspend (value: T) -> Flow<R>): Flow<R>
Flow<T>.flatMap(concurrency: Int, bufferSize: Int = concurrency, mapper: suspend (value: T) -> Flow<R>): Flow<R>
Same for map, its behavior is completely different.
About collect naming: we had a lot of debates about naming and reached the consensus with Flow, collect and emit.
A collect with a Collector looks ok for me, for the other one I prefer cohesion with Sequence and Collection operators. Thanks for sharing the _status of the art_ of design.
Please consider to add a lazy plus operator and associateXXX/toMap terminal functions.
Also consider fun Flow.indexed() : IndexedValue
IterableflatMap.
Merges given sequence of flows into a single flow with no guarantees on the order.
This should be an optional, predictable and configurable behaviour.
As a final, personal consideration concurrency = 16 is good enough as concurrency = 42, both are wrong, I already explained my concerns in #1022
I was wondering how one could replicate combining two Flows with these APIs? It doesn't seem like there's support for select statements yet. Is that in the plans? (Thinking usage something like Rx's combineLatest)
No, select is not in the plans as it does not fit Flow push model well.
Something like combineLatest can be implemented via channels:
fun <T, R, V> combineLatest(first: Flow<T>, second: Flow<R>, combiner: (T, R) -> V): Flow<V> = flow {
coroutineScope {
val firstChannel = asChannel(first)
val secondChannel = asChannel(second)
whileSelect {
firstChannel.onReceive { value ->
// Combine values here
}
secondChannel.onReceive { value ->
// Combine values here
}
}
}
}
private fun <T> CoroutineScope.asChannel(first: Flow<T>): Channel<T> {
val channel = Channel<T>()
launch {
first.collect { value ->
channel.send(value)
}
}
return channel
}
@mikelikespie Using channels (or plain callbacks) only, I was also able to replicate a combineLatest analogue that works for any number of channels or callbacks (that's why you need to subclass it): https://github.com/LouisCAD/Splitties/blob/61a4035ba3b18270dbfb0828436b5045bff44919/sample/src/androidMain/kotlin/com/louiscad/splittiessample/extensions/coroutines/ConflatedValues.kt#L13
Do you have plans to catch up to RxJava's operators? Debounce, scan, retryWhen and quite a few very useful ones.
I've fiddled a bit with the alpha release and I tried to implement scan (reduce/fold are both terminal operators) and it seemed as simple as:
fun <T, R> Flow<T>.scan(
initial: R,
operation: suspend (acc: R, value: T) -> R
) = flow {
var accumulator = initial
emit(initial)
collect { value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}
I don't know if this implementation is safe enough, but so far it's looking really good.
Do you have plans to catch up to RxJava's operators? Debounce, scan, retryWhen and quite a few very useful ones.
We do. Could you please elaborate on "quite a few very useful ones"? :)
and I tried to implement scan
Yes, this is the intended implementation
We do. Could you please elaborate on "quite a few very useful ones"? :)
I only mentioned the first ones I've used a lot that came to mind, the complete list is big. :sweat_smile:
I suggest making the core Flow library/api not too big. Don't make it another/alternative implementation of Rx. We already have Rx.
If you're targeting multiplatform, you need another Rx.
Anyways, I don't think that'll be a problem since operators are just extension methods, so they could easily split advanced/complex operators into several libraries (if needed).
As long they are split into separate-libraries, I think it's fine :).
The core api of Flow should be limited, in my opinion. It's strength should be to be able to deal with 'cold' stream in an imperative/sequential programming style, much like Channels do for 'hot' streams.
I am not worried about the size of the core lib since most operators are concise, but indeed maybe keeping the API surface not huge in the Core could be great, especially since extension driven API design + easier custom operators could help developers to add additional capabilities in an easier way compared to Rx world.
Flow looks as suspend Sequence, but it uses the visitor pattern instead of iterator one.
Having a rich operator set may be a really nice to have, the downside is to force the developer to learn a wide set of vocabulary.
Unregarding the performances, the Channel is really easy to understand and use, it does not require any special coroutine context and expore a really limited operators set.
You can observe this issue by comparing Deferred with Java's CompletitionStage, the documentation of last one is really tedious to read and scatterbrained to remember.
Flow is a great candidate for the «I want X» parade. Two delay operators are already present, is planned the throttle operator?
Finally will this issue cover the Mono class, or it will become a typealias of CoroutineScope.() -> T?
For APIs, my take on this is that Flow intentionally targets Flux/Flowable/Observable uses cases, but those using Mono/Single/Completable/Maybe/CompletitionStage will just translate to suspending functions with Unit, nullable or non-nullable return types. For me that's comparable to Optional in Java versus just leveraging Kotlin's null-safety. The less noise there is by leveraging the language features, the better it is for developers.
I guess it is a matter of taste, but I like the level of abstraction and the flexibility that Flow provides compared to the various flavors of Channel.
How would Rx sharing operators like the combinations publish().refCount() or replay(1).autoConnect() work in a Flow environment?
Think about opening a ByteStream in a flow {}, applying operators like map, scan to it and wanting to share the resulting Flow.
My interpretation was that they were by design "cold" streams, thus the necessity for replaying or publishing isn't necessary.
All transformations on the flow, such as map and filter do not trigger flow collection or execution, only terminal operators (e.g. single)do trigger it.
publish is already there -- it is called produceIn. It is a terminal operator for a flow that pushes data from it to the outgoing channel (you can convert that channel to flow again). replay analogue is broadcastIn, but it does not yet support unlimited-size buffer.
No,
selectis not in the plans as it does not fitFlowpush model well.Something like
combineLatestcan be implemented via channels:fun <T, R, V> combineLatest(first: Flow<T>, second: Flow<R>, combiner: (T, R) -> V): Flow<V> = flow { coroutineScope { val firstChannel = asChannel(first) val secondChannel = asChannel(second) whileSelect { firstChannel.onReceive { value -> // Combine values here } secondChannel.onReceive { value -> // Combine values here } } } } private fun <T> CoroutineScope.asChannel(first: Flow<T>): Channel<T> { val channel = Channel<T>() launch { first.collect { value -> channel.send(value) } } return channel }
I'm trying to make combineLatest work but how is this supposed to work? Could you write a fully functional example on this? @qwwdfsad
Hi @PaulWoitaschek
I wish to note that in your asChannel proposal there is no guarantee of channel's processing order, so the combineLatest operator may combine _incorrect_ pairs.
In other words you can send:
and it combines the couples: (a, 1), (b, 1), (b, 2) or (a, 1), (a, 2), (b, 2).
So a Flow operator become not deterministic.
It's not my implementation, it's a quote from this thread I can't make sense of.
As a further consideration on combineLatest operator, maybe it is better to left out all not native operator to avoid any type of unexpected operator.
If combineLatest should be provided, it should be implemented only for Channel and make conversion (and any implied effects) explicit.
maybe it is better to left out
For me combineLatest is the operator.
Combining the latest events of multiple data sources and emitting a single data class for the view to render is the essence of how apps I write work.
Coordinating operators require a resumption primitive and basically a push-pull adapter to suspend either the generator or the consumer. I'm not that familiar with Kotlin continuations but I'm sure they can be programmatically created and resumed.
I tried to implement a trivial plus (a.k.a. concat) operator:
operator fun <T> Flow<T>.plus(other: Flow<T>): Flow<T> = flow {
[email protected](this)
other.collect(this)
}
Is emitAll more readable?
operator fun <T> Flow<T>.plus(other: Flow<T>): Flow<T> = flow {
emitAll(this@plus)
emitAll(other)
}
public suspend inline fun <T> FlowCollector<T>.emitAll(other: Flow<T>) {
other.collect(this)
}
Lastly
val s = sequence { yield(1) }
val c = GlobalScope.produce { send(2) }
val f = flow { emit(3) }
Why another operator name? Is yield or produce not enough?
At this moment I cannot see valid reason to not reuse stabilized concepts of Kotlin API, propose new names (or similar names with different semantics) can be messy for developers.
Flow looks really similar to Sequence for me, I suggest to spot the differences and similarities between Flow and Sequence API.
I was wondering that as well, should we favor consistency with Sequence or Rx ...
@PaulWoitaschek
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select
fun main() {
runBlocking {
val firstFlow = flowOf(1, 2, 3, 4).delayEach(1000)
val secondFlow = flowOf("a", "b", "c").delayEach(1500)
val combined = combineLatest(firstFlow, secondFlow) { a, b -> "$a$b" }
combined.collect { println(it) }
}
}
fun <T, R, V> combineLatest(first: Flow<T>, second: Flow<R>, combiner: (T, R) -> V): Flow<V> = flow {
coroutineScope {
var firstChannel: ReceiveChannel<T>? = asChannel(first)
var secondChannel: ReceiveChannel<R>? = asChannel(second)
var firstValue = firstChannel!!.receive()
var secondValue = secondChannel!!.receive()
while (firstChannel != null || secondChannel != null) {
emit(combiner(firstValue, secondValue))
select<Unit> {
firstChannel?.onReceive?.invoke { value ->
firstValue = value
if (firstChannel?.isClosedForReceive == true) firstChannel = null
}
secondChannel?.onReceive?.invoke { value ->
secondValue = value
if (secondChannel?.isClosedForReceive == true) secondChannel = null
}
}
}
}
}
private fun <T> CoroutineScope.asChannel(flow: Flow<T>): ReceiveChannel<T> =
produce { flow.collect { send(it) } }
Combining the latest events of multiple data sources and emitting a single data class for the view to render is the essence of how apps I write work.
(Conflated)BroadcastChannel and plain select fits my use case.
Moreover a select is not limited to two Flow.
As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive
As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive
That's really neat! Do you plan to add support for suspend functions and the concurrent operators? I need my takeUntil :D
As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive
That's really neat! Do you plan to add support for suspend functions and the concurrent operators? I need my
takeUntil:D
Thanks for feedback 😊 suspend functions are not planned and there are already a lot of operators like combineLatest, zip, merge, sample, debounce, throttle. I'll put takeUntill into the queue. You can also open an issue for that to keep track 😉
@fvasco If you remove the delayEach, it crashes:
"Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed".
If you change the delay to smaller numbers, i.e.
val firstFlow = flowOf(1, 2, 3, 4).delayEach(10)
val secondFlow = flowOf("a", "b", "c").delayEach(15)
You don't end up with the latest values:
2a
2b
3b
4b
Good catch @PaulWoitaschek! Don't use it, I'm not interested to engage myself on this issue.
I think it would be nice if FlowCollector was a CoroutineScope.
Then one could always safely start a coroutine from the flow builder and be sure that:
take(n), first() and any operator which do not wait until the end of the flow.@jcornaz Making or not making FlowCollector a CoroutineScope is a performance trade-off. Simple, sequential flow-chains do not need a coroutine scope, because they are sequential and involved no concurrency so providing CoroutineScope impl for them is going to be just a performance overhead (extra allocated objects). It would benefit more complex operators that involve concurrency, since otherwise these complex operators need to use coroutineScope { ... } themselves.
So far we are leaning to a performance-conscious choice -- keep simple operators faster, and it is Ok if more complex operators have to use coroutineScope occasionally when they need it.
@jcornaz I not agree with you, it is possible to build a coroutineScope inside the flow builder (with all proposed benefit).
Moreover most of them depends by a Job, a Flow pure concept should not depend from any Job or Dispatcher, I think.
This is why I am reluctant of the combineXxx operator, it easy can build for Channel, not for Flow, so it should be implemented for Channel instead of Flow.
@elizarov. It makes sense, thanks for the explanation.
@fvasco
Moreover most of them depends by a Job, a Flow pure concept should not depend from any Job or Dispatcher, I think.
I agree.
This is why I am reluctant of the combineXxx operator, it easy can build for Channel, not for Flow, so it should be implemented for Channel instead of Flow.
I think it is as easy/hard to build for both, and with the same benefits/problems (like the order of the output not being guaranteed). The only difference is that ReceiveChannel is less safe because it represents an open resource. Which is why I don't agree there. I think all operator could and should be available on Flow instead of ReceiveChannel because Flow is a safer API.
On a completely different topic:
Why FlowCollector doesn't have a declaration-site invariance?
Here is what I'd expect:
@FlowPreview
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
But here is what we have:
@FlowPreview
public interface FlowCollector<T> {
public suspend fun emit(value: T)
}
Why FlowCollector doesn't have a declaration-site invariance?
It's an oversight, I will fix it in flow-leftovers branch (target: 1.2.0)
What about combineLatest: it is implementable for both Flow and Channel in more or less the same way.
No new operators will be introduced to channels, existing ones will be eventually deprecated and hidden. Channel is a synchronization primitive, analogue of blocking queue in coroutines world, not a tool to build reactive pipelines on top of.
@elizarov
Making or not making FlowCollector a CoroutineScope is a performance trade-off. Simple, sequential flow-chains do not need a coroutine scope, because they are sequential and involved no concurrency so providing CoroutineScope impl for them is going to be just a performance overhead (extra allocated objects). It would benefit more complex operators that involve concurrency, since otherwise these complex operators need to use coroutineScope { ... } themselves.
Ok for the flow builder, but couldn't the block of flowViaChannel get a CoroutineScope?
Channel is nothing but a communication primitive for coroutines)flowViaChannel already needs to use coroutineScope. So it could just be passed to the block without any overhead.EDIT: Actually, in the case of a flowViaChannel wouldn't it make sense to get a ProducerScope?
@jcornaz I will answer instead of Roman.
This is actually a good idea, thanks for pointing it out.
We have some ideas about optional CoroutineScope in flow builder and this is very similar to flowViaChannel. I will focus on this after 1.2.0 and there are high chances flowViaChannel will have scope receiver in 1.2.1.
It is hard to track ideas and feature requests under this issue, we will close it right after 1.2.0. I've created #1081 to track progress on that
Some personal consideration regarding Limit.kt.
drop counts elements (https://github.com/Kotlin/kotlinx.coroutines/blob/5830d0102625a9149370f03d92c6c9c0478999c8/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt#L24), when count is reached there is no reason to increment skipped.
Should take work with count = 0?
Lastly in concatenate method the inner variable can be declared instead of the implicit it.
Closing this one as fixed in 1.2.0.
There is still a lot of open questions, missing operators and various design problems which is hard to track under this issue. If you feel that something is wrong or some functionality is missing, please create a separate issue for that.
Flow looks like both Sequence and Flowable (or Flux, or Publisher) and there is a lot of temptation to create issues like "Please add operator X because I've got used to it in $FRAMEWORK".
But please explain why you need such an operator and what domain-specific problem does it solve. We ask for it not because we don't want to add more operators, but because we want Flow to solve real problems in the most elegant way and it is not always operators people get used to. And understanding how Flow is used is crucial for us for further design.
A good example of how things are different in Flow is flatMap operator. It is hard to compose reactive applications without it but in coroutines world, we have suspending map operator, so most of the combination issues can be solved by a simple suspension, thus greatly decreasing use of flatMap or merge.
A good example of how things are different in Flow is flatMap operator. It is hard to compose reactive applications without it but in coroutines world, we have suspending map operator, so most of the combination issues can be solved by a simple suspension, thus greatly decreasing use of flatMap or merge.
Say then that you renamed flatMap to map, or that you made map behave like flatMap. Using incorrect nomenclature, even if it's to point out how superficially simple your solution is, may confuse users across frameworks because they have some preconceived expectations. One of them being that the lambda passed to a map function cannot and should not throw with the real-world consequence of mismatched expectations on different APIs (i.e. Set, List) that'll lead to crashes.
In some non-suspending reactive libraries, map and flatMap are different in two ways:
map is one-to-one (one value in, one value out), and flatMap is one-to-many.map is synchronous (must return immediately), and flatMap can be asynchronous, since it works with stream types.In this library, the two operators still have the first distinction, but not the second. RxJava, for example, has a flatMapSingle operator that is still one-to-one but allows the result value to be produced asynchronously. In Flow, you can just use a simple map for that. flatMap is still useful if you need to map a single input value to multiple output values.
I think this naming is a good choice for the library because the cardinality difference is the more important one – operators like flatMap and flatten are so-called because they take a two-dimensional thing (a list of lists, a stream of streams), and "flatten" it into a one-dimensional thing (list/stream of single items). This is also the _only_ distinguishing property in other APIs like Kotlin's list and sequence, which don't deal with asynchrony at all.
In this library, the two operators still have the first distinction, but not the second.
I'm not sure I follow, you can easily come up with examples that jump threads, for example map { async(CommonPool) { throw RuntimeException() }.await() } is asynchronous and legitimately throws. Each value can even jump to a different thread and require non-blocking await for the current thread.
In other libraries, assuming you're following the spec, you cannot fail or jump threads on map because it's synchronous and blocking and you cannot construct an "error value" such a throwing coroutine or Observable.error(). That's what flatMap is used for. Also, flatMap is not limited to multiple values on a stream, for example rx.Single has it.
flatMap exists to represent sequentiality of this wrapped/suspended value followed by another. There's some semantics on how to flatten streams so we came up with new names for them.
My small concern here is that by making map suspended you're recreating flatMap under another name and not setting the right expectations, nor is this library simpler for it.
Maybe I'm being too picky about it 🤪but there are precedents where this distinction mattered in other languages.
That's why he said it has first but not second. It's one-to-one but not
synchronous.
Anyway the suspending map is more like concatMap, not flatMap. You rarely
actually want one-to-many flatMap (which is like mergeMap).
On Sat, Apr 13, 2019, 12:53 PM Paco notifications@github.com wrote:
In this library, the two operators still have the first distinction, but
not the second.map { async(CommonPool) { 1 }.await() } is asynchronous
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/Kotlin/kotlinx.coroutines/issues/254#issuecomment-482838393,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEYbiEl-RItwVsggeJEgYLHniKHVsks5vggtwgaJpZM4SNhnC
.
and_then then? :D
I realize this response is a bit late, but just wanted to get some clarification on @elizarov's comment above:
publishis already there -- it is calledproduceIn. It is a terminal operator for a flow that pushes data from it to the outgoing channel (you can convert that channel to flow again).replayanalogue isbroadcastIn, but it does not yet support unlimited-size buffer.
I don't see how produceIn is equivalent to publish. publish and replay _both_ do multicasting, they differ in behavior upon subscription. As far as I can tell, only broadcastIn does multicasting. Neither have flexible replaying behavior (a conflated broadcast channel is similar to replay(1).refCount()).
@zach-klippenstein
You may be interested in this issue:
https://github.com/Kotlin/kotlinx.coroutines/issues/1086
Most helpful comment
We are actively working on it.
We have finished the design phase and resolved most of the issues related to API surface and mental model; now we are prototyping it, the first eap build will arrive at the beginning of April (though I am not sure this one will be publicly announced and merged into upstream as "ready to use" feature)