Kotlinx.coroutines: Creating DAGs with channels - proposing transforms and pipes

Created on 7 Apr 2018 · 12 comments · Source: Kotlin/kotlinx.coroutines

Directed Graphs

Think of a directed graph of channels and coroutines, with producers as message sources and actors as sinks. What's missing in this picture is:

  • intermediate transform nodes
  • graph edges / interconnects or piping

e.g. Node.js streams use similar concepts for high throughput and performance.

The Proposal

I'm proposing adding a couple of coroutine builders (note that these are early prototypes and not up to par with produce et al.):

  • transform coroutine - like produce & actor, a transform is a combination of a coroutine, the state that is confined to and encapsulated in this coroutine, and two channels to communicate with upstream and downstream coroutines.
  • pipe coroutine - a stateless coroutine that consumes messages from a ReceiveChannel and sends them to a downstream SendChannel. When the downstream SendChannel is part of a Channel, it returns the downstream channel's ReceiveChannel for further chaining (like a shell pipe sequence $ cmd1 | cmd2 | cmd3 ...).
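
A minimal runnable sketch of the idea against the current API (the name `pipe` and its shape here are this proposal's, not kotlinx.coroutines'; for simplicity it takes the upstream channel as a parameter and uses `produce` as a stand-in for a dedicated builder):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sketch only: a transform-like stage built on `produce`. The coroutine
// bridges an upstream ReceiveChannel to a downstream SendChannel, which
// `produce` hands back as a ReceiveChannel for further chaining.
fun <T, R> CoroutineScope.pipe(
    upstream: ReceiveChannel<T>,
    transform: suspend (T) -> R
): ReceiveChannel<R> = produce {
    upstream.consumeEach { send(transform(it)) }
}

fun main() = runBlocking {
    val source = produce { for (i in 1..3) send(i) }  // producer (source node)
    val doubled = pipe(source) { it * 2 }             // transform node
    val result = mutableListOf<Int>()
    for (x in doubled) result += x                    // sink
    println(result)                                   // prints [2, 4, 6]
}
```

When the upstream channel closes, `consumeEach` completes and `produce` closes the downstream channel, so completion propagates through the chain automatically.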

(diagram: a DAG of producer, transform, and sink coroutines connected by channels)

Example 1

This example reads blocks (default 1024) as ByteBuffers from a file and decodes the blocks as UTF-8.

```kotlin
val data: String // the file's contents

FS.createReader(inputFile.toPath())
  .pipe(decodeUtf8())
  .pipe(contents {
    assertEquals(data, it.joinToString(""))
  })
  .drainAndJoin()
```
  • createReader returns a ReceiveChannel wrapper around aRead
  • decodeUtf8 receives ByteBuffers and emits Strings
  • contents is a transform which returns a list of all messages after the channel closes
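
The decodeUtf8 stage isn't defined in the issue; a minimal stand-in (assumed shape, naive per-block decoding) could look like:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Assumed stand-in for the example's decodeUtf8 stage: receives ByteBuffer
// blocks upstream and emits decoded Strings downstream. Caveat: decoding each
// block independently can split a multi-byte character across block
// boundaries; a real implementation would carry a streaming CharsetDecoder.
fun CoroutineScope.decodeUtf8(upstream: ReceiveChannel<ByteBuffer>): ReceiveChannel<String> =
    produce {
        upstream.consumeEach { block ->
            send(StandardCharsets.UTF_8.decode(block).toString())
        }
    }

fun main() = runBlocking {
    val blocks = produce {
        send(ByteBuffer.wrap("hello, ".toByteArray()))
        send(ByteBuffer.wrap("world".toByteArray()))
    }
    val text = StringBuilder()
    for (s in decodeUtf8(blocks)) text.append(s)
    println(text)  // prints hello, world
}
```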

Example 2

Like the previous example, here we read blocks, convert them to utf8 strings and further split text into lines and count the number of lines.

```kotlin
val data: String // the file's contents
val lines = data.split("\n")

val listener = Channel<String>()
val count = async(coroutineContext) {
  listener.count()
}
val teeListener = tee(listener, context = coroutineContext)

FS.createReader(inputFile.toPath())
  .pipe(decodeUtf8())
  .pipe(splitter)
  .pipe(teeListener)
  .pipe(contents {
    assertEquals(lines.size, it.size)
  })
  .drainAndJoin()

assertEquals(lines.size, count.await())
```
  • splitter splits incoming String blocks into individual lines and pushes each line as a message on its downstream channel.
  • tee is a passthrough transform that replicates messages onto the provided channel.
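
tee isn't defined in the issue either; a plausible produce-based sketch (which closes the listener when upstream completes, as Example 2 implies) might be:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Plausible sketch of `tee`: forwards every message downstream unchanged
// while replicating it to `listener`; closes `listener` when upstream ends.
fun <E> CoroutineScope.tee(
    listener: SendChannel<E>,
    upstream: ReceiveChannel<E>
): ReceiveChannel<E> = produce {
    upstream.consumeEach {
        listener.send(it)  // copy to the side channel
        send(it)           // pass through downstream
    }
    listener.close()
}

fun main() = runBlocking {
    val listener = Channel<String>()
    val count = async { var n = 0; for (x in listener) n++; n }  // side consumer
    val source = produce { listOf("a", "b", "c").forEach { send(it) } }
    val passedThrough = mutableListOf<String>()
    for (x in tee(listener, source)) passedThrough += x
    println(passedThrough)   // prints [a, b, c]
    println(count.await())   // prints 3
}
```

Note the side consumer must run concurrently: with a rendezvous listener channel, `tee` suspends on `listener.send` until someone receives.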

Current Alternatives

As @jcornaz points out in the discussion below, transforms (with state) can be implemented as extensions of ReceiveChannel. The snippets (from this test) below contrast the two approaches (extensions are cleaner):

With Transforms/Pipes

```kotlin
dataProducer()                  // emit chunks of 512 bytes
  .pipe(tee(verifyLength))      // verify that we're getting all of the data
  .pipe(splitter(NL))           // split into lines
  .pipe(counter(lineCounter))   // count lines
  .pipe(splitter(WS, true))     // split lines into words (words are aligned)
  .pipe(counter(wordCounter))   // count words
  .drainAndJoin()               // wait
```

With Extensions

```kotlin
dataProducer()                  // emit chunks of 512 bytes
  .tee(verifyLength)            // verify that we're getting all of the data
  .split(NL)                    // split into lines
  .countMessages(lineCounter)   // count lines
  .split(WS, true)              // split lines into words (words are aligned)
  .countMessages(wordCounter)   // count words
  .drain()                      // wait
```
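
One of those extensions could be sketched like this (countMessages is this issue's name; the explicit CoroutineScope parameter and AtomicInteger counter are adaptations for a self-contained example under the current structured-concurrency API):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.atomic.AtomicInteger

// Sketch of a pass-through counting stage in the extension style above:
// every message is forwarded unchanged while `counter` tracks the total.
fun <E> ReceiveChannel<E>.countMessages(
    scope: CoroutineScope,
    counter: AtomicInteger
): ReceiveChannel<E> = scope.produce {
    consumeEach {          // resolves against the upstream ReceiveChannel
        counter.incrementAndGet()
        send(it)           // forward to the downstream channel
    }
}

fun main() = runBlocking {
    val counter = AtomicInteger()
    val words = produce { "one two three".split(" ").forEach { send(it) } }
    for (w in words.countMessages(this, counter)) { /* drain */ }
    println(counter.get())  // prints 3
}
```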

Question

Question: is this something the team considers worth pursuing?

enhancement


All 12 comments

It's not clear to me how this new API is better than the standard map/flatMap.
Could you show some working, self-contained example? I would like to try to rewrite it using existing channel operators and compare the results.

  • Like produce and actor, transform gives you the ability to add context, unlike map/flatMap.
  • Re: pipe, it's basically a consume loop, but I imagine a coroutine is needed to host that loop.

I added a (somewhat standalone) test to the repo (clone and build with Gradle). Let me know if it works for doing a comparison:

> Like produce and actor, transform gives you the ability to add context unlike map/flatMap.

map, flatMap and every other channel operator accepting a suspending lambda also accept a context.


> Re: pipe, it's basically a consume loop, but I imagine a coroutine is needed to host that loop.

So could pipe be defined like this? :

```kotlin
suspend fun <E> ReceiveChannel<E>.pipe(destination: SendChannel<E>) =
    consumeEach { destination.send(it) }
```

As far as I know such a function is indeed missing from kotlinx.coroutines, and it could be useful IMO. However I personally would name it "sendTo".

By context, I meant state (additional variables): e.g. in the actor example, var counter is tracked outside the channel.

In addition to the consumeEach loop, pipe returns a ReceiveChannel (where appropriate) for chaining, like a shell pipe chain: $ a | b | c | d.

There's a question of the best approach w.r.t. chaining transformations: adding extensions to ReceiveChannel vs. providing a reference to a pipe-like function.

e.g. sendTo seems like a good extension to add, and even include in the stdlib, but should every "chainable transformation" be written as an extension? Extensions definitely look cleaner...

> should every "chainable transformation" be written as an extension? Extensions definitely look cleaner

In my opinion: yes. It leads to code that is easier to read and understand.

And if transforms should be written as extension functions, then produce allows us to have any necessary mutable state.

Example:
```kotlin
fun <E> ReceiveChannel<E>.myStatefulOperator(): ReceiveChannel<Int> = produce {
    var count = 0
    consumeEach { send(count++) }
}
```

I added an alternative using extensions (w/produce) vs transform.

Similar to #257

see also: KT-24223 Feature request: add the pipelines operators in Kotlin
https://youtrack.jetbrains.com/issue/KT-24223

Data-Driven Concurrency is cool!
see also: https://dl.acm.org/citation.cfm?doid=3154814.3162014

Building Scalable, Highly Concurrent & Fault Tolerant Systems - Lessons Learned
https://www.slideshare.net/jboner/building-scalable-highly-concurrent-fault-tolerant-systems-lessons-learned?from_action=save

Dataflow Concurrency
  • Deterministic
  • Declarative
  • Data-driven
  • Threads are suspended until data is available
  • Lazy & on-demand
  • No difference between concurrent code and sequential code
  • Examples: Akka & GPars

This is more or less a duplicate of #254, so I'm closing it as outdated.
