Node-rdkafka: Cannot start more than 5 consumers sequentially

Created on 27 Feb 2018 · 7 Comments · Source: Blizzard/node-rdkafka

Given the following code:

var Kafka = require('node-rdkafka');
var _ = require('underscore');
var P = require('bluebird');

// P.each runs the iterator for each index in turn and waits for the returned
// promise to resolve before moving on, so consumers connect one at a time.
P.each(_.range(10), idx => {
  console.log(`Connecting consumer ${idx}`);

  var consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': 'localhost:9092',
    'group.id': `test-group-${idx}`
  }, {});

  return new P((resolve, reject) => {
    consumer
      .on('ready', () => {
        console.log(`Consumer ${idx} ready`);

        consumer.subscribe(['TEST']);
        // consume() with no arguments starts the flowing consume loop.
        consumer.consume();

        resolve(consumer);
      })
      .on('error', err => {
        // Errors are only logged; the promise is never rejected.
        console.error('Consumer error: ' + err);
      });

    consumer.connect();
  });
});

I get the following output:

Connecting consumer 0
Consumer 0 ready
Connecting consumer 1
Consumer 1 ready
Connecting consumer 2
Consumer 2 ready
Connecting consumer 3
Consumer 3 ready
Connecting consumer 4

(then it hangs).

If the connections are made in parallel (remove the return statement before new P), it works fine:

Connecting consumer 0
Connecting consumer 1
Connecting consumer 2
Connecting consumer 3
Connecting consumer 4
Connecting consumer 5
Connecting consumer 6
Connecting consumer 7
Connecting consumer 8
Connecting consumer 9
Consumer 3 ready
Consumer 2 ready
Consumer 0 ready
Consumer 1 ready
Consumer 4 ready
Consumer 5 ready
Consumer 6 ready
Consumer 7 ready
Consumer 8 ready
Consumer 9 ready

If it's not a bug, then what am I doing wrong?

Kafka version 1.0.0.

All 7 comments

The reason this is happening is because consumers using the consume loop, i.e. using .consume() with no parameters, need to hold onto a thread in the libuv event loop. If you want to do this you need to increase the libuv threadpool size by setting process.env.UV_THREADPOOL to a number greater than 4.

I'm a bit lost.

Firstly, setting the UV_THREADPOOL variable to 8 does not make any difference; the result is the same. Secondly, if consumers hold libuv threads, then why is there a difference between parallel and sequential execution? Both scenarios should block 4 threads without moving on.

I looked through the code and see that consumers use a blocking read with a timeout of 1 second. How can a libuv thread block forever this way?
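A toy model may make both questions concrete. It is not node-rdkafka code; it assumes a FIFO pool of `poolSize` threads, that `connect()` needs a pool thread only briefly (an assumption, but one consistent with the hang at consumer 4), and that each flowing `consume()` loop keeps its thread for good, as described above. The 1-second timeout bounds each individual read, but the loop then starts the next read on the same thread, so the thread is never handed back. `simulate` is a hypothetical name.

```javascript
// Toy model of a fixed-size worker pool (NOT node-rdkafka code).
// Assumptions: connect() occupies a pool thread only briefly, while each
// flowing consume() loop keeps its thread for the life of the consumer.
function simulate(poolSize, consumerCount, mode) {
  let heldThreads = 0;  // threads pinned by running consume loops
  const ready = [];     // consumers whose connect() completed
  const consuming = []; // consumers whose consume loop got a thread
  const starved = [];   // consume loops stuck in the queue forever

  function startConsumeLoop(idx) {
    if (heldThreads < poolSize) {
      heldThreads += 1;
      consuming.push(idx);
    } else {
      starved.push(idx);
    }
  }

  if (mode === 'sequential') {
    for (let idx = 0; idx < consumerCount; idx++) {
      // connect() needs a free thread; if every thread is pinned, it never runs.
      if (heldThreads >= poolSize) {
        return { ready, consuming, starved, hungAt: idx };
      }
      ready.push(idx);
      startConsumeLoop(idx);
    }
  } else {
    // Parallel: every connect() is queued before any consume loop starts,
    // so (in this simplified model) all connects finish first.
    for (let idx = 0; idx < consumerCount; idx++) ready.push(idx);
    for (let idx = 0; idx < consumerCount; idx++) startConsumeLoop(idx);
  }
  return { ready, consuming, starved, hungAt: null };
}

const seq = simulate(4, 10, 'sequential');
console.log(seq.ready, seq.hungAt); // [ 0, 1, 2, 3 ] 4
const par = simulate(4, 10, 'parallel');
console.log(par.ready.length, par.consuming, par.starved); // 10 [ 0, 1, 2, 3 ] [ 4, 5, 6, 7, 8, 9 ]
```

In this model the parallel run only appears to work: every consumer reports ready, but consumers 4 to 9 never get a thread to consume on.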

I think I've run into the same problem: I tried the same change to the variable and also found that it "doesn't change things".

From the debugging however I could see that we clearly fetched all the messages, they just took ages to arrive at my consumer. Profiles showed that the time was "somewhere" in "(idle)"/"syscalls". So the theory above really made sense, and I checked: Turns out the variable name @webmakersteve suggested was _almost_ right :)

http://docs.libuv.org/en/latest/threadpool.html:

Its default size is 4, but it can be changed at startup time by setting the UV_THREADPOOL_SIZE environment variable to any value (the absolute maximum is 128).

Setting UV_THREADPOOL_SIZE to 8 vastly improved the performance for me.
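Because the misnamed UV_THREADPOOL variable silently had no effect, a startup check can catch that kind of typo. This is a minimal sketch: `describeThreadpoolEnv` is a hypothetical helper, and its parsing is simplified (it assumes a valid positive integer and does not reproduce libuv's own handling of invalid values).

```javascript
// Hypothetical startup check: report the threadpool size libuv will likely use
// and flag the misspelled UV_THREADPOOL variable from this thread.
// Simplified: assumes a valid positive integer when UV_THREADPOOL_SIZE is set.
function describeThreadpoolEnv(env) {
  const warnings = [];
  if (env.UV_THREADPOOL !== undefined && env.UV_THREADPOOL_SIZE === undefined) {
    warnings.push('UV_THREADPOOL is ignored; libuv reads UV_THREADPOOL_SIZE');
  }
  const raw = env.UV_THREADPOOL_SIZE;
  // Default of 4 and cap of 128, as in the libuv docs quoted above.
  const size = raw === undefined ? 4 : Math.min(parseInt(raw, 10), 128);
  return { size, warnings };
}

const report = describeThreadpoolEnv(process.env);
for (const w of report.warnings) console.warn(w);
console.log(`libuv threadpool size: ${report.size}`);
```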

Whoops. That's what I get for sending it from memory :(

This method takes a thread from the threadpool and consumes as fast as it can:

consumer.consume();

This means that if you consume in this manner, you are capped by the number of libuv threads. My suggestion is to always set the threadpool size to at least the number of consumers + 4 if you want to consume in that fashion.
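The "number of consumers + 4" rule can be sketched as follows. `threadpoolSizeFor` is a hypothetical helper; the reading of "+ 4" as keeping libuv's default threads free for other work is an interpretation, not something the thread states. Setting the variable in the shell instead (for example `UV_THREADPOOL_SIZE=14 node app.js`) is the more robust option, since libuv reads it only once, when the pool is first created.

```javascript
// Sketch of the "number of consumers + 4" rule of thumb from this thread.
// Assumption: the + 4 keeps libuv's default threads free for other threadpool
// work (fs, dns.lookup, crypto, zlib, node-rdkafka's own short tasks).
function threadpoolSizeFor(flowingConsumerCount) {
  const DEFAULT_THREADS = 4;
  const MAX_THREADS = 128; // limit quoted from the libuv docs above
  return Math.min(flowingConsumerCount + DEFAULT_THREADS, MAX_THREADS);
}

// Must run at the very top of the entry script, before require('node-rdkafka')
// or anything else that uses the threadpool; libuv reads the variable once.
process.env.UV_THREADPOOL_SIZE = String(threadpoolSizeFor(10));
```

For the 10 consumers in the original repro this gives a pool of 14 threads.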

Alternatively, you can use the consumer stream, which does not keep a thread open the entire time in the uv threadpool and instead just checks every time you call consume (which the stream schedules internally).

But I am going to close this issue out as I think the workaround was provided to you ;)
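As a sketch of the stream alternative, the helper below builds the three arguments for `Kafka.KafkaConsumer.createReadStream(globalConfig, topicConfig, streamOptions)`, the stream factory shown in the node-rdkafka README, reusing the per-consumer config from the original repro. `streamArgsFor` is a hypothetical name, and the wiring is left as comments because it needs node-rdkafka and a running broker.

```javascript
// Hypothetical helper: builds the arguments for
// Kafka.KafkaConsumer.createReadStream(globalConfig, topicConfig, streamOptions),
// mirroring the config from the original repro (one group per consumer).
function streamArgsFor(idx, topic) {
  const globalConfig = {
    'metadata.broker.list': 'localhost:9092',
    'group.id': `test-group-${idx}`
  };
  const topicConfig = {};
  const streamOptions = { topics: [topic] };
  return [globalConfig, topicConfig, streamOptions];
}

// Usage (requires node-rdkafka and a running broker):
// const Kafka = require('node-rdkafka');
// const stream = Kafka.KafkaConsumer.createReadStream(...streamArgsFor(0, 'TEST'));
// stream.on('data', message => console.log(message.value.toString()));
// stream.on('error', err => console.error('Stream error: ' + err));
```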

FWIW: We were actually using the consumer stream (rdkafka.KafkaConsumer.createReadStream) in object mode, so while things didn't get "stuck" we saw something close to starvation -- rdkafka seemed horribly busy according to its own logging, but our stream callbacks only got called every other second. At that point we probably had 8+ independent consumers.

Yeah... unfortunately, if those consumers are too busy, things like that can happen. You can improve parallel performance by reducing the consume timeout to something like 1 ms.

Thanks, I was stuck on this for a few hours.
