Consumer
const { Client, Consumer } = require('kafka-node');
const client = new Client('localhost:2181');
const consumer = new Consumer(client, [{ topic: 'test' }]);
consumer.on('message', msg => console.log(msg));
Producer
const { Client, Producer } = require('kafka-node');
const client = new Client('localhost:2181');
const producer = new Producer(client, { requireAcks: 1 });
producer.on('ready', () => {
  producer.send([{
    topic: 'test',
    messages: JSON.stringify({
      msg: 'test',
      timestamp: Date.now()
    })
  }], (err, data) => console.log('KAFKA-TEST: ', err, data));
});
process.on('SIGINT', () =>
  producer.close(() =>
    process.exit()
  )
);
When I restart the producer, the consumer starts endlessly emitting old messages in random order.
Debug output did not show anything.
I have the same issue with looped messages on the consumer side. The console consumer shows normal output, though.
If 'attributes' is not 0 on the message from the Producer, then everything is OK.
But as said in the README:
attributes controls compression of the message set
So it is strange that I have to enable message compression to fix this problem ¯\_(ツ)_/¯
// hot fix
producer.send([{
  topic: 'test',
  attributes: 1, // or 2
  messages: JSON.stringify({
    msg: 'test',
    timestamp: Date.now()
  })
}]);
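For anyone applying the hot fix in several places: the README lists the 'attributes' values as 0 (no compression), 1 (GZip) and 2 (snappy). Below is a minimal sketch of a wrapper that sets the field on every payload. Note that `withCompression` and `COMPRESSION` are hypothetical names I made up for illustration; they are not part of kafka-node.

```javascript
// Compression codes for `attributes`, as listed in the kafka-node README.
const COMPRESSION = { none: 0, gzip: 1, snappy: 2 };

// Hypothetical helper (not part of kafka-node): returns a copy of each
// payload with `attributes` set to the code of the chosen codec.
function withCompression(payloads, codec = 'gzip') {
  if (!Object.prototype.hasOwnProperty.call(COMPRESSION, codec)) {
    throw new Error(`Unknown codec: ${codec}`);
  }
  return payloads.map(p => ({ ...p, attributes: COMPRESSION[codec] }));
}

const payloads = withCompression([{
  topic: 'test',
  messages: JSON.stringify({ msg: 'test', timestamp: Date.now() })
}]);
// `payloads` can then be passed to producer.send(payloads, callback).
```

The helper copies the payloads instead of mutating them, so the original objects stay usable if you want to drop the workaround later.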
Thanks for sharing. It seems to me this is a bug that occurs when compression is not set.
Same problem here... and I cannot control the producer... :-( I also have the same issue when using the ConsumerGroup API.
It works only the first time I run the consumer. If I stop it and start it again, I start getting duplicated messages...
The problem is with Kafka itself.
After downgrading Kafka to version 1.1.1, everything works fine without this fix.
You are right, using Kafka 1.1.1 seems to work fine! Thanks!
Thanks for sharing @F1NYA, good observation.
I spent much more time than I'd care to admit stuck on that one...
I did encounter the same issue with a simple ConsumerGroup hooked to a Kafka 2.0.0 broker (cp-confluentinc docker image 5.0.0). The problem seemed to happen only if the consumer was started before the topic was created.
Debug output included empty string messages, and all messages of the topic were replayed in a loop (going up to 30k msg/s locally).
Is there any solution other than downgrading Kafka to a previous version?
Enabling compression helped me, but sometimes (seemingly at random) it happens again on 2.0.0 and I can't reproduce it :(
Thanks @F1NYA
Enabling compression helps me too!
But can compression only be enabled on the producer? I only have access to client.js.
This library has issues with Kafka 2.0. Using 2.0 is not recommended until the issues are resolved, either in Kafka broker 2.1+ or in the client.
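If you can only touch the consumer, one possible stopgap is to drop replayed messages by offset. This is only a sketch under assumptions: it relies on each message carrying the `topic`, `partition` and `offset` fields, keeps its state in memory (so it will not help with messages replayed right after a consumer restart), and `createReplayFilter` and `handle` are hypothetical names, not part of kafka-node.

```javascript
// Hypothetical consumer-side guard (not part of kafka-node): remembers the
// highest offset seen per topic/partition and rejects anything at or below it.
function createReplayFilter() {
  const highest = new Map(); // "topic:partition" -> highest offset seen
  return function isNew(message) {
    const key = `${message.topic}:${message.partition}`;
    const last = highest.get(key);
    if (last !== undefined && message.offset <= last) {
      return false; // already seen, most likely a replay
    }
    highest.set(key, message.offset);
    return true;
  };
}

const isNew = createReplayFilter();
// Assumed wiring, with `handle` being your own message handler:
// consumer.on('message', msg => { if (isNew(msg)) handle(msg); });
```

This does not fix the underlying problem, and the consumer still receives the replayed traffic; it only keeps the duplicates away from your handler.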
Has this been fixed in #1051? Could someone check out master and verify?
@hyperlink I checked with master and it seems that the problem is fixed! Thank you!
Published as 3.0.0