Kafka-node: Kafka HighLevelConsumer consumes very slowly

Created on 18 Jan 2017 · 11 Comments · Source: SOHU-Co/kafka-node

Greetings!
I've encountered an issue while using the kafka-node module on my production servers:
I'm producing 10-15k records per second, but unfortunately the most I've been able to get from my consumer is 1-1.5k per second.
Could you please give me some advice on how to increase consumption speed, or how to tune my consumer for better results?

Looking forward to hearing from you,
Rainbow Pony

All 11 comments

Hello, I'm not familiar with the performance of the consumers and would like to explore it in more detail. Do you have any sample code you used to collect these benchmarks? Thanks!

@hyperlink, thanks for your reply.
Well, I have a Kafka server with a topic that has 3 partitions. I have a Node.js app that runs in cluster mode and connects to that topic. I know for a fact that 10-15k records per second are written to that topic. And the difference between a message's createdAt attribute and the timestamp at which it is processed keeps increasing over time.

For my client config, I am using:

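const kafka = require('kafka-node');

// ZooKeeper-based client. Arguments: connectionString, clientId,
// zkOptions, noAckBatchOptions, sslOptions
this._client = new kafka.Client(zkConnectionString, null,
  {
    sessionTimeout: 10000 // ZooKeeper session timeout in ms
  }, {},
  sslOptions || {});

this._options = {
  host: zkConnectionString,
  groupId: groupId,
  ssl: this._sslOptions
};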

And for consumer:

this._consumer = new kafka.HighLevelConsumer(
  this._client,
  [
    { topic: this._topic }
  ],
  {
    groupId: this._groupId
  }
);

To measure time lag, I compare the 'createdAt' field of each message with
new Date().getTime();
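A minimal sketch of this lag measurement, assuming each message value is a JSON string carrying a numeric `createdAt` in epoch milliseconds (the thread doesn't show the payload format, so `parseLagMs`, `LagWindow`, and the payload shape are hypothetical). Aggregating per reporting window makes a steadily growing lag easy to spot:

```javascript
'use strict';

// Hypothetical helper: lag for one message, assuming message.value is a
// JSON string like '{"createdAt": 1484700000000}' (epoch milliseconds).
function parseLagMs(message, nowMs = new Date().getTime()) {
  const payload = JSON.parse(message.value);
  if (typeof payload.createdAt !== 'number') {
    throw new TypeError('message has no numeric createdAt field');
  }
  return nowMs - payload.createdAt;
}

// Collects lag samples for one reporting window.
class LagWindow {
  constructor() {
    this.reset();
  }

  record(lagMs) {
    this.count += 1;
    this.totalMs += lagMs;
    this.maxMs = Math.max(this.maxMs, lagMs);
  }

  // Returns { count, avgMs, maxMs } for the window and starts a new one.
  flush() {
    const stats = {
      count: this.count,
      avgMs: this.count === 0 ? 0 : this.totalMs / this.count,
      maxMs: this.maxMs,
    };
    this.reset();
    return stats;
  }

  reset() {
    this.count = 0;
    this.totalMs = 0;
    this.maxMs = 0;
  }
}
```

In a consumer, `lag.record(parseLagMs(message))` would go in the `message` handler, and `lag.flush()` would run on a timer (say, once per second) before the numbers are sent to statsd. A maximum that keeps rising from window to window means the consumer is falling behind the producer.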

@hyperlink, can you please help, or perhaps point me to someone who can?
Right now I'm trying to use HighLevelConsumer and ConsumerGroup to read a relatively small amount of data from a topic (~2.5k messages per second).
However, I'm not able to consume all of it: out of 2.5k messages per second, I can read only about a thousand.
I use statsd to monitor performance.
My setup is as follows: a cluster application with 3 workers (each worker consumes messages from one of the topic's partitions) running on a c4.xlarge instance, and a Kafka topic with 3 partitions, also running on a c4.xlarge server. I'm also using SSL encryption on my Kafka, which may be slowing consumption considerably as well.
To create the message stream, I use the kafka-producer-perf-test tool:

<kafka-dir>/bin/kafka-producer-perf-test.sh --topic myTopic --throughput 12000 --record-size 4000 --num-records 500000 --producer-props bootstrap.servers=localhost:9093 --producer.config config/producer.properties

For my HLC I use the following settings:
fetchMaxBytes: 5 * 1024 * 1024, fetchMaxWaitMs: 500,

I've also tried different values for fetchMinBytes, maxTickMessages, and maxNumSegments, but they had no impact on the result.
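To put these settings in perspective, here is some back-of-the-envelope arithmetic on the perf-test flags and `fetchMaxBytes` above. It assumes the load is spread evenly across the 3 partitions and that every fetch returns a full `fetchMaxBytes` for its partition, which is a best case; `requiredFetchRate` is a hypothetical helper:

```javascript
'use strict';

// Throughput math for the perf test above. Assumes an even spread over
// partitions and that each fetch returns a full fetchMaxBytes (best case).
function requiredFetchRate({ recordsPerSec, recordBytes, partitions, fetchMaxBytes }) {
  const bytesPerSec = recordsPerSec * recordBytes;
  const bytesPerPartitionPerSec = bytesPerSec / partitions;
  return {
    bytesPerSec,
    mibPerSec: bytesPerSec / (1024 * 1024),
    bytesPerPartitionPerSec,
    // Minimum completed fetch round trips per partition per second.
    fetchesPerPartitionPerSec: bytesPerPartitionPerSec / fetchMaxBytes,
  };
}

const plan = requiredFetchRate({
  recordsPerSec: 12000,           // --throughput 12000
  recordBytes: 4000,              // --record-size 4000
  partitions: 3,                  // topic with 3 partitions
  fetchMaxBytes: 5 * 1024 * 1024, // HLC setting above
});

console.log(plan.bytesPerSec);                          // 48000000
console.log(plan.mibPerSec.toFixed(1));                 // 45.8
console.log(plan.bytesPerPartitionPerSec);              // 16000000
console.log(plan.fetchesPerPartitionPerSec.toFixed(2)); // 3.05
```

At 4,000-byte records the target rate is roughly 48 MB/s, so per-fetch round-trip latency and TLS overhead likely matter more here than they would with small messages. For comparison, 1,000 such messages per second is only about 4 MB/s.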

My ConsumerGroup configuration was as follows:
sessionTimeout: 15000, maxTickMessages: 5000, fetchMaxBytes: 1024 * 1024 * 5, fetchMaxWaitMs: 500, heartbeatInterval: 500

For reference, I have tried an HLC written in Java, and it consumed significantly more messages. I used the same settings for it as for my HLC.

Could you please tell me whether I'm doing something wrong, whether I should try a different setup for my Kafka topic (e.g., more partitions), or whether my configuration is wrong?
Thank you in advance.

Sorry I haven't been much help! It sounds like you know more about the performance than I do. I also don't know anyone else working on the module besides me.

I was going to suggest tweaking fetchMaxBytes; I noticed that, up to a certain point, a larger fetchMaxBytes responds faster.

Does your consumer have concurrency control? Currently, both types of consumers process incoming messages and emit them synchronously, which may block the event loop if you process your messages in the message event handler.

We use async.queue to help with this. There's a general strategy described in the README, and I believe someone also posted example code in the issues.
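The concurrency-control strategy mentioned here can be sketched without external dependencies. This is a hypothetical, simplified stand-in for async.queue combined with consumer pausing, not async's actual implementation; `createBackpressureQueue`, `highWaterMark`, and the `pause`/`resume` callbacks are illustrative names. With kafka-node, `pause` and `resume` would call the consumer's `pause()` and `resume()` methods. As I understand the kafka-node README, `pause()` stops the fetch loop but does not stop messages from an already-fetched batch from being emitted, so the queue keeps accepting tasks while paused:

```javascript
'use strict';

// Hypothetical, simplified stand-in for async.queue plus consumer pausing.
// worker(task, done) processes one task and must call done() when finished.
function createBackpressureQueue({ worker, concurrency, highWaterMark, pause, resume }) {
  const pending = [];
  let running = 0;
  let paused = false;

  // Total tasks held: waiting plus in progress.
  const size = () => pending.length + running;

  function next() {
    // Start tasks until the concurrency limit is reached.
    while (running < concurrency && pending.length > 0) {
      const task = pending.shift();
      running += 1;
      let finished = false;
      worker(task, () => {
        if (finished) return; // ignore duplicate done() calls
        finished = true;
        running -= 1;
        next();
      });
    }
    // Resume fetching once the backlog drops below the threshold.
    if (paused && size() < highWaterMark) {
      paused = false;
      resume();
    }
  }

  return {
    // Call from the consumer's 'message' handler. Tasks are always accepted,
    // because an already-fetched batch may still arrive after pause().
    push(task) {
      pending.push(task);
      if (!paused && size() >= highWaterMark) {
        paused = true;
        pause();
      }
      next();
    },
    size,
    isPaused: () => paused,
  };
}
```

Wiring it up would look roughly like `consumer.on('message', (message) => queue.push(message))`. A lower `highWaterMark` bounds how many messages sit in memory and in flight at once.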

@hyperlink, thank you for the effort, but async wasn't much help.
Just in case, may I ask which Kafka setup you have been using in your development (SSL or plaintext; how many partitions you usually have in your topics; standalone Kafka or a cluster)? Also, would you be so kind as to share your preferred configuration for your consumer (either HLC or ConsumerGroup)?
Another thing that interests me, and is somewhat related to this topic: are there any plans to use the new Kafka consumer API, which connects to bootstrap servers rather than using a ZooKeeper client (something like the Java client's bootstrap.servers property)?
As always, thank you in advance.

Hi @RainbowPony, locally I'm using the Docker setup defined in the docker-compose file.

Our deployed environment runs a cluster of three Kafka nodes on 0.10.1.1. We're in the process of migrating from HLC to ConsumerGroup, and I believe all our consumers use the default configuration.

The project has mostly been in maintenance mode, with the exception of adding ConsumerGroup (which I believe uses the new consumer API). I would like to update the client to stop depending on ZooKeeper at some point, but I haven't had the time. That said, I welcome more involvement from the community! :)

@hyperlink, hello again!
Could you please do me a favour: whenever you have time, would you mind running your test consuming 20k messages from an SSL-protected Kafka topic and telling me how long it takes?

And thank you for the warm welcome; I'll give it a spin over the weekend :)

In addition, could you please share your opinion on a Kafka batch consumer (like the one in the Kafka 0.9 Java API)?

Obviously, using async.queue is an awesome way to control concurrency, but it can decrease your message-consumption throughput tremendously. I have seen cases where a smaller buffer value increased the total number of processed messages.

Additionally, I'd suggest using a ConsumerGroup with multiple instances if you need more message-consumption throughput. Also make sure you are using (and producing to) a decent number of partitions (e.g., 30+) if you want to speed up consumption with multiple ConsumerGroup instances.
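The reason the partition count matters: within one consumer group, each partition is consumed by at most one member, so instances beyond the partition count sit idle. The simplified single-topic round-robin assignment below illustrates this. It is a sketch of the idea behind the `roundrobin` protocol, not kafka-node's actual assignor; `assignRoundRobin` and the member ids are hypothetical:

```javascript
'use strict';

// Simplified single-topic round-robin assignment: partition p goes to
// member p % members.length. Each partition has exactly one owner, so
// members beyond the partition count receive nothing.
function assignRoundRobin(memberIds, partitionCount) {
  if (memberIds.length === 0) {
    throw new Error('at least one group member is required');
  }
  const members = [...memberIds].sort();
  const assignment = new Map(members.map((id) => [id, []]));
  for (let p = 0; p < partitionCount; p += 1) {
    assignment.get(members[p % members.length]).push(p);
  }
  return assignment;
}

// 3 partitions shared by 4 instances: one instance is idle.
const small = assignRoundRobin(['c1', 'c2', 'c3', 'c4'], 3);
console.log([...small.values()].map((ps) => ps.length)); // [ 1, 1, 1, 0 ]

// 30 partitions shared by 4 instances: every instance gets work.
const large = assignRoundRobin(['c1', 'c2', 'c3', 'c4'], 30);
console.log([...large.values()].map((ps) => ps.length)); // [ 8, 8, 7, 7 ]
```

With only 3 partitions, a fourth instance adds no parallelism, which is why the partition count caps how far consumption can scale out.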

I know this might be too late (the question was asked on 14 March), but I figured it might help other people reading this.

Hello @RainbowPony

I have the same problem. I'm using the kafka-node (4.0.2) ConsumerGroup, but sometimes messages are received very slowly. Below is my code:

const consumerGroupOptions = {
  kafkaHost: host,
  groupId: 'noodoe',
  encoding: 'buffer',
  autoCommit: true,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  autoCommitIntervalMs: 1000,
  fetchMaxBytes: 10 * 1024 * 1024,
  sessionTimeout: 15000
};

this.kafkaConsumerGroup = new kafka.ConsumerGroup(consumerGroupOptions, [
  KAFKA_TOPICS.TRANSACTION_TOPIC
]);

Have you solved it?
Thank you!

Hello, @zealyen!
Well, kind of :) As a matter of fact, the issue I had experienced was not related to the kafka-node library.
The fix for me was to upgrade Java inside my Kafka Docker container. After moving from an old Java 8 release to a newer one, consumption became much faster.
Could you please share your Java version with us?
Thanks :)

Closed this issue as it was not related to kafka-node module. Thanks!
