Kafka-node: Wrong binary encoding on Consumer

Created on 27 Feb 2019 · 13 Comments · Source: SOHU-Co/kafka-node

Bug Report

I'm building a minimal producer and consumer example for Node.
The producer works great, and our Kafka monitoring tools seem to receive the messages perfectly.
Even kafka-console-consumer receives the messages correctly:

> kafka-console-consumer --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --topic t1 --offset 0 --partition 0
foobar
foobar
foobar

The problem seems to be in the Node consumer, where I receive the following message.value:

'\u0004"M\u0018`@\u001a\u001b\u0000\u0000\u0000\u0016\u0000\u0001\u0000Q\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
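For reference, the value above is not random noise. Its first four bytes, \u0004 " M \u0018 (0x04 0x22 0x4D 0x18), are the LZ4 frame magic number 0x184D2204 stored little-endian, and the readable key and foobar near the end are uncompressed literals inside that LZ4 data. A minimal check, assuming the value is available as a raw Buffer (for example by consuming with encoding: 'buffer'); looksLikeLz4Frame is a hypothetical helper name, not part of kafka-node:

```javascript
// Hypothetical helper: checks whether a Buffer starts with the LZ4 frame magic number.
// The LZ4 frame format stores 0x184D2204 little-endian, i.e. the bytes 04 22 4D 18.
const LZ4_FRAME_MAGIC = 0x184d2204

function looksLikeLz4Frame (buf) {
  return Buffer.isBuffer(buf) && buf.length >= 4 && buf.readUInt32LE(0) === LZ4_FRAME_MAGIC
}

// First bytes of the value logged above: magic number, then FLG (0x60, "`") and BD (0x40, "@")
const sample = Buffer.from([0x04, 0x22, 0x4d, 0x18, 0x60, 0x40])
console.log(looksLikeLz4Frame(sample)) // true
console.log(looksLikeLz4Frame(Buffer.from('foobar'))) // false
```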

Environment

  • Node version: 10.15.0
  • Kafka-node version: 4.0.2
  • Kafka version: 1.1.1

For specific cases also provide

  • Number of Brokers: 2
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

// producer.js
require('dotenv').config()

const { KafkaClient, Producer, KeyedMessage } = require('kafka-node')

const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const client = new KafkaClient({ kafkaHost })
const producer = new Producer(client)
const messages = [ new KeyedMessage('key', 'foobar') ]

producer.on('ready', () => {
  // Send the same keyed message every 5 seconds
  setInterval(() => {
    producer.send([{ topic, messages }], (error, data) => {
      if (error) {
        console.error(error)
      } else {
        console.log(data)
      }
    })
  }, 5000)
})

producer.on('error', (error) => {
  console.error(error)
})

// consumer.js (a separate file)
require('dotenv').config()

const { KafkaClient, Consumer } = require('kafka-node')

const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
// Environment variables are strings, so convert the offset to a number
const offset = Number(process.env.KAFKA_OFFSET) || 0
const client = new KafkaClient({ kafkaHost })
const consumer = new Consumer(
  client,
  [{ topic, offset }],
  { autoCommit: false, fromOffset: true }
)

consumer.on('message', (message) => {
  console.log(message.offset, message.value)
})

consumer.on('error', (error) => {
  console.error(error)
})

Include output with Debug turned on

Producer output:

  kafka-node:KafkaClient Connect attempt 1 +0ms
  kafka-node:KafkaClient Trying to connect to host: kafka1.playground.mainstream.on-prem.kaas.adsint.biz port: 9092 +3ms
  kafka-node:KafkaClient createBroker kafka1.playground.mainstream.on-prem.kaas.adsint.biz 9092 +1ms
  kafka-node:KafkaClient Sending versions request to kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +78ms
  kafka-node:KafkaClient broker socket connected {"host":"kafka1.playground.mainstream.on-prem.kaas.adsint.biz","port":"9092"} +4ms
  kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
  kafka-node:KafkaClient Received versions response from kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +257ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":1,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +1ms
  kafka-node:KafkaClient broker is now ready +0ms
  kafka-node:KafkaClient updating metadatas +143ms
  kafka-node:KafkaClient compressing messages if needed +5s
{ t1: { '0': 0 } }
  kafka-node:KafkaClient compressing messages if needed +5s
{ t1: { '0': 1 } }
  kafka-node:KafkaClient compressing messages if needed +5s
{ t1: { '0': 2 } }

Consumer output:

  kafka-node:KafkaClient Connect attempt 1 +0ms
  kafka-node:KafkaClient Trying to connect to host: kafka1.playground.mainstream.on-prem.kaas.adsint.biz port: 9092 +2ms
  kafka-node:KafkaClient createBroker kafka1.playground.mainstream.on-prem.kaas.adsint.biz 9092 +1ms
  kafka-node:KafkaClient Sending versions request to kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +76ms
  kafka-node:KafkaClient broker socket connected {"host":"kafka1.playground.mainstream.on-prem.kaas.adsint.biz","port":"9092"} +4ms
  kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +0ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
  kafka-node:KafkaClient Received versions response from kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +442ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":1,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
  kafka-node:KafkaClient broker is now ready +0ms
  kafka-node:KafkaClient updating metadatas +139ms
  kafka-node:Consumer consumer ready +0ms
  kafka-node:KafkaClient updating metadatas +138ms
  kafka-node:KafkaClient createBroker kafka1.playground.mainstream.on-prem.kaas.adsint.biz 9092 +2ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +1ms
  kafka-node:KafkaClient Sending versions request to kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +143ms
  kafka-node:KafkaClient Received versions response from kafka1.playground.mainstream.on-prem.kaas.adsint.biz:9092 +250ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":1,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
  kafka-node:KafkaClient broker is now ready +0ms
0 '\u0004"M\u0018`@\u001a\u001b\u0000\u0000\u0000\u0016\u0000\u0001\u0000Q\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
1 '\u0004"M\u0018`@\u001a\u001f\u0000\u0000\u0000\u0012\u0000\u0001\u0000�\u0001\u0000\u0000\u0000\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
2 '\u0004"M\u0018`@\u001a\u001f\u0000\u0000\u0000\u0012\u0000\u0001\u0000�\u0002\u0000\u0000\u0000\u00176)u+\u000f\u0000�\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'

Any ideas what could be the problem? Am I doing something wrong?

All 13 comments

If I set the encoding to binary, I get the same kind of message as you.

If you set the encoding to utf8, or transcode the data from a Buffer to a string, do you get the correct message?

const { ConsumerGroupStream } = require('kafka-node')

const topic = 'test'
const options = {
  kafkaHost: 'localhost:9092',
  groupId: '*****t',
  fromOffset: 'earliest',
  encoding: 'binary',
  autoCommit: false
}
const consumerGroup = new ConsumerGroupStream(options, topic)
// or transcode a Buffer to a string:
const str = Buffer.from('runoob', 'utf8').toString('utf8')

Hi @yanqi321, I tried changing the consumer to the following:

require('dotenv').config()

const { ConsumerGroupStream } = require('kafka-node')

const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const consumer = new ConsumerGroupStream({
  kafkaHost,
  fromOffset: 'earliest',
  encoding: 'binary',
  autoCommit: false
}, topic)

consumer.on('data', (message) => {
  console.log(message.offset, message.value)
})

Using either encoding: 'binary' or encoding: 'utf8' produces the same problem.

message.value with binary:

\u0004"M\u0018`@&\u0000\u0000\u0000\u0016\u0000\u0001\u0000ð\u0011\u001f{Îö¡\u0001\u0000\u0000\u0000\u0001iB¿­b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000

message.value with utf8:

\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000

Transcoding does not seem to fix the issue either.
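Why the two dumps differ: in Node, 'binary' is an alias for 'latin1', which maps every byte to exactly one character and can be reversed, while 'utf8' replaces bytes that are not valid UTF-8 with U+FFFD (shown as �). That is why the byte 0xF0 appears as ð in the binary dump and as � in the utf8 dump. Neither encoding can turn the value into foobar, because the bytes are still compressed. A small demonstration using two bytes from the dumps above:

```javascript
// Two bytes seen in the dumps above: 0xF0 (printed as "ð" or "�") followed by 0x11.
const raw = Buffer.from([0xf0, 0x11])

// 'binary' is an alias of 'latin1': one character per byte, so the round trip is lossless.
const asBinary = raw.toString('binary')
console.log(asBinary === '\u00f0\u0011') // true ("ð" followed by \u0011)
console.log(Buffer.from(asBinary, 'binary').equals(raw)) // true

// 'utf8': 0xF0 opens a 4-byte sequence that never completes, so it becomes U+FFFD ("�").
const asUtf8 = raw.toString('utf8')
console.log(asUtf8 === '\ufffd\u0011') // true
console.log(Buffer.from(asUtf8, 'utf8').equals(raw)) // false: the original byte is gone

// Neither encoding decompresses anything: both strings are the same
// compressed bytes rendered differently.
```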

I tried your code on my computer and it works fine. Try setting the encoding to 'buffer' and transcoding the value to a string; I hope that helps (though I don't expect much).
Sometimes I think the npm module has something wrong when transcoding the message.

@yanqi321 I just tried switching the encoding to buffer, with the same result:

require('dotenv').config()

const { ConsumerGroupStream } = require('kafka-node')

const kafkaHost = `${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`
const topic = process.env.KAFKA_TOPIC
const consumer = new ConsumerGroupStream({
  kafkaHost,
  fromOffset: 'earliest',
  encoding: 'buffer',
  autoCommit: false
}, topic)

consumer.on('data', (message) => {
  console.log(message.offset, message.value.toString('utf8'))
})

Output:

...
490 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
491 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
492 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
493 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
494 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001f{���\u0001\u0000\u0000\u0000\u0001iB��b\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
495 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001fw�*\u0001\u0000\u0000\u0000\u0001iB�F\f\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
496 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001fw�*\u0001\u0000\u0000\u0000\u0001iB�F\f\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'
497 '\u0004"M\u0018`@�&\u0000\u0000\u0000\u0016\u0000\u0001\u0000�\u0011\u001fw�*\u0001\u0000\u0000\u0000\u0001iB�F\f\u0000\u0000\u0000\u0003key\u0000\u0000\u0000\u0006foobar\u0000\u0000\u0000\u0000'

Just tried with a Python (3.6) consumer, and it seems to work fine:

from kafka import KafkaConsumer
from dotenv import load_dotenv, find_dotenv
from os import getenv

load_dotenv(find_dotenv())

topic = getenv('KAFKA_TOPIC')
host = getenv('KAFKA_HOST')
port = getenv('KAFKA_PORT')

consumer = KafkaConsumer(
  topic,
  bootstrap_servers=[host + ':' + port],
  auto_offset_reset='earliest',
  enable_auto_commit=False
)

for message in consumer:
  print("%d %s" % (message.offset, message.value.decode('utf-8')))

# requirements.txt
kafka
python-dotenv
lz4

Output:

...
490 foobar
491 foobar
492 foobar
493 foobar
494 foobar
495 foobar
496 foobar
497 foobar
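The Python client can print foobar presumably because the lz4 package in requirements.txt lets kafka-python decompress LZ4-compressed messages, which is exactly the step missing on the Node side. For illustration only, here is a minimal LZ4 frame decoder sketch in plain Node with no dependencies. It follows the LZ4 frame and block format descriptions, skips all checksum verification and dictionary support, and decodeLz4Block and decompressLz4Frame are hypothetical names; in practice a maintained LZ4 library is the better choice.

```javascript
// Minimal LZ4 frame decoder sketch (frame + block format), no dependencies.
// Assumptions: the input is one complete frame; header and block checksums
// are skipped, not verified; dictionaries are not supported.
const LZ4_FRAME_MAGIC = 0x184d2204

// Decode one LZ4 block, appending the output bytes to the `out` array.
// Matches may reach back into earlier blocks, so linked blocks also work.
function decodeLz4Block (src, out) {
  let i = 0
  while (i < src.length) {
    const token = src[i++]
    // Literal length: high nibble, extended by extra bytes while they equal 255.
    let litLen = token >>> 4
    if (litLen === 15) {
      let b
      do { b = src[i++]; litLen += b } while (b === 255)
    }
    for (let k = 0; k < litLen; k++) out.push(src[i++])
    if (i >= src.length) break // the last sequence carries literals only
    // Match offset: 2 bytes, little-endian, counted back from the current output end.
    const offset = src[i] | (src[i + 1] << 8)
    i += 2
    if (offset === 0 || offset > out.length) throw new Error('invalid LZ4 match offset')
    // Match length: low nibble + extension bytes + the minimum match of 4.
    let matchLen = token & 0x0f
    if (matchLen === 15) {
      let b
      do { b = src[i++]; matchLen += b } while (b === 255)
    }
    matchLen += 4
    // Copy byte by byte so overlapping matches (offset < length) repeat correctly.
    const start = out.length - offset
    for (let k = 0; k < matchLen; k++) out.push(out[start + k])
  }
}

function decompressLz4Frame (buf) {
  if (buf.readUInt32LE(0) !== LZ4_FRAME_MAGIC) throw new Error('not an LZ4 frame')
  const flg = buf[4]
  let pos = 6 // magic (4) + FLG (1) + BD (1)
  if (flg & 0x08) pos += 8 // content size field present
  if (flg & 0x01) pos += 4 // dictionary ID field present
  pos += 1 // header checksum byte (not verified)
  const out = []
  for (;;) {
    const word = buf.readUInt32LE(pos)
    pos += 4
    if (word === 0) break // EndMark
    const size = word & 0x7fffffff
    const block = buf.subarray(pos, pos + size)
    pos += size
    if (word & 0x80000000) {
      for (const byte of block) out.push(byte) // block stored uncompressed
    } else {
      decodeLz4Block(block, out)
    }
    if (flg & 0x10) pos += 4 // block checksum (not verified)
  }
  return Buffer.from(out)
}
```

Applied to the bytes of the binary dump above, it returns 43 bytes ending in key and foobar, i.e. an ordinary uncompressed Kafka message. The header checksum byte appears to have been lost when that dump was copied, but this sketch never reads it.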

So, can we confirm this is an issue with decoding the message in the Node consumer?

Quick update:

I was testing how to set up a local Kafka + ZooKeeper and got to test this again without our Kafka playground instance. Apparently it "just works".

We are currently investigating why this happens only on that specific playground instance with this specific library (other consumers seem to work fine).

Were you able to find anything related to this?

Hi @prakashkumars, our investigation points towards the current builds of Confluent Kafka in our environments. Our team is researching what is different between their Kafka build and Confluent's default Kafka build. Hope to hear some news from them soon!

@RecuencoJones Was this problem solved for you? I was getting the same garbled characters with the consumer, and didn't see any issue with Scala/Java Kafka consumers.
@hyperlink Could you help here?

@anuragrk @RecuencoJones I am facing the same problem. Is this issue fixed, or are any alternative fixes available?

@RecuencoJones we are facing the same issue. Any progress on this?

Hey @vladaman @neelesh1206 @anuragrk, sorry for the late response, I've been out of Kafka topics for a long while.

We couldn't find a fix for this with kafka-node and eventually switched to kafkajs with the LZ4 codec, and didn't have any more problems.

I am experiencing the same problem. I found that my Kafka broker uses LZ4 compression, which kafka-node does not support. Update: I switched to KafkaJS and everything works.
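Switching clients is the practical fix. For completeness, the bytes inside the LZ4 frame in this thread are a legacy (magic 0/1) Kafka message set, so even after decompression a client still has to split them into offset, key and value. A minimal parser sketch for that layout, assuming a fully decompressed Buffer; parseLegacyMessageSet and its helpers are hypothetical names, CRCs are not verified, and the newer record-batch format (magic 2) is rejected rather than parsed:

```javascript
// Minimal parser sketch for a legacy Kafka message set (message format v0/v1).
// Assumptions: CRCs are not verified; nested compression is not handled;
// record batches (magic 2) are rejected.
function readInt64AsNumber (buf, pos) {
  // Combines two 32-bit halves; exact only up to Number.MAX_SAFE_INTEGER.
  return buf.readInt32BE(pos) * 2 ** 32 + buf.readUInt32BE(pos + 4)
}

function readBytes (buf, pos) {
  // Kafka "bytes": int32 length (-1 means null) followed by the data.
  const len = buf.readInt32BE(pos)
  if (len < 0) return { data: null, next: pos + 4 }
  return { data: buf.subarray(pos + 4, pos + 4 + len), next: pos + 4 + len }
}

function parseLegacyMessageSet (buf) {
  const messages = []
  let pos = 0
  // Each entry: offset (int64), message size (int32), then the message itself.
  while (pos + 12 <= buf.length) {
    const offset = readInt64AsNumber(buf, pos)
    const size = buf.readInt32BE(pos + 8)
    const end = pos + 12 + size
    if (end > buf.length) break // a partial message at the end is ignored
    let p = pos + 12 + 4 // skip the CRC (not verified)
    const magic = buf.readInt8(p)
    if (magic > 1) throw new Error('record batch (magic ' + magic + ') not supported')
    const attributes = buf.readInt8(p + 1)
    p += 2
    let timestamp = null
    if (magic === 1) {
      // Message format v1 adds an int64 timestamp after the attributes.
      timestamp = readInt64AsNumber(buf, p)
      p += 8
    }
    const key = readBytes(buf, p)
    const value = readBytes(buf, key.next)
    messages.push({ offset, magic, attributes, timestamp, key: key.data, value: value.data })
    pos = end
  }
  return messages
}
```

For the binary dump above, decompressing and then parsing yields a single message with key key and value foobar.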
