Think directed graphs of channels & coroutines, with producers as message sources and actors as sinks. What's missing in this picture are the intermediate nodes (transforms) and a way to pipe them together. Node.js Streams, for example, uses similar concepts for high throughput and performance.
I'm proposing adding a couple of coroutines (note that these are early prototypes and not up to par with producer et al):
- transform: like produce & actor, a transform is a combination of a coroutine, the state that is confined to and encapsulated in that coroutine, and two channels to communicate with upstream and downstream coroutines.
- pipe: receives messages from an upstream ReceiveChannel and sends them to a downstream SendChannel. When the downstream SendChannel is part of a Channel, it returns the downstream channel's ReceiveChannel for further chaining (like a shell pipe sequence $ cmd1 | cmd2 | cmd3 ...).
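A minimal sketch of the shape of these two pieces, assuming a current kotlinx.coroutines version with the scoped produce builder (the names and signatures here are illustrative, not the actual prototype):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

// A transform combines a coroutine, state confined to that coroutine (whatever the
// lambda captures), an upstream ReceiveChannel and a downstream SendChannel.
fun <T, R> CoroutineScope.transform(
    upstream: ReceiveChannel<T>,
    block: suspend (T, SendChannel<R>) -> Unit
): ReceiveChannel<R> = produce {
    upstream.consumeEach { block(it, channel) }
}

// pipe feeds this channel into a transform and hands back the transform's downstream
// ReceiveChannel, so stages chain like a shell pipe: cmd1 | cmd2 | cmd3.
fun <T, R> ReceiveChannel<T>.pipe(
    scope: CoroutineScope,
    block: suspend (T, SendChannel<R>) -> Unit
): ReceiveChannel<R> = scope.transform(this, block)
```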
This example reads blocks (default 1024) as ByteBuffers from a file and decodes the blocks to UTF-8.
```kotlin
val data: String // the file's contents

FS.createReader(inputFile.toPath())
    .pipe(decodeUtf8())
    .pipe(contents {
        assertEquals(data, it.joinToString(""))
    })
    .drainAndJoin()
```
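In the example above, decodeUtf8 could internally be a produce-based stage, roughly like this minimal sketch (hypothetical signature, assuming a current kotlinx.coroutines version and that blocks align on character boundaries; the actual helper may differ):

```kotlin
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

// Hypothetical stage: decode each incoming ByteBuffer block to a String and pass it on.
fun CoroutineScope.decodeUtf8(upstream: ReceiveChannel<ByteBuffer>): ReceiveChannel<String> =
    produce {
        upstream.consumeEach { send(StandardCharsets.UTF_8.decode(it).toString()) }
    }
```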
- createReader returns a ReceiveChannel wrapper around a Reader
- decodeUtf8 receives ByteBuffers and returns Strings
- contents is a transform which returns a list of all messages after the channel closes

Like the previous example, here we read blocks and convert them to UTF-8 strings, then further split the text into lines and count the number of lines.
```kotlin
val data: String // the file's contents
val lines = data.split("\n")
val listener = Channel<String>()
val count = async(coroutineContext) {
    listener.count()
}
val teeListener = tee(listener, context = coroutineContext)

FS.createReader(inputFile.toPath())
    .pipe(decodeUtf8())
    .pipe(splitter)
    .pipe(teeListener)
    .pipe(contents {
        assertEquals(lines.size, it.size)
    })
    .drainAndJoin()

assertEquals(lines.size, count.await())
```
- splitLine splits incoming String blocks into individual lines and pushes each line as a message on its downstream channel.
- tee is a passthrough transform that replicates messages on the provided ReceiveChannel.

As @jcornaz points out in the discussion below, transforms (with state) can be implemented as extensions of ReceiveChannel. The snippets below (from this test) contrast the two approaches (extensions are cleaner); a sketch of an extension-style tee follows the two snippets:
```kotlin
dataProducer()                    // emit chunks of 512 bytes
    .pipe(tee(verifyLength))      // verify that we're getting all of the data
    .pipe(splitter(NL))           // split into lines
    .pipe(counter(lineCounter))   // count lines
    .pipe(splitter(WS, true))     // split lines into words (words are aligned)
    .pipe(counter(wordCounter))   // count words
    .drainAndJoin()               // wait
```
```kotlin
dataProducer()                    // emit chunks of 512 bytes
    .tee(verifyLength)            // verify that we're getting all of the data
    .split(NL)                    // split into lines
    .countMessages(lineCounter)   // count lines
    .split(WS, true)              // split lines into words (words are aligned)
    .countMessages(wordCounter)   // count words
    .drain()                      // wait
```
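For illustration, a minimal sketch of what an extension-style tee could look like, assuming a current kotlinx.coroutines version with the scoped produce builder (the signatures in the actual test differ):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

// Pass every message through unchanged while replicating it to a side channel.
fun <T> ReceiveChannel<T>.tee(scope: CoroutineScope, listener: SendChannel<T>): ReceiveChannel<T> {
    val upstream = this
    return scope.produce {
        upstream.consumeEach {
            listener.send(it) // replicate to the side channel (e.g. the line counter above)
            send(it)          // pass through downstream
        }
        listener.close()      // signal the side consumer that the stream ended
    }
}
```

split and countMessages would follow the same produce-based pattern, each confining its own state (such as a line buffer or a running count) to its coroutine.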
Question: is this something the team considers worth pursuing?
It's not clear to me how this new API is better than the standard map/flatMap.
Could you show a working, self-contained example? I would like to try to rewrite it using existing channel operators and compare the results.
Like produce and actor, transform gives you the ability to add context, unlike map/flatMap. Re pipe, it's basically a consume loop, but I imagine a coroutine is needed to host that loop. I added a (somewhat standalone) test to the repo (clone and build with Gradle). Let me know if it works for doing a comparison:
> Like produce and actor, transform gives you the ability to add context unlike map/flatMap.
map, flatMap and every other channel operator accepting a suspending lambda also accept a context.
> Re pipe, it's basically a consume loop, but I imagine a coroutine is needed to host that loop.
So could pipe be defined like this?

```kotlin
suspend fun <E> ReceiveChannel<E>.pipe(destination: SendChannel<E>) =
    consumeEach { destination.send(it) }
```
As far as I know such a function is indeed missing in kotlinx.coroutines, and it could be useful IMO. However, I personally would name it "sendTo".
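For illustration, a self-contained sketch of that function under the proposed sendTo name (hypothetical; sendTo is not an existing kotlinx.coroutines operator), assuming a current kotlinx.coroutines version:

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

// Same body as the pipe definition above, with the proposed name.
suspend fun <E> ReceiveChannel<E>.sendTo(destination: SendChannel<E>) =
    consumeEach { destination.send(it) }

fun main() = runBlocking {
    val source = produce { repeat(3) { send(it) } } // emits 0, 1, 2
    val sink = Channel<Int>()
    launch {
        source.sendTo(sink) // forward everything, then signal completion
        sink.close()
    }
    sink.consumeEach(::println) // prints 0, 1, 2
}
```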
By context, I meant state (additional variables), e.g. in the actor example, where var counter is tracked outside the channel.
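That is, the counter lives in the coroutine rather than in the messages, roughly as in the counter actor from the coroutines guide (simplified sketch, assuming a current kotlinx.coroutines version):

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.actor

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // state confined to the actor's coroutine, not carried by the channel
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
```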
In addition to consumeEach, pipe returns ReceiveChannel (where appropriate) for chaining, like a shell pipe chain: $ a | b | c | d.
There's a question of the best approach w.r.t. chaining transformations: adding extensions to ReceiveChannel vs. providing a reference to a pipe-like function.
e.g. sendTo seems like a good extension to add, and even include in the stdlib, but should every "chainable transformation" be written as an extension? Extensions definitely look cleaner...
> should every "chainable transformation" be written as an extension? Extensions definitely look cleaner
In my opinion: yes. It leads to code that is easier to read and understand.
And if transforms should be written as extension functions, then produce allows us to have any necessary mutable state.
Example:
```kotlin
// hypothetical signature (illustrative name); the point is that produce keeps the
// mutable count confined to the producing coroutine
fun <T> ReceiveChannel<T>.countEach(): ReceiveChannel<Int> = produce {
    var count = 0
    consumeEach { send(count++) }
}
```
I added an alternative using extensions (w/produce) vs transform.
Similar to #257
see also: KT-24223 Feature request: add the pipelines operators in Kotlin
https://youtrack.jetbrains.com/issue/KT-24223
Data-Driven Concurrency is cool!
see also: https://dl.acm.org/citation.cfm?doid=3154814.3162014
Building Scalable, Highly Concurrent & Fault Tolerant Systems - Lessons Learned
https://www.slideshare.net/jboner/building-scalable-highly-concurrent-fault-tolerant-systems-lessons-learned?from_action=save
Dataflow Concurrency
• Deterministic
• Declarative
• Data-driven
• Threads are suspended until data is available
• Lazy & On-demand
• No difference between:
  • Concurrent code
  • Sequential code
• Examples: Akka & GPars
This is kind of a duplicate of #254, so closing this as outdated.