I am attempting to get a list of all topics using the consumer before subscribing to them. Looking at the example, I tried this:
const consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'rdkafka-106',
  'enable.auto.commit': false,
  'offset.store.method': 'broker'
}, {
  'auto.offset.reset': 'beginning'
});
consumer.on('ready', (arg) => {
  console.log(`consumer ready.${JSON.stringify(arg)}`);
  console.log(consumer.getMetadata());
});
But the output is:
consumer ready.{"name":"rdkafka#consumer-1"}
undefined
If I set 'debug' to 'all' I do see events with the topics, but I am looking to get a list of all topics in one call. What am I doing wrong?
getMetadata is an asynchronous method. It doesn't return the metadata; it fetches it and hands it to the callback you pass in.
When you call the connect method to connect to the broker, it passes the metadata to that callback, because connecting implicitly fetches metadata.
consumer.connect(null, (err, metadata) => {
  // Now you have the metadata
});
Otherwise, using your code:
consumer.on('ready', (info) => {
  consumer.getMetadata(null, (err, metadata) => {
    // Now you have the metadata
  });
});
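To get from that callback to a plain list of topic names, something like the sketch below should work. It assumes the metadata object has a `topics` array whose entries carry a `name` field; `topicNames` is a hypothetical helper name, not part of the library.

```javascript
// Extract topic names from a metadata object shaped like the one passed to
// the connect/getMetadata callbacks.
// Assumed shape: { topics: [{ name: 'some-topic', partitions: [...] }], ... }
function topicNames(metadata) {
  if (!metadata || !Array.isArray(metadata.topics)) {
    return [];
  }
  return metadata.topics.map((topic) => topic.name);
}

// Usage with a connected consumer (not executed here):
// consumer.getMetadata(null, (err, metadata) => {
//   if (err) { console.error(err); return; }
//   console.log(topicNames(metadata));
// });
```

Keeping the extraction in its own function means it can be reused from either the connect callback or the getMetadata callback.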
You can also read the metadata from the _metadata property, which is meant for internal use but is still accessible.
consumer.on('ready', (arg) => {
  console.log(`consumer ready.${JSON.stringify(arg)}`);
  console.log(consumer._metadata);
});
@webmakersteve Thank you!!
@webmakersteve Another related question:
If I need to poll for new topics at a set interval and keep my consumer connected, do I need a new consumer/producer to do so? Calling consumer.getMetadata does not seem to return newly added topics.
I'll update that example, as it is outdated. There used to be getMetadata and fetchMetadata, but I think we removed those very early in the release.
When you call consumer.getMetadata, it should get the most recent metadata in the cluster. However, I think I found a bug in the code for fetching metadata. I'll investigate it for the next release, which should be coming in the next week!
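For polling on the same connected consumer, a rough sketch could look like the following. It assumes getMetadata accepts an options object with a `timeout` field and that the metadata has a `topics` array of `{ name }` entries; `newTopics` and `watchTopics` are hypothetical helpers, not library functions.

```javascript
// Return the topic names in `metadata` that are not yet in the `known` Set,
// adding them to `known` as a side effect.
// Assumes metadata.topics is an array of objects with a `name` field.
function newTopics(known, metadata) {
  const added = [];
  for (const topic of (metadata && metadata.topics) || []) {
    if (!known.has(topic.name)) {
      known.add(topic.name);
      added.push(topic.name);
    }
  }
  return added;
}

// Hypothetical helper: re-fetch metadata every `intervalMs` on an already
// connected client and report topics that have not been seen before.
function watchTopics(client, intervalMs, onNewTopics) {
  const known = new Set();
  const check = () => {
    // The `timeout` option (in ms) is an assumption; adjust to your version.
    client.getMetadata({ timeout: 5000 }, (err, metadata) => {
      if (err) {
        console.error('metadata request failed:', err);
        return;
      }
      const added = newTopics(known, metadata);
      if (added.length > 0) {
        onNewTopics(added);
      }
    });
  };
  check(); // run once immediately, then on every interval tick
  return setInterval(check, intervalMs); // pass to clearInterval to stop
}

// Usage (not executed here):
// const timer = watchTopics(consumer, 30000, (topics) => console.log(topics));
```

Note that the first check reports every existing topic as new, since the known set starts empty.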
This fix has been added into the library!