I want to write the string "Hello!" to process.stdout
every second for 10 seconds, then stop (actually, I want to stream some real data to a web server, but I'm making a contrived example here). Here's the code I came up with:
var Stream = require('stream');
var stream = new Stream.Readable();
var n = 0;
setInterval(function() {
if (n++ < 10) {
stream.push('Hello!');
} else if (n++ === 10) {
stream.push(null);
}
}, 1000);
stream.pipe(process.stdout);
But when I run that with node 5.5, I get:
events.js:154
throw er; // Unhandled 'error' event
^
Error: not implemented
at Readable._read (_stream_readable.js:450:22)
at Readable.read (_stream_readable.js:329:10)
at resume_ (_stream_readable.js:719:12)
at nextTickCallbackWith2Args (node.js:474:9)
at process._tickCallback (node.js:388:17)
What happened? I thought it might be illegal to call push
after pipe
, but this code:
var stream = new Stream.Readable();
stream.pipe(process.stdout);
for (n = 0; n < 10; n++) {
stream.push('Hello!');
}
stream.push(null);
logs the right string. Is it intended that I get that "not implemented" error when I introduce the interval? Thanks!
Readable streams have to implement a _read()
function, just like Writable streams have to implement _write()
, etc. See the documentation here for more information on the function.
The reason that it "works" with the second code snippet is that the (initial) push('Hello!')
sets up a function to be fired on the next tick. This function tries to "read" from the stream (to properly emit the 'readable' event but not actually consume any data), but only if the stream hasn't ended. Since push(null)
is called within the same tick, this sets the stream as having ended, so by the time that function is called on the next tick, no read ever happens. That means the default _read()
implementation (which just throws the error you see) never gets called.
Sweet, got it working. Thanks!
@davidvgalbraith can you please provide workable example?
Yeah! Just use stream.PassThrough()
instead of stream.Readable()
in my original example.
Most helpful comment
Yeah! Just use
stream.PassThrough()
instead ofstream.Readable()
in my original example.