Kafka-node: Wrong offset is being committed, messages are missing

Created on 10 Mar 2017 · 20 Comments · Source: SOHU-Co/kafka-node

Hi there,
we are currently having problems running "kafka-node" in our production system.

Info:

  • Kafka-Broker Version 0.10.1
  • Node.js Version 7.4.0
  • kafka-node Version: 1.5.0
  • using ConsumerGroups
  • using a ConsumerGroup per topic
  • 3 topics with ~4.5 million messages each, on 30 partitions each
  • Java and Scala consumer implementations have no problems consuming these topics

Implementation:

Problem:

  • the consumer reads messages just fine for a while, then suddenly stops at about 800k incoming messages, sometimes making it to ~2 million. However, it never receives all messages (at least they are not emitted via .on("message",..)), and if we take a look at the consumer afterwards using Kafka Tool, we can see that the consumer-group has committed offsets at the latest state of the partitions, even though we are not committing manually and autoCommit has been turned off.
  • (the reason why we see offset commits being made is that we have turned on offset commit during close, "consumer.close(true,..)", yet the offsets should not be at the end of the partitions!)
  • if we consume other topics with fewer messages (~500k) it works just fine

Questions:

  • is there any chance that using multiple ConsumerGroup instances in the same node process at once can break the consuming or offset committing process?
  • why is the final "consumer.close(true,..)" committing offsets for messages that we never received?
  • is there a chance (or better, a known reason) that we might drop messages on ".on("message",..)"?
  • do we have to make any "batch" ack settings for consumers?
  • is there a chance that pausing & resuming the consumer too often kills the connection or partition offsets?

Configuration:

            ssl: false,
            groupId: "a-group-id",
            sessionTimeout: 30000,
            protocol: ["roundrobin"],
            fromOffset: "earliest",
            migrateHLC: false,
            migrateRolling: false,
            fetchMaxBytes: 1024 * 100,
            fetchMinBytes: 1,
            fetchMaxWaitMs: 100,
            autoCommit: false,
            autoCommitIntervalMs: 5000

We have hit some sort of wall here, thanks for any guesses in advance.

Chris

All 20 comments

@holgeradam

Hi @krystianity a low fetchMaxBytes could cause a consumer to hang.

Hi @hyperlink thanks for the quick reply, we have tried different settings for "fetchMaxBytes" already, ranging from 5 KB to 10 MB. Fun-fact: reducing the size results in faster message consumption.

I'll work on a test to consume a large number of messages.

@hyperlink okay cool, thanks for looking into this. I will try to check our Kafka Logs in a few minutes, we see a few logs: "Connection Close" related to "broken OffsetCommitRequest" but I have to make sure they are coming from my client.

When using autoCommit: true in this scenario, the consumer seems to commit out of order and commits very high offsets early on: half of the partitions are committed as nearly completed and the other half are set to zero, while the consumer has received messages evenly distributed over all partitions. Looking forward to a test for that scenario. Thanks!

Okay, so I have already identified that a few of these Kafka errors are caused by the client:

[2017-03-10 20:56:01,395] ERROR Closing socket for x.x.x.x:9092-x.x.x.x:46674 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 8 and apiVersion: 2
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'member_id': String length -1 cannot be negative
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.common.requests.OffsetCommitRequest.parse(OffsetCommitRequest.java:260)

However, I don't think this is the main problem, as they sometimes occur and sometimes don't: only about 3-4 of these per thousand commits.
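
For context on the SchemaException above: in the Kafka wire protocol a string is prefixed with a signed 16-bit length, and a length of -1 denotes a null string, which a non-nullable field such as member_id rejects. The sketch below only illustrates that framing. encodeKafkaString and readKafkaStringLength are hypothetical helpers, not kafka-node's actual encoder, and the idea that the client sent a null member_id in these requests is an assumption based on the log line alone.

```javascript
'use strict';

// Hypothetical helper: frames a string the way the Kafka protocol does,
// i.e. a big-endian int16 length followed by the UTF-8 bytes.
// null/undefined is encoded as length -1 with no payload.
function encodeKafkaString (value) {
  const header = Buffer.alloc(2);
  if (value === null || value === undefined) {
    header.writeInt16BE(-1, 0);
    return header;
  }
  const bytes = Buffer.from(String(value), 'utf8');
  header.writeInt16BE(bytes.length, 0);
  return Buffer.concat([header, bytes]);
}

// Reads the length prefix back, as a broker-side parser would see it.
function readKafkaStringLength (buffer) {
  return buffer.readInt16BE(0);
}

console.log(readKafkaStringLength(encodeKafkaString(null))); // -1
console.log(readKafkaStringLength(encodeKafkaString(''))); // 0
console.log(readKafkaStringLength(encodeKafkaString('consumer-1'))); // 10
```

An empty string (length 0) and a null string (length -1) are therefore different on the wire, which may explain why only some commit requests were rejected.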

I wrote a simple test with a single consumer against a single broker and a single partition. I published 2.1 million messages and was able to consume them without an issue. I ran it several times.
https://github.com/SOHU-Co/kafka-node/commit/283e6a88492168ed86993b683e5fc0868a469e26

If you're interested in running it, check out the large-message-test branch.

You will need to run the test w/o timeout since it could take about 14 minutes to run.

./node_modules/.bin/mocha --no-timeouts test/test.consumerGroup.js --grep "large"

How many consumers are in your group?

What I have found out, though, is that the client always receives the exact same offset in every message; they just differ by partition. Meaning that even if I start a fresh consumer-group, the very first message I get on each of the 30 partitions has an offset like "135988" for partition "0" or "138391" for partition "1". And they do not change; even after consuming 500k messages, a message on partition 0 will have the offset "135988".

Also: if I call client.getEarliestOffsets(), iterate over the result, and apply client.setOffset() to every partition I received, I would expect to retrieve messages again right from the beginning of the topic. This sometimes works, but about 25% of the time nothing happens at all, and when I query the consumer-group state using Kafka Tool, I can see that the offset of the consumer-group has not been reset.

@hyperlink for this use-case we are only running a single consumer. I will try out your branch, could it make a difference having 30 partitions?

I can confirm that large-message-test is successfully running on my workstation using a docker kafka broker @ version 0.10.0.1. Although the broker tells me there is still a lag of "18,921" messages for the test consumer group, it has received all 2.1 million messages. I will get some sleep and take another look at this tomorrow.

@hyperlink Okay, so this problem persists. As mentioned above, I can confirm that your test is running on my machine as well. I have also added a few things to check whether having 30 partitions changes this behavior, and it did not; it just made the test a lot faster ;) (as expected).

However I discovered more strange things:

  1. Resetting the offset for a consumer-group on a single topic for all partitions simply does not work 80% of the time. What we do to accomplish this is:

_We grab all partitions of a topic using offset.fetchEarliestOffsets(), then iterate through these partitions and call consumer.setOffset(topic, partition, 0) for each; after that is done we call consumer.commit()._

I would expect to see my consumer-group lags to be at max/end value now, however they stick with a lag of 0 and do not change at all. Sometimes it works, sometimes it does not. You can check the function that resets our offsets here.

  2. Looking at the code in highLevelConsumer.js, I can see that whenever a fetch is made "updateOffsets()" is called here. At first glance, this seems like something that could mess with the offsets, keeping in mind that when we run the test code on our production brokers our consumer-groups instantly show offsets at about 80%.

Our latest guess is that the offset trouble is caused by keyed partitioners.

@krystianity updating the offsets in the consumer changes the internal offset state of the consumer (which offsets to fetch next). The offset is automatically advanced after every fetch so changing this manually can lead to consuming duplicate messages. This could explain the high lag.

I think to do this correctly you will need to keep track of offsets outside of the consumer and call sendOffsetCommitRequest instead.
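
One way to read the suggestion above is to record the last processed offset per partition in application code and build the commit payload from that record, leaving the consumer's internal fetch offsets untouched. The sketch below is a minimal illustration under assumptions: OffsetTracker and its methods are hypothetical names, and the payload shape (topic, partition, offset) is assumed rather than taken from the kafka-node source, so it should be checked against the sendOffsetCommitRequest signature of the version in use.

```javascript
'use strict';

// Hypothetical helper that tracks processed offsets outside of the consumer.
class OffsetTracker {
  constructor () {
    // key "topic:partition" -> highest processed offset
    this.offsets = new Map();
  }

  // Call after a message has been fully processed.
  markProcessed (message) {
    const key = `${message.topic}:${message.partition}`;
    const previous = this.offsets.get(key);
    if (previous === undefined || message.offset > previous) {
      this.offsets.set(key, message.offset);
    }
  }

  // Builds one commit entry per partition. By Kafka convention the committed
  // offset is the next offset to read, hence the + 1.
  toCommitPayloads () {
    return Array.from(this.offsets, ([key, offset]) => {
      const separator = key.lastIndexOf(':');
      return {
        topic: key.slice(0, separator),
        partition: Number(key.slice(separator + 1)),
        offset: offset + 1
      };
    });
  }
}

const tracker = new OffsetTracker();
tracker.markProcessed({ topic: 'ThirtyPartitions', partition: 0, offset: 41 });
tracker.markProcessed({ topic: 'ThirtyPartitions', partition: 0, offset: 42 });
tracker.markProcessed({ topic: 'ThirtyPartitions', partition: 1, offset: 7 });
console.log(tracker.toCommitPayloads());
```

The resulting array would then be handed to the commit request instead of calling .setOffset followed by consumer.commit(), so the consumer's fetch position is never rewritten while it is running.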

I don't think I have really understood your answer yet. How would you go about making a consumer start consuming from the earliest offset, regardless of its current offset/lag state?

Another thing we have detected while investigating our stopping consumers is that they always stop at the same number of consumed messages:

{ "total": 561539, "partitions": { "0": 24243, "1": 24526, "2": 7164, "3": 7364, "4": 24884, "5": 24295, "6": 24158, "7": 23835, "8": 24632, "9": 24202, "10": 7101, "11": 24881, "12": 7209, "13": 25238, "14": 7188, "15": 24106, "16": 7214, "17": 24248, "18": 7353, "19": 24302, "20": 23924, "21": 24434, "22": 24870, "23": 24604, "24": 24528, "25": 7202, "26": 23994, "27": 25452, "28": 7254, "29": 7134 } }
It does not matter whether we use the Consumer, HighLevelConsumer or ConsumerGroup: we pick a fresh groupId, start to consume at the earliest offset, and every attempt gets stuck after 561539 messages. (Using different fetchMaxBytes settings does not matter either.)

(autoCommit or our own custom commits are not active in these tests, we are not committing or setting offsets at all currently.)

ok, I think there's several different scenarios being discussed and it's a bit confusing.

My last message was regarding manually committing instead of using autoCommit: true. In this case don't call .setOffset, as this could lead to issues.

The other scenario is to always consume from the earliest available. Then using fromOffset: 'earliest' with a new groupId seems reasonable.
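
This second scenario can be sketched as an options builder that generates a new groupId on every run. The option names are the ones used in the configuration earlier in this thread; freshGroupOptions and the "replay-" prefix are hypothetical, and the random suffix comes from Node's built-in crypto module rather than the uuid package, purely to keep the example dependency-free.

```javascript
'use strict';

const crypto = require('crypto');

// Hypothetical helper: returns ConsumerGroup options with a never-used groupId,
// so the group has no committed offsets and fromOffset: 'earliest' applies.
function freshGroupOptions (overrides) {
  return Object.assign({
    groupId: `replay-${crypto.randomBytes(8).toString('hex')}`,
    fromOffset: 'earliest',
    autoCommit: false
  }, overrides);
}

const options = freshGroupOptions({ sessionTimeout: 30000 });
console.log(options);
// With kafka-node installed, these options would be passed the same way as in
// the test code in this thread: new ConsumerGroup(options, topic)
```

Because nothing is ever committed for a throwaway group, this avoids resetting offsets on a live consumer altogether.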

Then there's the issue of a stalled consumer with 30 partitions (and autoCommit off), which is something that can be verified with a test (though with the current docker setup we can only have one broker).

@krystianity here's the updated test I wrote earlier to consume from 30 partitions w/ autoCommit off. I ran it several times without issues.

I ran the test several times in a row (the test needs to be adjusted for that) without resetting the Kafka container and was able to consume up to 6.3 million messages in 527 seconds.

Setup topic

Kafka bin tools

./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 30 --topic ThirtyPartitions

Test code

./test/large-cs-test.js

'use strict';

const host = process.env['KAFKA_TEST_HOST'] || '';
const ConsumerGroup = require('../lib/consumerGroup');
const _ = require('lodash');
const uuid = require('uuid');
const async = require('async');
const Client = require('../lib/client');
const Producer = require('../lib/highLevelProducer');
const assert = require('assert');

let topic, client, consumerGroup, consumed, interval;
const numberOfMessages = 2100000;
const chunkSize = 30;

function sendUUIDMessages (times, topic, done) {
  console.log(`Trying to send ${times} messages`);
  const producer = new Producer(client, { requireAcks: 1 });
  assert(producer.ready, 'Producer is not ready');
  console.log('producer ready');
  const allMessages = _.times(times, uuid.v4);
  const chunked = _.chunk(allMessages, times / chunkSize);
  console.log(`${allMessages.length} messages generated. Sending in ${chunked.length} chunks of ${times / chunkSize}`);
  let count = 0;
  async.eachSeries(chunked, function (messages, callback) {
    console.log(`sending ${++count}`);
    producer.send([{topic: topic, messages: messages}], callback);
  }, done);
  return producer;
}

function init (done) {
  topic = 'ThirtyPartitions'; // uuid.v4();
  async.series([
    function (callback) {
      client = new Client(host, uuid.v4());
      client.once('connect', callback);
    },
    function (callback) {
      client.refreshMetadata([topic], callback);
    },
    function (callback) {
      sendUUIDMessages(numberOfMessages, topic, function (error) {
        if (error) {
          return callback(error);
        }
        assert.equal(client.topicPartitions[topic].length, chunkSize);
        callback();
      });
    },
    function (callback) {
      client.close(callback);
    }
  ], function (error) {
    if (error) {
      return done(error);
    }
    console.log('All messages sent');
    done();
  });
}

function bench (done) {
  const groupId = uuid.v4();
  console.log(`starting to consume using groupId: ${groupId}`);
  const time = process.hrtime();
  consumerGroup = new ConsumerGroup({
    fetchMaxBytes: 1024 * 100,
    groupId: groupId,
    host: host,
    sessionTimeout: 8000,
    heartbeatInterval: 7000,
    retryMinTimeout: 250,
    autoCommit: false,
    fromOffset: 'earliest'
  }, topic);

  // Last offset seen per partition, printed once all messages are consumed.
  const stats = {};

  consumed = 0;
  consumerGroup.once('error', done);
  consumerGroup.on('message', function (message) {
    stats[message.partition] = message.offset;

    if (++consumed === numberOfMessages) {
      console.log(`consumed ${consumed}`);
      const [seconds, nanoseconds] = process.hrtime(time);
      console.log(`took ${seconds}s ${nanoseconds / 1e6}ms`);
      console.log(stats);
      clearInterval(interval);
      done();
    }
  });

  consumerGroup.once('connect', function () {
    interval = setInterval(function () {
      console.log(`consumed ${consumed}`);
    }, 1000);
  });
}

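async.series([init, bench], function (error) {
  if (error) {
    console.error('failed with', error);
  }
  client.close();
  // bench() never runs if init() fails, so consumerGroup may still be undefined.
  if (consumerGroup) {
    consumerGroup.close(function () {});
  }
});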

@hyperlink thanks again for your support and attempts to ensure the trust in this lib, we were able to identify the reason for the problems. And I am happy to sum them up, for anyone else who might be stuck in a similar situation in the future.

  1. using 1 or 30 partitions does not matter, having a single instance or a cluster does not matter either, we ran the large-message-test against all kinds of setups.

  2. combining .setOffset with manual consumer.commit() calls is a bad idea

  3. expecting .setOffset(topic, partition, 0) to work on a running consumer, even paused, can be problematic and is probably a bad behavior for any kind of kafka consumer

  4. when "experiencing missing messages" and "committed high offsets", make sure to understand the topic you are consuming. We have a large number of topics here, with different owners. Topics with keyed messages may well be subject to heavy log compaction, depending on the data and the frequency of publishes the topic goes through.
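
Point 4 can be checked empirically: on a compacted topic the offsets delivered for a partition are no longer contiguous, so the number of messages received is smaller than the offset range covered. The sketch below uses a hypothetical helper (summarizeOffsets is not part of kafka-node) that computes this from the partition and offset fields of received messages. It assumes the messages of one partition arrive in ascending offset order, and the sample offsets are illustrative values modeled on the ones quoted earlier in the thread.

```javascript
'use strict';

// Hypothetical helper: summarizes, per partition, how many messages were
// received and how many offsets were skipped between the first and last one.
function summarizeOffsets (messages) {
  const summary = {};
  for (const { partition, offset } of messages) {
    const entry = summary[partition];
    if (!entry) {
      summary[partition] = { first: offset, last: offset, received: 1, skipped: 0 };
      continue;
    }
    // A jump of more than 1 means offsets are missing in between,
    // for example because compaction removed older records for a key.
    entry.skipped += Math.max(0, offset - entry.last - 1);
    entry.last = Math.max(entry.last, offset);
    entry.received += 1;
  }
  return summary;
}

const sample = [
  { partition: 0, offset: 135988 },
  { partition: 0, offset: 135989 },
  { partition: 0, offset: 135995 },
  { partition: 1, offset: 138391 }
];
console.log(summarizeOffsets(sample));
```

In a real consumer the same function could be fed from the .on("message",..) handler. A large skipped count, or a first offset well above 0 for a brand-new group, would suggest that records were removed from the log rather than dropped by the client.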
