Node: stream.pipeline destroys the writable stream when an error occurs

Created on 26 Feb 2019 · 13 comments · Source: nodejs/node

  • Version:
    v11.10.0

  • Platform:
    Mac OS Darwin 16.7.0

  • Subsystem:
    stream, http

stream.pipeline is helpful for handling errors and interoperates with Promises.
However, I found a behavior that does not suit my use case.

I am creating a web server with stream.pipeline.
If my readable stream emits an error such as "file not found", I would like to send an error response to my clients.

A code example is as follows.

const fs = require('fs')
const http = require('http')
const { pipeline } = require('stream')

const server = http.createServer((req, res) => {
  const r = fs.createReadStream('./foo2.txt')
  pipeline(r, res, (err) => {
    if (err) {
      console.log(err) // No such file
      return res.end("error!!!"); // write error response but this message is not shown.
    }
  })
})

server.listen(9000)

I have investigated the Node.js core; stream.pipeline destroys the writable stream when an error occurs.
https://github.com/nodejs/node/blob/master/lib/internal/streams/pipeline.js#L42

So the above code cannot send an error response.

Question

Is this an expected behaviour?
In my understanding, the writable stream should be closed anyway, but in this case we would like to close the writable stream manually.

In this situation, do we need to create a custom writable stream?

I could send a PR to add an option like { destroyOnError: false } to avoid destroying the writable stream automatically.

Labels: feature request, stream

Most helpful comment

I'm willing to discuss adding new features/options to pipeline as long as the default behavior stays consistent. Send a PR!

All 13 comments

The whole reason for pipeline is to call destroy(). If you don't want this behavior you can use plain old source.pipe().
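For reference, a minimal sketch of that plain .pipe() alternative, reusing the server shape from the example above (the handler details are an illustration, not code from this thread): .pipe() does not destroy the destination on error, so the error handler can still write to res.

const fs = require('fs')
const http = require('http')

const server = http.createServer((req, res) => {
  const r = fs.createReadStream('./foo2.txt')
  // with plain .pipe(), an error on the source does not destroy res,
  // so we can still send an error body ourselves
  r.on('error', (err) => {
    console.log(err) // e.g. ENOENT
    res.statusCode = 500
    res.end('error!!!')
  })
  r.pipe(res)
})

server.listen(9000)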

Hm. I would like to use Promises and streams seamlessly, and stream.pipeline seems to be helpful for my situation according to the API doc:

const fs = require('fs')
const zlib = require('zlib')
const util = require('util')
const stream = require('stream')

const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

Anyway, I understand that stream.pipeline should call destroy() automatically.

The destroy function does not receive the error object

I have tried to create a custom writable stream to handle destroy manually.

const fs = require('fs')
const http = require('http')
const { pipeline, Writable } = require('stream')

class ResponseWriter extends Writable {
  constructor(options) {
    super(options);
    this.res = options.res;
    this.data = '';
  }

  _write(chunk, encoding, callback) {
    this.res.write(chunk);
    this.data += chunk;
    callback();
  }

  _final(callback) {
    this.res.end();
    callback();
  }

  _destroy(err, callback) {
    console.log("destroy", err); // does not show the error.
    this.res.end("ERROR!! " + err);
    this.res.destroy(err);
    callback(err);
  }
}

const server = http.createServer((req, res) => {
  const r = fs.createReadStream('./foo2.txt')
  const w = new ResponseWriter({
    res,
  });
  pipeline(r, w, (err) => {
    console.log(err) // No such file
  });
})

server.listen(9000)

But this solution does not work well, because _destroy does not receive the err object.

In Node.js core, stream.pipeline does not pass the err object to the destroy function:

if (typeof stream.destroy === 'function') return stream.destroy(); // destroy(err) ??

Is this expected behavior? If not, I will send a PR.

This is expected behavior. .destroy(err) is supposed to be called with the error that caused the _current_ stream to fail. If you want to always handle the "outer" logic, you should do that in the pipeline callback (or returned promise). Note that you don't want to call stream.destroy(err), because it would cause _all_ streams to emit 'error' events, which can cause nasty side effects because of missing 'error' handlers. I don't think we can support that.
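As a minimal sketch of that suggestion (the promisified wrapper and the serveFile helper name are only for illustration), the "outer" handling lives around the pipeline call rather than inside _destroy:

const { promisify } = require('util')
const { pipeline } = require('stream')
const pipelinePromise = promisify(pipeline)

// handle the "outer" failure where the pipeline is awaited; note that the
// destination may already be destroyed here, so this is for logging/cleanup,
// not for writing an error body to it
async function serveFile(r, res) {
  try {
    await pipelinePromise(r, res)
  } catch (err) {
    console.log(err) // the error that failed the pipeline, e.g. ENOENT
  }
}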

Note that from an HTTP standpoint, adding this.res.end("ERROR!! " + err); after a byte is sent is not correct, as you'll be appending to data that is already flowing.

I think the source of these problems is not pipeline or pipe, but rather that the implementation of HttpResponse does not match what you'd expect. Essentially, if you are piping a stream into a response, you want to handle error conditions on a case-by-case basis, making pipeline useless for you.

Essentially, if you are piping a stream into a response, you want to handle error conditions on a case-by-case basis, making pipeline useless for you.

I agree with that, but my opinion is different.

It would be better for pipeline to accept an options argument (pipeline(...streams, [options], callback)) for handling things on a case-by-case basis, just like stream.finished does with its options argument (finished(stream[, options], callback)).

https://nodejs.org/api/stream.html#stream_stream_finished_stream_options_callback

I know the HttpResponse use case is a little different from other stream use cases (http.request, fs read/write), but other TCP/UDP network servers that use streams, such as FTP servers, also return error responses to their clients, so this use case is not so rare.
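To make the proposal concrete, a hypothetical usage sketch follows; { destroyOnError: false } is the option suggested earlier in this thread and does not exist in Node.js core.

const fs = require('fs')
const http = require('http')
const { pipeline } = require('stream')

const server = http.createServer((req, res) => {
  const r = fs.createReadStream('./foo2.txt')
  // hypothetical: an options argument, mirroring finished(stream[, options], callback)
  pipeline(r, res, { destroyOnError: false }, (err) => {
    if (err) {
      console.log(err)    // e.g. ENOENT
      res.end('error!!!') // res would still be writable under this proposal
    }
  })
})

server.listen(9000)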

I'm willing to discuss adding new features/options to pipeline as long as the default behavior stays consistent. Send a PR!

@yosuke-furukawa
I believe you can solve your issue with a PassThrough stream. Look at the code below:

const express = require('express')
const sharp = require('sharp')
const stream = require('stream')
const { promisify } = require('util')

const app = express()

app.post('/resize/:height/:width', (req, res) => {
  const { height, width } = req.params
  const resizer = sharp().resize({
    height: Number(height),
    width: Number(width)
  }).jpeg()
  const psToReq = new stream.PassThrough() // <-----------
  promisify(stream.pipeline)(
    req,
    resizer,
    psToReq // <-----------
  ).catch((e) => {
    console.log(e)
    res.sendStatus(400)
  })
  psToReq.pipe(res) // <-----------
})

@mcollina what do you think? Does it make sense to add this trick (with PassThrough) to the documentation? This problem appears quite often.

I don't understand how the above trick helps.

A changed code sample from @yosuke-furukawa:

const fs = require('fs')
const http = require('http')
const stream = require('stream')

const server = http.createServer((req, res) => {
  const r = fs.createReadStream('./foo2.txt')
  const ps = new stream.PassThrough()
  stream.pipeline(
    r,
    ps,
    (err) => {
      if (err) {
        console.log(err) // No such file
        return res.end("error!!!"); // write error response; res is not part of the pipeline, so this is actually sent
      }
    })
  ps.pipe(res)
})

server.listen(9000)

and we run the server and call:

>curl localhost:9000
error!!!%

the server output:

node index.js
[Error: ENOENT: no such file or directory, open './foo2.txt'] {
  errno: -2,
  code: 'ENOENT',
  syscall: 'open',
  path: './foo2.txt'
}

So, I achieve the expected behavior in a simple way without changing the interface (res is only connected via .pipe, so pipeline does not destroy it when the error occurs). Do you see my point now?

I still don't understand why adding a passthrough in the middle is going to fix this problem.
I think this is documented behavior to some extent: pipeline destroys all the streams when there is an error. Note that sending res.end() _could_ create issues, because in the generic case we could be in the middle of a transfer - so we should not really send anything.

The correct fix for this issue is to wait until the file has been opened correctly, and only then use pipeline. If the file cannot be opened, we should send that message to the user. That's what https://www.npmjs.com/package/send does.
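A rough sketch of that approach (not the actual code of the send package): wait for the read stream's 'open' or 'error' event before starting the pipeline, so an open failure can still be turned into a normal error response.

const fs = require('fs')
const http = require('http')
const { pipeline } = require('stream')

const server = http.createServer((req, res) => {
  const r = fs.createReadStream('./foo2.txt')

  const onOpenError = (err) => {
    // the file could not be opened; nothing has been written to res yet,
    // so a proper error response can still be sent
    console.log(err)
    res.statusCode = 404
    res.end('error!!!')
  }
  r.once('error', onOpenError)

  r.once('open', () => {
    // the file is open; hand over to pipeline, which handles errors from here on
    r.removeListener('error', onOpenError)
    pipeline(r, res, (err) => {
      if (err) console.log(err)
    })
  })
})

server.listen(9000)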

I think this is documented behavior to some extent: pipeline destroys all the streams when there is an error.

Yep, but in the case of an HttpResponse you don't want to destroy it.

Note that sending res.end()

I just changed the code from @yosuke-furukawa

The correct fix for this issue

It's not about this particular case; this case is just simple enough to show the concept. There are a lot of cases where you need to pipe some read stream through a transform stream and into the response (e.g., check my image-resizing sample), and you need a way to handle ALL the errors that can appear.

That was probably the reason why the original pump from @mafintosh returned the last stream.

I am in favour of adding options to pipeline and will send a PR if I can.

We'll be happy to evaluate that change, send it over.

What about:

// assuming `transform` is some Transform stream and `finished` is a promisified stream.finished
const r = fs.createReadStream('./foo2.txt')
await finished(pipeline(r, transform, (err) => {})
  .on('error', err => {
    console.log(err) // No such file
    res.end("error!!!"); // write error response; res stays outside the pipeline, so this is sent
  })
  .pipe(res)
)