Node: async iterators/generators in stream.pipeline()

Created on 8 Apr 2019 · 10 Comments · Source: nodejs/node

Apologies for all the code here.

We can consume a Readable stream using an async iterator:

// source: http://2ality.com/2018/04/async-iter-nodejs.html
const fs = require('fs');

async function printAsyncIterable(iterable) {
  for await (const chunk of iterable) {
    console.log('>>> '+chunk);
  }
}

printAsyncIterable(fs.createReadStream('my-file.txt', 'utf8')).catch(console.error);

And we can use async generators similarly to how one would use a Transform stream:

/**
 * Parameter: async iterable of chunks (strings)
 * Result: async iterable of lines (incl. newlines)
 */
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

/**
 * Parameter: async iterable of lines
 * Result: async iterable of numbered lines
 */
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}
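To check how these two generators behave when a line is split across chunks, here is a small sketch that feeds them an in-memory source instead of a file. `fakeChunks` and `collect` are hypothetical helpers made up for this example, and the two generators are copied in (slightly compacted) so the block runs on its own.

```javascript
// Copies of chunksToLines and numberLines from above, so this block runs on its own.
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      yield previous.slice(0, eolIndex + 1); // line includes the EOL
      previous = previous.slice(eolIndex + 1);
    }
  }
  if (previous.length > 0) yield previous; // trailing text without a newline
}

async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) yield `${counter++}: ${line}`;
}

// Hypothetical in-memory source: chunk boundaries deliberately fall mid-line.
async function* fakeChunks() {
  yield 'ab';
  yield 'c\nde';
  yield 'f\n';
  yield 'g';
}

// Hypothetical helper: drain an async iterable into an array.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

collect(numberLines(chunksToLines(fakeChunks())))
  .then((lines) => console.log(JSON.stringify(lines))) // ["1: abc\n","2: def\n","3: g"]
  .catch(console.error);
```

Note that `'abc\n'` is reassembled from two chunks, and the final `'g'` is still emitted even though it has no trailing newline.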

Then, we can "pipe" these together like so:

async function main() {
  await printAsyncIterable(
    numberLines(
      chunksToLines(
        fs.createReadStream('my-file.txt', 'utf8')
      )
    )
  );
}
main().catch(console.error);

That's neat, but also kind of hideous. What if we could leverage stream.pipeline() to do something like this?

async function main() {
  stream.pipeline(
    fs.createReadStream('my-file.txt', 'utf8'),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}
main();

I'm unfamiliar with the guts of stream.pipeline() (and completely new to async iterators and generators), so I don't know how feasible something like this is.

FWIW, the "hideous nested function calls" can be naively replaced by use of the godlike Array.prototype.reduce():

const pipeline = async (...args) => args.reduce((acc, arg) => arg(acc));

async function main() {
  await pipeline(
    fs.createReadStream('my-file.txt', 'utf8'),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}
main().catch(console.error);
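A small variation on the reduce() trick, assuming you want the chain to be reusable: keep the source out of the argument list and compose only the stages, so the result is a plain function from one async iterable to another. `compose`, `upper`, `exclaim` and `words` are hypothetical names used only for this sketch; the toy stages stand in for chunksToLines/numberLines.

```javascript
// Hypothetical helper: compose stages left to right. Each stage takes an async
// iterable; the composed function returns whatever the last stage returns.
const compose = (...stages) => (source) =>
  stages.reduce((acc, stage) => stage(acc), source);

// Toy stages standing in for chunksToLines/numberLines.
async function* upper(source) {
  for await (const s of source) yield s.toUpperCase();
}
async function* exclaim(source) {
  for await (const s of source) yield s + '!';
}

// Hypothetical in-memory source.
async function* words() {
  yield 'hi';
  yield 'there';
}

const shout = compose(upper, exclaim);

(async () => {
  for await (const s of shout(words())) console.log(s); // "HI!", then "THERE!"
})().catch(console.error);
```

Because `shout` is just a function, it can be applied to any number of sources, and the caller decides whether and where to await.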

Reference: https://twitter.com/b0neskull/status/1115325542566227968

Labels: feature request, stream


All 10 comments

cc @mcollina

So … basically, the desired feature here is that stream.pipeline() should transparently convert async generators to object-mode Transform streams?

@addaleax I guess that'd be pretty great. Readable#pipe too, maybe?

something something migration path from streams to async iterators and generators

If we can coerce async iterables to streams, that would enable some nice compat with WHATWG streams. +1 from me

How can we coerce to a transform? What signature would you expect from a transform async iterator?

@mcollina I think any function that takes one argument, which is an async iterable of Buffers or strings, and returns an async iterable of that kind should work?

More generally, I think we could create a utility class that turns async iterables of Buffers/strings into a Readable stream, and use that as a building block for the .pipe() utility.
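For reference, `stream.Readable.from()` (added in Node.js 12.3.0 and backported to 10.17.0) appears to cover the "async iterable to Readable" building block described here. A minimal sketch, assuming an in-memory `lines` generator (hypothetical) in place of a real file, and a compacted copy of numberLines:

```javascript
const { Readable } = require('stream');

// Compacted copy of numberLines from the issue.
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) yield `${counter++}: ${line}`;
}

// Hypothetical in-memory stand-in for a file's lines.
async function* lines() {
  yield 'alpha\n';
  yield 'beta\n';
}

// Readable.from() wraps any (async) iterable in a Readable (object mode by
// default), so a generator chain can be .pipe()d like any other stream.
const numbered = Readable.from(numberLines(lines()));
numbered.pipe(process.stdout); // prints "1: alpha" and "2: beta" on separate lines
```

This only converts the output of a chain into a stream; the input side still has to be passed to the first generator by hand.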

This is a prototype implementation of that Readable pattern: https://github.com/mcollina/stream-iterators-utils/blob/master/README.md#utilstoreadablegenerator-opts

It's awesome that we now have stream.Readable.from() and readable[Symbol.asyncIterator]; the only piece missing from the puzzle now is stream.Transform.from() with a signature like

class Transform<I, O> {
  static from<I, O>(generatorFn: (input: AsyncIterable<I>) => AsyncIterable<O>): Transform<I, O>
}
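Later Node.js releases (16.8.0 and up) added `stream.Duplex.from()`, which accepts an async generator function and returns a Duplex whose writable side feeds the generator's source and whose readable side emits what it yields. That is close to the `Transform.from()` signature proposed above, and it also works with `Readable#pipe`. A sketch, assuming Node.js 16.8+ and a compacted copy of numberLines:

```javascript
const { Duplex, Readable } = require('stream');

// Compacted copy of numberLines from the issue.
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) yield `${counter++}: ${line}`;
}

// Chunks written into `numberer` arrive as `linesAsync`; yielded values come out
// of its readable side.
const numberer = Duplex.from(numberLines);

Readable.from(['alpha\n', 'beta\n'])
  .pipe(numberer)
  .pipe(process.stdout); // prints "1: alpha" and "2: beta" on separate lines
```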

We added support for async generators in stream.pipeline() in v13.x. I believe that fulfills this topic. Please re-open if there is anything further to address.
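With that support, the code originally wished for works almost verbatim. A sketch assuming Node.js 15+ for the promise-based `require('stream/promises')`, with an in-memory `Readable.from()` source standing in for `fs.createReadStream('my-file.txt', 'utf8')`: async generator functions act as transforms, and the final async function acts as the destination, whose returned promise `pipeline()` waits for.

```javascript
const { pipeline } = require('stream/promises'); // Node.js >= 15
const { Readable } = require('stream');

// Compacted copies of the generators from the issue.
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      yield previous.slice(0, eolIndex + 1); // line includes the EOL
      previous = previous.slice(eolIndex + 1);
    }
  }
  if (previous.length > 0) yield previous;
}

async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) yield `${counter++}: ${line}`;
}

// Destination stage: consumes the iterable and returns a promise.
async function printAsyncIterable(iterable) {
  for await (const chunk of iterable) console.log('>>> ' + chunk);
}

async function main() {
  await pipeline(
    // In-memory stand-in for fs.createReadStream('my-file.txt', 'utf8').
    Readable.from(['first li', 'ne\nsecond line\n']),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}

main().catch(console.error);
```

Unlike the reduce() version, pipeline() also propagates errors and destroys every stage if one of them fails.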
