Node: for await & Readable

Created on 3 Sep 2019 · 12 Comments · Source: nodejs/node

Hi, I am trying to consume a readable with a for await loop.
When I create the simple stream below, the console outputs the error straight away instead of logging the data events first. Is this the expected behavior?

Version: v12.9.1
Platform: Darwin Kernel Version 18.7.0

const { Readable } = require('stream')

async function* generate() {
  yield 1
  yield 2
  yield Promise.reject('Boum')
}

;(async () => {
  try {
    for await (const d of Readable.from(generate())) {
      console.log(d)
    }
  } catch (e) {
    console.log(e)
  }
})()

the output is

Boum

instead of the expected

1
2
Boum

I think this is probably correct (although probably surprising) since readable is supposed to be greedy.

In order to get the behaviour you want you need to set highWaterMark to 1, e.g.

for await (const d of Readable.from(generate(), { highWaterMark: 1 })) {
  console.log(d)
}
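
For anyone who wants to try the workaround end to end, here is a minimal self-contained sketch. The consume() helper is hypothetical (it is not part of the report); it only collects whatever the loop sees, including the final error, so the order is easy to inspect. With highWaterMark: 1 the stream buffers at most one object, so the generator should not be pulled ahead of the consumer.

```javascript
const { Readable } = require('stream')

async function* generate() {
  yield 1
  yield 2
  yield Promise.reject('Boum')
}

// Hypothetical helper: record every chunk, then the error, in arrival order.
async function consume(stream) {
  const seen = []
  try {
    for await (const d of stream) {
      seen.push(d)
    }
  } catch (e) {
    seen.push(e)
  }
  return seen
}

// highWaterMark: 1 limits the read-ahead to a single object.
consume(Readable.from(generate(), { highWaterMark: 1 }))
  .then((seen) => console.log(seen))
```

Dropping the options object gives the default read-ahead described in the original report.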

@mcollina: Thoughts? In the given example I believe the behaviour is correct since the generator is async.

However, using a sync generator has the same behavior and I'm not sure that is correct?

We might need to think about it a bit more. Look at the following:

async function* generate() {
  yield 1
  yield 2
  throw new Error('Boum')
}

;(async () => {
  try {
    for await (const d of generate()) {
      console.log(d)
    }
  } catch (e) {
    console.log(e)
  }
})()

The result is:

1
2
Boum

I think this _might_ be something we can fix. Specifically, the following is making the iterator exit eagerly:

https://github.com/nodejs/node/blob/63b056d8d4f0696254cd5fc40a69aee0157fc410/lib/internal/streams/async_iterator.js#L61-L86

I think we might want to put the promise rejection in the kLastPromise property queue instead: https://github.com/nodejs/node/blob/63b056d8d4f0696254cd5fc40a69aee0157fc410/lib/internal/streams/async_iterator.js#L155.

@benjamingr what do you think?

On second thought, I might be wrong, and this might be due to something else, or something that we cannot really fix.

Streams are greedy about emitting 'error'. They emit an error as soon as it happens. In https://github.com/nodejs/node/blob/63b056d8d4f0696254cd5fc40a69aee0157fc410/lib/_stream_readable.js#L1223-L1236, we see that we call destroy eagerly and this is what triggers it.
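
To make the effect of an eager destroy() concrete, here is a small sketch that does not use Readable.from() at all. The names makeDestroyedStream and consume are illustrative only. Two objects are already buffered when the stream is destroyed with an error, and the for await loop is expected to see only the error, not the buffered objects.

```javascript
const { Readable } = require('stream')

// Illustrative only: buffer two objects, then destroy with an error
// before any consumer has had a chance to read them.
function makeDestroyedStream() {
  const readable = new Readable({ objectMode: true, read() {} })
  readable.push(1)
  readable.push(2)
  readable.destroy(new Error('Boum'))
  return readable
}

// Hypothetical helper: record chunks, then the error message.
async function consume(stream) {
  const seen = []
  try {
    for await (const d of stream) {
      seen.push(d)
    }
  } catch (e) {
    seen.push(e.message)
  }
  return seen
}

consume(makeDestroyedStream()).then((seen) => console.log(seen))
```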

As a short term measure, we should definitely document it.

@benjamingr what do you think?

That streams do watermarking and that they are greedy :]

To avoid the stream behaviour one can simply not convert their AsyncIterator to a stream.

Now I do believe we can "fix" this (on the async iterator side) by only emitting the error and destroying after we are done emitting data on the iterator but honestly I am not sure that would be better from a stream consumer PoV. In fact it would likely be worse.

So +1 on a docs change, -0.5 on changing the Symbol.asyncIterator.

> Now I do believe we can "fix" this (on the async iterator side) by only emitting the error and destroying after we are done emitting data on the iterator

I think I'd be in favour of that, fwiw.

That was my first intuition too but I am having a hard time coming up with a real use-case in which this (buffering) behaviour is better. Namely, in all cases I considered I would rather recover from the error sooner rather than later. The cases I came up with were:

  • Persistent database change stream (I would rather re-connect and sync which I have to do anyway to listen to new changes).
  • Query results iterated with a cursor (if the query failed on some results, I would rather retry the query sooner rather than later).
  • An HTTP body stream (I would need to recover anyway, and would find the incomplete data only marginally useful).

Edit: the theme of these is having to reconnect to establish synchronisation with the producer later on _anyway_.

On the other hand, if I look at async iterators _without_ streams, such use cases are easy to come up with and are abundant in reactive systems (for example, a whole server flow that accepts requests from clients in a for await loop).

The biggest issue I see here is that there is missing data that we are dropping that the user might be interested in. I am also not sure how else we can expose it. Basically the constraints are:

  • As a consumer if there is an error downstream I want to know about it as soon as possible.
  • As a consumer I never want to miss any data on the stream.

Yeah, that's what I'm thinking too. I'd also consider it more expected for (async) iterators to throw the exception only after providing data that was produced before the error, because in the typical (async) generator function approach, that's when that exception would have been created.

Note that we can consider this to be a problem for our Readable.from() implementation rather than our async iterator support. Essentially, defer calling destroy() until all chunks have been emitted. This can be easily achieved by setting highWaterMark: 1, and we may just set that as the default.
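
Until a decision is made in core, one userland way to approximate "error only after all chunks" is to send the error through the buffer in-band, so it stays ordered behind the data. This is only a sketch under that assumption, not how Readable.from() works: kDeferredError, captureError, rethrowDeferred and fromOrdered are all hypothetical names, and the result is an async iterator rather than a Readable.

```javascript
const { Readable } = require('stream')

// Hypothetical marker used to tag an error that travels as a normal chunk.
const kDeferredError = Symbol('deferredError')

// Turn a failure of the source into a final, tagged chunk.
async function* captureError(source) {
  try {
    yield* source
  } catch (err) {
    yield { [kDeferredError]: err }
  }
}

// Pass chunks through and rethrow once the tagged chunk is reached.
async function* rethrowDeferred(stream) {
  for await (const chunk of stream) {
    if (chunk !== null && typeof chunk === 'object' && kDeferredError in chunk) {
      throw chunk[kDeferredError]
    }
    yield chunk
  }
}

function fromOrdered(source, options) {
  return rethrowDeferred(Readable.from(captureError(source), options))
}

async function* generate() {
  yield 1
  yield 2
  yield Promise.reject('Boum')
}

async function main() {
  const seen = []
  try {
    for await (const d of fromOrdered(generate())) {
      seen.push(d)
    }
  } catch (e) {
    seen.push(e)
  }
  return seen
}

main().then((seen) => console.log(seen))
```

The trade-off is the one discussed above: the consumer no longer misses buffered data, but it learns about the error later than it would with an eager destroy().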

Note that running this other snippet produces a different output

const { Readable } = require('stream')

function makeStreamV2() {
  return new Readable({
    objectMode: true,

    read() {
      if (this.reading) {
        return
      }

      this.reading = true
      this.push(1)
      this.push(2)
      this.emit('error', 'boum')
    }
  })
}

;(async () => {
  try {
    for await (const d of makeStreamV2()) {
      console.log(d)
    }
  } catch (e) {
    console.log(e)
  }
})()

The output is:

boum

@mcollina did we resolve this or is this something someone needs to further look into?

This needs some decision to be made, and possibly some docs to be added, or the behavior changed.
