Hi Everyone! Great work here with kafka-node!
I have successfully been able to consume messages, but I cannot see the timestamp of each message. As of version 0.10.2.0, Kafka should have a timestamp on every record in the partition, but I don't know how to query it.
Here is my simple code example:
~~~javascript
// configuration stuff
const kafka = require('kafka-node');

const TOPIC_NAME = 'test2';
const Consumer = kafka.Consumer;
const client = new kafka.Client('localhost:2181'); // ZooKeeper host:port; use your own
const offset = new kafka.Offset(client);

// Fetch the latest offset of partition 0 (time: -1 = latest, -2 = earliest)
offset.fetch([{ topic: TOPIC_NAME, partition: 0, time: -1, maxNum: 1 }], function (err, data) {
  if (err) {
    console.log(err);
    return;
  }
  const consumer = new Consumer(
    client,
    [
      { topic: TOPIC_NAME, partition: 0, offset: data[TOPIC_NAME]['0'][0] }
    ],
    {
      groupId: 'pacs_consumer',
      autoCommit: false,
      fromOffset: true // start from the offset given in the payload above
    }
  );
  // consumer.setOffset(TOPIC_NAME, 0, 223179);
  consumer.on('message', function (message) {
    if (message && message.topic === TOPIC_NAME) {
      console.log(message);
    }
  });
  consumer.on('error', function (err) {
    console.log(err);
  });
  consumer.on('offsetOutOfRange', function (err) {
    console.log(err);
  });
});
~~~
And here is the output I get:
~~~
ubuntu@ip-10-0-0-178:~/kafka2mysql$ node kafka_tester.js
{ topic: 'test',
  value: '{"type":"gauge","value":0}',
  offset: 131,
  partition: 0,
  highWaterOffset: 133,
  key: 'local-1500469041571.driver.BlockManager.disk.diskSpaceUsed_MB' }
{ topic: 'test',
  value: '{"type":"gauge","value":366}',
  offset: 132,
  partition: 0,
  highWaterOffset: 133,
  key: 'local-1500469041571.driver.BlockManager.memory.maxMem_MB' }
{ topic: 'test',
  value: '{"type":"gauge","value":0}',
  offset: 133,
  partition: 0,
  highWaterOffset: 158,
  key: 'local-1500469041571.driver.BlockManager.memory.memUsed_MB' }
{ topic: 'test',
  value: '{"type":"gauge","value":366}',
  offset: 134,
  partition: 0,
  highWaterOffset: 158,
  key: 'local-1500469041571.driver.BlockManager.memory.remainingMem_MB' }
^C
~~~
I am producing metric data from Spark (with a Scala producer) and am trying to read the metrics from Kafka and eventually write them to a MongoDB and/or MySQL database.
Thanks again!
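For the MongoDB/MySQL step, each consumed message first has to be turned into a record. A minimal sketch, assuming the `value` is a JSON string like the ones in the output above and that the metric name lives in the message key; `toMetricRow` and its field names are hypothetical, not part of kafka-node or of any database driver.

```javascript
// Hypothetical helper: flatten one consumed kafka-node message into a row
// for a database insert. Field names are illustrative only.
function toMetricRow(message) {
  let payload;
  try {
    payload = JSON.parse(message.value); // e.g. '{"type":"gauge","value":366}'
  } catch (err) {
    return null; // skip messages whose value is not valid JSON
  }
  if (payload === null || typeof payload !== "object") {
    return null; // valid JSON, but not an object with type/value fields
  }
  return {
    metric: message.key, // e.g. 'local-1500469041571.driver.BlockManager.memory.maxMem_MB'
    type: payload.type,
    value: payload.value,
    topic: message.topic,
    partition: message.partition,
    offset: message.offset,
    // May be missing, depending on the client and the fetch API version in use.
    timestamp: message.timestamp !== undefined ? message.timestamp : null,
  };
}

// Usage with one of the messages from the output above
const row = toMetricRow({
  topic: "test",
  value: '{"type":"gauge","value":366}',
  offset: 132,
  partition: 0,
  highWaterOffset: 133,
  key: "local-1500469041571.driver.BlockManager.memory.maxMem_MB",
});
console.log(row.metric, row.value);
```

The row object can then be passed to whichever MySQL or MongoDB client is used; the `offset` and `partition` columns make it possible to de-duplicate if messages are re-read.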
Thank you for your question. There's an open PR that adds timestamp support (#693), but before we can merge it we need to add support for versioned APIs to kafka-node. That will help maintain compatibility with older versions of Kafka.
This should be available in 2.2.0.
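For context on what versioned API support involves: since Kafka 0.10.0 (KIP-35), brokers answer an ApiVersions request with the range of versions they support for each API, and a client typically uses the highest version that both sides understand. The sketch below shows only that selection rule; `negotiateVersion` is a hypothetical helper and the ranges are made-up numbers, not taken from kafka-node or from a real broker.

```javascript
// Pick the highest API version supported by both client and broker.
// Ranges are inclusive { min, max } objects; returns null if they don't overlap.
function negotiateVersion(clientRange, brokerRange) {
  const high = Math.min(clientRange.max, brokerRange.max);
  const low = Math.max(clientRange.min, brokerRange.min);
  return high >= low ? high : null;
}

// Illustrative ranges only (hypothetical numbers):
const clientFetch = { min: 0, max: 2 }; // what this client can encode/decode
const oldBroker = { min: 0, max: 0 };   // a broker that only supports v0
const newBroker = { min: 0, max: 5 };

console.log(negotiateVersion(clientFetch, oldBroker)); // 0
console.log(negotiateVersion(clientFetch, newBroker)); // 2
```

Falling back to a lower version this way keeps the client working against older brokers, at the cost of features (such as timestamps) that only newer versions carry.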
Any idea whether this has been fixed? The issue still seems to be there with a KSQL client:
~~~
has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
~~~
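That message comes from Kafka Streams, which KSQL is built on: records written without a timestamp (for example by a pre-0.10 producer) are read back with the sentinel value -1. Kafka Streams ships alternative extractors such as `WallclockTimestampExtractor` and `UsePreviousTimeOnInvalidTimestamp`. A kafka-node consumer can apply a similar policy itself. The sketch below is loosely modelled on `UsePreviousTimeOnInvalidTimestamp` but, unlike it, falls back to wall-clock time when no valid timestamp has been seen yet. `createTimestampExtractor` is hypothetical, and it assumes `message.timestamp` is either a `Date` or epoch milliseconds.

```javascript
// Hypothetical "use previous time on invalid timestamp" policy for plain
// message objects, with a wall-clock fallback. Not part of kafka-node.
function createTimestampExtractor(fallbackNow = () => Date.now()) {
  let previous = null; // last valid timestamp seen, in ms since epoch

  return function extract(message) {
    const raw = message.timestamp;
    // Accept a Date or a number; anything else counts as missing.
    const ms = raw instanceof Date ? raw.getTime() : typeof raw === "number" ? raw : NaN;
    if (Number.isFinite(ms) && ms >= 0) {
      previous = ms;
      return ms;
    }
    // Missing or negative (e.g. -1): reuse the last valid timestamp,
    // or fall back to wall-clock time if there is none yet.
    return previous !== null ? previous : fallbackNow();
  };
}

// Usage with a fixed "now" so the results are predictable
const extract = createTimestampExtractor(() => 1000);
console.log(extract({ timestamp: -1 }));            // 1000 (no valid timestamp yet)
console.log(extract({ timestamp: 1500469041571 })); // 1500469041571
console.log(extract({ timestamp: -1 }));            // 1500469041571 (previous valid one)
```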
Hi @hyperlink, I'm using version 4.1.0. Is there a way to obtain the Kafka timestamp when consuming messages?
Thank you
If the timestamp is there, it would be an attribute called `timestamp` on the message object.
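Following that answer, a handler can check for the attribute before using it. A minimal sketch, assuming `timestamp` arrives either as a `Date` or as epoch milliseconds (the exact type may differ between kafka-node versions); `formatMessageTime` is a hypothetical helper that returns an ISO-8601 string, or `null` when the message carries no usable timestamp.

```javascript
// Hypothetical helper: ISO-8601 string for message.timestamp, or null if absent/unusable.
function formatMessageTime(message) {
  const raw = message.timestamp;
  if (raw === undefined || raw === null) {
    return null; // the client did not decode a timestamp for this message
  }
  // Accept either a Date or epoch milliseconds.
  const date = raw instanceof Date ? raw : new Date(raw);
  return Number.isNaN(date.getTime()) ? null : date.toISOString();
}

// Wiring it into the consumer from the question:
// consumer.on('message', (message) => console.log(message.offset, formatMessageTime(message)));

console.log(formatMessageTime({ offset: 131 })); // null
console.log(formatMessageTime({ offset: 132, timestamp: new Date(Date.UTC(2017, 6, 19)) })); // 2017-07-19T00:00:00.000Z
```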
I was able to get a timestamp in my consumed messages only after modifying `sendFetchRequest` in the `baseClient.js` source code to use the v2 fetch request encoder and the v1 fetch response decoder.
I changed the following:
~~~javascript
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
  var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
  // TODO: state validator for HLC for ignoring stale fetch requests
  var decoder = protocol.decodeFetchResponse(this._createMessageHandler(consumer), maxTickMessages);
  // ...
};
~~~
to this:
~~~javascript
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
  var encoder = protocol.encodeFetchRequestV2(fetchMaxWaitMs, fetchMinBytes);
  // TODO: state validator for HLC for ignoring stale fetch requests
  var decoder = protocol.decodeFetchResponseV1(this._createMessageHandler(consumer), maxTickMessages);
  // ...
};
~~~
It seems like the fetch v1 and v2 functions have existed since version 2.6.0 (https://github.com/SOHU-Co/kafka-node/pull/871), but they aren't invoked anywhere in the source code. Are there any plans to make this configurable?
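One way such an option could look: a hypothetical `fetchVersion` setting mapped to an encoder/decoder pair instead of hard-coding them in `sendFetchRequest`. This is not an existing kafka-node setting; `selectFetchCodec` is invented for illustration, the protocol function names are the ones from the patch above, and whether other version combinations are valid would need checking against kafka-node's protocol code. A stub `protocol` object stands in for the real module so the sketch runs on its own.

```javascript
// Hypothetical: map a fetchVersion option to a fetch request encoder and
// fetch response decoder taken from a protocol module.
function selectFetchCodec(protocol, fetchVersion) {
  const codecs = {
    0: [protocol.encodeFetchRequest, protocol.decodeFetchResponse],
    // the combination reported in this thread to return timestamps
    2: [protocol.encodeFetchRequestV2, protocol.decodeFetchResponseV1],
  };
  const pair = codecs[fetchVersion];
  if (!pair || typeof pair[0] !== "function" || typeof pair[1] !== "function") {
    throw new Error(`unsupported fetchVersion: ${fetchVersion}`);
  }
  return { encode: pair[0], decode: pair[1] };
}

// Inside sendFetchRequest this might become (illustrative only):
//   const { encode, decode } = selectFetchCodec(protocol, this.options.fetchVersion || 0);
//   var encoder = encode(fetchMaxWaitMs, fetchMinBytes);
//   var decoder = decode(this._createMessageHandler(consumer), maxTickMessages);

// Stub protocol module so the example runs standalone
const stubProtocol = {
  encodeFetchRequest: () => "v0-request",
  decodeFetchResponse: () => "v0-response",
  encodeFetchRequestV2: () => "v2-request",
  decodeFetchResponseV1: () => "v1-response",
};
const codec = selectFetchCodec(stubProtocol, 2);
console.log(codec.encode(), codec.decode()); // v2-request v1-response
```

Defaulting to version 0 keeps existing behaviour for older brokers, which matches the compatibility concern raised earlier in the thread.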