node_modules/kafka-node/lib/protocol/protocol.js:491
payloads[topic].forEach(function (p) {
^
TypeError: payloads[topic].forEach is not a function
kafka-node 1.5.0
kafka 0.10.2.0
Could you provide some sample code to reproduce this issue? Thanks.
var Kafka = require('kafka-node')
var Q = require('q')
var uuid = require('uuid')
var topic = 'test-' + uuid()
Q(new Kafka.HighLevelProducer(new Kafka.Client()))
.then(producer => {
var defer = Q.defer()
producer.on('ready', () => defer.resolve(producer))
return defer.promise
})
.then(producer => {
console.log('creating topic: %s and %s', 'test', topic)
return Q()
.then(() => Q.ninvoke(producer, 'createTopics', 'test'))
.catch(err => console.log('error: %j', err))
.then(() => Q.ninvoke(producer, 'createTopics', topic))
.catch(err => console.log('error: %j', err))
.then(() => producer)
})
.then(producer => Q.ninvoke(producer, 'send', [{
topic: topic,
messages: [ new Date().toISOString() ]
}]))
.then(() => new Kafka.ConsumerGroup({host: 'localhost:2181', groupId: 'test'}, ['test']))
.tap(consumer => {
consumer.on('message', msg => console.log('message: %j', msg))
consumer.on('error', err => console.log('error: %j', err))
})
.then(consumer => {
var defer = Q.defer()
consumer.addTopics([topic], (err, added) => {
if (err) return defer.reject(err)
defer.resolve(added)
})
return defer.promise
})
.done(added => console.log('added topic: %j: ', added))
p.s. if I call the addTopics with "topic" string instead of [topic] array like:
consumer.addTopics(topic, (err, added) => {
if (err) return defer.reject(err)
defer.resolve(added)
})
The call would success but returns error:
TopicsNotExistError: The topic(s) t,e,s,t,-,6,c,3,0,c,c,b,a,-,8,1,5,d,-,4,7,8,8,-,a,9,b,0,-,7,2,f,3,b,2,8,4,1,d,a,0 do not exist
So the topic string here seems be treated like an array of single characters.
Experiencing same issue...
ConsumerGroup.removeTopics also returns TypeError:
this.payloads = this.payloads.filter(function (p) {
TypeError: Cannot read property 'filter' of undefined
at ConsumerGroup.HighLevelConsumer.updateOffsets.HighLevelConsumer.removeTopics (highLevelConsumer.js:637:32)
at null._onTimeout (/Users/michal/dev/source/Kafka-Node-POC/redshift/consumer-group-remove-topic.js:21:18)
at Timer.listOnTimeout (timers.js:92:15)
payloads seems to be undefined for the HighLevelConsumer.
I will try to get to this after the new kafka client is done.
@hyperlink I am also getting the same error for ConsumerGroup.fetchOffset which calls this.client.sendOffsetFetchV1Request.
Will using offset.fetch give me a reliable response? I need to fetch the first offset which was x time ago using the 'time' property in the payload. I noticed that it is returning a response, but only when sending 1 request in the array and not multiple.
BTW I need the ConsumerGroup to only start consuming from this offset (which is not committed yet). This is an example of needing a ConsumerGroup to be able to start consuming from a not committed offset, and right now I don't have a way of doing this since ConsumerGroup.sendOffsetCommitRequest will not return before the ConsumerGroup starts to consume messages...
Meet the same exception nodejs.TypeError: payloads[topic].forEach is not a function.
Any way around this?
It seems that the encodeOffsetFetchV1Request method is being used under two use cases.
The method doesn't support the second structure, though it still gets it when using the addTopics method of a ConsumerGroup.
@iMoses addTopics is currently not supported for the ConsumerGroup. The implementation should go something like this. PRs are welcome!
I have created a pull request for addTopics.
Should we close this issue?
Most helpful comment
Should we close this issue?