Node: stream.pipeline does not wait for the last stream to flush before calling the final callback

Created on 9 Jul 2020 · 8 comments · Source: nodejs/node

  • Version: v12.8.0
  • Platform: macOS 10.15.4 (Catalina)
  • Subsystem: stream

What steps will reproduce the bug?

// reproduce.js

const util = require('util');
const stream = require('stream');

// Invokes `fn` with the given arguments and always yields a promise: a plain
// return value becomes a fulfilled promise, and a synchronous throw becomes a
// rejection instead of escaping to the caller.
const call = async function (fn, ...args) {
  return fn(...args);
};

// Builds an object-mode Transform that replaces each chunk with the (possibly
// asynchronous) result of `fn(chunk)`. A throw or rejection from `fn` is handed
// to the transform callback, which errors the stream.
const map = (fn) => {
  const transformer = new stream.Transform({ objectMode: true });

  transformer._transform = (chunk, encoding, next) => {
    const onResult = (result) => next(null, result);
    const onFailure = (failure) => next(failure);
    return call(fn, chunk).then(onResult, onFailure);
  };

  return transformer;
};

// Builds an object-mode Transform that runs `fn(chunk)` for its side effect,
// waits for it to settle, and then forwards the ORIGINAL chunk unchanged.
// `fn`'s return value is ignored; a throw or rejection errors the stream.
const tap = (fn) => {
  const observer = new stream.Transform({ objectMode: true });

  observer._transform = (chunk, encoding, next) => {
    const forwardOriginal = () => next(null, chunk);
    return call(fn, chunk).then(forwardOriginal, (failure) => next(failure));
  };

  return observer;
};

/**
 * Returns an object-mode Transform that forwards every chunk downstream
 * unchanged, while also pushing a copy into a private inner pipeline made of
 * an internal PassThrough followed by the streams passed as `t`.
 *
 * The returned stream's _flush does not call back until the inner pipeline's
 * completion callback has run, so its flush is asynchronous. In the repro
 * below, the third fork() is the last stream of the outer pipeline, and the
 * outer pipeline's callback is observed to fire before that flush completes.
 *
 * @param {...*} t - streams appended after the internal PassThrough in the
 *   inner stream.pipeline (presumably Transform/Duplex streams — confirm).
 * @returns {stream.Transform} the pass-through stream to place in the outer pipeline.
 */
const fork = (...t) => {
  let done; // set to true once the inner pipeline's callback has run
  let doneError; // error (if any) reported by the inner pipeline
  let flush; // this Transform's pending _flush callback, once _flush is called

  const tx = new stream.Transform({ objectMode: true });
  // Source of the inner pipeline. Chunks are pushed directly onto its readable
  // side (pt.push) rather than written to it.
  const pt = new stream.PassThrough({ objectMode: true });

  stream.pipeline(pt, ...t, (error) => {
    done = true;
    doneError = error;
    // If _flush has already been called, complete it now with the inner result.
    flush && flush(doneError);
  });

  tx._flush = (cb) => {
    // End the inner pipeline's source...
    pt.push(null);
    flush = cb;
    // ...and complete immediately only if the inner pipeline already finished;
    // otherwise the pipeline callback above will call `flush` later.
    done && flush(doneError);
  };

  tx._transform = (chunk, enc, cb) => {
    // Copy the chunk into the inner pipeline, then pass it through unchanged.
    pt.push(chunk, enc);
    cb(null, chunk);
  };

  // NOTE(review): the last stream in `t` is never read from, and the return
  // value of pt.push() (backpressure) is ignored; with many chunks the inner
  // pipeline may buffer or stall — confirm before reusing outside this repro.

  return tx;
};

// Source of the outer pipeline. It is fed by hand at the bottom of this script
// with a single object followed by end-of-stream.
const readableStream = new stream.PassThrough({ objectMode: true });
// Promise-returning form of stream.pipeline, so run() can await completion.
const pipeline = util.promisify(stream.pipeline);

/**
 * Pipes the source through three fork() stages. Their inner map() steps delay
 * each object by 2s, 4s and 6s respectively. Logs 'done!' once the outer
 * pipeline reports completion.
 * Expected: 'done!' is the last line printed.
 * Observed (see report): 'done!' prints before 'fork 3 done!'.
 */
async function run() {
  await pipeline(
    readableStream,
    fork(
      tap(() => console.log('fork 1: do something with obj for 2s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 2000))),
      tap(() => console.log('fork 1 done!')),
    ),
    fork(
      tap(() => console.log('fork 2: do something with obj for 4s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 4000))),
      tap(() => console.log('fork 2 done!')),
    ),
    fork(
      tap(() => console.log('fork 3: do something with obj for 6s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 6000))),
      tap(() => console.log('fork 3 done!')),
    ),
    // new stream.PassThrough({ objectMode: true }),
    // ^___ adding an extra stream in the pipeline seems to fix the problem
  );
  console.log('done!');
}

// Start the pipeline first (any rejection is logged), then feed the source:
// one object, then end-of-stream.
run().catch(console.error);

readableStream.push({ name: 'test' });
readableStream.push(null);

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

Console output should look like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!

What do you see instead?

Console output actually looks like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!

Additional information

As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.

// Workaround: append an extra PassThrough so that the last fork() is no longer
// the final stream of the pipeline.
await pipeline(
  // ...readableStream and the fork(...) stages from the repro above...
  new stream.PassThrough({ objectMode: true }),
);

// Fixed: the original fragment was missing the closing quote ('done!).
console.log('done!');
stream

Most helpful comment

I think I found a solution.

All 8 comments

Hi Aravindan, thanks for your feedback. I am trying to locate this problem, and I have reproduced it in v14.5.0.

@rickyes Just checked: I get this too, in both 12.8.0 and 14.5.0. Let me know if you need anything else from me.

Are you able to simplify the repro case?

@ronag here you go:

const stream = require('stream');

// Creates a pass-through Transform whose flush completes on a later timer tick.
// Before calling back, the flush logs which stream (`i`) is done.
const makeStream = (i) => {
  // Identity transform: each chunk is forwarded as-is.
  const passChunk = (chunk, enc, cb) => cb(null, chunk);
  // Deferred flush: log, then signal completion from a setTimeout callback.
  const delayedFlush = (cb) =>
    setTimeout(() => {
      console.log('done flushing', i);
      cb(null);
    });
  return new stream.Transform({ transform: passChunk, flush: delayedFlush });
};

// Source stream. Its data ('test', then end-of-stream) is pushed by hand after
// the pipeline is set up; no _read implementation is provided.
const input = new stream.Readable();

// Three pass-through transforms, each with a timer-delayed flush.
// Observed (see report): 'done!' is logged before 'done flushing 3', i.e. the
// callback runs before the last stream has finished flushing.
stream.pipeline(
  input,
  makeStream(1),
  makeStream(2),
  makeStream(3),
  () => console.log('done!'),
);

input.push('test');
input.push(null);

Prints:

done flushing 1
done flushing 2
done!
done flushing 3

I think I'm getting close to the problem; I'll check again tonight.

Simplest repro:

const stream = require('stream');

// Creates a pass-through Transform whose flush finishes asynchronously. The
// flush logs the stream index `i` from a timer callback before signalling
// completion.
const makeStream = (i) => {
  const options = {};
  // Identity transform: each chunk is forwarded as-is.
  options.transform = (chunk, enc, cb) => cb(null, chunk);
  options.flush = (cb) => {
    const onTimer = () => {
      console.log('done flushing', i);
      cb(null);
    };
    return setTimeout(onTimer);
  };
  return new stream.Transform(options);
};

// Minimal case: a source that is ended (push(null)) before the pipeline is
// built, so it emits no data at all.
const input = new stream.Readable();
input.push(null);

// A single transform with a timer-delayed flush is enough: the callback is
// reported to run before 'done flushing 1' is logged.
stream.pipeline(
  input,
  makeStream(1),
  () => console.log('done!'),
);

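This is actually an unfortunate case of Transform using 'prefinish' instead of _final to invoke _flush. Because _flush is started from the 'prefinish' event, the writable side goes on to emit 'finish' without waiting for the asynchronous flush callback. For the last stream in a pipeline, 'finish' is what pipeline waits for, so the final callback runs before that stream has finished flushing. This also explains why appending an extra PassThrough works around the problem.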

I don't think this is fixable without breaking something else. See notes here.

We would have to change Transform to properly use _final to fix this.

@mcollina WDYT?

I think I found a solution.

Was this page helpful?
0 / 5 - 0 ratings