Node: readable[Symbol.asyncIterator]() does not resume streams

Created on 9 Jun 2020 · 18 comments · Source: nodejs/node

  • Version: v14.2.0
  • Platform: Linux
  • Subsystem: Ubuntu/Amazon AWS

What steps will reproduce the bug?

When creating a stream and pausing it, using Symbol.asyncIterator will not cause it to resume.

How often does it reproduce? Is there a required condition?

This happens always.

What is the expected behavior?

.resume() should be called

What do you see instead?

The promise will hang forever awaiting the stream to start.

Additional information

I've been working on creating a Transform stream that takes an HTTP/1 request or HTTP/2 stream and converts the Buffer data to an object based on header information. But I do not want the data to be converted as it arrives from the socket source. Therefore, I set up the pipe and pause it. That means that only when my application code actually wants to read does it start transforming the input Buffer (e.g. Buffer to UTF-8 to JSON).

Using .pipe(), attaching a 'readable' listener, or attaching a 'data' listener will automatically unpause the stream and let data flow. But using readable[Symbol.asyncIterator]() does not.

````js
stream.on('readable', onReadable); // OK
stream.pipe(outputFileStream); // OK
stream.on('data', onDataChunk); // OK
const chunk = (await stream[Symbol.asyncIterator]().next).value; // Never resumes and hangs
````

You can see fuller example of the usage here: https://github.com/clshortfuse/webhoster/blob/3abaad9dd9852d063e5b127c782f759870664553/middleware/bufferdecoder.js#L223

I'm also using a third PassThrough stream to convert the source (Buffer) to a string and then to an object. The idea is not to mutate the original source stream. That's why I want to pause it at the source, or else the StringDecoder would run automatically without an application-level request.
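
To make the setup concrete, here is a rough sketch of the arrangement described above (the stream and function names are illustrative, not taken from the linked code):

````js
import { PassThrough } from 'stream';

// Rough sketch of the setup described above: the source is piped into a
// decoding PassThrough and then paused, so nothing is decoded until the
// application actually asks to read.
function wrapRequestStream(source) {
  const stringDecoder = new PassThrough({ encoding: 'utf-8' });
  source.pipe(stringDecoder);
  source.pause(); // hold the socket data back until a read is requested
  return stringDecoder;
}
````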

All 18 comments

I think the current behavior is correct for getting the iterator, but calling .next should probably attach a listener.

Thank you for the detailed report btw. cc @ronag @mcollina

Thanks. I wrote the syntax wrong. It should be:

const chunk = (await readable[Symbol.asyncIterator]().next()).value;

But yes, calling next() doesn't force a flowing state.

I'm not sure what happens here. When you call .next() it calls .read() on the stream, and when you create an async iterator it adds a 'readable' listener to the stream.

Any chance you can provide an isolated code sample? All the "trivial" stuff I'm trying works with the current impl.

Or is the confusion regarding async iterators not being in flowing mode (they are a pull abstraction and backpressure is way simpler in paused mode)?
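
To illustrate the mechanism described above, here is a rough, simplified sketch of a pull-based `next()` built on 'readable' and read() — this is an approximation, not Node's actual implementation:

````js
// Simplified approximation of the mechanism described above (not Node's
// actual implementation): pull with read(), wait on 'readable' when the
// buffer is empty, and never switch the stream into flowing mode.
function makeNext(readable) {
  return function next() {
    return new Promise((resolve, reject) => {
      const chunk = readable.read();
      if (chunk !== null) {
        resolve({ value: chunk, done: false });
        return;
      }
      readable.once('error', reject);
      readable.once('end', () => resolve({ value: undefined, done: true }));
      // Listener cleanup is omitted for brevity.
      readable.once('readable', () => resolve(next()));
    });
  };
}
````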

I'll double-check now. I was ready to migrate from express to this framework I wrote. I ran it, and the read pipeline stalled because I was running:

````js
// req.stream.resume();
const iterator = req.stream[Symbol.asyncIterator]();
const next = await iterator.next().then((result) => {
  // Never called if stream isn't resumed manually.
  console.log('got next', result);
  return result;
});
const parameters = next.value;
````
Which I changed to:
````js
/**
 * @param {Readable} readable
 * @return {Promise}
 */
async function readStreamChunk(readable) {
  return new Promise((resolve, reject) => {
    readable.once('data', resolve);
    readable.once('error', reject);
  });
}

const parameters = await readStreamChunk(req.stream);
````

and it stopped hanging. I figured it was because of the .pause(). Stepping through doesn't return after /internals/async_hooks.js. That was on v14.2.0. If I uncomment the .resume() on the top line, then the asyncIterator works fine.


I'll try to get you a small reproducible now.

Note that async iterators on streams use on('readable') and .read() so the state will never be flowing. You need to call .return() to destroy the stream.
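
For example, a minimal sketch of using .return() to tear the stream down after pulling a single chunk (the helper name is just for illustration):

````js
// Minimal sketch: pull one chunk, then call iterator.return() so the
// underlying stream gets destroyed, since async iteration never puts it
// into flowing mode.
async function firstChunk(readable) {
  const iterator = readable[Symbol.asyncIterator]();
  try {
    const { value } = await iterator.next();
    return value;
  } finally {
    await iterator.return();
  }
}
````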

and it stopped hanging

I think it's probably a good idea for you to check out how stream modes work.

Just adding a "data" listener is probably not a great fix but might be suitable for what you're doing.

It was my assumption that iterating through a stream would unpause it, like iterating via 'data' events.

All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:

  • Adding a 'data' event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a Writable.
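
As a quick illustration of the list quoted above (the write-stream destination is just an example path):

````js
import { PassThrough } from 'stream';
import { createWriteStream } from 'fs';

const stream = new PassThrough();

// Any one of these switches the stream into flowing mode:
stream.on('data', (chunk) => console.log('chunk', chunk)); // 1. 'data' handler
stream.resume();                                           // 2. explicit resume()
stream.pipe(createWriteStream('/tmp/out.txt'));            // 3. pipe to a Writable
````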

My point is that asyncIterator doesn't resume a stream. I'm not sure if that's an oversight or fully intentional. I have to look back at whether I used 'readable' or 'data', but based on the documentation, it must have been 'data'. Worst case, it's a bug. Best case, perhaps some clarity might be needed. I'm now assuming asyncIterator only iterates through what's inside the internal buffer array (i.e. what's pushed via Readable.push()). On unpaused streams it notifies as data is received, but on paused streams it won't do anything.

Regardless, here's the reproducible:

````js
import { PassThrough, Transform } from 'stream';

let buffer = '';

const source = new PassThrough();
const stringDecoder = new PassThrough({ encoding: 'utf-8' });
// Buffers the decoded text and emits a single parsed object on flush.
const objectDecoder = new Transform({
  objectMode: true,
  defaultEncoding: 'utf-8',
  transform(chunk, encoding, callback) {
    buffer += chunk;
    callback();
  },
  flush(callback) {
    callback(null, JSON.parse(buffer));
  },
});

source.pipe(stringDecoder).pipe(objectDecoder);
source.pause();
objectDecoder.on('pause', () => source.pause());
objectDecoder.on('resume', () => source.resume());

setTimeout(async () => {
  let result;
  // Randomly exercise either the 'data' path or the async iterator path.
  if (Math.random() >= 0.5) {
    console.log("Waiting for 'data' event...");
    result = await new Promise((resolve) => objectDecoder.on('data', resolve));
  } else {
    console.log('Waiting for asyncIterator...');
    result = (await objectDecoder[Symbol.asyncIterator]().next()).value;
  }
  console.log('Result:', result);
}, 1000);

setTimeout(() => {
  console.log('Writing...');
  source.write(Buffer.from(JSON.stringify({ key: 'value' })));
  source.end();
}, 2000);

setTimeout(() => {
  console.log('Bye!');
}, 5000);
````


All summarized: asyncIterator iterates through 'readable' events and not 'data' events (that causes resuming). That was my mistaken assumption.

All summarized: asyncIterator iterates through 'readable' events and not 'data' events (that causes resuming). That was my mistaken assumption.

Apart from this, what problems does this cause?

@mcollina Not exactly a "problem", per se, just that it means I can't use asyncIterator on paused streams.

That means my issue seems to be architectural rather than a bug in Node.js. My architecture is targeting this pipeline:

httpStream => zlibTransformer => utf8Transformer => objectModeTransformer

The application layer will only interact with the last leg of the pipeline, meaning the transforms are optional and transparent to the application. The objective is to not invoke the zlib, UTF-8, or object transforms until there is a read request, in order to reduce needless computation. Therefore the HTTP stream is paused, and I create a listener so that when the last leg is resumed, the original is resumed as well. But the conundrum is that the transforms will never report 'readable', since that doesn't happen unless the pipeline is flowing and a transformation happens (.push() is called). When I do open up this library, I'll have to inform users that 'readable' won't ever fire if using transforms. Unfortunate, but that seems to be the case.
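
A rough sketch of that wiring, with PassThrough streams standing in for the real zlib/UTF-8/object-mode transforms:

````js
import { PassThrough } from 'stream';

// Rough sketch of the pipeline described above; the PassThrough instances
// stand in for the real zlib, UTF-8, and object-mode transforms.
function wire(httpStream) {
  const zlibTransformer = new PassThrough();
  const utf8Transformer = new PassThrough({ encoding: 'utf-8' });
  const objectModeTransformer = new PassThrough({ objectMode: true });

  httpStream
    .pipe(zlibTransformer)
    .pipe(utf8Transformer)
    .pipe(objectModeTransformer);
  httpStream.pause(); // keep the source paused so nothing is transformed eagerly

  // Mirror the last leg's flowing state back onto the original source.
  objectModeTransformer.on('resume', () => httpStream.resume());
  objectModeTransformer.on('pause', () => httpStream.pause());
  return objectModeTransformer;
}
````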


I believe most of my confusion came from this line in the documentation regarding transforms:

Writing data while the stream is not draining is particularly problematic for a Transform, because the Transform streams are paused by default until they are piped or a 'data' or 'readable' event handler is added.

To be honest, if that is the case, I don't understand why readable or asyncIterator (which apparently iterates with 'readable') don't resume the stream. Or perhaps it is resuming the stream and the event is never emitted?

I solved my architectural issue by overriding ._read() in the last transform via the options:

````js
read(...args) {
  if (source.isPaused()) source.resume();
  Transform.prototype._read.call(this, ...args);
},
````
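
In context, that override sits in the last transform's constructor options. A self-contained sketch (with a plain PassThrough standing in for the paused upstream source):

````js
import { PassThrough, Transform } from 'stream';

// Sketch of the override in context; `source` stands in for the upstream
// stream (e.g. the HTTP request stream) that gets paused after piping.
const source = new PassThrough();

const objectModeTransformer = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk); // real object decoding would happen here
  },
  read(...args) {
    // A read request from the consumer (including the async iterator)
    // wakes the paused source so data actually starts flowing.
    if (source.isPaused()) source.resume();
    Transform.prototype._read.call(this, ...args);
  },
});

source.pipe(objectModeTransformer);
source.pause(); // hold the upstream back until a read is requested
````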

I'm still confused as to why 'readable' doesn't resume the Transforms like it says in the documentation though.

To be honest, if that is the case, I don't understand why readable or asyncIterator (which apparently iterates with 'readable') don't resume the stream. Or perhaps it is resuming the stream and the event is never emitted?

Since the async iterator is basically "read one element and then wait", you would be ping-ponging back and forth between flowing and non-flowing every time next() is called.

I'm still confused as to why 'readable' doesn't resume the Transforms like it says in the documentation though.

What does "resume" mean for you in this context?

What does "resume" mean for you in this context?

To my understanding, pause and resume refer to flowing mode. A readable stream is paused (not flowing) by default. If you add a pipe, add a 'data' listener, or literally call .resume(), it'll change to flowing mode.

Transform is listed as a special case where adding 'readable' will also make it flow. So what is described as

Transform streams are paused by default until they are piped or a 'data' or 'readable' event handler is added.

means:

  • if a paused Transform has a 'readable' event listener added to it, then it is resumed (allowing data to flow)

That means I can take a Transform, pause it, add a 'readable' listener, and it should start flowing.

But in my experience, either the resume (flowing) doesn't happen (and the documentation is wrong), or it does resume and the 'resume' event is never fired.

To my understanding, pause and resume refer to flowing mode.

It's not quite that simple.

Also, that's internal. What does it mean for you as a user of the stream? What are the observable effects you are expecting/referring to?

I assume you mean 'data' events are emitted?

I'm quoting the documentation:

https://github.com/nodejs/node/blame/master/doc/api/stream.md#L625

I expect 'resume' to be fired.

This works:

````js
import { Transform } from 'stream';

const transform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  },
});
transform.on('resume', () => console.log('resume fired'));
transform.on('data', () => {});
````

Changing the last line to 'readable' instead of 'data' does not fire 'resume'. I'm starting to think the documentation is wrong. As to how I arrived at asyncIterator: I had trusted that 'readable' would fire a 'resume' on a Transform (again, as documented), and assumed asyncIterator was the outlier. But now I would think they're both working as intended and 'readable' is never meant to fire 'resume', despite what the documentation says.
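
For contrast, the same sketch with the last line swapped to 'readable' — 'resume fired' is never logged:

````js
import { Transform } from 'stream';

const transform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  },
});
transform.on('resume', () => console.log('resume fired'));
// With 'readable' instead of 'data', 'resume fired' never prints, even
// though chunks can still be pulled with read().
transform.on('readable', () => {});
````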

Yea, this is a bit confusing. 'readable' "resumes" the stream by making it flowing, which is not strictly the same as resume(), which emits the 'resume' event... maybe it should, I'm not sure (@addaleax)... the docs might need some clarification either way... maybe streams are not flowing by default?

Yeah, I see what you're saying about resuming internally. In the context of the documentation, 'pause/resume' generally refers to the flowing state (readableFlowing), whose changes are emitted via 'pause' and 'resume'. That's where I got lost.

This is the original commit and line in question:
https://github.com/nodejs/node/commit/e9044c83a9b997bde60432cd056d36e3a7d8d1e3#diff-94e62e3974692b345ca60e0139a9291eR463

In the end, I figured out all my issues. My code wasn't compatible with 'readable' (and not just asyncIterator) because it relied on 'resume' being fired. So thanks to everyone for your patience.
