Node: readable.push always returning false after reaching highWaterMark for the first time

Created on 5 Apr 2019 · 32 comments · Source: nodejs/node

According to the documentation and multiple posts regarding Readable streams, .push will return false once highWaterMark is reached, and at that point _read should stop pushing.

The issue I'm facing (and I'm not sure whether I've misunderstood something or there's an actual bug, probably the former) is that after 16384 bytes are pushed and .push returns false for the first time, every remaining call to .push also returns false (9983616 in total).

const fs = require('fs');
const { Readable } = require('stream');

const writeStream = fs.createWriteStream('./bigFile.txt');

let readCalls = 0;
class CustomReader extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1e7;
    this._index = 0;
  }

  _read(size) {
    readCalls++;

    while (this._index < this._max) {
      this._index++;

      if (this._index >= this._max)
        return this.push(null);

      if (!this.push('a'))
        return;
    }
  }
}

console.time('read');
new CustomReader().pipe(writeStream)
   .on('finish', () => {
      console.log(readCalls);
      console.timeEnd('read');
   });

In that example, I expect readCalls to be ~610 (1e7 / 16384), but instead it is 9983616.

Also, I expect it to run in almost the same time as the following script using .write:

const fs = require('fs');

(async () => {
  console.time('write');

  const file = fs.createWriteStream('./test.dat');
  for (let i = 0; i < 1e7; i++) {
    if (!file.write('a')) {
      await new Promise(resolve => file.once('drain', resolve));
    }
  }

  console.timeEnd('write');
})();

But it takes twice as long, which is logical since _read is being called far more times than it should be.

If I completely ignore the return value of .push and only rely on the size argument, the stream works as expected.

let readCounter = 0;
class CustomReader extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1e7;
    this._index = 0;
  }

  _read(size) {
    readCounter++;

    const stop = Math.min(this._index + size, this._max);
    while (this._index < stop) {
      this._index++;

      if (this._index >= this._max)
        return this.push(null);

      if (!this.push('a')) {
        // ignore
        // return;
      }
    }
  }
}

// readCounter = 611

So the question is: after .push returns false, besides stopping the _read function, should I wait for a specific event or do an extra check inside _read? If not, is this the expected behaviour?

  • Version: 10.15.3 / 8.15.0
  • Platform: Ubuntu 18.04
  • Subsystem: stream

Most helpful comment

I think it would be good to add that lowWatermark.

All 32 comments

I think the issue might be that you expect the readable side's buffer to drain fully before another _read() call, when in reality, it is called as soon as the buffer has shrunk below the high water mark? So, once 16384 bytes have been pushed, and then one byte is taken from the buffer (i.e. 16383 bytes are in the buffer), _read() is called, and not when there are 0 bytes in the buffer.

Does that make sense?
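For illustration, here is a small standalone sketch (not from the original report) that logs how much is still buffered each time _read() fires while a slow consumer drains the stream:

const { Readable, Writable } = require('stream');

let pushed = 0;
const source = new Readable({
  read() {
    // readableLength is how many bytes are still buffered when _read() fires;
    // once things are flowing it tends to sit near the high water mark, not 0.
    console.log('_read, buffered bytes:', this.readableLength);
    this.push(pushed++ < 64 ? Buffer.alloc(1024) : null);
  }
});

const slowConsumer = new Writable({
  write(chunk, encoding, callback) {
    setImmediate(callback); // accept one chunk per event-loop turn
  }
});

source.pipe(slowConsumer).on('finish', () => console.log('done'));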

I get what you're saying, but if that's the case, wouldn't size be 1 instead of 16384 for those calls? And I don't think that should be the intended behaviour, at least performance-wise, since it would call _read() ~16384 times more often than needed.

So the actual question is: is the buffer supposed to be drained before _read() is called? In my opinion it should be, but I'm not certain whether I'm missing something.

@marcosc90 I'm not sure. I think other people in @nodejs/streams might answer that question better, but generally: I think it's true that streams are just not optimized for handling a large number of small chunks, vs. a relatively small number of large chunks.

Maybe something that we could do to improve this situation is to switch .pipe() to streams 3, i.e. use .on('readable') and .read() in its internals rather than .on('data'). I think this would help for your use case, but it also seems like a change whose consequences are hard to foresee.
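For reference, a minimal sketch of that consumption style, reusing CustomReader and writeStream from the original snippet (backpressure on writeStream is ignored to keep it short):

const reader = new CustomReader();

reader.on('readable', () => {
  let chunk;
  // read() with no size argument returns everything currently buffered,
  // so the consumer sees a few large reads instead of millions of tiny
  // 'data' events.
  while ((chunk = reader.read()) !== null) {
    writeStream.write(chunk);
  }
});

reader.on('end', () => writeStream.end());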

I think you are hitting an edge case that happens when writing one byte at a time in a tight loop.
Essentially, every time you do a this.push() while below highWaterMark, we try to schedule a new _read() call via process.nextTick() if one was not already scheduled. Thanks to the tight loop, the buffer is now full up to highWaterMark. However, there is also the 'data' handler installed by .pipe(), which is going to clean up that buffer; I don't know if this happens before or after the new _read() gets executed.

We _might_ have a <= that should be a < somewhere in the code. A PR would be welcomed if that was the case.

Just to confirm: the code is correct and there's a bug? In that example, this.push() should return false only ~610/611 times, and _read should be called that number of times instead of 9983616.

If that's the case, I'm more than happy to track it down and submit a PR.

Later I will also test whether this happens when writing more bytes at a time.

@mcollina I can confirm that it doesn't matter whether I'm writing a single byte or multiple bytes: if _read is synchronous, this.push returns false for every call after highWaterMark is reached for the first time.

Take this example, where size / 2 bytes are pushed at a time:

let readCounter = 0;
let text = 'lorem ipsum dolor '.repeat(1e6);
class BaconReadable extends Readable {
  constructor(options) {
    super(options);
    this.readIndex = 0;
  }
  _read(size) {
    readCounter++;
    let okToSend = true;
    while (okToSend) {
      okToSend = this.push(text.substr(this.readIndex, size / 2));
      // the first this.push returns true, the rest return false
      this.readIndex += size;

      if (this.readIndex > text.length) {
        this.push(null);
        okToSend = false;
      }
    }
  }
}

~In my example, if I made _read asynchronous, everything works perfectly.~

I found the bug inside Readable.prototype.read, and have a possible fix, but I want to run some more tests. I have some doubts regarding a few things in the .read function, so if someone is willing to discuss the fix/pair program, I will be able to submit a PR soon.

_Example taken from: https://www.codemag.com/article/1803071/Getting-Started-with-Node-Streams_

Sure thing, I'm available for mentoring! We can keep discussing here, or you can ping me on IRC or Twitter.

Thank you! I'm pinging you on twitter right now

Here's an update after I've done some more tests.

When attaching .on('data') without piping to a file, everything runs synchronously and the whole Readable blocks the event loop.

setTimeout(() => console.log('blocked'), 0);

console.time('read');

new CustomReader()
  .on('data', () => {})
  .on('end', () => {
    console.log('_read calls: ', readCalls);
    console.timeEnd('read');
  });

Output:

_read calls:  9983617
read: 6103.548ms
blocked

If I wrap the _read body in process.nextTick it still blocks, but _read is called only once, meaning that this.push always returns true.
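For reference, the wrapped variant looks roughly like this (only the _read body changes; the rest of CustomReader stays the same):

_read(size) {
  process.nextTick(() => {
    readCalls++;

    while (this._index < this._max) {
      this._index++;

      if (this._index >= this._max)
        return this.push(null);

      if (!this.push('a'))
        return;
    }
  });
}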

Output

_read calls:  1
read: 2777.435ms
blocked

I'm still going through the code in _stream_readable trying to fix it, but I fear that the fix will be breaking. Right now 16 tests fail if I make the snippet work as intended. I will update when I gather more information.

When attaching .on('data') without piping to a file, everything runs synchronously and the whole Readable blocks the event loop.

Your snippet is not able to differentiate whether it's async or not. setTimeout(fn, 0) runs after all nextTick callbacks. It's blocking the event loop by design. That's ok: if all data can be consumed before going back to the event loop, it should be (because of the cost of GC).
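A quick illustration of that ordering:

process.nextTick(() => console.log('nextTick'));
setTimeout(() => console.log('timeout'), 0);
// Prints "nextTick" before "timeout": timers only run once the nextTick queue
// is empty, so setTimeout(fn, 0) can't tell synchronous pushes apart from
// pushes deferred with process.nextTick().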

@mcollina I understand how setTimeout works; the async example wasn't a good one. The point I was trying to make is that _read calls should happen in different ticks of the event loop. But if you say that's the intended behaviour, then I guess a documentation change is needed instead, since stopping after this.push returns false won't change anything, nor will stopping after size bytes are pushed.

Here's the example using process.nextTick

// Sync _read
let tick = false;
console.log('Start:', new Date());
new CustomReader()
  .on('data', () => {
    if (!tick) {
      tick = true;
      process.nextTick(() => console.log('blocked', new Date()))
    }
  });

Output:

Start: 2019-04-08T12:00:25.908Z
blocked 2019-04-08T12:00:32.049Z // gets executed once the stream ends.
// ~7 seconds blocked

In the async snippet, what I was trying to demonstrate is that when .push is used in a different tick, it always returns true, blocking the event loop until this.push(null) is issued.

To sum up, the bug is that this.push does not work as documented, so its return value can't be used (the way .write's return value is) to decide when to stop _read:

synchronous _read: after hwm is reached, push always returns false.
asynchronous _read: push always returns true, even after pushing a single chunk greater than hwm.

this.push(Buffer.alloc(this.readableHighWaterMark + 1)); // true
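A minimal standalone sketch of the asynchronous case (assuming the behaviour described above):

const { Readable } = require('stream');

const r = new Readable({
  read() {
    // Push from a later tick so the push is not "sync" from the stream's
    // point of view.
    process.nextTick(() => {
      const ok = this.push(Buffer.alloc(this.readableHighWaterMark + 1));
      console.log(ok); // logged as true in the reports above, even though the chunk exceeds the hwm
      this.push(null);
    });
  }
});

r.resume(); // flowing mode: the chunk is handed straight to 'data' instead of being buffered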

Are you working on a PR?

I'm working on one, but it's not an easy fix, at least from what I've seen and tested. I'll give an update in the next couple of days.

The thing is, if I fix it to work as intended, it will probably break or change the behaviour of existing code that depends on _read.

I've been thinking that the best way to make _read work correctly, while avoiding breaking applications that depend on _read's erratic behaviour, would be to change how _read works and keep the old way behind a deprecation warning.

My proposal for this is:

1) Add a second argument callback to _read, similar to how _transform works.
2) emit 'data' only when _read callback has been called, instead of emitting data on every this.push
3) Make .push work similarly to the writeOrBuffer function of _stream_writable in order to fix the incorrect return value.
4) _read will be called again after the buffer has been cleared, on the next tick, avoiding the event-loop blocking issue.

We can detect the signature using this._read.length:

if (this._read.length === 2) {
  // new implementation
} else {
  // deprecation warning
  // old implementation
}

With the callback argument, we can make sure that both synchronous and asynchronous implementations of _read work the same and avoid the consistency issue that is happening right now. Also, there would be a performance benefit to this approach, since we would only emit data once highWaterMark has been reached (assuming the user implements the _read method correctly).

In my snippet, data would be emitted 610 times, each time with highWaterMark bytes, instead of 1e7 times with a single byte, as is happening right now.

_read(size, callback) {

    const stop = Math.min(this._index + size, this._max);

    while (this._index < stop) {
        this._index++;

        if (!this.push('a'))
            return callback(null);
    }

    if (this._index >= this._max)
        this.push(null); // or return callback(null, null) directly

    return callback(null);
}

I don't know how proposals for API changes are handled, but I thought it was a good time to throw it out there.

If you think it's a good idea, let me know and I will start working on that instead of trying to fix this in a breaking way.

Another upside, besides performance, would be that we could backport it.

Changing the _read() API is a non-starter for me. We should look into fixing this without changing the API.

I am looking into it. But here's another case where adding a callback would result in a more powerful API for _read, since you have better control over it, it's simpler to use, and it's more forgiving for the developer.

Since this.push triggers a _read, the following code, a fairly simple case that in my opinion should be supported, does not work: you will get stream.push() after EOF, unless the two .push calls happen in the same tick.

Using multiple this.push calls with the current API is tricky; the documentation should probably reflect that too. (I'm willing to provide examples or help with it.)

The following triggers stream.push() after EOF, and even if it didn't, the chunks would come out in a different order (A1, A2, A3... B1, B2, B3..., or similar, depending on the execution time of fetchA & fetchB):

 async _read(size) {
    const data = await fetchA(this._index++);
    this.push(`A: ${data.id}`);

    const response = await fetchB(data.id);
    this.push(`B: ${response}\n`);

    if(this._index >= this._max)
      return this.push(null);
  }

You have to write it this way:

 async _read(size) {
    const data = await fetchA(this._index++);    
    const response = await fetchB(data.id);

    this.push(`A: ${data.id}`); // or a single push concatenating both
    this.push(`B: ${response}\n`);

    if(this._index >= this._max)
      return this.push(null);
}

With a callback approach, it doesn't matter where you put .push or whether the calls happen in different ticks, since you control when _read is called again:

async _read(size, callback) {

    const data = await fetchA(this._index++);
    this.push(`A: ${data.id}`);

    const response = await fetchB(data.id);
    this.push(`B: ${response}\n`);

    if(this._index >= this._max) 
       this.push(null);

    // We can even make it in a way that the callback is not needed for promises
    // and we wait for it to resolve before calling _read
    callback(null);
}

I know it can be argued that in the first case _read is being used incorrectly. But changing the API or adding a new method may be beneficial, since it's well known that streams are something a lot of developers avoid or find too complex to work with, and reducing that complexity where possible seems like a good way forward. Maybe other members can voice their opinions on this.

Having said that, I'm still looking into fixing the issue without changing the API, but it's not an easy task; maybe someone more experienced with the codebase can take a look too.

Unfortunately, I'm very -1 on changing the API at this point in Node.js's history.
Also note that this is likely to cause a memory leak:

async _read(size) {
    const data = await fetchA(this._index++);

As well as this:

async _read(size, callback) {
    const data = await fetchA(this._index++);

Got it. Can you explain why it's likely to cause a memory leak?

Nothing adds a catch handler to the Promise returned by _read() at this time. As a result, you'll get an unhandledRejection, and by default that does not crash Node.js. No 'end' or 'error' would be emitted, so whoever is waiting for the stream downstream would keep waiting forever (or until a long timeout expires).
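In other words, an async _read would have to route its own failures into the stream. A rough sketch (fetchA is the hypothetical async helper from the snippets above):

const { Readable } = require('stream');

class SafeAsyncReader extends Readable {
  constructor(opts) {
    super(opts);
    this._index = 0;
  }

  async _read(size) {
    try {
      const data = await fetchA(this._index++); // hypothetical async source
      this.push(`A: ${data.id}`);
    } catch (err) {
      // Without this, the rejection becomes an unhandledRejection and the
      // stream never emits 'error' or 'end', leaving consumers hanging.
      this.destroy(err);
    }
  }
}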

I removed the try/catch on purpose to make the snippet cleaner and easier to read; error handling wasn't the point I was trying to make. I assume we are all aware of error handling on promises.

So it's not an issue actually.

I might have a fix for this. 10 tests failed, and I need to review whether the tests are incorrect or the fix is. I'm reviewing them one by one. So far ~3~ 4 of them were incorrect, so ~7~ 6 to go; I might need some help with a few of the remaining ones.

I hope I can submit a PR next week.

This looks similar to an issue I had with any project installing typescript using
_npm install typescript_
which gave the error:
npm ERR! code ERR_INVALID_OPT_VALUE
npm ERR! The value "false" is invalid for option "readableHighWaterMark"

The issue for me only occurs with larger packages like typescript (>5MB) because they are larger than _const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB_ set inside node_modules/node/make-fetch-happen.js

My dirty fix and local was to edit inside
鈥odejs\node_modulesnpm\make-fetch-happen\cache.js - line 12

change
const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB
to
const MAX_MEM_SIZE = 25 * 1024 * 1024 // 25MB (my temp fix to allow large downloads)

Now npm install typescript will work!!!

Clearly the key issue is inside Node in how it handles the cache size: either Node should increase the default MAX_MEM_SIZE or correctly handle input that is larger than the cache.

@madbiker88 I think you should open an issue on make-fetch-happen, as what you describe is likely a different issue than this one.

What's the status on this one?

I just started messing around with Node streams and quickly ran into this issue. Doesn't it basically mean that we cannot know when to stop pushing to the stream?

Doesn't it make read streams useless for situations where the source emits data faster than we can handle it?

I'll allow myself to ping @mcollina

This needs somebody to work on it, and possibly provide a fix that does not require changing the full API of _read(). I think @marcosc90 never submitted a PR after all.

@ronag this might have been fixed by one of your recent changes for v14?

Still a problem.

'use strict';
const common = require('../common');
const assert = require('assert');

const { Readable, Transform } = require('stream');

let readCalls = 0;
class CustomReader extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1e7;
    this._index = 0;
  }

  _read(size) {
    readCalls++;

    while (this._index < this._max) {
      this._index++;

      if (this._index >= this._max) {
        return this.push(null);
      }

      if (!this.push('a')) {
        console.log('pause');
        return;
      }
    }
  }
}

new CustomReader()
  .on('end', common.mustCall())
  .pipe(new Transform({
    transform(chunk, encoding, cb) {
      process.nextTick(cb);
    }
  }))
  .on('finish', common.mustCall(() => {
    assert.strictEqual(readCalls, 610);
  }));

@mcollina Apologies for not posting an update; I got busy and forgot about this, to be honest.

Anyhow, I didn't submit a PR because back then I thought it was not possible to fix the issue without changing the API, even in a backwards-compatible way, as I suggested in previous comments.

I'll pick it up again but will probably get to the same point. I can test it during the weekend and I'll report back.

Sorry, could someone summarise what the problem here is? The original post looks like intended behavior to me. Readable is greedy, and since we don't have a lowWaterMark it is working as it should.

Given the discussion in https://github.com/nodejs/node/pull/33340, I believe this whole issue comes down to whether or not we should have a lowWaterMark.

The problem with doing it that way is that once the buffer is cleared, the whole stream chain is doing nothing while waiting for an element to arrive, thus introducing unnecessary latency.

I think what you are looking for is a lowWaterMark, i.e. making it configurable at which point the buffer starts filling up again.

I think this has been discussed a few times before. I've suggested a lowWatermark myself at some point. I believe it has existed but was removed for reasons I no longer remember.

If you want to take up the discussion again, I'd suggest searching old issues for lowWaterMark.

As far as I can tell, the current functionality works as intended and the buffering is greedy relative to highWaterMark.

I think it would be good to add that lowWatermark.

Without a lowWaterMark, readable streams expose unpredictable behavior and force user code to contain overhead that could otherwise live inside the implementation itself. We need lowWaterMark :)

Would the lowWaterMark then also emit('continue')? It seems like that would make sense.
