// reproduce.js
const util = require('util');
const stream = require('stream');
/**
 * Invokes `fn` with the given arguments and always yields a promise.
 *
 * Because the wrapper is an async function, a synchronous throw from `fn`
 * surfaces as a rejected promise, and a promise returned by `fn` is adopted.
 *
 * @param {Function} fn - Function to invoke.
 * @param {...*} args - Arguments forwarded to `fn`.
 * @returns {Promise<*>} Promise for whatever `fn` returns.
 */
const call = async (fn, ...args) => {
  const outcome = fn(...args);
  return outcome;
};
/**
 * Builds an object-mode Transform that replaces each incoming chunk with the
 * value `fn` produces for it. `fn` may return a value or a promise; a throw
 * or rejection is reported to the stream through the transform callback.
 *
 * @param {Function} fn - Called with each chunk; its result is emitted.
 * @returns {stream.Transform} The mapping transform.
 */
const map = (fn) => {
  const transform = (chunk, enc, cb) => {
    const emitResult = (modified) => cb(null, modified);
    const reportFailure = (error) => cb(error);
    // The promise is returned so the handler's return value matches the
    // original arrow-expression form.
    return call(fn, chunk).then(emitResult, reportFailure);
  };
  return new stream.Transform({ objectMode: true, transform });
};
/**
 * Builds an object-mode Transform that runs `fn` on each chunk for its side
 * effect and then forwards the original chunk unchanged. The result of `fn`
 * is discarded; a throw or rejection is reported through the transform
 * callback.
 *
 * @param {Function} fn - Observer called with each chunk.
 * @returns {stream.Transform} The pass-through transform.
 */
const tap = (fn) => {
  const transform = (chunk, enc, cb) => {
    const forwardOriginal = () => cb(null, chunk);
    const reportFailure = (error) => cb(error);
    // The promise is returned so the handler's return value matches the
    // original arrow-expression form.
    return call(fn, chunk).then(forwardOriginal, reportFailure);
  };
  return new stream.Transform({ objectMode: true, transform });
};
/**
 * Builds an object-mode Transform that forwards every chunk downstream
 * unchanged while also pushing it into a side pipeline made of the streams
 * in `t`. The transform's flush callback is held back until the side
 * pipeline's completion callback has run, and it receives that pipeline's
 * error (if any).
 *
 * @param {...*} t - Streams forming the side pipeline (passed straight to
 *   `stream.pipeline` after an internal PassThrough).
 * @returns {stream.Transform} The forking transform.
 */
const fork = (...t) => {
  // Entry point of the side pipeline; chunks are pushed directly onto its
  // readable side rather than written to it.
  const branchInput = new stream.PassThrough({ objectMode: true });

  let branchFinished;
  let branchError;
  let pendingFlush;

  stream.pipeline(branchInput, ...t, (error) => {
    branchFinished = true;
    branchError = error;
    // If flush is already waiting, release it now.
    if (pendingFlush) {
      pendingFlush(branchError);
    }
  });

  return new stream.Transform({
    objectMode: true,
    transform: (chunk, enc, cb) => {
      branchInput.push(chunk, enc);
      cb(null, chunk);
    },
    flush: (cb) => {
      // Signal end-of-stream to the side pipeline, then wait for it.
      branchInput.push(null);
      pendingFlush = cb;
      // The side pipeline may already have completed (e.g. with an error).
      if (branchFinished) {
        pendingFlush(branchError);
      }
    },
  });
};
// Source of the main pipeline; the script pushes data into it by hand after run() is started.
const readableStream = new stream.PassThrough({ objectMode: true });
// Promise-returning wrapper around the callback-style stream.pipeline.
const pipeline = util.promisify(stream.pipeline);
/**
 * Pipes `readableStream` through three forked branches that hold each object
 * for 2s, 4s and 6s respectively, then logs 'done!' once the promisified
 * pipeline resolves.
 *
 * @returns {Promise<void>} Settles when the pipeline settles.
 */
async function run() {
  // One branch: log on entry, hold the object for `seconds`, log on exit.
  const slowBranch = (id, seconds) =>
    fork(
      tap(() => console.log(`fork ${id}: do something with obj for ${seconds}s`)),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), seconds * 1000))),
      tap(() => console.log(`fork ${id} done!`)),
    );

  const branches = [slowBranch(1, 2), slowBranch(2, 4), slowBranch(3, 6)];

  await pipeline(
    readableStream,
    ...branches,
    // new stream.PassThrough({ objectMode: true }),
    // ^___ adding an extra stream in the pipeline seems to fix the problem
  );
  console.log('done!');
}
// Start the pipeline; a rejection from run() is printed with console.error.
run().catch(console.error);
// Feed a single object into the source, then signal end-of-stream with push(null).
readableStream.push({ name: 'test' });
readableStream.push(null);
always
Console output should look like:
fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!
Console output actually looks like:
fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!
As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.
await pipeline(
...
new stream.PassThrough({ objectMode: true }),
);
console.log('done!');
Hi, Aravindan. Thanks for your feedback. I am trying to locate this problem, and I have reproduced it in v14.5.0.
@rickyes Just checked, I too get this in both 12.8.0 and 14.5.0. Let me know if you need anything else from me.
are you able to simplify the repro case?
@ronag here you go:
const stream = require('stream');
/**
 * Builds a Transform that forwards every chunk unchanged and whose flush
 * step completes from a setTimeout callback (no delay given), logging
 * 'done flushing' and `i` just before it signals completion.
 *
 * @param {*} i - Label printed when this stream's flush completes.
 * @returns {stream.Transform} The labelled transform.
 */
const makeStream = (i) => {
  const forwardChunk = (chunk, enc, cb) => {
    cb(null, chunk);
  };

  const deferredFlush = (cb) => {
    // Complete the flush from a timer callback instead of synchronously.
    setTimeout(() => {
      console.log('done flushing', i);
      cb(null);
    });
  };

  return new stream.Transform({
    transform: forwardChunk,
    flush: deferredFlush,
  });
};
// Source stream created without a read() implementation; its data is supplied via push() below.
const input = new stream.Readable();
// Chain the source through three transforms; the final callback logs 'done!' when pipeline reports completion.
stream.pipeline(
input,
makeStream(1),
makeStream(2),
makeStream(3),
() => console.log('done!'),
);
// Push one chunk and then end-of-stream, after the pipeline has been wired up.
input.push('test');
input.push(null);
Prints:
done flushing 1
done flushing 2
done!
done flushing 3
I think I'm getting close to the problem and will check again tonight.
Simplest repro:
const stream = require('stream');
/**
 * Builds a Transform that passes chunks through untouched and finishes its
 * flush step from a setTimeout callback (no delay given), printing
 * 'done flushing' together with `i` right before completing.
 *
 * @param {*} i - Label printed when this stream's flush completes.
 * @returns {stream.Transform} The labelled transform.
 */
const makeStream = (i) => {
  const options = {
    transform: (chunk, enc, cb) => {
      cb(null, chunk);
    },
    flush: (cb) => {
      // Complete the flush from a timer callback instead of synchronously.
      const finishFlush = () => {
        console.log('done flushing', i);
        cb(null);
      };
      setTimeout(finishFlush);
    },
  };
  return new stream.Transform(options);
};
// Source stream created without a read() implementation.
const input = new stream.Readable();
// End the source immediately, before the pipeline is set up, so no data chunk is ever pushed.
input.push(null);
// Single transform in the chain; the callback logs 'done!' when pipeline reports completion.
stream.pipeline(
input,
makeStream(1),
() => console.log('done!'),
);
This is actually an unfortunate case of Transform using 'prefinish' instead of _final to invoke _flush.
I don't think this is fixable without breaking something else. See notes here.
We would have to change Transform to properly use _final to fix this.
@mcollina WDYT?
I think I found a solution.
Most helpful comment
I think I found a solution.