Node-rdkafka: Rebalance callback not called unless consuming

Created on 12 Apr 2018  路  3Comments  路  Source: Blizzard/node-rdkafka

Passing a rebalance_cb to KafkaConsumer allows you to hook into and / or define the rebalancing behaviour. However, it's never triggered unless consumer.consume is called. For example, this will never consume any messages:

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka-testing',
  'metadata.broker.list': 'localhost:9092',
  'rebalance_cb': function(err, assignment) {
    debug('rebalancing')
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      this.assign(assignment)

      setInterval(() => {
        consumer.consume(1)
      }, 45 * 1000)
    } else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
      // Same as above
      this.unassign()
    } else {
      // We had a real error
      console.error(err)
    }
  }
})
consumer.connect()

consumer
  .on('ready', function() {
    consumer.subscribe(['example-topic'])
  })
  .on('data', function(data) {
    console.log('Message found!  Contents below.')
    console.log(data.value.toString())

    consumer.commitMessage(data)
  })
  .on('event.log', (data) => {
    console.log(data.message)
  })

When looking at the cgrp log, you can see that the consumer never gets beyond the state of wait-assign-rebalance_cb:

[thrd:main]: Group "kafka-testing": delegating assign of 4 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
[thrd:main]: Group "kafka-testing" changed join state wait-sync -> wait-assign-rebalance_cb (v1, state up)

Usecase

Joining the ConsumerGroup and waiting to be assigned work before doing any other work, like in stream processing.

Expected behaviour

To only have to call consumer.consume while trying to consume a message, with rebalance_cb triggered irrespective of that.

Workaround

Polling consumer.consume manually until we've got assignments from the rebalancing. Additionally, you can use consumer.pause and consumer.resume to make sure no actual messages are consumed.

Approach to fix

I'm not that familiar C/C++ and how they bind to Node.js, so the best I could do so far was going through librdkafkadocumentation. In it, you can find how calling KafkaConsumer::consume automatically triggers RebalanceCb, EventCb and OffsetCommitCb. However, the event_cb does seem to be triggered even though no consume calls are being made, as those would have giving me the debug log above? Perhaps there's some other way to trigger the rebalance callback, rather than consuming?

stale

Most helpful comment

Thanks for all that info!

I was aware of the heartbeats and how they're used to detect changes in the ConsumerGroup, as well as the mechanism for the group coordinator to monitor consumer liveliness, but I thought that by adoption of this KIP the sending of the heartbeat was moved to a background thread. However, they also cite that of course, you still need some mechanism of liveliness of the consumer, as processing might have gotten stuck on one thread while the other is sending hearbeats.

However, in the code I posted above, the hearbeats _do_ start sending before consumption, as evidenced by the logs. Also, the logs indicate that the rebalance event has been received through heartbeat, but responding to it is deferred to the custom rebalance callback.

[thrd:main]: SyncGroup response: Success (25 bytes of MemberState data)
[thrd:main]: Group "kafka-testing": delegating assign of 1 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
[thrd:main]: Group "kafka-testing" changed join state wait-sync -> wait-assign-rebalance_cb (v1, state up)
[thrd:main]: roodpatoot-3:9092/0: Heartbeat for group "kafka-testing" generation id 7
[thrd:main]: roodpatoot-3:9092/0: Heartbeat for group "kafka-testing" generation id 7

So is the idea, that despite it being known that a rebalance is happening, calling consume (or poll on the Java KafkaConsumer) is a way to make sure the thread that does the processing is responsive and can handle the assignment?

I guess that's why consumer.pause and consumer.resume exist in the first place, to allow a consumer to communicate that it's available to process more messages, but is doing something else at the moment. From the official KafkaConsumer docs:

Consumption Flow Control

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.

One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams. When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider fetching other topics.

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls.

The way that it doesn't stroke with me, is that in event-loop world of Node.js land events just happen as far as the user is concerned, not because you've triggered a queued event by calling another method. I believe to a Node user my original posted example seems like it should work, not that you'd have to be polling consumer.consume. At the very least we should document this, but I'm wondering if we could do more, especially for the Stream implementations.

All 3 comments

This is a limitation of Kafka in general, and will not be able to be fixed the way you expect, I fear.

The best way we can accomplish this is by probing with a consume. There is a poll method in the Handle for librdkafka but it is not to be called outside of the context of a consume. The librdkafka consumer emits all queued events when poll is called, but since that is encapsulated in the consume call, it becomes a side effect that you must be consuming to have information like this.

This is because the Kafka consumer needs to communicate with the broker on an interval, and it communicates through heartbeats. You are only heartbeating when you consume, and you need to heartbeat to find out the information you are asking for.

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-HeartbeatRequest

The response to a heartbeat tells us if the group is in rebalance, which then starts the rebalance callback at the end. And we do not heartbeat until we begin our consumption from Kafka because the consuming and heartbeating are supposed to be linked in some fashion (so hung consumers are evicted from the group).

Thanks for all that info!

I was aware of the heartbeats and how they're used to detect changes in the ConsumerGroup, as well as the mechanism for the group coordinator to monitor consumer liveliness, but I thought that by adoption of this KIP the sending of the heartbeat was moved to a background thread. However, they also cite that of course, you still need some mechanism of liveliness of the consumer, as processing might have gotten stuck on one thread while the other is sending hearbeats.

However, in the code I posted above, the hearbeats _do_ start sending before consumption, as evidenced by the logs. Also, the logs indicate that the rebalance event has been received through heartbeat, but responding to it is deferred to the custom rebalance callback.

[thrd:main]: SyncGroup response: Success (25 bytes of MemberState data)
[thrd:main]: Group "kafka-testing": delegating assign of 1 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
[thrd:main]: Group "kafka-testing" changed join state wait-sync -> wait-assign-rebalance_cb (v1, state up)
[thrd:main]: roodpatoot-3:9092/0: Heartbeat for group "kafka-testing" generation id 7
[thrd:main]: roodpatoot-3:9092/0: Heartbeat for group "kafka-testing" generation id 7

So is the idea, that despite it being known that a rebalance is happening, calling consume (or poll on the Java KafkaConsumer) is a way to make sure the thread that does the processing is responsive and can handle the assignment?

I guess that's why consumer.pause and consumer.resume exist in the first place, to allow a consumer to communicate that it's available to process more messages, but is doing something else at the moment. From the official KafkaConsumer docs:

Consumption Flow Control

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.

One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams. When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider fetching other topics.

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls.

The way that it doesn't stroke with me, is that in event-loop world of Node.js land events just happen as far as the user is concerned, not because you've triggered a queued event by calling another method. I believe to a Node user my original posted example seems like it should work, not that you'd have to be polling consumer.consume. At the very least we should document this, but I'm wondering if we could do more, especially for the Stream implementations.

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

Was this page helpful?
0 / 5 - 0 ratings