Version: 14.4.0
Platform: 64-bit (Windows)
Subsystem: stream
pipeline throws Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close when:
1) There is a destination function and the transformer function throws from inside the for await block.
// Expecting: Error: transformer
// Got: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
const Stream = require('stream');

Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function* (readable) {
    for await (const chunk of readable) {
      // If this line is moved before or after the `for await`, the correct error is thrown (see the contrast below)
      throw new Error('transformer');
    }
  },
  // If the destination function is removed, the correct error is thrown
  async function (readable) {
    let result = '';
    for await (const chunk of readable) {
      result += chunk;
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
);
2) The destination function throws from inside the for await block.
// Expecting: Error: destination
// Got: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function (readable) {
    let result = '';
    for await (const chunk of readable) {
      // If this line is moved before or after the `for await`, the correct error is thrown
      throw new Error('destination');
      result += chunk;
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
);
3) The transformer or destination returns from inside the for await block.
// Expecting: Pipeline resolves with the value returned from the destination, with unfinished streams being silently destroyed
// Got: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function (readable) {
    for await (const chunk of readable) {
      // If this line is moved BEFORE the `for await`, the callback is NOT called AT ALL (node simply exits if it has nothing else to do)
      return 'foo';
    }
  },
  (error, val) => error ? console.error(error) : console.log(val)
);
@urugator first of all hey 👋 fancy seeing you here.
Second, cc @mcollina @ronag
Third - I'm not sure what the right behavior here is. I think this happens because a return or throw inside the loop calls return() on the iterator.
You are iterating a Readable; when you throw or return, iterator.return() is invoked, and that finishes (destroys) the stream.
pipeline also closes the stream (so that you don't have streams "hanging"). Since you are explicitly closing the readable and it's also implicitly closed by pipeline, you get ERR_STREAM_PREMATURE_CLOSE.
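A minimal sketch of that mechanism in isolation (no pipeline involved): exiting a for await loop over a Readable invokes the iterator's return(), which destroys the stream:

const { Readable } = require('stream');

const readable = Readable.from(['a', 'b', 'c']);
readable.on('close', () => console.log('readable closed'));

(async () => {
  for await (const chunk of readable) {
    break; // leaving the loop calls iterator.return(), which destroys the readable
  }
  console.log('destroyed?', readable.destroyed); // true
})();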
Hmm, weirdly, if there is a second stage between the two, so that the return() being called belongs to a plain async iterator and not a Readable (and therefore destroys nothing), it passes.
This passes:
const Stream = require('stream');

const stream = Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  fakeIterator,
  async function (readable) {
    for await (const chunk of readable) {
      return 'foo';
    }
  },
  (error, val) => error ? console.error('errored', error) : console.log('got', val)
);

stream.on('data', (chunk) => console.log('got chunk', chunk));

function fakeIterator(stream) {
  console.log('fakeReadable called');
  let value = 0;
  const asyncIterator = {
    next() {
      value++;
      console.log('next called', value);
      return { done: value > 10, value };
    },
    // This return() merely reports done; unlike a Readable's, it destroys nothing
    return() {
      console.log('return called');
      return { done: true };
    },
    throw() {
      console.log('throw called');
      return { done: true };
    }
  };
  return { [Symbol.asyncIterator]() { return asyncIterator; } };
}
When iterating manually (so that the implicit iterator.return() never runs) it also seems to propagate the correct Error: transformer:
Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function* (readable) {
    const iter = readable[Symbol.asyncIterator]();
    const chunk = await iter.next();
    throw new Error('transformer');
  },
  async function (readable) {
    let result = '';
    for await (const chunk of readable) {
      result += chunk;
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
);
I was hoping I could use pipeline to avoid the complexity of manual stream handling, but the current behavior seems hardly useful and quite confusing/inconsistent.
For illustration, here is the original use case:
function limitStreamLength(maxLength = 0) {
  return async function* limitStreamLength(readable) {
    let length = 0;
    for await (const chunk of readable) {
      length += chunk.length;
      if (length > maxLength) {
        // Throw something the user can catch and react to
        throw new Error('Max length reached');
        // Alternatively don't throw, just truncate the result:
        // return '';
      }
      yield chunk;
    }
  };
}
async function streamToString(readable) {
  readable.setEncoding('utf8');
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}
// Promise form of pipeline (via util.promisify, since stream/promises is not in Node 14.4)
const { promisify } = require('util');
const pipeline = promisify(Stream.pipeline);

const result = await pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  limitStreamLength(1),
  streamToString,
);
First of all, there are a lot of layers here, we are trying to simplify things, and this is a new feature. Also, software has bugs.
but the current behavior seems hardly useful and quite confusing/inconsistent
Secondly, I would recommend you rethink this line of reasoning, mainly because it implies a judgement on your part about why we added this in the first place.
Last but not least, I don't fully understand how this would be changed. Again, this is a complex subject with three different layers intersecting, and somehow it might get timing-dependent.
I would consider most of your problems as bugs until we assess how we could improve the situation.
If you think you can make this feature more useful, please send a PR.
@mcollina I mean, the current behavior in the OP just looks like a use case that wasn't considered with the feature.
it implies a judgement on your part on why we added this in the first place.
Which is exactly why I am mentioning it: I am hoping that someone tells me whether my judgment or expectations are wrong. It's entirely possible that pipeline isn't intended to be used like this, but I am unable to tell from the available sources (docs/issues/google).
I am sorry if it sounds offensive or something, I very much respect all the work that is being put into this.
I mean, the current behavior in the OP just looks like a use case that wasn't considered with the feature.
I think the 3rd case is a proper bug, and the other two are some error cases we should handle somehow.
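In the meantime, a possible workaround for the 3rd case — a sketch, assuming the premature close comes from returning mid-iteration — is to keep consuming the stream instead of returning from inside the loop, so the readable ends naturally and pipeline has nothing half-closed to complain about:

const Stream = require('stream');

Stream.pipeline(
  Stream.Readable.from(['a', 'b', 'c']),
  async function (readable) {
    let result;
    for await (const chunk of readable) {
      if (result === undefined) {
        result = 'foo';
      }
      // keep draining instead of returning early, so the stream finishes normally
    }
    return result;
  },
  (error, val) => error ? console.error(error) : console.log(val)
);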