Node: Create a flag to know when a stream has reached its last chunk from within _transform.

Created on 24 Oct 2018 · 14 Comments · Source: nodejs/node

  • Version: v9.3.0
  • Platform: Darwin Kernel Version 18.0.0: Wed Aug 22 20:13:40 PDT 2018; root:xnu-4903.201.2~1/RELEASE_X86_64 x86_64
  • Subsystem: Stream.Transform

I'm using a transform stream in the following way.

        var transformStream = new Stream.Transform({ writableObjectMode: true, readableObjectMode: true})
        transformStream._transform = function (chunk, encoding, done) {

            recordCount++

            if (transform) chunk = transform(chunk)

            let jsonChunk = JSON.stringify([chunk])

            switch (true) {
                case this._writableState.ended:
                    jsonChunk = jsonChunk.slice(1, jsonChunk.length); break
                case recordCount === 1: 
                    jsonChunk = jsonChunk.slice(0, jsonChunk.length - 1) + ','; break
                default:
                    jsonChunk = jsonChunk.slice(1, jsonChunk.length - 1) + ','; break
            }
            this.push(jsonChunk)
            done();
        };

I've tried this with 1 record to see if there are any properties that change when the stream has reached its last record. I can't find any that change. I'm using this stream to stringify JSON and then pipe it into gzip so that I can persist the compressed responses. I really need help to know when the stream has reached its last record. Any help would be greatly appreciated.

feature request stream

All 14 comments

@syntacticsolutions I've edited your issue for syntax highlighting, hope that's okay with you.

I'd like to see something like this in our streams API, especially since our own zlib transforms do some hacking to figure out whether it's the last chunk of input or not.

/cc @nodejs/streams

Could you show me how to hack this so that it can work for me?

I can, but I don't really want to, because it's really hacky :smile: I think it's essentially this:

https://github.com/nodejs/node/blob/449c0ca9f1bf247089781d67a109ebc7e6c1d5a5/lib/zlib.js#L448-L449

I stepped through the code to see if that would work and sadly it didn't :( (maybe because I'm using Transform Stream in object mode?) I'll let you know if I find something else that works.

ws.ended and ws.ending never get changed for some reason, even if I pipe it to another transform stream that is not in object mode afterwards.

I'm not sure why object mode is needed for this if you're piping it to a gzip stream (which deals with bytes and not objects). However, I don't think such a request is really possible due to the nature of things. Being a stream, the only time you'd know if it was the last chunk was if someone called stream.end(chunk). If someone does a stream.write(chunk) and then some time later calls stream.end(), you will have already push()ed the previous chunk with the assumption it wasn't the last chunk.
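The ordering described above can be observed directly. The following is only a minimal sketch: `traceHooks` is a hypothetical helper written for this illustration, and it uses the `transform` and `flush` constructor options in place of assigning `_transform` and `_flush`. It records which hooks the stream invokes when `end()` is called some time after the last `write()`.

```javascript
const { Transform } = require('stream');

// Hypothetical helper: records the order in which the stream invokes its hooks.
function traceHooks(chunks) {
  return new Promise((resolve, reject) => {
    const calls = [];
    const stream = new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        // Nothing here says whether another chunk will follow.
        calls.push(`transform:${chunk}`);
        callback(null, chunk);
      },
      flush(callback) {
        // Runs once, after end() has been called and every chunk was transformed.
        calls.push('flush');
        callback();
      },
    });
    stream.on('error', reject);
    stream.on('end', () => resolve(calls));
    stream.resume(); // discard the output; only the call order matters here

    for (const chunk of chunks) stream.write(chunk);
    // end() is a separate call that can happen arbitrarily later.
    setImmediate(() => stream.end());
  });
}
```

By the time `end()` arrives, every chunk has already gone through the transform hook, so the only place that reliably runs "after the last chunk" is the flush hook.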

Instead, you will need to rethink the solution for the problem you're trying to solve. In this particular case, one possible solution could be to simply prepend a comma depending on recordCount and then push the final ']' in a stream._final = ... handler.
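A rough sketch of that suggestion, under a few assumptions: `createJsonArrayStream` and `collect` are hypothetical names, the opening `[` is written together with the first record, and the closing bracket is pushed from the `flush` hook rather than `_final`, since a later comment in this thread recommends `_flush` for Transform streams. Only the writable side is in object mode, so the output can go straight into a byte stream such as gzip.

```javascript
const { Transform } = require('stream');

// Hypothetical factory: turns a stream of objects into the text of one JSON array.
function createJsonArrayStream() {
  let recordCount = 0;
  return new Transform({
    writableObjectMode: true, // input is objects; output stays in the default byte/string mode
    transform(record, encoding, callback) {
      // The first record opens the array, every later one is preceded by a comma.
      const prefix = recordCount === 0 ? '[' : ',';
      recordCount++;
      callback(null, prefix + JSON.stringify(record));
    },
    flush(callback) {
      // Called once after the input has ended: close the array.
      this.push(recordCount === 0 ? '[]' : ']');
      callback();
    },
  });
}

// Usage sketch: feed some records through and collect the text.
function collect(records) {
  return new Promise((resolve, reject) => {
    const stream = createJsonArrayStream();
    let out = '';
    stream.setEncoding('utf8');
    stream.on('data', (piece) => { out += piece; });
    stream.on('end', () => resolve(out));
    stream.on('error', reject);
    for (const record of records) stream.write(record);
    stream.end();
  });
}
```

Because the separator goes in front of each record instead of behind it, the transform never needs to know whether the current record is the last one.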

I have to do it in object mode because I'm pulling it from athena and I don't think the client I'm using supports another format. Then I'm piping it into gzip, converting it to base64 and then persisting it.

How can I access the last record in _final after this.push has been called?

> I have to do it in object mode because I'm pulling it from athena and I don't think the client I'm using supports another format. Then I'm piping it into gzip, converting it to base64 and then persisting it.

If your input is objects, that's fine. What I was referring to was the output, which you can separately set to non-object mode.

> How can I access the last record in _final after this.push has been called?

You don't. The stream._final handler is just where you'd write the final ']' character.

Thank you so much!!!!!!

ok this is what I have so far, but for some reason it breaks the gzip.
```javascript
const athena = require('./athena')
const redis = require('./redis')
const zlib = require('zlib')
const Stream = require('stream')

exports.persistStream = (config, query, name, transform) => {
    return new Promise((resolve, reject) => {
        let recordCount = 0

        var transformStream = new Stream.Transform({ writableObjectMode: true, readableObjectMode: true})
        transformStream._transform = function (chunk, encoding, done) {

            recordCount++

            if (transform) chunk = transform(chunk)

            let jsonChunk = JSON.stringify([chunk])

            switch (true) {
                case recordCount === 1:
                    jsonChunk = jsonChunk.slice(0, jsonChunk.length - 1); break
                default:
                    jsonChunk = ',' + jsonChunk.slice(1, jsonChunk.length - 1); break
            }
            this.push(jsonChunk)
            done();
        };

        transformStream._final = function (done) {
            this.push(']')
            this.end()
        }

        const gzip = zlib.createGzip()

        let buffers = []

        var stream = athena.execute(query)
            .toStream()
            .pipe(transformStream)
            .pipe(gzip)

        gzip.on('data', (chunk) => {
            buffers.push(chunk)
        })

        gzip.on('end', function () {
            let buffer = Buffer.concat(buffers)
            redis.set(name, buffer.toString('base64'), (err, response) => {
                zlib.gzip(config, (err, buff) => {
                    redis.set(name + ' Config', buff.toString('base64'), (err, response) => {
                        if (err) {
                            console.log(err)
                            reject()
                        } else {

                            console.log(name + ' succeeded')
                            resolve()
                        }
                    })
                })
            })
        })

        stream.on('error', (err) => {
            console.log(err)
            reject()
        })
    })
}
```

oh disregard that. I just forgot to call done() from within stream._final

> I'd like to see something like this in our streams API, especially since our own zlib transforms do some hacking to figure out whether it's the last chunk of input or not.

So based on https://github.com/nodejs/node/issues/23859#issuecomment-432808584 there is no way to implement that?

That's not how stream processing works. You cannot be certain that you are processing the last chunk, because the stream could be ended after that chunk is processed. You have to use _flush or _final.
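If the last record really does need different treatment, one pattern that fits this constraint (not proposed anywhere in the thread, so treat it as an assumption) is to hold each record back until the next one arrives and emit the held record from the flush hook. In the sketch below, `createLastAwareStream`, `mapRecord` and `tagRecords` are hypothetical names chosen for illustration.

```javascript
const { Transform } = require('stream');

// Holds each record back until the next one arrives, so the record still held
// when flush runs is known to be the last one.
// `mapRecord(record, isLast)` is a hypothetical callback, not a Node.js API.
function createLastAwareStream(mapRecord) {
  let held;
  let hasHeld = false;
  return new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      // A new record arrived, so the one being held was not the last.
      if (hasHeld) this.push(mapRecord(held, false));
      held = record;
      hasHeld = true;
      callback();
    },
    flush(callback) {
      // Input has ended: whatever is still held is the last record.
      if (hasHeld) this.push(mapRecord(held, true));
      callback();
    },
  });
}

// Usage sketch: tag every record with whether it was the last one.
function tagRecords(records) {
  return new Promise((resolve, reject) => {
    const stream = createLastAwareStream((record, isLast) => ({ record, isLast }));
    const out = [];
    stream.on('data', (item) => out.push(item));
    stream.on('end', () => resolve(out));
    stream.on('error', reject);
    for (const record of records) stream.write(record);
    stream.end();
  });
}
```

The cost is that every record is delayed by one step, which is usually acceptable when the output is being compressed and persisted anyway.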

As explained, you can use _flush (please don't use _final on a Transform). To check the stream's state, you can use writableFinished and readableEnded.
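For the state properties, a small sketch may help. It assumes a Node.js release newer than the v9.3.0 in the original report (as far as I know both properties arrived in the 12.x line), and `observeState` is a hypothetical helper. It snapshots the two flags on a fresh `PassThrough` and again once `stream.finished()` reports that both sides are done.

```javascript
const { PassThrough, finished } = require('stream');

// Hypothetical helper: reports the two flags before and after a stream completes.
function observeState() {
  return new Promise((resolve, reject) => {
    const stream = new PassThrough();
    const snapshot = () => ({
      writableFinished: stream.writableFinished,
      readableEnded: stream.readableEnded,
    });

    const before = snapshot(); // nothing written or read yet

    // finished() calls back once the writable side has finished
    // and the readable side has ended (or on error).
    finished(stream, (err) => {
      if (err) return reject(err);
      resolve({ before, after: snapshot() });
    });

    stream.resume(); // consume the output so the readable side can end
    stream.end('only chunk');
  });
}
```

Note that these flags describe what has already happened to the stream. They do not make it possible to tell from inside `_transform` that the current chunk is the last one.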

I'm closing this as I believe this has been resolved. Please re-open if there is anything that needs to be further discussed.
