I'm creating a minimal example of a producer and consumer for Node.
The producer works fine, and our Kafka monitoring tools receive the messages correctly.
Even kafka-console-consumer receives the messages correctly:
> kafka-console-consumer --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --topic t1 --offset 0 --partition 0
foobar
foobar
foobar
The problem seems to be in the Node Kafka consumer, where I'm receiving the following message.value:
'\u0004"M\u0018`@\u001a\u001b\u0000\u0000\u0000\u0016\u0000\u0001\u0000Q\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
// producer
const { load } = require('dotenv')
load()
const { KafkaClient, Producer, KeyedMessage } = require('kafka-node')
const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const client = new KafkaClient({ kafkaHost })
const producer = new Producer(client)
const messages = [ new KeyedMessage('key', 'foobar') ]
producer.on('ready', () => {
  setInterval(() => {
    producer.send([{ topic, messages }], (error, data) => {
      if (error) {
        // log the error itself; data is undefined when the send fails
        console.error(error)
      } else {
        console.log(data)
      }
    })
  }, 5000)
})
producer.on('error', (error) => {
  console.error(error)
})
// consumer
const { load } = require('dotenv')
load()
const { KafkaClient, Consumer } = require('kafka-node')
const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const offset = process.env.KAFKA_OFFSET || 0
const client = new KafkaClient({ kafkaHost })
const consumer = new Consumer(
  client,
  [{ topic, offset }],
  { autoCommit: false, fromOffset: true }
)
consumer.on('message', (message) => {
  console.log(message.offset, message.value)
})
Producer output:
kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: kafka1.playground.mainstream.on-prem.kaas.adsint.biz port: 9092 +3ms
kafka-node:KafkaClient createBroker kafka1.playground.mainstream.on-prem.kaas.adsint.biz 9092 +1ms
kafka-node:KafkaClient Sending versions request to kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +78ms
kafka-node:KafkaClient broker socket connected {"host":"kafka1.playground.mainstream.on-prem.kaas.adsint.biz","port":"9092"} +4ms
kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
kafka-node:KafkaClient Received versions response from kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +257ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":1,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +1ms
kafka-node:KafkaClient broker is now ready +0ms
kafka-node:KafkaClient updating metadatas +143ms
kafka-node:KafkaClient compressing messages if needed +5s
{ t1: { '0': 0 } }
kafka-node:KafkaClient compressing messages if needed +5s
{ t1: { '0': 1 } }
kafka-node:KafkaClient compressing messages if needed +5s
{ t1: { '0': 2 } }
Consumer output:
kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: kafka1.playground.mainstream.on-prem.kaas.adsint.biz port: 9092 +2ms
kafka-node:KafkaClient createBroker kafka1.playground.mainstream.on-prem.kaas.adsint.biz 9092 +1ms
kafka-node:KafkaClient Sending versions request to kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +76ms
kafka-node:KafkaClient broker socket connected {"host":"kafka1.playground.mainstream.on-prem.kaas.adsint.biz","port":"9092"} +4ms
kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +0ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
kafka-node:KafkaClient Received versions response from kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +442ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":1,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
kafka-node:KafkaClient broker is now ready +0ms
kafka-node:KafkaClient updating metadatas +139ms
kafka-node:Consumer consumer ready +0ms
kafka-node:KafkaClient updating metadatas +138ms
kafka-node:KafkaClient createBroker kafka1.playground.mainstream.on-prem.kaas.adsint.biz 9092 +2ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +1ms
kafka-node:KafkaClient Sending versions request to kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +143ms
kafka-node:KafkaClient Received versions response from kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +250ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":1,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
kafka-node:KafkaClient broker is now ready +0ms
0 '\u0004"M\u0018`@\u001a\u001b\u0000\u0000\u0000\u0016\u0000\u0001\u0000Q\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
1 '\u0004"M\u0018`@\u001a\u001f\u0000\u0000\u0000\u0012\u0000\u0001\u0000�\u0001\u0000\u0000\u0000\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
2 '\u0004"M\u0018`@\u001a\u001f\u0000\u0000\u0000\u0012\u0000\u0001\u0000�\u0002\u0000\u0000\u0000\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
Any ideas what could be the problem? Am I doing something wrong?
If you set the encoding to utf8, or convert the data from a buffer to a string, do you get the correct message?
const { ConsumerGroupStream } = require('kafka-node')
const topic = 'test'
const options = {
  kafkaHost: 'localhost:9092',
  groupId: '*****t',
  fromOffset: 'earliest',
  encoding: 'binary',
  autoCommit: false
}
const consumerGroup = new ConsumerGroupStream(options, topic)
// or transcode the buffer to a string
const buf = Buffer.from('runoob', 'utf8').toString();
Hi @yanqi321, I tried changing the consumer with the following:
const { load } = require('dotenv')
load()
const { KafkaClient, ConsumerGroupStream } = require('kafka-node')
const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const consumer = new ConsumerGroupStream({
  kafkaHost,
  fromOffset: 'earliest',
  encoding: 'binary',
  autoCommit: false
}, topic)
consumer.on('data', (message) => {
  console.log(message.offset, message.value)
})
Using either encoding: 'binary' or encoding: 'utf8' gives the same problem.
message.value with binary:
\u0004"M\u0018`@&\u0000\u0000\u0000\u0016\u0000\u0001\u0000ð\u0011\u001f{Îö¡\u0001\u0000\u0000\u0000\u0001iB¿Âb\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000
message.value with utf8:
\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000
Transcoding does not seem to fix the issue either.
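One possible reading of the two outputs above: the payload is not text at all, so no encoding option can make it readable. The sketch below uses three bytes as they appear in the binary output (0xF0, 0x11, 0x1F) and only Node's built-in Buffer; it is an illustration, not something kafka-node does internally. With 'binary' (an alias for latin1) every byte maps to one character, while with 'utf8' a byte that is not valid UTF-8 is replaced by U+FFFD, which is where the � characters come from.

```javascript
// Three raw bytes as seen in the 'binary' output above; 0xF0 alone is not valid UTF-8.
const raw = Buffer.from([0xf0, 0x11, 0x1f])

// 'binary' is an alias for latin1: each byte becomes exactly one character.
const asBinary = raw.toString('binary')
// 'utf8' replaces invalid byte sequences with U+FFFD (shown as �).
const asUtf8 = raw.toString('utf8')

console.log(JSON.stringify(asBinary))
console.log(JSON.stringify(asUtf8))

// Only the latin1 string can be converted back into the original bytes.
console.log(Buffer.from(asBinary, 'binary').equals(raw)) // true
console.log(Buffer.from(asUtf8, 'utf8').equals(raw)) // false
```

Either way the decoded string is just a rendering of the same non-text bytes, so switching encodings only changes how the garbage is displayed.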
I tried your code on my computer and it works fine. Try setting the encoding to 'buffer' and converting it to a string; I hope that helps (though I don't expect much from it).
Sometimes I think the npm module does something wrong while transcoding the message.
@yanqi321 I just tried switching the encoding to buffer, with the same result:
const { load } = require('dotenv')
load()
const { ConsumerGroupStream } = require('kafka-node')
const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const consumer = new ConsumerGroupStream({
  kafkaHost,
  fromOffset: 'earliest',
  encoding: 'buffer',
  autoCommit: false
}, topic)
consumer.on('data', (message) => {
  console.log(message.offset, message.value.toString('utf8'))
})
Output:
...
490 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
491 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
492 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
493 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
494 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
495 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001fw�*\u0001\u0000\u0000\u0000\u0001iB�F\f\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
496 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001fw�*\u0001\u0000\u0000\u0000\u0001iB�F\f\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
497 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001fw�*\u0001\u0000\u0000\u0000\u0001iB�F\f\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
Just tried with a Python (3.6) consumer, and it seems to work fine:
from kafka import KafkaConsumer
from dotenv import load_dotenv, find_dotenv
from os import getenv
load_dotenv(find_dotenv())
topic = getenv('KAFKA_TOPIC')
host = getenv('KAFKA_HOST')
port = getenv('KAFKA_PORT')
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=[host + ':' + port],
    auto_offset_reset='earliest',
    enable_auto_commit=False
)
for message in consumer:
    print("%d %s" % (message.offset, message.value.decode('utf-8')))
# requirements.txt
kafka
python-dotenv
lz4
Output:
...
490 foobar
491 foobar
492 foobar
493 foobar
494 foobar
495 foobar
496 foobar
497 foobar
So, can we confirm this is an issue with decoding the message in the Node consumer?
Quick update:
I was testing how to set up a local Kafka + ZooKeeper and got to test this again without our Kafka playground instance. Apparently it "just works".
We are currently investigating what causes this on that specific playground instance with this specific library (other consumers seem to work fine).
Were you able to find anything related to this?
Hi @prakashkumars, our investigations point towards the current builds of Confluent Kafka in our envs. Our team is currently researching what is different between their Kafka build and Confluent's default Kafka build. Hope to hear some news from them soon!
@RecuencoJones Was this problem solved for you? I was getting the same garbled characters while using the consumer. I didn't see any issue with Scala/Java Kafka consumers.
@hyperlink - Could you help here?
@anuragrk @RecuencoJones I am facing the same problem. Has this issue been fixed, or are there any alternative fixes available?
@RecuencoJones we are facing the same issue. Any progress on this?
Hey @vladaman @neelesh1206 @anuragrk, sorry for the late response, I've been out of Kafka topics for a long while.
We couldn't find a fix for this with kafka-node and eventually switched to kafkajs with LZ4 codec and didn't get any more problems.
I am experiencing the same problem. I found the issue to be that my Kafka Broker is using lz4 compression and kafka-node does not support lz4 compression. Update: switched to KafkaJS and everything works.
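For anyone wanting to check whether they are hitting the same thing: the garbled values in this thread all start with `\u0004"M\u0018`, i.e. the bytes 04 22 4D 18, which is the LZ4 frame format magic number (0x184D2204, stored little-endian). That is consistent with the lz4 diagnosis above. A minimal sketch of such a check follows; `looksLikeLz4Frame` is a hypothetical helper written for this comment, not part of kafka-node or KafkaJS, and it assumes string values were decoded byte-for-byte (as with encoding: 'binary').

```javascript
// LZ4 frame format magic number 0x184D2204, as it appears on the wire (little-endian).
const LZ4_FRAME_MAGIC = Buffer.from([0x04, 0x22, 0x4d, 0x18])

// Hypothetical helper: returns true when the payload starts with the LZ4 frame magic.
// `value` may be a Buffer or a string; strings are converted back to bytes as latin1.
function looksLikeLz4Frame (value) {
  const buf = Buffer.isBuffer(value) ? value : Buffer.from(value, 'latin1')
  return buf.length >= 4 && buf.subarray(0, 4).equals(LZ4_FRAME_MAGIC)
}

// First bytes of the garbled message.value reported in this thread.
const garbled = '\u0004"M\u0018`@\u001a\u001b'

console.log(looksLikeLz4Frame(garbled)) // true
console.log(looksLikeLz4Frame('foobar')) // false
```

If the check returns true inside a consumer's message handler, the value is most likely still LZ4-compressed, and a client with an LZ4 codec is probably needed rather than a different encoding option.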