Kafka-node: ConsumerGroup.addTopics TypeError

Created on 16 Mar 2017 · 13 comments · Source: SOHU-Co/kafka-node

Stack

node_modules/kafka-node/lib/protocol/protocol.js:491
payloads[topic].forEach(function (p) {
^
TypeError: payloads[topic].forEach is not a function

More info

kafka-node 1.5.0
kafka 0.10.2.0

Labels: PRs Welcome, bug, consumer group

All 13 comments

Could you provide some sample code to reproduce this issue? Thanks.

var Kafka = require('kafka-node')
var Q = require('q')
var uuid = require('uuid')

var topic = 'test-' + uuid()

Q(new Kafka.HighLevelProducer(new Kafka.Client()))
  .then(producer => {
    var defer = Q.defer()
    producer.on('ready', () => defer.resolve(producer))
    return defer.promise
  })
  .then(producer => {
    console.log('creating topic: %s and %s', 'test', topic)
    return Q()
      .then(() => Q.ninvoke(producer, 'createTopics', 'test'))
      .catch(err => console.log('error: %j', err))
      .then(() => Q.ninvoke(producer, 'createTopics', topic))
      .catch(err => console.log('error: %j', err))
      .then(() => producer)
  })
  .then(producer => Q.ninvoke(producer, 'send', [{
    topic: topic,
    messages: [ new Date().toISOString() ]
  }]))
  .then(() => new Kafka.ConsumerGroup({host: 'localhost:2181', groupId: 'test'}, ['test']))
  .tap(consumer => {
    consumer.on('message', msg => console.log('message: %j', msg))
    consumer.on('error', err => console.log('error: %j', err))
  })
  .then(consumer => {
    var defer = Q.defer()
    consumer.addTopics([topic], (err, added) => {
      if (err) return defer.reject(err)
      defer.resolve(added)
    })
    return defer.promise
  })
  .done(added => console.log('added topic: %j: ', added))

P.S. If I call addTopics with the topic string instead of the [topic] array, like:

    consumer.addTopics(topic, (err, added) => {
      if (err) return defer.reject(err)
      defer.resolve(added)
    })

The call goes through, but the callback receives an error:

TopicsNotExistError: The topic(s) t,e,s,t,-,6,c,3,0,c,c,b,a,-,8,1,5,d,-,4,7,8,8,-,a,9,b,0,-,7,2,f,3,b,2,8,4,1,d,a,0 do not exist

So the topic string seems to be treated as an array of single characters.
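The comma-separated topic list in that error is what you would expect if a string were walked element by element as though it were an array. The snippet below is only a sketch of that effect, not kafka-node's actual code; `iterateLikeArray` and `normalizeTopics` are hypothetical helpers invented for illustration.

```javascript
// Illustration only: neither function is part of kafka-node.

// Walking a string by index yields one character per step, which matches
// the "t,e,s,t,-,..." list in the TopicsNotExistError above.
function iterateLikeArray (value) {
  var seen = []
  for (var i = 0; i < value.length; i++) {
    seen.push(value[i])
  }
  return seen
}

// Hypothetical guard: wrap a lone topic name so it is never split up.
function normalizeTopics (topics) {
  return Array.isArray(topics) ? topics : [topics]
}

console.log(iterateLikeArray('test-6c').join(',')) // t,e,s,t,-,6,c
console.log(normalizeTopics('test-6c').join(','))  // test-6c
```

Note that wrapping the topic in an array only avoids the character splitting. On kafka-node 1.5.0 the array form is the one that triggers the TypeError reported at the top of this issue.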

Experiencing same issue...

ConsumerGroup.removeTopics also returns TypeError:

this.payloads = this.payloads.filter(function (p) {

TypeError: Cannot read property 'filter' of undefined
    at ConsumerGroup.HighLevelConsumer.updateOffsets.HighLevelConsumer.removeTopics (highLevelConsumer.js:637:32)
    at null._onTimeout (/Users/michal/dev/source/Kafka-Node-POC/redshift/consumer-group-remove-topic.js:21:18)
    at Timer.listOnTimeout (timers.js:92:15)

payloads seems to be undefined for the HighLevelConsumer.

I will try to get to this after the new kafka client is done.

@hyperlink I am also getting the same error for ConsumerGroup.fetchOffset which calls this.client.sendOffsetFetchV1Request.

Will using offset.fetch give me a reliable response? I need to fetch the first offset which was x time ago using the 'time' property in the payload. I noticed that it is returning a response, but only when sending 1 request in the array and not multiple.

BTW I need the ConsumerGroup to only start consuming from this offset (which is not committed yet). This is an example of needing a ConsumerGroup to be able to start consuming from a not committed offset, and right now I don't have a way of doing this since ConsumerGroup.sendOffsetCommitRequest will not return before the ConsumerGroup starts to consume messages...

I hit the same exception: nodejs.TypeError: payloads[topic].forEach is not a function.

Any way around this?

It seems that the encodeOffsetFetchV1Request method is called with two different payload structures:

  • When payloads is an object containing topics as keys and a list of partitions as values.
  • When payloads is an array containing objects with topic data, one per partition.

The method doesn't support the second structure, though it still gets it when using the addTopics method of a ConsumerGroup.
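To make the two structures concrete, here is a rough sketch. The exact field names (`topic`, `partition`) and the use of plain partition numbers are assumptions for illustration, and `groupByTopic` is a hypothetical helper, not something kafka-node provides.

```javascript
// Structure 1 (assumed shape): object keyed by topic, values are partition lists.
var byTopic = { test: [0, 1] }

// Structure 2 (assumed shape): array with one entry per topic/partition pair.
var perPartition = [
  { topic: 'test', partition: 0 },
  { topic: 'test', partition: 1 }
]

// Indexing structure 1 by key gives an array, so .forEach exists.
console.log(typeof byTopic.test.forEach)      // function

// Indexing structure 2 by key gives a plain object, so .forEach is missing.
// Calling it would throw "... .forEach is not a function".
console.log(typeof perPartition[0].forEach)   // undefined

// Hypothetical helper: convert structure 2 into structure 1.
function groupByTopic (payloads) {
  if (!Array.isArray(payloads)) return payloads
  var grouped = {}
  payloads.forEach(function (p) {
    if (!grouped[p.topic]) grouped[p.topic] = []
    grouped[p.topic].push(p.partition)
  })
  return grouped
}

console.log(JSON.stringify(groupByTopic(perPartition))) // {"test":[0,1]}
```

Whether the fix belongs in the encoder or in the caller that builds the payloads is a design choice this sketch does not settle.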

@iMoses addTopics is currently not supported for the ConsumerGroup. The implementation should go something like this. PRs are welcome!

I have created a pull request for addTopics.

Should we close this issue?
