Node: Different behavior between transform function and async generator using pipelines

Created on 19 Mar 2020 · 24 comments · Source: nodejs/node

  • Version: v13.11.0
  • Platform: Windows 10 64-bit
  • Subsystem: stream

What steps will reproduce the bug?

index.mjs:

import { pipeline, PassThrough } from 'stream';

async function* asyncGen(source) {
  for await (const chunk of source) {
    yield chunk;
  }
}

pipeline(process.stdin, asyncGen, process.stdout, (err) => {
  console.error('end');
});

Then running echo "hello" | node index.mjs prints:

"hello"
end

but if you replace asyncGen in the pipeline with new PassThrough(), the output is:

"hello"

As you can see, the callback is never called.

How often does it reproduce? Is there a required condition?

What is the expected behavior?

"hello"
end

What do you see instead?

"hello"

Additional information

I don't really know whether this is expected behavior or a bug, to be honest. Since I can't find anything related in the documentation, it feels like a bug.

Labels: help wanted, stream

All 24 comments

@nodejs/streams

Minimal repro on master.

'use strict';

const common = require('../common');
const assert = require('assert');
const { pipeline, PassThrough } = require('stream');
const cp = require('child_process');

if (process.argv[2] === 'child') {
  pipeline(
    process.stdin,
    new PassThrough(),
    process.stdout,
    common.mustCall((err) => {
      console.error('end');
    })
  );
  return;
}

cp.exec([
  'echo',
  '"hello"',
  '|',
  `"${process.execPath}"`,
  `"${__filename}"`,
  'child'
].join(' '), common.mustCall((err, stdout, stderr) => {
  console.log(err);
  assert.ifError(err);
  console.log(stdout);
}));

Seems like stdout is never finished, even though the PassThrough is writableEnded.

So this fails:

  pipeline(
    process.stdin,
    new PassThrough(),
    process.stdout,
    common.mustCall((err) => {
      console.error('end');
    })
  );

while this doesn't:

  const p = new PassThrough();
  pipeline(
    process.stdin,
    p.on('end', () => {
      process.stdout.end();
    }),
    process.stdout,
    common.mustCall((err) => {
      console.error('end');
    })
  );

@addaleax This does not seem related to pipeline or PassThrough; it seems to be something with process.stdout. This fails as well:

'use strict';

const common = require('../common');
const assert = require('assert');

if (process.argv[2] === 'child') {
  process.stdin
    .pipe(process.stdout)
    .on('finish', common.mustCall(() => {
      console.log('finish');
    }));
} else {
  const cp = require('child_process');

  cp.exec([
    'echo',
    '"hello"',
    '|',
    `"${process.execPath}"`,
    `"${__filename}"`,
    'child'
  ].join(' '), common.mustCall((err, stdout, stderr) => {
    assert.ifError(err);
  }));
}

The reason this works with pipeline + a generator is probably that this path does not use pipe.

Maybe pipeline should prefer the 'readable' API when possible over pipe, would that provide better compatibility? @mcollina

I believe this is the culprit https://github.com/nodejs/node/blob/master/lib/_stream_readable.js#L654. Unfortunately I don't understand this part at the moment.

Basically pipe won't end() the dest if it is process.stdout. Not sure why that is the case. If there is a good reason for that, then maybe pipeline + generators shouldn't do it either?

If there is a good reason for that

https://github.com/nodejs/node/issues/7606#issuecomment-231293476

@vweevers: Does that mean

then maybe pipeline + generators shouldn't do it either?

Do we need a special case here in pipeline to complete once the stream before stdout has emitted 'end'?

Not sure we can do that? Before completing, we'd want to make sure the writes to stdout have been flushed, but stdout can be synchronous or asynchronous depending on platform and destination.

Alternative point of view: stdout is documented as being different from regular streams. The fact that stdout cannot be closed implies that a pipeline to stdout cannot complete. That's unfortunate, but logically solid.

If there is a good reason for that

7606 (comment)

It is a bit random though... since this only applies to the pipe protocol. We have 3 other "read" protocols where this currently does not apply.

The fact that stdout cannot be closed

It can be closed, it's just that pipe doesn't close it.

Alternative point of view: stdout is documented as being different from regular streams.

I can see that, but I can't see the documentation mentioning this particular thing? Should we add it?

Ah, my understanding of it was outdated. Since Node v10.12.0 (see https://github.com/nodejs/node/pull/23053 and ERR_STDOUT_CLOSE) you can indeed close these streams. They used to throw an error; now the call is allowed but "secretly" doesn't close the underlying fd.

I would guess that the pipe behavior was needed to prevent that error. If so, we no longer need that workaround today. Perhaps @addaleax or @mcollina can shed some light on this. In the meantime I'll see if I can piece together the history.

Confirmed, the 9-year-old commit that introduced the now-defunct throwing behavior also introduced the pipe workaround: https://github.com/nodejs/node/commit/13324bf844d4527e91cf3777d3010aa4dca5f365

@vweevers: So is a possible fix to make pipeline actually call end() on the stdio stream?

src.pipe(dst);
if (dst === process.stdout) {
  src.on('end', () => dst.end());
}

I think so, but because there's so much history here, let's give the mentioned folks some time to respond.

So what should we do about this?

As far as I can tell, because of https://github.com/nodejs/node/pull/23053, we no longer need the 2nd and 3rd lines here:

https://github.com/nodejs/node/blob/78e6d2484fcacdee0cc0a1303e49a5328f3470dd/lib/_stream_readable.js#L652-L654

That would fix the issue.

I can PR a deletion of the 2nd and 3rd lines if that's all it takes to solve it.

@Xstoudi seems there is a little more to it unfortunately, https://github.com/nodejs/node/pull/32373#issuecomment-609897352

@ronag Do we not want to fix this for pipe()?

@ronag Do we not want to fix this for pipe()?

That was closed through the commit. I think it's a different issue from this one, though.

But yes, I think we should fix it in pipe() as well. pipeline should still have the workaround though, to support older versions of streams from e.g. readable-stream.

pipeline should still have the workaround though, to support older versions of streams from e.g. readable-stream.

Can we safely do that? Because we would be calling .end() twice.

Can we safely do that? Because we would be calling .end() twice.

I think calling end twice is safe in this case.
